From 07df74c83f53adeabac1d466007894cb94b553f2 Mon Sep 17 00:00:00 2001 From: Lion - dapplion <35266934+dapplion@users.noreply.github.com> Date: Wed, 15 May 2024 14:11:54 +0300 Subject: [PATCH] Trigger sampling on sync events (#5776) * Trigger sampling on sync events * Update beacon_chain.rs * Fix tests * Fix tests --- beacon_node/beacon_chain/src/beacon_chain.rs | 22 ++++++++++---- .../beacon_chain/src/block_verification.rs | 4 +++ .../beacon_chain/tests/block_verification.rs | 4 +-- .../gossip_methods.rs | 12 +++----- .../network_beacon_processor/sync_methods.rs | 30 ++++++++++++++++--- 5 files changed, 52 insertions(+), 20 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 7353fda794b..2ebf14dd322 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -220,11 +220,13 @@ impl TryInto for AvailabilityProcessingStatus { /// The result of a chain segment processing. pub enum ChainSegmentResult { /// Processing this chain segment finished successfully. - Successful { imported_blocks: usize }, + Successful { + imported_blocks: Vec<(Hash256, Slot)>, + }, /// There was an error processing this chain segment. Before the error, some blocks could /// have been imported. Failed { - imported_blocks: usize, + imported_blocks: Vec<(Hash256, Slot)>, error: BlockError, }, } @@ -2712,7 +2714,7 @@ impl BeaconChain { chain_segment: Vec>, ) -> Result>, ChainSegmentResult> { // This function will never import any blocks. - let imported_blocks = 0; + let imported_blocks = vec![]; let mut filtered_chain_segment = Vec::with_capacity(chain_segment.len()); // Produce a list of the parent root and slot of the child of each block. @@ -2818,7 +2820,7 @@ impl BeaconChain { chain_segment: Vec>, notify_execution_layer: NotifyExecutionLayer, ) -> ChainSegmentResult { - let mut imported_blocks = 0; + let mut imported_blocks = vec![]; // Filter uninteresting blocks from the chain segment in a blocking task. let chain = self.clone(); @@ -2878,6 +2880,7 @@ impl BeaconChain { // Import the blocks into the chain. for signature_verified_block in signature_verified_blocks { + let block_slot = signature_verified_block.slot(); match self .process_block( signature_verified_block.block_root(), @@ -2889,9 +2892,9 @@ impl BeaconChain { { Ok(status) => { match status { - AvailabilityProcessingStatus::Imported(_) => { + AvailabilityProcessingStatus::Imported(block_root) => { // The block was imported successfully. - imported_blocks += 1; + imported_blocks.push((block_root, block_slot)); } AvailabilityProcessingStatus::MissingComponents(slot, block_root) => { warn!(self.log, "Blobs missing in response to range request"; @@ -6871,6 +6874,13 @@ impl BeaconChain { self.data_availability_checker.data_availability_boundary() } + /// Returns true if we should issue a sampling request for this block + /// TODO(das): check if the block is still within the da_window + pub fn should_sample_slot(&self, slot: Slot) -> bool { + self.spec + .is_peer_das_enabled_for_epoch(slot.epoch(T::EthSpec::slots_per_epoch())) + } + pub fn logger(&self) -> &Logger { &self.log } diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index dc989595f63..9dc3ee7029f 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -1220,6 +1220,10 @@ impl SignatureVerifiedBlock { pub fn block_root(&self) -> Hash256 { self.block_root } + + pub fn slot(&self) -> Slot { + self.block.slot() + } } impl IntoExecutionPendingBlock for SignatureVerifiedBlock { diff --git a/beacon_node/beacon_chain/tests/block_verification.rs b/beacon_node/beacon_chain/tests/block_verification.rs index 98a112daffe..c1b7261fc21 100644 --- a/beacon_node/beacon_chain/tests/block_verification.rs +++ b/beacon_node/beacon_chain/tests/block_verification.rs @@ -1362,7 +1362,7 @@ async fn add_base_block_to_altair_chain() { ) .await, ChainSegmentResult::Failed { - imported_blocks: 0, + .. error: BlockError::InconsistentFork(InconsistentFork { fork_at_slot: ForkName::Altair, object_fork: ForkName::Base, @@ -1497,7 +1497,7 @@ async fn add_altair_block_to_base_chain() { ) .await, ChainSegmentResult::Failed { - imported_blocks: 0, + .. error: BlockError::InconsistentFork(InconsistentFork { fork_at_slot: ForkName::Base, object_fork: ForkName::Altair, diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 8007ad25735..4eefb473c15 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -1382,20 +1382,16 @@ impl NetworkBeaconProcessor { let block = verified_block.block.block_cloned(); let block_root = verified_block.block_root; + // TODO(das) Might be too early to issue a request here. We haven't checked that the block + // actually includes blob transactions and thus has data. A peer could send a block is + // garbage commitments, and make us trigger sampling for a block that does not have data. if block.num_expected_blobs() > 0 { // Trigger sampling for block not yet execution valid. At this point column custodials are // unlikely to have received their columns. Triggering sampling so early is only viable with // either: // - Sync delaying sampling until some latter window // - Re-processing early sampling requests: https://github.com/sigp/lighthouse/pull/5569 - if self - .chain - .spec - .eip7594_fork_epoch - .map_or(false, |eip7594_fork_epoch| { - block.epoch() >= eip7594_fork_epoch - }) - { + if self.chain.should_sample_slot(block.slot()) { self.send_sync_message(SyncMessage::SampleBlock(block_root, block.slot())); } } diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index a487157caaf..598f9b4f303 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -140,6 +140,7 @@ impl NetworkBeaconProcessor { }; let slot = block.slot(); + let block_has_data = block.as_block().num_expected_blobs() > 0; let parent_root = block.message().parent_root(); let commitments_formatted = block.as_block().commitments_formatted(); @@ -182,6 +183,18 @@ impl NetworkBeaconProcessor { self.chain.recompute_head_at_current_slot().await; } + + // RPC block imported or execution validated. If the block was already imported by gossip we + // receive Err(BlockError::AlreadyKnown). + if result.is_ok() && + // Block has at least one blob, so it produced columns + block_has_data && + // Block slot is within the DA boundary (should always be the case) and PeerDAS is activated + self.chain.should_sample_slot(slot) + { + self.send_sync_message(SyncMessage::SampleBlock(block_root, slot)); + } + // Sync handles these results self.send_sync_message(SyncMessage::BlockComponentProcessed { process_type, @@ -495,10 +508,19 @@ impl NetworkBeaconProcessor { { ChainSegmentResult::Successful { imported_blocks } => { metrics::inc_counter(&metrics::BEACON_PROCESSOR_CHAIN_SEGMENT_SUCCESS_TOTAL); - if imported_blocks > 0 { + if !imported_blocks.is_empty() { self.chain.recompute_head_at_current_slot().await; + + for (block_root, block_slot) in &imported_blocks { + if self.chain.should_sample_slot(*block_slot) { + self.send_sync_message(SyncMessage::SampleBlock( + *block_root, + *block_slot, + )); + } + } } - (imported_blocks, Ok(())) + (imported_blocks.len(), Ok(())) } ChainSegmentResult::Failed { imported_blocks, @@ -506,10 +528,10 @@ impl NetworkBeaconProcessor { } => { metrics::inc_counter(&metrics::BEACON_PROCESSOR_CHAIN_SEGMENT_FAILED_TOTAL); let r = self.handle_failed_chain_segment(error); - if imported_blocks > 0 { + if !imported_blocks.is_empty() { self.chain.recompute_head_at_current_slot().await; } - (imported_blocks, r) + (imported_blocks.len(), r) } } }