Skip to content

Commit

Permalink
Ensure lookup sync checks caches correctly
Browse files Browse the repository at this point in the history
  • Loading branch information
dapplion committed May 24, 2024
1 parent 3070cb7 commit dbcd7d1
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 36 deletions.
43 changes: 43 additions & 0 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,22 @@ struct PartialBeaconBlock<E: EthSpec> {
bls_to_execution_changes: Vec<SignedBlsToExecutionChange>,
}

pub enum BlockProcessStatus {
/// Block is not in any pre-import cache. Block may be in the data-base or in the fork-choice.
Unknown,
/// Block is currently processing but not yet validated.
NotValidated {
slot: Slot,
blob_kzg_commitments_count: usize,
},
/// Block is fully valid, but not yet imported. It's cached in the da_checker while awaiting
/// missing block components.
ExecutionValidated {
slot: Slot,
blob_kzg_commitments_count: usize,
},
}

pub type LightClientProducerEvent<T> = (Hash256, Slot, SyncAggregate<T>);

pub type BeaconForkChoice<T> = ForkChoice<
Expand Down Expand Up @@ -1237,6 +1253,33 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Ok(self.store.get_blinded_block(block_root)?)
}

/// Return the status of a block as it progresses through the various caches of the beacon
/// chain. Used by sync to learn the status of a block and prevent repeated downloads /
/// processing attempts.
pub fn get_block_process_status(&self, block_root: &Hash256) -> BlockProcessStatus {
if let Some(execution_valid_block) = self
.data_availability_checker
.get_execution_valid_block_summary(block_root)
{
return BlockProcessStatus::ExecutionValidated {
slot: execution_valid_block.slot,
blob_kzg_commitments_count: execution_valid_block.blob_kzg_commitments_count,
};
}

if let Some(block) = self.reqresp_pre_import_cache.read().get(block_root) {
// A block is on the `reqresp_pre_import_cache` but NOT in the
// `data_availability_checker` only if it is actively processing. We can expect a future
// event with the result of processing
return BlockProcessStatus::NotValidated {
slot: block.slot(),
blob_kzg_commitments_count: block.num_expected_blobs(),
};
}

BlockProcessStatus::Unknown
}

/// Returns the state at the given root, if any.
///
/// ## Errors
Expand Down
9 changes: 7 additions & 2 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ mod state_lru_cache;
pub use error::{Error as AvailabilityCheckError, ErrorCategory as AvailabilityCheckErrorCategory};
use types::non_zero_usize::new_non_zero_usize;

use self::overflow_lru_cache::ExecutionValidBlockSummary;

/// The LRU Cache stores `PendingComponents` which can store up to
/// `MAX_BLOBS_PER_BLOCK = 6` blobs each. A `BlobSidecar` is 0.131256 MB. So
/// the maximum size of a `PendingComponents` is ~ 0.787536 MB. Setting this
Expand Down Expand Up @@ -86,9 +88,12 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {

/// Checks if the block root is currenlty in the availability cache awaiting import because
/// of missing components.
pub fn has_execution_valid_block(&self, block_root: &Hash256) -> bool {
pub fn get_execution_valid_block_summary(
&self,
block_root: &Hash256,
) -> Option<ExecutionValidBlockSummary> {
self.availability_cache
.has_execution_valid_block(block_root)
.get_execution_valid_block_summary(block_root)
}

/// Return the required blobs `block_root` expects if the block is currenlty in the cache.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use ssz_types::{FixedVector, VariableList};
use std::num::NonZeroUsize;
use std::{collections::HashSet, sync::Arc};
use types::blob_sidecar::BlobIdentifier;
use types::{BlobSidecar, ChainSpec, Epoch, EthSpec, Hash256};
use types::{BlobSidecar, ChainSpec, Epoch, EthSpec, Hash256, Slot};

/// This represents the components of a partially available block
///
Expand All @@ -57,6 +57,11 @@ pub struct PendingComponents<E: EthSpec> {
pub executed_block: Option<DietAvailabilityPendingExecutedBlock<E>>,
}

pub struct ExecutionValidBlockSummary {
pub slot: Slot,
pub blob_kzg_commitments_count: usize,
}

impl<E: EthSpec> PendingComponents<E> {
/// Returns an immutable reference to the cached block.
pub fn get_cached_block(&self) -> &Option<DietAvailabilityPendingExecutedBlock<E>> {
Expand Down Expand Up @@ -544,12 +549,22 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
}

/// Returns true if the block root is known, without altering the LRU ordering
pub fn has_execution_valid_block(&self, block_root: &Hash256) -> bool {
if let Some(pending_components) = self.critical.read().peek_pending_components(block_root) {
pending_components.executed_block.is_some()
} else {
false
}
pub fn get_execution_valid_block_summary(
&self,
block_root: &Hash256,
) -> Option<ExecutionValidBlockSummary> {
self.critical
.read()
.peek_pending_components(block_root)
.and_then(|pending_components| {
pending_components
.executed_block
.as_ref()
.map(|block| ExecutionValidBlockSummary {
slot: block.as_block().slot(),
blob_kzg_commitments_count: block.num_blobs_expected(),
})
})
}

/// Fetch a blob from the cache without affecting the LRU ordering
Expand Down
7 changes: 4 additions & 3 deletions beacon_node/beacon_chain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,10 @@ pub mod validator_pubkey_cache;

pub use self::beacon_chain::{
AttestationProcessingOutcome, AvailabilityProcessingStatus, BeaconBlockResponse,
BeaconBlockResponseWrapper, BeaconChain, BeaconChainTypes, BeaconStore, ChainSegmentResult,
ForkChoiceError, LightClientProducerEvent, OverrideForkchoiceUpdate, ProduceBlockVerification,
StateSkipConfig, WhenSlotSkipped, INVALID_FINALIZED_MERGE_TRANSITION_BLOCK_SHUTDOWN_REASON,
BeaconBlockResponseWrapper, BeaconChain, BeaconChainTypes, BeaconStore, BlockProcessStatus,
ChainSegmentResult, ForkChoiceError, LightClientProducerEvent, OverrideForkchoiceUpdate,
ProduceBlockVerification, StateSkipConfig, WhenSlotSkipped,
INVALID_FINALIZED_MERGE_TRANSITION_BLOCK_SHUTDOWN_REASON,
INVALID_JUSTIFIED_PAYLOAD_SHUTDOWN_REASON,
};
pub use self::beacon_snapshot::BeaconSnapshot;
Expand Down
50 changes: 26 additions & 24 deletions beacon_node/network/src/sync/network_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::status::ToStatusMessage;
use crate::sync::block_lookups::SingleLookupId;
use crate::sync::manager::{BlockProcessType, SingleLookupReqId};
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::{BeaconChain, BeaconChainTypes, EngineState};
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessStatus, EngineState};
use fnv::FnvHashMap;
use lighthouse_network::rpc::methods::BlobsByRangeRequest;
use lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError};
Expand Down Expand Up @@ -337,26 +337,17 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
peer_id: PeerId,
block_root: Hash256,
) -> Result<LookupRequestResult, RpcRequestSendError> {
// da_checker includes block that are execution verified, but are missing components
if self
.chain
.data_availability_checker
.has_execution_valid_block(&block_root)
{
return Ok(LookupRequestResult::NoRequestNeeded);
}

// reqresp_pre_import_cache includes blocks that may not be yet execution verified
if self
.chain
.reqresp_pre_import_cache
.read()
.contains_key(&block_root)
{
// A block is on the `reqresp_pre_import_cache` but NOT in the
// `data_availability_checker` only if it is actively processing. We can expect a future
// event with the result of processing
return Ok(LookupRequestResult::Pending);
match self.chain.get_block_process_status(&block_root) {
// Unknown block, continue request to download
BlockProcessStatus::Unknown => {}
// Block is known are currently processing, expect a future event with the result of
// processing.
BlockProcessStatus::NotValidated { .. } => return Ok(LookupRequestResult::Pending),
// Block is fully validated. If it's not yet imported it's waiting for missing block
// components. Consider this request completed and do nothing.
BlockProcessStatus::ExecutionValidated { .. } => {
return Ok(LookupRequestResult::NoRequestNeeded)
}
}

let req_id = self.next_id();
Expand Down Expand Up @@ -401,9 +392,20 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
downloaded_block_expected_blobs: Option<usize>,
) -> Result<LookupRequestResult, RpcRequestSendError> {
let Some(expected_blobs) = downloaded_block_expected_blobs.or_else(|| {
self.chain
.data_availability_checker
.num_expected_blobs(&block_root)
// If the block is already being processed or fully validated, retrieve how many blobs
// it expects. Consider any stage of the block. If the block root has been validated, we
// can assert that this is the correct value of `blob_kzg_commitments_count`.
match self.chain.get_block_process_status(&block_root) {
BlockProcessStatus::Unknown => None,
BlockProcessStatus::NotValidated {
blob_kzg_commitments_count,
..
}
| BlockProcessStatus::ExecutionValidated {
blob_kzg_commitments_count,
..
} => Some(blob_kzg_commitments_count),
}
}) else {
// Wait to download the block before downloading blobs. Then we can be sure that the
// block has data, so there's no need to do "blind" requests for all possible blobs and
Expand Down

0 comments on commit dbcd7d1

Please sign in to comment.