Skip to content

Commit

Permalink
Push snapshot diffs before reset, and thread results after (#126)
Browse files Browse the repository at this point in the history
* Check if we have registered hosts before doing snapshot pushes

* Started splitting thread results and snapshot diffs

* Fix up compilation and tests

* Small rearrangement

* Update comment

* Push snapshot diffs whenever pending

* Add checks for zero size snapshots

* Slight rework of conditional around snapshots in scheduler

* Added test for snapshots from child threads

* Correct wait statement in test

* Small comment update

* Started experimenting with other pagetable entries

* Reverting changes moved to other PR

* Add missing subtype and context to chained batch call

* Force slots on dist test server
  • Loading branch information
Shillaker authored Jul 13, 2021
1 parent a51938e commit 4691f18
Show file tree
Hide file tree
Showing 16 changed files with 306 additions and 250 deletions.
8 changes: 5 additions & 3 deletions include/faabric/scheduler/Scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ class Executor

std::atomic<bool> claimed = false;

std::atomic<bool> pendingSnapshotPush = false;

std::atomic<int> executingTaskCount = 0;

std::mutex threadsMutex;
Expand Down Expand Up @@ -117,9 +119,9 @@ class Scheduler

void setThreadResult(const faabric::Message& msg, int32_t returnValue);

void setThreadResult(const faabric::Message& msg,
int32_t returnValue,
const std::vector<faabric::util::SnapshotDiff>& diffs);
void pushSnapshotDiffs(
const faabric::Message& msg,
const std::vector<faabric::util::SnapshotDiff>& diffs);

void setThreadResultLocally(uint32_t msgId, int32_t returnValue);

Expand Down
12 changes: 1 addition & 11 deletions include/faabric/snapshot/SnapshotClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,7 @@ getSnapshotDiffPushes();

std::vector<std::pair<std::string, std::string>> getSnapshotDeletes();

std::vector<std::pair<std::string,
std::tuple<uint32_t,
int,
std::string,
std::vector<faabric::util::SnapshotDiff>>>>
std::vector<std::pair<std::string, std::pair<uint32_t, int>>>
getThreadResults();

void clearMockSnapshotRequests();
Expand All @@ -50,12 +46,6 @@ class SnapshotClient final : public faabric::transport::MessageEndpointClient

void pushThreadResult(uint32_t messageId, int returnValue);

void pushThreadResult(
uint32_t messageId,
int returnValue,
const std::string& snapshotKey,
const std::vector<faabric::util::SnapshotDiff>& diffs);

private:
void sendHeader(faabric::snapshot::SnapshotCalls call);
};
Expand Down
5 changes: 0 additions & 5 deletions include/faabric/snapshot/SnapshotServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,5 @@ class SnapshotServer final : public faabric::transport::MessageEndpointServer
void recvDeleteSnapshot(const uint8_t* buffer, size_t bufferSize);

void recvThreadResult(const uint8_t* buffer, size_t bufferSize);

private:
void applyDiffsToSnapshot(
const std::string& snapshotKey,
const flatbuffers::Vector<flatbuffers::Offset<SnapshotDiffChunk>>* diffs);
};
}
2 changes: 0 additions & 2 deletions src/flat/faabric.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,4 @@ table SnapshotDiffPushRequest {
table ThreadResultRequest {
message_id:int;
return_value:int;
key:string;
chunks:[SnapshotDiffChunk];
}
43 changes: 20 additions & 23 deletions src/scheduler/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,7 @@ void Executor::executeTasks(std::vector<int> msgIdxs,

if (isSnapshot && !alreadyRestored) {
if ((!isMaster && isThreads) || !isThreads) {
SPDLOG_DEBUG(
"Performing snapshot restore {} [{}]", funcStr, snapshotKey);
SPDLOG_DEBUG("Restoring {} from snapshot {}", funcStr, snapshotKey);
lastSnapshot = snapshotKey;
restore(firstMsg);
} else {
Expand All @@ -124,6 +123,7 @@ void Executor::executeTasks(std::vector<int> msgIdxs,
// Note this must be done after the restore has happened
if (isThreads && isSnapshot) {
faabric::util::resetDirtyTracking();
pendingSnapshotPush = true;
}

// Set executing task count
Expand Down Expand Up @@ -161,7 +161,7 @@ void Executor::threadPoolThread(int threadPoolIdx)
SPDLOG_DEBUG("Thread pool thread {}:{} starting up", id, threadPoolIdx);

auto& sch = faabric::scheduler::getScheduler();
auto& conf = faabric::util::getSystemConfig();
const auto& conf = faabric::util::getSystemConfig();

bool selfShutdown = false;

Expand Down Expand Up @@ -196,20 +196,24 @@ void Executor::threadPoolThread(int threadPoolIdx)
assert(req->messages_size() >= msgIdx + 1);
faabric::Message& msg = req->mutable_messages()->at(msgIdx);

SPDLOG_TRACE("Thread {}:{} executing task {} ({})",
bool isThreads = req->type() == faabric::BatchExecuteRequest::THREADS;
SPDLOG_TRACE("Thread {}:{} executing task {} ({}, thread={})",
id,
threadPoolIdx,
msgIdx,
msg.id());
msg.id(),
isThreads);

int32_t returnValue;
try {
returnValue = executeTask(threadPoolIdx, msgIdx, req);
} catch (const std::exception& ex) {
returnValue = 1;

msg.set_outputdata(fmt::format(
"Task {} threw exception. What: {}", msg.id(), ex.what()));
std::string errorMessage = fmt::format(
"Task {} threw exception. What: {}", msg.id(), ex.what());
SPDLOG_ERROR(errorMessage);
msg.set_outputdata(errorMessage);
}

// Set the return value
Expand All @@ -221,21 +225,21 @@ void Executor::threadPoolThread(int threadPoolIdx)
bool isLastTask = oldTaskCount == 1;

SPDLOG_TRACE("Task {} finished by thread {}:{} ({} left)",
msg.id(),
faabric::util::funcToString(msg, true),
id,
threadPoolIdx,
executingTaskCount);
oldTaskCount - 1);

// Get snapshot diffs _before_ we reset the executor
bool isThreads = req->type() == faabric::BatchExecuteRequest::THREADS;
std::vector<faabric::util::SnapshotDiff> diffs;
if (isLastTask && isThreads) {
// Handle snapshot diffs _before_ we reset the executor
if (isLastTask && pendingSnapshotPush) {
// Get diffs
faabric::util::SnapshotData d = snapshot();
diffs = d.getDirtyPages();
std::vector<faabric::util::SnapshotDiff> diffs = d.getDirtyPages();
sch.pushSnapshotDiffs(msg, diffs);

// Reset dirty page tracking now that we've got the diffs
// Reset dirty page tracking now that we've pushed the diffs
faabric::util::resetDirtyTracking();
pendingSnapshotPush = false;
}

// If this batch is finished, reset the executor and release its claim.
Expand All @@ -255,14 +259,7 @@ void Executor::threadPoolThread(int threadPoolIdx)
// on its result to continue execution, therefore must be done once the
// executor has been reset, otherwise the executor may not be reused for
// a repeat invocation.
if (isLastTask && isThreads) {
// Send diffs along with thread result
SPDLOG_DEBUG("Task {} finished, returning {} snapshot diffs",
msg.id(),
diffs.size());

sch.setThreadResult(msg, returnValue, diffs);
} else if (isThreads) {
if (isThreads) {
// Set thread result (snapshot diffs, if any, were already pushed above)
sch.setThreadResult(msg, returnValue);
} else {
Expand Down
1 change: 1 addition & 0 deletions src/scheduler/FunctionCallServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ void FunctionCallServer::recvExecuteFunctions(const uint8_t* buffer,
PARSE_MSG(faabric::BatchExecuteRequest, buffer, bufferSize)

// This host has now been told to execute these functions no matter what
// TODO - avoid this copy
scheduler.callFunctions(std::make_shared<faabric::BatchExecuteRequest>(msg),
true);
}
Expand Down
79 changes: 41 additions & 38 deletions src/scheduler/Scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -257,40 +257,43 @@ std::vector<std::string> Scheduler::callFunctions(
// This ensures everything is up to date, and we don't have to
// maintain different records of which hosts hold which updates.
faabric::util::SnapshotData snapshotData;
std::vector<faabric::util::SnapshotDiff> snapshotDiffs;
std::string snapshotKey = firstMsg.snapshotkey();
bool snapshotNeeded =
req->type() == req->THREADS || req->type() == req->PROCESSES;

if (snapshotNeeded && snapshotKey.empty()) {
SPDLOG_ERROR("No snapshot provided for {}", funcStr);
throw std::runtime_error(
"Empty snapshot for distributed threads/ processes");
}

if (snapshotNeeded) {
if (snapshotKey.empty()) {
SPDLOG_ERROR("No snapshot provided for {}", funcStr);
throw std::runtime_error(
"Empty snapshot for distributed threads/ processes");
}

snapshotData =
faabric::snapshot::getSnapshotRegistry().getSnapshot(snapshotKey);
snapshotDiffs = snapshotData.getDirtyPages();

// Do the snapshot diff pushing
if (!snapshotDiffs.empty()) {
for (const auto& h : thisRegisteredHosts) {
SPDLOG_DEBUG("Pushing {} snapshot diffs for {} to {}",
snapshotDiffs.size(),
funcStr,
h);
SnapshotClient& c = getSnapshotClient(h);
c.pushSnapshotDiffs(snapshotKey, snapshotDiffs);

if (!thisRegisteredHosts.empty()) {
std::vector<faabric::util::SnapshotDiff> snapshotDiffs =
snapshotData.getDirtyPages();

// Do the snapshot diff pushing
if (!snapshotDiffs.empty()) {
for (const auto& h : thisRegisteredHosts) {
SPDLOG_DEBUG("Pushing {} snapshot diffs for {} to {}",
snapshotDiffs.size(),
funcStr,
h);
SnapshotClient& c = getSnapshotClient(h);
c.pushSnapshotDiffs(snapshotKey, snapshotDiffs);
}
}
}

// Now reset the dirty page tracking, as we want the next batch of
// diffs to contain everything from now on (including the updates
// sent back from all the threads)
SPDLOG_DEBUG("Resetting dirty tracking after pushing diffs {}",
funcStr);
faabric::util::resetDirtyTracking();
// Now reset the dirty page tracking, as we want the next batch
// of diffs to contain everything from now on (including the
// updates sent back from all the threads)
SPDLOG_DEBUG("Resetting dirty tracking after pushing diffs {}",
funcStr);
faabric::util::resetDirtyTracking();
}
}

// Work out how many we can handle locally
Expand Down Expand Up @@ -513,6 +516,8 @@ int Scheduler::scheduleFunctionsOnHost(
faabric::util::batchExecFactory();
hostRequest->set_snapshotkey(req->snapshotkey());
hostRequest->set_type(req->type());
hostRequest->set_subtype(req->subtype());
hostRequest->set_contextdata(req->contextdata());

// Add messages
int nOnThisHost = std::min<int>(available, remainder);
Expand Down Expand Up @@ -699,27 +704,25 @@ void Scheduler::registerThread(uint32_t msgId)
void Scheduler::setThreadResult(const faabric::Message& msg,
int32_t returnValue)
{
std::vector<faabric::util::SnapshotDiff> empty;
setThreadResult(msg, returnValue, empty);
bool isMaster = msg.masterhost() == conf.endpointHost;

if (isMaster) {
setThreadResultLocally(msg.id(), returnValue);
} else {
SnapshotClient& c = getSnapshotClient(msg.masterhost());
c.pushThreadResult(msg.id(), returnValue);
}
}

void Scheduler::setThreadResult(
void Scheduler::pushSnapshotDiffs(
const faabric::Message& msg,
int32_t returnValue,
const std::vector<faabric::util::SnapshotDiff>& diffs)
{
bool isMaster = msg.masterhost() == conf.endpointHost;

if (isMaster) {
setThreadResultLocally(msg.id(), returnValue);
} else {
if (!isMaster && !diffs.empty()) {
SnapshotClient& c = getSnapshotClient(msg.masterhost());

if (diffs.empty()) {
c.pushThreadResult(msg.id(), returnValue);
} else {
c.pushThreadResult(msg.id(), returnValue, msg.snapshotkey(), diffs);
}
c.pushSnapshotDiffs(msg.snapshotkey(), diffs);
}
}

Expand Down
70 changes: 14 additions & 56 deletions src/snapshot/SnapshotClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,7 @@ static std::vector<

static std::vector<std::pair<std::string, std::string>> snapshotDeletes;

static std::vector<
std::pair<std::string,
std::tuple<uint32_t,
int,
std::string,
std::vector<faabric::util::SnapshotDiff>>>>
static std::vector<std::pair<std::string, std::pair<uint32_t, int>>>
threadResults;

std::vector<std::pair<std::string, faabric::util::SnapshotData>>
Expand All @@ -48,12 +43,7 @@ std::vector<std::pair<std::string, std::string>> getSnapshotDeletes()
return snapshotDeletes;
}

std::vector<std::pair<std::string,
std::tuple<uint32_t,
int,
std::string,
std::vector<faabric::util::SnapshotDiff>>>>
getThreadResults()
std::vector<std::pair<std::string, std::pair<uint32_t, int>>> getThreadResults()
{
return threadResults;
}
Expand All @@ -79,7 +69,12 @@ SnapshotClient::SnapshotClient(const std::string& hostIn)
void SnapshotClient::pushSnapshot(const std::string& key,
const faabric::util::SnapshotData& data)
{
SPDLOG_DEBUG("Pushing snapshot {} to {}", key, host);
if (data.size == 0) {
SPDLOG_ERROR("Cannot push snapshot {} with size zero to {}", key, host);
throw std::runtime_error("Pushing snapshot with zero size");
}

SPDLOG_DEBUG("Pushing snapshot {} to {} ({} bytes)", key, host, data.size);

if (faabric::util::isMockMode()) {
faabric::util::UniqueLock lock(mockMutex);
Expand Down Expand Up @@ -154,57 +149,20 @@ void SnapshotClient::deleteSnapshot(const std::string& key)
}

void SnapshotClient::pushThreadResult(uint32_t messageId, int returnValue)
{
std::vector<faabric::util::SnapshotDiff> empty;
pushThreadResult(messageId, returnValue, "", empty);
}

void SnapshotClient::pushThreadResult(
uint32_t messageId,
int returnValue,
const std::string& snapshotKey,
const std::vector<faabric::util::SnapshotDiff>& diffs)
{
if (faabric::util::isMockMode()) {
faabric::util::UniqueLock lock(mockMutex);
threadResults.emplace_back(std::make_pair(
host, std::make_tuple(messageId, returnValue, snapshotKey, diffs)));
threadResults.emplace_back(
std::make_pair(host, std::make_pair(messageId, returnValue)));

} else {
flatbuffers::FlatBufferBuilder mb;
flatbuffers::Offset<ThreadResultRequest> requestOffset;

if (!diffs.empty()) {
SPDLOG_DEBUG(
"Sending thread result for {} to {} (plus {} snapshot diffs)",
messageId,
host,
diffs.size());

// Create objects for the diffs
std::vector<flatbuffers::Offset<SnapshotDiffChunk>> diffsFbVector;
for (const auto& d : diffs) {
auto dataOffset = mb.CreateVector<uint8_t>(d.data, d.size);
auto chunk = CreateSnapshotDiffChunk(mb, d.offset, dataOffset);
diffsFbVector.push_back(chunk);
}

// Create message with diffs
auto diffsOffset = mb.CreateVector(diffsFbVector);

auto keyOffset = mb.CreateString(snapshotKey);
requestOffset = CreateThreadResultRequest(
mb, messageId, returnValue, keyOffset, diffsOffset);
} else {
SPDLOG_DEBUG(
"Sending thread result for {} to {} (with no snapshot diffs)",
messageId,
host);

// Create message without diffs
requestOffset =
CreateThreadResultRequest(mb, messageId, returnValue);
}
SPDLOG_DEBUG("Sending thread result for {} to {}", messageId, host);

// Create the thread result message (diffs are pushed separately)
requestOffset = CreateThreadResultRequest(mb, messageId, returnValue);

mb.Finish(requestOffset);
SEND_FB_MSG_ASYNC(SnapshotCalls::ThreadResult, mb)
Expand Down
Loading

0 comments on commit 4691f18

Please sign in to comment.