From 0a8659cbbbeed84cce832be9f07cebf10477383b Mon Sep 17 00:00:00 2001 From: Laurence <45508533+LaurenceLiZhixin@users.noreply.github.com> Date: Wed, 11 Jan 2023 15:51:48 +0800 Subject: [PATCH] Fix: pool-coordinator (#1126) * Fix: pool-coordinator --- cmd/yurthub/app/start.go | 56 ++++++------- pkg/yurthub/poolcoordinator/coordinator.go | 33 ++++---- .../poolcoordinator/leader_election.go | 4 +- pkg/yurthub/proxy/pool/pool.go | 52 ++++++++---- pkg/yurthub/proxy/proxy.go | 68 +++++++++------- pkg/yurthub/proxy/remote/loadbalancer.go | 54 +++++++++---- pkg/yurthub/storage/etcd/keycache.go | 81 ++++++++++--------- 7 files changed, 204 insertions(+), 144 deletions(-) diff --git a/cmd/yurthub/app/start.go b/cmd/yurthub/app/start.go index 90a3213747d..1f0ef5bdd55 100644 --- a/cmd/yurthub/app/start.go +++ b/cmd/yurthub/app/start.go @@ -148,30 +148,30 @@ func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error { tenantMgr := tenant.New(cfg.TenantNs, cfg.SharedFactory, ctx.Done()) trace++ - var coordinator poolcoordinator.Coordinator = &poolcoordinator.FakeCoordinator{} - var coordinatorHealthChecker healthchecker.HealthChecker = healthchecker.NewFakeChecker(false, make(map[string]int)) - var coordinatorTransportManager transport.Interface = nil - var waittingForCoordinator func() (healthchecker.HealthChecker, transport.Interface, poolcoordinator.Coordinator, error) = nil + var coordinatorGetter func() poolcoordinator.Coordinator + var coordinatorHealthCheckerGetter func() healthchecker.HealthChecker + var coordinatorTransportManagerGetter func() transport.Interface = nil + coordinatorInformerRegistryChan := make(chan struct{}) + if cfg.EnableCoordinator { klog.Infof("%d. start to run coordinator", trace) // coordinatorRun will register secret informer into sharedInformerFactory, and start a new goroutine to periodically check // if certs has been got from cloud APIServer. - waittingForCoordinator = coordinatorRun(ctx, cfg, restConfigMgr, cloudHealthChecker) trace++ - } - - // Start the informer factory if all informers have been registered - cfg.SharedFactory.Start(ctx.Done()) - cfg.YurtSharedFactory.Start(ctx.Done()) - - if waittingForCoordinator != nil { // Waitting for the coordinator to run, before using it to create other components. - coordinatorHealthChecker, coordinatorTransportManager, coordinator, err = waittingForCoordinator() + coordinatorHealthCheckerGetter, coordinatorTransportManagerGetter, coordinatorGetter, err = coordinatorRun(ctx, cfg, restConfigMgr, cloudHealthChecker, coordinatorInformerRegistryChan) if err != nil { return fmt.Errorf("failed to wait for coordinator to run, %v", err) } } + // wait for coordinator informer registry + <-coordinatorInformerRegistryChan + + // Start the informer factory if all informers have been registered + cfg.SharedFactory.Start(ctx.Done()) + cfg.YurtSharedFactory.Start(ctx.Done()) + klog.Infof("%d. 
new reverse proxy handler for remote servers", trace) yurtProxyHandler, err := proxy.NewYurtReverseProxyHandler( cfg, @@ -179,9 +179,9 @@ func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error { transportManager, cloudHealthChecker, tenantMgr, - coordinator, - coordinatorTransportManager, - coordinatorHealthChecker, + coordinatorGetter, + coordinatorTransportManagerGetter, + coordinatorHealthCheckerGetter, ctx.Done()) if err != nil { return fmt.Errorf("could not create reverse proxy handler, %w", err) @@ -230,14 +230,13 @@ func createClients(heartbeatTimeoutSeconds int, remoteServers []*url.URL, coordi func coordinatorRun(ctx context.Context, cfg *config.YurtHubConfiguration, restConfigMgr *hubrest.RestConfigManager, - cloudHealthChecker healthchecker.MultipleBackendsHealthChecker) (waittingForReady func() (healthchecker.HealthChecker, transport.Interface, poolcoordinator.Coordinator, error)) { + cloudHealthChecker healthchecker.MultipleBackendsHealthChecker, + coordinatorInformerRegistryChan chan struct{}) (func() healthchecker.HealthChecker, func() transport.Interface, func() poolcoordinator.Coordinator, error) { var coordinatorHealthChecker healthchecker.HealthChecker var coordinatorTransportMgr transport.Interface var coordinator poolcoordinator.Coordinator var returnErr error - readyCh := make(chan struct{}) - go func() { coorCertManager, err := coordinatorcertmgr.NewCertManager(cfg.CoordinatorPKIDir, cfg.ProxiedClient, cfg.SharedFactory) close(coordinatorInformerRegistryChan) // notify the coordinator secret informer registry event @@ -255,7 +254,7 @@ func coordinatorRun(ctx context.Context, coordinatorClient, err := kubernetes.NewForConfig(&rest.Config{ Host: cfg.CoordinatorServerURL.String(), Transport: coorTransportMgr.CurrentTransport(), - Timeout: time.Duration(cfg.HeartbeatTimeoutSeconds), + Timeout: time.Duration(cfg.HeartbeatTimeoutSeconds) * time.Second, }) if err != nil { returnErr = fmt.Errorf("failed to get coordinator client for pool coordinator, %v", err) @@ -274,14 +273,14 @@ func coordinatorRun(ctx context.Context, returnErr = fmt.Errorf("failed to create hub elector, %v", err) return } - elector.Run(ctx.Done()) + go elector.Run(ctx.Done()) coor, err := poolcoordinator.NewCoordinator(ctx, cfg, restConfigMgr, coorCertManager, coorTransportMgr, elector) if err != nil { returnErr = fmt.Errorf("failed to create coordinator, %v", err) return } - coor.Run() + go coor.Run() coordinatorTransportMgr = coorTransportMgr coordinatorHealthChecker = coorHealthChecker @@ -289,12 +288,13 @@ func coordinatorRun(ctx context.Context, returnErr = nil }() - waittingForReady = func() (healthchecker.HealthChecker, transport.Interface, poolcoordinator.Coordinator, error) { - <-readyCh - return coordinatorHealthChecker, coordinatorTransportMgr, coordinator, returnErr - } - - return waittingForReady + return func() healthchecker.HealthChecker { + return coordinatorHealthChecker + }, func() transport.Interface { + return coordinatorTransportMgr + }, func() poolcoordinator.Coordinator { + return coordinator + }, returnErr } func poolCoordinatorTransportMgrGetter(heartbeatTimeoutSeconds int, coordinatorServer *url.URL, coordinatorCertMgr *coordinatorcertmgr.CertManager, stopCh <-chan struct{}) (transport.Interface, error) { diff --git a/pkg/yurthub/poolcoordinator/coordinator.go b/pkg/yurthub/poolcoordinator/coordinator.go index d7b0df604f3..d70f9b357dc 100644 --- a/pkg/yurthub/poolcoordinator/coordinator.go +++ b/pkg/yurthub/poolcoordinator/coordinator.go @@ -171,7 +171,7 @@ func 
NewCoordinator( poolScopedCacheSyncManager := &poolScopedCacheSyncManager{ ctx: ctx, proxiedClient: proxiedClient, - coordinatorClient: cfg.CoordinatorClient, + coordinatorClient: coordinatorClient, nodeName: cfg.NodeName, getEtcdStore: coordinator.getEtcdStore, } @@ -185,12 +185,10 @@ func NewCoordinator( func (coordinator *coordinator) Run() { for { - var poolCacheManager cachemanager.CacheManager - var cancelEtcdStorage func() + var cancelEtcdStorage = func() {} var needUploadLocalCache bool var needCancelEtcdStorage bool var isPoolCacheSynced bool - var etcdStorage storage.Store var err error select { @@ -213,10 +211,13 @@ func (coordinator *coordinator) Run() { needUploadLocalCache = true needCancelEtcdStorage = true isPoolCacheSynced = false - etcdStorage = nil - poolCacheManager = nil + coordinator.Lock() + coordinator.poolCacheManager = nil + coordinator.Unlock() case LeaderHub: - poolCacheManager, etcdStorage, cancelEtcdStorage, err = coordinator.buildPoolCacheStore() + coordinator.Lock() + coordinator.poolCacheManager, coordinator.etcdStorage, cancelEtcdStorage, err = coordinator.buildPoolCacheStore() + coordinator.Unlock() if err != nil { klog.Errorf("failed to create pool scoped cache store and manager, %v", err) continue @@ -245,14 +246,16 @@ func (coordinator *coordinator) Run() { coordinator.poolCacheSyncedDetector.EnsureStart() if coordinator.needUploadLocalCache { - if err := coordinator.uploadLocalCache(etcdStorage); err != nil { + if err := coordinator.uploadLocalCache(coordinator.etcdStorage); err != nil { klog.Errorf("failed to upload local cache when yurthub becomes leader, %v", err) } else { needUploadLocalCache = false } } case FollowerHub: - poolCacheManager, etcdStorage, cancelEtcdStorage, err = coordinator.buildPoolCacheStore() + coordinator.Lock() + coordinator.poolCacheManager, coordinator.etcdStorage, cancelEtcdStorage, err = coordinator.buildPoolCacheStore() + coordinator.Unlock() if err != nil { klog.Errorf("failed to create pool scoped cache store and manager, %v", err) continue @@ -263,7 +266,7 @@ func (coordinator *coordinator) Run() { coordinator.poolCacheSyncedDetector.EnsureStart() if coordinator.needUploadLocalCache { - if err := coordinator.uploadLocalCache(etcdStorage); err != nil { + if err := coordinator.uploadLocalCache(coordinator.etcdStorage); err != nil { klog.Errorf("failed to upload local cache when yurthub becomes follower, %v", err) } else { needUploadLocalCache = false @@ -276,11 +279,9 @@ func (coordinator *coordinator) Run() { // Because the caller of IsReady() may be concurrent. coordinator.Lock() if needCancelEtcdStorage { - coordinator.cancelEtcdStorage() + cancelEtcdStorage() } coordinator.electStatus = electorStatus - coordinator.poolCacheManager = poolCacheManager - coordinator.etcdStorage = etcdStorage coordinator.cancelEtcdStorage = cancelEtcdStorage coordinator.needUploadLocalCache = needUploadLocalCache coordinator.isPoolCacheSynced = isPoolCacheSynced @@ -299,7 +300,8 @@ func (coordinator *coordinator) IsReady() (cachemanager.CacheManager, bool) { // If electStatus is not PendingHub, it means pool-coordinator is healthy. 
coordinator.Lock() defer coordinator.Unlock() - if coordinator.electStatus != PendingHub && coordinator.isPoolCacheSynced && !coordinator.needUploadLocalCache { + // fixme: coordinator.isPoolCacheSynced now is not considered + if coordinator.electStatus != PendingHub && !coordinator.needUploadLocalCache { return coordinator.poolCacheManager, true } return nil, false @@ -403,7 +405,8 @@ type poolScopedCacheSyncManager struct { func (p *poolScopedCacheSyncManager) EnsureStart() error { if !p.isRunning { - if err := p.coordinatorClient.CoordinationV1().Leases(namespaceInformerLease).Delete(p.ctx, nameInformerLease, metav1.DeleteOptions{}); err != nil { + err := p.coordinatorClient.CoordinationV1().Leases(namespaceInformerLease).Delete(p.ctx, nameInformerLease, metav1.DeleteOptions{}) + if err != nil && !apierrors.IsNotFound(err) { return fmt.Errorf("failed to delete informer sync lease, %v", err) } diff --git a/pkg/yurthub/poolcoordinator/leader_election.go b/pkg/yurthub/poolcoordinator/leader_election.go index 565caf4c9ac..ffafe9a39bf 100644 --- a/pkg/yurthub/poolcoordinator/leader_election.go +++ b/pkg/yurthub/poolcoordinator/leader_election.go @@ -55,7 +55,7 @@ func NewHubElector( coordinatorClient: coordinatorClient, coordinatorHealthChecker: coordinatorHealthChecker, cloudAPIServerHealthChecker: cloudAPIServerHealthyChecker, - electorStatus: make(chan int32), + electorStatus: make(chan int32, 1), } rl, err := resourcelock.New(cfg.LeaderElection.ResourceLock, @@ -75,7 +75,7 @@ func NewHubElector( RetryPeriod: cfg.LeaderElection.RetryPeriod.Duration, Callbacks: leaderelection.LeaderCallbacks{ OnStartedLeading: func(ctx context.Context) { - klog.Infof("yurthub of %s became lease", cfg.NodeName) + klog.Infof("yurthub of %s became leader", cfg.NodeName) he.electorStatus <- LeaderHub }, OnStoppedLeading: func() { diff --git a/pkg/yurthub/proxy/pool/pool.go b/pkg/yurthub/proxy/pool/pool.go index e81eec4129f..6080dc5da0e 100644 --- a/pkg/yurthub/proxy/pool/pool.go +++ b/pkg/yurthub/proxy/pool/pool.go @@ -51,7 +51,7 @@ type PoolCoordinatorProxy struct { func NewPoolCoordinatorProxy( poolCoordinatorAddr *url.URL, localCacheMgr cachemanager.CacheManager, - transportMgr transport.Interface, + transportMgrGetter func() transport.Interface, filterMgr *manager.Manager, isCoordinatorReady func() bool, stopCh <-chan struct{}) (*PoolCoordinatorProxy, error) { @@ -66,17 +66,33 @@ func NewPoolCoordinatorProxy( stopCh: stopCh, } - proxy, err := util.NewRemoteProxy( - poolCoordinatorAddr, - pp.modifyResponse, - pp.errorHandler, - transportMgr, - stopCh) - if err != nil { - return nil, fmt.Errorf("failed to create remote proxy for pool-coordinator, %v", err) - } + go func() { + ticker := time.NewTicker(time.Second * 5) + for { + select { + case <-ticker.C: + transportMgr := transportMgrGetter() + if transportMgr == nil { + break + } + proxy, err := util.NewRemoteProxy( + poolCoordinatorAddr, + pp.modifyResponse, + pp.errorHandler, + transportMgr, + stopCh) + if err != nil { + klog.Errorf("failed to create remote proxy for pool-coordinator, %v", err) + return + } + + pp.poolCoordinatorProxy = proxy + klog.Infof("create remote proxy for pool-coordinator success") + return + } + } + }() - pp.poolCoordinatorProxy = proxy return pp, nil } @@ -117,7 +133,7 @@ func (pp *PoolCoordinatorProxy) poolPost(rw http.ResponseWriter, req *http.Reque ctx := req.Context() info, _ := apirequest.RequestInfoFrom(ctx) klog.V(4).Infof("pool handle post, req=%s, reqInfo=%s", hubutil.ReqString(req), hubutil.ReqInfoString(info)) - if 
util.IsSubjectAccessReviewCreateGetRequest(req) || util.IsEventCreateRequest(req) { + if (util.IsSubjectAccessReviewCreateGetRequest(req) || util.IsEventCreateRequest(req)) && pp.poolCoordinatorProxy != nil { // kubelet needs to create subjectaccessreviews for auth pp.poolCoordinatorProxy.ServeHTTP(rw, req) return nil @@ -127,7 +143,7 @@ func (pp *PoolCoordinatorProxy) poolPost(rw http.ResponseWriter, req *http.Reque } func (pp *PoolCoordinatorProxy) poolQuery(rw http.ResponseWriter, req *http.Request) error { - if util.IsPoolScopedResouceListWatchRequest(req) || util.IsSubjectAccessReviewCreateGetRequest(req) { + if (util.IsPoolScopedResouceListWatchRequest(req) || util.IsSubjectAccessReviewCreateGetRequest(req)) && pp.poolCoordinatorProxy != nil { pp.poolCoordinatorProxy.ServeHTTP(rw, req) return nil } @@ -135,7 +151,7 @@ func (pp *PoolCoordinatorProxy) poolQuery(rw http.ResponseWriter, req *http.Requ } func (pp *PoolCoordinatorProxy) poolWatch(rw http.ResponseWriter, req *http.Request) error { - if util.IsPoolScopedResouceListWatchRequest(req) { + if util.IsPoolScopedResouceListWatchRequest(req) && pp.poolCoordinatorProxy != nil { clientReqCtx := req.Context() poolServeCtx, poolServeCancel := context.WithCancel(clientReqCtx) @@ -243,7 +259,7 @@ func (pp *PoolCoordinatorProxy) cacheResponse(req *http.Request, resp *http.Resp if pp.localCacheMgr.CanCacheFor(req) { ctx := req.Context() req = req.WithContext(ctx) - wrapPrc, _ := hubutil.NewGZipReaderCloser(resp.Header, resp.Body, req, "cache-manager") + wrapPrc, needUncompressed := hubutil.NewGZipReaderCloser(resp.Header, resp.Body, req, "cache-manager") rc, prc := hubutil.NewDualReadCloser(req, wrapPrc, true) go func(req *http.Request, prc io.ReadCloser, stopCh <-chan struct{}) { @@ -251,6 +267,12 @@ func (pp *PoolCoordinatorProxy) cacheResponse(req *http.Request, resp *http.Resp klog.Errorf("failed to cache req %s in local cache when cluster is unhealthy, %v", hubutil.ReqString(req), err) } }(req, prc, ctx.Done()) + + // after gunzip in filter, the header content encoding should be removed. + // because there's no need to gunzip response.body again. 
+ if needUncompressed { + resp.Header.Del("Content-Encoding") + } resp.Body = rc } } diff --git a/pkg/yurthub/proxy/proxy.go b/pkg/yurthub/proxy/proxy.go index b112ce62dcc..a04601d2c01 100644 --- a/pkg/yurthub/proxy/proxy.go +++ b/pkg/yurthub/proxy/proxy.go @@ -48,17 +48,17 @@ import ( ) type yurtReverseProxy struct { - resolver apirequest.RequestInfoResolver - loadBalancer remote.LoadBalancer - cloudHealthChecker healthchecker.MultipleBackendsHealthChecker - coordinatorHealtChecker healthchecker.HealthChecker - localProxy http.Handler - poolProxy http.Handler - maxRequestsInFlight int - tenantMgr tenant.Interface - isCoordinatorReady func() bool - workingMode hubutil.WorkingMode - enablePoolCoordinator bool + resolver apirequest.RequestInfoResolver + loadBalancer remote.LoadBalancer + cloudHealthChecker healthchecker.MultipleBackendsHealthChecker + coordinatorHealtCheckerGetter func() healthchecker.HealthChecker + localProxy http.Handler + poolProxy http.Handler + maxRequestsInFlight int + tenantMgr tenant.Interface + isCoordinatorReady func() bool + workingMode hubutil.WorkingMode + enablePoolCoordinator bool } // NewYurtReverseProxyHandler creates a http handler for proxying @@ -69,9 +69,9 @@ func NewYurtReverseProxyHandler( transportMgr transport.Interface, cloudHealthChecker healthchecker.MultipleBackendsHealthChecker, tenantMgr tenant.Interface, - coordinator poolcoordinator.Coordinator, - coordinatorTransportMgr transport.Interface, - coordinatorHealthChecker healthchecker.HealthChecker, + coordinatorGetter func() poolcoordinator.Coordinator, + coordinatorTransportMgrGetter func() transport.Interface, + coordinatorHealthCheckerGetter func() healthchecker.HealthChecker, stopCh <-chan struct{}) (http.Handler, error) { cfg := &server.Config{ LegacyAPIGroupPrefixes: sets.NewString(server.DefaultLegacyAPIPrefix), @@ -83,7 +83,7 @@ func NewYurtReverseProxyHandler( yurtHubCfg.RemoteServers, localCacheMgr, transportMgr, - coordinator, + coordinatorGetter, cloudHealthChecker, yurtHubCfg.FilterManager, yurtHubCfg.WorkingMode, @@ -94,10 +94,18 @@ func NewYurtReverseProxyHandler( var localProxy, poolProxy http.Handler isCoordinatorHealthy := func() bool { + coordinator := coordinatorGetter() + if coordinator == nil { + return false + } _, healthy := coordinator.IsHealthy() return healthy } isCoordinatorReady := func() bool { + coordinator := coordinatorGetter() + if coordinator == nil { + return false + } _, ready := coordinator.IsReady() return ready } @@ -116,7 +124,7 @@ func NewYurtReverseProxyHandler( poolProxy, err = pool.NewPoolCoordinatorProxy( yurtHubCfg.CoordinatorServerURL, localCacheMgr, - coordinatorTransportMgr, + coordinatorTransportMgrGetter, yurtHubCfg.FilterManager, isCoordinatorReady, stopCh) @@ -127,17 +135,17 @@ func NewYurtReverseProxyHandler( } yurtProxy := &yurtReverseProxy{ - resolver: resolver, - loadBalancer: lb, - cloudHealthChecker: cloudHealthChecker, - coordinatorHealtChecker: coordinatorHealthChecker, - localProxy: localProxy, - poolProxy: poolProxy, - maxRequestsInFlight: yurtHubCfg.MaxRequestInFlight, - isCoordinatorReady: isCoordinatorReady, - enablePoolCoordinator: yurtHubCfg.EnableCoordinator, - tenantMgr: tenantMgr, - workingMode: yurtHubCfg.WorkingMode, + resolver: resolver, + loadBalancer: lb, + cloudHealthChecker: cloudHealthChecker, + coordinatorHealtCheckerGetter: coordinatorHealthCheckerGetter, + localProxy: localProxy, + poolProxy: poolProxy, + maxRequestsInFlight: yurtHubCfg.MaxRequestInFlight, + isCoordinatorReady: isCoordinatorReady, + 
enablePoolCoordinator: yurtHubCfg.EnableCoordinator, + tenantMgr: tenantMgr, + workingMode: yurtHubCfg.WorkingMode, } return yurtProxy.buildHandlerChain(yurtProxy), nil @@ -200,7 +208,11 @@ func (p *yurtReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) func (p *yurtReverseProxy) handleKubeletLease(rw http.ResponseWriter, req *http.Request) { p.cloudHealthChecker.RenewKubeletLeaseTime() - p.coordinatorHealtChecker.RenewKubeletLeaseTime() + coordinatorHealtChecker := p.coordinatorHealtCheckerGetter() + if coordinatorHealtChecker != nil { + coordinatorHealtChecker.RenewKubeletLeaseTime() + } + if p.localProxy != nil { p.localProxy.ServeHTTP(rw, req) } diff --git a/pkg/yurthub/proxy/remote/loadbalancer.go b/pkg/yurthub/proxy/remote/loadbalancer.go index 7a3f89efc46..648f02f2270 100644 --- a/pkg/yurthub/proxy/remote/loadbalancer.go +++ b/pkg/yurthub/proxy/remote/loadbalancer.go @@ -128,13 +128,13 @@ type LoadBalancer interface { } type loadBalancer struct { - backends []*util.RemoteProxy - algo loadBalancerAlgo - localCacheMgr cachemanager.CacheManager - filterManager *manager.Manager - coordinator poolcoordinator.Coordinator - workingMode hubutil.WorkingMode - stopCh <-chan struct{} + backends []*util.RemoteProxy + algo loadBalancerAlgo + localCacheMgr cachemanager.CacheManager + filterManager *manager.Manager + coordinatorGetter func() poolcoordinator.Coordinator + workingMode hubutil.WorkingMode + stopCh <-chan struct{} } // NewLoadBalancer creates a loadbalancer for specified remote servers @@ -143,17 +143,17 @@ func NewLoadBalancer( remoteServers []*url.URL, localCacheMgr cachemanager.CacheManager, transportMgr transport.Interface, - coordinator poolcoordinator.Coordinator, + coordinatorGetter func() poolcoordinator.Coordinator, healthChecker healthchecker.MultipleBackendsHealthChecker, filterManager *manager.Manager, workingMode hubutil.WorkingMode, stopCh <-chan struct{}) (LoadBalancer, error) { lb := &loadBalancer{ - localCacheMgr: localCacheMgr, - filterManager: filterManager, - coordinator: coordinator, - workingMode: workingMode, - stopCh: stopCh, + localCacheMgr: localCacheMgr, + filterManager: filterManager, + coordinatorGetter: coordinatorGetter, + workingMode: workingMode, + stopCh: stopCh, } backends := make([]*util.RemoteProxy, 0, len(remoteServers)) for i := range remoteServers { @@ -194,6 +194,7 @@ func (lb *loadBalancer) ServeHTTP(rw http.ResponseWriter, req *http.Request) { return } klog.V(3).Infof("picked backend %s by %s for request %s", rp.Name(), lb.algo.Name(), hubutil.ReqString(req)) + // If pool-scoped resource request is from leader-yurthub, it should always be sent to the cloud APIServer. // Thus we do not need to start a check routine for it. But for other requests, we need to periodically check // the pool-coordinator status, and switch the traffic to pool-coordinator if it is ready. 
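// Illustrative sketch, not part of commit 0a8659c: the next hunk reworks the readiness-polling
// loop in ServeHTTP to go through the nil-safe coordinatorGetter instead of a fixed coordinator.
// Reduced to a standalone helper, the pattern is: poll the getter on a ticker and cancel the
// cloud-side request context once the pool-coordinator reports ready, so the list/watch can be
// re-issued against the pool-coordinator. The helper name pollUntilCoordinatorReady and the
// 2-second interval are assumptions of this sketch; the context, time, and poolcoordinator
// imports are the ones loadbalancer.go already uses.
func pollUntilCoordinatorReady(stopCh <-chan struct{}, coordinatorGetter func() poolcoordinator.Coordinator, cloudServeCancel context.CancelFunc) {
	t := time.NewTicker(2 * time.Second)
	defer t.Stop()
	for {
		select {
		case <-stopCh:
			return
		case <-t.C:
			if coordinatorGetter == nil {
				// pool-coordinator is disabled, keep serving from the cloud APIServer
				continue
			}
			coordinator := coordinatorGetter()
			if coordinator == nil {
				// the coordinator is still being built by the background goroutine in coordinatorRun
				continue
			}
			if _, isReady := coordinator.IsReady(); isReady {
				// hand the request over to the pool-coordinator
				cloudServeCancel()
				return
			}
		}
	}
}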
@@ -210,7 +211,14 @@ func (lb *loadBalancer) ServeHTTP(rw http.ResponseWriter, req *http.Request) { for { select { case <-t.C: - if _, isReady := lb.coordinator.IsReady(); isReady { + if lb.coordinatorGetter == nil { + continue + } + coordinator := lb.coordinatorGetter() + if coordinator == nil { + continue + } + if _, isReady := coordinator.IsReady(); isReady { klog.Infof("notified the pool coordinator is ready, cancel the req %s making it handled by pool coordinator", hubutil.ReqString(req)) cloudServeCancel() return @@ -326,10 +334,24 @@ func (lb *loadBalancer) modifyResponse(resp *http.Response) error { func (lb *loadBalancer) cacheResponse(req *http.Request, resp *http.Response) { if lb.localCacheMgr.CanCacheFor(req) { ctx := req.Context() - wrapPrc, _ := hubutil.NewGZipReaderCloser(resp.Header, resp.Body, req, "cache-manager") + wrapPrc, needUncompressed := hubutil.NewGZipReaderCloser(resp.Header, resp.Body, req, "cache-manager") + // after gunzip in filter, the header content encoding should be removed. + // because there's no need to gunzip response.body again. + if needUncompressed { + resp.Header.Del("Content-Encoding") + } resp.Body = wrapPrc - poolCacheManager, isHealthy := lb.coordinator.IsHealthy() + var poolCacheManager cachemanager.CacheManager + var isHealthy bool + + coordinator := lb.coordinatorGetter() + if coordinator == nil { + isHealthy = false + } else { + poolCacheManager, isHealthy = coordinator.IsHealthy() + } + if isHealthy && poolCacheManager != nil { if !isLeaderHubUserAgent(ctx) { if isRequestOfNodeAndPod(ctx) { diff --git a/pkg/yurthub/storage/etcd/keycache.go b/pkg/yurthub/storage/etcd/keycache.go index 5a9c21ca227..8122c7f7b66 100644 --- a/pkg/yurthub/storage/etcd/keycache.go +++ b/pkg/yurthub/storage/etcd/keycache.go @@ -120,48 +120,49 @@ func (c *componentKeyCache) Recover() error { } func (c *componentKeyCache) getPoolScopedKeyset() (*keySet, error) { - client := c.getEtcdClient() - if client == nil { - return nil, fmt.Errorf("got empty etcd client") - } + // FIXME: now getEtcdClient would cause nil pointer + //client := c.getEtcdClient() + //if client == nil { + // return nil, fmt.Errorf("got empty etcd client") + //} keys := &keySet{m: map[storageKey]struct{}{}} - for gvr := range coordinatorconstants.PoolScopedResources { - getCtx, cancel := context.WithTimeout(c.ctx, defaultTimeout) - defer cancel() - rootKey, err := c.keyFunc(storage.KeyBuildInfo{ - Component: coordinatorconstants.DefaultPoolScopedUserAgent, - Group: gvr.Group, - Version: gvr.Version, - Resources: gvr.Resource, - }) - if err != nil { - return nil, fmt.Errorf("failed to generate keys for %s, %v", gvr.String(), err) - } - getResp, err := client.Get(getCtx, rootKey.Key(), clientv3.WithPrefix(), clientv3.WithKeysOnly()) - if err != nil { - return nil, fmt.Errorf("failed to get from etcd for %s, %v", gvr.String(), err) - } - - for _, kv := range getResp.Kvs { - ns, name, err := getNamespaceAndNameFromKeyPath(string(kv.Key)) - if err != nil { - return nil, fmt.Errorf("failed to parse namespace and name of %s", kv.Key) - } - key, err := c.keyFunc(storage.KeyBuildInfo{ - Component: coordinatorconstants.DefaultPoolScopedUserAgent, - Group: gvr.Group, - Version: gvr.Version, - Resources: gvr.Resource, - Namespace: ns, - Name: name, - }) - if err != nil { - return nil, fmt.Errorf("failed to create resource key for %v", kv.Key) - } - keys.m[key.(storageKey)] = struct{}{} - } - } + //for gvr := range coordinatorconstants.PoolScopedResources { + //getCtx, cancel := context.WithTimeout(c.ctx, 
defaultTimeout) + //defer cancel() + //rootKey, err := c.keyFunc(storage.KeyBuildInfo{ + // Component: coordinatorconstants.DefaultPoolScopedUserAgent, + // Group: gvr.Group, + // Version: gvr.Version, + // Resources: gvr.Resource, + //}) + //if err != nil { + // return nil, fmt.Errorf("failed to generate keys for %s, %v", gvr.String(), err) + //} + //getResp, err := client.Get(getCtx, rootKey.Key(), clientv3.WithPrefix(), clientv3.WithKeysOnly()) + //if err != nil { + // return nil, fmt.Errorf("failed to get from etcd for %s, %v", gvr.String(), err) + //} + // + //for _, kv := range getResp.Kvs { + // ns, name, err := getNamespaceAndNameFromKeyPath(string(kv.Key)) + // if err != nil { + // return nil, fmt.Errorf("failed to parse namespace and name of %s", kv.Key) + // } + // key, err := c.keyFunc(storage.KeyBuildInfo{ + // Component: coordinatorconstants.DefaultPoolScopedUserAgent, + // Group: gvr.Group, + // Version: gvr.Version, + // Resources: gvr.Resource, + // Namespace: ns, + // Name: name, + // }) + // if err != nil { + // return nil, fmt.Errorf("failed to create resource key for %v", kv.Key) + // } + // keys.m[key.(storageKey)] = struct{}{} + //} + //} return keys, nil }
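The core of the fix in cmd/yurthub/app/start.go is to stop handing a concrete coordinator to downstream components and instead hand out getter closures: coordinatorRun kicks off construction in a background goroutine and returns immediately, and every consumer (reverse proxy, load balancer, kubelet-lease handler) nil-checks whatever the getter returns. Below is a minimal, self-contained sketch of that wiring, not taken from the patch; the Coordinator interface, runCoordinator, the build callback, and the mutex are simplifications assumed for this sketch rather than the real yurthub types.

package main

import (
	"fmt"
	"sync"
	"time"
)

// Coordinator stands in for poolcoordinator.Coordinator in this sketch.
type Coordinator interface {
	IsReady() bool
}

// runCoordinator starts building the coordinator in the background and immediately returns a
// getter. The getter yields nil until construction has finished, so callers must nil-check,
// exactly as the patched proxy and load balancer do. The sketch guards the shared variable
// with a mutex; the patch itself simply relies on callers tolerating a nil result.
func runCoordinator(build func() (Coordinator, error)) func() Coordinator {
	var (
		mu   sync.Mutex
		coor Coordinator
	)
	go func() {
		c, err := build() // cert manager, transport, elector, and etcd store setup in the real code
		if err != nil {
			fmt.Printf("failed to build coordinator: %v\n", err)
			return
		}
		mu.Lock()
		coor = c
		mu.Unlock()
	}()
	return func() Coordinator {
		mu.Lock()
		defer mu.Unlock()
		return coor
	}
}

type readyCoordinator struct{}

func (readyCoordinator) IsReady() bool { return true }

func main() {
	getCoordinator := runCoordinator(func() (Coordinator, error) {
		time.Sleep(100 * time.Millisecond) // simulate slow cert/transport setup
		return readyCoordinator{}, nil
	})

	// Consumers poll the getter and fall back to the cloud path while it is still nil.
	for i := 0; i < 5; i++ {
		if c := getCoordinator(); c != nil && c.IsReady() {
			fmt.Println("pool-coordinator ready, traffic can be switched over")
			return
		}
		fmt.Println("pool-coordinator not ready yet, keep using the cloud APIServer")
		time.Sleep(50 * time.Millisecond)
	}
}

The reason for returning getters instead of blocking is visible in the same start.go hunk: the coordinator's secret informer must be registered before cfg.SharedFactory.Start is called, so Run now only waits on coordinatorInformerRegistryChan and lets the rest of the coordinator come up in the background. The commit also corrects the coordinator client timeout by multiplying HeartbeatTimeoutSeconds by time.Second, so the configured value is interpreted as seconds rather than nanoseconds.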