Fix hanging in transport layer #121
Conversation
@@ -67,6 +67,8 @@ class MessageEndpointServer
  private:
    const int port;

    std::unique_ptr<RecvMessageEndpoint> endpoint = nullptr;
This fell out of the refactor away from using the message context to shut down. Now the endpoint server can manage its own endpoint internally.
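As a rough sketch of the idea (the constructor and `start`/`stop` signatures here are assumptions, not the exact faabric code), the server now owns its endpoint and can tear it down deterministically:

```cpp
#include <memory>

// Stand-in for faabric's RecvMessageEndpoint, just so the sketch is self-contained
class RecvMessageEndpoint
{
  public:
    explicit RecvMessageEndpoint(int) { /* bind the zmq socket here */ }
};

class MessageEndpointServer
{
  public:
    explicit MessageEndpointServer(int portIn)
      : port(portIn)
    {}

    void start() { endpoint = std::make_unique<RecvMessageEndpoint>(port); }

    // Resetting the pointer destroys the endpoint (and closes its socket),
    // so shutdown no longer has to go through the message context
    void stop() { endpoint = nullptr; }

  private:
    const int port;

    std::unique_ptr<RecvMessageEndpoint> endpoint = nullptr;
};
```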
    } catch (zmq::error_t& e) {
        SPDLOG_ERROR("Error sending message: {}", e.what());
        throw;
    }
This `if`/`else` was essentially doing the same `send` operation in both branches, just with different `send_flags`, so I merged it into one.
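A minimal sketch of the merged send (using cppzmq's `zmq::socket_t`; the function name and timeout handling here are illustrative, not the exact faabric code):

```cpp
#include <zmq.hpp>
#include <spdlog/spdlog.h>

// One send path; only the flag differs between "more parts follow" and "final part"
void doSend(zmq::socket_t& socket, const uint8_t* data, size_t dataSize, bool more)
{
    zmq::send_flags flags =
      more ? zmq::send_flags::sndmore : zmq::send_flags::none;

    try {
        auto res = socket.send(zmq::buffer(data, dataSize), flags);
        if (!res.has_value()) {
            SPDLOG_ERROR("Timed out sending message");
        }
    } catch (zmq::error_t& e) {
        SPDLOG_ERROR("Error sending message: {}", e.what());
        throw;
    }
}
```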
        // Print default message and rethrow
        SPDLOG_ERROR("Error receiving message: {} ({})", e.num(), e.what());
        throw;
    }
To make things simpler to parse I split `recv` up into two methods based on whether a `size` is provided.
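Roughly along these lines (method names and return types are illustrative; the actual split in the PR may differ), with one path for when the caller knows the size and one for when it doesn't:

```cpp
#include <stdexcept>
#include <vector>
#include <zmq.hpp>
#include <spdlog/spdlog.h>

// Caller knows the expected size, so we can sanity-check what arrives
std::vector<uint8_t> recvWithSize(zmq::socket_t& socket, size_t expectedSize)
{
    zmq::message_t msg;
    auto res = socket.recv(msg, zmq::recv_flags::none);
    if (!res.has_value()) {
        throw std::runtime_error("Timed out receiving message");
    }
    if (msg.size() != expectedSize) {
        SPDLOG_WARN("Received {} bytes, expected {}", msg.size(), expectedSize);
    }
    auto* data = reinterpret_cast<const uint8_t*>(msg.data());
    return std::vector<uint8_t>(data, data + msg.size());
}

// Caller doesn't know the size, so just take whatever ZeroMQ gives us
std::vector<uint8_t> recvNoSize(zmq::socket_t& socket)
{
    try {
        zmq::message_t msg;
        auto res = socket.recv(msg, zmq::recv_flags::none);
        if (!res.has_value()) {
            throw std::runtime_error("Timed out receiving message");
        }
        auto* data = reinterpret_cast<const uint8_t*>(msg.data());
        return std::vector<uint8_t>(data, data + msg.size());
    } catch (zmq::error_t& e) {
        // Print default message and rethrow
        SPDLOG_ERROR("Error receiving message: {} ({})", e.num(), e.what());
        throw;
    }
}
```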
src/state/StateKeyValue.cpp
@@ -559,7 +559,7 @@ uint32_t StateKeyValue::waitOnRedisRemoteLock(const std::string& redisKey)
            break;
        }

-       std::this_thread::sleep_for(std::chrono::milliseconds(1));
+       SLEEP_MS(1);
I was dealing with so many sleeps and writing `usleep(123 * 1000)` that I made a macro.
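Presumably something like the following (an assumed definition, based on the `std::this_thread::sleep_for` call it replaces in the diff above):

```cpp
#include <chrono>
#include <thread>

// Sleep for the given number of milliseconds
#define SLEEP_MS(ms)                                                           \
    std::this_thread::sleep_for(std::chrono::milliseconds((ms)))
```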
    virtual void stop();
    void awaitAsyncLatch();
These `setAsyncLatch` and `awaitAsyncLatch` functions are to support testing. Tests frequently check the side effects of sending an async message by sending it, then performing some arbitrary sleep before checking that it's done. This allows us to avoid the sleep and the inherent flakiness of that approach.
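A toy illustration of the pattern (not faabric's actual classes): the handler releases a latch once it has done its work, and the test waits on the latch instead of sleeping for an arbitrary period:

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

class SimpleLatch
{
  public:
    void release()
    {
        std::lock_guard<std::mutex> lock(mx);
        done = true;
        cv.notify_all();
    }

    void await()
    {
        std::unique_lock<std::mutex> lock(mx);
        cv.wait(lock, [this] { return done; });
    }

  private:
    std::mutex mx;
    std::condition_variable cv;
    bool done = false;
};

int main()
{
    SimpleLatch latch;
    bool sideEffect = false;

    // Stand-in for the server handling an async message in the background
    std::thread handler([&] {
        sideEffect = true;
        latch.release();
    });

    latch.await(); // wait for the handler rather than sleeping
    handler.join();

    return sideEffect ? 0 : 1;
}
```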
What a beast of a PR, and what a grueling effort as well! Fingers crossed we can now stop re-running the tests until they pass 🤣
Seeing this finished, the changes are actually quite easy to follow.
I've left very minor comments. I guess my biggest concern is that we manage a lot of `google::protobuf::Message` objects, and it's not very clear to me why. Other than that, nicely done, LGTM 👍
include/faabric/state/StateServer.h
    void recvPush(faabric::transport::Message& body);
    std::unique_ptr<google::protobuf::Message> recvPull(
It feels strange that sometimes we use our own `Message` wrapper, and other times protobuf's.
Yes, that's a good point, I'll revisit this and see if I can standardise it a bit. I'd like the rule to be: our `Message` wrapper inside the transport layer, and protobuf/flatbuffers everywhere else.
    faabric::transport::Message& header,
    faabric::transport::Message& body) = 0;

    void sendSyncResponse(google::protobuf::Message* resp);
Why do we `recv` a `faabric::transport::Message` yet send a `google::protobuf::Message`?
Good point, as said above I'll look at this and standardise the interface.
src/scheduler/SnapshotClient.cpp
        uint8_t* buffer = mb.GetBufferPointer(); \
        int size = mb.GetSize(); \
        send(buffer, size);
#define SEND_FB_MSG(T, mb) \
Couldn't this live with the other macros? We may eventually need it elsewhere.
Yep, good point.
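For reference, roughly the shape of such a macro based on the lines visible above (this is an assumption: `mb` is taken to be a `flatbuffers::FlatBufferBuilder` and `send` the client's raw send method, and the real `SEND_FB_MSG` presumably also uses `T` to identify the message type, which isn't visible in the snippet):

```cpp
#include <flatbuffers/flatbuffers.h>

// Wraps the repeated "pull the buffer out of the builder and send it" steps
#define SEND_FB_MSG(T, mb)                                                     \
    {                                                                          \
        uint8_t* buffer = (mb).GetBufferPointer();                             \
        int size = (mb).GetSize();                                             \
        send(buffer, size);                                                    \
    }
```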
@@ -3,49 +3,62 @@
#include <faabric/scheduler/SnapshotServer.h>
Same comments I made to the other server re. function order and return type also apply here.
This PR attempts to fix the issues causing the application to hang indefinitely. This was apparently due to us making some mistakes with the lifecycle of ZeroMQ sockets in our transport layer, and also the use of fragile timeouts in our tests.
AFAICT we must ensure that:
Previously we were creating multiple contexts which made it difficult to reason about these rules, so this PR switches to using only one. We were using ephemeral sockets which would occasionally break the second rule, i.e. be destructed before their communication was complete. Finally we had the ZeroMQ contexts living in a global scope, as well as sockets related to MPI living in a thread-local global scope; these could not be removed, but we now make sure they are cleared up properly.
The changes in this PR are:

- Change `MessageEndpointServer` and `MessageEndpointClient` to have two sockets each, one for async, one for sync.
- Go from two endpoint classes (`SendMessageEndpoint`/`RecvMessageEndpoint`) to four (`SyncSendMessageEndpoint`, `AsyncSendMessageEndpoint`, `SyncRecvMessageEndpoint`, `AsyncRecvMessageEndpoint`).
- Subclasses of `MessageEndpointServer` now have to implement `doAsyncRecv` and `doSyncRecv` accordingly. `doSyncRecv` returns the response message. Subclasses of `MessageEndpointClient` can call `syncSend` and `asyncSend`, with `syncSend` taking a pointer to a response which gets populated.
- Remove the `MessageContext` wrapper object and just use the basic `zmq::context_t`. Without the need to use contexts for closing things down, we just need a shared pointer to the global context.
- Remove the `MessageEndpoint::open` method and merge the opening of the socket into the constructor (it was always called straight afterwards anyway).
- Remove zero-copy `Message`s by copying the message body in the snapshot server. This simplifies the `Message` class and means we can use a stdlib `vector` rather than a raw pointer, which removes complexity around message lifecycle. We can introduce a zero-copy recv approach for the snapshot server in a later PR.
- `MessageEndpoint` subclasses.
- Remove the `DummyStateServer` used in tests, moving to a simpler test fixture in a single test file. This removes another moving part in the tests (and makes debugging the state tests much easier).
- `REQUIRE_RETRY` macro used in tests.
- Change the `Barrier` class to a simpler `Latch` shared between threads via a `shared_ptr`.

MPI-specific:

- `MpiWorld` class and avoid ephemeral sockets.

We now manage the zmq context in a global `shared_ptr`, which works fine until it comes to destruction. It seems that cppzmq doesn't like being destructed so late in the application lifecycle and prints an error message along the lines of:

The solution to this is to explicitly close the context from the `main` method. This means we'll have to add the opening and closing of the zmq context to any `main` method using Faabric for now. I've added a check in `getGlobalMessageContext` which errors if it hasn't been initialised, which should prompt us into adding it where necessary.
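A sketch of what this means for an embedding application's `main` (the header path and the init/close function names below are assumptions for illustration; the PR only states that the context must be opened and closed explicitly from `main`, and that `getGlobalMessageContext` now errors if this hasn't happened):

```cpp
#include <faabric/transport/context.h>

int main()
{
    // Assumed name: create the global zmq context before anything uses it
    faabric::transport::initGlobalMessageContext();

    // ... run the application; transport code gets the context via
    // getGlobalMessageContext() ...

    // Assumed name: close the context explicitly, well before static
    // destruction, so cppzmq doesn't complain at exit
    faabric::transport::closeGlobalMessageContext();

    return 0;
}
```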