-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathcheck_mc30_read.py
executable file
·178 lines (139 loc) · 6.13 KB
/
check_mc30_read.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
#!/usr/bin/env python3.10
'''
This uses a CAN link layer to check memmory configuration read command
Usage:
python3.10 check_mc30_read.py
The -h option will display a full list of options.
'''
import sys
import logging
from openlcb.nodeid import NodeID
from openlcb.message import Message
from openlcb.mti import MTI
from openlcb.pip import PIP
from queue import Empty
import olcbchecker.setup
def getReplyDatagram(destination) :
'''
Invoked after a datagram has been sent, this waits
for first the datagram reply message, then a
datagram message that contains a reply. It
replies with a datagram OK, then returns the reply datagram message.
Raises Exception if something went wrong.
'''
# first, wait for the reply
while True :
try :
received = olcbchecker.getMessage() # timeout if no entries
# is this a datagram reply, OK or not?
if not (received.mti == MTI.Datagram_Received_OK or received.mti == MTI.Datagram_Rejected) :
continue # wait for next
if destination != received.source : # check source in message header
continue
if NodeID(olcbchecker.ownnodeid()) != received.destination : # check destination in message header
continue
if received.mti == MTI.Datagram_Received_OK :
# OK, proceed to check reply Pending bit
if not received.data :
raise Exception("Failure - no flags in Datagram Received OK")
if received.data[0] & 0x80 != 0x80 :
raise Exception("Failure - Reply Pending not set in Datagram Received OK")
# at this point, all OK
break
else : # must be datagram rejected
# can't proceed
raise Exception("Failure - Original Datagram rejected")
except Empty:
raise Exception("Failure - no reply to read request")
# now wait for the reply datagram
while True :
try :
received = olcbchecker.getMessage() # timeout if no entries
# is this a datagram reply, OK or not?
if not (received.mti == MTI.Datagram) :
continue # wait for next
if destination != received.source : # check source in message header
continue
if NodeID(olcbchecker.ownnodeid()) != received.destination : # check destination in message header
continue
# here we've received the reply datagram
# send the reply
message = Message(MTI.Datagram_Received_OK, NodeID(olcbchecker.ownnodeid()), destination, [0])
olcbchecker.sendMessage(message)
return received
except Empty:
raise Exception("Failure - no reply datagram received")
def sendAndCheckResponse(destination, request, length) :
# returns non-zero if fail
# send a read datagram
message = Message(MTI.Datagram, NodeID(olcbchecker.ownnodeid()), destination, request)
olcbchecker.sendMessage(message)
try :
reply = getReplyDatagram(destination)
except Exception as e:
logger = logging.getLogger("MEMORY")
logger.warning(str(e))
return (3)
# check length of returned datagram
if len(reply.data) != len(request) -1 + length :
logger = logging.getLogger("MEMORY")
logger.warning("Failure - length was {}, expected {}".format(len(reply.data), len(request) -1 + length) )
return 3
expectedReply = request
expectedReply[1] = expectedReply[1] | 0x10
checkLen = len(request)-1
if expectedReply[:checkLen] != reply.data[:checkLen] :
logger = logging.getLogger("MEMORY")
logger.warning("Failure - header was {}, expected {}".format(reply.data[:checkLen]), expectedReply[:checkLen])
return 3
return 0
def check():
# set up the infrastructure
logger = logging.getLogger("MEMORY")
# pull any early received messages
olcbchecker.purgeMessages()
# get configured DUT node ID - this uses Verify Global in some cases, but not all
destination = olcbchecker.getTargetID()
###############################
# checking sequence starts here
###############################
# check if PIP says this is present
if olcbchecker.isCheckPip() :
pipSet = olcbchecker.gatherPIP(destination)
if pipSet is None:
logger.warning ("Failed in setup, no PIP information received")
return (2)
if not PIP.MEMORY_CONFIGURATION_PROTOCOL in pipSet :
logger.info("Passed - due to Memory Configuration protocol not in PIP")
return(0)
reply = sendAndCheckResponse(destination, [0x20, 0x41, 0,0,0,0, 2], 2)
if reply != 0 :
logger.warning(" in read of 2 bytes from short-form read")
return reply
reply = sendAndCheckResponse(destination, [0x20, 0x41, 0,0,0,0, 10], 10)
if reply != 0 :
logger.warning(" in read of 10 bytes from short-form read")
return reply
reply = sendAndCheckResponse(destination, [0x20, 0x41, 0,0,0,0, 64], 64)
if reply != 0 :
logger.warning(" in read of 64 bytes from short-form read")
return reply
reply = sendAndCheckResponse(destination, [0x20, 0x40, 0,0,0,0, 0xFD, 2], 2)
if reply != 0 :
logger.warning(" in read of 2 bytes from long-form read")
return reply
reply = sendAndCheckResponse(destination, [0x20, 0x40, 0,0,0,0, 0xFD, 10], 10)
if reply != 0 :
logger.warning(" in read of 10 bytes from long-form read")
return reply
reply = sendAndCheckResponse(destination, [0x20, 0x40, 0,0,0,0, 0xFD, 64], 64)
if reply != 0 :
logger.warning(" in read of 64 bytes from long-form read")
return reply
logger.info("Passed")
return 0
if __name__ == "__main__":
result = check()
import olcbchecker
olcbchecker.setup.interface.close()
sys.exit(result)