Skip to content

Commit

Permalink
agents: implement data handle ring buffer
Browse files Browse the repository at this point in the history
Add a basic `RingBuffer<T>` implementation to `nsolid_util.h` with
accompanying tests.

Use it for ZMQ `data` channel messages, so we're able to buffer up to
`MAX_DATA_BUFFER_SIZE` messages in case of channel disconnection. Fixes
problems observed while load testing the agent, where data messages
were generated before the actual `data` channel connection was
established and were actually dropped.
  • Loading branch information
santigimeno committed Jan 24, 2024
1 parent 13c420e commit 63ebaa6
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 59 deletions.
124 changes: 66 additions & 58 deletions agents/zmq/src/zmq_agent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ inline void DebugEvent(uint16_t event, const ZmqHandle& handle) {
}
}

const int MAX_DATA_BUFFER_SIZE = 100;
const int MAX_AUTH_RETRIES = 20;
const char* kNSOLID_AUTH_URL = "NSOLID_AUTH_URL";
const char* NSOLID_AUTH_URL =
Expand Down Expand Up @@ -652,6 +653,7 @@ ZmqAgent::ZmqAgent()
context_(),
command_handle_(nullptr),
data_handle_(nullptr),
data_ring_buffer_(MAX_DATA_BUFFER_SIZE),
bulk_handle_(nullptr),
version_(4),
proc_metrics_(),
Expand Down Expand Up @@ -1091,11 +1093,6 @@ int ZmqAgent::config_sockets(const json& sockets) {
int ZmqAgent::send_command_message(const char* command,
const char* request_id,
const char* body) {
if (data_handle_ == nullptr) {
// Don't send command as no data_handle
return PrintZmqError(zmq::ZMQ_ERROR_SEND_COMMAND_NO_DATA_HANDLE, command);
}

const char* real_body = body ? (strlen(body) ? body : "\"\"") : "null";
auto recorded = create_recorded(system_clock::now());
int r;
Expand Down Expand Up @@ -1136,36 +1133,52 @@ int ZmqAgent::send_command_message(const char* command,
return send_command_message(command, request_id, body);
}

r = data_handle_->send(msg_buf_, r);
if (r == -1) {
return PrintZmqError(zmq::ZMQ_ERROR_SEND_COMMAND_MESSAGE,
zmq_strerror(zmq_errno()),
zmq_errno(),
command,
msg_buf_,
data_handle_->endpoint().to_string().c_str());
r = send_data_msg(msg_buf_, r);
if (r < 0) {
switch (r) {
case zmq::ZMQ_ERROR_SEND_COMMAND_NO_DATA_HANDLE:
return PrintZmqError(zmq::ZMQ_ERROR_SEND_COMMAND_NO_DATA_HANDLE,
command);
default:
return PrintZmqError(zmq::ZMQ_ERROR_SEND_COMMAND_MESSAGE,
zmq_strerror(r),
r,
command,
request_id,
body);
}
}

return r;
}

int ZmqAgent::send_data_msg(const std::string& msg) const {
int ZmqAgent::send_data_msg(const char* buf, size_t len) {
int r = 0;
if (data_handle_ == nullptr) {
// Don't send command as no data_handle
return PrintZmqError(zmq::ZMQ_ERROR_SEND_COMMAND_NO_DATA_HANDLE,
msg.c_str());
r = zmq::ZMQ_ERROR_SEND_COMMAND_NO_DATA_HANDLE;
goto push_msg;
}

int r = data_handle_->send(msg);
while (!data_ring_buffer_.empty()) {
std::string& msg = data_ring_buffer_.front();
r = data_handle_->send(msg.c_str(), msg.size());
if (r == -1) {
goto push_msg;
}

data_ring_buffer_.pop();
}

r = data_handle_->send(buf, len);
if (r == -1) {
return PrintZmqError(zmq::ZMQ_ERROR_SEND_COMMAND_MESSAGE,
zmq_strerror(zmq_errno()),
zmq_errno(),
"",
msg.c_str(),
data_handle_->endpoint().to_string().c_str());
goto push_msg;
} else {
return r;
}

push_msg:

data_ring_buffer_.push(std::string(buf, len));
return r;
}

Expand Down Expand Up @@ -1717,11 +1730,6 @@ ZmqAgent::ZmqCommandError ZmqAgent::create_command_error(

void ZmqAgent::send_error_message(const std::string& msg,
uint32_t code) {
if (data_handle_ == nullptr) {
// Don't send as no data_handle
return;
}

auto recorded = create_recorded(system_clock::now());
int r = snprintf(msg_buf_,
msg_size_,
Expand Down Expand Up @@ -1752,7 +1760,7 @@ void ZmqAgent::send_error_message(const std::string& msg,

ASSERT_LT(r, msg_size_);

r = data_handle_->send(msg_buf_, r);
r = send_data_msg(msg_buf_, r);
if (r == -1) {
return;
}
Expand All @@ -1761,12 +1769,6 @@ void ZmqAgent::send_error_message(const std::string& msg,
int ZmqAgent::send_error_command_message(const std::string& req_id,
const std::string& command,
const ZmqCommandError& err) {
if (data_handle_ == nullptr) {
// Don't send command as no data_handle
return PrintZmqError(zmq::ZMQ_ERROR_SEND_COMMAND_NO_DATA_HANDLE,
command.c_str());
}

auto recorded = create_recorded(system_clock::now());
int r = snprintf(msg_buf_,
msg_size_,
Expand Down Expand Up @@ -1801,14 +1803,20 @@ int ZmqAgent::send_error_command_message(const std::string& req_id,

ASSERT_LT(r, msg_size_);

r = data_handle_->send(msg_buf_, r);
if (r == -1) {
return PrintZmqError(zmq::ZMQ_ERROR_SEND_COMMAND_MESSAGE,
zmq_strerror(zmq_errno()),
zmq_errno(),
command,
msg_buf_,
data_handle_->endpoint().to_string().c_str());
r = send_data_msg(msg_buf_, r);
if (r < 0) {
switch (r) {
case zmq::ZMQ_ERROR_SEND_COMMAND_NO_DATA_HANDLE:
return PrintZmqError(zmq::ZMQ_ERROR_SEND_COMMAND_NO_DATA_HANDLE,
command);
default:
return PrintZmqError(zmq::ZMQ_ERROR_SEND_COMMAND_MESSAGE,
zmq_strerror(zmq_errno()),
zmq_errno(),
command,
msg_buf_,
data_handle_->endpoint().to_string().c_str());
}
}

return r;
Expand All @@ -1824,12 +1832,6 @@ int ZmqAgent::send_error_response(const std::string& req_id,
}

void ZmqAgent::send_exit() {
if (data_handle_ == nullptr) {
// Don't send command as no data_handle
PrintZmqError(zmq::ZMQ_ERROR_SEND_COMMAND_NO_DATA_HANDLE, kExit);
return;
}

auto* error = GetExitError();
int exit_code = GetExitCode();

Expand Down Expand Up @@ -1887,14 +1889,20 @@ void ZmqAgent::send_exit() {
return send_exit();
}

r = data_handle_->send(msg_buf_, r);
if (r == -1) {
PrintZmqError(zmq::ZMQ_ERROR_SEND_COMMAND_MESSAGE,
zmq_strerror(zmq_errno()),
zmq_errno(),
kExit,
msg_buf_,
data_handle_->endpoint().to_string().c_str());
r = send_data_msg(msg_buf_, r);
if (r < 0) {
switch (r) {
case zmq::ZMQ_ERROR_SEND_COMMAND_NO_DATA_HANDLE:
PrintZmqError(zmq::ZMQ_ERROR_SEND_COMMAND_NO_DATA_HANDLE, kExit);
break;
default:
PrintZmqError(zmq::ZMQ_ERROR_SEND_COMMAND_MESSAGE,
zmq_strerror(zmq_errno()),
zmq_errno(),
kExit,
msg_buf_,
data_handle_->endpoint().to_string().c_str());
}
}
}

Expand Down
3 changes: 2 additions & 1 deletion agents/zmq/src/zmq_agent.h
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,7 @@ class ZmqAgent {
const char* request_id,
const char* body);

int send_data_msg(const std::string& msg) const;
int send_data_msg(const char* buf, size_t len);

void send_exit();

Expand Down Expand Up @@ -646,6 +646,7 @@ class ZmqAgent {
ZmqContext context_;
std::unique_ptr<ZmqCommandHandle> command_handle_;
std::unique_ptr<ZmqDataHandle> data_handle_;
utils::RingBuffer<std::string> data_ring_buffer_;
std::unique_ptr<ZmqBulkHandle> bulk_handle_;
int version_;
nsuv::ns_timer invalid_key_timer_;
Expand Down
1 change: 1 addition & 0 deletions node.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,7 @@
'test/cctest/http_server_fixture.h',
'test/cctest/test_agents_zmq_http_client.cc',
'test/cctest/test_nsolid_lru_map.cc',
'test/cctest/test_nsolid_ring_buffer.cc',
'test/cctest/test_nsolid_thread_safe.cc',
],
'node_cctest_openssl_sources': [
Expand Down
45 changes: 45 additions & 0 deletions src/nsolid/nsolid_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <random>
#include <vector>
#include <cstring>
#include <queue>

#include "uv.h"
#include "nlohmann/json.hpp"
Expand Down Expand Up @@ -193,6 +194,50 @@ class ring_buffer {
std::vector<double> buffer_;
};

template <typename T>
class RingBuffer {
public:
NSOLID_DELETE_DEFAULT_CONSTRUCTORS(RingBuffer)
explicit RingBuffer(size_t s) : size_(s) {
}

bool empty() const {
return buffer_.empty();
}

const T& front() const {
return buffer_.front();
}

T& front() {
return buffer_.front();
}

void pop() {
buffer_.pop();
}

void push(const T& value) {
if (buffer_.size() == size_) {
buffer_.pop();
}

buffer_.push(value);
}

void push(T&& value) {
if (buffer_.size() == size_) {
buffer_.pop();
}

buffer_.push(value);
}

private:
const size_t size_;
std::queue<T> buffer_;
};

} // namespace utils
} // namespace nsolid
} // namespace node
Expand Down
39 changes: 39 additions & 0 deletions test/cctest/test_nsolid_ring_buffer.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#include "gtest/gtest.h"
#include "nsolid/nsolid_util.h"

using node::nsolid::utils::RingBuffer;

TEST(RingBufferTest, Basic) {
// Create a buffer of size 3.
RingBuffer<int> buffer(3);

// Test that the buffer is initially empty.
EXPECT_TRUE(buffer.empty());

// Push some elements into the buffer.
buffer.push(1);
buffer.push(2);
buffer.push(3);

// Test that the buffer is not empty.
EXPECT_FALSE(buffer.empty());

// Test that the front of the buffer is the first element pushed.
EXPECT_EQ(buffer.front(), 1);

// Pop an element and test that the front of the buffer is the second element
// pushed.
buffer.pop();
EXPECT_EQ(buffer.front(), 2);

// Push another element and test that the front of the buffer is still the
// second element pushed.
buffer.push(4);
EXPECT_EQ(buffer.front(), 2);

// Push another element. This should cause the second element to be popped
// (since the buffer size is 3), so the front of the buffer should now be the
// third element pushed.
buffer.push(5);
EXPECT_EQ(buffer.front(), 3);
}

0 comments on commit 63ebaa6

Please sign in to comment.