diff --git a/ecrterm/ecr.py b/ecrterm/ecr.py index 58dcdfb..9227085 100644 --- a/ecrterm/ecr.py +++ b/ecrterm/ecr.py @@ -7,6 +7,7 @@ - ability for incoming and outgoing """ import logging +import string from time import sleep from ecrterm.common import TERMINAL_STATUS_CODES @@ -16,7 +17,8 @@ from ecrterm.packets.base_packets import ( Authorisation, CloseCardSession, Completion, DisplayText, EndOfDay, Initialisation, Packet, PrintLine, ReadCard, Registration, ReservationBookTotal, ReservationPartialReversal, - ReservationRequest, ResetTerminal, SetTerminalID, StatusEnquiry, StatusInformation, WriteFiles) + ReservationRequest, ResetTerminal, SetTerminalID, StatusEnquiry, StatusInformation, WriteFiles, + OpenReservationsEnquiry) from ecrterm.packets.types import (ConfigByte, CurrencyCode) from ecrterm.transmission._transmission import Transmission from ecrterm.transmission.signals import ACK, DLE, ETX, NAK, STX, TRANSMIT_OK @@ -98,6 +100,7 @@ class ECR(object): MAX_TEXT_LINES = 4 _state_registered = None _state_connected = None + _status = None def __init__(self, device='/dev/ttyUSB0', password='123456'): """ @@ -310,6 +313,8 @@ def status(self): # try to get version if not self.version: self.version = self.last.completion.get('sw_version', None) + + self._status = self.last.completion.terminal_status return self.last.completion.status_byte # no completion means some error. return False @@ -358,6 +363,21 @@ def reverse_reservation(self, receipt_no, amount_cent=50, tlv=[], listener=None) return self._send_packet(packet, listener) + def get_open_reservations(self, listener=None): + """ + Fetches the receipt-number of the first pre-authorisation not yet reversed by a partial-reversal / book total. + Note: The receipt-number is returned in an Abort package (06 1E). + If no open reservation exist in the PT, ‘FFFF’ is returned as receipt-number. + + Instead of a single receipt-number the PT can also transmit a receipt-number list as a TLV-container. + However, for this the ECR must have sent a BMP 06 in the triggering command or in the registration. + + @returns: True on success, False on failure + """ + packet = OpenReservationsEnquiry() + + return self._send_packet(packet, listener) + def book_reservation(self, receipt_no, amount_cent=50, tlv=[], listener=None): """ executes a reservation booking for receipt with used amount in cents. @@ -470,6 +490,9 @@ def read_card(self, timeout=1, read_card_args={}): def close_card(self): return self.transmit(CloseCardSession()) + def get_human_readable_status(self) -> string: + return TERMINAL_STATUS_CODES.get(self._status, 'Unknown Status') + if __name__ == '__main__': logging.basicConfig(level=9, filename='./terminallog.txt', filemode='aw') diff --git a/ecrterm/packets/base_packets.py b/ecrterm/packets/base_packets.py index 8a52b42..4e3fd0c 100644 --- a/ecrterm/packets/base_packets.py +++ b/ecrterm/packets/base_packets.py @@ -193,6 +193,17 @@ class Completion(Packet): ALLOWED_BITMAPS = ['tlv', 'status_byte', 'tid', 'currency_code'] + def get_serial_number(self): + serial_number = None + tlv = self.get('tlv') + if tlv is not None: + device_information = tlv.get_value('xE4', []) + for item in device_information: + if item.tag_ == 0x1F42: + serial_number = item.value_ + + return serial_number + class Abort(Packet): """ @@ -206,6 +217,22 @@ class Abort(Packet): result_code = ByteField() # FIXME error_code + def get_receipt_numbers(self) -> List[str]: + receipt_numbers = [] + if self.get('receipt') is not None and self.get('receipt') != 'ffff': + receipt_numbers.append(self.get('receipt')) + + tlv = self.get('tlv') + if tlv is not None: + receipt_numbers_tlv_list = tlv.get_value('x23', []) + for receipt_number_tlv in receipt_numbers_tlv_list: + if receipt_number_tlv.tag_ == 8 \ + and receipt_number_tlv.value_ != 'ffff' \ + and receipt_number_tlv.value_ not in receipt_numbers: + receipt_numbers.append(receipt_number_tlv.value_) + + return receipt_numbers + class StatusInformation(Packet): """ @@ -564,6 +591,20 @@ class ReservationPartialReversal(Packet): 'aid', 'tlv'] +class OpenReservationsEnquiry(Packet): + """ + 06 23 03 87 FF FF + """ + CMD_CLASS = 0x06 + CMD_INSTR = 0x23 + wait_for_completion = True + + def __init__(self, *args, **kwargs): + super().__init__(*args, **dict(kwargs, receipt='FFFF')) + + ALLOWED_BITMAPS = ['receipt'] + + class ReservationBookTotal(Packet): """ 06 24 diff --git a/ecrterm/packets/fields.py b/ecrterm/packets/fields.py index a524a39..9eced13 100644 --- a/ecrterm/packets/fields.py +++ b/ecrterm/packets/fields.py @@ -1,3 +1,4 @@ +import string from enum import Enum from typing import Any, Union, List, Optional, Tuple @@ -25,7 +26,7 @@ def __init__(self, required=True, ignore_parse_error=False, data_type=None, name self.ignore_parse_error = ignore_parse_error self.data_type = data_type or self.DATA_TYPE self.name = name # Only used in TLV - super().__init__(*args, **kwargs) + super().__init__() def from_bytes(self, v: Union[bytes, List[int]]) -> Any: return v @@ -81,7 +82,8 @@ def serialize(self, data: Any) -> bytes: def validate(self, data: Any) -> None: if len(self.to_bytes(data)) != self.length: - raise ValueError("Field must be exactly {} bytes long".format(self.length)) + raise ValueError("Field must be exactly {} bytes long (got {} bytes)" + .format(self.length, len(self.to_bytes(data)))) class LVARField(Field): @@ -212,8 +214,9 @@ def from_bytes(self, v: Union[bytes, List[int]]) -> str: return bytearray(v).hex() def to_bytes(self, v: str, length: Optional[int] = None) -> bytes: - if not v.isdigit(): - raise ValueError("BCD field contents can only be numeric") + # Note: we need to allow pseudo-tetrades - e.g. for open reservation enquiry (06 23 03 87 FF FF) + if any(x not in string.hexdigits for x in v): + raise ValueError("BCD field contents can only be hexdigits") if len(v) % 2 != 0: v = '0' + v @@ -222,8 +225,9 @@ def to_bytes(self, v: str, length: Optional[int] = None) -> bytes: def validate(self, data: str) -> None: super().validate(data) - if not str(data).isdigit(): - raise ValueError("BCD field contents can only be numeric") + # Note: we need to allow pseudo-tetrades - e.g. for open reservation enquiry (06 23 03 87 FF FF) + if any(x not in string.hexdigits for x in data): + raise ValueError("BCD field contents can only be hexdigits") class BCDField(BCDVariableLengthField, FixedLengthField): @@ -311,10 +315,13 @@ def __get__(self, instance, objtype=None) -> TLV: 'zvt', { None: BytesField(), 0x07: StringField(name="text_line"), + 0x08: BCDField(name='receipt', length=2), 0x14: FlagByteField(name="character_set", data_type=CharacterSet), 0x15: StringField(name="language_code", character_set=CharacterSet.ASCII_7BIT), + 0x23: ContainerType(name='receipt-numbers'), 0x1d: BEIntField(name='file_id', length=1), 0x1e: BEIntField(name='start_position', length=4), + 0x40: BytesField(name='emv_config'), 0x1f00: BEIntField(name='file_size', length=4), 0x1f10: FlagByteField(name="cardholder_identification", data_type=CardholderIdentification), 0x1f11: FlagByteField(name='online_tag', data_type=OnlineTag), diff --git a/ecrterm/packets/tlv.py b/ecrterm/packets/tlv.py index 861d150..8ea93b3 100644 --- a/ecrterm/packets/tlv.py +++ b/ecrterm/packets/tlv.py @@ -1,6 +1,6 @@ import string from enum import IntEnum -from typing import Union, Tuple, TypeVar, Type, List, Dict, Tuple, Any, Optional +from typing import Union, TypeVar, Type, List, Dict, Tuple, Any, Optional from .context import CurrentContext, enter_context from .types import VendorQuirks @@ -22,8 +22,9 @@ def __repr__(self): NOT_PROVIDED = NotProvided() - -_FIRST_PARAM_TYPE = Union[NotProvided, TLVType, Tuple[Union[TLVType, List, Tuple]], List[Union[TLVType, List, Tuple]], Dict[Union[int, str], Any]] +_FIRST_PARAM_TYPE = Union[ + NotProvided, TLVType, Tuple[Union[TLVType, List, Tuple]], List[Union[TLVType, List, Tuple]], Dict[ + Union[int, str], Any]] class TLV: @@ -74,6 +75,7 @@ def _make_tlv_length(length: int) -> bytes: length = length >> 8 retval.append(0x80 | len(retval)) return bytes(reversed(retval)) + # def __new__(cls: Type[TLVType], constructed_value_: _FIRST_PARAM_TYPE = NOT_PROVIDED, *args, **kwargs): @@ -81,7 +83,8 @@ def __new__(cls: Type[TLVType], constructed_value_: _FIRST_PARAM_TYPE = NOT_PROV return constructed_value_ return super().__new__(cls) - def __init__(self, constructed_value_: _FIRST_PARAM_TYPE =NOT_PROVIDED, tag_=None, value_=NOT_PROVIDED, implicit_=False, **kwargs): + def __init__(self, constructed_value_: _FIRST_PARAM_TYPE = NOT_PROVIDED, tag_=None, value_=NOT_PROVIDED, + implicit_=False, **kwargs): if isinstance(constructed_value_, TLV): return # __new__ handled this @@ -141,6 +144,7 @@ def tag_(self, value): if not self._constructed: self._type = active_dictionary.get(value, active_dictionary[None]) + # # @@ -151,6 +155,7 @@ def constructed_(self): @property def class_(self): return self._class + # # @@ -165,7 +170,7 @@ def value_(self, value): if value is not None: self._implicit = False if self._constructed: - if isinstance(value, (tuple,list)): + if isinstance(value, (tuple, list)): self._value = [] for item in value: if isinstance(item, TLV): @@ -193,6 +198,7 @@ def value_(self, value): self._value = self._type.from_bytes(value) else: self._value = value + # def __getattr__(self, key): @@ -257,7 +263,7 @@ def items_(self): raise TypeError("Cannot access items_ of primitive TLV") retval = [] for v in self.value_: - retval.append( (v.name_, v.value_) ) + retval.append((v.name_, v.value_)) return retval def append_(self, key, value, overwrite=False): @@ -298,7 +304,8 @@ def _serialize_value(self) -> bytes: return bytes(retval) @classmethod - def parse(cls: Type[TLVType], data: bytes, empty_tag: bool = False, dictionary: Optional[str]=None) -> Tuple[TLVType, bytes]: + def parse(cls: Type[TLVType], data: bytes, empty_tag: bool = False, dictionary: Optional[str] = None) \ + -> Tuple[TLVType, bytes]: pos = 0 if empty_tag: @@ -308,7 +315,7 @@ def parse(cls: Type[TLVType], data: bytes, empty_tag: bool = False, dictionary: length, pos = cls._read_tlv_length(data, pos) - value_ = data[pos:(pos+length)] + value_ = data[pos:(pos + length)] pos = pos + length if dictionary is not None: @@ -332,9 +339,16 @@ def serialize(self) -> bytes: return bytes(retval) + d + def get_value(self, key, default=None): + for item in self.items_: + if item[0] == key: + return item[1] + return default + class TLVDataType: name = None + def __init__(self, name=None, *args, **kwargs): if name is not None: self.name = name @@ -377,7 +391,6 @@ class ContainerType(TLVDataType): }, ) - # FIXME store active dictionary in container # FIXME Tag value assignment with non-byte data # FIXME dictionary tag names (set and get) diff --git a/ecrterm/tests/test_apdu.py b/ecrterm/tests/test_apdu.py index 09e1d15..b2dc7b5 100644 --- a/ecrterm/tests/test_apdu.py +++ b/ecrterm/tests/test_apdu.py @@ -1,7 +1,7 @@ from ecrterm.packets.apdu import CommandAPDU, ParseError from ecrterm.packets.fields import ByteField, BytesField, BCDIntField from ecrterm.packets.base_packets import LogOff, Initialisation, Registration, DisplayText, PrintLine, Authorisation, \ - WriteFiles + WriteFiles, OpenReservationsEnquiry from unittest import TestCase, main @@ -163,6 +163,11 @@ def test_create_tlv(self): c2.tlv.xf2.xc1 = b'\x12\x23' self.assertEqual(bytearray.fromhex('0601080606f204c1021223'), c2.serialize()) + def test_create_open_reservations_enquiry_packet(self): + packet = OpenReservationsEnquiry() + + self.assertEqual(bytearray.fromhex('06230387FFFF'), packet.serialize()) + def test_override_bitmaps(self): c = CommandAPDU.parse(bytearray.fromhex('ffaa040602ffaa')) diff --git a/ecrterm/tests/test_parsing_completion.py b/ecrterm/tests/test_parsing_completion.py new file mode 100644 index 0000000..266d7da --- /dev/null +++ b/ecrterm/tests/test_parsing_completion.py @@ -0,0 +1,21 @@ +from unittest import TestCase, main + +from ecrterm.common import TERMINAL_STATUS_CODES +from ecrterm.ecr import parse_represented_data +from ecrterm.packets.base_packets import Completion + + +class TestParsingCompletion(TestCase): + + def test_completion(self): + packet = '060f89f0f4f04745522d4150502d76322e302e393b635230322e30312e30312d30302e30392d322d323b4343323600065b1f440452500245e4431f400a6356454e4420626f782b1f41284745522d4150502d76322e302e393b635230322e30312e30312d30302e30392d322d323b434332361f420411e930ec1f430100340d1f0e04202306221f0f03085556' + + parsed = parse_represented_data(packet) + self.assertIsInstance(parsed, Completion) + self.assertEqual(parsed.get_serial_number(), '11e930ec') + self.assertEqual(parsed.sw_version, 'GER-APP-v2.0.9;cR02.01.01-00.09-2-2;CC26') + self.assertEqual(TERMINAL_STATUS_CODES.get(parsed.terminal_status), 'PT ready') + + +if __name__ == '__main__': + main() diff --git a/ecrterm/tests/test_parsing_open_reservation_responses.py b/ecrterm/tests/test_parsing_open_reservation_responses.py new file mode 100644 index 0000000..23f3186 --- /dev/null +++ b/ecrterm/tests/test_parsing_open_reservation_responses.py @@ -0,0 +1,44 @@ +from unittest import TestCase, main + +from ecrterm.ecr import parse_represented_data +from ecrterm.packets.base_packets import Abort + + +class TestParsingOpenReservationResponses(TestCase): + + def test_open_reservations_response_no_receipt_numbers(self): + # receipt number FF FF means no open reservations + packet = '06 1E 04 B8 87 FF FF' + + parsed = parse_represented_data(packet) + self.assertIsInstance(parsed, Abort) + self.assertEqual(parsed.get_receipt_numbers(), []) + + def test_open_reservations_response_receipt(self): + packet = '06 1E 04 B8 87 01 23' + + parsed = parse_represented_data(packet) + self.assertIsInstance(parsed, Abort) + self.assertEqual(parsed.get_receipt_numbers(), ['0123']) + + def test_open_reservations_response_tlv(self): + packet = '06 1E 0E B8 06 11 23 09 08 02 01 23 08 02 45 67' + + parsed = parse_represented_data(packet) + self.assertIsInstance(parsed, Abort) + receipt_numbers = parsed.get_receipt_numbers() + + self.assertEqual(['0123', '4567'], receipt_numbers) + + def test_open_reservations_response_receipt_and_tlv(self): + packet = '06 1E 11 B8 87 01 23 06 11 23 09 08 02 45 67 08 02 78 90' + + parsed = parse_represented_data(packet) + self.assertIsInstance(parsed, Abort) + receipt_numbers = parsed.get_receipt_numbers() + + self.assertEqual(['0123', '4567', '7890'], receipt_numbers) + + +if __name__ == '__main__': + main()