Skip to content

Commit

Permalink
tweak JobRetry to reset ScheduledAt in most cases (#211)
Browse files Browse the repository at this point in the history
A job which has already completed will have a ScheduledAt that could be
long in the past. Now that we're re-scheduling it, we should update that
to the current time to slot it in alongside other recently-scheduled
jobs and not skip the line. Also, its wait duration can't be calculated
accurately if we don't reset the ScheduledAt (it could show days of wait
time even if it was picked up immediately after being retried).
  • Loading branch information
bgentry authored Feb 18, 2024
1 parent e6b9358 commit 985e684
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 5 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Changed

- Tweaked behavior of `JobRetry` so that it does actually update the `ScheduledAt` time of the job in all cases where the job is actually being rescheduled. As before, jobs which are already available with a past `ScheduledAt` will not be touched by this query so that they retain their place in line. [PR #211](https://github.com/riverqueue/river/pull/211).

## [0.0.20] - 2024-02-14

### Added
Expand Down
34 changes: 31 additions & 3 deletions internal/dbadapter/db_adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1064,16 +1064,44 @@ func Test_StandardAdapter_JobRetryImmediately(t *testing.T) {
})
}

t.Run("DoesNotAlterScheduledAtIfInThePast", func(t *testing.T) {
// We don't want to update ScheduledAt if the job was already scheduled
t.Run("AltersScheduledAtForAlreadyCompletedJob", func(t *testing.T) {
// A job which has already completed will have a ScheduledAt that could be
// long in the past. Now that we're re-scheduling it, we should update that
// to the current time to slot it in alongside other recently-scheduled jobs
// and not skip the line; also, its wait duration can't be calculated
// accurately if we don't reset the scheduled_at.
t.Parallel()

adapter, bundle := setupTx(t)

params := makeFakeJobInsertParams(0, nil)
params.ScheduledAt = bundle.baselineTime.Add(-1 * time.Hour)
res, err := adapter.JobInsert(ctx, params)
require.NoError(t, err)
_, err = adapter.queries.JobUpdate(ctx, bundle.ex, dbsqlc.JobUpdateParams{
FinalizedAtDoUpdate: true,
FinalizedAt: &bundle.baselineTime,
ID: res.Job.ID,
StateDoUpdate: true,
State: dbsqlc.JobStateCompleted,
})
require.NoError(t, err)

jAfter, err := adapter.JobRetryImmediately(ctx, res.Job.ID)
require.NoError(t, err)
require.Equal(t, dbsqlc.JobStateAvailable, jAfter.State)
require.WithinDuration(t, time.Now().UTC(), jAfter.ScheduledAt, 5*time.Second)
})

t.Run("DoesNotAlterScheduledAtIfInThePastAndJobAlreadyAvailable", func(t *testing.T) {
// We don't want to update ScheduledAt if the job was already available
// because doing so can make it lose its place in line.
t.Parallel()

adapter, bundle := setupTx(t)

params := makeFakeJobInsertParams(0, nil)
params.ScheduledAt = bundle.baselineTime.Add(-1 * time.Hour)
params.State = dbsqlc.JobStateScheduled
res, err := adapter.JobInsert(ctx, params)
require.NoError(t, err)

Expand Down
2 changes: 1 addition & 1 deletion internal/dbsqlc/river_job.sql
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ updated_job AS (
UPDATE river_job
SET
state = 'available'::river_job_state,
scheduled_at = CASE WHEN scheduled_at < now() THEN scheduled_at ELSE now() END,
scheduled_at = now(),
max_attempts = CASE WHEN attempt = max_attempts THEN max_attempts + 1 ELSE max_attempts END,
finalized_at = NULL
FROM job_to_update
Expand Down
2 changes: 1 addition & 1 deletion internal/dbsqlc/river_job.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 985e684

Please sign in to comment.