Skip to content

Commit

Permalink
Add distributed coordination operations (#161)
Browse files Browse the repository at this point in the history
* Add distributed coordination operations

* Started renaming

* Merge distributed coordination into ptp broker

* Continuing fallout

* Continuing refactor

* More test fixes

* Fix normal tests

* Fixing up dist tests

* Tidy-up

* Improved logging

* Fix force-local issue

* Debugging barrier test

* Fix distributed tests

* Scheduler test

* Fix test for scheduler dispatching mappings

* Add notify distributed test

* Formatting

* Review comments

* Lengthen message endpoint server timeout

* Switch back to []

* Add extra request latch barrier
  • Loading branch information
Shillaker authored Oct 29, 2021
1 parent 8a70efd commit dc3d150
Show file tree
Hide file tree
Showing 42 changed files with 1,854 additions and 299 deletions.
18 changes: 11 additions & 7 deletions dist-test/dev_server.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,18 @@ PROJ_ROOT=${THIS_DIR}/..
pushd ${PROJ_ROOT} > /dev/null

if [[ -z "$1" ]]; then
docker-compose \
up \
-d \
dist-test-server
docker-compose up -d dist-test-server
elif [[ "$1" == "restart" ]]; then
docker-compose restart dist-test-server
elif [[ "$1" == "stop" ]]; then
docker-compose stop dist-test-server
else
docker-compose \
restart \
dist-test-server
echo "Unrecognised argument: $1"
echo ""
echo "Usage:"
echo ""
echo "./dist-test/dev_server.sh [restart|stop]"
exit 1
fi

popd > /dev/null
6 changes: 5 additions & 1 deletion docs/development.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,14 @@ inv dev.cc faabric_dist_tests
inv dev.cc faabric_dist_test_server
```

In another terminal, start the server:
In another terminal, (re)start the server:

```bash
# Start
./dist-tests/dev_server.sh

# Restart
./dist-tests/dev_server.sh restart
```

Back in the CLI, you can then run the tests:
Expand Down
3 changes: 1 addition & 2 deletions include/faabric/scheduler/FunctionCallApi.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ enum FunctionCalls
ExecuteFunctions = 1,
Flush = 2,
Unregister = 3,
GetResources = 4,
SetThreadResult = 5,
GetResources = 4
};
}
5 changes: 1 addition & 4 deletions include/faabric/scheduler/FunctionCallClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,11 @@ class FunctionCallClient : public faabric::transport::MessageEndpointClient
public:
explicit FunctionCallClient(const std::string& hostIn);

/* Function call client external API */

void sendFlush();

faabric::HostResources getResources();

void executeFunctions(
const std::shared_ptr<faabric::BatchExecuteRequest> req);
void executeFunctions(std::shared_ptr<faabric::BatchExecuteRequest> req);

void unregister(faabric::UnregisterRequest& req);

Expand Down
51 changes: 32 additions & 19 deletions include/faabric/scheduler/Scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <faabric/scheduler/FunctionCallClient.h>
#include <faabric/scheduler/InMemoryMessageQueue.h>
#include <faabric/snapshot/SnapshotClient.h>
#include <faabric/transport/PointToPointBroker.h>
#include <faabric/util/config.h>
#include <faabric/util/func.h>
#include <faabric/util/queue.h>
Expand Down Expand Up @@ -188,55 +189,67 @@ class Scheduler

faabric::util::SystemConfig& conf;

std::shared_mutex mx;

// ---- Executors ----
std::vector<std::shared_ptr<Executor>> deadExecutors;

std::unordered_map<std::string, std::vector<std::shared_ptr<Executor>>>
executors;

std::shared_mutex mx;

// ---- Threads ----
std::unordered_map<uint32_t, std::promise<int32_t>> threadResults;

std::unordered_map<uint32_t,
std::promise<std::unique_ptr<faabric::Message>>>
localResults;

std::mutex localResultsMutex;

// ---- Clients ----
faabric::scheduler::FunctionCallClient& getFunctionCallClient(
const std::string& otherHost);

faabric::snapshot::SnapshotClient& getSnapshotClient(
const std::string& otherHost);

// ---- Host resources and hosts ----
faabric::HostResources thisHostResources;
std::atomic<int32_t> thisHostUsedSlots;
std::set<std::string> availableHostsCache;
std::unordered_map<std::string, std::set<std::string>> registeredHosts;

std::unordered_map<uint32_t,
std::promise<std::unique_ptr<faabric::Message>>>
localResults;
std::mutex localResultsMutex;
void updateHostResources();

std::vector<faabric::Message> recordedMessagesAll;
std::vector<faabric::Message> recordedMessagesLocal;
std::vector<std::pair<std::string, faabric::Message>>
recordedMessagesShared;
faabric::HostResources getHostResources(const std::string& host);

std::vector<std::string> getUnregisteredHosts(const std::string& funcStr,
bool noCache = false);
// ---- Actual scheduling ----
std::set<std::string> availableHostsCache;

std::unordered_map<std::string, std::set<std::string>> registeredHosts;

std::shared_ptr<Executor> claimExecutor(
faabric::Message& msg,
faabric::util::FullLock& schedulerLock);

faabric::HostResources getHostResources(const std::string& host);

ExecGraphNode getFunctionExecGraphNode(unsigned int msgId);

void updateHostResources();
std::vector<std::string> getUnregisteredHosts(const std::string& funcStr,
bool noCache = false);

int scheduleFunctionsOnHost(
const std::string& host,
std::shared_ptr<faabric::BatchExecuteRequest> req,
faabric::util::SchedulingDecision& decision,
int offset,
faabric::util::SnapshotData* snapshot);

// ---- Accounting and debugging ----
std::vector<faabric::Message> recordedMessagesAll;
std::vector<faabric::Message> recordedMessagesLocal;
std::vector<std::pair<std::string, faabric::Message>>
recordedMessagesShared;

ExecGraphNode getFunctionExecGraphNode(unsigned int msgId);

// ---- Point-to-point ----
faabric::transport::PointToPointBroker& broker;
};

}
6 changes: 3 additions & 3 deletions include/faabric/snapshot/SnapshotClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,20 @@ getThreadResults();
void clearMockSnapshotRequests();

// -----------------------------------
// gRPC client
// Client
// -----------------------------------

class SnapshotClient final : public faabric::transport::MessageEndpointClient
{
public:
explicit SnapshotClient(const std::string& hostIn);

/* Snapshot client external API */

void pushSnapshot(const std::string& key,
int32_t groupId,
const faabric::util::SnapshotData& data);

void pushSnapshotDiffs(std::string snapshotKey,
int32_t groupId,
std::vector<faabric::util::SnapshotDiff> diffs);

void deleteSnapshot(const std::string& key);
Expand Down
4 changes: 4 additions & 0 deletions include/faabric/snapshot/SnapshotServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <faabric/scheduler/Scheduler.h>
#include <faabric/snapshot/SnapshotApi.h>
#include <faabric/transport/MessageEndpointServer.h>
#include <faabric/transport/PointToPointBroker.h>

namespace faabric::snapshot {
class SnapshotServer final : public faabric::transport::MessageEndpointServer
Expand All @@ -30,5 +31,8 @@ class SnapshotServer final : public faabric::transport::MessageEndpointServer
void recvDeleteSnapshot(const uint8_t* buffer, size_t bufferSize);

void recvThreadResult(const uint8_t* buffer, size_t bufferSize);

private:
faabric::transport::PointToPointBroker& broker;
};
}
7 changes: 4 additions & 3 deletions include/faabric/transport/MessageEndpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@
#define ANY_HOST "0.0.0.0"

// These timeouts should be long enough to permit sending and receiving large
// messages, but short enough not to hang around when something has gone wrong.
#define DEFAULT_RECV_TIMEOUT_MS 20000
#define DEFAULT_SEND_TIMEOUT_MS 20000
// messages, note that they also determine the period on which endpoints will
// re-poll.
#define DEFAULT_RECV_TIMEOUT_MS 60000
#define DEFAULT_SEND_TIMEOUT_MS 60000

// How long undelivered messages will hang around when the socket is closed,
// which also determines how long the context will hang for when closing if
Expand Down
96 changes: 83 additions & 13 deletions include/faabric/transport/PointToPointBroker.h
Original file line number Diff line number Diff line change
@@ -1,40 +1,112 @@
#pragma once

#include <faabric/scheduler/Scheduler.h>
#include <faabric/transport/PointToPointClient.h>
#include <faabric/util/config.h>
#include <faabric/util/scheduling.h>

#include <condition_variable>
#include <queue>
#include <set>
#include <shared_mutex>
#include <stack>
#include <string>
#include <unordered_map>
#include <vector>

#define DEFAULT_DISTRIBUTED_TIMEOUT_MS 30000

#define POINT_TO_POINT_MASTER_IDX 0

namespace faabric::transport {

class PointToPointBroker;

class PointToPointGroup
{
public:
static std::shared_ptr<PointToPointGroup> getGroup(int groupId);

static bool groupExists(int groupId);

static void addGroup(int appId, int groupId, int groupSize);

static void clear();

PointToPointGroup(int appId, int groupIdIn, int groupSizeIn);

void lock(int groupIdx, bool recursive);

void unlock(int groupIdx, bool recursive);

int getLockOwner(bool recursive);

void localLock();

void localUnlock();

bool localTryLock();

void barrier(int groupIdx);

void notify(int groupIdx);

int getNotifyCount();

private:
faabric::util::SystemConfig& conf;

int timeoutMs = DEFAULT_DISTRIBUTED_TIMEOUT_MS;

std::string masterHost;
int appId = 0;
int groupId = 0;
int groupSize = 0;

std::mutex mx;

// Transport
faabric::transport::PointToPointBroker& ptpBroker;

// Local lock
std::timed_mutex localMx;
std::recursive_timed_mutex localRecursiveMx;

// Distributed lock
std::stack<int> recursiveLockOwners;
int lockOwnerIdx = -1;
std::queue<int> lockWaiters;

void notifyLocked(int groupIdx);

void masterLock(int groupIdx, bool recursive);

void masterUnlock(int groupIdx, bool recursive);
};

class PointToPointBroker
{
public:
PointToPointBroker();

std::string getHostForReceiver(int appId, int recvIdx);
std::string getHostForReceiver(int groupId, int recvIdx);

std::set<std::string> setUpLocalMappingsFromSchedulingDecision(
const faabric::util::SchedulingDecision& decision);

void setAndSendMappingsFromSchedulingDecision(
const faabric::util::SchedulingDecision& decision);

void waitForMappingsOnThisHost(int appId);
void waitForMappingsOnThisHost(int groupId);

std::set<int> getIdxsRegisteredForApp(int appId);
std::set<int> getIdxsRegisteredForGroup(int groupId);

void sendMessage(int appId,
void sendMessage(int groupId,
int sendIdx,
int recvIdx,
const uint8_t* buffer,
size_t bufferSize);

std::vector<uint8_t> recvMessage(int appId, int sendIdx, int recvIdx);
std::vector<uint8_t> recvMessage(int groupId, int sendIdx, int recvIdx);

void clear();

Expand All @@ -43,16 +115,14 @@ class PointToPointBroker
private:
std::shared_mutex brokerMutex;

std::unordered_map<int, std::set<int>> appIdxs;
std::unordered_map<int, std::set<int>> groupIdIdxsMap;
std::unordered_map<std::string, std::string> mappings;

std::unordered_map<int, bool> appMappingsFlags;
std::unordered_map<int, std::mutex> appMappingMutexes;
std::unordered_map<int, std::condition_variable> appMappingCvs;

std::shared_ptr<PointToPointClient> getClient(const std::string& host);
std::unordered_map<int, bool> groupMappingsFlags;
std::unordered_map<int, std::mutex> groupMappingMutexes;
std::unordered_map<int, std::condition_variable> groupMappingCvs;

faabric::scheduler::Scheduler& sch;
faabric::util::SystemConfig& conf;
};

PointToPointBroker& getPointToPointBroker();
Expand Down
6 changes: 5 additions & 1 deletion include/faabric/transport/PointToPointCall.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ namespace faabric::transport {
enum PointToPointCall
{
MAPPING = 0,
MESSAGE = 1
MESSAGE = 1,
LOCK_GROUP = 2,
LOCK_GROUP_RECURSIVE = 3,
UNLOCK_GROUP = 4,
UNLOCK_GROUP_RECURSIVE = 5,
};
}
Loading

0 comments on commit dc3d150

Please sign in to comment.