Skip to content

Commit

Permalink
chore: export pipeline related metrics (#3104)
Browse files Browse the repository at this point in the history
* chore: export pipeline related metrics

Export in /metrics
1. Total pipeline queue length
2. Total pipeline commands
3. Total pipelined duration

Signed-off-by: Roman Gershman <roman@dragonflydb.io>

---------

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
  • Loading branch information
romange authored May 30, 2024
1 parent 137bd31 commit 0394387
Show file tree
Hide file tree
Showing 9 changed files with 169 additions and 57 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ on:
branches: [main]
workflow_dispatch:

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true

jobs:
pre-commit:
runs-on: ubuntu-latest
Expand Down
8 changes: 5 additions & 3 deletions src/facade/conn_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,19 +97,21 @@ class ConnectionContext {
bool async_dispatch : 1; // whether this connection is amid an async dispatch
bool sync_dispatch : 1; // whether this connection is amid a sync dispatch
bool journal_emulated : 1; // whether it is used to dispatch journal commands
bool paused = false; // whether this connection is paused due to CLIENT PAUSE

bool paused = false; // whether this connection is paused due to CLIENT PAUSE
// whether it's blocked on blocking commands like BLPOP, needs to be addressable
bool blocked = false;

// Skip ACL validation, used by internal commands and commands run on admin port
bool skip_acl_validation = false;

// How many async subscription sources are active: monitor and/or pubsub - at most 2.
uint8_t subscriptions;

// TODO fix inherit actual values from default
std::string authed_username{"default"};
uint32_t acl_categories{dfly::acl::ALL};
std::vector<uint64_t> acl_commands;
// Skip ACL validation, used by internal commands and commands run on admin port
bool skip_acl_validation = false;
// keys
dfly::acl::AclKeys keys{{}, true};

Expand Down
28 changes: 9 additions & 19 deletions src/facade/dragonfly_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -393,19 +393,6 @@ size_t Connection::MessageHandle::UsedMemory() const {
return sizeof(MessageHandle) + visit(MessageSize{}, this->handle);
}

bool Connection::MessageHandle::IsIntrusive() const {
return holds_alternative<AclUpdateMessagePtr>(handle) ||
holds_alternative<CheckpointMessage>(handle);
}

bool Connection::MessageHandle::IsPipelineMsg() const {
return holds_alternative<PipelineMessagePtr>(handle);
}

bool Connection::MessageHandle::IsPubMsg() const {
return holds_alternative<PubMessagePtr>(handle);
}

bool Connection::MessageHandle::IsReplying() const {
return IsPipelineMsg() || IsPubMsg() || holds_alternative<MonitorMessage>(handle) ||
(holds_alternative<MCPipelineMessagePtr>(handle) &&
Expand Down Expand Up @@ -751,6 +738,9 @@ std::pair<std::string, std::string> Connection::GetClientInfoBeforeAfterTid() co

string after;
absl::StrAppend(&after, " irqmatch=", int(cpu == my_cpu_id));
if (dispatch_q_.size()) {
absl::StrAppend(&after, " pipeline=", dispatch_q_.size());
}
absl::StrAppend(&after, " age=", now - creation_time_, " idle=", now - last_interaction_);
string_view phase_name = PHASE_NAMES[phase_];

Expand Down Expand Up @@ -1272,7 +1262,7 @@ void Connection::SquashPipeline(facade::SinkReplyBuilder* builder) {
cc_->async_dispatch = false;

auto it = dispatch_q_.begin();
while (it->IsIntrusive()) // Skip all newly received intrusive messages
while (it->IsControl()) // Skip all newly received intrusive messages
++it;

for (auto rit = it; rit != it + dispatched; ++rit)
Expand All @@ -1291,7 +1281,7 @@ void Connection::ClearPipelinedMessages() {
// As well as to avoid pubsub backpressure leakege.
for (auto& msg : dispatch_q_) {
FiberAtomicGuard guard; // don't suspend when concluding to avoid getting new messages
if (msg.IsIntrusive())
if (msg.IsControl())
visit(dispatch_op, msg.handle); // to not miss checkpoints
RecycleMessage(std::move(msg));
}
Expand All @@ -1309,7 +1299,7 @@ std::string Connection::DebugInfo() const {
absl::StrAppend(&info, "closing=", cc_->conn_closing, ", ");
absl::StrAppend(&info, "dispatch_fiber:joinable=", dispatch_fb_.IsJoinable(), ", ");

bool intrusive_front = dispatch_q_.size() > 0 && dispatch_q_.front().IsIntrusive();
bool intrusive_front = dispatch_q_.size() > 0 && dispatch_q_.front().IsControl();
absl::StrAppend(&info, "dispatch_queue:size=", dispatch_q_.size(), ", ");
absl::StrAppend(&info, "dispatch_queue:pipelined=", pending_pipeline_cmd_cnt_, ", ");
absl::StrAppend(&info, "dispatch_queue:intrusive=", intrusive_front, ", ");
Expand Down Expand Up @@ -1549,7 +1539,7 @@ void Connection::SendAsync(MessageHandle msg) {

// "Closing" connections might be still processing commands, as we don't interrupt them.
// So we still want to deliver control messages to them (like checkpoints).
if (cc_->conn_closing && !msg.IsIntrusive())
if (cc_->conn_closing && !msg.IsControl())
return;

// If we launch while closing, it won't be awaited. Control messages will be processed on cleanup.
Expand All @@ -1573,9 +1563,9 @@ void Connection::SendAsync(MessageHandle msg) {
pending_pipeline_cmd_cnt_++;
}

if (msg.IsIntrusive()) {
if (msg.IsControl()) {
auto it = dispatch_q_.begin();
while (it < dispatch_q_.end() && it->IsIntrusive())
while (it < dispatch_q_.end() && it->IsControl())
++it;
dispatch_q_.insert(it, std::move(msg));
} else {
Expand Down
19 changes: 14 additions & 5 deletions src/facade/dragonfly_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,21 @@ class Connection : public util::Connection {
struct MessageHandle {
size_t UsedMemory() const; // How much bytes this handle takes up in total.

// Intrusive messages put themselves at the front of the queue, but only after all other
// intrusive ones. Used for quick transfer of control / update messages.
bool IsIntrusive() const;
// Control messages put themselves at the front of the queue, but only after all other
// control ones. Used for management messages.
bool IsControl() const {
return std::holds_alternative<AclUpdateMessagePtr>(handle) ||
std::holds_alternative<CheckpointMessage>(handle);
}

bool IsPipelineMsg() const {
return std::holds_alternative<PipelineMessagePtr>(handle);
}

bool IsPubMsg() const {
return std::holds_alternative<PubMessagePtr>(handle);
}

bool IsPipelineMsg() const;
bool IsPubMsg() const;
bool IsReplying() const; // control messges don't reply, messages carrying data do

std::variant<MonitorMessage, PubMessagePtr, PipelineMessagePtr, MCPipelineMessagePtr,
Expand Down
4 changes: 2 additions & 2 deletions src/facade/facade_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ inline std::string_view ToSV(std::string_view slice) {

struct ConnectionStats {
size_t read_buf_capacity = 0; // total capacity of input buffers
size_t dispatch_queue_entries = 0; // total number of dispatch queue entries
uint64_t dispatch_queue_entries = 0; // total number of dispatch queue entries
size_t dispatch_queue_bytes = 0; // total size of all dispatch queue entries
size_t dispatch_queue_subscriber_bytes = 0; // total size of all publish messages

size_t pipeline_cmd_cache_bytes = 0;

size_t io_read_cnt = 0;
uint64_t io_read_cnt = 0;
size_t io_read_bytes = 0;

uint64_t command_cnt = 0;
Expand Down
11 changes: 9 additions & 2 deletions src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1071,10 +1071,17 @@ void PrintPrometheusMetrics(const Metrics& m, DflyCmd* dfly_cmd, StringResponse*
MetricType::GAUGE, &resp->body());
AppendMetricWithoutLabels("blocked_clients", "", conn_stats.num_blocked_clients,
MetricType::GAUGE, &resp->body());
AppendMetricWithoutLabels("dispatch_queue_bytes", "", conn_stats.dispatch_queue_bytes,
AppendMetricWithoutLabels("pipeline_queue_bytes", "", conn_stats.dispatch_queue_bytes,
MetricType::GAUGE, &resp->body());
AppendMetricWithoutLabels("pipeline_queue_length", "", conn_stats.dispatch_queue_entries,
MetricType::GAUGE, &resp->body());
AppendMetricWithoutLabels("pipeline_cmd_cache_bytes", "", conn_stats.pipeline_cmd_cache_bytes,
MetricType::GAUGE, &resp->body());
AppendMetricWithoutLabels("pipeline_commands_total", "", conn_stats.pipelined_cmd_cnt,
MetricType::COUNTER, &resp->body());
AppendMetricWithoutLabels("pipeline_commands_duration_seconds", "",
conn_stats.pipelined_cmd_latency * 1e-6, MetricType::COUNTER,
&resp->body());

// Memory metrics
auto sdata_res = io::ReadStatusInfo();
Expand Down Expand Up @@ -1977,7 +1984,7 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
append("max_clients", GetFlag(FLAGS_maxclients));
append("client_read_buffer_bytes", m.facade_stats.conn_stats.read_buf_capacity);
append("blocked_clients", m.facade_stats.conn_stats.num_blocked_clients);
append("dispatch_queue_entries", m.facade_stats.conn_stats.dispatch_queue_entries);
append("pipeline_queue_length", m.facade_stats.conn_stats.dispatch_queue_entries);
}

if (should_enter("MEMORY")) {
Expand Down
2 changes: 1 addition & 1 deletion tools/local/monitoring/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ volumes:

services:
prometheus:
image: prom/prometheus
image: prom/prometheus:v2.45.5
restart: always
volumes:
- ./prometheus:/etc/prometheus/
Expand Down
Loading

0 comments on commit 0394387

Please sign in to comment.