# -*- coding: UTF-8 -*-
"""
| This module handles all the queries to the nodes.
| It receives the data from the router_handler and sends the queries to the nodes.
| It also retries sending a message if the node answers incorrectly or does not answer at all.
| If the node confirms that the command was received, this module tells onos to set the web object status
| to reflect the new node status after the command was received.
|
"""
from conf import *
#import pyserial_port
# Gateway packets that are dropped from the receive list whenever they are seen
# while looking for a confirmation.
_RADIO_DISCARDABLE_PACKETS=("[S_ertx1_#]","[S_nocmd0_#]")


def _remove_received_packet(packets,packet):
    """
    | Remove packet from packets.
    | A packet that is no longer in the list is only logged
    | (presumably the serial reader thread can alter the list meanwhile - TODO confirm).
    """
    try:
        packets.remove(packet)
    except ValueError:
        logprint("error serialCom.uart.readed_packets_list.remove(a) ")


def _take_confirm_from_received_packets(packets,expected_confirm,newest_first,log_prefix):
    """
    | Look in packets for a packet containing expected_confirm.
    | The matching packet is removed from packets and returned, None is returned if there is no match.
    | The discardable gateway packets met before the match are removed too.
    """
    # Walk a snapshot: removing items from the list being iterated would skip packets.
    snapshot=list(packets)
    if newest_first:
        snapshot.reverse()
    for packet in snapshot:
        logprint(log_prefix+str(packet))
        if packet.find(expected_confirm)!=-1:  #found the answer
            _remove_received_packet(packets,packet)
            return packet
        if packet in _RADIO_DISCARDABLE_PACKETS:
            _remove_received_packet(packets,packet)
    return None


def make_query_to_radio_node(serialCom,node_serial_number,query,number_of_retry_already_done,max_retry=1):
    """
    | This function makes a query to a radio/serial node and looks for the confirmation of the node.
    | The node address inside the query is refreshed from nodeDict before sending.
    | If this is a retry (number_of_retry_already_done != 0) the packets already received are checked first,
    | because the node may have answered the previous query in the meantime.
    | Otherwise the query is written to the serial port and the confirmation is searched
    | in the value returned by the write and then in the list of received packets.
    |
    | :param serialCom: object exposing uart.write() and uart.readed_packets_list
    | :param node_serial_number: key of the node in nodeDict
    | :param query: the query to send, for example [S_001dw06001_#]
    | :param number_of_retry_already_done: how many times this query was already sent
    | :param max_retry: how many times the query is written before giving up (default 1)
    | :return: the packet containing the confirmation, or -1 if no confirmation was found
    """
    logprint("make_query_to_radio_node executed with number_of_retry_already_done:"+str(number_of_retry_already_done))
    data=""
    for m in range(0,max_retry):  #retry n times to get the answer from node
        # [S_001dw06001_#]
        node_address=nodeDict[node_serial_number].getNodeAddress()
        query=query[0:3]+node_address+query[6:]  #change the address query if the node get a new one
        data=""
        end_of_query=query.find("_#]")
        # [S_ok003sr070811_#]
        expected_confirm="[S_ok"+query[3:end_of_query+3]

        if number_of_retry_already_done!=0:  #look if the node has already answer the previous query..
            try:
                # newest packet first, the whole list is checked (index 0 included)
                answer=_take_confirm_from_received_packets(
                    serialCom.uart.readed_packets_list,
                    expected_confirm,
                    True,
                    "check of all received answers000000000 current one was:")
            except Exception as e:
                message="make_query_to_radio_node"
                logprint(message,verbose=8,error_tuple=(e,sys.exc_info()))
                return(-1)
            if answer is not None:
                logprint("I have found the answer I was looking for")
                return(answer)
            time.sleep(0.4)

        try:
            data=serialCom.uart.write(query)
        except Exception as e:
            # data stays "" so the checks below simply find no confirmation in it
            message="error writing to serial port, data to send:"+query+", at:"+getErrorTimeString()
            logprint(message,verbose=8,error_tuple=(e,sys.exc_info()))

        if data.find(expected_confirm)!=-1:
            return(data)

        answer=_take_confirm_from_received_packets(
            serialCom.uart.readed_packets_list,
            expected_confirm,
            False,
            "check of all received answers current was:")
        if answer is not None:
            logprint("I have found the answer I was looiking for")
            return(answer)

    logprint("Great serial error,answer received from serial port was wrong:"+data+"end_data, trying query the serial,node the query was"+query+"the number of try was "+str(max_retry)+" at:" +getErrorTimeString() )
    return(-1)
def make_query_to_network_node(node_serial_number,query,objName,status_to_set,user,priority,mail_report_list):
    """
    | This function makes a query to a powerline/ethernet node and waits for the answer from the node.
    | A TCP connection is opened to the node address (read again from nodeDict at every try),
    | the query is sent and the answer is read until it contains "_#]" or "ok".
    | If the answer contains "ok_#]" a "setSts" command is added to the priorityCmdQueue
    | to change the web_object status and the function returns.
    | If the connection fails or the answer is wrong the query is repeated, up to 8 tries in total.
    |
    | :param node_serial_number: key of the node in nodeDict
    | :param query: the data sent to the node
    | :param objName: web object name placed in the setSts command
    | :param status_to_set: status placed in the setSts command
    | :param user: user placed in the setSts command
    | :param priority: priority placed in the setSts command
    | :param mail_report_list: mail report list placed in the setSts command
    | :return: always an empty tuple
    """
    logprint("make_query_to_network_node() thread executed")
    logprint( "i try this query:"+query)
    max_attempts=8
    timeout=0.1
    for m in range(0,max_attempts):  #retry n times to get the answer from node
        node_address=nodeDict[node_serial_number].getNodeAddress()  #update the node address ..maybe has changed..
        logprint("connection try number:"+str(m)+"to ip number"+str(node_address) )
        received_answer=""
        flag=0
        while (wait_because_node_is_talking==1):  #the node is talking to onos...wait
            logprint("i wait_because_node_is_talking ..............")
            time.sleep(0.1)
            flag=1
        if flag==1:
            time.sleep(0.2)

        s=None  # stays None if socket.socket() itself fails, so the cleanup never touches an unbound/stale socket
        connection_done=False
        try:
            s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  # to reuse the same address...prevent address already in use error
            s.connect((node_address,node_webserver_port))
            s.settimeout(4)  #don't change this!
            s.sendall(query)
            # NOTE(review): presumably `exit` is a shutdown flag coming from conf - TODO confirm
            while (exit==0):
                logprint("s.recv(1024aaaa)")
                resp=""
                try:
                    resp = s.recv(1024)
                except Exception as e:
                    message="error0 in make_query_to_network_node() router_handler class trying to query a node the query was"+query+"the number of try is "+str(m)
                    logprint(message,verbose=9,error_tuple=(e,sys.exc_info()))
                    break
                logprint("after s.recv(1024)")
                if not resp:
                    # recv() returns an empty value when the node closed the connection:
                    # nothing more will arrive, so stop instead of looping forever
                    break
                received_answer=received_answer+resp
                if received_answer.find("_#]")!=-1:
                    break
                logprint(resp)
                if received_answer.find("ok")!=-1:
                    break
            logprint("\ndone")
            connection_done=True
        except Exception as e:
            message="error2 in make_query_to_network_node() router_handler class trying to query a node the query was"+query+"the number of try is "+str(m)+"at:"+getErrorTimeString()
            logprint(message,verbose=9,error_tuple=(e,sys.exc_info()))
        finally:
            # Close the connection when completed, whatever happened above
            if s is not None:
                try:
                    s.close()
                except Exception as e:
                    logprint("error closing the socket in make_query_to_network_node()",verbose=9,error_tuple=(e,sys.exc_info()))

        if not connection_done:
            if m>5:  #the last tries wait longer before retrying
                timeout=2
            time.sleep(timeout)
            continue

        # the connection was succesfull
        if received_answer.find("ok_#]")!=-1:
            logprint("msg sent correctly")
            priorityCmdQueue.put( {"cmd":"setSts","webObjectName":objName,"status_to_set":status_to_set,"write_to_hw":0,"user":user,"priority":priority,"mail_report_list":mail_report_list })
            return()
        logprint("answer received is wrong:"+received_answer)
        time.sleep(timeout)

    logprint("great error the node did not answer also if tried :"+str(max_attempts)+"times, the node will not be setted anymore(probably is not connected)")
    return()
def handle_new_query_to_radio_node_thread(serialCom):
    """
    | This is a thread function that runs until the queryToRadioNodeQueue is empty.
    | It gets each query tuple from queryToRadioNodeQueue and calls make_query_to_radio_node().
    | While it runs node_query_radio_threads_executing is 1, it is set back to 0 when the function ends
    | (also when the function ends because of an exception).
    |
    | If the node does not confirm the query (answer -1) the query is put again in the queue with a
    | query_order based on the current time:
    |  - queries with priority 99 are retried without limit
    |  - the other queries are dropped after more than 25 retries or more than 500 seconds
    | If the node confirms the query an "updateNodeAddress" message is put in layerExchangeDataQueue, then:
    |  - for the "set_address" cmd the new address is added to next_node_free_address_list
    |  - for every other cmd a "setSts" command is put in priorityCmdQueue
    |
    | :param serialCom: the serial communication object passed to make_query_to_radio_node()
    | :return: always an empty tuple
    """
    logprint("executed handle_new_query_to_radio_node_thread() ")
    global node_query_radio_threads_executing
    global next_node_free_address_list
    global nodeDict
    max_retry_before_giveup=25
    max_seconds_before_giveup=500
    threshold_of_time_query=0.1
    node_query_radio_threads_executing=1
    try:
        time_waiting_for_incoming_msg=time.time()
        while not queryToRadioNodeQueue.empty():
            time.sleep(0.3)  #need this to allow the serial node to pick up the messages from the radio nodes..
            if (time.time()-time_waiting_for_incoming_msg)>threshold_of_time_query:
                logprint("wait to allow rx from radio nodes")
                time.sleep(0.7)  #need this to allow the serial node to pick up the messages from the radio nodes..
                time_waiting_for_incoming_msg=time.time()

            currentRadioQueryPacket=queryToRadioNodeQueue.get()
            (query_order,query,node_serial_number,number_of_retry_done,query_time,
             objName,status_to_set,user,priority,mail_report_list,cmd)=currentRadioQueryPacket[0:11]

            node_address=nodeDict[node_serial_number].getNodeAddress()
            query=query[0:3]+node_address+query[6:]  #change the address query if the node get a new one
            logprint("current query_order:"+str(query_order)+"for query:"+query)
            query_answer=make_query_to_radio_node(serialCom,node_serial_number,query,number_of_retry_done)

            if query_answer==-1:  #invalid answer received
                logprint("error query_answer wrong UUUUUUUUuuuuuuuuuuuuuuUUUUUUUUUUUUuuuuuuuuuuu",verbose=4)
                number_of_retry_done=number_of_retry_done+1
                if priority==99:  #if the priority is 99 then the query will be always retried
                    query_order=time.time()+1  #make the query less important..to allow other queries to run
                    if number_of_retry_done>35:  #if greater that n wait a bit
                        logprint("sleep a bit because number_of_retry_done>35")
                        time.sleep(0.2)
                else:
                    query_order=time.time()+queryToRadioNodeQueue.qsize()  #make the query less important..to allow other queries to run
                    if number_of_retry_done>max_retry_before_giveup:  #don't repeat the query anymore
                        logprint("i retried the query:"+query+"more than "+str(max_retry_before_giveup)+" times , I giveup",verbose=10)
                        continue
                    if (time.time()-query_time)>max_seconds_before_giveup:
                        #too much time has passed since the query was made the first time..don't repeat the query.
                        logprint("i retried the query "+query+"more than "+str(max_seconds_before_giveup)+" seconds , I giveup",verbose=10)
                        continue
                queryToRadioNodeQueue.put((query_order,query,node_serial_number,number_of_retry_done,query_time,objName,status_to_set,user,priority,mail_report_list,cmd))
            else:  #the query was accepted from the radio/serial node
                layerExchangeDataQueue.put( {"cmd":"updateNodeAddress","nodeSn":node_serial_number,"nodeAddress":node_address})
                if cmd=="set_address":
                    int_address=int(status_to_set)
                    if int_address not in next_node_free_address_list:
                        next_node_free_address_list.append(int_address)
                    continue  #this cmd don't need to change webobject..so i continue
                priorityCmdQueue.put( {"cmd":"setSts","webObjectName":objName,"status_to_set":status_to_set,"write_to_hw":0,"user":user,"priority":priority,"mail_report_list":mail_report_list })
    finally:
        # always clear the flag, otherwise an unexpected error would leave it at 1 forever
        node_query_radio_threads_executing=0
    logprint("handle_new_query_to_radio_node_thread closed")
    return()
def handle_new_query_to_network_node_thread():
    """
    | This is a thread function that runs until the queryToNetworkNodeQueue is empty.
    | It gets each query from queryToNetworkNodeQueue and calls make_query_to_network_node().
    | While a query is running current_node_handler_list contains the serial number of the node being queried,
    | a query for a node that is already in that list is put back in the queue.
    | The queries for an inactive node (getNodeActivity()==0) are dropped and reported in errorQueue.
    | node_query_threads_executing is incremented at the start and decremented (if greater than 0) at the end.
    | An unexpected error is logged and ends the loop.
    |
    | :return: always an empty tuple
    """
    logprint("executed handle_new_query_to_network_node_thread() ")
    global node_query_threads_executing
    current_query=None  # defined from the start so the error message below can always be built
    try:
        node_query_threads_executing=node_query_threads_executing+1
        while not queryToNetworkNodeQueue.empty():
            current_query=queryToNetworkNodeQueue.get()
            node_serial_number=current_query["node_serial_number"]
            if (nodeDict[node_serial_number].getNodeActivity()==0):  # the node is inactive
                logprint("the node"+node_serial_number+"is inactive ,so I delete its query",verbose=8)
                errorQueue.put("the node"+node_serial_number+"is inactive ,so I delete its query")
                continue  #skip to the next query ..

            with lock1_current_node_handler_list:
                node_already_contacted=node_serial_number in current_node_handler_list
                if not node_already_contacted:
                    current_node_handler_list.append(node_serial_number)
            if node_already_contacted:
                # the marker in the list belongs to another thread: requeue and leave the list untouched
                logprint("node is already being contacted:q->"+str(current_query))
                queryToNetworkNodeQueue.put(current_query)
                continue

            # From here this thread owns the marker in current_node_handler_list,
            # the finally below releases it whatever happens during the query.
            try:
                logprint("handle_new_query_to_network_node_thread excuted with "+node_serial_number)
                logprint("node_query_threads_executing:"+str(node_query_threads_executing))
                query=current_query["query"]
                objName=current_query["objName"]
                status_to_set=current_query["status_to_set"]
                user=current_query["user"]
                priority=current_query["priority"]
                mail_report_list=current_query["mail_report_list"]
                while (wait_because_node_is_talking==1):  #the node is talking to onos...wait
                    logprint("the node is talking to onos...wait iiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiit")
                    time.sleep(0.1)
                make_query_to_network_node(node_serial_number,query,objName,status_to_set,user,priority,mail_report_list)
            finally:
                with lock1_current_node_handler_list:
                    logprint("lock2b from handle_new_query_to_network_node_thread_remove,query_to_node_dict[node_serial_number]"+node_serial_number)
                    if node_serial_number in current_node_handler_list:
                        current_node_handler_list.remove(node_serial_number)
            time.sleep(0.1)  #delay to not block the node , now the thread will get the next query to execute
        #here there is no more queries to make
    except Exception as e:
        message="main error in handle_new_query_to_network_node_thread, current query:"+str(current_query)
        logprint(message,verbose=8,error_tuple=(e,sys.exc_info()))
    finally:
        if node_query_threads_executing>0:
            node_query_threads_executing=node_query_threads_executing-1
    return()