diff --git a/cron/cron.go b/cron/cron.go index f104114..82fa6e6 100644 --- a/cron/cron.go +++ b/cron/cron.go @@ -8,7 +8,6 @@ package cron import ( "context" "errors" - "sync" "sync/atomic" "time" @@ -21,9 +20,9 @@ import ( "google.golang.org/protobuf/types/known/anypb" "github.com/diagridio/go-etcd-cron/api" - internalapi "github.com/diagridio/go-etcd-cron/internal/api" "github.com/diagridio/go-etcd-cron/internal/client" "github.com/diagridio/go-etcd-cron/internal/engine" + "github.com/diagridio/go-etcd-cron/internal/engine/retry" "github.com/diagridio/go-etcd-cron/internal/key" "github.com/diagridio/go-etcd-cron/internal/leadership" "github.com/diagridio/go-etcd-cron/internal/leadership/elector" @@ -82,14 +81,11 @@ type cron struct { gcgInterval *time.Duration replicaData *anypb.Any - engine atomic.Pointer[engine.Engine] + api *retry.Retry elected atomic.Bool wleaderCh chan<- []*anypb.Any - lock sync.RWMutex running atomic.Bool - readyCh chan struct{} - closeCh chan struct{} } // New creates a new cron instance. @@ -134,8 +130,7 @@ func New(opts Options) (api.Interface, error) { triggerFn: opts.TriggerFn, gcgInterval: opts.CounterGarbageCollectionInterval, wleaderCh: opts.WatchLeadership, - readyCh: make(chan struct{}), - closeCh: make(chan struct{}), + api: retry.New(), }, nil } @@ -145,7 +140,7 @@ func (c *cron) Run(ctx context.Context) error { return errors.New("cron already running") } - defer close(c.closeCh) + defer c.api.Close() defer c.log.Info("cron instance shutdown") leadership := leadership.New(leadership.Options{ @@ -167,9 +162,9 @@ func (c *cron) Run(ctx context.Context) error { return err } - c.log.Info("engine restarting due to leadership rebalance") - for { + c.log.Info("engine restarting due to leadership rebalance") + ectx, elected, err := leadership.Reelect(ctx) if ctx.Err() != nil { c.log.Error(err, "cron instance shutting down during leadership re-election") @@ -192,62 +187,32 @@ func (c *cron) Run(ctx context.Context) error { // Add forwards the call to the embedded API. func (c *cron) Add(ctx context.Context, name string, job *api.Job) error { - api, err := c.waitAPIReady(ctx) - if err != nil { - return err - } - - return api.Add(ctx, name, job) + return c.api.Add(ctx, name, job) } // Get forwards the call to the embedded API. func (c *cron) Get(ctx context.Context, name string) (*api.Job, error) { - api, err := c.waitAPIReady(ctx) - if err != nil { - return nil, err - } - - return api.Get(ctx, name) + return c.api.Get(ctx, name) } // Delete forwards the call to the embedded API. func (c *cron) Delete(ctx context.Context, name string) error { - api, err := c.waitAPIReady(ctx) - if err != nil { - return err - } - - return api.Delete(ctx, name) + return c.api.Delete(ctx, name) } // DeletePrefixes forwards the call to the embedded API. func (c *cron) DeletePrefixes(ctx context.Context, prefixes ...string) error { - api, err := c.waitAPIReady(ctx) - if err != nil { - return err - } - - return api.DeletePrefixes(ctx, prefixes...) + return c.api.DeletePrefixes(ctx, prefixes...) } // List forwards the call to the embedded API. func (c *cron) List(ctx context.Context, prefix string) (*api.ListResponse, error) { - api, err := c.waitAPIReady(ctx) - if err != nil { - return nil, err - } - - return api.List(ctx, prefix) + return c.api.List(ctx, prefix) } // DeliverablePrefixes forwards the call to the embedded API. func (c *cron) DeliverablePrefixes(ctx context.Context, prefixes ...string) (context.CancelFunc, error) { - api, err := c.waitAPIReady(ctx) - if err != nil { - return nil, err - } - - return api.DeliverablePrefixes(ctx, prefixes...) + return c.api.DeliverablePrefixes(ctx, prefixes...) } // IsElected returns true if cron is currently elected for leadership of its @@ -256,21 +221,6 @@ func (c *cron) IsElected() bool { return c.elected.Load() } -func (c *cron) waitAPIReady(ctx context.Context) (internalapi.Interface, error) { - c.lock.Lock() - readyCh := c.readyCh - c.lock.Unlock() - - select { - case <-readyCh: - return c.engine.Load().API(), nil - case <-c.closeCh: - return nil, errors.New("cron is closed") - case <-ctx.Done(): - return nil, ctx.Err() - } -} - // runEngine runs the cron engine with the given elected leadership. func (c *cron) runEngine(ctx context.Context, elected *elector.Elected) error { c.elected.Store(true) @@ -288,10 +238,7 @@ func (c *cron) runEngine(ctx context.Context, elected *elector.Elected) error { return err } - c.engine.Store(engine) - c.lock.Lock() - close(c.readyCh) - c.lock.Unlock() + c.api.Ready(engine) if c.wleaderCh != nil { select { @@ -304,9 +251,7 @@ func (c *cron) runEngine(ctx context.Context, elected *elector.Elected) error { err = engine.Run(ctx) c.elected.Store(false) - c.lock.Lock() - c.readyCh = make(chan struct{}) - c.lock.Unlock() + c.api.NotReady() if err != nil || ctx.Err() != nil { return err diff --git a/cron/cron_test.go b/cron/cron_test.go index 4948487..36ded08 100644 --- a/cron/cron_test.go +++ b/cron/cron_test.go @@ -11,8 +11,8 @@ import ( "testing" "time" - "github.com/dapr/kit/ptr" "github.com/go-logr/logr" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" @@ -54,11 +54,9 @@ func Test_Run(t *testing.T) { errCh1 <- cronI.Run(ctx) }() - select { - case <-cron.readyCh: - case <-time.After(1 * time.Second): - t.Fatal("timed out waiting for cron to be ready") - } + assert.EventuallyWithT(t, func(c *assert.CollectT) { + assert.True(c, cron.IsElected()) + }, time.Second*5, 100*time.Millisecond) go func() { errCh2 <- cronI.Run(ctx) @@ -80,7 +78,7 @@ func Test_Run(t *testing.T) { } }) - t.Run("cron engine remains ready after leadership change that keeps partition total the same", func(t *testing.T) { + t.Run("cron instance fatal errors when leadership is overwritten", func(t *testing.T) { t.Parallel() replicaData, err := anypb.New(wrapperspb.Bytes([]byte("data"))) @@ -109,11 +107,9 @@ func Test_Run(t *testing.T) { }() // wait until ready - select { - case <-cron.readyCh: - case <-time.After(1 * time.Second): - t.Fatal("timed out waiting for cron to be ready") - } + assert.EventuallyWithT(t, func(c *assert.CollectT) { + assert.True(c, cron.IsElected()) + }, time.Second*5, 100*time.Millisecond) leadershipData, err := proto.Marshal(&stored.Leadership{ Total: 10, @@ -124,18 +120,9 @@ func Test_Run(t *testing.T) { _, err = client.Put(context.Background(), "abc/leadership/1", string(leadershipData)) require.NoError(t, err, "failed to insert leadership data into etcd") - // wait until ready - select { - case <-cron.readyCh: - case <-time.After(1 * time.Second): - t.Fatal("timed out waiting for cron to be ready") - } - - // confirm cron is ready - err = cronI.Add(context.Background(), "a123", &api.Job{ - DueTime: ptr.Of("10s"), - }) - require.NoError(t, err) + assert.EventuallyWithT(t, func(c *assert.CollectT) { + assert.False(c, cron.IsElected()) + }, time.Second*5, 100*time.Millisecond) cancel() select { diff --git a/internal/api/api.go b/internal/api/api.go index 74f2998..689f251 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -26,6 +26,10 @@ import ( "github.com/diagridio/go-etcd-cron/internal/scheduler" ) +var ( + ErrClosed = errors.New("api is closed") +) + type Options struct { Log logr.Logger Client client.Interface @@ -277,7 +281,7 @@ func (a *api) DeliverablePrefixes(ctx context.Context, prefixes ...string) (cont func (a *api) waitReady(ctx context.Context) error { select { case <-a.closeCh: - return errors.New("api is closed") + return ErrClosed case <-a.readyCh: return nil case <-ctx.Done(): diff --git a/internal/engine/engine.go b/internal/engine/engine.go index 57ddaf4..3c2cae3 100644 --- a/internal/engine/engine.go +++ b/internal/engine/engine.go @@ -58,7 +58,12 @@ type Options struct { CounterGarbageCollectionInterval *time.Duration } -type Engine struct { +type Interface interface { + Run(ctx context.Context) error + API() internalapi.Interface +} + +type engine struct { log logr.Logger collector garbage.Interface queue *queue.Queue @@ -68,7 +73,7 @@ type Engine struct { wg sync.WaitGroup } -func New(opts Options) (*Engine, error) { +func New(opts Options) (Interface, error) { collector, err := garbage.New(garbage.Options{ Log: opts.Log, Client: opts.Client, @@ -108,7 +113,7 @@ func New(opts Options) (*Engine, error) { Log: opts.Log, }) - return &Engine{ + return &engine{ log: opts.Log.WithName("engine"), collector: collector, queue: queue, @@ -118,7 +123,7 @@ func New(opts Options) (*Engine, error) { }, nil } -func (e *Engine) Run(ctx context.Context) error { +func (e *engine) Run(ctx context.Context) error { if !e.running.CompareAndSwap(false, true) { return errors.New("engine is already running") } @@ -152,6 +157,6 @@ func (e *Engine) Run(ctx context.Context) error { ).Run(ctx) } -func (e *Engine) API() internalapi.Interface { +func (e *engine) API() internalapi.Interface { return e.api } diff --git a/internal/engine/fake/fake.go b/internal/engine/fake/fake.go new file mode 100644 index 0000000..8cb0bbd --- /dev/null +++ b/internal/engine/fake/fake.go @@ -0,0 +1,47 @@ +/* +Copyright (c) 2024 Diagrid Inc. +Licensed under the MIT License. +*/ + +package fake + +import ( + "context" + + "github.com/diagridio/go-etcd-cron/internal/api" +) + +type Fake struct { + runFn func(context.Context) error + apiFn func() api.Interface +} + +func New() *Fake { + return &Fake{ + runFn: func(context.Context) error { + return nil + }, + apiFn: func() api.Interface { + return nil + }, + } +} + +func (f *Fake) WithRun(fn func(context.Context) error) *Fake { + f.runFn = fn + return f +} + +func (f *Fake) WithAPI(a api.Interface) *Fake { + f.apiFn = func() api.Interface { return a } + + return f +} + +func (f *Fake) Run(ctx context.Context) error { + return f.runFn(ctx) +} + +func (f *Fake) API() api.Interface { + return f.apiFn() +} diff --git a/internal/engine/fake/fake_test.go b/internal/engine/fake/fake_test.go new file mode 100644 index 0000000..75968cb --- /dev/null +++ b/internal/engine/fake/fake_test.go @@ -0,0 +1,16 @@ +/* +Copyright (c) 2025 Diagrid Inc. +Licensed under the MIT License. +*/ + +package fake + +import ( + "testing" + + "github.com/diagridio/go-etcd-cron/internal/engine" +) + +func Test_Fake(t *testing.T) { + var _ engine.Interface = New() +} diff --git a/internal/engine/retry/retry.go b/internal/engine/retry/retry.go new file mode 100644 index 0000000..517b411 --- /dev/null +++ b/internal/engine/retry/retry.go @@ -0,0 +1,145 @@ +/* +Copyright (c) 2025 Diagrid Inc. +Licensed under the MIT License. +*/ + +package retry + +import ( + "context" + "errors" + "sync" + "sync/atomic" + "time" + + "github.com/diagridio/go-etcd-cron/api" + internalapi "github.com/diagridio/go-etcd-cron/internal/api" + "github.com/diagridio/go-etcd-cron/internal/engine" +) + +var errClosed = errors.New("cron is closed") + +// Retry is a engine wrapper for executing the cron API, which will retry calls +// when the API is "closing". This ensures that caller API calls will be held +// and eventually executed during leadership reshuffles. +type Retry struct { + engine atomic.Pointer[engine.Interface] + + lock sync.Mutex + readyCh chan struct{} + closeCh chan struct{} +} + +func New() *Retry { + return &Retry{ + readyCh: make(chan struct{}), + closeCh: make(chan struct{}), + } +} + +func (r *Retry) Add(ctx context.Context, name string, job *api.Job) error { + return r.handle(ctx, func(a internalapi.Interface) error { + return a.Add(ctx, name, job) + }) +} + +func (r *Retry) Get(ctx context.Context, name string) (*api.Job, error) { + var job *api.Job + var err error + err = r.handle(ctx, func(a internalapi.Interface) error { + job, err = a.Get(ctx, name) + return err + }) + return job, err +} + +func (r *Retry) Delete(ctx context.Context, name string) error { + return r.handle(ctx, func(a internalapi.Interface) error { + return a.Delete(ctx, name) + }) +} + +func (r *Retry) DeletePrefixes(ctx context.Context, prefixes ...string) error { + return r.handle(ctx, func(a internalapi.Interface) error { + return a.DeletePrefixes(ctx, prefixes...) + }) +} + +func (r *Retry) List(ctx context.Context, prefix string) (*api.ListResponse, error) { + var resp *api.ListResponse + var err error + err = r.handle(ctx, func(a internalapi.Interface) error { + resp, err = a.List(ctx, prefix) + return err + }) + return resp, err +} + +func (r *Retry) DeliverablePrefixes(ctx context.Context, prefixes ...string) (context.CancelFunc, error) { + var cancel context.CancelFunc + var err error + err = r.handle(ctx, func(a internalapi.Interface) error { + cancel, err = a.DeliverablePrefixes(ctx, prefixes...) + return err + }) + return cancel, err +} + +func (r *Retry) handle(ctx context.Context, fn func(internalapi.Interface) error) error { + for { + a, err := r.waitAPIReady(ctx) + if err != nil { + return err + } + + err = fn(a) + if err == nil || !errors.Is(err, internalapi.ErrClosed) { + return err + } + + select { + case <-time.After(time.Millisecond * 300): + case <-r.closeCh: + return errClosed + case <-ctx.Done(): + return ctx.Err() + } + } +} + +// Ready unblocks the Retry API calls, allowing them to be executed on the +// underlying engine. +func (r *Retry) Ready(engine engine.Interface) { + r.lock.Lock() + defer r.lock.Unlock() + + r.engine.Store(&engine) + close(r.readyCh) +} + +// NotReady blocks the Retry API calls, preventing them from being executed +// on the underlying engine. +func (r *Retry) NotReady() { + r.lock.Lock() + defer r.lock.Unlock() + r.readyCh = make(chan struct{}) +} + +func (r *Retry) Close() { + close(r.closeCh) +} + +func (r *Retry) waitAPIReady(ctx context.Context) (internalapi.Interface, error) { + r.lock.Lock() + readyCh := r.readyCh + r.lock.Unlock() + + select { + case <-readyCh: + return (*r.engine.Load()).API(), nil + case <-r.closeCh: + return nil, errClosed + case <-ctx.Done(): + return nil, ctx.Err() + } +} diff --git a/internal/engine/retry/retry_test.go b/internal/engine/retry/retry_test.go new file mode 100644 index 0000000..efaec05 --- /dev/null +++ b/internal/engine/retry/retry_test.go @@ -0,0 +1,119 @@ +/* +Copyright (c) 2025 Diagrid Inc. +Licensed under the MIT License. +*/ + +package retry + +import ( + "context" + "errors" + "sync/atomic" + "testing" + + "github.com/diagridio/go-etcd-cron/internal/api" + "github.com/diagridio/go-etcd-cron/internal/engine/fake" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func Test_handle(t *testing.T) { + t.Parallel() + + t.Run("if the given context is cancelled, should return error", func(t *testing.T) { + t.Parallel() + + r := New() + ctx, cancel := context.WithCancel(context.Background()) + cancel() + require.Error(t, r.handle(ctx, nil)) + }) + + t.Run("if retry has been closed, then should return error", func(t *testing.T) { + t.Parallel() + + r := New() + r.Close() + require.Error(t, r.handle(context.Background(), nil)) + }) + + t.Run("when retry ready, should call given func", func(t *testing.T) { + t.Parallel() + + r := New() + r.Ready(fake.New()) + + var called atomic.Bool + require.NoError(t, r.handle(context.Background(), func(a api.Interface) error { + called.Store(true) + return nil + })) + + assert.True(t, called.Load()) + }) + + t.Run("if handle func returns error, expect error", func(t *testing.T) { + t.Parallel() + + r := New() + r.Ready(fake.New()) + + var called atomic.Bool + require.Error(t, r.handle(context.Background(), func(a api.Interface) error { + called.Store(true) + return errors.New("this is an error") + })) + + assert.True(t, called.Load()) + }) + + t.Run("if error api closed, expect multiple calls till it is not", func(t *testing.T) { + t.Parallel() + + r := New() + r.Ready(fake.New()) + + var called atomic.Int64 + require.NoError(t, r.handle(context.Background(), func(a api.Interface) error { + if called.Add(1) < 4 { + return api.ErrClosed + } + return nil + })) + + assert.Equal(t, int64(4), called.Load()) + }) + + t.Run("if context cancelled during retry loop, expect context error", func(t *testing.T) { + t.Parallel() + + r := New() + r.Ready(fake.New()) + + var called atomic.Int64 + ctx, cancel := context.WithCancel(context.Background()) + err := r.handle(ctx, func(a api.Interface) error { + if called.Add(1) > 3 { + cancel() + } + return api.ErrClosed + }) + require.ErrorIs(t, err, context.Canceled) + }) + + t.Run("if closed during retry loop, expect closed error", func(t *testing.T) { + t.Parallel() + + r := New() + r.Ready(fake.New()) + + var called atomic.Int64 + err := r.handle(context.Background(), func(a api.Interface) error { + if called.Add(1) > 3 { + r.Close() + } + return api.ErrClosed + }) + require.ErrorIs(t, err, errClosed) + }) +} diff --git a/tests/suite/retry_test.go b/tests/suite/retry_test.go index 0a417b5..1b460a1 100644 --- a/tests/suite/retry_test.go +++ b/tests/suite/retry_test.go @@ -6,48 +6,70 @@ Licensed under the MIT License. package suite import ( - "sync" + "context" + "errors" + "strconv" "testing" "time" "github.com/dapr/kit/ptr" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/diagridio/go-etcd-cron/api" - "github.com/diagridio/go-etcd-cron/tests/framework/cron/integration" + "github.com/diagridio/go-etcd-cron/cron" + "github.com/diagridio/go-etcd-cron/tests/framework/etcd" ) -func Test_retry(t *testing.T) { +func Test_leadership_retry(t *testing.T) { t.Parallel() - ok := api.TriggerResponseResult_FAILED - var lock sync.Mutex - cron := integration.New(t, integration.Options{ - Instances: 1, - TriggerFn: func(*api.TriggerRequest) *api.TriggerResponse { - lock.Lock() - defer lock.Unlock() - return &api.TriggerResponse{Result: ok} - }, - }) + t.Run("a API call should never fail during leadership changes", func(t *testing.T) { + t.Parallel() + + client := etcd.EmbeddedBareClient(t) + cron1, err := cron.New(cron.Options{ + Client: client, + ID: "123", + TriggerFn: func(context.Context, *api.TriggerRequest) *api.TriggerResponse { return nil }, + }) + require.NoError(t, err) + + errCh := make(chan error) + ctx1, cancel1 := context.WithCancel(context.Background()) + go func() { errCh <- cron1.Run(ctx1) }() + + t.Cleanup(func() { + cancel1() + require.NoError(t, <-errCh) + }) - job := &api.Job{ - DueTime: ptr.Of(time.Now().Format(time.RFC3339)), - } - require.NoError(t, cron.API().Add(cron.Context(), "yoyo", job)) - - assert.EventuallyWithT(t, func(c *assert.CollectT) { - assert.Greater(c, cron.Triggered(), 1) - }, 5*time.Second, 10*time.Millisecond) - lock.Lock() - triggered := cron.Triggered() - triggered++ - ok = api.TriggerResponseResult_SUCCESS - lock.Unlock() - assert.EventuallyWithT(t, func(c *assert.CollectT) { - assert.Equal(c, triggered, cron.Triggered()) - }, time.Second*10, time.Millisecond*10) - <-time.After(3 * time.Second) - assert.Equal(t, triggered, cron.Triggered()) + ierrCh := make(chan error, 10*2) + go func() { + for range 10 { + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500) + c, err := cron.New(cron.Options{ + Client: client, + ID: "456", + TriggerFn: func(context.Context, *api.TriggerRequest) *api.TriggerResponse { return nil }, + }) + ierrCh <- err + if err := c.Run(ctx); errors.Is(err, context.DeadlineExceeded) { + ierrCh <- nil + } else { + ierrCh <- err + } + cancel() + } + }() + + for i := range 5000 { + require.NoError(t, cron1.Add(context.Background(), strconv.Itoa(i), &api.Job{ + DueTime: ptr.Of("100000s"), + })) + } + + for range 10 * 2 { + require.NoError(t, <-ierrCh) + } + }) }