Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add per peer prometheus metrics and a TUI to view them. #1477

Merged
merged 36 commits into from
Aug 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
79c9817
Add per peer prometheus metrics and a TUI to view them.
jgiszczak Aug 2, 2023
d13dfa5
Merge branch 'main' into prometheus-peer-statistics
jgiszczak Aug 2, 2023
727a0c5
Remove debug log statement.
jgiszczak Aug 2, 2023
fbb0cec
Add net-util command line support for host and port.
jgiszczak Aug 2, 2023
86c6ced
Update prometheus metric names in plugin_http_api_test.
jgiszczak Aug 3, 2023
ed9eee6
Make p2p_per_connection_metrics constructor explicit.
jgiszczak Aug 3, 2023
a30d7c7
Count unique received blocks in process_signed_block
jgiszczak Aug 3, 2023
51c5f15
Rename construction_time to connection_time and update every connect
jgiszczak Aug 3, 2023
d511095
Make prometheus endpoint URL a variable at the top of the script.
jgiszczak Aug 3, 2023
ef80f69
Reserve enough space only for current connection count.
jgiszczak Aug 3, 2023
507efb7
Rename late_initialize to update_prometheus_info.
jgiszczak Aug 3, 2023
1e14aa0
Change default host from localhost to 127.0.0.1.
jgiszczak Aug 3, 2023
850a336
Check for non-200 status codes rather than only 404.
jgiszczak Aug 3, 2023
7420c72
Add reported p2p_address to Prometheus metrics per peer.
jgiszczak Aug 3, 2023
2774dea
Add refresh interval argument and default to the max possible.
jgiszczak Aug 4, 2023
df3554d
Add support for last data received and sent timestamps on peers.
jgiszczak Aug 5, 2023
7c8ebe4
Support IPv6 addresses.
jgiszczak Aug 8, 2023
fb00ebc
Convert peer metrics from struct of vectors to vector of structs.
jgiszczak Aug 9, 2023
e1dacb9
Merge branch 'main' into prometheus-peer-statistics
jgiszczak Aug 9, 2023
98e1e39
Default initialization and const correctness.
jgiszczak Aug 10, 2023
690d12e
Thread safety and collect connection time for inbound connections
jgiszczak Aug 10, 2023
9ec5410
Merge branch 'main' into prometheus-peer-statistics
jgiszczak Aug 10, 2023
ae87427
Retain chrono nanoseconds until the last possible moment.
jgiszczak Aug 10, 2023
639faf2
Relocate net-util to tools.
jgiszczak Aug 10, 2023
9ff857c
Move accounting of bytes written to peer to async_write's completion …
jgiszczak Aug 11, 2023
6abba1a
Use connection ID as per peer Prometheus metric key.
jgiszczak Aug 13, 2023
2754f02
Remove unnecessary retention of pointers to Prometheus gauges.
jgiszczak Aug 14, 2023
ef44bfb
Add logging when remote endpoint can't be retrieved.
jgiszczak Aug 14, 2023
6b6ea38
Use fc::unique_lock on an fc::mutex.
jgiszczak Aug 14, 2023
09ea0cc
Change over bandwidth field tracking to connection ID from IP.
jgiszczak Aug 14, 2023
af76bb0
Merge branch 'main' into prometheus-peer-statistics
jgiszczak Aug 14, 2023
e4aa07e
Adjust peer list column weightings.
jgiszczak Aug 14, 2023
8914850
Merge branch 'main' into prometheus-peer-statistics
jgiszczak Aug 14, 2023
e0deb07
Merge branch 'main' into prometheus-peer-statistics
jgiszczak Aug 15, 2023
1f1ea00
Update help preamble to better describe net-util's current capabilities.
jgiszczak Aug 15, 2023
a0b3985
Merge branch 'main' into prometheus-peer-statistics
jgiszczak Aug 15, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions plugins/chain_plugin/chain_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1296,15 +1296,6 @@ namespace chain_apis {

const string read_only::KEYi64 = "i64";

/**
 * Render the low-order hex digits of an integer as a fixed-width, lowercase string.
 *
 * @param n     value to format
 * @param hlen  number of hex digits to emit; defaults to two digits per byte of I
 * @return      string of exactly hlen characters, most significant digit first
 *
 * Digit positions that lie beyond the bit width of the shifted operand are emitted
 * as '0' instead of being computed with an out-of-range shift (which is undefined
 * behavior and yields garbage digits on common hardware).
 */
template<typename I>
std::string itoh(I n, size_t hlen = sizeof(I)<<1) {
   static constexpr char digits[] = "0123456789abcdef";
   // Bit width of the operand as it is actually shifted, i.e. after integral promotion.
   constexpr size_t shift_limit = sizeof(n >> 0) * 8;
   std::string r(hlen, '0');
   for(size_t i = 0; i < hlen; ++i) {
      const size_t shift = (hlen - 1 - i) * 4;
      if(shift < shift_limit)
         r[i] = digits[(n >> shift) & 0x0f];
   }
   return r;
}

read_only::get_info_results read_only::get_info(const read_only::get_info_params&, const fc::time_point&) const {
const auto& rm = db.get_resource_limits_manager();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -950,6 +950,15 @@ class read_write : public api_base {
}
};

/**
 * Render the low-order hex digits of an integer as a fixed-width, lowercase string.
 *
 * @param n     value to format
 * @param hlen  number of hex digits to emit; defaults to two digits per byte of I
 * @return      string of exactly hlen characters, most significant digit first
 *
 * Digit positions that lie beyond the bit width of the shifted operand are emitted
 * as '0' instead of being computed with an out-of-range shift (which is undefined
 * behavior and yields garbage digits on common hardware).
 */
template<typename I>
std::string itoh(I n, size_t hlen = sizeof(I)<<1) {
   static constexpr char digits[] = "0123456789abcdef";
   // Bit width of the operand as it is actually shifted, i.e. after integral promotion.
   constexpr size_t shift_limit = sizeof(n >> 0) * 8;
   std::string r(hlen, '0');
   for(size_t i = 0; i < hlen; ++i) {
      const size_t shift = (hlen - 1 - i) * 4;
      if(shift < shift_limit)
         r[i] = digits[(n >> shift) & 0x0f];
   }
   return r;
}

} // namespace chain_apis

class chain_plugin : public plugin<chain_plugin> {
Expand Down
40 changes: 40 additions & 0 deletions plugins/net_plugin/include/eosio/net_plugin/net_plugin.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,49 @@ namespace eosio {
std::optional<connection_status> status( const string& endpoint )const;
vector<connection_status> connections()const;

/// Move-only container holding one connection_metric record per peer connection.
struct p2p_per_connection_metrics {
   /// Statistics captured for a single peer connection.
   struct connection_metric {
      uint32_t connection_id{0};
      // Raw IPv6 address bytes. Value-initialized so a default-constructed record
      // never carries indeterminate bytes (every other member already has a default).
      boost::asio::ip::address_v6::bytes_type address{};
      unsigned short port{0};
      bool accepting_blocks{false};
      uint32_t last_received_block{0};
      uint32_t first_available_block{0};
      uint32_t last_available_block{0};
      size_t unique_first_block_count{0};
      uint64_t latency{0};
      size_t bytes_received{0};
      std::chrono::nanoseconds last_bytes_received{0};
      size_t bytes_sent{0};
      std::chrono::nanoseconds last_bytes_sent{0};
      std::chrono::nanoseconds connection_start_time{0};
      std::string log_p2p_address;
   };

   /// Pre-allocates storage for `count` peer records; no records are created.
   explicit p2p_per_connection_metrics(size_t count) {
      peers.reserve(count);
   }

   /// Takes over the peer records of `other`. Moving a std::vector cannot throw,
   /// so the constructor is declared noexcept.
   p2p_per_connection_metrics(p2p_per_connection_metrics&& other) noexcept
   : peers{std::move(other.peers)}
   {}

   // Copying is disabled; the object is handed off by move only.
   p2p_per_connection_metrics(const p2p_per_connection_metrics&) = delete;
   p2p_per_connection_metrics& operator=(const p2p_per_connection_metrics&) = delete;

   std::vector<connection_metric> peers;
};
/// Move-only aggregate of connection counts plus the per-connection statistics,
/// passed by value to the callback registered through
/// register_update_p2p_connection_metrics.
struct p2p_connections_metrics {
   /// @param peers       value stored in num_peers
   /// @param clients     value stored in num_clients
   /// @param statistics  per-connection records; ownership is moved into `stats`
   p2p_connections_metrics(std::size_t peers, std::size_t clients, p2p_per_connection_metrics&& statistics)
   : num_peers{peers}
   , num_clients{clients}
   , stats{std::move(statistics)}
   {}

   /// Move constructor. The counters are plain integers and are simply copied
   /// (std::move on them would be a no-op); only `stats` is actually moved.
   /// Declared noexcept: it only copies integers and moves a vector-backed member.
   p2p_connections_metrics(p2p_connections_metrics&& other) noexcept
   : num_peers{other.num_peers}
   , num_clients{other.num_clients}
   , stats{std::move(other.stats)}
   {}

   // Copying is disabled; the object is handed off by move only.
   p2p_connections_metrics(const p2p_connections_metrics&) = delete;

   std::size_t num_peers = 0;
   std::size_t num_clients = 0;
   p2p_per_connection_metrics stats;
};

void register_update_p2p_connection_metrics(std::function<void(p2p_connections_metrics)>&&);
Expand Down
89 changes: 78 additions & 11 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@

using namespace eosio::chain::plugin_interface;

using namespace std::chrono_literals;

namespace boost
{
/// @brief Overload for boost::lexical_cast to convert vector of strings to string
Expand Down Expand Up @@ -790,6 +792,15 @@ namespace eosio {
/// True when connection_type is blocks_only.
bool is_blocks_only_connection()const {
   return connection_type.load() == blocks_only;
}
/// True unless connection_type is blocks_only. Thread safe: connection_type is atomic.
bool is_transactions_connection() const {
   return connection_type.load() != blocks_only;
}
/// True unless connection_type is transactions_only. Thread safe: connection_type is atomic.
bool is_blocks_connection() const {
   return connection_type.load() != transactions_only;
}
// Read-only accessors for the connection's atomic counters.
// Each one returns a snapshot obtained with an atomic load.

/// Current value of peer_start_block_num.
uint32_t get_peer_start_block_num() const {
   return peer_start_block_num.load();
}
/// Current value of peer_head_block_num.
uint32_t get_peer_head_block_num() const {
   return peer_head_block_num.load();
}
/// Current value of last_received_block_num.
uint32_t get_last_received_block_num() const {
   return last_received_block_num.load();
}
/// Current value of unique_blocks_rcvd_count.
uint32_t get_unique_blocks_rcvd_count() const {
   return unique_blocks_rcvd_count.load();
}
/// Current value of bytes_received.
size_t get_bytes_received() const {
   return bytes_received.load();
}
/// Current value of last_bytes_received.
std::chrono::nanoseconds get_last_bytes_received() const {
   return last_bytes_received.load();
}
/// Current value of bytes_sent.
size_t get_bytes_sent() const {
   return bytes_sent.load();
}
/// Current value of last_bytes_sent.
std::chrono::nanoseconds get_last_bytes_sent() const {
   return last_bytes_sent.load();
}
/// Current value of remote_endpoint_port.
boost::asio::ip::port_type get_remote_endpoint_port() const {
   return remote_endpoint_port.load();
}
/// Replaces the stored heartbeat timeout (hb_timeout) with `msec`.
void set_heartbeat_timeout(std::chrono::milliseconds msec) {
hb_timeout = msec;
}
Expand Down Expand Up @@ -819,6 +830,13 @@ namespace eosio {
std::atomic<connection_types> connection_type{both};
std::atomic<uint32_t> peer_start_block_num{0};
std::atomic<uint32_t> peer_head_block_num{0};
std::atomic<uint32_t> last_received_block_num{0};
std::atomic<uint32_t> unique_blocks_rcvd_count{0};
std::atomic<size_t> bytes_received{0};
std::atomic<std::chrono::nanoseconds> last_bytes_received{0ns};
std::atomic<size_t> bytes_sent{0};
std::atomic<std::chrono::nanoseconds> last_bytes_sent{0ns};
std::atomic<boost::asio::ip::port_type> remote_endpoint_port{0};

public:
boost::asio::io_context::strand strand;
Expand Down Expand Up @@ -875,6 +893,9 @@ namespace eosio {
uint32_t fork_head_num GUARDED_BY(conn_mtx) {0};
fc::time_point last_close GUARDED_BY(conn_mtx);
string remote_endpoint_ip GUARDED_BY(conn_mtx);
boost::asio::ip::address_v6::bytes_type remote_endpoint_ip_array GUARDED_BY(conn_mtx);

std::chrono::nanoseconds connection_start_time{0};

connection_status get_status()const;

Expand Down Expand Up @@ -941,10 +962,12 @@ namespace eosio {
void send_time(const time_message& msg);
/** \brief Read system time and convert to a 64 bit integer.
*
* There are only two calls on this routine in the program. One
* when a packet arrives from the network and the other when a
* packet is placed on the send queue. Calls the kernel time of
* day routine and converts to a (at least) 64 bit integer.
* There are five calls to this routine in the program. One
* when a packet arrives from the network, one when a packet
* is placed on the send queue, one during start session, and
* one each when data is counted as received or sent.
* Calls the kernel time of day routine and converts to
* a (at least) 64 bit integer.
*/
static std::chrono::nanoseconds get_time() {
return std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch());
Expand Down Expand Up @@ -1158,6 +1181,7 @@ namespace eosio {
last_handshake_sent()
{
my_impl->mark_bp_connection(this);
update_endpoints();
fc_ilog( logger, "created connection ${c} to ${n}", ("c", connection_id)("n", endpoint) );
}

Expand All @@ -1181,12 +1205,25 @@ namespace eosio {
boost::system::error_code ec2;
auto rep = socket->remote_endpoint(ec);
auto lep = socket->local_endpoint(ec2);
remote_endpoint_port = ec ? 0 : rep.port();
heifner marked this conversation as resolved.
Show resolved Hide resolved
log_remote_endpoint_ip = ec ? unknown : rep.address().to_string();
log_remote_endpoint_port = ec ? unknown : std::to_string(rep.port());
local_endpoint_ip = ec2 ? unknown : lep.address().to_string();
local_endpoint_port = ec2 ? unknown : std::to_string(lep.port());
fc::lock_guard g_conn( conn_mtx );
remote_endpoint_ip = log_remote_endpoint_ip;
if(!ec) {
if(rep.address().is_v4()) {
remote_endpoint_ip_array = make_address_v6(boost::asio::ip::v4_mapped, rep.address().to_v4()).to_bytes();
heifner marked this conversation as resolved.
Show resolved Hide resolved
}
else {
remote_endpoint_ip_array = rep.address().to_v6().to_bytes();
}
}
else {
fc_dlog( logger, "unable to retrieve remote endpoint for local ${address}:${port}", ("address", local_endpoint_ip)("port", local_endpoint_port));
remote_endpoint_ip_array = boost::asio::ip::address_v6().to_bytes();
}
}

// called from connection strand
Expand Down Expand Up @@ -1233,19 +1270,19 @@ namespace eosio {

/// Builds a connection_status snapshot describing this connection.
///
/// The state flags are read first without taking conn_mtx; the peer address,
/// remote endpoint strings and last received handshake are then copied while
/// conn_mtx is held. Each field is assigned exactly once — the earlier
/// duplicate, unlocked assignments of peer/remote_ip/remote_port were redundant
/// (they were overwritten under the lock) and have been removed.
connection_status connection::get_status()const {
   connection_status stat;
   stat.connecting = state() == connection_state::connecting;
   stat.syncing = peer_syncing_from_us;
   stat.is_bp_peer = is_bp_connection;
   stat.is_socket_open = socket_is_open();

   // Lock is held until return so the fields below are copied together.
   fc::lock_guard g( conn_mtx );
   stat.peer = peer_addr;
   stat.remote_ip = log_remote_endpoint_ip;
   stat.remote_port = log_remote_endpoint_port;
   stat.last_handshake = last_handshake_recv;
   return stat;
}

// called from connection stand
// called from connection strand
bool connection::start_session() {
verify_strand_in_this_thread( strand, __func__, __LINE__ );

Expand All @@ -1259,6 +1296,7 @@ namespace eosio {
} else {
peer_dlog( this, "connected" );
socket_open = true;
connection_start_time = get_time();
start_read_message();
return true;
}
Expand Down Expand Up @@ -1562,6 +1600,8 @@ namespace eosio {
c->close();
return;
}
c->bytes_sent += w;
c->last_bytes_sent = c->get_time();

c->buffer_queue.out_callback( ec, w );

Expand Down Expand Up @@ -2637,7 +2677,6 @@ namespace eosio {

auto resolver = std::make_shared<tcp::resolver>( my_impl->thread_pool.get_executor() );
connection_wptr weak_conn = c;
// Note: need to add support for IPv6 too
resolver->async_resolve(host, port, boost::asio::bind_executor( c->strand,
[resolver, weak_conn, host = host, port = port]( const boost::system::error_code& err, const tcp::resolver::results_type& endpoints ) {
auto c = weak_conn.lock();
Expand All @@ -2664,6 +2703,7 @@ namespace eosio {
boost::asio::bind_executor( strand,
[resolver, c = shared_from_this(), socket=socket]( const boost::system::error_code& err, const tcp::endpoint& endpoint ) {
if( !err && socket->is_open() && socket == c->socket ) {
c->update_endpoints();
if( c->start_session() ) {
c->send_handshake();
c->send_time();
Expand Down Expand Up @@ -2867,6 +2907,8 @@ namespace eosio {

// called from connection strand
bool connection::process_next_message( uint32_t message_length ) {
bytes_received += message_length;
last_bytes_received = get_time();
try {
latest_msg_time = std::chrono::system_clock::now();

Expand Down Expand Up @@ -2906,7 +2948,7 @@ namespace eosio {
fc::raw::unpack( peek_ds, bh );

const block_id_type blk_id = bh.calculate_id();
const uint32_t blk_num = block_header::num_from_id(blk_id);
const uint32_t blk_num = last_received_block_num = block_header::num_from_id(blk_id);
// don't add_peer_block because we have not validated this block header yet
if( my_impl->dispatcher->have_block( blk_id ) ) {
peer_dlog( this, "canceling wait, already received block ${num}, id ${id}...",
Expand Down Expand Up @@ -3639,6 +3681,7 @@ namespace eosio {
}

if( accepted ) {
++unique_blocks_rcvd_count;
boost::asio::post( my_impl->thread_pool.get_executor(), [dispatcher = my_impl->dispatcher.get(), c, blk_id, blk_num]() {
fc_dlog( logger, "accepted signed_block : #${n} ${id}...", ("n", blk_num)("id", blk_id.str().substr(8,16)) );
dispatcher->add_peer_block( blk_id, c->connection_id );
Expand Down Expand Up @@ -4400,6 +4443,7 @@ namespace eosio {
auto it = (from ? connections.find(from) : connections.begin());
if (it == connections.end()) it = connections.begin();
size_t num_rm = 0, num_clients = 0, num_peers = 0, num_bp_peers = 0;
net_plugin::p2p_per_connection_metrics per_connection(connections.size());
while (it != connections.end()) {
if (fc::time_point::now() >= max_time) {
connection_wptr wit = *it;
Expand All @@ -4417,6 +4461,29 @@ namespace eosio {
} else {
++num_peers;
}
if (update_p2p_connection_metrics) {
fc::unique_lock g_conn((*it)->conn_mtx);
boost::asio::ip::address_v6::bytes_type addr = (*it)->remote_endpoint_ip_array;
g_conn.unlock();
net_plugin::p2p_per_connection_metrics::connection_metric metrics{
.connection_id = (*it)->connection_id
, .address = addr
, .port = (*it)->get_remote_endpoint_port()
, .accepting_blocks = (*it)->is_blocks_connection()
, .last_received_block = (*it)->get_last_received_block_num()
, .first_available_block = (*it)->get_peer_start_block_num()
, .last_available_block = (*it)->get_peer_head_block_num()
, .unique_first_block_count = (*it)->get_unique_blocks_rcvd_count()
, .latency = (*it)->get_peer_ping_time_ns()
, .bytes_received = (*it)->get_bytes_received()
, .last_bytes_received = (*it)->get_last_bytes_received()
, .bytes_sent = (*it)->get_bytes_sent()
, .last_bytes_sent = (*it)->get_last_bytes_sent()
, .connection_start_time = (*it)->connection_start_time
, .log_p2p_address = (*it)->log_p2p_address
};
per_connection.peers.push_back(metrics);
}

if (!(*it)->socket_is_open() && (*it)->state() != connection::connection_state::connecting) {
if (!(*it)->incoming()) {
Expand All @@ -4438,7 +4505,7 @@ namespace eosio {
g.unlock();

if (update_p2p_connection_metrics) {
update_p2p_connection_metrics({.num_peers = num_peers, .num_clients = num_clients});
update_p2p_connection_metrics({num_peers, num_clients, std::move(per_connection)});
}

if( num_clients > 0 || num_peers > 0 ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <eosio/chain/application.hpp>
#include <eosio/http_plugin/http_plugin.hpp>
#include <eosio/chain_plugin/chain_plugin.hpp>

namespace eosio {

Expand All @@ -12,7 +13,7 @@ namespace eosio {
prometheus_plugin();
~prometheus_plugin() override;

APPBASE_PLUGIN_REQUIRES((http_plugin))
APPBASE_PLUGIN_REQUIRES((http_plugin)(chain_plugin))

void set_program_options(options_description&, options_description& cfg) override;

Expand Down
Loading