diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs
index cb85d7ccef6..fad4c2c67d4 100644
--- a/beacon_node/beacon_chain/src/beacon_chain.rs
+++ b/beacon_node/beacon_chain/src/beacon_chain.rs
@@ -1184,42 +1184,23 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
             .map_or_else(|| self.get_blobs(block_root), Ok)
     }
 
-    pub fn get_data_columns_checking_early_attester_cache(
-        &self,
-        block_root: &Hash256,
-    ) -> Result<DataColumnSidecarList<T::EthSpec>, Error> {
-        self.early_attester_cache
-            .get_data_columns(*block_root)
-            .map_or_else(|| self.get_data_columns(block_root), Ok)
-    }
-
-    pub fn get_selected_data_columns_checking_all_caches(
+    pub fn get_data_column_checking_all_caches(
         &self,
         block_root: Hash256,
-        indices: &[ColumnIndex],
-    ) -> Result<Vec<Arc<DataColumnSidecar<T::EthSpec>>>, Error> {
-        let columns_from_availability_cache = indices
-            .iter()
-            .copied()
-            .filter_map(|index| {
-                self.data_availability_checker
-                    .get_data_column(&DataColumnIdentifier { block_root, index })
-                    .transpose()
-            })
-            .collect::<Result<Vec<_>, _>>()?;
-        // Existence of a column in the data availability cache and downstream caches is exclusive.
-        // If there's a single match in the availability cache we can safely skip other sources.
-        if !columns_from_availability_cache.is_empty() {
-            return Ok(columns_from_availability_cache);
+        index: ColumnIndex,
+    ) -> Result<Option<Arc<DataColumnSidecar<T::EthSpec>>>, Error> {
+        if let Some(column) = self
+            .data_availability_checker
+            .get_data_column(&DataColumnIdentifier { block_root, index })?
+        {
+            return Ok(Some(column));
         }
-        Ok(self
-            .early_attester_cache
-            .get_data_columns(block_root)
-            .map_or_else(|| self.get_data_columns(&block_root), Ok)?
-            .into_iter()
-            .filter(|dc| indices.contains(&dc.index))
-            .collect())
+        if let Some(columns) = self.early_attester_cache.get_data_columns(block_root) {
+            return Ok(columns.iter().find(|c| c.index == index).cloned());
+        }
+
+        self.get_data_column(&block_root, &index)
     }
 
     /// Returns the import status of block checking (in order) pre-import caches, fork-choice, db store
@@ -1332,14 +1313,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
     ///
     /// ## Errors
     /// May return a database error.
-    pub fn get_data_columns(
+    pub fn get_data_column(
         &self,
         block_root: &Hash256,
-    ) -> Result<DataColumnSidecarList<T::EthSpec>, Error> {
-        match self.store.get_data_columns(block_root)? {
-            Some(data_columns) => Ok(data_columns),
-            None => Ok(RuntimeVariableList::empty(self.spec.number_of_columns)),
-        }
+        column_index: &ColumnIndex,
+    ) -> Result<Option<Arc<DataColumnSidecar<T::EthSpec>>>, Error> {
+        Ok(self.store.get_data_column(block_root, column_index)?)
     }
 
     pub fn get_blinded_block(
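The replacement accessor above fixes a three-tier lookup order: the data availability cache wins outright; the early attester cache, whenever it holds the block at all, answers definitively even if the requested index is absent; only then is the database consulted. A minimal, self-contained Rust model of that control flow (all types here are illustrative stand-ins, not Lighthouse's own):

    use std::collections::HashMap;

    type Root = [u8; 32];
    type ColumnIndex = u64;

    #[derive(Clone, Debug, PartialEq)]
    struct Column {
        index: ColumnIndex,
    }

    struct Caches {
        availability: HashMap<(Root, ColumnIndex), Column>,
        early_attester: HashMap<Root, Vec<Column>>,
        store: HashMap<(Root, ColumnIndex), Column>,
    }

    impl Caches {
        /// Mirrors the new lookup order: availability cache first; the early
        /// attester cache answers for any block it holds (even with a miss on
        /// the index); the store is the final fallback.
        fn get_column_checking_all_caches(&self, root: Root, index: ColumnIndex) -> Option<Column> {
            if let Some(column) = self.availability.get(&(root, index)) {
                return Some(column.clone());
            }
            if let Some(columns) = self.early_attester.get(&root) {
                return columns.iter().find(|c| c.index == index).cloned();
            }
            self.store.get(&(root, index)).cloned()
        }
    }

    fn main() {
        let mut caches = Caches {
            availability: HashMap::new(),
            early_attester: HashMap::new(),
            store: HashMap::new(),
        };
        caches.store.insert(([0; 32], 7), Column { index: 7 });
        // Both caches miss, so the store answers.
        assert_eq!(
            caches.get_column_checking_all_caches([0; 32], 7),
            Some(Column { index: 7 })
        );
    }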
diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs
index 947fddbcc77..f63cb1f45f0 100644
--- a/beacon_node/beacon_chain/src/data_availability_checker.rs
+++ b/beacon_node/beacon_chain/src/data_availability_checker.rs
@@ -15,7 +15,7 @@ use std::time::Duration;
 use task_executor::TaskExecutor;
 use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList};
 use types::{
-    BlobSidecarList, ChainSpec, DataColumnSidecar, DataColumnSidecarList, Epoch, EthSpec, Hash256,
+    BlobSidecarList, ChainSpec, DataColumnSidecar, DataColumnSidecarVec, Epoch, EthSpec, Hash256,
     RuntimeVariableList, SignedBeaconBlock,
 };
 
@@ -309,16 +309,11 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
                 block,
                 blobs: None,
                 blobs_available_timestamp: None,
-                // TODO(das): update store type to prevent this conversion
                 data_columns: Some(
-                    RuntimeVariableList::new(
-                        data_column_list
-                            .into_iter()
-                            .map(|d| d.clone_arc())
-                            .collect(),
-                        self.spec.number_of_columns,
-                    )
-                    .expect("data column list is within bounds"),
+                    data_column_list
+                        .into_iter()
+                        .map(|d| d.clone_arc())
+                        .collect(),
                 ),
                 spec: self.spec.clone(),
             }))
@@ -409,13 +404,8 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
             block,
             blobs: None,
             blobs_available_timestamp: None,
-            // TODO(das): update store type to prevent this conversion
             data_columns: data_columns.map(|data_columns| {
-                RuntimeVariableList::new(
-                    data_columns.into_iter().map(|d| d.into_inner()).collect(),
-                    self.spec.number_of_columns,
-                )
-                .expect("data column list is within bounds")
+                data_columns.into_iter().map(|d| d.into_inner()).collect()
             }),
             spec: self.spec.clone(),
         })
@@ -605,7 +595,7 @@ pub struct AvailableBlock<E: EthSpec> {
     blobs: Option<BlobSidecarList<E>>,
     /// Timestamp at which this block first became available (UNIX timestamp, time since 1970).
     blobs_available_timestamp: Option<Duration>,
-    data_columns: Option<DataColumnSidecarList<E>>,
+    data_columns: Option<DataColumnSidecarVec<E>>,
     pub spec: Arc<ChainSpec>,
 }
 
@@ -614,7 +604,7 @@ impl<E: EthSpec> AvailableBlock<E> {
         block_root: Hash256,
         block: Arc<SignedBeaconBlock<E>>,
         blobs: Option<BlobSidecarList<E>>,
-        data_columns: Option<DataColumnSidecarList<E>>,
+        data_columns: Option<DataColumnSidecarVec<E>>,
         spec: Arc<ChainSpec>,
     ) -> Self {
         Self {
@@ -643,7 +633,7 @@ impl<E: EthSpec> AvailableBlock<E> {
         self.blobs_available_timestamp
     }
 
-    pub fn data_columns(&self) -> Option<&DataColumnSidecarList<E>> {
+    pub fn data_columns(&self) -> Option<&DataColumnSidecarVec<E>> {
         self.data_columns.as_ref()
     }
 
@@ -654,7 +644,7 @@ impl<E: EthSpec> AvailableBlock<E> {
         Hash256,
         Arc<SignedBeaconBlock<E>>,
         Option<BlobSidecarList<E>>,
-        Option<DataColumnSidecarList<E>>,
+        Option<DataColumnSidecarVec<E>>,
     ) {
         let AvailableBlock {
             block_root,
diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs
index bd058743d31..5614310f3a3 100644
--- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs
+++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs
@@ -349,16 +349,11 @@ impl<E: EthSpec> PendingComponents<E> {
             block,
             blobs,
             blobs_available_timestamp,
-            // TODO(das): Update store types to prevent this conversion
             data_columns: Some(
-                RuntimeVariableList::new(
-                    verified_data_columns
-                        .into_iter()
-                        .map(|d| d.into_inner())
-                        .collect(),
-                    spec.number_of_columns,
-                )
-                .expect("data column list is within bounds"),
+                verified_data_columns
+                    .into_iter()
+                    .map(|d| d.into_inner())
+                    .collect(),
             ),
             spec: spec.clone(),
         };
diff --git a/beacon_node/beacon_chain/src/early_attester_cache.rs b/beacon_node/beacon_chain/src/early_attester_cache.rs
index 119f41122bf..0bdd2daa0d5 100644
--- a/beacon_node/beacon_chain/src/early_attester_cache.rs
+++ b/beacon_node/beacon_chain/src/early_attester_cache.rs
@@ -22,7 +22,7 @@ pub struct CacheItem<E: EthSpec> {
      */
     block: Arc<SignedBeaconBlock<E>>,
     blobs: Option<BlobSidecarList<E>>,
-    data_columns: Option<DataColumnSidecarList<E>>,
+    data_columns: Option<DataColumnSidecarVec<E>>,
     proto_block: ProtoBlock,
 }
 
@@ -169,7 +169,7 @@ impl<E: EthSpec> EarlyAttesterCache<E> {
     }
 
     /// Returns the data columns, if `block_root` matches the cached item.
-    pub fn get_data_columns(&self, block_root: Hash256) -> Option<DataColumnSidecarList<E>> {
+    pub fn get_data_columns(&self, block_root: Hash256) -> Option<DataColumnSidecarVec<E>> {
         self.item
             .read()
             .as_ref()
diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs
index d5685286194..9424deb2a70 100644
--- a/beacon_node/beacon_chain/src/test_utils.rs
+++ b/beacon_node/beacon_chain/src/test_utils.rs
@@ -2593,9 +2593,7 @@ pub fn generate_rand_block_and_data_columns<E: EthSpec>(
 ) {
     let (block, blobs) = generate_rand_block_and_blobs(fork_name, num_blobs, rng);
     let blob: BlobsList<E> = blobs.into_iter().map(|b| b.blob).collect::<Vec<_>>().into();
-    let data_columns = DataColumnSidecar::build_sidecars(&blob, &block, &KZG, spec)
-        .unwrap()
-        .into();
+    let data_columns = DataColumnSidecar::build_sidecars(&blob, &block, &KZG, spec).unwrap();
 
     (block, data_columns)
 }
diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs
index f2b68233168..3822b36b3d9 100644
--- a/beacon_node/http_api/src/publish_blocks.rs
+++ b/beacon_node/http_api/src/publish_blocks.rs
@@ -20,10 +20,9 @@ use std::time::Duration;
 use tokio::sync::mpsc::UnboundedSender;
 use tree_hash::TreeHash;
 use types::{
-    AbstractExecPayload, BeaconBlockRef, BlobSidecarList, BlockImportSource, DataColumnSidecarList,
+    AbstractExecPayload, BeaconBlockRef, BlobSidecarList, BlockImportSource, DataColumnSidecarVec,
     DataColumnSubnetId, EthSpec, ExecPayload, ExecutionBlockHash, ForkName, FullPayload,
-    FullPayloadBellatrix, Hash256, RuntimeVariableList, SignedBeaconBlock,
-    SignedBlindedBeaconBlock, VariableList,
+    FullPayloadBellatrix, Hash256, SignedBeaconBlock, SignedBlindedBeaconBlock, VariableList,
 };
 use warp::http::StatusCode;
 use warp::{reply::Response, Rejection, Reply};
@@ -78,7 +77,7 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlockContents<T>>(
                           blobs_opt: Option<BlobSidecarList<T::EthSpec>>,
-                          data_cols_opt: Option<DataColumnSidecarList<T::EthSpec>>,
+                          data_cols_opt: Option<DataColumnSidecarVec<T::EthSpec>>,
                           sender,
                           log,
                           seen_timestamp| {
@@ -204,11 +203,10 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlockContents<T>>(
-                    .collect::<Vec<_>>();
-                RuntimeVariableList::from_vec(data_columns, chain.spec.number_of_columns)
+                    .collect::<Vec<_>>()
             });
 
     let block_root = block_root.unwrap_or(gossip_verified_block.block_root);
diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs
index fe69786b522..815a353bb15 100644
--- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs
+++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs
@@ -14,13 +14,12 @@ use lighthouse_network::rpc::*;
 use lighthouse_network::{PeerId, PeerRequestId, ReportSource, Response, SyncInfo};
 use slog::{debug, error, warn};
 use slot_clock::SlotClock;
-use std::collections::HashSet;
 use std::collections::{hash_map::Entry, HashMap};
 use std::sync::Arc;
 use tokio::sync::mpsc;
 use tokio_stream::StreamExt;
 use types::blob_sidecar::BlobIdentifier;
-use types::{ColumnIndex, Epoch, EthSpec, ForkName, Hash256, Slot};
+use types::{Epoch, EthSpec, ForkName, Hash256, Slot};
 
 impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
     /* Auxiliary functions */
@@ -332,13 +331,13 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
         let column_indexes_by_block = request.group_by_ordered_block_root();
         let mut send_data_column_count = 0;
 
-        for (block_root, column_ids) in column_indexes_by_block.iter() {
-            match self
-                .chain
-                .get_selected_data_columns_checking_all_caches(*block_root, column_ids)
-            {
-                Ok(data_columns) => {
-                    for data_column in data_columns {
+        for (block_root, column_indices) in column_indexes_by_block.iter() {
+            for index in column_indices {
+                match self
+                    .chain
+                    .get_data_column_checking_all_caches(*block_root, *index)
+                {
+                    Ok(Some(data_column)) => {
                         send_data_column_count += 1;
                         self.send_response(
                             peer_id,
@@ -346,21 +345,22 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
                             request_id,
                         );
                     }
-                }
-                Err(e) => {
-                    self.send_error_response(
-                        peer_id,
-                        RPCResponseErrorCode::ServerError,
-                        // TODO(das): leak error details to ease debugging
-                        format!("{:?}", e).to_string(),
-                        request_id,
-                    );
-                    error!(self.log, "Error getting data column";
-                        "block_root" => ?block_root,
-                        "peer" => %peer_id,
-                        "error" => ?e
-                    );
-                    return;
+                    Ok(None) => {} // no-op
+                    Err(e) => {
+                        self.send_error_response(
+                            peer_id,
+                            RPCResponseErrorCode::ServerError,
+                            // TODO(das): leak error details to ease debugging
+                            format!("{:?}", e).to_string(),
+                            request_id,
+                        );
+                        error!(self.log, "Error getting data column";
+                            "block_root" => ?block_root,
+                            "peer" => %peer_id,
+                            "error" => ?e
+                        );
+                        return;
+                    }
                 }
             }
         }
@@ -1077,40 +1077,36 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
         // remove all skip slots
         let block_roots = block_roots.into_iter().flatten();
 
         let mut data_columns_sent = 0;
-        let requested_column_indices =
-            HashSet::<ColumnIndex>::from_iter(req.columns.iter().copied());
 
         for root in block_roots {
-            match self.chain.get_data_columns(&root) {
-                Ok(data_column_sidecar_list) => {
-                    for data_column_sidecar in data_column_sidecar_list.iter() {
-                        if requested_column_indices.contains(&data_column_sidecar.index) {
-                            data_columns_sent += 1;
-                            self.send_network_message(NetworkMessage::SendResponse {
-                                peer_id,
-                                response: Response::DataColumnsByRange(Some(
-                                    data_column_sidecar.clone(),
-                                )),
-                                id: request_id,
-                            });
-                        }
+            for index in &req.columns {
+                match self.chain.get_data_column(&root, index) {
+                    Ok(Some(data_column_sidecar)) => {
+                        data_columns_sent += 1;
+                        self.send_network_message(NetworkMessage::SendResponse {
+                            peer_id,
+                            response: Response::DataColumnsByRange(Some(
+                                data_column_sidecar.clone(),
+                            )),
+                            id: request_id,
+                        });
+                    }
+                    Ok(None) => {} // no-op
+                    Err(e) => {
+                        error!(
+                            self.log,
+                            "Error fetching data columns block root";
+                            "request" => ?req,
+                            "peer" => %peer_id,
+                            "block_root" => ?root,
+                            "error" => ?e
+                        );
+                        return Err((
+                            RPCResponseErrorCode::ServerError,
+                            "No data columns and failed fetching corresponding block",
+                        ));
                     }
-                }
-                Err(e) => {
-                    error!(
-                        self.log,
-                        "Error fetching data columns block root";
-                        "request" => ?req,
-                        "peer" => %peer_id,
-                        "block_root" => ?root,
-                        "error" => ?e
-                    );
-                    return Err((
-                        RPCResponseErrorCode::ServerError,
-                        "No data columns and failed fetching corresponding block",
-                    ));
                 }
             }
         }
diff --git a/beacon_node/network/src/sync/network_context/custody.rs b/beacon_node/network/src/sync/network_context/custody.rs
index ef0927d6d4b..cc0fcec532a 100644
--- a/beacon_node/network/src/sync/network_context/custody.rs
+++ b/beacon_node/network/src/sync/network_context/custody.rs
@@ -30,7 +30,7 @@ const FAILED_PEERS_CACHE_EXPIRY_SECONDS: u64 = 5;
 #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
 pub struct CustodyRequester(pub SingleLookupReqId);
 
-type DataColumnSidecarList<E> = Vec<Arc<DataColumnSidecar<E>>>;
+type DataColumnSidecarVec<E> = Vec<Arc<DataColumnSidecar<E>>>;
 
 pub struct ActiveCustodyRequest<T: BeaconChainTypes> {
     block_root: Hash256,
@@ -107,7 +107,7 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
         &mut self,
         peer_id: PeerId,
         req_id: DataColumnsByRootRequestId,
-        resp: RpcResponseResult<DataColumnSidecarList<T::EthSpec>>,
+        resp: RpcResponseResult<DataColumnSidecarVec<T::EthSpec>>,
         cx: &mut SyncNetworkContext<T>,
     ) -> CustodyRequestResult<T::EthSpec> {
         // TODO(das): Should downscore peers for verify errors here
diff --git a/beacon_node/network/src/sync/sampling.rs b/beacon_node/network/src/sync/sampling.rs
index 17359c47a7d..3fb21489d91 100644
--- a/beacon_node/network/src/sync/sampling.rs
+++ b/beacon_node/network/src/sync/sampling.rs
@@ -25,7 +25,7 @@ pub enum SamplingRequester {
     ImportedBlock(Hash256),
 }
 
-type DataColumnSidecarList<E> = Vec<Arc<DataColumnSidecar<E>>>;
+type DataColumnSidecarVec<E> = Vec<Arc<DataColumnSidecar<E>>>;
 
 pub struct Sampling<T: BeaconChainTypes> {
     // TODO(das): stalled sampling request are never cleaned up
@@ -102,7 +102,7 @@ impl<T: BeaconChainTypes> Sampling<T> {
         &mut self,
         id: SamplingId,
         peer_id: PeerId,
-        resp: Result<(DataColumnSidecarList<T::EthSpec>, Duration), RpcResponseError>,
+        resp: Result<(DataColumnSidecarVec<T::EthSpec>, Duration), RpcResponseError>,
         cx: &mut SyncNetworkContext<T>,
     ) -> Option<(SamplingRequester, SamplingResult)> {
         let Some(request) = self.requests.get_mut(&id.id) else {
@@ -237,7 +237,7 @@ impl<T: BeaconChainTypes> ActiveSamplingRequest<T> {
         &mut self,
         _peer_id: PeerId,
         column_index: ColumnIndex,
-        resp: Result<(DataColumnSidecarList<T::EthSpec>, Duration), RpcResponseError>,
+        resp: Result<(DataColumnSidecarVec<T::EthSpec>, Duration), RpcResponseError>,
         cx: &mut SyncNetworkContext<T>,
     ) -> Result<Option<()>, SamplingError> {
         // Select columns to sample
diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs
index 930a7b84632..fbcbbb52aa0 100644
--- a/beacon_node/store/src/hot_cold_store.rs
+++ b/beacon_node/store/src/hot_cold_store.rs
@@ -17,12 +17,12 @@ use crate::metadata::{
     DATA_COLUMN_INFO_KEY, PRUNING_CHECKPOINT_KEY, SCHEMA_VERSION_KEY, SPLIT_KEY,
     STATE_UPPER_LIMIT_NO_RETAIN,
 };
-use crate::metrics;
 use crate::state_cache::{PutStateOutcome, StateCache};
 use crate::{
-    get_key_for_col, ChunkWriter, DBColumn, DatabaseBlock, Error, ItemStore, KeyValueStoreOp,
-    PartialBeaconState, StoreItem, StoreOp,
+    get_data_column_key, get_key_for_col, ChunkWriter, DBColumn, DatabaseBlock, Error, ItemStore,
+    KeyValueStoreOp, PartialBeaconState, StoreItem, StoreOp,
 };
+use crate::{metrics, parse_data_column_key};
 use itertools::process_results;
 use leveldb::iterator::LevelDBIterator;
 use lru::LruCache;
@@ -36,6 +36,7 @@ use state_processing::{
     SlotProcessingError,
 };
 use std::cmp::min;
+use std::collections::HashMap;
 use std::marker::PhantomData;
 use std::num::NonZeroUsize;
 use std::path::Path;
@@ -89,7 +90,7 @@ pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
 
 struct BlockCache<E: EthSpec> {
     block_cache: LruCache<Hash256, SignedBeaconBlock<E>>,
     blob_cache: LruCache<Hash256, BlobSidecarList<E>>,
-    data_column_cache: LruCache<Hash256, DataColumnSidecarList<E>>,
+    data_column_cache: LruCache<Hash256, HashMap<ColumnIndex, Arc<DataColumnSidecar<E>>>>,
 }
 
@@ -106,12 +107,10 @@ impl<E: EthSpec> BlockCache<E> {
     pub fn put_blobs(&mut self, block_root: Hash256, blobs: BlobSidecarList<E>) {
         self.blob_cache.put(block_root, blobs);
     }
-    pub fn put_data_columns(
-        &mut self,
-        block_root: Hash256,
-        data_columns: DataColumnSidecarList<E>,
-    ) {
-        self.data_column_cache.put(block_root, data_columns);
+    pub fn put_data_column(&mut self, block_root: Hash256, data_column: Arc<DataColumnSidecar<E>>) {
+        self.data_column_cache
+            .get_or_insert_mut(block_root, Default::default)
+            .insert(data_column.index, data_column);
     }
     pub fn get_block<'a>(&'a mut self, block_root: &Hash256) -> Option<&'a SignedBeaconBlock<E>> {
         self.block_cache.get(block_root)
@@ -119,11 +118,14 @@ impl<E: EthSpec> BlockCache<E> {
     pub fn get_blobs<'a>(&'a mut self, block_root: &Hash256) -> Option<&'a BlobSidecarList<E>> {
         self.blob_cache.get(block_root)
     }
-    pub fn get_data_columns<'a>(
+    pub fn get_data_column<'a>(
         &'a mut self,
         block_root: &Hash256,
-    ) -> Option<&'a DataColumnSidecarList<E>> {
-        self.data_column_cache.get(block_root)
+        column_index: &ColumnIndex,
+    ) -> Option<&'a Arc<DataColumnSidecar<E>>> {
+        self.data_column_cache
+            .get(block_root)
+            .and_then(|map| map.get(column_index))
     }
     pub fn delete_block(&mut self, block_root: &Hash256) {
         let _ = self.block_cache.pop(block_root);
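The `BlockCache` rework above swaps one cached list per block for a two-level shape: an LRU keyed by block root whose value maps column index to sidecar, so individual columns can be inserted and fetched without rewriting their siblings. A standalone sketch of the same shape using the `lru` crate, with `String` standing in for the sidecar type:

    use lru::LruCache;
    use std::collections::HashMap;
    use std::num::NonZeroUsize;

    type Root = [u8; 32];
    type ColumnIndex = u64;

    fn main() {
        let mut cache: LruCache<Root, HashMap<ColumnIndex, String>> =
            LruCache::new(NonZeroUsize::new(8).unwrap());
        let root = [0u8; 32];
        // As in the diff's `put_data_column`: create the inner map on first
        // insert for a root, then file the column under its index.
        cache
            .get_or_insert_mut(root, HashMap::new)
            .insert(3, "column 3".to_string());
        // As in `get_data_column`: resolve the root, then the index.
        assert!(cache.get(&root).and_then(|m| m.get(&3)).is_some());
    }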
@@ -672,15 +674,20 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
 
     pub fn data_columns_as_kv_store_ops(
         &self,
-        key: &Hash256,
-        data_columns: DataColumnSidecarList<E>,
+        block_root: &Hash256,
+        data_columns: DataColumnSidecarVec<E>,
         ops: &mut Vec<KeyValueStoreOp>,
     ) {
-        let db_key = get_key_for_col(DBColumn::BeaconDataColumn.into(), key.as_bytes());
-        ops.push(KeyValueStoreOp::PutKeyValue(
-            db_key,
-            data_columns.as_ssz_bytes(),
-        ));
+        for data_column in data_columns {
+            let db_key = get_key_for_col(
+                DBColumn::BeaconDataColumn.into(),
+                &get_data_column_key(block_root, &data_column.index),
+            );
+            ops.push(KeyValueStoreOp::PutKeyValue(
+                db_key,
+                data_column.as_ssz_bytes(),
+            ));
+        }
     }
 
     pub fn put_state_summary(
@@ -998,10 +1005,14 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
                 key_value_batch.push(KeyValueStoreOp::DeleteKey(key));
             }
 
-            StoreOp::DeleteDataColumns(block_root) => {
-                let key =
-                    get_key_for_col(DBColumn::BeaconDataColumn.into(), block_root.as_bytes());
-                key_value_batch.push(KeyValueStoreOp::DeleteKey(key));
+            StoreOp::DeleteDataColumns(block_root, column_indices) => {
+                for index in column_indices {
+                    let key = get_key_for_col(
+                        DBColumn::BeaconDataColumn.into(),
+                        &get_data_column_key(&block_root, &index),
+                    );
+                    key_value_batch.push(KeyValueStoreOp::DeleteKey(key));
+                }
             }
 
             StoreOp::DeleteState(state_root, slot) => {
@@ -1054,9 +1065,19 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
                 }
                 true
             }
-            StoreOp::DeleteDataColumns(block_root) => {
-                match self.get_data_columns(block_root) {
-                    Ok(Some(data_column_sidecar_list)) => {
+            StoreOp::DeleteDataColumns(block_root, indices) => {
+                match indices
+                    .iter()
+                    .map(|index| self.get_data_column(block_root, index))
+                    .collect::<Result<Vec<_>, _>>()
+                {
+                    Ok(data_column_sidecar_list_opt) => {
+                        let data_column_sidecar_list = data_column_sidecar_list_opt
+                            .into_iter()
+                            .flatten()
+                            .collect::<Vec<_>>();
+                        // Must push one item per StoreOp::DeleteDataColumns op to
+                        // prevent a `HotColdDBError::Rollback` error below in case of rollback
                         data_columns_to_delete.push((*block_root, data_column_sidecar_list));
                     }
                     Err(e) => {
@@ -1066,7 +1087,6 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
                             "error" => ?e
                         );
                     }
-                    _ => (),
                 }
                 true
             }
@@ -1101,14 +1121,15 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         for op in blob_cache_ops.iter_mut() {
             let reverse_op = match op {
                 StoreOp::PutBlobs(block_root, _) => StoreOp::DeleteBlobs(*block_root),
-                StoreOp::PutDataColumns(block_root, _) => {
-                    StoreOp::DeleteDataColumns(*block_root)
+                StoreOp::PutDataColumns(block_root, data_columns) => {
+                    let indices = data_columns.iter().map(|c| c.index).collect();
+                    StoreOp::DeleteDataColumns(*block_root, indices)
                 }
                 StoreOp::DeleteBlobs(_) => match blobs_to_delete.pop() {
                     Some((block_root, blobs)) => StoreOp::PutBlobs(block_root, blobs),
                     None => return Err(HotColdDBError::Rollback.into()),
                 },
-                StoreOp::DeleteDataColumns(_) => match data_columns_to_delete.pop() {
+                StoreOp::DeleteDataColumns(_, _) => match data_columns_to_delete.pop() {
                     Some((block_root, data_columns)) => {
                         StoreOp::PutDataColumns(block_root, data_columns)
                     }
@@ -1152,7 +1173,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
 
             StoreOp::DeleteBlobs(_) => (),
 
-            StoreOp::DeleteDataColumns(_) => (),
+            StoreOp::DeleteDataColumns(_, _) => (),
 
             StoreOp::DeleteExecutionPayload(_) => (),
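The comment added to the `DeleteDataColumns` arm above protects a pairing invariant: the forward pass pushes exactly one undo entry per delete op, even when none of the requested columns exist on disk, so the rollback loop can pop once per op without tripping `HotColdDBError::Rollback`. A toy model of the paired stacks (names illustrative only):

    struct DeleteOp {
        root: u64,
        indices: Vec<u64>,
    }

    fn main() {
        let ops = vec![
            DeleteOp { root: 1, indices: vec![0, 1] },
            DeleteOp { root: 2, indices: vec![] }, // nothing found on disk
        ];
        let mut undo: Vec<(u64, Vec<u64>)> = Vec::new();
        for op in &ops {
            // Push one entry per op, even when `indices` is empty, so the
            // undo stack stays aligned with the op list.
            undo.push((op.root, op.indices.clone()));
        }
        // Rollback: pop exactly once per op; a short stack would be the
        // analogue of `HotColdDBError::Rollback`.
        for _ in &ops {
            assert!(undo.pop().is_some());
        }
    }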
@@ -1653,30 +1674,40 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         }
     }
 
-    /// Fetch data_columns for a given block from the store.
-    pub fn get_data_columns(
+    /// Fetch all keys in the data_column column with prefix `block_root`.
+    pub fn get_data_column_keys(&self, block_root: Hash256) -> Result<Vec<ColumnIndex>, Error> {
+        self.blobs_db
+            .iter_raw_keys(DBColumn::BeaconDataColumn, block_root.as_bytes())
+            .map(|key| key.and_then(|key| parse_data_column_key(key).map(|key| key.1)))
+            .collect()
+    }
+
+    /// Fetch a single data_column for a given block from the store.
+    pub fn get_data_column(
         &self,
         block_root: &Hash256,
-    ) -> Result<Option<DataColumnSidecarList<E>>, Error> {
+        column_index: &ColumnIndex,
+    ) -> Result<Option<Arc<DataColumnSidecar<E>>>, Error> {
         // Check the cache.
-        if let Some(data_columns) = self.block_cache.lock().get_data_columns(block_root) {
+        if let Some(data_column) = self
+            .block_cache
+            .lock()
+            .get_data_column(block_root, column_index)
+        {
             metrics::inc_counter(&metrics::BEACON_DATA_COLUMNS_CACHE_HIT_COUNT);
-            return Ok(Some(data_columns.clone()));
+            return Ok(Some(data_column.clone()));
         }
 
-        match self
-            .blobs_db
-            .get_bytes(DBColumn::BeaconDataColumn.into(), block_root.as_bytes())?
-        {
-            Some(ref data_columns_bytes) => {
-                let data_columns = RuntimeVariableList::from_ssz_bytes(
-                    data_columns_bytes,
-                    self.spec.number_of_columns,
-                )?;
+        match self.blobs_db.get_bytes(
+            DBColumn::BeaconDataColumn.into(),
+            &get_data_column_key(block_root, column_index),
+        )? {
+            Some(ref data_column_bytes) => {
+                let data_column = Arc::new(DataColumnSidecar::from_ssz_bytes(data_column_bytes)?);
                 self.block_cache
                     .lock()
-                    .put_data_columns(*block_root, data_columns.clone());
-                Ok(Some(data_columns))
+                    .put_data_column(*block_root, data_column.clone());
+                Ok(Some(data_column))
             }
             None => Ok(None),
         }
@@ -2481,15 +2512,33 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
             }
         };
 
-        if Some(block_root) != last_pruned_block_root && self.blobs_exist(&block_root)? {
-            trace!(
-                self.log,
-                "Pruning blobs of block";
-                "slot" => slot,
-                "block_root" => ?block_root,
-            );
-            last_pruned_block_root = Some(block_root);
-            ops.push(StoreOp::DeleteBlobs(block_root));
+        if Some(block_root) != last_pruned_block_root {
+            if self
+                .spec
+                .is_peer_das_enabled_for_epoch(slot.epoch(E::slots_per_epoch()))
+            {
+                // data columns
+                let indices = self.get_data_column_keys(block_root)?;
+                if !indices.is_empty() {
+                    trace!(
+                        self.log,
+                        "Pruning data columns of block";
+                        "slot" => slot,
+                        "block_root" => ?block_root,
+                    );
+                    last_pruned_block_root = Some(block_root);
+                    ops.push(StoreOp::DeleteDataColumns(block_root, indices));
+                }
+            } else if self.blobs_exist(&block_root)? {
+                trace!(
+                    self.log,
+                    "Pruning blobs of block";
+                    "slot" => slot,
+                    "block_root" => ?block_root,
+                );
+                last_pruned_block_root = Some(block_root);
+                ops.push(StoreOp::DeleteBlobs(block_root));
+            }
         }
 
         if slot >= end_slot {
diff --git a/beacon_node/store/src/leveldb_store.rs b/beacon_node/store/src/leveldb_store.rs
index ffd55c16a04..b224319ae4f 100644
--- a/beacon_node/store/src/leveldb_store.rs
+++ b/beacon_node/store/src/leveldb_store.rs
@@ -255,6 +255,10 @@ impl db_key::Key for BytesKey {
 }
 
 impl BytesKey {
+    pub fn starts_with(&self, prefix: &Self) -> bool {
+        self.key.starts_with(&prefix.key)
+    }
+
     /// Return `true` iff this `BytesKey` was created with the given `column`.
     pub fn matches_column(&self, column: DBColumn) -> bool {
         self.key.starts_with(column.as_bytes())
diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs
index 2b648d71dbc..10b3d7a3a65 100644
--- a/beacon_node/store/src/lib.rs
+++ b/beacon_node/store/src/lib.rs
@@ -112,9 +112,7 @@ pub trait KeyValueStore<E: EthSpec>: Sync + Send + Sized + 'static {
         Box::new(std::iter::empty())
     }
 
-    fn iter_raw_keys(&self, _column: DBColumn, _prefix: &[u8]) -> RawKeyIter {
-        Box::new(std::iter::empty())
-    }
+    fn iter_raw_keys(&self, column: DBColumn, prefix: &[u8]) -> RawKeyIter;
 
     /// Iterate through all keys in a particular column.
     fn iter_column_keys(&self, column: DBColumn) -> ColumnKeyIter;
@@ -146,6 +144,28 @@ pub fn get_key_for_col(column: &str, key: &[u8]) -> Vec<u8> {
     result
 }
 
+pub fn get_data_column_key(block_root: &Hash256, column_index: &ColumnIndex) -> Vec<u8> {
+    let mut result = block_root.as_bytes().to_vec();
+    result.extend_from_slice(&column_index.to_le_bytes());
+    result
+}
+
+pub fn parse_data_column_key(data: Vec<u8>) -> Result<(Hash256, ColumnIndex), Error> {
+    if data.len() != 32 + 8 {
+        return Err(Error::InvalidKey);
+    }
+    // `split_at(32)` would panic only on a slice shorter than 32 bytes, which
+    // cannot happen after the length check above.
+    let (block_root_bytes, column_index_bytes) = data.split_at(32);
+    let block_root = Hash256::from_slice(block_root_bytes);
+    // The length check above guarantees `column_index_bytes` is exactly 8 bytes.
+    let column_index = ColumnIndex::from_le_bytes(
+        column_index_bytes
+            .try_into()
+            .expect("slice with incorrect length"),
+    );
+    Ok((block_root, column_index))
+}
+
 #[must_use]
 #[derive(Clone)]
 pub enum KeyValueStoreOp {
@@ -206,13 +226,13 @@ pub enum StoreOp<'a, E: EthSpec> {
     PutBlock(Hash256, Arc<SignedBeaconBlock<E>>),
     PutState(Hash256, &'a BeaconState<E>),
     PutBlobs(Hash256, BlobSidecarList<E>),
-    PutDataColumns(Hash256, DataColumnSidecarList<E>),
+    PutDataColumns(Hash256, DataColumnSidecarVec<E>),
     PutStateSummary(Hash256, HotStateSummary),
     PutStateTemporaryFlag(Hash256),
     DeleteStateTemporaryFlag(Hash256),
     DeleteBlock(Hash256),
     DeleteBlobs(Hash256),
-    DeleteDataColumns(Hash256),
+    DeleteDataColumns(Hash256, Vec<ColumnIndex>),
     DeleteState(Hash256, Option<Slot>),
     DeleteExecutionPayload(Hash256),
     KeyValueOp(KeyValueStoreOp),
@@ -301,7 +321,6 @@ impl DBColumn {
             | Self::BeaconBlock
             | Self::BeaconState
             | Self::BeaconBlob
-            | Self::BeaconDataColumn
             | Self::BeaconStateSummary
             | Self::BeaconStateTemporary
             | Self::ExecPayload
@@ -318,6 +337,7 @@ impl DBColumn {
             | Self::BeaconHistoricalRoots
             | Self::BeaconHistoricalSummaries
             | Self::BeaconRandaoMixes => 8,
+            Self::BeaconDataColumn => 32 + 8,
         }
     }
 }
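The key scheme introduced here is small enough to model end to end: a data-column key is the 32-byte block root followed by the column index as 8 little-endian bytes, 40 bytes total, matching the `32 + 8` that `DBColumn::BeaconDataColumn` now reports from `key_size`. A self-contained sketch of the encode/decode round trip, with a plain array standing in for `Hash256` and `Option` for the store's error type:

    fn data_column_key(block_root: &[u8; 32], column_index: u64) -> Vec<u8> {
        let mut key = block_root.to_vec();
        key.extend_from_slice(&column_index.to_le_bytes());
        key
    }

    fn parse_key(data: &[u8]) -> Option<([u8; 32], u64)> {
        if data.len() != 32 + 8 {
            return None; // the real code returns Error::InvalidKey here
        }
        let (root_bytes, index_bytes) = data.split_at(32);
        Some((
            root_bytes.try_into().ok()?,
            u64::from_le_bytes(index_bytes.try_into().ok()?),
        ))
    }

    fn main() {
        let key = data_column_key(&[7u8; 32], 42);
        assert_eq!(key.len(), 40);
        // Round trip: parsing recovers the (block_root, column_index) pair.
        assert_eq!(parse_key(&key), Some(([7u8; 32], 42)));
    }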
diff --git a/beacon_node/store/src/memory_store.rs b/beacon_node/store/src/memory_store.rs
index 302d2c2add2..a7c2bd2c575 100644
--- a/beacon_node/store/src/memory_store.rs
+++ b/beacon_node/store/src/memory_store.rs
@@ -1,3 +1,4 @@
+use crate::RawKeyIter;
 use crate::{
     get_key_for_col, leveldb_store::BytesKey, ColumnIter, ColumnKeyIter, DBColumn, Error,
     ItemStore, Key, KeyValueStore, KeyValueStoreOp,
@@ -100,6 +101,18 @@ impl<E: EthSpec> KeyValueStore<E> for MemoryStore<E> {
         }))
     }
 
+    fn iter_raw_keys(&self, column: DBColumn, prefix: &[u8]) -> RawKeyIter {
+        let start_key = BytesKey::from_vec(get_key_for_col(column.as_str(), prefix));
+        let keys = self
+            .db
+            .read()
+            .range(start_key.clone()..)
+            .take_while(|(k, _)| k.starts_with(&start_key))
+            .filter_map(|(k, _)| k.remove_column_variable(column).map(|k| k.to_vec()))
+            .collect::<Vec<_>>();
+        Box::new(keys.into_iter().map(Ok))
+    }
+
     fn iter_column_keys(&self, column: DBColumn) -> ColumnKeyIter {
         Box::new(self.iter_column(column).map(|res| res.map(|(k, _)| k)))
     }
diff --git a/consensus/types/src/data_column_sidecar.rs b/consensus/types/src/data_column_sidecar.rs
index 74f9a9b4223..bbed920259c 100644
--- a/consensus/types/src/data_column_sidecar.rs
+++ b/consensus/types/src/data_column_sidecar.rs
@@ -1,8 +1,8 @@
 use crate::beacon_block_body::{KzgCommitments, BLOB_KZG_COMMITMENTS_INDEX};
 use crate::test_utils::TestRandom;
 use crate::{
-    BeaconBlockHeader, ChainSpec, EthSpec, Hash256, KzgProofs, RuntimeVariableList,
-    SignedBeaconBlock, SignedBeaconBlockHeader, Slot,
+    BeaconBlockHeader, ChainSpec, EthSpec, Hash256, KzgProofs, SignedBeaconBlock,
+    SignedBeaconBlockHeader, Slot,
 };
 use crate::{BeaconStateError, BlobsList};
 use bls::Signature;
@@ -41,7 +41,7 @@ pub struct DataColumnIdentifier {
     pub index: ColumnIndex,
 }
 
-pub type DataColumnSidecarList<E> = RuntimeVariableList<Arc<DataColumnSidecar<E>>>;
+pub type DataColumnSidecarVec<E> = Vec<Arc<DataColumnSidecar<E>>>;
 
 #[derive(
     Debug,
@@ -106,10 +106,10 @@ impl<E: EthSpec> DataColumnSidecar<E> {
         block: &SignedBeaconBlock<E>,
         kzg: &Kzg,
         spec: &ChainSpec,
-    ) -> Result<DataColumnSidecarList<E>, DataColumnSidecarError> {
+    ) -> Result<DataColumnSidecarVec<E>, DataColumnSidecarError> {
         let number_of_columns = spec.number_of_columns;
         if blobs.is_empty() {
-            return Ok(RuntimeVariableList::empty(number_of_columns));
+            return Ok(vec![]);
         }
         let kzg_commitments = block
             .message()
@@ -189,7 +189,6 @@ impl<E: EthSpec> DataColumnSidecar<E> {
                 })
             })
             .collect();
-        let sidecars = RuntimeVariableList::from_vec(sidecars, number_of_columns);
 
         Ok(sidecars)
     }
diff --git a/consensus/types/src/lib.rs b/consensus/types/src/lib.rs
index 79abe7af763..bdddb7226ff 100644
--- a/consensus/types/src/lib.rs
+++ b/consensus/types/src/lib.rs
@@ -144,7 +144,7 @@ pub use crate::config_and_preset::{
 pub use crate::consolidation::Consolidation;
 pub use crate::contribution_and_proof::ContributionAndProof;
 pub use crate::data_column_sidecar::{
-    ColumnIndex, DataColumnIdentifier, DataColumnSidecar, DataColumnSidecarList,
+    ColumnIndex, DataColumnIdentifier, DataColumnSidecar, DataColumnSidecarVec,
 };
 pub use crate::data_column_subnet_id::DataColumnSubnetId;
 pub use crate::deposit::{Deposit, DEPOSIT_TREE_DEPTH};
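The `MemoryStore` implementation of `iter_raw_keys` above leans on ordered-map semantics: seek to the prefix, then take entries while they still share it. The same idea in miniature over a plain `BTreeMap`, with illustrative byte-string keys:

    use std::collections::BTreeMap;

    fn keys_with_prefix(db: &BTreeMap<Vec<u8>, Vec<u8>>, prefix: &[u8]) -> Vec<Vec<u8>> {
        // Range scan from the prefix, stopping at the first key that no
        // longer shares it: the shape of `iter_raw_keys` in the diff.
        db.range(prefix.to_vec()..)
            .take_while(|(k, _)| k.starts_with(prefix))
            .map(|(k, _)| k.clone())
            .collect()
    }

    fn main() {
        let mut db = BTreeMap::new();
        db.insert(b"root1/col0".to_vec(), Vec::new());
        db.insert(b"root1/col1".to_vec(), Vec::new());
        db.insert(b"root2/col0".to_vec(), Vec::new());
        // Both keys under `root1` are returned; `root2` is excluded.
        assert_eq!(keys_with_prefix(&db, b"root1").len(), 2);
    }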