diff --git a/source/adios2/toolkit/sst/cp/cp_common.c b/source/adios2/toolkit/sst/cp/cp_common.c index b3e86a6079..aeba174b00 100644 --- a/source/adios2/toolkit/sst/cp/cp_common.c +++ b/source/adios2/toolkit/sst/cp/cp_common.c @@ -1009,6 +1009,97 @@ extern void AddToLastCallFreeList(void *Block) SharedCMInfo->LastCallFreeCount++; } +static void ReadableSizeString(size_t SizeInBytes, char *Output, size_t size) +{ + int i = 0; + size_t LastSizeInBytes = SizeInBytes; + char *byteUnits[] = {"bytes", "kB", "MB", "GB", "TB", + "PB", "EB", "ZB", "YB"}; + + while (SizeInBytes > 1024) + { + LastSizeInBytes = SizeInBytes; + SizeInBytes = SizeInBytes / 1024; + i++; + } + + if ((SizeInBytes < 100) && (i != 0)) + { + snprintf(Output, size, "%.1f %s", ((double)LastSizeInBytes) / 1024, + byteUnits[i]); + } + else + { + snprintf(Output, size, "%ld %s", SizeInBytes, byteUnits[i]); + } +}; + +extern void DoStreamSummary(SstStream Stream) +{ + SstStats AllStats = NULL; + + if (Stream->Rank == 0) + AllStats = malloc(sizeof(struct _SstStats) * Stream->CohortSize); + + SMPI_Gather(&Stream->Stats, sizeof(struct _SstStats), SMPI_CHAR, AllStats, + sizeof(struct _SstStats), SMPI_CHAR, 0, Stream->mpiComm); + + if (Stream->Rank != 0) + { + return; + } + + for (int i = 1; i < Stream->CohortSize; i++) + { + AllStats[0].MetadataBytesReceived += AllStats[i].MetadataBytesReceived; + AllStats[0].DataBytesReceived += AllStats[i].DataBytesReceived; + AllStats[0].PreloadBytesReceived += AllStats[i].PreloadBytesReceived; + AllStats[0].RunningFanIn += AllStats[i].RunningFanIn; + } + AllStats[0].RunningFanIn /= Stream->CohortSize; + + CP_verbose(Stream, SummaryVerbose, "\nStream \"%s\" (%p) summary info:\n", + Stream->Filename, (void *)Stream); + CP_verbose(Stream, SummaryVerbose, "\tDuration (secs) = %g\n", + Stream->Stats.StreamValidTimeSecs); + if (Stream->Role == WriterRole) + { + CP_verbose(Stream, SummaryVerbose, "\tTimesteps Created = %zu\n", + Stream->Stats.TimestepsCreated); + CP_verbose(Stream, SummaryVerbose, "\tTimesteps Delivered = %zu\n", + Stream->Stats.TimestepsDelivered); + } + else if (Stream->Role == ReaderRole) + { + char OutputString[256]; + CP_verbose(Stream, SummaryVerbose, + "\tTimestep Metadata Received = %zu\n", + Stream->Stats.TimestepMetadataReceived); + CP_verbose(Stream, SummaryVerbose, "\tTimesteps Consumed = %zu\n", + Stream->Stats.TimestepsConsumed); + ReadableSizeString(AllStats[0].MetadataBytesReceived, OutputString, + sizeof(OutputString)); + CP_verbose(Stream, SummaryVerbose, + "\tMetadataBytesReceived = %zu (%s)\n", + AllStats[0].MetadataBytesReceived, OutputString); + ReadableSizeString(AllStats[0].DataBytesReceived, OutputString, + sizeof(OutputString)); + CP_verbose(Stream, SummaryVerbose, "\tDataBytesReceived = %zu (%s)\n", + AllStats[0].DataBytesReceived, OutputString); + ReadableSizeString(AllStats[0].PreloadBytesReceived, OutputString, + sizeof(OutputString)); + CP_verbose(Stream, SummaryVerbose, + "\tPreloadBytesReceived = %zu (%s)\n", + AllStats[0].PreloadBytesReceived, OutputString); + CP_verbose(Stream, SummaryVerbose, "\tPreloadTimestepsReceived = %zu\n", + Stream->Stats.PreloadTimestepsReceived); + CP_verbose(Stream, SummaryVerbose, "\tAverageReadRankFanIn = %.1f\n", + AllStats[0].RunningFanIn); + } + CP_verbose(Stream, SummaryVerbose, "\n"); + free(AllStats); +} + extern void SstStreamDestroy(SstStream Stream) { /* @@ -1120,6 +1211,8 @@ extern void SstStreamDestroy(SstStream Stream) Stream->ConnectionsToWriter = NULL; } free(Stream->Peers); + if (Stream->RanksRead) + free(Stream->RanksRead); } else if (Stream->ConfigParams->MarshalMethod == SstMarshalFFS) { @@ -1452,12 +1545,6 @@ extern void getPeerArrays(int MySize, int MyRank, int PeerSize, } } -extern void SstSetStatsSave(SstStream Stream, SstStats Stats) -{ - Stats->OpenTimeSecs = Stream->OpenTimeSecs; - Stream->Stats = Stats; -} - static void DP_verbose(SstStream s, int Level, char *Format, ...) { if (s->DPVerbosityLevel >= Level) diff --git a/source/adios2/toolkit/sst/cp/cp_internal.h b/source/adios2/toolkit/sst/cp/cp_internal.h index 525d5311a8..aa1721e1a6 100644 --- a/source/adios2/toolkit/sst/cp/cp_internal.h +++ b/source/adios2/toolkit/sst/cp/cp_internal.h @@ -116,6 +116,7 @@ typedef struct _CPTimestepEntry long Timestep; struct _SstData Data; struct _TimestepMetadataMsg *Msg; + int MetaDataSendCount; int ReferenceCount; int Expired; int PreciousTimestep; @@ -146,7 +147,8 @@ struct _SstStream int DPVerbosityLevel; double OpenTimeSecs; struct timeval ValidStartTime; - SstStats Stats; + struct _SstStats Stats; + char *RanksRead; /* MPI info */ int Rank; @@ -524,14 +526,19 @@ extern void FFSFreeMarshalData(SstStream Stream); extern void getPeerArrays(int MySize, int MyRank, int PeerSize, int **forwardArray, int **reverseArray); extern void AddToLastCallFreeList(void *Block); + enum VerbosityLevel { - TraceVerbose = 5, - PerRankVerbose = 4, - PerStepVerbose = 3, - SummaryVerbose = 2, - CriticalVerbose = 1, - NoVerbose = 0, + NoVerbose = 0, // Generally no output (but not absolutely quiet?) + CriticalVerbose = 1, // Informational output for failures only + SummaryVerbose = + 2, // One-time summary output containing general info (transports used, + // timestep count, stream duration, etc.) + PerStepVerbose = 3, // One-per-step info, generally from rank 0 (metadata + // read, Begin/EndStep verbosity, etc.) + PerRankVerbose = 4, // Per-step info from each rank (for those things that + // might be different per rank). + TraceVerbose = 5, // All debugging available }; extern void CP_verbose(SstStream Stream, enum VerbosityLevel Level, @@ -546,3 +553,4 @@ typedef void (*CPNetworkInfoFunc)(int dataID, const char *net_string, extern char *IPDiagString; extern CPNetworkInfoFunc globalNetinfoCallback; extern void SSTSetNetworkCallback(CPNetworkInfoFunc callback); +extern void DoStreamSummary(SstStream Stream); diff --git a/source/adios2/toolkit/sst/cp/cp_reader.c b/source/adios2/toolkit/sst/cp/cp_reader.c index 753c440943..66c8975faf 100644 --- a/source/adios2/toolkit/sst/cp/cp_reader.c +++ b/source/adios2/toolkit/sst/cp/cp_reader.c @@ -478,7 +478,8 @@ SstStream SstReaderOpen(const char *Name, SstParams Params, SMPI_Comm comm) CP_validateParams(Stream, Params, 0 /* reader */); Stream->ConfigParams = Params; - Stream->DP_Interface = SelectDP(&Svcs, Stream, Stream->ConfigParams); + Stream->DP_Interface = + SelectDP(&Svcs, Stream, Stream->ConfigParams, Stream->Rank); Stream->CPInfo = CP_getCPInfo(Stream->DP_Interface, Stream->ConfigParams->ControlModule); @@ -500,7 +501,8 @@ SstStream SstReaderOpen(const char *Name, SstParams Params, SMPI_Comm comm) } Stream->DP_Stream = Stream->DP_Interface->initReader( - &Svcs, Stream, &dpInfo, Stream->ConfigParams, WriterContactAttributes); + &Svcs, Stream, &dpInfo, Stream->ConfigParams, WriterContactAttributes, + &Stream->Stats); free_attr_list(WriterContactAttributes); @@ -617,23 +619,27 @@ SstStream SstReaderOpen(const char *Name, SstParams Params, SMPI_Comm comm) Stream->WriterCohortSize = ReturnData->WriterCohortSize; Stream->WriterConfigParams = ReturnData->WriterConfigParams; - if (Stream->WriterConfigParams->MarshalMethod == SstMarshalFFS) + if ((Stream->WriterConfigParams->MarshalMethod == SstMarshalFFS) && + (Stream->Rank == 0)) { CP_verbose(Stream, SummaryVerbose, "Writer is doing FFS-based marshalling\n"); } - if (Stream->WriterConfigParams->MarshalMethod == SstMarshalBP) + if ((Stream->WriterConfigParams->MarshalMethod == SstMarshalBP) && + (Stream->Rank == 0)) { CP_verbose(Stream, SummaryVerbose, "Writer is doing BP-based marshalling\n"); } - if (Stream->WriterConfigParams->CPCommPattern == SstCPCommMin) + if ((Stream->WriterConfigParams->CPCommPattern == SstCPCommMin) && + (Stream->Rank == 0)) { CP_verbose( Stream, SummaryVerbose, "Writer is using Minimum Connection Communication pattern (min)\n"); } - if (Stream->WriterConfigParams->CPCommPattern == SstCPCommPeer) + if ((Stream->WriterConfigParams->CPCommPattern == SstCPCommPeer) && + (Stream->Rank == 0)) { CP_verbose(Stream, SummaryVerbose, "Writer is using Peer-based Communication pattern (peer)\n"); @@ -817,6 +823,12 @@ void queueTimestepMetadataMsgAndNotify(SstStream Stream, { Stream->Timesteps = New; } + Stream->Stats.TimestepMetadataReceived++; + if (tsm->Metadata) + { + Stream->Stats.MetadataBytesReceived += + (tsm->Metadata->DataSize + tsm->AttributeData->DataSize); + } CP_verbose(Stream, PerRankVerbose, "Received a Timestep metadata message for timestep %d, " "signaling condition\n", @@ -1345,14 +1357,52 @@ extern SstFullMetadata SstGetCurMetadata(SstStream Stream) return Stream->CurrentMetadata; } +static void AddToReadStats(SstStream Stream, int Rank, long Timestep, + size_t Length) +{ + if (!Stream->RanksRead) + Stream->RanksRead = calloc(1, Stream->WriterCohortSize); + Stream->RanksRead[Rank] = 1; + Stream->Stats.BytesRead += Length; +} + +#ifndef min +#define min(a, b) (((a) < (b)) ? (a) : (b)) +#endif + +static void ReleaseTSReadStats(SstStream Stream, long Timestep) +{ + int ThisFanIn = 0; + if (Stream->RanksRead) + { + for (int i = 0; i < Stream->WriterCohortSize; i++) + { + if (Stream->RanksRead[i]) + ThisFanIn++; + } + memset(Stream->RanksRead, 0, Stream->WriterCohortSize); + } + if (Stream->Stats.TimestepsConsumed == 1) + { + Stream->Stats.RunningFanIn = ThisFanIn; + } + else + { + Stream->Stats.RunningFanIn = + Stream->Stats.RunningFanIn + + ((double)ThisFanIn - Stream->Stats.RunningFanIn) / + min(Stream->Stats.TimestepsConsumed, 100); + } +} + // SstReadRemotememory is only called by the main // program thread. extern void *SstReadRemoteMemory(SstStream Stream, int Rank, long Timestep, size_t Offset, size_t Length, void *Buffer, void *DP_TimestepInfo) { - if (Stream->Stats) - Stream->Stats->BytesTransferred += Length; + Stream->Stats.BytesTransferred += Length; + AddToReadStats(Stream, Rank, Timestep, Length); return Stream->DP_Interface->readRemoteMemory( &Svcs, Stream->DP_Stream, Rank, Timestep, Offset, Length, Buffer, DP_TimestepInfo); @@ -1428,6 +1478,7 @@ extern void SstReleaseStep(SstStream Stream) (Stream->DP_Interface->RSReleaseTimestep)(&Svcs, Stream->DP_Stream, Timestep); } + ReleaseTSReadStats(Stream, Timestep); STREAM_MUTEX_UNLOCK(Stream); if ((Stream->WriterConfigParams->CPCommPattern == SstCPCommPeer) || @@ -2000,6 +2051,10 @@ extern SstStatusValue SstAdvanceStep(SstStream Stream, const float timeout_sec) { result = SstAdvanceStepMin(Stream, mode, timeout_sec); } + if (result == SstSuccess) + { + Stream->Stats.TimestepsConsumed++; + } STREAM_MUTEX_UNLOCK(Stream); return result; } @@ -2020,9 +2075,13 @@ extern void SstReaderClose(SstStream Stream) memset(&Msg, 0, sizeof(Msg)); sendOneToEachWriterRank(Stream, Stream->CPInfo->SharedCM->ReaderCloseFormat, &Msg, &Msg.WSR_Stream); - if (Stream->Stats) - Stream->Stats->ValidTimeSecs = (double)Diff.tv_usec / 1e6 + Diff.tv_sec; + Stream->Stats.StreamValidTimeSecs = + (double)Diff.tv_usec / 1e6 + Diff.tv_sec; + if (Stream->CPVerbosityLevel >= (int)SummaryVerbose) + { + DoStreamSummary(Stream); + } CMusleep(Stream->CPInfo->SharedCM->cm, 100000); if (Stream->CurrentMetadata != NULL) { diff --git a/source/adios2/toolkit/sst/cp/cp_writer.c b/source/adios2/toolkit/sst/cp/cp_writer.c index 095ec4ba18..5d0c5caafb 100644 --- a/source/adios2/toolkit/sst/cp/cp_writer.c +++ b/source/adios2/toolkit/sst/cp/cp_writer.c @@ -279,6 +279,10 @@ static void RemoveQueueEntries(SstStream Stream) } Stream->QueuedTimestepCount--; + if (ItemToFree->MetaDataSendCount) + { + Stream->Stats.TimestepsDelivered++; + } CP_verbose(Stream, PerRankVerbose, "Remove queue Entries removing Timestep %ld (exp %d, " "Prec %d, Ref %d), Count now %d\n", @@ -1135,7 +1139,7 @@ static void SendTimestepEntryToSingleReader(SstStream Stream, Entry->Timestep, rank); } Entry->ReferenceCount++; - + Entry->MetaDataSendCount++; CP_verbose(Stream, PerRankVerbose, "ADDING timestep %ld to sent list for reader cohort %d, " "READER %p, reference count is now %d\n", @@ -1289,7 +1293,8 @@ SstStream SstWriterOpen(const char *Name, SstParams Params, SMPI_Comm comm) // printf("WRITER main program thread PID is %lx, TID %lx in writer // open\n", // (long)getpid(), (long)gettid()); - Stream->DP_Interface = SelectDP(&Svcs, Stream, Stream->ConfigParams); + Stream->DP_Interface = + SelectDP(&Svcs, Stream, Stream->ConfigParams, Stream->Rank); if (!Stream->DP_Interface) { @@ -1314,17 +1319,16 @@ SstStream SstWriterOpen(const char *Name, SstParams Params, SMPI_Comm comm) attr_list DPAttrs = create_attr_list(); Stream->DP_Stream = Stream->DP_Interface->initWriter( - &Svcs, Stream, Stream->ConfigParams, DPAttrs); + &Svcs, Stream, Stream->ConfigParams, DPAttrs, &Stream->Stats); if (Stream->Rank == 0) { registerContactInfo(Filename, Stream, DPAttrs); } - CP_verbose(Stream, SummaryVerbose, "Opening Stream \"%s\"\n", Filename); - if (Stream->Rank == 0) { + CP_verbose(Stream, SummaryVerbose, "Opening Stream \"%s\"\n", Filename); CP_verbose(Stream, SummaryVerbose, "Writer stream params are:\n"); CP_dumpParams(Stream, Stream->ConfigParams, 0 /* writer side */); } @@ -1353,8 +1357,6 @@ SstStream SstWriterOpen(const char *Name, SstParams Params, SMPI_Comm comm) } SMPI_Barrier(Stream->mpiComm); - struct timeval Start; - gettimeofday(&Start, NULL); reader = WriterParticipateInReaderOpen(Stream); if (!reader) { @@ -1382,6 +1384,7 @@ SstStream SstWriterOpen(const char *Name, SstParams Params, SMPI_Comm comm) } Stream->RendezvousReaderCount--; } + gettimeofday(&Stream->ValidStartTime, NULL); Stream->Filename = Filename; Stream->Status = Established; CP_verbose(Stream, PerStepVerbose, "Finish opening Stream \"%s\"\n", @@ -1640,9 +1643,13 @@ void SstWriterClose(SstStream Stream) STREAM_MUTEX_UNLOCK(Stream); gettimeofday(&CloseTime, NULL); timersub(&CloseTime, &Stream->ValidStartTime, &Diff); - if (Stream->Stats) - Stream->Stats->ValidTimeSecs = (double)Diff.tv_usec / 1e6 + Diff.tv_sec; + Stream->Stats.StreamValidTimeSecs = + (double)Diff.tv_usec / 1e6 + Diff.tv_sec; + if (Stream->CPVerbosityLevel >= (int)SummaryVerbose) + { + DoStreamSummary(Stream); + } CP_verbose(Stream, PerStepVerbose, "All timesteps are released in WriterClose\n"); @@ -2148,6 +2155,7 @@ extern void SstInternalProvideTimestep( Entry->Next = Stream->QueuedTimesteps; Stream->QueuedTimesteps = Entry; Stream->QueuedTimestepCount++; + Stream->Stats.TimestepsCreated++; /* no one waits on timesteps being added, so no condition signal to note * change */ @@ -2307,8 +2315,12 @@ extern void SstInternalProvideTimestep( while (PendingReaderCount--) { WS_ReaderInfo reader; - CP_verbose(Stream, SummaryVerbose, - "Writer side ReaderLateArrival accepting incoming reader\n"); + if (Stream->Rank == 0) + { + CP_verbose( + Stream, SummaryVerbose, + "Writer side ReaderLateArrival accepting incoming reader\n"); + } reader = WriterParticipateInReaderOpen(Stream); if (!reader) { diff --git a/source/adios2/toolkit/sst/dp/dp.c b/source/adios2/toolkit/sst/dp/dp.c index 3bfefa234e..4314ce9e73 100644 --- a/source/adios2/toolkit/sst/dp/dp.c +++ b/source/adios2/toolkit/sst/dp/dp.c @@ -52,7 +52,7 @@ static DPlist AddDPPossibility(CP_Services Svcs, void *CP_Stream, DPlist List, } CP_DP_Interface SelectDP(CP_Services Svcs, void *CP_Stream, - struct _SstParams *Params) + struct _SstParams *Params, int Rank) { CP_DP_Interface Ret; DPlist List = NULL; @@ -93,6 +93,7 @@ CP_DP_Interface SelectDP(CP_Services Svcs, void *CP_Stream, if (List[i].Priority >= 0) { SelectedDP = i; + printf("Setting selected DP\n"); break; } else @@ -116,17 +117,20 @@ CP_DP_Interface SelectDP(CP_Services Svcs, void *CP_Stream, fprintf(stderr, "Warning: Preferred DataPlane \"%s\" not found.", Params->DataTransport); } + printf("SelectedDP is %d\n", SelectedDP); if (SelectedDP != -1) { - Svcs->verbose(CP_Stream, DPSummaryVerbose, - "Selecting DataPlane \"%s\" (preferred) for use\n", - List[SelectedDP].Name); + if (Rank == 0) + Svcs->verbose(CP_Stream, DPSummaryVerbose, + "Selecting DataPlane \"%s\" (preferred) for use\n", + List[SelectedDP].Name); } else { - Svcs->verbose(CP_Stream, DPSummaryVerbose, - "Selecting DataPlane \"%s\", priority %d for use\n", - List[BestPrioDP].Name, List[BestPrioDP].Priority); + if (Rank == 0) + Svcs->verbose(CP_Stream, DPSummaryVerbose, + "Selecting DataPlane \"%s\", priority %d for use\n", + List[BestPrioDP].Name, List[BestPrioDP].Priority); SelectedDP = BestPrioDP; } i = 0; diff --git a/source/adios2/toolkit/sst/dp/evpath_dp.c b/source/adios2/toolkit/sst/dp/evpath_dp.c index 38a34dafd9..8042a646f3 100644 --- a/source/adios2/toolkit/sst/dp/evpath_dp.c +++ b/source/adios2/toolkit/sst/dp/evpath_dp.c @@ -81,6 +81,8 @@ typedef struct _Evpath_RS_Stream long PreloadActiveTimestep; long TotalReadRequests; long ReadRequestsFromPreload; + SstStats Stats; + long LastPreloadTimestep; } * Evpath_RS_Stream; typedef struct _Evpath_WSR_Stream @@ -135,6 +137,7 @@ typedef struct _Evpath_WS_Stream int ReaderCount; Evpath_WSR_Stream *Readers; + SstStats Stats; } * Evpath_WS_Stream; typedef struct _EvpathReaderContactInfo @@ -253,7 +256,7 @@ static void SendSpeculativePreloadMsgs(CP_Services Svcs, static DP_RS_Stream EvpathInitReader(CP_Services Svcs, void *CP_Stream, void **ReaderContactInfoPtr, struct _SstParams *Params, - attr_list WriterContact) + attr_list WriterContact, SstStats Stats) { Evpath_RS_Stream Stream = malloc(sizeof(struct _Evpath_RS_Stream)); EvpathReaderContactInfo Contact = @@ -272,6 +275,8 @@ static DP_RS_Stream EvpathInitReader(CP_Services Svcs, void *CP_Stream, * save the CP_stream value of later use */ Stream->CP_Stream = CP_Stream; + Stream->Stats = Stats; + Stream->LastPreloadTimestep = -1; pthread_mutex_init(&Stream->DataLock, NULL); @@ -520,6 +525,8 @@ static void EvpathReadReplyHandler(CManager cm, CMConnection conn, void *msg_v, */ memcpy(Handle->Buffer, ReadReplyMsg->Data, ReadReplyMsg->DataLength); + RS_Stream->Stats->DataBytesReceived += ReadReplyMsg->DataLength; + /* * Signal the condition to wake the reader if they are waiting. */ @@ -651,7 +658,15 @@ static void EvpathPreloadHandler(CManager cm, CMConnection conn, void *msg_v, Entry->Data = PreloadMsg->Data; Entry->DataSize = PreloadMsg->DataLength; Entry->DataStart = 0; - + RS_Stream->Stats->DataBytesReceived += PreloadMsg->DataLength; + RS_Stream->Stats->PreloadBytesReceived += PreloadMsg->DataLength; + if (PreloadMsg->Timestep > RS_Stream->LastPreloadTimestep) + { + // only incremenet PreloadTimesteps once, even if we get multiple + // preloads on a timestep (from different ranks) + RS_Stream->LastPreloadTimestep = PreloadMsg->Timestep; + RS_Stream->Stats->PreloadTimestepsReceived++; + } pthread_mutex_lock(&RS_Stream->DataLock); Entry->Next = RS_Stream->QueuedTimesteps; RS_Stream->QueuedTimesteps = Entry; @@ -677,7 +692,7 @@ static void EvpathPreloadHandler(CManager cm, CMConnection conn, void *msg_v, // writer-side routine, called from the main program static DP_WS_Stream EvpathInitWriter(CP_Services Svcs, void *CP_Stream, struct _SstParams *Params, - attr_list DPAttrs) + attr_list DPAttrs, SstStats Stats) { Evpath_WS_Stream Stream = malloc(sizeof(struct _Evpath_WS_Stream)); CManager cm = Svcs->getCManager(CP_Stream); @@ -694,6 +709,7 @@ static DP_WS_Stream EvpathInitWriter(CP_Services Svcs, void *CP_Stream, * save the CP_stream value of later use */ Stream->CP_Stream = CP_Stream; + Stream->Stats = Stats; /* * add a handler for read request messages diff --git a/source/adios2/toolkit/sst/dp/rdma_dp.c b/source/adios2/toolkit/sst/dp/rdma_dp.c index 7cc78e061d..debcdf4e28 100644 --- a/source/adios2/toolkit/sst/dp/rdma_dp.c +++ b/source/adios2/toolkit/sst/dp/rdma_dp.c @@ -345,7 +345,7 @@ typedef struct _RdmaWriterContactInfo static DP_RS_Stream RdmaInitReader(CP_Services Svcs, void *CP_Stream, void **ReaderContactInfoPtr, struct _SstParams *Params, - attr_list WriterContact) + attr_list WriterContact, SstStats Stats) { Rdma_RS_Stream Stream = malloc(sizeof(struct _Rdma_RS_Stream)); CManager cm = Svcs->getCManager(CP_Stream); @@ -394,7 +394,8 @@ typedef struct _RdmaCompletionHandle } * RdmaCompletionHandle; static DP_WS_Stream RdmaInitWriter(CP_Services Svcs, void *CP_Stream, - struct _SstParams *Params, attr_list DPAttrs) + struct _SstParams *Params, attr_list DPAttrs, + SstStats Stats) { Rdma_WS_Stream Stream = malloc(sizeof(struct _Rdma_WS_Stream)); CManager cm = Svcs->getCManager(CP_Stream); diff --git a/source/adios2/toolkit/sst/dp_interface.h b/source/adios2/toolkit/sst/dp_interface.h index 39cf7bbdc3..f8906f4229 100644 --- a/source/adios2/toolkit/sst/dp_interface.h +++ b/source/adios2/toolkit/sst/dp_interface.h @@ -77,7 +77,8 @@ typedef void *CP_PeerCohort; typedef DP_RS_Stream (*CP_DP_InitReaderFunc)(CP_Services Svcs, void *CP_Stream, void **ReaderContactInfoPtr, struct _SstParams *Params, - attr_list WriterContactAttributes); + attr_list WriterContactAttributes, + SstStats Stats); /*! * CP_DP_DestroyReaderFunc is the type of a dataplane reader-side @@ -100,7 +101,7 @@ typedef void (*CP_DP_DestroyReaderFunc)(CP_Services Svcs, DP_RS_Stream Reader); */ typedef DP_WS_Stream (*CP_DP_InitWriterFunc)(CP_Services Svcs, void *CP_Stream, struct _SstParams *Params, - attr_list DPAttrs); + attr_list DPAttrs, SstStats Stats); /*! * CP_DP_DestroyWriterFunc is the type of a dataplane writer-side @@ -403,6 +404,6 @@ struct _CP_Services }; CP_DP_Interface SelectDP(CP_Services Svcs, void *CP_Stream, - struct _SstParams *Params); + struct _SstParams *Params, int Rank); #endif diff --git a/source/adios2/toolkit/sst/sst.h b/source/adios2/toolkit/sst/sst.h index 8ae6f77765..188076e2a1 100644 --- a/source/adios2/toolkit/sst/sst.h +++ b/source/adios2/toolkit/sst/sst.h @@ -47,17 +47,6 @@ typedef enum SstLatestAvailable // reader advance mode } SstStepMode; -/* - * Struct that represents statistics tracked by SST - */ -typedef struct _SstStats -{ - double OpenTimeSecs; - double CloseTimeSecs; - double ValidTimeSecs; - size_t BytesTransferred; -} * SstStats; - typedef struct _SstParams *SstParams; typedef enum @@ -181,11 +170,6 @@ extern int SstFFSWriterBeginStep(SstStream Stream, int mode, const float timeout_sec); extern void SstFFSWriterEndStep(SstStream Stream, size_t Step); -/* - * General Operations - */ -extern void SstSetStatsSave(SstStream Stream, SstStats Save); - #include "sst_data.h" #define SST_POSTFIX ".sst" diff --git a/source/adios2/toolkit/sst/sst_data.h b/source/adios2/toolkit/sst/sst_data.h index a9a00c49df..2203ef1b51 100644 --- a/source/adios2/toolkit/sst/sst_data.h +++ b/source/adios2/toolkit/sst/sst_data.h @@ -25,6 +25,26 @@ struct _SstBlock char *BlockData; }; +/* + * Struct that represents statistics tracked by SST + */ +typedef struct _SstStats +{ + double StreamValidTimeSecs; + size_t BytesTransferred; + size_t TimestepsCreated; + size_t TimestepsDelivered; + + size_t TimestepMetadataReceived; + size_t TimestepsConsumed; + size_t MetadataBytesReceived; + size_t DataBytesReceived; + size_t PreloadBytesReceived; + size_t PreloadTimestepsReceived; + size_t BytesRead; + double RunningFanIn; +} * SstStats; + #define SST_FOREACH_PARAMETER_TYPE_4ARGS(MACRO) \ MACRO(MarshalMethod, MarshalMethod, size_t, SstMarshalBP) \ MACRO(verbose, Int, int, 0) \ diff --git a/testing/adios2/engine/staging-common/TestCommonRead.cpp b/testing/adios2/engine/staging-common/TestCommonRead.cpp index 9524b5e7aa..ee8656ab19 100644 --- a/testing/adios2/engine/staging-common/TestCommonRead.cpp +++ b/testing/adios2/engine/staging-common/TestCommonRead.cpp @@ -332,7 +332,8 @@ TEST_F(CommonReadTest, ADIOS2CommonRead1D8) engine.Get(var_r32, in_R32.data()); engine.Get(var_r64, in_R64.data()); - engine.Get(var_time, (int64_t *)&write_time); + if (!mpiRank) + engine.Get(var_time, (int64_t *)&write_time); } else { @@ -394,7 +395,8 @@ TEST_F(CommonReadTest, ADIOS2CommonRead1D8) } } } - write_times.push_back(write_time); + if (!mpiRank) + write_times.push_back(write_time); } else { @@ -420,7 +422,7 @@ TEST_F(CommonReadTest, ADIOS2CommonRead1D8) } EXPECT_EQ(t, NSteps); - if (!NoData) + if (!NoData && !mpiRank) { if ((write_times.back() - write_times.front()) > 1) {