Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Partly revert "Fix poll_ready usage in ChainVerifier" #1735

Merged
merged 4 commits into from
Feb 20, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 35 additions & 38 deletions zebra-consensus/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ use zebra_state as zs;

use crate::{
block::BlockVerifier,
checkpoint::{CheckpointList, CheckpointVerifier},
block::VerifyBlockError,
checkpoint::{CheckpointList, CheckpointVerifier, VerifyCheckpointError},
BoxError, Config,
};

/// The bound for each verifier's buffer.
/// The bound for the chain verifier's buffer.
///
/// We choose the verifier buffer bound based on the maximum number of
/// concurrent verifier users, to avoid contention:
Expand All @@ -46,20 +47,17 @@ where
S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
S::Future: Send + 'static,
{
// Normally, we erase the types on buffer-wrapped services.
// But if we did that here, the block and checkpoint services would be
// type-indistinguishable, risking future substitution errors.
block_verifier: Buffer<BlockVerifier<S>, Arc<block::Block>>,
checkpoint_verifier: Buffer<CheckpointVerifier<S>, Arc<block::Block>>,
block: BlockVerifier<S>,
checkpoint: CheckpointVerifier<S>,
max_checkpoint_height: block::Height,
}

#[derive(Debug, Display, Error)]
pub enum VerifyChainError {
/// block could not be checkpointed
Checkpoint(#[source] BoxError),
Checkpoint(#[source] VerifyCheckpointError),
/// block could not be verified
Block(#[source] BoxError),
Block(#[source] VerifyBlockError),
}

impl<S> Service<Arc<Block>> for ChainVerifier<S>
Expand All @@ -72,34 +70,38 @@ where
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// Correctness:
//
// We can't call `poll_ready` on the block and checkpoint verifiers here,
// because each `poll_ready` must be followed by a `call`, and we don't
// know which verifier we're going to choose yet.
// See #1593 for details.
// We acquire checkpoint readiness before block readiness, to avoid an unlikely
// hang during the checkpoint to block verifier transition. If the checkpoint and
// block verifiers are contending for the same buffer/batch, we want the checkpoint
// verifier to win, so that checkpoint verification completes, and block verification
// can start. (Buffers and batches have multiple slots, so this contention is unlikely.)
use futures::ready;
// The chain verifier holds one slot in each verifier, for each concurrent task.
// Therefore, any shared buffers or batches polled by these verifiers should double
// their bounds. (For example, the state service buffer.)
ready!(self
.checkpoint
.poll_ready(cx)
.map_err(VerifyChainError::Checkpoint))?;
ready!(self.block.poll_ready(cx).map_err(VerifyChainError::Block))?;
Poll::Ready(Ok(()))
}

fn call(&mut self, block: Arc<Block>) -> Self::Future {
match block.coinbase_height() {
// Correctness:
//
// We use `ServiceExt::oneshot` to make sure every `poll_ready` has
// a matching `call`. See #1593 for details.
Some(height) if height <= self.max_checkpoint_height => self
.checkpoint_verifier
.clone()
.oneshot(block)
.checkpoint
.call(block)
.map_err(VerifyChainError::Checkpoint)
.boxed(),
// This also covers blocks with no height, which the block verifier
// will reject immediately.
_ => self
.block_verifier
.clone()
.oneshot(block)
.block
.call(block)
.map_err(VerifyChainError::Block)
.boxed(),
}
Expand All @@ -122,7 +124,7 @@ where
pub async fn init<S>(
config: Config,
network: Network,
state_service: S,
mut state_service: S,
) -> Buffer<BoxService<Arc<Block>, block::Hash, VerifyChainError>, Arc<Block>>
where
S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
Expand All @@ -137,13 +139,11 @@ where
.expect("hardcoded checkpoint list extends past sapling activation")
};

// Correctness:
//
// We use `ServiceExt::oneshot` to make sure every `poll_ready` has a
// matching `call`. See #1593 for details.
let tip = match state_service
.clone()
.oneshot(zs::Request::Tip)
.ready_and()
.await
.unwrap()
.call(zs::Request::Tip)
.await
.unwrap()
{
Expand All @@ -152,16 +152,13 @@ where
};
tracing::info!(?tip, ?max_checkpoint_height, "initializing chain verifier");

let block_verifier = BlockVerifier::new(network, state_service.clone());
let checkpoint_verifier = CheckpointVerifier::from_checkpoint_list(list, tip, state_service);

let block_verifier = Buffer::new(block_verifier, VERIFIER_BUFFER_BOUND);
let checkpoint_verifier = Buffer::new(checkpoint_verifier, VERIFIER_BUFFER_BOUND);
let block = BlockVerifier::new(network, state_service.clone());
let checkpoint = CheckpointVerifier::from_checkpoint_list(list, tip, state_service);

Buffer::new(
BoxService::new(ChainVerifier {
block_verifier,
checkpoint_verifier,
block,
checkpoint,
max_checkpoint_height,
}),
VERIFIER_BUFFER_BOUND,
Expand Down