Skip to content

Commit

Permalink
Merge pull request #6344 from elsa-workflows/task/restore-previous-wo…
Browse files Browse the repository at this point in the history
…rkflow-runtime-api

Restore old workflow runtime API
  • Loading branch information
sfmskywalker authored Jan 28, 2025
2 parents 0a43b99 + 9f01e14 commit e2a57ac
Show file tree
Hide file tree
Showing 32 changed files with 619 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@ public interface IWorkflowInstanceManager
/// Finds the first workflow instance that matches the specified filter.
/// </summary>
Task<WorkflowInstance?> FindAsync(WorkflowInstanceFilter filter, CancellationToken cancellationToken = default);


/// <summary>
/// Determines whether a workflow instance with the specified ID exists.
/// </summary>
Task<bool> ExistsAsync(string instanceId, CancellationToken cancellationToken = default);

/// <summary>
/// Saves the specified workflow instance.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,16 @@ public class WorkflowInstanceManager(
return await store.FindAsync(filter, cancellationToken);
}

public async Task<bool> ExistsAsync(string instanceId, CancellationToken cancellationToken = default)
{
var filter = new WorkflowInstanceFilter
{
Id = instanceId
};
var count = await store.CountAsync(filter, cancellationToken);
return count > 0;
}

/// <inheritdoc />
public async Task SaveAsync(WorkflowInstance workflowInstance, CancellationToken cancellationToken = default)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,4 @@
<ProjectReference Include="..\Elsa.Workflows.Runtime\Elsa.Workflows.Runtime.csproj" />
</ItemGroup>

<ItemGroup>
<Folder Include="Handlers\" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ public async Task ImportStateAsync(WorkflowState workflowState, CancellationToke
await _localWorkflowClient.ImportStateAsync(workflowState, cancellationToken);
}

public async Task<bool> InstanceExistsAsync(CancellationToken cancellationToken = default)
{
return await _localWorkflowClient.InstanceExistsAsync(cancellationToken);
}

private async Task<R> WithLockAsync<R>(Func<Task<R>> func)
{
var lockKey = $"workflow-instance:{WorkflowInstanceId}";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
using Elsa.Workflows.Runtime.Entities;
using Elsa.Workflows.Runtime.Filters;
using Elsa.Workflows.Runtime.Matches;
using Elsa.Workflows.Runtime.Options;
using Elsa.Workflows.Runtime.Parameters;
using Elsa.Workflows.Runtime.Params;
using Elsa.Workflows.Runtime.Requests;
using Elsa.Workflows.Runtime.Results;
using Elsa.Workflows.State;

namespace Elsa.Workflows.Runtime.Distributed;

public partial class DistributedWorkflowRuntime
{
private readonly ObsoleteWorkflowRuntime _obsoleteApi;

public Task<CanStartWorkflowResult> CanStartWorkflowAsync(string definitionId, StartWorkflowRuntimeParams? options = null) => _obsoleteApi.CanStartWorkflowAsync(definitionId, options);
public Task<WorkflowExecutionResult> StartWorkflowAsync(string definitionId, StartWorkflowRuntimeParams? options = null) => _obsoleteApi.StartWorkflowAsync(definitionId, options);
public Task<ICollection<WorkflowExecutionResult>> StartWorkflowsAsync(string activityTypeName, object bookmarkPayload, TriggerWorkflowsOptions? options = null) => _obsoleteApi.StartWorkflowsAsync(activityTypeName, bookmarkPayload, options);
public Task<WorkflowExecutionResult?> TryStartWorkflowAsync(string definitionId, StartWorkflowRuntimeParams? options = null) => _obsoleteApi.TryStartWorkflowAsync(definitionId, options);
public Task<WorkflowExecutionResult?> ResumeWorkflowAsync(string workflowInstanceId, ResumeWorkflowRuntimeParams? options = null) => _obsoleteApi.ResumeWorkflowAsync(workflowInstanceId, options);
public Task<ICollection<WorkflowExecutionResult>> ResumeWorkflowsAsync(string activityTypeName, object bookmarkPayload, TriggerWorkflowsOptions? options = null) => _obsoleteApi.ResumeWorkflowsAsync(activityTypeName, bookmarkPayload, options);
public Task<TriggerWorkflowsResult> TriggerWorkflowsAsync(string activityTypeName, object bookmarkPayload, TriggerWorkflowsOptions? options = null) => _obsoleteApi.TriggerWorkflowsAsync(activityTypeName, bookmarkPayload, options);
public Task<WorkflowExecutionResult> ExecuteWorkflowAsync(WorkflowMatch match, ExecuteWorkflowParams? options = default) => _obsoleteApi.ExecuteWorkflowAsync(match, options);
public Task<CancellationResult> CancelWorkflowAsync(string workflowInstanceId, CancellationToken cancellationToken = default) => _obsoleteApi.CancelWorkflowAsync(workflowInstanceId, cancellationToken);
public Task<IEnumerable<WorkflowMatch>> FindWorkflowsAsync(WorkflowsFilter filter, CancellationToken cancellationToken = default) => _obsoleteApi.FindWorkflowsAsync(filter, cancellationToken);
public Task<WorkflowState?> ExportWorkflowStateAsync(string workflowInstanceId, CancellationToken cancellationToken = default) => _obsoleteApi.ExportWorkflowStateAsync(workflowInstanceId, cancellationToken);
public Task ImportWorkflowStateAsync(WorkflowState workflowState, CancellationToken cancellationToken = default) => _obsoleteApi.ImportWorkflowStateAsync(workflowState, cancellationToken);
public Task UpdateBookmarkAsync(StoredBookmark bookmark, CancellationToken cancellationToken = default) => _obsoleteApi.UpdateBookmarkAsync(bookmark, cancellationToken);
public Task<long> CountRunningWorkflowsAsync(CountRunningWorkflowsRequest request, CancellationToken cancellationToken = default) => _obsoleteApi.CountRunningWorkflowsAsync(request, cancellationToken);
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,26 @@
using Elsa.Workflows.Management;
using Microsoft.Extensions.DependencyInjection;

namespace Elsa.Workflows.Runtime.Distributed;

/// <summary>
/// Represents a distributed workflow runtime that can create <see cref="IWorkflowClient"/> instances connected to a workflow instance.
/// </summary>
public class DistributedWorkflowRuntime(IServiceProvider serviceProvider, IIdentityGenerator identityGenerator) : IWorkflowRuntime
public partial class DistributedWorkflowRuntime : IWorkflowRuntime
{
private readonly IServiceProvider _serviceProvider;
private readonly IIdentityGenerator _identityGenerator;

/// <summary>
/// Represents a distributed workflow runtime that can create <see cref="IWorkflowClient"/> instances connected to a workflow instance.
/// </summary>
public DistributedWorkflowRuntime(IServiceProvider serviceProvider, IIdentityGenerator identityGenerator)
{
_serviceProvider = serviceProvider;
_identityGenerator = identityGenerator;
_obsoleteApi = ActivatorUtilities.CreateInstance<ObsoleteWorkflowRuntime>(serviceProvider, (Func<string?, CancellationToken, ValueTask<IWorkflowClient>>)CreateClientAsync);
}

/// <inheritdoc />
public async ValueTask<IWorkflowClient> CreateClientAsync(CancellationToken cancellationToken = default)
{
Expand All @@ -16,8 +30,8 @@ public async ValueTask<IWorkflowClient> CreateClientAsync(CancellationToken canc
/// <inheritdoc />
public ValueTask<IWorkflowClient> CreateClientAsync(string? workflowInstanceId, CancellationToken cancellationToken = default)
{
workflowInstanceId ??= identityGenerator.GenerateId();
var client = (IWorkflowClient)ActivatorUtilities.CreateInstance(serviceProvider, typeof(DistributedWorkflowClient), workflowInstanceId);
workflowInstanceId ??= _identityGenerator.GenerateId();
var client = (IWorkflowClient)ActivatorUtilities.CreateInstance(_serviceProvider, typeof(DistributedWorkflowClient), workflowInstanceId);
return new(client);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ private WorkflowState WorkflowState

public override Task OnStarted()
{
_linkedTokenSource = new CancellationTokenSource();
_linkedTokenSource = new();
_linkedCancellationToken = CancellationTokenSource.CreateLinkedTokenSource(Context.CancellationToken, _linkedTokenSource.Token).Token;
return Task.CompletedTask;
}
Expand All @@ -61,7 +61,7 @@ public override Task Create(CreateWorkflowInstanceRequest request, Action<Create
if (result.IsFaulted)
onError(result.Exception.Message);
else
respond(new CreateWorkflowInstanceResponse());
respond(new());
});

return Task.CompletedTask;
Expand Down Expand Up @@ -170,7 +170,7 @@ public override async Task<ExportWorkflowStateResponse> ExportState()
{
await EnsureStateAsync();
var json = mappers.WorkflowStateJsonMapper.Map(WorkflowState);
return new ExportWorkflowStateResponse
return new()
{
SerializedWorkflowState = json
};
Expand All @@ -186,12 +186,21 @@ public override async Task ImportState(ImportWorkflowStateRequest request)
await workflowInstanceManager.SaveAsync(WorkflowState, Context.CancellationToken);
}

public override Task<InstanceExistsResponse> InstanceExists()
{
var exists = _workflowInstanceId != null;
return Task.FromResult(new InstanceExistsResponse
{
Exists = exists
});
}

private async Task<RunWorkflowResult> RunAsync(RunWorkflowOptions runWorkflowOptions)
{
if (_isRunning)
{
_queuedRunWorkflowOptions.Enqueue(runWorkflowOptions);
return new RunWorkflowResult(null!, null!, null);
return new(null!, null!, null);
}

_isRunning = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,7 @@ message ExportWorkflowStateResponse {
message ImportWorkflowStateRequest {
Json SerializedWorkflowState = 1;
}

message InstanceExistsResponse {
bool Exists = 1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ service WorkflowInstance {
rpc Cancel (Empty) returns (Empty);
rpc ExportState(Empty) returns (ExportWorkflowStateResponse);
rpc ImportState(ImportWorkflowStateRequest) returns (Empty);
rpc InstanceExists(Empty) returns (InstanceExistsResponse);
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ public async Task ImportStateAsync(WorkflowState workflowState, CancellationToke
await _actorClient.ImportState(request, CreateHeaders(), cancellationToken);
}

public Task<bool> InstanceExistsAsync(CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
}

private IDictionary<string, string> CreateHeaders()
{
var headers = new Dictionary<string, string>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
using Elsa.Workflows.Runtime.Distributed;
using Elsa.Workflows.Runtime.Entities;
using Elsa.Workflows.Runtime.Filters;
using Elsa.Workflows.Runtime.Matches;
using Elsa.Workflows.Runtime.Options;
using Elsa.Workflows.Runtime.Parameters;
using Elsa.Workflows.Runtime.Params;
using Elsa.Workflows.Runtime.Requests;
using Elsa.Workflows.Runtime.Results;
using Elsa.Workflows.State;

namespace Elsa.Workflows.Runtime.ProtoActor.Services;

public partial class ProtoActorWorkflowRuntime
{
private readonly ObsoleteWorkflowRuntime _obsoleteApi;

public Task<CanStartWorkflowResult> CanStartWorkflowAsync(string definitionId, StartWorkflowRuntimeParams? options = null) => _obsoleteApi.CanStartWorkflowAsync(definitionId, options);
public Task<WorkflowExecutionResult> StartWorkflowAsync(string definitionId, StartWorkflowRuntimeParams? options = null) => _obsoleteApi.StartWorkflowAsync(definitionId, options);
public Task<ICollection<WorkflowExecutionResult>> StartWorkflowsAsync(string activityTypeName, object bookmarkPayload, TriggerWorkflowsOptions? options = null) => _obsoleteApi.StartWorkflowsAsync(activityTypeName, bookmarkPayload, options);
public Task<WorkflowExecutionResult?> TryStartWorkflowAsync(string definitionId, StartWorkflowRuntimeParams? options = null) => _obsoleteApi.TryStartWorkflowAsync(definitionId, options);
public Task<WorkflowExecutionResult?> ResumeWorkflowAsync(string workflowInstanceId, ResumeWorkflowRuntimeParams? options = null) => _obsoleteApi.ResumeWorkflowAsync(workflowInstanceId, options);
public Task<ICollection<WorkflowExecutionResult>> ResumeWorkflowsAsync(string activityTypeName, object bookmarkPayload, TriggerWorkflowsOptions? options = null) => _obsoleteApi.ResumeWorkflowsAsync(activityTypeName, bookmarkPayload, options);
public Task<TriggerWorkflowsResult> TriggerWorkflowsAsync(string activityTypeName, object bookmarkPayload, TriggerWorkflowsOptions? options = null) => _obsoleteApi.TriggerWorkflowsAsync(activityTypeName, bookmarkPayload, options);
public Task<WorkflowExecutionResult> ExecuteWorkflowAsync(WorkflowMatch match, ExecuteWorkflowParams? options = default) => _obsoleteApi.ExecuteWorkflowAsync(match, options);
public Task<CancellationResult> CancelWorkflowAsync(string workflowInstanceId, CancellationToken cancellationToken = default) => _obsoleteApi.CancelWorkflowAsync(workflowInstanceId, cancellationToken);
public Task<IEnumerable<WorkflowMatch>> FindWorkflowsAsync(WorkflowsFilter filter, CancellationToken cancellationToken = default) => _obsoleteApi.FindWorkflowsAsync(filter, cancellationToken);
public Task<WorkflowState?> ExportWorkflowStateAsync(string workflowInstanceId, CancellationToken cancellationToken = default) => _obsoleteApi.ExportWorkflowStateAsync(workflowInstanceId, cancellationToken);
public Task ImportWorkflowStateAsync(WorkflowState workflowState, CancellationToken cancellationToken = default) => _obsoleteApi.ImportWorkflowStateAsync(workflowState, cancellationToken);
public Task UpdateBookmarkAsync(StoredBookmark bookmark, CancellationToken cancellationToken = default) => _obsoleteApi.UpdateBookmarkAsync(bookmark, cancellationToken);
public Task<long> CountRunningWorkflowsAsync(CountRunningWorkflowsRequest request, CancellationToken cancellationToken = default) => _obsoleteApi.CountRunningWorkflowsAsync(request, cancellationToken);
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,22 @@ namespace Elsa.Workflows.Runtime.ProtoActor.Services;
/// <summary>
/// Represents a Proto.Actor implementation of the workflows runtime.
/// </summary>
public class ProtoActorWorkflowRuntime(IServiceProvider serviceProvider, IIdentityGenerator identityGenerator) : IWorkflowRuntime
public partial class ProtoActorWorkflowRuntime : IWorkflowRuntime
{
private readonly IServiceProvider _serviceProvider;
private readonly IIdentityGenerator _identityGenerator;

/// <summary>
/// Represents a Proto.Actor implementation of the workflows runtime.
/// </summary>
public ProtoActorWorkflowRuntime(IServiceProvider serviceProvider,
IIdentityGenerator identityGenerator)
{
_serviceProvider = serviceProvider;
_identityGenerator = identityGenerator;
_obsoleteApi = ActivatorUtilities.CreateInstance<ObsoleteWorkflowRuntime>(serviceProvider, (Func<string?, CancellationToken, ValueTask<IWorkflowClient>>)CreateClientAsync);
}

/// <inheritdoc />
public async ValueTask<IWorkflowClient> CreateClientAsync(CancellationToken cancellationToken = default)
{
Expand All @@ -16,8 +30,8 @@ public async ValueTask<IWorkflowClient> CreateClientAsync(CancellationToken canc
/// <inheritdoc />
public ValueTask<IWorkflowClient> CreateClientAsync(string? workflowInstanceId, CancellationToken cancellationToken = default)
{
workflowInstanceId ??= identityGenerator.GenerateId();
var client = (IWorkflowClient)ActivatorUtilities.CreateInstance(serviceProvider, typeof(ProtoActorWorkflowClient), workflowInstanceId);
workflowInstanceId ??= _identityGenerator.GenerateId();
var client = (IWorkflowClient)ActivatorUtilities.CreateInstance(_serviceProvider, typeof(ProtoActorWorkflowClient), workflowInstanceId);
return new(client);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,6 @@ public interface IWorkflowClient
/// Imports the specified <see cref="WorkflowState"/>.
/// </summary>
Task ImportStateAsync(WorkflowState workflowState, CancellationToken cancellationToken = default);

Task<bool> InstanceExistsAsync(CancellationToken cancellationToken = default);
}
Loading

0 comments on commit e2a57ac

Please sign in to comment.