Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix hanging in transport layer #121

Merged
merged 67 commits into from
Jul 5, 2021
Merged
Show file tree
Hide file tree
Changes from 57 commits
Commits
Show all changes
67 commits
Select commit Hold shift + click to select a range
6860a6d
Increase wait time on flush test
Shillaker Jun 18, 2021
bac7026
Small tidy-up in endpoint code
Shillaker Jun 18, 2021
7a183a1
Add timeout on close and catch-all error handling
Shillaker Jun 18, 2021
4dfc42d
Formatting
Shillaker Jun 18, 2021
1f5f000
Lazy-init global message context and don't shut down
Shillaker Jun 18, 2021
d38dd03
Remove need to close global message context
Shillaker Jun 18, 2021
9cb6b89
Fix up timeout test
Shillaker Jun 18, 2021
db3ad14
Removed catch-all error handling
Shillaker Jun 18, 2021
c746876
Remove message context argument from open and close
Shillaker Jun 21, 2021
b985bc7
Use stlib in Message object
Shillaker Jun 21, 2021
f40e12e
Remove MessageContext object
Shillaker Jun 22, 2021
9ee0e11
Make SocketType a property on the message endpoint
Shillaker Jun 22, 2021
9a3349a
Error checking around use of send and recv sockets
Shillaker Jun 22, 2021
880af93
Remove the need for persist on messages
Shillaker Jun 22, 2021
4b6fd9e
Restore simple class wrapper around context for shutdown
Shillaker Jun 22, 2021
d4e8f41
Close context from main thread
Shillaker Jun 22, 2021
2ee0ef4
Switch to raii sockets
Shillaker Jun 23, 2021
c2af35e
Remove MessageEndpointClient
Shillaker Jun 23, 2021
d038e8e
Explicitly open and close message context
Shillaker Jun 23, 2021
317bf5e
Simplify global context
Shillaker Jun 23, 2021
b3bcc5d
Reinstate timeout test
Shillaker Jun 23, 2021
1daeb8e
Move sendResponse into RecvMessageEndpoint
Shillaker Jun 23, 2021
d4be278
Default reply port
Shillaker Jun 23, 2021
96d2bf3
Add linger
Shillaker Jun 23, 2021
763594a
Self review
Shillaker Jun 23, 2021
a5c57f3
Added global context init/ close where necessary
Shillaker Jun 23, 2021
acfbe48
Formatting
Shillaker Jun 23, 2021
a0df2b0
Merge branch 'master' into hanging
Shillaker Jun 23, 2021
ed1b04b
Remove dummy state server
Shillaker Jun 23, 2021
97a1e38
Remove custom stop methods on snapshot and function call servers
Shillaker Jun 23, 2021
e00978a
Formatting
Shillaker Jun 24, 2021
80754d4
Starting req/rep refactor
Shillaker Jun 24, 2021
bae864e
Req/rep part 2
Shillaker Jun 24, 2021
0fe4ac3
Tests now running (and hanging)
Shillaker Jun 24, 2021
3e79318
Switch to using client class instead of sockets directly
Shillaker Jun 25, 2021
49b1410
Remove unused macros
Shillaker Jun 25, 2021
371091a
Explicitly set sync/async ports
Shillaker Jun 25, 2021
46f9d8d
Split MPI world broadcasting of ranks into separate function
Shillaker Jun 25, 2021
f2e9113
Move send/ recv host ranks to MpiWorld
Shillaker Jun 25, 2021
01ec57d
Clear up MPI worlds in tests, lazy init recv ranks hosts socket
Shillaker Jun 25, 2021
7f617bf
Thread-local sockets for rank-to-host mappings
Shillaker Jun 25, 2021
6c15fcb
Detailed logging of send/recv
Shillaker Jun 25, 2021
1d702f9
Switch local/remote in remote world tests
Shillaker Jun 25, 2021
2d88beb
Clear up all thread-local sockets when destroying MPI world
Shillaker Jun 28, 2021
705a29c
Fix remote world test hanging
Shillaker Jun 28, 2021
6026ffc
Switch to SLEEP_MS macro
Shillaker Jun 28, 2021
b656c49
Formatting
Shillaker Jun 28, 2021
6e089d0
Self review and restart dummy state server on initial error
Shillaker Jun 28, 2021
ec9d58d
Remove unnecessary threads in transport tests
Shillaker Jun 28, 2021
f9d45ba
Lengthen all timeouts
Shillaker Jun 28, 2021
e267134
Retry connecting socket
Shillaker Jun 28, 2021
8bb3888
Avoid arbitrary sleeps in tests
Shillaker Jun 28, 2021
6bebb61
Add timeout on barrier
Shillaker Jun 29, 2021
51fd3d4
Long-lived shutdown endpoints in server
Shillaker Jun 29, 2021
4de6f27
Added latch to async server
Shillaker Jun 29, 2021
9b0cbaf
Guard against null pointers and avoid memcpying
Shillaker Jun 29, 2021
ff00429
Share latch via shared pointer
Shillaker Jun 29, 2021
413ce6e
Typos
Shillaker Jun 29, 2021
4e6dbf2
Link util to transport
Shillaker Jun 29, 2021
b4d8cc7
Move global message context handling out of FaabricMain
Shillaker Jun 29, 2021
7584527
Add retry logic in servers, unify server threads into signle class
Shillaker Jun 30, 2021
19a7d6e
Fix port mixup and remove unused macros
Shillaker Jun 30, 2021
bf6b15c
Remove use thread-local cache in scheduler
Shillaker Jul 1, 2021
b3327e9
Rename retry macro
Shillaker Jul 5, 2021
fd9ad8a
Move latch constructor
Shillaker Jul 5, 2021
1f71e38
Move FB macro
Shillaker Jul 5, 2021
8d7565d
Message server interface to buffers
Shillaker Jul 5, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions include/faabric/scheduler/FunctionCallClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

#include <faabric/proto/faabric.pb.h>
#include <faabric/scheduler/FunctionCallApi.h>
#include <faabric/transport/MessageContext.h>
#include <faabric/transport/MessageEndpoint.h>
#include <faabric/transport/MessageEndpointClient.h>
#include <faabric/util/config.h>

Expand All @@ -13,13 +13,13 @@ namespace faabric::scheduler {
// -----------------------------------
std::vector<std::pair<std::string, faabric::Message>> getFunctionCalls();

std::vector<std::pair<std::string, faabric::ResponseRequest>> getFlushCalls();
std::vector<std::pair<std::string, faabric::EmptyRequest>> getFlushCalls();

std::vector<
std::pair<std::string, std::shared_ptr<faabric::BatchExecuteRequest>>>
getBatchRequests();

std::vector<std::pair<std::string, faabric::ResponseRequest>>
std::vector<std::pair<std::string, faabric::EmptyRequest>>
getResourceRequests();

std::vector<std::pair<std::string, faabric::UnregisterRequest>>
Expand Down Expand Up @@ -47,7 +47,7 @@ class FunctionCallClient : public faabric::transport::MessageEndpointClient
void executeFunctions(
const std::shared_ptr<faabric::BatchExecuteRequest> req);

void unregister(const faabric::UnregisterRequest& req);
void unregister(faabric::UnregisterRequest& req);

private:
void sendHeader(faabric::scheduler::FunctionCalls call);
Expand Down
21 changes: 10 additions & 11 deletions include/faabric/scheduler/FunctionCallServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
#include <faabric/proto/faabric.pb.h>
#include <faabric/scheduler/FunctionCallApi.h>
#include <faabric/scheduler/Scheduler.h>
#include <faabric/transport/MessageEndpointClient.h>
#include <faabric/transport/MessageEndpointServer.h>

namespace faabric::scheduler {
Expand All @@ -13,24 +12,24 @@ class FunctionCallServer final
public:
FunctionCallServer();

void stop() override;

private:
Scheduler& scheduler;

void doRecv(faabric::transport::Message& header,
faabric::transport::Message& body) override;
void doAsyncRecv(faabric::transport::Message& header,
faabric::transport::Message& body) override;

/* Function call server API */
std::unique_ptr<google::protobuf::Message> doSyncRecv(
faabric::transport::Message& header,
faabric::transport::Message& body) override;

void recvFlush(faabric::transport::Message& body);
std::unique_ptr<google::protobuf::Message> recvFlush(
faabric::transport::Message& body);

void recvExecuteFunctions(faabric::transport::Message& body);
std::unique_ptr<google::protobuf::Message> recvGetResources(
faabric::transport::Message& body);

void recvGetResources(faabric::transport::Message& body);
void recvExecuteFunctions(faabric::transport::Message& body);

void recvUnregister(faabric::transport::Message& body);

void recvSetThreadResult(faabric::transport::Message& body);
};
}
25 changes: 18 additions & 7 deletions include/faabric/scheduler/MpiWorld.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <faabric/util/timing.h>

#include <atomic>
#include <unordered_map>

namespace faabric::scheduler {
typedef faabric::util::Queue<std::shared_ptr<faabric::MPIMessage>>
Expand All @@ -22,12 +23,11 @@ class MpiWorld

void create(const faabric::Message& call, int newId, int newSize);

void initialiseFromMsg(const faabric::Message& msg,
bool forceLocal = false);
void broadcastHostsToRanks();

std::string getHostForRank(int rank);
void initialiseFromMsg(const faabric::Message& msg);

void setAllRankHostsPorts(const faabric::MpiHostsToRanksMessage& msg);
std::string getHostForRank(int rank);

std::string getUser();

Expand Down Expand Up @@ -181,12 +181,12 @@ class MpiWorld
double getWTime();

private:
int id;
int size;
int id = -1;
int size = -1;
std::string thisHost;
int basePort = DEFAULT_MPI_BASE_PORT;
faabric::util::TimePoint creationTime;

std::shared_mutex worldMutex;
std::atomic_flag isDestroyed = false;

std::string user;
Expand All @@ -208,18 +208,29 @@ class MpiWorld
std::vector<int> basePorts;
std::vector<int> initLocalBasePorts(
const std::vector<std::string>& executedAt);

void initRemoteMpiEndpoint(int localRank, int remoteRank);

std::pair<int, int> getPortForRanks(int localRank, int remoteRank);

void sendRemoteMpiMessage(int sendRank,
int recvRank,
const std::shared_ptr<faabric::MPIMessage>& msg);

std::shared_ptr<faabric::MPIMessage> recvRemoteMpiMessage(int sendRank,
int recvRank);

faabric::MpiHostsToRanksMessage recvMpiHostRankMsg();

void sendMpiHostRankMsg(const std::string& hostIn,
const faabric::MpiHostsToRanksMessage msg);

void closeMpiMessageEndpoints();

// Support for asyncrhonous communications
std::shared_ptr<MpiMessageBuffer> getUnackedMessageBuffer(int sendRank,
int recvRank);

std::shared_ptr<faabric::MPIMessage> recvBatchReturnLast(int sendRank,
int recvRank,
int batchSize = 0);
Expand Down
4 changes: 0 additions & 4 deletions include/faabric/scheduler/Scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,6 @@ class Scheduler

ExecGraph getFunctionExecGraph(unsigned int msgId);

void closeFunctionCallClients();

void closeSnapshotClients();

private:
std::string thisHost;

Expand Down
1 change: 1 addition & 0 deletions include/faabric/scheduler/SnapshotClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <faabric/flat/faabric_generated.h>
#include <faabric/scheduler/SnapshotApi.h>
#include <faabric/transport/MessageEndpoint.h>
#include <faabric/transport/MessageEndpointClient.h>
#include <faabric/util/snapshot.h>

Expand Down
18 changes: 10 additions & 8 deletions include/faabric/scheduler/SnapshotServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,21 @@ class SnapshotServer final : public faabric::transport::MessageEndpointServer
public:
SnapshotServer();

void stop() override;

protected:
void doRecv(faabric::transport::Message& header,
faabric::transport::Message& body) override;
void doAsyncRecv(faabric::transport::Message& header,
faabric::transport::Message& body) override;

/* Snapshot server API */
std::unique_ptr<google::protobuf::Message> doSyncRecv(
faabric::transport::Message& header,
faabric::transport::Message& body) override;

void recvPushSnapshot(faabric::transport::Message& msg);
std::unique_ptr<google::protobuf::Message> recvPushSnapshot(
faabric::transport::Message& msg);

void recvDeleteSnapshot(faabric::transport::Message& msg);
std::unique_ptr<google::protobuf::Message> recvPushSnapshotDiffs(
faabric::transport::Message& msg);

void recvPushSnapshotDiffs(faabric::transport::Message& msg);
void recvDeleteSnapshot(faabric::transport::Message& msg);

void recvThreadResult(faabric::transport::Message& msg);

Expand Down
31 changes: 0 additions & 31 deletions include/faabric/state/DummyStateServer.h

This file was deleted.

18 changes: 3 additions & 15 deletions include/faabric/state/StateClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <faabric/proto/faabric.pb.h>
#include <faabric/state/InMemoryStateRegistry.h>
#include <faabric/state/State.h>
#include <faabric/transport/MessageEndpoint.h>
#include <faabric/transport/MessageEndpointClient.h>

namespace faabric::state {
Expand All @@ -15,11 +16,6 @@ class StateClient : public faabric::transport::MessageEndpointClient

const std::string user;
const std::string key;
const std::string host;

InMemoryStateRegistry& reg;

/* External state client API */

void pushChunks(const std::vector<StateChunk>& chunks);

Expand All @@ -41,16 +37,8 @@ class StateClient : public faabric::transport::MessageEndpointClient
void unlock();

private:
void sendHeader(faabric::state::StateCalls call);

// Block, but ignore return value
faabric::transport::Message awaitResponse();

void sendStateRequest(faabric::state::StateCalls header, bool expectReply);

void sendStateRequest(faabric::state::StateCalls header,
const uint8_t* data = nullptr,
int length = 0,
bool expectReply = false);
const uint8_t* data,
int length);
};
}
37 changes: 25 additions & 12 deletions include/faabric/state/StateServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,40 @@ class StateServer final : public faabric::transport::MessageEndpointServer
private:
State& state;

void doRecv(faabric::transport::Message& header,
faabric::transport::Message& body) override;
void doAsyncRecv(faabric::transport::Message& header,
faabric::transport::Message& body) override;

/* State server API */
std::unique_ptr<google::protobuf::Message> doSyncRecv(
faabric::transport::Message& header,
faabric::transport::Message& body) override;

void recvSize(faabric::transport::Message& body);
// Sync methods

void recvPull(faabric::transport::Message& body);
std::unique_ptr<google::protobuf::Message> recvSize(
faabric::transport::Message& body);

void recvPush(faabric::transport::Message& body);
std::unique_ptr<google::protobuf::Message> recvPull(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It feels strange that sometimes we use our own Message wrapper, and other times' protobuf's.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that's a good point, I'll revisit this and see if I can standardise it a bit. I'd like the rule to be: our Message wrapper inside the transport layer, and protobuf/ flatbuffers everywhere else.

faabric::transport::Message& body);

void recvAppend(faabric::transport::Message& body);
std::unique_ptr<google::protobuf::Message> recvPush(
faabric::transport::Message& body);

void recvPullAppended(faabric::transport::Message& body);
std::unique_ptr<google::protobuf::Message> recvAppend(
faabric::transport::Message& body);

void recvClearAppended(faabric::transport::Message& body);
std::unique_ptr<google::protobuf::Message> recvPullAppended(
faabric::transport::Message& body);

void recvDelete(faabric::transport::Message& body);
std::unique_ptr<google::protobuf::Message> recvClearAppended(
faabric::transport::Message& body);

void recvLock(faabric::transport::Message& body);
std::unique_ptr<google::protobuf::Message> recvDelete(
faabric::transport::Message& body);

void recvUnlock(faabric::transport::Message& body);
std::unique_ptr<google::protobuf::Message> recvLock(
faabric::transport::Message& body);

std::unique_ptr<google::protobuf::Message> recvUnlock(
faabric::transport::Message& body);
};
}
11 changes: 4 additions & 7 deletions include/faabric/transport/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,19 @@ class Message

Message();

~Message();

char* data();

uint8_t* udata();

std::vector<uint8_t> dataCopy();
csegarragonz marked this conversation as resolved.
Show resolved Hide resolved

int size();

bool more();

void persist();

private:
uint8_t* msg;
int _size;
std::vector<uint8_t> bytes;

bool _more;
bool _persist;
};
}
40 changes: 0 additions & 40 deletions include/faabric/transport/MessageContext.h

This file was deleted.

Loading