Fix poll_ready usage in ChainVerifier #1700

Merged: 11 commits, Feb 8, 2021
84 changes: 54 additions & 30 deletions zebra-consensus/src/chain.rs
@@ -22,29 +22,44 @@ use zebra_state as zs;

 use crate::{
     block::BlockVerifier,
-    block::VerifyBlockError,
-    checkpoint::{CheckpointList, CheckpointVerifier, VerifyCheckpointError},
+    checkpoint::{CheckpointList, CheckpointVerifier},
     BoxError, Config,
 };

+/// The bound for each verifier's buffer.
+///
+/// We choose the verifier buffer bound based on the maximum number of
+/// concurrent verifier users, to avoid contention:
+///   - the `ChainSync` component
+///   - the `Inbound` service
+///   - a miner component, which we might add in future, and
+///   - 1 extra slot to avoid contention.
+///
+/// We deliberately add extra slots, because they only cost a small amount of
+/// memory, but missing slots can significantly slow down Zebra.
+const VERIFIER_BUFFER_BOUND: usize = 4;
+
 /// The chain verifier routes requests to either the checkpoint verifier or the
 /// block verifier, depending on the maximum checkpoint height.
 struct ChainVerifier<S>
 where
     S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
     S::Future: Send + 'static,
 {
-    block: BlockVerifier<S>,
-    checkpoint: CheckpointVerifier<S>,
+    // Normally, we erase the types on buffer-wrapped services.
+    // But if we did that here, the block and checkpoint services would be
+    // type-indistinguishable, risking future substitution errors.
+    block_verifier: Buffer<BlockVerifier<S>, Arc<block::Block>>,
+    checkpoint_verifier: Buffer<CheckpointVerifier<S>, Arc<block::Block>>,
     max_checkpoint_height: block::Height,
 }

 #[derive(Debug, Display, Error)]
 pub enum VerifyChainError {
     /// block could not be checkpointed
-    Checkpoint(#[source] VerifyCheckpointError),
+    Checkpoint(#[source] BoxError),
     /// block could not be verified
-    Block(#[source] VerifyBlockError),
+    Block(#[source] BoxError),
Comment on lines 59 to +62

Since we're using a Buffer, we get BoxErrors back from the checkpoint and block verifiers.
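
For readers following along, here is a minimal std-only sketch of what the error change amounts to. It assumes `BoxError` is the usual boxed `dyn Error + Send + Sync` alias, writes the `Display` and `source` impls by hand instead of using the derive macros from the diff, and uses hypothetical `checkpoint_verify` and `verify` helpers in place of the real verifiers.

```rust
use std::error::Error;
use std::fmt;

/// Assumed shape of the `BoxError` alias used in the diff.
pub type BoxError = Box<dyn Error + Send + Sync + 'static>;

/// Hand-written equivalent of the derived enum: both variants wrap a
/// type-erased error, because a buffered service only returns `BoxError`.
#[derive(Debug)]
pub enum VerifyChainError {
    Checkpoint(BoxError),
    Block(BoxError),
}

impl fmt::Display for VerifyChainError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            VerifyChainError::Checkpoint(_) => write!(f, "block could not be checkpointed"),
            VerifyChainError::Block(_) => write!(f, "block could not be verified"),
        }
    }
}

impl Error for VerifyChainError {
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        let boxed = match self {
            VerifyChainError::Checkpoint(e) | VerifyChainError::Block(e) => e,
        };
        // Reborrow the boxed error as a plain `dyn Error` reference.
        let inner: &(dyn Error + 'static) = &**boxed;
        Some(inner)
    }
}

/// Hypothetical inner verifier that fails with a boxed error.
fn checkpoint_verify(height: u32) -> Result<u32, BoxError> {
    if height == 0 {
        Err("toy failure: height 0 is rejected in this example".into())
    } else {
        Ok(height)
    }
}

/// Hypothetical wrapper: tags the boxed error with the verifier it came from.
pub fn verify(height: u32) -> Result<u32, VerifyChainError> {
    checkpoint_verify(height).map_err(VerifyChainError::Checkpoint)
}
```

The concrete error types are no longer visible in the enum, but the variant still records which verifier failed, and the original error stays reachable through `source()`.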

 }

 impl<S> Service<Arc<Block>> for ChainVerifier<S>
@@ -57,30 +72,34 @@ where
     type Future =
         Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

-    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
-        match (self.checkpoint.poll_ready(cx), self.block.poll_ready(cx)) {
-            // First, fail if either service fails.
-            (Poll::Ready(Err(e)), _) => Poll::Ready(Err(VerifyChainError::Checkpoint(e))),
-            (_, Poll::Ready(Err(e))) => Poll::Ready(Err(VerifyChainError::Block(e))),
-            // Second, we're unready if either service is unready.
-            (Poll::Pending, _) | (_, Poll::Pending) => Poll::Pending,
-            // Finally, we're ready if both services are ready and OK.
-            (Poll::Ready(Ok(())), Poll::Ready(Ok(()))) => Poll::Ready(Ok(())),
-        }
+    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        // Correctness:
+        //
+        // We can't call `poll_ready` on the block and checkpoint verifiers here,
+        // because each `poll_ready` must be followed by a `call`, and we don't
+        // know which verifier we're going to choose yet.
Comment on lines +78 to +80

We can't call poll_ready here - it doesn't matter what we do with the results, because the poll_ready call itself is what takes up a buffer slot.

@teor2345 teor2345 Feb 8, 2021


@yaahc - this constraint is something to keep in mind for future service code reviews.

If we call poll_ready, but there isn't a matching call, we fill up our Buffers with unused reservations, and Zebra hangs.
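
To make the hang concrete, here is a toy, std-only model of the slot accounting. It is a sketch under simplifying assumptions, not tower's `Buffer`: `Handle` and `new_handle_ready` are hypothetical names, everything is synchronous, and a slot is freed at `call` time, whereas the real buffer is async and frees capacity as its worker processes requests.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Hypothetical stand-in for one handle to a bounded buffer.
/// All handles share the same pool of free slots.
pub struct Handle {
    free_slots: Rc<Cell<usize>>,
    holds_slot: bool,
}

impl Handle {
    pub fn new(free_slots: Rc<Cell<usize>>) -> Self {
        Handle {
            free_slots,
            holds_slot: false,
        }
    }

    /// Models `poll_ready`: reserve a slot, unless this handle already has one.
    /// Returns `false` (think `Poll::Pending`) when no slot is free.
    pub fn poll_ready(&mut self) -> bool {
        if self.holds_slot {
            return true;
        }
        let free = self.free_slots.get();
        if free == 0 {
            return false;
        }
        self.free_slots.set(free - 1);
        self.holds_slot = true;
        true
    }

    /// Models `call`: use up this handle's reservation and free the slot.
    pub fn call(&mut self, request: u32) -> u32 {
        assert!(self.holds_slot, "`call` without a successful `poll_ready`");
        self.holds_slot = false;
        self.free_slots.set(self.free_slots.get() + 1);
        request
    }
}

/// `idle` handles poll for readiness but never call. Returns whether one
/// more handle can still become ready while those reservations are held.
pub fn new_handle_ready(bound: usize, idle: usize) -> bool {
    let free_slots = Rc::new(Cell::new(bound));
    let mut idle_handles: Vec<Handle> = (0..idle)
        .map(|_| Handle::new(free_slots.clone()))
        .collect();
    for handle in idle_handles.iter_mut() {
        handle.poll_ready();
    }
    let mut fresh = Handle::new(free_slots);
    fresh.poll_ready()
}
```

In this model, with a bound of 4, three idle reservations still leave room for one more request, but four leave none. A handle that follows `poll_ready` with `call` gives its slot back.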

+        // See #1593 for details.
+        Poll::Ready(Ok(()))
     }

     fn call(&mut self, block: Arc<Block>) -> Self::Future {
         match block.coinbase_height() {
+            // Correctness:
+            //
+            // We use `ServiceExt::oneshot` to make sure every `poll_ready` has
+            // a matching `call`. See #1593 for details.
             Some(height) if height <= self.max_checkpoint_height => self
-                .checkpoint
-                .call(block)
+                .checkpoint_verifier
+                .clone()
+                .oneshot(block)
                 .map_err(VerifyChainError::Checkpoint)
                 .boxed(),
             // This also covers blocks with no height, which the block verifier
             // will reject immediately.
             _ => self
-                .block
-                .call(block)
+                .block_verifier
+                .clone()
+                .oneshot(block)
Comment on lines -75 to +102
@teor2345 teor2345 Feb 8, 2021


These services are now Buffered, so we don't need an async block. We can just call ServiceExt::oneshot on a cloned service, and return those futures.


@yaahc @oxarbitrage the best way to avoid poll_ready / call mismatches is to just use a oneshot.

It handles the poll_ready and call internally, so we can't possibly get it wrong.
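
As a rough illustration of why a oneshot can't get the pairing wrong, here is a synchronous, std-only sketch. `ToyService`, `Counting`, and this `oneshot` function are hypothetical stand-ins: the real `ServiceExt::oneshot` takes the service by value and returns a future, while this version borrows the service so the counters can be inspected afterwards.

```rust
/// Hypothetical, synchronous stand-in for tower's `Service` trait.
pub trait ToyService {
    /// Returns `true` when ready; `false` models `Poll::Pending`.
    fn poll_ready(&mut self) -> bool;
    fn call(&mut self, request: u64) -> u64;
}

/// Toy service that counts successful readiness polls and calls,
/// so the one-to-one pairing can be checked.
#[derive(Debug, Default)]
pub struct Counting {
    pub reservations: u32,
    pub calls: u32,
    polls_until_ready: u32,
}

impl Counting {
    /// Creates a service that reports "pending" for the first `polls` polls.
    pub fn ready_after(polls: u32) -> Self {
        Counting {
            polls_until_ready: polls,
            ..Default::default()
        }
    }
}

impl ToyService for Counting {
    fn poll_ready(&mut self) -> bool {
        if self.polls_until_ready > 0 {
            self.polls_until_ready -= 1;
            return false;
        }
        self.reservations += 1;
        true
    }

    fn call(&mut self, request: u64) -> u64 {
        self.calls += 1;
        request + 1
    }
}

/// Toy `oneshot`: wait until the service is ready, then call it exactly once.
/// The caller never touches `poll_ready`, so it can't leave one unmatched.
pub fn oneshot<S: ToyService>(service: &mut S, request: u64) -> u64 {
    while !service.poll_ready() {}
    service.call(request)
}
```

Because readiness and the call live in one function, every successful `poll_ready` is immediately followed by its `call`, however many pending polls come first.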

                 .map_err(VerifyChainError::Block)
                 .boxed(),
         }
@@ -103,7 +122,7 @@ where
 pub async fn init<S>(
     config: Config,
     network: Network,
-    mut state_service: S,
+    state_service: S,
 ) -> Buffer<BoxService<Arc<Block>, block::Hash, VerifyChainError>, Arc<Block>>
 where
     S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
@@ -118,11 +137,13 @@ where
             .expect("hardcoded checkpoint list extends past sapling activation")
     };

+    // Correctness:
+    //
+    // We use `ServiceExt::oneshot` to make sure every `poll_ready` has a
+    // matching `call`. See #1593 for details.
     let tip = match state_service
-        .ready_and()
-        .await
-        .unwrap()
-        .call(zs::Request::Tip)
+        .clone()
+        .oneshot(zs::Request::Tip)
         .await
         .unwrap()
Comment on lines +140 to 148
The reason will be displayed to describe this comment to others. Learn more.

This fix is part of #1675 - and it's actually something I missed during my review last week.

Since we're fixing this file, let's also use ServiceExt::oneshot here.

     {
@@ -131,15 +152,18 @@ where
     };
     tracing::info!(?tip, ?max_checkpoint_height, "initializing chain verifier");

-    let block = BlockVerifier::new(network, state_service.clone());
-    let checkpoint = CheckpointVerifier::from_checkpoint_list(list, tip, state_service);
+    let block_verifier = BlockVerifier::new(network, state_service.clone());
+    let checkpoint_verifier = CheckpointVerifier::from_checkpoint_list(list, tip, state_service);
+
+    let block_verifier = Buffer::new(block_verifier, VERIFIER_BUFFER_BOUND);
+    let checkpoint_verifier = Buffer::new(checkpoint_verifier, VERIFIER_BUFFER_BOUND);
Comment on lines +155 to +159

It's a bit easier to read the code if we create the services, then replace them with buffered versions.


     Buffer::new(
         BoxService::new(ChainVerifier {
-            block,
-            checkpoint,
+            block_verifier,
+            checkpoint_verifier,
             max_checkpoint_height,
         }),
-        3,
+        VERIFIER_BUFFER_BOUND,
     )
 }