Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

API engine retry #63

Merged
merged 1 commit into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 14 additions & 69 deletions cron/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ package cron
import (
"context"
"errors"
"sync"
"sync/atomic"
"time"

Expand All @@ -21,9 +20,9 @@ import (
"google.golang.org/protobuf/types/known/anypb"

"github.com/diagridio/go-etcd-cron/api"
internalapi "github.com/diagridio/go-etcd-cron/internal/api"
"github.com/diagridio/go-etcd-cron/internal/client"
"github.com/diagridio/go-etcd-cron/internal/engine"
"github.com/diagridio/go-etcd-cron/internal/engine/retry"
"github.com/diagridio/go-etcd-cron/internal/key"
"github.com/diagridio/go-etcd-cron/internal/leadership"
"github.com/diagridio/go-etcd-cron/internal/leadership/elector"
Expand Down Expand Up @@ -82,14 +81,11 @@ type cron struct {
gcgInterval *time.Duration
replicaData *anypb.Any

engine atomic.Pointer[engine.Engine]
api *retry.Retry
elected atomic.Bool
wleaderCh chan<- []*anypb.Any

lock sync.RWMutex
running atomic.Bool
readyCh chan struct{}
closeCh chan struct{}
}

// New creates a new cron instance.
Expand Down Expand Up @@ -134,8 +130,7 @@ func New(opts Options) (api.Interface, error) {
triggerFn: opts.TriggerFn,
gcgInterval: opts.CounterGarbageCollectionInterval,
wleaderCh: opts.WatchLeadership,
readyCh: make(chan struct{}),
closeCh: make(chan struct{}),
api: retry.New(),
}, nil
}

Expand All @@ -145,7 +140,7 @@ func (c *cron) Run(ctx context.Context) error {
return errors.New("cron already running")
}

defer close(c.closeCh)
defer c.api.Close()
defer c.log.Info("cron instance shutdown")

leadership := leadership.New(leadership.Options{
Expand All @@ -167,9 +162,9 @@ func (c *cron) Run(ctx context.Context) error {
return err
}

c.log.Info("engine restarting due to leadership rebalance")

for {
c.log.Info("engine restarting due to leadership rebalance")

ectx, elected, err := leadership.Reelect(ctx)
if ctx.Err() != nil {
c.log.Error(err, "cron instance shutting down during leadership re-election")
Expand All @@ -192,62 +187,32 @@ func (c *cron) Run(ctx context.Context) error {

// Add forwards the call to the embedded API.
func (c *cron) Add(ctx context.Context, name string, job *api.Job) error {
api, err := c.waitAPIReady(ctx)
if err != nil {
return err
}

return api.Add(ctx, name, job)
return c.api.Add(ctx, name, job)
}

// Get forwards the call to the embedded API.
func (c *cron) Get(ctx context.Context, name string) (*api.Job, error) {
api, err := c.waitAPIReady(ctx)
if err != nil {
return nil, err
}

return api.Get(ctx, name)
return c.api.Get(ctx, name)
}

// Delete forwards the call to the embedded API.
func (c *cron) Delete(ctx context.Context, name string) error {
api, err := c.waitAPIReady(ctx)
if err != nil {
return err
}

return api.Delete(ctx, name)
return c.api.Delete(ctx, name)
}

// DeletePrefixes forwards the call to the embedded API.
func (c *cron) DeletePrefixes(ctx context.Context, prefixes ...string) error {
api, err := c.waitAPIReady(ctx)
if err != nil {
return err
}

return api.DeletePrefixes(ctx, prefixes...)
return c.api.DeletePrefixes(ctx, prefixes...)
}

// List forwards the call to the embedded API.
func (c *cron) List(ctx context.Context, prefix string) (*api.ListResponse, error) {
api, err := c.waitAPIReady(ctx)
if err != nil {
return nil, err
}

return api.List(ctx, prefix)
return c.api.List(ctx, prefix)
}

// DeliverablePrefixes forwards the call to the embedded API.
func (c *cron) DeliverablePrefixes(ctx context.Context, prefixes ...string) (context.CancelFunc, error) {
api, err := c.waitAPIReady(ctx)
if err != nil {
return nil, err
}

return api.DeliverablePrefixes(ctx, prefixes...)
return c.api.DeliverablePrefixes(ctx, prefixes...)
}

// IsElected returns true if cron is currently elected for leadership of its
Expand All @@ -256,21 +221,6 @@ func (c *cron) IsElected() bool {
return c.elected.Load()
}

func (c *cron) waitAPIReady(ctx context.Context) (internalapi.Interface, error) {
c.lock.Lock()
readyCh := c.readyCh
c.lock.Unlock()

select {
case <-readyCh:
return c.engine.Load().API(), nil
case <-c.closeCh:
return nil, errors.New("cron is closed")
case <-ctx.Done():
return nil, ctx.Err()
}
}

// runEngine runs the cron engine with the given elected leadership.
func (c *cron) runEngine(ctx context.Context, elected *elector.Elected) error {
c.elected.Store(true)
Expand All @@ -288,10 +238,7 @@ func (c *cron) runEngine(ctx context.Context, elected *elector.Elected) error {
return err
}

c.engine.Store(engine)
c.lock.Lock()
close(c.readyCh)
c.lock.Unlock()
c.api.Ready(engine)

if c.wleaderCh != nil {
select {
Expand All @@ -304,9 +251,7 @@ func (c *cron) runEngine(ctx context.Context, elected *elector.Elected) error {
err = engine.Run(ctx)
c.elected.Store(false)

c.lock.Lock()
c.readyCh = make(chan struct{})
c.lock.Unlock()
c.api.NotReady()

if err != nil || ctx.Err() != nil {
return err
Expand Down
35 changes: 11 additions & 24 deletions cron/cron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (
"testing"
"time"

"github.com/dapr/kit/ptr"
"github.com/go-logr/logr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
Expand Down Expand Up @@ -54,11 +54,9 @@ func Test_Run(t *testing.T) {
errCh1 <- cronI.Run(ctx)
}()

select {
case <-cron.readyCh:
case <-time.After(1 * time.Second):
t.Fatal("timed out waiting for cron to be ready")
}
assert.EventuallyWithT(t, func(c *assert.CollectT) {
assert.True(c, cron.IsElected())
}, time.Second*5, 100*time.Millisecond)

go func() {
errCh2 <- cronI.Run(ctx)
Expand All @@ -80,7 +78,7 @@ func Test_Run(t *testing.T) {
}
})

t.Run("cron engine remains ready after leadership change that keeps partition total the same", func(t *testing.T) {
t.Run("cron instance fatal errors when leadership is overwritten", func(t *testing.T) {
t.Parallel()

replicaData, err := anypb.New(wrapperspb.Bytes([]byte("data")))
Expand Down Expand Up @@ -109,11 +107,9 @@ func Test_Run(t *testing.T) {
}()

// wait until ready
select {
case <-cron.readyCh:
case <-time.After(1 * time.Second):
t.Fatal("timed out waiting for cron to be ready")
}
assert.EventuallyWithT(t, func(c *assert.CollectT) {
assert.True(c, cron.IsElected())
}, time.Second*5, 100*time.Millisecond)

leadershipData, err := proto.Marshal(&stored.Leadership{
Total: 10,
Expand All @@ -124,18 +120,9 @@ func Test_Run(t *testing.T) {
_, err = client.Put(context.Background(), "abc/leadership/1", string(leadershipData))
require.NoError(t, err, "failed to insert leadership data into etcd")

// wait until ready
select {
case <-cron.readyCh:
case <-time.After(1 * time.Second):
t.Fatal("timed out waiting for cron to be ready")
}

// confirm cron is ready
err = cronI.Add(context.Background(), "a123", &api.Job{
DueTime: ptr.Of("10s"),
})
require.NoError(t, err)
assert.EventuallyWithT(t, func(c *assert.CollectT) {
assert.False(c, cron.IsElected())
}, time.Second*5, 100*time.Millisecond)

cancel()
select {
Expand Down
6 changes: 5 additions & 1 deletion internal/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ import (
"github.com/diagridio/go-etcd-cron/internal/scheduler"
)

// ErrClosed is the sentinel error reported to callers once the api has been
// closed. Compare against it with errors.Is.
var ErrClosed = errors.New("api is closed")

type Options struct {
Log logr.Logger
Client client.Interface
Expand Down Expand Up @@ -277,7 +281,7 @@ func (a *api) DeliverablePrefixes(ctx context.Context, prefixes ...string) (cont
func (a *api) waitReady(ctx context.Context) error {
select {
case <-a.closeCh:
return errors.New("api is closed")
return ErrClosed
case <-a.readyCh:
return nil
case <-ctx.Done():
Expand Down
15 changes: 10 additions & 5 deletions internal/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,12 @@ type Options struct {
CounterGarbageCollectionInterval *time.Duration
}

type Engine struct {
// Interface is the set of engine operations exposed to other packages. New
// returns this interface rather than the concrete engine type, so that
// alternative implementations can be substituted.
type Interface interface {
// Run runs the engine using the given context and returns any resulting
// error.
Run(ctx context.Context) error
// API returns the internal API associated with the engine.
API() internalapi.Interface
}

type engine struct {
log logr.Logger
collector garbage.Interface
queue *queue.Queue
Expand All @@ -68,7 +73,7 @@ type Engine struct {
wg sync.WaitGroup
}

func New(opts Options) (*Engine, error) {
func New(opts Options) (Interface, error) {
collector, err := garbage.New(garbage.Options{
Log: opts.Log,
Client: opts.Client,
Expand Down Expand Up @@ -108,7 +113,7 @@ func New(opts Options) (*Engine, error) {
Log: opts.Log,
})

return &Engine{
return &engine{
log: opts.Log.WithName("engine"),
collector: collector,
queue: queue,
Expand All @@ -118,7 +123,7 @@ func New(opts Options) (*Engine, error) {
}, nil
}

func (e *Engine) Run(ctx context.Context) error {
func (e *engine) Run(ctx context.Context) error {
if !e.running.CompareAndSwap(false, true) {
return errors.New("engine is already running")
}
Expand Down Expand Up @@ -152,6 +157,6 @@ func (e *Engine) Run(ctx context.Context) error {
).Run(ctx)
}

func (e *Engine) API() internalapi.Interface {
func (e *engine) API() internalapi.Interface {
return e.api
}
47 changes: 47 additions & 0 deletions internal/engine/fake/fake.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
Copyright (c) 2024 Diagrid Inc.
Licensed under the MIT License.
*/

package fake

import (
"context"

"github.com/diagridio/go-etcd-cron/internal/api"
)

// Fake is a configurable engine stand-in for tests. Each method delegates to
// the matching function field, which can be replaced via the With* setters.
type Fake struct {
// runFn is the function invoked by Run.
runFn func(context.Context) error
// apiFn is the function invoked by API.
apiFn func() api.Interface
}

// New returns a Fake with no-op defaults: Run returns a nil error and API
// returns a nil api.Interface until overridden with WithRun or WithAPI.
func New() *Fake {
	f := new(Fake)
	f.runFn = func(context.Context) error { return nil }
	f.apiFn = func() api.Interface { return nil }
	return f
}

// WithRun replaces the function that Run delegates to and returns the same
// Fake so that calls can be chained.
func (f *Fake) WithRun(fn func(context.Context) error) *Fake {
f.runFn = fn
return f
}

// WithAPI makes API return the given value on every call and returns the same
// Fake so that calls can be chained.
func (f *Fake) WithAPI(a api.Interface) *Fake {
	constant := func() api.Interface {
		return a
	}
	f.apiFn = constant
	return f
}

// Run calls the configured run function with ctx and returns its result.
func (f *Fake) Run(ctx context.Context) error {
return f.runFn(ctx)
}

// API calls the configured api function and returns its result.
func (f *Fake) API() api.Interface {
return f.apiFn()
}
16 changes: 16 additions & 0 deletions internal/engine/fake/fake_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
Copyright (c) 2025 Diagrid Inc.
Licensed under the MIT License.
*/

package fake

import (
"testing"

"github.com/diagridio/go-etcd-cron/internal/engine"
)

// Test_Fake checks that the value returned by New is assignable to
// engine.Interface; any drift between the two fails at compile time.
func Test_Fake(t *testing.T) {
	var iface engine.Interface = New()
	_ = iface
}
Loading
Loading