Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Some BP5 refactoring; Rank 0 bugfixes #2833

Merged
merged 2 commits into from
Aug 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion source/adios2/core/Variable.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ class Variable : public VariableBase
using Span = core::Span<T>;

/** Needs a map to preserve iterator as it resizes and the key to match the
* m_BlocksInfo index */
* m_BlocksInfo index (BP4 ONLY) */
std::map<size_t, Span> m_BlocksSpan;

Variable<T>(const std::string &name, const Dims &shape, const Dims &start,
Expand Down
93 changes: 65 additions & 28 deletions source/adios2/toolkit/format/bp5/BP5Deserializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ void BP5Deserializer::SetupForTimestep(size_t Timestep)
}

void BP5Deserializer::InstallMetaData(void *MetadataBlock, size_t BlockLen,
size_t WriterRank)
size_t WriterRank, size_t Step)
{
FFSTypeHandle FFSformat;
void *BaseData;
Expand Down Expand Up @@ -387,7 +387,12 @@ void BP5Deserializer::InstallMetaData(void *MetadataBlock, size_t BlockLen,
}
ControlArray = &Control->Controls[0];

// if (m_RandomAccessMode) {
// PrepareForTimestep(Step);
// }
MetadataBaseAddrs[WriterRank] = BaseData;
// } else {
// Loaded
for (int i = 0; i < Control->ControlCount; i++)
{
int FieldOffset = ControlArray[i].FieldOffset;
Expand All @@ -412,6 +417,12 @@ void BP5Deserializer::InstallMetaData(void *MetadataBlock, size_t BlockLen,
}
if (WriterRank == 0)
{
// use the shape from rank 0
VarRec->GlobalDims = meta_base->Shape;
}
else if (VarRec->GlobalDims == NULL)
{
// unless there wasn't one before
VarRec->GlobalDims = meta_base->Shape;
}
if (!VarRec->Variable)
Expand All @@ -422,23 +433,19 @@ void BP5Deserializer::InstallMetaData(void *MetadataBlock, size_t BlockLen,
VarByKey[VarRec->Variable] = VarRec;
}
VarRec->DimCount = meta_base->Dims;
VarRec->PerWriterBlockCount[WriterRank] =
size_t BlockCount =
meta_base->Dims ? meta_base->DBCount / meta_base->Dims : 1;
VarRec->PerWriterStart[WriterRank] = meta_base->Offsets;
VarRec->PerWriterCounts[WriterRank] = meta_base->Count;
VarRec->PerWriterDataLocation[WriterRank] = meta_base->DataLocation;
if (WriterRank == 0)
{
VarRec->PerWriterBlockStart[WriterRank] = 0;
if (m_WriterCohortSize > 1)
VarRec->PerWriterBlockStart[WriterRank + 1] =
VarRec->PerWriterBlockCount[WriterRank];
VarRec->PerWriterBlockStart[WriterRank + 1] = BlockCount;
}
if (WriterRank < static_cast<size_t>(m_WriterCohortSize - 1))
{
VarRec->PerWriterBlockStart[WriterRank + 1] =
VarRec->PerWriterBlockStart[WriterRank] +
VarRec->PerWriterBlockCount[WriterRank];
VarRec->PerWriterBlockStart[WriterRank] + BlockCount;
}
}
else
Expand All @@ -454,7 +461,7 @@ void BP5Deserializer::InstallMetaData(void *MetadataBlock, size_t BlockLen,
}

void BP5Deserializer::InstallAttributeData(void *AttributeBlock,
size_t BlockLen)
size_t BlockLen, size_t Step)
{
static int DumpMetadata = -1;
FMFieldList FieldList;
Expand Down Expand Up @@ -582,29 +589,38 @@ bool BP5Deserializer::QueueGet(core::VariableBase &variable, void *DestData)
return true;
}

bool BP5Deserializer::NeedWriter(BP5ArrayRequest Req, size_t i)
bool BP5Deserializer::NeedWriter(BP5ArrayRequest Req, size_t WriterRank)
{
MetaArrayRec *writer_meta_base =
(MetaArrayRec *)(((char *)MetadataBaseAddrs[WriterRank]) +
Req.VarRec->PerWriterMetaFieldOffset[WriterRank]);

if (Req.RequestType == Local)
{
size_t NodeFirst = Req.VarRec->PerWriterBlockStart[i];
size_t NodeLast = Req.VarRec->PerWriterBlockCount[i] + NodeFirst - 1;
size_t WriterBlockCount =
writer_meta_base->Dims
? writer_meta_base->DBCount / writer_meta_base->Dims
: 1;
size_t NodeFirst = Req.VarRec->PerWriterBlockStart[WriterRank];
size_t NodeLast = WriterBlockCount + NodeFirst - 1;
bool res = (NodeFirst <= Req.BlockID) && (NodeLast >= Req.BlockID);
return res;
}
// else Global case
for (size_t j = 0; j < Req.VarRec->DimCount; j++)
for (size_t j = 0; j < writer_meta_base->Dims; j++)
{
size_t SelOffset = Req.Start[j];
size_t SelSize = Req.Count[j];
size_t RankOffset;
size_t RankSize;
if (Req.VarRec->PerWriterStart[i] == NULL)

if (writer_meta_base->Offsets == NULL)
/* this writer didn't write */
{
return false;
}
RankOffset = Req.VarRec->PerWriterStart[i][j];
RankSize = Req.VarRec->PerWriterCounts[i][j];
RankOffset = writer_meta_base->Offsets[j];
RankSize = writer_meta_base->Count[j];
if ((SelSize == 0) || (RankSize == 0))
{
return false;
Expand Down Expand Up @@ -663,39 +679,51 @@ void BP5Deserializer::FinalizeGets(std::vector<ReadRequest> Requests)
for (const auto &Req : PendingRequests)
{
// ImplementGapWarning(Reqs);
for (size_t i = 0; i < m_WriterCohortSize; i++)
for (size_t WriterRank = 0; WriterRank < m_WriterCohortSize;
WriterRank++)
{
if (NeedWriter(Req, i))
if (NeedWriter(Req, WriterRank))
{
/* if needed this writer fill destination with acquired data */
int ElementSize = Req.VarRec->ElementSize;
int DimCount = Req.VarRec->DimCount;
size_t *GlobalDimensions = Req.VarRec->GlobalDims;
size_t *RankOffset = Req.VarRec->PerWriterStart[i];
const size_t *RankSize = Req.VarRec->PerWriterCounts[i];
MetaArrayRec *writer_meta_base =
(MetaArrayRec
*)(((char *)MetadataBaseAddrs[WriterRank]) +
Req.VarRec->PerWriterMetaFieldOffset[WriterRank]);
int DimCount = writer_meta_base->Dims;
size_t *RankOffset = writer_meta_base->Offsets;
const size_t *RankSize = writer_meta_base->Count;
std::vector<size_t> ZeroSel(DimCount);
std::vector<size_t> ZeroRankOffset(DimCount);
std::vector<size_t> ZeroGlobalDimensions(DimCount);
const size_t *SelOffset = NULL;
const size_t *SelSize = Req.Count.data();
int ReqIndex = 0;
while (Requests[ReqIndex].WriterRank != static_cast<size_t>(i))
while (Requests[ReqIndex].WriterRank !=
static_cast<size_t>(WriterRank))
ReqIndex++;
if (Req.VarRec->PerWriterDataLocation[WriterRank] == NULL)
{
// No Data from this writer
continue;
}
char *IncomingData =
(char *)Requests[ReqIndex].DestinationAddr +
Req.VarRec->PerWriterDataLocation[i][0];

Req.VarRec->PerWriterDataLocation[WriterRank][0];
if (Req.Start.size())
{
SelOffset = Req.Start.data();
}
if (Req.RequestType == Local)
{
int LocalBlockID =
Req.BlockID - Req.VarRec->PerWriterBlockStart[i];
Req.BlockID -
Req.VarRec->PerWriterBlockStart[WriterRank];
IncomingData =
(char *)Requests[ReqIndex].DestinationAddr +
Req.VarRec->PerWriterDataLocation[i][LocalBlockID];
Req.VarRec
->PerWriterDataLocation[WriterRank][LocalBlockID];

RankOffset = ZeroRankOffset.data();
GlobalDimensions = ZeroGlobalDimensions.data();
Expand Down Expand Up @@ -1021,7 +1049,14 @@ Engine::MinVarInfo *BP5Deserializer::MinBlocksInfo(const VariableBase &Var,
int Id = 0;
for (size_t WriterRank = 0; WriterRank < m_WriterCohortSize; WriterRank++)
{
Id += VarRec->PerWriterBlockCount[WriterRank];
MetaArrayRec *writer_meta_base =
(MetaArrayRec *)(((char *)MetadataBaseAddrs[WriterRank]) +
VarRec->PerWriterMetaFieldOffset[WriterRank]);
size_t WriterBlockCount =
writer_meta_base->Dims
? writer_meta_base->DBCount / writer_meta_base->Dims
: 1;
Id += WriterBlockCount;
}
MV->BlocksInfo.reserve(Id);

Expand All @@ -1032,7 +1067,9 @@ Engine::MinVarInfo *BP5Deserializer::MinBlocksInfo(const VariableBase &Var,
(MetaArrayRec *)(((char *)MetadataBaseAddrs[WriterRank]) +
VarRec->PerWriterMetaFieldOffset[WriterRank]);

for (size_t i = 0; i < VarRec->PerWriterBlockCount[WriterRank]; i++)
size_t WriterBlockCount =
meta_base->Dims ? meta_base->DBCount / meta_base->Dims : 1;
for (size_t i = 0; i < WriterBlockCount; i++)
{
size_t *Offsets = NULL;
size_t *Count = NULL;
Expand Down
18 changes: 6 additions & 12 deletions source/adios2/toolkit/format/bp5/BP5Deserializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,12 @@ class BP5Deserializer : virtual public BP5Base
char *DestinationAddr;
void *Internal;
};
void SetupForRandomAccess();
void InstallMetaMetaData(MetaMetaInfoBlock &MMList);
void InstallMetaData(void *MetadataBlock, size_t BlockLen,
size_t WriterRank);
void InstallAttributeData(void *AttributeBlock, size_t BlockLen);
size_t WriterRank, size_t Step = SIZE_MAX);
void InstallAttributeData(void *AttributeBlock, size_t BlockLen,
size_t Step = SIZE_MAX);
void SetupForTimestep(size_t t);
// return from QueueGet is true if a sync is needed to fill the data
bool QueueGet(core::VariableBase &variable, void *DestData);
Expand All @@ -64,6 +66,7 @@ class BP5Deserializer : virtual public BP5Base
bool m_WriterIsRowMajor = 1;
bool m_ReaderIsRowMajor = 1;
core::Engine *m_Engine = NULL;
bool m_RandomAccessMode = 0;

private:
struct BP5VarRec
Expand All @@ -74,23 +77,14 @@ class BP5Deserializer : virtual public BP5Base
DataType Type;
int ElementSize = 0;
size_t *GlobalDims = NULL;
size_t LastTSAdded = SIZE_MAX;
std::vector<size_t> PerWriterMetaFieldOffset;
std::vector<size_t> PerWriterBlockStart;
std::vector<size_t> PerWriterBlockCount;
std::vector<size_t *> PerWriterStart;
std::vector<size_t *> PerWriterCounts;
std::vector<void *> PerWriterIncomingData;
std::vector<size_t> PerWriterIncomingSize; // important for compression
std::vector<size_t *> PerWriterDataLocation;
BP5VarRec(int WriterSize)
{
PerWriterMetaFieldOffset.resize(WriterSize);
PerWriterBlockStart.resize(WriterSize);
PerWriterBlockCount.resize(WriterSize);
PerWriterStart.resize(WriterSize);
PerWriterCounts.resize(WriterSize);
PerWriterIncomingData.resize(WriterSize);
PerWriterIncomingSize.resize(WriterSize);
PerWriterDataLocation.resize(WriterSize);
}
};
Expand Down
14 changes: 7 additions & 7 deletions testing/adios2/engine/staging-common/run_test.py.gen.in
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,13 @@ reader_command_line.extend([reader_executable, canonical_engine, args.filename])
if args.rarg is not None:
reader_command_line.extend(args.rarg)

if args.num_writers == 0:
writer_command_line = None
mpmd_possible = False
print("TestDriver: No writers, setting MPMD false\n")
else:
print("TestDriver: Writer command line : " + " ".join(writer_command_line))

if args.num_readers == 0:
reader_command_line = None
mpmd_possible = False
Expand All @@ -329,13 +336,6 @@ if args.num_readers == 0:
else:
print("TestDriver: Reader command line : " + " ".join(reader_command_line))

if args.num_writers == 0:
writer_command_line = None
mpmd_possible = False
print("TestDriver: No writers, setting MPMD false\n")
else:
print("TestDriver: Writer command line : " + " ".join(writer_command_line))

if is_file_engine[args.engine.lower()]:
print("TestDriver: Is file engine, setting MPMD false\n")
mpmd_possible = False
Expand Down