Skip to content

Commit

Permalink
Merge pull request #2833 from eisenhauer/BP5
Browse files Browse the repository at this point in the history
  • Loading branch information
eisenhauer authored Aug 19, 2021
2 parents 7d7c64b + 7baf3be commit 67ebd6d
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 48 deletions.
2 changes: 1 addition & 1 deletion source/adios2/core/Variable.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ class Variable : public VariableBase
using Span = core::Span<T>;

/** Needs a map to preserve iterator as it resizes and the key to match the
* m_BlocksInfo index */
* m_BlocksInfo index (BP4 ONLY) */
std::map<size_t, Span> m_BlocksSpan;

Variable<T>(const std::string &name, const Dims &shape, const Dims &start,
Expand Down
93 changes: 65 additions & 28 deletions source/adios2/toolkit/format/bp5/BP5Deserializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ void BP5Deserializer::SetupForTimestep(size_t Timestep)
}

void BP5Deserializer::InstallMetaData(void *MetadataBlock, size_t BlockLen,
size_t WriterRank)
size_t WriterRank, size_t Step)
{
FFSTypeHandle FFSformat;
void *BaseData;
Expand Down Expand Up @@ -387,7 +387,12 @@ void BP5Deserializer::InstallMetaData(void *MetadataBlock, size_t BlockLen,
}
ControlArray = &Control->Controls[0];

// if (m_RandomAccessMode) {
// PrepareForTimestep(Step);
// }
MetadataBaseAddrs[WriterRank] = BaseData;
// } else {
// Loaded
for (int i = 0; i < Control->ControlCount; i++)
{
int FieldOffset = ControlArray[i].FieldOffset;
Expand All @@ -412,6 +417,12 @@ void BP5Deserializer::InstallMetaData(void *MetadataBlock, size_t BlockLen,
}
if (WriterRank == 0)
{
// use the shape from rank 0
VarRec->GlobalDims = meta_base->Shape;
}
else if (VarRec->GlobalDims == NULL)
{
// unless there wasn't one before
VarRec->GlobalDims = meta_base->Shape;
}
if (!VarRec->Variable)
Expand All @@ -422,23 +433,19 @@ void BP5Deserializer::InstallMetaData(void *MetadataBlock, size_t BlockLen,
VarByKey[VarRec->Variable] = VarRec;
}
VarRec->DimCount = meta_base->Dims;
VarRec->PerWriterBlockCount[WriterRank] =
size_t BlockCount =
meta_base->Dims ? meta_base->DBCount / meta_base->Dims : 1;
VarRec->PerWriterStart[WriterRank] = meta_base->Offsets;
VarRec->PerWriterCounts[WriterRank] = meta_base->Count;
VarRec->PerWriterDataLocation[WriterRank] = meta_base->DataLocation;
if (WriterRank == 0)
{
VarRec->PerWriterBlockStart[WriterRank] = 0;
if (m_WriterCohortSize > 1)
VarRec->PerWriterBlockStart[WriterRank + 1] =
VarRec->PerWriterBlockCount[WriterRank];
VarRec->PerWriterBlockStart[WriterRank + 1] = BlockCount;
}
if (WriterRank < static_cast<size_t>(m_WriterCohortSize - 1))
{
VarRec->PerWriterBlockStart[WriterRank + 1] =
VarRec->PerWriterBlockStart[WriterRank] +
VarRec->PerWriterBlockCount[WriterRank];
VarRec->PerWriterBlockStart[WriterRank] + BlockCount;
}
}
else
Expand All @@ -454,7 +461,7 @@ void BP5Deserializer::InstallMetaData(void *MetadataBlock, size_t BlockLen,
}

void BP5Deserializer::InstallAttributeData(void *AttributeBlock,
size_t BlockLen)
size_t BlockLen, size_t Step)
{
static int DumpMetadata = -1;
FMFieldList FieldList;
Expand Down Expand Up @@ -582,29 +589,38 @@ bool BP5Deserializer::QueueGet(core::VariableBase &variable, void *DestData)
return true;
}

bool BP5Deserializer::NeedWriter(BP5ArrayRequest Req, size_t i)
bool BP5Deserializer::NeedWriter(BP5ArrayRequest Req, size_t WriterRank)
{
MetaArrayRec *writer_meta_base =
(MetaArrayRec *)(((char *)MetadataBaseAddrs[WriterRank]) +
Req.VarRec->PerWriterMetaFieldOffset[WriterRank]);

if (Req.RequestType == Local)
{
size_t NodeFirst = Req.VarRec->PerWriterBlockStart[i];
size_t NodeLast = Req.VarRec->PerWriterBlockCount[i] + NodeFirst - 1;
size_t WriterBlockCount =
writer_meta_base->Dims
? writer_meta_base->DBCount / writer_meta_base->Dims
: 1;
size_t NodeFirst = Req.VarRec->PerWriterBlockStart[WriterRank];
size_t NodeLast = WriterBlockCount + NodeFirst - 1;
bool res = (NodeFirst <= Req.BlockID) && (NodeLast >= Req.BlockID);
return res;
}
// else Global case
for (size_t j = 0; j < Req.VarRec->DimCount; j++)
for (size_t j = 0; j < writer_meta_base->Dims; j++)
{
size_t SelOffset = Req.Start[j];
size_t SelSize = Req.Count[j];
size_t RankOffset;
size_t RankSize;
if (Req.VarRec->PerWriterStart[i] == NULL)

if (writer_meta_base->Offsets == NULL)
/* this writer didn't write */
{
return false;
}
RankOffset = Req.VarRec->PerWriterStart[i][j];
RankSize = Req.VarRec->PerWriterCounts[i][j];
RankOffset = writer_meta_base->Offsets[j];
RankSize = writer_meta_base->Count[j];
if ((SelSize == 0) || (RankSize == 0))
{
return false;
Expand Down Expand Up @@ -663,39 +679,51 @@ void BP5Deserializer::FinalizeGets(std::vector<ReadRequest> Requests)
for (const auto &Req : PendingRequests)
{
// ImplementGapWarning(Reqs);
for (size_t i = 0; i < m_WriterCohortSize; i++)
for (size_t WriterRank = 0; WriterRank < m_WriterCohortSize;
WriterRank++)
{
if (NeedWriter(Req, i))
if (NeedWriter(Req, WriterRank))
{
/* if needed this writer fill destination with acquired data */
int ElementSize = Req.VarRec->ElementSize;
int DimCount = Req.VarRec->DimCount;
size_t *GlobalDimensions = Req.VarRec->GlobalDims;
size_t *RankOffset = Req.VarRec->PerWriterStart[i];
const size_t *RankSize = Req.VarRec->PerWriterCounts[i];
MetaArrayRec *writer_meta_base =
(MetaArrayRec
*)(((char *)MetadataBaseAddrs[WriterRank]) +
Req.VarRec->PerWriterMetaFieldOffset[WriterRank]);
int DimCount = writer_meta_base->Dims;
size_t *RankOffset = writer_meta_base->Offsets;
const size_t *RankSize = writer_meta_base->Count;
std::vector<size_t> ZeroSel(DimCount);
std::vector<size_t> ZeroRankOffset(DimCount);
std::vector<size_t> ZeroGlobalDimensions(DimCount);
const size_t *SelOffset = NULL;
const size_t *SelSize = Req.Count.data();
int ReqIndex = 0;
while (Requests[ReqIndex].WriterRank != static_cast<size_t>(i))
while (Requests[ReqIndex].WriterRank !=
static_cast<size_t>(WriterRank))
ReqIndex++;
if (Req.VarRec->PerWriterDataLocation[WriterRank] == NULL)
{
// No Data from this writer
continue;
}
char *IncomingData =
(char *)Requests[ReqIndex].DestinationAddr +
Req.VarRec->PerWriterDataLocation[i][0];

Req.VarRec->PerWriterDataLocation[WriterRank][0];
if (Req.Start.size())
{
SelOffset = Req.Start.data();
}
if (Req.RequestType == Local)
{
int LocalBlockID =
Req.BlockID - Req.VarRec->PerWriterBlockStart[i];
Req.BlockID -
Req.VarRec->PerWriterBlockStart[WriterRank];
IncomingData =
(char *)Requests[ReqIndex].DestinationAddr +
Req.VarRec->PerWriterDataLocation[i][LocalBlockID];
Req.VarRec
->PerWriterDataLocation[WriterRank][LocalBlockID];

RankOffset = ZeroRankOffset.data();
GlobalDimensions = ZeroGlobalDimensions.data();
Expand Down Expand Up @@ -1021,7 +1049,14 @@ Engine::MinVarInfo *BP5Deserializer::MinBlocksInfo(const VariableBase &Var,
int Id = 0;
for (size_t WriterRank = 0; WriterRank < m_WriterCohortSize; WriterRank++)
{
Id += VarRec->PerWriterBlockCount[WriterRank];
MetaArrayRec *writer_meta_base =
(MetaArrayRec *)(((char *)MetadataBaseAddrs[WriterRank]) +
VarRec->PerWriterMetaFieldOffset[WriterRank]);
size_t WriterBlockCount =
writer_meta_base->Dims
? writer_meta_base->DBCount / writer_meta_base->Dims
: 1;
Id += WriterBlockCount;
}
MV->BlocksInfo.reserve(Id);

Expand All @@ -1032,7 +1067,9 @@ Engine::MinVarInfo *BP5Deserializer::MinBlocksInfo(const VariableBase &Var,
(MetaArrayRec *)(((char *)MetadataBaseAddrs[WriterRank]) +
VarRec->PerWriterMetaFieldOffset[WriterRank]);

for (size_t i = 0; i < VarRec->PerWriterBlockCount[WriterRank]; i++)
size_t WriterBlockCount =
meta_base->Dims ? meta_base->DBCount / meta_base->Dims : 1;
for (size_t i = 0; i < WriterBlockCount; i++)
{
size_t *Offsets = NULL;
size_t *Count = NULL;
Expand Down
18 changes: 6 additions & 12 deletions source/adios2/toolkit/format/bp5/BP5Deserializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,12 @@ class BP5Deserializer : virtual public BP5Base
char *DestinationAddr;
void *Internal;
};
void SetupForRandomAccess();
void InstallMetaMetaData(MetaMetaInfoBlock &MMList);
void InstallMetaData(void *MetadataBlock, size_t BlockLen,
size_t WriterRank);
void InstallAttributeData(void *AttributeBlock, size_t BlockLen);
size_t WriterRank, size_t Step = SIZE_MAX);
void InstallAttributeData(void *AttributeBlock, size_t BlockLen,
size_t Step = SIZE_MAX);
void SetupForTimestep(size_t t);
// return from QueueGet is true if a sync is needed to fill the data
bool QueueGet(core::VariableBase &variable, void *DestData);
Expand All @@ -64,6 +66,7 @@ class BP5Deserializer : virtual public BP5Base
bool m_WriterIsRowMajor = 1;
bool m_ReaderIsRowMajor = 1;
core::Engine *m_Engine = NULL;
bool m_RandomAccessMode = 0;

private:
struct BP5VarRec
Expand All @@ -74,23 +77,14 @@ class BP5Deserializer : virtual public BP5Base
DataType Type;
int ElementSize = 0;
size_t *GlobalDims = NULL;
size_t LastTSAdded = SIZE_MAX;
std::vector<size_t> PerWriterMetaFieldOffset;
std::vector<size_t> PerWriterBlockStart;
std::vector<size_t> PerWriterBlockCount;
std::vector<size_t *> PerWriterStart;
std::vector<size_t *> PerWriterCounts;
std::vector<void *> PerWriterIncomingData;
std::vector<size_t> PerWriterIncomingSize; // important for compression
std::vector<size_t *> PerWriterDataLocation;
BP5VarRec(int WriterSize)
{
PerWriterMetaFieldOffset.resize(WriterSize);
PerWriterBlockStart.resize(WriterSize);
PerWriterBlockCount.resize(WriterSize);
PerWriterStart.resize(WriterSize);
PerWriterCounts.resize(WriterSize);
PerWriterIncomingData.resize(WriterSize);
PerWriterIncomingSize.resize(WriterSize);
PerWriterDataLocation.resize(WriterSize);
}
};
Expand Down
14 changes: 7 additions & 7 deletions testing/adios2/engine/staging-common/run_test.py.gen.in
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,13 @@ reader_command_line.extend([reader_executable, canonical_engine, args.filename])
if args.rarg is not None:
reader_command_line.extend(args.rarg)

if args.num_writers == 0:
writer_command_line = None
mpmd_possible = False
print("TestDriver: No writers, setting MPMD false\n")
else:
print("TestDriver: Writer command line : " + " ".join(writer_command_line))

if args.num_readers == 0:
reader_command_line = None
mpmd_possible = False
Expand All @@ -329,13 +336,6 @@ if args.num_readers == 0:
else:
print("TestDriver: Reader command line : " + " ".join(reader_command_line))

if args.num_writers == 0:
writer_command_line = None
mpmd_possible = False
print("TestDriver: No writers, setting MPMD false\n")
else:
print("TestDriver: Writer command line : " + " ".join(writer_command_line))

if is_file_engine[args.engine.lower()]:
print("TestDriver: Is file engine, setting MPMD false\n")
mpmd_possible = False
Expand Down

0 comments on commit 67ebd6d

Please sign in to comment.