Store data columns individually in store and caches #5890

Merged 2 commits on Jun 18, 2024
55 changes: 17 additions & 38 deletions beacon_node/beacon_chain/src/beacon_chain.rs
@@ -1184,42 +1184,23 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
             .map_or_else(|| self.get_blobs(block_root), Ok)
     }
 
-    pub fn get_data_columns_checking_early_attester_cache(
-        &self,
-        block_root: &Hash256,
-    ) -> Result<DataColumnSidecarList<T::EthSpec>, Error> {
-        self.early_attester_cache
-            .get_data_columns(*block_root)
-            .map_or_else(|| self.get_data_columns(block_root), Ok)
-    }
-
-    pub fn get_selected_data_columns_checking_all_caches(
+    pub fn get_data_column_checking_all_caches(
         &self,
         block_root: Hash256,
-        indices: &[ColumnIndex],
-    ) -> Result<Vec<Arc<DataColumnSidecar<T::EthSpec>>>, Error> {
-        let columns_from_availability_cache = indices
-            .iter()
-            .copied()
-            .filter_map(|index| {
-                self.data_availability_checker
-                    .get_data_column(&DataColumnIdentifier { block_root, index })
-                    .transpose()
-            })
-            .collect::<Result<Vec<_>, _>>()?;
-        // Existence of a column in the data availability cache and downstream caches is exclusive.
-        // If there's a single match in the availability cache we can safely skip other sources.
-        if !columns_from_availability_cache.is_empty() {
-            return Ok(columns_from_availability_cache);
+        index: ColumnIndex,
+    ) -> Result<Option<Arc<DataColumnSidecar<T::EthSpec>>>, Error> {
+        if let Some(column) = self
+            .data_availability_checker
+            .get_data_column(&DataColumnIdentifier { block_root, index })?
+        {
+            return Ok(Some(column));
         }
 
-        Ok(self
-            .early_attester_cache
-            .get_data_columns(block_root)
-            .map_or_else(|| self.get_data_columns(&block_root), Ok)?
-            .into_iter()
-            .filter(|dc| indices.contains(&dc.index))
-            .collect())
+        if let Some(columns) = self.early_attester_cache.get_data_columns(block_root) {
+            return Ok(columns.iter().find(|c| c.index == index).cloned());
+        }
+
+        self.get_data_column(&block_root, &index)
     }
 
     /// Returns the import status of block checking (in order) pre-import caches, fork-choice, db store
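Note: the new accessor checks the three data-column sources in freshness order: the availability cache (blocks still awaiting import), the early attester cache (the most recently imported block), and finally the on-disk store. A minimal standalone sketch of that lookup order, using stand-in maps rather than Lighthouse's actual cache and store types:

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Stand-in types; the real code uses Hash256, ColumnIndex and
// DataColumnSidecar from `consensus/types`.
type BlockRoot = [u8; 32];
type ColumnIndex = u64;

#[derive(Clone)]
struct Column {
    index: ColumnIndex,
}

struct Caches {
    // Per-column cache for blocks still pending import (availability checker).
    availability: HashMap<(BlockRoot, ColumnIndex), Arc<Column>>,
    // All columns of a recently imported block (early attester cache).
    early_attester: HashMap<BlockRoot, Vec<Arc<Column>>>,
    // On-disk store, now also keyed per column.
    store: HashMap<(BlockRoot, ColumnIndex), Arc<Column>>,
}

impl Caches {
    fn get_column(&self, root: BlockRoot, index: ColumnIndex) -> Option<Arc<Column>> {
        // 1. Freshest source first: columns of blocks not yet imported.
        if let Some(col) = self.availability.get(&(root, index)) {
            return Some(col.clone());
        }
        // 2. Then the early attester cache, which holds whole blocks' columns.
        if let Some(cols) = self.early_attester.get(&root) {
            return cols.iter().find(|c| c.index == index).cloned();
        }
        // 3. Finally fall back to the database.
        self.store.get(&(root, index)).cloned()
    }
}
```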
@@ -1332,14 +1313,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
     ///
     /// ## Errors
     /// May return a database error.
-    pub fn get_data_columns(
+    pub fn get_data_column(
         &self,
         block_root: &Hash256,
-    ) -> Result<DataColumnSidecarList<T::EthSpec>, Error> {
-        match self.store.get_data_columns(block_root)? {
-            Some(data_columns) => Ok(data_columns),
-            None => Ok(RuntimeVariableList::empty(self.spec.number_of_columns)),
-        }
+        column_index: &ColumnIndex,
+    ) -> Result<Option<Arc<DataColumnSidecar<T::EthSpec>>>, Error> {
+        Ok(self.store.get_data_column(block_root, column_index)?)
     }
 
     pub fn get_blinded_block(
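Note: `get_data_column` is now a thin wrapper over a per-column store read, instead of loading the whole list and padding misses with `RuntimeVariableList::empty`. The diff does not show the store schema itself, but a plausible sketch of a per-column key (purely illustrative; the real key layout lives in `beacon_node/store`) is:

```rust
// Hypothetical key derivation, for illustration only: concatenating the block
// root with the big-endian column index gives each sidecar its own DB key, so
// a single column can be read or written without touching its siblings.
fn data_column_key(block_root: &[u8; 32], column_index: u64) -> Vec<u8> {
    let mut key = Vec::with_capacity(32 + 8);
    key.extend_from_slice(block_root);
    key.extend_from_slice(&column_index.to_be_bytes());
    key
}
```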
30 changes: 10 additions & 20 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
@@ -15,7 +15,7 @@ use std::time::Duration;
 use task_executor::TaskExecutor;
 use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList};
 use types::{
-    BlobSidecarList, ChainSpec, DataColumnSidecar, DataColumnSidecarList, Epoch, EthSpec, Hash256,
+    BlobSidecarList, ChainSpec, DataColumnSidecar, DataColumnSidecarVec, Epoch, EthSpec, Hash256,
     RuntimeVariableList, SignedBeaconBlock,
 };
 
@@ -309,16 +309,11 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
                 block,
                 blobs: None,
                 blobs_available_timestamp: None,
-                // TODO(das): update store type to prevent this conversion
                 data_columns: Some(
-                    RuntimeVariableList::new(
-                        data_column_list
-                            .into_iter()
-                            .map(|d| d.clone_arc())
-                            .collect(),
-                        self.spec.number_of_columns,
-                    )
-                    .expect("data column list is within bounds"),
+                    data_column_list
+                        .into_iter()
+                        .map(|d| d.clone_arc())
+                        .collect(),
                 ),
                 spec: self.spec.clone(),
             }))
@@ -409,13 +404,8 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
             block,
             blobs: None,
             blobs_available_timestamp: None,
-            // TODO(das): update store type to prevent this conversion
             data_columns: data_columns.map(|data_columns| {
-                RuntimeVariableList::new(
-                    data_columns.into_iter().map(|d| d.into_inner()).collect(),
-                    self.spec.number_of_columns,
-                )
-                .expect("data column list is within bounds")
+                data_columns.into_iter().map(|d| d.into_inner()).collect()
             }),
             spec: self.spec.clone(),
         })
@@ -605,7 +595,7 @@ pub struct AvailableBlock<E: EthSpec> {
     blobs: Option<BlobSidecarList<E>>,
     /// Timestamp at which this block first became available (UNIX timestamp, time since 1970).
     blobs_available_timestamp: Option<Duration>,
-    data_columns: Option<DataColumnSidecarList<E>>,
+    data_columns: Option<DataColumnSidecarVec<E>>,
     pub spec: Arc<ChainSpec>,
 }
 
@@ -614,7 +604,7 @@ impl<E: EthSpec> AvailableBlock<E> {
         block_root: Hash256,
         block: Arc<SignedBeaconBlock<E>>,
         blobs: Option<BlobSidecarList<E>>,
-        data_columns: Option<DataColumnSidecarList<E>>,
+        data_columns: Option<DataColumnSidecarVec<E>>,
         spec: Arc<ChainSpec>,
     ) -> Self {
         Self {
@@ -643,7 +633,7 @@ impl<E: EthSpec> AvailableBlock<E> {
         self.blobs_available_timestamp
     }
 
-    pub fn data_columns(&self) -> Option<&DataColumnSidecarList<E>> {
+    pub fn data_columns(&self) -> Option<&DataColumnSidecarVec<E>> {
         self.data_columns.as_ref()
     }
 
@@ -654,7 +644,7 @@
         Hash256,
         Arc<SignedBeaconBlock<E>>,
         Option<BlobSidecarList<E>>,
-        Option<DataColumnSidecarList<E>>,
+        Option<DataColumnSidecarVec<E>>,
     ) {
         let AvailableBlock {
             block_root,
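Note: the repeated `DataColumnSidecarList` to `DataColumnSidecarVec` substitutions in this file all follow from one change of representation: in-memory column collections are no longer carried in the SSZ-bounded `RuntimeVariableList` (which forced a fallible rebuild at every hand-off, hence the deleted `.expect("data column list is within bounds")` calls), but in a plain `Vec`. The aliases presumably look something like this (simplified sketch; the real definitions live in `consensus/types`):

```rust
// Presumed shape of the old and new aliases, for illustration only.
// Old: length-bounded at runtime, so every construction could fail or panic.
pub type DataColumnSidecarList<E> = RuntimeVariableList<Arc<DataColumnSidecar<E>>>;
// New: an ordinary Vec, constructed infallibly with `collect()`.
pub type DataColumnSidecarVec<E> = Vec<Arc<DataColumnSidecar<E>>>;
```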
@@ -349,16 +349,11 @@ impl<E: EthSpec> PendingComponents<E> {
             block,
             blobs,
             blobs_available_timestamp,
-            // TODO(das): Update store types to prevent this conversion
             data_columns: Some(
-                RuntimeVariableList::new(
-                    verified_data_columns
-                        .into_iter()
-                        .map(|d| d.into_inner())
-                        .collect(),
-                    spec.number_of_columns,
-                )
-                .expect("data column list is within bounds"),
+                verified_data_columns
+                    .into_iter()
+                    .map(|d| d.into_inner())
+                    .collect(),
             ),
             spec: spec.clone(),
         };
4 changes: 2 additions & 2 deletions beacon_node/beacon_chain/src/early_attester_cache.rs
@@ -22,7 +22,7 @@ pub struct CacheItem<E: EthSpec> {
      */
     block: Arc<SignedBeaconBlock<E>>,
     blobs: Option<BlobSidecarList<E>>,
-    data_columns: Option<DataColumnSidecarList<E>>,
+    data_columns: Option<DataColumnSidecarVec<E>>,
     proto_block: ProtoBlock,
 }
 
@@ -169,7 +169,7 @@ impl<E: EthSpec> EarlyAttesterCache<E> {
     }
 
     /// Returns the data columns, if `block_root` matches the cached item.
-    pub fn get_data_columns(&self, block_root: Hash256) -> Option<DataColumnSidecarList<E>> {
+    pub fn get_data_columns(&self, block_root: Hash256) -> Option<DataColumnSidecarVec<E>> {
         self.item
             .read()
             .as_ref()
4 changes: 1 addition & 3 deletions beacon_node/beacon_chain/src/test_utils.rs
@@ -2593,9 +2593,7 @@ pub fn generate_rand_block_and_data_columns<E: EthSpec>(
 ) {
     let (block, blobs) = generate_rand_block_and_blobs(fork_name, num_blobs, rng);
     let blob: BlobsList<E> = blobs.into_iter().map(|b| b.blob).collect::<Vec<_>>().into();
-    let data_columns = DataColumnSidecar::build_sidecars(&blob, &block, &KZG, spec)
-        .unwrap()
-        .into();
+    let data_columns = DataColumnSidecar::build_sidecars(&blob, &block, &KZG, spec).unwrap();
 
     (block, data_columns)
 }
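Note: the dropped `.into()` is the same alias change surfacing in the test helper. The helper's return type now matches what `build_sidecars` produces, so the trailing conversion between the two container types is unnecessary.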
12 changes: 5 additions & 7 deletions beacon_node/http_api/src/publish_blocks.rs
@@ -20,10 +20,9 @@ use std::time::Duration;
 use tokio::sync::mpsc::UnboundedSender;
 use tree_hash::TreeHash;
 use types::{
-    AbstractExecPayload, BeaconBlockRef, BlobSidecarList, BlockImportSource, DataColumnSidecarList,
+    AbstractExecPayload, BeaconBlockRef, BlobSidecarList, BlockImportSource, DataColumnSidecarVec,
     DataColumnSubnetId, EthSpec, ExecPayload, ExecutionBlockHash, ForkName, FullPayload,
-    FullPayloadBellatrix, Hash256, RuntimeVariableList, SignedBeaconBlock,
-    SignedBlindedBeaconBlock, VariableList,
+    FullPayloadBellatrix, Hash256, SignedBeaconBlock, SignedBlindedBeaconBlock, VariableList,
 };
 use warp::http::StatusCode;
 use warp::{reply::Response, Rejection, Reply};
@@ -78,7 +77,7 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlockConten
     /* actually publish a block */
     let publish_block = move |block: Arc<SignedBeaconBlock<T::EthSpec>>,
                               blobs_opt: Option<BlobSidecarList<T::EthSpec>>,
-                              data_cols_opt: Option<DataColumnSidecarList<T::EthSpec>>,
+                              data_cols_opt: Option<DataColumnSidecarVec<T::EthSpec>>,
                               sender,
                               log,
                               seen_timestamp| {
@@ -204,11 +203,10 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlockConten
     let data_cols_opt = gossip_verified_data_columns
         .as_ref()
         .map(|gossip_verified_data_columns| {
-            let data_columns = gossip_verified_data_columns
+            gossip_verified_data_columns
                 .into_iter()
                 .map(|col| col.clone_data_column())
-                .collect::<Vec<_>>();
-            RuntimeVariableList::from_vec(data_columns, chain.spec.number_of_columns)
+                .collect::<Vec<_>>()
         });
 
     let block_root = block_root.unwrap_or(gossip_verified_block.block_root);
104 changes: 50 additions & 54 deletions beacon_node/network/src/network_beacon_processor/rpc_methods.rs
@@ -14,13 +14,12 @@ use lighthouse_network::rpc::*;
 use lighthouse_network::{PeerId, PeerRequestId, ReportSource, Response, SyncInfo};
 use slog::{debug, error, warn};
 use slot_clock::SlotClock;
-use std::collections::HashSet;
 use std::collections::{hash_map::Entry, HashMap};
 use std::sync::Arc;
 use tokio::sync::mpsc;
 use tokio_stream::StreamExt;
 use types::blob_sidecar::BlobIdentifier;
-use types::{ColumnIndex, Epoch, EthSpec, ForkName, Hash256, Slot};
+use types::{Epoch, EthSpec, ForkName, Hash256, Slot};
 
 impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
     /* Auxiliary functions */
@@ -332,35 +331,36 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
         let column_indexes_by_block = request.group_by_ordered_block_root();
         let mut send_data_column_count = 0;
 
-        for (block_root, column_ids) in column_indexes_by_block.iter() {
-            match self
-                .chain
-                .get_selected_data_columns_checking_all_caches(*block_root, column_ids)
-            {
-                Ok(data_columns) => {
-                    for data_column in data_columns {
+        for (block_root, column_indices) in column_indexes_by_block.iter() {
+            for index in column_indices {
+                match self
+                    .chain
+                    .get_data_column_checking_all_caches(*block_root, *index)
+                {
+                    Ok(Some(data_column)) => {
                         send_data_column_count += 1;
                         self.send_response(
                             peer_id,
                             Response::DataColumnsByRoot(Some(data_column)),
                             request_id,
                         );
                     }
-                }
-                Err(e) => {
-                    self.send_error_response(
-                        peer_id,
-                        RPCResponseErrorCode::ServerError,
-                        // TODO(das): leak error details to ease debugging
-                        format!("{:?}", e).to_string(),
-                        request_id,
-                    );
-                    error!(self.log, "Error getting data column";
-                        "block_root" => ?block_root,
-                        "peer" => %peer_id,
-                        "error" => ?e
-                    );
-                    return;
+                    Ok(None) => {} // no-op
+                    Err(e) => {
+                        self.send_error_response(
+                            peer_id,
+                            RPCResponseErrorCode::ServerError,
+                            // TODO(das): leak error details to ease debugging
+                            format!("{:?}", e).to_string(),
+                            request_id,
+                        );
+                        error!(self.log, "Error getting data column";
+                            "block_root" => ?block_root,
+                            "peer" => %peer_id,
+                            "error" => ?e
+                        );
+                        return;
+                    }
                 }
             }
         }
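Note: serving `DataColumnsByRoot` now probes each requested index individually, and a missing column (`Ok(None)`) is simply skipped rather than treated as an error; as with the other ByRoot RPCs, the response may contain fewer sidecars than were requested. A stand-in sketch of the pattern (simplified types, not the actual handler):

```rust
// Stand-in sketch: serve whatever subset of the requested columns we hold,
// counting what was sent; only lookup failures abort the response.
type BlockRoot = [u8; 32];
type ColumnIndex = u64;

fn serve_columns<C, E>(
    requested: &[(BlockRoot, ColumnIndex)],
    lookup: impl Fn(&BlockRoot, ColumnIndex) -> Result<Option<C>, E>,
    mut send: impl FnMut(C),
) -> Result<usize, E> {
    let mut sent = 0;
    for (root, index) in requested {
        match lookup(root, *index)? {
            Some(column) => {
                send(column);
                sent += 1;
            }
            // No-op: we don't custody, or never received, this column.
            None => {}
        }
    }
    Ok(sent)
}
```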
@@ -1077,40 +1077,36 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
 
         // remove all skip slots
         let block_roots = block_roots.into_iter().flatten();
-
         let mut data_columns_sent = 0;
-        let requested_column_indices =
-            HashSet::<ColumnIndex>::from_iter(req.columns.iter().copied());
 
         for root in block_roots {
-            match self.chain.get_data_columns(&root) {
-                Ok(data_column_sidecar_list) => {
-                    for data_column_sidecar in data_column_sidecar_list.iter() {
-                        if requested_column_indices.contains(&data_column_sidecar.index) {
-                            data_columns_sent += 1;
-                            self.send_network_message(NetworkMessage::SendResponse {
-                                peer_id,
-                                response: Response::DataColumnsByRange(Some(
-                                    data_column_sidecar.clone(),
-                                )),
-                                id: request_id,
-                            });
-                        }
+            for index in &req.columns {
+                match self.chain.get_data_column(&root, index) {
+                    Ok(Some(data_column_sidecar)) => {
+                        data_columns_sent += 1;
+                        self.send_network_message(NetworkMessage::SendResponse {
+                            peer_id,
+                            response: Response::DataColumnsByRange(Some(
+                                data_column_sidecar.clone(),
+                            )),
+                            id: request_id,
+                        });
                     }
-                }
-                Err(e) => {
-                    error!(
-                        self.log,
-                        "Error fetching data columns block root";
-                        "request" => ?req,
-                        "peer" => %peer_id,
-                        "block_root" => ?root,
-                        "error" => ?e
-                    );
-                    return Err((
-                        RPCResponseErrorCode::ServerError,
-                        "No data columns and failed fetching corresponding block",
-                    ));
+                    Ok(None) => {} // no-op
+                    Err(e) => {
+                        error!(
+                            self.log,
+                            "Error fetching data columns block root";
+                            "request" => ?req,
+                            "peer" => %peer_id,
+                            "block_root" => ?root,
+                            "error" => ?e
+                        );
+                        return Err((
+                            RPCResponseErrorCode::ServerError,
+                            "No data columns and failed fetching corresponding block",
+                        ));
+                    }
                 }
             }
         }
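Note: with per-column store reads, the by-range handler no longer loads every stored column for a block and filters it against a `HashSet` of requested indices, which is why the `HashSet` and `ColumnIndex` imports disappear at the top of this file. A small sketch of the access-pattern change (stand-in data, not the real store API):

```rust
use std::collections::{HashMap, HashSet};

// Before: fetch all columns for a block, then filter by the requested set.
fn old_pattern<'a>(
    all_columns: &'a [(u64, &'a str)],
    requested: &HashSet<u64>,
) -> Vec<&'a str> {
    all_columns
        .iter()
        .filter(|(index, _)| requested.contains(index))
        .map(|(_, data)| *data)
        .collect()
}

// After: one keyed lookup per requested index; untouched columns are never read.
fn new_pattern<'a>(
    columns_by_index: &'a HashMap<u64, &'a str>,
    requested: &[u64],
) -> Vec<&'a str> {
    requested
        .iter()
        .filter_map(|index| columns_by_index.get(index).copied())
        .collect()
}
```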