Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP DAS sampling + custody playground #22

Draft
wants to merge 17 commits into
base: das
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions beacon_node/beacon_chain/src/beacon_block_streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,8 +410,10 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
fn check_caches(&self, root: Hash256) -> Option<Arc<SignedBeaconBlock<T::EthSpec>>> {
if self.check_caches == CheckCaches::Yes {
self.beacon_chain
.data_availability_checker
.get_block(&root)
.reqresp_pre_import_cache
.read()
.get(&root)
.map(|block| block.clone())
.or(self.beacon_chain.early_attester_cache.get_block(root))
} else {
None
Expand Down
74 changes: 67 additions & 7 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,9 @@ pub type BeaconStore<T> = Arc<
>,
>;

/// Cache gossip verified blocks to serve over ReqResp before they are imported
type ReqRespPreImportCache<E> = HashMap<Hash256, Arc<SignedBeaconBlock<E>>>;

/// Represents the "Beacon Chain" component of Ethereum 2.0. Allows import of blocks and block
/// operations and chooses a canonical head.
pub struct BeaconChain<T: BeaconChainTypes> {
Expand Down Expand Up @@ -463,6 +466,8 @@ pub struct BeaconChain<T: BeaconChainTypes> {
pub(crate) attester_cache: Arc<AttesterCache>,
/// A cache used when producing attestations whilst the head block is still being imported.
pub early_attester_cache: EarlyAttesterCache<T::EthSpec>,
/// Cache gossip verified blocks to serve over ReqResp before they are imported
pub reqresp_pre_import_cache: Arc<RwLock<ReqRespPreImportCache<T::EthSpec>>>,
/// A cache used to keep track of various block timings.
pub block_times_cache: Arc<RwLock<BlockTimesCache>>,
/// A cache used to track pre-finalization block roots for quick rejection.
Expand Down Expand Up @@ -2929,8 +2934,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}

self.data_availability_checker
.notify_gossip_blob(blob.slot(), block_root, &blob);
let r = self.check_gossip_blob_availability_and_import(blob).await;
self.remove_notified(&block_root, r)
}
Expand Down Expand Up @@ -2987,14 +2990,36 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}

self.data_availability_checker
.notify_rpc_blobs(slot, block_root, &blobs);
let r = self
.check_rpc_blob_availability_and_import(slot, block_root, blobs)
.await;
self.remove_notified(&block_root, r)
}

pub async fn process_rpc_data_column(
self: &Arc<Self>,
data_column: Arc<DataColumnSidecar<T::EthSpec>>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
let block_root = data_column.block_root();

// If this block has already been imported to forkchoice it must have been available, so
// we don't need to process its blobs again.
if self
.canonical_head
.fork_choice_read_lock()
.contains_block(&block_root)
{
return Err(BlockError::BlockIsAlreadyKnown);
}

// TODO(das) emit data_column SEE event

let r = self
.check_rpc_data_column_availability_and_import(data_column)
.await;
self.remove_notified(&block_root, r)
}

/// Remove any block components from the *processing cache* if we no longer require them. If the
/// block was imported full or erred, we no longer require them.
fn remove_notified(
Expand All @@ -3005,7 +3030,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let has_missing_components =
matches!(r, Ok(AvailabilityProcessingStatus::MissingComponents(_, _)));
if !has_missing_components {
self.data_availability_checker.remove_notified(block_root);
self.reqresp_pre_import_cache.write().remove(block_root);
}
r
}
Expand All @@ -3018,8 +3043,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
unverified_block: B,
notify_execution_layer: NotifyExecutionLayer,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
self.data_availability_checker
.notify_block(block_root, unverified_block.block_cloned());
self.reqresp_pre_import_cache
.write()
.insert(block_root, unverified_block.block_cloned());

let r = self
.process_block(block_root, unverified_block, notify_execution_layer, || {
Ok(())
Expand Down Expand Up @@ -3286,6 +3313,39 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.process_availability(slot, availability).await
}

async fn check_rpc_data_column_availability_and_import(
self: &Arc<Self>,
data_column: Arc<DataColumnSidecar<T::EthSpec>>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
let block_root = data_column.block_root();
let slot = data_column.slot();

// Need to scope this to ensure the lock is dropped before calling `process_availability`
// Even an explicit drop is not enough to convince the borrow checker.
{
let mut slashable_cache = self.observed_slashable.write();
let header = &data_column.signed_block_header;
if verify_header_signature::<T, BlockError<T::EthSpec>>(self, &header).is_ok() {
slashable_cache
.observe_slashable(
header.message.slot,
header.message.proposer_index,
block_root,
)
.map_err(|e| BlockError::BeaconChainError(e.into()))?;
if let Some(slasher) = self.slasher.as_ref() {
slasher.accept_block_header(header.clone());
}
}
}

let availability = self
.data_availability_checker
.put_rpc_data_column(block_root, data_column)?;

self.process_availability(slot, availability).await
}

/// Imports a fully available block. Otherwise, returns `AvailabilityProcessingStatus::MissingComponents`
///
/// An error is returned if the block was unable to be imported. It may be partially imported
Expand Down
25 changes: 22 additions & 3 deletions beacon_node/beacon_chain/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::beacon_chain::{
CanonicalHead, LightClientProducerEvent, BEACON_CHAIN_DB_KEY, ETH1_CACHE_DB_KEY, OP_POOL_DB_KEY,
};
use crate::beacon_proposer_cache::BeaconProposerCache;
use crate::data_availability_checker::DataAvailabilityChecker;
use crate::data_availability_checker::{CustodyConfig, DataAvailabilityChecker, NodeIdRaw};
use crate::eth1_chain::{CachingEth1Backend, SszEth1};
use crate::eth1_finalization_cache::Eth1FinalizationCache;
use crate::fork_choice_signal::ForkChoiceSignalTx;
Expand Down Expand Up @@ -104,6 +104,7 @@ pub struct BeaconChainBuilder<T: BeaconChainTypes> {
trusted_setup: Option<TrustedSetup>,
task_executor: Option<TaskExecutor>,
validator_monitor_config: Option<ValidatorMonitorConfig>,
node_id: Option<NodeIdRaw>,
}

impl<TSlotClock, TEth1Backend, TEthSpec, THotStore, TColdStore>
Expand Down Expand Up @@ -145,6 +146,7 @@ where
trusted_setup: None,
task_executor: None,
validator_monitor_config: None,
node_id: None,
}
}

Expand Down Expand Up @@ -687,6 +689,11 @@ where
self
}

pub fn node_id(mut self, node_id: NodeIdRaw) -> Self {
self.node_id = Some(node_id);
self
}

/// Consumes `self`, returning a `BeaconChain` if all required parameters have been supplied.
///
/// An error will be returned at runtime if all required parameters have not been configured.
Expand Down Expand Up @@ -880,6 +887,7 @@ where
let head_for_snapshot_cache = head_snapshot.clone();
let canonical_head = CanonicalHead::new(fork_choice, Arc::new(head_snapshot));
let shuffling_cache_size = self.chain_config.shuffling_cache_size;
let custody_requirement = self.chain_config.custody_requirement;

// Calculate the weak subjectivity point in which to backfill blocks to.
let genesis_backfill_slot = if self.chain_config.genesis_backfill {
Expand Down Expand Up @@ -971,6 +979,7 @@ where
validator_pubkey_cache: TimeoutRwLock::new(validator_pubkey_cache),
attester_cache: <_>::default(),
early_attester_cache: <_>::default(),
reqresp_pre_import_cache: <_>::default(),
light_client_server_cache: LightClientServerCache::new(),
light_client_server_tx: self.light_client_server_tx,
shutdown_sender: self
Expand All @@ -982,8 +991,18 @@ where
validator_monitor: RwLock::new(validator_monitor),
genesis_backfill_slot,
data_availability_checker: Arc::new(
DataAvailabilityChecker::new(slot_clock, kzg.clone(), store, &log, self.spec)
.map_err(|e| format!("Error initializing DataAvailabiltyChecker: {:?}", e))?,
DataAvailabilityChecker::new(
slot_clock,
kzg.clone(),
store,
&log,
self.spec,
CustodyConfig::new(
self.node_id.ok_or("Cannot build without a node ID")?,
custody_requirement,
),
)
.map_err(|e| format!("Error initializing DataAvailabiltyChecker: {:?}", e))?,
),
kzg,
block_production_state: Arc::new(Mutex::new(None)),
Expand Down
3 changes: 3 additions & 0 deletions beacon_node/beacon_chain/src/chain_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ pub struct ChainConfig {
pub epochs_per_migration: u64,
/// When set to true Light client server computes and caches state proofs for serving updates
pub enable_light_client_server: bool,
/// Columns to custody for PeerDAS, minimum 2
pub custody_requirement: u64,
}

impl Default for ChainConfig {
Expand Down Expand Up @@ -117,6 +119,7 @@ impl Default for ChainConfig {
progressive_balances_mode: ProgressiveBalancesMode::Fast,
epochs_per_migration: crate::migrate::DEFAULT_EPOCHS_PER_MIGRATION,
enable_light_client_server: false,
custody_requirement: 2,
}
}
}
Expand Down
Loading
Loading