Skip to content

Commit

Permalink
Merge current and parent lookups clean up todos
Browse files Browse the repository at this point in the history
  • Loading branch information
dapplion committed Apr 26, 2024
1 parent 611c37a commit 5d29618
Show file tree
Hide file tree
Showing 9 changed files with 339 additions and 418 deletions.
145 changes: 28 additions & 117 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,8 @@ use crate::block_verification_types::{
use crate::data_availability_checker::overflow_lru_cache::OverflowLRUCache;
use crate::{BeaconChain, BeaconChainTypes, BeaconStore};
use kzg::Kzg;
use slasher::test_utils::E;
use slog::{debug, error, Logger};
use slot_clock::SlotClock;
use ssz_types::FixedVector;
use std::fmt;
use std::fmt::Debug;
use std::num::NonZeroUsize;
Expand Down Expand Up @@ -91,68 +89,27 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
self.availability_cache.has_block(block_root)
}

pub fn get_missing_blob_ids_with(&self, block_root: Hash256) -> MissingBlobs {
/// Return the required blobs `block_root` expects if the block is currenlty in the cache.
pub fn num_expected_blobs(&self, block_root: &Hash256) -> Option<usize> {
self.availability_cache
.with_pending_components(&block_root, |pending_components| match pending_components {
Some(pending_components) => self.get_missing_blob_ids(
block_root,
pending_components
.get_cached_block()
.as_ref()
.map(|b| b.as_block()),
&pending_components.verified_blobs,
),
None => MissingBlobs::new_without_block(block_root, self.is_deneb()),
.peek_pending_components(block_root, |components| {
components.and_then(|components| components.num_expected_blobs())
})
}

/// If there's no block, all possible ids will be returned that don't exist in the given blobs.
/// If there no blobs, all possible ids will be returned.
pub fn get_missing_blob_ids<V>(
&self,
block_root: Hash256,
block: Option<&SignedBeaconBlock<T::EthSpec>>,
blobs: &FixedVector<Option<V>, <T::EthSpec as EthSpec>::MaxBlobsPerBlock>,
) -> MissingBlobs {
let Some(current_slot) = self.slot_clock.now_or_genesis() else {
error!(
self.log,
"Failed to read slot clock when checking for missing blob ids"
);
return MissingBlobs::BlobsNotRequired;
};

let current_epoch = current_slot.epoch(T::EthSpec::slots_per_epoch());

if self.da_check_required_for_epoch(current_epoch) {
match block {
Some(cached_block) => {
let block_commitments_len = cached_block
.message()
.body()
.blob_kzg_commitments()
.map(|v| v.len())
.unwrap_or(0);
let blob_ids = blobs
/// Return the set of imported blob indexes for `block_root`. Returns None if there is no block
/// component for `block_root`.
pub fn imported_blob_indexes(&self, block_root: &Hash256) -> Option<Vec<u64>> {
self.availability_cache
.peek_pending_components(block_root, |components| {
components.map(|components| {
components
.get_cached_blobs()
.iter()
.take(block_commitments_len)
.enumerate()
.filter_map(|(index, blob_commitment_opt)| {
blob_commitment_opt.is_none().then_some(BlobIdentifier {
block_root,
index: index as u64,
})
})
.collect();
MissingBlobs::KnownMissing(blob_ids)
}
None => {
MissingBlobs::PossibleMissing(BlobIdentifier::get_all_blob_ids::<E>(block_root))
}
}
} else {
MissingBlobs::BlobsNotRequired
}
.filter_map(|blob| blob.as_ref().map(|blob| blob.blob_index()))
.collect::<Vec<_>>()
})
})
}

/// Get a blob from the availability cache.
Expand Down Expand Up @@ -338,6 +295,18 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
.map_or(false, |da_epoch| block_epoch >= da_epoch)
}

pub fn da_check_required_for_current_epoch(&self) -> bool {
let Some(current_slot) = self.slot_clock.now_or_genesis() else {
error!(
self.log,
"Failed to read slot clock when checking for missing blob ids"
);
return false;
};

self.da_check_required_for_epoch(current_slot.epoch(T::EthSpec::slots_per_epoch()))
}

/// Returns `true` if the current epoch is greater than or equal to the `Deneb` epoch.
pub fn is_deneb(&self) -> bool {
self.slot_clock.now().map_or(false, |slot| {
Expand Down Expand Up @@ -523,61 +492,3 @@ impl<E: EthSpec> MaybeAvailableBlock<E> {
}
}
}

#[derive(Debug, Clone)]
pub enum MissingBlobs {
/// We know for certain these blobs are missing.
KnownMissing(Vec<BlobIdentifier>),
/// We think these blobs might be missing.
PossibleMissing(Vec<BlobIdentifier>),
/// Blobs are not required.
BlobsNotRequired,
}

impl MissingBlobs {
pub fn new_without_block(block_root: Hash256, is_deneb: bool) -> Self {
if is_deneb {
MissingBlobs::PossibleMissing(BlobIdentifier::get_all_blob_ids::<E>(block_root))
} else {
MissingBlobs::BlobsNotRequired
}
}
pub fn is_empty(&self) -> bool {
match self {
MissingBlobs::KnownMissing(v) => v.is_empty(),
MissingBlobs::PossibleMissing(v) => v.is_empty(),
MissingBlobs::BlobsNotRequired => true,
}
}
pub fn contains(&self, blob_id: &BlobIdentifier) -> bool {
match self {
MissingBlobs::KnownMissing(v) => v.contains(blob_id),
MissingBlobs::PossibleMissing(v) => v.contains(blob_id),
MissingBlobs::BlobsNotRequired => false,
}
}
pub fn remove(&mut self, blob_id: &BlobIdentifier) {
match self {
MissingBlobs::KnownMissing(v) => v.retain(|id| id != blob_id),
MissingBlobs::PossibleMissing(v) => v.retain(|id| id != blob_id),
MissingBlobs::BlobsNotRequired => {}
}
}
pub fn indices(&self) -> Vec<u64> {
match self {
MissingBlobs::KnownMissing(v) => v.iter().map(|id| id.index).collect(),
MissingBlobs::PossibleMissing(v) => v.iter().map(|id| id.index).collect(),
MissingBlobs::BlobsNotRequired => vec![],
}
}
}

impl Into<Vec<BlobIdentifier>> for MissingBlobs {
fn into(self) -> Vec<BlobIdentifier> {
match self {
MissingBlobs::KnownMissing(v) => v,
MissingBlobs::PossibleMissing(v) => v,
MissingBlobs::BlobsNotRequired => vec![],
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,7 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
}
}

pub fn with_pending_components<R, F: FnOnce(Option<&PendingComponents<T::EthSpec>>) -> R>(
pub fn peek_pending_components<R, F: FnOnce(Option<&PendingComponents<T::EthSpec>>) -> R>(
&self,
block_root: &Hash256,
f: F,
Expand Down
6 changes: 3 additions & 3 deletions beacon_node/beacon_chain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ pub use self::historical_blocks::HistoricalBlockError;
pub use attestation_verification::Error as AttestationError;
pub use beacon_fork_choice_store::{BeaconForkChoiceStore, Error as ForkChoiceStoreError};
pub use block_verification::{
get_block_root, BlobError, BlockError, ExecutionPayloadError, ExecutionPendingBlock,
GossipVerifiedBlock, IntoExecutionPendingBlock, IntoGossipVerifiedBlockContents,
PayloadVerificationOutcome, PayloadVerificationStatus,
get_block_root, BlockError, ExecutionPayloadError, ExecutionPendingBlock, GossipVerifiedBlock,
IntoExecutionPendingBlock, IntoGossipVerifiedBlockContents, PayloadVerificationOutcome,
PayloadVerificationStatus,
};
pub use block_verification_types::AvailabilityPendingExecutedBlock;
pub use block_verification_types::ExecutedBlock;
Expand Down
59 changes: 36 additions & 23 deletions beacon_node/network/src/sync/block_lookups/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ pub enum ResponseType {
Blob,
}

/// How many attempts we try to find a parent of a block before we give up trying.
pub(crate) const PARENT_FAIL_TOLERANCE: u8 = 5;
/// The maximum depth we will search for a parent block. In principle we should have sync'd any
/// canonical chain to its head once the peer connects. A chain should not appear where it's depth
/// is further back than the most recent head slot.
Expand All @@ -45,27 +43,46 @@ pub trait RequestState<T: BeaconChainTypes> {
/// The type created after validation.
type VerifiedResponseType: Clone;

/// Potentially makes progress on this request if it's in a progress-able state
fn continue_request(
&mut self,
id: Id,
awaiting_parent: bool,
downloaded_block_expected_blobs: Option<usize>,
cx: &mut SyncNetworkContext<T>,
) -> Result<(), LookupRequestError> {
if let Some(peer_id) = Self::get_state_mut(self).maybe_start_download()? {
// Attempt to progress awaiting downloads
if self.get_state().is_awaiting_download() {
// Verify the current request has not exceeded the maximum number of attempts.
let request_state = self.get_state();
// TODO: Okay to use `SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS` for both current and parent
// lookups now? It not trivial to identify what is a "parent lookup" now.
let request_state = self.get_state();
if request_state.failed_attempts() >= SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS {
let cannot_process = request_state.more_failed_processing_attempts();
return Err(LookupRequestError::TooManyAttempts { cannot_process });
}

// Make request
return self.make_request(id, peer_id, cx);
}
if let Some(result) = Self::get_state_mut(self).maybe_start_processing() {
return Self::send_for_processing(id, result, cx);
let peer_id = self
.get_state_mut()
.use_rand_available_peer()
.ok_or(LookupRequestError::NoPeers)?;

// make_request returns true only if a request was made
if self.make_request(id, peer_id, downloaded_block_expected_blobs, cx)? {
self.get_state_mut().on_download_start()?;
}

// Otherwise, attempt to progress awaiting processing
// If this request is awaiting a parent lookup to be processed, do not send for processing.
// The request will be rejected with unknown parent error.
} else if !awaiting_parent {
// maybe_start_processing returns Some if state == AwaitingProcess. This pattern is
// useful to conditionally access the result data.
if let Some(result) = self.get_state_mut().maybe_start_processing() {
return Self::send_for_processing(id, result, cx);
}
}

Ok(())
}

Expand All @@ -74,8 +91,9 @@ pub trait RequestState<T: BeaconChainTypes> {
&self,
id: Id,
peer_id: PeerId,
downloaded_block_expected_blobs: Option<usize>,
cx: &mut SyncNetworkContext<T>,
) -> Result<(), LookupRequestError>;
) -> Result<bool, LookupRequestError>;

/* Response handling methods */

Expand Down Expand Up @@ -113,8 +131,9 @@ impl<T: BeaconChainTypes> RequestState<T> for BlockRequestState<T::EthSpec> {
&self,
id: SingleLookupId,
peer_id: PeerId,
_: Option<usize>,
cx: &mut SyncNetworkContext<T>,
) -> Result<(), LookupRequestError> {
) -> Result<bool, LookupRequestError> {
cx.block_lookup_request(
id,
peer_id,
Expand All @@ -129,7 +148,7 @@ impl<T: BeaconChainTypes> RequestState<T> for BlockRequestState<T::EthSpec> {

fn send_for_processing(
id: SingleLookupId,
(block, block_root, seen_timestamp): DownloadResult<Self::VerifiedResponseType>,
(block, block_root, seen_timestamp, _): DownloadResult<Self::VerifiedResponseType>,
cx: &SyncNetworkContext<T>,
) -> Result<(), LookupRequestError> {
cx.send_block_for_processing(
Expand Down Expand Up @@ -163,20 +182,14 @@ impl<T: BeaconChainTypes> RequestState<T> for BlobRequestState<T::EthSpec> {
&self,
id: Id,
peer_id: PeerId,
downloaded_block_expected_blobs: Option<usize>,
cx: &mut SyncNetworkContext<T>,
) -> Result<(), LookupRequestError> {
// TODO: Use cx to figure out which blobs are still to be downloaded
// - Check against the current cached block in the blocks response the required num of blobs
// - Check against da checker if there's a blob how many we need
// - Check against da checker if there are some blobs already downloaded

) -> Result<bool, LookupRequestError> {
cx.blob_lookup_request(
id,
peer_id,
BlobsByRootSingleBlockRequest {
block_root: self.block_root,
indices: self.requested_ids.indices(),
},
self.block_root,
downloaded_block_expected_blobs,
)
.map_err(LookupRequestError::SendFailed)
}
Expand All @@ -191,7 +204,7 @@ impl<T: BeaconChainTypes> RequestState<T> for BlobRequestState<T::EthSpec> {

fn send_for_processing(
id: Id,
(verified, block_root, seen_timestamp): DownloadResult<Self::VerifiedResponseType>,
(verified, block_root, seen_timestamp, _): DownloadResult<Self::VerifiedResponseType>,
cx: &SyncNetworkContext<T>,
) -> Result<(), LookupRequestError> {
cx.send_blobs_for_processing(
Expand Down
Loading

0 comments on commit 5d29618

Please sign in to comment.