Skip to content

Commit

Permalink
feat(transport): allow ListenerId to be user-controlled
Browse files Browse the repository at this point in the history
`Transport::listen_on` is an asynchronous operation. It returns immediately but the actual process of establishing a listening socket happens as part of `Transport::poll` which will return one or more `TransportEvent`s related to a particular `listen_on` call.

Currently, `listen_on` returns a `ListenerId` which allows the user of the `Transport` interface to correlate the events with a particular `listen_on` call. This "user" is the `Swarm` runtime. Currently, a user of libp2p establishes a new listening socket by talking to the `Swarm::listen_on` interface and it is not possible to do the same thing via the `NetworkBehaviour` trait.

Within the `NetworkBehaviour` trait, we emit _commands_ to the `Swarm` like `ToSwarm::Dial`. These commands don't have a "return value" like a synchronous function does and thus, if we were to add a `ToSwarm::ListenOn` command, it could not receive the `ListenerId` from the `Transport`.

To fix this and to be consistent with our [coding guidelines](https://github.com/libp2p/rust-libp2p/blob/master/docs/coding-guidelines.md#allow-correlating-asynchronous-responses-to-their-requests) we change the interface of `Transport::listen_on` to require the user to pass in a `ListenerId`. This will allow us to construct a command in a `NetworkBehaviour` that remembers this ID which enables precise tracking of which events containing a `ListenerId` correlate which a particular `listen_on` command.

This is especially important in the context of listening on wildcard addresses like `0.0.0.0` because we end up binding to multiple network interfaces and thus emit multiple events for a single `listen_on` call.

Pull-Request: libp2p#3567.
  • Loading branch information
dariusc93 authored May 14, 2023
1 parent 9e62588 commit 5b32c8a
Show file tree
Hide file tree
Showing 34 changed files with 289 additions and 133 deletions.
4 changes: 4 additions & 0 deletions core/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
## 0.40.0 - unreleased

- Allow `ListenerId` to be user-controlled, i.e. to be provided on `Transport::listen_on`.
See [PR 3567].

- Raise MSRV to 1.65.
See [PR 3715].

- Remove deprecated symbols related to upgrades.
See [PR 3867].

[PR 3567]: https://github.com/libp2p/rust-libp2p/pull/3567
[PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715
[PR 3867]: https://github.com/libp2p/rust-libp2p/pull/3867

Expand Down
10 changes: 7 additions & 3 deletions core/src/either.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,14 +154,18 @@ where
}
}

fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<Self::Error>> {
fn listen_on(
&mut self,
id: ListenerId,
addr: Multiaddr,
) -> Result<(), TransportError<Self::Error>> {
use TransportError::*;
match self {
Either::Left(a) => a.listen_on(addr).map_err(|e| match e {
Either::Left(a) => a.listen_on(id, addr).map_err(|e| match e {
MultiaddrNotSupported(addr) => MultiaddrNotSupported(addr),
Other(err) => Other(Either::Left(err)),
}),
Either::Right(b) => b.listen_on(addr).map_err(|e| match e {
Either::Right(b) => b.listen_on(id, addr).map_err(|e| match e {
MultiaddrNotSupported(addr) => MultiaddrNotSupported(addr),
Other(err) => Other(Either::Right(err)),
}),
Expand Down
30 changes: 22 additions & 8 deletions core/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use std::{
error::Error,
fmt,
pin::Pin,
sync::atomic::{AtomicUsize, Ordering},
task::{Context, Poll},
};

Expand All @@ -55,6 +56,8 @@ pub use self::memory::MemoryTransport;
pub use self::optional::OptionalTransport;
pub use self::upgrade::Upgrade;

static NEXT_LISTENER_ID: AtomicUsize = AtomicUsize::new(1);

/// A transport provides connection-oriented communication between two peers
/// through ordered streams of data (i.e. connections).
///
Expand Down Expand Up @@ -109,8 +112,12 @@ pub trait Transport {
/// obtained from [dialing](Transport::dial).
type Dial: Future<Output = Result<Self::Output, Self::Error>>;

/// Listens on the given [`Multiaddr`] for inbound connections.
fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<Self::Error>>;
/// Listens on the given [`Multiaddr`] for inbound connections with a provided [`ListenerId`].
fn listen_on(
&mut self,
id: ListenerId,
addr: Multiaddr,
) -> Result<(), TransportError<Self::Error>>;

/// Remove a listener.
///
Expand Down Expand Up @@ -241,18 +248,25 @@ pub trait Transport {

/// The ID of a single listener.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub struct ListenerId(u64);
pub struct ListenerId(usize);

impl ListenerId {
#[deprecated(note = "Renamed to ` ListenerId::next`.")]
#[allow(clippy::new_without_default)]
/// Creates a new `ListenerId`.
pub fn new() -> Self {
ListenerId(rand::random())
ListenerId::next()
}

/// Creates a new `ListenerId`.
pub fn next() -> Self {
ListenerId(NEXT_LISTENER_ID.fetch_add(1, Ordering::SeqCst))
}
}

impl Default for ListenerId {
fn default() -> Self {
Self::new()
#[deprecated(note = "Use ` ListenerId::next` instead.")]
#[allow(clippy::should_implement_trait)]
pub fn default() -> Self {
Self::next()
}
}

Expand Down
8 changes: 6 additions & 2 deletions core/src/transport/and_then.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,13 @@ where
type ListenerUpgrade = AndThenFuture<T::ListenerUpgrade, C, F>;
type Dial = AndThenFuture<T::Dial, C, F>;

fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<Self::Error>> {
fn listen_on(
&mut self,
id: ListenerId,
addr: Multiaddr,
) -> Result<(), TransportError<Self::Error>> {
self.transport
.listen_on(addr)
.listen_on(id, addr)
.map_err(|err| err.map(Either::Left))
}

Expand Down
22 changes: 17 additions & 5 deletions core/src/transport/boxed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,11 @@ type Dial<O> = Pin<Box<dyn Future<Output = io::Result<O>> + Send>>;
type ListenerUpgrade<O> = Pin<Box<dyn Future<Output = io::Result<O>> + Send>>;

trait Abstract<O> {
fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<io::Error>>;
fn listen_on(
&mut self,
id: ListenerId,
addr: Multiaddr,
) -> Result<(), TransportError<io::Error>>;
fn remove_listener(&mut self, id: ListenerId) -> bool;
fn dial(&mut self, addr: Multiaddr) -> Result<Dial<O>, TransportError<io::Error>>;
fn dial_as_listener(&mut self, addr: Multiaddr) -> Result<Dial<O>, TransportError<io::Error>>;
Expand All @@ -70,8 +74,12 @@ where
T::Dial: Send + 'static,
T::ListenerUpgrade: Send + 'static,
{
fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<io::Error>> {
Transport::listen_on(self, addr).map_err(|e| e.map(box_err))
fn listen_on(
&mut self,
id: ListenerId,
addr: Multiaddr,
) -> Result<(), TransportError<io::Error>> {
Transport::listen_on(self, id, addr).map_err(|e| e.map(box_err))
}

fn remove_listener(&mut self, id: ListenerId) -> bool {
Expand Down Expand Up @@ -123,8 +131,12 @@ impl<O> Transport for Boxed<O> {
type ListenerUpgrade = ListenerUpgrade<O>;
type Dial = Dial<O>;

fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<Self::Error>> {
self.inner.listen_on(addr)
fn listen_on(
&mut self,
id: ListenerId,
addr: Multiaddr,
) -> Result<(), TransportError<Self::Error>> {
self.inner.listen_on(id, addr)
}

fn remove_listener(&mut self, id: ListenerId) -> bool {
Expand Down
10 changes: 7 additions & 3 deletions core/src/transport/choice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,17 @@ where
type ListenerUpgrade = EitherFuture<A::ListenerUpgrade, B::ListenerUpgrade>;
type Dial = EitherFuture<A::Dial, B::Dial>;

fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<Self::Error>> {
let addr = match self.0.listen_on(addr) {
fn listen_on(
&mut self,
id: ListenerId,
addr: Multiaddr,
) -> Result<(), TransportError<Self::Error>> {
let addr = match self.0.listen_on(id, addr) {
Err(TransportError::MultiaddrNotSupported(addr)) => addr,
res => return res.map_err(|err| err.map(Either::Left)),
};

let addr = match self.1.listen_on(addr) {
let addr = match self.1.listen_on(id, addr) {
Err(TransportError::MultiaddrNotSupported(addr)) => addr,
res => return res.map_err(|err| err.map(Either::Right)),
};
Expand Down
6 changes: 5 additions & 1 deletion core/src/transport/dummy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,11 @@ impl<TOut> Transport for DummyTransport<TOut> {
type ListenerUpgrade = futures::future::Pending<Result<Self::Output, io::Error>>;
type Dial = futures::future::Pending<Result<Self::Output, io::Error>>;

fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<Self::Error>> {
fn listen_on(
&mut self,
_id: ListenerId,
addr: Multiaddr,
) -> Result<(), TransportError<Self::Error>> {
Err(TransportError::MultiaddrNotSupported(addr))
}

Expand Down
8 changes: 6 additions & 2 deletions core/src/transport/global_only.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,12 @@ impl<T: crate::Transport + Unpin> crate::Transport for Transport<T> {
type ListenerUpgrade = <T as crate::Transport>::ListenerUpgrade;
type Dial = <T as crate::Transport>::Dial;

fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<Self::Error>> {
self.inner.listen_on(addr)
fn listen_on(
&mut self,
id: ListenerId,
addr: Multiaddr,
) -> Result<(), TransportError<Self::Error>> {
self.inner.listen_on(id, addr)
}

fn remove_listener(&mut self, id: ListenerId) -> bool {
Expand Down
8 changes: 6 additions & 2 deletions core/src/transport/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,12 @@ where
type ListenerUpgrade = MapFuture<T::ListenerUpgrade, F>;
type Dial = MapFuture<T::Dial, F>;

fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<Self::Error>> {
self.transport.listen_on(addr)
fn listen_on(
&mut self,
id: ListenerId,
addr: Multiaddr,
) -> Result<(), TransportError<Self::Error>> {
self.transport.listen_on(id, addr)
}

fn remove_listener(&mut self, id: ListenerId) -> bool {
Expand Down
10 changes: 8 additions & 2 deletions core/src/transport/map_err.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,15 @@ where
type ListenerUpgrade = MapErrListenerUpgrade<T, F>;
type Dial = MapErrDial<T, F>;

fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<Self::Error>> {
fn listen_on(
&mut self,
id: ListenerId,
addr: Multiaddr,
) -> Result<(), TransportError<Self::Error>> {
let map = self.map.clone();
self.transport.listen_on(addr).map_err(|err| err.map(map))
self.transport
.listen_on(id, addr)
.map_err(|err| err.map(map))
}

fn remove_listener(&mut self, id: ListenerId) -> bool {
Expand Down
55 changes: 38 additions & 17 deletions core/src/transport/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,11 @@ impl Transport for MemoryTransport {
type ListenerUpgrade = Ready<Result<Self::Output, Self::Error>>;
type Dial = DialFuture;

fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<Self::Error>> {
fn listen_on(
&mut self,
id: ListenerId,
addr: Multiaddr,
) -> Result<(), TransportError<Self::Error>> {
let port = if let Ok(port) = parse_memory_addr(&addr) {
port
} else {
Expand All @@ -191,7 +195,6 @@ impl Transport for MemoryTransport {
None => return Err(TransportError::Other(MemoryTransportError::Unreachable)),
};

let id = ListenerId::new();
let listener = Listener {
id,
port,
Expand All @@ -201,7 +204,7 @@ impl Transport for MemoryTransport {
};
self.listeners.push_back(Box::pin(listener));

Ok(id)
Ok(())
}

fn remove_listener(&mut self, id: ListenerId) -> bool {
Expand Down Expand Up @@ -457,30 +460,40 @@ mod tests {
let addr_1: Multiaddr = "/memory/1639174018481".parse().unwrap();
let addr_2: Multiaddr = "/memory/8459375923478".parse().unwrap();

let listener_id_1 = transport.listen_on(addr_1.clone()).unwrap();
let listener_id_1 = ListenerId::next();

transport.listen_on(listener_id_1, addr_1.clone()).unwrap();
assert!(
transport.remove_listener(listener_id_1),
"Listener doesn't exist."
);

let listener_id_2 = transport.listen_on(addr_1.clone()).unwrap();
let listener_id_3 = transport.listen_on(addr_2.clone()).unwrap();
let listener_id_2 = ListenerId::next();
transport.listen_on(listener_id_2, addr_1.clone()).unwrap();
let listener_id_3 = ListenerId::next();
transport.listen_on(listener_id_3, addr_2.clone()).unwrap();

assert!(transport.listen_on(addr_1.clone()).is_err());
assert!(transport.listen_on(addr_2.clone()).is_err());
assert!(transport
.listen_on(ListenerId::next(), addr_1.clone())
.is_err());
assert!(transport
.listen_on(ListenerId::next(), addr_2.clone())
.is_err());

assert!(
transport.remove_listener(listener_id_2),
"Listener doesn't exist."
);
assert!(transport.listen_on(addr_1).is_ok());
assert!(transport.listen_on(addr_2.clone()).is_err());
assert!(transport.listen_on(ListenerId::next(), addr_1).is_ok());
assert!(transport
.listen_on(ListenerId::next(), addr_2.clone())
.is_err());

assert!(
transport.remove_listener(listener_id_3),
"Listener doesn't exist."
);
assert!(transport.listen_on(addr_2).is_ok());
assert!(transport.listen_on(ListenerId::next(), addr_2).is_ok());
}

#[test]
Expand All @@ -489,8 +502,11 @@ mod tests {
assert!(transport
.dial("/memory/810172461024613".parse().unwrap())
.is_err());
let _listener = transport
.listen_on("/memory/810172461024613".parse().unwrap())
transport
.listen_on(
ListenerId::next(),
"/memory/810172461024613".parse().unwrap(),
)
.unwrap();
assert!(transport
.dial("/memory/810172461024613".parse().unwrap())
Expand All @@ -504,7 +520,8 @@ mod tests {

let mut transport = MemoryTransport::default().boxed();
futures::executor::block_on(async {
let listener_id = transport.listen_on(addr.clone()).unwrap();
let listener_id = ListenerId::next();
transport.listen_on(listener_id, addr.clone()).unwrap();
let reported_addr = transport
.select_next_some()
.await
Expand Down Expand Up @@ -539,7 +556,7 @@ mod tests {
let mut t1 = MemoryTransport::default().boxed();

let listener = async move {
t1.listen_on(t1_addr.clone()).unwrap();
t1.listen_on(ListenerId::next(), t1_addr.clone()).unwrap();
let upgrade = loop {
let event = t1.select_next_some().await;
if let Some(upgrade) = event.into_incoming() {
Expand Down Expand Up @@ -577,7 +594,9 @@ mod tests {
let mut listener_transport = MemoryTransport::default().boxed();

let listener = async move {
listener_transport.listen_on(listener_addr.clone()).unwrap();
listener_transport
.listen_on(ListenerId::next(), listener_addr.clone())
.unwrap();
loop {
if let TransportEvent::Incoming { send_back_addr, .. } =
listener_transport.select_next_some().await
Expand Down Expand Up @@ -614,7 +633,9 @@ mod tests {
let mut listener_transport = MemoryTransport::default().boxed();

let listener = async move {
listener_transport.listen_on(listener_addr.clone()).unwrap();
listener_transport
.listen_on(ListenerId::next(), listener_addr.clone())
.unwrap();
loop {
if let TransportEvent::Incoming { send_back_addr, .. } =
listener_transport.select_next_some().await
Expand Down
8 changes: 6 additions & 2 deletions core/src/transport/optional.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,13 @@ where
type ListenerUpgrade = T::ListenerUpgrade;
type Dial = T::Dial;

fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<Self::Error>> {
fn listen_on(
&mut self,
id: ListenerId,
addr: Multiaddr,
) -> Result<(), TransportError<Self::Error>> {
if let Some(inner) = self.0.as_mut() {
inner.listen_on(addr)
inner.listen_on(id, addr)
} else {
Err(TransportError::MultiaddrNotSupported(addr))
}
Expand Down
Loading

0 comments on commit 5b32c8a

Please sign in to comment.