Fix: add PoolScopeResource validation and dynamic configuration support #1148

Merged
Changes from 4 commits
2 changes: 1 addition & 1 deletion charts/openyurt/templates/pool-coordinator.yaml
@@ -139,7 +139,7 @@ spec:
- --listen-client-urls=https://0.0.0.0:{{ .Values.poolCoordinator.etcdPort }}
- --cert-file=/etc/kubernetes/pki/etcd-server.crt
- --client-cert-auth=true
- --max-txn-ops=1000
- --max-txn-ops=102400
- --data-dir=/var/lib/etcd
- --max-request-bytes=100000000
- --key-file=/etc/kubernetes/pki/etcd-server.key
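
Background for the --max-txn-ops change above: etcd rejects any transaction whose operation count exceeds this flag, presumably a concern here because yurthub's etcd-backed pool cache can bundle many keys into a single transaction when it replaces a component's key set. A minimal sketch of such a batched write (illustrative only, not code from this PR; the clientv3 usage is standard, the batching site in yurthub is an assumption):

import (
	"context"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// putAllInOneTxn writes every key/value pair in a single etcd transaction.
// Each clientv3.Op counts toward --max-txn-ops, so with the old limit of 1000
// a batch of a few thousand cached objects would fail with
// "too many operations in txn request"; 102400 leaves plenty of headroom.
func putAllInOneTxn(ctx context.Context, cli *clientv3.Client, kvs map[string]string) error {
	ops := make([]clientv3.Op, 0, len(kvs))
	for k, v := range kvs {
		ops = append(ops, clientv3.OpPut(k, v))
	}
	_, err := cli.Txn(ctx).Then(ops...).Commit()
	return err
}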
2 changes: 1 addition & 1 deletion pkg/yurthub/kubernetes/meta/restmapper.go
@@ -77,7 +77,7 @@ func NewDefaultRESTMapperFromScheme() *meta.DefaultRESTMapper {
}

func NewRESTMapperManager(baseDir string) (*RESTMapperManager, error) {
var dm map[schema.GroupVersionResource]schema.GroupVersionKind
dm := make(map[schema.GroupVersionResource]schema.GroupVersionKind)
cachedFilePath := filepath.Join(baseDir, CacheDynamicRESTMapperKey)
// Recover the mapping relationship between GVR and GVK from the hard disk
storage := fs.FileSystemOperator{}
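
The one-line change above replaces a nil map declaration with make, presumably so that GVR-to-GVK entries can be added to dm even when no cache file has been recovered yet. In Go a nil map can be read but never written. A minimal illustration (not from this PR):

import "k8s.io/apimachinery/pkg/runtime/schema"

func nilMapDemo() {
	gvr := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "endpoints"}
	gvk := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Endpoints"}

	var dm map[schema.GroupVersionResource]schema.GroupVersionKind
	_ = dm[gvr] // reading a nil map is fine: it returns the zero value
	// dm[gvr] = gvk // would panic: assignment to entry in nil map

	dm = make(map[schema.GroupVersionResource]schema.GroupVersionKind)
	dm[gvr] = gvk // safe once the map is initialized
}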
7 changes: 0 additions & 7 deletions pkg/yurthub/poolcoordinator/constants/constants.go
@@ -17,17 +17,10 @@ limitations under the License.
package constants

import (
"k8s.io/apimachinery/pkg/runtime/schema"

"github.com/openyurtio/openyurt/pkg/yurthub/storage"
)

var (
PoolScopedResources = map[schema.GroupVersionResource]struct{}{
{Group: "", Version: "v1", Resource: "endpoints"}: {},
{Group: "discovery.k8s.io", Version: "v1", Resource: "endpointslices"}: {},
}

UploadResourcesKeyBuildInfo = map[storage.KeyBuildInfo]struct{}{
{Component: "kubelet", Resources: "pods", Group: "", Version: "v1"}: {},
{Component: "kubelet", Resources: "nodes", Group: "", Version: "v1"}: {},
50 changes: 38 additions & 12 deletions pkg/yurthub/poolcoordinator/coordinator.go
@@ -24,19 +24,22 @@ import (
"sync"
"time"

"k8s.io/klog/v2"

coordinationv1 "k8s.io/api/coordination/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
coordclientset "k8s.io/client-go/kubernetes/typed/coordination/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"

"github.com/openyurtio/openyurt/cmd/yurthub/app/config"
"github.com/openyurtio/openyurt/pkg/yurthub/cachemanager"
@@ -47,6 +50,7 @@ import (
"github.com/openyurtio/openyurt/pkg/yurthub/metrics"
"github.com/openyurtio/openyurt/pkg/yurthub/poolcoordinator/certmanager"
"github.com/openyurtio/openyurt/pkg/yurthub/poolcoordinator/constants"
"github.com/openyurtio/openyurt/pkg/yurthub/poolcoordinator/resources"
"github.com/openyurtio/openyurt/pkg/yurthub/storage"
"github.com/openyurtio/openyurt/pkg/yurthub/storage/etcd"
"github.com/openyurtio/openyurt/pkg/yurthub/transport"
@@ -169,9 +173,18 @@ func NewCoordinator(
if err != nil {
return nil, fmt.Errorf("failed to create proxied client, %v", err)
}

// init pool scope resources
resources.InitPoolScopeResourcesManger(proxiedClient, cfg.SharedFactory)

dynamicClient, err := buildDynamicClientWithUserAgent(fmt.Sprintf("http://%s", cfg.YurtHubProxyServerAddr), constants.DefaultPoolScopedUserAgent)
if err != nil {
return nil, fmt.Errorf("failed to create dynamic client, %v", err)
}

poolScopedCacheSyncManager := &poolScopedCacheSyncManager{
ctx: ctx,
proxiedClient: proxiedClient,
dynamicClient: dynamicClient,
coordinatorClient: coordinatorClient,
nodeName: cfg.NodeName,
getEtcdStore: coordinator.getEtcdStore,
@@ -185,6 +198,9 @@
}

func (coordinator *coordinator) Run() {
// wait until the pool scope resources have been synced
resources.WaitUntilPoolScopeResourcesSync(coordinator.ctx)

for {
var poolCacheManager cachemanager.CacheManager
var cancelEtcdStorage = func() {}
@@ -398,8 +414,8 @@ func (coordinator *coordinator) delegateNodeLease(cloudLeaseClient coordclientse
type poolScopedCacheSyncManager struct {
ctx context.Context
isRunning bool
// proxiedClient is a client of Cloud APIServer which is proxied by yurthub.
proxiedClient kubernetes.Interface
// dynamicClient is a dynamic client of Cloud APIServer which is proxied by yurthub.
dynamicClient dynamic.Interface
// coordinatorClient is a client of APIServer in pool-coordinator.
coordinatorClient kubernetes.Interface
// nodeName will be used to update the ownerReference of informer synced lease.
@@ -426,18 +442,14 @@ func (p *poolScopedCacheSyncManager) EnsureStart() error {

ctx, cancel := context.WithCancel(p.ctx)
hasInformersSynced := []cache.InformerSynced{}
informerFactory := informers.NewSharedInformerFactory(p.proxiedClient, 0)
for gvr := range constants.PoolScopedResources {
dynamicInformerFactory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(p.dynamicClient, 0, metav1.NamespaceAll, nil)
for _, gvr := range resources.GetPoolScopeResources() {
klog.Infof("coordinator informer with resources gvr %+v registered", gvr)
informer, err := informerFactory.ForResource(gvr)
if err != nil {
cancel()
return fmt.Errorf("failed to add informer for %s, %v", gvr.String(), err)
}
informer := dynamicInformerFactory.ForResource(gvr)
hasInformersSynced = append(hasInformersSynced, informer.Informer().HasSynced)
}

informerFactory.Start(ctx.Done())
dynamicInformerFactory.Start(ctx.Done())
go p.holdInformerSync(ctx, hasInformersSynced)
p.cancel = cancel
p.isRunning = true
@@ -697,3 +709,17 @@ func buildProxiedClientWithUserAgent(proxyAddr string, userAgent string) (kubern
}
return client, nil
}

func buildDynamicClientWithUserAgent(proxyAddr string, userAgent string) (dynamic.Interface, error) {
kubeConfig, err := clientcmd.BuildConfigFromFlags(proxyAddr, "")
if err != nil {
return nil, err
}

kubeConfig.UserAgent = userAgent
client, err := dynamic.NewForConfig(kubeConfig)
if err != nil {
return nil, err
}
return client, nil
}
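
The switch above from a typed informers.NewSharedInformerFactory to dynamicinformer.NewFilteredDynamicSharedInformerFactory is what makes the resource set configurable at runtime: a dynamic informer can list and watch any GroupVersionResource without generated clients. A minimal sketch of the pattern (illustrative, not code from this PR):

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/dynamic/dynamicinformer"
	"k8s.io/client-go/tools/cache"
)

// watchGVR starts a dynamic informer for an arbitrary GVR and blocks until its
// cache has synced. Because the client is dynamic, the GVR can come from
// configuration rather than compile-time types.
func watchGVR(client dynamic.Interface, gvr schema.GroupVersionResource, stopCh <-chan struct{}) cache.SharedIndexInformer {
	factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(client, 0, metav1.NamespaceAll, nil)
	informer := factory.ForResource(gvr).Informer()
	factory.Start(stopCh)
	cache.WaitForCacheSync(stopCh, informer.HasSynced)
	return informer
}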
167 changes: 167 additions & 0 deletions pkg/yurthub/poolcoordinator/resources/resources.go
@@ -0,0 +1,167 @@
package resources

import (
"context"
"encoding/json"
"sync"
"time"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime/schema"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"

"github.com/openyurtio/openyurt/pkg/yurthub/util"
)

type PoolScopeResourcesManger struct {
factory informers.SharedInformerFactory
validPoolScopedResources map[string]*verifiablePoolScopeResource
validPoolScopedResourcesLock sync.RWMutex
k8sClient kubernetes.Interface
hasSynced func() bool
}

var poolScopeResourcesManger *PoolScopeResourcesManger

func InitPoolScopeResourcesManger(client kubernetes.Interface, factory informers.SharedInformerFactory) *PoolScopeResourcesManger {
poolScopeResourcesManger = &PoolScopeResourcesManger{
factory: factory,
k8sClient: client,
validPoolScopedResources: make(map[string]*verifiablePoolScopeResource),
}
configmapInformer := factory.Core().V1().ConfigMaps().Informer()
configmapInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: poolScopeResourcesManger.addConfigmap,
// TODO: updates to the pool scope resources definition are not supported yet
})
poolScopeResourcesManger.hasSynced = configmapInformer.HasSynced

klog.Infof("init pool scope resources manager")

poolScopeResourcesManger.setVerifiableGVRs(poolScopeResourcesManger.getInitPoolScopeResources()...)
return poolScopeResourcesManger
}

func WaitUntilPoolScopeResourcesSync(ctx context.Context) {
cache.WaitForCacheSync(ctx.Done(), poolScopeResourcesManger.hasSynced)
}

func IsPoolScopeResources(info *apirequest.RequestInfo) bool {
if info == nil || poolScopeResourcesManger == nil {
return false
}
// hold the read lock so concurrent configmap updates cannot race this lookup
poolScopeResourcesManger.validPoolScopedResourcesLock.RLock()
defer poolScopeResourcesManger.validPoolScopedResourcesLock.RUnlock()
_, ok := poolScopeResourcesManger.validPoolScopedResources[schema.GroupVersionResource{
Group: info.APIGroup,
Version: info.APIVersion,
Resource: info.Resource,
}.String()]
return ok
}

func GetPoolScopeResources() []schema.GroupVersionResource {
if poolScopeResourcesManger == nil {
return []schema.GroupVersionResource{}
}
return poolScopeResourcesManger.getPoolScopeResources()
}

func (m *PoolScopeResourcesManger) getPoolScopeResources() []schema.GroupVersionResource {
poolScopeResources := make([]schema.GroupVersionResource, 0)
m.validPoolScopedResourcesLock.RLock()
defer m.validPoolScopedResourcesLock.RUnlock()
for _, v := range m.validPoolScopedResources {
poolScopeResources = append(poolScopeResources, v.GroupVersionResource)
}
return poolScopeResources
}

// addVerifiableGVRs adds the given gvrs to the validPoolScopedResources map
func (m *PoolScopeResourcesManger) addVerifiableGVRs(gvrs ...*verifiablePoolScopeResource) {
m.validPoolScopedResourcesLock.Lock()
defer m.validPoolScopedResourcesLock.Unlock()
for _, gvr := range gvrs {
if ok, errMsg := gvr.Verify(); ok {
m.validPoolScopedResources[gvr.String()] = gvr
klog.Infof("PoolScopeResourcesManger add gvr %s success", gvr.String())
} else {
klog.Warningf("PoolScopeResourcesManger add gvr %s failed, because %s", gvr.String(), errMsg)
}
}
}

// setVerifiableGVRs clears validPoolScopedResources and resets it to the given gvrs
func (m *PoolScopeResourcesManger) setVerifiableGVRs(gvrs ...*verifiablePoolScopeResource) {
m.validPoolScopedResourcesLock.Lock()
defer m.validPoolScopedResourcesLock.Unlock()
m.validPoolScopedResources = make(map[string]*verifiablePoolScopeResource)
for _, gvr := range gvrs {
if ok, errMsg := gvr.Verify(); ok {
m.validPoolScopedResources[gvr.String()] = gvr
klog.Infof("PoolScopeResourcesManger update gvr %s success", gvr.String())
} else {
klog.Warningf("PoolScopeResourcesManger update gvr %s failed, because %s", gvr.String(), errMsg)
}
}
}

func (m *PoolScopeResourcesManger) addConfigmap(obj interface{}) {
cfg, ok := obj.(*corev1.ConfigMap)
if !ok {
return
}

poolScopeResources := cfg.Data[util.PoolScopeResourcesKey]
poolScopeResourcesGVRs := make([]schema.GroupVersionResource, 0)
verifiablePoolScopeResourcesGVRs := make([]*verifiablePoolScopeResource, 0)
if err := json.Unmarshal([]byte(poolScopeResources), &poolScopeResourcesGVRs); err != nil {
klog.Errorf("PoolScopeResourcesManger unmarshal poolScopeResources %s failed with error = %s",
poolScopeResourcesGVRs, err.Error())
return
}
klog.Infof("PoolScopeResourcesManger add configured pool scope resources %v", poolScopeResourcesGVRs)
for _, v := range poolScopeResourcesGVRs {
verifiablePoolScopeResourcesGVRs = append(verifiablePoolScopeResourcesGVRs,
newVerifiablePoolScopeResource(v, m.getGroupVersionVerifyFunction(m.k8sClient)))
}
m.addVerifiableGVRs(verifiablePoolScopeResourcesGVRs...)
}

func (m *PoolScopeResourcesManger) getGroupVersionVerifyFunction(client kubernetes.Interface) func(gvr schema.GroupVersionResource) (bool, string) {
return func(gvr schema.GroupVersionResource) (bool, string) {
maxRetry := 3
duration := time.Second * 5
counter := 0
var err error
for counter <= maxRetry {
if _, err = client.Discovery().ServerResourcesForGroupVersion(gvr.GroupVersion().String()); err == nil {
return true, "" // gvr found
}
if apierrors.IsNotFound(err) {
return false, err.Error() // gvr not found
}
// unexpected error, retry
counter++
time.Sleep(duration)
}
return false, err.Error()
}
}

func (m *PoolScopeResourcesManger) getInitPoolScopeResources() []*verifiablePoolScopeResource {
return []*verifiablePoolScopeResource{
newVerifiablePoolScopeResource(
schema.GroupVersionResource{Group: "", Version: "v1", Resource: "endpoints"},
m.getGroupVersionVerifyFunction(m.k8sClient)),
newVerifiablePoolScopeResource(
schema.GroupVersionResource{Group: "discovery.k8s.io", Version: "v1", Resource: "endpointslices"},
m.getGroupVersionVerifyFunction(m.k8sClient)),
newVerifiablePoolScopeResource(
schema.GroupVersionResource{Group: "discovery.k8s.io", Version: "v1beta1", Resource: "endpointslices"},
m.getGroupVersionVerifyFunction(m.k8sClient)),
}
}
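
For reference, addConfigmap above expects the ConfigMap value referenced by util.PoolScopeResourcesKey to be a JSON array of GroupVersionResources. A sketch of a value that would parse (illustrative only; the field casing is an assumption, and only the json.Unmarshal target type is taken from the code above):

import (
	"encoding/json"

	"k8s.io/apimachinery/pkg/runtime/schema"
)

// examplePoolScopeResources is a value that round-trips through addConfigmap's
// json.Unmarshal call; encoding/json matches struct fields case-insensitively,
// so lowercase keys bind to schema.GroupVersionResource's Group/Version/Resource.
const examplePoolScopeResources = `[
	{"group": "", "version": "v1", "resource": "endpoints"},
	{"group": "discovery.k8s.io", "version": "v1", "resource": "endpointslices"}
]`

func parseExample() ([]schema.GroupVersionResource, error) {
	var gvrs []schema.GroupVersionResource
	if err := json.Unmarshal([]byte(examplePoolScopeResources), &gvrs); err != nil {
		return nil, err
	}
	return gvrs, nil
}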
20 changes: 20 additions & 0 deletions (new file in pkg/yurthub/poolcoordinator/resources)
@@ -0,0 +1,20 @@
package resources

import "k8s.io/apimachinery/pkg/runtime/schema"

type verifiablePoolScopeResource struct {
schema.GroupVersionResource
checkFunction func(gvr schema.GroupVersionResource) (bool, string)
}

func newVerifiablePoolScopeResource(gvr schema.GroupVersionResource,
checkFunction func(gvr schema.GroupVersionResource) (bool, string)) *verifiablePoolScopeResource {
return &verifiablePoolScopeResource{
GroupVersionResource: gvr,
checkFunction: checkFunction,
}
}

func (v *verifiablePoolScopeResource) Verify() (bool, string) {
return v.checkFunction(v.GroupVersionResource)
}
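
The wrapper simply pairs a GVR with an injected verification closure, so callers inside the resources package (or tests) decide how a resource is validated. A minimal usage sketch (illustrative only; alwaysOK stands in for the discovery-based check built by getGroupVersionVerifyFunction):

import (
	"fmt"

	"k8s.io/apimachinery/pkg/runtime/schema"
)

func verifyExample() {
	// alwaysOK is a stub check function; real code would query the discovery API.
	alwaysOK := func(gvr schema.GroupVersionResource) (bool, string) { return true, "" }

	v := newVerifiablePoolScopeResource(
		schema.GroupVersionResource{Group: "discovery.k8s.io", Version: "v1", Resource: "endpointslices"},
		alwaysOK)

	if ok, msg := v.Verify(); !ok {
		fmt.Printf("gvr %s rejected: %s\n", v.String(), msg)
	}
}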
15 changes: 2 additions & 13 deletions pkg/yurthub/proxy/util/util.go
@@ -40,6 +40,7 @@ import (

"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer"
"github.com/openyurtio/openyurt/pkg/yurthub/metrics"
"github.com/openyurtio/openyurt/pkg/yurthub/poolcoordinator/resources"
"github.com/openyurtio/openyurt/pkg/yurthub/tenant"
"github.com/openyurtio/openyurt/pkg/yurthub/util"
)
@@ -181,7 +182,7 @@ func WithIfPoolScopedResource(handler http.Handler) http.Handler {
ctx := req.Context()
if info, ok := apirequest.RequestInfoFrom(ctx); ok {
var ifPoolScopedResource bool
if info.IsResourceRequest && isPoolScopedResource(info) {
if info.IsResourceRequest && resources.IsPoolScopeResources(info) {
ifPoolScopedResource = true
}
ctx = util.WithIfPoolScopedResource(ctx, ifPoolScopedResource)
@@ -191,18 +192,6 @@
})
}

func isPoolScopedResource(info *apirequest.RequestInfo) bool {
if info != nil {
if info.APIGroup == "" && info.APIVersion == "v1" && info.Resource == "endpoints" {
return true
}
if info.APIGroup == "discovery.k8s.io" && info.APIVersion == "v1" && info.Resource == "endpointslices" {
return true
}
}
return false
}

type wrapperResponseWriter struct {
http.ResponseWriter
http.Flusher
3 changes: 2 additions & 1 deletion pkg/yurthub/storage/etcd/keycache.go
@@ -26,6 +26,7 @@ import (
clientv3 "go.etcd.io/etcd/client/v3"

coordinatorconstants "github.com/openyurtio/openyurt/pkg/yurthub/poolcoordinator/constants"
"github.com/openyurtio/openyurt/pkg/yurthub/poolcoordinator/resources"
"github.com/openyurtio/openyurt/pkg/yurthub/storage"
"github.com/openyurtio/openyurt/pkg/yurthub/util/fs"
)
@@ -122,7 +123,7 @@ func (c *componentKeyCache) Recover() error {

func (c *componentKeyCache) getPoolScopedKeyset() (*keySet, error) {
keys := &keySet{m: map[storageKey]struct{}{}}
for gvr := range coordinatorconstants.PoolScopedResources {
for _, gvr := range resources.GetPoolScopeResources() {
getCtx, cancel := context.WithTimeout(c.ctx, defaultTimeout)
defer cancel()
rootKey, err := c.keyFunc(storage.KeyBuildInfo{