diff --git a/misc/metrics/src/swarm.rs b/misc/metrics/src/swarm.rs index a003ab56570..b171f48b6e3 100644 --- a/misc/metrics/src/swarm.rs +++ b/misc/metrics/src/swarm.rs @@ -370,10 +370,8 @@ enum PendingInboundConnectionError { ConnectionLimit, } -impl From<&libp2p_swarm::PendingInboundConnectionError> - for PendingInboundConnectionError -{ - fn from(error: &libp2p_swarm::PendingInboundConnectionError) -> Self { +impl From<&libp2p_swarm::PendingInboundConnectionError> for PendingInboundConnectionError { + fn from(error: &libp2p_swarm::PendingInboundConnectionError) -> Self { match error { libp2p_swarm::PendingInboundConnectionError::WrongPeerId { .. } => { PendingInboundConnectionError::WrongPeerId diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index da77ae72209..3f411b4437e 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -7,9 +7,13 @@ - Add `estblished_in` to `SwarmEvent::ConnectionEstablished`. See [PR 3134]. +- Remove type parameter from `PendingOutboundConnectionError` and `PendingInboundConnectionError`. + These two types are always used with `std::io::Error`. See [PR 3272]. + [PR 3170]: https://github.com/libp2p/rust-libp2p/pull/3170 [PR 3134]: https://github.com/libp2p/rust-libp2p/pull/3134 [PR 3153]: https://github.com/libp2p/rust-libp2p/pull/3153 +[PR 3272]: https://github.com/libp2p/rust-libp2p/pull/3272 # 0.41.1 diff --git a/swarm/src/connection/error.rs b/swarm/src/connection/error.rs index 541d458df0c..9226035968c 100644 --- a/swarm/src/connection/error.rs +++ b/swarm/src/connection/error.rs @@ -76,12 +76,11 @@ impl From for ConnectionError { /// Note: Addresses for an outbound connection are dialed in parallel. Thus, compared to /// [`PendingInboundConnectionError`], one or more [`TransportError`]s can occur for a single /// connection. -pub type PendingOutboundConnectionError = - PendingConnectionError)>>; +pub type PendingOutboundConnectionError = + PendingConnectionError)>>; /// Errors that can occur in the context of a pending incoming `Connection`. -pub type PendingInboundConnectionError = - PendingConnectionError>; +pub type PendingInboundConnectionError = PendingConnectionError>; /// Errors that can occur in the context of a pending `Connection`. #[derive(Debug)] diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index 7a81c57e2df..30dff859d51 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -26,7 +26,7 @@ use crate::{ Connected, ConnectionError, ConnectionLimit, IncomingInfo, PendingConnectionError, PendingInboundConnectionError, PendingOutboundConnectionError, }, - transport::{Transport, TransportError}, + transport::TransportError, ConnectedPoint, ConnectionHandler, Executor, IntoConnectionHandler, Multiaddr, PeerId, }; use concurrent_dial::ConcurrentDial; @@ -79,9 +79,8 @@ impl ExecSwitch { } /// A connection `Pool` manages a set of connections for each peer. -pub struct Pool +pub struct Pool where - TTrans: Transport, THandler: IntoConnectionHandler, { local_id: PeerId, @@ -124,10 +123,10 @@ where /// Sender distributed to pending tasks for reporting events back /// to the pool. - pending_connection_events_tx: mpsc::Sender>, + pending_connection_events_tx: mpsc::Sender, /// Receiver for events reported from pending tasks. - pending_connection_events_rx: mpsc::Receiver>, + pending_connection_events_rx: mpsc::Receiver, /// Sender distributed to established tasks for reporting events back /// to the pool. @@ -213,7 +212,7 @@ impl PendingConnection { } } -impl fmt::Debug for Pool { +impl fmt::Debug for Pool { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { f.debug_struct("Pool") .field("counters", &self.counters) @@ -223,10 +222,7 @@ impl fmt::Debug for Pool -where - TTrans: Transport, -{ +pub enum PoolEvent { /// A new connection has been established. ConnectionEstablished { id: ConnectionId, @@ -239,7 +235,7 @@ where /// [`Some`] when the new connection is an outgoing connection. /// Addresses are dialed in parallel. Contains the addresses and errors /// of dial attempts that failed before the one successful dial. - concurrent_dial_errors: Option)>>, + concurrent_dial_errors: Option)>>, /// How long it took to establish this connection. established_in: std::time::Duration, }, @@ -272,7 +268,7 @@ where /// The ID of the failed connection. id: ConnectionId, /// The error that occurred. - error: PendingOutboundConnectionError, + error: PendingOutboundConnectionError, /// The handler that was supposed to handle the connection. handler: THandler, /// The (expected) peer of the failed connection. @@ -288,7 +284,7 @@ where /// Local connection address. local_addr: Multiaddr, /// The error that occurred. - error: PendingInboundConnectionError, + error: PendingInboundConnectionError, /// The handler that was supposed to handle the connection. handler: THandler, }, @@ -312,10 +308,9 @@ where }, } -impl Pool +impl Pool where THandler: IntoConnectionHandler, - TTrans: Transport, { /// Creates a new empty `Pool`. pub fn new(local_id: PeerId, config: PoolConfig, limits: ConnectionLimits) -> Self { @@ -429,12 +424,9 @@ where } } -impl Pool +impl Pool where THandler: IntoConnectionHandler, - TTrans: Transport + 'static, - TTrans::Output: Send + 'static, - TTrans::Error: Send + 'static, { /// Adds a pending outgoing connection to the pool in the form of a `Future` /// that establishes and negotiates the connection. @@ -448,10 +440,7 @@ where 'static, ( Multiaddr, - Result< - ::Output, - TransportError<::Error>, - >, + Result<(PeerId, StreamMuxerBox), TransportError>, ), >, >, @@ -459,11 +448,7 @@ where handler: THandler, role_override: Endpoint, dial_concurrency_factor_override: Option, - ) -> Result - where - TTrans: Send, - TTrans::Dial: Send + 'static, - { + ) -> Result { if let Err(limit) = self.counters.check_max_pending_outgoing() { return Err((limit, handler)); }; @@ -515,7 +500,7 @@ where info: IncomingInfo<'_>, ) -> Result where - TFut: Future> + Send + 'static, + TFut: Future> + Send + 'static, { let endpoint = info.create_connected_point(); @@ -552,9 +537,8 @@ where } /// Polls the connection pool for events. - pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll> + pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll> where - TTrans: Transport, THandler: IntoConnectionHandler + 'static, THandler::Handler: ConnectionHandler + Send, ::OutboundOpenInfo: Send, @@ -677,7 +661,7 @@ where ), }; - let error: Result<(), PendingInboundConnectionError<_>> = self + let error = self .counters // Check general established connection limit. .check_max_established(&endpoint) diff --git a/swarm/src/connection/pool/concurrent_dial.rs b/swarm/src/connection/pool/concurrent_dial.rs index 5ba71f54fd0..7bd9e7f1f66 100644 --- a/swarm/src/connection/pool/concurrent_dial.rs +++ b/swarm/src/connection/pool/concurrent_dial.rs @@ -18,45 +18,38 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::{ - transport::{Transport, TransportError}, - Multiaddr, -}; +use crate::{transport::TransportError, Multiaddr}; use futures::{ future::{BoxFuture, Future}, ready, stream::{FuturesUnordered, StreamExt}, }; +use libp2p_core::muxing::StreamMuxerBox; +use libp2p_core::PeerId; use std::{ num::NonZeroU8, pin::Pin, task::{Context, Poll}, }; -type Dial = BoxFuture< +type Dial = BoxFuture< 'static, ( Multiaddr, - Result<::Output, TransportError<::Error>>, + Result<(PeerId, StreamMuxerBox), TransportError>, ), >; -pub struct ConcurrentDial { - dials: FuturesUnordered>, - pending_dials: Box> + Send>, - errors: Vec<(Multiaddr, TransportError)>, +pub struct ConcurrentDial { + dials: FuturesUnordered, + pending_dials: Box + Send>, + errors: Vec<(Multiaddr, TransportError)>, } -impl Unpin for ConcurrentDial {} +impl Unpin for ConcurrentDial {} -impl ConcurrentDial -where - TTrans: Transport + Send + 'static, - TTrans::Output: Send, - TTrans::Error: Send, - TTrans::Dial: Send + 'static, -{ - pub(crate) fn new(pending_dials: Vec>, concurrency_factor: NonZeroU8) -> Self { +impl ConcurrentDial { + pub(crate) fn new(pending_dials: Vec, concurrency_factor: NonZeroU8) -> Self { let mut pending_dials = pending_dials.into_iter(); let dials = FuturesUnordered::new(); @@ -75,20 +68,17 @@ where } } -impl Future for ConcurrentDial -where - TTrans: Transport, -{ +impl Future for ConcurrentDial { type Output = Result< // Either one dial succeeded, returning the negotiated [`PeerId`], the address, the // muxer and the addresses and errors of the dials that failed before. ( Multiaddr, - TTrans::Output, - Vec<(Multiaddr, TransportError)>, + (PeerId, StreamMuxerBox), + Vec<(Multiaddr, TransportError)>, ), // Or all dials failed, thus returning the address and error for each dial. - Vec<(Multiaddr, TransportError)>, + Vec<(Multiaddr, TransportError)>, >; fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { diff --git a/swarm/src/connection/pool/task.rs b/swarm/src/connection/pool/task.rs index 8e1129d8cae..326e381d76f 100644 --- a/swarm/src/connection/pool/task.rs +++ b/swarm/src/connection/pool/task.rs @@ -26,7 +26,7 @@ use crate::{ connection::{ self, ConnectionError, PendingInboundConnectionError, PendingOutboundConnectionError, }, - transport::{Transport, TransportError}, + transport::TransportError, ConnectionHandler, Multiaddr, PeerId, }; use futures::{ @@ -35,6 +35,7 @@ use futures::{ SinkExt, StreamExt, }; use libp2p_core::connection::ConnectionId; +use libp2p_core::muxing::StreamMuxerBox; use std::pin::Pin; use void::Void; @@ -48,26 +49,19 @@ pub enum Command { Close, } -#[derive(Debug)] -pub enum PendingConnectionEvent -where - TTrans: Transport, -{ +pub enum PendingConnectionEvent { ConnectionEstablished { id: ConnectionId, - output: TTrans::Output, + output: (PeerId, StreamMuxerBox), /// [`Some`] when the new connection is an outgoing connection. /// Addresses are dialed in parallel. Contains the addresses and errors /// of dial attempts that failed before the one successful dial. - outgoing: Option<(Multiaddr, Vec<(Multiaddr, TransportError)>)>, + outgoing: Option<(Multiaddr, Vec<(Multiaddr, TransportError)>)>, }, /// A pending connection failed. PendingFailed { id: ConnectionId, - error: Either< - PendingOutboundConnectionError, - PendingInboundConnectionError, - >, + error: Either, }, } @@ -97,14 +91,12 @@ pub enum EstablishedConnectionEvent { }, } -pub async fn new_for_pending_outgoing_connection( +pub async fn new_for_pending_outgoing_connection( connection_id: ConnectionId, - dial: ConcurrentDial, + dial: ConcurrentDial, abort_receiver: oneshot::Receiver, - mut events: mpsc::Sender>, -) where - TTrans: Transport, -{ + mut events: mpsc::Sender, +) { match futures::future::select(abort_receiver, Box::pin(dial)).await { Either::Left((Err(oneshot::Canceled), _)) => { let _ = events @@ -135,14 +127,13 @@ pub async fn new_for_pending_outgoing_connection( } } -pub async fn new_for_pending_incoming_connection( +pub async fn new_for_pending_incoming_connection( connection_id: ConnectionId, future: TFut, abort_receiver: oneshot::Receiver, - mut events: mpsc::Sender>, + mut events: mpsc::Sender, ) where - TTrans: Transport, - TFut: Future> + Send + 'static, + TFut: Future> + Send + 'static, { match futures::future::select(abort_receiver, Box::pin(future)).await { Either::Left((Err(oneshot::Canceled), _)) => { diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 5b09bd2cc38..c4fd2de563e 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -232,7 +232,7 @@ pub enum SwarmEvent { /// Address used to send back data to the remote. send_back_addr: Multiaddr, /// The error that happened. - error: PendingInboundConnectionError, + error: PendingInboundConnectionError, }, /// Outgoing connection attempt failed. OutgoingConnectionError { @@ -303,7 +303,7 @@ where transport: transport::Boxed<(PeerId, StreamMuxerBox)>, /// The nodes currently active. - pool: Pool, transport::Boxed<(PeerId, StreamMuxerBox)>>, + pool: Pool>, /// The local peer ID. local_peer_id: PeerId, @@ -733,7 +733,7 @@ where fn handle_pool_event( &mut self, - event: PoolEvent, transport::Boxed<(PeerId, StreamMuxerBox)>>, + event: PoolEvent>, ) -> Option>> { match event { PoolEvent::ConnectionEstablished { @@ -1130,7 +1130,7 @@ where } } PendingNotifyHandler::Any(ids) => { - match notify_any::<_, _, TBehaviour>(ids, &mut this.pool, event, cx) { + match notify_any::<_, TBehaviour>(ids, &mut this.pool, event, cx) { None => continue, Some((event, ids)) => { let handler = PendingNotifyHandler::Any(ids); @@ -1239,15 +1239,13 @@ fn notify_one( /// /// Returns `None` if either all connections are closing or the event /// was successfully sent to a handler, in either case the event is consumed. -fn notify_any( +fn notify_any( ids: SmallVec<[ConnectionId; 10]>, - pool: &mut Pool, + pool: &mut Pool, event: THandlerInEvent, cx: &mut Context<'_>, ) -> Option<(THandlerInEvent, SmallVec<[ConnectionId; 10]>)> where - TTrans: Transport, - TTrans::Error: Send + 'static, TBehaviour: NetworkBehaviour, THandler: IntoConnectionHandler, THandler::Handler: ConnectionHandler< @@ -1572,8 +1570,8 @@ pub enum DialError { Transport(Vec<(Multiaddr, TransportError)>), } -impl From> for DialError { - fn from(error: PendingOutboundConnectionError) -> Self { +impl From for DialError { + fn from(error: PendingOutboundConnectionError) -> Self { match error { PendingConnectionError::ConnectionLimit(limit) => DialError::ConnectionLimit(limit), PendingConnectionError::Aborted => DialError::Aborted,