Skip to content

Commit

Permalink
feat(swarm): expose ConnectionId and add conn duration metric
Browse files Browse the repository at this point in the history
- Exposes the `ConnectionId` in the various `SwarmEvent` variants.
- Tracks connection duration in `libp2p-metrics::swarm`.

Pull-Request: #3927.
  • Loading branch information
mxinden authored May 17, 2023
1 parent a580906 commit cc5b346
Show file tree
Hide file tree
Showing 13 changed files with 157 additions and 49 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion examples/dcutr/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ fn main() -> Result<(), Box<dyn Error>> {
} => {
info!("Established connection to {:?} via {:?}", peer_id, endpoint);
}
SwarmEvent::OutgoingConnectionError { peer_id, error } => {
SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
info!("Outgoing connection error to {:?}: {:?}", peer_id, error);
}
_ => {}
Expand Down
5 changes: 4 additions & 1 deletion examples/file-sharing/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,10 @@ impl EventLoop {
}
}
SwarmEvent::IncomingConnectionError { .. } => {}
SwarmEvent::Dialing(peer_id) => eprintln!("Dialing {peer_id}"),
SwarmEvent::Dialing {
peer_id: Some(peer_id),
..
} => eprintln!("Dialing {peer_id}"),
e => panic!("{e:?}"),
}
}
Expand Down
5 changes: 5 additions & 0 deletions misc/metrics/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,12 @@
- Raise MSRV to 1.65.
See [PR 3715].

- Replace `libp2p_swarm_connections_closed` `Counter` with `libp2p_swarm_connections_duration` `Histogram` which additionally tracks the duration of a connection.
Note that you can use the `_count` metric of the `Histogram` as a replacement for the `Counter`.
See [PR 3927].

[PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715
[PR 3927]: https://github.com/libp2p/rust-libp2p/pull/3927
[PR 3325]: https://github.com/libp2p/rust-libp2p/pull/3325

## 0.12.0
Expand Down
3 changes: 2 additions & 1 deletion misc/metrics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ relay = ["libp2p-relay"]
dcutr = ["libp2p-dcutr"]

[dependencies]
instant = "0.1.11"
libp2p-core = { workspace = true }
libp2p-dcutr = { workspace = true, optional = true }
libp2p-identify = { workspace = true, optional = true }
Expand All @@ -27,7 +28,7 @@ libp2p-ping = { workspace = true, optional = true }
libp2p-relay = { workspace = true, optional = true }
libp2p-swarm = { workspace = true }
libp2p-identity = { workspace = true }
prometheus-client = { version = "0.21.0" }
prometheus-client = { version = "0.21.1"}
once_cell = "1.16.0"

[target.'cfg(not(target_os = "unknown"))'.dependencies]
Expand Down
107 changes: 77 additions & 30 deletions misc/metrics/src/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,25 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use crate::protocol_stack;
use instant::Instant;
use libp2p_swarm::ConnectionId;
use prometheus_client::encoding::{EncodeLabelSet, EncodeLabelValue};
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::family::Family;
use prometheus_client::metrics::histogram::{exponential_buckets, Histogram};
use prometheus_client::registry::Registry;
use prometheus_client::registry::{Registry, Unit};

pub(crate) struct Metrics {
connections_incoming: Family<AddressLabels, Counter>,
connections_incoming_error: Family<IncomingConnectionErrorLabels, Counter>,

connections_established: Family<ConnectionEstablishedLabels, Counter>,
connections_establishment_duration: Family<ConnectionEstablishmentDurationLabels, Histogram>,
connections_closed: Family<ConnectionClosedLabels, Counter>,
connections_established: Family<ConnectionLabels, Counter>,
connections_establishment_duration: Family<ConnectionLabels, Histogram>,
connections_duration: Family<ConnectionClosedLabels, Histogram>,

new_listen_addr: Family<AddressLabels, Counter>,
expired_listen_addr: Family<AddressLabels, Counter>,
Expand All @@ -41,6 +46,8 @@ pub(crate) struct Metrics {

dial_attempt: Counter,
outgoing_connection_error: Family<OutgoingConnectionErrorLabels, Counter>,

connections: Arc<Mutex<HashMap<ConnectionId, Instant>>>,
}

impl Metrics {
Expand Down Expand Up @@ -110,34 +117,42 @@ impl Metrics {
connections_established.clone(),
);

let connections_closed = Family::default();
let connections_establishment_duration = {
let constructor: fn() -> Histogram =
|| Histogram::new(exponential_buckets(0.01, 1.5, 20));
Family::new_with_constructor(constructor)
};
sub_registry.register(
"connections_closed",
"Number of connections closed",
connections_closed.clone(),
"connections_establishment_duration",
"Time it took (locally) to establish connections",
connections_establishment_duration.clone(),
);

let connections_establishment_duration = Family::new_with_constructor(
create_connection_establishment_duration_histogram as fn() -> Histogram,
);
sub_registry.register(
// Histogram of how long each connection stayed open, in seconds. It is fed
// from the `ConnectionClosed` handler via `Instant::elapsed().as_secs_f64()`.
// Buckets start at 0.01 and grow by a factor of 3.0 (20 buckets), a wider
// range than the establishment histogram because connections can be long-lived.
let connections_duration = {
    let constructor: fn() -> Histogram =
        || Histogram::new(exponential_buckets(0.01, 3.0, 20));
    Family::new_with_constructor(constructor)
};
// Register the duration family under its own name. Previously this call
// re-registered `connections_establishment_duration` a second time and left
// `connections_duration` unregistered, so its observations were never exported.
sub_registry.register_with_unit(
    "connections_duration",
    "Time a connection was alive",
    Unit::Seconds,
    connections_duration.clone(),
);

Self {
connections_incoming,
connections_incoming_error,
connections_established,
connections_closed,
new_listen_addr,
expired_listen_addr,
listener_closed,
listener_error,
dial_attempt,
outgoing_connection_error,
connections_establishment_duration,
connections_duration,
connections: Default::default(),
}
}
}
Expand All @@ -149,24 +164,44 @@ impl<TBvEv, THandleErr> super::Recorder<libp2p_swarm::SwarmEvent<TBvEv, THandleE
libp2p_swarm::SwarmEvent::ConnectionEstablished {
endpoint,
established_in: time_taken,
connection_id,
..
} => {
let labels = ConnectionEstablishedLabels {
let labels = ConnectionLabels {
role: endpoint.into(),
protocols: protocol_stack::as_string(endpoint.get_remote_address()),
};
self.connections_established.get_or_create(&labels).inc();
self.connections_establishment_duration
.get_or_create(&labels)
.observe(time_taken.as_secs_f64());
self.connections
.lock()
.expect("lock not to be poisoned")
.insert(*connection_id, Instant::now());
}
libp2p_swarm::SwarmEvent::ConnectionClosed { endpoint, .. } => {
self.connections_closed
.get_or_create(&ConnectionClosedLabels {
libp2p_swarm::SwarmEvent::ConnectionClosed {
endpoint,
connection_id,
cause,
..
} => {
let labels = ConnectionClosedLabels {
connection: ConnectionLabels {
role: endpoint.into(),
protocols: protocol_stack::as_string(endpoint.get_remote_address()),
})
.inc();
},
cause: cause.as_ref().map(Into::into),
};
self.connections_duration.get_or_create(&labels).observe(
self.connections
.lock()
.expect("lock not to be poisoned")
.remove(connection_id)
.expect("closed connection to previously be established")
.elapsed()
.as_secs_f64(),
);
}
libp2p_swarm::SwarmEvent::IncomingConnection { send_back_addr, .. } => {
self.connections_incoming
Expand All @@ -187,7 +222,7 @@ impl<TBvEv, THandleErr> super::Recorder<libp2p_swarm::SwarmEvent<TBvEv, THandleE
})
.inc();
}
libp2p_swarm::SwarmEvent::OutgoingConnectionError { error, peer_id } => {
libp2p_swarm::SwarmEvent::OutgoingConnectionError { error, peer_id, .. } => {
let peer = match peer_id {
Some(_) => PeerStatus::Known,
None => PeerStatus::Unknown,
Expand Down Expand Up @@ -261,25 +296,41 @@ impl<TBvEv, THandleErr> super::Recorder<libp2p_swarm::SwarmEvent<TBvEv, THandleE
libp2p_swarm::SwarmEvent::ListenerError { .. } => {
self.listener_error.inc();
}
libp2p_swarm::SwarmEvent::Dialing(_) => {
libp2p_swarm::SwarmEvent::Dialing { .. } => {
self.dial_attempt.inc();
}
}
}
}

#[derive(EncodeLabelSet, Hash, Clone, Eq, PartialEq, Debug)]
struct ConnectionEstablishedLabels {
struct ConnectionLabels {
role: Role,
protocols: String,
}

type ConnectionEstablishmentDurationLabels = ConnectionEstablishedLabels;

#[derive(EncodeLabelSet, Hash, Clone, Eq, PartialEq, Debug)]
struct ConnectionClosedLabels {
role: Role,
protocols: String,
cause: Option<ConnectionError>,
#[prometheus(flatten)]
connection: ConnectionLabels,
}

/// Payload-free mirror of `libp2p_swarm::ConnectionError`, used as the value of
/// the `cause` label in `ConnectionClosedLabels`.
///
/// The swarm error carries data in some variants; this enum keeps only the
/// variant kind so it can be hashed, compared and encoded as a label value.
#[derive(EncodeLabelValue, Hash, Clone, Eq, PartialEq, Debug)]
enum ConnectionError {
/// Mapped from `libp2p_swarm::ConnectionError::IO(_)`.
Io,
/// Mapped from `libp2p_swarm::ConnectionError::KeepAliveTimeout`.
KeepAliveTimeout,
/// Mapped from `libp2p_swarm::ConnectionError::Handler(_)`.
Handler,
}

impl<E> From<&libp2p_swarm::ConnectionError<E>> for ConnectionError {
fn from(value: &libp2p_swarm::ConnectionError<E>) -> Self {
match value {
libp2p_swarm::ConnectionError::IO(_) => ConnectionError::Io,
libp2p_swarm::ConnectionError::KeepAliveTimeout => ConnectionError::KeepAliveTimeout,
libp2p_swarm::ConnectionError::Handler(_) => ConnectionError::Handler,
}
}
}

#[derive(EncodeLabelSet, Hash, Clone, Eq, PartialEq, Debug)]
Expand Down Expand Up @@ -359,7 +410,3 @@ impl From<&libp2p_swarm::ListenError> for IncomingConnectionError {
}
}
}

fn create_connection_establishment_duration_histogram() -> Histogram {
Histogram::new(exponential_buckets(0.01, 1.5, 20))
}
18 changes: 14 additions & 4 deletions protocols/autonat/tests/test_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,18 @@ async fn test_dial_back() {
num_established,
concurrent_dial_errors,
established_in: _,
connection_id: _,
} => {
assert_eq!(peer_id, client_id);
assert_eq!(num_established, NonZeroU32::new(2).unwrap());
assert!(concurrent_dial_errors.unwrap().is_empty());
assert_eq!(address, expect_addr);
break;
}
SwarmEvent::Dialing(peer) => assert_eq!(peer, client_id),
SwarmEvent::Dialing {
peer_id: Some(peer),
..
} => assert_eq!(peer, client_id),
SwarmEvent::NewListenAddr { .. } | SwarmEvent::ExpiredListenAddr { .. } => {}
other => panic!("Unexpected swarm event: {other:?}."),
}
Expand Down Expand Up @@ -143,12 +147,15 @@ async fn test_dial_error() {

loop {
match server.next_swarm_event().await {
SwarmEvent::OutgoingConnectionError { peer_id, error } => {
SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
assert_eq!(peer_id.unwrap(), client_id);
assert!(matches!(error, DialError::Transport(_)));
break;
}
SwarmEvent::Dialing(peer) => assert_eq!(peer, client_id),
SwarmEvent::Dialing {
peer_id: Some(peer),
..
} => assert_eq!(peer, client_id),
SwarmEvent::NewListenAddr { .. } | SwarmEvent::ExpiredListenAddr { .. } => {}
other => panic!("Unexpected swarm event: {other:?}."),
}
Expand Down Expand Up @@ -307,7 +314,10 @@ async fn test_dial_multiple_addr() {
assert_eq!(address, dial_addresses[1]);
break;
}
SwarmEvent::Dialing(peer) => assert_eq!(peer, client_id),
SwarmEvent::Dialing {
peer_id: Some(peer),
..
} => assert_eq!(peer, client_id),
SwarmEvent::NewListenAddr { .. } | SwarmEvent::ExpiredListenAddr { .. } => {}
other => panic!("Unexpected swarm event: {other:?}."),
}
Expand Down
5 changes: 4 additions & 1 deletion protocols/dcutr/tests/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,10 @@ async fn wait_for_reservation(
break;
}
}
SwarmEvent::Dialing(peer_id) if peer_id == relay_peer_id => {}
SwarmEvent::Dialing {
peer_id: Some(peer_id),
..
} if peer_id == relay_peer_id => {}
SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == relay_peer_id => {}
e => panic!("{e:?}"),
}
Expand Down
4 changes: 2 additions & 2 deletions protocols/perf/src/bin/perf-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ async fn main() -> Result<()> {
let server_peer_id = loop {
match swarm.next().await.unwrap() {
SwarmEvent::ConnectionEstablished { peer_id, .. } => break peer_id,
SwarmEvent::OutgoingConnectionError { peer_id, error } => {
SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
bail!("Outgoing connection error to {:?}: {:?}", peer_id, error);
}
e => panic!("{e:?}"),
Expand All @@ -113,7 +113,7 @@ async fn main() -> Result<()> {
} => {
info!("Established connection to {:?} via {:?}", peer_id, endpoint);
}
SwarmEvent::OutgoingConnectionError { peer_id, error } => {
SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
info!("Outgoing connection error to {:?}: {:?}", peer_id, error);
}
SwarmEvent::Behaviour(libp2p_perf::client::Event { id: _, result }) => break result?,
Expand Down
2 changes: 1 addition & 1 deletion protocols/perf/tests/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ async fn perf() {
.wait(|e| match e {
SwarmEvent::IncomingConnection { .. } => panic!(),
SwarmEvent::ConnectionEstablished { .. } => None,
SwarmEvent::Dialing(_) => None,
SwarmEvent::Dialing { .. } => None,
SwarmEvent::Behaviour(client::Event { result, .. }) => Some(result),
e => panic!("{e:?}"),
})
Expand Down
10 changes: 8 additions & 2 deletions protocols/relay/tests/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,10 @@ async fn connection_established_to(
) {
loop {
match swarm.select_next_some().await {
SwarmEvent::Dialing(peer_id) if peer_id == relay_peer_id => {}
SwarmEvent::Dialing {
peer_id: Some(peer_id),
..
} if peer_id == relay_peer_id => {}
SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == relay_peer_id => {}
SwarmEvent::Behaviour(ClientEvent::Ping(ping::Event { peer, .. })) if peer == other => {
break
Expand Down Expand Up @@ -419,7 +422,10 @@ async fn wait_for_reservation(
async fn wait_for_dial(client: &mut Swarm<Client>, remote: PeerId) -> bool {
loop {
match client.select_next_some().await {
SwarmEvent::Dialing(peer_id) if peer_id == remote => {}
SwarmEvent::Dialing {
peer_id: Some(peer_id),
..
} if peer_id == remote => {}
SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == remote => return true,
SwarmEvent::OutgoingConnectionError { peer_id, .. } if peer_id == Some(remote) => {
return false
Expand Down
Loading

0 comments on commit cc5b346

Please sign in to comment.