Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

elector should use slog #137

Merged
merged 1 commit into from
Dec 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client

client.notifier = notifier.New(archetype, driver.GetDBPool().Config().ConnConfig, client.monitor.SetNotifierStatus)
var err error
client.elector, err = leadership.NewElector(client.adapter, client.notifier, instanceName, client.id, 5*time.Second)
client.elector, err = leadership.NewElector(client.adapter, client.notifier, instanceName, client.id, 5*time.Second, logger)
if err != nil {
return nil, err
}
Expand Down
20 changes: 11 additions & 9 deletions internal/leadership/elector.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"encoding/json"
"errors"
"log"
"log/slog"
"sync"
"time"

Expand Down Expand Up @@ -47,6 +47,7 @@ type Elector struct {
interval time.Duration
name string
notifier *notifier.Notifier
logger *slog.Logger

mu sync.Mutex
isLeader bool
Expand All @@ -56,14 +57,15 @@ type Elector struct {
// NewElector returns an Elector using the given adapter. The name should correspond
// to the name of the database + schema combo and should be shared across all Clients
// running with that combination. The id should be unique to the Client.
func NewElector(adapter dbadapter.Adapter, notifier *notifier.Notifier, name, id string, interval time.Duration) (*Elector, error) {
func NewElector(adapter dbadapter.Adapter, notifier *notifier.Notifier, name, id string, interval time.Duration, logger *slog.Logger) (*Elector, error) {
// TODO: validate name + id length/format, interval, etc
return &Elector{
adapter: adapter,
id: id,
interval: interval,
name: name,
notifier: notifier,
logger: logger.WithGroup("elector"),
}, nil
}

Expand All @@ -82,12 +84,12 @@ func (e *Elector) Run(ctx context.Context) {
handleNotification := func(topic notifier.NotificationTopic, payload string) {
if topic != notifier.NotificationTopicLeadership {
// This should not happen unless the notifier is broken.
log.Printf("Elector received unexpected notification on topic %q: %q\n", topic, payload)
e.logger.Error("received unexpected notification", "topic", topic, "payload", payload)
return
}
notification := pgNotification{}
if err := json.Unmarshal([]byte(payload), &notification); err != nil {
log.Printf("Elector unable to unmarshal leadership notification: %v\n", err)
e.logger.Error("unable to unmarshal leadership notification", "err", err)
return
}

Expand All @@ -114,7 +116,7 @@ func (e *Elector) Run(ctx context.Context) {
return
default:
// TODO: proper backoff
log.Println("gainLeadership returned unexpectedly, waiting to try again")
e.logger.Error("gainLeadership returned unexpectedly, waiting to try again")
time.Sleep(time.Second)
continue
}
Expand All @@ -131,7 +133,7 @@ func (e *Elector) Run(ctx context.Context) {
return
default:
// TODO: backoff
log.Printf("error keeping leadership: %v\n", err)
e.logger.Error("error keeping leadership", "err", err)
continue
}
}
Expand All @@ -142,7 +144,7 @@ func (e *Elector) gainLeadership(ctx context.Context, leadershipNotificationChan
for {
success, err := e.attemptElect(ctx)
if err != nil && !errors.Is(err, context.Canceled) {
log.Printf("error attempting to elect: %v\n", err)
e.logger.Error("error attempting to elect", "err", err)
}
if success {
return true
Expand Down Expand Up @@ -196,7 +198,7 @@ func (e *Elector) keepLeadership(ctx context.Context, leadershipNotificationChan
if reelectionErrCount > 5 {
return err
}
log.Printf("error attempting reelection: %v\n", err)
e.logger.Error("error attempting reelection", "err", err)
continue
}
if !reelected {
Expand All @@ -211,7 +213,7 @@ func (e *Elector) keepLeadership(ctx context.Context, leadershipNotificationChan
func (e *Elector) giveUpLeadership() {
for i := 0; i < 10; i++ {
if err := e.attemptResign(i); err != nil {
log.Printf("error attempting to resign: %v\n", err)
e.logger.Error("error attempting to resign", "err", err)
// TODO: exponential backoff? wait longer than ~1s total?
time.Sleep(100 * time.Millisecond)
continue
Expand Down