Skip to content

Commit

Permalink
Implement PeerDAS RPC handlers (sigp#6237)
Browse files Browse the repository at this point in the history
* Implement PeerDAS RPC handlers

* use terminate_response_stream

* Merge branch 'unstable' of https://github.com/sigp/lighthouse into peerdas-network-rpc-handler

* cargo fmt
  • Loading branch information
dapplion authored Aug 15, 2024
1 parent 9fc0a66 commit 6566705
Show file tree
Hide file tree
Showing 6 changed files with 313 additions and 11 deletions.
31 changes: 31 additions & 0 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1155,6 +1155,25 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.map_or_else(|| self.get_blobs(block_root), Ok)
}

pub fn get_data_column_checking_all_caches(
&self,
block_root: Hash256,
index: ColumnIndex,
) -> Result<Option<Arc<DataColumnSidecar<T::EthSpec>>>, Error> {
if let Some(column) = self
.data_availability_checker
.get_data_column(&DataColumnIdentifier { block_root, index })?
{
return Ok(Some(column));
}

if let Some(columns) = self.early_attester_cache.get_data_columns(block_root) {
return Ok(columns.iter().find(|c| c.index == index).cloned());
}

self.get_data_column(&block_root, &index)
}

/// Returns the block at the given root, if any.
///
/// ## Errors
Expand Down Expand Up @@ -1230,6 +1249,18 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}

/// Returns the data columns at the given root, if any.
///
/// ## Errors
/// May return a database error.
pub fn get_data_column(
&self,
block_root: &Hash256,
column_index: &ColumnIndex,
) -> Result<Option<Arc<DataColumnSidecar<T::EthSpec>>>, Error> {
Ok(self.store.get_data_column(block_root, column_index)?)
}

pub fn get_blinded_block(
&self,
block_root: &Hash256,
Expand Down
12 changes: 10 additions & 2 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ use std::time::Duration;
use task_executor::TaskExecutor;
use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList};
use types::{
BlobSidecarList, ChainSpec, DataColumnSidecarList, Epoch, EthSpec, Hash256, SignedBeaconBlock,
Slot,
BlobSidecarList, ChainSpec, DataColumnIdentifier, DataColumnSidecar, DataColumnSidecarList,
Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot,
};

mod error;
Expand Down Expand Up @@ -173,6 +173,14 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
self.availability_cache.peek_blob(blob_id)
}

/// Get a data column from the availability cache.
pub fn get_data_column(
&self,
data_column_id: &DataColumnIdentifier,
) -> Result<Option<Arc<DataColumnSidecar<T::EthSpec>>>, AvailabilityCheckError> {
self.availability_cache.peek_data_column(data_column_id)
}

/// Put a list of blobs received via RPC into the availability cache. This performs KZG
/// verification on the blobs in the list.
pub fn put_rpc_blobs(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ use ssz_types::{FixedVector, VariableList};
use std::num::NonZeroUsize;
use std::sync::Arc;
use types::blob_sidecar::BlobIdentifier;
use types::{BlobSidecar, ChainSpec, ColumnIndex, Epoch, EthSpec, Hash256, SignedBeaconBlock};
use types::{
BlobSidecar, ChainSpec, ColumnIndex, DataColumnIdentifier, DataColumnSidecar, Epoch, EthSpec,
Hash256, SignedBeaconBlock,
};

/// This represents the components of a partially available block
///
Expand Down Expand Up @@ -389,6 +392,22 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
}
}

/// Fetch a data column from the cache without affecting the LRU ordering
pub fn peek_data_column(
&self,
data_column_id: &DataColumnIdentifier,
) -> Result<Option<Arc<DataColumnSidecar<T::EthSpec>>>, AvailabilityCheckError> {
if let Some(pending_components) = self.critical.read().peek(&data_column_id.block_root) {
Ok(pending_components
.verified_data_columns
.iter()
.find(|data_column| data_column.as_data_column().index == data_column_id.index)
.map(|data_column| data_column.clone_arc()))
} else {
Ok(None)
}
}

pub fn peek_pending_components<R, F: FnOnce(Option<&PendingComponents<T::EthSpec>>) -> R>(
&self,
block_root: &Hash256,
Expand Down
3 changes: 3 additions & 0 deletions beacon_node/beacon_chain/src/data_column_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,9 @@ impl<E: EthSpec> KzgVerifiedCustodyDataColumn<E> {
pub fn as_data_column(&self) -> &DataColumnSidecar<E> {
&self.data
}
pub fn clone_arc(&self) -> Arc<DataColumnSidecar<E>> {
self.data.clone()
}
}

/// Complete kzg verification for a `DataColumnSidecar`.
Expand Down
12 changes: 12 additions & 0 deletions beacon_node/lighthouse_network/src/rpc/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use serde::Serialize;
use ssz::Encode;
use ssz_derive::{Decode, Encode};
use ssz_types::{typenum::U256, VariableList};
use std::collections::BTreeMap;
use std::fmt::Display;
use std::marker::PhantomData;
use std::ops::Deref;
Expand Down Expand Up @@ -426,6 +427,17 @@ impl DataColumnsByRootRequest {
pub fn new_single(block_root: Hash256, index: ColumnIndex, spec: &ChainSpec) -> Self {
Self::new(vec![DataColumnIdentifier { block_root, index }], spec)
}

pub fn group_by_ordered_block_root(&self) -> Vec<(Hash256, Vec<ColumnIndex>)> {
let mut column_indexes_by_block = BTreeMap::<Hash256, Vec<ColumnIndex>>::new();
for request_id in self.data_column_ids.as_slice() {
column_indexes_by_block
.entry(request_id.block_root)
.or_default()
.push(request_id.index);
}
column_indexes_by_block.into_iter().collect()
}
}

/* RPC Handling and Grouping */
Expand Down
Loading

0 comments on commit 6566705

Please sign in to comment.