Skip to content

Commit

Permalink
Merge pull request #2432 from eisenhauer/SstSummary
Browse files Browse the repository at this point in the history
Add summary verbose output to SST
  • Loading branch information
eisenhauer authored Aug 17, 2020
2 parents 9e32dcf + 909caf2 commit c81de7c
Show file tree
Hide file tree
Showing 11 changed files with 262 additions and 68 deletions.
99 changes: 93 additions & 6 deletions source/adios2/toolkit/sst/cp/cp_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -1009,6 +1009,97 @@ extern void AddToLastCallFreeList(void *Block)
SharedCMInfo->LastCallFreeCount++;
}

/*
 * Format a byte count as a short human-readable string.
 *
 * The value is repeatedly divided by 1024 until it drops below 1024, and the
 * matching unit name is appended.  Scaled values below 100 are printed with
 * one decimal place (e.g. "1.5 kB"); everything else is printed as a whole
 * number (e.g. "512 bytes", "200 kB").
 *
 * SizeInBytes  the byte count to format
 * Output       destination buffer; left untouched if NULL
 * size         capacity of Output in bytes; the result is truncated (and
 *              still NUL-terminated by snprintf) if it does not fit
 */
static void ReadableSizeString(size_t SizeInBytes, char *Output, size_t size)
{
    static const char *const byteUnits[] = {"bytes", "kB", "MB", "GB", "TB",
                                            "PB",    "EB", "ZB", "YB"};
    const size_t LastUnit = sizeof(byteUnits) / sizeof(byteUnits[0]) - 1;
    size_t LastSizeInBytes = SizeInBytes;
    size_t i = 0;

    if (Output == NULL || size == 0)
    {
        return;
    }

    /* ">=" so that exact multiples (1024, 1024*1024, ...) move up to the
     * next unit instead of printing as "1024 <smaller unit>".  The index
     * bound keeps the unit lookup in range regardless of size_t width. */
    while ((SizeInBytes >= 1024) && (i < LastUnit))
    {
        LastSizeInBytes = SizeInBytes;
        SizeInBytes = SizeInBytes / 1024;
        i++;
    }

    if ((SizeInBytes < 100) && (i != 0))
    {
        /* LastSizeInBytes is the value before the final division, so this
         * recovers the fractional part that integer division dropped. */
        snprintf(Output, size, "%.1f %s", ((double)LastSizeInBytes) / 1024,
                 byteUnits[i]);
    }
    else
    {
        /* %zu is the conversion for size_t; %ld is wrong where long and
         * size_t differ in width. */
        snprintf(Output, size, "%zu %s", SizeInBytes, byteUnits[i]);
    }
}

/*
 * Emit the end-of-stream summary at SummaryVerbose level.
 *
 * Every rank contributes its Stream->Stats record to an SMPI_Gather rooted
 * at rank 0.  Ranks other than 0 return right after the gather.  Rank 0 sums
 * the byte counters of all ranks into slot 0, averages RunningFanIn over the
 * cohort, and prints the role-specific summary through CP_verbose.
 */
extern void DoStreamSummary(SstStream Stream)
{
    const int IsRoot = (Stream->Rank == 0);
    SstStats Gathered = NULL;
    char SizeText[256];

    /* Only the root allocates a receive buffer: one slot per cohort rank. */
    if (IsRoot)
    {
        Gathered = malloc(sizeof(struct _SstStats) * Stream->CohortSize);
    }

    SMPI_Gather(&Stream->Stats, sizeof(struct _SstStats), SMPI_CHAR, Gathered,
                sizeof(struct _SstStats), SMPI_CHAR, 0, Stream->mpiComm);

    if (!IsRoot)
    {
        return;
    }

    /* Fold ranks 1..CohortSize-1 into slot 0, which becomes the total. */
    SstStats Total = Gathered;
    int Peer = 1;
    while (Peer < Stream->CohortSize)
    {
        const struct _SstStats *Src = &Gathered[Peer];
        Total->MetadataBytesReceived += Src->MetadataBytesReceived;
        Total->DataBytesReceived += Src->DataBytesReceived;
        Total->PreloadBytesReceived += Src->PreloadBytesReceived;
        Total->RunningFanIn += Src->RunningFanIn;
        Peer++;
    }
    Total->RunningFanIn /= Stream->CohortSize;

    CP_verbose(Stream, SummaryVerbose, "\nStream \"%s\" (%p) summary info:\n",
               Stream->Filename, (void *)Stream);
    CP_verbose(Stream, SummaryVerbose, "\tDuration (secs) = %g\n",
               Stream->Stats.StreamValidTimeSecs);

    if (Stream->Role == WriterRole)
    {
        /* Writer-side counters come from this rank's own stats. */
        CP_verbose(Stream, SummaryVerbose, "\tTimesteps Created = %zu\n",
                   Stream->Stats.TimestepsCreated);
        CP_verbose(Stream, SummaryVerbose, "\tTimesteps Delivered = %zu\n",
                   Stream->Stats.TimestepsDelivered);
    }
    else if (Stream->Role == ReaderRole)
    {
        /* Step counts are this rank's own; byte counts are cohort totals. */
        CP_verbose(Stream, SummaryVerbose,
                   "\tTimestep Metadata Received = %zu\n",
                   Stream->Stats.TimestepMetadataReceived);
        CP_verbose(Stream, SummaryVerbose, "\tTimesteps Consumed = %zu\n",
                   Stream->Stats.TimestepsConsumed);

        ReadableSizeString(Total->MetadataBytesReceived, SizeText,
                           sizeof(SizeText));
        CP_verbose(Stream, SummaryVerbose,
                   "\tMetadataBytesReceived = %zu (%s)\n",
                   Total->MetadataBytesReceived, SizeText);

        ReadableSizeString(Total->DataBytesReceived, SizeText,
                           sizeof(SizeText));
        CP_verbose(Stream, SummaryVerbose, "\tDataBytesReceived = %zu (%s)\n",
                   Total->DataBytesReceived, SizeText);

        ReadableSizeString(Total->PreloadBytesReceived, SizeText,
                           sizeof(SizeText));
        CP_verbose(Stream, SummaryVerbose,
                   "\tPreloadBytesReceived = %zu (%s)\n",
                   Total->PreloadBytesReceived, SizeText);

        CP_verbose(Stream, SummaryVerbose, "\tPreloadTimestepsReceived = %zu\n",
                   Stream->Stats.PreloadTimestepsReceived);
        CP_verbose(Stream, SummaryVerbose, "\tAverageReadRankFanIn = %.1f\n",
                   Total->RunningFanIn);
    }

    CP_verbose(Stream, SummaryVerbose, "\n");
    free(Gathered);
}

extern void SstStreamDestroy(SstStream Stream)
{
/*
Expand Down Expand Up @@ -1120,6 +1211,8 @@ extern void SstStreamDestroy(SstStream Stream)
Stream->ConnectionsToWriter = NULL;
}
free(Stream->Peers);
if (Stream->RanksRead)
free(Stream->RanksRead);
}
else if (Stream->ConfigParams->MarshalMethod == SstMarshalFFS)
{
Expand Down Expand Up @@ -1452,12 +1545,6 @@ extern void getPeerArrays(int MySize, int MyRank, int PeerSize,
}
}

/*
 * Attach a caller-supplied statistics record to a stream.
 *
 * The stream's recorded open time is copied into the record first, and the
 * record is then installed as Stream->Stats.
 */
extern void SstSetStatsSave(SstStream Stream, SstStats Stats)
{
    const double StreamOpenTime = Stream->OpenTimeSecs;

    Stats->OpenTimeSecs = StreamOpenTime;
    Stream->Stats = Stats;
}
static void DP_verbose(SstStream s, int Level, char *Format, ...)
{
if (s->DPVerbosityLevel >= Level)
Expand Down
22 changes: 15 additions & 7 deletions source/adios2/toolkit/sst/cp/cp_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ typedef struct _CPTimestepEntry
long Timestep;
struct _SstData Data;
struct _TimestepMetadataMsg *Msg;
int MetaDataSendCount;
int ReferenceCount;
int Expired;
int PreciousTimestep;
Expand Down Expand Up @@ -146,7 +147,8 @@ struct _SstStream
int DPVerbosityLevel;
double OpenTimeSecs;
struct timeval ValidStartTime;
SstStats Stats;
struct _SstStats Stats;
char *RanksRead;

/* MPI info */
int Rank;
Expand Down Expand Up @@ -524,14 +526,19 @@ extern void FFSFreeMarshalData(SstStream Stream);
extern void getPeerArrays(int MySize, int MyRank, int PeerSize,
int **forwardArray, int **reverseArray);
extern void AddToLastCallFreeList(void *Block);

/*
 * Verbosity thresholds passed to CP_verbose(), listed from quietest to most
 * detailed.  Each enumerator is declared exactly once; declaring any of them
 * twice is a redefinition error.
 */
enum VerbosityLevel
{
    NoVerbose = 0,       // Generally no output (but not absolutely quiet?)
    CriticalVerbose = 1, // Informational output for failures only
    SummaryVerbose = 2,  // One-time summary output containing general info
                         // (transports used, timestep count, stream
                         // duration, etc.)
    PerStepVerbose = 3,  // One-per-step info, generally from rank 0
                         // (metadata read, Begin/EndStep verbosity, etc.)
    PerRankVerbose = 4,  // Per-step info from each rank (for those things
                         // that might be different per rank).
    TraceVerbose = 5,    // All debugging available
};

extern void CP_verbose(SstStream Stream, enum VerbosityLevel Level,
Expand All @@ -546,3 +553,4 @@ typedef void (*CPNetworkInfoFunc)(int dataID, const char *net_string,
extern char *IPDiagString;
extern CPNetworkInfoFunc globalNetinfoCallback;
extern void SSTSetNetworkCallback(CPNetworkInfoFunc callback);
extern void DoStreamSummary(SstStream Stream);
79 changes: 69 additions & 10 deletions source/adios2/toolkit/sst/cp/cp_reader.c
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,8 @@ SstStream SstReaderOpen(const char *Name, SstParams Params, SMPI_Comm comm)
CP_validateParams(Stream, Params, 0 /* reader */);
Stream->ConfigParams = Params;

Stream->DP_Interface = SelectDP(&Svcs, Stream, Stream->ConfigParams);
Stream->DP_Interface =
SelectDP(&Svcs, Stream, Stream->ConfigParams, Stream->Rank);

Stream->CPInfo =
CP_getCPInfo(Stream->DP_Interface, Stream->ConfigParams->ControlModule);
Expand All @@ -500,7 +501,8 @@ SstStream SstReaderOpen(const char *Name, SstParams Params, SMPI_Comm comm)
}

Stream->DP_Stream = Stream->DP_Interface->initReader(
&Svcs, Stream, &dpInfo, Stream->ConfigParams, WriterContactAttributes);
&Svcs, Stream, &dpInfo, Stream->ConfigParams, WriterContactAttributes,
&Stream->Stats);

free_attr_list(WriterContactAttributes);

Expand Down Expand Up @@ -617,23 +619,27 @@ SstStream SstReaderOpen(const char *Name, SstParams Params, SMPI_Comm comm)

Stream->WriterCohortSize = ReturnData->WriterCohortSize;
Stream->WriterConfigParams = ReturnData->WriterConfigParams;
if (Stream->WriterConfigParams->MarshalMethod == SstMarshalFFS)
if ((Stream->WriterConfigParams->MarshalMethod == SstMarshalFFS) &&
(Stream->Rank == 0))
{
CP_verbose(Stream, SummaryVerbose,
"Writer is doing FFS-based marshalling\n");
}
if (Stream->WriterConfigParams->MarshalMethod == SstMarshalBP)
if ((Stream->WriterConfigParams->MarshalMethod == SstMarshalBP) &&
(Stream->Rank == 0))
{
CP_verbose(Stream, SummaryVerbose,
"Writer is doing BP-based marshalling\n");
}
if (Stream->WriterConfigParams->CPCommPattern == SstCPCommMin)
if ((Stream->WriterConfigParams->CPCommPattern == SstCPCommMin) &&
(Stream->Rank == 0))
{
CP_verbose(
Stream, SummaryVerbose,
"Writer is using Minimum Connection Communication pattern (min)\n");
}
if (Stream->WriterConfigParams->CPCommPattern == SstCPCommPeer)
if ((Stream->WriterConfigParams->CPCommPattern == SstCPCommPeer) &&
(Stream->Rank == 0))
{
CP_verbose(Stream, SummaryVerbose,
"Writer is using Peer-based Communication pattern (peer)\n");
Expand Down Expand Up @@ -817,6 +823,12 @@ void queueTimestepMetadataMsgAndNotify(SstStream Stream,
{
Stream->Timesteps = New;
}
Stream->Stats.TimestepMetadataReceived++;
if (tsm->Metadata)
{
Stream->Stats.MetadataBytesReceived +=
(tsm->Metadata->DataSize + tsm->AttributeData->DataSize);
}
CP_verbose(Stream, PerRankVerbose,
"Received a Timestep metadata message for timestep %d, "
"signaling condition\n",
Expand Down Expand Up @@ -1345,14 +1357,52 @@ extern SstFullMetadata SstGetCurMetadata(SstStream Stream)
return Stream->CurrentMetadata;
}

/*
 * Record one remote read in the stream's statistics.
 *
 * Marks writer rank `Rank` as having been read from (Stream->RanksRead is a
 * one-byte-per-writer-rank flag array, allocated zeroed on first use and
 * sized by Stream->WriterCohortSize) and adds `Length` to
 * Stream->Stats.BytesRead.
 *
 * Stream    reader-side stream whose statistics are updated
 * Rank      writer rank the read targets; values outside
 *           [0, WriterCohortSize) are not recorded in RanksRead
 * Timestep  currently unused
 * Length    number of bytes requested by the read
 */
static void AddToReadStats(SstStream Stream, int Rank, long Timestep,
                           size_t Length)
{
    (void)Timestep;

    if (!Stream->RanksRead)
    {
        Stream->RanksRead = calloc(1, Stream->WriterCohortSize);
    }

    /* The flag array is statistics only: if the allocation failed or the
     * rank is out of range, skip the mark rather than write through a NULL
     * pointer or past the end of the array. */
    if (Stream->RanksRead && (Rank >= 0) && (Rank < Stream->WriterCohortSize))
    {
        Stream->RanksRead[Rank] = 1;
    }

    Stream->Stats.BytesRead += Length;
}

/*
 * Fallback two-argument minimum, defined only when no `min` macro is already
 * visible.  Each argument can be evaluated twice, so do not pass expressions
 * with side effects.
 */
#ifndef min
#define min(a, b) (((a) < (b)) ? (a) : (b))
#endif

/*
 * Fold the just-released timestep's fan-in into the running average.
 *
 * The fan-in of a timestep is the number of writer ranks flagged in
 * Stream->RanksRead.  The flags are cleared afterwards so the next timestep
 * starts from zero.  Stream->Stats.RunningFanIn is then updated: the first
 * sample is stored directly, and later samples move the average toward the
 * new value by 1/min(TimestepsConsumed, 100) of the difference.
 *
 * Stream    reader-side stream whose statistics are updated
 * Timestep  currently unused
 */
static void ReleaseTSReadStats(SstStream Stream, long Timestep)
{
    int ThisFanIn = 0;

    (void)Timestep;

    if (Stream->RanksRead)
    {
        for (int i = 0; i < Stream->WriterCohortSize; i++)
        {
            if (Stream->RanksRead[i])
            {
                ThisFanIn++;
            }
        }
        memset(Stream->RanksRead, 0, Stream->WriterCohortSize);
    }

    /* "<= 1" rather than "== 1": with zero consumed timesteps the divisor
     * below would be 0 and RunningFanIn would turn into inf/NaN. */
    if (Stream->Stats.TimestepsConsumed <= 1)
    {
        Stream->Stats.RunningFanIn = ThisFanIn;
    }
    else
    {
        Stream->Stats.RunningFanIn =
            Stream->Stats.RunningFanIn +
            ((double)ThisFanIn - Stream->Stats.RunningFanIn) /
                min(Stream->Stats.TimestepsConsumed, 100);
    }
}

// SstReadRemotememory is only called by the main
// program thread.
extern void *SstReadRemoteMemory(SstStream Stream, int Rank, long Timestep,
size_t Offset, size_t Length, void *Buffer,
void *DP_TimestepInfo)
{
if (Stream->Stats)
Stream->Stats->BytesTransferred += Length;
Stream->Stats.BytesTransferred += Length;
AddToReadStats(Stream, Rank, Timestep, Length);
return Stream->DP_Interface->readRemoteMemory(
&Svcs, Stream->DP_Stream, Rank, Timestep, Offset, Length, Buffer,
DP_TimestepInfo);
Expand Down Expand Up @@ -1428,6 +1478,7 @@ extern void SstReleaseStep(SstStream Stream)
(Stream->DP_Interface->RSReleaseTimestep)(&Svcs, Stream->DP_Stream,
Timestep);
}
ReleaseTSReadStats(Stream, Timestep);
STREAM_MUTEX_UNLOCK(Stream);

if ((Stream->WriterConfigParams->CPCommPattern == SstCPCommPeer) ||
Expand Down Expand Up @@ -2000,6 +2051,10 @@ extern SstStatusValue SstAdvanceStep(SstStream Stream, const float timeout_sec)
{
result = SstAdvanceStepMin(Stream, mode, timeout_sec);
}
if (result == SstSuccess)
{
Stream->Stats.TimestepsConsumed++;
}
STREAM_MUTEX_UNLOCK(Stream);
return result;
}
Expand All @@ -2020,9 +2075,13 @@ extern void SstReaderClose(SstStream Stream)
memset(&Msg, 0, sizeof(Msg));
sendOneToEachWriterRank(Stream, Stream->CPInfo->SharedCM->ReaderCloseFormat,
&Msg, &Msg.WSR_Stream);
if (Stream->Stats)
Stream->Stats->ValidTimeSecs = (double)Diff.tv_usec / 1e6 + Diff.tv_sec;
Stream->Stats.StreamValidTimeSecs =
(double)Diff.tv_usec / 1e6 + Diff.tv_sec;

if (Stream->CPVerbosityLevel >= (int)SummaryVerbose)
{
DoStreamSummary(Stream);
}
CMusleep(Stream->CPInfo->SharedCM->cm, 100000);
if (Stream->CurrentMetadata != NULL)
{
Expand Down
Loading

0 comments on commit c81de7c

Please sign in to comment.