Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport to 0.1: Avoid starvation from FuturesUnordered::poll_next #2122

Merged
merged 1 commit into from
Apr 22, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions src/stream/futures_unordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,23 @@ use {task, Stream, Future, Poll, Async};
use executor::{Notify, UnsafeNotify, NotifyHandle};
use task_impl::{self, AtomicTask};

/// Constant used for a `FuturesUnordered` to determine how many times it is
/// allowed to poll underlying futures without yielding.
///
/// A single call to `poll_next` may potentially do a lot of work before
/// yielding. This happens in particular if the underlying futures are awoken
/// frequently but continue to return `Pending`. This is problematic if other
/// tasks are waiting on the executor, since they do not get to run. This value
/// caps the number of calls to `poll` on underlying futures a single call to
/// `poll_next` is allowed to make.
///
/// The value itself is chosen somewhat arbitrarily. It needs to be high enough
/// that amortize wakeup and scheduling costs, but low enough that we do not
/// starve other tasks for long.
///
/// See also https://github.com/rust-lang/futures-rs/issues/2047.
const YIELD_EVERY: usize = 32;

/// An unbounded set of futures.
///
/// This "combinator" also serves a special function in this library, providing
Expand Down Expand Up @@ -274,6 +291,10 @@ impl<T> Stream for FuturesUnordered<T>
type Error = T::Error;

fn poll(&mut self) -> Poll<Option<T::Item>, T::Error> {
// Keep track of how many child futures we have polled,
// in case we want to forcibly yield.
let mut polled = 0;

// Ensure `parent` is correctly set.
self.inner.parent.register();

Expand Down Expand Up @@ -369,12 +390,21 @@ impl<T> Stream for FuturesUnordered<T>
future.poll()
})
};
polled += 1;

let ret = match res {
Ok(Async::NotReady) => {
let node = bomb.node.take().unwrap();
*node.future.get() = Some(future);
bomb.queue.link(node);

if polled == YIELD_EVERY {
// We have polled a large number of futures in a row without yielding.
// To ensure we do not starve other tasks waiting on the executor,
// we yield here, but immediately wake ourselves up to continue.
task_impl::current().notify();
return Ok(Async::NotReady);
}
continue
}
Ok(Async::Ready(e)) => Ok(Async::Ready(Some(e))),
Expand Down