From 59bcc9f1a152487d47f2823ef5a4af4013b727f3 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 21 Dec 2022 14:13:17 +1100 Subject: [PATCH 1/6] Don't be generic over transport --- swarm/src/connection/pool.rs | 46 ++++++++++--------------------- swarm/src/connection/pool/task.rs | 35 ++++++++++------------- swarm/src/lib.rs | 12 ++++---- 3 files changed, 35 insertions(+), 58 deletions(-) diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index 7a81c57e2df..482c27493c8 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -26,7 +26,7 @@ use crate::{ Connected, ConnectionError, ConnectionLimit, IncomingInfo, PendingConnectionError, PendingInboundConnectionError, PendingOutboundConnectionError, }, - transport::{Transport, TransportError}, + transport::TransportError, ConnectedPoint, ConnectionHandler, Executor, IntoConnectionHandler, Multiaddr, PeerId, }; use concurrent_dial::ConcurrentDial; @@ -79,9 +79,8 @@ impl ExecSwitch { } /// A connection `Pool` manages a set of connections for each peer. -pub struct Pool +pub struct Pool where - TTrans: Transport, THandler: IntoConnectionHandler, { local_id: PeerId, @@ -124,10 +123,10 @@ where /// Sender distributed to pending tasks for reporting events back /// to the pool. - pending_connection_events_tx: mpsc::Sender>, + pending_connection_events_tx: mpsc::Sender, /// Receiver for events reported from pending tasks. - pending_connection_events_rx: mpsc::Receiver>, + pending_connection_events_rx: mpsc::Receiver, /// Sender distributed to established tasks for reporting events back /// to the pool. @@ -213,7 +212,7 @@ impl PendingConnection { } } -impl fmt::Debug for Pool { +impl fmt::Debug for Pool { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { f.debug_struct("Pool") .field("counters", &self.counters) @@ -223,10 +222,7 @@ impl fmt::Debug for Pool -where - TTrans: Transport, -{ +pub enum PoolEvent { /// A new connection has been established. ConnectionEstablished { id: ConnectionId, @@ -239,7 +235,7 @@ where /// [`Some`] when the new connection is an outgoing connection. /// Addresses are dialed in parallel. Contains the addresses and errors /// of dial attempts that failed before the one successful dial. - concurrent_dial_errors: Option)>>, + concurrent_dial_errors: Option)>>, /// How long it took to establish this connection. established_in: std::time::Duration, }, @@ -272,7 +268,7 @@ where /// The ID of the failed connection. id: ConnectionId, /// The error that occurred. - error: PendingOutboundConnectionError, + error: PendingOutboundConnectionError, /// The handler that was supposed to handle the connection. handler: THandler, /// The (expected) peer of the failed connection. @@ -288,7 +284,7 @@ where /// Local connection address. local_addr: Multiaddr, /// The error that occurred. - error: PendingInboundConnectionError, + error: PendingInboundConnectionError, /// The handler that was supposed to handle the connection. handler: THandler, }, @@ -312,10 +308,9 @@ where }, } -impl Pool +impl Pool where THandler: IntoConnectionHandler, - TTrans: Transport, { /// Creates a new empty `Pool`. pub fn new(local_id: PeerId, config: PoolConfig, limits: ConnectionLimits) -> Self { @@ -429,12 +424,9 @@ where } } -impl Pool +impl Pool where THandler: IntoConnectionHandler, - TTrans: Transport + 'static, - TTrans::Output: Send + 'static, - TTrans::Error: Send + 'static, { /// Adds a pending outgoing connection to the pool in the form of a `Future` /// that establishes and negotiates the connection. @@ -448,10 +440,7 @@ where 'static, ( Multiaddr, - Result< - ::Output, - TransportError<::Error>, - >, + Result<(PeerId, StreamMuxerBox), TransportError>, ), >, >, @@ -459,11 +448,7 @@ where handler: THandler, role_override: Endpoint, dial_concurrency_factor_override: Option, - ) -> Result - where - TTrans: Send, - TTrans::Dial: Send + 'static, - { + ) -> Result { if let Err(limit) = self.counters.check_max_pending_outgoing() { return Err((limit, handler)); }; @@ -515,7 +500,7 @@ where info: IncomingInfo<'_>, ) -> Result where - TFut: Future> + Send + 'static, + TFut: Future> + Send + 'static, { let endpoint = info.create_connected_point(); @@ -552,9 +537,8 @@ where } /// Polls the connection pool for events. - pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll> + pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll> where - TTrans: Transport, THandler: IntoConnectionHandler + 'static, THandler::Handler: ConnectionHandler + Send, ::OutboundOpenInfo: Send, diff --git a/swarm/src/connection/pool/task.rs b/swarm/src/connection/pool/task.rs index 8e1129d8cae..f6e98beb092 100644 --- a/swarm/src/connection/pool/task.rs +++ b/swarm/src/connection/pool/task.rs @@ -26,7 +26,7 @@ use crate::{ connection::{ self, ConnectionError, PendingInboundConnectionError, PendingOutboundConnectionError, }, - transport::{Transport, TransportError}, + transport::{TransportError}, ConnectionHandler, Multiaddr, PeerId, }; use futures::{ @@ -35,6 +35,8 @@ use futures::{ SinkExt, StreamExt, }; use libp2p_core::connection::ConnectionId; +use libp2p_core::muxing::StreamMuxerBox; +use libp2p_core::transport::Boxed; use std::pin::Pin; use void::Void; @@ -48,25 +50,21 @@ pub enum Command { Close, } -#[derive(Debug)] -pub enum PendingConnectionEvent -where - TTrans: Transport, -{ +pub enum PendingConnectionEvent { ConnectionEstablished { id: ConnectionId, - output: TTrans::Output, + output: (PeerId, StreamMuxerBox), /// [`Some`] when the new connection is an outgoing connection. /// Addresses are dialed in parallel. Contains the addresses and errors /// of dial attempts that failed before the one successful dial. - outgoing: Option<(Multiaddr, Vec<(Multiaddr, TransportError)>)>, + outgoing: Option<(Multiaddr, Vec<(Multiaddr, TransportError)>)>, }, /// A pending connection failed. PendingFailed { id: ConnectionId, error: Either< - PendingOutboundConnectionError, - PendingInboundConnectionError, + PendingOutboundConnectionError, + PendingInboundConnectionError, >, }, } @@ -97,14 +95,12 @@ pub enum EstablishedConnectionEvent { }, } -pub async fn new_for_pending_outgoing_connection( +pub async fn new_for_pending_outgoing_connection( connection_id: ConnectionId, - dial: ConcurrentDial, + dial: ConcurrentDial>, abort_receiver: oneshot::Receiver, - mut events: mpsc::Sender>, -) where - TTrans: Transport, -{ + mut events: mpsc::Sender, +) { match futures::future::select(abort_receiver, Box::pin(dial)).await { Either::Left((Err(oneshot::Canceled), _)) => { let _ = events @@ -135,14 +131,13 @@ pub async fn new_for_pending_outgoing_connection( } } -pub async fn new_for_pending_incoming_connection( +pub async fn new_for_pending_incoming_connection( connection_id: ConnectionId, future: TFut, abort_receiver: oneshot::Receiver, - mut events: mpsc::Sender>, + mut events: mpsc::Sender, ) where - TTrans: Transport, - TFut: Future> + Send + 'static, + TFut: Future> + Send + 'static, { match futures::future::select(abort_receiver, Box::pin(future)).await { Either::Left((Err(oneshot::Canceled), _)) => { diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index cf6051e1e85..ac7bf603e93 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -305,7 +305,7 @@ where transport: transport::Boxed<(PeerId, StreamMuxerBox)>, /// The nodes currently active. - pool: Pool, transport::Boxed<(PeerId, StreamMuxerBox)>>, + pool: Pool>, /// The local peer ID. local_peer_id: PeerId, @@ -802,7 +802,7 @@ where fn handle_pool_event( &mut self, - event: PoolEvent, transport::Boxed<(PeerId, StreamMuxerBox)>>, + event: PoolEvent>, ) -> Option>> { match event { PoolEvent::ConnectionEstablished { @@ -1199,7 +1199,7 @@ where } } PendingNotifyHandler::Any(ids) => { - match notify_any::<_, _, TBehaviour>(ids, &mut this.pool, event, cx) { + match notify_any::<_, TBehaviour>(ids, &mut this.pool, event, cx) { None => continue, Some((event, ids)) => { let handler = PendingNotifyHandler::Any(ids); @@ -1308,15 +1308,13 @@ fn notify_one( /// /// Returns `None` if either all connections are closing or the event /// was successfully sent to a handler, in either case the event is consumed. -fn notify_any( +fn notify_any( ids: SmallVec<[ConnectionId; 10]>, - pool: &mut Pool, + pool: &mut Pool, event: THandlerInEvent, cx: &mut Context<'_>, ) -> Option<(THandlerInEvent, SmallVec<[ConnectionId; 10]>)> where - TTrans: Transport, - TTrans::Error: Send + 'static, TBehaviour: NetworkBehaviour, THandler: IntoConnectionHandler, THandler::Handler: ConnectionHandler< From a1c55b20e85eb68141c3130760ab4bd54f8dbe49 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 21 Dec 2022 14:28:37 +1100 Subject: [PATCH 2/6] Remove type parameter for transport error --- misc/metrics/src/swarm.rs | 6 ++---- swarm/CHANGELOG.md | 4 ++++ swarm/src/connection/error.rs | 7 +++---- swarm/src/connection/pool.rs | 6 +++--- swarm/src/connection/pool/task.rs | 7 ++----- swarm/src/lib.rs | 6 +++--- 6 files changed, 17 insertions(+), 19 deletions(-) diff --git a/misc/metrics/src/swarm.rs b/misc/metrics/src/swarm.rs index a003ab56570..b171f48b6e3 100644 --- a/misc/metrics/src/swarm.rs +++ b/misc/metrics/src/swarm.rs @@ -370,10 +370,8 @@ enum PendingInboundConnectionError { ConnectionLimit, } -impl From<&libp2p_swarm::PendingInboundConnectionError> - for PendingInboundConnectionError -{ - fn from(error: &libp2p_swarm::PendingInboundConnectionError) -> Self { +impl From<&libp2p_swarm::PendingInboundConnectionError> for PendingInboundConnectionError { + fn from(error: &libp2p_swarm::PendingInboundConnectionError) -> Self { match error { libp2p_swarm::PendingInboundConnectionError::WrongPeerId { .. } => { PendingInboundConnectionError::WrongPeerId diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index da77ae72209..e17d2c77660 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -7,9 +7,13 @@ - Add `estblished_in` to `SwarmEvent::ConnectionEstablished`. See [PR 3134]. +- Remove type parameter from `PendingOutboundConnectionError` and `PendingInboundConnectionError`. + These two types are always used with `std::io::Error`. See [PR XXXX. + [PR 3170]: https://github.com/libp2p/rust-libp2p/pull/3170 [PR 3134]: https://github.com/libp2p/rust-libp2p/pull/3134 [PR 3153]: https://github.com/libp2p/rust-libp2p/pull/3153 +[PR XXXX]: https://github.com/libp2p/rust-libp2p/pull/XXXX # 0.41.1 diff --git a/swarm/src/connection/error.rs b/swarm/src/connection/error.rs index 541d458df0c..9226035968c 100644 --- a/swarm/src/connection/error.rs +++ b/swarm/src/connection/error.rs @@ -76,12 +76,11 @@ impl From for ConnectionError { /// Note: Addresses for an outbound connection are dialed in parallel. Thus, compared to /// [`PendingInboundConnectionError`], one or more [`TransportError`]s can occur for a single /// connection. -pub type PendingOutboundConnectionError = - PendingConnectionError)>>; +pub type PendingOutboundConnectionError = + PendingConnectionError)>>; /// Errors that can occur in the context of a pending incoming `Connection`. -pub type PendingInboundConnectionError = - PendingConnectionError>; +pub type PendingInboundConnectionError = PendingConnectionError>; /// Errors that can occur in the context of a pending `Connection`. #[derive(Debug)] diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index 482c27493c8..3d4bde9614b 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -268,7 +268,7 @@ pub enum PoolEvent { /// The ID of the failed connection. id: ConnectionId, /// The error that occurred. - error: PendingOutboundConnectionError, + error: PendingOutboundConnectionError, /// The handler that was supposed to handle the connection. handler: THandler, /// The (expected) peer of the failed connection. @@ -284,7 +284,7 @@ pub enum PoolEvent { /// Local connection address. local_addr: Multiaddr, /// The error that occurred. - error: PendingInboundConnectionError, + error: PendingInboundConnectionError, /// The handler that was supposed to handle the connection. handler: THandler, }, @@ -661,7 +661,7 @@ where ), }; - let error: Result<(), PendingInboundConnectionError<_>> = self + let error: Result<(), PendingInboundConnectionError> = self .counters // Check general established connection limit. .check_max_established(&endpoint) diff --git a/swarm/src/connection/pool/task.rs b/swarm/src/connection/pool/task.rs index f6e98beb092..a0f4ba9235e 100644 --- a/swarm/src/connection/pool/task.rs +++ b/swarm/src/connection/pool/task.rs @@ -26,7 +26,7 @@ use crate::{ connection::{ self, ConnectionError, PendingInboundConnectionError, PendingOutboundConnectionError, }, - transport::{TransportError}, + transport::TransportError, ConnectionHandler, Multiaddr, PeerId, }; use futures::{ @@ -62,10 +62,7 @@ pub enum PendingConnectionEvent { /// A pending connection failed. PendingFailed { id: ConnectionId, - error: Either< - PendingOutboundConnectionError, - PendingInboundConnectionError, - >, + error: Either, }, } diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index ac7bf603e93..bd0c517c248 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -234,7 +234,7 @@ pub enum SwarmEvent { /// Address used to send back data to the remote. send_back_addr: Multiaddr, /// The error that happened. - error: PendingInboundConnectionError, + error: PendingInboundConnectionError, }, /// Outgoing connection attempt failed. OutgoingConnectionError { @@ -1639,8 +1639,8 @@ pub enum DialError { Transport(Vec<(Multiaddr, TransportError)>), } -impl From> for DialError { - fn from(error: PendingOutboundConnectionError) -> Self { +impl From for DialError { + fn from(error: PendingOutboundConnectionError) -> Self { match error { PendingConnectionError::ConnectionLimit(limit) => DialError::ConnectionLimit(limit), PendingConnectionError::Aborted => DialError::Aborted, From 7132ff54ede2c268d5477741899e695e0d74b06b Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 21 Dec 2022 14:30:15 +1100 Subject: [PATCH 3/6] Remove unnecessary type hint --- swarm/src/connection/pool.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index 3d4bde9614b..30dff859d51 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -661,7 +661,7 @@ where ), }; - let error: Result<(), PendingInboundConnectionError> = self + let error = self .counters // Check general established connection limit. .check_max_established(&endpoint) From 01efa8f8d734ee17164c3282b8e1b6388d7630e6 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 21 Dec 2022 14:35:44 +1100 Subject: [PATCH 4/6] Remove type parameters from `ConcurrentDial` --- swarm/src/connection/pool/concurrent_dial.rs | 39 ++++++++------------ swarm/src/connection/pool/task.rs | 3 +- 2 files changed, 17 insertions(+), 25 deletions(-) diff --git a/swarm/src/connection/pool/concurrent_dial.rs b/swarm/src/connection/pool/concurrent_dial.rs index 5ba71f54fd0..ede8b4e68e0 100644 --- a/swarm/src/connection/pool/concurrent_dial.rs +++ b/swarm/src/connection/pool/concurrent_dial.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. use crate::{ - transport::{Transport, TransportError}, + transport::{TransportError}, Multiaddr, }; use futures::{ @@ -27,36 +27,32 @@ use futures::{ ready, stream::{FuturesUnordered, StreamExt}, }; +use libp2p_core::muxing::StreamMuxerBox; +use libp2p_core::PeerId; use std::{ num::NonZeroU8, pin::Pin, task::{Context, Poll}, }; -type Dial = BoxFuture< +type Dial = BoxFuture< 'static, ( Multiaddr, - Result<::Output, TransportError<::Error>>, + Result<(PeerId, StreamMuxerBox), TransportError>, ), >; -pub struct ConcurrentDial { - dials: FuturesUnordered>, - pending_dials: Box> + Send>, - errors: Vec<(Multiaddr, TransportError)>, +pub struct ConcurrentDial { + dials: FuturesUnordered, + pending_dials: Box + Send>, + errors: Vec<(Multiaddr, TransportError)>, } -impl Unpin for ConcurrentDial {} +impl Unpin for ConcurrentDial {} -impl ConcurrentDial -where - TTrans: Transport + Send + 'static, - TTrans::Output: Send, - TTrans::Error: Send, - TTrans::Dial: Send + 'static, -{ - pub(crate) fn new(pending_dials: Vec>, concurrency_factor: NonZeroU8) -> Self { +impl ConcurrentDial { + pub(crate) fn new(pending_dials: Vec, concurrency_factor: NonZeroU8) -> Self { let mut pending_dials = pending_dials.into_iter(); let dials = FuturesUnordered::new(); @@ -75,20 +71,17 @@ where } } -impl Future for ConcurrentDial -where - TTrans: Transport, -{ +impl Future for ConcurrentDial { type Output = Result< // Either one dial succeeded, returning the negotiated [`PeerId`], the address, the // muxer and the addresses and errors of the dials that failed before. ( Multiaddr, - TTrans::Output, - Vec<(Multiaddr, TransportError)>, + (PeerId, StreamMuxerBox), + Vec<(Multiaddr, TransportError)>, ), // Or all dials failed, thus returning the address and error for each dial. - Vec<(Multiaddr, TransportError)>, + Vec<(Multiaddr, TransportError)>, >; fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { diff --git a/swarm/src/connection/pool/task.rs b/swarm/src/connection/pool/task.rs index a0f4ba9235e..326e381d76f 100644 --- a/swarm/src/connection/pool/task.rs +++ b/swarm/src/connection/pool/task.rs @@ -36,7 +36,6 @@ use futures::{ }; use libp2p_core::connection::ConnectionId; use libp2p_core::muxing::StreamMuxerBox; -use libp2p_core::transport::Boxed; use std::pin::Pin; use void::Void; @@ -94,7 +93,7 @@ pub enum EstablishedConnectionEvent { pub async fn new_for_pending_outgoing_connection( connection_id: ConnectionId, - dial: ConcurrentDial>, + dial: ConcurrentDial, abort_receiver: oneshot::Receiver, mut events: mpsc::Sender, ) { From b0f2678b93d30a838096832bfaa8e4af440dffa5 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 21 Dec 2022 14:44:27 +1100 Subject: [PATCH 5/6] Fix rustfmt --- swarm/src/connection/pool/concurrent_dial.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/swarm/src/connection/pool/concurrent_dial.rs b/swarm/src/connection/pool/concurrent_dial.rs index ede8b4e68e0..7bd9e7f1f66 100644 --- a/swarm/src/connection/pool/concurrent_dial.rs +++ b/swarm/src/connection/pool/concurrent_dial.rs @@ -18,10 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::{ - transport::{TransportError}, - Multiaddr, -}; +use crate::{transport::TransportError, Multiaddr}; use futures::{ future::{BoxFuture, Future}, ready, From 7de947435c2e886ecdcfbfc5ab844904e3621143 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 23 Dec 2022 10:48:20 +1100 Subject: [PATCH 6/6] Update swarm/CHANGELOG.md --- swarm/CHANGELOG.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index e17d2c77660..3f411b4437e 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -8,12 +8,12 @@ - Add `estblished_in` to `SwarmEvent::ConnectionEstablished`. See [PR 3134]. - Remove type parameter from `PendingOutboundConnectionError` and `PendingInboundConnectionError`. - These two types are always used with `std::io::Error`. See [PR XXXX. + These two types are always used with `std::io::Error`. See [PR 3272]. [PR 3170]: https://github.com/libp2p/rust-libp2p/pull/3170 [PR 3134]: https://github.com/libp2p/rust-libp2p/pull/3134 [PR 3153]: https://github.com/libp2p/rust-libp2p/pull/3153 -[PR XXXX]: https://github.com/libp2p/rust-libp2p/pull/XXXX +[PR 3272]: https://github.com/libp2p/rust-libp2p/pull/3272 # 0.41.1