diff --git a/beacon_node/beacon_chain/src/beacon_block_streamer.rs b/beacon_node/beacon_chain/src/beacon_block_streamer.rs index 4f4f8ed1fe0..6efe3ddf612 100644 --- a/beacon_node/beacon_chain/src/beacon_block_streamer.rs +++ b/beacon_node/beacon_chain/src/beacon_block_streamer.rs @@ -410,8 +410,10 @@ impl BeaconBlockStreamer { fn check_caches(&self, root: Hash256) -> Option>> { if self.check_caches == CheckCaches::Yes { self.beacon_chain - .data_availability_checker - .get_block(&root) + .reqresp_pre_import_cache + .read() + .get(&root) + .map(|block| block.clone()) .or(self.beacon_chain.early_attester_cache.get_block(root)) } else { None diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 58960ed8078..7291e6b9cd3 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -361,6 +361,9 @@ pub type BeaconStore = Arc< >, >; +/// Cache gossip verified blocks to serve over ReqResp before they are imported +type ReqRespPreImportCache = HashMap>>; + /// Represents the "Beacon Chain" component of Ethereum 2.0. Allows import of blocks and block /// operations and chooses a canonical head. pub struct BeaconChain { @@ -463,6 +466,8 @@ pub struct BeaconChain { pub(crate) attester_cache: Arc, /// A cache used when producing attestations whilst the head block is still being imported. pub early_attester_cache: EarlyAttesterCache, + /// Cache gossip verified blocks to serve over ReqResp before they are imported + pub reqresp_pre_import_cache: Arc>>, /// A cache used to keep track of various block timings. pub block_times_cache: Arc>, /// A cache used to track pre-finalization block roots for quick rejection. @@ -2929,8 +2934,6 @@ impl BeaconChain { } } - self.data_availability_checker - .notify_gossip_blob(blob.slot(), block_root, &blob); let r = self.check_gossip_blob_availability_and_import(blob).await; self.remove_notified(&block_root, r) } @@ -2987,14 +2990,36 @@ impl BeaconChain { } } - self.data_availability_checker - .notify_rpc_blobs(slot, block_root, &blobs); let r = self .check_rpc_blob_availability_and_import(slot, block_root, blobs) .await; self.remove_notified(&block_root, r) } + pub async fn process_rpc_data_column( + self: &Arc, + data_column: Arc>, + ) -> Result> { + let block_root = data_column.block_root(); + + // If this block has already been imported to forkchoice it must have been available, so + // we don't need to process its blobs again. + if self + .canonical_head + .fork_choice_read_lock() + .contains_block(&block_root) + { + return Err(BlockError::BlockIsAlreadyKnown); + } + + // TODO(das) emit data_column SEE event + + let r = self + .check_rpc_data_column_availability_and_import(data_column) + .await; + self.remove_notified(&block_root, r) + } + /// Remove any block components from the *processing cache* if we no longer require them. If the /// block was imported full or erred, we no longer require them. 
fn remove_notified( @@ -3005,7 +3030,7 @@ impl BeaconChain { let has_missing_components = matches!(r, Ok(AvailabilityProcessingStatus::MissingComponents(_, _))); if !has_missing_components { - self.data_availability_checker.remove_notified(block_root); + self.reqresp_pre_import_cache.write().remove(block_root); } r } @@ -3018,8 +3043,10 @@ impl BeaconChain { unverified_block: B, notify_execution_layer: NotifyExecutionLayer, ) -> Result> { - self.data_availability_checker - .notify_block(block_root, unverified_block.block_cloned()); + self.reqresp_pre_import_cache + .write() + .insert(block_root, unverified_block.block_cloned()); + let r = self .process_block(block_root, unverified_block, notify_execution_layer, || { Ok(()) @@ -3286,6 +3313,39 @@ impl BeaconChain { self.process_availability(slot, availability).await } + async fn check_rpc_data_column_availability_and_import( + self: &Arc, + data_column: Arc>, + ) -> Result> { + let block_root = data_column.block_root(); + let slot = data_column.slot(); + + // Need to scope this to ensure the lock is dropped before calling `process_availability` + // Even an explicit drop is not enough to convince the borrow checker. + { + let mut slashable_cache = self.observed_slashable.write(); + let header = &data_column.signed_block_header; + if verify_header_signature::>(self, &header).is_ok() { + slashable_cache + .observe_slashable( + header.message.slot, + header.message.proposer_index, + block_root, + ) + .map_err(|e| BlockError::BeaconChainError(e.into()))?; + if let Some(slasher) = self.slasher.as_ref() { + slasher.accept_block_header(header.clone()); + } + } + } + + let availability = self + .data_availability_checker + .put_rpc_data_column(block_root, data_column)?; + + self.process_availability(slot, availability).await + } + /// Imports a fully available block. Otherwise, returns `AvailabilityProcessingStatus::MissingComponents` /// /// An error is returned if the block was unable to be imported. It may be partially imported diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index a1d2706726b..a23baa8e4de 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -2,7 +2,7 @@ use crate::beacon_chain::{ CanonicalHead, LightClientProducerEvent, BEACON_CHAIN_DB_KEY, ETH1_CACHE_DB_KEY, OP_POOL_DB_KEY, }; use crate::beacon_proposer_cache::BeaconProposerCache; -use crate::data_availability_checker::DataAvailabilityChecker; +use crate::data_availability_checker::{CustodyConfig, DataAvailabilityChecker, NodeIdRaw}; use crate::eth1_chain::{CachingEth1Backend, SszEth1}; use crate::eth1_finalization_cache::Eth1FinalizationCache; use crate::fork_choice_signal::ForkChoiceSignalTx; @@ -104,6 +104,7 @@ pub struct BeaconChainBuilder { trusted_setup: Option, task_executor: Option, validator_monitor_config: Option, + node_id: Option, } impl @@ -145,6 +146,7 @@ where trusted_setup: None, task_executor: None, validator_monitor_config: None, + node_id: None, } } @@ -687,6 +689,11 @@ where self } + pub fn node_id(mut self, node_id: NodeIdRaw) -> Self { + self.node_id = Some(node_id); + self + } + /// Consumes `self`, returning a `BeaconChain` if all required parameters have been supplied. /// /// An error will be returned at runtime if all required parameters have not been configured. 
@@ -880,6 +887,7 @@ where let head_for_snapshot_cache = head_snapshot.clone(); let canonical_head = CanonicalHead::new(fork_choice, Arc::new(head_snapshot)); let shuffling_cache_size = self.chain_config.shuffling_cache_size; + let custody_requirement = self.chain_config.custody_requirement; // Calculate the weak subjectivity point in which to backfill blocks to. let genesis_backfill_slot = if self.chain_config.genesis_backfill { @@ -971,6 +979,7 @@ where validator_pubkey_cache: TimeoutRwLock::new(validator_pubkey_cache), attester_cache: <_>::default(), early_attester_cache: <_>::default(), + reqresp_pre_import_cache: <_>::default(), light_client_server_cache: LightClientServerCache::new(), light_client_server_tx: self.light_client_server_tx, shutdown_sender: self @@ -982,8 +991,18 @@ where validator_monitor: RwLock::new(validator_monitor), genesis_backfill_slot, data_availability_checker: Arc::new( - DataAvailabilityChecker::new(slot_clock, kzg.clone(), store, &log, self.spec) - .map_err(|e| format!("Error initializing DataAvailabiltyChecker: {:?}", e))?, + DataAvailabilityChecker::new( + slot_clock, + kzg.clone(), + store, + &log, + self.spec, + CustodyConfig::new( + self.node_id.ok_or("Cannot build without a node ID")?, + custody_requirement, + ), + ) + .map_err(|e| format!("Error initializing DataAvailabiltyChecker: {:?}", e))?, ), kzg, block_production_state: Arc::new(Mutex::new(None)), diff --git a/beacon_node/beacon_chain/src/chain_config.rs b/beacon_node/beacon_chain/src/chain_config.rs index 23e17a6efad..aff42417909 100644 --- a/beacon_node/beacon_chain/src/chain_config.rs +++ b/beacon_node/beacon_chain/src/chain_config.rs @@ -85,6 +85,8 @@ pub struct ChainConfig { pub epochs_per_migration: u64, /// When set to true Light client server computes and caches state proofs for serving updates pub enable_light_client_server: bool, + /// Columns to custody for PeerDAS, minimum 2 + pub custody_requirement: u64, } impl Default for ChainConfig { @@ -117,6 +119,7 @@ impl Default for ChainConfig { progressive_balances_mode: ProgressiveBalancesMode::Fast, epochs_per_migration: crate::migrate::DEFAULT_EPOCHS_PER_MIGRATION, enable_light_client_server: false, + custody_requirement: 2, } } } diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 9a4f5eea048..d3d3b2b4592 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -2,42 +2,62 @@ use crate::blob_verification::{verify_kzg_for_blob_list, GossipVerifiedBlob, Kzg use crate::block_verification_types::{ AvailabilityPendingExecutedBlock, AvailableExecutedBlock, RpcBlock, }; -pub use crate::data_availability_checker::availability_view::{ - AvailabilityView, GetCommitment, GetCommitments, -}; pub use crate::data_availability_checker::child_components::ChildComponents; use crate::data_availability_checker::overflow_lru_cache::OverflowLRUCache; -use crate::data_availability_checker::processing_cache::ProcessingCache; +pub use crate::data_availability_checker::overflow_lru_cache::{ + compute_custody_requirements, compute_sample_requirements, +}; use crate::{BeaconChain, BeaconChainTypes, BeaconStore}; use kzg::Kzg; -use parking_lot::RwLock; -pub use processing_cache::ProcessingComponents; use slasher::test_utils::E; use slog::{debug, error, Logger}; use slot_clock::SlotClock; +use ssz_types::FixedVector; use std::fmt; use std::fmt::Debug; use std::num::NonZeroUsize; use std::sync::Arc; use 
task_executor::TaskExecutor; -use types::beacon_block_body::KzgCommitmentOpts; use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList}; use types::{ BlobSidecarList, ChainSpec, DataColumnSidecar, Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot, }; -mod availability_view; mod child_components; mod error; mod overflow_lru_cache; -mod processing_cache; mod state_lru_cache; -use crate::data_column_verification::{verify_kzg_for_data_column_list, GossipVerifiedDataColumn}; +use crate::data_column_verification::{ + verify_kzg_for_data_column_list, GossipVerifiedDataColumn, KzgVerifiedDataColumn, +}; pub use error::{Error as AvailabilityCheckError, ErrorCategory as AvailabilityCheckErrorCategory}; -use types::data_column_sidecar::{DataColumnIdentifier, DataColumnSidecarList}; +use types::data_column_sidecar::{ColumnIndex, DataColumnIdentifier, DataColumnSidecarList}; use types::non_zero_usize::new_non_zero_usize; +#[derive(Clone, Copy)] +pub struct NodeIdRaw(pub [u8; 32]); + +pub struct CustodyConfig { + node_id: NodeIdRaw, + custody_requirement: u64, +} + +impl CustodyConfig { + pub fn new(node_id: NodeIdRaw, custody_requirement: u64) -> Self { + Self { + node_id, + custody_requirement, + } + } +} + +impl From<[u8; 32]> for NodeIdRaw { + fn from(value: [u8; 32]) -> Self { + Self(value) + } +} + /// The LRU Cache stores `PendingComponents` which can store up to /// `MAX_BLOBS_PER_BLOCK = 6` blobs each. A `BlobSidecar` is 0.131256 MB. So /// the maximum size of a `PendingComponents` is ~ 0.787536 MB. Setting this @@ -53,7 +73,6 @@ pub const STATE_LRU_CAPACITY: usize = STATE_LRU_CAPACITY_NON_ZERO.get(); /// `DataAvailabilityChecker` is responsible for KZG verification of block components as well as /// checking whether a "availability check" is required at all. pub struct DataAvailabilityChecker { - processing_cache: RwLock>, availability_cache: Arc>, slot_clock: T::SlotClock, kzg: Option>, @@ -89,10 +108,11 @@ impl DataAvailabilityChecker { store: BeaconStore, log: &Logger, spec: ChainSpec, + custody_config: CustodyConfig, ) -> Result { - let overflow_cache = OverflowLRUCache::new(OVERFLOW_LRU_CAPACITY, store, spec.clone())?; + let overflow_cache = + OverflowLRUCache::new(OVERFLOW_LRU_CAPACITY, store, spec.clone(), custody_config)?; Ok(Self { - processing_cache: <_>::default(), availability_cache: Arc::new(overflow_cache), slot_clock, log: log.clone(), @@ -101,27 +121,49 @@ impl DataAvailabilityChecker { }) } - /// Checks if the given block root is cached. + pub fn get_custody_config(&self) -> &CustodyConfig { + self.availability_cache.get_custody_config() + } + + /// Return this node's custody column requirements at `slot` + pub fn custody_columns_at_slot(&self, slot: Slot) -> Vec { + self.availability_cache.custody_columns_at_slot(slot) + } + + /// Checks if the block root is currenlty in the availability cache awaiting processing because + /// of missing components. + pub fn block_slot(&self, block_root: &Hash256) -> Option { + self.availability_cache.block_slot(block_root) + } + + /// Checks if the block root is currenlty in the availability cache awaiting processing because + /// of missing components. pub fn has_block(&self, block_root: &Hash256) -> bool { - self.processing_cache.read().has_block(block_root) + self.availability_cache.has_block(block_root) } - /// Get the processing info for a block. 
- pub fn get_processing_components( - &self, - block_root: Hash256, - ) -> Option> { - self.processing_cache.read().get(&block_root).cloned() + pub fn get_missing_blob_ids_with(&self, block_root: Hash256) -> MissingBlobs { + self.availability_cache + .with_pending_components(&block_root, |pending_components| { + self.get_missing_blob_ids( + block_root, + &pending_components + .and_then(|p| p.executed_block.clone()) + .map(|b| b.as_block_cloned()), + &pending_components.map(|p| p.verified_blobs.clone()), + ) + }) } /// A `None` indicates blobs are not required. /// /// If there's no block, all possible ids will be returned that don't exist in the given blobs. /// If there no blobs, all possible ids will be returned. - pub fn get_missing_blob_ids>( + pub fn get_missing_blob_ids( &self, block_root: Hash256, - availability_view: &V, + block: &Option>>, + blobs: &Option, ::MaxBlobsPerBlock>>, ) -> MissingBlobs { let Some(current_slot) = self.slot_clock.now_or_genesis() else { error!( @@ -134,54 +176,35 @@ impl DataAvailabilityChecker { let current_epoch = current_slot.epoch(T::EthSpec::slots_per_epoch()); if self.da_check_required_for_epoch(current_epoch) { - match availability_view.get_cached_block() { - Some(cached_block) => { - let block_commitments = cached_block.get_commitments(); - let blob_commitments = availability_view.get_cached_blobs(); - - let num_blobs_expected = block_commitments.len(); - let mut blob_ids = Vec::with_capacity(num_blobs_expected); - - // Zip here will always limit the number of iterations to the size of - // `block_commitment` because `blob_commitments` will always be populated - // with `Option` values up to `MAX_BLOBS_PER_BLOCK`. - for (index, (block_commitment, blob_commitment_opt)) in block_commitments - .into_iter() - .zip(blob_commitments.iter()) - .enumerate() - { - // Always add a missing blob. - let Some(blob_commitment) = blob_commitment_opt else { - blob_ids.push(BlobIdentifier { - block_root, - index: index as u64, - }); - continue; - }; - - let blob_commitment = *blob_commitment.get_commitment(); - - // Check for consistency, but this shouldn't happen, an availability view - // should guaruntee consistency. - if blob_commitment != block_commitment { - error!(self.log, - "Inconsistent availability view"; - "block_root" => ?block_root, - "block_commitment" => ?block_commitment, - "blob_commitment" => ?blob_commitment, - "index" => index - ); - blob_ids.push(BlobIdentifier { - block_root, - index: index as u64, - }); - } - } - MissingBlobs::KnownMissing(blob_ids) - } - None => { - MissingBlobs::PossibleMissing(BlobIdentifier::get_all_blob_ids::(block_root)) + if let (Some(block), Some(blobs)) = (block, blobs) { + let block_commitments = block + .message() + .body() + .blob_kzg_commitments() + .ok() + .cloned() + .unwrap_or_default(); + + let num_blobs_expected = block_commitments.len(); + let mut blob_ids = Vec::with_capacity(num_blobs_expected); + + // Zip here will always limit the number of iterations to the size of + // `block_commitment` because `blob_commitments` will always be populated + // with `Option` values up to `MAX_BLOBS_PER_BLOCK`. + for (index, (_, blob_commitment_opt)) in + block_commitments.into_iter().zip(blobs.iter()).enumerate() + { + // Always add a missing blob. 
+ if blob_commitment_opt.is_none() { + blob_ids.push(BlobIdentifier { + block_root, + index: index as u64, + }); + }; } + MissingBlobs::KnownMissing(blob_ids) + } else { + MissingBlobs::PossibleMissing(BlobIdentifier::get_all_blob_ids::(block_root)) } } else { MissingBlobs::BlobsNotRequired @@ -196,14 +219,6 @@ impl DataAvailabilityChecker { self.availability_cache.peek_blob(blob_id) } - /// Get a block from the availability cache. Includes any blocks we are currently processing. - pub fn get_block(&self, block_root: &Hash256) -> Option>> { - self.processing_cache - .read() - .get(block_root) - .and_then(|cached| cached.block.clone()) - } - /// Get a data column from the availability cache. pub fn get_data_column( &self, @@ -230,6 +245,23 @@ impl DataAvailabilityChecker { .put_kzg_verified_blobs(block_root, verified_blobs) } + pub fn put_rpc_data_column( + &self, + block_root: Hash256, + data_column: Arc>, + ) -> Result, AvailabilityCheckError> { + let Some(kzg) = self.kzg.as_ref() else { + return Err(AvailabilityCheckError::KzgNotInitialized); + }; + + // TODO(das): batch verify data columns + let verified_data_column = KzgVerifiedDataColumn::new(data_column.clone(), kzg) + .map_err(AvailabilityCheckError::Kzg)?; + + self.availability_cache + .put_kzg_verified_data_column(block_root, verified_data_column) + } + /// Check if we've cached other blobs for this block. If it completes a set and we also /// have a block cached, return the `Availability` variant triggering block import. /// Otherwise cache the blob sidecar. @@ -252,9 +284,9 @@ impl DataAvailabilityChecker { &self, gossip_data_column: GossipVerifiedDataColumn, ) -> Result, AvailabilityCheckError> { - self.availability_cache.put_kzg_verified_data_columns( + self.availability_cache.put_kzg_verified_data_column( gossip_data_column.block_root(), - vec![gossip_data_column.into_inner()], + gossip_data_column.into_inner(), ) } @@ -287,7 +319,7 @@ impl DataAvailabilityChecker { block_root, block, blobs: None, - data_columns: None, + custody_data_columns: None, })) } } @@ -315,7 +347,7 @@ impl DataAvailabilityChecker { block_root, block, blobs: verified_blobs, - data_columns: verified_data_column, + custody_data_columns: verified_data_column, })) } } @@ -361,7 +393,7 @@ impl DataAvailabilityChecker { block_root, block, blobs: None, - data_columns: None, + custody_data_columns: None, })) } } @@ -377,7 +409,7 @@ impl DataAvailabilityChecker { block_root, block, blobs: verified_blobs, - data_columns: verified_data_columns, + custody_data_columns: verified_data_columns, })) } } @@ -392,71 +424,6 @@ impl DataAvailabilityChecker { block.num_expected_blobs() > 0 && self.da_check_required_for_epoch(block.epoch()) } - /// Adds a block to the processing cache. This block's commitments are unverified but caching - /// them here is useful to avoid duplicate downloads of blocks, as well as understanding - /// our blob download requirements. We will also serve this over RPC. - pub fn notify_block(&self, block_root: Hash256, block: Arc>) { - let slot = block.slot(); - self.processing_cache - .write() - .entry(block_root) - .or_insert_with(|| ProcessingComponents::new(slot)) - .merge_block(block); - } - - /// Add a single blob commitment to the processing cache. This commitment is unverified but caching - /// them here is useful to avoid duplicate downloads of blobs, as well as understanding - /// our block and blob download requirements. 
- pub fn notify_gossip_blob( - &self, - slot: Slot, - block_root: Hash256, - blob: &GossipVerifiedBlob, - ) { - let index = blob.index(); - let commitment = blob.kzg_commitment(); - self.processing_cache - .write() - .entry(block_root) - .or_insert_with(|| ProcessingComponents::new(slot)) - .merge_single_blob(index as usize, commitment); - } - - /// Adds blob commitments to the processing cache. These commitments are unverified but caching - /// them here is useful to avoid duplicate downloads of blobs, as well as understanding - /// our block and blob download requirements. - pub fn notify_rpc_blobs( - &self, - slot: Slot, - block_root: Hash256, - blobs: &FixedBlobSidecarList, - ) { - let mut commitments = KzgCommitmentOpts::::default(); - for blob in blobs.iter().flatten() { - if let Some(commitment) = commitments.get_mut(blob.index as usize) { - *commitment = Some(blob.kzg_commitment); - } - } - self.processing_cache - .write() - .entry(block_root) - .or_insert_with(|| ProcessingComponents::new(slot)) - .merge_blobs(commitments); - } - - /// Clears the block and all blobs from the processing cache for a give root if they exist. - pub fn remove_notified(&self, block_root: &Hash256) { - self.processing_cache.write().remove(block_root) - } - - /// Gather all block roots for which we are not currently processing all components for the - /// given slot. - pub fn incomplete_processing_components(&self, slot: Slot) -> Vec { - self.processing_cache - .read() - .incomplete_processing_components(slot) - } - /// The epoch at which we require a data availability check in block processing. /// `None` if the `Deneb` fork is disabled. pub fn data_availability_boundary(&self) -> Option { @@ -498,7 +465,6 @@ impl DataAvailabilityChecker { /// Collects metrics from the data availability checker. pub fn metrics(&self) -> DataAvailabilityCheckerMetrics { DataAvailabilityCheckerMetrics { - processing_cache_size: self.processing_cache.read().len(), num_store_entries: self.availability_cache.num_store_entries(), state_cache_size: self.availability_cache.state_cache_size(), block_cache_size: self.availability_cache.block_cache_size(), @@ -508,7 +474,6 @@ impl DataAvailabilityChecker { /// Helper struct to group data availability checker metrics. 
pub struct DataAvailabilityCheckerMetrics { - pub processing_cache_size: usize, pub num_store_entries: usize, pub state_cache_size: usize, pub block_cache_size: usize, @@ -604,7 +569,7 @@ pub struct AvailableBlock { block_root: Hash256, block: Arc>, blobs: Option>, - data_columns: Option>, + custody_data_columns: Option>, } impl AvailableBlock { @@ -612,13 +577,13 @@ impl AvailableBlock { block_root: Hash256, block: Arc>, blobs: Option>, - data_columns: Option>, + custody_data_columns: Option>, ) -> Self { Self { block_root, block, blobs, - data_columns, + custody_data_columns, } } @@ -633,8 +598,8 @@ impl AvailableBlock { self.blobs.as_ref() } - pub fn data_columns(&self) -> Option<&DataColumnSidecarList> { - self.data_columns.as_ref() + pub fn custody_data_columns(&self) -> Option<&DataColumnSidecarList> { + self.custody_data_columns.as_ref() } #[allow(clippy::type_complexity)] @@ -650,9 +615,9 @@ impl AvailableBlock { block_root, block, blobs, - data_columns, + custody_data_columns, } = self; - (block_root, block, blobs, data_columns) + (block_root, block, blobs, custody_data_columns) } } @@ -735,3 +700,32 @@ impl Into> for MissingBlobs { } } } + +#[derive(Debug, Clone)] +pub enum MissingDataColumns { + /// We know for certain we must fetch this column ids + KnownMissing(Vec), + /// We don't know yet the full list of column ids to fetch + KnownMissingIncomplete(Vec), + /// Not required. + NotRequired, +} + +impl MissingDataColumns { + pub fn is_empty(&self) -> bool { + match self { + MissingDataColumns::KnownMissing(v) => v.is_empty(), + MissingDataColumns::KnownMissingIncomplete(_) => false, + MissingDataColumns::NotRequired => true, + } + } + + pub fn indices(&self) -> Vec { + match self { + MissingDataColumns::KnownMissing(v) | MissingDataColumns::KnownMissingIncomplete(v) => { + v.iter().map(|id| id.index).collect() + } + MissingDataColumns::NotRequired => vec![], + } + } +} diff --git a/beacon_node/beacon_chain/src/data_availability_checker/availability_view.rs b/beacon_node/beacon_chain/src/data_availability_checker/availability_view.rs deleted file mode 100644 index f79f28b1cad..00000000000 --- a/beacon_node/beacon_chain/src/data_availability_checker/availability_view.rs +++ /dev/null @@ -1,624 +0,0 @@ -use super::child_components::ChildComponents; -use super::state_lru_cache::DietAvailabilityPendingExecutedBlock; -use crate::blob_verification::KzgVerifiedBlob; -use crate::block_verification_types::AsBlock; -use crate::data_availability_checker::overflow_lru_cache::PendingComponents; -use crate::data_availability_checker::ProcessingComponents; -use crate::data_column_verification::KzgVerifiedDataColumn; -use kzg::KzgCommitment; -use ssz_types::FixedVector; -use std::sync::Arc; -use types::beacon_block_body::KzgCommitments; -use types::{BlobSidecar, DataColumnSidecar, EthSpec, SignedBeaconBlock}; - -/// Defines an interface for managing data availability with two key invariants: -/// -/// 1. If we haven't seen a block yet, we will insert the first blob for a given (block_root, index) -/// but we won't insert subsequent blobs for the same (block_root, index) if they have a different -/// commitment. -/// 2. On block insertion, any non-matching blob commitments are evicted. -/// -/// Types implementing this trait can be used for validating and managing availability -/// of blocks and blobs in a cache-like data structure. -pub trait AvailabilityView { - /// The type representing a block in the implementation. 
- type BlockType: GetCommitments; - - /// The type representing a blob in the implementation. Must implement `Clone`. - type BlobType: Clone + GetCommitment; - - /// The type representing a data column in the implementation. - type DataColumnType: Clone; - - /// Returns an immutable reference to the cached block. - fn get_cached_block(&self) -> &Option; - - /// Returns an immutable reference to the fixed vector of cached blobs. - fn get_cached_blobs(&self) -> &FixedVector, E::MaxBlobsPerBlock>; - - /// Returns an immutable reference to the fixed vector of cached data columns. - fn get_cached_data_columns( - &self, - ) -> &FixedVector, E::DataColumnCount>; - - /// Returns a mutable reference to the cached block. - fn get_cached_block_mut(&mut self) -> &mut Option; - - /// Returns a mutable reference to the fixed vector of cached blobs. - fn get_cached_blobs_mut( - &mut self, - ) -> &mut FixedVector, E::MaxBlobsPerBlock>; - - /// Returns a mutable reference to the fixed vector of cached data columns. - fn get_cached_data_columns_mut( - &mut self, - ) -> &mut FixedVector, E::DataColumnCount>; - - /// Checks if a block exists in the cache. - /// - /// Returns: - /// - `true` if a block exists. - /// - `false` otherwise. - fn block_exists(&self) -> bool { - self.get_cached_block().is_some() - } - - /// Checks if a blob exists at the given index in the cache. - /// - /// Returns: - /// - `true` if a blob exists at the given index. - /// - `false` otherwise. - fn blob_exists(&self, blob_index: usize) -> bool { - self.get_cached_blobs() - .get(blob_index) - .map(|b| b.is_some()) - .unwrap_or(false) - } - - /// Checks if a data column exists at the given index in the cache. - /// - /// Returns: - /// - `true` if a data column exists at the given index. - /// - `false` otherwise. - fn data_column_exists(&self, data_colum_index: usize) -> bool { - self.get_cached_data_columns() - .get(data_colum_index) - .map(|d| d.is_some()) - .unwrap_or(false) - } - - /// Returns the number of blobs that are expected to be present. Returns `None` if we don't have a - /// block. - /// - /// This corresponds to the number of commitments that are present in a block. - fn num_expected_blobs(&self) -> Option { - self.get_cached_block() - .as_ref() - .map(|b| b.get_commitments().len()) - } - - /// Returns the number of blobs that have been received and are stored in the cache. - fn num_received_blobs(&self) -> usize { - self.get_cached_blobs().iter().flatten().count() - } - - /// Inserts a block into the cache. - fn insert_block(&mut self, block: Self::BlockType) { - *self.get_cached_block_mut() = Some(block) - } - - /// Inserts a blob at a specific index in the cache. - /// - /// Existing blob at the index will be replaced. - fn insert_blob_at_index(&mut self, blob_index: usize, blob: Self::BlobType) { - if let Some(b) = self.get_cached_blobs_mut().get_mut(blob_index) { - *b = Some(blob); - } - } - - /// Inserts a data column at a specific index in the cache. - /// - /// Existing data column at the index will be replaced. - fn insert_data_column_at_index( - &mut self, - data_column_index: usize, - data_column: Self::DataColumnType, - ) { - if let Some(b) = self - .get_cached_data_columns_mut() - .get_mut(data_column_index) - { - *b = Some(data_column); - } - } - - /// Merges a given set of data columns into the cache. - /// - /// Data columns are only inserted if: - /// 1. The data column entry at the index is empty and no block exists. - /// 2. The block exists and its commitments matches the data column's commitments. 
- fn merge_data_columns( - &mut self, - data_columns: FixedVector, E::DataColumnCount>, - ) { - for (index, data_column) in data_columns.iter().cloned().enumerate() { - let Some(data_column) = data_column else { - continue; - }; - // TODO(das): Add equivalent checks for data columns if necessary - if !self.data_column_exists(index) { - self.insert_data_column_at_index(index, data_column) - } - } - } - - /// Merges a given set of blobs into the cache. - /// - /// Blobs are only inserted if: - /// 1. The blob entry at the index is empty and no block exists. - /// 2. The block exists and its commitment matches the blob's commitment. - fn merge_blobs(&mut self, blobs: FixedVector, E::MaxBlobsPerBlock>) { - for (index, blob) in blobs.iter().cloned().enumerate() { - let Some(blob) = blob else { continue }; - self.merge_single_blob(index, blob); - } - } - - /// Merges a single blob into the cache. - /// - /// Blobs are only inserted if: - /// 1. The blob entry at the index is empty and no block exists, or - /// 2. The block exists and its commitment matches the blob's commitment. - fn merge_single_blob(&mut self, index: usize, blob: Self::BlobType) { - let commitment = *blob.get_commitment(); - if let Some(cached_block) = self.get_cached_block() { - let block_commitment_opt = cached_block.get_commitments().get(index).copied(); - if let Some(block_commitment) = block_commitment_opt { - if block_commitment == commitment { - self.insert_blob_at_index(index, blob) - } - } - } else if !self.blob_exists(index) { - self.insert_blob_at_index(index, blob) - } - } - - /// Inserts a new block and revalidates the existing blobs against it. - /// - /// Blobs that don't match the new block's commitments are evicted. - fn merge_block(&mut self, block: Self::BlockType) { - self.insert_block(block); - let reinsert = std::mem::take(self.get_cached_blobs_mut()); - self.merge_blobs(reinsert); - } - - /// Checks if the block and all of its expected blobs are available in the cache. - /// - /// Returns `true` if both the block exists and the number of received blobs matches the number - /// of expected blobs. - fn is_available(&self) -> bool { - if let Some(num_expected_blobs) = self.num_expected_blobs() { - num_expected_blobs == self.num_received_blobs() - } else { - false - } - } -} - -/// Implements the `AvailabilityView` trait for a given struct. -/// -/// - `$struct_name`: The name of the struct for which to implement `AvailabilityView`. -/// - `$block_type`: The type to use for `BlockType` in the `AvailabilityView` trait. -/// - `$blob_type`: The type to use for `BlobType` in the `AvailabilityView` trait. -/// - `$data_column_type`: The type to use for `DataColumnType` in the `AvailabilityView` trait. -/// - `$block_field`: The field name in the struct that holds the cached block. -/// - `$data_column_field`: The field name in the struct that holds the cached data columns. -#[macro_export] -macro_rules! 
impl_availability_view { - ($struct_name:ident, $block_type:ty, $blob_type:ty, $data_column_type:ty, $block_field:ident, $blob_field:ident, $data_column_field:ident) => { - impl AvailabilityView for $struct_name { - type BlockType = $block_type; - type BlobType = $blob_type; - type DataColumnType = $data_column_type; - - fn get_cached_block(&self) -> &Option { - &self.$block_field - } - - fn get_cached_blobs( - &self, - ) -> &FixedVector, E::MaxBlobsPerBlock> { - &self.$blob_field - } - - fn get_cached_data_columns( - &self, - ) -> &FixedVector, E::DataColumnCount> { - &self.$data_column_field - } - - fn get_cached_block_mut(&mut self) -> &mut Option { - &mut self.$block_field - } - - fn get_cached_blobs_mut( - &mut self, - ) -> &mut FixedVector, E::MaxBlobsPerBlock> { - &mut self.$blob_field - } - - fn get_cached_data_columns_mut( - &mut self, - ) -> &mut FixedVector, E::DataColumnCount> { - &mut self.$data_column_field - } - } - }; -} - -impl_availability_view!( - ProcessingComponents, - Arc>, - KzgCommitment, - (), - block, - blob_commitments, - data_column_opts -); - -impl_availability_view!( - PendingComponents, - DietAvailabilityPendingExecutedBlock, - KzgVerifiedBlob, - KzgVerifiedDataColumn, - executed_block, - verified_blobs, - verified_data_columns -); - -impl_availability_view!( - ChildComponents, - Arc>, - Arc>, - Arc>, - downloaded_block, - downloaded_blobs, - downloaded_data_columns -); - -pub trait GetCommitments { - fn get_commitments(&self) -> KzgCommitments; -} - -pub trait GetCommitment { - fn get_commitment(&self) -> &KzgCommitment; -} - -impl GetCommitment for KzgCommitment { - fn get_commitment(&self) -> &KzgCommitment { - self - } -} - -// These implementations are required to implement `AvailabilityView` for `PendingComponents`. -impl GetCommitments for DietAvailabilityPendingExecutedBlock { - fn get_commitments(&self) -> KzgCommitments { - self.as_block() - .message() - .body() - .blob_kzg_commitments() - .cloned() - .unwrap_or_default() - } -} - -impl GetCommitment for KzgVerifiedBlob { - fn get_commitment(&self) -> &KzgCommitment { - &self.as_blob().kzg_commitment - } -} - -// These implementations are required to implement `AvailabilityView` for `ChildComponents`. 
-impl GetCommitments for Arc> { - fn get_commitments(&self) -> KzgCommitments { - self.message() - .body() - .blob_kzg_commitments() - .ok() - .cloned() - .unwrap_or_default() - } -} - -impl GetCommitment for Arc> { - fn get_commitment(&self) -> &KzgCommitment { - &self.kzg_commitment - } -} - -#[cfg(test)] -pub mod tests { - use super::*; - use crate::block_verification_types::BlockImportData; - use crate::eth1_finalization_cache::Eth1FinalizationData; - use crate::test_utils::{generate_rand_block_and_blobs, NumBlobs}; - use crate::AvailabilityPendingExecutedBlock; - use crate::PayloadVerificationOutcome; - use fork_choice::PayloadVerificationStatus; - use rand::rngs::StdRng; - use rand::SeedableRng; - use state_processing::ConsensusContext; - use types::test_utils::TestRandom; - use types::{BeaconState, ChainSpec, ForkName, MainnetEthSpec, Slot}; - - type E = MainnetEthSpec; - - type Setup = ( - SignedBeaconBlock, - FixedVector>>, ::MaxBlobsPerBlock>, - FixedVector>>, ::MaxBlobsPerBlock>, - ); - - pub fn pre_setup() -> Setup { - let mut rng = StdRng::seed_from_u64(0xDEADBEEF0BAD5EEDu64); - let (block, blobs_vec) = - generate_rand_block_and_blobs::(ForkName::Deneb, NumBlobs::Random, &mut rng); - let mut blobs: FixedVector<_, ::MaxBlobsPerBlock> = FixedVector::default(); - - for blob in blobs_vec { - if let Some(b) = blobs.get_mut(blob.index as usize) { - *b = Some(Arc::new(blob)); - } - } - - let mut invalid_blobs: FixedVector< - Option>>, - ::MaxBlobsPerBlock, - > = FixedVector::default(); - for (index, blob) in blobs.iter().enumerate() { - if let Some(invalid_blob) = blob { - let mut blob_copy = invalid_blob.as_ref().clone(); - blob_copy.kzg_commitment = KzgCommitment::random_for_test(&mut rng); - *invalid_blobs.get_mut(index).unwrap() = Some(Arc::new(blob_copy)); - } - } - - (block, blobs, invalid_blobs) - } - - type ProcessingViewSetup = ( - Arc>, - FixedVector, ::MaxBlobsPerBlock>, - FixedVector, ::MaxBlobsPerBlock>, - ); - - pub fn setup_processing_components( - block: SignedBeaconBlock, - valid_blobs: FixedVector>>, ::MaxBlobsPerBlock>, - invalid_blobs: FixedVector>>, ::MaxBlobsPerBlock>, - ) -> ProcessingViewSetup { - let blobs = FixedVector::from( - valid_blobs - .iter() - .map(|blob_opt| blob_opt.as_ref().map(|blob| blob.kzg_commitment)) - .collect::>(), - ); - let invalid_blobs = FixedVector::from( - invalid_blobs - .iter() - .map(|blob_opt| blob_opt.as_ref().map(|blob| blob.kzg_commitment)) - .collect::>(), - ); - (Arc::new(block), blobs, invalid_blobs) - } - - type PendingComponentsSetup = ( - DietAvailabilityPendingExecutedBlock, - FixedVector>, ::MaxBlobsPerBlock>, - FixedVector>, ::MaxBlobsPerBlock>, - ); - - pub fn setup_pending_components( - block: SignedBeaconBlock, - valid_blobs: FixedVector>>, ::MaxBlobsPerBlock>, - invalid_blobs: FixedVector>>, ::MaxBlobsPerBlock>, - ) -> PendingComponentsSetup { - let blobs = FixedVector::from( - valid_blobs - .iter() - .map(|blob_opt| { - blob_opt - .as_ref() - .map(|blob| KzgVerifiedBlob::__assumed_valid(blob.clone())) - }) - .collect::>(), - ); - let invalid_blobs = FixedVector::from( - invalid_blobs - .iter() - .map(|blob_opt| { - blob_opt - .as_ref() - .map(|blob| KzgVerifiedBlob::__assumed_valid(blob.clone())) - }) - .collect::>(), - ); - let dummy_parent = block.clone_as_blinded(); - let block = AvailabilityPendingExecutedBlock { - block: Arc::new(block), - import_data: BlockImportData { - block_root: Default::default(), - state: BeaconState::new(0, Default::default(), &ChainSpec::minimal()), - parent_block: dummy_parent, - 
parent_eth1_finalization_data: Eth1FinalizationData { - eth1_data: Default::default(), - eth1_deposit_index: 0, - }, - confirmed_state_roots: vec![], - consensus_context: ConsensusContext::new(Slot::new(0)), - }, - payload_verification_outcome: PayloadVerificationOutcome { - payload_verification_status: PayloadVerificationStatus::Verified, - is_valid_merge_transition_block: false, - }, - }; - (block.into(), blobs, invalid_blobs) - } - - type ChildComponentsSetup = ( - Arc>, - FixedVector>>, ::MaxBlobsPerBlock>, - FixedVector>>, ::MaxBlobsPerBlock>, - ); - - pub fn setup_child_components( - block: SignedBeaconBlock, - valid_blobs: FixedVector>>, ::MaxBlobsPerBlock>, - invalid_blobs: FixedVector>>, ::MaxBlobsPerBlock>, - ) -> ChildComponentsSetup { - let blobs = FixedVector::from(valid_blobs.into_iter().cloned().collect::>()); - let invalid_blobs = - FixedVector::from(invalid_blobs.into_iter().cloned().collect::>()); - (Arc::new(block), blobs, invalid_blobs) - } - - pub fn assert_cache_consistent>(cache: V) { - if let Some(cached_block) = cache.get_cached_block() { - let cached_block_commitments = cached_block.get_commitments(); - for index in 0..E::max_blobs_per_block() { - let block_commitment = cached_block_commitments.get(index).copied(); - let blob_commitment_opt = cache.get_cached_blobs().get(index).unwrap(); - let blob_commitment = blob_commitment_opt.as_ref().map(|b| *b.get_commitment()); - assert_eq!(block_commitment, blob_commitment); - } - } else { - panic!("No cached block") - } - } - - pub fn assert_empty_blob_cache>(cache: V) { - for blob in cache.get_cached_blobs().iter() { - assert!(blob.is_none()); - } - } - - #[macro_export] - macro_rules! generate_tests { - ($module_name:ident, $type_name:ty, $block_field:ident, $blob_field:ident, $setup_fn:ident) => { - mod $module_name { - use super::*; - use types::Hash256; - - #[test] - fn valid_block_invalid_blobs_valid_blobs() { - let (block_commitments, blobs, random_blobs) = pre_setup(); - let (block_commitments, blobs, random_blobs) = - $setup_fn(block_commitments, blobs, random_blobs); - let block_root = Hash256::zero(); - let mut cache = <$type_name>::empty(block_root); - cache.merge_block(block_commitments); - cache.merge_blobs(random_blobs); - cache.merge_blobs(blobs); - - assert_cache_consistent(cache); - } - - #[test] - fn invalid_blobs_block_valid_blobs() { - let (block_commitments, blobs, random_blobs) = pre_setup(); - let (block_commitments, blobs, random_blobs) = - $setup_fn(block_commitments, blobs, random_blobs); - let block_root = Hash256::zero(); - let mut cache = <$type_name>::empty(block_root); - cache.merge_blobs(random_blobs); - cache.merge_block(block_commitments); - cache.merge_blobs(blobs); - - assert_cache_consistent(cache); - } - - #[test] - fn invalid_blobs_valid_blobs_block() { - let (block_commitments, blobs, random_blobs) = pre_setup(); - let (block_commitments, blobs, random_blobs) = - $setup_fn(block_commitments, blobs, random_blobs); - - let block_root = Hash256::zero(); - let mut cache = <$type_name>::empty(block_root); - cache.merge_blobs(random_blobs); - cache.merge_blobs(blobs); - cache.merge_block(block_commitments); - - assert_empty_blob_cache(cache); - } - - #[test] - fn block_valid_blobs_invalid_blobs() { - let (block_commitments, blobs, random_blobs) = pre_setup(); - let (block_commitments, blobs, random_blobs) = - $setup_fn(block_commitments, blobs, random_blobs); - - let block_root = Hash256::zero(); - let mut cache = <$type_name>::empty(block_root); - cache.merge_block(block_commitments); 
- cache.merge_blobs(blobs); - cache.merge_blobs(random_blobs); - - assert_cache_consistent(cache); - } - - #[test] - fn valid_blobs_block_invalid_blobs() { - let (block_commitments, blobs, random_blobs) = pre_setup(); - let (block_commitments, blobs, random_blobs) = - $setup_fn(block_commitments, blobs, random_blobs); - - let block_root = Hash256::zero(); - let mut cache = <$type_name>::empty(block_root); - cache.merge_blobs(blobs); - cache.merge_block(block_commitments); - cache.merge_blobs(random_blobs); - - assert_cache_consistent(cache); - } - - #[test] - fn valid_blobs_invalid_blobs_block() { - let (block_commitments, blobs, random_blobs) = pre_setup(); - let (block_commitments, blobs, random_blobs) = - $setup_fn(block_commitments, blobs, random_blobs); - - let block_root = Hash256::zero(); - let mut cache = <$type_name>::empty(block_root); - cache.merge_blobs(blobs); - cache.merge_blobs(random_blobs); - cache.merge_block(block_commitments); - - assert_cache_consistent(cache); - } - } - }; - } - - generate_tests!( - processing_components_tests, - ProcessingComponents::, - kzg_commitments, - processing_blobs, - setup_processing_components - ); - generate_tests!( - pending_components_tests, - PendingComponents, - executed_block, - verified_blobs, - setup_pending_components - ); - generate_tests!( - child_component_tests, - ChildComponents::, - downloaded_block, - downloaded_blobs, - setup_child_components - ); -} diff --git a/beacon_node/beacon_chain/src/data_availability_checker/child_components.rs b/beacon_node/beacon_chain/src/data_availability_checker/child_components.rs index 09cc5da9027..a4ce40ee3f4 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/child_components.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/child_components.rs @@ -1,10 +1,9 @@ use crate::block_verification_types::RpcBlock; -use crate::data_availability_checker::AvailabilityView; use bls::Hash256; use std::sync::Arc; use types::blob_sidecar::FixedBlobSidecarList; -use types::data_column_sidecar::FixedDataColumnSidecarList; -use types::{EthSpec, SignedBeaconBlock}; +use types::data_column_sidecar::DataColumnSidecarList; +use types::{BlobSidecar, DataColumnSidecar, EthSpec, SignedBeaconBlock}; /// For requests triggered by an `UnknownBlockParent` or `UnknownBlobParent`, this struct /// is used to cache components as they are sent to the network service. 
We can't use the @@ -14,7 +13,7 @@ pub struct ChildComponents { pub block_root: Hash256, pub downloaded_block: Option>>, pub downloaded_blobs: FixedBlobSidecarList, - pub downloaded_data_columns: FixedDataColumnSidecarList, + pub downloaded_data_columns: Vec>>, } impl From> for ChildComponents { @@ -23,10 +22,7 @@ impl From> for ChildComponents { let fixed_blobs = blobs.map(|blobs| { FixedBlobSidecarList::from(blobs.into_iter().map(Some).collect::>()) }); - let fixed_data_columns = data_columns.map(|data_columns| { - FixedDataColumnSidecarList::from(data_columns.into_iter().map(Some).collect::>()) - }); - Self::new(block_root, Some(block), fixed_blobs, fixed_data_columns) + Self::new(block_root, Some(block), fixed_blobs, data_columns) } } @@ -43,7 +39,7 @@ impl ChildComponents { block_root: Hash256, block: Option>>, blobs: Option>, - data_columns: Option>, + data_columns: Option>, ) -> Self { let mut cache = Self::empty(block_root); if let Some(block) = block { @@ -53,11 +49,44 @@ impl ChildComponents { cache.merge_blobs(blobs); } if let Some(data_columns) = data_columns { - cache.merge_data_columns(data_columns); + cache.merge_data_columns(data_columns.to_vec()) } cache } + pub fn merge_block(&mut self, block: Arc>) { + self.downloaded_block = Some(block); + } + + pub fn merge_blob(&mut self, blob: Arc>) { + if let Some(blob_ref) = self.downloaded_blobs.get_mut(blob.index as usize) { + *blob_ref = Some(blob); + } + } + + pub fn merge_blobs(&mut self, blobs: FixedBlobSidecarList) { + for blob in blobs.iter().flatten() { + self.merge_blob(blob.clone()); + } + } + + pub fn merge_data_columns(&mut self, data_columns: Vec>>) { + for data_column in data_columns { + self.merge_data_column(data_column); + } + } + + pub fn merge_data_column(&mut self, data_column: Arc>) { + if self + .downloaded_data_columns + .iter() + .find(|d| d.index == data_column.index) + .is_none() + { + self.downloaded_data_columns.push(data_column) + } + } + pub fn clear_blobs(&mut self) { self.downloaded_blobs = FixedBlobSidecarList::default(); } diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index 246daf9579d..e974b0b709b 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -28,12 +28,12 @@ //! the cache when they are accessed. 
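Both `ChildComponents` (above) and `PendingComponents` (in the hunk that follows) switch from index-addressed `FixedVector`s to plain `Vec`s that deduplicate on insert by column index. A toy sketch of that merge pattern, using a stand-in struct rather than the real `DataColumnSidecar` types:

```rust
// Stand-in for Arc<DataColumnSidecar<E>>: only the index matters here.
#[derive(Debug)]
struct Sidecar {
    index: u64,
}

#[derive(Default)]
struct Columns {
    downloaded: Vec<Sidecar>,
}

impl Columns {
    // Mirrors merge_data_column: keep the first sidecar seen for each
    // column index and silently drop later duplicates.
    fn merge(&mut self, column: Sidecar) {
        if !self.downloaded.iter().any(|c| c.index == column.index) {
            self.downloaded.push(column);
        }
    }
}

fn main() {
    let mut cols = Columns::default();
    cols.merge(Sidecar { index: 3 });
    cols.merge(Sidecar { index: 3 }); // duplicate, ignored
    cols.merge(Sidecar { index: 7 });
    assert_eq!(cols.downloaded.len(), 2);
}
```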
use super::state_lru_cache::{DietAvailabilityPendingExecutedBlock, StateLRUCache}; +use super::CustodyConfig; use crate::beacon_chain::BeaconStore; use crate::blob_verification::KzgVerifiedBlob; use crate::block_verification_types::{ AvailabilityPendingExecutedBlock, AvailableBlock, AvailableExecutedBlock, }; -use crate::data_availability_checker::availability_view::AvailabilityView; use crate::data_availability_checker::{Availability, AvailabilityCheckError}; use crate::data_column_verification::KzgVerifiedDataColumn; use crate::store::{DBColumn, KeyValueStore}; @@ -46,8 +46,8 @@ use ssz_types::{FixedVector, VariableList}; use std::num::NonZeroUsize; use std::{collections::HashSet, sync::Arc}; use types::blob_sidecar::BlobIdentifier; -use types::data_column_sidecar::DataColumnIdentifier; -use types::{BlobSidecar, ChainSpec, DataColumnSidecar, Epoch, EthSpec, Hash256}; +use types::data_column_sidecar::{ColumnIndex, DataColumnIdentifier}; +use types::{BlobSidecar, ChainSpec, DataColumnSidecar, Epoch, EthSpec, Hash256, Slot}; /// This represents the components of a partially available block /// @@ -57,16 +57,96 @@ use types::{BlobSidecar, ChainSpec, DataColumnSidecar, Epoch, EthSpec, Hash256}; pub struct PendingComponents { pub block_root: Hash256, pub verified_blobs: FixedVector>, T::MaxBlobsPerBlock>, - pub verified_data_columns: FixedVector>, T::DataColumnCount>, + // TODO(das) after KZG verifying sample columns we don't need to keep them around anymore. As a + // memory optimization we can drop the data of sampled columns, and keep the data for custody + // columns only. Introduce a new type: + // ``` + // #[derive(Encode, Decode, Clone)] + // pub struct KzgVerifiedSampledDataColumn { + // pub index: u64, + // pub slot: Slot, + // pub data: Option>, + // } + // ``` + // and convert to it based on this `PendingComponents` custody assignments + pub verified_data_columns: Vec>, pub executed_block: Option>, } impl PendingComponents { + fn block_slot(&self) -> Option { + self.executed_block + .as_ref() + .map(|b| b.as_block().message().slot()) + } + + fn data_column_exists(&self, data_colum_index: ColumnIndex) -> bool { + self.verified_data_columns + .iter() + .find(|c| c.data_column_index() == data_colum_index) + .is_some() + } + + /// Returns the number of blobs that have been received and are stored in the cache. + fn num_received_blobs(&self) -> usize { + self.verified_blobs.iter().flatten().count() + } + + /// Inserts a blob at a specific index in the cache. + /// + /// Existing blob at the index will be replaced. + fn insert_blob_at_index(&mut self, blob_index: usize, blob: KzgVerifiedBlob) { + if let Some(b) = self.verified_blobs.get_mut(blob_index) { + *b = Some(blob); + } + } + + /// Merges a given set of data columns into the cache. + /// + /// Data columns are only inserted if: + /// 1. The data column entry at the index is empty and no block exists. + /// 2. The block exists and its commitments matches the data column's commitments. + fn merge_data_column(&mut self, data_column: KzgVerifiedDataColumn) { + let index = data_column.data_column_index(); + // TODO(das): Add equivalent checks for data columns if necessary + if !self.data_column_exists((index).into()) { + self.verified_data_columns.push(data_column); + } + } + + /// Merges a given set of blobs into the cache. + /// + /// Blobs are only inserted if: + /// 1. The blob entry at the index is empty and no block exists. + /// 2. The block exists and its commitment matches the blob's commitment. 
+ fn merge_blobs(&mut self, blobs: FixedVector>, T::MaxBlobsPerBlock>) { + for (index, blob) in blobs.iter().cloned().enumerate() { + let Some(blob) = blob else { continue }; + self.merge_single_blob(index, blob); + } + } + + /// Merges a single blob into the cache. + /// + /// Blobs are only inserted if: + /// 1. The blob entry at the index is empty and no block exists, or + /// 2. The block exists and its commitment matches the blob's commitment. + fn merge_single_blob(&mut self, index: usize, blob: KzgVerifiedBlob) { + self.insert_blob_at_index(index, blob) + } + + /// Inserts a new block and revalidates the existing blobs against it. + /// + /// Blobs that don't match the new block's commitments are evicted. + fn merge_block(&mut self, block: DietAvailabilityPendingExecutedBlock) { + self.executed_block = Some(block); + } + pub fn empty(block_root: Hash256) -> Self { Self { block_root, - verified_blobs: FixedVector::default(), - verified_data_columns: FixedVector::default(), + verified_blobs: <_>::default(), + verified_data_columns: <_>::default(), executed_block: None, } } @@ -88,12 +168,14 @@ impl PendingComponents { verified_blobs, verified_data_columns, executed_block, + .. } = self; let Some(diet_executed_block) = executed_block else { return Err(AvailabilityCheckError::Unexpected); }; let num_blobs_expected = diet_executed_block.num_blobs_expected(); + // TODO(das): blobs may be empty post PeerDAS but the blob can have commitments let Some(verified_blobs) = verified_blobs .into_iter() .cloned() @@ -105,13 +187,13 @@ impl PendingComponents { }; let verified_blobs = VariableList::new(verified_blobs)?; - // TODO(das) Do we need a check here for number of expected custody columns? - let verified_data_columns = verified_data_columns - .into_iter() - .cloned() - .filter_map(|d| d.map(|d| d.to_data_column())) - .collect::>() - .into(); + // TODO(das) Add check later - we don't expect data columns to be available until we transition to PeerDAS. + let custody_data_columns = VariableList::new( + verified_data_columns + .iter() + .map(|d| d.clone_data_column()) + .collect::>(), + )?; let executed_block = recover(diet_executed_block)?; @@ -125,7 +207,8 @@ impl PendingComponents { block_root, block, blobs: Some(verified_blobs), - data_columns: Some(verified_data_columns), + // TODO(das) Add check later - we don't expect data columns to be available until we transition to PeerDAS. + custody_data_columns: Some(custody_data_columns), }; Ok(Availability::Available(Box::new( AvailableExecutedBlock::new(available_block, import_data, payload_verification_outcome), @@ -147,21 +230,27 @@ impl PendingComponents { }); } } - for maybe_data_column in self.verified_data_columns.iter() { - if maybe_data_column.is_some() { - return maybe_data_column.as_ref().map(|kzg_verified_data_column| { - kzg_verified_data_column - .as_data_column() - .slot() - .epoch(T::slots_per_epoch()) - }); - } - } - None + self.verified_data_columns.iter().next().map(|data_column| { + data_column + .as_data_column() + .slot() + .epoch(T::slots_per_epoch()) + }) }) } } +pub fn compute_sample_requirements(slot: Slot) -> Vec { + todo!() +} + +pub fn compute_custody_requirements( + slot: Slot, + custody_config: &CustodyConfig, +) -> Vec { + todo!() +} + /// Blocks and blobs are stored in the database sequentially so that it's /// fast to iterate over all the data for a particular block. 
#[derive(Debug, PartialEq)] @@ -204,8 +293,7 @@ impl OverflowKey { pub fn root(&self) -> &Hash256 { match self { Self::Block(root) => root, - Self::Blob(root, _) => root, - Self::DataColumn(root, _) => root, + Self::Blob(root, _) | Self::DataColumn(root, _) => root, } } } @@ -245,21 +333,7 @@ impl OverflowStore { .put_bytes(col.as_str(), &key.as_ssz_bytes(), &blob.as_ssz_bytes())? } - for data_column in Vec::from(pending_components.verified_data_columns) - .into_iter() - .flatten() - { - let key = OverflowKey::from_data_column_id::(DataColumnIdentifier { - block_root, - index: data_column.data_column_index(), - })?; - - self.0.hot_db.put_bytes( - col.as_str(), - &key.as_ssz_bytes(), - &data_column.as_ssz_bytes(), - )? - } + todo!("does not support persisting data columns"); Ok(()) } @@ -295,14 +369,7 @@ impl OverflowStore { Some(KzgVerifiedBlob::from_ssz_bytes(value_bytes.as_slice())?); } OverflowKey::DataColumn(_, index) => { - *maybe_pending_components - .get_or_insert_with(|| PendingComponents::empty(block_root)) - .verified_data_columns - .get_mut(index as usize) - .ok_or(AvailabilityCheckError::DataColumnIndexInvalid(index as u64))? = - Some(KzgVerifiedDataColumn::from_ssz_bytes( - value_bytes.as_slice(), - )?); + todo!("does not support persisting data columns"); } } } @@ -342,6 +409,7 @@ impl OverflowStore { ) -> Result>>, AvailabilityCheckError> { let key = OverflowKey::from_data_column_id::(*data_column_id)?; + // TODO: Test this code path, probably broken self.0 .hot_db .get_bytes(DBColumn::OverflowLRUCache.as_str(), &key.as_ssz_bytes())? @@ -390,6 +458,15 @@ impl Critical { Ok(()) } + pub fn block_slot(&self, block_root: &Hash256) -> Option { + self.in_memory.peek(block_root).and_then(|b| b.block_slot()) + } + + /// Returns true if the block root is known, without altering the LRU ordering + pub fn has_block(&self, block_root: &Hash256) -> bool { + self.in_memory.peek(block_root).is_some() || self.store_keys.get(block_root).is_some() + } + /// This only checks for the blobs in memory pub fn peek_blob( &self, @@ -407,6 +484,13 @@ impl Critical { } } + pub fn peek_pending_components( + &self, + block_root: &Hash256, + ) -> Option<&PendingComponents> { + self.in_memory.peek(block_root) + } + /// This only checks for the data columns in memory pub fn peek_data_column( &self, @@ -415,10 +499,8 @@ impl Critical { if let Some(pending_components) = self.in_memory.peek(&data_column_id.block_root) { Ok(pending_components .verified_data_columns - .get(data_column_id.index as usize) - .ok_or(AvailabilityCheckError::DataColumnIndexInvalid( - data_column_id.index, - ))? 
+ .iter() + .find(|c| c.data_column_index() == data_column_id.index) .as_ref() .map(|data_column| data_column.clone_data_column())) } else { @@ -493,6 +575,9 @@ pub struct OverflowLRUCache { maintenance_lock: Mutex<()>, /// The capacity of the LRU cache capacity: NonZeroUsize, + + custody_config: CustodyConfig, + spec: ChainSpec, } impl OverflowLRUCache { @@ -500,6 +585,7 @@ impl OverflowLRUCache { capacity: NonZeroUsize, beacon_store: BeaconStore, spec: ChainSpec, + custody_config: CustodyConfig, ) -> Result { let overflow_store = OverflowStore(beacon_store.clone()); let mut critical = Critical::new(capacity); @@ -507,12 +593,27 @@ impl OverflowLRUCache { Ok(Self { critical: RwLock::new(critical), overflow_store, - state_cache: StateLRUCache::new(beacon_store, spec), + state_cache: StateLRUCache::new(beacon_store, spec.clone()), maintenance_lock: Mutex::new(()), capacity, + spec, + custody_config, }) } + pub fn get_custody_config(&self) -> &CustodyConfig { + &self.custody_config + } + + pub fn block_slot(&self, block_root: &Hash256) -> Option { + self.critical.read().block_slot(block_root) + } + + /// Returns true if the block root is known, without altering the LRU ordering + pub fn has_block(&self, block_root: &Hash256) -> bool { + self.critical.read().has_block(block_root) + } + /// Fetch a blob from the cache without affecting the LRU ordering pub fn peek_blob( &self, @@ -529,6 +630,14 @@ impl OverflowLRUCache { } } + pub fn with_pending_components>) -> R>( + &self, + block_root: &Hash256, + f: F, + ) -> R { + f(self.critical.read().peek_pending_components(block_root)) + } + /// Fetch a data column from the cache without affecting the LRU ordering pub fn peek_data_column( &self, @@ -545,23 +654,11 @@ impl OverflowLRUCache { } } - pub fn put_kzg_verified_data_columns< - I: IntoIterator>, - >( + pub fn put_kzg_verified_data_column( &self, block_root: Hash256, - kzg_verified_data_columns: I, + kzg_verified_data_column: KzgVerifiedDataColumn, ) -> Result, AvailabilityCheckError> { - let mut fixed_data_columns = FixedVector::default(); - - for data_column in kzg_verified_data_columns { - if let Some(data_column_opt) = - fixed_data_columns.get_mut(data_column.data_column_index() as usize) - { - *data_column_opt = Some(data_column); - } - } - let mut write_lock = self.critical.write(); // Grab existing entry or create a new entry. @@ -570,12 +667,22 @@ impl OverflowLRUCache { .unwrap_or_else(|| PendingComponents::empty(block_root)); // Merge in the data columns. - pending_components.merge_data_columns(fixed_data_columns); + pending_components.merge_data_column(kzg_verified_data_column); - write_lock.put_pending_components(block_root, pending_components, &self.overflow_store)?; - - // TODO(das): Currently this does not change availability status and nor import yet. - Ok(Availability::MissingComponents(block_root)) + if self.is_available(&pending_components) { + // No need to hold the write lock anymore + drop(write_lock); + pending_components.make_available(|diet_block| { + self.state_cache.recover_pending_executed_block(diet_block) + }) + } else { + write_lock.put_pending_components( + block_root, + pending_components, + &self.overflow_store, + )?; + Ok(Availability::MissingComponents(block_root)) + } } pub fn put_kzg_verified_blobs>>( @@ -601,7 +708,7 @@ impl OverflowLRUCache { // Merge in the blobs. 
pending_components.merge_blobs(fixed_blobs); - if pending_components.is_available() { + if self.is_available(&pending_components) { // No need to hold the write lock anymore drop(write_lock); pending_components.make_available(|diet_block| { @@ -640,7 +747,7 @@ impl OverflowLRUCache { pending_components.merge_block(diet_executed_block); // Check if we have all components and entire set is consistent. - if pending_components.is_available() { + if self.is_available(&pending_components) { // No need to hold the write lock anymore drop(write_lock); pending_components.make_available(|diet_block| { @@ -687,6 +794,43 @@ impl OverflowLRUCache { Ok(()) } + /// Return this node's custody column requirements at `slot` + pub fn custody_columns_at_slot(&self, slot: Slot) -> Vec { + compute_custody_requirements(slot, &self.custody_config) + } + + fn is_available(&self, pending_components: &PendingComponents) -> bool { + let Some(block) = &pending_components.executed_block else { + return false; + }; + + let block = block.as_block().message(); + let is_post_peerdas = if let Some(deneb_fork_epoch) = self.spec.deneb_fork_epoch { + block.slot().epoch(T::EthSpec::slots_per_epoch()) >= deneb_fork_epoch + } else { + false + }; + + if is_post_peerdas { + // PeerDAS data availability + // TODO: Should reject blobs? + compute_sample_requirements(block.slot()) + .iter() + .chain(compute_custody_requirements(block.slot(), &self.custody_config).iter()) + // TODO: This is O(n^2) complexity check, optimize + .all(|index| pending_components.data_column_exists(*index)) + } else { + // Deneb data availability + let num_expected_blobs = block + .body() + .blob_kzg_commitments() + .cloned() + .unwrap_or_default() + .len(); + return num_expected_blobs == pending_components.num_received_blobs(); + } + } + /// Enforce that the size of the cache is below a given threshold by /// moving the least recently used items to disk. fn maintain_threshold( diff --git a/beacon_node/beacon_chain/src/data_availability_checker/processing_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/processing_cache.rs deleted file mode 100644 index 7abbd700104..00000000000 --- a/beacon_node/beacon_chain/src/data_availability_checker/processing_cache.rs +++ /dev/null @@ -1,84 +0,0 @@ -use crate::data_availability_checker::AvailabilityView; -use ssz_types::FixedVector; -use std::collections::hash_map::Entry; -use std::collections::HashMap; -use std::sync::Arc; -use types::beacon_block_body::KzgCommitmentOpts; -use types::{EthSpec, Hash256, SignedBeaconBlock, Slot}; - -/// This cache is used only for gossip blocks/blobs and single block/blob lookups, to give req/resp -/// a view of what we have and what we require. This cache serves a slightly different purpose than -/// gossip caches because it allows us to process duplicate blobs that are valid in gossip. -/// See `AvailabilityView`'s trait definition. 
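The `is_available` change above gates availability, post-PeerDAS, on every sampling and custody column index being present, and the inline TODO flags the membership test as O(n²). A minimal sketch of a set-based alternative, assuming `ColumnIndex` is a plain `u64` and that the pending components can expose the column indices they already hold (an accessor this diff does not add):

```rust
use std::collections::HashSet;

/// Sketch only: `custody_columns` / `sample_columns` stand in for the outputs of
/// `compute_custody_requirements` and `compute_sample_requirements`, and
/// `received_columns` for the indices already present in `PendingComponents`.
fn columns_available(
    custody_columns: &[u64],
    sample_columns: &[u64],
    received_columns: impl IntoIterator<Item = u64>,
) -> bool {
    // Build the membership set once; each required index is then an O(1) lookup.
    let received: HashSet<u64> = received_columns.into_iter().collect();
    custody_columns
        .iter()
        .chain(sample_columns.iter())
        .all(|index| received.contains(index))
}
```

This keeps the check linear in the number of required plus received columns; the Deneb blob-count branch is unchanged.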
-#[derive(Default)] -pub struct ProcessingCache { - processing_cache: HashMap>, -} - -impl ProcessingCache { - pub fn get(&self, block_root: &Hash256) -> Option<&ProcessingComponents> { - self.processing_cache.get(block_root) - } - pub fn entry(&mut self, block_root: Hash256) -> Entry<'_, Hash256, ProcessingComponents> { - self.processing_cache.entry(block_root) - } - pub fn remove(&mut self, block_root: &Hash256) { - self.processing_cache.remove(block_root); - } - pub fn has_block(&self, block_root: &Hash256) -> bool { - self.processing_cache - .get(block_root) - .map_or(false, |b| b.block_exists()) - } - pub fn incomplete_processing_components(&self, slot: Slot) -> Vec { - let mut roots_missing_components = vec![]; - for (&block_root, info) in self.processing_cache.iter() { - if info.slot == slot && !info.is_available() { - roots_missing_components.push(block_root); - } - } - roots_missing_components - } - pub fn len(&self) -> usize { - self.processing_cache.len() - } -} - -#[derive(Debug, Clone)] -pub struct ProcessingComponents { - slot: Slot, - /// Blobs required for a block can only be known if we have seen the block. So `Some` here - /// means we've seen it, a `None` means we haven't. The `kzg_commitments` value helps us figure - /// out whether incoming blobs actually match the block. - pub block: Option>>, - /// `KzgCommitments` for blobs are always known, even if we haven't seen the block. See - /// `AvailabilityView`'s trait definition for more details. - pub blob_commitments: KzgCommitmentOpts, - // TODO(das): `KzgCommitments` are available in every data column sidecar, hence it may not be useful to store them - // again here and a `()` may be sufficient to indicate what we have. - pub data_column_opts: FixedVector, E::DataColumnCount>, -} - -impl ProcessingComponents { - pub fn new(slot: Slot) -> Self { - Self { - slot, - block: None, - blob_commitments: KzgCommitmentOpts::::default(), - data_column_opts: FixedVector::default(), - } - } -} - -// Not safe for use outside of tests as this always required a slot. 
-#[cfg(test)] -impl ProcessingComponents { - pub fn empty(_block_root: Hash256) -> Self { - Self { - slot: Slot::new(0), - block: None, - blob_commitments: KzgCommitmentOpts::::default(), - data_column_opts: FixedVector::default(), - } - } -} diff --git a/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs index 35c114db542..03fc219ab3d 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs @@ -35,6 +35,10 @@ impl DietAvailabilityPendingExecutedBlock { &self.block } + pub fn as_block_cloned(&self) -> Arc> { + self.block.clone() + } + pub fn num_blobs_expected(&self) -> usize { self.block .message() diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index 9ab2a0d395e..50399ada8a1 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -1209,10 +1209,6 @@ pub fn scrape_for_metrics(beacon_chain: &BeaconChain) { } let da_checker_metrics = beacon_chain.data_availability_checker.metrics(); - set_gauge_by_usize( - &DATA_AVAILABILITY_PROCESSING_CACHE_SIZE, - da_checker_metrics.processing_cache_size, - ); set_gauge_by_usize( &DATA_AVAILABILITY_OVERFLOW_MEMORY_BLOCK_CACHE_SIZE, da_checker_metrics.block_cache_size, diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index 5ac1aaac4c7..24d20e9635a 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -157,6 +157,9 @@ const MAX_RPC_BLOCK_QUEUE_LEN: usize = 1_024; /// will be stored before we start dropping them. const MAX_RPC_BLOB_QUEUE_LEN: usize = 1_024; +// TODO(das): pick number +const MAX_RPC_DATA_COLUMN_QUEUE_LEN: usize = 1_024; + /// The maximum number of queued `Vec` objects received during syncing that will /// be stored before we start dropping them. const MAX_CHAIN_SEGMENT_QUEUE_LEN: usize = 64; @@ -244,6 +247,7 @@ pub const GOSSIP_LIGHT_CLIENT_OPTIMISTIC_UPDATE: &str = "light_client_optimistic pub const RPC_BLOCK: &str = "rpc_block"; pub const IGNORED_RPC_BLOCK: &str = "ignored_rpc_block"; pub const RPC_BLOBS: &str = "rpc_blob"; +pub const RPC_DATA_COLUMNS: &str = "rpc_data_columns"; pub const CHAIN_SEGMENT: &str = "chain_segment"; pub const CHAIN_SEGMENT_BACKFILL: &str = "chain_segment_backfill"; pub const STATUS_PROCESSING: &str = "status_processing"; @@ -619,6 +623,9 @@ pub enum Work { RpcBlobs { process_fn: AsyncFn, }, + RpcDataColumns { + process_fn: AsyncFn, + }, IgnoredRpcBlock { process_fn: BlockingFn, }, @@ -663,6 +670,7 @@ impl Work { Work::GossipLightClientOptimisticUpdate(_) => GOSSIP_LIGHT_CLIENT_OPTIMISTIC_UPDATE, Work::RpcBlock { .. } => RPC_BLOCK, Work::RpcBlobs { .. } => RPC_BLOBS, + Work::RpcDataColumns { .. } => RPC_DATA_COLUMNS, Work::IgnoredRpcBlock { .. } => IGNORED_RPC_BLOCK, Work::ChainSegment { .. } => CHAIN_SEGMENT, Work::ChainSegmentBackfill(_) => CHAIN_SEGMENT_BACKFILL, @@ -819,6 +827,7 @@ impl BeaconProcessor { // Using a FIFO queue since blocks need to be imported sequentially. 
let mut rpc_block_queue = FifoQueue::new(MAX_RPC_BLOCK_QUEUE_LEN); let mut rpc_blob_queue = FifoQueue::new(MAX_RPC_BLOB_QUEUE_LEN); + let mut rpc_data_columns_queue = FifoQueue::new(MAX_RPC_DATA_COLUMN_QUEUE_LEN); let mut chain_segment_queue = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN); let mut backfill_chain_segment = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN); let mut gossip_block_queue = FifoQueue::new(MAX_GOSSIP_BLOCK_QUEUE_LEN); @@ -1255,6 +1264,9 @@ impl BeaconProcessor { rpc_block_queue.push(work, work_id, &self.log) } Work::RpcBlobs { .. } => rpc_blob_queue.push(work, work_id, &self.log), + Work::RpcDataColumns { .. } => { + rpc_data_columns_queue.push(work, work_id, &self.log) + } Work::ChainSegment { .. } => { chain_segment_queue.push(work, work_id, &self.log) } @@ -1481,9 +1493,9 @@ impl BeaconProcessor { beacon_block_root: _, process_fn, } => task_spawner.spawn_async(process_fn), - Work::RpcBlock { process_fn } | Work::RpcBlobs { process_fn } => { - task_spawner.spawn_async(process_fn) - } + Work::RpcBlock { process_fn } + | Work::RpcBlobs { process_fn } + | Work::RpcDataColumns { process_fn } => task_spawner.spawn_async(process_fn), Work::IgnoredRpcBlock { process_fn } => task_spawner.spawn_blocking(process_fn), Work::GossipBlock(work) | Work::GossipBlobSidecar(work) diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 558e5cbc84f..0328f6235e0 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -30,6 +30,7 @@ use eth2::{ use execution_layer::ExecutionLayer; use futures::channel::mpsc::Receiver; use genesis::{interop_genesis_state, Eth1GenesisService, DEFAULT_ETH1_BLOCK_HASH}; +use lighthouse_network::NodeId; use lighthouse_network::{prometheus_client::registry::Registry, NetworkGlobals}; use monitoring_api::{MonitoringHttpClient, ProcessType}; use network::{NetworkConfig, NetworkSenders, NetworkService}; @@ -158,6 +159,7 @@ where mut self, client_genesis: ClientGenesis, config: ClientConfig, + node_id: NodeId, ) -> Result { let store = self.store.clone(); let chain_spec = self.chain_spec.clone(); @@ -206,7 +208,8 @@ where .graffiti(graffiti) .event_handler(event_handler) .execution_layer(execution_layer) - .validator_monitor_config(config.validator_monitor.clone()); + .validator_monitor_config(config.validator_monitor.clone()) + .node_id(node_id.raw().into()); let builder = if let Some(slasher) = self.slasher.clone() { builder.slasher(slasher) diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs index 6e0c00e42b8..62b8820f5fb 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs @@ -73,6 +73,15 @@ impl Default for PeerInfo { } impl PeerInfo { + pub fn node_id(&self) -> Option<[u8; 32]> { + self.enr().as_ref().map(|enr| enr.node_id().raw()) + } + + pub fn custody_requirements(&self) -> u64 { + // TODO: read from ENR + 2 + } + /// Return a PeerInfo struct for a trusted peer. 
pub fn trusted_peer_info() -> Self { PeerInfo { diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs index efc80d55faa..a4554b1ba4d 100644 --- a/beacon_node/lighthouse_network/src/rpc/methods.rs +++ b/beacon_node/lighthouse_network/src/rpc/methods.rs @@ -374,6 +374,16 @@ pub struct DataColumnsByRootRequest { pub data_column_ids: RuntimeVariableList, } +impl DataColumnsByRootRequest { + pub fn new(data_column_ids: Vec, spec: &ChainSpec) -> Self { + let data_column_ids = RuntimeVariableList::from_vec( + data_column_ids, + spec.max_request_data_column_sidecars as usize, + ); + Self { data_column_ids } + } +} + /* RPC Handling and Grouping */ // Collection of enums and structs used by the Codecs to encode/decode RPC messages diff --git a/beacon_node/lighthouse_network/src/service/utils.rs b/beacon_node/lighthouse_network/src/service/utils.rs index de367436ec6..a10abf73cb8 100644 --- a/beacon_node/lighthouse_network/src/service/utils.rs +++ b/beacon_node/lighthouse_network/src/service/utils.rs @@ -1,10 +1,12 @@ -use crate::gossipsub; use crate::multiaddr::Protocol; use crate::rpc::{MetaData, MetaDataV1, MetaDataV2}; use crate::types::{ error, EnrAttestationBitfield, EnrSyncCommitteeBitfield, GossipEncoding, GossipKind, }; +use crate::{gossipsub, CombinedKeyExt}; use crate::{GossipTopic, NetworkConfig}; +pub use discv5::enr::NodeId; +use discv5::enr::{CombinedKey, EnrKey}; use futures::future::Either; use libp2p::core::{multiaddr::Multiaddr, muxing::StreamMuxerBox, transport::Boxed}; use libp2p::identity::{secp256k1, Keypair}; @@ -108,6 +110,12 @@ fn keypair_from_bytes(mut bytes: Vec) -> error::Result { .map_err(|e| format!("Unable to parse p2p secret key: {:?}", e).into()) } +/// NodeID from a local keypair +pub fn node_id_from_keypair(keypair: Keypair) -> Result { + let key = CombinedKey::from_libp2p(keypair)?; + Ok(NodeId::from(key.public())) +} + /// Loads a private key from disk. If this fails, a new key is /// generated and is then saved to disk. 
/// diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 62a1216f13b..b9f3781ab4a 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -1297,6 +1297,14 @@ impl NetworkBeaconProcessor { self.chain.recompute_head_at_current_slot().await; } Ok(AvailabilityProcessingStatus::MissingComponents(slot, block_root)) => { + // Send request for sampling + // TODO: Could attach a property to `MissingComponents` to tell this code path + // when to not sample + self.send_sync_message(SyncMessage::SampleBlock { + block_root: *block_root, + slot: block.message().slot(), + }); + trace!( self.log, "Processed block, waiting for other components"; diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 7c444b8b52e..03e6a46d1cd 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -1,6 +1,9 @@ use crate::{ service::NetworkMessage, - sync::{manager::BlockProcessType, SyncMessage}, + sync::{ + manager::{BlockProcessType, SampleReqId}, + SyncMessage, + }, }; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::{ @@ -482,6 +485,23 @@ impl NetworkBeaconProcessor { }) } + pub fn send_rpc_data_column( + self: &Arc, + data_column: Arc>, + seen_timestamp: Duration, + process_type: BlockProcessType, + ) -> Result<(), Error> { + let process_fn = self.clone().generate_rpc_data_column_process_fn( + data_column, + seen_timestamp, + process_type, + ); + self.try_send(BeaconWorkEvent { + drop_during_sync: false, + work: Work::RpcDataColumns { process_fn }, + }) + } + /// Create a new work event to import `blocks` as a beacon chain segment. pub fn send_chain_segment( self: &Arc, @@ -654,6 +674,10 @@ impl NetworkBeaconProcessor { }) } + pub fn sampling_request(&self, block_root: Hash256, slot: Slot) { + self.send_sync_message(SyncMessage::SampleBlock { block_root, slot }) + } + /// Send a message to `sync_tx`. /// /// Creates a log if there is an internal error. diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index 7acb99a616e..ab25e80665f 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -1,5 +1,6 @@ use crate::metrics; use crate::network_beacon_processor::{NetworkBeaconProcessor, FUTURE_SLOT_TOLERANCE}; +use crate::sync::manager::SampleReqId; use crate::sync::BatchProcessResult; use crate::sync::{ manager::{BlockProcessType, SyncMessage}, @@ -24,7 +25,7 @@ use store::KzgCommitment; use tokio::sync::mpsc; use types::beacon_block_body::format_kzg_commitments; use types::blob_sidecar::FixedBlobSidecarList; -use types::{Epoch, Hash256}; +use types::{DataColumnSidecar, Epoch, Hash256}; /// Id associated to a batch processing request, either a sync batch or a parent lookup. 
#[derive(Clone, Debug, PartialEq)] @@ -302,6 +303,37 @@ impl NetworkBeaconProcessor { }); } + pub fn generate_rpc_data_column_process_fn( + self: Arc, + data_column: Arc>, + seen_timestamp: Duration, + process_type: BlockProcessType, + ) -> AsyncFn { + let process_fn = async move { + self.clone() + .process_rpc_data_column(data_column, seen_timestamp, process_type) + .await; + }; + Box::pin(process_fn) + } + + pub async fn process_rpc_data_column( + self: Arc>, + data_column: Arc>, + _seen_timestamp: Duration, + process_type: BlockProcessType, + ) { + // TODO(das): log and metrics + + let result = self.chain.process_rpc_data_column(data_column).await; + + // Sync handles these results + self.send_sync_message(SyncMessage::BlockComponentProcessed { + process_type, + result: result.into(), + }); + } + /// Attempt to import the chain segment (`blocks`) to the beacon chain, informing the sync /// thread if more blocks are needed to process it. pub async fn process_chain_segment( diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index 23b14ac1439..84f81c87f51 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -505,6 +505,8 @@ impl Router { RequestId::Sync(sync_id) => match sync_id { SyncId::SingleBlock { .. } | SyncId::SingleBlob { .. } + | SyncId::SingleDataColumn { .. } + | SyncId::SingleBlockSample { .. } | SyncId::ParentLookup { .. } | SyncId::ParentLookupBlob { .. } => { crit!(self.log, "Block lookups do not request BBRange requests"; "peer_id" => %peer_id); @@ -583,6 +585,10 @@ impl Router { crit!(self.log, "Blob response to block by roots request"; "peer_id" => %peer_id); return; } + SyncId::SingleDataColumn { .. } | SyncId::SingleBlockSample { .. } => { + crit!(self.log, "Data column response to block by roots request"; "peer_id" => %peer_id); + return; + } }, RequestId::Router => { crit!(self.log, "All BBRoot requests belong to sync"; "peer_id" => %peer_id); @@ -617,6 +623,10 @@ impl Router { crit!(self.log, "Block response to blobs by roots request"; "peer_id" => %peer_id); return; } + SyncId::SingleDataColumn { .. } | SyncId::SingleBlockSample { .. } => { + crit!(self.log, "Data column response to blobs by roots request"; "peer_id" => %peer_id); + return; + } SyncId::BackFillBlocks { .. } | SyncId::RangeBlocks { .. } | SyncId::RangeBlockAndBlobs { .. 
} diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 2186f8ac896..17c97544c86 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -757,7 +757,7 @@ impl NetworkService { for column_subnet in DataColumnSubnetId::compute_subnets_for_data_column::( self.network_globals.local_enr().node_id().raw().into(), - &self.beacon_chain.spec, + self.beacon_chain.spec.custody_requirement, ) { for fork_digest in self.required_gossip_fork_digests() { diff --git a/beacon_node/network/src/sync/block_lookups/common.rs b/beacon_node/network/src/sync/block_lookups/common.rs index d989fbb3362..adb324185ca 100644 --- a/beacon_node/network/src/sync/block_lookups/common.rs +++ b/beacon_node/network/src/sync/block_lookups/common.rs @@ -5,19 +5,23 @@ use crate::sync::block_lookups::single_block_lookup::{ use crate::sync::block_lookups::{ BlobRequestState, BlockLookups, BlockRequestState, PeerId, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS, }; -use crate::sync::manager::{BlockProcessType, Id, SingleLookupReqId}; +use crate::sync::manager::{BlockProcessType, Id, SampleReqId, SingleLookupReqId}; use crate::sync::network_context::SyncNetworkContext; use beacon_chain::block_verification_types::RpcBlock; -use beacon_chain::data_availability_checker::{AvailabilityView, ChildComponents}; +use beacon_chain::data_availability_checker::ChildComponents; use beacon_chain::{get_block_root, BeaconChainTypes}; -use lighthouse_network::rpc::methods::BlobsByRootRequest; +use lighthouse_network::rpc::methods::{BlobsByRootRequest, DataColumnsByRootRequest}; use lighthouse_network::rpc::BlocksByRootRequest; use rand::prelude::IteratorRandom; use std::ops::IndexMut; use std::sync::Arc; use std::time::Duration; use types::blob_sidecar::{BlobIdentifier, FixedBlobSidecarList}; -use types::{BlobSidecar, ChainSpec, EthSpec, Hash256, SignedBeaconBlock}; +use types::data_column_sidecar::{ColumnIndex, DataColumnIdentifier}; +use types::{BlobSidecar, ChainSpec, DataColumnSidecar, EthSpec, Hash256, SignedBeaconBlock}; + +use super::single_block_lookup::ColumnsRequestState; +use super::ColumnRequestState; #[derive(Debug, Copy, Clone)] pub enum ResponseType { @@ -83,6 +87,9 @@ pub trait RequestState { /// We convert a `VerifiedResponseType` to this type prior to sending it to the beacon processor. type ReconstructedResponseType; + /// Data to retrieve specific request from lookup + type RequestIdType: Copy; + /* Request building methods */ /// Construct a new request. @@ -222,6 +229,7 @@ pub trait RequestState { /// Send the response to the beacon processor. fn send_reconstructed_for_processing( id: Id, + request_id: Self::RequestIdType, bl: &BlockLookups, block_root: Hash256, verified: Self::ReconstructedResponseType, @@ -240,7 +248,10 @@ pub trait RequestState { fn response_type() -> ResponseType; /// A getter for the `BlockRequestState` or `BlobRequestState` associated with this trait. - fn request_state_mut(request: &mut SingleBlockLookup) -> &mut Self; + fn request_state_mut( + request: &mut SingleBlockLookup, + request_id: Self::RequestIdType, + ) -> Option<&mut Self>; /// A getter for a reference to the `SingleLookupRequestState` associated with this trait. 
fn get_state(&self) -> &SingleLookupRequestState; @@ -254,6 +265,7 @@ impl RequestState for BlockRequestState type ResponseType = Arc>; type VerifiedResponseType = Arc>; type ReconstructedResponseType = RpcBlock; + type RequestIdType = (); fn new_request(&self, spec: &ChainSpec) -> BlocksByRootRequest { BlocksByRootRequest::new(vec![self.requested_block_root], spec) @@ -319,6 +331,7 @@ impl RequestState for BlockRequestState fn send_reconstructed_for_processing( id: Id, + _: Self::RequestIdType, bl: &BlockLookups, block_root: Hash256, constructed: RpcBlock, @@ -337,8 +350,11 @@ impl RequestState for BlockRequestState fn response_type() -> ResponseType { ResponseType::Block } - fn request_state_mut(request: &mut SingleBlockLookup) -> &mut Self { - &mut request.block_request_state + fn request_state_mut( + request: &mut SingleBlockLookup, + _: Self::RequestIdType, + ) -> Option<&mut Self> { + Some(&mut request.block_request_state) } fn get_state(&self) -> &SingleLookupRequestState { &self.state @@ -353,6 +369,7 @@ impl RequestState for BlobRequestState>; type VerifiedResponseType = FixedBlobSidecarList; type ReconstructedResponseType = FixedBlobSidecarList; + type RequestIdType = (); fn new_request(&self, spec: &ChainSpec) -> BlobsByRootRequest { let blob_id_vec: Vec = self.requested_ids.clone().into(); @@ -371,27 +388,36 @@ impl RequestState for BlobRequestState, peer_id: PeerId, ) -> Result>, LookupVerifyError> { match blob { Some(blob) => { let received_id = blob.id(); - if !self.requested_ids.contains(&received_id) { - self.state.register_failure_downloading(); + let blob_index = blob.index; + let r = if !self.requested_ids.contains(&received_id) { Err(LookupVerifyError::UnrequestedBlobId) + } else if !blob.verify_blob_sidecar_inclusion_proof().unwrap_or(false) { + Err(LookupVerifyError::InvalidInclusionProof) + } else if blob.block_root() != expected_block_root { + Err(LookupVerifyError::UnrequestedHeader) + } else if blob_index >= T::EthSpec::max_blobs_per_block() as u64 { + Err(LookupVerifyError::InvalidIndex(blob.index)) } else { - // State should remain downloading until we receive the stream terminator. - self.requested_ids.remove(&received_id); - let blob_index = blob.index; + Ok(()) + }; - if blob_index >= T::EthSpec::max_blobs_per_block() as u64 { - return Err(LookupVerifyError::InvalidIndex(blob.index)); - } - *self.blob_download_queue.index_mut(blob_index as usize) = Some(blob); - Ok(None) + if let Err(e) = r { + self.state.register_failure_downloading(); + return Err(e); } + + // State should remain downloading until we receive the stream terminator. 
+ self.requested_ids.remove(&received_id); + + *self.blob_download_queue.index_mut(blob_index as usize) = Some(blob); + Ok(None) } None => { self.state.state = State::Processing { peer_id }; @@ -425,6 +451,7 @@ impl RequestState for BlobRequestState, block_root: Hash256, verified: FixedBlobSidecarList, @@ -443,8 +470,129 @@ impl RequestState for BlobRequestState ResponseType { ResponseType::Blob } - fn request_state_mut(request: &mut SingleBlockLookup) -> &mut Self { - &mut request.blob_request_state + fn request_state_mut( + request: &mut SingleBlockLookup, + _: Self::RequestIdType, + ) -> Option<&mut Self> { + Some(&mut request.blob_request_state) + } + fn get_state(&self) -> &SingleLookupRequestState { + &self.state + } + fn get_state_mut(&mut self) -> &mut SingleLookupRequestState { + &mut self.state + } +} + +impl RequestState for ColumnRequestState { + type RequestType = DataColumnsByRootRequest; + type ResponseType = Arc>; + type VerifiedResponseType = Arc>; + type ReconstructedResponseType = Arc>; + type RequestIdType = ColumnIndex; + + fn new_request(&self, spec: &ChainSpec) -> Self::RequestType { + let id_vec: Vec = vec![self.requested_id.clone()]; + DataColumnsByRootRequest::new(id_vec, spec) + } + + fn make_request( + id: SingleLookupReqId, + peer_id: PeerId, + request: Self::RequestType, + cx: &SyncNetworkContext, + ) -> Result<(), LookupRequestError> { + cx.data_column_lookup_request( + id, + // TODO: Should track a single column_index or multiple + request.data_column_ids.to_vec().first().unwrap().index, + peer_id, + request, + ) + .map_err(LookupRequestError::SendFailed) + } + + fn verify_response_inner( + &mut self, + expected_block_root: Hash256, + data_column: Option, + peer_id: PeerId, + ) -> Result, LookupVerifyError> { + match data_column { + Some(data_column) => { + let received_id = data_column.id(); + let r = { + if self.requested_id != received_id { + Err(LookupVerifyError::UnrequestedBlobId) + } else if !data_column.verify_inclusion_proof().unwrap_or(false) { + Err(LookupVerifyError::InvalidInclusionProof) + } else if data_column.block_root() != expected_block_root { + Err(LookupVerifyError::UnrequestedHeader) + } else { + Ok(Some(data_column)) + } + }; + + if r.is_ok() { + self.state.state = State::Processing { peer_id }; + } else { + self.state.register_failure_downloading(); + } + r + } + None => Err(LookupVerifyError::NoBlockReturned), + } + } + + fn get_parent_root(verified_response: &Arc>) -> Option { + Some(verified_response.signed_block_header.message.parent_root) + } + + fn add_to_child_components( + verified_response: Arc>, + components: &mut ChildComponents, + ) { + todo!() + // components.merge_blobs(verified_response); + } + + fn verified_to_reconstructed( + _block_root: Hash256, + data_column: Arc>, + ) -> Arc> { + data_column + } + + fn send_reconstructed_for_processing( + id: Id, + _: Self::RequestIdType, + bl: &BlockLookups, + block_root: Hash256, + verified: Arc>, + duration: Duration, + cx: &SyncNetworkContext, + ) -> Result<(), LookupRequestError> { + let index = verified.index; + bl.send_data_column_for_processing( + block_root, + verified, + duration, + BlockProcessType::SingleDataColumn { id, index }, + cx, + ) + } + + fn response_type() -> ResponseType { + ResponseType::Blob + } + fn request_state_mut( + request: &mut SingleBlockLookup, + id: Self::RequestIdType, + ) -> Option<&mut Self> { + match &mut request.columns_request_state { + ColumnsRequestState::UnknownSlot => None, + ColumnsRequestState::KnownSlot { requests, .. 
} => requests.get_mut(id as usize), + } } fn get_state(&self) -> &SingleLookupRequestState { &self.state diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 62cdc4fa223..dfa9582f509 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -7,12 +7,15 @@ use crate::metrics; use crate::network_beacon_processor::ChainSegmentProcessId; use crate::sync::block_lookups::common::LookupType; use crate::sync::block_lookups::parent_lookup::{ParentLookup, RequestError}; -use crate::sync::block_lookups::single_block_lookup::{CachedChild, LookupRequestError}; +pub use crate::sync::block_lookups::single_block_lookup::{ + CachedChild, ColumnRequestState, LookupRequestError, +}; use crate::sync::manager::{Id, SingleLookupReqId}; use beacon_chain::block_verification_types::{AsBlock, RpcBlock}; pub use beacon_chain::data_availability_checker::ChildComponents; use beacon_chain::data_availability_checker::{ - AvailabilityCheckErrorCategory, DataAvailabilityChecker, + compute_custody_requirements, AvailabilityCheckErrorCategory, CustodyConfig, + DataAvailabilityChecker, }; use beacon_chain::validator_monitor::timestamp_now; use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError}; @@ -32,7 +35,7 @@ use std::sync::Arc; use std::time::Duration; use store::Hash256; use types::blob_sidecar::FixedBlobSidecarList; -use types::Slot; +use types::{DataColumnSidecar, Slot}; pub mod common; mod parent_lookup; @@ -76,6 +79,32 @@ impl BlockLookups { } } + /// Add a column custodial peer to all search requests that may need to fetch from this peer. + pub fn add_custody_peer( + &mut self, + peer_id: &PeerId, + peer_custody_config: &CustodyConfig, + cx: &mut SyncNetworkContext, + ) { + for (_, lookup) in self.single_block_lookups.iter_mut() { + if let Some(slot) = lookup.block_slot() { + let peer_custody_columns = compute_custody_requirements(slot, peer_custody_config); + lookup.add_custody_peer(peer_id, &peer_custody_columns); + + // When a peer is added, some requests may now have peers and can be + // attempted again. + // TODO: Only request for lookups that added relevant peers + if let Err(e) = lookup.request_all_components(cx) { + // TODO: What to do in case of error? Drop request? + debug!(self.log, + "Failed to request block and blobs after adding custody peer"; + "error" => ?e + ); + } + } + } + } + /* Lookup requests */ /// Creates a lookup for the block with the given `block_root` and immediately triggers it. @@ -111,7 +140,7 @@ impl BlockLookups { cx: &SyncNetworkContext, ) { let block_root = single_block_lookup.block_root(); - match single_block_lookup.request_block_and_blobs(cx) { + match single_block_lookup.request_all_components(cx) { Ok(()) => self.add_single_lookup(single_block_lookup), Err(e) => { debug!(self.log, "Single block lookup failed"; @@ -249,14 +278,16 @@ impl BlockLookups { /// that have been retried are ignored. fn get_single_lookup>( &mut self, - id: SingleLookupReqId, + lookup_id: SingleLookupReqId, + request_id: R::RequestIdType, ) -> Option> { - let mut lookup = self.single_block_lookups.remove(&id.id)?; + let mut lookup = self.single_block_lookups.remove(&lookup_id.id)?; - let request_state = R::request_state_mut(&mut lookup); - if id.req_counter != request_state.get_state().req_counter { + // TODO: okay to propagate None if request id is unknown?
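+        // If `request_state_mut` returns `None`, the `?` on the next line drops the lookup: it was
+        // already removed from `single_block_lookups` above and, unlike the stale `req_counter` case
+        // below, it is not re-inserted.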
+ let request_state = R::request_state_mut(&mut lookup, request_id)?; + if lookup_id.req_counter != request_state.get_state().req_counter { // We don't want to drop the lookup, just ignore the old response. - self.single_block_lookups.insert(id.id, lookup); + self.single_block_lookups.insert(lookup_id.id, lookup); return None; } Some(lookup) @@ -276,6 +307,7 @@ impl BlockLookups { pub fn single_lookup_response>( &mut self, lookup_id: SingleLookupReqId, + request_id: R::RequestIdType, peer_id: PeerId, response: Option, seen_timestamp: Duration, @@ -284,7 +316,7 @@ impl BlockLookups { let id = lookup_id.id; let response_type = R::response_type(); - let Some(lookup) = self.get_single_lookup::(lookup_id) else { + let Some(lookup) = self.get_single_lookup::(lookup_id, request_id) else { if response.is_some() { // We don't have the ability to cancel in-flight RPC requests. So this can happen // if we started this RPC request, and later saw the block/blobs via gossip. @@ -299,8 +331,14 @@ impl BlockLookups { let expected_block_root = lookup.block_root(); - match self.single_lookup_response_inner::(peer_id, response, seen_timestamp, cx, lookup) - { + match self.single_lookup_response_inner::( + peer_id, + response, + seen_timestamp, + cx, + lookup, + request_id, + ) { Ok(lookup) => { self.single_block_lookups.insert(id, lookup); } @@ -328,11 +366,13 @@ impl BlockLookups { seen_timestamp: Duration, cx: &SyncNetworkContext, mut lookup: SingleBlockLookup, + request_id: R::RequestIdType, ) -> Result, LookupRequestError> { let response_type = R::response_type(); let log = self.log.clone(); let expected_block_root = lookup.block_root(); - let request_state = R::request_state_mut(&mut lookup); + let request_state = R::request_state_mut(&mut lookup, request_id) + .ok_or(LookupRequestError::UnknownRequest)?; match request_state.verify_response(expected_block_root, response) { Ok(Some(verified_response)) => { @@ -342,6 +382,7 @@ impl BlockLookups { BlockProcessType::SingleBlock { id: lookup.id }, verified_response, &mut lookup, + request_id, )?; } Ok(None) => {} @@ -358,7 +399,7 @@ impl BlockLookups { cx.report_peer(peer_id, PeerAction::LowToleranceError, msg); request_state.register_failure_downloading(); - lookup.request_block_and_blobs(cx)?; + lookup.request_all_components(cx)?; } } Ok(lookup) @@ -371,11 +412,13 @@ impl BlockLookups { process_type: BlockProcessType, verified_response: R::VerifiedResponseType, lookup: &mut SingleBlockLookup, + request_id: R::RequestIdType, ) -> Result<(), LookupRequestError> { let id = lookup.id; let block_root = lookup.block_root(); - R::request_state_mut(lookup) + R::request_state_mut(lookup, request_id) + .ok_or(LookupRequestError::UnknownRequest)? 
.get_state_mut() .component_downloaded = true; @@ -412,10 +455,11 @@ impl BlockLookups { .state .register_failure_downloading(); } - lookup.request_block_and_blobs(cx)?; + lookup.request_all_components(cx)?; } CachedChild::NotRequired => R::send_reconstructed_for_processing( id, + request_id, self, block_root, R::verified_to_reconstructed(block_root, verified_response), @@ -428,7 +472,7 @@ impl BlockLookups { "block_root" => ?block_root ); lookup.handle_consistency_failure(cx); - lookup.request_block_and_blobs(cx)?; + lookup.request_all_components(cx)?; } } Ok(()) @@ -440,6 +484,7 @@ impl BlockLookups { fn get_parent_lookup>( &mut self, id: SingleLookupReqId, + request_id: R::RequestIdType, ) -> Option> { let mut parent_lookup = if let Some(pos) = self .parent_lookups @@ -451,7 +496,8 @@ impl BlockLookups { return None; }; - if R::request_state_mut(&mut parent_lookup.current_parent_request) + // TODO: Should just return None if request retrieval fails? + if R::request_state_mut(&mut parent_lookup.current_parent_request, request_id)? .get_state() .req_counter != id.req_counter @@ -466,12 +512,13 @@ impl BlockLookups { pub fn parent_lookup_response>( &mut self, id: SingleLookupReqId, + request_id: R::RequestIdType, peer_id: PeerId, response: Option, seen_timestamp: Duration, cx: &SyncNetworkContext, ) { - let Some(mut parent_lookup) = self.get_parent_lookup::(id) else { + let Some(mut parent_lookup) = self.get_parent_lookup::(id, request_id) else { if response.is_some() { debug!(self.log, "Response for a parent lookup request that was not found"; "peer_id" => %peer_id); } @@ -484,6 +531,7 @@ impl BlockLookups { seen_timestamp, cx, &mut parent_lookup, + request_id, ) { Ok(()) => { self.parent_lookups.push(parent_lookup); @@ -508,8 +556,9 @@ impl BlockLookups { seen_timestamp: Duration, cx: &SyncNetworkContext, parent_lookup: &mut ParentLookup, + request_id: R::RequestIdType, ) -> Result<(), RequestError> { - match parent_lookup.verify_response::(response, &mut self.failed_chains) { + match parent_lookup.verify_response::(response, &mut self.failed_chains, request_id) { Ok(Some(verified_response)) => { self.handle_verified_response::( seen_timestamp, @@ -519,6 +568,7 @@ impl BlockLookups { }, verified_response, &mut parent_lookup.current_parent_request, + request_id, )?; } Ok(None) => {} @@ -541,6 +591,8 @@ impl BlockLookups { | ParentVerifyError::NotEnoughBlobsReturned | ParentVerifyError::ExtraBlocksReturned | ParentVerifyError::UnrequestedBlobId + | ParentVerifyError::InvalidInclusionProof + | ParentVerifyError::UnrequestedHeader | ParentVerifyError::ExtraBlobsReturned | ParentVerifyError::InvalidIndex(_) => { let e = e.into(); @@ -569,6 +621,9 @@ impl BlockLookups { "bbroot_failed_chains", ); } + ParentVerifyError::UnknownRequest => { + // Internal fault, what to do? + } } Ok(()) } @@ -609,6 +664,9 @@ impl BlockLookups { // This happens if the peer disconnects while the block is being // processed. Drop the request without extra penalty } + RequestError::UnknownRequest => { + // Internal fault, should never happen? + } } } @@ -638,13 +696,14 @@ impl BlockLookups { /// An RPC error has occurred during a parent lookup. This function handles this case. 
pub fn parent_lookup_failed>( &mut self, - id: SingleLookupReqId, + lookup_id: SingleLookupReqId, + request_id: R::RequestIdType, peer_id: PeerId, cx: &SyncNetworkContext, error: RPCError, ) { let msg = error.as_static_str(); - let Some(mut parent_lookup) = self.get_parent_lookup::(id) else { + let Some(mut parent_lookup) = self.get_parent_lookup::(lookup_id, request_id) else { debug!(self.log, "RPC failure for a block parent lookup request that was not found"; "peer_id" => %peer_id, @@ -652,8 +711,14 @@ impl BlockLookups { ); return; }; - R::request_state_mut(&mut parent_lookup.current_parent_request) - .register_failure_downloading(); + if let Some(parent_request) = + R::request_state_mut(&mut parent_lookup.current_parent_request, request_id) + { + parent_request.register_failure_downloading(); + } else { + // TODO: Can this case happen? Should downscore peers + todo!("unknown request id") + } trace!(self.log, "Parent lookup block request failed"; &parent_lookup, "error" => msg); self.request_parent(parent_lookup, cx); @@ -667,19 +732,24 @@ impl BlockLookups { /// An RPC error has occurred during a single lookup. This function handles this case.\ pub fn single_block_lookup_failed>( &mut self, - id: SingleLookupReqId, + lookup_id: SingleLookupReqId, + request_id: R::RequestIdType, peer_id: &PeerId, cx: &SyncNetworkContext, error: RPCError, ) { let msg = error.as_static_str(); let log = self.log.clone(); - let Some(mut lookup) = self.get_single_lookup::(id) else { + let Some(mut lookup) = self.get_single_lookup::(lookup_id, request_id) else { debug!(log, "Error response to dropped lookup"; "error" => ?error); return; }; let block_root = lookup.block_root(); - let request_state = R::request_state_mut(&mut lookup); + let Some(request_state) = R::request_state_mut(&mut lookup, request_id) else { + // TODO: Can this case happen?
Should downscore peers + todo!("unknown request id") + }; + let response_type = R::response_type(); trace!(log, "Single lookup failed"; @@ -688,9 +758,9 @@ impl BlockLookups { "peer_id" => %peer_id, "response_type" => ?response_type ); - let id = id.id; + let id = lookup_id.id; request_state.register_failure_downloading(); - if let Err(e) = lookup.request_block_and_blobs(cx) { + if let Err(e) = lookup.request_all_components(cx) { debug!(self.log, "Single lookup retry failed"; "error" => ?e, @@ -712,13 +782,16 @@ impl BlockLookups { target_id: Id, result: BlockProcessingResult, cx: &mut SyncNetworkContext, + request_id: R::RequestIdType, ) { let Some(mut lookup) = self.single_block_lookups.remove(&target_id) else { return; }; let root = lookup.block_root(); - let request_state = R::request_state_mut(&mut lookup); + let Some(request_state) = R::request_state_mut(&mut lookup, request_id) else { + todo!("downscore peer for unknown request?"); + }; let Ok(peer_id) = request_state.get_state().processing_peer() else { return; @@ -736,7 +809,7 @@ impl BlockLookups { trace!(self.log, "Single block processing succeeded"; "block" => %root); } AvailabilityProcessingStatus::MissingComponents(_, _block_root) => { - match self.handle_missing_components::(cx, &mut lookup) { + match self.handle_missing_components::(cx, &mut lookup, request_id) { Ok(()) => { self.single_block_lookups.insert(target_id, lookup); } @@ -782,8 +855,12 @@ impl BlockLookups { &self, cx: &SyncNetworkContext, lookup: &mut SingleBlockLookup, + request_id: R::RequestIdType, ) -> Result<(), LookupRequestError> { - let request_state = R::request_state_mut(lookup); + let Some(request_state) = R::request_state_mut(lookup, request_id) else { + // Should never happen, as the processing request is triggered internally + return Err(LookupRequestError::UnknownRequest); + }; request_state.get_state_mut().component_processed = true; if lookup.both_components_processed() { @@ -794,7 +871,7 @@ impl BlockLookups { .blob_request_state .state .register_failure_processing(); - lookup.request_block_and_blobs(cx)?; + lookup.request_all_components(cx)?; } Ok(()) } @@ -824,7 +901,7 @@ impl BlockLookups { let slot = block.slot(); let parent_root = block.parent_root(); lookup.add_child_components(block.into()); - lookup.request_block_and_blobs(cx)?; + lookup.request_all_components(cx)?; self.search_parent(slot, root, parent_root, peer_id, cx); } ref e @ BlockError::ExecutionPayloadError(ref epe) if !epe.penalize_peer() => { @@ -849,12 +926,12 @@ impl BlockLookups { .blob_request_state .state .register_failure_downloading(); - lookup.request_block_and_blobs(cx)? + lookup.request_all_components(cx)? } AvailabilityCheckErrorCategory::Malicious => { warn!(self.log, "Availability check failure"; "root" => %root, "peer_id" => %peer_id, "error" => ?e); lookup.handle_availability_check_failure(cx); - lookup.request_block_and_blobs(cx)? + lookup.request_all_components(cx)? } }, other => { @@ -871,7 +948,7 @@ impl BlockLookups { .block_request_state .state .register_failure_processing(); - lookup.request_block_and_blobs(cx)? + lookup.request_all_components(cx)?
} } } @@ -943,7 +1020,7 @@ impl BlockLookups { .register_failure_processing(); match parent_lookup .current_parent_request - .request_block_and_blobs(cx) + .request_all_components(cx) { Ok(()) => self.parent_lookups.push(parent_lookup), Err(e) => self.handle_parent_request_error(&mut parent_lookup, cx, e.into()), @@ -1055,7 +1132,7 @@ impl BlockLookups { "chain_hash" => ?chain_hash ); child_lookup.handle_consistency_failure(cx); - if let Err(e) = child_lookup.request_block_and_blobs(cx) { + if let Err(e) = child_lookup.request_all_components(cx) { debug!(self.log, "Failed to request block and blobs, dropping lookup"; "error" => ?e @@ -1181,7 +1258,7 @@ impl BlockLookups { "error" => ?e ); lookup.handle_consistency_failure(cx); - if let Err(e) = lookup.request_block_and_blobs(cx) { + if let Err(e) = lookup.request_all_components(cx) { debug!(self.log, "Failed to request block and blobs, dropping lookup"; "error" => ?e @@ -1286,6 +1363,41 @@ impl BlockLookups { } } + fn send_data_column_for_processing( + &self, + block_root: Hash256, + data_column: Arc>, + duration: Duration, + process_type: BlockProcessType, + cx: &SyncNetworkContext, + ) -> Result<(), LookupRequestError> { + match cx.beacon_processor_if_enabled() { + Some(beacon_processor) => { + trace!(self.log, "Sending data columns for processing"; "block" => ?block_root, "process_type" => ?process_type); + if let Err(e) = + beacon_processor.send_rpc_data_column(data_column, duration, process_type) + { + error!( + self.log, + "Failed to send sync data columns to processor"; + "error" => ?e + ); + Err(LookupRequestError::SendFailed( + "beacon processor send failure", + )) + } else { + Ok(()) + } + } + None => { + trace!(self.log, "Dropping data columns ready for processing. Beacon processor not available"; "block_root" => %block_root); + Err(LookupRequestError::SendFailed( + "beacon processor unavailable", + )) + } + } + } + /// Attempts to request the next unknown parent. This method handles peer scoring and dropping /// the lookup in the event of failure. 
fn request_parent(&mut self, mut parent_lookup: ParentLookup, cx: &SyncNetworkContext) { diff --git a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs index 5c2e90b48c9..997ec1ace56 100644 --- a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs @@ -37,9 +37,12 @@ pub enum ParentVerifyError { NotEnoughBlobsReturned, ExtraBlocksReturned, UnrequestedBlobId, + InvalidInclusionProof, + UnrequestedHeader, ExtraBlobsReturned, InvalidIndex(u64), PreviousFailure { parent_root: Hash256 }, + UnknownRequest, } #[derive(Debug, PartialEq, Eq)] @@ -53,6 +56,7 @@ pub enum RequestError { cannot_process: bool, }, NoPeers, + UnknownRequest, } impl ParentLookup { @@ -96,7 +100,7 @@ impl ParentLookup { } self.current_parent_request - .request_block_and_blobs(cx) + .request_all_components(cx) .map_err(Into::into) } @@ -189,9 +193,11 @@ impl ParentLookup { &mut self, block: Option, failed_chains: &mut lru_cache::LRUTimeCache, + request_id: R::RequestIdType, ) -> Result, ParentVerifyError> { let expected_block_root = self.current_parent_request.block_root(); - let request_state = R::request_state_mut(&mut self.current_parent_request); + let request_state = R::request_state_mut(&mut self.current_parent_request, request_id) + .ok_or(ParentVerifyError::UnknownRequest)?; let root_and_verified = request_state.verify_response(expected_block_root, block)?; // check if the parent of this block isn't in the failed cache. If it is, this chain should @@ -243,6 +249,8 @@ impl From for ParentVerifyError { E::NoBlockReturned => ParentVerifyError::NoBlockReturned, E::ExtraBlocksReturned => ParentVerifyError::ExtraBlocksReturned, E::UnrequestedBlobId => ParentVerifyError::UnrequestedBlobId, + E::InvalidInclusionProof => ParentVerifyError::InvalidInclusionProof, + E::UnrequestedHeader => ParentVerifyError::UnrequestedHeader, E::ExtraBlobsReturned => ParentVerifyError::ExtraBlobsReturned, E::InvalidIndex(index) => ParentVerifyError::InvalidIndex(index), E::NotEnoughBlobsReturned => ParentVerifyError::NotEnoughBlobsReturned, @@ -259,6 +267,7 @@ impl From for RequestError { } E::NoPeers => RequestError::NoPeers, E::SendFailed(msg) => RequestError::SendFailed(msg), + E::UnknownRequest => RequestError::UnknownRequest, } } } @@ -286,6 +295,7 @@ impl RequestError { } RequestError::TooManyAttempts { cannot_process: _ } => "too_many_downloading_attempts", RequestError::NoPeers => "no_peers", + RequestError::UnknownRequest => "unknown_request", } } } diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 989bfab00f0..c6aeea88e35 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -4,20 +4,21 @@ use crate::sync::block_lookups::Id; use crate::sync::network_context::SyncNetworkContext; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::data_availability_checker::{ - AvailabilityCheckError, DataAvailabilityChecker, MissingBlobs, + compute_custody_requirements, compute_sample_requirements, AvailabilityCheckError, + ChildComponents, DataAvailabilityChecker, MissingBlobs, MissingDataColumns, }; -use beacon_chain::data_availability_checker::{AvailabilityView, ChildComponents}; use beacon_chain::BeaconChainTypes; use lighthouse_network::PeerAction; use slog::{trace, Logger}; -use 
std::collections::HashSet; +use std::collections::{BTreeSet, HashSet}; use std::fmt::Debug; use std::marker::PhantomData; use std::sync::Arc; use store::Hash256; use strum::IntoStaticStr; use types::blob_sidecar::FixedBlobSidecarList; -use types::EthSpec; +use types::data_column_sidecar::{ColumnIndex, DataColumnIdentifier}; +use types::{DataColumnSidecar, EthSpec, Slot}; #[derive(Debug, PartialEq, Eq)] pub enum State { @@ -32,6 +33,8 @@ pub enum LookupVerifyError { NoBlockReturned, ExtraBlocksReturned, UnrequestedBlobId, + InvalidInclusionProof, + UnrequestedHeader, ExtraBlobsReturned, NotEnoughBlobsReturned, InvalidIndex(u64), @@ -46,12 +49,15 @@ pub enum LookupRequestError { }, NoPeers, SendFailed(&'static str), + UnknownRequest, } pub struct SingleBlockLookup { pub id: Id, pub block_request_state: BlockRequestState, pub blob_request_state: BlobRequestState, + pub columns_request_state: ColumnsRequestState, + pub da_checker: Arc>, /// Only necessary for requests triggered by an `UnknownBlockParent` or `UnknownBlockParent` /// because any blocks or blobs without parents won't hit the data availability cache. @@ -67,10 +73,12 @@ impl SingleBlockLookup { id: Id, ) -> Self { let is_deneb = da_checker.is_deneb(); + Self { id, block_request_state: BlockRequestState::new(requested_block_root, peers), blob_request_state: BlobRequestState::new(requested_block_root, peers, is_deneb), + columns_request_state: ColumnsRequestState::UnknownSlot, da_checker, child_components, } @@ -89,6 +97,7 @@ impl SingleBlockLookup { /// Update the requested block, this should only be used in a chain of parent lookups to request /// the next parent. pub fn update_requested_parent_block(&mut self, block_root: Hash256) { + // TODO: What's this code doing, and should it be updated for sampling? self.block_request_state.requested_block_root = block_root; self.block_request_state.state.state = State::AwaitingDownload; self.blob_request_state.state.state = State::AwaitingDownload; @@ -110,7 +119,7 @@ impl SingleBlockLookup { /// downloaded the block and/or blobs already and will not send requests if so. It will also /// inspect the request state or blocks and blobs to ensure we are not already processing or /// downloading the block and/or blobs. - pub fn request_block_and_blobs( + pub fn request_all_components( &mut self, cx: &SyncNetworkContext, ) -> Result<(), LookupRequestError> { @@ -125,6 +134,63 @@ impl SingleBlockLookup { self.blob_request_state .build_request_and_send(self.id, cx)?; } + + // Can't request columns until the slot for this request is known + // Note: `da_checker` has access to this node's NodeID and custody_requirements, but they + // could also be computed from network globals + // TODO: Cache the custody columns once the slot is known + if let ColumnsRequestState::UnknownSlot = self.columns_request_state { + if let Some(slot) = self.block_slot() { + // `custody_columns` are all columns we need to download for this specific slot, + // regardless of the DA checker state. `get_custody_config` is just a convenience + // getter to read our node's ID. De-duplication of requests is done below.
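+                // The custody and sample indices are merged into a `BTreeSet` below, so an index
+                // required by both sampling and custody produces a single `ColumnRequestState` and
+                // at most one request per column.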
+ let custody_columns = + compute_custody_requirements(slot, self.da_checker.get_custody_config()); + let sample_columns = compute_sample_requirements(slot); + + let peers_by_custody = cx.peers_by_custody_at_slot(slot); + + let requests = custody_columns + .iter() + .chain(sample_columns.iter()) + .collect::>() + .into_iter() + .map(|index| { + ColumnRequestState::new( + DataColumnIdentifier { + block_root: self.block_root(), + index: *index, + }, + // Populate column requests with all peers known at the moment that have + // custody of our column of interest + peers_by_custody.get(index).unwrap_or(&vec![]), + ) + }) + .collect::>(); + + self.columns_request_state = ColumnsRequestState::KnownSlot { + custody_columns, + sample_columns, + requests, + }; + } + } + + if let ColumnsRequestState::KnownSlot { requests, .. } = &mut self.columns_request_state { + for item in requests.iter_mut() { + // TODO(das) do not send requests if columns are already received from gossip + // and are in the DA checker. + // Note: `build_request_and_send` does not send the request if it's already in + // downloading state i.e. without the TODO above, each column should be + // downloaded twice max. + match item.build_request_and_send(self.id, cx) { + Ok(_) => {} + Err(LookupRequestError::NoPeers) => {} // Ok expected, wait for peers on subnet + Err(e) => return Err(e), + } + } + } + Ok(()) } @@ -144,6 +210,12 @@ impl SingleBlockLookup { return CachedChild::DownloadIncomplete; } + // TODO(das): consider column custody and sampling. I don't want to duplicate logic, and + // it's odd that child component download has different paths than regular lookups. It + // feels like this can be simplified by unifying both paths. The whole child block + // business is a regular download that should be delayed for processing until later when + // all parents are known. + match RpcBlock::new_from_fixed( self.block_request_state.requested_block_root, block.clone(), @@ -180,6 +252,7 @@ impl SingleBlockLookup { downloaded_block, downloaded_blobs, downloaded_data_columns, + .. } = components; if let Some(block) = downloaded_block { existing_components.merge_block(block); @@ -204,6 +277,19 @@ impl SingleBlockLookup { } } + /// Add a peer that custodies a set of columns at this lookup's slot. Matching peers for the + /// columns of interest are added into that column's pool of peers to fetch from. + pub fn add_custody_peer(&mut self, peer_id: &PeerId, peer_custody_columns: &[ColumnIndex]) { + // TODO: O(n^2) complexity, optimize + if let ColumnsRequestState::KnownSlot { requests, .. } = &mut self.columns_request_state { + for request in requests.iter_mut() { + if peer_custody_columns.contains(&request.requested_id.index) { + request.state.add_peer(peer_id); + } + } + } + } + /// Returns true if the block has already been downloaded.
pub fn both_components_downloaded(&self) -> bool { self.block_request_state.state.component_downloaded @@ -238,7 +324,7 @@ impl SingleBlockLookup { .is_err(); if block_peer_disconnected || blob_peer_disconnected { - if let Err(e) = self.request_block_and_blobs(cx) { + if let Err(e) = self.request_all_components(cx) { trace!(log, "Single lookup failed on peer disconnection"; "block_root" => ?block_root, "error" => ?e); return true; } @@ -246,10 +332,21 @@ impl SingleBlockLookup { false } + pub(crate) fn block_slot(&self) -> Option { + if let Some(components) = self.child_components.as_ref() { + components + .downloaded_block + .as_ref() + .map(|b| b.message().slot()) + } else { + self.da_checker.block_slot(&self.block_root()) + } + } + /// Returns `true` if the block has already been downloaded. pub(crate) fn block_already_downloaded(&self) -> bool { if let Some(components) = self.child_components.as_ref() { - components.block_exists() + components.downloaded_block.is_some() } else { self.da_checker.has_block(&self.block_root()) } @@ -274,19 +371,26 @@ impl SingleBlockLookup { pub(crate) fn missing_blob_ids(&self) -> MissingBlobs { let block_root = self.block_root(); if let Some(components) = self.child_components.as_ref() { - self.da_checker.get_missing_blob_ids(block_root, components) + self.da_checker.get_missing_blob_ids( + block_root, + &components.downloaded_block, + &Some(components.downloaded_blobs.clone()), + ) } else { - let Some(processing_availability_view) = - self.da_checker.get_processing_components(block_root) - else { - return MissingBlobs::new_without_block(block_root, self.da_checker.is_deneb()); - }; - self.da_checker - .get_missing_blob_ids(block_root, &processing_availability_view) + // TODO(lion): This check is incomplete. The processing cache only reflects blobs that + // are starting to be processed (work event started) and are halfway through the + // `process_gossip_blob` routine. Is the complexity of the processing cache justified + // for the rare case a block or blob is downloaded from multiple sources? Gossipsub + // dedups double downloads. Block lookups already track the state of block and blobs + // being downloaded. This feature seems only useful in the rare case a block lookup is + // triggered while a gossip block is in the middle of being processed. + // If that is the use case, why is this processing-deduplication cache tied to the + // availability view? + self.da_checker.get_missing_blob_ids_with(block_root) } } - /// Penalizes a blob peer if it should have blobs but didn't return them to us. + /// Penalizes a blob peer if it should have blobs but didn't return them to us. pub fn penalize_blob_peer(&mut self, cx: &SyncNetworkContext) { if let Ok(blob_peer) = self.blob_request_state.state.processing_peer() { cx.report_peer( @@ -318,6 +422,15 @@ impl SingleBlockLookup { } } +pub enum ColumnsRequestState { + UnknownSlot, + KnownSlot { + custody_columns: Vec, + sample_columns: Vec, + requests: Vec>, + }, +} + /// The state of the blob request component of a `SingleBlockLookup`. pub struct BlobRequestState { /// The latest picture of which blobs still need to be requested. This includes information @@ -342,6 +455,25 @@ impl BlobRequestState { } } +/// The state of the column request component of a `SingleBlockLookup`.
+pub struct ColumnRequestState { + pub state: SingleLookupRequestState, + pub requested_id: DataColumnIdentifier, + pub pending_chunk: Option>>, + _phantom: PhantomData, +} + +impl ColumnRequestState { + pub fn new(requested_id: DataColumnIdentifier, peer_source: &[PeerId]) -> Self { + Self { + state: SingleLookupRequestState::new(peer_source), + requested_id, + pending_chunk: None, + _phantom: PhantomData, + } + } +} + /// The state of the block request component of a `SingleBlockLookup`. pub struct BlockRequestState { pub requested_block_root: Hash256, @@ -432,6 +564,10 @@ impl SingleLookupRequestState { self.state = State::AwaitingDownload; } + pub fn register_successful_download(&mut self, peer_id: PeerId) { + self.state = State::Processing { peer_id } + } + /// The total number of failures, whether it be processing or downloading. pub fn failed_attempts(&self) -> u8 { self.failed_processing + self.failed_downloading @@ -534,6 +670,8 @@ mod tests { ChainSpec, EthSpec, MinimalEthSpec as E, SignedBeaconBlock, Slot, }; + const NODE_ID: [u8; 32] = [0; 32]; + fn rand_block() -> SignedBeaconBlock { let mut rng = XorShiftRng::from_seed([42; 16]); SignedBeaconBlock::from_block( @@ -588,6 +726,7 @@ mod tests { None, &[peer_id], da_checker, + PeersByCustody::new(), 1, ); as RequestState>::build_request( diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index f81f16dfb57..98b74352379 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -245,6 +245,7 @@ fn test_single_block_lookup_happy_path() { // for processing. bl.single_lookup_response::>( id, + (), peer_id, Some(block.into()), D, @@ -258,11 +259,12 @@ fn test_single_block_lookup_happy_path() { // Send the stream termination. Peer should have not been penalized, and the request removed // after processing. - bl.single_lookup_response::>(id, peer_id, None, D, &cx); + bl.single_lookup_response::>(id, (), peer_id, None, D, &cx); bl.single_block_component_processed::>( id.id, BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root)), &mut cx, + (), ); rig.expect_empty_network(); assert_eq!(bl.single_block_lookups.len(), 0); @@ -290,7 +292,7 @@ fn test_single_block_lookup_empty_response() { } // The peer does not have the block. It should be penalized. - bl.single_lookup_response::>(id, peer_id, None, D, &cx); + bl.single_lookup_response::>(id, (), peer_id, None, D, &cx); rig.expect_penalty(); rig.expect_lookup_request(response_type); // it should be retried @@ -321,6 +323,7 @@ fn test_single_block_lookup_wrong_response() { let bad_block = rig.rand_block(fork_name); bl.single_lookup_response::>( id, + (), peer_id, Some(bad_block.into()), D, @@ -330,7 +333,7 @@ fn test_single_block_lookup_wrong_response() { rig.expect_lookup_request(response_type); // should be retried // Send the stream termination. This should not produce an additional penalty. - bl.single_lookup_response::>(id, peer_id, None, D, &cx); + bl.single_lookup_response::>(id, (), peer_id, None, D, &cx); rig.expect_empty_network(); } @@ -358,6 +361,7 @@ fn test_single_block_lookup_failure() { // The request fails. RPC failures are handled elsewhere so we should not penalize the peer. bl.single_block_lookup_failed::>( id, + (), &peer_id, &cx, RPCError::UnsupportedProtocol, @@ -391,6 +395,7 @@ fn test_single_block_lookup_becomes_parent_request() { // for processing. 
bl.single_lookup_response::>( id, + (), peer_id, Some(block.clone()), D, @@ -408,6 +413,7 @@ fn test_single_block_lookup_becomes_parent_request() { id.id, BlockError::ParentUnknown(RpcBlock::new_without_blobs(None, block)).into(), &mut cx, + (), ); assert_eq!(bl.single_block_lookups.len(), 1); rig.expect_parent_request(response_type); @@ -449,6 +455,7 @@ fn test_parent_lookup_happy_path() { // Peer sends the right block, it should be sent for processing. Peer should not be penalized. bl.parent_lookup_response::>( id, + (), peer_id, Some(parent.into()), D, @@ -497,6 +504,7 @@ fn test_parent_lookup_wrong_response() { let bad_block = rig.rand_block(fork_name); bl.parent_lookup_response::>( id1, + (), peer_id, Some(bad_block.into()), D, @@ -506,12 +514,13 @@ fn test_parent_lookup_wrong_response() { let id2 = rig.expect_parent_request(response_type); // Send the stream termination for the first request. This should not produce extra penalties. - bl.parent_lookup_response::>(id1, peer_id, None, D, &cx); + bl.parent_lookup_response::>(id1, (), peer_id, None, D, &cx); rig.expect_empty_network(); // Send the right block this time. bl.parent_lookup_response::>( id2, + (), peer_id, Some(parent.into()), D, @@ -560,13 +569,14 @@ fn test_parent_lookup_empty_response() { } // Peer sends an empty response, peer should be penalized and the block re-requested. - bl.parent_lookup_response::>(id1, peer_id, None, D, &cx); + bl.parent_lookup_response::>(id1, (), peer_id, None, D, &cx); rig.expect_penalty(); let id2 = rig.expect_parent_request(response_type); // Send the right block this time. bl.parent_lookup_response::>( id2, + (), peer_id, Some(parent.into()), D, @@ -617,6 +627,7 @@ fn test_parent_lookup_rpc_failure() { // The request fails. It should be tried again. bl.parent_lookup_failed::>( id1, + (), peer_id, &cx, RPCError::ErrorResponse( @@ -629,6 +640,7 @@ fn test_parent_lookup_rpc_failure() { // Send the right block this time. bl.parent_lookup_response::>( id2, + (), peer_id, Some(parent.into()), D, @@ -681,6 +693,7 @@ fn test_parent_lookup_too_many_attempts() { // The request fails. It should be tried again. bl.parent_lookup_failed::>( id, + (), peer_id, &cx, RPCError::ErrorResponse( @@ -694,6 +707,7 @@ fn test_parent_lookup_too_many_attempts() { let bad_block = rig.rand_block(fork_name); bl.parent_lookup_response::>( id, + (), peer_id, Some(bad_block.into()), D, @@ -708,7 +722,14 @@ fn test_parent_lookup_too_many_attempts() { // block and a stream terminator with the same Id now results in two failed attempts, // I'm unsure if this is how it should behave? // - bl.parent_lookup_response::>(id, peer_id, None, D, &cx); + bl.parent_lookup_response::>( + id, + (), + peer_id, + None, + D, + &cx, + ); rig.expect_penalty(); } } @@ -758,6 +779,7 @@ fn test_parent_lookup_too_many_download_attempts_no_blacklist() { // The request fails. It should be tried again. bl.parent_lookup_failed::>( id, + (), peer_id, &cx, RPCError::ErrorResponse( @@ -770,6 +792,7 @@ fn test_parent_lookup_too_many_download_attempts_no_blacklist() { let bad_block = rig.rand_block(fork_name); bl.parent_lookup_response::>( id, + (), peer_id, Some(bad_block.into()), D, @@ -825,6 +848,7 @@ fn test_parent_lookup_too_many_processing_attempts_must_blacklist() { // The request fails. It should be tried again. 
bl.parent_lookup_failed::>( id, + (), peer_id, &cx, RPCError::ErrorResponse( @@ -846,13 +870,14 @@ fn test_parent_lookup_too_many_processing_attempts_must_blacklist() { // send the right parent but fail processing bl.parent_lookup_response::>( id, + (), peer_id, Some(parent.clone()), D, &cx, ); bl.parent_block_processed(block_root, BlockError::InvalidSignature.into(), &mut cx); - bl.parent_lookup_response::>(id, peer_id, None, D, &cx); + bl.parent_lookup_response::>(id, (), peer_id, None, D, &cx); rig.expect_penalty(); } @@ -903,13 +928,14 @@ fn test_parent_lookup_too_deep() { // the block bl.parent_lookup_response::>( id, + (), peer_id, Some(block.clone()), D, &cx, ); // the stream termination - bl.parent_lookup_response::>(id, peer_id, None, D, &cx); + bl.parent_lookup_response::>(id, (), peer_id, None, D, &cx); // the processing request rig.expect_block_process(response_type); // the processing result @@ -973,6 +999,7 @@ fn test_single_block_lookup_ignored_response() { // for processing. bl.single_lookup_response::>( id, + (), peer_id, Some(block.into()), D, @@ -986,12 +1013,13 @@ fn test_single_block_lookup_ignored_response() { // Send the stream termination. Peer should have not been penalized, and the request removed // after processing. - bl.single_lookup_response::>(id, peer_id, None, D, &cx); + bl.single_lookup_response::>(id, (), peer_id, None, D, &cx); // Send an Ignored response, the request should be dropped bl.single_block_component_processed::>( id.id, BlockProcessingResult::Ignored, &mut cx, + (), ); rig.expect_empty_network(); assert_eq!(bl.single_block_lookups.len(), 0); @@ -1027,6 +1055,7 @@ fn test_parent_lookup_ignored_response() { // Peer sends the right block, it should be sent for processing. Peer should not be penalized. bl.parent_lookup_response::>( id, + (), peer_id, Some(parent.into()), D, @@ -1105,13 +1134,14 @@ fn test_same_chain_race_condition() { // the block bl.parent_lookup_response::>( id, + (), peer_id, Some(block.clone()), D, &cx, ); // the stream termination - bl.parent_lookup_response::>(id, peer_id, None, D, &cx); + bl.parent_lookup_response::>(id, (), peer_id, None, D, &cx); // the processing request rig.expect_block_process(response_type); // the processing result @@ -1320,6 +1350,7 @@ mod deneb_only { let _ = self.unknown_parent_block.insert(block.clone()); self.bl.parent_lookup_response::>( self.parent_block_req_id.expect("parent request id"), + (), self.peer_id, Some(block), D, @@ -1337,6 +1368,7 @@ mod deneb_only { self.bl .parent_lookup_response::>( self.parent_blob_req_id.expect("parent blob request id"), + (), self.peer_id, Some(blob.clone()), D, @@ -1347,6 +1379,7 @@ mod deneb_only { self.bl .parent_lookup_response::>( self.parent_blob_req_id.expect("blob request id"), + (), self.peer_id, None, D, @@ -1371,6 +1404,7 @@ mod deneb_only { self.bl .single_lookup_response::>( self.block_req_id.expect("block request id"), + (), self.peer_id, Some(self.block.clone()), D, @@ -1388,6 +1422,7 @@ mod deneb_only { self.bl .single_lookup_response::>( self.blob_req_id.expect("blob request id"), + (), self.peer_id, Some(blob.clone()), D, @@ -1398,6 +1433,7 @@ mod deneb_only { self.bl .single_lookup_response::>( self.blob_req_id.expect("blob request id"), + (), self.peer_id, None, D, @@ -1423,6 +1459,7 @@ mod deneb_only { self.bl .single_lookup_response::>( self.block_req_id.expect("block request id"), + (), self.peer_id, None, D, @@ -1435,6 +1472,7 @@ mod deneb_only { self.bl .single_lookup_response::>( self.blob_req_id.expect("blob request id"), + (), 
self.peer_id, None, D, @@ -1446,6 +1484,7 @@ mod deneb_only { fn empty_parent_block_response(mut self) -> Self { self.bl.parent_lookup_response::>( self.parent_block_req_id.expect("block request id"), + (), self.peer_id, None, D, @@ -1458,6 +1497,7 @@ mod deneb_only { self.bl .parent_lookup_response::>( self.parent_blob_req_id.expect("blob request id"), + (), self.peer_id, None, D, @@ -1476,6 +1516,7 @@ mod deneb_only { self.block_root, )), &mut self.cx, + (), ); self.rig.expect_empty_network(); assert_eq!(self.bl.single_block_lookups.len(), 0); @@ -1526,6 +1567,7 @@ mod deneb_only { self.block_req_id.expect("block request id").id, BlockProcessingResult::Err(BlockError::ProposalSignatureInvalid), &mut self.cx, + (), ); assert_eq!(self.bl.single_block_lookups.len(), 1); self @@ -1539,6 +1581,7 @@ mod deneb_only { AvailabilityCheckError::KzgVerificationFailed, )), &mut self.cx, + (), ); assert_eq!(self.bl.single_block_lookups.len(), 1); self @@ -1553,6 +1596,7 @@ mod deneb_only { self.block_root, )), &mut self.cx, + (), ); assert_eq!(self.bl.single_block_lookups.len(), 1); self @@ -1567,6 +1611,7 @@ mod deneb_only { self.block_root, )), &mut self.cx, + (), ); assert_eq!(self.bl.single_block_lookups.len(), 1); self diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 7fff76dd9eb..035f1c4e046 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -34,7 +34,7 @@ //! search for the block and subsequently search for parents if needed. use super::backfill_sync::{BackFillSync, ProcessResult, SyncStart}; -use super::block_lookups::BlockLookups; +use super::block_lookups::{BlockLookups, ColumnRequestState}; use super::network_context::{BlockOrBlob, SyncNetworkContext}; use super::peer_sync_info::{remote_sync_type, PeerSyncType}; use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH}; @@ -64,7 +64,8 @@ use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; use types::blob_sidecar::FixedBlobSidecarList; -use types::{BlobSidecar, EthSpec, Hash256, SignedBeaconBlock, Slot}; +use types::data_column_sidecar::ColumnIndex; +use types::{BlobSidecar, DataColumnSidecar, EthSpec, Hash256, SignedBeaconBlock, Slot}; /// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync /// from a peer. If a peer is within this tolerance (forwards or backwards), it is treated as a @@ -83,6 +84,12 @@ pub struct SingleLookupReqId { pub req_counter: Id, } +#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] +pub struct SampleReqId { + pub id: Id, + pub column_index: ColumnIndex, +} + /// Id of rpc requests sent by sync to the network. #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] pub enum RequestId { @@ -90,6 +97,8 @@ pub enum RequestId { SingleBlock { id: SingleLookupReqId }, /// Request searching for a set of blobs given a hash. SingleBlob { id: SingleLookupReqId }, + /// Request sampling a data column of a block + SingleDataColumn { id: SingleLookupReqId, index: u64 }, /// Request searching for a block's parent. The id is the chain, share with the corresponding /// blob id. ParentLookup { id: SingleLookupReqId }, @@ -104,6 +113,8 @@ pub enum RequestId { RangeBlocks { id: Id }, /// Range request that is composed by both a block range request and a blob range request. 
RangeBlockAndBlobs { id: Id }, + /// Sample block request + SingleBlockSample { id: SampleReqId }, } #[derive(Debug)] @@ -128,6 +139,13 @@ pub enum SyncMessage { seen_timestamp: Duration, }, + RpcDataColumn { + request_id: RequestId, + peer_id: PeerId, + data_column: Option>>, + seen_timestamp: Duration, + }, + /// A block with an unknown parent has been received. UnknownParentBlock(PeerId, RpcBlock, Hash256), @@ -159,6 +177,9 @@ pub enum SyncMessage { process_type: BlockProcessType, result: BlockProcessingResult, }, + + /// Request sync to perform PeerDAS 1D sampling of the block's data + SampleBlock { block_root: Hash256, slot: Slot }, } /// The type of processing specified for a received block. @@ -166,6 +187,7 @@ pub enum SyncMessage { pub enum BlockProcessType { SingleBlock { id: Id }, SingleBlob { id: Id }, + SingleDataColumn { id: Id, index: ColumnIndex }, ParentLookup { chain_hash: Hash256 }, } @@ -297,6 +319,13 @@ impl SyncManager { self.update_sync_state(); } + fn add_custody_peer(&mut self, peer_id: &PeerId) { + if let Some(custody_config) = self.network.get_custody_config_of_peer(peer_id) { + self.block_lookups + .add_custody_peer(peer_id, &custody_config, &mut self.network) + } + } + /// Handles RPC errors related to requests that were emitted from the sync manager. fn inject_error(&mut self, peer_id: PeerId, request_id: RequestId, error: RPCError) { trace!(self.log, "Sync manager received a failed RPC"); @@ -305,6 +334,7 @@ impl SyncManager { self.block_lookups .single_block_lookup_failed::>( id, + (), &peer_id, &self.network, error, @@ -314,15 +344,30 @@ impl SyncManager { self.block_lookups .single_block_lookup_failed::>( id, + (), &peer_id, &self.network, error, ); } + RequestId::SingleDataColumn { id, index } => { + self.block_lookups + .single_block_lookup_failed::>( + id, + index, + &peer_id, + &self.network, + error, + ); + } + RequestId::SingleBlockSample { id } => { + todo!("handle sampling errors"); + } RequestId::ParentLookup { id } => { self.block_lookups .parent_lookup_failed::>( id, + (), peer_id, &self.network, error, @@ -332,6 +377,7 @@ impl SyncManager { self.block_lookups .parent_lookup_failed::>( id, + (), peer_id, &self.network, error, @@ -602,6 +648,7 @@ impl SyncManager { match sync_message { SyncMessage::AddPeer(peer_id, info) => { self.add_peer(peer_id, info); + self.add_custody_peer(&peer_id); } SyncMessage::RpcBlock { request_id, @@ -617,6 +664,12 @@ impl SyncManager { blob_sidecar, seen_timestamp, } => self.rpc_blob_received(request_id, peer_id, blob_sidecar, seen_timestamp), + SyncMessage::RpcDataColumn { + request_id, + peer_id, + data_column, + seen_timestamp, + } => self.rpc_data_column_received(request_id, peer_id, data_column, seen_timestamp), SyncMessage::UnknownParentBlock(peer_id, block, block_root) => { let block_slot = block.slot(); let parent_root = block.parent_root(); @@ -672,6 +725,7 @@ impl SyncManager { id, result, &mut self.network, + (), ), BlockProcessType::SingleBlob { id } => self .block_lookups @@ -679,7 +733,19 @@ impl SyncManager { id, result, &mut self.network, + (), ), + BlockProcessType::SingleDataColumn { id, index } => self + .block_lookups + .single_block_component_processed::>( + id, + result, + &mut self.network, + index, + ), BlockProcessType::ParentLookup { chain_hash } => self .block_lookups .parent_block_processed(chain_hash, result, &mut self.network), @@ -713,6 +779,15 @@ impl SyncManager { .block_lookups .parent_chain_processed(chain_hash, result, &self.network), }, + SyncMessage::SampleBlock { block_root, .. 
} => { + // `search_block` will attempt to fetch missing samples for the block, plus fetch + // missing custody columns. + // Note: we pass no peers to the lookup, because the block is already downloaded, + // and the rest of the components will be fetched from other peers. + // TODO: Use the slot to start sampling right away + self.block_lookups + .search_block(block_root, &vec![], &mut self.network); + } } } @@ -836,18 +911,23 @@ impl SyncManager { .block_lookups .single_lookup_response::>( id, + (), peer_id, block, seen_timestamp, &self.network, ), - RequestId::SingleBlob { .. } => { + RequestId::SingleBlob { .. } | RequestId::SingleBlockSample { .. } => { crit!(self.log, "Block received during blob request"; "peer_id" => %peer_id ); } + RequestId::SingleDataColumn { .. } => { + crit!(self.log, "Block received during data column request"; "peer_id" => %peer_id ); + } RequestId::ParentLookup { id } => self .block_lookups .parent_lookup_response::>( id, + (), peer_id, block, seen_timestamp, @@ -913,7 +993,7 @@ impl SyncManager { seen_timestamp: Duration, ) { match request_id { - RequestId::SingleBlock { .. } => { + RequestId::SingleBlock { .. } | RequestId::SingleBlockSample { .. } => { crit!(self.log, "Single blob received during block request"; "peer_id" => %peer_id ); } RequestId::SingleBlob { id } => { @@ -927,12 +1007,16 @@ impl SyncManager { self.block_lookups .single_lookup_response::>( id, + (), peer_id, blob, seen_timestamp, &self.network, ) } + RequestId::SingleDataColumn { .. } => { + crit!(self.log, "Single blob received during data column request"; "peer_id" => %peer_id ); + } RequestId::ParentLookup { id: _ } => { crit!(self.log, "Single blob received during parent block request"; "peer_id" => %peer_id ); @@ -948,6 +1032,7 @@ impl SyncManager { self.block_lookups .parent_lookup_response::>( id, + (), peer_id, blob, seen_timestamp, @@ -969,6 +1054,37 @@ impl SyncManager { } } + fn rpc_data_column_received( + &mut self, + request_id: RequestId, + peer_id: PeerId, + data_column: Option>>, + seen_timestamp: Duration, + ) { + match request_id { + RequestId::SingleDataColumn { id, index } => { + if data_column.is_some() { + debug!(self.log, + "Peer returned data_column for single lookup"; + "peer_id" => %peer_id, + ); + } + self.block_lookups + .single_lookup_response::>( + id, + index, + peer_id, + data_column, + seen_timestamp, + &self.network, + ) + } + _ => { + crit!(self.log, "Data column received during a non-data-column request"; "peer_id" => %peer_id ); + } + } + } + /// Handles receiving a response for a range sync request that should have both blocks and /// blobs. fn range_block_and_blobs_response( diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 04feb8fdc2a..8742543d4e9 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -2,7 +2,7 @@ //! channel and stores a global RPC ID to perform requests.
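For context on the grouping that `peers_by_custody_at_slot` performs below, here is a minimal, dependency-free sketch; it assumes each peer's custody column indices are already known, whereas the real helper derives them from the peer's node ID and custody requirement via `compute_custody_requirements`:

use std::collections::HashMap;

type ColumnIndex = u64;

/// Group peers by the column indices they custody, so each column request can be
/// seeded with every known peer that should be able to serve it.
fn group_peers_by_custody<P: Copy>(
    peers: &[(P, Vec<ColumnIndex>)],
) -> HashMap<ColumnIndex, Vec<P>> {
    let mut by_column: HashMap<ColumnIndex, Vec<P>> = HashMap::new();
    for (peer, columns) in peers {
        for column in columns {
            // Each column index accumulates every peer that custodies it.
            by_column.entry(*column).or_default().push(*peer);
        }
    }
    by_column
}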
use super::block_sidecar_coupling::BlocksAndBlobsRequestInfo; -use super::manager::{Id, RequestId as SyncRequestId}; +use super::manager::{Id, RequestId as SyncRequestId, SampleReqId}; use super::range_sync::{BatchId, ByRangeRequestType, ChainId}; use crate::network_beacon_processor::NetworkBeaconProcessor; use crate::service::{NetworkMessage, RequestId}; @@ -10,16 +10,23 @@ use crate::status::ToStatusMessage; use crate::sync::block_lookups::common::LookupType; use crate::sync::manager::SingleLookupReqId; use beacon_chain::block_verification_types::RpcBlock; +use beacon_chain::data_availability_checker::{compute_custody_requirements, CustodyConfig}; use beacon_chain::{BeaconChain, BeaconChainTypes, EngineState}; use fnv::FnvHashMap; -use lighthouse_network::rpc::methods::{BlobsByRangeRequest, BlobsByRootRequest}; +use lighthouse_network::rpc::methods::{ + BlobsByRangeRequest, BlobsByRootRequest, DataColumnsByRootRequest, +}; use lighthouse_network::rpc::{BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason}; -use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource, Request}; +use lighthouse_network::{ + Client, NetworkGlobals, PeerAction, PeerId, PeerInfo, ReportSource, Request, +}; use slog::{debug, trace, warn}; use std::collections::hash_map::Entry; +use std::collections::HashMap; use std::sync::Arc; use tokio::sync::mpsc; -use types::{BlobSidecar, EthSpec, SignedBeaconBlock}; +use types::data_column_sidecar::ColumnIndex; +use types::{BlobSidecar, EthSpec, Hash256, SignedBeaconBlock, Slot}; pub struct BlocksAndBlobsByRangeResponse { pub batch_id: BatchId, @@ -84,6 +91,12 @@ impl From>>> for BlockOrBlob { } } +fn custody_config_from_peer_info(peer_info: &PeerInfo) -> Option { + peer_info + .node_id() + .map(|node_id| CustodyConfig::new(node_id.into(), peer_info.custody_requirements())) +} + impl SyncNetworkContext { pub fn new( network_send: mpsc::UnboundedSender>, @@ -109,6 +122,34 @@ impl SyncNetworkContext { &self.network_beacon_processor.network_globals } + pub fn get_custody_config_of_peer(&self, peer_id: &PeerId) -> Option { + self.network_globals() + .peers + .read() + .peer_info(peer_id) + .and_then(|peer_info| custody_config_from_peer_info(peer_info)) + } + + pub fn peers_by_custody_at_slot(&self, slot: Slot) -> HashMap> { + let mut peers_by_custody = HashMap::>::new(); + + for (peer_id, peer_info) in self.network_globals().peers.read().peers() { + // Custody config returns None if the ENR of the peer is unknown. However we should + // always know the peer's Node ID + // TODO: Retrieve custody config from Node ID + if let Some(custody_config) = custody_config_from_peer_info(peer_info) { + for column_index in compute_custody_requirements(slot, &custody_config) { + peers_by_custody + .entry(column_index) + .or_default() + .push(*peer_id); + } + } + } + + peers_by_custody + } + /// Returns the Client type of the peer if known pub fn client_type(&self, peer_id: &PeerId) -> Client { self.network_globals() @@ -506,6 +547,38 @@ impl SyncNetworkContext { Ok(()) } + pub fn data_column_lookup_request( + &self, + id: SingleLookupReqId, + column_index: ColumnIndex, + peer_id: PeerId, + request: DataColumnsByRootRequest, + ) -> Result<(), &'static str> { + // TODO(das): why the distinction between current and parent lookups? 
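+ // Note: unlike block/blob lookups, the column index travels inside the request id so the + // response (or error) can be routed back to the matching `ColumnRequestState` (see + // `rpc_data_column_received` and the `SingleDataColumn` arm of `inject_error`).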
+ let request_id = RequestId::Sync(SyncRequestId::SingleDataColumn { + id: id, + index: column_index, + }); + + debug!( + self.log, + "Sending DataColumnsByRoot Request"; + "request" => ?request, + "peer" => %peer_id, + ); + + self.send_network_msg(NetworkMessage::SendRequest { + peer_id, + request: Request::DataColumnsByRoot(request), + request_id, + })?; + Ok(()) + } + + pub fn sampling_request(&self, id: SingleLookupReqId, block_root: Hash256, slot: Slot) { + self.beacon_processor().sampling_request(block_root, slot); + } + pub fn is_execution_engine_online(&self) -> bool { self.execution_engine_state == EngineState::Online } diff --git a/beacon_node/src/lib.rs b/beacon_node/src/lib.rs index ee782c650e2..e0dc31cfd24 100644 --- a/beacon_node/src/lib.rs +++ b/beacon_node/src/lib.rs @@ -16,6 +16,7 @@ pub use client::{Client, ClientBuilder, ClientConfig, ClientGenesis}; pub use config::{get_config, get_data_dir, get_slots_per_restore_point, set_network_config}; use environment::RuntimeContext; pub use eth2_config::Eth2Config; +use lighthouse_network::{load_private_key, node_id_from_keypair}; use slasher::{DatabaseBackendOverride, Slasher}; use slog::{info, warn}; use std::ops::{Deref, DerefMut}; @@ -132,8 +133,12 @@ impl ProductionBeaconNode { builder }; + // Read networking private key for PeerDAS custody + let enr_local_keypair = load_private_key(&client_config.network, &log); + let node_id = node_id_from_keypair(enr_local_keypair)?; + let builder = builder - .beacon_chain_builder(client_genesis, client_config.clone()) + .beacon_chain_builder(client_genesis, client_config.clone(), node_id) .await?; let builder = if client_config.sync_eth1_chain && !client_config.dummy_eth1_backend { info!( diff --git a/consensus/types/src/data_column_sidecar.rs b/consensus/types/src/data_column_sidecar.rs index a6fc4c56745..a64bd6d1e77 100644 --- a/consensus/types/src/data_column_sidecar.rs +++ b/consensus/types/src/data_column_sidecar.rs @@ -6,6 +6,7 @@ use crate::{ use bls::Signature; use derivative::Derivative; use kzg::{KzgCommitment, KzgProof}; +use merkle_proof::MerkleTreeError; use rand::rngs::StdRng; use rand::{Rng, SeedableRng}; use safe_arith::ArithError; @@ -152,6 +153,11 @@ impl DataColumnSidecar { self.signed_block_header.message.tree_hash_root() } + pub fn verify_inclusion_proof(&self) -> Result { + // TODO(das): Verify proof + Ok(true) + } + pub fn min_size() -> usize { // min size is one cell Self { @@ -215,8 +221,6 @@ impl From for DataColumnSidecarError { pub type DataColumnSidecarList = VariableList>, ::DataColumnCount>; -pub type FixedDataColumnSidecarList = - FixedVector>>, ::DataColumnCount>; #[cfg(test)] mod test { diff --git a/consensus/types/src/data_column_subnet_id.rs b/consensus/types/src/data_column_subnet_id.rs index 5a42b323895..ea95d7b9109 100644 --- a/consensus/types/src/data_column_subnet_id.rs +++ b/consensus/types/src/data_column_subnet_id.rs @@ -1,5 +1,5 @@ //! Identifies each data column subnet by an integer identifier. -use crate::{ChainSpec, EthSpec}; +use crate::EthSpec; use ethereum_types::U256; use safe_arith::{ArithError, SafeArith}; use serde::{Deserialize, Serialize}; @@ -43,16 +43,20 @@ impl DataColumnSubnetId { Ok(id.into()) } + pub fn as_usize(&self) -> usize { + self.0 as usize + } + #[allow(clippy::arithmetic_side_effects)] /// Compute required subnets to subscribe to given the node id. 
/// TODO(das): Add epoch param /// TODO(das): Add num of subnets (from ENR) pub fn compute_subnets_for_data_column( node_id: U256, - spec: &ChainSpec, + custody_requirement: u64, ) -> impl Iterator { let num_of_column_subnets = T::data_column_subnet_count() as u64; - (0..spec.custody_requirement) + (0..custody_requirement) .map(move |i| { let node_offset = (node_id % U256::from(num_of_column_subnets)).as_u64(); node_offset.saturating_add(i) % num_of_column_subnets
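As a companion to the subnet computation above, a hypothetical, self-contained sketch of how per-column custody assignment could be derived; it uses a u64 stand-in for the U256 node ID and assumes columns (rather than subnets) are assigned with the same wrap-around scheme, which is how `compute_custody_requirements` is used elsewhere in this change:

type ColumnIndex = u64;

/// Pick `custody_requirement` consecutive (wrapping) column indices, offset by the node id.
fn custody_columns(node_id: u64, custody_requirement: u64, num_columns: u64) -> Vec<ColumnIndex> {
    let node_offset = node_id % num_columns;
    (0..custody_requirement)
        .map(|i| (node_offset + i) % num_columns)
        .collect()
}

With 128 columns and a custody requirement of 4, a node whose offset is 126 would custody columns [126, 127, 0, 1].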