Skip to content

Commit

Permalink
Specify specific cancel cause on shutdown and use it to determine err…
Browse files Browse the repository at this point in the history
…or logging (#179)

Follow up to a discussion [1] in which an additional error log was
causing example tests to fail after uses of `log` were replaced with
`slog` and no longer suppressed.

Here, use `WithCancelCause` to send a specific cancellation error on
shutdown, which can be handled specially for instances where we'd only
want to log an error under unusual circumstances.

[1] #140 (comment)
  • Loading branch information
brandur authored Jan 28, 2024
1 parent 635d56d commit 92bcd23
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 36 deletions.
14 changes: 7 additions & 7 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ type Client[TTx any] struct {
// fetchNewWorkCancel cancels the context used for fetching new work. This
// will be used to stop fetching new work whenever stop is initiated, or
// when the context provided to Run is itself cancelled.
fetchNewWorkCancel context.CancelFunc
fetchNewWorkCancel context.CancelCauseFunc

id string
monitor *clientMonitor
Expand All @@ -276,7 +276,7 @@ type Client[TTx any] struct {

// workCancel cancels the context used for all work goroutines. Normal Stop
// does not cancel that context.
workCancel context.CancelFunc
workCancel context.CancelCauseFunc
}

// Test-only signals.
Expand Down Expand Up @@ -569,9 +569,9 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
// We use separate contexts for fetching and working to allow for a graceful
// stop. However, both inherit from the provided context so if it is
// cancelled a more aggressive stop will be initiated.
fetchNewWorkCtx, fetchNewWorkCancel := context.WithCancel(ctx)
fetchNewWorkCtx, fetchNewWorkCancel := context.WithCancelCause(ctx)
c.fetchNewWorkCancel = fetchNewWorkCancel
workCtx, workCancel := context.WithCancel(withClient[TTx](ctx, c))
workCtx, workCancel := context.WithCancelCause(withClient[TTx](ctx, c))
c.workCancel = workCancel

// Before doing anything else, make an initial connection to the database to
Expand Down Expand Up @@ -690,7 +690,7 @@ func (c *Client[TTx]) Stop(ctx context.Context) error {
}

c.baseService.Logger.InfoContext(ctx, c.baseService.Name+": Stop started")
c.fetchNewWorkCancel()
c.fetchNewWorkCancel(rivercommon.ErrShutdown)
return c.awaitStop(ctx)
}

Expand All @@ -715,8 +715,8 @@ func (c *Client[TTx]) awaitStop(ctx context.Context) error {
// instead.
func (c *Client[TTx]) StopAndCancel(ctx context.Context) error {
c.baseService.Logger.InfoContext(ctx, c.baseService.Name+": Hard stop started; cancelling all work")
c.fetchNewWorkCancel()
c.workCancel()
c.fetchNewWorkCancel(rivercommon.ErrShutdown)
c.workCancel(rivercommon.ErrShutdown)
return c.awaitStop(ctx)
}

Expand Down
10 changes: 3 additions & 7 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ func Test_Client(t *testing.T) {
require.Equal(t, `relation "river_job" does not exist`, pgErr.Message)
})

t.Run("Stopped", func(t *testing.T) {
t.Run("StopAndCancel", func(t *testing.T) {
t.Parallel()

client, _ := setup(t)
Expand All @@ -441,6 +441,7 @@ func Test_Client(t *testing.T) {
AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
jobStartedChan <- job.ID
<-ctx.Done()
require.ErrorIs(t, context.Cause(ctx), rivercommon.ErrShutdown)
close(jobDoneChan)
return nil
}))
Expand All @@ -459,12 +460,7 @@ func Test_Client(t *testing.T) {
default:
}

stopCtx, stopCancel := context.WithTimeout(ctx, 5*time.Second)
t.Cleanup(stopCancel)

if err := client.StopAndCancel(stopCtx); err != nil {
t.Fatal(err)
}
require.NoError(t, client.StopAndCancel(ctx))

riverinternaltest.WaitOrTimeout(t, client.Stopped())
})
Expand Down
14 changes: 2 additions & 12 deletions internal/notifier/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/riverqueue/river/internal/baseservice"
"github.com/riverqueue/river/internal/componentstatus"
"github.com/riverqueue/river/internal/rivercommon"
)

const statementTimeout = 5 * time.Second
Expand Down Expand Up @@ -135,18 +136,7 @@ func (n *Notifier) getConnAndRun(ctx context.Context) {

conn, err := n.establishConn(ctx)
if err != nil {
if errors.Is(err, context.Canceled) {
return
}
// Log at a lower verbosity level in case an error is received when the
// context is already done (probably because the client is stopping).
// Example tests can finish before the notifier connects and starts
// listening, and on client stop may produce a connection error that
// would otherwise pollute output and fail the test.
select {
case <-ctx.Done():
n.logger.Info("error establishing connection from pool", "err", err)
default:
if !errors.Is(context.Cause(ctx), rivercommon.ErrShutdown) {
n.logger.Error("error establishing connection from pool", "err", err)
}
return
Expand Down
10 changes: 0 additions & 10 deletions internal/rivercommon/constants.go

This file was deleted.

18 changes: 18 additions & 0 deletions internal/rivercommon/river_common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package rivercommon

import "errors"

// These constants are made available in rivercommon so that they're accessible
// by internal packages, but the top-level river package re-exports them, and
// all user code must use that set instead.
const (
MaxAttemptsDefault = 25
PriorityDefault = 1
QueueDefault = "default"
)

// ErrShutdown is a special error injected by the client into its fetch and work
// CancelCauseFuncs when it's stopping. It may be used by components for such
// cases like avoiding logging an error during a normal shutdown procedure. This
// is internal for the time being, but we could also consider exposing it.
var ErrShutdown = errors.New("shutdown initiated")

0 comments on commit 92bcd23

Please sign in to comment.