Skip to content

Commit

Permalink
feat: add SnapshotCursor wrapper and impl HeaderProvider (#5170)
Browse files Browse the repository at this point in the history
Co-authored-by: Alexey Shekhirin <a.shekhirin@gmail.com>
  • Loading branch information
joshieDo and shekhirin authored Oct 30, 2023
1 parent d51bc45 commit 74a2bf3
Show file tree
Hide file tree
Showing 6 changed files with 233 additions and 72 deletions.
15 changes: 14 additions & 1 deletion crates/primitives/src/snapshot/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,17 +81,22 @@ impl SnapshotSegment {
/// A segment header that contains information common to all segments. Used for storage.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Hash)]
pub struct SegmentHeader {
/// Block range of the snapshot segment
block_range: RangeInclusive<BlockNumber>,
/// Transaction range of the snapshot segment
tx_range: RangeInclusive<TxNumber>,
/// Segment type
segment: SnapshotSegment,
}

impl SegmentHeader {
/// Returns [`SegmentHeader`].
pub fn new(
block_range: RangeInclusive<BlockNumber>,
tx_range: RangeInclusive<TxNumber>,
segment: SnapshotSegment,
) -> Self {
Self { block_range, tx_range }
Self { block_range, tx_range, segment }
}

/// Returns the first block number of the segment.
Expand All @@ -103,4 +108,12 @@ impl SegmentHeader {
pub fn tx_start(&self) -> TxNumber {
*self.tx_range.start()
}

/// Returns the row offset which depends on whether the segment is block or transaction based.
pub fn start(&self) -> u64 {
match self.segment {
SnapshotSegment::Headers => self.block_start(),
SnapshotSegment::Transactions | SnapshotSegment::Receipts => self.tx_start(),
}
}
}
2 changes: 1 addition & 1 deletion crates/snapshot/src/segments/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pub(crate) fn prepare_jar<DB: Database, const COLUMNS: usize>(
let mut nippy_jar = NippyJar::new(
COLUMNS,
&segment.filename_with_configuration(filters, compression, &block_range),
SegmentHeader::new(block_range, tx_range),
SegmentHeader::new(block_range, tx_range, segment),
);

nippy_jar = match compression {
Expand Down
124 changes: 120 additions & 4 deletions crates/storage/db/src/snapshot.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
//! reth's snapshot creation from database tables
//! reth's snapshot creation from database tables and access
use crate::{
abstraction::cursor::DbCursorRO,
table::{Key, Table},
table::{Decompress, Key, Table},
transaction::DbTx,
RawKey, RawTable,
};
use reth_interfaces::RethResult;
use reth_nippy_jar::{ColumnResult, NippyJar, PHFKey};
use derive_more::{Deref, DerefMut};
use reth_interfaces::{RethError, RethResult};
use reth_nippy_jar::{ColumnResult, MmapHandle, NippyJar, NippyJarCursor, PHFKey};
use reth_primitives::{snapshot::SegmentHeader, B256};
use reth_tracing::tracing::*;
use serde::{Deserialize, Serialize};
use std::{error::Error as StdError, ops::RangeInclusive};
Expand Down Expand Up @@ -102,3 +104,117 @@ macro_rules! generate_snapshot_func {
}

generate_snapshot_func!((T1), (T1, T2), (T1, T2, T3), (T1, T2, T3, T4), (T1, T2, T3, T4, T5),);

/// Cursor of a snapshot segment.
#[derive(Debug, Deref, DerefMut)]
pub struct SnapshotCursor<'a>(NippyJarCursor<'a, SegmentHeader>);

impl<'a> SnapshotCursor<'a> {
/// Returns a new [`SnapshotCursor`].
pub fn new(
jar: &'a NippyJar<SegmentHeader>,
mmap_handle: MmapHandle,
) -> Result<Self, RethError> {
Ok(Self(NippyJarCursor::with_handle(jar, mmap_handle)?))
}

/// Gets a row of values.
pub fn get<const SELECTOR: usize, const COLUMNS: usize>(
&mut self,
key_or_num: KeyOrNumber<'_>,
) -> RethResult<Option<Vec<&'_ [u8]>>> {
let row = match key_or_num {
KeyOrNumber::Hash(k) => self.row_by_key_with_cols::<SELECTOR, COLUMNS>(k),
KeyOrNumber::Number(n) => {
let offset = self.jar().user_header().start();
if offset > n {
return Ok(None)
}
self.row_by_number_with_cols::<SELECTOR, COLUMNS>((n - offset) as usize)
}
}?;

Ok(row)
}

/// Gets one column value from a row.
pub fn get_one<T: Decompress, const SELECTOR: usize, const COLUMNS: usize>(
&mut self,
key_or_num: KeyOrNumber<'_>,
) -> RethResult<Option<T>> {
let row = self.get::<SELECTOR, COLUMNS>(key_or_num)?;

match row {
Some(row) => Ok(Some(T::decompress(row[0])?)),
None => Ok(None),
}
}

/// Gets two column values from a row.
pub fn get_two<T: Decompress, K: Decompress, const SELECTOR: usize, const COLUMNS: usize>(
&mut self,
key_or_num: KeyOrNumber<'_>,
) -> RethResult<Option<(T, K)>> {
let row = self.get::<SELECTOR, COLUMNS>(key_or_num)?;

match row {
Some(row) => Ok(Some((T::decompress(row[0])?, K::decompress(row[1])?))),
None => Ok(None),
}
}

/// Gets three column values from a row.
pub fn get_three<
T: Decompress,
K: Decompress,
J: Decompress,
const SELECTOR: usize,
const COLUMNS: usize,
>(
&mut self,
key_or_num: KeyOrNumber<'_>,
) -> RethResult<Option<(T, K, J)>> {
let row = self.get::<SELECTOR, COLUMNS>(key_or_num)?;

match row {
Some(row) => {
Ok(Some((T::decompress(row[0])?, K::decompress(row[1])?, J::decompress(row[2])?)))
}
None => Ok(None),
}
}
}

/// Either a key _or_ a block number
#[derive(Debug)]
pub enum KeyOrNumber<'a> {
/// A slice used as a key. Usually a block hash
Hash(&'a [u8]),
/// A block number
Number(u64),
}

impl<'a> From<&'a B256> for KeyOrNumber<'a> {
fn from(value: &'a B256) -> Self {
KeyOrNumber::Hash(value.as_slice())
}
}

impl<'a> From<u64> for KeyOrNumber<'a> {
fn from(value: u64) -> Self {
KeyOrNumber::Number(value)
}
}

/// Snapshot segment total columns.
pub const HEADER_COLUMNS: usize = 3;
/// Selector for header.
pub const S_HEADER: usize = 0b001;
/// Selector for header td.
pub const S_HEADER_TD: usize = 0b010;
/// Selector for header hash.
pub const S_HEADER_HASH: usize = 0b100;
/// Selector for header td and header hash.
pub const S_HEADER_TD_WITH_HASH: usize = 0b110;
/// Selector for header and header hash.
pub const S_HEADER_WITH_HASH: usize = 0b101;
4 changes: 4 additions & 0 deletions crates/storage/nippy-jar/src/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ where
})
}

pub fn jar(&self) -> &NippyJar<H> {
self.jar
}

/// Resets cursor to the beginning.
pub fn reset(&mut self) {
self.row = 0;
Expand Down
137 changes: 82 additions & 55 deletions crates/storage/provider/src/providers/snapshot/jar.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
use super::LoadedJarRef;
use crate::{BlockHashReader, BlockNumReader, HeaderProvider, TransactionsProvider};
use reth_db::{
snapshot::{
SnapshotCursor, HEADER_COLUMNS, S_HEADER, S_HEADER_HASH, S_HEADER_TD,
S_HEADER_TD_WITH_HASH, S_HEADER_WITH_HASH,
},
table::{Decompress, Table},
HeaderTD,
CanonicalHeaders, HeaderTD,
};
use reth_interfaces::{provider::ProviderError, RethResult};
use reth_nippy_jar::NippyJarCursor;
use reth_primitives::{
snapshot::SegmentHeader, Address, BlockHash, BlockHashOrNumber, BlockNumber, ChainInfo, Header,
SealedHeader, TransactionMeta, TransactionSigned, TransactionSignedNoHash, TxHash, TxNumber,
B256, U256,
Address, BlockHash, BlockHashOrNumber, BlockNumber, ChainInfo, Header, SealedHeader,
TransactionMeta, TransactionSigned, TransactionSignedNoHash, TxHash, TxNumber, B256, U256,
};
use std::ops::{Deref, RangeBounds};
use std::ops::{Deref, Range, RangeBounds};

/// Provider over a specific `NippyJar` and range.
#[derive(Debug)]
Expand All @@ -32,84 +34,93 @@ impl<'a> From<LoadedJarRef<'a>> for SnapshotJarProvider<'a> {

impl<'a> SnapshotJarProvider<'a> {
/// Provides a cursor for more granular data access.
pub fn cursor<'b>(&'b self) -> RethResult<NippyJarCursor<'a, SegmentHeader>>
pub fn cursor<'b>(&'b self) -> RethResult<SnapshotCursor<'a>>
where
'b: 'a,
{
Ok(NippyJarCursor::with_handle(self.value(), self.mmap_handle())?)
SnapshotCursor::new(self.value(), self.mmap_handle())
}
}

impl<'a> HeaderProvider for SnapshotJarProvider<'a> {
fn header(&self, block_hash: &BlockHash) -> RethResult<Option<Header>> {
// WIP
let mut cursor = self.cursor()?;

let header = Header::decompress(
cursor.row_by_key_with_cols::<0b01, 2>(&block_hash.0).unwrap().unwrap()[0],
)
.unwrap();

if &header.hash_slow() == block_hash {
return Ok(Some(header))
} else {
// check next snapshot
}
Ok(None)
Ok(self
.cursor()?
.get_two::<Header, <CanonicalHeaders as Table>::Value, S_HEADER_WITH_HASH, HEADER_COLUMNS>(block_hash.into())?
.filter(|(_, hash)| hash == block_hash)
.map(|(header, _)| header))
}

fn header_by_number(&self, num: BlockNumber) -> RethResult<Option<Header>> {
Header::decompress(
self.cursor()?
.row_by_number_with_cols::<0b01, 2>(
(num - self.user_header().block_start()) as usize,
)?
.ok_or(ProviderError::HeaderNotFound(num.into()))?[0],
)
.map(Some)
.map_err(Into::into)
self.cursor()?.get_one::<Header, S_HEADER, HEADER_COLUMNS>(num.into())
}

fn header_td(&self, block_hash: &BlockHash) -> RethResult<Option<U256>> {
// WIP
let mut cursor = NippyJarCursor::with_handle(self.value(), self.mmap_handle())?;
Ok(self
.cursor()?
.get_two::<<HeaderTD as Table>::Value, <CanonicalHeaders as Table>::Value, S_HEADER_TD_WITH_HASH, HEADER_COLUMNS>(
block_hash.into(),
)?
.filter(|(_, hash)| hash == block_hash)
.map(|(td, _)| td.into()))
}

let row = cursor.row_by_key_with_cols::<0b11, 2>(&block_hash.0).unwrap().unwrap();
fn header_td_by_number(&self, num: BlockNumber) -> RethResult<Option<U256>> {
Ok(self
.cursor()?
.get_one::<<HeaderTD as Table>::Value, S_HEADER_TD, HEADER_COLUMNS>(num.into())?
.map(Into::into))
}

let header = Header::decompress(row[0]).unwrap();
let td = <HeaderTD as Table>::Value::decompress(row[1]).unwrap();
fn headers_range(&self, range: impl RangeBounds<BlockNumber>) -> RethResult<Vec<Header>> {
let range = to_range(range);

if &header.hash_slow() == block_hash {
return Ok(Some(td.0))
} else {
// check next snapshot
}
Ok(None)
}
let mut cursor = self.cursor()?;
let mut headers = Vec::with_capacity((range.end - range.start) as usize);

fn header_td_by_number(&self, _number: BlockNumber) -> RethResult<Option<U256>> {
unimplemented!();
}
for num in range.start..range.end {
match cursor.get_one::<Header, S_HEADER, HEADER_COLUMNS>(num.into())? {
Some(header) => headers.push(header),
None => return Ok(headers),
}
}

fn headers_range(&self, _range: impl RangeBounds<BlockNumber>) -> RethResult<Vec<Header>> {
unimplemented!();
Ok(headers)
}

fn sealed_headers_range(
&self,
_range: impl RangeBounds<BlockNumber>,
range: impl RangeBounds<BlockNumber>,
) -> RethResult<Vec<SealedHeader>> {
unimplemented!();
let range = to_range(range);

let mut cursor = self.cursor()?;
let mut headers = Vec::with_capacity((range.end - range.start) as usize);

for number in range.start..range.end {
match cursor
.get_two::<Header, <CanonicalHeaders as Table>::Value, S_HEADER_WITH_HASH, HEADER_COLUMNS>(number.into())?
{
Some((header, hash)) => headers.push(header.seal(hash)),
None => return Ok(headers),
}
}
Ok(headers)
}

fn sealed_header(&self, _number: BlockNumber) -> RethResult<Option<SealedHeader>> {
unimplemented!();
fn sealed_header(&self, number: BlockNumber) -> RethResult<Option<SealedHeader>> {
Ok(self
.cursor()?
.get_two::<Header, <CanonicalHeaders as Table>::Value, S_HEADER_WITH_HASH, HEADER_COLUMNS>(number.into())?
.map(|(header, hash)| header.seal(hash)))
}
}

impl<'a> BlockHashReader for SnapshotJarProvider<'a> {
fn block_hash(&self, _number: u64) -> RethResult<Option<B256>> {
todo!()
fn block_hash(&self, number: u64) -> RethResult<Option<B256>> {
self.cursor()?.get_one::<<CanonicalHeaders as Table>::Value, S_HEADER_HASH, HEADER_COLUMNS>(
number.into(),
)
}

fn canonical_hashes_range(
Expand Down Expand Up @@ -148,7 +159,7 @@ impl<'a> TransactionsProvider for SnapshotJarProvider<'a> {
TransactionSignedNoHash::decompress(
self.cursor()?
.row_by_number_with_cols::<0b1, 1>((num - self.user_header().tx_start()) as usize)?
.ok_or(ProviderError::TransactionNotFound(num.into()))?[0],
.ok_or_else(|| ProviderError::TransactionNotFound(num.into()))?[0],
)
.map(Into::into)
.map(Some)
Expand Down Expand Up @@ -220,3 +231,19 @@ impl<'a> TransactionsProvider for SnapshotJarProvider<'a> {
todo!()
}
}

fn to_range<R: RangeBounds<u64>>(bounds: R) -> Range<u64> {
let start = match bounds.start_bound() {
std::ops::Bound::Included(&v) => v,
std::ops::Bound::Excluded(&v) => v + 1,
std::ops::Bound::Unbounded => 0,
};

let end = match bounds.end_bound() {
std::ops::Bound::Included(&v) => v + 1,
std::ops::Bound::Excluded(&v) => v,
std::ops::Bound::Unbounded => u64::MAX,
};

start..end
}
Loading

0 comments on commit 74a2bf3

Please sign in to comment.