diff --git a/sdtfile/sdtfile.py b/sdtfile/sdtfile.py index 77d00b8..9855c66 100644 --- a/sdtfile/sdtfile.py +++ b/sdtfile/sdtfile.py @@ -41,7 +41,7 @@ :Author: `Christoph Gohlke `_ :License: BSD 3-Clause -:Version: 2024.11.24 +:Version: 2024.12.6 :DOI: `10.5281/zenodo.10125608 `_ Quickstart @@ -63,12 +63,20 @@ This revision was tested with the following requirements and dependencies (other versions may work): -- `CPython `_ 3.10.11, 3.11.9, 3.12.7, 3.13.0 +- `CPython `_ 3.10.11, 3.11.9, 3.12.8, 3.13.1 - `Numpy `_ 2.1.3 Revisions --------- +2024.12.6 + +- Fix read MeasureInfo fields as scalars (breaking). +- Update some structure field names with BH reference (breaking). +- Parse some SetupBlock binary structures (#7). +- Include more information in str(SdtFile). +- Add subtype to FileRevision. + 2024.11.24 - Update MEASURE_INFO_EXT struct to SPCM v.9.9 (breaking). @@ -110,7 +118,7 @@ 588 >>> sdt.info.id[1:-1] 'SPC Setup & Data File' ->>> int(sdt.measure_info[0].scan_x[0]) +>>> int(sdt.measure_info[0].scan_x) 128 >>> len(sdt.data) 1 @@ -128,6 +136,8 @@ (1, 1024) >>> sdt.times[3].shape (1024,) +>>> int(sdt.setup.bh_bin_hdr['soft_rev']) +850 Read image data from a "SPC FCS Data File" as numpy array: @@ -145,7 +155,7 @@ from __future__ import annotations -__version__ = '2024.11.24' +__version__ = '2024.12.6' __all__ = [ '__version__', @@ -159,6 +169,7 @@ import io import os +import struct import zipfile from typing import TYPE_CHECKING @@ -169,6 +180,10 @@ from numpy.typing import NDArray + Record = list[tuple[str, str]] + Record1 = list[tuple[str, str | Record]] + Record2 = list[tuple[str, str | Record | Record1]] + class SdtFile: """Becker & Hickl SDT file. @@ -255,7 +270,7 @@ def _fromfile(self, fh: BinaryIO, /) -> None: # read measurement description blocks self.measure_info = [] - dtype = struct_dtype( + dtype = record_dtype( MEASURE_INFO, int(self.header.meas_desc_block_length) ) fh.seek(self.header.meas_desc_block_offs) @@ -267,7 +282,7 @@ def _fromfile(self, fh: BinaryIO, /) -> None: dtype=dtype, shape=1, byteorder='<', - ) + )[0] ) fh.seek(self.header.meas_desc_block_length - dtype.itemsize, 1) @@ -312,27 +327,27 @@ def _fromfile(self, fh: BinaryIO, /) -> None: # the following works with DECAY, IMG, MCS, PAGE # assume adc_re is always present - adc_re = int(mi.adc_re[0]) + adc_re = int(mi.adc_re) # the following fields may not be present try: - scan_x = int(mi.scan_x[0]) - scan_y = int(mi.scan_y[0]) + scan_x = int(mi.scan_x) + scan_y = int(mi.scan_y) except AttributeError: scan_x = 0 scan_y = 0 try: - image_x = int(mi.image_x[0]) - image_y = int(mi.image_y[0]) + image_x = int(mi.image_x) + image_y = int(mi.image_y) except AttributeError: image_x = 0 image_y = 0 try: - mcs_points = mi.MeasHISTInfo.mcs_points[0] + mcs_points = mi.MeasHISTInfo.mcs_points except AttributeError: mcs_points = -1 try: - mcs_time = mi.MeasHISTInfo.mcs_time[0] + mcs_time = mi.MeasHISTInfo.mcs_time except AttributeError: mcs_time = 0 @@ -354,7 +369,7 @@ def _fromfile(self, fh: BinaryIO, /) -> None: else: # generate time axis time = numpy.arange(adc_re, dtype=numpy.float64) - time *= mi.tac_r / (float(mi.tac_g[0]) * adc_re) + time *= mi.tac_r / (float(mi.tac_g) * adc_re) self.times.append(time) offset = bh.next_block_offs @@ -384,15 +399,22 @@ def __str__(self) -> str: repr(self), # os.path.normpath(os.path.normcase(self.filename)), FileRevision(self.header.revision), + record_str('header', self.header), indent('info:', self.info.strip()), - # indent('header:', self.header), - # indent('measure_info:', *self.measure_info), - # indent('block_headers:', *self.block_headers), + *( + record_str(f'measure_info[{i}]', rec) + for i, rec in enumerate(self.measure_info) + ), + *( + record_str(f'block_headers[{i}]', rec) + for i, rec in enumerate(self.block_headers) + ), indent( 'blocktypes:', *(BlockType(i.block_type) for i in self.block_headers), ), indent('shapes:', *(i.shape for i in self.data)), + indent('setup:', self.setup), ) @@ -430,27 +452,158 @@ class SetupBlock: """ - __slots__ = ('ascii', 'binary') - ascii: str """ASCII data.""" - binary: bytes | None + binary: bytes """Binary data.""" + bh_bin_hdr: numpy.recarray[Any, Any] | None + """BHBinHdr structure.""" + + spc_bin_hdr: numpy.recarray[Any, Any] | None + """SPCBinHdr structure.""" + + _offset: int + """Offset of structures in binary.""" + def __init__(self, value: bytes, /) -> None: assert value.startswith(b'*SETUP') and value.strip().endswith(b'*END') + + self.ascii = '' + self.binary = b'' + self.bh_bin_hdr = None + self.spc_bin_hdr = None + self._offset = 0 + i = value.find(b'BIN_PARA_BEGIN') - if i: + if i >= 0: self.ascii = value[:i].decode('windows-1250') - self.binary = value[i:] # [i + 15 : -10] - # TODO: parse binary data here + self.binary = value[i:] else: self.ascii = value.decode('windows-1250') - self.binary = None + + if len(self.binary) >= 20: + self.binary_size = struct.unpack('= 20 + dtype1.itemsize: + self.bh_bin_hdr = numpy.rec.fromstring( + # type: ignore[call-overload] + self.binary, + dtype=dtype1, + shape=1, + offset=self._offset, + byteorder='<', + )[0] + + dtype2 = numpy.dtype(SETUP_BIN_SPCHDR) + if len(self.binary) >= 20 + dtype1.itemsize + dtype2.itemsize: + self.spc_bin_hdr = numpy.rec.fromstring( + # type: ignore[call-overload] + self.binary, + dtype=dtype2, + shape=1, + offset=self._offset + dtype1.itemsize, + byteorder='<', + )[0] + + def _read_record( + self, + record: numpy.recarray[Any, Any] | None, + name: str, + dtype: Record | Record1 | Record2, + /, + truncate: bool = True, + warn: bool = False, + ) -> numpy.recarray[Any, Any] | None: + """Return record referenced in SETUP_BIN_SPCHDR/EXT.""" + try: + if ( + record is None + or record[name + '_offs'] == 0 + or record[name + '_size'] == 0 + ): + return None + except ValueError: + # no field of name + return None + + try: + number: slice | int = slice(int(record[name + '_number'])) + except ValueError: + number = 0 + + size = int(record[name + '_size']) + if truncate: + dtype_ = record_dtype(dtype, size) + else: + dtype_ = numpy.dtype(dtype) + if dtype_.itemsize != size: + if warn: + log_warning( + f'{name}_size={size} does not match ' + f'structure size={dtype_.itemsize}' + ) + return None + + return numpy.rec.fromstring( + # type: ignore[call-overload] + self.binary, + dtype=dtype_, + shape=1, + offset=self._offset + record[name + '_offs'], + byteorder='<', + )[number] + + @property + def spc_bin_hdr_ext(self) -> numpy.recarray[Any, Any] | None: + """SPCBinHdrExt structure.""" + return self._read_record( + self.spc_bin_hdr, 'binhdrext', SETUP_BIN_SPCHDREXT + ) + + @property + def gvd_param(self) -> numpy.recarray[Any, Any] | None: + """GVDParam structure.""" + try: + size = int(self.spc_bin_hdr['GVD_size']) # type: ignore[index] + except (TypeError, ValueError): + return None + + dtype = SETUP_BIN_GVDPARAM.copy() + if size == 110: + dtype[2] = ('gvd_data', SETUP_BIN_GVDDATA[:-1]) + # elif size == 86: + # TODO + # elif size == 84: + # TODO + # elif size == 64: + # TODO + + return self._read_record( + self.spc_bin_hdr, 'GVD', dtype, truncate=False, warn=False + ) + + # TODO: parse more binary data structures here + + def __repr__(self) -> str: + return f'<{self.__class__.__name__}>' def __str__(self) -> str: - return self.ascii + if not self.binary: + return self.ascii + return '\n'.join( + ( + # repr(self), + record_str('bh_bin_hdr', self.bh_bin_hdr), + record_str('spc_bin_hdr', self.spc_bin_hdr), + record_str('spc_bin_hdr_ext', self.spc_bin_hdr_ext), + record_str('gvd_param', self.gvd_param), + indent('ascii:', self.ascii), + ) + ) class BlockNo: @@ -526,13 +679,16 @@ class FileRevision: """ - __slots__ = ('revision', 'module') + __slots__ = ('revision', 'module', 'subtype') revision: int - """Revision.""" + """Software revision.""" module: str - """Module.""" + """BH module type.""" + + subtype: str + """BH module subtype.""" def __init__(self, value: int, /) -> None: self.revision = value & 0b1111 @@ -564,6 +720,10 @@ def __init__(self, value: int, /) -> None: 0x8B: 'SPC-QC-104', 0x8C: 'SPC-QC-004', }.get((value & 0xFF0) >> 4, 'Unknown') + self.subtype = { + 0x0: 'None', + 0x1: 'SPC-150NX-12', + }.get(value >> 12, 'Unknown') def __repr__(self) -> str: return ( @@ -571,7 +731,7 @@ def __repr__(self) -> str: ) -FILE_HEADER: list[tuple[str, str]] = [ +FILE_HEADER: Record = [ ('revision', 'i2'), ('info_offs', 'i4'), ('info_length', 'i2'), @@ -589,15 +749,134 @@ def __repr__(self) -> str: ('chksum', 'u2'), ] -SETUP_BIN_HDR: list[tuple[str, str]] = [ + +# BHBinHdr +SETUP_BIN_HDR: Record = [ ('soft_rev', 'u4'), ('para_length', 'u4'), ('reserved1', 'u4'), ('reserved2', 'u2'), ] + +# SPCBinHdr +SETUP_BIN_SPCHDR: Record = [ + ('FCS_old_offs', 'u4'), + ('FCS_old_size', 'u4'), + ('gr1_offs', 'u4'), + ('gr1_size', 'u4'), + ('FCS_offs', 'u4'), + ('FCS_size', 'u4'), + ('FIDA_offs', 'u4'), + ('FIDA_size', 'u4'), + ('FILDA_offs', 'u4'), + ('FILDA_size', 'u4'), + ('gr2_offs', 'u4'), + ('gr_no', 'u2'), + ('hst_no', 'u2'), + ('hst_offs', 'u4'), + ('GVD_offs', 'u4'), + ('GVD_size', 'u2'), + ('FIT_offs', 'u2'), + ('FIT_size', 'u2'), + ('extdev_offs', 'u2'), + ('extdev_size', 'u2'), + ('binhdrext_offs', 'u4'), + ('binhdrext_size', 'u2'), +] + + +# SPCBinHdrExt +SETUP_BIN_SPCHDREXT: Record = [ + ('MCS_img_offs', 'u4'), + ('MCS_img_size', 'u4'), + ('mom_no', 'u2'), + ('MOM_size', 'u2'), + ('mom_offs', 'u4'), + ('syspar_ext_offs', 'u4'), + ('syspar_ext_size', 'u4'), + ('mosaic_offs', 'u4'), + ('mosaic_size', 'u4'), + ('WF_img_offs', 'u4'), + ('WF_img_size', 'u2'), + ('WndLayout_offs', 'u4'), + ('WndLayout_size', 'u2'), + ('trpar_ext_offs', 'u4'), + ('trpar_ext_size', 'u2'), + ('CorPar_offs', 'u4'), + ('CorPar_size', 'u2'), + ('CorPar_number', 'u2'), + ('LifeTrPar_offs', 'u4'), + ('LifeTrPar_size', 'u2'), + ('LifeTrPar_number', 'u2'), + ('HST3DPar_offs', 'u4'), + ('HST3DPar_size', 'u2'), + ('HST3DPar_number', 'u2'), + ('dcc_ext_offs', 'u4'), + ('dcc_ext_size', 'u2'), + ('extension', 'S162'), +] + + +SETUP_BIN_BHPANELATTR: Record = [ + ('top', 'i4'), + ('left', 'i4'), + ('height', 'i4'), + ('width', 'i4'), + ('flag', 'i4'), +] + +SETUP_BIN_BHPANELDATA: Record1 = [ + ('status', 'i4'), + ('left', 'i4'), + ('data', SETUP_BIN_BHPANELATTR), +] + +# Definitions for GVD-120/140 module +SETUP_BIN_GVDDATA: Record = [ + ('active', 'i2'), + ('frame_size', 'u2'), + ('lasers_active', 'i2'), + ('multiplex', 'u2'), + ('limit_scan', 'i2'), + ('frame_counter', 'u2'), + ('scan_polarity', 'u2'), + ('scan_type', 'i2'), + ('line_time', 'f4'), + ('zoom_factor', 'f4'), + ('offset_x', 'f4'), + ('offset_y', 'f4'), + ('park_offs_x', 'f4'), + ('park_offs_y', 'f4'), + ('l1_power', 'f4'), + ('l2_power', 'f4'), + ('rect_zoom_x', 'f4'), + ('rect_zoom_y', 'f4'), + ('scan_rate', 'i2'), + ('park_center', 'i2'), + ('scan_trigger', 'i2'), + ('l3_power', 'f4'), + ('l4_power', 'f4'), + ('multiplex2', 'u2'), + ('multiplex3', 'u2'), + ('lasers_active1_4', 'u2'), + ('control2', 'u2'), + ('sreserve', 'i2'), +] + + +SETUP_BIN_GVDPARAM: Record1 = [ + ('active', 'i4'), + ('dcs_and_beam_blank', 'u4'), + ('gvd_data', SETUP_BIN_GVDDATA), + ('DAC_per_step', 'u2'), + ('line_pulse_shift', 'i2'), + ('pnl_attr', SETUP_BIN_BHPANELATTR), +] + + # Info collected when measurement finished -MEASURE_STOP_INFO: list[tuple[str, str]] = [ +MEASURE_STOP_INFO: Record = [ ('status', 'u2'), ('flags', 'u2'), ('stop_time', 'f4'), @@ -616,8 +895,9 @@ def __repr__(self) -> str: ('reserved2', 'f4'), ] + # Info collected when FIFO measurement finished -MEASURE_FCS_INFO: list[tuple[str, str]] = [ +MEASURE_FCS_INFO: Record = [ ('chan', 'u2'), ('fcs_decay_calc', 'u2'), ('mt_resol', 'u4'), @@ -633,8 +913,9 @@ def __repr__(self) -> str: ('cross_mt_resol', 'u4'), ] + # Extension of MeasFCSInfo for other histograms -HIST_INFO: list[tuple[str, str]] = [ +HIST_INFO: Record = [ ('fida_time', 'f4'), ('filda_time', 'f4'), ('fida_points', 'i4'), @@ -650,7 +931,8 @@ def __repr__(self) -> str: ('reserved3', 'u4'), ] -HIST_INFO_EXT: list[tuple[str, str]] = [ + +HIST_INFO_EXT: Record = [ ('first_frame_time', 'f4'), ('frame_time', 'f4'), ('line_time', 'f4'), @@ -661,7 +943,8 @@ def __repr__(self) -> str: ('info', 'S40'), ] -MEASURE_INFO_EXT: list[tuple[str, str]] = [ + +MEASURE_INFO_EXT: Record = [ ('DCU_in_use', 'u4'), ('dcu_ser_no', '4S16'), ('scope_name', 'S32'), @@ -680,8 +963,9 @@ def __repr__(self) -> str: ('reserve', 'S1249'), ] + # Measurement description blocks -MEASURE_INFO: list[tuple[str, str | list[tuple[str, str]]]] = [ +MEASURE_INFO: Record2 = [ ('time', 'S9'), ('date', 'S11'), ('mod_ser_no', 'S16'), @@ -713,7 +997,7 @@ def __repr__(self) -> str: ('dither', 'i2'), ('incr', 'i2'), ('mem_bank', 'i2'), - ('mod', 'S16'), + ('mod_type', 'S16'), ('syn_th', 'f4'), ('dead_time_comp', 'i2'), ('polarity_l', 'i2'), @@ -734,7 +1018,7 @@ def __repr__(self) -> str: ('scan_ry', 'i4'), ('fifo_typ', 'i2'), ('epx_div', 'i4'), - ('mod_code', 'u2'), + ('mod_type_code', 'u2'), ('mod_fpga_ver', 'u2'), ('overflow_corr_factor', 'f4'), ('adc_zoom', 'i4'), @@ -746,9 +1030,9 @@ def __repr__(self) -> str: ('image_rx', 'i4'), ('image_ry', 'i4'), ('xy_gain', 'i2'), - ('master_clock', 'i2'), + ('dig_flags', 'i2'), ('adc_de', 'i2'), - ('det', 'i2'), + ('det_type', 'i2'), ('x_axis', 'i2'), ('MeasHISTInfo', HIST_INFO), ('HISTInfoExt', HIST_INFO_EXT), @@ -777,7 +1061,8 @@ def __repr__(self) -> str: ('minfo_ext', MEASURE_INFO_EXT), ] -BLOCK_HEADER_OLD: list[tuple[str, str]] = [ + +BLOCK_HEADER_OLD: Record = [ ('block_no', 'i2'), ('data_offs', 'i4'), ('next_block_offs', 'i4'), @@ -787,7 +1072,8 @@ def __repr__(self) -> str: ('block_length', 'u4'), ] -BLOCK_HEADER: list[tuple[str, str]] = [ + +BLOCK_HEADER: Record = [ ('data_offs_ext', 'u1'), ('next_block_offs_ext', 'u1'), ('data_offs', 'u4'), @@ -798,6 +1084,7 @@ def __repr__(self) -> str: ('block_length', 'u4'), ] + # Mode of creation BLOCK_CREATION: dict[int, str] = { 0: 'NOT_USED', @@ -847,33 +1134,54 @@ def __repr__(self) -> str: } -def struct_dtype( - struct: list[tuple[str, str | list[tuple[str, str]]]], size: int, / +def record_dtype( + record: Record | Record1 | Record2, size: int, / ) -> numpy.dtype[Any]: - """Return numpy dtype for struct not exceeding size bytes.""" + """Return numpy dtype for record not exceeding size bytes.""" assert size > 0 - fields = len(struct) - dtype = numpy.dtype(struct) + fields = len(record) + dtype = numpy.dtype(record) while dtype.itemsize > size and fields > 0: - # last_dtype = struct[-1][1] + # last_dtype = record[-1][1] # if ( # isinstance(last_dtype, list) # and dtype.itemsize - size < numpy.dtype(last_dtype).itemsize # ): - # struct = struct.copy() - # struct[-1] = ( - # struct[-1][0], - # struct_dtype(last_dtype, dtype.itemsize - size).descr + # record = record.copy() + # record[-1] = ( + # record[-1][0], + # record_dtype(last_dtype, dtype.itemsize - size).descr # ) - # dtype = numpy.dtype(struct) + # dtype = numpy.dtype(record) # break fields -= 1 - dtype = numpy.dtype(struct[:fields]) + dtype = numpy.dtype(record[:fields]) # if dtype.itemsize != size: - # log_warning(f'struct size {dtype.itemsize} != {size}') + # log_warning(f'record size {dtype.itemsize} != {size}') return dtype +def record_str(name: str, record: numpy.recarray[Any, Any] | None) -> str: + """Return numpy recarray formatted as string.""" + if record is None: + return f'{name}: None' + lines = [] + for descr in record.dtype.descr: + key, dtype = descr[:2] + value = record[key] + if isinstance(dtype, list): + lines.append(record_str(key, value)) + elif dtype.startswith('|S'): + if isinstance(value, numpy.ndarray): + value = [stripnull(v.tobytes()) for v in value.flat] + lines.append(f'{key}: {value}') + else: + lines.append(f'{key}: {bytes(stripnull(value))!r}') + else: + lines.append(f'{key}: {value}') + return indent(f'{name}:', *lines) + + def indent(*args: Any) -> str: """Return joined string representations of objects with indented lines.""" text = '\n'.join(str(arg) for arg in args) @@ -882,6 +1190,39 @@ def indent(*args: Any) -> str: )[2:] +def stripnull( + string: bytes, + /, + null: bytes | None = None, + *, + first: bool = True, +) -> bytes: + r"""Return string truncated at first null character. + + Use to clean NULL terminated C strings. + + >>> stripnull(b'bytes\x00\x00') + b'bytes' + >>> stripnull(b'bytes\x00bytes\x00\x00', first=False) + b'bytes\x00bytes' + + """ + if null is None: + null = b'\x00' + if first: + i = string.find(null) + return string if i < 0 else string[:i] + null_ = null[0] + i = len(string) + while i: + i -= 1 + if string[i] != null_: + break + else: + i = -1 + return string[: i + 1] + + def log_warning(msg: object, *args: object, **kwargs: Any) -> None: """Log message with level WARNING.""" import logging @@ -897,8 +1238,17 @@ def log_warning(msg: object, *args: object, **kwargs: Any) -> None: assert numpy.dtype(FILE_HEADER).itemsize == 42 # BH_HDR_LENGTH assert numpy.dtype(MEASURE_INFO).itemsize == 2048 assert numpy.dtype(MEASURE_INFO_EXT).itemsize == 1536 + assert numpy.dtype(MEASURE_STOP_INFO).itemsize == 60 + assert numpy.dtype(MEASURE_FCS_INFO).itemsize == 38 assert numpy.dtype(HIST_INFO).itemsize == 48 assert numpy.dtype(HIST_INFO_EXT).itemsize == 64 - assert numpy.dtype(MEASURE_FCS_INFO).itemsize == 38 - assert numpy.dtype(MEASURE_STOP_INFO).itemsize == 60 assert numpy.dtype(SETUP_BIN_HDR).itemsize == 14 + assert numpy.dtype(SETUP_BIN_SPCHDR).itemsize == 72 + assert numpy.dtype(SETUP_BIN_SPCHDREXT).itemsize == 242 + assert numpy.dtype(SETUP_BIN_GVDDATA).itemsize == 80 + assert numpy.dtype(SETUP_BIN_GVDPARAM).itemsize == 112 + assert numpy.dtype(SETUP_BIN_BHPANELATTR).itemsize == 20 + assert numpy.dtype(BLOCK_HEADER_OLD).itemsize == 22 + assert numpy.dtype(BLOCK_HEADER).itemsize == 22 + +# mypy: disable-error-code="no-any-return"