Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix hanging in transport layer #121

Merged
merged 67 commits into from
Jul 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
67 commits
Select commit Hold shift + click to select a range
6860a6d
Increase wait time on flush test
Shillaker Jun 18, 2021
bac7026
Small tidy-up in endpoint code
Shillaker Jun 18, 2021
7a183a1
Add timeout on close and catch-all error handling
Shillaker Jun 18, 2021
4dfc42d
Formatting
Shillaker Jun 18, 2021
1f5f000
Lazy-init global message context and don't shut down
Shillaker Jun 18, 2021
d38dd03
Remove need to close global message context
Shillaker Jun 18, 2021
9cb6b89
Fix up timeout test
Shillaker Jun 18, 2021
db3ad14
Removed catch-all error handling
Shillaker Jun 18, 2021
c746876
Remove message context argument from open and close
Shillaker Jun 21, 2021
b985bc7
Use stlib in Message object
Shillaker Jun 21, 2021
f40e12e
Remove MessageContext object
Shillaker Jun 22, 2021
9ee0e11
Make SocketType a property on the message endpoint
Shillaker Jun 22, 2021
9a3349a
Error checking around use of send and recv sockets
Shillaker Jun 22, 2021
880af93
Remove the need for persist on messages
Shillaker Jun 22, 2021
4b6fd9e
Restore simple class wrapper around context for shutdown
Shillaker Jun 22, 2021
d4e8f41
Close context from main thread
Shillaker Jun 22, 2021
2ee0ef4
Switch to raii sockets
Shillaker Jun 23, 2021
c2af35e
Remove MessageEndpointClient
Shillaker Jun 23, 2021
d038e8e
Explicitly open and close message context
Shillaker Jun 23, 2021
317bf5e
Simplify global context
Shillaker Jun 23, 2021
b3bcc5d
Reinstate timeout test
Shillaker Jun 23, 2021
1daeb8e
Move sendResponse into RecvMessageEndpoint
Shillaker Jun 23, 2021
d4be278
Default reply port
Shillaker Jun 23, 2021
96d2bf3
Add linger
Shillaker Jun 23, 2021
763594a
Self review
Shillaker Jun 23, 2021
a5c57f3
Added global context init/ close where necessary
Shillaker Jun 23, 2021
acfbe48
Formatting
Shillaker Jun 23, 2021
a0df2b0
Merge branch 'master' into hanging
Shillaker Jun 23, 2021
ed1b04b
Remove dummy state server
Shillaker Jun 23, 2021
97a1e38
Remove custom stop methods on snapshot and function call servers
Shillaker Jun 23, 2021
e00978a
Formatting
Shillaker Jun 24, 2021
80754d4
Starting req/rep refactor
Shillaker Jun 24, 2021
bae864e
Req/rep part 2
Shillaker Jun 24, 2021
0fe4ac3
Tests now running (and hanging)
Shillaker Jun 24, 2021
3e79318
Switch to using client class instead of sockets directly
Shillaker Jun 25, 2021
49b1410
Remove unused macros
Shillaker Jun 25, 2021
371091a
Explicitly set sync/async ports
Shillaker Jun 25, 2021
46f9d8d
Split MPI world broadcasting of ranks into separate function
Shillaker Jun 25, 2021
f2e9113
Move send/ recv host ranks to MpiWorld
Shillaker Jun 25, 2021
01ec57d
Clear up MPI worlds in tests, lazy init recv ranks hosts socket
Shillaker Jun 25, 2021
7f617bf
Thread-local sockets for rank-to-host mappings
Shillaker Jun 25, 2021
6c15fcb
Detailed logging of send/recv
Shillaker Jun 25, 2021
1d702f9
Switch local/remote in remote world tests
Shillaker Jun 25, 2021
2d88beb
Clear up all thread-local sockets when destroying MPI world
Shillaker Jun 28, 2021
705a29c
Fix remote world test hanging
Shillaker Jun 28, 2021
6026ffc
Switch to SLEEP_MS macro
Shillaker Jun 28, 2021
b656c49
Formatting
Shillaker Jun 28, 2021
6e089d0
Self review and restart dummy state server on initial error
Shillaker Jun 28, 2021
ec9d58d
Remove unnecessary threads in transport tests
Shillaker Jun 28, 2021
f9d45ba
Lengthen all timeouts
Shillaker Jun 28, 2021
e267134
Retry connecting socket
Shillaker Jun 28, 2021
8bb3888
Avoid arbitrary sleeps in tests
Shillaker Jun 28, 2021
6bebb61
Add timeout on barrier
Shillaker Jun 29, 2021
51fd3d4
Long-lived shutdown endpoints in server
Shillaker Jun 29, 2021
4de6f27
Added latch to async server
Shillaker Jun 29, 2021
9b0cbaf
Guard against null pointers and avoid memcpying
Shillaker Jun 29, 2021
ff00429
Share latch via shared pointer
Shillaker Jun 29, 2021
413ce6e
Typos
Shillaker Jun 29, 2021
4e6dbf2
Link util to transport
Shillaker Jun 29, 2021
b4d8cc7
Move global message context handling out of FaabricMain
Shillaker Jun 29, 2021
7584527
Add retry logic in servers, unify server threads into signle class
Shillaker Jun 30, 2021
19a7d6e
Fix port mixup and remove unused macros
Shillaker Jun 30, 2021
bf6b15c
Remove use thread-local cache in scheduler
Shillaker Jul 1, 2021
b3327e9
Rename retry macro
Shillaker Jul 5, 2021
fd9ad8a
Move latch constructor
Shillaker Jul 5, 2021
1f71e38
Move FB macro
Shillaker Jul 5, 2021
8d7565d
Message server interface to buffers
Shillaker Jul 5, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions examples/server.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <faabric/endpoint/FaabricEndpoint.h>
#include <faabric/runner/FaabricMain.h>
#include <faabric/scheduler/ExecutorFactory.h>
#include <faabric/transport/context.h>
#include <faabric/util/logging.h>

using namespace faabric::scheduler;
Expand Down Expand Up @@ -38,6 +39,7 @@ class ExampleExecutorFactory : public ExecutorFactory
int main()
{
faabric::util::initLogging();
faabric::transport::initGlobalMessageContext();

// Start the worker pool
SPDLOG_INFO("Starting executor pool in the background");
Expand All @@ -53,6 +55,7 @@ int main()

SPDLOG_INFO("Shutting down endpoint");
m.shutdown();
faabric::transport::closeGlobalMessageContext();

return EXIT_SUCCESS;
}
8 changes: 4 additions & 4 deletions include/faabric/scheduler/FunctionCallClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

#include <faabric/proto/faabric.pb.h>
#include <faabric/scheduler/FunctionCallApi.h>
#include <faabric/transport/MessageContext.h>
#include <faabric/transport/MessageEndpoint.h>
#include <faabric/transport/MessageEndpointClient.h>
#include <faabric/util/config.h>

Expand All @@ -13,13 +13,13 @@ namespace faabric::scheduler {
// -----------------------------------
std::vector<std::pair<std::string, faabric::Message>> getFunctionCalls();

std::vector<std::pair<std::string, faabric::ResponseRequest>> getFlushCalls();
std::vector<std::pair<std::string, faabric::EmptyRequest>> getFlushCalls();

std::vector<
std::pair<std::string, std::shared_ptr<faabric::BatchExecuteRequest>>>
getBatchRequests();

std::vector<std::pair<std::string, faabric::ResponseRequest>>
std::vector<std::pair<std::string, faabric::EmptyRequest>>
getResourceRequests();

std::vector<std::pair<std::string, faabric::UnregisterRequest>>
Expand Down Expand Up @@ -47,7 +47,7 @@ class FunctionCallClient : public faabric::transport::MessageEndpointClient
void executeFunctions(
const std::shared_ptr<faabric::BatchExecuteRequest> req);

void unregister(const faabric::UnregisterRequest& req);
void unregister(faabric::UnregisterRequest& req);

private:
void sendHeader(faabric::scheduler::FunctionCalls call);
Expand Down
24 changes: 12 additions & 12 deletions include/faabric/scheduler/FunctionCallServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
#include <faabric/proto/faabric.pb.h>
#include <faabric/scheduler/FunctionCallApi.h>
#include <faabric/scheduler/Scheduler.h>
#include <faabric/transport/MessageEndpointClient.h>
#include <faabric/transport/MessageEndpointServer.h>

namespace faabric::scheduler {
Expand All @@ -13,24 +12,25 @@ class FunctionCallServer final
public:
FunctionCallServer();

void stop() override;

private:
Scheduler& scheduler;

void doRecv(faabric::transport::Message& header,
faabric::transport::Message& body) override;

/* Function call server API */
void doAsyncRecv(int header,
const uint8_t* buffer,
size_t bufferSize) override;

void recvFlush(faabric::transport::Message& body);
std::unique_ptr<google::protobuf::Message>
doSyncRecv(int header, const uint8_t* buffer, size_t bufferSize) override;

void recvExecuteFunctions(faabric::transport::Message& body);
std::unique_ptr<google::protobuf::Message> recvFlush(const uint8_t* buffer,
size_t bufferSize);

void recvGetResources(faabric::transport::Message& body);
std::unique_ptr<google::protobuf::Message> recvGetResources(
const uint8_t* buffer,
size_t bufferSize);

void recvUnregister(faabric::transport::Message& body);
void recvExecuteFunctions(const uint8_t* buffer, size_t bufferSize);

void recvSetThreadResult(faabric::transport::Message& body);
void recvUnregister(const uint8_t* buffer, size_t bufferSize);
};
}
25 changes: 18 additions & 7 deletions include/faabric/scheduler/MpiWorld.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <faabric/util/timing.h>

#include <atomic>
#include <unordered_map>

namespace faabric::scheduler {
typedef faabric::util::Queue<std::shared_ptr<faabric::MPIMessage>>
Expand All @@ -22,12 +23,11 @@ class MpiWorld

void create(const faabric::Message& call, int newId, int newSize);

void initialiseFromMsg(const faabric::Message& msg,
bool forceLocal = false);
void broadcastHostsToRanks();

std::string getHostForRank(int rank);
void initialiseFromMsg(const faabric::Message& msg);

void setAllRankHostsPorts(const faabric::MpiHostsToRanksMessage& msg);
std::string getHostForRank(int rank);

std::string getUser();

Expand Down Expand Up @@ -181,12 +181,12 @@ class MpiWorld
double getWTime();

private:
int id;
int size;
int id = -1;
int size = -1;
std::string thisHost;
int basePort = DEFAULT_MPI_BASE_PORT;
faabric::util::TimePoint creationTime;

std::shared_mutex worldMutex;
std::atomic_flag isDestroyed = false;

std::string user;
Expand All @@ -208,18 +208,29 @@ class MpiWorld
std::vector<int> basePorts;
std::vector<int> initLocalBasePorts(
const std::vector<std::string>& executedAt);

void initRemoteMpiEndpoint(int localRank, int remoteRank);

std::pair<int, int> getPortForRanks(int localRank, int remoteRank);

void sendRemoteMpiMessage(int sendRank,
int recvRank,
const std::shared_ptr<faabric::MPIMessage>& msg);

std::shared_ptr<faabric::MPIMessage> recvRemoteMpiMessage(int sendRank,
int recvRank);

faabric::MpiHostsToRanksMessage recvMpiHostRankMsg();

void sendMpiHostRankMsg(const std::string& hostIn,
const faabric::MpiHostsToRanksMessage msg);

void closeMpiMessageEndpoints();

// Support for asyncrhonous communications
std::shared_ptr<MpiMessageBuffer> getUnackedMessageBuffer(int sendRank,
int recvRank);

std::shared_ptr<faabric::MPIMessage> recvBatchReturnLast(int sendRank,
int recvRank,
int batchSize = 0);
Expand Down
12 changes: 2 additions & 10 deletions include/faabric/scheduler/Scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ class Scheduler

void reset();

void resetThreadLocalCache();

void shutdown();

void broadcastSnapshotDelete(const faabric::Message& msg,
Expand Down Expand Up @@ -168,10 +170,6 @@ class Scheduler

ExecGraph getFunctionExecGraph(unsigned int msgId);

void closeFunctionCallClients();

void closeSnapshotClients();

private:
std::string thisHost;

Expand All @@ -186,15 +184,9 @@ class Scheduler

std::unordered_map<uint32_t, std::promise<int32_t>> threadResults;

std::shared_mutex functionCallClientsMx;
std::unordered_map<std::string, faabric::scheduler::FunctionCallClient>
functionCallClients;
faabric::scheduler::FunctionCallClient& getFunctionCallClient(
const std::string& otherHost);

std::shared_mutex snapshotClientsMx;
std::unordered_map<std::string, faabric::scheduler::SnapshotClient>
snapshotClients;
faabric::scheduler::SnapshotClient& getSnapshotClient(
const std::string& otherHost);

Expand Down
1 change: 1 addition & 0 deletions include/faabric/scheduler/SnapshotClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <faabric/flat/faabric_generated.h>
#include <faabric/scheduler/SnapshotApi.h>
#include <faabric/transport/MessageEndpoint.h>
#include <faabric/transport/MessageEndpointClient.h>
#include <faabric/util/snapshot.h>

Expand Down
22 changes: 13 additions & 9 deletions include/faabric/scheduler/SnapshotServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,25 @@ class SnapshotServer final : public faabric::transport::MessageEndpointServer
public:
SnapshotServer();

void stop() override;

protected:
void doRecv(faabric::transport::Message& header,
faabric::transport::Message& body) override;
void doAsyncRecv(int header,
const uint8_t* buffer,
size_t bufferSize) override;

/* Snapshot server API */
std::unique_ptr<google::protobuf::Message>
doSyncRecv(int header, const uint8_t* buffer, size_t bufferSize) override;

void recvPushSnapshot(faabric::transport::Message& msg);
std::unique_ptr<google::protobuf::Message> recvPushSnapshot(
const uint8_t* buffer,
size_t bufferSize);

void recvDeleteSnapshot(faabric::transport::Message& msg);
std::unique_ptr<google::protobuf::Message> recvPushSnapshotDiffs(
const uint8_t* buffer,
size_t bufferSize);

void recvPushSnapshotDiffs(faabric::transport::Message& msg);
void recvDeleteSnapshot(const uint8_t* buffer, size_t bufferSize);

void recvThreadResult(faabric::transport::Message& msg);
void recvThreadResult(const uint8_t* buffer, size_t bufferSize);

private:
void applyDiffsToSnapshot(
Expand Down
31 changes: 0 additions & 31 deletions include/faabric/state/DummyStateServer.h

This file was deleted.

18 changes: 3 additions & 15 deletions include/faabric/state/StateClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <faabric/proto/faabric.pb.h>
#include <faabric/state/InMemoryStateRegistry.h>
#include <faabric/state/State.h>
#include <faabric/transport/MessageEndpoint.h>
#include <faabric/transport/MessageEndpointClient.h>

namespace faabric::state {
Expand All @@ -15,11 +16,6 @@ class StateClient : public faabric::transport::MessageEndpointClient

const std::string user;
const std::string key;
const std::string host;

InMemoryStateRegistry& reg;

/* External state client API */

void pushChunks(const std::vector<StateChunk>& chunks);

Expand All @@ -41,16 +37,8 @@ class StateClient : public faabric::transport::MessageEndpointClient
void unlock();

private:
void sendHeader(faabric::state::StateCalls call);

// Block, but ignore return value
faabric::transport::Message awaitResponse();

void sendStateRequest(faabric::state::StateCalls header, bool expectReply);

void sendStateRequest(faabric::state::StateCalls header,
const uint8_t* data = nullptr,
int length = 0,
bool expectReply = false);
const uint8_t* data,
int length);
};
}
39 changes: 27 additions & 12 deletions include/faabric/state/StateServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,42 @@ class StateServer final : public faabric::transport::MessageEndpointServer
private:
State& state;

void doRecv(faabric::transport::Message& header,
faabric::transport::Message& body) override;
void doAsyncRecv(int header,
const uint8_t* buffer,
size_t bufferSize) override;

/* State server API */
std::unique_ptr<google::protobuf::Message>
doSyncRecv(int header, const uint8_t* buffer, size_t bufferSize) override;

void recvSize(faabric::transport::Message& body);
// Sync methods

void recvPull(faabric::transport::Message& body);
std::unique_ptr<google::protobuf::Message> recvSize(const uint8_t* buffer,
size_t bufferSize);

void recvPush(faabric::transport::Message& body);
std::unique_ptr<google::protobuf::Message> recvPull(const uint8_t* buffer,
size_t bufferSize);

void recvAppend(faabric::transport::Message& body);
std::unique_ptr<google::protobuf::Message> recvPush(const uint8_t* buffer,
size_t bufferSize);

void recvPullAppended(faabric::transport::Message& body);
std::unique_ptr<google::protobuf::Message> recvAppend(const uint8_t* buffer,
size_t bufferSize);

void recvClearAppended(faabric::transport::Message& body);
std::unique_ptr<google::protobuf::Message> recvPullAppended(
const uint8_t* buffer,
size_t bufferSize);

void recvDelete(faabric::transport::Message& body);
std::unique_ptr<google::protobuf::Message> recvClearAppended(
const uint8_t* buffer,
size_t bufferSize);

void recvLock(faabric::transport::Message& body);
std::unique_ptr<google::protobuf::Message> recvDelete(const uint8_t* buffer,
size_t bufferSize);

void recvUnlock(faabric::transport::Message& body);
std::unique_ptr<google::protobuf::Message> recvLock(const uint8_t* buffer,
size_t bufferSize);

std::unique_ptr<google::protobuf::Message> recvUnlock(const uint8_t* buffer,
size_t bufferSize);
};
}
Loading