Skip to content

Commit

Permalink
Merge pull request #3026 from ornladios/bp5-read-changing-writernum
Browse files Browse the repository at this point in the history
BP5: Ability to read back BP5 files where the number of writers chang…
  • Loading branch information
pnorbert authored Jan 29, 2022
2 parents 6fa3443 + 3351934 commit 46a0c3b
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 49 deletions.
13 changes: 9 additions & 4 deletions source/adios2/engine/bp5/BP5Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,9 @@ StepStatus BP5Reader::BeginStep(StepMode mode, const float timeoutSeconds)
// i++;
// }

m_BP5Deserializer->SetupForTimestep(m_CurrentStep);
m_BP5Deserializer->SetupForStep(
m_CurrentStep,
m_WriterMap[m_WriterMapIndex[m_CurrentStep]].WriterCount);

InstallMetadataForTimestep(m_CurrentStep);
m_IO.ResetVariablesStepSelection(false,
Expand Down Expand Up @@ -574,9 +576,9 @@ void BP5Reader::InitBuffer(const TimePoint &timeoutInstant,
// now we are sure the index header has been parsed, first step parsing
// done

m_BP5Deserializer = new format::BP5Deserializer(
m_WriterMap[0].WriterCount, m_WriterIsRowMajor, m_ReaderIsRowMajor,
(m_OpenMode == Mode::ReadRandomAccess));
m_BP5Deserializer =
new format::BP5Deserializer(m_WriterIsRowMajor, m_ReaderIsRowMajor,
(m_OpenMode == Mode::ReadRandomAccess));
m_BP5Deserializer->m_Engine = this;

InstallMetaMetaData(m_MetaMetadata);
Expand All @@ -587,6 +589,9 @@ void BP5Reader::InitBuffer(const TimePoint &timeoutInstant,
{
for (size_t Step = 0; Step < m_MetadataIndexTable.size(); Step++)
{
m_BP5Deserializer->SetupForStep(
Step,
m_WriterMap[m_WriterMapIndex[m_CurrentStep]].WriterCount);
InstallMetadataForTimestep(Step);
}
}
Expand Down
9 changes: 5 additions & 4 deletions source/adios2/engine/sst/SstReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -320,9 +320,8 @@ StepStatus SstReader::BeginStep(StepMode Mode, const float timeout_sec)
m_CurrentStepMetaData = SstGetCurMetadata(m_Input);
if (!m_BP5Deserializer)
{
m_BP5Deserializer = new format::BP5Deserializer(
m_CurrentStepMetaData->WriterCohortSize, m_WriterIsRowMajor,
Params.IsRowMajor);
m_BP5Deserializer = new format::BP5Deserializer(m_WriterIsRowMajor,
Params.IsRowMajor);
m_BP5Deserializer->m_Engine = this;
}
SstMetaMetaList MMList =
Expand Down Expand Up @@ -355,7 +354,9 @@ StepStatus SstReader::BeginStep(StepMode Mode, const float timeout_sec)
}

m_IO.RemoveAllVariables();
m_BP5Deserializer->SetupForTimestep(SstCurrentStep(m_Input));
m_BP5Deserializer->SetupForStep(
SstCurrentStep(m_Input),
static_cast<size_t>(m_CurrentStepMetaData->WriterCohortSize));

for (int i = 0; i < m_CurrentStepMetaData->WriterCohortSize; i++)
{
Expand Down
113 changes: 79 additions & 34 deletions source/adios2/toolkit/format/bp5/BP5Deserializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,9 @@ BP5Deserializer::BP5VarRec *BP5Deserializer::CreateVarRec(const char *ArrayName)
VarByName[Ret->VarName] = Ret;
if (!m_RandomAccessMode)
{
Ret->PerWriterMetaFieldOffset.resize(m_WriterCohortSize);
Ret->PerWriterBlockStart.resize(m_WriterCohortSize);
const size_t writerCohortSize = WriterCohortSize(MaxSizeT);
Ret->PerWriterMetaFieldOffset.resize(writerCohortSize);
Ret->PerWriterBlockStart.resize(writerCohortSize);
}
return Ret;
}
Expand Down Expand Up @@ -438,21 +439,53 @@ void *BP5Deserializer::ArrayVarSetup(core::Engine *engine,
return (void *)NULL;
};

void BP5Deserializer::SetupForTimestep(size_t Timestep)
void BP5Deserializer::SetupForStep(size_t Step, size_t WriterCount)
{
CurTimestep = Timestep;
PendingRequests.clear();
CurTimestep = Step;
if (m_RandomAccessMode)
{
if (m_WriterCohortSize.size() < Step + 1)
{
m_WriterCohortSize.resize(Step + 1);
}
m_WriterCohortSize[Step] = WriterCount;
}
else
{
PendingRequests.clear();

for (auto RecPair : VarByKey)
for (auto RecPair : VarByKey)
{
m_Engine->m_IO.RemoveVariable(RecPair.second->VarName);
RecPair.second->Variable = NULL;
}
m_CurrentWriterCohortSize = WriterCount;
}
}

size_t BP5Deserializer::WriterCohortSize(size_t Step)
{
if (m_RandomAccessMode)
{
if (Step < m_WriterCohortSize.size())
{
return m_WriterCohortSize[Step];
}
else
{
return m_WriterCohortSize.back();
}
}
else
{
m_Engine->m_IO.RemoveVariable(RecPair.second->VarName);
RecPair.second->Variable = NULL;
return m_CurrentWriterCohortSize;
}
}

void BP5Deserializer::InstallMetaData(void *MetadataBlock, size_t BlockLen,
size_t WriterRank, size_t Step)
{
const size_t writerCohortSize = WriterCohortSize(Step);
FFSTypeHandle FFSformat;
void *BaseData;
static int DumpMetadata = -1;
Expand Down Expand Up @@ -512,15 +545,15 @@ void BP5Deserializer::InstallMetaData(void *MetadataBlock, size_t BlockLen,
}
if (m_ControlArray[Step].size() == 0)
{
m_ControlArray[Step].resize(m_WriterCohortSize);
m_ControlArray[Step].resize(writerCohortSize);
}
m_ControlArray[Step][WriterRank] = Control;

MetadataBaseArray.resize(Step + 1);
if (MetadataBaseArray[Step] == nullptr)
{
m_MetadataBaseAddrs = new std::vector<void *>();
m_MetadataBaseAddrs->resize(m_WriterCohortSize);
m_MetadataBaseAddrs->resize(writerCohortSize);
MetadataBaseArray[Step] = m_MetadataBaseAddrs;
m_FreeableMBA = nullptr;
}
Expand All @@ -531,7 +564,10 @@ void BP5Deserializer::InstallMetaData(void *MetadataBlock, size_t BlockLen,
{
m_MetadataBaseAddrs = new std::vector<void *>();
m_FreeableMBA = m_MetadataBaseAddrs;
m_MetadataBaseAddrs->resize(m_WriterCohortSize);
}
if (writerCohortSize > m_MetadataBaseAddrs->size())
{
m_MetadataBaseAddrs->resize(writerCohortSize);
}
}
(*m_MetadataBaseAddrs)[WriterRank] = BaseData;
Expand All @@ -546,7 +582,14 @@ void BP5Deserializer::InstallMetaData(void *MetadataBlock, size_t BlockLen,
continue;
}
if (!m_RandomAccessMode)
{
if (writerCohortSize > VarRec->PerWriterBlockStart.size())
{
VarRec->PerWriterBlockStart.resize(writerCohortSize);
VarRec->PerWriterMetaFieldOffset.resize(writerCohortSize);
}
VarRec->PerWriterMetaFieldOffset[WriterRank] = FieldOffset;
}
if ((ControlFields[i].OrigShapeID == ShapeID::GlobalArray) ||
(ControlFields[i].OrigShapeID == ShapeID::LocalArray))
{
Expand Down Expand Up @@ -591,11 +634,11 @@ void BP5Deserializer::InstallMetaData(void *MetadataBlock, size_t BlockLen,
if (WriterRank == 0)
{
VarRec->PerWriterBlockStart[WriterRank] = 0;
if (m_WriterCohortSize > 1)
if (writerCohortSize > 1)
VarRec->PerWriterBlockStart[WriterRank + 1] =
BlockCount;
}
if (WriterRank < static_cast<size_t>(m_WriterCohortSize - 1))
if (WriterRank < static_cast<size_t>(writerCohortSize - 1))
{
VarRec->PerWriterBlockStart[WriterRank + 1] =
VarRec->PerWriterBlockStart[WriterRank] + BlockCount;
Expand Down Expand Up @@ -644,7 +687,7 @@ void BP5Deserializer::InstallMetaData(void *MetadataBlock, size_t BlockLen,
// Local single values show up as global arrays on the
// reader
size_t zero = 0;
size_t writerSize = m_WriterCohortSize;
size_t writerSize = writerCohortSize;
VarRec->Variable =
ArrayVarSetup(m_Engine, VarRec->VarName, VarRec->Type,
1, &writerSize, &zero, &writerSize);
Expand Down Expand Up @@ -890,8 +933,8 @@ bool BP5Deserializer::QueueGetSingle(core::VariableBase &variable,
BP5VarRec *VarRec = VarByKey[&variable];
if (VarRec->OrigShapeID == ShapeID::GlobalValue)
{
for (size_t WriterRank = 0; WriterRank < m_WriterCohortSize;
WriterRank++)
const size_t writerCohortSize = WriterCohortSize(Step);
for (size_t WriterRank = 0; WriterRank < writerCohortSize; WriterRank++)
{
if (GetSingleValueFromMetadata(variable, VarRec, DestData, Step,
WriterRank))
Expand Down Expand Up @@ -1033,13 +1076,14 @@ std::vector<BP5Deserializer::ReadRequest>
BP5Deserializer::GenerateReadRequests()
{
std::vector<BP5Deserializer::ReadRequest> Ret;
std::vector<FFSReaderPerWriterRec> WriterInfo(m_WriterCohortSize);
// std::vector<FFSReaderPerWriterRec> WriterInfo(m_WriterCohortSize);
typedef std::pair<size_t, size_t> pair;
std::map<pair, bool> WriterTSNeeded;

for (const auto &Req : PendingRequests)
{
for (size_t i = 0; i < m_WriterCohortSize; i++)
const size_t writerCohortSize = WriterCohortSize(Req.Step);
for (size_t i = 0; i < writerCohortSize; i++)
{
if (WriterTSNeeded.count(std::make_pair(Req.Step, i)) == 0)
{
Expand Down Expand Up @@ -1079,8 +1123,8 @@ void BP5Deserializer::FinalizeGets(std::vector<ReadRequest> Requests)
for (const auto &Req : PendingRequests)
{
// ImplementGapWarning(Reqs);
for (size_t WriterRank = 0; WriterRank < m_WriterCohortSize;
WriterRank++)
const size_t writerCohortSize = WriterCohortSize(Req.Step);
for (size_t WriterRank = 0; WriterRank < writerCohortSize; WriterRank++)
{
size_t NodeFirst = 0;
if (NeedWriter(Req, WriterRank, NodeFirst))
Expand Down Expand Up @@ -1449,11 +1493,10 @@ void BP5Deserializer::ExtractSelectionFromPartialCM(
free(FirstIndex);
}

BP5Deserializer::BP5Deserializer(int WriterCount, bool WriterIsRowMajor,
bool ReaderIsRowMajor, bool RandomAccessMode)
BP5Deserializer::BP5Deserializer(bool WriterIsRowMajor, bool ReaderIsRowMajor,
bool RandomAccessMode)
: m_WriterIsRowMajor{WriterIsRowMajor}, m_ReaderIsRowMajor{ReaderIsRowMajor},
m_WriterCohortSize{static_cast<size_t>(WriterCount)}, m_RandomAccessMode{
RandomAccessMode}
m_RandomAccessMode{RandomAccessMode}
{
FMContext Tmp = create_local_FMcontext();
ReaderFFSContext = create_FFSContext_FM(Tmp);
Expand Down Expand Up @@ -1537,6 +1580,7 @@ Engine::MinVarInfo *BP5Deserializer::MinBlocksInfo(const VariableBase &Var,
Engine::MinVarInfo *MV =
new Engine::MinVarInfo(VarRec->DimCount, VarRec->GlobalDims);

const size_t writerCohortSize = WriterCohortSize(Step);
MV->Dims = VarRec->DimCount;
MV->Shape = VarRec->GlobalDims;
MV->IsReverseDims =
Expand All @@ -1546,9 +1590,8 @@ Engine::MinVarInfo *BP5Deserializer::MinBlocksInfo(const VariableBase &Var,
(VarRec->OrigShapeID == ShapeID::GlobalValue))
{
MV->IsValue = true;
MV->BlocksInfo.reserve(m_WriterCohortSize);
for (size_t WriterRank = 0; WriterRank < m_WriterCohortSize;
WriterRank++)
MV->BlocksInfo.reserve(writerCohortSize);
for (size_t WriterRank = 0; WriterRank < writerCohortSize; WriterRank++)
{
MetaArrayRec *writer_meta_base =
(MetaArrayRec *)GetMetadataBase(VarRec, Step, WriterRank);
Expand All @@ -1564,7 +1607,7 @@ Engine::MinVarInfo *BP5Deserializer::MinBlocksInfo(const VariableBase &Var,
return MV;
}
size_t Id = 0;
for (size_t WriterRank = 0; WriterRank < m_WriterCohortSize; WriterRank++)
for (size_t WriterRank = 0; WriterRank < writerCohortSize; WriterRank++)
{
MetaArrayRec *writer_meta_base =
(MetaArrayRec *)GetMetadataBase(VarRec, Step, WriterRank);
Expand All @@ -1580,7 +1623,7 @@ Engine::MinVarInfo *BP5Deserializer::MinBlocksInfo(const VariableBase &Var,
MV->BlocksInfo.reserve(Id);

Id = 0;
for (size_t WriterRank = 0; WriterRank < m_WriterCohortSize; WriterRank++)
for (size_t WriterRank = 0; WriterRank < writerCohortSize; WriterRank++)
{
MetaArrayRec *writer_meta_base =
(MetaArrayRec *)GetMetadataBase(VarRec, Step, WriterRank);
Expand Down Expand Up @@ -1725,7 +1768,8 @@ size_t BP5Deserializer::RelativeToAbsoluteStep(const BP5VarRec *VarRec,
while (RelStep != 0)
{
size_t WriterRank = 0;
while (WriterRank < m_WriterCohortSize)
const size_t writerCohortSize = WriterCohortSize(AbsStep);
while (WriterRank < writerCohortSize)
{
BP5MetadataInfoStruct *BaseData;
BaseData = (BP5MetadataInfoStruct
Expand All @@ -1735,7 +1779,7 @@ size_t BP5Deserializer::RelativeToAbsoluteStep(const BP5VarRec *VarRec,
{
// variable appeared on this step
RelStep--;
break; // exit while (WriterRank < m_WriterCohortSize)
break; // exit while (WriterRank < writerCohortSize)
}
WriterRank++;
}
Expand All @@ -1762,6 +1806,7 @@ bool BP5Deserializer::VariableMinMax(const VariableBase &Var, const size_t Step,

MinMax.Init(VarRec->Type);

const size_t writerCohortSize = WriterCohortSize(Step);
size_t StartStep = Step, StopStep = Step + 1;
if (Step == DefaultSizeT)
{
Expand All @@ -1775,7 +1820,7 @@ bool BP5Deserializer::VariableMinMax(const VariableBase &Var, const size_t Step,
if ((VarRec->OrigShapeID == ShapeID::LocalArray) ||
(VarRec->OrigShapeID == ShapeID::GlobalArray))
{
for (size_t WriterRank = 0; WriterRank < m_WriterCohortSize;
for (size_t WriterRank = 0; WriterRank < writerCohortSize;
WriterRank++)
{
MetaArrayRec *writer_meta_base =
Expand Down Expand Up @@ -1807,7 +1852,7 @@ bool BP5Deserializer::VariableMinMax(const VariableBase &Var, const size_t Step,
void *writer_meta_base = NULL;
size_t WriterRank = 0;
while ((writer_meta_base == NULL) &&
(WriterRank < m_WriterCohortSize))
(WriterRank < writerCohortSize))
{
writer_meta_base =
GetMetadataBase(VarRec, RelStep, WriterRank++);
Expand All @@ -1816,7 +1861,7 @@ bool BP5Deserializer::VariableMinMax(const VariableBase &Var, const size_t Step,
}
else if (VarRec->OrigShapeID == ShapeID::LocalValue)
{
for (size_t WriterRank = 0; WriterRank < m_WriterCohortSize;
for (size_t WriterRank = 0; WriterRank < writerCohortSize;
WriterRank++)
{
void *writer_meta_base =
Expand Down
22 changes: 15 additions & 7 deletions source/adios2/toolkit/format/bp5/BP5Deserializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ class BP5Deserializer : virtual public BP5Base
{

public:
BP5Deserializer(int WriterCount, bool WriterIsRowMajor,
bool ReaderIsRowMajor, bool RandomAccessMode = false);
BP5Deserializer(bool WriterIsRowMajor, bool ReaderIsRowMajor,
bool RandomAccessMode = false);

~BP5Deserializer();

Expand All @@ -52,7 +52,7 @@ class BP5Deserializer : virtual public BP5Base
size_t WriterRank, size_t Step = SIZE_MAX);
void InstallAttributeData(void *AttributeBlock, size_t BlockLen,
size_t Step = SIZE_MAX);
void SetupForTimestep(size_t t);
void SetupForStep(size_t Step, size_t WriterCount);
// return from QueueGet is true if a sync is needed to fill the data
bool QueueGet(core::VariableBase &variable, void *DestData);
bool QueueGetSingle(core::VariableBase &variable, void *DestData,
Expand All @@ -68,8 +68,8 @@ class BP5Deserializer : virtual public BP5Base
bool VariableMinMax(const VariableBase &var, const size_t Step,
Engine::MinMaxStruct &MinMax);

bool m_WriterIsRowMajor = 1;
bool m_ReaderIsRowMajor = 1;
const bool m_WriterIsRowMajor;
const bool m_ReaderIsRowMajor;
core::Engine *m_Engine = NULL;

private:
Expand Down Expand Up @@ -126,8 +126,16 @@ class BP5Deserializer : virtual public BP5Base
};

FFSContext ReaderFFSContext;
size_t m_WriterCohortSize;
bool m_RandomAccessMode;

const bool m_RandomAccessMode;

std::vector<size_t> m_WriterCohortSize; // per step, in random mode
size_t m_CurrentWriterCohortSize; // valid in streaming mode
// return the number of writers
// m_CurrentWriterCohortSize in streaming mode
// m_WriterCohortSize[Step] in random access mode
size_t WriterCohortSize(size_t Step);

size_t m_LastAttrStep = MaxSizeT; // invalid timestep for start

std::unordered_map<std::string, BP5VarRec *> VarByName;
Expand Down

0 comments on commit 46a0c3b

Please sign in to comment.