Skip to content

Commit

Permalink
impl
Browse files Browse the repository at this point in the history
  • Loading branch information
pugachAG committed May 24, 2023
1 parent 5b78611 commit 5ce743b
Show file tree
Hide file tree
Showing 13 changed files with 158 additions and 9 deletions.
3 changes: 2 additions & 1 deletion chain/chain/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2568,7 +2568,8 @@ impl<'a> ChainStoreUpdate<'a> {
| DBCol::FlatState
| DBCol::FlatStateChanges
| DBCol::FlatStateDeltaMetadata
| DBCol::FlatStorageStatus => {
| DBCol::FlatStorageStatus
| DBCol::Misc => {
unreachable!();
}
}
Expand Down
4 changes: 4 additions & 0 deletions chain/chain/src/test_utils/kv_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -960,6 +960,10 @@ impl RuntimeAdapter for KeyValueRuntime {
.get_trie_for_shard(ShardUId { version: 0, shard_id: shard_id as u32 }, state_root))
}

fn get_flat_storage_manager(&self) -> Option<near_store::flat::FlatStorageManager> {
None
}

fn get_view_trie_for_shard(
&self,
shard_id: ShardId,
Expand Down
3 changes: 3 additions & 0 deletions chain/chain/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use borsh::{BorshDeserialize, BorshSerialize};
use chrono::DateTime;
use chrono::Utc;
use near_primitives::sandbox::state_patch::SandboxStatePatch;
use near_store::flat::FlatStorageManager;
use num_rational::Rational32;

use crate::metrics;
Expand Down Expand Up @@ -297,6 +298,8 @@ pub trait RuntimeAdapter: Send + Sync {
state_root: StateRoot,
) -> Result<Trie, Error>;

fn get_flat_storage_manager(&self) -> Option<FlatStorageManager>;

fn get_flat_storage_for_shard(&self, shard_uid: ShardUId) -> Option<FlatStorage>;

fn get_flat_storage_status(&self, shard_uid: ShardUId) -> FlatStorageStatus;
Expand Down
9 changes: 8 additions & 1 deletion core/store/src/columns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub enum DBCol {
/// - *Rows*: single row `"VERSION"`
/// - *Content type*: The version of the database (u32), serialized as JSON.
DbVersion,
/// Column that store Misc cells.
/// Column that stores miscellaneous block-related cells.
/// - *Rows*: multiple, for example `"GENESIS_JSON_HASH"`, `"HEAD_KEY"`, `"LATEST_KNOWN_KEY"` etc.
/// - *Content type*: cell specific.
BlockMisc,
Expand Down Expand Up @@ -273,6 +273,11 @@ pub enum DBCol {
/// - *Rows*: `shard_uid`
/// - *Column type*: `FlatStorageStatus`
FlatStorageStatus,
/// Column to persist pieces of miscellaneous small data. Should only be used to store
/// constant or small (for example per-shard) amount of data.
/// - *Rows*: arbitrary string, see `crate::db::FLAT_STATE_VALUES_INLINING_MIGRATION_STATUS_KEY` for example
/// - *Column type*: arbitrary bytes
Misc,
}

/// Defines different logical parts of a db key.
Expand Down Expand Up @@ -422,6 +427,7 @@ impl DBCol {

// TODO
DBCol::ChallengedBlocks => false,
DBCol::Misc => false,
// BlockToCatchup is only needed while syncing and it is not immutable.
DBCol::BlocksToCatchup => false,
// BlockRefCount is only needed when handling forks and it is not immutable.
Expand Down Expand Up @@ -479,6 +485,7 @@ impl DBCol {
match self {
DBCol::DbVersion => &[DBKeyType::StringLiteral],
DBCol::BlockMisc => &[DBKeyType::StringLiteral],
DBCol::Misc => &[DBKeyType::StringLiteral],
DBCol::Block => &[DBKeyType::BlockHash],
DBCol::BlockHeader => &[DBKeyType::BlockHash],
DBCol::BlockHeight => &[DBKeyType::BlockHeight],
Expand Down
5 changes: 5 additions & 0 deletions core/store/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub use self::splitdb::SplitDB;
pub use self::slice::DBSlice;
pub use self::testdb::TestDB;

// `DBCol::BlockMisc` keys
pub const HEAD_KEY: &[u8; 4] = b"HEAD";
pub const TAIL_KEY: &[u8; 4] = b"TAIL";
pub const CHUNK_TAIL_KEY: &[u8; 10] = b"CHUNK_TAIL";
Expand All @@ -32,6 +33,10 @@ pub const GENESIS_JSON_HASH_KEY: &[u8; 17] = b"GENESIS_JSON_HASH";
pub const GENESIS_STATE_ROOTS_KEY: &[u8; 19] = b"GENESIS_STATE_ROOTS";
pub const COLD_HEAD_KEY: &[u8; 9] = b"COLD_HEAD";

// `DBCol::Misc` keys
pub const FLAT_STATE_VALUES_INLINING_MIGRATION_STATUS_KEY: &[u8] =
b"FLAT_STATE_VALUES_INLINING_MIGRATION_STATUS";

#[derive(Default, Debug)]
pub struct DBTransaction {
pub(crate) ops: Vec<DBOp>,
Expand Down
83 changes: 78 additions & 5 deletions core/store/src/flat/inlining_migration.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::collections::HashMap;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::thread::JoinHandle;

use borsh::{BorshDeserialize, BorshSerialize};
use crossbeam::channel;
Expand All @@ -7,16 +10,70 @@ use near_primitives::hash::CryptoHash;
use near_primitives::shard_layout::ShardUId;
use tracing::{debug, info};

use crate::flat::store_helper::set_flat_state_values_inlining_migration_status;
use crate::metrics::flat_state_metrics::inlining_migration::{
FLAT_STATE_PAUSED_DURATION, INLINED_COUNT, INLINED_TOTAL_VALUES_SIZE, PROCESSED_COUNT,
PROCESSED_TOTAL_VALUES_SIZE, SKIPPED_COUNT,
};
use crate::{DBCol, Store, TrieDBStorage, TrieStorage};

use super::store_helper::decode_flat_state_db_key;
use super::types::INLINE_DISK_VALUE_THRESHOLD;
use super::store_helper::{
decode_flat_state_db_key, get_flat_state_values_inlining_migration_status,
};
use super::types::{FlatStateValuesInliningMigrationStatus, INLINE_DISK_VALUE_THRESHOLD};
use super::{FlatStateValue, FlatStorageManager};

pub struct FlatStateValuesInliningMigrationHandle {
handle: JoinHandle<()>,
keep_running: Arc<AtomicBool>,
}

const BACKGROUND_MIGRATION_BATCH_SIZE: usize = 50_000;

impl FlatStateValuesInliningMigrationHandle {
pub fn start_background_migration(
store: Store,
flat_storage_manager: FlatStorageManager,
read_state_threads: usize,
) -> Self {
let keep_running = Arc::new(AtomicBool::new(true));
let keep_runnning_clone = keep_running.clone();
let handle = std::thread::spawn(move || {
let status = get_flat_state_values_inlining_migration_status(&store)
.expect("failed to read fs migration status");
info!(target: "store", ?status, "Read FlatState values inlining migration status");
if status == FlatStateValuesInliningMigrationStatus::Finished {
return;
}
set_flat_state_values_inlining_migration_status(
&store,
FlatStateValuesInliningMigrationStatus::InProgress,
)
.expect("failed to set fs migration in-progress status");
let completed = inline_flat_state_values(
store.clone(),
&flat_storage_manager,
&keep_running,
read_state_threads,
BACKGROUND_MIGRATION_BATCH_SIZE,
);
if completed {
set_flat_state_values_inlining_migration_status(
&store,
FlatStateValuesInliningMigrationStatus::Finished,
)
.expect("failed to set fs migration finished status");
}
});
Self { handle, keep_running: keep_runnning_clone }
}

pub fn stop(self) {
self.keep_running.store(false, std::sync::atomic::Ordering::Relaxed);
self.handle.join().expect("join should not fail here");
}
}

struct ReadValueRequest {
shard_uid: ShardUId,
value_hash: CryptoHash,
Expand All @@ -33,7 +90,7 @@ struct StateValueReader {
pending_requests: usize,
value_request_send: channel::Sender<ReadValueRequest>,
value_response_recv: channel::Receiver<ReadValueResponse>,
join_handles: Vec<std::thread::JoinHandle<()>>,
join_handles: Vec<JoinHandle<()>>,
}

impl StateValueReader {
Expand Down Expand Up @@ -110,16 +167,23 @@ impl StateValueReader {
pub fn inline_flat_state_values(
store: Store,
flat_storage_manager: &FlatStorageManager,
keep_running: &AtomicBool,
read_state_threads: usize,
batch_size: usize,
) {
) -> bool {
info!(target: "store", %read_state_threads, %batch_size, "Starting FlatState value inlining migration");
let migration_start = std::time::Instant::now();
let mut value_reader = StateValueReader::new(store.clone(), read_state_threads);
let mut inlined_total_count = 0;
let mut interrupted = false;
for (batch_index, batch) in
store.iter(DBCol::FlatState).chunks(batch_size).into_iter().enumerate()
{
if !keep_running.load(std::sync::atomic::Ordering::Relaxed) {
info!(target: "store", %batch_index, "FlatState value inlining migration was interrupted");
interrupted = true;
break;
}
let (mut min_key, mut max_key) = (None, None);
for entry in batch {
PROCESSED_COUNT.inc();
Expand Down Expand Up @@ -205,6 +269,7 @@ pub fn inline_flat_state_values(
value_reader.close();
let migration_elapsed = migration_start.elapsed();
info!(target: "store", %inlined_total_count, ?migration_elapsed, "Finished FlatState value inlining migration");
!interrupted
}

fn log_skipped(reason: &str, err: impl std::error::Error) {
Expand All @@ -214,6 +279,8 @@ fn log_skipped(reason: &str, err: impl std::error::Error) {

#[cfg(test)]
mod tests {
use std::sync::atomic::AtomicBool;

use borsh::{BorshDeserialize, BorshSerialize};
use near_primitives::hash::hash;
use near_primitives::shard_layout::ShardLayout;
Expand Down Expand Up @@ -243,7 +310,13 @@ mod tests {
}
store_update.commit().unwrap();
}
inline_flat_state_values(store.clone(), &FlatStorageManager::new(store.clone()), 2, 4);
inline_flat_state_values(
store.clone(),
&FlatStorageManager::new(store.clone()),
&AtomicBool::new(true),
2,
4,
);
assert_eq!(
store
.iter(DBCol::FlatState)
Expand Down
2 changes: 1 addition & 1 deletion core/store/src/flat/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ mod types;

pub use chunk_view::FlatStorageChunkView;
pub use delta::{FlatStateChanges, FlatStateDelta, FlatStateDeltaMetadata};
pub use inlining_migration::inline_flat_state_values;
pub use inlining_migration::{inline_flat_state_values, FlatStateValuesInliningMigrationHandle};
pub use manager::FlatStorageManager;
pub use metrics::FlatStorageCreationMetrics;
pub use storage::FlatStorage;
Expand Down
23 changes: 22 additions & 1 deletion core/store/src/flat/store_helper.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! This file contains helper functions for accessing flat storage data in DB
//! TODO(#8577): remove this file and move functions to the corresponding structs
use crate::db::FLAT_STATE_VALUES_INLINING_MIGRATION_STATUS_KEY;
use crate::flat::delta::{FlatStateChanges, KeyForFlatStateDelta};
use crate::flat::types::FlatStorageError;
use crate::{DBCol, Store, StoreUpdate};
Expand All @@ -9,7 +10,7 @@ use near_primitives::hash::CryptoHash;
use near_primitives::shard_layout::{ShardLayout, ShardUId};

use super::delta::{FlatStateDelta, FlatStateDeltaMetadata};
use super::types::{FlatStateValue, FlatStorageStatus};
use super::types::{FlatStateValue, FlatStateValuesInliningMigrationStatus, FlatStorageStatus};

/// Prefixes for keys in `FlatStateMisc` DB column.
pub const FLAT_STATE_HEAD_KEY_PREFIX: &[u8; 4] = b"HEAD";
Expand Down Expand Up @@ -88,6 +89,26 @@ pub(crate) fn decode_flat_state_db_key(
Ok((shard_uid, trie_key.to_vec()))
}

pub fn get_flat_state_values_inlining_migration_status(
store: &Store,
) -> Result<FlatStateValuesInliningMigrationStatus, FlatStorageError> {
store
.get_ser(DBCol::Misc, FLAT_STATE_VALUES_INLINING_MIGRATION_STATUS_KEY)
.map(|status| status.unwrap_or(FlatStateValuesInliningMigrationStatus::Empty))
.map_err(|_| FlatStorageError::StorageInternalError)
}

pub fn set_flat_state_values_inlining_migration_status(
store: &Store,
status: FlatStateValuesInliningMigrationStatus,
) -> Result<(), FlatStorageError> {
let mut store_update = store.store_update();
store_update
.set_ser(DBCol::Misc, FLAT_STATE_VALUES_INLINING_MIGRATION_STATUS_KEY, &status)
.map_err(|_| FlatStorageError::StorageInternalError)?;
store_update.commit().map_err(|_| FlatStorageError::StorageInternalError)
}

pub(crate) fn get_flat_state_value(
store: &Store,
shard_uid: ShardUId,
Expand Down
7 changes: 7 additions & 0 deletions core/store/src/flat/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ impl From<FlatStorageError> for StorageError {
}
}

#[derive(BorshSerialize, BorshDeserialize, Debug, PartialEq, Eq)]
pub enum FlatStateValuesInliningMigrationStatus {
Empty,
InProgress,
Finished,
}

#[derive(BorshSerialize, BorshDeserialize, Debug, PartialEq, Eq)]
pub enum FlatStorageStatus {
/// Flat Storage is not supported.
Expand Down
18 changes: 18 additions & 0 deletions nearcore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ use cold_storage::ColdStoreLoopHandle;
use near_async::actix::AddrWithAutoSpanContextExt;
use near_async::messaging::{IntoSender, LateBoundSender};
use near_async::time;
use near_chain::types::RuntimeAdapter;
use near_chain::{Chain, ChainGenesis};
use near_chunks::shards_manager_actor::start_shards_manager;
use near_client::{start_client, start_view_client, ClientActor, ConfigUpdater, ViewClientActor};
use near_epoch_manager::shard_tracker::{ShardTracker, TrackedConfig};
use near_epoch_manager::EpochManager;
use near_network::PeerManagerActor;
use near_primitives::block::GenesisId;
use near_store::flat::FlatStateValuesInliningMigrationHandle;
use near_store::metadata::DbKind;
use near_store::metrics::spawn_db_metrics_loop;
use near_store::{DBCol, Mode, NodeStorage, Store, StoreOpenerError};
Expand Down Expand Up @@ -193,6 +195,9 @@ pub struct NearNode {
pub cold_store_loop_handle: Option<ColdStoreLoopHandle>,
/// Contains handles to background threads that may be dumping state to S3.
pub state_sync_dump_handle: Option<StateSyncDumpHandle>,
/// A handle to control background flat state values inlining migration.
/// Needed temporarily, will be removed after the migration is completed.
pub flat_state_migration_handle: Option<FlatStateValuesInliningMigrationHandle>,
}

pub fn start_with_config(home_dir: &Path, config: NearConfig) -> anyhow::Result<NearNode> {
Expand Down Expand Up @@ -303,6 +308,18 @@ pub fn start_with_config_and_synchronization(
);
shards_manager_adapter.bind(shards_manager_actor);

let flat_state_migration_handle =
if let Some(flat_storage_manager) = runtime.get_flat_storage_manager() {
let handle = FlatStateValuesInliningMigrationHandle::start_background_migration(
storage.get_hot_store(),
flat_storage_manager,
config.client_config.client_background_migration_threads,
);
Some(handle)
} else {
None
};

let state_sync_dump_handle = spawn_state_sync_dump(
&config.client_config,
chain_genesis,
Expand Down Expand Up @@ -366,6 +383,7 @@ pub fn start_with_config_and_synchronization(
arbiters,
cold_store_loop_handle,
state_sync_dump_handle,
flat_state_migration_handle,
})
}

Expand Down
4 changes: 4 additions & 0 deletions nearcore/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,10 @@ impl RuntimeAdapter for NightshadeRuntime {
Ok(self.tries.get_view_trie_for_shard(shard_uid, state_root))
}

fn get_flat_storage_manager(&self) -> Option<FlatStorageManager> {
Some(self.flat_storage_manager.clone())
}

fn get_flat_storage_for_shard(&self, shard_uid: ShardUId) -> Option<FlatStorage> {
self.flat_storage_manager.get_flat_storage_for_shard(shard_uid)
}
Expand Down
4 changes: 4 additions & 0 deletions neard/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,7 @@ impl RunCmd {
rpc_servers,
cold_store_loop_handle,
state_sync_dump_handle,
flat_state_migration_handle,
..
} = nearcore::start_with_config_and_synchronization(
home_dir,
Expand All @@ -544,6 +545,9 @@ impl RunCmd {
if let Some(handle) = state_sync_dump_handle {
handle.stop()
}
if let Some(handle) = flat_state_migration_handle {
handle.stop();
}
futures::future::join_all(rpc_servers.iter().map(|(name, server)| async move {
server.stop(true).await;
debug!(target: "neard", "{} server stopped", name);
Expand Down
Loading

0 comments on commit 5ce743b

Please sign in to comment.