Skip to content

Commit

Permalink
Move JobCompleteTx to its own .go file (#232)
Browse files Browse the repository at this point in the history
  • Loading branch information
brandur authored Feb 25, 2024
1 parent 4a63e9a commit 1cf1a49
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 49 deletions.
59 changes: 10 additions & 49 deletions job.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
package river

import (
"context"
"encoding/json"
"errors"
"time"

"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/rivertype"
)

const (
JobStateAvailable = rivertype.JobStateAvailable
JobStateCancelled = rivertype.JobStateCancelled
JobStateCompleted = rivertype.JobStateCompleted
JobStateDiscarded = rivertype.JobStateDiscarded
JobStateRetryable = rivertype.JobStateRetryable
JobStateRunning = rivertype.JobStateRunning
JobStateScheduled = rivertype.JobStateScheduled
)

// Job represents a single unit of work, holding both the arguments and
// information for a job with args of type T.
type Job[T JobArgs] struct {
Expand All @@ -34,46 +38,3 @@ type JobArgsWithInsertOpts interface {
// system defaults. These can also be overridden at insertion time.
InsertOpts() InsertOpts
}

// JobCompleteTx marks the job as completed as part of transaction tx. If tx is
// rolled back, the completion will be as well.
//
// The function needs to know the type of the River database driver, which is
// the same as the one in use by Client, but the other generic parameters can be
// inferred. An invocation should generally look like:
//
// _, err := river.JobCompleteTx[*riverpgxv5.Driver](ctx, tx, job)
// if err != nil {
// // handle error
// }
//
// Returns the updated, completed job.
func JobCompleteTx[TDriver riverdriver.Driver[TTx], TTx any, TArgs JobArgs](ctx context.Context, tx TTx, job *Job[TArgs]) (*Job[TArgs], error) {
if job.State != JobStateRunning {
return nil, errors.New("job must be running")
}

var driver TDriver
jobRow, err := driver.UnwrapExecutor(tx).JobSetStateIfRunning(ctx, riverdriver.JobSetStateCompleted(job.ID, time.Now()))
if err != nil {
return nil, err
}

updatedJob := &Job[TArgs]{JobRow: jobRow}

if err := json.Unmarshal(updatedJob.EncodedArgs, &updatedJob.Args); err != nil {
return nil, err
}

return updatedJob, nil
}

const (
JobStateAvailable = rivertype.JobStateAvailable
JobStateCancelled = rivertype.JobStateCancelled
JobStateCompleted = rivertype.JobStateCompleted
JobStateDiscarded = rivertype.JobStateDiscarded
JobStateRetryable = rivertype.JobStateRetryable
JobStateRunning = rivertype.JobStateRunning
JobStateScheduled = rivertype.JobStateScheduled
)
43 changes: 43 additions & 0 deletions job_complete_tx.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package river

import (
"context"
"encoding/json"
"errors"
"time"

"github.com/riverqueue/river/riverdriver"
)

// JobCompleteTx marks the job as completed as part of transaction tx. If tx is
// rolled back, the completion will be as well.
//
// The function needs to know the type of the River database driver, which is
// the same as the one in use by Client, but the other generic parameters can be
// inferred. An invocation should generally look like:
//
// _, err := river.JobCompleteTx[*riverpgxv5.Driver](ctx, tx, job)
// if err != nil {
// // handle error
// }
//
// Returns the updated, completed job.
func JobCompleteTx[TDriver riverdriver.Driver[TTx], TTx any, TArgs JobArgs](ctx context.Context, tx TTx, job *Job[TArgs]) (*Job[TArgs], error) {
if job.State != JobStateRunning {
return nil, errors.New("job must be running")
}

var driver TDriver
jobRow, err := driver.UnwrapExecutor(tx).JobSetStateIfRunning(ctx, riverdriver.JobSetStateCompleted(job.ID, time.Now()))
if err != nil {
return nil, err
}

updatedJob := &Job[TArgs]{JobRow: jobRow}

if err := json.Unmarshal(updatedJob.EncodedArgs, &updatedJob.Args); err != nil {
return nil, err
}

return updatedJob, nil
}
72 changes: 72 additions & 0 deletions job_complete_tx_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package river

import (
"context"
"testing"
"time"

"github.com/jackc/pgx/v5"
"github.com/stretchr/testify/require"

"github.com/riverqueue/river/internal/riverinternaltest"
"github.com/riverqueue/river/internal/riverinternaltest/testfactory"
"github.com/riverqueue/river/internal/util/ptrutil"
"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
)

func TestJobCompleteTx(t *testing.T) {
t.Parallel()

ctx := context.Background()

type JobArgs struct {
JobArgsReflectKind[JobArgs]
}

type testBundle struct {
exec riverdriver.Executor
tx pgx.Tx
}

setup := func(t *testing.T) *testBundle {
t.Helper()

tx := riverinternaltest.TestTx(ctx, t)

return &testBundle{
exec: riverpgxv5.New(nil).UnwrapExecutor(tx),
tx: tx,
}
}

t.Run("CompletesJob", func(t *testing.T) {
t.Parallel()

bundle := setup(t)

job := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{
State: ptrutil.Ptr(JobStateRunning),
})

completedJob, err := JobCompleteTx[*riverpgxv5.Driver](ctx, bundle.tx, &Job[JobArgs]{JobRow: job})
require.NoError(t, err)
require.Equal(t, JobStateCompleted, completedJob.State)
require.WithinDuration(t, time.Now(), *completedJob.FinalizedAt, 2*time.Second)

updatedJob, err := bundle.exec.JobGetByID(ctx, job.ID)
require.NoError(t, err)
require.Equal(t, JobStateCompleted, updatedJob.State)
})

t.Run("ErrorIfNotRunning", func(t *testing.T) {
t.Parallel()

bundle := setup(t)

job := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{})

_, err := JobCompleteTx[*riverpgxv5.Driver](ctx, bundle.tx, &Job[JobArgs]{JobRow: job})
require.EqualError(t, err, "job must be running")
})
}

0 comments on commit 1cf1a49

Please sign in to comment.