diff --git a/core/src/either.rs b/core/src/either.rs index 32e09ca691c..d067817b652 100644 --- a/core/src/either.rs +++ b/core/src/either.rs @@ -206,6 +206,25 @@ where } } + fn dial_with_new_port( + &mut self, + addr: Multiaddr, + ) -> Result> { + use TransportError::*; + match self { + Either::Left(a) => match a.dial_with_new_port(addr) { + Ok(connec) => Ok(EitherFuture::First(connec)), + Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)), + Err(Other(err)) => Err(Other(Either::Left(err))), + }, + Either::Right(b) => match b.dial_with_new_port(addr) { + Ok(connec) => Ok(EitherFuture::Second(connec)), + Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)), + Err(Other(err)) => Err(Other(Either::Right(err))), + }, + } + } + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { match self { Either::Left(a) => a.address_translation(server, observed), diff --git a/core/src/transport.rs b/core/src/transport.rs index ca1796b3b35..017f360494b 100644 --- a/core/src/transport.rs +++ b/core/src/transport.rs @@ -134,6 +134,14 @@ pub trait Transport { addr: Multiaddr, ) -> Result>; + /// + fn dial_with_new_port( + &mut self, + _addr: Multiaddr, + ) -> Result> { + unimplemented!() + } + /// Poll for [`TransportEvent`]s. /// /// A [`TransportEvent::Incoming`] should be produced whenever a connection is received at the lowest diff --git a/core/src/transport/and_then.rs b/core/src/transport/and_then.rs index fb5280568ea..506f4d566d9 100644 --- a/core/src/transport/and_then.rs +++ b/core/src/transport/and_then.rs @@ -105,6 +105,28 @@ where Ok(future) } + fn dial_with_new_port( + &mut self, + addr: Multiaddr, + ) -> Result> { + let dialed_fut = self + .transport + .dial_with_new_port(addr.clone()) + .map_err(|err| err.map(Either::Left))?; + let future = AndThenFuture { + inner: Either::Left(Box::pin(dialed_fut)), + args: Some(( + self.fun.clone(), + ConnectedPoint::Dialer { + address: addr, + role_override: Endpoint::Dialer, + }, + )), + _marker: PhantomPinned, + }; + Ok(future) + } + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { self.transport.address_translation(server, observed) } diff --git a/core/src/transport/boxed.rs b/core/src/transport/boxed.rs index a55e4db8466..48871286916 100644 --- a/core/src/transport/boxed.rs +++ b/core/src/transport/boxed.rs @@ -56,6 +56,8 @@ trait Abstract { fn remove_listener(&mut self, id: ListenerId) -> bool; fn dial(&mut self, addr: Multiaddr) -> Result, TransportError>; fn dial_as_listener(&mut self, addr: Multiaddr) -> Result, TransportError>; + fn dial_with_new_port(&mut self, addr: Multiaddr) + -> Result, TransportError>; fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option; fn poll( self: Pin<&mut Self>, @@ -92,6 +94,16 @@ where Ok(Box::pin(fut) as Dial<_>) } + fn dial_with_new_port( + &mut self, + addr: Multiaddr, + ) -> Result, TransportError> { + let fut = Transport::dial_with_new_port(self, addr) + .map(|r| r.map_err(box_err)) + .map_err(|e| e.map(box_err))?; + Ok(Box::pin(fut) as Dial<_>) + } + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { Transport::address_translation(self, server, observed) } @@ -142,6 +154,13 @@ impl Transport for Boxed { self.inner.dial_as_listener(addr) } + fn dial_with_new_port( + &mut self, + addr: Multiaddr, + ) -> Result> { + self.inner.dial_with_new_port(addr) + } + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { self.inner.address_translation(server, observed) } diff --git a/core/src/transport/choice.rs b/core/src/transport/choice.rs index bb7d542d292..8e67cc43d9a 100644 --- a/core/src/transport/choice.rs +++ b/core/src/transport/choice.rs @@ -107,6 +107,29 @@ where Err(TransportError::MultiaddrNotSupported(addr)) } + fn dial_with_new_port( + &mut self, + addr: Multiaddr, + ) -> Result> { + let addr = match self.0.dial_with_new_port(addr) { + Ok(connec) => return Ok(EitherFuture::First(connec)), + Err(TransportError::MultiaddrNotSupported(addr)) => addr, + Err(TransportError::Other(err)) => { + return Err(TransportError::Other(Either::Left(err))) + } + }; + + let addr = match self.1.dial_with_new_port(addr) { + Ok(connec) => return Ok(EitherFuture::Second(connec)), + Err(TransportError::MultiaddrNotSupported(addr)) => addr, + Err(TransportError::Other(err)) => { + return Err(TransportError::Other(Either::Right(err))) + } + }; + + Err(TransportError::MultiaddrNotSupported(addr)) + } + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { if let Some(addr) = self.0.address_translation(server, observed) { Some(addr) diff --git a/core/src/transport/global_only.rs b/core/src/transport/global_only.rs index b0a12de0f70..0ec4a5a5157 100644 --- a/core/src/transport/global_only.rs +++ b/core/src/transport/global_only.rs @@ -333,6 +333,32 @@ impl crate::Transport for Transport { } } + fn dial_with_new_port( + &mut self, + addr: Multiaddr, + ) -> Result> { + match addr.iter().next() { + Some(Protocol::Ip4(a)) => { + if !ipv4_global::is_global(a) { + debug!("Not dialing non global IP address {:?}.", a); + return Err(TransportError::MultiaddrNotSupported(addr)); + } + self.inner.dial_with_new_port(addr) + } + Some(Protocol::Ip6(a)) => { + if !ipv6_global::is_global(a) { + debug!("Not dialing non global IP address {:?}.", a); + return Err(TransportError::MultiaddrNotSupported(addr)); + } + self.inner.dial_with_new_port(addr) + } + _ => { + debug!("Not dialing unsupported Multiaddress {:?}.", addr); + Err(TransportError::MultiaddrNotSupported(addr)) + } + } + } + fn address_translation(&self, listen: &Multiaddr, observed: &Multiaddr) -> Option { self.inner.address_translation(listen, observed) } diff --git a/core/src/transport/map.rs b/core/src/transport/map.rs index 50f7b826d36..6a75fc24664 100644 --- a/core/src/transport/map.rs +++ b/core/src/transport/map.rs @@ -96,6 +96,21 @@ where }) } + fn dial_with_new_port( + &mut self, + addr: Multiaddr, + ) -> Result> { + let future = self.transport.dial_with_new_port(addr.clone())?; + let p = ConnectedPoint::Dialer { + address: addr, + role_override: Endpoint::Dialer, + }; + Ok(MapFuture { + inner: future, + args: Some((self.fun.clone(), p)), + }) + } + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { self.transport.address_translation(server, observed) } diff --git a/core/src/transport/map_err.rs b/core/src/transport/map_err.rs index 99f2912447f..112ab124812 100644 --- a/core/src/transport/map_err.rs +++ b/core/src/transport/map_err.rs @@ -84,6 +84,20 @@ where } } + fn dial_with_new_port( + &mut self, + addr: Multiaddr, + ) -> Result> { + let map = self.map.clone(); + match self.transport.dial_with_new_port(addr) { + Ok(future) => Ok(MapErrDial { + inner: future, + map: Some(map), + }), + Err(err) => Err(err.map(map)), + } + } + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { self.transport.address_translation(server, observed) } diff --git a/core/src/transport/memory.rs b/core/src/transport/memory.rs index 7e079d07fb5..41e5c6fd312 100644 --- a/core/src/transport/memory.rs +++ b/core/src/transport/memory.rs @@ -237,6 +237,13 @@ impl Transport for MemoryTransport { self.dial(addr) } + fn dial_with_new_port( + &mut self, + addr: Multiaddr, + ) -> Result> { + self.dial(addr) + } + fn address_translation(&self, _server: &Multiaddr, _observed: &Multiaddr) -> Option { None } diff --git a/core/src/transport/optional.rs b/core/src/transport/optional.rs index 2d93077659c..7940ebbd575 100644 --- a/core/src/transport/optional.rs +++ b/core/src/transport/optional.rs @@ -95,6 +95,17 @@ where } } + fn dial_with_new_port( + &mut self, + addr: Multiaddr, + ) -> Result> { + if let Some(inner) = self.0.as_mut() { + inner.dial_with_new_port(addr) + } else { + Err(TransportError::MultiaddrNotSupported(addr)) + } + } + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { if let Some(inner) = &self.0 { inner.address_translation(server, observed) diff --git a/core/src/transport/timeout.rs b/core/src/transport/timeout.rs index c796e6f0775..092dbc83785 100644 --- a/core/src/transport/timeout.rs +++ b/core/src/transport/timeout.rs @@ -120,6 +120,20 @@ where }) } + fn dial_with_new_port( + &mut self, + addr: Multiaddr, + ) -> Result> { + let dial = self + .inner + .dial_with_new_port(addr) + .map_err(|err| err.map(TransportTimeoutError::Other))?; + Ok(Timeout { + inner: dial, + timer: Delay::new(self.outgoing_timeout), + }) + } + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { self.inner.address_translation(server, observed) } diff --git a/core/src/transport/upgrade.rs b/core/src/transport/upgrade.rs index 9f6998d9968..f70f2265aee 100644 --- a/core/src/transport/upgrade.rs +++ b/core/src/transport/upgrade.rs @@ -350,6 +350,13 @@ where self.0.dial_as_listener(addr) } + fn dial_with_new_port( + &mut self, + addr: Multiaddr, + ) -> Result> { + self.0.dial_with_new_port(addr) + } + fn listen_on(&mut self, addr: Multiaddr) -> Result> { self.0.listen_on(addr) } @@ -429,6 +436,20 @@ where }) } + fn dial_with_new_port( + &mut self, + addr: Multiaddr, + ) -> Result> { + let future = self + .inner + .dial_with_new_port(addr) + .map_err(|err| err.map(TransportUpgradeError::Transport))?; + Ok(DialUpgradeFuture { + future: Box::pin(future), + upgrade: future::Either::Left(Some(self.upgrade.clone())), + }) + } + fn listen_on(&mut self, addr: Multiaddr) -> Result> { self.inner .listen_on(addr) diff --git a/protocols/autonat/src/behaviour/as_client.rs b/protocols/autonat/src/behaviour/as_client.rs index e0c0b2e9e0a..e53d8d01619 100644 --- a/protocols/autonat/src/behaviour/as_client.rs +++ b/protocols/autonat/src/behaviour/as_client.rs @@ -300,7 +300,7 @@ impl<'a> AsClient<'a> { return Err(OutboundProbeError::NoServer); } }; - let request_id = self.inner.send_request( + let request_id = self.inner.send_request_with_new_port( &server, DialRequest { peer_id: self.local_peer_id, diff --git a/protocols/autonat/src/behaviour/as_server.rs b/protocols/autonat/src/behaviour/as_server.rs index 063943392f3..f4667fe6343 100644 --- a/protocols/autonat/src/behaviour/as_server.rs +++ b/protocols/autonat/src/behaviour/as_server.rs @@ -135,6 +135,7 @@ impl<'a> HandleInnerEvent for AsServer<'a> { .override_dial_concurrency_factor( NonZeroU8::new(1).expect("1 > 0"), ) + .use_new_port() .addresses(addrs) .build(), }, diff --git a/protocols/request-response/src/lib.rs b/protocols/request-response/src/lib.rs index 4267f83da8c..52b3608f720 100644 --- a/protocols/request-response/src/lib.rs +++ b/protocols/request-response/src/lib.rs @@ -396,6 +396,33 @@ where request_id } + /// TODO: Document + pub fn send_request_with_new_port( + &mut self, + peer: &PeerId, + request: TCodec::Request, + ) -> RequestId { + let request_id = self.next_request_id(); + let request = RequestProtocol { + request_id, + codec: self.codec.clone(), + protocols: self.outbound_protocols.clone(), + request, + }; + + if let Some(request) = self.try_send_request(peer, request) { + self.pending_events.push_back(ToSwarm::Dial { + opts: DialOpts::peer_id(*peer).use_new_port().build(), + }); + self.pending_outbound_requests + .entry(*peer) + .or_default() + .push(request); + } + + request_id + } + /// Initiates sending a response to an inbound request. /// /// If the [`ResponseChannel`] is already closed due to a timeout or the diff --git a/swarm/src/dial_opts.rs b/swarm/src/dial_opts.rs index dc3d94cb83b..729a377cb80 100644 --- a/swarm/src/dial_opts.rs +++ b/swarm/src/dial_opts.rs @@ -46,6 +46,7 @@ pub struct DialOpts { role_override: Endpoint, dial_concurrency_factor_override: Option, connection_id: ConnectionId, + use_new_port: bool, } impl DialOpts { @@ -66,6 +67,7 @@ impl DialOpts { condition: Default::default(), role_override: Endpoint::Dialer, dial_concurrency_factor_override: Default::default(), + use_new_port: false, } } @@ -147,6 +149,10 @@ impl DialOpts { pub(crate) fn role_override(&self) -> Endpoint { self.role_override } + + pub(crate) fn use_new_port(&self) -> bool { + self.use_new_port + } } impl From for DialOpts { @@ -167,6 +173,7 @@ pub struct WithPeerId { condition: PeerCondition, role_override: Endpoint, dial_concurrency_factor_override: Option, + use_new_port: bool, } impl WithPeerId { @@ -192,6 +199,7 @@ impl WithPeerId { extend_addresses_through_behaviour: false, role_override: self.role_override, dial_concurrency_factor_override: self.dial_concurrency_factor_override, + use_new_port: self.use_new_port, } } @@ -206,6 +214,12 @@ impl WithPeerId { self } + /// TODO: Document + pub fn use_new_port(mut self) -> Self { + self.use_new_port = true; + self + } + /// Build the final [`DialOpts`]. pub fn build(self) -> DialOpts { DialOpts { @@ -216,6 +230,7 @@ impl WithPeerId { role_override: self.role_override, dial_concurrency_factor_override: self.dial_concurrency_factor_override, connection_id: ConnectionId::next(), + use_new_port: self.use_new_port, } } } @@ -228,6 +243,7 @@ pub struct WithPeerIdWithAddresses { extend_addresses_through_behaviour: bool, role_override: Endpoint, dial_concurrency_factor_override: Option, + use_new_port: bool, } impl WithPeerIdWithAddresses { @@ -262,6 +278,12 @@ impl WithPeerIdWithAddresses { self } + /// + pub fn use_new_port(mut self) -> Self { + self.use_new_port = true; + self + } + /// Build the final [`DialOpts`]. pub fn build(self) -> DialOpts { DialOpts { @@ -272,6 +294,7 @@ impl WithPeerIdWithAddresses { role_override: self.role_override, dial_concurrency_factor_override: self.dial_concurrency_factor_override, connection_id: ConnectionId::next(), + use_new_port: self.use_new_port, } } } @@ -316,6 +339,7 @@ impl WithoutPeerIdWithAddress { role_override: self.role_override, dial_concurrency_factor_override: None, connection_id: ConnectionId::next(), + use_new_port: false, } } } diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index a32beda411b..fb5ccbaaa6a 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -599,10 +599,15 @@ where .into_iter() .map(|a| match p2p_addr(peer_id, a) { Ok(address) => { - let dial = match dial_opts.role_override() { - Endpoint::Dialer => self.transport.dial(address.clone()), - Endpoint::Listener => self.transport.dial_as_listener(address.clone()), + let dial = if dial_opts.use_new_port() { + self.transport.dial_with_new_port(address.clone()) + } else { + match dial_opts.role_override() { + Endpoint::Dialer => self.transport.dial(address.clone()), + Endpoint::Listener => self.transport.dial_as_listener(address.clone()), + } }; + match dial { Ok(fut) => fut .map(|r| (address, r.map_err(TransportError::Other))) diff --git a/transports/quic/src/transport.rs b/transports/quic/src/transport.rs index d68eb7f1928..836a7727bd0 100644 --- a/transports/quic/src/transport.rs +++ b/transports/quic/src/transport.rs @@ -202,6 +202,23 @@ impl Transport for GenTransport

{ Err(TransportError::MultiaddrNotSupported(addr)) } + fn dial_with_new_port( + &mut self, + addr: Multiaddr, + ) -> Result> { + let (socket_addr, version) = multiaddr_to_socketaddr(&addr, self.support_draft_29) + .ok_or_else(|| TransportError::MultiaddrNotSupported(addr.clone()))?; + if socket_addr.port() == 0 || socket_addr.ip().is_unspecified() { + return Err(TransportError::MultiaddrNotSupported(addr)); + } + + let socket_family = socket_addr.ip().into(); + let mut dialer = Dialer::new::

(self.quinn_config.clone(), socket_family)?; + let dialer_state = &mut dialer.state; + + Ok(dialer_state.new_dial(socket_addr, self.handshake_timeout, version)) + } + fn poll( mut self: Pin<&mut Self>, cx: &mut Context<'_>, diff --git a/transports/tcp/src/lib.rs b/transports/tcp/src/lib.rs index 78f6c3f4656..2a33f6bfc9f 100644 --- a/transports/tcp/src/lib.rs +++ b/transports/tcp/src/lib.rs @@ -508,6 +508,45 @@ where self.dial(addr) } + // TODO: deduplicate with `dial` + fn dial_with_new_port( + &mut self, + addr: Multiaddr, + ) -> Result> { + let socket_addr = if let Ok(socket_addr) = multiaddr_to_socketaddr(addr.clone()) { + if socket_addr.port() == 0 || socket_addr.ip().is_unspecified() { + return Err(TransportError::MultiaddrNotSupported(addr)); + } + socket_addr + } else { + return Err(TransportError::MultiaddrNotSupported(addr)); + }; + log::debug!("WITH_NEW_PORT dialing {}", socket_addr); + + let socket = self + .create_socket(&socket_addr) + .map_err(TransportError::Other)?; + + socket + .set_nonblocking(true) + .map_err(TransportError::Other)?; + + Ok(async move { + // [`Transport::dial`] should do no work unless the returned [`Future`] is polled. Thus + // do the `connect` call within the [`Future`]. + match socket.connect(&socket_addr.into()) { + Ok(()) => {} + Err(err) if err.raw_os_error() == Some(libc::EINPROGRESS) => {} + Err(err) if err.kind() == io::ErrorKind::WouldBlock => {} + Err(err) => return Err(err), + }; + + let stream = T::new_stream(socket.into()).await?; + Ok(stream) + } + .boxed()) + } + /// When port reuse is disabled and hence ephemeral local ports are /// used for outgoing connections, the returned address is the /// `observed` address with the port replaced by the port of the diff --git a/transports/websocket/src/lib.rs b/transports/websocket/src/lib.rs index 369e1d612bf..d8433e3db37 100644 --- a/transports/websocket/src/lib.rs +++ b/transports/websocket/src/lib.rs @@ -214,6 +214,13 @@ where self.transport.dial_as_listener(addr) } + fn dial_with_new_port( + &mut self, + addr: Multiaddr, + ) -> Result> { + self.transport.dial_with_new_port(addr) + } + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { self.transport.address_translation(server, observed) }