Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Document correctness of select! and Poll::Pending #1954

Merged
merged 2 commits into from
Mar 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions tower-batch/src/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ where
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();

// CORRECTNESS
//
// The current task must be scheduled for wakeup every time we return
// `Poll::Pending`.
//
// This loop ensures that the task is scheduled as required, because it
// only returns Pending when another future returns Pending.
loop {
match this.state.as_mut().project() {
ResponseStateProj::Failed(e) => {
Expand Down
19 changes: 13 additions & 6 deletions tower-batch/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,19 @@ where
return Poll::Ready(Err(self.get_worker_error()));
}

// Then, poll to acquire a semaphore permit. If we acquire a permit,
// then there's enough buffer capacity to send a new request. Otherwise,
// we need to wait for capacity.

// In tokio 0.3.7, `acquire_owned` panics if its semaphore returns an error,
// so we don't need to handle errors until we upgrade to tokio 1.0.
// CORRECTNESS
//
// Poll to acquire a semaphore permit. If we acquire a permit, then
// there's enough buffer capacity to send a new request. Otherwise, we
// need to wait for capacity.
//
// In tokio 0.3.7, `acquire_owned` panics if its semaphore returns an
// error, so we don't need to handle errors until we upgrade to
// tokio 1.0.
//
// The current task must be scheduled for wakeup every time we return
// `Poll::Pending`. If it returns Pending, the semaphore also schedules
// the task for wakeup when the next permit is available.
ready!(self.semaphore.poll_acquire(cx));

Poll::Ready(Ok(()))
Expand Down
1 change: 1 addition & 0 deletions tower-batch/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ where
},
Some(mut sleep) => {
// Wait on either a new message or the batch timer.
// If both are ready, select! chooses one of them at random.
tokio::select! {
maybe_msg = self.rx.recv() => match maybe_msg {
Some(msg) => {
Expand Down
7 changes: 7 additions & 0 deletions tower-fallback/src/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ where

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
// CORRECTNESS
//
// The current task must be scheduled for wakeup every time we return
// `Poll::Pending`.
//
// This loop ensures that the task is scheduled as required, because it
// only returns Pending when a future or service returns Pending.
loop {
match this.state.as_mut().project() {
ResponseStateProj::PollResponse1 { fut, .. } => match ready!(fut.poll(cx)) {
Expand Down
8 changes: 7 additions & 1 deletion zebra-consensus/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,13 @@ where
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// Correctness:
// CORRECTNESS
//
// The current task must be scheduled for wakeup every time we return
// `Poll::Pending`.
//
// If either verifier is unready, this task is scheduled for wakeup when it becomes
// ready.
//
// We acquire checkpoint readiness before block readiness, to avoid an unlikely
// hang during the checkpoint to block verifier transition. If the checkpoint and
Expand Down
21 changes: 20 additions & 1 deletion zebra-network/src/peer/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,13 @@ impl Stream for ClientRequestReceiver {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.inner.poll_next_unpin(cx) {
Poll::Ready(client_request) => Poll::Ready(client_request.map(Into::into)),
// `inner.poll_next_unpin` parks the task for this future
// CORRECTNESS
//
// The current task must be scheduled for wakeup every time we
// return `Poll::Pending`.
//
// inner.poll_next_unpin` schedules this task for wakeup when
// there are new items available in the inner stream.
Poll::Pending => Poll::Pending,
}
}
Expand Down Expand Up @@ -198,6 +204,19 @@ impl Service<Request> for Client {
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// CORRECTNESS
//
// The current task must be scheduled for wakeup every time we return
// `Poll::Pending`.
//
//`ready!` returns `Poll::Pending` when `server_tx` is unready, and
// schedules this task for wakeup.
//
// Since `shutdown_tx` is used for oneshot communication to the heartbeat
// task, it will never be `Pending`.
//
// TODO: should the Client exit if the heartbeat task exits and drops
// `shutdown_tx`?
if ready!(self.server_tx.poll_ready(cx)).is_err() {
Poll::Ready(Err(self
.error_slot
Expand Down
19 changes: 19 additions & 0 deletions zebra-network/src/peer_set/set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,25 @@ where
if self.preselected_p2c_index.is_none() {
trace!("no ready services, sending demand signal");
let _ = self.demand_signal.try_send(());
// CORRECTNESS
//
// The current task must be scheduled for wakeup every time we
// return `Poll::Pending`.
//
// As long as there are unready or new peers, this task will run,
// because:
// - `poll_discover` schedules this task for wakeup when new
// peers arrive.
// - if there are unready peers, `poll_unready` schedules this
// task for wakeup when peer services become ready.
// - if the preselected peer is not ready, `service.poll_ready`
// schedules this task for wakeup when that service becomes
// ready.
//
// To avoid peers blocking on a full background error channel:
// - if no background tasks have exited since the last poll,
// `poll_background_errors` schedules this task for wakeup when
// the next task exits.
return Poll::Pending;
}
}
Expand Down
9 changes: 9 additions & 0 deletions zebra-network/src/peer_set/unready_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@ impl<K, S: Service<Req>, Req> Future for UnreadyService<K, S, Req> {
return Poll::Ready(Err((key, Error::Canceled)));
}

// CORRECTNESS
//
// The current task must be scheduled for wakeup every time we return
// `Poll::Pending`.
//
//`ready!` returns `Poll::Pending` when the service is unready, and
// schedules this task for wakeup.
//
// `cancel.poll` also schedules this task for wakeup if it is canceled.
let res = ready!(this
.service
.as_mut()
Expand Down
11 changes: 11 additions & 0 deletions zebrad/src/components/inbound/downloads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,15 @@ where

fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let this = self.project();
// CORRECTNESS
//
// The current task must be scheduled for wakeup every time we return
// `Poll::Pending`.
//
// If no download and verify tasks have exited since the last poll, this
// task is scheduled for wakeup when the next task becomes ready.
//
// TODO:
// This would be cleaner with poll_map #63514, but that's nightly only.
if let Some(join_result) = ready!(this.pending.poll_next(cx)) {
match join_result.expect("block download and verify tasks must not panic") {
Expand Down Expand Up @@ -223,6 +232,8 @@ where
.in_current_span();

let task = tokio::spawn(async move {
// TODO: if the verifier and cancel are both ready, which should we
// prefer? (Currently, select! chooses one at random.)
tokio::select! {
_ = &mut cancel_rx => {
tracing::trace!("task cancelled prior to completion");
Expand Down
13 changes: 13 additions & 0 deletions zebrad/src/components/sync/downloads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,15 @@ where

fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let this = self.project();
// CORRECTNESS
//
// The current task must be scheduled for wakeup every time we return
// `Poll::Pending`.
//
// If no download and verify tasks have exited since the last poll, this
// task is scheduled for wakeup when the next task becomes ready.
//
// TODO:
// This would be cleaner with poll_map #63514, but that's nightly only.
if let Some(join_result) = ready!(this.pending.poll_next(cx)) {
match join_result.expect("block download and verify tasks must not panic") {
Expand Down Expand Up @@ -149,6 +158,8 @@ where
let mut verifier = self.verifier.clone();
let task = tokio::spawn(
async move {
// TODO: if the verifier and cancel are both ready, which should
// we prefer? (Currently, select! chooses one at random.)
let rsp = tokio::select! {
_ = &mut cancel_rx => {
tracing::trace!("task cancelled prior to download completion");
Expand All @@ -169,6 +180,8 @@ where
metrics::counter!("sync.downloaded.block.count", 1);

let rsp = verifier.ready_and().await?.call(block);
// TODO: if the verifier and cancel are both ready, which should
// we prefer? (Currently, select! chooses one at random.)
let verification = tokio::select! {
_ = &mut cancel_rx => {
tracing::trace!("task cancelled prior to verification");
Expand Down
3 changes: 3 additions & 0 deletions zebrad/src/components/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ pub(crate) trait RuntimeRun {
impl RuntimeRun for Runtime {
fn run(&mut self, fut: impl Future<Output = Result<(), Report>>) {
let result = self.block_on(async move {
// If the run task and shutdown are both ready, select! chooses
// one of them at random.
tokio::select! {
result = fut => result,
_ = shutdown() => Ok(()),
Expand All @@ -68,6 +70,7 @@ mod imp {
use tracing::info;

pub(super) async fn shutdown() {
// If both signals are received, select! chooses one of them at random.
tokio::select! {
// SIGINT - Terminal interrupt signal. Typically generated by shells in response to Ctrl-C.
() = sig(SignalKind::interrupt(), "SIGINT") => {}
Expand Down