From b40dceaae9a9ba3d7ba6504f56ad42f954868a0a Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Wed, 22 Mar 2023 01:56:32 +1100 Subject: [PATCH 1/2] Update get blobs endpoint to return a list of BlobSidecars (#4109) * Update get blobs endpoint to return BlobSidecarList * Update code comment * Update blob retrieval to return BlobSidecarList without Arc * Remove usage of BlobSidecarList type alias to avoid code conflicts * Add clippy allow exception --- beacon_node/beacon_chain/src/beacon_chain.rs | 17 +++++ beacon_node/http_api/src/block_id.rs | 21 +++--- beacon_node/http_api/src/lib.rs | 76 ++++++++++---------- common/eth2/src/lib.rs | 18 +++-- 4 files changed, 77 insertions(+), 55 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index dc7541b6385..016bda13ab3 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1057,6 +1057,23 @@ impl BeaconChain { .map(Some) } + // FIXME(jimmy): temporary method added to unblock API work. This method will be replaced by + // the `get_blobs` method below once the new blob sidecar structure (`BlobSidecarList`) is + // implemented in that method. + #[allow(clippy::type_complexity)] // FIXME: this will be fixed by the `BlobSidecarList` alias in Sean's PR + pub fn get_blob_sidecar_list( + &self, + _block_root: &Hash256, + _data_availability_boundary: Epoch, + ) -> Result< + Option< + VariableList>, ::MaxBlobsPerBlock>, + >, + Error, + > { + unimplemented!("update to use the updated `get_blobs` method instead once this PR is merged: https://github.com/sigp/lighthouse/pull/4104") + } + /// Returns the blobs at the given root, if any. /// /// Returns `Ok(None)` if the blobs and associated block are not found. diff --git a/beacon_node/http_api/src/block_id.rs b/beacon_node/http_api/src/block_id.rs index b484f4079aa..9183437f990 100644 --- a/beacon_node/http_api/src/block_id.rs +++ b/beacon_node/http_api/src/block_id.rs @@ -1,10 +1,10 @@ use crate::{state_id::checkpoint_slot_and_execution_optimistic, ExecutionOptimistic}; use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes, WhenSlotSkipped}; -use eth2::types::BlockId as CoreBlockId; +use eth2::types::{BlockId as CoreBlockId, VariableList}; use std::fmt; use std::str::FromStr; use std::sync::Arc; -use types::{BlobsSidecar, Hash256, SignedBeaconBlock, SignedBlindedBeaconBlock, Slot}; +use types::{BlobSidecar, EthSpec, Hash256, SignedBeaconBlock, SignedBlindedBeaconBlock, Slot}; /// Wraps `eth2::types::BlockId` and provides a simple way to obtain a block or root for a given /// `BlockId`. @@ -212,19 +212,22 @@ impl BlockId { } } - /// Return the `BlobsSidecar` identified by `self`. - pub async fn blobs_sidecar( + /// Return the `BlobSidecarList` identified by `self`. + pub async fn blob_sidecar_list( &self, chain: &BeaconChain, - ) -> Result>, warp::Rejection> { + ) -> Result< + VariableList>, ::MaxBlobsPerBlock>, + warp::Rejection, + > { let root = self.root(chain)?.0; let Some(data_availability_boundary) = chain.data_availability_boundary() else { - return Err(warp_utils::reject::custom_not_found("Eip4844 fork disabled".into())); + return Err(warp_utils::reject::custom_not_found("Deneb fork disabled".into())); }; - match chain.get_blobs(&root, data_availability_boundary) { - Ok(Some(blob)) => Ok(Arc::new(blob)), + match chain.get_blob_sidecar_list(&root, data_availability_boundary) { + Ok(Some(blobs)) => Ok(blobs), Ok(None) => Err(warp_utils::reject::custom_not_found(format!( - "Blob with block root {} is not in the store", + "No blobs with block root {} found in the store", root ))), Err(e) => Err(warp_utils::reject::beacon_chain_error(e)), diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 6b0518a23c0..797e8f72b45 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -1293,6 +1293,45 @@ pub fn serve( }, ); + /* + * beacon/blobs + */ + + // GET beacon/blobs/{block_id} + let get_blobs = eth_v1 + .and(warp::path("beacon")) + .and(warp::path("blobs")) + .and(block_id_or_err) + .and(warp::path::end()) + .and(chain_filter.clone()) + .and(warp::header::optional::("accept")) + .and_then( + |block_id: BlockId, + chain: Arc>, + accept_header: Option| { + async move { + let blob_sidecar_list = block_id.blob_sidecar_list(&chain).await?; + + match accept_header { + Some(api_types::Accept::Ssz) => Response::builder() + .status(200) + .header("Content-Type", "application/octet-stream") + .body(blob_sidecar_list.as_ssz_bytes().into()) + .map_err(|e| { + warp_utils::reject::custom_server_error(format!( + "failed to create response: {}", + e + )) + }), + _ => Ok(warp::reply::json(&api_types::GenericResponse::from( + blob_sidecar_list, + )) + .into_response()), + } + } + }, + ); + /* * beacon/pool */ @@ -3498,41 +3537,6 @@ pub fn serve( ) }); - // GET lighthouse/beacon/blobs_sidecars/{block_id} - let get_lighthouse_blobs_sidecars = warp::path("lighthouse") - .and(warp::path("beacon")) - .and(warp::path("blobs_sidecars")) - .and(block_id_or_err) - .and(warp::path::end()) - .and(chain_filter.clone()) - .and(warp::header::optional::("accept")) - .and_then( - |block_id: BlockId, - chain: Arc>, - accept_header: Option| { - async move { - let blobs_sidecar = block_id.blobs_sidecar(&chain).await?; - - match accept_header { - Some(api_types::Accept::Ssz) => Response::builder() - .status(200) - .header("Content-Type", "application/octet-stream") - .body(blobs_sidecar.as_ssz_bytes().into()) - .map_err(|e| { - warp_utils::reject::custom_server_error(format!( - "failed to create response: {}", - e - )) - }), - _ => Ok(warp::reply::json(&api_types::GenericResponse::from( - blobs_sidecar, - )) - .into_response()), - } - } - }, - ); - let get_events = eth_v1 .and(warp::path("events")) .and(warp::path::end()) @@ -3627,6 +3631,7 @@ pub fn serve( .uor(get_beacon_block_attestations) .uor(get_beacon_blinded_block) .uor(get_beacon_block_root) + .uor(get_blobs) .uor(get_beacon_pool_attestations) .uor(get_beacon_pool_attester_slashings) .uor(get_beacon_pool_proposer_slashings) @@ -3672,7 +3677,6 @@ pub fn serve( .uor(get_lighthouse_attestation_performance) .uor(get_lighthouse_block_packing_efficiency) .uor(get_lighthouse_merge_readiness) - .uor(get_lighthouse_blobs_sidecars.boxed()) .uor(get_events) .recover(warp_utils::reject::handle_rejection), ) diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index 2a27d31da9b..a57c2ca3d71 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -658,15 +658,13 @@ impl BeaconNodeHttpClient { Ok(path) } - /// Path for `lighthouse/beacon/blobs_sidecars/{block_id}` - pub fn get_blobs_sidecar_path(&self, block_id: BlockId) -> Result { - let mut path = self.server.full.clone(); - + /// Path for `v1/beacon/blobs/{block_id}` + pub fn get_blobs_path(&self, block_id: BlockId) -> Result { + let mut path = self.eth_path(V1)?; path.path_segments_mut() .map_err(|()| Error::InvalidUrl(self.server.clone()))? - .push("lighthouse") .push("beacon") - .push("blobs_sidecars") + .push("blobs") .push(&block_id.to_string()); Ok(path) } @@ -698,14 +696,14 @@ impl BeaconNodeHttpClient { Ok(Some(response.json().await?)) } - /// `GET lighthouse/beacon/blobs_sidecars/{block_id}` + /// `GET v1/beacon/blobs/{block_id}` /// /// Returns `Ok(None)` on a 404 error. - pub async fn get_blobs_sidecar( + pub async fn get_blobs( &self, block_id: BlockId, - ) -> Result>>, Error> { - let path = self.get_blobs_sidecar_path(block_id)?; + ) -> Result>>, Error> { + let path = self.get_blobs_path(block_id)?; let response = match self.get_response(path, |b| b).await.optional()? { Some(res) => res, None => return Ok(None), From d1e653cfdbac04eddb69396f2a52322140ab5e04 Mon Sep 17 00:00:00 2001 From: ethDreamer <37123614+ethDreamer@users.noreply.github.com> Date: Tue, 21 Mar 2023 14:33:06 -0500 Subject: [PATCH 2/2] Update Blob Storage Structure (#4104) * Initial Changes to Blob Storage * Add Arc to SignedBlobSidecar Definition --- beacon_node/beacon_chain/src/beacon_chain.rs | 65 ++++----------- .../beacon_chain/src/early_attester_cache.rs | 4 +- beacon_node/http_api/src/block_id.rs | 13 ++- .../beacon_processor/worker/rpc_methods.rs | 80 +++++++------------ beacon_node/store/src/hot_cold_store.rs | 20 ++--- beacon_node/store/src/lib.rs | 3 +- consensus/tree_hash/src/impls.rs | 19 +++++ consensus/types/src/blob_sidecar.rs | 3 +- consensus/types/src/signed_blob.rs | 3 +- 9 files changed, 86 insertions(+), 124 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 016bda13ab3..00be8a25c97 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -959,35 +959,20 @@ impl BeaconChain { Ok(self.get_block(block_root).await?.map(Arc::new)) } - pub async fn get_block_and_blobs_checking_early_attester_cache( + pub async fn get_blobs_checking_early_attester_cache( &self, block_root: &Hash256, - ) -> Result>, Error> { + ) -> Result>, Error> { // If there is no data availability boundary, the Eip4844 fork is disabled. if let Some(finalized_data_availability_boundary) = self.finalized_data_availability_boundary() { - // Only use the attester cache if we can find both the block and blob - if let (Some(block), Some(blobs)) = ( - self.early_attester_cache.get_block(*block_root), - self.early_attester_cache.get_blobs(*block_root), - ) { - Ok(Some(SignedBeaconBlockAndBlobsSidecar { - beacon_block: block, - blobs_sidecar: blobs, - })) - // Attempt to get the block and blobs from the database - } else if let Some(block) = self.get_block(block_root).await?.map(Arc::new) { - let blobs = self - .get_blobs(block_root, finalized_data_availability_boundary)? - .map(Arc::new); - Ok(blobs.map(|blobs| SignedBeaconBlockAndBlobsSidecar { - beacon_block: block, - blobs_sidecar: blobs, - })) - } else { - Ok(None) - } + self.early_attester_cache + .get_blobs(*block_root) + .map_or_else( + || self.get_blobs(block_root, finalized_data_availability_boundary), + |blobs| Ok(Some(blobs)), + ) } else { Ok(None) } @@ -1057,23 +1042,6 @@ impl BeaconChain { .map(Some) } - // FIXME(jimmy): temporary method added to unblock API work. This method will be replaced by - // the `get_blobs` method below once the new blob sidecar structure (`BlobSidecarList`) is - // implemented in that method. - #[allow(clippy::type_complexity)] // FIXME: this will be fixed by the `BlobSidecarList` alias in Sean's PR - pub fn get_blob_sidecar_list( - &self, - _block_root: &Hash256, - _data_availability_boundary: Epoch, - ) -> Result< - Option< - VariableList>, ::MaxBlobsPerBlock>, - >, - Error, - > { - unimplemented!("update to use the updated `get_blobs` method instead once this PR is merged: https://github.com/sigp/lighthouse/pull/4104") - } - /// Returns the blobs at the given root, if any. /// /// Returns `Ok(None)` if the blobs and associated block are not found. @@ -1091,9 +1059,9 @@ impl BeaconChain { &self, block_root: &Hash256, data_availability_boundary: Epoch, - ) -> Result>, Error> { + ) -> Result>, Error> { match self.store.get_blobs(block_root)? { - Some(blobs) => Ok(Some(blobs)), + Some(blob_sidecar_list) => Ok(Some(blob_sidecar_list)), None => { // Check for the corresponding block to understand whether we *should* have blobs. self.get_blinded_block(block_root)? @@ -1106,7 +1074,8 @@ impl BeaconChain { Err(_) => return Err(Error::NoKzgCommitmentsFieldOnBlock), }; if expected_kzg_commitments.is_empty() { - Ok(BlobsSidecar::empty_from_parts(*block_root, block.slot())) + // TODO (mark): verify this + Ok(BlobSidecarList::empty()) } else if data_availability_boundary <= block.epoch() { // We should have blobs for all blocks younger than the boundary. Err(Error::BlobsUnavailable) @@ -3052,7 +3021,7 @@ impl BeaconChain { // margin, or younger (of higher epoch number). if block_epoch >= import_boundary { if let Some(blobs) = blobs { - if !blobs.blobs.is_empty() { + if !blobs.is_empty() { //FIXME(sean) using this for debugging for now info!( self.log, "Writing blobs to store"; @@ -4814,7 +4783,7 @@ impl BeaconChain { ) .map_err(BlockProductionError::KzgError)?; - let blob_sidecars = VariableList::from( + let blob_sidecars = BlobSidecarList::from( blobs .into_iter() .enumerate() @@ -4827,7 +4796,7 @@ impl BeaconChain { .get(blob_index) .expect("KZG proof should exist for blob"); - Ok(BlobSidecar { + Ok(Arc::new(BlobSidecar { block_root: beacon_block_root, index: blob_index as u64, slot, @@ -4836,9 +4805,9 @@ impl BeaconChain { blob, kzg_commitment: *kzg_commitment, kzg_proof: *kzg_proof, - }) + })) }) - .collect::>, BlockProductionError>>()?, + .collect::, BlockProductionError>>()?, ); self.blob_cache.put(beacon_block_root, blob_sidecars); diff --git a/beacon_node/beacon_chain/src/early_attester_cache.rs b/beacon_node/beacon_chain/src/early_attester_cache.rs index dd4109da9b4..5fe14c7e252 100644 --- a/beacon_node/beacon_chain/src/early_attester_cache.rs +++ b/beacon_node/beacon_chain/src/early_attester_cache.rs @@ -21,7 +21,7 @@ pub struct CacheItem { * Values used to make the block available. */ block: Arc>, - blobs: Option>>, + blobs: Option>, proto_block: ProtoBlock, } @@ -160,7 +160,7 @@ impl EarlyAttesterCache { } /// Returns the blobs, if `block_root` matches the cached item. - pub fn get_blobs(&self, block_root: Hash256) -> Option>> { + pub fn get_blobs(&self, block_root: Hash256) -> Option> { self.item .read() .as_ref() diff --git a/beacon_node/http_api/src/block_id.rs b/beacon_node/http_api/src/block_id.rs index 9183437f990..ef7affeb7f4 100644 --- a/beacon_node/http_api/src/block_id.rs +++ b/beacon_node/http_api/src/block_id.rs @@ -1,10 +1,10 @@ use crate::{state_id::checkpoint_slot_and_execution_optimistic, ExecutionOptimistic}; use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes, WhenSlotSkipped}; -use eth2::types::{BlockId as CoreBlockId, VariableList}; +use eth2::types::BlockId as CoreBlockId; use std::fmt; use std::str::FromStr; use std::sync::Arc; -use types::{BlobSidecar, EthSpec, Hash256, SignedBeaconBlock, SignedBlindedBeaconBlock, Slot}; +use types::{BlobSidecarList, Hash256, SignedBeaconBlock, SignedBlindedBeaconBlock, Slot}; /// Wraps `eth2::types::BlockId` and provides a simple way to obtain a block or root for a given /// `BlockId`. @@ -216,16 +216,13 @@ impl BlockId { pub async fn blob_sidecar_list( &self, chain: &BeaconChain, - ) -> Result< - VariableList>, ::MaxBlobsPerBlock>, - warp::Rejection, - > { + ) -> Result, warp::Rejection> { let root = self.root(chain)?.0; let Some(data_availability_boundary) = chain.data_availability_boundary() else { return Err(warp_utils::reject::custom_not_found("Deneb fork disabled".into())); }; - match chain.get_blob_sidecar_list(&root, data_availability_boundary) { - Ok(Some(blobs)) => Ok(blobs), + match chain.get_blobs(&root, data_availability_boundary) { + Ok(Some(blob_sidecar_list)) => Ok(blob_sidecar_list), Ok(None) => Err(warp_utils::reject::custom_not_found(format!( "No blobs with block root {} found in the store", root diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index 78b9de303fc..565b1ce8867 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -12,6 +12,7 @@ use lighthouse_network::rpc::*; use lighthouse_network::{PeerId, PeerRequestId, ReportSource, Response, SyncInfo}; use slog::{debug, error, warn}; use slot_clock::SlotClock; +use std::collections::{hash_map::Entry, HashMap}; use std::sync::Arc; use task_executor::TaskExecutor; use types::blob_sidecar::BlobIdentifier; @@ -225,42 +226,34 @@ impl Worker { executor.spawn( async move { let requested_blobs = request.blob_ids.len(); - let mut send_block_count = 0; + let mut send_blob_count = 0; let mut send_response = true; + + let mut blob_list_results = HashMap::new(); for BlobIdentifier{ block_root: root, index } in request.blob_ids.into_iter() { - match self - .chain - .get_block_and_blobs_checking_early_attester_cache(&root) - .await - { - Ok(Some(block_and_blobs)) => { - // - // TODO: HORRIBLE NSFW CODE AHEAD - // - let types::SignedBeaconBlockAndBlobsSidecar {beacon_block, blobs_sidecar} = block_and_blobs; - let types::BlobsSidecar{ beacon_block_root, beacon_block_slot, blobs: blob_bundle, kzg_aggregated_proof }: types::BlobsSidecar<_> = blobs_sidecar.as_ref().clone(); - // TODO: this should be unreachable after this is addressed seriously, - // so for now let's be ok with a panic in the expect. - let block = beacon_block.message_eip4844().expect("We fucked up the block blob stuff"); - // Intentionally not accessing the list directly - for (known_index, blob) in blob_bundle.into_iter().enumerate() { - if (known_index as u64) == index { - let blob_sidecar = types::BlobSidecar{ - block_root: beacon_block_root, - index, - slot: beacon_block_slot, - block_parent_root: block.parent_root, - proposer_index: block.proposer_index, - blob, - kzg_commitment: block.body.blob_kzg_commitments[known_index], // TODO: needs to be stored in a more logical way so that this won't panic. - kzg_proof: kzg_aggregated_proof // TODO: yeah - }; + let blob_list_result = match blob_list_results.entry(root) { + Entry::Vacant(entry) => { + entry.insert(self + .chain + .get_blobs_checking_early_attester_cache(&root) + .await) + } + Entry::Occupied(entry) => { + entry.into_mut() + } + }; + + match blob_list_result.as_ref() { + Ok(Some(blobs_sidecar_list)) => { + for blob_sidecar in blobs_sidecar_list.iter() { + if blob_sidecar.index == index { self.send_response( peer_id, - Response::BlobsByRoot(Some(Arc::new(blob_sidecar))), + Response::BlobsByRoot(Some(blob_sidecar.clone())), request_id, ); - send_block_count += 1; + send_blob_count += 1; + break; } } } @@ -355,7 +348,7 @@ impl Worker { "Received BlobsByRoot Request"; "peer" => %peer_id, "requested" => requested_blobs, - "returned" => send_block_count + "returned" => send_blob_count ); // send stream termination @@ -837,31 +830,12 @@ impl Worker { for root in block_roots { match self.chain.get_blobs(&root, data_availability_boundary) { - Ok(Some(blobs)) => { - // TODO: more GROSS code ahead. Reader beware - let types::BlobsSidecar { - beacon_block_root, - beacon_block_slot, - blobs: blob_bundle, - kzg_aggregated_proof: _, - }: types::BlobsSidecar<_> = blobs; - - for (blob_index, blob) in blob_bundle.into_iter().enumerate() { - let blob_sidecar = types::BlobSidecar { - block_root: beacon_block_root, - index: blob_index as u64, - slot: beacon_block_slot, - block_parent_root: Hash256::zero(), - proposer_index: 0, - blob, - kzg_commitment: types::KzgCommitment::default(), - kzg_proof: types::KzgProof::default(), - }; - + Ok(Some(blob_sidecar_list)) => { + for blob_sidecar in blob_sidecar_list.iter() { blobs_sent += 1; self.send_network_message(NetworkMessage::SendResponse { peer_id, - response: Response::BlobsByRange(Some(Arc::new(blob_sidecar))), + response: Response::BlobsByRange(Some(blob_sidecar.clone())), id: request_id, }); } diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 60e2f775959..2c80c2c1ac7 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -66,7 +66,7 @@ pub struct HotColdDB, Cold: ItemStore> { /// The hot database also contains all blocks. pub hot_db: Hot, /// LRU cache of deserialized blobs. Updated whenever a blob is loaded. - blob_cache: Mutex>>, + blob_cache: Mutex>>, /// LRU cache of deserialized blocks. Updated whenever a block is loaded. block_cache: Mutex>>, /// Chain spec. @@ -568,7 +568,7 @@ impl, Cold: ItemStore> HotColdDB blobs_db.key_delete(DBColumn::BeaconBlob.into(), block_root.as_bytes()) } - pub fn put_blobs(&self, block_root: &Hash256, blobs: BlobsSidecar) -> Result<(), Error> { + pub fn put_blobs(&self, block_root: &Hash256, blobs: BlobSidecarList) -> Result<(), Error> { let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db); blobs_db.put_bytes( DBColumn::BeaconBlob.into(), @@ -582,7 +582,7 @@ impl, Cold: ItemStore> HotColdDB pub fn blobs_as_kv_store_ops( &self, key: &Hash256, - blobs: &BlobsSidecar, + blobs: BlobSidecarList, ops: &mut Vec, ) { let db_key = get_key_for_col(DBColumn::BeaconBlob.into(), key.as_bytes()); @@ -817,7 +817,7 @@ impl, Cold: ItemStore> HotColdDB } StoreOp::PutBlobs(block_root, blobs) => { - self.blobs_as_kv_store_ops(&block_root, &blobs, &mut key_value_batch); + self.blobs_as_kv_store_ops(&block_root, blobs, &mut key_value_batch); } StoreOp::PutStateSummary(state_root, summary) => { @@ -885,8 +885,8 @@ impl, Cold: ItemStore> HotColdDB StoreOp::PutBlobs(_, _) => true, StoreOp::DeleteBlobs(block_root) => { match self.get_blobs(block_root) { - Ok(Some(blobs_sidecar)) => { - blobs_to_delete.push(blobs_sidecar); + Ok(Some(blobs_sidecar_list)) => { + blobs_to_delete.push((*block_root, blobs_sidecar_list)); } Err(e) => { error!( @@ -926,7 +926,7 @@ impl, Cold: ItemStore> HotColdDB let reverse_op = match op { StoreOp::PutBlobs(block_root, _) => StoreOp::DeleteBlobs(*block_root), StoreOp::DeleteBlobs(_) => match blobs_to_delete.pop() { - Some(blobs) => StoreOp::PutBlobs(blobs.beacon_block_root, Arc::new(blobs)), + Some((block_root, blobs)) => StoreOp::PutBlobs(block_root, blobs), None => return Err(HotColdDBError::Rollback.into()), }, _ => return Err(HotColdDBError::Rollback.into()), @@ -972,7 +972,7 @@ impl, Cold: ItemStore> HotColdDB for op in blob_cache_ops { match op { StoreOp::PutBlobs(block_root, blobs) => { - guard_blob.put(block_root, (*blobs).clone()); + guard_blob.put(block_root, blobs); } StoreOp::DeleteBlobs(block_root) => { @@ -1320,12 +1320,12 @@ impl, Cold: ItemStore> HotColdDB } /// Fetch a blobs sidecar from the store. - pub fn get_blobs(&self, block_root: &Hash256) -> Result>, Error> { + pub fn get_blobs(&self, block_root: &Hash256) -> Result>, Error> { let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db); match blobs_db.get_bytes(DBColumn::BeaconBlob.into(), block_root.as_bytes())? { Some(ref blobs_bytes) => { - let blobs = BlobsSidecar::from_ssz_bytes(blobs_bytes)?; + let blobs = BlobSidecarList::from_ssz_bytes(blobs_bytes)?; // FIXME(sean) I was attempting to use a blob cache here but was getting deadlocks, // may want to attempt to use one again self.blob_cache.lock().put(*block_root, blobs.clone()); diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 3056c292923..29fded5fa6d 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -159,7 +159,8 @@ pub trait ItemStore: KeyValueStore + Sync + Send + Sized + 'stati pub enum StoreOp<'a, E: EthSpec> { PutBlock(Hash256, Arc>), PutState(Hash256, &'a BeaconState), - PutBlobs(Hash256, Arc>), + // TODO (mark): space can be optimized here by de-duplicating data + PutBlobs(Hash256, BlobSidecarList), PutOrphanedBlobsKey(Hash256), PutStateSummary(Hash256, HotStateSummary), PutStateTemporaryFlag(Hash256), diff --git a/consensus/tree_hash/src/impls.rs b/consensus/tree_hash/src/impls.rs index 899356f8331..134be402194 100644 --- a/consensus/tree_hash/src/impls.rs +++ b/consensus/tree_hash/src/impls.rs @@ -1,5 +1,6 @@ use super::*; use ethereum_types::{H160, H256, U128, U256}; +use std::sync::Arc; fn int_to_hash256(int: u64) -> Hash256 { let mut bytes = [0; HASHSIZE]; @@ -186,6 +187,24 @@ impl TreeHash for H256 { } } +impl TreeHash for Arc { + fn tree_hash_type() -> TreeHashType { + T::tree_hash_type() + } + + fn tree_hash_packed_encoding(&self) -> PackedEncoding { + self.as_ref().tree_hash_packed_encoding() + } + + fn tree_hash_packing_factor() -> usize { + T::tree_hash_packing_factor() + } + + fn tree_hash_root(&self) -> Hash256 { + self.as_ref().tree_hash_root() + } +} + #[cfg(test)] mod test { use super::*; diff --git a/consensus/types/src/blob_sidecar.rs b/consensus/types/src/blob_sidecar.rs index 169c570d291..29eaadc5842 100644 --- a/consensus/types/src/blob_sidecar.rs +++ b/consensus/types/src/blob_sidecar.rs @@ -6,6 +6,7 @@ use serde_derive::{Deserialize, Serialize}; use ssz::Encode; use ssz_derive::{Decode, Encode}; use ssz_types::VariableList; +use std::sync::Arc; use test_random_derive::TestRandom; use tree_hash_derive::TreeHash; @@ -47,7 +48,7 @@ pub struct BlobSidecar { pub kzg_proof: KzgProof, } -pub type BlobSidecarList = VariableList, ::MaxBlobsPerBlock>; +pub type BlobSidecarList = VariableList>, ::MaxBlobsPerBlock>; impl SignedRoot for BlobSidecar {} diff --git a/consensus/types/src/signed_blob.rs b/consensus/types/src/signed_blob.rs index f9ae4812478..4eb28794ed5 100644 --- a/consensus/types/src/signed_blob.rs +++ b/consensus/types/src/signed_blob.rs @@ -3,6 +3,7 @@ use derivative::Derivative; use serde_derive::{Deserialize, Serialize}; use ssz_derive::{Decode, Encode}; use ssz_types::VariableList; +use std::sync::Arc; use test_random_derive::TestRandom; use tree_hash_derive::TreeHash; @@ -23,7 +24,7 @@ use tree_hash_derive::TreeHash; #[arbitrary(bound = "T: EthSpec")] #[derivative(Hash(bound = "T: EthSpec"))] pub struct SignedBlobSidecar { - pub message: BlobSidecar, + pub message: Arc>, pub signature: Signature, }