From d51ed312270d07f33650787c7f16c6cfc08e6afb Mon Sep 17 00:00:00 2001 From: pnorbert Date: Fri, 30 Aug 2024 11:02:19 -0400 Subject: [PATCH] Fix in python API: (#4331) * Fix in python API: a. BlocksInfo(name, step) b. stream.write() sets selection if either 'start' or 'count' is provided (fix for local arrays). * format * black (cherry picked from commit fa5d3aa780952f07ca1da5ff64cb94b5be7d74dd) --- bindings/Python/py11Engine.cpp | 3 +- python/adios2/stream.py | 2 +- .../TestBPWriteTypesHighLevelAPILocal.py | 106 ++++++++++++++---- 3 files changed, 89 insertions(+), 22 deletions(-) diff --git a/bindings/Python/py11Engine.cpp b/bindings/Python/py11Engine.cpp index 8787bd1419..090c18f6d9 100644 --- a/bindings/Python/py11Engine.cpp +++ b/bindings/Python/py11Engine.cpp @@ -280,7 +280,8 @@ std::vector> Engine::BlocksInfo(std::string & MinVarInfo *minBlocksInfo = nullptr; auto Variable = itVariable->second.get(); - minBlocksInfo = m_Engine->MinBlocksInfo(*Variable, 0); + size_t s = (m_Engine->m_OpenMode == Mode::ReadRandomAccess ? step : 0); + minBlocksInfo = m_Engine->MinBlocksInfo(*Variable, s); if (minBlocksInfo) { for (auto &info : minBlocksInfo->BlocksInfo) diff --git a/python/adios2/stream.py b/python/adios2/stream.py index 58a9c09050..840e4883ea 100644 --- a/python/adios2/stream.py +++ b/python/adios2/stream.py @@ -321,7 +321,7 @@ def _(self, name, content, shape=[], start=[], count=[], operations=None): if shape != [] and not variable.single_value(): variable.set_shape(shape) - if start != [] and count != []: + if start != [] or count != []: variable.set_selection([start, count]) if operations: diff --git a/testing/adios2/python/TestBPWriteTypesHighLevelAPILocal.py b/testing/adios2/python/TestBPWriteTypesHighLevelAPILocal.py index d4eafcb259..7893e9b766 100644 --- a/testing/adios2/python/TestBPWriteTypesHighLevelAPILocal.py +++ b/testing/adios2/python/TestBPWriteTypesHighLevelAPILocal.py @@ -26,19 +26,30 @@ def check_array(np1, np2, hint): # Test data data = SmallTestData(rank) -nx = data.Nx - -shape = [] -start = [] -count = [nx] - # Writer with Stream("types_np_local.bp", "w", comm=comm) as s: - for step in s.steps(5): - data.update(rank, step.current_step(), size) + for _ in s.steps(5): + step = s.current_step() + nx = data.Nx - rank - step + if nx < 1: + nx = data.Nx + shape = [] + start = [] + count = [nx] + + if step == 0 and size > 1 and rank == size - 1: + continue + + data.update(rank, step, size) s.write("varI8", data.i8, shape, start, count) s.write("varI16", data.i16, shape, start, count) + v = s.inquire_variable("varI16") + print( + f"step {step} rank {rank} nx {nx} count = {v.count()} data.I16 = {data.i16[:nx]}", + flush=True, + ) + s.write("varI32", data.i32, shape, start, count) s.write("varI64", data.i64, shape, start, count) s.write("varU8", data.u8, shape, start, count) @@ -53,9 +64,21 @@ def check_array(np1, np2, hint): # Reader data = SmallTestData(rank) +if rank == 0: + print("====================================", flush=True) + print("Read data block by block", flush=True) +comm.Barrier() + with Stream("types_np_local.bp", "r", comm=comm) as s: for fr_step in s.steps(): step = fr_step.current_step() + nx = data.Nx - rank - step + if nx < 1: + nx = data.Nx + + if step == 0 and size > 1 and rank == size - 1: + continue + data.update(rank, step, size) indataI8 = fr_step.read("varI8", block_id=rank) indataI16 = fr_step.read("varI16", block_id=rank) @@ -70,15 +93,58 @@ def check_array(np1, np2, hint): in_int_list = fr_step.read("an_int_list", block_id=rank) - print(f"step {step} rank {rank} I16={indataI16} data.I16 = {data.i16}", flush=True) - - check_array(indataI8, data.i8, "i8") - check_array(indataI16, data.i16, "i16") - check_array(indataI32, data.i32, "i32") - check_array(indataI64, data.i64, "i64") - check_array(indataU8, data.u8, "u8") - check_array(indataU16, data.u16, "u16") - check_array(indataU32, data.u32, "u32") - check_array(indataU64, data.u64, "u64") - check_array(indataR32, data.r32, "r32") - check_array(indataR64, data.r64, "r64") + print( + f"step {step} rank {rank} nx {nx} I16={indataI16} data.I16 = {data.i16[:nx]}", + flush=True, + ) + + check_array(indataI8, data.i8[:nx], "i8") + check_array(indataI16, data.i16[:nx], "i16") + check_array(indataI32, data.i32[:nx], "i32") + check_array(indataI64, data.i64[:nx], "i64") + check_array(indataU8, data.u8[:nx], "u8") + check_array(indataU16, data.u16[:nx], "u16") + check_array(indataU32, data.u32[:nx], "u32") + check_array(indataU64, data.u64[:nx], "u64") + check_array(indataR32, data.r32[:nx], "r32") + check_array(indataR64, data.r64[:nx], "r64") + +if rank == 0: + print("====================================", flush=True) + print("Test blocks_info", flush=True) + with Stream("types_np_local.bp", "rra") as f: + v = f.inquire_variable("varI16") + for step in range(v.steps()): + bis = f.engine.blocks_info("varI16", step) + expected_num_blocks = size + if step == 0 and size > 1: + expected_num_blocks = size - 1 + print( + f"step {step} rank {rank} number of blocks expected = {expected_num_blocks}" + f", reported = {len(bis)}", + flush=True, + ) + if len(bis) != expected_num_blocks: + raise ValueError( + f"Expected number of blocks in step {step} " + f"is {expected_num_blocks} but we found " + f"only {len(bis)} blocks." + ) + + for r in range(size): + nx = data.Nx - r - step + if nx < 1: + nx = data.Nx + if step == 0 and size > 1 and r == size - 1: + continue + cnt = int(bis[r]["Count"]) + print(f" block {r} size expected = {nx}, reported = {cnt}", flush=True) + if nx != cnt: + raise ValueError( + f"Expected size of block in step {step}, rank {r} " + f"is {nx} but we got reported to have " + f"{cnt} elements." + ) + + +comm.Barrier()