Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix hanging in transport layer #121

Merged
merged 67 commits into from
Jul 5, 2021
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
67 commits
Select commit Hold shift + click to select a range
6860a6d
Increase wait time on flush test
Shillaker Jun 18, 2021
bac7026
Small tidy-up in endpoint code
Shillaker Jun 18, 2021
7a183a1
Add timeout on close and catch-all error handling
Shillaker Jun 18, 2021
4dfc42d
Formatting
Shillaker Jun 18, 2021
1f5f000
Lazy-init global message context and don't shut down
Shillaker Jun 18, 2021
d38dd03
Remove need to close global message context
Shillaker Jun 18, 2021
9cb6b89
Fix up timeout test
Shillaker Jun 18, 2021
db3ad14
Removed catch-all error handling
Shillaker Jun 18, 2021
c746876
Remove message context argument from open and close
Shillaker Jun 21, 2021
b985bc7
Use stlib in Message object
Shillaker Jun 21, 2021
f40e12e
Remove MessageContext object
Shillaker Jun 22, 2021
9ee0e11
Make SocketType a property on the message endpoint
Shillaker Jun 22, 2021
9a3349a
Error checking around use of send and recv sockets
Shillaker Jun 22, 2021
880af93
Remove the need for persist on messages
Shillaker Jun 22, 2021
4b6fd9e
Restore simple class wrapper around context for shutdown
Shillaker Jun 22, 2021
d4e8f41
Close context from main thread
Shillaker Jun 22, 2021
2ee0ef4
Switch to raii sockets
Shillaker Jun 23, 2021
c2af35e
Remove MessageEndpointClient
Shillaker Jun 23, 2021
d038e8e
Explicitly open and close message context
Shillaker Jun 23, 2021
317bf5e
Simplify global context
Shillaker Jun 23, 2021
b3bcc5d
Reinstate timeout test
Shillaker Jun 23, 2021
1daeb8e
Move sendResponse into RecvMessageEndpoint
Shillaker Jun 23, 2021
d4be278
Default reply port
Shillaker Jun 23, 2021
96d2bf3
Add linger
Shillaker Jun 23, 2021
763594a
Self review
Shillaker Jun 23, 2021
a5c57f3
Added global context init/ close where necessary
Shillaker Jun 23, 2021
acfbe48
Formatting
Shillaker Jun 23, 2021
a0df2b0
Merge branch 'master' into hanging
Shillaker Jun 23, 2021
ed1b04b
Remove dummy state server
Shillaker Jun 23, 2021
97a1e38
Remove custom stop methods on snapshot and function call servers
Shillaker Jun 23, 2021
e00978a
Formatting
Shillaker Jun 24, 2021
80754d4
Starting req/rep refactor
Shillaker Jun 24, 2021
bae864e
Req/rep part 2
Shillaker Jun 24, 2021
0fe4ac3
Tests now running (and hanging)
Shillaker Jun 24, 2021
3e79318
Switch to using client class instead of sockets directly
Shillaker Jun 25, 2021
49b1410
Remove unused macros
Shillaker Jun 25, 2021
371091a
Explicitly set sync/async ports
Shillaker Jun 25, 2021
46f9d8d
Split MPI world broadcasting of ranks into separate function
Shillaker Jun 25, 2021
f2e9113
Move send/ recv host ranks to MpiWorld
Shillaker Jun 25, 2021
01ec57d
Clear up MPI worlds in tests, lazy init recv ranks hosts socket
Shillaker Jun 25, 2021
7f617bf
Thread-local sockets for rank-to-host mappings
Shillaker Jun 25, 2021
6c15fcb
Detailed logging of send/recv
Shillaker Jun 25, 2021
1d702f9
Switch local/remote in remote world tests
Shillaker Jun 25, 2021
2d88beb
Clear up all thread-local sockets when destroying MPI world
Shillaker Jun 28, 2021
705a29c
Fix remote world test hanging
Shillaker Jun 28, 2021
6026ffc
Switch to SLEEP_MS macro
Shillaker Jun 28, 2021
b656c49
Formatting
Shillaker Jun 28, 2021
6e089d0
Self review and restart dummy state server on initial error
Shillaker Jun 28, 2021
ec9d58d
Remove unnecessary threads in transport tests
Shillaker Jun 28, 2021
f9d45ba
Lengthen all timeouts
Shillaker Jun 28, 2021
e267134
Retry connecting socket
Shillaker Jun 28, 2021
8bb3888
Avoid arbitrary sleeps in tests
Shillaker Jun 28, 2021
6bebb61
Add timeout on barrier
Shillaker Jun 29, 2021
51fd3d4
Long-lived shutdown endpoints in server
Shillaker Jun 29, 2021
4de6f27
Added latch to async server
Shillaker Jun 29, 2021
9b0cbaf
Guard against null pointers and avoid memcpying
Shillaker Jun 29, 2021
ff00429
Share latch via shared pointer
Shillaker Jun 29, 2021
413ce6e
Typos
Shillaker Jun 29, 2021
4e6dbf2
Link util to transport
Shillaker Jun 29, 2021
b4d8cc7
Move global message context handling out of FaabricMain
Shillaker Jun 29, 2021
7584527
Add retry logic in servers, unify server threads into signle class
Shillaker Jun 30, 2021
19a7d6e
Fix port mixup and remove unused macros
Shillaker Jun 30, 2021
bf6b15c
Remove use thread-local cache in scheduler
Shillaker Jul 1, 2021
b3327e9
Rename retry macro
Shillaker Jul 5, 2021
fd9ad8a
Move latch constructor
Shillaker Jul 5, 2021
1f71e38
Move FB macro
Shillaker Jul 5, 2021
8d7565d
Message server interface to buffers
Shillaker Jul 5, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion include/faabric/scheduler/FunctionCallClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

#include <faabric/proto/faabric.pb.h>
#include <faabric/scheduler/FunctionCallApi.h>
#include <faabric/transport/MessageContext.h>
#include <faabric/transport/MessageEndpointClient.h>
#include <faabric/util/config.h>

Expand Down
11 changes: 4 additions & 7 deletions include/faabric/transport/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,19 @@ class Message

Message();

~Message();

char* data();

uint8_t* udata();

std::vector<uint8_t> dataCopy();
csegarragonz marked this conversation as resolved.
Show resolved Hide resolved

int size();

bool more();

void persist();

private:
uint8_t* msg;
int _size;
std::vector<uint8_t> bytes;

bool _more;
bool _persist;
};
}
35 changes: 5 additions & 30 deletions include/faabric/transport/MessageContext.h
Original file line number Diff line number Diff line change
@@ -1,40 +1,15 @@
#pragma once

#include <shared_mutex>
#include <zmq.hpp>

namespace faabric::transport {
/* Wrapper around zmq::context_t
*
/*
* The context object is thread safe, and the constructor parameter indicates
* the number of hardware IO threads to be used. As a rule of thumb, use one
* IO thread per Gbps of data.
*/
class MessageContext
{
public:
MessageContext();

// Message context should not be copied as there must only be one ZMQ
// context
MessageContext(const MessageContext& ctx) = delete;

MessageContext(int overrideIoThreads);

~MessageContext();

zmq::context_t ctx;
#define ZMQ_CONTEXT_IO_THREADS 1

zmq::context_t& get();

/* Close the message context
*
* In 0MQ terms, this method calls close() on the context, which in turn
* first shuts down (i.e. stop blocking operations) and then closes.
*/
void close();

bool isContextShutDown;
};

faabric::transport::MessageContext& getGlobalMessageContext();
namespace faabric::transport {
std::shared_ptr<zmq::context_t> getGlobalMessageContext();
}
24 changes: 16 additions & 8 deletions include/faabric/transport/MessageEndpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
#include <google/protobuf/message.h>

#include <faabric/transport/Message.h>
#include <faabric/transport/MessageContext.h>
#include <faabric/util/exception.h>

#include <thread>
Expand All @@ -19,6 +18,9 @@
#define DEFAULT_RECV_TIMEOUT_MS 20000
#define DEFAULT_SEND_TIMEOUT_MS 20000

// The monitor is checking an asynchronous event has completed, so can be short
#define MONITOR_TIMEOUT_MS 2000

namespace faabric::transport {
enum class SocketType
{
Expand All @@ -36,7 +38,9 @@ enum class SocketType
class MessageEndpoint
{
public:
MessageEndpoint(const std::string& hostIn, int portIn);
MessageEndpoint(SocketType socketTypeIn,
const std::string& hostIn,
int portIn);

// Message endpoints shouldn't be assigned as ZeroMQ sockets are not thread
// safe
Expand All @@ -47,11 +51,9 @@ class MessageEndpoint

~MessageEndpoint();

void open(faabric::transport::MessageContext& context,
faabric::transport::SocketType sockTypeIn,
bool bind);
void open();

void close(bool bind);
void close();

void send(uint8_t* serialisedMsg, size_t msgSize, bool more = false);

Expand All @@ -70,15 +72,21 @@ class MessageEndpoint
void setSendTimeoutMs(int value);

protected:
const SocketType socketType;
const std::string host;
const int port;
const std::string address;
csegarragonz marked this conversation as resolved.
Show resolved Hide resolved
std::thread::id tid;
int id;

int recvTimeoutMs = DEFAULT_RECV_TIMEOUT_MS;
int sendTimeoutMs = DEFAULT_SEND_TIMEOUT_MS;

void validateTimeout(int value);

Message recvBuffer(int size);

Message recvNoBuffer();
};

/* Send and Recv Message Endpoints */
Expand All @@ -88,7 +96,7 @@ class SendMessageEndpoint : public MessageEndpoint
public:
SendMessageEndpoint(const std::string& hostIn, int portIn);

void open(MessageContext& context);
void open();

void close();
};
Expand All @@ -98,7 +106,7 @@ class RecvMessageEndpoint : public MessageEndpoint
public:
RecvMessageEndpoint(int portIn);

void open(MessageContext& context);
void open();

void close();
};
Expand Down
20 changes: 3 additions & 17 deletions include/faabric/transport/MessageEndpointServer.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#pragma once

#include <faabric/transport/Message.h>
#include <faabric/transport/MessageContext.h>
#include <faabric/transport/MessageEndpoint.h>
#include <faabric/transport/MessageEndpointClient.h>

Expand All @@ -22,27 +21,12 @@ class MessageEndpointServer
public:
MessageEndpointServer(int portIn);

/* Start and stop the server
*
* Generic methods to start and stop a message endpoint server. They take
* a, thread-safe, 0MQ context as an argument. The stop method will block
* until _all_ sockets within the context have been closed. Sockets blocking
* on a `recv` will be interrupted with ETERM upon context closure.
*/
void start(faabric::transport::MessageContext& context);

void stop(faabric::transport::MessageContext& context);

/* Common start and stop entrypoint
*
* Call the generic methods with the default global message context.
*/
void start();

virtual void stop();

protected:
int recv(faabric::transport::RecvMessageEndpoint& endpoint);
bool recv();

/* Template function to handle message reception
*
Expand All @@ -67,6 +51,8 @@ class MessageEndpointServer
private:
const int port;

std::unique_ptr<RecvMessageEndpoint> recvEndpoint = nullptr;

std::thread servingThread;
};
}
2 changes: 1 addition & 1 deletion src/scheduler/FunctionCallClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ void clearMockRequests()
FunctionCallClient::FunctionCallClient(const std::string& hostIn)
: faabric::transport::MessageEndpointClient(hostIn, FUNCTION_CALL_PORT)
{
this->open(faabric::transport::getGlobalMessageContext());
this->open();
}

void FunctionCallClient::sendHeader(faabric::scheduler::FunctionCalls call)
Expand Down
2 changes: 1 addition & 1 deletion src/scheduler/FunctionCallServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ void FunctionCallServer::stop()
faabric::scheduler::getScheduler().closeFunctionCallClients();

// Call the parent stop
MessageEndpointServer::stop(faabric::transport::getGlobalMessageContext());
MessageEndpointServer::stop();
}

void FunctionCallServer::doRecv(faabric::transport::Message& header,
Expand Down
2 changes: 1 addition & 1 deletion src/scheduler/SnapshotClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ void clearMockSnapshotRequests()
SnapshotClient::SnapshotClient(const std::string& hostIn)
: faabric::transport::MessageEndpointClient(hostIn, SNAPSHOT_PORT)
{
this->open(faabric::transport::getGlobalMessageContext());
this->open();
}

void SnapshotClient::sendHeader(faabric::scheduler::SnapshotCalls call)
Expand Down
18 changes: 12 additions & 6 deletions src/scheduler/SnapshotServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
#include <faabric/util/func.h>
#include <faabric/util/logging.h>

#include <sys/mman.h>

namespace faabric::scheduler {
SnapshotServer::SnapshotServer()
: faabric::transport::MessageEndpointServer(SNAPSHOT_PORT)
Expand All @@ -18,7 +20,7 @@ void SnapshotServer::stop()
faabric::scheduler::getScheduler().closeSnapshotClients();

// Call the parent stop
MessageEndpointServer::stop(faabric::transport::getGlobalMessageContext());
MessageEndpointServer::stop();
}

void SnapshotServer::doRecv(faabric::transport::Message& header,
Expand Down Expand Up @@ -60,12 +62,16 @@ void SnapshotServer::recvPushSnapshot(faabric::transport::Message& msg)
// Set up the snapshot
faabric::util::SnapshotData data;
data.size = r->contents()->size();
data.data = r->mutable_contents()->Data();
reg.takeSnapshot(r->key()->str(), data, true);

// Note that now the snapshot data is owned by Faabric and will be deleted
// later, so we don't want the message to delete it
msg.persist();
// TODO - avoid this copy by changing server superclass to allow subclasses
// to provide a buffer to receive data.
// TODO - work out snapshot ownership here, how do we know when to delete
// this data?
data.data = (uint8_t*)mmap(
nullptr, data.size, PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
std::memcpy(data.data, r->mutable_contents()->Data(), data.size);

reg.takeSnapshot(r->key()->str(), data, true);

// Send response
faabric::EmptyResponse response;
Expand Down
2 changes: 1 addition & 1 deletion src/state/StateClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ StateClient::StateClient(const std::string& userIn,
, host(hostIn)
, reg(state::getInMemoryStateRegistry())
{
this->open(faabric::transport::getGlobalMessageContext());
this->open();
}

void StateClient::sendHeader(faabric::state::StateCalls call)
Expand Down
38 changes: 12 additions & 26 deletions src/transport/Message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,56 +2,42 @@

namespace faabric::transport {
Message::Message(const zmq::message_t& msgIn)
: _size(msgIn.size())
: bytes(msgIn.size())
, _more(msgIn.more())
, _persist(false)
{
msg = reinterpret_cast<uint8_t*>(malloc(_size * sizeof(uint8_t)));
memcpy(msg, msgIn.data(), _size);
memcpy(bytes.data(), msgIn.data(), msgIn.size());
}

Message::Message(int sizeIn)
: _size(sizeIn)
: bytes(sizeIn)
, _more(false)
, _persist(false)
{
msg = reinterpret_cast<uint8_t*>(malloc(_size * sizeof(uint8_t)));
}
{}

// Empty message signals shutdown
Message::Message()
: msg(nullptr)
{}
Message::Message() {}

Message::~Message()
char* Message::data()
{
if (!_persist) {
free(reinterpret_cast<void*>(msg));
}
return reinterpret_cast<char*>(bytes.data());
}

char* Message::data()
uint8_t* Message::udata()
{
return reinterpret_cast<char*>(msg);
return bytes.data();
}

uint8_t* Message::udata()
std::vector<uint8_t> Message::dataCopy()
{
return msg;
return bytes;
}

int Message::size()
{
return _size;
return bytes.size();
}

bool Message::more()
{
return _more;
}

void Message::persist()
{
_persist = true;
}
}
Loading