Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable local arrays with FFS marshaling #1373

Merged
merged 3 commits into from
Apr 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 63 additions & 12 deletions source/adios2/engine/sst/SstReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,25 @@ SstReader::SstReader(IO &io, const std::string &name, const Mode mode,
* setup shape of array variable as global (I.E. Count == Shape,
* Start == 0)
*/
for (int i = 0; i < DimCount; i++)
if (Shape)
{
VecShape.push_back(Shape[i]);
VecStart.push_back(0);
VecCount.push_back(Shape[i]);
for (int i = 0; i < DimCount; i++)
{
VecShape.push_back(Shape[i]);
VecStart.push_back(0);
VecCount.push_back(Shape[i]);
}
}
else
{
VecShape = {};
VecStart = {};
for (int i = 0; i < DimCount; i++)
{
VecCount.push_back(Count[i]);
}
}

if (Type == "compound")
{
return (void *)NULL;
Expand Down Expand Up @@ -332,10 +345,29 @@ void SstReader::Init()
{ \
if (m_WriterMarshalMethod == SstMarshalFFS) \
{ \
SstFFSGetDeferred( \
m_Input, (void *)&variable, variable.m_Name.c_str(), \
variable.m_Start.size(), variable.m_Start.data(), \
variable.m_Count.data(), data); \
size_t *Start = NULL; \
size_t *Count = NULL; \
size_t DimCount = 0; \
\
if (variable.m_SelectionType == \
adios2::SelectionType::BoundingBox) \
{ \
DimCount = variable.m_Shape.size(); \
Start = variable.m_Start.data(); \
Count = variable.m_Count.data(); \
SstFFSGetDeferred(m_Input, (void *)&variable, \
variable.m_Name.c_str(), DimCount, Start, \
Count, data); \
} \
else if (variable.m_SelectionType == \
adios2::SelectionType::WriteBlock) \
{ \
DimCount = variable.m_Count.size(); \
Count = variable.m_Count.data(); \
SstFFSGetLocalDeferred(m_Input, (void *)&variable, \
variable.m_Name.c_str(), DimCount, \
variable.m_BlockID, Count, data); \
} \
SstFFSPerformGets(m_Input); \
} \
if (m_WriterMarshalMethod == SstMarshalBP) \
Expand All @@ -352,10 +384,29 @@ void SstReader::Init()
{ \
if (m_WriterMarshalMethod == SstMarshalFFS) \
{ \
SstFFSGetDeferred( \
m_Input, (void *)&variable, variable.m_Name.c_str(), \
variable.m_Start.size(), variable.m_Start.data(), \
variable.m_Count.data(), data); \
size_t *Start = NULL; \
size_t *Count = NULL; \
size_t DimCount = 0; \
\
if (variable.m_SelectionType == \
adios2::SelectionType::BoundingBox) \
{ \
DimCount = variable.m_Shape.size(); \
Start = variable.m_Start.data(); \
Count = variable.m_Count.data(); \
SstFFSGetDeferred(m_Input, (void *)&variable, \
variable.m_Name.c_str(), DimCount, Start, \
Count, data); \
} \
else if (variable.m_SelectionType == \
adios2::SelectionType::WriteBlock) \
{ \
DimCount = variable.m_Count.size(); \
Count = variable.m_Count.data(); \
SstFFSGetLocalDeferred(m_Input, (void *)&variable, \
variable.m_Name.c_str(), DimCount, \
variable.m_BlockID, Count, data); \
} \
} \
if (m_WriterMarshalMethod == SstMarshalBP) \
{ \
Expand Down
22 changes: 19 additions & 3 deletions source/adios2/engine/sst/SstWriter.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,26 @@ void SstWriter::PutSyncCommon(Variable<T> &variable, const T *values)

if (m_MarshalMethod == SstMarshalFFS)
{
size_t *Shape = NULL;
size_t *Start = NULL;
size_t *Count = NULL;
size_t DimCount = 0;

if (variable.m_ShapeID == ShapeID::GlobalArray)
{
DimCount = variable.m_Shape.size();
Shape = variable.m_Shape.data();
Start = variable.m_Start.data();
Count = variable.m_Count.data();
}
else if (variable.m_ShapeID == ShapeID::LocalArray)
{
DimCount = variable.m_Count.size();
Count = variable.m_Count.data();
}
SstFFSMarshal(m_Output, (void *)&variable, variable.m_Name.c_str(),
variable.m_Type.c_str(), variable.m_ElementSize,
variable.m_Shape.size(), variable.m_Shape.data(),
variable.m_Count.data(), variable.m_Start.data(), values);
variable.m_Type.c_str(), variable.m_ElementSize, DimCount,
Shape, Count, Start, values);
}
else if (m_MarshalMethod == SstMarshalBP)
{
Expand Down
7 changes: 2 additions & 5 deletions source/adios2/toolkit/sst/cp/cp_reader.c
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,6 @@ SstStream SstReaderOpen(const char *Name, SstParams Params, MPI_Comm comm)
writer_data_t ReturnData;
struct _ReaderActivateMsg Msg;
struct timeval Start, Stop, Diff;
int i;
char *Filename = strdup(Name);
CMConnection rank0_to_rank0_conn = NULL;

Expand Down Expand Up @@ -427,7 +426,7 @@ SstStream SstReaderOpen(const char *Name, SstParams Params, MPI_Comm comm)
}
}

for (i = 0; i < ReturnData->WriterCohortSize; i++)
for (int i = 0; i < ReturnData->WriterCohortSize; i++)
{
attr_list attrs =
attr_list_from_string(ReturnData->CP_WriterInfo[i]->ContactInfo);
Expand Down Expand Up @@ -933,14 +932,13 @@ static void FreeTimestep(SstStream Stream, long Timestep)

static TSMetadataList waitForNextMetadata(SstStream Stream, long LastTimestep)
{
struct _TimestepMetadataList *Next;
TSMetadataList FoundTS = NULL;
pthread_mutex_lock(&Stream->DataLock);
Next = Stream->Timesteps;
CP_verbose(Stream, "Wait for next metadata after last timestep %d\n",
LastTimestep);
while (1)
{
struct _TimestepMetadataList *Next;
Next = Stream->Timesteps;
while (Next)
{
Expand Down Expand Up @@ -1090,7 +1088,6 @@ static void sendOneToEachWriterRank(SstStream s, CMFormat f, void *Msg,

extern void SstReleaseStep(SstStream Stream)
{
long MaxTimestep;
long Timestep = Stream->ReaderTimestep;
struct _ReleaseTimestepMsg Msg;

Expand Down
82 changes: 76 additions & 6 deletions source/adios2/toolkit/sst/cp/ffs_marshal.c
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,7 @@ extern void SstFFSGetDeferred(SstStream Stream, void *Variable,
// Build request structure and enter it into requests list
FFSArrayRequest Req = malloc(sizeof(*Req));
Req->VarRec = Var;
Req->RequestType = Global;
// make a copy of Start and Count request
Req->Start = malloc(sizeof(Start[0]) * Var->DimCount);
memcpy(Req->Start, Start, sizeof(Start[0]) * Var->DimCount);
Expand All @@ -729,8 +730,49 @@ extern void SstFFSGetDeferred(SstStream Stream, void *Variable,
}
}

extern void SstFFSGetLocalDeferred(SstStream Stream, void *Variable,
const char *Name, size_t DimCount,
const int BlockID, const size_t *Count,
void *Data)
{
struct FFSReaderMarshalBase *Info = Stream->ReaderMarshalData;
int GetFromWriter = 0;
FFSVarRec Var = LookupVarByKey(Stream, Variable);

// if Variable is in Metadata (I.E. DimCount == 0), move incoming data to
// Data area
if (DimCount == 0)
{
void *IncomingDataBase =
((char *)Info->MetadataBaseAddrs[GetFromWriter]) +
Var->PerWriterMetaFieldDesc[GetFromWriter]->field_offset;
memcpy(Data, IncomingDataBase,
Var->PerWriterMetaFieldDesc[GetFromWriter]->field_size);
}
else
{
// Build request structure and enter it into requests list
FFSArrayRequest Req = malloc(sizeof(*Req));
memset(Req, 0, sizeof(*Req));
Req->VarRec = Var;
Req->RequestType = Local;
Req->NodeID = BlockID;
// make a copy of Count request
Req->Count = malloc(sizeof(Count[0]) * Var->DimCount);
memcpy(Req->Count, Count, sizeof(Count[0]) * Var->DimCount);
Req->Data = Data;
Req->Next = Info->PendingVarRequests;
Info->PendingVarRequests = Req;
}
}

static int NeedWriter(FFSArrayRequest Req, int i)
{
if (Req->RequestType == Local)
{
return (Req->NodeID == i);
}
// else Global case
for (int j = 0; j < Req->VarRec->DimCount; j++)
{
size_t SelOffset = Req->Start[j];
Expand Down Expand Up @@ -1168,6 +1210,20 @@ static void FillReadRequests(SstStream Stream, FFSArrayRequest Reqs)
size_t IncomingSize = Reqs->VarRec->PerWriterIncomingSize[i];
int FreeIncoming = 0;

if (Reqs->RequestType == Local)
{
RankOffset = calloc(DimCount, sizeof(RankOffset[0]));
GlobalDimensions =
calloc(DimCount, sizeof(GlobalDimensions[0]));
if (SelOffset == NULL)
{
SelOffset = calloc(DimCount, sizeof(RankOffset[0]));
}
for (int i = 0; i < DimCount; i++)
{
GlobalDimensions[i] = RankSize[i];
}
}
if ((Stream->WriterConfigParams->CompressionMethod ==
SstCompressZFP) &&
ZFPcompressionPossible(Type, DimCount))
Expand Down Expand Up @@ -1244,9 +1300,17 @@ extern void SstFFSWriterEndStep(SstStream Stream, size_t Timestep)
if (!Info->MetaFormat)
{
struct FFSFormatBlock *Block = malloc(sizeof(*Block));
FMFormat Format = FMregister_simple_format(
Info->LocalFMContext, "MetaData", Info->MetaFields,
FMstruct_size_field_list(Info->MetaFields, sizeof(char *)));
FMStructDescRec struct_list[4] = {
{NULL, NULL, 0, NULL},
{"complex4", fcomplex_field_list, sizeof(fcomplex_struct), NULL},
{"complex8", dcomplex_field_list, sizeof(dcomplex_struct), NULL},
{NULL, NULL, 0, NULL}};
struct_list[0].format_name = "MetaData";
struct_list[0].field_list = Info->MetaFields;
struct_list[0].struct_size =
FMstruct_size_field_list(Info->MetaFields, sizeof(char *));
FMFormat Format =
register_data_format(Info->LocalFMContext, &struct_list[0]);
Info->MetaFormat = Format;
Block->FormatServerRep =
get_server_rep_FMformat(Format, &Block->FormatServerRepLen);
Expand Down Expand Up @@ -1667,7 +1731,7 @@ static void BuildVarList(SstStream Stream, TSMetadataMsg MetaData,
VarRec->ElementSize = ElementSize;
VarRec->Variable = Stream->ArraySetupUpcall(
Stream->SetupUpcallReader, ArrayName, Type, meta_base->Dims,
meta_base->Shape, meta_base->Count, meta_base->Offsets);
meta_base->Shape, meta_base->Offsets, meta_base->Count);
}
if (WriterRank == 0)
{
Expand Down Expand Up @@ -1813,9 +1877,15 @@ extern void SstFFSMarshal(SstStream Stream, void *Variable, const char *Name,

/* handle metadata */
MetaEntry->Dims = DimCount;
MetaEntry->Shape = CopyDims(DimCount, Shape);
if (Shape)
MetaEntry->Shape = CopyDims(DimCount, Shape);
else
MetaEntry->Shape = NULL;
MetaEntry->Count = CopyDims(DimCount, Count);
MetaEntry->Offsets = CopyDims(DimCount, Offsets);
if (Offsets)
MetaEntry->Offsets = CopyDims(DimCount, Offsets);
else
MetaEntry->Offsets = NULL;

if ((Stream->ConfigParams->CompressionMethod == SstCompressZFP) &&
ZFPcompressionPossible(Type, DimCount))
Expand Down
8 changes: 8 additions & 0 deletions source/adios2/toolkit/sst/cp/ffs_marshal.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,17 @@ typedef struct FFSVarRec
size_t *PerWriterIncomingSize; // important for compression
} * FFSVarRec;

enum FFSRequestTypeEnum
{
Global = 0,
Local = 1
};

typedef struct FFSArrayRequest
{
FFSVarRec VarRec;
enum FFSRequestTypeEnum RequestType;
size_t NodeID;
size_t *Start;
size_t *Count;
void *Data;
Expand Down
4 changes: 4 additions & 0 deletions source/adios2/toolkit/sst/sst.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ extern void SstFFSGetDeferred(SstStream Stream, void *Variable,
const char *Name, size_t DimCount,
const size_t *Start, const size_t *Count,
void *Data);
extern void SstFFSGetLocalDeferred(SstStream Stream, void *Variable,
const char *Name, size_t DimCount,
const int BlockID, const size_t *Count,
void *Data);

extern SstStatusValue SstFFSPerformGets(SstStream Stream);

Expand Down
3 changes: 0 additions & 3 deletions testing/adios2/engine/staging-common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,6 @@ list(FILTER SST_TESTS EXCLUDE REGEX "Fto.*FFS.*")
# remove Fto anything tests that use CommMin because we can't spec it
list(FILTER SST_TESTS EXCLUDE REGEX "Fto.*CommMin.*")

# remove Local tests that use FFS, this doesn't work yet
list(FILTER SST_TESTS EXCLUDE REGEX "Local.*FFS")

list( LENGTH SST_TESTS afterlistlen )
message (STATUS "Staging tests list before was ${beforelistlen}, after is ${afterlistlen}")

Expand Down
2 changes: 2 additions & 0 deletions testing/adios2/engine/staging-common/TestCommonReadLocal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ TEST_F(CommonReadTest, ADIOS2CommonRead1D8)

writerSize = var_time.Shape()[0];

// std::cout << "Writer size is " << writerSize << std::endl;

int rankToRead = mpiRank;
if (writerSize < mpiSize)
{
Expand Down