Skip to content

Commit

Permalink
Merge pull request #1750 from AntelopeIO/GH-1683-stable-id-5.0
Browse files Browse the repository at this point in the history
[5.0] Prometheus: Add stable identifier for P2P connections
  • Loading branch information
heifner authored Oct 12, 2023
2 parents 6c24a3d + 292df50 commit b317322
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 96 deletions.
3 changes: 2 additions & 1 deletion plugins/net_plugin/include/eosio/net_plugin/net_plugin.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ namespace eosio {
size_t block_sync_bytes_sent{0};
bool block_sync_throttling{false};
std::chrono::nanoseconds connection_start_time{0};
std::string log_p2p_address;
std::string p2p_address;
std::string unique_conn_node_id;
};
explicit p2p_per_connection_metrics(size_t count) {
peers.reserve(count);
Expand Down
16 changes: 13 additions & 3 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -951,7 +951,9 @@ namespace eosio {
block_id_type fork_head GUARDED_BY(conn_mtx);
uint32_t fork_head_num GUARDED_BY(conn_mtx) {0};
fc::time_point last_close GUARDED_BY(conn_mtx);
string remote_endpoint_ip GUARDED_BY(conn_mtx);
std::string p2p_address GUARDED_BY(conn_mtx);
std::string unique_conn_node_id GUARDED_BY(conn_mtx);
std::string remote_endpoint_ip GUARDED_BY(conn_mtx);
boost::asio::ip::address_v6::bytes_type remote_endpoint_ip_array GUARDED_BY(conn_mtx);

std::chrono::nanoseconds connection_start_time{0};
Expand Down Expand Up @@ -1254,7 +1256,8 @@ namespace eosio {
connection_id( ++my_impl->current_connection_id ),
response_expected_timer( my_impl->thread_pool.get_executor() ),
last_handshake_recv(),
last_handshake_sent()
last_handshake_sent(),
p2p_address( endpoint )
{
my_impl->mark_bp_connection(this);
update_endpoints();
Expand Down Expand Up @@ -3256,6 +3259,10 @@ namespace eosio {
}

log_p2p_address = msg.p2p_address;
fc::unique_lock g_conn( conn_mtx );
p2p_address = msg.p2p_address;
unique_conn_node_id = msg.node_id.str().substr( 0, 7 );
g_conn.unlock();

my_impl->mark_bp_connection(this);
if (my_impl->exceeding_connection_limit(shared_from_this())) {
Expand Down Expand Up @@ -4741,6 +4748,8 @@ namespace eosio {
}
fc::unique_lock g_conn(c->conn_mtx);
boost::asio::ip::address_v6::bytes_type addr = c->remote_endpoint_ip_array;
std::string p2p_addr = c->p2p_address;
std::string conn_node_id = c->unique_conn_node_id;
g_conn.unlock();
per_connection.peers.emplace_back(
net_plugin::p2p_per_connection_metrics::connection_metric{
Expand All @@ -4761,7 +4770,8 @@ namespace eosio {
, .block_sync_bytes_sent = c->get_block_sync_bytes_sent()
, .block_sync_throttling = c->get_block_sync_throttling()
, .connection_start_time = c->connection_start_time
, .log_p2p_address = c->log_p2p_address
, .p2p_address = p2p_addr
, .unique_conn_node_id = conn_node_id
});
}
g.unlock();
Expand Down
113 changes: 76 additions & 37 deletions plugins/prometheus_plugin/metrics.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,37 @@ struct catalog_type {
// http plugin
prometheus::Family<Counter>& http_request_counts;

// net plugin p2p-connections
prometheus::Family<Gauge>& p2p_connections;

Gauge& num_peers;
Gauge& num_clients;

// net plugin failed p2p connection
Counter& failed_p2p_connections;

// net plugin dropped_trxs
Counter& dropped_trxs_total;

struct p2p_connection_metrics {
Gauge& num_peers;
Gauge& num_clients;

prometheus::Family<Gauge>& addr; // Empty gauge; ipv6 address can't be transmitted as a double
prometheus::Family<Gauge>& port;
prometheus::Family<Gauge>& connection_number;
prometheus::Family<Gauge>& accepting_blocks;
prometheus::Family<Gauge>& last_received_block;
prometheus::Family<Gauge>& first_available_block;
prometheus::Family<Gauge>& last_available_block;
prometheus::Family<Gauge>& unique_first_block_count;
prometheus::Family<Gauge>& latency;
prometheus::Family<Gauge>& bytes_received;
prometheus::Family<Gauge>& last_bytes_received;
prometheus::Family<Gauge>& bytes_sent;
prometheus::Family<Gauge>& last_bytes_sent;
prometheus::Family<Gauge>& block_sync_bytes_received;
prometheus::Family<Gauge>& block_sync_bytes_sent;
prometheus::Family<Gauge>& block_sync_throttling;
prometheus::Family<Gauge>& connection_start_time;
prometheus::Family<Gauge>& peer_addr; // Empty gauge; we only want the label
};
p2p_connection_metrics p2p_metrics;

// producer plugin
prometheus::Family<Counter>& cpu_usage_us;
prometheus::Family<Counter>& net_usage_us;
Expand Down Expand Up @@ -97,12 +116,30 @@ struct catalog_type {
catalog_type()
: info(family<prometheus::Info>("nodeos", "static information about the server"))
, http_request_counts(family<Counter>("nodeos_http_requests_total", "number of HTTP requests"))
, p2p_connections(family<Gauge>("nodeos_p2p_connections", "current number of connected p2p connections"))
, num_peers(p2p_connections.Add({{"direction", "out"}}))
, num_clients(p2p_connections.Add({{"direction", "in"}}))
, failed_p2p_connections(
build<Counter>("nodeos_failed_p2p_connections", "total number of failed out-going p2p connections"))
, dropped_trxs_total(build<Counter>("nodeos_dropped_trxs_total", "total number of dropped transactions by net plugin"))
, failed_p2p_connections(build<Counter>("nodeos_p2p_failed_connections", "total number of failed out-going p2p connections"))
, dropped_trxs_total(build<Counter>("nodeos_p2p_dropped_trxs_total", "total number of dropped transactions by net plugin"))
, p2p_metrics{
.num_peers{build<Gauge>("nodeos_p2p_peers", "current number of connected outgoing peers")}
, .num_clients{build<Gauge>("nodeos_p2p_clients", "current number of connected incoming clients")}
, .addr{family<Gauge>("nodeos_p2p_addr", "ipv6 address")}
, .port{family<Gauge>("nodeos_p2p_port", "port")}
, .connection_number{family<Gauge>("nodeos_p2p_connection_number", "monatomic increasing connection number")}
, .accepting_blocks{family<Gauge>("nodeos_p2p_accepting_blocks", "accepting blocks on connection")}
, .last_received_block{family<Gauge>("nodeos_p2p_last_received_block", "last received block on connection")}
, .first_available_block{family<Gauge>("nodeos_p2p_first_available_block", "first block available from connection")}
, .last_available_block{family<Gauge>("nodeos_p2p_last_available_block", "last block available from connection")}
, .unique_first_block_count{family<Gauge>("nodeos_p2p_unique_first_block_count", "number of blocks first received from any connection on this connection")}
, .latency{family<Gauge>("nodeos_p2p_latency", "last calculated latency with connection")}
, .bytes_received{family<Gauge>("nodeos_p2p_bytes_received", "total bytes received on connection")}
, .last_bytes_received{family<Gauge>("nodeos_p2p_last_bytes_received", "last time anything received from peer")}
, .bytes_sent{family<Gauge>("nodeos_p2p_bytes_sent", "total bytes sent to peer")}
, .last_bytes_sent{family<Gauge>("nodeos_p2p_last_bytes_sent", "last time anything sent to peer")}
, .block_sync_bytes_received{family<Gauge>("nodeos_p2p_block_sync_bytes_received", "bytes of blocks received during syncing")}
, .block_sync_bytes_sent{family<Gauge>("nodeos_p2p_block_sync_bytes_sent", "bytes of blocks sent during syncing")}
, .block_sync_throttling{family<Gauge>("nodeos_p2p_block_sync_throttling", "is block sync throttling currently active")}
, .connection_start_time{family<Gauge>("nodeos_p2p_connection_start_time", "time of last connection to peer")}
, .peer_addr{family<Gauge>("nodeos_p2p_peer_addr", "peer address")}
}
, cpu_usage_us(family<Counter>("nodeos_cpu_usage_us_total", "total cpu usage in microseconds for blocks"))
, net_usage_us(family<Counter>("nodeos_net_usage_us_total", "total net usage in microseconds for blocks"))
, last_irreversible(build<Gauge>("nodeos_last_irreversible", "last irreversible block number"))
Expand Down Expand Up @@ -164,34 +201,36 @@ struct catalog_type {
}

void update(const net_plugin::p2p_connections_metrics& metrics) {
num_peers.Set(metrics.num_peers);
num_clients.Set(metrics.num_clients);
p2p_metrics.num_peers.Set(metrics.num_peers);
p2p_metrics.num_clients.Set(metrics.num_clients);
for(size_t i = 0; i < metrics.stats.peers.size(); ++i) {
std::string label{"connid_" + to_string(metrics.stats.peers[i].connection_id)};
auto add_and_set_gauge = [&](const std::string& label_value,
const auto& value) {
auto& gauge = p2p_connections.Add({{label, label_value}});
const auto& peer = metrics.stats.peers[i];
const auto& conn_id = peer.unique_conn_node_id;

const auto addr = boost::asio::ip::make_address_v6(peer.address).to_string();
p2p_metrics.addr.Add({{"connid", conn_id},{"ipv6", addr},{"address", peer.p2p_address}});

auto add_and_set_gauge = [&](auto& fam, const auto& value) {
auto& gauge = fam.Add({{"connid", conn_id}});
gauge.Set(value);
};
auto& peer = metrics.stats.peers[i];
auto addr = std::string("addr_") + boost::asio::ip::make_address_v6(peer.address).to_string();
add_and_set_gauge(addr, 0); // Empty gauge; ipv6 address can't be transmitted as a double
add_and_set_gauge("port", peer.port);
add_and_set_gauge("accepting_blocks", peer.accepting_blocks);
add_and_set_gauge("last_received_block", peer.last_received_block);
add_and_set_gauge("first_available_block", peer.first_available_block);
add_and_set_gauge("last_available_block", peer.last_available_block);
add_and_set_gauge("unique_first_block_count", peer.unique_first_block_count);
add_and_set_gauge("latency", peer.latency);
add_and_set_gauge("bytes_received", peer.bytes_received);
add_and_set_gauge("last_bytes_received", peer.last_bytes_received.count());
add_and_set_gauge("bytes_sent", peer.bytes_sent);
add_and_set_gauge("last_bytes_sent", peer.last_bytes_sent.count());
add_and_set_gauge("block_sync_bytes_received", peer.block_sync_bytes_received);
add_and_set_gauge("block_sync_bytes_sent", peer.block_sync_bytes_sent);
add_and_set_gauge("block_sync_throttling", peer.block_sync_throttling);
add_and_set_gauge("connection_start_time", peer.connection_start_time.count());
add_and_set_gauge(peer.log_p2p_address, 0); // Empty gauge; we only want the label

add_and_set_gauge(p2p_metrics.connection_number, peer.connection_id);
add_and_set_gauge(p2p_metrics.port, peer.port);
add_and_set_gauge(p2p_metrics.accepting_blocks, peer.accepting_blocks);
add_and_set_gauge(p2p_metrics.last_received_block, peer.last_received_block);
add_and_set_gauge(p2p_metrics.first_available_block, peer.first_available_block);
add_and_set_gauge(p2p_metrics.last_available_block, peer.last_available_block);
add_and_set_gauge(p2p_metrics.unique_first_block_count, peer.unique_first_block_count);
add_and_set_gauge(p2p_metrics.latency, peer.latency);
add_and_set_gauge(p2p_metrics.bytes_received, peer.bytes_received);
add_and_set_gauge(p2p_metrics.last_bytes_received, peer.last_bytes_received.count());
add_and_set_gauge(p2p_metrics.bytes_sent, peer.bytes_sent);
add_and_set_gauge(p2p_metrics.last_bytes_sent, peer.last_bytes_sent.count());
add_and_set_gauge(p2p_metrics.block_sync_bytes_received, peer.block_sync_bytes_received);
add_and_set_gauge(p2p_metrics.block_sync_bytes_sent, peer.block_sync_bytes_sent);
add_and_set_gauge(p2p_metrics.block_sync_throttling, peer.block_sync_throttling);
add_and_set_gauge(p2p_metrics.connection_start_time, peer.connection_start_time.count());
}
}

Expand Down
3 changes: 2 additions & 1 deletion tests/nodeos_run_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@

abs_path = os.path.abspath(os.getcwd() + '/unittests/contracts/eosio.token/eosio.token.abi')
traceNodeosArgs=" --http-max-response-time-ms 990000 --trace-rpc-abi eosio.token=" + abs_path
extraNodeosArgs=traceNodeosArgs + " --plugin eosio::prometheus_plugin --database-map-mode mapped_private "
specificNodeosInstances={0: "bin/nodeos"}
if cluster.launch(totalNodes=2, prodCount=prodCount, onlyBios=onlyBios, dontBootstrap=dontBootstrap, extraNodeosArgs=traceNodeosArgs, specificNodeosInstances=specificNodeosInstances) is False:
if cluster.launch(totalNodes=2, prodCount=prodCount, onlyBios=onlyBios, dontBootstrap=dontBootstrap, extraNodeosArgs=extraNodeosArgs, specificNodeosInstances=specificNodeosInstances) is False:
cmdError("launcher")
errorExit("Failed to stand up eos cluster.")
else:
Expand Down
27 changes: 17 additions & 10 deletions tests/p2p_sync_throttle_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@
walletMgr=WalletMgr(True)

def extractPrometheusMetric(connID: str, metric: str, text: str):
searchStr = f'nodeos_p2p_connections{{connid_{connID}="{metric}"}} '
searchStr = f'nodeos_p2p_{metric}{{connid="{connID}"}} '
begin = text.find(searchStr) + len(searchStr)
return int(text[begin:text.find('\n', begin)])

prometheusHostPortPattern = re.compile(r'^nodeos_p2p_connections.connid_([0-9])="localhost:([0-9]*)', re.MULTILINE)
prometheusHostPortPattern = re.compile(r'^nodeos_p2p_port.connid="([a-f0-9]*)". ([0-9]*)', re.MULTILINE)

try:
TestHelper.printSystemInfo("BEGIN")
Expand Down Expand Up @@ -120,6 +120,8 @@ def extractPrometheusMetric(connID: str, metric: str, text: str):

errorLimit = 40 # Approximately 20 retries required
throttledNode = cluster.getNode(3)
throttledNodeConnId = None
throttlingNodeConnId = None
while errorLimit > 0:
try:
response = throttlingNode.processUrllibRequest('prometheus', 'metrics', returnType=ReturnType.raw, printReturnLimit=16).decode()
Expand All @@ -134,17 +136,19 @@ def extractPrometheusMetric(connID: str, metric: str, text: str):
errorLimit -= 1
continue
connPorts = prometheusHostPortPattern.findall(response)
Print(connPorts)
if len(connPorts) < 3:
# wait for node to be connected
errorLimit -= 1
time.sleep(0.5)
continue
Print('Throttling Node Start State')
throttlingNodePortMap = {port: id for id, port in connPorts}
startSyncThrottlingBytesSent = extractPrometheusMetric(throttlingNodePortMap['9879'],
throttlingNodePortMap = {port: id for id, port in connPorts if id != '' and port != '9877'}
throttlingNodeConnId = next(iter(throttlingNodePortMap.values())) # 9879
startSyncThrottlingBytesSent = extractPrometheusMetric(throttlingNodeConnId,
'block_sync_bytes_sent',
response)
startSyncThrottlingState = extractPrometheusMetric(throttlingNodePortMap['9879'],
startSyncThrottlingState = extractPrometheusMetric(throttlingNodeConnId,
'block_sync_throttling',
response)
Print(f'Start sync throttling bytes sent: {startSyncThrottlingBytesSent}')
Expand All @@ -170,13 +174,16 @@ def extractPrometheusMetric(connID: str, metric: str, text: str):
time.sleep(0.5)
continue
connPorts = prometheusHostPortPattern.findall(response)
Print(connPorts)
if len(connPorts) < 2:
# wait for sending node to be connected
errorLimit -= 1
continue
Print('Throttled Node Start State')
throttledNodePortMap = {port: id for id, port in connPorts}
startSyncThrottledBytesReceived = extractPrometheusMetric(throttledNodePortMap['9878'],
throttledNodePortMap = {port: id for id, port in connPorts if id != ''}
throttledNodeConnId = next(iter(throttledNodePortMap.values())) # 9878
Print(throttledNodeConnId)
startSyncThrottledBytesReceived = extractPrometheusMetric(throttledNodeConnId,
'block_sync_bytes_received',
response)
Print(f'Start sync throttled bytes received: {startSyncThrottledBytesReceived}')
Expand All @@ -190,7 +197,7 @@ def extractPrometheusMetric(connID: str, metric: str, text: str):
endThrottlingSync = time.time()
response = throttlingNode.processUrllibRequest('prometheus', 'metrics', exitOnError=True, returnType=ReturnType.raw, printReturnLimit=16).decode()
Print('Throttling Node End State')
endSyncThrottlingBytesSent = extractPrometheusMetric(throttlingNodePortMap['9879'],
endSyncThrottlingBytesSent = extractPrometheusMetric(throttlingNodeConnId,
'block_sync_bytes_sent',
response)
Print(f'End sync throttling bytes sent: {endSyncThrottlingBytesSent}')
Expand All @@ -200,7 +207,7 @@ def extractPrometheusMetric(connID: str, metric: str, text: str):
while time.time() < endThrottlingSync + 30:
response = throttlingNode.processUrllibRequest('prometheus', 'metrics', exitOnError=True,
returnType=ReturnType.raw, printReturnLimit=16).decode()
throttledState = extractPrometheusMetric(throttlingNodePortMap['9879'],
throttledState = extractPrometheusMetric(throttlingNodeConnId,
'block_sync_throttling',
response)
if throttledState:
Expand All @@ -210,7 +217,7 @@ def extractPrometheusMetric(connID: str, metric: str, text: str):
endThrottledSync = time.time()
response = throttledNode.processUrllibRequest('prometheus', 'metrics', exitOnError=True, returnType=ReturnType.raw, printReturnLimit=16).decode()
Print('Throttled Node End State')
endSyncThrottledBytesReceived = extractPrometheusMetric(throttledNodePortMap['9878'],
endSyncThrottledBytesReceived = extractPrometheusMetric(throttledNodeConnId,
'block_sync_bytes_received',
response)
Print(f'End sync throttled bytes received: {endSyncThrottledBytesReceived}')
Expand Down
Loading

0 comments on commit b317322

Please sign in to comment.