Skip to content

Commit

Permalink
Merge pull request #10 from wirelane/feature/enquire_if_pre_authoriza…
Browse files Browse the repository at this point in the history
…tions_exist

add support for OpenReservationsEnquiry (Enquire if Pre-Authorisations exist (06 23))
  • Loading branch information
bkonetzny authored Jun 29, 2023
2 parents 9e6d57b + 5d5f7c3 commit 6d5f7b1
Show file tree
Hide file tree
Showing 7 changed files with 171 additions and 17 deletions.
25 changes: 24 additions & 1 deletion ecrterm/ecr.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
- ability for incoming and outgoing
"""
import logging
import string
from time import sleep

from ecrterm.common import TERMINAL_STATUS_CODES
Expand All @@ -16,7 +17,8 @@
from ecrterm.packets.base_packets import (
Authorisation, CloseCardSession, Completion, DisplayText, EndOfDay, Initialisation, Packet,
PrintLine, ReadCard, Registration, ReservationBookTotal, ReservationPartialReversal,
ReservationRequest, ResetTerminal, SetTerminalID, StatusEnquiry, StatusInformation, WriteFiles)
ReservationRequest, ResetTerminal, SetTerminalID, StatusEnquiry, StatusInformation, WriteFiles,
OpenReservationsEnquiry)
from ecrterm.packets.types import (ConfigByte, CurrencyCode)
from ecrterm.transmission._transmission import Transmission
from ecrterm.transmission.signals import ACK, DLE, ETX, NAK, STX, TRANSMIT_OK
Expand Down Expand Up @@ -98,6 +100,7 @@ class ECR(object):
MAX_TEXT_LINES = 4
_state_registered = None
_state_connected = None
_status = None

def __init__(self, device='/dev/ttyUSB0', password='123456'):
"""
Expand Down Expand Up @@ -310,6 +313,8 @@ def status(self):
# try to get version
if not self.version:
self.version = self.last.completion.get('sw_version', None)

self._status = self.last.completion.terminal_status
return self.last.completion.status_byte
# no completion means some error.
return False
Expand Down Expand Up @@ -358,6 +363,21 @@ def reverse_reservation(self, receipt_no, amount_cent=50, tlv=[], listener=None)

return self._send_packet(packet, listener)

def get_open_reservations(self, listener=None):
"""
Fetches the receipt-number of the first pre-authorisation not yet reversed by a partial-reversal / book total.
Note: The receipt-number is returned in an Abort package (06 1E).
If no open reservation exist in the PT, ‘FFFF’ is returned as receipt-number.
Instead of a single receipt-number the PT can also transmit a receipt-number list as a TLV-container.
However, for this the ECR must have sent a BMP 06 in the triggering command or in the registration.
@returns: True on success, False on failure
"""
packet = OpenReservationsEnquiry()

return self._send_packet(packet, listener)

def book_reservation(self, receipt_no, amount_cent=50, tlv=[], listener=None):
"""
executes a reservation booking for receipt with used amount in cents.
Expand Down Expand Up @@ -470,6 +490,9 @@ def read_card(self, timeout=1, read_card_args={}):
def close_card(self):
return self.transmit(CloseCardSession())

def get_human_readable_status(self) -> string:
return TERMINAL_STATUS_CODES.get(self._status, 'Unknown Status')


if __name__ == '__main__':
logging.basicConfig(level=9, filename='./terminallog.txt', filemode='aw')
Expand Down
41 changes: 41 additions & 0 deletions ecrterm/packets/base_packets.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,17 @@ class Completion(Packet):

ALLOWED_BITMAPS = ['tlv', 'status_byte', 'tid', 'currency_code']

def get_serial_number(self):
serial_number = None
tlv = self.get('tlv')
if tlv is not None:
device_information = tlv.get_value('xE4', [])
for item in device_information:
if item.tag_ == 0x1F42:
serial_number = item.value_

return serial_number


class Abort(Packet):
"""
Expand All @@ -206,6 +217,22 @@ class Abort(Packet):
result_code = ByteField()
# FIXME error_code

def get_receipt_numbers(self) -> List[str]:
receipt_numbers = []
if self.get('receipt') is not None and self.get('receipt') != 'ffff':
receipt_numbers.append(self.get('receipt'))

tlv = self.get('tlv')
if tlv is not None:
receipt_numbers_tlv_list = tlv.get_value('x23', [])
for receipt_number_tlv in receipt_numbers_tlv_list:
if receipt_number_tlv.tag_ == 8 \
and receipt_number_tlv.value_ != 'ffff' \
and receipt_number_tlv.value_ not in receipt_numbers:
receipt_numbers.append(receipt_number_tlv.value_)

return receipt_numbers


class StatusInformation(Packet):
"""
Expand Down Expand Up @@ -564,6 +591,20 @@ class ReservationPartialReversal(Packet):
'aid', 'tlv']


class OpenReservationsEnquiry(Packet):
"""
06 23 03 87 FF FF
"""
CMD_CLASS = 0x06
CMD_INSTR = 0x23
wait_for_completion = True

def __init__(self, *args, **kwargs):
super().__init__(*args, **dict(kwargs, receipt='FFFF'))

ALLOWED_BITMAPS = ['receipt']


class ReservationBookTotal(Packet):
"""
06 24
Expand Down
19 changes: 13 additions & 6 deletions ecrterm/packets/fields.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import string
from enum import Enum
from typing import Any, Union, List, Optional, Tuple

Expand Down Expand Up @@ -25,7 +26,7 @@ def __init__(self, required=True, ignore_parse_error=False, data_type=None, name
self.ignore_parse_error = ignore_parse_error
self.data_type = data_type or self.DATA_TYPE
self.name = name # Only used in TLV
super().__init__(*args, **kwargs)
super().__init__()

def from_bytes(self, v: Union[bytes, List[int]]) -> Any:
return v
Expand Down Expand Up @@ -81,7 +82,8 @@ def serialize(self, data: Any) -> bytes:

def validate(self, data: Any) -> None:
if len(self.to_bytes(data)) != self.length:
raise ValueError("Field must be exactly {} bytes long".format(self.length))
raise ValueError("Field must be exactly {} bytes long (got {} bytes)"
.format(self.length, len(self.to_bytes(data))))


class LVARField(Field):
Expand Down Expand Up @@ -212,8 +214,9 @@ def from_bytes(self, v: Union[bytes, List[int]]) -> str:
return bytearray(v).hex()

def to_bytes(self, v: str, length: Optional[int] = None) -> bytes:
if not v.isdigit():
raise ValueError("BCD field contents can only be numeric")
# Note: we need to allow pseudo-tetrades - e.g. for open reservation enquiry (06 23 03 87 FF FF)
if any(x not in string.hexdigits for x in v):
raise ValueError("BCD field contents can only be hexdigits")

if len(v) % 2 != 0:
v = '0' + v
Expand All @@ -222,8 +225,9 @@ def to_bytes(self, v: str, length: Optional[int] = None) -> bytes:

def validate(self, data: str) -> None:
super().validate(data)
if not str(data).isdigit():
raise ValueError("BCD field contents can only be numeric")
# Note: we need to allow pseudo-tetrades - e.g. for open reservation enquiry (06 23 03 87 FF FF)
if any(x not in string.hexdigits for x in data):
raise ValueError("BCD field contents can only be hexdigits")


class BCDField(BCDVariableLengthField, FixedLengthField):
Expand Down Expand Up @@ -311,10 +315,13 @@ def __get__(self, instance, objtype=None) -> TLV:
'zvt', {
None: BytesField(),
0x07: StringField(name="text_line"),
0x08: BCDField(name='receipt', length=2),
0x14: FlagByteField(name="character_set", data_type=CharacterSet),
0x15: StringField(name="language_code", character_set=CharacterSet.ASCII_7BIT),
0x23: ContainerType(name='receipt-numbers'),
0x1d: BEIntField(name='file_id', length=1),
0x1e: BEIntField(name='start_position', length=4),
0x40: BytesField(name='emv_config'),
0x1f00: BEIntField(name='file_size', length=4),
0x1f10: FlagByteField(name="cardholder_identification", data_type=CardholderIdentification),
0x1f11: FlagByteField(name='online_tag', data_type=OnlineTag),
Expand Down
31 changes: 22 additions & 9 deletions ecrterm/packets/tlv.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import string
from enum import IntEnum
from typing import Union, Tuple, TypeVar, Type, List, Dict, Tuple, Any, Optional
from typing import Union, TypeVar, Type, List, Dict, Tuple, Any, Optional
from .context import CurrentContext, enter_context
from .types import VendorQuirks

Expand All @@ -22,8 +22,9 @@ def __repr__(self):

NOT_PROVIDED = NotProvided()


_FIRST_PARAM_TYPE = Union[NotProvided, TLVType, Tuple[Union[TLVType, List, Tuple]], List[Union[TLVType, List, Tuple]], Dict[Union[int, str], Any]]
_FIRST_PARAM_TYPE = Union[
NotProvided, TLVType, Tuple[Union[TLVType, List, Tuple]], List[Union[TLVType, List, Tuple]], Dict[
Union[int, str], Any]]


class TLV:
Expand Down Expand Up @@ -74,14 +75,16 @@ def _make_tlv_length(length: int) -> bytes:
length = length >> 8
retval.append(0x80 | len(retval))
return bytes(reversed(retval))

# </editor-fold>

def __new__(cls: Type[TLVType], constructed_value_: _FIRST_PARAM_TYPE = NOT_PROVIDED, *args, **kwargs):
if isinstance(constructed_value_, TLV):
return constructed_value_
return super().__new__(cls)

def __init__(self, constructed_value_: _FIRST_PARAM_TYPE =NOT_PROVIDED, tag_=None, value_=NOT_PROVIDED, implicit_=False, **kwargs):
def __init__(self, constructed_value_: _FIRST_PARAM_TYPE = NOT_PROVIDED, tag_=None, value_=NOT_PROVIDED,
implicit_=False, **kwargs):
if isinstance(constructed_value_, TLV):
return # __new__ handled this

Expand Down Expand Up @@ -141,6 +144,7 @@ def tag_(self, value):

if not self._constructed:
self._type = active_dictionary.get(value, active_dictionary[None])

# </editor-fold>

# <editor-fold desc="constructed/class accessors">
Expand All @@ -151,6 +155,7 @@ def constructed_(self):
@property
def class_(self):
return self._class

# </editor-fold>

# <editor-fold desc="value accessors">
Expand All @@ -165,7 +170,7 @@ def value_(self, value):
if value is not None:
self._implicit = False
if self._constructed:
if isinstance(value, (tuple,list)):
if isinstance(value, (tuple, list)):
self._value = []
for item in value:
if isinstance(item, TLV):
Expand Down Expand Up @@ -193,6 +198,7 @@ def value_(self, value):
self._value = self._type.from_bytes(value)
else:
self._value = value

# </editor-fold>

def __getattr__(self, key):
Expand Down Expand Up @@ -257,7 +263,7 @@ def items_(self):
raise TypeError("Cannot access items_ of primitive TLV")
retval = []
for v in self.value_:
retval.append( (v.name_, v.value_) )
retval.append((v.name_, v.value_))
return retval

def append_(self, key, value, overwrite=False):
Expand Down Expand Up @@ -298,7 +304,8 @@ def _serialize_value(self) -> bytes:
return bytes(retval)

@classmethod
def parse(cls: Type[TLVType], data: bytes, empty_tag: bool = False, dictionary: Optional[str]=None) -> Tuple[TLVType, bytes]:
def parse(cls: Type[TLVType], data: bytes, empty_tag: bool = False, dictionary: Optional[str] = None) \
-> Tuple[TLVType, bytes]:
pos = 0

if empty_tag:
Expand All @@ -308,7 +315,7 @@ def parse(cls: Type[TLVType], data: bytes, empty_tag: bool = False, dictionary:

length, pos = cls._read_tlv_length(data, pos)

value_ = data[pos:(pos+length)]
value_ = data[pos:(pos + length)]
pos = pos + length

if dictionary is not None:
Expand All @@ -332,9 +339,16 @@ def serialize(self) -> bytes:

return bytes(retval) + d

def get_value(self, key, default=None):
for item in self.items_:
if item[0] == key:
return item[1]
return default


class TLVDataType:
name = None

def __init__(self, name=None, *args, **kwargs):
if name is not None:
self.name = name
Expand Down Expand Up @@ -377,7 +391,6 @@ class ContainerType(TLVDataType):
},
)


# FIXME store active dictionary in container
# FIXME Tag value assignment with non-byte data
# FIXME dictionary tag names (set and get)
Expand Down
7 changes: 6 additions & 1 deletion ecrterm/tests/test_apdu.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from ecrterm.packets.apdu import CommandAPDU, ParseError
from ecrterm.packets.fields import ByteField, BytesField, BCDIntField
from ecrterm.packets.base_packets import LogOff, Initialisation, Registration, DisplayText, PrintLine, Authorisation, \
WriteFiles
WriteFiles, OpenReservationsEnquiry
from unittest import TestCase, main


Expand Down Expand Up @@ -163,6 +163,11 @@ def test_create_tlv(self):
c2.tlv.xf2.xc1 = b'\x12\x23'
self.assertEqual(bytearray.fromhex('0601080606f204c1021223'), c2.serialize())

def test_create_open_reservations_enquiry_packet(self):
packet = OpenReservationsEnquiry()

self.assertEqual(bytearray.fromhex('06230387FFFF'), packet.serialize())

def test_override_bitmaps(self):
c = CommandAPDU.parse(bytearray.fromhex('ffaa040602ffaa'))

Expand Down
21 changes: 21 additions & 0 deletions ecrterm/tests/test_parsing_completion.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from unittest import TestCase, main

from ecrterm.common import TERMINAL_STATUS_CODES
from ecrterm.ecr import parse_represented_data
from ecrterm.packets.base_packets import Completion


class TestParsingCompletion(TestCase):

def test_completion(self):
packet = '060f89f0f4f04745522d4150502d76322e302e393b635230322e30312e30312d30302e30392d322d323b4343323600065b1f440452500245e4431f400a6356454e4420626f782b1f41284745522d4150502d76322e302e393b635230322e30312e30312d30302e30392d322d323b434332361f420411e930ec1f430100340d1f0e04202306221f0f03085556'

parsed = parse_represented_data(packet)
self.assertIsInstance(parsed, Completion)
self.assertEqual(parsed.get_serial_number(), '11e930ec')
self.assertEqual(parsed.sw_version, 'GER-APP-v2.0.9;cR02.01.01-00.09-2-2;CC26')
self.assertEqual(TERMINAL_STATUS_CODES.get(parsed.terminal_status), 'PT ready')


if __name__ == '__main__':
main()
Loading

0 comments on commit 6d5f7b1

Please sign in to comment.