From da45d91b8059bb2491d5c9085d91334141c7ebd6 Mon Sep 17 00:00:00 2001 From: Chin Huang Date: Thu, 6 Oct 2022 12:27:32 -0700 Subject: [PATCH] fix: Fix a couple of bugs for ClusterServingRuntime (#257) Fix a couple of bugs for ClusterServingRuntime and also rename two variables to reflect the cluster scope mode is enabled Signed-off-by: Chin Huang #### Motivation Fix bugs and make code more readable Signed-off-by: Chin Huang --- controllers/config/manifest.go | 5 +- controllers/modelmesh/modelmesh.go | 2 +- controllers/service_controller.go | 6 +- controllers/servingruntime_controller.go | 87 ++++++++++++++++++------ main.go | 4 +- 5 files changed, 75 insertions(+), 29 deletions(-) diff --git a/controllers/config/manifest.go b/controllers/config/manifest.go index 8e0ac247..6c3a6311 100644 --- a/controllers/config/manifest.go +++ b/controllers/config/manifest.go @@ -54,7 +54,7 @@ func Apply(cl client.Client, owner metav1.Object, templatePath string, context i return nil } -func Delete(cl client.Client, owner metav1.Object, templatePath string, context interface{}, fns ...mf.Transformer) error { +func Delete(cl client.Client, owner metav1.Object, templatePath string, context interface{}, namespace string, fns ...mf.Transformer) error { m, err := mf.ManifestFrom(PathTemplateSource(templatePath, context)) if err != nil { return err @@ -64,9 +64,8 @@ func Delete(cl client.Client, owner metav1.Object, templatePath string, context if owner != nil { asMfOwner := owner.(mf.Owner) fns = append(fns, mf.InjectOwner(asMfOwner)) - fns = append(fns, mf.InjectNamespace(asMfOwner.GetNamespace())) + fns = append(fns, mf.InjectNamespace(namespace)) } - m, err = m.Transform(fns...) if err != nil { return err diff --git a/controllers/modelmesh/modelmesh.go b/controllers/modelmesh/modelmesh.go index a9d11d7a..bb020a2b 100644 --- a/controllers/modelmesh/modelmesh.go +++ b/controllers/modelmesh/modelmesh.go @@ -178,7 +178,7 @@ func (m *Deployment) ensureMMContainerIsLast(deployment *appsv1.Deployment) erro func (m *Deployment) Delete(ctx context.Context, client client.Client) error { m.Log.Info("Deleting modelmesh deployment ", "name", m.Name, "namespace", m.Namespace) - return config.Delete(client, m.Owner, "config/internal/base/deployment.yaml.tmpl", m) + return config.Delete(client, m.Owner, "config/internal/base/deployment.yaml.tmpl", m, m.Namespace) } func (m *Deployment) transform(deployment *appsv1.Deployment, funcs ...func(deployment *appsv1.Deployment) error) error { diff --git a/controllers/service_controller.go b/controllers/service_controller.go index 1db6572b..22ee2799 100644 --- a/controllers/service_controller.go +++ b/controllers/service_controller.go @@ -97,7 +97,7 @@ type ServiceReconciler struct { ConfigProvider *config.ConfigProvider ConfigMapName types.NamespacedName ControllerDeployment types.NamespacedName - NamespaceOwned bool + ClusterScope bool MMServices *MMServiceMap ModelEventStream *mmesh.ModelMeshEventStream @@ -127,7 +127,7 @@ func (r *ServiceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct var namespace string var owner metav1.Object - if r.NamespaceOwned { + if r.ClusterScope { // Per-namespace Services owned by the Namespace resource itself namespace = req.Name n := &corev1.Namespace{} @@ -407,7 +407,7 @@ func (r *ServiceReconciler) reconcileServiceMonitor(ctx context.Context, metrics func (r *ServiceReconciler) SetupWithManager(mgr ctrl.Manager) error { builder := ctrl.NewControllerManagedBy(mgr).Named("ServiceReconciler").Owns(&corev1.Service{}) - if r.NamespaceOwned { + if r.ClusterScope { // Services are owned by Namespace resources r.setupForClusterScope(builder) } else { diff --git a/controllers/servingruntime_controller.go b/controllers/servingruntime_controller.go index 1c3859d5..38975062 100644 --- a/controllers/servingruntime_controller.go +++ b/controllers/servingruntime_controller.go @@ -71,8 +71,8 @@ type ServingRuntimeReconciler struct { ConfigMapName types.NamespacedName ControllerName string ControllerNamespace string - // whether the controller has RBAC permission to read namespaces - HasNamespaceAccess bool + // whether the controller has cluster scope permissions + ClusterScope bool // whether the controller is enabled to read and watch ClusterServingRuntimes EnableCSRWatch bool // store some information about current runtimes for making scaling decisions @@ -98,7 +98,7 @@ func (r *ServingRuntimeReconciler) Reconcile(ctx context.Context, req ctrl.Reque log := r.Log.WithValues("servingruntime", req.NamespacedName) log.V(1).Info("ServingRuntime reconciler called") // Make sure the namespace has serving enabled - mmEnabled, err := modelMeshEnabled2(ctx, req.Namespace, r.ControllerNamespace, r.Client, r.HasNamespaceAccess) + mmEnabled, err := modelMeshEnabled2(ctx, req.Namespace, r.ControllerNamespace, r.Client, r.ClusterScope) if err != nil { return RequeueResult, err } @@ -138,7 +138,7 @@ func (r *ServingRuntimeReconciler) Reconcile(ctx context.Context, req ctrl.Reque // Delete etcd secret when there is no ServingRuntimes in a namespace etcdSecretName := cfg.GetEtcdSecretName() - if len(runtimes.Items) == 0 { + if len(srSpecs) == 0 { // We don't delete the etcd secret in the controller namespace if req.Namespace != r.ControllerNamespace { s := &corev1.Secret{} @@ -259,7 +259,6 @@ func (r *ServingRuntimeReconciler) Reconcile(ctx context.Context, req ctrl.Reque AnnotationsMap: cfg.RuntimePodAnnotations, LabelsMap: cfg.RuntimePodLabels, } - // if the runtime is disabled, delete the deployment if spec.IsDisabled() || !spec.IsMultiModelRuntime() || !mmEnabled { log.Info("Runtime is disabled, incompatible with modelmesh, or namespace is not modelmesh-enabled") @@ -454,9 +453,9 @@ func (r *ServingRuntimeReconciler) SetupWithManager(mgr ctrl.Manager, // watch the user configmap and reconcile all runtimes when it changes Watches(&source.Kind{Type: &corev1.ConfigMap{}}, config.ConfigWatchHandler(r.ConfigMapName, func() []reconcile.Request { - return r.requestsForRuntimes("", func(rt *kserveapi.ServingRuntime) bool { - mme, err := modelMeshEnabled2(context.TODO(), rt.GetNamespace(), - r.ControllerNamespace, r.Client, r.HasNamespaceAccess) + return r.requestsForRuntimes("", func(namespace string) bool { + mme, err := modelMeshEnabled2(context.TODO(), namespace, + r.ControllerNamespace, r.Client, r.ClusterScope) return err != nil || mme // in case of error just reconcile anyhow }) }, r.ConfigProvider, &r.Client)). @@ -466,7 +465,7 @@ func (r *ServingRuntimeReconciler) SetupWithManager(mgr ctrl.Manager, return r.runtimeRequestsForPredictor(o.(*api.Predictor), "Predictor") })) - if r.HasNamespaceAccess { + if r.ClusterScope { // watch namespaces to check the modelmesh-enabled flag builder = builder.Watches(&source.Kind{Type: &corev1.Namespace{}}, handler.EnqueueRequestsFromMapFunc( func(o client.Object) []reconcile.Request { @@ -509,25 +508,72 @@ func (r *ServingRuntimeReconciler) SetupWithManager(mgr ctrl.Manager, } func (r *ServingRuntimeReconciler) requestsForRuntimes(namespace string, - filter func(*kserveapi.ServingRuntime) bool) []reconcile.Request { + filter func(string) bool) []reconcile.Request { var opts []client.ListOption if namespace != "" { opts = []client.ListOption{client.InNamespace(namespace)} } - list := &kserveapi.ServingRuntimeList{} - if err := r.Client.List(context.TODO(), list, opts...); err != nil { + runtimes := &kserveapi.ServingRuntimeList{} + if err := r.Client.List(context.TODO(), runtimes, opts...); err != nil { r.Log.Error(err, "Error listing ServingRuntimes to reconcile", "namespace", namespace) return []reconcile.Request{} } - requests := make([]reconcile.Request, 0, len(list.Items)) - for i := range list.Items { - rt := &list.Items[i] - if filter == nil || filter(rt) { - requests = append(requests, reconcile.Request{ - NamespacedName: types.NamespacedName{Name: rt.Name, Namespace: rt.Namespace}, - }) + + var requests []reconcile.Request + var csrs *kserveapi.ClusterServingRuntimeList + if r.EnableCSRWatch { + csrs = &kserveapi.ClusterServingRuntimeList{} + if err := r.Client.List(context.TODO(), csrs); err != nil { + r.Log.Error(err, "Error listing ClusterServingRuntimes to reconcile") + return []reconcile.Request{} } } + if csrs != nil && len(csrs.Items) > 0 { + srnns := make(map[types.NamespacedName]struct{}) + var namespaces []string + if namespace != "" { + namespaces = []string{namespace} + } else { + list := &corev1.NamespaceList{} + if err := r.Client.List(context.TODO(), list); err != nil { + r.Log.Error(err, "Error listing namespaces to reconcile") + return []reconcile.Request{} + } + for i := range list.Items { + ns := &list.Items[i] + if filter == nil || filter(ns.Name) { + namespaces = append(namespaces, ns.Name) + } + } + } + for i := range csrs.Items { + csr := &csrs.Items[i] + if csr.Spec.IsMultiModelRuntime() { + for _, ns := range namespaces { + srnns[types.NamespacedName{Namespace: ns, Name: csr.Name}] = struct{}{} + } + } + } + for i := range runtimes.Items { + rt := &runtimes.Items[i] + if filter == nil || filter(rt.Namespace) { + srnns[types.NamespacedName{Namespace: rt.Namespace, Name: rt.Name}] = struct{}{} + } + } + for srnn := range srnns { + requests = append(requests, reconcile.Request{NamespacedName: srnn}) + } + } else { + for i := range runtimes.Items { + rt := &runtimes.Items[i] + if filter == nil || filter(rt.Namespace) { + requests = append(requests, reconcile.Request{ + NamespacedName: types.NamespacedName{Name: rt.Name, Namespace: rt.Namespace}, + }) + } + } + } + return requests } @@ -559,13 +605,14 @@ func (r *ServingRuntimeReconciler) clusterServingRuntimeRequests(csr *kserveapi. // return nothing if can't get namespaces if err := r.Client.List(context.TODO(), list); err != nil || len(list.Items) == 0 { + r.Log.Error(err, "Error listing namespaces to reconcile") return []reconcile.Request{} } requests := make([]reconcile.Request, 0, len(list.Items)) for i := range list.Items { ns := &list.Items[i] - mme, err := modelMeshEnabled2(context.TODO(), ns.Name, r.ControllerNamespace, r.Client, r.HasNamespaceAccess) + mme, err := modelMeshEnabled2(context.TODO(), ns.Name, r.ControllerNamespace, r.Client, r.ClusterScope) if err == nil && mme { requests = append(requests, reconcile.Request{NamespacedName: types.NamespacedName{ Namespace: ns.Name, diff --git a/main.go b/main.go index 23ed49b2..d3e3d0d1 100644 --- a/main.go +++ b/main.go @@ -289,7 +289,7 @@ func main() { Log: ctrl.Log.WithName("controllers").WithName("Service"), Scheme: mgr.GetScheme(), ControllerDeployment: types.NamespacedName{Namespace: ControllerNamespace, Name: controllerDeploymentName}, - NamespaceOwned: clusterScopeMode, + ClusterScope: clusterScopeMode, MMServices: mmServiceMap, ModelEventStream: modelEventStream, ConfigProvider: cp, @@ -418,7 +418,7 @@ func main() { ConfigMapName: types.NamespacedName{Namespace: ControllerNamespace, Name: UserConfigMapName}, ControllerNamespace: ControllerNamespace, ControllerName: controllerDeploymentName, - HasNamespaceAccess: clusterScopeMode, + ClusterScope: clusterScopeMode, EnableCSRWatch: enableCSRWatch, RegistryMap: registryMap, }).SetupWithManager(mgr, enableIsvcWatch, runtimeControllerEvents); err != nil {