Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(transport): Implement dial_with_new_port to prevent port reuse #3910

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions core/src/either.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,25 @@ where
}
}

fn dial_with_new_port(
&mut self,
addr: Multiaddr,
) -> Result<Self::Dial, TransportError<Self::Error>> {
use TransportError::*;
match self {
Either::Left(a) => match a.dial_with_new_port(addr) {
Ok(connec) => Ok(EitherFuture::First(connec)),
Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)),
Err(Other(err)) => Err(Other(Either::Left(err))),
},
Either::Right(b) => match b.dial_with_new_port(addr) {
Ok(connec) => Ok(EitherFuture::Second(connec)),
Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)),
Err(Other(err)) => Err(Other(Either::Right(err))),
},
}
}

fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
match self {
Either::Left(a) => a.address_translation(server, observed),
Expand Down
8 changes: 8 additions & 0 deletions core/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,14 @@ pub trait Transport {
addr: Multiaddr,
) -> Result<Self::Dial, TransportError<Self::Error>>;

///
fn dial_with_new_port(
&mut self,
_addr: Multiaddr,
) -> Result<Self::Dial, TransportError<Self::Error>> {
unimplemented!()
}
Comment on lines +138 to +143
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a fan of this API at all but I also don't have a better idea in the short-term. Long-term, I'd like to get rid of the Dial associated type (see #3436).


/// Poll for [`TransportEvent`]s.
///
/// A [`TransportEvent::Incoming`] should be produced whenever a connection is received at the lowest
Expand Down
22 changes: 22 additions & 0 deletions core/src/transport/and_then.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,28 @@ where
Ok(future)
}

fn dial_with_new_port(
&mut self,
addr: Multiaddr,
) -> Result<Self::Dial, TransportError<Self::Error>> {
let dialed_fut = self
.transport
.dial_with_new_port(addr.clone())
.map_err(|err| err.map(Either::Left))?;
let future = AndThenFuture {
inner: Either::Left(Box::pin(dialed_fut)),
args: Some((
self.fun.clone(),
ConnectedPoint::Dialer {
address: addr,
role_override: Endpoint::Dialer,
},
)),
_marker: PhantomPinned,
};
Ok(future)
}

fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
self.transport.address_translation(server, observed)
}
Expand Down
19 changes: 19 additions & 0 deletions core/src/transport/boxed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ trait Abstract<O> {
fn remove_listener(&mut self, id: ListenerId) -> bool;
fn dial(&mut self, addr: Multiaddr) -> Result<Dial<O>, TransportError<io::Error>>;
fn dial_as_listener(&mut self, addr: Multiaddr) -> Result<Dial<O>, TransportError<io::Error>>;
fn dial_with_new_port(&mut self, addr: Multiaddr)
-> Result<Dial<O>, TransportError<io::Error>>;
fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr>;
fn poll(
self: Pin<&mut Self>,
Expand Down Expand Up @@ -92,6 +94,16 @@ where
Ok(Box::pin(fut) as Dial<_>)
}

fn dial_with_new_port(
&mut self,
addr: Multiaddr,
) -> Result<Dial<O>, TransportError<io::Error>> {
let fut = Transport::dial_with_new_port(self, addr)
.map(|r| r.map_err(box_err))
.map_err(|e| e.map(box_err))?;
Ok(Box::pin(fut) as Dial<_>)
}

fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
Transport::address_translation(self, server, observed)
}
Expand Down Expand Up @@ -142,6 +154,13 @@ impl<O> Transport for Boxed<O> {
self.inner.dial_as_listener(addr)
}

fn dial_with_new_port(
&mut self,
addr: Multiaddr,
) -> Result<Self::Dial, TransportError<Self::Error>> {
self.inner.dial_with_new_port(addr)
}

fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
self.inner.address_translation(server, observed)
}
Expand Down
23 changes: 23 additions & 0 deletions core/src/transport/choice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,29 @@ where
Err(TransportError::MultiaddrNotSupported(addr))
}

fn dial_with_new_port(
&mut self,
addr: Multiaddr,
) -> Result<Self::Dial, TransportError<Self::Error>> {
let addr = match self.0.dial_with_new_port(addr) {
Ok(connec) => return Ok(EitherFuture::First(connec)),
Err(TransportError::MultiaddrNotSupported(addr)) => addr,
Err(TransportError::Other(err)) => {
return Err(TransportError::Other(Either::Left(err)))
}
};

let addr = match self.1.dial_with_new_port(addr) {
Ok(connec) => return Ok(EitherFuture::Second(connec)),
Err(TransportError::MultiaddrNotSupported(addr)) => addr,
Err(TransportError::Other(err)) => {
return Err(TransportError::Other(Either::Right(err)))
}
};

Err(TransportError::MultiaddrNotSupported(addr))
}

fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
if let Some(addr) = self.0.address_translation(server, observed) {
Some(addr)
Expand Down
26 changes: 26 additions & 0 deletions core/src/transport/global_only.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,32 @@ impl<T: crate::Transport + Unpin> crate::Transport for Transport<T> {
}
}

fn dial_with_new_port(
&mut self,
addr: Multiaddr,
) -> Result<Self::Dial, TransportError<Self::Error>> {
match addr.iter().next() {
Some(Protocol::Ip4(a)) => {
if !ipv4_global::is_global(a) {
debug!("Not dialing non global IP address {:?}.", a);
return Err(TransportError::MultiaddrNotSupported(addr));
}
self.inner.dial_with_new_port(addr)
}
Some(Protocol::Ip6(a)) => {
if !ipv6_global::is_global(a) {
debug!("Not dialing non global IP address {:?}.", a);
return Err(TransportError::MultiaddrNotSupported(addr));
}
self.inner.dial_with_new_port(addr)
}
_ => {
debug!("Not dialing unsupported Multiaddress {:?}.", addr);
Err(TransportError::MultiaddrNotSupported(addr))
}
}
}

fn address_translation(&self, listen: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
self.inner.address_translation(listen, observed)
}
Expand Down
15 changes: 15 additions & 0 deletions core/src/transport/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,21 @@ where
})
}

fn dial_with_new_port(
&mut self,
addr: Multiaddr,
) -> Result<Self::Dial, TransportError<Self::Error>> {
let future = self.transport.dial_with_new_port(addr.clone())?;
let p = ConnectedPoint::Dialer {
address: addr,
role_override: Endpoint::Dialer,
};
Ok(MapFuture {
inner: future,
args: Some((self.fun.clone(), p)),
})
}

fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
self.transport.address_translation(server, observed)
}
Expand Down
14 changes: 14 additions & 0 deletions core/src/transport/map_err.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,20 @@ where
}
}

fn dial_with_new_port(
&mut self,
addr: Multiaddr,
) -> Result<Self::Dial, TransportError<Self::Error>> {
let map = self.map.clone();
match self.transport.dial_with_new_port(addr) {
Ok(future) => Ok(MapErrDial {
inner: future,
map: Some(map),
}),
Err(err) => Err(err.map(map)),
}
}

fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
self.transport.address_translation(server, observed)
}
Expand Down
7 changes: 7 additions & 0 deletions core/src/transport/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,13 @@ impl Transport for MemoryTransport {
self.dial(addr)
}

fn dial_with_new_port(
&mut self,
addr: Multiaddr,
) -> Result<DialFuture, TransportError<Self::Error>> {
self.dial(addr)
}

fn address_translation(&self, _server: &Multiaddr, _observed: &Multiaddr) -> Option<Multiaddr> {
None
}
Expand Down
11 changes: 11 additions & 0 deletions core/src/transport/optional.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,17 @@ where
}
}

fn dial_with_new_port(
&mut self,
addr: Multiaddr,
) -> Result<Self::Dial, TransportError<Self::Error>> {
if let Some(inner) = self.0.as_mut() {
inner.dial_with_new_port(addr)
} else {
Err(TransportError::MultiaddrNotSupported(addr))
}
}

fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
if let Some(inner) = &self.0 {
inner.address_translation(server, observed)
Expand Down
14 changes: 14 additions & 0 deletions core/src/transport/timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,20 @@ where
})
}

fn dial_with_new_port(
&mut self,
addr: Multiaddr,
) -> Result<Self::Dial, TransportError<Self::Error>> {
let dial = self
.inner
.dial_with_new_port(addr)
.map_err(|err| err.map(TransportTimeoutError::Other))?;
Ok(Timeout {
inner: dial,
timer: Delay::new(self.outgoing_timeout),
})
}

fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
self.inner.address_translation(server, observed)
}
Expand Down
21 changes: 21 additions & 0 deletions core/src/transport/upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,13 @@ where
self.0.dial_as_listener(addr)
}

fn dial_with_new_port(
&mut self,
addr: Multiaddr,
) -> Result<Self::Dial, TransportError<Self::Error>> {
self.0.dial_with_new_port(addr)
}

fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<Self::Error>> {
self.0.listen_on(addr)
}
Expand Down Expand Up @@ -429,6 +436,20 @@ where
})
}

fn dial_with_new_port(
&mut self,
addr: Multiaddr,
) -> Result<Self::Dial, TransportError<Self::Error>> {
let future = self
.inner
.dial_with_new_port(addr)
.map_err(|err| err.map(TransportUpgradeError::Transport))?;
Ok(DialUpgradeFuture {
future: Box::pin(future),
upgrade: future::Either::Left(Some(self.upgrade.clone())),
})
}

fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<Self::Error>> {
self.inner
.listen_on(addr)
Expand Down
2 changes: 1 addition & 1 deletion protocols/autonat/src/behaviour/as_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ impl<'a> AsClient<'a> {
return Err(OutboundProbeError::NoServer);
}
};
let request_id = self.inner.send_request(
let request_id = self.inner.send_request_with_new_port(
&server,
DialRequest {
peer_id: self.local_peer_id,
Expand Down
1 change: 1 addition & 0 deletions protocols/autonat/src/behaviour/as_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ impl<'a> HandleInnerEvent for AsServer<'a> {
.override_dial_concurrency_factor(
NonZeroU8::new(1).expect("1 > 0"),
)
.use_new_port()
.addresses(addrs)
.build(),
},
Expand Down
27 changes: 27 additions & 0 deletions protocols/request-response/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,33 @@ where
request_id
}

/// TODO: Document
pub fn send_request_with_new_port(
Copy link
Contributor

@thomaseizinger thomaseizinger May 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't really work unfortunately. libp2p-request-response hides the idea of connections and streams from the user. At the moment, there is no guarantee that a certain messages is actually going to be sent over a specific connection.

I think to effectively utilize this API in AutoNAT, we will have to rewrite it to not use libp2p-request-response.

The flow would something like:

  • Issue a new ToSwarm::Dial that uses the new_port option
  • Grab the (future) ConnectionId from DialOpts
  • Remember it in the behaviour, together with the request that we want to send on that connection
  • Wait for handle_established_outbound_connection with this new ConnectionId
  • Initialize the handler with that connection

This is quite specific to AutoNAT and I am not sure we should extend libp2p-request-response to be able to do this. We could make libp2p-request-response do such kind of things as well. But, I think it is a bad fit for the abstraction because it gets too leaky. To me, libp2p-request-response is all about providing convenience when you just want to declare a message type and send it back and forth between two peers. If we don't hide certain details about the internals of libp2p with it, then the complexity for the user doesn't actually go away.

Another thing that bothers me is that we expose functions such as add_address. Those should go away in my opinion.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is quite specific to AutoNAT and I am not sure we should extend libp2p-request-response to be able to do this.

Agreed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that even with rewriting it, the issue of "don't run other protocols over that connection" is not solved although I am not entirely convinced it is actually a problem.

&mut self,
peer: &PeerId,
request: TCodec::Request,
) -> RequestId {
let request_id = self.next_request_id();
let request = RequestProtocol {
request_id,
codec: self.codec.clone(),
protocols: self.outbound_protocols.clone(),
request,
};

if let Some(request) = self.try_send_request(peer, request) {
self.pending_events.push_back(ToSwarm::Dial {
opts: DialOpts::peer_id(*peer).use_new_port().build(),
});
self.pending_outbound_requests
.entry(*peer)
.or_default()
.push(request);
}

request_id
}

/// Initiates sending a response to an inbound request.
///
/// If the [`ResponseChannel`] is already closed due to a timeout or the
Expand Down
Loading