Add per peer prometheus metrics and a TUI to view them. #1477

Merged
merged 36 commits into from
Aug 16, 2023
Changes from 5 commits
Commits
79c9817
Add per peer prometheus metrics and a TUI to view them.
jgiszczak Aug 2, 2023
d13dfa5
Merge branch 'main' into prometheus-peer-statistics
jgiszczak Aug 2, 2023
727a0c5
Remove debug log statement.
jgiszczak Aug 2, 2023
fbb0cec
Add net-util command line support for host and port.
jgiszczak Aug 2, 2023
86c6ced
Update prometheus metric names in plugin_http_api_test.
jgiszczak Aug 3, 2023
ed9eee6
Make p2p_per_connection_metrics constructor explicit.
jgiszczak Aug 3, 2023
a30d7c7
Count unique received blocks in process_signed_block
jgiszczak Aug 3, 2023
51c5f15
Rename construction_time to connection_time and update every connect
jgiszczak Aug 3, 2023
d511095
Make prometheus endpoint URL a variable at the top of the script.
jgiszczak Aug 3, 2023
ef80f69
Reserve enough space only for current connection count.
jgiszczak Aug 3, 2023
507efb7
Rename late_initialize to update_prometheus_info.
jgiszczak Aug 3, 2023
1e14aa0
Change default host from localhost to 127.0.0.1.
jgiszczak Aug 3, 2023
850a336
Check for non-200 status codes rather than only 404.
jgiszczak Aug 3, 2023
7420c72
Add reported p2p_address to Prometheus metrics per peer.
jgiszczak Aug 3, 2023
2774dea
Add refresh interval argument and default to the max possible.
jgiszczak Aug 4, 2023
df3554d
Add support for last data received and sent timestamps on peers.
jgiszczak Aug 5, 2023
7c8ebe4
Support IPv6 addresses.
jgiszczak Aug 8, 2023
fb00ebc
Convert peer metrics from struct of vectors to vector of structs.
jgiszczak Aug 9, 2023
e1dacb9
Merge branch 'main' into prometheus-peer-statistics
jgiszczak Aug 9, 2023
98e1e39
Default initialization and const correctness.
jgiszczak Aug 10, 2023
690d12e
Thread safety and collect connection time for inbound connections
jgiszczak Aug 10, 2023
9ec5410
Merge branch 'main' into prometheus-peer-statistics
jgiszczak Aug 10, 2023
ae87427
Retain chrono nanoseconds until the last possible moment.
jgiszczak Aug 10, 2023
639faf2
Relocate net-util to tools.
jgiszczak Aug 10, 2023
9ff857c
Move accounting of bytes written to peer to async_write's completion …
jgiszczak Aug 11, 2023
6abba1a
Use connection ID as per peer Prometheus metric key.
jgiszczak Aug 13, 2023
2754f02
Remove unnecessary retention of pointers to Prometheus gauges.
jgiszczak Aug 14, 2023
ef44bfb
Add logging when remote endpoint can't be retrieved.
jgiszczak Aug 14, 2023
6b6ea38
Use fc::unique_lock on an fc::mutex.
jgiszczak Aug 14, 2023
09ea0cc
Change over bandwidth field tracking to connection ID from IP.
jgiszczak Aug 14, 2023
af76bb0
Merge branch 'main' into prometheus-peer-statistics
jgiszczak Aug 14, 2023
e4aa07e
Adjust peer list column weightings.
jgiszczak Aug 14, 2023
8914850
Merge branch 'main' into prometheus-peer-statistics
jgiszczak Aug 14, 2023
e0deb07
Merge branch 'main' into prometheus-peer-statistics
jgiszczak Aug 15, 2023
1f1ea00
Update help preamble to better describe net-util's current capabilities.
jgiszczak Aug 15, 2023
a0b3985
Merge branch 'main' into prometheus-peer-statistics
jgiszczak Aug 15, 2023
9 changes: 0 additions & 9 deletions plugins/chain_plugin/chain_plugin.cpp
@@ -1296,15 +1296,6 @@ namespace chain_apis {

const string read_only::KEYi64 = "i64";

template<typename I>
std::string itoh(I n, size_t hlen = sizeof(I)<<1) {
static const char* digits = "0123456789abcdef";
std::string r(hlen, '0');
for(size_t i = 0, j = (hlen - 1) * 4 ; i < hlen; ++i, j -= 4)
r[i] = digits[(n>>j) & 0x0f];
return r;
}

read_only::get_info_results read_only::get_info(const read_only::get_info_params&, const fc::time_point&) const {
const auto& rm = db.get_resource_limits_manager();

@@ -950,6 +950,15 @@ class read_write : public api_base {
}
};

template<typename I>
std::string itoh(I n, size_t hlen = sizeof(I)<<1) {
static const char* digits = "0123456789abcdef";
std::string r(hlen, '0');
for(size_t i = 0, j = (hlen - 1) * 4 ; i < hlen; ++i, j -= 4)
r[i] = digits[(n>>j) & 0x0f];
return r;
}

} // namespace chain_apis

class chain_plugin : public plugin<chain_plugin> {
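
For reference, the `itoh` helper that this change moves from chain_plugin.cpp into the header formats an integer as zero-padded lowercase hexadecimal, two digits per byte by default. The following is a standalone copy of the function from the diff (only the includes and comments are added), so its behaviour can be checked in isolation:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

// Copy of itoh from the diff: writes n as hlen hex digits, most significant
// digit first. hlen defaults to two digits per byte of I.
template<typename I>
std::string itoh(I n, std::size_t hlen = sizeof(I) << 1) {
   static const char* digits = "0123456789abcdef";
   std::string r(hlen, '0');
   // j is the bit offset of the digit being written; it starts at the top
   // nibble and steps down by 4 bits per character.
   for (std::size_t i = 0, j = (hlen - 1) * 4; i < hlen; ++i, j -= 4)
      r[i] = digits[(n >> j) & 0x0f];
   return r;
}
```

With this definition, `itoh<std::uint32_t>(255)` yields `"000000ff"` and `itoh<std::uint16_t>(0xabcd)` yields `"abcd"`. Passing a smaller explicit `hlen` keeps only the low-order digits, so `itoh<std::uint32_t>(0x12345678, 4)` yields `"5678"`.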
53 changes: 53 additions & 0 deletions plugins/net_plugin/include/eosio/net_plugin/net_plugin.hpp
@@ -39,9 +39,62 @@ namespace eosio {
std::optional<connection_status> status( const string& endpoint )const;
vector<connection_status> connections()const;

struct p2p_per_connection_metrics {
p2p_per_connection_metrics(size_t count) {
addresses.reserve(count);
ports.reserve(count);
accepting_blocks.reserve(count);
last_received_blocks.reserve(count);
first_available_blocks.reserve(count);
last_available_blocks.reserve(count);
unique_first_block_counts.reserve(count);
latencies.reserve(count);
bytes_received.reserve(count);
bytes_sent.reserve(count);
connection_start_times.reserve(count);
}
p2p_per_connection_metrics(p2p_per_connection_metrics&& metrics)
: addresses{std::move(metrics.addresses)}
, ports{std::move(metrics.ports)}
, accepting_blocks{std::move(metrics.accepting_blocks)}
, last_received_blocks{std::move(metrics.last_received_blocks)}
, first_available_blocks{std::move(metrics.first_available_blocks)}
, last_available_blocks{std::move(metrics.last_available_blocks)}
, unique_first_block_counts{std::move(metrics.unique_first_block_counts)}
, latencies{std::move(metrics.latencies)}
, bytes_received{std::move(metrics.bytes_received)}
, bytes_sent{std::move(metrics.bytes_sent)}
, connection_start_times{std::move(metrics.connection_start_times)}
{}
p2p_per_connection_metrics(const p2p_per_connection_metrics&) = delete;
p2p_per_connection_metrics& operator=(const p2p_per_connection_metrics&) = delete;
std::vector<boost::asio::ip::address_v4::uint_type> addresses;
std::vector<unsigned short> ports;
std::vector<bool> accepting_blocks;
std::vector<uint32_t> last_received_blocks;
std::vector<uint32_t> first_available_blocks;
std::vector<uint32_t> last_available_blocks;
std::vector<std::size_t> unique_first_block_counts;
std::vector<uint64_t> latencies;
std::vector<std::size_t> bytes_received;
std::vector<std::size_t> bytes_sent;
std::vector<std::chrono::nanoseconds> connection_start_times;
};
struct p2p_connections_metrics {
p2p_connections_metrics(std::size_t peers, std::size_t clients, p2p_per_connection_metrics&& statistics)
: num_peers{peers}
, num_clients{clients}
, stats{std::move(statistics)}
{}
p2p_connections_metrics(p2p_connections_metrics&& statistics)
: num_peers{std::move(statistics.num_peers)}
, num_clients{std::move(statistics.num_clients)}
, stats{std::move(statistics.stats)}
{}
p2p_connections_metrics(const p2p_connections_metrics&) = delete;
std::size_t num_peers = 0;
std::size_t num_clients = 0;
p2p_per_connection_metrics stats;
};

void register_update_p2p_connection_metrics(std::function<void(p2p_connections_metrics)>&&);
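
The struct-of-vectors layout above needs one `reserve` call and one `std::move` per field, and each new metric has to be added in several places. A later commit in this PR (fb00ebc) converts the metrics to a vector of structs. The sketch below shows roughly what such a layout could look like. All type and function names ending in `_sketch` are hypothetical and only a subset of the fields is shown, so this is an illustration of the idea, not the code that was merged.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// Hypothetical per-peer record; fields mirror a subset of the vectors in the diff.
struct peer_metrics_sketch {
   std::uint32_t            address = 0;
   unsigned short           port = 0;
   bool                     accepting_blocks = false;
   std::uint32_t            last_received_block = 0;
   std::size_t              bytes_received = 0;
   std::size_t              bytes_sent = 0;
   std::chrono::nanoseconds connection_start_time{0};
};

// Hypothetical move-only snapshot handed to the metrics callback.
struct connections_metrics_sketch {
   explicit connections_metrics_sketch(std::size_t expected_peers) {
      peers.reserve(expected_peers);  // a single reserve instead of one per field
   }
   // Defaulted moves are member-wise, so no field can be left out by accident.
   connections_metrics_sketch(connections_metrics_sketch&&) = default;
   connections_metrics_sketch& operator=(connections_metrics_sketch&&) = default;
   connections_metrics_sketch(const connections_metrics_sketch&) = delete;
   connections_metrics_sketch& operator=(const connections_metrics_sketch&) = delete;

   std::size_t num_peers = 0;
   std::size_t num_clients = 0;
   std::vector<peer_metrics_sketch> peers;
};

// Sum one field across all peers, as a consumer of the snapshot might do.
inline std::size_t total_bytes_received(const connections_metrics_sketch& m) {
   std::size_t total = 0;
   for (const auto& p : m.peers) total += p.bytes_received;
   return total;
}
```

Because the copy operations are deleted, a snapshot built this way can only be handed on with `std::move`, which matches how the diff passes `per_connection` into `update_p2p_connection_metrics`.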
56 changes: 48 additions & 8 deletions plugins/net_plugin/net_plugin.cpp
@@ -790,6 +790,12 @@ namespace eosio {
bool is_blocks_only_connection()const { return connection_type == blocks_only; }
bool is_transactions_connection() const { return connection_type != blocks_only; } // thread safe, atomic
bool is_blocks_connection() const { return connection_type != transactions_only; } // thread safe, atomic
uint32_t get_peer_start_block_num() const { return peer_start_block_num.load(); }
uint32_t get_peer_head_block_num() const { return peer_head_block_num.load(); }
uint32_t get_last_received_blk_num() const { return last_received_blk_num.load(); }
uint32_t get_unique_blocks_rcvd_count() const { return unique_blocks_rcvd_count.load(); }
size_t get_bytes_received() const { return bytes_received.load(); }
size_t get_bytes_sent() const { return bytes_sent.load(); }
void set_heartbeat_timeout(std::chrono::milliseconds msec) {
hb_timeout = msec;
}
@@ -819,6 +825,10 @@ namespace eosio {
std::atomic<connection_types> connection_type{both};
std::atomic<uint32_t> peer_start_block_num{0};
std::atomic<uint32_t> peer_head_block_num{0};
std::atomic<uint32_t> last_received_blk_num{0};
std::atomic<uint32_t> unique_blocks_rcvd_count{0};
std::atomic<size_t> bytes_received{0};
std::atomic<size_t> bytes_sent{0};

public:
boost::asio::io_context::strand strand;
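
The new counters are `std::atomic` members with plain getters, so the code that collects metrics can read them without taking the connection's mutex or running on its strand. A minimal sketch of that pattern follows. The class name and the `on_*` methods are hypothetical stand-ins for the places where the diff updates the counters, not functions from net_plugin.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical reduced model of the per-connection counters in the diff.
class connection_counters_sketch {
public:
   // Writers: in the diff these updates happen on the connection's own code paths.
   void on_message_received(std::size_t message_length) { bytes_received_ += message_length; }
   void on_write_completed(std::size_t bytes_written)   { bytes_sent_ += bytes_written; }
   void on_block_header(std::uint32_t block_num)        { last_received_blk_num_ = block_num; }

   // Readers: each load is atomic, so another thread can call these at any time.
   std::size_t   get_bytes_received() const        { return bytes_received_.load(); }
   std::size_t   get_bytes_sent() const            { return bytes_sent_.load(); }
   std::uint32_t get_last_received_blk_num() const { return last_received_blk_num_.load(); }

private:
   std::atomic<std::size_t>   bytes_received_{0};
   std::atomic<std::size_t>   bytes_sent_{0};
   std::atomic<std::uint32_t> last_received_blk_num_{0};
};
```

Each getter returns an individually consistent value, but a set of reads taken together is not a single atomic snapshot. For monitoring gauges that is normally acceptable.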
@@ -836,6 +846,9 @@ namespace eosio {
string log_remote_endpoint_port;
string local_endpoint_ip;
string local_endpoint_port;
bool remote_endpoint_ipv4;
uint32_t remote_endpoint_ip_integer;
uint32_t remote_endpoint_port;
// kept in sync with last_handshake_recv.last_irreversible_block_num, only accessed from connection strand
uint32_t peer_lib_num = 0;

@@ -876,6 +889,8 @@ namespace eosio {
fc::time_point last_close GUARDED_BY(conn_mtx);
string remote_endpoint_ip GUARDED_BY(conn_mtx);

std::chrono::nanoseconds construction_time{0};

connection_status get_status()const;

/** \name Peer Timestamps
@@ -941,10 +956,11 @@ namespace eosio {
void send_time(const time_message& msg);
/** \brief Read system time and convert to a 64 bit integer.
*
- * There are only two calls on this routine in the program. One
- * when a packet arrives from the network and the other when a
- * packet is placed on the send queue. Calls the kernel time of
- * day routine and converts to a (at least) 64 bit integer.
+ * There are four calls to this routine in the program. One
+ * when a packet arrives from the network, one when a packet
+ * is placed on the send queue, and one during construction.
+ * Calls the kernel time of day routine and converts to
+ * a (at least) 64 bit integer.
*/
static std::chrono::nanoseconds get_time() {
return std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch());
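
`get_time()` returns the wall-clock time as `std::chrono::nanoseconds` since the epoch, and the connection stores that value at construction. A later commit in this PR is titled "Retain chrono nanoseconds until the last possible moment", which suggests converting to a coarser unit only when the value is exported. The sketch below illustrates that idea. `get_time_sketch` repeats the expression from the diff, while `connected_seconds` is a hypothetical helper, not part of net_plugin.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>

// Same expression as get_time() in the diff: nanoseconds since the system_clock epoch.
inline std::chrono::nanoseconds get_time_sketch() {
   return std::chrono::duration_cast<std::chrono::nanoseconds>(
      std::chrono::system_clock::now().time_since_epoch());
}

// Hypothetical helper: whole seconds a connection has been up.
// The subtraction is done in nanoseconds; truncation to seconds happens last.
inline std::int64_t connected_seconds(std::chrono::nanoseconds start,
                                      std::chrono::nanoseconds now) {
   return std::chrono::duration_cast<std::chrono::seconds>(now - start).count();
}
```

For example, a start of 1 s and a current time of 3.5 s (both expressed in nanoseconds) give `connected_seconds(...) == 2`, because `duration_cast` truncates toward zero.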
@@ -1155,7 +1171,8 @@ namespace eosio {
connection_id( ++my_impl->current_connection_id ),
response_expected_timer( my_impl->thread_pool.get_executor() ),
last_handshake_recv(),
- last_handshake_sent()
+ last_handshake_sent(),
+ construction_time(get_time())
{
my_impl->mark_bp_connection(this);
fc_ilog( logger, "created connection ${c} to ${n}", ("c", connection_id)("n", endpoint) );
@@ -1169,7 +1186,8 @@ namespace eosio {
connection_id( ++my_impl->current_connection_id ),
response_expected_timer( my_impl->thread_pool.get_executor() ),
last_handshake_recv(),
- last_handshake_sent()
+ last_handshake_sent(),
+ construction_time(get_time())
{
update_endpoints();
fc_dlog( logger, "new connection object created for peer ${address}:${port} from listener ${addr}", ("address", log_remote_endpoint_ip)("port", log_remote_endpoint_port)("addr", listen_address) );
@@ -1181,6 +1199,9 @@ namespace eosio {
boost::system::error_code ec2;
auto rep = socket->remote_endpoint(ec);
auto lep = socket->local_endpoint(ec2);
remote_endpoint_ipv4 = ec ? false : rep.address().is_v4();
remote_endpoint_ip_integer = remote_endpoint_ipv4 ? rep.address().to_v4().to_uint() : 0;
remote_endpoint_port = ec ? 0 : rep.port();
log_remote_endpoint_ip = ec ? unknown : rep.address().to_string();
log_remote_endpoint_port = ec ? unknown : std::to_string(rep.port());
local_endpoint_ip = ec2 ? unknown : lep.address().to_string();
@@ -1517,6 +1538,7 @@ namespace eosio {
void connection::queue_write(const std::shared_ptr<vector<char>>& buff,
std::function<void(boost::system::error_code, std::size_t)> callback,
bool to_sync_queue) {
bytes_sent += buff->size();
if( !buffer_queue.add_write_queue( buff, std::move(callback), to_sync_queue )) {
peer_wlog( this, "write_queue full ${s} bytes, giving up on connection", ("s", buffer_queue.write_queue_size()) );
close();
@@ -2867,6 +2889,7 @@ namespace eosio {

// called from connection strand
bool connection::process_next_message( uint32_t message_length ) {
bytes_received += message_length;
try {
latest_msg_time = std::chrono::system_clock::now();

@@ -2906,7 +2929,7 @@ namespace eosio {
fc::raw::unpack( peek_ds, bh );

const block_id_type blk_id = bh.calculate_id();
- const uint32_t blk_num = block_header::num_from_id(blk_id);
+ const uint32_t blk_num = last_received_blk_num = block_header::num_from_id(blk_id);
// don't add_peer_block because we have not validated this block header yet
if( my_impl->dispatcher->have_block( blk_id ) ) {
peer_dlog( this, "canceling wait, already received block ${num}, id ${id}...",
@@ -2941,6 +2964,7 @@ namespace eosio {
my_impl->sync_master->sync_recv_block(shared_from_this(), blk_id, blk_num, false);
}

++unique_blocks_rcvd_count;
auto ds = pending_message_buffer.create_datastream();
fc::raw::unpack( ds, which );
shared_ptr<signed_block> ptr = std::make_shared<signed_block>();
@@ -4400,6 +4424,7 @@ namespace eosio {
auto it = (from ? connections.find(from) : connections.begin());
if (it == connections.end()) it = connections.begin();
size_t num_rm = 0, num_clients = 0, num_peers = 0, num_bp_peers = 0;
net_plugin::p2p_per_connection_metrics per_connection(max_client_count);
while (it != connections.end()) {
if (fc::time_point::now() >= max_time) {
connection_wptr wit = *it;
@@ -4417,6 +4442,21 @@ namespace eosio {
} else {
++num_peers;
}
if((*it)->remote_endpoint_ipv4) {
per_connection.addresses.push_back((*it)->remote_endpoint_ip_integer);
per_connection.ports.push_back((*it)->remote_endpoint_port);
per_connection.accepting_blocks.push_back((*it)->is_blocks_connection());
per_connection.last_received_blocks.push_back((*it)->get_last_received_blk_num());
per_connection.first_available_blocks.push_back((*it)->get_peer_start_block_num());
per_connection.last_available_blocks.push_back((*it)->get_peer_head_block_num());
per_connection.bytes_received.push_back((*it)->get_bytes_received());
per_connection.bytes_sent.push_back((*it)->get_bytes_sent());
per_connection.connection_start_times.push_back((*it)->construction_time);
per_connection.unique_first_block_counts.push_back((*it)->get_unique_blocks_rcvd_count());
per_connection.latencies.push_back((*it)->get_peer_ping_time_ns());
}
else
fc_wlog(logger, "socket remote endpoint is not IPv4");
Member:

Doesn't net_plugin support ipv6 now? Why would this be a wlog?

Member (@heifner, Aug 3, 2023):

Yes, net_plugin supports ipv6. The Prometheus integration also needs to support ipv6.

Contributor Author:

There are still comments in net_plugin saying ipv6 support is needed.

Member:

Please remove/update those comments.

Contributor Author:

Removed.
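
The thread above asks for the per-peer metrics to cover IPv6 peers as well, which a single `uint32_t` address field cannot do. One common way to carry both address families in one fixed-size field is the IPv4-mapped IPv6 form `::ffff:a.b.c.d`. The sketch below shows that mapping using only the standard library. It is an assumption about one possible approach, not a description of what the later "Support IPv6 addresses" commit actually does, and the names `address_bytes`, `map_v4_to_v6` and `is_v4_mapped` are hypothetical.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <cstdint>

// 16 bytes in network order, enough for any IPv6 address.
using address_bytes = std::array<std::uint8_t, 16>;

// Map an IPv4 address, given as a host-byte-order integer, to ::ffff:a.b.c.d.
inline address_bytes map_v4_to_v6(std::uint32_t v4) {
   address_bytes out{};  // value-initialised: all 16 bytes start at zero
   out[10] = 0xff;
   out[11] = 0xff;
   out[12] = static_cast<std::uint8_t>(v4 >> 24);
   out[13] = static_cast<std::uint8_t>(v4 >> 16);
   out[14] = static_cast<std::uint8_t>(v4 >> 8);
   out[15] = static_cast<std::uint8_t>(v4);
   return out;
}

// True if the 16-byte address has the ::ffff:0:0/96 prefix of a mapped IPv4 address.
inline bool is_v4_mapped(const address_bytes& a) {
   for (std::size_t i = 0; i < 10; ++i)
      if (a[i] != 0) return false;
   return a[10] == 0xff && a[11] == 0xff;
}
```

With this mapping, 127.0.0.1 (`0x7f000001`) becomes sixteen bytes ending in `ff ff 7f 00 00 01`, and a consumer can use `is_v4_mapped` to decide whether to display the address in dotted-quad or IPv6 notation.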


if (!(*it)->socket_is_open() && (*it)->state() != connection::connection_state::connecting) {
if (!(*it)->incoming()) {
@@ -4438,7 +4478,7 @@ namespace eosio {
g.unlock();

if (update_p2p_connection_metrics) {
- update_p2p_connection_metrics({.num_peers = num_peers, .num_clients = num_clients});
+ update_p2p_connection_metrics({num_peers, num_clients, std::move(per_connection)});
}

if( num_clients > 0 || num_peers > 0 ) {
@@ -2,6 +2,7 @@

#include <eosio/chain/application.hpp>
#include <eosio/http_plugin/http_plugin.hpp>
#include <eosio/chain_plugin/chain_plugin.hpp>

namespace eosio {

@@ -12,7 +13,7 @@ namespace eosio {
prometheus_plugin();
~prometheus_plugin() override;

- APPBASE_PLUGIN_REQUIRES((http_plugin))
+ APPBASE_PLUGIN_REQUIRES((http_plugin)(chain_plugin))

void set_program_options(options_description&, options_description& cfg) override;
