Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use NEVER_ALONE for chained MPI calls #184

Merged
merged 3 commits into from
Nov 29, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/scheduler/MpiWorld.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,10 @@ void MpiWorld::create(faabric::Message& call, int newId, int newSize)
std::vector<std::string> executedAt;
if (size > 1) {
// Send the init messages (note that message i corresponds to rank i+1)
faabric::util::SchedulingDecision decision = sch.callFunctions(req);
// By default, we use the NEVER_ALONE policy for MPI executions to
// minimise cross-host messaging
faabric::util::SchedulingDecision decision = sch.callFunctions(
req, faabric::util::SchedulingTopologyHint::NEVER_ALONE);
executedAt = decision.hosts;
}
assert(executedAt.size() == size - 1);
Expand Down
135 changes: 56 additions & 79 deletions tests/test/scheduler/test_remote_mpi_worlds.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ class RemoteCollectiveTestFixture : public RemoteMpiTestFixture
thisWorldRanks = { thisHostRankB, thisHostRankA, 0 };
otherWorldRanks = { otherHostRankB, otherHostRankC, otherHostRankA };

// Here we rely on the scheduler running out of resources and
// overloading this world with ranks 4 and 5
setWorldSizes(thisWorldSize, 1, 3);
setWorldSizes(thisWorldSize, 3, 3);
}

MpiWorld& setUpThisWorld()
Expand All @@ -48,12 +46,11 @@ class RemoteCollectiveTestFixture : public RemoteMpiTestFixture
protected:
int thisWorldSize = 6;

int otherHostRankA = 1;
int otherHostRankB = 2;
int otherHostRankC = 3;

int thisHostRankA = 4;
int thisHostRankB = 5;
int thisHostRankA = 1;
int thisHostRankB = 2;
int otherHostRankA = 3;
int otherHostRankB = 4;
int otherHostRankC = 5;

std::vector<int> otherWorldRanks;
std::vector<int> thisWorldRanks;
Expand All @@ -62,7 +59,7 @@ class RemoteCollectiveTestFixture : public RemoteMpiTestFixture
TEST_CASE_METHOD(RemoteMpiTestFixture, "Test rank allocation", "[mpi]")
{
// Allocate two ranks in total, one rank per host
setWorldSizes(2, 1, 1);
setWorldSizes(4, 2, 2);

MpiWorld& thisWorld = getMpiWorldRegistry().createWorld(msg, worldId);
faabric::util::setMockMode(false);
Expand All @@ -72,8 +69,10 @@ TEST_CASE_METHOD(RemoteMpiTestFixture, "Test rank allocation", "[mpi]")
std::thread otherWorldThread([this] {
otherWorld.initialiseFromMsg(msg);

REQUIRE(otherWorld.getHostForRank(0) == thisHost);
REQUIRE(otherWorld.getHostForRank(1) == otherHost);
assert(otherWorld.getHostForRank(0) == thisHost);
assert(otherWorld.getHostForRank(1) == thisHost);
assert(otherWorld.getHostForRank(2) == otherHost);
assert(otherWorld.getHostForRank(3) == otherHost);

otherWorld.destroy();
});
Expand All @@ -83,17 +82,19 @@ TEST_CASE_METHOD(RemoteMpiTestFixture, "Test rank allocation", "[mpi]")
}

REQUIRE(thisWorld.getHostForRank(0) == thisHost);
REQUIRE(thisWorld.getHostForRank(1) == otherHost);
REQUIRE(thisWorld.getHostForRank(1) == thisHost);
REQUIRE(thisWorld.getHostForRank(2) == otherHost);
REQUIRE(thisWorld.getHostForRank(3) == otherHost);

thisWorld.destroy();
}

TEST_CASE_METHOD(RemoteMpiTestFixture, "Test send across hosts", "[mpi]")
{
// Register two ranks (one on each host)
setWorldSizes(2, 1, 1);
setWorldSizes(4, 2, 2);
int rankA = 0;
int rankB = 1;
int rankB = 2;
std::vector<int> messageData = { 0, 1, 2 };

// Init worlds
Expand Down Expand Up @@ -137,9 +138,9 @@ TEST_CASE_METHOD(RemoteMpiTestFixture,
"[mpi]")
{
// Register two ranks (one on each host)
setWorldSizes(2, 1, 1);
setWorldSizes(4, 2, 2);
int rankA = 0;
int rankB = 1;
int rankB = 2;
std::vector<int> messageData = { 0, 1, 2 };
std::vector<int> messageData2 = { 3, 4, 5 };

Expand Down Expand Up @@ -201,61 +202,14 @@ TEST_CASE_METHOD(RemoteMpiTestFixture,
thisWorld.destroy();
}

TEST_CASE_METHOD(RemoteMpiTestFixture, "Test barrier across hosts", "[mpi]")
Copy link
Collaborator Author

@csegarragonz csegarragonz Nov 26, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the new scheduling policy, we will always have at least two remote processes. Thus, we can't test a barrier across hosts without using a distributed setting (both remote ranks must call barrier to unlock).

The other tests in this file I managed to port.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok nice, we have quite a lot of this kind of test where we're trying to fake a distributed setting locally, all of which date from before the distributed tests.

Is there a dist test that covers this now? I can't remember. If not, could we add one? Happy to discuss specifics of this offline as I'm not sure we have any Faabric dist tests that cover the MPI implementation.

In future it might be worth trying to port things to dist tests rather than keep the hacky local versions, but in this instance it seems to work ok.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am introducing distributed tests for MPI (inheriting from mpi-native in #186). The order in which we merge this PR and the other one does not really matter.

{
// Register two ranks (one on each host)
setWorldSizes(2, 1, 1);
int rankA = 0;
int rankB = 1;
std::vector<int> sendData = { 0, 1, 2 };
std::vector<int> recvData = { -1, -1, -1 };

// Init worlds
MpiWorld& thisWorld = getMpiWorldRegistry().createWorld(msg, worldId);
faabric::util::setMockMode(false);

thisWorld.broadcastHostsToRanks();

std::thread otherWorldThread([this, rankA, rankB, &sendData, &recvData] {
otherWorld.initialiseFromMsg(msg);

otherWorld.send(
rankB, rankA, BYTES(sendData.data()), MPI_INT, sendData.size());

// Barrier on this rank
otherWorld.barrier(rankB);
assert(sendData == recvData);
otherWorld.destroy();
});

// Receive the message for the given rank
thisWorld.recv(rankB,
rankA,
BYTES(recvData.data()),
MPI_INT,
recvData.size(),
MPI_STATUS_IGNORE);
REQUIRE(recvData == sendData);

// Call barrier to synchronise remote host
thisWorld.barrier(rankA);

// Clean up
if (otherWorldThread.joinable()) {
otherWorldThread.join();
}

thisWorld.destroy();
}

TEST_CASE_METHOD(RemoteMpiTestFixture,
"Test sending many messages across host",
"[mpi]")
{
// Register two ranks (one on each host)
setWorldSizes(2, 1, 1);
setWorldSizes(4, 2, 2);
int rankA = 0;
int rankB = 1;
int rankB = 2;
int numMessages = 1000;

// Init worlds
Expand Down Expand Up @@ -374,8 +328,17 @@ TEST_CASE_METHOD(RemoteCollectiveTestFixture,
MPI_INT,
nPerRank);

// Build the expectation
std::vector<std::vector<int>> expected(otherWorldRanks.size(),
std::vector<int>(nPerRank));
for (int i = 0; i < otherWorldRanks.size(); i++) {
for (int j = 0; j < nPerRank; j++) {
expected.at(i).at(j) = otherWorldRanks.at(i) * nPerRank + j;
}
}

// Check for root
assert(actual == std::vector<int>({ 8, 9, 10, 11 }));
assert(actual == expected.at(0));

// Check the other ranks on this host have received the data
otherWorld.scatter(otherHostRankB,
Expand All @@ -386,7 +349,8 @@ TEST_CASE_METHOD(RemoteCollectiveTestFixture,
BYTES(actual.data()),
MPI_INT,
nPerRank);
assert(actual == std::vector<int>({ 4, 5, 6, 7 }));
// assert(actual == std::vector<int>({ 4, 5, 6, 7 }));
csegarragonz marked this conversation as resolved.
Show resolved Hide resolved
assert(actual == expected.at(2));

otherWorld.scatter(otherHostRankB,
otherHostRankC,
Expand All @@ -396,12 +360,22 @@ TEST_CASE_METHOD(RemoteCollectiveTestFixture,
BYTES(actual.data()),
MPI_INT,
nPerRank);
assert(actual == std::vector<int>({ 12, 13, 14, 15 }));
// assert(actual == std::vector<int>({ 12, 13, 14, 15 }));
csegarragonz marked this conversation as resolved.
Show resolved Hide resolved
assert(actual == expected.at(1));

testLatch->wait();
otherWorld.destroy();
});

// Build the expectation
std::vector<std::vector<int>> expected(thisWorldRanks.size(),
std::vector<int>(nPerRank));
for (int i = 0; i < thisWorldRanks.size(); i++) {
for (int j = 0; j < nPerRank; j++) {
expected.at(i).at(j) = thisWorldRanks.at(i) * nPerRank + j;
}
}

// Check for ranks on this host
std::vector<int> actual(nPerRank, -1);
thisWorld.scatter(otherHostRankB,
Expand All @@ -412,7 +386,7 @@ TEST_CASE_METHOD(RemoteCollectiveTestFixture,
BYTES(actual.data()),
MPI_INT,
nPerRank);
REQUIRE(actual == std::vector<int>({ 0, 1, 2, 3 }));
REQUIRE(actual == expected.at(2));

thisWorld.scatter(otherHostRankB,
thisHostRankB,
Expand All @@ -422,7 +396,7 @@ TEST_CASE_METHOD(RemoteCollectiveTestFixture,
BYTES(actual.data()),
MPI_INT,
nPerRank);
REQUIRE(actual == std::vector<int>({ 20, 21, 22, 23 }));
REQUIRE(actual == expected.at(0));

thisWorld.scatter(otherHostRankB,
thisHostRankA,
Expand All @@ -432,7 +406,7 @@ TEST_CASE_METHOD(RemoteCollectiveTestFixture,
BYTES(actual.data()),
MPI_INT,
nPerRank);
REQUIRE(actual == std::vector<int>({ 16, 17, 18, 19 }));
REQUIRE(actual == expected.at(1));

// Clean up
testLatch->wait();
Expand Down Expand Up @@ -530,8 +504,8 @@ TEST_CASE_METHOD(RemoteMpiTestFixture,
"[mpi]")
{
// Allocate two ranks in total, one rank per host
setWorldSizes(2, 1, 1);
int sendRank = 1;
setWorldSizes(4, 2, 2);
int sendRank = 2;
int recvRank = 0;
std::vector<int> messageData = { 0, 1, 2 };

Expand Down Expand Up @@ -597,8 +571,8 @@ TEST_CASE_METHOD(RemoteMpiTestFixture,
"[mpi]")
{
// Allocate two ranks in total, one rank per host
setWorldSizes(2, 1, 1);
int sendRank = 1;
setWorldSizes(4, 2, 2);
int sendRank = 2;
int recvRank = 0;

// Init world
Expand Down Expand Up @@ -729,9 +703,9 @@ TEST_CASE_METHOD(RemoteMpiTestFixture,
"[mpi]")
{
// Register two ranks (one on each host)
setWorldSizes(2, 1, 1);
setWorldSizes(4, 2, 2);
int rankA = 0;
int rankB = 1;
int rankB = 2;
std::vector<int> messageData = { 0, 1, 2 };
std::vector<int> messageData2 = { 3, 4 };

Expand Down Expand Up @@ -779,7 +753,10 @@ TEST_CASE_METHOD(RemoteMpiTestFixture,
});

std::vector<bool> endpointCheck;
std::vector<bool> expectedEndpoints = { false, true, false, false };
std::vector<bool> expectedEndpoints = { false, false, true, false,
false, false, false, false,
false, false, false, false,
false, false, false, false };

// Sending a message initialises the remote endpoint
thisWorld.send(
Expand Down