Skip to content

Commit

Permalink
API engine retry
Browse files Browse the repository at this point in the history
Adds an engine API retry which will keep retrying an API call while the
API reports that it is closed. This is useful when
leadership changes are in progress (typical during boot, where leaders
are actively being shuffled). The retry will re-attempt an active
API call every 300ms while the API reports as closed (because a leadership
change has caused the API to be closed). The retry will still cancel
calls if the caller has cancelled the context, or if the cron is
shutting down.

Signed-off-by: joshvanl <me@joshvanl.dev>
  • Loading branch information
JoshVanL committed Jan 14, 2025
1 parent 0df5048 commit 6c938a7
Show file tree
Hide file tree
Showing 9 changed files with 421 additions and 131 deletions.
83 changes: 14 additions & 69 deletions cron/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ package cron
import (
"context"
"errors"
"sync"
"sync/atomic"
"time"

Expand All @@ -21,9 +20,9 @@ import (
"google.golang.org/protobuf/types/known/anypb"

"github.com/diagridio/go-etcd-cron/api"
internalapi "github.com/diagridio/go-etcd-cron/internal/api"
"github.com/diagridio/go-etcd-cron/internal/client"
"github.com/diagridio/go-etcd-cron/internal/engine"
"github.com/diagridio/go-etcd-cron/internal/engine/retry"
"github.com/diagridio/go-etcd-cron/internal/key"
"github.com/diagridio/go-etcd-cron/internal/leadership"
"github.com/diagridio/go-etcd-cron/internal/leadership/elector"
Expand Down Expand Up @@ -82,14 +81,11 @@ type cron struct {
gcgInterval *time.Duration
replicaData *anypb.Any

engine atomic.Pointer[engine.Engine]
api *retry.Retry
elected atomic.Bool
wleaderCh chan<- []*anypb.Any

lock sync.RWMutex
running atomic.Bool
readyCh chan struct{}
closeCh chan struct{}
}

// New creates a new cron instance.
Expand Down Expand Up @@ -134,8 +130,7 @@ func New(opts Options) (api.Interface, error) {
triggerFn: opts.TriggerFn,
gcgInterval: opts.CounterGarbageCollectionInterval,
wleaderCh: opts.WatchLeadership,
readyCh: make(chan struct{}),
closeCh: make(chan struct{}),
api: retry.New(),
}, nil
}

Expand All @@ -145,7 +140,7 @@ func (c *cron) Run(ctx context.Context) error {
return errors.New("cron already running")
}

defer close(c.closeCh)
defer c.api.Close()
defer c.log.Info("cron instance shutdown")

leadership := leadership.New(leadership.Options{
Expand All @@ -167,9 +162,9 @@ func (c *cron) Run(ctx context.Context) error {
return err
}

c.log.Info("engine restarting due to leadership rebalance")

for {
c.log.Info("engine restarting due to leadership rebalance")

ectx, elected, err := leadership.Reelect(ctx)
if ctx.Err() != nil {
c.log.Error(err, "cron instance shutting down during leadership re-election")
Expand All @@ -192,62 +187,32 @@ func (c *cron) Run(ctx context.Context) error {

// Add forwards the call to the embedded API.
func (c *cron) Add(ctx context.Context, name string, job *api.Job) error {
api, err := c.waitAPIReady(ctx)
if err != nil {
return err
}

return api.Add(ctx, name, job)
return c.api.Add(ctx, name, job)
}

// Get forwards the call to the embedded API.
func (c *cron) Get(ctx context.Context, name string) (*api.Job, error) {
api, err := c.waitAPIReady(ctx)
if err != nil {
return nil, err
}

return api.Get(ctx, name)
return c.api.Get(ctx, name)
}

// Delete forwards the call to the embedded API.
func (c *cron) Delete(ctx context.Context, name string) error {
api, err := c.waitAPIReady(ctx)
if err != nil {
return err
}

return api.Delete(ctx, name)
return c.api.Delete(ctx, name)
}

// DeletePrefixes forwards the call to the embedded API.
func (c *cron) DeletePrefixes(ctx context.Context, prefixes ...string) error {
api, err := c.waitAPIReady(ctx)
if err != nil {
return err
}

return api.DeletePrefixes(ctx, prefixes...)
return c.api.DeletePrefixes(ctx, prefixes...)
}

// List forwards the call to the embedded API.
func (c *cron) List(ctx context.Context, prefix string) (*api.ListResponse, error) {
api, err := c.waitAPIReady(ctx)
if err != nil {
return nil, err
}

return api.List(ctx, prefix)
return c.api.List(ctx, prefix)
}

// DeliverablePrefixes forwards the call to the embedded API.
func (c *cron) DeliverablePrefixes(ctx context.Context, prefixes ...string) (context.CancelFunc, error) {
api, err := c.waitAPIReady(ctx)
if err != nil {
return nil, err
}

return api.DeliverablePrefixes(ctx, prefixes...)
return c.api.DeliverablePrefixes(ctx, prefixes...)
}

// IsElected returns true if cron is currently elected for leadership of its
Expand All @@ -256,21 +221,6 @@ func (c *cron) IsElected() bool {
return c.elected.Load()
}

func (c *cron) waitAPIReady(ctx context.Context) (internalapi.Interface, error) {
c.lock.Lock()
readyCh := c.readyCh
c.lock.Unlock()

select {
case <-readyCh:
return c.engine.Load().API(), nil
case <-c.closeCh:
return nil, errors.New("cron is closed")
case <-ctx.Done():
return nil, ctx.Err()
}
}

// runEngine runs the cron engine with the given elected leadership.
func (c *cron) runEngine(ctx context.Context, elected *elector.Elected) error {
c.elected.Store(true)
Expand All @@ -288,10 +238,7 @@ func (c *cron) runEngine(ctx context.Context, elected *elector.Elected) error {
return err
}

c.engine.Store(engine)
c.lock.Lock()
close(c.readyCh)
c.lock.Unlock()
c.api.Ready(engine)

if c.wleaderCh != nil {
select {
Expand All @@ -304,9 +251,7 @@ func (c *cron) runEngine(ctx context.Context, elected *elector.Elected) error {
err = engine.Run(ctx)
c.elected.Store(false)

c.lock.Lock()
c.readyCh = make(chan struct{})
c.lock.Unlock()
c.api.NotReady()

if err != nil || ctx.Err() != nil {
return err
Expand Down
35 changes: 11 additions & 24 deletions cron/cron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (
"testing"
"time"

"github.com/dapr/kit/ptr"
"github.com/go-logr/logr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
Expand Down Expand Up @@ -54,11 +54,9 @@ func Test_Run(t *testing.T) {
errCh1 <- cronI.Run(ctx)
}()

select {
case <-cron.readyCh:
case <-time.After(1 * time.Second):
t.Fatal("timed out waiting for cron to be ready")
}
assert.EventuallyWithT(t, func(c *assert.CollectT) {
assert.True(c, cron.IsElected())
}, time.Second*5, 100*time.Millisecond)

go func() {
errCh2 <- cronI.Run(ctx)
Expand All @@ -80,7 +78,7 @@ func Test_Run(t *testing.T) {
}
})

t.Run("cron engine remains ready after leadership change that keeps partition total the same", func(t *testing.T) {
t.Run("cron instance fatal errors when leadership is overwritten", func(t *testing.T) {
t.Parallel()

replicaData, err := anypb.New(wrapperspb.Bytes([]byte("data")))
Expand Down Expand Up @@ -109,11 +107,9 @@ func Test_Run(t *testing.T) {
}()

// wait until ready
select {
case <-cron.readyCh:
case <-time.After(1 * time.Second):
t.Fatal("timed out waiting for cron to be ready")
}
assert.EventuallyWithT(t, func(c *assert.CollectT) {
assert.True(c, cron.IsElected())
}, time.Second*5, 100*time.Millisecond)

leadershipData, err := proto.Marshal(&stored.Leadership{
Total: 10,
Expand All @@ -124,18 +120,9 @@ func Test_Run(t *testing.T) {
_, err = client.Put(context.Background(), "abc/leadership/1", string(leadershipData))
require.NoError(t, err, "failed to insert leadership data into etcd")

// wait until ready
select {
case <-cron.readyCh:
case <-time.After(1 * time.Second):
t.Fatal("timed out waiting for cron to be ready")
}

// confirm cron is ready
err = cronI.Add(context.Background(), "a123", &api.Job{
DueTime: ptr.Of("10s"),
})
require.NoError(t, err)
assert.EventuallyWithT(t, func(c *assert.CollectT) {
assert.False(c, cron.IsElected())
}, time.Second*5, 100*time.Millisecond)

cancel()
select {
Expand Down
6 changes: 5 additions & 1 deletion internal/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ import (
"github.com/diagridio/go-etcd-cron/internal/scheduler"
)

var (
// ErrClosed is the error waitReady returns when the API's closeCh is
// selected. It is exported as a sentinel value so that it can be compared
// against rather than matched by its message text.
ErrClosed = errors.New("api is closed")
)

type Options struct {
Log logr.Logger
Client client.Interface
Expand Down Expand Up @@ -277,7 +281,7 @@ func (a *api) DeliverablePrefixes(ctx context.Context, prefixes ...string) (cont
func (a *api) waitReady(ctx context.Context) error {
select {
case <-a.closeCh:
return errors.New("api is closed")
return ErrClosed
case <-a.readyCh:
return nil
case <-ctx.Done():
Expand Down
15 changes: 10 additions & 5 deletions internal/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,12 @@ type Options struct {
CounterGarbageCollectionInterval *time.Duration
}

type Engine struct {
// Interface is the engine surface returned by New.
type Interface interface {
// Run runs the engine with the given context and returns any resulting
// error.
Run(ctx context.Context) error
// API returns the engine's internal API.
API() internalapi.Interface
}

type engine struct {
log logr.Logger
collector garbage.Interface
queue *queue.Queue
Expand All @@ -68,7 +73,7 @@ type Engine struct {
wg sync.WaitGroup
}

func New(opts Options) (*Engine, error) {
func New(opts Options) (Interface, error) {
collector, err := garbage.New(garbage.Options{
Log: opts.Log,
Client: opts.Client,
Expand Down Expand Up @@ -108,7 +113,7 @@ func New(opts Options) (*Engine, error) {
Log: opts.Log,
})

return &Engine{
return &engine{
log: opts.Log.WithName("engine"),
collector: collector,
queue: queue,
Expand All @@ -118,7 +123,7 @@ func New(opts Options) (*Engine, error) {
}, nil
}

func (e *Engine) Run(ctx context.Context) error {
func (e *engine) Run(ctx context.Context) error {
if !e.running.CompareAndSwap(false, true) {
return errors.New("engine is already running")
}
Expand Down Expand Up @@ -152,6 +157,6 @@ func (e *Engine) Run(ctx context.Context) error {
).Run(ctx)
}

func (e *Engine) API() internalapi.Interface {
func (e *engine) API() internalapi.Interface {
return e.api
}
47 changes: 47 additions & 0 deletions internal/engine/fake/fake.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
Copyright (c) 2024 Diagrid Inc.
Licensed under the MIT License.
*/

package fake

import (
"context"

"github.com/diagridio/go-etcd-cron/internal/api"
)

// Fake is a configurable stand-in whose Run and API methods delegate to the
// function fields below.
type Fake struct {
// runFn is the function called by Run.
runFn func(context.Context) error
// apiFn is the function called by API.
apiFn func() api.Interface
}

// New returns a Fake with default behaviour: its run function returns a nil
// error and its API function returns a nil api.Interface.
func New() *Fake {
	f := new(Fake)
	f.runFn = func(context.Context) error { return nil }
	f.apiFn = func() api.Interface { return nil }
	return f
}

// WithRun replaces the function called by Run with fn and returns the same
// Fake so that calls can be chained.
func (f *Fake) WithRun(fn func(context.Context) error) *Fake {
f.runFn = fn
return f
}

// WithAPI makes API return a, and returns the same Fake so that calls can be
// chained.
func (f *Fake) WithAPI(a api.Interface) *Fake {
	getter := func() api.Interface {
		return a
	}
	f.apiFn = getter
	return f
}

// Run calls the configured run function with ctx and returns its result.
//
// A Fake that was not built with New (the zero value), or that was given a
// nil function via WithRun, has no run function. Calling a nil function value
// panics, so in that case Run returns nil, which matches the default that New
// installs.
func (f *Fake) Run(ctx context.Context) error {
	if f.runFn == nil {
		return nil
	}
	return f.runFn(ctx)
}

// API returns the result of the configured API function.
//
// A Fake that was not built with New (the zero value) has no API function.
// Calling a nil function value panics, so in that case API returns nil, which
// matches the default that New installs.
func (f *Fake) API() api.Interface {
	if f.apiFn == nil {
		return nil
	}
	return f.apiFn()
}
16 changes: 16 additions & 0 deletions internal/engine/fake/fake_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
Copyright (c) 2025 Diagrid Inc.
Licensed under the MIT License.
*/

package fake

import (
"testing"

"github.com/diagridio/go-etcd-cron/internal/engine"
)

// Test_Fake asserts that the value returned by New satisfies
// engine.Interface. The assignment is checked at compile time.
func Test_Fake(t *testing.T) {
	var iface engine.Interface = New()
	_ = iface
}
Loading

0 comments on commit 6c938a7

Please sign in to comment.