diff --git a/beacon_node/beacon_chain/src/beacon_block_streamer.rs b/beacon_node/beacon_chain/src/beacon_block_streamer.rs index a1f7c99067e..bbd5bfcac9a 100644 --- a/beacon_node/beacon_chain/src/beacon_block_streamer.rs +++ b/beacon_node/beacon_chain/src/beacon_block_streamer.rs @@ -1,4 +1,4 @@ -use crate::{BeaconChain, BeaconChainError, BeaconChainTypes}; +use crate::{metrics, BeaconChain, BeaconChainError, BeaconChainTypes}; use execution_layer::{ExecutionLayer, ExecutionPayloadBodyV1}; use slog::{crit, debug, Logger}; use std::collections::HashMap; @@ -412,8 +412,13 @@ impl BeaconBlockStreamer { fn check_caches(&self, root: Hash256) -> Option>> { if self.check_caches == CheckCaches::Yes { self.beacon_chain - .data_availability_checker - .get_block(&root) + .reqresp_pre_import_cache + .read() + .get(&root) + .map(|block| { + metrics::inc_counter(&metrics::BEACON_REQRESP_PRE_IMPORT_CACHE_HITS); + block.clone() + }) .or(self.beacon_chain.early_attester_cache.get_block(root)) } else { None diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 421bc12ee43..9811ac7cf8d 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -121,7 +121,7 @@ use store::{ use task_executor::{ShutdownReason, TaskExecutor}; use tokio_stream::Stream; use tree_hash::TreeHash; -use types::blob_sidecar::FixedBlobSidecarList; +use types::blob_sidecar::{BlobSidecarList, FixedBlobSidecarList}; use types::payload::BlockProductionVersion; use types::*; @@ -359,6 +359,9 @@ pub type BeaconStore = Arc< >, >; +/// Cache gossip verified blocks to serve over ReqResp before they are imported +type ReqRespPreImportCache = HashMap>>; + /// Represents the "Beacon Chain" component of Ethereum 2.0. Allows import of blocks and block /// operations and chooses a canonical head. pub struct BeaconChain { @@ -461,6 +464,8 @@ pub struct BeaconChain { pub(crate) attester_cache: Arc, /// A cache used when producing attestations whilst the head block is still being imported. pub early_attester_cache: EarlyAttesterCache, + /// Cache gossip verified blocks to serve over ReqResp before they are imported + pub reqresp_pre_import_cache: Arc>>, /// A cache used to keep track of various block timings. pub block_times_cache: Arc>, /// A cache used to track pre-finalization block roots for quick rejection. @@ -2898,8 +2903,6 @@ impl BeaconChain { } } - self.data_availability_checker - .notify_gossip_blob(block_root, &blob); let r = self.check_gossip_blob_availability_and_import(blob).await; self.remove_notified(&block_root, r) } @@ -2932,8 +2935,6 @@ impl BeaconChain { } } - self.data_availability_checker - .notify_rpc_blobs(block_root, &blobs); let r = self .check_rpc_blob_availability_and_import(slot, block_root, blobs) .await; @@ -2950,7 +2951,7 @@ impl BeaconChain { let has_missing_components = matches!(r, Ok(AvailabilityProcessingStatus::MissingComponents(_, _))); if !has_missing_components { - self.data_availability_checker.remove_notified(block_root); + self.reqresp_pre_import_cache.write().remove(block_root); } r } @@ -2963,8 +2964,10 @@ impl BeaconChain { unverified_block: B, notify_execution_layer: NotifyExecutionLayer, ) -> Result> { - self.data_availability_checker - .notify_block(block_root, unverified_block.block_cloned()); + self.reqresp_pre_import_cache + .write() + .insert(block_root, unverified_block.block_cloned()); + let r = self .process_block(block_root, unverified_block, notify_execution_layer, || { Ok(()) diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index c1ebeb68bba..c390f881092 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -955,6 +955,7 @@ where validator_pubkey_cache: TimeoutRwLock::new(validator_pubkey_cache), attester_cache: <_>::default(), early_attester_cache: <_>::default(), + reqresp_pre_import_cache: <_>::default(), light_client_server_cache: LightClientServerCache::new(), light_client_server_tx: self.light_client_server_tx, shutdown_sender: self diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index a4cb4177268..1d4e7e57e92 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -7,11 +7,8 @@ pub use crate::data_availability_checker::availability_view::{ }; pub use crate::data_availability_checker::child_components::ChildComponents; use crate::data_availability_checker::overflow_lru_cache::OverflowLRUCache; -use crate::data_availability_checker::processing_cache::ProcessingCache; use crate::{BeaconChain, BeaconChainTypes, BeaconStore}; use kzg::Kzg; -use parking_lot::RwLock; -pub use processing_cache::ProcessingComponents; use slasher::test_utils::E; use slog::{debug, error, Logger}; use slot_clock::SlotClock; @@ -21,7 +18,6 @@ use std::fmt::Debug; use std::num::NonZeroUsize; use std::sync::Arc; use task_executor::TaskExecutor; -use types::beacon_block_body::KzgCommitmentOpts; use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList}; use types::{BlobSidecarList, ChainSpec, Epoch, EthSpec, Hash256, SignedBeaconBlock}; @@ -29,7 +25,6 @@ mod availability_view; mod child_components; mod error; mod overflow_lru_cache; -mod processing_cache; mod state_lru_cache; pub use error::{Error as AvailabilityCheckError, ErrorCategory as AvailabilityCheckErrorCategory}; @@ -50,7 +45,6 @@ pub const STATE_LRU_CAPACITY: usize = STATE_LRU_CAPACITY_NON_ZERO.get(); /// `DataAvailabilityChecker` is responsible for KZG verification of block components as well as /// checking whether a "availability check" is required at all. pub struct DataAvailabilityChecker { - processing_cache: RwLock>, availability_cache: Arc>, slot_clock: T::SlotClock, kzg: Option>, @@ -89,7 +83,6 @@ impl DataAvailabilityChecker { ) -> Result { let overflow_cache = OverflowLRUCache::new(OVERFLOW_LRU_CAPACITY, store, spec.clone())?; Ok(Self { - processing_cache: <_>::default(), availability_cache: Arc::new(overflow_cache), slot_clock, log: log.clone(), @@ -98,17 +91,25 @@ impl DataAvailabilityChecker { }) } - /// Checks if the given block root is cached. + /// Checks if the block root is currenlty in the availability cache awaiting processing because + /// of missing components. pub fn has_block(&self, block_root: &Hash256) -> bool { - self.processing_cache.read().has_block(block_root) + self.availability_cache.has_block(block_root) } - /// Get the processing info for a block. - pub fn get_processing_components( - &self, - block_root: Hash256, - ) -> Option> { - self.processing_cache.read().get(&block_root).cloned() + pub fn get_missing_blob_ids_with(&self, block_root: Hash256) -> MissingBlobs { + self.availability_cache + .with_pending_components(&block_root, |pending_components| match pending_components { + Some(pending_components) => self.get_missing_blob_ids( + block_root, + pending_components + .get_cached_block() + .as_ref() + .map(|b| b.as_block()), + &pending_components.verified_blobs, + ), + None => MissingBlobs::new_without_block(block_root, self.is_deneb()), + }) } /// If there's no block, all possible ids will be returned that don't exist in the given blobs. @@ -116,7 +117,7 @@ impl DataAvailabilityChecker { pub fn get_missing_blob_ids( &self, block_root: Hash256, - block: &Option>>, + block: Option<&SignedBeaconBlock>, blobs: &FixedVector, ::MaxBlobsPerBlock>, ) -> MissingBlobs { let Some(current_slot) = self.slot_clock.now_or_genesis() else { @@ -132,10 +133,15 @@ impl DataAvailabilityChecker { if self.da_check_required_for_epoch(current_epoch) { match block { Some(cached_block) => { - let block_commitments = cached_block.get_commitments(); + let block_commitments_len = cached_block + .message() + .body() + .blob_kzg_commitments() + .map(|v| v.len()) + .unwrap_or(0); let blob_ids = blobs .iter() - .take(block_commitments.len()) + .take(block_commitments_len) .enumerate() .filter_map(|(index, blob_commitment_opt)| { blob_commitment_opt.is_none().then_some(BlobIdentifier { @@ -163,14 +169,6 @@ impl DataAvailabilityChecker { self.availability_cache.peek_blob(blob_id) } - /// Get a block from the availability cache. Includes any blocks we are currently processing. - pub fn get_block(&self, block_root: &Hash256) -> Option>> { - self.processing_cache - .read() - .get(block_root) - .and_then(|cached| cached.block.clone()) - } - /// Put a list of blobs received via RPC into the availability cache. This performs KZG /// verification on the blobs in the list. pub fn put_rpc_blobs( @@ -323,52 +321,6 @@ impl DataAvailabilityChecker { block.num_expected_blobs() > 0 && self.da_check_required_for_epoch(block.epoch()) } - /// Adds a block to the processing cache. This block's commitments are unverified but caching - /// them here is useful to avoid duplicate downloads of blocks, as well as understanding - /// our blob download requirements. We will also serve this over RPC. - pub fn notify_block(&self, block_root: Hash256, block: Arc>) { - self.processing_cache - .write() - .entry(block_root) - .or_default() - .merge_block(block); - } - - /// Add a single blob commitment to the processing cache. This commitment is unverified but caching - /// them here is useful to avoid duplicate downloads of blobs, as well as understanding - /// our block and blob download requirements. - pub fn notify_gossip_blob(&self, block_root: Hash256, blob: &GossipVerifiedBlob) { - let index = blob.index(); - let commitment = blob.kzg_commitment(); - self.processing_cache - .write() - .entry(block_root) - .or_default() - .merge_single_blob(index as usize, commitment); - } - - /// Adds blob commitments to the processing cache. These commitments are unverified but caching - /// them here is useful to avoid duplicate downloads of blobs, as well as understanding - /// our block and blob download requirements. - pub fn notify_rpc_blobs(&self, block_root: Hash256, blobs: &FixedBlobSidecarList) { - let mut commitments = KzgCommitmentOpts::::default(); - for blob in blobs.iter().flatten() { - if let Some(commitment) = commitments.get_mut(blob.index as usize) { - *commitment = Some(blob.kzg_commitment); - } - } - self.processing_cache - .write() - .entry(block_root) - .or_default() - .merge_blobs(commitments); - } - - /// Clears the block and all blobs from the processing cache for a give root if they exist. - pub fn remove_notified(&self, block_root: &Hash256) { - self.processing_cache.write().remove(block_root) - } - /// The epoch at which we require a data availability check in block processing. /// `None` if the `Deneb` fork is disabled. pub fn data_availability_boundary(&self) -> Option { @@ -410,7 +362,6 @@ impl DataAvailabilityChecker { /// Collects metrics from the data availability checker. pub fn metrics(&self) -> DataAvailabilityCheckerMetrics { DataAvailabilityCheckerMetrics { - processing_cache_size: self.processing_cache.read().len(), num_store_entries: self.availability_cache.num_store_entries(), state_cache_size: self.availability_cache.state_cache_size(), block_cache_size: self.availability_cache.block_cache_size(), @@ -420,7 +371,6 @@ impl DataAvailabilityChecker { /// Helper struct to group data availability checker metrics. pub struct DataAvailabilityCheckerMetrics { - pub processing_cache_size: usize, pub num_store_entries: usize, pub state_cache_size: usize, pub block_cache_size: usize, diff --git a/beacon_node/beacon_chain/src/data_availability_checker/availability_view.rs b/beacon_node/beacon_chain/src/data_availability_checker/availability_view.rs index 7b305cf959d..d4e5ca34492 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/availability_view.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/availability_view.rs @@ -2,7 +2,6 @@ use super::state_lru_cache::DietAvailabilityPendingExecutedBlock; use crate::blob_verification::KzgVerifiedBlob; use crate::block_verification_types::AsBlock; use crate::data_availability_checker::overflow_lru_cache::PendingComponents; -use crate::data_availability_checker::ProcessingComponents; use kzg::KzgCommitment; use ssz_types::FixedVector; use std::sync::Arc; @@ -178,14 +177,6 @@ macro_rules! impl_availability_view { }; } -impl_availability_view!( - ProcessingComponents, - Arc>, - KzgCommitment, - block, - blob_commitments -); - impl_availability_view!( PendingComponents, DietAvailabilityPendingExecutedBlock, @@ -293,32 +284,6 @@ pub mod tests { (block, blobs, invalid_blobs) } - type ProcessingViewSetup = ( - Arc>, - FixedVector, ::MaxBlobsPerBlock>, - FixedVector, ::MaxBlobsPerBlock>, - ); - - pub fn setup_processing_components( - block: SignedBeaconBlock, - valid_blobs: FixedVector>>, ::MaxBlobsPerBlock>, - invalid_blobs: FixedVector>>, ::MaxBlobsPerBlock>, - ) -> ProcessingViewSetup { - let blobs = FixedVector::from( - valid_blobs - .iter() - .map(|blob_opt| blob_opt.as_ref().map(|blob| blob.kzg_commitment)) - .collect::>(), - ); - let invalid_blobs = FixedVector::from( - invalid_blobs - .iter() - .map(|blob_opt| blob_opt.as_ref().map(|blob| blob.kzg_commitment)) - .collect::>(), - ); - (Arc::new(block), blobs, invalid_blobs) - } - type PendingComponentsSetup = ( DietAvailabilityPendingExecutedBlock, FixedVector>, ::MaxBlobsPerBlock>, @@ -490,13 +455,6 @@ pub mod tests { }; } - generate_tests!( - processing_components_tests, - ProcessingComponents::, - kzg_commitments, - processing_blobs, - setup_processing_components - ); generate_tests!( pending_components_tests, PendingComponents, diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index 50e07987fdf..db26d70982a 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -305,6 +305,11 @@ impl Critical { Ok(()) } + /// Returns true if the block root is known, without altering the LRU ordering + pub fn has_block(&self, block_root: &Hash256) -> bool { + self.in_memory.peek(block_root).is_some() || self.store_keys.get(block_root).is_some() + } + /// This only checks for the blobs in memory pub fn peek_blob( &self, @@ -322,6 +327,13 @@ impl Critical { } } + pub fn peek_pending_components( + &self, + block_root: &Hash256, + ) -> Option<&PendingComponents> { + self.in_memory.peek(block_root) + } + /// Puts the pending components in the LRU cache. If the cache /// is at capacity, the LRU entry is written to the store first pub fn put_pending_components( @@ -409,6 +421,11 @@ impl OverflowLRUCache { }) } + /// Returns true if the block root is known, without altering the LRU ordering + pub fn has_block(&self, block_root: &Hash256) -> bool { + self.critical.read().has_block(block_root) + } + /// Fetch a blob from the cache without affecting the LRU ordering pub fn peek_blob( &self, @@ -425,6 +442,14 @@ impl OverflowLRUCache { } } + pub fn with_pending_components>) -> R>( + &self, + block_root: &Hash256, + f: F, + ) -> R { + f(self.critical.read().peek_pending_components(block_root)) + } + pub fn put_kzg_verified_blobs>>( &self, block_root: Hash256, diff --git a/beacon_node/beacon_chain/src/data_availability_checker/processing_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/processing_cache.rs deleted file mode 100644 index e09b3083be5..00000000000 --- a/beacon_node/beacon_chain/src/data_availability_checker/processing_cache.rs +++ /dev/null @@ -1,63 +0,0 @@ -use crate::data_availability_checker::AvailabilityView; -use std::collections::hash_map::Entry; -use std::collections::HashMap; -use std::sync::Arc; -use types::beacon_block_body::KzgCommitmentOpts; -use types::{EthSpec, Hash256, SignedBeaconBlock}; - -/// This cache is used only for gossip blocks/blobs and single block/blob lookups, to give req/resp -/// a view of what we have and what we require. This cache serves a slightly different purpose than -/// gossip caches because it allows us to process duplicate blobs that are valid in gossip. -/// See `AvailabilityView`'s trait definition. -#[derive(Default)] -pub struct ProcessingCache { - processing_cache: HashMap>, -} - -impl ProcessingCache { - pub fn get(&self, block_root: &Hash256) -> Option<&ProcessingComponents> { - self.processing_cache.get(block_root) - } - pub fn entry(&mut self, block_root: Hash256) -> Entry<'_, Hash256, ProcessingComponents> { - self.processing_cache.entry(block_root) - } - pub fn remove(&mut self, block_root: &Hash256) { - self.processing_cache.remove(block_root); - } - pub fn has_block(&self, block_root: &Hash256) -> bool { - self.processing_cache - .get(block_root) - .map_or(false, |b| b.block_exists()) - } - pub fn len(&self) -> usize { - self.processing_cache.len() - } -} - -#[derive(Default, Debug, Clone)] -pub struct ProcessingComponents { - /// Blobs required for a block can only be known if we have seen the block. So `Some` here - /// means we've seen it, a `None` means we haven't. The `kzg_commitments` value helps us figure - /// out whether incoming blobs actually match the block. - pub block: Option>>, - /// `KzgCommitments` for blobs are always known, even if we haven't seen the block. See - /// `AvailabilityView`'s trait definition for more details. - pub blob_commitments: KzgCommitmentOpts, -} - -impl ProcessingComponents { - pub fn new() -> Self { - Self::default() - } -} - -// Not safe for use outside of tests as this always required a slot. -#[cfg(test)] -impl ProcessingComponents { - pub fn empty(_block_root: Hash256) -> Self { - Self { - block: None, - blob_commitments: KzgCommitmentOpts::::default(), - } - } -} diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index b40f46da221..6cb0b6fd766 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -304,6 +304,14 @@ lazy_static! { "Count of times the early attester cache returns an attestation" ); + pub static ref BEACON_REQRESP_PRE_IMPORT_CACHE_SIZE: Result = try_create_int_gauge( + "beacon_reqresp_pre_import_cache_size", + "Current count of items of the reqresp pre import cache" + ); + pub static ref BEACON_REQRESP_PRE_IMPORT_CACHE_HITS: Result = try_create_int_counter( + "beacon_reqresp_pre_import_cache_hits", + "Count of times the reqresp pre import cache returns an item" + ); } // Second lazy-static block is used to account for macro recursion limit. @@ -1129,15 +1137,9 @@ lazy_static! { Ok(vec![0.1, 0.2, 0.3,0.4,0.5,0.75,1.0,1.25,1.5,1.75,2.0,2.5,3.0,3.5,4.0,5.0,6.0,7.0,8.0,9.0,10.0,15.0,20.0]) ); - /* * Data Availability cache metrics */ - pub static ref DATA_AVAILABILITY_PROCESSING_CACHE_SIZE: Result = - try_create_int_gauge( - "data_availability_processing_cache_size", - "Number of entries in the data availability processing cache." - ); pub static ref DATA_AVAILABILITY_OVERFLOW_MEMORY_BLOCK_CACHE_SIZE: Result = try_create_int_gauge( "data_availability_overflow_memory_block_cache_size", @@ -1196,11 +1198,12 @@ pub fn scrape_for_metrics(beacon_chain: &BeaconChain) { ) } - let da_checker_metrics = beacon_chain.data_availability_checker.metrics(); set_gauge_by_usize( - &DATA_AVAILABILITY_PROCESSING_CACHE_SIZE, - da_checker_metrics.processing_cache_size, + &BEACON_REQRESP_PRE_IMPORT_CACHE_SIZE, + beacon_chain.reqresp_pre_import_cache.read().len(), ); + + let da_checker_metrics = beacon_chain.data_availability_checker.metrics(); set_gauge_by_usize( &DATA_AVAILABILITY_OVERFLOW_MEMORY_BLOCK_CACHE_SIZE, da_checker_metrics.block_cache_size, diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 7f3849c161b..a312f6e970a 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -278,19 +278,11 @@ impl SingleBlockLookup { if let Some(components) = self.child_components.as_ref() { self.da_checker.get_missing_blob_ids( block_root, - &components.downloaded_block, + components.downloaded_block.as_ref().map(|b| b.as_ref()), &components.downloaded_blobs, ) } else { - let Some(processing_components) = self.da_checker.get_processing_components(block_root) - else { - return MissingBlobs::new_without_block(block_root, self.da_checker.is_deneb()); - }; - self.da_checker.get_missing_blob_ids( - block_root, - &processing_components.block, - &processing_components.blob_commitments, - ) + self.da_checker.get_missing_blob_ids_with(block_root) } }