Streams can prevent any other futures from being scheduled again #207

Closed
sdroege opened this issue Mar 9, 2018 · 5 comments

Comments

@sdroege
Contributor

sdroege commented Mar 9, 2018

Please see code below for a contrived example of the problem.

The problem is that a single stream that keeps producing items blocks a whole executor thread forever, instead of letting other scheduled futures run after each item. The result is starvation. This happens because Stream::for_each is effectively an infinite loop for as long as items can be produced. Leaving the loop with NotReady after each item would solve the problem (though what is the best way to wake up the future afterwards?).

In practice, this can cause e.g. a couple of fast TCP connections handled with for_each to starve any slower TCP connections. In my specific use case of a Stream around a UdpSocket, any one of the sockets can completely occupy a thread (as long as packets arrive fast enough) and prevent any other socket with a slower packet rate from ever being scheduled again. Note that fast/slow here is relative: it depends on the processing time of each stream item and on how fast new items arrive.

Is this expected behaviour, and is one expected to implement a custom "scheduler" around e.g. Stream::into_future to do round-robin scheduling of all "equal" streams?

extern crate futures;
extern crate tokio;
extern crate tokio_reactor;

use futures::stream;
use futures::{Future, Stream};
use tokio::executor::thread_pool;
use tokio::reactor;

fn main() {
    let reactor = reactor::Reactor::new().unwrap().background().unwrap();

    let handle = reactor.handle().clone();

    let mut pool_builder = thread_pool::Builder::new();
    pool_builder.around_worker(move |w, enter| {
        ::tokio_reactor::with_default(&handle, enter, |_| {
            w.run();
        });
    });

    // Important to have 1 thread here, otherwise
    // both streams would just block two threads
    // forever.
    pool_builder.pool_size(1);
    let pool = pool_builder.build();

    pool.spawn(stream::repeat(1).for_each(|n| {
        println!("{}", n);

        Ok(())
    }));

    pool.spawn(stream::repeat(2).for_each(|n| {
        println!("{}", n);

        Ok(())
    }));

    pool.shutdown_on_idle()
        .and_then(|_| reactor.shutdown_on_idle())
        .wait()
        .unwrap();
}
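
To make the mechanism visible without tokio, here is a rough toy model using only the standard library. It is a sketch under assumptions, not how the thread pool is actually implemented: `Task`, `Step` and `run` are hypothetical names, the "executor" is just a FIFO queue on one thread, and a finite item count stands in for the endless `stream::repeat`. A task that never yields drains all of its items before the next task is polled even once, while a task that yields after each item is put back at the end of the queue.

```rust
use std::collections::VecDeque;

/// Outcome of polling a toy task (only loosely modelled on Ready / NotReady).
enum Step {
    Done,
    Yielded,
}

/// A toy task that processes items from a source that is always ready.
struct Task {
    id: u32,
    remaining: u32,
    yield_after_each: bool,
}

impl Task {
    /// Processes items in a loop, like for_each does while items are available.
    fn poll(&mut self, log: &mut Vec<u32>) -> Step {
        while self.remaining > 0 {
            log.push(self.id); // stands in for println!("{}", n)
            self.remaining -= 1;
            if self.yield_after_each && self.remaining > 0 {
                // Hand control back so the executor can poll other tasks.
                return Step::Yielded;
            }
        }
        Step::Done
    }
}

/// Runs two tasks on a single-threaded FIFO queue and returns the order
/// in which items were processed.
fn run(yield_after_each: bool, items: u32) -> Vec<u32> {
    let mut queue = VecDeque::new();
    for id in 1..=2 {
        queue.push_back(Task { id, remaining: items, yield_after_each });
    }
    let mut log = Vec::new();
    while let Some(mut task) = queue.pop_front() {
        if let Step::Yielded = task.poll(&mut log) {
            // Assumption: a yielded task goes to the end of the queue.
            queue.push_back(task);
        }
    }
    log
}
```

In this model `run(false, 3)` processes all items of task 1 before task 2 gets a turn, and `run(true, 3)` alternates between the two tasks. With an endless source, the non-yielding variant would never reach task 2 at all.
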
@sdroege
Contributor Author

sdroege commented Mar 9, 2018

Should this maybe go directly to futures instead? Nothing here is tokio-specific except for the reactor/executor.

@sdroege
Contributor Author

sdroege commented Mar 9, 2018

A workaround for now might be to notify the current task and return NotReady on every poll directly after an item was produced. This assumes that the executor then puts the future at the end of its list of futures to poll. It does not seem like a nice solution, though.

@sdroege
Contributor Author

sdroege commented Mar 9, 2018

This could also be done from the for_each() closure and abstracted into a somewhat funny YieldOnce future like the one below. That works around the problem. However, the default behaviour here seems like a potential footgun that people will only notice once it's too late.

extern crate futures;
extern crate tokio;
extern crate tokio_reactor;

use futures::stream;
use futures::task;
use futures::{Future, Stream, Poll, Async};
use tokio::executor::thread_pool;
use tokio::reactor;

struct YieldOnce(Option<()>);

impl YieldOnce {
    fn new() -> Self {
        YieldOnce(None)
    }
}

impl Future for YieldOnce {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Poll<(), ()> {
        if let Some(_) = self.0.take() {
            Ok(Async::Ready(()))
        } else {
            self.0 = Some(());
            task::current().notify();
            Ok(Async::NotReady)
        }
    }
}

fn main() {
    let reactor = reactor::Reactor::new().unwrap().background().unwrap();

    let handle = reactor.handle().clone();

    let mut pool_builder = thread_pool::Builder::new();
    pool_builder.around_worker(move |w, enter| {
        ::tokio_reactor::with_default(&handle, enter, |_| {
            w.run();
        });
    });

    // Important to have 1 thread here, otherwise
    // both streams would just block two threads
    // forever.
    pool_builder.pool_size(1);
    let pool = pool_builder.build();

    pool.spawn(stream::repeat(1).for_each(|n| {
        println!("{}", n);

        task::current().notify();

        YieldOnce::new()
    }));

    pool.spawn(stream::repeat(2).for_each(|n| {
        println!("{}", n);

        task::current().notify();

        YieldOnce::new()
    }));

    pool.shutdown_on_idle()
        .and_then(|_| reactor.shutdown_on_idle())
        .wait()
        .unwrap();
}
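
For comparison, the same yield-once idea can be sketched with `std::future` instead of the futures 0.1 API used above. Everything here is an assumption made for illustration: `YieldNow`, `Flag`, `worker` and `run_round_robin` are hypothetical names, and the executor is a toy single-threaded queue that re-queues a task at the back only if it woke itself during the poll. It is not how tokio schedules tasks.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Returns Pending exactly once, after asking to be polled again.
struct YieldNow {
    yielded: bool,
}

impl Future for YieldNow {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            return Poll::Ready(());
        }
        self.yielded = true;
        cx.waker().wake_by_ref(); // counterpart of task::current().notify()
        Poll::Pending
    }
}

/// Records whether a task asked to be polled again.
struct Flag(AtomicBool);

impl Wake for Flag {
    fn wake(self: Arc<Self>) {
        self.0.store(true, Ordering::SeqCst);
    }
}

type Log = Rc<RefCell<Vec<u32>>>;
type Task = Pin<Box<dyn Future<Output = ()>>>;

/// Processes `items` items and yields to the executor after each one.
async fn worker(id: u32, items: u32, log: Log) {
    for _ in 0..items {
        log.borrow_mut().push(id);
        YieldNow { yielded: false }.await;
    }
}

/// Toy executor: polls tasks from a FIFO queue on the current thread.
fn run_round_robin(items: u32) -> Vec<u32> {
    let log: Log = Rc::new(RefCell::new(Vec::new()));
    let mut queue: VecDeque<(Task, Arc<Flag>)> = VecDeque::new();
    for id in 1..=2 {
        let task: Task = Box::pin(worker(id, items, log.clone()));
        queue.push_back((task, Arc::new(Flag(AtomicBool::new(false)))));
    }
    while let Some((mut task, flag)) = queue.pop_front() {
        flag.0.store(false, Ordering::SeqCst);
        let waker = Waker::from(flag.clone());
        let mut cx = Context::from_waker(&waker);
        let pending = task.as_mut().poll(&mut cx).is_pending();
        // A pending task that did not wake itself would be dropped here;
        // a real executor would park it until something else wakes it.
        if pending && flag.0.load(Ordering::SeqCst) {
            queue.push_back((task, flag));
        }
    }
    let order = log.borrow().clone();
    order
}
```

With two workers, the items are processed alternately because each worker gives up its turn after every item.
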

@carllerche
Member

Yes, I think this issue should be moved to the futures repo. There may also be an existing issue.

IMO, the combinators should handle yielding internally.
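
One way a combinator could yield internally is a per-poll budget. The following is only a sketch of that idea, not the design of any existing futures or tokio combinator: `ForEachBudgeted`, `CountingWaker` and `drive` are hypothetical names, it uses `std::future` rather than futures 0.1, and an iterator stands in for a stream that is always ready.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical combinator: runs `f` on every item, but returns to the
/// executor after at most `budget` items per poll (budget must be >= 1).
struct ForEachBudgeted<I, F> {
    items: I,
    f: F,
    budget: usize,
}

impl<I, F> Future for ForEachBudgeted<I, F>
where
    I: Iterator + Unpin,
    F: FnMut(I::Item) + Unpin,
{
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let this = &mut *self;
        for _ in 0..this.budget {
            match this.items.next() {
                Some(item) => (this.f)(item),
                None => return Poll::Ready(()),
            }
        }
        // Budget used up: ask to be polled again, then yield.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Counts wake-ups so the sketch can be driven without a real executor.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Polls the combinator to completion over 0..total and returns the
/// items seen together with the number of times it yielded.
fn drive(total: u32, budget: usize) -> (Vec<u32>, usize) {
    let mut seen = Vec::new();
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);
    {
        let mut fut = ForEachBudgeted {
            items: 0..total,
            f: |n: u32| seen.push(n),
            budget,
        };
        while Pin::new(&mut fut).poll(&mut cx).is_pending() {}
    }
    (seen, counter.0.load(Ordering::SeqCst))
}
```

A budget larger than one trades fairness for less scheduling overhead. A budget of one gives the same behaviour as yielding after every item.
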

@sdroege
Copy link
Contributor Author

sdroege commented Mar 13, 2018

Ok, created a ticket there. Thanks!

@sdroege sdroege closed this as completed Mar 13, 2018