Skip to content

Commit

Permalink
barbaric adaptation for the select function
Browse files: browse the repository at this point in the history
  • Loading branch information
danielhrisca committed Jan 3, 2025
1 parent 70b80cd commit c76a0d1
Show file tree
Hide file tree
Showing 6 changed files with 429 additions and 27 deletions.
3 changes: 3 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ find_package(
)

set(CMAKE_POSITION_INDEPENDENT_CODE ON)
set(LIBDEFLATE_COMPRESSION_SUPPORT OFF)
set(LIBDEFLATE_GZIP_SUPPORT OFF)
set(LIBDEFLATE_BUILD_GZIP OFF)

# Add submodule libdeflate
add_subdirectory(ext/libdeflate EXCLUDE_FROM_ALL)
Expand Down
3 changes: 2 additions & 1 deletion src/asammdf/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from .blocks.utils import load_channel_names_from_file
from .gui import plot
from .mdf import MDF, SUPPORTED_VERSIONS
from .signal import Signal
from .signal import InvalidationArray, Signal
from .version import __version__

try:
Expand All @@ -28,6 +28,7 @@
__all__ = [
"MDF",
"SUPPORTED_VERSIONS",
"InvalidationArray",
"Signal",
"Source",
"__cextension__",
Expand Down
131 changes: 110 additions & 21 deletions src/asammdf/blocks/cutils.c
Original file line number Diff line number Diff line change
Expand Up @@ -1376,12 +1376,13 @@ static PyObject *get_invalidation_bits_array(PyObject *self, PyObject *args)

static PyObject *get_invalidation_bits_array_C(uint8_t * data, int64_t cycles, int64_t invalidation_pos, int64_t invalidation_size)
{
if (invalidation_pos<0) {
if (invalidation_pos < 0) {
return Py_None;
}

else {

PyObject *out;
PyObject *out=NULL;
uint8_t mask, *inptr, *outptr;

mask = (uint8_t ) (1 << (invalidation_pos % 8));
Expand All @@ -1390,6 +1391,7 @@ static PyObject *get_invalidation_bits_array_C(uint8_t * data, int64_t cycles, i
npy_intp dims[1];
dims[0] = cycles;
out = (PyArrayObject *)PyArray_EMPTY(1, dims, NPY_BOOL, 0);
if (!out) return NULL;
outptr = (uint8_t *)PyArray_GETPTR1(out, 0);

for (int i=0; i<cycles; i++) {
Expand Down Expand Up @@ -1688,6 +1690,7 @@ static PyObject *data_block_from_arrays(PyObject *self, PyObject *args)
cycles, step = 0;
Py_ssize_t isize = 0, offset = 0,byte_count;
int is_list;
uint8_t* inptr;

PMYDATA pDataArray;
PMyChannelInfo ch_info;
Expand Down Expand Up @@ -1753,14 +1756,24 @@ static PyObject *data_block_from_arrays(PyObject *self, PyObject *args)
byte_count = PyLong_AsSsize_t(PyTuple_GetItem(item, 1));
}

if (!PyArray_IS_C_CONTIGUOUS(array))
{
copy_array = PyArray_NewCopy((PyArrayObject *)array, NPY_CORDER);
array = copy_array;
copy_array = NULL;
if (PyByteArray_Check(array)) {
inptr = (uint8_t *) PyByteArray_AsString(array);
}
else if (PyBytes_Check(array)) {
inptr = (uint8_t *) PyBytes_AsString(array);
}
else {
if (!PyArray_IS_C_CONTIGUOUS(array))
{
copy_array = PyArray_NewCopy((PyArrayObject *)array, NPY_CORDER);
array = copy_array;
copy_array = NULL;
}
inptr = (uint8_t *)PyArray_BYTES((PyArrayObject *)array);
}

pDataArray[i].inptr = (uint8_t *)PyArray_BYTES((PyArrayObject *)array);

pDataArray[i].inptr = inptr;
pDataArray[i].cycles = cycles;
pDataArray[i].byte_offset = total_size;
pDataArray[i].byte_count = byte_count;
Expand Down Expand Up @@ -2073,7 +2086,7 @@ void * get_channel_raw_bytes_complete_C(void *lpParam )

// decompress
if (original_size > current_uncompressed_size) {
printf("\tThr %d new ptr\n", thread_info->idx);
//printf("\tThr %d new ptr\n", thread_info->idx);
if (pUncomp) free(pUncomp);
pUncomp = (uint8_t *) malloc(original_size);
//if (!pUncomp) printf("\tThr %d pUncomp error\n", thread_info->idx);
Expand All @@ -2092,13 +2105,13 @@ void * get_channel_raw_bytes_complete_C(void *lpParam )
// reverse transposition
if (block_type == 2) {
if (current_out_size < original_size) {
printf("\tThr %d new trtrtrptr\n", thread_info->idx);
//printf("\tThr %d new trtrtrptr\n", thread_info->idx);
if (pUncompTr) free(pUncompTr);
pUncompTr = (uint8_t *) malloc(original_size);
//if (!pUncompTr) printf("\tThr %d pUncompTr error\n", thread_info->idx);
current_out_size = original_size;
}

start = clock();
read = pUncomp;
for (int j = 0; j < (Py_ssize_t)cols; j++)
Expand All @@ -2112,7 +2125,7 @@ void * get_channel_raw_bytes_complete_C(void *lpParam )
}
end = clock();
t7 += end - start;


data_ptr = pUncompTr;

Expand Down Expand Up @@ -2156,15 +2169,15 @@ void * get_channel_raw_bytes_complete_C(void *lpParam )

if (pUncomp) free(pUncomp);
if (pUncompTr) free(pUncompTr);
printf("t1=%lf t2=%lf t3=%lf t4=%lf t5=%lf t6=%lf t7=%lf\n", t1, t2, t3, t4, t5, t6, t7);
//printf("t1=%lf t2=%lf t3=%lf t4=%lf t5=%lf t6=%lf t7=%lf\n", t1, t2, t3, t4, t5, t6, t7);
return 0;
}


static PyObject *get_channel_raw_bytes_complete(PyObject *self, PyObject *args)
{
Py_ssize_t info_count, signal_count, signal_and_invalidation_count, thread_count=11;
PyObject *data_blocks_info, *signals, *out = NULL, *item, *ref, *obj;
PyObject *data_blocks_info, *signals, *out = NULL, *item, *ref, *obj, *group_index, *InvalidationArray;

char *outptr, *file_name;
char *read_pos = NULL, *write_pos = NULL;
Expand All @@ -2186,14 +2199,18 @@ static PyObject *get_channel_raw_bytes_complete(PyObject *self, PyObject *args)
clock_t start, end;
double tt=0;

if (!PyArg_ParseTuple(args, "OOsnnn|n",
&data_blocks_info, &signals, &file_name, &cycles, &record_size, &invalidation_bytes,
if (!PyArg_ParseTuple(args, "OOsnnnO|n",
&data_blocks_info, &signals, &file_name, &cycles, &record_size, &invalidation_bytes, &group_index,
&thread_count))
{
return NULL;
}
else
{

ref = PyImport_ImportModule("asammdf");
InvalidationArray = PyObject_GetAttrString(ref, "InvalidationArray");
Py_XDECREF(ref);
//fptr = fopen(file_name,"rb");
TCHAR *lpFileName = TEXT(file_name);
HANDLE hFile;
Expand Down Expand Up @@ -2283,10 +2300,18 @@ static PyObject *get_channel_raw_bytes_complete(PyObject *self, PyObject *args)
if (invalidation_bytes) {
signal_and_invalidation_count = signal_count +1;
signal_info = (PtrSignalInfo) malloc(sizeof(SignalInfo) * (signal_count + 1));
if (!signal_info) {
PyErr_SetString(PyExc_ValueError, "Memmory allocation error for signal_info\n\0");
return NULL;
}
}
else {
signal_and_invalidation_count = signal_count;
signal_info = (PtrSignalInfo) malloc(sizeof(SignalInfo) * signal_count);
if (!signal_info) {
PyErr_SetString(PyExc_ValueError, "Memmory allocation error for signal_info\n\0");
return NULL;
}
}
for (int i=0; i<signal_count; i++) {
if (is_list) {
Expand All @@ -2308,6 +2333,7 @@ static PyObject *get_channel_raw_bytes_complete(PyObject *self, PyObject *args)
}

obj = PyByteArray_FromStringAndSize(NULL, byte_count * cycles);
if (!obj) return NULL;

signal_info[i].byte_offset = byte_offset;
signal_info[i].byte_count = byte_count;
Expand All @@ -2319,6 +2345,7 @@ static PyObject *get_channel_raw_bytes_complete(PyObject *self, PyObject *args)

if (invalidation_bytes) {
obj = PyByteArray_FromStringAndSize(NULL, invalidation_bytes * cycles);
if (!obj) return NULL;
signal_info[signal_count].byte_offset = record_size - invalidation_bytes;
signal_info[signal_count].byte_count = invalidation_bytes;
signal_info[signal_count].invalidation_bit_position = -1;
Expand All @@ -2341,7 +2368,15 @@ static PyObject *get_channel_raw_bytes_complete(PyObject *self, PyObject *args)
thread_count = info_count;
}
block_info = (PtrInfoBlock) malloc(sizeof(InfoBlock) * info_count);
if (!block_info) {
PyErr_SetString(PyExc_ValueError, "Memmory allocation error for block_info\n\0");
return NULL;
}
thread_info = (PtrProcessesingBlock) malloc(sizeof(ProcessesingBlock) * thread_count);
if (!thread_info) {
PyErr_SetString(PyExc_ValueError, "Memmory allocation error for thread_info\n\0");
return NULL;
}

for (int i=0; i<info_count; i++) {

Expand Down Expand Up @@ -2430,6 +2465,10 @@ static PyObject *get_channel_raw_bytes_complete(PyObject *self, PyObject *args)
thread_info[i].block_ready_lock = block_ready_locks[i];
#endif
thread_info[i].inptr = (uint8_t *) malloc(max_compressed);
if (!thread_info[i].inptr) {
PyErr_SetString(PyExc_ValueError, "Memmory allocation error for thread_info[i].inptr\n\0");
return NULL;
}
thread_info[i].block_info = NULL;
thread_info[i].signals = signal_info;
thread_info[i].signal_count = signal_and_invalidation_count;
Expand All @@ -2440,7 +2479,7 @@ static PyObject *get_channel_raw_bytes_complete(PyObject *self, PyObject *args)
thread_info[i].bytes_ready = bytes_ready[i];
}

printf("%d threads %d blocks %d cycles %d size\n", thread_count, info_count, cycles, cycles * byte_count);
//printf("%d threads %d blocks %d cycles %d size\n", thread_count, info_count, cycles, cycles * byte_count);

#if defined(_WIN32)
for (int i=0; i< thread_count; i++) {
Expand All @@ -2452,10 +2491,18 @@ static PyObject *get_channel_raw_bytes_complete(PyObject *self, PyObject *args)
0,
&dwThreadIdArray[i]
);
if (!hThreads[i]) {
PyErr_SetString(PyExc_ValueError, "Failed to create processing thread\n\0");
return NULL;
}
}
#else
for (int i=0; i< thread_count; i++) {
pthread_create(&(dwThreadIdArray[i]), NULL, get_channel_raw_bytes_complete_C, &thread_info[i]);
if (pthread_create(&(dwThreadIdArray[i]), NULL, get_channel_raw_bytes_complete_C, &thread_info[i]))
{
PyErr_SetString(PyExc_ValueError, "Failed to create processing thread\n\0");
return NULL;
}
}
#endif

Expand Down Expand Up @@ -2502,7 +2549,7 @@ static PyObject *get_channel_raw_bytes_complete(PyObject *self, PyObject *args)

}

printf("TT=%lf\n", tt);
//printf("TT=%lf\n", tt);

for (int i=0; i<thread_count; i++) {
thread = &thread_info[position];
Expand Down Expand Up @@ -2556,32 +2603,74 @@ static PyObject *get_channel_raw_bytes_complete(PyObject *self, PyObject *args)

//fclose(fptr);

printf("tuples\n");
PyObject *inv, *inv_array, *origin;

out = PyTuple_New(signal_count);
if (!out) return NULL;

uint8_t * invalidation_data = NULL;
if (invalidation_bytes) {
invalidation_data = signal_info[signal_count].data;
}

PyObject **cache = malloc(sizeof(PyObject *) * invalidation_bytes * 8);
if (!cache) {
PyErr_SetString(PyExc_ValueError, "Failed to allocate memory for cache\n\0");
return NULL;
}
for (int i=0; i< invalidation_bytes * 8; i++) cache[i] = NULL;

for (int i=0; i<signal_count; i++) {
printf("signal %d\n", i);
ref = PyTuple_New(2);
if (!ref) return NULL;
PyTuple_SetItem(ref, 0, signal_info[i].obj);
if (invalidation_data) {
PyTuple_SetItem(ref, 1, get_invalidation_bits_array_C(invalidation_data, cycles, signal_info[i].invalidation_bit_position, invalidation_bytes));
if (cache[signal_info[i].invalidation_bit_position < 0)
PyTuple_SetItem(ref, 1, Py_None);
else {

if (!cache[signal_info[i].invalidation_bit_position]) {
inv = get_invalidation_bits_array_C(invalidation_data, cycles, signal_info[i].invalidation_bit_position, invalidation_bytes);
if (!inv) return NULL;
origin = PyTuple_New(2);
if (!origin) return NULL;
PyTuple_SetItem(origin, 0, Py_NewRef(group_index));
PyTuple_SetItem(origin, 1, PyLong_FromLong(signal_info[i].invalidation_bit_position));
inv_array = PyObject_CallFunction(
InvalidationArray,
"OO",
inv, origin);
if (!inv_array) return NULL;
Py_XDECREF(inv);
cache[signal_info[i].invalidation_bit_position] = inv_array;
Py_XDECREF(origin);
}

PyTuple_SetItem(ref, 1, Py_NewRef(cache[signal_info[i].invalidation_bit_position]));
}
}
else {
PyTuple_SetItem(ref, 1, Py_None);
}
PyTuple_SetItem(out, i, ref);
}

for (int i=0; i< invalidation_bytes * 8; i++) {
if (cache[i]) {
Py_XDECREF(cache[i]);
cache[i] = NULL;
}
}
free(cache);

free(signal_info);
free(block_ready);
free(bytes_ready);
free(dwThreadIdArray);

Py_XDECREF(InvalidationArray);

#if defined(_WIN32)
free(hThreads);
#else
Expand Down
11 changes: 6 additions & 5 deletions src/asammdf/blocks/mdf_v4.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@
MdfException,
SignalDataBlockInfo,
TERMINATED,
THREAD_COUNT,
UINT8_uf,
UINT16_uf,
UINT32_p,
Expand Down Expand Up @@ -165,7 +166,6 @@
# 100 extra steps for the sorting, 1 step after sorting and 1 step at finish
SORT_STEPS = 102
DATA_IS_CHANNEL_BYTES = [-2, -2]
THREAD_COUNT = max(os.cpu_count() - 1, 1)


logger = logging.getLogger("asammdf")
Expand Down Expand Up @@ -1706,7 +1706,7 @@ def _load_data(
invalidation_data.append(new_invalidation_data)
cur_invalidation_size += inv_size

if (vv := (perf_counter() - tt)) > 5:
if (vv := (perf_counter() - tt)) > 10:
print(f"{ss / 1024/1024 / vv:.6f} MB/s {cc=} {vv=}")
cc = 0
ss = 0
Expand Down Expand Up @@ -7691,6 +7691,7 @@ def _get_scalar(
view = f"{channel_dtype.byteorder}i{vals.itemsize}"
if dtype(view) != vals.dtype:
vals = vals.view(view)

elif channel_type == v4c.CHANNEL_TYPE_VALUE and channel.fast_path is None:
channel.fast_path = (
gp_nr,
Expand Down Expand Up @@ -8392,9 +8393,9 @@ def _yield_selected_signals(
fragments = [next(stream) for stream in data_streams]
except:
break

if perf_counter() - tt > 120:
x = 1 / 0
#
# if perf_counter() - tt > 120:
# x = 1 / 0

# prepare the master
_master = self.get_master(index, data=fragments[master_index], one_piece=True)
Expand Down
Loading

0 comments on commit c76a0d1

Please sign in to comment.