Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Clean flat state via range #9109

Merged
merged 8 commits into from
May 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3203,7 +3203,7 @@ impl Chain {
// Before working with state parts, remove existing flat storage data.
let epoch_id = self.get_block_header(&sync_hash)?.epoch_id().clone();
let shard_uid = self.epoch_manager.shard_id_to_uid(shard_id, &epoch_id)?;
self.runtime_adapter.remove_flat_storage_for_shard(shard_uid, &epoch_id)?;
self.runtime_adapter.remove_flat_storage_for_shard(shard_uid)?;

let shard_state_header = self.get_state_header(shard_id, sync_hash)?;
let state_root = shard_state_header.chunk_prev_state_root();
Expand Down
6 changes: 1 addition & 5 deletions chain/chain/src/test_utils/kv_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -988,11 +988,7 @@ impl RuntimeAdapter for KeyValueRuntime {
panic!("Flat storage state can't be created for shard {shard_uid} because KeyValueRuntime doesn't support this");
}

fn remove_flat_storage_for_shard(
&self,
_shard_uid: ShardUId,
_epoch_id: &EpochId,
) -> Result<(), Error> {
fn remove_flat_storage_for_shard(&self, _shard_uid: ShardUId) -> Result<(), Error> {
Ok(())
}

Expand Down
6 changes: 1 addition & 5 deletions chain/chain/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,11 +313,7 @@ pub trait RuntimeAdapter: Send + Sync {

/// Removes flat storage state for shard, if it exists.
/// Used to clear old flat storage data from disk and memory before syncing to newer state.
fn remove_flat_storage_for_shard(
&self,
shard_uid: ShardUId,
epoch_id: &EpochId,
) -> Result<(), Error>;
fn remove_flat_storage_for_shard(&self, shard_uid: ShardUId) -> Result<(), Error>;

fn set_flat_storage_for_genesis(
&self,
Expand Down
15 changes: 4 additions & 11 deletions core/store/src/flat/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::flat::{
};
use near_primitives::errors::StorageError;
use near_primitives::hash::CryptoHash;
use near_primitives::shard_layout::{ShardLayout, ShardUId};
use near_primitives::shard_layout::ShardUId;
use near_primitives::types::BlockHeight;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
Expand Down Expand Up @@ -121,18 +121,11 @@ impl FlatStorageManager {
flat_storages.get(&shard_uid).cloned()
}

pub fn remove_flat_storage_for_shard(
&self,
shard_uid: ShardUId,
shard_layout: ShardLayout,
) -> Result<(), StorageError> {
pub fn remove_flat_storage_for_shard(&self, shard_uid: ShardUId) -> Result<(), StorageError> {
let mut flat_storages = self.0.flat_storages.lock().expect(POISONED_LOCK_ERR);

match flat_storages.remove(&shard_uid) {
None => {}
Some(flat_storage) => {
flat_storage.clear_state(shard_layout)?;
}
if let Some(flat_store) = flat_storages.remove(&shard_uid) {
flat_store.clear_state()?;
}

Ok(())
Expand Down
34 changes: 7 additions & 27 deletions core/store/src/flat/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ use std::sync::{Arc, RwLock};

use near_primitives::errors::StorageError;
use near_primitives::hash::CryptoHash;
use near_primitives::shard_layout::{ShardLayout, ShardUId};
use near_primitives::shard_layout::ShardUId;
use near_primitives::state::FlatStateValue;
use tracing::{debug, info, warn};
use tracing::{debug, warn};

use crate::flat::delta::CachedFlatStateChanges;
use crate::flat::{FlatStorageReadyStatus, FlatStorageStatus};
use crate::{DBCol, Store, StoreUpdate};
use crate::{Store, StoreUpdate};

use super::delta::{CachedFlatStateDelta, FlatStateDelta};
use super::metrics::FlatStorageMetrics;
Expand Down Expand Up @@ -299,33 +299,13 @@ impl FlatStorage {
}

/// Clears all State key-value pairs from flat storage.
pub fn clear_state(&self, _shard_layout: ShardLayout) -> Result<(), StorageError> {
pub fn clear_state(&self) -> Result<(), StorageError> {
let guard = self.0.write().expect(super::POISONED_LOCK_ERR);
let shard_id = guard.shard_uid.shard_id();

// Removes all items belonging to the shard one by one.
// Note that it does not work for resharding.
// TODO (#7327): call it just after we stopped tracking a shard.
// TODO (#7327): remove FlatStateChanges. Consider custom serialization of keys to remove them by
// prefix.
// TODO (#7327): support range deletions which are much faster than naive deletions. For that, we
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also remove the following comments above, that is no longer relevant:

        // Removes all items belonging to the shard one by one.
        // Note that it does not work for resharding.
        // TODO (#7327): remove FlatStateChanges.

// can delete ranges of keys like
// [ [0]+boundary_accounts(shard_id) .. [0]+boundary_accounts(shard_id+1) ), etc.
// We should also take fixed accounts into account.
let mut store_update = guard.store.store_update();
let mut removed_items = 0;
for item in guard.store.iter(DBCol::FlatState) {
let (key, _) =
item.map_err(|e| StorageError::StorageInconsistentState(e.to_string()))?;

if store_helper::key_belongs_to_shard(&key, &guard.shard_uid)? {
removed_items += 1;
store_update.delete(DBCol::FlatState, &key);
}
}
info!(target: "store", %shard_id, %removed_items, "Removing old items from flat storage");

let mut store_update = guard.store.store_update();
store_helper::remove_all_flat_state_values(&mut store_update, guard.shard_uid);
store_helper::remove_all_deltas(&mut store_update, guard.shard_uid);

store_helper::set_flat_storage_status(
&mut store_update,
guard.shard_uid,
Expand Down
14 changes: 11 additions & 3 deletions core/store/src/flat/store_helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,19 @@ pub fn remove_delta(store_update: &mut StoreUpdate, shard_uid: ShardUId, block_h
store_update.delete(DBCol::FlatStateDeltaMetadata, &key);
}

pub fn remove_all_deltas(store_update: &mut StoreUpdate, shard_uid: ShardUId) {
fn remove_range_by_shard_uid(store_update: &mut StoreUpdate, shard_uid: ShardUId, col: DBCol) {
let key_from = shard_uid.to_bytes();
let key_to = ShardUId::next_shard_prefix(&key_from);
store_update.delete_range(DBCol::FlatStateChanges, &key_from, &key_to);
store_update.delete_range(DBCol::FlatStateDeltaMetadata, &key_from, &key_to);
store_update.delete_range(col, &key_from, &key_to);
}

pub fn remove_all_deltas(store_update: &mut StoreUpdate, shard_uid: ShardUId) {
remove_range_by_shard_uid(store_update, shard_uid, DBCol::FlatStateChanges);
remove_range_by_shard_uid(store_update, shard_uid, DBCol::FlatStateDeltaMetadata);
}

pub fn remove_all_flat_state_values(store_update: &mut StoreUpdate, shard_uid: ShardUId) {
remove_range_by_shard_uid(store_update, shard_uid, DBCol::FlatState);
}

pub(crate) fn encode_flat_state_db_key(shard_uid: ShardUId, key: &[u8]) -> Vec<u8> {
Expand Down
24 changes: 3 additions & 21 deletions integration-tests/src/tests/client/flat_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,13 +159,7 @@ fn test_flat_storage_creation_sanity() {
);
}

let block_hash = env.clients[0].chain.get_block_hash_by_height(START_HEIGHT - 1).unwrap();
let epoch_id = env.clients[0].chain.epoch_manager.get_epoch_id(&block_hash).unwrap();
env.clients[0]
.chain
.runtime_adapter
.remove_flat_storage_for_shard(shard_uid, &epoch_id)
.unwrap();
env.clients[0].chain.runtime_adapter.remove_flat_storage_for_shard(shard_uid).unwrap();
}

// Create new chain and runtime using the same store. It should produce next blocks normally, but now it should
Expand Down Expand Up @@ -247,13 +241,7 @@ fn test_flat_storage_creation_two_shards() {
);
}

let block_hash = env.clients[0].chain.get_block_hash_by_height(START_HEIGHT - 1).unwrap();
let epoch_id = env.clients[0].chain.epoch_manager.get_epoch_id(&block_hash).unwrap();
env.clients[0]
.chain
.runtime_adapter
.remove_flat_storage_for_shard(shard_uids[0], &epoch_id)
.unwrap();
env.clients[0].chain.runtime_adapter.remove_flat_storage_for_shard(shard_uids[0]).unwrap();
}

// Check that flat storage is not ready for shard 0 but ready for shard 1.
Expand Down Expand Up @@ -393,13 +381,7 @@ fn test_catchup_succeeds_even_if_no_new_blocks() {
env.produce_block(0, height);
}
// Remove flat storage.
let block_hash = env.clients[0].chain.get_block_hash_by_height(START_HEIGHT - 1).unwrap();
let epoch_id = env.clients[0].chain.epoch_manager.get_epoch_id(&block_hash).unwrap();
env.clients[0]
.chain
.runtime_adapter
.remove_flat_storage_for_shard(shard_uid, &epoch_id)
.unwrap();
env.clients[0].chain.runtime_adapter.remove_flat_storage_for_shard(shard_uid).unwrap();
}
let mut env = setup_env(&genesis, store.clone());
assert!(env.clients[0].runtime_adapter.get_flat_storage_for_shard(shard_uid).is_none());
Expand Down
9 changes: 2 additions & 7 deletions nearcore/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -753,14 +753,9 @@ impl RuntimeAdapter for NightshadeRuntime {
self.flat_storage_manager.add_flat_storage_for_shard(shard_uid, flat_storage);
}

fn remove_flat_storage_for_shard(
&self,
shard_uid: ShardUId,
epoch_id: &EpochId,
) -> Result<(), Error> {
let shard_layout = self.epoch_manager.get_shard_layout(epoch_id)?;
fn remove_flat_storage_for_shard(&self, shard_uid: ShardUId) -> Result<(), Error> {
self.flat_storage_manager
.remove_flat_storage_for_shard(shard_uid, shard_layout)
.remove_flat_storage_for_shard(shard_uid)
.map_err(Error::StorageError)?;
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion tools/flat-storage/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ impl FlatStorageCommand {
let shard_uid = epoch_manager.shard_id_to_uid(reset_cmd.shard_id, &tip.epoch_id)?;
rw_hot_runtime.create_flat_storage_for_shard(shard_uid);

rw_hot_runtime.remove_flat_storage_for_shard(shard_uid, &tip.epoch_id)?;
rw_hot_runtime.remove_flat_storage_for_shard(shard_uid)?;
}
SubCommand::Init(init_cmd) => {
let (_, epoch_manager, rw_hot_runtime, rw_chain_store, rw_hot_store) = Self::get_db(
Expand Down