Skip to content

Commit

Permalink
osc/pt2pt: make lock_all locking on-demand
Browse files Browse the repository at this point in the history
The original lock_all algorithm in osc/pt2pt sent a lock message to
each peer in the communicator even if the peer is never the target of
an operation. Since this scales very poorly the implementation has
been replaced by one that locks the remote peer on first communication
after a call to MPI_Win_lock_all.

Signed-off-by: Nathan Hjelm <hjelmn@lanl.gov>
  • Loading branch information
hjelmn committed Aug 11, 2016
1 parent 7589a25 commit 9444df1
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 36 deletions.
67 changes: 63 additions & 4 deletions ompi/mca/osc/pt2pt/osc_pt2pt.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,15 @@ struct ompi_osc_pt2pt_component_t {
};
typedef struct ompi_osc_pt2pt_component_t ompi_osc_pt2pt_component_t;

/** per-peer state bits kept in ompi_osc_pt2pt_peer_t::flags */
enum {
    /** peer sent a post message before the matching MPI_Win_start */
    OMPI_OSC_PT2PT_PEER_FLAG_UNEX  = 1 << 0,
    /** eager sends may be issued to this peer */
    OMPI_OSC_PT2PT_PEER_FLAG_EAGER = 1 << 1,
    /** this peer has been locked (on-demand locking for lock_all) */
    OMPI_OSC_PT2PT_PEER_FLAG_LOCK  = 1 << 2,
};


struct ompi_osc_pt2pt_peer_t {
/** make this an opal object */
Expand All @@ -111,13 +120,54 @@ struct ompi_osc_pt2pt_peer_t {
/** number of fragments incoming (negative - expected, positive - unsynchronized) */
int32_t passive_incoming_frag_count;

/** unexpected post message arrived */
bool unexpected_post;
/** peer flags */
int32_t flags;
};
typedef struct ompi_osc_pt2pt_peer_t ompi_osc_pt2pt_peer_t;

OBJ_CLASS_DECLARATION(ompi_osc_pt2pt_peer_t);

/** @return true if this peer has been locked (FLAG_LOCK is set) */
static inline bool ompi_osc_pt2pt_peer_locked (ompi_osc_pt2pt_peer_t *peer)
{
    return (peer->flags & OMPI_OSC_PT2PT_PEER_FLAG_LOCK) != 0;
}

/** @return true if this peer sent an unexpected post message (FLAG_UNEX is set) */
static inline bool ompi_osc_pt2pt_peer_unex (ompi_osc_pt2pt_peer_t *peer)
{
    return (peer->flags & OMPI_OSC_PT2PT_PEER_FLAG_UNEX) != 0;
}

/** @return true if eager sends are active on this peer (FLAG_EAGER is set) */
static inline bool ompi_osc_pt2pt_peer_eager_active (ompi_osc_pt2pt_peer_t *peer)
{
    return (peer->flags & OMPI_OSC_PT2PT_PEER_FLAG_EAGER) != 0;
}

/**
 * Set (value == true) or clear (value == false) a single flag bit on a peer.
 *
 * NOTE(review): this is a plain read-modify-write on peer->flags, not an
 * atomic update — assumes the caller serializes flag changes; confirm
 * against the module's locking discipline.
 */
static inline void ompi_osc_pt2pt_peer_set_flag (ompi_osc_pt2pt_peer_t *peer, int32_t flag, bool value)
{
    peer->flags = value ? (peer->flags | flag) : (peer->flags & ~flag);
}

/**
 * Mark whether this peer is locked (sets/clears FLAG_LOCK).
 *
 * @param[in] peer   peer to modify
 * @param[in] value  true if the peer is now locked
 *
 * Fix: the original declared a bool return type but contained no return
 * statement — using the (indeterminate) return value would be undefined
 * behavior. The setter has no meaningful result, so it returns void.
 */
static inline void ompi_osc_pt2pt_peer_set_locked (ompi_osc_pt2pt_peer_t *peer, bool value)
{
    ompi_osc_pt2pt_peer_set_flag (peer, OMPI_OSC_PT2PT_PEER_FLAG_LOCK, value);
}

/**
 * Mark whether this peer has an unexpected post message (sets/clears FLAG_UNEX).
 *
 * @param[in] peer   peer to modify
 * @param[in] value  true if an unexpected post is pending for this peer
 *
 * Fix: the original declared a bool return type but contained no return
 * statement — using the (indeterminate) return value would be undefined
 * behavior. The setter has no meaningful result, so it returns void.
 */
static inline void ompi_osc_pt2pt_peer_set_unex (ompi_osc_pt2pt_peer_t *peer, bool value)
{
    ompi_osc_pt2pt_peer_set_flag (peer, OMPI_OSC_PT2PT_PEER_FLAG_UNEX, value);
}

/**
 * Mark whether eager sends are active on this peer (sets/clears FLAG_EAGER).
 *
 * @param[in] peer   peer to modify
 * @param[in] value  true if eager sends may now be issued to this peer
 *
 * Fix: the original declared a bool return type but contained no return
 * statement — using the (indeterminate) return value would be undefined
 * behavior. The setter has no meaningful result, so it returns void.
 */
static inline void ompi_osc_pt2pt_peer_set_eager_active (ompi_osc_pt2pt_peer_t *peer, bool value)
{
    ompi_osc_pt2pt_peer_set_flag (peer, OMPI_OSC_PT2PT_PEER_FLAG_EAGER, value);
}

OBJ_CLASS_DECLARATION(ompi_osc_pt2pt_peer_t);

/** Module structure. Exactly one of these is associated with each
PT2PT window */
struct ompi_osc_pt2pt_module_t {
Expand Down Expand Up @@ -431,6 +481,8 @@ int ompi_osc_pt2pt_component_irecv(ompi_osc_pt2pt_module_t *module,
int tag,
struct ompi_communicator_t *comm);

int ompi_osc_pt2pt_lock_remote (ompi_osc_pt2pt_module_t *module, int target, ompi_osc_pt2pt_sync_t *lock);

/**
* ompi_osc_pt2pt_progress_pending_acc:
*
Expand Down Expand Up @@ -845,6 +897,12 @@ static inline void ompi_osc_pt2pt_module_lock_remove (struct ompi_osc_pt2pt_modu
static inline ompi_osc_pt2pt_sync_t *ompi_osc_pt2pt_module_sync_lookup (ompi_osc_pt2pt_module_t *module, int target,
struct ompi_osc_pt2pt_peer_t **peer)
{
ompi_osc_pt2pt_peer_t *tmp;

if (NULL == peer) {
peer = &tmp;
}

OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"osc/pt2pt: looking for synchronization object for target %d", target));

Expand All @@ -862,8 +920,9 @@ static inline ompi_osc_pt2pt_sync_t *ompi_osc_pt2pt_module_sync_lookup (ompi_osc

/* fence epoch is now active */
module->all_sync.epoch_active = true;
if (peer) {
*peer = ompi_osc_pt2pt_peer_lookup (module, target);
*peer = ompi_osc_pt2pt_peer_lookup (module, target);
if (OMPI_OSC_PT2PT_SYNC_TYPE_LOCK == module->all_sync.type && !ompi_osc_pt2pt_peer_locked (*peer)) {
(void) ompi_osc_pt2pt_lock_remote (module, target, &module->all_sync);
}

return &module->all_sync;
Expand Down
6 changes: 3 additions & 3 deletions ompi/mca/osc/pt2pt/osc_pt2pt_active_target.c
Original file line number Diff line number Diff line change
Expand Up @@ -261,13 +261,13 @@ int ompi_osc_pt2pt_start (ompi_group_t *group, int assert, ompi_win_t *win)
for (int i = 0 ; i < sync->num_peers ; ++i) {
ompi_osc_pt2pt_peer_t *peer = sync->peer_list.peers[i];

if (peer->unexpected_post) {
if (ompi_osc_pt2pt_peer_unex (peer)) {
/* the peer already sent a post message for this pscw access epoch */
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"found unexpected post from %d",
peer->rank));
OPAL_THREAD_ADD32 (&sync->sync_expected, -1);
peer->unexpected_post = false;
ompi_osc_pt2pt_peer_set_unex (peer, false);
}
}
OPAL_THREAD_UNLOCK(&sync->lock);
Expand Down Expand Up @@ -600,7 +600,7 @@ void osc_pt2pt_incoming_post (ompi_osc_pt2pt_module_t *module, int source)
"received unexpected post message from %d for future PSCW synchronization",
source));

peer->unexpected_post = true;
ompi_osc_pt2pt_peer_set_unex (peer, true);
OPAL_THREAD_UNLOCK(&sync->lock);
} else {
OPAL_THREAD_UNLOCK(&sync->lock);
Expand Down
2 changes: 1 addition & 1 deletion ompi/mca/osc/pt2pt/osc_pt2pt_component.c
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ static void ompi_osc_pt2pt_peer_construct (ompi_osc_pt2pt_peer_t *peer)
OBJ_CONSTRUCT(&peer->lock, opal_mutex_t);
peer->active_frag = NULL;
peer->passive_incoming_frag_count = 0;
peer->unexpected_post = false;
peer->flags = 0;
}

static void ompi_osc_pt2pt_peer_destruct (ompi_osc_pt2pt_peer_t *peer)
Expand Down
62 changes: 35 additions & 27 deletions ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,14 @@ static int ompi_osc_pt2pt_flush_lock (ompi_osc_pt2pt_module_t *module, ompi_osc_
static inline int ompi_osc_pt2pt_lock_self (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_sync_t *lock)
{
const int my_rank = ompi_comm_rank (module->comm);
ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, my_rank);
int lock_type = lock->sync.lock.type;
bool acquired = false;

assert (lock->type == OMPI_OSC_PT2PT_SYNC_TYPE_LOCK);

(void) OPAL_THREAD_ADD32(&lock->sync_expected, 1);

acquired = ompi_osc_pt2pt_lock_try_acquire (module, my_rank, lock_type, (uint64_t) (uintptr_t) lock);
if (!acquired) {
/* queue the lock */
Expand All @@ -73,6 +76,9 @@ static inline int ompi_osc_pt2pt_lock_self (ompi_osc_pt2pt_module_t *module, omp
ompi_osc_pt2pt_sync_wait_expected (lock);
}

ompi_osc_pt2pt_peer_set_locked (peer, true);
ompi_osc_pt2pt_peer_set_eager_active (peer, true);

OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
"local lock aquired"));

Expand All @@ -81,8 +87,12 @@ static inline int ompi_osc_pt2pt_lock_self (ompi_osc_pt2pt_module_t *module, omp

static inline void ompi_osc_pt2pt_unlock_self (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_sync_t *lock)
{
const int my_rank = ompi_comm_rank (module->comm);
ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, my_rank);
int lock_type = lock->sync.lock.type;

(void) OPAL_THREAD_ADD32(&lock->sync_expected, 1);

assert (lock->type == OMPI_OSC_PT2PT_SYNC_TYPE_LOCK);

OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
Expand All @@ -98,16 +108,22 @@ static inline void ompi_osc_pt2pt_unlock_self (ompi_osc_pt2pt_module_t *module,
/* need to ensure we make progress */
opal_progress();

ompi_osc_pt2pt_peer_set_locked (peer, false);
ompi_osc_pt2pt_peer_set_eager_active (peer, false);

ompi_osc_pt2pt_sync_expected (lock);
}

static inline int ompi_osc_pt2pt_lock_remote (ompi_osc_pt2pt_module_t *module, int target, ompi_osc_pt2pt_sync_t *lock)
int ompi_osc_pt2pt_lock_remote (ompi_osc_pt2pt_module_t *module, int target, ompi_osc_pt2pt_sync_t *lock)
{
ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, target);
int lock_type = lock->sync.lock.type;
ompi_osc_pt2pt_header_lock_t lock_req;

int ret;

(void) OPAL_THREAD_ADD32(&lock->sync_expected, 1);

assert (lock->type == OMPI_OSC_PT2PT_SYNC_TYPE_LOCK);

/* generate a lock request */
Expand All @@ -128,6 +144,9 @@ static inline int ompi_osc_pt2pt_lock_remote (ompi_osc_pt2pt_module_t *module, i

/* make sure the request gets sent, so we can start eager sending... */
ret = ompi_osc_pt2pt_frag_flush_target (module, target);
if (OPAL_LIKELY(OMPI_SUCCESS == ret)) {
ompi_osc_pt2pt_peer_set_locked (peer, true);
}

return ret;
}
Expand All @@ -140,6 +159,8 @@ static inline int ompi_osc_pt2pt_unlock_remote (ompi_osc_pt2pt_module_t *module,
ompi_osc_pt2pt_header_unlock_t unlock_req;
int ret;

(void) OPAL_THREAD_ADD32(&lock->sync_expected, 1);

assert (lock->type == OMPI_OSC_PT2PT_SYNC_TYPE_LOCK);

unlock_req.base.type = OMPI_OSC_PT2PT_HDR_TYPE_UNLOCK_REQ;
Expand Down Expand Up @@ -169,6 +190,9 @@ static inline int ompi_osc_pt2pt_unlock_remote (ompi_osc_pt2pt_module_t *module,
return ret;
}

ompi_osc_pt2pt_peer_set_locked (peer, false);
ompi_osc_pt2pt_peer_set_eager_active (peer, false);

return ompi_osc_pt2pt_frag_flush_target(module, target);
}

Expand All @@ -179,6 +203,8 @@ static inline int ompi_osc_pt2pt_flush_remote (ompi_osc_pt2pt_module_t *module,
int32_t frag_count = opal_atomic_swap_32 ((int32_t *) module->epoch_outgoing_frag_count + target, -1);
int ret;

(void) OPAL_THREAD_ADD32(&lock->sync_expected, 1);

assert (lock->type == OMPI_OSC_PT2PT_SYNC_TYPE_LOCK);

flush_req.base.type = OMPI_OSC_PT2PT_HDR_TYPE_FLUSH_REQ;
Expand Down Expand Up @@ -218,8 +244,6 @@ static int ompi_osc_pt2pt_lock_internal_execute (ompi_osc_pt2pt_module_t *module
assert (lock->type == OMPI_OSC_PT2PT_SYNC_TYPE_LOCK);

if (0 == (assert & MPI_MODE_NOCHECK)) {
lock->sync_expected = (-1 == target) ? ompi_comm_size (module->comm) : 1;

if (my_rank != target && target != -1) {
ret = ompi_osc_pt2pt_lock_remote (module, target, lock);
} else {
Expand All @@ -231,19 +255,7 @@ static int ompi_osc_pt2pt_lock_internal_execute (ompi_osc_pt2pt_module_t *module
return ret;
}

if (-1 == target) {
for (int i = 0 ; i < ompi_comm_size(module->comm) ; ++i) {
if (my_rank == i) {
continue;
}

ret = ompi_osc_pt2pt_lock_remote (module, i, lock);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
return ret;
}
}

}
/* for lock_all there is nothing more to do. we will lock peers on demand */
} else {
lock->eager_send_active = true;
}
Expand Down Expand Up @@ -312,7 +324,7 @@ static int ompi_osc_pt2pt_lock_internal (int lock_type, int target, int assert,
lock->sync.lock.target = target;
lock->sync.lock.type = lock_type;
lock->sync.lock.assert = assert;

lock->num_peers = (-1 == target) ? ompi_comm_size (&module->comm) : 1;
lock->sync_expected = 0;

/* delay all eager sends until we've heard back.. */
Expand Down Expand Up @@ -376,13 +388,13 @@ static int ompi_osc_pt2pt_unlock_internal (int target, ompi_win_t *win)
"ompi_osc_pt2pt_unlock_internal: all lock acks received"));

if (!(lock->sync.lock.assert & MPI_MODE_NOCHECK)) {
lock->sync_expected = (-1 == target) ? ompi_comm_size (module->comm) : 1;

if (my_rank != target) {
if (-1 == target) {
/* send unlock messages to all of my peers */
for (int i = 0 ; i < ompi_comm_size(module->comm) ; ++i) {
if (my_rank == i) {
ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, i);

if (my_rank == i || !ompi_osc_pt2pt_peer_locked (peer)) {
continue;
}

Expand Down Expand Up @@ -469,12 +481,6 @@ static int ompi_osc_pt2pt_flush_lock (ompi_osc_pt2pt_module_t *module, ompi_osc_
able to eager send before we can transfer all the data... */
ompi_osc_pt2pt_sync_wait_expected (lock);

if (-1 == target) {
lock->sync_expected = ompi_comm_size(module->comm) - 1;
} else {
lock->sync_expected = 1;
}

if (-1 == target) {
/* NTH: no local flush */
for (int i = 0 ; i < ompi_comm_size(module->comm) ; ++i) {
Expand All @@ -488,7 +494,6 @@ static int ompi_osc_pt2pt_flush_lock (ompi_osc_pt2pt_module_t *module, ompi_osc_
}
}
} else {

/* send control message with flush request and count */
ret = ompi_osc_pt2pt_flush_remote (module, target, lock);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
Expand Down Expand Up @@ -787,6 +792,7 @@ int ompi_osc_pt2pt_process_lock (ompi_osc_pt2pt_module_t* module, int source,
void ompi_osc_pt2pt_process_lock_ack (ompi_osc_pt2pt_module_t *module,
ompi_osc_pt2pt_header_lock_ack_t *lock_ack_header)
{
ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, lock_ack_header->source);
ompi_osc_pt2pt_sync_t *lock;

OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
Expand All @@ -796,6 +802,8 @@ void ompi_osc_pt2pt_process_lock_ack (ompi_osc_pt2pt_module_t *module,
lock = (ompi_osc_pt2pt_sync_t *) (uintptr_t) lock_ack_header->lock_ptr;
assert (NULL != lock);

ompi_osc_pt2pt_peer_set_eager_active (peer, true);

ompi_osc_pt2pt_sync_expected (lock);
}

Expand Down
4 changes: 3 additions & 1 deletion ompi/mca/osc/pt2pt/osc_pt2pt_sync.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,9 @@ static inline void ompi_osc_pt2pt_sync_expected (ompi_osc_pt2pt_sync_t *sync)
int32_t new_value = OPAL_THREAD_ADD32 (&sync->sync_expected, -1);
if (0 == new_value) {
OPAL_THREAD_LOCK(&sync->lock);
sync->eager_send_active = true;
if (!(sync->type == OMPI_OSC_PT2PT_SYNC_TYPE_LOCK && sync->num_peers > 1)) {
sync->eager_send_active = true;
}
opal_condition_broadcast (&sync->cond);
OPAL_THREAD_UNLOCK(&sync->lock);
}
Expand Down

0 comments on commit 9444df1

Please sign in to comment.