Fix: pool-coordinator (openyurtio#1126)
* Fix: pool-coordinator
LaurenceLiZhixin authored and Congrool committed Jan 17, 2023
1 parent 39029b3 commit 0a8659c
Showing 7 changed files with 204 additions and 144 deletions.
56 changes: 28 additions & 28 deletions cmd/yurthub/app/start.go
@@ -148,40 +148,40 @@ func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error {
tenantMgr := tenant.New(cfg.TenantNs, cfg.SharedFactory, ctx.Done())
trace++

var coordinator poolcoordinator.Coordinator = &poolcoordinator.FakeCoordinator{}
var coordinatorHealthChecker healthchecker.HealthChecker = healthchecker.NewFakeChecker(false, make(map[string]int))
var coordinatorTransportManager transport.Interface = nil
var waittingForCoordinator func() (healthchecker.HealthChecker, transport.Interface, poolcoordinator.Coordinator, error) = nil
var coordinatorGetter func() poolcoordinator.Coordinator
var coordinatorHealthCheckerGetter func() healthchecker.HealthChecker
var coordinatorTransportManagerGetter func() transport.Interface = nil
coordinatorInformerRegistryChan := make(chan struct{})

if cfg.EnableCoordinator {
klog.Infof("%d. start to run coordinator", trace)
// coordinatorRun will register secret informer into sharedInformerFactory, and start a new goroutine to periodically check
// if certs has been got from cloud APIServer.
waittingForCoordinator = coordinatorRun(ctx, cfg, restConfigMgr, cloudHealthChecker)
trace++
}

// Start the informer factory if all informers have been registered
cfg.SharedFactory.Start(ctx.Done())
cfg.YurtSharedFactory.Start(ctx.Done())

if waittingForCoordinator != nil {
// Waitting for the coordinator to run, before using it to create other components.
coordinatorHealthChecker, coordinatorTransportManager, coordinator, err = waittingForCoordinator()
coordinatorHealthCheckerGetter, coordinatorTransportManagerGetter, coordinatorGetter, err = coordinatorRun(ctx, cfg, restConfigMgr, cloudHealthChecker, coordinatorInformerRegistryChan)
if err != nil {
return fmt.Errorf("failed to wait for coordinator to run, %v", err)
}
}

// wait for coordinator informer registry
<-coordinatorInformerRegistryChan

// Start the informer factory if all informers have been registered
cfg.SharedFactory.Start(ctx.Done())
cfg.YurtSharedFactory.Start(ctx.Done())

klog.Infof("%d. new reverse proxy handler for remote servers", trace)
yurtProxyHandler, err := proxy.NewYurtReverseProxyHandler(
cfg,
cacheMgr,
transportManager,
cloudHealthChecker,
tenantMgr,
coordinator,
coordinatorTransportManager,
coordinatorHealthChecker,
coordinatorGetter,
coordinatorTransportManagerGetter,
coordinatorHealthCheckerGetter,
ctx.Done())
if err != nil {
return fmt.Errorf("could not create reverse proxy handler, %w", err)
@@ -230,14 +230,13 @@ func createClients(heartbeatTimeoutSeconds int, remoteServers []*url.URL, coordi
func coordinatorRun(ctx context.Context,
cfg *config.YurtHubConfiguration,
restConfigMgr *hubrest.RestConfigManager,
cloudHealthChecker healthchecker.MultipleBackendsHealthChecker) (waittingForReady func() (healthchecker.HealthChecker, transport.Interface, poolcoordinator.Coordinator, error)) {
cloudHealthChecker healthchecker.MultipleBackendsHealthChecker,
coordinatorInformerRegistryChan chan struct{}) (func() healthchecker.HealthChecker, func() transport.Interface, func() poolcoordinator.Coordinator, error) {
var coordinatorHealthChecker healthchecker.HealthChecker
var coordinatorTransportMgr transport.Interface
var coordinator poolcoordinator.Coordinator
var returnErr error

readyCh := make(chan struct{})

go func() {
coorCertManager, err := coordinatorcertmgr.NewCertManager(cfg.CoordinatorPKIDir, cfg.ProxiedClient, cfg.SharedFactory)
close(coordinatorInformerRegistryChan) // notify the coordinator secret informer registry event
@@ -255,7 +254,7 @@ func coordinatorRun(ctx context.Context,
coordinatorClient, err := kubernetes.NewForConfig(&rest.Config{
Host: cfg.CoordinatorServerURL.String(),
Transport: coorTransportMgr.CurrentTransport(),
Timeout: time.Duration(cfg.HeartbeatTimeoutSeconds),
Timeout: time.Duration(cfg.HeartbeatTimeoutSeconds) * time.Second,
})
if err != nil {
returnErr = fmt.Errorf("failed to get coordinator client for pool coordinator, %v", err)
@@ -274,27 +273,28 @@ func coordinatorRun(ctx context.Context,
returnErr = fmt.Errorf("failed to create hub elector, %v", err)
return
}
elector.Run(ctx.Done())
go elector.Run(ctx.Done())

coor, err := poolcoordinator.NewCoordinator(ctx, cfg, restConfigMgr, coorCertManager, coorTransportMgr, elector)
if err != nil {
returnErr = fmt.Errorf("failed to create coordinator, %v", err)
return
}
coor.Run()
go coor.Run()

coordinatorTransportMgr = coorTransportMgr
coordinatorHealthChecker = coorHealthChecker
coordinator = coor
returnErr = nil
}()

waittingForReady = func() (healthchecker.HealthChecker, transport.Interface, poolcoordinator.Coordinator, error) {
<-readyCh
return coordinatorHealthChecker, coordinatorTransportMgr, coordinator, returnErr
}

return waittingForReady
return func() healthchecker.HealthChecker {
return coordinatorHealthChecker
}, func() transport.Interface {
return coordinatorTransportMgr
}, func() poolcoordinator.Coordinator {
return coordinator
}, returnErr
}

func poolCoordinatorTransportMgrGetter(heartbeatTimeoutSeconds int, coordinatorServer *url.URL, coordinatorCertMgr *coordinatorcertmgr.CertManager, stopCh <-chan struct{}) (transport.Interface, error) {
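coordinatorRun now takes the registry channel, returns three getters plus an error instead of a single blocking waittingForReady closure, launches elector.Run and coor.Run as goroutines so the setup goroutine can finish, and fixes the coordinator client timeout: a bare time.Duration(n) is n nanoseconds, so the value must be multiplied by time.Second. A small sketch of that unit fix:

```go
// Converting an int of seconds directly with time.Duration yields
// nanoseconds, which effectively disables the timeout; multiplying by
// time.Second gives the intended value.
package main

import (
	"fmt"
	"time"
)

func main() {
	heartbeatTimeoutSeconds := 10

	wrong := time.Duration(heartbeatTimeoutSeconds)               // 10ns
	right := time.Duration(heartbeatTimeoutSeconds) * time.Second // 10s

	fmt.Println(wrong, right) // 10ns 10s
}
```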
33 changes: 18 additions & 15 deletions pkg/yurthub/poolcoordinator/coordinator.go
@@ -171,7 +171,7 @@ func NewCoordinator(
poolScopedCacheSyncManager := &poolScopedCacheSyncManager{
ctx: ctx,
proxiedClient: proxiedClient,
coordinatorClient: cfg.CoordinatorClient,
coordinatorClient: coordinatorClient,
nodeName: cfg.NodeName,
getEtcdStore: coordinator.getEtcdStore,
}
@@ -185,12 +185,10 @@ func NewCoordinator(

func (coordinator *coordinator) Run() {
for {
var poolCacheManager cachemanager.CacheManager
var cancelEtcdStorage func()
var cancelEtcdStorage = func() {}
var needUploadLocalCache bool
var needCancelEtcdStorage bool
var isPoolCacheSynced bool
var etcdStorage storage.Store
var err error

select {
@@ -213,10 +211,13 @@ func (coordinator *coordinator) Run() {
needUploadLocalCache = true
needCancelEtcdStorage = true
isPoolCacheSynced = false
etcdStorage = nil
poolCacheManager = nil
coordinator.Lock()
coordinator.poolCacheManager = nil
coordinator.Unlock()
case LeaderHub:
poolCacheManager, etcdStorage, cancelEtcdStorage, err = coordinator.buildPoolCacheStore()
coordinator.Lock()
coordinator.poolCacheManager, coordinator.etcdStorage, cancelEtcdStorage, err = coordinator.buildPoolCacheStore()
coordinator.Unlock()
if err != nil {
klog.Errorf("failed to create pool scoped cache store and manager, %v", err)
continue
@@ -245,14 +246,16 @@ func (coordinator *coordinator) Run() {
coordinator.poolCacheSyncedDetector.EnsureStart()

if coordinator.needUploadLocalCache {
if err := coordinator.uploadLocalCache(etcdStorage); err != nil {
if err := coordinator.uploadLocalCache(coordinator.etcdStorage); err != nil {
klog.Errorf("failed to upload local cache when yurthub becomes leader, %v", err)
} else {
needUploadLocalCache = false
}
}
case FollowerHub:
poolCacheManager, etcdStorage, cancelEtcdStorage, err = coordinator.buildPoolCacheStore()
coordinator.Lock()
coordinator.poolCacheManager, coordinator.etcdStorage, cancelEtcdStorage, err = coordinator.buildPoolCacheStore()
coordinator.Unlock()
if err != nil {
klog.Errorf("failed to create pool scoped cache store and manager, %v", err)
continue
@@ -263,7 +266,7 @@ func (coordinator *coordinator) Run() {
coordinator.poolCacheSyncedDetector.EnsureStart()

if coordinator.needUploadLocalCache {
if err := coordinator.uploadLocalCache(etcdStorage); err != nil {
if err := coordinator.uploadLocalCache(coordinator.etcdStorage); err != nil {
klog.Errorf("failed to upload local cache when yurthub becomes follower, %v", err)
} else {
needUploadLocalCache = false
@@ -276,11 +279,9 @@ func (coordinator *coordinator) Run() {
// Because the caller of IsReady() may be concurrent.
coordinator.Lock()
if needCancelEtcdStorage {
coordinator.cancelEtcdStorage()
cancelEtcdStorage()
}
coordinator.electStatus = electorStatus
coordinator.poolCacheManager = poolCacheManager
coordinator.etcdStorage = etcdStorage
coordinator.cancelEtcdStorage = cancelEtcdStorage
coordinator.needUploadLocalCache = needUploadLocalCache
coordinator.isPoolCacheSynced = isPoolCacheSynced
@@ -299,7 +300,8 @@ func (coordinator *coordinator) IsReady() (cachemanager.CacheManager, bool) {
// If electStatus is not PendingHub, it means pool-coordinator is healthy.
coordinator.Lock()
defer coordinator.Unlock()
if coordinator.electStatus != PendingHub && coordinator.isPoolCacheSynced && !coordinator.needUploadLocalCache {
// fixme: coordinator.isPoolCacheSynced now is not considered
if coordinator.electStatus != PendingHub && !coordinator.needUploadLocalCache {
return coordinator.poolCacheManager, true
}
return nil, false
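In coordinator.Run, the pool cache manager and etcd storage are no longer kept in per-iteration locals: they are written to the coordinator's own fields while its mutex is held, and the freshly returned cancelEtcdStorage is invoked rather than the stale field, so concurrent callers of IsReady() see a consistent view. IsReady() itself now ignores isPoolCacheSynced (flagged with a fixme in the diff). A minimal sketch, with hypothetical field names, of publishing such state under a single lock:

```go
// Sketch only (not the OpenYurt type): every write to the shared fields goes
// through the same mutex that IsReady() takes, so readers never observe a
// half-updated pair of (cacheManager, ready-flags) values.
package main

import (
	"fmt"
	"sync"
)

type cacheManager struct{ name string }

type coordinator struct {
	sync.Mutex
	poolCacheManager *cacheManager
	needUploadLocal  bool
	pending          bool
}

// becomeLeader mirrors the LeaderHub branch: build the cache store, then
// publish it under the lock together with the flags IsReady consults.
func (c *coordinator) becomeLeader() {
	mgr := &cacheManager{name: "pool-scoped"}
	c.Lock()
	defer c.Unlock()
	c.poolCacheManager = mgr
	c.needUploadLocal = false
	c.pending = false
}

func (c *coordinator) IsReady() (*cacheManager, bool) {
	c.Lock()
	defer c.Unlock()
	if !c.pending && !c.needUploadLocal {
		return c.poolCacheManager, true
	}
	return nil, false
}

func main() {
	c := &coordinator{pending: true}
	if _, ok := c.IsReady(); !ok {
		fmt.Println("not ready before election")
	}
	c.becomeLeader()
	if mgr, ok := c.IsReady(); ok {
		fmt.Println("ready with", mgr.name)
	}
}
```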
@@ -403,7 +405,8 @@ type poolScopedCacheSyncManager struct {

func (p *poolScopedCacheSyncManager) EnsureStart() error {
if !p.isRunning {
if err := p.coordinatorClient.CoordinationV1().Leases(namespaceInformerLease).Delete(p.ctx, nameInformerLease, metav1.DeleteOptions{}); err != nil {
err := p.coordinatorClient.CoordinationV1().Leases(namespaceInformerLease).Delete(p.ctx, nameInformerLease, metav1.DeleteOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return fmt.Errorf("failed to delete informer sync lease, %v", err)
}

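EnsureStart previously treated any error from deleting the informer-sync Lease as fatal; it now ignores NotFound, since a missing Lease already satisfies the intent of the delete. A hedged sketch of that idempotent delete with client-go (package, namespace, and lease names are placeholders):

```go
// A NotFound error on delete means the object is already gone, which is the
// desired end state, so only other errors are propagated.
package leaseutil

import (
	"context"
	"fmt"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// deleteLeaseIfExists removes the named Lease and tolerates "not found".
func deleteLeaseIfExists(ctx context.Context, client kubernetes.Interface, namespace, name string) error {
	err := client.CoordinationV1().Leases(namespace).Delete(ctx, name, metav1.DeleteOptions{})
	if err != nil && !apierrors.IsNotFound(err) {
		return fmt.Errorf("failed to delete lease %s/%s: %w", namespace, name, err)
	}
	return nil
}
```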
4 changes: 2 additions & 2 deletions pkg/yurthub/poolcoordinator/leader_election.go
Expand Up @@ -55,7 +55,7 @@ func NewHubElector(
coordinatorClient: coordinatorClient,
coordinatorHealthChecker: coordinatorHealthChecker,
cloudAPIServerHealthChecker: cloudAPIServerHealthyChecker,
electorStatus: make(chan int32),
electorStatus: make(chan int32, 1),
}

rl, err := resourcelock.New(cfg.LeaderElection.ResourceLock,
@@ -75,7 +75,7 @@ func NewHubElector(
RetryPeriod: cfg.LeaderElection.RetryPeriod.Duration,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
klog.Infof("yurthub of %s became lease", cfg.NodeName)
klog.Infof("yurthub of %s became leader", cfg.NodeName)
he.electorStatus <- LeaderHub
},
OnStoppedLeading: func() {
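In NewHubElector, electorStatus becomes a buffered channel of size one so the leader-election callbacks can report a status change even when nothing is receiving yet, and the log message is corrected from "became lease" to "became leader". A toy illustration of why the buffer matters:

```go
// With an unbuffered channel the send inside the callback would block until a
// consumer is already receiving; a buffer of one lets the callback record the
// status immediately and the consumer pick it up later. Constant values here
// are illustrative, not copied from the elector package.
package main

import "fmt"

const (
	PendingHub int32 = iota
	LeaderHub
	FollowerHub
)

func main() {
	status := make(chan int32, 1) // buffer of 1, as in NewHubElector

	// Callback fires before any consumer is listening; it does not block.
	status <- LeaderHub

	// Consumer (e.g. the coordinator's select loop) reads it later.
	fmt.Println("elector status:", <-status)
}
```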
52 changes: 37 additions & 15 deletions pkg/yurthub/proxy/pool/pool.go
@@ -51,7 +51,7 @@ type PoolCoordinatorProxy struct {
func NewPoolCoordinatorProxy(
poolCoordinatorAddr *url.URL,
localCacheMgr cachemanager.CacheManager,
transportMgr transport.Interface,
transportMgrGetter func() transport.Interface,
filterMgr *manager.Manager,
isCoordinatorReady func() bool,
stopCh <-chan struct{}) (*PoolCoordinatorProxy, error) {
@@ -66,17 +66,33 @@ func NewPoolCoordinatorProxy(
stopCh: stopCh,
}

proxy, err := util.NewRemoteProxy(
poolCoordinatorAddr,
pp.modifyResponse,
pp.errorHandler,
transportMgr,
stopCh)
if err != nil {
return nil, fmt.Errorf("failed to create remote proxy for pool-coordinator, %v", err)
}
go func() {
ticker := time.NewTicker(time.Second * 5)
for {
select {
case <-ticker.C:
transportMgr := transportMgrGetter()
if transportMgr == nil {
break
}
proxy, err := util.NewRemoteProxy(
poolCoordinatorAddr,
pp.modifyResponse,
pp.errorHandler,
transportMgr,
stopCh)
if err != nil {
klog.Errorf("failed to create remote proxy for pool-coordinator, %v", err)
return
}

pp.poolCoordinatorProxy = proxy
klog.Infof("create remote proxy for pool-coordinator success")
return
}
}
}()

pp.poolCoordinatorProxy = proxy
return pp, nil
}
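NewPoolCoordinatorProxy now accepts a transportMgrGetter instead of a ready transport manager: a goroutine polls the getter every five seconds and builds the remote proxy once a transport manager becomes available, leaving pp.poolCoordinatorProxy nil until then. A generic sketch of this retry-until-ready construction (placeholder names; this version additionally stops the ticker and honours a stop channel):

```go
// Sketch only: poll a getter on a ticker until it returns a usable
// dependency, build the real object once, then stop retrying.
package lazyproxy

import (
	"fmt"
	"time"
)

type transportManager struct{}

type remoteProxy struct{ tm *transportManager }

func newRemoteProxy(tm *transportManager) *remoteProxy { return &remoteProxy{tm: tm} }

type proxyHolder struct {
	proxy *remoteProxy // nil until the dependency shows up
}

func (h *proxyHolder) startLazyInit(getTransport func() *transportManager, stopCh <-chan struct{}) {
	go func() {
		ticker := time.NewTicker(5 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-stopCh:
				return
			case <-ticker.C:
				tm := getTransport()
				if tm == nil {
					continue // dependency not ready yet, retry on next tick
				}
				h.proxy = newRemoteProxy(tm)
				fmt.Println("remote proxy created")
				return
			}
		}
	}()
}
```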

@@ -117,7 +133,7 @@ func (pp *PoolCoordinatorProxy) poolPost(rw http.ResponseWriter, req *http.Reque
ctx := req.Context()
info, _ := apirequest.RequestInfoFrom(ctx)
klog.V(4).Infof("pool handle post, req=%s, reqInfo=%s", hubutil.ReqString(req), hubutil.ReqInfoString(info))
if util.IsSubjectAccessReviewCreateGetRequest(req) || util.IsEventCreateRequest(req) {
if (util.IsSubjectAccessReviewCreateGetRequest(req) || util.IsEventCreateRequest(req)) && pp.poolCoordinatorProxy != nil {
// kubelet needs to create subjectaccessreviews for auth
pp.poolCoordinatorProxy.ServeHTTP(rw, req)
return nil
@@ -127,15 +143,15 @@ func (pp *PoolCoordinatorProxy) poolQuery(rw http.ResponseWriter, req *http.Reque
}

func (pp *PoolCoordinatorProxy) poolQuery(rw http.ResponseWriter, req *http.Request) error {
if util.IsPoolScopedResouceListWatchRequest(req) || util.IsSubjectAccessReviewCreateGetRequest(req) {
if (util.IsPoolScopedResouceListWatchRequest(req) || util.IsSubjectAccessReviewCreateGetRequest(req)) && pp.poolCoordinatorProxy != nil {
pp.poolCoordinatorProxy.ServeHTTP(rw, req)
return nil
}
return fmt.Errorf("unsupported query request")
}
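Because the proxy can stay nil while that lazy construction is still in flight, the POST, query, and watch handlers now check pp.poolCoordinatorProxy before using it and otherwise fall through to their error returns. One hypothetical way to express such a guard as an http.Handler (the real handlers return an error to the caller rather than writing a 503):

```go
// Hypothetical nil guard: if the lazily-created backend proxy is not
// available yet, answer 503 instead of dereferencing a nil pointer, so
// callers can retry or fall back.
package poolproxy

import (
	"net/http"
	"net/http/httputil"
)

type poolProxy struct {
	backend *httputil.ReverseProxy // nil until lazy init completes
}

func (p *poolProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
	if p.backend == nil {
		http.Error(rw, "pool-coordinator proxy not ready", http.StatusServiceUnavailable)
		return
	}
	p.backend.ServeHTTP(rw, req)
}
```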

func (pp *PoolCoordinatorProxy) poolWatch(rw http.ResponseWriter, req *http.Request) error {
if util.IsPoolScopedResouceListWatchRequest(req) {
if util.IsPoolScopedResouceListWatchRequest(req) && pp.poolCoordinatorProxy != nil {
clientReqCtx := req.Context()
poolServeCtx, poolServeCancel := context.WithCancel(clientReqCtx)

@@ -243,14 +259,20 @@ func (pp *PoolCoordinatorProxy) cacheResponse(req *http.Request, resp *http.Resp
if pp.localCacheMgr.CanCacheFor(req) {
ctx := req.Context()
req = req.WithContext(ctx)
wrapPrc, _ := hubutil.NewGZipReaderCloser(resp.Header, resp.Body, req, "cache-manager")
wrapPrc, needUncompressed := hubutil.NewGZipReaderCloser(resp.Header, resp.Body, req, "cache-manager")

rc, prc := hubutil.NewDualReadCloser(req, wrapPrc, true)
go func(req *http.Request, prc io.ReadCloser, stopCh <-chan struct{}) {
if err := pp.localCacheMgr.CacheResponse(req, prc, stopCh); err != nil {
klog.Errorf("failed to cache req %s in local cache when cluster is unhealthy, %v", hubutil.ReqString(req), err)
}
}(req, prc, ctx.Done())

// after gunzip in filter, the header content encoding should be removed.
// because there's no need to gunzip response.body again.
if needUncompressed {
resp.Header.Del("Content-Encoding")
}
resp.Body = rc
}
}
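cacheResponse now keeps the second return value of NewGZipReaderCloser and, when the body was actually gunzipped, deletes the Content-Encoding header before republishing the wrapped body, so nothing downstream tries to decompress it a second time; the body is still split with a dual read-closer so caching runs asynchronously. A standard-library-only sketch of the "decompress once, then drop the header" step (helper names are mine, not yurthub's):

```go
// Once a gzip-encoded response body has been wrapped in a decompressing
// reader, Content-Encoding (and the now-wrong Content-Length) must be
// cleared so later readers do not attempt a second decompression.
package respcache

import (
	"compress/gzip"
	"io"
	"net/http"
)

// wrapGzipBody replaces resp.Body with a decompressing reader when the
// response is gzip-encoded and strips the stale headers.
func wrapGzipBody(resp *http.Response) error {
	if resp.Header.Get("Content-Encoding") != "gzip" {
		return nil // nothing to do
	}
	zr, err := gzip.NewReader(resp.Body)
	if err != nil {
		return err
	}
	// The reader now yields plain bytes, so advertise them as such.
	resp.Header.Del("Content-Encoding")
	resp.Header.Del("Content-Length")
	resp.ContentLength = -1
	resp.Body = readCloser{Reader: zr, gz: zr, orig: resp.Body}
	return nil
}

// readCloser closes both the gzip reader and the original body.
type readCloser struct {
	io.Reader
	gz   io.Closer
	orig io.Closer
}

func (r readCloser) Close() error {
	if err := r.gz.Close(); err != nil {
		r.orig.Close()
		return err
	}
	return r.orig.Close()
}
```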
