[mplex] Tweak default config and yield before exceeding buffer limits. (#1825)

* [mplex] Tweak default config and yield before exceeding buffer limits.

* Update CHANGELOG
romanb authored Nov 9, 2020
1 parent cc588ec commit 3859116
Showing 3 changed files with 72 additions and 15 deletions.
8 changes: 8 additions & 0 deletions muxers/mplex/CHANGELOG.md
@@ -1,5 +1,13 @@
# 0.24.0 [unreleased]

- Change the default configuration to use `MaxBufferBehaviour::Block`
and yield from waiting for the next substream or reading from a
particular substream whenever the current read loop may have
already filled a substream buffer, to give the current task a
chance to read from the buffer(s) before the `MaxBufferBehaviour`
takes effect. This is primarily relevant for
`MaxBufferBehaviour::ResetStream`.

- Tweak the naming in the `MplexConfig` API for better
consistency with `libp2p-yamux`.
[PR 1822](https://github.com/libp2p/rust-libp2p/pull/1822).
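For orientation, here is a minimal sketch of pinning these settings down explicitly rather than relying on the changed defaults. It assumes the `libp2p_mplex` crate exposes a `set_max_buffer_behaviour` setter alongside the `set_max_buffer_size` setter referenced in the docs below; treat the exact method names as illustrative.

```rust
use libp2p_mplex::{MaxBufferBehaviour, MplexConfig};

fn main() {
    // MplexConfig::default() now uses MaxBufferBehaviour::Block (see the
    // config.rs diff below); applications that prefer the previous behaviour
    // can still opt back into it explicitly.
    let mut config = MplexConfig::default();
    config.set_max_buffer_size(32); // frames buffered per substream (assumed setter name)
    config.set_max_buffer_behaviour(MaxBufferBehaviour::ResetStream); // assumed setter name
    // `config` would then be passed to the transport upgrade as usual.
    let _ = config;
}
```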
27 changes: 19 additions & 8 deletions muxers/mplex/src/config.rs
@@ -90,14 +90,26 @@ impl MplexConfig {
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum MaxBufferBehaviour {
/// Reset the substream whose frame buffer overflowed.
///
/// > **Note**: If more than [`MplexConfig::set_max_buffer_size()`] frames
/// > are received in succession for a substream in the context of
/// > trying to read data from a different substream, the former substream
/// > may be reset before application code has had a chance to read from the
/// > buffer. The max. buffer size needs to be chosen appropriately when
/// > using this option to balance maximum resource usage and the
/// > probability of premature termination of a substream.
ResetStream,
/// No new message can be read from any substream as long as the buffer
/// for a single substream is full.
/// No new message can be read from the underlying connection for any
/// substream as long as the buffer for a single substream is full,
/// i.e. application code is expected to read from the full buffer.
///
/// This can potentially introduce a deadlock if you are waiting for a
/// message from a substream before processing the messages received
/// on another substream, i.e. if there are data dependencies across
/// substreams.
/// > **Note**: To avoid blocking without making progress, application
/// > tasks should ensure that, when woken, they always try to read (i.e.
/// > make progress) from every substream on which data is expected.
/// > This is imperative in general, as a woken task never knows for
/// > which substream it has been woken, but failure to do so with
/// > [`MaxBufferBehaviour::Block`] in particular may lead to stalled
/// > execution or spinning of a task without progress.
Block,
}

@@ -106,9 +118,8 @@ impl Default for MplexConfig {
MplexConfig {
max_substreams: 128,
max_buffer_len: 32,
max_buffer_behaviour: MaxBufferBehaviour::ResetStream,
max_buffer_behaviour: MaxBufferBehaviour::Block,
split_send_size: 1024,
}
}
}

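The note on `MaxBufferBehaviour::Block` above asks application tasks to read from every substream whenever they are woken. A minimal, hypothetical sketch of that advice, not part of this change and assuming the substreams implement the `futures` `AsyncRead` trait:

```rust
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::io::AsyncRead;

// When polled (i.e. whenever the task is woken, for whatever reason), try to
// make progress on *every* substream on which data is expected, not only the
// one presumed to have caused the wake-up. With MaxBufferBehaviour::Block this
// is what keeps a full buffer on one substream from stalling all the others.
fn poll_all_substreams<S: AsyncRead + Unpin>(
    cx: &mut Context<'_>,
    substreams: &mut [S],
    received: &mut [Vec<u8>],
) -> Poll<std::io::Result<()>> {
    let mut progress = false;
    for (stream, out) in substreams.iter_mut().zip(received.iter_mut()) {
        let mut buf = [0u8; 4096];
        match Pin::new(stream).poll_read(cx, &mut buf) {
            // Drained some buffered data for this substream.
            Poll::Ready(Ok(n)) => {
                out.extend_from_slice(&buf[..n]);
                progress = true;
            }
            Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
            // Nothing to read on this substream right now; the waker stays
            // registered, so the task will be woken when data arrives.
            Poll::Pending => {}
        }
    }
    if progress { Poll::Ready(Ok(())) } else { Poll::Pending }
}
```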
52 changes: 45 additions & 7 deletions muxers/mplex/src/io.rs
@@ -214,8 +214,18 @@ where
}

debug_assert!(self.open_buffer.is_empty());
let mut num_buffered = 0;

loop {
// Whenever we may have completely filled a substream
// buffer while waiting for the next inbound stream,
// yield to give the current task a chance to read
// from the respective substreams.
if num_buffered == self.config.max_buffer_len {
cx.waker().clone().wake();
return Poll::Pending
}

// Wait for the next inbound `Open` frame.
match ready!(self.poll_read_frame(cx, None))? {
Frame::Open { stream_id } => {
@@ -225,6 +235,7 @@
}
Frame::Data { stream_id, data } => {
self.buffer(stream_id.into_local(), data)?;
num_buffered += 1;
}
Frame::Close { stream_id } => {
self.on_close(stream_id.into_local())?;
@@ -406,7 +417,18 @@ where
buf.shrink_to_fit();
}

let mut num_buffered = 0;

loop {
// Whenever we may have completely filled the buffer
// of another substream while waiting for the
// next frame for `id`, yield to give the current task
// a chance to read from the other substream(s).
if num_buffered == self.config.max_buffer_len {
cx.waker().clone().wake();
return Poll::Pending
}

// Check if the targeted substream (if any) reached EOF.
if !self.can_read(&id) {
// Note: Contrary to what is recommended by the spec, we must
@@ -427,6 +449,7 @@
// currently being polled, so it needs to be buffered and
// the interested tasks notified.
self.buffer(stream_id.into_local(), data)?;
num_buffered += 1;
}
frame @ Frame::Open { .. } => {
if let Some(id) = self.on_open(frame.remote_id())? {
@@ -1106,15 +1129,30 @@ mod tests {
let id = LocalStreamId::listener(0);
match m.poll_next_stream(cx) {
Poll::Ready(r) => panic!("Unexpected result for next stream: {:?}", r),
Poll::Pending => {}
Poll::Pending => {
// We expect the implementation to yield when the buffer
// is full but before it is exceeded and the max buffer
// behaviour takes effect, giving the current task a
// chance to read from the buffer. Here we just read
// again to provoke the max buffer behaviour.
assert_eq!(
m.substreams.get_mut(&id).unwrap().recv_buf().len(),
cfg.max_buffer_len
);
match m.poll_next_stream(cx) {
Poll::Ready(r) => panic!("Unexpected result for next stream: {:?}", r),
Poll::Pending => {
// Expect the buffer for stream 0 to be exceeded, triggering
// the max. buffer behaviour.
assert_eq!(
m.substreams.get_mut(&id).unwrap().recv_buf().len(),
cfg.max_buffer_len + 1
);
}
}
}
}

// Expect the buffer for stream 0 to be just 1 over the limit.
assert_eq!(
m.substreams.get_mut(&id).unwrap().recv_buf().len(),
cfg.max_buffer_len + 1
);

// Expect either a `Reset` to be sent or all reads to be
// blocked `Pending`, depending on the `MaxBufferBehaviour`.
match cfg.max_buffer_behaviour {
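Both `poll_next_stream` and `poll_read_stream` above rely on the same cooperative-yield idiom: once a single poll may have filled a substream buffer, wake the task again and return `Pending`. A stripped-down, hypothetical sketch of the idiom outside of mplex, with a plain iterator standing in for the inbound frames:

```rust
use std::task::{Context, Poll};

// After buffering `max_buffer_len` items in one poll, re-schedule the task and
// yield, so the caller gets a chance to drain `buffer` before any
// MaxBufferBehaviour-style limit would have to kick in.
fn poll_fill<I: Iterator<Item = u8>>(
    cx: &mut Context<'_>,
    incoming: &mut I,
    buffer: &mut Vec<u8>,
    max_buffer_len: usize,
) -> Poll<()> {
    let mut num_buffered = 0;
    loop {
        if num_buffered == max_buffer_len {
            // Same pattern as in the diff: wake ourselves and yield.
            cx.waker().clone().wake();
            return Poll::Pending;
        }
        match incoming.next() {
            Some(byte) => {
                buffer.push(byte);
                num_buffered += 1;
            }
            None => return Poll::Ready(()),
        }
    }
}
```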
