Fix: pool-coordinator #1126

Merged
Changes from 2 commits
1 change: 0 additions & 1 deletion cmd/yurthub/app/config/config.go
@@ -100,7 +100,6 @@ type YurtHubConfiguration struct {
CoordinatorServerURL *url.URL
CoordinatorStoragePrefix string
CoordinatorStorageAddr string // ip:port
CoordinatorClient kubernetes.Interface
LeaderElection componentbaseconfig.LeaderElectionConfiguration
}

51 changes: 25 additions & 26 deletions cmd/yurthub/app/start.go
@@ -180,40 +180,38 @@ func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error {
tenantMgr := tenant.New(cfg.YurtHubCertOrganizations, cfg.SharedFactory, ctx.Done())
trace++

var coordinator poolcoordinator.Coordinator = &poolcoordinator.FakeCoordinator{}
var coordinatorHealthChecker healthchecker.HealthChecker = healthchecker.NewFakeChecker(false, make(map[string]int))
var coordinatorTransportManager transport.Interface = nil
var waittingForCoordinator func() (healthchecker.HealthChecker, transport.Interface, poolcoordinator.Coordinator, error) = nil
var coordinatorGetter func() poolcoordinator.Coordinator
var coordinatorHealthCheckerGetter func() healthchecker.HealthChecker
var coordinatorTransportManagerGetter func() transport.Interface = nil

if cfg.EnableCoordinator {
klog.Infof("%d. start to run coordinator", trace)
// coordinatorRun will register secret informer into sharedInformerFactory, and start a new goroutine to periodically check
// if certs has been got from cloud APIServer.
waittingForCoordinator = coordinatorRun(ctx, cfg, restConfigMgr, cloudHealthChecker)
trace++
}

// Start the informer factory if all informers have been registered
cfg.SharedFactory.Start(ctx.Done())
cfg.YurtSharedFactory.Start(ctx.Done())

if waittingForCoordinator != nil {
// Waitting for the coordinator to run, before using it to create other components.
coordinatorHealthChecker, coordinatorTransportManager, coordinator, err = waittingForCoordinator()
coordinatorHealthCheckerGetter, coordinatorTransportManagerGetter, coordinatorGetter, err = coordinatorRun(ctx, cfg, restConfigMgr, cloudHealthChecker)
if err != nil {
return fmt.Errorf("failed to wait for coordinator to run, %v", err)
}
}

// wait for async coordinator informer registry
time.Sleep(time.Second * 5)
Member: Using time.Sleep to synchronize does not seem like a good approach.

Member (Author): You are right. I'll change it to a notification.
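
As a rough illustration of the notification-based approach suggested above (not code from this PR; the channel name and helper function are assumptions), the fixed five-second sleep could be replaced with a channel that the coordinator closes once its informers are registered:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// waitForRegistration blocks until the coordinator signals that its informers
// have been registered, instead of sleeping for a fixed five seconds.
func waitForRegistration(ctx context.Context, registered <-chan struct{}) error {
	select {
	case <-registered:
		return nil
	case <-ctx.Done():
		return fmt.Errorf("coordinator informer registration not finished: %w", ctx.Err())
	}
}

func main() {
	registered := make(chan struct{})

	// Stand-in for the coordinator goroutine: close the channel when the
	// informer registration work is done.
	go func() {
		time.Sleep(100 * time.Millisecond) // placeholder for real registration
		close(registered)
	}()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	if err := waitForRegistration(ctx, registered); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("informers registered; safe to start the shared informer factories")
}
```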

// Start the informer factory if all informers have been registered
cfg.SharedFactory.Start(ctx.Done())
cfg.YurtSharedFactory.Start(ctx.Done())

klog.Infof("%d. new reverse proxy handler for remote servers", trace)
yurtProxyHandler, err := proxy.NewYurtReverseProxyHandler(
cfg,
cacheMgr,
transportManager,
cloudHealthChecker,
tenantMgr,
coordinator,
coordinatorTransportManager,
coordinatorHealthChecker,
coordinatorGetter,
coordinatorTransportManagerGetter,
coordinatorHealthCheckerGetter,
ctx.Done())
if err != nil {
return fmt.Errorf("could not create reverse proxy handler, %w", err)
@@ -270,7 +268,7 @@ func createClients(heartbeatTimeoutSeconds int, remoteServers []*url.URL, coordi
func coordinatorRun(ctx context.Context,
cfg *config.YurtHubConfiguration,
restConfigMgr *hubrest.RestConfigManager,
cloudHealthChecker healthchecker.MultipleBackendsHealthChecker) (waittingForReady func() (healthchecker.HealthChecker, transport.Interface, poolcoordinator.Coordinator, error)) {
cloudHealthChecker healthchecker.MultipleBackendsHealthChecker) (func() healthchecker.HealthChecker, func() transport.Interface, func() poolcoordinator.Coordinator, error) {
var coordinatorHealthChecker healthchecker.HealthChecker
var coordinatorTransportMgr transport.Interface
var coordinator poolcoordinator.Coordinator
@@ -297,7 +295,7 @@ func coordinatorRun(ctx context.Context,
coordinatorClient, err := kubernetes.NewForConfig(&rest.Config{
Host: cfg.CoordinatorServerURL.String(),
Transport: coorTransportMgr.CurrentTransport(),
Timeout: time.Duration(cfg.HeartbeatTimeoutSeconds),
Timeout: time.Duration(cfg.HeartbeatTimeoutSeconds) * time.Second,
})
if err != nil {
returnErr = fmt.Errorf("failed to get coordinator client for pool coordinator, %v", err)
@@ -316,27 +314,28 @@ func coordinatorRun(ctx context.Context,
returnErr = fmt.Errorf("failed to create hub elector, %v", err)
return
}
elector.Run(ctx.Done())
go elector.Run(ctx.Done())

coor, err := poolcoordinator.NewCoordinator(ctx, cfg, restConfigMgr, coorCertManager, coorTransportMgr, elector)
if err != nil {
returnErr = fmt.Errorf("failed to create coordinator, %v", err)
return
}
coor.Run()
go coor.Run()

coordinatorTransportMgr = coorTransportMgr
coordinatorHealthChecker = coorHealthChecker
coordinator = coor
returnErr = nil
}()

waittingForReady = func() (healthchecker.HealthChecker, transport.Interface, poolcoordinator.Coordinator, error) {
<-readyCh
return coordinatorHealthChecker, coordinatorTransportMgr, coordinator, returnErr
}

return waittingForReady
return func() healthchecker.HealthChecker {
return coordinatorHealthChecker
}, func() transport.Interface {
return coordinatorTransportMgr
}, func() poolcoordinator.Coordinator {
return coordinator
}, returnErr
}

func poolCoordinatorTransportMgrGetter(heartbeatTimeoutSeconds int, coordinatorServer *url.URL, coordinatorCertMgr *coordinatorcertmgr.CertManager, stopCh <-chan struct{}) (transport.Interface, error) {
6 changes: 3 additions & 3 deletions pkg/yurthub/certificate/hubself/cert_mgr.go
@@ -251,7 +251,7 @@ func (ycm *yurtHubCertManager) initCaCert() error {
klog.Infof("%s file not exists, so create it", caFile)
}

insecureRestConfig, err := createInsecureRestClientConfig(ycm.remoteServers[0])
insecureRestConfig, err := CreateInsecureRestClientConfig(ycm.remoteServers[0])
if err != nil {
klog.Errorf("could not create insecure rest config, %v", err)
return err
@@ -524,8 +524,8 @@ func createBasic(apiServerAddr string, caCert []byte) *clientcmdapi.Config {
}
}

// createInsecureRestClientConfig create insecure rest client config.
func createInsecureRestClientConfig(remoteServer *url.URL) (*restclient.Config, error) {
// CreateInsecureRestClientConfig create insecure rest client config.
func CreateInsecureRestClientConfig(remoteServer *url.URL) (*restclient.Config, error) {
Member: Why do we expose this function? I don't find anywhere that uses it.

Member (Author): I'll fix it.

if remoteServer == nil {
return nil, fmt.Errorf("no healthy remote server")
}
4 changes: 2 additions & 2 deletions pkg/yurthub/certificate/hubself/cert_mgr_test.go
@@ -147,9 +147,9 @@ func Test_createInsecureRestClientConfig(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, err := createInsecureRestClientConfig(tt.args.remoteServer)
_, err := CreateInsecureRestClientConfig(tt.args.remoteServer)
if (err != nil) != tt.wantErr {
t.Errorf("createInsecureRestClientConfig() error = %v, wantErr %v", err, tt.wantErr)
t.Errorf("CreateInsecureRestClientConfig() error = %v, wantErr %v", err, tt.wantErr)
return
}
})
1 change: 1 addition & 0 deletions pkg/yurthub/filter/approver_test.go
@@ -43,6 +43,7 @@ var supportedResourceAndVerbsForFilter = map[string]map[string]sets.String{
},
ServiceTopologyFilterName: {
"endpoints": sets.NewString("list", "watch"),
"pods": sets.NewString("list", "watch"),
Member: Why add this entry?

Member (Author): I'll fix it.

"endpointslices": sets.NewString("list", "watch"),
},
}
27 changes: 12 additions & 15 deletions pkg/yurthub/poolcoordinator/coordinator.go
@@ -161,7 +161,7 @@ func NewCoordinator(
poolScopedCacheSyncManager := &poolScopedCacheSyncManager{
ctx: ctx,
proxiedClient: proxiedClient,
coordinatorClient: cfg.CoordinatorClient,
coordinatorClient: coordinatorClient,
nodeName: cfg.NodeName,
getEtcdStore: coordinator.getEtcdStore,
}
@@ -175,12 +175,10 @@

func (coordinator *coordinator) Run() {
for {
var poolCacheManager cachemanager.CacheManager
var cancelEtcdStorage func()
var cancelEtcdStorage = func() {}
var needUploadLocalCache bool
var needCancelEtcdStorage bool
var isPoolCacheSynced bool
var etcdStorage storage.Store
var err error

select {
@@ -203,10 +201,9 @@
needUploadLocalCache = true
needCancelEtcdStorage = true
isPoolCacheSynced = false
etcdStorage = nil
poolCacheManager = nil
coordinator.poolCacheManager = nil
Member: Do not update the coordinator fields if we have not acquired the lock, since these fields are used and exposed by the IsReady and IsHealthy functions, which may be called concurrently.

For example, we assume that the coordinator is healthy when coordinator.electStatus != PendingHub, at which point coordinator.poolCacheManager should also not be nil. Thus the condition coordinator.electStatus != PendingHub && coordinator.poolCacheManager != nil is a defined situation, and a caller of IsHealthy will always get a non-nil poolCacheManager when it finds that IsHealthy returned true.

However, if we update coordinator.poolCacheManager here, a caller of IsHealthy may find that the pool-coordinator is healthy but get a nil poolCacheManager, which is an undefined situation.

Member (Author): I'll add a lock here.
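
A minimal sketch of the locking pattern the reviewer is asking for (the types and names below are illustrative stand-ins, not the PR's actual code): both fields are written and read under the same mutex, so a caller that sees a ready status never observes a nil cache manager.

```go
package main

import (
	"fmt"
	"sync"
)

type cacheManager struct{}

const pendingHub = 0 // illustrative status value

// coordinator is a simplified stand-in for the yurthub pool coordinator.
type coordinator struct {
	sync.Mutex
	electStatus      int
	poolCacheManager *cacheManager // nil until the hub becomes leader or follower
}

// setStatus updates both fields under the lock, so readers never see a
// "healthy" status paired with a nil cache manager.
func (c *coordinator) setStatus(status int, mgr *cacheManager) {
	c.Lock()
	defer c.Unlock()
	c.electStatus = status
	c.poolCacheManager = mgr
}

// IsReady reads both fields under the same lock.
func (c *coordinator) IsReady() (*cacheManager, bool) {
	c.Lock()
	defer c.Unlock()
	if c.electStatus != pendingHub && c.poolCacheManager != nil {
		return c.poolCacheManager, true
	}
	return nil, false
}

func main() {
	c := &coordinator{electStatus: pendingHub}
	c.setStatus(1, &cacheManager{})
	if _, ok := c.IsReady(); ok {
		fmt.Println("coordinator ready with a non-nil pool cache manager")
	}
}
```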

case LeaderHub:
poolCacheManager, etcdStorage, cancelEtcdStorage, err = coordinator.buildPoolCacheStore()
coordinator.poolCacheManager, coordinator.etcdStorage, cancelEtcdStorage, err = coordinator.buildPoolCacheStore()
Member: Ditto.

if err != nil {
klog.Errorf("failed to create pool scoped cache store and manager, %v", err)
continue
Expand Down Expand Up @@ -249,14 +246,14 @@ func (coordinator *coordinator) Run() {
})

if coordinator.needUploadLocalCache {
if err := coordinator.uploadLocalCache(etcdStorage); err != nil {
if err := coordinator.uploadLocalCache(coordinator.etcdStorage); err != nil {
klog.Errorf("failed to upload local cache when yurthub becomes leader, %v", err)
} else {
needUploadLocalCache = false
}
}
case FollowerHub:
poolCacheManager, etcdStorage, cancelEtcdStorage, err = coordinator.buildPoolCacheStore()
coordinator.poolCacheManager, coordinator.etcdStorage, cancelEtcdStorage, err = coordinator.buildPoolCacheStore()
Member: Ditto.

if err != nil {
klog.Errorf("failed to create pool scoped cache store and manager, %v", err)
continue
Expand All @@ -280,7 +277,7 @@ func (coordinator *coordinator) Run() {
})

if coordinator.needUploadLocalCache {
if err := coordinator.uploadLocalCache(etcdStorage); err != nil {
if err := coordinator.uploadLocalCache(coordinator.etcdStorage); err != nil {
klog.Errorf("failed to upload local cache when yurthub becomes follower, %v", err)
} else {
needUploadLocalCache = false
Expand All @@ -293,11 +290,9 @@ func (coordinator *coordinator) Run() {
// Because the caller of IsReady() may be concurrent.
coordinator.Lock()
if needCancelEtcdStorage {
coordinator.cancelEtcdStorage()
cancelEtcdStorage()
}
coordinator.electStatus = electorStatus
coordinator.poolCacheManager = poolCacheManager
coordinator.etcdStorage = etcdStorage
coordinator.cancelEtcdStorage = cancelEtcdStorage
coordinator.needUploadLocalCache = needUploadLocalCache
coordinator.isPoolCacheSynced = isPoolCacheSynced
@@ -316,7 +311,8 @@ func (coordinator *coordinator) IsReady() (cachemanager.CacheManager, bool) {
// If electStatus is not PendingHub, it means pool-coordinator is healthy.
coordinator.Lock()
defer coordinator.Unlock()
if coordinator.electStatus != PendingHub && coordinator.isPoolCacheSynced && !coordinator.needUploadLocalCache {
// fixme: coordinator.isPoolCacheSynced now is not considered
if coordinator.electStatus != PendingHub && !coordinator.needUploadLocalCache {
Member: Why remove isPoolCacheSynced?

Member (Author): isPoolCacheSynced is currently never set to true.

Member: Actually we do need isPoolCacheSynced. I'll submit a PR to fix it.

Member: Fixed by #1131.

return coordinator.poolCacheManager, true
}
return nil, false
@@ -430,7 +426,8 @@ type poolScopedCacheSyncManager struct {

func (p *poolScopedCacheSyncManager) EnsureStart() error {
if !p.isRunning {
if err := p.coordinatorClient.CoordinationV1().Leases(namespaceInformerLease).Delete(p.ctx, nameInformerLease, metav1.DeleteOptions{}); err != nil {
err := p.coordinatorClient.CoordinationV1().Leases(namespaceInformerLease).Delete(p.ctx, nameInformerLease, metav1.DeleteOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return fmt.Errorf("failed to delete informer sync lease, %v", err)
}

4 changes: 2 additions & 2 deletions pkg/yurthub/poolcoordinator/leader_election.go
@@ -55,7 +55,7 @@ func NewHubElector(
coordinatorClient: coordinatorClient,
coordinatorHealthChecker: coordinatorHealthChecker,
cloudAPIServerHealthChecker: cloudAPIServerHealthyChecker,
electorStatus: make(chan int32),
electorStatus: make(chan int32, 1),
Member: Why set the capacity of electorStatus to 1?

Member: How about moving he.electorStatus <- PendingHub to the beginning of Run()?
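
To illustrate the point about buffering (a toy example, not the PR's code): with capacity 1, the constructor can publish the initial PendingHub status before any consumer goroutine exists, whereas an unbuffered send at that point would block forever.

```go
package main

import "fmt"

const PendingHub int32 = 0 // illustrative value, not necessarily the real constant

func main() {
	// Capacity 1 lets NewHubElector publish the initial status without a
	// receiver being ready yet; make(chan int32) would deadlock on the send.
	electorStatus := make(chan int32, 1)
	electorStatus <- PendingHub

	// Later, the Run loop (the consumer) picks the status up.
	fmt.Println("initial elector status:", <-electorStatus)
}
```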

}

rl, err := resourcelock.New(cfg.LeaderElection.ResourceLock,
Expand All @@ -75,7 +75,7 @@ func NewHubElector(
RetryPeriod: cfg.LeaderElection.RetryPeriod.Duration,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
klog.Infof("yurthub of %s became lease", cfg.NodeName)
klog.Infof("yurthub of %s became leader", cfg.NodeName)
he.electorStatus <- LeaderHub
},
OnStoppedLeading: func() {
44 changes: 30 additions & 14 deletions pkg/yurthub/proxy/pool/pool.go
@@ -51,7 +51,7 @@ type PoolCoordinatorProxy struct {
func NewPoolCoordinatorProxy(
poolCoordinatorAddr *url.URL,
localCacheMgr cachemanager.CacheManager,
transportMgr transport.Interface,
transportMgrGetter func() transport.Interface,
filterMgr *manager.Manager,
isCoordinatorReady func() bool,
stopCh <-chan struct{}) (*PoolCoordinatorProxy, error) {
@@ -66,17 +66,33 @@
stopCh: stopCh,
}

proxy, err := util.NewRemoteProxy(
poolCoordinatorAddr,
pp.modifyResponse,
pp.errorHandler,
transportMgr,
stopCh)
if err != nil {
return nil, fmt.Errorf("failed to create remote proxy for pool-coordinator, %v", err)
}
go func() {
ticker := time.NewTicker(time.Second * 5)
for {
select {
case <-ticker.C:
transportMgr := transportMgrGetter()
if transportMgr == nil {
break
}
proxy, err := util.NewRemoteProxy(
poolCoordinatorAddr,
pp.modifyResponse,
pp.errorHandler,
transportMgr,
stopCh)
if err != nil {
klog.Errorf("failed to create remote proxy for pool-coordinator, %v", err)
return
}

pp.poolCoordinatorProxy = proxy
klog.Infof("create remote proxy for pool-coordinator success")
return
}
}
}()

pp.poolCoordinatorProxy = proxy
return pp, nil
}

@@ -117,7 +133,7 @@ func (pp *PoolCoordinatorProxy) poolPost(rw http.ResponseWriter, req *http.Reque
ctx := req.Context()
info, _ := apirequest.RequestInfoFrom(ctx)
klog.V(4).Infof("pool handle post, req=%s, reqInfo=%s", hubutil.ReqString(req), hubutil.ReqInfoString(info))
if util.IsSubjectAccessReviewCreateGetRequest(req) || util.IsEventCreateRequest(req) {
if (util.IsSubjectAccessReviewCreateGetRequest(req) || util.IsEventCreateRequest(req)) && pp.poolCoordinatorProxy != nil {
// kubelet needs to create subjectaccessreviews for auth
pp.poolCoordinatorProxy.ServeHTTP(rw, req)
return nil
@@ -127,15 +143,15 @@
}

func (pp *PoolCoordinatorProxy) poolQuery(rw http.ResponseWriter, req *http.Request) error {
if util.IsPoolScopedResouceListWatchRequest(req) || util.IsSubjectAccessReviewCreateGetRequest(req) {
if (util.IsPoolScopedResouceListWatchRequest(req) || util.IsSubjectAccessReviewCreateGetRequest(req)) && pp.poolCoordinatorProxy != nil {
pp.poolCoordinatorProxy.ServeHTTP(rw, req)
return nil
}
return fmt.Errorf("unsupported query request")
}

func (pp *PoolCoordinatorProxy) poolWatch(rw http.ResponseWriter, req *http.Request) error {
if util.IsPoolScopedResouceListWatchRequest(req) {
if util.IsPoolScopedResouceListWatchRequest(req) && pp.poolCoordinatorProxy != nil {
clientReqCtx := req.Context()
poolServeCtx, poolServeCancel := context.WithCancel(clientReqCtx)
