Skip to content

Commit

Permalink
Merge of #5620
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Apr 24, 2024
2 parents 992fbd1 + e0b862b commit f6c269d
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 230 deletions.
63 changes: 22 additions & 41 deletions beacon_node/network/src/sync/block_lookups/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,34 +29,12 @@ pub enum LookupType {
Parent,
}

/// This trait helps differentiate `SingleBlockLookup`s from `ParentLookup`s .This is useful in
/// ensuring requests and responses are handled separately and enables us to use different failure
/// tolerances for each, while re-using the same basic request and retry logic.
pub trait Lookup {
const MAX_ATTEMPTS: u8;
fn lookup_type() -> LookupType;
fn max_attempts() -> u8 {
Self::MAX_ATTEMPTS
}
}

/// A `Lookup` that is a part of a `ParentLookup`.
pub struct Parent;

impl Lookup for Parent {
const MAX_ATTEMPTS: u8 = PARENT_FAIL_TOLERANCE;
fn lookup_type() -> LookupType {
LookupType::Parent
}
}

/// A `Lookup` that part of a single block lookup.
pub struct Current;

impl Lookup for Current {
const MAX_ATTEMPTS: u8 = SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS;
fn lookup_type() -> LookupType {
LookupType::Current
impl LookupType {
fn max_attempts(&self) -> u8 {
match self {
LookupType::Current => SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS,
LookupType::Parent => PARENT_FAIL_TOLERANCE,
}
}
}

Expand All @@ -68,7 +46,7 @@ impl Lookup for Current {
/// The use of the `ResponseType` associated type gives us a degree of type
/// safety when handling a block/blob response ensuring we only mutate the correct corresponding
/// state.
pub trait RequestState<L: Lookup, T: BeaconChainTypes> {
pub trait RequestState<T: BeaconChainTypes> {
/// The type of the request .
type RequestType;

Expand All @@ -81,9 +59,12 @@ pub trait RequestState<L: Lookup, T: BeaconChainTypes> {
/* Request building methods */

/// Construct a new request.
fn build_request(&mut self) -> Result<(PeerId, Self::RequestType), LookupRequestError> {
fn build_request(
&mut self,
lookup_type: LookupType,
) -> Result<(PeerId, Self::RequestType), LookupRequestError> {
// Verify and construct request.
self.too_many_attempts()?;
self.too_many_attempts(lookup_type)?;
let peer = self.get_peer()?;
let request = self.new_request();
Ok((peer, request))
Expand All @@ -93,6 +74,7 @@ pub trait RequestState<L: Lookup, T: BeaconChainTypes> {
fn build_request_and_send(
&mut self,
id: Id,
lookup_type: LookupType,
cx: &mut SyncNetworkContext<T>,
) -> Result<(), LookupRequestError> {
// Check if request is necessary.
Expand All @@ -101,7 +83,7 @@ pub trait RequestState<L: Lookup, T: BeaconChainTypes> {
}

// Construct request.
let (peer_id, request) = self.build_request()?;
let (peer_id, request) = self.build_request(lookup_type)?;

// Update request state.
let req_counter = self.get_state_mut().on_download_start(peer_id);
Expand All @@ -110,17 +92,16 @@ pub trait RequestState<L: Lookup, T: BeaconChainTypes> {
let id = SingleLookupReqId {
id,
req_counter,
lookup_type: L::lookup_type(),
lookup_type,
};
Self::make_request(id, peer_id, request, cx)
}

/// Verify the current request has not exceeded the maximum number of attempts.
fn too_many_attempts(&self) -> Result<(), LookupRequestError> {
let max_attempts = L::max_attempts();
fn too_many_attempts(&self, lookup_type: LookupType) -> Result<(), LookupRequestError> {
let request_state = self.get_state();

if request_state.failed_attempts() >= max_attempts {
if request_state.failed_attempts() >= lookup_type.max_attempts() {
let cannot_process = request_state.more_failed_processing_attempts();
Err(LookupRequestError::TooManyAttempts { cannot_process })
} else {
Expand Down Expand Up @@ -187,7 +168,7 @@ pub trait RequestState<L: Lookup, T: BeaconChainTypes> {
fn response_type() -> ResponseType;

/// A getter for the `BlockRequestState` or `BlobRequestState` associated with this trait.
fn request_state_mut(request: &mut SingleBlockLookup<L, T>) -> &mut Self;
fn request_state_mut(request: &mut SingleBlockLookup<T>) -> &mut Self;

/// A getter for a reference to the `SingleLookupRequestState` associated with this trait.
fn get_state(&self) -> &SingleLookupRequestState;
Expand All @@ -196,7 +177,7 @@ pub trait RequestState<L: Lookup, T: BeaconChainTypes> {
fn get_state_mut(&mut self) -> &mut SingleLookupRequestState;
}

impl<L: Lookup, T: BeaconChainTypes> RequestState<L, T> for BlockRequestState<L> {
impl<T: BeaconChainTypes> RequestState<T> for BlockRequestState {
type RequestType = BlocksByRootSingleRequest;
type VerifiedResponseType = Arc<SignedBeaconBlock<T::EthSpec>>;
type ReconstructedResponseType = RpcBlock<T::EthSpec>;
Expand Down Expand Up @@ -253,7 +234,7 @@ impl<L: Lookup, T: BeaconChainTypes> RequestState<L, T> for BlockRequestState<L>
fn response_type() -> ResponseType {
ResponseType::Block
}
fn request_state_mut(request: &mut SingleBlockLookup<L, T>) -> &mut Self {
fn request_state_mut(request: &mut SingleBlockLookup<T>) -> &mut Self {
&mut request.block_request_state
}
fn get_state(&self) -> &SingleLookupRequestState {
Expand All @@ -264,7 +245,7 @@ impl<L: Lookup, T: BeaconChainTypes> RequestState<L, T> for BlockRequestState<L>
}
}

impl<L: Lookup, T: BeaconChainTypes> RequestState<L, T> for BlobRequestState<L, T::EthSpec> {
impl<T: BeaconChainTypes> RequestState<T> for BlobRequestState<T::EthSpec> {
type RequestType = BlobsByRootSingleBlockRequest;
type VerifiedResponseType = FixedBlobSidecarList<T::EthSpec>;
type ReconstructedResponseType = FixedBlobSidecarList<T::EthSpec>;
Expand Down Expand Up @@ -328,7 +309,7 @@ impl<L: Lookup, T: BeaconChainTypes> RequestState<L, T> for BlobRequestState<L,
fn response_type() -> ResponseType {
ResponseType::Blob
}
fn request_state_mut(request: &mut SingleBlockLookup<L, T>) -> &mut Self {
fn request_state_mut(request: &mut SingleBlockLookup<T>) -> &mut Self {
&mut request.blob_request_state
}
fn get_state(&self) -> &SingleLookupRequestState {
Expand Down
40 changes: 19 additions & 21 deletions beacon_node/network/src/sync/block_lookups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@ use beacon_chain::data_availability_checker::{
};
use beacon_chain::validator_monitor::timestamp_now;
use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError};
pub use common::Current;
pub use common::Lookup;
pub use common::Parent;
pub use common::RequestState;
use fnv::FnvHashMap;
use lighthouse_network::{PeerAction, PeerId};
Expand Down Expand Up @@ -55,12 +52,12 @@ pub struct BlockLookups<T: BeaconChainTypes> {
/// Parent chain lookups being downloaded.
parent_lookups: SmallVec<[ParentLookup<T>; 3]>,

processing_parent_lookups: HashMap<Hash256, (Vec<Hash256>, SingleBlockLookup<Parent, T>)>,
processing_parent_lookups: HashMap<Hash256, (Vec<Hash256>, SingleBlockLookup<T>)>,

/// A cache of failed chain lookups to prevent duplicate searches.
failed_chains: LRUTimeCache<Hash256>,

single_block_lookups: FnvHashMap<Id, SingleBlockLookup<Current, T>>,
single_block_lookups: FnvHashMap<Id, SingleBlockLookup<T>>,

pub(crate) da_checker: Arc<DataAvailabilityChecker<T>>,

Expand Down Expand Up @@ -131,7 +128,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
/// Attempts to trigger the request matching the given `block_root`.
pub fn trigger_single_lookup(
&mut self,
mut single_block_lookup: SingleBlockLookup<Current, T>,
mut single_block_lookup: SingleBlockLookup<T>,
cx: &mut SyncNetworkContext<T>,
) {
let block_root = single_block_lookup.block_root();
Expand All @@ -147,7 +144,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}

/// Adds a lookup to the `single_block_lookups` map.
pub fn add_single_lookup(&mut self, single_block_lookup: SingleBlockLookup<Current, T>) {
pub fn add_single_lookup(&mut self, single_block_lookup: SingleBlockLookup<T>) {
self.single_block_lookups
.insert(single_block_lookup.id, single_block_lookup);

Expand Down Expand Up @@ -212,6 +209,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
peers,
self.da_checker.clone(),
cx.next_id(),
LookupType::Current,
);

debug!(
Expand Down Expand Up @@ -284,10 +282,10 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
/// Get a single block lookup by its ID. This method additionally ensures the `req_counter`
/// matches the current `req_counter` for the lookup. This ensures any stale responses from requests
/// that have been retried are ignored.
fn get_single_lookup<R: RequestState<Current, T>>(
fn get_single_lookup<R: RequestState<T>>(
&mut self,
id: SingleLookupReqId,
) -> Option<SingleBlockLookup<Current, T>> {
) -> Option<SingleBlockLookup<T>> {
let mut lookup = self.single_block_lookups.remove(&id.id)?;

let request_state = R::request_state_mut(&mut lookup);
Expand All @@ -314,7 +312,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}

/// Process a block or blob response received from a single lookup request.
pub fn single_lookup_response<R: RequestState<Current, T>>(
pub fn single_lookup_response<R: RequestState<T>>(
&mut self,
lookup_id: SingleLookupReqId,
peer_id: PeerId,
Expand Down Expand Up @@ -345,7 +343,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
"response_type" => ?response_type,
);

match self.handle_verified_response::<Current, R>(
match self.handle_verified_response::<R>(
seen_timestamp,
cx,
BlockProcessType::SingleBlock { id: lookup.id },
Expand All @@ -372,13 +370,13 @@ impl<T: BeaconChainTypes> BlockLookups<T> {

/// Consolidates error handling for `single_lookup_response`. An `Err` here should always mean
/// the lookup is dropped.
fn handle_verified_response<L: Lookup, R: RequestState<L, T>>(
fn handle_verified_response<R: RequestState<T>>(
&self,
seen_timestamp: Duration,
cx: &mut SyncNetworkContext<T>,
process_type: BlockProcessType,
verified_response: R::VerifiedResponseType,
lookup: &mut SingleBlockLookup<L, T>,
lookup: &mut SingleBlockLookup<T>,
) -> Result<(), LookupRequestError> {
let id = lookup.id;
let block_root = lookup.block_root();
Expand All @@ -389,7 +387,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
// If we have an outstanding parent request for this block, delay sending the response until
// all parent blocks have been processed, otherwise we will fail validation with an
// `UnknownParent`.
let delay_send = match L::lookup_type() {
let delay_send = match lookup.lookup_type {
LookupType::Parent => false,
LookupType::Current => self.has_pending_parent_request(lookup.block_root()),
};
Expand Down Expand Up @@ -453,7 +451,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
/// Get a parent block lookup by its ID. This method additionally ensures the `req_counter`
/// matches the current `req_counter` for the lookup. This any stale responses from requests
/// that have been retried are ignored.
fn get_parent_lookup<R: RequestState<Parent, T>>(
fn get_parent_lookup<R: RequestState<T>>(
&mut self,
id: SingleLookupReqId,
) -> Option<ParentLookup<T>> {
Expand All @@ -479,7 +477,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}

/// Process a response received from a parent lookup request.
pub fn parent_lookup_response<R: RequestState<Parent, T>>(
pub fn parent_lookup_response<R: RequestState<T>>(
&mut self,
id: SingleLookupReqId,
peer_id: PeerId,
Expand Down Expand Up @@ -523,7 +521,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {

/// Consolidates error handling for `parent_lookup_response`. An `Err` here should always mean
/// the lookup is dropped.
fn parent_lookup_response_inner<R: RequestState<Parent, T>>(
fn parent_lookup_response_inner<R: RequestState<T>>(
&mut self,
peer_id: PeerId,
response: R::VerifiedResponseType,
Expand Down Expand Up @@ -554,7 +552,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}
}

self.handle_verified_response::<Parent, R>(
self.handle_verified_response::<R>(
seen_timestamp,
cx,
BlockProcessType::ParentLookup {
Expand Down Expand Up @@ -633,7 +631,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}

/// An RPC error has occurred during a parent lookup. This function handles this case.
pub fn parent_lookup_failed<R: RequestState<Parent, T>>(
pub fn parent_lookup_failed<R: RequestState<T>>(
&mut self,
id: SingleLookupReqId,
peer_id: &PeerId,
Expand Down Expand Up @@ -669,7 +667,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}

/// An RPC error has occurred during a single lookup. This function handles this case.\
pub fn single_block_lookup_failed<R: RequestState<Current, T>>(
pub fn single_block_lookup_failed<R: RequestState<T>>(
&mut self,
id: SingleLookupReqId,
peer_id: &PeerId,
Expand Down Expand Up @@ -717,7 +715,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {

/* Processing responses */

pub fn single_block_component_processed<R: RequestState<Current, T>>(
pub fn single_block_component_processed<R: RequestState<T>>(
&mut self,
target_id: Id,
result: BlockProcessingResult<T::EthSpec>,
Expand Down
7 changes: 4 additions & 3 deletions beacon_node/network/src/sync/block_lookups/parent_lookup.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::common::LookupType;
use super::single_block_lookup::{LookupRequestError, SingleBlockLookup};
use super::{DownloadedBlock, PeerId};
use crate::sync::block_lookups::common::Parent;
use crate::sync::{manager::SLOT_IMPORT_TOLERANCE, network_context::SyncNetworkContext};
use beacon_chain::block_verification_types::AsBlock;
use beacon_chain::block_verification_types::RpcBlock;
Expand All @@ -24,7 +24,7 @@ pub(crate) struct ParentLookup<T: BeaconChainTypes> {
/// The blocks that have currently been downloaded.
downloaded_blocks: Vec<DownloadedBlock<T::EthSpec>>,
/// Request of the last parent.
pub current_parent_request: SingleBlockLookup<Parent, T>,
pub current_parent_request: SingleBlockLookup<T>,
}

#[derive(Debug, PartialEq, Eq)]
Expand Down Expand Up @@ -55,6 +55,7 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
&[peer_id],
da_checker,
cx.next_id(),
LookupType::Parent,
);

Self {
Expand Down Expand Up @@ -132,7 +133,7 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
Hash256,
VecDeque<RpcBlock<T::EthSpec>>,
Vec<Hash256>,
SingleBlockLookup<Parent, T>,
SingleBlockLookup<T>,
) {
let ParentLookup {
chain_hash,
Expand Down
Loading

0 comments on commit f6c269d

Please sign in to comment.