Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Refactor & detach network metrics. (#6986)
Browse files Browse the repository at this point in the history
* Refactor sc-network/service metrics.

  1. Aggregate sc-network metrics into a submodule, introducing
  two more sourced metrics to avoid duplicate atomics.

  2. Decouple periodic sc-service network metrics from other
  metrics, so that they can be updated independently.

* Update client/service/src/metrics.rs

* Update client/service/src/metrics.rs
  • Loading branch information
romanb authored and bkchr committed Sep 18, 2020
1 parent 130b7d4 commit 29c993a
Show file tree
Hide file tree
Showing 8 changed files with 571 additions and 403 deletions.
6 changes: 3 additions & 3 deletions client/informant/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use futures::prelude::*;
use log::{info, trace, warn};
use parity_util_mem::MallocSizeOf;
use sc_client_api::{BlockchainEvents, UsageProvider};
use sc_network::{network_state::NetworkState, NetworkStatus};
use sc_network::NetworkStatus;
use sp_blockchain::HeaderMetadata;
use sp_runtime::traits::{Block as BlockT, Header};
use sp_transaction_pool::TransactionPool;
Expand Down Expand Up @@ -81,7 +81,7 @@ impl<T: TransactionPool + MallocSizeOf> TransactionPoolAndMaybeMallogSizeOf for
/// Builds the informant and returns a `Future` that drives the informant.
pub fn build<B: BlockT, C>(
client: Arc<C>,
network_status_sinks: Arc<status_sinks::StatusSinks<(NetworkStatus<B>, NetworkState)>>,
network_status_sinks: Arc<status_sinks::StatusSinks<NetworkStatus<B>>>,
pool: Arc<impl TransactionPoolAndMaybeMallogSizeOf>,
format: OutputFormat,
) -> impl futures::Future<Output = ()>
Expand All @@ -96,7 +96,7 @@ where
network_status_sinks.push(Duration::from_millis(5000), network_status_sink);

let display_notifications = network_status_stream
.for_each(move |(net_status, _)| {
.for_each(move |net_status| {
let info = client_1.usage_info();
if let Some(ref usage) = info.usage {
trace!(target: "usage", "Usage statistics: {}", usage);
Expand Down
4 changes: 0 additions & 4 deletions client/network/src/network_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,6 @@ pub struct NetworkState {
pub connected_peers: HashMap<String, Peer>,
/// List of node that we know of but that we're not connected to.
pub not_connected_peers: HashMap<String, NotConnectedPeer>,
/// The total number of bytes received.
pub total_bytes_inbound: u64,
/// The total number of bytes sent.
pub total_bytes_outbound: u64,
/// State of the peerset manager.
pub peerset: serde_json::Value,
}
Expand Down
294 changes: 21 additions & 273 deletions client/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
//! which is then processed by [`NetworkWorker::poll`].
use crate::{
ExHashT, NetworkStateInfo,
ExHashT, NetworkStateInfo, NetworkStatus,
behaviour::{self, Behaviour, BehaviourOut},
config::{parse_str_addr, NonReservedPeerMode, Params, Role, TransportConfig},
DhtEvent,
Expand All @@ -49,12 +49,8 @@ use libp2p::kad::record;
use libp2p::ping::handler::PingFailure;
use libp2p::swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent, protocols_handler::NodeHandlerWrapperError};
use log::{error, info, trace, warn};
use metrics::{Metrics, MetricSources, Histogram, HistogramVec};
use parking_lot::Mutex;
use prometheus_endpoint::{
register, Counter, CounterVec, Gauge, GaugeVec, Histogram, HistogramOpts, HistogramVec, Opts,
PrometheusError, Registry, U64,
SourcedCounter, MetricSource
};
use sc_peerset::PeersetHandle;
use sp_consensus::import_queue::{BlockImportError, BlockImportResult, ImportQueue, Link};
use sp_runtime::{
Expand All @@ -80,6 +76,7 @@ use wasm_timer::Instant;

pub use behaviour::{ResponseFailure, InboundFailure, RequestFailure, OutboundFailure};

mod metrics;
mod out_events;
#[cfg(test)]
mod tests;
Expand Down Expand Up @@ -365,10 +362,11 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
// Initialize the metrics.
let metrics = match &params.metrics_registry {
Some(registry) => {
// Sourced metrics.
BandwidthCounters::register(registry, bandwidth.clone())?;
// Other (i.e. new) metrics.
Some(Metrics::register(registry)?)
Some(metrics::register(registry, MetricSources {
bandwidth: bandwidth.clone(),
major_syncing: is_major_syncing.clone(),
connected_peers: num_connected.clone(),
})?)
}
None => None
};
Expand Down Expand Up @@ -423,6 +421,19 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
})
}

/// High-level network status information.
pub fn status(&self) -> NetworkStatus<B> {
NetworkStatus {
sync_state: self.sync_state(),
best_seen_block: self.best_seen_block(),
num_sync_peers: self.num_sync_peers(),
num_connected_peers: self.num_connected_peers(),
num_active_peers: self.num_active_peers(),
total_bytes_inbound: self.total_bytes_inbound(),
total_bytes_outbound: self.total_bytes_outbound(),
}
}

/// Returns the total number of bytes received so far.
pub fn total_bytes_inbound(&self) -> u64 {
self.service.bandwidth.total_inbound()
Expand Down Expand Up @@ -562,8 +573,6 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
peer_id: Swarm::<B, H>::local_peer_id(&swarm).to_base58(),
listened_addresses: Swarm::<B, H>::listeners(&swarm).cloned().collect(),
external_addresses: Swarm::<B, H>::external_addresses(&swarm).cloned().collect(),
total_bytes_inbound: self.service.bandwidth.total_inbound(),
total_bytes_outbound: self.service.bandwidth.total_outbound(),
connected_peers,
not_connected_peers,
peerset: swarm.user_protocol_mut().peerset_debug_info(),
Expand Down Expand Up @@ -1175,265 +1184,6 @@ pub struct NetworkWorker<B: BlockT + 'static, H: ExHashT> {
peers_notifications_sinks: Arc<Mutex<HashMap<(PeerId, ConsensusEngineId), NotificationsSink>>>,
}

struct Metrics {
// This list is ordered alphabetically
connections_closed_total: CounterVec<U64>,
connections_opened_total: CounterVec<U64>,
distinct_peers_connections_closed_total: Counter<U64>,
distinct_peers_connections_opened_total: Counter<U64>,
import_queue_blocks_submitted: Counter<U64>,
import_queue_finality_proofs_submitted: Counter<U64>,
import_queue_justifications_submitted: Counter<U64>,
incoming_connections_errors_total: CounterVec<U64>,
incoming_connections_total: Counter<U64>,
is_major_syncing: Gauge<U64>,
issued_light_requests: Counter<U64>,
kademlia_query_duration: HistogramVec,
kademlia_random_queries_total: CounterVec<U64>,
kademlia_records_count: GaugeVec<U64>,
kademlia_records_sizes_total: GaugeVec<U64>,
kbuckets_num_nodes: GaugeVec<U64>,
listeners_local_addresses: Gauge<U64>,
listeners_errors_total: Counter<U64>,
notifications_sizes: HistogramVec,
notifications_streams_closed_total: CounterVec<U64>,
notifications_streams_opened_total: CounterVec<U64>,
peers_count: Gauge<U64>,
peerset_num_discovered: Gauge<U64>,
peerset_num_requested: Gauge<U64>,
pending_connections: Gauge<U64>,
pending_connections_errors_total: CounterVec<U64>,
requests_in_failure_total: CounterVec<U64>,
requests_in_success_total: HistogramVec,
requests_out_failure_total: CounterVec<U64>,
requests_out_success_total: HistogramVec,
requests_out_started_total: CounterVec<U64>,
}

/// The source for bandwidth metrics.
#[derive(Clone)]
struct BandwidthCounters(Arc<transport::BandwidthSinks>);

impl BandwidthCounters {
fn register(registry: &Registry, sinks: Arc<transport::BandwidthSinks>)
-> Result<(), PrometheusError>
{
register(SourcedCounter::new(
&Opts::new(
"sub_libp2p_network_bytes_total",
"Total bandwidth usage"
).variable_label("direction"),
BandwidthCounters(sinks),
)?, registry)?;

Ok(())
}
}

impl MetricSource for BandwidthCounters {
type N = u64;

fn collect(&self, mut set: impl FnMut(&[&str], Self::N)) {
set(&[&"in"], self.0.total_inbound());
set(&[&"out"], self.0.total_outbound());
}
}

impl Metrics {
fn register(registry: &Registry) -> Result<Self, PrometheusError> {
Ok(Self {
// This list is ordered alphabetically
connections_closed_total: register(CounterVec::new(
Opts::new(
"sub_libp2p_connections_closed_total",
"Total number of connections closed, by direction and reason"
),
&["direction", "reason"]
)?, registry)?,
connections_opened_total: register(CounterVec::new(
Opts::new(
"sub_libp2p_connections_opened_total",
"Total number of connections opened by direction"
),
&["direction"]
)?, registry)?,
distinct_peers_connections_closed_total: register(Counter::new(
"sub_libp2p_distinct_peers_connections_closed_total",
"Total number of connections closed with distinct peers"
)?, registry)?,
distinct_peers_connections_opened_total: register(Counter::new(
"sub_libp2p_distinct_peers_connections_opened_total",
"Total number of connections opened with distinct peers"
)?, registry)?,
import_queue_blocks_submitted: register(Counter::new(
"import_queue_blocks_submitted",
"Number of blocks submitted to the import queue.",
)?, registry)?,
import_queue_finality_proofs_submitted: register(Counter::new(
"import_queue_finality_proofs_submitted",
"Number of finality proofs submitted to the import queue.",
)?, registry)?,
import_queue_justifications_submitted: register(Counter::new(
"import_queue_justifications_submitted",
"Number of justifications submitted to the import queue.",
)?, registry)?,
incoming_connections_errors_total: register(CounterVec::new(
Opts::new(
"sub_libp2p_incoming_connections_handshake_errors_total",
"Total number of incoming connections that have failed during the \
initial handshake"
),
&["reason"]
)?, registry)?,
incoming_connections_total: register(Counter::new(
"sub_libp2p_incoming_connections_total",
"Total number of incoming connections on the listening sockets"
)?, registry)?,
is_major_syncing: register(Gauge::new(
"sub_libp2p_is_major_syncing", "Whether the node is performing a major sync or not.",
)?, registry)?,
issued_light_requests: register(Counter::new(
"issued_light_requests",
"Number of light client requests that our node has issued.",
)?, registry)?,
kademlia_query_duration: register(HistogramVec::new(
HistogramOpts {
common_opts: Opts::new(
"sub_libp2p_kademlia_query_duration",
"Duration of Kademlia queries per query type"
),
buckets: prometheus_endpoint::exponential_buckets(0.5, 2.0, 10)
.expect("parameters are always valid values; qed"),
},
&["type"]
)?, registry)?,
kademlia_random_queries_total: register(CounterVec::new(
Opts::new(
"sub_libp2p_kademlia_random_queries_total",
"Number of random Kademlia queries started"
),
&["protocol"]
)?, registry)?,
kademlia_records_count: register(GaugeVec::new(
Opts::new(
"sub_libp2p_kademlia_records_count",
"Number of records in the Kademlia records store"
),
&["protocol"]
)?, registry)?,
kademlia_records_sizes_total: register(GaugeVec::new(
Opts::new(
"sub_libp2p_kademlia_records_sizes_total",
"Total size of all the records in the Kademlia records store"
),
&["protocol"]
)?, registry)?,
kbuckets_num_nodes: register(GaugeVec::new(
Opts::new(
"sub_libp2p_kbuckets_num_nodes",
"Number of nodes in the Kademlia k-buckets"
),
&["protocol"]
)?, registry)?,
listeners_local_addresses: register(Gauge::new(
"sub_libp2p_listeners_local_addresses", "Number of local addresses we're listening on"
)?, registry)?,
listeners_errors_total: register(Counter::new(
"sub_libp2p_listeners_errors_total",
"Total number of non-fatal errors reported by a listener"
)?, registry)?,
notifications_sizes: register(HistogramVec::new(
HistogramOpts {
common_opts: Opts::new(
"sub_libp2p_notifications_sizes",
"Sizes of the notifications send to and received from all nodes"
),
buckets: prometheus_endpoint::exponential_buckets(64.0, 4.0, 8)
.expect("parameters are always valid values; qed"),
},
&["direction", "protocol"]
)?, registry)?,
notifications_streams_closed_total: register(CounterVec::new(
Opts::new(
"sub_libp2p_notifications_streams_closed_total",
"Total number of notification substreams that have been closed"
),
&["protocol"]
)?, registry)?,
notifications_streams_opened_total: register(CounterVec::new(
Opts::new(
"sub_libp2p_notifications_streams_opened_total",
"Total number of notification substreams that have been opened"
),
&["protocol"]
)?, registry)?,
peers_count: register(Gauge::new(
"sub_libp2p_peers_count", "Number of network gossip peers",
)?, registry)?,
peerset_num_discovered: register(Gauge::new(
"sub_libp2p_peerset_num_discovered", "Number of nodes stored in the peerset manager",
)?, registry)?,
peerset_num_requested: register(Gauge::new(
"sub_libp2p_peerset_num_requested", "Number of nodes that the peerset manager wants us to be connected to",
)?, registry)?,
pending_connections: register(Gauge::new(
"sub_libp2p_pending_connections",
"Number of connections in the process of being established",
)?, registry)?,
pending_connections_errors_total: register(CounterVec::new(
Opts::new(
"sub_libp2p_pending_connections_errors_total",
"Total number of pending connection errors"
),
&["reason"]
)?, registry)?,
requests_in_failure_total: register(CounterVec::new(
Opts::new(
"sub_libp2p_requests_in_failure_total",
"Total number of incoming requests that the node has failed to answer"
),
&["protocol", "reason"]
)?, registry)?,
requests_in_success_total: register(HistogramVec::new(
HistogramOpts {
common_opts: Opts::new(
"sub_libp2p_requests_in_success_total",
"Total number of requests received and answered"
),
buckets: prometheus_endpoint::exponential_buckets(0.001, 2.0, 16)
.expect("parameters are always valid values; qed"),
},
&["protocol"]
)?, registry)?,
requests_out_failure_total: register(CounterVec::new(
Opts::new(
"sub_libp2p_requests_out_failure_total",
"Total number of requests that have failed"
),
&["protocol", "reason"]
)?, registry)?,
requests_out_success_total: register(HistogramVec::new(
HistogramOpts {
common_opts: Opts::new(
"sub_libp2p_requests_out_success_total",
"For successful requests, time between a request's start and finish"
),
buckets: prometheus_endpoint::exponential_buckets(0.001, 2.0, 16)
.expect("parameters are always valid values; qed"),
},
&["protocol"]
)?, registry)?,
requests_out_started_total: register(CounterVec::new(
Opts::new(
"sub_libp2p_requests_out_started_total",
"Total number of requests emitted"
),
&["protocol"]
)?, registry)?,
})
}
}

impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
type Output = ();

Expand Down Expand Up @@ -1902,7 +1652,6 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
this.is_major_syncing.store(is_major_syncing, Ordering::Relaxed);

if let Some(metrics) = this.metrics.as_ref() {
metrics.is_major_syncing.set(is_major_syncing as u64);
for (proto, num_entries) in this.network_service.num_kbuckets_entries() {
metrics.kbuckets_num_nodes.with_label_values(&[&proto.as_ref()]).set(num_entries as u64);
}
Expand All @@ -1912,7 +1661,6 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
for (proto, num_entries) in this.network_service.kademlia_records_total_size() {
metrics.kademlia_records_sizes_total.with_label_values(&[&proto.as_ref()]).set(num_entries as u64);
}
metrics.peers_count.set(num_connected_peers as u64);
metrics.peerset_num_discovered.set(this.network_service.user_protocol().num_discovered_peers() as u64);
metrics.peerset_num_requested.set(this.network_service.user_protocol().requested_peers().count() as u64);
metrics.pending_connections.set(Swarm::network_info(&this.network_service).num_connections_pending as u64);
Expand Down
Loading

0 comments on commit 29c993a

Please sign in to comment.