diff --git a/chain/chain/src/store.rs b/chain/chain/src/store.rs index 3023f287558..334ddcd7d51 100644 --- a/chain/chain/src/store.rs +++ b/chain/chain/src/store.rs @@ -2568,7 +2568,8 @@ impl<'a> ChainStoreUpdate<'a> { | DBCol::FlatState | DBCol::FlatStateChanges | DBCol::FlatStateDeltaMetadata - | DBCol::FlatStorageStatus => { + | DBCol::FlatStorageStatus + | DBCol::Misc => { unreachable!(); } } diff --git a/chain/chain/src/test_utils/kv_runtime.rs b/chain/chain/src/test_utils/kv_runtime.rs index 664f8b3bc52..c28e5e33589 100644 --- a/chain/chain/src/test_utils/kv_runtime.rs +++ b/chain/chain/src/test_utils/kv_runtime.rs @@ -960,6 +960,10 @@ impl RuntimeAdapter for KeyValueRuntime { .get_trie_for_shard(ShardUId { version: 0, shard_id: shard_id as u32 }, state_root)) } + fn get_flat_storage_manager(&self) -> Option { + None + } + fn get_view_trie_for_shard( &self, shard_id: ShardId, diff --git a/chain/chain/src/types.rs b/chain/chain/src/types.rs index 2a26a694247..34a2b314661 100644 --- a/chain/chain/src/types.rs +++ b/chain/chain/src/types.rs @@ -5,6 +5,7 @@ use borsh::{BorshDeserialize, BorshSerialize}; use chrono::DateTime; use chrono::Utc; use near_primitives::sandbox::state_patch::SandboxStatePatch; +use near_store::flat::FlatStorageManager; use num_rational::Rational32; use crate::metrics; @@ -299,6 +300,8 @@ pub trait RuntimeAdapter: Send + Sync { state_root: StateRoot, ) -> Result; + fn get_flat_storage_manager(&self) -> Option; + fn get_flat_storage_for_shard(&self, shard_uid: ShardUId) -> Option; fn get_flat_storage_status(&self, shard_uid: ShardUId) -> FlatStorageStatus; diff --git a/core/store/src/columns.rs b/core/store/src/columns.rs index f0fcdbdb98a..682a670321d 100644 --- a/core/store/src/columns.rs +++ b/core/store/src/columns.rs @@ -23,7 +23,7 @@ pub enum DBCol { /// - *Rows*: single row `"VERSION"` /// - *Content type*: The version of the database (u32), serialized as JSON. DbVersion, - /// Column that store Misc cells. + /// Column that stores miscellaneous block-related cells. /// - *Rows*: multiple, for example `"GENESIS_JSON_HASH"`, `"HEAD_KEY"`, `"LATEST_KNOWN_KEY"` etc. /// - *Content type*: cell specific. BlockMisc, @@ -273,6 +273,11 @@ pub enum DBCol { /// - *Rows*: `shard_uid` /// - *Column type*: `FlatStorageStatus` FlatStorageStatus, + /// Column to persist pieces of miscellaneous small data. Should only be used to store + /// constant or small (for example per-shard) amount of data. + /// - *Rows*: arbitrary string, see `crate::db::FLAT_STATE_VALUES_INLINING_MIGRATION_STATUS_KEY` for example + /// - *Column type*: arbitrary bytes + Misc, } /// Defines different logical parts of a db key. @@ -422,6 +427,7 @@ impl DBCol { // TODO DBCol::ChallengedBlocks => false, + DBCol::Misc => false, // BlockToCatchup is only needed while syncing and it is not immutable. DBCol::BlocksToCatchup => false, // BlockRefCount is only needed when handling forks and it is not immutable. @@ -479,6 +485,7 @@ impl DBCol { match self { DBCol::DbVersion => &[DBKeyType::StringLiteral], DBCol::BlockMisc => &[DBKeyType::StringLiteral], + DBCol::Misc => &[DBKeyType::StringLiteral], DBCol::Block => &[DBKeyType::BlockHash], DBCol::BlockHeader => &[DBKeyType::BlockHash], DBCol::BlockHeight => &[DBKeyType::BlockHeight], diff --git a/core/store/src/db.rs b/core/store/src/db.rs index 378f06c348f..2b4aa1bdf18 100644 --- a/core/store/src/db.rs +++ b/core/store/src/db.rs @@ -20,6 +20,7 @@ pub use self::splitdb::SplitDB; pub use self::slice::DBSlice; pub use self::testdb::TestDB; +// `DBCol::BlockMisc` keys pub const HEAD_KEY: &[u8; 4] = b"HEAD"; pub const TAIL_KEY: &[u8; 4] = b"TAIL"; pub const CHUNK_TAIL_KEY: &[u8; 10] = b"CHUNK_TAIL"; @@ -32,6 +33,10 @@ pub const GENESIS_JSON_HASH_KEY: &[u8; 17] = b"GENESIS_JSON_HASH"; pub const GENESIS_STATE_ROOTS_KEY: &[u8; 19] = b"GENESIS_STATE_ROOTS"; pub const COLD_HEAD_KEY: &[u8; 9] = b"COLD_HEAD"; +// `DBCol::Misc` keys +pub const FLAT_STATE_VALUES_INLINING_MIGRATION_STATUS_KEY: &[u8] = + b"FLAT_STATE_VALUES_INLINING_MIGRATION_STATUS"; + #[derive(Default, Debug)] pub struct DBTransaction { pub(crate) ops: Vec, diff --git a/core/store/src/flat/inlining_migration.rs b/core/store/src/flat/inlining_migration.rs index 20ba653cc3e..c12a4c587ba 100644 --- a/core/store/src/flat/inlining_migration.rs +++ b/core/store/src/flat/inlining_migration.rs @@ -1,4 +1,7 @@ use std::collections::HashMap; +use std::sync::atomic::AtomicBool; +use std::sync::Arc; +use std::thread::JoinHandle; use borsh::{BorshDeserialize, BorshSerialize}; use crossbeam::channel; @@ -7,16 +10,70 @@ use near_primitives::hash::CryptoHash; use near_primitives::shard_layout::ShardUId; use tracing::{debug, info}; +use crate::flat::store_helper::set_flat_state_values_inlining_migration_status; use crate::metrics::flat_state_metrics::inlining_migration::{ FLAT_STATE_PAUSED_DURATION, INLINED_COUNT, INLINED_TOTAL_VALUES_SIZE, PROCESSED_COUNT, PROCESSED_TOTAL_VALUES_SIZE, SKIPPED_COUNT, }; use crate::{DBCol, Store, TrieDBStorage, TrieStorage}; -use super::store_helper::decode_flat_state_db_key; -use super::types::INLINE_DISK_VALUE_THRESHOLD; +use super::store_helper::{ + decode_flat_state_db_key, get_flat_state_values_inlining_migration_status, +}; +use super::types::{FlatStateValuesInliningMigrationStatus, INLINE_DISK_VALUE_THRESHOLD}; use super::{FlatStateValue, FlatStorageManager}; +pub struct FlatStateValuesInliningMigrationHandle { + handle: JoinHandle<()>, + keep_running: Arc, +} + +const BACKGROUND_MIGRATION_BATCH_SIZE: usize = 50_000; + +impl FlatStateValuesInliningMigrationHandle { + pub fn start_background_migration( + store: Store, + flat_storage_manager: FlatStorageManager, + read_state_threads: usize, + ) -> Self { + let keep_running = Arc::new(AtomicBool::new(true)); + let keep_runnning_clone = keep_running.clone(); + let handle = std::thread::spawn(move || { + let status = get_flat_state_values_inlining_migration_status(&store) + .expect("failed to read fs migration status"); + info!(target: "store", ?status, "Read FlatState values inlining migration status"); + if status == FlatStateValuesInliningMigrationStatus::Finished { + return; + } + set_flat_state_values_inlining_migration_status( + &store, + FlatStateValuesInliningMigrationStatus::InProgress, + ) + .expect("failed to set fs migration in-progress status"); + let completed = inline_flat_state_values( + store.clone(), + &flat_storage_manager, + &keep_running, + read_state_threads, + BACKGROUND_MIGRATION_BATCH_SIZE, + ); + if completed { + set_flat_state_values_inlining_migration_status( + &store, + FlatStateValuesInliningMigrationStatus::Finished, + ) + .expect("failed to set fs migration finished status"); + } + }); + Self { handle, keep_running: keep_runnning_clone } + } + + pub fn stop(self) { + self.keep_running.store(false, std::sync::atomic::Ordering::Relaxed); + self.handle.join().expect("join should not fail here"); + } +} + struct ReadValueRequest { shard_uid: ShardUId, value_hash: CryptoHash, @@ -33,7 +90,7 @@ struct StateValueReader { pending_requests: usize, value_request_send: channel::Sender, value_response_recv: channel::Receiver, - join_handles: Vec>, + join_handles: Vec>, } impl StateValueReader { @@ -110,16 +167,23 @@ impl StateValueReader { pub fn inline_flat_state_values( store: Store, flat_storage_manager: &FlatStorageManager, + keep_running: &AtomicBool, read_state_threads: usize, batch_size: usize, -) { +) -> bool { info!(target: "store", %read_state_threads, %batch_size, "Starting FlatState value inlining migration"); let migration_start = std::time::Instant::now(); let mut value_reader = StateValueReader::new(store.clone(), read_state_threads); let mut inlined_total_count = 0; + let mut interrupted = false; for (batch_index, batch) in store.iter(DBCol::FlatState).chunks(batch_size).into_iter().enumerate() { + if !keep_running.load(std::sync::atomic::Ordering::Relaxed) { + info!(target: "store", %batch_index, "FlatState value inlining migration was interrupted"); + interrupted = true; + break; + } let (mut min_key, mut max_key) = (None, None); for entry in batch { PROCESSED_COUNT.inc(); @@ -204,7 +268,8 @@ pub fn inline_flat_state_values( } value_reader.close(); let migration_elapsed = migration_start.elapsed(); - info!(target: "store", %inlined_total_count, ?migration_elapsed, "Finished FlatState value inlining migration"); + info!(target: "store", %inlined_total_count, ?migration_elapsed, %interrupted, "Finished FlatState value inlining migration"); + !interrupted } fn log_skipped(reason: &str, err: impl std::error::Error) { @@ -214,6 +279,8 @@ fn log_skipped(reason: &str, err: impl std::error::Error) { #[cfg(test)] mod tests { + use std::sync::atomic::AtomicBool; + use borsh::{BorshDeserialize, BorshSerialize}; use near_primitives::hash::hash; use near_primitives::shard_layout::ShardLayout; @@ -243,7 +310,13 @@ mod tests { } store_update.commit().unwrap(); } - inline_flat_state_values(store.clone(), &FlatStorageManager::new(store.clone()), 2, 4); + inline_flat_state_values( + store.clone(), + &FlatStorageManager::new(store.clone()), + &AtomicBool::new(true), + 2, + 4, + ); assert_eq!( store .iter(DBCol::FlatState) diff --git a/core/store/src/flat/mod.rs b/core/store/src/flat/mod.rs index 4461834b6f4..07de86409b2 100644 --- a/core/store/src/flat/mod.rs +++ b/core/store/src/flat/mod.rs @@ -36,7 +36,7 @@ mod types; pub use chunk_view::FlatStorageChunkView; pub use delta::{FlatStateChanges, FlatStateDelta, FlatStateDeltaMetadata}; -pub use inlining_migration::inline_flat_state_values; +pub use inlining_migration::{inline_flat_state_values, FlatStateValuesInliningMigrationHandle}; pub use manager::FlatStorageManager; pub use metrics::FlatStorageCreationMetrics; pub use storage::FlatStorage; diff --git a/core/store/src/flat/store_helper.rs b/core/store/src/flat/store_helper.rs index 01b1296286b..072c819c0cb 100644 --- a/core/store/src/flat/store_helper.rs +++ b/core/store/src/flat/store_helper.rs @@ -1,6 +1,7 @@ //! This file contains helper functions for accessing flat storage data in DB //! TODO(#8577): remove this file and move functions to the corresponding structs +use crate::db::FLAT_STATE_VALUES_INLINING_MIGRATION_STATUS_KEY; use crate::flat::delta::{FlatStateChanges, KeyForFlatStateDelta}; use crate::flat::types::FlatStorageError; use crate::{DBCol, Store, StoreUpdate}; @@ -9,7 +10,7 @@ use near_primitives::hash::CryptoHash; use near_primitives::shard_layout::{ShardLayout, ShardUId}; use super::delta::{FlatStateDelta, FlatStateDeltaMetadata}; -use super::types::{FlatStateValue, FlatStorageStatus}; +use super::types::{FlatStateValue, FlatStateValuesInliningMigrationStatus, FlatStorageStatus}; /// Prefixes for keys in `FlatStateMisc` DB column. pub const FLAT_STATE_HEAD_KEY_PREFIX: &[u8; 4] = b"HEAD"; @@ -88,6 +89,26 @@ pub(crate) fn decode_flat_state_db_key( Ok((shard_uid, trie_key.to_vec())) } +pub fn get_flat_state_values_inlining_migration_status( + store: &Store, +) -> Result { + store + .get_ser(DBCol::Misc, FLAT_STATE_VALUES_INLINING_MIGRATION_STATUS_KEY) + .map(|status| status.unwrap_or(FlatStateValuesInliningMigrationStatus::Empty)) + .map_err(|_| FlatStorageError::StorageInternalError) +} + +pub fn set_flat_state_values_inlining_migration_status( + store: &Store, + status: FlatStateValuesInliningMigrationStatus, +) -> Result<(), FlatStorageError> { + let mut store_update = store.store_update(); + store_update + .set_ser(DBCol::Misc, FLAT_STATE_VALUES_INLINING_MIGRATION_STATUS_KEY, &status) + .map_err(|_| FlatStorageError::StorageInternalError)?; + store_update.commit().map_err(|_| FlatStorageError::StorageInternalError) +} + pub(crate) fn get_flat_state_value( store: &Store, shard_uid: ShardUId, diff --git a/core/store/src/flat/types.rs b/core/store/src/flat/types.rs index ba6b2be1065..ef6d5f88637 100644 --- a/core/store/src/flat/types.rs +++ b/core/store/src/flat/types.rs @@ -74,6 +74,13 @@ impl From for StorageError { } } +#[derive(BorshSerialize, BorshDeserialize, Debug, PartialEq, Eq)] +pub enum FlatStateValuesInliningMigrationStatus { + Empty, + InProgress, + Finished, +} + #[derive(BorshSerialize, BorshDeserialize, Debug, PartialEq, Eq)] pub enum FlatStorageStatus { /// Flat Storage is not supported. diff --git a/nearcore/src/lib.rs b/nearcore/src/lib.rs index 3eacaaddbb3..cae9feef218 100644 --- a/nearcore/src/lib.rs +++ b/nearcore/src/lib.rs @@ -10,6 +10,7 @@ use cold_storage::ColdStoreLoopHandle; use near_async::actix::AddrWithAutoSpanContextExt; use near_async::messaging::{IntoSender, LateBoundSender}; use near_async::time; +use near_chain::types::RuntimeAdapter; use near_chain::{Chain, ChainGenesis}; use near_chunks::shards_manager_actor::start_shards_manager; use near_client::{start_client, start_view_client, ClientActor, ConfigUpdater, ViewClientActor}; @@ -17,6 +18,7 @@ use near_epoch_manager::shard_tracker::{ShardTracker, TrackedConfig}; use near_epoch_manager::EpochManager; use near_network::PeerManagerActor; use near_primitives::block::GenesisId; +use near_store::flat::FlatStateValuesInliningMigrationHandle; use near_store::metadata::DbKind; use near_store::metrics::spawn_db_metrics_loop; use near_store::{DBCol, Mode, NodeStorage, Store, StoreOpenerError}; @@ -193,6 +195,9 @@ pub struct NearNode { pub cold_store_loop_handle: Option, /// Contains handles to background threads that may be dumping state to S3. pub state_sync_dump_handle: Option, + /// A handle to control background flat state values inlining migration. + /// Needed temporarily, will be removed after the migration is completed. + pub flat_state_migration_handle: Option, } pub fn start_with_config(home_dir: &Path, config: NearConfig) -> anyhow::Result { @@ -303,6 +308,18 @@ pub fn start_with_config_and_synchronization( ); shards_manager_adapter.bind(shards_manager_actor); + let flat_state_migration_handle = + if let Some(flat_storage_manager) = runtime.get_flat_storage_manager() { + let handle = FlatStateValuesInliningMigrationHandle::start_background_migration( + storage.get_hot_store(), + flat_storage_manager, + config.client_config.client_background_migration_threads, + ); + Some(handle) + } else { + None + }; + let state_sync_dump_handle = spawn_state_sync_dump( &config.client_config, chain_genesis, @@ -366,6 +383,7 @@ pub fn start_with_config_and_synchronization( arbiters, cold_store_loop_handle, state_sync_dump_handle, + flat_state_migration_handle, }) } diff --git a/nearcore/src/runtime/mod.rs b/nearcore/src/runtime/mod.rs index 3790d0bb5bd..98a05359374 100644 --- a/nearcore/src/runtime/mod.rs +++ b/nearcore/src/runtime/mod.rs @@ -733,6 +733,10 @@ impl RuntimeAdapter for NightshadeRuntime { Ok(self.tries.get_view_trie_for_shard(shard_uid, state_root)) } + fn get_flat_storage_manager(&self) -> Option { + Some(self.flat_storage_manager.clone()) + } + fn get_flat_storage_for_shard(&self, shard_uid: ShardUId) -> Option { self.flat_storage_manager.get_flat_storage_for_shard(shard_uid) } diff --git a/neard/src/cli.rs b/neard/src/cli.rs index f8156b523df..4cf2b3d19e4 100644 --- a/neard/src/cli.rs +++ b/neard/src/cli.rs @@ -518,6 +518,7 @@ impl RunCmd { rpc_servers, cold_store_loop_handle, state_sync_dump_handle, + flat_state_migration_handle, .. } = nearcore::start_with_config_and_synchronization( home_dir, @@ -544,6 +545,9 @@ impl RunCmd { if let Some(handle) = state_sync_dump_handle { handle.stop() } + if let Some(handle) = flat_state_migration_handle { + handle.stop(); + } futures::future::join_all(rpc_servers.iter().map(|(name, server)| async move { server.stop(true).await; debug!(target: "neard", "{} server stopped", name); diff --git a/tools/flat-storage/src/commands.rs b/tools/flat-storage/src/commands.rs index b99adae30d7..6464fac8d5c 100644 --- a/tools/flat-storage/src/commands.rs +++ b/tools/flat-storage/src/commands.rs @@ -11,6 +11,7 @@ use near_store::flat::{ }; use near_store::{DBCol, Mode, NodeStorage, ShardUId, Store, StoreOpener}; use nearcore::{load_config, NearConfig, NightshadeRuntime}; +use std::sync::atomic::AtomicBool; use std::{path::PathBuf, sync::Arc, time::Duration}; use tqdm::tqdm; @@ -318,6 +319,7 @@ impl FlatStorageCommand { inline_flat_state_values( store, &flat_storage_manager, + &AtomicBool::new(true), cmd.num_threads, cmd.batch_size, );