Skip to content

Commit

Permalink
rename Client.Cancel to Client.JobCancel
Browse files Browse the repository at this point in the history
This was added in #141 / 0aaeee8. Meanwhile another job-related query
API was added called `JobList`, but `Cancel` and `CancelTx` were not
given the `Job*` prefix.

This renames `Cancel` to `JobCancel` and `CancelTx` to `JobCancelTx` for
consistency. Other upcoming job query APIs will also use the prefix.
  • Loading branch information
bgentry committed Jan 16, 2024
1 parent 29064a6 commit 27383ed
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 20 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- Added `Cancel` and `CancelTx` to the `Client` to enable cancellation of jobs. [PR #141](https://github.com/riverqueue/river/pull/141).
- Added `JobCancel` and `JobCancelTx` to the `Client` to enable cancellation of jobs. [PR #141](https://github.com/riverqueue/river/pull/141).
- Added `ClientFromContext` and `ClientWithContextSafely` helpers to extract the `Client` from the worker's context where it is now available to workers. This simplifies making the River client available within your workers for i.e. enqueueing additional jobs. [PR #145](https://github.com/riverqueue/river/pull/145).
- Add `JobList` API for listing jobs. [PR #117](https://github.com/riverqueue/river/pull/117).

Expand Down
21 changes: 11 additions & 10 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -940,10 +940,10 @@ func (c *Client[TTx]) runProducers(fetchNewWorkCtx, workCtx context.Context) {
}
}

// Cancel cancels the job with the given ID. If possible, the job is cancelled
// immediately and will not be retried. The provided context is used for the
// underlying Postgres update and can be used to cancel the operation or apply a
// timeout.
// JobCancel cancels the job with the given ID. If possible, the job is
// cancelled immediately and will not be retried. The provided context is used
// for the underlying Postgres update and can be used to cancel the operation or
// apply a timeout.
//
// If the job is still in the queue (available, scheduled, or retryable), it is
// immediately marked as cancelled and will not be retried.
Expand Down Expand Up @@ -976,7 +976,7 @@ func (c *Client[TTx]) runProducers(fetchNewWorkCtx, workCtx context.Context) {
//
// Returns the up-to-date JobRow for the specified jobID if it exists. Returns
// ErrNotFound if the job doesn't exist.
func (c *Client[TTx]) Cancel(ctx context.Context, jobID int64) (*rivertype.JobRow, error) {
func (c *Client[TTx]) JobCancel(ctx context.Context, jobID int64) (*rivertype.JobRow, error) {
job, err := c.adapter.JobCancel(ctx, jobID)
if err != nil {
if errors.Is(err, riverdriver.ErrNoRows) {
Expand All @@ -988,10 +988,11 @@ func (c *Client[TTx]) Cancel(ctx context.Context, jobID int64) (*rivertype.JobRo
return dbsqlc.JobRowFromInternal(job), nil
}

// CancelTx cancels the job with the given ID within the specified transaction.
// This variant lets a caller cancel a job atomically alongside other database
// changes. An cancelled job doesn't take effect until the transaction commits,
// and if the transaction rolls back, so too is the cancelled job.
// JobCancelTx cancels the job with the given ID within the specified
// transaction. This variant lets a caller cancel a job atomically alongside
// other database changes. An cancelled job doesn't take effect until the
// transaction commits, and if the transaction rolls back, so too is the
// cancelled job.
//
// If possible, the job is cancelled immediately and will not be retried. The
// provided context is used for the underlying Postgres update and can be used
Expand Down Expand Up @@ -1028,7 +1029,7 @@ func (c *Client[TTx]) Cancel(ctx context.Context, jobID int64) (*rivertype.JobRo
//
// Returns the up-to-date JobRow for the specified jobID if it exists. Returns
// ErrNotFound if the job doesn't exist.
func (c *Client[TTx]) CancelTx(ctx context.Context, tx TTx, jobID int64) (*rivertype.JobRow, error) {
func (c *Client[TTx]) JobCancelTx(ctx context.Context, tx TTx, jobID int64) (*rivertype.JobRow, error) {
job, err := c.adapter.JobCancelTx(ctx, c.driver.UnwrapTx(tx), jobID)
if errors.Is(err, riverdriver.ErrNoRows) {
return nil, ErrNotFound
Expand Down
10 changes: 5 additions & 5 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func Test_Client(t *testing.T) {
t.Parallel()

cancelRunningJobTestHelper(t, func(ctx context.Context, client *Client[pgx.Tx], jobID int64) (*rivertype.JobRow, error) {
return client.Cancel(ctx, jobID)
return client.JobCancel(ctx, jobID)
})
})

Expand All @@ -339,7 +339,7 @@ func Test_Client(t *testing.T) {
err error
)
txErr := pgx.BeginFunc(ctx, client.driver.GetDBPool(), func(tx pgx.Tx) error {
job, err = client.CancelTx(ctx, tx, jobID)
job, err = client.JobCancelTx(ctx, tx, jobID)
return err
})
require.NoError(t, txErr)
Expand Down Expand Up @@ -370,7 +370,7 @@ func Test_Client(t *testing.T) {
require.NoError(t, err)

// Cancel the job:
updatedJob, err := client.Cancel(ctx, insertedJob.ID)
updatedJob, err := client.JobCancel(ctx, insertedJob.ID)
require.NoError(t, err)
require.NotNil(t, updatedJob)
require.Equal(t, rivertype.JobStateCancelled, updatedJob.State)
Expand All @@ -384,13 +384,13 @@ func Test_Client(t *testing.T) {
startClient(ctx, t, client)

// Cancel an unknown job ID:
jobAfter, err := client.Cancel(ctx, 0)
jobAfter, err := client.JobCancel(ctx, 0)
require.ErrorIs(t, err, ErrNotFound)
require.Nil(t, jobAfter)

// Cancel an unknown job ID, within a transaction:
err = pgx.BeginFunc(ctx, client.driver.GetDBPool(), func(tx pgx.Tx) error {
jobAfter, err := client.CancelTx(ctx, tx, 0)
jobAfter, err := client.JobCancelTx(ctx, tx, 0)
require.ErrorIs(t, err, ErrNotFound)
require.Nil(t, jobAfter)
return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ func (w *SleepingWorker) Work(ctx context.Context, job *river.Job[CancellingArgs
return ctx.Err()
}

// Example_cancelJobFromClient demonstrates how to permanently cancel a job from
// any Client using Cancel.
func Example_cancelJobFromClient() {
// Example_jobCancelFromClient demonstrates how to permanently cancel a job from
// any Client using JobCancel.
func Example_jobCancelFromClient() {
ctx := context.Background()

dbPool, err := pgxpool.NewWithConfig(ctx, riverinternaltest.DatabaseConfig("river_testdb_example"))
Expand Down Expand Up @@ -88,7 +88,7 @@ func Example_cancelJobFromClient() {
// cancellation signal.
time.Sleep(500 * time.Millisecond)

if _, err = riverClient.Cancel(ctx, job.ID); err != nil {
if _, err = riverClient.JobCancel(ctx, job.ID); err != nil {
panic(err)
}
waitForNJobs(subscribeChan, 1)
Expand Down

0 comments on commit 27383ed

Please sign in to comment.