Skip to content

Commit

Permalink
Leadership: re-reconcile on failed key read (#68)
Browse files Browse the repository at this point in the history
In the event of failed reading of other leadership keys, the leader will
continue to wait for the next informer reconcile instead of hard
erroring out.

This is useful for events whereby the leadership key space values are
not readable because they have been written by another cron who has a
different value API.

Signed-off-by: joshvanl <me@joshvanl.dev>
  • Loading branch information
JoshVanL authored Jan 30, 2025
1 parent 277efd7 commit 74ca110
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 2 deletions.
12 changes: 10 additions & 2 deletions internal/leadership/elector/elector.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,11 @@ func (e *Elector) Elect(ctx context.Context) (context.Context, *Elected, error)
for {
ldata, ok, err = e.quorumReconcile(ctx, resp)
if err != nil {
return nil, nil, fmt.Errorf("failed to reconcile to leadership quorum: %w", err)
e.log.Error(err, "failed to reconcile to leadership quorum")
}

if ctx.Err() != nil {
return nil, nil, err
}

if ok {
Expand Down Expand Up @@ -191,7 +195,11 @@ func (e *Elector) Reelect(ctx context.Context) (context.Context, *Elected, error
for {
ldata, ok, err = e.quorumReconcile(ctx, resp)
if err != nil {
return nil, nil, fmt.Errorf("failed to reconcile to leadership quorum: %w", err)
e.log.Error(err, "failed to reconcile to leadership quorum")
}

if ctx.Err() != nil {
return nil, nil, err
}

if ok {
Expand Down
54 changes: 54 additions & 0 deletions tests/suite/leadership_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ import (
"time"

"github.com/dapr/kit/ptr"
"github.com/go-logr/logr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/protobuf/types/known/anypb"

"github.com/diagridio/go-etcd-cron/api"
Expand Down Expand Up @@ -226,3 +228,55 @@ func Test_leadership_scaledown(t *testing.T) {

assert.Len(t, instanceCalled, 3)
}

func Test_leadership_wait_free(t *testing.T) {
t.Parallel()

client := etcd.EmbeddedBareClient(t)
opts := cron.Options{
Client: client,
Log: logr.Discard(),
ID: "123",
TriggerFn: func(_ context.Context, req *api.TriggerRequest) *api.TriggerResponse {
return &api.TriggerResponse{Result: api.TriggerResponseResult_SUCCESS}
},
}

ctx, cancel := context.WithCancel(context.Background())

_, err := client.Put(ctx, "leadership/456", "123")
require.NoError(t, err)

cr, err := cron.New(opts)
require.NoError(t, err)

errCh := make(chan error)
go func() { errCh <- cr.Run(ctx) }()

t.Cleanup(func() {
cancel()
select {
case <-time.After(time.Second * 5):
t.Fatal("timeout waiting for cron return")
case err := <-errCh:
require.NoError(t, err)
}
})

select {
case err := <-errCh:
t.Fatal(err)
case <-time.After(time.Second * 5):
}

_, err = client.Delete(ctx, "leadership/456")
require.NoError(t, err)

assert.EventuallyWithT(t, func(c *assert.CollectT) {
assert.True(c, cr.IsElected())
}, time.Second*10, time.Millisecond*10)

resp, err := client.Get(ctx, "", clientv3.WithPrefix())
require.NoError(t, err)
require.Len(t, resp.Kvs, 1)
}

0 comments on commit 74ca110

Please sign in to comment.