Commit

Fix
staffik committed Mar 26, 2024
1 parent c9f1098 commit 6eac8cd
Showing 8 changed files with 77 additions and 39 deletions.
2 changes: 1 addition & 1 deletion chain/chain/src/runtime/mod.rs
@@ -1239,7 +1239,7 @@ impl RuntimeAdapter for NightshadeRuntime {
shard_uid: &ShardUId,
state_root: &StateRoot,
) -> Result<(), StorageError> {
if !self.get_tries().trie_config().load_mem_trie_for_tracked_shards {
if !self.get_tries().trie_config().load_mem_tries_for_tracked_shards {
return Ok(());
}
// It should not happen that memtrie is already loaded for a shard
29 changes: 28 additions & 1 deletion chain/chunks/src/logic.rs
@@ -1,4 +1,6 @@
use near_chain::{types::EpochManagerAdapter, validate::validate_chunk_proofs, Chain, ChainStore};
use near_chain::{
types::EpochManagerAdapter, validate::validate_chunk_proofs, BlockHeader, Chain, ChainStore,
};
use near_chunks_primitives::Error;
use near_epoch_manager::shard_tracker::ShardTracker;
use near_primitives::{
@@ -46,6 +48,31 @@ pub fn cares_about_shard_this_or_next_epoch(
|| shard_tracker.will_care_about_shard(account_id, parent_hash, shard_id, is_me)
}

pub fn get_shards_cares_about_this_or_next_epoch(
account_id: Option<&AccountId>,
is_me: bool,
block_header: &BlockHeader,
shard_tracker: &ShardTracker,
epoch_manager: &dyn EpochManagerAdapter,
) -> Vec<ShardId> {
let parent_hash = *block_header.prev_hash();
let epoch_id = block_header.epoch_id().clone();
epoch_manager
.shard_ids(&epoch_id)
.unwrap()
.into_iter()
.filter(|&shard_id| {
cares_about_shard_this_or_next_epoch(
account_id,
&parent_hash,
shard_id,
is_me,
shard_tracker,
)
})
.collect()
}

pub fn chunk_needs_to_be_fetched_from_archival(
chunk_prev_block_hash: &CryptoHash,
header_head: &CryptoHash,
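For reference, a minimal usage sketch of the new helper, mirroring the call sites updated later in this commit (chain/client/src/client.rs and chain/client/src/client_actions.rs); the surrounding fields (me, shard_tracker, epoch_manager) are assumed to exist on the caller:

// Sketch of a call site (assumed context: a client holding a ShardTracker and
// an EpochManagerAdapter, as in the call sites changed below).
let shards_to_sync = get_shards_cares_about_this_or_next_epoch(
    me.as_ref(),                 // Option<&AccountId> of this validator, if any
    true,                        // is_me
    &block_header,               // prev_hash and epoch_id are taken from this header
    &self.shard_tracker,
    self.epoch_manager.as_ref(),
);
// Returns Vec<ShardId>; callers then map these to ShardUIds, e.g. when retaining mem-tries.
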
32 changes: 27 additions & 5 deletions chain/client/src/client.rs
@@ -43,7 +43,7 @@ use near_chain_configs::{ClientConfig, LogSummaryStyle, UpdateableClientConfig};
use near_chunks::adapter::ShardsManagerRequestFromClient;
use near_chunks::client::ShardedTransactionPool;
use near_chunks::logic::{
cares_about_shard_this_or_next_epoch, decode_encoded_chunk, persist_chunk,
cares_about_shard_this_or_next_epoch, decode_encoded_chunk, get_shards_cares_about_this_or_next_epoch, persist_chunk
};
use near_chunks::ShardsManager;
use near_client_primitives::debug::ChunkProduction;
@@ -2340,13 +2340,15 @@ impl Client {
let _span = debug_span!(target: "sync", "run_catchup").entered();
let mut notify_state_sync = false;
let me = &self.validator_signer.as_ref().map(|x| x.validator_id().clone());

for (sync_hash, state_sync_info) in self.chain.chain_store().iterate_state_sync_infos()? {
assert_eq!(sync_hash, state_sync_info.epoch_tail_hash);
let network_adapter = self.network_adapter.clone();

let shards_to_split = self.get_shards_to_split(sync_hash, &state_sync_info, me)?;
let state_sync_timeout = self.config.state_sync_timeout;
let epoch_id = self.chain.get_block(&sync_hash)?.header().epoch_id().clone();
let block_header = self.chain.get_block(&sync_hash)?.header().clone();
let epoch_id = block_header.epoch_id();

let (state_sync, shards_to_split, blocks_catch_up_state) =
self.catchup_state_syncs.entry(sync_hash).or_insert_with(|| {
@@ -2374,9 +2376,29 @@
state_sync_info.shards.iter().map(|tuple| tuple.0).collect();
// Notify each shard to sync.
if notify_state_sync {
for shard_id in &tracking_shards {
let shard_uid =
self.epoch_manager.shard_id_to_uid(*shard_id, &epoch_id).unwrap();
let shard_layout = self
.epoch_manager
.get_shard_layout(&epoch_id)
.expect("Cannot get shard layout");

// Make sure mem-tries for shards we do not care about are unloaded before we start a new state sync.
let shards_cares_this_or_next_epoch = get_shards_cares_about_this_or_next_epoch(
me.as_ref(),
true,
&block_header,
&self.shard_tracker,
self.epoch_manager.as_ref(),
);
let shard_uids: Vec<_> = shards_cares_this_or_next_epoch
.iter()
.map(|id| {
self.epoch_manager.shard_id_to_uid(*id, &epoch_id).unwrap()
})
.collect();
self.runtime_adapter.retain_mem_tries(&shard_uids);

for &shard_id in &tracking_shards {
let shard_uid = ShardUId::from_shard_id_and_layout(shard_id, &shard_layout);
match self.state_sync_adapter.clone().read() {
Ok(sync_adapter) => sync_adapter.send(
shard_uid,
33 changes: 8 additions & 25 deletions chain/client/src/client_actions.rs
@@ -33,7 +33,7 @@ use near_chain::{
use near_chain_configs::{ClientConfig, LogSummaryStyle};
use near_chain_primitives::error::EpochErrorResultToChainError;
use near_chunks::client::ShardsManagerResponse;
use near_chunks::logic::cares_about_shard_this_or_next_epoch;
use near_chunks::logic::get_shards_cares_about_this_or_next_epoch;
use near_client_primitives::types::{
Error, GetClientConfig, GetClientConfigError, GetNetworkInfo, NetworkInfoResponse,
StateSyncStatus, Status, StatusError, StatusSyncInfo, SyncStatus,
@@ -1543,30 +1543,13 @@ impl ClientActions {
unwrap_and_report!(self.client.chain.get_block_header(&sync_hash));
let prev_hash = *block_header.prev_hash();
let epoch_id = block_header.epoch_id().clone();
let shards_to_sync: Vec<_> = self
.client
.epoch_manager
.shard_ids(&epoch_id)
.unwrap()
.into_iter()
.filter(|&shard_id| {
cares_about_shard_this_or_next_epoch(
me.as_ref(),
&prev_hash,
shard_id,
true,
&self.client.shard_tracker,
)
})
.collect();
let shard_uids: Vec<_> = shards_to_sync
.iter()
.map(|id| {
self.client.epoch_manager.shard_id_to_uid(*id, &epoch_id).unwrap()
})
.collect();
// Make sure mem-tries can be loaded only for shards we care about this or next epoch.
self.client.runtime_adapter.retain_mem_tries(&shard_uids);
let shards_to_sync = get_shards_cares_about_this_or_next_epoch(
me.as_ref(),
true,
&block_header,
&self.client.shard_tracker,
self.client.epoch_manager.as_ref(),
);

let use_colour =
matches!(self.client.config.log_summary_style, LogSummaryStyle::Colored);
4 changes: 2 additions & 2 deletions core/store/src/config.rs
@@ -66,7 +66,7 @@ pub struct StoreConfig {
/// If true, load mem tries for all shards; this has priority over `load_mem_tries_for_shards`.
pub load_mem_tries_for_all_shards: bool,
/// If true, load mem trie for each shard being tracked.
pub load_mem_trie_for_tracked_shards: bool,
pub load_mem_tries_for_tracked_shards: bool,

/// Path where to create RocksDB checkpoints during database migrations or
/// `false` to disable that feature.
@@ -261,7 +261,7 @@ impl Default for StoreConfig {
// requires more RAM and takes several minutes on startup.
load_mem_tries_for_shards: Default::default(),
load_mem_tries_for_all_shards: false,
load_mem_trie_for_tracked_shards: false,
load_mem_tries_for_tracked_shards: false,

migration_snapshot: Default::default(),

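As a quick illustration of the renamed flag, a hedged sketch of enabling it on the store config and propagating it into the trie config; the constructor name from_store_config is an assumption inferred from the hunk in core/store/src/trie/config.rs below, not confirmed by this diff:

// Sketch (constructor name assumed): enable mem-trie loading for tracked shards.
let mut store_config = StoreConfig::default();
store_config.load_mem_tries_for_tracked_shards = true;
let trie_config = TrieConfig::from_store_config(&store_config);
assert!(trie_config.load_mem_tries_for_tracked_shards);
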
4 changes: 2 additions & 2 deletions core/store/src/trie/config.rs
@@ -35,7 +35,7 @@ pub struct TrieConfig {
pub load_mem_tries_for_shards: Vec<ShardUId>,
pub load_mem_tries_for_all_shards: bool,
/// Whether mem-trie should be loaded for each tracked shard.
pub load_mem_trie_for_tracked_shards: bool,
pub load_mem_tries_for_tracked_shards: bool,
}

impl TrieConfig {
@@ -61,7 +61,7 @@ impl TrieConfig {
}
this.load_mem_tries_for_shards = config.load_mem_tries_for_shards.clone();
this.load_mem_tries_for_all_shards = config.load_mem_tries_for_all_shards;
this.load_mem_trie_for_tracked_shards = config.load_mem_trie_for_tracked_shards;
this.load_mem_tries_for_tracked_shards = config.load_mem_tries_for_tracked_shards;

this
}
10 changes: 8 additions & 2 deletions core/store/src/trie/shard_tries.rs
@@ -385,12 +385,16 @@ impl ShardTries {

/// Remove trie from memory for shards not included in the given list.
pub fn retain_mem_tries(&self, shard_uids: &[ShardUId]) {
info!(target: "memtrie", "Retaining memtries for shards {:?}...", shard_uids);
self.0.mem_tries.write().unwrap().retain(|shard_uid, _| shard_uids.contains(shard_uid));
info!(target: "memtrie", "Memtries retaining complete for shards {:?}", shard_uids);
}

/// Remove trie from memory for given shard.
pub fn unload_mem_trie(&self, shard_uid: &ShardUId) {
info!(target: "memtrie", "Unloading trie from memory for shard {:?}...", shard_uid);
self.0.mem_tries.write().unwrap().remove(shard_uid);
info!(target: "memtrie", "Memtrie unloading complete for shard {:?}", shard_uid);
}

/// Loads in-memory-trie for given shard and state root (if given).
@@ -399,8 +403,10 @@
shard_uid: &ShardUId,
state_root: Option<StateRoot>,
) -> Result<(), StorageError> {
info!(target: "memtrie", "Loading trie to memory for shard {:?}...", shard_uid);
let mem_tries = load_trie_from_flat_state_and_delta(&self.0.store, *shard_uid, state_root)?;
self.0.mem_tries.write().unwrap().insert(*shard_uid, Arc::new(RwLock::new(mem_tries)));
info!(target: "memtrie", "Memtrie loading complete for shard {:?}", shard_uid);
Ok(())
}

@@ -725,7 +731,7 @@ mod test {
sweat_prefetch_senders: Vec::new(),
load_mem_tries_for_shards: Vec::new(),
load_mem_tries_for_all_shards: false,
load_mem_trie_for_tracked_shards: false,
load_mem_tries_for_tracked_shards: false,
};
let shard_uids = Vec::from([ShardUId::single_shard()]);
ShardTries::new(
@@ -846,7 +852,7 @@
sweat_prefetch_senders: Vec::new(),
load_mem_tries_for_shards: Vec::new(),
load_mem_tries_for_all_shards: false,
load_mem_trie_for_tracked_shards: false,
load_mem_tries_for_tracked_shards: false,
};
let shard_uids = Vec::from([ShardUId { shard_id: 0, version: 0 }]);
let shard_uid = *shard_uids.first().unwrap();
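To summarize the mem-trie lifecycle touched above, a minimal sketch using only the APIs shown in this file; it assumes tries is a ShardTries built as in the tests, shard_a and shard_b are ShardUIds, flat state is available for both shards, and the calls run in a function returning Result:

// Minimal sketch of the memtrie lifecycle APIs shown above (assumed setup:
// `tries: ShardTries`, `shard_a`/`shard_b: ShardUId`, flat state present).
tries.load_mem_trie(&shard_a, None)?;
tries.load_mem_trie(&shard_b, None)?;
// Keep only shard_a; the entry for shard_b is removed from the in-memory map.
tries.retain_mem_tries(&[shard_a]);
// Drop shard_a explicitly as well, leaving no mem-tries loaded.
tries.unload_mem_trie(&shard_a);
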
2 changes: 1 addition & 1 deletion integration-tests/src/tests/client/state_snapshot.rs
@@ -45,7 +45,7 @@ impl StateSnaptshotTestEnv {
sweat_prefetch_senders: Vec::new(),
load_mem_tries_for_shards: Vec::new(),
load_mem_tries_for_all_shards: false,
load_mem_trie_for_tracked_shards: false,
load_mem_tries_for_tracked_shards: false,
};
let flat_storage_manager = FlatStorageManager::new(store.clone());
let shard_uids = [ShardUId::single_shard()];
