From 57cd35ee606b03592b9076a02f725cfd47a8d160 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=94=A1=E6=9E=97=E5=B3=B010243534?= Date: Wed, 21 Sep 2022 10:43:09 +0800 Subject: [PATCH] make servicetopology filter in yurthub work properly when service or nodepool change --- .../templates/yurt-controller-manager.yaml | 34 ++ .../app/controllermanager.go | 24 +- cmd/yurt-controller-manager/app/core.go | 15 + .../servicetopology/adapter/adapter.go | 90 ++++++ .../adapter/endpoints_adapter.go | 86 +++++ .../adapter/endpointslicev1_adapter.go | 97 ++++++ .../adapter/endpointslicev1beta1_adapter.go | 98 ++++++ .../servicetopology/servicetopology.go | 304 ++++++++++++++++++ pkg/yurtctl/constants/constants.go | 34 ++ 9 files changed, 775 insertions(+), 7 deletions(-) create mode 100644 pkg/controller/servicetopology/adapter/adapter.go create mode 100644 pkg/controller/servicetopology/adapter/endpoints_adapter.go create mode 100644 pkg/controller/servicetopology/adapter/endpointslicev1_adapter.go create mode 100644 pkg/controller/servicetopology/adapter/endpointslicev1beta1_adapter.go create mode 100644 pkg/controller/servicetopology/servicetopology.go diff --git a/charts/openyurt/templates/yurt-controller-manager.yaml b/charts/openyurt/templates/yurt-controller-manager.yaml index 97098be8e92..9a81eb42e08 100644 --- a/charts/openyurt/templates/yurt-controller-manager.yaml +++ b/charts/openyurt/templates/yurt-controller-manager.yaml @@ -95,6 +95,40 @@ rules: - kubernetes.io/kubelet-serving verbs: - approve + - apiGroups: + - discovery.k8s.io + resources: + - endpointslices + verbs: + - get + - list + - watch + - patch + - apiGroups: + - "" + resources: + - endpoints + verbs: + - get + - list + - watch + - patch + - apiGroups: + - "apps.openyurt.io" + resources: + - nodepools + verbs: + - get + - list + - watch + - apiGroups: + - "" + resources: + - services + verbs: + - get + - list + - watch --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding diff --git a/cmd/yurt-controller-manager/app/controllermanager.go b/cmd/yurt-controller-manager/app/controllermanager.go index 9fea3d8ca9c..88c52cada33 100644 --- a/cmd/yurt-controller-manager/app/controllermanager.go +++ b/cmd/yurt-controller-manager/app/controllermanager.go @@ -54,6 +54,8 @@ import ( "github.com/openyurtio/openyurt/cmd/yurt-controller-manager/app/options" yurtctrlmgrconfig "github.com/openyurtio/openyurt/pkg/controller/apis/config" "github.com/openyurtio/openyurt/pkg/projectinfo" + yurtclientset "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/client/clientset/versioned" + yurtinformers "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/client/informers/externalversions" ) const ( @@ -204,6 +206,7 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error { } controllerContext.InformerFactory.Start(controllerContext.Stop) + controllerContext.YurtInformerFactory.Start(controllerContext.Stop) close(controllerContext.InformersStarted) select {} @@ -259,6 +262,9 @@ type ControllerContext struct { // InformerFactory gives access to informers for the controller. InformerFactory informers.SharedInformerFactory + // YurtInformerFactory gives access to yurt informers for the controller. + YurtInformerFactory yurtinformers.SharedInformerFactory + // ComponentConfig provides access to init options for a given controller ComponentConfig yurtctrlmgrconfig.YurtControllerManagerConfiguration @@ -306,6 +312,7 @@ func NewControllerInitializers() map[string]InitFunc { controllers["nodelifecycle"] = startNodeLifecycleController controllers["yurtcsrapprover"] = startYurtCSRApproverController controllers["daemonpodupdater"] = startDaemonPodUpdaterController + controllers["servicetopologycontroller"] = startServiceTopologyController return controllers } @@ -314,8 +321,10 @@ func NewControllerInitializers() map[string]InitFunc { // the shared-informers client and token controller. func CreateControllerContext(s *config.CompletedConfig, rootClientBuilder, clientBuilder clientbuilder.ControllerClientBuilder, stop <-chan struct{}) (ControllerContext, error) { versionedClient := rootClientBuilder.ClientOrDie("shared-informers") + kubeConfig := rootClientBuilder.ConfigOrDie("yurt-informers") + yurtClient := yurtclientset.NewForConfigOrDie(kubeConfig) sharedInformers := informers.NewSharedInformerFactory(versionedClient, ResyncPeriod(s)()) - + yurtInformers := yurtinformers.NewSharedInformerFactory(yurtClient, ResyncPeriod(s)()) // If apiserver is not running we should wait for some time and fail only then. This is particularly // important when we start apiserver and controller manager at the same time. if err := genericcontrollermanager.WaitForAPIServer(versionedClient, 10*time.Second); err != nil { @@ -323,12 +332,13 @@ func CreateControllerContext(s *config.CompletedConfig, rootClientBuilder, clien } ctx := ControllerContext{ - ClientBuilder: clientBuilder, - InformerFactory: sharedInformers, - ComponentConfig: s.ComponentConfig, - Stop: stop, - InformersStarted: make(chan struct{}), - ResyncPeriod: ResyncPeriod(s), + ClientBuilder: clientBuilder, + InformerFactory: sharedInformers, + YurtInformerFactory: yurtInformers, + ComponentConfig: s.ComponentConfig, + Stop: stop, + InformersStarted: make(chan struct{}), + ResyncPeriod: ResyncPeriod(s), } return ctx, nil } diff --git a/cmd/yurt-controller-manager/app/core.go b/cmd/yurt-controller-manager/app/core.go index fce440de10d..95728245cac 100644 --- a/cmd/yurt-controller-manager/app/core.go +++ b/cmd/yurt-controller-manager/app/core.go @@ -28,6 +28,7 @@ import ( "github.com/openyurtio/openyurt/pkg/controller/certificates" daemonpodupdater "github.com/openyurtio/openyurt/pkg/controller/daemonpodupdater" lifecyclecontroller "github.com/openyurtio/openyurt/pkg/controller/nodelifecycle" + "github.com/openyurtio/openyurt/pkg/controller/servicetopology" ) func startNodeLifecycleController(ctx ControllerContext) (http.Handler, bool, error) { @@ -76,6 +77,20 @@ func startDaemonPodUpdaterController(ctx ControllerContext) (http.Handler, bool, ) go daemonPodUpdaterCtrl.Run(2, ctx.Stop) + return nil, true, nil +} +func startServiceTopologyController(ctx ControllerContext) (http.Handler, bool, error) { + clientSet := ctx.ClientBuilder.ClientOrDie("yurt-servicetopology-controller") + + svcTopologyController, err := servicetopology.NewServiceTopologyController( + clientSet, + ctx.InformerFactory, + ctx.YurtInformerFactory, + ) + if err != nil { + return nil, false, err + } + go svcTopologyController.Run(ctx.Stop) return nil, true, nil } diff --git a/pkg/controller/servicetopology/adapter/adapter.go b/pkg/controller/servicetopology/adapter/adapter.go new file mode 100644 index 00000000000..be98f834047 --- /dev/null +++ b/pkg/controller/servicetopology/adapter/adapter.go @@ -0,0 +1,90 @@ +/* +Copyright 2022 The OpenYurt Authors. +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package adapter + +import ( + "fmt" + "time" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/tools/cache" + + "github.com/openyurtio/openyurt/pkg/yurthub/filter/servicetopology" +) + +type Adapter interface { + GetEnqueueKeysBySvc(svc *corev1.Service) []string + GetEnqueueKeysByNodePool(svcTopologyTypes map[string]string, allNpNodes sets.String) []string + UpdateTriggerAnnotations(namespace, name string) error +} + +func getSvcSelector(key, value string) labels.Selector { + return labels.SelectorFromSet( + map[string]string{ + key: value, + }, + ) +} + +func appendKeys(keys []string, obj interface{}) []string { + key, err := cache.MetaNamespaceKeyFunc(obj) + if err != nil { + runtime.HandleError(err) + return nil + } + keys = append(keys, key) + return keys +} + +func isNodePoolTypeSvc(namespace, name string, svcTopologyTypes map[string]string) bool { + svc := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: name, + }, + } + key, err := cache.MetaNamespaceKeyFunc(svc) + if err != nil { + runtime.HandleError(err) + return false + } + + return isNodePoolTypeTopology(svcTopologyTypes[key]) +} + +// TODO: if service topology need to support multi types, +// like openyurt.io/topologyKeys: "kubernetes.io/hostname, openyurt.io/nodepool, *". +// For simplicity,as long as the value of service topology annotation contains nodepool type, +// then this topology is recognized as nodepool type +func isNodePoolTypeTopology(topologyType string) bool { + if topologyType == servicetopology.AnnotationServiceTopologyValueNodePool { + return true + } + if topologyType == servicetopology.AnnotationServiceTopologyValueZone { + return true + } + return false +} + +func getUpdateTriggerPatch() []byte { + patch := fmt.Sprintf(`{"metadata":{"annotations": {"openyurt.io/update-trigger": "%d"}}}`, time.Now().Unix()) + return []byte(patch) +} diff --git a/pkg/controller/servicetopology/adapter/endpoints_adapter.go b/pkg/controller/servicetopology/adapter/endpoints_adapter.go new file mode 100644 index 00000000000..cbb44d71c89 --- /dev/null +++ b/pkg/controller/servicetopology/adapter/endpoints_adapter.go @@ -0,0 +1,86 @@ +/* +Copyright 2022 The OpenYurt Authors. +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package adapter + +import ( + "context" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/kubernetes" + corelisters "k8s.io/client-go/listers/core/v1" + "k8s.io/klog/v2" +) + +func NewEndpointsAdapter(client kubernetes.Interface, epLister corelisters.EndpointsLister) Adapter { + return &endpoints{ + client: client, + epLister: epLister, + } +} + +type endpoints struct { + client kubernetes.Interface + epLister corelisters.EndpointsLister +} + +func (s *endpoints) GetEnqueueKeysBySvc(svc *corev1.Service) []string { + var keys []string + return appendKeys(keys, svc) +} + +func (s *endpoints) GetEnqueueKeysByNodePool(svcTopologyTypes map[string]string, allNpNodes sets.String) []string { + var keys []string + endpointsList, err := s.epLister.List(labels.Everything()) + if err != nil { + klog.V(4).Infof("Error listing endpoints sets: %v", err) + return keys + } + + for _, ep := range endpointsList { + if !isNodePoolTypeSvc(ep.Namespace, ep.Name, svcTopologyTypes) { + continue + } + + if s.getNodesInEp(ep).Intersection(allNpNodes).Len() == 0 { + continue + } + keys = appendKeys(keys, ep) + } + return keys +} + +func (s *endpoints) getNodesInEp(ep *corev1.Endpoints) sets.String { + nodes := sets.NewString() + for _, subset := range ep.Subsets { + for _, addr := range subset.Addresses { + if addr.NodeName != nil { + nodes.Insert(*addr.NodeName) + } + } + } + return nodes +} + +func (s *endpoints) UpdateTriggerAnnotations(namespace, name string) error { + patch := getUpdateTriggerPatch() + _, err := s.client.CoreV1().Endpoints(namespace).Patch(context.Background(), name, types.StrategicMergePatchType, patch, metav1.PatchOptions{}) + return err +} diff --git a/pkg/controller/servicetopology/adapter/endpointslicev1_adapter.go b/pkg/controller/servicetopology/adapter/endpointslicev1_adapter.go new file mode 100644 index 00000000000..6f00c4c0e8a --- /dev/null +++ b/pkg/controller/servicetopology/adapter/endpointslicev1_adapter.go @@ -0,0 +1,97 @@ +/* +Copyright 2022 The OpenYurt Authors. +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package adapter + +import ( + "context" + + corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/kubernetes" + discorveryV1listers "k8s.io/client-go/listers/discovery/v1" + "k8s.io/klog/v2" +) + +func NewEndpointsV1Adapter(client kubernetes.Interface, epSliceLister discorveryV1listers.EndpointSliceLister) Adapter { + return &endpointslicev1{ + client: client, + epSliceLister: epSliceLister, + } +} + +type endpointslicev1 struct { + client kubernetes.Interface + epSliceLister discorveryV1listers.EndpointSliceLister +} + +func (s *endpointslicev1) GetEnqueueKeysBySvc(svc *corev1.Service) []string { + var keys []string + selector := getSvcSelector(discoveryv1.LabelServiceName, svc.Name) + epSliceList, err := s.epSliceLister.EndpointSlices(svc.Namespace).List(selector) + if err != nil { + klog.V(4).Infof("Error listing endpointslices sets: %v", err) + return keys + } + + for _, epSlice := range epSliceList { + keys = appendKeys(keys, epSlice) + } + return keys +} + +func (s *endpointslicev1) GetEnqueueKeysByNodePool(svcTopologyTypes map[string]string, allNpNodes sets.String) []string { + var keys []string + epSliceList, err := s.epSliceLister.List(labels.Everything()) + if err != nil { + klog.V(4).Infof("Error listing endpointslices sets: %v", err) + return keys + } + + for _, epSlice := range epSliceList { + svcNamespace := epSlice.Namespace + svcName := epSlice.Labels[discoveryv1.LabelServiceName] + if !isNodePoolTypeSvc(svcNamespace, svcName, svcTopologyTypes) { + continue + } + if s.getNodesInEpSlice(epSlice).Intersection(allNpNodes).Len() == 0 { + continue + } + keys = appendKeys(keys, epSlice) + } + + return keys +} + +func (s *endpointslicev1) getNodesInEpSlice(epSlice *discoveryv1.EndpointSlice) sets.String { + nodes := sets.NewString() + for _, ep := range epSlice.Endpoints { + if ep.NodeName != nil { + nodes.Insert(*ep.NodeName) + } + } + return nodes +} + +func (s *endpointslicev1) UpdateTriggerAnnotations(namespace, name string) error { + patch := getUpdateTriggerPatch() + _, err := s.client.DiscoveryV1().EndpointSlices(namespace).Patch(context.Background(), name, types.StrategicMergePatchType, patch, metav1.PatchOptions{}) + return err +} diff --git a/pkg/controller/servicetopology/adapter/endpointslicev1beta1_adapter.go b/pkg/controller/servicetopology/adapter/endpointslicev1beta1_adapter.go new file mode 100644 index 00000000000..c586b23fc82 --- /dev/null +++ b/pkg/controller/servicetopology/adapter/endpointslicev1beta1_adapter.go @@ -0,0 +1,98 @@ +/* +Copyright 2022 The OpenYurt Authors. +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package adapter + +import ( + "context" + + corev1 "k8s.io/api/core/v1" + discoveryv1beta1 "k8s.io/api/discovery/v1beta1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/kubernetes" + discorveryV1beta1listers "k8s.io/client-go/listers/discovery/v1beta1" + "k8s.io/klog/v2" +) + +func NewEndpointsV1beta1Adapter(client kubernetes.Interface, epSliceLister discorveryV1beta1listers.EndpointSliceLister) Adapter { + return &endpointslicev1beta1{ + client: client, + epSliceLister: epSliceLister, + } +} + +type endpointslicev1beta1 struct { + client kubernetes.Interface + epSliceLister discorveryV1beta1listers.EndpointSliceLister +} + +func (s *endpointslicev1beta1) GetEnqueueKeysBySvc(svc *corev1.Service) []string { + var keys []string + selector := getSvcSelector(discoveryv1beta1.LabelServiceName, svc.Name) + epSliceList, err := s.epSliceLister.EndpointSlices(svc.Namespace).List(selector) + if err != nil { + klog.V(4).Infof("Error listing endpointslices sets: %v", err) + return keys + } + + for _, epSlice := range epSliceList { + keys = appendKeys(keys, epSlice) + } + return keys +} + +func (s *endpointslicev1beta1) GetEnqueueKeysByNodePool(svcTopologyTypes map[string]string, allNpNodes sets.String) []string { + var keys []string + epSliceList, err := s.epSliceLister.List(labels.Everything()) + if err != nil { + klog.V(4).Infof("Error listing endpointslices sets: %v", err) + return keys + } + + for _, epSlice := range epSliceList { + svcNamespace := epSlice.Namespace + svcName := epSlice.Labels[discoveryv1beta1.LabelServiceName] + if !isNodePoolTypeSvc(svcNamespace, svcName, svcTopologyTypes) { + continue + } + if s.getNodesInEpSlice(epSlice).Intersection(allNpNodes).Len() == 0 { + continue + } + keys = appendKeys(keys, epSlice) + } + + return keys +} + +func (s *endpointslicev1beta1) getNodesInEpSlice(epSlice *discoveryv1beta1.EndpointSlice) sets.String { + nodes := sets.NewString() + for _, ep := range epSlice.Endpoints { + nodeName, ok := ep.Topology[corev1.LabelHostname] + if ok { + nodes.Insert(nodeName) + } + } + return nodes +} + +func (s *endpointslicev1beta1) UpdateTriggerAnnotations(namespace, name string) error { + patch := getUpdateTriggerPatch() + _, err := s.client.DiscoveryV1beta1().EndpointSlices(namespace).Patch(context.Background(), name, types.StrategicMergePatchType, patch, metav1.PatchOptions{}) + return err +} diff --git a/pkg/controller/servicetopology/servicetopology.go b/pkg/controller/servicetopology/servicetopology.go new file mode 100644 index 00000000000..774a4037588 --- /dev/null +++ b/pkg/controller/servicetopology/servicetopology.go @@ -0,0 +1,304 @@ +/* +Copyright 2022 The OpenYurt Authors. +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package servicetopology + +import ( + "context" + "fmt" + "time" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + corelisters "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + + "github.com/openyurtio/openyurt/pkg/controller/servicetopology/adapter" + "github.com/openyurtio/openyurt/pkg/yurthub/filter/servicetopology" + nodepoolv1alpha1 "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/apis/apps/v1alpha1" + yurtinformers "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/client/informers/externalversions" +) + +const ( + endpointsWorkerSize = 2 + endpointsliceWorkerSize = 4 +) + +type ServiceTopologyController struct { + endpointsQueue workqueue.RateLimitingInterface + endpointsliceQueue workqueue.RateLimitingInterface + serviceInformerSynced cache.InformerSynced + nodepoolInformerSynced cache.InformerSynced + endpointsAdapter adapter.Adapter + endpointsliceAdapter adapter.Adapter + svcLister corelisters.ServiceLister +} + +func NewServiceTopologyController(client kubernetes.Interface, + sharedInformers informers.SharedInformerFactory, + yurtInformers yurtinformers.SharedInformerFactory) (*ServiceTopologyController, error) { + epSliceAdapter, err := getEndpointSliceAdapter(client, sharedInformers) + if err != nil { + return nil, fmt.Errorf("get endpointslice adapter with error: %v", err) + } + endpointsLister := sharedInformers.Core().V1().Endpoints().Lister() + epAdapter := adapter.NewEndpointsAdapter(client, endpointsLister) + + epQueue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) + epSliceQueue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) + + serviceInformer := sharedInformers.Core().V1().Services() + nodepoolInformer := yurtInformers.Apps().V1alpha1().NodePools() + sc := &ServiceTopologyController{ + endpointsQueue: epQueue, + endpointsliceQueue: epSliceQueue, + serviceInformerSynced: serviceInformer.Informer().HasSynced, + nodepoolInformerSynced: nodepoolInformer.Informer().HasSynced, + svcLister: serviceInformer.Lister(), + endpointsAdapter: epAdapter, + endpointsliceAdapter: epSliceAdapter, + } + serviceInformer.Informer().AddEventHandler(sc.getServiceHandler()) + nodepoolInformer.Informer().AddEventHandler(sc.getNodepoolHandler()) + return sc, nil +} + +func (s *ServiceTopologyController) Run(stopCh <-chan struct{}) { + defer runtime.HandleCrash() + defer s.endpointsQueue.ShutDown() + defer s.endpointsliceQueue.ShutDown() + + klog.Info("starting the service topology controller") + defer klog.Info("stopping the service topology controller") + if !cache.WaitForCacheSync(stopCh, s.serviceInformerSynced, s.nodepoolInformerSynced) { + klog.Error("sync service topology controller timeout") + return + } + for i := 0; i < endpointsWorkerSize; i++ { + go wait.Until(s.runEndpointsWorker, time.Second, stopCh) + } + for i := 0; i < endpointsliceWorkerSize; i++ { + go wait.Until(s.runEndpointsliceWorker, time.Second, stopCh) + } + <-stopCh +} + +func (s *ServiceTopologyController) runEndpointsWorker() { + for s.processNextEndpoints() { + } +} + +func (s *ServiceTopologyController) runEndpointsliceWorker() { + for s.processNextEndpointslice() { + } +} + +func (s *ServiceTopologyController) getServiceHandler() cache.ResourceEventHandlerFuncs { + return cache.ResourceEventHandlerFuncs{ + UpdateFunc: func(old, new interface{}) { + oldSvc, ok := old.(*corev1.Service) + if !ok { + return + } + newSvc, ok := new.(*corev1.Service) + if !ok { + return + } + if s.isServiceTopologyTypeChanged(oldSvc, newSvc) { + s.enqueueEndpointsForSvc(newSvc) + s.enqueueEndpointsilceForSvc(newSvc) + } + }, + } +} + +func (s *ServiceTopologyController) getNodepoolHandler() cache.ResourceEventHandlerFuncs { + return cache.ResourceEventHandlerFuncs{ + DeleteFunc: func(obj interface{}) { + nodePool, ok := obj.(*nodepoolv1alpha1.NodePool) + if !ok { + return + } + klog.Infof("nodepool %s is deleted", nodePool.Name) + allNpNodes := sets.NewString(nodePool.Status.Nodes...) + svcTopologyTypes := s.getSvcTopologyTypes() + s.enqueueEndpointsForNodePool(svcTopologyTypes, allNpNodes, nodePool) + s.enqueueEndpointsliceForNodePool(svcTopologyTypes, allNpNodes, nodePool) + }, + UpdateFunc: func(old, new interface{}) { + oldNp, ok := old.(*nodepoolv1alpha1.NodePool) + if !ok { + return + } + newNp, ok := new.(*nodepoolv1alpha1.NodePool) + if !ok { + return + } + newNpNodes := sets.NewString(newNp.Status.Nodes...) + oldNpNodes := sets.NewString(oldNp.Status.Nodes...) + if newNpNodes.Equal(oldNpNodes) { + return + } + klog.Infof("the nodes record of nodepool %s is changed from %v to %v.", newNp.Name, oldNp.Status.Nodes, newNp.Status.Nodes) + allNpNodes := newNpNodes.Union(oldNpNodes) + svcTopologyTypes := s.getSvcTopologyTypes() + s.enqueueEndpointsForNodePool(svcTopologyTypes, allNpNodes, newNp) + s.enqueueEndpointsliceForNodePool(svcTopologyTypes, allNpNodes, newNp) + }, + } +} + +func (s *ServiceTopologyController) enqueueEndpointsForSvc(newSvc *corev1.Service) { + keys := s.endpointsAdapter.GetEnqueueKeysBySvc(newSvc) + klog.Infof("the topology configuration of svc %s/%s is changed, enqueue endpoints: %v", newSvc.Namespace, newSvc.Name, keys) + for _, key := range keys { + s.endpointsQueue.AddRateLimited(key) + } +} + +func (s *ServiceTopologyController) enqueueEndpointsilceForSvc(newSvc *corev1.Service) { + keys := s.endpointsliceAdapter.GetEnqueueKeysBySvc(newSvc) + klog.Infof("the topology configuration of svc %s/%s is changed, enqueue endpointslices: %v", newSvc.Namespace, newSvc.Name, keys) + for _, key := range keys { + s.endpointsliceQueue.AddRateLimited(key) + } +} + +func (s *ServiceTopologyController) enqueueEndpointsForNodePool(svcTopologyTypes map[string]string, allNpNodes sets.String, np *nodepoolv1alpha1.NodePool) { + keys := s.endpointsAdapter.GetEnqueueKeysByNodePool(svcTopologyTypes, allNpNodes) + klog.Infof("according to the change of the nodepool %s, enqueue endpoints: %v", np.Name, keys) + for _, key := range keys { + s.endpointsQueue.AddRateLimited(key) + } +} + +func (s *ServiceTopologyController) enqueueEndpointsliceForNodePool(svcTopologyTypes map[string]string, allNpNodes sets.String, np *nodepoolv1alpha1.NodePool) { + keys := s.endpointsliceAdapter.GetEnqueueKeysByNodePool(svcTopologyTypes, allNpNodes) + klog.Infof("according to the change of the nodepool %s, enqueue endpointslice: %v", np.Name, keys) + for _, key := range keys { + s.endpointsliceQueue.AddRateLimited(key) + } +} + +func (s *ServiceTopologyController) isServiceTopologyTypeChanged(oldSvc, newSvc *corev1.Service) bool { + oldType := oldSvc.Annotations[servicetopology.AnnotationServiceTopologyKey] + newType := newSvc.Annotations[servicetopology.AnnotationServiceTopologyKey] + if oldType == newType { + return false + } + return true +} + +func (s *ServiceTopologyController) getSvcTopologyTypes() map[string]string { + svcTopologyTypes := make(map[string]string) + svcList, err := s.svcLister.List(labels.Everything()) + if err != nil { + klog.V(4).Infof("Error listing service sets: %v", err) + return svcTopologyTypes + } + for _, svc := range svcList { + topologyType, ok := svc.Annotations[servicetopology.AnnotationServiceTopologyKey] + if !ok { + continue + } + + key, err := cache.MetaNamespaceKeyFunc(svc) + if err != nil { + runtime.HandleError(err) + continue + } + svcTopologyTypes[key] = topologyType + } + return svcTopologyTypes +} + +func (s *ServiceTopologyController) processNextEndpoints() bool { + key, quit := s.endpointsQueue.Get() + if quit { + return false + } + defer s.endpointsQueue.Done(key) + + klog.V(4).Infof("sync endpoints %s", key) + if err := s.syncEndpoints(key.(string)); err != nil { + s.endpointsQueue.AddRateLimited(key) + runtime.HandleError(fmt.Errorf("sync endpoints %v failed with : %v", key, err)) + return true + } + s.endpointsQueue.Forget(key) + return true +} + +func (s *ServiceTopologyController) syncEndpoints(key string) error { + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + return fmt.Errorf("split meta namespace key with error: %v", err) + } + + return s.endpointsAdapter.UpdateTriggerAnnotations(namespace, name) +} + +func (s *ServiceTopologyController) processNextEndpointslice() bool { + key, quit := s.endpointsliceQueue.Get() + if quit { + return false + } + defer s.endpointsliceQueue.Done(key) + + klog.V(4).Infof("sync endpointslice %s", key) + if err := s.syncEndpointslice(key.(string)); err != nil { + s.endpointsliceQueue.AddRateLimited(key) + runtime.HandleError(fmt.Errorf("sync endpointslice %v failed with : %v", key, err)) + return true + } + s.endpointsliceQueue.Forget(key) + return true +} + +func (s *ServiceTopologyController) syncEndpointslice(key string) error { + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + return fmt.Errorf("split meta namespace key with error: %v", err) + } + + return s.endpointsliceAdapter.UpdateTriggerAnnotations(namespace, name) +} + +func getEndpointSliceAdapter(client kubernetes.Interface, + sharedInformers informers.SharedInformerFactory) (adapter.Adapter, error) { + _, err := client.DiscoveryV1().EndpointSlices(corev1.NamespaceAll).List(context.TODO(), metav1.ListOptions{}) + + if err == nil { + klog.Infof("v1.EndpointSlice is supported.") + epSliceLister := sharedInformers.Discovery().V1().EndpointSlices().Lister() + return adapter.NewEndpointsV1Adapter(client, epSliceLister), nil + } + if errors.IsNotFound(err) { + klog.Infof("fall back to v1beta1.EndpointSlice.") + epSliceLister := sharedInformers.Discovery().V1beta1().EndpointSlices().Lister() + return adapter.NewEndpointsV1beta1Adapter(client, epSliceLister), nil + } + return nil, err +} diff --git a/pkg/yurtctl/constants/constants.go b/pkg/yurtctl/constants/constants.go index 99ec9624f80..1a49d7436c2 100644 --- a/pkg/yurtctl/constants/constants.go +++ b/pkg/yurtctl/constants/constants.go @@ -138,6 +138,40 @@ rules: - signers verbs: - approve +- apiGroups: + - discovery.k8s.io + resources: + - endpointslices + verbs: + - get + - list + - watch + - patch +- apiGroups: + - "" + resources: + - endpoints + verbs: + - get + - list + - watch + - patch +- apiGroups: + - "apps.openyurt.io" + resources: + - nodepools + verbs: + - get + - list + - watch +- apiGroups: + - "" + resources: + - services + verbs: + - get + - list + - watch ` YurtControllerManagerClusterRoleBinding = ` apiVersion: rbac.authorization.k8s.io/v1