Skip to content

Commit

Permalink
Merge parent and current sync lookups (#5655)
Browse files Browse the repository at this point in the history
* Drop lookup type trait for a simple arg

* Drop reconstructed for processing

* Send parent blocks one by one

* Merge current and parent lookups

* Merge current and parent lookups clean up todos

* Merge current and parent lookups tests

* Merge remote-tracking branch 'origin/unstable' into sync-merged-lookup

* Merge branch 'unstable' of https://github.com/sigp/lighthouse into sync-merged-lookup

* fix compile after merge

* #5655 pr review (#26)

* fix compile after merge

* remove todos, fix typos etc

* fix compile

* stable rng

* delete TODO and unfilled out test

* make download result a struct

* enums instead of bools as params

* fix comment

* Various fixes

* Track ignored child components

* Track dropped lookup reason as metric

* fix test

* add comment describing behavior of avail check error

*  update ordering
  • Loading branch information
dapplion authored Apr 30, 2024
1 parent 196d9fd commit ce66582
Show file tree
Hide file tree
Showing 17 changed files with 1,594 additions and 2,464 deletions.
16 changes: 8 additions & 8 deletions beacon_node/beacon_chain/src/blob_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,14 @@ pub fn validate_blob_sidecar_for_gossip<T: BeaconChainTypes>(
});
}

// Kzg verification for gossip blob sidecar
let kzg = chain
.kzg
.as_ref()
.ok_or(GossipBlobError::KzgNotInitialized)?;
let kzg_verified_blob = KzgVerifiedBlob::new(blob_sidecar.clone(), kzg, seen_timestamp)
.map_err(GossipBlobError::KzgError)?;

chain
.observed_slashable
.write()
Expand Down Expand Up @@ -605,14 +613,6 @@ pub fn validate_blob_sidecar_for_gossip<T: BeaconChainTypes>(
});
}

// Kzg verification for gossip blob sidecar
let kzg = chain
.kzg
.as_ref()
.ok_or(GossipBlobError::KzgNotInitialized)?;
let kzg_verified_blob = KzgVerifiedBlob::new(blob_sidecar, kzg, seen_timestamp)
.map_err(GossipBlobError::KzgError)?;

Ok(GossipVerifiedBlob {
block_root,
blob: kzg_verified_blob,
Expand Down
147 changes: 28 additions & 119 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,11 @@ use crate::blob_verification::{verify_kzg_for_blob_list, GossipVerifiedBlob, Kzg
use crate::block_verification_types::{
AvailabilityPendingExecutedBlock, AvailableExecutedBlock, RpcBlock,
};
pub use crate::data_availability_checker::child_components::ChildComponents;
use crate::data_availability_checker::overflow_lru_cache::OverflowLRUCache;
use crate::{BeaconChain, BeaconChainTypes, BeaconStore};
use kzg::Kzg;
use slasher::test_utils::E;
use slog::{debug, error, Logger};
use slot_clock::SlotClock;
use ssz_types::FixedVector;
use std::fmt;
use std::fmt::Debug;
use std::num::NonZeroUsize;
Expand All @@ -19,7 +16,6 @@ use task_executor::TaskExecutor;
use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList};
use types::{BlobSidecarList, ChainSpec, Epoch, EthSpec, Hash256, SignedBeaconBlock};

mod child_components;
mod error;
mod overflow_lru_cache;
mod state_lru_cache;
Expand Down Expand Up @@ -94,68 +90,27 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
self.availability_cache.has_block(block_root)
}

pub fn get_missing_blob_ids_with(&self, block_root: Hash256) -> MissingBlobs {
/// Return the required blobs `block_root` expects if the block is currenlty in the cache.
pub fn num_expected_blobs(&self, block_root: &Hash256) -> Option<usize> {
self.availability_cache
.with_pending_components(&block_root, |pending_components| match pending_components {
Some(pending_components) => self.get_missing_blob_ids(
block_root,
pending_components
.get_cached_block()
.as_ref()
.map(|b| b.as_block()),
&pending_components.verified_blobs,
),
None => MissingBlobs::new_without_block(block_root, self.is_deneb()),
.peek_pending_components(block_root, |components| {
components.and_then(|components| components.num_expected_blobs())
})
}

/// If there's no block, all possible ids will be returned that don't exist in the given blobs.
/// If there no blobs, all possible ids will be returned.
pub fn get_missing_blob_ids<V>(
&self,
block_root: Hash256,
block: Option<&SignedBeaconBlock<T::EthSpec>>,
blobs: &FixedVector<Option<V>, <T::EthSpec as EthSpec>::MaxBlobsPerBlock>,
) -> MissingBlobs {
let Some(current_slot) = self.slot_clock.now_or_genesis() else {
error!(
self.log,
"Failed to read slot clock when checking for missing blob ids"
);
return MissingBlobs::BlobsNotRequired;
};

let current_epoch = current_slot.epoch(T::EthSpec::slots_per_epoch());

if self.da_check_required_for_epoch(current_epoch) {
match block {
Some(cached_block) => {
let block_commitments_len = cached_block
.message()
.body()
.blob_kzg_commitments()
.map(|v| v.len())
.unwrap_or(0);
let blob_ids = blobs
/// Return the set of imported blob indexes for `block_root`. Returns None if there is no block
/// component for `block_root`.
pub fn imported_blob_indexes(&self, block_root: &Hash256) -> Option<Vec<u64>> {
self.availability_cache
.peek_pending_components(block_root, |components| {
components.map(|components| {
components
.get_cached_blobs()
.iter()
.take(block_commitments_len)
.enumerate()
.filter_map(|(index, blob_commitment_opt)| {
blob_commitment_opt.is_none().then_some(BlobIdentifier {
block_root,
index: index as u64,
})
})
.collect();
MissingBlobs::KnownMissing(blob_ids)
}
None => {
MissingBlobs::PossibleMissing(BlobIdentifier::get_all_blob_ids::<E>(block_root))
}
}
} else {
MissingBlobs::BlobsNotRequired
}
.filter_map(|blob| blob.as_ref().map(|blob| blob.blob_index()))
.collect::<Vec<_>>()
})
})
}

/// Get a blob from the availability cache.
Expand Down Expand Up @@ -351,6 +306,18 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
.map_or(false, |da_epoch| block_epoch >= da_epoch)
}

pub fn da_check_required_for_current_epoch(&self) -> bool {
let Some(current_slot) = self.slot_clock.now_or_genesis() else {
error!(
self.log,
"Failed to read slot clock when checking for missing blob ids"
);
return false;
};

self.da_check_required_for_epoch(current_slot.epoch(T::EthSpec::slots_per_epoch()))
}

/// Returns `true` if the current epoch is greater than or equal to the `Deneb` epoch.
pub fn is_deneb(&self) -> bool {
self.slot_clock.now().map_or(false, |slot| {
Expand Down Expand Up @@ -544,61 +511,3 @@ impl<E: EthSpec> MaybeAvailableBlock<E> {
}
}
}

#[derive(Debug, Clone)]
pub enum MissingBlobs {
/// We know for certain these blobs are missing.
KnownMissing(Vec<BlobIdentifier>),
/// We think these blobs might be missing.
PossibleMissing(Vec<BlobIdentifier>),
/// Blobs are not required.
BlobsNotRequired,
}

impl MissingBlobs {
pub fn new_without_block(block_root: Hash256, is_deneb: bool) -> Self {
if is_deneb {
MissingBlobs::PossibleMissing(BlobIdentifier::get_all_blob_ids::<E>(block_root))
} else {
MissingBlobs::BlobsNotRequired
}
}
pub fn is_empty(&self) -> bool {
match self {
MissingBlobs::KnownMissing(v) => v.is_empty(),
MissingBlobs::PossibleMissing(v) => v.is_empty(),
MissingBlobs::BlobsNotRequired => true,
}
}
pub fn contains(&self, blob_id: &BlobIdentifier) -> bool {
match self {
MissingBlobs::KnownMissing(v) => v.contains(blob_id),
MissingBlobs::PossibleMissing(v) => v.contains(blob_id),
MissingBlobs::BlobsNotRequired => false,
}
}
pub fn remove(&mut self, blob_id: &BlobIdentifier) {
match self {
MissingBlobs::KnownMissing(v) => v.retain(|id| id != blob_id),
MissingBlobs::PossibleMissing(v) => v.retain(|id| id != blob_id),
MissingBlobs::BlobsNotRequired => {}
}
}
pub fn indices(&self) -> Vec<u64> {
match self {
MissingBlobs::KnownMissing(v) => v.iter().map(|id| id.index).collect(),
MissingBlobs::PossibleMissing(v) => v.iter().map(|id| id.index).collect(),
MissingBlobs::BlobsNotRequired => vec![],
}
}
}

impl Into<Vec<BlobIdentifier>> for MissingBlobs {
fn into(self) -> Vec<BlobIdentifier> {
match self {
MissingBlobs::KnownMissing(v) => v,
MissingBlobs::PossibleMissing(v) => v,
MissingBlobs::BlobsNotRequired => vec![],
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub enum Error {
SlotClockError,
}

#[derive(PartialEq, Eq)]
pub enum ErrorCategory {
/// Internal Errors (not caused by peers)
Internal,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
}
}

pub fn with_pending_components<R, F: FnOnce(Option<&PendingComponents<T::EthSpec>>) -> R>(
pub fn peek_pending_components<R, F: FnOnce(Option<&PendingComponents<T::EthSpec>>) -> R>(
&self,
block_root: &Hash256,
f: F,
Expand Down
5 changes: 5 additions & 0 deletions beacon_node/network/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,11 @@ lazy_static! {
"sync_parent_block_lookups",
"Number of parent block lookups underway"
);
pub static ref SYNC_LOOKUP_DROPPED: Result<IntCounterVec> = try_create_int_counter_vec(
"sync_lookups_dropped_total",
"Total count of sync lookups dropped by reason",
&["reason"]
);

/*
* Block Delay Metrics
Expand Down
37 changes: 0 additions & 37 deletions beacon_node/network/src/network_beacon_processor/sync_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ pub enum ChainSegmentProcessId {
RangeBatchId(ChainId, Epoch),
/// Processing ID for a backfill syncing batch.
BackSyncBatchId(Epoch),
/// Processing Id of the parent lookup of a block.
ParentLookup(Hash256),
}

/// Returned when a chain segment import fails.
Expand Down Expand Up @@ -396,41 +394,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}
}
}
// this is a parent lookup request from the sync manager
ChainSegmentProcessId::ParentLookup(chain_head) => {
debug!(
self.log, "Processing parent lookup";
"chain_hash" => %chain_head,
"blocks" => downloaded_blocks.len()
);
// parent blocks are ordered from highest slot to lowest, so we need to process in
// reverse
match self
.process_blocks(downloaded_blocks.iter().rev(), notify_execution_layer)
.await
{
(imported_blocks, Err(e)) => {
debug!(self.log, "Parent lookup failed"; "error" => %e.message);
match e.peer_action {
Some(penalty) => BatchProcessResult::FaultyFailure {
imported_blocks: imported_blocks > 0,
penalty,
},
None => BatchProcessResult::NonFaultyFailure,
}
}
(imported_blocks, Ok(_)) => {
debug!(
self.log, "Parent lookup processed successfully";
"chain_hash" => %chain_head,
"imported_blocks" => imported_blocks
);
BatchProcessResult::Success {
was_non_empty: imported_blocks > 0,
}
}
}
}
};

self.send_sync_message(SyncMessage::BatchProcessed { sync_type, result });
Expand Down
4 changes: 1 addition & 3 deletions beacon_node/network/src/network_beacon_processor/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,9 +311,7 @@ impl TestRig {
block_root,
RpcBlock::new_without_blobs(Some(block_root), self.next_block.clone()),
std::time::Duration::default(),
BlockProcessType::ParentLookup {
chain_hash: Hash256::random(),
},
BlockProcessType::SingleBlock { id: 0 },
)
.unwrap();
}
Expand Down
Loading

0 comments on commit ce66582

Please sign in to comment.