Skip to content

Commit

Permalink
Move sync lookup trait function to its caller (sigp#5704)
Browse files Browse the repository at this point in the history
* Move sync lookup trait function to its caller

* lint
  • Loading branch information
dapplion authored May 6, 2024
1 parent b87c36a commit 5135a77
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 64 deletions.
55 changes: 1 addition & 54 deletions beacon_node/network/src/sync/block_lookups/common.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use crate::sync::block_lookups::single_block_lookup::{
LookupRequestError, SingleBlockLookup, SingleLookupRequestState,
};
use crate::sync::block_lookups::{
BlobRequestState, BlockRequestState, PeerId, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS,
};
use crate::sync::block_lookups::{BlobRequestState, BlockRequestState, PeerId};
use crate::sync::manager::{BlockProcessType, Id, SLOT_IMPORT_TOLERANCE};
use crate::sync::network_context::SyncNetworkContext;
use beacon_chain::block_verification_types::RpcBlock;
Expand All @@ -26,11 +24,6 @@ pub enum ResponseType {
/// is further back than the most recent head slot.
pub(crate) const PARENT_DEPTH_TOLERANCE: usize = SLOT_IMPORT_TOLERANCE * 2;

/// Wrapper around bool to prevent mixing this argument with `BlockIsProcessed`
pub(crate) struct AwaitingParent(pub bool);
/// Wrapper around bool to prevent mixing this argument with `AwaitingParent`
pub(crate) struct BlockIsProcessed(pub bool);

/// This trait unifies common single block lookup functionality across blocks and blobs. This
/// includes making requests, verifying responses, and handling processing results. A
/// `SingleBlockLookup` includes both a `BlockRequestState` and a `BlobRequestState`, this trait is
Expand All @@ -43,52 +36,6 @@ pub trait RequestState<T: BeaconChainTypes> {
/// The type created after validation.
type VerifiedResponseType: Clone;

/// Potentially makes progress on this request if it's in a progress-able state
fn continue_request(
&mut self,
id: Id,
awaiting_parent: AwaitingParent,
downloaded_block_expected_blobs: Option<usize>,
block_is_processed: BlockIsProcessed,
cx: &mut SyncNetworkContext<T>,
) -> Result<(), LookupRequestError> {
// Attempt to progress awaiting downloads
if self.get_state().is_awaiting_download() {
// Verify the current request has not exceeded the maximum number of attempts.
let request_state = self.get_state();
if request_state.failed_attempts() >= SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS {
let cannot_process = request_state.more_failed_processing_attempts();
return Err(LookupRequestError::TooManyAttempts { cannot_process });
}

let peer_id = self
.get_state_mut()
.use_rand_available_peer()
.ok_or(LookupRequestError::NoPeers)?;

// make_request returns true only if a request needs to be made
if self.make_request(id, peer_id, downloaded_block_expected_blobs, cx)? {
self.get_state_mut().on_download_start()?;
} else {
self.get_state_mut().on_completed_request()?;
}

// Otherwise, attempt to progress awaiting processing
// If this request is awaiting a parent lookup to be processed, do not send for processing.
// The request will be rejected with unknown parent error.
} else if !awaiting_parent.0
&& (block_is_processed.0 || matches!(Self::response_type(), ResponseType::Block))
{
// maybe_start_processing returns Some if state == AwaitingProcess. This pattern is
// useful to conditionally access the result data.
if let Some(result) = self.get_state_mut().maybe_start_processing() {
return Self::send_for_processing(id, result, cx);
}
}

Ok(())
}

/// Request the network context to prepare a request of a component of `block_root`. If the
/// request is not necessary because the component is already known / processed, return false.
/// Return true if it sent a request and we can expect an event back from the network.
Expand Down
50 changes: 40 additions & 10 deletions beacon_node/network/src/sync/block_lookups/single_block_lookup.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::common::{AwaitingParent, BlockIsProcessed};
use super::{BlockComponent, PeerId};
use super::common::ResponseType;
use super::{BlockComponent, PeerId, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS};
use crate::sync::block_lookups::common::RequestState;
use crate::sync::block_lookups::Id;
use crate::sync::network_context::SyncNetworkContext;
Expand Down Expand Up @@ -150,7 +150,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
}
}

/// Wrapper around `RequestState::continue_request` to inject lookup data
/// Potentially makes progress on this request if it's in a progress-able state
pub fn continue_request<R: RequestState<T>>(
&mut self,
cx: &mut SyncNetworkContext<T>,
Expand All @@ -163,13 +163,43 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
.peek_downloaded_data()
.map(|block| block.num_expected_blobs());
let block_is_processed = self.block_request_state.state.is_processed();
R::request_state_mut(self).continue_request(
id,
AwaitingParent(awaiting_parent),
downloaded_block_expected_blobs,
BlockIsProcessed(block_is_processed),
cx,
)
let request = R::request_state_mut(self);

// Attempt to progress awaiting downloads
if request.get_state().is_awaiting_download() {
// Verify the current request has not exceeded the maximum number of attempts.
let request_state = request.get_state();
if request_state.failed_attempts() >= SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS {
let cannot_process = request_state.more_failed_processing_attempts();
return Err(LookupRequestError::TooManyAttempts { cannot_process });
}

let peer_id = request
.get_state_mut()
.use_rand_available_peer()
.ok_or(LookupRequestError::NoPeers)?;

// make_request returns true only if a request needs to be made
if request.make_request(id, peer_id, downloaded_block_expected_blobs, cx)? {
request.get_state_mut().on_download_start()?;
} else {
request.get_state_mut().on_completed_request()?;
}

// Otherwise, attempt to progress awaiting processing
// If this request is awaiting a parent lookup to be processed, do not send for processing.
// The request will be rejected with unknown parent error.
} else if !awaiting_parent
&& (block_is_processed || matches!(R::response_type(), ResponseType::Block))
{
// maybe_start_processing returns Some if state == AwaitingProcess. This pattern is
// useful to conditionally access the result data.
if let Some(result) = request.get_state_mut().maybe_start_processing() {
return R::send_for_processing(id, result, cx);
}
}

Ok(())
}

/// Add peer to all request states. The peer must be able to serve this request.
Expand Down

0 comments on commit 5135a77

Please sign in to comment.