Fix: add PoolScopeResource validation and dynamic configuration support (openyurtio#1148)

* Fix: add PoolScopeResource validation and dynamic configuration support
LaurenceLiZhixin authored and Congrool committed Jan 18, 2023
1 parent 5781203 commit 735b40a
Showing 9 changed files with 231 additions and 34 deletions.
7 changes: 0 additions & 7 deletions pkg/yurthub/poolcoordinator/constants/constants.go
@@ -17,17 +17,10 @@ limitations under the License.
package constants

import (
"k8s.io/apimachinery/pkg/runtime/schema"

"github.com/openyurtio/openyurt/pkg/yurthub/storage"
)

var (
PoolScopedResources = map[schema.GroupVersionResource]struct{}{
{Group: "", Version: "v1", Resource: "endpoints"}: {},
{Group: "discovery.k8s.io", Version: "v1", Resource: "endpointslices"}: {},
}

UploadResourcesKeyBuildInfo = map[storage.KeyBuildInfo]struct{}{
{Component: "kubelet", Resources: "pods", Group: "", Version: "v1"}: {},
{Component: "kubelet", Resources: "nodes", Group: "", Version: "v1"}: {},
47 changes: 36 additions & 11 deletions pkg/yurthub/poolcoordinator/coordinator.go
@@ -30,6 +30,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
coordclientset "k8s.io/client-go/kubernetes/typed/coordination/v1"
@@ -47,6 +49,7 @@ import (
"github.com/openyurtio/openyurt/pkg/yurthub/metrics"
"github.com/openyurtio/openyurt/pkg/yurthub/poolcoordinator/certmanager"
"github.com/openyurtio/openyurt/pkg/yurthub/poolcoordinator/constants"
"github.com/openyurtio/openyurt/pkg/yurthub/poolcoordinator/resources"
"github.com/openyurtio/openyurt/pkg/yurthub/storage"
"github.com/openyurtio/openyurt/pkg/yurthub/storage/etcd"
"github.com/openyurtio/openyurt/pkg/yurthub/transport"
@@ -169,9 +172,18 @@ func NewCoordinator(
if err != nil {
return nil, fmt.Errorf("failed to create proxied client, %v", err)
}

// init the pool scope resources manager
resources.InitPoolScopeResourcesManger(proxiedClient, cfg.SharedFactory)

dynamicClient, err := buildDynamicClientWithUserAgent(fmt.Sprintf("http://%s", cfg.YurtHubProxyServerAddr), constants.DefaultPoolScopedUserAgent)
if err != nil {
return nil, fmt.Errorf("failed to create dynamic client, %v", err)
}

poolScopedCacheSyncManager := &poolScopedCacheSyncManager{
ctx: ctx,
proxiedClient: proxiedClient,
dynamicClient: dynamicClient,
coordinatorClient: coordinatorClient,
nodeName: cfg.NodeName,
getEtcdStore: coordinator.getEtcdStore,
@@ -185,6 +197,9 @@ }
}

func (coordinator *coordinator) Run() {
// wait until the pool scope resources have been synced
resources.WaitUntilPoolScopeResourcesSync(coordinator.ctx)

for {
var poolCacheManager cachemanager.CacheManager
var cancelEtcdStorage = func() {}
@@ -398,8 +413,8 @@ func (coordinator *coordinator) delegateNodeLease(cloudLeaseClient coordclientse
type poolScopedCacheSyncManager struct {
ctx context.Context
isRunning bool
// proxiedClient is a client of Cloud APIServer which is proxied by yurthub.
proxiedClient kubernetes.Interface
// dynamicClient is a dynamic client of Cloud APIServer which is proxied by yurthub.
dynamicClient dynamic.Interface
// coordinatorClient is a client of APIServer in pool-coordinator.
coordinatorClient kubernetes.Interface
// nodeName will be used to update the ownerReference of informer synced lease.
@@ -426,18 +441,14 @@ func (p *poolScopedCacheSyncManager) EnsureStart() error {

ctx, cancel := context.WithCancel(p.ctx)
hasInformersSynced := []cache.InformerSynced{}
informerFactory := informers.NewSharedInformerFactory(p.proxiedClient, 0)
for gvr := range constants.PoolScopedResources {
dynamicInformerFactory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(p.dynamicClient, 0, metav1.NamespaceAll, nil)
for _, gvr := range resources.GetPoolScopeResources() {
klog.Infof("coordinator informer with resources gvr %+v registered", gvr)
informer, err := informerFactory.ForResource(gvr)
if err != nil {
cancel()
return fmt.Errorf("failed to add informer for %s, %v", gvr.String(), err)
}
informer := dynamicInformerFactory.ForResource(gvr)
hasInformersSynced = append(hasInformersSynced, informer.Informer().HasSynced)
}

informerFactory.Start(ctx.Done())
dynamicInformerFactory.Start(ctx.Done())
go p.holdInformerSync(ctx, hasInformersSynced)
p.cancel = cancel
p.isRunning = true
@@ -697,3 +708,17 @@ func buildProxiedClientWithUserAgent(proxyAddr string, userAgent string) (kubern
}
return client, nil
}

func buildDynamicClientWithUserAgent(proxyAddr string, userAgent string) (dynamic.Interface, error) {
kubeConfig, err := clientcmd.BuildConfigFromFlags(proxyAddr, "")
if err != nil {
return nil, err
}

kubeConfig.UserAgent = userAgent
client, err := dynamic.NewForConfig(kubeConfig)
if err != nil {
return nil, err
}
return client, nil
}
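For orientation, a minimal sketch of how the pieces above fit together: build a dynamic client through the yurthub proxy, then feed the currently configured pool scope GVRs into a dynamic shared informer factory, mirroring poolScopedCacheSyncManager.EnsureStart. It reuses the imports shown in the diff above; the proxy address is an assumption based on the default YurtHubProxyPort (10261), whereas the real code derives it from cfg.YurtHubProxyServerAddr.

// Sketch only: wire the proxied dynamic client into per-GVR dynamic informers.
dynClient, err := buildDynamicClientWithUserAgent("http://127.0.0.1:10261", constants.DefaultPoolScopedUserAgent)
if err != nil {
	klog.Fatalf("failed to create dynamic client, %v", err)
}
dynamicInformerFactory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynClient, 0, metav1.NamespaceAll, nil)
hasSynced := []cache.InformerSynced{}
for _, gvr := range resources.GetPoolScopeResources() {
	// every validated pool scope GVR gets its own dynamic informer
	hasSynced = append(hasSynced, dynamicInformerFactory.ForResource(gvr).Informer().HasSynced)
}
stopCh := make(chan struct{})
dynamicInformerFactory.Start(stopCh)
cache.WaitForCacheSync(stopCh, hasSynced...)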
166 changes: 166 additions & 0 deletions pkg/yurthub/poolcoordinator/resources/resources.go
@@ -0,0 +1,166 @@
package resources

import (
"context"
"encoding/json"
"sync"
"time"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime/schema"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"

"github.com/openyurtio/openyurt/pkg/yurthub/util"
)

type PoolScopeResourcesManger struct {
factory informers.SharedInformerFactory
validPoolScopedResources map[string]*verifiablePoolScopeResource
validPoolScopedResourcesLock sync.RWMutex
k8sClient kubernetes.Interface
hasSynced func() bool
}

var poolScopeResourcesManger *PoolScopeResourcesManger

func InitPoolScopeResourcesManger(client kubernetes.Interface, factory informers.SharedInformerFactory) *PoolScopeResourcesManger {
poolScopeResourcesManger = &PoolScopeResourcesManger{
k8sClient: client,
validPoolScopedResources: make(map[string]*verifiablePoolScopeResource),
}
configmapInformer := factory.Core().V1().ConfigMaps().Informer()
configmapInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: poolScopeResourcesManger.addConfigmap,
// TODO: updating an existing pool scope resources definition is not supported yet
})
poolScopeResourcesManger.hasSynced = configmapInformer.HasSynced

klog.Infof("init pool scope resources manager")

poolScopeResourcesManger.setVerifiableGVRs(poolScopeResourcesManger.getInitPoolScopeResources()...)
return poolScopeResourcesManger
}

func WaitUntilPoolScopeResourcesSync(ctx context.Context) {
cache.WaitForCacheSync(ctx.Done(), poolScopeResourcesManger.hasSynced)
}

func IsPoolScopeResources(info *apirequest.RequestInfo) bool {
if info == nil || poolScopeResourcesManger == nil {
return false
}
// hold the read lock to guard against concurrent updates from addConfigmap
poolScopeResourcesManger.validPoolScopedResourcesLock.RLock()
defer poolScopeResourcesManger.validPoolScopedResourcesLock.RUnlock()
_, ok := poolScopeResourcesManger.validPoolScopedResources[schema.GroupVersionResource{
Group: info.APIGroup,
Version: info.APIVersion,
Resource: info.Resource,
}.String()]
return ok
}

func GetPoolScopeResources() []schema.GroupVersionResource {
if poolScopeResourcesManger == nil {
return []schema.GroupVersionResource{}
}
return poolScopeResourcesManger.getPoolScopeResources()
}

func (m *PoolScopeResourcesManger) getPoolScopeResources() []schema.GroupVersionResource {
poolScopeResources := make([]schema.GroupVersionResource, 0)
m.validPoolScopedResourcesLock.RLock()
defer m.validPoolScopedResourcesLock.RUnlock()
for _, v := range m.validPoolScopedResources {
poolScopeResources = append(poolScopeResources, v.GroupVersionResource)
}
return poolScopeResources
}

// addVerifiableGVRs adds the given gvrs to the validPoolScopedResources map
func (m *PoolScopeResourcesManger) addVerifiableGVRs(gvrs ...*verifiablePoolScopeResource) {
m.validPoolScopedResourcesLock.Lock()
defer m.validPoolScopedResourcesLock.Unlock()
for _, gvr := range gvrs {
if ok, errMsg := gvr.Verify(); ok {
m.validPoolScopedResources[gvr.String()] = gvr
klog.Infof("PoolScopeResourcesManger add gvr %s success", gvr.String())
} else {
klog.Warningf("PoolScopeResourcesManger add gvr %s failed, because %s", gvr.String(), errMsg)
}
}
}

// setVerifiableGVRs clears validPoolScopedResources and sets the given gvrs as its new content
func (m *PoolScopeResourcesManger) setVerifiableGVRs(gvrs ...*verifiablePoolScopeResource) {
m.validPoolScopedResourcesLock.Lock()
defer m.validPoolScopedResourcesLock.Unlock()
m.validPoolScopedResources = make(map[string]*verifiablePoolScopeResource)
for _, gvr := range gvrs {
if ok, errMsg := gvr.Verify(); ok {
m.validPoolScopedResources[gvr.String()] = gvr
klog.Infof("PoolScopeResourcesManger update gvr %s success", gvr.String())
} else {
klog.Warningf("PoolScopeResourcesManger update gvr %s failed, because %s", gvr.String(), errMsg)
}
}
}

func (m *PoolScopeResourcesManger) addConfigmap(obj interface{}) {
cfg, ok := obj.(*corev1.ConfigMap)
if !ok {
return
}

poolScopeResources, ok := cfg.Data[util.PoolScopeResourcesKey]
if !ok || poolScopeResources == "" {
// this configmap carries no pool scope resources definition, nothing to do
return
}
poolScopeResourcesGVRs := make([]schema.GroupVersionResource, 0)
verifiablePoolScopeResourcesGVRs := make([]*verifiablePoolScopeResource, 0)
if err := json.Unmarshal([]byte(poolScopeResources), &poolScopeResourcesGVRs); err != nil {
klog.Errorf("PoolScopeResourcesManger unmarshal poolScopeResources %s failed with error = %s",
poolScopeResources, err.Error())
return
}
klog.Infof("PoolScopeResourcesManger add configured pool scope resources %v", poolScopeResourcesGVRs)
for _, v := range poolScopeResourcesGVRs {
verifiablePoolScopeResourcesGVRs = append(verifiablePoolScopeResourcesGVRs,
newVerifiablePoolScopeResource(v, m.getGroupVersionVerifyFunction(m.k8sClient)))
}
m.addVerifiableGVRs(verifiablePoolScopeResourcesGVRs...)
}

func (m *PoolScopeResourcesManger) getGroupVersionVerifyFunction(client kubernetes.Interface) func(gvr schema.GroupVersionResource) (bool, string) {
return func(gvr schema.GroupVersionResource) (bool, string) {
maxRetry := 3
duration := time.Second * 5
counter := 0
var err error
for counter <= maxRetry {
if _, err = client.Discovery().ServerResourcesForGroupVersion(gvr.GroupVersion().String()); err == nil {
return true, "" // gvr found
}
if apierrors.IsNotFound(err) {
return false, err.Error() // gvr not found
}
// unexpected error, retry
counter++
time.Sleep(duration)
}
return false, err.Error()
}
}

func (m *PoolScopeResourcesManger) getInitPoolScopeResources() []*verifiablePoolScopeResource {
return []*verifiablePoolScopeResource{
newVerifiablePoolScopeResource(
schema.GroupVersionResource{Group: "", Version: "v1", Resource: "endpoints"},
m.getGroupVersionVerifyFunction(m.k8sClient)),
newVerifiablePoolScopeResource(
schema.GroupVersionResource{Group: "discovery.k8s.io", Version: "v1", Resource: "endpointslices"},
m.getGroupVersionVerifyFunction(m.k8sClient)),
newVerifiablePoolScopeResource(
schema.GroupVersionResource{Group: "discovery.k8s.io", Version: "v1beta1", Resource: "endpointslices"},
m.getGroupVersionVerifyFunction(m.k8sClient)),
}
}
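The dynamic configuration consumed by addConfigmap lives in a ConfigMap entry keyed by util.PoolScopeResourcesKey ("pool_scope_resources") and is expected to hold a JSON array of GroupVersionResources. Below is a hedged sketch of the parsing, reusing the imports of the file above; the example value is an assumption (json.Unmarshal matches the exported Group/Version/Resource fields case-insensitively), and every entry still has to pass the discovery-based Verify before it is accepted.

// Sketch: the kind of value expected under the "pool_scope_resources" key.
raw := `[
	{"Group": "", "Version": "v1", "Resource": "endpoints"},
	{"Group": "discovery.k8s.io", "Version": "v1", "Resource": "endpointslices"}
]`
gvrs := []schema.GroupVersionResource{}
if err := json.Unmarshal([]byte(raw), &gvrs); err != nil {
	klog.Errorf("failed to unmarshal pool scope resources, %v", err)
}
// gvrs now holds the GVRs that addConfigmap wraps into verifiablePoolScopeResource
// values and verifies against the API server's discovery information.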
20 changes: 20 additions & 0 deletions (new file in pkg/yurthub/poolcoordinator/resources)
@@ -0,0 +1,20 @@
package resources

import "k8s.io/apimachinery/pkg/runtime/schema"

type verifiablePoolScopeResource struct {
schema.GroupVersionResource
checkFunction func(gvr schema.GroupVersionResource) (bool, string)
}

func newVerifiablePoolScopeResource(gvr schema.GroupVersionResource,
checkFunction func(gvr schema.GroupVersionResource) (bool, string)) *verifiablePoolScopeResource {
return &verifiablePoolScopeResource{
GroupVersionResource: gvr,
checkFunction: checkFunction,
}
}

func (v *verifiablePoolScopeResource) Verify() (bool, string) {
return v.checkFunction(v.GroupVersionResource)
}
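A small usage sketch of the wrapper above, reusing the imports of the resources package; the check function here is a stand-in that always succeeds, whereas the manager plugs in getGroupVersionVerifyFunction, which queries the API server's discovery endpoint.

// Sketch: wrap a GVR with a trivial check and verify it.
v := newVerifiablePoolScopeResource(
	schema.GroupVersionResource{Group: "", Version: "v1", Resource: "endpoints"},
	func(gvr schema.GroupVersionResource) (bool, string) { return true, "" },
)
if ok, errMsg := v.Verify(); !ok {
	klog.Warningf("gvr %s rejected: %s", v.String(), errMsg)
}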
1 change: 1 addition & 0 deletions pkg/yurthub/proxy/pool/pool.go
@@ -108,6 +108,7 @@ func (pp *PoolCoordinatorProxy) ServeHTTP(rw http.ResponseWriter, req *http.Requ
util.Err(errors.NewBadRequest(fmt.Sprintf("pool-coordinator proxy cannot handle request(%s), cannot get requestInfo", hubutil.ReqString(req))), rw, req)
return
}
req.Header.Del("Authorization") // drop the token intended for cloud APIServer RBAC so that yurthub's own authorization takes effect
if reqInfo.IsResourceRequest {
switch reqInfo.Verb {
case "create":
15 changes: 2 additions & 13 deletions pkg/yurthub/proxy/util/util.go
@@ -40,6 +40,7 @@ import (

"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer"
"github.com/openyurtio/openyurt/pkg/yurthub/metrics"
"github.com/openyurtio/openyurt/pkg/yurthub/poolcoordinator/resources"
"github.com/openyurtio/openyurt/pkg/yurthub/tenant"
"github.com/openyurtio/openyurt/pkg/yurthub/util"
)
@@ -181,7 +182,7 @@ func WithIfPoolScopedResource(handler http.Handler) http.Handler {
ctx := req.Context()
if info, ok := apirequest.RequestInfoFrom(ctx); ok {
var ifPoolScopedResource bool
if info.IsResourceRequest && isPoolScopedResource(info) {
if info.IsResourceRequest && resources.IsPoolScopeResources(info) {
ifPoolScopedResource = true
}
ctx = util.WithIfPoolScopedResource(ctx, ifPoolScopedResource)
@@ -191,18 +192,6 @@ })
})
}

func isPoolScopedResource(info *apirequest.RequestInfo) bool {
if info != nil {
if info.APIGroup == "" && info.APIVersion == "v1" && info.Resource == "endpoints" {
return true
}
if info.APIGroup == "discovery.k8s.io" && info.APIVersion == "v1" && info.Resource == "endpointslices" {
return true
}
}
return false
}

type wrapperResponseWriter struct {
http.ResponseWriter
http.Flusher
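To illustrate the replacement above, a hedged sketch of the new check in isolation; the RequestInfo values are made up, and whether a GVR matches now depends on what the PoolScopeResourcesManger has validated at runtime rather than on the removed hardcoded list.

// Sketch: the filter now delegates to the dynamic pool scope resources manager.
info := &apirequest.RequestInfo{
	IsResourceRequest: true,
	APIGroup:          "discovery.k8s.io",
	APIVersion:        "v1",
	Resource:          "endpointslices",
}
if info.IsResourceRequest && resources.IsPoolScopeResources(info) {
	// the request is marked pool-scoped and can be served from the pool-coordinator cache
}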
3 changes: 2 additions & 1 deletion pkg/yurthub/storage/etcd/keycache.go
@@ -26,6 +26,7 @@ import (
clientv3 "go.etcd.io/etcd/client/v3"

coordinatorconstants "github.com/openyurtio/openyurt/pkg/yurthub/poolcoordinator/constants"
"github.com/openyurtio/openyurt/pkg/yurthub/poolcoordinator/resources"
"github.com/openyurtio/openyurt/pkg/yurthub/storage"
"github.com/openyurtio/openyurt/pkg/yurthub/util/fs"
)
@@ -122,7 +123,7 @@

func (c *componentKeyCache) getPoolScopedKeyset() (*keySet, error) {
keys := &keySet{m: map[storageKey]struct{}{}}
for gvr := range coordinatorconstants.PoolScopedResources {
for _, gvr := range resources.GetPoolScopeResources() {
getCtx, cancel := context.WithTimeout(c.ctx, defaultTimeout)
defer cancel()
rootKey, err := c.keyFunc(storage.KeyBuildInfo{
1 change: 1 addition & 0 deletions pkg/yurthub/storage/etcd/storage.go
@@ -395,6 +395,7 @@ func (s *etcdStorage) ReplaceComponentList(component string, gvr schema.GroupVer
oldKeyCache, loaded := s.localComponentKeyCache.LoadOrStore(component, newKeyCache)
addedOrUpdated = newKeyCache.Difference(keySet{})
if loaded {
// FIXME: deleting keys may cause unexpected problems
deleted = oldKeyCache.Difference(newKeyCache)
}

5 changes: 3 additions & 2 deletions pkg/yurthub/util/util.go
@@ -70,8 +70,9 @@ const (
// DefaultPoolCoordinatorAPIServerSvcPort represents default pool coordinator apiServer port
DefaultPoolCoordinatorAPIServerSvcPort = "443"

YurtHubNamespace = "kube-system"
CacheUserAgentsKey = "cache_agents"
YurtHubNamespace = "kube-system"
CacheUserAgentsKey = "cache_agents"
PoolScopeResourcesKey = "pool_scope_resources"

YurtHubProxyPort = 10261
YurtHubPort = 10267
