From 11ab1611ef49b1563053740260145f2a8db24f26 Mon Sep 17 00:00:00 2001 From: Anton Puhach Date: Thu, 25 May 2023 19:15:06 +0200 Subject: [PATCH] feat: enable FlatState values inlining migration in the background (#9093) Part of #8243. This PR enables the migration added in #9037 to be executed in the background on the running node. It supports graceful stop when the node is shut down. The implementation is heavily inspired by state sync background dumping to S3. This PR also introduces a new column `DBCol::Misc`. For now it only stores the status of the migration, but it can hold any small pieces of data, similar to `DBCol::BlockMisc`. `FlatStorageManager` is exposed as part of `RuntimeAdapter` in this PR. This is the first step in cleaning `RuntimeAdapter` from all other flat storage related methods, as the manager can be directly used instead. Tested by manually running a node and checking metrics and log messages. After that flat storage was checked with `flat-storage verify` cmd. --- chain/chain/src/store.rs | 3 +- chain/chain/src/test_utils/kv_runtime.rs | 4 ++ chain/chain/src/types.rs | 3 + core/store/src/columns.rs | 9 ++- core/store/src/db.rs | 5 ++ core/store/src/flat/inlining_migration.rs | 85 +++++++++++++++++++++-- core/store/src/flat/mod.rs | 2 +- core/store/src/flat/store_helper.rs | 23 +++++- core/store/src/flat/types.rs | 7 ++ nearcore/src/lib.rs | 18 +++++ nearcore/src/runtime/mod.rs | 4 ++ neard/src/cli.rs | 4 ++ tools/flat-storage/src/commands.rs | 2 + 13 files changed, 159 insertions(+), 10 deletions(-) diff --git a/chain/chain/src/store.rs b/chain/chain/src/store.rs index 3023f287558..334ddcd7d51 100644 --- a/chain/chain/src/store.rs +++ b/chain/chain/src/store.rs @@ -2568,7 +2568,8 @@ impl<'a> ChainStoreUpdate<'a> { | DBCol::FlatState | DBCol::FlatStateChanges | DBCol::FlatStateDeltaMetadata - | DBCol::FlatStorageStatus => { + | DBCol::FlatStorageStatus + | DBCol::Misc => { unreachable!(); } } diff --git a/chain/chain/src/test_utils/kv_runtime.rs b/chain/chain/src/test_utils/kv_runtime.rs index 664f8b3bc52..c28e5e33589 100644 --- a/chain/chain/src/test_utils/kv_runtime.rs +++ b/chain/chain/src/test_utils/kv_runtime.rs @@ -960,6 +960,10 @@ impl RuntimeAdapter for KeyValueRuntime { .get_trie_for_shard(ShardUId { version: 0, shard_id: shard_id as u32 }, state_root)) } + fn get_flat_storage_manager(&self) -> Option { + None + } + fn get_view_trie_for_shard( &self, shard_id: ShardId, diff --git a/chain/chain/src/types.rs b/chain/chain/src/types.rs index 2a26a694247..34a2b314661 100644 --- a/chain/chain/src/types.rs +++ b/chain/chain/src/types.rs @@ -5,6 +5,7 @@ use borsh::{BorshDeserialize, BorshSerialize}; use chrono::DateTime; use chrono::Utc; use near_primitives::sandbox::state_patch::SandboxStatePatch; +use near_store::flat::FlatStorageManager; use num_rational::Rational32; use crate::metrics; @@ -299,6 +300,8 @@ pub trait RuntimeAdapter: Send + Sync { state_root: StateRoot, ) -> Result; + fn get_flat_storage_manager(&self) -> Option; + fn get_flat_storage_for_shard(&self, shard_uid: ShardUId) -> Option; fn get_flat_storage_status(&self, shard_uid: ShardUId) -> FlatStorageStatus; diff --git a/core/store/src/columns.rs b/core/store/src/columns.rs index f0fcdbdb98a..682a670321d 100644 --- a/core/store/src/columns.rs +++ b/core/store/src/columns.rs @@ -23,7 +23,7 @@ pub enum DBCol { /// - *Rows*: single row `"VERSION"` /// - *Content type*: The version of the database (u32), serialized as JSON. DbVersion, - /// Column that store Misc cells. + /// Column that stores miscellaneous block-related cells. /// - *Rows*: multiple, for example `"GENESIS_JSON_HASH"`, `"HEAD_KEY"`, `"LATEST_KNOWN_KEY"` etc. /// - *Content type*: cell specific. BlockMisc, @@ -273,6 +273,11 @@ pub enum DBCol { /// - *Rows*: `shard_uid` /// - *Column type*: `FlatStorageStatus` FlatStorageStatus, + /// Column to persist pieces of miscellaneous small data. Should only be used to store + /// constant or small (for example per-shard) amount of data. + /// - *Rows*: arbitrary string, see `crate::db::FLAT_STATE_VALUES_INLINING_MIGRATION_STATUS_KEY` for example + /// - *Column type*: arbitrary bytes + Misc, } /// Defines different logical parts of a db key. @@ -422,6 +427,7 @@ impl DBCol { // TODO DBCol::ChallengedBlocks => false, + DBCol::Misc => false, // BlockToCatchup is only needed while syncing and it is not immutable. DBCol::BlocksToCatchup => false, // BlockRefCount is only needed when handling forks and it is not immutable. @@ -479,6 +485,7 @@ impl DBCol { match self { DBCol::DbVersion => &[DBKeyType::StringLiteral], DBCol::BlockMisc => &[DBKeyType::StringLiteral], + DBCol::Misc => &[DBKeyType::StringLiteral], DBCol::Block => &[DBKeyType::BlockHash], DBCol::BlockHeader => &[DBKeyType::BlockHash], DBCol::BlockHeight => &[DBKeyType::BlockHeight], diff --git a/core/store/src/db.rs b/core/store/src/db.rs index 378f06c348f..2b4aa1bdf18 100644 --- a/core/store/src/db.rs +++ b/core/store/src/db.rs @@ -20,6 +20,7 @@ pub use self::splitdb::SplitDB; pub use self::slice::DBSlice; pub use self::testdb::TestDB; +// `DBCol::BlockMisc` keys pub const HEAD_KEY: &[u8; 4] = b"HEAD"; pub const TAIL_KEY: &[u8; 4] = b"TAIL"; pub const CHUNK_TAIL_KEY: &[u8; 10] = b"CHUNK_TAIL"; @@ -32,6 +33,10 @@ pub const GENESIS_JSON_HASH_KEY: &[u8; 17] = b"GENESIS_JSON_HASH"; pub const GENESIS_STATE_ROOTS_KEY: &[u8; 19] = b"GENESIS_STATE_ROOTS"; pub const COLD_HEAD_KEY: &[u8; 9] = b"COLD_HEAD"; +// `DBCol::Misc` keys +pub const FLAT_STATE_VALUES_INLINING_MIGRATION_STATUS_KEY: &[u8] = + b"FLAT_STATE_VALUES_INLINING_MIGRATION_STATUS"; + #[derive(Default, Debug)] pub struct DBTransaction { pub(crate) ops: Vec, diff --git a/core/store/src/flat/inlining_migration.rs b/core/store/src/flat/inlining_migration.rs index 20ba653cc3e..c12a4c587ba 100644 --- a/core/store/src/flat/inlining_migration.rs +++ b/core/store/src/flat/inlining_migration.rs @@ -1,4 +1,7 @@ use std::collections::HashMap; +use std::sync::atomic::AtomicBool; +use std::sync::Arc; +use std::thread::JoinHandle; use borsh::{BorshDeserialize, BorshSerialize}; use crossbeam::channel; @@ -7,16 +10,70 @@ use near_primitives::hash::CryptoHash; use near_primitives::shard_layout::ShardUId; use tracing::{debug, info}; +use crate::flat::store_helper::set_flat_state_values_inlining_migration_status; use crate::metrics::flat_state_metrics::inlining_migration::{ FLAT_STATE_PAUSED_DURATION, INLINED_COUNT, INLINED_TOTAL_VALUES_SIZE, PROCESSED_COUNT, PROCESSED_TOTAL_VALUES_SIZE, SKIPPED_COUNT, }; use crate::{DBCol, Store, TrieDBStorage, TrieStorage}; -use super::store_helper::decode_flat_state_db_key; -use super::types::INLINE_DISK_VALUE_THRESHOLD; +use super::store_helper::{ + decode_flat_state_db_key, get_flat_state_values_inlining_migration_status, +}; +use super::types::{FlatStateValuesInliningMigrationStatus, INLINE_DISK_VALUE_THRESHOLD}; use super::{FlatStateValue, FlatStorageManager}; +pub struct FlatStateValuesInliningMigrationHandle { + handle: JoinHandle<()>, + keep_running: Arc, +} + +const BACKGROUND_MIGRATION_BATCH_SIZE: usize = 50_000; + +impl FlatStateValuesInliningMigrationHandle { + pub fn start_background_migration( + store: Store, + flat_storage_manager: FlatStorageManager, + read_state_threads: usize, + ) -> Self { + let keep_running = Arc::new(AtomicBool::new(true)); + let keep_runnning_clone = keep_running.clone(); + let handle = std::thread::spawn(move || { + let status = get_flat_state_values_inlining_migration_status(&store) + .expect("failed to read fs migration status"); + info!(target: "store", ?status, "Read FlatState values inlining migration status"); + if status == FlatStateValuesInliningMigrationStatus::Finished { + return; + } + set_flat_state_values_inlining_migration_status( + &store, + FlatStateValuesInliningMigrationStatus::InProgress, + ) + .expect("failed to set fs migration in-progress status"); + let completed = inline_flat_state_values( + store.clone(), + &flat_storage_manager, + &keep_running, + read_state_threads, + BACKGROUND_MIGRATION_BATCH_SIZE, + ); + if completed { + set_flat_state_values_inlining_migration_status( + &store, + FlatStateValuesInliningMigrationStatus::Finished, + ) + .expect("failed to set fs migration finished status"); + } + }); + Self { handle, keep_running: keep_runnning_clone } + } + + pub fn stop(self) { + self.keep_running.store(false, std::sync::atomic::Ordering::Relaxed); + self.handle.join().expect("join should not fail here"); + } +} + struct ReadValueRequest { shard_uid: ShardUId, value_hash: CryptoHash, @@ -33,7 +90,7 @@ struct StateValueReader { pending_requests: usize, value_request_send: channel::Sender, value_response_recv: channel::Receiver, - join_handles: Vec>, + join_handles: Vec>, } impl StateValueReader { @@ -110,16 +167,23 @@ impl StateValueReader { pub fn inline_flat_state_values( store: Store, flat_storage_manager: &FlatStorageManager, + keep_running: &AtomicBool, read_state_threads: usize, batch_size: usize, -) { +) -> bool { info!(target: "store", %read_state_threads, %batch_size, "Starting FlatState value inlining migration"); let migration_start = std::time::Instant::now(); let mut value_reader = StateValueReader::new(store.clone(), read_state_threads); let mut inlined_total_count = 0; + let mut interrupted = false; for (batch_index, batch) in store.iter(DBCol::FlatState).chunks(batch_size).into_iter().enumerate() { + if !keep_running.load(std::sync::atomic::Ordering::Relaxed) { + info!(target: "store", %batch_index, "FlatState value inlining migration was interrupted"); + interrupted = true; + break; + } let (mut min_key, mut max_key) = (None, None); for entry in batch { PROCESSED_COUNT.inc(); @@ -204,7 +268,8 @@ pub fn inline_flat_state_values( } value_reader.close(); let migration_elapsed = migration_start.elapsed(); - info!(target: "store", %inlined_total_count, ?migration_elapsed, "Finished FlatState value inlining migration"); + info!(target: "store", %inlined_total_count, ?migration_elapsed, %interrupted, "Finished FlatState value inlining migration"); + !interrupted } fn log_skipped(reason: &str, err: impl std::error::Error) { @@ -214,6 +279,8 @@ fn log_skipped(reason: &str, err: impl std::error::Error) { #[cfg(test)] mod tests { + use std::sync::atomic::AtomicBool; + use borsh::{BorshDeserialize, BorshSerialize}; use near_primitives::hash::hash; use near_primitives::shard_layout::ShardLayout; @@ -243,7 +310,13 @@ mod tests { } store_update.commit().unwrap(); } - inline_flat_state_values(store.clone(), &FlatStorageManager::new(store.clone()), 2, 4); + inline_flat_state_values( + store.clone(), + &FlatStorageManager::new(store.clone()), + &AtomicBool::new(true), + 2, + 4, + ); assert_eq!( store .iter(DBCol::FlatState) diff --git a/core/store/src/flat/mod.rs b/core/store/src/flat/mod.rs index 4461834b6f4..07de86409b2 100644 --- a/core/store/src/flat/mod.rs +++ b/core/store/src/flat/mod.rs @@ -36,7 +36,7 @@ mod types; pub use chunk_view::FlatStorageChunkView; pub use delta::{FlatStateChanges, FlatStateDelta, FlatStateDeltaMetadata}; -pub use inlining_migration::inline_flat_state_values; +pub use inlining_migration::{inline_flat_state_values, FlatStateValuesInliningMigrationHandle}; pub use manager::FlatStorageManager; pub use metrics::FlatStorageCreationMetrics; pub use storage::FlatStorage; diff --git a/core/store/src/flat/store_helper.rs b/core/store/src/flat/store_helper.rs index 01b1296286b..072c819c0cb 100644 --- a/core/store/src/flat/store_helper.rs +++ b/core/store/src/flat/store_helper.rs @@ -1,6 +1,7 @@ //! This file contains helper functions for accessing flat storage data in DB //! TODO(#8577): remove this file and move functions to the corresponding structs +use crate::db::FLAT_STATE_VALUES_INLINING_MIGRATION_STATUS_KEY; use crate::flat::delta::{FlatStateChanges, KeyForFlatStateDelta}; use crate::flat::types::FlatStorageError; use crate::{DBCol, Store, StoreUpdate}; @@ -9,7 +10,7 @@ use near_primitives::hash::CryptoHash; use near_primitives::shard_layout::{ShardLayout, ShardUId}; use super::delta::{FlatStateDelta, FlatStateDeltaMetadata}; -use super::types::{FlatStateValue, FlatStorageStatus}; +use super::types::{FlatStateValue, FlatStateValuesInliningMigrationStatus, FlatStorageStatus}; /// Prefixes for keys in `FlatStateMisc` DB column. pub const FLAT_STATE_HEAD_KEY_PREFIX: &[u8; 4] = b"HEAD"; @@ -88,6 +89,26 @@ pub(crate) fn decode_flat_state_db_key( Ok((shard_uid, trie_key.to_vec())) } +pub fn get_flat_state_values_inlining_migration_status( + store: &Store, +) -> Result { + store + .get_ser(DBCol::Misc, FLAT_STATE_VALUES_INLINING_MIGRATION_STATUS_KEY) + .map(|status| status.unwrap_or(FlatStateValuesInliningMigrationStatus::Empty)) + .map_err(|_| FlatStorageError::StorageInternalError) +} + +pub fn set_flat_state_values_inlining_migration_status( + store: &Store, + status: FlatStateValuesInliningMigrationStatus, +) -> Result<(), FlatStorageError> { + let mut store_update = store.store_update(); + store_update + .set_ser(DBCol::Misc, FLAT_STATE_VALUES_INLINING_MIGRATION_STATUS_KEY, &status) + .map_err(|_| FlatStorageError::StorageInternalError)?; + store_update.commit().map_err(|_| FlatStorageError::StorageInternalError) +} + pub(crate) fn get_flat_state_value( store: &Store, shard_uid: ShardUId, diff --git a/core/store/src/flat/types.rs b/core/store/src/flat/types.rs index ba6b2be1065..ef6d5f88637 100644 --- a/core/store/src/flat/types.rs +++ b/core/store/src/flat/types.rs @@ -74,6 +74,13 @@ impl From for StorageError { } } +#[derive(BorshSerialize, BorshDeserialize, Debug, PartialEq, Eq)] +pub enum FlatStateValuesInliningMigrationStatus { + Empty, + InProgress, + Finished, +} + #[derive(BorshSerialize, BorshDeserialize, Debug, PartialEq, Eq)] pub enum FlatStorageStatus { /// Flat Storage is not supported. diff --git a/nearcore/src/lib.rs b/nearcore/src/lib.rs index 3eacaaddbb3..cae9feef218 100644 --- a/nearcore/src/lib.rs +++ b/nearcore/src/lib.rs @@ -10,6 +10,7 @@ use cold_storage::ColdStoreLoopHandle; use near_async::actix::AddrWithAutoSpanContextExt; use near_async::messaging::{IntoSender, LateBoundSender}; use near_async::time; +use near_chain::types::RuntimeAdapter; use near_chain::{Chain, ChainGenesis}; use near_chunks::shards_manager_actor::start_shards_manager; use near_client::{start_client, start_view_client, ClientActor, ConfigUpdater, ViewClientActor}; @@ -17,6 +18,7 @@ use near_epoch_manager::shard_tracker::{ShardTracker, TrackedConfig}; use near_epoch_manager::EpochManager; use near_network::PeerManagerActor; use near_primitives::block::GenesisId; +use near_store::flat::FlatStateValuesInliningMigrationHandle; use near_store::metadata::DbKind; use near_store::metrics::spawn_db_metrics_loop; use near_store::{DBCol, Mode, NodeStorage, Store, StoreOpenerError}; @@ -193,6 +195,9 @@ pub struct NearNode { pub cold_store_loop_handle: Option, /// Contains handles to background threads that may be dumping state to S3. pub state_sync_dump_handle: Option, + /// A handle to control background flat state values inlining migration. + /// Needed temporarily, will be removed after the migration is completed. + pub flat_state_migration_handle: Option, } pub fn start_with_config(home_dir: &Path, config: NearConfig) -> anyhow::Result { @@ -303,6 +308,18 @@ pub fn start_with_config_and_synchronization( ); shards_manager_adapter.bind(shards_manager_actor); + let flat_state_migration_handle = + if let Some(flat_storage_manager) = runtime.get_flat_storage_manager() { + let handle = FlatStateValuesInliningMigrationHandle::start_background_migration( + storage.get_hot_store(), + flat_storage_manager, + config.client_config.client_background_migration_threads, + ); + Some(handle) + } else { + None + }; + let state_sync_dump_handle = spawn_state_sync_dump( &config.client_config, chain_genesis, @@ -366,6 +383,7 @@ pub fn start_with_config_and_synchronization( arbiters, cold_store_loop_handle, state_sync_dump_handle, + flat_state_migration_handle, }) } diff --git a/nearcore/src/runtime/mod.rs b/nearcore/src/runtime/mod.rs index 3790d0bb5bd..98a05359374 100644 --- a/nearcore/src/runtime/mod.rs +++ b/nearcore/src/runtime/mod.rs @@ -733,6 +733,10 @@ impl RuntimeAdapter for NightshadeRuntime { Ok(self.tries.get_view_trie_for_shard(shard_uid, state_root)) } + fn get_flat_storage_manager(&self) -> Option { + Some(self.flat_storage_manager.clone()) + } + fn get_flat_storage_for_shard(&self, shard_uid: ShardUId) -> Option { self.flat_storage_manager.get_flat_storage_for_shard(shard_uid) } diff --git a/neard/src/cli.rs b/neard/src/cli.rs index f8156b523df..4cf2b3d19e4 100644 --- a/neard/src/cli.rs +++ b/neard/src/cli.rs @@ -518,6 +518,7 @@ impl RunCmd { rpc_servers, cold_store_loop_handle, state_sync_dump_handle, + flat_state_migration_handle, .. } = nearcore::start_with_config_and_synchronization( home_dir, @@ -544,6 +545,9 @@ impl RunCmd { if let Some(handle) = state_sync_dump_handle { handle.stop() } + if let Some(handle) = flat_state_migration_handle { + handle.stop(); + } futures::future::join_all(rpc_servers.iter().map(|(name, server)| async move { server.stop(true).await; debug!(target: "neard", "{} server stopped", name); diff --git a/tools/flat-storage/src/commands.rs b/tools/flat-storage/src/commands.rs index b99adae30d7..6464fac8d5c 100644 --- a/tools/flat-storage/src/commands.rs +++ b/tools/flat-storage/src/commands.rs @@ -11,6 +11,7 @@ use near_store::flat::{ }; use near_store::{DBCol, Mode, NodeStorage, ShardUId, Store, StoreOpener}; use nearcore::{load_config, NearConfig, NightshadeRuntime}; +use std::sync::atomic::AtomicBool; use std::{path::PathBuf, sync::Arc, time::Duration}; use tqdm::tqdm; @@ -318,6 +319,7 @@ impl FlatStorageCommand { inline_flat_state_values( store, &flat_storage_manager, + &AtomicBool::new(true), cmd.num_threads, cmd.batch_size, );