Skip to content

Commit

Permalink
Manually pin Sleep futures (#2914)
Browse files Browse the repository at this point in the history
* Wrap `Sleep` timer in a `Pin<Box<_>>`

The `Sleep` type doesn't implement `Unpin` in newer versions of Tokio.

* Wrap `Sleep` type in a `Pin<Box<_>>`

In newer Tokio versions the `Sleep` type doesn't implement `Unpin`, so
it needs to be manually pinned.
  • Loading branch information
jvff authored Oct 22, 2021
1 parent ae6c90f commit 2a1d428
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 11 deletions.
13 changes: 6 additions & 7 deletions tower-batch/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,10 @@ where
// submitted, so that the batch latency of all entries is at most
// self.max_latency. However, we don't keep the timer running unless
// there is a pending request to prevent wakeups on idle services.
let mut timer: Option<Sleep> = None;
let mut timer: Option<Pin<Box<Sleep>>> = None;
let mut pending_items = 0usize;
loop {
match timer {
match timer.as_mut() {
None => match self.rx.next().await {
// The first message in a new batch.
Some(msg) => {
Expand All @@ -135,13 +135,13 @@ where
// Apply the provided span to request processing
.instrument(span)
.await;
timer = Some(sleep(self.max_latency));
timer = Some(Box::pin(sleep(self.max_latency)));
pending_items = 1;
}
// No more messages, ever.
None => return,
},
Some(mut sleep) => {
Some(sleep) => {
// Wait on either a new message or the batch timer.
// If both are ready, select! chooses one of them at random.
tokio::select! {
Expand All @@ -161,16 +161,15 @@ where
timer = None;
pending_items = 0;
} else {
// The timer is still running, set it back!
timer = Some(sleep);
// The timer is still running.
}
}
None => {
// No more messages, ever.
return;
}
},
() = &mut sleep => {
() = sleep => {
// The batch timer elapsed.
// XXX(hdevalence): what span should instrument this?
self.flush_service().await;
Expand Down
8 changes: 4 additions & 4 deletions zebra-network/src/peer/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
//! And it's unclear if these assumptions match the `zcashd` implementation.
//! It should be refactored into a cleaner set of request/response pairs (#1515).
use std::{collections::HashSet, sync::Arc};
use std::{collections::HashSet, pin::Pin, sync::Arc};

use futures::{
future::{self, Either},
Expand Down Expand Up @@ -320,7 +320,7 @@ pub struct Connection<S, Tx> {
/// A timeout for a client request. This is stored separately from
/// State so that we can move the future out of it independently of
/// other state handling.
pub(super) request_timer: Option<Sleep>,
pub(super) request_timer: Option<Pin<Box<Sleep>>>,

/// The `inbound` service, used to answer requests from this connection's peer.
pub(super) svc: S,
Expand Down Expand Up @@ -780,11 +780,11 @@ where
// send a response before dropping tx.
let _ = tx.send(Ok(Response::Nil));
self.state = AwaitingRequest;
self.request_timer = Some(sleep(constants::REQUEST_TIMEOUT));
self.request_timer = Some(Box::pin(sleep(constants::REQUEST_TIMEOUT)));
}
Ok((new_state @ AwaitingResponse { .. }, None)) => {
self.state = new_state;
self.request_timer = Some(sleep(constants::REQUEST_TIMEOUT));
self.request_timer = Some(Box::pin(sleep(constants::REQUEST_TIMEOUT)));
}
Err((e, tx)) => {
let e = SharedPeerError::from(e);
Expand Down

0 comments on commit 2a1d428

Please sign in to comment.