Skip to content

Commit

Permalink
smarter short snooze times
Browse files Browse the repository at this point in the history
Normally, snoozed jobs are set `scheduled` for the future and it's the
scheduler's job to set them back to `available` so they can be reworked.
Just as with retryable jobs, this isn't friendly for short snooze times
so we instead make the job immediately `available` if the snooze time is
smaller than the scheduler's run interval.

Errored/retryable jobs already behave the same way, for the same
reason.
  • Loading branch information
bgentry committed Jan 19, 2024
1 parent 0476d14 commit baa1eaf
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 1 deletion.
4 changes: 4 additions & 0 deletions internal/dbadapter/db_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,10 @@ func JobSetStateSnoozed(id int64, scheduledAt time.Time, maxAttempts int) *JobSe
return &JobSetStateIfRunningParams{ID: id, maxAttempts: &maxAttempts, scheduledAt: &scheduledAt, state: dbsqlc.JobStateScheduled}
}

// JobSetStateSnoozedAvailable builds the parameters for snoozing a job by
// putting it directly into the `available` state instead of `scheduled`,
// while still recording the supplied scheduledAt time and maxAttempts value.
func JobSetStateSnoozedAvailable(id int64, scheduledAt time.Time, maxAttempts int) *JobSetStateIfRunningParams {
	params := JobSetStateIfRunningParams{
		ID:          id,
		maxAttempts: &maxAttempts,
		scheduledAt: &scheduledAt,
		state:       dbsqlc.JobStateAvailable,
	}
	return &params
}

func (a *StandardAdapter) JobSetStateIfRunning(ctx context.Context, params *JobSetStateIfRunningParams) (*dbsqlc.RiverJob, error) {
ctx, cancel := context.WithTimeout(ctx, a.deadlineTimeout)
defer cancel()
Expand Down
14 changes: 13 additions & 1 deletion job_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,19 @@ func (e *jobExecutor) reportResult(ctx context.Context, res *jobExecutorResult)
slog.Duration("duration", snoozeErr.duration),
)
nextAttemptScheduledAt := time.Now().Add(snoozeErr.duration)
if err := e.Completer.JobSetStateIfRunning(e.stats, dbadapter.JobSetStateSnoozed(e.JobRow.ID, nextAttemptScheduledAt, e.JobRow.MaxAttempts+1)); err != nil {

// Normally, snoozed jobs are set `scheduled` for the future and it's the
// scheduler's job to set them back to `available` so they can be reworked.
// Just as with retryable jobs, this isn't friendly for short snooze times
// so we instead make the job immediately `available` if the snooze time is
// smaller than the scheduler's run interval.
var params *dbadapter.JobSetStateIfRunningParams
if nextAttemptScheduledAt.Sub(e.TimeNowUTC()) <= e.SchedulerInterval {
params = dbadapter.JobSetStateSnoozedAvailable(e.JobRow.ID, nextAttemptScheduledAt, e.JobRow.MaxAttempts+1)
} else {
params = dbadapter.JobSetStateSnoozed(e.JobRow.ID, nextAttemptScheduledAt, e.JobRow.MaxAttempts+1)
}
if err := e.Completer.JobSetStateIfRunning(e.stats, params); err != nil {
e.Logger.ErrorContext(ctx, e.Name+": Error snoozing job",
slog.Int64("job_id", e.JobRow.ID),
)
Expand Down
20 changes: 20 additions & 0 deletions job_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,26 @@ func TestJobExecutor_Execute(t *testing.T) {
require.Empty(t, job.Errors)
})

t.Run("JobSnoozeErrorInNearFutureMakesJobAvailableAndIncrementsMaxAttempts", func(t *testing.T) {
t.Parallel()

executor, bundle := setup(t)
maxAttemptsBefore := bundle.jobRow.MaxAttempts

cancelErr := JobSnooze(time.Millisecond)
executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { return cancelErr }, nil).MakeUnit(bundle.jobRow)

executor.Execute(ctx)
executor.Completer.Wait()

job, err := queries.JobGetByID(ctx, bundle.tx, bundle.jobRow.ID)
require.NoError(t, err)
require.Equal(t, dbsqlc.JobStateAvailable, job.State)
require.WithinDuration(t, time.Now(), job.ScheduledAt, 2*time.Second)
require.Equal(t, maxAttemptsBefore+1, int(job.MaxAttempts))
require.Empty(t, job.Errors)
})

t.Run("ErrorWithCustomRetryPolicy", func(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit baa1eaf

Please sign in to comment.