Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2. fix(state): Make FindHeaders and FindHashes run concurrently with state updates #4826

Merged
merged 1 commit into from
Jul 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
330 changes: 51 additions & 279 deletions zebra-state/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
use std::{
convert,
future::Future,
ops::{RangeBounds, RangeInclusive},
pin::Pin,
sync::Arc,
task::{Context, Poll},
Expand All @@ -34,7 +33,7 @@ use tracing::{instrument, Instrument, Span};
use tower::buffer::Buffer;

use zebra_chain::{
block::{self, CountedHeader, Height},
block::{self, CountedHeader},
diagnostic::CodeTimer,
parameters::{Network, NetworkUpgrade},
transparent,
Expand Down Expand Up @@ -489,15 +488,15 @@ impl StateService {
}

/// Return true if `hash` is in the current best chain.
#[allow(dead_code)]
pub fn best_chain_contains(&self, hash: block::Hash) -> bool {
self.best_height_by_hash(hash).is_some()
read::chain_contains_hash(self.mem.best_chain(), self.disk.db(), hash)
}

/// Return the height for the block at `hash`, if `hash` is in the best chain.
#[allow(dead_code)]
pub fn best_height_by_hash(&self, hash: block::Hash) -> Option<block::Height> {
self.mem
.best_height_by_hash(hash)
.or_else(|| self.disk.db().height(hash))
read::height_by_hash(self.mem.best_chain(), self.disk.db(), hash)
}

/// Return the height for the block at `hash` in any chain.
Expand Down Expand Up @@ -539,262 +538,6 @@ impl StateService {
}
}

/// Find the first hash that's in the peer's `known_blocks` and the local best chain.
///
/// Returns `None` if:
/// * there is no matching hash in the best chain, or
/// * the state is empty.
fn find_best_chain_intersection(&self, known_blocks: Vec<block::Hash>) -> Option<block::Hash> {
// We can get a block locator request before we have downloaded the genesis block
self.best_tip()?;

known_blocks
.iter()
.find(|&&hash| self.best_chain_contains(hash))
.cloned()
}

/// Returns a range of [`Height`]s in the best chain,
/// starting after the `intersection` hash on the best chain.
///
/// See [`Self::find_best_chain_hashes()`] for details.
fn find_best_chain_height_range(
&self,
intersection: Option<block::Hash>,
stop: Option<block::Hash>,
max_len: u32,
) -> impl RangeBounds<u32> + Iterator<Item = u32> {
#[allow(clippy::reversed_empty_ranges)]
const EMPTY_RANGE: RangeInclusive<u32> = 1..=0;

assert!(max_len > 0, "max_len must be at least 1");

// We can get a block locator request before we have downloaded the genesis block
let chain_tip_height = if let Some((height, _)) = self.best_tip() {
height
} else {
tracing::debug!(
response_len = ?0,
"responding to peer GetBlocks or GetHeaders with empty state",
);

return EMPTY_RANGE;
};

// Find the intersection height
let intersection_height = match intersection {
Some(intersection_hash) => match self.best_height_by_hash(intersection_hash) {
Some(intersection_height) => Some(intersection_height),

// A recently committed block dropped the intersection we previously found
None => {
info!(
?intersection,
?stop,
?max_len,
"state found intersection but then dropped it, ignoring request",
);
return EMPTY_RANGE;
}
},
// There is no intersection
None => None,
};

// Now find the start and maximum heights
let (start_height, max_height) = match intersection_height {
// start after the intersection_height, and return max_len hashes or headers
Some(intersection_height) => (
Height(intersection_height.0 + 1),
Height(intersection_height.0 + max_len),
),
// start at genesis, and return max_len hashes or headers
None => (Height(0), Height(max_len - 1)),
};

let stop_height = stop.and_then(|hash| self.best_height_by_hash(hash));

// Compute the final height, making sure it is:
// * at or below our chain tip, and
// * at or below the height of the stop hash.
let final_height = std::cmp::min(max_height, chain_tip_height);
let final_height = stop_height
.map(|stop_height| std::cmp::min(final_height, stop_height))
.unwrap_or(final_height);

// TODO: implement Step for Height, when Step stabilises
// https://github.com/rust-lang/rust/issues/42168
let height_range = start_height.0..=final_height.0;
let response_len = height_range.clone().into_iter().count();

tracing::debug!(
?start_height,
?final_height,
?response_len,
?chain_tip_height,
?stop_height,
?intersection_height,
?intersection,
?stop,
?max_len,
"responding to peer GetBlocks or GetHeaders",
);

// Check the function implements the Find protocol
assert!(
response_len <= max_len.try_into().expect("fits in usize"),
"a Find response must not exceed the maximum response length",
);

height_range
}

/// Returns a list of [`block::Hash`]es in the best chain,
/// following the `intersection` with the best chain.
///
///
/// See [`Self::find_best_chain_hashes()`] for details.
fn collect_best_chain_hashes(
&self,
intersection: Option<block::Hash>,
stop: Option<block::Hash>,
max_len: u32,
) -> Vec<block::Hash> {
let height_range = self.find_best_chain_height_range(intersection, stop, max_len);

// All the hashes should be in the best chain.
// If they are not, we don't want to return them.
let hashes: Vec<block::Hash> = height_range.into_iter().map_while(|height| {
let hash = self.best_hash(Height(height));

// A recently committed block dropped the intersection we previously found
if hash.is_none() {
info!(
?intersection,
?stop,
?max_len,
"state found height range, but then partially dropped it, returning partial response",
);
}

tracing::trace!(
?hash,
?height,
?intersection,
?stop,
?max_len,
"adding hash to peer Find response",
);

hash
}).collect();

// Check the function implements the Find protocol
assert!(
intersection
.map(|hash| !hashes.contains(&hash))
.unwrap_or(true),
"the list must not contain the intersection hash",
);

if let (Some(stop), Some((_, hashes_except_last))) = (stop, hashes.split_last()) {
assert!(
!hashes_except_last.contains(&stop),
"if the stop hash is in the list, it must be the final hash",
);
}

hashes
}

/// Returns a list of [`block::Header`]s in the best chain,
/// following the `intersection` with the best chain.
///
/// See [`Self::find_best_chain_hashes()`] for details.
fn collect_best_chain_headers(
&self,
intersection: Option<block::Hash>,
stop: Option<block::Hash>,
max_len: u32,
) -> Vec<Arc<block::Header>> {
let height_range = self.find_best_chain_height_range(intersection, stop, max_len);

// We don't check that this function implements the Find protocol,
// because fetching extra hashes (or re-calculating hashes) is expensive.
// (This was one of the most expensive and longest-running functions in the state.)

// Save a copy of the non-finalized chain state
// (but the finalized state is still concurrently mutable).
let best_chain = self.mem.best_chain().cloned();
let db = self.disk.db().clone();

// All the headers should be in the best chain.
// If they are not, we don't want to return them.
height_range.into_iter().map_while(|height| {
let header = read::block_header(best_chain.clone(), &db, Height(height).into());

// A recently committed block dropped the intersection we previously found
if header.is_none() {
info!(
?intersection,
?stop,
?max_len,
"state found height range, but then partially dropped it, returning partial response",
);
}

tracing::trace!(
?height,
?intersection,
?stop,
?max_len,
"adding header to peer Find response",
);

header
}).collect()
}

/// Finds the first hash that's in the peer's `known_blocks` and the local best chain.
/// Returns a list of hashes that follow that intersection, from the best chain.
///
/// Starts from the first matching hash in the best chain, ignoring all other hashes in
/// `known_blocks`. If there is no matching hash in the best chain, starts from the genesis
/// hash.
///
/// Includes finalized and non-finalized blocks.
///
/// Stops the list of hashes after:
/// * adding the best tip,
/// * adding the `stop` hash to the list, if it is in the best chain, or
/// * adding 500 hashes to the list.
///
/// Returns an empty list if the state is empty,
/// and a partial or empty list if the found heights are concurrently modified.
pub fn find_best_chain_hashes(
&self,
known_blocks: Vec<block::Hash>,
stop: Option<block::Hash>,
max_len: u32,
) -> Vec<block::Hash> {
let intersection = self.find_best_chain_intersection(known_blocks);
self.collect_best_chain_hashes(intersection, stop, max_len)
}

/// Finds the first hash that's in the peer's `known_blocks` and the local best chain.
/// Returns a list of headers that follow that intersection, from the best chain.
///
/// See [`Self::find_best_chain_hashes()`] for details.
pub fn find_best_chain_headers(
&self,
known_blocks: Vec<block::Hash>,
stop: Option<block::Hash>,
max_len: u32,
) -> Vec<Arc<block::Header>> {
let intersection = self.find_best_chain_intersection(known_blocks);
self.collect_best_chain_headers(intersection, stop, max_len)
}

/// Assert some assumptions about the prepared `block` before it is validated.
fn assert_block_can_be_validated(&self, block: &PreparedBlock) {
// required by validate_and_commit, moved here to make testing easier
Expand Down Expand Up @@ -1113,14 +856,32 @@ impl Service<Request> for StateService {

let timer = CodeTimer::start();

// TODO: move this work into the future, like Block and Transaction?
let res =
self.find_best_chain_hashes(known_blocks, stop, MAX_FIND_BLOCK_HASHES_RESULTS);
// Prepare data for concurrent execution
let best_chain = self.mem.best_chain().cloned();
let db = self.disk.db().clone();

// The work is all done, the future just returns the result.
timer.finish(module_path!(), line!(), "FindBlockHashes");
// # Performance
//
// Allow other async tasks to make progress while the block is being read from disk.
let span = Span::current();
tokio::task::spawn_blocking(move || {
span.in_scope(move || {
let res = read::find_chain_hashes(
best_chain,
&db,
known_blocks,
stop,
MAX_FIND_BLOCK_HASHES_RESULTS,
);

// The work is done in the future.
timer.finish(module_path!(), line!(), "FindBlockHashes");

async move { Ok(Response::BlockHashes(res)) }.boxed()
Ok(Response::BlockHashes(res))
})
})
.map(|join_result| join_result.expect("panic in Request::Block"))
.boxed()
}
Request::FindBlockHeaders { known_blocks, stop } => {
metrics::counter!(
Expand All @@ -1143,19 +904,30 @@ impl Service<Request> for StateService {

let timer = CodeTimer::start();

// TODO: move this work into the future, like Block and Transaction?
let res = self.find_best_chain_headers(known_blocks, stop, max_len);

// The work is all done, the future just returns the result.
timer.finish(module_path!(), line!(), "FindBlockHeaders");
// Prepare data for concurrent execution
let best_chain = self.mem.best_chain().cloned();
let db = self.disk.db().clone();

async move {
Ok(Response::BlockHeaders(
res.into_iter()
// # Performance
//
// Allow other async tasks to make progress while the block is being read from disk.
let span = Span::current();
tokio::task::spawn_blocking(move || {
span.in_scope(move || {
let res =
read::find_chain_headers(best_chain, &db, known_blocks, stop, max_len);
let res = res
.into_iter()
.map(|header| CountedHeader { header })
.collect(),
))
}
.collect();

// The work is done in the future.
timer.finish(module_path!(), line!(), "FindBlockHeaders");

Ok(Response::BlockHeaders(res))
})
})
.map(|join_result| join_result.expect("panic in Request::Block"))
.boxed()
}
}
Expand Down
8 changes: 8 additions & 0 deletions zebra-state/src/service/finalized_state/zebra_db/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,14 @@ impl ZebraDb {
self.db.zs_get(&hash_by_height, &height)
}

/// Returns `true` if `hash` is present in the finalized state.
#[allow(clippy::unwrap_in_result)]
pub fn contains_hash(&self, hash: block::Hash) -> bool {
let height_by_hash = self.db.cf_handle("height_by_hash").unwrap();

self.db.zs_contains(&height_by_hash, &hash)
}

/// Returns the height of the given block if it exists.
#[allow(clippy::unwrap_in_result)]
pub fn height(&self, hash: block::Hash) -> Option<block::Height> {
Expand Down
Loading