
feat: single shard tracking State cleanup #12734

Merged · 10 commits · Jan 17, 2025
201 changes: 187 additions & 14 deletions chain/chain/src/garbage_collection.rs
@@ -4,15 +4,19 @@ use std::{fmt, io};

use near_chain_configs::GCConfig;
use near_chain_primitives::Error;
use near_epoch_manager::shard_tracker::ShardTracker;
use near_epoch_manager::EpochManagerAdapter;
use near_primitives::block::Block;
use near_primitives::hash::CryptoHash;
use near_primitives::shard_layout::get_block_shard_uid;
use near_primitives::state_sync::{StateHeaderKey, StatePartKey};
use near_primitives::types::{BlockHeight, BlockHeightDelta, EpochId, NumBlocks, ShardId};
use near_primitives::types::{
AccountId, BlockHeight, BlockHeightDelta, EpochId, NumBlocks, ShardId,
};
use near_primitives::utils::{get_block_shard_id, get_outcome_id_block_hash, index_to_bytes};
use near_store::adapter::trie_store::get_shard_uid_mapping;
use near_store::adapter::{StoreAdapter, StoreUpdateAdapter};
use near_store::{DBCol, KeyForStateChanges, ShardTries, ShardUId};
use near_store::{DBCol, KeyForStateChanges, ShardTries, ShardUId, StoreUpdate};

use crate::types::RuntimeAdapter;
use crate::{metrics, Chain, ChainStore, ChainStoreAccess, ChainStoreUpdate};
@@ -41,10 +45,21 @@ impl fmt::Debug for GCMode {
/// TODO - the reset_data_pre_state_sync function seems to also be used in
/// production code. It's used in update_sync_status <- handle_sync_needed <- run_sync_step
impl Chain {
pub fn clear_data(&mut self, gc_config: &GCConfig) -> Result<(), Error> {
pub fn clear_data(
&mut self,
gc_config: &GCConfig,
me: Option<&AccountId>,
) -> Result<(), Error> {
let runtime_adapter = self.runtime_adapter.clone();
let epoch_manager = self.epoch_manager.clone();
self.mut_chain_store().clear_data(gc_config, runtime_adapter, epoch_manager)
let shard_tracker = self.shard_tracker.clone();
self.mut_chain_store().clear_data(
gc_config,
runtime_adapter,
epoch_manager,
&shard_tracker,
me,
)
}

pub fn reset_data_pre_state_sync(&mut self, sync_hash: CryptoHash) -> Result<(), Error> {
@@ -81,7 +96,12 @@ impl ChainStore {
// 2. `clear_data()` runs GC process for all blocks from the Tail to GC Stop Height provided by Epoch Manager.
// 3. `clear_data()` executes separately:
// a. Forks Clearing runs for each height from Tail up to GC Stop Height.
// b. Canonical Chain Clearing from (Tail + 1) up to GC Stop Height.
// b. Canonical Chain Clearing (CCC) from (Tail + 1) up to GC Stop Height.
// i) After CCC for the last block of an epoch, we check what shards tracked in the epoch qualify for trie State cleanup.
// ii) A shard qualifies for trie State cleanup if we did not care about it up to the Head,
// and we won't care about it in the next epoch after the Head.
// iii) `gc_state()` handles trie State cleanup, using the current tracking config (`shard_tracker` and optional validator ID)
// to determine what shards we care about at the Head or in the next epoch after the Head.
// 4. Before actual clearing is started, Block Reference Map should be built.
// 5. `clear_data()` executes every time when block at new height is added.
// 6. In case of State Sync, State Sync Clearing happens.
@@ -137,6 +157,8 @@ impl ChainStore {
gc_config: &GCConfig,
runtime_adapter: Arc<dyn RuntimeAdapter>,
epoch_manager: Arc<dyn EpochManagerAdapter>,
shard_tracker: &ShardTracker,
me: Option<&AccountId>,
) -> Result<(), Error> {
let _span = tracing::debug_span!(target: "garbage_collection", "clear_data").entered();
let tries = runtime_adapter.get_tries();
@@ -206,19 +228,39 @@
if prev_block_refcount > 1 {
// Block of `prev_hash` starts a Fork, stopping
break;
} else if prev_block_refcount == 1 {
debug_assert_eq!(blocks_current_height.len(), 1);
chain_store_update.clear_block_data(
epoch_manager.as_ref(),
*block_hash,
GCMode::Canonical(tries.clone()),
)?;
gc_blocks_remaining -= 1;
} else {
}
if prev_block_refcount < 1 {
return Err(Error::GCError(
"block on canonical chain shouldn't have refcount 0".into(),
));
}
debug_assert_eq!(blocks_current_height.len(), 1);

// Do not clean up immediately, as we still need the State in order to run gc for this block.
let potential_shards_for_cleanup = get_potential_shards_for_cleanup(
&chain_store_update,
&epoch_manager,
shard_tracker,
block_hash,
)?;

chain_store_update.clear_block_data(
epoch_manager.as_ref(),
*block_hash,
GCMode::Canonical(tries.clone()),
)?;
gc_blocks_remaining -= 1;

if let Some(potential_shards_for_cleanup) = potential_shards_for_cleanup {
gc_state(
&mut chain_store_update,
block_hash,
potential_shards_for_cleanup,
&epoch_manager,
shard_tracker,
me,
)?;
}
}
chain_store_update.update_tail(height)?;
chain_store_update.commit()?;
@@ -1014,3 +1056,134 @@ impl<'a> ChainStoreUpdate<'a> {
self.merge(store_update);
}
}

/// Returns shards that we tracked in an epoch, given a hash of the last block in the epoch.
/// The block has to be available, so this function has to be called before gc is run for the block.
///
/// Note that the validator ID or shard tracking config could have changed since the epoch passed,
/// so we have to rely on what is stored in the database to figure out tracked shards.
/// We rely on the `TrieChanges` column, which preserves what shards this node tracked at that time.
fn get_potential_shards_for_cleanup(
chain_store_update: &ChainStoreUpdate,
epoch_manager: &Arc<dyn EpochManagerAdapter>,
shard_tracker: &ShardTracker,
block_hash: &CryptoHash,
) -> Result<Option<Vec<ShardUId>>, Error> {
if shard_tracker.tracks_all_shards()
|| !epoch_manager.is_last_block_in_finished_epoch(block_hash)?
{
return Ok(None);
}
let block = chain_store_update
.get_block(block_hash)
.expect("block data is not expected to be already cleaned");
let epoch_id = block.header().epoch_id();
let shard_layout = epoch_manager.get_shard_layout(epoch_id).expect("epoch id must exist");
let mut tracked_shards = vec![];
for shard_uid in shard_layout.shard_uids() {
if chain_store_update
.store()
.exists(DBCol::TrieChanges, &get_block_shard_uid(&block_hash, &shard_uid))?
{
tracked_shards.push(shard_uid);
}
}
Ok(Some(tracked_shards))
}

/// State cleanup for single shard tracking. Removes State of shards that are no longer in use.
///
/// It has to be run after we clear block data for the `last_block_hash_in_gced_epoch`.
/// `potential_shards_for_cleanup` are the shards that were tracked in the gc-ed epoch;
/// these are the shards that we potentially no longer use and that can be cleaned up.
/// We do not clean up a shard if it has been tracked in any later epoch,
/// or if we care about it in the current or the next epoch (relative to Head).
///
/// With ReshardingV3, we use State mapping (see DBCol::StateShardUIdMapping),
/// where each `ShardUId` is potentially mapped to its ancestor to get the database key prefix.
/// We only remove a shard's State if all its descendants are ready to be cleaned up,
/// in which case we also remove the mapping from `StateShardUIdMapping`.
fn gc_state(
chain_store_update: &mut ChainStoreUpdate,
last_block_hash_in_gced_epoch: &CryptoHash,
potential_shards_for_cleanup: Vec<ShardUId>,
epoch_manager: &Arc<dyn EpochManagerAdapter>,
shard_tracker: &ShardTracker,
me: Option<&AccountId>,
) -> Result<(), Error> {
let _span = tracing::debug_span!(target: "garbage_collection", "gc_state").entered();
if potential_shards_for_cleanup.is_empty() || shard_tracker.tracks_all_shards() {
return Ok(());
}
let store = chain_store_update.store();
let mut potential_shards_to_cleanup: HashSet<ShardUId> = potential_shards_for_cleanup
.iter()
.map(|shard_uid| get_shard_uid_mapping(&store, *shard_uid))
.collect();

let last_block_hash = chain_store_update.head()?.last_block_hash;
let last_block_info = epoch_manager.get_block_info(&last_block_hash)?;
let current_shard_layout = epoch_manager.get_shard_layout(last_block_info.epoch_id())?;
// Do not clean up shards that we care about in the current or the next epoch.
// Most of the time, `potential_shards_to_cleanup` will become empty as we do not change tracked shards often.
for shard_uid in current_shard_layout.shard_uids() {
if !shard_tracker.cares_about_shard_this_or_next_epoch(
me,
last_block_info.prev_hash(),
shard_uid.shard_id(),
true,
) {
continue;
}
let mapped_shard_uid = get_shard_uid_mapping(&store, shard_uid);
potential_shards_to_cleanup.remove(&mapped_shard_uid);
}

let mut block_info = last_block_info;
loop {
if potential_shards_to_cleanup.is_empty() {
return Ok(());
}
let epoch_first_block_info =
epoch_manager.get_block_info(block_info.epoch_first_block())?;
let prev_epoch_last_block_hash = epoch_first_block_info.prev_hash();
if prev_epoch_last_block_hash == last_block_hash_in_gced_epoch {
break;
}
block_info = epoch_manager.get_block_info(prev_epoch_last_block_hash)?;
let shard_layout = epoch_manager.get_shard_layout(block_info.epoch_id())?;
// Do not clean up shards that were tracked in any epoch after the gc-ed epoch.
for shard_uid in shard_layout.shard_uids() {
if !store
.exists(DBCol::TrieChanges, &get_block_shard_uid(&block_info.hash(), &shard_uid))?
{
continue;
}
let mapped_shard_uid = get_shard_uid_mapping(&store, shard_uid);
potential_shards_to_cleanup.remove(&mapped_shard_uid);
}
}
let shards_to_cleanup = potential_shards_to_cleanup;

// Find ShardUId mappings to shards that we will clean up.
let mut shard_uid_mappings_to_remove = vec![];
for kv in store.iter_ser::<ShardUId>(DBCol::StateShardUIdMapping) {
let (child_shard_uid_bytes, parent_shard_uid) = kv?;
if shards_to_cleanup.contains(&parent_shard_uid) {
shard_uid_mappings_to_remove.push(child_shard_uid_bytes);
}
}

// Delete State of `shards_to_cleanup` and associated ShardUId mapping.
tracing::info!(target: "garbage_collection", ?shards_to_cleanup, ?shard_uid_mappings_to_remove, "state_cleanup");
let mut trie_store_update = store.trie_store().store_update();
for shard_uid_prefix in shards_to_cleanup {
trie_store_update.delete_shard_uid_prefixed_state(shard_uid_prefix);
}
let mut store_update: StoreUpdate = trie_store_update.into();
for child_shard_uid_bytes in shard_uid_mappings_to_remove {
store_update.delete(DBCol::StateShardUIdMapping, &child_shard_uid_bytes);
}
chain_store_update.merge(store_update);
Ok(())
}
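For context, a minimal sketch of how a caller would use the extended `clear_data` API (the wrapper function below and the `near_chain::Chain` import path are illustrative; the real call sites are not part of this diff). The new `me` argument is the node's validator account, if any; together with the `ShardTracker` it determines which shards are cared about at the Head and in the next epoch, and those shards' State is never cleaned up.

use near_chain::Chain; // assumed path; the type is defined in this crate (chain/chain)
use near_chain_configs::GCConfig;
use near_chain_primitives::Error;
use near_primitives::types::AccountId;

// Hypothetical helper: run garbage collection on behalf of this node.
// Pass `Some(account_id)` on a validator node, `None` otherwise.
fn run_gc(chain: &mut Chain, me: Option<&AccountId>) -> Result<(), Error> {
    // Shards that `me` (via the ShardTracker) cares about in the current or
    // next epoch are excluded from trie State cleanup by `gc_state()`.
    chain.clear_data(&GCConfig { gc_blocks_limit: 100, ..GCConfig::default() }, me)
}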
23 changes: 14 additions & 9 deletions chain/chain/src/tests/garbage_collection.rs
@@ -192,7 +192,7 @@ fn gc_fork_common(simple_chains: Vec<SimpleChain>, max_changes: usize) {
}

// GC execution
chain1.clear_data(&GCConfig { gc_blocks_limit: 1000, ..GCConfig::default() }).unwrap();
chain1.clear_data(&GCConfig { gc_blocks_limit: 1000, ..GCConfig::default() }, None).unwrap();

let tries2 = get_chain_with_num_shards(Clock::real(), num_shards).runtime_adapter.get_tries();

@@ -624,11 +624,14 @@ fn test_fork_far_away_from_epoch_end() {

// GC execution
chain
.clear_data(&GCConfig {
gc_blocks_limit: 100,
gc_fork_clean_step: fork_clean_step,
..GCConfig::default()
})
.clear_data(
&GCConfig {
gc_blocks_limit: 100,
gc_fork_clean_step: fork_clean_step,
..GCConfig::default()
},
None,
)
.expect("Clear data failed");

// The run above would clear just the first 5 blocks from the beginning, but shouldn't clear any forks
@@ -670,7 +673,7 @@ fn test_fork_far_away_from_epoch_end() {
);
}
chain
.clear_data(&GCConfig { gc_blocks_limit: 100, ..GCConfig::default() })
.clear_data(&GCConfig { gc_blocks_limit: 100, ..GCConfig::default() }, None)
.expect("Clear data failed");
// And now all these blocks should be safely removed.
for i in 6..50 {
@@ -707,7 +710,7 @@ fn test_clear_old_data() {
);
}

chain.clear_data(&GCConfig { gc_blocks_limit: 100, ..GCConfig::default() }).unwrap();
chain.clear_data(&GCConfig { gc_blocks_limit: 100, ..GCConfig::default() }, None).unwrap();

for i in 0..=max_height {
println!("height = {} hash = {}", i, blocks[i].hash());
@@ -885,7 +888,9 @@ fn test_clear_old_data_too_many_heights_common(gc_blocks_limit: NumBlocks) {

for iter in 0..10 {
println!("ITERATION #{:?}", iter);
assert!(chain.clear_data(&GCConfig { gc_blocks_limit, ..GCConfig::default() }).is_ok());
assert!(chain
.clear_data(&GCConfig { gc_blocks_limit, ..GCConfig::default() }, None)
.is_ok());

// epoch didn't change so no data is garbage collected.
for i in 0..1000 {
20 changes: 3 additions & 17 deletions chain/chunks/src/logic.rs
@@ -23,7 +23,7 @@ pub fn need_receipt(
me: Option<&AccountId>,
shard_tracker: &ShardTracker,
) -> bool {
cares_about_shard_this_or_next_epoch(me, prev_block_hash, shard_id, true, shard_tracker)
shard_tracker.cares_about_shard_this_or_next_epoch(me, prev_block_hash, shard_id, true)
}

/// Returns true if we need this part to sign the block.
@@ -37,18 +37,6 @@ pub fn need_part(
Ok(Some(&epoch_manager.get_part_owner(&epoch_id, part_ord)?) == me)
}

pub fn cares_about_shard_this_or_next_epoch(
account_id: Option<&AccountId>,
parent_hash: &CryptoHash,
shard_id: ShardId,
is_me: bool,
shard_tracker: &ShardTracker,
) -> bool {
// TODO(robin-near): I think we only need the shard_tracker if is_me is false.
shard_tracker.care_about_shard(account_id, parent_hash, shard_id, is_me)
|| shard_tracker.will_care_about_shard(account_id, parent_hash, shard_id, is_me)
}

pub fn get_shards_cares_about_this_or_next_epoch(
account_id: Option<&AccountId>,
is_me: bool,
@@ -61,12 +49,11 @@ pub fn get_shards_cares_about_this_or_next_epoch(
.unwrap()
.into_iter()
.filter(|&shard_id| {
cares_about_shard_this_or_next_epoch(
shard_tracker.cares_about_shard_this_or_next_epoch(
account_id,
block_header.prev_hash(),
shard_id,
is_me,
shard_tracker,
)
})
.collect()
@@ -131,12 +118,11 @@ pub fn make_partial_encoded_chunk_from_owned_parts_and_needed_receipts<'a>(
shard_tracker: &ShardTracker,
) -> PartialEncodedChunk {
let prev_block_hash = header.prev_block_hash();
let cares_about_shard = cares_about_shard_this_or_next_epoch(
let cares_about_shard = shard_tracker.cares_about_shard_this_or_next_epoch(
me,
prev_block_hash,
header.shard_id(),
true,
shard_tracker,
);
let parts = parts
.filter(|entry| {
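The call sites above use `cares_about_shard_this_or_next_epoch` as a method on `ShardTracker`, replacing the free helper removed from this file. A sketch of what that method presumably looks like, reconstructed from the removed helper's body (the actual implementation lives in `near_epoch_manager::shard_tracker` and is not shown in this diff; the `impl` block below is purely illustrative):

use near_primitives::hash::CryptoHash;
use near_primitives::types::{AccountId, ShardId};

impl ShardTracker {
    /// Presumed equivalent of the removed free function: true if the node
    /// tracks the shard in the current epoch or will track it in the next one.
    pub fn cares_about_shard_this_or_next_epoch(
        &self,
        account_id: Option<&AccountId>,
        parent_hash: &CryptoHash,
        shard_id: ShardId,
        is_me: bool,
    ) -> bool {
        self.care_about_shard(account_id, parent_hash, shard_id, is_me)
            || self.will_care_about_shard(account_id, parent_hash, shard_id, is_me)
    }
}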