Skip to content

Commit

Permalink
Introduce JobKeyProvider for managing Quartz job keys
Browse files Browse the repository at this point in the history
Added JobKeyProvider to centralize job key and group name handling, simplifying Quartz job scheduling. Refactored QuartzWorkflowScheduler to use the new provider, improving maintainability. Updated QuartzSchedulerFeature to register the new provider and a startup task for job registration.
  • Loading branch information
sfmskywalker committed Dec 11, 2024
1 parent 9725be0 commit 96bf791
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 39 deletions.
9 changes: 9 additions & 0 deletions src/modules/Elsa.Quartz/Contracts/IJobKeyProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
using Quartz;

namespace Elsa.Quartz.Contracts;

internal interface IJobKeyProvider
{
JobKey GetJobKey<TJob>() where TJob : IJob;
string GetGroupName();
}
15 changes: 10 additions & 5 deletions src/modules/Elsa.Quartz/Features/QuartzSchedulerFeature.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
using Elsa.Extensions;
using Elsa.Features.Abstractions;
using Elsa.Features.Attributes;
using Elsa.Features.Services;
using Elsa.Quartz.Contracts;
using Elsa.Quartz.Handlers;
using Elsa.Quartz.Jobs;
using Elsa.Quartz.Services;
using Elsa.Quartz.Tasks;
using Elsa.Scheduling;
using Elsa.Scheduling.Features;
using Elsa.Workflows;
Expand Down Expand Up @@ -34,9 +36,12 @@ public override void Configure()
/// <inheritdoc />
public override void Apply()
{
Services.AddSingleton<IActivityDescriptorModifier, CronActivityDescriptorModifier>();
Services.AddSingleton<QuartzCronParser>();
Services.AddSingleton<QuartzWorkflowScheduler>();
Services.AddQuartz();
Services
.AddSingleton<IActivityDescriptorModifier, CronActivityDescriptorModifier>()
.AddSingleton<QuartzCronParser>()
.AddScoped<QuartzWorkflowScheduler>()
.AddScoped<IJobKeyProvider, JobKeyProvider>()
.AddStartupTask<RegisterJobsTask>()
.AddQuartz();
}
}
19 changes: 19 additions & 0 deletions src/modules/Elsa.Quartz/Services/JobKeyProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using Elsa.Common.Multitenancy;
using Elsa.Quartz.Contracts;
using Quartz;

namespace Elsa.Quartz.Services;

internal class JobKeyProvider(ITenantAccessor tenantAccessor) : IJobKeyProvider
{
public JobKey GetJobKey<TJob>() where TJob : IJob
{
return new(typeof(TJob).Name, GetGroupName());
}

public string GetGroupName()
{
var tenantId = tenantAccessor.Tenant?.Id;
return string.IsNullOrWhiteSpace(tenantId) ? "Default" : tenantId;
}
}
60 changes: 26 additions & 34 deletions src/modules/Elsa.Quartz/Services/QuartzWorkflowScheduler.cs
Original file line number Diff line number Diff line change
@@ -1,108 +1,103 @@
using Elsa.Common;
using Elsa.Common.Multitenancy;
using Elsa.Extensions;
using Elsa.Quartz.Contracts;
using Elsa.Quartz.Jobs;
using Elsa.Scheduling;
using Quartz;
using IScheduler = Quartz.IScheduler;

namespace Elsa.Quartz.Services;

/// <summary>
/// An implementation of <see cref="IWorkflowScheduler"/> that uses Quartz.NET.
/// </summary>
public class QuartzWorkflowScheduler(ISchedulerFactory schedulerFactoryFactory, IJsonSerializer jsonSerializer, ITenantAccessor tenantAccessor) : IWorkflowScheduler
internal class QuartzWorkflowScheduler(ISchedulerFactory schedulerFactoryFactory, IJsonSerializer jsonSerializer, ITenantAccessor tenantAccessor, IJobKeyProvider jobKeyProvider) : IWorkflowScheduler
{
/// <inheritdoc />
public async ValueTask ScheduleAtAsync(string taskName, ScheduleNewWorkflowInstanceRequest request, DateTimeOffset at, CancellationToken cancellationToken = default)
{
var scheduler = await schedulerFactoryFactory.GetScheduler(cancellationToken);
var job = JobBuilder.Create<RunWorkflowJob>()
.WithIdentity(GetRunWorkflowJobKey())
.Build();

var trigger = TriggerBuilder.Create()
.ForJob(GetRunWorkflowJobKey())
.UsingJobData(CreateJobDataMap(request))
.WithIdentity(GetTriggerKey(taskName))
.StartAt(at)
.Build();

if (!await scheduler.CheckExists(job.Key, cancellationToken))
await scheduler.ScheduleJob(job, trigger, cancellationToken);
await ScheduleJobAsync(scheduler, trigger, cancellationToken);
}

/// <inheritdoc />
public async ValueTask ScheduleAtAsync(string taskName, ScheduleExistingWorkflowInstanceRequest request, DateTimeOffset at, CancellationToken cancellationToken = default)
{
var scheduler = await schedulerFactoryFactory.GetScheduler(cancellationToken);
var job = JobBuilder.Create<ResumeWorkflowJob>().WithIdentity(GetResumeWorkflowJobKey()).Build();
var trigger = TriggerBuilder.Create()
.ForJob(GetResumeWorkflowJobKey())
.UsingJobData(CreateJobDataMap(request))
.WithIdentity(GetTriggerKey(taskName))
.StartAt(at)
.Build();

if (!await scheduler.CheckExists(job.Key, cancellationToken))
await scheduler.ScheduleJob(job, trigger, cancellationToken);
await ScheduleJobAsync(scheduler, trigger, cancellationToken);
}

/// <inheritdoc />
public async ValueTask ScheduleRecurringAsync(string taskName, ScheduleNewWorkflowInstanceRequest request, DateTimeOffset startAt, TimeSpan interval, CancellationToken cancellationToken = default)
{
var scheduler = await schedulerFactoryFactory.GetScheduler(cancellationToken);
var job = JobBuilder.Create<RunWorkflowJob>().WithIdentity(GetRunWorkflowJobKey()).Build();
var trigger = TriggerBuilder.Create()
.WithIdentity(GetTriggerKey(taskName))
.ForJob(GetRunWorkflowJobKey())
.UsingJobData(CreateJobDataMap(request))
.StartAt(startAt)
.WithSimpleSchedule(schedule => schedule.WithInterval(interval).RepeatForever())
.Build();

if (!await scheduler.CheckExists(job.Key, cancellationToken))
await scheduler.ScheduleJob(job, trigger, cancellationToken);
await ScheduleJobAsync(scheduler, trigger, cancellationToken);
}

/// <inheritdoc />
public async ValueTask ScheduleRecurringAsync(string taskName, ScheduleExistingWorkflowInstanceRequest request, DateTimeOffset startAt, TimeSpan interval, CancellationToken cancellationToken = default)
{
var scheduler = await schedulerFactoryFactory.GetScheduler(cancellationToken);
var job = JobBuilder.Create<ResumeWorkflowJob>().WithIdentity(GetResumeWorkflowJobKey()).Build();
var trigger = TriggerBuilder.Create()
.WithIdentity(GetTriggerKey(taskName))
.ForJob(GetResumeWorkflowJobKey())
.UsingJobData(CreateJobDataMap(request))
.StartAt(startAt)
.WithSimpleSchedule(schedule => schedule.WithInterval(interval).RepeatForever())
.Build();

if (!await scheduler.CheckExists(job.Key, cancellationToken))
await scheduler.ScheduleJob(job, trigger, cancellationToken);
await ScheduleJobAsync(scheduler, trigger, cancellationToken);
}

/// <inheritdoc />
public async ValueTask ScheduleCronAsync(string taskName, ScheduleNewWorkflowInstanceRequest request, string cronExpression, CancellationToken cancellationToken = default)
{
var scheduler = await schedulerFactoryFactory.GetScheduler(cancellationToken);
var job = JobBuilder.Create<RunWorkflowJob>().WithIdentity(GetRunWorkflowJobKey()).Build();
var trigger = TriggerBuilder.Create()
.UsingJobData(CreateJobDataMap(request))
.ForJob(GetRunWorkflowJobKey())
.WithIdentity(GetTriggerKey(taskName))
.WithCronSchedule(cronExpression)
.Build();

if (!await scheduler.CheckExists(job.Key, cancellationToken))
await scheduler.ScheduleJob(job, trigger, cancellationToken);
await ScheduleJobAsync(scheduler, trigger, cancellationToken);
}

/// <inheritdoc />
public async ValueTask ScheduleCronAsync(string taskName, ScheduleExistingWorkflowInstanceRequest request, string cronExpression, CancellationToken cancellationToken = default)
{
var scheduler = await schedulerFactoryFactory.GetScheduler(cancellationToken);
var job = JobBuilder.Create<ResumeWorkflowJob>().WithIdentity(GetResumeWorkflowJobKey()).Build();
var trigger = TriggerBuilder.Create()
.ForJob(GetResumeWorkflowJobKey())
.UsingJobData(CreateJobDataMap(request))
.WithIdentity(GetTriggerKey(taskName))
.WithCronSchedule(cronExpression).Build();

if (!await scheduler.CheckExists(job.Key, cancellationToken))
await scheduler.ScheduleJob(job, trigger, cancellationToken);
await ScheduleJobAsync(scheduler, trigger, cancellationToken);
}

/// <inheritdoc />
Expand All @@ -112,6 +107,12 @@ public async ValueTask UnscheduleAsync(string taskName, CancellationToken cancel
var triggerKey = GetTriggerKey(taskName);
await scheduler.UnscheduleJob(triggerKey, cancellationToken);
}

private async Task ScheduleJobAsync(IScheduler scheduler, ITrigger trigger, CancellationToken cancellationToken)
{
if (!await scheduler.CheckExists(trigger.Key, cancellationToken))
await scheduler.ScheduleJob(trigger, cancellationToken);
}

private JobDataMap CreateJobDataMap(ScheduleNewWorkflowInstanceRequest request)
{
Expand All @@ -138,18 +139,9 @@ private JobDataMap CreateJobDataMap(ScheduleExistingWorkflowInstanceRequest requ
.AddIfNotEmpty(nameof(ScheduleExistingWorkflowInstanceRequest.ActivityHandle), serializedActivityHandle)
.AddIfNotEmpty(nameof(ScheduleExistingWorkflowInstanceRequest.BookmarkId), request.BookmarkId);
}

private JobKey GetRunWorkflowJobKey() => new(nameof(RunWorkflowJob), GetGroupName());
private JobKey GetResumeWorkflowJobKey() => new(nameof(ResumeWorkflowJob), GetGroupName());

private string GetGroupName()
{
var tenantId = tenantAccessor.Tenant?.Id;
return string.IsNullOrWhiteSpace(tenantId) ? "Default" : tenantId;
}

private TriggerKey GetTriggerKey(string taskName)
{
return new TriggerKey(taskName, GetGroupName());
}
private JobKey GetRunWorkflowJobKey() => jobKeyProvider.GetJobKey<RunWorkflowJob>();
private JobKey GetResumeWorkflowJobKey() => jobKeyProvider.GetJobKey<ResumeWorkflowJob>();
private string GetGroupName() => jobKeyProvider.GetGroupName();
private TriggerKey GetTriggerKey(string taskName) => new(taskName, GetGroupName());
}
35 changes: 35 additions & 0 deletions src/modules/Elsa.Quartz/Tasks/RegisterJobsTask.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
using Elsa.Common;
using Elsa.Quartz.Contracts;
using Elsa.Quartz.Jobs;
using JetBrains.Annotations;
using Quartz;

namespace Elsa.Quartz.Tasks;

/// <summary>
/// Registers the Quartz jobs.
/// </summary>
/// <param name="schedulerFactoryFactory"></param>
/// <param name="jobKeyProvider"></param>
[UsedImplicitly]
internal class RegisterJobsTask(ISchedulerFactory schedulerFactoryFactory, IJobKeyProvider jobKeyProvider) : IStartupTask
{
public async Task ExecuteAsync(CancellationToken cancellationToken)
{
var scheduler = await schedulerFactoryFactory.GetScheduler(cancellationToken);
await CreateJobAsync<RunWorkflowJob>(scheduler, cancellationToken);
await CreateJobAsync<ResumeWorkflowJob>(scheduler, cancellationToken);
}

private async Task CreateJobAsync<TJobType>(IScheduler scheduler, CancellationToken cancellationToken) where TJobType : IJob
{
var key = jobKeyProvider.GetJobKey<TJobType>();
var job = JobBuilder.Create<TJobType>()
.WithIdentity(key)
.StoreDurably()
.Build();

if (!await scheduler.CheckExists(job.Key, cancellationToken))
await scheduler.AddJob(job, false, cancellationToken);
}
}

0 comments on commit 96bf791

Please sign in to comment.