Skip to content

Commit

Permalink
fix: Fix a couple of bugs for ClusterServingRuntime (#257)
Browse files Browse the repository at this point in the history
Fix a couple of bugs for ClusterServingRuntime, and rename two variables to reflect that cluster-scope mode is enabled.

Signed-off-by: Chin Huang <chhuang@us.ibm.com>

#### Motivation
Fix bugs and make the code more readable.


Signed-off-by: Chin Huang <chhuang@us.ibm.com>
  • Loading branch information
chinhuang007 authored Oct 6, 2022
1 parent 3cfb2db commit da45d91
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 29 deletions.
5 changes: 2 additions & 3 deletions controllers/config/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func Apply(cl client.Client, owner metav1.Object, templatePath string, context i
return nil
}

func Delete(cl client.Client, owner metav1.Object, templatePath string, context interface{}, fns ...mf.Transformer) error {
func Delete(cl client.Client, owner metav1.Object, templatePath string, context interface{}, namespace string, fns ...mf.Transformer) error {
m, err := mf.ManifestFrom(PathTemplateSource(templatePath, context))
if err != nil {
return err
Expand All @@ -64,9 +64,8 @@ func Delete(cl client.Client, owner metav1.Object, templatePath string, context
if owner != nil {
asMfOwner := owner.(mf.Owner)
fns = append(fns, mf.InjectOwner(asMfOwner))
fns = append(fns, mf.InjectNamespace(asMfOwner.GetNamespace()))
fns = append(fns, mf.InjectNamespace(namespace))
}

m, err = m.Transform(fns...)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion controllers/modelmesh/modelmesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func (m *Deployment) ensureMMContainerIsLast(deployment *appsv1.Deployment) erro

func (m *Deployment) Delete(ctx context.Context, client client.Client) error {
m.Log.Info("Deleting modelmesh deployment ", "name", m.Name, "namespace", m.Namespace)
return config.Delete(client, m.Owner, "config/internal/base/deployment.yaml.tmpl", m)
return config.Delete(client, m.Owner, "config/internal/base/deployment.yaml.tmpl", m, m.Namespace)
}

func (m *Deployment) transform(deployment *appsv1.Deployment, funcs ...func(deployment *appsv1.Deployment) error) error {
Expand Down
6 changes: 3 additions & 3 deletions controllers/service_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ type ServiceReconciler struct {
ConfigProvider *config.ConfigProvider
ConfigMapName types.NamespacedName
ControllerDeployment types.NamespacedName
NamespaceOwned bool
ClusterScope bool

MMServices *MMServiceMap
ModelEventStream *mmesh.ModelMeshEventStream
Expand Down Expand Up @@ -127,7 +127,7 @@ func (r *ServiceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct

var namespace string
var owner metav1.Object
if r.NamespaceOwned {
if r.ClusterScope {
// Per-namespace Services owned by the Namespace resource itself
namespace = req.Name
n := &corev1.Namespace{}
Expand Down Expand Up @@ -407,7 +407,7 @@ func (r *ServiceReconciler) reconcileServiceMonitor(ctx context.Context, metrics

func (r *ServiceReconciler) SetupWithManager(mgr ctrl.Manager) error {
builder := ctrl.NewControllerManagedBy(mgr).Named("ServiceReconciler").Owns(&corev1.Service{})
if r.NamespaceOwned {
if r.ClusterScope {
// Services are owned by Namespace resources
r.setupForClusterScope(builder)
} else {
Expand Down
87 changes: 67 additions & 20 deletions controllers/servingruntime_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ type ServingRuntimeReconciler struct {
ConfigMapName types.NamespacedName
ControllerName string
ControllerNamespace string
// whether the controller has RBAC permission to read namespaces
HasNamespaceAccess bool
// whether the controller has cluster scope permissions
ClusterScope bool
// whether the controller is enabled to read and watch ClusterServingRuntimes
EnableCSRWatch bool
// store some information about current runtimes for making scaling decisions
Expand All @@ -98,7 +98,7 @@ func (r *ServingRuntimeReconciler) Reconcile(ctx context.Context, req ctrl.Reque
log := r.Log.WithValues("servingruntime", req.NamespacedName)
log.V(1).Info("ServingRuntime reconciler called")
// Make sure the namespace has serving enabled
mmEnabled, err := modelMeshEnabled2(ctx, req.Namespace, r.ControllerNamespace, r.Client, r.HasNamespaceAccess)
mmEnabled, err := modelMeshEnabled2(ctx, req.Namespace, r.ControllerNamespace, r.Client, r.ClusterScope)
if err != nil {
return RequeueResult, err
}
Expand Down Expand Up @@ -138,7 +138,7 @@ func (r *ServingRuntimeReconciler) Reconcile(ctx context.Context, req ctrl.Reque

// Delete etcd secret when there is no ServingRuntimes in a namespace
etcdSecretName := cfg.GetEtcdSecretName()
if len(runtimes.Items) == 0 {
if len(srSpecs) == 0 {
// We don't delete the etcd secret in the controller namespace
if req.Namespace != r.ControllerNamespace {
s := &corev1.Secret{}
Expand Down Expand Up @@ -259,7 +259,6 @@ func (r *ServingRuntimeReconciler) Reconcile(ctx context.Context, req ctrl.Reque
AnnotationsMap: cfg.RuntimePodAnnotations,
LabelsMap: cfg.RuntimePodLabels,
}

// if the runtime is disabled, delete the deployment
if spec.IsDisabled() || !spec.IsMultiModelRuntime() || !mmEnabled {
log.Info("Runtime is disabled, incompatible with modelmesh, or namespace is not modelmesh-enabled")
Expand Down Expand Up @@ -454,9 +453,9 @@ func (r *ServingRuntimeReconciler) SetupWithManager(mgr ctrl.Manager,
// watch the user configmap and reconcile all runtimes when it changes
Watches(&source.Kind{Type: &corev1.ConfigMap{}},
config.ConfigWatchHandler(r.ConfigMapName, func() []reconcile.Request {
return r.requestsForRuntimes("", func(rt *kserveapi.ServingRuntime) bool {
mme, err := modelMeshEnabled2(context.TODO(), rt.GetNamespace(),
r.ControllerNamespace, r.Client, r.HasNamespaceAccess)
return r.requestsForRuntimes("", func(namespace string) bool {
mme, err := modelMeshEnabled2(context.TODO(), namespace,
r.ControllerNamespace, r.Client, r.ClusterScope)
return err != nil || mme // in case of error just reconcile anyhow
})
}, r.ConfigProvider, &r.Client)).
Expand All @@ -466,7 +465,7 @@ func (r *ServingRuntimeReconciler) SetupWithManager(mgr ctrl.Manager,
return r.runtimeRequestsForPredictor(o.(*api.Predictor), "Predictor")
}))

if r.HasNamespaceAccess {
if r.ClusterScope {
// watch namespaces to check the modelmesh-enabled flag
builder = builder.Watches(&source.Kind{Type: &corev1.Namespace{}}, handler.EnqueueRequestsFromMapFunc(
func(o client.Object) []reconcile.Request {
Expand Down Expand Up @@ -509,25 +508,72 @@ func (r *ServingRuntimeReconciler) SetupWithManager(mgr ctrl.Manager,
}

func (r *ServingRuntimeReconciler) requestsForRuntimes(namespace string,
filter func(*kserveapi.ServingRuntime) bool) []reconcile.Request {
filter func(string) bool) []reconcile.Request {
var opts []client.ListOption
if namespace != "" {
opts = []client.ListOption{client.InNamespace(namespace)}
}
list := &kserveapi.ServingRuntimeList{}
if err := r.Client.List(context.TODO(), list, opts...); err != nil {
runtimes := &kserveapi.ServingRuntimeList{}
if err := r.Client.List(context.TODO(), runtimes, opts...); err != nil {
r.Log.Error(err, "Error listing ServingRuntimes to reconcile", "namespace", namespace)
return []reconcile.Request{}
}
requests := make([]reconcile.Request, 0, len(list.Items))
for i := range list.Items {
rt := &list.Items[i]
if filter == nil || filter(rt) {
requests = append(requests, reconcile.Request{
NamespacedName: types.NamespacedName{Name: rt.Name, Namespace: rt.Namespace},
})

var requests []reconcile.Request
var csrs *kserveapi.ClusterServingRuntimeList
if r.EnableCSRWatch {
csrs = &kserveapi.ClusterServingRuntimeList{}
if err := r.Client.List(context.TODO(), csrs); err != nil {
r.Log.Error(err, "Error listing ClusterServingRuntimes to reconcile")
return []reconcile.Request{}
}
}
if csrs != nil && len(csrs.Items) > 0 {
srnns := make(map[types.NamespacedName]struct{})
var namespaces []string
if namespace != "" {
namespaces = []string{namespace}
} else {
list := &corev1.NamespaceList{}
if err := r.Client.List(context.TODO(), list); err != nil {
r.Log.Error(err, "Error listing namespaces to reconcile")
return []reconcile.Request{}
}
for i := range list.Items {
ns := &list.Items[i]
if filter == nil || filter(ns.Name) {
namespaces = append(namespaces, ns.Name)
}
}
}
for i := range csrs.Items {
csr := &csrs.Items[i]
if csr.Spec.IsMultiModelRuntime() {
for _, ns := range namespaces {
srnns[types.NamespacedName{Namespace: ns, Name: csr.Name}] = struct{}{}
}
}
}
for i := range runtimes.Items {
rt := &runtimes.Items[i]
if filter == nil || filter(rt.Namespace) {
srnns[types.NamespacedName{Namespace: rt.Namespace, Name: rt.Name}] = struct{}{}
}
}
for srnn := range srnns {
requests = append(requests, reconcile.Request{NamespacedName: srnn})
}
} else {
for i := range runtimes.Items {
rt := &runtimes.Items[i]
if filter == nil || filter(rt.Namespace) {
requests = append(requests, reconcile.Request{
NamespacedName: types.NamespacedName{Name: rt.Name, Namespace: rt.Namespace},
})
}
}
}

return requests
}

Expand Down Expand Up @@ -559,13 +605,14 @@ func (r *ServingRuntimeReconciler) clusterServingRuntimeRequests(csr *kserveapi.

// return nothing if can't get namespaces
if err := r.Client.List(context.TODO(), list); err != nil || len(list.Items) == 0 {
r.Log.Error(err, "Error listing namespaces to reconcile")
return []reconcile.Request{}
}
requests := make([]reconcile.Request, 0, len(list.Items))

for i := range list.Items {
ns := &list.Items[i]
mme, err := modelMeshEnabled2(context.TODO(), ns.Name, r.ControllerNamespace, r.Client, r.HasNamespaceAccess)
mme, err := modelMeshEnabled2(context.TODO(), ns.Name, r.ControllerNamespace, r.Client, r.ClusterScope)
if err == nil && mme {
requests = append(requests, reconcile.Request{NamespacedName: types.NamespacedName{
Namespace: ns.Name,
Expand Down
4 changes: 2 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ func main() {
Log: ctrl.Log.WithName("controllers").WithName("Service"),
Scheme: mgr.GetScheme(),
ControllerDeployment: types.NamespacedName{Namespace: ControllerNamespace, Name: controllerDeploymentName},
NamespaceOwned: clusterScopeMode,
ClusterScope: clusterScopeMode,
MMServices: mmServiceMap,
ModelEventStream: modelEventStream,
ConfigProvider: cp,
Expand Down Expand Up @@ -418,7 +418,7 @@ func main() {
ConfigMapName: types.NamespacedName{Namespace: ControllerNamespace, Name: UserConfigMapName},
ControllerNamespace: ControllerNamespace,
ControllerName: controllerDeploymentName,
HasNamespaceAccess: clusterScopeMode,
ClusterScope: clusterScopeMode,
EnableCSRWatch: enableCSRWatch,
RegistryMap: registryMap,
}).SetupWithManager(mgr, enableIsvcWatch, runtimeControllerEvents); err != nil {
Expand Down

0 comments on commit da45d91

Please sign in to comment.