diff --git a/content/tokio/tutorial/async.md b/content/tokio/tutorial/async.md index b4d05281..57216bab 100644 --- a/content/tokio/tutorial/async.md +++ b/content/tokio/tutorial/async.md @@ -248,7 +248,7 @@ impl MiniTokio { tasks: VecDeque::new(), } } - + /// Spawn a future onto the mini-tokio instance. fn spawn(&mut self, future: F) where @@ -256,11 +256,11 @@ impl MiniTokio { { self.tasks.push_back(Box::pin(future)); } - + fn run(&mut self) { let waker = task::noop_waker(); let mut cx = Context::from_waker(&waker); - + while let Some(mut task) = self.tasks.pop_front() { if task.as_mut().poll(&mut cx).is_pending() { self.tasks.push_back(task); @@ -453,22 +453,33 @@ Wakers are `Sync` and can be cloned. When `wake` is called, the task must be scheduled for execution. To implement this, we have a channel. When the `wake()` is called on the waker, the task is pushed into the send half of the channel. Our `Task` structure will implement the wake logic. To do this, it needs to -contain both the spawned future and the channel send half. +contain both the spawned future and the channel send half. We place the future +in a `TaskFuture` struct alongside a `Poll` enum to keep track of the latest +`Future::poll()` result, which is needed to handle spurious wake-ups. More +details are given in the implementation of the `poll()` method in `TaskFuture`. ```rust # use std::future::Future; # use std::pin::Pin; # use std::sync::mpsc; +# use std::task::Poll; use std::sync::{Arc, Mutex}; +/// A structure holding a future and the result of +/// the latest call to its `poll` method. +struct TaskFuture { + future: Pin + Send>>, + poll: Poll<()>, +} + struct Task { // The `Mutex` is to make `Task` implement `Sync`. Only - // one thread accesses `future` at any given time. The - // `Mutex` is not required for correctness. Real Tokio + // one thread accesses `task_future` at any given time. + // The `Mutex` is not required for correctness. Real Tokio // does not use a mutex here, but real Tokio has // more lines of code than can fit in a single tutorial // page. - future: Mutex + Send>>>, + task_future: Mutex, executor: mpsc::Sender>, } @@ -520,13 +531,17 @@ channel. Next, we implement receiving and executing the tasks in the # use std::future::Future; # use std::pin::Pin; # use std::sync::{Arc, Mutex}; -# use std::task::{Context}; +# use std::task::{Context, Poll}; # struct MiniTokio { # scheduled: mpsc::Receiver>, # sender: mpsc::Sender>, # } +# struct TaskFuture { +# future: Pin + Send>>, +# poll: Poll<()>, +# } # struct Task { -# future: Mutex + Send>>>, +# task_future: Mutex, # executor: mpsc::Sender>, # } # impl ArcWake for Task { @@ -558,6 +573,26 @@ impl MiniTokio { } } +impl TaskFuture { + fn new(future: impl Future + Send + 'static) -> TaskFuture { + TaskFuture { + future: Box::pin(future), + poll: Poll::Pending, + } + } + + fn poll(&mut self, cx: &mut Context<'_>) { + // Spurious wake-ups are allowed, even after a future has + // returned `Ready`. However, polling a future which has + // already returned `Ready` is *not* allowed. For this + // reason we need to check that the future is still pending + // before we call it. Failure to do so can lead to a panic. + if self.poll.is_pending() { + self.poll = self.future.as_mut().poll(cx); + } + } +} + impl Task { fn poll(self: Arc) { // Create a waker from the `Task` instance. This @@ -565,11 +600,11 @@ impl Task { let waker = task::waker(self.clone()); let mut cx = Context::from_waker(&waker); - // No other thread ever tries to lock the future - let mut future = self.future.try_lock().unwrap(); + // No other thread ever tries to lock the task_future + let mut task_future = self.task_future.try_lock().unwrap(); - // Poll the future - let _ = future.as_mut().poll(&mut cx); + // Poll the inner future + task_future.poll(&mut cx); } // Spawns a new task with the given future. @@ -582,13 +617,12 @@ impl Task { F: Future + Send + 'static, { let task = Arc::new(Task { - future: Mutex::new(Box::pin(future)), + task_future: Mutex::new(TaskFuture::new(future)), executor: sender.clone(), }); let _ = sender.send(task); } - } ``` @@ -638,7 +672,7 @@ use std::pin::Pin; # type Output = (); # fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> { # Poll::Pending -# } +# } # } #[tokio::main]