Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync: use intrusive list strategy for broadcast #2509

Merged
merged 12 commits into from
May 12, 2020
Merged

Conversation

carllerche
Copy link
Member

@carllerche carllerche commented May 8, 2020

Previously, in the broadcast channel, receiver wakers were passed to the
sender via an atomic stack with allocated nodes. When a message was
sent, the stack was drained. This caused a problem when many receivers
pushed a waiter node then dropped. The waiter node remained indefinitely
in cases where no values were sent.

This patch switches broadcast to use the intrusive linked-list waiter
strategy used by Notify and `Semaphore.

Previously, in the broadcast channel, receiver wakers were passed to the
sender via an atomic stack with allocated nodes. When a message was
sent, the stack was drained. This caused a problem when many receivers
pushed a waiter node then dropped. The waiter node remained indefinitely
in cases where no values were sent.

This patch switches broadcast to use the intrusive linked-list waiter
strategy used by `Notify` and `Semaphore.
@carllerche carllerche requested review from Matthias247, hawkw and kleimkuhler and removed request for Matthias247 May 8, 2020 23:05
@carllerche
Copy link
Member Author

I still need to fix poll_recv and the stream implementation. This will be a tiny bit tricky!

tokio/src/sync/broadcast.rs Outdated Show resolved Hide resolved
tokio/src/sync/broadcast.rs Show resolved Hide resolved
let missed = next.wrapping_sub(self.next);
// `tail.pos` points to the slot that the **next** send writes to. If
// the channel is closed, the previous slot is the oldest value.
let mut adjust = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even with the comments I don't really understand how we get here. What is the condition?
There is no new value available yet, but the sender is closed? Then we should never need to wait?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm? Not sure I follow. You mean how do we get to the bit of code starting with let mut adjust = 0?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I basically have no idea what scenario this part of the function is handling and why we need adjust/closed/etc 😀

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is from before this change, but I will try to clarify the comment.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated the comment. Does it make more sense?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! Makes more sense now. However I'm wondering whether just going to the newest position wouldn't be easier and more helpful (if you already lagged it's likely it might happen again. And starting with the max amount of buffered elements might give you a bit more runaway time until it happens than starting at the boundary).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Receivers can't just be dropped. They have to iterate each remaining slot to decrement the remaining count. If the user wishes to get the latest, on error, they can drop the receiver and create a new one. This would be equivalent. I'm happy to revisit the details (i'm not entirely satisfied w/ the broadcast channel) but in 0.2 we must maintain behavior.

@carllerche
Copy link
Member Author

I added deprecated support for Receiver::poll_ref and impl Stream for Receiver.

I will next need to add Receiver::into_stream().

@Darksonn Darksonn added A-tokio Area: The main tokio crate C-enhancement Category: A PR with an enhancement or bugfix. M-sync Module: tokio/sync labels May 9, 2020
tokio/src/sync/broadcast.rs Show resolved Hide resolved
tokio/src/sync/broadcast.rs Show resolved Hide resolved
tokio/src/sync/broadcast.rs Show resolved Hide resolved
tokio/src/sync/broadcast.rs Show resolved Hide resolved
@carllerche
Copy link
Member Author

I have verified that tokio-rs/mini-redis#38 is fixed w/ this PR.

Copy link
Contributor

@kleimkuhler kleimkuhler left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks good! It's unfortunate we need to keep the waiter field around on Receiver, but the comments help on why that is still necessary.

tokio/src/sync/broadcast.rs Outdated Show resolved Hide resolved
tokio/src/sync/broadcast.rs Show resolved Hide resolved
let missed = next.wrapping_sub(self.next);
// `tail.pos` points to the slot that the **next** send writes to. If
// the channel is closed, the previous slot is the oldest value.
let mut adjust = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! Makes more sense now. However I'm wondering whether just going to the newest position wouldn't be easier and more helpful (if you already lagged it's likely it might happen again. And starting with the max amount of buffered elements might give you a bit more runaway time until it happens than starting at the boundary).

tokio/src/sync/broadcast.rs Show resolved Hide resolved
tokio/src/sync/broadcast.rs Show resolved Hide resolved
@carllerche carllerche merged commit fb7dfcf into master May 12, 2020
@Darksonn Darksonn deleted the broadcast-mem-leak branch May 13, 2020 06:33
jensim pushed a commit to jensim/tokio that referenced this pull request Jun 7, 2020
Previously, in the broadcast channel, receiver wakers were passed to the
sender via an atomic stack with allocated nodes. When a message was
sent, the stack was drained. This caused a problem when many receivers
pushed a waiter node then dropped. The waiter node remained indefinitely
in cases where no values were sent.

This patch switches broadcast to use the intrusive linked-list waiter
strategy used by `Notify` and `Semaphore.
hawkw added a commit to linkerd/linkerd2-proxy that referenced this pull request Dec 4, 2020
This branch updates `linkerd2-buffer`, and `linkerd2-proxy-discover`'s
`buffer` module to use Tokio 0.3's MPSC channel rather than Tokio 0.2's.
The rest of the proxy still uses Tokio 0.2, including the 0.2 runtime.

Most of the Tokio synchronization primitives lost their `poll`-based
interfaces in 0.3 as part of the move to intrusive lists of wakers for
synchronization primitives (see tokio-rs/tokio#2325,
tokio-rs/tokio#2509, and tokio-rs/tokio#2861). This change takes
advantage of the inherently pinned nature of `async fn` and `async`
blocks to avoid needing a separate heap allocation to store the waiter
state for a task waiting on a synchronization primitive. However, it
means that a synchronization primitive can _only_ be waited on when the
future that waits on it is pinned --- otherwise, there is a potential
dangling pointer. The `poll`-based APIs allowed waiting on
synchronization primitives from unpinned contexts, so they were removed.

To wait on the synchronization primitives from contexts that may not be
pinned, such as `poll_ready`, it's necessary to add a `Pin<Box<...>>`
around the future that's waiting on the synchronization primitive. This
ensures that the future will not move while it's part of the wait list.
It's important to note that this isn't an _additional_ allocation per
waiter versus Tokio 0.2; instead, it's the same allocation that would
have _always_ happened internally to the synchronization primitive in
the 0.2 API. Now, it's moved outside of the `tokio::sync` type so that
it can be avoided when used with `async`/`await` syntax, and added by
the user when polling the sync primitives.

Because we need to poll channel senders in `tower::Service`
implementations' `poll_ready` functions, it was necessary to introduce
our own bounded MPSC channel type that exposes a polling-based API. When
the buffer's channel is full, we want to exert backpressure in
`poll_ready`, so that callers such as load balancers could choose to
call another service rather than waiting for buffer capacity. This
branch adds a new `linkerd2-channel` crate that implements a pollable
bounded channel, wrapping `tokio::sync`'s unbounded MPSC and using a
`tokio::sync::Semaphore` to implement bounding. It's worth noting that
this is, essentially, how `tokio::sync::mpsc`'s bounded channel is
implemented --- it also uses the semaphore. However, our implementation
exposes a `poll_ready` method by boxing the future that waits to acquire
a semaphore permit, which the Tokio channel does not expose.

This was factored out of PR #732.

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
hawkw added a commit to linkerd/linkerd2-proxy that referenced this pull request Dec 4, 2020
This branch updates `linkerd2-buffer`, and `linkerd2-proxy-discover`'s
`buffer` module to use Tokio 0.3's MPSC channel rather than Tokio 0.2's.
The rest of the proxy still uses Tokio 0.2, including the 0.2 runtime.

Most of the Tokio synchronization primitives lost their `poll`-based
interfaces in 0.3 as part of the move to intrusive lists of wakers for
synchronization primitives (see tokio-rs/tokio#2325,
tokio-rs/tokio#2509, and tokio-rs/tokio#2861). This change takes
advantage of the inherently pinned nature of `async fn` and `async`
blocks to avoid needing a separate heap allocation to store the waiter
state for a task waiting on a synchronization primitive. However, it
means that a synchronization primitive can _only_ be waited on when the
future that waits on it is pinned --- otherwise, there is a potential
dangling pointer. The `poll`-based APIs allowed waiting on
synchronization primitives from unpinned contexts, so they were removed.

To wait on the synchronization primitives from contexts that may not be
pinned, such as `poll_ready`, it's necessary to add a `Pin<Box<...>>`
around the future that's waiting on the synchronization primitive. This
ensures that the future will not move while it's part of the wait list.
It's important to note that this isn't an _additional_ allocation per
waiter versus Tokio 0.2; instead, it's the same allocation that would
have _always_ happened internally to the synchronization primitive in
the 0.2 API. Now, it's moved outside of the `tokio::sync` type so that
it can be avoided when used with `async`/`await` syntax, and added by
the user when polling the sync primitives.

Because we need to poll channel senders in `tower::Service`
implementations' `poll_ready` functions, it was necessary to introduce
our own bounded MPSC channel type that exposes a polling-based API. When
the buffer's channel is full, we want to exert backpressure in
`poll_ready`, so that callers such as load balancers could choose to
call another service rather than waiting for buffer capacity. This
branch adds a new `linkerd2-channel` crate that implements a pollable
bounded channel, wrapping `tokio::sync`'s unbounded MPSC and using a
`tokio::sync::Semaphore` to implement bounding. It's worth noting that
this is, essentially, how `tokio::sync::mpsc`'s bounded channel is
implemented --- it also uses the semaphore. However, our implementation
exposes a `poll_ready` method by boxing the future that waits to acquire
a semaphore permit, which the Tokio channel does not expose.

Finally, I've added some tests for the `linkerd2-channel` crate, based
on Tokio's tests for the MPSC channel, modified where the APIs differ.
This should help ensure we get similar behavior to what we expect from
Tokio's MPSCs.

This was factored out of PR #732.

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
A-tokio Area: The main tokio crate C-enhancement Category: A PR with an enhancement or bugfix. M-sync Module: tokio/sync
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants