Skip to content

Commit

Permalink
impl
Browse files Browse the repository at this point in the history
  • Loading branch information
pugachAG committed May 26, 2023
1 parent 8e483e3 commit d6d350d
Show file tree
Hide file tree
Showing 9 changed files with 109 additions and 164 deletions.
53 changes: 33 additions & 20 deletions chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -645,12 +645,21 @@ impl Chain {
// Set the root block of flat state to be the genesis block. Later, when we
// init FlatStorages, we will read the from this column in storage, so it
// must be set here.
let tmp_store_update = runtime_adapter.set_flat_storage_for_genesis(
genesis.hash(),
genesis.header().height(),
genesis.header().epoch_id(),
)?;
store_update.merge(tmp_store_update);
if let Some(flat_storage_manager) = runtime_adapter.get_flat_storage_manager() {
let genesis_epoch_id = genesis.header().epoch_id();
let mut tmp_store_update = store_update.store().store_update();
for shard_uid in
epoch_manager.get_shard_layout(genesis_epoch_id)?.get_shard_uids()
{
flat_storage_manager.set_flat_storage_for_genesis(
&mut tmp_store_update,
shard_uid,
genesis.hash(),
genesis.header().height(),
)
}
store_update.merge(tmp_store_update);
}

info!(target: "chain", "Init: saved genesis: #{} {} / {:?}", block_head.height, block_head.last_block_hash, state_roots);

Expand Down Expand Up @@ -2136,7 +2145,11 @@ impl Chain {
block: &Block,
shard_uid: ShardUId,
) -> Result<(), Error> {
if let Some(flat_storage) = self.runtime_adapter.get_flat_storage_for_shard(shard_uid) {
if let Some(flat_storage) = self
.runtime_adapter
.get_flat_storage_manager()
.and_then(|manager| manager.get_flat_storage_for_shard(shard_uid))
{
let mut new_flat_head = *block.header().last_final_block();
if new_flat_head == CryptoHash::default() {
new_flat_head = *self.genesis.hash();
Expand Down Expand Up @@ -3200,14 +3213,16 @@ impl Chain {
num_parts: u64,
state_parts_task_scheduler: &dyn Fn(ApplyStatePartsRequest),
) -> Result<(), Error> {
// Before working with state parts, remove existing flat storage data.
let epoch_id = self.get_block_header(&sync_hash)?.epoch_id().clone();
let shard_uid = self.epoch_manager.shard_id_to_uid(shard_id, &epoch_id)?;
self.runtime_adapter.remove_flat_storage_for_shard(shard_uid)?;

// Before working with state parts, remove existing flat storage data.
if let Some(flat_storage_manager) = self.runtime_adapter.get_flat_storage_manager() {
let shard_uid = self.epoch_manager.shard_id_to_uid(shard_id, &epoch_id)?;
flat_storage_manager.remove_flat_storage_for_shard(shard_uid)?;
}

let shard_state_header = self.get_state_header(shard_id, sync_hash)?;
let state_root = shard_state_header.chunk_prev_state_root();
let epoch_id = self.get_block_header(&sync_hash)?.epoch_id().clone();

state_parts_task_scheduler(ApplyStatePartsRequest {
runtime_adapter: self.runtime_adapter.clone(),
Expand Down Expand Up @@ -3245,14 +3260,9 @@ impl Chain {
let epoch_id = block_header.epoch_id();
let shard_uid = self.epoch_manager.shard_id_to_uid(shard_id, epoch_id)?;

// Check if flat storage is disabled, which may be the case when runtime is implemented with
// `KeyValueRuntime`.
if !matches!(
self.runtime_adapter.get_flat_storage_status(shard_uid),
FlatStorageStatus::Disabled
) {
if let Some(flat_storage_manager) = self.runtime_adapter.get_flat_storage_manager() {
// Flat storage must not exist at this point because leftover keys corrupt its state.
assert!(self.runtime_adapter.get_flat_storage_for_shard(shard_uid).is_none());
assert!(flat_storage_manager.get_flat_storage_for_shard(shard_uid).is_none());

let mut store_update = self.runtime_adapter.store().store_update();
store_helper::set_flat_storage_status(
Expand All @@ -3267,7 +3277,7 @@ impl Chain {
}),
);
store_update.commit()?;
self.runtime_adapter.create_flat_storage_for_shard(shard_uid);
flat_storage_manager.add_flat_storage_for_shard(shard_uid);
}
}

Expand Down Expand Up @@ -4995,7 +5005,10 @@ impl<'a> ChainUpdate<'a> {
},
};

if let Some(chain_flat_storage) = self.runtime_adapter.get_flat_storage_for_shard(shard_uid)
if let Some(chain_flat_storage) = self
.runtime_adapter
.get_flat_storage_manager()
.and_then(|manager| manager.get_flat_storage_for_shard(shard_uid))
{
// If flat storage exists, we add a block to it.
let store_update =
Expand Down
13 changes: 10 additions & 3 deletions chain/chain/src/flat_storage_creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,9 @@ impl FlatStorageShardCreator {
);
store_update.commit()?;
info!(target: "chain", %shard_id, %flat_head, %height, "Garbage collected {gc_count} deltas");
self.runtime.create_flat_storage_for_shard(shard_uid);
if let Some(manager) = self.runtime.get_flat_storage_manager() {
manager.add_flat_storage_for_shard(shard_uid);
}
info!(target: "chain", %shard_id, %flat_head, %height, "Flat storage creation done");
}
}
Expand Down Expand Up @@ -438,14 +440,19 @@ impl FlatStorageCreator {
let num_shards = epoch_manager.num_shards(&chain_head.epoch_id)?;
let mut shard_creators: HashMap<ShardUId, FlatStorageShardCreator> = HashMap::new();
let mut creation_needed = false;
let flat_storage_manager = if let Some(manager) = runtime.get_flat_storage_manager() {
manager
} else {
return Ok(None);
};
for shard_id in 0..num_shards {
if shard_tracker.care_about_shard(me, &chain_head.prev_block_hash, shard_id, true) {
let shard_uid = epoch_manager.shard_id_to_uid(shard_id, &chain_head.epoch_id)?;
let status = runtime.get_flat_storage_status(shard_uid);
let status = flat_storage_manager.get_flat_storage_status(shard_uid);

match status {
FlatStorageStatus::Ready(_) => {
runtime.create_flat_storage_for_shard(shard_uid);
flat_storage_manager.add_flat_storage_for_shard(shard_uid);
}
FlatStorageStatus::Empty | FlatStorageStatus::Creation(_) => {
creation_needed = true;
Expand Down
27 changes: 0 additions & 27 deletions chain/chain/src/test_utils/kv_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@ use crate::BlockHeader;

use near_primitives::epoch_manager::ShardConfig;

use near_store::flat::{FlatStorage, FlatStorageStatus};

use super::ValidatorSchedule;

/// Simple key value runtime for tests.
Expand Down Expand Up @@ -976,31 +974,6 @@ impl RuntimeAdapter for KeyValueRuntime {
))
}

fn get_flat_storage_for_shard(&self, _shard_uid: ShardUId) -> Option<FlatStorage> {
None
}

fn get_flat_storage_status(&self, _shard_uid: ShardUId) -> FlatStorageStatus {
FlatStorageStatus::Disabled
}

fn create_flat_storage_for_shard(&self, shard_uid: ShardUId) {
panic!("Flat storage state can't be created for shard {shard_uid} because KeyValueRuntime doesn't support this");
}

fn remove_flat_storage_for_shard(&self, _shard_uid: ShardUId) -> Result<(), Error> {
Ok(())
}

fn set_flat_storage_for_genesis(
&self,
_genesis_block: &CryptoHash,
_genesis_block_height: BlockHeight,
_genesis_epoch_id: &EpochId,
) -> Result<StoreUpdate, Error> {
Ok(self.store.store_update())
}

fn validate_tx(
&self,
_gas_price: Balance,
Expand Down
21 changes: 0 additions & 21 deletions chain/chain/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ use near_primitives::version::{
MIN_PROTOCOL_VERSION_NEP_92_FIX,
};
use near_primitives::views::{QueryRequest, QueryResponse};
use near_store::flat::{FlatStorage, FlatStorageStatus};
use near_store::{PartialStorage, ShardTries, Store, StoreUpdate, Trie, WrappedTrieChanges};

pub use near_epoch_manager::EpochManagerAdapter;
Expand Down Expand Up @@ -302,26 +301,6 @@ pub trait RuntimeAdapter: Send + Sync {

fn get_flat_storage_manager(&self) -> Option<FlatStorageManager>;

fn get_flat_storage_for_shard(&self, shard_uid: ShardUId) -> Option<FlatStorage>;

fn get_flat_storage_status(&self, shard_uid: ShardUId) -> FlatStorageStatus;

/// Creates flat storage state for given shard, assuming that all flat storage data
/// is already stored in DB.
/// TODO (#7327): consider returning flat storage creation errors here
fn create_flat_storage_for_shard(&self, shard_uid: ShardUId);

/// Removes flat storage state for shard, if it exists.
/// Used to clear old flat storage data from disk and memory before syncing to newer state.
fn remove_flat_storage_for_shard(&self, shard_uid: ShardUId) -> Result<(), Error>;

fn set_flat_storage_for_genesis(
&self,
genesis_block: &CryptoHash,
genesis_block_height: BlockHeight,
genesis_epoch_id: &EpochId,
) -> Result<StoreUpdate, Error>;

/// Validates a given signed transaction.
/// If the state root is given, then the verification will use the account. Otherwise it will
/// only validate the transaction math, limits and signatures.
Expand Down
9 changes: 7 additions & 2 deletions core/store/src/flat/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,20 @@ impl FlatStorageManager {
/// the shard's flat storage state hasn't been set before, otherwise it panics.
/// TODO (#7327): this behavior may change when we implement support for state sync
/// and resharding.
pub fn add_flat_storage_for_shard(&self, shard_uid: ShardUId, flat_storage: FlatStorage) {
pub fn add_flat_storage_for_shard(&self, shard_uid: ShardUId) {
let mut flat_storages = self.0.flat_storages.lock().expect(POISONED_LOCK_ERR);
let original_value = flat_storages.insert(shard_uid, flat_storage);
let original_value =
flat_storages.insert(shard_uid, FlatStorage::new(self.0.store.clone(), shard_uid));
// TODO (#7327): maybe we should propagate the error instead of assert here
// assert is fine now because this function is only called at construction time, but we
// will need to be more careful when we want to implement flat storage for resharding
assert!(original_value.is_none());
}

pub fn get_flat_storage_status(&self, shard_uid: ShardUId) -> FlatStorageStatus {
store_helper::get_flat_storage_status(&self.0.store, shard_uid)
}

/// Creates `FlatStorageChunkView` to access state for `shard_uid` and block `block_hash`.
/// Note that:
/// * the state includes changes by the block `block_hash`;
Expand Down
10 changes: 3 additions & 7 deletions core/store/src/flat/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,6 @@ impl FlatStorage {
mod tests {
use crate::flat::delta::{FlatStateChanges, FlatStateDelta, FlatStateDeltaMetadata};
use crate::flat::manager::FlatStorageManager;
use crate::flat::storage::FlatStorage;
use crate::flat::types::{BlockInfo, FlatStorageError};
use crate::flat::{store_helper, FlatStorageReadyStatus, FlatStorageStatus};
use crate::test_utils::create_test_store;
Expand Down Expand Up @@ -471,9 +470,8 @@ mod tests {
}
store_update.commit().unwrap();

let flat_storage = FlatStorage::new(store.clone(), shard_uid);
let flat_storage_manager = FlatStorageManager::new(store.clone());
flat_storage_manager.add_flat_storage_for_shard(shard_uid, flat_storage);
flat_storage_manager.add_flat_storage_for_shard(shard_uid);
let flat_storage = flat_storage_manager.get_flat_storage_for_shard(shard_uid).unwrap();

// Check `BlockNotSupported` errors which are fine to occur during regular block processing.
Expand Down Expand Up @@ -531,9 +529,8 @@ mod tests {
store_update.commit().unwrap();

// Check that flat storage state is created correctly for chain which has skipped heights.
let flat_storage = FlatStorage::new(store.clone(), shard_uid);
let flat_storage_manager = FlatStorageManager::new(store);
flat_storage_manager.add_flat_storage_for_shard(shard_uid, flat_storage);
flat_storage_manager.add_flat_storage_for_shard(shard_uid);
let flat_storage = flat_storage_manager.get_flat_storage_for_shard(shard_uid).unwrap();

// Check that flat head can be moved to block 8.
Expand Down Expand Up @@ -578,8 +575,7 @@ mod tests {
store_update.commit().unwrap();

let flat_storage_manager = FlatStorageManager::new(store.clone());
flat_storage_manager
.add_flat_storage_for_shard(shard_uid, FlatStorage::new(store.clone(), shard_uid));
flat_storage_manager.add_flat_storage_for_shard(shard_uid);
let flat_storage = flat_storage_manager.get_flat_storage_for_shard(shard_uid).unwrap();

// 2. Check that the chunk_view at block i reads the value of key &[1] as &[i]
Expand Down
29 changes: 17 additions & 12 deletions integration-tests/src/tests/client/flat_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use near_primitives::shard_layout::{ShardLayout, ShardUId};
use near_primitives::types::AccountId;
use near_primitives_core::types::BlockHeight;
use near_store::flat::{
store_helper, FetchingStateStatus, FlatStorageCreationStatus, FlatStorageReadyStatus,
FlatStorageStatus, NUM_PARTS_IN_ONE_STEP,
store_helper, FetchingStateStatus, FlatStorageCreationStatus, FlatStorageManager,
FlatStorageReadyStatus, FlatStorageStatus, NUM_PARTS_IN_ONE_STEP,
};
use near_store::test_utils::create_test_store;
use near_store::{KeyLookupMode, Store, TrieTraversalItem};
Expand Down Expand Up @@ -98,13 +98,14 @@ fn wait_for_flat_storage_creation(

thread::sleep(Duration::from_secs(1));
}
let status = store_helper::get_flat_storage_status(&store, shard_uid);
let flat_storage_manager = get_flat_storage_manager(&env);
let status = flat_storage_manager.get_flat_storage_status(shard_uid);
assert_matches!(
status,
FlatStorageStatus::Ready(_),
"Client couldn't create flat storage until block {next_height}, status: {status:?}"
);
assert!(env.clients[0].runtime_adapter.get_flat_storage_for_shard(shard_uid).is_some());
assert!(flat_storage_manager.get_flat_storage_for_shard(shard_uid).is_some());

// We don't expect any forks in the chain after flat storage head, so the number of
// deltas stored on DB should be exactly 2, as there are only 2 blocks after
Expand Down Expand Up @@ -159,7 +160,7 @@ fn test_flat_storage_creation_sanity() {
);
}

env.clients[0].chain.runtime_adapter.remove_flat_storage_for_shard(shard_uid).unwrap();
get_flat_storage_manager(&env).remove_flat_storage_for_shard(shard_uid).unwrap();
}

// Create new chain and runtime using the same store. It should produce next blocks normally, but now it should
Expand All @@ -168,7 +169,7 @@ fn test_flat_storage_creation_sanity() {
for height in START_HEIGHT..START_HEIGHT + 2 {
env.produce_block(0, height);
}
assert!(env.clients[0].runtime_adapter.get_flat_storage_for_shard(shard_uid).is_none());
assert!(get_flat_storage_manager(&env).get_flat_storage_for_shard(shard_uid).is_none());

assert_eq!(store_helper::get_flat_storage_status(&store, shard_uid), FlatStorageStatus::Empty);
assert!(!env.clients[0].run_flat_storage_creation_step().unwrap());
Expand Down Expand Up @@ -241,17 +242,17 @@ fn test_flat_storage_creation_two_shards() {
);
}

env.clients[0].chain.runtime_adapter.remove_flat_storage_for_shard(shard_uids[0]).unwrap();
get_flat_storage_manager(&env).remove_flat_storage_for_shard(shard_uids[0]).unwrap();
}

// Check that flat storage is not ready for shard 0 but ready for shard 1.
let mut env = setup_env(&genesis, store.clone());
assert!(env.clients[0].runtime_adapter.get_flat_storage_for_shard(shard_uids[0]).is_none());
assert!(get_flat_storage_manager(&env).get_flat_storage_for_shard(shard_uids[0]).is_none());
assert_matches!(
store_helper::get_flat_storage_status(&store, shard_uids[0]),
FlatStorageStatus::Empty
);
assert!(env.clients[0].runtime_adapter.get_flat_storage_for_shard(shard_uids[1]).is_some());
assert!(get_flat_storage_manager(&env).get_flat_storage_for_shard(shard_uids[1]).is_some());
assert_matches!(
store_helper::get_flat_storage_status(&store, shard_uids[1]),
FlatStorageStatus::Ready(_)
Expand Down Expand Up @@ -342,7 +343,7 @@ fn test_flat_storage_creation_start_from_state_part() {

// Re-create runtime, check that flat storage is not created yet.
let mut env = setup_env(&genesis, store);
assert!(env.clients[0].runtime_adapter.get_flat_storage_for_shard(shard_uid).is_none());
assert!(get_flat_storage_manager(&env).get_flat_storage_for_shard(shard_uid).is_none());

// Run chain for a couple of blocks and check that flat storage for shard 0 is eventually created.
let next_height = wait_for_flat_storage_creation(&mut env, START_HEIGHT, shard_uid, true);
Expand Down Expand Up @@ -381,10 +382,10 @@ fn test_catchup_succeeds_even_if_no_new_blocks() {
env.produce_block(0, height);
}
// Remove flat storage.
env.clients[0].chain.runtime_adapter.remove_flat_storage_for_shard(shard_uid).unwrap();
get_flat_storage_manager(&env).remove_flat_storage_for_shard(shard_uid).unwrap();
}
let mut env = setup_env(&genesis, store.clone());
assert!(env.clients[0].runtime_adapter.get_flat_storage_for_shard(shard_uid).is_none());
assert!(get_flat_storage_manager(&env).get_flat_storage_for_shard(shard_uid).is_none());
assert_eq!(store_helper::get_flat_storage_status(&store, shard_uid), FlatStorageStatus::Empty);
// Create 3 more blocks (so that the deltas are generated) - and assume that no new blocks are received.
// In the future, we should also support the scenario where no new blocks are created.
Expand Down Expand Up @@ -507,3 +508,7 @@ fn test_not_supported_block() {
// For the second result chunk view is valid, so result is Ok.
assert_matches!(get_ref_results[1], Ok(Some(_)));
}

fn get_flat_storage_manager(env: &TestEnv) -> FlatStorageManager {
env.clients[0].chain.runtime_adapter.get_flat_storage_manager().unwrap()
}
Loading

0 comments on commit d6d350d

Please sign in to comment.