feat: single shard tracking State cleanup (#12734)
Clean up the parent shard State if neither child shard was tracked for GC window
epochs.
This also implements shard garbage collection in general, see
#11883.

TODO: add more tests

#### Summary
* We clean up unused shard State with a delay of GC window epochs.
One reason is that GC modifies the State, and removing the State earlier
would result in at least negative refcounts, if not more serious
problems.
* For that, we need to know whether a shard has not been tracked for GC window
epochs. One caveat is that the validator operator could have changed
the validator key in this period, so we should not rely on the current
validator key (or even the tracking config) to tell which shards were tracked
in the past.
* We use the `TrieChanges` column to determine which shards were tracked in
a given epoch. We rely on `TrieChanges` being saved for the last block
of an epoch, for all shards that were tracked in that epoch; a minimal
sketch of this check follows the list. TODO: add a test that focuses on that.
* Shard cleanup is only triggered once we have gc-ed the last block
of an epoch, which is always a final block on the canonical chain.
* For each shard that we clean up, we also remove its State mapping,
since deleting the shard's State means we no longer have the State for
any descendant shard either.
* Of course, we should not remove the State of shards that are currently
tracked, and we do not remove the State of shards that we care about in
the next epoch.
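
A minimal sketch of the `TrieChanges`-based check described above (it mirrors `get_potential_shards_for_cleanup` from the diff below; the helper name and the plain `std::io::Result` return type are illustrative):

```rust
use near_primitives::hash::CryptoHash;
use near_primitives::shard_layout::{get_block_shard_uid, ShardLayout};
use near_store::{DBCol, ShardUId, Store};

// A shard counts as "tracked in the epoch ending at `last_block_hash`" iff its
// `TrieChanges` entry exists for that block. This is what lets GC reconstruct
// past tracking without trusting the current validator key or tracking config.
fn shards_tracked_in_epoch(
    store: &Store,
    shard_layout: &ShardLayout,
    last_block_hash: &CryptoHash,
) -> std::io::Result<Vec<ShardUId>> {
    let mut tracked = vec![];
    for shard_uid in shard_layout.shard_uids() {
        let key = get_block_shard_uid(last_block_hash, &shard_uid);
        if store.exists(DBCol::TrieChanges, &key)? {
            tracked.push(shard_uid);
        }
    }
    Ok(tracked)
}
```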

## Testing
GC num epochs to keep is set to **3**. GC is invoked through the updated
`clear_data` entry point; a caller sketch follows the scenario list.

#### Notation
**P** - parent shard
**C** - child shard
**U** - unrelated shard
Schedule: _(epoch before resharding) | (epoch after resharding) | ...
next epochs_

#### Tested scenarios
- P | C | U ... `test_resharding_v3_state_cleanup`
- P | U ... `test_resharding_v3_do_not_track_children_after_resharding`
- P | C | U | U | U | U | U | C ...
`test_resharding_v3_stop_track_child_for_5_epochs` (in the end we do not
map to parent)
- P | C1 | U | U | C2 | U | U | C1 ...
`test_resharding_v3_stop_track_child_for_5_epochs_with_sibling_in_between`
(in the end we map to parent)
- P | U | C ... `test_resharding_v3_shard_shuffling_untrack_then_track`
- U | U | C ... `test_resharding_v3_sync_child`
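
Each scenario ultimately exercises GC through the updated `clear_data` entry point, which now also takes the node's optional validator account id; a minimal caller sketch (the `run_gc` wrapper and the particular `gc_blocks_limit` value are illustrative, not part of the change):

```rust
use near_chain::Chain;
use near_chain_configs::GCConfig;
use near_chain_primitives::Error;
use near_primitives::types::AccountId;

// The validator account id is threaded into GC so that `gc_state()` can tell
// which shards this node still cares about in the current or next epoch.
// Non-validator nodes (and the updated unit tests below) simply pass `None`.
fn run_gc(chain: &mut Chain, me: Option<&AccountId>) -> Result<(), Error> {
    chain.clear_data(&GCConfig { gc_blocks_limit: 100, ..GCConfig::default() }, me)
}
```
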
staffik authored Jan 17, 2025
1 parent be90285 commit 98a2235
Showing 21 changed files with 449 additions and 143 deletions.
201 changes: 187 additions & 14 deletions chain/chain/src/garbage_collection.rs
@@ -4,15 +4,19 @@ use std::{fmt, io};

use near_chain_configs::GCConfig;
use near_chain_primitives::Error;
use near_epoch_manager::shard_tracker::ShardTracker;
use near_epoch_manager::EpochManagerAdapter;
use near_primitives::block::Block;
use near_primitives::hash::CryptoHash;
use near_primitives::shard_layout::get_block_shard_uid;
use near_primitives::state_sync::{StateHeaderKey, StatePartKey};
use near_primitives::types::{BlockHeight, BlockHeightDelta, EpochId, NumBlocks, ShardId};
use near_primitives::types::{
AccountId, BlockHeight, BlockHeightDelta, EpochId, NumBlocks, ShardId,
};
use near_primitives::utils::{get_block_shard_id, get_outcome_id_block_hash, index_to_bytes};
use near_store::adapter::trie_store::get_shard_uid_mapping;
use near_store::adapter::{StoreAdapter, StoreUpdateAdapter};
use near_store::{DBCol, KeyForStateChanges, ShardTries, ShardUId};
use near_store::{DBCol, KeyForStateChanges, ShardTries, ShardUId, StoreUpdate};

use crate::types::RuntimeAdapter;
use crate::{metrics, Chain, ChainStore, ChainStoreAccess, ChainStoreUpdate};
@@ -41,10 +45,21 @@ impl fmt::Debug for GCMode {
/// TODO - the reset_data_pre_state_sync function seems to also be used in
/// production code. It's used in update_sync_status <- handle_sync_needed <- run_sync_step
impl Chain {
pub fn clear_data(&mut self, gc_config: &GCConfig) -> Result<(), Error> {
pub fn clear_data(
&mut self,
gc_config: &GCConfig,
me: Option<&AccountId>,
) -> Result<(), Error> {
let runtime_adapter = self.runtime_adapter.clone();
let epoch_manager = self.epoch_manager.clone();
self.mut_chain_store().clear_data(gc_config, runtime_adapter, epoch_manager)
let shard_tracker = self.shard_tracker.clone();
self.mut_chain_store().clear_data(
gc_config,
runtime_adapter,
epoch_manager,
&shard_tracker,
me,
)
}

pub fn reset_data_pre_state_sync(&mut self, sync_hash: CryptoHash) -> Result<(), Error> {
@@ -81,7 +96,12 @@ impl ChainStore {
// 2. `clear_data()` runs GC process for all blocks from the Tail to GC Stop Height provided by Epoch Manager.
// 3. `clear_data()` executes separately:
// a. Forks Clearing runs for each height from Tail up to GC Stop Height.
// b. Canonical Chain Clearing from (Tail + 1) up to GC Stop Height.
// b. Canonical Chain Clearing (CCC) from (Tail + 1) up to GC Stop Height.
// i) After CCC for the last block of an epoch, we check which shards tracked in that epoch qualify for trie State cleanup.
// ii) A shard qualifies for trie State cleanup if we did not care about it up to the Head,
// and we won't care about it in the next epoch after the Head.
// iii) `gc_state()` handles trie State cleanup; it uses the current tracking config (`shard_tracker` and optional validator ID)
// to determine which shards we care about at the Head or in the next epoch after the Head.
// 4. Before actual clearing is started, Block Reference Map should be built.
// 5. `clear_data()` executes every time when block at new height is added.
// 6. In case of State Sync, State Sync Clearing happens.
@@ -137,6 +157,8 @@ impl ChainStore {
gc_config: &GCConfig,
runtime_adapter: Arc<dyn RuntimeAdapter>,
epoch_manager: Arc<dyn EpochManagerAdapter>,
shard_tracker: &ShardTracker,
me: Option<&AccountId>,
) -> Result<(), Error> {
let _span = tracing::debug_span!(target: "garbage_collection", "clear_data").entered();
let tries = runtime_adapter.get_tries();
@@ -206,19 +228,39 @@ impl ChainStore {
if prev_block_refcount > 1 {
// Block of `prev_hash` starts a Fork, stopping
break;
} else if prev_block_refcount == 1 {
debug_assert_eq!(blocks_current_height.len(), 1);
chain_store_update.clear_block_data(
epoch_manager.as_ref(),
*block_hash,
GCMode::Canonical(tries.clone()),
)?;
gc_blocks_remaining -= 1;
} else {
}
if prev_block_refcount < 1 {
return Err(Error::GCError(
"block on canonical chain shouldn't have refcount 0".into(),
));
}
debug_assert_eq!(blocks_current_height.len(), 1);

// Do not clean up immediately, as we still need the State in order to run gc for this block.
let potential_shards_for_cleanup = get_potential_shards_for_cleanup(
&chain_store_update,
&epoch_manager,
shard_tracker,
block_hash,
)?;

chain_store_update.clear_block_data(
epoch_manager.as_ref(),
*block_hash,
GCMode::Canonical(tries.clone()),
)?;
gc_blocks_remaining -= 1;

if let Some(potential_shards_for_cleanup) = potential_shards_for_cleanup {
gc_state(
&mut chain_store_update,
block_hash,
potential_shards_for_cleanup,
&epoch_manager,
shard_tracker,
me,
)?;
}
}
chain_store_update.update_tail(height)?;
chain_store_update.commit()?;
@@ -1014,3 +1056,134 @@ impl<'a> ChainStoreUpdate<'a> {
self.merge(store_update);
}
}

/// Returns shards that we tracked in an epoch, given a hash of the last block in the epoch.
/// The block has to be available, so this function has to be called before gc is run for the block.
///
/// Note that the validator ID or shard tracking config could have changed since the epoch passed,
/// so we have to rely on what is stored in the database to figure out tracked shards.
/// We rely on `TrieChanges` column to preserve what shards this node tracked at that time.
fn get_potential_shards_for_cleanup(
chain_store_update: &ChainStoreUpdate,
epoch_manager: &Arc<dyn EpochManagerAdapter>,
shard_tracker: &ShardTracker,
block_hash: &CryptoHash,
) -> Result<Option<Vec<ShardUId>>, Error> {
if shard_tracker.tracks_all_shards()
|| !epoch_manager.is_last_block_in_finished_epoch(block_hash)?
{
return Ok(None);
}
let block = chain_store_update
.get_block(block_hash)
.expect("block data is not expected to be already cleaned");
let epoch_id = block.header().epoch_id();
let shard_layout = epoch_manager.get_shard_layout(epoch_id).expect("epoch id must exist");
let mut tracked_shards = vec![];
for shard_uid in shard_layout.shard_uids() {
if chain_store_update
.store()
.exists(DBCol::TrieChanges, &get_block_shard_uid(&block_hash, &shard_uid))?
{
tracked_shards.push(shard_uid);
}
}
Ok(Some(tracked_shards))
}

/// State cleanup for single shard tracking. Removes State of shards that are no longer in use.
///
/// It has to be run after we clear block data for the `last_block_hash_in_gced_epoch`.
/// `potential_shards_for_cleanup` are shards that were tracked in the gc-ed epoch,
/// and these are shards that we potentially no longer use and that can be cleaned up.
/// We do not clean up a shard if it has been tracked in any epoch later,
/// or we care about it in the current or the next epoch (relative to Head).
///
/// With ReshardingV3, we use State mapping (see DBCol::StateShardUIdMapping),
/// where each `ShardUId` is potentially mapped to its ancestor to get the database key prefix.
/// We only remove a shard State if all its descendants are ready to be cleaned up,
/// in which case, we also remove the mapping from `StateShardUIdMapping`.
fn gc_state(
chain_store_update: &mut ChainStoreUpdate,
last_block_hash_in_gced_epoch: &CryptoHash,
potential_shards_for_cleanup: Vec<ShardUId>,
epoch_manager: &Arc<dyn EpochManagerAdapter>,
shard_tracker: &ShardTracker,
me: Option<&AccountId>,
) -> Result<(), Error> {
let _span = tracing::debug_span!(target: "garbage_collection", "gc_state").entered();
if potential_shards_for_cleanup.is_empty() || shard_tracker.tracks_all_shards() {
return Ok(());
}
let store = chain_store_update.store();
let mut potential_shards_to_cleanup: HashSet<ShardUId> = potential_shards_for_cleanup
.iter()
.map(|shard_uid| get_shard_uid_mapping(&store, *shard_uid))
.collect();

let last_block_hash = chain_store_update.head()?.last_block_hash;
let last_block_info = epoch_manager.get_block_info(&last_block_hash)?;
let current_shard_layout = epoch_manager.get_shard_layout(last_block_info.epoch_id())?;
// Do not clean up shards that we care about in the current or the next epoch.
// Most of the time, `potential_shards_to_cleanup` will become empty as we do not change tracked shards often.
for shard_uid in current_shard_layout.shard_uids() {
if !shard_tracker.cares_about_shard_this_or_next_epoch(
me,
last_block_info.prev_hash(),
shard_uid.shard_id(),
true,
) {
continue;
}
let mapped_shard_uid = get_shard_uid_mapping(&store, shard_uid);
potential_shards_to_cleanup.remove(&mapped_shard_uid);
}

let mut block_info = last_block_info;
loop {
if potential_shards_to_cleanup.is_empty() {
return Ok(());
}
let epoch_first_block_info =
epoch_manager.get_block_info(block_info.epoch_first_block())?;
let prev_epoch_last_block_hash = epoch_first_block_info.prev_hash();
if prev_epoch_last_block_hash == last_block_hash_in_gced_epoch {
break;
}
block_info = epoch_manager.get_block_info(prev_epoch_last_block_hash)?;
let shard_layout = epoch_manager.get_shard_layout(block_info.epoch_id())?;
// Do not clean up shards that were tracked in any epoch after the gc-ed epoch.
for shard_uid in shard_layout.shard_uids() {
if !store
.exists(DBCol::TrieChanges, &get_block_shard_uid(&block_info.hash(), &shard_uid))?
{
continue;
}
let mapped_shard_uid = get_shard_uid_mapping(&store, shard_uid);
potential_shards_to_cleanup.remove(&mapped_shard_uid);
}
}
let shards_to_cleanup = potential_shards_to_cleanup;

// Find ShardUId mappings to shards that we will clean up.
let mut shard_uid_mappings_to_remove = vec![];
for kv in store.iter_ser::<ShardUId>(DBCol::StateShardUIdMapping) {
let (child_shard_uid_bytes, parent_shard_uid) = kv?;
if shards_to_cleanup.contains(&parent_shard_uid) {
shard_uid_mappings_to_remove.push(child_shard_uid_bytes);
}
}

// Delete State of `shards_to_cleanup` and associated ShardUId mapping.
tracing::info!(target: "garbage_collection", ?shards_to_cleanup, ?shard_uid_mappings_to_remove, "state_cleanup");
let mut trie_store_update = store.trie_store().store_update();
for shard_uid_prefix in shards_to_cleanup {
trie_store_update.delete_shard_uid_prefixed_state(shard_uid_prefix);
}
let mut store_update: StoreUpdate = trie_store_update.into();
for child_shard_uid_bytes in shard_uid_mappings_to_remove {
store_update.delete(DBCol::StateShardUIdMapping, &child_shard_uid_bytes);
}
chain_store_update.merge(store_update);
Ok(())
}
23 changes: 14 additions & 9 deletions chain/chain/src/tests/garbage_collection.rs
@@ -192,7 +192,7 @@ fn gc_fork_common(simple_chains: Vec<SimpleChain>, max_changes: usize) {
}

// GC execution
chain1.clear_data(&GCConfig { gc_blocks_limit: 1000, ..GCConfig::default() }).unwrap();
chain1.clear_data(&GCConfig { gc_blocks_limit: 1000, ..GCConfig::default() }, None).unwrap();

let tries2 = get_chain_with_num_shards(Clock::real(), num_shards).runtime_adapter.get_tries();

@@ -624,11 +624,14 @@ fn test_fork_far_away_from_epoch_end() {

// GC execution
chain
.clear_data(&GCConfig {
gc_blocks_limit: 100,
gc_fork_clean_step: fork_clean_step,
..GCConfig::default()
})
.clear_data(
&GCConfig {
gc_blocks_limit: 100,
gc_fork_clean_step: fork_clean_step,
..GCConfig::default()
},
None,
)
.expect("Clear data failed");

// The run above would clear just the first 5 blocks from the beginning, but shouldn't clear any forks
@@ -670,7 +673,7 @@ fn test_fork_far_away_from_epoch_end() {
);
}
chain
.clear_data(&GCConfig { gc_blocks_limit: 100, ..GCConfig::default() })
.clear_data(&GCConfig { gc_blocks_limit: 100, ..GCConfig::default() }, None)
.expect("Clear data failed");
// And now all these blocks should be safely removed.
for i in 6..50 {
@@ -707,7 +710,7 @@ fn test_clear_old_data() {
);
}

chain.clear_data(&GCConfig { gc_blocks_limit: 100, ..GCConfig::default() }).unwrap();
chain.clear_data(&GCConfig { gc_blocks_limit: 100, ..GCConfig::default() }, None).unwrap();

for i in 0..=max_height {
println!("height = {} hash = {}", i, blocks[i].hash());
@@ -885,7 +888,9 @@ fn test_clear_old_data_too_many_heights_common(gc_blocks_limit: NumBlocks) {

for iter in 0..10 {
println!("ITERATION #{:?}", iter);
assert!(chain.clear_data(&GCConfig { gc_blocks_limit, ..GCConfig::default() }).is_ok());
assert!(chain
.clear_data(&GCConfig { gc_blocks_limit, ..GCConfig::default() }, None)
.is_ok());

// epoch didn't change so no data is garbage collected.
for i in 0..1000 {
20 changes: 3 additions & 17 deletions chain/chunks/src/logic.rs
@@ -23,7 +23,7 @@ pub fn need_receipt(
me: Option<&AccountId>,
shard_tracker: &ShardTracker,
) -> bool {
cares_about_shard_this_or_next_epoch(me, prev_block_hash, shard_id, true, shard_tracker)
shard_tracker.cares_about_shard_this_or_next_epoch(me, prev_block_hash, shard_id, true)
}

/// Returns true if we need this part to sign the block.
@@ -37,18 +37,6 @@ pub fn need_part(
Ok(Some(&epoch_manager.get_part_owner(&epoch_id, part_ord)?) == me)
}

pub fn cares_about_shard_this_or_next_epoch(
account_id: Option<&AccountId>,
parent_hash: &CryptoHash,
shard_id: ShardId,
is_me: bool,
shard_tracker: &ShardTracker,
) -> bool {
// TODO(robin-near): I think we only need the shard_tracker if is_me is false.
shard_tracker.care_about_shard(account_id, parent_hash, shard_id, is_me)
|| shard_tracker.will_care_about_shard(account_id, parent_hash, shard_id, is_me)
}

pub fn get_shards_cares_about_this_or_next_epoch(
account_id: Option<&AccountId>,
is_me: bool,
Expand All @@ -61,12 +49,11 @@ pub fn get_shards_cares_about_this_or_next_epoch(
.unwrap()
.into_iter()
.filter(|&shard_id| {
cares_about_shard_this_or_next_epoch(
shard_tracker.cares_about_shard_this_or_next_epoch(
account_id,
block_header.prev_hash(),
shard_id,
is_me,
shard_tracker,
)
})
.collect()
@@ -131,12 +118,11 @@ pub fn make_partial_encoded_chunk_from_owned_parts_and_needed_receipts<'a>(
shard_tracker: &ShardTracker,
) -> PartialEncodedChunk {
let prev_block_hash = header.prev_block_hash();
let cares_about_shard = cares_about_shard_this_or_next_epoch(
let cares_about_shard = shard_tracker.cares_about_shard_this_or_next_epoch(
me,
prev_block_hash,
header.shard_id(),
true,
shard_tracker,
);
let parts = parts
.filter(|entry| {
