From eefdbd7c47bca1b4b37eef4075faa8ed2c11d0a4 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Thu, 5 Jan 2023 10:52:50 +0100 Subject: [PATCH 1/6] fix(quic): Trigger `Transport::poll` wakeup on dial Scenario: rust-libp2p node A dials rust-libp2p node B. B listens on a QUIC address. A dials B via the `libp2p-quic` `Transport` wrapped in a `libp2p-dns` `Transport`. Note that `libp2p-dns` in itself is not relevant here. Only the fact that `libp2p-dns` delays a dial is relevant, i.e. that it first does other async stuff (DNS lookup) before creating the QUIC dial. In fact, dialing an IP address through the DNS `Transport` where no DNS resolution is needed triggers the below just fine. 1. A calls `Swarm::dial` which creates a `libp2p-dns` dial. 2. That dial is spawned onto the connection `Pool`, thus starting the DNS resolution. 3. A continuously calls `Swarm::poll`. 4. `libp2p-quic` `Transport::poll` is called, finding no dialers in `self.dialer` given that the spawned dial is still only resolving the DNS address. 5. On the spawned connection task: 1. The DNS resolution finishes. 2. Thus calling `Transport::dial` on `libp2p-quic` (note that the DNS dial has a clone of the QUIC `Transport` wrapped in an `Arc<Mutex<_>>`). 3. That adds a dialer to `self.dialer`. Note that there are no listeners, i.e. `Swarm::listen_on` was never called. 4. `DialerState::new_dial` is called which adds a message to `self.pending_dials` and wakes `self.waker`. Given that on the last `Transport::poll` there was no `self.dialer`, that waker is empty. Result: The message is stuck in the `DialerState::pending_dials`. The message is never sent to the endpoint driver. The dial never succeeds. This commit includes a hot-fix. It demonstrates the above issue more than it represents a solid fix. 
--- transports/quic/src/transport.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/transports/quic/src/transport.rs b/transports/quic/src/transport.rs index 8d122424b1b..423121485f0 100644 --- a/transports/quic/src/transport.rs +++ b/transports/quic/src/transport.rs @@ -71,6 +71,7 @@ pub struct GenTransport { listeners: SelectAll>, /// Dialer for each socket family if no matching listener exists. dialer: HashMap, + dialer_waker: Option, } impl GenTransport

{ @@ -84,6 +85,7 @@ impl GenTransport

{ quinn_config, handshake_timeout, dialer: HashMap::new(), + dialer_waker: None, support_draft_29, } } @@ -161,6 +163,9 @@ impl Transport for GenTransport

{ let dialer = match self.dialer.entry(socket_family) { Entry::Occupied(occupied) => occupied.into_mut(), Entry::Vacant(vacant) => { + if let Some(waker) = self.dialer_waker.take() { + waker.wake(); + } vacant.insert(Dialer::new::

(self.quinn_config.clone(), socket_family)?) } }; @@ -200,6 +205,8 @@ impl Transport for GenTransport

{ errored.push(*key); } } + self.dialer_waker = Some(cx.waker().clone()); + for key in errored { // Endpoint driver of dialer crashed. // Drop dialer and all pending dials so that the connection receiver is notified. From 4dafc3b5d6c691fa319e9790c64c5e598f43bb4c Mon Sep 17 00:00:00 2001 From: Max Inden Date: Fri, 6 Jan 2023 13:45:31 +0100 Subject: [PATCH 2/6] Add test for dial waking up transport --- transports/quic/tests/smoke.rs | 114 +++++++++++++++++++++++++++++++++ 1 file changed, 114 insertions(+) diff --git a/transports/quic/tests/smoke.rs b/transports/quic/tests/smoke.rs index a147864528c..b2039e97129 100644 --- a/transports/quic/tests/smoke.rs +++ b/transports/quic/tests/smoke.rs @@ -1,12 +1,15 @@ #![cfg(any(feature = "async-std", feature = "tokio"))] use futures::channel::{mpsc, oneshot}; +use futures::future::BoxFuture; use futures::future::{poll_fn, Either}; use futures::stream::StreamExt; use futures::{future, AsyncReadExt, AsyncWriteExt, FutureExt, SinkExt}; +use futures_timer::Delay; use libp2p_core::either::EitherOutput; use libp2p_core::muxing::{StreamMuxerBox, StreamMuxerExt, SubstreamBox}; use libp2p_core::transport::{Boxed, OrTransport, TransportEvent}; +use libp2p_core::transport::{ListenerId, TransportError}; use libp2p_core::{multiaddr::Protocol, upgrade, Multiaddr, PeerId, Transport}; use libp2p_noise as noise; use libp2p_quic as quic; @@ -19,6 +22,10 @@ use std::io; use std::num::NonZeroU8; use std::task::Poll; use std::time::Duration; +use std::{ + pin::Pin, + sync::{Arc, Mutex}, +}; #[cfg(feature = "tokio")] #[tokio::test] @@ -90,6 +97,113 @@ async fn ipv4_dial_ipv6() { assert_eq!(b_connected, a_peer_id); } +/// Tests that a [`Transport::dial`] wakes up the task previously polling [`Transport::poll`]. +/// +/// See https://github.com/libp2p/rust-libp2p/pull/3306 for context. 
+#[cfg(feature = "async-std")] +#[async_std::test] +async fn wrapped_with_dns() { + let _ = env_logger::try_init(); + + struct FakeDns(Arc>>); + + impl Transport for FakeDns { + type Output = (PeerId, StreamMuxerBox); + type Error = std::io::Error; + type ListenerUpgrade = Pin> + Send>>; + type Dial = BoxFuture<'static, Result>; + + fn listen_on( + &mut self, + addr: Multiaddr, + ) -> Result> { + self.0.lock().unwrap().listen_on(addr) + } + + fn remove_listener(&mut self, id: ListenerId) -> bool { + self.0.lock().unwrap().remove_listener(id) + } + + fn address_translation( + &self, + listen: &Multiaddr, + observed: &Multiaddr, + ) -> Option { + self.0.lock().unwrap().address_translation(listen, observed) + } + + /// Delayed dial, i.e. calling [`Transport::dial`] on the inner [`Transport`] not within the + /// synchronous [`Transport::dial`] method, but within the [`Future`] returned by the outer + /// [`Transport::dial`]. + fn dial(&mut self, addr: Multiaddr) -> Result> { + let t = self.0.clone(); + Ok(async move { + // Simulate DNS lookup. Giving the `Transport::poll` the chance to return + // `Poll::Pending` and thus suspending its task, waiting for a wakeup from the dial + // on the inner transport below. 
+ Delay::new(Duration::from_millis(1)).await; + + let dial = t.lock().unwrap().dial(addr).map_err(|e| match e { + TransportError::MultiaddrNotSupported(_) => { + panic!() + } + TransportError::Other(e) => e, + })?; + dial.await + } + .boxed()) + } + + fn dial_as_listener( + &mut self, + addr: Multiaddr, + ) -> Result> { + self.0.lock().unwrap().dial_as_listener(addr) + } + + fn poll( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + Pin::new(&mut *self.0.lock().unwrap()).poll(cx) + } + } + + let (a_peer_id, mut a_transport) = create_default_transport::(); + let (b_peer_id, mut b_transport) = { + let (id, transport) = create_default_transport::(); + (id, FakeDns(Arc::new(Mutex::new(transport))).boxed()) + }; + + // Spawn a + let a_addr = start_listening(&mut a_transport, "/ip6/::1/udp/0/quic-v1").await; + let listener = async_std::task::spawn(async move { + let (upgrade, _) = a_transport + .select_next_some() + .await + .into_incoming() + .unwrap(); + let (peer_id, _) = upgrade.await.unwrap(); + + peer_id + }); + + // Spawn b + // + // Note that the dial is spawned on a different task than the transport allowing the transport + // task to poll the transport once and then suspend, waiting for the wakeup from the dial. + let dial = async_std::task::spawn({ + let dial = b_transport.dial(a_addr).unwrap(); + async { dial.await.unwrap().0 } + }); + async_std::task::spawn(async move { b_transport.next().await }); + + let (a_connected, b_connected) = future::join(listener, dial).await; + + assert_eq!(a_connected, b_peer_id); + assert_eq!(b_connected, a_peer_id); +} + #[cfg(feature = "async-std")] #[async_std::test] #[ignore] // Transport currently does not validate PeerId. Enable once we make use of PeerId validation in rustls. 
From 9e34c8f0b9d2ea02df6a027847b08c8445bbfa93 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Fri, 6 Jan 2023 14:04:27 +0100 Subject: [PATCH 3/6] Manage waker in `GenTransport` instead of `DialerState` --- transports/quic/src/transport.rs | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/transports/quic/src/transport.rs b/transports/quic/src/transport.rs index 423121485f0..f82d3bb2118 100644 --- a/transports/quic/src/transport.rs +++ b/transports/quic/src/transport.rs @@ -163,9 +163,6 @@ impl Transport for GenTransport

{ let dialer = match self.dialer.entry(socket_family) { Entry::Occupied(occupied) => occupied.into_mut(), Entry::Vacant(vacant) => { - if let Some(waker) = self.dialer_waker.take() { - waker.wake(); - } vacant.insert(Dialer::new::

(self.quinn_config.clone(), socket_family)?) } }; @@ -181,6 +178,12 @@ impl Transport for GenTransport

{ &mut listeners[index].dialer_state } }; + + // Wakeup the task polling [`Transport::poll`] to drive the new dial. + if let Some(waker) = self.dialer_waker.take() { + waker.wake(); + } + Ok(dialer_state.new_dial(socket_addr, self.handshake_timeout, version)) } @@ -205,17 +208,20 @@ impl Transport for GenTransport

{ errored.push(*key); } } - self.dialer_waker = Some(cx.waker().clone()); - for key in errored { // Endpoint driver of dialer crashed. // Drop dialer and all pending dials so that the connection receiver is notified. self.dialer.remove(&key); } + match self.listeners.poll_next_unpin(cx) { - Poll::Ready(Some(ev)) => Poll::Ready(ev), - _ => Poll::Pending, + Poll::Ready(Some(ev)) => return Poll::Ready(ev), + _ => {} } + + self.dialer_waker = Some(cx.waker().clone()); + + return Poll::Pending; } } @@ -259,12 +265,11 @@ impl Drop for Dialer { } } -/// Pending dials to be sent to the endpoint was the [`endpoint::Channel`] -/// has capacity +/// Pending dials to be sent to the endpoint once the [`endpoint::Channel`] +/// has capacity. #[derive(Default, Debug)] struct DialerState { pending_dials: VecDeque, - waker: Option, } impl DialerState { @@ -284,10 +289,6 @@ impl DialerState { self.pending_dials.push_back(message); - if let Some(waker) = self.waker.take() { - waker.wake(); - } - async move { // Our oneshot getting dropped means the message didn't make it to the endpoint driver. let connection = tx.await.map_err(|_| Error::EndpointDriverCrashed)??; @@ -312,7 +313,6 @@ impl DialerState { Err(endpoint::Disconnected {}) => return Poll::Ready(Error::EndpointDriverCrashed), } } - self.waker = Some(cx.waker().clone()); Poll::Pending } } From 7475c1a164e43dc8faf849c1cfc48ab071e40dfc Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 11 Jan 2023 14:21:10 +0100 Subject: [PATCH 4/6] Use 100ms delay --- transports/quic/tests/smoke.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/transports/quic/tests/smoke.rs b/transports/quic/tests/smoke.rs index b2039e97129..95dc3c4519e 100644 --- a/transports/quic/tests/smoke.rs +++ b/transports/quic/tests/smoke.rs @@ -141,7 +141,7 @@ async fn wrapped_with_dns() { // Simulate DNS lookup. 
Giving the `Transport::poll` the chance to return // `Poll::Pending` and thus suspending its task, waiting for a wakeup from the dial // on the inner transport below. - Delay::new(Duration::from_millis(1)).await; + Delay::new(Duration::from_millis(100)).await; let dial = t.lock().unwrap().dial(addr).map_err(|e| match e { TransportError::MultiaddrNotSupported(_) => { From ac7d07d34d3c102e11d334fef56f91c3ff60dd2c Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 11 Jan 2023 14:22:45 +0100 Subject: [PATCH 5/6] Rename to DialDelay --- transports/quic/tests/smoke.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/transports/quic/tests/smoke.rs b/transports/quic/tests/smoke.rs index 95dc3c4519e..649aca09b26 100644 --- a/transports/quic/tests/smoke.rs +++ b/transports/quic/tests/smoke.rs @@ -105,9 +105,9 @@ async fn ipv4_dial_ipv6() { async fn wrapped_with_dns() { let _ = env_logger::try_init(); - struct FakeDns(Arc>>); + struct DialDelay(Arc>>); - impl Transport for FakeDns { + impl Transport for DialDelay { type Output = (PeerId, StreamMuxerBox); type Error = std::io::Error; type ListenerUpgrade = Pin> + Send>>; @@ -172,7 +172,7 @@ async fn wrapped_with_dns() { let (a_peer_id, mut a_transport) = create_default_transport::(); let (b_peer_id, mut b_transport) = { let (id, transport) = create_default_transport::(); - (id, FakeDns(Arc::new(Mutex::new(transport))).boxed()) + (id, DialDelay(Arc::new(Mutex::new(transport))).boxed()) }; // Spawn a From 384ee511baae87f204d7140e8836db67949989c0 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 11 Jan 2023 14:28:25 +0100 Subject: [PATCH 6/6] Fix clippy warning --- transports/quic/src/transport.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/transports/quic/src/transport.rs b/transports/quic/src/transport.rs index 41cb244abe5..dea01c74685 100644 --- a/transports/quic/src/transport.rs +++ b/transports/quic/src/transport.rs @@ -216,14 +216,13 @@ impl Transport for GenTransport

{ self.dialer.remove(&key); } - match self.listeners.poll_next_unpin(cx) { - Poll::Ready(Some(ev)) => return Poll::Ready(ev), - _ => {} + if let Poll::Ready(Some(ev)) = self.listeners.poll_next_unpin(cx) { + return Poll::Ready(ev); } self.dialer_waker = Some(cx.waker().clone()); - return Poll::Pending; + Poll::Pending } }