From dbcd7d11c56d0c1b555ccf653bf04aa2fbb9d03d Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Fri, 24 May 2024 20:00:05 +0200 Subject: [PATCH 1/4] Ensure lookup sync checks caches correctly --- beacon_node/beacon_chain/src/beacon_chain.rs | 43 ++++++++++++++++ .../src/data_availability_checker.rs | 9 +++- .../overflow_lru_cache.rs | 29 ++++++++--- beacon_node/beacon_chain/src/lib.rs | 7 +-- .../network/src/sync/network_context.rs | 50 ++++++++++--------- 5 files changed, 102 insertions(+), 36 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 9584b2e29f1..3dcb1b01c99 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -337,6 +337,22 @@ struct PartialBeaconBlock { bls_to_execution_changes: Vec, } +pub enum BlockProcessStatus { + /// Block is not in any pre-import cache. Block may be in the data-base or in the fork-choice. + Unknown, + /// Block is currently processing but not yet validated. + NotValidated { + slot: Slot, + blob_kzg_commitments_count: usize, + }, + /// Block is fully valid, but not yet imported. It's cached in the da_checker while awaiting + /// missing block components. + ExecutionValidated { + slot: Slot, + blob_kzg_commitments_count: usize, + }, +} + pub type LightClientProducerEvent = (Hash256, Slot, SyncAggregate); pub type BeaconForkChoice = ForkChoice< @@ -1237,6 +1253,33 @@ impl BeaconChain { Ok(self.store.get_blinded_block(block_root)?) } + /// Return the status of a block as it progresses through the various caches of the beacon + /// chain. Used by sync to learn the status of a block and prevent repeated downloads / + /// processing attempts. 
+ pub fn get_block_process_status(&self, block_root: &Hash256) -> BlockProcessStatus { + if let Some(execution_valid_block) = self + .data_availability_checker + .get_execution_valid_block_summary(block_root) + { + return BlockProcessStatus::ExecutionValidated { + slot: execution_valid_block.slot, + blob_kzg_commitments_count: execution_valid_block.blob_kzg_commitments_count, + }; + } + + if let Some(block) = self.reqresp_pre_import_cache.read().get(block_root) { + // A block is on the `reqresp_pre_import_cache` but NOT in the + // `data_availability_checker` only if it is actively processing. We can expect a future + // event with the result of processing + return BlockProcessStatus::NotValidated { + slot: block.slot(), + blob_kzg_commitments_count: block.num_expected_blobs(), + }; + } + + BlockProcessStatus::Unknown + } + /// Returns the state at the given root, if any. /// /// ## Errors diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index a981d31e554..d6f2d81de37 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -23,6 +23,8 @@ mod state_lru_cache; pub use error::{Error as AvailabilityCheckError, ErrorCategory as AvailabilityCheckErrorCategory}; use types::non_zero_usize::new_non_zero_usize; +use self::overflow_lru_cache::ExecutionValidBlockSummary; + /// The LRU Cache stores `PendingComponents` which can store up to /// `MAX_BLOBS_PER_BLOCK = 6` blobs each. A `BlobSidecar` is 0.131256 MB. So /// the maximum size of a `PendingComponents` is ~ 0.787536 MB. Setting this @@ -86,9 +88,12 @@ impl DataAvailabilityChecker { /// Checks if the block root is currenlty in the availability cache awaiting import because /// of missing components. 
- pub fn has_execution_valid_block(&self, block_root: &Hash256) -> bool { + pub fn get_execution_valid_block_summary( + &self, + block_root: &Hash256, + ) -> Option { self.availability_cache - .has_execution_valid_block(block_root) + .get_execution_valid_block_summary(block_root) } /// Return the required blobs `block_root` expects if the block is currenlty in the cache. diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index 2e3c4aac558..cfe12ba60dc 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -44,7 +44,7 @@ use ssz_types::{FixedVector, VariableList}; use std::num::NonZeroUsize; use std::{collections::HashSet, sync::Arc}; use types::blob_sidecar::BlobIdentifier; -use types::{BlobSidecar, ChainSpec, Epoch, EthSpec, Hash256}; +use types::{BlobSidecar, ChainSpec, Epoch, EthSpec, Hash256, Slot}; /// This represents the components of a partially available block /// @@ -57,6 +57,11 @@ pub struct PendingComponents { pub executed_block: Option>, } +pub struct ExecutionValidBlockSummary { + pub slot: Slot, + pub blob_kzg_commitments_count: usize, +} + impl PendingComponents { /// Returns an immutable reference to the cached block. 
pub fn get_cached_block(&self) -> &Option> { @@ -544,12 +549,22 @@ impl OverflowLRUCache { } /// Returns true if the block root is known, without altering the LRU ordering - pub fn has_execution_valid_block(&self, block_root: &Hash256) -> bool { - if let Some(pending_components) = self.critical.read().peek_pending_components(block_root) { - pending_components.executed_block.is_some() - } else { - false - } + pub fn get_execution_valid_block_summary( + &self, + block_root: &Hash256, + ) -> Option { + self.critical + .read() + .peek_pending_components(block_root) + .and_then(|pending_components| { + pending_components + .executed_block + .as_ref() + .map(|block| ExecutionValidBlockSummary { + slot: block.as_block().slot(), + blob_kzg_commitments_count: block.num_blobs_expected(), + }) + }) } /// Fetch a blob from the cache without affecting the LRU ordering diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 221bb8b2922..f419429e090 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -62,9 +62,10 @@ pub mod validator_pubkey_cache; pub use self::beacon_chain::{ AttestationProcessingOutcome, AvailabilityProcessingStatus, BeaconBlockResponse, - BeaconBlockResponseWrapper, BeaconChain, BeaconChainTypes, BeaconStore, ChainSegmentResult, - ForkChoiceError, LightClientProducerEvent, OverrideForkchoiceUpdate, ProduceBlockVerification, - StateSkipConfig, WhenSlotSkipped, INVALID_FINALIZED_MERGE_TRANSITION_BLOCK_SHUTDOWN_REASON, + BeaconBlockResponseWrapper, BeaconChain, BeaconChainTypes, BeaconStore, BlockProcessStatus, + ChainSegmentResult, ForkChoiceError, LightClientProducerEvent, OverrideForkchoiceUpdate, + ProduceBlockVerification, StateSkipConfig, WhenSlotSkipped, + INVALID_FINALIZED_MERGE_TRANSITION_BLOCK_SHUTDOWN_REASON, INVALID_JUSTIFIED_PAYLOAD_SHUTDOWN_REASON, }; pub use self::beacon_snapshot::BeaconSnapshot; diff --git a/beacon_node/network/src/sync/network_context.rs 
b/beacon_node/network/src/sync/network_context.rs index fa1f50cee06..12b51a1341d 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -12,7 +12,7 @@ use crate::status::ToStatusMessage; use crate::sync::block_lookups::SingleLookupId; use crate::sync::manager::{BlockProcessType, SingleLookupReqId}; use beacon_chain::block_verification_types::RpcBlock; -use beacon_chain::{BeaconChain, BeaconChainTypes, EngineState}; +use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessStatus, EngineState}; use fnv::FnvHashMap; use lighthouse_network::rpc::methods::BlobsByRangeRequest; use lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError}; @@ -337,26 +337,17 @@ impl SyncNetworkContext { peer_id: PeerId, block_root: Hash256, ) -> Result { - // da_checker includes block that are execution verified, but are missing components - if self - .chain - .data_availability_checker - .has_execution_valid_block(&block_root) - { - return Ok(LookupRequestResult::NoRequestNeeded); - } - - // reqresp_pre_import_cache includes blocks that may not be yet execution verified - if self - .chain - .reqresp_pre_import_cache - .read() - .contains_key(&block_root) - { - // A block is on the `reqresp_pre_import_cache` but NOT in the - // `data_availability_checker` only if it is actively processing. We can expect a future - // event with the result of processing - return Ok(LookupRequestResult::Pending); + match self.chain.get_block_process_status(&block_root) { + // Unknown block, continue request to download + BlockProcessStatus::Unknown => {} + // Block is known and currently processing, expect a future event with the result of + // processing. + BlockProcessStatus::NotValidated { .. } => return Ok(LookupRequestResult::Pending), + // Block is fully validated. If it's not yet imported it's waiting for missing block + // components. Consider this request completed and do nothing. 
+ BlockProcessStatus::ExecutionValidated { .. } => { + return Ok(LookupRequestResult::NoRequestNeeded) + } } let req_id = self.next_id(); @@ -401,9 +392,20 @@ impl SyncNetworkContext { downloaded_block_expected_blobs: Option, ) -> Result { let Some(expected_blobs) = downloaded_block_expected_blobs.or_else(|| { - self.chain - .data_availability_checker - .num_expected_blobs(&block_root) + // If the block is already being processed or fully validated, retrieve how many blobs + // it expects. Consider any stage of the block. If the block root has been validated, we + // can assert that this is the correct value of `blob_kzg_commitments_count`. + match self.chain.get_block_process_status(&block_root) { + BlockProcessStatus::Unknown => None, + BlockProcessStatus::NotValidated { + blob_kzg_commitments_count, + .. + } + | BlockProcessStatus::ExecutionValidated { + blob_kzg_commitments_count, + .. + } => Some(blob_kzg_commitments_count), + } }) else { // Wait to download the block before downloading blobs. 
Then we can be sure that the // block has data, so there's no need to do "blind" requests for all possible blobs and From 7136412062efef3a6c92de6bb08ef01855e0db76 Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Sat, 25 May 2024 01:01:23 +0200 Subject: [PATCH 2/4] Simplify BlockProcessStatus --- .../beacon_chain/src/beacon_block_streamer.rs | 17 ++++----- beacon_node/beacon_chain/src/beacon_chain.rs | 38 +++++++++---------- .../src/data_availability_checker.rs | 8 ++-- .../overflow_lru_cache.rs | 16 ++------ .../state_lru_cache.rs | 4 ++ beacon_node/beacon_chain/src/metrics.rs | 3 +- .../network/src/sync/network_context.rs | 10 +---- 7 files changed, 41 insertions(+), 55 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_block_streamer.rs b/beacon_node/beacon_chain/src/beacon_block_streamer.rs index 0c92b7c1f62..f0a68b6be55 100644 --- a/beacon_node/beacon_chain/src/beacon_block_streamer.rs +++ b/beacon_node/beacon_chain/src/beacon_block_streamer.rs @@ -1,4 +1,4 @@ -use crate::{metrics, BeaconChain, BeaconChainError, BeaconChainTypes}; +use crate::{metrics, BeaconChain, BeaconChainError, BeaconChainTypes, BlockProcessStatus}; use execution_layer::{ExecutionLayer, ExecutionPayloadBodyV1}; use slog::{crit, debug, error, Logger}; use std::collections::HashMap; @@ -410,15 +410,14 @@ impl BeaconBlockStreamer { fn check_caches(&self, root: Hash256) -> Option>> { if self.check_caches == CheckCaches::Yes { - self.beacon_chain - .reqresp_pre_import_cache - .read() - .get(&root) - .map(|block| { + match self.beacon_chain.get_block_process_status(&root) { + BlockProcessStatus::Unknown => None, + BlockProcessStatus::NotValidated(block) + | BlockProcessStatus::ExecutionValidated(block) => { metrics::inc_counter(&metrics::BEACON_REQRESP_PRE_IMPORT_CACHE_HITS); - block.clone() - }) - .or(self.beacon_chain.early_attester_cache.get_block(root)) + Some(block) + } + } } else { None } diff --git 
a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 3dcb1b01c99..0bc0a4a2d3d 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -337,20 +337,18 @@ struct PartialBeaconBlock { bls_to_execution_changes: Vec, } -pub enum BlockProcessStatus { +pub enum BlockProcessStatus { /// Block is not in any pre-import cache. Block may be in the data-base or in the fork-choice. Unknown, /// Block is currently processing but not yet validated. - NotValidated { - slot: Slot, - blob_kzg_commitments_count: usize, - }, + NotValidated(Arc>), /// Block is fully valid, but not yet imported. It's cached in the da_checker while awaiting /// missing block components. - ExecutionValidated { - slot: Slot, - blob_kzg_commitments_count: usize, - }, + ExecutionValidated(Arc>), +} + +pub struct BeaconChainMetrics { + pub reqresp_pre_import_cache_len: usize, } pub type LightClientProducerEvent = (Hash256, Slot, SyncAggregate); @@ -1256,25 +1254,19 @@ impl BeaconChain { /// Return the status of a block as it progresses through the various caches of the beacon /// chain. Used by sync to learn the status of a block and prevent repeated downloads / /// processing attempts. 
- pub fn get_block_process_status(&self, block_root: &Hash256) -> BlockProcessStatus { - if let Some(execution_valid_block) = self + pub fn get_block_process_status(&self, block_root: &Hash256) -> BlockProcessStatus { + if let Some(block) = self .data_availability_checker - .get_execution_valid_block_summary(block_root) + .get_execution_valid_block(block_root) { - return BlockProcessStatus::ExecutionValidated { - slot: execution_valid_block.slot, - blob_kzg_commitments_count: execution_valid_block.blob_kzg_commitments_count, - }; + return BlockProcessStatus::ExecutionValidated(block); } if let Some(block) = self.reqresp_pre_import_cache.read().get(block_root) { // A block is on the `reqresp_pre_import_cache` but NOT in the // `data_availability_checker` only if it is actively processing. We can expect a future // event with the result of processing - return BlockProcessStatus::NotValidated { - slot: block.slot(), - blob_kzg_commitments_count: block.num_expected_blobs(), - }; + return BlockProcessStatus::NotValidated(block.clone()); } BlockProcessStatus::Unknown @@ -6673,6 +6665,12 @@ impl BeaconChain { ForkName::Base => Err(Error::UnsupportedFork), } } + + pub fn metrics(&self) -> BeaconChainMetrics { + BeaconChainMetrics { + reqresp_pre_import_cache_len: self.reqresp_pre_import_cache.read().len(), + } + } } impl Drop for BeaconChain { diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index d6f2d81de37..7efb6884d0f 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -23,8 +23,6 @@ mod state_lru_cache; pub use error::{Error as AvailabilityCheckError, ErrorCategory as AvailabilityCheckErrorCategory}; use types::non_zero_usize::new_non_zero_usize; -use self::overflow_lru_cache::ExecutionValidBlockSummary; - /// The LRU Cache stores `PendingComponents` which can store up to /// `MAX_BLOBS_PER_BLOCK = 6` 
blobs each. A `BlobSidecar` is 0.131256 MB. So /// the maximum size of a `PendingComponents` is ~ 0.787536 MB. Setting this @@ -88,12 +86,12 @@ impl DataAvailabilityChecker { /// Checks if the block root is currenlty in the availability cache awaiting import because /// of missing components. - pub fn get_execution_valid_block_summary( + pub fn get_execution_valid_block( &self, block_root: &Hash256, - ) -> Option { + ) -> Option>> { self.availability_cache - .get_execution_valid_block_summary(block_root) + .get_execution_valid_block(block_root) } /// Return the required blobs `block_root` expects if the block is currenlty in the cache. diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index cfe12ba60dc..e350181c867 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -44,7 +44,7 @@ use ssz_types::{FixedVector, VariableList}; use std::num::NonZeroUsize; use std::{collections::HashSet, sync::Arc}; use types::blob_sidecar::BlobIdentifier; -use types::{BlobSidecar, ChainSpec, Epoch, EthSpec, Hash256, Slot}; +use types::{BlobSidecar, ChainSpec, Epoch, EthSpec, Hash256, SignedBeaconBlock}; /// This represents the components of a partially available block /// @@ -57,11 +57,6 @@ pub struct PendingComponents { pub executed_block: Option>, } -pub struct ExecutionValidBlockSummary { - pub slot: Slot, - pub blob_kzg_commitments_count: usize, -} - impl PendingComponents { /// Returns an immutable reference to the cached block. 
pub fn get_cached_block(&self) -> &Option> { @@ -549,10 +544,10 @@ impl OverflowLRUCache { } /// Returns true if the block root is known, without altering the LRU ordering - pub fn get_execution_valid_block_summary( + pub fn get_execution_valid_block( &self, block_root: &Hash256, - ) -> Option { + ) -> Option>> { self.critical .read() .peek_pending_components(block_root) @@ -560,10 +555,7 @@ impl OverflowLRUCache { pending_components .executed_block .as_ref() - .map(|block| ExecutionValidBlockSummary { - slot: block.as_block().slot(), - blob_kzg_commitments_count: block.num_blobs_expected(), - }) + .map(|block| block.block_cloned()) }) } diff --git a/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs index f8a243bd9e8..9775d54c024 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs @@ -37,6 +37,10 @@ impl DietAvailabilityPendingExecutedBlock { &self.block } + pub fn block_cloned(&self) -> Arc> { + self.block.clone() + } + pub fn num_blobs_expected(&self) -> usize { self.block .message() diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index 3b2453c3112..4ceaf675cec 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -1192,6 +1192,7 @@ pub fn scrape_for_metrics(beacon_chain: &BeaconChain) { } let attestation_stats = beacon_chain.op_pool.attestation_stats(); + let chain_metrics = beacon_chain.metrics(); set_gauge_by_usize( &BLOCK_PROCESSING_SNAPSHOT_CACHE_SIZE, @@ -1200,7 +1201,7 @@ pub fn scrape_for_metrics(beacon_chain: &BeaconChain) { set_gauge_by_usize( &BEACON_REQRESP_PRE_IMPORT_CACHE_SIZE, - beacon_chain.reqresp_pre_import_cache.read().len(), + chain_metrics.reqresp_pre_import_cache_len, ); let da_checker_metrics = 
beacon_chain.data_availability_checker.metrics(); diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 12b51a1341d..1b6c820a2f6 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -397,14 +397,8 @@ impl SyncNetworkContext { // can assert that this is the correct value of `blob_kzg_commitments_count`. match self.chain.get_block_process_status(&block_root) { BlockProcessStatus::Unknown => None, - BlockProcessStatus::NotValidated { - blob_kzg_commitments_count, - .. - } - | BlockProcessStatus::ExecutionValidated { - blob_kzg_commitments_count, - .. - } => Some(blob_kzg_commitments_count), + BlockProcessStatus::NotValidated(block) + | BlockProcessStatus::ExecutionValidated(block) => Some(block.num_expected_blobs()), } }) else { // Wait to download the block before downloading blobs. Then we can be sure that the From 7c125b848569714a1fc922a5edc908e93dfa70ce Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Sat, 25 May 2024 01:32:02 +0200 Subject: [PATCH 3/4] Keep PendingComponents in da_checker during import_block --- beacon_node/beacon_chain/src/beacon_chain.rs | 14 ++++++ .../src/data_availability_checker.rs | 5 +++ .../overflow_lru_cache.rs | 45 +++++++++++++++---- 3 files changed, 56 insertions(+), 8 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 0bc0a4a2d3d..88edf6d2c6b 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -3328,6 +3328,20 @@ impl BeaconChain { "payload_verification_handle", ) .await??; + + // Remove block components from da_checker AFTER completing block import. Then we can assert + // the following invariant: + // > A valid unfinalized block is either in fork-choice or da_checker. 
+ // + // If we remove the block when it becomes available, there's some time window during + // `import_block` where the block is nowhere. Consumers of the da_checker can handle the + // extended time a block may exist in the da_checker. + // + // If `import_block` errors (only errors with internal errors), the pending components will + // be pruned on data_availability_checker maintenance as finality advances. + self.data_availability_checker + .remove_pending_components(block_root); + Ok(AvailabilityProcessingStatus::Imported(block_root)) } diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 7efb6884d0f..8e8f2565c94 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -172,6 +172,11 @@ impl DataAvailabilityChecker { .put_pending_executed_block(executed_block) } + pub fn remove_pending_components(&self, block_root: Hash256) { + self.availability_cache + .remove_pending_components(block_root) + } + /// Verifies kzg commitments for an RpcBlock, returns a `MaybeAvailableBlock` that may /// include the fully available block. 
/// diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index e350181c867..f7aec523d75 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -475,18 +475,18 @@ impl Critical { Ok(()) } - /// Removes and returns the pending_components corresponding to - /// the `block_root` or `None` if it does not exist - pub fn pop_pending_components( + /// Returns the pending_components corresponding to the `block_root` or `None` if it does not + /// exist + pub fn get_pending_components( &mut self, block_root: Hash256, store: &OverflowStore, ) -> Result>, AvailabilityCheckError> { - match self.in_memory.pop_entry(&block_root) { - Some((_, pending_components)) => Ok(Some(pending_components)), + match self.in_memory.get(&block_root) { + Some(pending_components) => Ok(Some(pending_components.clone())), None => { // not in memory, is it in the store? - if self.store_keys.remove(&block_root) { + if self.store_keys.contains(&block_root) { // We don't need to remove the data from the store as we have removed it from // `store_keys` so we won't go looking for it on disk. The maintenance thread // will remove it from disk the next time it runs. @@ -498,6 +498,21 @@ impl Critical { } } + /// Removes the pending_components corresponding to the `block_root` (the in-memory entry, + /// or otherwise its key in `store_keys`). Returns nothing. + pub fn remove_pending_components(&mut self, block_root: Hash256) { + match self.in_memory.pop_entry(&block_root) { + Some { .. } => {} + None => { + // not in memory, is it in the store? + // We don't need to remove the data from the store as we have removed it from + // `store_keys` so we won't go looking for it on disk. The maintenance thread + // will remove it from disk the next time it runs. 
+ self.store_keys.remove(&block_root); + } + } + } + /// Returns the number of pending component entries in memory. pub fn num_blocks(&self) -> usize { self.in_memory.len() @@ -600,13 +615,18 @@ impl OverflowLRUCache { // Grab existing entry or create a new entry. let mut pending_components = write_lock - .pop_pending_components(block_root, &self.overflow_store)? + .get_pending_components(block_root, &self.overflow_store)? .unwrap_or_else(|| PendingComponents::empty(block_root)); // Merge in the blobs. pending_components.merge_blobs(fixed_blobs); if pending_components.is_available() { + write_lock.put_pending_components( + block_root, + pending_components.clone(), + &self.overflow_store, + )?; // No need to hold the write lock anymore drop(write_lock); pending_components.make_available(|diet_block| { @@ -638,7 +658,7 @@ impl OverflowLRUCache { // Grab existing entry or create a new entry. let mut pending_components = write_lock - .pop_pending_components(block_root, &self.overflow_store)? + .get_pending_components(block_root, &self.overflow_store)? .unwrap_or_else(|| PendingComponents::empty(block_root)); // Merge in the block. @@ -646,6 +666,11 @@ impl OverflowLRUCache { // Check if we have all components and entire set is consistent. 
if pending_components.is_available() { + write_lock.put_pending_components( + block_root, + pending_components.clone(), + &self.overflow_store, + )?; // No need to hold the write lock anymore drop(write_lock); pending_components.make_available(|diet_block| { @@ -661,6 +686,10 @@ impl OverflowLRUCache { } } + pub fn remove_pending_components(&self, block_root: Hash256) { + self.critical.write().remove_pending_components(block_root); + } + /// write all in memory objects to disk pub fn write_all_to_disk(&self) -> Result<(), AvailabilityCheckError> { let maintenance_lock = self.maintenance_lock.lock(); From c051218a7bc9e93d0a81cc1031013abfbb074341 Mon Sep 17 00:00:00 2001 From: ethDreamer <37123614+ethDreamer@users.noreply.github.com> Date: Fri, 31 May 2024 13:38:19 +0200 Subject: [PATCH 4/4] Fix tests with DA checker new eviction policy (#34) --- .../overflow_lru_cache.rs | 33 ++++++++++++++----- 1 file changed, 24 insertions(+), 9 deletions(-) diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index f7aec523d75..adc1a1e202c 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -475,18 +475,18 @@ impl Critical { Ok(()) } - /// Returns the pending_components corresponding to the `block_root` or `None` if it does not - /// exist - pub fn get_pending_components( + /// Removes and returns the pending_components corresponding to + /// the `block_root` or `None` if it does not exist + pub fn pop_pending_components( &mut self, block_root: Hash256, store: &OverflowStore, ) -> Result>, AvailabilityCheckError> { - match self.in_memory.get(&block_root) { - Some(pending_components) => Ok(Some(pending_components.clone())), + match self.in_memory.pop_entry(&block_root) { + Some((_, pending_components)) => Ok(Some(pending_components)), None => { 
// not in memory, is it in the store? - if self.store_keys.contains(&block_root) { + if self.store_keys.remove(&block_root) { // We don't need to remove the data from the store as we have removed it from // `store_keys` so we won't go looking for it on disk. The maintenance thread // will remove it from disk the next time it runs. @@ -615,7 +615,7 @@ impl OverflowLRUCache { // Grab existing entry or create a new entry. let mut pending_components = write_lock - .get_pending_components(block_root, &self.overflow_store)? + .pop_pending_components(block_root, &self.overflow_store)? .unwrap_or_else(|| PendingComponents::empty(block_root)); // Merge in the blobs. @@ -658,7 +658,7 @@ impl OverflowLRUCache { // Grab existing entry or create a new entry. let mut pending_components = write_lock - .get_pending_components(block_root, &self.overflow_store)? + .pop_pending_components(block_root, &self.overflow_store)? .unwrap_or_else(|| PendingComponents::empty(block_root)); // Merge in the block. @@ -1224,10 +1224,17 @@ mod test { matches!(availability, Availability::Available(_)), "block doesn't have blobs, should be available" ); + assert_eq!( + cache.critical.read().in_memory.len(), + 1, + "cache should still have block as it hasn't been imported yet" + ); + // remove the blob to simulate successful import + cache.remove_pending_components(root); assert_eq!( cache.critical.read().in_memory.len(), 0, - "cache should be empty because we don't have blobs" + "cache should be empty now that block has been imported" ); } else { assert!( @@ -1292,6 +1299,12 @@ mod test { "block should be available: {:?}", availability ); + assert!( + cache.critical.read().in_memory.len() == 1, + "cache should still have available block until import" + ); + // remove the blob to simulate successful import + cache.remove_pending_components(root); assert!( cache.critical.read().in_memory.is_empty(), "cache should be empty now that all components available" @@ -1407,6 +1420,8 @@ mod test { 
.expect("should put blob"); if blob_index == expected_blobs - 1 { assert!(matches!(availability, Availability::Available(_))); + // remove the block from the cache to simulate import + cache.remove_pending_components(roots[0]); } else { // the first block should be brought back into memory assert!(