Skip to content

Commit

Permalink
Tweaks and test for SST in threads
Browse files Browse the repository at this point in the history
  • Loading branch information
eisenhauer committed Jun 16, 2020
1 parent bf02720 commit e545e34
Show file tree
Hide file tree
Showing 7 changed files with 287 additions and 236 deletions.
343 changes: 185 additions & 158 deletions source/adios2/toolkit/sst/cp/cp_common.c

Large diffs are not rendered by default.

41 changes: 26 additions & 15 deletions source/adios2/toolkit/sst/cp/cp_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,18 @@

#define SSTMAGICV0 "#ADIOS2-SST v0\n"

typedef struct _CP_GlobalInfo
/*
 * Pairs an integer count (CustomStructCount) with a pointer to
 * FMStructDescList values (CustomStructList).
 * NOTE(review): presumably CustomStructCount is the number of entries
 * reachable through CustomStructList -- confirm against the code that
 * populates it.
 */
typedef struct StructList
{
int CustomStructCount;
FMStructDescList *CustomStructList;
} CP_StructList;

typedef struct _CP_GlobalCMInfo
{
/* exchange info */
CManager cm;
FFSContext ffs_c;
FMContext fm_c;
FFSTypeHandle PerRankReaderInfoFormat;
FFSTypeHandle CombinedReaderInfoFormat;
CMFormat ReaderRegisterFormat;
FFSTypeHandle PerRankWriterInfoFormat;
FFSTypeHandle CombinedWriterInfoFormat;
CMFormat WriterResponseFormat;
FFSTypeHandle PerRankMetadataFormat;
FFSTypeHandle TimestepDistributionFormat;
FFSTypeHandle ReturnMetadataInfoFormat;
CMFormat DeliverTimestepMetadataFormat;
CMFormat PeerSetupFormat;
CMFormat ReaderActivateFormat;
Expand All @@ -26,11 +23,25 @@ typedef struct _CP_GlobalInfo
CMFormat CommPatternLockedFormat;
CMFormat WriterCloseFormat;
CMFormat ReaderCloseFormat;
int CustomStructCount;
FMStructDescList *CustomStructList;
int LastCallFreeCount;
void **LastCallFreeList;
} * CP_GlobalInfo;
struct StructList CustomStructs;
} * CP_GlobalCMInfo;

/*
 * CP_Info is a POINTER to struct _CP_Info (the typedef hides the '*').
 *
 * The struct holds a CP_GlobalCMInfo handle (SharedCM) next to its own
 * FFS/FM contexts, a set of FFSTypeHandle format handles and a StructList.
 * NOTE(review): presumably SharedCM refers to state shared between several
 * CP_Info instances while the remaining members are private to this one --
 * confirm against the allocation code in cp_common.c.
 */
typedef struct _CP_Info
{
/* handle to the CM-level info; callers reach the CManager through it */
CP_GlobalCMInfo SharedCM;
/* FFS and FM contexts kept in this structure */
FFSContext ffs_c;
FMContext fm_c;
/* FFS type handles for the reader/writer info and metadata formats */
FFSTypeHandle PerRankReaderInfoFormat;
FFSTypeHandle CombinedReaderInfoFormat;
FFSTypeHandle PerRankWriterInfoFormat;
FFSTypeHandle CombinedWriterInfoFormat;
FFSTypeHandle PerRankMetadataFormat;
FFSTypeHandle TimestepDistributionFormat;
FFSTypeHandle ReturnMetadataInfoFormat;
/* embedded by value (not a pointer) */
struct StructList CustomStructs;
} * CP_Info;

struct _ReaderRegisterMsg;

Expand Down Expand Up @@ -121,7 +132,7 @@ typedef struct FFSFormatBlock *FFSFormatList;

struct _SstStream
{
CP_GlobalInfo CPInfo;
CP_Info CPInfo;

SMPI_Comm mpiComm;
enum StreamRole Role;
Expand Down Expand Up @@ -462,7 +473,7 @@ typedef struct _MetadataPlusDPInfo *MetadataPlusDPInfo;
extern atom_t CM_TRANSPORT_ATOM;

void CP_validateParams(SstStream stream, SstParams Params, int Writer);
extern CP_GlobalInfo CP_getCPInfo(CP_DP_Interface DPInfo, char *ControlModule);
extern CP_Info CP_getCPInfo(CP_DP_Interface DPInfo, char *ControlModule);
extern char *CP_GetContactString(SstStream s, attr_list DPAttrs);
extern SstStream CP_newStream();
extern void SstInternalProvideTimestep(
Expand Down
53 changes: 29 additions & 24 deletions source/adios2/toolkit/sst/cp/cp_reader.c
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ attr_list ContactWriter(SstStream Stream, char *Filename, SstParams Params,
(globalNetinfoCallback)(2, CMContactString, NULL);
}
WriterRank0Contact = attr_list_from_string(CMContactString);
conn = CMget_conn(Stream->CPInfo->cm, WriterRank0Contact);
conn = CMget_conn(Stream->CPInfo->SharedCM->cm, WriterRank0Contact);
free_attr_list(WriterRank0Contact);
}
if (conn)
Expand Down Expand Up @@ -510,7 +510,7 @@ SstStream SstReaderOpen(const char *Name, SstParams Params, SMPI_Comm comm)
memset(&ReaderRegister, 0, sizeof(ReaderRegister));
ReaderRegister.WriterFile = WriterFileID;
ReaderRegister.WriterResponseCondition =
CMCondition_get(Stream->CPInfo->cm, rank0_to_rank0_conn);
CMCondition_get(Stream->CPInfo->SharedCM->cm, rank0_to_rank0_conn);
ReaderRegister.ReaderCohortSize = Stream->CohortSize;
switch (Stream->ConfigParams->SpeculativePreloadMode)
{
Expand Down Expand Up @@ -543,11 +543,12 @@ SstStream SstReaderOpen(const char *Name, SstParams Params, SMPI_Comm comm)

/* the response value is set in the handler */
struct _WriterResponseMsg *response = NULL;
CMCondition_set_client_data(Stream->CPInfo->cm,
CMCondition_set_client_data(Stream->CPInfo->SharedCM->cm,
ReaderRegister.WriterResponseCondition,
&response);

if (CMwrite(rank0_to_rank0_conn, Stream->CPInfo->ReaderRegisterFormat,
if (CMwrite(rank0_to_rank0_conn,
Stream->CPInfo->SharedCM->ReaderRegisterFormat,
&ReaderRegister) != 1)
{
CP_verbose(Stream,
Expand All @@ -561,7 +562,7 @@ SstStream SstReaderOpen(const char *Name, SstParams Params, SMPI_Comm comm)
Stream,
"Waiting for writer response message in SstReadOpen(\"%s\")\n",
Filename, ReaderRegister.WriterResponseCondition);
CMCondition_wait(Stream->CPInfo->cm,
CMCondition_wait(Stream->CPInfo->SharedCM->cm,
ReaderRegister.WriterResponseCondition);
CP_verbose(Stream,
"finished wait writer response message in read_open\n");
Expand Down Expand Up @@ -701,8 +702,9 @@ SstStream SstReaderOpen(const char *Name, SstParams Params, SMPI_Comm comm)
Stream->ConnectionsToWriter, ReturnData->DP_WriterInfo);
CP_verbose(Stream, "Sending Reader Activate messages to writer\n");
memset(&Msg, 0, sizeof(Msg));
sendOneToEachWriterRank(Stream, Stream->CPInfo->ReaderActivateFormat, &Msg,
&Msg.WSR_Stream);
sendOneToEachWriterRank(Stream,
Stream->CPInfo->SharedCM->ReaderActivateFormat,
&Msg, &Msg.WSR_Stream);
CP_verbose(Stream,
"Finish opening Stream \"%s\", starting with Step number %d\n",
Filename, ReturnData->StartingStepNumber);
Expand Down Expand Up @@ -780,9 +782,9 @@ void queueTimestepMetadataMsgAndNotify(SstStream Stream,
"Sending ReleaseTimestep message for PRIOR DISCARD "
"timestep %d, one to each writer\n",
tsm->Timestep);
sendOneToEachWriterRank(Stream,
Stream->CPInfo->ReleaseTimestepFormat, &Msg,
&Msg.WSR_Stream);
sendOneToEachWriterRank(
Stream, Stream->CPInfo->SharedCM->ReleaseTimestepFormat, &Msg,
&Msg.WSR_Stream);
}
else
{
Expand Down Expand Up @@ -1076,7 +1078,7 @@ static void waitForMetadataWithTimeout(SstStream Stream, float timeout_secs)
}

TimeoutTask =
CMadd_delayed_task(Stream->CPInfo->cm, timeout_int_sec,
CMadd_delayed_task(Stream->CPInfo->SharedCM->cm, timeout_int_sec,
timeout_int_usec, triggerDataCondition, Stream);
while (1)
{
Expand Down Expand Up @@ -1155,13 +1157,13 @@ static void releasePriorTimesteps(SstStream Stream, long Latest)
Last->Next = Next;
}
STREAM_MUTEX_UNLOCK(Stream);
sendOneToEachWriterRank(Stream,
Stream->CPInfo->ReleaseTimestepFormat, &Msg,
&Msg.WSR_Stream);
sendOneToEachWriterRank(
Stream, Stream->CPInfo->SharedCM->ReleaseTimestepFormat, &Msg,
&Msg.WSR_Stream);
if (This->MetadataMsg == NULL)
printf("READER RETURN_BUFFER, metadatamsg == %p, line %d\n",
This->MetadataMsg, __LINE__);
CMreturn_buffer(Stream->CPInfo->cm, This->MetadataMsg);
CMreturn_buffer(Stream->CPInfo->SharedCM->cm, This->MetadataMsg);
STREAM_MUTEX_LOCK(Stream);
free(This);
}
Expand All @@ -1187,7 +1189,7 @@ static void FreeTimestep(SstStream Stream, long Timestep)
if (List->MetadataMsg == NULL)
printf("READER RETURN_BUFFER, List->MEtadataMsg == %p, line %d\n",
List->MetadataMsg, __LINE__);
CMreturn_buffer(Stream->CPInfo->cm, List->MetadataMsg);
CMreturn_buffer(Stream->CPInfo->SharedCM->cm, List->MetadataMsg);

free(List);
}
Expand All @@ -1204,7 +1206,8 @@ static void FreeTimestep(SstStream Stream, long Timestep)
printf("READER RETURN_BUFFER, List->MEtadataMsg == %p, "
"line %d\n",
List->MetadataMsg, __LINE__);
CMreturn_buffer(Stream->CPInfo->cm, List->MetadataMsg);
CMreturn_buffer(Stream->CPInfo->SharedCM->cm,
List->MetadataMsg);

free(List);
break;
Expand Down Expand Up @@ -1388,8 +1391,9 @@ extern void SstReaderDefinitionLock(SstStream Stream, long EffectiveTimestep)
memset(&Msg, 0, sizeof(Msg));
Msg.Timestep = EffectiveTimestep;

sendOneToEachWriterRank(Stream, Stream->CPInfo->LockReaderDefinitionsFormat,
&Msg, &Msg.WSR_Stream);
sendOneToEachWriterRank(
Stream, Stream->CPInfo->SharedCM->LockReaderDefinitionsFormat, &Msg,
&Msg.WSR_Stream);
}

// SstReleaseStep is only called by the main program thread. It
Expand Down Expand Up @@ -1430,8 +1434,9 @@ extern void SstReleaseStep(SstStream Stream)
Stream,
"Sending ReleaseTimestep message for timestep %d, one to each writer\n",
Timestep);
sendOneToEachWriterRank(Stream, Stream->CPInfo->ReleaseTimestepFormat, &Msg,
&Msg.WSR_Stream);
sendOneToEachWriterRank(Stream,
Stream->CPInfo->SharedCM->ReleaseTimestepFormat,
&Msg, &Msg.WSR_Stream);

if (Stream->WriterConfigParams->MarshalMethod == SstMarshalFFS)
{
Expand Down Expand Up @@ -1990,12 +1995,12 @@ extern void SstReaderClose(SstStream Stream)
gettimeofday(&CloseTime, NULL);
timersub(&CloseTime, &Stream->ValidStartTime, &Diff);
memset(&Msg, 0, sizeof(Msg));
sendOneToEachWriterRank(Stream, Stream->CPInfo->ReaderCloseFormat, &Msg,
&Msg.WSR_Stream);
sendOneToEachWriterRank(Stream, Stream->CPInfo->SharedCM->ReaderCloseFormat,
&Msg, &Msg.WSR_Stream);
if (Stream->Stats)
Stream->Stats->ValidTimeSecs = (double)Diff.tv_usec / 1e6 + Diff.tv_sec;

CMusleep(Stream->CPInfo->cm, 100000);
CMusleep(Stream->CPInfo->SharedCM->cm, 100000);
if (Stream->CurrentMetadata != NULL)
{
if (Stream->CurrentMetadata->FreeBlock)
Expand Down
55 changes: 22 additions & 33 deletions source/adios2/toolkit/sst/cp/cp_writer.c
Original file line number Diff line number Diff line change
Expand Up @@ -318,20 +318,6 @@ static void RemoveQueueEntries(SstStream Stream)
}
}

/*
 * Walk Stream->QueuedTimesteps and, for every entry, set Expired to 1 and
 * clear both PreciousTimestep and ReferenceCount; afterwards call
 * RemoveQueueEntries() on the stream.
 */
static void ReleaseAndDiscardRemainingTimesteps(SstStream Stream)
{
    CPTimestepList Entry;

    for (Entry = Stream->QueuedTimesteps; Entry; Entry = Entry->Next)
    {
        Entry->Expired = 1;
        Entry->PreciousTimestep = 0;
        Entry->ReferenceCount = 0;
    }

    RemoveQueueEntries(Stream);
}

/*
Queue maintenance: (ASSUME LOCKED)
calculate smallest entry for CurrentTimestep in a reader. Update that
Expand Down Expand Up @@ -517,7 +503,7 @@ static void SendPeerSetupMsg(WS_ReaderInfo reader, int reversePeer, int myRank)
setup.WriterRank = myRank;
setup.WriterCohortSize = Stream->CohortSize;
STREAM_ASSERT_UNLOCKED(Stream);
if (CMwrite(conn, Stream->CPInfo->PeerSetupFormat, &setup) != 1)
if (CMwrite(conn, Stream->CPInfo->SharedCM->PeerSetupFormat, &setup) != 1)
{
CP_verbose(Stream,
"Message failed to send to reader in sendPeerSetup in "
Expand Down Expand Up @@ -606,7 +592,7 @@ static int initWSReader(WS_ReaderInfo reader, int ReaderSize,
if (!reader->Connections[peer].CMconn)
{
reader->Connections[peer].CMconn =
CMget_conn(reader->ParentStream->CPInfo->cm,
CMget_conn(reader->ParentStream->CPInfo->SharedCM->cm,
reader->Connections[peer].ContactList);
}

Expand Down Expand Up @@ -660,7 +646,7 @@ static int initWSReader(WS_ReaderInfo reader, int ReaderSize,
usleep(WriterRank *
reader->ParentStream->ConnectionUsleepMultiplier);
reader->Connections[peer].CMconn =
CMget_conn(reader->ParentStream->CPInfo->cm,
CMget_conn(reader->ParentStream->CPInfo->SharedCM->cm,
reader->Connections[peer].ContactList);

if (!reader->Connections[peer].CMconn)
Expand Down Expand Up @@ -694,7 +680,7 @@ static int initWSReader(WS_ReaderInfo reader, int ReaderSize,
if (!reader->Connections[0].CMconn)
{
reader->Connections[0].CMconn =
CMget_conn(reader->ParentStream->CPInfo->cm,
CMget_conn(reader->ParentStream->CPInfo->SharedCM->cm,
reader->Connections[0].ContactList);
}
if (!reader->Connections[0].CMconn)
Expand Down Expand Up @@ -811,7 +797,7 @@ WS_ReaderInfo WriterParticipateInReaderOpen(SstStream Stream)
&free_block);
WriterResponseCondition = Req->Msg->WriterResponseCondition;
conn = Req->Conn;
CMreturn_buffer(Stream->CPInfo->cm, Req->Msg);
CMreturn_buffer(Stream->CPInfo->SharedCM->cm, Req->Msg);
free(Req);
}
else
Expand Down Expand Up @@ -948,7 +934,8 @@ WS_ReaderInfo WriterParticipateInReaderOpen(SstStream Stream)
response.DP_WriterInfo[i] = pointers[i]->DP_Info;
}
STREAM_ASSERT_UNLOCKED(Stream);
if (CMwrite(conn, Stream->CPInfo->WriterResponseFormat, &response) != 1)
if (CMwrite(conn, Stream->CPInfo->SharedCM->WriterResponseFormat,
&response) != 1)
{
CP_verbose(
Stream,
Expand Down Expand Up @@ -1167,9 +1154,10 @@ static void SendTimestepEntryToSingleReader(SstStream Stream,
Entry->Msg->PreloadMode = PMode;
STREAM_MUTEX_LOCK(Stream);
if (CP_WSR_Stream->ReaderStatus == Established)
sendOneToWSRCohort(CP_WSR_Stream,
Stream->CPInfo->DeliverTimestepMetadataFormat,
Entry->Msg, &Entry->Msg->RS_Stream);
sendOneToWSRCohort(
CP_WSR_Stream,
Stream->CPInfo->SharedCM->DeliverTimestepMetadataFormat,
Entry->Msg, &Entry->Msg->RS_Stream);
}
}

Expand Down Expand Up @@ -1306,7 +1294,7 @@ SstStream SstWriterOpen(const char *Name, SstParams Params, SMPI_Comm comm)
if (Stream->RendezvousReaderCount > 0)
{
Stream->FirstReaderCondition =
CMCondition_get(Stream->CPInfo->cm, NULL);
CMCondition_get(Stream->CPInfo->SharedCM->cm, NULL);
}
else
{
Expand Down Expand Up @@ -1470,7 +1458,7 @@ static void CP_PeerFailCloseWSReader(WS_ReaderInfo CP_WSR_Stream,
if (NewState == PeerFailed)
{
// move to fully closed state later
CMfree(CMadd_delayed_task(ParentStream->CPInfo->cm, 2, 0,
CMfree(CMadd_delayed_task(ParentStream->CPInfo->SharedCM->cm, 2, 0,
CloseWSRStream, CP_WSR_Stream));
}
}
Expand Down Expand Up @@ -1505,8 +1493,8 @@ void SstWriterClose(SstStream Stream)
"SstWriterClose, Sending Close at Timestep %d, one to each reader\n",
Msg.FinalTimestep);

sendOneToEachReaderRank(Stream, Stream->CPInfo->WriterCloseFormat, &Msg,
&Msg.RS_Stream);
sendOneToEachReaderRank(Stream, Stream->CPInfo->SharedCM->WriterCloseFormat,
&Msg, &Msg.RS_Stream);

UntagPreciousTimesteps(Stream);
Stream->ConfigParams->ReserveQueueLimit = 0;
Expand Down Expand Up @@ -1857,9 +1845,10 @@ static void ActOnTSLockStatus(SstStream Stream, long Timestep)
}
Msg.Timestep = Timestep;
SomethingSent++;
sendOneToWSRCohort(Stream->Readers[i],
Stream->CPInfo->CommPatternLockedFormat, &Msg,
&Msg.RS_Stream);
sendOneToWSRCohort(
Stream->Readers[i],
Stream->CPInfo->SharedCM->CommPatternLockedFormat, &Msg,
&Msg.RS_Stream);
Stream->Readers[i]->PreloadMode = SstPreloadLearned;
Stream->Readers[i]->PreloadModeActiveTimestep = Timestep;
CP_verbose(Stream,
Expand Down Expand Up @@ -2273,9 +2262,9 @@ extern void SstInternalProvideTimestep(
Timestep);

STREAM_MUTEX_LOCK(Stream);
sendOneToEachReaderRank(Stream,
Stream->CPInfo->DeliverTimestepMetadataFormat,
Msg, &Msg->RS_Stream);
sendOneToEachReaderRank(
Stream, Stream->CPInfo->SharedCM->DeliverTimestepMetadataFormat,
Msg, &Msg->RS_Stream);

Entry->Expired = 1;
Entry->ReferenceCount = 0;
Expand Down
Loading

0 comments on commit e545e34

Please sign in to comment.