diff --git a/controllers/options.go b/controllers/options.go index 2a465316ea..6ec46382c2 100644 --- a/controllers/options.go +++ b/controllers/options.go @@ -1,10 +1,15 @@ package controllers -import "errors" +import ( + "errors" + + "k8s.io/client-go/util/workqueue" +) // ControllerConfig is the configuration for cluster and machine controllers type ControllerConfig struct { MaxConcurrentReconciles int + RateLimiter workqueue.RateLimiter } // ControllerConfigOpts is a function that can be used to configure the controller config @@ -20,3 +25,14 @@ func WithMaxConcurrentReconciles(max int) ControllerConfigOpts { return nil } } + +// WithRateLimiter sets the rate limiter for the controller +func WithRateLimiter(rateLimiter workqueue.RateLimiter) ControllerConfigOpts { + return func(c *ControllerConfig) error { + if rateLimiter == nil { + return errors.New("rate limiter cannot be nil") + } + c.RateLimiter = rateLimiter + return nil + } +} diff --git a/controllers/options_test.go b/controllers/options_test.go index 6420eb986f..9849158202 100644 --- a/controllers/options_test.go +++ b/controllers/options_test.go @@ -4,6 +4,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "k8s.io/client-go/util/workqueue" ) func TestWithMaxConcurrentReconciles(t *testing.T) { @@ -37,3 +38,38 @@ func TestWithMaxConcurrentReconciles(t *testing.T) { }) } } + +func TestWithRateLimiter(t *testing.T) { + tests := []struct { + name string + rateLimiter workqueue.RateLimiter + expectError bool + expectedType interface{} + }{ + { + name: "TestWithRateLimiterNil", + rateLimiter: nil, + expectError: true, + expectedType: nil, + }, + { + name: "TestWithRateLimiterSet", + rateLimiter: workqueue.DefaultControllerRateLimiter(), + expectError: false, + expectedType: &workqueue.MaxOfRateLimiter{}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + opt := WithRateLimiter(tt.rateLimiter) + config := &ControllerConfig{} + err := opt(config) + if tt.expectError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.IsType(t, tt.expectedType, config.RateLimiter) + } + }) + } +} diff --git a/main.go b/main.go index 17a87af985..f120ca33dd 100644 --- a/main.go +++ b/main.go @@ -17,20 +17,13 @@ limitations under the License. package main import ( + "errors" "flag" "os" "time" - // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc. - // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc. - // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc. - // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc. - // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc. - // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc. - // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.) - // to ensure that exec-entrypoint and run can make use of them. - "go.uber.org/zap/zapcore" + "golang.org/x/time/rate" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/informers" @@ -38,6 +31,7 @@ import ( clientgoscheme "k8s.io/client-go/kubernetes/scheme" _ "k8s.io/client-go/plugin/pkg/client/auth" "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" capiv1 "sigs.k8s.io/cluster-api/api/v1beta1" bootstrapv1 "sigs.k8s.io/cluster-api/bootstrap/kubeadm/api/v1beta1" ctrl "sigs.k8s.io/controller-runtime" @@ -60,11 +54,8 @@ var gitCommitHash string func init() { utilruntime.Must(clientgoscheme.AddToScheme(scheme)) - utilruntime.Must(capiv1.AddToScheme(scheme)) - utilruntime.Must(bootstrapv1.AddToScheme(scheme)) - utilruntime.Must(infrav1alpha4.AddToScheme(scheme)) utilruntime.Must(infrav1beta1.AddToScheme(scheme)) @@ -82,18 +73,20 @@ func main() { enableLeaderElection bool probeAddr string maxConcurrentReconciles int + baseDelay time.Duration + maxDelay time.Duration + bucketSize int + qps int ) flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.") flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.") - flag.BoolVar(&enableLeaderElection, "leader-elect", false, - "Enable leader election for controller manager. "+ - "Enabling this will ensure there is only one active controller manager.") - flag.IntVar( - &maxConcurrentReconciles, - "max-concurrent-reconciles", - defaultMaxConcurrentReconciles, - "The maximum number of allowed, concurrent reconciles.") + flag.BoolVar(&enableLeaderElection, "leader-elect", false, "Enable leader election for controller manager. Enabling this will ensure there is only one active controller manager.") + flag.IntVar(&maxConcurrentReconciles, "max-concurrent-reconciles", defaultMaxConcurrentReconciles, "The maximum number of allowed, concurrent reconciles.") + flag.DurationVar(&baseDelay, "rate-limiter-base-delay", 500*time.Millisecond, "The base delay for the rate limiter.") + flag.DurationVar(&maxDelay, "rate-limiter-max-delay", 15*time.Minute, "The maximum delay for the rate limiter.") + flag.IntVar(&bucketSize, "rate-limiter-bucket-size", 100, "The bucket size for the rate limiter.") + flag.IntVar(&qps, "rate-limiter-qps", 10, "The QPS for the rate limiter.") opts := zap.Options{ TimeEncoder: zapcore.RFC3339TimeEncoder, @@ -117,6 +110,12 @@ func main() { os.Exit(1) } + rateLimiter, err := compositeRateLimiter(baseDelay, maxDelay, bucketSize, qps) + if err != nil { + setupLog.Error(err, "unable to create composite rate limiter") + os.Exit(1) + } + // Set up the context that's going to be used in controllers and for the manager. ctx := ctrl.SetupSignalHandler() @@ -143,6 +142,7 @@ func main() { configMapInformer, mgr.GetScheme(), controllers.WithMaxConcurrentReconciles(maxConcurrentReconciles), + controllers.WithRateLimiter(rateLimiter), ) if err != nil { setupLog.Error(err, "unable to create controller", "controller", "NutanixCluster") @@ -159,6 +159,7 @@ func main() { configMapInformer, mgr.GetScheme(), controllers.WithMaxConcurrentReconciles(maxConcurrentReconciles), + controllers.WithRateLimiter(rateLimiter), ) if err != nil { setupLog.Error(err, "unable to create controller", "controller", "NutanixMachine") @@ -185,3 +186,48 @@ func main() { os.Exit(1) } } + +// compositeRateLimiter will build a limiter similar to the default from DefaultControllerRateLimiter but with custom values. +func compositeRateLimiter(baseDelay, maxDelay time.Duration, bucketSize, qps int) (workqueue.RateLimiter, error) { + // Validate the rate limiter configuration + if err := validateRateLimiterConfig(baseDelay, maxDelay, bucketSize, qps); err != nil { + return nil, err + } + exponentialBackoffLimiter := workqueue.NewItemExponentialFailureRateLimiter(baseDelay, maxDelay) + bucketLimiter := &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(qps), bucketSize)} + return workqueue.NewMaxOfRateLimiter(exponentialBackoffLimiter, bucketLimiter), nil +} + +// validateRateLimiterConfig validates the rate limiter configuration parameters +func validateRateLimiterConfig(baseDelay, maxDelay time.Duration, bucketSize, qps int) error { + // Check if baseDelay is a non-negative value + if baseDelay < 0 { + return errors.New("baseDelay cannot be negative") + } + + // Check if maxDelay is non-negative and greater than or equal to baseDelay + if maxDelay < 0 { + return errors.New("maxDelay cannot be negative") + } + + if maxDelay < baseDelay { + return errors.New("maxDelay should be greater than or equal to baseDelay") + } + + // Check if bucketSize is a positive number + if bucketSize <= 0 { + return errors.New("bucketSize must be positive") + } + + // Check if qps is a positive number + if qps <= 0 { + return errors.New("minimum QPS must be positive") + } + + // Check if bucketSize is at least as large as the QPS + if bucketSize < qps { + return errors.New("bucketSize must be at least as large as the QPS to handle bursts effectively") + } + + return nil +} diff --git a/main_test.go b/main_test.go new file mode 100644 index 0000000000..fd759e9446 --- /dev/null +++ b/main_test.go @@ -0,0 +1,87 @@ +package main + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestRateLimiter(t *testing.T) { + tests := []struct { + name string + baseDelay time.Duration + maxDelay time.Duration + maxBurst int + qps int + expectedErr string + }{ + { + name: "valid rate limiter", + baseDelay: 500 * time.Millisecond, + maxDelay: 15 * time.Minute, + maxBurst: 100, + qps: 10, + }, + { + name: "negative base delay", + baseDelay: -500 * time.Millisecond, + maxDelay: 15 * time.Minute, + maxBurst: 100, + qps: 10, + expectedErr: "baseDelay cannot be negative", + }, + { + name: "negative max delay", + baseDelay: 500 * time.Millisecond, + maxDelay: -15 * time.Minute, + maxBurst: 100, + qps: 10, + expectedErr: "maxDelay cannot be negative", + }, + { + name: "maxDelay should be greater than or equal to baseDelay", + baseDelay: 500 * time.Millisecond, + maxDelay: 400 * time.Millisecond, + maxBurst: 100, + qps: 10, + expectedErr: "maxDelay should be greater than or equal to baseDelay", + }, + { + name: "bucketSize must be positive", + baseDelay: 500 * time.Millisecond, + maxDelay: 15 * time.Minute, + maxBurst: 0, + qps: 10, + expectedErr: "bucketSize must be positive", + }, + { + name: "qps must be positive", + baseDelay: 500 * time.Millisecond, + maxDelay: 15 * time.Minute, + maxBurst: 100, + qps: 0, + expectedErr: "minimum QPS must be positive", + }, + { + name: "bucketSize must be greater than or equal to qps", + baseDelay: 500 * time.Millisecond, + maxDelay: 15 * time.Minute, + maxBurst: 10, + qps: 100, + expectedErr: "bucketSize must be at least as large as the QPS to handle bursts effectively", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, err := compositeRateLimiter(tt.baseDelay, tt.maxDelay, tt.maxBurst, tt.qps) + if tt.expectedErr != "" { + assert.Error(t, err) + assert.Contains(t, err.Error(), tt.expectedErr) + } else { + assert.NoError(t, err) + } + }) + } +}