From d5ea93dd71a9948f69cfd74e4d3300ece74422ce Mon Sep 17 00:00:00 2001 From: Hannes <55623006+umgefahren@users.noreply.github.com> Date: Tue, 15 Nov 2022 15:26:03 +0100 Subject: [PATCH] feat(swarm): Make executor for connection tasks explicit (#3097) Previously, the executor for connection tasks silently defaulted to a `futures::executor::ThreadPool`. This causes issues such as https://github.com/libp2p/rust-libp2p/issues/2230. With this patch, we force the user to choose, which executor they want to run the connection tasks on which results in overall simpler API with less footguns. Closes #3068. --- Cargo.toml | 5 +- core/CHANGELOG.md | 3 + core/src/lib.rs | 20 --- examples/chat-tokio.rs | 23 +-- examples/chat.rs | 2 +- examples/distributed-key-value-store.rs | 2 +- examples/file-sharing.rs | 9 +- examples/gossipsub-chat.rs | 2 +- examples/ipfs-kad.rs | 2 +- examples/ipfs-private.rs | 2 +- examples/mdns-passive-discovery.rs | 2 +- examples/ping.rs | 2 +- misc/metrics/examples/metrics/main.rs | 2 +- misc/multistream-select/tests/transport.rs | 6 +- protocols/autonat/examples/autonat_client.rs | 2 +- protocols/autonat/examples/autonat_server.rs | 2 +- protocols/autonat/tests/test_client.rs | 2 +- protocols/autonat/tests/test_server.rs | 2 +- protocols/dcutr/examples/dcutr.rs | 11 +- protocols/dcutr/tests/lib.rs | 4 +- protocols/gossipsub/tests/smoke.rs | 2 +- protocols/identify/examples/identify.rs | 2 +- protocols/identify/src/behaviour.rs | 12 +- protocols/kad/src/behaviour/test.rs | 2 +- protocols/mdns/tests/use-async-std.rs | 2 +- protocols/mdns/tests/use-tokio.rs | 2 +- protocols/ping/tests/ping.rs | 14 +- protocols/relay/examples/relay_v2.rs | 2 +- protocols/relay/tests/v2.rs | 4 +- protocols/rendezvous/examples/discover.rs | 6 +- protocols/rendezvous/examples/register.rs | 6 +- .../examples/register_with_identify.rs | 6 +- .../rendezvous/examples/rendezvous_point.rs | 6 +- protocols/rendezvous/tests/harness.rs | 8 +- protocols/request-response/tests/ping.rs | 14 +- swarm/CHANGELOG.md | 61 +++++++ swarm/Cargo.toml | 4 + swarm/src/connection/pool.rs | 91 +++++----- swarm/src/executor.rs | 48 +++++ swarm/src/lib.rs | 166 +++++++++++++++--- transports/tls/tests/smoke.rs | 2 +- 41 files changed, 384 insertions(+), 181 deletions(-) create mode 100644 swarm/src/executor.rs diff --git a/Cargo.toml b/Cargo.toml index 00535bec98d..7bea3907e12 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,7 +47,8 @@ full = [ "websocket", "yamux", ] -async-std = ["libp2p-mdns?/async-io", "libp2p-tcp?/async-io", "libp2p-dns?/async-std", "libp2p-quic?/async-std"] + +async-std = ["libp2p-swarm/async-std", "libp2p-mdns?/async-io", "libp2p-tcp?/async-io", "libp2p-dns?/async-std", "libp2p-quic?/async-std"] autonat = ["dep:libp2p-autonat"] dcutr = ["dep:libp2p-dcutr", "libp2p-metrics?/dcutr"] deflate = ["dep:libp2p-deflate"] @@ -74,7 +75,7 @@ rsa = ["libp2p-core/rsa"] secp256k1 = ["libp2p-core/secp256k1"] serde = ["libp2p-core/serde", "libp2p-kad?/serde", "libp2p-gossipsub?/serde"] tcp = ["dep:libp2p-tcp"] -tokio = ["libp2p-mdns?/tokio", "libp2p-tcp?/tokio", "libp2p-dns?/tokio", "libp2p-quic?/tokio"] +tokio = ["libp2p-swarm/tokio", "libp2p-mdns?/tokio", "libp2p-tcp?/tokio", "libp2p-dns?/tokio", "libp2p-quic?/tokio"] uds = ["dep:libp2p-uds"] wasm-bindgen = ["futures-timer/wasm-bindgen", "instant/wasm-bindgen", "getrandom/js"] wasm-ext = ["dep:libp2p-wasm-ext"] diff --git a/core/CHANGELOG.md b/core/CHANGELOG.md index d7059cab98f..c3831a3b3a9 100644 --- a/core/CHANGELOG.md +++ b/core/CHANGELOG.md @@ -4,8 +4,11 @@ - Hide `prost::Error` from public API in `FromEnvelopeError::InvalidPeerRecord` and `signed_envelope::DecodingError`. See [PR 3058]. +- Move `Executor` to `libp2p-swarm`. See [PR 3097]. + [PR 3031]: https://github.com/libp2p/rust-libp2p/pull/3031 [PR 3058]: https://github.com/libp2p/rust-libp2p/pull/3058 +[PR 3097]: https://github.com/libp2p/rust-libp2p/pull/3097 # 0.37.0 diff --git a/core/src/lib.rs b/core/src/lib.rs index 6205d715615..2b20f5156e4 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -83,23 +83,3 @@ pub use upgrade::{InboundUpgrade, OutboundUpgrade, ProtocolName, UpgradeError, U #[derive(thiserror::Error, Debug)] #[error(transparent)] pub struct DecodeError(prost::DecodeError); - -use std::{future::Future, pin::Pin}; - -/// Implemented on objects that can run a `Future` in the background. -/// -/// > **Note**: While it may be tempting to implement this trait on types such as -/// > [`futures::stream::FuturesUnordered`], please note that passing an `Executor` is -/// > optional, and that `FuturesUnordered` (or a similar struct) will automatically -/// > be used as fallback by libp2p. The `Executor` trait should therefore only be -/// > about running `Future`s in the background. -pub trait Executor { - /// Run the given future in the background until it ends. - fn exec(&self, future: Pin + Send>>); -} - -impl + Send>>)> Executor for F { - fn exec(&self, f: Pin + Send>>) { - self(f) - } -} diff --git a/examples/chat-tokio.rs b/examples/chat-tokio.rs index d4b0b121553..f69c376b817 100644 --- a/examples/chat-tokio.rs +++ b/examples/chat-tokio.rs @@ -39,7 +39,7 @@ use libp2p::{ TokioMdns, }, mplex, noise, - swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent}, + swarm::{NetworkBehaviour, SwarmEvent}, tcp, Multiaddr, PeerId, Transport, }; use std::error::Error; @@ -97,23 +97,12 @@ async fn main() -> Result<(), Box> { } // Create a Swarm to manage peers and events. - let mut swarm = { - let mdns = TokioMdns::new(Default::default())?; - let mut behaviour = MyBehaviour { - floodsub: Floodsub::new(peer_id), - mdns, - }; - - behaviour.floodsub.subscribe(floodsub_topic.clone()); - - SwarmBuilder::new(transport, behaviour, peer_id) - // We want the connection background tasks to be spawned - // onto the tokio runtime. - .executor(Box::new(|fut| { - tokio::spawn(fut); - })) - .build() + let mdns = TokioMdns::new(Default::default())?; + let behaviour = MyBehaviour { + floodsub: Floodsub::new(peer_id), + mdns, }; + let mut swarm = libp2p_swarm::Swarm::with_tokio_executor(transport, behaviour, peer_id); // Reach out to another node if specified if let Some(to_dial) = std::env::args().nth(1) { diff --git a/examples/chat.rs b/examples/chat.rs index 26db660b1c2..c7e4b73720f 100644 --- a/examples/chat.rs +++ b/examples/chat.rs @@ -115,7 +115,7 @@ async fn main() -> Result<(), Box> { }; behaviour.floodsub.subscribe(floodsub_topic.clone()); - Swarm::new(transport, behaviour, local_peer_id) + Swarm::with_threadpool_executor(transport, behaviour, local_peer_id) }; // Reach out to another node if specified diff --git a/examples/distributed-key-value-store.rs b/examples/distributed-key-value-store.rs index 8d8c6d917ef..1ed87718dbe 100644 --- a/examples/distributed-key-value-store.rs +++ b/examples/distributed-key-value-store.rs @@ -99,7 +99,7 @@ async fn main() -> Result<(), Box> { let kademlia = Kademlia::new(local_peer_id, store); let mdns = Mdns::new(MdnsConfig::default())?; let behaviour = MyBehaviour { kademlia, mdns }; - Swarm::new(transport, behaviour, local_peer_id) + Swarm::with_async_std_executor(transport, behaviour, local_peer_id) }; // Read full lines from stdin diff --git a/examples/file-sharing.rs b/examples/file-sharing.rs index 85c5d3d50dd..620ce6cd5d9 100644 --- a/examples/file-sharing.rs +++ b/examples/file-sharing.rs @@ -219,9 +219,7 @@ mod network { ProtocolSupport, RequestId, RequestResponse, RequestResponseCodec, RequestResponseEvent, RequestResponseMessage, ResponseChannel, }; - use libp2p::swarm::{ - ConnectionHandlerUpgrErr, NetworkBehaviour, Swarm, SwarmBuilder, SwarmEvent, - }; + use libp2p::swarm::{ConnectionHandlerUpgrErr, NetworkBehaviour, Swarm, SwarmEvent}; use std::collections::{hash_map, HashMap, HashSet}; use std::iter; @@ -252,7 +250,7 @@ mod network { // Build the Swarm, connecting the lower layer transport logic with the // higher layer network behaviour logic. - let swarm = SwarmBuilder::new( + let swarm = Swarm::with_threadpool_executor( libp2p::development_transport(id_keys).await?, ComposedBehaviour { kademlia: Kademlia::new(peer_id, MemoryStore::new(peer_id)), @@ -263,8 +261,7 @@ mod network { ), }, peer_id, - ) - .build(); + ); let (command_sender, command_receiver) = mpsc::channel(0); let (event_sender, event_receiver) = mpsc::channel(0); diff --git a/examples/gossipsub-chat.rs b/examples/gossipsub-chat.rs index e86d1836746..532ebfff58b 100644 --- a/examples/gossipsub-chat.rs +++ b/examples/gossipsub-chat.rs @@ -110,7 +110,7 @@ async fn main() -> Result<(), Box> { let mut swarm = { let mdns = Mdns::new(MdnsConfig::default())?; let behaviour = MyBehaviour { gossipsub, mdns }; - Swarm::new(transport, behaviour, local_peer_id) + Swarm::with_async_std_executor(transport, behaviour, local_peer_id) }; // Read full lines from stdin diff --git a/examples/ipfs-kad.rs b/examples/ipfs-kad.rs index 659cd49b607..2c370472bec 100644 --- a/examples/ipfs-kad.rs +++ b/examples/ipfs-kad.rs @@ -68,7 +68,7 @@ async fn main() -> Result<(), Box> { behaviour.add_address(&PeerId::from_str(peer)?, bootaddr.clone()); } - Swarm::new(transport, behaviour, local_peer_id) + Swarm::with_async_std_executor(transport, behaviour, local_peer_id) }; // Order Kademlia to search for a peer. diff --git a/examples/ipfs-private.rs b/examples/ipfs-private.rs index ddaa28896f1..1d89f5f9af0 100644 --- a/examples/ipfs-private.rs +++ b/examples/ipfs-private.rs @@ -204,7 +204,7 @@ async fn main() -> Result<(), Box> { println!("Subscribing to {gossipsub_topic:?}"); behaviour.gossipsub.subscribe(&gossipsub_topic).unwrap(); - Swarm::new(transport, behaviour, local_peer_id) + Swarm::with_async_std_executor(transport, behaviour, local_peer_id) }; // Reach out to other nodes if specified diff --git a/examples/mdns-passive-discovery.rs b/examples/mdns-passive-discovery.rs index 8231d888dcc..477c9766391 100644 --- a/examples/mdns-passive-discovery.rs +++ b/examples/mdns-passive-discovery.rs @@ -45,7 +45,7 @@ async fn main() -> Result<(), Box> { // Create a Swarm that establishes connections through the given transport. // Note that the MDNS behaviour itself will not actually inititiate any connections, // as it only uses UDP. - let mut swarm = Swarm::new(transport, behaviour, peer_id); + let mut swarm = Swarm::with_async_std_executor(transport, behaviour, peer_id); swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; loop { diff --git a/examples/ping.rs b/examples/ping.rs index a2da2834127..5deb8544ccb 100644 --- a/examples/ping.rs +++ b/examples/ping.rs @@ -54,7 +54,7 @@ async fn main() -> Result<(), Box> { let transport = libp2p::development_transport(local_key).await?; - let mut swarm = Swarm::new(transport, Behaviour::default(), local_peer_id); + let mut swarm = Swarm::with_async_std_executor(transport, Behaviour::default(), local_peer_id); // Tell the swarm to listen on all interfaces and a random, OS-assigned // port. diff --git a/misc/metrics/examples/metrics/main.rs b/misc/metrics/examples/metrics/main.rs index 1f661cb9ecd..ab1e041bcc4 100644 --- a/misc/metrics/examples/metrics/main.rs +++ b/misc/metrics/examples/metrics/main.rs @@ -70,7 +70,7 @@ fn main() -> Result<(), Box> { let local_peer_id = PeerId::from(local_key.public()); info!("Local peer id: {:?}", local_peer_id); - let mut swarm = Swarm::new( + let mut swarm = Swarm::without_executor( block_on(libp2p::development_transport(local_key))?, Behaviour::default(), local_peer_id, diff --git a/misc/multistream-select/tests/transport.rs b/misc/multistream-select/tests/transport.rs index bf5dd247b40..a66d20eadd5 100644 --- a/misc/multistream-select/tests/transport.rs +++ b/misc/multistream-select/tests/transport.rs @@ -61,8 +61,10 @@ fn transport_upgrade() { let listen_addr = Multiaddr::from(Protocol::Memory(random::())); - let mut dialer = Swarm::new(dialer_transport, dummy::Behaviour, dialer_id); - let mut listener = Swarm::new(listener_transport, dummy::Behaviour, listener_id); + let mut dialer = + Swarm::with_async_std_executor(dialer_transport, dummy::Behaviour, dialer_id); + let mut listener = + Swarm::with_async_std_executor(listener_transport, dummy::Behaviour, listener_id); listener.listen_on(listen_addr).unwrap(); let (addr_sender, addr_receiver) = oneshot::channel(); diff --git a/protocols/autonat/examples/autonat_client.rs b/protocols/autonat/examples/autonat_client.rs index bdd54ff406d..1c897620db6 100644 --- a/protocols/autonat/examples/autonat_client.rs +++ b/protocols/autonat/examples/autonat_client.rs @@ -67,7 +67,7 @@ async fn main() -> Result<(), Box> { let behaviour = Behaviour::new(local_key.public()); - let mut swarm = Swarm::new(transport, behaviour, local_peer_id); + let mut swarm = Swarm::with_async_std_executor(transport, behaviour, local_peer_id); swarm.listen_on( Multiaddr::empty() .with(Protocol::Ip4(Ipv4Addr::UNSPECIFIED)) diff --git a/protocols/autonat/examples/autonat_server.rs b/protocols/autonat/examples/autonat_server.rs index 82a06b8b55d..a3bcda1ee34 100644 --- a/protocols/autonat/examples/autonat_server.rs +++ b/protocols/autonat/examples/autonat_server.rs @@ -57,7 +57,7 @@ async fn main() -> Result<(), Box> { let behaviour = Behaviour::new(local_key.public()); - let mut swarm = Swarm::new(transport, behaviour, local_peer_id); + let mut swarm = Swarm::with_async_std_executor(transport, behaviour, local_peer_id); swarm.listen_on( Multiaddr::empty() .with(Protocol::Ip4(Ipv4Addr::UNSPECIFIED)) diff --git a/protocols/autonat/tests/test_client.rs b/protocols/autonat/tests/test_client.rs index 420bcf99829..5e23304e3f2 100644 --- a/protocols/autonat/tests/test_client.rs +++ b/protocols/autonat/tests/test_client.rs @@ -40,7 +40,7 @@ async fn init_swarm(config: Config) -> Swarm { let local_id = PeerId::from_public_key(&keypair.public()); let transport = development_transport(keypair).await.unwrap(); let behaviour = Behaviour::new(local_id, config); - Swarm::new(transport, behaviour, local_id) + Swarm::with_async_std_executor(transport, behaviour, local_id) } async fn spawn_server(kill: oneshot::Receiver<()>) -> (PeerId, Multiaddr) { diff --git a/protocols/autonat/tests/test_server.rs b/protocols/autonat/tests/test_server.rs index b45ae7ecafc..3035a6d8d9e 100644 --- a/protocols/autonat/tests/test_server.rs +++ b/protocols/autonat/tests/test_server.rs @@ -39,7 +39,7 @@ async fn init_swarm(config: Config) -> Swarm { let local_id = PeerId::from_public_key(&keypair.public()); let transport = development_transport(keypair).await.unwrap(); let behaviour = Behaviour::new(local_id, config); - Swarm::new(transport, behaviour, local_id) + Swarm::with_async_std_executor(transport, behaviour, local_id) } async fn init_server(config: Option) -> (Swarm, PeerId, Multiaddr) { diff --git a/protocols/dcutr/examples/dcutr.rs b/protocols/dcutr/examples/dcutr.rs index bc33eddcad1..e12ff4ccfa2 100644 --- a/protocols/dcutr/examples/dcutr.rs +++ b/protocols/dcutr/examples/dcutr.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. use clap::Parser; -use futures::executor::block_on; +use futures::executor::{block_on, ThreadPool}; use futures::future::FutureExt; use futures::stream::StreamExt; use libp2p::core::multiaddr::{Multiaddr, Protocol}; @@ -155,9 +155,12 @@ fn main() -> Result<(), Box> { dcutr: dcutr::behaviour::Behaviour::new(), }; - let mut swarm = SwarmBuilder::new(transport, behaviour, local_peer_id) - .dial_concurrency_factor(10_u8.try_into().unwrap()) - .build(); + let mut swarm = match ThreadPool::new() { + Ok(tp) => SwarmBuilder::with_executor(transport, behaviour, local_peer_id, tp), + Err(_) => SwarmBuilder::without_executor(transport, behaviour, local_peer_id), + } + .dial_concurrency_factor(10_u8.try_into().unwrap()) + .build(); swarm .listen_on( diff --git a/protocols/dcutr/tests/lib.rs b/protocols/dcutr/tests/lib.rs index 0bc5df7e0eb..8c687835d9f 100644 --- a/protocols/dcutr/tests/lib.rs +++ b/protocols/dcutr/tests/lib.rs @@ -98,7 +98,7 @@ fn build_relay() -> Swarm { let transport = build_transport(MemoryTransport::default().boxed(), local_public_key); - Swarm::new( + Swarm::with_threadpool_executor( transport, relay::Relay::new( local_peer_id, @@ -122,7 +122,7 @@ fn build_client() -> Swarm { local_public_key, ); - Swarm::new( + Swarm::with_threadpool_executor( transport, Client { relay: behaviour, diff --git a/protocols/gossipsub/tests/smoke.rs b/protocols/gossipsub/tests/smoke.rs index 43ad944dccb..db99179b07f 100644 --- a/protocols/gossipsub/tests/smoke.rs +++ b/protocols/gossipsub/tests/smoke.rs @@ -171,7 +171,7 @@ fn build_node() -> (Multiaddr, Swarm) { .build() .unwrap(); let behaviour = Gossipsub::new(MessageAuthenticity::Author(peer_id), config).unwrap(); - let mut swarm = Swarm::new(transport, behaviour, peer_id); + let mut swarm = Swarm::without_executor(transport, behaviour, peer_id); let port = 1 + random::(); let mut addr: Multiaddr = Protocol::Memory(port).into(); diff --git a/protocols/identify/examples/identify.rs b/protocols/identify/examples/identify.rs index b02eb1c9ebf..6f5fb2a1427 100644 --- a/protocols/identify/examples/identify.rs +++ b/protocols/identify/examples/identify.rs @@ -55,7 +55,7 @@ async fn main() -> Result<(), Box> { local_key.public(), )); - let mut swarm = Swarm::new(transport, behaviour, local_peer_id); + let mut swarm = Swarm::with_async_std_executor(transport, behaviour, local_peer_id); // Tell the swarm to listen on all interfaces and a random, OS-assigned // port. diff --git a/protocols/identify/src/behaviour.rs b/protocols/identify/src/behaviour.rs index 92279765d5a..2215fde0ee8 100644 --- a/protocols/identify/src/behaviour.rs +++ b/protocols/identify/src/behaviour.rs @@ -584,7 +584,7 @@ mod tests { let protocol = Behaviour::new( Config::new("a".to_string(), pubkey.clone()).with_agent_version("b".to_string()), ); - let swarm = Swarm::new(transport, protocol, pubkey.to_peer_id()); + let swarm = Swarm::with_async_std_executor(transport, protocol, pubkey.to_peer_id()); (swarm, pubkey) }; @@ -593,7 +593,7 @@ mod tests { let protocol = Behaviour::new( Config::new("c".to_string(), pubkey.clone()).with_agent_version("d".to_string()), ); - let swarm = Swarm::new(transport, protocol, pubkey.to_peer_id()); + let swarm = Swarm::with_async_std_executor(transport, protocol, pubkey.to_peer_id()); (swarm, pubkey) }; @@ -661,7 +661,7 @@ mod tests { let (mut swarm1, pubkey1) = { let (pubkey, transport) = transport(); let protocol = Behaviour::new(Config::new("a".to_string(), pubkey.clone())); - let swarm = Swarm::new(transport, protocol, pubkey.to_peer_id()); + let swarm = Swarm::with_async_std_executor(transport, protocol, pubkey.to_peer_id()); (swarm, pubkey) }; @@ -670,7 +670,7 @@ mod tests { let protocol = Behaviour::new( Config::new("a".to_string(), pubkey.clone()).with_agent_version("b".to_string()), ); - let swarm = Swarm::new(transport, protocol, pubkey.to_peer_id()); + let swarm = Swarm::with_async_std_executor(transport, protocol, pubkey.to_peer_id()); (swarm, pubkey) }; @@ -742,7 +742,7 @@ mod tests { .with_initial_delay(Duration::from_secs(10)), ); - Swarm::new(transport, protocol, pubkey.to_peer_id()) + Swarm::with_async_std_executor(transport, protocol, pubkey.to_peer_id()) }; let mut swarm2 = { @@ -751,7 +751,7 @@ mod tests { Config::new("a".to_string(), pubkey.clone()).with_agent_version("b".to_string()), ); - Swarm::new(transport, protocol, pubkey.to_peer_id()) + Swarm::with_async_std_executor(transport, protocol, pubkey.to_peer_id()) }; let swarm1_peer_id = *swarm1.local_peer_id(); diff --git a/protocols/kad/src/behaviour/test.rs b/protocols/kad/src/behaviour/test.rs index c61ffaf158f..47da12904bb 100644 --- a/protocols/kad/src/behaviour/test.rs +++ b/protocols/kad/src/behaviour/test.rs @@ -66,7 +66,7 @@ fn build_node_with_config(cfg: KademliaConfig) -> (Multiaddr, TestSwarm) { let store = MemoryStore::new(local_id); let behaviour = Kademlia::with_config(local_id, store, cfg); - let mut swarm = Swarm::new(transport, behaviour, local_id); + let mut swarm = Swarm::without_executor(transport, behaviour, local_id); let address: Multiaddr = Protocol::Memory(random::()).into(); swarm.listen_on(address.clone()).unwrap(); diff --git a/protocols/mdns/tests/use-async-std.rs b/protocols/mdns/tests/use-async-std.rs index 2ddb36355be..3774179fefa 100644 --- a/protocols/mdns/tests/use-async-std.rs +++ b/protocols/mdns/tests/use-async-std.rs @@ -62,7 +62,7 @@ async fn create_swarm(config: MdnsConfig) -> Result, Box> let peer_id = PeerId::from(id_keys.public()); let transport = libp2p::development_transport(id_keys).await?; let behaviour = Mdns::new(config)?; - let mut swarm = Swarm::new(transport, behaviour, peer_id); + let mut swarm = Swarm::with_async_std_executor(transport, behaviour, peer_id); swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; Ok(swarm) } diff --git a/protocols/mdns/tests/use-tokio.rs b/protocols/mdns/tests/use-tokio.rs index 830557d3f00..dfd2d7a08c8 100644 --- a/protocols/mdns/tests/use-tokio.rs +++ b/protocols/mdns/tests/use-tokio.rs @@ -58,7 +58,7 @@ async fn create_swarm(config: MdnsConfig) -> Result, Box(1); @@ -127,10 +128,11 @@ fn max_failures() { .with_max_failures(max_failures.into()); let (peer1_id, trans) = mk_transport(muxer); - let mut swarm1 = Swarm::new(trans, Behaviour::new(cfg.clone()), peer1_id); + let mut swarm1 = + Swarm::with_async_std_executor(trans, Behaviour::new(cfg.clone()), peer1_id); let (peer2_id, trans) = mk_transport(muxer); - let mut swarm2 = Swarm::new(trans, Behaviour::new(cfg), peer2_id); + let mut swarm2 = Swarm::with_async_std_executor(trans, Behaviour::new(cfg), peer2_id); let (mut tx, mut rx) = mpsc::channel::(1); @@ -197,10 +199,10 @@ fn max_failures() { #[test] fn unsupported_doesnt_fail() { let (peer1_id, trans) = mk_transport(MuxerChoice::Mplex); - let mut swarm1 = Swarm::new(trans, keep_alive::Behaviour, peer1_id); + let mut swarm1 = Swarm::with_async_std_executor(trans, keep_alive::Behaviour, peer1_id); let (peer2_id, trans) = mk_transport(MuxerChoice::Mplex); - let mut swarm2 = Swarm::new(trans, Behaviour::default(), peer2_id); + let mut swarm2 = Swarm::with_async_std_executor(trans, Behaviour::default(), peer2_id); let (mut tx, mut rx) = mpsc::channel::(1); diff --git a/protocols/relay/examples/relay_v2.rs b/protocols/relay/examples/relay_v2.rs index 102637ca2fc..137132e6956 100644 --- a/protocols/relay/examples/relay_v2.rs +++ b/protocols/relay/examples/relay_v2.rs @@ -66,7 +66,7 @@ fn main() -> Result<(), Box> { )), }; - let mut swarm = Swarm::new(transport, behaviour, local_peer_id); + let mut swarm = Swarm::without_executor(transport, behaviour, local_peer_id); // Listen on all interfaces let listen_addr = Multiaddr::empty() diff --git a/protocols/relay/tests/v2.rs b/protocols/relay/tests/v2.rs index 34e6e8ee6ce..b31d6866228 100644 --- a/protocols/relay/tests/v2.rs +++ b/protocols/relay/tests/v2.rs @@ -291,7 +291,7 @@ fn build_relay() -> Swarm { let transport = upgrade_transport(MemoryTransport::default().boxed(), local_public_key); - Swarm::new( + Swarm::with_threadpool_executor( transport, Relay { ping: ping::Behaviour::new(ping::Config::new()), @@ -318,7 +318,7 @@ fn build_client() -> Swarm { local_public_key, ); - Swarm::new( + Swarm::with_threadpool_executor( transport, Client { ping: ping::Behaviour::new(ping::Config::new()), diff --git a/protocols/rendezvous/examples/discover.rs b/protocols/rendezvous/examples/discover.rs index 0d1a6daca58..d400301bdbe 100644 --- a/protocols/rendezvous/examples/discover.rs +++ b/protocols/rendezvous/examples/discover.rs @@ -25,7 +25,7 @@ use libp2p::multiaddr::Protocol; use libp2p::ping; use libp2p::swarm::{keep_alive, SwarmEvent}; use libp2p::Swarm; -use libp2p::{development_transport, rendezvous, Multiaddr}; +use libp2p::{rendezvous, tokio_development_transport, Multiaddr}; use std::time::Duration; use void::Void; @@ -41,8 +41,8 @@ async fn main() { .parse() .unwrap(); - let mut swarm = Swarm::new( - development_transport(identity.clone()).await.unwrap(), + let mut swarm = Swarm::with_tokio_executor( + tokio_development_transport(identity.clone()).unwrap(), MyBehaviour { rendezvous: rendezvous::client::Behaviour::new(identity.clone()), ping: ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(1))), diff --git a/protocols/rendezvous/examples/register.rs b/protocols/rendezvous/examples/register.rs index 471ff739426..903a79610d8 100644 --- a/protocols/rendezvous/examples/register.rs +++ b/protocols/rendezvous/examples/register.rs @@ -24,7 +24,7 @@ use libp2p::core::PeerId; use libp2p::ping; use libp2p::swarm::{NetworkBehaviour, Swarm, SwarmEvent}; use libp2p::Multiaddr; -use libp2p::{development_transport, rendezvous}; +use libp2p::{rendezvous, tokio_development_transport}; use libp2p_swarm::AddressScore; use std::time::Duration; @@ -39,8 +39,8 @@ async fn main() { let identity = identity::Keypair::generate_ed25519(); - let mut swarm = Swarm::new( - development_transport(identity.clone()).await.unwrap(), + let mut swarm = Swarm::with_tokio_executor( + tokio_development_transport(identity.clone()).unwrap(), MyBehaviour { rendezvous: rendezvous::client::Behaviour::new(identity.clone()), ping: ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(1))), diff --git a/protocols/rendezvous/examples/register_with_identify.rs b/protocols/rendezvous/examples/register_with_identify.rs index bf30fa906f7..06844be1281 100644 --- a/protocols/rendezvous/examples/register_with_identify.rs +++ b/protocols/rendezvous/examples/register_with_identify.rs @@ -25,7 +25,7 @@ use libp2p::identify; use libp2p::ping; use libp2p::swarm::{keep_alive, NetworkBehaviour, Swarm, SwarmEvent}; use libp2p::Multiaddr; -use libp2p::{development_transport, rendezvous}; +use libp2p::{rendezvous, tokio_development_transport}; use std::time::Duration; use void::Void; @@ -40,8 +40,8 @@ async fn main() { let identity = identity::Keypair::generate_ed25519(); - let mut swarm = Swarm::new( - development_transport(identity.clone()).await.unwrap(), + let mut swarm = Swarm::with_tokio_executor( + tokio_development_transport(identity.clone()).unwrap(), MyBehaviour { identify: identify::Behaviour::new(identify::Config::new( "rendezvous-example/1.0.0".to_string(), diff --git a/protocols/rendezvous/examples/rendezvous_point.rs b/protocols/rendezvous/examples/rendezvous_point.rs index c8688b98573..1e98f73d6da 100644 --- a/protocols/rendezvous/examples/rendezvous_point.rs +++ b/protocols/rendezvous/examples/rendezvous_point.rs @@ -24,7 +24,7 @@ use libp2p::core::PeerId; use libp2p::identify; use libp2p::ping; use libp2p::swarm::{keep_alive, NetworkBehaviour, Swarm, SwarmEvent}; -use libp2p::{development_transport, rendezvous}; +use libp2p::{rendezvous, tokio_development_transport}; use void::Void; /// Examples for the rendezvous protocol: @@ -43,8 +43,8 @@ async fn main() { let key = identity::ed25519::SecretKey::from_bytes(bytes).expect("we always pass 32 bytes"); let identity = identity::Keypair::Ed25519(key.into()); - let mut swarm = Swarm::new( - development_transport(identity.clone()).await.unwrap(), + let mut swarm = Swarm::with_tokio_executor( + tokio_development_transport(identity.clone()).unwrap(), MyBehaviour { identify: identify::Behaviour::new(identify::Config::new( "rendezvous-example/1.0.0".to_string(), diff --git a/protocols/rendezvous/tests/harness.rs b/protocols/rendezvous/tests/harness.rs index cad3a087afb..523f34c76db 100644 --- a/protocols/rendezvous/tests/harness.rs +++ b/protocols/rendezvous/tests/harness.rs @@ -28,7 +28,7 @@ use libp2p::core::upgrade::SelectUpgrade; use libp2p::core::{identity, Multiaddr, PeerId, Transport}; use libp2p::mplex::MplexConfig; use libp2p::noise::NoiseAuthenticated; -use libp2p::swarm::{AddressScore, NetworkBehaviour, Swarm, SwarmBuilder, SwarmEvent}; +use libp2p::swarm::{AddressScore, NetworkBehaviour, Swarm, SwarmEvent}; use libp2p::yamux::YamuxConfig; use std::fmt::Debug; use std::time::Duration; @@ -53,11 +53,7 @@ where .timeout(Duration::from_secs(5)) .boxed(); - SwarmBuilder::new(transport, behaviour_fn(peer_id, identity), peer_id) - .executor(Box::new(|future| { - let _ = tokio::spawn(future); - })) - .build() + Swarm::with_tokio_executor(transport, behaviour_fn(peer_id, identity), peer_id) } fn get_rand_memory_address() -> Multiaddr { diff --git a/protocols/request-response/tests/ping.rs b/protocols/request-response/tests/ping.rs index 77f8efd8fec..e97b725d4af 100644 --- a/protocols/request-response/tests/ping.rs +++ b/protocols/request-response/tests/ping.rs @@ -48,7 +48,7 @@ fn is_response_outbound() { let (peer1_id, trans) = mk_transport(); let ping_proto1 = RequestResponse::new(PingCodec(), protocols, cfg); - let mut swarm1 = Swarm::new(trans, ping_proto1, peer1_id); + let mut swarm1 = Swarm::without_executor(trans, ping_proto1, peer1_id); let request_id1 = swarm1 .behaviour_mut() @@ -87,11 +87,11 @@ fn ping_protocol() { let (peer1_id, trans) = mk_transport(); let ping_proto1 = RequestResponse::new(PingCodec(), protocols.clone(), cfg.clone()); - let mut swarm1 = Swarm::new(trans, ping_proto1, peer1_id); + let mut swarm1 = Swarm::without_executor(trans, ping_proto1, peer1_id); let (peer2_id, trans) = mk_transport(); let ping_proto2 = RequestResponse::new(PingCodec(), protocols, cfg); - let mut swarm2 = Swarm::new(trans, ping_proto2, peer2_id); + let mut swarm2 = Swarm::without_executor(trans, ping_proto2, peer2_id); let (mut tx, mut rx) = mpsc::channel::(1); @@ -176,11 +176,11 @@ fn emits_inbound_connection_closed_failure() { let (peer1_id, trans) = mk_transport(); let ping_proto1 = RequestResponse::new(PingCodec(), protocols.clone(), cfg.clone()); - let mut swarm1 = Swarm::new(trans, ping_proto1, peer1_id); + let mut swarm1 = Swarm::without_executor(trans, ping_proto1, peer1_id); let (peer2_id, trans) = mk_transport(); let ping_proto2 = RequestResponse::new(PingCodec(), protocols, cfg); - let mut swarm2 = Swarm::new(trans, ping_proto2, peer2_id); + let mut swarm2 = Swarm::without_executor(trans, ping_proto2, peer2_id); let addr = "/ip4/127.0.0.1/tcp/0".parse().unwrap(); swarm1.listen_on(addr).unwrap(); @@ -245,11 +245,11 @@ fn emits_inbound_connection_closed_if_channel_is_dropped() { let (peer1_id, trans) = mk_transport(); let ping_proto1 = RequestResponse::new(PingCodec(), protocols.clone(), cfg.clone()); - let mut swarm1 = Swarm::new(trans, ping_proto1, peer1_id); + let mut swarm1 = Swarm::without_executor(trans, ping_proto1, peer1_id); let (peer2_id, trans) = mk_transport(); let ping_proto2 = RequestResponse::new(PingCodec(), protocols, cfg); - let mut swarm2 = Swarm::new(trans, ping_proto2, peer2_id); + let mut swarm2 = Swarm::without_executor(trans, ping_proto2, peer2_id); let addr = "/ip4/127.0.0.1/tcp/0".parse().unwrap(); swarm1.listen_on(addr).unwrap(); diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index f66828db64a..f04da3da77b 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -5,9 +5,70 @@ - Export `NetworkBehaviour` derive as `libp2p_swarm::NetworkBehaviour`. This follows the convention of other popular libraries. `serde` for example exports the `Serialize` trait and macro as `serde::Serialize`. See [PR 3055]. + - Feature-gate `NetworkBehaviour` macro behind `macros` feature flag. See [PR 3055]. +- Make executor in Swarm constructor explicit. See [PR 3097]. + + Supported executors: + - Tokio + + Previously + ```rust + let swarm = SwarmBuilder::new(transport, behaviour, peer_id) + .executor(Box::new(|fut| { + tokio::spawn(fut); + })) + .build(); + ``` + Now + ```rust + let swarm = Swarm::with_tokio_executor(transport, behaviour, peer_id); + ``` + - Async Std + + Previously + ```rust + let swarm = SwarmBuilder::new(transport, behaviour, peer_id) + .executor(Box::new(|fut| { + async_std::task::spawn(fut); + })) + .build(); + ``` + Now + ```rust + let swarm = Swarm::with_async_std_executor(transport, behaviour, peer_id); + ``` + - ThreadPool (see [Issue 3107]) + + In most cases ThreadPool can be replaced by executors or spawning on the local task. + + Previously + ```rust + let swarm = Swarm::new(transport, behaviour, peer_id); + ``` + + Now + ```rust + let swarm = Swarm::with_threadpool_executor(transport, behaviour, peer_id); + ``` + - Without + + Spawns the tasks on the current task, this may result in bad performance so try to use an executor where possible. Previously this was just a fallback when no executor was specified and constructing a `ThreadPool` failed. + + New + ```rust + let swarm = Swarm::without_executor(transport, behaviour, peer_id); + ``` + + Deprecated APIs: + - `Swarm::new` + - `SwarmBuilder::new` + - `SwarmBuilder::executor` + [PR 3055]: https://github.com/libp2p/rust-libp2p/pull/3055 +[PR 3097]: https://github.com/libp2p/rust-libp2p/pull/3097 +[Issue 3107]: https://github.com/libp2p/rust-libp2p/issues/3107 # 0.40.1 diff --git a/swarm/Cargo.toml b/swarm/Cargo.toml index 815013bc34f..a6f9a91a5ca 100644 --- a/swarm/Cargo.toml +++ b/swarm/Cargo.toml @@ -24,9 +24,13 @@ rand = "0.8" smallvec = "1.6.1" thiserror = "1.0" void = "1" +tokio = { version = "1.15", features = ["rt"], optional = true } +async-std = { version = "1.6.2", optional = true } [features] macros = ["dep:libp2p-swarm-derive"] +tokio = ["dep:tokio"] +async-std = ["dep:async-std"] [dev-dependencies] async-std = { version = "1.6.2", features = ["attributes"] } diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index cc6a9bbd816..8729b2e36e1 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -54,10 +54,34 @@ use void::Void; mod concurrent_dial; mod task; +enum ExecSwitch { + Executor(Box), + LocalSpawn(FuturesUnordered + Send>>>), +} + +impl ExecSwitch { + fn advance_local(&mut self, cx: &mut Context) { + match self { + ExecSwitch::Executor(_) => {} + ExecSwitch::LocalSpawn(local) => { + while let Poll::Ready(Some(())) = local.poll_next_unpin(cx) {} + } + } + } + + fn spawn(&mut self, task: BoxFuture<'static, ()>) { + match self { + Self::Executor(executor) => executor.exec(task), + Self::LocalSpawn(local) => local.push(task), + } + } +} + /// A connection `Pool` manages a set of connections for each peer. -pub struct Pool +pub struct Pool where TTrans: Transport, + THandler: IntoConnectionHandler, { local_id: PeerId, @@ -93,14 +117,9 @@ where /// See [`Connection::max_negotiating_inbound_streams`]. max_negotiating_inbound_streams: usize, - /// The executor to use for running the background tasks. If `None`, - /// the tasks are kept in `local_spawns` instead and polled on the - /// current thread when the [`Pool`] is polled for new events. - executor: Option>, - - /// If no `executor` is configured, tasks are kept in this set and - /// polled on the current thread when the [`Pool`] is polled for new events. - local_spawns: FuturesUnordered + Send>>>, + /// The executor to use for running connection tasks. Can either be a global executor + /// or a local queue. + executor: ExecSwitch, /// Sender distributed to pending tasks for reporting events back /// to the pool. @@ -299,6 +318,10 @@ where mpsc::channel(config.task_event_buffer_size); let (established_connection_events_tx, established_connection_events_rx) = mpsc::channel(config.task_event_buffer_size); + let executor = match config.executor { + Some(exec) => ExecSwitch::Executor(exec), + None => ExecSwitch::LocalSpawn(Default::default()), + }; Pool { local_id, counters: ConnectionCounters::new(limits), @@ -309,8 +332,7 @@ where dial_concurrency_factor: config.dial_concurrency_factor, substream_upgrade_protocol_override: config.substream_upgrade_protocol_override, max_negotiating_inbound_streams: config.max_negotiating_inbound_streams, - executor: config.executor, - local_spawns: FuturesUnordered::new(), + executor, pending_connection_events_tx, pending_connection_events_rx, established_connection_events_tx, @@ -399,11 +421,7 @@ where } fn spawn(&mut self, task: BoxFuture<'static, ()>) { - if let Some(executor) = &mut self.executor { - executor.exec(task); - } else { - self.local_spawns.push(task); - } + self.executor.spawn(task) } } @@ -820,8 +838,7 @@ where } } - // Advance the tasks in `local_spawns`. - while let Poll::Ready(Some(())) = self.local_spawns.poll_next_unpin(cx) {} + self.executor.advance_local(cx); Poll::Pending } @@ -1073,34 +1090,21 @@ pub struct PoolConfig { max_negotiating_inbound_streams: usize, } -impl Default for PoolConfig { - fn default() -> Self { - PoolConfig { - executor: None, - task_event_buffer_size: 32, - task_command_buffer_size: 7, - // Set to a default of 8 based on frequency of dialer connections +impl PoolConfig { + pub fn new(executor: Option>) -> Self { + Self { + executor, + task_command_buffer_size: 32, + task_event_buffer_size: 7, dial_concurrency_factor: NonZeroU8::new(8).expect("8 > 0"), substream_upgrade_protocol_override: None, max_negotiating_inbound_streams: 128, } } -} -impl PoolConfig { /// Configures the executor to use for spawning connection background tasks. - pub fn with_executor(mut self, e: Box) -> Self { - self.executor = Some(e); - self - } - - /// Configures the executor to use for spawning connection background tasks, - /// only if no executor has already been configured. - pub fn or_else_with_executor(mut self, f: F) -> Self - where - F: FnOnce() -> Option>, - { - self.executor = self.executor.or_else(f); + pub fn with_executor(mut self, executor: Box) -> Self { + self.executor = Some(executor); self } @@ -1174,13 +1178,4 @@ mod tests { impl Executor for Dummy { fn exec(&self, _: Pin + Send>>) {} } - - #[test] - fn set_executor() { - PoolConfig::default() - .with_executor(Box::new(Dummy)) - .with_executor(Box::new(|f| { - async_std::task::spawn(f); - })); - } } diff --git a/swarm/src/executor.rs b/swarm/src/executor.rs new file mode 100644 index 00000000000..7799d141d49 --- /dev/null +++ b/swarm/src/executor.rs @@ -0,0 +1,48 @@ +use futures::executor::ThreadPool; +use std::{future::Future, pin::Pin}; + +/// Implemented on objects that can run a `Future` in the background. +/// +/// > **Note**: While it may be tempting to implement this trait on types such as +/// > [`futures::stream::FuturesUnordered`], please note that passing an `Executor` is +/// > optional, and that `FuturesUnordered` (or a similar struct) will automatically +/// > be used as fallback by libp2p. The `Executor` trait should therefore only be +/// > about running `Future`s on a separate task. +pub trait Executor { + /// Run the given future in the background until it ends. + fn exec(&self, future: Pin + Send>>); +} + +impl + Send>>)> Executor for F { + fn exec(&self, f: Pin + Send>>) { + self(f) + } +} + +impl Executor for ThreadPool { + fn exec(&self, future: Pin + Send>>) { + self.spawn_ok(future) + } +} + +#[cfg(feature = "tokio")] +#[derive(Default, Debug, Clone, Copy)] +pub(crate) struct TokioExecutor; + +#[cfg(feature = "tokio")] +impl Executor for TokioExecutor { + fn exec(&self, future: Pin + Send>>) { + let _ = tokio::spawn(future); + } +} + +#[cfg(feature = "async-std")] +#[derive(Default, Debug, Clone, Copy)] +pub(crate) struct AsyncStdExecutor; + +#[cfg(feature = "async-std")] +impl Executor for AsyncStdExecutor { + fn exec(&self, future: Pin + Send>>) { + let _ = async_std::task::spawn(future); + } +} diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 6927cfa068e..6fc1b8707c3 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -64,6 +64,7 @@ mod upgrade; pub mod behaviour; pub mod dial_opts; pub mod dummy; +mod executor; pub mod handler; pub mod keep_alive; @@ -94,6 +95,7 @@ pub use connection::{ ConnectionError, ConnectionLimit, PendingConnectionError, PendingInboundConnectionError, PendingOutboundConnectionError, }; +pub use executor::Executor; pub use handler::{ ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerSelect, ConnectionHandlerUpgrErr, IntoConnectionHandler, IntoConnectionHandlerSelect, KeepAlive, OneShotHandler, @@ -117,7 +119,7 @@ use libp2p_core::{ muxing::StreamMuxerBox, transport::{self, ListenerId, TransportError, TransportEvent}, upgrade::ProtocolName, - Endpoint, Executor, Multiaddr, Negotiated, PeerId, Transport, + Endpoint, Multiaddr, Negotiated, PeerId, Transport, }; use registry::{AddressIntoIter, Addresses}; use smallvec::SmallVec; @@ -328,12 +330,89 @@ where TBehaviour: NetworkBehaviour, { /// Builds a new `Swarm`. + #[deprecated( + since = "0.41.0", + note = "This constructor is considered ambiguous regarding the executor. Use one of the new, executor-specific constructors or `Swarm::with_threadpool_executor` for the same behaviour." + )] pub fn new( transport: transport::Boxed<(PeerId, StreamMuxerBox)>, behaviour: TBehaviour, local_peer_id: PeerId, ) -> Self { - SwarmBuilder::new(transport, behaviour, local_peer_id).build() + Self::with_threadpool_executor(transport, behaviour, local_peer_id) + } + + /// Builds a new `Swarm` with a provided executor. + pub fn with_executor( + transport: transport::Boxed<(PeerId, StreamMuxerBox)>, + behaviour: TBehaviour, + local_peer_id: PeerId, + executor: impl Executor + Send + 'static, + ) -> Self { + SwarmBuilder::with_executor(transport, behaviour, local_peer_id, executor).build() + } + + /// Builds a new `Swarm` with a tokio executor. + #[cfg(feature = "tokio")] + pub fn with_tokio_executor( + transport: transport::Boxed<(PeerId, StreamMuxerBox)>, + behaviour: TBehaviour, + local_peer_id: PeerId, + ) -> Self { + Self::with_executor( + transport, + behaviour, + local_peer_id, + crate::executor::TokioExecutor, + ) + } + + /// Builds a new `Swarm` with an async-std executor. + #[cfg(feature = "async-std")] + pub fn with_async_std_executor( + transport: transport::Boxed<(PeerId, StreamMuxerBox)>, + behaviour: TBehaviour, + local_peer_id: PeerId, + ) -> Self { + Self::with_executor( + transport, + behaviour, + local_peer_id, + crate::executor::AsyncStdExecutor, + ) + } + + /// Builds a new `Swarm` with a threadpool executor. + pub fn with_threadpool_executor( + transport: transport::Boxed<(PeerId, StreamMuxerBox)>, + behaviour: TBehaviour, + local_peer_id: PeerId, + ) -> Self { + let builder = match ThreadPoolBuilder::new() + .name_prefix("libp2p-swarm-task-") + .create() + { + Ok(tp) => SwarmBuilder::with_executor(transport, behaviour, local_peer_id, tp), + Err(err) => { + log::warn!("Failed to create executor thread pool: {:?}", err); + SwarmBuilder::without_executor(transport, behaviour, local_peer_id) + } + }; + builder.build() + } + + /// Builds a new `Swarm` without an executor, instead using the current task. + /// + /// ## ⚠️ Performance warning + /// All connections will be polled on the current task, thus quite bad performance + /// characteristics should be expected. Whenever possible use an executor and + /// [`Swarm::with_executor`]. + pub fn without_executor( + transport: transport::Boxed<(PeerId, StreamMuxerBox)>, + behaviour: TBehaviour, + local_peer_id: PeerId, + ) -> Self { + SwarmBuilder::without_executor(transport, behaviour, local_peer_id).build() } /// Returns information about the connections underlying the [`Swarm`]. @@ -1294,16 +1373,67 @@ where /// Creates a new `SwarmBuilder` from the given transport, behaviour and /// local peer ID. The `Swarm` with its underlying `Network` is obtained /// via [`SwarmBuilder::build`]. + #[deprecated( + since = "0.41.0", + note = "Use `SwarmBuilder::with_executor` or `SwarmBuilder::without_executor` instead." + )] pub fn new( transport: transport::Boxed<(PeerId, StreamMuxerBox)>, behaviour: TBehaviour, local_peer_id: PeerId, ) -> Self { + let executor: Option> = match ThreadPoolBuilder::new() + .name_prefix("libp2p-swarm-task-") + .create() + .ok() + { + Some(tp) => Some(Box::new(tp)), + None => None, + }; SwarmBuilder { local_peer_id, transport, behaviour, - pool_config: Default::default(), + pool_config: PoolConfig::new(executor), + connection_limits: Default::default(), + } + } + + /// Creates a new [`SwarmBuilder`] from the given transport, behaviour, local peer ID and + /// executor. The `Swarm` with its underlying `Network` is obtained via + /// [`SwarmBuilder::build`]. + pub fn with_executor( + transport: transport::Boxed<(PeerId, StreamMuxerBox)>, + behaviour: TBehaviour, + local_peer_id: PeerId, + executor: impl Executor + Send + 'static, + ) -> Self { + Self { + local_peer_id, + transport, + behaviour, + pool_config: PoolConfig::new(Some(Box::new(executor))), + connection_limits: Default::default(), + } + } + + /// Creates a new [`SwarmBuilder`] from the given transport, behaviour and local peer ID. The + /// `Swarm` with its underlying `Network` is obtained via [`SwarmBuilder::build`]. + /// + /// ## ⚠️ Performance warning + /// All connections will be polled on the current task, thus quite bad performance + /// characteristics should be expected. Whenever possible use an executor and + /// [`SwarmBuilder::with_executor`]. + pub fn without_executor( + transport: transport::Boxed<(PeerId, StreamMuxerBox)>, + behaviour: TBehaviour, + local_peer_id: PeerId, + ) -> Self { + Self { + local_peer_id, + transport, + behaviour, + pool_config: PoolConfig::new(None), connection_limits: Default::default(), } } @@ -1313,8 +1443,9 @@ where /// By default, unless another executor has been configured, /// [`SwarmBuilder::build`] will try to set up a /// [`ThreadPool`](futures::executor::ThreadPool). - pub fn executor(mut self, e: Box) -> Self { - self.pool_config = self.pool_config.with_executor(e); + #[deprecated(since = "0.41.0", note = "Use `SwarmBuilder::with_executor` instead.")] + pub fn executor(mut self, executor: Box) -> Self { + self.pool_config = self.pool_config.with_executor(executor); self } @@ -1412,25 +1543,10 @@ where .map(|info| info.protocol_name().to_vec()) .collect(); - // If no executor has been explicitly configured, try to set up a thread pool. - let pool_config = - self.pool_config.or_else_with_executor(|| { - match ThreadPoolBuilder::new() - .name_prefix("libp2p-swarm-task-") - .create() - { - Ok(tp) => Some(Box::new(move |f| tp.spawn_ok(f))), - Err(err) => { - log::warn!("Failed to create executor thread pool: {:?}", err); - None - } - } - }); - Swarm { local_peer_id: self.local_peer_id, transport: self.transport, - pool: Pool::new(self.local_peer_id, pool_config, self.connection_limits), + pool: Pool::new(self.local_peer_id, self.pool_config, self.connection_limits), behaviour: self.behaviour, supported_protocols, listened_addrs: HashMap::new(), @@ -1586,6 +1702,7 @@ mod tests { use super::*; use crate::test::{CallTraceBehaviour, MockBehaviour}; use futures::executor::block_on; + use futures::executor::ThreadPool; use futures::future::poll_fn; use futures::future::Either; use futures::{executor, future, ready}; @@ -1622,7 +1739,12 @@ mod tests { .multiplex(yamux::YamuxConfig::default()) .boxed(); let behaviour = CallTraceBehaviour::new(MockBehaviour::new(handler_proto)); - SwarmBuilder::new(transport, behaviour, local_public_key.into()) + match ThreadPool::new().ok() { + Some(tp) => { + SwarmBuilder::with_executor(transport, behaviour, local_public_key.into(), tp) + } + None => SwarmBuilder::without_executor(transport, behaviour, local_public_key.into()), + } } fn swarms_connected( diff --git a/transports/tls/tests/smoke.rs b/transports/tls/tests/smoke.rs index 1def8717e01..d30753b8fb5 100644 --- a/transports/tls/tests/smoke.rs +++ b/transports/tls/tests/smoke.rs @@ -65,7 +65,7 @@ fn make_swarm() -> Swarm { .multiplex(libp2p::yamux::YamuxConfig::default()) .boxed(); - Swarm::new( + Swarm::without_executor( transport, keep_alive::Behaviour, identity.public().to_peer_id(),