From 3b662ded8f044ba51af48dc3ee1ea2184447fd81 Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Tue, 23 Apr 2024 15:14:17 +0900 Subject: [PATCH] Send parent blocks one by one --- .../child_components.rs | 7 + .../network_beacon_processor/sync_methods.rs | 37 -- .../network/src/sync/block_lookups/mod.rs | 369 +++++++++--------- .../src/sync/block_lookups/parent_lookup.rs | 85 ++-- .../sync/block_lookups/single_block_lookup.rs | 45 ++- .../network/src/sync/block_lookups/tests.rs | 265 ++++++++----- beacon_node/network/src/sync/manager.rs | 5 +- 7 files changed, 419 insertions(+), 394 deletions(-) diff --git a/beacon_node/beacon_chain/src/data_availability_checker/child_components.rs b/beacon_node/beacon_chain/src/data_availability_checker/child_components.rs index 184dfc45001..b33e90bbe1d 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/child_components.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/child_components.rs @@ -66,4 +66,11 @@ impl ChildComponents { pub fn clear_blobs(&mut self) { self.downloaded_blobs = FixedBlobSidecarList::default(); } + + pub fn downloaded_blobs_count(&self) -> usize { + self.downloaded_blobs + .iter() + .filter(|blob| blob.is_some()) + .count() + } } diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index 887974c6e0b..daa9a2cf197 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -33,8 +33,6 @@ pub enum ChainSegmentProcessId { RangeBatchId(ChainId, Epoch), /// Processing ID for a backfill syncing batch. BackSyncBatchId(Epoch), - /// Processing Id of the parent lookup of a block. - ParentLookup(Hash256), } /// Returned when a chain segment import fails. @@ -396,41 +394,6 @@ impl NetworkBeaconProcessor { } } } - // this is a parent lookup request from the sync manager - ChainSegmentProcessId::ParentLookup(chain_head) => { - debug!( - self.log, "Processing parent lookup"; - "chain_hash" => %chain_head, - "blocks" => downloaded_blocks.len() - ); - // parent blocks are ordered from highest slot to lowest, so we need to process in - // reverse - match self - .process_blocks(downloaded_blocks.iter().rev(), notify_execution_layer) - .await - { - (imported_blocks, Err(e)) => { - debug!(self.log, "Parent lookup failed"; "error" => %e.message); - match e.peer_action { - Some(penalty) => BatchProcessResult::FaultyFailure { - imported_blocks: imported_blocks > 0, - penalty, - }, - None => BatchProcessResult::NonFaultyFailure, - } - } - (imported_blocks, Ok(_)) => { - debug!( - self.log, "Parent lookup processed successfully"; - "chain_hash" => %chain_head, - "imported_blocks" => imported_blocks - ); - BatchProcessResult::Success { - was_non_empty: imported_blocks > 0, - } - } - } - } }; self.send_sync_message(SyncMessage::BatchProcessed { sync_type, result }); diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 5e369d376da..5091536cfcd 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -1,10 +1,8 @@ use self::single_block_lookup::SingleBlockLookup; use super::manager::BlockProcessingResult; use super::network_context::{LookupFailure, LookupVerifyError}; -use super::BatchProcessResult; use super::{manager::BlockProcessType, network_context::SyncNetworkContext}; use crate::metrics; -use crate::network_beacon_processor::ChainSegmentProcessId; use crate::sync::block_lookups::common::LookupType; use crate::sync::block_lookups::parent_lookup::{ParentLookup, RequestError}; use crate::sync::block_lookups::single_block_lookup::{CachedChild, LookupRequestError}; @@ -23,7 +21,6 @@ use lru_cache::LRUTimeCache; pub use single_block_lookup::{BlobRequestState, BlockRequestState}; use slog::{debug, error, trace, warn, Logger}; use smallvec::SmallVec; -use std::collections::{HashMap, VecDeque}; use std::sync::Arc; use std::time::Duration; use store::Hash256; @@ -36,8 +33,6 @@ mod single_block_lookup; #[cfg(test)] mod tests; -pub type DownloadedBlock = (Hash256, RpcBlock); - const FAILED_CHAINS_CACHE_EXPIRY_SECONDS: u64 = 60; pub const SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS: u8 = 3; @@ -52,8 +47,6 @@ pub struct BlockLookups { /// Parent chain lookups being downloaded. parent_lookups: SmallVec<[ParentLookup; 3]>, - processing_parent_lookups: HashMap, SingleBlockLookup)>, - /// A cache of failed chain lookups to prevent duplicate searches. failed_chains: LRUTimeCache, @@ -69,7 +62,6 @@ impl BlockLookups { pub fn new(da_checker: Arc>, log: Logger) -> Self { Self { parent_lookups: Default::default(), - processing_parent_lookups: Default::default(), failed_chains: LRUTimeCache::new(Duration::from_secs( FAILED_CHAINS_CACHE_EXPIRY_SECONDS, )), @@ -80,8 +72,11 @@ impl BlockLookups { } #[cfg(test)] - pub(crate) fn active_single_lookups(&self) -> Vec { - self.single_block_lookups.keys().cloned().collect() + pub(crate) fn active_single_lookups(&self) -> Vec<(Id, Hash256)> { + self.single_block_lookups + .iter() + .map(|(id, e)| (*id, e.block_root())) + .collect() } #[cfg(test)] @@ -169,6 +164,7 @@ impl BlockLookups { .iter_mut() .find(|(_id, lookup)| lookup.is_for_block(block_root)) { + trace!(self.log, "Adding peer to existing single block lookup"; "block_root" => %block_root); lookup.add_peers(peers); if let Some(components) = child_components { lookup.add_child_components(components); @@ -187,16 +183,6 @@ impl BlockLookups { return; } - if self - .processing_parent_lookups - .values() - .any(|(hashes, _last_parent_request)| hashes.contains(&block_root)) - { - // we are already processing this block, ignore it. - trace!(self.log, "Already processing block in a parent request"; "block_root" => ?block_root); - return; - } - let msg = if child_components.is_some() { "Searching for components of a block with unknown parent" } else { @@ -250,20 +236,6 @@ impl BlockLookups { return; } - if self - .processing_parent_lookups - .iter() - .any(|(chain_hash, (hashes, _peers))| { - chain_hash == &block_root - || hashes.contains(&block_root) - || hashes.contains(&parent_root) - }) - { - // we are already processing this block, ignore it. - debug!(self.log, "Already processing parent block"; - "block_root" => ?block_root, "parent_root" => ?parent_root); - return; - } let parent_lookup = ParentLookup::new( block_root, parent_root, @@ -392,7 +364,9 @@ impl BlockLookups { LookupType::Current => self.has_pending_parent_request(lookup.block_root()), }; - if !delay_send { + if delay_send { + debug!(self.log, "Delaying import of response"; "block_root" => %lookup.block_root(), "lookup_type" => ?lookup.lookup_type, "type" => ?R::response_type()); + } else { R::request_state_mut(lookup) .get_state_mut() .on_download_success() @@ -406,7 +380,9 @@ impl BlockLookups { )? } } - CachedChild::DownloadIncomplete => { + CachedChild::DownloadIncomplete(reason) => { + debug!(self.log, "CacheChild download incomplete"; "block_root" => %lookup.block_root(), "lookup_type" => ?lookup.lookup_type, "type" => ?R::response_type(), "reason" => reason); + R::request_state_mut(lookup) .get_state_mut() .on_download_success() @@ -441,6 +417,63 @@ impl BlockLookups { Ok(()) } + /// Consolidates error handling for `single_lookup_response`. An `Err` here should always mean + /// the lookup is dropped. + fn handle_parent_lookup_cached_child( + &self, + seen_timestamp: Duration, + cx: &mut SyncNetworkContext, + process_type: BlockProcessType, + lookup: &mut SingleBlockLookup, + cached_child: CachedChild, + ) -> Result<(), LookupRequestError> { + let block_root = lookup.block_root(); + match cached_child { + CachedChild::Ok(block) => { + // If we have an outstanding parent request for this block, delay sending the response until + // all parent blocks have been processed, otherwise we will fail validation with an + // `UnknownParent`. + let delay_send = match lookup.lookup_type { + LookupType::Parent => false, + LookupType::Current => self.has_pending_parent_request(block_root), + }; + + if !delay_send { + self.send_block_for_processing( + block_root, + block, + seen_timestamp, + process_type, + cx, + )? + } + } + CachedChild::DownloadIncomplete(_) => { + // If this was the result of a block request, we can't determine if the block peer + // did anything wrong. If we already had both a block and blobs response processed, + // we should penalize the blobs peer because they did not provide all blobs on the + // initial request. + if lookup.both_components_downloaded() { + lookup.penalize_blob_peer(cx); + lookup.blob_request_state.state.on_download_failure(); + } + lookup.request_block_and_blobs(cx)?; + } + CachedChild::NotRequired => { + warn!(self.log, "Child not cached for parent lookup"; "block_root" => ?block_root); + } + CachedChild::Err(e) => { + warn!(self.log, "Consistency error in cached block"; + "error" => ?e, + "block_root" => ?block_root, + ); + lookup.handle_consistency_failure(cx); + lookup.request_block_and_blobs(cx)?; + } + } + Ok(()) + } + /// Get a parent block lookup by its ID. This method additionally ensures the `req_counter` /// matches the current `req_counter` for the lookup. This any stale responses from requests /// that have been retried are ignored. @@ -595,7 +628,9 @@ impl BlockLookups { // processed. Drop the request without extra penalty } RequestError::BadState(..) => { - warn!(self.log, "Failed to request parent"; "error" => e.as_static()); + warn!(self.log, "Request parent on bad state"; "error" => e.as_static()); + #[cfg(test)] + panic!("bad state"); } } } @@ -608,6 +643,10 @@ impl BlockLookups { let should_drop_lookup = req.should_drop_lookup_on_disconnected_peer(peer_id, cx, &self.log); + if should_drop_lookup { + debug!(self.log, "Dropping single lookup after peer disconnection"; "block_root" => %req.block_root()); + } + !should_drop_lookup }); @@ -618,7 +657,7 @@ impl BlockLookups { .position(|req| req.check_peer_disconnected(peer_id).is_err()) { let parent_lookup = self.parent_lookups.remove(pos); - debug!(self.log, "Dropping parent lookup after peer disconnected"; &parent_lookup); + debug!(self.log, "Dropping parent lookup after peer disconnected"; "chain_hash" => %parent_lookup.chain_hash()); self.request_parent(parent_lookup, cx); } } @@ -725,7 +764,15 @@ impl BlockLookups { let peer_id = match request_state.get_state().processing_peer() { Ok(peer_id) => peer_id, Err(e) => { - debug!(self.log, "Attempting to process single block lookup in bad state"; "id" => target_id, "response_type" => ?R::response_type(), "error" => e); + // TODO: This log will fire when processing the child block that triggered a parent + // lookup. If the result is Ok(Imported) or Err(BlockIsAlreadyKnown), there's no + // effect since the lookup is dropped anyway. However, for other variants there may + // be inconsistencies. The problem lies on the lookup transitioning from a child + // components into a regular lookup. + warn!(self.log, "Attempting to process single block lookup in bad state"; "id" => target_id, "response_type" => ?R::response_type(), "error" => e); + #[cfg(test)] + panic!("bad state"); + #[cfg(not(test))] return; } }; @@ -940,38 +987,35 @@ impl BlockLookups { } } BlockProcessingResult::Err(BlockError::ParentUnknown(block)) => { - parent_lookup.add_unknown_parent_block(block); + parent_lookup.add_unknown_parent_block(block, self.da_checker.clone(), cx); self.request_parent(parent_lookup, cx); } BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(_)) | BlockProcessingResult::Err(BlockError::BlockIsAlreadyKnown(_)) => { - let (chain_hash, blocks, hashes, block_request) = - parent_lookup.parts_for_processing(); - - let blocks = self.add_child_block_to_chain(chain_hash, blocks, cx).into(); - - let process_id = ChainSegmentProcessId::ParentLookup(chain_hash); - - // Check if the beacon processor is available - let Some(beacon_processor) = cx.beacon_processor_if_enabled() else { - return trace!( - self.log, - "Dropping parent chain segment that was ready for processing."; - "chain_hash" => %chain_hash, - ); - }; - - match beacon_processor.send_chain_segment(process_id, blocks) { - Ok(_) => { - self.processing_parent_lookups - .insert(chain_hash, (hashes, block_request)); - } - Err(e) => { - error!( - self.log, - "Failed to send chain segment to processor"; - "error" => ?e - ); + // Send the next block in the parent chain with + let chain_completed = parent_lookup.pop_completed_parent_request(); + + if chain_completed { + debug!(self.log, "Parent lookup chain ancestors imported"; "chain_hash" => %chain_hash); + // MUST recover the single lookup that triggered the chain + self.send_child_of_parent_chain_for_processing(chain_hash, cx); + // Do nothing, drop parent lookup + } else { + debug!(self.log, "Parent lookup chain importing ancestor"; "chain_hash" => %chain_hash); + let cached_child = parent_lookup + .current_parent_request + .get_cached_child_block(); + match self.handle_parent_lookup_cached_child( + timestamp_now(), + cx, + BlockProcessType::ParentLookup { chain_hash }, + &mut parent_lookup.current_parent_request, + cached_child, + ) { + Ok(()) => self.parent_lookups.push(parent_lookup), + Err(e) => { + self.handle_parent_request_error(&mut parent_lookup, cx, e.into()) + } } } } @@ -998,6 +1042,8 @@ impl BlockLookups { "Parent block processing was ignored, cpu might be overloaded"; "action" => "dropping parent request" ); + // Drop the trigger single block lookup, otherwise it will get stuck + self.drop_single_lookup(chain_hash); } } @@ -1007,60 +1053,6 @@ impl BlockLookups { ); } - /// Find the child block that spawned the parent lookup request and add it to the chain - /// to send for processing. - fn add_child_block_to_chain( - &mut self, - chain_hash: Hash256, - mut blocks: VecDeque>, - cx: &mut SyncNetworkContext, - ) -> VecDeque> { - // Find the child block that spawned the parent lookup request and add it to the chain - // to send for processing. - if let Some(child_lookup_id) = self - .single_block_lookups - .iter() - .find_map(|(id, lookup)| (lookup.block_root() == chain_hash).then_some(*id)) - { - let Some(child_lookup) = self.single_block_lookups.get_mut(&child_lookup_id) else { - debug!(self.log, "Missing child for parent lookup request"; "child_root" => ?chain_hash); - return blocks; - }; - match child_lookup.get_cached_child_block() { - CachedChild::Ok(rpc_block) => { - // Insert this block at the front. This order is important because we later check - // for linear roots in `filter_chain_segment` - blocks.push_front(rpc_block); - } - CachedChild::DownloadIncomplete => { - trace!(self.log, "Parent lookup chain complete, awaiting child response"; "chain_hash" => ?chain_hash); - } - CachedChild::NotRequired => { - warn!(self.log, "Child not cached for parent lookup"; "chain_hash" => %chain_hash); - } - CachedChild::Err(e) => { - warn!( - self.log, - "Consistency error in child block triggering chain or parent lookups"; - "error" => ?e, - "chain_hash" => ?chain_hash - ); - child_lookup.handle_consistency_failure(cx); - if let Err(e) = child_lookup.request_block_and_blobs(cx) { - debug!(self.log, - "Failed to request block and blobs, dropping lookup"; - "error" => ?e - ); - self.single_block_lookups.remove(&child_lookup_id); - } - } - } - } else { - debug!(self.log, "Missing child for parent lookup request"; "child_root" => ?chain_hash); - }; - blocks - } - /// Handle the peer scoring, retries, and logging related to a `BlockError` returned from /// processing a block + blobs for a parent lookup. fn handle_parent_block_error( @@ -1074,6 +1066,9 @@ impl BlockLookups { Ok(peer_id) => peer_id, Err(e) => { warn!(self.log, "Parent lookup in bad state"; "chain_hash" => %parent_lookup.chain_hash(), "error" => e); + #[cfg(test)] + panic!("bad state"); + #[cfg(not(test))] return; } }; @@ -1118,92 +1113,65 @@ impl BlockLookups { self.request_parent(parent_lookup, cx); } - pub fn parent_chain_processed( + pub fn send_child_of_parent_chain_for_processing( &mut self, chain_hash: Hash256, - result: BatchProcessResult, cx: &mut SyncNetworkContext, ) { - let Some((_hashes, request)) = self.processing_parent_lookups.remove(&chain_hash) else { - return debug!(self.log, "Chain process response for a parent lookup request that was not found"; "chain_hash" => %chain_hash, "result" => ?result); + let Some(id) = self + .single_block_lookups + .iter() + .find_map(|(id, req)| (req.block_root() == chain_hash).then_some(*id)) + else { + warn!(self.log, "No id found for single block lookup"; "chain_hash" => %chain_hash); + return; }; - debug!(self.log, "Parent chain processed"; "chain_hash" => %chain_hash, "result" => ?result); - match result { - BatchProcessResult::Success { .. } => { - let Some(id) = self - .single_block_lookups - .iter() - .find_map(|(id, req)| (req.block_root() == chain_hash).then_some(*id)) - else { - warn!(self.log, "No id found for single block lookup"; "chain_hash" => %chain_hash); - return; - }; - - let Some(lookup) = self.single_block_lookups.get_mut(&id) else { - warn!(self.log, "No id found for single block lookup"; "chain_hash" => %chain_hash); - return; - }; + let Some(lookup) = self.single_block_lookups.get_mut(&id) else { + warn!(self.log, "No id found for single block lookup"; "chain_hash" => %chain_hash); + return; + }; - match lookup.get_cached_child_block() { - CachedChild::Ok(rpc_block) => { - // This is the correct block, send it for processing - if self - .send_block_for_processing( - chain_hash, - rpc_block, - timestamp_now(), - BlockProcessType::SingleBlock { id }, - cx, - ) - .is_err() - { - // Remove to avoid inconsistencies - self.single_block_lookups.remove(&id); - } - } - CachedChild::DownloadIncomplete => { - trace!(self.log, "Parent chain complete, awaiting child response"; "chain_hash" => %chain_hash); - } - CachedChild::NotRequired => { - warn!(self.log, "Child not cached for parent lookup"; "chain_hash" => %chain_hash); - } - CachedChild::Err(e) => { - warn!( - self.log, - "Consistency error in child block triggering parent lookup"; - "chain_hash" => %chain_hash, - "error" => ?e - ); - lookup.handle_consistency_failure(cx); - if let Err(e) = lookup.request_block_and_blobs(cx) { - debug!(self.log, - "Failed to request block and blobs, dropping lookup"; - "error" => ?e - ); - self.single_block_lookups.remove(&id); - } - } + match lookup.get_cached_child_block() { + CachedChild::Ok(rpc_block) => { + // This is the correct block, send it for processing + if self + .send_block_for_processing( + chain_hash, + rpc_block, + timestamp_now(), + BlockProcessType::SingleBlock { id }, + cx, + ) + .is_err() + { + // Remove to avoid inconsistencies + self.single_block_lookups.remove(&id); } } - BatchProcessResult::FaultyFailure { - imported_blocks: _, - penalty, - } => { - self.failed_chains.insert(chain_hash); - for peer_source in request.all_used_peers() { - cx.report_peer(*peer_source, penalty, "parent_chain_failure") - } + CachedChild::DownloadIncomplete(reason) => { + debug!(self.log, "Parent chain complete, awaiting child response"; "chain_hash" => %chain_hash, "reason" => reason); } - BatchProcessResult::NonFaultyFailure => { - // We might request this chain again if there is need but otherwise, don't try again + CachedChild::NotRequired => { + warn!(self.log, "Child not cached for parent lookup"; "chain_hash" => %chain_hash); + } + CachedChild::Err(e) => { + warn!( + self.log, + "Consistency error in child block triggering parent lookup"; + "chain_hash" => %chain_hash, + "error" => ?e + ); + lookup.handle_consistency_failure(cx); + if let Err(e) = lookup.request_block_and_blobs(cx) { + debug!(self.log, + "Failed to request block and blobs, dropping lookup"; + "error" => ?e + ); + self.single_block_lookups.remove(&id); + } } } - - metrics::set_gauge( - &metrics::SYNC_PARENT_BLOCK_LOOKUPS, - self.parent_lookups.len() as i64, - ); } /* Helper functions */ @@ -1304,6 +1272,17 @@ impl BlockLookups { ); } + pub fn drop_single_lookup(&mut self, block_root: Hash256) { + if let Some(id) = self + .single_block_lookups + .iter() + .find_map(|(id, req)| (req.block_root() == block_root).then_some(*id)) + { + debug!(self.log, "Dropping single block lookup"; "id" => id, "block_root" => %block_root); + self.single_block_lookups.remove(&id); + }; + } + /// Drops all the single block requests and returns how many requests were dropped. pub fn drop_single_block_requests(&mut self) -> usize { let requests_to_drop = self.single_block_lookups.len(); diff --git a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs index 11eb908953f..87a04451748 100644 --- a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs @@ -1,12 +1,11 @@ use super::common::LookupType; use super::single_block_lookup::{LookupRequestError, SingleBlockLookup}; -use super::{DownloadedBlock, PeerId}; +use super::PeerId; use crate::sync::{manager::SLOT_IMPORT_TOLERANCE, network_context::SyncNetworkContext}; use beacon_chain::block_verification_types::AsBlock; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::data_availability_checker::{ChildComponents, DataAvailabilityChecker}; use beacon_chain::BeaconChainTypes; -use std::collections::VecDeque; use std::sync::Arc; use store::Hash256; @@ -22,7 +21,7 @@ pub(crate) struct ParentLookup { /// The root of the block triggering this parent request. chain_hash: Hash256, /// The blocks that have currently been downloaded. - downloaded_blocks: Vec>, + parent_requests: Vec>, /// Request of the last parent. pub current_parent_request: SingleBlockLookup, } @@ -60,15 +59,17 @@ impl ParentLookup { Self { chain_hash: block_root, - downloaded_blocks: vec![], + parent_requests: vec![], current_parent_request, } } pub fn contains_block(&self, block_root: &Hash256) -> bool { - self.downloaded_blocks - .iter() - .any(|(root, _d_block)| root == block_root) + &self.current_parent_request.block_root() == block_root + || self + .parent_requests + .iter() + .any(|request| &request.block_root() == block_root) } pub fn is_for_block(&self, block_root: Hash256) -> bool { @@ -78,7 +79,7 @@ impl ParentLookup { /// Attempts to request the next unknown parent. If the request fails, it should be removed. pub fn request_parent(&mut self, cx: &mut SyncNetworkContext) -> Result<(), RequestError> { // check to make sure this request hasn't failed - if self.downloaded_blocks.len() + 1 >= PARENT_DEPTH_TOLERANCE { + if self.parent_requests.len() + 1 >= PARENT_DEPTH_TOLERANCE { return Err(RequestError::ChainTooLong); } @@ -100,15 +101,40 @@ impl ParentLookup { }) } - pub fn add_unknown_parent_block(&mut self, block: RpcBlock) { - let next_parent = block.parent_root(); - // Cache the block. - let current_root = self.current_parent_request.block_root(); - self.downloaded_blocks.push((current_root, block)); + pub fn add_unknown_parent_block( + &mut self, + block: RpcBlock, + da_checker: Arc>, + cx: &mut SyncNetworkContext, + ) { + // Create a new empty single block lookup for the parent, copying all peers + let parent_root = block.parent_root(); + let new_parent_request = SingleBlockLookup::new( + parent_root, + Some(ChildComponents::empty(parent_root)), + &self + .current_parent_request + .all_available_peers() + .cloned() + .collect::>(), + da_checker, + cx.next_id(), + LookupType::Parent, + ); - // Update the parent request. - self.current_parent_request - .update_requested_parent_block(next_parent) + // Replace current parent request and store in parent_requests queue + let previous_parent_request = + std::mem::replace(&mut self.current_parent_request, new_parent_request); + self.parent_requests.push(previous_parent_request); + } + + pub fn pop_completed_parent_request(&mut self) -> bool { + if let Some(new_parent_request) = self.parent_requests.pop() { + self.current_parent_request = new_parent_request; + false + } else { + true + } } pub fn block_processing_peer(&self) -> Result { @@ -125,31 +151,6 @@ impl ParentLookup { .processing_peer() } - /// Consumes the parent request and destructures it into it's parts. - #[allow(clippy::type_complexity)] - pub fn parts_for_processing( - self, - ) -> ( - Hash256, - VecDeque>, - Vec, - SingleBlockLookup, - ) { - let ParentLookup { - chain_hash, - downloaded_blocks, - current_parent_request, - } = self; - let block_count = downloaded_blocks.len(); - let mut blocks = VecDeque::with_capacity(block_count); - let mut hashes = Vec::with_capacity(block_count); - for (hash, block) in downloaded_blocks.into_iter() { - blocks.push_back(block); - hashes.push(hash); - } - (chain_hash, blocks, hashes, current_parent_request) - } - /// Get the parent lookup's chain hash. pub fn chain_hash(&self) -> Hash256 { self.chain_hash @@ -206,7 +207,7 @@ impl slog::KV for ParentLookup { ) -> slog::Result { serializer.emit_arguments("chain_hash", &format_args!("{}", self.chain_hash))?; slog::Value::serialize(&self.current_parent_request, record, "parent", serializer)?; - serializer.emit_usize("downloaded_blocks", self.downloaded_blocks.len())?; + serializer.emit_usize("downloaded_blocks", self.parent_requests.len())?; slog::Result::Ok(()) } } diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 077af7c3d19..c1b4139d826 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -74,16 +74,6 @@ impl SingleBlockLookup { self.block_root() == block_root } - /// Update the requested block, this should only be used in a chain of parent lookups to request - /// the next parent. - pub fn update_requested_parent_block(&mut self, block_root: Hash256) { - self.block_request_state.requested_block_root = block_root; - self.blob_request_state.block_root = block_root; - self.block_request_state.state.state = State::AwaitingDownload; - self.blob_request_state.state.state = State::AwaitingDownload; - self.child_components = Some(ChildComponents::empty(block_root)); - } - /// Get all unique used peers across block and blob requests. pub fn all_used_peers(&self) -> impl Iterator + '_ { self.block_request_state @@ -93,6 +83,15 @@ impl SingleBlockLookup { .unique() } + /// Get all unique available peers across block and blob requests. + pub fn all_available_peers(&self) -> impl Iterator + '_ { + self.block_request_state + .state + .get_available_peers() + .chain(self.blob_request_state.state.get_used_peers()) + .unique() + } + /// Send the necessary requests for blocks and/or blobs. This will check whether we have /// downloaded the block and/or blobs already and will not send requests if so. It will also /// inspect the request state or blocks and blobs to ensure we are not already processing or @@ -124,11 +123,17 @@ impl SingleBlockLookup { pub fn get_cached_child_block(&self) -> CachedChild { if let Some(components) = self.child_components.as_ref() { let Some(block) = components.downloaded_block.as_ref() else { - return CachedChild::DownloadIncomplete; + return CachedChild::DownloadIncomplete("missing block".to_owned()); }; - if !self.missing_blob_ids().is_empty() { - return CachedChild::DownloadIncomplete; + // CacheChild should include only block components for an unknown parent, so nothing is + // imported into the da_checker. The only possible contents are here. + let blobs_expected = block.num_expected_blobs(); + let blobs_downloaded = components.downloaded_blobs_count(); + if blobs_expected != blobs_downloaded { + return CachedChild::DownloadIncomplete(format!( + "missing blobs expected {blobs_expected} got {blobs_downloaded}" + )); } match RpcBlock::new_from_fixed( @@ -222,9 +227,15 @@ impl SingleBlockLookup { .check_peer_disconnected(peer_id) .is_err(); + if self.all_available_peers().count() == 0 { + return true; + } + + // If there was an active download request with this peer, send them again with another + // peer. We should receive an RPCError anyway, but this should speed things up? if block_peer_disconnected || blob_peer_disconnected { if let Err(e) = self.request_block_and_blobs(cx) { - debug!(log, "Single lookup failed on peer disconnection"; "block_root" => ?block_root, "error" => ?e); + debug!(log, "Single lookup failed on retry after peer disconnection"; "block_root" => ?block_root, "error" => ?e); return true; } } @@ -352,7 +363,7 @@ pub enum CachedChild { /// been performed and no kzg verification has been performed. Ok(RpcBlock), /// All child components have not yet been received. - DownloadIncomplete, + DownloadIncomplete(String), /// Child components should not be cached, send this directly for processing. NotRequired, /// There was an error during consistency checks between block and blobs. @@ -512,6 +523,10 @@ impl SingleLookupRequestState { self.used_peers.iter() } + pub fn get_available_peers(&self) -> impl Iterator { + self.available_peers.iter() + } + /// Selects a random peer from available peers if any, inserts it in used peers and returns it. pub fn use_rand_available_peer(&mut self) -> Option { let peer_id = self diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index 8e3b35ee5d3..4fd8d625385 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -194,6 +194,10 @@ impl TestRig { self.sync_manager.handle_message(sync_message); } + fn active_single_lookups(&self) -> Vec<(Id, Hash256)> { + self.sync_manager.active_single_lookups() + } + fn active_single_lookups_count(&self) -> usize { self.sync_manager.active_single_lookups().len() } @@ -210,6 +214,35 @@ impl TestRig { self.sync_manager.failed_chains_contains(chain_hash) } + fn find_single_lookup_for(&self, block_root: Hash256) -> Id { + self.active_single_lookups() + .iter() + .find(|(_, b)| b == &block_root) + .unwrap_or_else(|| panic!("no single block lookup found for {block_root}")) + .0 + } + + fn expect_no_active_parent_lookups(&self) { + assert_eq!( + self.active_parent_lookups(), + vec![], + "expected no parent lookups" + ); + } + + fn expect_no_active_single_lookups(&self) { + assert!( + self.active_single_lookups().is_empty(), + "expect no single block lookups: {:?}", + self.active_single_lookups() + ); + } + + fn expect_no_active_lookups(&self) { + self.expect_no_active_parent_lookups(); + self.expect_no_active_single_lookups(); + } + #[track_caller] fn assert_parent_lookups_consistency(&self) { let hashes = self.active_parent_lookups(); @@ -233,20 +266,23 @@ impl TestRig { peer_id } - fn parent_chain_processed(&mut self, chain_hash: Hash256, result: BatchProcessResult) { - self.send_sync_message(SyncMessage::BatchProcessed { - sync_type: ChainSegmentProcessId::ParentLookup(chain_hash), - result, - }) - } - - fn parent_chain_processed_success(&mut self, chain_hash: Hash256) { - self.parent_chain_processed( - chain_hash, - BatchProcessResult::Success { - was_non_empty: true, - }, - ) + fn parent_chain_processed_success( + &mut self, + chain_hash: Hash256, + blocks: &[Arc>], + ) { + // Send import events for all pending parent blocks + for block in blocks { + self.parent_block_processed_imported(chain_hash); + } + // Send final import event for the block that triggered the lookup + let trigger_lookup = self + .active_single_lookups() + .iter() + .find(|(_, block_root)| block_root == &chain_hash) + .copied() + .unwrap_or_else(|| panic!("There should exist a single block lookup for {chain_hash}")); + self.single_block_component_processed_imported(trigger_lookup.0, chain_hash); } fn parent_block_processed(&mut self, chain_hash: Hash256, result: BlockProcessingResult) { @@ -263,22 +299,14 @@ impl TestRig { ); } - fn single_block_component_processed( - &mut self, - id: SingleLookupReqId, - result: BlockProcessingResult, - ) { + fn single_block_component_processed(&mut self, id: Id, result: BlockProcessingResult) { self.send_sync_message(SyncMessage::BlockComponentProcessed { - process_type: BlockProcessType::SingleBlock { id: id.id }, + process_type: BlockProcessType::SingleBlock { id }, result, }) } - fn single_block_component_processed_imported( - &mut self, - id: SingleLookupReqId, - block_root: Hash256, - ) { + fn single_block_component_processed_imported(&mut self, id: Id, block_root: Hash256) { self.single_block_component_processed( id, BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root)), @@ -544,9 +572,13 @@ impl TestRig { fn expect_parent_chain_process(&mut self) { match self.beacon_processor_rx.try_recv() { Ok(work) => { - assert_eq!(work.work_type(), beacon_processor::CHAIN_SEGMENT); + // Parent chain sends blocks one by one + assert_eq!(work.work_type(), beacon_processor::RPC_BLOCK); } - other => panic!("Expected chain segment process, found {:?}", other), + other => panic!( + "Expected rpc_block from chain segment process, found {:?}", + other + ), } } @@ -567,17 +599,24 @@ impl TestRig { } #[track_caller] - pub fn expect_penalty(&mut self, peer_id: PeerId) { - self.pop_received_network_event(|ev| match ev { - NetworkMessage::ReportPeer { peer_id: p_id, .. } if p_id == &peer_id => Some(()), - _ => None, - }) - .unwrap_or_else(|_| { - panic!( - "Expected peer penalty for {peer_id}: {:#?}", - self.network_rx_queue - ) - }) + pub fn expect_penalty(&mut self, peer_id: PeerId, expect_penalty_msg: &'static str) { + let penalty_msg = self + .pop_received_network_event(|ev| match ev { + NetworkMessage::ReportPeer { + peer_id: p_id, msg, .. + } if p_id == &peer_id => Some(msg.to_owned()), + _ => None, + }) + .unwrap_or_else(|_| { + panic!( + "Expected peer penalty for {peer_id}: {:#?}", + self.network_rx_queue + ) + }); + assert_eq!( + penalty_msg, expect_penalty_msg, + "Unexpected penalty msg for {peer_id}" + ); } pub fn block_with_parent_and_blobs( @@ -630,9 +669,9 @@ fn test_single_block_lookup_happy_path() { // Send the stream termination. Peer should have not been penalized, and the request removed // after processing. rig.single_lookup_block_response(id, peer_id, None); - rig.single_block_component_processed_imported(id, block_root); + rig.single_block_component_processed_imported(id.id, block_root); rig.expect_empty_network(); - assert_eq!(rig.active_single_lookups_count(), 0); + rig.expect_no_active_lookups(); } #[test] @@ -648,7 +687,7 @@ fn test_single_block_lookup_empty_response() { // The peer does not have the block. It should be penalized. rig.single_lookup_block_response(id, peer_id, None); - rig.expect_penalty(peer_id); + rig.expect_penalty(peer_id, "NoResponseReturned"); rig.expect_block_lookup_request(block_hash); // it should be retried } @@ -667,7 +706,7 @@ fn test_single_block_lookup_wrong_response() { // Peer sends something else. It should be penalized. let bad_block = rig.rand_block(); rig.single_lookup_block_response(id, peer_id, Some(bad_block.into())); - rig.expect_penalty(peer_id); + rig.expect_penalty(peer_id, "UnrequestedBlockRoot"); rig.expect_block_lookup_request(block_hash); // should be retried // Send the stream termination. This should not produce an additional penalty. @@ -717,7 +756,7 @@ fn test_single_block_lookup_becomes_parent_request() { // Send the stream termination. Peer should have not been penalized, and the request moved to a // parent request after processing. rig.single_block_component_processed( - id, + id.id, BlockError::ParentUnknown(RpcBlock::new_without_blobs(None, block)).into(), ); assert_eq!(rig.active_single_lookups_count(), 1); @@ -748,8 +787,8 @@ fn test_parent_lookup_happy_path() { BlockError::BlockIsAlreadyKnown(block_root).into(), ); rig.expect_parent_chain_process(); - rig.parent_chain_processed_success(block_root); - assert_eq!(rig.active_parent_lookups_count(), 0); + rig.parent_chain_processed_success(block_root, &[]); + rig.expect_no_active_lookups(); } #[test] @@ -766,7 +805,7 @@ fn test_parent_lookup_wrong_response() { // Peer sends the wrong block, peer should be penalized and the block re-requested. let bad_block = rig.rand_block(); rig.parent_lookup_block_response(id1, peer_id, Some(bad_block.into())); - rig.expect_penalty(peer_id); + rig.expect_penalty(peer_id, "UnrequestedBlockRoot"); let id2 = rig.expect_block_parent_request(parent_root); // Send the stream termination for the first request. This should not produce extra penalties. @@ -780,8 +819,8 @@ fn test_parent_lookup_wrong_response() { // Processing succeeds, now the rest of the chain should be sent for processing. rig.parent_block_processed_imported(block_root); rig.expect_parent_chain_process(); - rig.parent_chain_processed_success(block_root); - assert_eq!(rig.active_parent_lookups_count(), 0); + rig.parent_chain_processed_success(block_root, &[]); + rig.expect_no_active_lookups(); } #[test] @@ -797,7 +836,7 @@ fn test_parent_lookup_empty_response() { // Peer sends an empty response, peer should be penalized and the block re-requested. rig.parent_lookup_block_response(id1, peer_id, None); - rig.expect_penalty(peer_id); + rig.expect_penalty(peer_id, "NoResponseReturned"); let id2 = rig.expect_block_parent_request(parent_root); // Send the right block this time. @@ -806,9 +845,10 @@ fn test_parent_lookup_empty_response() { // Processing succeeds, now the rest of the chain should be sent for processing. rig.parent_block_processed_imported(block_root); - rig.expect_parent_chain_process(); - rig.parent_chain_processed_success(block_root); - assert_eq!(rig.active_parent_lookups_count(), 0); + + let id = rig.find_single_lookup_for(block_root); + rig.single_block_component_processed_imported(id, block_root); + rig.expect_no_active_lookups(); } #[test] @@ -833,8 +873,8 @@ fn test_parent_lookup_rpc_failure() { // Processing succeeds, now the rest of the chain should be sent for processing. rig.parent_block_processed_imported(block_root); rig.expect_parent_chain_process(); - rig.parent_chain_processed_success(block_root); - assert_eq!(rig.active_parent_lookups_count(), 0); + rig.parent_chain_processed_success(block_root, &[]); + rig.expect_no_active_lookups(); } #[test] @@ -872,11 +912,11 @@ fn test_parent_lookup_too_many_attempts() { // I'm unsure if this is how it should behave? // rig.parent_lookup_block_response(id, peer_id, None); - rig.expect_penalty(peer_id); + rig.expect_penalty(peer_id, "UnrequestedBlockRoot"); } } - assert_eq!(rig.active_parent_lookups_count(), 0); + rig.expect_no_active_lookups(); } #[test] @@ -902,13 +942,13 @@ fn test_parent_lookup_too_many_download_attempts_no_blacklist() { // Send a bad block this time. It should be tried again. let bad_block = rig.rand_block(); rig.parent_lookup_block_response(id, peer_id, Some(bad_block.into())); - rig.expect_penalty(peer_id); + rig.expect_penalty(peer_id, "UnrequestedBlockRoot"); } } - assert_eq!(rig.active_parent_lookups_count(), 0); assert!(!rig.failed_chains_contains(&block_root)); assert!(!rig.failed_chains_contains(&parent.canonical_root())); + rig.expect_no_active_lookups(); } #[test] @@ -944,11 +984,11 @@ fn test_parent_lookup_too_many_processing_attempts_must_blacklist() { rig.parent_lookup_block_response(id, peer_id, Some(parent.clone().into())); rig.parent_block_processed(block_root, BlockError::InvalidSignature.into()); rig.parent_lookup_block_response(id, peer_id, None); - rig.expect_penalty(peer_id); + rig.expect_penalty(peer_id, "parent_request_err"); } assert!(rig.failed_chains_contains(&block_root)); - assert_eq!(rig.active_parent_lookups_count(), 0); + rig.expect_no_active_lookups(); } #[test] @@ -976,7 +1016,7 @@ fn test_parent_lookup_too_deep() { ) } - rig.expect_penalty(peer_id); + rig.expect_penalty(peer_id, ""); assert!(rig.failed_chains_contains(&chain_hash)); } @@ -988,7 +1028,7 @@ fn test_parent_lookup_disconnection() { rig.trigger_unknown_parent_block(peer_id, trigger_block.into()); rig.peer_disconnected(peer_id); - assert_eq!(rig.active_parent_lookups_count(), 0); + rig.expect_no_active_lookups(); } #[test] @@ -1015,9 +1055,9 @@ fn test_single_block_lookup_ignored_response() { // after processing. rig.single_lookup_block_response(id, peer_id, None); // Send an Ignored response, the request should be dropped - rig.single_block_component_processed(id, BlockProcessingResult::Ignored); + rig.single_block_component_processed(id.id, BlockProcessingResult::Ignored); rig.expect_empty_network(); - assert_eq!(rig.active_single_lookups_count(), 0); + rig.expect_no_active_lookups(); } #[test] @@ -1028,8 +1068,10 @@ fn test_parent_lookup_ignored_response() { let peer_id = rig.new_connected_peer(); // Trigger the request - rig.trigger_unknown_parent_block(peer_id, block.into()); + rig.trigger_unknown_parent_block(peer_id, block.clone().into()); let id = rig.expect_parent_request_block_and_blobs(parent_root); + // Note: single block lookup for current `block` does not trigger any request because it does + // not has blobs, and the block is already cached // Peer sends the right block, it should be sent for processing. Peer should not be penalized. rig.parent_lookup_block_response(id, peer_id, Some(parent.into())); @@ -1039,7 +1081,7 @@ fn test_parent_lookup_ignored_response() { // Return an Ignored result. The request should be dropped rig.parent_block_processed(block_root, BlockProcessingResult::Ignored); rig.expect_empty_network(); - assert_eq!(rig.active_parent_lookups_count(), 0); + rig.expect_no_active_lookups(); } /// This is a regression test. @@ -1056,7 +1098,7 @@ fn test_same_chain_race_condition() { let chain_hash = trigger_block.canonical_root(); rig.trigger_unknown_parent_block(peer_id, trigger_block.clone()); - for (i, block) in blocks.into_iter().rev().enumerate() { + for (i, block) in blocks.clone().into_iter().rev().enumerate() { let id = rig.expect_parent_request_block_and_blobs(block.canonical_root()); // the block rig.parent_lookup_block_response(id, peer_id, Some(block.clone())); @@ -1088,14 +1130,15 @@ fn test_same_chain_race_condition() { rig.trigger_unknown_parent_block(peer_id, trigger_block); rig.assert_parent_lookups_consistency(); - rig.parent_chain_processed_success(chain_hash); - assert_eq!(rig.active_parent_lookups_count(), 0); + rig.parent_chain_processed_success(chain_hash, &blocks); + rig.expect_no_active_lookups(); } mod deneb_only { use super::*; use beacon_chain::data_availability_checker::AvailabilityCheckError; use ssz_types::VariableList; + use std::collections::VecDeque; struct DenebTester { rig: TestRig, @@ -1233,6 +1276,7 @@ mod deneb_only { } fn parent_block_response(mut self) -> Self { + self.rig.log("parent_block_response"); self.rig.expect_empty_network(); let block = self.parent_block.pop_front().unwrap().clone(); let _ = self.unknown_parent_block.insert(block.clone()); @@ -1247,6 +1291,7 @@ mod deneb_only { } fn parent_blob_response(mut self) -> Self { + self.rig.log("parent_blob_response"); let blobs = self.parent_blobs.pop_front().unwrap(); let _ = self.unknown_parent_blobs.insert(blobs.clone()); for blob in &blobs { @@ -1276,6 +1321,7 @@ mod deneb_only { } fn block_response(mut self) -> Self { + self.rig.log("block_response"); // The peer provides the correct block, should not be penalized. Now the block should be sent // for processing. self.rig.single_lookup_block_response( @@ -1291,6 +1337,8 @@ mod deneb_only { } fn blobs_response(mut self) -> Self { + self.rig + .log(&format!("blobs response {}", self.blobs.len())); for blob in &self.blobs { self.rig.single_lookup_blob_response( self.blob_req_id.expect("blob request id"), @@ -1360,7 +1408,7 @@ mod deneb_only { // Missing blobs should be the request is not removed, the outstanding blobs request should // mean we do not send a new request. self.rig.single_block_component_processed( - self.block_req_id.expect("block request id"), + self.block_req_id.expect("block request id").id, BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(self.block_root)), ); self.rig.expect_empty_network(); @@ -1369,6 +1417,7 @@ mod deneb_only { } fn parent_block_imported(mut self) -> Self { + self.rig.log("parent_block_imported"); self.rig.parent_block_processed( self.block_root, BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(self.block_root)), @@ -1407,7 +1456,7 @@ mod deneb_only { fn invalid_block_processed(mut self) -> Self { self.rig.single_block_component_processed( - self.block_req_id.expect("block request id"), + self.block_req_id.expect("block request id").id, BlockProcessingResult::Err(BlockError::ProposalSignatureInvalid), ); assert_eq!(self.rig.active_single_lookups_count(), 1); @@ -1416,7 +1465,7 @@ mod deneb_only { fn invalid_blob_processed(mut self) -> Self { self.rig.single_block_component_processed( - self.blob_req_id.expect("blob request id"), + self.blob_req_id.expect("blob request id").id, BlockProcessingResult::Err(BlockError::AvailabilityCheck( AvailabilityCheckError::KzgVerificationFailed, )), @@ -1427,7 +1476,7 @@ mod deneb_only { fn missing_components_from_block_request(mut self) -> Self { self.rig.single_block_component_processed( - self.block_req_id.expect("block request id"), + self.block_req_id.expect("block request id").id, BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents( self.slot, self.block_root, @@ -1449,8 +1498,8 @@ mod deneb_only { self } - fn expect_penalty(mut self) -> Self { - self.rig.expect_penalty(self.peer_id); + fn expect_penalty(mut self, expect_penalty_msg: &'static str) -> Self { + self.rig.expect_penalty(self.peer_id, expect_penalty_msg); self } fn expect_no_penalty(mut self) -> Self { @@ -1514,6 +1563,10 @@ mod deneb_only { self.rig.expect_block_process(ResponseType::Block); self } + fn expect_no_active_lookups(self) -> Self { + self.rig.expect_no_active_lookups(); + self + } fn search_parent_dup(mut self) -> Self { self.rig .trigger_unknown_parent_block(self.peer_id, self.block.clone()); @@ -1555,7 +1608,7 @@ mod deneb_only { tester .empty_block_response() - .expect_penalty() + .expect_penalty("NoResponseReturned") .expect_block_request() .expect_no_blobs_request() .empty_blobs_response() @@ -1578,7 +1631,7 @@ mod deneb_only { .missing_components_from_block_request() .empty_blobs_response() .missing_components_from_blob_request() - .expect_penalty() + .expect_penalty("single_blob_failure") .expect_blobs_request() .expect_no_block_request(); } @@ -1595,7 +1648,7 @@ mod deneb_only { .expect_no_penalty_and_no_requests() .missing_components_from_blob_request() .empty_block_response() - .expect_penalty() + .expect_penalty("NoResponseReturned") .expect_block_request() .expect_no_blobs_request(); } @@ -1609,7 +1662,7 @@ mod deneb_only { tester .block_response_triggering_process() .invalid_block_processed() - .expect_penalty() + .expect_penalty("single_block_failure") .expect_block_request() .expect_no_blobs_request() .blobs_response() @@ -1628,7 +1681,7 @@ mod deneb_only { .missing_components_from_block_request() .blobs_response() .invalid_blob_processed() - .expect_penalty() + .expect_penalty("single_blob_failure") .expect_blobs_request() .expect_no_block_request(); } @@ -1645,7 +1698,7 @@ mod deneb_only { .invalidate_blobs_too_few() .blobs_response() .missing_components_from_blob_request() - .expect_penalty() + .expect_penalty("single_blob_failure") .expect_blobs_request() .expect_no_block_request(); } @@ -1660,7 +1713,7 @@ mod deneb_only { .block_response_triggering_process() .invalidate_blobs_too_many() .blobs_response() - .expect_penalty() + .expect_penalty("DuplicateData") .expect_blobs_request() .expect_no_block_request(); } @@ -1688,7 +1741,7 @@ mod deneb_only { tester .invalidate_blobs_too_many() .blobs_response() - .expect_penalty() + .expect_penalty("DuplicateData") .expect_blobs_request() .expect_no_block_request() .block_response_triggering_process(); @@ -1729,7 +1782,7 @@ mod deneb_only { .parent_blob_response() .expect_block_process() .invalid_parent_processed() - .expect_penalty() + .expect_penalty("parent_request_err") .expect_parent_block_request() .expect_parent_blobs_request() .expect_empty_beacon_processor(); @@ -1780,7 +1833,7 @@ mod deneb_only { tester .empty_parent_block_response() - .expect_penalty() + .expect_penalty("NoResponseReturned") .expect_parent_block_request() .expect_no_blobs_request() .parent_blob_response() @@ -1805,7 +1858,7 @@ mod deneb_only { .empty_parent_blobs_response() .expect_no_penalty_and_no_requests() .parent_block_response() - .expect_penalty() + .expect_penalty("single_blob_failure") .expect_parent_blobs_request() .parent_blob_response() .expect_block_process() @@ -1848,7 +1901,7 @@ mod deneb_only { .parent_blob_response() .expect_block_process() .invalid_parent_processed() - .expect_penalty() + .expect_penalty("parent_request_err") .expect_parent_block_request() .expect_parent_blobs_request() .expect_empty_beacon_processor(); @@ -1868,7 +1921,10 @@ mod deneb_only { .expect_block_process() .parent_block_imported() .block_response() - .expect_parent_chain_process(); + .blobs_response() + .expect_parent_chain_process() + .block_imported() + .expect_no_active_lookups(); } #[test] @@ -1886,7 +1942,10 @@ mod deneb_only { .parent_blob_response() .expect_block_process() .parent_block_imported() - .expect_parent_chain_process(); + .blobs_response() + .expect_parent_chain_process() + .block_imported() + .expect_no_active_lookups(); } #[test] @@ -1899,7 +1958,7 @@ mod deneb_only { tester .empty_parent_block_response() - .expect_penalty() + .expect_penalty("NoResponseReturned") .expect_parent_block_request() .expect_no_blobs_request() .parent_blob_response() @@ -1907,8 +1966,10 @@ mod deneb_only { .parent_block_response() .expect_block_process() .parent_block_imported() + .blobs_response() .block_response() - .expect_parent_chain_process(); + .block_imported() + .expect_no_active_lookups(); } #[test] @@ -1920,16 +1981,18 @@ mod deneb_only { }; tester - .block_response() - .empty_parent_blobs_response() - .expect_no_penalty_and_no_requests() - .parent_block_response() - .expect_penalty() - .expect_parent_blobs_request() - .parent_blob_response() - .expect_block_process() - .parent_block_imported() - .expect_parent_chain_process(); + .block_response() // reply with current block + .empty_parent_blobs_response() // replies empty blobs to parent block + .expect_no_penalty_and_no_requests() // no penalty because parent block is unknown + .parent_block_response() // reply with parent block + .expect_penalty("single_blob_failure") // parent block has data, so penalize parent blob peer + .expect_parent_blobs_request() // re-request parent blobs + .parent_blob_response() // good response now + .expect_block_process() // send parent block for import + .parent_block_imported() // parent block imported + .blobs_response() + .block_imported() // resolve original block trigger blobs request and import + .expect_no_active_lookups(); } #[test] diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 9c17c6a1512..73b6bcf3f23 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -263,7 +263,7 @@ impl SyncManager { } #[cfg(test)] - pub(crate) fn active_single_lookups(&self) -> Vec { + pub(crate) fn active_single_lookups(&self) -> Vec<(Id, Hash256)> { self.block_lookups.active_single_lookups() } @@ -661,9 +661,6 @@ impl SyncManager { } } } - ChainSegmentProcessId::ParentLookup(chain_hash) => self - .block_lookups - .parent_chain_processed(chain_hash, result, &mut self.network), }, } }