Skip to content

Commit

Permalink
ompi/request: change semantics of ompi request callbacks
Browse files Browse the repository at this point in the history
This commit changes the sematics of ompi request callbacks. If a
request's callback has freed or re-posted (using start) a request
the callback must return 1 instead of OMPI_SUCCESS. This indicates
to ompi_request_complete that the request should not be modified
further. This fixes a race condition in osc/pt2pt that could lead
to the req_state being inconsistent if a request is freed between
the callback and setting the request as complete.

Signed-off-by: Nathan Hjelm <hjelmn@lanl.gov>
  • Loading branch information
hjelmn committed Aug 18, 2016
1 parent ce01246 commit 6aa658a
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 72 deletions.
26 changes: 4 additions & 22 deletions ompi/mca/osc/pt2pt/osc_pt2pt.h
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,6 @@ struct ompi_osc_pt2pt_module_t {
/** Lock for garbage collection lists */
opal_mutex_t gc_lock;

/** List of requests that need to be freed */
opal_list_t request_gc;

/** List of buffers that need to be freed */
opal_list_t buffer_gc;
};
Expand Down Expand Up @@ -658,41 +655,26 @@ static inline void osc_pt2pt_copy_for_send (void *target, size_t target_len, con
}

/**
* osc_pt2pt_request_gc_clean:
* osc_pt2pt_gc_clean:
*
* @short Release finished PML requests and accumulate buffers.
*
* @long This function exists because it is not possible to free a PML request
* or buffer from a request completion callback. We instead put requests
* and buffers on the module's garbage collection lists and release then
* at a later time.
* @long This function exists because it is not possible to free a buffer from
* a request completion callback. We instead put requests and buffers on the
* module's garbage collection lists and release then at a later time.
*/
static inline void osc_pt2pt_gc_clean (ompi_osc_pt2pt_module_t *module)
{
ompi_request_t *request;
opal_list_item_t *item;

OPAL_THREAD_LOCK(&module->gc_lock);

while (NULL != (request = (ompi_request_t *) opal_list_remove_first (&module->request_gc))) {
OPAL_THREAD_UNLOCK(&module->gc_lock);
ompi_request_free (&request);
OPAL_THREAD_LOCK(&module->gc_lock);
}

while (NULL != (item = opal_list_remove_first (&module->buffer_gc))) {
OBJ_RELEASE(item);
}

OPAL_THREAD_UNLOCK(&module->gc_lock);
}

static inline void osc_pt2pt_gc_add_request (ompi_osc_pt2pt_module_t *module, ompi_request_t *request)
{
OPAL_THREAD_SCOPED_LOCK(&module->gc_lock,
opal_list_append (&module->request_gc, (opal_list_item_t *) request));
}

static inline void osc_pt2pt_gc_add_buffer (ompi_osc_pt2pt_module_t *module, opal_list_item_t *buffer)
{
OPAL_THREAD_SCOPED_LOCK(&module->gc_lock,
Expand Down
10 changes: 4 additions & 6 deletions ompi/mca/osc/pt2pt/osc_pt2pt_comm.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,9 @@ static int ompi_osc_pt2pt_comm_complete (ompi_request_t *request)

mark_outgoing_completion(module);

/* put this request on the garbage colletion list */
osc_pt2pt_gc_add_request (module, request);
ompi_request_free (&request);

return OMPI_SUCCESS;
return 1;
}

static int ompi_osc_pt2pt_req_comm_complete (ompi_request_t *request)
Expand Down Expand Up @@ -101,10 +100,9 @@ static int ompi_osc_pt2pt_dt_send_complete (ompi_request_t *request)
OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.lock);
assert (NULL != module);

/* put this request on the garbage colletion list */
osc_pt2pt_gc_add_request (module, request);
ompi_request_free (&request);

return OMPI_SUCCESS;
return 1;
}

/* self communication optimizations */
Expand Down
1 change: 0 additions & 1 deletion ompi/mca/osc/pt2pt/osc_pt2pt_component.c
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,6 @@ component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit
OBJ_CONSTRUCT(&module->locks_pending_lock, opal_mutex_t);
OBJ_CONSTRUCT(&module->outstanding_locks, opal_hash_table_t);
OBJ_CONSTRUCT(&module->pending_acc, opal_list_t);
OBJ_CONSTRUCT(&module->request_gc, opal_list_t);
OBJ_CONSTRUCT(&module->buffer_gc, opal_list_t);
OBJ_CONSTRUCT(&module->gc_lock, opal_mutex_t);
OBJ_CONSTRUCT(&module->all_sync, ompi_osc_pt2pt_sync_t);
Expand Down
34 changes: 11 additions & 23 deletions ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c
Original file line number Diff line number Diff line change
Expand Up @@ -238,10 +238,8 @@ static int ompi_osc_pt2pt_control_send_unbuffered_cb (ompi_request_t *request)
/* free the temporary buffer */
free (ctx);

/* put this request on the garbage colletion list */
osc_pt2pt_gc_add_request (module, request);

return OMPI_SUCCESS;
ompi_request_free (&request);
return 1;
}

/**
Expand Down Expand Up @@ -437,10 +435,8 @@ static int osc_pt2pt_incoming_req_complete (ompi_request_t *request)

mark_incoming_completion (module, rank);

/* put this request on the garbage colletion list */
osc_pt2pt_gc_add_request (module, request);

return OMPI_SUCCESS;
ompi_request_free (&request);
return 1;
}

struct osc_pt2pt_get_post_send_cb_data_t {
Expand All @@ -460,10 +456,8 @@ static int osc_pt2pt_get_post_send_cb (ompi_request_t *request)
/* mark this as a completed "incoming" request */
mark_incoming_completion (module, rank);

/* put this request on the garbage colletion list */
osc_pt2pt_gc_add_request (module, request);

return OMPI_SUCCESS;
ompi_request_free (&request);
return 1;
}

/**
Expand Down Expand Up @@ -699,9 +693,7 @@ static int accumulate_cb (ompi_request_t *request)
osc_pt2pt_gc_add_buffer (module, &acc_data->super);
}

/* put this request on the garbage colletion list */
osc_pt2pt_gc_add_request (module, request);

ompi_request_free (&request);
return ret;
}

Expand Down Expand Up @@ -771,13 +763,11 @@ static int replace_cb (ompi_request_t *request)

mark_incoming_completion (module, rank);

/* put this request on the garbage colletion list */
osc_pt2pt_gc_add_request (module, request);

/* unlock the accumulate lock */
ompi_osc_pt2pt_accumulate_unlock (module);

return OMPI_SUCCESS;
ompi_request_free (&request);
return 1;
}

/**
Expand Down Expand Up @@ -1435,13 +1425,11 @@ static int process_large_datatype_request_cb (ompi_request_t *request)
return OMPI_ERROR;
}

/* put this request on the garbage colletion list */
osc_pt2pt_gc_add_request (module, request);

/* free the datatype buffer */
osc_pt2pt_gc_add_buffer (module, &ddt_buffer->super);

return OMPI_SUCCESS;
ompi_request_free (&request);
return 1;
}

/**
Expand Down
6 changes: 2 additions & 4 deletions ompi/mca/osc/pt2pt/osc_pt2pt_frag.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,9 @@ static int frag_send_cb (ompi_request_t *request)
mark_outgoing_completion(module);
opal_free_list_return (&mca_osc_pt2pt_component.frags, &frag->super);

ompi_request_free (&request);

/* put this request on the garbage colletion list */
osc_pt2pt_gc_add_request (module, request);

return OMPI_SUCCESS;
return 1;
}

static int frag_send (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_frag_t *frag)
Expand Down
1 change: 0 additions & 1 deletion ompi/mca/osc/pt2pt/osc_pt2pt_module.c
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ int ompi_osc_pt2pt_free(ompi_win_t *win)
OPAL_LIST_DESTRUCT(&module->pending_acc);

osc_pt2pt_gc_clean (module);
OPAL_LIST_DESTRUCT(&module->request_gc);
OPAL_LIST_DESTRUCT(&module->buffer_gc);
OBJ_DESTRUCT(&module->gc_lock);

Expand Down
36 changes: 21 additions & 15 deletions ompi/request/request.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,10 @@ typedef int (*ompi_request_cancel_fn_t)(struct ompi_request_t* request, int flag

/*
* Optional function called when the request is completed from the MPI
* library perspective. This function is not allowed to release any
* ressources related to the request.
* library perspective. This function is allowed to release the request if
* the request will not be used with ompi_request_wait* or ompi_request_test.
* If the function reposts (using start) a request or calls ompi_request_free()
* on the request it *MUST* return 1. It should return 0 otherwise.
*/
typedef int (*ompi_request_complete_fn_t)(struct ompi_request_t* request);

Expand Down Expand Up @@ -412,24 +414,28 @@ static inline void ompi_request_wait_completion(ompi_request_t *req)
*/
static inline int ompi_request_complete(ompi_request_t* request, bool with_signal)
{
int rc = 0;

if( NULL != request->req_complete_cb) {
request->req_complete_cb( request );
rc = request->req_complete_cb( request );
request->req_complete_cb = NULL;
}

if( OPAL_LIKELY(with_signal) ) {
if(!OPAL_ATOMIC_CMPSET_PTR(&request->req_complete, REQUEST_PENDING, REQUEST_COMPLETED)) {
ompi_wait_sync_t *tmp_sync = (ompi_wait_sync_t *) OPAL_ATOMIC_SWAP_PTR(&request->req_complete,
REQUEST_COMPLETED);
/* In the case where another thread concurrently changed the request to REQUEST_PENDING */
if( REQUEST_PENDING != tmp_sync )
wait_sync_update(tmp_sync, 1, request->req_status.MPI_ERROR);
if (0 == rc) {
if( OPAL_LIKELY(with_signal) ) {
if(!OPAL_ATOMIC_CMPSET_PTR(&request->req_complete, REQUEST_PENDING, REQUEST_COMPLETED)) {
ompi_wait_sync_t *tmp_sync = (ompi_wait_sync_t *) OPAL_ATOMIC_SWAP_PTR(&request->req_complete,
REQUEST_COMPLETED);
/* In the case where another thread concurrently changed the request to REQUEST_PENDING */
if( REQUEST_PENDING != tmp_sync )
wait_sync_update(tmp_sync, 1, request->req_status.MPI_ERROR);
}
} else
request->req_complete = REQUEST_COMPLETED;

if( OPAL_UNLIKELY(MPI_SUCCESS != request->req_status.MPI_ERROR) ) {
ompi_request_failed++;
}
} else
request->req_complete = REQUEST_COMPLETED;

if( OPAL_UNLIKELY(MPI_SUCCESS != request->req_status.MPI_ERROR) ) {
ompi_request_failed++;
}

return OMPI_SUCCESS;
Expand Down

0 comments on commit 6aa658a

Please sign in to comment.