From 026ec070b3907453f95a882a5d1192a67f6893d3 Mon Sep 17 00:00:00 2001 From: Greg Eisenhauer Date: Sat, 6 Apr 2019 16:22:05 -0400 Subject: [PATCH 1/3] Tentative Local support --- source/adios2/engine/sst/SstReader.cpp | 22 +++++++++++----- source/adios2/toolkit/sst/cp/cp_reader.c | 7 ++--- source/adios2/toolkit/sst/cp/ffs_marshal.c | 26 ++++++++++++++----- .../engine/staging-common/CMakeLists.txt | 3 --- 4 files changed, 38 insertions(+), 20 deletions(-) diff --git a/source/adios2/engine/sst/SstReader.cpp b/source/adios2/engine/sst/SstReader.cpp index 35ed846ebc..055f01011d 100644 --- a/source/adios2/engine/sst/SstReader.cpp +++ b/source/adios2/engine/sst/SstReader.cpp @@ -131,12 +131,22 @@ SstReader::SstReader(IO &io, const std::string &name, const Mode mode, * setup shape of array variable as global (I.E. Count == Shape, * Start == 0) */ - for (int i = 0; i < DimCount; i++) - { - VecShape.push_back(Shape[i]); - VecStart.push_back(0); - VecCount.push_back(Shape[i]); - } + if (Shape) { + for (int i = 0; i < DimCount; i++) + { + VecShape.push_back(Shape[i]); + VecStart.push_back(0); + VecCount.push_back(Shape[i]); + } + } else { + VecShape = {}; + VecStart = {}; + for (int i = 0; i < DimCount; i++) + { + VecCount.push_back(Count[i]); + } + } + if (Type == "compound") { return (void *)NULL; diff --git a/source/adios2/toolkit/sst/cp/cp_reader.c b/source/adios2/toolkit/sst/cp/cp_reader.c index eae966e533..a0ef929dd2 100644 --- a/source/adios2/toolkit/sst/cp/cp_reader.c +++ b/source/adios2/toolkit/sst/cp/cp_reader.c @@ -230,7 +230,6 @@ SstStream SstReaderOpen(const char *Name, SstParams Params, MPI_Comm comm) writer_data_t ReturnData; struct _ReaderActivateMsg Msg; struct timeval Start, Stop, Diff; - int i; char *Filename = strdup(Name); CMConnection rank0_to_rank0_conn = NULL; @@ -427,7 +426,7 @@ SstStream SstReaderOpen(const char *Name, SstParams Params, MPI_Comm comm) } } - for (i = 0; i < ReturnData->WriterCohortSize; i++) + for (int i = 0; i < ReturnData->WriterCohortSize; i++) { attr_list attrs = attr_list_from_string(ReturnData->CP_WriterInfo[i]->ContactInfo); @@ -933,14 +932,13 @@ static void FreeTimestep(SstStream Stream, long Timestep) static TSMetadataList waitForNextMetadata(SstStream Stream, long LastTimestep) { - struct _TimestepMetadataList *Next; TSMetadataList FoundTS = NULL; pthread_mutex_lock(&Stream->DataLock); - Next = Stream->Timesteps; CP_verbose(Stream, "Wait for next metadata after last timestep %d\n", LastTimestep); while (1) { + struct _TimestepMetadataList *Next; Next = Stream->Timesteps; while (Next) { @@ -1090,7 +1088,6 @@ static void sendOneToEachWriterRank(SstStream s, CMFormat f, void *Msg, extern void SstReleaseStep(SstStream Stream) { - long MaxTimestep; long Timestep = Stream->ReaderTimestep; struct _ReleaseTimestepMsg Msg; diff --git a/source/adios2/toolkit/sst/cp/ffs_marshal.c b/source/adios2/toolkit/sst/cp/ffs_marshal.c index 381d1c9d5e..bd55d7de24 100644 --- a/source/adios2/toolkit/sst/cp/ffs_marshal.c +++ b/source/adios2/toolkit/sst/cp/ffs_marshal.c @@ -1244,9 +1244,17 @@ extern void SstFFSWriterEndStep(SstStream Stream, size_t Timestep) if (!Info->MetaFormat) { struct FFSFormatBlock *Block = malloc(sizeof(*Block)); - FMFormat Format = FMregister_simple_format( - Info->LocalFMContext, "MetaData", Info->MetaFields, - FMstruct_size_field_list(Info->MetaFields, sizeof(char *))); + FMStructDescRec struct_list[4] = { + {NULL, NULL, 0, NULL}, + {"complex4", fcomplex_field_list, sizeof(fcomplex_struct), NULL}, + {"complex8", dcomplex_field_list, sizeof(dcomplex_struct), NULL}, + {NULL, NULL, 0, NULL}}; + struct_list[0].format_name = "MetaData"; + struct_list[0].field_list = Info->MetaFields; + struct_list[0].struct_size = + FMstruct_size_field_list(Info->MetaFields, sizeof(char *)); + FMFormat Format = + register_data_format(Info->LocalFMContext, &struct_list[0]); Info->MetaFormat = Format; Block->FormatServerRep = get_server_rep_FMformat(Format, &Block->FormatServerRepLen); @@ -1667,7 +1675,7 @@ static void BuildVarList(SstStream Stream, TSMetadataMsg MetaData, VarRec->ElementSize = ElementSize; VarRec->Variable = Stream->ArraySetupUpcall( Stream->SetupUpcallReader, ArrayName, Type, meta_base->Dims, - meta_base->Shape, meta_base->Count, meta_base->Offsets); + meta_base->Shape, meta_base->Offsets, meta_base->Count); } if (WriterRank == 0) { @@ -1813,9 +1821,15 @@ extern void SstFFSMarshal(SstStream Stream, void *Variable, const char *Name, /* handle metadata */ MetaEntry->Dims = DimCount; - MetaEntry->Shape = CopyDims(DimCount, Shape); + if (Shape) + MetaEntry->Shape = CopyDims(DimCount, Shape); + else + MetaEntry->Shape = NULL; MetaEntry->Count = CopyDims(DimCount, Count); - MetaEntry->Offsets = CopyDims(DimCount, Offsets); + if (Offsets) + MetaEntry->Offsets = CopyDims(DimCount, Offsets); + else + MetaEntry->Offsets = NULL; if ((Stream->ConfigParams->CompressionMethod == SstCompressZFP) && ZFPcompressionPossible(Type, DimCount)) diff --git a/testing/adios2/engine/staging-common/CMakeLists.txt b/testing/adios2/engine/staging-common/CMakeLists.txt index 39c2dc2b14..a07ede8ac8 100644 --- a/testing/adios2/engine/staging-common/CMakeLists.txt +++ b/testing/adios2/engine/staging-common/CMakeLists.txt @@ -144,9 +144,6 @@ list(FILTER SST_TESTS EXCLUDE REGEX "Fto.*FFS.*") # remove Fto anything tests that use CommMin because we can't spec it list(FILTER SST_TESTS EXCLUDE REGEX "Fto.*CommMin.*") -# remove Local tests that use FFS, this doesn't work yet -list(FILTER SST_TESTS EXCLUDE REGEX "Local.*FFS") - list( LENGTH SST_TESTS afterlistlen ) message (STATUS "Staging tests list before was ${beforelistlen}, after is ${afterlistlen}") From a3ab3f8858e6ac95d875299267265c0586f35e15 Mon Sep 17 00:00:00 2001 From: Greg Eisenhauer Date: Thu, 11 Apr 2019 13:56:46 -0400 Subject: [PATCH 2/3] Finish impl --- source/adios2/engine/sst/SstReader.cpp | 89 ++++++++++++++----- source/adios2/engine/sst/SstWriter.tcc | 22 ++++- source/adios2/toolkit/sst/cp/ffs_marshal.c | 59 ++++++++++++ source/adios2/toolkit/sst/cp/ffs_marshal.h | 8 ++ source/adios2/toolkit/sst/sst.h | 4 + .../staging-common/TestCommonReadLocal.cpp | 2 + 6 files changed, 157 insertions(+), 27 deletions(-) diff --git a/source/adios2/engine/sst/SstReader.cpp b/source/adios2/engine/sst/SstReader.cpp index 055f01011d..e49a37b3af 100644 --- a/source/adios2/engine/sst/SstReader.cpp +++ b/source/adios2/engine/sst/SstReader.cpp @@ -131,22 +131,25 @@ SstReader::SstReader(IO &io, const std::string &name, const Mode mode, * setup shape of array variable as global (I.E. Count == Shape, * Start == 0) */ - if (Shape) { - for (int i = 0; i < DimCount; i++) - { - VecShape.push_back(Shape[i]); - VecStart.push_back(0); - VecCount.push_back(Shape[i]); - } - } else { - VecShape = {}; - VecStart = {}; - for (int i = 0; i < DimCount; i++) - { - VecCount.push_back(Count[i]); - } - } - + if (Shape) + { + for (int i = 0; i < DimCount; i++) + { + VecShape.push_back(Shape[i]); + VecStart.push_back(0); + VecCount.push_back(Shape[i]); + } + } + else + { + VecShape = {}; + VecStart = {}; + for (int i = 0; i < DimCount; i++) + { + VecCount.push_back(Count[i]); + } + } + if (Type == "compound") { return (void *)NULL; @@ -342,10 +345,29 @@ void SstReader::Init() { \ if (m_WriterMarshalMethod == SstMarshalFFS) \ { \ - SstFFSGetDeferred( \ - m_Input, (void *)&variable, variable.m_Name.c_str(), \ - variable.m_Start.size(), variable.m_Start.data(), \ - variable.m_Count.data(), data); \ + size_t *Start = NULL; \ + size_t *Count = NULL; \ + size_t DimCount = 0; \ + \ + if (variable.m_SelectionType == \ + adios2::SelectionType::BoundingBox) \ + { \ + DimCount = variable.m_Shape.size(); \ + Start = variable.m_Start.data(); \ + Count = variable.m_Count.data(); \ + SstFFSGetDeferred(m_Input, (void *)&variable, \ + variable.m_Name.c_str(), DimCount, Start, \ + Count, data); \ + } \ + else if (variable.m_SelectionType == \ + adios2::SelectionType::WriteBlock) \ + { \ + DimCount = variable.m_Count.size(); \ + Count = variable.m_Count.data(); \ + SstFFSGetLocalDeferred(m_Input, (void *)&variable, \ + variable.m_Name.c_str(), DimCount, \ + variable.m_BlockID, Count, data); \ + } \ SstFFSPerformGets(m_Input); \ } \ if (m_WriterMarshalMethod == SstMarshalBP) \ @@ -362,10 +384,29 @@ void SstReader::Init() { \ if (m_WriterMarshalMethod == SstMarshalFFS) \ { \ - SstFFSGetDeferred( \ - m_Input, (void *)&variable, variable.m_Name.c_str(), \ - variable.m_Start.size(), variable.m_Start.data(), \ - variable.m_Count.data(), data); \ + size_t *Start = NULL; \ + size_t *Count = NULL; \ + size_t DimCount = 0; \ + \ + if (variable.m_SelectionType == \ + adios2::SelectionType::BoundingBox) \ + { \ + DimCount = variable.m_Shape.size(); \ + Start = variable.m_Start.data(); \ + Count = variable.m_Count.data(); \ + SstFFSGetDeferred(m_Input, (void *)&variable, \ + variable.m_Name.c_str(), DimCount, Start, \ + Count, data); \ + } \ + else if (variable.m_SelectionType == \ + adios2::SelectionType::WriteBlock) \ + { \ + DimCount = variable.m_Count.size(); \ + Count = variable.m_Count.data(); \ + SstFFSGetLocalDeferred(m_Input, (void *)&variable, \ + variable.m_Name.c_str(), DimCount, \ + variable.m_BlockID, Count, data); \ + } \ } \ if (m_WriterMarshalMethod == SstMarshalBP) \ { \ diff --git a/source/adios2/engine/sst/SstWriter.tcc b/source/adios2/engine/sst/SstWriter.tcc index e4c8bd4203..229a8114e3 100644 --- a/source/adios2/engine/sst/SstWriter.tcc +++ b/source/adios2/engine/sst/SstWriter.tcc @@ -39,10 +39,26 @@ void SstWriter::PutSyncCommon(Variable &variable, const T *values) if (m_MarshalMethod == SstMarshalFFS) { + size_t *Shape = NULL; + size_t *Start = NULL; + size_t *Count = NULL; + size_t DimCount = 0; + + if (variable.m_ShapeID == ShapeID::GlobalArray) + { + DimCount = variable.m_Shape.size(); + Shape = variable.m_Shape.data(); + Start = variable.m_Start.data(); + Count = variable.m_Count.data(); + } + else if (variable.m_ShapeID == ShapeID::LocalArray) + { + DimCount = variable.m_Count.size(); + Count = variable.m_Count.data(); + } SstFFSMarshal(m_Output, (void *)&variable, variable.m_Name.c_str(), - variable.m_Type.c_str(), variable.m_ElementSize, - variable.m_Shape.size(), variable.m_Shape.data(), - variable.m_Count.data(), variable.m_Start.data(), values); + variable.m_Type.c_str(), variable.m_ElementSize, DimCount, + Shape, Count, Start, values); } else if (m_MarshalMethod == SstMarshalBP) { diff --git a/source/adios2/toolkit/sst/cp/ffs_marshal.c b/source/adios2/toolkit/sst/cp/ffs_marshal.c index bd55d7de24..42dc992680 100644 --- a/source/adios2/toolkit/sst/cp/ffs_marshal.c +++ b/source/adios2/toolkit/sst/cp/ffs_marshal.c @@ -718,6 +718,7 @@ extern void SstFFSGetDeferred(SstStream Stream, void *Variable, // Build request structure and enter it into requests list FFSArrayRequest Req = malloc(sizeof(*Req)); Req->VarRec = Var; + Req->RequestType = Global; // make a copy of Start and Count request Req->Start = malloc(sizeof(Start[0]) * Var->DimCount); memcpy(Req->Start, Start, sizeof(Start[0]) * Var->DimCount); @@ -729,8 +730,49 @@ extern void SstFFSGetDeferred(SstStream Stream, void *Variable, } } +extern void SstFFSGetLocalDeferred(SstStream Stream, void *Variable, + const char *Name, size_t DimCount, + const int BlockID, const size_t *Count, + void *Data) +{ + struct FFSReaderMarshalBase *Info = Stream->ReaderMarshalData; + int GetFromWriter = 0; + FFSVarRec Var = LookupVarByKey(Stream, Variable); + + // if Variable is in Metadata (I.E. DimCount == 0), move incoming data to + // Data area + if (DimCount == 0) + { + void *IncomingDataBase = + ((char *)Info->MetadataBaseAddrs[GetFromWriter]) + + Var->PerWriterMetaFieldDesc[GetFromWriter]->field_offset; + memcpy(Data, IncomingDataBase, + Var->PerWriterMetaFieldDesc[GetFromWriter]->field_size); + } + else + { + // Build request structure and enter it into requests list + FFSArrayRequest Req = malloc(sizeof(*Req)); + memset(Req, 0, sizeof(*Req)); + Req->VarRec = Var; + Req->RequestType = Local; + Req->NodeID = BlockID; + // make a copy of Count request + Req->Count = malloc(sizeof(Count[0]) * Var->DimCount); + memcpy(Req->Count, Count, sizeof(Count[0]) * Var->DimCount); + Req->Data = Data; + Req->Next = Info->PendingVarRequests; + Info->PendingVarRequests = Req; + } +} + static int NeedWriter(FFSArrayRequest Req, int i) { + if (Req->RequestType == Local) + { + return (Req->NodeID == i); + } + // else Global case for (int j = 0; j < Req->VarRec->DimCount; j++) { size_t SelOffset = Req->Start[j]; @@ -1168,6 +1210,23 @@ static void FillReadRequests(SstStream Stream, FFSArrayRequest Reqs) size_t IncomingSize = Reqs->VarRec->PerWriterIncomingSize[i]; int FreeIncoming = 0; + printf("Requests, request type = %d\n", Reqs->RequestType); + if (Reqs->RequestType == Local) + { + printf("Got a local read, dim count %d, count %d\n", + DimCount, SelSize[0]); + RankOffset = calloc(DimCount, sizeof(RankOffset[0])); + GlobalDimensions = + calloc(DimCount, sizeof(GlobalDimensions[0])); + if (SelOffset == NULL) + { + SelOffset = calloc(DimCount, sizeof(RankOffset[0])); + } + for (int i = 0; i < DimCount; i++) + { + GlobalDimensions[i] = RankSize[i]; + } + } if ((Stream->WriterConfigParams->CompressionMethod == SstCompressZFP) && ZFPcompressionPossible(Type, DimCount)) diff --git a/source/adios2/toolkit/sst/cp/ffs_marshal.h b/source/adios2/toolkit/sst/cp/ffs_marshal.h index 6bd101f29e..9ccb022b82 100644 --- a/source/adios2/toolkit/sst/cp/ffs_marshal.h +++ b/source/adios2/toolkit/sst/cp/ffs_marshal.h @@ -44,9 +44,17 @@ typedef struct FFSVarRec size_t *PerWriterIncomingSize; // important for compression } * FFSVarRec; +enum FFSRequestTypeEnum +{ + Global = 0, + Local = 1 +}; + typedef struct FFSArrayRequest { FFSVarRec VarRec; + enum FFSRequestTypeEnum RequestType; + size_t NodeID; size_t *Start; size_t *Count; void *Data; diff --git a/source/adios2/toolkit/sst/sst.h b/source/adios2/toolkit/sst/sst.h index 5e1359bbdf..c1bbdb902c 100644 --- a/source/adios2/toolkit/sst/sst.h +++ b/source/adios2/toolkit/sst/sst.h @@ -144,6 +144,10 @@ extern void SstFFSGetDeferred(SstStream Stream, void *Variable, const char *Name, size_t DimCount, const size_t *Start, const size_t *Count, void *Data); +extern void SstFFSGetLocalDeferred(SstStream Stream, void *Variable, + const char *Name, size_t DimCount, + const int BlockID, const size_t *Count, + void *Data); extern SstStatusValue SstFFSPerformGets(SstStream Stream); diff --git a/testing/adios2/engine/staging-common/TestCommonReadLocal.cpp b/testing/adios2/engine/staging-common/TestCommonReadLocal.cpp index d58d06c1b1..fac9712f06 100644 --- a/testing/adios2/engine/staging-common/TestCommonReadLocal.cpp +++ b/testing/adios2/engine/staging-common/TestCommonReadLocal.cpp @@ -114,6 +114,8 @@ TEST_F(CommonReadTest, ADIOS2CommonRead1D8) writerSize = var_time.Shape()[0]; + // std::cout << "Writer size is " << writerSize << std::endl; + int rankToRead = mpiRank; if (writerSize < mpiSize) { From 8db552eaab3ab2a07daec43b8b3ce8eaa986ef1b Mon Sep 17 00:00:00 2001 From: Greg Eisenhauer Date: Thu, 11 Apr 2019 22:31:42 -0400 Subject: [PATCH 3/3] extraneous output --- source/adios2/toolkit/sst/cp/ffs_marshal.c | 3 --- 1 file changed, 3 deletions(-) diff --git a/source/adios2/toolkit/sst/cp/ffs_marshal.c b/source/adios2/toolkit/sst/cp/ffs_marshal.c index 42dc992680..99452379a4 100644 --- a/source/adios2/toolkit/sst/cp/ffs_marshal.c +++ b/source/adios2/toolkit/sst/cp/ffs_marshal.c @@ -1210,11 +1210,8 @@ static void FillReadRequests(SstStream Stream, FFSArrayRequest Reqs) size_t IncomingSize = Reqs->VarRec->PerWriterIncomingSize[i]; int FreeIncoming = 0; - printf("Requests, request type = %d\n", Reqs->RequestType); if (Reqs->RequestType == Local) { - printf("Got a local read, dim count %d, count %d\n", - DimCount, SelSize[0]); RankOffset = calloc(DimCount, sizeof(RankOffset[0])); GlobalDimensions = calloc(DimCount, sizeof(GlobalDimensions[0]));