Skip to content

Commit

Permalink
feat(diag) expose fallback objects
Browse files Browse the repository at this point in the history
  • Loading branch information
rainest authored and czeslavo committed Jun 10, 2024
1 parent 0b7e192 commit fc34d70
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 39 deletions.
58 changes: 42 additions & 16 deletions internal/dataplane/fallback/fallback.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/go-logr/logr"

"github.com/kong/kubernetes-ingress-controller/v3/internal/diagnostics"
"github.com/kong/kubernetes-ingress-controller/v3/internal/store"
"github.com/kong/kubernetes-ingress-controller/v3/internal/util"
)
Expand All @@ -31,98 +32,123 @@ func NewGenerator(cacheGraphProvider CacheGraphProvider, logger logr.Logger) *Ge
func (g *Generator) GenerateExcludingBrokenObjects(
cache store.CacheStores,
brokenObjects []ObjectHash,
) (store.CacheStores, error) {
) (store.CacheStores, []diagnostics.FallbackDiagnostic, error) {
graph, err := g.cacheGraphProvider.CacheToGraph(cache)
if err != nil {
return store.CacheStores{}, fmt.Errorf("failed to build cache graph: %w", err)
return store.CacheStores{}, nil, fmt.Errorf("failed to build cache graph: %w", err)
}

fallbackCache, err := cache.TakeSnapshot()
if err != nil {
return store.CacheStores{}, fmt.Errorf("failed to take cache snapshot: %w", err)
return store.CacheStores{}, nil, fmt.Errorf("failed to take cache snapshot: %w", err)
}

var diag []diagnostics.FallbackDiagnostic
for _, brokenObject := range brokenObjects {
subgraphObjects, err := graph.SubgraphObjects(brokenObject)
if err != nil {
return store.CacheStores{}, fmt.Errorf("failed to find dependants for %s: %w", brokenObject, err)
return store.CacheStores{}, nil, fmt.Errorf("failed to find dependants for %s: %w", brokenObject, err)
}
for _, obj := range subgraphObjects {
if err := fallbackCache.Delete(obj); err != nil {
return store.CacheStores{}, fmt.Errorf("failed to delete %s from the cache: %w", GetObjectHash(obj), err)
return store.CacheStores{}, nil, fmt.Errorf("failed to delete %s from the cache: %w", GetObjectHash(obj), err)
}
g.logger.V(util.DebugLevel).Info("Excluded object from fallback cache",
"object_kind", obj.GetObjectKind(),
"object_name", obj.GetName(),
"object_namespace", obj.GetNamespace(),
)
diag = append(diag, diagnostics.FallbackDiagnostic{
GroupKind: obj.GetObjectKind().GroupVersionKind().GroupKind().String(),
Namespace: obj.GetNamespace(),
Name: obj.GetName(),
ID: string(obj.GetUID()),
Status: "excluded",
})
}
}

return fallbackCache, nil
return fallbackCache, diag, nil
}

func (g *Generator) GenerateBackfillingBrokenObjects(
currentCache store.CacheStores,
lastValidCacheSnapshot store.CacheStores,
brokenObjects []ObjectHash,
) (store.CacheStores, error) {
) (store.CacheStores, []diagnostics.FallbackDiagnostic, error) {
// Build a graph from the current cache.
currentGraph, err := g.cacheGraphProvider.CacheToGraph(currentCache)
if err != nil {
return store.CacheStores{}, fmt.Errorf("failed to build current cache graph: %w", err)
return store.CacheStores{}, nil, fmt.Errorf("failed to build current cache graph: %w", err)
}

// Take a snapshot of the current cache to use as a fallback.
fallbackCache, err := currentCache.TakeSnapshot()
if err != nil {
return store.CacheStores{}, fmt.Errorf("failed to take current cache snapshot: %w", err)
return store.CacheStores{}, nil, fmt.Errorf("failed to take current cache snapshot: %w", err)
}

// Exclude the affected objects from the fallback cache. Also, collect all the affected objects as they will be
// subjects of backfilling.
var affectedObjects []ObjectHash
var (
affectedObjects []ObjectHash
diag []diagnostics.FallbackDiagnostic
)
for _, brokenObject := range brokenObjects {
subgraphObjects, err := currentGraph.SubgraphObjects(brokenObject)
if err != nil {
return store.CacheStores{}, fmt.Errorf("failed to find dependants for %s: %w", brokenObject, err)
return store.CacheStores{}, nil, fmt.Errorf("failed to find dependants for %s: %w", brokenObject, err)
}
for _, obj := range subgraphObjects {
if err := fallbackCache.Delete(obj); err != nil {
return store.CacheStores{}, fmt.Errorf("failed to delete %s from the fallback cache: %w", GetObjectHash(obj), err)
return store.CacheStores{}, nil, fmt.Errorf("failed to delete %s from the fallback cache: %w", GetObjectHash(obj), err)
}
g.logger.V(util.DebugLevel).Info("Excluded object from fallback cache",
"object_kind", obj.GetObjectKind(),
"object_name", obj.GetName(),
"object_namespace", obj.GetNamespace(),
)
affectedObjects = append(affectedObjects, GetObjectHash(obj))
diag = append(diag, diagnostics.FallbackDiagnostic{
GroupKind: obj.GetObjectKind().GroupVersionKind().GroupKind().String(),
Namespace: obj.GetNamespace(),
Name: obj.GetName(),
ID: string(obj.GetUID()),
Status: "excluded",
})
}
}

// Build a graph from the last valid cache snapshot.
lastValidGraph, err := g.cacheGraphProvider.CacheToGraph(lastValidCacheSnapshot)
if err != nil {
return store.CacheStores{}, fmt.Errorf("failed to build cache graph: %w", err)
return store.CacheStores{}, diag, fmt.Errorf("failed to build cache graph: %w", err)
}

// Backfill the affected objects from the last valid cache snapshot.
for _, affectedObject := range affectedObjects {
objectsToBackfill, err := lastValidGraph.SubgraphObjects(affectedObject)
if err != nil {
return store.CacheStores{}, fmt.Errorf("failed to find dependants for %s: %w", affectedObject, err)
return store.CacheStores{}, nil, fmt.Errorf("failed to find dependants for %s: %w", affectedObject, err)
}

for _, obj := range objectsToBackfill {
if err := fallbackCache.Add(obj); err != nil {
return store.CacheStores{}, fmt.Errorf("failed to add %s to the cache: %w", GetObjectHash(obj), err)
return store.CacheStores{}, diag, fmt.Errorf("failed to add %s to the cache: %w", GetObjectHash(obj), err)
}
g.logger.V(util.DebugLevel).Info("Backfilled object to fallback cache from previous valid cache snapshot",
"object_kind", obj.GetObjectKind(),
"object_name", obj.GetName(),
"object_namespace", obj.GetNamespace(),
)
diag = append(diag, diagnostics.FallbackDiagnostic{
GroupKind: obj.GetObjectKind().GroupVersionKind().GroupKind().String(),
Namespace: obj.GetNamespace(),
Name: obj.GetName(),
ID: string(obj.GetUID()),
Status: "backfilled",
})
}
}
return fallbackCache, nil
return fallbackCache, diag, nil
}
20 changes: 10 additions & 10 deletions internal/dataplane/fallback/fallback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func TestGenerator_GenerateExcludingBrokenObjects(t *testing.T) {
g := fallback.NewGenerator(graphProvider, logr.Discard())

t.Run("ingressClass is broken", func(t *testing.T) {
fallbackCache, err := g.GenerateExcludingBrokenObjects(inputCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(ingressClass)})
fallbackCache, _, err := g.GenerateExcludingBrokenObjects(inputCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(ingressClass)})
require.NoError(t, err)
require.Equal(t, inputCacheStores, graphProvider.CacheToGraphLastCalledWith(), "expected the generator to call CacheToGraph with the input cache stores")
require.NotSame(t, inputCacheStores, fallbackCache)
Expand All @@ -101,7 +101,7 @@ func TestGenerator_GenerateExcludingBrokenObjects(t *testing.T) {
})

t.Run("service is broken", func(t *testing.T) {
fallbackCache, err := g.GenerateExcludingBrokenObjects(inputCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(service)})
fallbackCache, _, err := g.GenerateExcludingBrokenObjects(inputCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(service)})
require.NoError(t, err)
require.Equal(t, inputCacheStores, graphProvider.CacheToGraphLastCalledWith(), "expected the generator to call CacheToGraph with the input cache stores")
require.NotSame(t, inputCacheStores, fallbackCache)
Expand All @@ -112,7 +112,7 @@ func TestGenerator_GenerateExcludingBrokenObjects(t *testing.T) {
})

t.Run("serviceFacade is broken", func(t *testing.T) {
fallbackCache, err := g.GenerateExcludingBrokenObjects(inputCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(serviceFacade)})
fallbackCache, _, err := g.GenerateExcludingBrokenObjects(inputCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(serviceFacade)})
require.NoError(t, err)
require.Equal(t, inputCacheStores, graphProvider.CacheToGraphLastCalledWith(), "expected the generator to call CacheToGraph with the input cache stores")
require.NotSame(t, inputCacheStores, fallbackCache)
Expand All @@ -123,7 +123,7 @@ func TestGenerator_GenerateExcludingBrokenObjects(t *testing.T) {
})

t.Run("plugin is broken", func(t *testing.T) {
fallbackCache, err := g.GenerateExcludingBrokenObjects(inputCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(plugin)})
fallbackCache, _, err := g.GenerateExcludingBrokenObjects(inputCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(plugin)})
require.NoError(t, err)
require.Equal(t, inputCacheStores, graphProvider.CacheToGraphLastCalledWith(), "expected the generator to call CacheToGraph with the input cache stores")
require.NotSame(t, inputCacheStores, fallbackCache)
Expand All @@ -134,7 +134,7 @@ func TestGenerator_GenerateExcludingBrokenObjects(t *testing.T) {
})

t.Run("multiple objects are broken", func(t *testing.T) {
fallbackCache, err := g.GenerateExcludingBrokenObjects(inputCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(ingressClass), fallback.GetObjectHash(service)})
fallbackCache, _, err := g.GenerateExcludingBrokenObjects(inputCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(ingressClass), fallback.GetObjectHash(service)})
require.NoError(t, err)
require.Equal(t, inputCacheStores, graphProvider.CacheToGraphLastCalledWith(), "expected the generator to call CacheToGraph with the input cache stores")
require.NotSame(t, inputCacheStores, fallbackCache)
Expand Down Expand Up @@ -220,7 +220,7 @@ func TestGenerator_GenerateBackfillingBrokenObjects(t *testing.T) {
g := fallback.NewGenerator(graphProvider, logr.Discard())

t.Run("ingressClass is broken", func(t *testing.T) {
fallbackCache, err := g.GenerateBackfillingBrokenObjects(inputCacheStores, lastValidCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(ingressClass)})
fallbackCache, _, err := g.GenerateBackfillingBrokenObjects(inputCacheStores, lastValidCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(ingressClass)})
require.NoError(t, err)
require.Equal(t, []store.CacheStores{inputCacheStores, lastValidCacheStores}, graphProvider.CacheToGraphLastNCalledWith(2),
"expected the generator to call CacheToGraph with the input cache stores and the last valid cache stores")
Expand All @@ -242,7 +242,7 @@ func TestGenerator_GenerateBackfillingBrokenObjects(t *testing.T) {
})

t.Run("service is broken", func(t *testing.T) {
fallbackCache, err := g.GenerateBackfillingBrokenObjects(inputCacheStores, lastValidCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(service)})
fallbackCache, _, err := g.GenerateBackfillingBrokenObjects(inputCacheStores, lastValidCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(service)})
require.NoError(t, err)
require.Equal(t, []store.CacheStores{inputCacheStores, lastValidCacheStores}, graphProvider.CacheToGraphLastNCalledWith(2),
"expected the generator to call CacheToGraph with the input cache stores and fallback cache")
Expand All @@ -264,7 +264,7 @@ func TestGenerator_GenerateBackfillingBrokenObjects(t *testing.T) {
})

t.Run("serviceFacade is broken", func(t *testing.T) {
fallbackCache, err := g.GenerateBackfillingBrokenObjects(inputCacheStores, lastValidCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(serviceFacade)})
fallbackCache, _, err := g.GenerateBackfillingBrokenObjects(inputCacheStores, lastValidCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(serviceFacade)})
require.NoError(t, err)
require.Equal(t, []store.CacheStores{inputCacheStores, lastValidCacheStores}, graphProvider.CacheToGraphLastNCalledWith(2), "expected the generator to call CacheToGraph with the input cache stores")
require.NotSame(t, inputCacheStores, fallbackCache)
Expand All @@ -285,7 +285,7 @@ func TestGenerator_GenerateBackfillingBrokenObjects(t *testing.T) {
})

t.Run("plugin is broken", func(t *testing.T) {
fallbackCache, err := g.GenerateBackfillingBrokenObjects(inputCacheStores, lastValidCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(plugin)})
fallbackCache, _, err := g.GenerateBackfillingBrokenObjects(inputCacheStores, lastValidCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(plugin)})
require.NoError(t, err)
require.Equal(t, []store.CacheStores{inputCacheStores, lastValidCacheStores}, graphProvider.CacheToGraphLastNCalledWith(2), "expected the generator to call CacheToGraph with the input cache stores")
require.NotSame(t, inputCacheStores, fallbackCache)
Expand All @@ -306,7 +306,7 @@ func TestGenerator_GenerateBackfillingBrokenObjects(t *testing.T) {
})

t.Run("multiple objects are broken", func(t *testing.T) {
fallbackCache, err := g.GenerateBackfillingBrokenObjects(inputCacheStores, lastValidCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(ingressClass), fallback.GetObjectHash(service)})
fallbackCache, _, err := g.GenerateBackfillingBrokenObjects(inputCacheStores, lastValidCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(ingressClass), fallback.GetObjectHash(service)})
require.NoError(t, err)
require.Equal(t, []store.CacheStores{inputCacheStores, lastValidCacheStores}, graphProvider.CacheToGraphLastNCalledWith(2), "expected the generator to call CacheToGraph with the input cache stores")
require.NotSame(t, inputCacheStores, fallbackCache)
Expand Down
25 changes: 19 additions & 6 deletions internal/dataplane/kong_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,15 @@ type KongConfigBuilder interface {

// FallbackConfigGenerator generates a fallback configuration based on a cache snapshot and a set of broken objects.
type FallbackConfigGenerator interface {
GenerateExcludingBrokenObjects(store.CacheStores, []fallback.ObjectHash) (store.CacheStores, error)
GenerateExcludingBrokenObjects(
store.CacheStores,
[]fallback.ObjectHash,
) (store.CacheStores, []diagnostics.FallbackDiagnostic, error)
GenerateBackfillingBrokenObjects(
currentCache store.CacheStores,
lastValidCache store.CacheStores,
brokenObjects []fallback.ObjectHash,
) (store.CacheStores, error)
) (store.CacheStores, []diagnostics.FallbackDiagnostic, error)
}

// KongClient is a threadsafe high level API client for the Kong data-plane(s)
Expand Down Expand Up @@ -489,7 +492,14 @@ func (c *KongClient) Update(ctx context.Context) error {

// In case of a failure in syncing configuration with Gateways, propagate the error.
if gatewaysSyncErr != nil {
if recoveringErr := c.tryRecoveringFromGatewaysSyncError(ctx, cacheSnapshot, gatewaysSyncErr); recoveringErr != nil {
if recoveringErr := c.tryRecoveringFromGatewaysSyncError(
ctx,
cacheSnapshot,
gatewaysSyncErr,
// there are actually multiple returned hashes, and there can apparently be _no_ returned hashes,
// so this either can't align to those like the other endpoints or needs to pipe it in from elsewhere
"TODO",
); recoveringErr != nil {
return fmt.Errorf("failed to recover from gateways sync error: %w", recoveringErr)
}
// Update result is positive only if gateways were successfully synced with the current config, so we still
Expand Down Expand Up @@ -532,11 +542,12 @@ func (c *KongClient) tryRecoveringFromGatewaysSyncError(
ctx context.Context,
cacheSnapshot store.CacheStores,
gatewaysSyncErr error,
hash string,
) error {
// If configuration was rejected by the gateways and FallbackConfiguration is enabled,
// we should generate a fallback configuration and push it to the gateways.
if c.kongConfig.FallbackConfiguration {
recoveringErr := c.tryRecoveringWithFallbackConfiguration(ctx, cacheSnapshot, gatewaysSyncErr)
recoveringErr := c.tryRecoveringWithFallbackConfiguration(ctx, cacheSnapshot, gatewaysSyncErr, hash)
if recoveringErr == nil {
c.logger.Info("Successfully recovered from configuration rejection with fallback configuration")
return nil
Expand Down Expand Up @@ -567,6 +578,7 @@ func (c *KongClient) tryRecoveringWithFallbackConfiguration(
ctx context.Context,
currentCache store.CacheStores,
gatewaysSyncErr error,
hash string,
) error {
// Extract the broken objects from the update error and generate a fallback configuration excluding them.
brokenObjects, err := extractBrokenObjectsFromUpdateError(gatewaysSyncErr)
Expand All @@ -575,10 +587,11 @@ func (c *KongClient) tryRecoveringWithFallbackConfiguration(
}

// Generate a fallback cache snapshot.
fallbackCache, err := c.generateFallbackCache(currentCache, brokenObjects)
fallbackCache, fallbackDiag, err := c.generateFallbackCache(currentCache, brokenObjects)
if err != nil {
return fmt.Errorf("failed to generate fallback configuration: %w", err)
}
c.diagnostic.Fallbacks <- diagnostics.FallbackDiagnosticCollection{ConfigHash: hash, Objects: fallbackDiag}

// Update the KongConfigBuilder with the fallback configuration and build the KongConfig.
c.kongConfigBuilder.UpdateCache(fallbackCache)
Expand Down Expand Up @@ -617,7 +630,7 @@ func (c *KongClient) tryRecoveringWithFallbackConfiguration(
func (c *KongClient) generateFallbackCache(
currentCache store.CacheStores,
brokenObjects []fallback.ObjectHash,
) (s store.CacheStores, err error) {
) (s store.CacheStores, diag []diagnostics.FallbackDiagnostic, err error) {
start := time.Now()
defer func() {
c.prometheusMetrics.RecordFallbackCacheGenerationDuration(time.Since(start), err)
Expand Down
12 changes: 6 additions & 6 deletions internal/dataplane/kong_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ func (m mockConfigurationChangeDetector) HasConfigurationChanged(
type mockFallbackConfigGenerator struct {
GenerateResult store.CacheStores

GenerateExcludingBrokenObjectsCalledWith lo.Tuple2[store.CacheStores, []fallback.ObjectHash]
GenerateExcludingBrokenObjectsCalledWith lo.Tuple3[store.CacheStores, []diagnostics.FallbackDiagnostic, []fallback.ObjectHash]
GenerateBackfillingBrokenObjectsCalledWith lo.Tuple3[store.CacheStores, store.CacheStores, []fallback.ObjectHash]
}

Expand All @@ -304,18 +304,18 @@ func newMockFallbackConfigGenerator() *mockFallbackConfigGenerator {
func (m *mockFallbackConfigGenerator) GenerateExcludingBrokenObjects(
stores store.CacheStores,
hashes []fallback.ObjectHash,
) (store.CacheStores, error) {
m.GenerateExcludingBrokenObjectsCalledWith = lo.T2(stores, hashes)
return m.GenerateResult, nil
) (store.CacheStores, []diagnostics.FallbackDiagnostic, error) {
m.GenerateExcludingBrokenObjectsCalledWith = lo.T3(stores, []diagnostics.FallbackDiagnostic{}, hashes)
return m.GenerateResult, []diagnostics.FallbackDiagnostic{}, nil
}

func (m *mockFallbackConfigGenerator) GenerateBackfillingBrokenObjects(
currentStores store.CacheStores,
lastValidStores store.CacheStores,
brokenObjects []fallback.ObjectHash,
) (store.CacheStores, error) {
) (store.CacheStores, []diagnostics.FallbackDiagnostic, error) {
m.GenerateBackfillingBrokenObjectsCalledWith = lo.T3(currentStores, lastValidStores, brokenObjects)
return m.GenerateResult, nil
return m.GenerateResult, []diagnostics.FallbackDiagnostic{}, nil
}

func TestKongClientUpdate_AllExpectedClientsAreCalledAndErrorIsPropagated(t *testing.T) {
Expand Down
Loading

0 comments on commit fc34d70

Please sign in to comment.