Skip to content

Commit

Permalink
Fix a crash that can occur when handling keepalives (#4559)
Browse files Browse the repository at this point in the history
This commit fixes a crash that can occur when another backend
revokes a lease for an agent, or if an operator manually revokes the
lease.

Signed-off-by: Eric Chlebek <eric@sensu.io>
  • Loading branch information
echlebek authored Dec 15, 2021
1 parent 7105b96 commit fca04ae
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 37 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ chances of a cluster recovering after the loss of a backend.
- New agent sessions will no longer result in a leaked Etcd lease.
- sensu-backend now prints warning and continues instead of crashing
when --event-log-file cannot be written to.
- Fixed an infinite loop that can occur when keepalive leases are revoked on
another backend, or by an etcd operator.

## [6.6.1, 6.6.2] - 2021-11-29

Expand Down
71 changes: 34 additions & 37 deletions backend/liveness/liveness.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,7 @@ func NewSwitchSet(client *clientv3.Client, name string, dead, alive EventFunc, l
// TTL value is 5. If a smaller value is passed, then an error will be returned
// and no registration will occur.
func (t *SwitchSet) Alive(ctx context.Context, id string, ttl int64) error {
return kvc.Backoff(ctx).Retry(func(n int) (done bool, err error) {
return kvc.RetryRequest(n, t.ping(ctx, id, ttl, true))
})
return t.ping(ctx, id, ttl, true)
}

// BuryAndRevokeLease is similar to Bury() but will revoke the lease associated
Expand Down Expand Up @@ -230,9 +228,7 @@ func (t *SwitchSet) Bury(ctx context.Context, id string) error {
// TTL value is 5. If a smaller value is passed, then an error will be returned
// and no registration will occur.
func (t *SwitchSet) Dead(ctx context.Context, id string, ttl int64) error {
return kvc.Backoff(ctx).Retry(func(n int) (done bool, err error) {
return kvc.RetryRequest(n, t.ping(ctx, id, ttl, false))
})
return t.ping(ctx, id, ttl, false)
}

func isBuried(event *clientv3.Event) bool {
Expand All @@ -258,25 +254,44 @@ func (t *SwitchSet) ping(ctx context.Context, id string, ttl int64, alive bool)
key := path.Join(t.prefix, id)
val := fmt.Sprintf("%d", putVal)

leaseID, err := t.getLeaseID(ctx, ttl, id)
if err != nil {
return err
}

return kvc.Backoff(ctx).Retry(func(n int) (done bool, err error) {
_, err = t.client.Put(ctx, key, val, clientv3.WithLease(leaseID), clientv3.WithPrevKV())
etcd.LeaseOperationsCounter.WithLabelValues("liveness", etcd.LeaseOperationTypePut, etcd.LeaseStatusFor(err)).Inc()
return kvc.RetryRequest(n, err)
// WithIgnoreLease will re-use the existing lease
resp, err := t.client.Put(ctx, key, val, clientv3.WithIgnoreLease(), clientv3.WithPrevKV())
if err != nil {
// The existing lease wasn't found, it must have expired or
// been revoked. This isn't strictly an error as it can occur
// in the course of normal operation. As such, we won't track
// metrics for it.
leaseID, err := t.newLease(ctx, ttl)
etcd.LeaseOperationsCounter.WithLabelValues("liveness", etcd.LeaseOperationTypePut, etcd.LeaseStatusFor(err)).Inc()
if err != nil {
return kvc.RetryRequest(n, err)
}
_, err = t.client.Put(ctx, key, val, clientv3.WithLease(leaseID))
etcd.LeaseOperationsCounter.WithLabelValues("liveness", etcd.LeaseOperationTypePut, etcd.LeaseStatusFor(err)).Inc()
if err != nil {
return false, err
}
return true, nil
}
leaseID := clientv3.LeaseID(resp.PrevKv.Lease)
_, err = t.client.KeepAliveOnce(ctx, leaseID)
etcd.LeaseOperationsCounter.WithLabelValues("liveness", etcd.LeaseOperationTypeKeepalive, etcd.LeaseStatusFor(err)).Inc()
if err != nil {
return false, err
}
return true, nil
})
}

func (t *SwitchSet) getLeaseIDFromKV(ctx context.Context, kv *mvccpb.KeyValue, ttl int64, switchID string) (clientv3.LeaseID, error) {
func (t *SwitchSet) getLeaseIDFromKV(ctx context.Context, kv *mvccpb.KeyValue, ttl int64) (clientv3.LeaseID, error) {
leaseID := clientv3.LeaseID(kv.Lease)
if leaseID == 0 {
return t.getLeaseID(ctx, ttl, switchID)
return t.newLease(ctx, ttl)
}
if _, err := t.client.KeepAliveOnce(ctx, leaseID); err != nil {
etcd.LeaseOperationsCounter.WithLabelValues("liveness", etcd.LeaseOperationTypeKeepalive, etcd.LeaseOperationStatusExpired).Inc()
_, err := t.client.KeepAliveOnce(ctx, leaseID)
etcd.LeaseOperationsCounter.WithLabelValues("liveness", etcd.LeaseOperationTypeKeepalive, etcd.LeaseStatusFor(err)).Inc()
if err != nil {
return t.newLease(ctx, ttl)
}
return leaseID, nil
Expand All @@ -295,24 +310,6 @@ func (t *SwitchSet) newLease(ctx context.Context, ttl int64) (clientv3.LeaseID,
return leaseID, err
}

func (t *SwitchSet) getLeaseID(ctx context.Context, ttl int64, switchID string) (clientv3.LeaseID, error) {
key := path.Join(t.prefix, switchID)
var resp *clientv3.GetResponse
err := kvc.Backoff(ctx).Retry(func(n int) (done bool, err error) {
resp, err = t.client.Get(ctx, key, clientv3.WithSerializable(), clientv3.WithKeysOnly())
return kvc.RetryRequest(n, err)
})
if err != nil {
etcd.LeaseOperationsCounter.WithLabelValues("liveness", etcd.LeaseOperationTypeFind, etcd.LeaseOperationStatusError).Inc()
return 0, err
}
if len(resp.Kvs) == 0 {
etcd.LeaseOperationsCounter.WithLabelValues("liveness", etcd.LeaseOperationTypeFind, etcd.LeaseOperationStatusExpired).Inc()
return t.newLease(ctx, ttl)
}
return t.getLeaseIDFromKV(ctx, resp.Kvs[0], ttl, switchID)
}

func (t *SwitchSet) getTTLFromEvent(event *clientv3.Event) (int64, State) {
var (
ttl, prev int64
Expand Down Expand Up @@ -424,7 +421,7 @@ func (t *SwitchSet) handleEvent(ctx context.Context, event *clientv3.Event) {
}

t.logger.Debugf("creating a lease for %s with TTL %d", key, leaseTTL)
leaseID, err := t.getLeaseIDFromKV(ctx, event.Kv, leaseTTL, strings.TrimPrefix(key, t.prefix))
leaseID, err := t.getLeaseIDFromKV(ctx, event.Kv, leaseTTL)
if err != nil {
t.logger.WithError(err).Errorf("error while granting lease for %s", key)
return
Expand Down

0 comments on commit fca04ae

Please sign in to comment.