diff --git a/core/CHANGELOG.md b/core/CHANGELOG.md index b7db3b4f919..3f61ba3fe46 100644 --- a/core/CHANGELOG.md +++ b/core/CHANGELOG.md @@ -22,12 +22,24 @@ - Implement `Serialize` and `Deserialize` for `PeerId` (see [PR 2408]) +- Allow overriding role when dialing. This option is needed for NAT and firewall + hole punching. + + - Add `Transport::dial_as_listener`. As `Transport::dial` but + overrides the role of the local node on the connection . I.e. has the + local node act as a listener on the outgoing connection. + + - Add `override_role` option to `DialOpts`. + + See [PR 2363]. + [PR 2339]: https://github.com/libp2p/rust-libp2p/pull/2339 [PR 2350]: https://github.com/libp2p/rust-libp2p/pull/2350 [PR 2352]: https://github.com/libp2p/rust-libp2p/pull/2352 [PR 2392]: https://github.com/libp2p/rust-libp2p/pull/2392 [PR 2404]: https://github.com/libp2p/rust-libp2p/pull/2404 [PR 2408]: https://github.com/libp2p/rust-libp2p/pull/2408 +[PR 2363]: https://github.com/libp2p/rust-libp2p/pull/2363 # 0.30.1 [2021-11-16] diff --git a/core/src/connection.rs b/core/src/connection.rs index d6249f19e8e..68a53d9e7e5 100644 --- a/core/src/connection.rs +++ b/core/src/connection.rs @@ -97,7 +97,10 @@ pub enum PendingPoint { /// There is no single address associated with the Dialer of a pending /// connection. Addresses are dialed in parallel. Only once the first dial /// is successful is the address of the connection known. - Dialer, + Dialer { + /// Same as [`ConnectedPoint::Dialer`] `role_override`. + role_override: Endpoint, + }, /// The socket comes from a listener. Listener { /// Local connection address. @@ -110,7 +113,7 @@ pub enum PendingPoint { impl From for PendingPoint { fn from(endpoint: ConnectedPoint) -> Self { match endpoint { - ConnectedPoint::Dialer { .. } => PendingPoint::Dialer, + ConnectedPoint::Dialer { role_override, .. } => PendingPoint::Dialer { role_override }, ConnectedPoint::Listener { local_addr, send_back_addr, @@ -129,6 +132,27 @@ pub enum ConnectedPoint { Dialer { /// Multiaddress that was successfully dialed. address: Multiaddr, + /// Whether the role of the local node on the connection should be + /// overriden. I.e. whether the local node should act as a listener on + /// the outgoing connection. + /// + /// This option is needed for NAT and firewall hole punching. + /// + /// - [`Endpoint::Dialer`] represents the default non-overriding option. + /// + /// - [`Endpoint::Listener`] represents the overriding option. + /// Realization depends on the transport protocol. E.g. in the case of + /// TCP, both endpoints dial each other, resulting in a _simultaneous + /// open_ TCP connection. On this new connection both endpoints assume + /// to be the dialer of the connection. This is problematic during the + /// connection upgrade process where an upgrade assumes one side to be + /// the listener. With the help of this option, both peers can + /// negotiate the roles (dialer and listener) for the new connection + /// ahead of time, through some external channel, e.g. the DCUtR + /// protocol, and thus have one peer dial the other and upgrade the + /// connection as a dialer and one peer dial the other and upgrade the + /// connection _as a listener_ overriding its role. + role_override: Endpoint, }, /// We received the node. Listener { @@ -179,7 +203,10 @@ impl ConnectedPoint { /// Returns true if the connection is relayed. pub fn is_relayed(&self) -> bool { match self { - ConnectedPoint::Dialer { address } => address, + ConnectedPoint::Dialer { + address, + role_override: _, + } => address, ConnectedPoint::Listener { local_addr, .. } => local_addr, } .iter() @@ -194,7 +221,7 @@ impl ConnectedPoint { /// not be usable to establish new connections. pub fn get_remote_address(&self) -> &Multiaddr { match self { - ConnectedPoint::Dialer { address } => address, + ConnectedPoint::Dialer { address, .. } => address, ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr, } } @@ -204,7 +231,7 @@ impl ConnectedPoint { /// For `Dialer`, this modifies `address`. For `Listener`, this modifies `send_back_addr`. pub fn set_remote_address(&mut self, new_address: Multiaddr) { match self { - ConnectedPoint::Dialer { address } => *address = new_address, + ConnectedPoint::Dialer { address, .. } => *address = new_address, ConnectedPoint::Listener { send_back_addr, .. } => *send_back_addr = new_address, } } diff --git a/core/src/connection/listeners.rs b/core/src/connection/listeners.rs index 4c394aeb75d..55a371b7f4c 100644 --- a/core/src/connection/listeners.rs +++ b/core/src/connection/listeners.rs @@ -491,6 +491,13 @@ mod tests { panic!() } + fn dial_as_listener( + self, + _: Multiaddr, + ) -> Result> { + panic!() + } + fn address_translation(&self, _: &Multiaddr, _: &Multiaddr) -> Option { None } @@ -542,6 +549,13 @@ mod tests { panic!() } + fn dial_as_listener( + self, + _: Multiaddr, + ) -> Result> { + panic!() + } + fn address_translation(&self, _: &Multiaddr, _: &Multiaddr) -> Option { None } diff --git a/core/src/connection/pool.rs b/core/src/connection/pool.rs index 85ac1b8a7ca..dc8a8ccf5e6 100644 --- a/core/src/connection/pool.rs +++ b/core/src/connection/pool.rs @@ -22,8 +22,8 @@ use crate::{ connection::{ handler::{THandlerError, THandlerInEvent, THandlerOutEvent}, - Connected, ConnectionError, ConnectionHandler, ConnectionId, ConnectionLimit, IncomingInfo, - IntoConnectionHandler, PendingConnectionError, PendingInboundConnectionError, + Connected, ConnectionError, ConnectionHandler, ConnectionId, ConnectionLimit, Endpoint, + IncomingInfo, IntoConnectionHandler, PendingConnectionError, PendingInboundConnectionError, PendingOutboundConnectionError, PendingPoint, Substream, }, muxing::StreamMuxer, @@ -460,7 +460,7 @@ where local_addr, send_back_addr, }), - PendingPoint::Dialer => None, + PendingPoint::Dialer { .. } => None, }) } @@ -535,6 +535,7 @@ where addresses: impl Iterator + Send + 'static, peer: Option, handler: THandler, + role_override: Endpoint, dial_concurrency_factor_override: Option, ) -> Result> where @@ -550,6 +551,7 @@ where peer, addresses, dial_concurrency_factor_override.unwrap_or(self.dial_concurrency_factor), + role_override, ); let connection_id = self.next_connection_id(); @@ -566,13 +568,15 @@ where .boxed(), ); - self.counters.inc_pending(&PendingPoint::Dialer); + let endpoint = PendingPoint::Dialer { role_override }; + + self.counters.inc_pending(&endpoint); self.pending.insert( connection_id, PendingConnectionInfo { peer_id: peer, handler, - endpoint: PendingPoint::Dialer, + endpoint: endpoint, _drop_notifier: drop_notifier, }, ); @@ -745,9 +749,13 @@ where self.counters.dec_pending(&endpoint); let (endpoint, concurrent_dial_errors) = match (endpoint, outgoing) { - (PendingPoint::Dialer, Some((address, errors))) => { - (ConnectedPoint::Dialer { address }, Some(errors)) - } + (PendingPoint::Dialer { role_override }, Some((address, errors))) => ( + ConnectedPoint::Dialer { + address, + role_override, + }, + Some(errors), + ), ( PendingPoint::Listener { local_addr, @@ -761,7 +769,7 @@ where }, None, ), - (PendingPoint::Dialer, None) => unreachable!( + (PendingPoint::Dialer { .. }, None) => unreachable!( "Established incoming connection via pending outgoing connection." ), (PendingPoint::Listener { .. }, Some(_)) => unreachable!( @@ -910,7 +918,7 @@ where self.counters.dec_pending(&endpoint); match (endpoint, error) { - (PendingPoint::Dialer, Either::Left(error)) => { + (PendingPoint::Dialer { .. }, Either::Left(error)) => { return Poll::Ready(PoolEvent::PendingOutboundConnectionError { id, error, @@ -933,7 +941,7 @@ where local_addr, }); } - (PendingPoint::Dialer, Either::Right(_)) => { + (PendingPoint::Dialer { .. }, Either::Right(_)) => { unreachable!("Inbound error for outbound connection.") } (PendingPoint::Listener { .. }, Either::Left(_)) => { @@ -1176,7 +1184,7 @@ impl ConnectionCounters { fn inc_pending(&mut self, endpoint: &PendingPoint) { match endpoint { - PendingPoint::Dialer => { + PendingPoint::Dialer { .. } => { self.pending_outgoing += 1; } PendingPoint::Listener { .. } => { @@ -1191,7 +1199,7 @@ impl ConnectionCounters { fn dec_pending(&mut self, endpoint: &PendingPoint) { match endpoint { - PendingPoint::Dialer => { + PendingPoint::Dialer { .. } => { self.pending_outgoing -= 1; } PendingPoint::Listener { .. } => { diff --git a/core/src/connection/pool/concurrent_dial.rs b/core/src/connection/pool/concurrent_dial.rs index 4e960e09188..6c10ca1b19d 100644 --- a/core/src/connection/pool/concurrent_dial.rs +++ b/core/src/connection/pool/concurrent_dial.rs @@ -18,9 +18,8 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -pub use crate::connection::{ConnectionCounters, ConnectionLimits}; - use crate::{ + connection::Endpoint, transport::{Transport, TransportError}, Multiaddr, PeerId, }; @@ -63,14 +62,21 @@ where peer: Option, addresses: impl Iterator + Send + 'static, concurrency_factor: NonZeroU8, + role_override: Endpoint, ) -> Self { let mut pending_dials = addresses.map(move |address| match p2p_addr(peer, address) { - Ok(address) => match transport.clone().dial(address.clone()) { - Ok(fut) => fut - .map(|r| (address, r.map_err(|e| TransportError::Other(e)))) - .boxed(), - Err(err) => futures::future::ready((address, Err(err))).boxed(), - }, + Ok(address) => { + let dial = match role_override { + Endpoint::Dialer => transport.clone().dial(address.clone()), + Endpoint::Listener => transport.clone().dial_as_listener(address.clone()), + }; + match dial { + Ok(fut) => fut + .map(|r| (address, r.map_err(|e| TransportError::Other(e)))) + .boxed(), + Err(err) => futures::future::ready((address, Err(err))).boxed(), + } + } Err(address) => futures::future::ready(( address.clone(), Err(TransportError::MultiaddrNotSupported(address)), diff --git a/core/src/either.rs b/core/src/either.rs index 66a11589f7a..46f8ba833cc 100644 --- a/core/src/either.rs +++ b/core/src/either.rs @@ -529,6 +529,25 @@ where } } + fn dial_as_listener(self, addr: Multiaddr) -> Result> + where + Self: Sized, + { + use TransportError::*; + match self { + EitherTransport::Left(a) => match a.dial_as_listener(addr) { + Ok(connec) => Ok(EitherFuture::First(connec)), + Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)), + Err(Other(err)) => Err(Other(EitherError::A(err))), + }, + EitherTransport::Right(b) => match b.dial_as_listener(addr) { + Ok(connec) => Ok(EitherFuture::Second(connec)), + Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)), + Err(Other(err)) => Err(Other(EitherError::B(err))), + }, + } + } + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { match self { EitherTransport::Left(a) => a.address_translation(server, observed), diff --git a/core/src/network.rs b/core/src/network.rs index 63a07573c7f..f96bee76f7e 100644 --- a/core/src/network.rs +++ b/core/src/network.rs @@ -21,7 +21,7 @@ mod event; pub mod peer; -pub use crate::connection::{ConnectionCounters, ConnectionLimits}; +pub use crate::connection::{ConnectionCounters, ConnectionLimits, Endpoint}; pub use event::{IncomingConnection, NetworkEvent}; pub use peer::Peer; @@ -97,7 +97,7 @@ where self.pool .iter_pending_info() .filter(move |(_, endpoint, peer_id)| { - matches!(endpoint, PendingPoint::Dialer) && peer_id.as_ref() == Some(&peer) + matches!(endpoint, PendingPoint::Dialer { .. }) && peer_id.as_ref() == Some(&peer) }) .map(|(connection_id, _, _)| connection_id) } @@ -206,19 +206,24 @@ where { let opts = opts.into(); - let (peer_id, addresses, dial_concurrency_factor_override) = match opts.0 { + let (peer_id, addresses, dial_concurrency_factor_override, role_override) = match opts.0 { // Dial a known peer. Opts::WithPeerIdWithAddresses(WithPeerIdWithAddresses { peer_id, addresses, dial_concurrency_factor_override, + role_override, }) => ( Some(peer_id), Either::Left(addresses.into_iter()), dial_concurrency_factor_override, + role_override, ), // Dial an unknown peer. - Opts::WithoutPeerIdWithAddress(WithoutPeerIdWithAddress { address }) => { + Opts::WithoutPeerIdWithAddress(WithoutPeerIdWithAddress { + address, + role_override, + }) => { // If the address ultimately encapsulates an expected peer ID, dial that peer // such that any mismatch is detected. We do not "pop off" the `P2p` protocol // from the address, because it may be used by the `Transport`, i.e. `P2p` @@ -239,7 +244,12 @@ where Err(_) => return Err(DialError::InvalidPeerId { handler }), }; - (peer_id, Either::Right(std::iter::once(address)), None) + ( + peer_id, + Either::Right(std::iter::once(address)), + None, + role_override, + ) } }; @@ -248,6 +258,7 @@ where addresses, peer_id, handler, + role_override, dial_concurrency_factor_override, ) } @@ -284,7 +295,7 @@ where pub fn dialing_peers(&self) -> impl Iterator { self.pool .iter_pending_info() - .filter(|(_, endpoint, _)| matches!(endpoint, PendingPoint::Dialer)) + .filter(|(_, endpoint, _)| matches!(endpoint, PendingPoint::Dialer { .. })) .filter_map(|(_, _, peer)| peer.as_ref()) } @@ -627,6 +638,7 @@ impl WithPeerId { peer_id: self.peer_id, addresses, dial_concurrency_factor_override: Default::default(), + role_override: Endpoint::Dialer, } } } @@ -636,6 +648,7 @@ pub struct WithPeerIdWithAddresses { pub(crate) peer_id: PeerId, pub(crate) addresses: Vec, pub(crate) dial_concurrency_factor_override: Option, + pub(crate) role_override: Endpoint, } impl WithPeerIdWithAddresses { @@ -645,6 +658,17 @@ impl WithPeerIdWithAddresses { self } + /// Override role of local node on connection. I.e. execute the dial _as a + /// listener_. + /// + /// See + /// [`ConnectedPoint::Dialer`](crate::connection::ConnectedPoint::Dialer) + /// for details. + pub fn override_role(mut self, role: Endpoint) -> Self { + self.role_override = role; + self + } + /// Build the final [`DialOpts`]. pub fn build(self) -> DialOpts { DialOpts(Opts::WithPeerIdWithAddresses(self)) @@ -657,16 +681,31 @@ pub struct WithoutPeerId {} impl WithoutPeerId { /// Specify a single address to dial the unknown peer. pub fn address(self, address: Multiaddr) -> WithoutPeerIdWithAddress { - WithoutPeerIdWithAddress { address } + WithoutPeerIdWithAddress { + address, + role_override: Endpoint::Dialer, + } } } #[derive(Debug, Clone, PartialEq)] pub struct WithoutPeerIdWithAddress { pub(crate) address: Multiaddr, + pub(crate) role_override: Endpoint, } impl WithoutPeerIdWithAddress { + /// Override role of local node on connection. I.e. execute the dial _as a + /// listener_. + /// + /// See + /// [`ConnectedPoint::Dialer`](crate::connection::ConnectedPoint::Dialer) + /// for details. + pub fn override_role(mut self, role: Endpoint) -> Self { + self.role_override = role; + self + } + /// Build the final [`DialOpts`]. pub fn build(self) -> DialOpts { DialOpts(Opts::WithoutPeerIdWithAddress(self)) diff --git a/core/src/transport.rs b/core/src/transport.rs index 7006c15a810..664d55b1d60 100644 --- a/core/src/transport.rs +++ b/core/src/transport.rs @@ -25,7 +25,7 @@ //! any desired protocols. The rest of the module defines combinators for //! modifying a transport through composition with other transports or protocol upgrades. -use crate::ConnectedPoint; +use crate::connection::ConnectedPoint; use futures::prelude::*; use multiaddr::Multiaddr; use std::{error::Error, fmt}; @@ -130,6 +130,15 @@ pub trait Transport { where Self: Sized; + /// As [`Transport::dial`] but has the local node act as a listener on the outgoing connection. + /// + /// This option is needed for NAT and firewall hole punching. + /// + /// See [`ConnectedPoint::Dialer`](crate::connection::ConnectedPoint::Dialer) for related option. + fn dial_as_listener(self, addr: Multiaddr) -> Result> + where + Self: Sized; + /// Performs a transport-specific mapping of an address `observed` by /// a remote onto a local `listen` address to yield an address for /// the local node that may be reachable for other peers. diff --git a/core/src/transport/and_then.rs b/core/src/transport/and_then.rs index 3ff70fcae0c..219887fa6df 100644 --- a/core/src/transport/and_then.rs +++ b/core/src/transport/and_then.rs @@ -19,9 +19,9 @@ // DEALINGS IN THE SOFTWARE. use crate::{ + connection::{ConnectedPoint, Endpoint}, either::EitherError, transport::{ListenerEvent, Transport, TransportError}, - ConnectedPoint, }; use futures::{future::Either, prelude::*}; use multiaddr::Multiaddr; @@ -76,7 +76,32 @@ where .map_err(|err| err.map(EitherError::A))?; let future = AndThenFuture { inner: Either::Left(Box::pin(dialed_fut)), - args: Some((self.fun, ConnectedPoint::Dialer { address: addr })), + args: Some(( + self.fun, + ConnectedPoint::Dialer { + address: addr, + role_override: Endpoint::Dialer, + }, + )), + _marker: PhantomPinned, + }; + Ok(future) + } + + fn dial_as_listener(self, addr: Multiaddr) -> Result> { + let dialed_fut = self + .transport + .dial_as_listener(addr.clone()) + .map_err(|err| err.map(EitherError::A))?; + let future = AndThenFuture { + inner: Either::Left(Box::pin(dialed_fut)), + args: Some(( + self.fun, + ConnectedPoint::Dialer { + address: addr, + role_override: Endpoint::Listener, + }, + )), _marker: PhantomPinned, }; Ok(future) diff --git a/core/src/transport/boxed.rs b/core/src/transport/boxed.rs index 001a0c9fdf3..0f47470fcad 100644 --- a/core/src/transport/boxed.rs +++ b/core/src/transport/boxed.rs @@ -52,6 +52,7 @@ type ListenerUpgrade = Pin> + Send>>; trait Abstract { fn listen_on(&self, addr: Multiaddr) -> Result, TransportError>; fn dial(&self, addr: Multiaddr) -> Result, TransportError>; + fn dial_as_listener(&self, addr: Multiaddr) -> Result, TransportError>; fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option; } @@ -85,6 +86,13 @@ where Ok(Box::pin(fut) as Dial<_>) } + fn dial_as_listener(&self, addr: Multiaddr) -> Result, TransportError> { + let fut = Transport::dial_as_listener(self.clone(), addr) + .map(|r| r.map_err(box_err)) + .map_err(|e| e.map(box_err))?; + Ok(Box::pin(fut) as Dial<_>) + } + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { Transport::address_translation(self, server, observed) } @@ -119,6 +127,10 @@ impl Transport for Boxed { self.inner.dial(addr) } + fn dial_as_listener(self, addr: Multiaddr) -> Result> { + self.inner.dial_as_listener(addr) + } + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { self.inner.address_translation(server, observed) } diff --git a/core/src/transport/choice.rs b/core/src/transport/choice.rs index e9545617f09..3f2d87064cb 100644 --- a/core/src/transport/choice.rs +++ b/core/src/transport/choice.rs @@ -83,6 +83,26 @@ where Err(TransportError::MultiaddrNotSupported(addr)) } + fn dial_as_listener(self, addr: Multiaddr) -> Result> { + let addr = match self.0.dial_as_listener(addr) { + Ok(connec) => return Ok(EitherFuture::First(connec)), + Err(TransportError::MultiaddrNotSupported(addr)) => addr, + Err(TransportError::Other(err)) => { + return Err(TransportError::Other(EitherError::A(err))) + } + }; + + let addr = match self.1.dial_as_listener(addr) { + Ok(connec) => return Ok(EitherFuture::Second(connec)), + Err(TransportError::MultiaddrNotSupported(addr)) => addr, + Err(TransportError::Other(err)) => { + return Err(TransportError::Other(EitherError::B(err))) + } + }; + + Err(TransportError::MultiaddrNotSupported(addr)) + } + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { if let Some(addr) = self.0.address_translation(server, observed) { Some(addr) diff --git a/core/src/transport/dummy.rs b/core/src/transport/dummy.rs index a4eaa14901d..ead00ff9609 100644 --- a/core/src/transport/dummy.rs +++ b/core/src/transport/dummy.rs @@ -70,6 +70,10 @@ impl Transport for DummyTransport { Err(TransportError::MultiaddrNotSupported(addr)) } + fn dial_as_listener(self, addr: Multiaddr) -> Result> { + Err(TransportError::MultiaddrNotSupported(addr)) + } + fn address_translation(&self, _server: &Multiaddr, _observed: &Multiaddr) -> Option { None } diff --git a/core/src/transport/map.rs b/core/src/transport/map.rs index 4493507c1d9..ec5d8c4a1be 100644 --- a/core/src/transport/map.rs +++ b/core/src/transport/map.rs @@ -19,8 +19,8 @@ // DEALINGS IN THE SOFTWARE. use crate::{ + connection::{ConnectedPoint, Endpoint}, transport::{ListenerEvent, Transport, TransportError}, - ConnectedPoint, }; use futures::prelude::*; use multiaddr::Multiaddr; @@ -60,7 +60,22 @@ where fn dial(self, addr: Multiaddr) -> Result> { let future = self.transport.dial(addr.clone())?; - let p = ConnectedPoint::Dialer { address: addr }; + let p = ConnectedPoint::Dialer { + address: addr, + role_override: Endpoint::Dialer, + }; + Ok(MapFuture { + inner: future, + args: Some((self.fun, p)), + }) + } + + fn dial_as_listener(self, addr: Multiaddr) -> Result> { + let future = self.transport.dial_as_listener(addr.clone())?; + let p = ConnectedPoint::Dialer { + address: addr, + role_override: Endpoint::Listener, + }; Ok(MapFuture { inner: future, args: Some((self.fun, p)), diff --git a/core/src/transport/map_err.rs b/core/src/transport/map_err.rs index df26214435a..eba3e042552 100644 --- a/core/src/transport/map_err.rs +++ b/core/src/transport/map_err.rs @@ -68,6 +68,17 @@ where } } + fn dial_as_listener(self, addr: Multiaddr) -> Result> { + let map = self.map; + match self.transport.dial_as_listener(addr) { + Ok(future) => Ok(MapErrDial { + inner: future, + map: Some(map), + }), + Err(err) => Err(err.map(map)), + } + } + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { self.transport.address_translation(server, observed) } diff --git a/core/src/transport/memory.rs b/core/src/transport/memory.rs index 3b4706c9adb..075622a9f9c 100644 --- a/core/src/transport/memory.rs +++ b/core/src/transport/memory.rs @@ -205,6 +205,10 @@ impl Transport for MemoryTransport { DialFuture::new(port).ok_or(TransportError::Other(MemoryTransportError::Unreachable)) } + fn dial_as_listener(self, addr: Multiaddr) -> Result> { + self.dial(addr) + } + fn address_translation(&self, _server: &Multiaddr, _observed: &Multiaddr) -> Option { None } diff --git a/core/src/transport/optional.rs b/core/src/transport/optional.rs index 2b29773ee22..a3bfc744add 100644 --- a/core/src/transport/optional.rs +++ b/core/src/transport/optional.rs @@ -75,6 +75,14 @@ where } } + fn dial_as_listener(self, addr: Multiaddr) -> Result> { + if let Some(inner) = self.0 { + inner.dial_as_listener(addr) + } else { + Err(TransportError::MultiaddrNotSupported(addr)) + } + } + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { if let Some(inner) = &self.0 { inner.address_translation(server, observed) diff --git a/core/src/transport/timeout.rs b/core/src/transport/timeout.rs index 8084dcb7521..51fead8d42e 100644 --- a/core/src/transport/timeout.rs +++ b/core/src/transport/timeout.rs @@ -109,6 +109,17 @@ where }) } + fn dial_as_listener(self, addr: Multiaddr) -> Result> { + let dial = self + .inner + .dial_as_listener(addr) + .map_err(|err| err.map(TransportTimeoutError::Other))?; + Ok(Timeout { + inner: dial, + timer: Delay::new(self.outgoing_timeout), + }) + } + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { self.inner.address_translation(server, observed) } diff --git a/core/src/transport/upgrade.rs b/core/src/transport/upgrade.rs index 7777be9256e..676e0989baa 100644 --- a/core/src/transport/upgrade.rs +++ b/core/src/transport/upgrade.rs @@ -23,6 +23,7 @@ pub use crate::upgrade::Version; use crate::{ + connection::ConnectedPoint, muxing::{StreamMuxer, StreamMuxerBox}, transport::{ and_then::AndThen, boxed::boxed, timeout::TransportTimeout, ListenerEvent, Transport, @@ -32,7 +33,7 @@ use crate::{ self, apply_inbound, apply_outbound, InboundUpgrade, InboundUpgradeApply, OutboundUpgrade, OutboundUpgradeApply, UpgradeError, }, - ConnectedPoint, Negotiated, PeerId, + Negotiated, PeerId, }; use futures::{prelude::*, ready}; use multiaddr::Multiaddr; @@ -340,6 +341,10 @@ where self.0.dial(addr) } + fn dial_as_listener(self, addr: Multiaddr) -> Result> { + self.0.dial_as_listener(addr) + } + fn listen_on(self, addr: Multiaddr) -> Result> { self.0.listen_on(addr) } @@ -393,6 +398,17 @@ where }) } + fn dial_as_listener(self, addr: Multiaddr) -> Result> { + let future = self + .inner + .dial_as_listener(addr) + .map_err(|err| err.map(TransportUpgradeError::Transport))?; + Ok(DialUpgradeFuture { + future: Box::pin(future), + upgrade: future::Either::Left(Some(self.upgrade)), + }) + } + fn listen_on(self, addr: Multiaddr) -> Result> { let stream = self .inner diff --git a/core/src/upgrade/apply.rs b/core/src/upgrade/apply.rs index 3b4763d2303..ce506cfeb74 100644 --- a/core/src/upgrade/apply.rs +++ b/core/src/upgrade/apply.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. use crate::upgrade::{InboundUpgrade, OutboundUpgrade, ProtocolName, UpgradeError}; -use crate::{ConnectedPoint, Negotiated}; +use crate::{connection::ConnectedPoint, Negotiated}; use futures::{future::Either, prelude::*}; use log::debug; use multistream_select::{self, DialerSelectFuture, ListenerSelectFuture}; @@ -27,6 +27,7 @@ use std::{iter, mem, pin::Pin, task::Context, task::Poll}; pub use multistream_select::Version; +// TODO: Still needed? /// Applies an upgrade to the inbound and outbound direction of a connection or substream. pub fn apply( conn: C, @@ -38,10 +39,11 @@ where C: AsyncRead + AsyncWrite + Unpin, U: InboundUpgrade> + OutboundUpgrade>, { - if cp.is_listener() { - Either::Left(apply_inbound(conn, up)) - } else { - Either::Right(apply_outbound(conn, up, v)) + match cp { + ConnectedPoint::Dialer { role_override, .. } if role_override.is_dialer() => { + Either::Right(apply_outbound(conn, up, v)) + } + _ => Either::Left(apply_inbound(conn, up)), } } diff --git a/core/tests/concurrent_dialing.rs b/core/tests/concurrent_dialing.rs index e708be91ac7..2da22a5d240 100644 --- a/core/tests/concurrent_dialing.rs +++ b/core/tests/concurrent_dialing.rs @@ -120,7 +120,7 @@ fn concurrent_dialing() { .. }) => { match connection.endpoint() { - ConnectedPoint::Dialer { address } => { + ConnectedPoint::Dialer { address, .. } => { assert_eq!( *address, accepted_addr diff --git a/protocols/autonat/src/behaviour.rs b/protocols/autonat/src/behaviour.rs index 8a8e1d1eabe..a1a8bb6ff21 100644 --- a/protocols/autonat/src/behaviour.rs +++ b/protocols/autonat/src/behaviour.rs @@ -30,7 +30,7 @@ use futures_timer::Delay; use instant::Instant; use libp2p_core::{ connection::{ConnectionId, ListenerId}, - ConnectedPoint, Multiaddr, PeerId, + ConnectedPoint, Endpoint, Multiaddr, PeerId, }; use libp2p_request_response::{ handler::RequestResponseHandlerEvent, ProtocolSupport, RequestId, RequestResponse, @@ -315,12 +315,23 @@ impl NetworkBehaviour for Behaviour { connections.insert(*conn, addr); match endpoint { - ConnectedPoint::Dialer { address } => { + ConnectedPoint::Dialer { + address, + role_override: Endpoint::Dialer, + } => { if let Some(event) = self.as_server().on_outbound_connection(peer, address) { self.pending_out_events .push_back(Event::InboundProbe(event)); } } + ConnectedPoint::Dialer { + address: _, + role_override: Endpoint::Listener, + } => { + // Outgoing connection was dialed as a listener. In other words outgoing connection + // was dialed as part of a hole punch. `libp2p-autonat` never attempts to hole + // punch, thus this connection has not been requested by this [`NetworkBehaviour`]. + } ConnectedPoint::Listener { .. } => self.as_client().on_inbound_connection(), } } diff --git a/protocols/autonat/tests/test_server.rs b/protocols/autonat/tests/test_server.rs index b96adaf78a4..1e606cad95e 100644 --- a/protocols/autonat/tests/test_server.rs +++ b/protocols/autonat/tests/test_server.rs @@ -30,7 +30,7 @@ use libp2p::{ use libp2p_autonat::{ Behaviour, Config, Event, InboundProbeError, InboundProbeEvent, ResponseError, }; -use libp2p_core::ConnectedPoint; +use libp2p_core::{ConnectedPoint, Endpoint}; use libp2p_swarm::DialError; use std::{num::NonZeroU32, time::Duration}; @@ -191,7 +191,11 @@ async fn test_dial_back() { match server.select_next_some().await { SwarmEvent::ConnectionEstablished { peer_id, - endpoint: ConnectedPoint::Dialer { address }, + endpoint: + ConnectedPoint::Dialer { + address, + role_override: Endpoint::Dialer, + }, num_established, concurrent_dial_errors, } => { @@ -399,7 +403,11 @@ async fn test_dial_multiple_addr() { match server.select_next_some().await { SwarmEvent::ConnectionEstablished { peer_id, - endpoint: ConnectedPoint::Dialer { address }, + endpoint: + ConnectedPoint::Dialer { + address, + role_override: Endpoint::Dialer, + }, concurrent_dial_errors, .. } => { diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index 7e921c3e425..84f4ec94275 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -38,6 +38,7 @@ mod tests { use crate::subscription_filter::WhitelistSubscriptionFilter; use crate::transform::{DataTransform, IdentityTransform}; use crate::types::FastMessageId; + use libp2p_core::Endpoint; use std::collections::hash_map::DefaultHasher; use std::hash::{Hash, Hasher}; @@ -189,7 +190,10 @@ mod tests { &peer, &ConnectionId::new(0), &if outbound { - ConnectedPoint::Dialer { address } + ConnectedPoint::Dialer { + address, + role_override: Endpoint::Dialer, + } } else { ConnectedPoint::Listener { local_addr: Multiaddr::empty(), @@ -534,6 +538,7 @@ mod tests { &ConnectionId::new(1), &ConnectedPoint::Dialer { address: "/ip4/127.0.0.1".parse::().unwrap(), + role_override: Endpoint::Dialer, }, None, ); @@ -4075,6 +4080,7 @@ mod tests { &ConnectionId::new(0), &ConnectedPoint::Dialer { address: addr.clone(), + role_override: Endpoint::Dialer, }, None, ); @@ -4094,6 +4100,7 @@ mod tests { &ConnectionId::new(0), &ConnectedPoint::Dialer { address: addr2.clone(), + role_override: Endpoint::Dialer, }, None, ); @@ -4122,6 +4129,7 @@ mod tests { &ConnectionId::new(0), &ConnectedPoint::Dialer { address: addr.clone(), + role_override: Endpoint::Dialer, }, None, ); diff --git a/protocols/identify/src/identify.rs b/protocols/identify/src/identify.rs index def59b7c401..2e644502014 100644 --- a/protocols/identify/src/identify.rs +++ b/protocols/identify/src/identify.rs @@ -228,7 +228,7 @@ impl NetworkBehaviour for Identify { failed_addresses: Option<&Vec>, ) { let addr = match endpoint { - ConnectedPoint::Dialer { address } => address.clone(), + ConnectedPoint::Dialer { address, .. } => address.clone(), ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr.clone(), }; diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index c08e1401507..d4bda76172d 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -1968,7 +1968,7 @@ where // since the remote address on an inbound connection may be specific // to that connection (e.g. typically the TCP port numbers). let address = match endpoint { - ConnectedPoint::Dialer { address } => Some(address), + ConnectedPoint::Dialer { address, .. } => Some(address), ConnectedPoint::Listener { .. } => None, }; self.connection_updated(source, address, NodeStatus::Connected); diff --git a/protocols/kad/src/behaviour/test.rs b/protocols/kad/src/behaviour/test.rs index b5d52f273fb..21eb76c1d25 100644 --- a/protocols/kad/src/behaviour/test.rs +++ b/protocols/kad/src/behaviour/test.rs @@ -33,7 +33,7 @@ use libp2p_core::{ multiaddr::{multiaddr, Multiaddr, Protocol}, multihash::{Code, Multihash, MultihashDigest}, transport::MemoryTransport, - upgrade, PeerId, Transport, + upgrade, Endpoint, PeerId, Transport, }; use libp2p_noise as noise; use libp2p_swarm::{Swarm, SwarmEvent}; @@ -1287,6 +1287,7 @@ fn network_behaviour_inject_address_change() { let endpoint = ConnectedPoint::Dialer { address: old_address.clone(), + role_override: Endpoint::Dialer, }; // Mimick a connection being established. @@ -1316,9 +1317,11 @@ fn network_behaviour_inject_address_change() { &connection_id, &ConnectedPoint::Dialer { address: old_address.clone(), + role_override: Endpoint::Dialer, }, &ConnectedPoint::Dialer { address: new_address.clone(), + role_override: Endpoint::Dialer, }, ); diff --git a/protocols/ping/tests/ping.rs b/protocols/ping/tests/ping.rs index 3a7acd72fb1..dbde7db608d 100644 --- a/protocols/ping/tests/ping.rs +++ b/protocols/ping/tests/ping.rs @@ -30,7 +30,7 @@ use libp2p_core::{ use libp2p_mplex as mplex; use libp2p_noise as noise; use libp2p_ping as ping; -use libp2p_swarm::{dial_opts::DialOpts, DummyBehaviour, KeepAlive, Swarm, SwarmEvent}; +use libp2p_swarm::{DummyBehaviour, KeepAlive, Swarm, SwarmEvent}; use libp2p_tcp::TcpConfig; use libp2p_yamux as yamux; use quickcheck::*; diff --git a/protocols/relay/src/v1/transport.rs b/protocols/relay/src/v1/transport.rs index 811541793e8..c96129155dc 100644 --- a/protocols/relay/src/v1/transport.rs +++ b/protocols/relay/src/v1/transport.rs @@ -25,6 +25,7 @@ use futures::channel::oneshot; use futures::future::{BoxFuture, Future, FutureExt}; use futures::sink::SinkExt; use futures::stream::{Stream, StreamExt}; +use libp2p_core::connection::Endpoint; use libp2p_core::either::{EitherError, EitherFuture, EitherOutput}; use libp2p_core::multiaddr::{Multiaddr, Protocol}; use libp2p_core::transport::{ListenerEvent, TransportError}; @@ -220,15 +221,41 @@ impl Transport for RelayTransport { } fn dial(self, addr: Multiaddr) -> Result> { + self.do_dial(addr, Endpoint::Dialer) + } + + fn dial_as_listener(self, addr: Multiaddr) -> Result> { + self.do_dial(addr, Endpoint::Listener) + } + + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { + self.inner_transport.address_translation(server, observed) + } +} + +impl RelayTransport { + fn do_dial( + self, + addr: Multiaddr, + role_override: Endpoint, + ) -> Result<::Dial, TransportError<::Error>> { match parse_relayed_multiaddr(addr)? { // Address does not contain circuit relay protocol. Use inner transport. - Err(addr) => match self.inner_transport.dial(addr) { - Ok(dialer) => Ok(EitherFuture::First(dialer)), - Err(TransportError::MultiaddrNotSupported(addr)) => { - Err(TransportError::MultiaddrNotSupported(addr)) + Err(addr) => { + let dial = match role_override { + Endpoint::Dialer => self.inner_transport.dial(addr), + Endpoint::Listener => self.inner_transport.dial_as_listener(addr), + }; + match dial { + Ok(dialer) => Ok(EitherFuture::First(dialer)), + Err(TransportError::MultiaddrNotSupported(addr)) => { + Err(TransportError::MultiaddrNotSupported(addr)) + } + Err(TransportError::Other(err)) => { + Err(TransportError::Other(EitherError::A(err))) + } } - Err(TransportError::Other(err)) => Err(TransportError::Other(EitherError::A(err))), - }, + } // Address does contain circuit relay protocol. Dial destination via relay. Ok(RelayedMultiaddr { relay_peer_id, @@ -263,10 +290,6 @@ impl Transport for RelayTransport { } } } - - fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { - self.inner_transport.address_translation(server, observed) - } } #[derive(Default)] diff --git a/protocols/relay/src/v2/client/transport.rs b/protocols/relay/src/v2/client/transport.rs index 6128f25bcc7..0a1cffad0bb 100644 --- a/protocols/relay/src/v2/client/transport.rs +++ b/protocols/relay/src/v2/client/transport.rs @@ -197,6 +197,17 @@ impl Transport for ClientTransport { .boxed()) } + fn dial_as_listener(self, addr: Multiaddr) -> Result> + where + Self: Sized, + { + // [`Transport::dial_as_listener`] is used for NAT and firewall + // traversal. One would coordinate such traversal via a previously + // established relayed connection, but never using a relayed connection + // itself. + return Err(TransportError::MultiaddrNotSupported(addr)); + } + fn address_translation(&self, _server: &Multiaddr, _observed: &Multiaddr) -> Option { None } diff --git a/protocols/relay/tests/v1.rs b/protocols/relay/tests/v1.rs index fe4ca9f29b5..69d08ae7d66 100644 --- a/protocols/relay/tests/v1.rs +++ b/protocols/relay/tests/v1.rs @@ -1264,6 +1264,10 @@ impl Transport for Firewall { self.0.dial(addr) } + fn dial_as_listener(self, addr: Multiaddr) -> Result> { + self.0.dial_as_listener(addr) + } + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { self.0.address_translation(server, observed) } diff --git a/protocols/rendezvous/tests/harness.rs b/protocols/rendezvous/tests/harness.rs index 6b5a202b476..3709e509a06 100644 --- a/protocols/rendezvous/tests/harness.rs +++ b/protocols/rendezvous/tests/harness.rs @@ -29,9 +29,7 @@ use libp2p::core::upgrade::SelectUpgrade; use libp2p::core::{identity, Multiaddr, PeerId, Transport}; use libp2p::mplex::MplexConfig; use libp2p::noise::{Keypair, NoiseConfig, X25519Spec}; -use libp2p::swarm::{ - dial_opts::DialOpts, AddressScore, NetworkBehaviour, Swarm, SwarmBuilder, SwarmEvent, -}; +use libp2p::swarm::{AddressScore, NetworkBehaviour, Swarm, SwarmBuilder, SwarmEvent}; use libp2p::yamux::YamuxConfig; use std::fmt::Debug; use std::time::Duration; diff --git a/protocols/rendezvous/tests/rendezvous.rs b/protocols/rendezvous/tests/rendezvous.rs index 7d02610a5af..458ea588832 100644 --- a/protocols/rendezvous/tests/rendezvous.rs +++ b/protocols/rendezvous/tests/rendezvous.rs @@ -26,7 +26,7 @@ use futures::stream::FuturesUnordered; use futures::StreamExt; use libp2p_core::identity; use libp2p_rendezvous as rendezvous; -use libp2p_swarm::{dial_opts::DialOpts, DialError, Swarm, SwarmEvent}; +use libp2p_swarm::{DialError, Swarm, SwarmEvent}; use std::convert::TryInto; use std::time::Duration; diff --git a/protocols/request-response/src/lib.rs b/protocols/request-response/src/lib.rs index 1f6d1eeb817..af6b601eae0 100644 --- a/protocols/request-response/src/lib.rs +++ b/protocols/request-response/src/lib.rs @@ -599,7 +599,7 @@ where new: &ConnectedPoint, ) { let new_address = match new { - ConnectedPoint::Dialer { address } => Some(address.clone()), + ConnectedPoint::Dialer { address, .. } => Some(address.clone()), ConnectedPoint::Listener { .. } => None, }; let connections = self @@ -631,7 +631,7 @@ where _errors: Option<&Vec>, ) { let address = match endpoint { - ConnectedPoint::Dialer { address } => Some(address.clone()), + ConnectedPoint::Dialer { address, .. } => Some(address.clone()), ConnectedPoint::Listener { .. } => None, }; self.connected diff --git a/src/bandwidth.rs b/src/bandwidth.rs index a341b4dfbab..1ff63cc5ec8 100644 --- a/src/bandwidth.rs +++ b/src/bandwidth.rs @@ -89,6 +89,13 @@ where .map(move |fut| BandwidthFuture { inner: fut, sinks }) } + fn dial_as_listener(self, addr: Multiaddr) -> Result> { + let sinks = self.sinks; + self.inner + .dial_as_listener(addr) + .map(move |fut| BandwidthFuture { inner: fut, sinks }) + } + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { self.inner.address_translation(server, observed) } diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index a97ea499aa6..57df6ddad68 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -14,9 +14,13 @@ - Implement `swarm::NetworkBehaviour` on `either::Either` (see [PR 2370]). -- Enable overriding _dial concurrency factor_ per dial via +- Allow overriding _dial concurrency factor_ per dial via `DialOpts::override_dial_concurrency_factor`. See [PR 2404]. +- Allow overriding role when dialing through `override_role` option on + `DialOpts`. This option is needed for NAT and firewall hole punching. See [PR + 2363]. + [PR 2339]: https://github.com/libp2p/rust-libp2p/pull/2339 [PR 2350]: https://github.com/libp2p/rust-libp2p/pull/2350 [PR 2362]: https://github.com/libp2p/rust-libp2p/pull/2362 @@ -24,6 +28,7 @@ [PR 2375]: https://github.com/libp2p/rust-libp2p/pull/2375 [PR 2378]: https://github.com/libp2p/rust-libp2p/pull/2378 [PR 2404]: https://github.com/libp2p/rust-libp2p/pull/2404 +[PR 2363]: https://github.com/libp2p/rust-libp2p/pull/2363 # 0.32.0 [2021-11-16] diff --git a/swarm/src/dial_opts.rs b/swarm/src/dial_opts.rs index e98092a73d4..b443d8f420b 100644 --- a/swarm/src/dial_opts.rs +++ b/swarm/src/dial_opts.rs @@ -19,6 +19,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use libp2p_core::connection::Endpoint; use libp2p_core::{Multiaddr, PeerId}; use std::num::NonZeroU8; @@ -51,6 +52,7 @@ impl DialOpts { WithPeerId { peer_id, condition: Default::default(), + role_override: Endpoint::Dialer, dial_concurrency_factor_override: Default::default(), } } @@ -108,6 +110,7 @@ pub(super) enum Opts { pub struct WithPeerId { pub(crate) peer_id: PeerId, pub(crate) condition: PeerCondition, + pub(crate) role_override: Endpoint, pub(crate) dial_concurrency_factor_override: Option, } @@ -132,10 +135,22 @@ impl WithPeerId { condition: self.condition, addresses, extend_addresses_through_behaviour: false, + role_override: self.role_override, dial_concurrency_factor_override: self.dial_concurrency_factor_override, } } + /// Override role of local node on connection. I.e. execute the dial _as a + /// listener_. + /// + /// See + /// [`ConnectedPoint::Dialer`](libp2p_core::connection::ConnectedPoint::Dialer) + /// for details. + pub fn override_role(mut self) -> Self { + self.role_override = Endpoint::Listener; + self + } + /// Build the final [`DialOpts`]. /// /// Addresses to dial the peer are retrieved via @@ -151,6 +166,7 @@ pub struct WithPeerIdWithAddresses { pub(crate) condition: PeerCondition, pub(crate) addresses: Vec, pub(crate) extend_addresses_through_behaviour: bool, + pub(crate) role_override: Endpoint, pub(crate) dial_concurrency_factor_override: Option, } @@ -168,6 +184,17 @@ impl WithPeerIdWithAddresses { self } + /// Override role of local node on connection. I.e. execute the dial _as a + /// listener_. + /// + /// See + /// [`ConnectedPoint::Dialer`](libp2p_core::connection::ConnectedPoint::Dialer) + /// for details. + pub fn override_role(mut self) -> Self { + self.role_override = Endpoint::Listener; + self + } + /// Override /// [`NetworkConfig::with_dial_concurrency_factor`](libp2p_core::network::NetworkConfig::with_dial_concurrency_factor). pub fn override_dial_concurrency_factor(mut self, factor: NonZeroU8) -> Self { @@ -187,16 +214,30 @@ pub struct WithoutPeerId {} impl WithoutPeerId { /// Specify a single address to dial the unknown peer. pub fn address(self, address: Multiaddr) -> WithoutPeerIdWithAddress { - WithoutPeerIdWithAddress { address } + WithoutPeerIdWithAddress { + address, + role_override: Endpoint::Dialer, + } } } #[derive(Debug)] pub struct WithoutPeerIdWithAddress { pub(crate) address: Multiaddr, + pub(crate) role_override: Endpoint, } impl WithoutPeerIdWithAddress { + /// Override role of local node on connection. I.e. execute the dial _as a + /// listener_. + /// + /// See + /// [`ConnectedPoint::Dialer`](libp2p_core::connection::ConnectedPoint::Dialer) + /// for details. + pub fn override_role(mut self) -> Self { + self.role_override = Endpoint::Listener; + self + } /// Build the final [`DialOpts`]. pub fn build(self) -> DialOpts { DialOpts(Opts::WithoutPeerIdWithAddress(self)) diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index cc09ebf22a6..4ebd0da2e55 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -368,11 +368,13 @@ where dial_opts::Opts::WithPeerId(dial_opts::WithPeerId { peer_id, condition, + role_override, dial_concurrency_factor_override, }) | dial_opts::Opts::WithPeerIdWithAddresses(dial_opts::WithPeerIdWithAddresses { peer_id, condition, + role_override, dial_concurrency_factor_override, .. }) => { @@ -439,7 +441,9 @@ where addresses }; - let mut opts = libp2p_core::DialOpts::peer_id(peer_id).addresses(addresses); + let mut opts = libp2p_core::DialOpts::peer_id(peer_id) + .addresses(addresses) + .override_role(role_override); if let Some(f) = dial_concurrency_factor_override { opts = opts.override_dial_concurrency_factor(f); @@ -450,8 +454,10 @@ where // Dial an unknown peer. dial_opts::Opts::WithoutPeerIdWithAddress(dial_opts::WithoutPeerIdWithAddress { address, + role_override, }) => libp2p_core::DialOpts::unknown_peer_id() .address(address) + .override_role(role_override) .build(), }; diff --git a/transports/dns/src/lib.rs b/transports/dns/src/lib.rs index 2cfc27f93a6..55684140152 100644 --- a/transports/dns/src/lib.rs +++ b/transports/dns/src/lib.rs @@ -58,6 +58,7 @@ use async_std_resolver::{AsyncStdConnection, AsyncStdConnectionProvider}; use futures::{future::BoxFuture, prelude::*}; use libp2p_core::{ + connection::Endpoint, multiaddr::{Multiaddr, Protocol}, transport::{ListenerEvent, TransportError}, Transport, @@ -211,6 +212,31 @@ where } fn dial(self, addr: Multiaddr) -> Result> { + self.do_dial(addr, Endpoint::Dialer) + } + + fn dial_as_listener(self, addr: Multiaddr) -> Result> { + self.do_dial(addr, Endpoint::Listener) + } + + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { + self.inner.address_translation(server, observed) + } +} + +impl GenDnsConfig +where + T: Transport + Clone + Send + 'static, + T::Error: Send, + T::Dial: Send, + C: DnsHandle, + P: ConnectionProvider, +{ + fn do_dial( + self, + addr: Multiaddr, + role_override: Endpoint, + ) -> Result<::Dial, TransportError<::Error>> { // Asynchronlously resolve all DNS names in the address before proceeding // with dialing on the underlying transport. Ok(async move { @@ -293,7 +319,11 @@ where log::debug!("Dialing {}", addr); let transport = inner.clone(); - let result = match transport.dial(addr) { + let dial = match role_override { + Endpoint::Dialer => transport.dial(addr), + Endpoint::Listener => transport.dial_as_listener(addr), + }; + let result = match dial { Ok(out) => { // We only count attempts that the inner transport // actually accepted, i.e. for which it produced @@ -338,10 +368,6 @@ where .boxed() .right_future()) } - - fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { - self.inner.address_translation(server, observed) - } } /// The possible errors of a [`GenDnsConfig`] wrapped transport. @@ -579,6 +605,13 @@ mod tests { Ok(Box::pin(future::ready(Ok(())))) } + fn dial_as_listener( + self, + addr: Multiaddr, + ) -> Result> { + self.dial(addr) + } + fn address_translation(&self, _: &Multiaddr, _: &Multiaddr) -> Option { None } diff --git a/transports/tcp/src/lib.rs b/transports/tcp/src/lib.rs index bfad45f0e44..eec52e97193 100644 --- a/transports/tcp/src/lib.rs +++ b/transports/tcp/src/lib.rs @@ -405,6 +405,10 @@ where Ok(Box::pin(self.do_dial(socket_addr))) } + fn dial_as_listener(self, addr: Multiaddr) -> Result> { + self.dial(addr) + } + /// When port reuse is disabled and hence ephemeral local ports are /// used for outgoing connections, the returned address is the /// `observed` address with the port replaced by the port of the diff --git a/transports/uds/src/lib.rs b/transports/uds/src/lib.rs index 34ac4eb51c3..a5fabc162a0 100644 --- a/transports/uds/src/lib.rs +++ b/transports/uds/src/lib.rs @@ -105,6 +105,7 @@ impl Transport for $uds_config { } fn dial(self, addr: Multiaddr) -> Result> { + // TODO: Should we dial at all? if let Ok(path) = multiaddr_to_path(&addr) { debug!("Dialing {}", addr); Ok(async move { <$unix_stream>::connect(&path).await }.boxed()) @@ -113,6 +114,10 @@ impl Transport for $uds_config { } } + fn dial_as_listener(self, addr: Multiaddr) -> Result> { + self.dial(addr) + } + fn address_translation(&self, _server: &Multiaddr, _observed: &Multiaddr) -> Option { None } diff --git a/transports/wasm-ext/src/lib.rs b/transports/wasm-ext/src/lib.rs index 27aafdb70c3..5ef8dbb0bbd 100644 --- a/transports/wasm-ext/src/lib.rs +++ b/transports/wasm-ext/src/lib.rs @@ -33,7 +33,11 @@ //! use futures::{future::Ready, prelude::*}; -use libp2p_core::{transport::ListenerEvent, transport::TransportError, Multiaddr, Transport}; +use libp2p_core::{ + connection::Endpoint, + transport::{ListenerEvent, TransportError}, + Multiaddr, Transport, +}; use parity_send_wrapper::SendWrapper; use std::{collections::VecDeque, error, fmt, io, mem, pin::Pin, task::Context, task::Poll}; use wasm_bindgen::{prelude::*, JsCast}; @@ -61,7 +65,11 @@ pub mod ffi { /// If the multiaddress is not supported, you should return an instance of `Error` whose /// `name` property has been set to the string `"NotSupportedError"`. #[wasm_bindgen(method, catch)] - pub fn dial(this: &Transport, multiaddr: &str) -> Result; + pub fn dial( + this: &Transport, + multiaddr: &str, + _role_override: bool, + ) -> Result; /// Start listening on the given multiaddress. /// @@ -148,6 +156,29 @@ impl ExtTransport { inner: SendWrapper::new(transport), } } + fn do_dial( + self, + addr: Multiaddr, + role_override: Endpoint, + ) -> Result<::Dial, TransportError<::Error>> { + let promise = self + .inner + .dial( + &addr.to_string(), + matches!(role_override, Endpoint::Listener), + ) + .map_err(|err| { + if is_not_supported_error(&err) { + TransportError::MultiaddrNotSupported(addr) + } else { + TransportError::Other(JsErr::from(err)) + } + })?; + + Ok(Dial { + inner: SendWrapper::new(promise.into()), + }) + } } impl fmt::Debug for ExtTransport { @@ -187,18 +218,18 @@ impl Transport for ExtTransport { }) } - fn dial(self, addr: Multiaddr) -> Result> { - let promise = self.inner.dial(&addr.to_string()).map_err(|err| { - if is_not_supported_error(&err) { - TransportError::MultiaddrNotSupported(addr) - } else { - TransportError::Other(JsErr::from(err)) - } - })?; + fn dial(self, addr: Multiaddr) -> Result> + where + Self: Sized, + { + self.do_dial(addr, Endpoint::Dialer) + } - Ok(Dial { - inner: SendWrapper::new(promise.into()), - }) + fn dial_as_listener(self, addr: Multiaddr) -> Result> + where + Self: Sized, + { + self.do_dial(addr, Endpoint::Listener) } fn address_translation(&self, _server: &Multiaddr, _observed: &Multiaddr) -> Option { diff --git a/transports/websocket/src/framed.rs b/transports/websocket/src/framed.rs index 266a6b9bada..1e009248fad 100644 --- a/transports/websocket/src/framed.rs +++ b/transports/websocket/src/framed.rs @@ -23,6 +23,7 @@ use either::Either; use futures::{future::BoxFuture, prelude::*, ready, stream::BoxStream}; use futures_rustls::{client, rustls, server}; use libp2p_core::{ + connection::Endpoint, either::EitherOutput, multiaddr::{Multiaddr, Protocol}, transport::{ListenerEvent, TransportError}, @@ -245,6 +246,32 @@ where } fn dial(self, addr: Multiaddr) -> Result> { + self.do_dial(addr, Endpoint::Dialer) + } + + fn dial_as_listener(self, addr: Multiaddr) -> Result> { + self.do_dial(addr, Endpoint::Listener) + } + + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { + self.transport.address_translation(server, observed) + } +} + +impl WsConfig +where + T: Transport + Send + Clone + 'static, + T::Error: Send + 'static, + T::Dial: Send + 'static, + T::Listener: Send + 'static, + T::ListenerUpgrade: Send + 'static, + T::Output: AsyncRead + AsyncWrite + Unpin + Send + 'static, +{ + fn do_dial( + self, + addr: Multiaddr, + role_override: Endpoint, + ) -> Result<::Dial, TransportError<::Error>> { let addr = match parse_ws_dial_addr(addr) { Ok(addr) => addr, Err(Error::InvalidMultiaddr(a)) => { @@ -259,7 +286,7 @@ where let future = async move { loop { let this = self.clone(); - match this.dial_once(addr).await { + match this.dial_once(addr, role_override).await { Ok(Either::Left(redirect)) => { if remaining_redirects == 0 { debug!("Too many redirects (> {})", self.max_redirects); @@ -276,25 +303,19 @@ where Ok(Box::pin(future)) } - - fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { - self.transport.address_translation(server, observed) - } -} - -impl WsConfig -where - T: Transport, - T::Output: AsyncRead + AsyncWrite + Send + Unpin + 'static, -{ /// Attempts to dial the given address and perform a websocket handshake. async fn dial_once( self, addr: WsAddress, + role_override: Endpoint, ) -> Result>, Error> { trace!("Dialing websocket address: {:?}", addr); - let dial = self.transport.dial(addr.tcp_addr).map_err(|e| match e { + let dial = match role_override { + Endpoint::Dialer => self.transport.dial(addr.tcp_addr), + Endpoint::Listener => self.transport.dial_as_listener(addr.tcp_addr), + } + .map_err(|e| match e { TransportError::MultiaddrNotSupported(a) => Error::InvalidMultiaddr(a), TransportError::Other(e) => Error::Transport(e), })?; diff --git a/transports/websocket/src/lib.rs b/transports/websocket/src/lib.rs index eb14f078d5d..369e9000855 100644 --- a/transports/websocket/src/lib.rs +++ b/transports/websocket/src/lib.rs @@ -28,12 +28,13 @@ use error::Error; use framed::{Connection, Incoming}; use futures::{future::BoxFuture, prelude::*, ready, stream::BoxStream}; use libp2p_core::{ + connection::ConnectedPoint, multiaddr::Multiaddr, transport::{ map::{MapFuture, MapStream}, ListenerEvent, TransportError, }, - ConnectedPoint, Transport, + Transport, }; use rw_stream_sink::RwStreamSink; use std::{ @@ -129,6 +130,12 @@ where .dial(addr) } + fn dial_as_listener(self, addr: Multiaddr) -> Result> { + self.transport + .map(wrap_connection as WrapperFn) + .dial_as_listener(addr) + } + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { self.transport.address_translation(server, observed) }