Skip to content

Commit

Permalink
Workflow cancellation (#4813)
Browse files Browse the repository at this point in the history
* Removed duplicate entries

* Prevented workflows and activities from starting when the parent workflow is being cancelled

* Added cancellation to execution contexts

* Added store for workflow execution contexts

* Added cancellation to workflowRuntime

* Removed calling BookmarkPersistedHandler when persisting bookmarks.

* Added endpoint for bulk cancelling tasks

* Added tests for cancelling workflows

* Prevented cancelling the cancellation process since it could have unwanted effects

---------

Co-authored-by: Sipke Schoorstra <sipkeschoorstra@outlook.com>
  • Loading branch information
raymonddenhaan and sfmskywalker authored Jan 23, 2024
1 parent 832fac4 commit c409414
Show file tree
Hide file tree
Showing 42 changed files with 854 additions and 122 deletions.
2 changes: 1 addition & 1 deletion Elsa.sln
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@


Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 17
VisualStudioVersion = 17.7.34003.232
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ protected override async ValueTask HandleAsync(AlterationContext context, Schedu
{
// If the activity is in a faulted state, reset it to Running.
if (existingActivityExecutionContext.Status == ActivityStatus.Faulted)
existingActivityExecutionContext.Status = ActivityStatus.Running;
existingActivityExecutionContext.TransitionTo(ActivityStatus.Running);

// Schedule the activity execution context.
var parentContext = existingActivityExecutionContext.ParentActivityExecutionContext;
Expand Down
60 changes: 58 additions & 2 deletions src/modules/Elsa.ProtoActor/Grains/WorkflowInstance.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,22 @@
using Elsa.ProtoActor.Mappers;
using Elsa.ProtoActor.ProtoBuf;
using Elsa.ProtoActor.Snapshots;
using Elsa.Workflows;
using Elsa.Workflows.Contracts;
using Elsa.Workflows.Helpers;
using Elsa.Workflows.Management.Contracts;
using Elsa.Workflows.Management.Mappers;
using Elsa.Workflows.Runtime.Contracts;
using Elsa.Workflows.Runtime.Options;
using Elsa.Workflows.Runtime.Requests;
using Elsa.Workflows.State;
using Microsoft.Extensions.DependencyInjection;
using Proto;
using Proto.Cluster;
using Proto.Persistence;
using CancellationTokens = Elsa.Workflows.Models.CancellationTokens;
using WorkflowStatus = Elsa.Workflows.WorkflowStatus;
using WorkflowSubStatus = Elsa.Workflows.WorkflowSubStatus;

namespace Elsa.ProtoActor.Grains;

Expand All @@ -38,6 +44,8 @@ internal class WorkflowInstance : WorkflowInstanceBase
private IWorkflowHost _workflowHost = default!;
private WorkflowState _workflowState = default!;

private readonly ICollection<CancellationTokenSource> _cancellationTokenSources = new List<CancellationTokenSource>();

/// <inheritdoc />
public WorkflowInstance(
IServiceScopeFactory scopeFactory,
Expand Down Expand Up @@ -139,6 +147,10 @@ public override async Task Start(StartWorkflowRequest request, Action<WorkflowEx
var versionOptions = VersionOptions.FromString(request.VersionOptions);
var cancellationToken = Context.CancellationToken;

var cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
_cancellationTokenSources.Add(cancellationTokenSource);
cancellationToken = cancellationTokenSource.Token;

// Only need to reconstruct a workflow host if not already done so during CanStart.
if (_workflowHost == null!)
{
Expand All @@ -155,7 +167,9 @@ public override async Task Start(StartWorkflowRequest request, Action<WorkflowEx
CorrelationId = correlationId,
Input = input,
Properties = properties,
TriggerActivityId = request.TriggerActivityId
TriggerActivityId = request.TriggerActivityId,
StatusUpdatedCallback = StatusUpdated,
CancellationTokens = new CancellationTokens(cancellationToken)
};

var task = _workflowHost.StartWorkflowAsync(startWorkflowOptions, cancellationToken);
Expand Down Expand Up @@ -187,6 +201,31 @@ public override async Task Start(StartWorkflowRequest request, Action<WorkflowEx
});
}

private void StatusUpdated(WorkflowExecutionContext context)
{
_ = Task.Run(async () => await Update(context));
}

private async Task Update(WorkflowExecutionContext context)
{
using var scope = _scopeFactory.CreateScope();
var extractor = scope.ServiceProvider.GetRequiredService<IWorkflowStateExtractor>();
var bookmarkPersistor = scope.ServiceProvider.GetRequiredService<IBookmarksPersister>();
var workflowState = extractor.Extract(context);
var originalBookmarks = _workflowHost.WorkflowState.Bookmarks;

_workflowState = workflowState;

await SaveSnapshotAsync();
SaveWorkflowInstance(workflowState);
var newBookmarks = workflowState.Bookmarks;

var diff = Diff.For(originalBookmarks, newBookmarks);

var bookmarkRequest = new UpdateBookmarksRequest(workflowState.DefinitionId, diff, workflowState.CorrelationId);
await bookmarkPersistor.PersistBookmarksAsync(bookmarkRequest);
}

/// <inheritdoc />
public override Task Stop()
{
Expand All @@ -208,6 +247,10 @@ public override async Task Resume(ResumeWorkflowRequest request, Action<Workflow
var activityInstanceId = request.ActivityInstanceId.NullIfEmpty();
var activityHash = request.ActivityHash.NullIfEmpty();
var cancellationToken = Context.CancellationToken;

var cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
_cancellationTokenSources.Add(cancellationTokenSource);
cancellationToken = cancellationTokenSource.Token;

var resumeWorkflowHostOptions = new ResumeWorkflowHostOptions
{
Expand All @@ -218,7 +261,8 @@ public override async Task Resume(ResumeWorkflowRequest request, Action<Workflow
ActivityInstanceId = activityInstanceId,
ActivityHash = activityHash,
Input = _input,
Properties = _properties
Properties = _properties,
CancellationTokens = cancellationToken
};

var definitionId = _definitionId;
Expand Down Expand Up @@ -260,6 +304,18 @@ public override async Task Resume(ResumeWorkflowRequest request, Action<Workflow
/// <inheritdoc />
public override Task<WorkflowExecutionResponse> Resume(ResumeWorkflowRequest request) => Task.FromResult(new WorkflowExecutionResponse());

public override async Task Cancel()
{
if (_workflowState.Status != WorkflowStatus.Finished)
{
_workflowState.SubStatus = WorkflowSubStatus.Cancelled;
_workflowState.Status = WorkflowStatus.Finished;
}

foreach(var source in _cancellationTokenSources)
source.Cancel();
}

/// <inheritdoc />
public override async Task<ExportWorkflowStateResponse> ExportState(ExportWorkflowStateRequest request)
{
Expand Down
1 change: 1 addition & 0 deletions src/modules/Elsa.ProtoActor/Proto/WorkflowInstance.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ service WorkflowInstance {
rpc Start (StartWorkflowRequest) returns (WorkflowExecutionResponse);
rpc Stop (Empty) returns (Empty);
rpc Resume (ResumeWorkflowRequest) returns (WorkflowExecutionResponse);
rpc Cancel (Empty) returns (Empty);
rpc ExportState(ExportWorkflowStateRequest) returns (ExportWorkflowStateResponse);
rpc ImportState(ImportWorkflowStateRequest) returns (ImportWorkflowStateResponse);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
using Proto.Cluster;
using Bookmark = Elsa.Workflows.Models.Bookmark;
using CountRunningWorkflowsRequest = Elsa.Workflows.Runtime.Requests.CountRunningWorkflowsRequest;
using WorkflowStatus = Elsa.Workflows.WorkflowStatus;

namespace Elsa.ProtoActor.Services;

Expand Down Expand Up @@ -246,6 +247,13 @@ public async Task<WorkflowExecutionResult> ExecuteWorkflowAsync(WorkflowMatch ma
return result!;
}

/// <inheritdoc />
public async Task CancelWorkflowAsync(string workflowInstanceId, CancellationToken cancellationToken)
{
var client = _cluster.GetNamedWorkflowGrain(workflowInstanceId);
await client.Cancel(cancellationToken);
}

/// <inheritdoc />
public async Task<IEnumerable<WorkflowMatch>> FindWorkflowsAsync(WorkflowsFilter filter, CancellationToken cancellationToken = default)
{
Expand Down Expand Up @@ -296,7 +304,7 @@ public async Task<long> CountRunningWorkflowsAsync(CountRunningWorkflowsRequest
DefinitionId = request.DefinitionId,
Version = request.Version,
CorrelationId = request.CorrelationId,
WorkflowStatus = Workflows.WorkflowStatus.Running
WorkflowStatus = WorkflowStatus.Running
};
return await _workflowInstanceStore.CountAsync(filter, cancellationToken);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public override void Configure()

public override async Task<Response> ExecuteAsync(Request request, CancellationToken cancellationToken)
{
var count = await _workflowDefinitionManager.BulkDeleteByDefinitionIdsAsync(request!.DefinitionIds, cancellationToken);
var count = await _workflowDefinitionManager.BulkDeleteByDefinitionIdsAsync(request.DefinitionIds, cancellationToken);
return new Response(count);
}
}
Original file line number Diff line number Diff line change
@@ -1,25 +1,30 @@
using System.Text.Json.Serialization;
using Elsa.Abstractions;
using Elsa.Workflows.Runtime.Contracts;

namespace Elsa.Workflows.Api.Endpoints.WorkflowInstances.BulkCancel;

public class BulkCancel : ElsaEndpoint<Request, Response>
{
private readonly IWorkflowRuntime _workflowRuntime;

public BulkCancel(IWorkflowRuntime workflowRuntime)
{
_workflowRuntime = workflowRuntime;
}

public override void Configure()
{
Post("/bulk-actions/cancel/workflow-instances/by-id");
ConfigurePermissions("cancel:workflow-instances");
}

public override async Task<Response> ExecuteAsync(Request request, CancellationToken cancellationToken)
{
// TODO: Implement workflow cancellation.
var count = -1;
var tasks = request.Ids.Select(id => _workflowRuntime.CancelWorkflowAsync(id, cancellationToken)).ToList();
await Task.WhenAll(tasks);

var count = tasks.Count(t => t.IsCompletedSuccessfully);

return new(count);
}

public record BulkCancelWorkflowInstancesRequest(ICollection<string> Ids);

public record BulkCancelWorkflowInstancesResponse([property: JsonPropertyName("cancelled")] int CancelledCount);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
using Elsa.Extensions;
using Elsa.Mediator.Contracts;
using Elsa.Workflows.Notifications;
using Elsa.Workflows.Signals;

namespace Elsa.Workflows;

public partial class ActivityExecutionContext
{
private readonly CancellationTokenRegistration _cancellationRegistration;
private readonly CancellationTokenSource _cancellationTokenSource;
private readonly INotificationSender _publisher;

private void CancelActivity()
{
// If the activity is not running, do nothing.
if (Status != ActivityStatus.Running && Status != ActivityStatus.Faulted)
return;

_ = Task.Run(async () => await CancelActivityAsync());
}

private async Task CancelActivityAsync()
{
// Select all child contexts.
var childContexts = WorkflowExecutionContext.ActivityExecutionContexts.Where(x => x.ParentActivityExecutionContext == this).ToList();

foreach (var childContext in childContexts)
childContext._cancellationTokenSource.Cancel();

TransitionTo(ActivityStatus.Canceled);
ClearBookmarks();
ClearCompletionCallbacks();
WorkflowExecutionContext.Bookmarks.RemoveWhere(x => x.ActivityNodeId == NodeId);

// Add an execution log entry.
AddExecutionLogEntry("Canceled", payload: JournalData, includeActivityState: true);

_cancellationRegistration.Dispose();

await this.SendSignalAsync(new CancelSignal());
await _publisher.SendAsync(new ActivityCancelled(this));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
using Elsa.Extensions;
using Elsa.Workflows.Models;

namespace Elsa.Workflows;

public partial class ActivityExecutionContext
{
/// <summary>
/// Adds a new <see cref="WorkflowExecutionLogEntry"/> to the execution log of the current <see cref="Workflows.WorkflowExecutionContext"/>.
/// </summary>
/// <param name="eventName">The name of the event.</param>
/// <param name="message">The message of the event.</param>
/// <param name="source">The source of the activity. For example, the source file name and line number in case of composite activities.</param>
/// <param name="payload">Any contextual data related to this event.</param>
/// <param name="includeActivityState">True to include activity state with this event, false otherwise.</param>
/// <returns>Returns the created <see cref="WorkflowExecutionLogEntry"/>.</returns>
public WorkflowExecutionLogEntry AddExecutionLogEntry(string eventName, string? message = default, string? source = default, object? payload = default, bool includeActivityState = false)
{
var activityState = includeActivityState ? ActivityState : default;

var logEntry = new WorkflowExecutionLogEntry(
Id,
ParentActivityExecutionContext?.Id,
Activity.Id,
Activity.Type,
Activity.Version,
Activity.Name,
NodeId,
activityState,
_systemClock.UtcNow,
WorkflowExecutionContext.ExecutionLogSequence++,
eventName,
message,
source ?? Activity.GetSource(),
payload);

WorkflowExecutionContext.ExecutionLog.Add(logEntry);
return logEntry;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using Elsa.Expressions.Helpers;
using Elsa.Expressions.Models;
using Elsa.Extensions;
using Elsa.Mediator.Contracts;
using Elsa.Workflows.Contracts;
using Elsa.Workflows.Memory;
using Elsa.Workflows.Models;
Expand All @@ -15,7 +16,7 @@ namespace Elsa.Workflows;
/// <summary>
/// Represents the context of an activity execution.
/// </summary>
public class ActivityExecutionContext : IExecutionContext
public partial class ActivityExecutionContext : IExecutionContext
{
private readonly ISystemClock _systemClock;
private readonly List<Bookmark> _bookmarks = new();
Expand Down Expand Up @@ -47,6 +48,10 @@ public ActivityExecutionContext(
Tag = tag;
CancellationToken = cancellationToken;
Id = id;
_publisher = GetRequiredService<INotificationSender>();

_cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
_cancellationRegistration = _cancellationTokenSource.Token.Register(CancelActivity);
}

/// <summary>
Expand Down Expand Up @@ -123,8 +128,21 @@ public IEnumerable<Variable> Variables
/// <summary>
/// The current status of the activity.
/// </summary>
public ActivityStatus Status { get; set; }
public ActivityStatus Status { get; private set; }

/// <summary>
/// Sets the current status of the activity.
/// </summary>
public void TransitionTo(ActivityStatus status)
{
Status = status;

if (Status is ActivityStatus.Completed
or ActivityStatus.Canceled
or ActivityStatus.Faulted)
_cancellationRegistration.Dispose();
}

/// <summary>
/// Gets or sets the exception that occurred during the activity execution, if any.
/// </summary>
Expand Down
Loading

0 comments on commit c409414

Please sign in to comment.