Skip to content

Commit

Permalink
[ARG] Add an option to disable API call cache
Browse files Browse the repository at this point in the history
New option: disableAPICallCache
When ARG is enabled, this option should be true.

Signed-off-by: Zhecheng Li <zhechengli@microsoft.com>
  • Loading branch information
lzhecheng committed Jun 26, 2023
1 parent cb04dcf commit e1bb0d5
Show file tree
Hide file tree
Showing 12 changed files with 132 additions and 75 deletions.
40 changes: 30 additions & 10 deletions pkg/cache/azure_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,26 +64,31 @@ func cacheKeyFunc(obj interface{}) (string, error) {

// TimedCache is a cache with TTL.
type TimedCache struct {
Store cache.Store
Lock sync.Mutex
Getter GetFunc
TTL time.Duration
Store cache.Store
Lock sync.Mutex
Getter GetFunc
TTL time.Duration
Disabled bool
}

// NewTimedcache creates a new TimedCache.
func NewTimedcache(ttl time.Duration, getter GetFunc) (*TimedCache, error) {
func NewTimedcache(ttl time.Duration, getter GetFunc, disabled bool) (*TimedCache, error) {
if getter == nil {
return nil, fmt.Errorf("getter is not provided")
}

return &TimedCache{
Getter: getter,
timedCache := &TimedCache{
Getter: getter,
Disabled: disabled,
}
if !disabled {
// switch to using NewStore instead of NewTTLStore so that we can
// reuse entries for calls that are fine with reading expired/stalled data.
// with NewTTLStore, entries are not returned if they have already expired.
Store: cache.NewStore(cacheKeyFunc),
TTL: ttl,
}, nil
timedCache.Store = cache.NewStore(cacheKeyFunc)
timedCache.TTL = ttl
}
return timedCache, nil
}

// getInternal returns AzureCacheEntry by key. If the key is not cached yet,
Expand Down Expand Up @@ -124,11 +129,17 @@ func (t *TimedCache) getInternal(key string) (*AzureCacheEntry, error) {

// Get returns the requested item by key.
func (t *TimedCache) Get(key string, crt AzureCacheReadType) (interface{}, error) {
if t.Disabled {
return t.Getter(key)
}
return t.get(key, crt)
}

// Get returns the requested item by key with deep copy.
func (t *TimedCache) GetWithDeepCopy(key string, crt AzureCacheReadType) (interface{}, error) {
if t.Disabled {
return t.Getter(key)
}
data, err := t.get(key, crt)
copied := deepcopy.Copy(data)
return copied, err
Expand Down Expand Up @@ -172,6 +183,9 @@ func (t *TimedCache) get(key string, crt AzureCacheReadType) (interface{}, error

// Delete removes an item from the cache.
func (t *TimedCache) Delete(key string) error {
if t.Disabled {
return nil
}
return t.Store.Delete(&AzureCacheEntry{
Key: key,
})
Expand All @@ -180,6 +194,9 @@ func (t *TimedCache) Delete(key string) error {
// Set sets the data cache for the key.
// It is only used for testing.
func (t *TimedCache) Set(key string, data interface{}) {
if t.Disabled {
return
}
_ = t.Store.Add(&AzureCacheEntry{
Key: key,
Data: data,
Expand All @@ -189,6 +206,9 @@ func (t *TimedCache) Set(key string, data interface{}) {

// Update updates the data cache for the key.
func (t *TimedCache) Update(key string, data interface{}) {
if t.Disabled {
return
}
if entry, err := t.getInternal(key); err == nil {
entry.Lock.Lock()
defer entry.Lock.Unlock()
Expand Down
4 changes: 2 additions & 2 deletions pkg/cache/azure_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func newFakeCache(t *testing.T) (*fakeDataSource, *TimedCache) {
sem: *semaphore.NewWeighted(1),
}
getter := dataSource.get
cache, err := NewTimedcache(fakeCacheTTL, getter)
cache, err := NewTimedcache(fakeCacheTTL, getter, false)
assert.NoError(t, err)
return dataSource, cache
}
Expand Down Expand Up @@ -127,7 +127,7 @@ func TestCacheGetError(t *testing.T) {
getter := func(key string) (interface{}, error) {
return nil, getError
}
cache, err := NewTimedcache(fakeCacheTTL, getter)
cache, err := NewTimedcache(fakeCacheTTL, getter, false)
assert.NoError(t, err)

val, err := cache.GetWithDeepCopy("key", CacheReadTypeDefault)
Expand Down
7 changes: 7 additions & 0 deletions pkg/provider/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,9 @@ type Config struct {
// If the length is not 0, it is assumed the multiple standard load balancers mode is on. In this case,
// there must be one configuration named “<clustername>” or an error will be reported.
MultipleStandardLoadBalancerConfigurations []MultipleStandardLoadBalancerConfiguration `json:"multipleStandardLoadBalancerConfigurations,omitempty" yaml:"multipleStandardLoadBalancerConfigurations,omitempty"`

// DisableAPICallCache disables the cache for Azure API calls. It is for ARG support and not all resources will be disabled.
DisableAPICallCache bool `json:"disableAPICallCache,omitempty" yaml:"disableAPICallCache,omitempty"`
}

// MultipleStandardLoadBalancerConfiguration stores the properties regarding multiple standard load balancers.
Expand Down Expand Up @@ -742,6 +745,10 @@ func (az *Cloud) getPutVMSSVMBatchSize() int {
}

func (az *Cloud) initCaches() (err error) {
if az.Config.DisableAPICallCache {
klog.Infof("API call cache is disabled, ignore logs about cache operations")
}

az.vmCache, err = az.newVMCache()
if err != nil {
return err
Expand Down
6 changes: 6 additions & 0 deletions pkg/provider/azure_controller_standard.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ func (as *availabilitySet) AttachDisk(ctx context.Context, nodeName types.NodeNa
}

func (as *availabilitySet) DeleteCacheForNode(nodeName string) error {
if as.Config.DisableAPICallCache {
return nil
}
err := as.cloud.vmCache.Delete(nodeName)
if err == nil {
klog.V(2).Infof("DeleteCacheForNode(%s) successfully", nodeName)
Expand Down Expand Up @@ -265,6 +268,9 @@ func (as *availabilitySet) UpdateVMAsync(ctx context.Context, nodeName types.Nod
}

func (as *availabilitySet) updateCache(nodeName string, vm *compute.VirtualMachine) {
if as.Config.DisableAPICallCache {
return
}
if as.common.DisableUpdateCache {
return
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/provider/azure_instance_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func NewInstanceMetadataService(imdsServer string) (*InstanceMetadataService, er
imdsServer: imdsServer,
}

imsCache, err := azcache.NewTimedcache(consts.MetadataCacheTTL, ims.getMetadata)
imsCache, err := azcache.NewTimedcache(consts.MetadataCacheTTL, ims.getMetadata, false)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/provider/azure_instances_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func TestInstanceID(t *testing.T) {
if test.useCustomImsCache {
cloud.Metadata.imsCache, err = azcache.NewTimedcache(consts.MetadataCacheTTL, func(key string) (interface{}, error) {
return nil, fmt.Errorf("getError")
})
}, false)
if err != nil {
t.Errorf("Test [%s] unexpected error: %v", test.name, err)
}
Expand Down Expand Up @@ -642,7 +642,7 @@ func TestNodeAddresses(t *testing.T) {
if test.useCustomImsCache {
cloud.Metadata.imsCache, err = azcache.NewTimedcache(consts.MetadataCacheTTL, func(key string) (interface{}, error) {
return nil, fmt.Errorf("getError")
})
}, false)
if err != nil {
t.Errorf("Test [%s] unexpected error: %v", test.name, err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/provider/azure_standard.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ func (as *availabilitySet) newVMASCache() (*azcache.TimedCache, error) {
as.Config.AvailabilitySetsCacheTTLInSeconds = consts.VMASCacheTTLDefaultInSeconds
}

return azcache.NewTimedcache(time.Duration(as.Config.AvailabilitySetsCacheTTLInSeconds)*time.Second, getter)
return azcache.NewTimedcache(time.Duration(as.Config.AvailabilitySetsCacheTTLInSeconds)*time.Second, getter, as.Cloud.Config.DisableAPICallCache)
}

// newStandardSet creates a new availabilitySet.
Expand Down
5 changes: 2 additions & 3 deletions pkg/provider/azure_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,14 +458,13 @@ func TestGetVMSSVMCacheKey(t *testing.T) {
}

func TestIsNodeInVMSSVMCache(t *testing.T) {

getter := func(key string) (interface{}, error) {
return nil, nil
}
emptyCacheEntryTimedCache, _ := azcache.NewTimedcache(fakeCacheTTL, getter)
emptyCacheEntryTimedCache, _ := azcache.NewTimedcache(fakeCacheTTL, getter, false)
emptyCacheEntryTimedCache.Set("key", nil)

cacheEntryTimedCache, _ := azcache.NewTimedcache(fakeCacheTTL, getter)
cacheEntryTimedCache, _ := azcache.NewTimedcache(fakeCacheTTL, getter, false)
syncMap := &sync.Map{}
syncMap.Store("node", nil)
cacheEntryTimedCache.Set("key", syncMap)
Expand Down
117 changes: 69 additions & 48 deletions pkg/provider/azure_vmss_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,16 @@ func (ss *ScaleSet) newVMSSCache(ctx context.Context) (*azcache.TimedCache, erro
}
}

if resourceGroupNotFound {
// gc vmss vm cache when there is resource group not found
vmssVMKeys := ss.vmssVMCache.Store.ListKeys()
for _, cacheKey := range vmssVMKeys {
vmssName := cacheKey[strings.LastIndex(cacheKey, "/")+1:]
if _, ok := localCache.Load(vmssName); !ok {
klog.V(2).Infof("remove vmss %s from vmssVMCache due to rg not found", cacheKey)
_ = ss.vmssVMCache.Delete(cacheKey)
if !ss.Cloud.Config.DisableAPICallCache {
if resourceGroupNotFound {
// gc vmss vm cache when there is resource group not found
vmssVMKeys := ss.vmssVMCache.Store.ListKeys()
for _, cacheKey := range vmssVMKeys {
vmssName := cacheKey[strings.LastIndex(cacheKey, "/")+1:]
if _, ok := localCache.Load(vmssName); !ok {
klog.V(2).Infof("remove vmss %s from vmssVMCache due to rg not found", cacheKey)
_ = ss.vmssVMCache.Delete(cacheKey)
}
}
}
}
Expand All @@ -119,7 +121,7 @@ func (ss *ScaleSet) newVMSSCache(ctx context.Context) (*azcache.TimedCache, erro
if ss.Config.VmssCacheTTLInSeconds == 0 {
ss.Config.VmssCacheTTLInSeconds = consts.VMSSCacheTTLDefaultInSeconds
}
return azcache.NewTimedcache(time.Duration(ss.Config.VmssCacheTTLInSeconds)*time.Second, getter)
return azcache.NewTimedcache(time.Duration(ss.Config.VmssCacheTTLInSeconds)*time.Second, getter, ss.Config.DisableAPICallCache)
}

func (ss *ScaleSet) getVMSSVMsFromCache(resourceGroup, vmssName string, crt azcache.AzureCacheReadType) (*sync.Map, error) {
Expand All @@ -146,24 +148,26 @@ func (ss *ScaleSet) newVMSSVirtualMachinesCache() (*azcache.TimedCache, error) {
localCache := &sync.Map{} // [nodeName]*VMSSVirtualMachineEntry
oldCache := make(map[string]*VMSSVirtualMachineEntry)

entry, exists, err := ss.vmssVMCache.Store.GetByKey(cacheKey)
if err != nil {
return nil, err
}
if exists {
cached := entry.(*azcache.AzureCacheEntry).Data
if cached != nil {
virtualMachines := cached.(*sync.Map)
virtualMachines.Range(func(key, value interface{}) bool {
oldCache[key.(string)] = value.(*VMSSVirtualMachineEntry)
return true
})
if !ss.Cloud.Config.DisableAPICallCache {
entry, exists, err := ss.vmssVMCache.Store.GetByKey(cacheKey)
if err != nil {
return nil, err
}
if exists {
cached := entry.(*azcache.AzureCacheEntry).Data
if cached != nil {
virtualMachines := cached.(*sync.Map)
virtualMachines.Range(func(key, value interface{}) bool {
oldCache[key.(string)] = value.(*VMSSVirtualMachineEntry)
return true
})
}
}
}

result := strings.Split(cacheKey, "/")
if len(result) < 2 {
err = fmt.Errorf("Invalid cacheKey (%s)", cacheKey)
err := fmt.Errorf("Invalid cacheKey (%s)", cacheKey)
return nil, err
}

Expand Down Expand Up @@ -202,43 +206,50 @@ func (ss *ScaleSet) newVMSSVirtualMachinesCache() (*azcache.TimedCache, error) {
}
localCache.Store(computerName, vmssVMCacheEntry)

delete(oldCache, computerName)
if !ss.Cloud.Config.DisableAPICallCache {
delete(oldCache, computerName)
}
}

// add old missing cache data with nil entries to prevent aggressive
// ARM calls during cache invalidation
for name, vmEntry := range oldCache {
// if the nil cache entry has existed for vmssVirtualMachinesCacheTTL in the cache
// then it should not be added back to the cache
if vmEntry.VirtualMachine == nil && time.Since(vmEntry.LastUpdate) > vmssVirtualMachinesCacheTTL {
klog.V(5).Infof("ignoring expired entries from old cache for %s", name)
continue
}
LastUpdate := time.Now().UTC()
if vmEntry.VirtualMachine == nil {
// if this is already a nil entry then keep the time the nil
// entry was first created, so we can cleanup unwanted entries
LastUpdate = vmEntry.LastUpdate
}
if !ss.Cloud.Config.DisableAPICallCache {
// add old missing cache data with nil entries to prevent aggressive
// ARM calls during cache invalidation
for name, vmEntry := range oldCache {
// if the nil cache entry has existed for vmssVirtualMachinesCacheTTL in the cache
// then it should not be added back to the cache
if vmEntry.VirtualMachine == nil && time.Since(vmEntry.LastUpdate) > vmssVirtualMachinesCacheTTL {
klog.V(5).Infof("ignoring expired entries from old cache for %s", name)
continue
}
LastUpdate := time.Now().UTC()
if vmEntry.VirtualMachine == nil {
// if this is already a nil entry then keep the time the nil
// entry was first created, so we can cleanup unwanted entries
LastUpdate = vmEntry.LastUpdate
}

klog.V(5).Infof("adding old entries to new cache for %s", name)
localCache.Store(name, &VMSSVirtualMachineEntry{
ResourceGroup: vmEntry.ResourceGroup,
VMSSName: vmEntry.VMSSName,
InstanceID: vmEntry.InstanceID,
VirtualMachine: nil,
LastUpdate: LastUpdate,
})
klog.V(5).Infof("adding old entries to new cache for %s", name)
localCache.Store(name, &VMSSVirtualMachineEntry{
ResourceGroup: vmEntry.ResourceGroup,
VMSSName: vmEntry.VMSSName,
InstanceID: vmEntry.InstanceID,
VirtualMachine: nil,
LastUpdate: LastUpdate,
})
}
}

return localCache, nil
}

return azcache.NewTimedcache(vmssVirtualMachinesCacheTTL, getter)
return azcache.NewTimedcache(vmssVirtualMachinesCacheTTL, getter, ss.Cloud.Config.DisableAPICallCache)
}

// DeleteCacheForNode deletes Node from VMSS VM and VM caches.
func (ss *ScaleSet) DeleteCacheForNode(nodeName string) error {
if ss.Config.DisableAPICallCache {
return nil
}
vmManagementType, err := ss.getVMManagementTypeByNodeName(nodeName, azcache.CacheReadTypeUnsafe)
if err != nil {
klog.Errorf("getVMManagementTypeByNodeName(%s) failed with %v", nodeName, err)
Expand Down Expand Up @@ -368,7 +379,7 @@ func (ss *ScaleSet) newNonVmssUniformNodesCache() (*azcache.TimedCache, error) {
if ss.Config.NonVmssUniformNodesCacheTTLInSeconds == 0 {
ss.Config.NonVmssUniformNodesCacheTTLInSeconds = consts.NonVmssUniformNodesCacheTTLDefaultInSeconds
}
return azcache.NewTimedcache(time.Duration(ss.Config.NonVmssUniformNodesCacheTTLInSeconds)*time.Second, getter)
return azcache.NewTimedcache(time.Duration(ss.Config.NonVmssUniformNodesCacheTTLInSeconds)*time.Second, getter, ss.Cloud.Config.DisableAPICallCache)
}

func (ss *ScaleSet) getVMManagementTypeByNodeName(nodeName string, crt azcache.AzureCacheReadType) (VMManagementType, error) {
Expand All @@ -382,6 +393,16 @@ func (ss *ScaleSet) getVMManagementTypeByNodeName(nodeName string, crt azcache.A
return ManagedByUnknownVMSet, err
}

if ss.Cloud.Config.DisableAPICallCache {
if cached.(NonVmssUniformNodesEntry).AvSetVMNodeNames.Has(nodeName) {
return ManagedByAvSet, nil
}
if cached.(NonVmssUniformNodesEntry).VMSSFlexVMNodeNames.Has(nodeName) {
return ManagedByVmssFlex, nil
}
return ManagedByVmssUniform, nil
}

cachedNodes := cached.(NonVmssUniformNodesEntry).ClusterNodeNames
// if the node is not in the cache, assume the node has joined after the last cache refresh and attempt to refresh the cache.
if !cachedNodes.Has(nodeName) {
Expand Down
Loading

0 comments on commit e1bb0d5

Please sign in to comment.