Skip to content

Commit

Permalink
feat: use TakeSnapshotIfChanged to avoid taking redundant snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
programmer04 committed May 27, 2024
1 parent 183519e commit e2dfd41
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 32 deletions.
18 changes: 9 additions & 9 deletions internal/dataplane/fallback/fallback.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,29 +27,29 @@ func NewGenerator(cacheGraphProvider CacheGraphProvider, logger logr.Logger) *Ge
}
}

// GenerateExcludingAffected generates a new cache snapshot that excludes all objects that depend on the broken objects.
func (g *Generator) GenerateExcludingAffected(
// GenerateExcludingBrokenObjects generates a new cache snapshot that excludes all objects that depend on the broken objects.
func (g *Generator) GenerateExcludingBrokenObjects(
cache store.CacheStores,
brokenObjects []ObjectHash,
) (store.CacheStores, error) {
) (store.CacheStores, store.SnapshotHash, error) {
graph, err := g.cacheGraphProvider.CacheToGraph(cache)
if err != nil {
return store.CacheStores{}, fmt.Errorf("failed to build cache graph: %w", err)
return store.CacheStores{}, store.SnapshotHashEmpty, fmt.Errorf("failed to build cache graph: %w", err)
}

fallbackCache, err := cache.TakeSnapshot()
fallbackCache, newSnapshotHash, err := cache.TakeSnapshotIfChanged(store.SnapshotHashEmpty)
if err != nil {
return store.CacheStores{}, fmt.Errorf("failed to take cache snapshot: %w", err)
return store.CacheStores{}, store.SnapshotHashEmpty, fmt.Errorf("failed to take cache snapshot: %w", err)
}

for _, brokenObject := range brokenObjects {
subgraphObjects, err := graph.SubgraphObjects(brokenObject)
if err != nil {
return store.CacheStores{}, fmt.Errorf("failed to find dependants for %s: %w", brokenObject, err)
return store.CacheStores{}, store.SnapshotHashEmpty, fmt.Errorf("failed to find dependants for %s: %w", brokenObject, err)
}
for _, obj := range subgraphObjects {
if err := fallbackCache.Delete(obj); err != nil {
return store.CacheStores{}, fmt.Errorf("failed to delete %s from the cache: %w", GetObjectHash(obj), err)
return store.CacheStores{}, store.SnapshotHashEmpty, fmt.Errorf("failed to delete %s from the cache: %w", GetObjectHash(obj), err)
}
g.logger.V(util.DebugLevel).Info("Excluded object from fallback cache",
"object_kind", obj.GetObjectKind(),
Expand All @@ -59,5 +59,5 @@ func (g *Generator) GenerateExcludingAffected(
}
}

return fallbackCache, nil
return fallbackCache, newSnapshotHash, nil
}
19 changes: 13 additions & 6 deletions internal/dataplane/fallback/fallback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (m *mockGraphProvider) CacheToGraph(s store.CacheStores) (*fallback.ConfigG
return m.graph, nil
}

func TestGenerator_GenerateExcludingAffected(t *testing.T) {
func TestGenerator_GenerateExcludingBrokenObjects(t *testing.T) {
// We have to use real-world object types here as we're testing integration with store.CacheStores.
ingressClass := testIngressClass(t, "ingressClass")
service := testService(t, "service")
Expand Down Expand Up @@ -56,9 +56,12 @@ func TestGenerator_GenerateExcludingAffected(t *testing.T) {
graphProvider := &mockGraphProvider{graph: graph}
g := fallback.NewGenerator(graphProvider, logr.Discard())

const expectedSnapshotHash = store.SnapshotHash("4OYMIQUY7QOBJGX36TEJS35ZEQT24QPEMSNZGTFESWMRW6CSXBKQ====")

t.Run("ingressClass is broken", func(t *testing.T) {
fallbackCache, err := g.GenerateExcludingAffected(inputCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(ingressClass)})
fallbackCache, snapshotHash, err := g.GenerateExcludingBrokenObjects(inputCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(ingressClass)})
require.NoError(t, err)
require.Equal(t, expectedSnapshotHash, snapshotHash)
require.Equal(t, inputCacheStores, graphProvider.lastCalledWithStore, "expected the generator to call CacheToGraph with the input cache stores")
require.NotSame(t, inputCacheStores, fallbackCache)
require.Empty(t, fallbackCache.IngressClassV1.List(), "ingressClass should be excluded as it's broken")
Expand All @@ -68,8 +71,9 @@ func TestGenerator_GenerateExcludingAffected(t *testing.T) {
})

t.Run("service is broken", func(t *testing.T) {
fallbackCache, err := g.GenerateExcludingAffected(inputCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(service)})
fallbackCache, snapshotHash, err := g.GenerateExcludingBrokenObjects(inputCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(service)})
require.NoError(t, err)
require.Equal(t, expectedSnapshotHash, snapshotHash)
require.Equal(t, inputCacheStores, graphProvider.lastCalledWithStore, "expected the generator to call CacheToGraph with the input cache stores")
require.NotSame(t, inputCacheStores, fallbackCache)
require.Empty(t, fallbackCache.Service.List(), "service should be excluded as it's broken")
Expand All @@ -79,8 +83,9 @@ func TestGenerator_GenerateExcludingAffected(t *testing.T) {
})

t.Run("serviceFacade is broken", func(t *testing.T) {
fallbackCache, err := g.GenerateExcludingAffected(inputCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(serviceFacade)})
fallbackCache, snapshotHash, err := g.GenerateExcludingBrokenObjects(inputCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(serviceFacade)})
require.NoError(t, err)
require.Equal(t, expectedSnapshotHash, snapshotHash)
require.Equal(t, inputCacheStores, graphProvider.lastCalledWithStore, "expected the generator to call CacheToGraph with the input cache stores")
require.NotSame(t, inputCacheStores, fallbackCache)
require.Empty(t, fallbackCache.KongServiceFacade.List(), "serviceFacade should be excluded as it's broken")
Expand All @@ -90,8 +95,9 @@ func TestGenerator_GenerateExcludingAffected(t *testing.T) {
})

t.Run("plugin is broken", func(t *testing.T) {
fallbackCache, err := g.GenerateExcludingAffected(inputCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(plugin)})
fallbackCache, snapshotHash, err := g.GenerateExcludingBrokenObjects(inputCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(plugin)})
require.NoError(t, err)
require.Equal(t, expectedSnapshotHash, snapshotHash)
require.Equal(t, inputCacheStores, graphProvider.lastCalledWithStore, "expected the generator to call CacheToGraph with the input cache stores")
require.NotSame(t, inputCacheStores, fallbackCache)
require.Empty(t, fallbackCache.Plugin.List(), "plugin should be excluded as it's broken")
Expand All @@ -101,8 +107,9 @@ func TestGenerator_GenerateExcludingAffected(t *testing.T) {
})

t.Run("multiple objects are broken", func(t *testing.T) {
fallbackCache, err := g.GenerateExcludingAffected(inputCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(ingressClass), fallback.GetObjectHash(service)})
fallbackCache, snapshotHash, err := g.GenerateExcludingBrokenObjects(inputCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(ingressClass), fallback.GetObjectHash(service)})
require.NoError(t, err)
require.Equal(t, expectedSnapshotHash, snapshotHash)
require.Equal(t, inputCacheStores, graphProvider.lastCalledWithStore, "expected the generator to call CacheToGraph with the input cache stores")
require.NotSame(t, inputCacheStores, fallbackCache)
require.Empty(t, fallbackCache.IngressClassV1.List(), "ingressClass should be excluded as it's broken")
Expand Down
16 changes: 11 additions & 5 deletions internal/dataplane/kong_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type KongConfigBuilder interface {

// FallbackConfigGenerator generates a fallback configuration based on a cache snapshot and a set of broken objects.
type FallbackConfigGenerator interface {
GenerateExcludingAffected(store.CacheStores, []fallback.ObjectHash) (store.CacheStores, error)
GenerateExcludingBrokenObjects(store.CacheStores, []fallback.ObjectHash) (store.CacheStores, store.SnapshotHash, error)
}

// KongClient is a threadsafe high level API client for the Kong data-plane(s)
Expand Down Expand Up @@ -413,13 +413,18 @@ func (c *KongClient) Update(ctx context.Context) error {
// based on the cache contents, we need to ensure it is not modified during the process.
var cacheSnapshot store.CacheStores
if c.kongConfig.FallbackConfiguration {
// TODO: https://github.com/Kong/kubernetes-ingress-controller/issues/6080
// Use TakeSnapshotIfChanged to avoid taking a snapshot if the cache hasn't changed.
var newSnapshotHash store.SnapshotHash
var err error
cacheSnapshot, err = c.cache.TakeSnapshot()
cacheSnapshot, newSnapshotHash, err = c.cache.TakeSnapshotIfChanged(c.kongConfig.SnapshotHash)
if err != nil {
return fmt.Errorf("failed to take snapshot of cache: %w", err)
}
// Empty snapshot hash means that the cache hasn't changed since the last snapshot was taken.
if newSnapshotHash == store.SnapshotHashEmpty {
c.logger.V(util.DebugLevel).Info("No configuration change; pushing config to gateway is not necessary, skipping")
return nil
}
c.kongConfig.SnapshotHash = newSnapshotHash
c.kongConfigBuilder.UpdateCache(cacheSnapshot)
}

Expand Down Expand Up @@ -518,13 +523,14 @@ func (c *KongClient) tryRecoveringWithFallbackConfiguration(
if err != nil {
return fmt.Errorf("failed to extract broken objects from update error: %w", err)
}
fallbackCache, err := c.fallbackConfigGenerator.GenerateExcludingAffected(
fallbackCache, newHash, err := c.fallbackConfigGenerator.GenerateExcludingBrokenObjects(
cacheSnapshot,
brokenObjects,
)
if err != nil {
return fmt.Errorf("failed to generate fallback configuration: %w", err)
}
c.kongConfig.SnapshotHash = newHash

// Update the KongConfigBuilder with the fallback configuration and build the KongConfig.
c.kongConfigBuilder.UpdateCache(fallbackCache)
Expand Down
20 changes: 11 additions & 9 deletions internal/dataplane/kong_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,20 +291,22 @@ func (m mockConfigurationChangeDetector) HasConfigurationChanged(

// mockKongLastValidConfigFetcher is a mock implementation of FallbackConfigGenerator interface.
type mockFallbackConfigGenerator struct {
generateExcludingAffectedCalledWith lo.Tuple2[store.CacheStores, []fallback.ObjectHash]
generateExcludingAffectedResult store.CacheStores
GenerateExcludingBrokenObjectsCalledWith lo.Tuple2[store.CacheStores, []fallback.ObjectHash]
GenerateExcludingBrokenObjectsResult store.CacheStores
}

func newMockFallbackConfigGenerator() *mockFallbackConfigGenerator {
return &mockFallbackConfigGenerator{}
}

func (m *mockFallbackConfigGenerator) GenerateExcludingAffected(
func (m *mockFallbackConfigGenerator) GenerateExcludingBrokenObjects(
stores store.CacheStores,
hashes []fallback.ObjectHash,
) (store.CacheStores, error) {
m.generateExcludingAffectedCalledWith = lo.T2(stores, hashes)
return m.generateExcludingAffectedResult, nil
) (store.CacheStores, store.SnapshotHash, error) {
m.GenerateExcludingBrokenObjectsCalledWith = lo.T2(stores, hashes)
// In this mock implementation, we don't care about the snapshot hash.
// We know that it will be some base32 encoded string.
return m.GenerateExcludingBrokenObjectsResult, "4OYMIQUY7QOBJGX36TEJS35ZEQT24QPEMSNZGTFESWMRW6CSXBKQ====", nil
}

func TestKongClientUpdate_AllExpectedClientsAreCalledAndErrorIsPropagated(t *testing.T) {
Expand Down Expand Up @@ -1026,7 +1028,7 @@ func TestKongClient_FallbackConfiguration_SuccessfulRecovery(t *testing.T) {

t.Log("Setting the fallback config generator to return a snapshot excluding the broken consumer")
fallbackCacheStoresToBeReturned := cacheStoresFromObjs(t, validConsumer)
fallbackConfigGenerator.generateExcludingAffectedResult = fallbackCacheStoresToBeReturned
fallbackConfigGenerator.GenerateExcludingBrokenObjectsResult = fallbackCacheStoresToBeReturned

t.Log("Calling KongClient.Update")
err = kongClient.Update(ctx)
Expand All @@ -1044,8 +1046,8 @@ func TestKongClient_FallbackConfiguration_SuccessfulRecovery(t *testing.T) {
require.True(t, hasConsumer, "expected consumer to be in the first cache snapshot")

t.Log("Verifying that the fallback config generator was called with the first cache snapshot and the broken object hash")
expectedGenerateExcludingAffectedArgs := lo.T2(firstCacheUpdate, []fallback.ObjectHash{fallback.GetObjectHash(brokenConsumer)})
require.Equal(t, expectedGenerateExcludingAffectedArgs, fallbackConfigGenerator.generateExcludingAffectedCalledWith,
expectedGenerateExcludingBrokenObjectsArgs := lo.T2(firstCacheUpdate, []fallback.ObjectHash{fallback.GetObjectHash(brokenConsumer)})
require.Equal(t, expectedGenerateExcludingBrokenObjectsArgs, fallbackConfigGenerator.GenerateExcludingBrokenObjectsCalledWith,
"expected fallback config generator to be called with the first cache snapshot and the broken object hash")

t.Log("Verifying that the second config builder cache update contains the fallback snapshot")
Expand Down
6 changes: 6 additions & 0 deletions internal/dataplane/sendconfig/kong.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package sendconfig

import (
"github.com/blang/semver/v4"

"github.com/kong/kubernetes-ingress-controller/v3/internal/store"
)

// Config gathers parameters that are needed for sending configuration to Kong Admin APIs.
Expand Down Expand Up @@ -37,4 +39,8 @@ type Config struct {
// FallbackConfiguration indicates whether to generate fallback configuration in the case of entity
// errors returned by the Kong Admin API.
FallbackConfiguration bool

// SnapshotHash stores the hash of the current snapshot. It's used to determine configuration
// changes, it's used for the fallback configuration generation.
SnapshotHash store.SnapshotHash
}
8 changes: 5 additions & 3 deletions internal/store/cache_stores_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
)

// TakeSnapshot takes a snapshot of the CacheStores.
//
// Deprecated: use TakeSnapshotIfChanged instead.
func (c CacheStores) TakeSnapshot() (CacheStores, error) {
// Create a fresh CacheStores instance to store the snapshot
// in the c.takeSnapshot method. It happens here because it's
Expand Down Expand Up @@ -97,7 +99,7 @@ func (c CacheStores) TakeSnapshotIfChanged(previousSnapshotHash SnapshotHash) (
return string(uid) + resourceVer
})
if capturedErr != nil {
return CacheStores{}, "", capturedErr
return CacheStores{}, SnapshotHashEmpty, capturedErr
}
// Strings have to be used instead of byte slices, because Cmp.Ordered has to be satisfied.
slices.Sort(valuesForHashComputation)
Expand All @@ -110,12 +112,12 @@ func (c CacheStores) TakeSnapshotIfChanged(previousSnapshotHash SnapshotHash) (

// If the hash of the current state is the same as the hash of the previous snapshot, return an empty snapshot.
if newHash == previousSnapshotHash {
return CacheStores{}, "", nil
return CacheStores{}, SnapshotHashEmpty, nil
}

// Take a snapshot of the current state as the hash of the current state differs from the previous one.
if err := takeSnapshot(&snapshot, listOfStores); err != nil {
return CacheStores{}, "", fmt.Errorf("failed to take snapshot: %w", err)
return CacheStores{}, SnapshotHashEmpty, fmt.Errorf("failed to take snapshot: %w", err)
}
return snapshot, newHash, nil
}
Expand Down

0 comments on commit e2dfd41

Please sign in to comment.