Skip to content

Commit

Permalink
Merge pull request #6200 from elsa-workflows/bug/quartz-scheduled-once
Browse files Browse the repository at this point in the history
Fix Quartz scheduler implementation
  • Loading branch information
sfmskywalker authored Dec 11, 2024
2 parents 1534d5f + 96bf791 commit c50a53f
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 40 deletions.
2 changes: 1 addition & 1 deletion src/modules/Elsa.Identity/Features/IdentityFeature.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
namespace Elsa.Identity.Features;

/// <summary>
/// Provides identity feature to authenticate & authorize API requests.
/// Provides identity feature to authenticate &amp; authorize API requests.
/// </summary>
[DependsOn(typeof(SystemClockFeature))]
[PublicAPI]
Expand Down
9 changes: 9 additions & 0 deletions src/modules/Elsa.Quartz/Contracts/IJobKeyProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
using Quartz;

namespace Elsa.Quartz.Contracts;

internal interface IJobKeyProvider
{
JobKey GetJobKey<TJob>() where TJob : IJob;
string GetGroupName();
}
15 changes: 10 additions & 5 deletions src/modules/Elsa.Quartz/Features/QuartzSchedulerFeature.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
using Elsa.Extensions;
using Elsa.Features.Abstractions;
using Elsa.Features.Attributes;
using Elsa.Features.Services;
using Elsa.Quartz.Contracts;
using Elsa.Quartz.Handlers;
using Elsa.Quartz.Jobs;
using Elsa.Quartz.Services;
using Elsa.Quartz.Tasks;
using Elsa.Scheduling;
using Elsa.Scheduling.Features;
using Elsa.Workflows;
Expand Down Expand Up @@ -34,9 +36,12 @@ public override void Configure()
/// <inheritdoc />
public override void Apply()
{
Services.AddSingleton<IActivityDescriptorModifier, CronActivityDescriptorModifier>();
Services.AddSingleton<QuartzCronParser>();
Services.AddSingleton<QuartzWorkflowScheduler>();
Services.AddQuartz();
Services
.AddSingleton<IActivityDescriptorModifier, CronActivityDescriptorModifier>()
.AddSingleton<QuartzCronParser>()
.AddScoped<QuartzWorkflowScheduler>()
.AddScoped<IJobKeyProvider, JobKeyProvider>()
.AddStartupTask<RegisterJobsTask>()
.AddQuartz();
}
}
19 changes: 19 additions & 0 deletions src/modules/Elsa.Quartz/Services/JobKeyProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using Elsa.Common.Multitenancy;
using Elsa.Quartz.Contracts;
using Quartz;

namespace Elsa.Quartz.Services;

internal class JobKeyProvider(ITenantAccessor tenantAccessor) : IJobKeyProvider
{
public JobKey GetJobKey<TJob>() where TJob : IJob
{
return new(typeof(TJob).Name, GetGroupName());
}

public string GetGroupName()
{
var tenantId = tenantAccessor.Tenant?.Id;
return string.IsNullOrWhiteSpace(tenantId) ? "Default" : tenantId;
}
}
60 changes: 26 additions & 34 deletions src/modules/Elsa.Quartz/Services/QuartzWorkflowScheduler.cs
Original file line number Diff line number Diff line change
@@ -1,108 +1,103 @@
using Elsa.Common;
using Elsa.Common.Multitenancy;
using Elsa.Extensions;
using Elsa.Quartz.Contracts;
using Elsa.Quartz.Jobs;
using Elsa.Scheduling;
using Quartz;
using IScheduler = Quartz.IScheduler;

namespace Elsa.Quartz.Services;

/// <summary>
/// An implementation of <see cref="IWorkflowScheduler"/> that uses Quartz.NET.
/// </summary>
public class QuartzWorkflowScheduler(ISchedulerFactory schedulerFactoryFactory, IJsonSerializer jsonSerializer, ITenantAccessor tenantAccessor) : IWorkflowScheduler
internal class QuartzWorkflowScheduler(ISchedulerFactory schedulerFactoryFactory, IJsonSerializer jsonSerializer, ITenantAccessor tenantAccessor, IJobKeyProvider jobKeyProvider) : IWorkflowScheduler
{
/// <inheritdoc />
public async ValueTask ScheduleAtAsync(string taskName, ScheduleNewWorkflowInstanceRequest request, DateTimeOffset at, CancellationToken cancellationToken = default)
{
var scheduler = await schedulerFactoryFactory.GetScheduler(cancellationToken);
var job = JobBuilder.Create<RunWorkflowJob>()
.WithIdentity(GetRunWorkflowJobKey())
.Build();

var trigger = TriggerBuilder.Create()
.ForJob(GetRunWorkflowJobKey())
.UsingJobData(CreateJobDataMap(request))
.WithIdentity(GetTriggerKey(taskName))
.StartAt(at)
.Build();

if (!await scheduler.CheckExists(job.Key, cancellationToken))
await scheduler.ScheduleJob(job, trigger, cancellationToken);
await ScheduleJobAsync(scheduler, trigger, cancellationToken);
}

/// <inheritdoc />
public async ValueTask ScheduleAtAsync(string taskName, ScheduleExistingWorkflowInstanceRequest request, DateTimeOffset at, CancellationToken cancellationToken = default)
{
var scheduler = await schedulerFactoryFactory.GetScheduler(cancellationToken);
var job = JobBuilder.Create<ResumeWorkflowJob>().WithIdentity(GetResumeWorkflowJobKey()).Build();
var trigger = TriggerBuilder.Create()
.ForJob(GetResumeWorkflowJobKey())
.UsingJobData(CreateJobDataMap(request))
.WithIdentity(GetTriggerKey(taskName))
.StartAt(at)
.Build();

if (!await scheduler.CheckExists(job.Key, cancellationToken))
await scheduler.ScheduleJob(job, trigger, cancellationToken);
await ScheduleJobAsync(scheduler, trigger, cancellationToken);
}

/// <inheritdoc />
public async ValueTask ScheduleRecurringAsync(string taskName, ScheduleNewWorkflowInstanceRequest request, DateTimeOffset startAt, TimeSpan interval, CancellationToken cancellationToken = default)
{
var scheduler = await schedulerFactoryFactory.GetScheduler(cancellationToken);
var job = JobBuilder.Create<RunWorkflowJob>().WithIdentity(GetRunWorkflowJobKey()).Build();
var trigger = TriggerBuilder.Create()
.WithIdentity(GetTriggerKey(taskName))
.ForJob(GetRunWorkflowJobKey())
.UsingJobData(CreateJobDataMap(request))
.StartAt(startAt)
.WithSimpleSchedule(schedule => schedule.WithInterval(interval).RepeatForever())
.Build();

if (!await scheduler.CheckExists(job.Key, cancellationToken))
await scheduler.ScheduleJob(job, trigger, cancellationToken);
await ScheduleJobAsync(scheduler, trigger, cancellationToken);
}

/// <inheritdoc />
public async ValueTask ScheduleRecurringAsync(string taskName, ScheduleExistingWorkflowInstanceRequest request, DateTimeOffset startAt, TimeSpan interval, CancellationToken cancellationToken = default)
{
var scheduler = await schedulerFactoryFactory.GetScheduler(cancellationToken);
var job = JobBuilder.Create<ResumeWorkflowJob>().WithIdentity(GetResumeWorkflowJobKey()).Build();
var trigger = TriggerBuilder.Create()
.WithIdentity(GetTriggerKey(taskName))
.ForJob(GetResumeWorkflowJobKey())
.UsingJobData(CreateJobDataMap(request))
.StartAt(startAt)
.WithSimpleSchedule(schedule => schedule.WithInterval(interval).RepeatForever())
.Build();

if (!await scheduler.CheckExists(job.Key, cancellationToken))
await scheduler.ScheduleJob(job, trigger, cancellationToken);
await ScheduleJobAsync(scheduler, trigger, cancellationToken);
}

/// <inheritdoc />
public async ValueTask ScheduleCronAsync(string taskName, ScheduleNewWorkflowInstanceRequest request, string cronExpression, CancellationToken cancellationToken = default)
{
var scheduler = await schedulerFactoryFactory.GetScheduler(cancellationToken);
var job = JobBuilder.Create<RunWorkflowJob>().WithIdentity(GetRunWorkflowJobKey()).Build();
var trigger = TriggerBuilder.Create()
.UsingJobData(CreateJobDataMap(request))
.ForJob(GetRunWorkflowJobKey())
.WithIdentity(GetTriggerKey(taskName))
.WithCronSchedule(cronExpression)
.Build();

if (!await scheduler.CheckExists(job.Key, cancellationToken))
await scheduler.ScheduleJob(job, trigger, cancellationToken);
await ScheduleJobAsync(scheduler, trigger, cancellationToken);
}

/// <inheritdoc />
public async ValueTask ScheduleCronAsync(string taskName, ScheduleExistingWorkflowInstanceRequest request, string cronExpression, CancellationToken cancellationToken = default)
{
var scheduler = await schedulerFactoryFactory.GetScheduler(cancellationToken);
var job = JobBuilder.Create<ResumeWorkflowJob>().WithIdentity(GetResumeWorkflowJobKey()).Build();
var trigger = TriggerBuilder.Create()
.ForJob(GetResumeWorkflowJobKey())
.UsingJobData(CreateJobDataMap(request))
.WithIdentity(GetTriggerKey(taskName))
.WithCronSchedule(cronExpression).Build();

if (!await scheduler.CheckExists(job.Key, cancellationToken))
await scheduler.ScheduleJob(job, trigger, cancellationToken);
await ScheduleJobAsync(scheduler, trigger, cancellationToken);
}

/// <inheritdoc />
Expand All @@ -112,6 +107,12 @@ public async ValueTask UnscheduleAsync(string taskName, CancellationToken cancel
var triggerKey = GetTriggerKey(taskName);
await scheduler.UnscheduleJob(triggerKey, cancellationToken);
}

private async Task ScheduleJobAsync(IScheduler scheduler, ITrigger trigger, CancellationToken cancellationToken)
{
if (!await scheduler.CheckExists(trigger.Key, cancellationToken))
await scheduler.ScheduleJob(trigger, cancellationToken);
}

private JobDataMap CreateJobDataMap(ScheduleNewWorkflowInstanceRequest request)
{
Expand All @@ -138,18 +139,9 @@ private JobDataMap CreateJobDataMap(ScheduleExistingWorkflowInstanceRequest requ
.AddIfNotEmpty(nameof(ScheduleExistingWorkflowInstanceRequest.ActivityHandle), serializedActivityHandle)
.AddIfNotEmpty(nameof(ScheduleExistingWorkflowInstanceRequest.BookmarkId), request.BookmarkId);
}

private JobKey GetRunWorkflowJobKey() => new(nameof(RunWorkflowJob), GetGroupName());
private JobKey GetResumeWorkflowJobKey() => new(nameof(ResumeWorkflowJob), GetGroupName());

private string GetGroupName()
{
var tenantId = tenantAccessor.Tenant?.Id;
return string.IsNullOrWhiteSpace(tenantId) ? "Default" : tenantId;
}

private TriggerKey GetTriggerKey(string taskName)
{
return new TriggerKey(taskName, GetGroupName());
}
private JobKey GetRunWorkflowJobKey() => jobKeyProvider.GetJobKey<RunWorkflowJob>();
private JobKey GetResumeWorkflowJobKey() => jobKeyProvider.GetJobKey<ResumeWorkflowJob>();
private string GetGroupName() => jobKeyProvider.GetGroupName();
private TriggerKey GetTriggerKey(string taskName) => new(taskName, GetGroupName());
}
35 changes: 35 additions & 0 deletions src/modules/Elsa.Quartz/Tasks/RegisterJobsTask.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
using Elsa.Common;
using Elsa.Quartz.Contracts;
using Elsa.Quartz.Jobs;
using JetBrains.Annotations;
using Quartz;

namespace Elsa.Quartz.Tasks;

/// <summary>
/// Registers the Quartz jobs.
/// </summary>
/// <param name="schedulerFactoryFactory"></param>
/// <param name="jobKeyProvider"></param>
[UsedImplicitly]
internal class RegisterJobsTask(ISchedulerFactory schedulerFactoryFactory, IJobKeyProvider jobKeyProvider) : IStartupTask
{
public async Task ExecuteAsync(CancellationToken cancellationToken)
{
var scheduler = await schedulerFactoryFactory.GetScheduler(cancellationToken);
await CreateJobAsync<RunWorkflowJob>(scheduler, cancellationToken);
await CreateJobAsync<ResumeWorkflowJob>(scheduler, cancellationToken);
}

private async Task CreateJobAsync<TJobType>(IScheduler scheduler, CancellationToken cancellationToken) where TJobType : IJob
{
var key = jobKeyProvider.GetJobKey<TJobType>();
var job = JobBuilder.Create<TJobType>()
.WithIdentity(key)
.StoreDurably()
.Build();

if (!await scheduler.CheckExists(job.Key, cancellationToken))
await scheduler.AddJob(job, false, cancellationToken);
}
}

0 comments on commit c50a53f

Please sign in to comment.