Skip to content

Commit

Permalink
feat: improve Operator performance by using caching to reduce api cal…
Browse files Browse the repository at this point in the history
…l and network impact (opendatahub-io#1190)

* feat: improve Operator performance by using caching to reduce api call
 and network impact
(work based on opendatahub-io#1189)
- secret: default application namespace + other default ones + istio cert
- configmap: all
- namespace: all
- ingressctrler: "default" one
- deployment: default application namespaces + default namespaces

Signed-off-by: Wen Zhou <wenzhou@redhat.com>

* revert: back out changes for webhook

Signed-off-by: Wen Zhou <wenzhou@redhat.com>

* update: move namespace for cache into functions and only add the ones for platform needs

Signed-off-by: Wen Zhou <wenzhou@redhat.com>

* update: rename functions

Signed-off-by: Wen Zhou <wenzhou@redhat.com>

---------

Signed-off-by: Wen Zhou <wenzhou@redhat.com>
  • Loading branch information
zdtsw authored Sep 3, 2024
1 parent fc005e3 commit 5759f5e
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 23 deletions.
2 changes: 2 additions & 0 deletions controllers/webhook/webhook_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
logf "sigs.k8s.io/controller-runtime/pkg/log"
Expand Down Expand Up @@ -127,6 +128,7 @@ var _ = BeforeSuite(func() {
Host: webhookInstallOptions.LocalServingHost,
CertDir: webhookInstallOptions.LocalServingCertDir,
}),
Cache: cache.Options{Scheme: scheme},
})
Expect(err).NotTo(HaveOccurred())

Expand Down
113 changes: 90 additions & 23 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,14 @@ import (
networkingv1 "k8s.io/api/networking/v1"
rbacv1 "k8s.io/api/rbac/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/healthz"
Expand Down Expand Up @@ -98,7 +100,7 @@ func init() { //nolint:gochecknoinits
utilruntime.Must(operatorv1.Install(scheme))
}

func main() { //nolint:funlen
func main() { //nolint:funlen,maintidx
var metricsAddr string
var enableLeaderElection bool
var probeAddr string
Expand All @@ -125,15 +127,67 @@ func main() { //nolint:funlen

// root context
ctx := ctrl.SetupSignalHandler()
// Create new uncached client to run initial setup
setupCfg, err := config.GetConfig()
if err != nil {
setupLog.Error(err, "error getting config for setup")
os.Exit(1)
}
// uplift default limiataions
setupCfg.QPS = rest.DefaultQPS * controllerNum // 5 * 4 controllers
setupCfg.Burst = rest.DefaultBurst * controllerNum // 10 * 4 controllers

setupClient, err := client.New(setupCfg, client.Options{Scheme: scheme})
if err != nil {
setupLog.Error(err, "error getting client for setup")
os.Exit(1)
}
// Get operator platform
platform, err := cluster.GetPlatform(ctx, setupClient)
if err != nil {
setupLog.Error(err, "error getting platform")
os.Exit(1)
}

secretCache := createSecretCacheConfig(platform)
deploymentCache := createDeploymentCacheConfig(platform)
cacheOptions := cache.Options{
Scheme: scheme,
ByObject: map[client.Object]cache.ByObject{
// all CRD: mainly for pipeline v1 teckon and v2 argo and dashboard's own CRD
&apiextensionsv1.CustomResourceDefinition{}: {},
// Cannot find a label on various screts, so we need to watch all secrets
// this include, monitoring, dashboard, trustcabundle default cert etc for these NS
&corev1.Secret{}: {
Namespaces: secretCache,
},
// it is hard to find a label can be used for both trustCAbundle configmap and inferenceservice-config and deletionCM
&corev1.ConfigMap{}: {},
// TODO: we can limit scope of namespace if we find a way to only get list of DSProject
// also need for monitoring, trustcabundle
&corev1.Namespace{}: {},
// For catsrc (avoid frequently check cluster type)
&ofapiv1alpha1.CatalogSource{}: {
Field: fields.Set{"metadata.name": "addon-managed-odh-catalog"}.AsSelector(),
},
// For domain to get OpenshiftIngress and default cert
&operatorv1.IngressController{}: {
Field: fields.Set{"metadata.name": "default"}.AsSelector(),
},
// for prometheus and black-box deployment and ones we owns
&appsv1.Deployment{}: {Namespaces: deploymentCache},
},
}

mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ // single pod does not need to have LeaderElection
Scheme: scheme,
Metrics: ctrlmetrics.Options{BindAddress: metricsAddr},
WebhookServer: ctrlwebhook.NewServer(ctrlwebhook.Options{
Port: 9443,
// TLSOpts: , // TODO: do we need tls for webhook
// TLSOpts: , // TODO: it was not set in the old code
}),
HealthProbeBindAddress: probeAddr,
Cache: cacheOptions,
LeaderElection: enableLeaderElection,
LeaderElectionID: "07ed84f7.opendatahub.io",
// LeaderElectionReleaseOnCancel defines if the leader should step down voluntarily
Expand Down Expand Up @@ -202,27 +256,6 @@ func main() { //nolint:funlen
os.Exit(1)
}

// Create new uncached client to run initial setup
setupCfg, err := config.GetConfig()
if err != nil {
setupLog.Error(err, "error getting config for setup")
os.Exit(1)
}
// uplift default limiataions
setupCfg.QPS = rest.DefaultQPS * controllerNum // 5 * 4 controllers
setupCfg.Burst = rest.DefaultBurst * controllerNum // 10 * 4 controllers

setupClient, err := client.New(setupCfg, client.Options{Scheme: scheme})
if err != nil {
setupLog.Error(err, "error getting client for setup")
os.Exit(1)
}
// Get operator platform
platform, err := cluster.GetPlatform(ctx, setupClient)
if err != nil {
setupLog.Error(err, "error getting platform")
os.Exit(1)
}
// Check if user opted for disabling DSC configuration
disableDSCConfig, existDSCConfig := os.LookupEnv("DISABLE_DSC_CONFIG")
if existDSCConfig && disableDSCConfig != "false" {
Expand Down Expand Up @@ -285,3 +318,37 @@ func main() { //nolint:funlen
os.Exit(1)
}
}

func createSecretCacheConfig(platform cluster.Platform) map[string]cache.Config {
namespaceConfigs := map[string]cache.Config{
"istio-system": {FieldSelector: fields.Set{"metadata.name": "knative-serving-cert"}.AsSelector()}, // for expiration case
"openshift-ingress": {},
}
switch platform {
case cluster.ManagedRhods:
namespaceConfigs["redhat-ods-monitoring"] = cache.Config{}
namespaceConfigs["redhat-ods-applications"] = cache.Config{}
case cluster.SelfManagedRhods:
namespaceConfigs["redhat-ods-applications"] = cache.Config{}
default:
namespaceConfigs["opendatahub"] = cache.Config{}
}
return namespaceConfigs
}

func createDeploymentCacheConfig(platform cluster.Platform) map[string]cache.Config {
namespaceConfigs := map[string]cache.Config{}
switch platform {
case cluster.ManagedRhods: // no need workbench NS, only SFS no Deployment
namespaceConfigs["redhat-ods-monitoring"] = cache.Config{}
namespaceConfigs["redhat-ods-applications"] = cache.Config{}
//TODO: if ModelReg has a RHOAI NS
case cluster.SelfManagedRhods:
namespaceConfigs["redhat-ods-applications"] = cache.Config{}
//TODO: if ModelReg has a RHOAI NS
default:
namespaceConfigs["opendatahub"] = cache.Config{}
namespaceConfigs["odh-model-registries"] = cache.Config{}
}
return namespaceConfigs
}

0 comments on commit 5759f5e

Please sign in to comment.