From e2979dbd5a64b247943d1f3e44406dcb5fded6a4 Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Thu, 3 Nov 2022 13:51:55 -0400 Subject: [PATCH] Minor changes to the continuation API to reflect current proposal - Swap flags and max_poll parameter in MPI_Continue_init - Swap cont_req and info parameter in MPI_Continue_init - Add new flags and remove support for info keys mpi_continue_poll_only, mpi_continue_max_poll, and mpi_continue_enqueue_complete Signed-off-by: Joseph Schuchart --- ompi/mpiext/continue/c/continuation.c | 29 +----- ompi/mpiext/continue/c/continue_init.c | 4 +- ompi/mpiext/continue/c/mpiext_continue_c.h | 102 +++++++++++++++++++-- test/continuations/continutions-mt.c | 16 +--- test/continuations/continutions.c | 11 +-- 5 files changed, 108 insertions(+), 54 deletions(-) diff --git a/ompi/mpiext/continue/c/continuation.c b/ompi/mpiext/continue/c/continuation.c index e09ba7da7b8..f88bc86913d 100644 --- a/ompi/mpiext/continue/c/continuation.c +++ b/ompi/mpiext/continue/c/continuation.c @@ -824,6 +824,7 @@ int ompi_continue_attach( } bool req_volatile = (flags & MPIX_CONT_REQBUF_VOLATILE); + bool defer_complete = (flags & MPIX_CONT_DEFER_COMPLETE); ompi_cont_request_t *cont_req = (ompi_cont_request_t *)continuation_request; ompi_continuation_t *cont = ompi_continue_cont_create(count, cont_req, cont_cb, @@ -901,7 +902,7 @@ int ompi_continue_attach( last_num_active = OPAL_THREAD_ADD_FETCH32(&cont->cont_num_active, -num_complete); } if (0 == last_num_active) { - if (cont_req->cont_enqueue_complete || OMPI_REQUEST_INACTIVE == cont_req->super.req_state) { + if (defer_complete || OMPI_REQUEST_INACTIVE == cont_req->super.req_state) { /* enqueue for later processing */ ompi_continue_enqueue_runnable(cont); } else { @@ -930,8 +931,8 @@ int ompi_continue_attach( */ int ompi_continue_allocate_request( ompi_request_t **cont_req_ptr, - int max_poll, int flags, + int max_poll, ompi_info_t *info) { ompi_cont_request_t *cont_req = OBJ_NEW(ompi_cont_request_t); @@ -940,35 +941,11 @@ int ompi_continue_allocate_request( cont_req->cont_flags = flags; - int flag; - bool test_poll = false; - /* TODO: remove the info flag */ - ompi_info_get_bool(info, "mpi_continue_poll_only", &test_poll, &flag); - - if ((flag && test_poll)) { - cont_req->cont_flags |= MPIX_CONT_POLL_ONLY; - } - if (cont_req->cont_flags & MPIX_CONT_POLL_ONLY) { cont_req->cont_complete_list = OBJ_NEW(opal_list_t); } - /* TODO: remove this flags, it should be part of attach */ - bool enqueue_complete = false; - ompi_info_get_bool(info, "mpi_continue_enqueue_complete", &enqueue_complete, &flag); - cont_req->cont_enqueue_complete = (flag && enqueue_complete); - cont_req->continue_max_poll = max_poll; - /* TODO: remove this flag, it's explicit now */ - opal_cstring_t *value_str; - ompi_info_get(info, "mpi_continue_max_poll", &value_str, &flag); - if (flag) { - int max_poll = atoi(value_str->string); - OBJ_RELEASE(value_str); - if (max_poll > 0) { - cont_req->continue_max_poll = max_poll; - } - } if (0 == cont_req->continue_max_poll) { cont_req->continue_max_poll = UINT32_MAX; } diff --git a/ompi/mpiext/continue/c/continue_init.c b/ompi/mpiext/continue/c/continue_init.c index 1be90888e66..1be127ca559 100644 --- a/ompi/mpiext/continue/c/continue_init.c +++ b/ompi/mpiext/continue/c/continue_init.c @@ -36,8 +36,8 @@ static const char FUNC_NAME[] = "MPIX_Continue_init"; int MPIX_Continue_init( int max_poll, int flags, - MPI_Request *cont_req, - MPI_Info info) + MPI_Info info, + MPI_Request *cont_req) { int rc = MPI_SUCCESS; diff --git a/ompi/mpiext/continue/c/mpiext_continue_c.h b/ompi/mpiext/continue/c/mpiext_continue_c.h index 90fc175c114..8352fca1bbe 100644 --- a/ompi/mpiext/continue/c/mpiext_continue_c.h +++ b/ompi/mpiext/continue/c/mpiext_continue_c.h @@ -16,17 +16,105 @@ #include +/** + * Mark the continuation request(s) as volatile. + * Generally, the request buffer should remain accessible until the continuation is invoked + * and will be set to MPI_REQUEST_NULL before the continuation is invoked. + * However, if this flag is specified the requests are not accessed after the call to + * MPI_Continue[all] returns. + */ #define MPIX_CONT_REQBUF_VOLATILE 1<<0 -/* the continuation is persistent (only valid with persistent requests) */ -#define MPIX_CONT_PERSISTENT 1<<1 -#define MPIX_CONT_POLL_ONLY 1<<2 -typedef int (MPIX_Continue_cb_function)(int rc, void *user_data); -OMPI_DECLSPEC int MPIX_Continue_init(int max_poll, int flags, MPI_Request *cont_req, MPI_Info info); +/** + * the continuation is persistent (only valid with persistent requests) + * TODO: not implemented yet + */ +#define MPIX_CONT_PERSISTENT 1<<1 + +/* + * mark the continuation request as poll-only, i.e., only execute continuations + * when testing/waiting for the continuation request to complete + */ +#define MPIX_CONT_POLL_ONLY 1<<2 + +/* Wwhether the execution of continuations is deferred in MPI_Continue or + * MPI_Continueall if all operations are complete. + * By default, continuations eligible for execution are invoked immediately. */ +#define MPIX_CONT_DEFER_COMPLETE 1<<3 + +/* whether failed continuations will be invoked and passed the error code + * TODO: not implemented yet + */ +#define MPIX_CONT_INVOKE_FAILED 1<<4 + +/** + * Completion callback signature: + * \param rc an error code (MPI_SUCCESS, unless MPIX_CONT_INVOKE_FAILED is provided) + * \param cb_data the pointer passed as cb_data to MPI_Continue[all] + * \returns MPI_SUCECSS on success, an error code to mark the continuation as failed + */ +typedef int (MPIX_Continue_cb_function)(int rc, void *cb_data); + +/** + * Initialize a continuation request. + * \param flags 0 or \ref MPIX_CONT_POLL_ONLY + * \param max_poll the maximum number of continuations to execute when testing + * the continuation request for completion or MPI_UNDEFINED for + * unlimited execution of eligible continuations + * \param info info object used to further control the behavior of the continuation request. + * Currently supported: + * - mpi_continue_thread: either "all" (any thread in the process may execute callbacks) + * or "application" (only application threads may execute callbacks; default) + * - mpi_continue_async_signal_safe: whether the callbacks may be executed from within a signal handler + * \param[out] cont_req the newly created continuation request + */ +OMPI_DECLSPEC int MPIX_Continue_init(int flags, int max_poll, MPI_Info info, MPI_Request *cont_req); + +/** + * Attach a new continuation to the operation represented by \c request and register it with the continuation request \c cont_req. + * The callback will be executed once the operation has completed and will be passed the \c cb_data pointer. + * + * \param request the request representing the the operation to attach a continuation to + * \param cb the callback to invoke upon completion, with signature \ref MPIX_Continue_cb_function + * \param cb_data the user-data to pass to the callback + * \param flags 0 or OR-combination of \ref MPIX_CONT_REQBUF_VOLATILE, \ref MPIX_CONT_PERSISTENT, + * \ref MPIX_CONT_DEFER_COMPLETE, \ref MPIX_CONT_INVOKE_FAILED + * \param status MPI_STATUS_IGNORE or a pointer to a status object that will be a filled before the callback is invoked + * \param cont_req a continuation request created through \ref MPIX_Continue_init + */ OMPI_DECLSPEC int MPIX_Continue(MPI_Request *request, MPIX_Continue_cb_function *cb, void *cb_data, int flags, MPI_Status *status, MPI_Request cont_req); -OMPI_DECLSPEC int MPIX_Continueall(int count, MPI_Request request[], MPIX_Continue_cb_function *cb, void *cb_data, + +/** + * Attach a new continuation to the operations represented by the \c count \c requests and + * register it with the continuation request \c cont_req. + * The callback will be executed once the operations have completed and will be passed the \c cb_data pointer. + * + * \param count the number of requests in \c requests + * \param requests the requests representing the the operations to attach a continuation to + * \param cb the callback to invoke upon completion of all operations, with signature \ref MPIX_Continue_cb_function + * \param cb_data the user-data to pass to the callback + * \param flags 0 or OR-combination of \ref MPIX_CONT_REQBUF_VOLATILE, \ref MPIX_CONT_PERSISTENT, + * \ref MPIX_CONT_DEFER_COMPLETE, \ref MPIX_CONT_INVOKE_FAILED + * \param status MPI_STATUS_IGNORE or a pointer to a status object that will be a filled before the callback is invoked + * \param cont_req a continuation request created through \ref MPIX_Continue_init + */ +OMPI_DECLSPEC int MPIX_Continueall(int count, MPI_Request requests[], MPIX_Continue_cb_function *cb, void *cb_data, int flags, MPI_Status status[], MPI_Request cont_req); -OMPI_DECLSPEC int MPIX_Continue_get_failed( MPI_Request cont_req, int *count, void **cb_data); + +/** + * Query the callback data for failed continuations, i.e., continuations that returned a value other than + * MPI_SUCCESS or whose operations experienced an error. + * The applications passes in \c cb_data an array of size \c count. Upon return, \c count will be set + * to the actual number of elements stored in \c cb_data. If the resulting \c count equals \c count + * on input there may be more failed continuations to query and the call should be repeated. + * \note Handling of failed continuations requires an error handler for the involved operations that does not abort and + * is not supported if \ref MPIX_CONT_REQBUF_VOLATILE is used. + * + * \param cont_req The continuation request from which to query failed continuations + * \param[inout] count The maximum number of elements to be stored in \c cb_data + * \param cb_data Buffer of size \c count elements to store the callback data of failed continuations into + */ +OMPI_DECLSPEC int MPIX_Continue_get_failed(MPI_Request cont_req, int *count, void **cb_data); #endif // MPIEXT_CONTINUE_C_H diff --git a/test/continuations/continutions-mt.c b/test/continuations/continutions-mt.c index af48d77c416..b81422a1f74 100644 --- a/test/continuations/continutions-mt.c +++ b/test/continuations/continutions-mt.c @@ -76,7 +76,7 @@ int main(int argc, char *argv[]) sleep(2); /* initialize the continuation request */ - MPIX_Continue_init(0, 0, &cont_req, MPI_INFO_NULL); + MPIX_Continue_init(0, 0, MPI_INFO_NULL, &cont_req); MPI_Start(&cont_req); @@ -113,14 +113,8 @@ int main(int argc, char *argv[]) /**************************************************************** * Do the same thing, but with a poll-only continuation request ****************************************************************/ - - MPI_Info info; - MPI_Info_create(&info); - MPI_Info_set(info, "mpi_continue_poll_only", "true"); - MPI_Info_set(info, "mpi_continue_enqueue_complete", "true"); - /* initialize the continuation request */ - MPIX_Continue_init(0, 0, &cont_req, info); + MPIX_Continue_init(MPIX_CONT_POLL_ONLY, MPI_UNDEFINED, MPI_INFO_NULL, &cont_req); MPI_Info_free(&info); @@ -133,7 +127,7 @@ int main(int argc, char *argv[]) MPI_Isend(&val, 1, MPI_INT, rank, 1001, MPI_COMM_WORLD, &reqs[1]); cb_cnt = 0; - MPIX_Continueall(2, reqs, &complete_cnt_cb, &cb_cnt, 0, MPI_STATUSES_IGNORE, cont_req); + MPIX_Continueall(2, reqs, &complete_cnt_cb, &cb_cnt, MPIX_CONT_DEFER_COMPLETE, MPI_STATUSES_IGNORE, cont_req); MPI_Wait(&cont_req, MPI_STATUS_IGNORE); assert(reqs[0] == MPI_REQUEST_NULL && reqs[1] == MPI_REQUEST_NULL); assert(cb_cnt == 1); @@ -145,10 +139,10 @@ int main(int argc, char *argv[]) */ cb_cnt = 0; MPI_Irecv(&val, 1, MPI_INT, rank, 1001, MPI_COMM_WORLD, &reqs[0]); - MPIX_Continue(&reqs[0], &complete_cnt_cb, &cb_cnt, 0, MPI_STATUS_IGNORE, cont_req); + MPIX_Continue(&reqs[0], &complete_cnt_cb, &cb_cnt, MPIX_CONT_DEFER_COMPLETE, MPI_STATUS_IGNORE, cont_req); MPI_Isend(&val, 1, MPI_INT, rank, 1001, MPI_COMM_WORLD, &reqs[1]); - MPIX_Continue(&reqs[1], &complete_cnt_cb, &cb_cnt, 0, MPI_STATUS_IGNORE, cont_req); + MPIX_Continue(&reqs[1], &complete_cnt_cb, &cb_cnt, MPIX_CONT_DEFER_COMPLETE, MPI_STATUS_IGNORE, cont_req); MPI_Wait(&cont_req, MPI_STATUS_IGNORE); assert(reqs[0] == MPI_REQUEST_NULL && reqs[1] == MPI_REQUEST_NULL); diff --git a/test/continuations/continutions.c b/test/continuations/continutions.c index 3f665b0c3eb..b1779bbda31 100644 --- a/test/continuations/continutions.c +++ b/test/continuations/continutions.c @@ -52,7 +52,7 @@ int main(int argc, char *argv[]) MPI_Comm_rank(MPI_COMM_WORLD, &rank); /* initialize the continuation request */ - MPIX_Continue_init(0, 0, &cont_req, MPI_INFO_NULL); + MPIX_Continue_init(0, 0, MPI_INFO_NULL, &cont_req); MPI_Start(&cont_req); @@ -92,13 +92,8 @@ int main(int argc, char *argv[]) /** * One send, one recv, two continuations in two continuation requests */ - MPI_Info info; - MPI_Info_create(&info); - MPI_Info_set(info, "mpi_continue_poll_only", "true"); - MPI_Info_set(info, "mpi_continue_enqueue_complete", "true"); - /* initialize a poll-only continuation request */ - MPIX_Continue_init(0, 0, &cont_req2, info); + MPIX_Continue_init(MPIX_CONT_POLL_ONLY, MPI_UNDEFINED, MPI_INFO_NULL, &cont_req2); MPI_Start(&cont_req2); @@ -107,7 +102,7 @@ int main(int argc, char *argv[]) MPIX_Continue(&reqs[0], &complete_cnt_cb, &cb_cnt, 0, MPI_STATUS_IGNORE, cont_req); MPI_Isend(&val, 1, MPI_INT, rank, 1001, MPI_COMM_WORLD, &reqs[1]); - MPIX_Continue(&reqs[1], &complete_cnt_cb, &cb_cnt, 0, MPI_STATUS_IGNORE, cont_req2); + MPIX_Continue(&reqs[1], &complete_cnt_cb, &cb_cnt, MPIX_CONT_DEFER_COMPLETE, MPI_STATUS_IGNORE, cont_req2); MPI_Wait(&cont_req, MPI_STATUS_IGNORE); assert(reqs[0] == MPI_REQUEST_NULL && reqs[1] == MPI_REQUEST_NULL);