Skip to content

Commit

Permalink
Fix hanging in transport layer (#121)
Browse files Browse the repository at this point in the history
* Increase wait time on flush test

* Small tidy-up in endpoint code

* Add timeout on close and catch-all error handling

* Formatting

* Lazy-init global message context and don't shut down

* Remove need to close global message context

* Fix up timeout test

* Removed catch-all error handling

* Remove message context argument from open and close

* Use stlib in Message object

* Remove MessageContext object

* Make SocketType a property on the message endpoint

* Error checking around use of send and recv sockets

* Remove the need for persist on messages

* Restore simple class wrapper around context for shutdown

* Close context from main thread

* Switch to raii sockets

* Remove MessageEndpointClient

* Explicitly open and close message context

* Simplify global context

* Reinstate timeout test

* Move sendResponse into RecvMessageEndpoint

* Default reply port

* Add linger

* Self review

* Added global context init/ close where necessary

* Formatting

* Remove dummy state server

* Remove custom stop methods on snapshot and function call servers

* Formatting

* Starting req/rep refactor

* Req/rep part 2

* Tests now running (and hanging)

* Switch to using client class instead of sockets directly

* Remove unused macros

* Explicitly set sync/async ports

* Split MPI world broadcasting of ranks into separate function

* Move send/ recv host ranks to MpiWorld

* Clear up MPI worlds in tests, lazy init recv ranks hosts socket

* Thread-local sockets for rank-to-host mappings

* Detailed logging of send/recv

* Switch local/remote in remote world tests

* Clear up all thread-local sockets when destroying MPI world

* Fix remote world test hanging

* Switch to SLEEP_MS macro

* Formatting

* Self review and restart dummy state server on initial error

* Remove unnecessary threads in transport tests

* Lengthen all timeouts

* Retry connecting socket

* Avoid arbitrary sleeps in tests

* Add timeout on barrier

* Long-lived shutdown endpoints in server

* Added latch to async server

* Guard against null pointers and avoid memcpying

* Share latch via shared pointer

* Typos

* Link util to transport

* Move global message context handling out of FaabricMain

* Add retry logic in servers, unify server threads into signle class

* Fix port mixup and remove unused macros

* Remove use thread-local cache in scheduler

* Rename retry macro

* Move latch constructor

* Move FB macro

* Message server interface to buffers
  • Loading branch information
Shillaker authored Jul 5, 2021
1 parent 608967e commit 63affca
Show file tree
Hide file tree
Showing 74 changed files with 2,523 additions and 2,526 deletions.
3 changes: 3 additions & 0 deletions examples/server.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <faabric/endpoint/FaabricEndpoint.h>
#include <faabric/runner/FaabricMain.h>
#include <faabric/scheduler/ExecutorFactory.h>
#include <faabric/transport/context.h>
#include <faabric/util/logging.h>

using namespace faabric::scheduler;
Expand Down Expand Up @@ -38,6 +39,7 @@ class ExampleExecutorFactory : public ExecutorFactory
int main()
{
faabric::util::initLogging();
faabric::transport::initGlobalMessageContext();

// Start the worker pool
SPDLOG_INFO("Starting executor pool in the background");
Expand All @@ -53,6 +55,7 @@ int main()

SPDLOG_INFO("Shutting down endpoint");
m.shutdown();
faabric::transport::closeGlobalMessageContext();

return EXIT_SUCCESS;
}
8 changes: 4 additions & 4 deletions include/faabric/scheduler/FunctionCallClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

#include <faabric/proto/faabric.pb.h>
#include <faabric/scheduler/FunctionCallApi.h>
#include <faabric/transport/MessageContext.h>
#include <faabric/transport/MessageEndpoint.h>
#include <faabric/transport/MessageEndpointClient.h>
#include <faabric/util/config.h>

Expand All @@ -13,13 +13,13 @@ namespace faabric::scheduler {
// -----------------------------------
std::vector<std::pair<std::string, faabric::Message>> getFunctionCalls();

std::vector<std::pair<std::string, faabric::ResponseRequest>> getFlushCalls();
std::vector<std::pair<std::string, faabric::EmptyRequest>> getFlushCalls();

std::vector<
std::pair<std::string, std::shared_ptr<faabric::BatchExecuteRequest>>>
getBatchRequests();

std::vector<std::pair<std::string, faabric::ResponseRequest>>
std::vector<std::pair<std::string, faabric::EmptyRequest>>
getResourceRequests();

std::vector<std::pair<std::string, faabric::UnregisterRequest>>
Expand Down Expand Up @@ -47,7 +47,7 @@ class FunctionCallClient : public faabric::transport::MessageEndpointClient
void executeFunctions(
const std::shared_ptr<faabric::BatchExecuteRequest> req);

void unregister(const faabric::UnregisterRequest& req);
void unregister(faabric::UnregisterRequest& req);

private:
void sendHeader(faabric::scheduler::FunctionCalls call);
Expand Down
24 changes: 12 additions & 12 deletions include/faabric/scheduler/FunctionCallServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
#include <faabric/proto/faabric.pb.h>
#include <faabric/scheduler/FunctionCallApi.h>
#include <faabric/scheduler/Scheduler.h>
#include <faabric/transport/MessageEndpointClient.h>
#include <faabric/transport/MessageEndpointServer.h>

namespace faabric::scheduler {
Expand All @@ -13,24 +12,25 @@ class FunctionCallServer final
public:
FunctionCallServer();

void stop() override;

private:
Scheduler& scheduler;

void doRecv(faabric::transport::Message& header,
faabric::transport::Message& body) override;

/* Function call server API */
void doAsyncRecv(int header,
const uint8_t* buffer,
size_t bufferSize) override;

void recvFlush(faabric::transport::Message& body);
std::unique_ptr<google::protobuf::Message>
doSyncRecv(int header, const uint8_t* buffer, size_t bufferSize) override;

void recvExecuteFunctions(faabric::transport::Message& body);
std::unique_ptr<google::protobuf::Message> recvFlush(const uint8_t* buffer,
size_t bufferSize);

void recvGetResources(faabric::transport::Message& body);
std::unique_ptr<google::protobuf::Message> recvGetResources(
const uint8_t* buffer,
size_t bufferSize);

void recvUnregister(faabric::transport::Message& body);
void recvExecuteFunctions(const uint8_t* buffer, size_t bufferSize);

void recvSetThreadResult(faabric::transport::Message& body);
void recvUnregister(const uint8_t* buffer, size_t bufferSize);
};
}
25 changes: 18 additions & 7 deletions include/faabric/scheduler/MpiWorld.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <faabric/util/timing.h>

#include <atomic>
#include <unordered_map>

namespace faabric::scheduler {
typedef faabric::util::Queue<std::shared_ptr<faabric::MPIMessage>>
Expand All @@ -22,12 +23,11 @@ class MpiWorld

void create(const faabric::Message& call, int newId, int newSize);

void initialiseFromMsg(const faabric::Message& msg,
bool forceLocal = false);
void broadcastHostsToRanks();

std::string getHostForRank(int rank);
void initialiseFromMsg(const faabric::Message& msg);

void setAllRankHostsPorts(const faabric::MpiHostsToRanksMessage& msg);
std::string getHostForRank(int rank);

std::string getUser();

Expand Down Expand Up @@ -181,12 +181,12 @@ class MpiWorld
double getWTime();

private:
int id;
int size;
int id = -1;
int size = -1;
std::string thisHost;
int basePort = DEFAULT_MPI_BASE_PORT;
faabric::util::TimePoint creationTime;

std::shared_mutex worldMutex;
std::atomic_flag isDestroyed = false;

std::string user;
Expand All @@ -208,18 +208,29 @@ class MpiWorld
std::vector<int> basePorts;
std::vector<int> initLocalBasePorts(
const std::vector<std::string>& executedAt);

void initRemoteMpiEndpoint(int localRank, int remoteRank);

std::pair<int, int> getPortForRanks(int localRank, int remoteRank);

void sendRemoteMpiMessage(int sendRank,
int recvRank,
const std::shared_ptr<faabric::MPIMessage>& msg);

std::shared_ptr<faabric::MPIMessage> recvRemoteMpiMessage(int sendRank,
int recvRank);

faabric::MpiHostsToRanksMessage recvMpiHostRankMsg();

void sendMpiHostRankMsg(const std::string& hostIn,
const faabric::MpiHostsToRanksMessage msg);

void closeMpiMessageEndpoints();

// Support for asyncrhonous communications
std::shared_ptr<MpiMessageBuffer> getUnackedMessageBuffer(int sendRank,
int recvRank);

std::shared_ptr<faabric::MPIMessage> recvBatchReturnLast(int sendRank,
int recvRank,
int batchSize = 0);
Expand Down
12 changes: 2 additions & 10 deletions include/faabric/scheduler/Scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ class Scheduler

void reset();

void resetThreadLocalCache();

void shutdown();

void broadcastSnapshotDelete(const faabric::Message& msg,
Expand Down Expand Up @@ -168,10 +170,6 @@ class Scheduler

ExecGraph getFunctionExecGraph(unsigned int msgId);

void closeFunctionCallClients();

void closeSnapshotClients();

private:
std::string thisHost;

Expand All @@ -186,15 +184,9 @@ class Scheduler

std::unordered_map<uint32_t, std::promise<int32_t>> threadResults;

std::shared_mutex functionCallClientsMx;
std::unordered_map<std::string, faabric::scheduler::FunctionCallClient>
functionCallClients;
faabric::scheduler::FunctionCallClient& getFunctionCallClient(
const std::string& otherHost);

std::shared_mutex snapshotClientsMx;
std::unordered_map<std::string, faabric::scheduler::SnapshotClient>
snapshotClients;
faabric::scheduler::SnapshotClient& getSnapshotClient(
const std::string& otherHost);

Expand Down
1 change: 1 addition & 0 deletions include/faabric/scheduler/SnapshotClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <faabric/flat/faabric_generated.h>
#include <faabric/scheduler/SnapshotApi.h>
#include <faabric/transport/MessageEndpoint.h>
#include <faabric/transport/MessageEndpointClient.h>
#include <faabric/util/snapshot.h>

Expand Down
22 changes: 13 additions & 9 deletions include/faabric/scheduler/SnapshotServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,25 @@ class SnapshotServer final : public faabric::transport::MessageEndpointServer
public:
SnapshotServer();

void stop() override;

protected:
void doRecv(faabric::transport::Message& header,
faabric::transport::Message& body) override;
void doAsyncRecv(int header,
const uint8_t* buffer,
size_t bufferSize) override;

/* Snapshot server API */
std::unique_ptr<google::protobuf::Message>
doSyncRecv(int header, const uint8_t* buffer, size_t bufferSize) override;

void recvPushSnapshot(faabric::transport::Message& msg);
std::unique_ptr<google::protobuf::Message> recvPushSnapshot(
const uint8_t* buffer,
size_t bufferSize);

void recvDeleteSnapshot(faabric::transport::Message& msg);
std::unique_ptr<google::protobuf::Message> recvPushSnapshotDiffs(
const uint8_t* buffer,
size_t bufferSize);

void recvPushSnapshotDiffs(faabric::transport::Message& msg);
void recvDeleteSnapshot(const uint8_t* buffer, size_t bufferSize);

void recvThreadResult(faabric::transport::Message& msg);
void recvThreadResult(const uint8_t* buffer, size_t bufferSize);

private:
void applyDiffsToSnapshot(
Expand Down
31 changes: 0 additions & 31 deletions include/faabric/state/DummyStateServer.h

This file was deleted.

18 changes: 3 additions & 15 deletions include/faabric/state/StateClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <faabric/proto/faabric.pb.h>
#include <faabric/state/InMemoryStateRegistry.h>
#include <faabric/state/State.h>
#include <faabric/transport/MessageEndpoint.h>
#include <faabric/transport/MessageEndpointClient.h>

namespace faabric::state {
Expand All @@ -15,11 +16,6 @@ class StateClient : public faabric::transport::MessageEndpointClient

const std::string user;
const std::string key;
const std::string host;

InMemoryStateRegistry& reg;

/* External state client API */

void pushChunks(const std::vector<StateChunk>& chunks);

Expand All @@ -41,16 +37,8 @@ class StateClient : public faabric::transport::MessageEndpointClient
void unlock();

private:
void sendHeader(faabric::state::StateCalls call);

// Block, but ignore return value
faabric::transport::Message awaitResponse();

void sendStateRequest(faabric::state::StateCalls header, bool expectReply);

void sendStateRequest(faabric::state::StateCalls header,
const uint8_t* data = nullptr,
int length = 0,
bool expectReply = false);
const uint8_t* data,
int length);
};
}
39 changes: 27 additions & 12 deletions include/faabric/state/StateServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,42 @@ class StateServer final : public faabric::transport::MessageEndpointServer
private:
State& state;

void doRecv(faabric::transport::Message& header,
faabric::transport::Message& body) override;
void doAsyncRecv(int header,
const uint8_t* buffer,
size_t bufferSize) override;

/* State server API */
std::unique_ptr<google::protobuf::Message>
doSyncRecv(int header, const uint8_t* buffer, size_t bufferSize) override;

void recvSize(faabric::transport::Message& body);
// Sync methods

void recvPull(faabric::transport::Message& body);
std::unique_ptr<google::protobuf::Message> recvSize(const uint8_t* buffer,
size_t bufferSize);

void recvPush(faabric::transport::Message& body);
std::unique_ptr<google::protobuf::Message> recvPull(const uint8_t* buffer,
size_t bufferSize);

void recvAppend(faabric::transport::Message& body);
std::unique_ptr<google::protobuf::Message> recvPush(const uint8_t* buffer,
size_t bufferSize);

void recvPullAppended(faabric::transport::Message& body);
std::unique_ptr<google::protobuf::Message> recvAppend(const uint8_t* buffer,
size_t bufferSize);

void recvClearAppended(faabric::transport::Message& body);
std::unique_ptr<google::protobuf::Message> recvPullAppended(
const uint8_t* buffer,
size_t bufferSize);

void recvDelete(faabric::transport::Message& body);
std::unique_ptr<google::protobuf::Message> recvClearAppended(
const uint8_t* buffer,
size_t bufferSize);

void recvLock(faabric::transport::Message& body);
std::unique_ptr<google::protobuf::Message> recvDelete(const uint8_t* buffer,
size_t bufferSize);

void recvUnlock(faabric::transport::Message& body);
std::unique_ptr<google::protobuf::Message> recvLock(const uint8_t* buffer,
size_t bufferSize);

std::unique_ptr<google::protobuf::Message> recvUnlock(const uint8_t* buffer,
size_t bufferSize);
};
}
Loading

0 comments on commit 63affca

Please sign in to comment.