From c10c651e788bf6cb071922227ee6670a715e066c Mon Sep 17 00:00:00 2001 From: Norbert Podhorszki Date: Fri, 28 Jan 2022 15:32:33 -0500 Subject: [PATCH 1/2] BP5: Ability to read back BP5 files where the number of writers changes over time. --- source/adios2/engine/bp5/BP5Reader.cpp | 13 +- source/adios2/engine/sst/SstReader.cpp | 9 +- .../toolkit/format/bp5/BP5Deserializer.cpp | 116 +++++++++++++----- .../toolkit/format/bp5/BP5Deserializer.h | 22 ++-- 4 files changed, 111 insertions(+), 49 deletions(-) diff --git a/source/adios2/engine/bp5/BP5Reader.cpp b/source/adios2/engine/bp5/BP5Reader.cpp index 96eda6e064..a22e73a2ed 100644 --- a/source/adios2/engine/bp5/BP5Reader.cpp +++ b/source/adios2/engine/bp5/BP5Reader.cpp @@ -136,7 +136,9 @@ StepStatus BP5Reader::BeginStep(StepMode mode, const float timeoutSeconds) // i++; // } - m_BP5Deserializer->SetupForTimestep(m_CurrentStep); + m_BP5Deserializer->SetupForStep( + m_CurrentStep, + m_WriterMap[m_WriterMapIndex[m_CurrentStep]].WriterCount); InstallMetadataForTimestep(m_CurrentStep); m_IO.ResetVariablesStepSelection(false, @@ -574,9 +576,9 @@ void BP5Reader::InitBuffer(const TimePoint &timeoutInstant, // now we are sure the index header has been parsed, first step parsing // done - m_BP5Deserializer = new format::BP5Deserializer( - m_WriterMap[0].WriterCount, m_WriterIsRowMajor, m_ReaderIsRowMajor, - (m_OpenMode == Mode::ReadRandomAccess)); + m_BP5Deserializer = + new format::BP5Deserializer(m_WriterIsRowMajor, m_ReaderIsRowMajor, + (m_OpenMode == Mode::ReadRandomAccess)); m_BP5Deserializer->m_Engine = this; InstallMetaMetaData(m_MetaMetadata); @@ -587,6 +589,9 @@ void BP5Reader::InitBuffer(const TimePoint &timeoutInstant, { for (size_t Step = 0; Step < m_MetadataIndexTable.size(); Step++) { + m_BP5Deserializer->SetupForStep( + Step, + m_WriterMap[m_WriterMapIndex[Step]].WriterCount); InstallMetadataForTimestep(Step); } } diff --git a/source/adios2/engine/sst/SstReader.cpp b/source/adios2/engine/sst/SstReader.cpp 
index 9bf9b47b36..932dfb90af 100644 --- a/source/adios2/engine/sst/SstReader.cpp +++ b/source/adios2/engine/sst/SstReader.cpp @@ -320,9 +320,8 @@ StepStatus SstReader::BeginStep(StepMode Mode, const float timeout_sec) m_CurrentStepMetaData = SstGetCurMetadata(m_Input); if (!m_BP5Deserializer) { - m_BP5Deserializer = new format::BP5Deserializer( - m_CurrentStepMetaData->WriterCohortSize, m_WriterIsRowMajor, - Params.IsRowMajor); + m_BP5Deserializer = new format::BP5Deserializer(m_WriterIsRowMajor, + Params.IsRowMajor); m_BP5Deserializer->m_Engine = this; } SstMetaMetaList MMList = @@ -355,7 +354,9 @@ StepStatus SstReader::BeginStep(StepMode Mode, const float timeout_sec) } m_IO.RemoveAllVariables(); - m_BP5Deserializer->SetupForTimestep(SstCurrentStep(m_Input)); + m_BP5Deserializer->SetupForStep( + SstCurrentStep(m_Input), + static_cast<size_t>(m_CurrentStepMetaData->WriterCohortSize)); for (int i = 0; i < m_CurrentStepMetaData->WriterCohortSize; i++) { diff --git a/source/adios2/toolkit/format/bp5/BP5Deserializer.cpp b/source/adios2/toolkit/format/bp5/BP5Deserializer.cpp index 6f89265ec0..c5c8d4b6aa 100644 --- a/source/adios2/toolkit/format/bp5/BP5Deserializer.cpp +++ b/source/adios2/toolkit/format/bp5/BP5Deserializer.cpp @@ -213,6 +213,8 @@ BP5Deserializer::BP5VarRec *BP5Deserializer::LookupVarByName(const char *Name) return ret; } +static char *varname_debug; + BP5Deserializer::BP5VarRec *BP5Deserializer::CreateVarRec(const char *ArrayName) { BP5VarRec *Ret = new BP5VarRec(); @@ -222,9 +224,11 @@ BP5Deserializer::BP5VarRec *BP5Deserializer::CreateVarRec(const char *ArrayName) VarByName[Ret->VarName] = Ret; if (!m_RandomAccessMode) { - Ret->PerWriterMetaFieldOffset.resize(m_WriterCohortSize); - Ret->PerWriterBlockStart.resize(m_WriterCohortSize); + const size_t writerCohortSize = WriterCohortSize(MaxSizeT); + Ret->PerWriterMetaFieldOffset.resize(writerCohortSize); + Ret->PerWriterBlockStart.resize(writerCohortSize); } + varname_debug = Ret->VarName; return Ret; } @@ 
-438,21 +442,53 @@ void *BP5Deserializer::ArrayVarSetup(core::Engine *engine, return (void *)NULL; }; -void BP5Deserializer::SetupForTimestep(size_t Timestep) +void BP5Deserializer::SetupForStep(size_t Step, size_t WriterCount) { - CurTimestep = Timestep; - PendingRequests.clear(); + CurTimestep = Step; + if (m_RandomAccessMode) + { + if (m_WriterCohortSize.size() < Step + 1) + { + m_WriterCohortSize.resize(Step + 1); + } + m_WriterCohortSize[Step] = WriterCount; + } + else + { + PendingRequests.clear(); - for (auto RecPair : VarByKey) + for (auto RecPair : VarByKey) + { + m_Engine->m_IO.RemoveVariable(RecPair.second->VarName); + RecPair.second->Variable = NULL; + } + m_CurrentWriterCohortSize = WriterCount; + } +} + +size_t BP5Deserializer::WriterCohortSize(size_t Step) +{ + if (m_RandomAccessMode) { - m_Engine->m_IO.RemoveVariable(RecPair.second->VarName); - RecPair.second->Variable = NULL; + if (Step < m_WriterCohortSize.size()) + { + return m_WriterCohortSize[Step]; + } + else + { + return m_WriterCohortSize.back(); + } + } + else + { + return m_CurrentWriterCohortSize; } } void BP5Deserializer::InstallMetaData(void *MetadataBlock, size_t BlockLen, size_t WriterRank, size_t Step) { + const size_t writerCohortSize = WriterCohortSize(Step); FFSTypeHandle FFSformat; void *BaseData; static int DumpMetadata = -1; @@ -512,7 +548,7 @@ void BP5Deserializer::InstallMetaData(void *MetadataBlock, size_t BlockLen, } if (m_ControlArray[Step].size() == 0) { - m_ControlArray[Step].resize(m_WriterCohortSize); + m_ControlArray[Step].resize(writerCohortSize); } m_ControlArray[Step][WriterRank] = Control; @@ -520,7 +556,7 @@ void BP5Deserializer::InstallMetaData(void *MetadataBlock, size_t BlockLen, if (MetadataBaseArray[Step] == nullptr) { m_MetadataBaseAddrs = new std::vector(); - m_MetadataBaseAddrs->resize(m_WriterCohortSize); + m_MetadataBaseAddrs->resize(writerCohortSize); MetadataBaseArray[Step] = m_MetadataBaseAddrs; m_FreeableMBA = nullptr; } @@ -531,7 +567,10 @@ void 
BP5Deserializer::InstallMetaData(void *MetadataBlock, size_t BlockLen, { m_MetadataBaseAddrs = new std::vector(); m_FreeableMBA = m_MetadataBaseAddrs; - m_MetadataBaseAddrs->resize(m_WriterCohortSize); + } + if (writerCohortSize > m_MetadataBaseAddrs->size()) + { + m_MetadataBaseAddrs->resize(writerCohortSize); } } (*m_MetadataBaseAddrs)[WriterRank] = BaseData; @@ -546,7 +585,14 @@ void BP5Deserializer::InstallMetaData(void *MetadataBlock, size_t BlockLen, continue; } if (!m_RandomAccessMode) + { + if (writerCohortSize > VarRec->PerWriterBlockStart.size()) + { + VarRec->PerWriterBlockStart.resize(writerCohortSize); + VarRec->PerWriterMetaFieldOffset.resize(writerCohortSize); + } VarRec->PerWriterMetaFieldOffset[WriterRank] = FieldOffset; + } if ((ControlFields[i].OrigShapeID == ShapeID::GlobalArray) || (ControlFields[i].OrigShapeID == ShapeID::LocalArray)) { @@ -591,11 +637,11 @@ void BP5Deserializer::InstallMetaData(void *MetadataBlock, size_t BlockLen, if (WriterRank == 0) { VarRec->PerWriterBlockStart[WriterRank] = 0; - if (m_WriterCohortSize > 1) + if (writerCohortSize > 1) VarRec->PerWriterBlockStart[WriterRank + 1] = BlockCount; } - if (WriterRank < static_cast(m_WriterCohortSize - 1)) + if (WriterRank < static_cast(writerCohortSize - 1)) { VarRec->PerWriterBlockStart[WriterRank + 1] = VarRec->PerWriterBlockStart[WriterRank] + BlockCount; @@ -644,7 +690,7 @@ void BP5Deserializer::InstallMetaData(void *MetadataBlock, size_t BlockLen, // Local single values show up as global arrays on the // reader size_t zero = 0; - size_t writerSize = m_WriterCohortSize; + size_t writerSize = writerCohortSize; VarRec->Variable = ArrayVarSetup(m_Engine, VarRec->VarName, VarRec->Type, 1, &writerSize, &zero, &writerSize); @@ -890,8 +936,8 @@ bool BP5Deserializer::QueueGetSingle(core::VariableBase &variable, BP5VarRec *VarRec = VarByKey[&variable]; if (VarRec->OrigShapeID == ShapeID::GlobalValue) { - for (size_t WriterRank = 0; WriterRank < m_WriterCohortSize; - WriterRank++) + 
const size_t writerCohortSize = WriterCohortSize(Step); + for (size_t WriterRank = 0; WriterRank < writerCohortSize; WriterRank++) { if (GetSingleValueFromMetadata(variable, VarRec, DestData, Step, WriterRank)) @@ -1033,13 +1079,14 @@ std::vector BP5Deserializer::GenerateReadRequests() { std::vector Ret; - std::vector WriterInfo(m_WriterCohortSize); + // std::vector WriterInfo(m_WriterCohortSize); typedef std::pair pair; std::map WriterTSNeeded; for (const auto &Req : PendingRequests) { - for (size_t i = 0; i < m_WriterCohortSize; i++) + const size_t writerCohortSize = WriterCohortSize(Req.Step); + for (size_t i = 0; i < writerCohortSize; i++) { if (WriterTSNeeded.count(std::make_pair(Req.Step, i)) == 0) { @@ -1079,8 +1126,8 @@ void BP5Deserializer::FinalizeGets(std::vector Requests) for (const auto &Req : PendingRequests) { // ImplementGapWarning(Reqs); - for (size_t WriterRank = 0; WriterRank < m_WriterCohortSize; - WriterRank++) + const size_t writerCohortSize = WriterCohortSize(Req.Step); + for (size_t WriterRank = 0; WriterRank < writerCohortSize; WriterRank++) { size_t NodeFirst = 0; if (NeedWriter(Req, WriterRank, NodeFirst)) @@ -1449,11 +1496,10 @@ void BP5Deserializer::ExtractSelectionFromPartialCM( free(FirstIndex); } -BP5Deserializer::BP5Deserializer(int WriterCount, bool WriterIsRowMajor, - bool ReaderIsRowMajor, bool RandomAccessMode) +BP5Deserializer::BP5Deserializer(bool WriterIsRowMajor, bool ReaderIsRowMajor, + bool RandomAccessMode) : m_WriterIsRowMajor{WriterIsRowMajor}, m_ReaderIsRowMajor{ReaderIsRowMajor}, - m_WriterCohortSize{static_cast(WriterCount)}, m_RandomAccessMode{ - RandomAccessMode} + m_RandomAccessMode{RandomAccessMode} { FMContext Tmp = create_local_FMcontext(); ReaderFFSContext = create_FFSContext_FM(Tmp); @@ -1537,6 +1583,7 @@ Engine::MinVarInfo *BP5Deserializer::MinBlocksInfo(const VariableBase &Var, Engine::MinVarInfo *MV = new Engine::MinVarInfo(VarRec->DimCount, VarRec->GlobalDims); + const size_t writerCohortSize = 
WriterCohortSize(Step); MV->Dims = VarRec->DimCount; MV->Shape = VarRec->GlobalDims; MV->IsReverseDims = @@ -1546,9 +1593,8 @@ Engine::MinVarInfo *BP5Deserializer::MinBlocksInfo(const VariableBase &Var, (VarRec->OrigShapeID == ShapeID::GlobalValue)) { MV->IsValue = true; - MV->BlocksInfo.reserve(m_WriterCohortSize); - for (size_t WriterRank = 0; WriterRank < m_WriterCohortSize; - WriterRank++) + MV->BlocksInfo.reserve(writerCohortSize); + for (size_t WriterRank = 0; WriterRank < writerCohortSize; WriterRank++) { MetaArrayRec *writer_meta_base = (MetaArrayRec *)GetMetadataBase(VarRec, Step, WriterRank); @@ -1564,7 +1610,7 @@ Engine::MinVarInfo *BP5Deserializer::MinBlocksInfo(const VariableBase &Var, return MV; } size_t Id = 0; - for (size_t WriterRank = 0; WriterRank < m_WriterCohortSize; WriterRank++) + for (size_t WriterRank = 0; WriterRank < writerCohortSize; WriterRank++) { MetaArrayRec *writer_meta_base = (MetaArrayRec *)GetMetadataBase(VarRec, Step, WriterRank); @@ -1580,7 +1626,7 @@ Engine::MinVarInfo *BP5Deserializer::MinBlocksInfo(const VariableBase &Var, MV->BlocksInfo.reserve(Id); Id = 0; - for (size_t WriterRank = 0; WriterRank < m_WriterCohortSize; WriterRank++) + for (size_t WriterRank = 0; WriterRank < writerCohortSize; WriterRank++) { MetaArrayRec *writer_meta_base = (MetaArrayRec *)GetMetadataBase(VarRec, Step, WriterRank); @@ -1725,7 +1771,8 @@ size_t BP5Deserializer::RelativeToAbsoluteStep(const BP5VarRec *VarRec, while (RelStep != 0) { size_t WriterRank = 0; - while (WriterRank < m_WriterCohortSize) + const size_t writerCohortSize = WriterCohortSize(AbsStep); + while (WriterRank < writerCohortSize) { BP5MetadataInfoStruct *BaseData; BaseData = (BP5MetadataInfoStruct @@ -1735,7 +1782,7 @@ size_t BP5Deserializer::RelativeToAbsoluteStep(const BP5VarRec *VarRec, { // variable appeared on this step RelStep--; - break; // exit while (WriterRank < m_WriterCohortSize) + break; // exit while (WriterRank < writerCohortSize) } WriterRank++; } @@ -1762,6 
+1809,7 @@ bool BP5Deserializer::VariableMinMax(const VariableBase &Var, const size_t Step, MinMax.Init(VarRec->Type); + const size_t writerCohortSize = WriterCohortSize(Step); size_t StartStep = Step, StopStep = Step + 1; if (Step == DefaultSizeT) { @@ -1775,7 +1823,7 @@ bool BP5Deserializer::VariableMinMax(const VariableBase &Var, const size_t Step, if ((VarRec->OrigShapeID == ShapeID::LocalArray) || (VarRec->OrigShapeID == ShapeID::GlobalArray)) { - for (size_t WriterRank = 0; WriterRank < m_WriterCohortSize; + for (size_t WriterRank = 0; WriterRank < writerCohortSize; WriterRank++) { MetaArrayRec *writer_meta_base = @@ -1807,7 +1855,7 @@ bool BP5Deserializer::VariableMinMax(const VariableBase &Var, const size_t Step, void *writer_meta_base = NULL; size_t WriterRank = 0; while ((writer_meta_base == NULL) && - (WriterRank < m_WriterCohortSize)) + (WriterRank < writerCohortSize)) { writer_meta_base = GetMetadataBase(VarRec, RelStep, WriterRank++); @@ -1816,7 +1864,7 @@ bool BP5Deserializer::VariableMinMax(const VariableBase &Var, const size_t Step, } else if (VarRec->OrigShapeID == ShapeID::LocalValue) { - for (size_t WriterRank = 0; WriterRank < m_WriterCohortSize; + for (size_t WriterRank = 0; WriterRank < writerCohortSize; WriterRank++) { void *writer_meta_base = diff --git a/source/adios2/toolkit/format/bp5/BP5Deserializer.h b/source/adios2/toolkit/format/bp5/BP5Deserializer.h index 86be0d40a7..9b3b72a9b2 100644 --- a/source/adios2/toolkit/format/bp5/BP5Deserializer.h +++ b/source/adios2/toolkit/format/bp5/BP5Deserializer.h @@ -33,8 +33,8 @@ class BP5Deserializer : virtual public BP5Base { public: - BP5Deserializer(int WriterCount, bool WriterIsRowMajor, - bool ReaderIsRowMajor, bool RandomAccessMode = false); + BP5Deserializer(bool WriterIsRowMajor, bool ReaderIsRowMajor, + bool RandomAccessMode = false); ~BP5Deserializer(); @@ -52,7 +52,7 @@ class BP5Deserializer : virtual public BP5Base size_t WriterRank, size_t Step = SIZE_MAX); void 
InstallAttributeData(void *AttributeBlock, size_t BlockLen, size_t Step = SIZE_MAX); - void SetupForTimestep(size_t t); + void SetupForStep(size_t Step, size_t WriterCount); // return from QueueGet is true if a sync is needed to fill the data bool QueueGet(core::VariableBase &variable, void *DestData); bool QueueGetSingle(core::VariableBase &variable, void *DestData, @@ -68,8 +68,8 @@ class BP5Deserializer : virtual public BP5Base bool VariableMinMax(const VariableBase &var, const size_t Step, Engine::MinMaxStruct &MinMax); - bool m_WriterIsRowMajor = 1; - bool m_ReaderIsRowMajor = 1; + const bool m_WriterIsRowMajor; + const bool m_ReaderIsRowMajor; core::Engine *m_Engine = NULL; private: @@ -126,8 +126,16 @@ class BP5Deserializer : virtual public BP5Base }; FFSContext ReaderFFSContext; - size_t m_WriterCohortSize; - bool m_RandomAccessMode; + + const bool m_RandomAccessMode; + + std::vector m_WriterCohortSize; // per step, in random mode + size_t m_CurrentWriterCohortSize; // valid in streaming mode + // return the number of writers + // m_CurrentWriterCohortSize in streaming mode + // m_WriterCohortSize[Step] in random access mode + size_t WriterCohortSize(size_t Step); + size_t m_LastAttrStep = MaxSizeT; // invalid timestep for start std::unordered_map VarByName; From 335193479e1b567bfd8462f19529b06e86538d40 Mon Sep 17 00:00:00 2001 From: Norbert Podhorszki Date: Fri, 28 Jan 2022 16:35:01 -0500 Subject: [PATCH 2/2] remove debug variable that causes compiler warning --- source/adios2/toolkit/format/bp5/BP5Deserializer.cpp | 3 --- 1 file changed, 3 deletions(-) diff --git a/source/adios2/toolkit/format/bp5/BP5Deserializer.cpp b/source/adios2/toolkit/format/bp5/BP5Deserializer.cpp index c5c8d4b6aa..b59f168b5a 100644 --- a/source/adios2/toolkit/format/bp5/BP5Deserializer.cpp +++ b/source/adios2/toolkit/format/bp5/BP5Deserializer.cpp @@ -213,8 +213,6 @@ BP5Deserializer::BP5VarRec *BP5Deserializer::LookupVarByName(const char *Name) return ret; } -static char 
*varname_debug; - BP5Deserializer::BP5VarRec *BP5Deserializer::CreateVarRec(const char *ArrayName) { BP5VarRec *Ret = new BP5VarRec(); @@ -228,7 +226,6 @@ BP5Deserializer::BP5VarRec *BP5Deserializer::CreateVarRec(const char *ArrayName) Ret->PerWriterMetaFieldOffset.resize(writerCohortSize); Ret->PerWriterBlockStart.resize(writerCohortSize); } - varname_debug = Ret->VarName; return Ret; }