From 0c5fa28b5a97276cb4ccbb9335b6588253d551e7 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Thu, 24 Feb 2022 14:41:48 +0100 Subject: [PATCH 1/2] feat: remove Send bound from NetworkBehaviour --- protocols/dcutr/tests/lib.rs | 2 +- protocols/relay/tests/v1.rs | 2 +- protocols/relay/tests/v2.rs | 2 +- protocols/rendezvous/tests/harness.rs | 6 +++--- swarm/CHANGELOG.md | 5 +++++ swarm/src/behaviour.rs | 2 +- 6 files changed, 12 insertions(+), 7 deletions(-) diff --git a/protocols/dcutr/tests/lib.rs b/protocols/dcutr/tests/lib.rs index a53a5319f34..6a10d1049e4 100644 --- a/protocols/dcutr/tests/lib.rs +++ b/protocols/dcutr/tests/lib.rs @@ -175,7 +175,7 @@ impl From for ClientEvent { } } -fn spawn_swarm_on_pool(pool: &LocalPool, swarm: Swarm) { +fn spawn_swarm_on_pool(pool: &LocalPool, swarm: Swarm) { pool.spawner() .spawn_obj(swarm.collect::>().map(|_| ()).boxed().into()) .unwrap(); diff --git a/protocols/relay/tests/v1.rs b/protocols/relay/tests/v1.rs index 6d6f97ac700..1da368f68c2 100644 --- a/protocols/relay/tests/v1.rs +++ b/protocols/relay/tests/v1.rs @@ -1369,7 +1369,7 @@ fn build_keep_alive_only_swarm() -> Swarm { ) } -fn spawn_swarm_on_pool(pool: &LocalPool, mut swarm: Swarm) { +fn spawn_swarm_on_pool(pool: &LocalPool, mut swarm: Swarm) { pool.spawner() .spawn_obj( async move { diff --git a/protocols/relay/tests/v2.rs b/protocols/relay/tests/v2.rs index ea4b8b8fb3c..c0d084aa606 100644 --- a/protocols/relay/tests/v2.rs +++ b/protocols/relay/tests/v2.rs @@ -397,7 +397,7 @@ impl From for ClientEvent { } } -fn spawn_swarm_on_pool(pool: &LocalPool, swarm: Swarm) { +fn spawn_swarm_on_pool(pool: &LocalPool, swarm: Swarm) { pool.spawner() .spawn_obj(swarm.collect::>().map(|_| ()).boxed().into()) .unwrap(); diff --git a/protocols/rendezvous/tests/harness.rs b/protocols/rendezvous/tests/harness.rs index 3709e509a06..3602d666c0f 100644 --- a/protocols/rendezvous/tests/harness.rs +++ b/protocols/rendezvous/tests/harness.rs @@ -140,7 +140,7 @@ pub trait SwarmExt { /// Establishes a connection to the given [`Swarm`], polling both of them until the connection is established. async fn block_on_connection(&mut self, other: &mut Swarm) where - T: NetworkBehaviour, + T: NetworkBehaviour + Send, ::OutEvent: Debug; /// Listens on a random memory address, polling the [`Swarm`] until the transport is ready to accept connections. @@ -153,12 +153,12 @@ pub trait SwarmExt { #[async_trait] impl SwarmExt for Swarm where - B: NetworkBehaviour, + B: NetworkBehaviour + Send, ::OutEvent: Debug, { async fn block_on_connection(&mut self, other: &mut Swarm) where - T: NetworkBehaviour, + T: NetworkBehaviour + Send, ::OutEvent: Debug, { let addr_to_dial = other.external_addresses().next().unwrap().addr.clone(); diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index a142e2327a2..9a491198a9b 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -1,3 +1,8 @@ + +# Unreleased + +- Remove `Send` bound from `NetworkBehaviour`. + # 0.34.0 [2022-02-22] - Rename `ProtocolsHandler` to `ConnectionHandler`. Upgrade should be as simple as renaming all diff --git a/swarm/src/behaviour.rs b/swarm/src/behaviour.rs index 20b607c3342..6e2a163cc2c 100644 --- a/swarm/src/behaviour.rs +++ b/swarm/src/behaviour.rs @@ -165,7 +165,7 @@ pub(crate) type THandlerOutEvent = /// Optionally one can provide a custom `poll` function through the `#[behaviour(poll_method = /// "poll")]` attribute. This function must have the same signature as the [`NetworkBehaviour#poll`] /// function and will be called last within the generated [`NetworkBehaviour`] implementation. -pub trait NetworkBehaviour: Send + 'static { +pub trait NetworkBehaviour: 'static { /// Handler for all the protocols the network behaviour supports. type ConnectionHandler: IntoConnectionHandler; From 88cc599dfde60cecc06dd827dfc1add4c1be54ef Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Thu, 24 Feb 2022 15:43:26 +0100 Subject: [PATCH 2/2] feat(swarm): keep Connections on the same Task We know upgrade a PendingConnection on the same Task, instead of moving it back to the main Task, and spawning another one --- swarm/src/connection/pool.rs | 61 +++++++-------- swarm/src/connection/pool/task.rs | 123 ++++++++++++++++++++++++++---- 2 files changed, 134 insertions(+), 50 deletions(-) diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index bf21976f749..5a723bcd49f 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -33,12 +33,11 @@ use fnv::FnvHashMap; use futures::prelude::*; use futures::{ channel::{mpsc, oneshot}, - future::{poll_fn, BoxFuture, Either}, - ready, + future::{BoxFuture, Either}, stream::FuturesUnordered, }; use libp2p_core::connection::{ConnectionId, Endpoint, PendingPoint}; -use libp2p_core::muxing::{StreamMuxer, StreamMuxerBox}; +use libp2p_core::muxing::StreamMuxerBox; use std::{ collections::{hash_map, HashMap}, convert::TryFrom as _, @@ -98,10 +97,12 @@ where /// Sender distributed to pending tasks for reporting events back /// to the pool. - pending_connection_events_tx: mpsc::Sender>, + pending_connection_events_tx: + mpsc::Sender>, /// Receiver for events reported from pending tasks. - pending_connection_events_rx: mpsc::Receiver>, + pending_connection_events_rx: + mpsc::Receiver>, /// Sender distributed to established tasks for reporting events back /// to the pool. @@ -485,7 +486,7 @@ where dial_concurrency_factor_override: Option, ) -> Result where - TTrans: Clone + Send, + TTrans: Transport + Clone + Send, TTrans::Dial: Send + 'static, { if let Err(limit) = self.counters.check_max_pending_outgoing() { @@ -541,6 +542,7 @@ where info: IncomingInfo<'_>, ) -> Result where + TTrans: Transport, TFut: Future> + Send + 'static, { let endpoint = info.to_connected_point(); @@ -673,7 +675,9 @@ where match event { task::PendingConnectionEvent::ConnectionEstablished { id, - output: (obtained_peer_id, muxer), + // output: (obtained_peer_id, muxer), + obtained_peer_id, + response, outgoing, } => { let PendingConnectionInfo { @@ -759,20 +763,8 @@ where }); if let Err(error) = error { - self.spawn( - poll_fn(move |cx| { - if let Err(e) = ready!(muxer.close(cx)) { - log::debug!( - "Failed to close connection {:?} to peer {}: {:?}", - id, - obtained_peer_id, - e - ); - } - Poll::Ready(()) - }) - .boxed(), - ); + // send message to PendingConnection + let _ = response.send(task::PendingCommand::Close); match endpoint { ConnectedPoint::Dialer { .. } => { @@ -815,21 +807,22 @@ where }, ); - let connection = super::Connection::new( - muxer, - handler.into_handler(&obtained_peer_id, &endpoint), - self.substream_upgrade_protocol_override, - ); - self.spawn( - task::new_for_established_connection( + // Send message to upgrade pending connection to upgrade to a full connection + let cmd = task::PendingCommand::Upgrade { + handler: handler.into_handler(&obtained_peer_id, &endpoint), + substream_upgrade_protocol_override: self + .substream_upgrade_protocol_override, + command_receiver, + events: self.established_connection_events_tx.clone(), + }; + if response.send(cmd).is_err() { + // TODO: what else do we want to do if the task is gone? + log::debug!( + "Failed to upgrade connection {:?} to peer {}: Task is gone", id, obtained_peer_id, - connection, - command_receiver, - self.established_connection_events_tx.clone(), - ) - .boxed(), - ); + ); + } match self.get(id) { Some(PoolConnection::Established(connection)) => { diff --git a/swarm/src/connection/pool/task.rs b/swarm/src/connection/pool/task.rs index 866049e50da..01cefdfdf8e 100644 --- a/swarm/src/connection/pool/task.rs +++ b/swarm/src/connection/pool/task.rs @@ -24,7 +24,8 @@ use super::concurrent_dial::ConcurrentDial; use crate::{ connection::{ - self, ConnectionError, PendingInboundConnectionError, PendingOutboundConnectionError, + self, Connection, ConnectionError, PendingInboundConnectionError, + PendingOutboundConnectionError, }, transport::{Transport, TransportError}, ConnectionHandler, Multiaddr, PeerId, @@ -34,7 +35,7 @@ use futures::{ future::{poll_fn, Either, Future}, SinkExt, StreamExt, }; -use libp2p_core::connection::ConnectionId; +use libp2p_core::{connection::ConnectionId, muxing::StreamMuxerBox, upgrade, StreamMuxer}; use std::pin::Pin; use void::Void; @@ -48,14 +49,30 @@ pub enum Command { Close, } +/// Commands that can be sent to a task driving a pending connection. #[derive(Debug)] -pub enum PendingConnectionEvent +pub enum PendingCommand { + /// Upgrade from pending to established connection. + Upgrade { + handler: THandler, + substream_upgrade_protocol_override: Option, + command_receiver: mpsc::Receiver>, + events: mpsc::Sender>, + }, + /// Close the connection, due to an error, and terminate the task. + Close, +} + +#[derive(Debug)] +pub enum PendingConnectionEvent where TTrans: Transport, + THandler: ConnectionHandler, { ConnectionEstablished { id: ConnectionId, - output: TTrans::Output, + obtained_peer_id: PeerId, + response: oneshot::Sender>, /// [`Some`] when the new connection is an outgoing connection. /// Addresses are dialed in parallel. Contains the addresses and errors /// of dial attempts that failed before the one successful dial. @@ -97,13 +114,14 @@ pub enum EstablishedConnectionEvent { }, } -pub async fn new_for_pending_outgoing_connection( +pub async fn new_for_pending_outgoing_connection( connection_id: ConnectionId, dial: ConcurrentDial, abort_receiver: oneshot::Receiver, - mut events: mpsc::Sender>, + mut events: mpsc::Sender>, ) where - TTrans: Transport, + TTrans: Transport, + THandler: ConnectionHandler, { match futures::future::select(abort_receiver, Box::pin(dial)).await { Either::Left((Err(oneshot::Canceled), _)) => { @@ -115,14 +133,50 @@ pub async fn new_for_pending_outgoing_connection( .await; } Either::Left((Ok(v), _)) => void::unreachable(v), - Either::Right((Ok((address, output, errors)), _)) => { + Either::Right((Ok((address, (obtained_peer_id, muxer), errors)), _)) => { + let (response, receiver) = oneshot::channel(); let _ = events .send(PendingConnectionEvent::ConnectionEstablished { id: connection_id, - output, + obtained_peer_id, + response, outgoing: Some((address, errors)), }) .await; + + match receiver.await { + Ok(PendingCommand::Upgrade { + handler, + substream_upgrade_protocol_override, + command_receiver, + events, + }) => { + // Upgrade to Connection + let connection = + Connection::new(muxer, handler, substream_upgrade_protocol_override); + new_for_established_connection( + connection_id, + obtained_peer_id, + connection, + command_receiver, + events, + ) + .await + } + Ok(PendingCommand::Close) => { + if let Err(e) = poll_fn(move |cx| muxer.close(cx)).await { + log::debug!( + "Failed to close connection {:?} to peer {}: {:?}", + connection_id, + obtained_peer_id, + e + ); + } + } + Err(_) => { + // Shutting down, nothing we can do about this. + } + } } Either::Right((Err(e), _)) => { let _ = events @@ -135,14 +189,15 @@ pub async fn new_for_pending_outgoing_connection( } } -pub async fn new_for_pending_incoming_connection( +pub async fn new_for_pending_incoming_connection( connection_id: ConnectionId, future: TFut, abort_receiver: oneshot::Receiver, - mut events: mpsc::Sender>, + mut events: mpsc::Sender>, ) where - TTrans: Transport, + TTrans: Transport, TFut: Future> + Send + 'static, + THandler: ConnectionHandler, { match futures::future::select(abort_receiver, Box::pin(future)).await { Either::Left((Err(oneshot::Canceled), _)) => { @@ -154,14 +209,50 @@ pub async fn new_for_pending_incoming_connection( .await; } Either::Left((Ok(v), _)) => void::unreachable(v), - Either::Right((Ok(output), _)) => { + Either::Right((Ok((obtained_peer_id, muxer)), _)) => { + let (response, receiver) = oneshot::channel(); let _ = events .send(PendingConnectionEvent::ConnectionEstablished { id: connection_id, - output, + obtained_peer_id, + response, outgoing: None, }) .await; + + match receiver.await { + Ok(PendingCommand::Upgrade { + handler, + substream_upgrade_protocol_override, + command_receiver, + events, + }) => { + // Upgrade to Connection + let connection = + Connection::new(muxer, handler, substream_upgrade_protocol_override); + new_for_established_connection( + connection_id, + obtained_peer_id, + connection, + command_receiver, + events, + ) + .await + } + Ok(PendingCommand::Close) => { + if let Err(e) = poll_fn(move |cx| muxer.close(cx)).await { + log::debug!( + "Failed to close connection {:?} to peer {}: {:?}", + connection_id, + obtained_peer_id, + e + ); + } + } + Err(_) => { + // Shutting down, nothing we can do about this. + } + } } Either::Right((Err(e), _)) => { let _ = events @@ -176,10 +267,10 @@ pub async fn new_for_pending_incoming_connection( } } -pub async fn new_for_established_connection( +async fn new_for_established_connection( connection_id: ConnectionId, peer_id: PeerId, - mut connection: crate::connection::Connection, + mut connection: Connection, mut command_receiver: mpsc::Receiver>, mut events: mpsc::Sender>, ) where