Skip to content

Commit

Permalink
Minor changes to the continuation API to reflect current proposal
Browse files Browse the repository at this point in the history
- Swap flags and max_poll parameter in MPI_Continue_init
- Swap cont_req and info parameter in MPI_Continue_init
- Add new flags and remove support for info keys mpi_continue_poll_only,
  mpi_continue_max_poll, and mpi_continue_enqueue_complete

Signed-off-by: Joseph Schuchart <schuchart@icl.utk.edu>
  • Loading branch information
devreal committed Nov 22, 2022
1 parent cdace68 commit e2979db
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 54 deletions.
29 changes: 3 additions & 26 deletions ompi/mpiext/continue/c/continuation.c
Original file line number Diff line number Diff line change
Expand Up @@ -824,6 +824,7 @@ int ompi_continue_attach(
}

bool req_volatile = (flags & MPIX_CONT_REQBUF_VOLATILE);
bool defer_complete = (flags & MPIX_CONT_DEFER_COMPLETE);

ompi_cont_request_t *cont_req = (ompi_cont_request_t *)continuation_request;
ompi_continuation_t *cont = ompi_continue_cont_create(count, cont_req, cont_cb,
Expand Down Expand Up @@ -901,7 +902,7 @@ int ompi_continue_attach(
last_num_active = OPAL_THREAD_ADD_FETCH32(&cont->cont_num_active, -num_complete);
}
if (0 == last_num_active) {
if (cont_req->cont_enqueue_complete || OMPI_REQUEST_INACTIVE == cont_req->super.req_state) {
if (defer_complete || OMPI_REQUEST_INACTIVE == cont_req->super.req_state) {
/* enqueue for later processing */
ompi_continue_enqueue_runnable(cont);
} else {
Expand Down Expand Up @@ -930,8 +931,8 @@ int ompi_continue_attach(
*/
int ompi_continue_allocate_request(
ompi_request_t **cont_req_ptr,
int max_poll,
int flags,
int max_poll,
ompi_info_t *info)
{
ompi_cont_request_t *cont_req = OBJ_NEW(ompi_cont_request_t);
Expand All @@ -940,35 +941,11 @@ int ompi_continue_allocate_request(

cont_req->cont_flags = flags;

int flag;
bool test_poll = false;
/* TODO: remove the info flag */
ompi_info_get_bool(info, "mpi_continue_poll_only", &test_poll, &flag);

if ((flag && test_poll)) {
cont_req->cont_flags |= MPIX_CONT_POLL_ONLY;
}

if (cont_req->cont_flags & MPIX_CONT_POLL_ONLY) {
cont_req->cont_complete_list = OBJ_NEW(opal_list_t);
}

/* TODO: remove this flags, it should be part of attach */
bool enqueue_complete = false;
ompi_info_get_bool(info, "mpi_continue_enqueue_complete", &enqueue_complete, &flag);
cont_req->cont_enqueue_complete = (flag && enqueue_complete);

cont_req->continue_max_poll = max_poll;
/* TODO: remove this flag, it's explicit now */
opal_cstring_t *value_str;
ompi_info_get(info, "mpi_continue_max_poll", &value_str, &flag);
if (flag) {
int max_poll = atoi(value_str->string);
OBJ_RELEASE(value_str);
if (max_poll > 0) {
cont_req->continue_max_poll = max_poll;
}
}
if (0 == cont_req->continue_max_poll) {
cont_req->continue_max_poll = UINT32_MAX;
}
Expand Down
4 changes: 2 additions & 2 deletions ompi/mpiext/continue/c/continue_init.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ static const char FUNC_NAME[] = "MPIX_Continue_init";
int MPIX_Continue_init(
int max_poll,
int flags,
MPI_Request *cont_req,
MPI_Info info)
MPI_Info info,
MPI_Request *cont_req)
{
int rc = MPI_SUCCESS;

Expand Down
102 changes: 95 additions & 7 deletions ompi/mpiext/continue/c/mpiext_continue_c.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,105 @@

#include <mpi.h>

/**
* Mark the continuation request(s) as volatile.
* Generally, the request buffer should remain accessible until the continuation is invoked
* and will be set to MPI_REQUEST_NULL before the continuation is invoked.
* However, if this flag is specified the requests are not accessed after the call to
* MPI_Continue[all] returns.
*/
#define MPIX_CONT_REQBUF_VOLATILE 1<<0
/* the continuation is persistent (only valid with persistent requests) */
#define MPIX_CONT_PERSISTENT 1<<1
#define MPIX_CONT_POLL_ONLY 1<<2

typedef int (MPIX_Continue_cb_function)(int rc, void *user_data);
OMPI_DECLSPEC int MPIX_Continue_init(int max_poll, int flags, MPI_Request *cont_req, MPI_Info info);
/**
* the continuation is persistent (only valid with persistent requests)
* TODO: not implemented yet
*/
#define MPIX_CONT_PERSISTENT 1<<1

/*
* mark the continuation request as poll-only, i.e., only execute continuations
* when testing/waiting for the continuation request to complete
*/
#define MPIX_CONT_POLL_ONLY 1<<2

/* Wwhether the execution of continuations is deferred in MPI_Continue or
* MPI_Continueall if all operations are complete.
* By default, continuations eligible for execution are invoked immediately. */
#define MPIX_CONT_DEFER_COMPLETE 1<<3

/* whether failed continuations will be invoked and passed the error code
* TODO: not implemented yet
*/
#define MPIX_CONT_INVOKE_FAILED 1<<4

/**
* Completion callback signature:
* \param rc an error code (MPI_SUCCESS, unless MPIX_CONT_INVOKE_FAILED is provided)
* \param cb_data the pointer passed as cb_data to MPI_Continue[all]
* \returns MPI_SUCECSS on success, an error code to mark the continuation as failed
*/
typedef int (MPIX_Continue_cb_function)(int rc, void *cb_data);

/**
* Initialize a continuation request.
* \param flags 0 or \ref MPIX_CONT_POLL_ONLY
* \param max_poll the maximum number of continuations to execute when testing
* the continuation request for completion or MPI_UNDEFINED for
* unlimited execution of eligible continuations
* \param info info object used to further control the behavior of the continuation request.
* Currently supported:
* - mpi_continue_thread: either "all" (any thread in the process may execute callbacks)
* or "application" (only application threads may execute callbacks; default)
* - mpi_continue_async_signal_safe: whether the callbacks may be executed from within a signal handler
* \param[out] cont_req the newly created continuation request
*/
OMPI_DECLSPEC int MPIX_Continue_init(int flags, int max_poll, MPI_Info info, MPI_Request *cont_req);

/**
* Attach a new continuation to the operation represented by \c request and register it with the continuation request \c cont_req.
* The callback will be executed once the operation has completed and will be passed the \c cb_data pointer.
*
* \param request the request representing the the operation to attach a continuation to
* \param cb the callback to invoke upon completion, with signature \ref MPIX_Continue_cb_function
* \param cb_data the user-data to pass to the callback
* \param flags 0 or OR-combination of \ref MPIX_CONT_REQBUF_VOLATILE, \ref MPIX_CONT_PERSISTENT,
* \ref MPIX_CONT_DEFER_COMPLETE, \ref MPIX_CONT_INVOKE_FAILED
* \param status MPI_STATUS_IGNORE or a pointer to a status object that will be a filled before the callback is invoked
* \param cont_req a continuation request created through \ref MPIX_Continue_init
*/
OMPI_DECLSPEC int MPIX_Continue(MPI_Request *request, MPIX_Continue_cb_function *cb, void *cb_data,
int flags, MPI_Status *status, MPI_Request cont_req);
OMPI_DECLSPEC int MPIX_Continueall(int count, MPI_Request request[], MPIX_Continue_cb_function *cb, void *cb_data,

/**
* Attach a new continuation to the operations represented by the \c count \c requests and
* register it with the continuation request \c cont_req.
* The callback will be executed once the operations have completed and will be passed the \c cb_data pointer.
*
* \param count the number of requests in \c requests
* \param requests the requests representing the the operations to attach a continuation to
* \param cb the callback to invoke upon completion of all operations, with signature \ref MPIX_Continue_cb_function
* \param cb_data the user-data to pass to the callback
* \param flags 0 or OR-combination of \ref MPIX_CONT_REQBUF_VOLATILE, \ref MPIX_CONT_PERSISTENT,
* \ref MPIX_CONT_DEFER_COMPLETE, \ref MPIX_CONT_INVOKE_FAILED
* \param status MPI_STATUS_IGNORE or a pointer to a status object that will be a filled before the callback is invoked
* \param cont_req a continuation request created through \ref MPIX_Continue_init
*/
OMPI_DECLSPEC int MPIX_Continueall(int count, MPI_Request requests[], MPIX_Continue_cb_function *cb, void *cb_data,
int flags, MPI_Status status[], MPI_Request cont_req);
OMPI_DECLSPEC int MPIX_Continue_get_failed( MPI_Request cont_req, int *count, void **cb_data);

/**
* Query the callback data for failed continuations, i.e., continuations that returned a value other than
* MPI_SUCCESS or whose operations experienced an error.
* The applications passes in \c cb_data an array of size \c count. Upon return, \c count will be set
* to the actual number of elements stored in \c cb_data. If the resulting \c count equals \c count
* on input there may be more failed continuations to query and the call should be repeated.
* \note Handling of failed continuations requires an error handler for the involved operations that does not abort and
* is not supported if \ref MPIX_CONT_REQBUF_VOLATILE is used.
*
* \param cont_req The continuation request from which to query failed continuations
* \param[inout] count The maximum number of elements to be stored in \c cb_data
* \param cb_data Buffer of size \c count elements to store the callback data of failed continuations into
*/
OMPI_DECLSPEC int MPIX_Continue_get_failed(MPI_Request cont_req, int *count, void **cb_data);

#endif // MPIEXT_CONTINUE_C_H
16 changes: 5 additions & 11 deletions test/continuations/continutions-mt.c
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ int main(int argc, char *argv[])
sleep(2);

/* initialize the continuation request */
MPIX_Continue_init(0, 0, &cont_req, MPI_INFO_NULL);
MPIX_Continue_init(0, 0, MPI_INFO_NULL, &cont_req);

MPI_Start(&cont_req);

Expand Down Expand Up @@ -113,14 +113,8 @@ int main(int argc, char *argv[])
/****************************************************************
* Do the same thing, but with a poll-only continuation request
****************************************************************/

MPI_Info info;
MPI_Info_create(&info);
MPI_Info_set(info, "mpi_continue_poll_only", "true");
MPI_Info_set(info, "mpi_continue_enqueue_complete", "true");

/* initialize the continuation request */
MPIX_Continue_init(0, 0, &cont_req, info);
MPIX_Continue_init(MPIX_CONT_POLL_ONLY, MPI_UNDEFINED, MPI_INFO_NULL, &cont_req);

MPI_Info_free(&info);

Expand All @@ -133,7 +127,7 @@ int main(int argc, char *argv[])
MPI_Isend(&val, 1, MPI_INT, rank, 1001, MPI_COMM_WORLD, &reqs[1]);

cb_cnt = 0;
MPIX_Continueall(2, reqs, &complete_cnt_cb, &cb_cnt, 0, MPI_STATUSES_IGNORE, cont_req);
MPIX_Continueall(2, reqs, &complete_cnt_cb, &cb_cnt, MPIX_CONT_DEFER_COMPLETE, MPI_STATUSES_IGNORE, cont_req);
MPI_Wait(&cont_req, MPI_STATUS_IGNORE);
assert(reqs[0] == MPI_REQUEST_NULL && reqs[1] == MPI_REQUEST_NULL);
assert(cb_cnt == 1);
Expand All @@ -145,10 +139,10 @@ int main(int argc, char *argv[])
*/
cb_cnt = 0;
MPI_Irecv(&val, 1, MPI_INT, rank, 1001, MPI_COMM_WORLD, &reqs[0]);
MPIX_Continue(&reqs[0], &complete_cnt_cb, &cb_cnt, 0, MPI_STATUS_IGNORE, cont_req);
MPIX_Continue(&reqs[0], &complete_cnt_cb, &cb_cnt, MPIX_CONT_DEFER_COMPLETE, MPI_STATUS_IGNORE, cont_req);

MPI_Isend(&val, 1, MPI_INT, rank, 1001, MPI_COMM_WORLD, &reqs[1]);
MPIX_Continue(&reqs[1], &complete_cnt_cb, &cb_cnt, 0, MPI_STATUS_IGNORE, cont_req);
MPIX_Continue(&reqs[1], &complete_cnt_cb, &cb_cnt, MPIX_CONT_DEFER_COMPLETE, MPI_STATUS_IGNORE, cont_req);

MPI_Wait(&cont_req, MPI_STATUS_IGNORE);
assert(reqs[0] == MPI_REQUEST_NULL && reqs[1] == MPI_REQUEST_NULL);
Expand Down
11 changes: 3 additions & 8 deletions test/continuations/continutions.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ int main(int argc, char *argv[])
MPI_Comm_rank(MPI_COMM_WORLD, &rank);

/* initialize the continuation request */
MPIX_Continue_init(0, 0, &cont_req, MPI_INFO_NULL);
MPIX_Continue_init(0, 0, MPI_INFO_NULL, &cont_req);

MPI_Start(&cont_req);

Expand Down Expand Up @@ -92,13 +92,8 @@ int main(int argc, char *argv[])
/**
* One send, one recv, two continuations in two continuation requests
*/
MPI_Info info;
MPI_Info_create(&info);
MPI_Info_set(info, "mpi_continue_poll_only", "true");
MPI_Info_set(info, "mpi_continue_enqueue_complete", "true");

/* initialize a poll-only continuation request */
MPIX_Continue_init(0, 0, &cont_req2, info);
MPIX_Continue_init(MPIX_CONT_POLL_ONLY, MPI_UNDEFINED, MPI_INFO_NULL, &cont_req2);

MPI_Start(&cont_req2);

Expand All @@ -107,7 +102,7 @@ int main(int argc, char *argv[])
MPIX_Continue(&reqs[0], &complete_cnt_cb, &cb_cnt, 0, MPI_STATUS_IGNORE, cont_req);

MPI_Isend(&val, 1, MPI_INT, rank, 1001, MPI_COMM_WORLD, &reqs[1]);
MPIX_Continue(&reqs[1], &complete_cnt_cb, &cb_cnt, 0, MPI_STATUS_IGNORE, cont_req2);
MPIX_Continue(&reqs[1], &complete_cnt_cb, &cb_cnt, MPIX_CONT_DEFER_COMPLETE, MPI_STATUS_IGNORE, cont_req2);

MPI_Wait(&cont_req, MPI_STATUS_IGNORE);
assert(reqs[0] == MPI_REQUEST_NULL && reqs[1] == MPI_REQUEST_NULL);
Expand Down

0 comments on commit e2979db

Please sign in to comment.