Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

muxers/yamux: Mitigation of unnecessary stream drops #3071

Merged
merged 4 commits into from
Nov 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions muxers/yamux/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@

- Update to `libp2p-core` `v0.38.0`.

# 0.41.1

- Yield from `StreamMuxer::poll` as soon as we receive a single substream.
This fixes [issue 3041].
See [PR 3071].

[PR 3071]: https://github.com/libp2p/rust-libp2p/pull/3071/
[issue 3041]: https://github.com/libp2p/rust-libp2p/issues/3041/

# 0.41.0

- Update to `libp2p-core` `v0.37.0`.
Expand Down
2 changes: 1 addition & 1 deletion muxers/yamux/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ log = "0.4"
async-std = { version = "1.7.0", features = ["attributes"] }
libp2p-muxer-test-harness = { path = "../test-harness" }

# Passing arguments to the docsrs builder in order to properly document cfg's.
# Passing arguments to the docsrs builder in order to properly document cfg's.
# More information: https://docs.rs/about/builds#cross-compiling
[package.metadata.docs.rs]
all-features = true
Expand Down
28 changes: 20 additions & 8 deletions muxers/yamux/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use futures::{
use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent};
use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use std::collections::VecDeque;
use std::task::Waker;
use std::{
fmt, io, iter, mem,
pin::Pin,
Expand All @@ -55,6 +56,8 @@ pub struct Yamux<S> {
/// This buffer stores inbound streams that are created whilst [`StreamMuxer::poll`] is called.
/// Once the buffer is full, new inbound streams are dropped.
inbound_stream_buffer: VecDeque<yamux::Stream>,
/// Waker to be called when new inbound streams are available.
inbound_stream_waker: Option<Waker>,
}

const MAX_BUFFERED_INBOUND_STREAMS: usize = 25;
Expand All @@ -81,6 +84,7 @@ where
},
control: ctrl,
inbound_stream_buffer: VecDeque::default(),
inbound_stream_waker: None,
}
}
}
Expand All @@ -101,6 +105,7 @@ where
},
control: ctrl,
inbound_stream_buffer: VecDeque::default(),
inbound_stream_waker: None,
}
}
}
Expand All @@ -122,6 +127,8 @@ where
return Poll::Ready(Ok(stream));
}

self.inbound_stream_waker = Some(cx.waker().clone());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The poll contract says that a waker needs to be registered in case Poll::Pending is returned. While perhaps this unconditional registration may be legal, I think it still violates expectations.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This registration is not unconditional. It is conditional on the list being empty! The list no longer being empty is IMO a valid reason to wake the task that last polled here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, are you saying that self.poll_inner will return Poll::Pending? That would be fine, then.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OTOH: even if the inner stream does return Poll::Pending, it will have registered that same waker already, and it will thus wake it when new streams become available. What am I missing?

In general, the Rust async rules are thus: if you create a Poll::Pending, then you’re responsible for waking a waker. Otherwise you just pass along poll results.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, are you saying that self.poll_inner will return Poll::Pending? That would be fine, then.

Not quite.

This poll function essentially should we polled again in two different scenarios:

  1. If the general poll-function pushed a new stream to the buffer.
  2. If the socket has more bytes to read (that is poll_inner)

Registering this waker here isn't strictly necessary because we poll the StreamMuxer in swarm::Connection in a loop anyway. But I think it is more correct to still do this here because it showcases that there are two conditions on which the task should be polled again.

OTOH: even if the inner stream does return Poll::Pending, it will have registered that same waker already, and it will thus wake it when new streams become available. What am I missing?

Like I said, it probably works without too because we will always be implicitly woken again, even if the task that calls the general poll consumes everything from the inner socket already and pushed them to the buffer.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But I think it is more correct to still do this here because it showcases that there are two conditions on which the task should be polled again.

I agree with this. I don't think the inner working of impl StreamMuxer for Yamux should make assumptions on how it is called in Connection.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Making things fail due to a missed wake-up would require polling this task through Arc<Mutex<...>>, which I hope nobody would ever consider doing, so I agree that this is harmless.

This discussion is yet another spotlight on how difficult it is to correctly reason about poll functions. This is exacerbated within libp2p by extending the concept — like is being done here — in non-trivial ways. If there actually was some surrounding task that did care about being woken for two different reasons, it would have to manufacture its own wakers (like FuturesUnordered does). But even that usage would be broken because the Waker destined for poll_inbound may be passed to poll_inner as well, overwriting a previously registered Waker that was destined for that purpose.

While debugging wake-up loops is painful, it is orders of magnitude easier than debugging missed wake-ups. May I suggest that we adhere to the policy that each async object offers exactly one poll function, with Future semantics, that drives exactly (i.e. only and completely) the state machine of that object? Interrogating the state machine should not have poll semantics, because that can lead to confusing behaviour and bugs.

The following pattern is what I suggest:

if let Poll::Ready(x) = connection.poll(cx) {
    ...
    return x;
}
for sub in connection.inbound_streams.drain(..) {
    ...
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May I suggest that we adhere to the policy that each async object offers exactly one poll function, with Future semantics, that drives exactly (i.e. only and completely) the state machine of that object? Interrogating the state machine should not have poll semantics, because that can lead to confusing behaviour and bugs.

The design of this trait was inspired by the AsyncWrite design which also has multiple poll_ functions that users need to drive. Sink is similar.

One problem with a single "poll" function design is that it puts more burden on implementations. For example, opening a new stream is not instantaneous, it may require negotiation of new credits with the other party. As such, a "new_outbound" function can only give you an ID or some other kind of handle for a new stream. This means every implementation needs to implement some kind of "stream ID" management. In contrast to that a poll_new_outbound function can just return Pending until the new stream is ready to be used.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I never found Sink usable for anything I had to do, so its design carries little weight in my opinion.

Offering multiple “poll” functions to poll the same underlying thing has severe issues, as I argued above — and which you so far have not commented on. The example of a “new_outbound” function boils down to the choice of polling the machinery until the new connection is ready, ignoring everything else that happens in the meantime. This already requires the machinery to aggregate its output and let the poller inspect it, for which there is no reason a priori to offer a poll-shaped API. In particular, no Context is necessary to ask whether events have been emitted, which removes one prolific source of confusion inherent to Rust’s Task design.

So my solution to the new_outbound problem would be to offer a front-end Future that polls until the new connection is ready and leaves all other side-effects uninspected, to be dealt with by the caller afterwards.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So my solution to the new_outbound problem would be to offer a front-end Future that polls until the new connection is ready and leaves all other side-effects uninspected, to be dealt with by the caller afterwards.

This is basically the poll_ready& start_send API of Sink then, yes?

Offering multiple “poll” functions to poll the same underlying thing has severe issues, as I argued above — and which you so far have not commented on.

poll_outbound is not the only issue. The problem of having to buffer streams is mostly because yamux doesn't allow us to backpressure the number of streams. The QUIC muxer on the other hand allows us to make progress on the connection itself without necessarily accepting new inbound streams. I am happy to change the API for something better but so far I've not found a solution where the caller (swarm::Connection) can explicitly signal to the muxer that it is now able to take more inbound streams.

We could move away from poll_inbound by having just a pop_inbound function. This however then requires more documentation on when the caller should call this function again if it ever returns None. At that stage, we are just re-inventing the wheel when we could also be using Poll and automatically wake the task when we know that there are new inbound streams available.


self.poll_inner(cx)
}

Expand All @@ -140,17 +147,22 @@ where
) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
let this = self.get_mut();

loop {
let inbound_stream = ready!(this.poll_inner(cx))?;

if this.inbound_stream_buffer.len() >= MAX_BUFFERED_INBOUND_STREAMS {
log::warn!("dropping {inbound_stream} because buffer is full");
drop(inbound_stream);
continue;
}
let inbound_stream = ready!(this.poll_inner(cx))?;

if this.inbound_stream_buffer.len() >= MAX_BUFFERED_INBOUND_STREAMS {
log::warn!("dropping {inbound_stream} because buffer is full");
drop(inbound_stream);
} else {
this.inbound_stream_buffer.push_back(inbound_stream);

if let Some(waker) = this.inbound_stream_waker.take() {
waker.wake()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The discussion above makes me think: is this muxer polled from multiple tasks? If it is, then poll_inner will probably switch out wakers all the time, making it non-deterministic which caller will eventually be woken. If not, then this extra wakeup makes little sense to me.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At least in rust-libp2p production code, it is only polled from a single task but it is a public interface so there may be other consumers, #2952 for example.

}
}

// Schedule an immediate wake-up, allowing other code to run.
cx.waker().wake_by_ref();
Poll::Pending
}

fn poll_close(mut self: Pin<&mut Self>, c: &mut Context<'_>) -> Poll<YamuxResult<()>> {
Expand Down