From edbaf479709324e6b8281e1bf52292b11ed5dcc4 Mon Sep 17 00:00:00 2001
From: Gabriel Adrian Samfira
Date: Tue, 28 Jan 2025 22:55:31 +0000
Subject: [PATCH] Add a backoff mechanism when deleting runners

This change adds a backoff mechanism when deleting GitHub runners. If the
delete operation fails, we record the failure and retry using a geometric
progression with a ratio of 1.5, starting from 5 seconds (the pool
consolidation interval) and capped at 20 minutes.

Signed-off-by: Gabriel Adrian Samfira
---
 runner/pool/locking.go | 61 +++++++++++++++++++++++++++++++++++++++++-
 runner/pool/pool.go    | 49 ++++++++++++++++++++++++++--------
 2 files changed, 98 insertions(+), 12 deletions(-)

diff --git a/runner/pool/locking.go b/runner/pool/locking.go
index 7f6c7bc6..70471f98 100644
--- a/runner/pool/locking.go
+++ b/runner/pool/locking.go
@@ -1,6 +1,15 @@
 package pool
 
-import "sync"
+import (
+	"sync"
+	"time"
+
+	"github.com/cloudbase/garm/runner/common"
+)
+
+const (
+	maxBackoffSeconds float64 = 1200 // 20 minutes
+)
 
 type keyMutex struct {
 	muxes sync.Map
@@ -27,3 +36,53 @@ func (k *keyMutex) Unlock(key string, remove bool) {
 func (k *keyMutex) Delete(key string) {
 	k.muxes.Delete(key)
 }
+
+type instanceBackOff struct {
+	backoffSeconds          float64
+	lastRecordedFailureTime time.Time
+	mux                     sync.Mutex
+}
+
+type instanceDeleteBackoff struct {
+	muxes sync.Map
+}
+
+func (i *instanceDeleteBackoff) ShouldProcess(key string) (bool, time.Time) {
+	backoff, loaded := i.muxes.LoadOrStore(key, &instanceBackOff{})
+	if !loaded {
+		return true, time.Time{}
+	}
+
+	ib := backoff.(*instanceBackOff)
+	ib.mux.Lock()
+	defer ib.mux.Unlock()
+
+	if ib.lastRecordedFailureTime.IsZero() || ib.backoffSeconds == 0 {
+		return true, time.Time{}
+	}
+
+	now := time.Now().UTC()
+	deadline := ib.lastRecordedFailureTime.Add(time.Duration(ib.backoffSeconds) * time.Second)
+	return deadline.Before(now), deadline
+}
+
+func (i *instanceDeleteBackoff) Delete(key string) {
+	i.muxes.Delete(key)
+}
+
+func (i *instanceDeleteBackoff) RecordFailure(key string) {
+	backoff, _ := i.muxes.LoadOrStore(key, &instanceBackOff{})
+	ib := backoff.(*instanceBackOff)
+	ib.mux.Lock()
+	defer ib.mux.Unlock()
+
+	ib.lastRecordedFailureTime = time.Now().UTC()
+	if ib.backoffSeconds == 0 {
+		ib.backoffSeconds = common.PoolConsilitationInterval.Seconds()
+	} else {
+		// Geometric progression with a ratio of 1.5
+		newBackoff := ib.backoffSeconds * 1.5
+		// Cap the backoff to 20 minutes
+		ib.backoffSeconds = min(newBackoff, maxBackoffSeconds)
+	}
+}
diff --git a/runner/pool/pool.go b/runner/pool/pool.go
index cca22d4f..7e2a6080 100644
--- a/runner/pool/pool.go
+++ b/runner/pool/pool.go
@@ -16,9 +16,11 @@
 
 import (
 	"context"
+	"crypto/rand"
 	"fmt"
 	"log/slog"
 	"math"
+	"math/big"
 	"net/http"
 	"strconv"
 	"strings"
@@ -90,6 +92,7 @@ func NewEntityPoolManager(ctx context.Context, entity params.GithubEntity, insta
 
 	wg := &sync.WaitGroup{}
 	keyMuxes := &keyMutex{}
+	backoff := &instanceDeleteBackoff{}
 
 	repo := &basePoolManager{
 		ctx: ctx,
@@ -103,6 +106,7 @@ func NewEntityPoolManager(ctx context.Context, entity params.GithubEntity, insta
 		quit:     make(chan struct{}),
 		wg:       wg,
 		keyMux:   keyMuxes,
+		backoff:  backoff,
 		consumer: consumer,
 	}
 	return repo, nil
@@ -125,9 +129,10 @@ type basePoolManager struct {
 	managerIsRunning   bool
 	managerErrorReason string
 
-	mux    sync.Mutex
-	wg     *sync.WaitGroup
-	keyMux *keyMutex
+	mux     sync.Mutex
+	wg      *sync.WaitGroup
+	keyMux  *keyMutex
+	backoff *instanceDeleteBackoff
 }
 
 func (r *basePoolManager) getProviderBaseParams(pool params.Pool) common.ProviderBaseParams {
@@ -1391,21 +1396,35 @@ func (r *basePoolManager) deletePendingInstances() error {
 			continue
 		}
 
-		currentStatus := instance.Status
-		// Set the status to deleting before launching the goroutine that removes
-		// the runner from the provider (which can take a long time).
-		if _, err := r.setInstanceStatus(instance.Name, commonParams.InstanceDeleting, nil); err != nil {
-			slog.With(slog.Any("error", err)).ErrorContext(
-				r.ctx, "failed to update runner status",
-				"runner_name", instance.Name)
+		shouldProcess, deadline := r.backoff.ShouldProcess(instance.Name)
+		if !shouldProcess {
+			slog.DebugContext(
+				r.ctx, "backoff in effect for instance",
+				"runner_name", instance.Name, "deadline", deadline)
 			r.keyMux.Unlock(instance.Name, false)
 			continue
 		}
 
 		go func(instance params.Instance) (err error) {
+			// Prevent a thundering herd. This should alleviate some of the
+			// "database is locked" errors in sqlite3.
+			num, err := rand.Int(rand.Reader, big.NewInt(2000))
+			if err != nil {
+				return fmt.Errorf("failed to generate random number: %w", err)
+			}
+			jitter := time.Duration(num.Int64()) * time.Millisecond
+			time.Sleep(jitter)
+
+			currentStatus := instance.Status
 			deleteMux := false
 			defer func() {
 				r.keyMux.Unlock(instance.Name, deleteMux)
+				if deleteMux {
+					// deleteMux is set only when the instance was successfully removed.
+					// We can use it as a marker to signal that the backoff is no longer
+					// needed.
+					r.backoff.Delete(instance.Name)
+				}
 			}()
 			defer func(instance params.Instance) {
 				if err != nil {
@@ -1414,14 +1433,22 @@ func (r *basePoolManager) deletePendingInstances() error {
 						"runner_name", instance.Name)
 					// failed to remove from provider. Set status to previous value, which will retry
 					// the operation.
-					if _, err := r.setInstanceStatus(instance.Name, currentStatus, nil); err != nil {
+					if _, err := r.setInstanceStatus(instance.Name, currentStatus, []byte(err.Error())); err != nil {
 						slog.With(slog.Any("error", err)).ErrorContext(
 							r.ctx, "failed to update runner status",
 							"runner_name", instance.Name)
 					}
+					r.backoff.RecordFailure(instance.Name)
 				}
 			}(instance)
 
+			if _, err := r.setInstanceStatus(instance.Name, commonParams.InstanceDeleting, nil); err != nil {
+				slog.With(slog.Any("error", err)).ErrorContext(
+					r.ctx, "failed to update runner status",
+					"runner_name", instance.Name)
+				return err
+			}
+
 			slog.DebugContext(
 				r.ctx, "removing instance from provider",
 				"runner_name", instance.Name)
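
Note (reviewer aid, not part of the patch): the standalone sketch below only
illustrates the retry schedule that RecordFailure produces, assuming
common.PoolConsilitationInterval is 5 seconds as stated in the commit message,
with the same 1.5 ratio and 20 minute cap. All other names and values here are
made up for the example.

package main

import (
	"fmt"
	"time"
)

func main() {
	const (
		initialBackoff    = 5 * time.Second // assumed value of PoolConsilitationInterval
		maxBackoffSeconds = 1200.0          // 20 minutes, same cap as the patch
	)

	// Mirror the progression in instanceDeleteBackoff.RecordFailure: the first
	// failure sets the backoff to the consolidation interval, every subsequent
	// failure multiplies it by 1.5, and the value is capped at 20 minutes.
	backoff := 0.0
	for failure := 1; failure <= 15; failure++ {
		if backoff == 0 {
			backoff = initialBackoff.Seconds()
		} else {
			backoff = min(backoff*1.5, maxBackoffSeconds) // min builtin requires Go 1.21+
		}
		fmt.Printf("failure %2d -> next retry no sooner than %s\n",
			failure, time.Duration(backoff*float64(time.Second)))
	}
}

Running it prints 5s, 7.5s, 11.25s and so on, reaching the 20m0s ceiling after
roughly a dozen consecutive failures.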