From 197fbff746f5735af057135e6b4cd8a45f112178 Mon Sep 17 00:00:00 2001
From: Vicente Adolfo Bolea Sanchez
Date: Fri, 22 Jul 2022 20:26:05 -0400
Subject: [PATCH] MPI_DP: Improve scalability

Multiple changes are contained in this commit:

- Adoption of BSD queues instead of custom-made linked lists.
- Use of linked lists in insertion-heavy, no-random-access scenarios.
- Removed unneeded dynamically allocated variables.
- Fixed a minor memory leak on StreamWSR destruction.
- Finer-grained lock/mutex usage.
- Added multiple comments and a diagram.
- Renamed most of the data structures for better consistency.
- Rearranged function order in the file to better follow its execution.
---
 source/adios2/toolkit/sst/dp/mpi_dp.c | 952 +++++++++++++-------------
 1 file changed, 467 insertions(+), 485 deletions(-)

diff --git a/source/adios2/toolkit/sst/dp/mpi_dp.c b/source/adios2/toolkit/sst/dp/mpi_dp.c
index c692558e7a..85d84ac692 100644
--- a/source/adios2/toolkit/sst/dp/mpi_dp.c
+++ b/source/adios2/toolkit/sst/dp/mpi_dp.c
@@ -1,22 +1,61 @@
+/**
+ * ADIOS is freely available under the terms of the BSD license described
+ * in the COPYING file in the top level directory of this source distribution.
+ *
+ * mpi_dp.c
+ *
+ * Message Passing Interface (MPI) Data plane transport for ADIOS2 SST
+ *
+ * Data scheme of the main data structures introduced here:
+ *
+ * +-------------+       +-------------------+       +----------------+
+ * | MpiStreamWR |       | MpiStreamWPR      |       | MpiStreamRD    |
+ * | (Writer)    |       | (WriterPerReader) |       | (Reader)       |
+ * |             |       |                   |       |                |
+ * | + Readers   +------>| + ReaderCohort    |       | + WriterCohort |
+ * |   (SLIST)   |       |   (Array)         |       |   (Array)      |
+ * | + TimeSteps |       | + ReaderMPIComm   |       |                |
+ * |   (SLIST)   |       |   (Array)         |       |                |
+ * +-------------+       +-------------------+       +----------------+
+ *
+ * Author: Vicente Adolfo Bolea Sanchez
+ */
+
 #include "dp_interface.h"
 #include "sst_data.h"
 
 #include 
 #include 
-#include 
 
 #include 
 #include 
 #include 
+
+#include 
+#include 
 #include 
 
 #define MPI_DP_CONTACT_STRING_LEN 64
 
+#define QUOTE(name) #name
+#define MACRO_TO_STR(name) QUOTE(name)
+
-static pthread_rwlock_t LockRS = PTHREAD_RWLOCK_INITIALIZER;
-static pthread_rwlock_t LockTS = PTHREAD_RWLOCK_INITIALIZER;
 static pthread_once_t OnceMpiInitializer = PTHREAD_ONCE_INIT;
 
 /*****Stream Basic Structures ***********************************************/
 
+typedef struct _MpiReaderContactInfo
+{
+    char ContactString[MPI_DP_CONTACT_STRING_LEN];
+    void *StreamRS;
+} * MpiReaderContactInfo;
+
+typedef struct _MpiWriterContactInfo
+{
+    char ContactString[MPI_DP_CONTACT_STRING_LEN];
+    void *StreamWPR;
+    int PID;
+} * MpiWriterContactInfo;
+
 /* Base Stream class, used implicitly */
 typedef struct _MpiStream
 {
@@ -44,26 +83,28 @@ typedef struct _MpiStreamRD
     MpiStreamLink Link;
 
     CMFormat ReadRequestFormat;
-    struct _MpiWriterContactInfo *WriterContactInfo;
+    struct _MpiReaderContactInfo MyContactInfo;
+    struct _MpiWriterContactInfo *CohortWriterInfo;
+    MPI_Comm *CohortMpiComms;
 } * MpiStreamRD;
 
 /**
  * Writers Stream.
  *
  * It does not directly contains data related to each of the connected Readers.
- * Instead it contains a collection of MpiStreamWSR that represent the Stream
+ * Instead it contains a collection of MpiStreamWPR that represent the Stream
  * used for interacting with each (one/multiple) of the Readers.
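+ *
+ * Both writer-side collections (TimeSteps and Readers) are BSD sys/queue.h
+ * singly-linked tail queues (STAILQ), chosen because entries are only
+ * appended, traversed, and removed, never accessed randomly. An illustrative
+ * sketch of the pattern used throughout this file:
+ *
+ *   STAILQ_INIT(&Stream->TimeSteps);                        // empty list
+ *   STAILQ_INSERT_TAIL(&Stream->TimeSteps, Entry, entries); // O(1) append
+ *   STAILQ_FOREACH(Entry, &Stream->TimeSteps, entries)      // linear search
+ *       if (Entry->TimeStep == TimeStep) break;
+ *   STAILQ_REMOVE(&Stream->TimeSteps, Entry, _TimeStepsEntry, entries);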
*/ -typedef struct _MpiStreamWS +typedef struct _MpiStreamWR { MpiStream Stream; - struct _TimeStepEntry *TimeSteps; CMFormat ReadReplyFormat; - int ReaderCount; - struct _MpiStreamWSR **Readers; + STAILQ_HEAD(TimeStepsListHead, _TimeStepsEntry) TimeSteps; + STAILQ_HEAD(ReadersListHead, _MpiStreamWPR) Readers; + pthread_rwlock_t LockTS; pthread_mutex_t MutexReaders; -} * MpiStreamWS; +} * MpiStreamWR; /** * WritersPerReader streams. @@ -71,42 +112,25 @@ typedef struct _MpiStreamWS * It is used in the Writer side to represent the Stream used for communicated * with a single Reader. */ -typedef struct _MpiStreamWSR +typedef struct _MpiStreamWPR { MpiStreamLink Link; + struct _MpiStreamWR *StreamWR; - struct _MpiStreamWS *StreamWS; - struct _MpiReaderContactInfo *ReaderContactInfo; + struct _MpiWriterContactInfo MyContactInfo; + struct _MpiReaderContactInfo *CohortReaderInfo; + MPI_Comm *CohortMpiComms; char MpiPortName[MPI_MAX_PORT_NAME]; -} * MpiStreamWSR; -typedef struct _MpiPerTimeStepInfo -{ - char *CheckString; -} * MpiPerTimeStepInfo; + STAILQ_ENTRY(_MpiStreamWPR) entries; +} * MpiStreamWPR; -typedef struct _TimeStepEntry +typedef struct _TimeStepsEntry { long TimeStep; struct _SstData *Data; - struct _MpiPerTimeStepInfo *DP_TimeStepInfo; - struct _TimeStepEntry *Next; -} * TimeStepList; - -typedef struct _MpiReaderContactInfo -{ - char *ContactString; - void *StreamRS; - MPI_Comm MpiComm; -} * MpiReaderContactInfo; - -typedef struct _MpiWriterContactInfo -{ - char *ContactString; - void *StreamWS; - MPI_Comm MpiComm; - int PID; -} * MpiWriterContactInfo; + STAILQ_ENTRY(_TimeStepsEntry) entries; +} * TimeStepsEntry; /*****Message Data Structures ***********************************************/ @@ -122,40 +146,49 @@ enum MPI_DP_COMM_TYPE typedef struct _MpiReadRequestMsg { + int NotifyCondition; + int RequestingRank; long TimeStep; - size_t Offset; size_t Length; - void *StreamWS; + size_t Offset; void *StreamRS; - int RequestingRank; - int NotifyCondition; - enum MPI_DP_COMM_TYPE CommType; + void *StreamWPR; } * MpiReadRequestMsg; typedef struct _MpiReadReplyMsg { + char *Data; + char *MpiPortName; + int NotifyCondition; long TimeStep; size_t DataLength; void *StreamRS; - int NotifyCondition; - char *MpiPortName; - char *Data; } * MpiReadReplyMsg; +typedef struct _MpiCompletionHandle +{ + struct _MpiReadRequestMsg ReadRequest; + + CManager cm; + void *CPStream; + void *Buffer; + int DestinationRank; + enum MPI_DP_COMM_TYPE CommType; +} * MpiCompletionHandle; + static FMField MpiReadRequestList[] = { {"TimeStep", "integer", sizeof(long), FMOffset(MpiReadRequestMsg, TimeStep)}, {"Offset", "integer", sizeof(size_t), FMOffset(MpiReadRequestMsg, Offset)}, {"Length", "integer", sizeof(size_t), FMOffset(MpiReadRequestMsg, Length)}, - {"StreamWS", "integer", sizeof(void *), - FMOffset(MpiReadRequestMsg, StreamWS)}, + {"StreamWPR", "integer", sizeof(void *), + FMOffset(MpiReadRequestMsg, StreamWPR)}, {"StreamRS", "integer", sizeof(void *), FMOffset(MpiReadRequestMsg, StreamRS)}, {"RequestingRank", "integer", sizeof(int), FMOffset(MpiReadRequestMsg, RequestingRank)}, {"NotifyCondition", "integer", sizeof(int), FMOffset(MpiReadRequestMsg, NotifyCondition)}, - {"CommType", "integer", sizeof(int), FMOffset(MpiReadRequestMsg, CommType)}, {NULL, NULL, 0, 0}}; static FMStructDescRec MpiReadRequestStructs[] = { @@ -181,8 +214,8 @@ static FMStructDescRec MpiReadReplyStructs[] = { {NULL, NULL, 0, NULL}}; static FMField MpiReaderContactList[] = { - {"ContactString", "string", sizeof(char *), - 
FMOffset(MpiReaderContactInfo, ContactString)}, + {"ContactString", "char[" MACRO_TO_STR(MPI_DP_CONTACT_STRING_LEN) "]", + sizeof(char), FMOffset(MpiReaderContactInfo, ContactString)}, {"reader_ID", "integer", sizeof(void *), FMOffset(MpiReaderContactInfo, StreamRS)}, {NULL, NULL, 0, 0}}; @@ -193,12 +226,10 @@ static FMStructDescRec MpiReaderContactStructs[] = { {NULL, NULL, 0, NULL}}; static FMField MpiWriterContactList[] = { - {"ContactString", "string", sizeof(char *), - FMOffset(MpiWriterContactInfo, ContactString)}, + {"ContactString", "char[" MACRO_TO_STR(MPI_DP_CONTACT_STRING_LEN) "]", + sizeof(char), FMOffset(MpiWriterContactInfo, ContactString)}, {"writer_ID", "integer", sizeof(void *), - FMOffset(MpiWriterContactInfo, StreamWS)}, - {"MpiComm", "integer", sizeof(int), - FMOffset(MpiWriterContactInfo, MpiComm)}, + FMOffset(MpiWriterContactInfo, StreamWPR)}, {"PID", "integer", sizeof(int), FMOffset(MpiWriterContactInfo, PID)}, {NULL, NULL, 0, 0}}; @@ -212,6 +243,9 @@ static FMStructDescRec MpiWriterContactStructs[] = { static void MpiReadReplyHandler(CManager cm, CMConnection conn, void *msg_v, void *client_Data, attr_list attrs); +static void MpiReadRequestHandler(CManager cm, CMConnection conn, void *msg_v, + void *client_Data, attr_list attrs); + /** * Initialize MPI in the mode that it is required for MPI_DP to work. * @@ -244,7 +278,7 @@ static void MpiInitialize() /*****Public accessible functions********************************************/ /** - * InitReader. + * MpiInitReader. * * Called by the control plane collectively during the early stages of Open on * the reader side. It should do whatever is necessary to initialize a new @@ -262,10 +296,7 @@ static DP_RS_Stream MpiInitReader(CP_Services Svcs, void *CP_Stream, pthread_once(&OnceMpiInitializer, MpiInitialize); MpiStreamRD Stream = calloc(sizeof(struct _MpiStreamRD), 1); - MpiReaderContactInfo Contact = - calloc(sizeof(struct _MpiReaderContactInfo), 1); CManager cm = Svcs->getCManager(CP_Stream); - char *MpiContactString = calloc(sizeof(char), MPI_DP_CONTACT_STRING_LEN); SMPI_Comm comm = Svcs->getMPIComm(CP_Stream); CMFormat F; @@ -275,211 +306,31 @@ static DP_RS_Stream MpiInitReader(CP_Services Svcs, void *CP_Stream, SMPI_Comm_rank(comm, &Stream->Stream.Rank); - snprintf(MpiContactString, MPI_DP_CONTACT_STRING_LEN, "Reader Rank %d", - Stream->Stream.Rank); - - /* - * add a handler for read reply messages - */ + /* add a handler for read reply messages */ Stream->ReadRequestFormat = CMregister_format(cm, MpiReadRequestStructs); F = CMregister_format(cm, MpiReadReplyStructs); CMregister_handler(F, MpiReadReplyHandler, Svcs); - Contact->ContactString = MpiContactString; - Contact->StreamRS = Stream; - - *ReaderContactInfoPtr = Contact; + /* Generate Contact info */ + snprintf(Stream->MyContactInfo.ContactString, MPI_DP_CONTACT_STRING_LEN, + "Reader Rank %d", Stream->Stream.Rank); + Stream->MyContactInfo.StreamRS = Stream; + *ReaderContactInfoPtr = &Stream->MyContactInfo; Svcs->verbose(Stream->Stream.CP_Stream, DPTraceVerbose, - "MPI dataplane reader initialized, reader rank %d", + "MPI dataplane reader initialized, reader rank %d\n", Stream->Stream.Rank); return Stream; } -static char *FetchTimeStep(TimeStepList timesteps, long timestep, long offset, - long length) -{ - TimeStepList ts = timesteps; - - pthread_rwlock_rdlock(&LockTS); - - // Find the requested timestep - while (ts != NULL && ts->TimeStep != timestep) - { - ts = ts->Next; - } - - if (ts == NULL) - { - fprintf(stderr, "Failed to read TimeStep %ld, not 
found\n", timestep); - return NULL; - } - - char *outboundBuffer = malloc(sizeof(char) * length); - memcpy(outboundBuffer, ts->Data->block + offset, length); - - pthread_rwlock_unlock(&LockTS); - - return outboundBuffer; -} - -static void MpiReadRequestHandler(CManager cm, CMConnection conn, void *msg_v, - void *client_Data, attr_list attrs) -{ - MpiReadRequestMsg ReadRequestMsg = (MpiReadRequestMsg)msg_v; - MpiStreamWSR StreamWSR = ReadRequestMsg->StreamWS; - MpiStreamWS StreamWS = StreamWSR->StreamWS; - CP_Services Svcs = (CP_Services)client_Data; - - Svcs->verbose(StreamWS->Stream.CP_Stream, DPTraceVerbose, - "MpiReadRequestHandler:" - "read request from reader=%d,ts=%d,off=%d,len=%d\n", - ReadRequestMsg->RequestingRank, ReadRequestMsg->TimeStep, - ReadRequestMsg->Offset, ReadRequestMsg->Length); - - PERFSTUBS_TIMER_START_FUNC(timer); - - char *outboundBuffer = NULL; - if (NULL == (outboundBuffer = FetchTimeStep( - StreamWS->TimeSteps, ReadRequestMsg->TimeStep, - ReadRequestMsg->Offset, ReadRequestMsg->Length))) - { - PERFSTUBS_TIMER_STOP_FUNC(timer); - return; - } - - struct _MpiReadReplyMsg ReadReplyMsg = { - .TimeStep = ReadRequestMsg->TimeStep, - .DataLength = ReadRequestMsg->Length, - .StreamRS = ReadRequestMsg->StreamRS, - .NotifyCondition = ReadRequestMsg->NotifyCondition, - .MpiPortName = StreamWSR->MpiPortName, - }; - - if (MPI_DP_LOCAL == ReadRequestMsg->CommType) - { - ReadReplyMsg.Data = outboundBuffer; - } - - Svcs->verbose( - StreamWS->Stream.CP_Stream, DPTraceVerbose, - "MpiReadRequestHandler: Replying reader=%d with MPI port name=%s\n", - ReadRequestMsg->RequestingRank, StreamWSR->MpiPortName); - - Svcs->sendToPeer(StreamWS->Stream.CP_Stream, StreamWSR->Link.PeerCohort, - ReadRequestMsg->RequestingRank, StreamWS->ReadReplyFormat, - &ReadReplyMsg); - - if (MPI_DP_REMOTE == ReadRequestMsg->CommType) - { - // Send the actual Data using MPI - MPI_Comm *comm = - &StreamWSR->ReaderContactInfo[ReadRequestMsg->RequestingRank] - .MpiComm; - MPI_Errhandler worldErrHandler; - MPI_Comm_get_errhandler(MPI_COMM_WORLD, &worldErrHandler); - MPI_Comm_set_errhandler(MPI_COMM_WORLD, MPI_ERRORS_RETURN); - int ret = MPI_Send(outboundBuffer, ReadRequestMsg->Length, MPI_CHAR, 0, - ReadRequestMsg->NotifyCondition, *comm); - MPI_Comm_set_errhandler(MPI_COMM_WORLD, worldErrHandler); - - if (ret != MPI_SUCCESS) - { - MPI_Comm_accept(StreamWSR->MpiPortName, MPI_INFO_NULL, 0, - MPI_COMM_SELF, comm); - Svcs->verbose( - StreamWS->Stream.CP_Stream, DPTraceVerbose, - "MpiReadRequestHandler: Accepted client, Link.CohortSize=%d\n", - StreamWSR->Link.CohortSize); - MPI_Send(outboundBuffer, ReadRequestMsg->Length, MPI_CHAR, 0, - ReadRequestMsg->NotifyCondition, *comm); - } - } - - free(outboundBuffer); - - PERFSTUBS_TIMER_STOP_FUNC(timer); -} - -typedef struct _MpiCompletionHandle -{ - int CMcondition; - CManager cm; - void *CPStream; - void *Buffer; - int Rank; - enum MPI_DP_COMM_TYPE CommType; -} * MpiCompletionHandle; - /** - * This is invoked at the Reader side when a reply is ready to be read + * InitWriter * - */ -static void MpiReadReplyHandler(CManager cm, CMConnection conn, void *msg_v, - void *client_Data, attr_list attrs) -{ - PERFSTUBS_TIMER_START_FUNC(timer); - MpiReadReplyMsg ReadReplyMsg = (MpiReadReplyMsg)msg_v; - MpiStreamRD StreamRS = ReadReplyMsg->StreamRS; - CP_Services Svcs = (CP_Services)client_Data; - MpiCompletionHandle Handle = - CMCondition_get_client_data(cm, ReadReplyMsg->NotifyCondition); - - Svcs->verbose( - StreamRS->Stream.CP_Stream, DPTraceVerbose, - "MpiReadReplyHandler: 
Read recv from rank=%d,condition=%d,size=%d\n", - Handle->Rank, ReadReplyMsg->NotifyCondition, ReadReplyMsg->DataLength); - - if (MPI_DP_LOCAL == Handle->CommType) - { - memcpy(Handle->Buffer, ReadReplyMsg->Data, ReadReplyMsg->DataLength); - } - else - { - pthread_rwlock_rdlock(&LockRS); - MPI_Comm comm = StreamRS->WriterContactInfo[Handle->Rank].MpiComm; - pthread_rwlock_unlock(&LockRS); - - MPI_Errhandler worldErrHandler; - MPI_Comm_get_errhandler(MPI_COMM_WORLD, &worldErrHandler); - MPI_Comm_set_errhandler(MPI_COMM_WORLD, MPI_ERRORS_RETURN); - int ret = - MPI_Recv(Handle->Buffer, ReadReplyMsg->DataLength, MPI_CHAR, 0, - ReadReplyMsg->NotifyCondition, comm, MPI_STATUS_IGNORE); - MPI_Comm_set_errhandler(MPI_COMM_WORLD, worldErrHandler); - - if (ret != MPI_SUCCESS) - { - MPI_Comm_connect(ReadReplyMsg->MpiPortName, MPI_INFO_NULL, 0, - MPI_COMM_SELF, &comm); - - Svcs->verbose(StreamRS->Stream.CP_Stream, DPTraceVerbose, - "MpiReadReplyHandler: Connecting to MPI Server\n"); - MPI_Recv(Handle->Buffer, ReadReplyMsg->DataLength, MPI_CHAR, 0, - ReadReplyMsg->NotifyCondition, comm, MPI_STATUS_IGNORE); - } - - pthread_rwlock_wrlock(&LockRS); - StreamRS->WriterContactInfo[Handle->Rank].MpiComm = comm; - pthread_rwlock_unlock(&LockRS); - } - StreamRS->Link.Stats->DataBytesReceived += ReadReplyMsg->DataLength; - - /* - * Signal the condition to wake the reader if they are waiting. - */ - CMCondition_signal(cm, ReadReplyMsg->NotifyCondition); - PERFSTUBS_TIMER_STOP_FUNC(timer); -} - -/* - * - * InitWriter. Called by the control plane collectively during the early + * Called by the control plane collectively during the early * stages of Open on the writer side. It should do whatever is necessary to * initialize a new writer-side data plane. This does *not* include creating * contact information per se. That can be put off until InitWriterPerReader(). 
- * */ static DP_WS_Stream MpiInitWriter(CP_Services Svcs, void *CP_Stream, struct _SstParams *Params, attr_list DPAttrs, @@ -487,34 +338,36 @@ static DP_WS_Stream MpiInitWriter(CP_Services Svcs, void *CP_Stream, { pthread_once(&OnceMpiInitializer, MpiInitialize); - /* Make MutexReaders to be recursive */ - - MpiStreamWS Stream = calloc(sizeof(struct _MpiStreamWS), 1); + MpiStreamWR Stream = calloc(sizeof(struct _MpiStreamWR), 1); CManager cm = Svcs->getCManager(CP_Stream); SMPI_Comm comm = Svcs->getMPIComm(CP_Stream); CMFormat F; + /* Make MutexReaders to be recursive */ pthread_mutexattr_t attr; pthread_mutexattr_init(&attr); pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE); pthread_mutex_init(&Stream->MutexReaders, &attr); + pthread_rwlock_init(&Stream->LockTS, NULL); + SMPI_Comm_rank(comm, &Stream->Stream.Rank); Stream->Stream.CP_Stream = CP_Stream; Stream->Stream.PID = getpid(); + STAILQ_INIT(&Stream->TimeSteps); + STAILQ_INIT(&Stream->Readers); - /* - * add a handler for read request messages - */ + /* * add a handler for read request messages */ F = CMregister_format(cm, MpiReadRequestStructs); CMregister_handler(F, MpiReadRequestHandler, Svcs); - /* - * register read reply message structure so we can send later - */ + /* * register read reply message structure so we can send later */ Stream->ReadReplyFormat = CMregister_format(cm, MpiReadReplyStructs); + Svcs->verbose(CP_Stream, DPTraceVerbose, + "MpiInitWriter initialized addr=%p\n", Stream); + return (void *)Stream; } @@ -528,75 +381,63 @@ static DP_WS_Stream MpiInitWriter(CP_Services Svcs, void *CP_Stream, * The structure of that information should be described by * DPInterface.WriterContactFormats. (This is an FFS format description. See * https://www.cc.gatech.edu/systems/projects/FFS/.) 
- * */ static DP_WSR_Stream MpiInitWriterPerReader(CP_Services Svcs, DP_WS_Stream WS_Stream_v, int readerCohortSize, CP_PeerCohort PeerCohort, void **providedReaderInfo_v, void **WriterContactInfoPtr) { - MpiStreamWS StreamWS = (MpiStreamWS)WS_Stream_v; - MpiStreamWSR StreamWSR = calloc(sizeof(struct _MpiStreamWSR), 1); - MpiWriterContactInfo ContactInfo; - SMPI_Comm comm = Svcs->getMPIComm(StreamWS->Stream.CP_Stream); + MpiStreamWR StreamWR = (MpiStreamWR)WS_Stream_v; + MpiStreamWPR StreamWPR = calloc(sizeof(struct _MpiStreamWPR), 1); MpiReaderContactInfo *providedReaderInfo = (MpiReaderContactInfo *)providedReaderInfo_v; - char *MpiContactString = calloc(sizeof(char), MPI_DP_CONTACT_STRING_LEN); - - int Rank; - SMPI_Comm_rank(comm, &Rank); - snprintf(MpiContactString, MPI_DP_CONTACT_STRING_LEN, - "Writer Rank %d, test contact", Rank); - MPI_Open_port(MPI_INFO_NULL, StreamWSR->MpiPortName); + MPI_Open_port(MPI_INFO_NULL, StreamWPR->MpiPortName); - StreamWSR->StreamWS = StreamWS; /* pointer to writer struct */ - StreamWSR->Link.PeerCohort = PeerCohort; - StreamWSR->Link.CohortSize = readerCohortSize; + StreamWPR->StreamWR = StreamWR; /* pointer to writer struct */ + StreamWPR->Link.PeerCohort = PeerCohort; + StreamWPR->Link.CohortSize = readerCohortSize; - Svcs->verbose(StreamWS->Stream.CP_Stream, DPTraceVerbose, + Svcs->verbose(StreamWR->Stream.CP_Stream, DPTraceVerbose, "MPI dataplane WriterPerReader to be initialized\n"); - /* - * make a copy of writer contact information (original will not be - * preserved) - */ - StreamWSR->ReaderContactInfo = - calloc(sizeof(struct _MpiReaderContactInfo), readerCohortSize); + /* * Copy of writer contact information (original will not be preserved) */ + StreamWPR->CohortReaderInfo = + malloc(sizeof(struct _MpiReaderContactInfo) * readerCohortSize); + StreamWPR->CohortMpiComms = malloc(sizeof(MPI_Comm) * readerCohortSize); for (int i = 0; i < readerCohortSize; i++) { - StreamWSR->ReaderContactInfo[i].ContactString = - strdup(providedReaderInfo[i]->ContactString); - StreamWSR->ReaderContactInfo[i].StreamRS = - providedReaderInfo[i]->StreamRS; - StreamWSR->ReaderContactInfo[i].MpiComm = MPI_COMM_NULL; - Svcs->verbose( - StreamWS->Stream.CP_Stream, DPTraceVerbose, - "Received contact info \"%s\", RD_Stream %p for Reader Rank %d\n", - StreamWSR->ReaderContactInfo[i].ContactString, - StreamWSR->ReaderContactInfo[i].StreamRS, i); + memcpy(&StreamWPR->CohortReaderInfo[i], providedReaderInfo[i], + sizeof(struct _MpiReaderContactInfo)); + StreamWPR->CohortMpiComms[i] = MPI_COMM_NULL; } - /* - * add this writer-side reader-specific stream to the parent writer stream - * structure - */ - pthread_mutex_lock(&StreamWS->MutexReaders); - StreamWS->Readers = realloc( - StreamWS->Readers, sizeof(*StreamWSR) * (StreamWS->ReaderCount + 1)); - StreamWS->Readers[StreamWS->ReaderCount] = StreamWSR; - StreamWS->ReaderCount++; - pthread_mutex_unlock(&StreamWS->MutexReaders); - - ContactInfo = calloc(sizeof(struct _MpiWriterContactInfo), 1); - ContactInfo->ContactString = MpiContactString; - ContactInfo->StreamWS = StreamWSR; - ContactInfo->PID = StreamWS->Stream.PID; - *WriterContactInfoPtr = ContactInfo; - - return StreamWSR; + pthread_mutex_lock(&StreamWR->MutexReaders); + STAILQ_INSERT_TAIL(&StreamWR->Readers, StreamWPR, entries); + pthread_mutex_unlock(&StreamWR->MutexReaders); + + /* Prepare ContactInfo */ + int Rank; + SMPI_Comm comm = Svcs->getMPIComm(StreamWR->Stream.CP_Stream); + SMPI_Comm_rank(comm, &Rank); + snprintf(StreamWPR->MyContactInfo.ContactString, 
MPI_DP_CONTACT_STRING_LEN,
+             "Writer Rank %d, test contact", Rank);
+
+    StreamWPR->MyContactInfo.StreamWPR = StreamWPR;
+    StreamWPR->MyContactInfo.PID = StreamWR->Stream.PID;
+    *WriterContactInfoPtr = &StreamWPR->MyContactInfo;
+
+    return StreamWPR;
 }
 
+/**
+ * MpiProvideWriterDataToReader
+ *
+ * This function is the last step of the Writer/Reader handshake; it is called
+ * after MpiInitWriterPerReader is invoked on the writer side. It receives the
+ * WriterContactInfo created in MpiInitWriterPerReader through the
+ * providedWriterInfo_v argument.
+ */
 static void MpiProvideWriterDataToReader(CP_Services Svcs,
                                          DP_RS_Stream RS_Stream_v,
                                          int writerCohortSize,
@@ -610,36 +451,46 @@ static void MpiProvideWriterDataToReader(CP_Services Svcs,
     StreamRS->Link.PeerCohort = PeerCohort;
     StreamRS->Link.CohortSize = writerCohortSize;
 
-    /*
-     * make a copy of writer contact information (original will not be
-     * preserved)
-     */
-    struct _MpiWriterContactInfo *tmp =
-        calloc(sizeof(struct _MpiWriterContactInfo), writerCohortSize);
+    /* Copy of writer contact information (original will not be preserved) */
+    StreamRS->CohortWriterInfo =
+        malloc(sizeof(struct _MpiWriterContactInfo) * writerCohortSize);
+    StreamRS->CohortMpiComms = malloc(sizeof(MPI_Comm) * writerCohortSize);
 
     for (int i = 0; i < writerCohortSize; i++)
     {
-        tmp[i].ContactString = strdup(providedWriterInfo[i]->ContactString);
-        tmp[i].StreamWS = providedWriterInfo[i]->StreamWS;
-        tmp[i].MpiComm = MPI_COMM_NULL;
-        tmp[i].PID = providedWriterInfo[i]->PID;
+        memcpy(&StreamRS->CohortWriterInfo[i], providedWriterInfo[i],
+               sizeof(struct _MpiWriterContactInfo));
+        StreamRS->CohortMpiComms[i] = MPI_COMM_NULL;
+    }
+}
+
+/**
+ * LoadTimeStep
+ */
+static char *LoadTimeStep(MpiStreamWR Stream, long TimeStep)
+{
+    TimeStepsEntry Entry = NULL;
+    char *Data = NULL;
 
-        if (StreamRS->WriterContactInfo &&
-            StreamRS->WriterContactInfo[i].MpiComm != MPI_COMM_NULL)
+    pthread_rwlock_rdlock(&Stream->LockTS);
+    STAILQ_FOREACH(Entry, &Stream->TimeSteps, entries)
+    {
+        if (Entry->TimeStep == TimeStep)
         {
-            MPI_Comm_disconnect(&StreamRS->WriterContactInfo[i].MpiComm);
+            break;
         }
+    }
+    pthread_rwlock_unlock(&Stream->LockTS);
 
-        Svcs->verbose(StreamRS->Stream.CP_Stream, DPTraceVerbose,
-                      "Received contact info \"%s\", WS_stream %p for WSR Rank "
-                      "%d\n",
-                      tmp[i].ContactString, tmp[i].StreamWS, i);
+    if (Entry && Entry->Data)
+    {
+        Data = Entry->Data->block;
     }
 
-    StreamRS->WriterContactInfo = tmp;
+    return Data;
 }
 
 /**
- * ReadRemoteMemory.
+ * MpiReadRemoteMemory.
  *
  * Called by the control plane on the reader side to request that timestep data
  * from the writer side, identified by Rank, TimeStep, starting at a particular
@@ -649,7 +500,6 @@ static void MpiProvideWriterDataToReader(CP_Services Svcs,
  * This is an asyncronous request in that it need not be completed before this
  * call returns. The void* return value will later be passed to a
  * WaitForCompletion call and should represent a completion handle.
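+ *
+ * If the target writer rank lives in the same process (same PID), no request
+ * message is sent at all; the read is satisfied locally from the writer's
+ * TimeSteps list in MpiWaitForCompletion.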
- * */ static void *MpiReadRemoteMemory(CP_Services Svcs, DP_RS_Stream Stream_v, int Rank, long TimeStep, size_t Offset, @@ -661,45 +511,44 @@ static void *MpiReadRemoteMemory(CP_Services Svcs, DP_RS_Stream Stream_v, CManager cm = Svcs->getCManager(Stream->Stream.CP_Stream); MpiCompletionHandle ret = calloc(sizeof(struct _MpiCompletionHandle), 1); - pthread_rwlock_rdlock(&LockRS); - - ret->CMcondition = CMCondition_get(cm, NULL); - ret->CPStream = Stream->Stream.CP_Stream; - ret->cm = cm; - ret->Buffer = Buffer; - ret->Rank = Rank; - ret->CommType = (Stream->WriterContactInfo[Rank].PID == Stream->Stream.PID) - ? MPI_DP_LOCAL - : MPI_DP_REMOTE; - - /* - * set the completion handle as client Data on the condition so that - * handler has access to it. - */ - CMCondition_set_client_data(cm, ret->CMcondition, ret); + MpiWriterContactInfo TargetContact = &Stream->CohortWriterInfo[Rank]; Svcs->verbose( Stream->Stream.CP_Stream, DPTraceVerbose, "Reader (rank %d) requesting to read remote memory for TimeStep %d " - "from Rank %d, StreamWSR = %p, Offset=%d, Length=%d\n", - Stream->Stream.Rank, TimeStep, Rank, - Stream->WriterContactInfo[Rank].StreamWS, Offset, Length); + "from Rank %d, StreamWPR =%p, Offset=%d, Length=%d\n", + Stream->Stream.Rank, TimeStep, Rank, TargetContact->StreamWPR, Offset, + Length); /* send request to appropriate writer */ struct _MpiReadRequestMsg ReadRequestMsg = { - .TimeStep = TimeStep, - .Offset = Offset, .Length = Length, - .StreamWS = Stream->WriterContactInfo[Rank].StreamWS, - .StreamRS = Stream, + .NotifyCondition = CMCondition_get(cm, NULL), + .Offset = Offset, .RequestingRank = Stream->Stream.Rank, - .NotifyCondition = ret->CMcondition, - .CommType = ret->CommType}; + .StreamRS = Stream, + .StreamWPR = TargetContact->StreamWPR, + .TimeStep = TimeStep}; - pthread_rwlock_unlock(&LockRS); + ret->ReadRequest = ReadRequestMsg; + ret->Buffer = Buffer; + ret->cm = cm; + ret->CPStream = Stream->Stream.CP_Stream; + ret->DestinationRank = Rank; + ret->CommType = (TargetContact->PID == Stream->Stream.PID) ? MPI_DP_LOCAL + : MPI_DP_REMOTE; + + if (ret->CommType == MPI_DP_REMOTE) + { + CMCondition_set_client_data(cm, ReadRequestMsg.NotifyCondition, ret); + Svcs->sendToPeer(Stream->Stream.CP_Stream, Stream->Link.PeerCohort, + Rank, Stream->ReadRequestFormat, &ReadRequestMsg); + + Svcs->verbose(Stream->Stream.CP_Stream, DPTraceVerbose, + "ReadRemoteMemory: Send to server, Link.CohortSize=%d\n", + Stream->Link.CohortSize); + } - Svcs->sendToPeer(Stream->Stream.CP_Stream, Stream->Link.PeerCohort, Rank, - Stream->ReadRequestFormat, &ReadRequestMsg); return ret; } @@ -711,41 +560,192 @@ static void *MpiReadRemoteMemory(CP_Services Svcs, DP_RS_Stream Stream_v, * until that particular remote memory read is complete and the buffer is full. * A zero return means that the read failed and will result in a (hopefully * orderly) shutdown of the stream. + * + * If the writer exists in the same process as the reader a local direct read + * is attempted. 
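+ * The local path looks the requested TimeStep up in the in-process writer's
+ * list (LoadTimeStep) and memcpy()s the Offset/Length range straight into the
+ * caller's buffer, bypassing CM messages and MPI entirely.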
 */
 static int MpiWaitForCompletion(CP_Services Svcs, void *Handle_v)
 {
-    MpiCompletionHandle Handle = (MpiCompletionHandle)Handle_v;
+    const MpiCompletionHandle Handle = (MpiCompletionHandle)Handle_v;
+    const struct _MpiReadRequestMsg Request = Handle->ReadRequest;
+    int Ret = 0;
+
     Svcs->verbose(
         Handle->CPStream, DPTraceVerbose,
-        "Waiting for completion of memory read to rank %d, condition %d\n",
-        Handle->Rank, Handle->CMcondition);
+        "Waiting for completion of memory read to rank %d, condition %d, "
+        "timestep=%d, is_local=%d\n",
+        Handle->DestinationRank, Request.NotifyCondition, Request.TimeStep,
+        Handle->CommType);
 
-    /*
-     * Wait for the CM condition to be signalled. If it has been already,
-     * this returns immediately. Copying the incoming data to the waiting
-     * buffer has been done by the reply handler.
-     */
-    int Ret = CMCondition_wait(Handle->cm, Handle->CMcondition);
-    if (!Ret)
+    // If possible, read locally
+    if (Handle->CommType == MPI_DP_LOCAL)
+    {
+        const MpiStreamWR StreamWR =
+            ((MpiStreamWPR)Request.StreamWPR)->StreamWR;
+        char *LoadedBuffer = LoadTimeStep(StreamWR, Request.TimeStep);
+        if (LoadedBuffer)
+        {
+            memcpy(Handle->Buffer, LoadedBuffer + Request.Offset,
+                   Request.Length);
+        }
+        Ret = (LoadedBuffer != NULL);
+    }
+    // Otherwise, wait for remote read
+    else
+    {
+        Ret = CMCondition_wait(Handle->cm, Request.NotifyCondition);
+    }
+
+    // Display result
+    if (Ret)
     {
         Svcs->verbose(Handle->CPStream, DPTraceVerbose,
-                      "Remote memory read to rank %d with "
-                      "condition %d has FAILED because of "
-                      "writer failure\n",
-                      Handle->Rank, Handle->CMcondition);
+                      "Memory read to rank %d with condition %d and "
+                      "length %zu has completed\n",
+                      Handle->DestinationRank, Request.NotifyCondition,
+                      Request.Length);
     }
     else
     {
-        if (Handle->CMcondition != -1)
-            Svcs->verbose(Handle->CPStream, DPTraceVerbose,
-                          "Remote memory read to rank %d with condition %d has "
-                          "completed\n",
-                          Handle->Rank, Handle->CMcondition);
+        Svcs->verbose(
+            Handle->CPStream, DPTraceVerbose,
+            "Remote memory read to rank %d with condition %d has FAILED "
+            "because of "
+            "writer failure\n",
+            Handle->DestinationRank, Request.NotifyCondition);
     }
+
+    free(Handle);
     return Ret;
 }
 
+/**
+ * MpiReadRequestHandler.
+ *
+ * This function is invoked at the writer side when a read request arrives;
+ * the message is sent from MpiReadRemoteMemory. This function should noisily
+ * fail if the requested timestep is not found.
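+ *
+ * The reply is sent back over CM, and the data block itself is pushed with
+ * MPI_Send on the per-reader communicator; if that communicator is not yet
+ * connected the first send fails, the writer accepts a connection on its MPI
+ * port (MPI_Comm_accept) and the send is retried.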
+ */ +static void MpiReadRequestHandler(CManager cm, CMConnection conn, void *msg_v, + void *client_Data, attr_list attrs) +{ + MpiReadRequestMsg ReadRequestMsg = (MpiReadRequestMsg)msg_v; + MpiStreamWPR StreamWPR = ReadRequestMsg->StreamWPR; + MpiStreamWR StreamWR = StreamWPR->StreamWR; + CP_Services Svcs = (CP_Services)client_Data; + + Svcs->verbose(StreamWR->Stream.CP_Stream, DPTraceVerbose, + "MpiReadRequestHandler:" + "read request from reader=%d,ts=%d,off=%d,len=%d\n", + ReadRequestMsg->RequestingRank, ReadRequestMsg->TimeStep, + ReadRequestMsg->Offset, ReadRequestMsg->Length); + + PERFSTUBS_TIMER_START_FUNC(timer); + + char *RequestedData = LoadTimeStep(StreamWR, ReadRequestMsg->TimeStep); + + if (!RequestedData) + { + PERFSTUBS_TIMER_STOP_FUNC(timer); + Svcs->verbose(StreamWR->Stream.CP_Stream, DPPerStepVerbose, + "Failed to read TimeStep %ld, not found\n", + ReadRequestMsg->TimeStep); + return; + } + + struct _MpiReadReplyMsg ReadReplyMsg = { + .TimeStep = ReadRequestMsg->TimeStep, + .DataLength = ReadRequestMsg->Length, + .StreamRS = ReadRequestMsg->StreamRS, + .NotifyCondition = ReadRequestMsg->NotifyCondition, + .MpiPortName = StreamWPR->MpiPortName, + }; + + Svcs->verbose( + StreamWR->Stream.CP_Stream, DPTraceVerbose, + "MpiReadRequestHandler: Replying reader=%d with MPI port name=%s\n", + ReadRequestMsg->RequestingRank, StreamWPR->MpiPortName); + + Svcs->sendToPeer(StreamWR->Stream.CP_Stream, StreamWPR->Link.PeerCohort, + ReadRequestMsg->RequestingRank, StreamWR->ReadReplyFormat, + &ReadReplyMsg); + + // Send the actual Data using MPI + MPI_Comm *comm = &StreamWPR->CohortMpiComms[ReadRequestMsg->RequestingRank]; + MPI_Errhandler worldErrHandler; + MPI_Comm_get_errhandler(MPI_COMM_WORLD, &worldErrHandler); + MPI_Comm_set_errhandler(MPI_COMM_WORLD, MPI_ERRORS_RETURN); + int ret = + MPI_Send(RequestedData + ReadRequestMsg->Offset, ReadRequestMsg->Length, + MPI_CHAR, 0, ReadRequestMsg->NotifyCondition, *comm); + MPI_Comm_set_errhandler(MPI_COMM_WORLD, worldErrHandler); + + if (ret != MPI_SUCCESS) + { + MPI_Comm_accept(StreamWPR->MpiPortName, MPI_INFO_NULL, 0, MPI_COMM_SELF, + comm); + Svcs->verbose( + StreamWR->Stream.CP_Stream, DPTraceVerbose, + "MpiReadRequestHandler: Accepted client, Link.CohortSize=%d\n", + StreamWPR->Link.CohortSize); + MPI_Send(RequestedData + ReadRequestMsg->Offset, ReadRequestMsg->Length, + MPI_CHAR, 0, ReadRequestMsg->NotifyCondition, *comm); + } + + PERFSTUBS_TIMER_STOP_FUNC(timer); +} + +/** + * MpiReadReplyHandler. + * + * This is invoked at the Reader side when a reply is ready to be read. 
+ */ +static void MpiReadReplyHandler(CManager cm, CMConnection conn, void *msg_v, + void *client_Data, attr_list attrs) +{ + PERFSTUBS_TIMER_START_FUNC(timer); + MpiReadReplyMsg ReadReplyMsg = (MpiReadReplyMsg)msg_v; + MpiStreamRD StreamRS = ReadReplyMsg->StreamRS; + CP_Services Svcs = (CP_Services)client_Data; + MpiCompletionHandle Handle = + CMCondition_get_client_data(cm, ReadReplyMsg->NotifyCondition); + + Svcs->verbose( + StreamRS->Stream.CP_Stream, DPTraceVerbose, + "MpiReadReplyHandler: Read recv from rank=%d,condition=%d,size=%d\n", + Handle->DestinationRank, ReadReplyMsg->NotifyCondition, + ReadReplyMsg->DataLength); + + MPI_Comm comm = StreamRS->CohortMpiComms[Handle->DestinationRank]; + + MPI_Errhandler worldErrHandler; + MPI_Comm_get_errhandler(MPI_COMM_WORLD, &worldErrHandler); + MPI_Comm_set_errhandler(MPI_COMM_WORLD, MPI_ERRORS_RETURN); + int ret = MPI_Recv(Handle->Buffer, ReadReplyMsg->DataLength, MPI_CHAR, 0, + ReadReplyMsg->NotifyCondition, comm, MPI_STATUS_IGNORE); + MPI_Comm_set_errhandler(MPI_COMM_WORLD, worldErrHandler); + + if (ret != MPI_SUCCESS) + { + MPI_Comm_connect(ReadReplyMsg->MpiPortName, MPI_INFO_NULL, 0, + MPI_COMM_SELF, &comm); + + Svcs->verbose(StreamRS->Stream.CP_Stream, DPTraceVerbose, + "MpiReadReplyHandler: Connecting to MPI Server\n"); + MPI_Recv(Handle->Buffer, ReadReplyMsg->DataLength, MPI_CHAR, 0, + ReadReplyMsg->NotifyCondition, comm, MPI_STATUS_IGNORE); + } + + StreamRS->CohortMpiComms[Handle->DestinationRank] = comm; + StreamRS->Link.Stats->DataBytesReceived += ReadReplyMsg->DataLength; + + /* + * Signal the condition to wake the reader if they are waiting. + */ + CMCondition_signal(cm, ReadReplyMsg->NotifyCondition); + PERFSTUBS_TIMER_STOP_FUNC(timer); +} + /** * ProvideTimeStep. * @@ -764,26 +764,16 @@ static void MpiProvideTimeStep(CP_Services Svcs, DP_WS_Stream Stream_v, struct _SstData *LocalMetadata, long TimeStep, void **TimeStepInfoPtr) { - MpiStreamWS Stream = (MpiStreamWS)Stream_v; - TimeStepList Entry = calloc(sizeof(struct _TimeStepEntry), 1); - struct _MpiPerTimeStepInfo *Info = - calloc(sizeof(struct _MpiPerTimeStepInfo), 1); - - Info->CheckString = calloc(sizeof(char), MPI_DP_CONTACT_STRING_LEN); - snprintf(Info->CheckString, MPI_DP_CONTACT_STRING_LEN, - "Mpi info for timestep %ld from rank %d", TimeStep, - Stream->Stream.Rank); + MpiStreamWR Stream = (MpiStreamWR)Stream_v; + TimeStepsEntry Entry = calloc(sizeof(struct _TimeStepsEntry), 1); Entry->Data = malloc(sizeof(struct _SstData)); memcpy(Entry->Data, Data, sizeof(struct _SstData)); Entry->TimeStep = TimeStep; - Entry->DP_TimeStepInfo = Info; - pthread_rwlock_wrlock(&LockTS); - Entry->Next = Stream->TimeSteps; - Stream->TimeSteps = Entry; - pthread_rwlock_unlock(&LockTS); - *TimeStepInfoPtr = Info; + pthread_rwlock_wrlock(&Stream->LockTS); + STAILQ_INSERT_TAIL(&Stream->TimeSteps, Entry, entries); + pthread_rwlock_unlock(&Stream->LockTS); } /** @@ -796,46 +786,57 @@ static void MpiProvideTimeStep(CP_Services Svcs, DP_WS_Stream Stream_v, static void MpiReleaseTimeStep(CP_Services Svcs, DP_WS_Stream Stream_v, long TimeStep) { - MpiStreamWS Stream = (MpiStreamWS)Stream_v; - TimeStepList List = Stream->TimeSteps; + MpiStreamWR Stream = (MpiStreamWR)Stream_v; Svcs->verbose(Stream->Stream.CP_Stream, DPTraceVerbose, "Releasing timestep %ld\n", TimeStep); - pthread_rwlock_wrlock(&LockTS); - if (Stream->TimeSteps->TimeStep == TimeStep) + pthread_rwlock_rdlock(&Stream->LockTS); + TimeStepsEntry EntryToDelete = STAILQ_FIRST(&Stream->TimeSteps); + pthread_rwlock_unlock(&Stream->LockTS); 
+
+    // Optimal pathway
+    if (EntryToDelete && EntryToDelete->TimeStep == TimeStep)
     {
-        Stream->TimeSteps = List->Next;
-        free(List->Data);
-        free(List);
+        pthread_rwlock_wrlock(&Stream->LockTS);
+        STAILQ_REMOVE_HEAD(&Stream->TimeSteps, entries);
+        pthread_rwlock_unlock(&Stream->LockTS);
     }
-    else
+    else // General pathway
     {
-        TimeStepList last = List;
-        List = List->Next;
-        while (List != NULL)
+        EntryToDelete = NULL;
+
+        pthread_rwlock_rdlock(&Stream->LockTS);
+        STAILQ_FOREACH(EntryToDelete, &Stream->TimeSteps, entries)
         {
-            if (List->TimeStep == TimeStep)
+            if (EntryToDelete->TimeStep == TimeStep)
             {
-                last->Next = List->Next;
-                free(List->Data);
-                free(List);
-                pthread_rwlock_unlock(&LockTS);
-                return;
+                break;
             }
-            last = List;
-            List = List->Next;
         }
-        /*
-         * Shouldn't ever get here because we should never release a
-         * timestep that we don't have.
-         */
-        fprintf(stderr, "Failed to release TimeStep %ld, not found\n",
-                TimeStep);
+        pthread_rwlock_unlock(&Stream->LockTS);
+        if (EntryToDelete)
+        {
+            pthread_rwlock_wrlock(&Stream->LockTS);
+            STAILQ_REMOVE(&Stream->TimeSteps, EntryToDelete, _TimeStepsEntry,
+                          entries);
+            pthread_rwlock_unlock(&Stream->LockTS);
+        }
+    }
+
+    if (EntryToDelete)
+    {
+        free(EntryToDelete->Data);
+        free(EntryToDelete);
+    }
-    pthread_rwlock_unlock(&LockTS);
 }
 
+/**
+ * MpiGetPriority.
+ *
+ * When MPI is initialized with MPI_THREAD_MULTIPLE, this data plane should
+ * have the highest priority.
+ */
 static int MpiGetPriority(CP_Services Svcs, void *CP_Stream,
                           struct _SstParams *Params)
 {
@@ -852,6 +853,9 @@ static int MpiGetPriority(CP_Services Svcs, void *CP_Stream,
     return -100;
 }
 
+/**
+ * MpiNotifyConnFailure
+ */
 static void MpiNotifyConnFailure(CP_Services Svcs, DP_RS_Stream Stream_v,
                                  int FailedPeerRank)
 {
@@ -864,108 +868,86 @@ static void MpiNotifyConnFailure(CP_Services Svcs, DP_RS_Stream Stream_v,
                   FailedPeerRank);
 }
 
+/**
+ * MpiDestroyWriterPerReader.
+ *
+ * This is called whenever a reader disconnects from a writer. This function
+ * also removes the StreamWPR from its own StreamWR.
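+ *
+ * Every per-reader MPI communicator is disconnected and the MPI port is
+ * closed before the stream is unlinked from the writer's Readers list (under
+ * MutexReaders) and freed.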
+ */ static void MpiDestroyWriterPerReader(CP_Services Svcs, DP_WSR_Stream WSR_Stream_v) { - MpiStreamWSR StreamWSR = (MpiStreamWSR)WSR_Stream_v; - MpiStreamWS StreamWS = StreamWSR->StreamWS; + MpiStreamWPR StreamWPR = (MpiStreamWPR)WSR_Stream_v; + MpiStreamWR StreamWR = StreamWPR->StreamWR; - char MpiPortName[MPI_MAX_PORT_NAME] = {0}; - const int CohortSize = StreamWSR->Link.CohortSize; - MPI_Comm *Comms_to_disconnect = calloc(sizeof(MPI_Comm), CohortSize); - - pthread_mutex_lock(&StreamWS->MutexReaders); - strncpy(MpiPortName, StreamWSR->MpiPortName, MPI_MAX_PORT_NAME); + const int CohortSize = StreamWPR->Link.CohortSize; for (int i = 0; i < CohortSize; i++) { - Comms_to_disconnect[i] = StreamWSR->ReaderContactInfo[i].MpiComm; - if (StreamWSR->ReaderContactInfo[i].ContactString) + if (StreamWPR->CohortMpiComms[i] != MPI_COMM_NULL) { - free(StreamWSR->ReaderContactInfo[i].ContactString); - } - } - - if (StreamWSR->ReaderContactInfo) - { - free(StreamWSR->ReaderContactInfo); - } - - StreamWSR->Link.CohortSize = 0; - - for (int i = 0; i < StreamWS->ReaderCount; i++) - { - if (StreamWS->Readers[i] == StreamWSR) - { - StreamWS->Readers[i] = StreamWS->Readers[StreamWS->ReaderCount - 1]; - - StreamWS->Readers = - realloc(StreamWS->Readers, - sizeof(*StreamWSR) * (StreamWS->ReaderCount - 1)); - StreamWS->ReaderCount--; - break; + MPI_Comm_disconnect(&StreamWPR->CohortMpiComms[i]); } } + MPI_Close_port(StreamWPR->MpiPortName); - free(StreamWSR); - pthread_mutex_unlock(&StreamWS->MutexReaders); + free(StreamWPR->CohortReaderInfo); + free(StreamWPR->CohortMpiComms); - // MPI routines must be called outsie of a critical region - for (int i = 0; i < CohortSize; i++) - { - if (Comms_to_disconnect[i] != MPI_COMM_NULL) - { - MPI_Comm_disconnect(&Comms_to_disconnect[i]); - } - } - free(Comms_to_disconnect); - - MPI_Close_port(MpiPortName); + pthread_mutex_lock(&StreamWR->MutexReaders); + STAILQ_REMOVE(&StreamWR->Readers, StreamWPR, _MpiStreamWPR, entries); + pthread_mutex_unlock(&StreamWR->MutexReaders); + free(StreamWPR); } +/** + * MpiDestroyWriter + */ static void MpiDestroyWriter(CP_Services Svcs, DP_WS_Stream WS_Stream_v) { - MpiStreamWS StreamWS = (MpiStreamWS)WS_Stream_v; + MpiStreamWR StreamWR = (MpiStreamWR)WS_Stream_v; + + pthread_mutex_lock(&StreamWR->MutexReaders); + while (!STAILQ_EMPTY(&StreamWR->Readers)) + { + MpiStreamWPR Stream = STAILQ_FIRST(&StreamWR->Readers); + MpiDestroyWriterPerReader(Svcs, Stream); + } + pthread_mutex_unlock(&StreamWR->MutexReaders); - pthread_mutex_lock(&StreamWS->MutexReaders); - while (StreamWS->ReaderCount) + pthread_rwlock_wrlock(&StreamWR->LockTS); + while (!STAILQ_EMPTY(&StreamWR->TimeSteps)) { - MpiDestroyWriterPerReader(Svcs, - StreamWS->Readers[StreamWS->ReaderCount - 1]); + TimeStepsEntry Entry = STAILQ_FIRST(&StreamWR->TimeSteps); + STAILQ_REMOVE_HEAD(&StreamWR->TimeSteps, entries); + free(Entry->Data); + free(Entry); } + pthread_rwlock_unlock(&StreamWR->LockTS); - pthread_mutex_t *mutex_to_delete = &StreamWS->MutexReaders; - free(StreamWS->Readers); - free(StreamWS); - pthread_mutex_unlock(mutex_to_delete); + pthread_mutex_destroy(&StreamWR->MutexReaders); + pthread_rwlock_destroy(&StreamWR->LockTS); + free(StreamWR); } +/** + * MpiDestroyReader + */ static void MpiDestroyReader(CP_Services Svcs, DP_RS_Stream RS_Stream_v) { MpiStreamRD StreamRS = (MpiStreamRD)RS_Stream_v; const int CohortSize = StreamRS->Link.CohortSize; - MPI_Comm *MpiComms = calloc(sizeof(MPI_Comm), CohortSize); - - pthread_rwlock_wrlock(&LockRS); for (int i = 0; i < CohortSize; 
i++) { - MpiComms[i] = StreamRS->WriterContactInfo[i].MpiComm; - free(StreamRS->WriterContactInfo[i].ContactString); - } - free(StreamRS->WriterContactInfo); - free(StreamRS); - - pthread_rwlock_unlock(&LockRS); - - for (int i = 0; i < CohortSize; i++) - { - if (MpiComms[i] != MPI_COMM_NULL) + if (StreamRS->CohortMpiComms[i] != MPI_COMM_NULL) { - MPI_Comm_disconnect(&MpiComms[i]); + MPI_Comm_disconnect(&StreamRS->CohortMpiComms[i]); } } - free(MpiComms); + free(StreamRS->CohortMpiComms); + free(StreamRS->CohortWriterInfo); + free(StreamRS); } extern CP_DP_Interface LoadMpiDP()