diff --git a/beacon_node/lighthouse_network/src/service/api_types.rs b/beacon_node/lighthouse_network/src/service/api_types.rs index 7263ea8a3ca..61688780907 100644 --- a/beacon_node/lighthouse_network/src/service/api_types.rs +++ b/beacon_node/lighthouse_network/src/service/api_types.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use libp2p::swarm::ConnectionId; use types::{ - BlobSidecar, ColumnIndex, DataColumnSidecar, EthSpec, Hash256, LightClientBootstrap, + BlobSidecar, DataColumnSidecar, EthSpec, Hash256, LightClientBootstrap, LightClientFinalityUpdate, LightClientOptimisticUpdate, SignedBeaconBlock, }; @@ -62,7 +62,7 @@ pub enum DataColumnsByRootRequester { #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] pub struct SamplingId { pub id: SamplingRequester, - pub column_index: ColumnIndex, + pub sampling_request_id: SamplingRequestId, } #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] @@ -70,6 +70,10 @@ pub enum SamplingRequester { ImportedBlock(Hash256), } +/// Identifier of sampling requests. +#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] +pub struct SamplingRequestId(pub usize); + #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] pub struct CustodyId { pub requester: CustodyRequester, diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index cd363cfaee3..9572bf7f444 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -716,7 +716,13 @@ impl TestRig { ) { let first_dc = data_columns.first().unwrap(); let block_root = first_dc.block_root(); - let column_index = first_dc.index; + let sampling_request_id = match id.0 { + SyncRequestId::DataColumnsByRoot( + _, + _requester @ DataColumnsByRootRequester::Sampling(sampling_id), + ) => sampling_id.sampling_request_id, + _ => unreachable!(), + }; self.complete_data_columns_by_root_request(id, data_columns); // Expect work event @@ -727,7 +733,7 @@ impl TestRig { self.send_sync_message(SyncMessage::SampleVerified { id: SamplingId { id: SamplingRequester::ImportedBlock(block_root), - column_index, + sampling_request_id, }, result: Ok(()), }) diff --git a/beacon_node/network/src/sync/sampling.rs b/beacon_node/network/src/sync/sampling.rs index 32425ef8c8d..bae8547b1a7 100644 --- a/beacon_node/network/src/sync/sampling.rs +++ b/beacon_node/network/src/sync/sampling.rs @@ -1,9 +1,13 @@ use self::request::ActiveColumnSampleRequest; -use super::network_context::{RpcResponseError, SyncNetworkContext}; +use super::network_context::{ + DataColumnsByRootSingleBlockRequest, RpcResponseError, SyncNetworkContext, +}; use crate::metrics; use beacon_chain::BeaconChainTypes; use fnv::FnvHashMap; -use lighthouse_network::service::api_types::{SamplingId, SamplingRequester}; +use lighthouse_network::service::api_types::{ + DataColumnsByRootRequester, SamplingId, SamplingRequestId, SamplingRequester, +}; use lighthouse_network::{PeerAction, PeerId}; use rand::{seq::SliceRandom, thread_rng}; use slog::{debug, error, warn}; @@ -101,7 +105,7 @@ impl Sampling { return None; }; - let result = request.on_sample_downloaded(peer_id, id.column_index, resp, cx); + let result = request.on_sample_downloaded(peer_id, id.sampling_request_id, resp, cx); self.handle_sampling_result(result, &id.id) } @@ -124,7 +128,7 @@ impl Sampling { return None; }; - let result = request.on_sample_verified(id.column_index, result, cx); + let result = request.on_sample_verified(id.sampling_request_id, result, cx); self.handle_sampling_result(result, &id.id) } @@ -156,6 +160,10 @@ pub struct ActiveSamplingRequest { block_slot: Slot, requester_id: SamplingRequester, column_requests: FnvHashMap, + /// Mapping of column indexes for a sampling request. + column_indexes_by_sampling_request: FnvHashMap>, + /// Sequential ID for sampling requests. + current_sampling_request_id: SamplingRequestId, column_shuffle: Vec, required_successes: Vec, /// Logger for the `SyncNetworkContext`. @@ -205,6 +213,8 @@ impl ActiveSamplingRequest { block_slot, requester_id, column_requests: <_>::default(), + column_indexes_by_sampling_request: <_>::default(), + current_sampling_request_id: SamplingRequestId(0), column_shuffle, required_successes: match sampling_config { SamplingConfig::Default => REQUIRED_SUCCESSES.to_vec(), @@ -226,7 +236,7 @@ impl ActiveSamplingRequest { pub(crate) fn on_sample_downloaded( &mut self, _peer_id: PeerId, - column_index: ColumnIndex, + sampling_request_id: SamplingRequestId, resp: Result<(DataColumnSidecarVec, Duration), RpcResponseError>, cx: &mut SyncNetworkContext, ) -> Result, SamplingError> { @@ -235,57 +245,103 @@ impl ActiveSamplingRequest { // Progress requests // If request fails retry or expand search // If all good return - let Some(request) = self.column_requests.get_mut(&column_index) else { - warn!( - self.log, - "Received sampling response for unrequested column index" - ); + let Some(column_indexes) = self + .column_indexes_by_sampling_request + .get(&sampling_request_id) + else { + error!(self.log, "Column indexes for the sampling request ID not found"; "sampling_request_id" => ?sampling_request_id); return Ok(None); }; match resp { - Ok((mut data_columns, seen_timestamp)) => { - debug!(self.log, "Sample download success"; "block_root" => %self.block_root, "column_index" => column_index, "count" => data_columns.len()); + Ok((mut resp_data_columns, seen_timestamp)) => { + debug!(self.log, "Sample download success"; "block_root" => %self.block_root, "column_indexes" => ?column_indexes, "count" => resp_data_columns.len()); metrics::inc_counter_vec(&metrics::SAMPLE_DOWNLOAD_RESULT, &[metrics::SUCCESS]); - // No need to check data_columns has len > 1, as the SyncNetworkContext ensure that - // only requested is returned (or none); - if let Some(data_column) = data_columns.pop() { + // Filter the data received in the response using the requested column indexes. + let mut data_columns = vec![]; + for column_index in column_indexes { + let Some(request) = self.column_requests.get_mut(column_index) else { + warn!( + self.log, + "Active column sample request not found"; "block_root" => %self.block_root, "column_index" => column_index + ); + continue; + }; + + let Some(data_pos) = resp_data_columns + .iter() + .position(|data| &data.index == column_index) + else { + // Peer does not have the requested data. + // TODO(das) what to do? + debug!(self.log, "Sampling peer claims to not have the data"; "block_root" => %self.block_root, "column_index" => column_index); + request.on_sampling_error()?; + continue; + }; + + data_columns.push(resp_data_columns.swap_remove(data_pos)); + } + + if !resp_data_columns.is_empty() { + let resp_column_indexes = resp_data_columns + .iter() + .map(|d| d.index) + .collect::>(); + debug!( + self.log, + "Received data that was not requested"; "block_root" => %self.block_root, "column_indexes" => ?resp_column_indexes + ); + } + + // Handle the downloaded data columns. + if data_columns.is_empty() { + debug!(self.log,"Received empty response"; "block_root" => %self.block_root); + self.column_indexes_by_sampling_request + .remove(&sampling_request_id); + } else { + // Overwrite `column_indexes` with the column indexes received in the response. + let column_indexes = data_columns.iter().map(|d| d.index).collect::>(); + self.column_indexes_by_sampling_request + .insert(sampling_request_id, column_indexes.clone()); // Peer has data column, send to verify let Some(beacon_processor) = cx.beacon_processor_if_enabled() else { // If processor is not available, error the entire sampling debug!(self.log, "Dropping sampling"; "block" => %self.block_root, "reason" => "beacon processor unavailable"); return Err(SamplingError::ProcessorUnavailable); }; - - debug!(self.log, "Sending data_column for verification"; "block" => ?self.block_root, "column_index" => column_index); + debug!(self.log, "Sending data_column for verification"; "block" => ?self.block_root, "column_indexes" => ?column_indexes); if let Err(e) = beacon_processor.send_rpc_validate_data_columns( self.block_root, - vec![data_column], + data_columns, seen_timestamp, SamplingId { id: self.requester_id, - column_index, + sampling_request_id, }, ) { // TODO(das): Beacon processor is overloaded, what should we do? error!(self.log, "Dropping sampling"; "block" => %self.block_root, "reason" => e.to_string()); return Err(SamplingError::SendFailed("beacon processor send failure")); } - } else { - // Peer does not have the requested data. - // TODO(das) what to do? - debug!(self.log, "Sampling peer claims to not have the data"; "block_root" => %self.block_root, "column_index" => column_index); - request.on_sampling_error()?; } } Err(err) => { - debug!(self.log, "Sample download error"; "block_root" => %self.block_root, "column_index" => column_index, "error" => ?err); + debug!(self.log, "Sample download error"; "block_root" => %self.block_root, "column_indexes" => ?column_indexes, "error" => ?err); metrics::inc_counter_vec(&metrics::SAMPLE_DOWNLOAD_RESULT, &[metrics::FAILURE]); // Error downloading, maybe penalize peer and retry again. // TODO(das) with different peer or different peer? - request.on_sampling_error()?; + for column_index in column_indexes { + let Some(request) = self.column_requests.get_mut(column_index) else { + warn!( + self.log, + "Active column sample request not found"; "block_root" => %self.block_root, "column_index" => column_index + ); + continue; + }; + request.on_sampling_error()?; + } } }; @@ -302,43 +358,56 @@ impl ActiveSamplingRequest { /// - `Ok(None)`: Sampling request still active pub(crate) fn on_sample_verified( &mut self, - column_index: ColumnIndex, + sampling_request_id: SamplingRequestId, result: Result<(), String>, cx: &mut SyncNetworkContext, ) -> Result, SamplingError> { - // Select columns to sample - // Create individual request per column - // Progress requests - // If request fails retry or expand search - // If all good return - let Some(request) = self.column_requests.get_mut(&column_index) else { - warn!( - self.log, - "Received sampling response for unrequested column index" - ); + let Some(column_indexes) = self + .column_indexes_by_sampling_request + .get(&sampling_request_id) + else { + error!(self.log, "Column indexes for the sampling request ID not found"; "sampling_request_id" => ?sampling_request_id); return Ok(None); }; match result { Ok(_) => { - debug!(self.log, "Sample verification success"; "block_root" => %self.block_root, "column_index" => column_index); + debug!(self.log, "Sample verification success"; "block_root" => %self.block_root, "column_indexes" => ?column_indexes); metrics::inc_counter_vec(&metrics::SAMPLE_VERIFY_RESULT, &[metrics::SUCCESS]); // Valid, continue_sampling will maybe consider sampling succees - request.on_sampling_success()?; + for column_index in column_indexes { + let Some(request) = self.column_requests.get_mut(column_index) else { + warn!( + self.log, + "Active column sample request not found"; "block_root" => %self.block_root, "column_index" => column_index + ); + continue; + }; + request.on_sampling_success()?; + } } Err(err) => { - debug!(self.log, "Sample verification failure"; "block_root" => %self.block_root, "column_index" => column_index, "reason" => ?err); + debug!(self.log, "Sample verification failure"; "block_root" => %self.block_root, "column_indexes" => ?column_indexes, "reason" => ?err); metrics::inc_counter_vec(&metrics::SAMPLE_VERIFY_RESULT, &[metrics::FAILURE]); // TODO(das): Peer sent invalid data, penalize and try again from different peer // TODO(das): Count individual failures - let peer_id = request.on_sampling_error()?; - cx.report_peer( - peer_id, - PeerAction::LowToleranceError, - "invalid data column", - ); + for column_index in column_indexes { + let Some(request) = self.column_requests.get_mut(column_index) else { + warn!( + self.log, + "Active column sample request not found"; "block_root" => %self.block_root, "column_index" => column_index + ); + continue; + }; + let peer_id = request.on_sampling_error()?; + cx.report_peer( + peer_id, + PeerAction::LowToleranceError, + "invalid data column", + ); + } } } @@ -376,10 +445,11 @@ impl ActiveSamplingRequest { return Ok(Some(())); } - let mut sent_requests = 0; - // First, attempt to progress sampling by requesting more columns, so that request failures // are accounted for below. + + // Group the requested column indexes by the destination peer to batch sampling requests. + let mut column_indexes_to_request = FnvHashMap::default(); for idx in 0..*required_successes { // Re-request columns. Note: out of bounds error should never happen, inputs are hardcoded let column_index = *self @@ -391,8 +461,40 @@ impl ActiveSamplingRequest { .entry(column_index) .or_insert(ActiveColumnSampleRequest::new(column_index)); - if request.request(self.block_root, self.block_slot, self.requester_id, cx)? { - sent_requests += 1 + if request.is_ready_to_request() { + if let Some(peer_id) = request.choose_peer(self.block_slot, cx) { + let indexes = column_indexes_to_request.entry(peer_id).or_insert(vec![]); + indexes.push(column_index); + } + } + } + + // Send requests. + let mut sent_request = false; + for (peer_id, column_indexes) in column_indexes_to_request { + cx.data_column_lookup_request( + DataColumnsByRootRequester::Sampling(SamplingId { + id: self.requester_id, + sampling_request_id: self.current_sampling_request_id, + }), + peer_id, + DataColumnsByRootSingleBlockRequest { + block_root: self.block_root, + indices: column_indexes.clone(), + }, + ) + .map_err(SamplingError::SendFailed)?; + self.column_indexes_by_sampling_request + .insert(self.current_sampling_request_id, column_indexes.clone()); + self.current_sampling_request_id.0 += 1; + sent_request = true; + + // Update request status. + for column_index in column_indexes { + let Some(request) = self.column_requests.get_mut(&column_index) else { + continue; + }; + request.on_start_sampling(peer_id)?; } } @@ -400,7 +502,7 @@ impl ActiveSamplingRequest { // receive a new event of some type. If there are no ongoing requests, and no new // request was sent, loop to increase the required_successes until the sampling fails if // there are no peers. - if ongoings == 0 && sent_requests == 0 { + if ongoings == 0 && !sent_request { debug!(self.log, "Sampling request stalled"; "block_root" => %self.block_root); } @@ -409,14 +511,14 @@ impl ActiveSamplingRequest { } mod request { - use super::{SamplingError, SamplingId, SamplingRequester}; - use crate::sync::network_context::{DataColumnsByRootSingleBlockRequest, SyncNetworkContext}; + use super::SamplingError; + use crate::sync::network_context::SyncNetworkContext; use beacon_chain::BeaconChainTypes; - use lighthouse_network::{service::api_types::DataColumnsByRootRequester, PeerId}; + use lighthouse_network::PeerId; use rand::seq::SliceRandom; use rand::thread_rng; use std::collections::HashSet; - use types::{data_column_sidecar::ColumnIndex, EthSpec, Hash256, Slot}; + use types::{data_column_sidecar::ColumnIndex, EthSpec, Slot}; pub(crate) struct ActiveColumnSampleRequest { column_index: ColumnIndex, @@ -463,19 +565,18 @@ mod request { } } - pub(crate) fn request( - &mut self, - block_root: Hash256, - block_slot: Slot, - requester: SamplingRequester, - cx: &mut SyncNetworkContext, - ) -> Result { - match &self.status { - Status::NoPeers | Status::NotStarted => {} // Ok to continue - Status::Sampling(_) => return Ok(false), // Already downloading - Status::Verified => return Ok(false), // Already completed + pub(crate) fn is_ready_to_request(&self) -> bool { + match self.status { + Status::NoPeers | Status::NotStarted => true, + Status::Sampling(_) | Status::Verified => false, } + } + pub(crate) fn choose_peer( + &mut self, + block_slot: Slot, + cx: &SyncNetworkContext, + ) -> Option { // TODO: When is a fork and only a subset of your peers know about a block, sampling should only // be queried on the peers on that fork. Should this case be handled? How to handle it? let mut peer_ids = cx.get_custodial_peers( @@ -485,25 +586,24 @@ mod request { peer_ids.retain(|peer_id| !self.peers_dont_have.contains(peer_id)); - if let Some(peer_id) = peer_ids.choose(&mut thread_rng()).cloned() { - cx.data_column_lookup_request( - DataColumnsByRootRequester::Sampling(SamplingId { - id: requester, - column_index: self.column_index, - }), - peer_id, - DataColumnsByRootSingleBlockRequest { - block_root, - indices: vec![self.column_index], - }, - ) - .map_err(SamplingError::SendFailed)?; - - self.status = Status::Sampling(peer_id); - Ok(true) + if let Some(peer_id) = peer_ids.choose(&mut thread_rng()) { + Some(*peer_id) } else { self.status = Status::NoPeers; - Ok(false) + None + } + } + + pub(crate) fn on_start_sampling(&mut self, peer_id: PeerId) -> Result<(), SamplingError> { + match self.status.clone() { + Status::NoPeers | Status::NotStarted => { + self.status = Status::Sampling(peer_id); + Ok(()) + } + other => Err(SamplingError::BadState(format!( + "bad state on_start_sampling expected NoPeers|NotStarted got {other:?}. column_index:{}", + self.column_index + ))), } } @@ -515,7 +615,8 @@ mod request { Ok(peer_id) } other => Err(SamplingError::BadState(format!( - "bad state on_sampling_error expected Sampling got {other:?}" + "bad state on_sampling_error expected Sampling got {other:?}. column_index:{}", + self.column_index ))), } } @@ -527,7 +628,8 @@ mod request { Ok(()) } other => Err(SamplingError::BadState(format!( - "bad state on_sampling_success expected Sampling got {other:?}" + "bad state on_sampling_success expected Sampling got {other:?}. column_index:{}", + self.column_index ))), } }