Skip to content

Commit

Permalink
updates for python3
Browse files Browse the repository at this point in the history
  • Loading branch information
gregjhogan committed Oct 15, 2019
1 parent 932745f commit 4f28858
Showing 1 changed file with 27 additions and 25 deletions.
52 changes: 27 additions & 25 deletions python/uds.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
#!/usr/bin/env python
#!/usr/bin/env python3
import time
import struct
from enum import IntEnum
from Queue import Queue, Empty
import threading
from queue import Queue, Empty
from threading import Thread
from binascii import hexlify

class SERVICE_TYPE(IntEnum):
Expand Down Expand Up @@ -267,8 +267,10 @@ def __init__(self, panda, tx_addr, rx_addr=None, bus=0, timeout=10, debug=False)
self.tx_addr = tx_addr
if rx_addr == None:
if tx_addr < 0xFFF8:
# standard 11 bit response addr (add 8)
self.rx_addr = tx_addr+8
elif tx_addr > 0x10000000 and tx_addr < 0xFFFFFFFF:
# standard 19 bit response addr (flip last two bytes)
self.rx_addr = (tx_addr & 0xFFFF0000) + (tx_addr<<8 & 0xFF00) + (tx_addr>>8 & 0xFF)
else:
raise ValueError("invalid tx_addr: {}".format(tx_addr))
Expand All @@ -278,7 +280,7 @@ def __init__(self, panda, tx_addr, rx_addr=None, bus=0, timeout=10, debug=False)
self.timeout = timeout
self.debug = debug

self.can_reader_t = threading.Thread(target=self._isotp_thread, args=(self.debug,))
self.can_reader_t = Thread(target=self._isotp_thread, args=(self.debug,))
self.can_reader_t.daemon = True
self.can_reader_t.start()

Expand Down Expand Up @@ -315,7 +317,7 @@ def _isotp_thread(self, debug):
rx_frame["idx"] = 0
rx_frame["done"] = False
# send flow control message (send all bytes)
msg = "\x30\x00\x00".ljust(8, "\x00")
msg = b"\x30\x00\x00".ljust(8, b"\x00")
if (debug): print("S: {} {}".format(hex(self.tx_addr), hexlify(msg)))
self.panda.can_send(self.tx_addr, msg, self.bus)
elif rx_data[0] >> 4 == 0x2:
Expand Down Expand Up @@ -344,7 +346,7 @@ def _isotp_thread(self, debug):
for i in range(start, end, 7):
tx_frame["idx"] += 1
# consecutive tx frames
msg = (chr(0x20 | (tx_frame["idx"] & 0xF)) + tx_frame["data"][i:i+7]).ljust(8, "\x00")
msg = (chr(0x20 | (tx_frame["idx"] & 0xF)).encode("utf8") + tx_frame["data"][i:i+7]).ljust(8, b"\x00")
if (debug): print("S: {} {}".format(hex(self.tx_addr), hexlify(msg)))
self.panda.can_send(self.tx_addr, msg, self.bus)
if delay_ts > 0:
Expand All @@ -360,13 +362,13 @@ def _isotp_thread(self, debug):
if tx_frame["size"] < 8:
# single frame
tx_frame["done"] = True
msg = (chr(tx_frame["size"]) + tx_frame["data"]).ljust(8, "\x00")
msg = (chr(tx_frame["size"]).encode("utf8") + tx_frame["data"]).ljust(8, b"\x00")
if (debug): print("S: {} {}".format(hex(self.tx_addr), hexlify(msg)))
self.panda.can_send(self.tx_addr, msg, self.bus)
else:
# first rx_frame
tx_frame["done"] = False
msg = (struct.pack("!H", 0x1000 | tx_frame["size"]) + tx_frame["data"][:6]).ljust(8, "\x00")
msg = (struct.pack("!H", 0x1000 | tx_frame["size"]) + tx_frame["data"][:6]).ljust(8, b"\x00")
if (debug): print("S: {} {}".format(hex(self.tx_addr), hexlify(msg)))
self.panda.can_send(self.tx_addr, msg, self.bus)
else:
Expand All @@ -377,9 +379,9 @@ def _isotp_thread(self, debug):

# generic uds request
def _uds_request(self, service_type, subfunction=None, data=None):
req = chr(service_type)
req = chr(service_type).encode("utf8")
if subfunction is not None:
req += chr(subfunction)
req += chr(subfunction).encode("utf8")
if data is not None:
req += data
self.tx_queue.put(req)
Expand Down Expand Up @@ -450,7 +452,7 @@ def security_access(self, access_type, security_key=None):
return security_seed

def communication_control(self, control_type, message_type):
data = chr(message_type)
data = chr(message_type).encode("utf8")
self._uds_request(SERVICE_TYPE.COMMUNICATION_CONTROL, subfunction=control_type, data=data)

def tester_present(self, ):
Expand Down Expand Up @@ -503,7 +505,7 @@ def response_on_event(self, response_event_type, store_event, window_time, event
def link_control(self, link_control_type, baud_rate_type=None):
if link_control_type == LINK_CONTROL_TYPE.VERIFY_BAUDRATE_TRANSITION_WITH_FIXED_BAUDRATE:
# baud_rate_type = BAUD_RATE_TYPE
data = chr(baud_rate_type)
data = chr(baud_rate_type).encode("utf8")
elif link_control_type == LINK_CONTROL_TYPE.VERIFY_BAUDRATE_TRANSITION_WITH_SPECIFIC_BAUDRATE:
# baud_rate_type = custom value (3 bytes big-endian)
data = struct.pack('!I', baud_rate_type)[1:]
Expand All @@ -525,7 +527,7 @@ def read_memory_by_address(self, memory_address, memory_size, memory_address_byt
raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes))
if memory_size_bytes < 1 or memory_size_bytes > 4:
raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes))
data = chr(memory_size_bytes<<4 | memory_address_bytes)
data = chr(memory_size_bytes<<4 | memory_address_bytes).encode("utf8")

if memory_address >= 1<<(memory_address_bytes*8):
raise ValueError('invalid memory_address: {}'.format(memory_address))
Expand All @@ -547,7 +549,7 @@ def read_scaling_data_by_identifier(self, data_identifier_type):

def read_data_by_periodic_identifier(self, transmission_mode_type, periodic_data_identifier):
# TODO: support list of identifiers
data = chr(transmission_mode_type) + chr(periodic_data_identifier)
data = chr(transmission_mode_type).encode("utf8") + chr(periodic_data_identifier).encode("utf8")
self._uds_request(SERVICE_TYPE.READ_DATA_BY_PERIODIC_IDENTIFIER, subfunction=None, data=data)

def dynamically_define_data_identifier(self, dynamic_definition_type, dynamic_data_identifier, source_definitions, memory_address_bytes=4, memory_size_bytes=1):
Expand All @@ -559,9 +561,9 @@ def dynamically_define_data_identifier(self, dynamic_definition_type, dynamic_da
data = struct.pack('!H', dynamic_data_identifier)
if dynamic_definition_type == DYNAMIC_DEFINITION_TYPE.DEFINE_BY_IDENTIFIER:
for s in source_definitions:
data += struct.pack('!H', s["data_identifier"]) + chr(s["position"]) + chr(s["memory_size"])
data += struct.pack('!H', s["data_identifier"]) + chr(s["position"]).encode("utf8") + chr(s["memory_size"]).encode("utf8")
elif dynamic_definition_type == DYNAMIC_DEFINITION_TYPE.DEFINE_BY_MEMORY_ADDRESS:
data += chr(memory_size_bytes<<4 | memory_address_bytes)
data += chr(memory_size_bytes<<4 | memory_address_bytes).encode("utf8")
for s in source_definitions:
if s["memory_address"] >= 1<<(memory_address_bytes*8):
raise ValueError('invalid memory_address: {}'.format(s["memory_address"]))
Expand All @@ -587,7 +589,7 @@ def write_memory_by_address(self, memory_address, memory_size, data_record, memo
raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes))
if memory_size_bytes < 1 or memory_size_bytes > 4:
raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes))
data = chr(memory_size_bytes<<4 | memory_address_bytes)
data = chr(memory_size_bytes<<4 | memory_address_bytes).encode("utf8")

if memory_address >= 1<<(memory_address_bytes*8):
raise ValueError('invalid memory_address: {}'.format(memory_address))
Expand All @@ -612,7 +614,7 @@ def read_dtc_information(self, dtc_report_type, dtc_status_mask_type=DTC_STATUS_
dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_MIRROR_MEMORY_DTC_BY_STATUS_MASK or \
dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_EMISSIONS_RELATED_OBD_DTC_BY_STATUS_MASK or \
dtc_report_type == DTC_REPORT_TYPE.EMISSIONS_RELATED_OBD_DTC_BY_STATUS_MASK:
data += chr(dtc_status_mask_type)
data += chr(dtc_status_mask_type).encode("utf8")
# dtc_mask_record
if dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_IDENTIFICATION or \
dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_RECORD_BY_DTC_NUMBER or \
Expand All @@ -628,11 +630,11 @@ def read_dtc_information(self, dtc_report_type, dtc_status_mask_type=DTC_STATUS_
# dtc_extended_record_num
if dtc_report_type == DTC_REPORT_TYPE.DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER or \
dtc_report_type == DTC_REPORT_TYPE.MIRROR_MEMORY_DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER:
data += chr(dtc_extended_record_num)
data += chr(dtc_extended_record_num).encode("utf8")
# dtc_severity_mask_type
if dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_DTC_BY_SEVERITY_MASK_RECORD or \
dtc_report_type == DTC_REPORT_TYPE.DTC_BY_SEVERITY_MASK_RECORD:
data += chr(dtc_severity_mask_type) + chr(dtc_status_mask_type)
data += chr(dtc_severity_mask_type).encode("utf8") + chr(dtc_status_mask_type).encode("utf8")

resp = self._uds_request(SERVICE_TYPE.READ_DTC_INFORMATION, subfunction=dtc_report_type, data=data)

Expand All @@ -656,13 +658,13 @@ def routine_control(self, routine_control_type, routine_identifier_type, routine
return resp[2:]

def request_download(self, memory_address, memory_size, memory_address_bytes=4, memory_size_bytes=4, data_format=0x00):
data = chr(data_format)
data = chr(data_format).encode("utf8")

if memory_address_bytes < 1 or memory_address_bytes > 4:
raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes))
if memory_size_bytes < 1 or memory_size_bytes > 4:
raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes))
data += chr(memory_size_bytes<<4 | memory_address_bytes)
data += chr(memory_size_bytes<<4 | memory_address_bytes).encode("utf8")

if memory_address >= 1<<(memory_address_bytes*8):
raise ValueError('invalid memory_address: {}'.format(memory_address))
Expand All @@ -681,13 +683,13 @@ def request_download(self, memory_address, memory_size, memory_address_bytes=4,
return max_num_bytes # max number of bytes per transfer data request

def request_upload(self, memory_address, memory_size, memory_address_bytes=4, memory_size_bytes=4, data_format=0x00):
data = chr(data_format)
data = chr(data_format).encode("utf8")

if memory_address_bytes < 1 or memory_address_bytes > 4:
raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes))
if memory_size_bytes < 1 or memory_size_bytes > 4:
raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes))
data += chr(memory_size_bytes<<4 | memory_address_bytes)
data += chr(memory_size_bytes<<4 | memory_address_bytes).encode("utf8")

if memory_address >= 1<<(memory_address_bytes*8):
raise ValueError('invalid memory_address: {}'.format(memory_address))
Expand All @@ -706,7 +708,7 @@ def request_upload(self, memory_address, memory_size, memory_address_bytes=4, me
return max_num_bytes # max number of bytes per transfer data request

def transfer_data(self, block_sequence_count, data=''):
data = chr(block_sequence_count)+data
data = chr(block_sequence_count).encode("utf8") + data
resp = self._uds_request(SERVICE_TYPE.TRANSFER_DATA, subfunction=None, data=data)
resp_id = resp[0] if len(resp) > 0 else None
if resp_id != block_sequence_count:
Expand Down

0 comments on commit 4f28858

Please sign in to comment.