From aa4964f14f6a7597622d179b2d8236ac51ef8c51 Mon Sep 17 00:00:00 2001 From: LaurenceLiZhixin <382673304@qq.com> Date: Fri, 6 Jan 2023 17:42:40 +0800 Subject: [PATCH 1/5] Fix: pool-coordinator --- cmd/yurthub/app/config/config.go | 1 - cmd/yurthub/app/start.go | 53 ++++++------ pkg/yurthub/certificate/hubself/cert_mgr.go | 6 +- .../certificate/hubself/cert_mgr_test.go | 4 +- pkg/yurthub/filter/approver_test.go | 1 + pkg/yurthub/poolcoordinator/coordinator.go | 27 +++---- .../poolcoordinator/leader_election.go | 4 +- pkg/yurthub/proxy/pool/pool.go | 44 ++++++---- pkg/yurthub/proxy/proxy.go | 68 +++++++++------- pkg/yurthub/proxy/remote/loadbalancer.go | 46 +++++++---- pkg/yurthub/storage/etcd/keycache.go | 81 ++++++++++--------- 11 files changed, 188 insertions(+), 147 deletions(-) diff --git a/cmd/yurthub/app/config/config.go b/cmd/yurthub/app/config/config.go index a5bb966f326..8e2f0b325da 100644 --- a/cmd/yurthub/app/config/config.go +++ b/cmd/yurthub/app/config/config.go @@ -100,7 +100,6 @@ type YurtHubConfiguration struct { CoordinatorServerURL *url.URL CoordinatorStoragePrefix string CoordinatorStorageAddr string // ip:port - CoordinatorClient kubernetes.Interface LeaderElection componentbaseconfig.LeaderElectionConfiguration } diff --git a/cmd/yurthub/app/start.go b/cmd/yurthub/app/start.go index 5028c44b2f9..30f7e8c8dc3 100644 --- a/cmd/yurthub/app/start.go +++ b/cmd/yurthub/app/start.go @@ -19,6 +19,7 @@ package app import ( "context" "fmt" + "k8s.io/klog/v2" "net/url" "path/filepath" "time" @@ -28,7 +29,6 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" - "k8s.io/klog/v2" "github.com/openyurtio/openyurt/cmd/yurthub/app/config" "github.com/openyurtio/openyurt/cmd/yurthub/app/options" @@ -180,30 +180,28 @@ func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error { tenantMgr := tenant.New(cfg.YurtHubCertOrganizations, cfg.SharedFactory, ctx.Done()) trace++ - var coordinator poolcoordinator.Coordinator = &poolcoordinator.FakeCoordinator{} - var coordinatorHealthChecker healthchecker.HealthChecker = healthchecker.NewFakeChecker(false, make(map[string]int)) - var coordinatorTransportManager transport.Interface = nil - var waittingForCoordinator func() (healthchecker.HealthChecker, transport.Interface, poolcoordinator.Coordinator, error) = nil + var coordinatorGetter func() poolcoordinator.Coordinator + var coordinatorHealthCheckerGetter func() healthchecker.HealthChecker + var coordinatorTransportManagerGetter func() transport.Interface = nil + if cfg.EnableCoordinator { klog.Infof("%d. start to run coordinator", trace) // coordinatorRun will register secret informer into sharedInformerFactory, and start a new goroutine to periodically check // if certs has been got from cloud APIServer. - waittingForCoordinator = coordinatorRun(ctx, cfg, restConfigMgr, cloudHealthChecker) trace++ - } - - // Start the informer factory if all informers have been registered - cfg.SharedFactory.Start(ctx.Done()) - cfg.YurtSharedFactory.Start(ctx.Done()) - - if waittingForCoordinator != nil { // Waitting for the coordinator to run, before using it to create other components. 
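// Editor's note — an illustrative sketch, not part of the patch. The hunk above replaces the
// concrete coordinator, health-checker and transport values with getter closures, because the
// coordinator is now built asynchronously and may not exist yet when the proxy chain is wired
// up. The names sketch, component, buildComponent and runAsync below are assumptions made
// purely for illustration.
package sketch

import "sync/atomic"

type component struct{ ready bool }

func buildComponent() *component { return &component{ready: true} }

// runAsync starts initialization in the background and returns a getter closure.
// Callers must tolerate a nil result until the goroutine has stored a value, exactly
// as the proxy code in this patch nil-checks coordinatorGetter() before using it.
func runAsync() func() *component {
	var c atomic.Pointer[component]
	go func() { c.Store(buildComponent()) }()
	return func() *component { return c.Load() }
}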
- coordinatorHealthChecker, coordinatorTransportManager, coordinator, err = waittingForCoordinator() + coordinatorHealthCheckerGetter, coordinatorTransportManagerGetter, coordinatorGetter, err = coordinatorRun(ctx, cfg, restConfigMgr, cloudHealthChecker) if err != nil { return fmt.Errorf("failed to wait for coordinator to run, %v", err) } } + // wait for async coordinator informer registry + time.Sleep(time.Second * 5) + // Start the informer factory if all informers have been registered + cfg.SharedFactory.Start(ctx.Done()) + cfg.YurtSharedFactory.Start(ctx.Done()) + klog.Infof("%d. new reverse proxy handler for remote servers", trace) yurtProxyHandler, err := proxy.NewYurtReverseProxyHandler( cfg, @@ -211,9 +209,9 @@ func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error { transportManager, cloudHealthChecker, tenantMgr, - coordinator, - coordinatorTransportManager, - coordinatorHealthChecker, + coordinatorGetter, + coordinatorTransportManagerGetter, + coordinatorHealthCheckerGetter, ctx.Done()) if err != nil { return fmt.Errorf("could not create reverse proxy handler, %w", err) @@ -270,7 +268,7 @@ func createClients(heartbeatTimeoutSeconds int, remoteServers []*url.URL, coordi func coordinatorRun(ctx context.Context, cfg *config.YurtHubConfiguration, restConfigMgr *hubrest.RestConfigManager, - cloudHealthChecker healthchecker.MultipleBackendsHealthChecker) (waittingForReady func() (healthchecker.HealthChecker, transport.Interface, poolcoordinator.Coordinator, error)) { + cloudHealthChecker healthchecker.MultipleBackendsHealthChecker) (func() healthchecker.HealthChecker, func() transport.Interface, func() poolcoordinator.Coordinator, error) { var coordinatorHealthChecker healthchecker.HealthChecker var coordinatorTransportMgr transport.Interface var coordinator poolcoordinator.Coordinator @@ -297,7 +295,7 @@ func coordinatorRun(ctx context.Context, coordinatorClient, err := kubernetes.NewForConfig(&rest.Config{ Host: cfg.CoordinatorServerURL.String(), Transport: coorTransportMgr.CurrentTransport(), - Timeout: time.Duration(cfg.HeartbeatTimeoutSeconds), + Timeout: time.Duration(cfg.HeartbeatTimeoutSeconds) * time.Second, }) if err != nil { returnErr = fmt.Errorf("failed to get coordinator client for pool coordinator, %v", err) @@ -316,14 +314,14 @@ func coordinatorRun(ctx context.Context, returnErr = fmt.Errorf("failed to create hub elector, %v", err) return } - elector.Run(ctx.Done()) + go elector.Run(ctx.Done()) coor, err := poolcoordinator.NewCoordinator(ctx, cfg, restConfigMgr, coorCertManager, coorTransportMgr, elector) if err != nil { returnErr = fmt.Errorf("failed to create coordinator, %v", err) return } - coor.Run() + go coor.Run() coordinatorTransportMgr = coorTransportMgr coordinatorHealthChecker = coorHealthChecker @@ -331,12 +329,13 @@ func coordinatorRun(ctx context.Context, returnErr = nil }() - waittingForReady = func() (healthchecker.HealthChecker, transport.Interface, poolcoordinator.Coordinator, error) { - <-readyCh - return coordinatorHealthChecker, coordinatorTransportMgr, coordinator, returnErr - } - - return waittingForReady + return func() healthchecker.HealthChecker { + return coordinatorHealthChecker + }, func() transport.Interface { + return coordinatorTransportMgr + }, func() poolcoordinator.Coordinator { + return coordinator + }, returnErr } func poolCoordinatorTransportMgrGetter(heartbeatTimeoutSeconds int, coordinatorServer *url.URL, coordinatorCertMgr *coordinatorcertmgr.CertManager, stopCh <-chan struct{}) (transport.Interface, error) { diff 
--git a/pkg/yurthub/certificate/hubself/cert_mgr.go b/pkg/yurthub/certificate/hubself/cert_mgr.go index 805ac18bf1e..c173155f76c 100644 --- a/pkg/yurthub/certificate/hubself/cert_mgr.go +++ b/pkg/yurthub/certificate/hubself/cert_mgr.go @@ -251,7 +251,7 @@ func (ycm *yurtHubCertManager) initCaCert() error { klog.Infof("%s file not exists, so create it", caFile) } - insecureRestConfig, err := createInsecureRestClientConfig(ycm.remoteServers[0]) + insecureRestConfig, err := CreateInsecureRestClientConfig(ycm.remoteServers[0]) if err != nil { klog.Errorf("could not create insecure rest config, %v", err) return err @@ -524,8 +524,8 @@ func createBasic(apiServerAddr string, caCert []byte) *clientcmdapi.Config { } } -// createInsecureRestClientConfig create insecure rest client config. -func createInsecureRestClientConfig(remoteServer *url.URL) (*restclient.Config, error) { +// CreateInsecureRestClientConfig create insecure rest client config. +func CreateInsecureRestClientConfig(remoteServer *url.URL) (*restclient.Config, error) { if remoteServer == nil { return nil, fmt.Errorf("no healthy remote server") } diff --git a/pkg/yurthub/certificate/hubself/cert_mgr_test.go b/pkg/yurthub/certificate/hubself/cert_mgr_test.go index b7d415043c2..80e202498e6 100644 --- a/pkg/yurthub/certificate/hubself/cert_mgr_test.go +++ b/pkg/yurthub/certificate/hubself/cert_mgr_test.go @@ -147,9 +147,9 @@ func Test_createInsecureRestClientConfig(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, err := createInsecureRestClientConfig(tt.args.remoteServer) + _, err := CreateInsecureRestClientConfig(tt.args.remoteServer) if (err != nil) != tt.wantErr { - t.Errorf("createInsecureRestClientConfig() error = %v, wantErr %v", err, tt.wantErr) + t.Errorf("CreateInsecureRestClientConfig() error = %v, wantErr %v", err, tt.wantErr) return } }) diff --git a/pkg/yurthub/filter/approver_test.go b/pkg/yurthub/filter/approver_test.go index d65554a6a38..2c58751a887 100644 --- a/pkg/yurthub/filter/approver_test.go +++ b/pkg/yurthub/filter/approver_test.go @@ -43,6 +43,7 @@ var supportedResourceAndVerbsForFilter = map[string]map[string]sets.String{ }, ServiceTopologyFilterName: { "endpoints": sets.NewString("list", "watch"), + "pods": sets.NewString("list", "watch"), "endpointslices": sets.NewString("list", "watch"), }, } diff --git a/pkg/yurthub/poolcoordinator/coordinator.go b/pkg/yurthub/poolcoordinator/coordinator.go index 4cebbe9d7a5..dbb1901c9b4 100644 --- a/pkg/yurthub/poolcoordinator/coordinator.go +++ b/pkg/yurthub/poolcoordinator/coordinator.go @@ -161,7 +161,7 @@ func NewCoordinator( poolScopedCacheSyncManager := &poolScopedCacheSyncManager{ ctx: ctx, proxiedClient: proxiedClient, - coordinatorClient: cfg.CoordinatorClient, + coordinatorClient: coordinatorClient, nodeName: cfg.NodeName, getEtcdStore: coordinator.getEtcdStore, } @@ -175,12 +175,10 @@ func NewCoordinator( func (coordinator *coordinator) Run() { for { - var poolCacheManager cachemanager.CacheManager - var cancelEtcdStorage func() + var cancelEtcdStorage = func() {} var needUploadLocalCache bool var needCancelEtcdStorage bool var isPoolCacheSynced bool - var etcdStorage storage.Store var err error select { @@ -203,10 +201,9 @@ func (coordinator *coordinator) Run() { needUploadLocalCache = true needCancelEtcdStorage = true isPoolCacheSynced = false - etcdStorage = nil - poolCacheManager = nil + coordinator.poolCacheManager = nil case LeaderHub: - poolCacheManager, etcdStorage, cancelEtcdStorage, err = 
coordinator.buildPoolCacheStore() + coordinator.poolCacheManager, coordinator.etcdStorage, cancelEtcdStorage, err = coordinator.buildPoolCacheStore() if err != nil { klog.Errorf("failed to create pool scoped cache store and manager, %v", err) continue @@ -249,14 +246,14 @@ func (coordinator *coordinator) Run() { }) if coordinator.needUploadLocalCache { - if err := coordinator.uploadLocalCache(etcdStorage); err != nil { + if err := coordinator.uploadLocalCache(coordinator.etcdStorage); err != nil { klog.Errorf("failed to upload local cache when yurthub becomes leader, %v", err) } else { needUploadLocalCache = false } } case FollowerHub: - poolCacheManager, etcdStorage, cancelEtcdStorage, err = coordinator.buildPoolCacheStore() + coordinator.poolCacheManager, coordinator.etcdStorage, cancelEtcdStorage, err = coordinator.buildPoolCacheStore() if err != nil { klog.Errorf("failed to create pool scoped cache store and manager, %v", err) continue @@ -280,7 +277,7 @@ func (coordinator *coordinator) Run() { }) if coordinator.needUploadLocalCache { - if err := coordinator.uploadLocalCache(etcdStorage); err != nil { + if err := coordinator.uploadLocalCache(coordinator.etcdStorage); err != nil { klog.Errorf("failed to upload local cache when yurthub becomes follower, %v", err) } else { needUploadLocalCache = false @@ -293,11 +290,9 @@ func (coordinator *coordinator) Run() { // Because the caller of IsReady() may be concurrent. coordinator.Lock() if needCancelEtcdStorage { - coordinator.cancelEtcdStorage() + cancelEtcdStorage() } coordinator.electStatus = electorStatus - coordinator.poolCacheManager = poolCacheManager - coordinator.etcdStorage = etcdStorage coordinator.cancelEtcdStorage = cancelEtcdStorage coordinator.needUploadLocalCache = needUploadLocalCache coordinator.isPoolCacheSynced = isPoolCacheSynced @@ -316,7 +311,8 @@ func (coordinator *coordinator) IsReady() (cachemanager.CacheManager, bool) { // If electStatus is not PendingHub, it means pool-coordinator is healthy. 
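// Editor's note — an illustrative sketch, not part of the patch. The changes above move the
// pool cache manager and etcd store from loop-local variables onto the coordinator struct,
// published under its mutex, so that concurrent IsReady()/IsHealthy() callers observe the
// latest state. The names poolState and cacheManager below are assumptions for illustration.
package sketch

import "sync"

type cacheManager struct{}

type poolState struct {
	sync.Mutex
	cacheMgr        *cacheManager
	needUploadCache bool
}

// run mimics the election loop: results are published in one place, under the lock.
func (p *poolState) run(updates <-chan *cacheManager) {
	for mgr := range updates {
		p.Lock()
		p.cacheMgr = mgr
		p.needUploadCache = false
		p.Unlock()
	}
}

// isReady mirrors coordinator.IsReady(): it hands out the manager only when the published
// state says it is usable (the patch deliberately leaves isPoolCacheSynced out for now).
func (p *poolState) isReady() (*cacheManager, bool) {
	p.Lock()
	defer p.Unlock()
	if p.cacheMgr != nil && !p.needUploadCache {
		return p.cacheMgr, true
	}
	return nil, false
}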
coordinator.Lock() defer coordinator.Unlock() - if coordinator.electStatus != PendingHub && coordinator.isPoolCacheSynced && !coordinator.needUploadLocalCache { + // fixme: coordinator.isPoolCacheSynced now is not considered + if coordinator.electStatus != PendingHub && !coordinator.needUploadLocalCache { return coordinator.poolCacheManager, true } return nil, false @@ -430,7 +426,8 @@ type poolScopedCacheSyncManager struct { func (p *poolScopedCacheSyncManager) EnsureStart() error { if !p.isRunning { - if err := p.coordinatorClient.CoordinationV1().Leases(namespaceInformerLease).Delete(p.ctx, nameInformerLease, metav1.DeleteOptions{}); err != nil { + err := p.coordinatorClient.CoordinationV1().Leases(namespaceInformerLease).Delete(p.ctx, nameInformerLease, metav1.DeleteOptions{}) + if err != nil && !apierrors.IsNotFound(err) { return fmt.Errorf("failed to delete informer sync lease, %v", err) } diff --git a/pkg/yurthub/poolcoordinator/leader_election.go b/pkg/yurthub/poolcoordinator/leader_election.go index 565caf4c9ac..ffafe9a39bf 100644 --- a/pkg/yurthub/poolcoordinator/leader_election.go +++ b/pkg/yurthub/poolcoordinator/leader_election.go @@ -55,7 +55,7 @@ func NewHubElector( coordinatorClient: coordinatorClient, coordinatorHealthChecker: coordinatorHealthChecker, cloudAPIServerHealthChecker: cloudAPIServerHealthyChecker, - electorStatus: make(chan int32), + electorStatus: make(chan int32, 1), } rl, err := resourcelock.New(cfg.LeaderElection.ResourceLock, @@ -75,7 +75,7 @@ func NewHubElector( RetryPeriod: cfg.LeaderElection.RetryPeriod.Duration, Callbacks: leaderelection.LeaderCallbacks{ OnStartedLeading: func(ctx context.Context) { - klog.Infof("yurthub of %s became lease", cfg.NodeName) + klog.Infof("yurthub of %s became leader", cfg.NodeName) he.electorStatus <- LeaderHub }, OnStoppedLeading: func() { diff --git a/pkg/yurthub/proxy/pool/pool.go b/pkg/yurthub/proxy/pool/pool.go index e81eec4129f..057175ece67 100644 --- a/pkg/yurthub/proxy/pool/pool.go +++ b/pkg/yurthub/proxy/pool/pool.go @@ -51,7 +51,7 @@ type PoolCoordinatorProxy struct { func NewPoolCoordinatorProxy( poolCoordinatorAddr *url.URL, localCacheMgr cachemanager.CacheManager, - transportMgr transport.Interface, + transportMgrGetter func() transport.Interface, filterMgr *manager.Manager, isCoordinatorReady func() bool, stopCh <-chan struct{}) (*PoolCoordinatorProxy, error) { @@ -66,17 +66,33 @@ func NewPoolCoordinatorProxy( stopCh: stopCh, } - proxy, err := util.NewRemoteProxy( - poolCoordinatorAddr, - pp.modifyResponse, - pp.errorHandler, - transportMgr, - stopCh) - if err != nil { - return nil, fmt.Errorf("failed to create remote proxy for pool-coordinator, %v", err) - } + go func() { + ticker := time.NewTicker(time.Second * 5) + for { + select { + case <-ticker.C: + transportMgr := transportMgrGetter() + if transportMgr == nil { + break + } + proxy, err := util.NewRemoteProxy( + poolCoordinatorAddr, + pp.modifyResponse, + pp.errorHandler, + transportMgr, + stopCh) + if err != nil { + klog.Errorf("failed to create remote proxy for pool-coordinator, %v", err) + return + } + + pp.poolCoordinatorProxy = proxy + klog.Infof("create remote proxy for pool-coordinator success") + return + } + } + }() - pp.poolCoordinatorProxy = proxy return pp, nil } @@ -117,7 +133,7 @@ func (pp *PoolCoordinatorProxy) poolPost(rw http.ResponseWriter, req *http.Reque ctx := req.Context() info, _ := apirequest.RequestInfoFrom(ctx) klog.V(4).Infof("pool handle post, req=%s, reqInfo=%s", hubutil.ReqString(req), hubutil.ReqInfoString(info)) - if 
util.IsSubjectAccessReviewCreateGetRequest(req) || util.IsEventCreateRequest(req) { + if (util.IsSubjectAccessReviewCreateGetRequest(req) || util.IsEventCreateRequest(req)) && pp.poolCoordinatorProxy != nil { // kubelet needs to create subjectaccessreviews for auth pp.poolCoordinatorProxy.ServeHTTP(rw, req) return nil @@ -127,7 +143,7 @@ func (pp *PoolCoordinatorProxy) poolPost(rw http.ResponseWriter, req *http.Reque } func (pp *PoolCoordinatorProxy) poolQuery(rw http.ResponseWriter, req *http.Request) error { - if util.IsPoolScopedResouceListWatchRequest(req) || util.IsSubjectAccessReviewCreateGetRequest(req) { + if (util.IsPoolScopedResouceListWatchRequest(req) || util.IsSubjectAccessReviewCreateGetRequest(req)) && pp.poolCoordinatorProxy != nil { pp.poolCoordinatorProxy.ServeHTTP(rw, req) return nil } @@ -135,7 +151,7 @@ func (pp *PoolCoordinatorProxy) poolQuery(rw http.ResponseWriter, req *http.Requ } func (pp *PoolCoordinatorProxy) poolWatch(rw http.ResponseWriter, req *http.Request) error { - if util.IsPoolScopedResouceListWatchRequest(req) { + if util.IsPoolScopedResouceListWatchRequest(req) && pp.poolCoordinatorProxy != nil { clientReqCtx := req.Context() poolServeCtx, poolServeCancel := context.WithCancel(clientReqCtx) diff --git a/pkg/yurthub/proxy/proxy.go b/pkg/yurthub/proxy/proxy.go index 75168388717..714e2921097 100644 --- a/pkg/yurthub/proxy/proxy.go +++ b/pkg/yurthub/proxy/proxy.go @@ -47,17 +47,17 @@ import ( ) type yurtReverseProxy struct { - resolver apirequest.RequestInfoResolver - loadBalancer remote.LoadBalancer - cloudHealthChecker healthchecker.MultipleBackendsHealthChecker - coordinatorHealtChecker healthchecker.HealthChecker - localProxy http.Handler - poolProxy http.Handler - maxRequestsInFlight int - tenantMgr tenant.Interface - isCoordinatorReady func() bool - workingMode hubutil.WorkingMode - enablePoolCoordinator bool + resolver apirequest.RequestInfoResolver + loadBalancer remote.LoadBalancer + cloudHealthChecker healthchecker.MultipleBackendsHealthChecker + coordinatorHealtCheckerGetter func() healthchecker.HealthChecker + localProxy http.Handler + poolProxy http.Handler + maxRequestsInFlight int + tenantMgr tenant.Interface + isCoordinatorReady func() bool + workingMode hubutil.WorkingMode + enablePoolCoordinator bool } // NewYurtReverseProxyHandler creates a http handler for proxying @@ -68,9 +68,9 @@ func NewYurtReverseProxyHandler( transportMgr transport.Interface, cloudHealthChecker healthchecker.MultipleBackendsHealthChecker, tenantMgr tenant.Interface, - coordinator poolcoordinator.Coordinator, - coordinatorTransportMgr transport.Interface, - coordinatorHealthChecker healthchecker.HealthChecker, + coordinatorGetter func() poolcoordinator.Coordinator, + coordinatorTransportMgrGetter func() transport.Interface, + coordinatorHealthCheckerGetter func() healthchecker.HealthChecker, stopCh <-chan struct{}) (http.Handler, error) { cfg := &server.Config{ LegacyAPIGroupPrefixes: sets.NewString(server.DefaultLegacyAPIPrefix), @@ -82,7 +82,7 @@ func NewYurtReverseProxyHandler( yurtHubCfg.RemoteServers, localCacheMgr, transportMgr, - coordinator, + coordinatorGetter, cloudHealthChecker, yurtHubCfg.FilterManager, yurtHubCfg.WorkingMode, @@ -93,10 +93,18 @@ func NewYurtReverseProxyHandler( var localProxy, poolProxy http.Handler isCoordinatorHealthy := func() bool { + coordinator := coordinatorGetter() + if coordinator == nil { + return false + } _, healthy := coordinator.IsHealthy() return healthy } isCoordinatorReady := func() bool { + coordinator := 
coordinatorGetter() + if coordinator == nil { + return false + } _, ready := coordinator.IsReady() return ready } @@ -115,7 +123,7 @@ func NewYurtReverseProxyHandler( poolProxy, err = pool.NewPoolCoordinatorProxy( yurtHubCfg.CoordinatorServerURL, localCacheMgr, - coordinatorTransportMgr, + coordinatorTransportMgrGetter, yurtHubCfg.FilterManager, isCoordinatorReady, stopCh) @@ -126,17 +134,17 @@ func NewYurtReverseProxyHandler( } yurtProxy := &yurtReverseProxy{ - resolver: resolver, - loadBalancer: lb, - cloudHealthChecker: cloudHealthChecker, - coordinatorHealtChecker: coordinatorHealthChecker, - localProxy: localProxy, - poolProxy: poolProxy, - maxRequestsInFlight: yurtHubCfg.MaxRequestInFlight, - isCoordinatorReady: isCoordinatorReady, - enablePoolCoordinator: yurtHubCfg.EnableCoordinator, - tenantMgr: tenantMgr, - workingMode: yurtHubCfg.WorkingMode, + resolver: resolver, + loadBalancer: lb, + cloudHealthChecker: cloudHealthChecker, + coordinatorHealtCheckerGetter: coordinatorHealthCheckerGetter, + localProxy: localProxy, + poolProxy: poolProxy, + maxRequestsInFlight: yurtHubCfg.MaxRequestInFlight, + isCoordinatorReady: isCoordinatorReady, + enablePoolCoordinator: yurtHubCfg.EnableCoordinator, + tenantMgr: tenantMgr, + workingMode: yurtHubCfg.WorkingMode, } return yurtProxy.buildHandlerChain(yurtProxy), nil @@ -199,7 +207,11 @@ func (p *yurtReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) func (p *yurtReverseProxy) handleKubeletLease(rw http.ResponseWriter, req *http.Request) { p.cloudHealthChecker.RenewKubeletLeaseTime() - p.coordinatorHealtChecker.RenewKubeletLeaseTime() + coordinatorHealtChecker := p.coordinatorHealtCheckerGetter() + if coordinatorHealtChecker != nil { + coordinatorHealtChecker.RenewKubeletLeaseTime() + } + if p.localProxy != nil { p.localProxy.ServeHTTP(rw, req) } diff --git a/pkg/yurthub/proxy/remote/loadbalancer.go b/pkg/yurthub/proxy/remote/loadbalancer.go index 9029a1eaa9d..76db32f124b 100644 --- a/pkg/yurthub/proxy/remote/loadbalancer.go +++ b/pkg/yurthub/proxy/remote/loadbalancer.go @@ -128,13 +128,13 @@ type LoadBalancer interface { } type loadBalancer struct { - backends []*util.RemoteProxy - algo loadBalancerAlgo - localCacheMgr cachemanager.CacheManager - filterManager *manager.Manager - coordinator poolcoordinator.Coordinator - workingMode hubutil.WorkingMode - stopCh <-chan struct{} + backends []*util.RemoteProxy + algo loadBalancerAlgo + localCacheMgr cachemanager.CacheManager + filterManager *manager.Manager + coordinatorGetter func() poolcoordinator.Coordinator + workingMode hubutil.WorkingMode + stopCh <-chan struct{} } // NewLoadBalancer creates a loadbalancer for specified remote servers @@ -143,17 +143,17 @@ func NewLoadBalancer( remoteServers []*url.URL, localCacheMgr cachemanager.CacheManager, transportMgr transport.Interface, - coordinator poolcoordinator.Coordinator, + coordinatorGetter func() poolcoordinator.Coordinator, healthChecker healthchecker.MultipleBackendsHealthChecker, filterManager *manager.Manager, workingMode hubutil.WorkingMode, stopCh <-chan struct{}) (LoadBalancer, error) { lb := &loadBalancer{ - localCacheMgr: localCacheMgr, - filterManager: filterManager, - coordinator: coordinator, - workingMode: workingMode, - stopCh: stopCh, + localCacheMgr: localCacheMgr, + filterManager: filterManager, + coordinatorGetter: coordinatorGetter, + workingMode: workingMode, + stopCh: stopCh, } backends := make([]*util.RemoteProxy, 0, len(remoteServers)) for i := range remoteServers { @@ -207,7 +207,14 @@ func (lb 
*loadBalancer) ServeHTTP(rw http.ResponseWriter, req *http.Request) { for { select { case <-t.C: - if _, isReady := lb.coordinator.IsReady(); isReady { + if lb.coordinatorGetter == nil { + continue + } + coordinator := lb.coordinatorGetter() + if coordinator == nil { + continue + } + if _, isReady := coordinator.IsReady(); isReady { klog.Infof("notified the pool coordinator is ready, cancel the req %s making it handled by pool coordinator", hubutil.ReqString(req)) cloudServeCancel() return @@ -326,7 +333,16 @@ func (lb *loadBalancer) cacheResponse(req *http.Request, resp *http.Response) { wrapPrc, _ := hubutil.NewGZipReaderCloser(resp.Header, resp.Body, req, "cache-manager") resp.Body = wrapPrc - poolCacheManager, isHealthy := lb.coordinator.IsHealthy() + var poolCacheManager cachemanager.CacheManager + var isHealthy bool + + coordinator := lb.coordinatorGetter() + if coordinator == nil { + isHealthy = false + } else { + poolCacheManager, isHealthy = coordinator.IsHealthy() + } + if isHealthy && poolCacheManager != nil { if !isLeaderHubUserAgent(ctx) { if isRequestOfNodeAndPod(ctx) { diff --git a/pkg/yurthub/storage/etcd/keycache.go b/pkg/yurthub/storage/etcd/keycache.go index 5a9c21ca227..8122c7f7b66 100644 --- a/pkg/yurthub/storage/etcd/keycache.go +++ b/pkg/yurthub/storage/etcd/keycache.go @@ -120,48 +120,49 @@ func (c *componentKeyCache) Recover() error { } func (c *componentKeyCache) getPoolScopedKeyset() (*keySet, error) { - client := c.getEtcdClient() - if client == nil { - return nil, fmt.Errorf("got empty etcd client") - } + // FIXME: now getEtcdClient would cause nil pointer + //client := c.getEtcdClient() + //if client == nil { + // return nil, fmt.Errorf("got empty etcd client") + //} keys := &keySet{m: map[storageKey]struct{}{}} - for gvr := range coordinatorconstants.PoolScopedResources { - getCtx, cancel := context.WithTimeout(c.ctx, defaultTimeout) - defer cancel() - rootKey, err := c.keyFunc(storage.KeyBuildInfo{ - Component: coordinatorconstants.DefaultPoolScopedUserAgent, - Group: gvr.Group, - Version: gvr.Version, - Resources: gvr.Resource, - }) - if err != nil { - return nil, fmt.Errorf("failed to generate keys for %s, %v", gvr.String(), err) - } - getResp, err := client.Get(getCtx, rootKey.Key(), clientv3.WithPrefix(), clientv3.WithKeysOnly()) - if err != nil { - return nil, fmt.Errorf("failed to get from etcd for %s, %v", gvr.String(), err) - } - - for _, kv := range getResp.Kvs { - ns, name, err := getNamespaceAndNameFromKeyPath(string(kv.Key)) - if err != nil { - return nil, fmt.Errorf("failed to parse namespace and name of %s", kv.Key) - } - key, err := c.keyFunc(storage.KeyBuildInfo{ - Component: coordinatorconstants.DefaultPoolScopedUserAgent, - Group: gvr.Group, - Version: gvr.Version, - Resources: gvr.Resource, - Namespace: ns, - Name: name, - }) - if err != nil { - return nil, fmt.Errorf("failed to create resource key for %v", kv.Key) - } - keys.m[key.(storageKey)] = struct{}{} - } - } + //for gvr := range coordinatorconstants.PoolScopedResources { + //getCtx, cancel := context.WithTimeout(c.ctx, defaultTimeout) + //defer cancel() + //rootKey, err := c.keyFunc(storage.KeyBuildInfo{ + // Component: coordinatorconstants.DefaultPoolScopedUserAgent, + // Group: gvr.Group, + // Version: gvr.Version, + // Resources: gvr.Resource, + //}) + //if err != nil { + // return nil, fmt.Errorf("failed to generate keys for %s, %v", gvr.String(), err) + //} + //getResp, err := client.Get(getCtx, rootKey.Key(), clientv3.WithPrefix(), clientv3.WithKeysOnly()) + //if err != nil { 
+ // return nil, fmt.Errorf("failed to get from etcd for %s, %v", gvr.String(), err) + //} + // + //for _, kv := range getResp.Kvs { + // ns, name, err := getNamespaceAndNameFromKeyPath(string(kv.Key)) + // if err != nil { + // return nil, fmt.Errorf("failed to parse namespace and name of %s", kv.Key) + // } + // key, err := c.keyFunc(storage.KeyBuildInfo{ + // Component: coordinatorconstants.DefaultPoolScopedUserAgent, + // Group: gvr.Group, + // Version: gvr.Version, + // Resources: gvr.Resource, + // Namespace: ns, + // Name: name, + // }) + // if err != nil { + // return nil, fmt.Errorf("failed to create resource key for %v", kv.Key) + // } + // keys.m[key.(storageKey)] = struct{}{} + //} + //} return keys, nil } From 74d0f69ff59b8736b0d89fef6c5ff57593e2c244 Mon Sep 17 00:00:00 2001 From: LaurenceLiZhixin <382673304@qq.com> Date: Sat, 7 Jan 2023 13:24:58 +0800 Subject: [PATCH 2/5] Fix: imports --- cmd/yurthub/app/start.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/yurthub/app/start.go b/cmd/yurthub/app/start.go index 30f7e8c8dc3..021e4068d77 100644 --- a/cmd/yurthub/app/start.go +++ b/cmd/yurthub/app/start.go @@ -19,7 +19,6 @@ package app import ( "context" "fmt" - "k8s.io/klog/v2" "net/url" "path/filepath" "time" @@ -29,6 +28,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" + "k8s.io/klog/v2" "github.com/openyurtio/openyurt/cmd/yurthub/app/config" "github.com/openyurtio/openyurt/cmd/yurthub/app/options" From 81413c62ae719bac363623ec2e0e48ca3ad5edcc Mon Sep 17 00:00:00 2001 From: LaurenceLiZhixin <382673304@qq.com> Date: Mon, 9 Jan 2023 09:54:24 +0800 Subject: [PATCH 3/5] Fix --- cmd/yurthub/app/start.go | 17 ++++++++--------- pkg/yurthub/certificate/hubself/cert_mgr.go | 6 +++--- pkg/yurthub/filter/approver_test.go | 1 - pkg/yurthub/poolcoordinator/coordinator.go | 6 ++++++ 4 files changed, 17 insertions(+), 13 deletions(-) diff --git a/cmd/yurthub/app/start.go b/cmd/yurthub/app/start.go index 021e4068d77..1b37f4884f1 100644 --- a/cmd/yurthub/app/start.go +++ b/cmd/yurthub/app/start.go @@ -183,6 +183,7 @@ func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error { var coordinatorGetter func() poolcoordinator.Coordinator var coordinatorHealthCheckerGetter func() healthchecker.HealthChecker var coordinatorTransportManagerGetter func() transport.Interface = nil + coordinatorInformerRegistryChan := make(chan struct{}) if cfg.EnableCoordinator { klog.Infof("%d. start to run coordinator", trace) @@ -190,14 +191,15 @@ func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error { // if certs has been got from cloud APIServer. trace++ // Waitting for the coordinator to run, before using it to create other components. 
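// Editor's note — an illustrative sketch, not part of the patch. The surrounding hunk replaces
// the fixed time.Sleep(time.Second * 5) with a channel that coordinatorRun closes once its
// secret informer has been registered, so the shared informer factories start only after
// registration and without an arbitrary delay. The names registerInformers and startFactories
// are assumptions for illustration.
package sketch

func registerInformers(registered chan<- struct{}) {
	go func() {
		// ... register the coordinator's secret informer into the shared factory ...
		close(registered) // signal exactly once; every receiver is released
	}()
}

func startFactories() {
	registered := make(chan struct{})
	registerInformers(registered)
	<-registered // block until registration has actually happened, instead of sleeping
	// ... now safe to call sharedFactory.Start(stopCh) ...
}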
- coordinatorHealthCheckerGetter, coordinatorTransportManagerGetter, coordinatorGetter, err = coordinatorRun(ctx, cfg, restConfigMgr, cloudHealthChecker) + coordinatorHealthCheckerGetter, coordinatorTransportManagerGetter, coordinatorGetter, err = coordinatorRun(ctx, cfg, restConfigMgr, cloudHealthChecker, coordinatorInformerRegistryChan) if err != nil { return fmt.Errorf("failed to wait for coordinator to run, %v", err) } } - // wait for async coordinator informer registry - time.Sleep(time.Second * 5) + // wait for coordinator informer registry + <-coordinatorInformerRegistryChan + // Start the informer factory if all informers have been registered cfg.SharedFactory.Start(ctx.Done()) cfg.YurtSharedFactory.Start(ctx.Done()) @@ -268,19 +270,16 @@ func createClients(heartbeatTimeoutSeconds int, remoteServers []*url.URL, coordi func coordinatorRun(ctx context.Context, cfg *config.YurtHubConfiguration, restConfigMgr *hubrest.RestConfigManager, - cloudHealthChecker healthchecker.MultipleBackendsHealthChecker) (func() healthchecker.HealthChecker, func() transport.Interface, func() poolcoordinator.Coordinator, error) { + cloudHealthChecker healthchecker.MultipleBackendsHealthChecker, + coordinatorInformerRegistryChan chan struct{}) (func() healthchecker.HealthChecker, func() transport.Interface, func() poolcoordinator.Coordinator, error) { var coordinatorHealthChecker healthchecker.HealthChecker var coordinatorTransportMgr transport.Interface var coordinator poolcoordinator.Coordinator var returnErr error - readyCh := make(chan struct{}) - go func() { - // We should notify others(waittingForReady) if the routine exited. - defer close(readyCh) - coorCertManager, err := coordinatorcertmgr.NewCertManager(cfg.CoordinatorPKIDir, cfg.YurtClient, cfg.SharedFactory) + close(coordinatorInformerRegistryChan) // notify the coordinator secret informer registry event if err != nil { returnErr = fmt.Errorf("failed to create coordinator cert manager, %v", err) return diff --git a/pkg/yurthub/certificate/hubself/cert_mgr.go b/pkg/yurthub/certificate/hubself/cert_mgr.go index c173155f76c..805ac18bf1e 100644 --- a/pkg/yurthub/certificate/hubself/cert_mgr.go +++ b/pkg/yurthub/certificate/hubself/cert_mgr.go @@ -251,7 +251,7 @@ func (ycm *yurtHubCertManager) initCaCert() error { klog.Infof("%s file not exists, so create it", caFile) } - insecureRestConfig, err := CreateInsecureRestClientConfig(ycm.remoteServers[0]) + insecureRestConfig, err := createInsecureRestClientConfig(ycm.remoteServers[0]) if err != nil { klog.Errorf("could not create insecure rest config, %v", err) return err @@ -524,8 +524,8 @@ func createBasic(apiServerAddr string, caCert []byte) *clientcmdapi.Config { } } -// CreateInsecureRestClientConfig create insecure rest client config. -func CreateInsecureRestClientConfig(remoteServer *url.URL) (*restclient.Config, error) { +// createInsecureRestClientConfig create insecure rest client config. 
+func createInsecureRestClientConfig(remoteServer *url.URL) (*restclient.Config, error) { if remoteServer == nil { return nil, fmt.Errorf("no healthy remote server") } diff --git a/pkg/yurthub/filter/approver_test.go b/pkg/yurthub/filter/approver_test.go index 2c58751a887..d65554a6a38 100644 --- a/pkg/yurthub/filter/approver_test.go +++ b/pkg/yurthub/filter/approver_test.go @@ -43,7 +43,6 @@ var supportedResourceAndVerbsForFilter = map[string]map[string]sets.String{ }, ServiceTopologyFilterName: { "endpoints": sets.NewString("list", "watch"), - "pods": sets.NewString("list", "watch"), "endpointslices": sets.NewString("list", "watch"), }, } diff --git a/pkg/yurthub/poolcoordinator/coordinator.go b/pkg/yurthub/poolcoordinator/coordinator.go index dbb1901c9b4..b33453ff4ef 100644 --- a/pkg/yurthub/poolcoordinator/coordinator.go +++ b/pkg/yurthub/poolcoordinator/coordinator.go @@ -201,9 +201,13 @@ func (coordinator *coordinator) Run() { needUploadLocalCache = true needCancelEtcdStorage = true isPoolCacheSynced = false + coordinator.Lock() coordinator.poolCacheManager = nil + coordinator.Unlock() case LeaderHub: + coordinator.Lock() coordinator.poolCacheManager, coordinator.etcdStorage, cancelEtcdStorage, err = coordinator.buildPoolCacheStore() + coordinator.Unlock() if err != nil { klog.Errorf("failed to create pool scoped cache store and manager, %v", err) continue @@ -253,7 +257,9 @@ func (coordinator *coordinator) Run() { } } case FollowerHub: + coordinator.Lock() coordinator.poolCacheManager, coordinator.etcdStorage, cancelEtcdStorage, err = coordinator.buildPoolCacheStore() + coordinator.Unlock() if err != nil { klog.Errorf("failed to create pool scoped cache store and manager, %v", err) continue From 144557a2d734596fbc8e880ee6ef8cbeb78b0053 Mon Sep 17 00:00:00 2001 From: LaurenceLiZhixin <382673304@qq.com> Date: Wed, 11 Jan 2023 11:36:47 +0800 Subject: [PATCH 4/5] Fix: gzip bug --- Makefile | 2 ++ pkg/yurthub/proxy/pool/pool.go | 8 +++++++- pkg/yurthub/proxy/proxy.go | 5 ++++- pkg/yurthub/proxy/remote/loadbalancer.go | 10 ++++++++-- 4 files changed, 21 insertions(+), 4 deletions(-) diff --git a/Makefile b/Makefile index e3be722c461..368050eb763 100644 --- a/Makefile +++ b/Makefile @@ -171,3 +171,5 @@ docker-push-node-servant: docker run --privileged --rm tonistiigi/binfmt --uninstall qemu-aarch64 docker run --rm --privileged tonistiigi/binfmt --install all docker buildx build --no-cache --push ${DOCKER_BUILD_ARGS} --platform ${TARGET_PLATFORMS} -f hack/dockerfiles/Dockerfile.yurt-node-servant . -t ${IMAGE_REPO}/node-servant:${GIT_VERSION} +docker-build-laurence-yurt-controller-manager: + docker buildx build --no-cache --push --platform linux/amd64 -f hack/dockerfiles/Dockerfile.yurt-controller-manager . 
-t ack-agility-registry.cn-shanghai.cr.aliyuncs.com/edgepaas/yurt-controller-manager:pool-coordinator diff --git a/pkg/yurthub/proxy/pool/pool.go b/pkg/yurthub/proxy/pool/pool.go index 057175ece67..6080dc5da0e 100644 --- a/pkg/yurthub/proxy/pool/pool.go +++ b/pkg/yurthub/proxy/pool/pool.go @@ -259,7 +259,7 @@ func (pp *PoolCoordinatorProxy) cacheResponse(req *http.Request, resp *http.Resp if pp.localCacheMgr.CanCacheFor(req) { ctx := req.Context() req = req.WithContext(ctx) - wrapPrc, _ := hubutil.NewGZipReaderCloser(resp.Header, resp.Body, req, "cache-manager") + wrapPrc, needUncompressed := hubutil.NewGZipReaderCloser(resp.Header, resp.Body, req, "cache-manager") rc, prc := hubutil.NewDualReadCloser(req, wrapPrc, true) go func(req *http.Request, prc io.ReadCloser, stopCh <-chan struct{}) { @@ -267,6 +267,12 @@ func (pp *PoolCoordinatorProxy) cacheResponse(req *http.Request, resp *http.Resp klog.Errorf("failed to cache req %s in local cache when cluster is unhealthy, %v", hubutil.ReqString(req), err) } }(req, prc, ctx.Done()) + + // after gunzip in filter, the header content encoding should be removed. + // because there's no need to gunzip response.body again. + if needUncompressed { + resp.Header.Del("Content-Encoding") + } resp.Body = rc } } diff --git a/pkg/yurthub/proxy/proxy.go b/pkg/yurthub/proxy/proxy.go index 714e2921097..655236efdcb 100644 --- a/pkg/yurthub/proxy/proxy.go +++ b/pkg/yurthub/proxy/proxy.go @@ -37,6 +37,7 @@ import ( "github.com/openyurtio/openyurt/pkg/yurthub/cachemanager" "github.com/openyurtio/openyurt/pkg/yurthub/healthchecker" "github.com/openyurtio/openyurt/pkg/yurthub/poolcoordinator" + "github.com/openyurtio/openyurt/pkg/yurthub/poolcoordinator/constants" "github.com/openyurtio/openyurt/pkg/yurthub/proxy/local" "github.com/openyurtio/openyurt/pkg/yurthub/proxy/pool" "github.com/openyurtio/openyurt/pkg/yurthub/proxy/remote" @@ -185,12 +186,14 @@ func (p *yurtReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) return } + com, _ := hubutil.ClientComponentFrom(req.Context()) + switch { case util.IsKubeletLeaseReq(req): p.handleKubeletLease(rw, req) case util.IsEventCreateRequest(req): p.eventHandler(rw, req) - case util.IsPoolScopedResouceListWatchRequest(req): + case util.IsPoolScopedResouceListWatchRequest(req) && com != constants.DefaultPoolScopedUserAgent: p.poolScopedResouceHandler(rw, req) case util.IsSubjectAccessReviewCreateGetRequest(req): p.subjectAccessReviewHandler(rw, req) diff --git a/pkg/yurthub/proxy/remote/loadbalancer.go b/pkg/yurthub/proxy/remote/loadbalancer.go index 76db32f124b..8c881c495ac 100644 --- a/pkg/yurthub/proxy/remote/loadbalancer.go +++ b/pkg/yurthub/proxy/remote/loadbalancer.go @@ -194,7 +194,8 @@ func (lb *loadBalancer) ServeHTTP(rw http.ResponseWriter, req *http.Request) { return } klog.V(3).Infof("picked backend %s by %s for request %s", rp.Name(), lb.algo.Name(), hubutil.ReqString(req)) - if util.IsPoolScopedResouceListWatchRequest(req) { + com, _ := hubutil.ClientComponentFrom(req.Context()) + if util.IsPoolScopedResouceListWatchRequest(req) && com != coordinatorconstants.DefaultPoolScopedUserAgent { // We get here possibly because the pool-coordinator is not ready. // We should cancel the watch request when pool-coordinator becomes ready. 
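// Editor's note — an illustrative sketch, not part of the patch. The gzip fix in pool.go above,
// repeated for the load balancer just below, drops the Content-Encoding header whenever the
// proxy swaps a gzip-compressed body for an already-decompressed reader, so nothing downstream
// tries to gunzip the body a second time. A minimal, hypothetical modifyResponse doing the same:
package sketch

import (
	"compress/gzip"
	"net/http"
	"strings"
)

func decompressResponse(resp *http.Response) error {
	if !strings.Contains(resp.Header.Get("Content-Encoding"), "gzip") {
		return nil
	}
	zr, err := gzip.NewReader(resp.Body)
	if err != nil {
		return err
	}
	// The body handed onward is plain bytes now, so the encoding header (and the stale
	// length) must go, mirroring resp.Header.Del("Content-Encoding") in the patch. A real
	// implementation would also arrange for the underlying body to be closed.
	resp.Header.Del("Content-Encoding")
	resp.Header.Del("Content-Length")
	resp.Body = zr
	return nil
}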
klog.Infof("pool-coordinator is not ready, we use cloud APIServer to temporarily handle the req: %s", hubutil.ReqString(req)) @@ -330,7 +331,12 @@ func (lb *loadBalancer) modifyResponse(resp *http.Response) error { func (lb *loadBalancer) cacheResponse(req *http.Request, resp *http.Response) { if lb.localCacheMgr.CanCacheFor(req) { ctx := req.Context() - wrapPrc, _ := hubutil.NewGZipReaderCloser(resp.Header, resp.Body, req, "cache-manager") + wrapPrc, needUncompressed := hubutil.NewGZipReaderCloser(resp.Header, resp.Body, req, "cache-manager") + // after gunzip in filter, the header content encoding should be removed. + // because there's no need to gunzip response.body again. + if needUncompressed { + resp.Header.Del("Content-Encoding") + } resp.Body = wrapPrc var poolCacheManager cachemanager.CacheManager From 02397dcb4806f067fcb3774aea6547c64ed97791 Mon Sep 17 00:00:00 2001 From: LaurenceLiZhixin <382673304@qq.com> Date: Wed, 11 Jan 2023 11:37:52 +0800 Subject: [PATCH 5/5] Fix --- Makefile | 2 -- 1 file changed, 2 deletions(-) diff --git a/Makefile b/Makefile index 368050eb763..e3be722c461 100644 --- a/Makefile +++ b/Makefile @@ -171,5 +171,3 @@ docker-push-node-servant: docker run --privileged --rm tonistiigi/binfmt --uninstall qemu-aarch64 docker run --rm --privileged tonistiigi/binfmt --install all docker buildx build --no-cache --push ${DOCKER_BUILD_ARGS} --platform ${TARGET_PLATFORMS} -f hack/dockerfiles/Dockerfile.yurt-node-servant . -t ${IMAGE_REPO}/node-servant:${GIT_VERSION} -docker-build-laurence-yurt-controller-manager: - docker buildx build --no-cache --push --platform linux/amd64 -f hack/dockerfiles/Dockerfile.yurt-controller-manager . -t ack-agility-registry.cn-shanghai.cr.aliyuncs.com/edgepaas/yurt-controller-manager:pool-coordinator
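Editor's note on PATCH 1/5 above: NewPoolCoordinatorProxy no longer receives a ready transport
manager; it polls a getter on a ticker and only builds the remote proxy once a transport becomes
available, leaving pp.poolCoordinatorProxy nil (and every handler nil-checking it) until then.
Below is a minimal sketch of that deferred-construction pattern, under the assumption of the
real code's 5-second interval; the names buildProxy and waitAndBuild are hypothetical, and the
sketch additionally stops the ticker and honors stopCh.

package sketch

import (
	"net/http"
	"time"
)

type transport struct{}

func buildProxy(t *transport) http.Handler { return http.NotFoundHandler() }

// waitAndBuild retries on a ticker until the getter yields a transport, then publishes the
// proxy through setProxy exactly once and stops. Handlers must treat the proxy as optional
// until that happens, as the pool proxy handlers in the patch do.
func waitAndBuild(get func() *transport, setProxy func(http.Handler), stopCh <-chan struct{}) {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-stopCh:
			return
		case <-ticker.C:
			if t := get(); t != nil {
				setProxy(buildProxy(t))
				return
			}
		}
	}
}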