Skip to content

Commit

Permalink
Move snapshot client/ server into snapshot module (#125)
Browse files Browse the repository at this point in the history
* Only do snapshot pushing if there are registered hosts

* Move snapshot stuff to snapshot module

* Fix compilation

* Revert unrelated change
  • Loading branch information
Shillaker authored Jul 5, 2021
1 parent 9839151 commit a51938e
Show file tree
Hide file tree
Showing 16 changed files with 50 additions and 50 deletions.
4 changes: 2 additions & 2 deletions include/faabric/runner/FaabricMain.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#include <faabric/scheduler/ExecutorFactory.h>
#include <faabric/scheduler/FunctionCallServer.h>
#include <faabric/scheduler/Scheduler.h>
#include <faabric/scheduler/SnapshotServer.h>
#include <faabric/snapshot/SnapshotServer.h>
#include <faabric/state/StateServer.h>
#include <faabric/util/config.h>

Expand All @@ -28,6 +28,6 @@ class FaabricMain
private:
faabric::state::StateServer stateServer;
faabric::scheduler::FunctionCallServer functionServer;
faabric::scheduler::SnapshotServer snapshotServer;
faabric::snapshot::SnapshotServer snapshotServer;
};
}
4 changes: 2 additions & 2 deletions include/faabric/scheduler/Scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#include <faabric/scheduler/ExecGraph.h>
#include <faabric/scheduler/FunctionCallClient.h>
#include <faabric/scheduler/InMemoryMessageQueue.h>
#include <faabric/scheduler/SnapshotClient.h>
#include <faabric/snapshot/SnapshotClient.h>
#include <faabric/util/config.h>
#include <faabric/util/func.h>
#include <faabric/util/queue.h>
Expand Down Expand Up @@ -187,7 +187,7 @@ class Scheduler
faabric::scheduler::FunctionCallClient& getFunctionCallClient(
const std::string& otherHost);

faabric::scheduler::SnapshotClient& getSnapshotClient(
faabric::snapshot::SnapshotClient& getSnapshotClient(
const std::string& otherHost);

faabric::HostResources thisHostResources;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#pragma once

namespace faabric::scheduler {
namespace faabric::snapshot {
enum SnapshotCalls
{
NoSnapshotCall = 0,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
#pragma once

#include <faabric/flat/faabric_generated.h>
#include <faabric/scheduler/SnapshotApi.h>
#include <faabric/snapshot/SnapshotApi.h>
#include <faabric/transport/MessageEndpoint.h>
#include <faabric/transport/MessageEndpointClient.h>
#include <faabric/util/snapshot.h>

namespace faabric::scheduler {
namespace faabric::snapshot {

// -----------------------------------
// Mocking
Expand Down Expand Up @@ -57,6 +57,6 @@ class SnapshotClient final : public faabric::transport::MessageEndpointClient
const std::vector<faabric::util::SnapshotDiff>& diffs);

private:
void sendHeader(faabric::scheduler::SnapshotCalls call);
void sendHeader(faabric::snapshot::SnapshotCalls call);
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

#include <faabric/flat/faabric_generated.h>
#include <faabric/scheduler/Scheduler.h>
#include <faabric/scheduler/SnapshotApi.h>
#include <faabric/snapshot/SnapshotApi.h>
#include <faabric/transport/MessageEndpointServer.h>

namespace faabric::scheduler {
namespace faabric::snapshot {
class SnapshotServer final : public faabric::transport::MessageEndpointServer
{
public:
Expand Down
4 changes: 1 addition & 3 deletions src/scheduler/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ set(LIB_FILES
FunctionCallClient.cpp
FunctionCallServer.cpp
Scheduler.cpp
SnapshotServer.cpp
SnapshotClient.cpp
MpiContext.cpp
MpiMessageBuffer.cpp
MpiWorldRegistry.cpp
Expand All @@ -18,4 +16,4 @@ set(LIB_FILES

faabric_lib(scheduler "${LIB_FILES}")

target_link_libraries(scheduler flat proto snapshot state faabricmpi redis transport)
target_link_libraries(scheduler snapshot state faabricmpi redis)
5 changes: 3 additions & 2 deletions src/scheduler/Scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#include <faabric/scheduler/ExecutorFactory.h>
#include <faabric/scheduler/FunctionCallClient.h>
#include <faabric/scheduler/Scheduler.h>
#include <faabric/scheduler/SnapshotClient.h>
#include <faabric/snapshot/SnapshotClient.h>
#include <faabric/snapshot/SnapshotRegistry.h>
#include <faabric/util/environment.h>
#include <faabric/util/func.h>
Expand All @@ -19,6 +19,7 @@
#define FLUSH_TIMEOUT_MS 10000

using namespace faabric::util;
using namespace faabric::snapshot;

namespace faabric::scheduler {

Expand All @@ -31,7 +32,7 @@ static thread_local std::unordered_map<std::string,
functionCallClients;

static thread_local std::unordered_map<std::string,
faabric::scheduler::SnapshotClient>
faabric::snapshot::SnapshotClient>
snapshotClients;

Scheduler& getScheduler()
Expand Down
4 changes: 3 additions & 1 deletion src/snapshot/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
file(GLOB HEADERS "${FAABRIC_INCLUDE_DIR}/faabric/snapshot/*.h")

set(LIB_FILES
SnapshotClient.cpp
SnapshotRegistry.cpp
SnapshotServer.cpp
${HEADERS}
)

faabric_lib(snapshot "${LIB_FILES}")

target_link_libraries(snapshot proto util)
target_link_libraries(snapshot proto flat transport util)
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
#include <faabric/scheduler/SnapshotClient.h>
#include <faabric/snapshot/SnapshotClient.h>
#include <faabric/transport/common.h>
#include <faabric/transport/macros.h>
#include <faabric/util/config.h>
#include <faabric/util/logging.h>
#include <faabric/util/queue.h>
#include <faabric/util/testing.h>

namespace faabric::scheduler {
namespace faabric::snapshot {

// -----------------------------------
// Mocking
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#include <faabric/flat/faabric_generated.h>
#include <faabric/proto/faabric.pb.h>
#include <faabric/scheduler/SnapshotServer.h>
#include <faabric/snapshot/SnapshotRegistry.h>
#include <faabric/snapshot/SnapshotServer.h>
#include <faabric/state/State.h>
#include <faabric/transport/common.h>
#include <faabric/transport/macros.h>
Expand All @@ -10,7 +10,7 @@

#include <sys/mman.h>

namespace faabric::scheduler {
namespace faabric::snapshot {
SnapshotServer::SnapshotServer()
: faabric::transport::MessageEndpointServer(SNAPSHOT_ASYNC_PORT,
SNAPSHOT_SYNC_PORT)
Expand All @@ -21,11 +21,11 @@ void SnapshotServer::doAsyncRecv(int header,
size_t bufferSize)
{
switch (header) {
case faabric::scheduler::SnapshotCalls::DeleteSnapshot: {
case faabric::snapshot::SnapshotCalls::DeleteSnapshot: {
this->recvDeleteSnapshot(buffer, bufferSize);
break;
}
case faabric::scheduler::SnapshotCalls::ThreadResult: {
case faabric::snapshot::SnapshotCalls::ThreadResult: {
this->recvThreadResult(buffer, bufferSize);
break;
}
Expand All @@ -40,10 +40,10 @@ std::unique_ptr<google::protobuf::Message>
SnapshotServer::doSyncRecv(int header, const uint8_t* buffer, size_t bufferSize)
{
switch (header) {
case faabric::scheduler::SnapshotCalls::PushSnapshot: {
case faabric::snapshot::SnapshotCalls::PushSnapshot: {
return recvPushSnapshot(buffer, bufferSize);
}
case faabric::scheduler::SnapshotCalls::PushSnapshotDiffs: {
case faabric::snapshot::SnapshotCalls::PushSnapshotDiffs: {
return recvPushSnapshotDiffs(buffer, bufferSize);
}
default: {
Expand Down
2 changes: 1 addition & 1 deletion tests/dist/scheduler/test_snapshots.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

#include <faabric/proto/faabric.pb.h>
#include <faabric/scheduler/Scheduler.h>
#include <faabric/scheduler/SnapshotClient.h>
#include <faabric/snapshot/SnapshotClient.h>
#include <faabric/snapshot/SnapshotRegistry.h>
#include <faabric/util/bytes.h>
#include <faabric/util/config.h>
Expand Down
25 changes: 12 additions & 13 deletions tests/test/scheduler/test_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
#include <faabric/scheduler/ExecutorFactory.h>
#include <faabric/scheduler/FunctionCallClient.h>
#include <faabric/scheduler/Scheduler.h>
#include <faabric/scheduler/SnapshotClient.h>
#include <faabric/snapshot/SnapshotClient.h>
#include <faabric/snapshot/SnapshotRegistry.h>
#include <faabric/util/config.h>
#include <faabric/util/func.h>
Expand Down Expand Up @@ -468,7 +468,7 @@ TEST_CASE_METHOD(TestExecutorFixture,
REQUIRE(actualHost == otherHost);

// Check the snapshot has been pushed to the other host
auto snapPushes = faabric::scheduler::getSnapshotPushes();
auto snapPushes = faabric::snapshot::getSnapshotPushes();
REQUIRE(snapPushes.size() == 1);
REQUIRE(snapPushes.at(0).first == otherHost);

Expand All @@ -484,7 +484,7 @@ TEST_CASE_METHOD(TestExecutorFixture,
REQUIRE(restoreCount == 1);

// Process the thread result requests
auto results = faabric::scheduler::getThreadResults();
auto results = faabric::snapshot::getThreadResults();

for (auto& r : results) {
REQUIRE(r.first == thisHost);
Expand Down Expand Up @@ -524,8 +524,8 @@ TEST_CASE_METHOD(TestExecutorFixture,

// Note that because the results don't actually get logged on this host, we
// can't wait on them as usual.
auto actual = faabric::scheduler::getThreadResults();
REQUIRE_RETRY(actual = faabric::scheduler::getThreadResults(),
auto actual = faabric::snapshot::getThreadResults();
REQUIRE_RETRY(actual = faabric::snapshot::getThreadResults(),
actual.size() == nThreads);

std::vector<uint32_t> actualMessageIds;
Expand Down Expand Up @@ -714,11 +714,10 @@ TEST_CASE_METHOD(TestExecutorFixture,

// Results aren't set on this host as it's not the master, so we have to
// wait
REQUIRE_RETRY({},
faabric::scheduler::getThreadResults().size() == nThreads);
REQUIRE_RETRY({}, faabric::snapshot::getThreadResults().size() == nThreads);

// Check results have been sent back to the master host
auto actualResults = faabric::scheduler::getThreadResults();
auto actualResults = faabric::snapshot::getThreadResults();
REQUIRE(actualResults.size() == nThreads);

// Check only one has diffs attached
Expand Down Expand Up @@ -814,18 +813,18 @@ TEST_CASE_METHOD(TestExecutorFixture,
REQUIRE(sch.getFunctionRegisteredHosts(msg) == expectedRegistered);

// Check snapshot has been pushed
auto pushes = faabric::scheduler::getSnapshotPushes();
auto pushes = faabric::snapshot::getSnapshotPushes();
REQUIRE(pushes.at(0).first == otherHost);
REQUIRE(pushes.at(0).second.size == snapshotSize);

REQUIRE(faabric::scheduler::getSnapshotDiffPushes().empty());
REQUIRE(faabric::snapshot::getSnapshotDiffPushes().empty());

// Check that we're not registering any dirty pages on the snapshot
faabric::util::SnapshotData& snap = reg.getSnapshot(snapshotKey);
REQUIRE(snap.getDirtyPages().empty());

// Now reset snapshot pushes of all kinds
faabric::scheduler::clearMockSnapshotRequests();
faabric::snapshot::clearMockSnapshotRequests();

// Make an edit to the snapshot memory and get the expected diffs
snap.data[0] = 9;
Expand All @@ -850,10 +849,10 @@ TEST_CASE_METHOD(TestExecutorFixture,
sch.awaitThreadResult(reqB->mutable_messages()->at(0).id());

// Check the full snapshot hasn't been pushed
REQUIRE(faabric::scheduler::getSnapshotPushes().empty());
REQUIRE(faabric::snapshot::getSnapshotPushes().empty());

// Check the diffs are pushed as expected
auto diffPushes = faabric::scheduler::getSnapshotDiffPushes();
auto diffPushes = faabric::snapshot::getSnapshotDiffPushes();
REQUIRE(diffPushes.size() == 1);
REQUIRE(diffPushes.at(0).first == otherHost);
std::vector<faabric::util::SnapshotDiff> actualDiffs =
Expand Down
8 changes: 4 additions & 4 deletions tests/test/scheduler/test_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#include <faabric/scheduler/ExecutorFactory.h>
#include <faabric/scheduler/FunctionCallClient.h>
#include <faabric/scheduler/Scheduler.h>
#include <faabric/scheduler/SnapshotClient.h>
#include <faabric/snapshot/SnapshotClient.h>
#include <faabric/snapshot/SnapshotRegistry.h>
#include <faabric/util/environment.h>
#include <faabric/util/func.h>
Expand Down Expand Up @@ -272,7 +272,7 @@ TEST_CASE_METHOD(SlowExecutorFixture, "Test batch scheduling", "[scheduler]")
REQUIRE(resRequestsOne.at(0).first == otherHost);

// Check snapshots have been pushed
auto snapshotPushes = faabric::scheduler::getSnapshotPushes();
auto snapshotPushes = faabric::snapshot::getSnapshotPushes();
if (expectedSnapshot.empty()) {
REQUIRE(snapshotPushes.empty());
} else {
Expand Down Expand Up @@ -749,7 +749,7 @@ TEST_CASE_METHOD(SlowExecutorFixture,
for (auto h : expectedHosts) {
expectedDeleteRequests.push_back({ h, snapKey });
};
auto actualDeleteRequests = faabric::scheduler::getSnapshotDeletes();
auto actualDeleteRequests = faabric::snapshot::getSnapshotDeletes();

REQUIRE(actualDeleteRequests == expectedDeleteRequests);
}
Expand Down Expand Up @@ -791,7 +791,7 @@ TEST_CASE_METHOD(SlowExecutorFixture,
}

// Check the results have been pushed along with the thread result
auto actualResults = faabric::scheduler::getThreadResults();
auto actualResults = faabric::snapshot::getThreadResults();

REQUIRE(actualResults.size() == 1);
REQUIRE(actualResults.at(0).first == "otherHost");
Expand Down
8 changes: 4 additions & 4 deletions tests/test/scheduler/test_snapshot_client_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@

#include <sys/mman.h>

#include <faabric/scheduler/SnapshotClient.h>
#include <faabric/scheduler/SnapshotServer.h>
#include <faabric/snapshot/SnapshotClient.h>
#include <faabric/snapshot/SnapshotRegistry.h>
#include <faabric/snapshot/SnapshotServer.h>
#include <faabric/util/config.h>
#include <faabric/util/environment.h>
#include <faabric/util/gids.h>
Expand All @@ -22,8 +22,8 @@ class SnapshotClientServerFixture
, public SnapshotTestFixture
{
protected:
faabric::scheduler::SnapshotServer server;
faabric::scheduler::SnapshotClient cli;
faabric::snapshot::SnapshotServer server;
faabric::snapshot::SnapshotClient cli;

public:
SnapshotClientServerFixture()
Expand Down
4 changes: 2 additions & 2 deletions tests/utils/fixtures.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class SchedulerTestFixture
faabric::util::setTestMode(true);

faabric::scheduler::clearMockRequests();
faabric::scheduler::clearMockSnapshotRequests();
faabric::snapshot::clearMockSnapshotRequests();

sch.shutdown();
sch.addHostToGlobalSet();
Expand All @@ -80,7 +80,7 @@ class SchedulerTestFixture
faabric::util::setTestMode(true);

faabric::scheduler::clearMockRequests();
faabric::scheduler::clearMockSnapshotRequests();
faabric::snapshot::clearMockSnapshotRequests();

sch.shutdown();
sch.addHostToGlobalSet();
Expand Down
4 changes: 2 additions & 2 deletions tests/utils/system_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#include <faabric/scheduler/FunctionCallClient.h>
#include <faabric/scheduler/MpiWorldRegistry.h>
#include <faabric/scheduler/Scheduler.h>
#include <faabric/scheduler/SnapshotClient.h>
#include <faabric/snapshot/SnapshotClient.h>
#include <faabric/snapshot/SnapshotRegistry.h>
#include <faabric/state/State.h>
#include <faabric/util/testing.h>
Expand Down Expand Up @@ -50,7 +50,7 @@ void cleanFaabric()
faabric::util::setTestMode(true);
faabric::util::setMockMode(false);
faabric::scheduler::clearMockRequests();
faabric::scheduler::clearMockSnapshotRequests();
faabric::snapshot::clearMockSnapshotRequests();

// Set up dummy executor factory
std::shared_ptr<faabric::scheduler::ExecutorFactory> fac =
Expand Down

0 comments on commit a51938e

Please sign in to comment.