From 74a2bf38ddb63f80ef126b3747b9b96cd989def1 Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Mon, 30 Oct 2023 11:58:03 +0000 Subject: [PATCH] feat: add `SnapshotCursor` wrapper and impl `HeaderProvider` (#5170) Co-authored-by: Alexey Shekhirin --- crates/primitives/src/snapshot/segment.rs | 15 +- crates/snapshot/src/segments/mod.rs | 2 +- crates/storage/db/src/snapshot.rs | 124 +++++++++++++++- crates/storage/nippy-jar/src/cursor.rs | 4 + .../provider/src/providers/snapshot/jar.rs | 137 +++++++++++------- .../provider/src/providers/snapshot/mod.rs | 23 +-- 6 files changed, 233 insertions(+), 72 deletions(-) diff --git a/crates/primitives/src/snapshot/segment.rs b/crates/primitives/src/snapshot/segment.rs index 79d98fe0e8ba..8a86768ede68 100644 --- a/crates/primitives/src/snapshot/segment.rs +++ b/crates/primitives/src/snapshot/segment.rs @@ -81,8 +81,12 @@ impl SnapshotSegment { /// A segment header that contains information common to all segments. Used for storage. #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Hash)] pub struct SegmentHeader { + /// Block range of the snapshot segment block_range: RangeInclusive, + /// Transaction range of the snapshot segment tx_range: RangeInclusive, + /// Segment type + segment: SnapshotSegment, } impl SegmentHeader { @@ -90,8 +94,9 @@ impl SegmentHeader { pub fn new( block_range: RangeInclusive, tx_range: RangeInclusive, + segment: SnapshotSegment, ) -> Self { - Self { block_range, tx_range } + Self { block_range, tx_range, segment } } /// Returns the first block number of the segment. @@ -103,4 +108,12 @@ impl SegmentHeader { pub fn tx_start(&self) -> TxNumber { *self.tx_range.start() } + + /// Returns the row offset which depends on whether the segment is block or transaction based. + pub fn start(&self) -> u64 { + match self.segment { + SnapshotSegment::Headers => self.block_start(), + SnapshotSegment::Transactions | SnapshotSegment::Receipts => self.tx_start(), + } + } } diff --git a/crates/snapshot/src/segments/mod.rs b/crates/snapshot/src/segments/mod.rs index 991677a781c6..9a8bb462789f 100644 --- a/crates/snapshot/src/segments/mod.rs +++ b/crates/snapshot/src/segments/mod.rs @@ -62,7 +62,7 @@ pub(crate) fn prepare_jar( let mut nippy_jar = NippyJar::new( COLUMNS, &segment.filename_with_configuration(filters, compression, &block_range), - SegmentHeader::new(block_range, tx_range), + SegmentHeader::new(block_range, tx_range, segment), ); nippy_jar = match compression { diff --git a/crates/storage/db/src/snapshot.rs b/crates/storage/db/src/snapshot.rs index 7cb27847f974..063662db855e 100644 --- a/crates/storage/db/src/snapshot.rs +++ b/crates/storage/db/src/snapshot.rs @@ -1,13 +1,15 @@ -//! reth's snapshot creation from database tables +//! reth's snapshot creation from database tables and access use crate::{ abstraction::cursor::DbCursorRO, - table::{Key, Table}, + table::{Decompress, Key, Table}, transaction::DbTx, RawKey, RawTable, }; -use reth_interfaces::RethResult; -use reth_nippy_jar::{ColumnResult, NippyJar, PHFKey}; +use derive_more::{Deref, DerefMut}; +use reth_interfaces::{RethError, RethResult}; +use reth_nippy_jar::{ColumnResult, MmapHandle, NippyJar, NippyJarCursor, PHFKey}; +use reth_primitives::{snapshot::SegmentHeader, B256}; use reth_tracing::tracing::*; use serde::{Deserialize, Serialize}; use std::{error::Error as StdError, ops::RangeInclusive}; @@ -102,3 +104,117 @@ macro_rules! generate_snapshot_func { } generate_snapshot_func!((T1), (T1, T2), (T1, T2, T3), (T1, T2, T3, T4), (T1, T2, T3, T4, T5),); + +/// Cursor of a snapshot segment. +#[derive(Debug, Deref, DerefMut)] +pub struct SnapshotCursor<'a>(NippyJarCursor<'a, SegmentHeader>); + +impl<'a> SnapshotCursor<'a> { + /// Returns a new [`SnapshotCursor`]. + pub fn new( + jar: &'a NippyJar, + mmap_handle: MmapHandle, + ) -> Result { + Ok(Self(NippyJarCursor::with_handle(jar, mmap_handle)?)) + } + + /// Gets a row of values. + pub fn get( + &mut self, + key_or_num: KeyOrNumber<'_>, + ) -> RethResult>> { + let row = match key_or_num { + KeyOrNumber::Hash(k) => self.row_by_key_with_cols::(k), + KeyOrNumber::Number(n) => { + let offset = self.jar().user_header().start(); + if offset > n { + return Ok(None) + } + self.row_by_number_with_cols::((n - offset) as usize) + } + }?; + + Ok(row) + } + + /// Gets one column value from a row. + pub fn get_one( + &mut self, + key_or_num: KeyOrNumber<'_>, + ) -> RethResult> { + let row = self.get::(key_or_num)?; + + match row { + Some(row) => Ok(Some(T::decompress(row[0])?)), + None => Ok(None), + } + } + + /// Gets two column values from a row. + pub fn get_two( + &mut self, + key_or_num: KeyOrNumber<'_>, + ) -> RethResult> { + let row = self.get::(key_or_num)?; + + match row { + Some(row) => Ok(Some((T::decompress(row[0])?, K::decompress(row[1])?))), + None => Ok(None), + } + } + + /// Gets three column values from a row. + pub fn get_three< + T: Decompress, + K: Decompress, + J: Decompress, + const SELECTOR: usize, + const COLUMNS: usize, + >( + &mut self, + key_or_num: KeyOrNumber<'_>, + ) -> RethResult> { + let row = self.get::(key_or_num)?; + + match row { + Some(row) => { + Ok(Some((T::decompress(row[0])?, K::decompress(row[1])?, J::decompress(row[2])?))) + } + None => Ok(None), + } + } +} + +/// Either a key _or_ a block number +#[derive(Debug)] +pub enum KeyOrNumber<'a> { + /// A slice used as a key. Usually a block hash + Hash(&'a [u8]), + /// A block number + Number(u64), +} + +impl<'a> From<&'a B256> for KeyOrNumber<'a> { + fn from(value: &'a B256) -> Self { + KeyOrNumber::Hash(value.as_slice()) + } +} + +impl<'a> From for KeyOrNumber<'a> { + fn from(value: u64) -> Self { + KeyOrNumber::Number(value) + } +} + +/// Snapshot segment total columns. +pub const HEADER_COLUMNS: usize = 3; +/// Selector for header. +pub const S_HEADER: usize = 0b001; +/// Selector for header td. +pub const S_HEADER_TD: usize = 0b010; +/// Selector for header hash. +pub const S_HEADER_HASH: usize = 0b100; +/// Selector for header td and header hash. +pub const S_HEADER_TD_WITH_HASH: usize = 0b110; +/// Selector for header and header hash. +pub const S_HEADER_WITH_HASH: usize = 0b101; diff --git a/crates/storage/nippy-jar/src/cursor.rs b/crates/storage/nippy-jar/src/cursor.rs index 010bb1df2a0f..996fae00a334 100644 --- a/crates/storage/nippy-jar/src/cursor.rs +++ b/crates/storage/nippy-jar/src/cursor.rs @@ -58,6 +58,10 @@ where }) } + pub fn jar(&self) -> &NippyJar { + self.jar + } + /// Resets cursor to the beginning. pub fn reset(&mut self) { self.row = 0; diff --git a/crates/storage/provider/src/providers/snapshot/jar.rs b/crates/storage/provider/src/providers/snapshot/jar.rs index 4dd8099cf6c9..f018144c5616 100644 --- a/crates/storage/provider/src/providers/snapshot/jar.rs +++ b/crates/storage/provider/src/providers/snapshot/jar.rs @@ -1,17 +1,19 @@ use super::LoadedJarRef; use crate::{BlockHashReader, BlockNumReader, HeaderProvider, TransactionsProvider}; use reth_db::{ + snapshot::{ + SnapshotCursor, HEADER_COLUMNS, S_HEADER, S_HEADER_HASH, S_HEADER_TD, + S_HEADER_TD_WITH_HASH, S_HEADER_WITH_HASH, + }, table::{Decompress, Table}, - HeaderTD, + CanonicalHeaders, HeaderTD, }; use reth_interfaces::{provider::ProviderError, RethResult}; -use reth_nippy_jar::NippyJarCursor; use reth_primitives::{ - snapshot::SegmentHeader, Address, BlockHash, BlockHashOrNumber, BlockNumber, ChainInfo, Header, - SealedHeader, TransactionMeta, TransactionSigned, TransactionSignedNoHash, TxHash, TxNumber, - B256, U256, + Address, BlockHash, BlockHashOrNumber, BlockNumber, ChainInfo, Header, SealedHeader, + TransactionMeta, TransactionSigned, TransactionSignedNoHash, TxHash, TxNumber, B256, U256, }; -use std::ops::{Deref, RangeBounds}; +use std::ops::{Deref, Range, RangeBounds}; /// Provider over a specific `NippyJar` and range. #[derive(Debug)] @@ -32,84 +34,93 @@ impl<'a> From> for SnapshotJarProvider<'a> { impl<'a> SnapshotJarProvider<'a> { /// Provides a cursor for more granular data access. - pub fn cursor<'b>(&'b self) -> RethResult> + pub fn cursor<'b>(&'b self) -> RethResult> where 'b: 'a, { - Ok(NippyJarCursor::with_handle(self.value(), self.mmap_handle())?) + SnapshotCursor::new(self.value(), self.mmap_handle()) } } impl<'a> HeaderProvider for SnapshotJarProvider<'a> { fn header(&self, block_hash: &BlockHash) -> RethResult> { - // WIP - let mut cursor = self.cursor()?; - - let header = Header::decompress( - cursor.row_by_key_with_cols::<0b01, 2>(&block_hash.0).unwrap().unwrap()[0], - ) - .unwrap(); - - if &header.hash_slow() == block_hash { - return Ok(Some(header)) - } else { - // check next snapshot - } - Ok(None) + Ok(self + .cursor()? + .get_two::::Value, S_HEADER_WITH_HASH, HEADER_COLUMNS>(block_hash.into())? + .filter(|(_, hash)| hash == block_hash) + .map(|(header, _)| header)) } fn header_by_number(&self, num: BlockNumber) -> RethResult> { - Header::decompress( - self.cursor()? - .row_by_number_with_cols::<0b01, 2>( - (num - self.user_header().block_start()) as usize, - )? - .ok_or(ProviderError::HeaderNotFound(num.into()))?[0], - ) - .map(Some) - .map_err(Into::into) + self.cursor()?.get_one::(num.into()) } fn header_td(&self, block_hash: &BlockHash) -> RethResult> { - // WIP - let mut cursor = NippyJarCursor::with_handle(self.value(), self.mmap_handle())?; + Ok(self + .cursor()? + .get_two::<::Value, ::Value, S_HEADER_TD_WITH_HASH, HEADER_COLUMNS>( + block_hash.into(), + )? + .filter(|(_, hash)| hash == block_hash) + .map(|(td, _)| td.into())) + } - let row = cursor.row_by_key_with_cols::<0b11, 2>(&block_hash.0).unwrap().unwrap(); + fn header_td_by_number(&self, num: BlockNumber) -> RethResult> { + Ok(self + .cursor()? + .get_one::<::Value, S_HEADER_TD, HEADER_COLUMNS>(num.into())? + .map(Into::into)) + } - let header = Header::decompress(row[0]).unwrap(); - let td = ::Value::decompress(row[1]).unwrap(); + fn headers_range(&self, range: impl RangeBounds) -> RethResult> { + let range = to_range(range); - if &header.hash_slow() == block_hash { - return Ok(Some(td.0)) - } else { - // check next snapshot - } - Ok(None) - } + let mut cursor = self.cursor()?; + let mut headers = Vec::with_capacity((range.end - range.start) as usize); - fn header_td_by_number(&self, _number: BlockNumber) -> RethResult> { - unimplemented!(); - } + for num in range.start..range.end { + match cursor.get_one::(num.into())? { + Some(header) => headers.push(header), + None => return Ok(headers), + } + } - fn headers_range(&self, _range: impl RangeBounds) -> RethResult> { - unimplemented!(); + Ok(headers) } fn sealed_headers_range( &self, - _range: impl RangeBounds, + range: impl RangeBounds, ) -> RethResult> { - unimplemented!(); + let range = to_range(range); + + let mut cursor = self.cursor()?; + let mut headers = Vec::with_capacity((range.end - range.start) as usize); + + for number in range.start..range.end { + match cursor + .get_two::::Value, S_HEADER_WITH_HASH, HEADER_COLUMNS>(number.into())? + { + Some((header, hash)) => headers.push(header.seal(hash)), + None => return Ok(headers), + } + } + Ok(headers) } - fn sealed_header(&self, _number: BlockNumber) -> RethResult> { - unimplemented!(); + fn sealed_header(&self, number: BlockNumber) -> RethResult> { + Ok(self + .cursor()? + .get_two::::Value, S_HEADER_WITH_HASH, HEADER_COLUMNS>(number.into())? + .map(|(header, hash)| header.seal(hash))) } } impl<'a> BlockHashReader for SnapshotJarProvider<'a> { - fn block_hash(&self, _number: u64) -> RethResult> { - todo!() + fn block_hash(&self, number: u64) -> RethResult> { + self.cursor()?.get_one::<::Value, S_HEADER_HASH, HEADER_COLUMNS>( + number.into(), + ) } fn canonical_hashes_range( @@ -148,7 +159,7 @@ impl<'a> TransactionsProvider for SnapshotJarProvider<'a> { TransactionSignedNoHash::decompress( self.cursor()? .row_by_number_with_cols::<0b1, 1>((num - self.user_header().tx_start()) as usize)? - .ok_or(ProviderError::TransactionNotFound(num.into()))?[0], + .ok_or_else(|| ProviderError::TransactionNotFound(num.into()))?[0], ) .map(Into::into) .map(Some) @@ -220,3 +231,19 @@ impl<'a> TransactionsProvider for SnapshotJarProvider<'a> { todo!() } } + +fn to_range>(bounds: R) -> Range { + let start = match bounds.start_bound() { + std::ops::Bound::Included(&v) => v, + std::ops::Bound::Excluded(&v) => v + 1, + std::ops::Bound::Unbounded => 0, + }; + + let end = match bounds.end_bound() { + std::ops::Bound::Included(&v) => v + 1, + std::ops::Bound::Excluded(&v) => v, + std::ops::Bound::Unbounded => u64::MAX, + }; + + start..end +} diff --git a/crates/storage/provider/src/providers/snapshot/mod.rs b/crates/storage/provider/src/providers/snapshot/mod.rs index 2f0dad3c5821..f7fed480f4b9 100644 --- a/crates/storage/provider/src/providers/snapshot/mod.rs +++ b/crates/storage/provider/src/providers/snapshot/mod.rs @@ -46,7 +46,7 @@ mod test { use reth_db::{ cursor::DbCursorRO, database::Database, - snapshot::create_snapshot_T1_T2, + snapshot::create_snapshot_T1_T2_T3, test_utils::create_test_rw_db, transaction::{DbTx, DbTxMut}, CanonicalHeaders, DatabaseError, HeaderNumbers, HeaderTD, Headers, RawTable, @@ -60,7 +60,8 @@ mod test { // Ranges let row_count = 100u64; let range = 0..=(row_count - 1); - let segment_header = SegmentHeader::new(range.clone(), range.clone()); + let segment_header = + SegmentHeader::new(range.clone(), range.clone(), SnapshotSegment::Headers); // Data sources let db = create_test_rw_db(); @@ -95,7 +96,7 @@ mod test { let with_compression = true; let with_filter = true; - let mut nippy_jar = NippyJar::new(2, snap_file.path(), segment_header); + let mut nippy_jar = NippyJar::new(3, snap_file.path(), segment_header); if with_compression { nippy_jar = nippy_jar.with_zstd(false, 0); @@ -118,14 +119,14 @@ mod test { .unwrap() .map(|row| row.map(|(_key, value)| value.into_value()).map_err(|e| e.into())); - create_snapshot_T1_T2::( - &tx, - range, - None, - none_vec, - Some(hashes), - row_count as usize, - &mut nippy_jar, + create_snapshot_T1_T2_T3::< + Headers, + HeaderTD, + CanonicalHeaders, + BlockNumber, + SegmentHeader, + >( + &tx, range, None, none_vec, Some(hashes), row_count as usize, &mut nippy_jar ) .unwrap(); }