Skip to content

Commit

Permalink
Add database schema versioning (#1688)
Browse files Browse the repository at this point in the history
## Issue Addressed

Closes #673

## Proposed Changes

Store a schema version in the database so that future releases can check they're running against a compatible database version. This would also enable automatic migration on breaking database changes, but that's left as future work.

The database config is also stored in the database so that the `slots_per_restore_point` value can be checked for consistency, which closes #673
  • Loading branch information
michaelsproul committed Sep 30, 2020
1 parent 29709c5 commit 1ad28c0
Show file tree
Hide file tree
Showing 10 changed files with 153 additions and 42 deletions.
22 changes: 10 additions & 12 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,11 @@ pub const ATTESTATION_CACHE_LOCK_TIMEOUT: Duration = Duration::from_secs(1);
/// validator pubkey cache.
pub const VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT: Duration = Duration::from_secs(1);

pub const BEACON_CHAIN_DB_KEY: [u8; 32] = [0; 32];
pub const OP_POOL_DB_KEY: [u8; 32] = [0; 32];
pub const ETH1_CACHE_DB_KEY: [u8; 32] = [0; 32];
pub const FORK_CHOICE_DB_KEY: [u8; 32] = [0; 32];
// These keys are all zero because they get stored in different columns, see `DBColumn` type.
pub const BEACON_CHAIN_DB_KEY: Hash256 = Hash256::zero();
pub const OP_POOL_DB_KEY: Hash256 = Hash256::zero();
pub const ETH1_CACHE_DB_KEY: Hash256 = Hash256::zero();
pub const FORK_CHOICE_DB_KEY: Hash256 = Hash256::zero();

/// The result of a chain segment processing.
pub enum ChainSegmentResult<T: EthSpec> {
Expand Down Expand Up @@ -260,7 +261,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let fork_choice = self.fork_choice.read();

self.store.put_item(
&Hash256::from_slice(&FORK_CHOICE_DB_KEY),
&FORK_CHOICE_DB_KEY,
&PersistedForkChoice {
fork_choice: fork_choice.to_persisted(),
fork_choice_store: fork_choice.fc_store().to_persisted(),
Expand All @@ -272,8 +273,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
metrics::stop_timer(fork_choice_timer);
let head_timer = metrics::start_timer(&metrics::PERSIST_HEAD);

self.store
.put_item(&Hash256::from_slice(&BEACON_CHAIN_DB_KEY), &persisted_head)?;
self.store.put_item(&BEACON_CHAIN_DB_KEY, &persisted_head)?;

metrics::stop_timer(head_timer);

Expand All @@ -290,7 +290,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let _timer = metrics::start_timer(&metrics::PERSIST_OP_POOL);

self.store.put_item(
&Hash256::from_slice(&OP_POOL_DB_KEY),
&OP_POOL_DB_KEY,
&PersistedOperationPool::from_operation_pool(&self.op_pool),
)?;

Expand All @@ -302,10 +302,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let _timer = metrics::start_timer(&metrics::PERSIST_OP_POOL);

if let Some(eth1_chain) = self.eth1_chain.as_ref() {
self.store.put_item(
&Hash256::from_slice(&ETH1_CACHE_DB_KEY),
&eth1_chain.as_ssz_container(),
)?;
self.store
.put_item(&ETH1_CACHE_DB_KEY, &eth1_chain.as_ssz_container())?;
}

Ok(())
Expand Down
10 changes: 5 additions & 5 deletions beacon_node/beacon_chain/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ where
.ok_or_else(|| "get_persisted_eth1_backend requires a store.".to_string())?;

store
.get_item::<SszEth1>(&Hash256::from_slice(&ETH1_CACHE_DB_KEY))
.get_item::<SszEth1>(&ETH1_CACHE_DB_KEY)
.map_err(|e| format!("DB error whilst reading eth1 cache: {:?}", e))
}

Expand All @@ -241,7 +241,7 @@ where
.ok_or_else(|| "store_contains_beacon_chain requires a store.".to_string())?;

Ok(store
.get_item::<PersistedBeaconChain>(&Hash256::from_slice(&BEACON_CHAIN_DB_KEY))
.get_item::<PersistedBeaconChain>(&BEACON_CHAIN_DB_KEY)
.map_err(|e| format!("DB error when reading persisted beacon chain: {:?}", e))?
.is_some())
}
Expand Down Expand Up @@ -272,15 +272,15 @@ where
.ok_or_else(|| "resume_from_db requires a store.".to_string())?;

let chain = store
.get_item::<PersistedBeaconChain>(&Hash256::from_slice(&BEACON_CHAIN_DB_KEY))
.get_item::<PersistedBeaconChain>(&BEACON_CHAIN_DB_KEY)
.map_err(|e| format!("DB error when reading persisted beacon chain: {:?}", e))?
.ok_or_else(|| {
"No persisted beacon chain found in store. Try purging the beacon chain database."
.to_string()
})?;

let persisted_fork_choice = store
.get_item::<PersistedForkChoice>(&Hash256::from_slice(&FORK_CHOICE_DB_KEY))
.get_item::<PersistedForkChoice>(&FORK_CHOICE_DB_KEY)
.map_err(|e| format!("DB error when reading persisted fork choice: {:?}", e))?
.ok_or_else(|| "No persisted fork choice present in database.".to_string())?;

Expand All @@ -307,7 +307,7 @@ where

self.op_pool = Some(
store
.get_item::<PersistedOperationPool<TEthSpec>>(&Hash256::from_slice(&OP_POOL_DB_KEY))
.get_item::<PersistedOperationPool<TEthSpec>>(&OP_POOL_DB_KEY)
.map_err(|e| format!("DB error whilst reading persisted op pool: {:?}", e))?
.map(PersistedOperationPool::into_operation_pool)
.unwrap_or_else(OperationPool::new),
Expand Down
3 changes: 1 addition & 2 deletions beacon_node/beacon_chain/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,11 +357,10 @@ fn roundtrip_operation_pool() {
.persist_op_pool()
.expect("should persist op pool");

let key = Hash256::from_slice(&OP_POOL_DB_KEY);
let restored_op_pool = harness
.chain
.store
.get_item::<PersistedOperationPool<MinimalEthSpec>>(&key)
.get_item::<PersistedOperationPool<MinimalEthSpec>>(&OP_POOL_DB_KEY)
.expect("should read db")
.expect("should find op pool")
.into_operation_pool();
Expand Down
18 changes: 7 additions & 11 deletions beacon_node/network/src/persisted_dht.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,14 @@ use std::sync::Arc;
use store::{DBColumn, Error as StoreError, HotColdDB, ItemStore, StoreItem};
use types::{EthSpec, Hash256};

/// 32-byte key for accessing the `DhtEnrs`.
pub const DHT_DB_KEY: &str = "PERSISTEDDHTPERSISTEDDHTPERSISTE";
/// 32-byte key for accessing the `DhtEnrs`. All zero because `DhtEnrs` has its own column.
pub const DHT_DB_KEY: Hash256 = Hash256::zero();

pub fn load_dht<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
store: Arc<HotColdDB<E, Hot, Cold>>,
) -> Vec<Enr> {
// Load DHT from store
let key = Hash256::from_slice(&DHT_DB_KEY.as_bytes());
match store.get_item(&key) {
match store.get_item(&DHT_DB_KEY) {
Ok(Some(p)) => {
let p: PersistedDht = p;
p.enrs
Expand All @@ -25,9 +24,7 @@ pub fn persist_dht<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
store: Arc<HotColdDB<E, Hot, Cold>>,
enrs: Vec<Enr>,
) -> Result<(), store::Error> {
let key = Hash256::from_slice(&DHT_DB_KEY.as_bytes());
store.put_item(&key, &PersistedDht { enrs })?;
Ok(())
store.put_item(&DHT_DB_KEY, &PersistedDht { enrs })
}

/// Wrapper around DHT for persistence to disk.
Expand Down Expand Up @@ -61,7 +58,7 @@ mod tests {
use std::str::FromStr;
use store::config::StoreConfig;
use store::{HotColdDB, MemoryStore};
use types::{ChainSpec, Hash256, MinimalEthSpec};
use types::{ChainSpec, MinimalEthSpec};
#[test]
fn test_persisted_dht() {
let log = NullLoggerBuilder.build().unwrap();
Expand All @@ -71,11 +68,10 @@ mod tests {
MemoryStore<MinimalEthSpec>,
> = HotColdDB::open_ephemeral(StoreConfig::default(), ChainSpec::minimal(), log).unwrap();
let enrs = vec![Enr::from_str("enr:-IS4QHCYrYZbAKWCBRlAy5zzaDZXJBGkcnh4MHcBFZntXNFrdvJjX04jRzjzCBOonrkTfj499SZuOh8R33Ls8RRcy5wBgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQPKY0yuDUmstAHYpMa2_oxVtw0RW_QAdpzBQA8yWM0xOIN1ZHCCdl8").unwrap()];
let key = Hash256::from_slice(&DHT_DB_KEY.as_bytes());
store
.put_item(&key, &PersistedDht { enrs: enrs.clone() })
.put_item(&DHT_DB_KEY, &PersistedDht { enrs: enrs.clone() })
.unwrap();
let dht: PersistedDht = store.get_item(&key).unwrap().unwrap();
let dht: PersistedDht = store.get_item(&DHT_DB_KEY).unwrap().unwrap();
assert_eq!(dht.enrs, enrs);
}
}
3 changes: 2 additions & 1 deletion beacon_node/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,8 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
.long("slots-per-restore-point")
.value_name("SLOT_COUNT")
.help("Specifies how often a freezer DB restore point should be stored. \
DO NOT DECREASE AFTER INITIALIZATION. [default: 2048 (mainnet) or 64 (minimal)]")
Cannot be changed after initialization. \
[default: 2048 (mainnet) or 64 (minimal)]")
.takes_value(true)
)
.arg(
Expand Down
36 changes: 35 additions & 1 deletion beacon_node/store/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,26 @@
use crate::{DBColumn, Error, StoreItem};
use serde_derive::{Deserialize, Serialize};
use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
use types::{EthSpec, MinimalEthSpec};

pub const DEFAULT_SLOTS_PER_RESTORE_POINT: u64 = 2048;
pub const DEFAULT_BLOCK_CACHE_SIZE: usize = 5;

/// Database configuration parameters.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Encode, Decode)]
pub struct StoreConfig {
/// Number of slots to wait between storing restore points in the freezer database.
pub slots_per_restore_point: u64,
/// Maximum number of blocks to store in the in-memory block cache.
pub block_cache_size: usize,
}

#[derive(Debug, Clone)]
pub enum StoreConfigError {
MismatchedSlotsPerRestorePoint { config: u64, on_disk: u64 },
}

impl Default for StoreConfig {
fn default() -> Self {
Self {
Expand All @@ -22,3 +30,29 @@ impl Default for StoreConfig {
}
}
}

impl StoreConfig {
pub fn check_compatibility(&self, on_disk_config: &Self) -> Result<(), StoreConfigError> {
if self.slots_per_restore_point != on_disk_config.slots_per_restore_point {
return Err(StoreConfigError::MismatchedSlotsPerRestorePoint {
config: self.slots_per_restore_point,
on_disk: on_disk_config.slots_per_restore_point,
});
}
Ok(())
}
}

impl StoreItem for StoreConfig {
fn db_column() -> DBColumn {
DBColumn::BeaconMeta
}

fn as_store_bytes(&self) -> Vec<u8> {
self.as_ssz_bytes()
}

fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {
Ok(Self::from_ssz_bytes(bytes)?)
}
}
8 changes: 8 additions & 0 deletions beacon_node/store/src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::chunked_vector::ChunkError;
use crate::config::StoreConfigError;
use crate::hot_cold_store::HotColdDBError;
use ssz::DecodeError;
use types::{BeaconStateError, Hash256, Slot};
Expand All @@ -17,6 +18,7 @@ pub enum Error {
BlockNotFound(Hash256),
NoContinuationData,
SplitPointModified(Slot, Slot),
ConfigError(StoreConfigError),
}

impl From<DecodeError> for Error {
Expand Down Expand Up @@ -49,6 +51,12 @@ impl From<DBError> for Error {
}
}

impl From<StoreConfigError> for Error {
fn from(e: StoreConfigError) -> Error {
Error::ConfigError(e)
}
}

#[derive(Debug)]
pub struct DBError {
pub message: String,
Expand Down
63 changes: 54 additions & 9 deletions beacon_node/store/src/hot_cold_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ use crate::impls::beacon_state::{get_full_state, store_full_state};
use crate::iter::{ParentRootBlockIterator, StateRootsIterator};
use crate::leveldb_store::LevelDB;
use crate::memory_store::MemoryStore;
use crate::metadata::{
SchemaVersion, CONFIG_KEY, CURRENT_SCHEMA_VERSION, SCHEMA_VERSION_KEY, SPLIT_KEY,
};
use crate::metrics;
use crate::{
get_key_for_col, DBColumn, Error, ItemStore, KeyValueStoreOp, PartialBeaconState, StoreItem,
Expand All @@ -27,9 +30,6 @@ use std::path::Path;
use std::sync::Arc;
use types::*;

/// 32-byte key for accessing the `split` of the freezer DB.
pub const SPLIT_DB_KEY: &str = "FREEZERDBSPLITFREEZERDBSPLITFREE";

/// Defines how blocks should be replayed on states.
#[derive(PartialEq)]
pub enum BlockReplay {
Expand All @@ -46,6 +46,8 @@ pub enum BlockReplay {
/// intermittent "restore point" states pre-finalization.
#[derive(Debug)]
pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
/// The schema version. Loaded from disk on initialization.
schema_version: SchemaVersion,
/// The slot and state root at the point where the database is split between hot and cold.
///
/// States with slots less than `split.slot` are in the cold DB, while states with slots
Expand All @@ -70,6 +72,10 @@ pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {

#[derive(Debug, PartialEq)]
pub enum HotColdDBError {
UnsupportedSchemaVersion {
software_version: SchemaVersion,
disk_version: SchemaVersion,
},
/// Recoverable error indicating that the database freeze point couldn't be updated
/// due to the finalized block not lying on an epoch boundary (should be infrequent).
FreezeSlotUnaligned(Slot),
Expand Down Expand Up @@ -106,6 +112,7 @@ impl<E: EthSpec> HotColdDB<E, MemoryStore<E>, MemoryStore<E>> {
Self::verify_slots_per_restore_point(config.slots_per_restore_point)?;

let db = HotColdDB {
schema_version: CURRENT_SCHEMA_VERSION,
split: RwLock::new(Split::default()),
cold_db: MemoryStore::open(),
hot_db: MemoryStore::open(),
Expand Down Expand Up @@ -134,6 +141,7 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
Self::verify_slots_per_restore_point(config.slots_per_restore_point)?;

let db = HotColdDB {
schema_version: CURRENT_SCHEMA_VERSION,
split: RwLock::new(Split::default()),
cold_db: LevelDB::open(cold_path)?,
hot_db: LevelDB::open(hot_path)?,
Expand All @@ -144,12 +152,33 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
_phantom: PhantomData,
};

// Ensure that the schema version of the on-disk database matches the software.
// In the future, this would be the spot to hook in auto-migration, etc.
if let Some(schema_version) = db.load_schema_version()? {
if schema_version != CURRENT_SCHEMA_VERSION {
return Err(HotColdDBError::UnsupportedSchemaVersion {
software_version: CURRENT_SCHEMA_VERSION,
disk_version: schema_version,
}
.into());
}
} else {
db.store_schema_version(CURRENT_SCHEMA_VERSION)?;
}

// Ensure that any on-disk config is compatible with the supplied config.
if let Some(disk_config) = db.load_config()? {
db.config.check_compatibility(&disk_config)?;
}
db.store_config()?;

// Load the previous split slot from the database (if any). This ensures we can
// stop and restart correctly.
if let Some(split) = db.load_split()? {
info!(
db.log,
"Hot-Cold DB initialized";
"version" => db.schema_version.0,
"split_slot" => split.slot,
"split_state" => format!("{:?}", split.state_root)
);
Expand Down Expand Up @@ -744,11 +773,29 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
* self.config.slots_per_restore_point
}

/// Load the database schema version from disk.
fn load_schema_version(&self) -> Result<Option<SchemaVersion>, Error> {
self.hot_db.get(&SCHEMA_VERSION_KEY)
}

/// Store the database schema version.
fn store_schema_version(&self, schema_version: SchemaVersion) -> Result<(), Error> {
self.hot_db.put(&SCHEMA_VERSION_KEY, &schema_version)
}

/// Load previously-stored config from disk.
fn load_config(&self) -> Result<Option<StoreConfig>, Error> {
self.hot_db.get(&CONFIG_KEY)
}

/// Write the config to disk.
fn store_config(&self) -> Result<(), Error> {
self.hot_db.put(&CONFIG_KEY, &self.config)
}

/// Load the split point from disk.
fn load_split(&self) -> Result<Option<Split>, Error> {
let key = Hash256::from_slice(SPLIT_DB_KEY.as_bytes());
let split: Option<Split> = self.hot_db.get(&key)?;
Ok(split)
self.hot_db.get(&SPLIT_KEY)
}

/// Load the state root of a restore point.
Expand Down Expand Up @@ -927,9 +974,7 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
slot: frozen_head.slot,
state_root: frozen_head_root,
};
store
.hot_db
.put_sync(&Hash256::from_slice(SPLIT_DB_KEY.as_bytes()), &split)?;
store.hot_db.put_sync(&SPLIT_KEY, &split)?;

// Split point is now persisted in the hot database on disk. The in-memory split point
// hasn't been modified elsewhere since we keep a write lock on it. It's safe to update
Expand Down
Loading

0 comments on commit 1ad28c0

Please sign in to comment.