Skip to content

Commit

Permalink
Fix intermittent test problem in rivertest + more transactional use (
Browse files Browse the repository at this point in the history
…#226)

This one's aimed at fixing #225, in which I think what was happening is
that because we were inserting a test job on a non-transactional pool
with a state of `scheduled`, in some cases the queue maintainer's
scheduler service was actually changing it back to `available` before
the test case would check it with a `RequireInserted`.

As I was moving through the test suite, I realized that we could be
making a lot more use of `InsertTx` variants pretty much everywhere
since we were mostly testing the transactional variables of
`RequireInserted`, so that I changed those over.

Then I realize that even when we were testing the non-transactional
variants, we didn't actually need a started client to do so, so I took
that code out. Then I realized that since we're not starting the client,
we didn't actually need to specify `Queues` or `Workers` (I believe the
idea of an insert-only came about only after I wrote these tests
originally). And if we didn't need to specify `Workers`, then we didn't
even need the worker structs, so I got rid of those too.

Overall, I think we simplify things quite a bit, and because the client
doesn't need to be started, probably reduces the possibility of future
intermittency problems like we observed in #225.
  • Loading branch information
brandur authored Feb 25, 2024
1 parent 1cf1a49 commit 5a6bf92
Showing 1 changed file with 41 additions and 125 deletions.
166 changes: 41 additions & 125 deletions rivertest/rivertest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,12 @@ type Job1Args struct {

func (Job1Args) Kind() string { return "job1" }

type Job1Worker struct {
river.WorkerDefaults[Job1Args]
}

func (w *Job1Worker) Work(ctx context.Context, job *river.Job[Job1Args]) error { return nil }

type Job2Args struct {
Int int `json:"int"`
}

func (Job2Args) Kind() string { return "job2" }

type Job2Worker struct {
river.WorkerDefaults[Job2Args]
}

func (w *Job2Worker) Work(ctx context.Context, job *river.Job[Job2Args]) error { return nil }

// The tests for this function are quite minimal because it uses the same
// implementation as the `*Tx` variant, so most of the test happens below.
func TestRequireInserted(t *testing.T) {
Expand All @@ -55,26 +43,13 @@ func TestRequireInserted(t *testing.T) {
mockT *MockT
}

setup := func(t *testing.T) (*river.Client[pgx.Tx], *testBundle) { //nolint:dupl
setup := func(t *testing.T) (*river.Client[pgx.Tx], *testBundle) {
t.Helper()

dbPool := riverinternaltest.TestDB(ctx, t)

workers := river.NewWorkers()
river.AddWorker(workers, &Job1Worker{})
river.AddWorker(workers, &Job2Worker{})

riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
Queues: map[string]river.QueueConfig{
river.QueueDefault: {MaxWorkers: 100},
},
Workers: workers,
})
require.NoError(t, err)

err = riverClient.Start(ctx)
riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{})
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, riverClient.Stop(ctx)) })

return riverClient, &testBundle{
dbPool: dbPool,
Expand Down Expand Up @@ -102,40 +77,19 @@ func TestRequireInsertedTx(t *testing.T) {
ctx := context.Background()

type testBundle struct {
dbPool *pgxpool.Pool
mockT *MockT
tx pgx.Tx
mockT *MockT
tx pgx.Tx
}

setup := func(t *testing.T) (*river.Client[pgx.Tx], *testBundle) { //nolint:dupl
setup := func(t *testing.T) (*river.Client[pgx.Tx], *testBundle) {
t.Helper()

dbPool := riverinternaltest.TestDB(ctx, t)

workers := river.NewWorkers()
river.AddWorker(workers, &Job1Worker{})
river.AddWorker(workers, &Job2Worker{})

riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
Queues: map[string]river.QueueConfig{
river.QueueDefault: {MaxWorkers: 100},
},
Workers: workers,
})
require.NoError(t, err)

err = riverClient.Start(ctx)
riverClient, err := river.NewClient(riverpgxv5.New(nil), &river.Config{})
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, riverClient.Stop(ctx)) })

tx, err := dbPool.Begin(ctx)
require.NoError(t, err)
t.Cleanup(func() { tx.Rollback(ctx) })

return riverClient, &testBundle{
dbPool: dbPool,
mockT: NewMockT(t),
tx: tx,
mockT: NewMockT(t),
tx: riverinternaltest.TestTx(ctx, t),
}
}

Expand All @@ -144,7 +98,7 @@ func TestRequireInsertedTx(t *testing.T) {

riverClient, bundle := setup(t)

_, err := riverClient.Insert(ctx, Job1Args{String: "foo"}, nil)
_, err := riverClient.InsertTx(ctx, bundle.tx, Job1Args{String: "foo"}, nil)
require.NoError(t, err)

job := requireInsertedTx[*riverpgxv5.Driver](ctx, t, bundle.tx, &Job1Args{}, nil)
Expand All @@ -157,10 +111,10 @@ func TestRequireInsertedTx(t *testing.T) {

riverClient, bundle := setup(t)

_, err := riverClient.Insert(ctx, Job1Args{String: "foo"}, nil)
_, err := riverClient.InsertTx(ctx, bundle.tx, Job1Args{String: "foo"}, nil)
require.NoError(t, err)

_, err = riverClient.Insert(ctx, Job2Args{Int: 123}, nil)
_, err = riverClient.InsertTx(ctx, bundle.tx, Job2Args{Int: 123}, nil)
require.NoError(t, err)

job1 := requireInsertedTx[*riverpgxv5.Driver](ctx, t, bundle.tx, &Job1Args{}, nil)
Expand All @@ -178,11 +132,9 @@ func TestRequireInsertedTx(t *testing.T) {
riverClient, bundle := setup(t)

// Start a second transaction with different visibility.
tx, err := bundle.dbPool.Begin(ctx)
require.NoError(t, err)
t.Cleanup(func() { tx.Rollback(ctx) })
otherTx := riverinternaltest.TestTx(ctx, t)

_, err = riverClient.InsertTx(ctx, bundle.tx, Job1Args{String: "foo"}, nil)
_, err := riverClient.InsertTx(ctx, bundle.tx, Job1Args{String: "foo"}, nil)
require.NoError(t, err)

// Visible in the original transaction.
Expand All @@ -191,7 +143,7 @@ func TestRequireInsertedTx(t *testing.T) {
require.Equal(t, "foo", job.Args.String)

// Not visible in the second transaction.
_ = requireInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, tx, &Job1Args{}, nil)
_ = requireInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, otherTx, &Job1Args{}, nil)
require.True(t, bundle.mockT.Failed)
})

Expand All @@ -201,7 +153,7 @@ func TestRequireInsertedTx(t *testing.T) {
riverClient, bundle := setup(t)

// Verify default insertion options.
_, err := riverClient.Insert(ctx, Job1Args{String: "foo"}, nil)
_, err := riverClient.InsertTx(ctx, bundle.tx, Job1Args{String: "foo"}, nil)
require.NoError(t, err)

_ = requireInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, &Job1Args{}, &RequireInsertedOpts{
Expand All @@ -212,7 +164,7 @@ func TestRequireInsertedTx(t *testing.T) {
require.False(t, bundle.mockT.Failed)

// Verify custom insertion options.
_, err = riverClient.Insert(ctx, Job2Args{Int: 123}, &river.InsertOpts{
_, err = riverClient.InsertTx(ctx, bundle.tx, Job2Args{Int: 123}, &river.InsertOpts{
MaxAttempts: 78,
Priority: 2,
Queue: "another_queue",
Expand All @@ -236,7 +188,7 @@ func TestRequireInsertedTx(t *testing.T) {

riverClient, bundle := setup(t)

_, err := riverClient.Insert(ctx, Job1Args{String: "foo"}, nil)
_, err := riverClient.InsertTx(ctx, bundle.tx, Job1Args{String: "foo"}, nil)
require.NoError(t, err)

_ = requireInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, &Job2Args{}, nil)
Expand All @@ -251,7 +203,7 @@ func TestRequireInsertedTx(t *testing.T) {

riverClient, bundle := setup(t)

_, err := riverClient.InsertMany(ctx, []river.InsertManyParams{
_, err := riverClient.InsertManyTx(ctx, bundle.tx, []river.InsertManyParams{
{Args: Job1Args{String: "foo"}},
{Args: Job1Args{String: "bar"}},
})
Expand Down Expand Up @@ -282,7 +234,7 @@ func TestRequireInsertedTx(t *testing.T) {
riverClient, bundle := setup(t)

// Verify custom insertion options.
_, err := riverClient.Insert(ctx, Job2Args{Int: 123}, &river.InsertOpts{
_, err := riverClient.InsertTx(ctx, bundle.tx, Job2Args{Int: 123}, &river.InsertOpts{
MaxAttempts: 78,
Priority: 2,
Queue: "another_queue",
Expand Down Expand Up @@ -407,27 +359,14 @@ func TestRequireManyInserted(t *testing.T) {
mockT *MockT
}

setup := func(t *testing.T) (*river.Client[pgx.Tx], *testBundle) { //nolint:dupl
setup := func(t *testing.T) (*river.Client[pgx.Tx], *testBundle) {
t.Helper()

dbPool := riverinternaltest.TestDB(ctx, t)

workers := river.NewWorkers()
river.AddWorker(workers, &Job1Worker{})
river.AddWorker(workers, &Job2Worker{})

riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
Queues: map[string]river.QueueConfig{
river.QueueDefault: {MaxWorkers: 100},
},
Workers: workers,
})
riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{})
require.NoError(t, err)

err = riverClient.Start(ctx)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, riverClient.Stop(ctx)) })

return riverClient, &testBundle{
dbPool: dbPool,
mockT: NewMockT(t),
Expand Down Expand Up @@ -456,40 +395,19 @@ func TestRequireManyInsertedTx(t *testing.T) {
ctx := context.Background()

type testBundle struct {
dbPool *pgxpool.Pool
mockT *MockT
tx pgx.Tx
mockT *MockT
tx pgx.Tx
}

setup := func(t *testing.T) (*river.Client[pgx.Tx], *testBundle) { //nolint:dupl
setup := func(t *testing.T) (*river.Client[pgx.Tx], *testBundle) {
t.Helper()

dbPool := riverinternaltest.TestDB(ctx, t)

workers := river.NewWorkers()
river.AddWorker(workers, &Job1Worker{})
river.AddWorker(workers, &Job2Worker{})

riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
Queues: map[string]river.QueueConfig{
river.QueueDefault: {MaxWorkers: 100},
},
Workers: workers,
})
require.NoError(t, err)

err = riverClient.Start(ctx)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, riverClient.Stop(ctx)) })

tx, err := dbPool.Begin(ctx)
riverClient, err := river.NewClient(riverpgxv5.New(nil), &river.Config{})
require.NoError(t, err)
t.Cleanup(func() { tx.Rollback(ctx) })

return riverClient, &testBundle{
dbPool: dbPool,
mockT: NewMockT(t),
tx: tx,
mockT: NewMockT(t),
tx: riverinternaltest.TestTx(ctx, t),
}
}

Expand All @@ -498,7 +416,7 @@ func TestRequireManyInsertedTx(t *testing.T) {

riverClient, bundle := setup(t)

_, err := riverClient.Insert(ctx, Job1Args{String: "foo"}, nil)
_, err := riverClient.InsertTx(ctx, bundle.tx, Job1Args{String: "foo"}, nil)
require.NoError(t, err)

jobs := requireManyInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, []ExpectedJob{
Expand All @@ -514,11 +432,9 @@ func TestRequireManyInsertedTx(t *testing.T) {
riverClient, bundle := setup(t)

// Start a second transaction with different visibility.
tx, err := bundle.dbPool.Begin(ctx)
require.NoError(t, err)
t.Cleanup(func() { tx.Rollback(ctx) })
otherTx := riverinternaltest.TestTx(ctx, t)

_, err = riverClient.InsertTx(ctx, bundle.tx, Job1Args{String: "foo"}, nil)
_, err := riverClient.InsertTx(ctx, bundle.tx, Job1Args{String: "foo"}, nil)
require.NoError(t, err)

// Visible in the original transaction.
Expand All @@ -529,7 +445,7 @@ func TestRequireManyInsertedTx(t *testing.T) {
require.Equal(t, "job1", jobs[0].Kind)

// Not visible in the second transaction.
_ = requireManyInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, tx, []ExpectedJob{
_ = requireManyInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, otherTx, []ExpectedJob{
{Args: &Job1Args{}},
})
require.True(t, bundle.mockT.Failed)
Expand All @@ -540,10 +456,10 @@ func TestRequireManyInsertedTx(t *testing.T) {

riverClient, bundle := setup(t)

_, err := riverClient.Insert(ctx, Job1Args{String: "foo"}, nil)
_, err := riverClient.InsertTx(ctx, bundle.tx, Job1Args{String: "foo"}, nil)
require.NoError(t, err)

_, err = riverClient.Insert(ctx, Job2Args{Int: 123}, nil)
_, err = riverClient.InsertTx(ctx, bundle.tx, Job2Args{Int: 123}, nil)
require.NoError(t, err)

jobs := requireManyInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, []ExpectedJob{
Expand All @@ -560,7 +476,7 @@ func TestRequireManyInsertedTx(t *testing.T) {

riverClient, bundle := setup(t)

_, err := riverClient.InsertMany(ctx, []river.InsertManyParams{
_, err := riverClient.InsertManyTx(ctx, bundle.tx, []river.InsertManyParams{
{Args: Job1Args{String: "foo"}},
{Args: Job1Args{String: "bar"}},
})
Expand All @@ -580,7 +496,7 @@ func TestRequireManyInsertedTx(t *testing.T) {

riverClient, bundle := setup(t)

_, err := riverClient.InsertMany(ctx, []river.InsertManyParams{
_, err := riverClient.InsertManyTx(ctx, bundle.tx, []river.InsertManyParams{
{Args: Job1Args{String: "foo"}},
{Args: Job1Args{String: "bar"}},
{Args: Job2Args{Int: 123}},
Expand Down Expand Up @@ -610,7 +526,7 @@ func TestRequireManyInsertedTx(t *testing.T) {
riverClient, bundle := setup(t)

// Verify default insertion options.
_, err := riverClient.Insert(ctx, Job1Args{String: "foo"}, nil)
_, err := riverClient.InsertTx(ctx, bundle.tx, Job1Args{String: "foo"}, nil)
require.NoError(t, err)

_ = requireManyInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, []ExpectedJob{
Expand All @@ -626,7 +542,7 @@ func TestRequireManyInsertedTx(t *testing.T) {
require.False(t, bundle.mockT.Failed)

// Verify custom insertion options.
_, err = riverClient.Insert(ctx, Job2Args{Int: 123}, &river.InsertOpts{
_, err = riverClient.InsertTx(ctx, bundle.tx, Job2Args{Int: 123}, &river.InsertOpts{
MaxAttempts: 78,
Priority: 2,
Queue: "another_queue",
Expand Down Expand Up @@ -669,7 +585,7 @@ func TestRequireManyInsertedTx(t *testing.T) {

riverClient, bundle := setup(t)

_, err := riverClient.InsertMany(ctx, []river.InsertManyParams{
_, err := riverClient.InsertManyTx(ctx, bundle.tx, []river.InsertManyParams{
{Args: Job1Args{String: "foo"}},
{Args: Job1Args{String: "bar"}},
})
Expand All @@ -689,7 +605,7 @@ func TestRequireManyInsertedTx(t *testing.T) {

riverClient, bundle := setup(t)

_, err := riverClient.InsertMany(ctx, []river.InsertManyParams{
_, err := riverClient.InsertManyTx(ctx, bundle.tx, []river.InsertManyParams{
{Args: Job2Args{Int: 123}},
{Args: Job1Args{String: "foo"}},
})
Expand All @@ -710,7 +626,7 @@ func TestRequireManyInsertedTx(t *testing.T) {

riverClient, bundle := setup(t)

_, err := riverClient.InsertMany(ctx, []river.InsertManyParams{
_, err := riverClient.InsertManyTx(ctx, bundle.tx, []river.InsertManyParams{
{Args: Job1Args{String: "foo"}},
{Args: Job1Args{String: "bar"}},
{Args: Job2Args{Int: 123}},
Expand Down Expand Up @@ -738,7 +654,7 @@ func TestRequireManyInsertedTx(t *testing.T) {
riverClient, bundle := setup(t)

// Verify custom insertion options.
_, err := riverClient.Insert(ctx, Job2Args{Int: 123}, &river.InsertOpts{
_, err := riverClient.InsertTx(ctx, bundle.tx, Job2Args{Int: 123}, &river.InsertOpts{
MaxAttempts: 78,
Priority: 2,
Queue: "another_queue",
Expand Down

0 comments on commit 5a6bf92

Please sign in to comment.