Source code for node_query_handler

# -*- coding: UTF-8 -*-

"""
| This module handles all the queries to the nodes.
| It receives the data from the router_handler and sends the queries to the nodes.
| It also retries sending a message if the node answers incorrectly or does not answer at all.
| If the node confirms that the command was received, this module tells onos to set the web object status
| to reflect the new node status after the command was received.
|
"""


from conf import *

#import pyserial_port



def _discard_packet(packets, packet):
    """Remove one occurrence of packet from packets, logging instead of raising if it is already gone."""
    try:
        packets.remove(packet)
    except ValueError:
        # The list is shared with the serial reader, so the packet may have vanished meanwhile.
        logprint("error serialCom.uart.readed_packets_list.remove(a) ")


def _pop_confirm_packet(packets, expected_confirm, log_prefix, newest_first=False):
    """Return (and remove) the first packet containing expected_confirm, or None if there is none.

    The gateway noise packets "[S_ertx1_#]" and "[S_nocmd0_#]" met during the scan are dropped.
    The scan runs over a snapshot, so removing entries from the live list cannot skip elements.
    """
    snapshot = list(packets)
    if newest_first:
        snapshot.reverse()
    for packet in snapshot:
        logprint(log_prefix + str(packet))
        if packet.find(expected_confirm) != -1:
            _discard_packet(packets, packet)
            return packet
        if packet in ("[S_ertx1_#]", "[S_nocmd0_#]"):
            _discard_packet(packets, packet)
    return None


def make_query_to_radio_node(serialCom, node_serial_number, query, number_of_retry_already_done):
    """
    | Send a query to a radio/serial node through the serial gateway and look for its confirmation.
    | The node address inside the query (characters 3..5) is refreshed from nodeDict before sending.
    | The expected confirmation is "[S_ok" followed by the query text from index 3 up to and
    | including the "_#]" terminator, for example "[S_ok003sr070811_#]".
    | When number_of_retry_already_done is not 0, the packets already received are scanned first
    | (newest first), because the node may have answered a previous attempt in the meantime.
    |
    | :param serialCom: object exposing uart.write(query) and the uart.readed_packets_list list
    | :param node_serial_number: key of the node in nodeDict
    | :param query: the query string, for example "[S_001dw06001_#]"
    | :param number_of_retry_already_done: how many times the caller has already tried this query
    | :return: the packet containing the confirmation, or -1 if no confirmation was found
    """
    logprint("make_query_to_radio_node executed with number_of_retry_already_done:"+str(number_of_retry_already_done))
    max_retry = 1
    for _attempt in range(0, max_retry):  # retry n times to get the answer from node
        # example query: [S_001dw06001_#]
        node_address = nodeDict[node_serial_number].getNodeAddress()
        query = query[0:3] + node_address + query[6:]  # change the address in the query if the node got a new one
        data = ""
        end_of_query = query.find("_#]")
        # example confirmation: [S_ok003sr070811_#]
        expected_confirm = "[S_ok" + query[3:end_of_query + 3]

        if number_of_retry_already_done != 0:
            # Look whether the node has already answered the previous query.
            # The scan now includes index 0, which the original "while i>0" loop never checked.
            try:
                answer = _pop_confirm_packet(
                    serialCom.uart.readed_packets_list,
                    expected_confirm,
                    "check of all received answers000000000 current one was:",
                    newest_first=True,
                )
            except Exception as e:
                message = "make_query_to_radio_node"
                logprint(message, verbose=8, error_tuple=(e, sys.exc_info()))
                return -1
            if answer is not None:
                logprint("I have found the answer I was looking for")
                return answer
            # NOTE(review): the scraped source lost its indentation; this pause is assumed to
            # belong to the retry branch only - confirm against the repository version.
            time.sleep(0.4)

        try:
            data = serialCom.uart.write(query)
        except Exception as e:
            message = "error writing to serial port, data to send:"+query+", at:"+getErrorTimeString()
            logprint(message, verbose=8, error_tuple=(e, sys.exc_info()))

        if data.find(expected_confirm) != -1:
            return data

        # The direct answer was not the confirmation: look among the packets received so far.
        answer = _pop_confirm_packet(
            serialCom.uart.readed_packets_list,
            expected_confirm,
            "check of all received answers current was:",
        )
        if answer is not None:
            logprint("I have found the answer I was looiking for")
            return answer

    logprint("Great serial error,answer received from serial port was wrong:"+data+"end_data, trying query the serial,node the query was"+query+"the number of try was "+str(max_retry)+" at:" +getErrorTimeString() )
    return -1
def _read_node_answer(sock, query, try_number):
    """Read from sock until the "_#]" terminator or "ok" is seen, the peer closes, or recv fails.

    Returns whatever text was accumulated (possibly an empty string).
    """
    received_answer = ""
    # NOTE(review): "exit" is presumably a shutdown flag star-imported from conf - confirm;
    # if conf does not define it, this compares the builtin and the loop never runs.
    while exit == 0:
        logprint("s.recv(1024aaaa)")
        try:
            resp = sock.recv(1024)
        except Exception as e:
            message = "error0 in make_query_to_network_node() router_handler class trying to query a node the query was"+query+"the number of try is "+str(try_number)
            logprint(message, verbose=9, error_tuple=(e, sys.exc_info()))
            break
        logprint("after s.recv(1024)")
        if not resp:
            # An empty read means the node closed the connection; without this check the
            # loop would spin forever waiting for a terminator that can no longer arrive.
            break
        received_answer = received_answer + resp
        if received_answer.find("_#]") != -1:
            break
        logprint(resp)
        if received_answer.find("ok") != -1:
            break
    return received_answer


def make_query_to_network_node(node_serial_number, query, objName, status_to_set, user, priority, mail_report_list):
    """
    | Send a query over TCP to a powerline/ethernet node and wait for its answer.
    | The node address is read again from nodeDict at every try, because it may have changed.
    | If the answer contains "ok_#]" a "setSts" command is put on priorityCmdQueue so that the
    | web object status is updated, and the function returns.
    | If the connection fails or the answer is wrong, the query is repeated up to 8 times,
    | sleeping between tries (0.1 s, raised to 2 s after a connection error on try 6 or later).
    |
    | :param node_serial_number: key of the node in nodeDict
    | :param query: the text sent to the node
    | :param objName: web object name forwarded in the "setSts" command
    | :param status_to_set: status forwarded in the "setSts" command
    | :param user: user forwarded in the "setSts" command
    | :param priority: priority forwarded in the "setSts" command
    | :param mail_report_list: mail report list forwarded in the "setSts" command
    | :return: always an empty tuple
    """
    logprint("make_query_to_network_node() thread executed")
    logprint( "i try this query:"+query)
    timeout = 0.1
    max_tries = 8
    for m in range(0, max_tries):  # retry n times to get the answer from node
        node_address = nodeDict[node_serial_number].getNodeAddress()  # the address may have changed
        logprint("connection try number:"+str(m)+"to ip number"+str(node_address) )
        received_answer = ""

        waited_for_node = False
        while wait_because_node_is_talking == 1:  # the node is talking to onos, wait
            logprint("i wait_because_node_is_talking ..............")
            time.sleep(0.1)
            waited_for_node = True
        if waited_for_node:
            time.sleep(0.2)

        s = None  # so the cleanup below is safe even when socket creation itself fails
        connection_failed = False
        try:
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  # prevent "address already in use"
            s.connect((node_address, node_webserver_port))
            s.settimeout(4)  # receive timeout in seconds, don't change this!
            s.sendall(query)
            received_answer = _read_node_answer(s, query, m)
        except Exception as e:
            message = "error2 in make_query_to_network_node() router_handler class trying to query a node the query was"+query+"the number of try is "+str(m)+"at:"+getErrorTimeString()
            logprint(message, verbose=9, error_tuple=(e, sys.exc_info()))
            connection_failed = True
        finally:
            # Always release the socket, on success and on failure alike.
            if s is not None:
                try:
                    s.close()
                except socket.error as e:
                    logprint("error closing the socket in make_query_to_network_node()", verbose=9, error_tuple=(e, sys.exc_info()))

        if connection_failed:
            if m > 5:
                timeout = 2
            time.sleep(timeout)
            continue

        logprint("\ndone")
        # The connection was successful: check the answer.
        if received_answer.find("ok_#]") != -1:
            logprint("msg sent correctly")
            priorityCmdQueue.put( {"cmd":"setSts","webObjectName":objName,"status_to_set":status_to_set,"write_to_hw":0,"user":user,"priority":priority,"mail_report_list":mail_report_list })
            return ()

        logprint("answer received is wrong:"+received_answer)
        time.sleep(timeout)

    logprint("great error the node did not answer also if tried :"+str(max_tries)+"times, the node will not be setted anymore(probably is not connected)")
    return ()
def handle_new_query_to_radio_node_thread(serialCom):
    """
    | Thread function that runs until queryToRadioNodeQueue is empty.
    | Each packet taken from the queue is the tuple
    | (query_order, query, node_serial_number, number_of_retry_done, query_time, objName,
    | status_to_set, user, priority, mail_report_list, cmd) and is sent with make_query_to_radio_node().
    | If the node confirms the query, the node address is reported on layerExchangeDataQueue and
    | (unless cmd is "set_address") a "setSts" command is put on priorityCmdQueue.
    | If the query fails it is put back on the queue with a later query_order, unless it was
    | retried more than 25 times or is older than 500 seconds; priority 99 queries are retried forever.
    | node_query_radio_threads_executing is 1 while the thread runs and is set back to 0 on exit,
    | also when the thread ends because of an exception.
    |
    | :param serialCom: serial communication object passed to make_query_to_radio_node()
    | :return: always an empty tuple
    """
    # NOTE(review): this name is presumably star-imported from conf; assigning it here rebinds
    # only this module's global - confirm that the code starting the thread reads it from here.
    global node_query_radio_threads_executing

    logprint("executed handle_new_query_to_radio_node_thread() ")
    max_retry_done = 25       # normal queries are dropped after this many failed tries
    max_query_age = 500       # seconds after which a failing normal query is dropped
    rx_pause_threshold = 0.1  # minimum seconds between two "let the radio nodes talk" pauses

    node_query_radio_threads_executing = 1
    try:
        last_rx_pause = time.time()
        while not queryToRadioNodeQueue.empty():
            time.sleep(0.3)  # allow the serial node to pick up the messages from the radio nodes
            if (time.time() - last_rx_pause) > rx_pause_threshold:
                logprint("wait to allow rx from radio nodes")
                time.sleep(0.7)  # allow the serial node to pick up the messages from the radio nodes
                last_rx_pause = time.time()

            currentRadioQueryPacket = queryToRadioNodeQueue.get()
            (query_order, query, node_serial_number, number_of_retry_done, query_time,
             objName, status_to_set, user, priority, mail_report_list, cmd) = currentRadioQueryPacket[:11]

            node_address = nodeDict[node_serial_number].getNodeAddress()
            query = query[0:3] + node_address + query[6:]  # change the address in the query if the node got a new one
            logprint("current query_order:"+str(query_order)+"for query:"+query)

            query_answer = make_query_to_radio_node(serialCom, node_serial_number, query, number_of_retry_done)

            if query_answer == -1:  # invalid answer received
                logprint("error query_answer wrong UUUUUUUUuuuuuuuuuuuuuuUUUUUUUUUUUUuuuuuuuuuuu",verbose=4)
                number_of_retry_done = number_of_retry_done + 1
                if priority == 99:  # priority 99 queries are retried forever
                    query_order = time.time() + 1  # make the query less important to allow other queries to run
                    if number_of_retry_done > 35:
                        logprint("sleep a bit because number_of_retry_done>35")
                        time.sleep(0.2)
                else:
                    query_order = time.time() + queryToRadioNodeQueue.qsize()  # make the query less important
                    if number_of_retry_done > max_retry_done:
                        # The message now reports the real limit (the old text said 15).
                        logprint("i retried the query:"+query+"more than "+str(max_retry_done)+" times , I giveup",verbose=10)
                        continue
                    if (time.time() - query_time) > max_query_age:
                        # The message now reports the real limit (the old text said 100).
                        logprint("i retried the query "+query+"more than "+str(max_query_age)+" seconds , I giveup",verbose=10)
                        continue
                queryToRadioNodeQueue.put((query_order,query,node_serial_number,number_of_retry_done,query_time,objName,status_to_set,user,priority,mail_report_list,cmd))
                continue

            # The query was accepted by the radio/serial node: report that onos was able to talk to it.
            layerExchangeDataQueue.put( {"cmd":"updateNodeAddress","nodeSn":node_serial_number,"nodeAddress":node_address})

            if cmd == "set_address":
                # This command does not change a web object, so no "setSts" is queued.
                try:
                    int_address = int(status_to_set)
                except (TypeError, ValueError) as e:
                    logprint("invalid address in set_address query:"+str(status_to_set), verbose=8, error_tuple=(e, sys.exc_info()))
                    continue
                if int_address not in next_node_free_address_list:
                    next_node_free_address_list.append(int_address)
                continue

            priorityCmdQueue.put( {"cmd":"setSts","webObjectName":objName,"status_to_set":status_to_set,"write_to_hw":0,"user":user,"priority":priority,"mail_report_list":mail_report_list })
    finally:
        # Reset the flag even on an unexpected error, otherwise it would stay at 1 forever.
        node_query_radio_threads_executing = 0
        logprint("handle_new_query_to_radio_node_thread closed")
    return ()
def handle_new_query_to_network_node_thread():
    """
    | Thread function that runs until queryToNetworkNodeQueue is empty.
    | Each query is a dict with the keys "node_serial_number", "query", "objName", "status_to_set",
    | "user", "priority" and "mail_report_list"; it is executed with make_query_to_network_node().
    | Queries for a node whose getNodeActivity() is 0 are dropped and reported on errorQueue.
    | While a query is running, the node serial number is kept in current_node_handler_list
    | (protected by lock1_current_node_handler_list); a query for a node already in that list
    | is put back on the queue, so the same node is not contacted by two threads at once.
    | node_query_threads_executing is incremented while the thread runs and decremented on exit.
    | On an unexpected error the error is logged and the thread ends.
    |
    | :return: always an empty tuple
    """
    global node_query_threads_executing

    logprint("executed handle_new_query_to_network_node_thread() ")
    # Pre-bound so the error handler below can always reference them.
    current_query = None
    node_serial_number = None
    node_claimed = False   # True only while THIS thread holds the node in current_node_handler_list
    thread_counted = False
    try:
        node_query_threads_executing = node_query_threads_executing + 1
        thread_counted = True
        while not queryToNetworkNodeQueue.empty():
            current_query = queryToNetworkNodeQueue.get()
            node_serial_number = current_query["node_serial_number"]

            if nodeDict[node_serial_number].getNodeActivity() == 0:  # the node is inactive
                logprint("the node"+node_serial_number+"is inactive ,so I delete its query",verbose=8)
                errorQueue.put("the node"+node_serial_number+"is inactive ,so I delete its query")
                continue  # skip to the next query

            with lock1_current_node_handler_list:
                if node_serial_number not in current_node_handler_list:
                    current_node_handler_list.append(node_serial_number)
                    node_claimed = True
                    logprint("handle_new_query_to_network_node_thread excuted with "+node_serial_number)

            if not node_claimed:
                # str() is required: current_query is a dict and the old concatenation raised
                # TypeError, which dropped the query and released the other thread's claim.
                logprint("node is already being contacted:q->"+str(current_query))
                queryToNetworkNodeQueue.put(current_query)
                time.sleep(0.1)  # avoid spinning while the other thread finishes with this node
                continue

            logprint("node_query_threads_executing:"+str(node_query_threads_executing))
            query = current_query["query"]
            objName = current_query["objName"]
            status_to_set = current_query["status_to_set"]
            user = current_query["user"]
            priority = current_query["priority"]
            mail_report_list = current_query["mail_report_list"]

            while wait_because_node_is_talking == 1:  # the node is talking to onos, wait
                logprint("the node is talking to onos...wait iiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiit")
                time.sleep(0.1)

            make_query_to_network_node(node_serial_number,query,objName,status_to_set,user,priority,mail_report_list)

            with lock1_current_node_handler_list:
                logprint("lock2b from handle_new_query_to_network_node_thread_remove,query_to_node_dict[node_serial_number]"+node_serial_number)
                current_node_handler_list.remove(node_serial_number)
                node_claimed = False
            time.sleep(0.1)  # delay to not block the node, then the thread gets the next query
        # here there are no more queries to make
    except Exception as e:
        message = "main error in handle_new_query_to_network_node_thread, current query:"+str(current_query)
        logprint(message,verbose=8,error_tuple=(e,sys.exc_info()))
        if node_claimed:
            # Release the node only if this thread claimed it; never remove another thread's claim.
            with lock1_current_node_handler_list:
                try:
                    logprint("lock2c from handle_new_query_to_network_node_thread_remove,query_to_node_dict)[node_serial_number]"+str(node_serial_number))
                    current_node_handler_list.remove(node_serial_number)
                except ValueError:
                    logprint("error in current_node_handler_list.remove after main error")
    finally:
        if thread_counted and node_query_threads_executing > 0:
            node_query_threads_executing = node_query_threads_executing - 1
    return ()