Skip to content

Commit

Permalink
Add a backoff mechanism when deleting runners
Browse files Browse the repository at this point in the history
This change adds a backoff mechanism when deleting github runners.
If the delete operation fails, we record the event and retry with
a geometric progression of 1.5 starting from 5 seconds, which is the
pool consolidation timeout.

Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
  • Loading branch information
gabriel-samfira committed Jan 28, 2025
1 parent e0e60d4 commit edbaf47
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 12 deletions.
61 changes: 60 additions & 1 deletion runner/pool/locking.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
package pool

import "sync"
import (
"sync"
"time"

"github.com/cloudbase/garm/runner/common"
)

const (
maxBackoffSeconds float64 = 1200 // 20 minutes
)

type keyMutex struct {
muxes sync.Map
Expand All @@ -27,3 +36,53 @@ func (k *keyMutex) Unlock(key string, remove bool) {
func (k *keyMutex) Delete(key string) {
k.muxes.Delete(key)
}

type instanceBackOff struct {
backoffSeconds float64
lastRecordedFailureTime time.Time
mux sync.Mutex
}

type instanceDeleteBackoff struct {
muxes sync.Map
}

func (i *instanceDeleteBackoff) ShouldProcess(key string) (bool, time.Time) {
backoff, loaded := i.muxes.LoadOrStore(key, &instanceBackOff{})
if !loaded {
return true, time.Time{}
}

ib := backoff.(*instanceBackOff)
ib.mux.Lock()
defer ib.mux.Unlock()

if ib.lastRecordedFailureTime.IsZero() || ib.backoffSeconds == 0 {
return true, time.Time{}
}

now := time.Now().UTC()
deadline := ib.lastRecordedFailureTime.Add(time.Duration(ib.backoffSeconds) * time.Second)
return deadline.After(now), deadline
}

func (i *instanceDeleteBackoff) Delete(key string) {
i.muxes.Delete(key)
}

func (i *instanceDeleteBackoff) RecordFailure(key string) {
backoff, _ := i.muxes.LoadOrStore(key, &instanceBackOff{})
ib := backoff.(*instanceBackOff)
ib.mux.Lock()
defer ib.mux.Unlock()

ib.lastRecordedFailureTime = time.Now().UTC()
if ib.backoffSeconds == 0 {
ib.backoffSeconds = common.PoolConsilitationInterval.Seconds()
} else {
// Geometric progression of 1.5
newBackoff := ib.backoffSeconds * 1.5
// Cap the backoff to 20 minutes
ib.backoffSeconds = min(newBackoff, maxBackoffSeconds)
}
}
49 changes: 38 additions & 11 deletions runner/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ package pool

import (
"context"
"crypto/rand"
"fmt"
"log/slog"
"math"
"math/big"
"net/http"
"strconv"
"strings"
Expand Down Expand Up @@ -90,6 +92,7 @@ func NewEntityPoolManager(ctx context.Context, entity params.GithubEntity, insta

wg := &sync.WaitGroup{}
keyMuxes := &keyMutex{}
backoff := &instanceDeleteBackoff{}

repo := &basePoolManager{
ctx: ctx,
Expand All @@ -103,6 +106,7 @@ func NewEntityPoolManager(ctx context.Context, entity params.GithubEntity, insta
quit: make(chan struct{}),
wg: wg,
keyMux: keyMuxes,
backoff: backoff,
consumer: consumer,
}
return repo, nil
Expand All @@ -125,9 +129,10 @@ type basePoolManager struct {
managerIsRunning bool
managerErrorReason string

mux sync.Mutex
wg *sync.WaitGroup
keyMux *keyMutex
mux sync.Mutex
wg *sync.WaitGroup
keyMux *keyMutex
backoff *instanceDeleteBackoff
}

func (r *basePoolManager) getProviderBaseParams(pool params.Pool) common.ProviderBaseParams {
Expand Down Expand Up @@ -1391,21 +1396,35 @@ func (r *basePoolManager) deletePendingInstances() error {
continue
}

currentStatus := instance.Status
// Set the status to deleting before launching the goroutine that removes
// the runner from the provider (which can take a long time).
if _, err := r.setInstanceStatus(instance.Name, commonParams.InstanceDeleting, nil); err != nil {
slog.With(slog.Any("error", err)).ErrorContext(
r.ctx, "failed to update runner status",
"runner_name", instance.Name)
shouldProcess, deadline := r.backoff.ShouldProcess(instance.Name)
if !shouldProcess {
slog.DebugContext(
r.ctx, "backoff in effect for instance",
"runner_name", instance.Name, "deadline", deadline)
r.keyMux.Unlock(instance.Name, false)
continue
}

go func(instance params.Instance) (err error) {
// Prevent Thundering Herd. Should alleviate some of the database
// is locked errors in sqlite3.
num, err := rand.Int(rand.Reader, big.NewInt(2000))
if err != nil {
return fmt.Errorf("failed to generate random number: %w", err)
}
jitter := time.Duration(num.Int64()) * time.Millisecond
time.Sleep(jitter)

currentStatus := instance.Status
deleteMux := false
defer func() {
r.keyMux.Unlock(instance.Name, deleteMux)
if deleteMux {
// deleteMux is set only when the instance was successfully removed.
// We can use it as a marker to signal that the backoff is no longer
// needed.
r.backoff.Delete(instance.Name)
}
}()
defer func(instance params.Instance) {
if err != nil {
Expand All @@ -1414,14 +1433,22 @@ func (r *basePoolManager) deletePendingInstances() error {
"runner_name", instance.Name)
// failed to remove from provider. Set status to previous value, which will retry
// the operation.
if _, err := r.setInstanceStatus(instance.Name, currentStatus, nil); err != nil {
if _, err := r.setInstanceStatus(instance.Name, currentStatus, []byte(err.Error())); err != nil {
slog.With(slog.Any("error", err)).ErrorContext(
r.ctx, "failed to update runner status",
"runner_name", instance.Name)
}
r.backoff.RecordFailure(instance.Name)
}
}(instance)

if _, err := r.setInstanceStatus(instance.Name, commonParams.InstanceDeleting, nil); err != nil {
slog.With(slog.Any("error", err)).ErrorContext(
r.ctx, "failed to update runner status",
"runner_name", instance.Name)
return err
}

slog.DebugContext(
r.ctx, "removing instance from provider",
"runner_name", instance.Name)
Expand Down

0 comments on commit edbaf47

Please sign in to comment.