diff --git a/chain/chain/src/types.rs b/chain/chain/src/types.rs index d3906bbe7fc..4d31bafbccb 100644 --- a/chain/chain/src/types.rs +++ b/chain/chain/src/types.rs @@ -32,6 +32,7 @@ use near_primitives::version::{ MIN_PROTOCOL_VERSION_NEP_92_FIX, }; use near_primitives::views::{QueryRequest, QueryResponse}; +use near_store::flat::{FlatStorage, FlatStorageStatus}; use near_store::{PartialStorage, ShardTries, Store, StoreUpdate, Trie, WrappedTrieChanges}; pub use near_epoch_manager::EpochManagerAdapter; @@ -306,6 +307,30 @@ pub trait RuntimeAdapter: Send + Sync { fn get_flat_storage_manager(&self) -> Option; + fn get_flat_storage_for_shard(&self, shard_uid: ShardUId) -> Option; + + fn get_flat_storage_status(&self, shard_uid: ShardUId) -> FlatStorageStatus; + + /// Creates flat storage state for given shard, assuming that all flat storage data + /// is already stored in DB. + /// TODO (#7327): consider returning flat storage creation errors here + fn create_flat_storage_for_shard(&self, shard_uid: ShardUId); + + /// Removes flat storage state for shard, if it exists. + /// Used to clear old flat storage data from disk and memory before syncing to newer state. + fn remove_flat_storage_for_shard( + &self, + shard_uid: ShardUId, + epoch_id: &EpochId, + ) -> Result<(), Error>; + + fn set_flat_storage_for_genesis( + &self, + genesis_block: &CryptoHash, + genesis_block_height: BlockHeight, + genesis_epoch_id: &EpochId, + ) -> Result; + /// Validates a given signed transaction. /// If the state root is given, then the verification will use the account. Otherwise it will /// only validate the transaction math, limits and signatures. diff --git a/core/store/src/flat/inlining_migration.rs b/core/store/src/flat/inlining_migration.rs index 0926905b61a..c12a4c587ba 100644 --- a/core/store/src/flat/inlining_migration.rs +++ b/core/store/src/flat/inlining_migration.rs @@ -8,7 +8,6 @@ use crossbeam::channel; use itertools::Itertools; use near_primitives::hash::CryptoHash; use near_primitives::shard_layout::ShardUId; -use near_primitives::state::FlatStateValue; use tracing::{debug, info}; use crate::flat::store_helper::set_flat_state_values_inlining_migration_status; @@ -22,7 +21,7 @@ use super::store_helper::{ decode_flat_state_db_key, get_flat_state_values_inlining_migration_status, }; use super::types::{FlatStateValuesInliningMigrationStatus, INLINE_DISK_VALUE_THRESHOLD}; -use super::FlatStorageManager; +use super::{FlatStateValue, FlatStorageManager}; pub struct FlatStateValuesInliningMigrationHandle { handle: JoinHandle<()>, @@ -285,11 +284,10 @@ mod tests { use borsh::{BorshDeserialize, BorshSerialize}; use near_primitives::hash::hash; use near_primitives::shard_layout::ShardLayout; - use near_primitives::state::FlatStateValue; use crate::flat::store_helper::encode_flat_state_db_key; use crate::flat::types::INLINE_DISK_VALUE_THRESHOLD; - use crate::flat::FlatStorageManager; + use crate::flat::{FlatStateValue, FlatStorageManager}; use crate::{DBCol, NodeStorage, TrieCachingStorage}; use super::inline_flat_state_values; diff --git a/core/store/src/flat/store_helper.rs b/core/store/src/flat/store_helper.rs index b3cd1320e98..072c819c0cb 100644 --- a/core/store/src/flat/store_helper.rs +++ b/core/store/src/flat/store_helper.rs @@ -1,59 +1,55 @@ //! This file contains helper functions for accessing flat storage data in DB //! TODO(#8577): remove this file and move functions to the corresponding structs -use std::io; - use crate::db::FLAT_STATE_VALUES_INLINING_MIGRATION_STATUS_KEY; use crate::flat::delta::{FlatStateChanges, KeyForFlatStateDelta}; use crate::flat::types::FlatStorageError; use crate::{DBCol, Store, StoreUpdate}; -use borsh::BorshDeserialize; +use near_primitives::errors::StorageError; use near_primitives::hash::CryptoHash; -use near_primitives::shard_layout::ShardUId; -use near_primitives::state::FlatStateValue; +use near_primitives::shard_layout::{ShardLayout, ShardUId}; use super::delta::{FlatStateDelta, FlatStateDeltaMetadata}; -use super::types::{ - FlatStateIterator, FlatStateValuesInliningMigrationStatus, FlatStorageResult, FlatStorageStatus, -}; +use super::types::{FlatStateValue, FlatStateValuesInliningMigrationStatus, FlatStorageStatus}; + +/// Prefixes for keys in `FlatStateMisc` DB column. +pub const FLAT_STATE_HEAD_KEY_PREFIX: &[u8; 4] = b"HEAD"; +pub const FLAT_STATE_CREATION_STATUS_KEY_PREFIX: &[u8; 6] = b"STATUS"; pub fn get_delta_changes( store: &Store, shard_uid: ShardUId, block_hash: CryptoHash, -) -> FlatStorageResult> { +) -> Result, FlatStorageError> { let key = KeyForFlatStateDelta { shard_uid, block_hash }; - store.get_ser::(DBCol::FlatStateChanges, &key.to_bytes()).map_err(|err| { - FlatStorageError::StorageInternalError(format!( - "failed to read delta changes for {key:?}: {err}" - )) - }) + Ok(store + .get_ser::(DBCol::FlatStateChanges, &key.to_bytes()) + .map_err(|_| FlatStorageError::StorageInternalError)?) } pub fn get_all_deltas_metadata( store: &Store, shard_uid: ShardUId, -) -> FlatStorageResult> { +) -> Result, FlatStorageError> { store .iter_prefix_ser(DBCol::FlatStateDeltaMetadata, &shard_uid.to_bytes()) - .map(|res| { - res.map(|(_, value)| value).map_err(|err| { - FlatStorageError::StorageInternalError(format!( - "failed to read delta metadata: {err}" - )) - }) - }) + .map(|res| res.map(|(_, value)| value).map_err(|_| FlatStorageError::StorageInternalError)) .collect() } -pub fn set_delta(store_update: &mut StoreUpdate, shard_uid: ShardUId, delta: &FlatStateDelta) { +pub fn set_delta( + store_update: &mut StoreUpdate, + shard_uid: ShardUId, + delta: &FlatStateDelta, +) -> Result<(), FlatStorageError> { let key = KeyForFlatStateDelta { shard_uid, block_hash: delta.metadata.block.hash }.to_bytes(); store_update .set_ser(DBCol::FlatStateChanges, &key, &delta.changes) - .expect("Borsh should not have failed here"); + .map_err(|_| FlatStorageError::StorageInternalError)?; store_update .set_ser(DBCol::FlatStateDeltaMetadata, &key, &delta.metadata) - .expect("Borsh should not have failed here"); + .map_err(|_| FlatStorageError::StorageInternalError)?; + Ok(()) } pub fn remove_delta(store_update: &mut StoreUpdate, shard_uid: ShardUId, block_hash: CryptoHash) { @@ -62,19 +58,11 @@ pub fn remove_delta(store_update: &mut StoreUpdate, shard_uid: ShardUId, block_h store_update.delete(DBCol::FlatStateDeltaMetadata, &key); } -fn remove_range_by_shard_uid(store_update: &mut StoreUpdate, shard_uid: ShardUId, col: DBCol) { +pub fn remove_all_deltas(store_update: &mut StoreUpdate, shard_uid: ShardUId) { let key_from = shard_uid.to_bytes(); let key_to = ShardUId::next_shard_prefix(&key_from); - store_update.delete_range(col, &key_from, &key_to); -} - -pub fn remove_all_deltas(store_update: &mut StoreUpdate, shard_uid: ShardUId) { - remove_range_by_shard_uid(store_update, shard_uid, DBCol::FlatStateChanges); - remove_range_by_shard_uid(store_update, shard_uid, DBCol::FlatStateDeltaMetadata); -} - -pub fn remove_all_flat_state_values(store_update: &mut StoreUpdate, shard_uid: ShardUId) { - remove_range_by_shard_uid(store_update, shard_uid, DBCol::FlatState); + store_update.delete_range(DBCol::FlatStateChanges, &key_from, &key_to); + store_update.delete_range(DBCol::FlatStateDeltaMetadata, &key_from, &key_to); } pub(crate) fn encode_flat_state_db_key(shard_uid: ShardUId, key: &[u8]) -> Vec { @@ -84,60 +72,50 @@ pub(crate) fn encode_flat_state_db_key(shard_uid: ShardUId, key: &[u8]) -> Vec io::Result<(ShardUId, Vec)> { +pub(crate) fn decode_flat_state_db_key( + key: &Box<[u8]>, +) -> Result<(ShardUId, Vec), StorageError> { if key.len() < 8 { - return Err(io::Error::new( - io::ErrorKind::Other, - format!("expected FlatState key length to be at least 8: {key:?}"), - )); + return Err(StorageError::StorageInconsistentState(format!( + "Found key in flat storage with length < 8: {key:?}" + ))); } let (shard_uid_bytes, trie_key) = key.split_at(8); - let shard_uid = shard_uid_bytes.try_into().map_err(|err| { - io::Error::new( - io::ErrorKind::Other, - format!("failed to decode shard_uid as part of FlatState key: {err}"), - ) + let shard_uid = shard_uid_bytes.try_into().map_err(|_| { + StorageError::StorageInconsistentState(format!( + "Incorrect raw shard uid: {shard_uid_bytes:?}" + )) })?; Ok((shard_uid, trie_key.to_vec())) } pub fn get_flat_state_values_inlining_migration_status( store: &Store, -) -> FlatStorageResult { +) -> Result { store .get_ser(DBCol::Misc, FLAT_STATE_VALUES_INLINING_MIGRATION_STATUS_KEY) .map(|status| status.unwrap_or(FlatStateValuesInliningMigrationStatus::Empty)) - .map_err(|err| { - FlatStorageError::StorageInternalError(format!( - "failed to read FlatState values inlining migration status: {err}" - )) - }) + .map_err(|_| FlatStorageError::StorageInternalError) } pub fn set_flat_state_values_inlining_migration_status( store: &Store, status: FlatStateValuesInliningMigrationStatus, -) -> FlatStorageResult<()> { +) -> Result<(), FlatStorageError> { let mut store_update = store.store_update(); store_update .set_ser(DBCol::Misc, FLAT_STATE_VALUES_INLINING_MIGRATION_STATUS_KEY, &status) - .expect("Borsh should not have failed here"); - store_update.commit().map_err(|err| { - FlatStorageError::StorageInternalError(format!( - "failed to commit FlatState values inlining migration status: {err}" - )) - }) + .map_err(|_| FlatStorageError::StorageInternalError)?; + store_update.commit().map_err(|_| FlatStorageError::StorageInternalError) } pub(crate) fn get_flat_state_value( store: &Store, shard_uid: ShardUId, key: &[u8], -) -> FlatStorageResult> { +) -> Result, FlatStorageError> { let db_key = encode_flat_state_db_key(shard_uid, key); - store.get_ser(DBCol::FlatState, &db_key).map_err(|err| { - FlatStorageError::StorageInternalError(format!("failed to read FlatState value: {err}")) - }) + store.get_ser(DBCol::FlatState, &db_key).map_err(|_| FlatStorageError::StorageInternalError) } // TODO(#8577): make pub(crate) once flat storage creator is moved inside `flat` module. @@ -146,28 +124,21 @@ pub fn set_flat_state_value( shard_uid: ShardUId, key: Vec, value: Option, -) { +) -> Result<(), FlatStorageError> { let db_key = encode_flat_state_db_key(shard_uid, &key); match value { Some(value) => store_update .set_ser(DBCol::FlatState, &db_key, &value) - .expect("Borsh should not have failed here"), - None => store_update.delete(DBCol::FlatState, &db_key), + .map_err(|_| FlatStorageError::StorageInternalError), + None => Ok(store_update.delete(DBCol::FlatState, &db_key)), } } -pub fn get_flat_storage_status( - store: &Store, - shard_uid: ShardUId, -) -> FlatStorageResult { +pub fn get_flat_storage_status(store: &Store, shard_uid: ShardUId) -> FlatStorageStatus { store .get_ser(DBCol::FlatStorageStatus, &shard_uid.to_bytes()) - .map(|status| status.unwrap_or(FlatStorageStatus::Empty)) - .map_err(|err| { - FlatStorageError::StorageInternalError(format!( - "failed to read flat storage status: {err}" - )) - }) + .expect("Error reading flat head from storage") + .unwrap_or(FlatStorageStatus::Empty) } pub fn set_flat_storage_status( @@ -177,19 +148,19 @@ pub fn set_flat_storage_status( ) { store_update .set_ser(DBCol::FlatStorageStatus, &shard_uid.to_bytes(), &status) - .expect("Borsh should not have failed here") + .expect("Borsh should not fail") } /// Returns iterator over flat storage entries for a given shard and range of /// state keys. `None` means that there is no bound in respective direction. /// It reads data only from `FlatState` column which represents the state at -/// flat storage head. Reads only commited changes. +/// flat storage head. pub fn iter_flat_state_entries<'a>( shard_uid: ShardUId, store: &'a Store, from: Option<&[u8]>, to: Option<&[u8]>, -) -> FlatStateIterator<'a> { +) -> impl Iterator, Box<[u8]>)> + 'a { // If left direction is unbounded, encoded `shard_uid` serves as the // smallest possible key in DB for the shard. let db_key_from = match from { @@ -203,68 +174,24 @@ pub fn iter_flat_state_entries<'a>( Some(to) => encode_flat_state_db_key(shard_uid, to), None => ShardUId::next_shard_prefix(&shard_uid.to_bytes()).to_vec(), }; - let iter = - store.iter_range(DBCol::FlatState, Some(&db_key_from), Some(&db_key_to)).map(|result| { - match result { - Ok((key, value)) => Ok(( - decode_flat_state_db_key(&key) - .map_err(|err| { - FlatStorageError::StorageInternalError(format!( - "invalid FlatState key format: {err}" - )) - })? - .1, - FlatStateValue::try_from_slice(&value).map_err(|err| { - FlatStorageError::StorageInternalError(format!( - "invalid FlatState value format: {err}" - )) - })?, - )), - Err(err) => Err(FlatStorageError::StorageInternalError(format!( - "FlatState iterator error: {err}" - ))), + store.iter_range(DBCol::FlatState, Some(&db_key_from), Some(&db_key_to)).filter_map( + move |result| { + if let Ok((key, value)) = result { + let (_, trie_key) = decode_flat_state_db_key(&key).unwrap(); + return Some((trie_key, value)); } - }); - Box::new(iter) -} - -#[cfg(test)] -mod tests { - use crate::flat::store_helper::set_flat_state_value; - use crate::test_utils::create_test_store; - use near_primitives::shard_layout::ShardUId; - use near_primitives::state::FlatStateValue; - - #[test] - fn iter_flat_state_entries() { - // Setup shards and store - let store = create_test_store(); - let shard_uids = [0, 1, 2].map(|id| ShardUId { version: 0, shard_id: id }); - - for (i, shard_uid) in shard_uids.iter().enumerate() { - let mut store_update = store.store_update(); - let key: Vec = vec![0, 1, i as u8]; - let val: Vec = vec![0, 1, 2, i as u8]; - - // Add value to FlatState - set_flat_state_value( - &mut store_update, - *shard_uid, - key.clone(), - Some(FlatStateValue::inlined(&val)), - ); - - store_update.commit().unwrap(); - } - - for (i, shard_uid) in shard_uids.iter().enumerate() { - let entries: Vec<_> = - super::iter_flat_state_entries(*shard_uid, &store, None, None).collect(); - assert_eq!(entries.len(), 1); - let key: Vec = vec![0, 1, i as u8]; - let val: Vec = vec![0, 1, 2, i as u8]; - - assert_eq!(entries, vec![Ok((key, FlatStateValue::inlined(&val)))]); - } - } + return None; + }, + ) +} + +/// Currently all the data in flat storage is 'together' - so we have to parse the key, +/// to see if this element belongs to this shard. +pub fn key_belongs_to_shard( + key: &Box<[u8]>, + shard_layout: &ShardLayout, + shard_id: u64, +) -> Result { + let (key_shard_uid, _) = decode_flat_state_db_key(key)?; + Ok(key_shard_uid.version == shard_layout.version() && key_shard_uid.shard_id as u64 == shard_id) } diff --git a/core/store/src/flat/types.rs b/core/store/src/flat/types.rs index a10449ab599..541024879f5 100644 --- a/core/store/src/flat/types.rs +++ b/core/store/src/flat/types.rs @@ -60,6 +60,13 @@ pub enum FlatStateValuesInliningMigrationStatus { Finished, } +#[derive(BorshSerialize, BorshDeserialize, Debug, PartialEq, Eq)] +pub enum FlatStateValuesInliningMigrationStatus { + Empty, + InProgress, + Finished, +} + #[derive(BorshSerialize, BorshDeserialize, Debug, PartialEq, Eq)] pub enum FlatStorageStatus { /// Flat Storage is not supported. diff --git a/nearcore/src/lib.rs b/nearcore/src/lib.rs index ce0b91c571f..cae9feef218 100644 --- a/nearcore/src/lib.rs +++ b/nearcore/src/lib.rs @@ -10,7 +10,7 @@ use cold_storage::ColdStoreLoopHandle; use near_async::actix::AddrWithAutoSpanContextExt; use near_async::messaging::{IntoSender, LateBoundSender}; use near_async::time; -use near_chain::state_snapshot_actor::{get_make_snapshot_callback, StateSnapshotActor}; +use near_chain::types::RuntimeAdapter; use near_chain::{Chain, ChainGenesis}; use near_chunks::shards_manager_actor::start_shards_manager; use near_client::{start_client, start_view_client, ClientActor, ConfigUpdater, ViewClientActor}; @@ -18,7 +18,7 @@ use near_epoch_manager::shard_tracker::{ShardTracker, TrackedConfig}; use near_epoch_manager::EpochManager; use near_network::PeerManagerActor; use near_primitives::block::GenesisId; -use near_store::flat::FlatStorageManager; +use near_store::flat::FlatStateValuesInliningMigrationHandle; use near_store::metadata::DbKind; use near_store::metrics::spawn_db_metrics_loop; use near_store::{DBCol, Mode, NodeStorage, Store, StoreOpenerError}; @@ -152,9 +152,6 @@ pub fn open_storage(home_dir: &Path, near_config: &mut NearConfig) -> anyhow::Re Err(StoreOpenerError::MigrationError(err)) => { Err(err) }, - Err(StoreOpenerError::CheckpointError(err)) => { - Err(err) - }, }.with_context(|| format!("unable to open database at {}", opener.path().display()))?; near_config.config.archive = storage.is_archive()?; @@ -198,6 +195,9 @@ pub struct NearNode { pub cold_store_loop_handle: Option, /// Contains handles to background threads that may be dumping state to S3. pub state_sync_dump_handle: Option, + /// A handle to control background flat state values inlining migration. + /// Needed temporarily, will be removed after the migration is completed. + pub flat_state_migration_handle: Option, } pub fn start_with_config(home_dir: &Path, config: NearConfig) -> anyhow::Result { @@ -271,11 +271,6 @@ pub fn start_with_config_and_synchronization( let client_adapter_for_shards_manager = Arc::new(LateBoundSender::default()); let adv = near_client::adversarial::Controls::new(config.client_config.archive); - let state_snapshot_actor = Arc::new( - StateSnapshotActor::new(FlatStorageManager::new(storage.get_hot_store()), runtime.clone()) - .start(), - ); - let view_client = start_view_client( config.validator_signer.as_ref().map(|signer| signer.validator_id().clone()), chain_genesis.clone(), @@ -297,10 +292,6 @@ pub fn start_with_config_and_synchronization( shards_manager_adapter.as_sender(), config.validator_signer.clone(), telemetry, - Some(get_make_snapshot_callback( - state_snapshot_actor, - FlatStorageManager::new(storage.get_hot_store()), - )), shutdown_signal, adv, config_updater, @@ -317,6 +308,18 @@ pub fn start_with_config_and_synchronization( ); shards_manager_adapter.bind(shards_manager_actor); + let flat_state_migration_handle = + if let Some(flat_storage_manager) = runtime.get_flat_storage_manager() { + let handle = FlatStateValuesInliningMigrationHandle::start_background_migration( + storage.get_hot_store(), + flat_storage_manager, + config.client_config.client_background_migration_threads, + ); + Some(handle) + } else { + None + }; + let state_sync_dump_handle = spawn_state_sync_dump( &config.client_config, chain_genesis, @@ -380,6 +383,7 @@ pub fn start_with_config_and_synchronization( arbiters, cold_store_loop_handle, state_sync_dump_handle, + flat_state_migration_handle, }) } diff --git a/nearcore/src/runtime/mod.rs b/nearcore/src/runtime/mod.rs index 0e874d3a2a3..2c1be30b558 100644 --- a/nearcore/src/runtime/mod.rs +++ b/nearcore/src/runtime/mod.rs @@ -864,6 +864,10 @@ impl RuntimeAdapter for NightshadeRuntime { Ok(self.tries.get_view_trie_for_shard(shard_uid, state_root)) } + fn get_flat_storage_manager(&self) -> Option { + Some(self.flat_storage_manager.clone()) + } + fn get_flat_storage_for_shard(&self, shard_uid: ShardUId) -> Option { self.flat_storage_manager.get_flat_storage_for_shard(shard_uid) } diff --git a/tools/flat-storage/src/commands.rs b/tools/flat-storage/src/commands.rs index b99adae30d7..6464fac8d5c 100644 --- a/tools/flat-storage/src/commands.rs +++ b/tools/flat-storage/src/commands.rs @@ -11,6 +11,7 @@ use near_store::flat::{ }; use near_store::{DBCol, Mode, NodeStorage, ShardUId, Store, StoreOpener}; use nearcore::{load_config, NearConfig, NightshadeRuntime}; +use std::sync::atomic::AtomicBool; use std::{path::PathBuf, sync::Arc, time::Duration}; use tqdm::tqdm; @@ -318,6 +319,7 @@ impl FlatStorageCommand { inline_flat_state_values( store, &flat_storage_manager, + &AtomicBool::new(true), cmd.num_threads, cmd.batch_size, );