Skip to content

Commit

Permalink
rivertest.Worker: Use job executor, always insert jobs, require tx (#766
Browse files Browse the repository at this point in the history
)

* allow setting created_at on insert for clock overrides

While the client code was attempting to set `created_at` when the clock
was stubbed, the driver code was not actually utilizing this at all.
This commit makes it possible to set the `created_at` (similar to
`scheduled_at`) using a mocked clock. It also removes the ability to
set `finalized_at`, which has no need to be set at insert (other than
for tests utilizing `JobInsertFull`).

* Let test workers call `JobCompleteTx` + test work transaction variants

Follows up #753 to make it possible to call `JobCompleteTx` inside test
worker implementations. The test worker tries to update an equivalent
job in the database's state to `running` so `JobCompleteTx` will be able
to find it when called (it only considers jobs in `running` state).

We also augment the API so that it's workable for users who want to use
test transactions in their tests (which is hopefully the vast majority
of users) by adding transaction variants `WorkTx` and `WorkJobTx`.

* always take tx in rivertest.Worker methods

* always insert jobs for testworker

* use real executor for rivertest worker, alter APIs

Leverage the actual job executor for the `rivertest.Worker` type so that
it closely matches real job executions. Reduce the exposed APIs so it
only has `Work` and `WorkJob`, both of which require a transaction (but
the latter works on existing jobs).

Rather than returning only an error, also return a `WorkResult` struct
that contains additional metadata. Populate it with the final updated
job and the event kind to indicate whether the job completed, errored,
snoozed, cancelled, etc.

---------

Co-authored-by: Brandur <brandur@brandur.org>
  • Loading branch information
bgentry and brandur authored Feb 18, 2025
1 parent e4353d3 commit ded1d06
Show file tree
Hide file tree
Showing 13 changed files with 564 additions and 259 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

⚠️ Version 0.18.0 has breaking changes for the `rivertest.Worker` type that was just introduced. While attempting to round out some edge cases with its design, we realized some of them simply couldn't be solved adequately without changing the overall design such that all tested jobs are inserted into the database. Given the short duration since it was released (over a weekend) it's unlikely many users have adopted it and it seemed best to rip off the bandaid to fix it before it gets widely used.

### Changed

- **Breaking change:** The `rivertest.Worker` type now requires all jobs to be inserted into the database. The original design allowed workers to be tested without hitting the database at all. Ultimately this design made it hard to correctly simulate features like `JobCompleteTx` and the other potential solutions seemed undesirable.

As part of this change, the `Work` and `WorkJob` methods now take a transaction argument. The expectation is that a transaction will be opened by the caller and rolled back after test completion. Additionally, the return signature was changed to return a `WorkResult` struct alongside the error. The struct includes the post-execution job row as well as the event kind that occurred, making it easy to inspect the job's state after execution.

Finally, the implementation was refactored so that it uses the _real_ `river.Client` insert path, and also uses the same job execution path as real execution. This minimizes the potential for differences in behavior between testing and real execution.
[PR #766](https://github.com/riverqueue/river/pull/766).

## [0.17.0] - 2025-02-16

### Added
Expand Down
18 changes: 15 additions & 3 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,15 @@ const (

// TestConfig contains configuration specific to test environments.
type TestConfig struct {
// DisableUniqueEnforcement disables the application of unique job
// constraints. This is useful for testing scenarios when testing a worker
// that typically uses uniqueness, but where enforcing uniqueness would cause
// conflicts with parallel test execution.
//
// The [rivertest.Worker] type automatically disables uniqueness enforcement
// when creating jobs.
DisableUniqueEnforcement bool

// Time is a time generator to make time stubbable in tests.
Time rivertype.TimeGenerator
}
Expand Down Expand Up @@ -1253,9 +1262,12 @@ func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, conf
return nil, errors.New("priority must be between 1 and 4")
}

uniqueOpts := insertOpts.UniqueOpts
if uniqueOpts.isEmpty() {
uniqueOpts = jobInsertOpts.UniqueOpts
var uniqueOpts UniqueOpts
if !config.Test.DisableUniqueEnforcement {
uniqueOpts = insertOpts.UniqueOpts
if uniqueOpts.isEmpty() {
uniqueOpts = jobInsertOpts.UniqueOpts
}
}
if err := uniqueOpts.validate(); err != nil {
return nil, err
Expand Down
39 changes: 36 additions & 3 deletions internal/riverinternaltest/riverdrivertest/riverdrivertest.go
Original file line number Diff line number Diff line change
Expand Up @@ -877,6 +877,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
insertParams := make([]*riverdriver.JobInsertFastParams, 10)
for i := 0; i < len(insertParams); i++ {
insertParams[i] = &riverdriver.JobInsertFastParams{
CreatedAt: ptrutil.Ptr(now.Add(time.Duration(i) * 5 * time.Second)),
EncodedArgs: []byte(`{"encoded": "args"}`),
Kind: "test_kind",
MaxAttempts: rivercommon.MaxAttemptsDefault,
Expand All @@ -901,7 +902,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
require.Equal(t, 0, job.Attempt)
require.Nil(t, job.AttemptedAt)
require.Empty(t, job.AttemptedBy)
require.WithinDuration(t, now, job.CreatedAt, 2*time.Second)
require.WithinDuration(t, now.Add(time.Duration(i)*5*time.Second), job.CreatedAt, time.Millisecond)
require.JSONEq(t, `{"encoded": "args"}`, string(job.EncodedArgs))
require.Empty(t, job.Errors)
require.Nil(t, job.FinalizedAt)
Expand Down Expand Up @@ -974,6 +975,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
insertParams := make([]*riverdriver.JobInsertFastParams, 10)
for i := 0; i < len(insertParams); i++ {
insertParams[i] = &riverdriver.JobInsertFastParams{
CreatedAt: ptrutil.Ptr(now.Add(time.Duration(i) * 5 * time.Second)),
EncodedArgs: []byte(`{"encoded": "args"}`),
Kind: "test_kind",
MaxAttempts: rivercommon.MaxAttemptsDefault,
Expand All @@ -994,10 +996,10 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
jobsAfter, err := exec.JobGetByKindMany(ctx, []string{"test_kind"})
require.NoError(t, err)
require.Len(t, jobsAfter, len(insertParams))
for _, job := range jobsAfter {
for i, job := range jobsAfter {
require.Equal(t, 0, job.Attempt)
require.Nil(t, job.AttemptedAt)
require.WithinDuration(t, time.Now().UTC(), job.CreatedAt, 2*time.Second)
require.WithinDuration(t, now.Add(time.Duration(i)*5*time.Second), job.CreatedAt, time.Millisecond)
require.JSONEq(t, `{"encoded": "args"}`, string(job.EncodedArgs))
require.Empty(t, job.Errors)
require.Nil(t, job.FinalizedAt)
Expand All @@ -1012,6 +1014,37 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
}
})

t.Run("MissingCreatedAtDefaultsToNow", func(t *testing.T) {
exec, _ := setup(ctx, t)

insertParams := make([]*riverdriver.JobInsertFastParams, 10)
for i := 0; i < len(insertParams); i++ {
insertParams[i] = &riverdriver.JobInsertFastParams{
CreatedAt: nil, // explicit nil
EncodedArgs: []byte(`{"encoded": "args"}`),
Kind: "test_kind",
MaxAttempts: rivercommon.MaxAttemptsDefault,
Metadata: []byte(`{"meta": "data"}`),
Priority: rivercommon.PriorityDefault,
Queue: rivercommon.QueueDefault,
ScheduledAt: ptrutil.Ptr(time.Now().UTC()),
State: rivertype.JobStateAvailable,
Tags: []string{"tag"},
}
}

count, err := exec.JobInsertFastManyNoReturning(ctx, insertParams)
require.NoError(t, err)
require.Len(t, insertParams, count)

jobsAfter, err := exec.JobGetByKindMany(ctx, []string{"test_kind"})
require.NoError(t, err)
require.Len(t, jobsAfter, len(insertParams))
for _, job := range jobsAfter {
require.WithinDuration(t, time.Now().UTC(), job.CreatedAt, 2*time.Second)
}
})

t.Run("MissingScheduledAtDefaultsToNow", func(t *testing.T) {
exec, _ := setup(ctx, t)

Expand Down
48 changes: 28 additions & 20 deletions riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions riverdriver/riverdatabasesql/river_database_sql_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ func (e *Executor) JobGetStuck(ctx context.Context, params *riverdriver.JobGetSt
func (e *Executor) JobInsertFastMany(ctx context.Context, params []*riverdriver.JobInsertFastParams) ([]*riverdriver.JobInsertFastResult, error) {
insertJobsParams := &dbsqlc.JobInsertFastManyParams{
Args: make([]string, len(params)),
CreatedAt: make([]time.Time, len(params)),
Kind: make([]string, len(params)),
MaxAttempts: make([]int16, len(params)),
Metadata: make([]string, len(params)),
Expand All @@ -220,6 +221,11 @@ func (e *Executor) JobInsertFastMany(ctx context.Context, params []*riverdriver.
for i := 0; i < len(params); i++ {
params := params[i]

createdAt := now
if params.CreatedAt != nil {
createdAt = *params.CreatedAt
}

scheduledAt := now
if params.ScheduledAt != nil {
scheduledAt = *params.ScheduledAt
Expand All @@ -233,6 +239,7 @@ func (e *Executor) JobInsertFastMany(ctx context.Context, params []*riverdriver.
defaultObject := "{}"

insertJobsParams.Args[i] = valutil.ValOrDefault(string(params.EncodedArgs), defaultObject)
insertJobsParams.CreatedAt[i] = createdAt
insertJobsParams.Kind[i] = params.Kind
insertJobsParams.MaxAttempts[i] = int16(min(params.MaxAttempts, math.MaxInt16)) //nolint:gosec
insertJobsParams.Metadata[i] = valutil.ValOrDefault(string(params.Metadata), defaultObject)
Expand Down Expand Up @@ -262,6 +269,7 @@ func (e *Executor) JobInsertFastMany(ctx context.Context, params []*riverdriver.
func (e *Executor) JobInsertFastManyNoReturning(ctx context.Context, params []*riverdriver.JobInsertFastParams) (int, error) {
insertJobsParams := &dbsqlc.JobInsertFastManyNoReturningParams{
Args: make([]string, len(params)),
CreatedAt: make([]time.Time, len(params)),
Kind: make([]string, len(params)),
MaxAttempts: make([]int16, len(params)),
Metadata: make([]string, len(params)),
Expand All @@ -278,6 +286,11 @@ func (e *Executor) JobInsertFastManyNoReturning(ctx context.Context, params []*r
for i := 0; i < len(params); i++ {
params := params[i]

createdAt := now
if params.CreatedAt != nil {
createdAt = *params.CreatedAt
}

scheduledAt := now
if params.ScheduledAt != nil {
scheduledAt = *params.ScheduledAt
Expand All @@ -291,6 +304,7 @@ func (e *Executor) JobInsertFastManyNoReturning(ctx context.Context, params []*r
defaultObject := "{}"

insertJobsParams.Args[i] = valutil.ValOrDefault(string(params.EncodedArgs), defaultObject)
insertJobsParams.CreatedAt[i] = createdAt
insertJobsParams.Kind[i] = params.Kind
insertJobsParams.MaxAttempts[i] = int16(min(params.MaxAttempts, math.MaxInt16)) //nolint:gosec
insertJobsParams.Metadata[i] = valutil.ValOrDefault(string(params.Metadata), defaultObject)
Expand Down
4 changes: 2 additions & 2 deletions riverdriver/riverpgxv5/internal/dbsqlc/copyfrom.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ LIMIT @max;
-- name: JobInsertFastMany :many
INSERT INTO river_job(
args,
created_at,
kind,
max_attempts,
metadata,
Expand All @@ -209,6 +210,7 @@ INSERT INTO river_job(
unique_states
) SELECT
unnest(@args::jsonb[]),
unnest(@created_at::timestamptz[]),
unnest(@kind::text[]),
unnest(@max_attempts::smallint[]),
unnest(@metadata::jsonb[]),
Expand Down Expand Up @@ -237,6 +239,7 @@ RETURNING sqlc.embed(river_job), (xmax != 0) AS unique_skipped_as_duplicate;
-- name: JobInsertFastManyNoReturning :execrows
INSERT INTO river_job(
args,
created_at,
kind,
max_attempts,
metadata,
Expand All @@ -249,6 +252,7 @@ INSERT INTO river_job(
unique_states
) SELECT
unnest(@args::jsonb[]),
unnest(@created_at::timestamptz[]),
unnest(@kind::text[]),
unnest(@max_attempts::smallint[]),
unnest(@metadata::jsonb[]),
Expand Down
Loading

0 comments on commit ded1d06

Please sign in to comment.