Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix intermittent test problem in rivertest + more transactional use #226

Merged
merged 1 commit into from
Feb 25, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 41 additions & 125 deletions rivertest/rivertest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,12 @@ type Job1Args struct {

func (Job1Args) Kind() string { return "job1" }

type Job1Worker struct {
river.WorkerDefaults[Job1Args]
}

func (w *Job1Worker) Work(ctx context.Context, job *river.Job[Job1Args]) error { return nil }

type Job2Args struct {
Int int `json:"int"`
}

func (Job2Args) Kind() string { return "job2" }

type Job2Worker struct {
river.WorkerDefaults[Job2Args]
}

func (w *Job2Worker) Work(ctx context.Context, job *river.Job[Job2Args]) error { return nil }

// The tests for this function are quite minimal because it uses the same
// implementation as the `*Tx` variant, so most of the test happens below.
func TestRequireInserted(t *testing.T) {
Expand All @@ -55,26 +43,13 @@ func TestRequireInserted(t *testing.T) {
mockT *MockT
}

setup := func(t *testing.T) (*river.Client[pgx.Tx], *testBundle) { //nolint:dupl
setup := func(t *testing.T) (*river.Client[pgx.Tx], *testBundle) {
t.Helper()

dbPool := riverinternaltest.TestDB(ctx, t)

workers := river.NewWorkers()
river.AddWorker(workers, &Job1Worker{})
river.AddWorker(workers, &Job2Worker{})

riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
Queues: map[string]river.QueueConfig{
river.QueueDefault: {MaxWorkers: 100},
},
Workers: workers,
})
require.NoError(t, err)

err = riverClient.Start(ctx)
riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{})
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, riverClient.Stop(ctx)) })

return riverClient, &testBundle{
dbPool: dbPool,
Expand Down Expand Up @@ -102,40 +77,19 @@ func TestRequireInsertedTx(t *testing.T) {
ctx := context.Background()

type testBundle struct {
dbPool *pgxpool.Pool
mockT *MockT
tx pgx.Tx
mockT *MockT
tx pgx.Tx
}

setup := func(t *testing.T) (*river.Client[pgx.Tx], *testBundle) { //nolint:dupl
setup := func(t *testing.T) (*river.Client[pgx.Tx], *testBundle) {
t.Helper()

dbPool := riverinternaltest.TestDB(ctx, t)

workers := river.NewWorkers()
river.AddWorker(workers, &Job1Worker{})
river.AddWorker(workers, &Job2Worker{})

riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
Queues: map[string]river.QueueConfig{
river.QueueDefault: {MaxWorkers: 100},
},
Workers: workers,
})
require.NoError(t, err)

err = riverClient.Start(ctx)
riverClient, err := river.NewClient(riverpgxv5.New(nil), &river.Config{})
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, riverClient.Stop(ctx)) })

tx, err := dbPool.Begin(ctx)
require.NoError(t, err)
t.Cleanup(func() { tx.Rollback(ctx) })

return riverClient, &testBundle{
dbPool: dbPool,
mockT: NewMockT(t),
tx: tx,
mockT: NewMockT(t),
tx: riverinternaltest.TestTx(ctx, t),
}
}

Expand All @@ -144,7 +98,7 @@ func TestRequireInsertedTx(t *testing.T) {

riverClient, bundle := setup(t)

_, err := riverClient.Insert(ctx, Job1Args{String: "foo"}, nil)
_, err := riverClient.InsertTx(ctx, bundle.tx, Job1Args{String: "foo"}, nil)
require.NoError(t, err)

job := requireInsertedTx[*riverpgxv5.Driver](ctx, t, bundle.tx, &Job1Args{}, nil)
Expand All @@ -157,10 +111,10 @@ func TestRequireInsertedTx(t *testing.T) {

riverClient, bundle := setup(t)

_, err := riverClient.Insert(ctx, Job1Args{String: "foo"}, nil)
_, err := riverClient.InsertTx(ctx, bundle.tx, Job1Args{String: "foo"}, nil)
require.NoError(t, err)

_, err = riverClient.Insert(ctx, Job2Args{Int: 123}, nil)
_, err = riverClient.InsertTx(ctx, bundle.tx, Job2Args{Int: 123}, nil)
require.NoError(t, err)

job1 := requireInsertedTx[*riverpgxv5.Driver](ctx, t, bundle.tx, &Job1Args{}, nil)
Expand All @@ -178,11 +132,9 @@ func TestRequireInsertedTx(t *testing.T) {
riverClient, bundle := setup(t)

// Start a second transaction with different visibility.
tx, err := bundle.dbPool.Begin(ctx)
require.NoError(t, err)
t.Cleanup(func() { tx.Rollback(ctx) })
otherTx := riverinternaltest.TestTx(ctx, t)

_, err = riverClient.InsertTx(ctx, bundle.tx, Job1Args{String: "foo"}, nil)
_, err := riverClient.InsertTx(ctx, bundle.tx, Job1Args{String: "foo"}, nil)
require.NoError(t, err)

// Visible in the original transaction.
Expand All @@ -191,7 +143,7 @@ func TestRequireInsertedTx(t *testing.T) {
require.Equal(t, "foo", job.Args.String)

// Not visible in the second transaction.
_ = requireInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, tx, &Job1Args{}, nil)
_ = requireInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, otherTx, &Job1Args{}, nil)
require.True(t, bundle.mockT.Failed)
})

Expand All @@ -201,7 +153,7 @@ func TestRequireInsertedTx(t *testing.T) {
riverClient, bundle := setup(t)

// Verify default insertion options.
_, err := riverClient.Insert(ctx, Job1Args{String: "foo"}, nil)
_, err := riverClient.InsertTx(ctx, bundle.tx, Job1Args{String: "foo"}, nil)
require.NoError(t, err)

_ = requireInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, &Job1Args{}, &RequireInsertedOpts{
Expand All @@ -212,7 +164,7 @@ func TestRequireInsertedTx(t *testing.T) {
require.False(t, bundle.mockT.Failed)

// Verify custom insertion options.
_, err = riverClient.Insert(ctx, Job2Args{Int: 123}, &river.InsertOpts{
_, err = riverClient.InsertTx(ctx, bundle.tx, Job2Args{Int: 123}, &river.InsertOpts{
MaxAttempts: 78,
Priority: 2,
Queue: "another_queue",
Expand All @@ -236,7 +188,7 @@ func TestRequireInsertedTx(t *testing.T) {

riverClient, bundle := setup(t)

_, err := riverClient.Insert(ctx, Job1Args{String: "foo"}, nil)
_, err := riverClient.InsertTx(ctx, bundle.tx, Job1Args{String: "foo"}, nil)
require.NoError(t, err)

_ = requireInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, &Job2Args{}, nil)
Expand All @@ -251,7 +203,7 @@ func TestRequireInsertedTx(t *testing.T) {

riverClient, bundle := setup(t)

_, err := riverClient.InsertMany(ctx, []river.InsertManyParams{
_, err := riverClient.InsertManyTx(ctx, bundle.tx, []river.InsertManyParams{
{Args: Job1Args{String: "foo"}},
{Args: Job1Args{String: "bar"}},
})
Expand Down Expand Up @@ -282,7 +234,7 @@ func TestRequireInsertedTx(t *testing.T) {
riverClient, bundle := setup(t)

// Verify custom insertion options.
_, err := riverClient.Insert(ctx, Job2Args{Int: 123}, &river.InsertOpts{
_, err := riverClient.InsertTx(ctx, bundle.tx, Job2Args{Int: 123}, &river.InsertOpts{
MaxAttempts: 78,
Priority: 2,
Queue: "another_queue",
Expand Down Expand Up @@ -407,27 +359,14 @@ func TestRequireManyInserted(t *testing.T) {
mockT *MockT
}

setup := func(t *testing.T) (*river.Client[pgx.Tx], *testBundle) { //nolint:dupl
setup := func(t *testing.T) (*river.Client[pgx.Tx], *testBundle) {
t.Helper()

dbPool := riverinternaltest.TestDB(ctx, t)

workers := river.NewWorkers()
river.AddWorker(workers, &Job1Worker{})
river.AddWorker(workers, &Job2Worker{})

riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
Queues: map[string]river.QueueConfig{
river.QueueDefault: {MaxWorkers: 100},
},
Workers: workers,
})
riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{})
require.NoError(t, err)

err = riverClient.Start(ctx)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, riverClient.Stop(ctx)) })

return riverClient, &testBundle{
dbPool: dbPool,
mockT: NewMockT(t),
Expand Down Expand Up @@ -456,40 +395,19 @@ func TestRequireManyInsertedTx(t *testing.T) {
ctx := context.Background()

type testBundle struct {
dbPool *pgxpool.Pool
mockT *MockT
tx pgx.Tx
mockT *MockT
tx pgx.Tx
}

setup := func(t *testing.T) (*river.Client[pgx.Tx], *testBundle) { //nolint:dupl
setup := func(t *testing.T) (*river.Client[pgx.Tx], *testBundle) {
t.Helper()

dbPool := riverinternaltest.TestDB(ctx, t)

workers := river.NewWorkers()
river.AddWorker(workers, &Job1Worker{})
river.AddWorker(workers, &Job2Worker{})

riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
Queues: map[string]river.QueueConfig{
river.QueueDefault: {MaxWorkers: 100},
},
Workers: workers,
})
require.NoError(t, err)

err = riverClient.Start(ctx)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, riverClient.Stop(ctx)) })

tx, err := dbPool.Begin(ctx)
riverClient, err := river.NewClient(riverpgxv5.New(nil), &river.Config{})
require.NoError(t, err)
t.Cleanup(func() { tx.Rollback(ctx) })

return riverClient, &testBundle{
dbPool: dbPool,
mockT: NewMockT(t),
tx: tx,
mockT: NewMockT(t),
tx: riverinternaltest.TestTx(ctx, t),
}
}

Expand All @@ -498,7 +416,7 @@ func TestRequireManyInsertedTx(t *testing.T) {

riverClient, bundle := setup(t)

_, err := riverClient.Insert(ctx, Job1Args{String: "foo"}, nil)
_, err := riverClient.InsertTx(ctx, bundle.tx, Job1Args{String: "foo"}, nil)
require.NoError(t, err)

jobs := requireManyInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, []ExpectedJob{
Expand All @@ -514,11 +432,9 @@ func TestRequireManyInsertedTx(t *testing.T) {
riverClient, bundle := setup(t)

// Start a second transaction with different visibility.
tx, err := bundle.dbPool.Begin(ctx)
require.NoError(t, err)
t.Cleanup(func() { tx.Rollback(ctx) })
otherTx := riverinternaltest.TestTx(ctx, t)

_, err = riverClient.InsertTx(ctx, bundle.tx, Job1Args{String: "foo"}, nil)
_, err := riverClient.InsertTx(ctx, bundle.tx, Job1Args{String: "foo"}, nil)
require.NoError(t, err)

// Visible in the original transaction.
Expand All @@ -529,7 +445,7 @@ func TestRequireManyInsertedTx(t *testing.T) {
require.Equal(t, "job1", jobs[0].Kind)

// Not visible in the second transaction.
_ = requireManyInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, tx, []ExpectedJob{
_ = requireManyInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, otherTx, []ExpectedJob{
{Args: &Job1Args{}},
})
require.True(t, bundle.mockT.Failed)
Expand All @@ -540,10 +456,10 @@ func TestRequireManyInsertedTx(t *testing.T) {

riverClient, bundle := setup(t)

_, err := riverClient.Insert(ctx, Job1Args{String: "foo"}, nil)
_, err := riverClient.InsertTx(ctx, bundle.tx, Job1Args{String: "foo"}, nil)
require.NoError(t, err)

_, err = riverClient.Insert(ctx, Job2Args{Int: 123}, nil)
_, err = riverClient.InsertTx(ctx, bundle.tx, Job2Args{Int: 123}, nil)
require.NoError(t, err)

jobs := requireManyInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, []ExpectedJob{
Expand All @@ -560,7 +476,7 @@ func TestRequireManyInsertedTx(t *testing.T) {

riverClient, bundle := setup(t)

_, err := riverClient.InsertMany(ctx, []river.InsertManyParams{
_, err := riverClient.InsertManyTx(ctx, bundle.tx, []river.InsertManyParams{
{Args: Job1Args{String: "foo"}},
{Args: Job1Args{String: "bar"}},
})
Expand All @@ -580,7 +496,7 @@ func TestRequireManyInsertedTx(t *testing.T) {

riverClient, bundle := setup(t)

_, err := riverClient.InsertMany(ctx, []river.InsertManyParams{
_, err := riverClient.InsertManyTx(ctx, bundle.tx, []river.InsertManyParams{
{Args: Job1Args{String: "foo"}},
{Args: Job1Args{String: "bar"}},
{Args: Job2Args{Int: 123}},
Expand Down Expand Up @@ -610,7 +526,7 @@ func TestRequireManyInsertedTx(t *testing.T) {
riverClient, bundle := setup(t)

// Verify default insertion options.
_, err := riverClient.Insert(ctx, Job1Args{String: "foo"}, nil)
_, err := riverClient.InsertTx(ctx, bundle.tx, Job1Args{String: "foo"}, nil)
require.NoError(t, err)

_ = requireManyInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, []ExpectedJob{
Expand All @@ -626,7 +542,7 @@ func TestRequireManyInsertedTx(t *testing.T) {
require.False(t, bundle.mockT.Failed)

// Verify custom insertion options.
_, err = riverClient.Insert(ctx, Job2Args{Int: 123}, &river.InsertOpts{
_, err = riverClient.InsertTx(ctx, bundle.tx, Job2Args{Int: 123}, &river.InsertOpts{
MaxAttempts: 78,
Priority: 2,
Queue: "another_queue",
Expand Down Expand Up @@ -669,7 +585,7 @@ func TestRequireManyInsertedTx(t *testing.T) {

riverClient, bundle := setup(t)

_, err := riverClient.InsertMany(ctx, []river.InsertManyParams{
_, err := riverClient.InsertManyTx(ctx, bundle.tx, []river.InsertManyParams{
{Args: Job1Args{String: "foo"}},
{Args: Job1Args{String: "bar"}},
})
Expand All @@ -689,7 +605,7 @@ func TestRequireManyInsertedTx(t *testing.T) {

riverClient, bundle := setup(t)

_, err := riverClient.InsertMany(ctx, []river.InsertManyParams{
_, err := riverClient.InsertManyTx(ctx, bundle.tx, []river.InsertManyParams{
{Args: Job2Args{Int: 123}},
{Args: Job1Args{String: "foo"}},
})
Expand All @@ -710,7 +626,7 @@ func TestRequireManyInsertedTx(t *testing.T) {

riverClient, bundle := setup(t)

_, err := riverClient.InsertMany(ctx, []river.InsertManyParams{
_, err := riverClient.InsertManyTx(ctx, bundle.tx, []river.InsertManyParams{
{Args: Job1Args{String: "foo"}},
{Args: Job1Args{String: "bar"}},
{Args: Job2Args{Int: 123}},
Expand Down Expand Up @@ -738,7 +654,7 @@ func TestRequireManyInsertedTx(t *testing.T) {
riverClient, bundle := setup(t)

// Verify custom insertion options.
_, err := riverClient.Insert(ctx, Job2Args{Int: 123}, &river.InsertOpts{
_, err := riverClient.InsertTx(ctx, bundle.tx, Job2Args{Int: 123}, &river.InsertOpts{
MaxAttempts: 78,
Priority: 2,
Queue: "another_queue",
Expand Down
Loading