-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathcheck_fr50_capacity.py
executable file
·148 lines (108 loc) · 5.61 KB
/
check_fr50_capacity.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
#!/usr/bin/env python3.10
'''
Check ability to keep up with full-speed operation
Usage:
python3.10 check_fr50_capacity.py
The -h option will display a full list of options.
'''
import sys
import logging
from openlcb.nodeid import NodeID
from openlcb.message import Message
from openlcb.mti import MTI
from queue import Empty
import olcbchecker.setup
def sendCheckMessages(beforeMessage, checkMessage, afterMesage):
BEFORE_COUNT = 300;
AFTER_COUNT = BEFORE_COUNT;
for i in range(0, BEFORE_COUNT) :
olcbchecker.sendMessage(beforeMessage)
olcbchecker.sendMessage(checkMessage)
for i in range(0, AFTER_COUNT) :
olcbchecker.sendMessage(afterMesage)
return
def check():
# set up the infrastructure
logger = logging.getLogger("FRAME")
# pull any early received messages
olcbchecker.purgeMessages()
ownnodeid = olcbchecker.setup.configure.ownnodeid
destination = olcbchecker.getTargetID()
timeout = 3.0
localAlias = olcbchecker.setup.canLink.localAlias # just to be shorter
###############################
# checking sequence starts here
###############################
# send 1st check
beforeMessage = Message(MTI.Producer_Consumer_Event_Report , NodeID(olcbchecker.ownnodeid()), destination, [0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01])
checkMessage = Message(MTI.Verify_NodeID_Number_Addressed , NodeID(olcbchecker.ownnodeid()), destination)
afterMesage = Message(MTI.Producer_Consumer_Event_Report , NodeID(olcbchecker.ownnodeid()), destination, [0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01])
sendCheckMessages(beforeMessage, checkMessage, afterMesage)
# check for reply
while True :
try :
received = olcbchecker.getMessage() # timeout if no entries
# is this a reply from that node?
if not received.mti == MTI.Verified_NodeID and not received.mti == MTI.Verified_NodeID_Simple : continue # wait for next
# this is a Verified Node ID message, success
if destination != received.source : # check source in message header
logger.warning ("Failure - Unexpected source of reply message in check 1: {} {}".format(received, received.source))
return(3)
if len(received.data) < 6:
logger.warning ("Failure - Unexpected length of reply message in check 1: {} {}".format(received, received.data))
return(3)
break
except Empty:
logger.warning ("Failure - Did not receive Verified Node ID reply for check 1")
return(3)
olcbchecker.purgeMessages()
# send 2nd check
beforeMessage = Message(MTI.Verify_NodeID_Number_Addressed , NodeID(olcbchecker.ownnodeid()), NodeID(olcbchecker.ownnodeid()), [0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01])
checkMessage = Message(MTI.Verify_NodeID_Number_Global , NodeID(olcbchecker.ownnodeid()), None)
afterMesage = Message(MTI.Verify_NodeID_Number_Addressed , NodeID(olcbchecker.ownnodeid()), NodeID(olcbchecker.ownnodeid()), [0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01])
sendCheckMessages(beforeMessage, checkMessage, afterMesage)
# check for reply
while True :
try :
received = olcbchecker.getMessage() # timeout if no entries
# is this a reply from that node?
if not received.mti == MTI.Verified_NodeID and not received.mti == MTI.Verified_NodeID_Simple : continue # wait for next
# this is a Verified Node ID message, from right node?
if destination != received.source : # check source in message header
continue # there might be other nodes replying to the global request
if len(received.data) < 6:
logger.warning ("Failure - Unexpected length of reply message in check 2: {} {}".format(received, received.data))
return(3)
break
except Empty:
logger.warning ("Failure - Did not receive Verified Node ID reply for check 2")
return(3)
olcbchecker.purgeMessages()
# send 3rd check
beforeMessage = Message(MTI.Identify_Consumer , NodeID(olcbchecker.ownnodeid()), destination, [0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01])
checkMessage = Message(MTI.Verify_NodeID_Number_Global , NodeID(olcbchecker.ownnodeid()), None)
afterMesage = Message(MTI.Identify_Consumer , NodeID(olcbchecker.ownnodeid()), destination, [0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01])
sendCheckMessages(beforeMessage, checkMessage, afterMesage)
# check for reply
while True :
try :
received = olcbchecker.getMessage() # timeout if no entries
# is this a reply from that node?
if not received.mti == MTI.Verified_NodeID and not received.mti == MTI.Verified_NodeID_Simple : continue # wait for next
# this is a Verified Node ID message, from right node?
if destination != received.source : # check source in message header
continue # there might be other nodes replying to the global request
if len(received.data) < 6:
logger.warning ("Failure - Unexpected length of reply message in check 3: {} {}".format(received, received.data))
return(3)
break
except Empty:
logger.warning ("Failure - Did not receive Verified Node ID reply for check 3")
return(3)
logger.info("Passed")
return 0
if __name__ == "__main__":
result = check()
import olcbchecker
olcbchecker.setup.interface.close()
sys.exit(result)