diff --git a/zebra-state/src/service.rs b/zebra-state/src/service.rs index 7b24a4e5715..4c14fecf4c9 100644 --- a/zebra-state/src/service.rs +++ b/zebra-state/src/service.rs @@ -18,7 +18,6 @@ use std::{ convert, future::Future, - ops::{RangeBounds, RangeInclusive}, pin::Pin, sync::Arc, task::{Context, Poll}, @@ -34,7 +33,7 @@ use tracing::{instrument, Instrument, Span}; use tower::buffer::Buffer; use zebra_chain::{ - block::{self, CountedHeader, Height}, + block::{self, CountedHeader}, diagnostic::CodeTimer, parameters::{Network, NetworkUpgrade}, transparent, @@ -489,15 +488,15 @@ impl StateService { } /// Return true if `hash` is in the current best chain. + #[allow(dead_code)] pub fn best_chain_contains(&self, hash: block::Hash) -> bool { - self.best_height_by_hash(hash).is_some() + read::chain_contains_hash(self.mem.best_chain(), self.disk.db(), hash) } /// Return the height for the block at `hash`, if `hash` is in the best chain. + #[allow(dead_code)] pub fn best_height_by_hash(&self, hash: block::Hash) -> Option { - self.mem - .best_height_by_hash(hash) - .or_else(|| self.disk.db().height(hash)) + read::height_by_hash(self.mem.best_chain(), self.disk.db(), hash) } /// Return the height for the block at `hash` in any chain. @@ -539,262 +538,6 @@ impl StateService { } } - /// Find the first hash that's in the peer's `known_blocks` and the local best chain. - /// - /// Returns `None` if: - /// * there is no matching hash in the best chain, or - /// * the state is empty. - fn find_best_chain_intersection(&self, known_blocks: Vec) -> Option { - // We can get a block locator request before we have downloaded the genesis block - self.best_tip()?; - - known_blocks - .iter() - .find(|&&hash| self.best_chain_contains(hash)) - .cloned() - } - - /// Returns a range of [`Height`]s in the best chain, - /// starting after the `intersection` hash on the best chain. - /// - /// See [`Self::find_best_chain_hashes()`] for details. - fn find_best_chain_height_range( - &self, - intersection: Option, - stop: Option, - max_len: u32, - ) -> impl RangeBounds + Iterator { - #[allow(clippy::reversed_empty_ranges)] - const EMPTY_RANGE: RangeInclusive = 1..=0; - - assert!(max_len > 0, "max_len must be at least 1"); - - // We can get a block locator request before we have downloaded the genesis block - let chain_tip_height = if let Some((height, _)) = self.best_tip() { - height - } else { - tracing::debug!( - response_len = ?0, - "responding to peer GetBlocks or GetHeaders with empty state", - ); - - return EMPTY_RANGE; - }; - - // Find the intersection height - let intersection_height = match intersection { - Some(intersection_hash) => match self.best_height_by_hash(intersection_hash) { - Some(intersection_height) => Some(intersection_height), - - // A recently committed block dropped the intersection we previously found - None => { - info!( - ?intersection, - ?stop, - ?max_len, - "state found intersection but then dropped it, ignoring request", - ); - return EMPTY_RANGE; - } - }, - // There is no intersection - None => None, - }; - - // Now find the start and maximum heights - let (start_height, max_height) = match intersection_height { - // start after the intersection_height, and return max_len hashes or headers - Some(intersection_height) => ( - Height(intersection_height.0 + 1), - Height(intersection_height.0 + max_len), - ), - // start at genesis, and return max_len hashes or headers - None => (Height(0), Height(max_len - 1)), - }; - - let stop_height = stop.and_then(|hash| self.best_height_by_hash(hash)); - - // Compute the final height, making sure it is: - // * at or below our chain tip, and - // * at or below the height of the stop hash. - let final_height = std::cmp::min(max_height, chain_tip_height); - let final_height = stop_height - .map(|stop_height| std::cmp::min(final_height, stop_height)) - .unwrap_or(final_height); - - // TODO: implement Step for Height, when Step stabilises - // https://github.com/rust-lang/rust/issues/42168 - let height_range = start_height.0..=final_height.0; - let response_len = height_range.clone().into_iter().count(); - - tracing::debug!( - ?start_height, - ?final_height, - ?response_len, - ?chain_tip_height, - ?stop_height, - ?intersection_height, - ?intersection, - ?stop, - ?max_len, - "responding to peer GetBlocks or GetHeaders", - ); - - // Check the function implements the Find protocol - assert!( - response_len <= max_len.try_into().expect("fits in usize"), - "a Find response must not exceed the maximum response length", - ); - - height_range - } - - /// Returns a list of [`block::Hash`]es in the best chain, - /// following the `intersection` with the best chain. - /// - /// - /// See [`Self::find_best_chain_hashes()`] for details. - fn collect_best_chain_hashes( - &self, - intersection: Option, - stop: Option, - max_len: u32, - ) -> Vec { - let height_range = self.find_best_chain_height_range(intersection, stop, max_len); - - // All the hashes should be in the best chain. - // If they are not, we don't want to return them. - let hashes: Vec = height_range.into_iter().map_while(|height| { - let hash = self.best_hash(Height(height)); - - // A recently committed block dropped the intersection we previously found - if hash.is_none() { - info!( - ?intersection, - ?stop, - ?max_len, - "state found height range, but then partially dropped it, returning partial response", - ); - } - - tracing::trace!( - ?hash, - ?height, - ?intersection, - ?stop, - ?max_len, - "adding hash to peer Find response", - ); - - hash - }).collect(); - - // Check the function implements the Find protocol - assert!( - intersection - .map(|hash| !hashes.contains(&hash)) - .unwrap_or(true), - "the list must not contain the intersection hash", - ); - - if let (Some(stop), Some((_, hashes_except_last))) = (stop, hashes.split_last()) { - assert!( - !hashes_except_last.contains(&stop), - "if the stop hash is in the list, it must be the final hash", - ); - } - - hashes - } - - /// Returns a list of [`block::Header`]s in the best chain, - /// following the `intersection` with the best chain. - /// - /// See [`Self::find_best_chain_hashes()`] for details. - fn collect_best_chain_headers( - &self, - intersection: Option, - stop: Option, - max_len: u32, - ) -> Vec> { - let height_range = self.find_best_chain_height_range(intersection, stop, max_len); - - // We don't check that this function implements the Find protocol, - // because fetching extra hashes (or re-calculating hashes) is expensive. - // (This was one of the most expensive and longest-running functions in the state.) - - // Save a copy of the non-finalized chain state - // (but the finalized state is still concurrently mutable). - let best_chain = self.mem.best_chain().cloned(); - let db = self.disk.db().clone(); - - // All the headers should be in the best chain. - // If they are not, we don't want to return them. - height_range.into_iter().map_while(|height| { - let header = read::block_header(best_chain.clone(), &db, Height(height).into()); - - // A recently committed block dropped the intersection we previously found - if header.is_none() { - info!( - ?intersection, - ?stop, - ?max_len, - "state found height range, but then partially dropped it, returning partial response", - ); - } - - tracing::trace!( - ?height, - ?intersection, - ?stop, - ?max_len, - "adding header to peer Find response", - ); - - header - }).collect() - } - - /// Finds the first hash that's in the peer's `known_blocks` and the local best chain. - /// Returns a list of hashes that follow that intersection, from the best chain. - /// - /// Starts from the first matching hash in the best chain, ignoring all other hashes in - /// `known_blocks`. If there is no matching hash in the best chain, starts from the genesis - /// hash. - /// - /// Includes finalized and non-finalized blocks. - /// - /// Stops the list of hashes after: - /// * adding the best tip, - /// * adding the `stop` hash to the list, if it is in the best chain, or - /// * adding 500 hashes to the list. - /// - /// Returns an empty list if the state is empty, - /// and a partial or empty list if the found heights are concurrently modified. - pub fn find_best_chain_hashes( - &self, - known_blocks: Vec, - stop: Option, - max_len: u32, - ) -> Vec { - let intersection = self.find_best_chain_intersection(known_blocks); - self.collect_best_chain_hashes(intersection, stop, max_len) - } - - /// Finds the first hash that's in the peer's `known_blocks` and the local best chain. - /// Returns a list of headers that follow that intersection, from the best chain. - /// - /// See [`Self::find_best_chain_hashes()`] for details. - pub fn find_best_chain_headers( - &self, - known_blocks: Vec, - stop: Option, - max_len: u32, - ) -> Vec> { - let intersection = self.find_best_chain_intersection(known_blocks); - self.collect_best_chain_headers(intersection, stop, max_len) - } - /// Assert some assumptions about the prepared `block` before it is validated. fn assert_block_can_be_validated(&self, block: &PreparedBlock) { // required by validate_and_commit, moved here to make testing easier @@ -1113,14 +856,32 @@ impl Service for StateService { let timer = CodeTimer::start(); - // TODO: move this work into the future, like Block and Transaction? - let res = - self.find_best_chain_hashes(known_blocks, stop, MAX_FIND_BLOCK_HASHES_RESULTS); + // Prepare data for concurrent execution + let best_chain = self.mem.best_chain().cloned(); + let db = self.disk.db().clone(); - // The work is all done, the future just returns the result. - timer.finish(module_path!(), line!(), "FindBlockHashes"); + // # Performance + // + // Allow other async tasks to make progress while the block is being read from disk. + let span = Span::current(); + tokio::task::spawn_blocking(move || { + span.in_scope(move || { + let res = read::find_chain_hashes( + best_chain, + &db, + known_blocks, + stop, + MAX_FIND_BLOCK_HASHES_RESULTS, + ); + + // The work is done in the future. + timer.finish(module_path!(), line!(), "FindBlockHashes"); - async move { Ok(Response::BlockHashes(res)) }.boxed() + Ok(Response::BlockHashes(res)) + }) + }) + .map(|join_result| join_result.expect("panic in Request::Block")) + .boxed() } Request::FindBlockHeaders { known_blocks, stop } => { metrics::counter!( @@ -1143,19 +904,30 @@ impl Service for StateService { let timer = CodeTimer::start(); - // TODO: move this work into the future, like Block and Transaction? - let res = self.find_best_chain_headers(known_blocks, stop, max_len); - - // The work is all done, the future just returns the result. - timer.finish(module_path!(), line!(), "FindBlockHeaders"); + // Prepare data for concurrent execution + let best_chain = self.mem.best_chain().cloned(); + let db = self.disk.db().clone(); - async move { - Ok(Response::BlockHeaders( - res.into_iter() + // # Performance + // + // Allow other async tasks to make progress while the block is being read from disk. + let span = Span::current(); + tokio::task::spawn_blocking(move || { + span.in_scope(move || { + let res = + read::find_chain_headers(best_chain, &db, known_blocks, stop, max_len); + let res = res + .into_iter() .map(|header| CountedHeader { header }) - .collect(), - )) - } + .collect(); + + // The work is done in the future. + timer.finish(module_path!(), line!(), "FindBlockHeaders"); + + Ok(Response::BlockHeaders(res)) + }) + }) + .map(|join_result| join_result.expect("panic in Request::Block")) .boxed() } } diff --git a/zebra-state/src/service/finalized_state/zebra_db/block.rs b/zebra-state/src/service/finalized_state/zebra_db/block.rs index 56089c6d2ed..ed4b82611ce 100644 --- a/zebra-state/src/service/finalized_state/zebra_db/block.rs +++ b/zebra-state/src/service/finalized_state/zebra_db/block.rs @@ -73,6 +73,14 @@ impl ZebraDb { self.db.zs_get(&hash_by_height, &height) } + /// Returns `true` if `hash` is present in the finalized state. + #[allow(clippy::unwrap_in_result)] + pub fn contains_hash(&self, hash: block::Hash) -> bool { + let height_by_hash = self.db.cf_handle("height_by_hash").unwrap(); + + self.db.zs_contains(&height_by_hash, &hash) + } + /// Returns the height of the given block if it exists. #[allow(clippy::unwrap_in_result)] pub fn height(&self, hash: block::Hash) -> Option { diff --git a/zebra-state/src/service/non_finalized_state/chain.rs b/zebra-state/src/service/non_finalized_state/chain.rs index a6619735c71..aabfac839a5 100644 --- a/zebra-state/src/service/non_finalized_state/chain.rs +++ b/zebra-state/src/service/non_finalized_state/chain.rs @@ -457,6 +457,18 @@ impl Chain { .get(tx_loc.index.as_usize()) } + /// Returns the [`block::Hash`] for `height`, if it exists in this chain. + pub fn hash_by_height(&self, height: Height) -> Option { + let hash = self.blocks.get(&height)?.hash; + + Some(hash) + } + + /// Returns the [`Height`] for `hash`, if it exists in this chain. + pub fn height_by_hash(&self, hash: block::Hash) -> Option { + self.height_by_hash.get(&hash).cloned() + } + /// Returns the non-finalized tip block hash and height. #[allow(dead_code)] pub fn non_finalized_tip(&self) -> (block::Hash, block::Height) { diff --git a/zebra-state/src/service/read.rs b/zebra-state/src/service/read.rs index d611a35dd99..019a1bf8ee3 100644 --- a/zebra-state/src/service/read.rs +++ b/zebra-state/src/service/read.rs @@ -11,7 +11,7 @@ //! [5]: super::Chain use std::{ collections::{BTreeMap, BTreeSet, HashSet}, - ops::RangeInclusive, + ops::{RangeBounds, RangeInclusive}, sync::Arc, }; @@ -50,6 +50,8 @@ const FINALIZED_ADDRESS_INDEX_RETRIES: usize = 3; /// so they are not included in any address indexes. pub const ADDRESS_HEIGHTS_FULL_RANGE: RangeInclusive = Height(1)..=Height::MAX; +// Blocks and Transactions + /// Returns the [`Block`] with [`block::Hash`](zebra_chain::block::Hash) or /// [`Height`], if it exists in the non-finalized `chain` or finalized `db`. pub(crate) fn block( @@ -108,7 +110,7 @@ pub(crate) fn transaction( chain: Option, db: &ZebraDb, hash: transaction::Hash, -) -> Option<(Arc, block::Height)> +) -> Option<(Arc, Height)> where C: AsRef, { @@ -131,6 +133,344 @@ where .or_else(|| db.transaction(hash)) } +// FindBlockHeaders / FindBlockHashes + +/// Returns the tip of `chain`. +/// If there is no chain, returns the tip of `db`. +pub(crate) fn tip_height(chain: Option, db: &ZebraDb) -> Option +where + C: AsRef, +{ + chain + .map(|chain| chain.as_ref().non_finalized_tip_height()) + .or_else(|| db.finalized_tip_height()) +} + +/// Return the height for the block at `hash`, if `hash` is in the chain. +pub fn height_by_hash(chain: Option, db: &ZebraDb, hash: block::Hash) -> Option +where + C: AsRef, +{ + chain + .and_then(|chain| chain.as_ref().height_by_hash(hash)) + .or_else(|| db.height(hash)) +} + +/// Return the hash for the block at `height`, if `height` is in the chain. +pub fn hash_by_height(chain: Option, db: &ZebraDb, height: Height) -> Option +where + C: AsRef, +{ + chain + .and_then(|chain| chain.as_ref().hash_by_height(height)) + .or_else(|| db.hash(height)) +} + +/// Return true if `hash` is in the chain. +pub fn chain_contains_hash(chain: Option, db: &ZebraDb, hash: block::Hash) -> bool +where + C: AsRef, +{ + chain + .map(|chain| chain.as_ref().height_by_hash.contains_key(&hash)) + .unwrap_or(false) + || db.contains_hash(hash) +} + +/// Find the first hash that's in the peer's `known_blocks` and the chain. +/// +/// Returns `None` if: +/// * there is no matching hash in the chain, or +/// * the state is empty. +fn find_chain_intersection( + chain: Option, + db: &ZebraDb, + known_blocks: Vec, +) -> Option +where + C: AsRef, +{ + // We can get a block locator request before we have downloaded the genesis block + if chain.is_none() && db.is_empty() { + return None; + } + + let chain = chain.as_ref(); + + known_blocks + .iter() + .find(|&&hash| chain_contains_hash(chain, db, hash)) + .cloned() +} + +/// Returns a range of [`Height`]s in the chain, +/// starting after the `intersection` hash on the chain. +/// +/// See [`find_chain_hashes()`] for details. +fn find_chain_height_range( + chain: Option, + db: &ZebraDb, + intersection: Option, + stop: Option, + max_len: u32, +) -> impl RangeBounds + Iterator +where + C: AsRef, +{ + #[allow(clippy::reversed_empty_ranges)] + const EMPTY_RANGE: RangeInclusive = 1..=0; + + assert!(max_len > 0, "max_len must be at least 1"); + + let chain = chain.as_ref(); + + // We can get a block locator request before we have downloaded the genesis block + let chain_tip_height = if let Some(height) = tip_height(chain, db) { + height + } else { + tracing::debug!( + response_len = ?0, + "responding to peer GetBlocks or GetHeaders with empty state", + ); + + return EMPTY_RANGE; + }; + + // Find the intersection height + let intersection_height = match intersection { + Some(intersection_hash) => match height_by_hash(chain, db, intersection_hash) { + Some(intersection_height) => Some(intersection_height), + + // A recently committed block dropped the intersection we previously found + None => { + info!( + ?intersection, + ?stop, + ?max_len, + "state found intersection but then dropped it, ignoring request", + ); + return EMPTY_RANGE; + } + }, + // There is no intersection + None => None, + }; + + // Now find the start and maximum heights + let (start_height, max_height) = match intersection_height { + // start after the intersection_height, and return max_len hashes or headers + Some(intersection_height) => ( + Height(intersection_height.0 + 1), + Height(intersection_height.0 + max_len), + ), + // start at genesis, and return max_len hashes or headers + None => (Height(0), Height(max_len - 1)), + }; + + let stop_height = stop.and_then(|hash| height_by_hash(chain, db, hash)); + + // Compute the final height, making sure it is: + // * at or below our chain tip, and + // * at or below the height of the stop hash. + let final_height = std::cmp::min(max_height, chain_tip_height); + let final_height = stop_height + .map(|stop_height| std::cmp::min(final_height, stop_height)) + .unwrap_or(final_height); + + // TODO: implement Step for Height, when Step stabilises + // https://github.com/rust-lang/rust/issues/42168 + let height_range = start_height.0..=final_height.0; + let response_len = height_range.clone().into_iter().count(); + + tracing::debug!( + ?start_height, + ?final_height, + ?response_len, + ?chain_tip_height, + ?stop_height, + ?intersection_height, + ?intersection, + ?stop, + ?max_len, + "responding to peer GetBlocks or GetHeaders", + ); + + // Check the function implements the Find protocol + assert!( + response_len <= max_len.try_into().expect("fits in usize"), + "a Find response must not exceed the maximum response length", + ); + + height_range +} + +/// Returns a list of [`block::Hash`]es in the chain, +/// following the `intersection` with the chain. +/// +/// +/// See [`find_chain_hashes()`] for details. +fn collect_chain_hashes( + chain: Option, + db: &ZebraDb, + intersection: Option, + stop: Option, + max_len: u32, +) -> Vec +where + C: AsRef, +{ + let chain = chain.as_ref(); + + let height_range = find_chain_height_range(chain, db, intersection, stop, max_len); + + // All the hashes should be in the chain. + // If they are not, we don't want to return them. + let hashes: Vec = height_range.into_iter().map_while(|height| { + let hash = hash_by_height(chain, db, Height(height)); + + // A recently committed block dropped the intersection we previously found. + if hash.is_none() { + info!( + ?intersection, + ?stop, + ?max_len, + "state found height range, but then partially dropped it, returning partial response", + ); + } + + tracing::trace!( + ?hash, + ?height, + ?intersection, + ?stop, + ?max_len, + "adding hash to peer Find response", + ); + + hash + }).collect(); + + // Check the function implements the Find protocol + assert!( + intersection + .map(|hash| !hashes.contains(&hash)) + .unwrap_or(true), + "the list must not contain the intersection hash", + ); + + if let (Some(stop), Some((_, hashes_except_last))) = (stop, hashes.split_last()) { + assert!( + !hashes_except_last.contains(&stop), + "if the stop hash is in the list, it must be the final hash", + ); + } + + hashes +} + +/// Returns a list of [`block::Header`]s in the chain, +/// following the `intersection` with the chain. +/// +/// See [`find_chain_hashes()`] for details. +fn collect_chain_headers( + chain: Option, + db: &ZebraDb, + intersection: Option, + stop: Option, + max_len: u32, +) -> Vec> +where + C: AsRef, +{ + let chain = chain.as_ref(); + + let height_range = find_chain_height_range(chain, db, intersection, stop, max_len); + + // We don't check that this function implements the Find protocol, + // because fetching extra hashes (or re-calculating hashes) is expensive. + // (This was one of the most expensive and longest-running functions in the state.) + + // All the headers should be in the chain. + // If they are not, we don't want to return them. + height_range.into_iter().map_while(|height| { + let header = block_header(chain, db, Height(height).into()); + + // A recently committed block dropped the intersection we previously found + if header.is_none() { + info!( + ?intersection, + ?stop, + ?max_len, + "state found height range, but then partially dropped it, returning partial response", + ); + } + + tracing::trace!( + ?height, + ?intersection, + ?stop, + ?max_len, + "adding header to peer Find response", + ); + + header + }).collect() +} + +/// Finds the first hash that's in the peer's `known_blocks` and the chain. +/// Returns a list of hashes that follow that intersection, from the chain. +/// +/// Starts from the first matching hash in the chain, ignoring all other hashes in +/// `known_blocks`. If there is no matching hash in the chain, starts from the genesis +/// hash. +/// +/// Includes finalized and non-finalized blocks. +/// +/// Stops the list of hashes after: +/// * adding the tip, +/// * adding the `stop` hash to the list, if it is in the chain, or +/// * adding 500 hashes to the list. +/// +/// Returns an empty list if the state is empty, +/// and a partial or empty list if the found heights are concurrently modified. +pub fn find_chain_hashes( + chain: Option, + db: &ZebraDb, + known_blocks: Vec, + stop: Option, + max_len: u32, +) -> Vec +where + C: AsRef, +{ + let chain = chain.as_ref(); + let intersection = find_chain_intersection(chain, db, known_blocks); + + collect_chain_hashes(chain, db, intersection, stop, max_len) +} + +/// Finds the first hash that's in the peer's `known_blocks` and the chain. +/// Returns a list of headers that follow that intersection, from the chain. +/// +/// See [`find_chain_hashes()`] for details. +pub fn find_chain_headers( + chain: Option, + db: &ZebraDb, + known_blocks: Vec, + stop: Option, + max_len: u32, +) -> Vec> +where + C: AsRef, +{ + let chain = chain.as_ref(); + let intersection = find_chain_intersection(chain, db, known_blocks); + + collect_chain_headers(chain, db, intersection, stop, max_len) +} + +// Note Commitment Trees + /// Returns the Sapling /// [`NoteCommitmentTree`](sapling::tree::NoteCommitmentTree) specified by a /// hash or height, if it exists in the non-finalized `chain` or finalized `db`. @@ -181,6 +521,8 @@ where .or_else(|| db.orchard_tree(hash_or_height)) } +// Address Balance + /// Returns the total transparent balance for the supplied [`transparent::Address`]es. /// /// If the addresses do not exist in the non-finalized `chain` or finalized `db`, returns zero. @@ -301,6 +643,8 @@ fn apply_balance_change( balance?.constrain() } +// Address UTXOs + /// Returns the unspent transparent outputs (UTXOs) for the supplied [`transparent::Address`]es, /// in chain order; and the transaction IDs for the transactions containing those UTXOs. /// @@ -633,6 +977,8 @@ where .collect() } +// Address TX IDs + /// Returns the transaction IDs that sent or received funds from the supplied [`transparent::Address`]es, /// within `query_height_range`, in chain order. ///