Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

misc/metrics: Track # connected nodes supporting specific protocol #2734

Merged
merged 16 commits into from
Jul 15, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions misc/metrics/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

- Update to `libp2p-kad` `v0.38.0`.

- Track number of connected nodes supporting a specific protocol via the identify protocol.
mxinden marked this conversation as resolved.
Show resolved Hide resolved

# 0.6.1

- Update `dcutr` events from `libp2p_relay_events` to `libp2p_dcutr_events`, to avoid conflict with `relay` events.
Expand Down
5 changes: 2 additions & 3 deletions misc/metrics/src/dcutr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,9 @@ impl From<&libp2p_dcutr::behaviour::Event> for EventType {
}
}

impl super::Recorder<libp2p_dcutr::behaviour::Event> for super::Metrics {
impl super::Recorder<libp2p_dcutr::behaviour::Event> for Metrics {
fn record(&self, event: &libp2p_dcutr::behaviour::Event) {
self.dcutr
.events
self.events
.get_or_create(&EventLabels {
event: event.into(),
})
Expand Down
4 changes: 2 additions & 2 deletions misc/metrics/src/gossipsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ impl Metrics {
}
}

impl super::Recorder<libp2p_gossipsub::GossipsubEvent> for super::Metrics {
impl super::Recorder<libp2p_gossipsub::GossipsubEvent> for Metrics {
fn record(&self, event: &libp2p_gossipsub::GossipsubEvent) {
if let libp2p_gossipsub::GossipsubEvent::Message { .. } = event {
self.gossipsub.messages.inc();
self.messages.inc();
}
}
}
136 changes: 126 additions & 10 deletions misc/metrics/src/identify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,18 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use libp2p_core::PeerId;
use prometheus_client::encoding::text::{EncodeMetric, Encoder};
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::histogram::{exponential_buckets, Histogram};
use prometheus_client::metrics::MetricType;
use prometheus_client::registry::Registry;
use std::collections::HashMap;
use std::iter;
use std::sync::{Arc, Mutex};

pub struct Metrics {
protocols: Protocols,
error: Counter,
pushed: Counter,
received: Counter,
Expand All @@ -36,6 +42,13 @@ impl Metrics {
pub fn new(registry: &mut Registry) -> Self {
let sub_registry = registry.sub_registry_with_prefix("identify");

let protocols = Protocols::default();
sub_registry.register(
"protocols",
"Number of connected nodes supporting a specific protocol",
Box::new(protocols.clone()),
);

let error = Counter::default();
sub_registry.register(
"errors",
Expand Down Expand Up @@ -86,6 +99,7 @@ impl Metrics {
);

Self {
protocols,
error,
pushed,
received,
Expand All @@ -96,27 +110,129 @@ impl Metrics {
}
}

impl super::Recorder<libp2p_identify::IdentifyEvent> for super::Metrics {
impl super::Recorder<libp2p_identify::IdentifyEvent> for Metrics {
fn record(&self, event: &libp2p_identify::IdentifyEvent) {
match event {
libp2p_identify::IdentifyEvent::Error { .. } => {
self.identify.error.inc();
self.error.inc();
}
libp2p_identify::IdentifyEvent::Pushed { .. } => {
self.identify.pushed.inc();
self.pushed.inc();
}
libp2p_identify::IdentifyEvent::Received { info, .. } => {
self.identify.received.inc();
self.identify
.received_info_protocols
libp2p_identify::IdentifyEvent::Received { peer_id, info, .. } => {
{
let mut protocols: Vec<String> = info
.protocols
.iter()
.filter(|p| {
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
let allowed_protocols: &[&[u8]] = &[
#[cfg(feature = "dcutr")]
libp2p_dcutr::PROTOCOL_NAME,
// #[cfg(feature = "gossipsub")]
// #[cfg(not(target_os = "unknown"))]
// TODO: Add Gossipsub protocol name
libp2p_identify::PROTOCOL_NAME,
libp2p_identify::PUSH_PROTOCOL_NAME,
#[cfg(feature = "kad")]
libp2p_kad::protocol::DEFAULT_PROTO_NAME,
#[cfg(feature = "ping")]
libp2p_ping::PROTOCOL_NAME,
#[cfg(feature = "relay")]
libp2p_relay::v2::STOP_PROTOCOL_NAME,
#[cfg(feature = "relay")]
libp2p_relay::v2::HOP_PROTOCOL_NAME,
];

allowed_protocols.contains(&p.as_bytes())
})
.cloned()
.collect();
protocols.sort_unstable();
protocols.dedup();

self.protocols.add(*peer_id, protocols);
}

self.received.inc();
self.received_info_protocols
.observe(info.protocols.len() as f64);
self.identify
.received_info_listen_addrs
self.received_info_listen_addrs
.observe(info.listen_addrs.len() as f64);
}
libp2p_identify::IdentifyEvent::Sent { .. } => {
self.identify.sent.inc();
self.sent.inc();
}
}
}
}

impl<TBvEv, THandleErr> super::Recorder<libp2p_swarm::SwarmEvent<TBvEv, THandleErr>> for Metrics {
fn record(&self, event: &libp2p_swarm::SwarmEvent<TBvEv, THandleErr>) {
if let libp2p_swarm::SwarmEvent::ConnectionClosed {
peer_id,
num_established,
..
} = event
{
if *num_established == 0 {
self.protocols.remove(*peer_id)
}
}
}
}

#[derive(Default, Clone)]
struct Protocols {
peers: Arc<Mutex<HashMap<PeerId, Vec<String>>>>,
}

impl Protocols {
fn add(&self, peer: PeerId, protocols: Vec<String>) {
self.peers
.lock()
.expect("Lock not to be poisoned")
.insert(peer, protocols);
}

fn remove(&self, peer: PeerId) {
self.peers
.lock()
.expect("Lock not to be poisoned")
.remove(&peer);
}
}

impl EncodeMetric for Protocols {
fn encode(&self, mut encoder: Encoder) -> Result<(), std::io::Error> {
let count_by_protocol = self
.peers
.lock()
.expect("Lock not to be poisoned")
.iter()
.fold(
HashMap::<String, u64>::default(),
|mut acc, (_, protocols)| {
for protocol in protocols {
let count = acc.entry(protocol.to_string()).or_default();
*count = *count + 1;
}
acc
},
);

for (protocol, count) in count_by_protocol {
encoder
.with_label_set(&("protocol", protocol))
.no_suffix()?
.no_bucket()?
.encode_value(count)?
.no_exemplar()?;
}

Ok(())
}

fn metric_type(&self) -> MetricType {
MetricType::Gauge
}
}
40 changes: 12 additions & 28 deletions misc/metrics/src/kad.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,62 +159,52 @@ impl Metrics {
}
}

impl super::Recorder<libp2p_kad::KademliaEvent> for super::Metrics {
impl super::Recorder<libp2p_kad::KademliaEvent> for Metrics {
fn record(&self, event: &libp2p_kad::KademliaEvent) {
match event {
libp2p_kad::KademliaEvent::OutboundQueryCompleted { result, stats, .. } => {
self.kad
.query_result_num_requests
self.query_result_num_requests
.get_or_create(&result.into())
.observe(stats.num_requests().into());
self.kad
.query_result_num_success
self.query_result_num_success
.get_or_create(&result.into())
.observe(stats.num_successes().into());
self.kad
.query_result_num_failure
self.query_result_num_failure
.get_or_create(&result.into())
.observe(stats.num_failures().into());
if let Some(duration) = stats.duration() {
self.kad
.query_result_duration
self.query_result_duration
.get_or_create(&result.into())
.observe(duration.as_secs_f64());
}

match result {
libp2p_kad::QueryResult::GetRecord(result) => match result {
Ok(ok) => self
.kad
.query_result_get_record_ok
.observe(ok.records.len() as f64),
Err(error) => {
self.kad
.query_result_get_record_error
self.query_result_get_record_error
.get_or_create(&error.into())
.inc();
}
},
libp2p_kad::QueryResult::GetClosestPeers(result) => match result {
Ok(ok) => self
.kad
.query_result_get_closest_peers_ok
.observe(ok.peers.len() as f64),
Err(error) => {
self.kad
.query_result_get_closest_peers_error
self.query_result_get_closest_peers_error
.get_or_create(&error.into())
.inc();
}
},
libp2p_kad::QueryResult::GetProviders(result) => match result {
Ok(ok) => self
.kad
.query_result_get_providers_ok
.observe(ok.providers.len() as f64),
Err(error) => {
self.kad
.query_result_get_providers_error
self.query_result_get_providers_error
.get_or_create(&error.into())
.inc();
}
Expand All @@ -230,16 +220,14 @@ impl super::Recorder<libp2p_kad::KademliaEvent> for super::Metrics {
} => {
let bucket = low.ilog2().unwrap_or(0);
if *is_new_peer {
self.kad
.routing_updated
self.routing_updated
.get_or_create(&RoutingUpdated {
action: RoutingAction::Added,
bucket,
})
.inc();
} else {
self.kad
.routing_updated
self.routing_updated
.get_or_create(&RoutingUpdated {
action: RoutingAction::Updated,
bucket,
Expand All @@ -248,8 +236,7 @@ impl super::Recorder<libp2p_kad::KademliaEvent> for super::Metrics {
}

if old_peer.is_some() {
self.kad
.routing_updated
self.routing_updated
.get_or_create(&RoutingUpdated {
action: RoutingAction::Evicted,
bucket,
Expand All @@ -259,10 +246,7 @@ impl super::Recorder<libp2p_kad::KademliaEvent> for super::Metrics {
}

libp2p_kad::KademliaEvent::InboundRequest { request } => {
self.kad
.inbound_requests
.get_or_create(&request.into())
.inc();
self.inbound_requests.get_or_create(&request.into()).inc();
}
_ => {}
}
Expand Down
52 changes: 52 additions & 0 deletions misc/metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,55 @@ pub trait Recorder<Event> {
/// Record the given event.
fn record(&self, event: &Event);
}

#[cfg(feature = "dcutr")]
impl Recorder<libp2p_dcutr::behaviour::Event> for Metrics {
fn record(&self, event: &libp2p_dcutr::behaviour::Event) {
self.dcutr.record(event)
}
}

#[cfg(feature = "gossipsub")]
#[cfg(not(target_os = "unknown"))]
impl Recorder<libp2p_gossipsub::GossipsubEvent> for Metrics {
fn record(&self, event: &libp2p_gossipsub::GossipsubEvent) {
self.gossipsub.record(event)
}
}

#[cfg(feature = "identify")]
impl Recorder<libp2p_identify::IdentifyEvent> for Metrics {
fn record(&self, event: &libp2p_identify::IdentifyEvent) {
self.identify.record(event)
}
}

#[cfg(feature = "kad")]
impl Recorder<libp2p_kad::KademliaEvent> for Metrics {
fn record(&self, event: &libp2p_kad::KademliaEvent) {
self.kad.record(event)
}
}

#[cfg(feature = "ping")]
impl Recorder<libp2p_ping::PingEvent> for Metrics {
fn record(&self, event: &libp2p_ping::PingEvent) {
self.ping.record(event)
}
}

#[cfg(feature = "relay")]
impl Recorder<libp2p_relay::v2::relay::Event> for Metrics {
fn record(&self, event: &libp2p_relay::v2::relay::Event) {
self.relay.record(event)
}
}

impl<TBvEv, THandleErr> Recorder<libp2p_swarm::SwarmEvent<TBvEv, THandleErr>> for Metrics {
fn record(&self, event: &libp2p_swarm::SwarmEvent<TBvEv, THandleErr>) {
self.swarm.record(event);

#[cfg(feature = "identify")]
self.identify.record(event)
}
}
Loading