Skip to content

Commit

Permalink
Explicitly set a exponential backoff rate limiter for controller config
Browse files Browse the repository at this point in the history
The rate limiter base delay and max delay values can be configured using
flags. The default values are 500ms for base delay and 15min for max delay.
  • Loading branch information
thunderboltsid committed Apr 23, 2024
1 parent 8cef217 commit 8728c09
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 3 deletions.
18 changes: 17 additions & 1 deletion controllers/options.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package controllers

import "errors"
import (
"errors"

"k8s.io/client-go/util/workqueue"
)

// ControllerConfig is the configuration for cluster and machine controllers
type ControllerConfig struct {
MaxConcurrentReconciles int
RateLimiter workqueue.RateLimiter
}

// ControllerConfigOpts is a function that can be used to configure the controller config
Expand All @@ -20,3 +25,14 @@ func WithMaxConcurrentReconciles(max int) ControllerConfigOpts {
return nil
}
}

// WithRateLimiter sets the rate limiter for the controller
func WithRateLimiter(rateLimiter workqueue.RateLimiter) ControllerConfigOpts {
return func(c *ControllerConfig) error {
if rateLimiter == nil {
return errors.New("rate limiter cannot be nil")
}
c.RateLimiter = rateLimiter
return nil
}
}
36 changes: 36 additions & 0 deletions controllers/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"k8s.io/client-go/util/workqueue"
)

func TestWithMaxConcurrentReconciles(t *testing.T) {
Expand Down Expand Up @@ -37,3 +38,38 @@ func TestWithMaxConcurrentReconciles(t *testing.T) {
})
}
}

func TestWithRateLimiter(t *testing.T) {
tests := []struct {
name string
rateLimiter workqueue.RateLimiter
expectError bool
expectedType interface{}
}{
{
name: "TestWithRateLimiterNil",
rateLimiter: nil,
expectError: true,
expectedType: nil,
},
{
name: "TestWithRateLimiterSet",
rateLimiter: workqueue.DefaultControllerRateLimiter(),
expectError: false,
expectedType: &workqueue.MaxOfRateLimiter{},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
opt := WithRateLimiter(tt.rateLimiter)
config := &ControllerConfig{}
err := opt(config)
if tt.expectError {
assert.Error(t, err)
} else {
assert.NoError(t, err)
assert.IsType(t, tt.expectedType, config.RateLimiter)
}
})
}
}
23 changes: 21 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/go-logr/logr"
"github.com/spf13/pflag"
"go.uber.org/zap/zapcore"
"golang.org/x/time/rate"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/informers"
Expand All @@ -35,6 +36,7 @@ import (
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
capiv1 "sigs.k8s.io/cluster-api/api/v1beta1"
bootstrapv1 "sigs.k8s.io/cluster-api/bootstrap/kubeadm/api/v1beta1"
capiflags "sigs.k8s.io/cluster-api/util/flags"
Expand Down Expand Up @@ -75,8 +77,15 @@ type managerConfig struct {
concurrentReconcilesNutanixMachine int
diagnosticsOptions capiflags.DiagnosticsOptions

logger logr.Logger
restConfig *rest.Config
logger logr.Logger
restConfig *rest.Config
rateLimiter workqueue.RateLimiter
}

func rateLimiter(baseDelay, maxDelay time.Duration, bucketSize, qps int) workqueue.RateLimiter {
exponentialBackoffLimiter := workqueue.NewItemExponentialFailureRateLimiter(baseDelay, maxDelay)
bucketLimiter := &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(qps), bucketSize)}
return workqueue.NewMaxOfRateLimiter(exponentialBackoffLimiter, bucketLimiter)
}

func parseFlags(config *managerConfig) {
Expand All @@ -88,6 +97,13 @@ func parseFlags(config *managerConfig) {
pflag.IntVar(&maxConcurrentReconciles, "max-concurrent-reconciles", defaultMaxConcurrentReconciles,
"The maximum number of allowed, concurrent reconciles.")

var baseDelay, maxDelay time.Duration
var bucketSize, qps int
pflag.DurationVar(&baseDelay, "rate-limiter-base-delay", 500*time.Millisecond, "The base delay for the rate limiter.")
pflag.DurationVar(&maxDelay, "rate-limiter-max-delay", 15*time.Minute, "The maximum delay for the rate limiter.")
pflag.IntVar(&bucketSize, "rate-limiter-bucket-size", 100, "The bucket size for the rate limiter.")
pflag.IntVar(&qps, "rate-limiter-qps", 10, "The QPS for the rate limiter.")

opts := zap.Options{
TimeEncoder: zapcore.RFC3339TimeEncoder,
}
Expand All @@ -100,6 +116,7 @@ func parseFlags(config *managerConfig) {

config.concurrentReconcilesNutanixCluster = maxConcurrentReconciles
config.concurrentReconcilesNutanixMachine = maxConcurrentReconciles
config.rateLimiter = rateLimiter(baseDelay, maxDelay, bucketSize, qps)
}

func setupLogger() logr.Logger {
Expand Down Expand Up @@ -189,6 +206,7 @@ func runManager(ctx context.Context, mgr manager.Manager, config *managerConfig)

clusterControllerOpts := []controllers.ControllerConfigOpts{
controllers.WithMaxConcurrentReconciles(config.concurrentReconcilesNutanixCluster),
controllers.WithRateLimiter(config.rateLimiter),
}

if err := setupNutanixClusterController(ctx, mgr, secretInformer, configMapInformer, clusterControllerOpts...); err != nil {
Expand All @@ -197,6 +215,7 @@ func runManager(ctx context.Context, mgr manager.Manager, config *managerConfig)

machineControllerOpts := []controllers.ControllerConfigOpts{
controllers.WithMaxConcurrentReconciles(config.concurrentReconcilesNutanixMachine),
controllers.WithRateLimiter(config.rateLimiter),
}

if err := setupNutanixMachineController(ctx, mgr, secretInformer, configMapInformer, machineControllerOpts...); err != nil {
Expand Down
2 changes: 2 additions & 0 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"
"path/filepath"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -125,6 +126,7 @@ func testRunManagerCommon(t *testing.T, ctrl *gomock.Controller) (*mockctlclient
concurrentReconcilesNutanixCluster: 1,
concurrentReconcilesNutanixMachine: 1,
restConfig: cfg,
rateLimiter: rateLimiter(500*time.Millisecond, 15*time.Minute, 100, 10),
}

client := mockctlclient.NewMockClient(ctrl)
Expand Down

0 comments on commit 8728c09

Please sign in to comment.