diff --git a/muxers/yamux/CHANGELOG.md b/muxers/yamux/CHANGELOG.md index 3e6ac4e4127..e1dcdfb7a88 100644 --- a/muxers/yamux/CHANGELOG.md +++ b/muxers/yamux/CHANGELOG.md @@ -2,6 +2,15 @@ - Update to `libp2p-core` `v0.38.0`. +# 0.41.1 + +- Yield from `StreamMuxer::poll` as soon as we receive a single substream. + This fixes [issue 3041]. + See [PR 3071]. + +[PR 3071]: https://github.com/libp2p/rust-libp2p/pull/3071/ +[issue 3041]: https://github.com/libp2p/rust-libp2p/issues/3041/ + # 0.41.0 - Update to `libp2p-core` `v0.37.0`. diff --git a/muxers/yamux/Cargo.toml b/muxers/yamux/Cargo.toml index cb8a1e4e5ef..6c4790924b5 100644 --- a/muxers/yamux/Cargo.toml +++ b/muxers/yamux/Cargo.toml @@ -22,7 +22,7 @@ log = "0.4" async-std = { version = "1.7.0", features = ["attributes"] } libp2p-muxer-test-harness = { path = "../test-harness" } -# Passing arguments to the docsrs builder in order to properly document cfg's. +# Passing arguments to the docsrs builder in order to properly document cfg's. # More information: https://docs.rs/about/builds#cross-compiling [package.metadata.docs.rs] all-features = true diff --git a/muxers/yamux/src/lib.rs b/muxers/yamux/src/lib.rs index ce3639e572f..42fb1621e56 100644 --- a/muxers/yamux/src/lib.rs +++ b/muxers/yamux/src/lib.rs @@ -31,6 +31,7 @@ use futures::{ use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent}; use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use std::collections::VecDeque; +use std::task::Waker; use std::{ fmt, io, iter, mem, pin::Pin, @@ -55,6 +56,8 @@ pub struct Yamux { /// This buffer stores inbound streams that are created whilst [`StreamMuxer::poll`] is called. /// Once the buffer is full, new inbound streams are dropped. inbound_stream_buffer: VecDeque, + /// Waker to be called when new inbound streams are available. + inbound_stream_waker: Option, } const MAX_BUFFERED_INBOUND_STREAMS: usize = 25; @@ -81,6 +84,7 @@ where }, control: ctrl, inbound_stream_buffer: VecDeque::default(), + inbound_stream_waker: None, } } } @@ -101,6 +105,7 @@ where }, control: ctrl, inbound_stream_buffer: VecDeque::default(), + inbound_stream_waker: None, } } } @@ -122,6 +127,8 @@ where return Poll::Ready(Ok(stream)); } + self.inbound_stream_waker = Some(cx.waker().clone()); + self.poll_inner(cx) } @@ -140,17 +147,22 @@ where ) -> Poll> { let this = self.get_mut(); - loop { - let inbound_stream = ready!(this.poll_inner(cx))?; - - if this.inbound_stream_buffer.len() >= MAX_BUFFERED_INBOUND_STREAMS { - log::warn!("dropping {inbound_stream} because buffer is full"); - drop(inbound_stream); - continue; - } + let inbound_stream = ready!(this.poll_inner(cx))?; + if this.inbound_stream_buffer.len() >= MAX_BUFFERED_INBOUND_STREAMS { + log::warn!("dropping {inbound_stream} because buffer is full"); + drop(inbound_stream); + } else { this.inbound_stream_buffer.push_back(inbound_stream); + + if let Some(waker) = this.inbound_stream_waker.take() { + waker.wake() + } } + + // Schedule an immediate wake-up, allowing other code to run. + cx.waker().wake_by_ref(); + Poll::Pending } fn poll_close(mut self: Pin<&mut Self>, c: &mut Context<'_>) -> Poll> {