diff --git a/chain/chain/src/flat_storage_resharder.rs b/chain/chain/src/flat_storage_resharder.rs index 87d552ab0dd..03d05da4685 100644 --- a/chain/chain/src/flat_storage_resharder.rs +++ b/chain/chain/src/flat_storage_resharder.rs @@ -612,13 +612,24 @@ impl FlatStorageResharder { info!(target: "resharding", ?shard_uid, "flat storage shard catchup task started"); let metrics = FlatStorageReshardingShardCatchUpMetrics::new(&shard_uid); // Apply deltas and then create the flat storage. - let apply_result = self.shard_catchup_apply_deltas(shard_uid, chain_store, &metrics); - let Ok(res) = apply_result else { - error!(target: "resharding", ?shard_uid, err = ?apply_result.unwrap_err(), "flat storage shard catchup delta application failed!"); - return FlatStorageReshardingTaskResult::Failed; - }; - let Some((num_batches_done, flat_head)) = res else { - return FlatStorageReshardingTaskResult::Postponed; + let (num_batches_done, flat_head) = match self.shard_catchup_apply_deltas( + shard_uid, + chain_store, + &metrics, + ) { + Ok(ShardCatchupApplyDeltasOutcome::Succeeded(num_batches_done, tip)) => { + (num_batches_done, tip) + } + Ok(ShardCatchupApplyDeltasOutcome::Cancelled) => { + return FlatStorageReshardingTaskResult::Cancelled; + } + Ok(ShardCatchupApplyDeltasOutcome::Postponed) => { + return FlatStorageReshardingTaskResult::Postponed; + } + Err(err) => { + error!(target: "resharding", ?shard_uid, ?err, "flat storage shard catchup delta application failed!"); + return FlatStorageReshardingTaskResult::Failed; + } }; match self.shard_catchup_finalize_storage(shard_uid, &flat_head, &metrics) { Ok(_) => { @@ -648,13 +659,16 @@ impl FlatStorageResharder { /// Applies flat storage deltas in batches on a shard that is in catchup status. /// - /// Returns the number of delta batches applied and the final tip of the flat storage. + /// This function can either: + /// - Finish successfully, returning the number of delta batches applied and the final tip of the flat storage. 
+ /// - Be cancelled. + /// - Be postponed, to let other operations run on this intermediate state of flat storage (for example, a state sync snapshot). fn shard_catchup_apply_deltas( &self, shard_uid: ShardUId, chain_store: &ChainStore, metrics: &FlatStorageReshardingShardCatchUpMetrics, - ) -> Result<Option<(usize, Tip)>, Error> { + ) -> Result<ShardCatchupApplyDeltasOutcome, Error> { // How many block heights of deltas are applied in a single commit. let catch_up_blocks = self.resharding_config.get().catch_up_blocks; // Delay between every batch. @@ -680,7 +694,6 @@ impl FlatStorageResharder { }; loop { - // TODO:(resharding): check self.controller.is_cancelled() here as well. let _span = tracing::debug_span!( target: "resharding", "shard_catchup_apply_deltas/batch", @@ -692,10 +705,10 @@ impl FlatStorageResharder { // If we reached the desired new flat head, we can terminate the delta application step. if is_flat_head_on_par_with_chain(&flat_head.hash, &chain_final_head) { - return Ok(Some(( + return Ok(ShardCatchupApplyDeltasOutcome::Succeeded( num_batches_done, Tip::from_header(&chain_store.get_block_header(&flat_head.hash)?), - ))); + )); } let mut merged_changes = FlatStateChanges::default(); @@ -746,8 +759,12 @@ impl FlatStorageResharder { metrics.set_head_height(flat_head.height); if postpone { - return Ok(None); + return Ok(ShardCatchupApplyDeltasOutcome::Postponed); + } + if self.controller.is_cancelled() { + return Ok(ShardCatchupApplyDeltasOutcome::Cancelled); } + // Sleep between batches in order to throttle resharding and leave some resource for the // regular node operation. std::thread::sleep(batch_delay); @@ -1141,6 +1158,14 @@ enum FlatStorageReshardingTaskSchedulingStatus { Postponed, } +/// Outcome of the task that applies deltas during shard catchup. +enum ShardCatchupApplyDeltasOutcome { + /// Contains the number of delta batches applied and the final tip of the flat storage. + Succeeded(usize, Tip), + Cancelled, + Postponed, +} + /// Helps control the flat storage resharder background operations. 
This struct wraps /// [ReshardingHandle] and gives better meaning request to stop any processing when applied to flat /// storage. In flat storage resharding there's a slight difference between interrupt and cancel. @@ -1719,6 +1744,47 @@ mod tests { } } + #[test] + fn cancel_shard_catchup() { + init_test_logger(); + let (chain, resharder, sender) = + create_chain_resharder_sender::(simple_shard_layout()); + let new_shard_layout = shard_layout_after_split(); + let resharding_event_type = event_type_from_chain_and_layout(&chain, &new_shard_layout); + + // Perform resharding. + assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout).is_ok()); + let (_, split_params) = resharder.get_parent_shard_and_split_params().unwrap(); + let ParentSplitParameters { left_child_shard, right_child_shard, .. } = split_params; + + // Run the split task. + assert_matches!( + sender.call_split_shard_task(), + FlatStorageReshardingTaskResult::Successful { num_batches_done: _ } + ); + + // Cancel resharding. + resharder.controller.handle.stop(); + + // Run the catchup tasks. + assert_eq!( + sender.call_shard_catchup_tasks(), + vec![ + FlatStorageReshardingTaskResult::Cancelled, + FlatStorageReshardingTaskResult::Cancelled + ] + ); + + // Check that resharding was effectively cancelled. + let flat_store = resharder.runtime.store().flat_store(); + for child_shard in [left_child_shard, right_child_shard] { + assert_matches!( + flat_store.get_flat_storage_status(child_shard), + Ok(FlatStorageStatus::Resharding(FlatStorageReshardingStatus::CatchingUp(_))) + ); + } + } + /// A shard can't be split if it isn't in ready state. #[test] fn reject_split_shard_if_parent_is_not_ready() {