diff --git a/src/bvals/comms/combined_buffers.cpp b/src/bvals/comms/combined_buffers.cpp index 798ad70d32f2..7b9c8bc57f42 100644 --- a/src/bvals/comms/combined_buffers.cpp +++ b/src/bvals/comms/combined_buffers.cpp @@ -30,8 +30,10 @@ namespace parthenon { -CombinedBuffersRank::CombinedBuffersRank(int o_rank, BoundaryType b_type, bool send) - : other_rank(o_rank), b_type(b_type), sender(send), buffers_built(false) { +CombinedBuffersRank::CombinedBuffersRank(int o_rank, BoundaryType b_type, bool send, + mpi_comm_t comm) + : other_rank(o_rank), b_type(b_type), sender(send), buffers_built(false), + comm_(comm) { int tag = 1234 + static_cast(GetAssociatedSender(b_type)); if (sender) { @@ -72,7 +74,7 @@ bool CombinedBuffersRank::TryReceiveBufInfo(Mesh *pmesh) { const int nbuf = mess_buf[idx++]; const int total_size = mess_buf[idx++]; combined_buffers[partition] = - CommBuffer(913 + partition, other_rank, Globals::my_rank, comm_); + CommBuffer(partition, other_rank, Globals::my_rank, comm_); combined_buffers[partition].ConstructBuffer("combined recv buffer", total_size); auto &cr_info = combined_info[partition]; auto &bufs = buffers[partition]; @@ -140,7 +142,7 @@ void CombinedBuffersRank::ResolveSendBuffersAndSendInfo(Mesh *pmesh) { // Allocate the combined buffers for (auto &[partition, size] : current_size) { combined_buffers[partition] = - CommBuffer(913 + partition, Globals::my_rank, other_rank, comm_); + CommBuffer(partition, Globals::my_rank, other_rank, comm_); combined_buffers[partition].ConstructBuffer("combined send buffer", size); } @@ -284,8 +286,11 @@ void CombinedBuffers::AddSendBuffer(int partition, MeshBlock *pmb, const std::shared_ptr> &var, BoundaryType b_type) { if (combined_send_buffers.count({nb.rank, b_type}) == 0) - combined_send_buffers[{nb.rank, b_type}] = CombinedBuffersRank(nb.rank, b_type, true); - combined_send_buffers[{nb.rank, b_type}].AddSendBuffer(partition, pmb, nb, var, b_type); + combined_send_buffers.emplace( + std::make_pair(std::make_pair(nb.rank, b_type), + CombinedBuffersRank(nb.rank, b_type, true, comm_))); + combined_send_buffers.at({nb.rank, b_type}) + .AddSendBuffer(partition, pmb, nb, var, b_type); } void CombinedBuffers::AddRecvBuffer(MeshBlock *pmb, const NeighborBlock &nb, @@ -295,8 +300,9 @@ void CombinedBuffers::AddRecvBuffer(MeshBlock *pmb, const NeighborBlock &nb, // know that it's existence implies that we need to receive a message from the // neighbor block rank eventually telling us the details if (combined_recv_buffers.count({nb.rank, b_type}) == 0) - combined_recv_buffers[{nb.rank, b_type}] = - CombinedBuffersRank(nb.rank, b_type, false); + combined_recv_buffers.emplace( + std::make_pair(std::make_pair(nb.rank, b_type), + CombinedBuffersRank(nb.rank, b_type, false, comm_))); } void CombinedBuffers::ResolveAndSendSendBuffers(Mesh *pmesh) { @@ -325,7 +331,7 @@ bool CombinedBuffers::IsAvailableForWrite(int partition, BoundaryType b_type) { for (int rank = 0; rank < Globals::nranks; ++rank) { if (combined_send_buffers.count({rank, b_type})) { available = available && - combined_send_buffers[{rank, b_type}].IsAvailableForWrite(partition); + combined_send_buffers.at({rank, b_type}).IsAvailableForWrite(partition); } } return available; @@ -334,7 +340,7 @@ bool CombinedBuffers::IsAvailableForWrite(int partition, BoundaryType b_type) { void CombinedBuffers::PackAndSend(int partition, BoundaryType b_type) { for (int rank = 0; rank < Globals::nranks; ++rank) { if (combined_send_buffers.count({rank, b_type})) { - combined_send_buffers[{rank, b_type}].PackAndSend(partition); + combined_send_buffers.at({rank, b_type}).PackAndSend(partition); } } } @@ -343,7 +349,7 @@ void CombinedBuffers::RepointSendBuffers(Mesh *pmesh, int partition, BoundaryType b_type) { for (int rank = 0; rank < Globals::nranks; ++rank) { if (combined_send_buffers.count({rank, b_type})) - combined_send_buffers[{rank, b_type}].RepointBuffers(pmesh, partition); + combined_send_buffers.at({rank, b_type}).RepointBuffers(pmesh, partition); } } @@ -351,7 +357,7 @@ void CombinedBuffers::RepointRecvBuffers(Mesh *pmesh, int partition, BoundaryType b_type) { for (int rank = 0; rank < Globals::nranks; ++rank) { if (combined_recv_buffers.count({rank, b_type})) - combined_recv_buffers[{rank, b_type}].RepointBuffers(pmesh, partition); + combined_recv_buffers.at({rank, b_type}).RepointBuffers(pmesh, partition); } } @@ -361,22 +367,22 @@ void CombinedBuffers::TryReceiveAny(Mesh *pmesh, BoundaryType b_type) { int flag; do { // TODO(LFR): Switch to a different communicator for each BoundaryType - MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, &status); + MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, comm_, &flag, &status); if (flag) { const int rank = status.MPI_SOURCE; - const int partition = status.MPI_TAG - 913; + const int partition = status.MPI_TAG; bool finished = - combined_recv_buffers[{rank, b_type}].TryReceiveAndUnpack(pmesh, partition); + combined_recv_buffers.at({rank, b_type}).TryReceiveAndUnpack(pmesh, partition); if (!finished) processing_messages.insert({rank, partition}); } } while (flag); // Process in flight messages - std::set> finished_messages; + std::vector> finished_messages; for (auto &[rank, partition] : processing_messages) { bool finished = - combined_recv_buffers[{rank, b_type}].TryReceiveAndUnpack(pmesh, partition); - if (finished) finished_messages.insert({rank, partition}); + combined_recv_buffers.at({rank, b_type}).TryReceiveAndUnpack(pmesh, partition); + if (finished) finished_messages.push_back({rank, partition}); } for (auto &m : finished_messages) diff --git a/src/bvals/comms/combined_buffers.hpp b/src/bvals/comms/combined_buffers.hpp index 12e3a6005f40..902ac82a7de7 100644 --- a/src/bvals/comms/combined_buffers.hpp +++ b/src/bvals/comms/combined_buffers.hpp @@ -56,15 +56,10 @@ struct CombinedBuffersRank { using com_buf_t = CommBuffer>; com_buf_t message; -#ifdef MPI_PARALLEL - mpi_comm_t comm_{MPI_COMM_WORLD}; -#else - mpi_comm_t comm_{0}; -#endif + mpi_comm_t comm_; bool sender{true}; - CombinedBuffersRank() = default; - CombinedBuffersRank(int o_rank, BoundaryType b_type, bool send); + CombinedBuffersRank(int o_rank, BoundaryType b_type, bool send, mpi_comm_t comm); void AddSendBuffer(int partition, MeshBlock *pmb, const NeighborBlock &nb, const std::shared_ptr> &var, BoundaryType b_type); @@ -95,6 +90,21 @@ struct CombinedBuffers { std::set> processing_messages; + mpi_comm_t comm_; + CombinedBuffers() { +#ifdef MPI_PARALLEL + PARTHENON_MPI_CHECK(MPI_Comm_dup(MPI_COMM_WORLD, &comm_)); +#else + comm_ = 0; +#endif + } + + ~CombinedBuffers() { +#ifdef MPI_PARALLEL + PARTHENON_MPI_CHECK(MPI_Comm_free(&comm_)); +#endif + } + void clear() { // TODO(LFR): Need to be careful here that the asynchronous send buffers are finished combined_send_buffers.clear();