diff --git a/muxers/mplex/CHANGELOG.md b/muxers/mplex/CHANGELOG.md index 63541349f22..4ace9b94657 100644 --- a/muxers/mplex/CHANGELOG.md +++ b/muxers/mplex/CHANGELOG.md @@ -1,5 +1,13 @@ # 0.24.0 [unreleased] +- Change the default configuration to use `MaxBufferBehaviour::Block` + and yield from waiting for the next substream or reading from a + particular substream whenever the current read loop may have + already filled a substream buffer, to give the current task a + chance to read from the buffer(s) before the `MaxBufferBehaviour` + takes effect. This is primarily relevant for + `MaxBufferBehaviour::ResetStream`. + - Tweak the naming in the `MplexConfig` API for better consistency with `libp2p-yamux`. [PR 1822](https://github.com/libp2p/rust-libp2p/pull/1822). diff --git a/muxers/mplex/src/config.rs b/muxers/mplex/src/config.rs index fe615ba48d5..7d48b50999d 100644 --- a/muxers/mplex/src/config.rs +++ b/muxers/mplex/src/config.rs @@ -90,14 +90,26 @@ impl MplexConfig { #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub enum MaxBufferBehaviour { /// Reset the substream whose frame buffer overflowed. + /// + /// > **Note**: If more than [`MplexConfig::set_max_buffer_size()`] frames + /// > are received in succession for a substream in the context of + /// > trying to read data from a different substream, the former substream + /// > may be reset before application code had a chance to read from the + /// > buffer. The max. buffer size needs to be sized appropriately when + /// > using this option to balance maximum resource usage and the + /// > probability of premature termination of a substream. ResetStream, - /// No new message can be read from any substream as long as the buffer - /// for a single substream is full. + /// No new message can be read from the underlying connection from any + /// substream as long as the buffer for a single substream is full, + /// i.e. application code is expected to read from the full buffer. /// - /// This can potentially introduce a deadlock if you are waiting for a - /// message from a substream before processing the messages received - /// on another substream, i.e. if there are data dependencies across - /// substreams. + /// > **Note**: To avoid blocking without making progress, application + /// > tasks should ensure that, when woken, always try to read (i.e. + /// > make progress) from every substream on which data is expected. + /// > This is imperative in general, as a woken task never knows for + /// > which substream it has been woken, but failure to do so with + /// > [`MaxBufferBehaviour::Block`] in particular may lead to stalled + /// > execution or spinning of a task without progress. Block, } @@ -106,9 +118,8 @@ impl Default for MplexConfig { MplexConfig { max_substreams: 128, max_buffer_len: 32, - max_buffer_behaviour: MaxBufferBehaviour::ResetStream, + max_buffer_behaviour: MaxBufferBehaviour::Block, split_send_size: 1024, } } } - diff --git a/muxers/mplex/src/io.rs b/muxers/mplex/src/io.rs index ffead520ee7..a390f79c2ed 100644 --- a/muxers/mplex/src/io.rs +++ b/muxers/mplex/src/io.rs @@ -214,8 +214,18 @@ where } debug_assert!(self.open_buffer.is_empty()); + let mut num_buffered = 0; loop { + // Whenever we may have completely filled a substream + // buffer while waiting for the next inbound stream, + // yield to give the current task a chance to read + // from the respective substreams. + if num_buffered == self.config.max_buffer_len { + cx.waker().clone().wake(); + return Poll::Pending + } + // Wait for the next inbound `Open` frame. match ready!(self.poll_read_frame(cx, None))? { Frame::Open { stream_id } => { @@ -225,6 +235,7 @@ where } Frame::Data { stream_id, data } => { self.buffer(stream_id.into_local(), data)?; + num_buffered += 1; } Frame::Close { stream_id } => { self.on_close(stream_id.into_local())?; @@ -406,7 +417,18 @@ where buf.shrink_to_fit(); } + let mut num_buffered = 0; + loop { + // Whenever we may have completely filled a substream + // buffer of another substream while waiting for the + // next frame for `id`, yield to give the current task + // a chance to read from the other substream(s). + if num_buffered == self.config.max_buffer_len { + cx.waker().clone().wake(); + return Poll::Pending + } + // Check if the targeted substream (if any) reached EOF. if !self.can_read(&id) { // Note: Contrary to what is recommended by the spec, we must @@ -427,6 +449,7 @@ where // currently being polled, so it needs to be buffered and // the interested tasks notified. self.buffer(stream_id.into_local(), data)?; + num_buffered += 1; } frame @ Frame::Open { .. } => { if let Some(id) = self.on_open(frame.remote_id())? { @@ -1106,15 +1129,30 @@ mod tests { let id = LocalStreamId::listener(0); match m.poll_next_stream(cx) { Poll::Ready(r) => panic!("Unexpected result for next stream: {:?}", r), - Poll::Pending => {} + Poll::Pending => { + // We expect the implementation to yield when the buffer + // is full but before it is exceeded and the max buffer + // behaviour takes effect, giving the current task a + // chance to read from the buffer. Here we just read + // again to provoke the max buffer behaviour. + assert_eq!( + m.substreams.get_mut(&id).unwrap().recv_buf().len(), + cfg.max_buffer_len + ); + match m.poll_next_stream(cx) { + Poll::Ready(r) => panic!("Unexpected result for next stream: {:?}", r), + Poll::Pending => { + // Expect the buffer for stream 0 to be exceeded, triggering + // the max. buffer behaviour. + assert_eq!( + m.substreams.get_mut(&id).unwrap().recv_buf().len(), + cfg.max_buffer_len + 1 + ); + } + } + } } - // Expect the buffer for stream 0 to be just 1 over the limit. - assert_eq!( - m.substreams.get_mut(&id).unwrap().recv_buf().len(), - cfg.max_buffer_len + 1 - ); - // Expect either a `Reset` to be sent or all reads to be // blocked `Pending`, depending on the `MaxBufferBehaviour`. match cfg.max_buffer_behaviour {