Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

coll/acoll: A few miscellaneous bugfixes #12985

Merged
merged 2 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion ompi/mca/coll/acoll/coll_acoll.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#ifdef HAVE_XPMEM_H
#include "opal/mca/rcache/base/base.h"
+ #include "opal/class/opal_hash_table.h"
#include <xpmem.h>
#endif

Expand All @@ -37,12 +38,14 @@ extern int mca_coll_acoll_max_comms;
extern int mca_coll_acoll_sg_size;
extern int mca_coll_acoll_sg_scale;
extern int mca_coll_acoll_node_size;
+ extern int mca_coll_acoll_force_numa;
extern int mca_coll_acoll_use_dynamic_rules;
extern int mca_coll_acoll_mnode_enable;
extern int mca_coll_acoll_bcast_lin0;
extern int mca_coll_acoll_bcast_lin1;
extern int mca_coll_acoll_bcast_lin2;
extern int mca_coll_acoll_bcast_nonsg;
+ extern int mca_coll_acoll_bcast_socket;
extern int mca_coll_acoll_allgather_lin;
extern int mca_coll_acoll_allgather_ring_1;

Expand Down Expand Up @@ -123,6 +126,7 @@ typedef struct coll_acoll_data {
void **xpmem_raddr;
mca_rcache_base_module_t **rcache;
void *scratch;
+ opal_hash_table_t **xpmem_reg_tracker_ht;
#endif
opal_shmem_ds_t *allshmseg_id;
void **allshmmmap_sbuf;
Expand Down Expand Up @@ -160,7 +164,7 @@ typedef struct coll_acoll_subcomms {
int numa_root;
int socket_ldr_root;
int base_root[MCA_COLL_ACOLL_NUM_BASE_LYRS][MCA_COLL_ACOLL_NUM_LAYERS];
- int base_rank[MCA_COLL_ACOLL_NUM_BASE_LYRS];
+ int base_rank[MCA_COLL_ACOLL_NUM_BASE_LYRS][MCA_COLL_ACOLL_NUM_LAYERS];
int socket_rank;
int subgrp_size;
int initialized;
Expand Down Expand Up @@ -198,12 +202,14 @@ struct mca_coll_acoll_module_t {
int log2_sg_cnt;
int node_cnt;
int log2_node_cnt;
+ int force_numa;
int use_dyn_rules;
// Todo: Use substructure for every API related ones
int use_mnode;
int use_lin0;
int use_lin1;
int use_lin2;
+ int use_socket;
int mnode_sg_size;
int mnode_log2_sg_size;
int allg_lin;
Expand Down
12 changes: 6 additions & 6 deletions ompi/mca/coll/acoll/coll_acoll_allgather.c
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ static inline int mca_coll_acoll_allgather_intra(const void *sbuf, size_t scount
}

/* Return if all ranks belong to single subgroup */
- if (num_sgs == 1) {
+ if (1 == num_sgs) {
/* All done */
return err;
}
Expand Down Expand Up @@ -396,7 +396,7 @@ static inline int mca_coll_acoll_allgather_intra(const void *sbuf, size_t scount
}
/* Now all base ranks have the full data */
/* Do broadcast within subgroups from the base ranks for the extra data */
- if (sg_id == 0) {
+ if (0 == sg_id) {
num_data_blks = 1;
data_blk_size[0] = bcount * (num_sgs - 2) + last_subgrp_rcnt;
blk_ofst[0] = bcount;
Expand Down Expand Up @@ -527,7 +527,7 @@ int mca_coll_acoll_allgather(const void *sbuf, size_t scount, struct ompi_dataty
if (num_nodes > 1) {
assert(subc->local_r_comm != NULL);
}
- intra_comm = num_nodes == 1 ? comm : subc->local_r_comm;
+ intra_comm = 1 == num_nodes ? comm : subc->local_r_comm;
}
err = mca_coll_acoll_allgather_intra(sbuf, scount, sdtype, local_rbuf, rcount, rdtype,
intra_comm, module);
Expand All @@ -536,7 +536,7 @@ int mca_coll_acoll_allgather(const void *sbuf, size_t scount, struct ompi_dataty
}

/* Return if intra-node communicator */
- if ((num_nodes == 1) || (size <= 2)) {
+ if ((1 == num_nodes) || (size <= 2)) {
/* All done */
return err;
}
Expand Down Expand Up @@ -592,7 +592,7 @@ int mca_coll_acoll_allgather(const void *sbuf, size_t scount, struct ompi_dataty
} /* End of if inter leader */

/* Do intra node broadcast */
- if (node_id == 0) {
+ if (0 == node_id) {
num_data_blks = 1;
data_blk_size[0] = bcount * (num_nodes - 2) + last_subgrp_rcnt;
blk_ofst[0] = bcount;
Expand All @@ -613,7 +613,7 @@ int mca_coll_acoll_allgather(const void *sbuf, size_t scount, struct ompi_dataty
/* Loop over data blocks */
for (i = 0; i < num_data_blks; i++) {
char *buff = (char *) rbuf + (ptrdiff_t) blk_ofst[i] * rext;
- err = (comm)->c_coll->coll_bcast(buff, data_blk_size[i], rdtype, 0, subc->local_r_comm,
+ err = ompi_coll_base_bcast_intra_basic_linear(buff, data_blk_size[i], rdtype, 0, subc->local_r_comm,
module);
if (MPI_SUCCESS != err) {
return err;
Expand Down
33 changes: 16 additions & 17 deletions ompi/mca/coll/acoll/coll_acoll_allreduce.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ static inline int mca_coll_acoll_reduce_xpmem_h(const void *sbuf, void *rbuf, si
int size;
size_t total_dsize, dsize;

- coll_acoll_init(module, comm, subc->data, subc);
+ coll_acoll_init(module, comm, subc->data, subc, 0);
coll_acoll_data_t *data = subc->data;
if (NULL == data) {
return -1;
Expand All @@ -82,15 +82,15 @@ static inline int mca_coll_acoll_reduce_xpmem_h(const void *sbuf, void *rbuf, si
if (!subc->xpmem_use_sr_buf) {
tmp_rbuf = (char *) data->scratch;
tmp_sbuf = (char *) data->scratch + (subc->xpmem_buf_size) / 2;
- if ((sbuf == MPI_IN_PLACE)) {
+ if ((MPI_IN_PLACE == sbuf)) {
memcpy(tmp_sbuf, rbuf, total_dsize);
} else {
memcpy(tmp_sbuf, sbuf, total_dsize);
}
} else {
tmp_sbuf = (char *) sbuf;
tmp_rbuf = (char *) rbuf;
- if (sbuf == MPI_IN_PLACE) {
+ if (MPI_IN_PLACE == sbuf) {
tmp_sbuf = (char *) rbuf;
}
}
Expand Down Expand Up @@ -153,7 +153,7 @@ static inline int mca_coll_acoll_reduce_xpmem_h(const void *sbuf, void *rbuf, si

my_count_size = (l2_local_rank == (local_size - 1)) ? chunk + (count % local_size) : chunk;

- if (l2_local_rank == 0) {
+ if (0 == l2_local_rank) {
for (int i = 1; i < local_size; i++) {
ompi_op_reduce(op, (char *) data->xpmem_raddr[l2_gp[i]], (char *) tmp_rbuf,
my_count_size, dtype);
Expand Down Expand Up @@ -192,7 +192,7 @@ static inline int mca_coll_acoll_allreduce_xpmem_f(const void *sbuf, void *rbuf,
int size;
size_t total_dsize, dsize;

- coll_acoll_init(module, comm, subc->data, subc);
+ coll_acoll_init(module, comm, subc->data, subc, 0);
coll_acoll_data_t *data = subc->data;
if (NULL == data) {
return -1;
Expand All @@ -207,15 +207,15 @@ static inline int mca_coll_acoll_allreduce_xpmem_f(const void *sbuf, void *rbuf,
if (!subc->xpmem_use_sr_buf) {
tmp_rbuf = (char *) data->scratch;
tmp_sbuf = (char *) data->scratch + (subc->xpmem_buf_size) / 2;
- if ((sbuf == MPI_IN_PLACE)) {
+ if ((MPI_IN_PLACE == sbuf)) {
memcpy(tmp_sbuf, rbuf, total_dsize);
} else {
memcpy(tmp_sbuf, sbuf, total_dsize);
}
} else {
tmp_sbuf = (char *) sbuf;
tmp_rbuf = (char *) rbuf;
- if (sbuf == MPI_IN_PLACE) {
+ if (MPI_IN_PLACE == sbuf) {
tmp_sbuf = (char *) rbuf;
}
}
Expand All @@ -242,7 +242,7 @@ static inline int mca_coll_acoll_allreduce_xpmem_f(const void *sbuf, void *rbuf,

size_t chunk = count / size;
size_t my_count_size = (rank == (size - 1)) ? (count / size) + count % size : count / size;
- if (rank == 0) {
+ if (0 == rank) {
if (sbuf != MPI_IN_PLACE)
memcpy(tmp_rbuf, sbuf, my_count_size * dsize);
} else {
Expand Down Expand Up @@ -299,7 +299,7 @@ void mca_coll_acoll_sync(coll_acoll_data_t *data, int offset, int *group, int gp
opal_atomic_wmb();

int val;
- if (up == 1) {
+ if (1 == up) {
val = data->sync[0];
} else {
val = data->sync[1];
Expand Down Expand Up @@ -346,7 +346,7 @@ void mca_coll_acoll_sync(coll_acoll_data_t *data, int offset, int *group, int gp
__ATOMIC_RELAXED);
}
}
- if (up == 1) {
+ if (1 == up) {
data->sync[0] = val;
} else {
data->sync[1] = val;
Expand All @@ -361,8 +361,7 @@ int mca_coll_acoll_allreduce_small_msgs_h(const void *sbuf, void *rbuf, size_t c
{
size_t dsize;
int err = MPI_SUCCESS;
-
- coll_acoll_init(module, comm, subc->data, subc);
+ coll_acoll_init(module, comm, subc->data, subc, 0);
coll_acoll_data_t *data = subc->data;
if (NULL == data) {
return -1;
Expand Down Expand Up @@ -434,7 +433,7 @@ int mca_coll_acoll_allreduce_small_msgs_h(const void *sbuf, void *rbuf, size_t c
}

if (intra && (ompi_comm_size(subc->numa_comm) > 1)) {
- err = mca_coll_acoll_bcast(rbuf, count, dtype, 0, subc->numa_comm, module);
+ err = ompi_coll_base_bcast_intra_basic_linear(rbuf, count, dtype, 0, subc->numa_comm, module);
}
return err;
}
Expand All @@ -451,7 +450,7 @@ int mca_coll_acoll_allreduce_intra(const void *sbuf, void *rbuf, size_t count,
ompi_datatype_type_size(dtype, &dsize);
total_dsize = dsize * count;

- if (size == 1) {
+ if (1 == size) {
if (MPI_IN_PLACE != sbuf) {
memcpy((char *) rbuf, sbuf, total_dsize);
}
Expand Down Expand Up @@ -483,7 +482,7 @@ int mca_coll_acoll_allreduce_intra(const void *sbuf, void *rbuf, size_t count,

alg = coll_allreduce_decision_fixed(size, total_dsize);

- if (num_nodes == 1) {
+ if (1 == num_nodes) {
if (total_dsize < 32) {
return ompi_coll_base_allreduce_intra_recursivedoubling(sbuf, rbuf, count, dtype, op,
comm, module);
Expand All @@ -494,10 +493,10 @@ int mca_coll_acoll_allreduce_intra(const void *sbuf, void *rbuf, size_t count,
return ompi_coll_base_allreduce_intra_recursivedoubling(sbuf, rbuf, count, dtype, op,
comm, module);
} else if (total_dsize < 65536) {
- if (alg == 1) {
+ if (1 == alg) {
return ompi_coll_base_allreduce_intra_recursivedoubling(sbuf, rbuf, count, dtype,
op, comm, module);
- } else if (alg == 2) {
+ } else if (2 == alg) {
return ompi_coll_base_allreduce_intra_redscat_allgather(sbuf, rbuf, count, dtype,
op, comm, module);
} else { /*alg == 3 */
Expand Down
2 changes: 1 addition & 1 deletion ompi/mca/coll/acoll/coll_acoll_barrier.c
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ int mca_coll_acoll_barrier_intra(struct ompi_communicator_t *comm, mca_coll_base
}

size = ompi_comm_size(comm);
- if (size == 1) {
+ if (1 == size) {
return err;
}
if (!subc->initialized && size > 1) {
Expand Down
Loading
Loading