From 9c940fa3f2f593cc61e49b95b1c7c948e436695c Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 25 Oct 2023 21:24:20 +0200 Subject: [PATCH 01/27] feat(libp2p): track bandwidth per transport protocol stack Previously one could use the `with_bandwidth_logging` to measure the overall bandwidth usage, only differentiated by inbound and outbound traffic. With this commit bandwidth can be tracked per transport protocol stack (e.g. `/ip4/tcp`). In addition this commit adds the ability to redirect these metrics to a `prometheus_client::Registry`. ``` bandwidth_total{protocols="/ip4/tcp",direction="inbound"} 363 bandwidth_total{protocols="/ip4/tcp",direction="outbound"} 348 ``` --- Cargo.lock | 1 + examples/metrics/src/main.rs | 5 ++- libp2p/Cargo.toml | 1 + libp2p/src/bandwidth.rs | 37 +++++++++++++++++- libp2p/src/builder/phase/bandwidth_logging.rs | 5 ++- libp2p/src/builder/phase/other_transport.rs | 5 ++- libp2p/src/builder/phase/quic.rs | 7 +++- libp2p/src/transport_ext.rs | 38 ++++++++++++++++--- misc/metrics/src/lib.rs | 2 + 9 files changed, 88 insertions(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 61095bb524e..f8e3c6d07ee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2409,6 +2409,7 @@ dependencies = [ "libp2p-yamux", "multiaddr", "pin-project", + "prometheus-client", "rw-stream-sink", "thiserror", "tokio", diff --git a/examples/metrics/src/main.rs b/examples/metrics/src/main.rs index 09d4f7a5941..d4c6b442426 100644 --- a/examples/metrics/src/main.rs +++ b/examples/metrics/src/main.rs @@ -37,13 +37,15 @@ mod http_service; fn main() -> Result<(), Box> { env_logger::Builder::from_env(Env::default().default_filter_or("info")).init(); - let mut swarm = libp2p::SwarmBuilder::with_new_identity() + let (builder, bandwidth_logging) = libp2p::SwarmBuilder::with_new_identity() .with_async_std() .with_tcp( tcp::Config::default(), noise::Config::new, yamux::Config::default, )? + .with_bandwidth_logging(); + let mut swarm = builder .with_behaviour(|key| Behaviour::new(key.public()))? .with_swarm_config(|cfg| cfg.with_idle_connection_timeout(Duration::from_secs(u64::MAX))) .build(); @@ -58,6 +60,7 @@ fn main() -> Result<(), Box> { let mut metric_registry = Registry::default(); let metrics = Metrics::new(&mut metric_registry); + libp2p::bandwidth::register_bandwidth_sinks(&mut metric_registry, bandwidth_logging); thread::spawn(move || block_on(http_service::metrics_server(metric_registry))); block_on(async { diff --git a/libp2p/Cargo.toml b/libp2p/Cargo.toml index 01c4fea81cc..9511686d81a 100644 --- a/libp2p/Cargo.toml +++ b/libp2p/Cargo.toml @@ -100,6 +100,7 @@ getrandom = "0.2.3" # Explicit dependency to be used in `wasm-bindgen` featu instant = "0.1.12" # Explicit dependency to be used in `wasm-bindgen` feature # TODO feature flag? rw-stream-sink = { workspace = true } +prometheus-client = { workspace = true } libp2p-allow-block-list = { workspace = true } libp2p-autonat = { workspace = true, optional = true } diff --git a/libp2p/src/bandwidth.rs b/libp2p/src/bandwidth.rs index dc696ce07e2..fb60a449ca2 100644 --- a/libp2p/src/bandwidth.rs +++ b/libp2p/src/bandwidth.rs @@ -25,13 +25,18 @@ use futures::{ prelude::*, ready, }; +use prometheus_client::{ + encoding::{DescriptorEncoder, EncodeMetric}, + metrics::{counter::ConstCounter, MetricType}, +}; use std::{ + collections::HashMap, convert::TryFrom as _, io, pin::Pin, sync::{ atomic::{AtomicU64, Ordering}, - Arc, + Arc, RwLock, }, task::{Context, Poll}, }; @@ -101,6 +106,7 @@ where } /// Allows obtaining the average bandwidth of the streams. +#[derive(Default, Debug)] pub struct BandwidthSinks { inbound: AtomicU64, outbound: AtomicU64, @@ -209,3 +215,32 @@ impl AsyncWrite for InstrumentedStream { this.inner.poll_close(cx) } } + +// TODO: Ideally this should go somewhere else. I.e. good to not depend on prometheus-client in libp2p. +pub fn register_bandwidth_sinks( + registry: &mut prometheus_client::registry::Registry, + sinks: Arc>>>, +) { + registry.register_collector(Box::new(SinksCollector(sinks))); +} + +#[derive(Debug)] +struct SinksCollector(Arc>>>); + +impl prometheus_client::collector::Collector for SinksCollector { + fn encode(&self, mut encoder: DescriptorEncoder) -> Result<(), std::fmt::Error> { + let mut family_encoder = + encoder.encode_descriptor("bandwidth", "todo", None, MetricType::Counter)?; + for (protocols, sink) in self.0.read().expect("todo").iter() { + let labels = [("protocols", protocols.as_str()), ("direction", "inbound")]; + let metric_encoder = family_encoder.encode_family(&labels)?; + ConstCounter::new(sink.inbound.load(Ordering::Relaxed)).encode(metric_encoder)?; + + let labels = [("protocols", protocols.as_str()), ("direction", "outbound")]; + let metric_encoder = family_encoder.encode_family(&labels)?; + ConstCounter::new(sink.outbound.load(Ordering::Relaxed)).encode(metric_encoder)?; + } + + Ok(()) + } +} diff --git a/libp2p/src/builder/phase/bandwidth_logging.rs b/libp2p/src/builder/phase/bandwidth_logging.rs index 3f3142d31a2..134de7a924b 100644 --- a/libp2p/src/builder/phase/bandwidth_logging.rs +++ b/libp2p/src/builder/phase/bandwidth_logging.rs @@ -2,8 +2,9 @@ use super::*; use crate::bandwidth::BandwidthSinks; use crate::transport_ext::TransportExt; use crate::SwarmBuilder; +use std::collections::HashMap; use std::marker::PhantomData; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; pub struct BandwidthLoggingPhase { pub(crate) relay_behaviour: R, @@ -17,7 +18,7 @@ impl self, ) -> ( SwarmBuilder>, - Arc, + Arc>>>, ) { let (transport, sinks) = self.phase.transport.with_bandwidth_logging(); ( diff --git a/libp2p/src/builder/phase/other_transport.rs b/libp2p/src/builder/phase/other_transport.rs index 946b696323c..3699a278163 100644 --- a/libp2p/src/builder/phase/other_transport.rs +++ b/libp2p/src/builder/phase/other_transport.rs @@ -1,6 +1,7 @@ +use std::collections::HashMap; use std::convert::Infallible; use std::marker::PhantomData; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use libp2p_core::upgrade::{InboundConnectionUpgrade, OutboundConnectionUpgrade}; use libp2p_core::Transport; @@ -153,7 +154,7 @@ impl Provider, BehaviourPhase, >, - Arc, + Arc>>>, ) { self.without_any_other_transports() .without_dns() diff --git a/libp2p/src/builder/phase/quic.rs b/libp2p/src/builder/phase/quic.rs index ae8d9400c25..716febd4abd 100644 --- a/libp2p/src/builder/phase/quic.rs +++ b/libp2p/src/builder/phase/quic.rs @@ -1,4 +1,5 @@ use super::*; +use crate::bandwidth::BandwidthSinks; use crate::SwarmBuilder; #[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))] use libp2p_core::muxing::StreamMuxer; @@ -8,7 +9,9 @@ use libp2p_core::upgrade::{InboundConnectionUpgrade, OutboundConnectionUpgrade}; all(not(target_arch = "wasm32"), feature = "websocket") ))] use libp2p_core::{InboundUpgrade, Negotiated, OutboundUpgrade, UpgradeInfo}; -use std::{marker::PhantomData, sync::Arc}; +use std::collections::HashMap; +use std::marker::PhantomData; +use std::sync::{Arc, RwLock}; pub struct QuicPhase { pub(crate) transport: T, @@ -254,7 +257,7 @@ impl SwarmBuilder, >, - Arc, + Arc>>>, ) { self.without_quic() .without_any_other_transports() diff --git a/libp2p/src/transport_ext.rs b/libp2p/src/transport_ext.rs index 8f7c16574f6..435b20c5f0c 100644 --- a/libp2p/src/transport_ext.rs +++ b/libp2p/src/transport_ext.rs @@ -29,7 +29,11 @@ use crate::{ Transport, }; use libp2p_identity::PeerId; -use std::sync::Arc; +use multiaddr::Multiaddr; +use std::{ + collections::HashMap, + sync::{Arc, RwLock}, +}; /// Trait automatically implemented on all objects that implement `Transport`. Provides some /// additional utilities. @@ -66,7 +70,12 @@ pub trait TransportExt: Transport { /// /// let (transport, sinks) = transport.with_bandwidth_logging(); /// ``` - fn with_bandwidth_logging(self) -> (Boxed<(PeerId, StreamMuxerBox)>, Arc) + fn with_bandwidth_logging( + self, + ) -> ( + Boxed<(PeerId, StreamMuxerBox)>, + Arc>>>, + ) where Self: Sized + Send + Unpin + 'static, Self::Dial: Send + 'static, @@ -77,13 +86,32 @@ pub trait TransportExt: Transport { S::Substream: Send + 'static, S::Error: Send + Sync + 'static, { - let sinks = BandwidthSinks::new(); + let sinks: Arc>>> = Arc::new(RwLock::new(HashMap::new())); let sinks_copy = sinks.clone(); - let transport = Transport::map(self, |output, _| { + let transport = Transport::map(self, move |output, connected_point| { + fn as_string(ma: &Multiaddr) -> String { + let len = ma + .protocol_stack() + .fold(0, |acc, proto| acc + proto.len() + 1); + let mut protocols = String::with_capacity(len); + for proto_tag in ma.protocol_stack() { + protocols.push('/'); + protocols.push_str(proto_tag); + } + protocols + } + + let sink = sinks_copy + .write() + .expect("todo") + .entry(as_string(connected_point.get_remote_address())) + .or_default() + .clone(); + let (peer_id, stream_muxer_box) = output.into(); ( peer_id, - StreamMuxerBox::new(BandwidthLogging::new(stream_muxer_box, sinks_copy)), + StreamMuxerBox::new(BandwidthLogging::new(stream_muxer_box, sink)), ) }) .boxed(); diff --git a/misc/metrics/src/lib.rs b/misc/metrics/src/lib.rs index 2132dd5d7fb..1f8dd224674 100644 --- a/misc/metrics/src/lib.rs +++ b/misc/metrics/src/lib.rs @@ -42,6 +42,8 @@ mod protocol_stack; mod relay; mod swarm; +use std::{sync::{Arc, RwLock}, collections::HashMap}; + use prometheus_client::registry::Registry; /// Set of Swarm and protocol metrics derived from emitted events. From e1b449780bc38085f9dd16d98bd44681c9fc66f1 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Fri, 27 Oct 2023 15:37:34 +0200 Subject: [PATCH 02/27] Move bandwidth logging to misc/metrics --- Cargo.lock | 2 + libp2p/src/builder.rs | 25 +- libp2p/src/builder/phase/bandwidth_logging.rs | 33 +- libp2p/src/builder/phase/other_transport.rs | 15 +- libp2p/src/builder/phase/quic.rs | 14 +- misc/metrics/Cargo.toml | 2 + misc/metrics/src/bandwidth.rs | 383 ++++++++++++++++++ misc/metrics/src/lib.rs | 6 +- 8 files changed, 429 insertions(+), 51 deletions(-) create mode 100644 misc/metrics/src/bandwidth.rs diff --git a/Cargo.lock b/Cargo.lock index f8e3c6d07ee..6f8d942d372 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2753,6 +2753,7 @@ dependencies = [ name = "libp2p-metrics" version = "0.14.0" dependencies = [ + "futures", "instant", "libp2p-core", "libp2p-dcutr", @@ -2763,6 +2764,7 @@ dependencies = [ "libp2p-ping", "libp2p-relay", "libp2p-swarm", + "pin-project", "prometheus-client", ] diff --git a/libp2p/src/builder.rs b/libp2p/src/builder.rs index 60b977f4397..eb7695604f8 100644 --- a/libp2p/src/builder.rs +++ b/libp2p/src/builder.rs @@ -73,7 +73,7 @@ mod tests { use crate::SwarmBuilder; use libp2p_core::{muxing::StreamMuxerBox, transport::dummy::DummyTransport}; use libp2p_identity::PeerId; - use libp2p_swarm::{NetworkBehaviour, Swarm}; + use libp2p_swarm::NetworkBehaviour; #[test] #[cfg(all(feature = "tokio", feature = "tcp", feature = "tls", feature = "noise"))] @@ -301,7 +301,7 @@ mod tests { relay: libp2p_relay::client::Behaviour, } - let (builder, _bandwidth_sinks) = SwarmBuilder::with_new_identity() + let _ = SwarmBuilder::with_new_identity() .with_tokio() .with_tcp( Default::default(), @@ -317,8 +317,7 @@ mod tests { .unwrap() .with_relay_client(libp2p_tls::Config::new, libp2p_yamux::Config::default) .unwrap() - .with_bandwidth_logging(); - let _: Swarm = builder + .with_bandwidth_logging(&mut libp2p_metrics::Registry::default()) .with_behaviour(|_key, relay| MyBehaviour { relay }) .unwrap() .build(); @@ -327,16 +326,14 @@ mod tests { #[test] #[cfg(all(feature = "tokio", feature = "tcp", feature = "tls", feature = "yamux"))] fn tcp_bandwidth_logging() -> Result<(), Box> { - let (builder, _logging) = SwarmBuilder::with_new_identity() + let _ = SwarmBuilder::with_new_identity() .with_tokio() .with_tcp( Default::default(), libp2p_tls::Config::new, libp2p_yamux::Config::default, )? - .with_bandwidth_logging(); - - builder + .with_bandwidth_logging(&mut libp2p_metrics::Registry::default()) .with_behaviour(|_| libp2p_swarm::dummy::Behaviour) .unwrap() .build(); @@ -347,12 +344,10 @@ mod tests { #[test] #[cfg(all(feature = "tokio", feature = "quic"))] fn quic_bandwidth_logging() -> Result<(), Box> { - let (builder, _logging) = SwarmBuilder::with_new_identity() + let _ = SwarmBuilder::with_new_identity() .with_tokio() .with_quic() - .with_bandwidth_logging(); - - builder + .with_bandwidth_logging(&mut libp2p_metrics::Registry::default()) .with_behaviour(|_| libp2p_swarm::dummy::Behaviour) .unwrap() .build(); @@ -363,12 +358,10 @@ mod tests { #[test] #[cfg(feature = "tokio")] fn other_transport_bandwidth_logging() -> Result<(), Box> { - let (builder, _logging) = SwarmBuilder::with_new_identity() + let _ = SwarmBuilder::with_new_identity() .with_tokio() .with_other_transport(|_| DummyTransport::<(PeerId, StreamMuxerBox)>::new())? - .with_bandwidth_logging(); - - builder + .with_bandwidth_logging(&mut libp2p_metrics::Registry::default()) .with_behaviour(|_| libp2p_swarm::dummy::Behaviour) .unwrap() .build(); diff --git a/libp2p/src/builder/phase/bandwidth_logging.rs b/libp2p/src/builder/phase/bandwidth_logging.rs index 134de7a924b..be848744cb0 100644 --- a/libp2p/src/builder/phase/bandwidth_logging.rs +++ b/libp2p/src/builder/phase/bandwidth_logging.rs @@ -1,5 +1,7 @@ +use multiaddr::Multiaddr; + use super::*; -use crate::bandwidth::BandwidthSinks; +use crate::metrics::bandwidth::{BandwidthSinks, Muxer}; use crate::transport_ext::TransportExt; use crate::SwarmBuilder; use std::collections::HashMap; @@ -11,29 +13,28 @@ pub struct BandwidthLoggingPhase { pub(crate) transport: T, } +#[cfg(feature = "metrics")] impl SwarmBuilder> { pub fn with_bandwidth_logging( self, - ) -> ( - SwarmBuilder>, - Arc>>>, - ) { - let (transport, sinks) = self.phase.transport.with_bandwidth_logging(); - ( - SwarmBuilder { - phase: BehaviourPhase { - relay_behaviour: self.phase.relay_behaviour, - transport, - }, - keypair: self.keypair, - phantom: PhantomData, + registry: &mut libp2p_metrics::Registry, + ) -> SwarmBuilder> { + SwarmBuilder { + phase: BehaviourPhase { + relay_behaviour: self.phase.relay_behaviour, + transport: crate::metrics::bandwidth::Transport::new(self.phase.transport, registry), }, - sinks, - ) + keypair: self.keypair, + phantom: PhantomData, + } } +} +impl + SwarmBuilder> +{ pub fn without_bandwidth_logging(self) -> SwarmBuilder> { SwarmBuilder { phase: BehaviourPhase { diff --git a/libp2p/src/builder/phase/other_transport.rs b/libp2p/src/builder/phase/other_transport.rs index 3699a278163..7891e7119b5 100644 --- a/libp2p/src/builder/phase/other_transport.rs +++ b/libp2p/src/builder/phase/other_transport.rs @@ -144,23 +144,22 @@ impl .with_relay_client(security_upgrade, multiplexer_upgrade) } } +#[cfg(feature = "metrics")] impl SwarmBuilder> { pub fn with_bandwidth_logging( self, - ) -> ( - SwarmBuilder< - Provider, - BehaviourPhase, - >, - Arc>>>, - ) { + registry: &mut libp2p_metrics::Registry, + ) -> SwarmBuilder< + Provider, + BehaviourPhase, + > { self.without_any_other_transports() .without_dns() .without_websocket() .without_relay() - .with_bandwidth_logging() + .with_bandwidth_logging(registry) } } impl diff --git a/libp2p/src/builder/phase/quic.rs b/libp2p/src/builder/phase/quic.rs index 716febd4abd..4ffc9846f3c 100644 --- a/libp2p/src/builder/phase/quic.rs +++ b/libp2p/src/builder/phase/quic.rs @@ -252,18 +252,16 @@ impl_quic_phase_with_websocket!( impl SwarmBuilder> { pub fn with_bandwidth_logging( self, - ) -> ( - SwarmBuilder< - Provider, - BehaviourPhase, - >, - Arc>>>, - ) { + registry: &mut libp2p_metrics::Registry, + ) -> SwarmBuilder< + Provider, + BehaviourPhase, + > { self.without_quic() .without_any_other_transports() .without_dns() .without_websocket() .without_relay() - .with_bandwidth_logging() + .with_bandwidth_logging(registry) } } diff --git a/misc/metrics/Cargo.toml b/misc/metrics/Cargo.toml index 506f8a574ce..c2b023db999 100644 --- a/misc/metrics/Cargo.toml +++ b/misc/metrics/Cargo.toml @@ -19,6 +19,7 @@ ping = ["libp2p-ping"] relay = ["libp2p-relay"] [dependencies] +futures = "0.3.26" instant = "0.1.12" libp2p-core = { workspace = true } libp2p-dcutr = { workspace = true, optional = true } @@ -29,6 +30,7 @@ libp2p-kad = { workspace = true, optional = true } libp2p-ping = { workspace = true, optional = true } libp2p-relay = { workspace = true, optional = true } libp2p-swarm = { workspace = true } +pin-project = "1.0.0" prometheus-client = { workspace = true } [dev-dependencies] diff --git a/misc/metrics/src/bandwidth.rs b/misc/metrics/src/bandwidth.rs new file mode 100644 index 00000000000..58f0fc81663 --- /dev/null +++ b/misc/metrics/src/bandwidth.rs @@ -0,0 +1,383 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use libp2p_core::{ + muxing::{StreamMuxer, StreamMuxerBox, StreamMuxerEvent}, + transport::{ListenerId, TransportError, TransportEvent}, + Multiaddr, +}; + +use futures::{ + io::{IoSlice, IoSliceMut}, + prelude::*, + ready, +}; +use libp2p_identity::PeerId; +use prometheus_client::registry::Registry; +use prometheus_client::{ + encoding::{DescriptorEncoder, EncodeMetric}, + metrics::{counter::ConstCounter, MetricType}, +}; +use std::{ + collections::HashMap, + convert::TryFrom as _, + io, + pin::Pin, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, RwLock, + }, + task::{Context, Poll}, +}; + +// TODO: rename to Transport +/// See `Transport::map`. +#[derive(Debug, Clone)] +#[pin_project::pin_project] +pub struct Transport { + #[pin] + transport: T, + sinks: Arc>>>, +} + +impl Transport { + pub fn new(transport: T, registry: &mut Registry) -> Self { + let sinks: Arc>>> = + Arc::new(RwLock::new(HashMap::new())); + + registry.register_collector(Box::new(SinksCollector(sinks.clone()))); + + Transport { transport, sinks } + } +} + +impl libp2p_core::Transport for Transport +where + // TODO: Consider depending on StreamMuxer only. + T: libp2p_core::Transport, +{ + type Output = (PeerId, StreamMuxerBox); + type Error = T::Error; + type ListenerUpgrade = MapFuture; + type Dial = MapFuture; + + fn listen_on( + &mut self, + id: ListenerId, + addr: Multiaddr, + ) -> Result<(), TransportError> { + self.transport.listen_on(id, addr) + } + + fn remove_listener(&mut self, id: ListenerId) -> bool { + self.transport.remove_listener(id) + } + + fn dial(&mut self, addr: Multiaddr) -> Result> { + let sinks = self + .sinks + .write() + .expect("todo") + .entry(as_string(&addr)) + .or_default() + .clone(); + let future = self.transport.dial(addr.clone())?; + Ok(MapFuture { + inner: future, + sinks: Some(sinks), + }) + } + + fn dial_as_listener( + &mut self, + addr: Multiaddr, + ) -> Result> { + let sinks = self + .sinks + .write() + .expect("todo") + .entry(as_string(&addr)) + .or_default() + .clone(); + let future = self.transport.dial_as_listener(addr.clone())?; + Ok(MapFuture { + inner: future, + sinks: Some(sinks), + }) + } + + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { + self.transport.address_translation(server, observed) + } + + fn poll( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let this = self.project(); + match this.transport.poll(cx) { + Poll::Ready(TransportEvent::Incoming { + listener_id, + upgrade, + local_addr, + send_back_addr, + }) => { + // TODO: Abstract into method? + let sinks = this + .sinks + .write() + .expect("todo") + .entry(as_string(&send_back_addr)) + .or_default() + .clone(); + Poll::Ready(TransportEvent::Incoming { + listener_id, + upgrade: MapFuture { + inner: upgrade, + sinks: Some(sinks), + }, + local_addr, + send_back_addr, + }) + } + Poll::Ready(other) => { + let mapped = other.map_upgrade(|_upgrade| unreachable!("case already matched")); + Poll::Ready(mapped) + } + Poll::Pending => Poll::Pending, + } + } +} + +/// Custom `Future` to avoid boxing. +/// +/// Applies a function to the inner future's result. +#[pin_project::pin_project] +#[derive(Clone, Debug)] +pub struct MapFuture { + #[pin] + inner: T, + sinks: Option>, +} + +impl Future for MapFuture +where + T: TryFuture, +{ + type Output = Result<(PeerId, StreamMuxerBox), T::Error>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let (peer_id, stream_muxer) = match TryFuture::try_poll(this.inner, cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(Ok(v)) => v, + Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), + }; + Poll::Ready(Ok(( + peer_id, + StreamMuxerBox::new(Muxer::new( + stream_muxer, + this.sinks.take().expect("todo"), + )), + ))) + } +} + +/// Wraps around a [`StreamMuxer`] and counts the number of bytes that go through all the opened +/// streams. +#[derive(Clone)] +#[pin_project::pin_project] +pub struct Muxer { + #[pin] + inner: SMInner, + sinks: Arc, +} + +impl Muxer { + /// Creates a new [`BandwidthLogging`] around the stream muxer. + pub fn new(inner: SMInner, sinks: Arc) -> Self { + Self { inner, sinks } + } +} + +impl StreamMuxer for Muxer +where + SMInner: StreamMuxer, +{ + type Substream = InstrumentedStream; + type Error = SMInner::Error; + + fn poll( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let this = self.project(); + this.inner.poll(cx) + } + + fn poll_inbound( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let this = self.project(); + let inner = ready!(this.inner.poll_inbound(cx)?); + let logged = InstrumentedStream { + inner, + sinks: this.sinks.clone(), + }; + Poll::Ready(Ok(logged)) + } + + fn poll_outbound( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let this = self.project(); + let inner = ready!(this.inner.poll_outbound(cx)?); + let logged = InstrumentedStream { + inner, + sinks: this.sinks.clone(), + }; + Poll::Ready(Ok(logged)) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + this.inner.poll_close(cx) + } +} + +/// Allows obtaining the average bandwidth of the streams. +#[derive(Default, Debug)] +pub struct BandwidthSinks { + inbound: AtomicU64, + outbound: AtomicU64, +} + +/// Wraps around an [`AsyncRead`] + [`AsyncWrite`] and logs the bandwidth that goes through it. +#[pin_project::pin_project] +pub struct InstrumentedStream { + #[pin] + inner: SMInner, + sinks: Arc, +} + +impl AsyncRead for InstrumentedStream { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + let this = self.project(); + let num_bytes = ready!(this.inner.poll_read(cx, buf))?; + this.sinks.inbound.fetch_add( + u64::try_from(num_bytes).unwrap_or(u64::max_value()), + Ordering::Relaxed, + ); + Poll::Ready(Ok(num_bytes)) + } + + fn poll_read_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &mut [IoSliceMut<'_>], + ) -> Poll> { + let this = self.project(); + let num_bytes = ready!(this.inner.poll_read_vectored(cx, bufs))?; + this.sinks.inbound.fetch_add( + u64::try_from(num_bytes).unwrap_or(u64::max_value()), + Ordering::Relaxed, + ); + Poll::Ready(Ok(num_bytes)) + } +} + +impl AsyncWrite for InstrumentedStream { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + let this = self.project(); + let num_bytes = ready!(this.inner.poll_write(cx, buf))?; + this.sinks.outbound.fetch_add( + u64::try_from(num_bytes).unwrap_or(u64::max_value()), + Ordering::Relaxed, + ); + Poll::Ready(Ok(num_bytes)) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[IoSlice<'_>], + ) -> Poll> { + let this = self.project(); + let num_bytes = ready!(this.inner.poll_write_vectored(cx, bufs))?; + this.sinks.outbound.fetch_add( + u64::try_from(num_bytes).unwrap_or(u64::max_value()), + Ordering::Relaxed, + ); + Poll::Ready(Ok(num_bytes)) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + this.inner.poll_flush(cx) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + this.inner.poll_close(cx) + } +} + +#[derive(Debug)] +pub struct SinksCollector(Arc>>>); + +impl prometheus_client::collector::Collector for SinksCollector { + fn encode(&self, mut encoder: DescriptorEncoder) -> Result<(), std::fmt::Error> { + let mut family_encoder = + encoder.encode_descriptor("bandwidth", "todo", None, MetricType::Counter)?; + for (protocols, sink) in self.0.read().expect("todo").iter() { + let labels = [("protocols", protocols.as_str()), ("direction", "inbound")]; + let metric_encoder = family_encoder.encode_family(&labels)?; + ConstCounter::new(sink.inbound.load(Ordering::Relaxed)).encode(metric_encoder)?; + + let labels = [("protocols", protocols.as_str()), ("direction", "outbound")]; + let metric_encoder = family_encoder.encode_family(&labels)?; + ConstCounter::new(sink.outbound.load(Ordering::Relaxed)).encode(metric_encoder)?; + } + + Ok(()) + } +} + +fn as_string(ma: &Multiaddr) -> String { + let len = ma + .protocol_stack() + .fold(0, |acc, proto| acc + proto.len() + 1); + let mut protocols = String::with_capacity(len); + for proto_tag in ma.protocol_stack() { + protocols.push('/'); + protocols.push_str(proto_tag); + } + protocols +} diff --git a/misc/metrics/src/lib.rs b/misc/metrics/src/lib.rs index 1f8dd224674..cd99bc7d7fd 100644 --- a/misc/metrics/src/lib.rs +++ b/misc/metrics/src/lib.rs @@ -27,6 +27,8 @@ #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] +// TODO: pub? +pub mod bandwidth; #[cfg(feature = "dcutr")] mod dcutr; #[cfg(feature = "gossipsub")] @@ -42,9 +44,7 @@ mod protocol_stack; mod relay; mod swarm; -use std::{sync::{Arc, RwLock}, collections::HashMap}; - -use prometheus_client::registry::Registry; +pub use prometheus_client::registry::Registry; /// Set of Swarm and protocol metrics derived from emitted events. pub struct Metrics { From 84f68e7e2bf63f61e3e9f9b44b9d609df7c669c5 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Fri, 27 Oct 2023 16:05:33 +0200 Subject: [PATCH 03/27] Add bandwidth metrics to server --- libp2p/src/bandwidth.rs | 26 -------------------------- libp2p/src/builder/phase/quic.rs | 1 + libp2p/src/builder/phase/websocket.rs | 14 ++++++++++++++ misc/server/src/main.rs | 4 +++- 4 files changed, 18 insertions(+), 27 deletions(-) diff --git a/libp2p/src/bandwidth.rs b/libp2p/src/bandwidth.rs index fb60a449ca2..880072118b4 100644 --- a/libp2p/src/bandwidth.rs +++ b/libp2p/src/bandwidth.rs @@ -112,32 +112,6 @@ pub struct BandwidthSinks { outbound: AtomicU64, } -impl BandwidthSinks { - /// Returns a new [`BandwidthSinks`]. - pub(crate) fn new() -> Arc { - Arc::new(Self { - inbound: AtomicU64::new(0), - outbound: AtomicU64::new(0), - }) - } - - /// Returns the total number of bytes that have been downloaded on all the streams. - /// - /// > **Note**: This method is by design subject to race conditions. The returned value should - /// > only ever be used for statistics purposes. - pub fn total_inbound(&self) -> u64 { - self.inbound.load(Ordering::Relaxed) - } - - /// Returns the total number of bytes that have been uploaded on all the streams. - /// - /// > **Note**: This method is by design subject to race conditions. The returned value should - /// > only ever be used for statistics purposes. - pub fn total_outbound(&self) -> u64 { - self.outbound.load(Ordering::Relaxed) - } -} - /// Wraps around an [`AsyncRead`] + [`AsyncWrite`] and logs the bandwidth that goes through it. #[pin_project::pin_project] pub(crate) struct InstrumentedStream { diff --git a/libp2p/src/builder/phase/quic.rs b/libp2p/src/builder/phase/quic.rs index 4ffc9846f3c..f8db1ee84c3 100644 --- a/libp2p/src/builder/phase/quic.rs +++ b/libp2p/src/builder/phase/quic.rs @@ -249,6 +249,7 @@ impl_quic_phase_with_websocket!( super::provider::Tokio, rw_stream_sink::RwStreamSink> ); +#[cfg(feature = "metrics")] impl SwarmBuilder> { pub fn with_bandwidth_logging( self, diff --git a/libp2p/src/builder/phase/websocket.rs b/libp2p/src/builder/phase/websocket.rs index aeb6236a026..3abb8060a05 100644 --- a/libp2p/src/builder/phase/websocket.rs +++ b/libp2p/src/builder/phase/websocket.rs @@ -197,6 +197,20 @@ impl SwarmBuilder SwarmBuilder> { + pub fn with_bandwidth_logging( + self, + registry: &mut libp2p_metrics::Registry, + ) -> SwarmBuilder< + Provider, + BehaviourPhase, + > { + self.without_websocket() + .without_relay() + .with_bandwidth_logging(registry) + } +} #[derive(Debug, thiserror::Error)] #[error(transparent)] diff --git a/misc/server/src/main.rs b/misc/server/src/main.rs index 0573aae5c6f..32f75cd48af 100644 --- a/misc/server/src/main.rs +++ b/misc/server/src/main.rs @@ -53,6 +53,8 @@ async fn main() -> Result<(), Box> { let config = Zeroizing::new(config::Config::from_file(opt.config.as_path())?); + let mut metric_registry = Registry::default(); + let local_keypair = { let keypair = identity::Keypair::from_protobuf_encoding(&Zeroizing::new( base64::engine::general_purpose::STANDARD @@ -78,6 +80,7 @@ async fn main() -> Result<(), Box> { )? .with_quic() .with_dns()? + .with_bandwidth_logging(&mut metric_registry) .with_behaviour(|key| { behaviour::Behaviour::new(key.public(), opt.enable_kademlia, opt.enable_autonat) })? @@ -107,7 +110,6 @@ async fn main() -> Result<(), Box> { swarm.external_addresses().collect::>() ); - let mut metric_registry = Registry::default(); let metrics = Metrics::new(&mut metric_registry); let build_info = Info::new(vec![("version".to_string(), env!("CARGO_PKG_VERSION"))]); metric_registry.register( From c3e785d1ad66cdd7736efbf58297e6b373fa0ece Mon Sep 17 00:00:00 2001 From: Max Inden Date: Fri, 27 Oct 2023 16:07:44 +0200 Subject: [PATCH 04/27] Rename bandwidth metric --- misc/metrics/src/bandwidth.rs | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/misc/metrics/src/bandwidth.rs b/misc/metrics/src/bandwidth.rs index 58f0fc81663..264bbfb8aa9 100644 --- a/misc/metrics/src/bandwidth.rs +++ b/misc/metrics/src/bandwidth.rs @@ -192,10 +192,7 @@ where }; Poll::Ready(Ok(( peer_id, - StreamMuxerBox::new(Muxer::new( - stream_muxer, - this.sinks.take().expect("todo"), - )), + StreamMuxerBox::new(Muxer::new(stream_muxer, this.sinks.take().expect("todo"))), ))) } } @@ -354,8 +351,13 @@ pub struct SinksCollector(Arc>>>); impl prometheus_client::collector::Collector for SinksCollector { fn encode(&self, mut encoder: DescriptorEncoder) -> Result<(), std::fmt::Error> { - let mut family_encoder = - encoder.encode_descriptor("bandwidth", "todo", None, MetricType::Counter)?; + let mut family_encoder = encoder.encode_descriptor( + "libp2p_swarm_bandwidth", + "Bandwidth usage by direction and transport protocols", + None, + MetricType::Counter, + )?; + for (protocols, sink) in self.0.read().expect("todo").iter() { let labels = [("protocols", protocols.as_str()), ("direction", "inbound")]; let metric_encoder = family_encoder.encode_family(&labels)?; From 76cbf19fef901e8fafbf94aba4038617371558b5 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Sat, 28 Oct 2023 19:17:34 +0200 Subject: [PATCH 05/27] Use Counter instead of Collector --- libp2p/src/bandwidth.rs | 63 +++++++--------- misc/metrics/src/bandwidth.rs | 136 ++++++++++++++++++++-------------- 2 files changed, 106 insertions(+), 93 deletions(-) diff --git a/libp2p/src/bandwidth.rs b/libp2p/src/bandwidth.rs index 880072118b4..dc696ce07e2 100644 --- a/libp2p/src/bandwidth.rs +++ b/libp2p/src/bandwidth.rs @@ -25,18 +25,13 @@ use futures::{ prelude::*, ready, }; -use prometheus_client::{ - encoding::{DescriptorEncoder, EncodeMetric}, - metrics::{counter::ConstCounter, MetricType}, -}; use std::{ - collections::HashMap, convert::TryFrom as _, io, pin::Pin, sync::{ atomic::{AtomicU64, Ordering}, - Arc, RwLock, + Arc, }, task::{Context, Poll}, }; @@ -106,12 +101,37 @@ where } /// Allows obtaining the average bandwidth of the streams. -#[derive(Default, Debug)] pub struct BandwidthSinks { inbound: AtomicU64, outbound: AtomicU64, } +impl BandwidthSinks { + /// Returns a new [`BandwidthSinks`]. + pub(crate) fn new() -> Arc { + Arc::new(Self { + inbound: AtomicU64::new(0), + outbound: AtomicU64::new(0), + }) + } + + /// Returns the total number of bytes that have been downloaded on all the streams. + /// + /// > **Note**: This method is by design subject to race conditions. The returned value should + /// > only ever be used for statistics purposes. + pub fn total_inbound(&self) -> u64 { + self.inbound.load(Ordering::Relaxed) + } + + /// Returns the total number of bytes that have been uploaded on all the streams. + /// + /// > **Note**: This method is by design subject to race conditions. The returned value should + /// > only ever be used for statistics purposes. + pub fn total_outbound(&self) -> u64 { + self.outbound.load(Ordering::Relaxed) + } +} + /// Wraps around an [`AsyncRead`] + [`AsyncWrite`] and logs the bandwidth that goes through it. #[pin_project::pin_project] pub(crate) struct InstrumentedStream { @@ -189,32 +209,3 @@ impl AsyncWrite for InstrumentedStream { this.inner.poll_close(cx) } } - -// TODO: Ideally this should go somewhere else. I.e. good to not depend on prometheus-client in libp2p. -pub fn register_bandwidth_sinks( - registry: &mut prometheus_client::registry::Registry, - sinks: Arc>>>, -) { - registry.register_collector(Box::new(SinksCollector(sinks))); -} - -#[derive(Debug)] -struct SinksCollector(Arc>>>); - -impl prometheus_client::collector::Collector for SinksCollector { - fn encode(&self, mut encoder: DescriptorEncoder) -> Result<(), std::fmt::Error> { - let mut family_encoder = - encoder.encode_descriptor("bandwidth", "todo", None, MetricType::Counter)?; - for (protocols, sink) in self.0.read().expect("todo").iter() { - let labels = [("protocols", protocols.as_str()), ("direction", "inbound")]; - let metric_encoder = family_encoder.encode_family(&labels)?; - ConstCounter::new(sink.inbound.load(Ordering::Relaxed)).encode(metric_encoder)?; - - let labels = [("protocols", protocols.as_str()), ("direction", "outbound")]; - let metric_encoder = family_encoder.encode_family(&labels)?; - ConstCounter::new(sink.outbound.load(Ordering::Relaxed)).encode(metric_encoder)?; - } - - Ok(()) - } -} diff --git a/misc/metrics/src/bandwidth.rs b/misc/metrics/src/bandwidth.rs index 264bbfb8aa9..9fbea470911 100644 --- a/misc/metrics/src/bandwidth.rs +++ b/misc/metrics/src/bandwidth.rs @@ -30,11 +30,15 @@ use futures::{ ready, }; use libp2p_identity::PeerId; -use prometheus_client::registry::Registry; use prometheus_client::{ encoding::{DescriptorEncoder, EncodeMetric}, metrics::{counter::ConstCounter, MetricType}, }; +use prometheus_client::{ + encoding::{EncodeLabelSet, EncodeLabelValue}, + metrics::{counter::Counter, family::Family}, + registry::{Registry, Unit}, +}; use std::{ collections::HashMap, convert::TryFrom as _, @@ -54,20 +58,37 @@ use std::{ pub struct Transport { #[pin] transport: T, - sinks: Arc>>>, + metrics: Family, } impl Transport { pub fn new(transport: T, registry: &mut Registry) -> Self { - let sinks: Arc>>> = - Arc::new(RwLock::new(HashMap::new())); + let metrics = Family::::default(); - registry.register_collector(Box::new(SinksCollector(sinks.clone()))); + registry.register_with_unit( + // TODO: Ideally no prefix would be needed. + "libp2p_swarm_bandwidth", + "Bandwidth usage by direction and transport protocols", + Unit::Bytes, + metrics.clone(), + ); - Transport { transport, sinks } + Transport { transport, metrics } } } +#[derive(EncodeLabelSet, Hash, Clone, Eq, PartialEq, Debug)] +struct Labels { + protocols: String, + direction: Direction, +} + +#[derive(Clone, Hash, PartialEq, Eq, EncodeLabelValue, Debug)] +enum Direction { + Inbound, + Outbound, +} + impl libp2p_core::Transport for Transport where // TODO: Consider depending on StreamMuxer only. @@ -91,17 +112,12 @@ where } fn dial(&mut self, addr: Multiaddr) -> Result> { - let sinks = self - .sinks - .write() - .expect("todo") - .entry(as_string(&addr)) - .or_default() - .clone(); - let future = self.transport.dial(addr.clone())?; Ok(MapFuture { - inner: future, - sinks: Some(sinks), + metrics: Some(ConnectionMetrics::from_family_and_protocols( + &self.metrics, + as_string(&addr), + )), + inner: self.transport.dial(addr.clone())?, }) } @@ -109,17 +125,12 @@ where &mut self, addr: Multiaddr, ) -> Result> { - let sinks = self - .sinks - .write() - .expect("todo") - .entry(as_string(&addr)) - .or_default() - .clone(); - let future = self.transport.dial_as_listener(addr.clone())?; Ok(MapFuture { - inner: future, - sinks: Some(sinks), + metrics: Some(ConnectionMetrics::from_family_and_protocols( + &self.metrics, + as_string(&addr), + )), + inner: self.transport.dial_as_listener(addr.clone())?, }) } @@ -139,19 +150,14 @@ where local_addr, send_back_addr, }) => { - // TODO: Abstract into method? - let sinks = this - .sinks - .write() - .expect("todo") - .entry(as_string(&send_back_addr)) - .or_default() - .clone(); Poll::Ready(TransportEvent::Incoming { listener_id, upgrade: MapFuture { + metrics: Some(ConnectionMetrics::from_family_and_protocols( + this.metrics, + as_string(&send_back_addr), + )), inner: upgrade, - sinks: Some(sinks), }, local_addr, send_back_addr, @@ -166,6 +172,31 @@ where } } +#[derive(Clone, Debug)] +struct ConnectionMetrics { + outbound: Counter, + inbound: Counter, +} + +impl ConnectionMetrics { + fn from_family_and_protocols(family: &Family, protocols: String) -> Self { + ConnectionMetrics { + outbound: family + .get_or_create(&Labels { + protocols: protocols.clone(), + direction: Direction::Outbound, + }) + .clone(), + inbound: family + .get_or_create(&Labels { + protocols: protocols, + direction: Direction::Inbound, + }) + .clone(), + } + } +} + /// Custom `Future` to avoid boxing. /// /// Applies a function to the inner future's result. @@ -174,7 +205,7 @@ where pub struct MapFuture { #[pin] inner: T, - sinks: Option>, + metrics: Option, } impl Future for MapFuture @@ -192,7 +223,7 @@ where }; Poll::Ready(Ok(( peer_id, - StreamMuxerBox::new(Muxer::new(stream_muxer, this.sinks.take().expect("todo"))), + StreamMuxerBox::new(Muxer::new(stream_muxer, this.metrics.take().expect("todo"))), ))) } } @@ -204,13 +235,13 @@ where pub struct Muxer { #[pin] inner: SMInner, - sinks: Arc, + metrics: ConnectionMetrics, } impl Muxer { /// Creates a new [`BandwidthLogging`] around the stream muxer. - pub fn new(inner: SMInner, sinks: Arc) -> Self { - Self { inner, sinks } + fn new(inner: SMInner, metrics: ConnectionMetrics) -> Self { + Self { inner, metrics } } } @@ -237,7 +268,7 @@ where let inner = ready!(this.inner.poll_inbound(cx)?); let logged = InstrumentedStream { inner, - sinks: this.sinks.clone(), + metrics: this.metrics.clone(), }; Poll::Ready(Ok(logged)) } @@ -250,7 +281,7 @@ where let inner = ready!(this.inner.poll_outbound(cx)?); let logged = InstrumentedStream { inner, - sinks: this.sinks.clone(), + metrics: this.metrics.clone(), }; Poll::Ready(Ok(logged)) } @@ -273,7 +304,7 @@ pub struct BandwidthSinks { pub struct InstrumentedStream { #[pin] inner: SMInner, - sinks: Arc, + metrics: ConnectionMetrics, } impl AsyncRead for InstrumentedStream { @@ -284,9 +315,8 @@ impl AsyncRead for InstrumentedStream { ) -> Poll> { let this = self.project(); let num_bytes = ready!(this.inner.poll_read(cx, buf))?; - this.sinks.inbound.fetch_add( + this.metrics.inbound.inc_by( u64::try_from(num_bytes).unwrap_or(u64::max_value()), - Ordering::Relaxed, ); Poll::Ready(Ok(num_bytes)) } @@ -298,9 +328,8 @@ impl AsyncRead for InstrumentedStream { ) -> Poll> { let this = self.project(); let num_bytes = ready!(this.inner.poll_read_vectored(cx, bufs))?; - this.sinks.inbound.fetch_add( + this.metrics.inbound.inc_by( u64::try_from(num_bytes).unwrap_or(u64::max_value()), - Ordering::Relaxed, ); Poll::Ready(Ok(num_bytes)) } @@ -314,9 +343,8 @@ impl AsyncWrite for InstrumentedStream { ) -> Poll> { let this = self.project(); let num_bytes = ready!(this.inner.poll_write(cx, buf))?; - this.sinks.outbound.fetch_add( + this.metrics.outbound.inc_by( u64::try_from(num_bytes).unwrap_or(u64::max_value()), - Ordering::Relaxed, ); Poll::Ready(Ok(num_bytes)) } @@ -328,9 +356,8 @@ impl AsyncWrite for InstrumentedStream { ) -> Poll> { let this = self.project(); let num_bytes = ready!(this.inner.poll_write_vectored(cx, bufs))?; - this.sinks.outbound.fetch_add( + this.metrics.outbound.inc_by( u64::try_from(num_bytes).unwrap_or(u64::max_value()), - Ordering::Relaxed, ); Poll::Ready(Ok(num_bytes)) } @@ -351,12 +378,7 @@ pub struct SinksCollector(Arc>>>); impl prometheus_client::collector::Collector for SinksCollector { fn encode(&self, mut encoder: DescriptorEncoder) -> Result<(), std::fmt::Error> { - let mut family_encoder = encoder.encode_descriptor( - "libp2p_swarm_bandwidth", - "Bandwidth usage by direction and transport protocols", - None, - MetricType::Counter, - )?; + let mut family_encoder = encoder.encode_descriptor("", "", None, MetricType::Counter)?; for (protocols, sink) in self.0.read().expect("todo").iter() { let labels = [("protocols", protocols.as_str()), ("direction", "inbound")]; From f378cc78b1e0c143c7d745970f5612b2ae72a550 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Sat, 28 Oct 2023 19:48:52 +0200 Subject: [PATCH 06/27] Introduce with_bandwidth_metrics builder step --- examples/metrics/src/main.rs | 8 +-- libp2p/src/builder.rs | 15 ++-- libp2p/src/builder/phase.rs | 2 + libp2p/src/builder/phase/bandwidth_logging.rs | 62 ++++++++++------- libp2p/src/builder/phase/bandwidth_metrics.rs | 68 +++++++++++++++++++ libp2p/src/builder/phase/other_transport.rs | 34 +++++++--- libp2p/src/builder/phase/quic.rs | 37 +++++++--- libp2p/src/builder/phase/websocket.rs | 25 +++---- libp2p/src/transport_ext.rs | 38 ++--------- misc/server/src/main.rs | 2 +- 10 files changed, 192 insertions(+), 99 deletions(-) create mode 100644 libp2p/src/builder/phase/bandwidth_metrics.rs diff --git a/examples/metrics/src/main.rs b/examples/metrics/src/main.rs index d4c6b442426..adba275d9b6 100644 --- a/examples/metrics/src/main.rs +++ b/examples/metrics/src/main.rs @@ -37,15 +37,16 @@ mod http_service; fn main() -> Result<(), Box> { env_logger::Builder::from_env(Env::default().default_filter_or("info")).init(); - let (builder, bandwidth_logging) = libp2p::SwarmBuilder::with_new_identity() + let mut metric_registry = Registry::default(); + + let mut swarm = libp2p::SwarmBuilder::with_new_identity() .with_async_std() .with_tcp( tcp::Config::default(), noise::Config::new, yamux::Config::default, )? - .with_bandwidth_logging(); - let mut swarm = builder + .with_bandwidth_metrics(&mut metrics_registry) .with_behaviour(|key| Behaviour::new(key.public()))? .with_swarm_config(|cfg| cfg.with_idle_connection_timeout(Duration::from_secs(u64::MAX))) .build(); @@ -58,7 +59,6 @@ fn main() -> Result<(), Box> { info!("Dialed {}", addr) } - let mut metric_registry = Registry::default(); let metrics = Metrics::new(&mut metric_registry); libp2p::bandwidth::register_bandwidth_sinks(&mut metric_registry, bandwidth_logging); thread::spawn(move || block_on(http_service::metrics_server(metric_registry))); diff --git a/libp2p/src/builder.rs b/libp2p/src/builder.rs index eb7695604f8..45641724f74 100644 --- a/libp2p/src/builder.rs +++ b/libp2p/src/builder.rs @@ -293,6 +293,7 @@ mod tests { feature = "dns", feature = "relay", feature = "websocket", + feature = "metrics", ))] async fn all() { #[derive(NetworkBehaviour)] @@ -317,7 +318,7 @@ mod tests { .unwrap() .with_relay_client(libp2p_tls::Config::new, libp2p_yamux::Config::default) .unwrap() - .with_bandwidth_logging(&mut libp2p_metrics::Registry::default()) + .with_bandwidth_metrics(&mut libp2p_metrics::Registry::default()) .with_behaviour(|_key, relay| MyBehaviour { relay }) .unwrap() .build(); @@ -325,7 +326,7 @@ mod tests { #[test] #[cfg(all(feature = "tokio", feature = "tcp", feature = "tls", feature = "yamux"))] - fn tcp_bandwidth_logging() -> Result<(), Box> { + fn tcp_bandwidth_metrics() -> Result<(), Box> { let _ = SwarmBuilder::with_new_identity() .with_tokio() .with_tcp( @@ -333,7 +334,7 @@ mod tests { libp2p_tls::Config::new, libp2p_yamux::Config::default, )? - .with_bandwidth_logging(&mut libp2p_metrics::Registry::default()) + .with_bandwidth_metrics(&mut libp2p_metrics::Registry::default()) .with_behaviour(|_| libp2p_swarm::dummy::Behaviour) .unwrap() .build(); @@ -343,11 +344,11 @@ mod tests { #[test] #[cfg(all(feature = "tokio", feature = "quic"))] - fn quic_bandwidth_logging() -> Result<(), Box> { + fn quic_bandwidth_metrics() -> Result<(), Box> { let _ = SwarmBuilder::with_new_identity() .with_tokio() .with_quic() - .with_bandwidth_logging(&mut libp2p_metrics::Registry::default()) + .with_bandwidth_metrics(&mut libp2p_metrics::Registry::default()) .with_behaviour(|_| libp2p_swarm::dummy::Behaviour) .unwrap() .build(); @@ -357,11 +358,11 @@ mod tests { #[test] #[cfg(feature = "tokio")] - fn other_transport_bandwidth_logging() -> Result<(), Box> { + fn other_transport_bandwidth_metrics() -> Result<(), Box> { let _ = SwarmBuilder::with_new_identity() .with_tokio() .with_other_transport(|_| DummyTransport::<(PeerId, StreamMuxerBox)>::new())? - .with_bandwidth_logging(&mut libp2p_metrics::Registry::default()) + .with_bandwidth_metrics(&mut libp2p_metrics::Registry::default()) .with_behaviour(|_| libp2p_swarm::dummy::Behaviour) .unwrap() .build(); diff --git a/libp2p/src/builder/phase.rs b/libp2p/src/builder/phase.rs index 4871adf65ca..f3ad8261ba9 100644 --- a/libp2p/src/builder/phase.rs +++ b/libp2p/src/builder/phase.rs @@ -1,6 +1,7 @@ #![allow(unused_imports)] mod bandwidth_logging; +mod bandwidth_metrics; mod behaviour; mod build; mod dns; @@ -14,6 +15,7 @@ mod tcp; mod websocket; use bandwidth_logging::*; +use bandwidth_metrics::*; use behaviour::*; use build::*; use dns::*; diff --git a/libp2p/src/builder/phase/bandwidth_logging.rs b/libp2p/src/builder/phase/bandwidth_logging.rs index be848744cb0..e3c41e15612 100644 --- a/libp2p/src/builder/phase/bandwidth_logging.rs +++ b/libp2p/src/builder/phase/bandwidth_logging.rs @@ -1,43 +1,42 @@ -use multiaddr::Multiaddr; - use super::*; -use crate::metrics::bandwidth::{BandwidthSinks, Muxer}; +use crate::bandwidth::BandwidthSinks; use crate::transport_ext::TransportExt; use crate::SwarmBuilder; -use std::collections::HashMap; use std::marker::PhantomData; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; pub struct BandwidthLoggingPhase { pub(crate) relay_behaviour: R, pub(crate) transport: T, } -#[cfg(feature = "metrics")] impl SwarmBuilder> { + #[deprecated(note = "Use `with_bandwidth_metrics` instead.")] pub fn with_bandwidth_logging( self, - registry: &mut libp2p_metrics::Registry, - ) -> SwarmBuilder> { - SwarmBuilder { - phase: BehaviourPhase { - relay_behaviour: self.phase.relay_behaviour, - transport: crate::metrics::bandwidth::Transport::new(self.phase.transport, registry), + ) -> ( + SwarmBuilder>, + Arc, + ) { + let (transport, sinks) = self.phase.transport.with_bandwidth_logging(); + ( + SwarmBuilder { + phase: BandwidthMetricsPhase { + relay_behaviour: self.phase.relay_behaviour, + transport, + }, + keypair: self.keypair, + phantom: PhantomData, }, - keypair: self.keypair, - phantom: PhantomData, - } + sinks, + ) } -} -impl - SwarmBuilder> -{ - pub fn without_bandwidth_logging(self) -> SwarmBuilder> { + pub fn without_bandwidth_logging(self) -> SwarmBuilder> { SwarmBuilder { - phase: BehaviourPhase { + phase: BandwidthMetricsPhase { relay_behaviour: self.phase.relay_behaviour, transport: self.phase.transport, }, @@ -48,6 +47,18 @@ impl } // Shortcuts +#[cfg(feature = "metrics")] +impl + SwarmBuilder> +{ + pub fn with_bandwidth_metrics( + self, + registry: &mut libp2p_metrics::Registry, + ) -> SwarmBuilder> { + self.without_bandwidth_logging() + .with_bandwidth_metrics(registry) + } +} #[cfg(feature = "relay")] impl SwarmBuilder> @@ -56,10 +67,11 @@ impl self, constructor: impl FnOnce(&libp2p_identity::Keypair, libp2p_relay::client::Behaviour) -> R, ) -> Result>, R::Error> { - self.without_bandwidth_logging().with_behaviour(constructor) + self.without_bandwidth_logging() + .without_bandwidth_metrics() + .with_behaviour(constructor) } } - impl SwarmBuilder> { @@ -67,6 +79,8 @@ impl self, constructor: impl FnOnce(&libp2p_identity::Keypair) -> R, ) -> Result>, R::Error> { - self.without_bandwidth_logging().with_behaviour(constructor) + self.without_bandwidth_logging() + .without_bandwidth_metrics() + .with_behaviour(constructor) } } diff --git a/libp2p/src/builder/phase/bandwidth_metrics.rs b/libp2p/src/builder/phase/bandwidth_metrics.rs new file mode 100644 index 00000000000..73d061dc781 --- /dev/null +++ b/libp2p/src/builder/phase/bandwidth_metrics.rs @@ -0,0 +1,68 @@ +use super::*; +use crate::bandwidth::BandwidthSinks; +use crate::transport_ext::TransportExt; +use crate::SwarmBuilder; +use std::marker::PhantomData; +use std::sync::Arc; + +pub struct BandwidthMetricsPhase { + pub(crate) relay_behaviour: R, + pub(crate) transport: T, +} + +#[cfg(feature = "metrics")] +impl + SwarmBuilder> +{ + pub fn with_bandwidth_metrics( + self, + registry: &mut libp2p_metrics::Registry, + ) -> SwarmBuilder> { + SwarmBuilder { + phase: BehaviourPhase { + relay_behaviour: self.phase.relay_behaviour, + transport: libp2p_metrics::bandwidth::Transport::new( + self.phase.transport, + registry, + ), + }, + keypair: self.keypair, + phantom: PhantomData, + } + } + + pub fn without_bandwidth_metrics(self) -> SwarmBuilder> { + SwarmBuilder { + phase: BehaviourPhase { + relay_behaviour: self.phase.relay_behaviour, + transport: self.phase.transport, + }, + keypair: self.keypair, + phantom: PhantomData, + } + } +} + +// Shortcuts +#[cfg(feature = "relay")] +impl + SwarmBuilder> +{ + pub fn with_behaviour>( + self, + constructor: impl FnOnce(&libp2p_identity::Keypair, libp2p_relay::client::Behaviour) -> R, + ) -> Result>, R::Error> { + self.without_bandwidth_metrics().with_behaviour(constructor) + } +} + +impl + SwarmBuilder> +{ + pub fn with_behaviour>( + self, + constructor: impl FnOnce(&libp2p_identity::Keypair) -> R, + ) -> Result>, R::Error> { + self.without_bandwidth_metrics().with_behaviour(constructor) + } +} diff --git a/libp2p/src/builder/phase/other_transport.rs b/libp2p/src/builder/phase/other_transport.rs index 7891e7119b5..943cc75a100 100644 --- a/libp2p/src/builder/phase/other_transport.rs +++ b/libp2p/src/builder/phase/other_transport.rs @@ -1,7 +1,6 @@ -use std::collections::HashMap; use std::convert::Infallible; use std::marker::PhantomData; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use libp2p_core::upgrade::{InboundConnectionUpgrade, OutboundConnectionUpgrade}; use libp2p_core::Transport; @@ -144,22 +143,41 @@ impl .with_relay_client(security_upgrade, multiplexer_upgrade) } } -#[cfg(feature = "metrics")] impl SwarmBuilder> { + #[deprecated(note = "Use `with_bandwidth_metrics` instead.")] pub fn with_bandwidth_logging( self, + ) -> ( + SwarmBuilder< + Provider, + BandwidthMetricsPhase, + >, + Arc, + ) { + #[allow(deprecated)] + self.without_any_other_transports() + .without_dns() + .without_websocket() + .without_relay() + .with_bandwidth_logging() + } +} +#[cfg(feature = "metrics")] +impl + SwarmBuilder> +{ + pub fn with_bandwidth_metrics( + self, registry: &mut libp2p_metrics::Registry, - ) -> SwarmBuilder< - Provider, - BehaviourPhase, - > { + ) -> SwarmBuilder> { self.without_any_other_transports() .without_dns() .without_websocket() .without_relay() - .with_bandwidth_logging(registry) + .without_bandwidth_logging() + .with_bandwidth_metrics(registry) } } impl diff --git a/libp2p/src/builder/phase/quic.rs b/libp2p/src/builder/phase/quic.rs index f8db1ee84c3..e4edd0a2f4f 100644 --- a/libp2p/src/builder/phase/quic.rs +++ b/libp2p/src/builder/phase/quic.rs @@ -1,5 +1,4 @@ use super::*; -use crate::bandwidth::BandwidthSinks; use crate::SwarmBuilder; #[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))] use libp2p_core::muxing::StreamMuxer; @@ -9,9 +8,7 @@ use libp2p_core::upgrade::{InboundConnectionUpgrade, OutboundConnectionUpgrade}; all(not(target_arch = "wasm32"), feature = "websocket") ))] use libp2p_core::{InboundUpgrade, Negotiated, OutboundUpgrade, UpgradeInfo}; -use std::collections::HashMap; -use std::marker::PhantomData; -use std::sync::{Arc, RwLock}; +use std::{marker::PhantomData, sync::Arc}; pub struct QuicPhase { pub(crate) transport: T, @@ -249,20 +246,40 @@ impl_quic_phase_with_websocket!( super::provider::Tokio, rw_stream_sink::RwStreamSink> ); -#[cfg(feature = "metrics")] impl SwarmBuilder> { + #[deprecated(note = "Use `with_bandwidth_metrics` instead.")] pub fn with_bandwidth_logging( self, + ) -> ( + SwarmBuilder< + Provider, + BandwidthMetricsPhase, + >, + Arc, + ) { + #[allow(deprecated)] + self.without_quic() + .without_any_other_transports() + .without_dns() + .without_websocket() + .without_relay() + .with_bandwidth_logging() + } +} +#[cfg(feature = "metrics")] +impl + SwarmBuilder> +{ + pub fn with_bandwidth_metrics( + self, registry: &mut libp2p_metrics::Registry, - ) -> SwarmBuilder< - Provider, - BehaviourPhase, - > { + ) -> SwarmBuilder> { self.without_quic() .without_any_other_transports() .without_dns() .without_websocket() .without_relay() - .with_bandwidth_logging(registry) + .without_bandwidth_logging() + .with_bandwidth_metrics(registry) } } diff --git a/libp2p/src/builder/phase/websocket.rs b/libp2p/src/builder/phase/websocket.rs index 3abb8060a05..68a85bb77b7 100644 --- a/libp2p/src/builder/phase/websocket.rs +++ b/libp2p/src/builder/phase/websocket.rs @@ -186,29 +186,30 @@ impl SwarmBuilder SwarmBuilder> { - pub fn with_behaviour>( + pub fn with_bandwidth_metrics( self, - constructor: impl FnOnce(&libp2p_identity::Keypair) -> R, - ) -> Result>, R::Error> { + registry: &mut libp2p_metrics::Registry, + ) -> SwarmBuilder< + Provider, + BehaviourPhase, + > { self.without_websocket() .without_relay() .without_bandwidth_logging() - .with_behaviour(constructor) + .with_bandwidth_metrics(registry) } } -#[cfg(feature = "metrics")] impl SwarmBuilder> { - pub fn with_bandwidth_logging( + pub fn with_behaviour>( self, - registry: &mut libp2p_metrics::Registry, - ) -> SwarmBuilder< - Provider, - BehaviourPhase, - > { + constructor: impl FnOnce(&libp2p_identity::Keypair) -> R, + ) -> Result>, R::Error> { self.without_websocket() .without_relay() - .with_bandwidth_logging(registry) + .without_bandwidth_logging() + .with_behaviour(constructor) } } diff --git a/libp2p/src/transport_ext.rs b/libp2p/src/transport_ext.rs index 435b20c5f0c..8f7c16574f6 100644 --- a/libp2p/src/transport_ext.rs +++ b/libp2p/src/transport_ext.rs @@ -29,11 +29,7 @@ use crate::{ Transport, }; use libp2p_identity::PeerId; -use multiaddr::Multiaddr; -use std::{ - collections::HashMap, - sync::{Arc, RwLock}, -}; +use std::sync::Arc; /// Trait automatically implemented on all objects that implement `Transport`. Provides some /// additional utilities. @@ -70,12 +66,7 @@ pub trait TransportExt: Transport { /// /// let (transport, sinks) = transport.with_bandwidth_logging(); /// ``` - fn with_bandwidth_logging( - self, - ) -> ( - Boxed<(PeerId, StreamMuxerBox)>, - Arc>>>, - ) + fn with_bandwidth_logging(self) -> (Boxed<(PeerId, StreamMuxerBox)>, Arc) where Self: Sized + Send + Unpin + 'static, Self::Dial: Send + 'static, @@ -86,32 +77,13 @@ pub trait TransportExt: Transport { S::Substream: Send + 'static, S::Error: Send + Sync + 'static, { - let sinks: Arc>>> = Arc::new(RwLock::new(HashMap::new())); + let sinks = BandwidthSinks::new(); let sinks_copy = sinks.clone(); - let transport = Transport::map(self, move |output, connected_point| { - fn as_string(ma: &Multiaddr) -> String { - let len = ma - .protocol_stack() - .fold(0, |acc, proto| acc + proto.len() + 1); - let mut protocols = String::with_capacity(len); - for proto_tag in ma.protocol_stack() { - protocols.push('/'); - protocols.push_str(proto_tag); - } - protocols - } - - let sink = sinks_copy - .write() - .expect("todo") - .entry(as_string(connected_point.get_remote_address())) - .or_default() - .clone(); - + let transport = Transport::map(self, |output, _| { let (peer_id, stream_muxer_box) = output.into(); ( peer_id, - StreamMuxerBox::new(BandwidthLogging::new(stream_muxer_box, sink)), + StreamMuxerBox::new(BandwidthLogging::new(stream_muxer_box, sinks_copy)), ) }) .boxed(); diff --git a/misc/server/src/main.rs b/misc/server/src/main.rs index 32f75cd48af..2ee16f6e10e 100644 --- a/misc/server/src/main.rs +++ b/misc/server/src/main.rs @@ -80,7 +80,7 @@ async fn main() -> Result<(), Box> { )? .with_quic() .with_dns()? - .with_bandwidth_logging(&mut metric_registry) + .with_bandwidth_metrics(&mut metric_registry) .with_behaviour(|key| { behaviour::Behaviour::new(key.public(), opt.enable_kademlia, opt.enable_autonat) })? From 8cc2b1b322dc6349b55ffc67e31f9c853766c861 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Sat, 28 Oct 2023 19:53:14 +0200 Subject: [PATCH 07/27] Remove collector --- Cargo.lock | 1 - libp2p/Cargo.toml | 1 - misc/metrics/src/bandwidth.rs | 88 ++++++++++------------------------- 3 files changed, 25 insertions(+), 65 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6f8d942d372..4fc05e3fbf6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2409,7 +2409,6 @@ dependencies = [ "libp2p-yamux", "multiaddr", "pin-project", - "prometheus-client", "rw-stream-sink", "thiserror", "tokio", diff --git a/libp2p/Cargo.toml b/libp2p/Cargo.toml index 9511686d81a..01c4fea81cc 100644 --- a/libp2p/Cargo.toml +++ b/libp2p/Cargo.toml @@ -100,7 +100,6 @@ getrandom = "0.2.3" # Explicit dependency to be used in `wasm-bindgen` featu instant = "0.1.12" # Explicit dependency to be used in `wasm-bindgen` feature # TODO feature flag? rw-stream-sink = { workspace = true } -prometheus-client = { workspace = true } libp2p-allow-block-list = { workspace = true } libp2p-autonat = { workspace = true, optional = true } diff --git a/misc/metrics/src/bandwidth.rs b/misc/metrics/src/bandwidth.rs index 9fbea470911..1915b589798 100644 --- a/misc/metrics/src/bandwidth.rs +++ b/misc/metrics/src/bandwidth.rs @@ -30,24 +30,15 @@ use futures::{ ready, }; use libp2p_identity::PeerId; -use prometheus_client::{ - encoding::{DescriptorEncoder, EncodeMetric}, - metrics::{counter::ConstCounter, MetricType}, -}; use prometheus_client::{ encoding::{EncodeLabelSet, EncodeLabelValue}, metrics::{counter::Counter, family::Family}, registry::{Registry, Unit}, }; use std::{ - collections::HashMap, convert::TryFrom as _, io, pin::Pin, - sync::{ - atomic::{AtomicU64, Ordering}, - Arc, RwLock, - }, task::{Context, Poll}, }; @@ -149,20 +140,18 @@ where upgrade, local_addr, send_back_addr, - }) => { - Poll::Ready(TransportEvent::Incoming { - listener_id, - upgrade: MapFuture { - metrics: Some(ConnectionMetrics::from_family_and_protocols( - this.metrics, - as_string(&send_back_addr), - )), - inner: upgrade, - }, - local_addr, - send_back_addr, - }) - } + }) => Poll::Ready(TransportEvent::Incoming { + listener_id, + upgrade: MapFuture { + metrics: Some(ConnectionMetrics::from_family_and_protocols( + this.metrics, + as_string(&send_back_addr), + )), + inner: upgrade, + }, + local_addr, + send_back_addr, + }), Poll::Ready(other) => { let mapped = other.map_upgrade(|_upgrade| unreachable!("case already matched")); Poll::Ready(mapped) @@ -292,13 +281,6 @@ where } } -/// Allows obtaining the average bandwidth of the streams. -#[derive(Default, Debug)] -pub struct BandwidthSinks { - inbound: AtomicU64, - outbound: AtomicU64, -} - /// Wraps around an [`AsyncRead`] + [`AsyncWrite`] and logs the bandwidth that goes through it. #[pin_project::pin_project] pub struct InstrumentedStream { @@ -315,9 +297,9 @@ impl AsyncRead for InstrumentedStream { ) -> Poll> { let this = self.project(); let num_bytes = ready!(this.inner.poll_read(cx, buf))?; - this.metrics.inbound.inc_by( - u64::try_from(num_bytes).unwrap_or(u64::max_value()), - ); + this.metrics + .inbound + .inc_by(u64::try_from(num_bytes).unwrap_or(u64::max_value())); Poll::Ready(Ok(num_bytes)) } @@ -328,9 +310,9 @@ impl AsyncRead for InstrumentedStream { ) -> Poll> { let this = self.project(); let num_bytes = ready!(this.inner.poll_read_vectored(cx, bufs))?; - this.metrics.inbound.inc_by( - u64::try_from(num_bytes).unwrap_or(u64::max_value()), - ); + this.metrics + .inbound + .inc_by(u64::try_from(num_bytes).unwrap_or(u64::max_value())); Poll::Ready(Ok(num_bytes)) } } @@ -343,9 +325,9 @@ impl AsyncWrite for InstrumentedStream { ) -> Poll> { let this = self.project(); let num_bytes = ready!(this.inner.poll_write(cx, buf))?; - this.metrics.outbound.inc_by( - u64::try_from(num_bytes).unwrap_or(u64::max_value()), - ); + this.metrics + .outbound + .inc_by(u64::try_from(num_bytes).unwrap_or(u64::max_value())); Poll::Ready(Ok(num_bytes)) } @@ -356,9 +338,9 @@ impl AsyncWrite for InstrumentedStream { ) -> Poll> { let this = self.project(); let num_bytes = ready!(this.inner.poll_write_vectored(cx, bufs))?; - this.metrics.outbound.inc_by( - u64::try_from(num_bytes).unwrap_or(u64::max_value()), - ); + this.metrics + .outbound + .inc_by(u64::try_from(num_bytes).unwrap_or(u64::max_value())); Poll::Ready(Ok(num_bytes)) } @@ -373,27 +355,7 @@ impl AsyncWrite for InstrumentedStream { } } -#[derive(Debug)] -pub struct SinksCollector(Arc>>>); - -impl prometheus_client::collector::Collector for SinksCollector { - fn encode(&self, mut encoder: DescriptorEncoder) -> Result<(), std::fmt::Error> { - let mut family_encoder = encoder.encode_descriptor("", "", None, MetricType::Counter)?; - - for (protocols, sink) in self.0.read().expect("todo").iter() { - let labels = [("protocols", protocols.as_str()), ("direction", "inbound")]; - let metric_encoder = family_encoder.encode_family(&labels)?; - ConstCounter::new(sink.inbound.load(Ordering::Relaxed)).encode(metric_encoder)?; - - let labels = [("protocols", protocols.as_str()), ("direction", "outbound")]; - let metric_encoder = family_encoder.encode_family(&labels)?; - ConstCounter::new(sink.outbound.load(Ordering::Relaxed)).encode(metric_encoder)?; - } - - Ok(()) - } -} - +// TODO: rename fn as_string(ma: &Multiaddr) -> String { let len = ma .protocol_stack() From 67fb7cd13942f61922886ba59eb5ddffb20a8df0 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Mon, 30 Oct 2023 15:17:20 +0100 Subject: [PATCH 08/27] Remove license header --- misc/metrics/src/bandwidth.rs | 20 -------------------- 1 file changed, 20 deletions(-) diff --git a/misc/metrics/src/bandwidth.rs b/misc/metrics/src/bandwidth.rs index 1915b589798..a881e9bd454 100644 --- a/misc/metrics/src/bandwidth.rs +++ b/misc/metrics/src/bandwidth.rs @@ -1,23 +1,3 @@ -// Copyright 2019 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - use libp2p_core::{ muxing::{StreamMuxer, StreamMuxerBox, StreamMuxerEvent}, transport::{ListenerId, TransportError, TransportEvent}, From fd7c5d960e0e20b74ca6d953f18209d0bf554c14 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Mon, 30 Oct 2023 15:18:31 +0100 Subject: [PATCH 09/27] Fix metrics example --- examples/metrics/src/main.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/examples/metrics/src/main.rs b/examples/metrics/src/main.rs index adba275d9b6..0c056f445a3 100644 --- a/examples/metrics/src/main.rs +++ b/examples/metrics/src/main.rs @@ -46,7 +46,7 @@ fn main() -> Result<(), Box> { noise::Config::new, yamux::Config::default, )? - .with_bandwidth_metrics(&mut metrics_registry) + .with_bandwidth_metrics(&mut metric_registry) .with_behaviour(|key| Behaviour::new(key.public()))? .with_swarm_config(|cfg| cfg.with_idle_connection_timeout(Duration::from_secs(u64::MAX))) .build(); @@ -60,7 +60,6 @@ fn main() -> Result<(), Box> { } let metrics = Metrics::new(&mut metric_registry); - libp2p::bandwidth::register_bandwidth_sinks(&mut metric_registry, bandwidth_logging); thread::spawn(move || block_on(http_service::metrics_server(metric_registry))); block_on(async { From f9fd560c3edaa40abd925b726ecf45288e701c48 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Mon, 30 Oct 2023 15:22:41 +0100 Subject: [PATCH 10/27] Depend on StreamMuxer and not StreamMuxerBox --- misc/metrics/src/bandwidth.rs | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/misc/metrics/src/bandwidth.rs b/misc/metrics/src/bandwidth.rs index a881e9bd454..4a894c0f0b5 100644 --- a/misc/metrics/src/bandwidth.rs +++ b/misc/metrics/src/bandwidth.rs @@ -22,8 +22,6 @@ use std::{ task::{Context, Poll}, }; -// TODO: rename to Transport -/// See `Transport::map`. #[derive(Debug, Clone)] #[pin_project::pin_project] pub struct Transport { @@ -60,10 +58,12 @@ enum Direction { Outbound, } -impl libp2p_core::Transport for Transport +impl libp2p_core::Transport for Transport where - // TODO: Consider depending on StreamMuxer only. - T: libp2p_core::Transport, + T: libp2p_core::Transport, + M: StreamMuxer + Send + 'static, + M::Substream: Send + 'static, + M::Error: Send + Sync + 'static, { type Output = (PeerId, StreamMuxerBox); type Error = T::Error; @@ -177,9 +177,12 @@ pub struct MapFuture { metrics: Option, } -impl Future for MapFuture +impl Future for MapFuture where - T: TryFuture, + T: TryFuture, + M: StreamMuxer + Send + 'static, + M::Substream: Send + 'static, + M::Error: Send + Sync + 'static, { type Output = Result<(PeerId, StreamMuxerBox), T::Error>; From fe46696661b58c5fac10b2bae709b9c8451817b8 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Mon, 30 Oct 2023 15:25:20 +0100 Subject: [PATCH 11/27] Adjust doc comment --- misc/metrics/src/bandwidth.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/misc/metrics/src/bandwidth.rs b/misc/metrics/src/bandwidth.rs index 4a894c0f0b5..87284ec86a4 100644 --- a/misc/metrics/src/bandwidth.rs +++ b/misc/metrics/src/bandwidth.rs @@ -166,9 +166,7 @@ impl ConnectionMetrics { } } -/// Custom `Future` to avoid boxing. -/// -/// Applies a function to the inner future's result. +/// Map the resulting [`StreamMuxer`] of a connection upgrade with a [`Muxer`]. #[pin_project::pin_project] #[derive(Clone, Debug)] pub struct MapFuture { From 60c6e7ae8ef240678f273dd97066b91e1a0599fd Mon Sep 17 00:00:00 2001 From: Max Inden Date: Mon, 30 Oct 2023 21:42:54 +0100 Subject: [PATCH 12/27] Use subregistry --- misc/metrics/src/bandwidth.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/misc/metrics/src/bandwidth.rs b/misc/metrics/src/bandwidth.rs index 87284ec86a4..8cc1fe35ee4 100644 --- a/misc/metrics/src/bandwidth.rs +++ b/misc/metrics/src/bandwidth.rs @@ -33,10 +33,8 @@ pub struct Transport { impl Transport { pub fn new(transport: T, registry: &mut Registry) -> Self { let metrics = Family::::default(); - - registry.register_with_unit( - // TODO: Ideally no prefix would be needed. - "libp2p_swarm_bandwidth", + registry.sub_registry_with_prefix("libp2p").register_with_unit( + "bandwidth", "Bandwidth usage by direction and transport protocols", Unit::Bytes, metrics.clone(), From cda371d9c0928cf063a13e01e12940f8da0fba04 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 31 Oct 2023 20:43:55 +0100 Subject: [PATCH 13/27] Fix dead-lock when cloning metrics --- misc/metrics/src/bandwidth.rs | 45 +++++++++++++++++++---------------- 1 file changed, 25 insertions(+), 20 deletions(-) diff --git a/misc/metrics/src/bandwidth.rs b/misc/metrics/src/bandwidth.rs index 8cc1fe35ee4..15ab927cab3 100644 --- a/misc/metrics/src/bandwidth.rs +++ b/misc/metrics/src/bandwidth.rs @@ -33,12 +33,14 @@ pub struct Transport { impl Transport { pub fn new(transport: T, registry: &mut Registry) -> Self { let metrics = Family::::default(); - registry.sub_registry_with_prefix("libp2p").register_with_unit( - "bandwidth", - "Bandwidth usage by direction and transport protocols", - Unit::Bytes, - metrics.clone(), - ); + registry + .sub_registry_with_prefix("libp2p") + .register_with_unit( + "bandwidth", + "Bandwidth usage by direction and transport protocols", + Unit::Bytes, + metrics.clone(), + ); Transport { transport, metrics } } @@ -147,20 +149,23 @@ struct ConnectionMetrics { impl ConnectionMetrics { fn from_family_and_protocols(family: &Family, protocols: String) -> Self { - ConnectionMetrics { - outbound: family - .get_or_create(&Labels { - protocols: protocols.clone(), - direction: Direction::Outbound, - }) - .clone(), - inbound: family - .get_or_create(&Labels { - protocols: protocols, - direction: Direction::Inbound, - }) - .clone(), - } + // Additional scope to make sure to drop the lock guard from `get_or_create`. + let outbound = { + let m = family.get_or_create(&Labels { + protocols: protocols.clone(), + direction: Direction::Outbound, + }); + m.clone() + }; + // Additional scope to make sure to drop the lock guard from `get_or_create`. + let inbound = { + let m = family.get_or_create(&Labels { + protocols: protocols.clone(), + direction: Direction::Inbound, + }); + m.clone() + }; + ConnectionMetrics { outbound, inbound } } } From 68414d5646e030ddbd78a6f4752da0e1ee3b5342 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 1 Nov 2023 15:21:33 +0100 Subject: [PATCH 14/27] Box closure wrapping muxer --- libp2p/src/builder/phase/bandwidth_metrics.rs | 3 +- misc/metrics/src/bandwidth.rs | 100 +++++++----------- 2 files changed, 38 insertions(+), 65 deletions(-) diff --git a/libp2p/src/builder/phase/bandwidth_metrics.rs b/libp2p/src/builder/phase/bandwidth_metrics.rs index 73d061dc781..393170d8e71 100644 --- a/libp2p/src/builder/phase/bandwidth_metrics.rs +++ b/libp2p/src/builder/phase/bandwidth_metrics.rs @@ -24,7 +24,8 @@ impl transport: libp2p_metrics::bandwidth::Transport::new( self.phase.transport, registry, - ), + ) + .map(|(peer_id, conn), _| (peer_id, StreamMuxerBox::new(conn))), }, keypair: self.keypair, phantom: PhantomData, diff --git a/misc/metrics/src/bandwidth.rs b/misc/metrics/src/bandwidth.rs index 15ab927cab3..07650a09674 100644 --- a/misc/metrics/src/bandwidth.rs +++ b/misc/metrics/src/bandwidth.rs @@ -1,10 +1,11 @@ use libp2p_core::{ - muxing::{StreamMuxer, StreamMuxerBox, StreamMuxerEvent}, + muxing::{StreamMuxer, StreamMuxerEvent}, transport::{ListenerId, TransportError, TransportEvent}, Multiaddr, }; use futures::{ + future::{MapOk, TryFutureExt}, io::{IoSlice, IoSliceMut}, prelude::*, ready, @@ -65,10 +66,11 @@ where M::Substream: Send + 'static, M::Error: Send + Sync + 'static, { - type Output = (PeerId, StreamMuxerBox); + type Output = (PeerId, Muxer); type Error = T::Error; - type ListenerUpgrade = MapFuture; - type Dial = MapFuture; + type ListenerUpgrade = + MapOk (PeerId, Muxer) + Send>>; + type Dial = MapOk (PeerId, Muxer) + Send>>; fn listen_on( &mut self, @@ -83,26 +85,26 @@ where } fn dial(&mut self, addr: Multiaddr) -> Result> { - Ok(MapFuture { - metrics: Some(ConnectionMetrics::from_family_and_protocols( - &self.metrics, - as_string(&addr), - )), - inner: self.transport.dial(addr.clone())?, - }) + let metrics = ConnectionMetrics::from_family_and_addr(&self.metrics, &addr); + Ok(self + .transport + .dial(addr.clone())? + .map_ok(Box::new(|(peer_id, stream_muxer)| { + (peer_id, Muxer::new(stream_muxer, metrics)) + }))) } fn dial_as_listener( &mut self, addr: Multiaddr, ) -> Result> { - Ok(MapFuture { - metrics: Some(ConnectionMetrics::from_family_and_protocols( - &self.metrics, - as_string(&addr), - )), - inner: self.transport.dial_as_listener(addr.clone())?, - }) + let metrics = ConnectionMetrics::from_family_and_addr(&self.metrics, &addr); + Ok(self + .transport + .dial_as_listener(addr.clone())? + .map_ok(Box::new(|(peer_id, stream_muxer)| { + (peer_id, Muxer::new(stream_muxer, metrics)) + }))) } fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { @@ -120,18 +122,18 @@ where upgrade, local_addr, send_back_addr, - }) => Poll::Ready(TransportEvent::Incoming { - listener_id, - upgrade: MapFuture { - metrics: Some(ConnectionMetrics::from_family_and_protocols( - this.metrics, - as_string(&send_back_addr), - )), - inner: upgrade, - }, - local_addr, - send_back_addr, - }), + }) => { + let metrics = + ConnectionMetrics::from_family_and_addr(&this.metrics, &send_back_addr); + Poll::Ready(TransportEvent::Incoming { + listener_id, + upgrade: upgrade.map_ok(Box::new(|(peer_id, stream_muxer)| { + (peer_id, Muxer::new(stream_muxer, metrics)) + })), + local_addr, + send_back_addr, + }) + } Poll::Ready(other) => { let mapped = other.map_upgrade(|_upgrade| unreachable!("case already matched")); Poll::Ready(mapped) @@ -148,7 +150,9 @@ struct ConnectionMetrics { } impl ConnectionMetrics { - fn from_family_and_protocols(family: &Family, protocols: String) -> Self { + fn from_family_and_addr(family: &Family, protocols: &Multiaddr) -> Self { + let protocols = as_string(protocols); + // Additional scope to make sure to drop the lock guard from `get_or_create`. let outbound = { let m = family.get_or_create(&Labels { @@ -160,7 +164,7 @@ impl ConnectionMetrics { // Additional scope to make sure to drop the lock guard from `get_or_create`. let inbound = { let m = family.get_or_create(&Labels { - protocols: protocols.clone(), + protocols, direction: Direction::Inbound, }); m.clone() @@ -169,38 +173,6 @@ impl ConnectionMetrics { } } -/// Map the resulting [`StreamMuxer`] of a connection upgrade with a [`Muxer`]. -#[pin_project::pin_project] -#[derive(Clone, Debug)] -pub struct MapFuture { - #[pin] - inner: T, - metrics: Option, -} - -impl Future for MapFuture -where - T: TryFuture, - M: StreamMuxer + Send + 'static, - M::Substream: Send + 'static, - M::Error: Send + Sync + 'static, -{ - type Output = Result<(PeerId, StreamMuxerBox), T::Error>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - let (peer_id, stream_muxer) = match TryFuture::try_poll(this.inner, cx) { - Poll::Pending => return Poll::Pending, - Poll::Ready(Ok(v)) => v, - Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), - }; - Poll::Ready(Ok(( - peer_id, - StreamMuxerBox::new(Muxer::new(stream_muxer, this.metrics.take().expect("todo"))), - ))) - } -} - /// Wraps around a [`StreamMuxer`] and counts the number of bytes that go through all the opened /// streams. #[derive(Clone)] From 689a94700cccd97ea95b7ce98203635a812f2dc5 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 1 Nov 2023 20:00:57 +0100 Subject: [PATCH 15/27] Fix wasm compilation --- libp2p/src/builder/phase/bandwidth_metrics.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/libp2p/src/builder/phase/bandwidth_metrics.rs b/libp2p/src/builder/phase/bandwidth_metrics.rs index 393170d8e71..15e9d7334fb 100644 --- a/libp2p/src/builder/phase/bandwidth_metrics.rs +++ b/libp2p/src/builder/phase/bandwidth_metrics.rs @@ -31,7 +31,9 @@ impl phantom: PhantomData, } } +} +impl SwarmBuilder> { pub fn without_bandwidth_metrics(self) -> SwarmBuilder> { SwarmBuilder { phase: BehaviourPhase { From aa0fa951e2129b9cf6157ecf5b5fa1634a39d240 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 1 Nov 2023 20:01:08 +0100 Subject: [PATCH 16/27] Use protocol stack --- misc/metrics/src/bandwidth.rs | 27 +++++++-------------------- 1 file changed, 7 insertions(+), 20 deletions(-) diff --git a/misc/metrics/src/bandwidth.rs b/misc/metrics/src/bandwidth.rs index 07650a09674..a5b78235b80 100644 --- a/misc/metrics/src/bandwidth.rs +++ b/misc/metrics/src/bandwidth.rs @@ -1,15 +1,15 @@ -use libp2p_core::{ - muxing::{StreamMuxer, StreamMuxerEvent}, - transport::{ListenerId, TransportError, TransportEvent}, - Multiaddr, -}; - +use crate::protocol_stack; use futures::{ future::{MapOk, TryFutureExt}, io::{IoSlice, IoSliceMut}, prelude::*, ready, }; +use libp2p_core::{ + muxing::{StreamMuxer, StreamMuxerEvent}, + transport::{ListenerId, TransportError, TransportEvent}, + Multiaddr, +}; use libp2p_identity::PeerId; use prometheus_client::{ encoding::{EncodeLabelSet, EncodeLabelValue}, @@ -151,7 +151,7 @@ struct ConnectionMetrics { impl ConnectionMetrics { fn from_family_and_addr(family: &Family, protocols: &Multiaddr) -> Self { - let protocols = as_string(protocols); + let protocols = protocol_stack::as_string(protocols); // Additional scope to make sure to drop the lock guard from `get_or_create`. let outbound = { @@ -310,16 +310,3 @@ impl AsyncWrite for InstrumentedStream { this.inner.poll_close(cx) } } - -// TODO: rename -fn as_string(ma: &Multiaddr) -> String { - let len = ma - .protocol_stack() - .fold(0, |acc, proto| acc + proto.len() + 1); - let mut protocols = String::with_capacity(len); - for proto_tag in ma.protocol_stack() { - protocols.push('/'); - protocols.push_str(proto_tag); - } - protocols -} From 77a427b5134aca11e6025fa23acb0a4817b99791 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 1 Nov 2023 20:03:22 +0100 Subject: [PATCH 17/27] Minor changes --- libp2p/src/builder/phase/other_transport.rs | 5 ++++- libp2p/src/builder/phase/quic.rs | 13 +++++++------ misc/metrics/src/bandwidth.rs | 4 ++-- 3 files changed, 13 insertions(+), 9 deletions(-) diff --git a/libp2p/src/builder/phase/other_transport.rs b/libp2p/src/builder/phase/other_transport.rs index 943cc75a100..b786735ec50 100644 --- a/libp2p/src/builder/phase/other_transport.rs +++ b/libp2p/src/builder/phase/other_transport.rs @@ -171,7 +171,10 @@ impl pub fn with_bandwidth_metrics( self, registry: &mut libp2p_metrics::Registry, - ) -> SwarmBuilder> { + ) -> SwarmBuilder< + Provider, + BehaviourPhase, + > { self.without_any_other_transports() .without_dns() .without_websocket() diff --git a/libp2p/src/builder/phase/quic.rs b/libp2p/src/builder/phase/quic.rs index e4edd0a2f4f..b43b041f358 100644 --- a/libp2p/src/builder/phase/quic.rs +++ b/libp2p/src/builder/phase/quic.rs @@ -252,9 +252,9 @@ impl SwarmBuilder ( SwarmBuilder< - Provider, + Provider, BandwidthMetricsPhase, - >, + >, Arc, ) { #[allow(deprecated)] @@ -267,13 +267,14 @@ impl SwarmBuilder - SwarmBuilder> -{ +impl SwarmBuilder> { pub fn with_bandwidth_metrics( self, registry: &mut libp2p_metrics::Registry, - ) -> SwarmBuilder> { + ) -> SwarmBuilder< + Provider, + BehaviourPhase, + > { self.without_quic() .without_any_other_transports() .without_dns() diff --git a/misc/metrics/src/bandwidth.rs b/misc/metrics/src/bandwidth.rs index a5b78235b80..2792e00612c 100644 --- a/misc/metrics/src/bandwidth.rs +++ b/misc/metrics/src/bandwidth.rs @@ -124,7 +124,7 @@ where send_back_addr, }) => { let metrics = - ConnectionMetrics::from_family_and_addr(&this.metrics, &send_back_addr); + ConnectionMetrics::from_family_and_addr(this.metrics, &send_back_addr); Poll::Ready(TransportEvent::Incoming { listener_id, upgrade: upgrade.map_ok(Box::new(|(peer_id, stream_muxer)| { @@ -184,7 +184,7 @@ pub struct Muxer { } impl Muxer { - /// Creates a new [`BandwidthLogging`] around the stream muxer. + /// Creates a new [`Muxer`] wrapping around the provided stream muxer. fn new(inner: SMInner, metrics: ConnectionMetrics) -> Self { Self { inner, metrics } } From 314bf0f7a8b819d61e7b3a4c4d9d945b0b8e2514 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 1 Nov 2023 20:06:18 +0100 Subject: [PATCH 18/27] Expose through BandwidthMetricTransport --- libp2p/src/builder/phase/bandwidth_metrics.rs | 2 +- misc/metrics/src/lib.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/libp2p/src/builder/phase/bandwidth_metrics.rs b/libp2p/src/builder/phase/bandwidth_metrics.rs index 15e9d7334fb..b6cf13334d0 100644 --- a/libp2p/src/builder/phase/bandwidth_metrics.rs +++ b/libp2p/src/builder/phase/bandwidth_metrics.rs @@ -21,7 +21,7 @@ impl SwarmBuilder { phase: BehaviourPhase { relay_behaviour: self.phase.relay_behaviour, - transport: libp2p_metrics::bandwidth::Transport::new( + transport: libp2p_metrics::BandwidthMetricTransport::new( self.phase.transport, registry, ) diff --git a/misc/metrics/src/lib.rs b/misc/metrics/src/lib.rs index cd99bc7d7fd..d21796ed453 100644 --- a/misc/metrics/src/lib.rs +++ b/misc/metrics/src/lib.rs @@ -27,8 +27,7 @@ #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] -// TODO: pub? -pub mod bandwidth; +mod bandwidth; #[cfg(feature = "dcutr")] mod dcutr; #[cfg(feature = "gossipsub")] @@ -44,6 +43,7 @@ mod protocol_stack; mod relay; mod swarm; +pub use bandwidth::Transport as BandwidthMetricTransport; pub use prometheus_client::registry::Registry; /// Set of Swarm and protocol metrics derived from emitted events. From 78e564b92e866ecb5c31eabec42ae3640cf51fbd Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 1 Nov 2023 20:19:47 +0100 Subject: [PATCH 19/27] Add changelog entry --- misc/metrics/CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/misc/metrics/CHANGELOG.md b/misc/metrics/CHANGELOG.md index acad2043fc8..cedcedd4986 100644 --- a/misc/metrics/CHANGELOG.md +++ b/misc/metrics/CHANGELOG.md @@ -2,6 +2,8 @@ - Add metrics for `SwarmEvent::{NewExternalAddrCandidate,ExternalAddrConfirmed,ExternalAddrExpired}`. See [PR 4721](https://github.com/libp2p/rust-libp2p/pull/4721). +- Add `BandwidthMetricTransport`, wrapping an existing `Transport`, exposing Prometheus bandwidth metrics. + See [PR 4727](https://github.com/libp2p/rust-libp2p/pull/4727). ## 0.13.1 From 3276d9663f286166a035607088285e9802eedd23 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 1 Nov 2023 20:22:08 +0100 Subject: [PATCH 20/27] Bump misc/server version --- Cargo.lock | 2 +- Cargo.toml | 2 +- misc/server/CHANGELOG.md | 7 +++++++ misc/server/Cargo.toml | 2 +- 4 files changed, 10 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e1d28fdb497..445c9c7e389 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3023,7 +3023,7 @@ dependencies = [ [[package]] name = "libp2p-server" -version = "0.12.3" +version = "0.12.4" dependencies = [ "base64 0.21.5", "clap", diff --git a/Cargo.toml b/Cargo.toml index e7044a185bf..84016f3d06f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -97,7 +97,7 @@ libp2p-quic = { version = "0.10.0", path = "transports/quic" } libp2p-relay = { version = "0.17.0", path = "protocols/relay" } libp2p-rendezvous = { version = "0.14.0", path = "protocols/rendezvous" } libp2p-request-response = { version = "0.26.0", path = "protocols/request-response" } -libp2p-server = { version = "0.12.3", path = "misc/server" } +libp2p-server = { version = "0.12.4", path = "misc/server" } libp2p-swarm = { version = "0.44.0", path = "swarm" } libp2p-swarm-derive = { version = "0.34.0", path = "swarm-derive" } libp2p-swarm-test = { version = "0.3.0", path = "swarm-test" } diff --git a/misc/server/CHANGELOG.md b/misc/server/CHANGELOG.md index 60ef7f828e8..ea551507360 100644 --- a/misc/server/CHANGELOG.md +++ b/misc/server/CHANGELOG.md @@ -1,3 +1,10 @@ +## 0.12.4 - unreleased + +### Added + +- Expose `libp2p_bandwidth` Prometheus metrics. + See [PR 4727](https://github.com/libp2p/rust-libp2p/pull/4727). + ## 0.12.3 ### Changed diff --git a/misc/server/Cargo.toml b/misc/server/Cargo.toml index efaa43a8658..655dcb41348 100644 --- a/misc/server/Cargo.toml +++ b/misc/server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "libp2p-server" -version = "0.12.3" +version = "0.12.4" authors = ["Max Inden "] edition = "2021" repository = "https://github.com/libp2p/rust-libp2p" From 7641531398cf57cf4053c662cfae00ba117c1413 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 1 Nov 2023 20:29:49 +0100 Subject: [PATCH 21/27] Deprecate BandwidthLogging --- libp2p/CHANGELOG.md | 3 +++ libp2p/src/bandwidth.rs | 9 +++++++++ libp2p/src/builder/phase/bandwidth_logging.rs | 2 ++ libp2p/src/builder/phase/bandwidth_metrics.rs | 1 + libp2p/src/builder/phase/other_transport.rs | 2 ++ libp2p/src/builder/phase/quic.rs | 5 +++-- libp2p/src/transport_ext.rs | 5 ++++- misc/metrics/CHANGELOG.md | 1 + 8 files changed, 25 insertions(+), 3 deletions(-) diff --git a/libp2p/CHANGELOG.md b/libp2p/CHANGELOG.md index 636d0622256..cea6cf0532d 100644 --- a/libp2p/CHANGELOG.md +++ b/libp2p/CHANGELOG.md @@ -11,6 +11,9 @@ - Remove deprecated `development_transport`. Use `libp2p::SwarmBuilder` instead. See [PR 4732](https://github.com/libp2p/rust-libp2p/pull/4732). +- Introduce `SwarmBuilder::with_bandwidth_metrics` exposing Prometheus bandwidth metrics per transport protocol stack and direction (in-/ outbound). + Deprecate `Transport::with_bandwidth_logging` and `SwarmBuilder::with_bandwidth_logging` in favor of the new `SwarmBuilder::with_bandwidth_metrics`. + See [PR 4727](https://github.com/libp2p/rust-libp2p/pull/4727). ## 0.52.4 diff --git a/libp2p/src/bandwidth.rs b/libp2p/src/bandwidth.rs index dc696ce07e2..dcec306f444 100644 --- a/libp2p/src/bandwidth.rs +++ b/libp2p/src/bandwidth.rs @@ -43,9 +43,11 @@ use std::{ pub(crate) struct BandwidthLogging { #[pin] inner: SMInner, + #[allow(deprecated)] sinks: Arc, } +#[allow(deprecated)] impl BandwidthLogging { /// Creates a new [`BandwidthLogging`] around the stream muxer. pub(crate) fn new(inner: SMInner, sinks: Arc) -> Self { @@ -101,11 +103,13 @@ where } /// Allows obtaining the average bandwidth of the streams. +#[deprecated(note = "Use `libp2p::SwarmBuilder::with_bandwidth_metrics` or `libp2p_metrics::BandwidthMetricTransport` instead.")] pub struct BandwidthSinks { inbound: AtomicU64, outbound: AtomicU64, } +#[allow(deprecated)] impl BandwidthSinks { /// Returns a new [`BandwidthSinks`]. pub(crate) fn new() -> Arc { @@ -137,6 +141,7 @@ impl BandwidthSinks { pub(crate) struct InstrumentedStream { #[pin] inner: SMInner, + #[allow(deprecated)] sinks: Arc, } @@ -148,6 +153,7 @@ impl AsyncRead for InstrumentedStream { ) -> Poll> { let this = self.project(); let num_bytes = ready!(this.inner.poll_read(cx, buf))?; + #[allow(deprecated)] this.sinks.inbound.fetch_add( u64::try_from(num_bytes).unwrap_or(u64::max_value()), Ordering::Relaxed, @@ -162,6 +168,7 @@ impl AsyncRead for InstrumentedStream { ) -> Poll> { let this = self.project(); let num_bytes = ready!(this.inner.poll_read_vectored(cx, bufs))?; + #[allow(deprecated)] this.sinks.inbound.fetch_add( u64::try_from(num_bytes).unwrap_or(u64::max_value()), Ordering::Relaxed, @@ -178,6 +185,7 @@ impl AsyncWrite for InstrumentedStream { ) -> Poll> { let this = self.project(); let num_bytes = ready!(this.inner.poll_write(cx, buf))?; + #[allow(deprecated)] this.sinks.outbound.fetch_add( u64::try_from(num_bytes).unwrap_or(u64::max_value()), Ordering::Relaxed, @@ -192,6 +200,7 @@ impl AsyncWrite for InstrumentedStream { ) -> Poll> { let this = self.project(); let num_bytes = ready!(this.inner.poll_write_vectored(cx, bufs))?; + #[allow(deprecated)] this.sinks.outbound.fetch_add( u64::try_from(num_bytes).unwrap_or(u64::max_value()), Ordering::Relaxed, diff --git a/libp2p/src/builder/phase/bandwidth_logging.rs b/libp2p/src/builder/phase/bandwidth_logging.rs index e3c41e15612..cee9498fcaa 100644 --- a/libp2p/src/builder/phase/bandwidth_logging.rs +++ b/libp2p/src/builder/phase/bandwidth_logging.rs @@ -1,4 +1,5 @@ use super::*; +#[allow(deprecated)] use crate::bandwidth::BandwidthSinks; use crate::transport_ext::TransportExt; use crate::SwarmBuilder; @@ -13,6 +14,7 @@ pub struct BandwidthLoggingPhase { impl SwarmBuilder> { + #[allow(deprecated)] #[deprecated(note = "Use `with_bandwidth_metrics` instead.")] pub fn with_bandwidth_logging( self, diff --git a/libp2p/src/builder/phase/bandwidth_metrics.rs b/libp2p/src/builder/phase/bandwidth_metrics.rs index b6cf13334d0..b9350b54327 100644 --- a/libp2p/src/builder/phase/bandwidth_metrics.rs +++ b/libp2p/src/builder/phase/bandwidth_metrics.rs @@ -1,4 +1,5 @@ use super::*; +#[allow(deprecated)] use crate::bandwidth::BandwidthSinks; use crate::transport_ext::TransportExt; use crate::SwarmBuilder; diff --git a/libp2p/src/builder/phase/other_transport.rs b/libp2p/src/builder/phase/other_transport.rs index b786735ec50..03994f8f992 100644 --- a/libp2p/src/builder/phase/other_transport.rs +++ b/libp2p/src/builder/phase/other_transport.rs @@ -9,6 +9,7 @@ use libp2p_core::{Negotiated, UpgradeInfo}; #[cfg(feature = "relay")] use libp2p_identity::PeerId; +#[allow(deprecated)] use crate::bandwidth::BandwidthSinks; use crate::SwarmBuilder; @@ -146,6 +147,7 @@ impl impl SwarmBuilder> { + #[allow(deprecated)] #[deprecated(note = "Use `with_bandwidth_metrics` instead.")] pub fn with_bandwidth_logging( self, diff --git a/libp2p/src/builder/phase/quic.rs b/libp2p/src/builder/phase/quic.rs index b43b041f358..d00e4a8d97f 100644 --- a/libp2p/src/builder/phase/quic.rs +++ b/libp2p/src/builder/phase/quic.rs @@ -247,14 +247,15 @@ impl_quic_phase_with_websocket!( rw_stream_sink::RwStreamSink> ); impl SwarmBuilder> { + #[allow(deprecated)] #[deprecated(note = "Use `with_bandwidth_metrics` instead.")] pub fn with_bandwidth_logging( self, ) -> ( SwarmBuilder< - Provider, + Provider, BandwidthMetricsPhase, - >, + >, Arc, ) { #[allow(deprecated)] diff --git a/libp2p/src/transport_ext.rs b/libp2p/src/transport_ext.rs index 8f7c16574f6..a77dc66062a 100644 --- a/libp2p/src/transport_ext.rs +++ b/libp2p/src/transport_ext.rs @@ -24,10 +24,11 @@ use crate::core::{ muxing::{StreamMuxer, StreamMuxerBox}, transport::Boxed, }; +#[allow(deprecated)] use crate::{ bandwidth::{BandwidthLogging, BandwidthSinks}, - Transport, }; +use crate::Transport; use libp2p_identity::PeerId; use std::sync::Arc; @@ -66,6 +67,8 @@ pub trait TransportExt: Transport { /// /// let (transport, sinks) = transport.with_bandwidth_logging(); /// ``` + #[allow(deprecated)] + #[deprecated(note = "Use `libp2p::SwarmBuilder::with_bandwidth_metrics` or `libp2p_metrics::BandwidthMetricTransport` instead.")] fn with_bandwidth_logging(self) -> (Boxed<(PeerId, StreamMuxerBox)>, Arc) where Self: Sized + Send + Unpin + 'static, diff --git a/misc/metrics/CHANGELOG.md b/misc/metrics/CHANGELOG.md index cedcedd4986..7ab9761fb85 100644 --- a/misc/metrics/CHANGELOG.md +++ b/misc/metrics/CHANGELOG.md @@ -3,6 +3,7 @@ - Add metrics for `SwarmEvent::{NewExternalAddrCandidate,ExternalAddrConfirmed,ExternalAddrExpired}`. See [PR 4721](https://github.com/libp2p/rust-libp2p/pull/4721). - Add `BandwidthMetricTransport`, wrapping an existing `Transport`, exposing Prometheus bandwidth metrics. + See also `SwarmBuilder::with_bandwidth_metrics`. See [PR 4727](https://github.com/libp2p/rust-libp2p/pull/4727). ## 0.13.1 From 190492b9869ffcbe6970ef8c4ec41a71331d8497 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 1 Nov 2023 20:31:14 +0100 Subject: [PATCH 22/27] fmt --- libp2p/src/bandwidth.rs | 4 +++- libp2p/src/builder/phase/quic.rs | 4 ++-- libp2p/src/transport_ext.rs | 10 +++++----- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/libp2p/src/bandwidth.rs b/libp2p/src/bandwidth.rs index dcec306f444..1840805bba1 100644 --- a/libp2p/src/bandwidth.rs +++ b/libp2p/src/bandwidth.rs @@ -103,7 +103,9 @@ where } /// Allows obtaining the average bandwidth of the streams. -#[deprecated(note = "Use `libp2p::SwarmBuilder::with_bandwidth_metrics` or `libp2p_metrics::BandwidthMetricTransport` instead.")] +#[deprecated( + note = "Use `libp2p::SwarmBuilder::with_bandwidth_metrics` or `libp2p_metrics::BandwidthMetricTransport` instead." +)] pub struct BandwidthSinks { inbound: AtomicU64, outbound: AtomicU64, diff --git a/libp2p/src/builder/phase/quic.rs b/libp2p/src/builder/phase/quic.rs index d00e4a8d97f..38bda3d34b9 100644 --- a/libp2p/src/builder/phase/quic.rs +++ b/libp2p/src/builder/phase/quic.rs @@ -253,9 +253,9 @@ impl SwarmBuilder ( SwarmBuilder< - Provider, + Provider, BandwidthMetricsPhase, - >, + >, Arc, ) { #[allow(deprecated)] diff --git a/libp2p/src/transport_ext.rs b/libp2p/src/transport_ext.rs index a77dc66062a..0ef877bd0b9 100644 --- a/libp2p/src/transport_ext.rs +++ b/libp2p/src/transport_ext.rs @@ -20,14 +20,12 @@ //! Provides the `TransportExt` trait. +#[allow(deprecated)] +use crate::bandwidth::{BandwidthLogging, BandwidthSinks}; use crate::core::{ muxing::{StreamMuxer, StreamMuxerBox}, transport::Boxed, }; -#[allow(deprecated)] -use crate::{ - bandwidth::{BandwidthLogging, BandwidthSinks}, -}; use crate::Transport; use libp2p_identity::PeerId; use std::sync::Arc; @@ -68,7 +66,9 @@ pub trait TransportExt: Transport { /// let (transport, sinks) = transport.with_bandwidth_logging(); /// ``` #[allow(deprecated)] - #[deprecated(note = "Use `libp2p::SwarmBuilder::with_bandwidth_metrics` or `libp2p_metrics::BandwidthMetricTransport` instead.")] + #[deprecated( + note = "Use `libp2p::SwarmBuilder::with_bandwidth_metrics` or `libp2p_metrics::BandwidthMetricTransport` instead." + )] fn with_bandwidth_logging(self) -> (Boxed<(PeerId, StreamMuxerBox)>, Arc) where Self: Sized + Send + Unpin + 'static, From bd83ba978cc368e01b2066cc0a96f30e9021483b Mon Sep 17 00:00:00 2001 From: Max Inden Date: Sat, 4 Nov 2023 14:42:40 +0100 Subject: [PATCH 23/27] Update misc/server/CHANGELOG.md Co-authored-by: Thomas Eizinger --- misc/server/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/misc/server/CHANGELOG.md b/misc/server/CHANGELOG.md index ea551507360..e224f6597df 100644 --- a/misc/server/CHANGELOG.md +++ b/misc/server/CHANGELOG.md @@ -2,7 +2,7 @@ ### Added -- Expose `libp2p_bandwidth` Prometheus metrics. +- Expose `libp2p_bandwidth_bytes` Prometheus metrics. See [PR 4727](https://github.com/libp2p/rust-libp2p/pull/4727). ## 0.12.3 From 164c8126de4477cf0c5eeba647d6d3495ce75690 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Sat, 4 Nov 2023 14:49:18 +0100 Subject: [PATCH 24/27] Rename to BandwidthTransport --- libp2p/src/bandwidth.rs | 2 +- libp2p/src/builder/phase/bandwidth_metrics.rs | 2 +- libp2p/src/transport_ext.rs | 2 +- misc/metrics/CHANGELOG.md | 2 +- misc/metrics/src/lib.rs | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/libp2p/src/bandwidth.rs b/libp2p/src/bandwidth.rs index 1840805bba1..040292bf0ac 100644 --- a/libp2p/src/bandwidth.rs +++ b/libp2p/src/bandwidth.rs @@ -104,7 +104,7 @@ where /// Allows obtaining the average bandwidth of the streams. #[deprecated( - note = "Use `libp2p::SwarmBuilder::with_bandwidth_metrics` or `libp2p_metrics::BandwidthMetricTransport` instead." + note = "Use `libp2p::SwarmBuilder::with_bandwidth_metrics` or `libp2p_metrics::BandwidthTransport` instead." )] pub struct BandwidthSinks { inbound: AtomicU64, diff --git a/libp2p/src/builder/phase/bandwidth_metrics.rs b/libp2p/src/builder/phase/bandwidth_metrics.rs index b9350b54327..77b6dd391b7 100644 --- a/libp2p/src/builder/phase/bandwidth_metrics.rs +++ b/libp2p/src/builder/phase/bandwidth_metrics.rs @@ -22,7 +22,7 @@ impl SwarmBuilder { phase: BehaviourPhase { relay_behaviour: self.phase.relay_behaviour, - transport: libp2p_metrics::BandwidthMetricTransport::new( + transport: libp2p_metrics::BandwidthTransport::new( self.phase.transport, registry, ) diff --git a/libp2p/src/transport_ext.rs b/libp2p/src/transport_ext.rs index 0ef877bd0b9..bca0b8f4576 100644 --- a/libp2p/src/transport_ext.rs +++ b/libp2p/src/transport_ext.rs @@ -67,7 +67,7 @@ pub trait TransportExt: Transport { /// ``` #[allow(deprecated)] #[deprecated( - note = "Use `libp2p::SwarmBuilder::with_bandwidth_metrics` or `libp2p_metrics::BandwidthMetricTransport` instead." + note = "Use `libp2p::SwarmBuilder::with_bandwidth_metrics` or `libp2p_metrics::BandwidthTransport` instead." )] fn with_bandwidth_logging(self) -> (Boxed<(PeerId, StreamMuxerBox)>, Arc) where diff --git a/misc/metrics/CHANGELOG.md b/misc/metrics/CHANGELOG.md index 7ab9761fb85..b253d2c8f2c 100644 --- a/misc/metrics/CHANGELOG.md +++ b/misc/metrics/CHANGELOG.md @@ -2,7 +2,7 @@ - Add metrics for `SwarmEvent::{NewExternalAddrCandidate,ExternalAddrConfirmed,ExternalAddrExpired}`. See [PR 4721](https://github.com/libp2p/rust-libp2p/pull/4721). -- Add `BandwidthMetricTransport`, wrapping an existing `Transport`, exposing Prometheus bandwidth metrics. +- Add `BandwidthTransport`, wrapping an existing `Transport`, exposing Prometheus bandwidth metrics. See also `SwarmBuilder::with_bandwidth_metrics`. See [PR 4727](https://github.com/libp2p/rust-libp2p/pull/4727). diff --git a/misc/metrics/src/lib.rs b/misc/metrics/src/lib.rs index d21796ed453..afa9cb876a6 100644 --- a/misc/metrics/src/lib.rs +++ b/misc/metrics/src/lib.rs @@ -43,7 +43,7 @@ mod protocol_stack; mod relay; mod swarm; -pub use bandwidth::Transport as BandwidthMetricTransport; +pub use bandwidth::Transport as BandwidthTransport; pub use prometheus_client::registry::Registry; /// Set of Swarm and protocol metrics derived from emitted events. From 756241dbecfab852d506b8f1b925c45192e989a9 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Sat, 4 Nov 2023 14:53:07 +0100 Subject: [PATCH 25/27] Allow deprecated at the top of module --- libp2p/src/bandwidth.rs | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/libp2p/src/bandwidth.rs b/libp2p/src/bandwidth.rs index 040292bf0ac..b84cbb7e27b 100644 --- a/libp2p/src/bandwidth.rs +++ b/libp2p/src/bandwidth.rs @@ -18,6 +18,8 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +#![allow(deprecated)] + use crate::core::muxing::{StreamMuxer, StreamMuxerEvent}; use futures::{ @@ -43,11 +45,9 @@ use std::{ pub(crate) struct BandwidthLogging { #[pin] inner: SMInner, - #[allow(deprecated)] sinks: Arc, } -#[allow(deprecated)] impl BandwidthLogging { /// Creates a new [`BandwidthLogging`] around the stream muxer. pub(crate) fn new(inner: SMInner, sinks: Arc) -> Self { @@ -111,7 +111,6 @@ pub struct BandwidthSinks { outbound: AtomicU64, } -#[allow(deprecated)] impl BandwidthSinks { /// Returns a new [`BandwidthSinks`]. pub(crate) fn new() -> Arc { @@ -143,7 +142,6 @@ impl BandwidthSinks { pub(crate) struct InstrumentedStream { #[pin] inner: SMInner, - #[allow(deprecated)] sinks: Arc, } @@ -155,7 +153,6 @@ impl AsyncRead for InstrumentedStream { ) -> Poll> { let this = self.project(); let num_bytes = ready!(this.inner.poll_read(cx, buf))?; - #[allow(deprecated)] this.sinks.inbound.fetch_add( u64::try_from(num_bytes).unwrap_or(u64::max_value()), Ordering::Relaxed, @@ -170,7 +167,6 @@ impl AsyncRead for InstrumentedStream { ) -> Poll> { let this = self.project(); let num_bytes = ready!(this.inner.poll_read_vectored(cx, bufs))?; - #[allow(deprecated)] this.sinks.inbound.fetch_add( u64::try_from(num_bytes).unwrap_or(u64::max_value()), Ordering::Relaxed, @@ -187,7 +183,6 @@ impl AsyncWrite for InstrumentedStream { ) -> Poll> { let this = self.project(); let num_bytes = ready!(this.inner.poll_write(cx, buf))?; - #[allow(deprecated)] this.sinks.outbound.fetch_add( u64::try_from(num_bytes).unwrap_or(u64::max_value()), Ordering::Relaxed, @@ -202,7 +197,6 @@ impl AsyncWrite for InstrumentedStream { ) -> Poll> { let this = self.project(); let num_bytes = ready!(this.inner.poll_write_vectored(cx, bufs))?; - #[allow(deprecated)] this.sinks.outbound.fetch_add( u64::try_from(num_bytes).unwrap_or(u64::max_value()), Ordering::Relaxed, From b01f31982bb679af1a88a83cfb8a3fcb8d065b35 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Fri, 10 Nov 2023 17:37:50 +0100 Subject: [PATCH 26/27] fmt --- libp2p/src/builder/phase/bandwidth_metrics.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/libp2p/src/builder/phase/bandwidth_metrics.rs b/libp2p/src/builder/phase/bandwidth_metrics.rs index 77b6dd391b7..52daa731ddd 100644 --- a/libp2p/src/builder/phase/bandwidth_metrics.rs +++ b/libp2p/src/builder/phase/bandwidth_metrics.rs @@ -22,11 +22,8 @@ impl SwarmBuilder { phase: BehaviourPhase { relay_behaviour: self.phase.relay_behaviour, - transport: libp2p_metrics::BandwidthTransport::new( - self.phase.transport, - registry, - ) - .map(|(peer_id, conn), _| (peer_id, StreamMuxerBox::new(conn))), + transport: libp2p_metrics::BandwidthTransport::new(self.phase.transport, registry) + .map(|(peer_id, conn), _| (peer_id, StreamMuxerBox::new(conn))), }, keypair: self.keypair, phantom: PhantomData, From f4c1ec7d45c1859abd3e0184d927ecd23219a4c4 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Fri, 10 Nov 2023 18:06:59 +0100 Subject: [PATCH 27/27] Fix changelog and version --- Cargo.lock | 2 +- Cargo.toml | 2 +- misc/metrics/CHANGELOG.md | 9 ++++++--- misc/metrics/Cargo.toml | 2 +- 4 files changed, 9 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3925f931c44..4622790ff63 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2803,7 +2803,7 @@ dependencies = [ [[package]] name = "libp2p-metrics" -version = "0.14.0" +version = "0.14.1" dependencies = [ "futures", "instant", diff --git a/Cargo.toml b/Cargo.toml index 8cca83152ed..dda224934c2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -85,7 +85,7 @@ libp2p-identity = { version = "0.2.7" } libp2p-kad = { version = "0.45.1", path = "protocols/kad" } libp2p-mdns = { version = "0.45.0", path = "protocols/mdns" } libp2p-memory-connection-limits = { version = "0.2.0", path = "misc/memory-connection-limits" } -libp2p-metrics = { version = "0.14.0", path = "misc/metrics" } +libp2p-metrics = { version = "0.14.1", path = "misc/metrics" } libp2p-mplex = { version = "0.41.0", path = "muxers/mplex" } libp2p-muxer-test-harness = { path = "muxers/test-harness" } libp2p-noise = { version = "0.44.0", path = "transports/noise" } diff --git a/misc/metrics/CHANGELOG.md b/misc/metrics/CHANGELOG.md index 445a4a886a1..6086d579c04 100644 --- a/misc/metrics/CHANGELOG.md +++ b/misc/metrics/CHANGELOG.md @@ -1,11 +1,14 @@ -## 0.14.0 +## 0.14.1 - unreleased -- Add metrics for `SwarmEvent::{NewExternalAddrCandidate,ExternalAddrConfirmed,ExternalAddrExpired}`. - See [PR 4721](https://github.com/libp2p/rust-libp2p/pull/4721). - Add `BandwidthTransport`, wrapping an existing `Transport`, exposing Prometheus bandwidth metrics. See also `SwarmBuilder::with_bandwidth_metrics`. See [PR 4727](https://github.com/libp2p/rust-libp2p/pull/4727). +## 0.14.0 + +- Add metrics for `SwarmEvent::{NewExternalAddrCandidate,ExternalAddrConfirmed,ExternalAddrExpired}`. + See [PR 4721](https://github.com/libp2p/rust-libp2p/pull/4721). + ## 0.13.1 - Enable gossipsub related data-type fields when compiling for wasm. diff --git a/misc/metrics/Cargo.toml b/misc/metrics/Cargo.toml index c2b023db999..0607808214c 100644 --- a/misc/metrics/Cargo.toml +++ b/misc/metrics/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-metrics" edition = "2021" rust-version = { workspace = true } description = "Metrics for libp2p" -version = "0.14.0" +version = "0.14.1" authors = ["Max Inden "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p"