Skip to content

Commit

Permalink
expose client ID in config and on client
Browse files Browse the repository at this point in the history
  • Loading branch information
bgentry committed Feb 16, 2024
1 parent 85f7586 commit 858713b
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 11 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [0.0.20] - 2024-02-14

### Added

- Added an `ID` setting to the `Client` `Config` type to allow users to override client IDs with their own naming convention. Expose the client ID programatically (in case it's generated) in a new `Client.ID()` method. [PR #206](https://github.com/riverqueue/river/pull/206).

### Fixed

- Fix a leadership re-election query bug that would cause past leaders to think they were continuing to win elections. [PR #199](https://github.com/riverqueue/river/pull/199).
Expand Down
41 changes: 31 additions & 10 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,15 @@ type Config struct {
// Defaults to 1 second.
FetchPollInterval time.Duration

// ID is the unique identifier for this client. If not set, a random ULID will
// be generated.
//
// This is used to identify the client in job attempts and for leader election.
// This value must be unique across all clients in the same database and
// schema and there must not be more than one process running with the same
// ID at the same time.
ID string

// JobTimeout is the maximum amount of time a job is allowed to run before its
// context is cancelled. A timeout of zero means JobTimeoutDefault will be
// used, whereas a value of -1 means the job's context will not be cancelled
Expand Down Expand Up @@ -191,6 +200,9 @@ func (c *Config) validate() error {
if c.FetchPollInterval < c.FetchCooldown {
return fmt.Errorf("FetchPollInterval cannot be shorter than FetchCooldown (%s)", c.FetchCooldown)
}
if len(c.ID) > 100 {
return errors.New("ID cannot be longer than 100 characters")
}
if c.JobTimeout < -1 {
return errors.New("JobTimeout cannot be negative, except for -1 (infinite)")
}
Expand Down Expand Up @@ -259,7 +271,6 @@ type Client[TTx any] struct {
// when the context provided to Run is itself cancelled.
fetchNewWorkCancel context.CancelCauseFunc

id string
monitor *clientMonitor
notifier *notifier.Notifier
producersByQueueName map[string]*producer
Expand Down Expand Up @@ -387,6 +398,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
ErrorHandler: config.ErrorHandler,
FetchCooldown: valutil.ValOrDefault(config.FetchCooldown, FetchCooldownDefault),
FetchPollInterval: valutil.ValOrDefault(config.FetchPollInterval, FetchPollIntervalDefault),
ID: config.ID,
JobTimeout: valutil.ValOrDefault(config.JobTimeout, JobTimeoutDefault),
Logger: logger,
PeriodicJobs: config.PeriodicJobs,
Expand All @@ -399,12 +411,16 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
schedulerInterval: valutil.ValOrDefault(config.schedulerInterval, maintenance.SchedulerIntervalDefault),
}

if err := config.validate(); err != nil {
return nil, err
if config.ID == "" {
// Generate a random ULID for the client ID.
clientID, err := ulid.New(ulid.Now(), rand.Reader)
if err != nil {
return nil, err
}
config.ID = clientID.String()
}

clientID, err := ulid.New(ulid.Now(), rand.Reader)
if err != nil {
if err := config.validate(); err != nil {
return nil, err
}

Expand All @@ -418,7 +434,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
AdvisoryLockPrefix: config.AdvisoryLockPrefix,
DeadlineTimeout: 5 * time.Second, // not exposed in client configuration for now, but we may want to do so
Executor: driver.GetDBPool(),
WorkerName: clientID.String(),
WorkerName: config.ID,
})

completer := jobcompleter.NewAsyncCompleter(archetype, adapter, 100)
Expand All @@ -428,7 +444,6 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
completer: completer,
config: config,
driver: driver,
id: clientID.String(),
monitor: newClientMonitor(),
producersByQueueName: make(map[string]*producer),
stopComplete: make(chan struct{}),
Expand All @@ -454,7 +469,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client

client.notifier = notifier.New(archetype, driver.GetDBPool().Config().ConnConfig, client.monitor.SetNotifierStatus, logger)
var err error
client.elector, err = leadership.NewElector(client.adapter, client.notifier, instanceName, client.id, 5*time.Second, logger)
client.elector, err = leadership.NewElector(client.adapter, client.notifier, instanceName, client.ID(), 5*time.Second, logger)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -634,7 +649,7 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
c.runProducers(fetchNewWorkCtx, workCtx)
go c.signalStopComplete(workCtx)

c.baseService.Logger.InfoContext(workCtx, "River client successfully started", slog.String("client_id", c.id))
c.baseService.Logger.InfoContext(workCtx, "River client successfully started", slog.String("client_id", c.ID()))

return nil
}
Expand Down Expand Up @@ -914,7 +929,7 @@ func (c *Client[TTx]) provisionProducers() error {
QueueName: queue,
RetryPolicy: c.config.RetryPolicy,
SchedulerInterval: c.config.schedulerInterval,
WorkerName: c.id,
WorkerName: c.config.ID,
Workers: c.config.Workers,
}
producer, err := newProducer(&c.baseService.Archetype, c.adapter, c.completer, config)
Expand Down Expand Up @@ -1114,6 +1129,12 @@ func (c *Client[TTx]) JobRetryTx(ctx context.Context, tx TTx, jobID int64) (*riv
return dbsqlc.JobRowFromInternal(job), nil
}

// ID returns the unique ID of this client as set in its config or
// auto-generated if not specified.
func (c *Client[TTx]) ID() string {
return c.config.ID
}

func insertParamsFromArgsAndOptions(args JobArgs, insertOpts *InsertOpts) (*dbadapter.JobInsertParams, error) {
encodedArgs, err := json.Marshal(args)
if err != nil {
Expand Down
29 changes: 28 additions & 1 deletion client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2785,7 +2785,7 @@ func Test_NewClient_ClientIDWrittenToJobAttemptedByWhenFetched(t *testing.T) {
var startedJob *Job[callbackArgs]
select {
case startedJob = <-startedCh:
require.Equal([]string{client.id}, startedJob.AttemptedBy)
require.Equal([]string{client.ID()}, startedJob.AttemptedBy)
require.NotNil(startedJob.AttemptedAt)
require.WithinDuration(time.Now().UTC(), *startedJob.AttemptedAt, 2*time.Second)
case <-time.After(5 * time.Second):
Expand Down Expand Up @@ -2956,6 +2956,14 @@ func Test_NewClient_Validations(t *testing.T) {
require.Equal(t, FetchPollIntervalDefault, client.config.FetchPollInterval)
},
},
{
name: "ID cannot be longer than 100 characters",
// configFunc: func(config *Config) { config.ID = strings.Repeat("a", 101) },
configFunc: func(config *Config) {
config.ID = strings.Repeat("a", 101)
},
wantErr: errors.New("ID cannot be longer than 100 characters"),
},
{
name: "JobTimeout can be -1 (infinite)",
configFunc: func(config *Config) {
Expand Down Expand Up @@ -3313,6 +3321,25 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
})
}

func TestID(t *testing.T) {
t.Parallel()
ctx := context.Background()

t.Run("IsGeneratedWhenNotSpecifiedInConfig", func(t *testing.T) {
t.Parallel()
client := newTestClient(ctx, t, newTestConfig(t, nil))
require.NotEmpty(t, client.ID())
})

t.Run("IsGeneratedWhenNotSpecifiedInConfig", func(t *testing.T) {
t.Parallel()
config := newTestConfig(t, nil)
config.ID = "my-client-id"
client := newTestClient(ctx, t, config)
require.Equal(t, "my-client-id", client.ID())
})
}

type customInsertOptsJobArgs struct{}

func (w *customInsertOptsJobArgs) Kind() string { return "customInsertOpts" }
Expand Down

0 comments on commit 858713b

Please sign in to comment.