diff --git a/CHANGELOG.md b/CHANGELOG.md index e449a36b..ce810ca8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changed + +- Tweaked behavior of `JobRetry` so that it does actually update the `ScheduledAt` time of the job in all cases where the job is actually being rescheduled. As before, jobs which are already available with a past `ScheduledAt` will not be touched by this query so that they retain their place in line. [PR #211](https://github.com/riverqueue/river/pull/211). + ## [0.0.20] - 2024-02-14 ### Added diff --git a/internal/dbadapter/db_adapter_test.go b/internal/dbadapter/db_adapter_test.go index 975ce4be..fc649b3b 100644 --- a/internal/dbadapter/db_adapter_test.go +++ b/internal/dbadapter/db_adapter_test.go @@ -1064,8 +1064,37 @@ func Test_StandardAdapter_JobRetryImmediately(t *testing.T) { }) } - t.Run("DoesNotAlterScheduledAtIfInThePast", func(t *testing.T) { - // We don't want to update ScheduledAt if the job was already scheduled + t.Run("AltersScheduledAtForAlreadyCompletedJob", func(t *testing.T) { + // A job which has already completed will have a ScheduledAt that could be + // long in the past. Now that we're re-scheduling it, we should update that + // to the current time to slot it in alongside other recently-scheduled jobs + // and not skip the line; also, its wait duration can't be calculated + // accurately if we don't reset the scheduled_at. + t.Parallel() + + adapter, bundle := setupTx(t) + + params := makeFakeJobInsertParams(0, nil) + params.ScheduledAt = bundle.baselineTime.Add(-1 * time.Hour) + res, err := adapter.JobInsert(ctx, params) + require.NoError(t, err) + _, err = adapter.queries.JobUpdate(ctx, bundle.ex, dbsqlc.JobUpdateParams{ + FinalizedAtDoUpdate: true, + FinalizedAt: &bundle.baselineTime, + ID: res.Job.ID, + StateDoUpdate: true, + State: dbsqlc.JobStateCompleted, + }) + require.NoError(t, err) + + jAfter, err := adapter.JobRetryImmediately(ctx, res.Job.ID) + require.NoError(t, err) + require.Equal(t, dbsqlc.JobStateAvailable, jAfter.State) + require.WithinDuration(t, time.Now().UTC(), jAfter.ScheduledAt, 5*time.Second) + }) + + t.Run("DoesNotAlterScheduledAtIfInThePastAndJobAlreadyAvailable", func(t *testing.T) { + // We don't want to update ScheduledAt if the job was already available // because doing so can make it lose its place in line. t.Parallel() @@ -1073,7 +1102,6 @@ func Test_StandardAdapter_JobRetryImmediately(t *testing.T) { params := makeFakeJobInsertParams(0, nil) params.ScheduledAt = bundle.baselineTime.Add(-1 * time.Hour) - params.State = dbsqlc.JobStateScheduled res, err := adapter.JobInsert(ctx, params) require.NoError(t, err) diff --git a/internal/dbsqlc/river_job.sql b/internal/dbsqlc/river_job.sql index 2deb63df..53351904 100644 --- a/internal/dbsqlc/river_job.sql +++ b/internal/dbsqlc/river_job.sql @@ -260,7 +260,7 @@ updated_job AS ( UPDATE river_job SET state = 'available'::river_job_state, - scheduled_at = CASE WHEN scheduled_at < now() THEN scheduled_at ELSE now() END, + scheduled_at = now(), max_attempts = CASE WHEN attempt = max_attempts THEN max_attempts + 1 ELSE max_attempts END, finalized_at = NULL FROM job_to_update diff --git a/internal/dbsqlc/river_job.sql.go b/internal/dbsqlc/river_job.sql.go index 8d596437..91886bfa 100644 --- a/internal/dbsqlc/river_job.sql.go +++ b/internal/dbsqlc/river_job.sql.go @@ -656,7 +656,7 @@ updated_job AS ( UPDATE river_job SET state = 'available'::river_job_state, - scheduled_at = CASE WHEN scheduled_at < now() THEN scheduled_at ELSE now() END, + scheduled_at = now(), max_attempts = CASE WHEN attempt = max_attempts THEN max_attempts + 1 ELSE max_attempts END, finalized_at = NULL FROM job_to_update