diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 3bf5ae282d4..d189b01e2de 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -66,10 +66,11 @@ pub const ATTESTATION_CACHE_LOCK_TIMEOUT: Duration = Duration::from_secs(1); /// validator pubkey cache. pub const VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT: Duration = Duration::from_secs(1); -pub const BEACON_CHAIN_DB_KEY: [u8; 32] = [0; 32]; -pub const OP_POOL_DB_KEY: [u8; 32] = [0; 32]; -pub const ETH1_CACHE_DB_KEY: [u8; 32] = [0; 32]; -pub const FORK_CHOICE_DB_KEY: [u8; 32] = [0; 32]; +// These keys are all zero because they get stored in different columns, see `DBColumn` type. +pub const BEACON_CHAIN_DB_KEY: Hash256 = Hash256::zero(); +pub const OP_POOL_DB_KEY: Hash256 = Hash256::zero(); +pub const ETH1_CACHE_DB_KEY: Hash256 = Hash256::zero(); +pub const FORK_CHOICE_DB_KEY: Hash256 = Hash256::zero(); /// The result of a chain segment processing. pub enum ChainSegmentResult { @@ -260,7 +261,7 @@ impl BeaconChain { let fork_choice = self.fork_choice.read(); self.store.put_item( - &Hash256::from_slice(&FORK_CHOICE_DB_KEY), + &FORK_CHOICE_DB_KEY, &PersistedForkChoice { fork_choice: fork_choice.to_persisted(), fork_choice_store: fork_choice.fc_store().to_persisted(), @@ -272,8 +273,7 @@ impl BeaconChain { metrics::stop_timer(fork_choice_timer); let head_timer = metrics::start_timer(&metrics::PERSIST_HEAD); - self.store - .put_item(&Hash256::from_slice(&BEACON_CHAIN_DB_KEY), &persisted_head)?; + self.store.put_item(&BEACON_CHAIN_DB_KEY, &persisted_head)?; metrics::stop_timer(head_timer); @@ -290,7 +290,7 @@ impl BeaconChain { let _timer = metrics::start_timer(&metrics::PERSIST_OP_POOL); self.store.put_item( - &Hash256::from_slice(&OP_POOL_DB_KEY), + &OP_POOL_DB_KEY, &PersistedOperationPool::from_operation_pool(&self.op_pool), )?; @@ -302,10 +302,8 @@ impl BeaconChain { let _timer = metrics::start_timer(&metrics::PERSIST_OP_POOL); if let Some(eth1_chain) = self.eth1_chain.as_ref() { - self.store.put_item( - &Hash256::from_slice(Ð1_CACHE_DB_KEY), - ð1_chain.as_ssz_container(), - )?; + self.store + .put_item(Ð1_CACHE_DB_KEY, ð1_chain.as_ssz_container())?; } Ok(()) diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index ff47c7a2b81..5dbabcdd862 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -229,7 +229,7 @@ where .ok_or_else(|| "get_persisted_eth1_backend requires a store.".to_string())?; store - .get_item::(&Hash256::from_slice(Ð1_CACHE_DB_KEY)) + .get_item::(Ð1_CACHE_DB_KEY) .map_err(|e| format!("DB error whilst reading eth1 cache: {:?}", e)) } @@ -241,7 +241,7 @@ where .ok_or_else(|| "store_contains_beacon_chain requires a store.".to_string())?; Ok(store - .get_item::(&Hash256::from_slice(&BEACON_CHAIN_DB_KEY)) + .get_item::(&BEACON_CHAIN_DB_KEY) .map_err(|e| format!("DB error when reading persisted beacon chain: {:?}", e))? .is_some()) } @@ -272,7 +272,7 @@ where .ok_or_else(|| "resume_from_db requires a store.".to_string())?; let chain = store - .get_item::(&Hash256::from_slice(&BEACON_CHAIN_DB_KEY)) + .get_item::(&BEACON_CHAIN_DB_KEY) .map_err(|e| format!("DB error when reading persisted beacon chain: {:?}", e))? .ok_or_else(|| { "No persisted beacon chain found in store. Try purging the beacon chain database." @@ -280,7 +280,7 @@ where })?; let persisted_fork_choice = store - .get_item::(&Hash256::from_slice(&FORK_CHOICE_DB_KEY)) + .get_item::(&FORK_CHOICE_DB_KEY) .map_err(|e| format!("DB error when reading persisted fork choice: {:?}", e))? .ok_or_else(|| "No persisted fork choice present in database.".to_string())?; @@ -307,7 +307,7 @@ where self.op_pool = Some( store - .get_item::>(&Hash256::from_slice(&OP_POOL_DB_KEY)) + .get_item::>(&OP_POOL_DB_KEY) .map_err(|e| format!("DB error whilst reading persisted op pool: {:?}", e))? .map(PersistedOperationPool::into_operation_pool) .unwrap_or_else(OperationPool::new), diff --git a/beacon_node/beacon_chain/tests/tests.rs b/beacon_node/beacon_chain/tests/tests.rs index 721eb409167..cd8b564787c 100644 --- a/beacon_node/beacon_chain/tests/tests.rs +++ b/beacon_node/beacon_chain/tests/tests.rs @@ -357,11 +357,10 @@ fn roundtrip_operation_pool() { .persist_op_pool() .expect("should persist op pool"); - let key = Hash256::from_slice(&OP_POOL_DB_KEY); let restored_op_pool = harness .chain .store - .get_item::>(&key) + .get_item::>(&OP_POOL_DB_KEY) .expect("should read db") .expect("should find op pool") .into_operation_pool(); diff --git a/beacon_node/network/src/persisted_dht.rs b/beacon_node/network/src/persisted_dht.rs index 2149324422b..c11fcd44852 100644 --- a/beacon_node/network/src/persisted_dht.rs +++ b/beacon_node/network/src/persisted_dht.rs @@ -3,15 +3,14 @@ use std::sync::Arc; use store::{DBColumn, Error as StoreError, HotColdDB, ItemStore, StoreItem}; use types::{EthSpec, Hash256}; -/// 32-byte key for accessing the `DhtEnrs`. -pub const DHT_DB_KEY: &str = "PERSISTEDDHTPERSISTEDDHTPERSISTE"; +/// 32-byte key for accessing the `DhtEnrs`. All zero because `DhtEnrs` has its own column. +pub const DHT_DB_KEY: Hash256 = Hash256::zero(); pub fn load_dht, Cold: ItemStore>( store: Arc>, ) -> Vec { // Load DHT from store - let key = Hash256::from_slice(&DHT_DB_KEY.as_bytes()); - match store.get_item(&key) { + match store.get_item(&DHT_DB_KEY) { Ok(Some(p)) => { let p: PersistedDht = p; p.enrs @@ -25,9 +24,7 @@ pub fn persist_dht, Cold: ItemStore>( store: Arc>, enrs: Vec, ) -> Result<(), store::Error> { - let key = Hash256::from_slice(&DHT_DB_KEY.as_bytes()); - store.put_item(&key, &PersistedDht { enrs })?; - Ok(()) + store.put_item(&DHT_DB_KEY, &PersistedDht { enrs }) } /// Wrapper around DHT for persistence to disk. @@ -61,7 +58,7 @@ mod tests { use std::str::FromStr; use store::config::StoreConfig; use store::{HotColdDB, MemoryStore}; - use types::{ChainSpec, Hash256, MinimalEthSpec}; + use types::{ChainSpec, MinimalEthSpec}; #[test] fn test_persisted_dht() { let log = NullLoggerBuilder.build().unwrap(); @@ -71,11 +68,10 @@ mod tests { MemoryStore, > = HotColdDB::open_ephemeral(StoreConfig::default(), ChainSpec::minimal(), log).unwrap(); let enrs = vec![Enr::from_str("enr:-IS4QHCYrYZbAKWCBRlAy5zzaDZXJBGkcnh4MHcBFZntXNFrdvJjX04jRzjzCBOonrkTfj499SZuOh8R33Ls8RRcy5wBgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQPKY0yuDUmstAHYpMa2_oxVtw0RW_QAdpzBQA8yWM0xOIN1ZHCCdl8").unwrap()]; - let key = Hash256::from_slice(&DHT_DB_KEY.as_bytes()); store - .put_item(&key, &PersistedDht { enrs: enrs.clone() }) + .put_item(&DHT_DB_KEY, &PersistedDht { enrs: enrs.clone() }) .unwrap(); - let dht: PersistedDht = store.get_item(&key).unwrap().unwrap(); + let dht: PersistedDht = store.get_item(&DHT_DB_KEY).unwrap().unwrap(); assert_eq!(dht.enrs, enrs); } } diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index fd838e03384..2ee3fa41784 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -267,7 +267,8 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .long("slots-per-restore-point") .value_name("SLOT_COUNT") .help("Specifies how often a freezer DB restore point should be stored. \ - DO NOT DECREASE AFTER INITIALIZATION. [default: 2048 (mainnet) or 64 (minimal)]") + Cannot be changed after initialization. \ + [default: 2048 (mainnet) or 64 (minimal)]") .takes_value(true) ) .arg( diff --git a/beacon_node/store/src/config.rs b/beacon_node/store/src/config.rs index bebddf8fac5..91cf5ec1cb0 100644 --- a/beacon_node/store/src/config.rs +++ b/beacon_node/store/src/config.rs @@ -1,11 +1,14 @@ +use crate::{DBColumn, Error, StoreItem}; use serde_derive::{Deserialize, Serialize}; +use ssz::{Decode, Encode}; +use ssz_derive::{Decode, Encode}; use types::{EthSpec, MinimalEthSpec}; pub const DEFAULT_SLOTS_PER_RESTORE_POINT: u64 = 2048; pub const DEFAULT_BLOCK_CACHE_SIZE: usize = 5; /// Database configuration parameters. -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Encode, Decode)] pub struct StoreConfig { /// Number of slots to wait between storing restore points in the freezer database. pub slots_per_restore_point: u64, @@ -13,6 +16,11 @@ pub struct StoreConfig { pub block_cache_size: usize, } +#[derive(Debug, Clone)] +pub enum StoreConfigError { + MismatchedSlotsPerRestorePoint { config: u64, on_disk: u64 }, +} + impl Default for StoreConfig { fn default() -> Self { Self { @@ -22,3 +30,29 @@ impl Default for StoreConfig { } } } + +impl StoreConfig { + pub fn check_compatibility(&self, on_disk_config: &Self) -> Result<(), StoreConfigError> { + if self.slots_per_restore_point != on_disk_config.slots_per_restore_point { + return Err(StoreConfigError::MismatchedSlotsPerRestorePoint { + config: self.slots_per_restore_point, + on_disk: on_disk_config.slots_per_restore_point, + }); + } + Ok(()) + } +} + +impl StoreItem for StoreConfig { + fn db_column() -> DBColumn { + DBColumn::BeaconMeta + } + + fn as_store_bytes(&self) -> Vec { + self.as_ssz_bytes() + } + + fn from_store_bytes(bytes: &[u8]) -> Result { + Ok(Self::from_ssz_bytes(bytes)?) + } +} diff --git a/beacon_node/store/src/errors.rs b/beacon_node/store/src/errors.rs index 8e9237361c4..622cd2ac73f 100644 --- a/beacon_node/store/src/errors.rs +++ b/beacon_node/store/src/errors.rs @@ -1,4 +1,5 @@ use crate::chunked_vector::ChunkError; +use crate::config::StoreConfigError; use crate::hot_cold_store::HotColdDBError; use ssz::DecodeError; use types::{BeaconStateError, Hash256, Slot}; @@ -17,6 +18,7 @@ pub enum Error { BlockNotFound(Hash256), NoContinuationData, SplitPointModified(Slot, Slot), + ConfigError(StoreConfigError), } impl From for Error { @@ -49,6 +51,12 @@ impl From for Error { } } +impl From for Error { + fn from(e: StoreConfigError) -> Error { + Error::ConfigError(e) + } +} + #[derive(Debug)] pub struct DBError { pub message: String, diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 08e810866f9..55c403aa8a2 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -7,6 +7,9 @@ use crate::impls::beacon_state::{get_full_state, store_full_state}; use crate::iter::{ParentRootBlockIterator, StateRootsIterator}; use crate::leveldb_store::LevelDB; use crate::memory_store::MemoryStore; +use crate::metadata::{ + SchemaVersion, CONFIG_KEY, CURRENT_SCHEMA_VERSION, SCHEMA_VERSION_KEY, SPLIT_KEY, +}; use crate::metrics; use crate::{ get_key_for_col, DBColumn, Error, ItemStore, KeyValueStoreOp, PartialBeaconState, StoreItem, @@ -27,9 +30,6 @@ use std::path::Path; use std::sync::Arc; use types::*; -/// 32-byte key for accessing the `split` of the freezer DB. -pub const SPLIT_DB_KEY: &str = "FREEZERDBSPLITFREEZERDBSPLITFREE"; - /// Defines how blocks should be replayed on states. #[derive(PartialEq)] pub enum BlockReplay { @@ -46,6 +46,8 @@ pub enum BlockReplay { /// intermittent "restore point" states pre-finalization. #[derive(Debug)] pub struct HotColdDB, Cold: ItemStore> { + /// The schema version. Loaded from disk on initialization. + schema_version: SchemaVersion, /// The slot and state root at the point where the database is split between hot and cold. /// /// States with slots less than `split.slot` are in the cold DB, while states with slots @@ -70,6 +72,10 @@ pub struct HotColdDB, Cold: ItemStore> { #[derive(Debug, PartialEq)] pub enum HotColdDBError { + UnsupportedSchemaVersion { + software_version: SchemaVersion, + disk_version: SchemaVersion, + }, /// Recoverable error indicating that the database freeze point couldn't be updated /// due to the finalized block not lying on an epoch boundary (should be infrequent). FreezeSlotUnaligned(Slot), @@ -106,6 +112,7 @@ impl HotColdDB, MemoryStore> { Self::verify_slots_per_restore_point(config.slots_per_restore_point)?; let db = HotColdDB { + schema_version: CURRENT_SCHEMA_VERSION, split: RwLock::new(Split::default()), cold_db: MemoryStore::open(), hot_db: MemoryStore::open(), @@ -134,6 +141,7 @@ impl HotColdDB, LevelDB> { Self::verify_slots_per_restore_point(config.slots_per_restore_point)?; let db = HotColdDB { + schema_version: CURRENT_SCHEMA_VERSION, split: RwLock::new(Split::default()), cold_db: LevelDB::open(cold_path)?, hot_db: LevelDB::open(hot_path)?, @@ -144,12 +152,33 @@ impl HotColdDB, LevelDB> { _phantom: PhantomData, }; + // Ensure that the schema version of the on-disk database matches the software. + // In the future, this would be the spot to hook in auto-migration, etc. + if let Some(schema_version) = db.load_schema_version()? { + if schema_version != CURRENT_SCHEMA_VERSION { + return Err(HotColdDBError::UnsupportedSchemaVersion { + software_version: CURRENT_SCHEMA_VERSION, + disk_version: schema_version, + } + .into()); + } + } else { + db.store_schema_version(CURRENT_SCHEMA_VERSION)?; + } + + // Ensure that any on-disk config is compatible with the supplied config. + if let Some(disk_config) = db.load_config()? { + db.config.check_compatibility(&disk_config)?; + } + db.store_config()?; + // Load the previous split slot from the database (if any). This ensures we can // stop and restart correctly. if let Some(split) = db.load_split()? { info!( db.log, "Hot-Cold DB initialized"; + "version" => db.schema_version.0, "split_slot" => split.slot, "split_state" => format!("{:?}", split.state_root) ); @@ -744,11 +773,29 @@ impl, Cold: ItemStore> HotColdDB * self.config.slots_per_restore_point } + /// Load the database schema version from disk. + fn load_schema_version(&self) -> Result, Error> { + self.hot_db.get(&SCHEMA_VERSION_KEY) + } + + /// Store the database schema version. + fn store_schema_version(&self, schema_version: SchemaVersion) -> Result<(), Error> { + self.hot_db.put(&SCHEMA_VERSION_KEY, &schema_version) + } + + /// Load previously-stored config from disk. + fn load_config(&self) -> Result, Error> { + self.hot_db.get(&CONFIG_KEY) + } + + /// Write the config to disk. + fn store_config(&self) -> Result<(), Error> { + self.hot_db.put(&CONFIG_KEY, &self.config) + } + /// Load the split point from disk. fn load_split(&self) -> Result, Error> { - let key = Hash256::from_slice(SPLIT_DB_KEY.as_bytes()); - let split: Option = self.hot_db.get(&key)?; - Ok(split) + self.hot_db.get(&SPLIT_KEY) } /// Load the state root of a restore point. @@ -927,9 +974,7 @@ pub fn migrate_database, Cold: ItemStore>( slot: frozen_head.slot, state_root: frozen_head_root, }; - store - .hot_db - .put_sync(&Hash256::from_slice(SPLIT_DB_KEY.as_bytes()), &split)?; + store.hot_db.put_sync(&SPLIT_KEY, &split)?; // Split point is now persisted in the hot database on disk. The in-memory split point // hasn't been modified elsewhere since we keep a write lock on it. It's safe to update diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 27187022686..f249be1f897 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -19,6 +19,7 @@ pub mod hot_cold_store; mod impls; mod leveldb_store; mod memory_store; +mod metadata; mod metrics; mod partial_beacon_state; @@ -153,7 +154,7 @@ pub enum DBColumn { } impl Into<&'static str> for DBColumn { - /// Returns a `&str` that can be used for keying a key-value data base. + /// Returns a `&str` prefix to be added to keys before they hit the key-value database. fn into(self) -> &'static str { match self { DBColumn::BeaconMeta => "bma", diff --git a/beacon_node/store/src/metadata.rs b/beacon_node/store/src/metadata.rs new file mode 100644 index 00000000000..2d4733d6362 --- /dev/null +++ b/beacon_node/store/src/metadata.rs @@ -0,0 +1,29 @@ +use crate::{DBColumn, Error, StoreItem}; +use ssz::{Decode, Encode}; +use types::Hash256; + +pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(1); + +// All the keys that get stored under the `BeaconMeta` column. +// +// We use `repeat_byte` because it's a const fn. +pub const SCHEMA_VERSION_KEY: Hash256 = Hash256::repeat_byte(0); +pub const CONFIG_KEY: Hash256 = Hash256::repeat_byte(1); +pub const SPLIT_KEY: Hash256 = Hash256::repeat_byte(2); + +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +pub struct SchemaVersion(pub u64); + +impl StoreItem for SchemaVersion { + fn db_column() -> DBColumn { + DBColumn::BeaconMeta + } + + fn as_store_bytes(&self) -> Vec { + self.0.as_ssz_bytes() + } + + fn from_store_bytes(bytes: &[u8]) -> Result { + Ok(SchemaVersion(u64::from_ssz_bytes(bytes)?)) + } +}