Skip to content

Commit

Permalink
Merge branch 'patch/3.3.2'
Browse files Browse the repository at this point in the history
  • Loading branch information
sfmskywalker committed Feb 6, 2025
2 parents a084353 + 5d21bcb commit b57763a
Show file tree
Hide file tree
Showing 11 changed files with 82 additions and 65 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/packages.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ on:
- 'main'
- 'bug/*'
- 'perf/*'
- 'patch/*'
release:
types: [ prereleased, published ]
env:
Expand Down Expand Up @@ -42,7 +43,7 @@ jobs:
run: |
if [[ "${{ github.ref }}" == refs/tags/* && "${{ github.event_name }}" == "release" && ("${{ github.event.action }}" == "published" || "${{ github.event.action }}" == "prereleased")]]; then
git fetch --no-tags --prune --depth=1 origin +refs/heads/*:refs/remotes/origin/*
git branch --remote --contains | grep origin/main
git branch --remote --contains | grep origin/patch/3.3.2
else
git fetch --no-tags --prune --depth=1 origin +refs/heads/*:refs/remotes/origin/*
git branch --remote --contains | grep origin/${BRANCH_NAME}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public class DefaultTenantAccessor : ITenantAccessor
public Tenant? Tenant
{
get => CurrentTenantField.Value;
internal set => CurrentTenantField.Value = value;
private set => CurrentTenantField.Value = value;
}

public IDisposable PushContext(Tenant? tenant)
Expand Down
4 changes: 2 additions & 2 deletions src/modules/Elsa.Hangfire/Extensions/JobStorageExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public static IEnumerable<KeyValuePair<string, ScheduledJobDto>> EnumerateSchedu
{
scheduledJobs = api.ScheduledJobs(skip, take);

var jobs = scheduledJobs.FindAll(x => x.Value.Job.Type == typeof(RunWorkflowJob) || x.Value.Job.Type == typeof(ResumeWorkflowJob));
var jobs = scheduledJobs.FindAll(x => x.Value.Job?.Type == typeof(RunWorkflowJob) || x.Value.Job?.Type == typeof(ResumeWorkflowJob));
foreach (var job in jobs.Where(x => (string)x.Value.Job.Args[0] == name))
yield return job;

Expand All @@ -45,7 +45,7 @@ public static IEnumerable<KeyValuePair<string, EnqueuedJobDto>> EnumerateQueuedJ
{
enqueuedJobs = api.EnqueuedJobs(queueName, skip, take);

var jobs = enqueuedJobs.FindAll(x => x.Value.Job.Type == typeof(RunWorkflowJob) || x.Value.Job.Type == typeof(ResumeWorkflowJob));
var jobs = enqueuedJobs.FindAll(x => x.Value.Job?.Type == typeof(RunWorkflowJob) || x.Value.Job?.Type == typeof(ResumeWorkflowJob));
foreach (var job in jobs.Where(x => (string)x.Value.Job.Args[0] == taskName))
yield return job;

Expand Down
22 changes: 8 additions & 14 deletions src/modules/Elsa.Hangfire/Jobs/ExecuteBackgroundActivityJob.cs
Original file line number Diff line number Diff line change
@@ -1,28 +1,22 @@
using Elsa.Common.Multitenancy;
using Elsa.Workflows.Runtime;
using JetBrains.Annotations;

namespace Elsa.Hangfire.Jobs;

/// <summary>
/// A job that executes a background activity.
/// </summary>
public class ExecuteBackgroundActivityJob
[UsedImplicitly]
public class ExecuteBackgroundActivityJob(IBackgroundActivityInvoker backgroundActivityInvoker, ITenantFinder tenantFinder, ITenantAccessor tenantAccessor)
{
private readonly IBackgroundActivityInvoker _backgroundActivityInvoker;

/// <summary>
/// Initializes a new instance of the <see cref="ExecuteBackgroundActivityJob"/> class.
/// </summary>
/// <param name="backgroundActivityInvoker"></param>
public ExecuteBackgroundActivityJob(IBackgroundActivityInvoker backgroundActivityInvoker)
{
_backgroundActivityInvoker = backgroundActivityInvoker;
}

/// <summary>
/// Executes the job.
/// </summary>
public async Task ExecuteAsync(ScheduledBackgroundActivity scheduledBackgroundActivity, CancellationToken cancellationToken = default)
public async Task ExecuteAsync(ScheduledBackgroundActivity scheduledBackgroundActivity, string? tenantId, CancellationToken cancellationToken = default)
{
await _backgroundActivityInvoker.ExecuteAsync(scheduledBackgroundActivity, cancellationToken);
var tenant = tenantId != null ? await tenantFinder.FindByIdAsync(tenantId, cancellationToken) : null;
using var scope = tenantAccessor.PushContext(tenant);
await backgroundActivityInvoker.ExecuteAsync(scheduledBackgroundActivity, cancellationToken);
}
}
15 changes: 11 additions & 4 deletions src/modules/Elsa.Hangfire/Jobs/ResumeWorkflowJob.cs
Original file line number Diff line number Diff line change
@@ -1,22 +1,29 @@
using Elsa.Scheduling;
using Elsa.Common.Multitenancy;
using Elsa.Scheduling;
using Elsa.Workflows.Runtime;
using Elsa.Workflows.Runtime.Messages;
using Hangfire;

namespace Elsa.Hangfire.Jobs;

/// <summary>
/// A job that resumes a workflow.
/// </summary>
public class ResumeWorkflowJob(IWorkflowRuntime workflowRuntime)
public class ResumeWorkflowJob(IWorkflowRuntime workflowRuntime, ITenantFinder tenantFinder, ITenantAccessor tenantAccessor)
{
/// <summary>
/// Executes the job.
/// </summary>
/// <param name="name">The name of the job.</param>
/// <param name="taskName">A unique name for this job.</param>
/// <param name="request">The workflow request.</param>
/// <param name="tenantId">The ID of the current tenant scheduling this job.</param>
/// <param name="cancellationToken">The cancellation token.</param>
public async Task ExecuteAsync(string name, ScheduleExistingWorkflowInstanceRequest request, CancellationToken cancellationToken)
// ReSharper disable once UnusedParameter.Global
[AutomaticRetry(OnAttemptsExceeded = AttemptsExceededAction.Fail)]
public async Task ExecuteAsync(string taskName, ScheduleExistingWorkflowInstanceRequest request, string? tenantId, CancellationToken cancellationToken)
{
var tenant = tenantId != null ? await tenantFinder.FindByIdAsync(tenantId, cancellationToken) : null;
using var scope = tenantAccessor.PushContext(tenant);
var client = await workflowRuntime.CreateClientAsync(request.WorkflowInstanceId, cancellationToken);
var runRequest = new RunWorkflowInstanceRequest
{
Expand Down
13 changes: 10 additions & 3 deletions src/modules/Elsa.Hangfire/Jobs/RunWorkflowJob.cs
Original file line number Diff line number Diff line change
@@ -1,22 +1,29 @@
using Elsa.Common.Multitenancy;
using Elsa.Scheduling;
using Elsa.Workflows.Runtime;
using Elsa.Workflows.Runtime.Messages;
using Hangfire;

namespace Elsa.Hangfire.Jobs;

/// <summary>
/// A job that resumes a workflow.
/// </summary>
public class RunWorkflowJob(IWorkflowRuntime workflowRuntime)
public class RunWorkflowJob(IWorkflowRuntime workflowRuntime, ITenantFinder tenantFinder, ITenantAccessor tenantAccessor)
{
/// <summary>
/// Executes the job.
/// </summary>
/// <param name="name">The name of the job.</param>
/// <param name="taskName">A unique name for this job.</param>
/// <param name="request">The workflow request.</param>
/// <param name="tenantId">The ID of the current tenant scheduling this job.</param>
/// <param name="cancellationToken">The cancellation token.</param>
public async Task ExecuteAsync(string name, ScheduleNewWorkflowInstanceRequest request, CancellationToken cancellationToken)
// ReSharper disable once UnusedParameter.Global
[AutomaticRetry(OnAttemptsExceeded = AttemptsExceededAction.Fail)]
public async Task ExecuteAsync(string taskName, ScheduleNewWorkflowInstanceRequest request, string? tenantId, CancellationToken cancellationToken)
{
var tenant = tenantId != null ? await tenantFinder.FindByIdAsync(tenantId, cancellationToken) : null;
using var scope = tenantAccessor.PushContext(tenant);
var client = await workflowRuntime.CreateClientAsync(cancellationToken);
var createAndRunRequest = new CreateAndRunWorkflowInstanceRequest
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using Elsa.Common.Multitenancy;
using Elsa.Hangfire.Jobs;
using Elsa.Hangfire.States;
using Elsa.Workflows.Runtime;
Expand All @@ -9,11 +10,12 @@ namespace Elsa.Hangfire.Services;
/// <summary>
/// Invokes activities from a background worker within the context of its workflow instance using Hangfire.
/// </summary>
public class HangfireBackgroundActivityScheduler(IBackgroundJobClient backgroundJobClient) : IBackgroundActivityScheduler
public class HangfireBackgroundActivityScheduler(IBackgroundJobClient backgroundJobClient, ITenantAccessor tenantAccessor) : IBackgroundActivityScheduler
{
public Task<string> CreateAsync(ScheduledBackgroundActivity scheduledBackgroundActivity, CancellationToken cancellationToken = default)
{
var jobId = backgroundJobClient.Create<ExecuteBackgroundActivityJob>(x => x.ExecuteAsync(scheduledBackgroundActivity, CancellationToken.None), new PendingState());
var tenantId = tenantAccessor.Tenant?.Id;
var jobId = backgroundJobClient.Create<ExecuteBackgroundActivityJob>(x => x.ExecuteAsync(scheduledBackgroundActivity, tenantId, CancellationToken.None), new PendingState());
return Task.FromResult(jobId);
}

Expand All @@ -28,7 +30,8 @@ public Task ScheduleAsync(string jobId, CancellationToken cancellationToken = de
/// <inheritdoc />
public Task<string> ScheduleAsync(ScheduledBackgroundActivity scheduledBackgroundActivity, CancellationToken cancellationToken = default)
{
var jobId = backgroundJobClient.Enqueue<ExecuteBackgroundActivityJob>(x => x.ExecuteAsync(scheduledBackgroundActivity, CancellationToken.None));
var tenantId = tenantAccessor.Tenant?.Id;
var jobId = backgroundJobClient.Enqueue<ExecuteBackgroundActivityJob>(x => x.ExecuteAsync(scheduledBackgroundActivity, tenantId, CancellationToken.None));
return Task.FromResult(jobId);
}

Expand Down
45 changes: 20 additions & 25 deletions src/modules/Elsa.Hangfire/Services/HangfireWorkflowScheduler.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using Elsa.Common.Multitenancy;
using Elsa.Hangfire.Extensions;
using Elsa.Hangfire.Jobs;
using Elsa.Scheduling;
Expand All @@ -9,33 +10,25 @@ namespace Elsa.Hangfire.Services;
/// <summary>
/// An implementation of <see cref="IWorkflowScheduler"/> that uses Hangfire.
/// </summary>
public class HangfireWorkflowScheduler : IWorkflowScheduler
public class HangfireWorkflowScheduler(
IBackgroundJobClient backgroundJobClient,
IRecurringJobManager recurringJobManager,
ITenantAccessor tenantAccessor,
JobStorage jobStorage) : IWorkflowScheduler
{
private readonly IBackgroundJobClient _backgroundJobClient;
private readonly IRecurringJobManager _recurringJobManager;
private readonly JobStorage _jobStorage;

/// <summary>
/// Initializes a new instance of the <see cref="HangfireWorkflowScheduler"/> class.
/// </summary>
public HangfireWorkflowScheduler(IBackgroundJobClient backgroundJobClient, IRecurringJobManager recurringJobManager, JobStorage jobStorage)
{
_backgroundJobClient = backgroundJobClient;
_recurringJobManager = recurringJobManager;
_jobStorage = jobStorage;
}

/// <inheritdoc />
public ValueTask ScheduleAtAsync(string taskName, ScheduleNewWorkflowInstanceRequest request, DateTimeOffset at, CancellationToken cancellationToken = default)
{
_backgroundJobClient.Schedule<RunWorkflowJob>(job => job.ExecuteAsync(taskName, request, CancellationToken.None), at);
var tenantId = tenantAccessor.Tenant?.Id;
backgroundJobClient.Schedule<RunWorkflowJob>(job => job.ExecuteAsync(taskName, request, tenantId, CancellationToken.None), at);
return ValueTask.CompletedTask;
}

/// <inheritdoc />
public ValueTask ScheduleAtAsync(string taskName, ScheduleExistingWorkflowInstanceRequest request, DateTimeOffset at, CancellationToken cancellationToken = default)
{
_backgroundJobClient.Schedule<ResumeWorkflowJob>(job => job.ExecuteAsync(taskName, request, CancellationToken.None), at);
var tenantId = tenantAccessor.Tenant?.Id;
backgroundJobClient.Schedule<ResumeWorkflowJob>(job => job.ExecuteAsync(taskName, request, tenantId, CancellationToken.None), at);
return ValueTask.CompletedTask;
}

Expand All @@ -54,14 +47,16 @@ public async ValueTask ScheduleRecurringAsync(string taskName, ScheduleExistingW
/// <inheritdoc />
public ValueTask ScheduleCronAsync(string taskName, ScheduleNewWorkflowInstanceRequest request, string cronExpression, CancellationToken cancellationToken = default)
{
_recurringJobManager.AddOrUpdate<RunWorkflowJob>(taskName, job => job.ExecuteAsync(taskName, request, CancellationToken.None), cronExpression);
var tenantId = tenantAccessor.Tenant?.Id;
recurringJobManager.AddOrUpdate<RunWorkflowJob>(taskName, job => job.ExecuteAsync(taskName, request, tenantId, CancellationToken.None), cronExpression);
return ValueTask.CompletedTask;
}

/// <inheritdoc />
public ValueTask ScheduleCronAsync(string taskName, ScheduleExistingWorkflowInstanceRequest request, string cronExpression, CancellationToken cancellationToken = default)
{
_recurringJobManager.AddOrUpdate<ResumeWorkflowJob>(taskName, job => job.ExecuteAsync(taskName, request, CancellationToken.None), cronExpression);
var tenantId = tenantAccessor.Tenant?.Id;
recurringJobManager.AddOrUpdate<ResumeWorkflowJob>(taskName, job => job.ExecuteAsync(taskName, request, tenantId, CancellationToken.None), cronExpression);
return ValueTask.CompletedTask;
}

Expand All @@ -75,34 +70,34 @@ public ValueTask UnscheduleAsync(string taskName, CancellationToken cancellation
private void DeleteJobByTaskName(string taskName)
{
var scheduledJobIds = GetScheduledJobIds(taskName);
foreach (var jobId in scheduledJobIds) _backgroundJobClient.Delete(jobId);
foreach (var jobId in scheduledJobIds) backgroundJobClient.Delete(jobId);

var queuedJobsIds = GetQueuedJobIds(taskName);
foreach (var jobId in queuedJobsIds) _backgroundJobClient.Delete(jobId);
foreach (var jobId in queuedJobsIds) backgroundJobClient.Delete(jobId);

var recurringJobIds = GetRecurringJobIds<RunWorkflowJob>(taskName);
foreach (var jobId in recurringJobIds) _recurringJobManager.RemoveIfExists(jobId);
foreach (var jobId in recurringJobIds) recurringJobManager.RemoveIfExists(jobId);
}

private IEnumerable<string> GetScheduledJobIds(string taskName)
{
return _jobStorage.EnumerateScheduledJobs(taskName)
return jobStorage.EnumerateScheduledJobs(taskName)
.Select(x => x.Key)
.Distinct()
.ToList();
}

private IEnumerable<string> GetQueuedJobIds(string taskName)
{
return _jobStorage.EnumerateQueuedJobs("default", taskName)
return jobStorage.EnumerateQueuedJobs("default", taskName)
.Select(x => x.Key)
.Distinct()
.ToList();
}

private IEnumerable<string> GetRecurringJobIds<TJob>(string taskName)
{
using var connection = _jobStorage.GetConnection();
using var connection = jobStorage.GetConnection();
var jobs = connection.GetRecurringJobs().Where(x => x.Job.Type == typeof(TJob) && (string)x.Job.Args[0] == taskName);
return jobs.Select(x => x.Id).Distinct().ToList();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ private WorkflowDefinitionFilter CreateFilter(Request request)
{
var versionOptions = string.IsNullOrWhiteSpace(request.VersionOptions) ? default(VersionOptions?) : VersionOptions.FromString(request.VersionOptions);

return new WorkflowDefinitionFilter
return new()
{
IsSystem = request.IsSystem,
VersionOptions = versionOptions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,27 @@ public ArgumentJsonConverter(IWellKnownTypeRegistry wellKnownTypeRegistry)
{
_wellKnownTypeRegistry = wellKnownTypeRegistry;
}

/// <inheritdoc />
public override void Write(Utf8JsonWriter writer, ArgumentDefinition value, JsonSerializerOptions options)
{
var newOptions = new JsonSerializerOptions(options);
newOptions.Converters.RemoveWhere(x => x is ArgumentJsonConverterFactory);

var jsonObject = (JsonObject)JsonSerializer.SerializeToNode(value, value.GetType(), newOptions)!;
var isArray = value.Type.IsCollectionType();
jsonObject["isArray"] = isArray;
jsonObject["type"] = _wellKnownTypeRegistry.GetAliasOrDefault(isArray ? value.Type.GetCollectionElementType() : value.Type);
var typeName = value.Type;
var typeAlias = _wellKnownTypeRegistry.TryGetAlias(typeName, out var alias) ? alias : null;
var isArray = typeName.IsArray;
var isCollection = typeName.IsCollectionType();
var elementTypeName = isArray ? typeName.GetElementType() : isCollection ? typeName.GenericTypeArguments[0] : typeName;
var elementTypeAlias = _wellKnownTypeRegistry.GetAliasOrDefault(elementTypeName);
var isAliasedArray = (isArray || isCollection) && typeAlias != null;
var finalTypeAlias = isArray || isCollection ? typeAlias ?? elementTypeAlias : elementTypeAlias;

if (isArray && !isAliasedArray) jsonObject["isArray"] = isArray;
if (isCollection) jsonObject["isCollection"] = isCollection;

jsonObject["type"] = finalTypeAlias;
JsonSerializer.Serialize(writer, jsonObject, newOptions);
}

Expand All @@ -40,11 +49,12 @@ public override ArgumentDefinition Read(ref Utf8JsonReader reader, Type typeToCo
{
var jsonObject = (JsonObject)JsonNode.Parse(ref reader)!;
var isArray = jsonObject["isArray"]?.GetValue<bool>() ?? false;
var typeName = jsonObject["type"]!.GetValue<string>();
var type = _wellKnownTypeRegistry.GetTypeOrDefault(typeName);
var isCollection = jsonObject["isCollection"]?.GetValue<bool>() ?? false;
var typeAlias = jsonObject["type"]!.GetValue<string>();
var type = _wellKnownTypeRegistry.GetTypeOrDefault(typeAlias);

if (isArray)
type = type.MakeArrayType();
if (isArray) type = type.MakeArrayType();
if (isCollection) type = type.MakeGenericType(type.GenericTypeArguments[0]);

var newOptions = new JsonSerializerOptions(options);
newOptions.Converters.RemoveWhere(x => x is ArgumentJsonConverterFactory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ public async Task ExecuteAsync(ScheduledBackgroundActivity scheduledBackgroundAc
{
var workflowInstanceId = scheduledBackgroundActivity.WorkflowInstanceId;
var workflowInstance = await workflowInstanceManager.FindByIdAsync(workflowInstanceId, cancellationToken);
if (workflowInstance == null) throw new Exception("Workflow instance not found");
if (workflowInstance == null) throw new("Workflow instance not found");
var workflowState = workflowInstance.WorkflowState;
var workflow = await workflowDefinitionService.FindWorkflowGraphAsync(workflowInstance.DefinitionVersionId, cancellationToken);
if (workflow == null) throw new Exception("Workflow definition not found");
if (workflow == null) throw new("Workflow definition not found");
var workflowExecutionContext = await WorkflowExecutionContext.CreateAsync(serviceProvider, workflow, workflowState, cancellationToken: cancellationToken);
var activityNodeId = scheduledBackgroundActivity.ActivityNodeId;
var activityExecutionContext = workflowExecutionContext.ActivityExecutionContexts.First(x => x.NodeId == activityNodeId);
Expand Down

0 comments on commit b57763a

Please sign in to comment.