From 5b3612bb293feb1fafceab0202cc0438e594200a Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 15 Nov 2022 08:09:19 +1100 Subject: [PATCH 1/9] test: Remove `dialer_can_receive` stream muxer test (#3108) Various muxer implementations struggle to fulfill this test. In practice, it doesn't matter much because we always run `multistream-select` on top of a newly negotiated stream so we never end up actually reading from a stream that we have never written to. Relevant discussion: https://github.com/kpp/rust-libp2p/pull/27#discussion_r1012128418 --- muxers/mplex/tests/compliance.rs | 9 --------- muxers/test-harness/src/lib.rs | 25 ------------------------- muxers/yamux/tests/compliance.rs | 10 ---------- 3 files changed, 44 deletions(-) diff --git a/muxers/mplex/tests/compliance.rs b/muxers/mplex/tests/compliance.rs index 849ff9e0c209..2e8089b968c7 100644 --- a/muxers/mplex/tests/compliance.rs +++ b/muxers/mplex/tests/compliance.rs @@ -9,15 +9,6 @@ async fn close_implies_flush() { libp2p_muxer_test_harness::close_implies_flush(alice, bob).await; } -#[async_std::test] -async fn dialer_can_receive() { - let (alice, bob) = - libp2p_muxer_test_harness::connected_muxers_on_memory_transport::() - .await; - - libp2p_muxer_test_harness::dialer_can_receive(alice, bob).await; -} - #[async_std::test] async fn read_after_close() { let (alice, bob) = diff --git a/muxers/test-harness/src/lib.rs b/muxers/test-harness/src/lib.rs index 65dedf581bab..8eeafb363f56 100644 --- a/muxers/test-harness/src/lib.rs +++ b/muxers/test-harness/src/lib.rs @@ -80,31 +80,6 @@ where .await; } -/// Verifies that the dialer of a substream can receive a message. -pub async fn dialer_can_receive(alice: A, bob: B) -where - A: StreamMuxer + Unpin, - B: StreamMuxer + Unpin, - S: AsyncRead + AsyncWrite + Send + Unpin + 'static, - E: fmt::Debug, -{ - run_commutative( - alice, - bob, - |mut stream| async move { - let mut buf = Vec::new(); - stream.read_to_end(&mut buf).await.unwrap(); - - assert_eq!(buf, b"PING"); - }, - |mut stream| async move { - stream.write_all(b"PING").await.unwrap(); - stream.close().await.unwrap(); - }, - ) - .await; -} - /// Verifies that we can "half-close" a substream. pub async fn read_after_close(alice: A, bob: B) where diff --git a/muxers/yamux/tests/compliance.rs b/muxers/yamux/tests/compliance.rs index 51cbea387d29..2937936e7dcd 100644 --- a/muxers/yamux/tests/compliance.rs +++ b/muxers/yamux/tests/compliance.rs @@ -9,16 +9,6 @@ async fn close_implies_flush() { libp2p_muxer_test_harness::close_implies_flush(alice, bob).await; } -#[async_std::test] -#[ignore] // Hangs forever, is this a harness bug? It passes if we try to write to the stream. -async fn dialer_can_receive() { - let (alice, bob) = - libp2p_muxer_test_harness::connected_muxers_on_memory_transport::() - .await; - - libp2p_muxer_test_harness::dialer_can_receive(alice, bob).await; -} - #[async_std::test] async fn read_after_close() { let (alice, bob) = From 5f196dd23161ede715b642111ff9df01f643ddd0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Mon, 14 Nov 2022 22:35:18 +0000 Subject: [PATCH 2/9] refactor(mdns): Parse messages using `trust-dns-proto` instead of `dns-parse` (#3102) --- protocols/mdns/CHANGELOG.md | 4 ++ protocols/mdns/Cargo.toml | 7 ++- protocols/mdns/src/behaviour/iface/dns.rs | 8 +-- protocols/mdns/src/behaviour/iface/query.rs | 57 +++++++++++---------- protocols/mdns/src/lib.rs | 4 ++ 5 files changed, 45 insertions(+), 35 deletions(-) diff --git a/protocols/mdns/CHANGELOG.md b/protocols/mdns/CHANGELOG.md index 33f9816ec0ce..b75746c62b96 100644 --- a/protocols/mdns/CHANGELOG.md +++ b/protocols/mdns/CHANGELOG.md @@ -4,6 +4,10 @@ - Update to `libp2p-swarm` `v0.41.0`. +- Use `trust-dns-proto` to parse DNS messages. See [PR 3102]. + +[PR 3102]: https://github.com/libp2p/rust-libp2p/pull/3102 + # 0.41.0 - Remove default features. If you previously depended on `async-io` you need to enable this explicitly now. See [PR 2918]. diff --git a/protocols/mdns/Cargo.toml b/protocols/mdns/Cargo.toml index 5729043514ec..78b3a26d67f7 100644 --- a/protocols/mdns/Cargo.toml +++ b/protocols/mdns/Cargo.toml @@ -11,8 +11,8 @@ keywords = ["peer-to-peer", "libp2p", "networking"] categories = ["network-programming", "asynchronous"] [dependencies] +async-io = { version = "1.3.1", optional = true } data-encoding = "2.3.2" -dns-parser = "0.8.0" futures = "0.3.13" if-watch = "2.0.0" libp2p-core = { version = "0.38.0", path = "../../core" } @@ -21,10 +21,9 @@ log = "0.4.14" rand = "0.8.3" smallvec = "1.6.1" socket2 = { version = "0.4.0", features = ["all"] } -void = "1.0.2" - -async-io = { version = "1.3.1", optional = true } tokio = { version = "1.19", default-features = false, features = ["net", "time"], optional = true} +trust-dns-proto = { version = "0.22.0", default-features = false, features = ["mdns"] } +void = "1.0.2" [features] tokio = ["dep:tokio"] diff --git a/protocols/mdns/src/behaviour/iface/dns.rs b/protocols/mdns/src/behaviour/iface/dns.rs index 4590e1e266e8..1f0825727a23 100644 --- a/protocols/mdns/src/behaviour/iface/dns.rs +++ b/protocols/mdns/src/behaviour/iface/dns.rs @@ -395,14 +395,14 @@ impl error::Error for MdnsResponseError {} #[cfg(test)] mod tests { use super::*; - use dns_parser::Packet; use libp2p_core::identity; use std::time::Duration; + use trust_dns_proto::op::Message; #[test] fn build_query_correct() { let query = build_query(); - assert!(Packet::parse(&query).is_ok()); + assert!(Message::from_vec(&query).is_ok()); } #[test] @@ -417,14 +417,14 @@ mod tests { Duration::from_secs(60), ); for packet in packets { - assert!(Packet::parse(&packet).is_ok()); + assert!(Message::from_vec(&packet).is_ok()); } } #[test] fn build_service_discovery_response_correct() { let query = build_service_discovery_response(0x1234, Duration::from_secs(120)); - assert!(Packet::parse(&query).is_ok()); + assert!(Message::from_vec(&query).is_ok()); } #[test] diff --git a/protocols/mdns/src/behaviour/iface/query.rs b/protocols/mdns/src/behaviour/iface/query.rs index 70e38016849a..50b86bd888fc 100644 --- a/protocols/mdns/src/behaviour/iface/query.rs +++ b/protocols/mdns/src/behaviour/iface/query.rs @@ -19,8 +19,7 @@ // DEALINGS IN THE SOFTWARE. use super::dns; -use crate::{META_QUERY_SERVICE, SERVICE_NAME}; -use dns_parser::{Packet, RData}; +use crate::{META_QUERY_SERVICE_FQDN, SERVICE_NAME_FQDN}; use libp2p_core::{ address_translation, multiaddr::{Multiaddr, Protocol}, @@ -28,6 +27,10 @@ use libp2p_core::{ }; use std::time::Instant; use std::{convert::TryFrom, fmt, net::SocketAddr, str, time::Duration}; +use trust_dns_proto::{ + op::Message, + rr::{Name, RData}, +}; /// A valid mDNS packet received by the service. #[derive(Debug)] @@ -44,33 +47,33 @@ impl MdnsPacket { pub fn new_from_bytes( buf: &[u8], from: SocketAddr, - ) -> Result, dns_parser::Error> { - let packet = Packet::parse(buf)?; + ) -> Result, trust_dns_proto::error::ProtoError> { + let packet = Message::from_vec(buf)?; - if !packet.header.query { - return Ok(Some(MdnsPacket::Response(MdnsResponse::new(packet, from)))); + if packet.query().is_none() { + return Ok(Some(MdnsPacket::Response(MdnsResponse::new(&packet, from)))); } if packet - .questions + .queries() .iter() - .any(|q| q.qname.to_string().as_bytes() == SERVICE_NAME) + .any(|q| q.name().to_utf8() == SERVICE_NAME_FQDN) { return Ok(Some(MdnsPacket::Query(MdnsQuery { from, - query_id: packet.header.id, + query_id: packet.header().id(), }))); } if packet - .questions + .queries() .iter() - .any(|q| q.qname.to_string().as_bytes() == META_QUERY_SERVICE) + .any(|q| q.name().to_utf8() == META_QUERY_SERVICE_FQDN) { // TODO: what if multiple questions, one with SERVICE_NAME and one with META_QUERY_SERVICE? return Ok(Some(MdnsPacket::ServiceDiscovery(MdnsServiceDiscovery { from, - query_id: packet.header.id, + query_id: packet.header().id(), }))); } @@ -144,21 +147,21 @@ pub struct MdnsResponse { impl MdnsResponse { /// Creates a new `MdnsResponse` based on the provided `Packet`. - pub fn new(packet: Packet<'_>, from: SocketAddr) -> MdnsResponse { + pub fn new(packet: &Message, from: SocketAddr) -> MdnsResponse { let peers = packet - .answers + .answers() .iter() .filter_map(|record| { - if record.name.to_string().as_bytes() != SERVICE_NAME { + if record.name().to_string() != SERVICE_NAME_FQDN { return None; } - let record_value = match record.data { - RData::PTR(record) => record.0.to_string(), + let record_value = match record.data() { + Some(RData::PTR(record)) => record, _ => return None, }; - MdnsPeer::new(&packet, record_value, record.ttl) + MdnsPeer::new(packet, record_value, record.ttl()) }) .collect(); @@ -225,17 +228,17 @@ pub struct MdnsPeer { impl MdnsPeer { /// Creates a new `MdnsPeer` based on the provided `Packet`. - pub fn new(packet: &Packet<'_>, record_value: String, ttl: u32) -> Option { + pub fn new(packet: &Message, record_value: &Name, ttl: u32) -> Option { let mut my_peer_id: Option = None; let addrs = packet - .additional + .additionals() .iter() .filter_map(|add_record| { - if add_record.name.to_string() != record_value { + if add_record.name() != record_value { return None; } - if let RData::TXT(ref txt) = add_record.data { + if let Some(RData::TXT(ref txt)) = add_record.data() { Some(txt) } else { None @@ -337,16 +340,16 @@ mod tests { ); for bytes in packets { - let packet = Packet::parse(&bytes).expect("unable to parse packet"); + let packet = Message::from_vec(&bytes).expect("unable to parse packet"); let record_value = packet - .answers + .answers() .iter() .filter_map(|record| { - if record.name.to_string().as_bytes() != SERVICE_NAME { + if record.name().to_utf8() != SERVICE_NAME_FQDN { return None; } - let record_value = match record.data { - RData::PTR(record) => record.0.to_string(), + let record_value = match record.data() { + Some(RData::PTR(record)) => record, _ => return None, }; Some(record_value) diff --git a/protocols/mdns/src/lib.rs b/protocols/mdns/src/lib.rs index 298f48ea9754..368582ecb7b5 100644 --- a/protocols/mdns/src/lib.rs +++ b/protocols/mdns/src/lib.rs @@ -49,8 +49,12 @@ pub use crate::behaviour::TokioMdns; /// The DNS service name for all libp2p peers used to query for addresses. const SERVICE_NAME: &[u8] = b"_p2p._udp.local"; +/// `SERVICE_NAME` as a Fully Qualified Domain Name. +const SERVICE_NAME_FQDN: &str = "_p2p._udp.local."; /// The meta query for looking up the `SERVICE_NAME`. const META_QUERY_SERVICE: &[u8] = b"_services._dns-sd._udp.local"; +/// `META_QUERY_SERVICE` as a Fully Qualified Domain Name. +const META_QUERY_SERVICE_FQDN: &str = "_services._dns-sd._udp.local."; pub const IPV4_MDNS_MULTICAST_ADDRESS: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 251); pub const IPV6_MDNS_MULTICAST_ADDRESS: Ipv6Addr = Ipv6Addr::new(0xFF02, 0, 0, 0, 0, 0, 0, 0xFB); From 368705a1465c4322948d14ee46d42475c472ca1e Mon Sep 17 00:00:00 2001 From: yojoe <58518123+yojoe@users.noreply.github.com> Date: Tue, 15 Nov 2022 02:22:11 +0100 Subject: [PATCH 3/9] docs(core): Document byte format of `PeerId` (#3084) PeerId data is more than just a multihash of the public key. See discussion: https://github.com/libp2p/rust-libp2p/discussions/3079 --- core/src/peer_id.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/peer_id.rs b/core/src/peer_id.rs index a70d8f6823cc..014f1fc240dd 100644 --- a/core/src/peer_id.rs +++ b/core/src/peer_id.rs @@ -34,8 +34,8 @@ const MAX_INLINE_KEY_LENGTH: usize = 42; /// Identifier of a peer of the network. /// -/// The data is a multihash of the public key of the peer. -/// See the [spec](https://github.com/libp2p/specs/blob/master/peer-ids/peer-ids.md) for more information. +/// The data is a CIDv0 compatible multihash of the protobuf encoded public key of the peer +/// as specified in [specs/peer-ids](https://github.com/libp2p/specs/blob/master/peer-ids/peer-ids.md). #[derive(Clone, Copy, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct PeerId { multihash: Multihash, From 9dadf5c83051d1a0711b0f466ccfa13abc6c6f48 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 15 Nov 2022 21:40:55 +1100 Subject: [PATCH 4/9] muxers/yamux: Mitigation of unnecessary stream drops (#3071) --- muxers/yamux/CHANGELOG.md | 9 +++++++++ muxers/yamux/Cargo.toml | 2 +- muxers/yamux/src/lib.rs | 28 ++++++++++++++++++++-------- 3 files changed, 30 insertions(+), 9 deletions(-) diff --git a/muxers/yamux/CHANGELOG.md b/muxers/yamux/CHANGELOG.md index 3e6ac4e41279..e1dcdfb7a88d 100644 --- a/muxers/yamux/CHANGELOG.md +++ b/muxers/yamux/CHANGELOG.md @@ -2,6 +2,15 @@ - Update to `libp2p-core` `v0.38.0`. +# 0.41.1 + +- Yield from `StreamMuxer::poll` as soon as we receive a single substream. + This fixes [issue 3041]. + See [PR 3071]. + +[PR 3071]: https://github.com/libp2p/rust-libp2p/pull/3071/ +[issue 3041]: https://github.com/libp2p/rust-libp2p/issues/3041/ + # 0.41.0 - Update to `libp2p-core` `v0.37.0`. diff --git a/muxers/yamux/Cargo.toml b/muxers/yamux/Cargo.toml index cb8a1e4e5ef1..6c4790924b58 100644 --- a/muxers/yamux/Cargo.toml +++ b/muxers/yamux/Cargo.toml @@ -22,7 +22,7 @@ log = "0.4" async-std = { version = "1.7.0", features = ["attributes"] } libp2p-muxer-test-harness = { path = "../test-harness" } -# Passing arguments to the docsrs builder in order to properly document cfg's. +# Passing arguments to the docsrs builder in order to properly document cfg's. # More information: https://docs.rs/about/builds#cross-compiling [package.metadata.docs.rs] all-features = true diff --git a/muxers/yamux/src/lib.rs b/muxers/yamux/src/lib.rs index ce3639e572f2..42fb1621e56b 100644 --- a/muxers/yamux/src/lib.rs +++ b/muxers/yamux/src/lib.rs @@ -31,6 +31,7 @@ use futures::{ use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent}; use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use std::collections::VecDeque; +use std::task::Waker; use std::{ fmt, io, iter, mem, pin::Pin, @@ -55,6 +56,8 @@ pub struct Yamux { /// This buffer stores inbound streams that are created whilst [`StreamMuxer::poll`] is called. /// Once the buffer is full, new inbound streams are dropped. inbound_stream_buffer: VecDeque, + /// Waker to be called when new inbound streams are available. + inbound_stream_waker: Option, } const MAX_BUFFERED_INBOUND_STREAMS: usize = 25; @@ -81,6 +84,7 @@ where }, control: ctrl, inbound_stream_buffer: VecDeque::default(), + inbound_stream_waker: None, } } } @@ -101,6 +105,7 @@ where }, control: ctrl, inbound_stream_buffer: VecDeque::default(), + inbound_stream_waker: None, } } } @@ -122,6 +127,8 @@ where return Poll::Ready(Ok(stream)); } + self.inbound_stream_waker = Some(cx.waker().clone()); + self.poll_inner(cx) } @@ -140,17 +147,22 @@ where ) -> Poll> { let this = self.get_mut(); - loop { - let inbound_stream = ready!(this.poll_inner(cx))?; - - if this.inbound_stream_buffer.len() >= MAX_BUFFERED_INBOUND_STREAMS { - log::warn!("dropping {inbound_stream} because buffer is full"); - drop(inbound_stream); - continue; - } + let inbound_stream = ready!(this.poll_inner(cx))?; + if this.inbound_stream_buffer.len() >= MAX_BUFFERED_INBOUND_STREAMS { + log::warn!("dropping {inbound_stream} because buffer is full"); + drop(inbound_stream); + } else { this.inbound_stream_buffer.push_back(inbound_stream); + + if let Some(waker) = this.inbound_stream_waker.take() { + waker.wake() + } } + + // Schedule an immediate wake-up, allowing other code to run. + cx.waker().wake_by_ref(); + Poll::Pending } fn poll_close(mut self: Pin<&mut Self>, c: &mut Context<'_>) -> Poll> { From ca8bcd8288e843669a5cd5026ef67442f9c632c0 Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Tue, 15 Nov 2022 12:58:42 +0100 Subject: [PATCH 5/9] core: Expose `peer_id::ParseError` (#3113) --- core/src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/lib.rs b/core/src/lib.rs index c7b9aa6068c0..6205d7156151 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -72,6 +72,7 @@ pub use identity::PublicKey; pub use multiaddr::Multiaddr; pub use multihash; pub use muxing::StreamMuxer; +pub use peer_id::ParseError; pub use peer_id::PeerId; pub use peer_record::PeerRecord; pub use signed_envelope::SignedEnvelope; From d8fe7bf49f9db6893b664298c0086616058ceee9 Mon Sep 17 00:00:00 2001 From: Roland Kuhn Date: Tue, 15 Nov 2022 12:34:38 +0000 Subject: [PATCH 6/9] transports/tcp: Update if-watch to `v3.0.0` (#3101) Update to `if-watch` version 3.0.0 and pass through features, such that `libp2p-tcp/async-io` selects `if-watch/smol` and `libp2p-tcp/tokio` brings in `if-watch/tokio`. The mDNS part is already done in #3096. --- transports/tcp/CHANGELOG.md | 3 ++ transports/tcp/Cargo.toml | 14 ++++----- transports/tcp/src/lib.rs | 28 +++++++++--------- transports/tcp/src/provider.rs | 10 +++++++ transports/tcp/src/provider/async_io.rs | 11 ++++++- transports/tcp/src/provider/tokio.rs | 38 +++++++++++++++---------- 6 files changed, 67 insertions(+), 37 deletions(-) diff --git a/transports/tcp/CHANGELOG.md b/transports/tcp/CHANGELOG.md index 4a9ed144bda1..58fb99808f78 100644 --- a/transports/tcp/CHANGELOG.md +++ b/transports/tcp/CHANGELOG.md @@ -1,11 +1,14 @@ # 0.38.0 [unreleased] +- Update to `if-watch` `v3.0.0` and pass through `tokio` and `async-io` features. See [PR 3101]. + - Deprecate types with `Tcp` prefix (`GenTcpConfig`, `TcpTransport` and `TokioTcpTransport`) in favor of referencing them by module / crate. See [PR 2961]. - Remove `TcpListenStream` and `TcpListenerEvent` from public API. See [PR 2961]. - Update to `libp2p-core` `v0.38.0`. +[PR 3101]: https://github.com/libp2p/rust-libp2p/pull/3101 [PR 2961]: https://github.com/libp2p/rust-libp2p/pull/2961 # 0.37.0 diff --git a/transports/tcp/Cargo.toml b/transports/tcp/Cargo.toml index 502e05cdf3a9..585d5a91480d 100644 --- a/transports/tcp/Cargo.toml +++ b/transports/tcp/Cargo.toml @@ -11,26 +11,26 @@ keywords = ["peer-to-peer", "libp2p", "networking"] categories = ["network-programming", "asynchronous"] [dependencies] -async-io-crate = { package = "async-io", version = "1.2.0", optional = true } +async-io = { version = "1.2.0", optional = true } futures = "0.3.8" futures-timer = "3.0" -if-watch = "2.0.0" +if-watch = "3.0.0" libc = "0.2.80" libp2p-core = { version = "0.38.0", path = "../../core" } log = "0.4.11" socket2 = { version = "0.4.0", features = ["all"] } -tokio-crate = { package = "tokio", version = "1.19.0", default-features = false, features = ["net"], optional = true } +tokio = { version = "1.19.0", default-features = false, features = ["net"], optional = true } [features] -tokio = ["tokio-crate"] -async-io = ["async-io-crate"] +tokio = ["dep:tokio", "if-watch/tokio"] +async-io = ["dep:async-io", "if-watch/smol"] [dev-dependencies] async-std = { version = "1.6.5", features = ["attributes"] } -tokio-crate = { package = "tokio", version = "1.0.1", default-features = false, features = ["full"] } +tokio = { version = "1.0.1", default-features = false, features = ["full"] } env_logger = "0.9.0" -# Passing arguments to the docsrs builder in order to properly document cfg's. +# Passing arguments to the docsrs builder in order to properly document cfg's. # More information: https://docs.rs/about/builds#cross-compiling [package.metadata.docs.rs] all-features = true diff --git a/transports/tcp/src/lib.rs b/transports/tcp/src/lib.rs index 6e760ae50246..c36152b2f2b1 100644 --- a/transports/tcp/src/lib.rs +++ b/transports/tcp/src/lib.rs @@ -41,7 +41,7 @@ use futures::{ prelude::*, }; use futures_timer::Delay; -use if_watch::{IfEvent, IfWatcher}; +use if_watch::IfEvent; use libp2p_core::{ address_translation, multiaddr::{Multiaddr, Protocol}, @@ -385,7 +385,7 @@ where return TcpListenStream::::new( id, listener, - Some(IfWatcher::new()?), + Some(T::new_if_watcher()?), self.port_reuse.clone(), ); } @@ -656,7 +656,7 @@ where /// become or stop being available. /// /// `None` if the socket is only listening on a single interface. - if_watcher: Option, + if_watcher: Option, /// The port reuse configuration for outgoing connections. /// /// If enabled, all IP addresses on which this listening stream @@ -680,7 +680,7 @@ where fn new( listener_id: ListenerId, listener: TcpListener, - if_watcher: Option, + if_watcher: Option, port_reuse: PortReuse, ) -> io::Result { let listen_addr = listener.local_addr()?; @@ -706,7 +706,7 @@ where fn disable_port_reuse(&mut self) { match &self.if_watcher { Some(if_watcher) => { - for ip_net in if_watcher.iter() { + for ip_net in T::addrs(if_watcher) { self.port_reuse .unregister(ip_net.addr(), self.listen_addr.port()); } @@ -749,7 +749,7 @@ where } if let Some(if_watcher) = me.if_watcher.as_mut() { - while let Poll::Ready(event) = if_watcher.poll_if_event(cx) { + while let Poll::Ready(Some(event)) = if_watcher.poll_next_unpin(cx) { match event { Ok(IfEvent::Up(inet)) => { let ip = inet.addr(); @@ -986,11 +986,11 @@ mod tests { let (ready_tx, ready_rx) = mpsc::channel(1); let listener = listener::(addr, ready_tx); let dialer = dialer::(ready_rx); - let rt = tokio_crate::runtime::Builder::new_current_thread() + let rt = ::tokio::runtime::Builder::new_current_thread() .enable_io() .build() .unwrap(); - let tasks = tokio_crate::task::LocalSet::new(); + let tasks = ::tokio::task::LocalSet::new(); let listener = tasks.spawn_local(listener); tasks.block_on(&rt, dialer); tasks.block_on(&rt, listener).unwrap(); @@ -1055,11 +1055,11 @@ mod tests { let (ready_tx, ready_rx) = mpsc::channel(1); let listener = listener::(addr, ready_tx); let dialer = dialer::(ready_rx); - let rt = tokio_crate::runtime::Builder::new_current_thread() + let rt = ::tokio::runtime::Builder::new_current_thread() .enable_io() .build() .unwrap(); - let tasks = tokio_crate::task::LocalSet::new(); + let tasks = ::tokio::task::LocalSet::new(); let listener = tasks.spawn_local(listener); tasks.block_on(&rt, dialer); tasks.block_on(&rt, listener).unwrap(); @@ -1162,11 +1162,11 @@ mod tests { let (port_reuse_tx, port_reuse_rx) = oneshot::channel(); let listener = listener::(addr.clone(), ready_tx, port_reuse_rx); let dialer = dialer::(addr, ready_rx, port_reuse_tx); - let rt = tokio_crate::runtime::Builder::new_current_thread() + let rt = ::tokio::runtime::Builder::new_current_thread() .enable_io() .build() .unwrap(); - let tasks = tokio_crate::task::LocalSet::new(); + let tasks = ::tokio::task::LocalSet::new(); let listener = tasks.spawn_local(listener); tasks.block_on(&rt, dialer); tasks.block_on(&rt, listener).unwrap(); @@ -1220,7 +1220,7 @@ mod tests { #[cfg(feature = "tokio")] { let listener = listen_twice::(addr); - let rt = tokio_crate::runtime::Builder::new_current_thread() + let rt = ::tokio::runtime::Builder::new_current_thread() .enable_io() .build() .unwrap(); @@ -1253,7 +1253,7 @@ mod tests { #[cfg(feature = "tokio")] { - let rt = tokio_crate::runtime::Builder::new_current_thread() + let rt = ::tokio::runtime::Builder::new_current_thread() .enable_io() .build() .unwrap(); diff --git a/transports/tcp/src/provider.rs b/transports/tcp/src/provider.rs index a341026e7e6c..d94da7a6fc35 100644 --- a/transports/tcp/src/provider.rs +++ b/transports/tcp/src/provider.rs @@ -28,6 +28,8 @@ pub mod tokio; use futures::future::BoxFuture; use futures::io::{AsyncRead, AsyncWrite}; +use futures::Stream; +use if_watch::{IfEvent, IpNet}; use std::net::{SocketAddr, TcpListener, TcpStream}; use std::task::{Context, Poll}; use std::{fmt, io}; @@ -46,6 +48,14 @@ pub trait Provider: Clone + Send + 'static { type Stream: AsyncRead + AsyncWrite + Send + Unpin + fmt::Debug; /// The type of TCP listeners obtained from [`Provider::new_listener`]. type Listener: Send + Unpin; + /// The type of IfWatcher obtained from [`Provider::new_if_watcher`]. + type IfWatcher: Stream> + Send + Unpin; + + /// Create a new IfWatcher responsible for detecting IP address changes. + fn new_if_watcher() -> io::Result; + + /// An iterator over all currently discovered addresses. + fn addrs(_: &Self::IfWatcher) -> Vec; /// Creates a new listener wrapping the given [`TcpListener`] that /// can be polled for incoming connections via [`Self::poll_accept()`]. diff --git a/transports/tcp/src/provider/async_io.rs b/transports/tcp/src/provider/async_io.rs index 0fc1102ff425..590f109d3c30 100644 --- a/transports/tcp/src/provider/async_io.rs +++ b/transports/tcp/src/provider/async_io.rs @@ -20,7 +20,7 @@ use super::{Incoming, Provider}; -use async_io_crate::Async; +use async_io::Async; use futures::future::{BoxFuture, FutureExt}; use std::io; use std::net; @@ -55,6 +55,15 @@ pub enum Tcp {} impl Provider for Tcp { type Stream = Async; type Listener = Async; + type IfWatcher = if_watch::smol::IfWatcher; + + fn new_if_watcher() -> io::Result { + Self::IfWatcher::new() + } + + fn addrs(if_watcher: &Self::IfWatcher) -> Vec { + if_watcher.iter().copied().collect() + } fn new_listener(l: net::TcpListener) -> io::Result { Async::new(l) diff --git a/transports/tcp/src/provider/tokio.rs b/transports/tcp/src/provider/tokio.rs index 486478338920..e4b75c8d814b 100644 --- a/transports/tcp/src/provider/tokio.rs +++ b/transports/tcp/src/provider/tokio.rs @@ -39,7 +39,6 @@ use std::task::{Context, Poll}; /// # use libp2p_core::Transport; /// # use futures::future; /// # use std::pin::Pin; -/// # use tokio_crate as tokio; /// # /// # #[tokio::main] /// # async fn main() { @@ -59,17 +58,26 @@ pub enum Tcp {} impl Provider for Tcp { type Stream = TcpStream; - type Listener = tokio_crate::net::TcpListener; + type Listener = tokio::net::TcpListener; + type IfWatcher = if_watch::tokio::IfWatcher; + + fn new_if_watcher() -> io::Result { + Self::IfWatcher::new() + } + + fn addrs(if_watcher: &Self::IfWatcher) -> Vec { + if_watcher.iter().copied().collect() + } fn new_listener(l: net::TcpListener) -> io::Result { - tokio_crate::net::TcpListener::try_from(l) + tokio::net::TcpListener::try_from(l) } fn new_stream(s: net::TcpStream) -> BoxFuture<'static, io::Result> { async move { - // Taken from [`tokio_crate::net::TcpStream::connect_mio`]. + // Taken from [`tokio::net::TcpStream::connect_mio`]. - let stream = tokio_crate::net::TcpStream::try_from(s)?; + let stream = tokio::net::TcpStream::try_from(s)?; // Once we've connected, wait for the stream to be writable as // that's when the actual connection has been initiated. Once we're @@ -109,12 +117,12 @@ impl Provider for Tcp { } } -/// A [`tokio_crate::net::TcpStream`] that implements [`AsyncRead`] and [`AsyncWrite`]. +/// A [`tokio::net::TcpStream`] that implements [`AsyncRead`] and [`AsyncWrite`]. #[derive(Debug)] -pub struct TcpStream(pub tokio_crate::net::TcpStream); +pub struct TcpStream(pub tokio::net::TcpStream); -impl From for tokio_crate::net::TcpStream { - fn from(t: TcpStream) -> tokio_crate::net::TcpStream { +impl From for tokio::net::TcpStream { + fn from(t: TcpStream) -> tokio::net::TcpStream { t.0 } } @@ -125,8 +133,8 @@ impl AsyncRead for TcpStream { cx: &mut Context, buf: &mut [u8], ) -> Poll> { - let mut read_buf = tokio_crate::io::ReadBuf::new(buf); - futures::ready!(tokio_crate::io::AsyncRead::poll_read( + let mut read_buf = tokio::io::ReadBuf::new(buf); + futures::ready!(tokio::io::AsyncRead::poll_read( Pin::new(&mut self.0), cx, &mut read_buf @@ -141,15 +149,15 @@ impl AsyncWrite for TcpStream { cx: &mut Context, buf: &[u8], ) -> Poll> { - tokio_crate::io::AsyncWrite::poll_write(Pin::new(&mut self.0), cx, buf) + tokio::io::AsyncWrite::poll_write(Pin::new(&mut self.0), cx, buf) } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - tokio_crate::io::AsyncWrite::poll_flush(Pin::new(&mut self.0), cx) + tokio::io::AsyncWrite::poll_flush(Pin::new(&mut self.0), cx) } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - tokio_crate::io::AsyncWrite::poll_shutdown(Pin::new(&mut self.0), cx) + tokio::io::AsyncWrite::poll_shutdown(Pin::new(&mut self.0), cx) } fn poll_write_vectored( @@ -157,6 +165,6 @@ impl AsyncWrite for TcpStream { cx: &mut Context<'_>, bufs: &[io::IoSlice<'_>], ) -> Poll> { - tokio_crate::io::AsyncWrite::poll_write_vectored(Pin::new(&mut self.0), cx, bufs) + tokio::io::AsyncWrite::poll_write_vectored(Pin::new(&mut self.0), cx, bufs) } } From d5ea93dd71a9948f69cfd74e4d3300ece74422ce Mon Sep 17 00:00:00 2001 From: Hannes <55623006+umgefahren@users.noreply.github.com> Date: Tue, 15 Nov 2022 15:26:03 +0100 Subject: [PATCH 7/9] feat(swarm): Make executor for connection tasks explicit (#3097) Previously, the executor for connection tasks silently defaulted to a `futures::executor::ThreadPool`. This causes issues such as https://github.com/libp2p/rust-libp2p/issues/2230. With this patch, we force the user to choose, which executor they want to run the connection tasks on which results in overall simpler API with less footguns. Closes #3068. --- Cargo.toml | 5 +- core/CHANGELOG.md | 3 + core/src/lib.rs | 20 --- examples/chat-tokio.rs | 23 +-- examples/chat.rs | 2 +- examples/distributed-key-value-store.rs | 2 +- examples/file-sharing.rs | 9 +- examples/gossipsub-chat.rs | 2 +- examples/ipfs-kad.rs | 2 +- examples/ipfs-private.rs | 2 +- examples/mdns-passive-discovery.rs | 2 +- examples/ping.rs | 2 +- misc/metrics/examples/metrics/main.rs | 2 +- misc/multistream-select/tests/transport.rs | 6 +- protocols/autonat/examples/autonat_client.rs | 2 +- protocols/autonat/examples/autonat_server.rs | 2 +- protocols/autonat/tests/test_client.rs | 2 +- protocols/autonat/tests/test_server.rs | 2 +- protocols/dcutr/examples/dcutr.rs | 11 +- protocols/dcutr/tests/lib.rs | 4 +- protocols/gossipsub/tests/smoke.rs | 2 +- protocols/identify/examples/identify.rs | 2 +- protocols/identify/src/behaviour.rs | 12 +- protocols/kad/src/behaviour/test.rs | 2 +- protocols/mdns/tests/use-async-std.rs | 2 +- protocols/mdns/tests/use-tokio.rs | 2 +- protocols/ping/tests/ping.rs | 14 +- protocols/relay/examples/relay_v2.rs | 2 +- protocols/relay/tests/v2.rs | 4 +- protocols/rendezvous/examples/discover.rs | 6 +- protocols/rendezvous/examples/register.rs | 6 +- .../examples/register_with_identify.rs | 6 +- .../rendezvous/examples/rendezvous_point.rs | 6 +- protocols/rendezvous/tests/harness.rs | 8 +- protocols/request-response/tests/ping.rs | 14 +- swarm/CHANGELOG.md | 61 +++++++ swarm/Cargo.toml | 4 + swarm/src/connection/pool.rs | 91 +++++----- swarm/src/executor.rs | 48 +++++ swarm/src/lib.rs | 166 +++++++++++++++--- transports/tls/tests/smoke.rs | 2 +- 41 files changed, 384 insertions(+), 181 deletions(-) create mode 100644 swarm/src/executor.rs diff --git a/Cargo.toml b/Cargo.toml index 00535bec98dd..7bea3907e12b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,7 +47,8 @@ full = [ "websocket", "yamux", ] -async-std = ["libp2p-mdns?/async-io", "libp2p-tcp?/async-io", "libp2p-dns?/async-std", "libp2p-quic?/async-std"] + +async-std = ["libp2p-swarm/async-std", "libp2p-mdns?/async-io", "libp2p-tcp?/async-io", "libp2p-dns?/async-std", "libp2p-quic?/async-std"] autonat = ["dep:libp2p-autonat"] dcutr = ["dep:libp2p-dcutr", "libp2p-metrics?/dcutr"] deflate = ["dep:libp2p-deflate"] @@ -74,7 +75,7 @@ rsa = ["libp2p-core/rsa"] secp256k1 = ["libp2p-core/secp256k1"] serde = ["libp2p-core/serde", "libp2p-kad?/serde", "libp2p-gossipsub?/serde"] tcp = ["dep:libp2p-tcp"] -tokio = ["libp2p-mdns?/tokio", "libp2p-tcp?/tokio", "libp2p-dns?/tokio", "libp2p-quic?/tokio"] +tokio = ["libp2p-swarm/tokio", "libp2p-mdns?/tokio", "libp2p-tcp?/tokio", "libp2p-dns?/tokio", "libp2p-quic?/tokio"] uds = ["dep:libp2p-uds"] wasm-bindgen = ["futures-timer/wasm-bindgen", "instant/wasm-bindgen", "getrandom/js"] wasm-ext = ["dep:libp2p-wasm-ext"] diff --git a/core/CHANGELOG.md b/core/CHANGELOG.md index d7059cab98fa..c3831a3b3a9a 100644 --- a/core/CHANGELOG.md +++ b/core/CHANGELOG.md @@ -4,8 +4,11 @@ - Hide `prost::Error` from public API in `FromEnvelopeError::InvalidPeerRecord` and `signed_envelope::DecodingError`. See [PR 3058]. +- Move `Executor` to `libp2p-swarm`. See [PR 3097]. + [PR 3031]: https://github.com/libp2p/rust-libp2p/pull/3031 [PR 3058]: https://github.com/libp2p/rust-libp2p/pull/3058 +[PR 3097]: https://github.com/libp2p/rust-libp2p/pull/3097 # 0.37.0 diff --git a/core/src/lib.rs b/core/src/lib.rs index 6205d7156151..2b20f5156e49 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -83,23 +83,3 @@ pub use upgrade::{InboundUpgrade, OutboundUpgrade, ProtocolName, UpgradeError, U #[derive(thiserror::Error, Debug)] #[error(transparent)] pub struct DecodeError(prost::DecodeError); - -use std::{future::Future, pin::Pin}; - -/// Implemented on objects that can run a `Future` in the background. -/// -/// > **Note**: While it may be tempting to implement this trait on types such as -/// > [`futures::stream::FuturesUnordered`], please note that passing an `Executor` is -/// > optional, and that `FuturesUnordered` (or a similar struct) will automatically -/// > be used as fallback by libp2p. The `Executor` trait should therefore only be -/// > about running `Future`s in the background. -pub trait Executor { - /// Run the given future in the background until it ends. - fn exec(&self, future: Pin + Send>>); -} - -impl + Send>>)> Executor for F { - fn exec(&self, f: Pin + Send>>) { - self(f) - } -} diff --git a/examples/chat-tokio.rs b/examples/chat-tokio.rs index d4b0b121553d..f69c376b8173 100644 --- a/examples/chat-tokio.rs +++ b/examples/chat-tokio.rs @@ -39,7 +39,7 @@ use libp2p::{ TokioMdns, }, mplex, noise, - swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent}, + swarm::{NetworkBehaviour, SwarmEvent}, tcp, Multiaddr, PeerId, Transport, }; use std::error::Error; @@ -97,23 +97,12 @@ async fn main() -> Result<(), Box> { } // Create a Swarm to manage peers and events. - let mut swarm = { - let mdns = TokioMdns::new(Default::default())?; - let mut behaviour = MyBehaviour { - floodsub: Floodsub::new(peer_id), - mdns, - }; - - behaviour.floodsub.subscribe(floodsub_topic.clone()); - - SwarmBuilder::new(transport, behaviour, peer_id) - // We want the connection background tasks to be spawned - // onto the tokio runtime. - .executor(Box::new(|fut| { - tokio::spawn(fut); - })) - .build() + let mdns = TokioMdns::new(Default::default())?; + let behaviour = MyBehaviour { + floodsub: Floodsub::new(peer_id), + mdns, }; + let mut swarm = libp2p_swarm::Swarm::with_tokio_executor(transport, behaviour, peer_id); // Reach out to another node if specified if let Some(to_dial) = std::env::args().nth(1) { diff --git a/examples/chat.rs b/examples/chat.rs index 26db660b1c27..c7e4b73720fc 100644 --- a/examples/chat.rs +++ b/examples/chat.rs @@ -115,7 +115,7 @@ async fn main() -> Result<(), Box> { }; behaviour.floodsub.subscribe(floodsub_topic.clone()); - Swarm::new(transport, behaviour, local_peer_id) + Swarm::with_threadpool_executor(transport, behaviour, local_peer_id) }; // Reach out to another node if specified diff --git a/examples/distributed-key-value-store.rs b/examples/distributed-key-value-store.rs index 8d8c6d917ef9..1ed87718dbe2 100644 --- a/examples/distributed-key-value-store.rs +++ b/examples/distributed-key-value-store.rs @@ -99,7 +99,7 @@ async fn main() -> Result<(), Box> { let kademlia = Kademlia::new(local_peer_id, store); let mdns = Mdns::new(MdnsConfig::default())?; let behaviour = MyBehaviour { kademlia, mdns }; - Swarm::new(transport, behaviour, local_peer_id) + Swarm::with_async_std_executor(transport, behaviour, local_peer_id) }; // Read full lines from stdin diff --git a/examples/file-sharing.rs b/examples/file-sharing.rs index 85c5d3d50dd7..620ce6cd5d91 100644 --- a/examples/file-sharing.rs +++ b/examples/file-sharing.rs @@ -219,9 +219,7 @@ mod network { ProtocolSupport, RequestId, RequestResponse, RequestResponseCodec, RequestResponseEvent, RequestResponseMessage, ResponseChannel, }; - use libp2p::swarm::{ - ConnectionHandlerUpgrErr, NetworkBehaviour, Swarm, SwarmBuilder, SwarmEvent, - }; + use libp2p::swarm::{ConnectionHandlerUpgrErr, NetworkBehaviour, Swarm, SwarmEvent}; use std::collections::{hash_map, HashMap, HashSet}; use std::iter; @@ -252,7 +250,7 @@ mod network { // Build the Swarm, connecting the lower layer transport logic with the // higher layer network behaviour logic. - let swarm = SwarmBuilder::new( + let swarm = Swarm::with_threadpool_executor( libp2p::development_transport(id_keys).await?, ComposedBehaviour { kademlia: Kademlia::new(peer_id, MemoryStore::new(peer_id)), @@ -263,8 +261,7 @@ mod network { ), }, peer_id, - ) - .build(); + ); let (command_sender, command_receiver) = mpsc::channel(0); let (event_sender, event_receiver) = mpsc::channel(0); diff --git a/examples/gossipsub-chat.rs b/examples/gossipsub-chat.rs index e86d1836746a..532ebfff58bd 100644 --- a/examples/gossipsub-chat.rs +++ b/examples/gossipsub-chat.rs @@ -110,7 +110,7 @@ async fn main() -> Result<(), Box> { let mut swarm = { let mdns = Mdns::new(MdnsConfig::default())?; let behaviour = MyBehaviour { gossipsub, mdns }; - Swarm::new(transport, behaviour, local_peer_id) + Swarm::with_async_std_executor(transport, behaviour, local_peer_id) }; // Read full lines from stdin diff --git a/examples/ipfs-kad.rs b/examples/ipfs-kad.rs index 659cd49b607b..2c370472bec9 100644 --- a/examples/ipfs-kad.rs +++ b/examples/ipfs-kad.rs @@ -68,7 +68,7 @@ async fn main() -> Result<(), Box> { behaviour.add_address(&PeerId::from_str(peer)?, bootaddr.clone()); } - Swarm::new(transport, behaviour, local_peer_id) + Swarm::with_async_std_executor(transport, behaviour, local_peer_id) }; // Order Kademlia to search for a peer. diff --git a/examples/ipfs-private.rs b/examples/ipfs-private.rs index ddaa28896f10..1d89f5f9af0e 100644 --- a/examples/ipfs-private.rs +++ b/examples/ipfs-private.rs @@ -204,7 +204,7 @@ async fn main() -> Result<(), Box> { println!("Subscribing to {gossipsub_topic:?}"); behaviour.gossipsub.subscribe(&gossipsub_topic).unwrap(); - Swarm::new(transport, behaviour, local_peer_id) + Swarm::with_async_std_executor(transport, behaviour, local_peer_id) }; // Reach out to other nodes if specified diff --git a/examples/mdns-passive-discovery.rs b/examples/mdns-passive-discovery.rs index 8231d888dcc6..477c97663916 100644 --- a/examples/mdns-passive-discovery.rs +++ b/examples/mdns-passive-discovery.rs @@ -45,7 +45,7 @@ async fn main() -> Result<(), Box> { // Create a Swarm that establishes connections through the given transport. // Note that the MDNS behaviour itself will not actually inititiate any connections, // as it only uses UDP. - let mut swarm = Swarm::new(transport, behaviour, peer_id); + let mut swarm = Swarm::with_async_std_executor(transport, behaviour, peer_id); swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; loop { diff --git a/examples/ping.rs b/examples/ping.rs index a2da28341279..5deb8544ccb8 100644 --- a/examples/ping.rs +++ b/examples/ping.rs @@ -54,7 +54,7 @@ async fn main() -> Result<(), Box> { let transport = libp2p::development_transport(local_key).await?; - let mut swarm = Swarm::new(transport, Behaviour::default(), local_peer_id); + let mut swarm = Swarm::with_async_std_executor(transport, Behaviour::default(), local_peer_id); // Tell the swarm to listen on all interfaces and a random, OS-assigned // port. diff --git a/misc/metrics/examples/metrics/main.rs b/misc/metrics/examples/metrics/main.rs index 1f661cb9ecda..ab1e041bcc40 100644 --- a/misc/metrics/examples/metrics/main.rs +++ b/misc/metrics/examples/metrics/main.rs @@ -70,7 +70,7 @@ fn main() -> Result<(), Box> { let local_peer_id = PeerId::from(local_key.public()); info!("Local peer id: {:?}", local_peer_id); - let mut swarm = Swarm::new( + let mut swarm = Swarm::without_executor( block_on(libp2p::development_transport(local_key))?, Behaviour::default(), local_peer_id, diff --git a/misc/multistream-select/tests/transport.rs b/misc/multistream-select/tests/transport.rs index bf5dd247b401..a66d20eadd5d 100644 --- a/misc/multistream-select/tests/transport.rs +++ b/misc/multistream-select/tests/transport.rs @@ -61,8 +61,10 @@ fn transport_upgrade() { let listen_addr = Multiaddr::from(Protocol::Memory(random::())); - let mut dialer = Swarm::new(dialer_transport, dummy::Behaviour, dialer_id); - let mut listener = Swarm::new(listener_transport, dummy::Behaviour, listener_id); + let mut dialer = + Swarm::with_async_std_executor(dialer_transport, dummy::Behaviour, dialer_id); + let mut listener = + Swarm::with_async_std_executor(listener_transport, dummy::Behaviour, listener_id); listener.listen_on(listen_addr).unwrap(); let (addr_sender, addr_receiver) = oneshot::channel(); diff --git a/protocols/autonat/examples/autonat_client.rs b/protocols/autonat/examples/autonat_client.rs index bdd54ff406d1..1c897620db69 100644 --- a/protocols/autonat/examples/autonat_client.rs +++ b/protocols/autonat/examples/autonat_client.rs @@ -67,7 +67,7 @@ async fn main() -> Result<(), Box> { let behaviour = Behaviour::new(local_key.public()); - let mut swarm = Swarm::new(transport, behaviour, local_peer_id); + let mut swarm = Swarm::with_async_std_executor(transport, behaviour, local_peer_id); swarm.listen_on( Multiaddr::empty() .with(Protocol::Ip4(Ipv4Addr::UNSPECIFIED)) diff --git a/protocols/autonat/examples/autonat_server.rs b/protocols/autonat/examples/autonat_server.rs index 82a06b8b55d6..a3bcda1ee347 100644 --- a/protocols/autonat/examples/autonat_server.rs +++ b/protocols/autonat/examples/autonat_server.rs @@ -57,7 +57,7 @@ async fn main() -> Result<(), Box> { let behaviour = Behaviour::new(local_key.public()); - let mut swarm = Swarm::new(transport, behaviour, local_peer_id); + let mut swarm = Swarm::with_async_std_executor(transport, behaviour, local_peer_id); swarm.listen_on( Multiaddr::empty() .with(Protocol::Ip4(Ipv4Addr::UNSPECIFIED)) diff --git a/protocols/autonat/tests/test_client.rs b/protocols/autonat/tests/test_client.rs index 420bcf998298..5e23304e3f28 100644 --- a/protocols/autonat/tests/test_client.rs +++ b/protocols/autonat/tests/test_client.rs @@ -40,7 +40,7 @@ async fn init_swarm(config: Config) -> Swarm { let local_id = PeerId::from_public_key(&keypair.public()); let transport = development_transport(keypair).await.unwrap(); let behaviour = Behaviour::new(local_id, config); - Swarm::new(transport, behaviour, local_id) + Swarm::with_async_std_executor(transport, behaviour, local_id) } async fn spawn_server(kill: oneshot::Receiver<()>) -> (PeerId, Multiaddr) { diff --git a/protocols/autonat/tests/test_server.rs b/protocols/autonat/tests/test_server.rs index b45ae7ecafc9..3035a6d8d9e3 100644 --- a/protocols/autonat/tests/test_server.rs +++ b/protocols/autonat/tests/test_server.rs @@ -39,7 +39,7 @@ async fn init_swarm(config: Config) -> Swarm { let local_id = PeerId::from_public_key(&keypair.public()); let transport = development_transport(keypair).await.unwrap(); let behaviour = Behaviour::new(local_id, config); - Swarm::new(transport, behaviour, local_id) + Swarm::with_async_std_executor(transport, behaviour, local_id) } async fn init_server(config: Option) -> (Swarm, PeerId, Multiaddr) { diff --git a/protocols/dcutr/examples/dcutr.rs b/protocols/dcutr/examples/dcutr.rs index bc33eddcad1b..e12ff4ccfa24 100644 --- a/protocols/dcutr/examples/dcutr.rs +++ b/protocols/dcutr/examples/dcutr.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. use clap::Parser; -use futures::executor::block_on; +use futures::executor::{block_on, ThreadPool}; use futures::future::FutureExt; use futures::stream::StreamExt; use libp2p::core::multiaddr::{Multiaddr, Protocol}; @@ -155,9 +155,12 @@ fn main() -> Result<(), Box> { dcutr: dcutr::behaviour::Behaviour::new(), }; - let mut swarm = SwarmBuilder::new(transport, behaviour, local_peer_id) - .dial_concurrency_factor(10_u8.try_into().unwrap()) - .build(); + let mut swarm = match ThreadPool::new() { + Ok(tp) => SwarmBuilder::with_executor(transport, behaviour, local_peer_id, tp), + Err(_) => SwarmBuilder::without_executor(transport, behaviour, local_peer_id), + } + .dial_concurrency_factor(10_u8.try_into().unwrap()) + .build(); swarm .listen_on( diff --git a/protocols/dcutr/tests/lib.rs b/protocols/dcutr/tests/lib.rs index 0bc5df7e0eb2..8c687835d9f5 100644 --- a/protocols/dcutr/tests/lib.rs +++ b/protocols/dcutr/tests/lib.rs @@ -98,7 +98,7 @@ fn build_relay() -> Swarm { let transport = build_transport(MemoryTransport::default().boxed(), local_public_key); - Swarm::new( + Swarm::with_threadpool_executor( transport, relay::Relay::new( local_peer_id, @@ -122,7 +122,7 @@ fn build_client() -> Swarm { local_public_key, ); - Swarm::new( + Swarm::with_threadpool_executor( transport, Client { relay: behaviour, diff --git a/protocols/gossipsub/tests/smoke.rs b/protocols/gossipsub/tests/smoke.rs index 43ad944dccb2..db99179b07f5 100644 --- a/protocols/gossipsub/tests/smoke.rs +++ b/protocols/gossipsub/tests/smoke.rs @@ -171,7 +171,7 @@ fn build_node() -> (Multiaddr, Swarm) { .build() .unwrap(); let behaviour = Gossipsub::new(MessageAuthenticity::Author(peer_id), config).unwrap(); - let mut swarm = Swarm::new(transport, behaviour, peer_id); + let mut swarm = Swarm::without_executor(transport, behaviour, peer_id); let port = 1 + random::(); let mut addr: Multiaddr = Protocol::Memory(port).into(); diff --git a/protocols/identify/examples/identify.rs b/protocols/identify/examples/identify.rs index b02eb1c9ebf0..6f5fb2a14272 100644 --- a/protocols/identify/examples/identify.rs +++ b/protocols/identify/examples/identify.rs @@ -55,7 +55,7 @@ async fn main() -> Result<(), Box> { local_key.public(), )); - let mut swarm = Swarm::new(transport, behaviour, local_peer_id); + let mut swarm = Swarm::with_async_std_executor(transport, behaviour, local_peer_id); // Tell the swarm to listen on all interfaces and a random, OS-assigned // port. diff --git a/protocols/identify/src/behaviour.rs b/protocols/identify/src/behaviour.rs index 92279765d5a8..2215fde0ee83 100644 --- a/protocols/identify/src/behaviour.rs +++ b/protocols/identify/src/behaviour.rs @@ -584,7 +584,7 @@ mod tests { let protocol = Behaviour::new( Config::new("a".to_string(), pubkey.clone()).with_agent_version("b".to_string()), ); - let swarm = Swarm::new(transport, protocol, pubkey.to_peer_id()); + let swarm = Swarm::with_async_std_executor(transport, protocol, pubkey.to_peer_id()); (swarm, pubkey) }; @@ -593,7 +593,7 @@ mod tests { let protocol = Behaviour::new( Config::new("c".to_string(), pubkey.clone()).with_agent_version("d".to_string()), ); - let swarm = Swarm::new(transport, protocol, pubkey.to_peer_id()); + let swarm = Swarm::with_async_std_executor(transport, protocol, pubkey.to_peer_id()); (swarm, pubkey) }; @@ -661,7 +661,7 @@ mod tests { let (mut swarm1, pubkey1) = { let (pubkey, transport) = transport(); let protocol = Behaviour::new(Config::new("a".to_string(), pubkey.clone())); - let swarm = Swarm::new(transport, protocol, pubkey.to_peer_id()); + let swarm = Swarm::with_async_std_executor(transport, protocol, pubkey.to_peer_id()); (swarm, pubkey) }; @@ -670,7 +670,7 @@ mod tests { let protocol = Behaviour::new( Config::new("a".to_string(), pubkey.clone()).with_agent_version("b".to_string()), ); - let swarm = Swarm::new(transport, protocol, pubkey.to_peer_id()); + let swarm = Swarm::with_async_std_executor(transport, protocol, pubkey.to_peer_id()); (swarm, pubkey) }; @@ -742,7 +742,7 @@ mod tests { .with_initial_delay(Duration::from_secs(10)), ); - Swarm::new(transport, protocol, pubkey.to_peer_id()) + Swarm::with_async_std_executor(transport, protocol, pubkey.to_peer_id()) }; let mut swarm2 = { @@ -751,7 +751,7 @@ mod tests { Config::new("a".to_string(), pubkey.clone()).with_agent_version("b".to_string()), ); - Swarm::new(transport, protocol, pubkey.to_peer_id()) + Swarm::with_async_std_executor(transport, protocol, pubkey.to_peer_id()) }; let swarm1_peer_id = *swarm1.local_peer_id(); diff --git a/protocols/kad/src/behaviour/test.rs b/protocols/kad/src/behaviour/test.rs index c61ffaf158f2..47da12904bb1 100644 --- a/protocols/kad/src/behaviour/test.rs +++ b/protocols/kad/src/behaviour/test.rs @@ -66,7 +66,7 @@ fn build_node_with_config(cfg: KademliaConfig) -> (Multiaddr, TestSwarm) { let store = MemoryStore::new(local_id); let behaviour = Kademlia::with_config(local_id, store, cfg); - let mut swarm = Swarm::new(transport, behaviour, local_id); + let mut swarm = Swarm::without_executor(transport, behaviour, local_id); let address: Multiaddr = Protocol::Memory(random::()).into(); swarm.listen_on(address.clone()).unwrap(); diff --git a/protocols/mdns/tests/use-async-std.rs b/protocols/mdns/tests/use-async-std.rs index 2ddb36355be3..3774179fefa5 100644 --- a/protocols/mdns/tests/use-async-std.rs +++ b/protocols/mdns/tests/use-async-std.rs @@ -62,7 +62,7 @@ async fn create_swarm(config: MdnsConfig) -> Result, Box> let peer_id = PeerId::from(id_keys.public()); let transport = libp2p::development_transport(id_keys).await?; let behaviour = Mdns::new(config)?; - let mut swarm = Swarm::new(transport, behaviour, peer_id); + let mut swarm = Swarm::with_async_std_executor(transport, behaviour, peer_id); swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; Ok(swarm) } diff --git a/protocols/mdns/tests/use-tokio.rs b/protocols/mdns/tests/use-tokio.rs index 830557d3f009..dfd2d7a08c80 100644 --- a/protocols/mdns/tests/use-tokio.rs +++ b/protocols/mdns/tests/use-tokio.rs @@ -58,7 +58,7 @@ async fn create_swarm(config: MdnsConfig) -> Result, Box(1); @@ -127,10 +128,11 @@ fn max_failures() { .with_max_failures(max_failures.into()); let (peer1_id, trans) = mk_transport(muxer); - let mut swarm1 = Swarm::new(trans, Behaviour::new(cfg.clone()), peer1_id); + let mut swarm1 = + Swarm::with_async_std_executor(trans, Behaviour::new(cfg.clone()), peer1_id); let (peer2_id, trans) = mk_transport(muxer); - let mut swarm2 = Swarm::new(trans, Behaviour::new(cfg), peer2_id); + let mut swarm2 = Swarm::with_async_std_executor(trans, Behaviour::new(cfg), peer2_id); let (mut tx, mut rx) = mpsc::channel::(1); @@ -197,10 +199,10 @@ fn max_failures() { #[test] fn unsupported_doesnt_fail() { let (peer1_id, trans) = mk_transport(MuxerChoice::Mplex); - let mut swarm1 = Swarm::new(trans, keep_alive::Behaviour, peer1_id); + let mut swarm1 = Swarm::with_async_std_executor(trans, keep_alive::Behaviour, peer1_id); let (peer2_id, trans) = mk_transport(MuxerChoice::Mplex); - let mut swarm2 = Swarm::new(trans, Behaviour::default(), peer2_id); + let mut swarm2 = Swarm::with_async_std_executor(trans, Behaviour::default(), peer2_id); let (mut tx, mut rx) = mpsc::channel::(1); diff --git a/protocols/relay/examples/relay_v2.rs b/protocols/relay/examples/relay_v2.rs index 102637ca2fcc..137132e6956c 100644 --- a/protocols/relay/examples/relay_v2.rs +++ b/protocols/relay/examples/relay_v2.rs @@ -66,7 +66,7 @@ fn main() -> Result<(), Box> { )), }; - let mut swarm = Swarm::new(transport, behaviour, local_peer_id); + let mut swarm = Swarm::without_executor(transport, behaviour, local_peer_id); // Listen on all interfaces let listen_addr = Multiaddr::empty() diff --git a/protocols/relay/tests/v2.rs b/protocols/relay/tests/v2.rs index 34e6e8ee6ceb..b31d6866228f 100644 --- a/protocols/relay/tests/v2.rs +++ b/protocols/relay/tests/v2.rs @@ -291,7 +291,7 @@ fn build_relay() -> Swarm { let transport = upgrade_transport(MemoryTransport::default().boxed(), local_public_key); - Swarm::new( + Swarm::with_threadpool_executor( transport, Relay { ping: ping::Behaviour::new(ping::Config::new()), @@ -318,7 +318,7 @@ fn build_client() -> Swarm { local_public_key, ); - Swarm::new( + Swarm::with_threadpool_executor( transport, Client { ping: ping::Behaviour::new(ping::Config::new()), diff --git a/protocols/rendezvous/examples/discover.rs b/protocols/rendezvous/examples/discover.rs index 0d1a6daca58e..d400301bdbe2 100644 --- a/protocols/rendezvous/examples/discover.rs +++ b/protocols/rendezvous/examples/discover.rs @@ -25,7 +25,7 @@ use libp2p::multiaddr::Protocol; use libp2p::ping; use libp2p::swarm::{keep_alive, SwarmEvent}; use libp2p::Swarm; -use libp2p::{development_transport, rendezvous, Multiaddr}; +use libp2p::{rendezvous, tokio_development_transport, Multiaddr}; use std::time::Duration; use void::Void; @@ -41,8 +41,8 @@ async fn main() { .parse() .unwrap(); - let mut swarm = Swarm::new( - development_transport(identity.clone()).await.unwrap(), + let mut swarm = Swarm::with_tokio_executor( + tokio_development_transport(identity.clone()).unwrap(), MyBehaviour { rendezvous: rendezvous::client::Behaviour::new(identity.clone()), ping: ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(1))), diff --git a/protocols/rendezvous/examples/register.rs b/protocols/rendezvous/examples/register.rs index 471ff739426d..903a79610d84 100644 --- a/protocols/rendezvous/examples/register.rs +++ b/protocols/rendezvous/examples/register.rs @@ -24,7 +24,7 @@ use libp2p::core::PeerId; use libp2p::ping; use libp2p::swarm::{NetworkBehaviour, Swarm, SwarmEvent}; use libp2p::Multiaddr; -use libp2p::{development_transport, rendezvous}; +use libp2p::{rendezvous, tokio_development_transport}; use libp2p_swarm::AddressScore; use std::time::Duration; @@ -39,8 +39,8 @@ async fn main() { let identity = identity::Keypair::generate_ed25519(); - let mut swarm = Swarm::new( - development_transport(identity.clone()).await.unwrap(), + let mut swarm = Swarm::with_tokio_executor( + tokio_development_transport(identity.clone()).unwrap(), MyBehaviour { rendezvous: rendezvous::client::Behaviour::new(identity.clone()), ping: ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(1))), diff --git a/protocols/rendezvous/examples/register_with_identify.rs b/protocols/rendezvous/examples/register_with_identify.rs index bf30fa906f73..06844be12818 100644 --- a/protocols/rendezvous/examples/register_with_identify.rs +++ b/protocols/rendezvous/examples/register_with_identify.rs @@ -25,7 +25,7 @@ use libp2p::identify; use libp2p::ping; use libp2p::swarm::{keep_alive, NetworkBehaviour, Swarm, SwarmEvent}; use libp2p::Multiaddr; -use libp2p::{development_transport, rendezvous}; +use libp2p::{rendezvous, tokio_development_transport}; use std::time::Duration; use void::Void; @@ -40,8 +40,8 @@ async fn main() { let identity = identity::Keypair::generate_ed25519(); - let mut swarm = Swarm::new( - development_transport(identity.clone()).await.unwrap(), + let mut swarm = Swarm::with_tokio_executor( + tokio_development_transport(identity.clone()).unwrap(), MyBehaviour { identify: identify::Behaviour::new(identify::Config::new( "rendezvous-example/1.0.0".to_string(), diff --git a/protocols/rendezvous/examples/rendezvous_point.rs b/protocols/rendezvous/examples/rendezvous_point.rs index c8688b98573e..1e98f73d6dad 100644 --- a/protocols/rendezvous/examples/rendezvous_point.rs +++ b/protocols/rendezvous/examples/rendezvous_point.rs @@ -24,7 +24,7 @@ use libp2p::core::PeerId; use libp2p::identify; use libp2p::ping; use libp2p::swarm::{keep_alive, NetworkBehaviour, Swarm, SwarmEvent}; -use libp2p::{development_transport, rendezvous}; +use libp2p::{rendezvous, tokio_development_transport}; use void::Void; /// Examples for the rendezvous protocol: @@ -43,8 +43,8 @@ async fn main() { let key = identity::ed25519::SecretKey::from_bytes(bytes).expect("we always pass 32 bytes"); let identity = identity::Keypair::Ed25519(key.into()); - let mut swarm = Swarm::new( - development_transport(identity.clone()).await.unwrap(), + let mut swarm = Swarm::with_tokio_executor( + tokio_development_transport(identity.clone()).unwrap(), MyBehaviour { identify: identify::Behaviour::new(identify::Config::new( "rendezvous-example/1.0.0".to_string(), diff --git a/protocols/rendezvous/tests/harness.rs b/protocols/rendezvous/tests/harness.rs index cad3a087afb2..523f34c76db0 100644 --- a/protocols/rendezvous/tests/harness.rs +++ b/protocols/rendezvous/tests/harness.rs @@ -28,7 +28,7 @@ use libp2p::core::upgrade::SelectUpgrade; use libp2p::core::{identity, Multiaddr, PeerId, Transport}; use libp2p::mplex::MplexConfig; use libp2p::noise::NoiseAuthenticated; -use libp2p::swarm::{AddressScore, NetworkBehaviour, Swarm, SwarmBuilder, SwarmEvent}; +use libp2p::swarm::{AddressScore, NetworkBehaviour, Swarm, SwarmEvent}; use libp2p::yamux::YamuxConfig; use std::fmt::Debug; use std::time::Duration; @@ -53,11 +53,7 @@ where .timeout(Duration::from_secs(5)) .boxed(); - SwarmBuilder::new(transport, behaviour_fn(peer_id, identity), peer_id) - .executor(Box::new(|future| { - let _ = tokio::spawn(future); - })) - .build() + Swarm::with_tokio_executor(transport, behaviour_fn(peer_id, identity), peer_id) } fn get_rand_memory_address() -> Multiaddr { diff --git a/protocols/request-response/tests/ping.rs b/protocols/request-response/tests/ping.rs index 77f8efd8fec5..e97b725d4aff 100644 --- a/protocols/request-response/tests/ping.rs +++ b/protocols/request-response/tests/ping.rs @@ -48,7 +48,7 @@ fn is_response_outbound() { let (peer1_id, trans) = mk_transport(); let ping_proto1 = RequestResponse::new(PingCodec(), protocols, cfg); - let mut swarm1 = Swarm::new(trans, ping_proto1, peer1_id); + let mut swarm1 = Swarm::without_executor(trans, ping_proto1, peer1_id); let request_id1 = swarm1 .behaviour_mut() @@ -87,11 +87,11 @@ fn ping_protocol() { let (peer1_id, trans) = mk_transport(); let ping_proto1 = RequestResponse::new(PingCodec(), protocols.clone(), cfg.clone()); - let mut swarm1 = Swarm::new(trans, ping_proto1, peer1_id); + let mut swarm1 = Swarm::without_executor(trans, ping_proto1, peer1_id); let (peer2_id, trans) = mk_transport(); let ping_proto2 = RequestResponse::new(PingCodec(), protocols, cfg); - let mut swarm2 = Swarm::new(trans, ping_proto2, peer2_id); + let mut swarm2 = Swarm::without_executor(trans, ping_proto2, peer2_id); let (mut tx, mut rx) = mpsc::channel::(1); @@ -176,11 +176,11 @@ fn emits_inbound_connection_closed_failure() { let (peer1_id, trans) = mk_transport(); let ping_proto1 = RequestResponse::new(PingCodec(), protocols.clone(), cfg.clone()); - let mut swarm1 = Swarm::new(trans, ping_proto1, peer1_id); + let mut swarm1 = Swarm::without_executor(trans, ping_proto1, peer1_id); let (peer2_id, trans) = mk_transport(); let ping_proto2 = RequestResponse::new(PingCodec(), protocols, cfg); - let mut swarm2 = Swarm::new(trans, ping_proto2, peer2_id); + let mut swarm2 = Swarm::without_executor(trans, ping_proto2, peer2_id); let addr = "/ip4/127.0.0.1/tcp/0".parse().unwrap(); swarm1.listen_on(addr).unwrap(); @@ -245,11 +245,11 @@ fn emits_inbound_connection_closed_if_channel_is_dropped() { let (peer1_id, trans) = mk_transport(); let ping_proto1 = RequestResponse::new(PingCodec(), protocols.clone(), cfg.clone()); - let mut swarm1 = Swarm::new(trans, ping_proto1, peer1_id); + let mut swarm1 = Swarm::without_executor(trans, ping_proto1, peer1_id); let (peer2_id, trans) = mk_transport(); let ping_proto2 = RequestResponse::new(PingCodec(), protocols, cfg); - let mut swarm2 = Swarm::new(trans, ping_proto2, peer2_id); + let mut swarm2 = Swarm::without_executor(trans, ping_proto2, peer2_id); let addr = "/ip4/127.0.0.1/tcp/0".parse().unwrap(); swarm1.listen_on(addr).unwrap(); diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index f66828db64ad..f04da3da77b2 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -5,9 +5,70 @@ - Export `NetworkBehaviour` derive as `libp2p_swarm::NetworkBehaviour`. This follows the convention of other popular libraries. `serde` for example exports the `Serialize` trait and macro as `serde::Serialize`. See [PR 3055]. + - Feature-gate `NetworkBehaviour` macro behind `macros` feature flag. See [PR 3055]. +- Make executor in Swarm constructor explicit. See [PR 3097]. + + Supported executors: + - Tokio + + Previously + ```rust + let swarm = SwarmBuilder::new(transport, behaviour, peer_id) + .executor(Box::new(|fut| { + tokio::spawn(fut); + })) + .build(); + ``` + Now + ```rust + let swarm = Swarm::with_tokio_executor(transport, behaviour, peer_id); + ``` + - Async Std + + Previously + ```rust + let swarm = SwarmBuilder::new(transport, behaviour, peer_id) + .executor(Box::new(|fut| { + async_std::task::spawn(fut); + })) + .build(); + ``` + Now + ```rust + let swarm = Swarm::with_async_std_executor(transport, behaviour, peer_id); + ``` + - ThreadPool (see [Issue 3107]) + + In most cases ThreadPool can be replaced by executors or spawning on the local task. + + Previously + ```rust + let swarm = Swarm::new(transport, behaviour, peer_id); + ``` + + Now + ```rust + let swarm = Swarm::with_threadpool_executor(transport, behaviour, peer_id); + ``` + - Without + + Spawns the tasks on the current task, this may result in bad performance so try to use an executor where possible. Previously this was just a fallback when no executor was specified and constructing a `ThreadPool` failed. + + New + ```rust + let swarm = Swarm::without_executor(transport, behaviour, peer_id); + ``` + + Deprecated APIs: + - `Swarm::new` + - `SwarmBuilder::new` + - `SwarmBuilder::executor` + [PR 3055]: https://github.com/libp2p/rust-libp2p/pull/3055 +[PR 3097]: https://github.com/libp2p/rust-libp2p/pull/3097 +[Issue 3107]: https://github.com/libp2p/rust-libp2p/issues/3107 # 0.40.1 diff --git a/swarm/Cargo.toml b/swarm/Cargo.toml index 815013bc34f5..a6f9a91a5ca1 100644 --- a/swarm/Cargo.toml +++ b/swarm/Cargo.toml @@ -24,9 +24,13 @@ rand = "0.8" smallvec = "1.6.1" thiserror = "1.0" void = "1" +tokio = { version = "1.15", features = ["rt"], optional = true } +async-std = { version = "1.6.2", optional = true } [features] macros = ["dep:libp2p-swarm-derive"] +tokio = ["dep:tokio"] +async-std = ["dep:async-std"] [dev-dependencies] async-std = { version = "1.6.2", features = ["attributes"] } diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index cc6a9bbd8161..8729b2e36e15 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -54,10 +54,34 @@ use void::Void; mod concurrent_dial; mod task; +enum ExecSwitch { + Executor(Box), + LocalSpawn(FuturesUnordered + Send>>>), +} + +impl ExecSwitch { + fn advance_local(&mut self, cx: &mut Context) { + match self { + ExecSwitch::Executor(_) => {} + ExecSwitch::LocalSpawn(local) => { + while let Poll::Ready(Some(())) = local.poll_next_unpin(cx) {} + } + } + } + + fn spawn(&mut self, task: BoxFuture<'static, ()>) { + match self { + Self::Executor(executor) => executor.exec(task), + Self::LocalSpawn(local) => local.push(task), + } + } +} + /// A connection `Pool` manages a set of connections for each peer. -pub struct Pool +pub struct Pool where TTrans: Transport, + THandler: IntoConnectionHandler, { local_id: PeerId, @@ -93,14 +117,9 @@ where /// See [`Connection::max_negotiating_inbound_streams`]. max_negotiating_inbound_streams: usize, - /// The executor to use for running the background tasks. If `None`, - /// the tasks are kept in `local_spawns` instead and polled on the - /// current thread when the [`Pool`] is polled for new events. - executor: Option>, - - /// If no `executor` is configured, tasks are kept in this set and - /// polled on the current thread when the [`Pool`] is polled for new events. - local_spawns: FuturesUnordered + Send>>>, + /// The executor to use for running connection tasks. Can either be a global executor + /// or a local queue. + executor: ExecSwitch, /// Sender distributed to pending tasks for reporting events back /// to the pool. @@ -299,6 +318,10 @@ where mpsc::channel(config.task_event_buffer_size); let (established_connection_events_tx, established_connection_events_rx) = mpsc::channel(config.task_event_buffer_size); + let executor = match config.executor { + Some(exec) => ExecSwitch::Executor(exec), + None => ExecSwitch::LocalSpawn(Default::default()), + }; Pool { local_id, counters: ConnectionCounters::new(limits), @@ -309,8 +332,7 @@ where dial_concurrency_factor: config.dial_concurrency_factor, substream_upgrade_protocol_override: config.substream_upgrade_protocol_override, max_negotiating_inbound_streams: config.max_negotiating_inbound_streams, - executor: config.executor, - local_spawns: FuturesUnordered::new(), + executor, pending_connection_events_tx, pending_connection_events_rx, established_connection_events_tx, @@ -399,11 +421,7 @@ where } fn spawn(&mut self, task: BoxFuture<'static, ()>) { - if let Some(executor) = &mut self.executor { - executor.exec(task); - } else { - self.local_spawns.push(task); - } + self.executor.spawn(task) } } @@ -820,8 +838,7 @@ where } } - // Advance the tasks in `local_spawns`. - while let Poll::Ready(Some(())) = self.local_spawns.poll_next_unpin(cx) {} + self.executor.advance_local(cx); Poll::Pending } @@ -1073,34 +1090,21 @@ pub struct PoolConfig { max_negotiating_inbound_streams: usize, } -impl Default for PoolConfig { - fn default() -> Self { - PoolConfig { - executor: None, - task_event_buffer_size: 32, - task_command_buffer_size: 7, - // Set to a default of 8 based on frequency of dialer connections +impl PoolConfig { + pub fn new(executor: Option>) -> Self { + Self { + executor, + task_command_buffer_size: 32, + task_event_buffer_size: 7, dial_concurrency_factor: NonZeroU8::new(8).expect("8 > 0"), substream_upgrade_protocol_override: None, max_negotiating_inbound_streams: 128, } } -} -impl PoolConfig { /// Configures the executor to use for spawning connection background tasks. - pub fn with_executor(mut self, e: Box) -> Self { - self.executor = Some(e); - self - } - - /// Configures the executor to use for spawning connection background tasks, - /// only if no executor has already been configured. - pub fn or_else_with_executor(mut self, f: F) -> Self - where - F: FnOnce() -> Option>, - { - self.executor = self.executor.or_else(f); + pub fn with_executor(mut self, executor: Box) -> Self { + self.executor = Some(executor); self } @@ -1174,13 +1178,4 @@ mod tests { impl Executor for Dummy { fn exec(&self, _: Pin + Send>>) {} } - - #[test] - fn set_executor() { - PoolConfig::default() - .with_executor(Box::new(Dummy)) - .with_executor(Box::new(|f| { - async_std::task::spawn(f); - })); - } } diff --git a/swarm/src/executor.rs b/swarm/src/executor.rs new file mode 100644 index 000000000000..7799d141d497 --- /dev/null +++ b/swarm/src/executor.rs @@ -0,0 +1,48 @@ +use futures::executor::ThreadPool; +use std::{future::Future, pin::Pin}; + +/// Implemented on objects that can run a `Future` in the background. +/// +/// > **Note**: While it may be tempting to implement this trait on types such as +/// > [`futures::stream::FuturesUnordered`], please note that passing an `Executor` is +/// > optional, and that `FuturesUnordered` (or a similar struct) will automatically +/// > be used as fallback by libp2p. The `Executor` trait should therefore only be +/// > about running `Future`s on a separate task. +pub trait Executor { + /// Run the given future in the background until it ends. + fn exec(&self, future: Pin + Send>>); +} + +impl + Send>>)> Executor for F { + fn exec(&self, f: Pin + Send>>) { + self(f) + } +} + +impl Executor for ThreadPool { + fn exec(&self, future: Pin + Send>>) { + self.spawn_ok(future) + } +} + +#[cfg(feature = "tokio")] +#[derive(Default, Debug, Clone, Copy)] +pub(crate) struct TokioExecutor; + +#[cfg(feature = "tokio")] +impl Executor for TokioExecutor { + fn exec(&self, future: Pin + Send>>) { + let _ = tokio::spawn(future); + } +} + +#[cfg(feature = "async-std")] +#[derive(Default, Debug, Clone, Copy)] +pub(crate) struct AsyncStdExecutor; + +#[cfg(feature = "async-std")] +impl Executor for AsyncStdExecutor { + fn exec(&self, future: Pin + Send>>) { + let _ = async_std::task::spawn(future); + } +} diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 6927cfa068ef..6fc1b8707c36 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -64,6 +64,7 @@ mod upgrade; pub mod behaviour; pub mod dial_opts; pub mod dummy; +mod executor; pub mod handler; pub mod keep_alive; @@ -94,6 +95,7 @@ pub use connection::{ ConnectionError, ConnectionLimit, PendingConnectionError, PendingInboundConnectionError, PendingOutboundConnectionError, }; +pub use executor::Executor; pub use handler::{ ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerSelect, ConnectionHandlerUpgrErr, IntoConnectionHandler, IntoConnectionHandlerSelect, KeepAlive, OneShotHandler, @@ -117,7 +119,7 @@ use libp2p_core::{ muxing::StreamMuxerBox, transport::{self, ListenerId, TransportError, TransportEvent}, upgrade::ProtocolName, - Endpoint, Executor, Multiaddr, Negotiated, PeerId, Transport, + Endpoint, Multiaddr, Negotiated, PeerId, Transport, }; use registry::{AddressIntoIter, Addresses}; use smallvec::SmallVec; @@ -328,12 +330,89 @@ where TBehaviour: NetworkBehaviour, { /// Builds a new `Swarm`. + #[deprecated( + since = "0.41.0", + note = "This constructor is considered ambiguous regarding the executor. Use one of the new, executor-specific constructors or `Swarm::with_threadpool_executor` for the same behaviour." + )] pub fn new( transport: transport::Boxed<(PeerId, StreamMuxerBox)>, behaviour: TBehaviour, local_peer_id: PeerId, ) -> Self { - SwarmBuilder::new(transport, behaviour, local_peer_id).build() + Self::with_threadpool_executor(transport, behaviour, local_peer_id) + } + + /// Builds a new `Swarm` with a provided executor. + pub fn with_executor( + transport: transport::Boxed<(PeerId, StreamMuxerBox)>, + behaviour: TBehaviour, + local_peer_id: PeerId, + executor: impl Executor + Send + 'static, + ) -> Self { + SwarmBuilder::with_executor(transport, behaviour, local_peer_id, executor).build() + } + + /// Builds a new `Swarm` with a tokio executor. + #[cfg(feature = "tokio")] + pub fn with_tokio_executor( + transport: transport::Boxed<(PeerId, StreamMuxerBox)>, + behaviour: TBehaviour, + local_peer_id: PeerId, + ) -> Self { + Self::with_executor( + transport, + behaviour, + local_peer_id, + crate::executor::TokioExecutor, + ) + } + + /// Builds a new `Swarm` with an async-std executor. + #[cfg(feature = "async-std")] + pub fn with_async_std_executor( + transport: transport::Boxed<(PeerId, StreamMuxerBox)>, + behaviour: TBehaviour, + local_peer_id: PeerId, + ) -> Self { + Self::with_executor( + transport, + behaviour, + local_peer_id, + crate::executor::AsyncStdExecutor, + ) + } + + /// Builds a new `Swarm` with a threadpool executor. + pub fn with_threadpool_executor( + transport: transport::Boxed<(PeerId, StreamMuxerBox)>, + behaviour: TBehaviour, + local_peer_id: PeerId, + ) -> Self { + let builder = match ThreadPoolBuilder::new() + .name_prefix("libp2p-swarm-task-") + .create() + { + Ok(tp) => SwarmBuilder::with_executor(transport, behaviour, local_peer_id, tp), + Err(err) => { + log::warn!("Failed to create executor thread pool: {:?}", err); + SwarmBuilder::without_executor(transport, behaviour, local_peer_id) + } + }; + builder.build() + } + + /// Builds a new `Swarm` without an executor, instead using the current task. + /// + /// ## ⚠️ Performance warning + /// All connections will be polled on the current task, thus quite bad performance + /// characteristics should be expected. Whenever possible use an executor and + /// [`Swarm::with_executor`]. + pub fn without_executor( + transport: transport::Boxed<(PeerId, StreamMuxerBox)>, + behaviour: TBehaviour, + local_peer_id: PeerId, + ) -> Self { + SwarmBuilder::without_executor(transport, behaviour, local_peer_id).build() } /// Returns information about the connections underlying the [`Swarm`]. @@ -1294,16 +1373,67 @@ where /// Creates a new `SwarmBuilder` from the given transport, behaviour and /// local peer ID. The `Swarm` with its underlying `Network` is obtained /// via [`SwarmBuilder::build`]. + #[deprecated( + since = "0.41.0", + note = "Use `SwarmBuilder::with_executor` or `SwarmBuilder::without_executor` instead." + )] pub fn new( transport: transport::Boxed<(PeerId, StreamMuxerBox)>, behaviour: TBehaviour, local_peer_id: PeerId, ) -> Self { + let executor: Option> = match ThreadPoolBuilder::new() + .name_prefix("libp2p-swarm-task-") + .create() + .ok() + { + Some(tp) => Some(Box::new(tp)), + None => None, + }; SwarmBuilder { local_peer_id, transport, behaviour, - pool_config: Default::default(), + pool_config: PoolConfig::new(executor), + connection_limits: Default::default(), + } + } + + /// Creates a new [`SwarmBuilder`] from the given transport, behaviour, local peer ID and + /// executor. The `Swarm` with its underlying `Network` is obtained via + /// [`SwarmBuilder::build`]. + pub fn with_executor( + transport: transport::Boxed<(PeerId, StreamMuxerBox)>, + behaviour: TBehaviour, + local_peer_id: PeerId, + executor: impl Executor + Send + 'static, + ) -> Self { + Self { + local_peer_id, + transport, + behaviour, + pool_config: PoolConfig::new(Some(Box::new(executor))), + connection_limits: Default::default(), + } + } + + /// Creates a new [`SwarmBuilder`] from the given transport, behaviour and local peer ID. The + /// `Swarm` with its underlying `Network` is obtained via [`SwarmBuilder::build`]. + /// + /// ## ⚠️ Performance warning + /// All connections will be polled on the current task, thus quite bad performance + /// characteristics should be expected. Whenever possible use an executor and + /// [`SwarmBuilder::with_executor`]. + pub fn without_executor( + transport: transport::Boxed<(PeerId, StreamMuxerBox)>, + behaviour: TBehaviour, + local_peer_id: PeerId, + ) -> Self { + Self { + local_peer_id, + transport, + behaviour, + pool_config: PoolConfig::new(None), connection_limits: Default::default(), } } @@ -1313,8 +1443,9 @@ where /// By default, unless another executor has been configured, /// [`SwarmBuilder::build`] will try to set up a /// [`ThreadPool`](futures::executor::ThreadPool). - pub fn executor(mut self, e: Box) -> Self { - self.pool_config = self.pool_config.with_executor(e); + #[deprecated(since = "0.41.0", note = "Use `SwarmBuilder::with_executor` instead.")] + pub fn executor(mut self, executor: Box) -> Self { + self.pool_config = self.pool_config.with_executor(executor); self } @@ -1412,25 +1543,10 @@ where .map(|info| info.protocol_name().to_vec()) .collect(); - // If no executor has been explicitly configured, try to set up a thread pool. - let pool_config = - self.pool_config.or_else_with_executor(|| { - match ThreadPoolBuilder::new() - .name_prefix("libp2p-swarm-task-") - .create() - { - Ok(tp) => Some(Box::new(move |f| tp.spawn_ok(f))), - Err(err) => { - log::warn!("Failed to create executor thread pool: {:?}", err); - None - } - } - }); - Swarm { local_peer_id: self.local_peer_id, transport: self.transport, - pool: Pool::new(self.local_peer_id, pool_config, self.connection_limits), + pool: Pool::new(self.local_peer_id, self.pool_config, self.connection_limits), behaviour: self.behaviour, supported_protocols, listened_addrs: HashMap::new(), @@ -1586,6 +1702,7 @@ mod tests { use super::*; use crate::test::{CallTraceBehaviour, MockBehaviour}; use futures::executor::block_on; + use futures::executor::ThreadPool; use futures::future::poll_fn; use futures::future::Either; use futures::{executor, future, ready}; @@ -1622,7 +1739,12 @@ mod tests { .multiplex(yamux::YamuxConfig::default()) .boxed(); let behaviour = CallTraceBehaviour::new(MockBehaviour::new(handler_proto)); - SwarmBuilder::new(transport, behaviour, local_public_key.into()) + match ThreadPool::new().ok() { + Some(tp) => { + SwarmBuilder::with_executor(transport, behaviour, local_public_key.into(), tp) + } + None => SwarmBuilder::without_executor(transport, behaviour, local_public_key.into()), + } } fn swarms_connected( diff --git a/transports/tls/tests/smoke.rs b/transports/tls/tests/smoke.rs index 1def8717e01f..d30753b8fb57 100644 --- a/transports/tls/tests/smoke.rs +++ b/transports/tls/tests/smoke.rs @@ -65,7 +65,7 @@ fn make_swarm() -> Swarm { .multiplex(libp2p::yamux::YamuxConfig::default()) .boxed(); - Swarm::new( + Swarm::without_executor( transport, keep_alive::Behaviour, identity.public().to_peer_id(), From 69efe63229e120b8d251cdbbbaad23cdcc0c21a1 Mon Sep 17 00:00:00 2001 From: John Turpish <97759690+John-LittleBearLabs@users.noreply.github.com> Date: Tue, 15 Nov 2022 15:45:14 -0500 Subject: [PATCH 8/9] misc/metrics: Add `protocols` label to address-specific metrics (#2982) Previously, we would only track the metrics like the number of open connections. With this patch, we extend these metrics with a `protocols` label that contains a "protocol stack". A protocol stack is a multi-address with all variable parts removed. For example, `/ip4/127.0.0.1/tcp/1234` turns into `/ip4/tcp`. Resolves https://github.com/libp2p/rust-libp2p/issues/2758. --- misc/metrics/CHANGELOG.md | 4 ++ misc/metrics/examples/metrics/main.rs | 25 +++++++- misc/metrics/src/identify.rs | 25 +++++++- misc/metrics/src/lib.rs | 1 + misc/metrics/src/protocol_stack.rs | 27 +++++++++ misc/metrics/src/swarm.rs | 82 ++++++++++++++++++++------- 6 files changed, 138 insertions(+), 26 deletions(-) create mode 100644 misc/metrics/src/protocol_stack.rs diff --git a/misc/metrics/CHANGELOG.md b/misc/metrics/CHANGELOG.md index c8eecc36e6fb..58460094ae88 100644 --- a/misc/metrics/CHANGELOG.md +++ b/misc/metrics/CHANGELOG.md @@ -16,6 +16,10 @@ - Update to `libp2p-gossipsub` `v0.43.0`. +- Add `protocol_stack` metrics. See [PR 2982]. + +[PR 2982]: https://github.com/libp2p/rust-libp2p/pull/2982/ + # 0.10.0 - Update to `libp2p-swarm` `v0.40.0`. diff --git a/misc/metrics/examples/metrics/main.rs b/misc/metrics/examples/metrics/main.rs index ab1e041bcc40..687dba3f3327 100644 --- a/misc/metrics/examples/metrics/main.rs +++ b/misc/metrics/examples/metrics/main.rs @@ -54,7 +54,7 @@ use futures::stream::StreamExt; use libp2p::core::Multiaddr; use libp2p::metrics::{Metrics, Recorder}; use libp2p::swarm::{NetworkBehaviour, SwarmEvent}; -use libp2p::{identity, ping, PeerId, Swarm}; +use libp2p::{identify, identity, ping, PeerId, Swarm}; use libp2p_swarm::keep_alive; use log::info; use prometheus_client::registry::Registry; @@ -68,11 +68,12 @@ fn main() -> Result<(), Box> { let local_key = identity::Keypair::generate_ed25519(); let local_peer_id = PeerId::from(local_key.public()); + let local_pub_key = local_key.public(); info!("Local peer id: {:?}", local_peer_id); let mut swarm = Swarm::without_executor( block_on(libp2p::development_transport(local_key))?, - Behaviour::default(), + Behaviour::new(local_pub_key), local_peer_id, ); @@ -95,6 +96,10 @@ fn main() -> Result<(), Box> { info!("{:?}", ping_event); metrics.record(&ping_event); } + SwarmEvent::Behaviour(BehaviourEvent::Identify(identify_event)) => { + info!("{:?}", identify_event); + metrics.record(&identify_event); + } swarm_event => { info!("{:?}", swarm_event); metrics.record(&swarm_event); @@ -109,8 +114,22 @@ fn main() -> Result<(), Box> { /// /// For illustrative purposes, this includes the [`keep_alive::Behaviour`]) behaviour so the ping actually happen /// and can be observed via the metrics. -#[derive(NetworkBehaviour, Default)] +#[derive(NetworkBehaviour)] struct Behaviour { + identify: identify::Behaviour, keep_alive: keep_alive::Behaviour, ping: ping::Behaviour, } + +impl Behaviour { + fn new(local_pub_key: libp2p::identity::PublicKey) -> Self { + Self { + ping: ping::Behaviour::default(), + identify: identify::Behaviour::new(identify::Config::new( + "/ipfs/0.1.0".into(), + local_pub_key, + )), + keep_alive: keep_alive::Behaviour::default(), + } + } +} diff --git a/misc/metrics/src/identify.rs b/misc/metrics/src/identify.rs index 8f91521713f4..688c67a61909 100644 --- a/misc/metrics/src/identify.rs +++ b/misc/metrics/src/identify.rs @@ -18,9 +18,11 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use crate::protocol_stack; use libp2p_core::PeerId; -use prometheus_client::encoding::text::{EncodeMetric, Encoder}; +use prometheus_client::encoding::text::{Encode, EncodeMetric, Encoder}; use prometheus_client::metrics::counter::Counter; +use prometheus_client::metrics::family::Family; use prometheus_client::metrics::histogram::{exponential_buckets, Histogram}; use prometheus_client::metrics::MetricType; use prometheus_client::registry::Registry; @@ -36,6 +38,7 @@ pub struct Metrics { received_info_listen_addrs: Histogram, received_info_protocols: Histogram, sent: Counter, + listen_addresses: Family, } impl Metrics { @@ -100,6 +103,13 @@ impl Metrics { Box::new(sent.clone()), ); + let listen_addresses = Family::default(); + sub_registry.register( + "listen_addresses", + "Number of listen addresses for remote peer per protocol stack", + Box::new(listen_addresses.clone()), + ); + Self { protocols, error, @@ -108,6 +118,7 @@ impl Metrics { received_info_listen_addrs, received_info_protocols, sent, + listen_addresses, } } } @@ -167,6 +178,13 @@ impl super::Recorder for Metrics { .observe(info.protocols.len() as f64); self.received_info_listen_addrs .observe(info.listen_addrs.len() as f64); + for listen_addr in &info.listen_addrs { + self.listen_addresses + .get_or_create(&AddressLabels { + protocols: protocol_stack::as_string(listen_addr), + }) + .inc(); + } } libp2p_identify::Event::Sent { .. } => { self.sent.inc(); @@ -190,6 +208,11 @@ impl super::Recorder>>>, diff --git a/misc/metrics/src/lib.rs b/misc/metrics/src/lib.rs index d42f6b3ffe04..351887260dff 100644 --- a/misc/metrics/src/lib.rs +++ b/misc/metrics/src/lib.rs @@ -38,6 +38,7 @@ mod identify; mod kad; #[cfg(feature = "ping")] mod ping; +mod protocol_stack; #[cfg(feature = "relay")] mod relay; mod swarm; diff --git a/misc/metrics/src/protocol_stack.rs b/misc/metrics/src/protocol_stack.rs new file mode 100644 index 000000000000..1715b51f034a --- /dev/null +++ b/misc/metrics/src/protocol_stack.rs @@ -0,0 +1,27 @@ +use libp2p_core::multiaddr::Multiaddr; + +pub fn as_string(ma: &Multiaddr) -> String { + let len = ma + .protocol_stack() + .fold(0, |acc, proto| acc + proto.len() + 1); + let mut protocols = String::with_capacity(len); + for proto_tag in ma.protocol_stack() { + protocols.push('/'); + protocols.push_str(proto_tag); + } + protocols +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn ip6_tcp_wss_p2p() { + let ma = Multiaddr::try_from("/ip6/2001:8a0:7ac5:4201:3ac9:86ff:fe31:7095/tcp/8000/wss/p2p/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC").expect("testbad"); + + let protocol_stack = as_string(&ma); + + assert_eq!(protocol_stack, "/ip6/tcp/wss/p2p"); + } +} diff --git a/misc/metrics/src/swarm.rs b/misc/metrics/src/swarm.rs index e9c5a0493ce6..c4fa8712d142 100644 --- a/misc/metrics/src/swarm.rs +++ b/misc/metrics/src/swarm.rs @@ -18,37 +18,38 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use crate::protocol_stack; use prometheus_client::encoding::text::Encode; use prometheus_client::metrics::counter::Counter; use prometheus_client::metrics::family::Family; use prometheus_client::registry::Registry; pub struct Metrics { - connections_incoming: Counter, + connections_incoming: Family, connections_incoming_error: Family, connections_established: Family, connections_closed: Family, - new_listen_addr: Counter, - expired_listen_addr: Counter, + new_listen_addr: Family, + expired_listen_addr: Family, - listener_closed: Counter, + listener_closed: Family, listener_error: Counter, dial_attempt: Counter, outgoing_connection_error: Family, - connected_to_banned_peer: Counter, + connected_to_banned_peer: Family, } impl Metrics { pub fn new(registry: &mut Registry) -> Self { let sub_registry = registry.sub_registry_with_prefix("swarm"); - let connections_incoming = Counter::default(); + let connections_incoming = Family::default(); sub_registry.register( "connections_incoming", - "Number of incoming connections", + "Number of incoming connections per address stack", Box::new(connections_incoming.clone()), ); @@ -59,21 +60,21 @@ impl Metrics { Box::new(connections_incoming_error.clone()), ); - let new_listen_addr = Counter::default(); + let new_listen_addr = Family::default(); sub_registry.register( "new_listen_addr", "Number of new listen addresses", Box::new(new_listen_addr.clone()), ); - let expired_listen_addr = Counter::default(); + let expired_listen_addr = Family::default(); sub_registry.register( "expired_listen_addr", "Number of expired listen addresses", Box::new(expired_listen_addr.clone()), ); - let listener_closed = Counter::default(); + let listener_closed = Family::default(); sub_registry.register( "listener_closed", "Number of listeners closed", @@ -101,7 +102,7 @@ impl Metrics { Box::new(outgoing_connection_error.clone()), ); - let connected_to_banned_peer = Counter::default(); + let connected_to_banned_peer = Family::default(); sub_registry.register( "connected_to_banned_peer", "Number of connection attempts to banned peer", @@ -146,6 +147,7 @@ impl super::Recorder super::Recorder { - self.connections_incoming.inc(); + libp2p_swarm::SwarmEvent::IncomingConnection { send_back_addr, .. } => { + self.connections_incoming + .get_or_create(&AddressLabels { + protocols: protocol_stack::as_string(send_back_addr), + }) + .inc(); } - libp2p_swarm::SwarmEvent::IncomingConnectionError { error, .. } => { + libp2p_swarm::SwarmEvent::IncomingConnectionError { + error, + send_back_addr, + .. + } => { self.connections_incoming_error .get_or_create(&IncomingConnectionErrorLabels { error: error.into(), + protocols: protocol_stack::as_string(send_back_addr), }) .inc(); } @@ -221,17 +233,35 @@ impl super::Recorder { - self.connected_to_banned_peer.inc(); + libp2p_swarm::SwarmEvent::BannedPeer { endpoint, .. } => { + self.connected_to_banned_peer + .get_or_create(&AddressLabels { + protocols: protocol_stack::as_string(endpoint.get_remote_address()), + }) + .inc(); } - libp2p_swarm::SwarmEvent::NewListenAddr { .. } => { - self.new_listen_addr.inc(); + libp2p_swarm::SwarmEvent::NewListenAddr { address, .. } => { + self.new_listen_addr + .get_or_create(&AddressLabels { + protocols: protocol_stack::as_string(address), + }) + .inc(); } - libp2p_swarm::SwarmEvent::ExpiredListenAddr { .. } => { - self.expired_listen_addr.inc(); + libp2p_swarm::SwarmEvent::ExpiredListenAddr { address, .. } => { + self.expired_listen_addr + .get_or_create(&AddressLabels { + protocols: protocol_stack::as_string(address), + }) + .inc(); } - libp2p_swarm::SwarmEvent::ListenerClosed { .. } => { - self.listener_closed.inc(); + libp2p_swarm::SwarmEvent::ListenerClosed { addresses, .. } => { + for address in addresses { + self.listener_closed + .get_or_create(&AddressLabels { + protocols: protocol_stack::as_string(address), + }) + .inc(); + } } libp2p_swarm::SwarmEvent::ListenerError { .. } => { self.listener_error.inc(); @@ -246,11 +276,18 @@ impl super::Recorder Date: Wed, 16 Nov 2022 10:28:03 +1100 Subject: [PATCH 9/9] chore(mergify): Don't ask dependabot to resolve conflicts (#3122) --- .github/mergify.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/mergify.yml b/.github/mergify.yml index f3a2f46bd469..de29495ace8f 100644 --- a/.github/mergify.yml +++ b/.github/mergify.yml @@ -12,6 +12,7 @@ pull_request_rules: conditions: - conflict - -draft # Draft PRs are allowed to have conflicts. + - -author=dependabot[bot] actions: comment: message: This pull request has merge conflicts. Could you please resolve them @{{author}}? 🙏