From 476cfa4c48abdaf8a8fb7c90614f9b12a6a9d7f9 Mon Sep 17 00:00:00 2001 From: LaurenceLiZhixin <382673304@qq.com> Date: Sat, 14 Jan 2023 17:59:20 +0800 Subject: [PATCH 1/8] Fix: add PoolScopeResource validation and dynamic configuration support --- .../openyurt/templates/pool-coordinator.yaml | 2 +- pkg/yurthub/kubernetes/meta/restmapper.go | 2 +- .../poolcoordinator/constants/constants.go | 7 - pkg/yurthub/poolcoordinator/coordinator.go | 10 +- .../poolcoordinator/resources/resources.go | 167 ++++++++++++++++++ .../verifiable_pool_scope_resource.go | 20 +++ pkg/yurthub/proxy/util/util.go | 15 +- pkg/yurthub/storage/etcd/keycache.go | 3 +- pkg/yurthub/util/util.go | 5 +- 9 files changed, 205 insertions(+), 26 deletions(-) create mode 100644 pkg/yurthub/poolcoordinator/resources/resources.go create mode 100644 pkg/yurthub/poolcoordinator/resources/verifiable_pool_scope_resource.go diff --git a/charts/openyurt/templates/pool-coordinator.yaml b/charts/openyurt/templates/pool-coordinator.yaml index 82f6bc667b8..d3d635d01fd 100644 --- a/charts/openyurt/templates/pool-coordinator.yaml +++ b/charts/openyurt/templates/pool-coordinator.yaml @@ -139,7 +139,7 @@ spec: - --listen-client-urls=https://0.0.0.0:{{ .Values.poolCoordinator.etcdPort }} - --cert-file=/etc/kubernetes/pki/etcd-server.crt - --client-cert-auth=true - - --max-txn-ops=1000 + - --max-txn-ops=102400 - --data-dir=/var/lib/etcd - --max-request-bytes=100000000 - --key-file=/etc/kubernetes/pki/etcd-server.key diff --git a/pkg/yurthub/kubernetes/meta/restmapper.go b/pkg/yurthub/kubernetes/meta/restmapper.go index e3354466cae..7a2bf8102ed 100644 --- a/pkg/yurthub/kubernetes/meta/restmapper.go +++ b/pkg/yurthub/kubernetes/meta/restmapper.go @@ -77,7 +77,7 @@ func NewDefaultRESTMapperFromScheme() *meta.DefaultRESTMapper { } func NewRESTMapperManager(baseDir string) (*RESTMapperManager, error) { - var dm map[schema.GroupVersionResource]schema.GroupVersionKind + dm := 
make(map[schema.GroupVersionResource]schema.GroupVersionKind) cachedFilePath := filepath.Join(baseDir, CacheDynamicRESTMapperKey) // Recover the mapping relationship between GVR and GVK from the hard disk storage := fs.FileSystemOperator{} diff --git a/pkg/yurthub/poolcoordinator/constants/constants.go b/pkg/yurthub/poolcoordinator/constants/constants.go index 70c044bb3c9..43924dd50b5 100644 --- a/pkg/yurthub/poolcoordinator/constants/constants.go +++ b/pkg/yurthub/poolcoordinator/constants/constants.go @@ -17,17 +17,10 @@ limitations under the License. package constants import ( - "k8s.io/apimachinery/pkg/runtime/schema" - "github.com/openyurtio/openyurt/pkg/yurthub/storage" ) var ( - PoolScopedResources = map[schema.GroupVersionResource]struct{}{ - {Group: "", Version: "v1", Resource: "endpoints"}: {}, - {Group: "discovery.k8s.io", Version: "v1", Resource: "endpointslices"}: {}, - } - UploadResourcesKeyBuildInfo = map[storage.KeyBuildInfo]struct{}{ {Component: "kubelet", Resources: "pods", Group: "", Version: "v1"}: {}, {Component: "kubelet", Resources: "nodes", Group: "", Version: "v1"}: {}, diff --git a/pkg/yurthub/poolcoordinator/coordinator.go b/pkg/yurthub/poolcoordinator/coordinator.go index 9b31e04fc2b..083ba480ea2 100644 --- a/pkg/yurthub/poolcoordinator/coordinator.go +++ b/pkg/yurthub/poolcoordinator/coordinator.go @@ -20,6 +20,7 @@ import ( "context" "encoding/json" "fmt" + "github.com/openyurtio/openyurt/pkg/yurthub/poolcoordinator/resources" "strconv" "sync" "time" @@ -169,6 +170,10 @@ func NewCoordinator( if err != nil { return nil, fmt.Errorf("failed to create proxied client, %v", err) } + + // init pool scope resources + resources.InitPoolScopeResourcesManger(proxiedClient, cfg.SharedFactory) + poolScopedCacheSyncManager := &poolScopedCacheSyncManager{ ctx: ctx, proxiedClient: proxiedClient, @@ -185,6 +190,9 @@ func NewCoordinator( } func (coordinator *coordinator) Run() { + // waiting for pool scope resource synced + 
resources.WaitUntilPoolScopeResourcesSync(coordinator.ctx) + for { var poolCacheManager cachemanager.CacheManager var cancelEtcdStorage = func() {} @@ -427,7 +435,7 @@ func (p *poolScopedCacheSyncManager) EnsureStart() error { ctx, cancel := context.WithCancel(p.ctx) hasInformersSynced := []cache.InformerSynced{} informerFactory := informers.NewSharedInformerFactory(p.proxiedClient, 0) - for gvr := range constants.PoolScopedResources { + for _, gvr := range resources.GetPoolScopeResources() { klog.Infof("coordinator informer with resources gvr %+v registered", gvr) informer, err := informerFactory.ForResource(gvr) if err != nil { diff --git a/pkg/yurthub/poolcoordinator/resources/resources.go b/pkg/yurthub/poolcoordinator/resources/resources.go new file mode 100644 index 00000000000..042581f3fd1 --- /dev/null +++ b/pkg/yurthub/poolcoordinator/resources/resources.go @@ -0,0 +1,167 @@ +package resources + +import ( + "context" + "encoding/json" + "sync" + "time" + + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime/schema" + apirequest "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" + + "github.com/openyurtio/openyurt/pkg/yurthub/util" +) + +type PoolScopeResourcesManger struct { + factory informers.SharedInformerFactory + validPoolScopedResources map[string]*verifiablePoolScopeResource + validPoolScopedResourcesLock sync.RWMutex + k8sClient kubernetes.Interface + hasSynced func() bool +} + +var poolScopeResourcesManger *PoolScopeResourcesManger + +func InitPoolScopeResourcesManger(client kubernetes.Interface, factory informers.SharedInformerFactory) *PoolScopeResourcesManger { + poolScopeResourcesManger = &PoolScopeResourcesManger{ + factory: factory, + k8sClient: client, + validPoolScopedResources: make(map[string]*verifiablePoolScopeResource), + } + configmapInformer := 
factory.Core().V1().ConfigMaps().Informer() + configmapInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: poolScopeResourcesManger.addConfigmap, + // todo: now we do not support update of pool scope resources definition + }) + poolScopeResourcesManger.hasSynced = configmapInformer.HasSynced + + klog.Infof("init pool scope resources manager") + + poolScopeResourcesManger.setVerifiableGVRs(poolScopeResourcesManger.getInitPoolScopeResources()...) + return poolScopeResourcesManger +} + +func WaitUntilPoolScopeResourcesSync(ctx context.Context) { + cache.WaitForCacheSync(ctx.Done(), poolScopeResourcesManger.hasSynced) +} + +func IsPoolScopeResources(info *apirequest.RequestInfo) bool { + if info == nil || poolScopeResourcesManger == nil { + return false + } + _, ok := poolScopeResourcesManger.validPoolScopedResources[schema.GroupVersionResource{ + Group: info.APIGroup, + Version: info.APIVersion, + Resource: info.Resource, + }.String()] + return ok +} + +func GetPoolScopeResources() []schema.GroupVersionResource { + if poolScopeResourcesManger == nil { + return []schema.GroupVersionResource{} + } + return poolScopeResourcesManger.getPoolScopeResources() +} + +func (m *PoolScopeResourcesManger) getPoolScopeResources() []schema.GroupVersionResource { + poolScopeResources := make([]schema.GroupVersionResource, 0) + m.validPoolScopedResourcesLock.RLock() + defer m.validPoolScopedResourcesLock.RUnlock() + for _, v := range m.validPoolScopedResources { + poolScopeResources = append(poolScopeResources, v.GroupVersionResource) + } + return poolScopeResources +} + +// addVerifiableGVRs add given gvrs to validPoolScopedResources map +func (m *PoolScopeResourcesManger) addVerifiableGVRs(gvrs ...*verifiablePoolScopeResource) { + m.validPoolScopedResourcesLock.Lock() + defer m.validPoolScopedResourcesLock.Unlock() + for _, gvr := range gvrs { + if ok, errMsg := gvr.Verify(); ok { + m.validPoolScopedResources[gvr.String()] = gvr + 
klog.Infof("PoolScopeResourcesManger add gvr %s success", gvr.String()) + } else { + klog.Warningf("PoolScopeResourcesManger add gvr %s failed, because %s", gvr.String(), errMsg) + } + } +} + +// setVerifiableGVRs clears validPoolScopedResources and sets given gvrs to validPoolScopedResources map +func (m *PoolScopeResourcesManger) setVerifiableGVRs(gvrs ...*verifiablePoolScopeResource) { + m.validPoolScopedResourcesLock.Lock() + defer m.validPoolScopedResourcesLock.Unlock() + m.validPoolScopedResources = make(map[string]*verifiablePoolScopeResource) + for _, gvr := range gvrs { + if ok, errMsg := gvr.Verify(); ok { + m.validPoolScopedResources[gvr.String()] = gvr + klog.Infof("PoolScopeResourcesManger update gvr %s success", gvr.String()) + } else { + klog.Warningf("PoolScopeResourcesManger update gvr %s failed, because %s", gvr.String(), errMsg) + } + } +} + +func (m *PoolScopeResourcesManger) addConfigmap(obj interface{}) { + cfg, ok := obj.(*corev1.ConfigMap) + if !ok { + return + } + + poolScopeResources := cfg.Data[util.PoolScopeResourcesKey] + poolScopeResourcesGVRs := make([]schema.GroupVersionResource, 0) + verifiablePoolScopeResourcesGVRs := make([]*verifiablePoolScopeResource, 0) + if err := json.Unmarshal([]byte(poolScopeResources), &poolScopeResourcesGVRs); err != nil { + klog.Errorf("PoolScopeResourcesManger unmarshal poolScopeResources %s failed with error = %s", + poolScopeResourcesGVRs, err.Error()) + return + } + klog.Infof("PoolScopeResourcesManger add configured pool scope resources %v", poolScopeResourcesGVRs) + for _, v := range poolScopeResourcesGVRs { + verifiablePoolScopeResourcesGVRs = append(verifiablePoolScopeResourcesGVRs, + newVerifiablePoolScopeResource(v, m.getGroupVersionVerifyFunction(m.k8sClient))) + } + m.addVerifiableGVRs(verifiablePoolScopeResourcesGVRs...) 
+} + +func (m *PoolScopeResourcesManger) getGroupVersionVerifyFunction(client kubernetes.Interface) func(gvr schema.GroupVersionResource) (bool, string) { + return func(gvr schema.GroupVersionResource) (bool, string) { + maxRetry := 3 + duration := time.Second * 5 + counter := 0 + var err error + for counter <= maxRetry { + if _, err = client.Discovery().ServerResourcesForGroupVersion(gvr.GroupVersion().String()); err == nil { + return true, "" // gvr found + } + if apierrors.IsNotFound(err) { + return false, err.Error() // gvr not found + } + // unexpected error, retry + counter++ + time.Sleep(duration) + } + return false, err.Error() + } +} + +func (m *PoolScopeResourcesManger) getInitPoolScopeResources() []*verifiablePoolScopeResource { + return []*verifiablePoolScopeResource{ + newVerifiablePoolScopeResource( + schema.GroupVersionResource{Group: "", Version: "v1", Resource: "endpoints"}, + m.getGroupVersionVerifyFunction(m.k8sClient)), + newVerifiablePoolScopeResource( + schema.GroupVersionResource{Group: "discovery.k8s.io", Version: "v1", Resource: "endpointslices"}, + m.getGroupVersionVerifyFunction(m.k8sClient)), + newVerifiablePoolScopeResource( + schema.GroupVersionResource{Group: "discovery.k8s.io", Version: "v1beta1", Resource: "endpointslices"}, + m.getGroupVersionVerifyFunction(m.k8sClient)), + } +} diff --git a/pkg/yurthub/poolcoordinator/resources/verifiable_pool_scope_resource.go b/pkg/yurthub/poolcoordinator/resources/verifiable_pool_scope_resource.go new file mode 100644 index 00000000000..18ca63b5099 --- /dev/null +++ b/pkg/yurthub/poolcoordinator/resources/verifiable_pool_scope_resource.go @@ -0,0 +1,20 @@ +package resources + +import "k8s.io/apimachinery/pkg/runtime/schema" + +type verifiablePoolScopeResource struct { + schema.GroupVersionResource + checkFunction func(gvr schema.GroupVersionResource) (bool, string) +} + +func newVerifiablePoolScopeResource(gvr schema.GroupVersionResource, + checkFunction func(gvr schema.GroupVersionResource) 
(bool, string)) *verifiablePoolScopeResource { + return &verifiablePoolScopeResource{ + GroupVersionResource: gvr, + checkFunction: checkFunction, + } +} + +func (v *verifiablePoolScopeResource) Verify() (bool, string) { + return v.checkFunction(v.GroupVersionResource) +} diff --git a/pkg/yurthub/proxy/util/util.go b/pkg/yurthub/proxy/util/util.go index 76632db1605..a86486620b0 100644 --- a/pkg/yurthub/proxy/util/util.go +++ b/pkg/yurthub/proxy/util/util.go @@ -40,6 +40,7 @@ import ( "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer" "github.com/openyurtio/openyurt/pkg/yurthub/metrics" + "github.com/openyurtio/openyurt/pkg/yurthub/poolcoordinator/resources" "github.com/openyurtio/openyurt/pkg/yurthub/tenant" "github.com/openyurtio/openyurt/pkg/yurthub/util" ) @@ -181,7 +182,7 @@ func WithIfPoolScopedResource(handler http.Handler) http.Handler { ctx := req.Context() if info, ok := apirequest.RequestInfoFrom(ctx); ok { var ifPoolScopedResource bool - if info.IsResourceRequest && isPoolScopedResource(info) { + if info.IsResourceRequest && resources.IsPoolScopeResources(info) { ifPoolScopedResource = true } ctx = util.WithIfPoolScopedResource(ctx, ifPoolScopedResource) @@ -191,18 +192,6 @@ func WithIfPoolScopedResource(handler http.Handler) http.Handler { }) } -func isPoolScopedResource(info *apirequest.RequestInfo) bool { - if info != nil { - if info.APIGroup == "" && info.APIVersion == "v1" && info.Resource == "endpoints" { - return true - } - if info.APIGroup == "discovery.k8s.io" && info.APIVersion == "v1" && info.Resource == "endpointslices" { - return true - } - } - return false -} - type wrapperResponseWriter struct { http.ResponseWriter http.Flusher diff --git a/pkg/yurthub/storage/etcd/keycache.go b/pkg/yurthub/storage/etcd/keycache.go index df670ebca7e..4e1ceb3cdb8 100644 --- a/pkg/yurthub/storage/etcd/keycache.go +++ b/pkg/yurthub/storage/etcd/keycache.go @@ -20,6 +20,7 @@ import ( "bytes" "context" "fmt" + 
"github.com/openyurtio/openyurt/pkg/yurthub/poolcoordinator/resources" "strings" "sync" @@ -122,7 +123,7 @@ func (c *componentKeyCache) Recover() error { func (c *componentKeyCache) getPoolScopedKeyset() (*keySet, error) { keys := &keySet{m: map[storageKey]struct{}{}} - for gvr := range coordinatorconstants.PoolScopedResources { + for _, gvr := range resources.GetPoolScopeResources() { getCtx, cancel := context.WithTimeout(c.ctx, defaultTimeout) defer cancel() rootKey, err := c.keyFunc(storage.KeyBuildInfo{ diff --git a/pkg/yurthub/util/util.go b/pkg/yurthub/util/util.go index 93f404a343e..7dd29cb08d0 100644 --- a/pkg/yurthub/util/util.go +++ b/pkg/yurthub/util/util.go @@ -82,8 +82,9 @@ const ( // DefaultPoolCoordinatorAPIServerSvcPort represents default pool coordinator apiServer port DefaultPoolCoordinatorAPIServerSvcPort = "443" - YurtHubNamespace = "kube-system" - CacheUserAgentsKey = "cache_agents" + YurtHubNamespace = "kube-system" + CacheUserAgentsKey = "cache_agents" + PoolScopeResourcesKey = "pool_scope_resources" YurtHubProxyPort = "10261" YurtHubPort = "10267" From 9542627ab57c689cfb28b5ef997abfada80c7f67 Mon Sep 17 00:00:00 2001 From: LaurenceLiZhixin <382673304@qq.com> Date: Sat, 14 Jan 2023 18:04:25 +0800 Subject: [PATCH 2/8] Fix: format --- pkg/yurthub/poolcoordinator/coordinator.go | 2 +- pkg/yurthub/storage/etcd/keycache.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/yurthub/poolcoordinator/coordinator.go b/pkg/yurthub/poolcoordinator/coordinator.go index 083ba480ea2..22f110a42cf 100644 --- a/pkg/yurthub/poolcoordinator/coordinator.go +++ b/pkg/yurthub/poolcoordinator/coordinator.go @@ -20,7 +20,6 @@ import ( "context" "encoding/json" "fmt" - "github.com/openyurtio/openyurt/pkg/yurthub/poolcoordinator/resources" "strconv" "sync" "time" @@ -48,6 +47,7 @@ import ( "github.com/openyurtio/openyurt/pkg/yurthub/metrics" "github.com/openyurtio/openyurt/pkg/yurthub/poolcoordinator/certmanager" 
"github.com/openyurtio/openyurt/pkg/yurthub/poolcoordinator/constants" + "github.com/openyurtio/openyurt/pkg/yurthub/poolcoordinator/resources" "github.com/openyurtio/openyurt/pkg/yurthub/storage" "github.com/openyurtio/openyurt/pkg/yurthub/storage/etcd" "github.com/openyurtio/openyurt/pkg/yurthub/transport" diff --git a/pkg/yurthub/storage/etcd/keycache.go b/pkg/yurthub/storage/etcd/keycache.go index 4e1ceb3cdb8..57863948ead 100644 --- a/pkg/yurthub/storage/etcd/keycache.go +++ b/pkg/yurthub/storage/etcd/keycache.go @@ -20,13 +20,13 @@ import ( "bytes" "context" "fmt" - "github.com/openyurtio/openyurt/pkg/yurthub/poolcoordinator/resources" "strings" "sync" clientv3 "go.etcd.io/etcd/client/v3" coordinatorconstants "github.com/openyurtio/openyurt/pkg/yurthub/poolcoordinator/constants" + "github.com/openyurtio/openyurt/pkg/yurthub/poolcoordinator/resources" "github.com/openyurtio/openyurt/pkg/yurthub/storage" "github.com/openyurtio/openyurt/pkg/yurthub/util/fs" ) From 90fdcbe9db1b8ed9453459ad2f7ecde2b5246fa3 Mon Sep 17 00:00:00 2001 From: LaurenceLiZhixin <382673304@qq.com> Date: Sat, 14 Jan 2023 21:55:33 +0800 Subject: [PATCH 3/8] Fix: change proxy client to dynamic client to support crd resource in pool scope --- pkg/yurthub/poolcoordinator/coordinator.go | 39 ++++++++++++++++------ 1 file changed, 28 insertions(+), 11 deletions(-) diff --git a/pkg/yurthub/poolcoordinator/coordinator.go b/pkg/yurthub/poolcoordinator/coordinator.go index 22f110a42cf..ea4526d2abb 100644 --- a/pkg/yurthub/poolcoordinator/coordinator.go +++ b/pkg/yurthub/poolcoordinator/coordinator.go @@ -20,6 +20,7 @@ import ( "context" "encoding/json" "fmt" + "k8s.io/klog/v2" "strconv" "sync" "time" @@ -30,13 +31,14 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" 
coordclientset "k8s.io/client-go/kubernetes/typed/coordination/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" - "k8s.io/klog/v2" "github.com/openyurtio/openyurt/cmd/yurthub/app/config" "github.com/openyurtio/openyurt/pkg/yurthub/cachemanager" @@ -174,9 +176,14 @@ func NewCoordinator( // init pool scope resources resources.InitPoolScopeResourcesManger(proxiedClient, cfg.SharedFactory) + dynamicClient, err := buildDynamicClientWithUserAgent(fmt.Sprintf("http://%s", cfg.YurtHubProxyServerAddr), constants.DefaultPoolScopedUserAgent) + if err != nil { + return nil, fmt.Errorf("failed to create dynamic client, %v", err) + } + poolScopedCacheSyncManager := &poolScopedCacheSyncManager{ ctx: ctx, - proxiedClient: proxiedClient, + dynamicClient: dynamicClient, coordinatorClient: coordinatorClient, nodeName: cfg.NodeName, getEtcdStore: coordinator.getEtcdStore, @@ -406,8 +413,8 @@ func (coordinator *coordinator) delegateNodeLease(cloudLeaseClient coordclientse type poolScopedCacheSyncManager struct { ctx context.Context isRunning bool - // proxiedClient is a client of Cloud APIServer which is proxied by yurthub. - proxiedClient kubernetes.Interface + // dynamicClient is a dynamic client of Cloud APIServer which is proxied by yurthub. + dynamicClient dynamic.Interface // coordinatorClient is a client of APIServer in pool-coordinator. coordinatorClient kubernetes.Interface // nodeName will be used to update the ownerReference of informer synced lease. 
@@ -434,18 +441,14 @@ func (p *poolScopedCacheSyncManager) EnsureStart() error { ctx, cancel := context.WithCancel(p.ctx) hasInformersSynced := []cache.InformerSynced{} - informerFactory := informers.NewSharedInformerFactory(p.proxiedClient, 0) + dynamicInformerFactory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(p.dynamicClient, 0, metav1.NamespaceAll, nil) for _, gvr := range resources.GetPoolScopeResources() { klog.Infof("coordinator informer with resources gvr %+v registered", gvr) - informer, err := informerFactory.ForResource(gvr) - if err != nil { - cancel() - return fmt.Errorf("failed to add informer for %s, %v", gvr.String(), err) - } + informer := dynamicInformerFactory.ForResource(gvr) hasInformersSynced = append(hasInformersSynced, informer.Informer().HasSynced) } - informerFactory.Start(ctx.Done()) + dynamicInformerFactory.Start(ctx.Done()) go p.holdInformerSync(ctx, hasInformersSynced) p.cancel = cancel p.isRunning = true @@ -705,3 +708,17 @@ func buildProxiedClientWithUserAgent(proxyAddr string, userAgent string) (kubern } return client, nil } + +func buildDynamicClientWithUserAgent(proxyAddr string, userAgent string) (dynamic.Interface, error) { + kubeConfig, err := clientcmd.BuildConfigFromFlags(proxyAddr, "") + if err != nil { + return nil, err + } + + kubeConfig.UserAgent = userAgent + client, err := dynamic.NewForConfig(kubeConfig) + if err != nil { + return nil, err + } + return client, nil +} From 027d62374b63e6611117fa68366217bb43f9aa12 Mon Sep 17 00:00:00 2001 From: LaurenceLiZhixin <382673304@qq.com> Date: Sat, 14 Jan 2023 22:04:44 +0800 Subject: [PATCH 4/8] Fix --- pkg/yurthub/poolcoordinator/coordinator.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/yurthub/poolcoordinator/coordinator.go b/pkg/yurthub/poolcoordinator/coordinator.go index ea4526d2abb..55ed6826257 100644 --- a/pkg/yurthub/poolcoordinator/coordinator.go +++ b/pkg/yurthub/poolcoordinator/coordinator.go @@ -20,11 +20,12 @@ import ( 
"context" "encoding/json" "fmt" - "k8s.io/klog/v2" "strconv" "sync" "time" + "k8s.io/klog/v2" + coordinationv1 "k8s.io/api/coordination/v1" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" From 5be2fbd5acc336a456335a34d6b38394be1bb957 Mon Sep 17 00:00:00 2001 From: LaurenceLiZhixin <382673304@qq.com> Date: Mon, 16 Jan 2023 14:56:59 +0800 Subject: [PATCH 5/8] Fix: --- pkg/yurthub/proxy/pool/pool.go | 1 + pkg/yurthub/storage/etcd/storage.go | 1 + 2 files changed, 2 insertions(+) diff --git a/pkg/yurthub/proxy/pool/pool.go b/pkg/yurthub/proxy/pool/pool.go index 6080dc5da0e..869ed31a0b9 100644 --- a/pkg/yurthub/proxy/pool/pool.go +++ b/pkg/yurthub/proxy/pool/pool.go @@ -108,6 +108,7 @@ func (pp *PoolCoordinatorProxy) ServeHTTP(rw http.ResponseWriter, req *http.Requ util.Err(errors.NewBadRequest(fmt.Sprintf("pool-coordinator proxy cannot handle request(%s), cannot get requestInfo", hubutil.ReqString(req))), rw, req) return } + req.Header.Del("Authorization") // delete token with cloud apiServer RBAC and use yurthub authorization if reqInfo.IsResourceRequest { switch reqInfo.Verb { case "create": diff --git a/pkg/yurthub/storage/etcd/storage.go b/pkg/yurthub/storage/etcd/storage.go index 1e33a7de827..2aef0da789d 100644 --- a/pkg/yurthub/storage/etcd/storage.go +++ b/pkg/yurthub/storage/etcd/storage.go @@ -395,6 +395,7 @@ func (s *etcdStorage) ReplaceComponentList(component string, gvr schema.GroupVer oldKeyCache, loaded := s.localComponentKeyCache.LoadOrStore(component, newKeyCache) addedOrUpdated = newKeyCache.Difference(keySet{}) if loaded { + // FIXME: delete keys may cause unexpected problem deleted = oldKeyCache.Difference(newKeyCache) } From 66db812dcee6af909169b856c3d488dda1dbdfc4 Mon Sep 17 00:00:00 2001 From: LaurenceLiZhixin <382673304@qq.com> Date: Mon, 16 Jan 2023 14:59:46 +0800 Subject: [PATCH 6/8] Fix: format --- pkg/yurthub/poolcoordinator/coordinator.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git 
a/pkg/yurthub/poolcoordinator/coordinator.go b/pkg/yurthub/poolcoordinator/coordinator.go index 55ed6826257..15c269ebccf 100644 --- a/pkg/yurthub/poolcoordinator/coordinator.go +++ b/pkg/yurthub/poolcoordinator/coordinator.go @@ -24,8 +24,6 @@ import ( "sync" "time" - "k8s.io/klog/v2" - coordinationv1 "k8s.io/api/coordination/v1" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -40,6 +38,7 @@ import ( "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" + "k8s.io/klog/v2" "github.com/openyurtio/openyurt/cmd/yurthub/app/config" "github.com/openyurtio/openyurt/pkg/yurthub/cachemanager" From 2a29a65645c9ab209066cb15312a95dd9f01ef8b Mon Sep 17 00:00:00 2001 From: LaurenceLiZhixin <382673304@qq.com> Date: Mon, 16 Jan 2023 15:04:03 +0800 Subject: [PATCH 7/8] Fix --- pkg/yurthub/poolcoordinator/coordinator.go | 2 +- pkg/yurthub/poolcoordinator/resources/resources.go | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/pkg/yurthub/poolcoordinator/coordinator.go b/pkg/yurthub/poolcoordinator/coordinator.go index 15c269ebccf..732d156ff1b 100644 --- a/pkg/yurthub/poolcoordinator/coordinator.go +++ b/pkg/yurthub/poolcoordinator/coordinator.go @@ -174,7 +174,7 @@ func NewCoordinator( } // init pool scope resources - resources.InitPoolScopeResourcesManger(proxiedClient, cfg.SharedFactory) + resources.InitPoolScopeResourcesManger(proxiedClient) dynamicClient, err := buildDynamicClientWithUserAgent(fmt.Sprintf("http://%s", cfg.YurtHubProxyServerAddr), constants.DefaultPoolScopedUserAgent) if err != nil { diff --git a/pkg/yurthub/poolcoordinator/resources/resources.go b/pkg/yurthub/poolcoordinator/resources/resources.go index 042581f3fd1..6951abb6e55 100644 --- a/pkg/yurthub/poolcoordinator/resources/resources.go +++ b/pkg/yurthub/poolcoordinator/resources/resources.go @@ -28,9 +28,8 @@ type PoolScopeResourcesManger struct { var poolScopeResourcesManger *PoolScopeResourcesManger -func 
InitPoolScopeResourcesManger(client kubernetes.Interface, factory informers.SharedInformerFactory) *PoolScopeResourcesManger { +func InitPoolScopeResourcesManger(client kubernetes.Interface) *PoolScopeResourcesManger { poolScopeResourcesManger = &PoolScopeResourcesManger{ - factory: factory, k8sClient: client, validPoolScopedResources: make(map[string]*verifiablePoolScopeResource), } From e119c299d9800242808ffeaaf03a206ba6142089 Mon Sep 17 00:00:00 2001 From: LaurenceLiZhixin <382673304@qq.com> Date: Mon, 16 Jan 2023 15:10:26 +0800 Subject: [PATCH 8/8] Fix: linter --- pkg/yurthub/poolcoordinator/coordinator.go | 2 +- pkg/yurthub/poolcoordinator/resources/resources.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/yurthub/poolcoordinator/coordinator.go b/pkg/yurthub/poolcoordinator/coordinator.go index 732d156ff1b..15c269ebccf 100644 --- a/pkg/yurthub/poolcoordinator/coordinator.go +++ b/pkg/yurthub/poolcoordinator/coordinator.go @@ -174,7 +174,7 @@ func NewCoordinator( } // init pool scope resources - resources.InitPoolScopeResourcesManger(proxiedClient) + resources.InitPoolScopeResourcesManger(proxiedClient, cfg.SharedFactory) dynamicClient, err := buildDynamicClientWithUserAgent(fmt.Sprintf("http://%s", cfg.YurtHubProxyServerAddr), constants.DefaultPoolScopedUserAgent) if err != nil { diff --git a/pkg/yurthub/poolcoordinator/resources/resources.go b/pkg/yurthub/poolcoordinator/resources/resources.go index 6951abb6e55..6826a31ae0b 100644 --- a/pkg/yurthub/poolcoordinator/resources/resources.go +++ b/pkg/yurthub/poolcoordinator/resources/resources.go @@ -28,7 +28,7 @@ type PoolScopeResourcesManger struct { var poolScopeResourcesManger *PoolScopeResourcesManger -func InitPoolScopeResourcesManger(client kubernetes.Interface) *PoolScopeResourcesManger { +func InitPoolScopeResourcesManger(client kubernetes.Interface, factory informers.SharedInformerFactory) *PoolScopeResourcesManger { poolScopeResourcesManger = &PoolScopeResourcesManger{ 
k8sClient: client, validPoolScopedResources: make(map[string]*verifiablePoolScopeResource),