diff --git a/Cargo.lock b/Cargo.lock index cc140563ee6..ebc33b1821a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4164,6 +4164,7 @@ name = "near-chain" version = "0.0.0" dependencies = [ "actix", + "anyhow", "assert_matches", "borsh", "bytesize", diff --git a/chain/chain/Cargo.toml b/chain/chain/Cargo.toml index 5c956a6a27a..1b544c5f313 100644 --- a/chain/chain/Cargo.toml +++ b/chain/chain/Cargo.toml @@ -10,6 +10,7 @@ workspace = true [dependencies] actix.workspace = true +anyhow.workspace = true borsh.workspace = true bytesize.workspace = true chrono.workspace = true diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs index d31d848cde4..b8029caa135 100644 --- a/chain/chain/src/chain.rs +++ b/chain/chain/src/chain.rs @@ -3838,6 +3838,24 @@ impl Chain { ))) } + fn min_chunk_prev_height(&self, block: &Block) -> Result { + let mut ret = None; + for chunk in block.chunks().iter_raw() { + let prev_height = if chunk.prev_block_hash() == &CryptoHash::default() { + 0 + } else { + let prev_header = self.get_block_header(chunk.prev_block_hash())?; + prev_header.height() + }; + if let Some(min_height) = ret { + ret = Some(std::cmp::min(min_height, prev_height)); + } else { + ret = Some(prev_height); + } + } + Ok(ret.unwrap_or(0)) + } + /// Function to create or delete a snapshot if necessary. 
/// TODO: this function calls head() inside of start_process_block_impl(), consider moving this to be called right after HEAD gets updated fn process_snapshot(&mut self) -> Result<(), Error> { @@ -3847,6 +3865,7 @@ impl Chain { SnapshotAction::MakeSnapshot(prev_hash) => { let prev_block = self.get_block(&prev_hash)?; let prev_prev_hash = prev_block.header().prev_hash(); + let min_chunk_prev_height = self.min_chunk_prev_height(&prev_block)?; let epoch_height = self.epoch_manager.get_epoch_height_from_prev_block(prev_prev_hash)?; let shard_layout = @@ -3854,7 +3873,13 @@ impl Chain { let shard_uids = shard_layout.shard_uids().enumerate().collect(); let make_snapshot_callback = &snapshot_callbacks.make_snapshot_callback; - make_snapshot_callback(*prev_prev_hash, epoch_height, shard_uids, prev_block); + make_snapshot_callback( + *prev_prev_hash, + min_chunk_prev_height, + epoch_height, + shard_uids, + prev_block, + ); } SnapshotAction::DeleteSnapshot => { let delete_snapshot_callback = &snapshot_callbacks.delete_snapshot_callback; diff --git a/chain/chain/src/flat_storage_resharder.rs b/chain/chain/src/flat_storage_resharder.rs index 272c81199d9..83bd481f3fa 100644 --- a/chain/chain/src/flat_storage_resharder.rs +++ b/chain/chain/src/flat_storage_resharder.rs @@ -27,9 +27,9 @@ use near_primitives::trie_key::trie_key_parsers::{ parse_account_id_from_contract_code_key, parse_account_id_from_contract_data_key, parse_account_id_from_received_data_key, parse_account_id_from_trie_key_with_separator, }; -use near_primitives::types::AccountId; #[cfg(feature = "test_features")] use near_primitives::types::BlockHeightDelta; +use near_primitives::types::{AccountId, BlockHeight}; use near_store::adapter::flat_store::{FlatStoreAdapter, FlatStoreUpdateAdapter}; use near_store::adapter::StoreAdapter; use near_store::flat::{ @@ -62,8 +62,7 @@ use std::iter; /// [FlatStorageResharderController]. 
/// - In the case of event `Split` the state of flat storage will go back to what it was /// previously. -/// - Children shard catchup is a consequence of splitting a shard, not a resharding event on -/// its own. As such, it can't be manually cancelled. +/// - Children shard catchup can be cancelled and will resume from the point where it left. /// - Resilience to chain forks. /// - Resharding events will perform changes on the state only after their resharding block /// becomes final. @@ -155,16 +154,12 @@ impl FlatStorageResharder { self.clean_children_shards(&status)?; self.schedule_split_shard(parent_shard_uid, &status); } - FlatStorageReshardingStatus::CatchingUp(block_hash) => { + FlatStorageReshardingStatus::CatchingUp(_) => { info!(target: "resharding", ?shard_uid, ?status, "resuming flat storage shard catchup"); // Send a request to schedule the execution of `shard_catchup_task` for this shard. - self.sender.flat_storage_shard_catchup_sender.send( - FlatStorageShardCatchupRequest { - resharder: self.clone(), - shard_uid, - flat_head_block_hash: *block_hash, - }, - ); + self.sender + .flat_storage_shard_catchup_sender + .send(FlatStorageShardCatchupRequest { resharder: self.clone(), shard_uid }); } } Ok(()) @@ -316,10 +311,7 @@ impl FlatStorageResharder { /// /// Conceptually it simply copies each key-value pair from the parent shard to the correct /// child. This task may get cancelled or postponed. - pub fn split_shard_task( - &self, - chain_store: &ChainStore, - ) -> FlatStorageReshardingSchedulableTaskResult { + pub fn split_shard_task(&self, chain_store: &ChainStore) -> FlatStorageReshardingTaskResult { info!(target: "resharding", "flat storage shard split task execution"); // Make sure that the resharding block is final. 
@@ -336,11 +328,11 @@ impl FlatStorageResharder { self.cancel_scheduled_event(); error!(target: "resharding", "flat storage shard split task failed during scheduling!"); // TODO(resharding): return failed only if scheduling of all resharding blocks have failed. - return FlatStorageReshardingSchedulableTaskResult::Failed; + return FlatStorageReshardingTaskResult::Failed; } FlatStorageReshardingTaskSchedulingStatus::Postponed => { info!(target: "resharding", "flat storage shard split task has been postponed"); - return FlatStorageReshardingSchedulableTaskResult::Postponed; + return FlatStorageReshardingTaskResult::Postponed; } }; @@ -348,7 +340,7 @@ impl FlatStorageResharder { { if self.adv_should_delay_task(&resharding_hash, chain_store) { info!(target: "resharding", "flat storage shard split task has been artificially postponed!"); - return FlatStorageReshardingSchedulableTaskResult::Postponed; + return FlatStorageReshardingTaskResult::Postponed; } } @@ -376,12 +368,12 @@ impl FlatStorageResharder { parent_shard: ShardUId, split_params: &ParentSplitParameters, metrics: &FlatStorageReshardingShardSplitMetrics, - ) -> FlatStorageReshardingSchedulableTaskResult { + ) -> FlatStorageReshardingTaskResult { self.set_resharding_event_execution_status(TaskExecutionStatus::Started); // Exit early if the task has already been cancelled. 
if self.controller.is_cancelled() { - return FlatStorageReshardingSchedulableTaskResult::Cancelled; + return FlatStorageReshardingTaskResult::Cancelled; } // Determines after how many bytes worth of key-values the process stops to commit changes @@ -403,7 +395,7 @@ impl FlatStorageResharder { Ok(iter) => iter, Err(err) => { error!(target: "resharding", ?parent_shard, block_hash=?split_params.resharding_hash, ?err, "failed to build flat storage iterator"); - return FlatStorageReshardingSchedulableTaskResult::Failed; + return FlatStorageReshardingTaskResult::Failed; } }; @@ -434,12 +426,12 @@ impl FlatStorageResharder { &split_params, ) { error!(target: "resharding", ?err, "failed to handle flat storage key"); - return FlatStorageReshardingSchedulableTaskResult::Failed; + return FlatStorageReshardingTaskResult::Failed; } } Some(FlatStorageAndDeltaIterItem::Entry(Err(err))) => { error!(target: "resharding", ?err, "failed to read flat storage value from parent shard"); - return FlatStorageReshardingSchedulableTaskResult::Failed; + return FlatStorageReshardingTaskResult::Failed; } None => { iter_exhausted = true; @@ -450,7 +442,7 @@ impl FlatStorageResharder { // Make a pause to commit and check if the routine should stop. if let Err(err) = store_update.commit() { error!(target: "resharding", ?err, "failed to commit store update"); - return FlatStorageReshardingSchedulableTaskResult::Failed; + return FlatStorageReshardingTaskResult::Failed; } num_batches_done += 1; @@ -459,10 +451,10 @@ impl FlatStorageResharder { // If `iter`` is exhausted we can exit after the store commit. 
if iter_exhausted { - return FlatStorageReshardingSchedulableTaskResult::Successful { num_batches_done }; + return FlatStorageReshardingTaskResult::Successful { num_batches_done }; } if self.controller.is_cancelled() { - return FlatStorageReshardingSchedulableTaskResult::Cancelled; + return FlatStorageReshardingTaskResult::Cancelled; } // Sleep between batches in order to throttle resharding and leave some resource for the @@ -484,7 +476,7 @@ impl FlatStorageResharder { parent_shard: ShardUId, split_params: ParentSplitParameters, metrics: &FlatStorageReshardingShardSplitMetrics, - task_status: FlatStorageReshardingSchedulableTaskResult, + task_status: FlatStorageReshardingTaskResult, ) { let ParentSplitParameters { left_child_shard, @@ -498,7 +490,7 @@ impl FlatStorageResharder { let mut store_update = flat_store.store_update(); match task_status { - FlatStorageReshardingSchedulableTaskResult::Successful { .. } => { + FlatStorageReshardingTaskResult::Successful { .. } => { // Split shard completed successfully. // Parent flat storage can be deleted from the FlatStoreManager. // If FlatStoreManager has no reference to the shard, delete it manually. @@ -524,13 +516,12 @@ impl FlatStorageResharder { FlatStorageShardCatchupRequest { resharder: self.clone(), shard_uid: child_shard, - flat_head_block_hash: resharding_hash, }, ); } } - FlatStorageReshardingSchedulableTaskResult::Failed - | FlatStorageReshardingSchedulableTaskResult::Cancelled => { + FlatStorageReshardingTaskResult::Failed + | FlatStorageReshardingTaskResult::Cancelled => { // We got an error or a cancellation request. // Reset parent. 
store_update.set_flat_storage_status( @@ -542,7 +533,7 @@ impl FlatStorageResharder { store_update.remove_flat_storage(child_shard); } } - FlatStorageReshardingSchedulableTaskResult::Postponed => { + FlatStorageReshardingTaskResult::Postponed => { panic!("can't finalize processing of a postponed split task!"); } } @@ -612,18 +603,23 @@ impl FlatStorageResharder { pub fn shard_catchup_task( &self, shard_uid: ShardUId, - flat_head_block_hash: CryptoHash, chain_store: &ChainStore, ) -> FlatStorageReshardingTaskResult { - info!(target: "resharding", ?shard_uid, ?flat_head_block_hash, "flat storage shard catchup task started"); + // Exit early if the task has already been cancelled. + if self.controller.is_cancelled() { + return FlatStorageReshardingTaskResult::Cancelled; + } + info!(target: "resharding", ?shard_uid, "flat storage shard catchup task started"); let metrics = FlatStorageReshardingShardCatchUpMetrics::new(&shard_uid); // Apply deltas and then create the flat storage. - let apply_result = - self.shard_catchup_apply_deltas(shard_uid, flat_head_block_hash, chain_store, &metrics); - let Ok((num_batches_done, flat_head)) = apply_result else { + let apply_result = self.shard_catchup_apply_deltas(shard_uid, chain_store, &metrics); + let Ok(res) = apply_result else { error!(target: "resharding", ?shard_uid, err = ?apply_result.unwrap_err(), "flat storage shard catchup delta application failed!"); return FlatStorageReshardingTaskResult::Failed; }; + let Some((num_batches_done, flat_head)) = res else { + return FlatStorageReshardingTaskResult::Postponed; + }; match self.shard_catchup_finalize_storage(shard_uid, &flat_head, &metrics) { Ok(_) => { let task_status = FlatStorageReshardingTaskResult::Successful { num_batches_done }; @@ -639,16 +635,26 @@ impl FlatStorageResharder { } } + /// checks whether there's a snapshot in progress. 
Returns true if we've already applied all deltas up + /// to the desired snapshot height, and should no longer continue to give the state snapshot + /// code a chance to finish first. + fn coordinate_snapshot(&self, height: BlockHeight) -> bool { + let manager = self.runtime.get_flat_storage_manager(); + let Some(min_chunk_prev_height) = manager.snapshot_wanted() else { + return false; + }; + height >= min_chunk_prev_height + } + /// Applies flat storage deltas in batches on a shard that is in catchup status. /// /// Returns the number of delta batches applied and the final tip of the flat storage. fn shard_catchup_apply_deltas( &self, shard_uid: ShardUId, - mut flat_head_block_hash: CryptoHash, chain_store: &ChainStore, metrics: &FlatStorageReshardingShardCatchUpMetrics, - ) -> Result<(usize, Tip), Error> { + ) -> Result, Error> { // How many block heights of deltas are applied in a single commit. let catch_up_blocks = self.resharding_config.get().catch_up_blocks; // Delay between every batch. @@ -658,7 +664,24 @@ impl FlatStorageResharder { let mut num_batches_done: usize = 0; + let status = self + .runtime + .store() + .flat_store() + .get_flat_storage_status(shard_uid) + .map_err(|e| Into::::into(e))?; + let FlatStorageStatus::Resharding(FlatStorageReshardingStatus::CatchingUp( + mut flat_head_block_hash, + )) = status + else { + return Err(Error::Other(format!( + "unexpected resharding catchup flat storage status for {}: {:?}", + shard_uid, &status + ))); + }; + loop { + // TODO:(resharding): check self.controller.is_cancelled() here as well. let _span = tracing::debug_span!( target: "resharding", "shard_catchup_apply_deltas/batch", @@ -670,15 +693,16 @@ impl FlatStorageResharder { // If we reached the desired new flat head, we can terminate the delta application step. 
if is_flat_head_on_par_with_chain(&flat_head_block_hash, &chain_final_head) { - return Ok(( + return Ok(Some(( num_batches_done, Tip::from_header(&chain_store.get_block_header(&flat_head_block_hash)?), - )); + ))); } let mut merged_changes = FlatStateChanges::default(); let store = self.runtime.store().flat_store(); let mut store_update = store.store_update(); + let mut postpone = false; // Merge deltas from the next blocks until we reach chain final head. for _ in 0..catch_up_blocks { @@ -691,6 +715,10 @@ impl FlatStorageResharder { if is_flat_head_on_par_with_chain(&flat_head_block_hash, &chain_final_head) { break; } + if self.coordinate_snapshot(height) { + postpone = true; + break; + } flat_head_block_hash = chain_store.get_next_block_hash(&flat_head_block_hash)?; if let Some(changes) = store .get_delta(shard_uid, flat_head_block_hash) @@ -715,6 +743,9 @@ impl FlatStorageResharder { num_batches_done += 1; metrics.set_head_height(chain_store.get_block_height(&flat_head_block_hash)?); + if postpone { + return Ok(None); + } // Sleep between batches in order to throttle resharding and leave some resource for the // regular node operation. std::thread::sleep(batch_delay); @@ -1081,19 +1112,11 @@ pub enum TaskExecutionStatus { NotStarted, } -/// Result of a simple flat storage resharding task. +/// Result of a schedulable flat storage resharding task. #[derive(Clone, Debug, Copy, Eq, PartialEq)] pub enum FlatStorageReshardingTaskResult { Successful { num_batches_done: usize }, Failed, -} - -/// Result of a schedulable flat storage resharding task. Extends [FlatStorageReshardingTaskResult] -/// with the option to cancel or postpone the task. 
-#[derive(Clone, Debug, Copy, Eq, PartialEq)] -pub enum FlatStorageReshardingSchedulableTaskResult { - Successful { num_batches_done: usize }, - Failed, Cancelled, Postponed, } @@ -1207,11 +1230,7 @@ mod tests { impl CanSend for SimpleSender { fn send(&self, msg: FlatStorageShardCatchupRequest) { - msg.resharder.shard_catchup_task( - msg.shard_uid, - msg.flat_head_block_hash, - &self.chain_store.lock().unwrap(), - ); + msg.resharder.shard_catchup_task(msg.shard_uid, &self.chain_store.lock().unwrap()); } } @@ -1240,7 +1259,7 @@ mod tests { } impl DelayedSender { - fn call_split_shard_task(&self) -> FlatStorageReshardingSchedulableTaskResult { + fn call_split_shard_task(&self) -> FlatStorageReshardingTaskResult { let request = self.split_shard_request.lock().unwrap(); request.as_ref().unwrap().resharder.split_shard_task(&self.chain_store.lock().unwrap()) } @@ -1251,11 +1270,9 @@ mod tests { .unwrap() .iter() .map(|request| { - request.resharder.shard_catchup_task( - request.shard_uid, - request.flat_head_block_hash, - &self.chain_store.lock().unwrap(), - ) + request + .resharder + .shard_catchup_task(request.shard_uid, &self.chain_store.lock().unwrap()) }) .collect() } @@ -1645,7 +1662,7 @@ mod tests { assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout).is_ok()); // Check that more than one batch has been processed. - let FlatStorageReshardingSchedulableTaskResult::Successful { num_batches_done } = + let FlatStorageReshardingTaskResult::Successful { num_batches_done } = sender.call_split_shard_task() else { assert!(false); @@ -1868,7 +1885,7 @@ mod tests { assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout).is_ok()); assert_eq!( sender.call_split_shard_task(), - FlatStorageReshardingSchedulableTaskResult::Successful { num_batches_done: 3 } + FlatStorageReshardingTaskResult::Successful { num_batches_done: 3 } ); // Validate integrity of children shards. 
@@ -2412,10 +2429,7 @@ mod tests { ReshardingEventType::SplitShard(params) => params, }; assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout).is_ok()); - assert_eq!( - sender.call_split_shard_task(), - FlatStorageReshardingSchedulableTaskResult::Postponed - ); + assert_eq!(sender.call_split_shard_task(), FlatStorageReshardingTaskResult::Postponed); assert_gt!(flat_store.iter(parent_shard).count(), 0); // Move the chain final head to the resharding block height (2). @@ -2429,7 +2443,7 @@ mod tests { // Trigger resharding again and now it should split the parent shard. assert_eq!( sender.call_split_shard_task(), - FlatStorageReshardingSchedulableTaskResult::Successful { num_batches_done: 3 } + FlatStorageReshardingTaskResult::Successful { num_batches_done: 3 } ); assert_eq!(flat_store.iter(parent_shard).count(), 0); } @@ -2461,10 +2475,7 @@ mod tests { ReshardingEventType::SplitShard(params) => params, }; assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout).is_ok()); - assert_eq!( - sender.call_split_shard_task(), - FlatStorageReshardingSchedulableTaskResult::Postponed - ); + assert_eq!(sender.call_split_shard_task(), FlatStorageReshardingTaskResult::Postponed); assert_gt!(flat_store.iter(parent_shard).count(), 0); // Add two blocks on top of the first block (simulate a fork). @@ -2481,10 +2492,7 @@ mod tests { ReshardingEventType::SplitShard(params) => params, }; assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout).is_ok()); - assert_eq!( - sender.call_split_shard_task(), - FlatStorageReshardingSchedulableTaskResult::Postponed - ); + assert_eq!(sender.call_split_shard_task(), FlatStorageReshardingTaskResult::Postponed); assert_gt!(flat_store.iter(parent_shard).count(), 0); // Add two additional blocks on the fork to make the resharding block (height 1) final. @@ -2498,7 +2506,7 @@ mod tests { // Now the second resharding event should take place. 
assert_matches!( sender.call_split_shard_task(), - FlatStorageReshardingSchedulableTaskResult::Successful { .. } + FlatStorageReshardingTaskResult::Successful { .. } ); assert_eq!(flat_store.iter(parent_shard).count(), 0); @@ -2526,10 +2534,7 @@ mod tests { assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout).is_ok()); let (parent_shard, split_params) = resharder.get_parent_shard_and_split_params().unwrap(); let ParentSplitParameters { flat_head, .. } = split_params; - assert_eq!( - sender.call_split_shard_task(), - FlatStorageReshardingSchedulableTaskResult::Postponed - ); + assert_eq!(sender.call_split_shard_task(), FlatStorageReshardingTaskResult::Postponed); // Fork the chain before the resharding block and make it final, but don't update the // resharding block hash. @@ -2541,10 +2546,7 @@ mod tests { ); // Scheduling of the shard split should fail. - assert_eq!( - sender.call_split_shard_task(), - FlatStorageReshardingSchedulableTaskResult::Failed - ); + assert_eq!(sender.call_split_shard_task(), FlatStorageReshardingTaskResult::Failed); assert!(resharder.resharding_event().is_none()); let flat_store = resharder.runtime.store().flat_store(); assert_eq!( diff --git a/chain/chain/src/resharding/resharding_actor.rs b/chain/chain/src/resharding/resharding_actor.rs index bc4a371dfe7..e92672b76d0 100644 --- a/chain/chain/src/resharding/resharding_actor.rs +++ b/chain/chain/src/resharding/resharding_actor.rs @@ -1,13 +1,11 @@ use super::types::{ FlatStorageShardCatchupRequest, FlatStorageSplitShardRequest, MemtrieReloadRequest, }; -use crate::flat_storage_resharder::{ - FlatStorageResharder, FlatStorageReshardingSchedulableTaskResult, - FlatStorageReshardingTaskResult, -}; +use crate::flat_storage_resharder::{FlatStorageResharder, FlatStorageReshardingTaskResult}; use crate::ChainStore; use near_async::futures::{DelayedActionRunner, DelayedActionRunnerExt}; use near_async::messaging::{self, Handler, HandlerWithContext}; +use 
near_primitives::shard_layout::ShardUId; use near_primitives::types::BlockHeight; use near_store::Store; use time::Duration; @@ -29,20 +27,13 @@ impl HandlerWithContext for ReshardingActor { } } -impl Handler for ReshardingActor { - fn handle(&mut self, msg: FlatStorageShardCatchupRequest) { - match msg.resharder.shard_catchup_task( - msg.shard_uid, - msg.flat_head_block_hash, - &self.chain_store, - ) { - FlatStorageReshardingTaskResult::Successful { .. } => { - // All good. - } - FlatStorageReshardingTaskResult::Failed => { - panic!("impossible to recover from a flat storage shard catchup failure!") - } - } +impl HandlerWithContext for ReshardingActor { + fn handle( + &mut self, + msg: FlatStorageShardCatchupRequest, + ctx: &mut dyn DelayedActionRunner, + ) { + self.handle_flat_storage_catchup(msg.resharder, msg.shard_uid, ctx); } } @@ -66,16 +57,16 @@ impl ReshardingActor { // becomes final. If the resharding block is not yet final, the task will exit early with // `Postponed` status and it must be rescheduled. match resharder.split_shard_task(&self.chain_store) { - FlatStorageReshardingSchedulableTaskResult::Successful { .. } => { + FlatStorageReshardingTaskResult::Successful { .. } => { // All good. } - FlatStorageReshardingSchedulableTaskResult::Failed => { + FlatStorageReshardingTaskResult::Failed => { panic!("impossible to recover from a flat storage split shard failure!") } - FlatStorageReshardingSchedulableTaskResult::Cancelled => { + FlatStorageReshardingTaskResult::Cancelled => { // The task has been cancelled. Nothing else to do. } - FlatStorageReshardingSchedulableTaskResult::Postponed => { + FlatStorageReshardingTaskResult::Postponed => { // The task must be retried later. 
ctx.run_later( "ReshardingActor FlatStorageSplitShard", @@ -87,4 +78,33 @@ impl ReshardingActor { } } } + + fn handle_flat_storage_catchup( + &self, + resharder: FlatStorageResharder, + shard_uid: ShardUId, + ctx: &mut dyn DelayedActionRunner, + ) { + match resharder.shard_catchup_task(shard_uid, &self.chain_store) { + FlatStorageReshardingTaskResult::Successful { .. } => { + // All good. + } + FlatStorageReshardingTaskResult::Failed => { + panic!("impossible to recover from a flat storage shard catchup failure!") + } + FlatStorageReshardingTaskResult::Cancelled => { + // The task has been cancelled. Nothing else to do. + } + FlatStorageReshardingTaskResult::Postponed => { + // The task must be retried later. + ctx.run_later( + "ReshardingActor FlatStorageCatchup", + Duration::milliseconds(1000), + move |act, ctx| { + act.handle_flat_storage_catchup(resharder, shard_uid, ctx); + }, + ); + } + } + } } diff --git a/chain/chain/src/resharding/types.rs b/chain/chain/src/resharding/types.rs index b042f3bab56..a6fae87d991 100644 --- a/chain/chain/src/resharding/types.rs +++ b/chain/chain/src/resharding/types.rs @@ -1,6 +1,5 @@ use crate::flat_storage_resharder::FlatStorageResharder; use near_async::messaging::Sender; -use near_primitives::hash::CryptoHash; use near_store::ShardUId; /// Represents a request to start the split of a parent shard flat storage into two children flat @@ -17,7 +16,6 @@ pub struct FlatStorageSplitShardRequest { pub struct FlatStorageShardCatchupRequest { pub resharder: FlatStorageResharder, pub shard_uid: ShardUId, - pub flat_head_block_hash: CryptoHash, } /// Represents a request to reload a Mem Trie for a shard after its Flat Storage resharding is diff --git a/chain/chain/src/state_snapshot_actor.rs b/chain/chain/src/state_snapshot_actor.rs index 62cb894c53c..02db1c085cd 100644 --- a/chain/chain/src/state_snapshot_actor.rs +++ b/chain/chain/src/state_snapshot_actor.rs @@ -1,11 +1,13 @@ -use near_async::messaging::{Actor, CanSend, Handler, 
Sender}; +use near_async::futures::{DelayedActionRunner, DelayedActionRunnerExt}; +use near_async::messaging::{Actor, CanSend, Handler, HandlerWithContext, Sender}; +use near_async::time::Duration; use near_async::{MultiSend, MultiSenderFrom}; use near_network::types::{NetworkRequests, PeerManagerAdapter, PeerManagerMessageRequest}; use near_performance_metrics_macros::perf; use near_primitives::block::Block; use near_primitives::hash::CryptoHash; use near_primitives::shard_layout::ShardUId; -use near_primitives::types::{EpochHeight, ShardIndex}; +use near_primitives::types::{BlockHeight, EpochHeight, ShardIndex}; use near_store::flat::FlatStorageManager; use near_store::ShardTries; use std::sync::Arc; @@ -44,13 +46,15 @@ pub struct DeleteAndMaybeCreateSnapshotRequest { #[derive(actix::Message)] #[rtype(result = "()")] pub struct CreateSnapshotRequest { - /// prev_hash of the last processed block. + /// equal to self.block.header().prev_hash() prev_block_hash: CryptoHash, + /// Min height of chunk.prev_block_hash() for each chunk in `block` + min_chunk_prev_height: BlockHeight, /// epoch height associated with prev_block_hash epoch_height: EpochHeight, /// Shards that need to be present in the snapshot. shard_indexes_and_uids: Vec<(ShardIndex, ShardUId)>, - /// Last block of the prev epoch. + /// prev block of the "sync_hash" block. block: Block, } @@ -59,6 +63,7 @@ impl std::fmt::Debug for CreateSnapshotRequest { f.debug_struct("CreateSnapshotRequest") .field("block_hash", self.block.hash()) .field("prev_block_hash", &self.prev_block_hash) + .field("min_chunk_prev_height", &self.min_chunk_prev_height) .field("epoch_height", &self.epoch_height) .field( "shard_uids", @@ -85,19 +90,78 @@ impl StateSnapshotActor { } } - pub fn handle_create_snapshot_request(&mut self, msg: CreateSnapshotRequest) { - tracing::debug!(target: "state_snapshot", ?msg); + /// Returns true if we shouldn't yet try to create a snapshot because a flat storage resharding + /// is in progress. 
+ fn should_wait_for_resharding_split( + &self, + min_chunk_prev_height: BlockHeight, + shard_indexes_and_uids: &[(ShardIndex, ShardUId)], + ) -> anyhow::Result { + let shard_uids = shard_indexes_and_uids.iter().map(|(_idx, uid)| *uid); + let Some(min_height) = + self.flat_storage_manager.resharding_catchup_height_reached(shard_uids)? + else { + // No flat storage split + catchup is in progress, ok to proceed + return Ok(false); + }; + let Some(min_height) = min_height else { + // storage split + catchup is in progress and not all shards have reached the catchup phase yet. Can't proceed + return Ok(true); + }; + // Proceed if the catchup code is already reasonably close to being finished. This is not a correctness issue, + // as this line of code could just be replaced with Ok(false), and things would work. But in that case, if there are for + // some reason lots of deltas to apply (e.g. the sync hash is 1000s of blocks past the start of the epoch because of missed + // chunks), then we'll duplicate a lot of work that's being done by the resharding catchup code. So we might as well just + // come back later after most of that work has already been done. + Ok(min_height + 10 < min_chunk_prev_height) + } + + pub fn handle_create_snapshot_request( + &mut self, + msg: CreateSnapshotRequest, + ctx: &mut dyn DelayedActionRunner, + ) { + let should_wait = match self.should_wait_for_resharding_split( + msg.min_chunk_prev_height, + &msg.shard_indexes_and_uids, + ) { + Ok(s) => s, + Err(err) => { + tracing::error!(target: "state_snapshot", ?err, "State Snapshot Actor failed to check resharding status. Not making snapshot"); + return; + } + }; + // TODO: instead of resending the same message over and over, wait on a Condvar. 
+ // This would require making testloop work with Condvars that normally are meant to be woken up by another thread + if should_wait { + tracing::debug!(target: "state_snapshot", prev_block_hash=?&msg.prev_block_hash, "Postpone CreateSnapshotRequest"); + ctx.run_later( + "ReshardingActor FlatStorageSplitShard", + Duration::seconds(1), + move |act, ctx| { + act.handle_create_snapshot_request(msg, ctx); + }, + ); + return; + } - let CreateSnapshotRequest { prev_block_hash, epoch_height, shard_indexes_and_uids, block } = - msg; + tracing::debug!(target: "state_snapshot", prev_block_hash=?&msg.prev_block_hash, "Handle CreateSnapshotRequest"); + let CreateSnapshotRequest { + prev_block_hash, + epoch_height, + shard_indexes_and_uids, + block, + .. + } = msg; let res = self.tries.create_state_snapshot(prev_block_hash, &shard_indexes_and_uids, &block); // Unlocking flat state head can be done asynchronously in state_snapshot_actor. // The next flat storage update will bring flat storage to latest head. - if !self.flat_storage_manager.set_flat_state_updates_mode(true) { - tracing::error!(target: "state_snapshot", ?prev_block_hash, ?shard_indexes_and_uids, "Failed to unlock flat state updates"); - } + // TODO(resharding): check what happens if two calls to want_snapshot() are made before this point, + // which can happen with short epochs if a state snapshot takes longer than the rest of the epoch to complete. + // TODO(resharding): this can actually be called sooner, just after the rocksdb checkpoint is made. 
+ self.flat_storage_manager.snapshot_taken(); match res { Ok(res_shard_uids) => { let Some(res_shard_uids) = res_shard_uids else { @@ -126,10 +190,10 @@ impl Handler for StateSnapshotActor { } } -impl Handler for StateSnapshotActor { +impl HandlerWithContext for StateSnapshotActor { #[perf] - fn handle(&mut self, msg: CreateSnapshotRequest) { - self.handle_create_snapshot_request(msg) + fn handle(&mut self, msg: CreateSnapshotRequest, ctx: &mut dyn DelayedActionRunner) { + self.handle_create_snapshot_request(msg, ctx) } } @@ -142,7 +206,7 @@ pub struct StateSnapshotSenderForStateSnapshot { pub struct StateSnapshotSenderForClient(Sender); type MakeSnapshotCallback = Arc< - dyn Fn(CryptoHash, EpochHeight, Vec<(ShardIndex, ShardUId)>, Block) -> () + dyn Fn(CryptoHash, BlockHeight, EpochHeight, Vec<(ShardIndex, ShardUId)>, Block) -> () + Send + Sync + 'static, @@ -156,28 +220,38 @@ pub struct SnapshotCallbacks { } /// Sends a request to make a state snapshot. +// TODO: remove the `prev_block_hash` argument. It's just block.header().prev_hash() pub fn get_make_snapshot_callback( sender: StateSnapshotSenderForClient, flat_storage_manager: FlatStorageManager, ) -> MakeSnapshotCallback { - Arc::new(move |prev_block_hash, epoch_height, shard_indexes_and_uids, block| { - tracing::info!( + Arc::new( + move |prev_block_hash, + min_chunk_prev_height, + epoch_height, + shard_indexes_and_uids, + block| { + tracing::info!( target: "state_snapshot", ?prev_block_hash, ?shard_indexes_and_uids, "make_snapshot_callback sends `DeleteAndMaybeCreateSnapshotRequest` to state_snapshot_addr"); - // We need to stop flat head updates synchronously in the client thread. 
- // Async update in state_snapshot_actor and potentially lead to flat head progressing beyond prev_block_hash - if !flat_storage_manager.set_flat_state_updates_mode(false) { - tracing::error!(target: "state_snapshot", ?prev_block_hash, ?shard_indexes_and_uids, "Failed to lock flat state updates"); - return; - } - let create_snapshot_request = - CreateSnapshotRequest { prev_block_hash, epoch_height, shard_indexes_and_uids, block }; - sender.send(DeleteAndMaybeCreateSnapshotRequest { - create_snapshot_request: Some(create_snapshot_request), - }); - }) + // We need to stop flat head updates synchronously in the client thread. + // Async update in state_snapshot_actor can potentially lead to flat head progressing beyond prev_block_hash + // This also prevents post-resharding flat storage catchup from advancing past `prev_block_hash` + flat_storage_manager.want_snapshot(min_chunk_prev_height); + let create_snapshot_request = CreateSnapshotRequest { + prev_block_hash, + min_chunk_prev_height, + epoch_height, + shard_indexes_and_uids, + block, + }; + sender.send(DeleteAndMaybeCreateSnapshotRequest { + create_snapshot_request: Some(create_snapshot_request), + }); + }, + ) } /// Sends a request to delete a state snapshot. 
diff --git a/chain/client/src/test_utils/test_env_builder.rs b/chain/client/src/test_utils/test_env_builder.rs index cbf0adc50c6..351b4c4a15c 100644 --- a/chain/client/src/test_utils/test_env_builder.rs +++ b/chain/client/src/test_utils/test_env_builder.rs @@ -588,7 +588,7 @@ impl TestEnvBuilder { None => TEST_SEED, }; let tries = runtime.get_tries(); - let make_snapshot_callback = Arc::new(move |prev_block_hash, _epoch_height, shard_uids: Vec<(ShardIndex, ShardUId)>, block| { + let make_snapshot_callback = Arc::new(move |prev_block_hash, _min_chunk_prev_height, _epoch_height, shard_uids: Vec<(ShardIndex, ShardUId)>, block| { tracing::info!(target: "state_snapshot", ?prev_block_hash, "make_snapshot_callback"); tries.delete_state_snapshot(); tries.create_state_snapshot(prev_block_hash, &shard_uids, &block).unwrap(); diff --git a/core/store/src/adapter/chunk_store.rs b/core/store/src/adapter/chunk_store.rs index e3b8a3cf294..f1d84bd11cc 100644 --- a/core/store/src/adapter/chunk_store.rs +++ b/core/store/src/adapter/chunk_store.rs @@ -13,8 +13,8 @@ pub struct ChunkStoreAdapter { } impl StoreAdapter for ChunkStoreAdapter { - fn store(&self) -> Store { - self.store.clone() + fn store_ref(&self) -> &Store { + &self.store } } diff --git a/core/store/src/adapter/flat_store.rs b/core/store/src/adapter/flat_store.rs index 322f4dd09cc..d5200e038f2 100644 --- a/core/store/src/adapter/flat_store.rs +++ b/core/store/src/adapter/flat_store.rs @@ -20,8 +20,8 @@ pub struct FlatStoreAdapter { } impl StoreAdapter for FlatStoreAdapter { - fn store(&self) -> Store { - self.store.clone() + fn store_ref(&self) -> &Store { + &self.store } } diff --git a/core/store/src/adapter/mod.rs b/core/store/src/adapter/mod.rs index ba31d691775..fe975dc11c9 100644 --- a/core/store/src/adapter/mod.rs +++ b/core/store/src/adapter/mod.rs @@ -85,7 +85,11 @@ impl Into for StoreUpdateHolder<'static> { /// Simple adapter wrapper on top of Store to provide a more ergonomic interface for different store types. 
/// We provide simple inter-convertibility between different store types like FlatStoreAdapter and TrieStoreAdapter. pub trait StoreAdapter { - fn store(&self) -> Store; + fn store_ref(&self) -> &Store; + + fn store(&self) -> Store { + self.store_ref().clone() + } fn trie_store(&self) -> trie_store::TrieStoreAdapter { trie_store::TrieStoreAdapter::new(self.store()) diff --git a/core/store/src/adapter/trie_store.rs b/core/store/src/adapter/trie_store.rs index e01a4232e4b..bedec4578ea 100644 --- a/core/store/src/adapter/trie_store.rs +++ b/core/store/src/adapter/trie_store.rs @@ -18,8 +18,8 @@ pub struct TrieStoreAdapter { } impl StoreAdapter for TrieStoreAdapter { - fn store(&self) -> Store { - self.store.clone() + fn store_ref(&self) -> &Store { + &self.store } } diff --git a/core/store/src/flat/chunk_view.rs b/core/store/src/flat/chunk_view.rs index d704e2a013c..edff03ba670 100644 --- a/core/store/src/flat/chunk_view.rs +++ b/core/store/src/flat/chunk_view.rs @@ -47,6 +47,8 @@ impl FlatStorageChunkView { self.flat_storage.contains_key(&self.block_hash, key) } + // TODO: this should be changed to check the values that haven't yet been applied, like in get_value() and contains_key(), + // because otherwise we're iterating over old state that might have been updated by `self.block_hash` pub fn iter_range(&self, from: Option<&[u8]>, to: Option<&[u8]>) -> FlatStateIterator { self.store.iter_range(self.flat_storage.shard_uid(), from, to) } diff --git a/core/store/src/flat/manager.rs b/core/store/src/flat/manager.rs index 47168512acb..31cb9a93e39 100644 --- a/core/store/src/flat/manager.rs +++ b/core/store/src/flat/manager.rs @@ -1,5 +1,10 @@ use crate::adapter::flat_store::{FlatStoreAdapter, FlatStoreUpdateAdapter}; -use crate::flat::{BlockInfo, FlatStorageReadyStatus, FlatStorageStatus, POISONED_LOCK_ERR}; +use crate::flat::{ + BlockInfo, FlatStorageReadyStatus, FlatStorageReshardingStatus, FlatStorageStatus, + POISONED_LOCK_ERR, +}; +use crate::{DBCol, StoreAdapter}; 
+use near_primitives::block_header::BlockHeader; use near_primitives::errors::StorageError; use near_primitives::hash::CryptoHash; use near_primitives::shard_layout::ShardUId; @@ -30,11 +35,18 @@ pub struct FlatStorageManagerInner { /// this epoch can share the same `head` and `tail`, similar for shards for the next epoch, /// but such overhead is negligible comparing the delta sizes, so we think it's ok. flat_storages: Mutex<HashMap<ShardUId, FlatStorage>>, + /// Set to Some() when there's a state snapshot in progress. Used to signal to the resharding flat + /// storage catchup code that it shouldn't advance past this block height + want_snapshot: Mutex<Option<BlockHeight>>, } impl FlatStorageManager { pub fn new(store: FlatStoreAdapter) -> Self { - Self(Arc::new(FlatStorageManagerInner { store, flat_storages: Default::default() })) + Self(Arc::new(FlatStorageManagerInner { + store, + flat_storages: Default::default(), + want_snapshot: Default::default(), + })) } /// When a node starts from an empty database, this function must be called to ensure @@ -66,8 +78,14 @@ impl FlatStorageManager { /// and resharding. pub fn create_flat_storage_for_shard(&self, shard_uid: ShardUId) -> Result<(), StorageError> { tracing::debug!(target: "store", ?shard_uid, "Creating flat storage for shard"); + let want_snapshot = self.0.want_snapshot.lock().expect(POISONED_LOCK_ERR); + let disable_updates = want_snapshot.is_some(); + let mut flat_storages = self.0.flat_storages.lock().expect(POISONED_LOCK_ERR); let flat_storage = FlatStorage::new(self.0.store.clone(), shard_uid)?; + if disable_updates { + flat_storage.set_flat_head_update_mode(false); + } let original_value = flat_storages.insert(shard_uid, flat_storage); if original_value.is_some() { // Generally speaking this shouldn't happen.
It may only happen when @@ -81,6 +99,67 @@ impl FlatStorageManager { Ok(()) } + fn read_block_info(&self, hash: &CryptoHash) -> Result<BlockInfo, StorageError> { + let header = self + .0 + .store + .store_ref() + .get_ser::<BlockHeader>(DBCol::BlockHeader, hash.as_ref()) + .map_err(|e| { + StorageError::StorageInconsistentState(format!( + "could not read block header {}: {:?}", + hash, e + )) + })? + .ok_or_else(|| { + StorageError::StorageInconsistentState(format!("block header {} not found", hash)) + })?; + Ok(BlockInfo { + hash: *header.hash(), + prev_hash: *header.prev_hash(), + height: header.height(), + }) + } + + /// Sets the status to `Ready` if it's currently `Resharding(CatchingUp)` + fn mark_flat_storage_ready(&self, shard_uid: ShardUId) -> Result<(), StorageError> { + // Don't use Self::get_flat_storage_status() because there's no need to panic if this fails, since this is used + // during state snapshotting where an error isn't critical to node operation. + let status = self.0.store.get_flat_storage_status(shard_uid)?; + let catchup_flat_head = match status { + FlatStorageStatus::Ready(_) => return Ok(()), + FlatStorageStatus::Resharding(FlatStorageReshardingStatus::CatchingUp(flat_head)) => { + flat_head + } + _ => { + return Err(StorageError::StorageInconsistentState(format!( + "Unexpected flat storage status: {:?}", + &status + ))) + } + }; + let flat_head = self.read_block_info(&catchup_flat_head)?; + let mut store_update = self.0.store.store_update(); + store_update.set_flat_storage_status( + shard_uid, + FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head }), + ); + // TODO: Consider adding a StorageError::IO variant?
+ store_update.commit().map_err(|_| StorageError::StorageInternalError)?; + Ok(()) + } + + // If the flat storage status is Resharding(CatchingUp), sets it to Ready(), and then calls create_flat_storage_for_shard(). + // This is used in creating state snapshots when this might be a flat storage that is in the middle of catchup, and that + // should now be considered `Ready` in the state snapshot, even if not in the main DB. + pub fn mark_ready_and_create_flat_storage( + &self, + shard_uid: ShardUId, + ) -> Result<(), StorageError> { + self.mark_flat_storage_ready(shard_uid)?; + self.create_flat_storage_for_shard(shard_uid) + } + /// Update flat storage for given processed or caught up block, which includes: /// - merge deltas from current flat storage head to new one given in /// `new_flat_head`; @@ -220,39 +299,66 @@ impl FlatStorageManager { } } - /// Updates `move_head_enabled` for all shards and returns whether it succeeded. - /// If at least one of the shards fails to update move_head_enabled, then that operation is rolled back for all shards. - /// - /// Rollbacks should work, because we assume that this function is the only - /// entry point to locking/unlocking flat head updates in a system with - /// multiple FlatStorages running in parallel. - pub fn set_flat_state_updates_mode(&self, enabled: bool) -> bool { + /// Returns None if there's no resharding flat storage split in progress. + /// If there is, returns Some(None) if there's at least one child shard that hasn't been split and had its + /// status set to `CatchingUp`. If they've all been split already and are in the catchup phase, + /// returns Some(Some(height)) with the lowest height among all shards that resharding catchup has advanced to. + pub fn resharding_catchup_height_reached( + &self, + shard_uids: impl Iterator<Item = ShardUId>, + ) -> Result<Option<Option<BlockHeight>>, StorageError> { + let mut ret = None; + for shard_uid in shard_uids { + match self.0.store.get_flat_storage_status(shard_uid)?
{ + FlatStorageStatus::Resharding(FlatStorageReshardingStatus::CatchingUp( + catchup_flat_head, + )) => { + let flat_head = self.read_block_info(&catchup_flat_head)?; + if let Some(Some(min_height)) = ret { + ret = Some(Some(std::cmp::min(min_height, flat_head.height))); + } else { + ret = Some(Some(flat_head.height)); + } + } + FlatStorageStatus::Resharding(_) => return Ok(Some(None)), + _ => {} + }; + } + Ok(ret) + } + + /// Should be called when we want to take a state snapshot. Disallows flat head updates, and signals to any resharding + /// flat storage code that it should not advance beyond the block height `min_chunk_prev_height`. + pub fn want_snapshot(&self, min_chunk_prev_height: BlockHeight) { + { + let mut want_snapshot = self.0.want_snapshot.lock().expect(POISONED_LOCK_ERR); + *want_snapshot = Some(min_chunk_prev_height); + } let flat_storages = self.0.flat_storages.lock().expect(POISONED_LOCK_ERR); - let mut all_updated = true; - let mut updated_flat_storages = vec![]; - let mut updated_shard_uids = vec![]; - for (shard_uid, flat_storage) in flat_storages.iter() { - if flat_storage.set_flat_head_update_mode(enabled) { - updated_flat_storages.push(flat_storage); - updated_shard_uids.push(shard_uid); - } else { - all_updated = false; - tracing::error!(target: "store", rolling_back_shards = ?updated_shard_uids, enabled, ?shard_uid, "Locking/Unlocking of flat head updates failed for shard. Reverting."); - break; - } + for flat_storage in flat_storages.values() { + flat_storage.set_flat_head_update_mode(false); } - if all_updated { - tracing::debug!(target: "store", enabled, "Locking/Unlocking of flat head updates succeeded"); - true - } else { - // Do rollback. - // It does allow for a data race if somebody updates move_head_enabled on individual shards. - // The assumption is that all shards get locked/unlocked at the same time.
- for flat_storage in updated_flat_storages { - flat_storage.set_flat_head_update_mode(!enabled); - } - tracing::error!(target: "store", enabled, "Locking/Unlocking of flat head updates failed"); - false + tracing::debug!(target: "store", "Locked flat head updates"); + } + + /// Should be called when we're done taking a state snapshot. Allows flat head updates, and signals to any resharding + /// flat storage code that it can advance now. + pub fn snapshot_taken(&self) { + { + let mut want_snapshot = self.0.want_snapshot.lock().expect(POISONED_LOCK_ERR); + *want_snapshot = None; } + let flat_storages = self.0.flat_storages.lock().expect(POISONED_LOCK_ERR); + for flat_storage in flat_storages.values() { + flat_storage.set_flat_head_update_mode(true); + } + tracing::debug!(target: "store", "Unlocked flat head updates"); + } + + // Returns Some() if a state snapshot should be taken, and therefore any resharding flat storage code should not advance + // past the returned block height. + pub fn snapshot_wanted(&self) -> Option<BlockHeight> { + let want_snapshot = self.0.want_snapshot.lock().expect(POISONED_LOCK_ERR); + *want_snapshot } } diff --git a/core/store/src/flat/storage.rs b/core/store/src/flat/storage.rs index c24917782d6..8db1b72ea6f 100644 --- a/core/store/src/flat/storage.rs +++ b/core/store/src/flat/storage.rs @@ -492,14 +492,9 @@ impl FlatStorage { } - /// Updates `move_head_enabled` and returns whether the change was done. + /// Sets `move_head_enabled` to `enabled`.
- pub(crate) fn set_flat_head_update_mode(&self, enabled: bool) -> bool { + pub(crate) fn set_flat_head_update_mode(&self, enabled: bool) { let mut guard = self.0.write().expect(crate::flat::POISONED_LOCK_ERR); - if enabled != guard.move_head_enabled { - guard.move_head_enabled = enabled; - true - } else { - false - } + guard.move_head_enabled = enabled; } } diff --git a/core/store/src/lib.rs b/core/store/src/lib.rs index 7c1c288ba19..10579acda8a 100644 --- a/core/store/src/lib.rs +++ b/core/store/src/lib.rs @@ -117,8 +117,8 @@ pub struct Store { } impl StoreAdapter for Store { - fn store(&self) -> Store { - self.clone() + fn store_ref(&self) -> &Store { + self } } diff --git a/core/store/src/trie/state_snapshot.rs b/core/store/src/trie/state_snapshot.rs index edbff71b09a..77ff792aca0 100644 --- a/core/store/src/trie/state_snapshot.rs +++ b/core/store/src/trie/state_snapshot.rs @@ -90,7 +90,7 @@ impl StateSnapshot { tracing::debug!(target: "state_snapshot", ?shard_indexes_and_uids, ?prev_block_hash, "new StateSnapshot"); let mut included_shard_uids = vec![]; for &(shard_index, shard_uid) in shard_indexes_and_uids { - if let Err(err) = flat_storage_manager.create_flat_storage_for_shard(shard_uid) { + if let Err(err) = flat_storage_manager.mark_ready_and_create_flat_storage(shard_uid) { tracing::warn!(target: "state_snapshot", ?err, ?shard_uid, "Failed to create a flat storage for snapshot shard"); continue; } diff --git a/integration-tests/src/test_loop/tests/resharding_v3.rs b/integration-tests/src/test_loop/tests/resharding_v3.rs index 0bda17d80bf..c04d89ba630 100644 --- a/integration-tests/src/test_loop/tests/resharding_v3.rs +++ b/integration-tests/src/test_loop/tests/resharding_v3.rs @@ -87,7 +87,13 @@ impl TestReshardingParametersBuilder { fn build(self) -> TestReshardingParameters { let num_accounts = self.num_accounts.unwrap_or(8); let num_clients = self.num_clients.unwrap_or(3); - let epoch_length = self.epoch_length.unwrap_or(6); + // When there's a 
resharding task delay and single-shard tracking, the delay might be pushed out + // even further because the resharding task might have to wait for the state snapshot to be made + // before it can proceed, which might mean that flat storage won't be ready for the child shard for a whole epoch. + // So we extend the epoch length a bit in this case. + let epoch_length = self + .epoch_length + .unwrap_or_else(|| self.delay_flat_state_resharding.map_or(6, |delay| delay + 7)); // #12195 prevents number of BPs bigger than `epoch_length`. assert!(num_clients > 0 && num_clients <= epoch_length); @@ -968,9 +974,11 @@ fn test_resharding_v3_slower_post_processing_tasks() { } #[test] -// TODO(resharding): fix nearcore and change the ignore condition -// #[cfg_attr(not(feature = "test_features"), ignore)] -#[ignore] +// TODO(resharding): fix the fact that this test fails if the epoch length is set to 10, (and state sync +// is made to run before shard catchup) because set_state_finalize() sets flat storage state to +// ready before child catchup is done. Also fix the failure in +// check_state_shard_uid_mapping_after_resharding() if the epoch length is set to 11 +#[cfg_attr(not(feature = "test_features"), ignore)] fn test_resharding_v3_shard_shuffling_slower_post_processing_tasks() { let params = TestReshardingParametersBuilder::default() .shuffle_shard_assignment_for_chunk_producers(true)