From 1142f5f7d86c873f14bca9783fe1b9aab60ebfe9 Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Thu, 9 May 2024 15:26:13 +0900 Subject: [PATCH 1/2] Fetch custody columns in range sync --- .../src/block_verification_types.rs | 6 +- .../src/rpc/codec/ssz_snappy.rs | 26 +- .../lighthouse_network/src/rpc/config.rs | 23 +- .../lighthouse_network/src/rpc/methods.rs | 12 +- .../lighthouse_network/src/rpc/outbound.rs | 4 +- .../lighthouse_network/src/rpc/protocol.rs | 3 +- .../src/rpc/rate_limiter.rs | 31 ++- .../src/service/api_types.rs | 8 +- .../network_beacon_processor/rpc_methods.rs | 120 ++++----- .../network/src/sync/backfill_sync/mod.rs | 2 +- .../src/sync/block_sidecar_coupling.rs | 247 ++++++++++++++---- beacon_node/network/src/sync/manager.rs | 7 +- .../network/src/sync/network_context.rs | 121 ++++++--- .../src/sync/network_context/custody.rs | 2 + .../network/src/sync/range_sync/chain.rs | 2 +- 15 files changed, 404 insertions(+), 210 deletions(-) diff --git a/beacon_node/beacon_chain/src/block_verification_types.rs b/beacon_node/beacon_chain/src/block_verification_types.rs index 1a0ff1d6bbc..636a00c0a9f 100644 --- a/beacon_node/beacon_chain/src/block_verification_types.rs +++ b/beacon_node/beacon_chain/src/block_verification_types.rs @@ -156,11 +156,9 @@ impl RpcBlock { ) -> Result { let block_root = block_root.unwrap_or_else(|| get_block_root(&block)); - if let Ok(block_commitments) = block.message().body().blob_kzg_commitments() { + if block.num_expected_blobs() > 0 && custody_columns.is_empty() { // The number of required custody columns is out of scope here. - if !block_commitments.is_empty() && custody_columns.is_empty() { - return Err(AvailabilityCheckError::MissingCustodyColumns); - } + return Err(AvailabilityCheckError::MissingCustodyColumns); } // Treat empty blob lists as if they are missing. 
let inner = if custody_columns.is_empty() { diff --git a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs index 90eca79d98a..74751c604ba 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs @@ -227,8 +227,8 @@ impl Encoder> for SSZSnappyOutboundCodec { }, OutboundRequest::BlobsByRange(req) => req.as_ssz_bytes(), OutboundRequest::BlobsByRoot(req) => req.blob_ids.as_ssz_bytes(), + OutboundRequest::DataColumnsByRange(req) => req.as_ssz_bytes(), OutboundRequest::DataColumnsByRoot(req) => req.data_column_ids.as_ssz_bytes(), - OutboundRequest::DataColumnsByRange(req) => req.data_column_ids.as_ssz_bytes(), OutboundRequest::Ping(req) => req.as_ssz_bytes(), OutboundRequest::MetaData(_) => return Ok(()), // no metadata to encode }; @@ -521,6 +521,9 @@ fn handle_rpc_request( )?, }))) } + SupportedProtocol::DataColumnsByRangeV1 => Ok(Some(InboundRequest::DataColumnsByRange( + DataColumnsByRangeRequest::from_ssz_bytes(decoded_buffer)?, + ))), SupportedProtocol::DataColumnsByRootV1 => Ok(Some(InboundRequest::DataColumnsByRoot( DataColumnsByRootRequest { data_column_ids: RuntimeVariableList::from_ssz_bytes( @@ -529,9 +532,6 @@ fn handle_rpc_request( )?, }, ))), - SupportedProtocol::DataColumnsByRangeV1 => Ok(Some(InboundRequest::DataColumnsByRange( - DataColumnsByRangeRequest::from_ssz_bytes(decoded_buffer)?, - ))), SupportedProtocol::PingV1 => Ok(Some(InboundRequest::Ping(Ping { data: u64::from_ssz_bytes(decoded_buffer)?, }))), @@ -624,14 +624,14 @@ fn handle_rpc_response( ), )), }, - SupportedProtocol::DataColumnsByRootV1 => match fork_name { + SupportedProtocol::DataColumnsByRangeV1 => match fork_name { // TODO(das): update fork name - Some(ForkName::Deneb) => Ok(Some(RPCResponse::DataColumnsByRoot(Arc::new( + Some(ForkName::Deneb) => Ok(Some(RPCResponse::DataColumnsByRange(Arc::new( DataColumnSidecar::from_ssz_bytes(decoded_buffer)?, )))), Some(_) => Err(RPCError::ErrorResponse( RPCResponseErrorCode::InvalidRequest, - "Invalid fork name for data columns by root".to_string(), + "Invalid fork name for data columns by range".to_string(), )), None => Err(RPCError::ErrorResponse( RPCResponseErrorCode::InvalidRequest, @@ -641,14 +641,14 @@ fn handle_rpc_response( ), )), }, - SupportedProtocol::DataColumnsByRangeV1 => match fork_name { + SupportedProtocol::DataColumnsByRootV1 => match fork_name { // TODO(das): update fork name - Some(ForkName::Deneb) => Ok(Some(RPCResponse::DataColumnsByRange(Arc::new( + Some(ForkName::Deneb) => Ok(Some(RPCResponse::DataColumnsByRoot(Arc::new( DataColumnSidecar::from_ssz_bytes(decoded_buffer)?, )))), Some(_) => Err(RPCError::ErrorResponse( RPCResponseErrorCode::InvalidRequest, - "Invalid fork name for data columns by range".to_string(), + "Invalid fork name for data columns by root".to_string(), )), None => Err(RPCError::ErrorResponse( RPCResponseErrorCode::InvalidRequest, @@ -1066,12 +1066,12 @@ mod tests { OutboundRequest::BlobsByRoot(bbroot) => { assert_eq!(decoded, InboundRequest::BlobsByRoot(bbroot)) } + OutboundRequest::DataColumnsByRange(value) => { + assert_eq!(decoded, InboundRequest::DataColumnsByRange(value)) + } OutboundRequest::DataColumnsByRoot(dcbroot) => { assert_eq!(decoded, InboundRequest::DataColumnsByRoot(dcbroot)) } - OutboundRequest::DataColumnsByRange(dcbrange) => { - assert_eq!(decoded, InboundRequest::DataColumnsByRange(dcbrange)) - } OutboundRequest::Ping(ping) => { assert_eq!(decoded, 
InboundRequest::Ping(ping)) } diff --git a/beacon_node/lighthouse_network/src/rpc/config.rs b/beacon_node/lighthouse_network/src/rpc/config.rs index a357495e7a8..7f1595d5295 100644 --- a/beacon_node/lighthouse_network/src/rpc/config.rs +++ b/beacon_node/lighthouse_network/src/rpc/config.rs @@ -91,8 +91,8 @@ pub struct RateLimiterConfig { pub(super) blocks_by_root_quota: Quota, pub(super) blobs_by_range_quota: Quota, pub(super) blobs_by_root_quota: Quota, - pub(super) data_columns_by_root_quota: Quota, pub(super) data_columns_by_range_quota: Quota, + pub(super) data_columns_by_root_quota: Quota, pub(super) light_client_bootstrap_quota: Quota, pub(super) light_client_optimistic_update_quota: Quota, pub(super) light_client_finality_update_quota: Quota, @@ -107,8 +107,9 @@ impl RateLimiterConfig { pub const DEFAULT_BLOCKS_BY_ROOT_QUOTA: Quota = Quota::n_every(128, 10); pub const DEFAULT_BLOBS_BY_RANGE_QUOTA: Quota = Quota::n_every(768, 10); pub const DEFAULT_BLOBS_BY_ROOT_QUOTA: Quota = Quota::n_every(128, 10); - pub const DEFAULT_DATA_COLUMNS_BY_ROOT_QUOTA: Quota = Quota::n_every(128, 10); + // TODO(das): arbitrary placeholder value, not yet tuned pub const DEFAULT_DATA_COLUMNS_BY_RANGE_QUOTA: Quota = Quota::n_every(128, 10); + pub const DEFAULT_DATA_COLUMNS_BY_ROOT_QUOTA: Quota = Quota::n_every(128, 10); pub const DEFAULT_LIGHT_CLIENT_BOOTSTRAP_QUOTA: Quota = Quota::one_every(10); pub const DEFAULT_LIGHT_CLIENT_OPTIMISTIC_UPDATE_QUOTA: Quota = Quota::one_every(10); pub const DEFAULT_LIGHT_CLIENT_FINALITY_UPDATE_QUOTA: Quota = Quota::one_every(10); @@ -125,8 +126,8 @@ impl Default for RateLimiterConfig { blocks_by_root_quota: Self::DEFAULT_BLOCKS_BY_ROOT_QUOTA, blobs_by_range_quota: Self::DEFAULT_BLOBS_BY_RANGE_QUOTA, blobs_by_root_quota: Self::DEFAULT_BLOBS_BY_ROOT_QUOTA, - data_columns_by_root_quota: Self::DEFAULT_DATA_COLUMNS_BY_ROOT_QUOTA, data_columns_by_range_quota: Self::DEFAULT_DATA_COLUMNS_BY_RANGE_QUOTA, + data_columns_by_root_quota: Self::DEFAULT_DATA_COLUMNS_BY_ROOT_QUOTA, light_client_bootstrap_quota: Self::DEFAULT_LIGHT_CLIENT_BOOTSTRAP_QUOTA, light_client_optimistic_update_quota: Self::DEFAULT_LIGHT_CLIENT_OPTIMISTIC_UPDATE_QUOTA, @@ -156,6 +157,10 @@ impl Debug for RateLimiterConfig { .field("blocks_by_root", fmt_q!(&self.blocks_by_root_quota)) .field("blobs_by_range", fmt_q!(&self.blobs_by_range_quota)) .field("blobs_by_root", fmt_q!(&self.blobs_by_root_quota)) + .field( + "data_columns_by_range", + fmt_q!(&self.data_columns_by_range_quota), + ) .field( "data_columns_by_root", fmt_q!(&self.data_columns_by_root_quota), @@ -180,8 +185,8 @@ impl FromStr for RateLimiterConfig { let mut blocks_by_root_quota = None; let mut blobs_by_range_quota = None; let mut blobs_by_root_quota = None; - let mut data_columns_by_root_quota = None; let mut data_columns_by_range_quota = None; + let mut data_columns_by_root_quota = None; let mut light_client_bootstrap_quota = None; let mut light_client_optimistic_update_quota = None; let mut light_client_finality_update_quota = None; @@ -196,12 +201,12 @@ impl FromStr for RateLimiterConfig { Protocol::BlocksByRoot => blocks_by_root_quota = blocks_by_root_quota.or(quota), Protocol::BlobsByRange => blobs_by_range_quota = blobs_by_range_quota.or(quota), Protocol::BlobsByRoot => blobs_by_root_quota = blobs_by_root_quota.or(quota), - Protocol::DataColumnsByRoot => { data_columns_by_root_quota = data_columns_by_root_quota.or(quota) } Protocol::DataColumnsByRange => { data_columns_by_range_quota = data_columns_by_range_quota.or(quota) } Protocol::DataColumnsByRoot
=> { + data_columns_by_root_quota = data_columns_by_root_quota.or(quota) + } Protocol::Ping => ping_quota = ping_quota.or(quota), Protocol::MetaData => meta_data_quota = meta_data_quota.or(quota), Protocol::LightClientBootstrap => { @@ -229,10 +234,10 @@ impl FromStr for RateLimiterConfig { blobs_by_range_quota: blobs_by_range_quota .unwrap_or(Self::DEFAULT_BLOBS_BY_RANGE_QUOTA), blobs_by_root_quota: blobs_by_root_quota.unwrap_or(Self::DEFAULT_BLOBS_BY_ROOT_QUOTA), - data_columns_by_root_quota: data_columns_by_root_quota - .unwrap_or(Self::DEFAULT_DATA_COLUMNS_BY_ROOT_QUOTA), data_columns_by_range_quota: data_columns_by_range_quota .unwrap_or(Self::DEFAULT_DATA_COLUMNS_BY_RANGE_QUOTA), + data_columns_by_root_quota: data_columns_by_root_quota + .unwrap_or(Self::DEFAULT_DATA_COLUMNS_BY_ROOT_QUOTA), light_client_bootstrap_quota: light_client_bootstrap_quota .unwrap_or(Self::DEFAULT_LIGHT_CLIENT_BOOTSTRAP_QUOTA), light_client_optimistic_update_quota: light_client_optimistic_update_quota diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs index 84add18ee05..a892b7f07cc 100644 --- a/beacon_node/lighthouse_network/src/rpc/methods.rs +++ b/beacon_node/lighthouse_network/src/rpc/methods.rs @@ -300,17 +300,17 @@ impl BlobsByRangeRequest { pub struct DataColumnsByRangeRequest { /// The starting slot to request data columns. pub start_slot: u64, - /// The number of slots from the start slot. pub count: u64, - - /// The list of beacon block roots and column indices being requested. - pub data_column_ids: Vec, + /// The list of column indices being requested. + pub columns: Vec, } impl DataColumnsByRangeRequest { - pub fn max_data_columns_requested(&self) -> u64 { - self.count.saturating_mul(E::max_blobs_per_block() as u64) + pub fn max_requested(&self) -> u64 { + self.count + .saturating_mul(E::max_blobs_per_block() as u64) + .saturating_mul(self.columns.len() as u64) } } diff --git a/beacon_node/lighthouse_network/src/rpc/outbound.rs b/beacon_node/lighthouse_network/src/rpc/outbound.rs index bcb76c00081..5c537c49d48 100644 --- a/beacon_node/lighthouse_network/src/rpc/outbound.rs +++ b/beacon_node/lighthouse_network/src/rpc/outbound.rs @@ -36,8 +36,8 @@ pub enum OutboundRequest { BlocksByRoot(BlocksByRootRequest), BlobsByRange(BlobsByRangeRequest), BlobsByRoot(BlobsByRootRequest), - DataColumnsByRoot(DataColumnsByRootRequest), DataColumnsByRange(DataColumnsByRangeRequest), + DataColumnsByRoot(DataColumnsByRootRequest), Ping(Ping), MetaData(MetadataRequest), } @@ -111,7 +111,7 @@ impl OutboundRequest { OutboundRequest::BlobsByRange(req) => req.max_blobs_requested::(), OutboundRequest::BlobsByRoot(req) => req.blob_ids.len() as u64, OutboundRequest::DataColumnsByRoot(req) => req.data_column_ids.len() as u64, - OutboundRequest::DataColumnsByRange(req) => req.data_column_ids.len() as u64, + OutboundRequest::DataColumnsByRange(req) => req.max_requested::(), OutboundRequest::Ping(_) => 1, OutboundRequest::MetaData(_) => 1, } diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index 1447a57e706..eadeb4d4374 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -382,6 +382,7 @@ impl SupportedProtocol { ProtocolId::new(SupportedProtocol::BlobsByRangeV1, Encoding::SSZSnappy), // TODO(das): add to PeerDAS fork ProtocolId::new(SupportedProtocol::DataColumnsByRootV1, Encoding::SSZSnappy),
ProtocolId::new(SupportedProtocol::DataColumnsByRangeV1, Encoding::SSZSnappy), ]); } supported @@ -704,7 +705,7 @@ impl InboundRequest { InboundRequest::BlobsByRange(req) => req.max_blobs_requested::(), InboundRequest::BlobsByRoot(req) => req.blob_ids.len() as u64, InboundRequest::DataColumnsByRoot(req) => req.data_column_ids.len() as u64, - InboundRequest::DataColumnsByRange(req) => req.data_column_ids.len() as u64, + InboundRequest::DataColumnsByRange(req) => req.max_requested::(), InboundRequest::Ping(_) => 1, InboundRequest::MetaData(_) => 1, InboundRequest::LightClientBootstrap(_) => 1, diff --git a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs index 9fb085efd86..de0b1afc8b4 100644 --- a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs +++ b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs @@ -97,10 +97,10 @@ pub struct RPCRateLimiter { blbrange_rl: Limiter, /// BlobsByRoot rate limiter. blbroot_rl: Limiter, + /// DataColumnsByRange rate limiter. + dcbrange_rl: Limiter, /// DataColumnsByRoot rate limiter. dcbroot_rl: Limiter, - /// DataColumnsByRange rate limiter. - dcbrange_rl: Limiter, /// LightClientBootstrap rate limiter. lc_bootstrap_rl: Limiter, /// LightClientOptimisticUpdate rate limiter. @@ -137,10 +137,10 @@ pub struct RPCRateLimiterBuilder { blbrange_quota: Option, /// Quota for the BlobsByRoot protocol. blbroot_quota: Option, - /// Quota for the DataColumnsByRoot protocol. - dcbroot_quota: Option, /// Quota for the DataColumnsByRange protocol. dcbrange_quota: Option, + /// Quota for the DataColumnsByRoot protocol. + dcbroot_quota: Option, /// Quota for the LightClientBootstrap protocol. lcbootstrap_quota: Option, /// Quota for the LightClientOptimisticUpdate protocol.
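Aside: `max_requested` above is the cost that both the inbound and outbound rate limiters now charge for a `DataColumnsByRange` request. A minimal standalone sketch of the same saturating accounting, with `max_blobs_per_block` passed as a plain parameter standing in for the value the patch reads from `EthSpec`:

/// Worst-case number of data column sidecars a DataColumnsByRange request can
/// return: requested slots x max blobs per block x requested column indices.
/// Saturating arithmetic mirrors the patch so hostile inputs cannot overflow.
fn max_requested(count: u64, max_blobs_per_block: u64, num_columns: usize) -> u64 {
    count
        .saturating_mul(max_blobs_per_block)
        .saturating_mul(num_columns as u64)
}

// e.g. a 32-slot batch with 6 blobs per block and 4 requested columns:
// max_requested(32, 6, 4) == 768 sidecars charged against the quota.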
@@ -162,8 +162,8 @@ impl RPCRateLimiterBuilder { Protocol::BlocksByRoot => self.bbroots_quota = q, Protocol::BlobsByRange => self.blbrange_quota = q, Protocol::BlobsByRoot => self.blbroot_quota = q, - Protocol::DataColumnsByRoot => self.dcbroot_quota = q, Protocol::DataColumnsByRange => self.dcbrange_quota = q, + Protocol::DataColumnsByRoot => self.dcbroot_quota = q, Protocol::LightClientBootstrap => self.lcbootstrap_quota = q, Protocol::LightClientOptimisticUpdate => self.lc_optimistic_update_quota = q, Protocol::LightClientFinalityUpdate => self.lc_finality_update_quota = q, @@ -196,18 +196,15 @@ impl RPCRateLimiterBuilder { let blbrange_quota = self .blbrange_quota .ok_or("BlobsByRange quota not specified")?; - let blbroots_quota = self .blbroot_quota .ok_or("BlobsByRoot quota not specified")?; - - let dcbroot_quota = self - .dcbroot_quota - .ok_or("DataColumnsByRoot quota not specified")?; - let dcbrange_quota = self .dcbrange_quota .ok_or("DataColumnsByRange quota not specified")?; + let dcbroot_quota = self + .dcbroot_quota + .ok_or("DataColumnsByRoot quota not specified")?; // create the rate limiters let ping_rl = Limiter::from_quota(ping_quota)?; @@ -218,8 +215,8 @@ impl RPCRateLimiterBuilder { let bbrange_rl = Limiter::from_quota(bbrange_quota)?; let blbrange_rl = Limiter::from_quota(blbrange_quota)?; let blbroot_rl = Limiter::from_quota(blbroots_quota)?; - let dcbroot_rl = Limiter::from_quota(dcbroot_quota)?; let dcbrange_rl = Limiter::from_quota(dcbrange_quota)?; + let dcbroot_rl = Limiter::from_quota(dcbroot_quota)?; let lc_bootstrap_rl = Limiter::from_quota(lc_bootstrap_quota)?; let lc_optimistic_update_rl = Limiter::from_quota(lc_optimistic_update_quota)?; let lc_finality_update_rl = Limiter::from_quota(lc_finality_update_quota)?; @@ -238,8 +235,8 @@ impl RPCRateLimiterBuilder { bbrange_rl, blbrange_rl, blbroot_rl, - dcbroot_rl, dcbrange_rl, + dcbroot_rl, lc_bootstrap_rl, lc_optimistic_update_rl, lc_finality_update_rl, @@ -284,8 +281,8 @@ impl RPCRateLimiter { blocks_by_root_quota, blobs_by_range_quota, blobs_by_root_quota, - data_columns_by_root_quota, data_columns_by_range_quota, + data_columns_by_root_quota, light_client_bootstrap_quota, light_client_optimistic_update_quota, light_client_finality_update_quota, @@ -300,8 +297,8 @@ impl RPCRateLimiter { .set_quota(Protocol::BlocksByRoot, blocks_by_root_quota) .set_quota(Protocol::BlobsByRange, blobs_by_range_quota) .set_quota(Protocol::BlobsByRoot, blobs_by_root_quota) - .set_quota(Protocol::DataColumnsByRoot, data_columns_by_root_quota) .set_quota(Protocol::DataColumnsByRange, data_columns_by_range_quota) + .set_quota(Protocol::DataColumnsByRoot, data_columns_by_root_quota) .set_quota(Protocol::LightClientBootstrap, light_client_bootstrap_quota) .set_quota( Protocol::LightClientOptimisticUpdate, @@ -338,8 +335,8 @@ impl RPCRateLimiter { Protocol::BlocksByRoot => &mut self.bbroots_rl, Protocol::BlobsByRange => &mut self.blbrange_rl, Protocol::BlobsByRoot => &mut self.blbroot_rl, - Protocol::DataColumnsByRoot => &mut self.dcbroot_rl, Protocol::DataColumnsByRange => &mut self.dcbrange_rl, + Protocol::DataColumnsByRoot => &mut self.dcbroot_rl, Protocol::LightClientBootstrap => &mut self.lc_bootstrap_rl, Protocol::LightClientOptimisticUpdate => &mut self.lc_optimistic_update_rl, Protocol::LightClientFinalityUpdate => &mut self.lc_finality_update_rl, @@ -357,6 +354,8 @@ impl RPCRateLimiter { self.bbroots_rl.prune(time_since_start); self.blbrange_rl.prune(time_since_start); self.blbroot_rl.prune(time_since_start); + 
self.dcbrange_rl.prune(time_since_start); + self.dcbroot_rl.prune(time_since_start); } } diff --git a/beacon_node/lighthouse_network/src/service/api_types.rs b/beacon_node/lighthouse_network/src/service/api_types.rs index 0902b6d27ba..86f681fa601 100644 --- a/beacon_node/lighthouse_network/src/service/api_types.rs +++ b/beacon_node/lighthouse_network/src/service/api_types.rs @@ -6,7 +6,9 @@ use types::{ LightClientOptimisticUpdate, SignedBeaconBlock, }; -use crate::rpc::methods::{BlobsByRangeRequest, BlobsByRootRequest, DataColumnsByRootRequest}; +use crate::rpc::methods::{ + BlobsByRangeRequest, BlobsByRootRequest, DataColumnsByRangeRequest, DataColumnsByRootRequest, +}; use crate::rpc::{ methods::{ BlocksByRangeRequest, BlocksByRootRequest, LightClientBootstrapRequest, @@ -16,8 +18,6 @@ use crate::rpc::{ OutboundRequest, SubstreamId, }; -use super::methods::DataColumnsByRangeRequest; - /// Identifier of requests sent by a peer. pub type PeerRequestId = (ConnectionId, SubstreamId); @@ -84,8 +84,8 @@ impl std::convert::From for OutboundRequest { } Request::BlobsByRange(r) => OutboundRequest::BlobsByRange(r), Request::BlobsByRoot(r) => OutboundRequest::BlobsByRoot(r), - Request::DataColumnsByRoot(r) => OutboundRequest::DataColumnsByRoot(r), Request::DataColumnsByRange(r) => OutboundRequest::DataColumnsByRange(r), + Request::DataColumnsByRoot(r) => OutboundRequest::DataColumnsByRoot(r), Request::Status(s) => OutboundRequest::Status(s), } } diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index 04329762629..fe69786b522 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -14,12 +14,13 @@ use lighthouse_network::rpc::*; use lighthouse_network::{PeerId, PeerRequestId, ReportSource, Response, SyncInfo}; use slog::{debug, error, warn}; use slot_clock::SlotClock; +use std::collections::HashSet; use std::collections::{hash_map::Entry, HashMap}; use std::sync::Arc; use tokio::sync::mpsc; use tokio_stream::StreamExt; use types::blob_sidecar::BlobIdentifier; -use types::{Epoch, EthSpec, ForkName, Hash256, Slot}; +use types::{ColumnIndex, Epoch, EthSpec, ForkName, Hash256, Slot}; impl NetworkBeaconProcessor { /* Auxiliary functions */ @@ -936,11 +937,26 @@ impl NetworkBeaconProcessor { /// Handle a `DataColumnsByRange` request from the peer. pub fn handle_data_columns_by_range_request( - self: Arc, + &self, peer_id: PeerId, request_id: PeerRequestId, req: DataColumnsByRangeRequest, ) { + self.terminate_response_stream( + peer_id, + request_id, + self.handle_data_columns_by_range_request_inner(peer_id, request_id, req), + Response::DataColumnsByRange, + ); + } + + /// Handle a `DataColumnsByRange` request from the peer. 
+ pub fn handle_data_columns_by_range_request_inner( + &self, + peer_id: PeerId, + request_id: PeerRequestId, + req: DataColumnsByRangeRequest, + ) -> Result<(), (RPCResponseErrorCode, &'static str)> { debug!(self.log, "Received DataColumnsByRange Request"; "peer_id" => %peer_id, "count" => req.count, ); // Should not send more than max request data columns - if req.max_data_columns_requested::() - > self.chain.spec.max_request_data_column_sidecars - { - return self.send_error_response( - peer_id, + if req.max_requested::() > self.chain.spec.max_request_data_column_sidecars { + return Err(( RPCResponseErrorCode::InvalidRequest, - "Request exceeded `MAX_REQUEST_DATA_COLUMN_SIDECARS`".into(), - request_id, - ); + "Request exceeded `MAX_REQUEST_DATA_COLUMN_SIDECARS`", + )); } let request_start_slot = Slot::from(req.start_slot); @@ -965,13 +977,10 @@ Some(boundary) => boundary.start_slot(T::EthSpec::slots_per_epoch()), None => { debug!(self.log, "Deneb fork is disabled"); - self.send_error_response( - peer_id, + return Err(( RPCResponseErrorCode::InvalidRequest, - "Deneb fork is disabled".into(), - request_id, - ); - return; + "Deneb fork is disabled", + )); } }; @@ -992,19 +1001,15 @@ ); return if data_availability_boundary_slot < oldest_data_column_slot { - self.send_error_response( - peer_id, + Err(( RPCResponseErrorCode::ResourceUnavailable, - "data columns pruned within boundary".into(), - request_id, - ) + "data columns pruned within boundary", + )) } else { - self.send_error_response( - peer_id, + Err(( RPCResponseErrorCode::InvalidRequest, - "Req outside availability period".into(), - request_id, - ) + "Req outside availability period", + )) }; } @@ -1021,25 +1026,15 @@ "requested_slot" => slot, "oldest_known_slot" => oldest_block_slot ); - return self.send_error_response( - peer_id, - RPCResponseErrorCode::ResourceUnavailable, - "Backfilling".into(), - request_id, - ); + return Err((RPCResponseErrorCode::ResourceUnavailable, "Backfilling")); } Err(e) => { - self.send_error_response( - peer_id, - RPCResponseErrorCode::ServerError, - "Database error".into(), - request_id, - ); - return error!(self.log, "Unable to obtain root iter"; + error!(self.log, "Unable to obtain root iter"; "request" => ?req, "peer" => %peer_id, "error" => ?e ); + return Err((RPCResponseErrorCode::ServerError, "Database error")); } }; @@ -1071,11 +1066,12 @@ let block_roots = match maybe_block_roots { Ok(block_roots) => block_roots, Err(e) => { - return error!(self.log, "Error during iteration over blocks"; + error!(self.log, "Error during iteration over blocks"; "request" => ?req, "peer" => %peer_id, "error" => ?e - ) + ); + return Err((RPCResponseErrorCode::ServerError, "Database error")); } }; @@ -1083,23 +1079,22 @@ let block_roots = block_roots.into_iter().flatten(); let mut data_columns_sent = 0; - let mut send_response = true; + let requested_column_indices = + HashSet::::from_iter(req.columns.iter().copied()); for root in block_roots { match self.chain.get_data_columns(&root) { Ok(data_column_sidecar_list) => { for data_column_sidecar in data_column_sidecar_list.iter() { - for &data_column_id in req.data_column_ids.iter() { - if data_column_sidecar.id() == data_column_id { - data_columns_sent += 1; - self.send_network_message(NetworkMessage::SendResponse { - peer_id, - response: Response::DataColumnsByRange(Some( -
data_column_sidecar.clone(), - )), - id: request_id, - }); - } + if requested_column_indices.contains(&data_column_sidecar.index) { + data_columns_sent += 1; + self.send_network_message(NetworkMessage::SendResponse { + peer_id, + response: Response::DataColumnsByRange(Some( + data_column_sidecar.clone(), + )), + id: request_id, + }); } } } @@ -1112,14 +1107,10 @@ impl NetworkBeaconProcessor { "block_root" => ?root, "error" => ?e ); - self.send_error_response( - peer_id, + return Err(( RPCResponseErrorCode::ServerError, - "No data columns and failed fetching corresponding block".into(), - request_id, - ); - send_response = false; - break; + "No data columns and failed fetching corresponding block", + )); } } } @@ -1139,14 +1130,7 @@ impl NetworkBeaconProcessor { "returned" => data_columns_sent ); - if send_response { - // send the stream terminator - self.send_network_message(NetworkMessage::SendResponse { - peer_id, - response: Response::DataColumnsByRange(None), - id: request_id, - }); - } + Ok(()) } /// Helper function to ensure single item protocol always end with either a single chunk or an diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index 4be92d59a4b..0eb7b8634ff 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -929,7 +929,7 @@ impl BackFillSync { ) -> Result<(), BackFillError> { if let Some(batch) = self.batches.get_mut(&batch_id) { let (request, is_blob_batch) = batch.to_blocks_by_range_request(); - match network.blocks_and_blobs_by_range_request( + match network.block_components_by_range_request( peer, is_blob_batch, request, diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index 80cfb4eb64b..4565e389901 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -1,78 +1,90 @@ use beacon_chain::{ - block_verification_types::RpcBlock, data_column_verification::CustodyDataColumn, + block_verification_types::RpcBlock, data_column_verification::CustodyDataColumn, get_block_root, }; use ssz_types::VariableList; -use std::{collections::VecDeque, sync::Arc}; -use types::{BlobSidecar, EthSpec, SignedBeaconBlock}; - -use super::range_sync::ByRangeRequestType; +use std::{ + collections::{HashMap, VecDeque}, + sync::Arc, +}; +use types::{BlobSidecar, ColumnIndex, EthSpec, Hash256, SignedBeaconBlock}; #[derive(Debug)] -pub struct BlocksAndBlobsRequestInfo { +pub struct RangeBlockComponentsRequest { /// Blocks we have received awaiting for their corresponding sidecar. - accumulated_blocks: VecDeque>>, + blocks: VecDeque>>, /// Sidecars we have received awaiting for their corresponding block. - accumulated_sidecars: VecDeque>>, - accumulated_custody_columns: VecDeque>, + blobs: VecDeque>>, + custody_columns: VecDeque>, /// Whether the individual RPC request for blocks is finished or not. is_blocks_stream_terminated: bool, /// Whether the individual RPC request for sidecars is finished or not. 
is_sidecars_stream_terminated: bool, - is_custody_columns_stream_terminated: bool, + custody_columns_streams_terminated: usize, /// Used to determine if this accumulator should wait for a sidecars stream termination - request_type: ByRangeRequestType, + expects_blobs: bool, + expects_custody_columns: Option>, } -impl BlocksAndBlobsRequestInfo { - pub fn new(request_type: ByRangeRequestType) -> Self { +impl RangeBlockComponentsRequest { + pub fn new(expects_blobs: bool, expects_custody_columns: Option>) -> Self { Self { - accumulated_blocks: <_>::default(), - accumulated_sidecars: <_>::default(), - accumulated_custody_columns: <_>::default(), - is_blocks_stream_terminated: <_>::default(), - is_sidecars_stream_terminated: <_>::default(), - is_custody_columns_stream_terminated: <_>::default(), - request_type, + blocks: <_>::default(), + blobs: <_>::default(), + custody_columns: <_>::default(), + is_blocks_stream_terminated: false, + is_sidecars_stream_terminated: false, + custody_columns_streams_terminated: 0, + expects_blobs, + expects_custody_columns, } } - pub fn get_request_type(&self) -> ByRangeRequestType { - self.request_type + // TODO: This function should be deprecated when simplifying the retry mechanism of these range + // requests. + pub fn get_requirements(&self) -> (bool, Option>) { + (self.expects_blobs, self.expects_custody_columns.clone()) } pub fn add_block_response(&mut self, block_opt: Option>>) { match block_opt { - Some(block) => self.accumulated_blocks.push_back(block), + Some(block) => self.blocks.push_back(block), None => self.is_blocks_stream_terminated = true, } } pub fn add_sidecar_response(&mut self, sidecar_opt: Option>>) { match sidecar_opt { - Some(sidecar) => self.accumulated_sidecars.push_back(sidecar), + Some(sidecar) => self.blobs.push_back(sidecar), None => self.is_sidecars_stream_terminated = true, } } pub fn add_custody_column(&mut self, column_opt: Option>) { match column_opt { - Some(column) => self.accumulated_custody_columns.push_back(column), - None => self.is_custody_columns_stream_terminated = true, + Some(column) => self.custody_columns.push_back(column), + // TODO(das): this mechanism is dangerous: if somehow there are two requests for the + // same column index it can terminate early. This struct should track that all requests + // for all custody columns terminate. + None => self.custody_columns_streams_terminated += 1, } } pub fn into_responses(self) -> Result>, String> { - let BlocksAndBlobsRequestInfo { - accumulated_blocks, - accumulated_sidecars, - .. - } = self; + if let Some(expects_custody_columns) = self.expects_custody_columns.clone() { + self.into_responses_with_custody_columns(expects_custody_columns) + } else { + self.into_responses_with_blobs() + } + } + + fn into_responses_with_blobs(self) -> Result>, String> { + let RangeBlockComponentsRequest { blocks, blobs, .. } = self; // There can't be more blobs than blocks. i.e. sending any blob (empty // included) for a skipped slot is not permitted.
- let mut responses = Vec::with_capacity(accumulated_blocks.len()); - let mut blob_iter = accumulated_sidecars.into_iter().peekable(); - for block in accumulated_blocks.into_iter() { + let mut responses = Vec::with_capacity(blocks.len()); + let mut blob_iter = blobs.into_iter().peekable(); + for block in blocks.into_iter() { let mut blob_list = Vec::with_capacity(E::max_blobs_per_block()); while { let pair_next_blob = blob_iter @@ -108,27 +120,116 @@ impl BlocksAndBlobsRequestInfo { Ok(responses) } + fn into_responses_with_custody_columns( + self, + expects_custody_columns: Vec, + ) -> Result>, String> { + let RangeBlockComponentsRequest { + blocks, + custody_columns, + .. + } = self; + + // Group data columns by block_root and index + let mut custody_columns_by_block = + HashMap::>>::new(); + + for column in custody_columns { + let block_root = column.as_data_column().block_root(); + let index = column.index(); + if custody_columns_by_block + .entry(block_root) + .or_default() + .insert(index, column) + .is_some() + { + return Err(format!( + "Repeated column block_root {block_root:?} index {index}" + )); + } + } + + // Now iterate over all blocks, ensuring that the block root of each block matches that + // of its data columns, and that we have columns for all of our custody requirements + let mut rpc_blocks = Vec::with_capacity(blocks.len()); + + for block in blocks { + let block_root = get_block_root(&block); + rpc_blocks.push(if block.num_expected_blobs() > 0 { + let Some(mut custody_columns_by_index) = + custody_columns_by_block.remove(&block_root) + else { + // This PR ignores the fix from https://github.com/sigp/lighthouse/pull/5675 + // which allows blobs to not match blocks. + // TODO(das): in the initial version of PeerDAS the beacon chain does not check + // rpc custody requirements, so dropping this check could leave the DB in an + // inconsistent state. + return Err(format!("No columns for block {block_root:?} with data")); + }; + + let mut custody_columns = vec![]; + for index in &expects_custody_columns { + let Some(custody_column) = custody_columns_by_index.remove(index) else { + return Err(format!("No column for block {block_root:?} index {index}")); + }; + custody_columns.push(custody_column); + } + + // Assert that there are no columns left + if !custody_columns_by_index.is_empty() { + let remaining_indices = custody_columns_by_index.keys().collect::>(); + return Err(format!( + "Not all columns consumed for block {block_root:?}: {remaining_indices:?}" + )); + } + + RpcBlock::new_with_custody_columns(Some(block_root), block, custody_columns) + .map_err(|e| format!("{e:?}"))?
+ } else { + RpcBlock::new_without_blobs(Some(block_root), block) }); } + + // Assert that there are no columns left for other blocks + if !custody_columns_by_block.is_empty() { + let remaining_roots = custody_columns_by_block.keys().collect::>(); + return Err(format!("Not all columns consumed: {remaining_roots:?}")); + } + + Ok(rpc_blocks) + } + pub fn is_finished(&self) -> bool { - let blobs_requested = matches!(self.request_type, ByRangeRequestType::BlocksAndBlobs); - let custody_columns_requested = - matches!(self.request_type, ByRangeRequestType::BlocksAndColumns); - self.is_blocks_stream_terminated - && (!blobs_requested || self.is_sidecars_stream_terminated) - && (!custody_columns_requested || self.is_custody_columns_stream_terminated) + if !self.is_blocks_stream_terminated { + return false; + } + if self.expects_blobs && !self.is_sidecars_stream_terminated { + return false; + } + if let Some(expects_custody_columns) = &self.expects_custody_columns { + if self.custody_columns_streams_terminated < expects_custody_columns.len() { + return false; + } + } + true } } #[cfg(test)] mod tests { - use super::BlocksAndBlobsRequestInfo; - use crate::sync::range_sync::ByRangeRequestType; - use beacon_chain::test_utils::{generate_rand_block_and_blobs, NumBlobs}; + use super::RangeBlockComponentsRequest; + use beacon_chain::{ + data_column_verification::CustodyDataColumn, + test_utils::{ + generate_rand_block_and_blobs, generate_rand_block_and_data_columns, NumBlobs, + }, + }; use rand::SeedableRng; use types::{test_utils::XorShiftRng, ForkName, MinimalEthSpec as E}; #[test] fn no_blobs_into_responses() { - let mut info = BlocksAndBlobsRequestInfo::::new(ByRangeRequestType::Blocks); + let mut info = RangeBlockComponentsRequest::::new(false, None); let mut rng = XorShiftRng::from_seed([42; 16]); let blocks = (0..4) .map(|_| generate_rand_block_and_blobs::(ForkName::Base, NumBlobs::None, &mut rng).0) .collect::>(); @@ -147,7 +248,7 @@ mod tests { #[test] fn empty_blobs_into_responses() { - let mut info = BlocksAndBlobsRequestInfo::::new(ByRangeRequestType::BlocksAndBlobs); + let mut info = RangeBlockComponentsRequest::::new(true, None); let mut rng = XorShiftRng::from_seed([42; 16]); let blocks = (0..4) .map(|_| { @@ -170,4 +271,60 @@ mod tests { assert!(info.is_finished()); info.into_responses().unwrap(); } + + #[test] + fn rpc_block_with_custody_columns() { + let expects_custody_columns = vec![1, 2, 3, 4]; + let mut info = + RangeBlockComponentsRequest::::new(false, Some(expects_custody_columns.clone())); + let mut rng = XorShiftRng::from_seed([42; 16]); + let blocks = (0..4) + .map(|_| { + generate_rand_block_and_data_columns::( + ForkName::Deneb, + NumBlobs::Number(1), + &mut rng, + ) + }) + .collect::>(); + + // Send all blocks, then terminate the block stream + for block in &blocks { + info.add_block_response(Some(block.0.clone().into())); + } + info.add_block_response(None); + // Assert response is not finished + assert!(!info.is_finished()); + + // Send data columns interleaved + for block in &blocks { + for column in &block.1 { + if expects_custody_columns.contains(&column.index) { + info.add_custody_column(Some(CustodyDataColumn::from_asserted_custody( + column.clone().into(), + ))); + } + } + } + + // Terminate the requests + for (i, _column_index) in expects_custody_columns.iter().enumerate() { + info.add_custody_column(None); + + if i < expects_custody_columns.len() - 1 { + assert!( + !info.is_finished(), + "request should not be finished at loop {i}" + ); + } else { + assert!( + info.is_finished(),
+ "request should be finishied at loop {i}" + ); + } + } + + // All completed construct response + info.into_responses().unwrap(); + } } diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 05ef447d828..797f57a0a6b 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -47,7 +47,7 @@ use crate::status::ToStatusMessage; use crate::sync::block_lookups::{ BlobRequestState, BlockComponent, BlockRequestState, CustodyRequestState, DownloadResult, }; -use crate::sync::block_sidecar_coupling::BlocksAndBlobsRequestInfo; +use crate::sync::block_sidecar_coupling::RangeBlockComponentsRequest; use crate::sync::network_context::PeerGroup; use beacon_chain::block_verification_types::AsBlock; use beacon_chain::block_verification_types::RpcBlock; @@ -1135,7 +1135,10 @@ impl SyncManager { self.network.insert_range_blocks_and_blobs_request( id, resp.sender_id, - BlocksAndBlobsRequestInfo::new(resp.request_type), + RangeBlockComponentsRequest::new( + resp.expects_blobs, + resp.expects_custody_columns, + ), ); // inform range that the request needs to be treated as failed // With time we will want to downgrade this log diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index bdcebc80516..d2abfc8d9ed 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -9,7 +9,7 @@ use self::requests::{ pub use self::requests::{ BlobsByRootSingleBlockRequest, BlocksByRootSingleRequest, DataColumnsByRootSingleBlockRequest, }; -use super::block_sidecar_coupling::BlocksAndBlobsRequestInfo; +use super::block_sidecar_coupling::RangeBlockComponentsRequest; use super::manager::{ BlockProcessType, DataColumnsByRootRequestId, DataColumnsByRootRequester, Id, RequestId as SyncRequestId, @@ -25,7 +25,7 @@ use beacon_chain::data_column_verification::CustodyDataColumn; use beacon_chain::validator_monitor::timestamp_now; use beacon_chain::{BeaconChain, BeaconChainTypes, EngineState}; use fnv::FnvHashMap; -use lighthouse_network::rpc::methods::BlobsByRangeRequest; +use lighthouse_network::rpc::methods::{BlobsByRangeRequest, DataColumnsByRangeRequest}; use lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError}; use lighthouse_network::{ Client, Eth2Enr, NetworkGlobals, PeerAction, PeerId, ReportSource, Request, @@ -40,7 +40,7 @@ use tokio::sync::mpsc; use types::blob_sidecar::FixedBlobSidecarList; use types::{ BlobSidecar, ColumnIndex, DataColumnSidecar, DataColumnSubnetId, Epoch, EthSpec, Hash256, - SignedBeaconBlock, + SignedBeaconBlock, Slot, }; pub mod custody; @@ -49,7 +49,8 @@ mod requests; pub struct BlocksAndBlobsByRangeResponse { pub sender_id: RangeRequestId, pub responses: Result>, String>, - pub request_type: ByRangeRequestType, + pub expects_blobs: bool, + pub expects_custody_columns: Option>, } #[derive(Debug, Clone, Copy)] @@ -129,8 +130,8 @@ pub struct SyncNetworkContext { custody_by_root_requests: FnvHashMap>, /// BlocksByRange requests paired with BlobsByRange - range_blocks_and_blobs_requests: - FnvHashMap)>, + range_block_components_requests: + FnvHashMap)>, /// Whether the ee is online. If it's not, we don't allow access to the /// `beacon_processor_send`. 
@@ -179,7 +180,7 @@ impl SyncNetworkContext { blobs_by_root_requests: <_>::default(), data_columns_by_root_requests: <_>::default(), custody_by_root_requests: <_>::default(), - range_blocks_and_blobs_requests: FnvHashMap::default(), + range_block_components_requests: FnvHashMap::default(), network_beacon_processor, chain, log, @@ -253,19 +254,22 @@ impl SyncNetworkContext { } } - /// A blocks by range request for the range sync algorithm. - pub fn blocks_by_range_request( + /// A blocks by range request sent by the range sync algorithm + pub fn block_components_by_range_request( &mut self, peer_id: PeerId, batch_type: ByRangeRequestType, request: BlocksByRangeRequest, + sender_id: RangeRequestId, ) -> Result { + let epoch = Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch()); let id = self.next_id(); - trace!( + debug!( self.log, "Sending BlocksByRange request"; "method" => "BlocksByRange", "count" => request.count(), + "epoch" => epoch, "peer" => %peer_id, ); self.send_network_msg(NetworkMessage::SendRequest { @@ -274,12 +278,13 @@ impl SyncNetworkContext { request_id: RequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }), })?; - if matches!(batch_type, ByRangeRequestType::BlocksAndBlobs) { + let expected_blobs = if matches!(batch_type, ByRangeRequestType::BlocksAndBlobs) { debug!( self.log, "Sending BlobsByRange requests"; "method" => "BlobsByRange", "count" => request.count(), + "epoch" => epoch, "peer" => %peer_id, ); @@ -292,28 +297,62 @@ impl SyncNetworkContext { }), request_id: RequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }), })?; - } + true + } else { + false + }; - Ok(id) - } + let expects_custody_columns = if matches!(batch_type, ByRangeRequestType::BlocksAndColumns) + { + let custody_indexes = self.network_globals().custody_columns(epoch)?; + + for column_index in &custody_indexes { + let custody_peer_ids = self.get_custodial_peers(epoch, *column_index); + let Some(custody_peer) = custody_peer_ids.first().cloned() else { + // TODO(das): this will be pretty bad UX. To improve we should: + // - Attempt to fetch custody requests first, before requesting blocks + // - Handle the no peers case gracefully, maybe add some timeout and give a few + // minutes / seconds to the peer manager to locate peers on this subnet before + // abandoning progress on the chain completely. + return Err("no custody peer"); + }; + + debug!( + self.log, + "Sending DataColumnsByRange requests"; + "method" => "DataColumnsByRange", + "count" => request.count(), + "epoch" => epoch, + "index" => column_index, + "peer" => %custody_peer, ); - /// A blocks by range request sent by the range sync algorithm - pub fn blocks_and_blobs_by_range_request( - &mut self, - peer_id: PeerId, - batch_type: ByRangeRequestType, - request: BlocksByRangeRequest, - sender_id: RangeRequestId, - ) -> Result { - let id = self.blocks_by_range_request(peer_id, batch_type, request)?; - self.range_blocks_and_blobs_requests .insert(id, (sender_id, BlocksAndBlobsRequestInfo::new(batch_type))); // Create the data column request based on the blocks request.
+ self.send_network_msg(NetworkMessage::SendRequest { + peer_id: custody_peer, + request: Request::DataColumnsByRange(DataColumnsByRangeRequest { + start_slot: *request.start_slot(), + count: *request.count(), + columns: vec![*column_index], + }), + request_id: RequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }), + })?; + } + + Some(custody_indexes) + } else { + None + }; + + let info = RangeBlockComponentsRequest::new(expected_blobs, expects_custody_columns); + self.range_block_components_requests + .insert(id, (sender_id, info)); Ok(id) } pub fn range_request_failed(&mut self, request_id: Id) -> Option { let sender_id = self - .range_blocks_and_blobs_requests + .range_block_components_requests .remove(&request_id) .map(|(sender_id, _info)| sender_id); if let Some(sender_id) = sender_id { @@ -337,7 +376,7 @@ impl SyncNetworkContext { request_id: Id, block_or_blob: BlockOrBlob, ) -> Option> { - match self.range_blocks_and_blobs_requests.entry(request_id) { + match self.range_block_components_requests.entry(request_id) { Entry::Occupied(mut entry) => { let (_, info) = entry.get_mut(); match block_or_blob { @@ -348,11 +387,12 @@ impl SyncNetworkContext { if info.is_finished() { // If the request is finished, dequeue everything let (sender_id, info) = entry.remove(); - let request_type = info.get_request_type(); + let (expects_blobs, expects_custody_columns) = info.get_requirements(); Some(BlocksAndBlobsByRangeResponse { sender_id, - request_type, responses: info.into_responses(), + expects_blobs, + expects_custody_columns, }) } else { None @@ -727,13 +767,18 @@ impl SyncNetworkContext { "To deal with alignment with deneb boundaries, batches need to be of just one epoch" ); - if let Some(data_availability_boundary) = self.chain.data_availability_boundary() { - if epoch >= data_availability_boundary { - // TODO(das): After peerdas fork, return `BlocksAndColumns` - ByRangeRequestType::BlocksAndBlobs - } else { - ByRangeRequestType::Blocks - } + if self + .chain + .data_availability_checker + .data_columns_required_for_epoch(epoch) + { + ByRangeRequestType::BlocksAndColumns + } else if self + .chain + .data_availability_checker + .blobs_required_for_epoch(epoch) + { + ByRangeRequestType::BlocksAndBlobs } else { ByRangeRequestType::Blocks } @@ -743,9 +788,9 @@ impl SyncNetworkContext { &mut self, id: Id, sender_id: RangeRequestId, - info: BlocksAndBlobsRequestInfo, + info: RangeBlockComponentsRequest, ) { - self.range_blocks_and_blobs_requests + self.range_block_components_requests .insert(id, (sender_id, info)); } diff --git a/beacon_node/network/src/sync/network_context/custody.rs b/beacon_node/network/src/sync/network_context/custody.rs index 72964d3f362..7e080c5bf64 100644 --- a/beacon_node/network/src/sync/network_context/custody.rs +++ b/beacon_node/network/src/sync/network_context/custody.rs @@ -109,6 +109,8 @@ impl ActiveCustodyRequest { } else { // Peer does not have the requested data. // TODO(das) what to do? + // TODO(das): If the peer is in the lookup peer set it claims to have imported + // the block AND its custody columns. 
So in this case we can downscore debug!(self.log, "Sampling peer claims to not have the data"; "block_root" => %self.block_root, "column_index" => column_index); // TODO(das) tolerate this failure if you are not sure the block has data request.on_download_success()?; diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index 9a6c99ebf6c..809d2d7bb16 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -884,7 +884,7 @@ impl SyncingChain { ) -> ProcessingResult { if let Some(batch) = self.batches.get_mut(&batch_id) { let (request, batch_type) = batch.to_blocks_by_range_request(); - match network.blocks_and_blobs_by_range_request( + match network.block_components_by_range_request( peer, batch_type, request, From e242436eb8f40b8fa5f14b2d3aebfd8f28d5fc14 Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Thu, 9 May 2024 15:54:57 +0900 Subject: [PATCH 2/2] Clean up todos --- beacon_node/network/src/router.rs | 2 +- .../network/src/sync/block_lookups/tests.rs | 7 +- .../src/sync/block_sidecar_coupling.rs | 55 +++++++------- beacon_node/network/src/sync/manager.rs | 72 ++++++------------- .../network/src/sync/network_context.rs | 12 ++-- .../src/sync/network_context/custody.rs | 9 ++- .../network/src/sync/range_sync/range.rs | 4 +- 7 files changed, 60 insertions(+), 101 deletions(-) diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index 944f21a4700..375cef2094d 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -521,7 +521,7 @@ impl Router { ) { let request_id = match request_id { RequestId::Sync(sync_id) => match sync_id { - id @ SyncId::RangeBlockAndBlobs { .. } => id, + id @ SyncId::RangeBlockComponents { .. 
} => id, other => { crit!(self.log, "BlocksByRange response on incorrect request"; "request" => ?other); return; diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index f2fa7bd5be6..c3f0b343a0b 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -5,7 +5,6 @@ use crate::sync::manager::{ DataColumnsByRootRequestId, DataColumnsByRootRequester, RequestId as SyncRequestId, SingleLookupReqId, SyncManager, }; -use crate::sync::network_context::custody::CustodyRequester; use crate::sync::sampling::{SamplingConfig, SamplingRequester}; use crate::sync::{SamplingId, SyncMessage}; use crate::NetworkMessage; @@ -612,11 +611,7 @@ impl TestRig { let lookup_id = if let DataColumnsByRootRequester::Custody(id) = sampling_ids.first().unwrap().0.requester { - if let CustodyRequester::Lookup(id) = id.id { - id.lookup_id - } else { - panic!("not a lookup requester"); - } + id.id.0.lookup_id } else { panic!("not a custody requester") }; diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index 4565e389901..ca6460f4305 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -6,7 +6,7 @@ use std::{ collections::{HashMap, VecDeque}, sync::Arc, }; -use types::{BlobSidecar, ColumnIndex, EthSpec, Hash256, SignedBeaconBlock}; +use types::{BlobSidecar, ColumnIndex, DataColumnSidecar, EthSpec, Hash256, SignedBeaconBlock}; #[derive(Debug)] pub struct RangeBlockComponentsRequest { @@ -14,7 +14,7 @@ pub struct RangeBlockComponentsRequest { blocks: VecDeque>>, /// Sidecars we have received awaiting for their corresponding block. blobs: VecDeque>>, - custody_columns: VecDeque>, + data_columns: VecDeque>>, /// Whether the individual RPC request for blocks is finished or not. is_blocks_stream_terminated: bool, /// Whether the individual RPC request for sidecars is finished or not. @@ -30,7 +30,7 @@ impl RangeBlockComponentsRequest { Self { blocks: <_>::default(), blobs: <_>::default(), - custody_columns: <_>::default(), + data_columns: <_>::default(), is_blocks_stream_terminated: false, is_sidecars_stream_terminated: false, custody_columns_streams_terminated: 0, @@ -59,9 +59,9 @@ impl RangeBlockComponentsRequest { } } - pub fn add_custody_column(&mut self, column_opt: Option>) { + pub fn add_data_column(&mut self, column_opt: Option>>) { match column_opt { - Some(column) => self.custody_columns.push_back(column), + Some(column) => self.data_columns.push_back(column), // TODO(das): this mechanism is dangerous, if somehow there are two requests for the // same column index it can terminate early. This struct should track that all requests // for all custody columns terminate. @@ -126,18 +126,18 @@ impl RangeBlockComponentsRequest { ) -> Result>, String> { let RangeBlockComponentsRequest { blocks, - custody_columns, + data_columns, .. 
} = self; // Group data columns by block_root and index - let mut custody_columns_by_block = - HashMap::>>::new(); + let mut data_columns_by_block = + HashMap::>>>::new(); - for column in custody_columns { - let block_root = column.as_data_column().block_root(); - let index = column.index(); - if custody_columns_by_block + for column in data_columns { + let block_root = column.block_root(); + let index = column.index; + if data_columns_by_block .entry(block_root) .or_default() .insert(index, column) @@ -156,8 +156,7 @@ impl RangeBlockComponentsRequest { for block in blocks { let block_root = get_block_root(&block); rpc_blocks.push(if block.num_expected_blobs() > 0 { - let Some(mut custody_columns_by_index) = - custody_columns_by_block.remove(&block_root) + let Some(mut data_columns_by_index) = data_columns_by_block.remove(&block_root) else { // This PR ignores the fix from https://github.com/sigp/lighthouse/pull/5675 // which allows blobs to not match blocks. @@ -169,15 +168,18 @@ impl RangeBlockComponentsRequest { let mut custody_columns = vec![]; for index in &expects_custody_columns { - let Some(custody_column) = custody_columns_by_index.remove(index) else { + let Some(data_column) = data_columns_by_index.remove(index) else { return Err(format!("No column for block {block_root:?} index {index}")); }; - custody_columns.push(custody_column); + // Safe to convert to `CustodyDataColumn`: we have asserted that the index of + // this column is in the set of `expects_custody_columns` and with the expected + // block root, so for the expected epoch of this batch. + custody_columns.push(CustodyDataColumn::from_asserted_custody(data_column)); } // Assert that there are no columns left - if !custody_columns_by_index.is_empty() { - let remaining_indices = custody_columns_by_index.keys().collect::>(); + if !data_columns_by_index.is_empty() { + let remaining_indices = data_columns_by_index.keys().collect::>(); return Err(format!( "Not all columns consumed for block {block_root:?}: {remaining_indices:?}" )); @@ -191,8 +193,8 @@ impl RangeBlockComponentsRequest { } // Assert that there are no columns left for other blocks - if !custody_columns_by_block.is_empty() { - let remaining_roots = custody_columns_by_block.keys().collect::>(); + if !data_columns_by_block.is_empty() { + let remaining_roots = data_columns_by_block.keys().collect::>(); return Err(format!("Not all columns consumed: {remaining_roots:?}")); } @@ -218,11 +220,8 @@ impl RangeBlockComponentsRequest { #[cfg(test)] mod tests { use super::RangeBlockComponentsRequest; - use beacon_chain::{ - data_column_verification::CustodyDataColumn, - test_utils::{ - generate_rand_block_and_blobs, generate_rand_block_and_data_columns, NumBlobs, - }, + use beacon_chain::test_utils::{ + generate_rand_block_and_blobs, generate_rand_block_and_data_columns, NumBlobs, }; use rand::SeedableRng; use types::{test_utils::XorShiftRng, ForkName, MinimalEthSpec as E}; @@ -300,16 +299,14 @@ mod tests { for block in &blocks { for column in &block.1 { if expects_custody_columns.contains(&column.index) { - info.add_custody_column(Some(CustodyDataColumn::from_asserted_custody( - column.clone().into(), - ))); + info.add_data_column(Some(column.clone().into())); } } } // Terminate the requests for (i, _column_index) in expects_custody_columns.iter().enumerate() { - info.add_custody_column(None); + info.add_data_column(None); if i < expects_custody_columns.len() - 1 { assert!( diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 
797f57a0a6b..7b8d7850a71 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -36,7 +36,7 @@ use super::backfill_sync::{BackFillSync, ProcessResult, SyncStart}; use super::block_lookups::BlockLookups; use super::network_context::{ - custody::CustodyRequester, BlockOrBlob, CustodyId, RangeRequestId, RpcEvent, SyncNetworkContext, + BlockOrBlob, CustodyId, RangeRequestId, RpcEvent, SyncNetworkContext, }; use super::peer_sync_info::{remote_sync_type, PeerSyncType}; use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH}; @@ -94,9 +94,7 @@ pub enum RequestId { /// Request searching for a set of data columns given a hash and list of column indices. DataColumnsByRoot(DataColumnsByRootRequestId), /// Range request that is composed by both a block range request and a blob range request. - RangeBlockAndBlobs { id: Id }, - /// Range request that is composed by both a block range request and a blob range request. - RangeBlockAndDataColumns { id: Id }, + RangeBlockComponents(Id), } #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] @@ -397,30 +395,7 @@ impl SyncManager { RequestId::DataColumnsByRoot(id) => { self.on_single_data_column_response(id, peer_id, RpcEvent::RPCError(error)) } - RequestId::RangeBlockAndBlobs { id } => { - if let Some(sender_id) = self.network.range_request_failed(id) { - match sender_id { - RangeRequestId::RangeSync { chain_id, batch_id } => { - self.range_sync.inject_error( - &mut self.network, - peer_id, - batch_id, - chain_id, - id, - ); - self.update_sync_state(); - } - RangeRequestId::BackfillSync { batch_id } => match self - .backfill_sync - .inject_error(&mut self.network, batch_id, &peer_id, id) - { - Ok(_) => {} - Err(_) => self.update_sync_state(), - }, - } - } - } - RequestId::RangeBlockAndDataColumns { id } => { + RequestId::RangeBlockComponents(id) => { if let Some(sender_id) = self.network.range_request_failed(id) { match sender_id { RangeRequestId::RangeSync { chain_id, batch_id } => { @@ -898,7 +873,7 @@ impl SyncManager { None => RpcEvent::StreamTermination, }, ), - RequestId::RangeBlockAndBlobs { id } => { + RequestId::RangeBlockComponents(id) => { self.range_block_and_blobs_response(id, peer_id, block.into()) } other => { @@ -941,7 +916,7 @@ impl SyncManager { None => RpcEvent::StreamTermination, }, ), - RequestId::RangeBlockAndBlobs { id } => { + RequestId::RangeBlockComponents(id) => { self.range_block_and_blobs_response(id, peer_id, blob.into()) } other => { @@ -971,11 +946,12 @@ impl SyncManager { }, ); } - RequestId::RangeBlockAndBlobs { id } => { - todo!("TODO(das): handle sampling for range sync based on {id}"); - } - RequestId::RangeBlockAndDataColumns { id } => { - todo!("TODO(das): handle sampling for range sync based on {id}"); + RequestId::RangeBlockComponents(id) => { + self.range_block_and_blobs_response( + id, + peer_id, + BlockOrBlob::CustodyColumns(data_column), + ); } } } @@ -1023,22 +999,14 @@ impl SyncManager { { // TODO(das): get proper timestamp let seen_timestamp = timestamp_now(); - match requester { - CustodyRequester::Lookup(id) => self - .block_lookups - .on_download_response::>( - id.lookup_id, - custody_columns.map(|(columns, peer_group)| { - (columns, peer_group, seen_timestamp) - }), - &mut self.network, - ), - CustodyRequester::RangeSync(_) => { - // TODO(das): this line should be unreachable, no mechanism to make - // custody requests for sync yet - todo!("custody fetching for sync not implemented"); - } - } + self.block_lookups + .on_download_response::>( + 
requester.0.lookup_id, + custody_columns.map(|(columns, peer_group)| { + (columns, peer_group, seen_timestamp) + }), + &mut self.network, + ); } } } @@ -1149,7 +1117,7 @@ impl SyncManager { "sender_id" => ?resp.sender_id, "error" => e.clone() ); - let id = RequestId::RangeBlockAndBlobs { id }; + let id = RequestId::RangeBlockComponents(id); self.network.report_peer( peer_id, PeerAction::MidToleranceError, diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index d2abfc8d9ed..bdd6ca241fd 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -150,7 +150,7 @@ pub struct SyncNetworkContext { pub enum BlockOrBlob { Block(Option>>), Blob(Option>>), - CustodyColumns(Option>), + CustodyColumns(Option>>), } impl From>>> for BlockOrBlob { @@ -275,7 +275,7 @@ impl SyncNetworkContext { self.send_network_msg(NetworkMessage::SendRequest { peer_id, request: Request::BlocksByRange(request.clone()), - request_id: RequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }), + request_id: RequestId::Sync(SyncRequestId::RangeBlockComponents(id)), })?; let expected_blobs = if matches!(batch_type, ByRangeRequestType::BlocksAndBlobs) { @@ -295,7 +295,7 @@ impl SyncNetworkContext { start_slot: *request.start_slot(), count: *request.count(), }), - request_id: RequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }), + request_id: RequestId::Sync(SyncRequestId::RangeBlockComponents(id)), })?; true } else { @@ -335,7 +335,7 @@ impl SyncNetworkContext { count: *request.count(), columns: vec![*column_index], }), - request_id: RequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }), + request_id: RequestId::Sync(SyncRequestId::RangeBlockComponents(id)), })?; } @@ -382,7 +382,7 @@ impl SyncNetworkContext { match block_or_blob { BlockOrBlob::Block(maybe_block) => info.add_block_response(maybe_block), BlockOrBlob::Blob(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar), - BlockOrBlob::CustodyColumns(column) => info.add_custody_column(column), + BlockOrBlob::CustodyColumns(column) => info.add_data_column(column), } if info.is_finished() { // If the request is finished, dequeue everything @@ -638,7 +638,7 @@ impl SyncNetworkContext { "id" => ?id ); - let requester = CustodyRequester::Lookup(id); + let requester = CustodyRequester(id); let mut request = ActiveCustodyRequest::new( block_root, requester, diff --git a/beacon_node/network/src/sync/network_context/custody.rs b/beacon_node/network/src/sync/network_context/custody.rs index 7e080c5bf64..f20a95415db 100644 --- a/beacon_node/network/src/sync/network_context/custody.rs +++ b/beacon_node/network/src/sync/network_context/custody.rs @@ -1,4 +1,4 @@ -use crate::sync::manager::{Id, SingleLookupReqId}; +use crate::sync::manager::SingleLookupReqId; use self::request::ActiveColumnSampleRequest; use beacon_chain::data_column_verification::CustodyDataColumn; @@ -17,11 +17,10 @@ pub struct CustodyId { pub column_index: ColumnIndex, } +/// Downstream components that perform custody by root requests. 
+/// Currently, it's only single block lookups, so not using an enum #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] -pub enum CustodyRequester { - Lookup(SingleLookupReqId), - RangeSync(Id), -} +pub struct CustodyRequester(pub SingleLookupReqId); type DataColumnSidecarList = Vec>>; diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index fe48db35b45..eaf5e5f6b1b 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -554,7 +554,7 @@ mod tests { ) -> (ChainId, BatchId, Id) { if blob_req_opt.is_some() { match block_req { - RequestId::Sync(crate::sync::manager::RequestId::RangeBlockAndBlobs { id }) => { + RequestId::Sync(crate::sync::manager::RequestId::RangeBlockComponents(id)) => { let _ = self .cx .range_block_and_blob_response(id, BlockOrBlob::Block(None)); @@ -570,7 +570,7 @@ mod tests { } } else { match block_req { - RequestId::Sync(crate::sync::manager::RequestId::RangeBlockAndBlobs { id }) => { + RequestId::Sync(crate::sync::manager::RequestId::RangeBlockComponents(id)) => { let response = self .cx .range_block_and_blob_response(id, BlockOrBlob::Block(None))
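A closing note on both patches: a single range request now fans out into one block stream, an optional blob stream, and one `DataColumnsByRange` stream per custody column, and the accumulator only completes once every stream has terminated. A minimal sketch of the completion rule that `RangeBlockComponentsRequest::is_finished` implements, with fields renamed here for brevity:

/// Completion state for a fanned-out range request: one block stream, an
/// optional blob stream, and one column stream per expected custody index.
struct Completion {
    blocks_done: bool,
    expects_blobs: bool,
    blobs_done: bool,
    expected_column_streams: usize, // number of expected custody indices
    column_streams_done: usize,     // incremented per column stream terminator
}

impl Completion {
    fn is_finished(&self) -> bool {
        self.blocks_done
            && (!self.expects_blobs || self.blobs_done)
            // As the patch's own TODO(das) warns, counting terminators is
            // fragile if two streams ever target the same column index.
            && self.column_streams_done >= self.expected_column_streams
    }
}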