Skip to content

Commit

Permalink
Merge pull request #12985 from amd-nithyavs/17Dec2024_bug_fixes
Browse files (browse the repository at this point in the history)
coll/acoll: A few miscellaneous bugfixes
  • Loading branch information
mshanthagit authored Jan 15, 2025
2 parents b09a79d + 3e67dd1 commit 398b8d4
Show file tree
Hide file tree
Showing 9 changed files with 294 additions and 167 deletions.
8 changes: 7 additions & 1 deletion ompi/mca/coll/acoll/coll_acoll.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#ifdef HAVE_XPMEM_H
#include "opal/mca/rcache/base/base.h"
#include "opal/class/opal_hash_table.h"
#include <xpmem.h>
#endif

Expand All @@ -37,12 +38,14 @@ extern int mca_coll_acoll_max_comms;
extern int mca_coll_acoll_sg_size;
extern int mca_coll_acoll_sg_scale;
extern int mca_coll_acoll_node_size;
extern int mca_coll_acoll_force_numa;
extern int mca_coll_acoll_use_dynamic_rules;
extern int mca_coll_acoll_mnode_enable;
extern int mca_coll_acoll_bcast_lin0;
extern int mca_coll_acoll_bcast_lin1;
extern int mca_coll_acoll_bcast_lin2;
extern int mca_coll_acoll_bcast_nonsg;
extern int mca_coll_acoll_bcast_socket;
extern int mca_coll_acoll_allgather_lin;
extern int mca_coll_acoll_allgather_ring_1;

Expand Down Expand Up @@ -123,6 +126,7 @@ typedef struct coll_acoll_data {
void **xpmem_raddr;
mca_rcache_base_module_t **rcache;
void *scratch;
opal_hash_table_t **xpmem_reg_tracker_ht;
#endif
opal_shmem_ds_t *allshmseg_id;
void **allshmmmap_sbuf;
Expand Down Expand Up @@ -160,7 +164,7 @@ typedef struct coll_acoll_subcomms {
int numa_root;
int socket_ldr_root;
int base_root[MCA_COLL_ACOLL_NUM_BASE_LYRS][MCA_COLL_ACOLL_NUM_LAYERS];
int base_rank[MCA_COLL_ACOLL_NUM_BASE_LYRS];
int base_rank[MCA_COLL_ACOLL_NUM_BASE_LYRS][MCA_COLL_ACOLL_NUM_LAYERS];
int socket_rank;
int subgrp_size;
int initialized;
Expand Down Expand Up @@ -198,12 +202,14 @@ struct mca_coll_acoll_module_t {
int log2_sg_cnt;
int node_cnt;
int log2_node_cnt;
int force_numa;
int use_dyn_rules;
// TODO: Use a substructure for each API's related fields
int use_mnode;
int use_lin0;
int use_lin1;
int use_lin2;
int use_socket;
int mnode_sg_size;
int mnode_log2_sg_size;
int allg_lin;
Expand Down
12 changes: 6 additions & 6 deletions ompi/mca/coll/acoll/coll_acoll_allgather.c
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ static inline int mca_coll_acoll_allgather_intra(const void *sbuf, size_t scount
}

/* Return if all ranks belong to single subgroup */
if (num_sgs == 1) {
if (1 == num_sgs) {
/* All done */
return err;
}
Expand Down Expand Up @@ -396,7 +396,7 @@ static inline int mca_coll_acoll_allgather_intra(const void *sbuf, size_t scount
}
/* Now all base ranks have the full data */
/* Do broadcast within subgroups from the base ranks for the extra data */
if (sg_id == 0) {
if (0 == sg_id) {
num_data_blks = 1;
data_blk_size[0] = bcount * (num_sgs - 2) + last_subgrp_rcnt;
blk_ofst[0] = bcount;
Expand Down Expand Up @@ -527,7 +527,7 @@ int mca_coll_acoll_allgather(const void *sbuf, size_t scount, struct ompi_dataty
if (num_nodes > 1) {
assert(subc->local_r_comm != NULL);
}
intra_comm = num_nodes == 1 ? comm : subc->local_r_comm;
intra_comm = 1 == num_nodes ? comm : subc->local_r_comm;
}
err = mca_coll_acoll_allgather_intra(sbuf, scount, sdtype, local_rbuf, rcount, rdtype,
intra_comm, module);
Expand All @@ -536,7 +536,7 @@ int mca_coll_acoll_allgather(const void *sbuf, size_t scount, struct ompi_dataty
}

/* Return if intra-node communicator */
if ((num_nodes == 1) || (size <= 2)) {
if ((1 == num_nodes) || (size <= 2)) {
/* All done */
return err;
}
Expand Down Expand Up @@ -592,7 +592,7 @@ int mca_coll_acoll_allgather(const void *sbuf, size_t scount, struct ompi_dataty
} /* End of if inter leader */

/* Do intra node broadcast */
if (node_id == 0) {
if (0 == node_id) {
num_data_blks = 1;
data_blk_size[0] = bcount * (num_nodes - 2) + last_subgrp_rcnt;
blk_ofst[0] = bcount;
Expand All @@ -613,7 +613,7 @@ int mca_coll_acoll_allgather(const void *sbuf, size_t scount, struct ompi_dataty
/* Loop over data blocks */
for (i = 0; i < num_data_blks; i++) {
char *buff = (char *) rbuf + (ptrdiff_t) blk_ofst[i] * rext;
err = (comm)->c_coll->coll_bcast(buff, data_blk_size[i], rdtype, 0, subc->local_r_comm,
err = ompi_coll_base_bcast_intra_basic_linear(buff, data_blk_size[i], rdtype, 0, subc->local_r_comm,
module);
if (MPI_SUCCESS != err) {
return err;
Expand Down
33 changes: 16 additions & 17 deletions ompi/mca/coll/acoll/coll_acoll_allreduce.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ static inline int mca_coll_acoll_reduce_xpmem_h(const void *sbuf, void *rbuf, si
int size;
size_t total_dsize, dsize;

coll_acoll_init(module, comm, subc->data, subc);
coll_acoll_init(module, comm, subc->data, subc, 0);
coll_acoll_data_t *data = subc->data;
if (NULL == data) {
return -1;
Expand All @@ -82,15 +82,15 @@ static inline int mca_coll_acoll_reduce_xpmem_h(const void *sbuf, void *rbuf, si
if (!subc->xpmem_use_sr_buf) {
tmp_rbuf = (char *) data->scratch;
tmp_sbuf = (char *) data->scratch + (subc->xpmem_buf_size) / 2;
if ((sbuf == MPI_IN_PLACE)) {
if ((MPI_IN_PLACE == sbuf)) {
memcpy(tmp_sbuf, rbuf, total_dsize);
} else {
memcpy(tmp_sbuf, sbuf, total_dsize);
}
} else {
tmp_sbuf = (char *) sbuf;
tmp_rbuf = (char *) rbuf;
if (sbuf == MPI_IN_PLACE) {
if (MPI_IN_PLACE == sbuf) {
tmp_sbuf = (char *) rbuf;
}
}
Expand Down Expand Up @@ -153,7 +153,7 @@ static inline int mca_coll_acoll_reduce_xpmem_h(const void *sbuf, void *rbuf, si

my_count_size = (l2_local_rank == (local_size - 1)) ? chunk + (count % local_size) : chunk;

if (l2_local_rank == 0) {
if (0 == l2_local_rank) {
for (int i = 1; i < local_size; i++) {
ompi_op_reduce(op, (char *) data->xpmem_raddr[l2_gp[i]], (char *) tmp_rbuf,
my_count_size, dtype);
Expand Down Expand Up @@ -192,7 +192,7 @@ static inline int mca_coll_acoll_allreduce_xpmem_f(const void *sbuf, void *rbuf,
int size;
size_t total_dsize, dsize;

coll_acoll_init(module, comm, subc->data, subc);
coll_acoll_init(module, comm, subc->data, subc, 0);
coll_acoll_data_t *data = subc->data;
if (NULL == data) {
return -1;
Expand All @@ -207,15 +207,15 @@ static inline int mca_coll_acoll_allreduce_xpmem_f(const void *sbuf, void *rbuf,
if (!subc->xpmem_use_sr_buf) {
tmp_rbuf = (char *) data->scratch;
tmp_sbuf = (char *) data->scratch + (subc->xpmem_buf_size) / 2;
if ((sbuf == MPI_IN_PLACE)) {
if ((MPI_IN_PLACE == sbuf)) {
memcpy(tmp_sbuf, rbuf, total_dsize);
} else {
memcpy(tmp_sbuf, sbuf, total_dsize);
}
} else {
tmp_sbuf = (char *) sbuf;
tmp_rbuf = (char *) rbuf;
if (sbuf == MPI_IN_PLACE) {
if (MPI_IN_PLACE == sbuf) {
tmp_sbuf = (char *) rbuf;
}
}
Expand All @@ -242,7 +242,7 @@ static inline int mca_coll_acoll_allreduce_xpmem_f(const void *sbuf, void *rbuf,

size_t chunk = count / size;
size_t my_count_size = (rank == (size - 1)) ? (count / size) + count % size : count / size;
if (rank == 0) {
if (0 == rank) {
if (sbuf != MPI_IN_PLACE)
memcpy(tmp_rbuf, sbuf, my_count_size * dsize);
} else {
Expand Down Expand Up @@ -299,7 +299,7 @@ void mca_coll_acoll_sync(coll_acoll_data_t *data, int offset, int *group, int gp
opal_atomic_wmb();

int val;
if (up == 1) {
if (1 == up) {
val = data->sync[0];
} else {
val = data->sync[1];
Expand Down Expand Up @@ -346,7 +346,7 @@ void mca_coll_acoll_sync(coll_acoll_data_t *data, int offset, int *group, int gp
__ATOMIC_RELAXED);
}
}
if (up == 1) {
if (1 == up) {
data->sync[0] = val;
} else {
data->sync[1] = val;
Expand All @@ -361,8 +361,7 @@ int mca_coll_acoll_allreduce_small_msgs_h(const void *sbuf, void *rbuf, size_t c
{
size_t dsize;
int err = MPI_SUCCESS;

coll_acoll_init(module, comm, subc->data, subc);
coll_acoll_init(module, comm, subc->data, subc, 0);
coll_acoll_data_t *data = subc->data;
if (NULL == data) {
return -1;
Expand Down Expand Up @@ -434,7 +433,7 @@ int mca_coll_acoll_allreduce_small_msgs_h(const void *sbuf, void *rbuf, size_t c
}

if (intra && (ompi_comm_size(subc->numa_comm) > 1)) {
err = mca_coll_acoll_bcast(rbuf, count, dtype, 0, subc->numa_comm, module);
err = ompi_coll_base_bcast_intra_basic_linear(rbuf, count, dtype, 0, subc->numa_comm, module);
}
return err;
}
Expand All @@ -451,7 +450,7 @@ int mca_coll_acoll_allreduce_intra(const void *sbuf, void *rbuf, size_t count,
ompi_datatype_type_size(dtype, &dsize);
total_dsize = dsize * count;

if (size == 1) {
if (1 == size) {
if (MPI_IN_PLACE != sbuf) {
memcpy((char *) rbuf, sbuf, total_dsize);
}
Expand Down Expand Up @@ -483,7 +482,7 @@ int mca_coll_acoll_allreduce_intra(const void *sbuf, void *rbuf, size_t count,

alg = coll_allreduce_decision_fixed(size, total_dsize);

if (num_nodes == 1) {
if (1 == num_nodes) {
if (total_dsize < 32) {
return ompi_coll_base_allreduce_intra_recursivedoubling(sbuf, rbuf, count, dtype, op,
comm, module);
Expand All @@ -494,10 +493,10 @@ int mca_coll_acoll_allreduce_intra(const void *sbuf, void *rbuf, size_t count,
return ompi_coll_base_allreduce_intra_recursivedoubling(sbuf, rbuf, count, dtype, op,
comm, module);
} else if (total_dsize < 65536) {
if (alg == 1) {
if (1 == alg) {
return ompi_coll_base_allreduce_intra_recursivedoubling(sbuf, rbuf, count, dtype,
op, comm, module);
} else if (alg == 2) {
} else if (2 == alg) {
return ompi_coll_base_allreduce_intra_redscat_allgather(sbuf, rbuf, count, dtype,
op, comm, module);
} else { /*alg == 3 */
Expand Down
2 changes: 1 addition & 1 deletion ompi/mca/coll/acoll/coll_acoll_barrier.c
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ int mca_coll_acoll_barrier_intra(struct ompi_communicator_t *comm, mca_coll_base
}

size = ompi_comm_size(comm);
if (size == 1) {
if (1 == size) {
return err;
}
if (!subc->initialized && size > 1) {
Expand Down
Loading

0 comments on commit 398b8d4

Please sign in to comment.