Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add command for pruning states #4648

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 7 additions & 4 deletions beacon_node/beacon_chain/src/otb_verification_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,13 @@ pub fn start_otb_verification_service<T: BeaconChainTypes>(
/// Load all optimistic transition blocks (OTBs) persisted in the hot database.
///
/// Iterates the `OTBColumn` keyed by `Hash256` and deserializes every stored value,
/// short-circuiting with a `StoreError` on the first failure.
pub fn load_optimistic_transition_blocks<T: BeaconChainTypes>(
    chain: &BeaconChain<T>,
) -> Result<Vec<OptimisticTransitionBlock>, StoreError> {
    // `process_results` lets us collect the fallible iterator without allocating
    // intermediate `Result`s; the inner `collect` surfaces deserialization errors.
    process_results(
        chain.store.hot_db.iter_column::<Hash256>(OTBColumn),
        |iter| {
            iter.map(|(_, bytes)| OptimisticTransitionBlock::from_store_bytes(&bytes))
                .collect()
        },
    )?
}

#[derive(Debug)]
Expand Down
1 change: 1 addition & 0 deletions beacon_node/store/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ pub enum Error {
SlotClockUnavailableForMigration,
UnableToDowngrade,
InconsistentFork(InconsistentFork),
InvalidKey,
}

pub trait HandleUnavailable<T> {
Expand Down
83 changes: 83 additions & 0 deletions beacon_node/store/src/hot_cold_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1691,6 +1691,89 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
);
Ok(())
}

/// Delete *all* states from the freezer database and update the anchor accordingly.
///
/// WARNING: this method deletes the genesis state and replaces it with the provided
/// `genesis_state`. This is to support its use in schema migrations where the storage scheme of
/// the genesis state may be modified. It is the responsibility of the caller to ensure that the
/// genesis state is correct, else a corrupt database will be created.
pub fn prune_historic_states(
    &self,
    genesis_state_root: Hash256,
    genesis_state: &BeaconState<E>,
) -> Result<(), Error> {
    // Update the anchor to use the dummy state upper limit and disable historic state storage.
    let old_anchor = self.get_anchor_info();
    let new_anchor = if let Some(old_anchor) = old_anchor.clone() {
        AnchorInfo {
            state_upper_limit: STATE_UPPER_LIMIT_NO_RETAIN,
            // The shadowed `old_anchor` is already a clone, so move its remaining fields
            // directly rather than cloning a second time.
            ..old_anchor
        }
    } else {
        AnchorInfo {
            anchor_slot: Slot::new(0),
            oldest_block_slot: Slot::new(0),
            oldest_block_parent: Hash256::zero(),
            state_upper_limit: STATE_UPPER_LIMIT_NO_RETAIN,
            state_lower_limit: Slot::new(0),
        }
    };

    // Commit the anchor change immediately: if the cold database ops fail they can always be
    // retried, and we can't do them atomically with this change anyway.
    self.compare_and_set_anchor_info_with_write(old_anchor, Some(new_anchor))?;

    // Stage freezer data for deletion. Do not bother loading and deserializing values as this
    // wastes time and is less schema-agnostic. My hope is that this method will be useful for
    // migrating to the tree-states schema (delete everything in the freezer then start afresh).
    let mut cold_ops = vec![];

    let columns = [
        DBColumn::BeaconState,
        DBColumn::BeaconStateSummary,
        DBColumn::BeaconRestorePoint,
        DBColumn::BeaconStateRoots,
        DBColumn::BeaconHistoricalRoots,
        DBColumn::BeaconRandaoMixes,
        DBColumn::BeaconHistoricalSummaries,
    ];

    for column in columns {
        // Keys are iterated as raw `Vec<u8>` so this works regardless of the column's
        // key length or schema.
        for res in self.cold_db.iter_column_keys::<Vec<u8>>(column) {
            let key = res?;
            cold_ops.push(KeyValueStoreOp::DeleteKey(get_key_for_col(
                column.as_str(),
                &key,
            )));
        }
    }

    // XXX: We need to commit the mass deletion here *before* re-storing the genesis state, as
    // the current schema performs reads as part of `store_cold_state`. This can be deleted
    // once the target schema is tree-states. If the process is killed before the genesis state
    // is written this can be fixed by re-running.
    info!(
        self.log,
        "Deleting historic states";
        "num_kv" => cold_ops.len(),
    );
    // `mem::take` empties `cold_ops` so the same buffer can be reused for the genesis
    // re-store ops below.
    self.cold_db.do_atomically(std::mem::take(&mut cold_ops))?;

    // If we just deleted the genesis state, re-store it using the *current* schema, which
    // may be different from the schema of the genesis state we just deleted.
    if self.get_split_slot() > 0 {
        info!(
            self.log,
            "Re-storing genesis state";
            "state_root" => ?genesis_state_root,
        );
        self.store_cold_state(&genesis_state_root, genesis_state, &mut cold_ops)?;
        self.cold_db.do_atomically(cold_ops)?;
    }

    Ok(())
}
}

/// Advance the split point of the store, moving new finalized states to the freezer.
Expand Down
56 changes: 30 additions & 26 deletions beacon_node/store/src/leveldb_store.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use super::*;
use crate::hot_cold_store::HotColdDBError;
use crate::metrics;
use db_key::Key;
use leveldb::compaction::Compaction;
use leveldb::database::batch::{Batch, Writebatch};
use leveldb::database::kv::KV;
Expand Down Expand Up @@ -170,52 +169,49 @@ impl<E: EthSpec> KeyValueStore<E> for LevelDB<E> {
for (start_key, end_key) in [
endpoints(DBColumn::BeaconStateTemporary),
endpoints(DBColumn::BeaconState),
endpoints(DBColumn::BeaconStateSummary),
] {
self.db.compact(&start_key, &end_key);
}
Ok(())
}

/// Iterate through all keys and values in a particular column.
fn iter_column(&self, column: DBColumn) -> ColumnIter {
let start_key =
BytesKey::from_vec(get_key_for_col(column.into(), Hash256::zero().as_bytes()));
fn iter_column_from<K: Key>(&self, column: DBColumn, from: &[u8]) -> ColumnIter<K> {
let start_key = BytesKey::from_vec(get_key_for_col(column.into(), from));

let iter = self.db.iter(self.read_options());
iter.seek(&start_key);

Box::new(
iter.take_while(move |(key, _)| key.matches_column(column))
.map(move |(bytes_key, value)| {
let key =
bytes_key
.remove_column(column)
.ok_or(HotColdDBError::IterationError {
unexpected_key: bytes_key,
})?;
Ok((key, value))
let key = bytes_key.remove_column_variable(column).ok_or_else(|| {
HotColdDBError::IterationError {
unexpected_key: bytes_key.clone(),
}
})?;
Ok((K::from_bytes(key)?, value))
}),
)
}

/// Iterate through all keys and values in a particular column.
fn iter_column_keys(&self, column: DBColumn) -> ColumnKeyIter {
fn iter_column_keys<K: Key>(&self, column: DBColumn) -> ColumnKeyIter<K> {
let start_key =
BytesKey::from_vec(get_key_for_col(column.into(), Hash256::zero().as_bytes()));
BytesKey::from_vec(get_key_for_col(column.into(), &vec![0; column.key_size()]));

let iter = self.db.keys_iter(self.read_options());
iter.seek(&start_key);

Box::new(
iter.take_while(move |key| key.matches_column(column))
.map(move |bytes_key| {
let key =
bytes_key
.remove_column(column)
.ok_or(HotColdDBError::IterationError {
unexpected_key: bytes_key,
})?;
Ok(key)
let key = bytes_key.remove_column_variable(column).ok_or_else(|| {
HotColdDBError::IterationError {
unexpected_key: bytes_key.clone(),
}
})?;
K::from_bytes(key)
}),
)
}
Expand All @@ -224,12 +220,12 @@ impl<E: EthSpec> KeyValueStore<E> for LevelDB<E> {
impl<E: EthSpec> ItemStore<E> for LevelDB<E> {}

/// Used for keying leveldb.
// `Clone` is required so iteration errors can own the offending key;
// `Eq`/`Ord` follow the ordering of the underlying byte vector.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct BytesKey {
    key: Vec<u8>,
}

impl Key for BytesKey {
impl db_key::Key for BytesKey {
fn from_u8(key: &[u8]) -> Self {
Self { key: key.to_vec() }
}
Expand All @@ -245,12 +241,20 @@ impl BytesKey {
self.key.starts_with(column.as_bytes())
}

/// Remove the column from a 32 byte key, yielding the `Hash256` key.
///
/// Returns `None` if the key doesn't match the column or the column's keys are not 32 bytes.
pub fn remove_column(&self, column: DBColumn) -> Option<Hash256> {
    let key = self.remove_column_variable(column)?;
    (column.key_size() == 32).then(|| Hash256::from_slice(key))
}

/// Remove the column from a key.
///
/// Will return `None` if the value doesn't match the column or has the wrong length.
pub fn remove_column_variable(&self, column: DBColumn) -> Option<&[u8]> {
if self.matches_column(column) {
let subkey = &self.key[column.as_bytes().len()..];
if subkey.len() == 32 {
return Some(Hash256::from_slice(subkey));
if subkey.len() == column.key_size() {
return Some(subkey);
}
}
None
Expand Down
62 changes: 54 additions & 8 deletions beacon_node/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ use std::sync::Arc;
use strum::{EnumString, IntoStaticStr};
pub use types::*;

pub type ColumnIter<'a> = Box<dyn Iterator<Item = Result<(Hash256, Vec<u8>), Error>> + 'a>;
pub type ColumnKeyIter<'a> = Box<dyn Iterator<Item = Result<Hash256, Error>> + 'a>;
pub type ColumnIter<'a, K> = Box<dyn Iterator<Item = Result<(K, Vec<u8>), Error>> + 'a>;
pub type ColumnKeyIter<'a, K> = Box<dyn Iterator<Item = Result<K, Error>> + 'a>;

pub trait KeyValueStore<E: EthSpec>: Sync + Send + Sized + 'static {
/// Retrieve some bytes in `column` with `key`.
Expand Down Expand Up @@ -80,15 +80,34 @@ pub trait KeyValueStore<E: EthSpec>: Sync + Send + Sized + 'static {
fn compact(&self) -> Result<(), Error>;

/// Iterate through all keys and values in a particular column.
fn iter_column(&self, _column: DBColumn) -> ColumnIter {
// Default impl for non LevelDB databases
Box::new(std::iter::empty())
fn iter_column<K: Key>(&self, column: DBColumn) -> ColumnIter<K> {
self.iter_column_from(column, &vec![0; column.key_size()])
}

/// Iterate through all keys and values in a column from a given starting point.
fn iter_column_from<K: Key>(&self, column: DBColumn, from: &[u8]) -> ColumnIter<K>;

/// Iterate through all keys in a particular column.
fn iter_column_keys(&self, _column: DBColumn) -> ColumnKeyIter {
// Default impl for non LevelDB databases
Box::new(std::iter::empty())
fn iter_column_keys<K: Key>(&self, column: DBColumn) -> ColumnKeyIter<K>;
}

/// A type that can be decoded from the raw bytes of a database key.
pub trait Key: Sized + 'static {
    /// Parse a key from its raw byte representation, erroring if the bytes are invalid
    /// for this key type (e.g. wrong length).
    fn from_bytes(key: &[u8]) -> Result<Self, Error>;
}

impl Key for Hash256 {
fn from_bytes(key: &[u8]) -> Result<Self, Error> {
if key.len() == 32 {
Ok(Hash256::from_slice(key))
} else {
Err(Error::InvalidKey)
}
}
}

impl Key for Vec<u8> {
    /// Variable-length keys are accepted verbatim as owned bytes; this never fails.
    fn from_bytes(key: &[u8]) -> Result<Self, Error> {
        Ok(key.into())
    }
}

Expand Down Expand Up @@ -230,6 +249,33 @@ impl DBColumn {
pub fn as_bytes(self) -> &'static [u8] {
self.as_str().as_bytes()
}

/// Most database keys are 32 bytes, but some freezer DB keys are 8 bytes.
///
/// This function returns the number of bytes used by keys in a given column.
pub fn key_size(self) -> usize {
    match self {
        // Columns with 8-byte keys.
        Self::BeaconBlockRoots
        | Self::BeaconStateRoots
        | Self::BeaconHistoricalRoots
        | Self::BeaconHistoricalSummaries
        | Self::BeaconRandaoMixes => 8,
        // Columns with 32-byte keys.
        Self::BeaconMeta
        | Self::BeaconBlock
        | Self::BeaconState
        | Self::BeaconStateSummary
        | Self::BeaconStateTemporary
        | Self::ExecPayload
        | Self::BeaconChain
        | Self::OpPool
        | Self::Eth1Cache
        | Self::ForkChoice
        | Self::PubkeyCache
        | Self::BeaconRestorePoint
        | Self::DhtEnrs
        | Self::OptimisticTransitionBlock => 32,
    }
}
}

/// An item that may stored in a `Store` by serializing and deserializing from bytes.
Expand Down
Loading