diff --git a/internal/dbadapter/db_adapter.go b/internal/dbadapter/db_adapter.go index bea47538..d9e799aa 100644 --- a/internal/dbadapter/db_adapter.go +++ b/internal/dbadapter/db_adapter.go @@ -524,6 +524,10 @@ func JobSetStateSnoozed(id int64, scheduledAt time.Time, maxAttempts int) *JobSe return &JobSetStateIfRunningParams{ID: id, maxAttempts: &maxAttempts, scheduledAt: &scheduledAt, state: dbsqlc.JobStateScheduled} } +func JobSetStateSnoozedAvailable(id int64, scheduledAt time.Time, maxAttempts int) *JobSetStateIfRunningParams { + return &JobSetStateIfRunningParams{ID: id, maxAttempts: &maxAttempts, scheduledAt: &scheduledAt, state: dbsqlc.JobStateAvailable} +} + func (a *StandardAdapter) JobSetStateIfRunning(ctx context.Context, params *JobSetStateIfRunningParams) (*dbsqlc.RiverJob, error) { ctx, cancel := context.WithTimeout(ctx, a.deadlineTimeout) defer cancel() diff --git a/job_executor.go b/job_executor.go index 3f2bbd23..a7e3ffae 100644 --- a/job_executor.go +++ b/job_executor.go @@ -244,7 +244,19 @@ func (e *jobExecutor) reportResult(ctx context.Context, res *jobExecutorResult) slog.Duration("duration", snoozeErr.duration), ) nextAttemptScheduledAt := time.Now().Add(snoozeErr.duration) - if err := e.Completer.JobSetStateIfRunning(e.stats, dbadapter.JobSetStateSnoozed(e.JobRow.ID, nextAttemptScheduledAt, e.JobRow.MaxAttempts+1)); err != nil { + + // Normally, snoozed jobs are set `scheduled` for the future and it's the + // scheduler's job to set them back to `available` so they can be reworked. + // Just as with retryable jobs, this isn't friendly for short snooze times + // so we instead make the job immediately `available` if the snooze time is + // smaller than the scheduler's run interval. + var params *dbadapter.JobSetStateIfRunningParams + if nextAttemptScheduledAt.Sub(e.TimeNowUTC()) <= e.SchedulerInterval { + params = dbadapter.JobSetStateSnoozedAvailable(e.JobRow.ID, nextAttemptScheduledAt, e.JobRow.MaxAttempts+1) + } else { + params = dbadapter.JobSetStateSnoozed(e.JobRow.ID, nextAttemptScheduledAt, e.JobRow.MaxAttempts+1) + } + if err := e.Completer.JobSetStateIfRunning(e.stats, params); err != nil { e.Logger.ErrorContext(ctx, e.Name+": Error snoozing job", slog.Int64("job_id", e.JobRow.ID), ) diff --git a/job_executor_test.go b/job_executor_test.go index b01ca605..6147969f 100644 --- a/job_executor_test.go +++ b/job_executor_test.go @@ -372,6 +372,26 @@ func TestJobExecutor_Execute(t *testing.T) { require.Empty(t, job.Errors) }) + t.Run("JobSnoozeErrorInNearFutureMakesJobAvailableAndIncrementsMaxAttempts", func(t *testing.T) { + t.Parallel() + + executor, bundle := setup(t) + maxAttemptsBefore := bundle.jobRow.MaxAttempts + + cancelErr := JobSnooze(time.Millisecond) + executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { return cancelErr }, nil).MakeUnit(bundle.jobRow) + + executor.Execute(ctx) + executor.Completer.Wait() + + job, err := queries.JobGetByID(ctx, bundle.tx, bundle.jobRow.ID) + require.NoError(t, err) + require.Equal(t, dbsqlc.JobStateAvailable, job.State) + require.WithinDuration(t, time.Now(), job.ScheduledAt, 2*time.Second) + require.Equal(t, maxAttemptsBefore+1, int(job.MaxAttempts)) + require.Empty(t, job.Errors) + }) + t.Run("ErrorWithCustomRetryPolicy", func(t *testing.T) { t.Parallel()