Skip to content

Commit

Permalink
feat(resharding): make flat storage shard catchup cancellable (#12700)
Browse files Browse the repository at this point in the history
- Refactored `shard_catchup_apply_deltas` to return an enum, which is
more explicit than the previous `Result<Option<(usize, Tip)>, Error>` return type
- `shard_catchup_apply_deltas` is now cancelled correctly if
resharding is aborted
- Added a small test (`cancel_shard_catchup`)
  • Loading branch information
Trisfald authored Jan 9, 2025
1 parent 5b56156 commit a150221
Showing 1 changed file with 79 additions and 13 deletions.
92 changes: 79 additions & 13 deletions chain/chain/src/flat_storage_resharder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -612,13 +612,24 @@ impl FlatStorageResharder {
info!(target: "resharding", ?shard_uid, "flat storage shard catchup task started");
let metrics = FlatStorageReshardingShardCatchUpMetrics::new(&shard_uid);
// Apply deltas and then create the flat storage.
let apply_result = self.shard_catchup_apply_deltas(shard_uid, chain_store, &metrics);
let Ok(res) = apply_result else {
error!(target: "resharding", ?shard_uid, err = ?apply_result.unwrap_err(), "flat storage shard catchup delta application failed!");
return FlatStorageReshardingTaskResult::Failed;
};
let Some((num_batches_done, flat_head)) = res else {
return FlatStorageReshardingTaskResult::Postponed;
let (num_batches_done, flat_head) = match self.shard_catchup_apply_deltas(
shard_uid,
chain_store,
&metrics,
) {
Ok(ShardCatchupApplyDeltasOutcome::Succeeded(num_batches_done, tip)) => {
(num_batches_done, tip)
}
Ok(ShardCatchupApplyDeltasOutcome::Cancelled) => {
return FlatStorageReshardingTaskResult::Cancelled;
}
Ok(ShardCatchupApplyDeltasOutcome::Postponed) => {
return FlatStorageReshardingTaskResult::Postponed;
}
Err(err) => {
error!(target: "resharding", ?shard_uid, ?err, "flat storage shard catchup delta application failed!");
return FlatStorageReshardingTaskResult::Failed;
}
};
match self.shard_catchup_finalize_storage(shard_uid, &flat_head, &metrics) {
Ok(_) => {
Expand Down Expand Up @@ -648,13 +659,16 @@ impl FlatStorageResharder {

/// Applies flat storage deltas in batches on a shard that is in catchup status.
///
/// Returns the number of delta batches applied and the final tip of the flat storage.
/// This function can either:
/// - Finish successfully, returning the number of delta batches applied and the final tip of the flat storage.
/// - Be cancelled
/// - Be postponed, to let other operations run on this intermediate state of flat storage (example state sync snapshot).
fn shard_catchup_apply_deltas(
&self,
shard_uid: ShardUId,
chain_store: &ChainStore,
metrics: &FlatStorageReshardingShardCatchUpMetrics,
) -> Result<Option<(usize, Tip)>, Error> {
) -> Result<ShardCatchupApplyDeltasOutcome, Error> {
// How many block heights of deltas are applied in a single commit.
let catch_up_blocks = self.resharding_config.get().catch_up_blocks;
// Delay between every batch.
Expand All @@ -680,7 +694,6 @@ impl FlatStorageResharder {
};

loop {
// TODO:(resharding): check self.controller.is_cancelled() here as well.
let _span = tracing::debug_span!(
target: "resharding",
"shard_catchup_apply_deltas/batch",
Expand All @@ -692,10 +705,10 @@ impl FlatStorageResharder {

// If we reached the desired new flat head, we can terminate the delta application step.
if is_flat_head_on_par_with_chain(&flat_head.hash, &chain_final_head) {
return Ok(Some((
return Ok(ShardCatchupApplyDeltasOutcome::Succeeded(
num_batches_done,
Tip::from_header(&chain_store.get_block_header(&flat_head.hash)?),
)));
));
}

let mut merged_changes = FlatStateChanges::default();
Expand Down Expand Up @@ -746,8 +759,12 @@ impl FlatStorageResharder {
metrics.set_head_height(flat_head.height);

if postpone {
return Ok(None);
return Ok(ShardCatchupApplyDeltasOutcome::Postponed);
}
if self.controller.is_cancelled() {
return Ok(ShardCatchupApplyDeltasOutcome::Cancelled);
}

// Sleep between batches in order to throttle resharding and leave some resource for the
// regular node operation.
std::thread::sleep(batch_delay);
Expand Down Expand Up @@ -1141,6 +1158,14 @@ enum FlatStorageReshardingTaskSchedulingStatus {
Postponed,
}

/// Outcome of the task that applies deltas during shard catchup.
///
/// This is the `Ok` payload returned by `shard_catchup_apply_deltas`; failures of the delta
/// application are reported through the `Err` side of that function's `Result` instead.
enum ShardCatchupApplyDeltasOutcome {
/// Contains the number of delta batches applied and the final tip of the flat storage.
Succeeded(usize, Tip),
/// Delta application stopped after a batch because the resharder's controller reported that
/// the operation has been cancelled.
Cancelled,
/// Delta application stopped after a batch to let other operations run on this intermediate
/// state of the flat storage (for example a state sync snapshot).
Postponed,
}

/// Helps control the flat storage resharder background operations. This struct wraps
/// [ReshardingHandle] and gives a clearer meaning to requests to stop any processing when
/// applied to flat storage. In flat storage resharding there's a slight difference between
/// interrupt and cancel.
Expand Down Expand Up @@ -1719,6 +1744,47 @@ mod tests {
}
}

/// Cancelling resharding after the split task has completed must make both child catchup
/// tasks report `Cancelled`, leaving the children's flat storage in the `CatchingUp` state.
#[test]
fn cancel_shard_catchup() {
    init_test_logger();
    let (chain, resharder, sender) =
        create_chain_resharder_sender::<DelayedSender>(simple_shard_layout());
    let split_layout = shard_layout_after_split();
    let split_event = event_type_from_chain_and_layout(&chain, &split_layout);

    // Start resharding and remember which children the parent shard is split into.
    assert!(resharder.start_resharding(split_event, &split_layout).is_ok());
    let (_, ParentSplitParameters { left_child_shard, right_child_shard, .. }) =
        resharder.get_parent_shard_and_split_params().unwrap();

    // The split task is executed before any cancellation is requested and must succeed.
    let split_outcome = sender.call_split_shard_task();
    assert_matches!(split_outcome, FlatStorageReshardingTaskResult::Successful { .. });

    // Request cancellation before any catchup task has had a chance to run.
    resharder.controller.handle.stop();

    // Both children's catchup tasks must observe the cancellation.
    let catchup_outcomes = sender.call_shard_catchup_tasks();
    assert_eq!(
        catchup_outcomes,
        vec![
            FlatStorageReshardingTaskResult::Cancelled,
            FlatStorageReshardingTaskResult::Cancelled
        ]
    );

    // A cancelled catchup leaves each child's flat storage status in `CatchingUp`.
    let flat_store = resharder.runtime.store().flat_store();
    for shard_uid in [left_child_shard, right_child_shard] {
        let status = flat_store.get_flat_storage_status(shard_uid);
        assert_matches!(
            status,
            Ok(FlatStorageStatus::Resharding(FlatStorageReshardingStatus::CatchingUp(_)))
        );
    }
}

/// A shard can't be split if it isn't in ready state.
#[test]
fn reject_split_shard_if_parent_is_not_ready() {
Expand Down

0 comments on commit a150221

Please sign in to comment.