From 0edc593d5117a16aba429722c96149e3c3bd1e32 Mon Sep 17 00:00:00 2001
From: slinkydeveloper
Date: Fri, 22 Nov 2024 12:35:34 +0100
Subject: [PATCH] Implement kill and restart / cancel and restart functionality

---
 .../worker/src/partition/state_machine/mod.rs | 246 +++++++++++++++++-
 1 file changed, 235 insertions(+), 11 deletions(-)

diff --git a/crates/worker/src/partition/state_machine/mod.rs b/crates/worker/src/partition/state_machine/mod.rs
index 4d232e060f..fd3a6249b2 100644
--- a/crates/worker/src/partition/state_machine/mod.rs
+++ b/crates/worker/src/partition/state_machine/mod.rs
@@ -87,7 +87,7 @@
 use std::fmt::{Debug, Formatter};
 use std::marker::PhantomData;
 use std::ops::RangeInclusive;
 use std::time::Instant;
-use tracing::error;
+use tracing::{error, info};
 use utils::SpanExt;
 #[derive(Debug, Hash, enumset::EnumSetType, strum::Display)]
@@ -852,12 +852,20 @@ impl StateMachine {
         ctx: &mut StateMachineApplyContext<'_, State>,
         InvocationTermination {
             invocation_id,
-            flavor: termination_flavor,
+            flavor,
         }: InvocationTermination,
     ) -> Result<(), Error> {
-        match termination_flavor {
+        match flavor {
             TerminationFlavor::Kill => self.on_kill_invocation(ctx, invocation_id).await,
             TerminationFlavor::Cancel => self.on_cancel_invocation(ctx, invocation_id).await,
+            TerminationFlavor::KillAndRestart => {
+                self.on_kill_and_restart_invocation(ctx, invocation_id)
+                    .await
+            }
+            TerminationFlavor::CancelAndRestart => {
+                self.on_cancel_and_restart_invocation(ctx, invocation_id)
+                    .await
+            }
         }
     }
@@ -880,11 +888,11 @@ impl StateMachine {
         match status {
             InvocationStatus::Invoked(metadata) => {
-                self.kill_invoked_invocation(ctx, invocation_id, metadata)
+                self.kill_invoked_invocation(ctx, invocation_id, metadata, false)
                     .await?;
             }
             InvocationStatus::Suspended { metadata, .. } => {
-                self.kill_suspended_invocation(ctx, invocation_id, metadata)
+                self.kill_suspended_invocation(ctx, invocation_id, metadata, false)
                     .await?;
             }
             InvocationStatus::Inboxed(inboxed) => {
@@ -1010,6 +1018,157 @@
         Ok(())
     }

+    async fn on_kill_and_restart_invocation<
+        State: VirtualObjectStatusTable
+            + InvocationStatusTable
+            + InboxTable
+            + FsmTable
+            + StateTable
+            + JournalTable
+            + OutboxTable
+            + TimerTable,
+    >(
+        &mut self,
+        ctx: &mut StateMachineApplyContext<'_, State>,
+        invocation_id: InvocationId,
+    ) -> Result<(), Error> {
+        let status = ctx.get_invocation_status(&invocation_id).await?;
+
+        match status {
+            InvocationStatus::Invoked(metadata) => {
+                self.kill_invoked_invocation(ctx, invocation_id, metadata, true)
+                    .await?;
+            }
+            InvocationStatus::Suspended { metadata, .. } => {
+                self.kill_suspended_invocation(ctx, invocation_id, metadata, true)
+                    .await?;
+            }
+            InvocationStatus::Inboxed(_) => {
+                info!("Received kill and restart command for invocation '{invocation_id}' in inboxed state. Ignoring it as it was not yet executed.");
+            }
+            InvocationStatus::Scheduled(_) => {
+                info!("Received kill and restart command for invocation '{invocation_id}' in scheduled state. Ignoring it as it was not yet executed.");
+            }
+            InvocationStatus::Killed(mut metadata) => {
+                if !metadata.restart_when_completed {
+                    // Update restart_when_completed. This might happen if the user sent two kill
+                    // commands in quick succession, the first without restart and the second with restart.
+                    metadata.restart_when_completed = true;
+                    ctx.storage
+                        .put_invocation_status(&invocation_id, &InvocationStatus::Killed(metadata))
+                        .await;
+                }
+
+                // Nothing else to do here, let's send the abort signal to the invoker again just in case.
+                Self::do_send_abort_invocation_to_invoker(ctx, invocation_id, true);
+            }
+            InvocationStatus::Completed(_) => {
+                info!("Received kill and restart command for completed invocation '{invocation_id}'. To clean up the invocation after it has completed, use the purge invocation command.");
+            }
+            InvocationStatus::Free => {
+                trace!("Received kill and restart command for unknown invocation with id '{invocation_id}'.");
+                // We still try to send the abort signal to the invoker,
+                // as it might be the case that previously the user sent an abort signal
+                // but some message was still between the invoker/PP queues.
+                // This can happen because the invoke/resume and the abort invoker messages end up in different queues,
+                // and the abort message can overtake the invoke/resume.
+                // Consequently, the invoker might not have received the abort and the user tried to send it again.
+                Self::do_send_abort_invocation_to_invoker(ctx, invocation_id, false);
+            }
+        };
+
+        Ok(())
+    }
+
+    async fn on_cancel_and_restart_invocation<
+        State: VirtualObjectStatusTable
+            + InvocationStatusTable
+            + InboxTable
+            + FsmTable
+            + StateTable
+            + JournalTable
+            + OutboxTable
+            + TimerTable,
+    >(
+        &mut self,
+        ctx: &mut StateMachineApplyContext<'_, State>,
+        invocation_id: InvocationId,
+    ) -> Result<(), Error> {
+        let status = ctx.get_invocation_status(&invocation_id).await?;
+
+        match status {
+            InvocationStatus::Invoked(mut metadata) => {
+                self.cancel_journal_leaves(
+                    ctx,
+                    invocation_id,
+                    InvocationStatusProjection::Invoked,
+                    metadata.journal_metadata.length,
+                )
+                .await?;
+                if !metadata.restart_when_completed {
+                    // Update restart_when_completed.
+                    metadata.restart_when_completed = true;
+                    ctx.storage
+                        .put_invocation_status(&invocation_id, &InvocationStatus::Invoked(metadata))
+                        .await;
+                }
+            }
+            InvocationStatus::Suspended {
+                mut metadata,
+                waiting_for_completed_entries,
+            } => {
+                if self
+                    .cancel_journal_leaves(
+                        ctx,
+                        invocation_id,
+                        InvocationStatusProjection::Suspended(waiting_for_completed_entries),
+                        metadata.journal_metadata.length,
+                    )
+                    .await?
+                {
+                    metadata.restart_when_completed = true;
+                    Self::do_resume_service(ctx, invocation_id, metadata).await?;
+                }
+            }
+            InvocationStatus::Inboxed(_) => {
+                info!("Received cancel and restart command for invocation '{invocation_id}' in inboxed state. Ignoring it as it was not yet executed.");
+            }
+            InvocationStatus::Scheduled(_) => {
+                info!("Received cancel and restart command for invocation '{invocation_id}' in scheduled state. Ignoring it as it was not yet executed.");
+            }
+            InvocationStatus::Killed(mut metadata) => {
+                if !metadata.restart_when_completed {
+                    // Update restart_when_completed. This might happen if the user first killed the
+                    // invocation without restart and then sent this command with restart.
+                    metadata.restart_when_completed = true;
+                    ctx.storage
+                        .put_invocation_status(&invocation_id, &InvocationStatus::Killed(metadata))
+                        .await;
+                }
+
+                // Nothing else to do here, let's send the abort signal to the invoker again just in case.
+                Self::do_send_abort_invocation_to_invoker(ctx, invocation_id, true);
+            }
+            InvocationStatus::Completed(_) => {
+                debug!("Received cancel and restart command for completed invocation '{invocation_id}'. To clean up the invocation after it has completed, use the purge invocation command.");
+            }
+            InvocationStatus::Free => {
+                trace!("Received cancel and restart command for unknown invocation with id '{invocation_id}'.");
+                // We still try to send the abort signal to the invoker,
+                // as it might be the case that previously the user sent an abort signal
+                // but some message was still between the invoker/PP queues.
+                // This can happen because the invoke/resume and the abort invoker messages end up in different queues,
+                // and the abort message can overtake the invoke/resume.
+                // Consequently, the invoker might not have received the abort and the user tried to send it again.
+                // TODO
+                Self::do_send_abort_invocation_to_invoker(ctx, invocation_id, false);
+            }
+        };
+
+        Ok(())
+    }
+
     async fn terminate_inboxed_invocation<
         State: InvocationStatusTable + InboxTable + OutboxTable + FsmTable,
     >(
@@ -1020,8 +1179,10 @@
         inboxed_invocation: InboxedInvocation,
     ) -> Result<(), Error> {
         let error = match termination_flavor {
-            TerminationFlavor::Kill => KILLED_INVOCATION_ERROR,
-            TerminationFlavor::Cancel => CANCELED_INVOCATION_ERROR,
+            TerminationFlavor::Kill | TerminationFlavor::KillAndRestart => KILLED_INVOCATION_ERROR,
+            TerminationFlavor::Cancel | TerminationFlavor::CancelAndRestart => {
+                CANCELED_INVOCATION_ERROR
+            }
         };

         let InboxedInvocation {
@@ -1079,8 +1240,10 @@
         scheduled_invocation: ScheduledInvocation,
     ) -> Result<(), Error> {
         let error = match termination_flavor {
-            TerminationFlavor::Kill => KILLED_INVOCATION_ERROR,
-            TerminationFlavor::Cancel => CANCELED_INVOCATION_ERROR,
+            TerminationFlavor::Kill | TerminationFlavor::KillAndRestart => KILLED_INVOCATION_ERROR,
+            TerminationFlavor::Cancel | TerminationFlavor::CancelAndRestart => {
+                CANCELED_INVOCATION_ERROR
+            }
         };

         let ScheduledInvocation {
@@ -1142,7 +1305,8 @@
         &mut self,
         ctx: &mut StateMachineApplyContext<'_, State>,
         invocation_id: InvocationId,
-        metadata: InFlightInvocationMetadata,
+        mut metadata: InFlightInvocationMetadata,
+        restart: bool,
     ) -> Result<(), Error> {
         self.kill_child_invocations(ctx, &invocation_id, metadata.journal_metadata.length)
             .await?;
@@ -1157,11 +1321,17 @@
                 "Effect: Store killed invocation"
             );

+            metadata.restart_when_completed = restart;
             ctx.storage
                 .put_invocation_status(&invocation_id, &InvocationStatus::Killed(metadata))
                 .await;
             Self::do_send_abort_invocation_to_invoker(ctx, invocation_id, true);
         } else {
+            if restart {
+                // Kill and restart won't work without ExperimentalFeature::InvocationStatusKilled
+                warn!("Ignoring the restart part of the kill and restart command for '{invocation_id}' and simply killing it, as this is not implemented yet");
+            }
+
             self.end_invocation(
                 ctx,
                 invocation_id,
@@ -1187,11 +1357,16 @@
         &mut self,
         ctx: &mut StateMachineApplyContext<'_, State>,
         invocation_id: InvocationId,
-        metadata: InFlightInvocationMetadata,
+        mut metadata: InFlightInvocationMetadata,
+        restart: bool,
     ) -> Result<(), Error> {
         self.kill_child_invocations(ctx, &invocation_id, metadata.journal_metadata.length)
             .await?;

+        if restart {
+            metadata.restart_when_completed = true;
+        }
+
         // No need to go through the Killed state when we're suspended,
         // because it means we already got a terminal state from the invoker.
         self.end_invocation(
             ctx,
             invocation_id,
@@ -1680,6 +1855,12 @@
         // If given, this will override any Output Entry available in the journal table
         response_result_override: Option<ResponseResult>,
     ) -> Result<(), Error> {
+        if invocation_metadata.restart_when_completed {
+            return self
+                .restart_invocation(ctx, invocation_id, invocation_metadata)
+                .await;
+        }
+
         let invocation_target = invocation_metadata.invocation_target.clone();
         let journal_length = invocation_metadata.journal_metadata.length;
         let completion_retention_time = invocation_metadata.completion_retention_duration;
@@ -1756,6 +1937,49 @@
         Ok(())
     }

+    async fn restart_invocation<
+        State: InboxTable
+            + VirtualObjectStatusTable
+            + JournalTable
+            + OutboxTable
+            + FsmTable
+            + InvocationStatusTable
+            + StateTable,
+    >(
+        &mut self,
+        ctx: &mut StateMachineApplyContext<'_, State>,
+        invocation_id: InvocationId,
+        mut invocation_metadata: InFlightInvocationMetadata,
+    ) -> Result<(), Error> {
+        info!("Restarting invocation");
+
+        // We need to clean up the journal, except the first entry.
+        let journal_length = invocation_metadata.journal_metadata.length;
+        debug_if_leader!(
+            ctx.is_leader,
+            restate.journal.length = journal_length,
+            "Effect: Drop journal except first entry"
+        );
+        ctx.storage
+            .delete_journal_range(&invocation_id, 1..journal_length)
+            .await;
+
+        // Reset the relevant fields in the InFlightInvocationMetadata
+        invocation_metadata.journal_metadata.length = 1;
+        invocation_metadata.pinned_deployment = None;
+        invocation_metadata.restart_when_completed = false;
+
+        Self::invoke(
+            ctx,
+            invocation_id,
+            invocation_metadata,
+            InvokeInputJournal::NoCachedJournal,
+        )
+        .await?;
+
+        Ok(())
+    }
+
     #[allow(clippy::too_many_arguments)]
     async fn send_response_to_sinks(
         &mut self,