From 27180505824de6dd1a27183391546c3de32e6ba2 Mon Sep 17 00:00:00 2001 From: Norbert Podhorszki Date: Fri, 3 Feb 2023 06:45:37 -0500 Subject: [PATCH] Added support for Joined Arrays in the BP4 format and engine. Declaration: auto var = outIO.DefineVariable(table", {adios2::JoinedDim, Ncols}, {}, {1, Ncols}); Setting block size: var.SetSelection({{}, {Nrows[block], Ncols}}); --- .../basics/joinedArray/joinedArray_write.cpp | 8 +- source/adios2/core/VariableBase.h | 2 + source/adios2/toolkit/format/bp/BPBase.cpp | 3 +- source/adios2/toolkit/format/bp/BPBase.h | 2 + source/adios2/toolkit/format/bp/BPBase.tcc | 42 +++- .../adios2/toolkit/format/bp/BPSerializer.cpp | 38 ++- .../toolkit/format/bp/bp3/BP3Deserializer.tcc | 59 +++-- .../toolkit/format/bp/bp4/BP4Deserializer.tcc | 172 +++++++++++-- .../toolkit/format/bp/bp4/BP4Serializer.cpp | 8 +- testing/adios2/engine/bp/CMakeLists.txt | 3 + .../adios2/engine/bp/TestBPJoinedArray.cpp | 226 ++++++++++++++++++ 11 files changed, 492 insertions(+), 71 deletions(-) create mode 100644 testing/adios2/engine/bp/TestBPJoinedArray.cpp diff --git a/examples/basics/joinedArray/joinedArray_write.cpp b/examples/basics/joinedArray/joinedArray_write.cpp index 15eef59ec9..fd34596b8f 100644 --- a/examples/basics/joinedArray/joinedArray_write.cpp +++ b/examples/basics/joinedArray/joinedArray_write.cpp @@ -49,7 +49,7 @@ int main(int argc, char *argv[]) MPI_Comm_rank(MPI_COMM_WORLD, &rank); MPI_Comm_size(MPI_COMM_WORLD, &nproc); #endif - const int NSTEPS = 5; + const int NSTEPS = 3; // generate different random numbers on each process, // but always the same sequence at each run @@ -73,6 +73,7 @@ int main(int argc, char *argv[]) // Get io settings from the config file or // create one with default settings here adios2::IO io = adios.DeclareIO("Output"); + io.SetEngine("BP4"); /* * Define joinable local array: type, name, global and local size @@ -82,6 +83,9 @@ int main(int argc, char *argv[]) adios2::Variable varTable = io.DefineVariable( "table", {adios2::JoinedDim, Ncols}, {}, {Nrows, Ncols}); + // adios2::Operator op = adios.DefineOperator("blosc", "blosc"); + // varTable.AddOperation(op, {{"clevel", std::to_string(1)}}); + // Open file. "w" means we overwrite any existing file on disk, // but Advance() will append steps to the same file. adios2::Engine writer = io.Open("joinedArray.bp", adios2::Mode::Write); @@ -95,7 +99,7 @@ int main(int argc, char *argv[]) for (unsigned int col = 0; col < Ncols; col++) { mytable[row * Ncols + col] = static_cast( - rank * 1.0 + row * 0.1 + col * 0.01); + step * 10.0 + rank * 1.0 + row * 0.1 + col * 0.01); } } diff --git a/source/adios2/core/VariableBase.h b/source/adios2/core/VariableBase.h index 1b29e958ff..a1576460d7 100644 --- a/source/adios2/core/VariableBase.h +++ b/source/adios2/core/VariableBase.h @@ -91,6 +91,8 @@ class VariableBase size_t m_StepsStart = 0; size_t m_StepsCount = 1; + size_t m_JoinedDimPos = 0; // the joined dimension in a JoinedArray + /** Index Metadata Position in a serial metadata buffer */ size_t m_IndexStart = 0; diff --git a/source/adios2/toolkit/format/bp/BPBase.cpp b/source/adios2/toolkit/format/bp/BPBase.cpp index 5a8b0d0efc..cde687b3af 100644 --- a/source/adios2/toolkit/format/bp/BPBase.cpp +++ b/source/adios2/toolkit/format/bp/BPBase.cpp @@ -482,7 +482,8 @@ BPBase::TransformTypeEnum(const std::string transformType) const noexcept template BPBase::Characteristics \ BPBase::ReadElementIndexCharacteristics(const std::vector &, \ size_t &, const BPBase::DataTypes, \ - const bool, const bool) const; + size_t &, const bool, const bool) \ + const; ADIOS2_FOREACH_STDTYPE_1ARG(declare_template_instantiation) #undef declare_template_instantiation diff --git a/source/adios2/toolkit/format/bp/BPBase.h b/source/adios2/toolkit/format/bp/BPBase.h index 8d35dd6c69..28a7e06b2c 100644 --- a/source/adios2/toolkit/format/bp/BPBase.h +++ b/source/adios2/toolkit/format/bp/BPBase.h @@ -621,6 +621,7 @@ class BPBase Characteristics ReadElementIndexCharacteristics(const std::vector &buffer, size_t &position, const DataTypes dataType, + size_t &joinedArrayShapePos, const bool untilTimeStep = false, const bool isLittleEndian = true) const; @@ -644,6 +645,7 @@ class BPBase const DataTypes dataType, const bool untilTimeStep, Characteristics &characteristics, + size_t &joinedArrayShapePos, const bool isLittleEndian = true) const; }; diff --git a/source/adios2/toolkit/format/bp/BPBase.tcc b/source/adios2/toolkit/format/bp/BPBase.tcc index 94c99d4793..5978f42d0c 100644 --- a/source/adios2/toolkit/format/bp/BPBase.tcc +++ b/source/adios2/toolkit/format/bp/BPBase.tcc @@ -25,7 +25,8 @@ namespace format template BPBase::Characteristics BPBase::ReadElementIndexCharacteristics( const std::vector &buffer, size_t &position, const DataTypes dataType, - const bool untilTimeStep, const bool isLittleEndian) const + size_t &joinedArrayShapePos, const bool untilTimeStep, + const bool isLittleEndian) const { Characteristics characteristics; characteristics.EntryCount = @@ -34,21 +35,21 @@ BPBase::Characteristics BPBase::ReadElementIndexCharacteristics( helper::ReadValue(buffer, position, isLittleEndian); ParseCharacteristics(buffer, position, dataType, untilTimeStep, - characteristics, isLittleEndian); + characteristics, joinedArrayShapePos, isLittleEndian); return characteristics; } // String specialization template <> -inline void -BPBase::ParseCharacteristics(const std::vector &buffer, size_t &position, - const DataTypes dataType, const bool untilTimeStep, - Characteristics &characteristics, - const bool isLittleEndian) const +inline void BPBase::ParseCharacteristics( + const std::vector &buffer, size_t &position, const DataTypes dataType, + const bool untilTimeStep, Characteristics &characteristics, + size_t &joinedArrayShapePos, const bool isLittleEndian) const { const size_t start = position; size_t localPosition = 0; + joinedArrayShapePos = 0; // irrelevant here bool foundTimeStep = false; @@ -195,12 +196,14 @@ inline void BPBase::ParseCharacteristics(const std::vector &buffer, const DataTypes /*dataType*/, const bool untilTimeStep, Characteristics &characteristics, + size_t &joinedArrayShapePos, const bool isLittleEndian) const { const size_t start = position; size_t localPosition = 0; bool foundTimeStep = false; + bool foundJoinedDim = false; size_t dimensionsSize = 0; // get it from dimensions characteristics while (localPosition < characteristics.EntryLength) @@ -356,9 +359,25 @@ inline void BPBase::ParseCharacteristics(const std::vector &buffer, static_cast(helper::ReadValue( buffer, position, isLittleEndian))); - characteristics.Shape.push_back( + uint64_t shape = static_cast(helper::ReadValue( - buffer, position, isLittleEndian))); + buffer, position, isLittleEndian)); + characteristics.Shape.push_back(shape); + + if (shape == JoinedDim) + { + if (foundJoinedDim) + { + helper::Throw( + "Toolkit", "format::bp::BPBase", + "ParseCharacteristics", + "Invalid Joined Array definition with multiple " + "JoinedDim in Shape."); + } + foundJoinedDim = true; + // this is the Joined Array Start value that must be updated + joinedArrayShapePos = position; + } characteristics.Start.push_back( static_cast(helper::ReadValue( @@ -395,9 +414,12 @@ inline void BPBase::ParseCharacteristics(const std::vector &buffer, characteristics.Count.clear(); characteristics.EntryShapeID = ShapeID::GlobalValue; } + else if (foundJoinedDim) + { + characteristics.EntryShapeID = ShapeID::JoinedArray; + } else { - // TODO joined dimension characteristics.EntryShapeID = ShapeID::GlobalArray; } diff --git a/source/adios2/toolkit/format/bp/BPSerializer.cpp b/source/adios2/toolkit/format/bp/BPSerializer.cpp index 434fda571c..7e06171dba 100644 --- a/source/adios2/toolkit/format/bp/BPSerializer.cpp +++ b/source/adios2/toolkit/format/bp/BPSerializer.cpp @@ -150,16 +150,25 @@ void BPSerializer::PutDimensionsRecord(const Dims &localDimensions, const Dims &offsets, std::vector &buffer) noexcept { - if (offsets.empty()) - { + if (offsets.empty() && globalDimensions.empty()) + { // local array for (const auto localDimension : localDimensions) { helper::InsertU64(buffer, localDimension); buffer.insert(buffer.end(), 2 * sizeof(uint64_t), '\0'); } } + else if (offsets.empty()) + { // joined array has no offsets but has global dims + for (unsigned int d = 0; d < localDimensions.size(); ++d) + { + helper::InsertU64(buffer, localDimensions[d]); + helper::InsertU64(buffer, globalDimensions[d]); + buffer.insert(buffer.end(), sizeof(uint64_t), '\0'); + } + } else - { + { // global array for (unsigned int d = 0; d < localDimensions.size(); ++d) { helper::InsertU64(buffer, localDimensions[d]); @@ -191,7 +200,7 @@ void BPSerializer::PutDimensionsRecord(const Dims &localDimensions, }; // BODY Starts here - if (offsets.empty()) + if (offsets.empty() && globalDimensions.empty()) { unsigned int globalBoundsSkip = 18; if (isCharacteristic) @@ -206,6 +215,19 @@ void BPSerializer::PutDimensionsRecord(const Dims &localDimensions, position += globalBoundsSkip; } } + else if (offsets.empty()) + { + // joined array has no offsets but has global dims + size_t zeroOffset = 0; + for (unsigned int d = 0; d < localDimensions.size(); ++d) + { + lf_CopyDimension(buffer, position, localDimensions[d], + isCharacteristic); + lf_CopyDimension(buffer, position, globalDimensions[d], + isCharacteristic); + lf_CopyDimension(buffer, position, zeroOffset, isCharacteristic); + } + } else { for (unsigned int d = 0; d < localDimensions.size(); ++d) @@ -376,8 +398,10 @@ void BPSerializer::MergeSerializeIndices( #define make_case(T) \ case (TypeTraits::type_enum): \ { \ + size_t irrelevant; \ const auto characteristics = ReadElementIndexCharacteristics( \ - buffer, position, TypeTraits::type_enum, true, isLittleEndian); \ + buffer, position, TypeTraits::type_enum, irrelevant, true, \ + isLittleEndian); \ count = characteristics.EntryCount; \ length = characteristics.EntryLength; \ timeStep = characteristics.Statistics.Step; \ @@ -388,9 +412,11 @@ void BPSerializer::MergeSerializeIndices( case (type_string_array): { + size_t irrelevant; const auto characteristics = ReadElementIndexCharacteristics( - buffer, position, type_string_array, true, isLittleEndian); + buffer, position, type_string_array, irrelevant, true, + isLittleEndian); count = characteristics.EntryCount; length = characteristics.EntryLength; timeStep = characteristics.Statistics.Step; diff --git a/source/adios2/toolkit/format/bp/bp3/BP3Deserializer.tcc b/source/adios2/toolkit/format/bp/bp3/BP3Deserializer.tcc index 56dfdcd80c..c07829db6b 100644 --- a/source/adios2/toolkit/format/bp/bp3/BP3Deserializer.tcc +++ b/source/adios2/toolkit/format/bp/bp3/BP3Deserializer.tcc @@ -39,11 +39,12 @@ void BP3Deserializer::GetSyncVariableDataFromStream(core::Variable &variable, auto &buffer = bufferSTL.m_Buffer; size_t position = itStep->second.front(); + size_t irrelevant; const Characteristics characteristics = ReadElementIndexCharacteristics(buffer, position, - TypeTraits::type_enum, false, - m_Minifooter.IsLittleEndian); + TypeTraits::type_enum, irrelevant, + false, m_Minifooter.IsLittleEndian); const size_t payloadOffset = characteristics.Statistics.PayloadOffset; variable.m_Data = reinterpret_cast(&buffer[payloadOffset]); @@ -193,11 +194,12 @@ void BP3Deserializer::SetVariableBlockInfo( const std::vector &buffer = bufferSTL.m_Buffer; size_t position = blockIndexOffset; + size_t irrelevant; const Characteristics blockCharacteristics = - ReadElementIndexCharacteristics(buffer, position, - TypeTraits::type_enum, false, - m_Minifooter.IsLittleEndian); + ReadElementIndexCharacteristics( + buffer, position, TypeTraits::type_enum, irrelevant, false, + m_Minifooter.IsLittleEndian); // check if they intersect helper::SubStreamBoxInfo subStreamInfo; @@ -311,11 +313,12 @@ void BP3Deserializer::SetVariableBlockInfo( const std::vector &buffer = bufferSTL.m_Buffer; size_t position = blockIndexOffset; + size_t irrelevant; const Characteristics blockCharacteristics = - ReadElementIndexCharacteristics(buffer, position, - TypeTraits::type_enum, false, - m_Minifooter.IsLittleEndian); + ReadElementIndexCharacteristics( + buffer, position, TypeTraits::type_enum, irrelevant, false, + m_Minifooter.IsLittleEndian); // check if they intersect helper::SubStreamBoxInfo subStreamInfo; @@ -486,10 +489,11 @@ void BP3Deserializer::GetValueFromMetadata(core::Variable &variable, for (size_t b = blocksStart; b < blocksStart + blocksCount; ++b) { size_t localPosition = positions[b]; + size_t irrelevant; const Characteristics characteristics = - ReadElementIndexCharacteristics(buffer, localPosition, - type_string, false, - m_Minifooter.IsLittleEndian); + ReadElementIndexCharacteristics( + buffer, localPosition, type_string, irrelevant, false, + m_Minifooter.IsLittleEndian); data[dataCounter] = characteristics.Statistics.Value; ++dataCounter; @@ -753,11 +757,12 @@ inline void BP3Deserializer::DefineVariableInEngineIO( const std::vector &buffer, size_t position) const { const size_t initialPosition = position; + size_t irrelevant; const Characteristics characteristics = ReadElementIndexCharacteristics( - buffer, position, static_cast(header.DataType), false, - m_Minifooter.IsLittleEndian); + buffer, position, static_cast(header.DataType), + irrelevant, false, m_Minifooter.IsLittleEndian); const std::string variableName = header.Path.empty() ? header.Name @@ -807,12 +812,13 @@ inline void BP3Deserializer::DefineVariableInEngineIO( while (position < endPosition) { const size_t subsetPosition = position; + size_t irrelevant; // read until step is found const Characteristics subsetCharacteristics = ReadElementIndexCharacteristics( buffer, position, static_cast(header.DataType), - false, m_Minifooter.IsLittleEndian); + irrelevant, false, m_Minifooter.IsLittleEndian); const bool isNextStep = stepsFound.insert(subsetCharacteristics.Statistics.Step).second; @@ -862,11 +868,12 @@ void BP3Deserializer::DefineVariableInEngineIO(const ElementIndexHeader &header, size_t position) const { const size_t initialPosition = position; + size_t irrelevant; const Characteristics characteristics = ReadElementIndexCharacteristics( - buffer, position, static_cast(header.DataType), false, - m_Minifooter.IsLittleEndian); + buffer, position, static_cast(header.DataType), + irrelevant, false, m_Minifooter.IsLittleEndian); const std::string variableName = header.Path.empty() ? header.Name @@ -951,12 +958,13 @@ void BP3Deserializer::DefineVariableInEngineIO(const ElementIndexHeader &header, while (position < endPosition) { const size_t subsetPosition = position; + size_t irrelevant; // read until step is found const Characteristics subsetCharacteristics = ReadElementIndexCharacteristics( buffer, position, static_cast(header.DataType), - false, m_Minifooter.IsLittleEndian); + irrelevant, false, m_Minifooter.IsLittleEndian); const T blockMin = characteristics.Statistics.IsValue ? subsetCharacteristics.Statistics.Value @@ -1038,10 +1046,11 @@ void BP3Deserializer::DefineAttributeInEngineIO( const ElementIndexHeader &header, core::Engine &engine, const std::vector &buffer, size_t position) const { + size_t irrelevant; const Characteristics characteristics = ReadElementIndexCharacteristics( - buffer, position, static_cast(header.DataType), false, - m_Minifooter.IsLittleEndian); + buffer, position, static_cast(header.DataType), + irrelevant, false, m_Minifooter.IsLittleEndian); std::string attributeName(header.Name); if (!header.Path.empty()) @@ -1108,10 +1117,11 @@ BP3Deserializer::GetSubFileInfo(const core::Variable &variable) const // blockPosition gets updated by Read, can't be const for (size_t blockPosition : blockStarts) { + size_t irrelevant; const Characteristics blockCharacteristics = ReadElementIndexCharacteristics( - buffer, blockPosition, TypeTraits::type_enum, false, - m_Minifooter.IsLittleEndian); + buffer, blockPosition, TypeTraits::type_enum, irrelevant, + false, m_Minifooter.IsLittleEndian); // check if they intersect helper::SubFileInfo info; @@ -1165,11 +1175,12 @@ BP3Deserializer::BlocksInfoCommon( for (const size_t blockIndexOffset : blocksIndexOffsets) { size_t position = blockIndexOffset; + size_t irrelevant; const Characteristics blockCharacteristics = - ReadElementIndexCharacteristics(m_Metadata.m_Buffer, position, - TypeTraits::type_enum, false, - m_Minifooter.IsLittleEndian); + ReadElementIndexCharacteristics( + m_Metadata.m_Buffer, position, TypeTraits::type_enum, + irrelevant, false, m_Minifooter.IsLittleEndian); typename core::Variable::BPInfo blockInfo; blockInfo.Shape = blockCharacteristics.Shape; diff --git a/source/adios2/toolkit/format/bp/bp4/BP4Deserializer.tcc b/source/adios2/toolkit/format/bp/bp4/BP4Deserializer.tcc index c39052cd48..cace837be8 100644 --- a/source/adios2/toolkit/format/bp/bp4/BP4Deserializer.tcc +++ b/source/adios2/toolkit/format/bp/bp4/BP4Deserializer.tcc @@ -43,10 +43,11 @@ void BP4Deserializer::GetSyncVariableDataFromStream(core::Variable &variable, auto &buffer = bufferSTL.m_Buffer; size_t position = itStep->second.front(); + size_t irrelevant; const Characteristics characteristics = ReadElementIndexCharacteristics(buffer, position, - TypeTraits::type_enum, false, - m_Minifooter.IsLittleEndian); + TypeTraits::type_enum, irrelevant, + false, m_Minifooter.IsLittleEndian); const size_t payloadOffset = characteristics.Statistics.PayloadOffset; variable.m_Data = reinterpret_cast(&buffer[payloadOffset]); @@ -196,11 +197,12 @@ void BP4Deserializer::SetVariableBlockInfo( const std::vector &buffer = bufferSTL.m_Buffer; size_t position = blockIndexOffset; + size_t irrelevant; const Characteristics blockCharacteristics = - ReadElementIndexCharacteristics(buffer, position, - TypeTraits::type_enum, false, - m_Minifooter.IsLittleEndian); + ReadElementIndexCharacteristics( + buffer, position, TypeTraits::type_enum, irrelevant, false, + m_Minifooter.IsLittleEndian); // check if they intersect helper::SubStreamBoxInfo subStreamInfo; @@ -314,11 +316,12 @@ void BP4Deserializer::SetVariableBlockInfo( const std::vector &buffer = bufferSTL.m_Buffer; size_t position = blockIndexOffset; + size_t irrelevant; const Characteristics blockCharacteristics = - ReadElementIndexCharacteristics(buffer, position, - TypeTraits::type_enum, false, - m_Minifooter.IsLittleEndian); + ReadElementIndexCharacteristics( + buffer, position, TypeTraits::type_enum, irrelevant, false, + m_Minifooter.IsLittleEndian); // check if they intersect helper::SubStreamBoxInfo subStreamInfo; @@ -487,13 +490,14 @@ void BP4Deserializer::GetValueFromMetadata(core::Variable &variable, variable.m_Name + ", in call to Get"); } + size_t irrelevant; for (size_t b = blocksStart; b < blocksStart + blocksCount; ++b) { size_t localPosition = positions[b]; const Characteristics characteristics = - ReadElementIndexCharacteristics(buffer, localPosition, - type_string, false, - m_Minifooter.IsLittleEndian); + ReadElementIndexCharacteristics( + buffer, localPosition, type_string, irrelevant, false, + m_Minifooter.IsLittleEndian); data[dataCounter] = characteristics.Statistics.Value; ++dataCounter; @@ -714,11 +718,12 @@ inline void BP4Deserializer::DefineVariableInEngineIOPerStep( const std::vector &buffer, size_t position, size_t step) const { const size_t initialPosition = position; + size_t irrelevant; const Characteristics characteristics = ReadElementIndexCharacteristics( - buffer, position, static_cast(header.DataType), false, - m_Minifooter.IsLittleEndian); + buffer, position, static_cast(header.DataType), + irrelevant, false, m_Minifooter.IsLittleEndian); const std::string variableName = header.Path.empty() ? header.Name @@ -741,12 +746,13 @@ inline void BP4Deserializer::DefineVariableInEngineIOPerStep( while (position < endPositionCurrentStep) { const size_t subsetPosition = position; + size_t irrelevant; // read until step is found const Characteristics subsetCharacteristics = ReadElementIndexCharacteristics( buffer, position, static_cast(header.DataType), - false, m_Minifooter.IsLittleEndian); + irrelevant, false, m_Minifooter.IsLittleEndian); if (subsetCharacteristics.EntryShapeID == ShapeID::LocalValue) { @@ -812,12 +818,13 @@ inline void BP4Deserializer::DefineVariableInEngineIOPerStep( while (position < endPosition) { const size_t subsetPosition = position; + size_t irrelevant; // read until step is found const Characteristics subsetCharacteristics = ReadElementIndexCharacteristics( buffer, position, static_cast(header.DataType), - false, m_Minifooter.IsLittleEndian); + irrelevant, false, m_Minifooter.IsLittleEndian); const bool isNextStep = stepsFound.insert(subsetCharacteristics.Statistics.Step).second; @@ -867,11 +874,12 @@ void BP4Deserializer::DefineVariableInEngineIOPerStep( const std::vector &buffer, size_t position, size_t step) const { const size_t initialPosition = position; + size_t irrelevant; const Characteristics characteristics = ReadElementIndexCharacteristics( - buffer, position, static_cast(header.DataType), false, - m_Minifooter.IsLittleEndian); + buffer, position, static_cast(header.DataType), + irrelevant, false, m_Minifooter.IsLittleEndian); const std::string variableName = header.Path.empty() ? header.Name @@ -897,12 +905,14 @@ void BP4Deserializer::DefineVariableInEngineIOPerStep( while (position < endPositionCurrentStep) { const size_t subsetPosition = position; + size_t joinedArrayStartValuePos; // read until step is found const Characteristics subsetCharacteristics = ReadElementIndexCharacteristics( buffer, position, static_cast(header.DataType), - false, m_Minifooter.IsLittleEndian); + joinedArrayStartValuePos, false, + m_Minifooter.IsLittleEndian); const T blockMin = characteristics.Statistics.IsValue ? subsetCharacteristics.Statistics.Value @@ -935,6 +945,46 @@ void BP4Deserializer::DefineVariableInEngineIOPerStep( ++variable->m_Count[0]; } } + else if (subsetCharacteristics.EntryShapeID == ShapeID::JoinedArray) + { + Dims shape = m_ReverseDimensions + ? Dims(subsetCharacteristics.Shape.rbegin(), + subsetCharacteristics.Shape.rend()) + : subsetCharacteristics.Shape; + const Dims count = + m_ReverseDimensions + ? Dims(subsetCharacteristics.Count.rbegin(), + subsetCharacteristics.Count.rend()) + : subsetCharacteristics.Count; + uint64_t newStart; + if (variable->m_AvailableShapes[step].empty()) + { + shape[variable->m_JoinedDimPos] = + count[variable->m_JoinedDimPos]; + newStart = 0; + } + else + { + newStart = static_cast( + variable->m_AvailableShapes[step] + [variable->m_JoinedDimPos]); + + // shape increases with each block + shape[variable->m_JoinedDimPos] = + variable->m_AvailableShapes[step] + [variable->m_JoinedDimPos] + + count[variable->m_JoinedDimPos]; + } + // big hack: modify the metada in place, update the Start[i] + // of the block + char *src = reinterpret_cast(&newStart); + char *dst = + const_cast(&buffer[joinedArrayStartValuePos]); + memcpy(dst, src, sizeof(uint64_t)); + + variable->m_Shape = shape; + variable->m_AvailableShapes[step] = shape; + } else if (subsetCharacteristics.EntryShapeID == ShapeID::GlobalArray) { // Shape definition is by the last block now, not the first @@ -977,6 +1027,33 @@ void BP4Deserializer::DefineVariableInEngineIOPerStep( variable->m_Shape; break; } + case (ShapeID::JoinedArray): + { + Dims shape = m_ReverseDimensions + ? Dims(characteristics.Shape.rbegin(), + characteristics.Shape.rend()) + : characteristics.Shape; + const Dims count = m_ReverseDimensions + ? Dims(characteristics.Count.rbegin(), + characteristics.Count.rend()) + : characteristics.Count; + size_t joinedDimPos = 0; + for (size_t i = 0; i < shape.size(); ++i) + { + if (shape[i] == JoinedDim) + { + joinedDimPos = i; + shape[i] = 0; // will increase with each block + } + } + + variable = &engine.m_IO.DefineVariable( + variableName, shape, Dims(shape.size(), 0), shape); + variable->m_AvailableShapes[characteristics.Statistics.Step] = + variable->m_Shape; + variable->m_JoinedDimPos = joinedDimPos; + break; + } case (ShapeID::LocalValue): { variable = @@ -1032,12 +1109,13 @@ void BP4Deserializer::DefineVariableInEngineIOPerStep( while (position < endPosition) { const size_t subsetPosition = position; + size_t joinedArrayStartValuePos; // read until step is found const Characteristics subsetCharacteristics = ReadElementIndexCharacteristics( buffer, position, static_cast(header.DataType), - false, m_Minifooter.IsLittleEndian); + joinedArrayStartValuePos, false, m_Minifooter.IsLittleEndian); const T blockMin = characteristics.Statistics.IsValue ? subsetCharacteristics.Statistics.Value @@ -1069,6 +1147,45 @@ void BP4Deserializer::DefineVariableInEngineIOPerStep( } } + if (subsetCharacteristics.EntryShapeID == ShapeID::JoinedArray) + { + Dims shape = m_ReverseDimensions + ? Dims(subsetCharacteristics.Shape.rbegin(), + subsetCharacteristics.Shape.rend()) + : subsetCharacteristics.Shape; + const Dims count = m_ReverseDimensions + ? Dims(subsetCharacteristics.Count.rbegin(), + subsetCharacteristics.Count.rend()) + : subsetCharacteristics.Count; + + uint64_t newStart; + if (isNextStep) + { + shape[variable->m_JoinedDimPos] = + count[variable->m_JoinedDimPos]; + newStart = 0; + } + else + { + newStart = static_cast( + variable->m_AvailableShapes[currentStep] + [variable->m_JoinedDimPos]); + // shape increase with each block + shape[variable->m_JoinedDimPos] = + variable->m_AvailableShapes[currentStep] + [variable->m_JoinedDimPos] + + count[variable->m_JoinedDimPos]; + } + // big hack: modify the metada in place, update the Start[i] + // of the block. + char *src = reinterpret_cast(&newStart); + char *dst = const_cast(&buffer[joinedArrayStartValuePos]); + memcpy(dst, src, sizeof(uint64_t)); + + variable->m_Shape = shape; + variable->m_AvailableShapes[currentStep] = shape; + } + // Shape definition is by the last block now, not the first block if (subsetCharacteristics.EntryShapeID == ShapeID::GlobalArray) { @@ -1120,10 +1237,11 @@ void BP4Deserializer::DefineAttributeInEngineIO( const ElementIndexHeader &header, core::Engine &engine, const std::vector &buffer, size_t position) const { + size_t irrelevant; const Characteristics characteristics = ReadElementIndexCharacteristics( - buffer, position, static_cast(header.DataType), false, - m_Minifooter.IsLittleEndian); + buffer, position, static_cast(header.DataType), + irrelevant, false, m_Minifooter.IsLittleEndian); std::string attributeName(header.Name); if (!header.Path.empty()) @@ -1189,10 +1307,11 @@ BP4Deserializer::GetSubFileInfo(const core::Variable &variable) const // blockPosition gets updated by Read, can't be const for (size_t blockPosition : blockStarts) { + size_t irrelevant; const Characteristics blockCharacteristics = ReadElementIndexCharacteristics( - buffer, blockPosition, TypeTraits::type_enum, false, - m_Minifooter.IsLittleEndian); + buffer, blockPosition, TypeTraits::type_enum, irrelevant, + false, m_Minifooter.IsLittleEndian); // check if they intersect helper::SubFileInfo info; @@ -1245,11 +1364,12 @@ BP4Deserializer::BlocksInfoCommon( for (const size_t blockIndexOffset : blocksIndexOffsets) { size_t position = blockIndexOffset; + size_t irrelevant; const Characteristics blockCharacteristics = - ReadElementIndexCharacteristics(m_Metadata.m_Buffer, position, - TypeTraits::type_enum, false, - m_Minifooter.IsLittleEndian); + ReadElementIndexCharacteristics( + m_Metadata.m_Buffer, position, TypeTraits::type_enum, + irrelevant, false, m_Minifooter.IsLittleEndian); typename core::Variable::BPInfo blockInfo; blockInfo.Shape = blockCharacteristics.Shape; diff --git a/source/adios2/toolkit/format/bp/bp4/BP4Serializer.cpp b/source/adios2/toolkit/format/bp/bp4/BP4Serializer.cpp index 1e85fe3e3c..5fe9938041 100644 --- a/source/adios2/toolkit/format/bp/bp4/BP4Serializer.cpp +++ b/source/adios2/toolkit/format/bp/bp4/BP4Serializer.cpp @@ -709,8 +709,10 @@ void BP4Serializer::AggregateCollectiveMetadataIndices(helper::Comm const &comm, #define make_case(T) \ case (TypeTraits::type_enum): \ { \ + size_t irrelevant; \ const auto characteristics = ReadElementIndexCharacteristics( \ - buffer, position, TypeTraits::type_enum, true, isLittleEndian); \ + buffer, position, TypeTraits::type_enum, irrelevant, true, \ + isLittleEndian); \ count = characteristics.EntryCount; \ length = characteristics.EntryLength; \ timeStep = characteristics.Statistics.Step; \ @@ -721,9 +723,11 @@ void BP4Serializer::AggregateCollectiveMetadataIndices(helper::Comm const &comm, case (type_string_array): { + size_t irrelevant; const auto characteristics = ReadElementIndexCharacteristics( - buffer, position, type_string_array, true, isLittleEndian); + buffer, position, type_string_array, irrelevant, true, + isLittleEndian); count = characteristics.EntryCount; length = characteristics.EntryLength; timeStep = characteristics.Statistics.Step; diff --git a/testing/adios2/engine/bp/CMakeLists.txt b/testing/adios2/engine/bp/CMakeLists.txt index fb3263bacf..c73cf346d9 100644 --- a/testing/adios2/engine/bp/CMakeLists.txt +++ b/testing/adios2/engine/bp/CMakeLists.txt @@ -191,6 +191,9 @@ gtest_add_tests_helper(StepsInSituGlobalArray MPI_ALLOW BP Engine.BP. .BP4 gtest_add_tests_helper(StepsInSituLocalArray MPI_ALLOW BP Engine.BP. .BP4 WORKING_DIRECTORY ${BP4_DIR} EXTRA_ARGS "BP4" ) +gtest_add_tests_helper(JoinedArray MPI_ALLOW BP Engine.BP. .BP4 + WORKING_DIRECTORY ${BP4_DIR} EXTRA_ARGS "BP4" +) gtest_add_tests_helper(InquireVariableException MPI_ALLOW BP Engine.BP. .BP4 WORKING_DIRECTORY ${BP4_DIR} diff --git a/testing/adios2/engine/bp/TestBPJoinedArray.cpp b/testing/adios2/engine/bp/TestBPJoinedArray.cpp new file mode 100644 index 0000000000..c3685120bb --- /dev/null +++ b/testing/adios2/engine/bp/TestBPJoinedArray.cpp @@ -0,0 +1,226 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + * + * TestBPJoinedArray.cpp : + * + * Created on: Dec 17, 2018 + * Author: Norbert Podhorszki, Keichi Takahashi + */ + +#include +#include + +#include +#include + +#include + +#include + +#include "../SmallTestData.h" + +std::string engineName; // comes from command line + +class BPJoinedArray : public ::testing::Test +{ +public: + BPJoinedArray() = default; + + SmallTestData m_TestData; +}; + +TEST_F(BPJoinedArray, MultiBlock) +{ + // Write multiple blocks per process + // Change number of rows per block and per process + // Change total number of rows in each step + // Write two variables to ensure both will end up with the same order of + // rows in reading + + const int nsteps = 3; + const size_t Ncols = 4; + const std::vector nblocksPerProcess = {2, 3, 2, 1, 3, 2}; + + int rank = 0, nproc = 1; + +#if ADIOS2_USE_MPI + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + MPI_Comm_size(MPI_COMM_WORLD, &nproc); + adios2::ADIOS adios(MPI_COMM_WORLD); + const int nblocks = (rank < static_cast(nblocksPerProcess.size()) + ? nblocksPerProcess[rank] + : 1); +#else + adios2::ADIOS adios; + const int nblocks = nblocksPerProcess[0]; +#endif + + const std::string fname = + "BPJoinedArrayMultiblock_nproc_" + std::to_string(nproc) + ".bp"; + int nMyTotalRows[nsteps]; + int nTotalRows[nsteps]; + + // Writer + { + adios2::IO outIO = adios.DeclareIO("Output"); + + if (!engineName.empty()) + { + outIO.SetEngine(engineName); + } + + adios2::Engine writer = outIO.Open(fname, adios2::Mode::Write); + auto var = outIO.DefineVariable( + "table", {adios2::JoinedDim, Ncols}, {}, {1, Ncols}); + + if (!rank) + { + std::cout << "Writing to " << fname << std::endl; + } + + for (int step = 0; step < nsteps; step++) + { + // Application variables for output random size per process, 5..10 + // each + std::vector Nrows; + nMyTotalRows[step] = 0; + for (int i = 0; i < nblocks; ++i) + { + int n = rand() % 6 + 5; + Nrows.push_back(static_cast(n)); + nMyTotalRows[step] += n; + } + + nTotalRows[step] = nMyTotalRows[step]; +#if ADIOS2_USE_MPI + MPI_Allreduce(&(nMyTotalRows[step]), &(nTotalRows[step]), 1, + MPI_INT, MPI_SUM, MPI_COMM_WORLD); +#endif + + if (!rank) + { + std::cout << "Writing " << nTotalRows[step] << " rows in step " + << step << std::endl; + } + + writer.BeginStep(); + for (int block = 0; block < nblocks; ++block) + { + std::vector mytable(Nrows[block] * Ncols); + for (size_t row = 0; row < Nrows[block]; row++) + { + for (size_t col = 0; col < Ncols; col++) + { + mytable[row * Ncols + col] = static_cast( + (step + 1) * 1.0 + rank * 0.1 + block * 0.01 + + row * 0.001 + col * 0.0001); + } + } + + var.SetSelection({{}, {Nrows[block], Ncols}}); + + std::cout << "Step " << step << " rank " << rank << " block " + << block << " count (" << var.Count()[0] << ", " + << var.Count()[1] << ")" << std::endl; + + writer.Put(var, mytable.data(), adios2::Mode::Sync); + } + writer.EndStep(); + } + writer.Close(); + } + + // Reader with streaming + { + adios2::IO inIO = adios.DeclareIO("Input"); + + if (!engineName.empty()) + { + inIO.SetEngine(engineName); + } + adios2::Engine reader = inIO.Open(fname, adios2::Mode::Read); + + if (!rank) + { + std::cout << "Reading as stream with BeginStep/EndStep:" + << std::endl; + } + + int step = 0; + while (true) + { + adios2::StepStatus status = + reader.BeginStep(adios2::StepMode::Read); + + if (status != adios2::StepStatus::OK) + { + break; + } + + auto var = inIO.InquireVariable("table"); + EXPECT_TRUE(var); + + if (!rank) + { + std::cout << "Step " << step << " table shape (" + << var.Shape()[0] << ", " << var.Shape()[1] << ")" + << std::endl; + } + + size_t Nrows = static_cast(nTotalRows[step]); + EXPECT_EQ(var.Shape()[0], Nrows); + EXPECT_EQ(var.Shape()[1], Ncols); + + var.SetSelection({{0, 0}, {Nrows, Ncols}}); + + // Check data on rank 0 + if (!rank) + { + std::vector data(Nrows * Ncols); + reader.Get(var, data.data()); + reader.PerformGets(); + for (size_t i = 0; i < Nrows; ++i) + { + for (size_t j = 0; j < Ncols; ++j) + { + EXPECT_GE(data[i * Ncols + j], (step + 1) * 1.0); + EXPECT_LT(data[i * Ncols + j], + (nsteps + 1) * 1.0 + 0.9999); + } + } + } + + reader.EndStep(); + ++step; + } + reader.Close(); + EXPECT_EQ(step, nsteps); + } +} + +int main(int argc, char **argv) +{ +#if ADIOS2_USE_MPI + int provided; + + // MPI_THREAD_MULTIPLE is only required if you enable the SST MPI_DP + MPI_Init_thread(nullptr, nullptr, MPI_THREAD_MULTIPLE, &provided); +#endif + + int result; + ::testing::InitGoogleTest(&argc, argv); + + if (argc > 1) + { + engineName = std::string(argv[1]); + } + + result = RUN_ALL_TESTS(); + +#if ADIOS2_USE_MPI + MPI_Finalize(); +#endif + + return result; +}