diff --git a/misc/metrics/src/swarm.rs b/misc/metrics/src/swarm.rs index c4fa8712d14..9d89944b49d 100644 --- a/misc/metrics/src/swarm.rs +++ b/misc/metrics/src/swarm.rs @@ -31,6 +31,8 @@ pub struct Metrics { connections_established: Family, connections_closed: Family, + connections_denied: Family, + new_listen_addr: Family, expired_listen_addr: Family, @@ -60,6 +62,13 @@ impl Metrics { Box::new(connections_incoming_error.clone()), ); + let connections_denied = Family::default(); + sub_registry.register( + "connections_denied", + "Number of denied connections", + Box::new(connections_denied.clone()), + ); + let new_listen_addr = Family::default(); sub_registry.register( "new_listen_addr", @@ -128,6 +137,7 @@ impl Metrics { connections_incoming_error, connections_established, connections_closed, + connections_denied, new_listen_addr, expired_listen_addr, listener_closed, @@ -269,6 +279,13 @@ impl super::Recorder { self.dial_attempt.inc(); } + libp2p_swarm::SwarmEvent::ConnectionDenied { endpoint, .. } => { + self.connections_denied + .get_or_create(&AddressLabels { + protocols: protocol_stack::as_string(endpoint.get_remote_address()), + }) + .inc(); + } } } } diff --git a/protocols/autonat/src/behaviour.rs b/protocols/autonat/src/behaviour.rs index cc0adc51c02..c8453661d77 100644 --- a/protocols/autonat/src/behaviour.rs +++ b/protocols/autonat/src/behaviour.rs @@ -35,14 +35,15 @@ use libp2p_request_response::{ ProtocolSupport, RequestId, RequestResponse, RequestResponseConfig, RequestResponseEvent, RequestResponseMessage, ResponseChannel, }; +use libp2p_swarm::behaviour::THandlerInEvent; use libp2p_swarm::{ behaviour::{ AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, ExpiredExternalAddr, ExpiredListenAddr, FromSwarm, }, - ConnectionHandler, IntoConnectionHandler, NetworkBehaviour, NetworkBehaviourAction, - PollParameters, + ConnectionHandler, }; +use libp2p_swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters}; use std::{ collections::{HashMap, VecDeque}, iter, @@ -374,20 +375,9 @@ impl Behaviour { } } - fn on_dial_failure( - &mut self, - DialFailure { - peer_id, - handler, - error, - }: DialFailure<::ConnectionHandler>, - ) { + fn on_dial_failure(&mut self, DialFailure { peer_id, error }: DialFailure) { self.inner - .on_swarm_event(FromSwarm::DialFailure(DialFailure { - peer_id, - handler, - error, - })); + .on_swarm_event(FromSwarm::DialFailure(DialFailure { peer_id, error })); if let Some(event) = self.as_server().on_outbound_dial_error(peer_id, error) { self.pending_out_events .push_back(Event::InboundProbe(event)); @@ -467,8 +457,12 @@ impl NetworkBehaviour for Behaviour { } } - fn new_handler(&mut self) -> Self::ConnectionHandler { - self.inner.new_handler() + fn new_handler( + &mut self, + peer: &PeerId, + connected_point: &ConnectedPoint, + ) -> Result> { + self.inner.new_handler(peer, connected_point) } fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec { @@ -529,8 +523,7 @@ impl NetworkBehaviour for Behaviour { &mut self, peer_id: PeerId, connection_id: ConnectionId, - event: <::Handler as - ConnectionHandler>::OutEvent, + event: ::OutEvent, ) { self.inner .on_connection_handler_event(peer_id, connection_id, event) @@ -539,7 +532,7 @@ impl NetworkBehaviour for Behaviour { type Action = NetworkBehaviourAction< ::OutEvent, - ::ConnectionHandler, + THandlerInEvent<::ConnectionHandler>, >; // Trait implemented for `AsClient` as `AsServer` to handle events from the inner [`RequestResponse`] Protocol. diff --git a/protocols/autonat/src/behaviour/as_server.rs b/protocols/autonat/src/behaviour/as_server.rs index f858c48ceb7..f9f8a6283c7 100644 --- a/protocols/autonat/src/behaviour/as_server.rs +++ b/protocols/autonat/src/behaviour/as_server.rs @@ -30,7 +30,7 @@ use libp2p_request_response::{ }; use libp2p_swarm::{ dial_opts::{DialOpts, PeerCondition}, - DialError, NetworkBehaviour, NetworkBehaviourAction, PollParameters, + DialError, NetworkBehaviourAction, PollParameters, }; use std::{ collections::{HashMap, HashSet, VecDeque}, @@ -137,7 +137,6 @@ impl<'a> HandleInnerEvent for AsServer<'a> { .override_dial_concurrency_factor(NonZeroU8::new(1).expect("1 > 0")) .addresses(addrs) .build(), - handler: self.inner.new_handler(), }); } Err((status_text, error)) => { diff --git a/protocols/dcutr/src/behaviour.rs b/protocols/dcutr/src/behaviour.rs index 78b22c89ac1..240b742804c 100644 --- a/protocols/dcutr/src/behaviour.rs +++ b/protocols/dcutr/src/behaviour.rs @@ -26,12 +26,15 @@ use either::Either; use libp2p_core::connection::{ConnectedPoint, ConnectionId}; use libp2p_core::multiaddr::Protocol; use libp2p_core::{Multiaddr, PeerId}; -use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm}; +use libp2p_swarm::behaviour::{ + ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm, THandlerInEvent, +}; use libp2p_swarm::dial_opts::{self, DialOpts}; use libp2p_swarm::{ - ConnectionHandler, ConnectionHandlerUpgrErr, IntoConnectionHandler, NetworkBehaviour, - NetworkBehaviourAction, NotifyHandler, PollParameters, + dummy, ConnectionHandler, ConnectionHandlerUpgrErr, NetworkBehaviour, NetworkBehaviourAction, + NotifyHandler, PollParameters, }; +use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet, VecDeque}; use std::task::{Context, Poll}; use thiserror::Error; @@ -73,13 +76,22 @@ pub struct Behaviour { /// All direct (non-relayed) connections. direct_connections: HashMap>, + + awaiting_direct_inbound_connections: HashMap, + + awaiting_direct_outbound_connections: HashMap, } +type Handler = + Either>; + impl Behaviour { pub fn new() -> Self { Behaviour { queued_actions: Default::default(), direct_connections: Default::default(), + awaiting_direct_inbound_connections: Default::default(), + awaiting_direct_outbound_connections: Default::default(), } } @@ -132,38 +144,41 @@ impl Behaviour { fn on_dial_failure( &mut self, DialFailure { - peer_id, handler, .. - }: DialFailure<::ConnectionHandler>, + peer_id: maybe_peer_id, + .. + }: DialFailure, ) { - if let handler::Prototype::DirectConnection { - relayed_connection_id, - role: handler::Role::Initiator { attempt }, - } = handler - { - let peer_id = peer_id.expect("Peer of `Prototype::DirectConnection` is always known."); - if attempt < MAX_NUMBER_OF_UPGRADE_ATTEMPTS { - self.queued_actions.push_back(ActionBuilder::Connect { + let peer_id = match maybe_peer_id { + Some(peer_id) => peer_id, + None => return, + }; + + let (relayed_connection_id, attempt) = + match self.awaiting_direct_outbound_connections.remove(&peer_id) { + Some((relayed_connection_id, attempt)) => (relayed_connection_id, attempt), + None => return, + }; + + if attempt < MAX_NUMBER_OF_UPGRADE_ATTEMPTS { + self.queued_actions.push_back(ActionBuilder::Connect { + peer_id, + handler: NotifyHandler::One(relayed_connection_id), + attempt: attempt + 1, + }); + } else { + self.queued_actions.extend([ + NetworkBehaviourAction::NotifyHandler { peer_id, handler: NotifyHandler::One(relayed_connection_id), - attempt: attempt + 1, - }); - } else { - self.queued_actions.extend([ - NetworkBehaviourAction::NotifyHandler { - peer_id, - handler: NotifyHandler::One(relayed_connection_id), - event: Either::Left( - handler::relayed::Command::UpgradeFinishedDontKeepAlive, - ), - } - .into(), - NetworkBehaviourAction::GenerateEvent(Event::DirectConnectionUpgradeFailed { - remote_peer_id: peer_id, - error: UpgradeError::Dial, - }) - .into(), - ]); - } + event: Either::Left(handler::relayed::Command::UpgradeFinishedDontKeepAlive), + } + .into(), + NetworkBehaviourAction::GenerateEvent(Event::DirectConnectionUpgradeFailed { + remote_peer_id: peer_id, + error: UpgradeError::Dial, + }) + .into(), + ]); } } @@ -193,19 +208,73 @@ impl Behaviour { } impl NetworkBehaviour for Behaviour { - type ConnectionHandler = handler::Prototype; + type ConnectionHandler = Handler; type OutEvent = Event; - fn new_handler(&mut self) -> Self::ConnectionHandler { - handler::Prototype::UnknownConnection - } + fn new_handler( + &mut self, + peer: &PeerId, + connected_point: &ConnectedPoint, + ) -> Result> { + match ( + self.awaiting_direct_inbound_connections.entry(*peer), + self.awaiting_direct_outbound_connections.entry(*peer), + ) { + (Entry::Vacant(_), Entry::Occupied(occupied)) => { + if connected_point.is_relayed() { + log::debug!( + "Unexpected relay connection whilst awaiting direct outbound connection" + ); + + return Ok(Either::Left(handler::relayed::Handler::new( + connected_point.clone(), + ))); + } + + let (relayed_connection_id, _) = occupied.remove(); + + Ok(Either::Right(Either::Left(handler::direct::Handler::new( + relayed_connection_id, + )))) + } + (Entry::Occupied(occupied), Entry::Vacant(_)) => { + if connected_point.is_relayed() { + log::debug!( + "Unexpected relay connection whilst awaiting direct inbound connection" + ); + + return Ok(Either::Left(handler::relayed::Handler::new( + connected_point.clone(), + ))); + } + let relayed_connection_id = occupied.remove(); + + Ok(Either::Right(Either::Left(handler::direct::Handler::new( + relayed_connection_id, + )))) + } + (Entry::Vacant(_), Entry::Vacant(_)) => { + if connected_point.is_relayed() { + Ok(Either::Left(handler::relayed::Handler::new( + connected_point.clone(), + ))) + } else { + Ok(Either::Right(Either::Right(dummy::ConnectionHandler))) + } + } + (Entry::Occupied(_), Entry::Occupied(_)) => { + debug_assert!(false, "Should not have a pending outbound and pending inbound connection to same peer simultaneously"); + + Ok(Either::Right(Either::Right(dummy::ConnectionHandler))) + } + } + } fn on_connection_handler_event( &mut self, event_source: PeerId, connection: ConnectionId, - handler_event: <::Handler as - ConnectionHandler>::OutEvent, + handler_event: ::OutEvent, ) { match handler_event { Either::Left(handler::relayed::Event::InboundConnectRequest { @@ -243,13 +312,11 @@ impl NetworkBehaviour for Behaviour { .addresses(remote_addrs) .condition(dial_opts::PeerCondition::Always) .build(), - handler: handler::Prototype::DirectConnection { - relayed_connection_id: connection, - role: handler::Role::Listener, - }, } .into(), ); + self.awaiting_direct_inbound_connections + .insert(event_source, connection); } Either::Left(handler::relayed::Event::OutboundNegotiationFailed { error }) => { self.queued_actions.push_back( @@ -271,13 +338,11 @@ impl NetworkBehaviour for Behaviour { .addresses(remote_addrs) .override_role() .build(), - handler: handler::Prototype::DirectConnection { - relayed_connection_id: connection, - role: handler::Role::Initiator { attempt }, - }, } .into(), ); + self.awaiting_direct_outbound_connections + .insert(event_source, (connection, attempt)); } Either::Right(Either::Left( handler::direct::Event::DirectConnectionUpgradeSucceeded { @@ -309,7 +374,8 @@ impl NetworkBehaviour for Behaviour { &mut self, _cx: &mut Context<'_>, poll_parameters: &mut impl PollParameters, - ) -> Poll> { + ) -> Poll>> + { if let Some(action) = self.queued_actions.pop_front() { return Poll::Ready(action.build(poll_parameters)); } @@ -342,7 +408,7 @@ impl NetworkBehaviour for Behaviour { /// A [`NetworkBehaviourAction`], either complete, or still requiring data from [`PollParameters`] /// before being returned in [`Behaviour::poll`]. enum ActionBuilder { - Done(NetworkBehaviourAction), + Done(NetworkBehaviourAction>), Connect { attempt: u8, handler: NotifyHandler, @@ -355,8 +421,8 @@ enum ActionBuilder { }, } -impl From> for ActionBuilder { - fn from(action: NetworkBehaviourAction) -> Self { +impl From>> for ActionBuilder { + fn from(action: NetworkBehaviourAction>) -> Self { Self::Done(action) } } @@ -365,7 +431,7 @@ impl ActionBuilder { fn build( self, poll_parameters: &mut impl PollParameters, - ) -> NetworkBehaviourAction { + ) -> NetworkBehaviourAction> { let obs_addrs = || { poll_parameters .external_addresses() diff --git a/protocols/dcutr/src/handler.rs b/protocols/dcutr/src/handler.rs index e854b395308..cc59e3ab4ce 100644 --- a/protocols/dcutr/src/handler.rs +++ b/protocols/dcutr/src/handler.rs @@ -18,64 +18,5 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::protocol; -use either::Either; -use libp2p_core::connection::ConnectionId; -use libp2p_core::upgrade::{self, DeniedUpgrade}; -use libp2p_core::{ConnectedPoint, PeerId}; -use libp2p_swarm::dummy; -use libp2p_swarm::handler::SendWrapper; -use libp2p_swarm::{ConnectionHandler, IntoConnectionHandler}; - pub mod direct; pub mod relayed; - -pub enum Prototype { - DirectConnection { - role: Role, - relayed_connection_id: ConnectionId, - }, - UnknownConnection, -} - -pub enum Role { - Initiator { attempt: u8 }, - Listener, -} - -impl IntoConnectionHandler for Prototype { - type Handler = Either>; - - fn into_handler(self, _remote_peer_id: &PeerId, endpoint: &ConnectedPoint) -> Self::Handler { - match self { - Self::UnknownConnection => { - if endpoint.is_relayed() { - Either::Left(relayed::Handler::new(endpoint.clone())) - } else { - Either::Right(Either::Right(dummy::ConnectionHandler)) - } - } - Self::DirectConnection { - relayed_connection_id, - .. - } => { - assert!( - !endpoint.is_relayed(), - "`Prototype::DirectConnection` is never created for relayed connection." - ); - Either::Right(Either::Left(direct::Handler::new(relayed_connection_id))) - } - } - } - - fn inbound_protocol(&self) -> ::InboundProtocol { - match self { - Prototype::UnknownConnection => upgrade::EitherUpgrade::A(SendWrapper( - upgrade::EitherUpgrade::A(protocol::inbound::Upgrade {}), - )), - Prototype::DirectConnection { .. } => { - upgrade::EitherUpgrade::A(SendWrapper(upgrade::EitherUpgrade::B(DeniedUpgrade))) - } - } - } -} diff --git a/protocols/floodsub/src/layer.rs b/protocols/floodsub/src/layer.rs index 776c0e8551b..58fc40a9f25 100644 --- a/protocols/floodsub/src/layer.rs +++ b/protocols/floodsub/src/layer.rs @@ -26,13 +26,14 @@ use crate::topic::Topic; use crate::FloodsubConfig; use cuckoofilter::{CuckooError, CuckooFilter}; use fnv::FnvHashSet; -use libp2p_core::{connection::ConnectionId, PeerId}; +use libp2p_core::{connection::ConnectionId, ConnectedPoint, PeerId}; +use libp2p_swarm::behaviour::THandlerInEvent; use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, FromSwarm}; +use libp2p_swarm::ConnectionHandler; use libp2p_swarm::{ dial_opts::DialOpts, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, OneShotHandler, PollParameters, }; -use libp2p_swarm::{ConnectionHandler, IntoConnectionHandler}; use log::warn; use smallvec::SmallVec; use std::collections::hash_map::{DefaultHasher, HashMap}; @@ -42,12 +43,7 @@ use std::{collections::VecDeque, iter}; /// Network behaviour that handles the floodsub protocol. pub struct Floodsub { /// Events that need to be yielded to the outside when polling. - events: VecDeque< - NetworkBehaviourAction< - FloodsubEvent, - OneShotHandler, - >, - >, + events: VecDeque>, config: FloodsubConfig, @@ -108,10 +104,8 @@ impl Floodsub { } if self.target_peers.insert(peer_id) { - let handler = self.new_handler(); self.events.push_back(NetworkBehaviourAction::Dial { opts: DialOpts::peer_id(peer_id).build(), - handler, }); } } @@ -331,10 +325,8 @@ impl Floodsub { // We can be disconnected by the remote in case of inactivity for example, so we always // try to reconnect. if self.target_peers.contains(&peer_id) { - let handler = self.new_handler(); self.events.push_back(NetworkBehaviourAction::Dial { opts: DialOpts::peer_id(peer_id).build(), - handler, }); } } @@ -344,16 +336,19 @@ impl NetworkBehaviour for Floodsub { type ConnectionHandler = OneShotHandler; type OutEvent = FloodsubEvent; - fn new_handler(&mut self) -> Self::ConnectionHandler { - Default::default() + fn new_handler( + &mut self, + _: &PeerId, + _: &ConnectedPoint, + ) -> Result> { + Ok(Default::default()) } fn on_connection_handler_event( &mut self, propagation_source: PeerId, _connection_id: ConnectionId, - event: <::Handler as - ConnectionHandler>::OutEvent, + event: ::OutEvent, ) { // We ignore successful sends or timeouts. let event = match event { @@ -472,7 +467,8 @@ impl NetworkBehaviour for Floodsub { &mut self, _: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll> { + ) -> Poll>> + { if let Some(event) = self.events.pop_front() { return Poll::Ready(event); } diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index c361fc4fdbc..43dd133468c 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -38,13 +38,12 @@ use rand::{seq::SliceRandom, thread_rng}; use libp2p_core::{ connection::ConnectionId, identity::Keypair, multiaddr::Protocol::Ip4, - multiaddr::Protocol::Ip6, Multiaddr, PeerId, + multiaddr::Protocol::Ip6, ConnectedPoint, Multiaddr, PeerId, }; use libp2p_swarm::{ behaviour::{AddressChange, ConnectionClosed, ConnectionEstablished, FromSwarm}, dial_opts::DialOpts, - ConnectionHandler, IntoConnectionHandler, NetworkBehaviour, NetworkBehaviourAction, - NotifyHandler, PollParameters, + NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, }; use wasm_timer::Instant; @@ -202,7 +201,7 @@ impl From for PublishConfig { } type GossipsubNetworkBehaviourAction = - NetworkBehaviourAction>; + NetworkBehaviourAction>; /// Network behaviour that handles the gossipsub protocol. /// @@ -1141,10 +1140,8 @@ where if !self.peer_topics.contains_key(peer_id) { // Connect to peer debug!("Connecting to explicit peer {:?}", peer_id); - let handler = self.new_handler(); self.events.push_back(NetworkBehaviourAction::Dial { opts: DialOpts::peer_id(*peer_id).build(), - handler, }); } } @@ -1639,11 +1636,8 @@ where // mark as px peer self.px_peers.insert(peer_id); - // dial peer - let handler = self.new_handler(); self.events.push_back(NetworkBehaviourAction::Dial { opts: DialOpts::peer_id(peer_id).build(), - handler, }); } } @@ -3303,7 +3297,11 @@ where type ConnectionHandler = GossipsubHandler; type OutEvent = GossipsubEvent; - fn new_handler(&mut self) -> Self::ConnectionHandler { + fn new_handler( + &mut self, + _: &PeerId, + _: &ConnectedPoint, + ) -> Result> { let protocol_config = ProtocolConfig::new( self.config.protocol_id().clone(), self.config.custom_id_version().clone(), @@ -3312,15 +3310,17 @@ where self.config.support_floodsub(), ); - GossipsubHandler::new(protocol_config, self.config.idle_timeout()) + Ok(GossipsubHandler::new( + protocol_config, + self.config.idle_timeout(), + )) } fn on_connection_handler_event( &mut self, propagation_source: PeerId, - _connection_id: ConnectionId, - handler_event: <::Handler as - ConnectionHandler>::OutEvent, + _: ConnectionId, + handler_event: HandlerEvent, ) { match handler_event { HandlerEvent::PeerKind(kind) => { @@ -3447,7 +3447,7 @@ where &mut self, cx: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll> { + ) -> Poll> { if let Some(event) = self.events.pop_front() { return Poll::Ready(event.map_in(|e: Arc| { // clone send event reference if others references are present diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index 9bf32d8ff43..9dd7a66c9ea 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -235,7 +235,7 @@ where // peer_connections.connections should never be empty. let mut active_connections = peer_connections.connections.len(); for connection_id in peer_connections.connections.clone() { - let handler = gs.new_handler(); + let handler = gs.new_handler(peer_id, &fake_endpoint).unwrap(); active_connections = active_connections.checked_sub(1).unwrap(); gs.on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed { peer_id: *peer_id, @@ -1358,7 +1358,7 @@ fn test_explicit_peer_gets_connected() { .events .iter() .filter(|e| match e { - NetworkBehaviourAction::Dial { opts, handler: _ } => opts.get_peer_id() == Some(peer), + NetworkBehaviourAction::Dial { opts } => opts.get_peer_id() == Some(peer), _ => false, }) .count(); @@ -1399,8 +1399,7 @@ fn test_explicit_peer_reconnects() { gs.events .iter() .filter(|e| match e { - NetworkBehaviourAction::Dial { opts, handler: _ } => - opts.get_peer_id() == Some(*peer), + NetworkBehaviourAction::Dial { opts } => opts.get_peer_id() == Some(*peer), _ => false, }) .count(), @@ -1415,8 +1414,7 @@ fn test_explicit_peer_reconnects() { gs.events .iter() .filter(|e| match e { - NetworkBehaviourAction::Dial { opts, handler: _ } => - opts.get_peer_id() == Some(*peer), + NetworkBehaviourAction::Dial { opts } => opts.get_peer_id() == Some(*peer), _ => false, }) .count() @@ -1796,7 +1794,7 @@ fn test_connect_to_px_peers_on_handle_prune() { .events .iter() .filter_map(|e| match e { - NetworkBehaviourAction::Dial { opts, handler: _ } => opts.get_peer_id(), + NetworkBehaviourAction::Dial { opts } => opts.get_peer_id(), _ => None, }) .collect(); diff --git a/protocols/gossipsub/src/config.rs b/protocols/gossipsub/src/config.rs index 34956fe614c..0e4965e7fb0 100644 --- a/protocols/gossipsub/src/config.rs +++ b/protocols/gossipsub/src/config.rs @@ -889,7 +889,7 @@ mod test { use crate::types::PeerKind; use crate::Topic; use crate::{Gossipsub, MessageAuthenticity}; - use libp2p_core::UpgradeInfo; + use libp2p_core::{ConnectedPoint, Endpoint, Multiaddr, UpgradeInfo}; use libp2p_swarm::{ConnectionHandler, NetworkBehaviour}; use std::collections::hash_map::DefaultHasher; use std::hash::{Hash, Hasher}; @@ -995,7 +995,9 @@ mod test { let mut gossipsub: Gossipsub = Gossipsub::new(MessageAuthenticity::Anonymous, builder).expect("Correct configuration"); - let handler = gossipsub.new_handler(); + let handler = gossipsub + .new_handler(&PeerId::random(), &fake_endpoint()) + .unwrap(); let (protocol_config, _) = handler.listen_protocol().into_upgrade(); let protocol_ids = protocol_config.protocol_info(); @@ -1023,7 +1025,9 @@ mod test { let mut gossipsub: Gossipsub = Gossipsub::new(MessageAuthenticity::Anonymous, builder).expect("Correct configuration"); - let handler = gossipsub.new_handler(); + let handler = gossipsub + .new_handler(&PeerId::random(), &fake_endpoint()) + .unwrap(); let (protocol_config, _) = handler.listen_protocol().into_upgrade(); let protocol_ids = protocol_config.protocol_info(); @@ -1032,4 +1036,11 @@ mod test { assert_eq!(protocol_ids[0].protocol_id, b"purple".to_vec()); assert_eq!(protocol_ids[0].kind, PeerKind::Gossipsub); } + + fn fake_endpoint() -> ConnectedPoint { + ConnectedPoint::Dialer { + address: Multiaddr::empty(), + role_override: Endpoint::Dialer, + } + } } diff --git a/protocols/identify/src/behaviour.rs b/protocols/identify/src/behaviour.rs index 10bfab3ed7d..fa7c5e9e219 100644 --- a/protocols/identify/src/behaviour.rs +++ b/protocols/identify/src/behaviour.rs @@ -18,17 +18,18 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::handler::{self, Proto, Push}; +use crate::handler::{self, Handler, Push}; use crate::protocol::{Info, ReplySubstream, UpgradeError}; use futures::prelude::*; use libp2p_core::{ connection::ConnectionId, multiaddr::Protocol, ConnectedPoint, Multiaddr, PeerId, PublicKey, }; -use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm}; +use libp2p_swarm::behaviour::{ + ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm, THandlerInEvent, +}; use libp2p_swarm::{ dial_opts::DialOpts, AddressScore, ConnectionHandler, ConnectionHandlerUpgrErr, DialError, - IntoConnectionHandler, NegotiatedSubstream, NetworkBehaviour, NetworkBehaviourAction, - NotifyHandler, PollParameters, + NegotiatedSubstream, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, }; use lru::LruCache; use std::num::NonZeroUsize; @@ -54,7 +55,7 @@ pub struct Behaviour { /// Pending replies to send. pending_replies: VecDeque, /// Pending events to be emitted when polled. - events: VecDeque>, + events: VecDeque>, /// Peers to which an active push with current information about /// the local peer should be sent. pending_push: HashSet, @@ -198,10 +199,8 @@ impl Behaviour { { for p in peers { if self.pending_push.insert(p) && !self.connected.contains_key(&p) { - let handler = self.new_handler(); self.events.push_back(NetworkBehaviourAction::Dial { opts: DialOpts::peer_id(p).build(), - handler, }); } } @@ -236,18 +235,26 @@ impl Behaviour { } impl NetworkBehaviour for Behaviour { - type ConnectionHandler = Proto; + type ConnectionHandler = Handler; type OutEvent = Event; - fn new_handler(&mut self) -> Self::ConnectionHandler { - Proto::new(self.config.initial_delay, self.config.interval) + fn new_handler( + &mut self, + peer: &PeerId, + _: &ConnectedPoint, + ) -> Result> { + Ok(Handler::new( + self.config.initial_delay, + self.config.interval, + *peer, + )) } fn on_connection_handler_event( &mut self, peer_id: PeerId, connection: ConnectionId, - event: <::Handler as ConnectionHandler>::OutEvent, + event: ::OutEvent, ) { match event { handler::Event::Identified(mut info) => { @@ -307,7 +314,8 @@ impl NetworkBehaviour for Behaviour { &mut self, cx: &mut Context<'_>, params: &mut impl PollParameters, - ) -> Poll> { + ) -> Poll>> + { if let Some(event) = self.events.pop_front() { return Poll::Ready(event); } diff --git a/protocols/identify/src/handler.rs b/protocols/identify/src/handler.rs index 0de54f0a006..67e3f77b249 100644 --- a/protocols/identify/src/handler.rs +++ b/protocols/identify/src/handler.rs @@ -26,44 +26,18 @@ use futures::prelude::*; use futures_timer::Delay; use libp2p_core::either::{EitherError, EitherOutput}; use libp2p_core::upgrade::{EitherUpgrade, SelectUpgrade}; -use libp2p_core::{ConnectedPoint, PeerId}; +use libp2p_core::PeerId; use libp2p_swarm::handler::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, }; use libp2p_swarm::{ - ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, IntoConnectionHandler, - KeepAlive, NegotiatedSubstream, SubstreamProtocol, + ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, + NegotiatedSubstream, SubstreamProtocol, }; use log::warn; use smallvec::SmallVec; use std::{io, pin::Pin, task::Context, task::Poll, time::Duration}; -pub struct Proto { - initial_delay: Duration, - interval: Duration, -} - -impl Proto { - pub fn new(initial_delay: Duration, interval: Duration) -> Self { - Proto { - initial_delay, - interval, - } - } -} - -impl IntoConnectionHandler for Proto { - type Handler = Handler; - - fn into_handler(self, remote_peer_id: &PeerId, _endpoint: &ConnectedPoint) -> Self::Handler { - Handler::new(self.initial_delay, self.interval, *remote_peer_id) - } - - fn inbound_protocol(&self) -> ::InboundProtocol { - SelectUpgrade::new(Protocol, PushProtocol::inbound()) - } -} - /// Protocol handler for sending and receiving identification requests. /// /// Outbound requests are sent periodically. The handler performs expects diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index 643f618567c..38bcf052d86 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -24,7 +24,7 @@ mod test; use crate::addresses::Addresses; use crate::handler::{ - KademliaHandlerConfig, KademliaHandlerEvent, KademliaHandlerIn, KademliaHandlerProto, + KademliaHandler, KademliaHandlerConfig, KademliaHandlerEvent, KademliaHandlerIn, KademliaRequestId, }; use crate::jobs::*; @@ -42,7 +42,7 @@ use instant::Instant; use libp2p_core::{connection::ConnectionId, ConnectedPoint, Multiaddr, PeerId}; use libp2p_swarm::behaviour::{ AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, ExpiredListenAddr, - FromSwarm, NewExternalAddr, NewListenAddr, + FromSwarm, NewExternalAddr, NewListenAddr, THandlerInEvent, }; use libp2p_swarm::{ dial_opts::{self, DialOpts}, @@ -101,7 +101,7 @@ pub struct Kademlia { connection_idle_timeout: Duration, /// Queued events to return when the behaviour is being polled. - queued_events: VecDeque>>, + queued_events: VecDeque>>, /// The currently known addresses of the local node. local_addrs: HashSet, @@ -567,10 +567,8 @@ where RoutingUpdate::Failed } kbucket::InsertResult::Pending { disconnected } => { - let handler = self.new_handler(); self.queued_events.push_back(NetworkBehaviourAction::Dial { opts: DialOpts::peer_id(disconnected.into_preimage()).build(), - handler, }); RoutingUpdate::Pending } @@ -1209,11 +1207,9 @@ where // // Only try dialing peer if not currently connected. if !self.connected_peers.contains(disconnected.preimage()) { - let handler = self.new_handler(); self.queued_events.push_back(NetworkBehaviourAction::Dial { opts: DialOpts::peer_id(disconnected.into_preimage()) .build(), - handler, }) } } @@ -1909,12 +1905,7 @@ where } } - fn on_dial_failure( - &mut self, - DialFailure { peer_id, error, .. }: DialFailure< - ::ConnectionHandler, - >, - ) { + fn on_dial_failure(&mut self, DialFailure { peer_id, error, .. }: DialFailure) { let peer_id = match peer_id { Some(id) => id, // Not interested in dial failures to unknown peers. @@ -1981,15 +1972,23 @@ where for<'a> TStore: RecordStore<'a>, TStore: Send + 'static, { - type ConnectionHandler = KademliaHandlerProto; + type ConnectionHandler = KademliaHandler; type OutEvent = KademliaEvent; - fn new_handler(&mut self) -> Self::ConnectionHandler { - KademliaHandlerProto::new(KademliaHandlerConfig { - protocol_config: self.protocol_config.clone(), - allow_listening: true, - idle_timeout: self.connection_idle_timeout, - }) + fn new_handler( + &mut self, + remote_peer_id: &PeerId, + endpoint: &ConnectedPoint, + ) -> Result> { + Ok(KademliaHandler::new( + KademliaHandlerConfig { + protocol_config: self.protocol_config.clone(), + allow_listening: true, + idle_timeout: self.connection_idle_timeout, + }, + endpoint.clone(), + *remote_peer_id, + )) } fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { @@ -2286,7 +2285,8 @@ where &mut self, cx: &mut Context<'_>, parameters: &mut impl PollParameters, - ) -> Poll> { + ) -> Poll>> + { let now = Instant::now(); // Calculate the available capacity for queries triggered by background jobs. @@ -2385,10 +2385,8 @@ where }); } else if &peer_id != self.kbuckets.local_key().preimage() { query.inner.pending_rpcs.push((peer_id, event)); - let handler = self.new_handler(); self.queued_events.push_back(NetworkBehaviourAction::Dial { opts: DialOpts::peer_id(peer_id).build(), - handler, }); } } diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index 345762d4b54..cc1c5cffae1 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -31,8 +31,8 @@ use libp2p_swarm::handler::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, }; use libp2p_swarm::{ - ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, IntoConnectionHandler, - KeepAlive, NegotiatedSubstream, SubstreamProtocol, + ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, + NegotiatedSubstream, SubstreamProtocol, }; use log::trace; use std::task::Waker; @@ -42,39 +42,6 @@ use std::{ const MAX_NUM_INBOUND_SUBSTREAMS: usize = 32; -/// A prototype from which [`KademliaHandler`]s can be constructed. -pub struct KademliaHandlerProto { - config: KademliaHandlerConfig, - _type: PhantomData, -} - -impl KademliaHandlerProto { - pub fn new(config: KademliaHandlerConfig) -> Self { - KademliaHandlerProto { - config, - _type: PhantomData, - } - } -} - -impl IntoConnectionHandler - for KademliaHandlerProto -{ - type Handler = KademliaHandler; - - fn into_handler(self, remote_peer_id: &PeerId, endpoint: &ConnectedPoint) -> Self::Handler { - KademliaHandler::new(self.config, endpoint.clone(), *remote_peer_id) - } - - fn inbound_protocol(&self) -> ::InboundProtocol { - if self.config.allow_listening { - upgrade::EitherUpgrade::A(self.config.protocol_config.clone()) - } else { - upgrade::EitherUpgrade::B(upgrade::DeniedUpgrade) - } - } -} - /// Protocol handler that manages substreams for the Kademlia protocol /// on a single connection with a peer. /// diff --git a/protocols/mdns/src/behaviour.rs b/protocols/mdns/src/behaviour.rs index ef9ac50addf..4fa105ec8e8 100644 --- a/protocols/mdns/src/behaviour.rs +++ b/protocols/mdns/src/behaviour.rs @@ -35,6 +35,7 @@ use libp2p_swarm::{ use smallvec::SmallVec; use std::collections::hash_map::{Entry, HashMap}; use std::{cmp, fmt, io, net::IpAddr, pin::Pin, task::Context, task::Poll, time::Instant}; +use void::Void; /// An abstraction to allow for compatibility with various async runtimes. pub trait Provider: 'static { @@ -167,8 +168,12 @@ where type ConnectionHandler = dummy::ConnectionHandler; type OutEvent = Event; - fn new_handler(&mut self) -> Self::ConnectionHandler { - dummy::ConnectionHandler + fn new_handler( + &mut self, + _: &PeerId, + _: &libp2p_core::ConnectedPoint, + ) -> Result> { + Ok(dummy::ConnectionHandler) } fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { @@ -222,7 +227,7 @@ where &mut self, cx: &mut Context<'_>, params: &mut impl PollParameters, - ) -> Poll> { + ) -> Poll> { // Poll ifwatch. while let Poll::Ready(Some(event)) = Pin::new(&mut self.if_watch).poll_next(cx) { match event { diff --git a/protocols/ping/src/lib.rs b/protocols/ping/src/lib.rs index 6e481500df9..80b14e1dc44 100644 --- a/protocols/ping/src/lib.rs +++ b/protocols/ping/src/lib.rs @@ -47,7 +47,8 @@ mod protocol; use handler::Handler; pub use handler::{Config, Failure, Success}; -use libp2p_core::{connection::ConnectionId, PeerId}; +use libp2p_core::{connection::ConnectionId, ConnectedPoint, PeerId}; +use libp2p_swarm::behaviour::THandlerInEvent; use libp2p_swarm::{ behaviour::FromSwarm, NetworkBehaviour, NetworkBehaviourAction, PollParameters, }; @@ -119,8 +120,13 @@ impl NetworkBehaviour for Behaviour { type ConnectionHandler = Handler; type OutEvent = Event; - fn new_handler(&mut self) -> Self::ConnectionHandler { - Handler::new(self.config.clone()) + fn new_handler( + &mut self, + _: &PeerId, + _: &ConnectedPoint, + ) -> std::result::Result> + { + Ok(Handler::new(self.config.clone())) } fn on_connection_handler_event(&mut self, peer: PeerId, _: ConnectionId, result: Result) { @@ -131,7 +137,8 @@ impl NetworkBehaviour for Behaviour { &mut self, _: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll> { + ) -> Poll>> + { if let Some(e) = self.events.pop_back() { let Event { result, peer } = &e; diff --git a/protocols/relay/src/v2/client.rs b/protocols/relay/src/v2/client.rs index d9a2d977588..76527af3591 100644 --- a/protocols/relay/src/v2/client.rs +++ b/protocols/relay/src/v2/client.rs @@ -23,6 +23,7 @@ mod handler; pub mod transport; +use crate::v2::client::handler::Handler; use crate::v2::protocol::{self, inbound_stop, outbound_hop}; use bytes::Bytes; use either::Either; @@ -33,9 +34,11 @@ use futures::io::{AsyncRead, AsyncWrite}; use futures::ready; use futures::stream::StreamExt; use libp2p_core::connection::ConnectionId; -use libp2p_core::PeerId; +use libp2p_core::{ConnectedPoint, PeerId}; +use libp2p_swarm::behaviour::THandlerInEvent; use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, FromSwarm}; use libp2p_swarm::dial_opts::DialOpts; +use libp2p_swarm::{dummy, ConnectionHandler}; use libp2p_swarm::{ ConnectionHandlerUpgrErr, NegotiatedSubstream, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, @@ -96,6 +99,8 @@ pub struct Client { /// connection. directly_connected_peers: HashMap>, + initial_events: HashMap, + /// Queue of actions to return when polled. queued_actions: VecDeque, } @@ -109,6 +114,7 @@ impl Client { local_peer_id, from_transport, directly_connected_peers: Default::default(), + initial_events: Default::default(), queued_actions: Default::default(), }; (transport, behaviour) @@ -146,11 +152,40 @@ impl Client { } impl NetworkBehaviour for Client { - type ConnectionHandler = handler::Prototype; + type ConnectionHandler = Either; type OutEvent = Event; - fn new_handler(&mut self) -> Self::ConnectionHandler { - handler::Prototype::new(self.local_peer_id, None) + fn new_handler( + &mut self, + peer: &PeerId, + connected_point: &ConnectedPoint, + ) -> Result> { + if connected_point.is_relayed() { + if let Some(event) = self.initial_events.remove(peer) { + log::debug!( + "Established relayed instead of direct connection to {:?}, \ + dropping initial in event {:?}.", + peer, + event + ); + } + + // Deny all substreams on relayed connection. + Ok(Either::Right(dummy::ConnectionHandler)) + } else { + let mut handler = Handler::new( + self.local_peer_id, + *peer, + connected_point.get_remote_address().clone(), + ); + + if let Some(event) = self.initial_events.remove(peer) { + #[allow(deprecated)] + handler.inject_event(event) + } + + Ok(Either::Left(handler)) + } } fn on_swarm_event(&mut self, event: FromSwarm) { @@ -247,7 +282,8 @@ impl NetworkBehaviour for Client { &mut self, cx: &mut Context<'_>, _poll_parameters: &mut impl PollParameters, - ) -> Poll> { + ) -> Poll>> + { if let Some(event) = self.queued_actions.pop_front() { return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); } @@ -269,16 +305,14 @@ impl NetworkBehaviour for Client { event: Either::Left(handler::In::Reserve { to_listener }), }, None => { - let handler = handler::Prototype::new( - self.local_peer_id, - Some(handler::In::Reserve { to_listener }), - ); + self.initial_events + .insert(relay_peer_id, handler::In::Reserve { to_listener }); + NetworkBehaviourAction::Dial { opts: DialOpts::peer_id(relay_peer_id) .addresses(vec![relay_addr]) .extend_addresses_through_behaviour() .build(), - handler, } } } @@ -304,19 +338,19 @@ impl NetworkBehaviour for Client { }), }, None => { - let handler = handler::Prototype::new( - self.local_peer_id, - Some(handler::In::EstablishCircuit { + self.initial_events.insert( + relay_peer_id, + handler::In::EstablishCircuit { send_back, dst_peer_id, - }), + }, ); + NetworkBehaviourAction::Dial { opts: DialOpts::peer_id(relay_peer_id) .addresses(vec![relay_addr]) .extend_addresses_through_behaviour() .build(), - handler, } } } diff --git a/protocols/relay/src/v2/client/handler.rs b/protocols/relay/src/v2/client/handler.rs index 5d01cf9dbce..8472bef5983 100644 --- a/protocols/relay/src/v2/client/handler.rs +++ b/protocols/relay/src/v2/client/handler.rs @@ -21,7 +21,6 @@ use crate::v2::client::transport; use crate::v2::message_proto::Status; use crate::v2::protocol::{self, inbound_stop, outbound_hop}; -use either::Either; use futures::channel::{mpsc, oneshot}; use futures::future::{BoxFuture, FutureExt}; use futures::sink::SinkExt; @@ -30,14 +29,14 @@ use futures_timer::Delay; use instant::Instant; use libp2p_core::either::EitherError; use libp2p_core::multiaddr::Protocol; -use libp2p_core::{upgrade, ConnectedPoint, Multiaddr, PeerId}; +use libp2p_core::{upgrade, Multiaddr, PeerId}; use libp2p_swarm::handler::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, - ListenUpgradeError, SendWrapper, + ListenUpgradeError, }; use libp2p_swarm::{ - dummy, ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, - IntoConnectionHandler, KeepAlive, SubstreamProtocol, + ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, + SubstreamProtocol, }; use log::debug; use std::collections::{HashMap, VecDeque}; @@ -110,63 +109,6 @@ pub enum Event { }, } -pub struct Prototype { - local_peer_id: PeerId, - /// Initial [`In`] event from [`super::Client`] provided at creation time. - initial_in: Option, -} - -impl Prototype { - pub(crate) fn new(local_peer_id: PeerId, initial_in: Option) -> Self { - Self { - local_peer_id, - initial_in, - } - } -} - -impl IntoConnectionHandler for Prototype { - type Handler = Either; - - fn into_handler(self, remote_peer_id: &PeerId, endpoint: &ConnectedPoint) -> Self::Handler { - if endpoint.is_relayed() { - if let Some(event) = self.initial_in { - debug!( - "Established relayed instead of direct connection to {:?}, \ - dropping initial in event {:?}.", - remote_peer_id, event - ); - } - - // Deny all substreams on relayed connection. - Either::Right(dummy::ConnectionHandler) - } else { - let mut handler = Handler { - remote_peer_id: *remote_peer_id, - remote_addr: endpoint.get_remote_address().clone(), - local_peer_id: self.local_peer_id, - queued_events: Default::default(), - pending_error: Default::default(), - reservation: Reservation::None, - alive_lend_out_substreams: Default::default(), - circuit_deny_futs: Default::default(), - send_error_futs: Default::default(), - keep_alive: KeepAlive::Yes, - }; - - if let Some(event) = self.initial_in { - handler.on_behaviour_event(event) - } - - Either::Left(handler) - } - } - - fn inbound_protocol(&self) -> ::InboundProtocol { - upgrade::EitherUpgrade::A(SendWrapper(inbound_stop::Upgrade {})) - } -} - pub struct Handler { local_peer_id: PeerId, remote_peer_id: PeerId, @@ -212,6 +154,23 @@ pub struct Handler { send_error_futs: FuturesUnordered>, } +impl Handler { + pub fn new(local_peer_id: PeerId, remote_peer_id: PeerId, remote_addr: Multiaddr) -> Self { + Self { + local_peer_id, + remote_peer_id, + remote_addr, + pending_error: Default::default(), + keep_alive: KeepAlive::Yes, + queued_events: Default::default(), + reservation: Reservation::None, + alive_lend_out_substreams: Default::default(), + circuit_deny_futs: Default::default(), + send_error_futs: Default::default(), + } + } +} + impl Handler { fn on_fully_negotiated_inbound( &mut self, diff --git a/protocols/relay/src/v2/relay.rs b/protocols/relay/src/v2/relay.rs index 42ccdd69069..b6be9d9197d 100644 --- a/protocols/relay/src/v2/relay.rs +++ b/protocols/relay/src/v2/relay.rs @@ -25,14 +25,16 @@ pub mod rate_limiter; use crate::v2::message_proto; use crate::v2::protocol::inbound_hop; +use crate::v2::relay::handler::Handler; use either::Either; use instant::Instant; use libp2p_core::connection::ConnectionId; use libp2p_core::multiaddr::Protocol; -use libp2p_core::PeerId; +use libp2p_core::{ConnectedPoint, PeerId}; +use libp2p_swarm::behaviour::THandlerInEvent; use libp2p_swarm::behaviour::{ConnectionClosed, FromSwarm}; use libp2p_swarm::{ - ConnectionHandlerUpgrErr, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, + dummy, ConnectionHandlerUpgrErr, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, }; use std::collections::{hash_map, HashMap, HashSet, VecDeque}; @@ -249,16 +251,26 @@ impl Relay { } impl NetworkBehaviour for Relay { - type ConnectionHandler = handler::Prototype; + type ConnectionHandler = Either; type OutEvent = Event; - fn new_handler(&mut self) -> Self::ConnectionHandler { - handler::Prototype { - config: handler::Config { - reservation_duration: self.config.reservation_duration, - max_circuit_duration: self.config.max_circuit_duration, - max_circuit_bytes: self.config.max_circuit_bytes, - }, + fn new_handler( + &mut self, + _: &PeerId, + endpoint: &ConnectedPoint, + ) -> Result> { + if endpoint.is_relayed() { + // Deny all substreams on relayed connection. + Ok(Either::Right(dummy::ConnectionHandler)) + } else { + Ok(Either::Left(Handler::new( + endpoint.clone(), + handler::Config { + reservation_duration: self.config.reservation_duration, + max_circuit_duration: self.config.max_circuit_duration, + max_circuit_bytes: self.config.max_circuit_bytes, + }, + ))) } } @@ -640,7 +652,8 @@ impl NetworkBehaviour for Relay { &mut self, _cx: &mut Context<'_>, poll_parameters: &mut impl PollParameters, - ) -> Poll> { + ) -> Poll>> + { if let Some(action) = self.queued_actions.pop_front() { return Poll::Ready(action.build(poll_parameters)); } @@ -743,7 +756,7 @@ impl Add for CircuitId { /// before being returned in [`Relay::poll`]. #[allow(clippy::large_enum_variant)] enum Action { - Done(NetworkBehaviourAction), + Done(NetworkBehaviourAction>>), AcceptReservationPrototype { inbound_reservation_req: inbound_hop::ReservationReq, handler: NotifyHandler, @@ -751,8 +764,15 @@ enum Action { }, } -impl From> for Action { - fn from(action: NetworkBehaviourAction) -> Self { +impl From>>> + for Action +{ + fn from( + action: NetworkBehaviourAction< + Event, + THandlerInEvent>, + >, + ) -> Self { Self::Done(action) } } @@ -761,7 +781,8 @@ impl Action { fn build( self, poll_parameters: &mut impl PollParameters, - ) -> NetworkBehaviourAction { + ) -> NetworkBehaviourAction>> + { match self { Action::Done(action) => action, Action::AcceptReservationPrototype { diff --git a/protocols/relay/src/v2/relay/handler.rs b/protocols/relay/src/v2/relay/handler.rs index ef8b40755b2..e82ef6f7211 100644 --- a/protocols/relay/src/v2/relay/handler.rs +++ b/protocols/relay/src/v2/relay/handler.rs @@ -23,7 +23,6 @@ use crate::v2::message_proto::Status; use crate::v2::protocol::{inbound_hop, outbound_stop}; use crate::v2::relay::CircuitId; use bytes::Bytes; -use either::Either; use futures::channel::oneshot::{self, Canceled}; use futures::future::{BoxFuture, FutureExt, TryFutureExt}; use futures::io::AsyncWriteExt; @@ -35,11 +34,11 @@ use libp2p_core::either::EitherError; use libp2p_core::{upgrade, ConnectedPoint, Multiaddr, PeerId}; use libp2p_swarm::handler::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, - ListenUpgradeError, SendWrapper, + ListenUpgradeError, }; use libp2p_swarm::{ - dummy, ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, - IntoConnectionHandler, KeepAlive, NegotiatedSubstream, SubstreamProtocol, + ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, + NegotiatedSubstream, SubstreamProtocol, }; use std::collections::VecDeque; use std::fmt; @@ -339,43 +338,6 @@ impl fmt::Debug for Event { } } -pub struct Prototype { - pub config: Config, -} - -impl IntoConnectionHandler for Prototype { - type Handler = Either; - - fn into_handler(self, _remote_peer_id: &PeerId, endpoint: &ConnectedPoint) -> Self::Handler { - if endpoint.is_relayed() { - // Deny all substreams on relayed connection. - Either::Right(dummy::ConnectionHandler) - } else { - Either::Left(Handler { - endpoint: endpoint.clone(), - config: self.config, - queued_events: Default::default(), - pending_error: Default::default(), - reservation_request_future: Default::default(), - circuit_accept_futures: Default::default(), - circuit_deny_futures: Default::default(), - alive_lend_out_substreams: Default::default(), - circuits: Default::default(), - active_reservation: Default::default(), - keep_alive: KeepAlive::Yes, - }) - } - } - - fn inbound_protocol(&self) -> ::InboundProtocol { - upgrade::EitherUpgrade::A(SendWrapper(inbound_hop::Upgrade { - reservation_duration: self.config.reservation_duration, - max_circuit_duration: self.config.max_circuit_duration, - max_circuit_bytes: self.config.max_circuit_bytes, - })) - } -} - /// [`ConnectionHandler`] that manages substreams for a relay on a single /// connection with a peer. pub struct Handler { @@ -432,6 +394,22 @@ pub struct Handler { } impl Handler { + pub fn new(endpoint: ConnectedPoint, config: Config) -> Self { + Self { + endpoint, + config, + queued_events: Default::default(), + pending_error: Default::default(), + keep_alive: KeepAlive::Yes, + reservation_request_future: Default::default(), + active_reservation: Default::default(), + circuit_accept_futures: Default::default(), + circuit_deny_futures: Default::default(), + alive_lend_out_substreams: Default::default(), + circuits: Default::default(), + } + } + fn on_fully_negotiated_inbound( &mut self, FullyNegotiatedInbound { diff --git a/protocols/rendezvous/src/client.rs b/protocols/rendezvous/src/client.rs index 173831d95d6..9406ec82753 100644 --- a/protocols/rendezvous/src/client.rs +++ b/protocols/rendezvous/src/client.rs @@ -22,7 +22,7 @@ use crate::codec::{Cookie, ErrorCode, Namespace, NewRegistration, Registration, use crate::handler; use crate::handler::outbound; use crate::handler::outbound::OpenInfo; -use crate::substream_handler::SubstreamConnectionHandler; +use crate::substream_handler::{InEvent, SubstreamConnectionHandler}; use futures::future::BoxFuture; use futures::future::FutureExt; use futures::stream::FuturesUnordered; @@ -31,22 +31,19 @@ use instant::Duration; use libp2p_core::connection::ConnectionId; use libp2p_core::identity::error::SigningError; use libp2p_core::identity::Keypair; -use libp2p_core::{Multiaddr, PeerId, PeerRecord}; +use libp2p_core::{ConnectedPoint, Multiaddr, PeerId, PeerRecord}; use libp2p_swarm::behaviour::FromSwarm; +use libp2p_swarm::behaviour::THandlerInEvent; use libp2p_swarm::{ CloseConnection, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, }; use std::collections::{HashMap, VecDeque}; use std::iter::FromIterator; use std::task::{Context, Poll}; +use void::Void; pub struct Behaviour { - events: VecDeque< - NetworkBehaviourAction< - Event, - SubstreamConnectionHandler, - >, - >, + events: VecDeque>>, keypair: Keypair, pending_register_requests: Vec<(Namespace, PeerId, Option)>, @@ -169,10 +166,16 @@ impl NetworkBehaviour for Behaviour { SubstreamConnectionHandler; type OutEvent = Event; - fn new_handler(&mut self) -> Self::ConnectionHandler { + fn new_handler( + &mut self, + _: &PeerId, + _: &ConnectedPoint, + ) -> Result> { let initial_keep_alive = Duration::from_secs(30); - SubstreamConnectionHandler::new_outbound_only(initial_keep_alive) + Ok(SubstreamConnectionHandler::new_outbound_only( + initial_keep_alive, + )) } fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec { @@ -216,7 +219,8 @@ impl NetworkBehaviour for Behaviour { &mut self, cx: &mut Context<'_>, poll_params: &mut impl PollParameters, - ) -> Poll> { + ) -> Poll>> + { if let Some(event) = self.events.pop_front() { return Poll::Ready(event); } @@ -290,12 +294,7 @@ fn handle_outbound_event( peer_id: PeerId, discovered_peers: &mut HashMap<(PeerId, Namespace), Vec>, expiring_registrations: &mut FuturesUnordered>, -) -> Vec< - NetworkBehaviourAction< - Event, - SubstreamConnectionHandler, - >, -> { +) -> Vec>> { match event { outbound::OutEvent::Registered { namespace, ttl } => { vec![NetworkBehaviourAction::GenerateEvent(Event::Registered { diff --git a/protocols/rendezvous/src/server.rs b/protocols/rendezvous/src/server.rs index 4126b6e3e28..2c10f14253c 100644 --- a/protocols/rendezvous/src/server.rs +++ b/protocols/rendezvous/src/server.rs @@ -20,7 +20,7 @@ use crate::codec::{Cookie, ErrorCode, Namespace, NewRegistration, Registration, Ttl}; use crate::handler::inbound; -use crate::substream_handler::{InboundSubstreamId, SubstreamConnectionHandler}; +use crate::substream_handler::{InEvent, InboundSubstreamId, SubstreamConnectionHandler}; use crate::{handler, MAX_TTL, MIN_TTL}; use bimap::BiMap; use futures::future::BoxFuture; @@ -28,8 +28,9 @@ use futures::ready; use futures::stream::FuturesUnordered; use futures::{FutureExt, StreamExt}; use libp2p_core::connection::ConnectionId; -use libp2p_core::PeerId; +use libp2p_core::{ConnectedPoint, PeerId}; use libp2p_swarm::behaviour::FromSwarm; +use libp2p_swarm::behaviour::THandlerInEvent; use libp2p_swarm::{ CloseConnection, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, }; @@ -40,9 +41,7 @@ use std::time::Duration; use void::Void; pub struct Behaviour { - events: VecDeque< - NetworkBehaviourAction>, - >, + events: VecDeque>>, registrations: Registrations, } @@ -113,10 +112,16 @@ impl NetworkBehaviour for Behaviour { type ConnectionHandler = SubstreamConnectionHandler; type OutEvent = Event; - fn new_handler(&mut self) -> Self::ConnectionHandler { + fn new_handler( + &mut self, + _: &PeerId, + _: &ConnectedPoint, + ) -> Result> { let initial_keep_alive = Duration::from_secs(30); - SubstreamConnectionHandler::new_inbound_only(initial_keep_alive) + Ok(SubstreamConnectionHandler::new_inbound_only( + initial_keep_alive, + )) } fn on_connection_handler_event( @@ -148,7 +153,8 @@ impl NetworkBehaviour for Behaviour { &mut self, cx: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll> { + ) -> Poll>> + { if let Poll::Ready(ExpiredRegistration(registration)) = self.registrations.poll(cx) { return Poll::Ready(NetworkBehaviourAction::GenerateEvent( Event::RegistrationExpired(registration), @@ -186,7 +192,7 @@ fn handle_inbound_event( connection: ConnectionId, id: InboundSubstreamId, registrations: &mut Registrations, -) -> Vec>> { +) -> Vec>> { match event { // bad registration inbound::OutEvent::RegistrationRequested(registration) diff --git a/protocols/request-response/src/lib.rs b/protocols/request-response/src/lib.rs index 68c6212579c..3cf0f94c0e2 100644 --- a/protocols/request-response/src/lib.rs +++ b/protocols/request-response/src/lib.rs @@ -67,10 +67,11 @@ pub use handler::ProtocolSupport; use futures::channel::oneshot; use handler::{RequestProtocol, RequestResponseHandler, RequestResponseHandlerEvent}; use libp2p_core::{connection::ConnectionId, ConnectedPoint, Multiaddr, PeerId}; +use libp2p_swarm::behaviour::THandlerInEvent; use libp2p_swarm::{ behaviour::{AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm}, dial_opts::DialOpts, - IntoConnectionHandler, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, + ConnectionHandler, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, }; use smallvec::SmallVec; use std::{ @@ -316,7 +317,7 @@ where pending_events: VecDeque< NetworkBehaviourAction< RequestResponseEvent, - RequestResponseHandler, + RequestProtocol, >, >, /// The currently connected peers, their pending outbound and inbound responses and their known, @@ -385,10 +386,8 @@ where }; if let Some(request) = self.try_send_request(peer, request) { - let handler = self.new_handler(); self.pending_events.push_back(NetworkBehaviourAction::Dial { opts: DialOpts::peer_id(*peer).build(), - handler, }); self.pending_outbound_requests .entry(*peer) @@ -664,10 +663,7 @@ where } } - fn on_dial_failure( - &mut self, - DialFailure { peer_id, .. }: DialFailure<::ConnectionHandler>, - ) { + fn on_dial_failure(&mut self, DialFailure { peer_id, .. }: DialFailure) { if let Some(peer) = peer_id { // If there are pending outgoing requests when a dial failure occurs, // it is implied that we are not connected to the peer, since pending @@ -698,14 +694,18 @@ where type ConnectionHandler = RequestResponseHandler; type OutEvent = RequestResponseEvent; - fn new_handler(&mut self) -> Self::ConnectionHandler { - RequestResponseHandler::new( + fn new_handler( + &mut self, + _: &PeerId, + _: &ConnectedPoint, + ) -> Result> { + Ok(RequestResponseHandler::new( self.inbound_protocols.clone(), self.codec.clone(), self.config.connection_keep_alive, self.config.request_timeout, self.next_inbound_id.clone(), - ) + )) } fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec { @@ -744,8 +744,7 @@ where &mut self, peer: PeerId, connection: ConnectionId, - event: <::Handler as - libp2p_swarm::ConnectionHandler>::OutEvent, + event: ::OutEvent, ) { match event { RequestResponseHandlerEvent::Response { @@ -897,7 +896,8 @@ where &mut self, _: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll> { + ) -> Poll>> + { if let Some(ev) = self.pending_events.pop_front() { return Poll::Ready(ev); } else if self.pending_events.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD { diff --git a/swarm-derive/src/lib.rs b/swarm-derive/src/lib.rs index 4af70e6a84a..66ad8887a54 100644 --- a/swarm-derive/src/lib.rs +++ b/swarm-derive/src/lib.rs @@ -55,11 +55,11 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { let trait_to_impl = quote! { #prelude_path::NetworkBehaviour }; let either_ident = quote! { #prelude_path::EitherOutput }; let network_behaviour_action = quote! { #prelude_path::NetworkBehaviourAction }; - let into_connection_handler = quote! { #prelude_path::IntoConnectionHandler }; let connection_handler = quote! { #prelude_path::ConnectionHandler }; - let into_proto_select_ident = quote! { #prelude_path::IntoConnectionHandlerSelect }; + let proto_select_ident = quote! { #prelude_path::ConnectionHandlerSelect }; let peer_id = quote! { #prelude_path::PeerId }; let connection_id = quote! { #prelude_path::ConnectionId }; + let connected_point = quote! { #prelude_path::ConnectedPoint }; let poll_parameters = quote! { #prelude_path::PollParameters }; let from_swarm = quote! { #prelude_path::FromSwarm }; let connection_established = quote! { #prelude_path::ConnectionEstablished }; @@ -254,65 +254,37 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { .fields .iter() .enumerate() - // The outmost handler belongs to the last behaviour. - .rev() - .enumerate() - .map(|(enum_n, (field_n, field))| { - let handler = if field_n == 0 { - // Given that the iterator is reversed, this is the innermost handler only. - quote! { let handler = handlers } - } else { - quote! { - let (handlers, handler) = handlers.into_inner() - } - }; - - let inject = match field.ident { - Some(ref i) => quote! { + .map(move |(field_n, field)| match field.ident { + Some(ref i) => quote! { #[allow(deprecated)] - self.#i.inject_dial_failure(peer_id, handler, error);}, - None => quote! { + self.#i.inject_dial_failure(peer_id, error); + }, + None => quote! { #[allow(deprecated)] - self.#enum_n.inject_dial_failure(peer_id, handler, error);}, - }; - - quote! { - #handler; - #inject; - } + self.#field_n.inject_dial_failure(peer_id, error); + }, }) }; - // Build the list of statements to put in the body of `on_swarm_event()` - // for the `FromSwarm::ListenFailure` variant. - let on_listen_failure_stmts = - { - data_struct.fields.iter().enumerate().rev().enumerate().map( - |(enum_n, (field_n, field))| { - let handler = if field_n == 0 { - quote! { let handler = handlers } - } else { - quote! { - let (handlers, handler) = handlers.into_inner() - } - }; - - let inject = match field.ident { - Some(ref i) => quote! { - #[allow(deprecated)] - self.#i.inject_listen_failure(local_addr, send_back_addr, handler);}, - None => quote! { - #[allow(deprecated)] - self.#enum_n.inject_listen_failure(local_addr, send_back_addr, handler);}, - }; - + // Build the list of statements to put in the body of `inject_listen_failure()`. + let on_listen_failure_stmts = { + data_struct + .fields + .iter() + .enumerate() + .map(move |(field_n, field)| match field.ident { + Some(ref i) => { quote! { - #handler; - #inject; + #[allow(deprecated)] + self.#i.inject_listen_failure(local_addr, send_back_addr); } + } + None => quote! { + #[allow(deprecated)] + self.#field_n.inject_listen_failure(local_addr, send_back_addr); }, - ) - }; + }) + }; // Build the list of statements to put in the body of `on_swarm_event()` // for the `FromSwarm::NewListener` variant. @@ -471,7 +443,7 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { let ty = &field.ty; let field_info = quote! { <#ty as #trait_to_impl>::ConnectionHandler }; match ph_ty { - Some(ev) => ph_ty = Some(quote! { #into_proto_select_ident<#ev, #field_info> }), + Some(ev) => ph_ty = Some(quote! { #proto_select_ident<#ev, #field_info> }), ref mut ev @ None => *ev = Some(field_info), } } @@ -491,13 +463,11 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { }; let builder = quote! { - #field_name.new_handler() + #field_name.new_handler(peer, connected_point)? }; match out_handler { - Some(h) => { - out_handler = Some(quote! { #into_connection_handler::select(#h, #builder) }) - } + Some(h) => out_handler = Some(quote! { #connection_handler::select(#h, #builder) }), ref mut h @ None => *h = Some(builder), } } @@ -523,38 +493,6 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { wrapped_event = quote!{ #either_ident::First(#wrapped_event) }; } - // `Dial` provides a handler of the specific behaviour triggering the - // event. Though in order for the final handler to be able to handle - // protocols of all behaviours, the provided handler needs to be - // combined with handlers of all other behaviours. - let provided_handler_and_new_handlers = { - let mut out_handler = None; - - for (f_n, f) in data_struct.fields.iter().enumerate() { - let f_name = match f.ident { - Some(ref i) => quote! { self.#i }, - None => quote! { self.#f_n }, - }; - - let builder = if field_n == f_n { - // The behaviour that triggered the event. Thus, instead of - // creating a new handler, use the provided handler. - quote! { provided_handler } - } else { - quote! { #f_name.new_handler() } - }; - - match out_handler { - Some(h) => { - out_handler = Some(quote! { #into_connection_handler::select(#h, #builder) }) - } - ref mut h @ None => *h = Some(builder), - } - } - - out_handler.unwrap_or(quote! {()}) // TODO: See test `empty`. - }; - let generate_event_match_arm = { // If the `NetworkBehaviour`'s `OutEvent` is generated by the derive macro, wrap the sub // `NetworkBehaviour` `OutEvent` in the variant of the generated `OutEvent`. If the @@ -582,8 +520,8 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { loop { match #trait_to_impl::poll(&mut self.#field, cx, poll_params) { #generate_event_match_arm - std::task::Poll::Ready(#network_behaviour_action::Dial { opts, handler: provided_handler }) => { - return std::task::Poll::Ready(#network_behaviour_action::Dial { opts, handler: #provided_handler_and_new_handlers }); + std::task::Poll::Ready(#network_behaviour_action::Dial { opts }) => { + return std::task::Poll::Ready(#network_behaviour_action::Dial { opts }); } std::task::Poll::Ready(#network_behaviour_action::NotifyHandler { peer_id, handler, event }) => { return std::task::Poll::Ready(#network_behaviour_action::NotifyHandler { @@ -620,9 +558,11 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { type ConnectionHandler = #connection_handler_ty; type OutEvent = #out_event_reference; - fn new_handler(&mut self) -> Self::ConnectionHandler { - use #into_connection_handler; - #new_handler + #[allow(clippy::needless_question_mark)] + fn new_handler(&mut self, peer: &#peer_id, connected_point: &#connected_point) -> Result> { + use #connection_handler; + + Ok(#new_handler) } fn addresses_of_peer(&mut self, peer_id: &#peer_id) -> Vec<#multiaddr> { @@ -635,14 +575,14 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { &mut self, peer_id: #peer_id, connection_id: #connection_id, - event: <::Handler as #connection_handler>::OutEvent + event: ::OutEvent ) { match event { #(#on_node_event_stmts),* } } - fn poll(&mut self, cx: &mut std::task::Context, poll_params: &mut impl #poll_parameters) -> std::task::Poll<#network_behaviour_action> { + fn poll(&mut self, cx: &mut std::task::Context, poll_params: &mut impl #poll_parameters) -> std::task::Poll<#network_behaviour_action::InEvent>> { use #prelude_path::futures::*; #(#poll_stmts)* std::task::Poll::Pending @@ -660,10 +600,10 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { #connection_closed { peer_id, connection_id, endpoint, handler: handlers, remaining_established }) => { #(#on_connection_closed_stmts)* } #from_swarm::DialFailure( - #dial_failure { peer_id, handler: handlers, error }) + #dial_failure { peer_id, error }) => { #(#on_dial_failure_stmts)* } #from_swarm::ListenFailure( - #listen_failure { local_addr, send_back_addr, handler: handlers }) + #listen_failure { local_addr, send_back_addr }) => { #(#on_listen_failure_stmts)* } #from_swarm::NewListener( #new_listener { listener_id }) @@ -686,7 +626,6 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { #from_swarm::ListenerClosed( #listener_closed { listener_id, reason }) => { #(#on_listener_closed_stmts)* } - _ => {} } } } diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index 546ae75f7a6..2f7f1a84048 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -82,7 +82,8 @@ ``` - Without - Spawns the tasks on the current task, this may result in bad performance so try to use an executor where possible. Previously this was just a fallback when no executor was specified and constructing a `ThreadPool` failed. + Spawns the tasks on the current task, this may result in bad performance so try to use an executor where possible. + Previously this was just a fallback when no executor was specified and constructing a `ThreadPool` failed. New ```rust @@ -96,6 +97,10 @@ - Update `rust-version` to reflect the actual MSRV: 1.62.0. See [PR 3090]. +- Remove `IntoConnectionHandler` abstraction and change the signature of `NetworkBehaviour::new_handler` to accept `PeerId` and `ConnectedPoint`. + Previously, this information was only available as part of `IntoConnectionHandler::into_handler` but it is now passed to the `NetworkBehaviour` directly. + See [PR 3099]. + [PR 3085]: https://github.com/libp2p/rust-libp2p/pull/3085 [PR 3011]: https://github.com/libp2p/rust-libp2p/pull/3011 [PR 3055]: https://github.com/libp2p/rust-libp2p/pull/3055 diff --git a/swarm/src/behaviour.rs b/swarm/src/behaviour.rs index 6240e570888..07cb18f3e84 100644 --- a/swarm/src/behaviour.rs +++ b/swarm/src/behaviour.rs @@ -22,7 +22,7 @@ mod either; pub mod toggle; use crate::dial_opts::DialOpts; -use crate::handler::{ConnectionHandler, IntoConnectionHandler}; +use crate::handler::ConnectionHandler; use crate::{AddressRecord, AddressScore, DialError}; use libp2p_core::{ connection::ConnectionId, transport::ListenerId, ConnectedPoint, Multiaddr, PeerId, @@ -30,11 +30,9 @@ use libp2p_core::{ use std::{task::Context, task::Poll}; /// Custom event that can be received by the [`ConnectionHandler`]. -pub(crate) type THandlerInEvent = - <::Handler as ConnectionHandler>::InEvent; +pub type THandlerInEvent = ::InEvent; -pub(crate) type THandlerOutEvent = - <::Handler as ConnectionHandler>::OutEvent; +pub type THandlerOutEvent = ::OutEvent; /// A [`NetworkBehaviour`] defines the behaviour of the local node on the network. /// @@ -120,7 +118,7 @@ pub(crate) type THandlerOutEvent = /// ``` pub trait NetworkBehaviour: 'static { /// Handler for all the protocols the network behaviour supports. - type ConnectionHandler: IntoConnectionHandler; + type ConnectionHandler: ConnectionHandler; /// Event generated by the `NetworkBehaviour` and that the swarm will report back. type OutEvent: Send + 'static; @@ -141,7 +139,11 @@ pub trait NetworkBehaviour: 'static { /// /// Note that the handler is returned to the [`NetworkBehaviour`] on connection failure and /// connection closing. - fn new_handler(&mut self) -> Self::ConnectionHandler; + fn new_handler( + &mut self, + peer: &PeerId, + connected_point: &ConnectedPoint, + ) -> Result>; /// Addresses that this behaviour is aware of for this specific peer, and that may allow /// reaching the peer. @@ -165,8 +167,7 @@ pub trait NetworkBehaviour: 'static { &mut self, _peer_id: PeerId, _connection_id: ConnectionId, - _event: <::Handler as - ConnectionHandler>::OutEvent, + _event: ::OutEvent, ) { } @@ -207,7 +208,7 @@ pub trait NetworkBehaviour: 'static { peer_id: &PeerId, connection_id: &ConnectionId, endpoint: &ConnectedPoint, - handler: ::Handler, + handler: Self::ConnectionHandler, remaining_established: usize, ) { self.on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed { @@ -252,7 +253,7 @@ pub trait NetworkBehaviour: 'static { &mut self, peer_id: PeerId, connection: ConnectionId, - event: <::Handler as ConnectionHandler>::OutEvent, + event: ::OutEvent, ) { self.on_connection_handler_event(peer_id, connection, event); } @@ -262,17 +263,8 @@ pub trait NetworkBehaviour: 'static { since = "0.40.2", note = "Handle `InEvent::DialFailure` in `NetworkBehaviour::on_swarm_event` instead. The default implementation of this `inject_*` method delegates to it." )] - fn inject_dial_failure( - &mut self, - peer_id: Option, - handler: Self::ConnectionHandler, - error: &DialError, - ) { - self.on_swarm_event(FromSwarm::DialFailure(DialFailure { - peer_id, - handler, - error, - })); + fn inject_dial_failure(&mut self, peer_id: Option, error: &DialError) { + self.on_swarm_event(FromSwarm::DialFailure(DialFailure { peer_id, error })); } /// Indicates to the behaviour that an error happened on an incoming connection during its @@ -284,16 +276,10 @@ pub trait NetworkBehaviour: 'static { since = "0.40.2", note = "Handle `FromSwarm::ListenFailure` in `NetworkBehaviour::on_swarm_event` instead. The default implementation of this `inject_*` method delegates to it." )] - fn inject_listen_failure( - &mut self, - local_addr: &Multiaddr, - send_back_addr: &Multiaddr, - handler: Self::ConnectionHandler, - ) { + fn inject_listen_failure(&mut self, local_addr: &Multiaddr, send_back_addr: &Multiaddr) { self.on_swarm_event(FromSwarm::ListenFailure(ListenFailure { local_addr, send_back_addr, - handler, })); } @@ -381,7 +367,7 @@ pub trait NetworkBehaviour: 'static { &mut self, cx: &mut Context<'_>, params: &mut impl PollParameters, - ) -> Poll>; + ) -> Poll>>; } /// Parameters passed to `poll()`, that the `NetworkBehaviour` has access to. @@ -420,200 +406,14 @@ pub trait PollParameters { // [`NetworkBehaviourAction::map_in`], mapping the handler `InEvent` leaving the // handler itself untouched. #[derive(Debug)] -pub enum NetworkBehaviourAction< - TOutEvent, - THandler: IntoConnectionHandler, - TInEvent = THandlerInEvent, -> { +pub enum NetworkBehaviourAction { /// Instructs the `Swarm` to return an event when it is being polled. GenerateEvent(TOutEvent), /// Instructs the swarm to start a dial. /// - /// On success, [`NetworkBehaviour::inject_connection_established`] is invoked. - /// On failure, [`NetworkBehaviour::inject_dial_failure`] is invoked. - /// - /// Note that the provided handler is returned to the [`NetworkBehaviour`] on connection failure - /// and connection closing. Thus it can be used to carry state, which otherwise would have to be - /// tracked in the [`NetworkBehaviour`] itself. E.g. a message destined to an unconnected peer - /// can be included in the handler, and thus directly send on connection success or extracted by - /// the [`NetworkBehaviour`] on connection failure. - /// - /// # Example carrying state in the handler - /// - /// ```rust - /// # use futures::executor::block_on; - /// # use futures::stream::StreamExt; - /// # use libp2p_core::connection::ConnectionId; - /// # use libp2p_core::identity; - /// # use libp2p_core::transport::{MemoryTransport, Transport}; - /// # use libp2p_core::upgrade::{self, DeniedUpgrade, InboundUpgrade, OutboundUpgrade}; - /// # use libp2p_core::PeerId; - /// # use libp2p_plaintext::PlainText2Config; - /// # use libp2p_swarm::{ - /// # DialError, IntoConnectionHandler, KeepAlive, NegotiatedSubstream, - /// # NetworkBehaviour, NetworkBehaviourAction, PollParameters, ConnectionHandler, - /// # ConnectionHandlerEvent, ConnectionHandlerUpgrErr, SubstreamProtocol, Swarm, SwarmEvent, - /// # }; - /// # use libp2p_swarm::dial_opts::{DialOpts, PeerCondition}; - /// # use libp2p_yamux as yamux; - /// # use std::collections::VecDeque; - /// # use std::task::{Context, Poll}; - /// # use void::Void; - /// # - /// # let local_key = identity::Keypair::generate_ed25519(); - /// # let local_public_key = local_key.public(); - /// # let local_peer_id = PeerId::from(local_public_key.clone()); - /// # - /// # let transport = MemoryTransport::default() - /// # .upgrade(upgrade::Version::V1) - /// # .authenticate(PlainText2Config { local_public_key }) - /// # .multiplex(yamux::YamuxConfig::default()) - /// # .boxed(); - /// # - /// # let mut swarm = Swarm::new(transport, MyBehaviour::default(), local_peer_id); - /// # - /// // Super precious message that we should better not lose. - /// let message = PreciousMessage("My precious message".to_string()); - /// - /// // Unfortunately this peer is offline, thus sending our message to it will fail. - /// let offline_peer = PeerId::random(); - /// - /// // Let's send it anyways. We should get it back in case connecting to the peer fails. - /// swarm.behaviour_mut().send(offline_peer, message); - /// - /// block_on(async { - /// // As expected, sending failed. But great news, we got our message back. - /// matches!( - /// swarm.next().await.expect("Infinite stream"), - /// SwarmEvent::Behaviour(PreciousMessage(_)) - /// ); - /// }); - /// - /// #[derive(Default)] - /// struct MyBehaviour { - /// outbox_to_swarm: VecDeque>, - /// } - /// - /// impl MyBehaviour { - /// fn send(&mut self, peer_id: PeerId, msg: PreciousMessage) { - /// self.outbox_to_swarm - /// .push_back(NetworkBehaviourAction::Dial { - /// opts: DialOpts::peer_id(peer_id) - /// .condition(PeerCondition::Always) - /// .build(), - /// handler: MyHandler { message: Some(msg) }, - /// }); - /// } - /// } - /// # - /// impl NetworkBehaviour for MyBehaviour { - /// # type ConnectionHandler = MyHandler; - /// # type OutEvent = PreciousMessage; - /// # - /// # fn new_handler(&mut self) -> Self::ConnectionHandler { - /// # MyHandler { message: None } - /// # } - /// # - /// # - /// # fn inject_event( - /// # &mut self, - /// # _: PeerId, - /// # _: ConnectionId, - /// # _: <::Handler as ConnectionHandler>::OutEvent, - /// # ) { - /// # unreachable!(); - /// # } - /// # - /// fn inject_dial_failure( - /// &mut self, - /// _: Option, - /// handler: Self::ConnectionHandler, - /// _: &DialError, - /// ) { - /// // As expected, sending the message failed. But lucky us, we got the handler back, thus - /// // the precious message is not lost and we can return it back to the user. - /// let msg = handler.message.unwrap(); - /// self.outbox_to_swarm - /// .push_back(NetworkBehaviourAction::GenerateEvent(msg)) - /// } - /// # - /// # fn poll( - /// # &mut self, - /// # _: &mut Context<'_>, - /// # _: &mut impl PollParameters, - /// # ) -> Poll> { - /// # if let Some(action) = self.outbox_to_swarm.pop_front() { - /// # return Poll::Ready(action); - /// # } - /// # Poll::Pending - /// # } - /// } - /// - /// # struct MyHandler { - /// # message: Option, - /// # } - /// # - /// # impl ConnectionHandler for MyHandler { - /// # type InEvent = Void; - /// # type OutEvent = Void; - /// # type Error = Void; - /// # type InboundProtocol = DeniedUpgrade; - /// # type OutboundProtocol = DeniedUpgrade; - /// # type InboundOpenInfo = (); - /// # type OutboundOpenInfo = Void; - /// # - /// # fn listen_protocol( - /// # &self, - /// # ) -> SubstreamProtocol { - /// # SubstreamProtocol::new(DeniedUpgrade, ()) - /// # } - /// # - /// # fn inject_fully_negotiated_inbound( - /// # &mut self, - /// # _: >::Output, - /// # _: Self::InboundOpenInfo, - /// # ) { - /// # } - /// # - /// # fn inject_fully_negotiated_outbound( - /// # &mut self, - /// # _: >::Output, - /// # _: Self::OutboundOpenInfo, - /// # ) { - /// # } - /// # - /// # fn inject_event(&mut self, _event: Self::InEvent) {} - /// # - /// # fn inject_dial_upgrade_error( - /// # &mut self, - /// # _: Self::OutboundOpenInfo, - /// # _: ConnectionHandlerUpgrErr, - /// # ) { - /// # } - /// # - /// # fn connection_keep_alive(&self) -> KeepAlive { - /// # KeepAlive::Yes - /// # } - /// # - /// # fn poll( - /// # &mut self, - /// # _: &mut Context<'_>, - /// # ) -> Poll< - /// # ConnectionHandlerEvent< - /// # Self::OutboundProtocol, - /// # Self::OutboundOpenInfo, - /// # Self::OutEvent, - /// # Self::Error, - /// # >, - /// # > { - /// # todo!("If `Self::message.is_some()` send the message to the remote.") - /// # } - /// # } - /// # #[derive(Debug, PartialEq, Eq)] - /// # struct PreciousMessage(String); - /// ``` - Dial { opts: DialOpts, handler: THandler }, + /// In case the dial fails, the behaviour is notified via [`NetworkBehaviour::inject_dial_failure`]. + Dial { opts: DialOpts }, /// Instructs the `Swarm` to send an event to the handler dedicated to a /// connection with a peer. @@ -673,19 +473,12 @@ pub enum NetworkBehaviourAction< }, } -impl - NetworkBehaviourAction -{ +impl NetworkBehaviourAction { /// Map the handler event. - pub fn map_in( - self, - f: impl FnOnce(TInEventOld) -> TInEventNew, - ) -> NetworkBehaviourAction { + pub fn map_in(self, f: impl FnOnce(TInEvent) -> E) -> NetworkBehaviourAction { match self { NetworkBehaviourAction::GenerateEvent(e) => NetworkBehaviourAction::GenerateEvent(e), - NetworkBehaviourAction::Dial { opts, handler } => { - NetworkBehaviourAction::Dial { opts, handler } - } + NetworkBehaviourAction::Dial { opts } => NetworkBehaviourAction::Dial { opts }, NetworkBehaviourAction::NotifyHandler { peer_id, handler, @@ -707,16 +500,12 @@ impl }, } } -} -impl NetworkBehaviourAction { /// Map the event the swarm will return. - pub fn map_out(self, f: impl FnOnce(TOutEvent) -> E) -> NetworkBehaviourAction { + pub fn map_out(self, f: impl FnOnce(TOutEvent) -> E) -> NetworkBehaviourAction { match self { NetworkBehaviourAction::GenerateEvent(e) => NetworkBehaviourAction::GenerateEvent(f(e)), - NetworkBehaviourAction::Dial { opts, handler } => { - NetworkBehaviourAction::Dial { opts, handler } - } + NetworkBehaviourAction::Dial { opts } => NetworkBehaviourAction::Dial { opts }, NetworkBehaviourAction::NotifyHandler { peer_id, handler, @@ -740,93 +529,6 @@ impl NetworkBehaviourAction NetworkBehaviourAction -where - THandlerOld: IntoConnectionHandler, - ::Handler: ConnectionHandler, -{ - /// Map the handler. - pub fn map_handler( - self, - f: impl FnOnce(THandlerOld) -> THandlerNew, - ) -> NetworkBehaviourAction - where - THandlerNew: IntoConnectionHandler, - ::Handler: ConnectionHandler, - { - match self { - NetworkBehaviourAction::GenerateEvent(e) => NetworkBehaviourAction::GenerateEvent(e), - NetworkBehaviourAction::Dial { opts, handler } => NetworkBehaviourAction::Dial { - opts, - handler: f(handler), - }, - NetworkBehaviourAction::NotifyHandler { - peer_id, - handler, - event, - } => NetworkBehaviourAction::NotifyHandler { - peer_id, - handler, - event, - }, - NetworkBehaviourAction::ReportObservedAddr { address, score } => { - NetworkBehaviourAction::ReportObservedAddr { address, score } - } - NetworkBehaviourAction::CloseConnection { - peer_id, - connection, - } => NetworkBehaviourAction::CloseConnection { - peer_id, - connection, - }, - } - } -} - -impl NetworkBehaviourAction -where - THandlerOld: IntoConnectionHandler, - ::Handler: ConnectionHandler, -{ - /// Map the handler and handler event. - pub fn map_handler_and_in( - self, - f_handler: impl FnOnce(THandlerOld) -> THandlerNew, - f_in_event: impl FnOnce(TInEventOld) -> TInEventNew, - ) -> NetworkBehaviourAction - where - THandlerNew: IntoConnectionHandler, - ::Handler: ConnectionHandler, - { - match self { - NetworkBehaviourAction::GenerateEvent(e) => NetworkBehaviourAction::GenerateEvent(e), - NetworkBehaviourAction::Dial { opts, handler } => NetworkBehaviourAction::Dial { - opts, - handler: f_handler(handler), - }, - NetworkBehaviourAction::NotifyHandler { - peer_id, - handler, - event, - } => NetworkBehaviourAction::NotifyHandler { - peer_id, - handler, - event: f_in_event(event), - }, - NetworkBehaviourAction::ReportObservedAddr { address, score } => { - NetworkBehaviourAction::ReportObservedAddr { address, score } - } - NetworkBehaviourAction::CloseConnection { - peer_id, - connection, - } => NetworkBehaviourAction::CloseConnection { - peer_id, - connection, - }, - } - } -} - /// The options w.r.t. which connection handler to notify of an event. #[derive(Debug, Clone)] pub enum NotifyHandler { @@ -853,7 +555,7 @@ impl Default for CloseConnection { /// Enumeration with the list of the possible events /// to pass to [`on_swarm_event`](NetworkBehaviour::on_swarm_event). -pub enum FromSwarm<'a, Handler: IntoConnectionHandler> { +pub enum FromSwarm<'a, Handler> { /// Informs the behaviour about a newly established connection to a peer. ConnectionEstablished(ConnectionEstablished<'a>), /// Informs the behaviour about a closed connection to a peer. @@ -867,13 +569,13 @@ pub enum FromSwarm<'a, Handler: IntoConnectionHandler> { AddressChange(AddressChange<'a>), /// Informs the behaviour that the dial to a known /// or unknown node failed. - DialFailure(DialFailure<'a, Handler>), + DialFailure(DialFailure<'a>), /// Informs the behaviour that an error /// happened on an incoming connection during its initial handshake. /// /// This can include, for example, an error during the handshake of the encryption layer, or the /// connection unexpectedly closed. - ListenFailure(ListenFailure<'a, Handler>), + ListenFailure(ListenFailure<'a>), /// Informs the behaviour that a new listener was created. NewListener(NewListener), /// Informs the behaviour that we have started listening on a new multiaddr. @@ -907,11 +609,11 @@ pub struct ConnectionEstablished<'a> { /// This event is always paired with an earlier /// [`FromSwarm::ConnectionEstablished`] with the same peer ID, connection ID /// and endpoint. -pub struct ConnectionClosed<'a, Handler: IntoConnectionHandler> { +pub struct ConnectionClosed<'a, Handler> { pub peer_id: PeerId, pub connection_id: ConnectionId, pub endpoint: &'a ConnectedPoint, - pub handler: ::Handler, + pub handler: Handler, pub remaining_established: usize, } @@ -928,9 +630,8 @@ pub struct AddressChange<'a> { /// [`FromSwarm`] variant that informs the behaviour that the dial to a known /// or unknown node failed. #[derive(Clone, Copy)] -pub struct DialFailure<'a, Handler> { +pub struct DialFailure<'a> { pub peer_id: Option, - pub handler: Handler, pub error: &'a DialError, } @@ -940,10 +641,9 @@ pub struct DialFailure<'a, Handler> { /// This can include, for example, an error during the handshake of the encryption layer, or the /// connection unexpectedly closed. #[derive(Clone, Copy)] -pub struct ListenFailure<'a, Handler> { +pub struct ListenFailure<'a> { pub local_addr: &'a Multiaddr, pub send_back_addr: &'a Multiaddr, - pub handler: Handler, } /// [`FromSwarm`] variant that informs the behaviour that a new listener was created. @@ -996,31 +696,19 @@ pub struct ExpiredExternalAddr<'a> { pub addr: &'a Multiaddr, } -impl<'a, Handler: IntoConnectionHandler> FromSwarm<'a, Handler> { +impl<'a, Handler> FromSwarm<'a, Handler> { fn map_handler( self, - map_into_handler: impl FnOnce(Handler) -> NewHandler, - map_handler: impl FnOnce( - ::Handler, - ) -> ::Handler, - ) -> FromSwarm<'a, NewHandler> - where - NewHandler: IntoConnectionHandler, - { - self.maybe_map_handler(|h| Some(map_into_handler(h)), |h| Some(map_handler(h))) + map_handler: impl FnOnce(Handler) -> NewHandler, + ) -> FromSwarm<'a, NewHandler> { + self.maybe_map_handler(|h| Some(map_handler(h))) .expect("To return Some as all closures return Some.") } fn maybe_map_handler( self, - map_into_handler: impl FnOnce(Handler) -> Option, - map_handler: impl FnOnce( - ::Handler, - ) -> Option<::Handler>, - ) -> Option> - where - NewHandler: IntoConnectionHandler, - { + map_handler: impl FnOnce(Handler) -> Option, + ) -> Option> { match self { FromSwarm::ConnectionClosed(ConnectionClosed { peer_id, @@ -1059,23 +747,15 @@ impl<'a, Handler: IntoConnectionHandler> FromSwarm<'a, Handler> { old, new, })), - FromSwarm::DialFailure(DialFailure { - peer_id, - handler, - error, - }) => Some(FromSwarm::DialFailure(DialFailure { - peer_id, - handler: map_into_handler(handler)?, - error, - })), + FromSwarm::DialFailure(DialFailure { peer_id, error }) => { + Some(FromSwarm::DialFailure(DialFailure { peer_id, error })) + } FromSwarm::ListenFailure(ListenFailure { local_addr, send_back_addr, - handler, }) => Some(FromSwarm::ListenFailure(ListenFailure { local_addr, send_back_addr, - handler: map_into_handler(handler)?, })), FromSwarm::NewListener(NewListener { listener_id }) => { Some(FromSwarm::NewListener(NewListener { listener_id })) @@ -1161,21 +841,16 @@ pub(crate) fn inject_from_swarm( #[allow(deprecated)] behaviour.inject_address_change(&peer_id, &connection_id, old, new); } - FromSwarm::DialFailure(DialFailure { - peer_id, - handler, - error, - }) => { + FromSwarm::DialFailure(DialFailure { peer_id, error }) => { #[allow(deprecated)] - behaviour.inject_dial_failure(peer_id, handler, error); + behaviour.inject_dial_failure(peer_id, error); } FromSwarm::ListenFailure(ListenFailure { local_addr, send_back_addr, - handler, }) => { #[allow(deprecated)] - behaviour.inject_listen_failure(local_addr, send_back_addr, handler); + behaviour.inject_listen_failure(local_addr, send_back_addr); } FromSwarm::NewListener(NewListener { listener_id }) => { #[allow(deprecated)] diff --git a/swarm/src/behaviour/either.rs b/swarm/src/behaviour/either.rs index 4154db1a0de..f187392831f 100644 --- a/swarm/src/behaviour/either.rs +++ b/swarm/src/behaviour/either.rs @@ -18,12 +18,12 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use crate::behaviour::THandlerInEvent; use crate::behaviour::{ self, inject_from_swarm, NetworkBehaviour, NetworkBehaviourAction, PollParameters, }; -use crate::handler::either::IntoEitherHandler; use either::Either; -use libp2p_core::{Multiaddr, PeerId}; +use libp2p_core::{ConnectedPoint, Multiaddr, PeerId}; use std::{task::Context, task::Poll}; /// Implementation of [`NetworkBehaviour`] that can be either of two implementations. @@ -32,14 +32,18 @@ where L: NetworkBehaviour, R: NetworkBehaviour, { - type ConnectionHandler = IntoEitherHandler; + type ConnectionHandler = Either; type OutEvent = Either; - fn new_handler(&mut self) -> Self::ConnectionHandler { - match self { - Either::Left(a) => IntoEitherHandler::Left(a.new_handler()), - Either::Right(b) => IntoEitherHandler::Right(b.new_handler()), - } + fn new_handler( + &mut self, + peer: &PeerId, + connected_point: &ConnectedPoint, + ) -> Result> { + Ok(match self { + Either::Left(a) => Either::Left(a.new_handler(peer, connected_point)?), + Either::Right(b) => Either::Right(b.new_handler(peer, connected_point)?), + }) } fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { @@ -53,23 +57,17 @@ where match self { Either::Left(b) => inject_from_swarm( b, - event.map_handler( - |h| h.unwrap_left(), - |h| match h { - Either::Left(h) => h, - Either::Right(_) => unreachable!(), - }, - ), + event.map_handler(|h| match h { + Either::Left(h) => h, + Either::Right(_) => unreachable!(), + }), ), Either::Right(b) => inject_from_swarm( b, - event.map_handler( - |h| h.unwrap_right(), - |h| match h { - Either::Right(h) => h, - Either::Left(_) => unreachable!(), - }, - ), + event.map_handler(|h| match h { + Either::Right(h) => h, + Either::Left(_) => unreachable!(), + }), ), } } @@ -97,14 +95,19 @@ where &mut self, cx: &mut Context<'_>, params: &mut impl PollParameters, - ) -> Poll> { + ) -> Poll< + NetworkBehaviourAction< + Self::OutEvent, + Either, THandlerInEvent>, + >, + > { let event = match self { Either::Left(behaviour) => futures::ready!(behaviour.poll(cx, params)) .map_out(Either::Left) - .map_handler_and_in(IntoEitherHandler::Left, Either::Left), + .map_in(Either::Left), Either::Right(behaviour) => futures::ready!(behaviour.poll(cx, params)) .map_out(Either::Right) - .map_handler_and_in(IntoEitherHandler::Right, Either::Right), + .map_in(Either::Right), }; Poll::Ready(event) diff --git a/swarm/src/behaviour/toggle.rs b/swarm/src/behaviour/toggle.rs index 81255a40274..399058bfe32 100644 --- a/swarm/src/behaviour/toggle.rs +++ b/swarm/src/behaviour/toggle.rs @@ -18,11 +18,12 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use crate::behaviour::THandlerInEvent; use crate::behaviour::{inject_from_swarm, FromSwarm}; use crate::handler::{ ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, - DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, IntoConnectionHandler, - KeepAlive, ListenUpgradeError, SubstreamProtocol, + DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive, + ListenUpgradeError, SubstreamProtocol, }; use crate::upgrade::SendWrapper; use crate::{NetworkBehaviour, NetworkBehaviourAction, PollParameters}; @@ -68,13 +69,20 @@ impl NetworkBehaviour for Toggle where TBehaviour: NetworkBehaviour, { - type ConnectionHandler = ToggleIntoConnectionHandler; + type ConnectionHandler = ToggleConnectionHandler; type OutEvent = TBehaviour::OutEvent; - fn new_handler(&mut self) -> Self::ConnectionHandler { - ToggleIntoConnectionHandler { - inner: self.inner.as_mut().map(|i| i.new_handler()), - } + fn new_handler( + &mut self, + peer: &PeerId, + connected_point: &ConnectedPoint, + ) -> Result> { + Ok(ToggleConnectionHandler { + inner: match self.inner.as_mut() { + None => None, + Some(inner) => Some(inner.new_handler(peer, connected_point)?), + }, + }) } fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { @@ -86,7 +94,7 @@ where fn on_swarm_event(&mut self, event: FromSwarm) { if let Some(behaviour) = &mut self.inner { - if let Some(event) = event.maybe_map_handler(|h| h.inner, |h| h.inner) { + if let Some(event) = event.maybe_map_handler(|h| h.inner) { inject_from_swarm(behaviour, event); } } @@ -108,49 +116,16 @@ where &mut self, cx: &mut Context<'_>, params: &mut impl PollParameters, - ) -> Poll> { + ) -> Poll>> + { if let Some(inner) = self.inner.as_mut() { - inner.poll(cx, params).map(|action| { - action.map_handler(|h| ToggleIntoConnectionHandler { inner: Some(h) }) - }) + inner.poll(cx, params) } else { Poll::Pending } } } -/// Implementation of `IntoConnectionHandler` that can be in the disabled state. -pub struct ToggleIntoConnectionHandler { - inner: Option, -} - -impl IntoConnectionHandler for ToggleIntoConnectionHandler -where - TInner: IntoConnectionHandler, -{ - type Handler = ToggleConnectionHandler; - - fn into_handler( - self, - remote_peer_id: &PeerId, - connected_point: &ConnectedPoint, - ) -> Self::Handler { - ToggleConnectionHandler { - inner: self - .inner - .map(|h| h.into_handler(remote_peer_id, connected_point)), - } - } - - fn inbound_protocol(&self) -> ::InboundProtocol { - if let Some(inner) = self.inner.as_ref() { - EitherUpgrade::A(SendWrapper(inner.inbound_protocol())) - } else { - EitherUpgrade::B(SendWrapper(DeniedUpgrade)) - } - } -} - /// Implementation of [`ConnectionHandler`] that can be in the disabled state. pub struct ToggleConnectionHandler { inner: Option, diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index 8729b2e36e1..65d842200d4 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -18,8 +18,8 @@ // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. - use crate::connection::Connection; +use crate::upgrade::UpgradeInfoSend; use crate::{ behaviour::{THandlerInEvent, THandlerOutEvent}, connection::{ @@ -27,7 +27,7 @@ use crate::{ PendingInboundConnectionError, PendingOutboundConnectionError, }, transport::{Transport, TransportError}, - ConnectedPoint, ConnectionHandler, Executor, IntoConnectionHandler, Multiaddr, PeerId, + ConnectedPoint, ConnectionHandler, Executor, Multiaddr, PeerId, }; use concurrent_dial::ConcurrentDial; use fnv::FnvHashMap; @@ -40,6 +40,9 @@ use futures::{ }; use libp2p_core::connection::{ConnectionId, Endpoint, PendingPoint}; use libp2p_core::muxing::{StreamMuxerBox, StreamMuxerExt}; +use libp2p_core::ProtocolName; +use smallvec::SmallVec; +use std::error::Error; use std::{ collections::{hash_map, HashMap}, convert::TryFrom as _, @@ -81,7 +84,7 @@ impl ExecSwitch { pub struct Pool where TTrans: Transport, - THandler: IntoConnectionHandler, + THandler: ConnectionHandler, { local_id: PeerId, @@ -89,16 +92,11 @@ where counters: ConnectionCounters, /// The managed connections of each peer that are currently considered established. - established: FnvHashMap< - PeerId, - FnvHashMap< - ConnectionId, - EstablishedConnection<::InEvent>, - >, - >, + established: + FnvHashMap>>, /// The pending connections that are currently being negotiated. - pending: HashMap>, + pending: HashMap, /// Next available identifier for a new connection / task. next_connection_id: ConnectionId, @@ -130,12 +128,10 @@ where /// Sender distributed to established tasks for reporting events back /// to the pool. - established_connection_events_tx: - mpsc::Sender>, + established_connection_events_tx: mpsc::Sender>, /// Receiver for events reported from established tasks. - established_connection_events_rx: - mpsc::Receiver>, + established_connection_events_rx: mpsc::Receiver>, } #[derive(Debug)] @@ -187,17 +183,15 @@ impl EstablishedConnection { } } -struct PendingConnection { +struct PendingConnection { /// [`PeerId`] of the remote peer. peer_id: Option, - /// Handler to handle connection once no longer pending but established. - handler: THandler, endpoint: PendingPoint, /// When dropped, notifies the task which then knows to terminate. abort_notifier: Option>, } -impl PendingConnection { +impl PendingConnection { fn is_for_same_remote_as(&self, other: PeerId) -> bool { self.peer_id.map_or(false, |peer| peer == other) } @@ -210,7 +204,7 @@ impl PendingConnection { } } -impl fmt::Debug for Pool { +impl fmt::Debug for Pool { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { f.debug_struct("Pool") .field("counters", &self.counters) @@ -220,7 +214,8 @@ impl fmt::Debug for Pool +#[allow(clippy::large_enum_variant)] +pub enum PoolEvent where TTrans: Transport, { @@ -237,6 +232,8 @@ where /// Addresses are dialed in parallel. Contains the addresses and errors /// of dial attempts that failed before the one successful dial. concurrent_dial_errors: Option)>>, + + supported_protocols: SmallVec<[Vec; 16]>, }, /// An established connection was closed. @@ -256,10 +253,21 @@ where connected: Connected, /// The error that occurred, if any. If `None`, the connection /// was closed by the local peer. - error: Option::Error>>, + error: Option>, /// The remaining established connections to the same peer. remaining_established_connection_ids: Vec, - handler: THandler::Handler, + handler: THandler, + }, + + /// A [NetworkBehaviour] denied a just established connection by not producing a [`ConnectionHandler`] from [`NetworkBehaviour::new_handler`]. + /// + /// [NetworkBehaviour]: crate::NetworkBehaviour + /// [NetworkBehaviour::new_handler]: crate::NetworkBehaviour::new_handler + ConnectionDenied { + id: ConnectionId, + peer_id: PeerId, + endpoint: ConnectedPoint, + cause: Box, }, /// An outbound connection attempt failed. @@ -268,8 +276,6 @@ where id: ConnectionId, /// The error that occurred. error: PendingOutboundConnectionError, - /// The handler that was supposed to handle the connection. - handler: THandler, /// The (expected) peer of the failed connection. peer: Option, }, @@ -284,8 +290,6 @@ where local_addr: Multiaddr, /// The error that occurred. error: PendingInboundConnectionError, - /// The handler that was supposed to handle the connection. - handler: THandler, }, /// A node has produced an event. @@ -309,7 +313,7 @@ where impl Pool where - THandler: IntoConnectionHandler, + THandler: ConnectionHandler, TTrans: Transport, { /// Creates a new empty `Pool`. @@ -427,7 +431,7 @@ where impl Pool where - THandler: IntoConnectionHandler, + THandler: ConnectionHandler, TTrans: Transport + 'static, TTrans::Output: Send + 'static, TTrans::Error: Send + 'static, @@ -452,17 +456,14 @@ where >, >, peer: Option, - handler: THandler, role_override: Endpoint, dial_concurrency_factor_override: Option, - ) -> Result + ) -> Result where TTrans: Send, TTrans::Dial: Send + 'static, { - if let Err(limit) = self.counters.check_max_pending_outgoing() { - return Err((limit, handler)); - }; + self.counters.check_max_pending_outgoing()?; let dial = ConcurrentDial::new( dials, @@ -490,7 +491,6 @@ where connection_id, PendingConnection { peer_id: peer, - handler, endpoint, abort_notifier: Some(abort_notifier), }, @@ -506,17 +506,14 @@ where pub fn add_incoming( &mut self, future: TFut, - handler: THandler, info: IncomingInfo<'_>, - ) -> Result + ) -> Result where TFut: Future> + Send + 'static, { let endpoint = info.create_connected_point(); - if let Err(limit) = self.counters.check_max_pending_incoming() { - return Err((limit, handler)); - } + self.counters.check_max_pending_incoming()?; let connection_id = self.next_connection_id(); @@ -537,7 +534,6 @@ where connection_id, PendingConnection { peer_id: None, - handler, endpoint: endpoint.into(), abort_notifier: Some(abort_notifier), }, @@ -546,12 +542,18 @@ where } /// Polls the connection pool for events. - pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll> + pub fn poll( + &mut self, + mut new_handler_fn: impl FnMut( + &PeerId, + &ConnectedPoint, + ) -> Result>, + cx: &mut Context<'_>, + ) -> Poll> where TTrans: Transport, - THandler: IntoConnectionHandler + 'static, - THandler::Handler: ConnectionHandler + Send, - ::OutboundOpenInfo: Send, + THandler: ConnectionHandler + 'static, + ::OutboundOpenInfo: Send, { // Poll for events of established connections. // @@ -631,7 +633,6 @@ where } => { let PendingConnection { peer_id: expected_peer_id, - handler, endpoint, abort_notifier: _, } = self @@ -733,7 +734,6 @@ where id, error: error .map(|t| vec![(endpoint.get_remote_address().clone(), t)]), - handler, peer: expected_peer_id.or(Some(obtained_peer_id)), }) } @@ -744,7 +744,6 @@ where return Poll::Ready(PoolEvent::PendingInboundConnectionError { id, error, - handler, send_back_addr, local_addr, }) @@ -767,9 +766,27 @@ where }, ); + let handler = match new_handler_fn(&obtained_peer_id, &endpoint) { + Ok(handler) => handler, + Err(cause) => { + return Poll::Ready(PoolEvent::ConnectionDenied { + id, + peer_id: obtained_peer_id, + endpoint, + cause, + }) + } + }; + let supported_protocols = handler + .listen_protocol() + .upgrade() + .protocol_info() + .map(|p| p.protocol_name().to_owned()) + .collect(); + let connection = Connection::new( muxer, - handler.into_handler(&obtained_peer_id, &endpoint), + handler, self.substream_upgrade_protocol_override, self.max_negotiating_inbound_streams, ); @@ -790,12 +807,12 @@ where id, other_established_connection_ids, concurrent_dial_errors, + supported_protocols, }); } task::PendingConnectionEvent::PendingFailed { id, error } => { if let Some(PendingConnection { peer_id, - handler, endpoint, abort_notifier: _, }) = self.pending.remove(&id) @@ -807,7 +824,6 @@ where return Poll::Ready(PoolEvent::PendingOutboundConnectionError { id, error, - handler, peer: peer_id, }); } @@ -821,7 +837,6 @@ where return Poll::Ready(PoolEvent::PendingInboundConnectionError { id, error, - handler, send_back_addr, local_addr, }); diff --git a/swarm/src/dummy.rs b/swarm/src/dummy.rs index 4ec58581c2e..880605452a3 100644 --- a/swarm/src/dummy.rs +++ b/swarm/src/dummy.rs @@ -1,12 +1,14 @@ -use crate::behaviour::{FromSwarm, NetworkBehaviour, NetworkBehaviourAction, PollParameters}; +use crate::behaviour::{ + FromSwarm, NetworkBehaviour, NetworkBehaviourAction, PollParameters, THandlerInEvent, +}; use crate::handler::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, }; use crate::{ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, SubstreamProtocol}; use libp2p_core::connection::ConnectionId; use libp2p_core::upgrade::DeniedUpgrade; -use libp2p_core::PeerId; use libp2p_core::UpgradeError; +use libp2p_core::{ConnectedPoint, PeerId}; use std::task::{Context, Poll}; use void::Void; @@ -17,8 +19,12 @@ impl NetworkBehaviour for Behaviour { type ConnectionHandler = ConnectionHandler; type OutEvent = Void; - fn new_handler(&mut self) -> Self::ConnectionHandler { - ConnectionHandler + fn new_handler( + &mut self, + _: &PeerId, + _: &ConnectedPoint, + ) -> Result> { + Ok(ConnectionHandler) } fn on_connection_handler_event(&mut self, _: PeerId, _: ConnectionId, event: Void) { @@ -29,7 +35,8 @@ impl NetworkBehaviour for Behaviour { &mut self, _: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll> { + ) -> Poll>> + { Poll::Pending } diff --git a/swarm/src/handler.rs b/swarm/src/handler.rs index 8d34509c085..df45f662525 100644 --- a/swarm/src/handler.rs +++ b/swarm/src/handler.rs @@ -49,14 +49,14 @@ mod select; pub use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend, SendWrapper, UpgradeInfoSend}; use instant::Instant; -use libp2p_core::{upgrade::UpgradeError, ConnectedPoint, Multiaddr, PeerId}; +use libp2p_core::{upgrade::UpgradeError, Multiaddr}; use std::{cmp::Ordering, error, fmt, task::Context, task::Poll, time::Duration}; pub use map_in::MapInEvent; pub use map_out::MapOutEvent; pub use one_shot::{OneShotHandler, OneShotHandlerConfig}; pub use pending::PendingConnectionHandler; -pub use select::{ConnectionHandlerSelect, IntoConnectionHandlerSelect}; +pub use select::ConnectionHandlerSelect; /// A handler for a set of protocols used on a connection with a remote. /// @@ -582,48 +582,6 @@ where } } -/// Prototype for a [`ConnectionHandler`]. -pub trait IntoConnectionHandler: Send + 'static { - /// The protocols handler. - type Handler: ConnectionHandler; - - /// Builds the protocols handler. - /// - /// The `PeerId` is the id of the node the handler is going to handle. - fn into_handler( - self, - remote_peer_id: &PeerId, - connected_point: &ConnectedPoint, - ) -> Self::Handler; - - /// Return the handler's inbound protocol. - fn inbound_protocol(&self) -> ::InboundProtocol; - - /// Builds an implementation of [`IntoConnectionHandler`] that handles both this protocol and the - /// other one together. - fn select(self, other: TProto2) -> IntoConnectionHandlerSelect - where - Self: Sized, - { - IntoConnectionHandlerSelect::new(self, other) - } -} - -impl IntoConnectionHandler for T -where - T: ConnectionHandler, -{ - type Handler = Self; - - fn into_handler(self, _: &PeerId, _: &ConnectedPoint) -> Self { - self - } - - fn inbound_protocol(&self) -> ::InboundProtocol { - self.listen_protocol().into_upgrade().0 - } -} - /// How long the connection should be kept alive. #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub enum KeepAlive { diff --git a/swarm/src/handler/either.rs b/swarm/src/handler/either.rs index e6d16ed1133..176de7202af 100644 --- a/swarm/src/handler/either.rs +++ b/swarm/src/handler/either.rs @@ -21,75 +21,14 @@ use crate::handler::{ AddressChange, ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, - IntoConnectionHandler, KeepAlive, ListenUpgradeError, SubstreamProtocol, + KeepAlive, ListenUpgradeError, SubstreamProtocol, }; use crate::upgrade::SendWrapper; use either::Either; use libp2p_core::either::{EitherError, EitherOutput}; use libp2p_core::upgrade::{EitherUpgrade, UpgradeError}; -use libp2p_core::{ConnectedPoint, PeerId}; use std::task::{Context, Poll}; -/// Auxiliary type to allow implementing [`IntoConnectionHandler`]. As [`IntoConnectionHandler`] is -/// already implemented for T, we cannot implement it for Either. -pub enum IntoEitherHandler { - Left(L), - Right(R), -} - -/// Implementation of a [`IntoConnectionHandler`] that represents either of two [`IntoConnectionHandler`] -/// implementations. -impl IntoConnectionHandler for IntoEitherHandler -where - L: IntoConnectionHandler, - R: IntoConnectionHandler, -{ - type Handler = Either; - - fn into_handler(self, p: &PeerId, c: &ConnectedPoint) -> Self::Handler { - match self { - IntoEitherHandler::Left(into_handler) => Either::Left(into_handler.into_handler(p, c)), - IntoEitherHandler::Right(into_handler) => { - Either::Right(into_handler.into_handler(p, c)) - } - } - } - - fn inbound_protocol(&self) -> ::InboundProtocol { - match self { - IntoEitherHandler::Left(into_handler) => { - EitherUpgrade::A(SendWrapper(into_handler.inbound_protocol())) - } - IntoEitherHandler::Right(into_handler) => { - EitherUpgrade::B(SendWrapper(into_handler.inbound_protocol())) - } - } - } -} - -// Taken from https://github.com/bluss/either. -impl IntoEitherHandler { - /// Returns the left value. - pub fn unwrap_left(self) -> L { - match self { - IntoEitherHandler::Left(l) => l, - IntoEitherHandler::Right(_) => { - panic!("called `IntoEitherHandler::unwrap_left()` on a `Right` value.",) - } - } - } - - /// Returns the right value. - pub fn unwrap_right(self) -> R { - match self { - IntoEitherHandler::Right(r) => r, - IntoEitherHandler::Left(_) => { - panic!("called `IntoEitherHandler::unwrap_right()` on a `Left` value.",) - } - } - } -} - /// Implementation of a [`ConnectionHandler`] that represents either of two [`ConnectionHandler`] /// implementations. impl ConnectionHandler for Either diff --git a/swarm/src/handler/multi.rs b/swarm/src/handler/multi.rs index c1f937c1cb3..2df8b9a6744 100644 --- a/swarm/src/handler/multi.rs +++ b/swarm/src/handler/multi.rs @@ -22,14 +22,14 @@ //! indexed by some key. use crate::handler::{ - ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, IntoConnectionHandler, - KeepAlive, SubstreamProtocol, + ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, + SubstreamProtocol, }; use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend, UpgradeInfoSend}; use crate::NegotiatedSubstream; use futures::{future::BoxFuture, prelude::*}; use libp2p_core::upgrade::{NegotiationError, ProtocolError, ProtocolName, UpgradeError}; -use libp2p_core::{ConnectedPoint, Multiaddr, PeerId}; +use libp2p_core::Multiaddr; use rand::Rng; use std::{ cmp, @@ -342,72 +342,6 @@ impl IntoIterator for MultiHandler { } } -/// A [`IntoConnectionHandler`] for multiple other `IntoConnectionHandler`s. -#[derive(Clone)] -pub struct IntoMultiHandler { - handlers: HashMap, -} - -impl fmt::Debug for IntoMultiHandler -where - K: fmt::Debug + Eq + Hash, - H: fmt::Debug, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("IntoMultiHandler") - .field("handlers", &self.handlers) - .finish() - } -} - -impl IntoMultiHandler -where - K: Hash + Eq, - H: IntoConnectionHandler, -{ - /// Create and populate an `IntoMultiHandler` from the given iterator. - /// - /// It is an error for any two protocols handlers to share the same protocol name. - pub fn try_from_iter(iter: I) -> Result - where - I: IntoIterator, - { - let m = IntoMultiHandler { - handlers: HashMap::from_iter(iter), - }; - uniq_proto_names(m.handlers.values().map(|h| h.inbound_protocol()))?; - Ok(m) - } -} - -impl IntoConnectionHandler for IntoMultiHandler -where - K: Debug + Clone + Eq + Hash + Send + 'static, - H: IntoConnectionHandler, -{ - type Handler = MultiHandler; - - fn into_handler(self, p: &PeerId, c: &ConnectedPoint) -> Self::Handler { - MultiHandler { - handlers: self - .handlers - .into_iter() - .map(|(k, h)| (k, h.into_handler(p, c))) - .collect(), - } - } - - fn inbound_protocol(&self) -> ::InboundProtocol { - Upgrade { - upgrades: self - .handlers - .iter() - .map(|(k, h)| (k.clone(), h.inbound_protocol())) - .collect(), - } - } -} - /// Index and protocol name pair used as `UpgradeInfo::Info`. #[derive(Debug, Clone)] pub struct IndexedProtoName(usize, H); diff --git a/swarm/src/handler/select.rs b/swarm/src/handler/select.rs index 65508c0b6a5..7cacbda5d67 100644 --- a/swarm/src/handler/select.rs +++ b/swarm/src/handler/select.rs @@ -20,64 +20,17 @@ use crate::handler::{ ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, - DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, IntoConnectionHandler, - KeepAlive, ListenUpgradeError, SubstreamProtocol, + DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive, + ListenUpgradeError, SubstreamProtocol, }; use crate::upgrade::SendWrapper; use libp2p_core::{ either::{EitherError, EitherOutput}, upgrade::{EitherUpgrade, NegotiationError, ProtocolError, SelectUpgrade, UpgradeError}, - ConnectedPoint, PeerId, }; use std::{cmp, task::Context, task::Poll}; -/// Implementation of `IntoConnectionHandler` that combines two protocols into one. -#[derive(Debug, Clone)] -pub struct IntoConnectionHandlerSelect { - /// The first protocol. - proto1: TProto1, - /// The second protocol. - proto2: TProto2, -} - -impl IntoConnectionHandlerSelect { - /// Builds a `IntoConnectionHandlerSelect`. - pub(crate) fn new(proto1: TProto1, proto2: TProto2) -> Self { - IntoConnectionHandlerSelect { proto1, proto2 } - } - - pub fn into_inner(self) -> (TProto1, TProto2) { - (self.proto1, self.proto2) - } -} - -impl IntoConnectionHandler for IntoConnectionHandlerSelect -where - TProto1: IntoConnectionHandler, - TProto2: IntoConnectionHandler, -{ - type Handler = ConnectionHandlerSelect; - - fn into_handler( - self, - remote_peer_id: &PeerId, - connected_point: &ConnectedPoint, - ) -> Self::Handler { - ConnectionHandlerSelect { - proto1: self.proto1.into_handler(remote_peer_id, connected_point), - proto2: self.proto2.into_handler(remote_peer_id, connected_point), - } - } - - fn inbound_protocol(&self) -> ::InboundProtocol { - SelectUpgrade::new( - SendWrapper(self.proto1.inbound_protocol()), - SendWrapper(self.proto2.inbound_protocol()), - ) - } -} - /// Implementation of [`ConnectionHandler`] that combines two protocols into one. #[derive(Debug, Clone)] pub struct ConnectionHandlerSelect { diff --git a/swarm/src/keep_alive.rs b/swarm/src/keep_alive.rs index bd1ed812b8b..a778a5f85db 100644 --- a/swarm/src/keep_alive.rs +++ b/swarm/src/keep_alive.rs @@ -1,11 +1,13 @@ -use crate::behaviour::{FromSwarm, NetworkBehaviour, NetworkBehaviourAction, PollParameters}; +use crate::behaviour::{ + FromSwarm, NetworkBehaviour, NetworkBehaviourAction, PollParameters, THandlerInEvent, +}; use crate::handler::{ ConnectionEvent, ConnectionHandlerEvent, FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive, SubstreamProtocol, }; use libp2p_core::connection::ConnectionId; use libp2p_core::upgrade::DeniedUpgrade; -use libp2p_core::PeerId; +use libp2p_core::{ConnectedPoint, PeerId}; use std::task::{Context, Poll}; use void::Void; @@ -22,8 +24,12 @@ impl NetworkBehaviour for Behaviour { type ConnectionHandler = ConnectionHandler; type OutEvent = Void; - fn new_handler(&mut self) -> Self::ConnectionHandler { - ConnectionHandler + fn new_handler( + &mut self, + _: &PeerId, + _: &ConnectedPoint, + ) -> Result> { + Ok(ConnectionHandler) } fn on_connection_handler_event(&mut self, _: PeerId, _: ConnectionId, event: Void) { @@ -34,7 +40,8 @@ impl NetworkBehaviour for Behaviour { &mut self, _: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll> { + ) -> Poll>> + { Poll::Pending } diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 0c5cadcf021..9cc217a90a8 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -85,9 +85,8 @@ pub mod derive_prelude { pub use crate::behaviour::NewListenAddr; pub use crate::behaviour::NewListener; pub use crate::ConnectionHandler; + pub use crate::ConnectionHandlerSelect; pub use crate::DialError; - pub use crate::IntoConnectionHandler; - pub use crate::IntoConnectionHandlerSelect; pub use crate::NetworkBehaviour; pub use crate::NetworkBehaviourAction; pub use crate::PollParameters; @@ -111,8 +110,7 @@ pub use connection::{ pub use executor::Executor; pub use handler::{ ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerSelect, ConnectionHandlerUpgrErr, - IntoConnectionHandler, IntoConnectionHandlerSelect, KeepAlive, OneShotHandler, - OneShotHandlerConfig, SubstreamProtocol, + KeepAlive, OneShotHandler, OneShotHandlerConfig, SubstreamProtocol, }; #[cfg(feature = "macros")] pub use libp2p_swarm_derive::NetworkBehaviour; @@ -131,7 +129,6 @@ use libp2p_core::{ multihash::Multihash, muxing::StreamMuxerBox, transport::{self, ListenerId, TransportError, TransportEvent}, - upgrade::ProtocolName, Endpoint, Multiaddr, Negotiated, PeerId, Transport, }; use registry::{AddressIntoIter, Addresses}; @@ -145,7 +142,6 @@ use std::{ pin::Pin, task::{Context, Poll}, }; -use upgrade::UpgradeInfoSend as _; /// Substream for which a protocol has been chosen. /// @@ -162,16 +158,13 @@ type THandler = ::ConnectionHandler; /// Custom event that can be received by the [`ConnectionHandler`] of the /// [`NetworkBehaviour`]. -type THandlerInEvent = - < as IntoConnectionHandler>::Handler as ConnectionHandler>::InEvent; +type THandlerInEvent = as ConnectionHandler>::InEvent; /// Custom event that can be produced by the [`ConnectionHandler`] of the [`NetworkBehaviour`]. -type THandlerOutEvent = - < as IntoConnectionHandler>::Handler as ConnectionHandler>::OutEvent; +type THandlerOutEvent = as ConnectionHandler>::OutEvent; /// Custom error that can be produced by the [`ConnectionHandler`] of the [`NetworkBehaviour`]. -type THandlerErr = - < as IntoConnectionHandler>::Handler as ConnectionHandler>::Error; +type THandlerErr = as ConnectionHandler>::Error; /// Event generated by the `Swarm`. #[derive(Debug)] @@ -205,6 +198,13 @@ pub enum SwarmEvent { /// active close. cause: Option>, }, + ConnectionDenied { + /// Identity of the peer that we have connected to. + peer_id: PeerId, + /// Endpoint of the connection that has been closed. + endpoint: ConnectedPoint, + cause: Box, + }, /// A new connection arrived on a listener and is in the process of protocol negotiation. /// /// A corresponding [`ConnectionEstablished`](SwarmEvent::ConnectionEstablished), @@ -511,15 +511,8 @@ where /// swarm.dial("/ip6/::1/tcp/12345".parse::().unwrap()); /// ``` pub fn dial(&mut self, opts: impl Into) -> Result<(), DialError> { - let handler = self.behaviour.new_handler(); - self.dial_with_handler(opts.into(), handler) - } + let swarm_dial_opts = opts.into(); - fn dial_with_handler( - &mut self, - swarm_dial_opts: DialOpts, - handler: ::ConnectionHandler, - ) -> Result<(), DialError> { let (peer_id, addresses, dial_concurrency_factor_override, role_override) = match swarm_dial_opts.0 { // Dial a known peer. @@ -546,7 +539,6 @@ where #[allow(deprecated)] self.behaviour.inject_dial_failure( Some(peer_id), - handler, &DialError::DialPeerConditionFalse(condition), ); @@ -557,8 +549,7 @@ where if self.banned_peers.contains(&peer_id) { let error = DialError::Banned; #[allow(deprecated)] - self.behaviour - .inject_dial_failure(Some(peer_id), handler, &error); + self.behaviour.inject_dial_failure(Some(peer_id), &error); return Err(error); } @@ -595,8 +586,7 @@ where if addresses.is_empty() { let error = DialError::NoAddresses; #[allow(deprecated)] - self.behaviour - .inject_dial_failure(Some(peer_id), handler, &error); + self.behaviour.inject_dial_failure(Some(peer_id), &error); return Err(error); }; @@ -671,15 +661,14 @@ where match self.pool.add_outgoing( dials, peer_id, - handler, role_override, dial_concurrency_factor_override, ) { Ok(_connection_id) => Ok(()), - Err((connection_limit, handler)) => { + Err(connection_limit) => { let error = DialError::ConnectionLimit(connection_limit); #[allow(deprecated)] - self.behaviour.inject_dial_failure(peer_id, handler, &error); + self.behaviour.inject_dial_failure(peer_id, &error); Err(error) } } @@ -821,6 +810,7 @@ where endpoint, other_established_connection_ids, concurrent_dial_errors, + supported_protocols, } => { if self.banned_peers.contains(&peer_id) { // Mark the connection for the banned peer as banned, thus withholding any @@ -856,6 +846,8 @@ where failed_addresses.as_ref(), non_banned_established, ); + self.supported_protocols = supported_protocols; + return Some(SwarmEvent::ConnectionEstablished { peer_id, num_established, @@ -864,16 +856,11 @@ where }); } } - PoolEvent::PendingOutboundConnectionError { - id: _, - error, - handler, - peer, - } => { + PoolEvent::PendingOutboundConnectionError { id: _, error, peer } => { let error = error.into(); #[allow(deprecated)] - self.behaviour.inject_dial_failure(peer, handler, &error); + self.behaviour.inject_dial_failure(peer, &error); if let Some(peer) = peer { log::debug!("Connection attempt to {:?} failed with {:?}.", peer, error,); @@ -891,12 +878,11 @@ where send_back_addr, local_addr, error, - handler, } => { log::debug!("Incoming connection failed: {:?}", error); #[allow(deprecated)] self.behaviour - .inject_listen_failure(&local_addr, &send_back_addr, handler); + .inject_listen_failure(&local_addr, &send_back_addr); return Some(SwarmEvent::IncomingConnectionError { local_addr, send_back_addr, @@ -951,6 +937,18 @@ where num_established, }); } + PoolEvent::ConnectionDenied { + peer_id, + endpoint, + cause, + .. + } => { + return Some(SwarmEvent::ConnectionDenied { + peer_id, + endpoint, + cause, + }) + } PoolEvent::ConnectionEvent { peer_id, id, event } => { if self.banned_peer_connections.contains(&id) { log::debug!("Ignoring event from banned peer: {} {:?}.", peer_id, id); @@ -994,10 +992,8 @@ where local_addr, send_back_addr, } => { - let handler = self.behaviour.new_handler(); match self.pool.add_incoming( upgrade, - handler, IncomingInfo { local_addr: &local_addr, send_back_addr: &send_back_addr, @@ -1009,10 +1005,10 @@ where send_back_addr, }); } - Err((connection_limit, handler)) => { + Err(connection_limit) => { #[allow(deprecated)] self.behaviour - .inject_listen_failure(&local_addr, &send_back_addr, handler); + .inject_listen_failure(&local_addr, &send_back_addr); log::warn!("Incoming connection rejected: {:?}", connection_limit); } }; @@ -1089,15 +1085,15 @@ where fn handle_behaviour_event( &mut self, - event: NetworkBehaviourAction, + event: NetworkBehaviourAction>, ) -> Option>> { match event { NetworkBehaviourAction::GenerateEvent(event) => { return Some(SwarmEvent::Behaviour(event)) } - NetworkBehaviourAction::Dial { opts, handler } => { + NetworkBehaviourAction::Dial { opts } => { let peer_id = opts.get_peer_id(); - if let Ok(()) = self.dial_with_handler(opts, handler) { + if let Ok(()) = self.dial(opts) { if let Some(peer_id) = peer_id { return Some(SwarmEvent::Dialing(peer_id)); } @@ -1243,7 +1239,10 @@ where } // Poll the known peers. - match this.pool.poll(cx) { + match this.pool.poll( + |peer, connected_point| this.behaviour.new_handler(peer, connected_point), + cx, + ) { Poll::Pending => {} Poll::Ready(pool_event) => { if let Some(swarm_event) = this.handle_pool_event(pool_event) { @@ -1326,8 +1325,7 @@ where TTrans: Transport, TTrans::Error: Send + 'static, TBehaviour: NetworkBehaviour, - THandler: IntoConnectionHandler, - THandler::Handler: ConnectionHandler< + THandler: ConnectionHandler< InEvent = THandlerInEvent, OutEvent = THandlerOutEvent, >, @@ -1633,22 +1631,13 @@ where } /// Builds a `Swarm` with the current configuration. - pub fn build(mut self) -> Swarm { - let supported_protocols = self - .behaviour - .new_handler() - .inbound_protocol() - .protocol_info() - .into_iter() - .map(|info| info.protocol_name().to_vec()) - .collect(); - + pub fn build(self) -> Swarm { Swarm { local_peer_id: self.local_peer_id, transport: self.transport, pool: Pool::new(self.local_peer_id, self.pool_config, self.connection_limits), behaviour: self.behaviour, - supported_protocols, + supported_protocols: Default::default(), listened_addrs: HashMap::new(), external_addrs: Addresses::default(), banned_peers: HashSet::new(), @@ -1885,7 +1874,7 @@ mod tests { ) -> bool where TBehaviour: NetworkBehaviour, - <::Handler as ConnectionHandler>::OutEvent: Clone, + ::OutEvent: Clone, { swarm1 .behaviour() @@ -1905,7 +1894,7 @@ mod tests { ) -> bool where TBehaviour: NetworkBehaviour, - <::Handler as ConnectionHandler>::OutEvent: Clone + ::OutEvent: Clone, { swarm1 .behaviour() diff --git a/swarm/src/test.rs b/swarm/src/test.rs index 94a5fbfef54..1d03eec55bb 100644 --- a/swarm/src/test.rs +++ b/swarm/src/test.rs @@ -23,7 +23,7 @@ use crate::behaviour::{ FromSwarm, ListenerClosed, ListenerError, NewExternalAddr, NewListenAddr, NewListener, }; use crate::{ - ConnectionHandler, IntoConnectionHandler, NetworkBehaviour, NetworkBehaviourAction, + behaviour::THandlerInEvent, ConnectionHandler, NetworkBehaviour, NetworkBehaviourAction, PollParameters, }; use libp2p_core::{ @@ -47,7 +47,7 @@ where /// The next action to return from `poll`. /// /// An action is only returned once. - pub next_action: Option>, + pub next_action: Option>>, } impl MockBehaviour @@ -72,8 +72,12 @@ where type ConnectionHandler = THandler; type OutEvent = TOutEvent; - fn new_handler(&mut self) -> Self::ConnectionHandler { - self.handler_proto.clone() + fn new_handler( + &mut self, + _: &PeerId, + _: &ConnectedPoint, + ) -> Result> { + Ok(self.handler_proto.clone()) } fn addresses_of_peer(&mut self, p: &PeerId) -> Vec { @@ -84,7 +88,8 @@ where &mut self, _: &mut Context, _: &mut impl PollParameters, - ) -> Poll> { + ) -> Poll>> + { self.next_action.take().map_or(Poll::Pending, Poll::Ready) } @@ -109,8 +114,7 @@ where &mut self, _peer_id: PeerId, _connection_id: ConnectionId, - _event: <::Handler as - ConnectionHandler>::OutEvent, + _event: ::OutEvent, ) { } } @@ -130,7 +134,7 @@ where pub on_event: Vec<( PeerId, ConnectionId, - <::Handler as ConnectionHandler>::OutEvent, + ::OutEvent, )>, pub on_dial_failure: Vec>, pub on_new_listener: Vec, @@ -146,8 +150,7 @@ where impl CallTraceBehaviour where TInner: NetworkBehaviour, - <::Handler as ConnectionHandler>::OutEvent: - Clone, + ::OutEvent: Clone, { pub fn new(inner: TInner) -> Self { Self { @@ -366,14 +369,17 @@ where impl NetworkBehaviour for CallTraceBehaviour where TInner: NetworkBehaviour, - <::Handler as ConnectionHandler>::OutEvent: - Clone, + ::OutEvent: Clone, { type ConnectionHandler = TInner::ConnectionHandler; type OutEvent = TInner::OutEvent; - fn new_handler(&mut self) -> Self::ConnectionHandler { - self.inner.new_handler() + fn new_handler( + &mut self, + peer: &PeerId, + endpoint: &ConnectedPoint, + ) -> Result> { + self.inner.new_handler(peer, endpoint) } fn addresses_of_peer(&mut self, p: &PeerId) -> Vec { @@ -389,14 +395,10 @@ where FromSwarm::ConnectionClosed(connection_closed) => { self.on_connection_closed(connection_closed) } - FromSwarm::DialFailure(DialFailure { - peer_id, - handler, - error, - }) => { + FromSwarm::DialFailure(DialFailure { peer_id, error }) => { self.on_dial_failure.push(peer_id); #[allow(deprecated)] - self.inner.inject_dial_failure(peer_id, handler, error); + self.inner.inject_dial_failure(peer_id, error); } FromSwarm::NewListener(NewListener { listener_id }) => { self.on_new_listener.push(listener_id); @@ -445,7 +447,7 @@ where &mut self, p: PeerId, c: ConnectionId, - e: <::Handler as ConnectionHandler>::OutEvent, + e: ::OutEvent, ) { assert!( self.on_connection_established @@ -470,7 +472,8 @@ where &mut self, cx: &mut Context, args: &mut impl PollParameters, - ) -> Poll> { + ) -> Poll>> + { self.poll += 1; self.inner.poll(cx, args) } diff --git a/swarm/tests/swarm_derive.rs b/swarm/tests/swarm_derive.rs index 84fb3bf4683..ce8417e19f7 100644 --- a/swarm/tests/swarm_derive.rs +++ b/swarm/tests/swarm_derive.rs @@ -371,10 +371,10 @@ fn generated_out_event_derive_debug() { #[test] fn custom_out_event_no_type_parameters() { use libp2p_core::connection::ConnectionId; + use libp2p_core::ConnectedPoint; use libp2p_core::PeerId; - use libp2p_swarm::{ - ConnectionHandler, IntoConnectionHandler, NetworkBehaviourAction, PollParameters, - }; + use libp2p_swarm::behaviour::THandlerInEvent; + use libp2p_swarm::{ConnectionHandler, NetworkBehaviourAction, PollParameters}; use std::task::Context; use std::task::Poll; @@ -386,15 +386,19 @@ fn custom_out_event_no_type_parameters() { type ConnectionHandler = dummy::ConnectionHandler; type OutEvent = void::Void; - fn new_handler(&mut self) -> Self::ConnectionHandler { - dummy::ConnectionHandler + fn new_handler( + &mut self, + _: &PeerId, + _: &ConnectedPoint, + ) -> Result> { + Ok(dummy::ConnectionHandler) } fn on_connection_handler_event( &mut self, _peer: PeerId, _connection: ConnectionId, - message: <::Handler as ConnectionHandler>::OutEvent, + message: ::OutEvent, ) { void::unreachable(message); } @@ -403,7 +407,8 @@ fn custom_out_event_no_type_parameters() { &mut self, _ctx: &mut Context, _: &mut impl PollParameters, - ) -> Poll> { + ) -> Poll>> + { Poll::Pending }