-
Notifications
You must be signed in to change notification settings - Fork 407
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Fix: add PoolScopeResource validation and dynamic configuration suppo…
…rt (#1148) * Fix: add PoolScopeResource validation and dynamic configuration support
- Loading branch information
1 parent
9c0df52
commit 803e233
Showing
11 changed files
with
233 additions
and
36 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,166 @@ | ||
package resources | ||
|
||
import ( | ||
"context" | ||
"encoding/json" | ||
"sync" | ||
"time" | ||
|
||
corev1 "k8s.io/api/core/v1" | ||
apierrors "k8s.io/apimachinery/pkg/api/errors" | ||
"k8s.io/apimachinery/pkg/runtime/schema" | ||
apirequest "k8s.io/apiserver/pkg/endpoints/request" | ||
"k8s.io/client-go/informers" | ||
"k8s.io/client-go/kubernetes" | ||
"k8s.io/client-go/tools/cache" | ||
"k8s.io/klog/v2" | ||
|
||
"github.com/openyurtio/openyurt/pkg/yurthub/util" | ||
) | ||
|
||
type PoolScopeResourcesManger struct { | ||
factory informers.SharedInformerFactory | ||
validPoolScopedResources map[string]*verifiablePoolScopeResource | ||
validPoolScopedResourcesLock sync.RWMutex | ||
k8sClient kubernetes.Interface | ||
hasSynced func() bool | ||
} | ||
|
||
var poolScopeResourcesManger *PoolScopeResourcesManger | ||
|
||
func InitPoolScopeResourcesManger(client kubernetes.Interface, factory informers.SharedInformerFactory) *PoolScopeResourcesManger { | ||
poolScopeResourcesManger = &PoolScopeResourcesManger{ | ||
k8sClient: client, | ||
validPoolScopedResources: make(map[string]*verifiablePoolScopeResource), | ||
} | ||
configmapInformer := factory.Core().V1().ConfigMaps().Informer() | ||
configmapInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ | ||
AddFunc: poolScopeResourcesManger.addConfigmap, | ||
// todo: now we do not support update of pool scope resources definition | ||
}) | ||
poolScopeResourcesManger.hasSynced = configmapInformer.HasSynced | ||
|
||
klog.Infof("init pool scope resources manager") | ||
|
||
poolScopeResourcesManger.setVerifiableGVRs(poolScopeResourcesManger.getInitPoolScopeResources()...) | ||
return poolScopeResourcesManger | ||
} | ||
|
||
func WaitUntilPoolScopeResourcesSync(ctx context.Context) { | ||
cache.WaitForCacheSync(ctx.Done(), poolScopeResourcesManger.hasSynced) | ||
} | ||
|
||
func IsPoolScopeResources(info *apirequest.RequestInfo) bool { | ||
if info == nil || poolScopeResourcesManger == nil { | ||
return false | ||
} | ||
_, ok := poolScopeResourcesManger.validPoolScopedResources[schema.GroupVersionResource{ | ||
Group: info.APIGroup, | ||
Version: info.APIVersion, | ||
Resource: info.Resource, | ||
}.String()] | ||
return ok | ||
} | ||
|
||
func GetPoolScopeResources() []schema.GroupVersionResource { | ||
if poolScopeResourcesManger == nil { | ||
return []schema.GroupVersionResource{} | ||
} | ||
return poolScopeResourcesManger.getPoolScopeResources() | ||
} | ||
|
||
func (m *PoolScopeResourcesManger) getPoolScopeResources() []schema.GroupVersionResource { | ||
poolScopeResources := make([]schema.GroupVersionResource, 0) | ||
m.validPoolScopedResourcesLock.RLock() | ||
defer m.validPoolScopedResourcesLock.RUnlock() | ||
for _, v := range m.validPoolScopedResources { | ||
poolScopeResources = append(poolScopeResources, v.GroupVersionResource) | ||
} | ||
return poolScopeResources | ||
} | ||
|
||
// addVerifiableGVRs add given gvrs to validPoolScopedResources map | ||
func (m *PoolScopeResourcesManger) addVerifiableGVRs(gvrs ...*verifiablePoolScopeResource) { | ||
m.validPoolScopedResourcesLock.Lock() | ||
defer m.validPoolScopedResourcesLock.Unlock() | ||
for _, gvr := range gvrs { | ||
if ok, errMsg := gvr.Verify(); ok { | ||
m.validPoolScopedResources[gvr.String()] = gvr | ||
klog.Infof("PoolScopeResourcesManger add gvr %s success", gvr.String()) | ||
} else { | ||
klog.Warningf("PoolScopeResourcesManger add gvr %s failed, because %s", gvr.String(), errMsg) | ||
} | ||
} | ||
} | ||
|
||
// addVerifiableGVRs clear validPoolScopedResources and set given gvrs to validPoolScopedResources map | ||
func (m *PoolScopeResourcesManger) setVerifiableGVRs(gvrs ...*verifiablePoolScopeResource) { | ||
m.validPoolScopedResourcesLock.Lock() | ||
defer m.validPoolScopedResourcesLock.Unlock() | ||
m.validPoolScopedResources = make(map[string]*verifiablePoolScopeResource) | ||
for _, gvr := range gvrs { | ||
if ok, errMsg := gvr.Verify(); ok { | ||
m.validPoolScopedResources[gvr.String()] = gvr | ||
klog.Infof("PoolScopeResourcesManger update gvr %s success", gvr.String()) | ||
} else { | ||
klog.Warningf("PoolScopeResourcesManger update gvr %s failed, because %s", gvr.String(), errMsg) | ||
} | ||
} | ||
} | ||
|
||
func (m *PoolScopeResourcesManger) addConfigmap(obj interface{}) { | ||
cfg, ok := obj.(*corev1.ConfigMap) | ||
if !ok { | ||
return | ||
} | ||
|
||
poolScopeResources := cfg.Data[util.PoolScopeResourcesKey] | ||
poolScopeResourcesGVRs := make([]schema.GroupVersionResource, 0) | ||
verifiablePoolScopeResourcesGVRs := make([]*verifiablePoolScopeResource, 0) | ||
if err := json.Unmarshal([]byte(poolScopeResources), &poolScopeResourcesGVRs); err != nil { | ||
klog.Errorf("PoolScopeResourcesManger unmarshal poolScopeResources %s failed with error = %s", | ||
poolScopeResourcesGVRs, err.Error()) | ||
return | ||
} | ||
klog.Infof("PoolScopeResourcesManger add configured pool scope resources %v", poolScopeResourcesGVRs) | ||
for _, v := range poolScopeResourcesGVRs { | ||
verifiablePoolScopeResourcesGVRs = append(verifiablePoolScopeResourcesGVRs, | ||
newVerifiablePoolScopeResource(v, m.getGroupVersionVerifyFunction(m.k8sClient))) | ||
} | ||
m.addVerifiableGVRs(verifiablePoolScopeResourcesGVRs...) | ||
} | ||
|
||
func (m *PoolScopeResourcesManger) getGroupVersionVerifyFunction(client kubernetes.Interface) func(gvr schema.GroupVersionResource) (bool, string) { | ||
return func(gvr schema.GroupVersionResource) (bool, string) { | ||
maxRetry := 3 | ||
duration := time.Second * 5 | ||
counter := 0 | ||
var err error | ||
for counter <= maxRetry { | ||
if _, err = client.Discovery().ServerResourcesForGroupVersion(gvr.GroupVersion().String()); err == nil { | ||
return true, "" // gvr found | ||
} | ||
if apierrors.IsNotFound(err) { | ||
return false, err.Error() // gvr not found | ||
} | ||
// unexpected error, retry | ||
counter++ | ||
time.Sleep(duration) | ||
} | ||
return false, err.Error() | ||
} | ||
} | ||
|
||
func (m *PoolScopeResourcesManger) getInitPoolScopeResources() []*verifiablePoolScopeResource { | ||
return []*verifiablePoolScopeResource{ | ||
newVerifiablePoolScopeResource( | ||
schema.GroupVersionResource{Group: "", Version: "v1", Resource: "endpoints"}, | ||
m.getGroupVersionVerifyFunction(m.k8sClient)), | ||
newVerifiablePoolScopeResource( | ||
schema.GroupVersionResource{Group: "discovery.k8s.io", Version: "v1", Resource: "endpointslices"}, | ||
m.getGroupVersionVerifyFunction(m.k8sClient)), | ||
newVerifiablePoolScopeResource( | ||
schema.GroupVersionResource{Group: "discovery.k8s.io", Version: "v1beta1", Resource: "endpointslices"}, | ||
m.getGroupVersionVerifyFunction(m.k8sClient)), | ||
} | ||
} |
20 changes: 20 additions & 0 deletions
20
pkg/yurthub/poolcoordinator/resources/verifiable_pool_scope_resource.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
package resources | ||
|
||
import "k8s.io/apimachinery/pkg/runtime/schema" | ||
|
||
type verifiablePoolScopeResource struct { | ||
schema.GroupVersionResource | ||
checkFunction func(gvr schema.GroupVersionResource) (bool, string) | ||
} | ||
|
||
func newVerifiablePoolScopeResource(gvr schema.GroupVersionResource, | ||
checkFunction func(gvr schema.GroupVersionResource) (bool, string)) *verifiablePoolScopeResource { | ||
return &verifiablePoolScopeResource{ | ||
GroupVersionResource: gvr, | ||
checkFunction: checkFunction, | ||
} | ||
} | ||
|
||
func (v *verifiablePoolScopeResource) Verify() (bool, string) { | ||
return v.checkFunction(v.GroupVersionResource) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.