Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix coordinator sync problem and update ready condition #1142

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 11 additions & 13 deletions pkg/yurthub/poolcoordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,12 @@ func NewCoordinator(

func (coordinator *coordinator) Run() {
for {
var poolCacheManager cachemanager.CacheManager
var cancelEtcdStorage = func() {}
var needUploadLocalCache bool
var needCancelEtcdStorage bool
var isPoolCacheSynced bool
var etcdStorage storage.Store
var err error

select {
Expand All @@ -211,13 +213,10 @@ func (coordinator *coordinator) Run() {
needUploadLocalCache = true
needCancelEtcdStorage = true
isPoolCacheSynced = false
coordinator.Lock()
coordinator.poolCacheManager = nil
coordinator.Unlock()
etcdStorage = nil
poolCacheManager = nil
case LeaderHub:
coordinator.Lock()
coordinator.poolCacheManager, coordinator.etcdStorage, cancelEtcdStorage, err = coordinator.buildPoolCacheStore()
coordinator.Unlock()
poolCacheManager, etcdStorage, cancelEtcdStorage, err = coordinator.buildPoolCacheStore()
if err != nil {
klog.Errorf("failed to create pool scoped cache store and manager, %v", err)
continue
Expand Down Expand Up @@ -246,16 +245,14 @@ func (coordinator *coordinator) Run() {
coordinator.poolCacheSyncedDetector.EnsureStart()

if coordinator.needUploadLocalCache {
if err := coordinator.uploadLocalCache(coordinator.etcdStorage); err != nil {
if err := coordinator.uploadLocalCache(etcdStorage); err != nil {
klog.Errorf("failed to upload local cache when yurthub becomes leader, %v", err)
} else {
needUploadLocalCache = false
}
}
case FollowerHub:
coordinator.Lock()
coordinator.poolCacheManager, coordinator.etcdStorage, cancelEtcdStorage, err = coordinator.buildPoolCacheStore()
coordinator.Unlock()
poolCacheManager, etcdStorage, cancelEtcdStorage, err = coordinator.buildPoolCacheStore()
if err != nil {
klog.Errorf("failed to create pool scoped cache store and manager, %v", err)
continue
Expand All @@ -266,7 +263,7 @@ func (coordinator *coordinator) Run() {
coordinator.poolCacheSyncedDetector.EnsureStart()

if coordinator.needUploadLocalCache {
if err := coordinator.uploadLocalCache(coordinator.etcdStorage); err != nil {
if err := coordinator.uploadLocalCache(etcdStorage); err != nil {
klog.Errorf("failed to upload local cache when yurthub becomes follower, %v", err)
} else {
needUploadLocalCache = false
Expand All @@ -282,6 +279,8 @@ func (coordinator *coordinator) Run() {
cancelEtcdStorage()
}
coordinator.electStatus = electorStatus
coordinator.poolCacheManager = poolCacheManager
coordinator.etcdStorage = etcdStorage
coordinator.cancelEtcdStorage = cancelEtcdStorage
coordinator.needUploadLocalCache = needUploadLocalCache
coordinator.isPoolCacheSynced = isPoolCacheSynced
Expand All @@ -300,8 +299,7 @@ func (coordinator *coordinator) IsReady() (cachemanager.CacheManager, bool) {
// If electStatus is not PendingHub, it means pool-coordinator is healthy.
coordinator.Lock()
defer coordinator.Unlock()
// fixme: coordinator.isPoolCacheSynced now is not considered
if coordinator.electStatus != PendingHub && !coordinator.needUploadLocalCache {
if coordinator.electStatus != PendingHub && coordinator.isPoolCacheSynced && !coordinator.needUploadLocalCache {
return coordinator.poolCacheManager, true
}
return nil, false
Expand Down