Skip to content

Commit

Permalink
Handle spurious wakeups (#740)
Browse files Browse the repository at this point in the history
  • Loading branch information
ibeckermayer authored Feb 28, 2024
1 parent e8d7458 commit 278a2b1
Showing 1 changed file with 50 additions and 16 deletions.
66 changes: 50 additions & 16 deletions content/tokio/tutorial/async.md
Original file line number Diff line number Diff line change
Expand Up @@ -248,19 +248,19 @@ impl MiniTokio {
tasks: VecDeque::new(),
}
}

/// Spawn a future onto the mini-tokio instance.
fn spawn<F>(&mut self, future: F)
where
F: Future<Output = ()> + Send + 'static,
{
self.tasks.push_back(Box::pin(future));
}

fn run(&mut self) {
let waker = task::noop_waker();
let mut cx = Context::from_waker(&waker);

while let Some(mut task) = self.tasks.pop_front() {
if task.as_mut().poll(&mut cx).is_pending() {
self.tasks.push_back(task);
Expand Down Expand Up @@ -453,22 +453,33 @@ Wakers are `Sync` and can be cloned. When `wake` is called, the task must be
scheduled for execution. To implement this, we have a channel. When the `wake()`
is called on the waker, the task is pushed into the send half of the channel.
Our `Task` structure will implement the wake logic. To do this, it needs to
contain both the spawned future and the channel send half.
contain both the spawned future and the channel send half. We place the future
in a `TaskFuture` struct alongside a `Poll` enum to keep track of the latest
`Future::poll()` result, which is needed to handle spurious wake-ups. More
details are given in the implementation of the `poll()` method in `TaskFuture`.

```rust
# use std::future::Future;
# use std::pin::Pin;
# use std::sync::mpsc;
# use std::task::Poll;
use std::sync::{Arc, Mutex};

/// A structure holding a future and the result of
/// the latest call to its `poll` method.
struct TaskFuture {
future: Pin<Box<dyn Future<Output = ()> + Send>>,
poll: Poll<()>,
}

struct Task {
// The `Mutex` is to make `Task` implement `Sync`. Only
// one thread accesses `future` at any given time. The
// `Mutex` is not required for correctness. Real Tokio
// one thread accesses `task_future` at any given time.
// The `Mutex` is not required for correctness. Real Tokio
// does not use a mutex here, but real Tokio has
// more lines of code than can fit in a single tutorial
// page.
future: Mutex<Pin<Box<dyn Future<Output = ()> + Send>>>,
task_future: Mutex<TaskFuture>,
executor: mpsc::Sender<Arc<Task>>,
}

Expand Down Expand Up @@ -520,13 +531,17 @@ channel. Next, we implement receiving and executing the tasks in the
# use std::future::Future;
# use std::pin::Pin;
# use std::sync::{Arc, Mutex};
# use std::task::{Context};
# use std::task::{Context, Poll};
# struct MiniTokio {
# scheduled: mpsc::Receiver<Arc<Task>>,
# sender: mpsc::Sender<Arc<Task>>,
# }
# struct TaskFuture {
# future: Pin<Box<dyn Future<Output = ()> + Send>>,
# poll: Poll<()>,
# }
# struct Task {
# future: Mutex<Pin<Box<dyn Future<Output = ()> + Send>>>,
# task_future: Mutex<TaskFuture>,
# executor: mpsc::Sender<Arc<Task>>,
# }
# impl ArcWake for Task {
Expand Down Expand Up @@ -558,18 +573,38 @@ impl MiniTokio {
}
}

impl TaskFuture {
fn new(future: impl Future<Output = ()> + Send + 'static) -> TaskFuture {
TaskFuture {
future: Box::pin(future),
poll: Poll::Pending,
}
}

fn poll(&mut self, cx: &mut Context<'_>) {
// Spurious wake-ups are allowed, even after a future has
// returned `Ready`. However, polling a future which has
// already returned `Ready` is *not* allowed. For this
// reason we need to check that the future is still pending
// before we call it. Failure to do so can lead to a panic.
if self.poll.is_pending() {
self.poll = self.future.as_mut().poll(cx);
}
}
}

impl Task {
fn poll(self: Arc<Self>) {
// Create a waker from the `Task` instance. This
// uses the `ArcWake` impl from above.
let waker = task::waker(self.clone());
let mut cx = Context::from_waker(&waker);

// No other thread ever tries to lock the future
let mut future = self.future.try_lock().unwrap();
// No other thread ever tries to lock the task_future
let mut task_future = self.task_future.try_lock().unwrap();

// Poll the future
let _ = future.as_mut().poll(&mut cx);
// Poll the inner future
task_future.poll(&mut cx);
}

// Spawns a new task with the given future.
Expand All @@ -582,13 +617,12 @@ impl Task {
F: Future<Output = ()> + Send + 'static,
{
let task = Arc::new(Task {
future: Mutex::new(Box::pin(future)),
task_future: Mutex::new(TaskFuture::new(future)),
executor: sender.clone(),
});

let _ = sender.send(task);
}

}
```

Expand Down Expand Up @@ -638,7 +672,7 @@ use std::pin::Pin;
# type Output = ();
# fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
# Poll::Pending
# }
# }
# }

#[tokio::main]
Expand Down

0 comments on commit 278a2b1

Please sign in to comment.