diff --git a/swarm/src/behaviour/toggle.rs b/swarm/src/behaviour/toggle.rs index 00d7587478b0..25183b932c7f 100644 --- a/swarm/src/behaviour/toggle.rs +++ b/swarm/src/behaviour/toggle.rs @@ -70,11 +70,11 @@ impl NetworkBehaviour for Toggle where TBehaviour: NetworkBehaviour, { - type ConnectionHandler = ToggleIntoProtoHandler; + type ConnectionHandler = ToggleIntoConnectionHandler; type OutEvent = TBehaviour::OutEvent; fn new_handler(&mut self) -> Self::ConnectionHandler { - ToggleIntoProtoHandler { + ToggleIntoConnectionHandler { inner: self.inner.as_mut().map(|i| i.new_handler()), } } @@ -223,9 +223,9 @@ where params: &mut impl PollParameters, ) -> Poll> { if let Some(inner) = self.inner.as_mut() { - inner - .poll(cx, params) - .map(|action| action.map_handler(|h| ToggleIntoProtoHandler { inner: Some(h) })) + inner.poll(cx, params).map(|action| { + action.map_handler(|h| ToggleIntoConnectionHandler { inner: Some(h) }) + }) } else { Poll::Pending } @@ -244,22 +244,22 @@ where } /// Implementation of `IntoConnectionHandler` that can be in the disabled state. -pub struct ToggleIntoProtoHandler { +pub struct ToggleIntoConnectionHandler { inner: Option, } -impl IntoConnectionHandler for ToggleIntoProtoHandler +impl IntoConnectionHandler for ToggleIntoConnectionHandler where TInner: IntoConnectionHandler, { - type Handler = ToggleProtoHandler; + type Handler = ToggleConnectionHandler; fn into_handler( self, remote_peer_id: &PeerId, connected_point: &ConnectedPoint, ) -> Self::Handler { - ToggleProtoHandler { + ToggleConnectionHandler { inner: self .inner .map(|h| h.into_handler(remote_peer_id, connected_point)), @@ -276,11 +276,11 @@ where } /// Implementation of [`ConnectionHandler`] that can be in the disabled state. -pub struct ToggleProtoHandler { +pub struct ToggleConnectionHandler { inner: Option, } -impl ConnectionHandler for ToggleProtoHandler +impl ConnectionHandler for ToggleConnectionHandler where TInner: ConnectionHandler, { @@ -426,7 +426,7 @@ mod tests { use super::*; use crate::handler::DummyConnectionHandler; - /// A disabled [`ToggleProtoHandler`] can receive listen upgrade errors in + /// A disabled [`ToggleConnectionHandler`] can receive listen upgrade errors in /// the following two cases: /// /// 1. Protocol negotiation on an incoming stream failed with no protocol @@ -439,10 +439,10 @@ mod tests { /// [`ConnectionHandlerSelect`](crate::connection_handler::ConnectionHandlerSelect) /// the former might receive an inbound upgrade error even when disabled. /// - /// [`ToggleProtoHandler`] should ignore the error in both of these cases. + /// [`ToggleConnectionHandler`] should ignore the error in both of these cases. #[test] fn ignore_listen_upgrade_error_when_disabled() { - let mut handler = ToggleProtoHandler:: { inner: None }; + let mut handler = ToggleConnectionHandler:: { inner: None }; handler.inject_listen_upgrade_error(Either::Right(()), ConnectionHandlerUpgrErr::Timeout); } diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 9215b9996711..42c29239cdc6 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -113,7 +113,10 @@ where /// Begins an orderly shutdown of the connection, returning the connection /// handler and a `Future` that resolves when connection shutdown is complete. pub fn close(self) -> (THandler, Close) { - (self.handler.into_connection_handler(), self.muxing.close().0) + ( + self.handler.into_connection_handler(), + self.muxing.close().0, + ) } /// Polls the handler and the substream, forwarding events from the former to the latter and diff --git a/swarm/src/connection/handler_wrapper.rs b/swarm/src/connection/handler_wrapper.rs index f0b4946197f7..13bd22c17158 100644 --- a/swarm/src/connection/handler_wrapper.rs +++ b/swarm/src/connection/handler_wrapper.rs @@ -42,35 +42,35 @@ use std::{error, fmt, pin::Pin, task::Context, task::Poll, time::Duration}; /// - Driving substream upgrades /// - Handling connection timeout // TODO: add a caching system for protocols that are supported or not -pub struct HandlerWrapper +pub struct HandlerWrapper where - TProtoHandler: ConnectionHandler, + TConnectionHandler: ConnectionHandler, { /// The underlying handler. - handler: TProtoHandler, + handler: TConnectionHandler, /// Futures that upgrade incoming substreams. negotiating_in: FuturesUnordered< SubstreamUpgrade< - TProtoHandler::InboundOpenInfo, + TConnectionHandler::InboundOpenInfo, InboundUpgradeApply< Substream, - SendWrapper, + SendWrapper, >, >, >, /// Futures that upgrade outgoing substreams. negotiating_out: FuturesUnordered< SubstreamUpgrade< - TProtoHandler::OutboundOpenInfo, + TConnectionHandler::OutboundOpenInfo, OutboundUpgradeApply< Substream, - SendWrapper, + SendWrapper, >, >, >, /// For each outbound substream request, how to upgrade it. The first element of the tuple /// is the unique identifier (see `unique_dial_upgrade_id`). - queued_dial_upgrades: Vec<(u64, SendWrapper)>, + queued_dial_upgrades: Vec<(u64, SendWrapper)>, /// Unique identifier assigned to each queued dial upgrade. unique_dial_upgrade_id: u64, /// The currently planned connection & handler shutdown. @@ -79,7 +79,7 @@ where substream_upgrade_protocol_override: Option, } -impl std::fmt::Debug for HandlerWrapper { +impl std::fmt::Debug for HandlerWrapper { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("HandlerWrapper") .field("negotiating_in", &self.negotiating_in) @@ -94,9 +94,9 @@ impl std::fmt::Debug for HandlerWrapper HandlerWrapper { +impl HandlerWrapper { pub(crate) fn new( - handler: TProtoHandler, + handler: TConnectionHandler, substream_upgrade_protocol_override: Option, ) -> Self { Self { @@ -224,22 +224,22 @@ where } } -pub type OutboundOpenInfo = ( +pub type OutboundOpenInfo = ( u64, - ::OutboundOpenInfo, + ::OutboundOpenInfo, Duration, ); -impl HandlerWrapper +impl HandlerWrapper where - TProtoHandler: ConnectionHandler, + TConnectionHandler: ConnectionHandler, { pub fn inject_substream( &mut self, substream: Substream, // The first element of the tuple is the unique upgrade identifier // (see `unique_dial_upgrade_id`). - endpoint: SubstreamEndpoint>, + endpoint: SubstreamEndpoint>, ) { match endpoint { SubstreamEndpoint::Listener => { @@ -290,7 +290,7 @@ where } } - pub fn inject_event(&mut self, event: TProtoHandler::InEvent) { + pub fn inject_event(&mut self, event: TConnectionHandler::InEvent) { self.handler.inject_event(event); } @@ -303,8 +303,8 @@ where cx: &mut Context<'_>, ) -> Poll< Result< - Event, TProtoHandler::OutEvent>, - Error, + Event, TConnectionHandler::OutEvent>, + Error, >, > { while let Poll::Ready(Some((user_data, res))) = self.negotiating_in.poll_next_unpin(cx) { diff --git a/swarm/src/handler/map_in.rs b/swarm/src/handler/map_in.rs index 96575bbcb58e..a209225045e3 100644 --- a/swarm/src/handler/map_in.rs +++ b/swarm/src/handler/map_in.rs @@ -27,15 +27,15 @@ use libp2p_core::Multiaddr; use std::{fmt::Debug, marker::PhantomData, task::Context, task::Poll}; /// Wrapper around a protocol handler that turns the input event into something else. -pub struct MapInEvent { - inner: TProtoHandler, +pub struct MapInEvent { + inner: TConnectionHandler, map: TMap, marker: PhantomData, } -impl MapInEvent { +impl MapInEvent { /// Creates a `MapInEvent`. - pub(crate) fn new(inner: TProtoHandler, map: TMap) -> Self { + pub(crate) fn new(inner: TConnectionHandler, map: TMap) -> Self { MapInEvent { inner, map, @@ -44,20 +44,21 @@ impl MapInEvent { } } -impl ConnectionHandler for MapInEvent +impl ConnectionHandler + for MapInEvent where - TProtoHandler: ConnectionHandler, - TMap: Fn(TNewIn) -> Option, + TConnectionHandler: ConnectionHandler, + TMap: Fn(TNewIn) -> Option, TNewIn: Debug + Send + 'static, TMap: Send + 'static, { type InEvent = TNewIn; - type OutEvent = TProtoHandler::OutEvent; - type Error = TProtoHandler::Error; - type InboundProtocol = TProtoHandler::InboundProtocol; - type OutboundProtocol = TProtoHandler::OutboundProtocol; - type InboundOpenInfo = TProtoHandler::InboundOpenInfo; - type OutboundOpenInfo = TProtoHandler::OutboundOpenInfo; + type OutEvent = TConnectionHandler::OutEvent; + type Error = TConnectionHandler::Error; + type InboundProtocol = TConnectionHandler::InboundProtocol; + type OutboundProtocol = TConnectionHandler::OutboundProtocol; + type InboundOpenInfo = TConnectionHandler::InboundOpenInfo; + type OutboundOpenInfo = TConnectionHandler::OutboundOpenInfo; fn listen_protocol(&self) -> SubstreamProtocol { self.inner.listen_protocol() diff --git a/swarm/src/handler/map_out.rs b/swarm/src/handler/map_out.rs index f92958e30b43..2eb0c2f9bdcd 100644 --- a/swarm/src/handler/map_out.rs +++ b/swarm/src/handler/map_out.rs @@ -28,32 +28,32 @@ use std::fmt::Debug; use std::task::{Context, Poll}; /// Wrapper around a protocol handler that turns the output event into something else. -pub struct MapOutEvent { - inner: TProtoHandler, +pub struct MapOutEvent { + inner: TConnectionHandler, map: TMap, } -impl MapOutEvent { +impl MapOutEvent { /// Creates a `MapOutEvent`. - pub(crate) fn new(inner: TProtoHandler, map: TMap) -> Self { + pub(crate) fn new(inner: TConnectionHandler, map: TMap) -> Self { MapOutEvent { inner, map } } } -impl ConnectionHandler for MapOutEvent +impl ConnectionHandler for MapOutEvent where - TProtoHandler: ConnectionHandler, - TMap: FnMut(TProtoHandler::OutEvent) -> TNewOut, + TConnectionHandler: ConnectionHandler, + TMap: FnMut(TConnectionHandler::OutEvent) -> TNewOut, TNewOut: Debug + Send + 'static, TMap: Send + 'static, { - type InEvent = TProtoHandler::InEvent; + type InEvent = TConnectionHandler::InEvent; type OutEvent = TNewOut; - type Error = TProtoHandler::Error; - type InboundProtocol = TProtoHandler::InboundProtocol; - type OutboundProtocol = TProtoHandler::OutboundProtocol; - type InboundOpenInfo = TProtoHandler::InboundOpenInfo; - type OutboundOpenInfo = TProtoHandler::OutboundOpenInfo; + type Error = TConnectionHandler::Error; + type InboundProtocol = TConnectionHandler::InboundProtocol; + type OutboundProtocol = TConnectionHandler::OutboundProtocol; + type InboundOpenInfo = TConnectionHandler::InboundOpenInfo; + type OutboundOpenInfo = TConnectionHandler::OutboundOpenInfo; fn listen_protocol(&self) -> SubstreamProtocol { self.inner.listen_protocol()