refactor yurthub cache to adapt to different storages (#882)
* refactor yurthub in order to use general backend storage

Signed-off-by: Congrool <chpzhangyifei@zju.edu.cn>

* add unit tests for fs

* add unit tests for disk storage

* revise unit tests to be compatible with the new disk storage

Signed-off-by: Congrool <chpzhangyifei@zju.edu.cn>

* fix bug: cannot query list with name fieldSelector

* fix bug: failed to list csidrivers and runtimeclasses

* fix bug: cannot create lease in storage

* fix bug: failed to list/watch when offline

* refactor nonResourceRequestWrapper to get api-resources and api-versions

* enhancement: distinguish resources of the same name in different GroupVersions

* fix bug: return empty ListObj if resource is not found when handling list request with metadata.name fieldSelector

* decouple cachemanager from key.IsRootKey()

Signed-off-by: Congrool <chpzhangyifei@zju.edu.cn>

* use t.Run to run unit tests

* modify sub-interface name of store

Signed-off-by: Congrool <chpzhangyifei@zju.edu.cn>

* ensure compatibility with older versions

Signed-off-by: Congrool <chpzhangyifei@zju.edu.cn>

Signed-off-by: Congrool <chpzhangyifei@zju.edu.cn>
Congrool authored Nov 21, 2022
1 parent 775d69d commit b3b35aa
Showing 43 changed files with 4,756 additions and 1,889 deletions.
10 changes: 7 additions & 3 deletions cmd/yurthub/app/config/config.go
@@ -45,7 +45,7 @@ import (
"github.com/openyurtio/openyurt/pkg/yurthub/filter/manager"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/meta"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer"
"github.com/openyurtio/openyurt/pkg/yurthub/storage/factory"
"github.com/openyurtio/openyurt/pkg/yurthub/storage/disk"
"github.com/openyurtio/openyurt/pkg/yurthub/util"
yurtcorev1alpha1 "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/apis/apps/v1alpha1"
yurtclientset "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/client/clientset/versioned"
@@ -108,14 +108,18 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
}
}

storageManager, err := factory.CreateStorage(options.DiskCachePath)
storageManager, err := disk.NewDiskStorage(options.DiskCachePath)
if err != nil {
klog.Errorf("could not create storage manager, %v", err)
return nil, err
}
storageWrapper := cachemanager.NewStorageWrapper(storageManager)
serializerManager := serializer.NewSerializerManager()
restMapperManager := meta.NewRESTMapperManager(storageManager)
restMapperManager, err := meta.NewRESTMapperManager(options.DiskCachePath)
if err != nil {
klog.Errorf("failed to create restMapperManager at path %s, %v", options.DiskCachePath, err)
return nil, err
}

hubServerAddr := net.JoinHostPort(options.YurtHubHost, options.YurtHubPort)
proxyServerAddr := net.JoinHostPort(options.YurtHubProxyHost, options.YurtHubProxyPort)
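The hunk above drops the removed storage/factory package: the disk store is now built directly with disk.NewDiskStorage, and the REST mapper manager takes the cache path instead of the storage manager. A minimal sketch of that wiring, assuming the constructor calls shown in the diff; the helper name and return types are illustrative, not part of the commit.

package config

import (
	"github.com/openyurtio/openyurt/pkg/yurthub/cachemanager"
	"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/meta"
	"github.com/openyurtio/openyurt/pkg/yurthub/storage/disk"
)

// buildCacheComponents mirrors the new wiring in Complete: construct the disk
// store directly, wrap it for the cache manager, and let the REST mapper
// manager persist its data under the same cache path.
func buildCacheComponents(diskCachePath string) (cachemanager.StorageWrapper, *meta.RESTMapperManager, error) {
	storageManager, err := disk.NewDiskStorage(diskCachePath)
	if err != nil {
		return nil, nil, err
	}

	storageWrapper := cachemanager.NewStorageWrapper(storageManager)

	restMapperManager, err := meta.NewRESTMapperManager(diskCachePath)
	if err != nil {
		return nil, nil, err
	}

	return storageWrapper, restMapperManager, nil
}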
5 changes: 1 addition & 4 deletions cmd/yurthub/app/start.go
@@ -155,10 +155,7 @@ func Run(cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) error {
var cacheMgr cachemanager.CacheManager
if cfg.WorkingMode == util.WorkingModeEdge {
klog.Infof("%d. new cache manager with storage wrapper and serializer manager", trace)
cacheMgr, err = cachemanager.NewCacheManager(cfg.StorageWrapper, cfg.SerializerManager, cfg.RESTMapperManager, cfg.SharedFactory)
if err != nil {
return fmt.Errorf("could not new cache manager, %w", err)
}
cacheMgr = cachemanager.NewCacheManager(cfg.StorageWrapper, cfg.SerializerManager, cfg.RESTMapperManager, cfg.SharedFactory)
} else {
klog.Infof("%d. disable cache manager for node %s because it is a cloud node", trace, cfg.NodeName)
}
78 changes: 47 additions & 31 deletions pkg/yurthub/cachemanager/cache_agent.go
@@ -18,9 +18,11 @@ package cachemanager

import (
"strings"
"sync"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"

@@ -31,32 +33,43 @@ const (
sepForAgent = ","
)

func (cm *cacheManager) initCacheAgents() error {
if cm.sharedFactory == nil {
return nil
type CacheAgent struct {
sync.Mutex
agents sets.String
store StorageWrapper
}

func NewCacheAgents(informerFactory informers.SharedInformerFactory, store StorageWrapper) *CacheAgent {
ca := &CacheAgent{
agents: sets.NewString(util.DefaultCacheAgents...),
store: store,
}
configmapInformer := cm.sharedFactory.Core().V1().ConfigMaps().Informer()
configmapInformer := informerFactory.Core().V1().ConfigMaps().Informer()
configmapInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: cm.addConfigmap,
UpdateFunc: cm.updateConfigmap,
DeleteFunc: cm.deleteConfigmap,
AddFunc: ca.addConfigmap,
UpdateFunc: ca.updateConfigmap,
DeleteFunc: ca.deleteConfigmap,
})

klog.Infof("init cache agents to %v", cm.cacheAgents)
return nil
klog.Infof("init cache agents to %v", ca.agents)
return ca
}

func (cm *cacheManager) addConfigmap(obj interface{}) {
func (ca *CacheAgent) HasAny(items ...string) bool {
return ca.agents.HasAny(items...)
}

func (ca *CacheAgent) addConfigmap(obj interface{}) {
cfg, ok := obj.(*corev1.ConfigMap)
if !ok {
return
}

deletedAgents := cm.updateCacheAgents(cfg.Data[util.CacheUserAgentsKey], "add")
cm.deleteAgentCache(deletedAgents)
deletedAgents := ca.updateCacheAgents(cfg.Data[util.CacheUserAgentsKey], "add")
ca.deleteAgentCache(deletedAgents)
}

func (cm *cacheManager) updateConfigmap(oldObj, newObj interface{}) {
func (ca *CacheAgent) updateConfigmap(oldObj, newObj interface{}) {
oldCfg, ok := oldObj.(*corev1.ConfigMap)
if !ok {
return
@@ -71,22 +84,22 @@ func (cm *cacheManager) updateConfigmap(oldObj, newObj interface{}) {
return
}

deletedAgents := cm.updateCacheAgents(newCfg.Data[util.CacheUserAgentsKey], "update")
cm.deleteAgentCache(deletedAgents)
deletedAgents := ca.updateCacheAgents(newCfg.Data[util.CacheUserAgentsKey], "update")
ca.deleteAgentCache(deletedAgents)
}

func (cm *cacheManager) deleteConfigmap(obj interface{}) {
func (ca *CacheAgent) deleteConfigmap(obj interface{}) {
_, ok := obj.(*corev1.ConfigMap)
if !ok {
return
}

deletedAgents := cm.updateCacheAgents("", "delete")
cm.deleteAgentCache(deletedAgents)
deletedAgents := ca.updateCacheAgents("", "delete")
ca.deleteAgentCache(deletedAgents)
}

// updateCacheAgents update cache agents
func (cm *cacheManager) updateCacheAgents(cacheAgents, action string) sets.String {
func (ca *CacheAgent) updateCacheAgents(cacheAgents, action string) sets.String {
newAgents := sets.NewString(util.DefaultCacheAgents...)
for _, agent := range strings.Split(cacheAgents, sepForAgent) {
agent = strings.TrimSpace(agent)
@@ -95,29 +108,32 @@ func (cm *cacheManager) updateCacheAgents(cacheAgents, action string) sets.String
}
}

cm.Lock()
defer cm.Unlock()
ca.Lock()
defer ca.Unlock()

if cm.cacheAgents.Equal(newAgents) {
if ca.agents.Equal(newAgents) {
return sets.String{}
}

deletedAgents := cm.cacheAgents.Difference(newAgents)
cm.cacheAgents = newAgents
klog.Infof("current cache agents: %v after %s, deleted agents: %v", cm.cacheAgents, action, deletedAgents)
// get deleted and added agents
deletedAgents := ca.agents.Difference(newAgents)
ca.agents = newAgents

klog.Infof("current cache agents: %v after %s, deleted agents: %v", ca.agents, action, deletedAgents)

// return deleted agents
return deletedAgents
}

func (cm *cacheManager) deleteAgentCache(deletedAgents sets.String) {
func (ca *CacheAgent) deleteAgentCache(deletedAgents sets.String) {
// delete cache data for deleted agents
if deletedAgents.Len() > 0 {
keys := deletedAgents.List()
for i := range keys {
if err := cm.storage.DeleteCollection(keys[i]); err != nil {
klog.Errorf("failed to cleanup cache for deleted agent(%s), %v", keys[i], err)
components := deletedAgents.List()
for i := range components {
if err := ca.store.DeleteComponentResources(components[i]); err != nil {
klog.Errorf("failed to cleanup cache for deleted agent(%s), %v", components[i], err)
} else {
klog.Infof("cleanup cache for agent(%s) successfully", keys[i])
klog.Infof("cleanup cache for agent(%s) successfully", components[i])
}
}
}
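With this hunk the cache-agent logic lives on a dedicated CacheAgent type instead of being embedded in cacheManager: it is built from a shared informer factory plus a StorageWrapper, keeps its agent set in sync with a ConfigMap's util.CacheUserAgentsKey entry, and exposes HasAny for callers deciding whether a component's responses should be cached. A rough usage sketch based on the constructor and methods in this hunk; the helper function, the kube client, and the resync interval are assumptions.

package main

import (
	"time"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"

	"github.com/openyurtio/openyurt/pkg/yurthub/cachemanager"
)

// newCacheAgent shows how the decoupled CacheAgent could be wired up; the
// informer factory still has to be started elsewhere for the ConfigMap
// handlers to receive events.
func newCacheAgent(client kubernetes.Interface, store cachemanager.StorageWrapper) *cachemanager.CacheAgent {
	informerFactory := informers.NewSharedInformerFactory(client, 24*time.Hour)

	// NewCacheAgents seeds the agent set with util.DefaultCacheAgents and
	// registers ConfigMap add/update/delete handlers that keep the set in sync
	// and prune the cached data of agents that were removed.
	ca := cachemanager.NewCacheAgents(informerFactory, store)

	// Callers can then check whether a request's component should be cached.
	if ca.HasAny("kubelet", "kube-proxy") {
		// cache responses for these components
	}
	return ca
}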
8 changes: 4 additions & 4 deletions pkg/yurthub/cachemanager/cache_agent_test.go
@@ -66,8 +66,8 @@ func TestUpdateCacheAgents(t *testing.T) {
}
for k, tt := range testcases {
t.Run(k, func(t *testing.T) {
m := &cacheManager{
cacheAgents: sets.NewString(util.DefaultCacheAgents...),
m := &CacheAgent{
agents: sets.NewString(tt.initAgents...),
}

m.updateCacheAgents(strings.Join(tt.initAgents, ","), "")
@@ -79,8 +79,8 @@ func (cm *cacheManager) updateConfigmap(oldObj, newObj interface{}) {
t.Errorf("Got deleted agents: %v, expect agents: %v", deletedAgents, tt.deletedAgents)
}

if !m.cacheAgents.Equal(tt.resultAgents) {
t.Errorf("Got cache agents: %v, expect agents: %v", m.cacheAgents, tt.resultAgents)
if !m.agents.Equal(tt.resultAgents) {
t.Errorf("Got cache agents: %v, expect agents: %v", m.agents, tt.resultAgents)
}
})
}
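The test now builds a CacheAgent directly and drives each case through t.Run, matching the "use t.Run to run unit tests" item in the commit message. A stripped-down sketch of that table-driven pattern, placed in package cachemanager so it can reach the unexported fields; the test case shape here is a guess at a subset of the real testcases map.

package cachemanager

import (
	"testing"

	"k8s.io/apimachinery/pkg/util/sets"

	"github.com/openyurtio/openyurt/pkg/yurthub/util"
)

func TestUpdateCacheAgentsSketch(t *testing.T) {
	testcases := map[string]struct {
		initAgents    []string
		cacheAgents   string
		resultAgents  sets.String
		deletedAgents sets.String
	}{
		"add two user agents": {
			initAgents:    []string{"flanneld"},
			cacheAgents:   "flanneld,coredns",
			resultAgents:  sets.NewString(append([]string{"flanneld", "coredns"}, util.DefaultCacheAgents...)...),
			deletedAgents: sets.String{},
		},
	}

	for k, tt := range testcases {
		// t.Run gives every case its own subtest name in failure output.
		t.Run(k, func(t *testing.T) {
			m := &CacheAgent{agents: sets.NewString(tt.initAgents...)}

			deleted := m.updateCacheAgents(tt.cacheAgents, "update")
			if !deleted.Equal(tt.deletedAgents) {
				t.Errorf("got deleted agents %v, expect %v", deleted, tt.deletedAgents)
			}
			if !m.agents.Equal(tt.resultAgents) {
				t.Errorf("got cache agents %v, expect %v", m.agents, tt.resultAgents)
			}
		})
	}
}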