Skip to content

Commit

Permalink
Add configurable QPS and burst settings for kube API client (#2411)
Browse files Browse the repository at this point in the history
* Add configurable QPS and burst settings for kube API client

Introduce new flags to configure `QPS` and `Burst` for the Kubernetes API client, enabling better control over API rate limits.

Signed-off-by: R.K <ron.kahn@run.ai>

* Set a token bucket rate limiter for Kubernetes client

Replaced direct QPS and Burst configuration with a token bucket rate limiter using Kubernetes client-go's flowcontrol package.

Signed-off-by: R.K <ron.kahn@run.ai>

* Reorganize import for flowcontrol in main.go

Signed-off-by: R.K <ron.kahn@run.ai>

---------

Signed-off-by: R.K <ron.kahn@run.ai>
  • Loading branch information
ronk21runai authored Feb 13, 2025
1 parent 778cd72 commit 078ec30
Showing 1 changed file with 9 additions and 2 deletions.
11 changes: 9 additions & 2 deletions cmd/training-operator.v1/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/util/flowcontrol"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/healthz"
Expand Down Expand Up @@ -81,6 +82,8 @@ func main() {
var webhookServerPort int
var webhookServiceName string
var webhookSecretName string
var clientQps int
var clientBurst int

flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
Expand All @@ -95,7 +98,8 @@ func main() {
flag.StringVar(&namespace, "namespace", os.Getenv(EnvKubeflowNamespace), "The namespace to monitor kubeflow jobs. If unset, it monitors all namespaces cluster-wide."+
"If set, it only monitors kubeflow jobs in the given namespace.")
flag.IntVar(&controllerThreads, "controller-threads", 1, "Number of worker threads used by the controller.")

flag.IntVar(&clientQps, "kube-api-qps", 20, "QPS indicates the maximum QPS to the master from this client.")
flag.IntVar(&clientBurst, "kube-api-burst", 30, "Maximum burst for throttle.")
// PyTorch related flags
flag.StringVar(&config.Config.PyTorchInitContainerImage, "pytorch-init-container-image",
config.PyTorchInitContainerImageDefault, "The image for pytorch init container")
Expand Down Expand Up @@ -131,7 +135,10 @@ func main() {
}
}

mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
cfg := ctrl.GetConfigOrDie()
cfg.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(float32(clientQps), clientBurst)

mgr, err := ctrl.NewManager(cfg, ctrl.Options{
Scheme: scheme,
Metrics: metricsserver.Options{
BindAddress: metricsAddr,
Expand Down

0 comments on commit 078ec30

Please sign in to comment.