From c76a0d161c8fb6ebc9c05c76e9a163f652bf354b Mon Sep 17 00:00:00 2001
From: danielhrisca
Date: Fri, 3 Jan 2025 17:51:57 +0200
Subject: [PATCH] barbaric adaptation for the select function

---
 CMakeLists.txt               |   3 +
 src/asammdf/__init__.py      |   3 +-
 src/asammdf/blocks/cutils.c  | 131 ++++++++++++---
 src/asammdf/blocks/mdf_v4.py |  11 +-
 src/asammdf/blocks/utils.py  |   6 +
 src/asammdf/mdf.py           | 302 +++++++++++++++++++++++++++++++++++
 6 files changed, 429 insertions(+), 27 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 26981b132..2d0e31992 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -12,6 +12,9 @@ find_package(
 )
 
 set(CMAKE_POSITION_INDEPENDENT_CODE ON)
+set(LIBDEFLATE_COMPRESSION_SUPPORT OFF)
+set(LIBDEFLATE_GZIP_SUPPORT OFF)
+set(LIBDEFLATE_BUILD_GZIP OFF)
 
 # Add submodule libdeflate
 add_subdirectory(ext/libdeflate EXCLUDE_FROM_ALL)
diff --git a/src/asammdf/__init__.py b/src/asammdf/__init__.py
index 307bcb2a4..9f627270f 100644
--- a/src/asammdf/__init__.py
+++ b/src/asammdf/__init__.py
@@ -15,7 +15,7 @@
 from .blocks.utils import load_channel_names_from_file
 from .gui import plot
 from .mdf import MDF, SUPPORTED_VERSIONS
-from .signal import Signal
+from .signal import InvalidationArray, Signal
 from .version import __version__
 
 try:
@@ -28,6 +28,7 @@
 __all__ = [
     "MDF",
     "SUPPORTED_VERSIONS",
+    "InvalidationArray",
     "Signal",
     "Source",
     "__cextension__",
diff --git a/src/asammdf/blocks/cutils.c b/src/asammdf/blocks/cutils.c
index 89bf6f97e..a553c619f 100644
--- a/src/asammdf/blocks/cutils.c
+++ b/src/asammdf/blocks/cutils.c
@@ -1376,12 +1376,13 @@ static PyObject *get_invalidation_bits_array(PyObject *self, PyObject *args)
 
 static PyObject *get_invalidation_bits_array_C(uint8_t * data, int64_t cycles, int64_t invalidation_pos, int64_t invalidation_size)
 {
-    if (invalidation_pos<0) {
+    if (invalidation_pos < 0) {
         return Py_None;
     }
+
     else {
-        PyObject *out;
+        PyObject *out=NULL;
         uint8_t mask, *inptr, *outptr;
 
         mask = (uint8_t ) (1 << (invalidation_pos % 8));
@@ -1390,6 +1391,7 @@
         npy_intp dims[1];
         dims[0] = cycles;
         out = (PyArrayObject *)PyArray_EMPTY(1, dims, NPY_BOOL, 0);
+        if (!out) return NULL;
         outptr = (uint8_t *)PyArray_GETPTR1(out, 0);
 
         for (int i=0; i current_uncompressed_size) {
-                printf("\tThr %d new ptr\n", thread_info->idx);
+                //printf("\tThr %d new ptr\n", thread_info->idx);
                 if (pUncomp) free(pUncomp);
                 pUncomp = (uint8_t *) malloc(original_size);
                 //if (!pUncomp) printf("\tThr %d pUncomp error\n", thread_info->idx);
@@ -2092,13 +2105,13 @@ void * get_channel_raw_bytes_complete_C(void *lpParam )
             // reverse transposition
             if (block_type == 2) {
                 if (current_out_size < original_size) {
-                    printf("\tThr %d new trtrtrptr\n", thread_info->idx);
+                    //printf("\tThr %d new trtrtrptr\n", thread_info->idx);
                     if (pUncompTr) free(pUncompTr);
                     pUncompTr = (uint8_t *) malloc(original_size);
                     //if (!pUncompTr) printf("\tThr %d pUncompTr error\n", thread_info->idx);
                     current_out_size = original_size;
                 }
-
+
                 start = clock();
                 read = pUncomp;
                 for (int j = 0; j < (Py_ssize_t)cols; j++)
 }
                 end = clock();
                 t7 += end - start;
-
+
                 data_ptr = pUncompTr;
@@ -2156,7 +2169,7 @@ void * get_channel_raw_bytes_complete_C(void *lpParam )
 
     if (pUncomp) free(pUncomp);
     if (pUncompTr) free(pUncompTr);
-    printf("t1=%lf t2=%lf t3=%lf t4=%lf t5=%lf t6=%lf t7=%lf\n", t1, t2, t3, t4, t5, t6, t7);
+    //printf("t1=%lf t2=%lf t3=%lf t4=%lf t5=%lf t6=%lf t7=%lf\n", t1, t2, t3, t4, t5, t6, t7);
 
     return 0;
 }
 
@@ -2164,7 +2177,7 @@ void * get_channel_raw_bytes_complete_C(void *lpParam )
 static PyObject *get_channel_raw_bytes_complete(PyObject *self, PyObject *args)
 {
     Py_ssize_t info_count, signal_count, signal_and_invalidation_count, thread_count=11;
-    PyObject *data_blocks_info, *signals, *out = NULL, *item, *ref, *obj;
+    PyObject *data_blocks_info, *signals, *out = NULL, *item, *ref, *obj, *group_index, *InvalidationArray;
 
     char *outptr, *file_name;
     char *read_pos = NULL, *write_pos = NULL;
@@ -2186,14 +2199,18 @@ static PyObject *get_channel_raw_bytes_complete(PyObject *self, PyObject *args)
     clock_t start, end;
     double tt=0;
 
-    if (!PyArg_ParseTuple(args, "OOsnnn|n",
-                          &data_blocks_info, &signals, &file_name, &cycles, &record_size, &invalidation_bytes,
+    if (!PyArg_ParseTuple(args, "OOsnnnO|n",
+                          &data_blocks_info, &signals, &file_name, &cycles, &record_size, &invalidation_bytes, &group_index,
                           &thread_count))
     {
         return NULL;
    }
    else
    {
+
+        ref = PyImport_ImportModule("asammdf");
+        InvalidationArray = PyObject_GetAttrString(ref, "InvalidationArray");
+        Py_XDECREF(ref);
        //fptr = fopen(file_name,"rb");
        TCHAR *lpFileName = TEXT(file_name);
        HANDLE hFile;
@@ -2283,10 +2300,18 @@ static PyObject *get_channel_raw_bytes_complete(PyObject *self, PyObject *args)
        if (invalidation_bytes) {
            signal_and_invalidation_count = signal_count + 1;
            signal_info = (PtrSignalInfo) malloc(sizeof(SignalInfo) * (signal_count + 1));
+            if (!signal_info) {
+                PyErr_SetString(PyExc_MemoryError, "Memory allocation error for signal_info");
+                return NULL;
+            }
        }
        else {
            signal_and_invalidation_count = signal_count;
            signal_info = (PtrSignalInfo) malloc(sizeof(SignalInfo) * signal_count);
+            if (!signal_info) {
+                PyErr_SetString(PyExc_MemoryError, "Memory allocation error for signal_info");
+                return NULL;
+            }
        }
 
        for (int i=0; i
diff --git a/src/asammdf/blocks/mdf_v4.py b/src/asammdf/blocks/mdf_v4.py
index ab240e24f..d500bcdc2 100644
--- a/src/asammdf/blocks/mdf_v4.py
+++ b/src/asammdf/blocks/mdf_v4.py
@@ -2085,7 +2085,7 @@ def __del__(self) -> None:
-            if (vv := (perf_counter() - tt)) > 5:
+            if (vv := (perf_counter() - tt)) > 10:
                 print(f"{ss / 1024/1024 / vv:.6f} MB/s {cc=} {vv=}")
                 cc = 0
                 ss = 0
@@ -7691,6 +7691,7 @@ def _get_scalar(
                         view = f"{channel_dtype.byteorder}i{vals.itemsize}"
                         if dtype(view) != vals.dtype:
                             vals = vals.view(view)
+
                 elif channel_type == v4c.CHANNEL_TYPE_VALUE and channel.fast_path is None:
                     channel.fast_path = (
                         gp_nr,
@@ -8392,9 +8393,9 @@ def _yield_selected_signals(
                 fragments = [next(stream) for stream in data_streams]
             except:
                 break
-
-            if perf_counter() - tt > 120:
-                x = 1 / 0
+            #
+            # if perf_counter() - tt > 120:
+            #     x = 1 / 0
 
             # prepare the master
             _master = self.get_master(index, data=fragments[master_index], one_piece=True)
diff --git a/src/asammdf/blocks/utils.py b/src/asammdf/blocks/utils.py
index f3199f23b..cb639541c 100644
--- a/src/asammdf/blocks/utils.py
+++ b/src/asammdf/blocks/utils.py
@@ -10,6 +10,7 @@
 from io import StringIO
 import json
 import logging
+import os
 from pathlib import Path
 from random import randint
 import re
@@ -26,6 +27,7 @@
 import lxml
 from typing_extensions import Literal, TypedDict
 
+THREAD_COUNT = max((os.cpu_count() or 1) - 1, 1)
 TERMINATED = object()
 NONE = object()
 COMPARISON_NAME = re.compile(r"(\s*\d+:)?(?P<name>.+)")
@@ -1304,6 +1306,10 @@ def get_signal_data_blocks(self, index: int) -> Iterator[SignalDataBlockInfo]:
             except StopIteration:
                 break
 
+    def load_all_data_blocks(self) -> None:
+        for _ in self.get_data_blocks():
+            continue
+
 
 class VirtualChannelGroup:
     """starting with MDF v4.20 it is possible to use remote masters and column
diff --git a/src/asammdf/mdf.py b/src/asammdf/mdf.py
index ce19e405d..940071538 100644
--- a/src/asammdf/mdf.py
+++ b/src/asammdf/mdf.py
@@ -37,9 +37,11 @@
 from .blocks import v2_v3_constants as v23c
 from .blocks import v4_constants as v4c
 from .blocks.conversion_utils import from_dict
+from .blocks.cutils import get_channel_raw_bytes_complete
 from .blocks.options import FloatInterpolation, IntegerInterpolation
 from .blocks.source_utils import Source
 from .blocks.utils import (
+    as_non_byte_sized_signed_int,
     components,
     csv_bytearray2hex,
     csv_int2hex,
@@ -56,6 +58,7 @@
     randomized_string,
     SUPPORTED_VERSIONS,
     TERMINATED,
+    THREAD_COUNT,
     UINT16_u,
     UINT64_u,
     UniqueDB,
@@ -3324,6 +3327,305 @@ def select(
 
         """
 
+        def validate_blocks(blocks, record_size):
+            for block in blocks:
+                if block.original_size % record_size:
+                    return False
+
+            return True
+
+        if record_offset or record_count is not None or (self.version < "4.00" and self._mapped):
+            return self._select_fallback(
+                channels, record_offset, raw, copy_master, ignore_value2text_conversions, record_count, validate
+            )
+
+        if isinstance(raw, dict):
+            if "__default__" not in raw:
+                raise MdfException("The raw argument given as dict must contain the __default__ key")
+
+            __default__ = raw["__default__"]
+            raw_dict = True
+        else:
+            raw_dict = False
+
+        virtual_groups = self.included_channels(channels=channels, minimal=False, skip_master=False)
+        for virtual_group, groups in virtual_groups.items():
+            if len(self._mdf.virtual_groups[virtual_group].groups) > 1:
+                return self._select_fallback(
+                    channels, record_offset, raw, copy_master, ignore_value2text_conversions, record_count, validate
+                )
+
+        output_signals = {}
+
+        for virtual_group, groups in virtual_groups.items():
+            group_index = virtual_group
+            grp = self._mdf.groups[group_index]
+            grp.load_all_data_blocks()
+            blocks = grp.data_blocks
+            record_size = grp.channel_group.samples_byte_nr + grp.channel_group.invalidation_bytes_nr
+            cycles_nr = grp.channel_group.cycles_nr
+            channel_indexes = groups[group_index]
+
+            pairs = [(group_index, ch_index) for ch_index in channel_indexes]
+
+            master_index = self.masters_db.get(group_index, None)
+            if master_index is None or grp.record[master_index] is None:
+                return self._select_fallback(
+                    channels, record_offset, raw, copy_master, ignore_value2text_conversions, record_count, validate
+                )
+
+            channel = grp.channels[master_index]
+            master_dtype, byte_size, byte_offset, _ = grp.record[master_index]
+            signals = [(byte_offset, byte_size, channel.pos_invalidation_bit)]
+
+            for ch_index in channel_indexes:
+                channel = grp.channels[ch_index]
+
+                if (info := grp.record[ch_index]) is None:
+                    return self._select_fallback(
+                        channels, record_offset, raw, copy_master, ignore_value2text_conversions, record_count, validate
+                    )
+                else:
+                    _, byte_size, byte_offset, _ = info
+                    signals.append((byte_offset, byte_size, channel.pos_invalidation_bit))
+
+            raw_and_invalidation = get_channel_raw_bytes_complete(
+                blocks,
+                signals,
+                self._mapped_file.name,
+                cycles_nr,
+                record_size,
+                grp.channel_group.invalidation_bytes_nr,
+                group_index,
+                THREAD_COUNT,
+            )
+            master_bytes, _ = raw_and_invalidation[0]
+            raw_and_invalidation = raw_and_invalidation[1:]
+
+            # prepare the master
+            master = np.frombuffer(master_bytes, dtype=master_dtype)
+
+            # fake invalidation_bytes
+            invalidation_bytes = bytes(grp.channel_group.invalidation_bytes_nr * cycles_nr)
+
+            for pair, (raw_data, invalidation_bits) in zip(pairs, raw_and_invalidation):
+                ch_index = pair[-1]
+                channel = grp.channels[ch_index]
+                channel_dtype, byte_size, byte_offset, bit_offset = grp.record[ch_index]
+                vals = np.frombuffer(raw_data, dtype=channel_dtype)
+
+                data_type = channel.data_type
+
+                if not channel.standard_C_size:
+                    size = byte_size
+
+                    if channel_dtype.byteorder == "=" and data_type in (
+                        v4c.DATA_TYPE_SIGNED_MOTOROLA,
+                        v4c.DATA_TYPE_UNSIGNED_MOTOROLA,
+                    ):
+                        view = np.dtype(f">u{vals.itemsize}")
+                    else:
+                        view = np.dtype(f"{channel_dtype.byteorder}u{vals.itemsize}")
+
+                    if view != vals.dtype:
+                        vals = vals.view(view)
+
+                    if bit_offset:
+                        vals >>= bit_offset
+
+                    if channel.bit_count != size * 8:
+                        if data_type in v4c.SIGNED_INT:
+                            vals = as_non_byte_sized_signed_int(vals, channel.bit_count)
+                        else:
+                            mask = (1 << channel.bit_count) - 1
+                            vals &= mask
+                    elif data_type in v4c.SIGNED_INT:
+                        view = f"{channel_dtype.byteorder}i{vals.itemsize}"
+                        if np.dtype(view) != vals.dtype:
+                            vals = vals.view(view)
+
+                conversion = channel.conversion
+                unit = (conversion and conversion.unit) or channel.unit
+
+                source = channel.source
+
+                if source:
+                    source = Source.from_source(source)
+                else:
+                    cg_source = grp.channel_group.acq_source
+                    if cg_source:
+                        source = Source.from_source(cg_source)
+                    else:
+                        source = None
+
+                master_metadata = self._master_channel_metadata.get(group_index, None)
+
+                output_signals[pair] = Signal(
+                    samples=vals,
+                    timestamps=master,
+                    unit=unit,
+                    name=channel.name,
+                    comment=channel.comment,
+                    conversion=conversion,
+                    raw=True,
+                    master_metadata=master_metadata,
+                    attachment=None,
+                    source=source,
+                    display_names=channel.display_names,
+                    bit_count=channel.bit_count,
+                    flags=Signal.Flags.no_flags,
+                    invalidation_bits=invalidation_bits,
+                    encoding=None,
+                    group_index=group_index,
+                    channel_index=ch_index,
+                )
+
+        indexes = []
+
+        for item in channels:
+            if not isinstance(item, (list, tuple)):
+                item = [item]
+            indexes.append(self._validate_channel_selection(*item))
+
+        signals = [output_signals[pair] for pair in indexes]
+
+        if copy_master:
+            for signal in signals:
+                signal.timestamps = signal.timestamps.copy()
+
+        for signal in signals:
+            if (raw_dict and not raw.get(signal.name, __default__)) or (not raw_dict and not raw):
+                conversion = signal.conversion
+                if conversion:
+                    samples = conversion.convert(
+                        signal.samples, ignore_value2text_conversions=ignore_value2text_conversions
+                    )
+                    signal.samples = samples
+
+                signal.raw = False
+                signal.conversion = None
+                if signal.samples.dtype.kind == "S":
+                    signal.encoding = "utf-8" if self.version >= "4.00" else "latin-1"
+
+        if validate:
+            signals = [sig.validate(copy=False) for sig in signals]
+
+        for signal, channel in zip(signals, channels):
+            if isinstance(channel, str):
+                signal.name = channel
+            else:
+                name = channel[0]
+                if name is not None:
+                    signal.name = name
+
+        unique = set()
+        for i, signal in enumerate(signals):
+            obj_id = id(signal)
+            if obj_id in unique:
+                signals[i] = signal.copy()
+            unique.add(obj_id)
+
+        return signals
+
+    def _select_fallback(
+        self,
+        channels: ChannelsType,
+        record_offset: int = 0,
+        raw: bool | dict[str, bool] = False,
+        copy_master: bool = True,
+        ignore_value2text_conversions: bool = False,
+        record_count: int | None = None,
+        validate: bool = False,
+    ) -> list[Signal]:
+        """retrieve the channels listed in *channels* argument as *Signal*
+        objects
+
+        .. note:: the *dataframe* argument was removed in version 5.8.0
+                  use the ``to_dataframe`` method instead
+
+        Parameters
+        ----------
+        channels : list
+            list of items to be filtered; each item can be:
+
+                * a channel name string
+                * (channel name, group index, channel index) list or tuple
+                * (channel name, group index) list or tuple
+                * (None, group index, channel index) list or tuple
+
+        record_offset : int
+            record number offset; optimization to get the last part of signal samples
+        raw : bool | dict[str, bool]
+            get raw channel samples; default *False*
+
+            .. versionchanged:: 8.0.0
+
+                provide individual raw mode based on a dict. If the parameter is given
+                as a dict, it must contain the ``__default__`` key with the default
+                raw value. The dict keys are the channel names and the values are the
+                boolean raw values for each channel.
+
+        copy_master : bool
+            option to get a new timestamps array for each selected Signal or to
+            use a shared array for channels of the same channel group; default *True*
+        ignore_value2text_conversions (False) : bool
+            valid only for the channels that have value to text conversions and
+            if *raw=False*. If this is True then the raw numeric values will be
+            used, and the conversion will not be applied.
+
+            .. versionchanged:: 5.8.0
+
+        validate (False) : bool
+            consider the invalidation bits
+
+            .. versionadded:: 5.16.0
+
+        Returns
+        -------
+        signals : list
+            list of *Signal* objects based on the input channel list
+
+        Examples
+        --------
+        >>> from asammdf import MDF, Signal
+        >>> import numpy as np
+        >>> t = np.arange(5)
+        >>> s = np.ones(5)
+        >>> mdf = MDF()
+        >>> for i in range(4):
+        ...     sigs = [Signal(s*(i*10+j), t, name='SIG') for j in range(1,4)]
+        ...     mdf.append(sigs)
+        ...
+        >>> # select SIG group 0 default index 1 default, SIG group 3 index 1, SIG group 2 index 1 default and channel index 2 from group 1
+        ...
+        >>> mdf.select(['SIG', ('SIG', 3, 1), ['SIG', 2], (None, 1, 2)])
+        [<Signal SIG:
+                samples=[ 1.  1.  1.  1.  1.]
+                timestamps=[0 1 2 3 4]
+                unit=""
+                comment="">
+        , <Signal SIG:
+                samples=[ 31.  31.  31.  31.  31.]
+                timestamps=[0 1 2 3 4]
+                unit=""
+                comment="">
+        , <Signal SIG:
+                samples=[ 21.  21.  21.  21.  21.]
+                timestamps=[0 1 2 3 4]
+                unit=""
+                comment="">
+        , <Signal SIG:
+                samples=[ 12.  12.  12.  12.  12.]
+                timestamps=[0 1 2 3 4]
+                unit=""
+                comment="">
+        ]
+
+        """
+
+        if isinstance(raw, dict):
+            if "__default__" not in raw:
+                raise MdfException("The raw argument given as dict must contain the __default__ key")
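
A minimal usage sketch for the fast ``select()`` path added above (illustrative, not part of the patch; the file name and channel names are hypothetical). The dict form of ``raw`` must contain the ``__default__`` key, and per-channel entries override it::

    from asammdf import MDF

    # hypothetical measurement file and channel names
    with MDF("measurement.mf4") as mdf:
        sigs = mdf.select(
            ["EngineSpeed", ("VehicleSpeed", 2, 3)],
            raw={"__default__": False, "EngineSpeed": True},  # per-channel raw mode
            validate=True,  # drop samples whose invalidation bits are set
        )

Any selection that cannot take the fast path (record offsets, partial reads, MDF versions below 4.00, or a group without a cached record layout) transparently falls back to ``_select_fallback``.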
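For reference, the per-channel invalidation lookup that ``get_invalidation_bits_array_C`` performs can be restated in NumPy. This is a sketch of the semantics only (the helper name is invented here, not part of asammdf); it follows the ``mask = 1 << (invalidation_pos % 8)`` logic visible in the hunk above::

    import numpy as np

    def invalidation_bits_sketch(invalidation_bytes, cycles, invalidation_pos, invalidation_size):
        # one row of `invalidation_size` bytes per record/cycle
        data = np.frombuffer(invalidation_bytes, dtype=np.uint8).reshape(cycles, invalidation_size)
        byte_index, bit_index = divmod(invalidation_pos, 8)  # locate the channel's bit
        mask = np.uint8(1 << bit_index)  # same mask as the C code
        # True where the invalidation bit is set (sample is invalid)
        return (data[:, byte_index] & mask).astype(bool)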
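Likewise, the per-channel byte extraction that ``get_channel_raw_bytes_complete`` parallelizes across threads boils down to slicing fixed-size records out of the decompressed data block. A single-threaded NumPy sketch (helper name invented; assumes ``records`` holds complete records of ``record_size`` bytes each)::

    import numpy as np

    def channel_raw_bytes_sketch(records, record_size, byte_offset, byte_size):
        # view the decompressed block as a (cycles, record_size) byte matrix
        mat = np.frombuffer(records, dtype=np.uint8).reshape(-1, record_size)
        # copy this channel's bytes out of every record
        return mat[:, byte_offset : byte_offset + byte_size].tobytes()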