Add in-proc PAIR sockets #206

Merged 4 commits on Dec 28, 2021
Changes from 2 commits
20 changes: 20 additions & 0 deletions include/faabric/transport/MessageEndpoint.h
@@ -218,6 +218,26 @@ class SyncRecvMessageEndpoint final : public RecvMessageEndpoint
void sendResponse(const uint8_t* data, int size);
};

class AsyncDirectRecvEndpoint final : public RecvMessageEndpoint
{
public:
AsyncDirectRecvEndpoint(const std::string& inprocLabel,
int timeoutMs = DEFAULT_RECV_TIMEOUT_MS);

std::optional<Message> recv(int size = 0) override;
};

class AsyncDirectSendEndpoint final : public MessageEndpoint
{
public:
AsyncDirectSendEndpoint(const std::string& inProcLabel,
int timeoutMs = DEFAULT_RECV_TIMEOUT_MS);

void send(const uint8_t* data, size_t dataSize, bool more = false);

zmq::socket_t socket;
};

class MessageTimeoutException final : public faabric::util::FaabricException
{
public:
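For reference, a minimal usage sketch of the two endpoints declared above (not part of the PR; the label, payload and function name are made up, and recv() is assumed to return std::nullopt on timeout):

// Hypothetical usage of the new in-proc PAIR endpoints (names made up)
#include <faabric/transport/MessageEndpoint.h>

#include <cstdint>
#include <string>

void directPairExample()
{
    // Both endpoints are constructed from the same in-proc label
    std::string label = "example-direct";

    // The recv endpoint binds, the send endpoint connects
    faabric::transport::AsyncDirectRecvEndpoint receiver(label);
    faabric::transport::AsyncDirectSendEndpoint sender(label);

    std::string payload = "hello";
    sender.send(reinterpret_cast<const uint8_t*>(payload.data()),
                payload.size());

    // recv() presumably returns std::nullopt on timeout
    auto maybeMsg = receiver.recv();
    if (maybeMsg.has_value()) {
        std::string received(maybeMsg->data(), maybeMsg->size());
    }
}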
48 changes: 48 additions & 0 deletions src/transport/MessageEndpoint.cpp
@@ -79,6 +79,13 @@ zmq::socket_t socketFactory(zmq::socket_type socketType,
CATCH_ZMQ_ERR_RETRY_ONCE(socket.bind(address), "bind")
break;
}
case zmq::socket_type::pair: {
SPDLOG_TRACE("Bind socket: pair {} (timeout {}ms)",
address,
timeoutMs);
CATCH_ZMQ_ERR_RETRY_ONCE(socket.bind(address), "bind")
break;
}
case zmq::socket_type::pub: {
SPDLOG_TRACE(
"Bind socket: pub {} (timeout {}ms)", address, timeoutMs);
@@ -123,6 +130,13 @@
}
case (MessageEndpointConnectType::CONNECT): {
switch (socketType) {
case zmq::socket_type::pair: {
SPDLOG_TRACE("Connect socket: pair {} (timeout {}ms)",
address,
timeoutMs);
CATCH_ZMQ_ERR_RETRY_ONCE(socket.connect(address), "connect")
break;
}
case zmq::socket_type::pull: {
SPDLOG_TRACE("Connect socket: pull {} (timeout {}ms)",
address,
@@ -559,4 +573,38 @@ void SyncRecvMessageEndpoint::sendResponse(const uint8_t* data, int size)
SPDLOG_TRACE("REP {} ({} bytes)", address, size);
doSend(socket, data, size, false);
}

// ----------------------------------------------
// INTERNAL DIRECT MESSAGE ENDPOINTS
// ----------------------------------------------

AsyncDirectRecvEndpoint::AsyncDirectRecvEndpoint(const std::string& inprocLabel,
int timeoutMs)
: RecvMessageEndpoint(inprocLabel,
timeoutMs,
zmq::socket_type::pair,
MessageEndpointConnectType::BIND)
{}

std::optional<Message> AsyncDirectRecvEndpoint::recv(int size)
{
SPDLOG_TRACE("PAIR recv {} ({} bytes)", address, size);
return RecvMessageEndpoint::recv(size);
}

AsyncDirectSendEndpoint::AsyncDirectSendEndpoint(const std::string& inprocLabel,
int timeoutMs)
: MessageEndpoint("inproc://" + inprocLabel, timeoutMs)
{
socket =
setUpSocket(zmq::socket_type::pair, MessageEndpointConnectType::CONNECT);
}

void AsyncDirectSendEndpoint::send(const uint8_t* data,
size_t dataSize,
bool more)
{
SPDLOG_TRACE("PAIR send {} ({} bytes, more {})", address, dataSize, more);
doSend(socket, data, dataSize, more);
}
}
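For context, the pair cases added to socketFactory above amount to the following raw cppzmq wiring, shown as a standalone sketch (assuming cppzmq >= 4.3 for zmq::buffer and message_t::to_string; the inproc address is made up):

// Standalone sketch of the underlying PAIR-over-inproc pattern (not PR code)
#include <zmq.hpp>

#include <string>

int main()
{
    // Both peers must live in the same ZMQ context for inproc to work
    zmq::context_t ctx;

    // Receiving side binds the inproc address (as AsyncDirectRecvEndpoint does)
    zmq::socket_t recvSock(ctx, zmq::socket_type::pair);
    recvSock.bind("inproc://direct-example");

    // Sending side connects to the same address (as AsyncDirectSendEndpoint does)
    zmq::socket_t sendSock(ctx, zmq::socket_type::pair);
    sendSock.connect("inproc://direct-example");

    std::string payload = "Direct hello";
    auto sendRes = sendSock.send(zmq::buffer(payload), zmq::send_flags::none);

    zmq::message_t msg;
    auto recvRes = recvSock.recv(msg, zmq::recv_flags::none);

    bool ok = sendRes.has_value() && recvRes.has_value() &&
              msg.to_string() == payload;
    return ok ? 0 : 1;
}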
80 changes: 79 additions & 1 deletion tests/test/transport/test_message_endpoint_client.cpp
@@ -5,6 +5,7 @@
#include <unistd.h>

#include <faabric/transport/MessageEndpoint.h>
#include <faabric/util/latch.h>
#include <faabric/util/macros.h>

using namespace faabric::transport;
@@ -224,6 +225,83 @@ TEST_CASE_METHOD(SchedulerTestFixture,
}
}

#endif
TEST_CASE_METHOD(SchedulerTestFixture, "Test direct messaging", "[transport]")
{
std::string expected = "Direct hello";
const uint8_t* msg = BYTES_CONST(expected.c_str());

std::string inprocLabel = "direct-test";

AsyncDirectSendEndpoint sender(inprocLabel, TEST_PORT);
Collaborator: Afaict TEST_PORT here is passed as the timeoutMs argument.

Collaborator (author): Yes, good spot.

sender.send(msg, expected.size());

AsyncDirectRecvEndpoint receiver(inprocLabel);

std::string actual;
SECTION("Recv with size")
{
faabric::transport::Message recvMsg =
receiver.recv(expected.size()).value();
actual = std::string(recvMsg.data(), recvMsg.size());
}

SECTION("Recv no size")
{
faabric::transport::Message recvMsg = receiver.recv().value();
actual = std::string(recvMsg.data(), recvMsg.size());
}

REQUIRE(actual == expected);
}

TEST_CASE_METHOD(SchedulerTestFixture,
Collaborator: For my particular use case, I'd be interested in seeing a stress test with various threads producing and various threads consuming to different in-proc labels (each producer matched to one consumer). Feel free to ignore, but I think it could be a relevant stress test.

Collaborator (author): Yes good idea, have updated.

(A sketch of one such multi-label test follows the stress test below.)

"Stress test direct messaging",
"[transport]")
{
int nMessages = 1000;
std::string inprocLabel = "direct-test";

std::shared_ptr<faabric::util::Latch> startLatch =
faabric::util::Latch::create(2);

std::thread t([nMessages, inprocLabel, &startLatch] {
AsyncDirectSendEndpoint sender(inprocLabel, TEST_PORT);

for (int i = 0; i < nMessages; i++) {
std::string expected = "Direct hello " + std::to_string(i);
const uint8_t* msg = BYTES_CONST(expected.c_str());
sender.send(msg, expected.size());

if (i % 100 == 0) {
SLEEP_MS(10);
}

// Make main thread wait until messages are queued
if (i == 10) {
Collaborator: Why do we need to wait for messages to be queued? Does this mean the receiver thread won't block waiting for messages to be in place?

Collaborator (author, @Shillaker, Dec 27, 2021): The receiver thread will block, it's just that under the hood the receiver is doing the bind and the sender is doing a connect. This test forces the sender to do a connect and send messages before the receiver binds (which doesn't work for certain types of sockets IIRC). Have added a comment in the code to this effect.

startLatch->wait();
}
}
});

// Wait for queued messages
startLatch->wait();

AsyncDirectRecvEndpoint receiver(inprocLabel);

// Receive messages
for (int i = 0; i < nMessages; i++) {
faabric::transport::Message recvMsg = receiver.recv().value();
std::string actual(recvMsg.data(), recvMsg.size());

std::string expected = "Direct hello " + std::to_string(i);
REQUIRE(actual == expected);
}

if (t.joinable()) {
t.join();
}
}
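Following the reviewer's suggestion above, one possible shape for a multi-label variant: each producer thread is matched to one consumer thread on its own in-proc label. This is only a sketch using the same fixture and macros as the tests above, not the test added in the PR's later commits; the label prefix, pair count and message count are made up.

// Hypothetical multi-label variant: N producer/consumer pairs, each on its
// own in-proc label (not the test added in the PR's later commits)
TEST_CASE_METHOD(SchedulerTestFixture,
                 "Sketch: multi-label direct messaging",
                 "[transport]")
{
    int nPairs = 4;
    int nMessages = 100;

    // Each receiver records what it saw; assertions stay on the main thread
    std::vector<std::vector<std::string>> received(nPairs);
    std::vector<std::thread> threads;

    for (int p = 0; p < nPairs; p++) {
        std::string label = "direct-multi-" + std::to_string(p);

        // Receiver thread: binds its own PAIR socket for this label
        threads.emplace_back([label, nMessages, p, &received] {
            AsyncDirectRecvEndpoint receiver(label);
            for (int i = 0; i < nMessages; i++) {
                faabric::transport::Message msg = receiver.recv().value();
                received[p].emplace_back(msg.data(), msg.size());
            }
        });

        // Matching sender thread: connects to the same label
        threads.emplace_back([label, nMessages] {
            AsyncDirectSendEndpoint sender(label);
            for (int i = 0; i < nMessages; i++) {
                std::string body = label + " " + std::to_string(i);
                sender.send(BYTES_CONST(body.c_str()), body.size());
            }
        });
    }

    for (auto& t : threads) {
        t.join();
    }

    for (int p = 0; p < nPairs; p++) {
        std::string label = "direct-multi-" + std::to_string(p);
        for (int i = 0; i < nMessages; i++) {
            REQUIRE(received[p][i] == label + " " + std::to_string(i));
        }
    }
}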

#endif // End ThreadSanitizer exclusion

}