Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

_Async in depth_ readability improvement #812

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 83 additions & 101 deletions content/tokio/tutorial/async.md
Original file line number Diff line number Diff line change
Expand Up @@ -453,34 +453,27 @@ Wakers are `Sync` and can be cloned. When `wake` is called, the task must be
scheduled for execution. To implement this, we have a channel. When the `wake()`
is called on the waker, the task is pushed into the send half of the channel.
Our `Task` structure will implement the wake logic. To do this, it needs to
contain both the spawned future and the channel send half. We place the future
in a `TaskFuture` struct alongside a `Poll` enum to keep track of the latest
`Future::poll()` result, which is needed to handle spurious wake-ups. More
details are given in the implementation of the `poll()` method in `TaskFuture`.
contain both the spawned future and the channel send half. We track if a
`Future::poll()` output was `Ready` to handle spurious wake-ups. More
details are given in the implementation of the `poll()` method.

```rust
# use std::future::Future;
# use std::pin::Pin;
# use std::sync::mpsc;
# use std::task::Poll;
# use std::sync::{mpsc, RwLock};
use std::sync::{Arc, Mutex};

/// A structure holding a future and the result of
/// the latest call to its `poll` method.
struct TaskFuture {
future: Pin<Box<dyn Future<Output = ()> + Send>>,
poll: Poll<()>,
}

struct Task {
// The `Mutex` is to make `Task` implement `Sync`. Only
// one thread accesses `task_future` at any given time.
// The `Mutex` is not required for correctness. Real Tokio
// does not use a mutex here, but real Tokio has
// more lines of code than can fit in a single tutorial
// page.
task_future: Mutex<TaskFuture>,
future: Mutex<Pin<Box<dyn Future<Output = ()> + Send>>>,
executor: mpsc::Sender<Arc<Task>>,
// Must be only switched to `Ready` and never back.
is_done: RwLock<bool>
}

impl Task {
Expand Down Expand Up @@ -529,20 +522,18 @@ channel. Next, we implement receiving and executing the tasks in the
# use std::sync::mpsc;
# use futures::task::{self, ArcWake};
# use std::future::Future;
# use std::ops::Deref;
# use std::pin::Pin;
# use std::sync::{Arc, Mutex};
# use std::sync::{Arc, Mutex, RwLock};
# use std::task::{Context, Poll};
# struct MiniTokio {
# scheduled: mpsc::Receiver<Arc<Task>>,
# sender: mpsc::Sender<Arc<Task>>,
# }
# struct TaskFuture {
# future: Pin<Box<dyn Future<Output = ()> + Send>>,
# poll: Poll<()>,
# }
# struct Task {
# task_future: Mutex<TaskFuture>,
# future: Mutex<Pin<Box<dyn Future<Output = ()> + Send>>>,
# executor: mpsc::Sender<Arc<Task>>,
# is_done: RwLock<bool>
# }
# impl ArcWake for Task {
# fn wake_by_ref(arc_self: &Arc<Self>) {}
Expand Down Expand Up @@ -573,38 +564,28 @@ impl MiniTokio {
}
}

impl TaskFuture {
fn new(future: impl Future<Output = ()> + Send + 'static) -> TaskFuture {
TaskFuture {
future: Box::pin(future),
poll: Poll::Pending,
}
}

fn poll(&mut self, cx: &mut Context<'_>) {
// Spurious wake-ups are allowed, even after a future has
// returned `Ready`. However, polling a future which has
// already returned `Ready` is *not* allowed. For this
// reason we need to check that the future is still pending
// before we call it. Failure to do so can lead to a panic.
if self.poll.is_pending() {
self.poll = self.future.as_mut().poll(cx);
}
}
}

impl Task {
fn poll(self: Arc<Self>) {
// Create a waker from the `Task` instance. This
// uses the `ArcWake` impl from above.
let waker = task::waker(self.clone());
let mut cx = Context::from_waker(&waker);

// No other thread ever tries to lock the task_future
let mut task_future = self.task_future.try_lock().unwrap();

// Poll the inner future
task_future.poll(&mut cx);
// Spurious wake-ups are allowed, even after a future has
// returned `Ready`. However, polling a future which has
// already returned `Ready` is **not** allowed. For this
// reason we need to check that the future is still pending
// before we call it. Failure to do so can lead to a panic.
if !self.is_done.read().unwrap().deref() {
// Create a waker from the `Task` instance. This
// uses the `ArcWake` impl from above.
let waker = task::waker(self.clone());
let mut cx = Context::from_waker(&waker);

// No other thread ever tries to lock the task_future
let mut future = self.future.try_lock().unwrap();

// Poll the inner future and if `Ready` save that it's done.
let poll_status = future.as_mut().poll(&mut cx);
if poll_status == Poll::Ready(()) {
*self.is_done.write().unwrap() = true;
}
}
}

// Spawns a new task with the given future.
Expand All @@ -617,8 +598,9 @@ impl Task {
F: Future<Output = ()> + Send + 'static,
{
let task = Arc::new(Task {
task_future: Mutex::new(TaskFuture::new(future)),
future: Mutex::new(Box::pin(future)),
executor: sender.clone(),
is_done: RwLock::new(false)
});

let _ = sender.send(task);
Expand Down Expand Up @@ -735,59 +717,59 @@ impl Future for Delay {
// Check the current instant. If the duration has elapsed, then
// this future has completed so we return `Poll::Ready`.
if Instant::now() >= self.when {
return Poll::Ready(());
}

// The duration has not elapsed. If this is the first time the future
// is called, spawn the timer thread. If the timer thread is already
// running, ensure the stored `Waker` matches the current task's waker.
if let Some(waker) = &self.waker {
let mut waker = waker.lock().unwrap();

// Check if the stored waker matches the current task's waker.
// This is necessary as the `Delay` future instance may move to
// a different task between calls to `poll`. If this happens, the
// waker contained by the given `Context` will differ and we
// must update our stored waker to reflect this change.
if !waker.will_wake(cx.waker()) {
*waker = cx.waker().clone();
}
Poll::Ready(())
} else {
let when = self.when;
let waker = Arc::new(Mutex::new(cx.waker().clone()));
self.waker = Some(waker.clone());

// This is the first time `poll` is called, spawn the timer thread.
thread::spawn(move || {
let now = Instant::now();

if now < when {
thread::sleep(when - now);
// The duration has not elapsed. If this is the first time the future
// is called, spawn the timer thread. If the timer thread is already
// running, ensure the stored `Waker` matches the current task's waker.
if let Some(waker) = &self.waker {
let mut waker = waker.lock().unwrap();

// Check if the stored waker matches the current task's waker.
// This is necessary as the `Delay` future instance may move to
// a different task between calls to `poll`. If this happens, the
// waker contained by the given `Context` will differ and we
// must update our stored waker to reflect this change.
if !waker.will_wake(cx.waker()) {
*waker = cx.waker().clone();
}

// The duration has elapsed. Notify the caller by invoking
// the waker.
let waker = waker.lock().unwrap();
waker.wake_by_ref();
});
} else {
let when = self.when;
let waker = Arc::new(Mutex::new(cx.waker().clone()));
self.waker = Some(waker.clone());

// This is the first time `poll` is called, spawn the timer thread.
thread::spawn(move || {
let now = Instant::now();

if now < when {
thread::sleep(when - now);
}

// The duration has elapsed. Notify the caller by invoking
// the waker.
let waker = waker.lock().unwrap();
waker.wake_by_ref();
});
}

// By now, the waker is stored and the timer thread is started.
// The duration has not elapsed (recall that we checked for this
// first thing), ergo the future has not completed so we must
// return `Poll::Pending`.
//
// The `Future` trait contract requires that when `Pending` is
// returned, the future ensures that the given waker is signalled
// once the future should be polled again. In our case, by
// returning `Pending` here, we are promising that we will
// invoke the given waker included in the `Context` argument
// once the requested duration has elapsed. We ensure this by
// spawning the timer thread above.
//
// If we forget to invoke the waker, the task will hang
// indefinitely.
Poll::Pending
}

// By now, the waker is stored and the timer thread is started.
// The duration has not elapsed (recall that we checked for this
// first thing), ergo the future has not completed so we must
// return `Poll::Pending`.
//
// The `Future` trait contract requires that when `Pending` is
// returned, the future ensures that the given waker is signalled
// once the future should be polled again. In our case, by
// returning `Pending` here, we are promising that we will
// invoke the given waker included in the `Context` argument
// once the requested duration has elapsed. We ensure this by
// spawning the timer thread above.
//
// If we forget to invoke the waker, the task will hang
// indefinitely.
Poll::Pending
}
}
```
Expand Down
Loading