From 96dbfcd1ade6de5b4ae623f59fb0d67b2916576a Mon Sep 17 00:00:00 2001 From: Max Inden Date: Mon, 17 Jan 2022 16:35:14 +0100 Subject: [PATCH] core/src/transport: Add `Transport::dial_as_listener` (#2363) Allows `NetworkBehaviour` implementations to dial a peer, but instruct the dialed connection to be upgraded as if it were the listening endpoint. This is needed when establishing direct connections through NATs and/or Firewalls (hole punching). When hole punching via TCP (QUIC is different but similar) both ends dial the other at the same time resulting in a simultaneously opened TCP connection. To disambiguate who is the dialer and who the listener there are two options: 1. Use the Simultaneous Open Extension of Multistream Select. See [sim-open] specification and [sim-open-rust] Rust implementation. 2. Disambiguate the role (dialer or listener) based on the role within the DCUtR [dcutr] protocol. More specifically the node initiating the DCUtR process will act as a listener and the other as a dialer. This commit enables (2), i.e. enables the DCUtR protocol to specify the role used once the connection is established. While on the positive side (2) requires one round trip less than (1), on the negative side (2) only works for coordinated simultaneous dials. I.e. when a simultaneous dial happens by chance, and not coordinated via DCUtR, the connection attempt fails when only (2) is in place. [sim-open]: https://github.com/libp2p/specs/blob/master/connections/simopen.md [sim-open-rust]: https://github.com/libp2p/rust-libp2p/pull/2066 [dcutr]: https://github.com/libp2p/specs/blob/master/relay/DCUtR.md --- core/CHANGELOG.md | 12 +++++ core/src/connection.rs | 37 +++++++++++-- core/src/connection/listeners.rs | 14 +++++ core/src/connection/pool.rs | 34 +++++++----- core/src/connection/pool/concurrent_dial.rs | 22 +++++--- core/src/either.rs | 19 +++++++ core/src/network.rs | 53 ++++++++++++++++--- core/src/transport.rs | 11 +++- core/src/transport/and_then.rs | 29 ++++++++++- core/src/transport/boxed.rs | 12 +++++ core/src/transport/choice.rs | 20 ++++++++ core/src/transport/dummy.rs | 4 ++ core/src/transport/map.rs | 19 ++++++- core/src/transport/map_err.rs | 11 ++++ core/src/transport/memory.rs | 4 ++ core/src/transport/optional.rs | 8 +++ core/src/transport/timeout.rs | 11 ++++ core/src/transport/upgrade.rs | 18 ++++++- core/src/upgrade/apply.rs | 12 +++-- core/tests/concurrent_dialing.rs | 2 +- protocols/autonat/src/behaviour.rs | 15 +++++- protocols/autonat/tests/test_server.rs | 14 +++-- protocols/gossipsub/src/behaviour/tests.rs | 10 +++- protocols/identify/src/identify.rs | 2 +- protocols/kad/src/behaviour.rs | 2 +- protocols/kad/src/behaviour/test.rs | 5 +- protocols/ping/tests/ping.rs | 2 +- protocols/relay/src/v1/transport.rs | 43 ++++++++++++---- protocols/relay/src/v2/client/transport.rs | 11 ++++ protocols/relay/tests/v1.rs | 4 ++ protocols/rendezvous/tests/harness.rs | 4 +- protocols/rendezvous/tests/rendezvous.rs | 2 +- protocols/request-response/src/lib.rs | 4 +- src/bandwidth.rs | 7 +++ swarm/CHANGELOG.md | 7 ++- swarm/src/dial_opts.rs | 43 +++++++++++++++- swarm/src/lib.rs | 8 ++- transports/dns/src/lib.rs | 43 ++++++++++++++-- transports/tcp/src/lib.rs | 4 ++ transports/uds/src/lib.rs | 5 ++ transports/wasm-ext/src/lib.rs | 57 ++++++++++++++++----- transports/websocket/src/framed.rs | 47 ++++++++++++----- transports/websocket/src/lib.rs | 9 +++- 43 files changed, 594 insertions(+), 106 deletions(-) diff --git a/core/CHANGELOG.md b/core/CHANGELOG.md index b7db3b4f919..3f61ba3fe46 100644 --- a/core/CHANGELOG.md +++ b/core/CHANGELOG.md @@ -22,12 +22,24 @@ - Implement `Serialize` and `Deserialize` for `PeerId` (see [PR 2408]) +- Allow overriding role when dialing. This option is needed for NAT and firewall + hole punching. + + - Add `Transport::dial_as_listener`. As `Transport::dial` but + overrides the role of the local node on the connection . I.e. has the + local node act as a listener on the outgoing connection. + + - Add `override_role` option to `DialOpts`. + + See [PR 2363]. + [PR 2339]: https://github.com/libp2p/rust-libp2p/pull/2339 [PR 2350]: https://github.com/libp2p/rust-libp2p/pull/2350 [PR 2352]: https://github.com/libp2p/rust-libp2p/pull/2352 [PR 2392]: https://github.com/libp2p/rust-libp2p/pull/2392 [PR 2404]: https://github.com/libp2p/rust-libp2p/pull/2404 [PR 2408]: https://github.com/libp2p/rust-libp2p/pull/2408 +[PR 2363]: https://github.com/libp2p/rust-libp2p/pull/2363 # 0.30.1 [2021-11-16] diff --git a/core/src/connection.rs b/core/src/connection.rs index d6249f19e8e..68a53d9e7e5 100644 --- a/core/src/connection.rs +++ b/core/src/connection.rs @@ -97,7 +97,10 @@ pub enum PendingPoint { /// There is no single address associated with the Dialer of a pending /// connection. Addresses are dialed in parallel. Only once the first dial /// is successful is the address of the connection known. - Dialer, + Dialer { + /// Same as [`ConnectedPoint::Dialer`] `role_override`. + role_override: Endpoint, + }, /// The socket comes from a listener. Listener { /// Local connection address. @@ -110,7 +113,7 @@ pub enum PendingPoint { impl From for PendingPoint { fn from(endpoint: ConnectedPoint) -> Self { match endpoint { - ConnectedPoint::Dialer { .. } => PendingPoint::Dialer, + ConnectedPoint::Dialer { role_override, .. } => PendingPoint::Dialer { role_override }, ConnectedPoint::Listener { local_addr, send_back_addr, @@ -129,6 +132,27 @@ pub enum ConnectedPoint { Dialer { /// Multiaddress that was successfully dialed. address: Multiaddr, + /// Whether the role of the local node on the connection should be + /// overriden. I.e. whether the local node should act as a listener on + /// the outgoing connection. + /// + /// This option is needed for NAT and firewall hole punching. + /// + /// - [`Endpoint::Dialer`] represents the default non-overriding option. + /// + /// - [`Endpoint::Listener`] represents the overriding option. + /// Realization depends on the transport protocol. E.g. in the case of + /// TCP, both endpoints dial each other, resulting in a _simultaneous + /// open_ TCP connection. On this new connection both endpoints assume + /// to be the dialer of the connection. This is problematic during the + /// connection upgrade process where an upgrade assumes one side to be + /// the listener. With the help of this option, both peers can + /// negotiate the roles (dialer and listener) for the new connection + /// ahead of time, through some external channel, e.g. the DCUtR + /// protocol, and thus have one peer dial the other and upgrade the + /// connection as a dialer and one peer dial the other and upgrade the + /// connection _as a listener_ overriding its role. + role_override: Endpoint, }, /// We received the node. Listener { @@ -179,7 +203,10 @@ impl ConnectedPoint { /// Returns true if the connection is relayed. pub fn is_relayed(&self) -> bool { match self { - ConnectedPoint::Dialer { address } => address, + ConnectedPoint::Dialer { + address, + role_override: _, + } => address, ConnectedPoint::Listener { local_addr, .. } => local_addr, } .iter() @@ -194,7 +221,7 @@ impl ConnectedPoint { /// not be usable to establish new connections. pub fn get_remote_address(&self) -> &Multiaddr { match self { - ConnectedPoint::Dialer { address } => address, + ConnectedPoint::Dialer { address, .. } => address, ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr, } } @@ -204,7 +231,7 @@ impl ConnectedPoint { /// For `Dialer`, this modifies `address`. For `Listener`, this modifies `send_back_addr`. pub fn set_remote_address(&mut self, new_address: Multiaddr) { match self { - ConnectedPoint::Dialer { address } => *address = new_address, + ConnectedPoint::Dialer { address, .. } => *address = new_address, ConnectedPoint::Listener { send_back_addr, .. } => *send_back_addr = new_address, } } diff --git a/core/src/connection/listeners.rs b/core/src/connection/listeners.rs index 4c394aeb75d..55a371b7f4c 100644 --- a/core/src/connection/listeners.rs +++ b/core/src/connection/listeners.rs @@ -491,6 +491,13 @@ mod tests { panic!() } + fn dial_as_listener( + self, + _: Multiaddr, + ) -> Result> { + panic!() + } + fn address_translation(&self, _: &Multiaddr, _: &Multiaddr) -> Option { None } @@ -542,6 +549,13 @@ mod tests { panic!() } + fn dial_as_listener( + self, + _: Multiaddr, + ) -> Result> { + panic!() + } + fn address_translation(&self, _: &Multiaddr, _: &Multiaddr) -> Option { None } diff --git a/core/src/connection/pool.rs b/core/src/connection/pool.rs index 85ac1b8a7ca..dc8a8ccf5e6 100644 --- a/core/src/connection/pool.rs +++ b/core/src/connection/pool.rs @@ -22,8 +22,8 @@ use crate::{ connection::{ handler::{THandlerError, THandlerInEvent, THandlerOutEvent}, - Connected, ConnectionError, ConnectionHandler, ConnectionId, ConnectionLimit, IncomingInfo, - IntoConnectionHandler, PendingConnectionError, PendingInboundConnectionError, + Connected, ConnectionError, ConnectionHandler, ConnectionId, ConnectionLimit, Endpoint, + IncomingInfo, IntoConnectionHandler, PendingConnectionError, PendingInboundConnectionError, PendingOutboundConnectionError, PendingPoint, Substream, }, muxing::StreamMuxer, @@ -460,7 +460,7 @@ where local_addr, send_back_addr, }), - PendingPoint::Dialer => None, + PendingPoint::Dialer { .. } => None, }) } @@ -535,6 +535,7 @@ where addresses: impl Iterator + Send + 'static, peer: Option, handler: THandler, + role_override: Endpoint, dial_concurrency_factor_override: Option, ) -> Result> where @@ -550,6 +551,7 @@ where peer, addresses, dial_concurrency_factor_override.unwrap_or(self.dial_concurrency_factor), + role_override, ); let connection_id = self.next_connection_id(); @@ -566,13 +568,15 @@ where .boxed(), ); - self.counters.inc_pending(&PendingPoint::Dialer); + let endpoint = PendingPoint::Dialer { role_override }; + + self.counters.inc_pending(&endpoint); self.pending.insert( connection_id, PendingConnectionInfo { peer_id: peer, handler, - endpoint: PendingPoint::Dialer, + endpoint: endpoint, _drop_notifier: drop_notifier, }, ); @@ -745,9 +749,13 @@ where self.counters.dec_pending(&endpoint); let (endpoint, concurrent_dial_errors) = match (endpoint, outgoing) { - (PendingPoint::Dialer, Some((address, errors))) => { - (ConnectedPoint::Dialer { address }, Some(errors)) - } + (PendingPoint::Dialer { role_override }, Some((address, errors))) => ( + ConnectedPoint::Dialer { + address, + role_override, + }, + Some(errors), + ), ( PendingPoint::Listener { local_addr, @@ -761,7 +769,7 @@ where }, None, ), - (PendingPoint::Dialer, None) => unreachable!( + (PendingPoint::Dialer { .. }, None) => unreachable!( "Established incoming connection via pending outgoing connection." ), (PendingPoint::Listener { .. }, Some(_)) => unreachable!( @@ -910,7 +918,7 @@ where self.counters.dec_pending(&endpoint); match (endpoint, error) { - (PendingPoint::Dialer, Either::Left(error)) => { + (PendingPoint::Dialer { .. }, Either::Left(error)) => { return Poll::Ready(PoolEvent::PendingOutboundConnectionError { id, error, @@ -933,7 +941,7 @@ where local_addr, }); } - (PendingPoint::Dialer, Either::Right(_)) => { + (PendingPoint::Dialer { .. }, Either::Right(_)) => { unreachable!("Inbound error for outbound connection.") } (PendingPoint::Listener { .. }, Either::Left(_)) => { @@ -1176,7 +1184,7 @@ impl ConnectionCounters { fn inc_pending(&mut self, endpoint: &PendingPoint) { match endpoint { - PendingPoint::Dialer => { + PendingPoint::Dialer { .. } => { self.pending_outgoing += 1; } PendingPoint::Listener { .. } => { @@ -1191,7 +1199,7 @@ impl ConnectionCounters { fn dec_pending(&mut self, endpoint: &PendingPoint) { match endpoint { - PendingPoint::Dialer => { + PendingPoint::Dialer { .. } => { self.pending_outgoing -= 1; } PendingPoint::Listener { .. } => { diff --git a/core/src/connection/pool/concurrent_dial.rs b/core/src/connection/pool/concurrent_dial.rs index 4e960e09188..6c10ca1b19d 100644 --- a/core/src/connection/pool/concurrent_dial.rs +++ b/core/src/connection/pool/concurrent_dial.rs @@ -18,9 +18,8 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -pub use crate::connection::{ConnectionCounters, ConnectionLimits}; - use crate::{ + connection::Endpoint, transport::{Transport, TransportError}, Multiaddr, PeerId, }; @@ -63,14 +62,21 @@ where peer: Option, addresses: impl Iterator + Send + 'static, concurrency_factor: NonZeroU8, + role_override: Endpoint, ) -> Self { let mut pending_dials = addresses.map(move |address| match p2p_addr(peer, address) { - Ok(address) => match transport.clone().dial(address.clone()) { - Ok(fut) => fut - .map(|r| (address, r.map_err(|e| TransportError::Other(e)))) - .boxed(), - Err(err) => futures::future::ready((address, Err(err))).boxed(), - }, + Ok(address) => { + let dial = match role_override { + Endpoint::Dialer => transport.clone().dial(address.clone()), + Endpoint::Listener => transport.clone().dial_as_listener(address.clone()), + }; + match dial { + Ok(fut) => fut + .map(|r| (address, r.map_err(|e| TransportError::Other(e)))) + .boxed(), + Err(err) => futures::future::ready((address, Err(err))).boxed(), + } + } Err(address) => futures::future::ready(( address.clone(), Err(TransportError::MultiaddrNotSupported(address)), diff --git a/core/src/either.rs b/core/src/either.rs index 66a11589f7a..46f8ba833cc 100644 --- a/core/src/either.rs +++ b/core/src/either.rs @@ -529,6 +529,25 @@ where } } + fn dial_as_listener(self, addr: Multiaddr) -> Result> + where + Self: Sized, + { + use TransportError::*; + match self { + EitherTransport::Left(a) => match a.dial_as_listener(addr) { + Ok(connec) => Ok(EitherFuture::First(connec)), + Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)), + Err(Other(err)) => Err(Other(EitherError::A(err))), + }, + EitherTransport::Right(b) => match b.dial_as_listener(addr) { + Ok(connec) => Ok(EitherFuture::Second(connec)), + Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)), + Err(Other(err)) => Err(Other(EitherError::B(err))), + }, + } + } + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { match self { EitherTransport::Left(a) => a.address_translation(server, observed), diff --git a/core/src/network.rs b/core/src/network.rs index 63a07573c7f..f96bee76f7e 100644 --- a/core/src/network.rs +++ b/core/src/network.rs @@ -21,7 +21,7 @@ mod event; pub mod peer; -pub use crate::connection::{ConnectionCounters, ConnectionLimits}; +pub use crate::connection::{ConnectionCounters, ConnectionLimits, Endpoint}; pub use event::{IncomingConnection, NetworkEvent}; pub use peer::Peer; @@ -97,7 +97,7 @@ where self.pool .iter_pending_info() .filter(move |(_, endpoint, peer_id)| { - matches!(endpoint, PendingPoint::Dialer) && peer_id.as_ref() == Some(&peer) + matches!(endpoint, PendingPoint::Dialer { .. }) && peer_id.as_ref() == Some(&peer) }) .map(|(connection_id, _, _)| connection_id) } @@ -206,19 +206,24 @@ where { let opts = opts.into(); - let (peer_id, addresses, dial_concurrency_factor_override) = match opts.0 { + let (peer_id, addresses, dial_concurrency_factor_override, role_override) = match opts.0 { // Dial a known peer. Opts::WithPeerIdWithAddresses(WithPeerIdWithAddresses { peer_id, addresses, dial_concurrency_factor_override, + role_override, }) => ( Some(peer_id), Either::Left(addresses.into_iter()), dial_concurrency_factor_override, + role_override, ), // Dial an unknown peer. - Opts::WithoutPeerIdWithAddress(WithoutPeerIdWithAddress { address }) => { + Opts::WithoutPeerIdWithAddress(WithoutPeerIdWithAddress { + address, + role_override, + }) => { // If the address ultimately encapsulates an expected peer ID, dial that peer // such that any mismatch is detected. We do not "pop off" the `P2p` protocol // from the address, because it may be used by the `Transport`, i.e. `P2p` @@ -239,7 +244,12 @@ where Err(_) => return Err(DialError::InvalidPeerId { handler }), }; - (peer_id, Either::Right(std::iter::once(address)), None) + ( + peer_id, + Either::Right(std::iter::once(address)), + None, + role_override, + ) } }; @@ -248,6 +258,7 @@ where addresses, peer_id, handler, + role_override, dial_concurrency_factor_override, ) } @@ -284,7 +295,7 @@ where pub fn dialing_peers(&self) -> impl Iterator { self.pool .iter_pending_info() - .filter(|(_, endpoint, _)| matches!(endpoint, PendingPoint::Dialer)) + .filter(|(_, endpoint, _)| matches!(endpoint, PendingPoint::Dialer { .. })) .filter_map(|(_, _, peer)| peer.as_ref()) } @@ -627,6 +638,7 @@ impl WithPeerId { peer_id: self.peer_id, addresses, dial_concurrency_factor_override: Default::default(), + role_override: Endpoint::Dialer, } } } @@ -636,6 +648,7 @@ pub struct WithPeerIdWithAddresses { pub(crate) peer_id: PeerId, pub(crate) addresses: Vec, pub(crate) dial_concurrency_factor_override: Option, + pub(crate) role_override: Endpoint, } impl WithPeerIdWithAddresses { @@ -645,6 +658,17 @@ impl WithPeerIdWithAddresses { self } + /// Override role of local node on connection. I.e. execute the dial _as a + /// listener_. + /// + /// See + /// [`ConnectedPoint::Dialer`](crate::connection::ConnectedPoint::Dialer) + /// for details. + pub fn override_role(mut self, role: Endpoint) -> Self { + self.role_override = role; + self + } + /// Build the final [`DialOpts`]. pub fn build(self) -> DialOpts { DialOpts(Opts::WithPeerIdWithAddresses(self)) @@ -657,16 +681,31 @@ pub struct WithoutPeerId {} impl WithoutPeerId { /// Specify a single address to dial the unknown peer. pub fn address(self, address: Multiaddr) -> WithoutPeerIdWithAddress { - WithoutPeerIdWithAddress { address } + WithoutPeerIdWithAddress { + address, + role_override: Endpoint::Dialer, + } } } #[derive(Debug, Clone, PartialEq)] pub struct WithoutPeerIdWithAddress { pub(crate) address: Multiaddr, + pub(crate) role_override: Endpoint, } impl WithoutPeerIdWithAddress { + /// Override role of local node on connection. I.e. execute the dial _as a + /// listener_. + /// + /// See + /// [`ConnectedPoint::Dialer`](crate::connection::ConnectedPoint::Dialer) + /// for details. + pub fn override_role(mut self, role: Endpoint) -> Self { + self.role_override = role; + self + } + /// Build the final [`DialOpts`]. pub fn build(self) -> DialOpts { DialOpts(Opts::WithoutPeerIdWithAddress(self)) diff --git a/core/src/transport.rs b/core/src/transport.rs index 7006c15a810..664d55b1d60 100644 --- a/core/src/transport.rs +++ b/core/src/transport.rs @@ -25,7 +25,7 @@ //! any desired protocols. The rest of the module defines combinators for //! modifying a transport through composition with other transports or protocol upgrades. -use crate::ConnectedPoint; +use crate::connection::ConnectedPoint; use futures::prelude::*; use multiaddr::Multiaddr; use std::{error::Error, fmt}; @@ -130,6 +130,15 @@ pub trait Transport { where Self: Sized; + /// As [`Transport::dial`] but has the local node act as a listener on the outgoing connection. + /// + /// This option is needed for NAT and firewall hole punching. + /// + /// See [`ConnectedPoint::Dialer`](crate::connection::ConnectedPoint::Dialer) for related option. + fn dial_as_listener(self, addr: Multiaddr) -> Result> + where + Self: Sized; + /// Performs a transport-specific mapping of an address `observed` by /// a remote onto a local `listen` address to yield an address for /// the local node that may be reachable for other peers. diff --git a/core/src/transport/and_then.rs b/core/src/transport/and_then.rs index 3ff70fcae0c..219887fa6df 100644 --- a/core/src/transport/and_then.rs +++ b/core/src/transport/and_then.rs @@ -19,9 +19,9 @@ // DEALINGS IN THE SOFTWARE. use crate::{ + connection::{ConnectedPoint, Endpoint}, either::EitherError, transport::{ListenerEvent, Transport, TransportError}, - ConnectedPoint, }; use futures::{future::Either, prelude::*}; use multiaddr::Multiaddr; @@ -76,7 +76,32 @@ where .map_err(|err| err.map(EitherError::A))?; let future = AndThenFuture { inner: Either::Left(Box::pin(dialed_fut)), - args: Some((self.fun, ConnectedPoint::Dialer { address: addr })), + args: Some(( + self.fun, + ConnectedPoint::Dialer { + address: addr, + role_override: Endpoint::Dialer, + }, + )), + _marker: PhantomPinned, + }; + Ok(future) + } + + fn dial_as_listener(self, addr: Multiaddr) -> Result> { + let dialed_fut = self + .transport + .dial_as_listener(addr.clone()) + .map_err(|err| err.map(EitherError::A))?; + let future = AndThenFuture { + inner: Either::Left(Box::pin(dialed_fut)), + args: Some(( + self.fun, + ConnectedPoint::Dialer { + address: addr, + role_override: Endpoint::Listener, + }, + )), _marker: PhantomPinned, }; Ok(future) diff --git a/core/src/transport/boxed.rs b/core/src/transport/boxed.rs index 001a0c9fdf3..0f47470fcad 100644 --- a/core/src/transport/boxed.rs +++ b/core/src/transport/boxed.rs @@ -52,6 +52,7 @@ type ListenerUpgrade = Pin> + Send>>; trait Abstract { fn listen_on(&self, addr: Multiaddr) -> Result, TransportError>; fn dial(&self, addr: Multiaddr) -> Result, TransportError>; + fn dial_as_listener(&self, addr: Multiaddr) -> Result, TransportError>; fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option; } @@ -85,6 +86,13 @@ where Ok(Box::pin(fut) as Dial<_>) } + fn dial_as_listener(&self, addr: Multiaddr) -> Result, TransportError> { + let fut = Transport::dial_as_listener(self.clone(), addr) + .map(|r| r.map_err(box_err)) + .map_err(|e| e.map(box_err))?; + Ok(Box::pin(fut) as Dial<_>) + } + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { Transport::address_translation(self, server, observed) } @@ -119,6 +127,10 @@ impl Transport for Boxed { self.inner.dial(addr) } + fn dial_as_listener(self, addr: Multiaddr) -> Result> { + self.inner.dial_as_listener(addr) + } + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { self.inner.address_translation(server, observed) } diff --git a/core/src/transport/choice.rs b/core/src/transport/choice.rs index e9545617f09..3f2d87064cb 100644 --- a/core/src/transport/choice.rs +++ b/core/src/transport/choice.rs @@ -83,6 +83,26 @@ where Err(TransportError::MultiaddrNotSupported(addr)) } + fn dial_as_listener(self, addr: Multiaddr) -> Result> { + let addr = match self.0.dial_as_listener(addr) { + Ok(connec) => return Ok(EitherFuture::First(connec)), + Err(TransportError::MultiaddrNotSupported(addr)) => addr, + Err(TransportError::Other(err)) => { + return Err(TransportError::Other(EitherError::A(err))) + } + }; + + let addr = match self.1.dial_as_listener(addr) { + Ok(connec) => return Ok(EitherFuture::Second(connec)), + Err(TransportError::MultiaddrNotSupported(addr)) => addr, + Err(TransportError::Other(err)) => { + return Err(TransportError::Other(EitherError::B(err))) + } + }; + + Err(TransportError::MultiaddrNotSupported(addr)) + } + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { if let Some(addr) = self.0.address_translation(server, observed) { Some(addr) diff --git a/core/src/transport/dummy.rs b/core/src/transport/dummy.rs index a4eaa14901d..ead00ff9609 100644 --- a/core/src/transport/dummy.rs +++ b/core/src/transport/dummy.rs @@ -70,6 +70,10 @@ impl Transport for DummyTransport { Err(TransportError::MultiaddrNotSupported(addr)) } + fn dial_as_listener(self, addr: Multiaddr) -> Result> { + Err(TransportError::MultiaddrNotSupported(addr)) + } + fn address_translation(&self, _server: &Multiaddr, _observed: &Multiaddr) -> Option { None } diff --git a/core/src/transport/map.rs b/core/src/transport/map.rs index 4493507c1d9..ec5d8c4a1be 100644 --- a/core/src/transport/map.rs +++ b/core/src/transport/map.rs @@ -19,8 +19,8 @@ // DEALINGS IN THE SOFTWARE. use crate::{ + connection::{ConnectedPoint, Endpoint}, transport::{ListenerEvent, Transport, TransportError}, - ConnectedPoint, }; use futures::prelude::*; use multiaddr::Multiaddr; @@ -60,7 +60,22 @@ where fn dial(self, addr: Multiaddr) -> Result> { let future = self.transport.dial(addr.clone())?; - let p = ConnectedPoint::Dialer { address: addr }; + let p = ConnectedPoint::Dialer { + address: addr, + role_override: Endpoint::Dialer, + }; + Ok(MapFuture { + inner: future, + args: Some((self.fun, p)), + }) + } + + fn dial_as_listener(self, addr: Multiaddr) -> Result> { + let future = self.transport.dial_as_listener(addr.clone())?; + let p = ConnectedPoint::Dialer { + address: addr, + role_override: Endpoint::Listener, + }; Ok(MapFuture { inner: future, args: Some((self.fun, p)), diff --git a/core/src/transport/map_err.rs b/core/src/transport/map_err.rs index df26214435a..eba3e042552 100644 --- a/core/src/transport/map_err.rs +++ b/core/src/transport/map_err.rs @@ -68,6 +68,17 @@ where } } + fn dial_as_listener(self, addr: Multiaddr) -> Result> { + let map = self.map; + match self.transport.dial_as_listener(addr) { + Ok(future) => Ok(MapErrDial { + inner: future, + map: Some(map), + }), + Err(err) => Err(err.map(map)), + } + } + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { self.transport.address_translation(server, observed) } diff --git a/core/src/transport/memory.rs b/core/src/transport/memory.rs index 3b4706c9adb..075622a9f9c 100644 --- a/core/src/transport/memory.rs +++ b/core/src/transport/memory.rs @@ -205,6 +205,10 @@ impl Transport for MemoryTransport { DialFuture::new(port).ok_or(TransportError::Other(MemoryTransportError::Unreachable)) } + fn dial_as_listener(self, addr: Multiaddr) -> Result> { + self.dial(addr) + } + fn address_translation(&self, _server: &Multiaddr, _observed: &Multiaddr) -> Option { None } diff --git a/core/src/transport/optional.rs b/core/src/transport/optional.rs index 2b29773ee22..a3bfc744add 100644 --- a/core/src/transport/optional.rs +++ b/core/src/transport/optional.rs @@ -75,6 +75,14 @@ where } } + fn dial_as_listener(self, addr: Multiaddr) -> Result> { + if let Some(inner) = self.0 { + inner.dial_as_listener(addr) + } else { + Err(TransportError::MultiaddrNotSupported(addr)) + } + } + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { if let Some(inner) = &self.0 { inner.address_translation(server, observed) diff --git a/core/src/transport/timeout.rs b/core/src/transport/timeout.rs index 8084dcb7521..51fead8d42e 100644 --- a/core/src/transport/timeout.rs +++ b/core/src/transport/timeout.rs @@ -109,6 +109,17 @@ where }) } + fn dial_as_listener(self, addr: Multiaddr) -> Result> { + let dial = self + .inner + .dial_as_listener(addr) + .map_err(|err| err.map(TransportTimeoutError::Other))?; + Ok(Timeout { + inner: dial, + timer: Delay::new(self.outgoing_timeout), + }) + } + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { self.inner.address_translation(server, observed) } diff --git a/core/src/transport/upgrade.rs b/core/src/transport/upgrade.rs index 7777be9256e..676e0989baa 100644 --- a/core/src/transport/upgrade.rs +++ b/core/src/transport/upgrade.rs @@ -23,6 +23,7 @@ pub use crate::upgrade::Version; use crate::{ + connection::ConnectedPoint, muxing::{StreamMuxer, StreamMuxerBox}, transport::{ and_then::AndThen, boxed::boxed, timeout::TransportTimeout, ListenerEvent, Transport, @@ -32,7 +33,7 @@ use crate::{ self, apply_inbound, apply_outbound, InboundUpgrade, InboundUpgradeApply, OutboundUpgrade, OutboundUpgradeApply, UpgradeError, }, - ConnectedPoint, Negotiated, PeerId, + Negotiated, PeerId, }; use futures::{prelude::*, ready}; use multiaddr::Multiaddr; @@ -340,6 +341,10 @@ where self.0.dial(addr) } + fn dial_as_listener(self, addr: Multiaddr) -> Result> { + self.0.dial_as_listener(addr) + } + fn listen_on(self, addr: Multiaddr) -> Result> { self.0.listen_on(addr) } @@ -393,6 +398,17 @@ where }) } + fn dial_as_listener(self, addr: Multiaddr) -> Result> { + let future = self + .inner + .dial_as_listener(addr) + .map_err(|err| err.map(TransportUpgradeError::Transport))?; + Ok(DialUpgradeFuture { + future: Box::pin(future), + upgrade: future::Either::Left(Some(self.upgrade)), + }) + } + fn listen_on(self, addr: Multiaddr) -> Result> { let stream = self .inner diff --git a/core/src/upgrade/apply.rs b/core/src/upgrade/apply.rs index 3b4763d2303..ce506cfeb74 100644 --- a/core/src/upgrade/apply.rs +++ b/core/src/upgrade/apply.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. use crate::upgrade::{InboundUpgrade, OutboundUpgrade, ProtocolName, UpgradeError}; -use crate::{ConnectedPoint, Negotiated}; +use crate::{connection::ConnectedPoint, Negotiated}; use futures::{future::Either, prelude::*}; use log::debug; use multistream_select::{self, DialerSelectFuture, ListenerSelectFuture}; @@ -27,6 +27,7 @@ use std::{iter, mem, pin::Pin, task::Context, task::Poll}; pub use multistream_select::Version; +// TODO: Still needed? /// Applies an upgrade to the inbound and outbound direction of a connection or substream. pub fn apply( conn: C, @@ -38,10 +39,11 @@ where C: AsyncRead + AsyncWrite + Unpin, U: InboundUpgrade> + OutboundUpgrade>, { - if cp.is_listener() { - Either::Left(apply_inbound(conn, up)) - } else { - Either::Right(apply_outbound(conn, up, v)) + match cp { + ConnectedPoint::Dialer { role_override, .. } if role_override.is_dialer() => { + Either::Right(apply_outbound(conn, up, v)) + } + _ => Either::Left(apply_inbound(conn, up)), } } diff --git a/core/tests/concurrent_dialing.rs b/core/tests/concurrent_dialing.rs index e708be91ac7..2da22a5d240 100644 --- a/core/tests/concurrent_dialing.rs +++ b/core/tests/concurrent_dialing.rs @@ -120,7 +120,7 @@ fn concurrent_dialing() { .. }) => { match connection.endpoint() { - ConnectedPoint::Dialer { address } => { + ConnectedPoint::Dialer { address, .. } => { assert_eq!( *address, accepted_addr diff --git a/protocols/autonat/src/behaviour.rs b/protocols/autonat/src/behaviour.rs index 8a8e1d1eabe..a1a8bb6ff21 100644 --- a/protocols/autonat/src/behaviour.rs +++ b/protocols/autonat/src/behaviour.rs @@ -30,7 +30,7 @@ use futures_timer::Delay; use instant::Instant; use libp2p_core::{ connection::{ConnectionId, ListenerId}, - ConnectedPoint, Multiaddr, PeerId, + ConnectedPoint, Endpoint, Multiaddr, PeerId, }; use libp2p_request_response::{ handler::RequestResponseHandlerEvent, ProtocolSupport, RequestId, RequestResponse, @@ -315,12 +315,23 @@ impl NetworkBehaviour for Behaviour { connections.insert(*conn, addr); match endpoint { - ConnectedPoint::Dialer { address } => { + ConnectedPoint::Dialer { + address, + role_override: Endpoint::Dialer, + } => { if let Some(event) = self.as_server().on_outbound_connection(peer, address) { self.pending_out_events .push_back(Event::InboundProbe(event)); } } + ConnectedPoint::Dialer { + address: _, + role_override: Endpoint::Listener, + } => { + // Outgoing connection was dialed as a listener. In other words outgoing connection + // was dialed as part of a hole punch. `libp2p-autonat` never attempts to hole + // punch, thus this connection has not been requested by this [`NetworkBehaviour`]. + } ConnectedPoint::Listener { .. } => self.as_client().on_inbound_connection(), } } diff --git a/protocols/autonat/tests/test_server.rs b/protocols/autonat/tests/test_server.rs index b96adaf78a4..1e606cad95e 100644 --- a/protocols/autonat/tests/test_server.rs +++ b/protocols/autonat/tests/test_server.rs @@ -30,7 +30,7 @@ use libp2p::{ use libp2p_autonat::{ Behaviour, Config, Event, InboundProbeError, InboundProbeEvent, ResponseError, }; -use libp2p_core::ConnectedPoint; +use libp2p_core::{ConnectedPoint, Endpoint}; use libp2p_swarm::DialError; use std::{num::NonZeroU32, time::Duration}; @@ -191,7 +191,11 @@ async fn test_dial_back() { match server.select_next_some().await { SwarmEvent::ConnectionEstablished { peer_id, - endpoint: ConnectedPoint::Dialer { address }, + endpoint: + ConnectedPoint::Dialer { + address, + role_override: Endpoint::Dialer, + }, num_established, concurrent_dial_errors, } => { @@ -399,7 +403,11 @@ async fn test_dial_multiple_addr() { match server.select_next_some().await { SwarmEvent::ConnectionEstablished { peer_id, - endpoint: ConnectedPoint::Dialer { address }, + endpoint: + ConnectedPoint::Dialer { + address, + role_override: Endpoint::Dialer, + }, concurrent_dial_errors, .. } => { diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index 7e921c3e425..84f4ec94275 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -38,6 +38,7 @@ mod tests { use crate::subscription_filter::WhitelistSubscriptionFilter; use crate::transform::{DataTransform, IdentityTransform}; use crate::types::FastMessageId; + use libp2p_core::Endpoint; use std::collections::hash_map::DefaultHasher; use std::hash::{Hash, Hasher}; @@ -189,7 +190,10 @@ mod tests { &peer, &ConnectionId::new(0), &if outbound { - ConnectedPoint::Dialer { address } + ConnectedPoint::Dialer { + address, + role_override: Endpoint::Dialer, + } } else { ConnectedPoint::Listener { local_addr: Multiaddr::empty(), @@ -534,6 +538,7 @@ mod tests { &ConnectionId::new(1), &ConnectedPoint::Dialer { address: "/ip4/127.0.0.1".parse::().unwrap(), + role_override: Endpoint::Dialer, }, None, ); @@ -4075,6 +4080,7 @@ mod tests { &ConnectionId::new(0), &ConnectedPoint::Dialer { address: addr.clone(), + role_override: Endpoint::Dialer, }, None, ); @@ -4094,6 +4100,7 @@ mod tests { &ConnectionId::new(0), &ConnectedPoint::Dialer { address: addr2.clone(), + role_override: Endpoint::Dialer, }, None, ); @@ -4122,6 +4129,7 @@ mod tests { &ConnectionId::new(0), &ConnectedPoint::Dialer { address: addr.clone(), + role_override: Endpoint::Dialer, }, None, ); diff --git a/protocols/identify/src/identify.rs b/protocols/identify/src/identify.rs index def59b7c401..2e644502014 100644 --- a/protocols/identify/src/identify.rs +++ b/protocols/identify/src/identify.rs @@ -228,7 +228,7 @@ impl NetworkBehaviour for Identify { failed_addresses: Option<&Vec>, ) { let addr = match endpoint { - ConnectedPoint::Dialer { address } => address.clone(), + ConnectedPoint::Dialer { address, .. } => address.clone(), ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr.clone(), }; diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index c08e1401507..d4bda76172d 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -1968,7 +1968,7 @@ where // since the remote address on an inbound connection may be specific // to that connection (e.g. typically the TCP port numbers). let address = match endpoint { - ConnectedPoint::Dialer { address } => Some(address), + ConnectedPoint::Dialer { address, .. } => Some(address), ConnectedPoint::Listener { .. } => None, }; self.connection_updated(source, address, NodeStatus::Connected); diff --git a/protocols/kad/src/behaviour/test.rs b/protocols/kad/src/behaviour/test.rs index b5d52f273fb..21eb76c1d25 100644 --- a/protocols/kad/src/behaviour/test.rs +++ b/protocols/kad/src/behaviour/test.rs @@ -33,7 +33,7 @@ use libp2p_core::{ multiaddr::{multiaddr, Multiaddr, Protocol}, multihash::{Code, Multihash, MultihashDigest}, transport::MemoryTransport, - upgrade, PeerId, Transport, + upgrade, Endpoint, PeerId, Transport, }; use libp2p_noise as noise; use libp2p_swarm::{Swarm, SwarmEvent}; @@ -1287,6 +1287,7 @@ fn network_behaviour_inject_address_change() { let endpoint = ConnectedPoint::Dialer { address: old_address.clone(), + role_override: Endpoint::Dialer, }; // Mimick a connection being established. @@ -1316,9 +1317,11 @@ fn network_behaviour_inject_address_change() { &connection_id, &ConnectedPoint::Dialer { address: old_address.clone(), + role_override: Endpoint::Dialer, }, &ConnectedPoint::Dialer { address: new_address.clone(), + role_override: Endpoint::Dialer, }, ); diff --git a/protocols/ping/tests/ping.rs b/protocols/ping/tests/ping.rs index 3a7acd72fb1..dbde7db608d 100644 --- a/protocols/ping/tests/ping.rs +++ b/protocols/ping/tests/ping.rs @@ -30,7 +30,7 @@ use libp2p_core::{ use libp2p_mplex as mplex; use libp2p_noise as noise; use libp2p_ping as ping; -use libp2p_swarm::{dial_opts::DialOpts, DummyBehaviour, KeepAlive, Swarm, SwarmEvent}; +use libp2p_swarm::{DummyBehaviour, KeepAlive, Swarm, SwarmEvent}; use libp2p_tcp::TcpConfig; use libp2p_yamux as yamux; use quickcheck::*; diff --git a/protocols/relay/src/v1/transport.rs b/protocols/relay/src/v1/transport.rs index 811541793e8..c96129155dc 100644 --- a/protocols/relay/src/v1/transport.rs +++ b/protocols/relay/src/v1/transport.rs @@ -25,6 +25,7 @@ use futures::channel::oneshot; use futures::future::{BoxFuture, Future, FutureExt}; use futures::sink::SinkExt; use futures::stream::{Stream, StreamExt}; +use libp2p_core::connection::Endpoint; use libp2p_core::either::{EitherError, EitherFuture, EitherOutput}; use libp2p_core::multiaddr::{Multiaddr, Protocol}; use libp2p_core::transport::{ListenerEvent, TransportError}; @@ -220,15 +221,41 @@ impl Transport for RelayTransport { } fn dial(self, addr: Multiaddr) -> Result> { + self.do_dial(addr, Endpoint::Dialer) + } + + fn dial_as_listener(self, addr: Multiaddr) -> Result> { + self.do_dial(addr, Endpoint::Listener) + } + + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { + self.inner_transport.address_translation(server, observed) + } +} + +impl RelayTransport { + fn do_dial( + self, + addr: Multiaddr, + role_override: Endpoint, + ) -> Result<::Dial, TransportError<::Error>> { match parse_relayed_multiaddr(addr)? { // Address does not contain circuit relay protocol. Use inner transport. - Err(addr) => match self.inner_transport.dial(addr) { - Ok(dialer) => Ok(EitherFuture::First(dialer)), - Err(TransportError::MultiaddrNotSupported(addr)) => { - Err(TransportError::MultiaddrNotSupported(addr)) + Err(addr) => { + let dial = match role_override { + Endpoint::Dialer => self.inner_transport.dial(addr), + Endpoint::Listener => self.inner_transport.dial_as_listener(addr), + }; + match dial { + Ok(dialer) => Ok(EitherFuture::First(dialer)), + Err(TransportError::MultiaddrNotSupported(addr)) => { + Err(TransportError::MultiaddrNotSupported(addr)) + } + Err(TransportError::Other(err)) => { + Err(TransportError::Other(EitherError::A(err))) + } } - Err(TransportError::Other(err)) => Err(TransportError::Other(EitherError::A(err))), - }, + } // Address does contain circuit relay protocol. Dial destination via relay. Ok(RelayedMultiaddr { relay_peer_id, @@ -263,10 +290,6 @@ impl Transport for RelayTransport { } } } - - fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { - self.inner_transport.address_translation(server, observed) - } } #[derive(Default)] diff --git a/protocols/relay/src/v2/client/transport.rs b/protocols/relay/src/v2/client/transport.rs index 6128f25bcc7..0a1cffad0bb 100644 --- a/protocols/relay/src/v2/client/transport.rs +++ b/protocols/relay/src/v2/client/transport.rs @@ -197,6 +197,17 @@ impl Transport for ClientTransport { .boxed()) } + fn dial_as_listener(self, addr: Multiaddr) -> Result> + where + Self: Sized, + { + // [`Transport::dial_as_listener`] is used for NAT and firewall + // traversal. One would coordinate such traversal via a previously + // established relayed connection, but never using a relayed connection + // itself. + return Err(TransportError::MultiaddrNotSupported(addr)); + } + fn address_translation(&self, _server: &Multiaddr, _observed: &Multiaddr) -> Option { None } diff --git a/protocols/relay/tests/v1.rs b/protocols/relay/tests/v1.rs index fe4ca9f29b5..69d08ae7d66 100644 --- a/protocols/relay/tests/v1.rs +++ b/protocols/relay/tests/v1.rs @@ -1264,6 +1264,10 @@ impl Transport for Firewall { self.0.dial(addr) } + fn dial_as_listener(self, addr: Multiaddr) -> Result> { + self.0.dial_as_listener(addr) + } + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { self.0.address_translation(server, observed) } diff --git a/protocols/rendezvous/tests/harness.rs b/protocols/rendezvous/tests/harness.rs index 6b5a202b476..3709e509a06 100644 --- a/protocols/rendezvous/tests/harness.rs +++ b/protocols/rendezvous/tests/harness.rs @@ -29,9 +29,7 @@ use libp2p::core::upgrade::SelectUpgrade; use libp2p::core::{identity, Multiaddr, PeerId, Transport}; use libp2p::mplex::MplexConfig; use libp2p::noise::{Keypair, NoiseConfig, X25519Spec}; -use libp2p::swarm::{ - dial_opts::DialOpts, AddressScore, NetworkBehaviour, Swarm, SwarmBuilder, SwarmEvent, -}; +use libp2p::swarm::{AddressScore, NetworkBehaviour, Swarm, SwarmBuilder, SwarmEvent}; use libp2p::yamux::YamuxConfig; use std::fmt::Debug; use std::time::Duration; diff --git a/protocols/rendezvous/tests/rendezvous.rs b/protocols/rendezvous/tests/rendezvous.rs index 7d02610a5af..458ea588832 100644 --- a/protocols/rendezvous/tests/rendezvous.rs +++ b/protocols/rendezvous/tests/rendezvous.rs @@ -26,7 +26,7 @@ use futures::stream::FuturesUnordered; use futures::StreamExt; use libp2p_core::identity; use libp2p_rendezvous as rendezvous; -use libp2p_swarm::{dial_opts::DialOpts, DialError, Swarm, SwarmEvent}; +use libp2p_swarm::{DialError, Swarm, SwarmEvent}; use std::convert::TryInto; use std::time::Duration; diff --git a/protocols/request-response/src/lib.rs b/protocols/request-response/src/lib.rs index 1f6d1eeb817..af6b601eae0 100644 --- a/protocols/request-response/src/lib.rs +++ b/protocols/request-response/src/lib.rs @@ -599,7 +599,7 @@ where new: &ConnectedPoint, ) { let new_address = match new { - ConnectedPoint::Dialer { address } => Some(address.clone()), + ConnectedPoint::Dialer { address, .. } => Some(address.clone()), ConnectedPoint::Listener { .. } => None, }; let connections = self @@ -631,7 +631,7 @@ where _errors: Option<&Vec>, ) { let address = match endpoint { - ConnectedPoint::Dialer { address } => Some(address.clone()), + ConnectedPoint::Dialer { address, .. } => Some(address.clone()), ConnectedPoint::Listener { .. } => None, }; self.connected diff --git a/src/bandwidth.rs b/src/bandwidth.rs index a341b4dfbab..1ff63cc5ec8 100644 --- a/src/bandwidth.rs +++ b/src/bandwidth.rs @@ -89,6 +89,13 @@ where .map(move |fut| BandwidthFuture { inner: fut, sinks }) } + fn dial_as_listener(self, addr: Multiaddr) -> Result> { + let sinks = self.sinks; + self.inner + .dial_as_listener(addr) + .map(move |fut| BandwidthFuture { inner: fut, sinks }) + } + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { self.inner.address_translation(server, observed) } diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index a97ea499aa6..57df6ddad68 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -14,9 +14,13 @@ - Implement `swarm::NetworkBehaviour` on `either::Either` (see [PR 2370]). -- Enable overriding _dial concurrency factor_ per dial via +- Allow overriding _dial concurrency factor_ per dial via `DialOpts::override_dial_concurrency_factor`. See [PR 2404]. +- Allow overriding role when dialing through `override_role` option on + `DialOpts`. This option is needed for NAT and firewall hole punching. See [PR + 2363]. + [PR 2339]: https://github.com/libp2p/rust-libp2p/pull/2339 [PR 2350]: https://github.com/libp2p/rust-libp2p/pull/2350 [PR 2362]: https://github.com/libp2p/rust-libp2p/pull/2362 @@ -24,6 +28,7 @@ [PR 2375]: https://github.com/libp2p/rust-libp2p/pull/2375 [PR 2378]: https://github.com/libp2p/rust-libp2p/pull/2378 [PR 2404]: https://github.com/libp2p/rust-libp2p/pull/2404 +[PR 2363]: https://github.com/libp2p/rust-libp2p/pull/2363 # 0.32.0 [2021-11-16] diff --git a/swarm/src/dial_opts.rs b/swarm/src/dial_opts.rs index e98092a73d4..b443d8f420b 100644 --- a/swarm/src/dial_opts.rs +++ b/swarm/src/dial_opts.rs @@ -19,6 +19,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use libp2p_core::connection::Endpoint; use libp2p_core::{Multiaddr, PeerId}; use std::num::NonZeroU8; @@ -51,6 +52,7 @@ impl DialOpts { WithPeerId { peer_id, condition: Default::default(), + role_override: Endpoint::Dialer, dial_concurrency_factor_override: Default::default(), } } @@ -108,6 +110,7 @@ pub(super) enum Opts { pub struct WithPeerId { pub(crate) peer_id: PeerId, pub(crate) condition: PeerCondition, + pub(crate) role_override: Endpoint, pub(crate) dial_concurrency_factor_override: Option, } @@ -132,10 +135,22 @@ impl WithPeerId { condition: self.condition, addresses, extend_addresses_through_behaviour: false, + role_override: self.role_override, dial_concurrency_factor_override: self.dial_concurrency_factor_override, } } + /// Override role of local node on connection. I.e. execute the dial _as a + /// listener_. + /// + /// See + /// [`ConnectedPoint::Dialer`](libp2p_core::connection::ConnectedPoint::Dialer) + /// for details. + pub fn override_role(mut self) -> Self { + self.role_override = Endpoint::Listener; + self + } + /// Build the final [`DialOpts`]. /// /// Addresses to dial the peer are retrieved via @@ -151,6 +166,7 @@ pub struct WithPeerIdWithAddresses { pub(crate) condition: PeerCondition, pub(crate) addresses: Vec, pub(crate) extend_addresses_through_behaviour: bool, + pub(crate) role_override: Endpoint, pub(crate) dial_concurrency_factor_override: Option, } @@ -168,6 +184,17 @@ impl WithPeerIdWithAddresses { self } + /// Override role of local node on connection. I.e. execute the dial _as a + /// listener_. + /// + /// See + /// [`ConnectedPoint::Dialer`](libp2p_core::connection::ConnectedPoint::Dialer) + /// for details. + pub fn override_role(mut self) -> Self { + self.role_override = Endpoint::Listener; + self + } + /// Override /// [`NetworkConfig::with_dial_concurrency_factor`](libp2p_core::network::NetworkConfig::with_dial_concurrency_factor). pub fn override_dial_concurrency_factor(mut self, factor: NonZeroU8) -> Self { @@ -187,16 +214,30 @@ pub struct WithoutPeerId {} impl WithoutPeerId { /// Specify a single address to dial the unknown peer. pub fn address(self, address: Multiaddr) -> WithoutPeerIdWithAddress { - WithoutPeerIdWithAddress { address } + WithoutPeerIdWithAddress { + address, + role_override: Endpoint::Dialer, + } } } #[derive(Debug)] pub struct WithoutPeerIdWithAddress { pub(crate) address: Multiaddr, + pub(crate) role_override: Endpoint, } impl WithoutPeerIdWithAddress { + /// Override role of local node on connection. I.e. execute the dial _as a + /// listener_. + /// + /// See + /// [`ConnectedPoint::Dialer`](libp2p_core::connection::ConnectedPoint::Dialer) + /// for details. + pub fn override_role(mut self) -> Self { + self.role_override = Endpoint::Listener; + self + } /// Build the final [`DialOpts`]. pub fn build(self) -> DialOpts { DialOpts(Opts::WithoutPeerIdWithAddress(self)) diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index cc09ebf22a6..4ebd0da2e55 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -368,11 +368,13 @@ where dial_opts::Opts::WithPeerId(dial_opts::WithPeerId { peer_id, condition, + role_override, dial_concurrency_factor_override, }) | dial_opts::Opts::WithPeerIdWithAddresses(dial_opts::WithPeerIdWithAddresses { peer_id, condition, + role_override, dial_concurrency_factor_override, .. }) => { @@ -439,7 +441,9 @@ where addresses }; - let mut opts = libp2p_core::DialOpts::peer_id(peer_id).addresses(addresses); + let mut opts = libp2p_core::DialOpts::peer_id(peer_id) + .addresses(addresses) + .override_role(role_override); if let Some(f) = dial_concurrency_factor_override { opts = opts.override_dial_concurrency_factor(f); @@ -450,8 +454,10 @@ where // Dial an unknown peer. dial_opts::Opts::WithoutPeerIdWithAddress(dial_opts::WithoutPeerIdWithAddress { address, + role_override, }) => libp2p_core::DialOpts::unknown_peer_id() .address(address) + .override_role(role_override) .build(), }; diff --git a/transports/dns/src/lib.rs b/transports/dns/src/lib.rs index 2cfc27f93a6..55684140152 100644 --- a/transports/dns/src/lib.rs +++ b/transports/dns/src/lib.rs @@ -58,6 +58,7 @@ use async_std_resolver::{AsyncStdConnection, AsyncStdConnectionProvider}; use futures::{future::BoxFuture, prelude::*}; use libp2p_core::{ + connection::Endpoint, multiaddr::{Multiaddr, Protocol}, transport::{ListenerEvent, TransportError}, Transport, @@ -211,6 +212,31 @@ where } fn dial(self, addr: Multiaddr) -> Result> { + self.do_dial(addr, Endpoint::Dialer) + } + + fn dial_as_listener(self, addr: Multiaddr) -> Result> { + self.do_dial(addr, Endpoint::Listener) + } + + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { + self.inner.address_translation(server, observed) + } +} + +impl GenDnsConfig +where + T: Transport + Clone + Send + 'static, + T::Error: Send, + T::Dial: Send, + C: DnsHandle, + P: ConnectionProvider, +{ + fn do_dial( + self, + addr: Multiaddr, + role_override: Endpoint, + ) -> Result<::Dial, TransportError<::Error>> { // Asynchronlously resolve all DNS names in the address before proceeding // with dialing on the underlying transport. Ok(async move { @@ -293,7 +319,11 @@ where log::debug!("Dialing {}", addr); let transport = inner.clone(); - let result = match transport.dial(addr) { + let dial = match role_override { + Endpoint::Dialer => transport.dial(addr), + Endpoint::Listener => transport.dial_as_listener(addr), + }; + let result = match dial { Ok(out) => { // We only count attempts that the inner transport // actually accepted, i.e. for which it produced @@ -338,10 +368,6 @@ where .boxed() .right_future()) } - - fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { - self.inner.address_translation(server, observed) - } } /// The possible errors of a [`GenDnsConfig`] wrapped transport. @@ -579,6 +605,13 @@ mod tests { Ok(Box::pin(future::ready(Ok(())))) } + fn dial_as_listener( + self, + addr: Multiaddr, + ) -> Result> { + self.dial(addr) + } + fn address_translation(&self, _: &Multiaddr, _: &Multiaddr) -> Option { None } diff --git a/transports/tcp/src/lib.rs b/transports/tcp/src/lib.rs index bfad45f0e44..eec52e97193 100644 --- a/transports/tcp/src/lib.rs +++ b/transports/tcp/src/lib.rs @@ -405,6 +405,10 @@ where Ok(Box::pin(self.do_dial(socket_addr))) } + fn dial_as_listener(self, addr: Multiaddr) -> Result> { + self.dial(addr) + } + /// When port reuse is disabled and hence ephemeral local ports are /// used for outgoing connections, the returned address is the /// `observed` address with the port replaced by the port of the diff --git a/transports/uds/src/lib.rs b/transports/uds/src/lib.rs index 34ac4eb51c3..a5fabc162a0 100644 --- a/transports/uds/src/lib.rs +++ b/transports/uds/src/lib.rs @@ -105,6 +105,7 @@ impl Transport for $uds_config { } fn dial(self, addr: Multiaddr) -> Result> { + // TODO: Should we dial at all? if let Ok(path) = multiaddr_to_path(&addr) { debug!("Dialing {}", addr); Ok(async move { <$unix_stream>::connect(&path).await }.boxed()) @@ -113,6 +114,10 @@ impl Transport for $uds_config { } } + fn dial_as_listener(self, addr: Multiaddr) -> Result> { + self.dial(addr) + } + fn address_translation(&self, _server: &Multiaddr, _observed: &Multiaddr) -> Option { None } diff --git a/transports/wasm-ext/src/lib.rs b/transports/wasm-ext/src/lib.rs index 27aafdb70c3..5ef8dbb0bbd 100644 --- a/transports/wasm-ext/src/lib.rs +++ b/transports/wasm-ext/src/lib.rs @@ -33,7 +33,11 @@ //! use futures::{future::Ready, prelude::*}; -use libp2p_core::{transport::ListenerEvent, transport::TransportError, Multiaddr, Transport}; +use libp2p_core::{ + connection::Endpoint, + transport::{ListenerEvent, TransportError}, + Multiaddr, Transport, +}; use parity_send_wrapper::SendWrapper; use std::{collections::VecDeque, error, fmt, io, mem, pin::Pin, task::Context, task::Poll}; use wasm_bindgen::{prelude::*, JsCast}; @@ -61,7 +65,11 @@ pub mod ffi { /// If the multiaddress is not supported, you should return an instance of `Error` whose /// `name` property has been set to the string `"NotSupportedError"`. #[wasm_bindgen(method, catch)] - pub fn dial(this: &Transport, multiaddr: &str) -> Result; + pub fn dial( + this: &Transport, + multiaddr: &str, + _role_override: bool, + ) -> Result; /// Start listening on the given multiaddress. /// @@ -148,6 +156,29 @@ impl ExtTransport { inner: SendWrapper::new(transport), } } + fn do_dial( + self, + addr: Multiaddr, + role_override: Endpoint, + ) -> Result<::Dial, TransportError<::Error>> { + let promise = self + .inner + .dial( + &addr.to_string(), + matches!(role_override, Endpoint::Listener), + ) + .map_err(|err| { + if is_not_supported_error(&err) { + TransportError::MultiaddrNotSupported(addr) + } else { + TransportError::Other(JsErr::from(err)) + } + })?; + + Ok(Dial { + inner: SendWrapper::new(promise.into()), + }) + } } impl fmt::Debug for ExtTransport { @@ -187,18 +218,18 @@ impl Transport for ExtTransport { }) } - fn dial(self, addr: Multiaddr) -> Result> { - let promise = self.inner.dial(&addr.to_string()).map_err(|err| { - if is_not_supported_error(&err) { - TransportError::MultiaddrNotSupported(addr) - } else { - TransportError::Other(JsErr::from(err)) - } - })?; + fn dial(self, addr: Multiaddr) -> Result> + where + Self: Sized, + { + self.do_dial(addr, Endpoint::Dialer) + } - Ok(Dial { - inner: SendWrapper::new(promise.into()), - }) + fn dial_as_listener(self, addr: Multiaddr) -> Result> + where + Self: Sized, + { + self.do_dial(addr, Endpoint::Listener) } fn address_translation(&self, _server: &Multiaddr, _observed: &Multiaddr) -> Option { diff --git a/transports/websocket/src/framed.rs b/transports/websocket/src/framed.rs index 266a6b9bada..1e009248fad 100644 --- a/transports/websocket/src/framed.rs +++ b/transports/websocket/src/framed.rs @@ -23,6 +23,7 @@ use either::Either; use futures::{future::BoxFuture, prelude::*, ready, stream::BoxStream}; use futures_rustls::{client, rustls, server}; use libp2p_core::{ + connection::Endpoint, either::EitherOutput, multiaddr::{Multiaddr, Protocol}, transport::{ListenerEvent, TransportError}, @@ -245,6 +246,32 @@ where } fn dial(self, addr: Multiaddr) -> Result> { + self.do_dial(addr, Endpoint::Dialer) + } + + fn dial_as_listener(self, addr: Multiaddr) -> Result> { + self.do_dial(addr, Endpoint::Listener) + } + + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { + self.transport.address_translation(server, observed) + } +} + +impl WsConfig +where + T: Transport + Send + Clone + 'static, + T::Error: Send + 'static, + T::Dial: Send + 'static, + T::Listener: Send + 'static, + T::ListenerUpgrade: Send + 'static, + T::Output: AsyncRead + AsyncWrite + Unpin + Send + 'static, +{ + fn do_dial( + self, + addr: Multiaddr, + role_override: Endpoint, + ) -> Result<::Dial, TransportError<::Error>> { let addr = match parse_ws_dial_addr(addr) { Ok(addr) => addr, Err(Error::InvalidMultiaddr(a)) => { @@ -259,7 +286,7 @@ where let future = async move { loop { let this = self.clone(); - match this.dial_once(addr).await { + match this.dial_once(addr, role_override).await { Ok(Either::Left(redirect)) => { if remaining_redirects == 0 { debug!("Too many redirects (> {})", self.max_redirects); @@ -276,25 +303,19 @@ where Ok(Box::pin(future)) } - - fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { - self.transport.address_translation(server, observed) - } -} - -impl WsConfig -where - T: Transport, - T::Output: AsyncRead + AsyncWrite + Send + Unpin + 'static, -{ /// Attempts to dial the given address and perform a websocket handshake. async fn dial_once( self, addr: WsAddress, + role_override: Endpoint, ) -> Result>, Error> { trace!("Dialing websocket address: {:?}", addr); - let dial = self.transport.dial(addr.tcp_addr).map_err(|e| match e { + let dial = match role_override { + Endpoint::Dialer => self.transport.dial(addr.tcp_addr), + Endpoint::Listener => self.transport.dial_as_listener(addr.tcp_addr), + } + .map_err(|e| match e { TransportError::MultiaddrNotSupported(a) => Error::InvalidMultiaddr(a), TransportError::Other(e) => Error::Transport(e), })?; diff --git a/transports/websocket/src/lib.rs b/transports/websocket/src/lib.rs index eb14f078d5d..369e9000855 100644 --- a/transports/websocket/src/lib.rs +++ b/transports/websocket/src/lib.rs @@ -28,12 +28,13 @@ use error::Error; use framed::{Connection, Incoming}; use futures::{future::BoxFuture, prelude::*, ready, stream::BoxStream}; use libp2p_core::{ + connection::ConnectedPoint, multiaddr::Multiaddr, transport::{ map::{MapFuture, MapStream}, ListenerEvent, TransportError, }, - ConnectedPoint, Transport, + Transport, }; use rw_stream_sink::RwStreamSink; use std::{ @@ -129,6 +130,12 @@ where .dial(addr) } + fn dial_as_listener(self, addr: Multiaddr) -> Result> { + self.transport + .map(wrap_connection as WrapperFn) + .dial_as_listener(addr) + } + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { self.transport.address_translation(server, observed) }