Skip to content

Commit

Permalink
Trigger sampling on sync events (#5776)
Browse files Browse the repository at this point in the history
* Trigger sampling on sync events

* Update beacon_chain.rs

* Fix tests

* Fix tests
  • Loading branch information
dapplion authored May 15, 2024
1 parent 4332207 commit 07df74c
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 20 deletions.
22 changes: 16 additions & 6 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,11 +220,13 @@ impl TryInto<Hash256> for AvailabilityProcessingStatus {
/// The result of a chain segment processing.
pub enum ChainSegmentResult<E: EthSpec> {
/// Processing this chain segment finished successfully.
Successful { imported_blocks: usize },
Successful {
imported_blocks: Vec<(Hash256, Slot)>,
},
/// There was an error processing this chain segment. Before the error, some blocks could
/// have been imported.
Failed {
imported_blocks: usize,
imported_blocks: Vec<(Hash256, Slot)>,
error: BlockError<E>,
},
}
Expand Down Expand Up @@ -2712,7 +2714,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
chain_segment: Vec<RpcBlock<T::EthSpec>>,
) -> Result<Vec<HashBlockTuple<T::EthSpec>>, ChainSegmentResult<T::EthSpec>> {
// This function will never import any blocks.
let imported_blocks = 0;
let imported_blocks = vec![];
let mut filtered_chain_segment = Vec::with_capacity(chain_segment.len());

// Produce a list of the parent root and slot of the child of each block.
Expand Down Expand Up @@ -2818,7 +2820,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
chain_segment: Vec<RpcBlock<T::EthSpec>>,
notify_execution_layer: NotifyExecutionLayer,
) -> ChainSegmentResult<T::EthSpec> {
let mut imported_blocks = 0;
let mut imported_blocks = vec![];

// Filter uninteresting blocks from the chain segment in a blocking task.
let chain = self.clone();
Expand Down Expand Up @@ -2878,6 +2880,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

// Import the blocks into the chain.
for signature_verified_block in signature_verified_blocks {
let block_slot = signature_verified_block.slot();
match self
.process_block(
signature_verified_block.block_root(),
Expand All @@ -2889,9 +2892,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
{
Ok(status) => {
match status {
AvailabilityProcessingStatus::Imported(_) => {
AvailabilityProcessingStatus::Imported(block_root) => {
// The block was imported successfully.
imported_blocks += 1;
imported_blocks.push((block_root, block_slot));
}
AvailabilityProcessingStatus::MissingComponents(slot, block_root) => {
warn!(self.log, "Blobs missing in response to range request";
Expand Down Expand Up @@ -6871,6 +6874,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.data_availability_checker.data_availability_boundary()
}

/// Returns true if we should issue a sampling request for this block
/// TODO(das): check if the block is still within the da_window
pub fn should_sample_slot(&self, slot: Slot) -> bool {
self.spec
.is_peer_das_enabled_for_epoch(slot.epoch(T::EthSpec::slots_per_epoch()))
}

pub fn logger(&self) -> &Logger {
&self.log
}
Expand Down
4 changes: 4 additions & 0 deletions beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1220,6 +1220,10 @@ impl<T: BeaconChainTypes> SignatureVerifiedBlock<T> {
pub fn block_root(&self) -> Hash256 {
self.block_root
}

pub fn slot(&self) -> Slot {
self.block.slot()
}
}

impl<T: BeaconChainTypes> IntoExecutionPendingBlock<T> for SignatureVerifiedBlock<T> {
Expand Down
4 changes: 2 additions & 2 deletions beacon_node/beacon_chain/tests/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1362,7 +1362,7 @@ async fn add_base_block_to_altair_chain() {
)
.await,
ChainSegmentResult::Failed {
imported_blocks: 0,
..
error: BlockError::InconsistentFork(InconsistentFork {
fork_at_slot: ForkName::Altair,
object_fork: ForkName::Base,
Expand Down Expand Up @@ -1497,7 +1497,7 @@ async fn add_altair_block_to_base_chain() {
)
.await,
ChainSegmentResult::Failed {
imported_blocks: 0,
..
error: BlockError::InconsistentFork(InconsistentFork {
fork_at_slot: ForkName::Base,
object_fork: ForkName::Altair,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1382,20 +1382,16 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
let block = verified_block.block.block_cloned();
let block_root = verified_block.block_root;

// TODO(das) Might be too early to issue a request here. We haven't checked that the block
// actually includes blob transactions and thus has data. A peer could send a block is
// garbage commitments, and make us trigger sampling for a block that does not have data.
if block.num_expected_blobs() > 0 {
// Trigger sampling for block not yet execution valid. At this point column custodials are
// unlikely to have received their columns. Triggering sampling so early is only viable with
// either:
// - Sync delaying sampling until some latter window
// - Re-processing early sampling requests: https://github.com/sigp/lighthouse/pull/5569
if self
.chain
.spec
.eip7594_fork_epoch
.map_or(false, |eip7594_fork_epoch| {
block.epoch() >= eip7594_fork_epoch
})
{
if self.chain.should_sample_slot(block.slot()) {
self.send_sync_message(SyncMessage::SampleBlock(block_root, block.slot()));
}
}
Expand Down
30 changes: 26 additions & 4 deletions beacon_node/network/src/network_beacon_processor/sync_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
};

let slot = block.slot();
let block_has_data = block.as_block().num_expected_blobs() > 0;
let parent_root = block.message().parent_root();
let commitments_formatted = block.as_block().commitments_formatted();

Expand Down Expand Up @@ -182,6 +183,18 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {

self.chain.recompute_head_at_current_slot().await;
}

// RPC block imported or execution validated. If the block was already imported by gossip we
// receive Err(BlockError::AlreadyKnown).
if result.is_ok() &&
// Block has at least one blob, so it produced columns
block_has_data &&
// Block slot is within the DA boundary (should always be the case) and PeerDAS is activated
self.chain.should_sample_slot(slot)
{
self.send_sync_message(SyncMessage::SampleBlock(block_root, slot));
}

// Sync handles these results
self.send_sync_message(SyncMessage::BlockComponentProcessed {
process_type,
Expand Down Expand Up @@ -495,21 +508,30 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
{
ChainSegmentResult::Successful { imported_blocks } => {
metrics::inc_counter(&metrics::BEACON_PROCESSOR_CHAIN_SEGMENT_SUCCESS_TOTAL);
if imported_blocks > 0 {
if !imported_blocks.is_empty() {
self.chain.recompute_head_at_current_slot().await;

for (block_root, block_slot) in &imported_blocks {
if self.chain.should_sample_slot(*block_slot) {
self.send_sync_message(SyncMessage::SampleBlock(
*block_root,
*block_slot,
));
}
}
}
(imported_blocks, Ok(()))
(imported_blocks.len(), Ok(()))
}
ChainSegmentResult::Failed {
imported_blocks,
error,
} => {
metrics::inc_counter(&metrics::BEACON_PROCESSOR_CHAIN_SEGMENT_FAILED_TOTAL);
let r = self.handle_failed_chain_segment(error);
if imported_blocks > 0 {
if !imported_blocks.is_empty() {
self.chain.recompute_head_at_current_slot().await;
}
(imported_blocks, r)
(imported_blocks.len(), r)
}
}
}
Expand Down

0 comments on commit 07df74c

Please sign in to comment.