Skip to content

Commit

Permalink
feat(diag) expose fallback objects
Browse files Browse the repository at this point in the history
  • Loading branch information
rainest committed Jun 7, 2024
1 parent 77af8dc commit e4bf51b
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 38 deletions.
50 changes: 35 additions & 15 deletions internal/dataplane/fallback/fallback.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/go-logr/logr"

"github.com/kong/kubernetes-ingress-controller/v3/internal/diagnostics"
"github.com/kong/kubernetes-ingress-controller/v3/internal/store"
"github.com/kong/kubernetes-ingress-controller/v3/internal/util"
)
Expand All @@ -31,71 +32,90 @@ func NewGenerator(cacheGraphProvider CacheGraphProvider, logger logr.Logger) *Ge
func (g *Generator) GenerateExcludingBrokenObjects(
cache store.CacheStores,
brokenObjects []ObjectHash,
) (store.CacheStores, error) {
) (store.CacheStores, []diagnostics.FallbackDiagnostic, error) {
graph, err := g.cacheGraphProvider.CacheToGraph(cache)
if err != nil {
return store.CacheStores{}, fmt.Errorf("failed to build cache graph: %w", err)
return store.CacheStores{}, nil, fmt.Errorf("failed to build cache graph: %w", err)
}

fallbackCache, err := cache.TakeSnapshot()
if err != nil {
return store.CacheStores{}, fmt.Errorf("failed to take cache snapshot: %w", err)
return store.CacheStores{}, nil, fmt.Errorf("failed to take cache snapshot: %w", err)
}

var diag []diagnostics.FallbackDiagnostic
for _, brokenObject := range brokenObjects {
subgraphObjects, err := graph.SubgraphObjects(brokenObject)
diag = make([]diagnostics.FallbackDiagnostic, len(subgraphObjects))
if err != nil {
return store.CacheStores{}, fmt.Errorf("failed to find dependants for %s: %w", brokenObject, err)
return store.CacheStores{}, nil, fmt.Errorf("failed to find dependants for %s: %w", brokenObject, err)
}
for _, obj := range subgraphObjects {
for i, obj := range subgraphObjects {
if err := fallbackCache.Delete(obj); err != nil {
return store.CacheStores{}, fmt.Errorf("failed to delete %s from the cache: %w", GetObjectHash(obj), err)
return store.CacheStores{}, nil, fmt.Errorf("failed to delete %s from the cache: %w", GetObjectHash(obj), err)
}
g.logger.V(util.DebugLevel).Info("Excluded object from fallback cache",
"object_kind", obj.GetObjectKind(),
"object_name", obj.GetName(),
"object_namespace", obj.GetNamespace(),
)
diag[i] = diagnostics.FallbackDiagnostic{
GroupKind: obj.GetObjectKind().GroupVersionKind().GroupKind().String(),
Namespace: obj.GetNamespace(),
Name: obj.GetName(),
ID: string(obj.GetUID()),
Status: "excluded",
}
}
}

return fallbackCache, nil
return fallbackCache, diag, nil
}

func (g *Generator) GenerateBackfillingBrokenObjects(
currentCache store.CacheStores,
lastValidCacheSnapshot store.CacheStores,
brokenObjects []ObjectHash,
) (store.CacheStores, error) {
) (store.CacheStores, []diagnostics.FallbackDiagnostic, error) {
// Generate a fallback cache snapshot excluding the broken objects.
fallbackCache, err := g.GenerateExcludingBrokenObjects(currentCache, brokenObjects)
fallbackCache, diag, err := g.GenerateExcludingBrokenObjects(currentCache, brokenObjects)
if err != nil {
return store.CacheStores{}, fmt.Errorf("failed to generate fallback cache: %w", err)
return store.CacheStores{}, diag, fmt.Errorf("failed to generate fallback cache: %w", err)
}

// Build a graph from the last valid cache snapshot.
lastValidGraph, err := g.cacheGraphProvider.CacheToGraph(lastValidCacheSnapshot)
if err != nil {
return store.CacheStores{}, fmt.Errorf("failed to build cache graph: %w", err)
return store.CacheStores{}, diag, fmt.Errorf("failed to build cache graph: %w", err)
}

// Backfill the broken objects from the last valid cache snapshot.
var backdiag []diagnostics.FallbackDiagnostic
for _, brokenObject := range brokenObjects {
objectsToBackfill, err := lastValidGraph.SubgraphObjects(brokenObject)
if err != nil {
return store.CacheStores{}, fmt.Errorf("failed to find dependants for %s: %w", brokenObject, err)
return store.CacheStores{}, diag, fmt.Errorf("failed to find dependants for %s: %w", brokenObject, err)
}
var backdiag []diagnostics.FallbackDiagnostic

Check failure on line 99 in internal/dataplane/fallback/fallback.go

View workflow job for this annotation

GitHub Actions / linters / lint

S1021: should merge variable declaration with assignment on next line (gosimple)

for _, obj := range objectsToBackfill {
backdiag = make([]diagnostics.FallbackDiagnostic, len(objectsToBackfill))
for i, obj := range objectsToBackfill {
if err := fallbackCache.Add(obj); err != nil {
return store.CacheStores{}, fmt.Errorf("failed to add %s to the cache: %w", GetObjectHash(obj), err)
return store.CacheStores{}, diag, fmt.Errorf("failed to add %s to the cache: %w", GetObjectHash(obj), err)
}
g.logger.V(util.DebugLevel).Info("Backfilled object to fallback cache",
"object_kind", obj.GetObjectKind(),
"object_name", obj.GetName(),
"object_namespace", obj.GetNamespace(),
)
backdiag[i] = diagnostics.FallbackDiagnostic{
GroupKind: obj.GetObjectKind().GroupVersionKind().GroupKind().String(),
Namespace: obj.GetNamespace(),
Name: obj.GetName(),
ID: string(obj.GetUID()),
Status: "backfilled",
}
}
}
return fallbackCache, nil
return fallbackCache, backdiag, nil
}
20 changes: 10 additions & 10 deletions internal/dataplane/fallback/fallback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func TestGenerator_GenerateExcludingBrokenObjects(t *testing.T) {
g := fallback.NewGenerator(graphProvider, logr.Discard())

t.Run("ingressClass is broken", func(t *testing.T) {
fallbackCache, err := g.GenerateExcludingBrokenObjects(inputCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(ingressClass)})
fallbackCache, _, err := g.GenerateExcludingBrokenObjects(inputCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(ingressClass)})
require.NoError(t, err)
require.Equal(t, inputCacheStores, graphProvider.CacheToGraphLastCalledWith(), "expected the generator to call CacheToGraph with the input cache stores")
require.NotSame(t, inputCacheStores, fallbackCache)
Expand All @@ -102,7 +102,7 @@ func TestGenerator_GenerateExcludingBrokenObjects(t *testing.T) {
})

t.Run("service is broken", func(t *testing.T) {
fallbackCache, err := g.GenerateExcludingBrokenObjects(inputCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(service)})
fallbackCache, _, err := g.GenerateExcludingBrokenObjects(inputCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(service)})
require.NoError(t, err)
require.Equal(t, inputCacheStores, graphProvider.CacheToGraphLastCalledWith(), "expected the generator to call CacheToGraph with the input cache stores")
require.NotSame(t, inputCacheStores, fallbackCache)
Expand All @@ -113,7 +113,7 @@ func TestGenerator_GenerateExcludingBrokenObjects(t *testing.T) {
})

t.Run("serviceFacade is broken", func(t *testing.T) {
fallbackCache, err := g.GenerateExcludingBrokenObjects(inputCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(serviceFacade)})
fallbackCache, _, err := g.GenerateExcludingBrokenObjects(inputCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(serviceFacade)})
require.NoError(t, err)
require.Equal(t, inputCacheStores, graphProvider.CacheToGraphLastCalledWith(), "expected the generator to call CacheToGraph with the input cache stores")
require.NotSame(t, inputCacheStores, fallbackCache)
Expand All @@ -124,7 +124,7 @@ func TestGenerator_GenerateExcludingBrokenObjects(t *testing.T) {
})

t.Run("plugin is broken", func(t *testing.T) {
fallbackCache, err := g.GenerateExcludingBrokenObjects(inputCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(plugin)})
fallbackCache, _, err := g.GenerateExcludingBrokenObjects(inputCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(plugin)})
require.NoError(t, err)
require.Equal(t, inputCacheStores, graphProvider.CacheToGraphLastCalledWith(), "expected the generator to call CacheToGraph with the input cache stores")
require.NotSame(t, inputCacheStores, fallbackCache)
Expand All @@ -135,7 +135,7 @@ func TestGenerator_GenerateExcludingBrokenObjects(t *testing.T) {
})

t.Run("multiple objects are broken", func(t *testing.T) {
fallbackCache, err := g.GenerateExcludingBrokenObjects(inputCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(ingressClass), fallback.GetObjectHash(service)})
fallbackCache, _, err := g.GenerateExcludingBrokenObjects(inputCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(ingressClass), fallback.GetObjectHash(service)})
require.NoError(t, err)
require.Equal(t, inputCacheStores, graphProvider.CacheToGraphLastCalledWith(), "expected the generator to call CacheToGraph with the input cache stores")
require.NotSame(t, inputCacheStores, fallbackCache)
Expand Down Expand Up @@ -215,7 +215,7 @@ func TestGenerator_GenerateBackfillingBrokenObjects(t *testing.T) {
g := fallback.NewGenerator(graphProvider, logr.Discard())

t.Run("ingressClass is broken", func(t *testing.T) {
fallbackCache, err := g.GenerateBackfillingBrokenObjects(inputCacheStores, lastValidCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(ingressClass)})
fallbackCache, _, err := g.GenerateBackfillingBrokenObjects(inputCacheStores, lastValidCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(ingressClass)})
require.NoError(t, err)
require.Equal(t, []store.CacheStores{inputCacheStores, lastValidCacheStores}, graphProvider.CacheToGraphLastNCalledWith(2),
"expected the generator to call CacheToGraph with the input cache stores and the last valid cache stores")
Expand All @@ -237,7 +237,7 @@ func TestGenerator_GenerateBackfillingBrokenObjects(t *testing.T) {
})

t.Run("service is broken", func(t *testing.T) {
fallbackCache, err := g.GenerateBackfillingBrokenObjects(inputCacheStores, lastValidCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(service)})
fallbackCache, _, err := g.GenerateBackfillingBrokenObjects(inputCacheStores, lastValidCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(service)})
require.NoError(t, err)
require.Equal(t, []store.CacheStores{inputCacheStores, lastValidCacheStores}, graphProvider.CacheToGraphLastNCalledWith(2),
"expected the generator to call CacheToGraph with the input cache stores and fallback cache")
Expand All @@ -259,7 +259,7 @@ func TestGenerator_GenerateBackfillingBrokenObjects(t *testing.T) {
})

t.Run("serviceFacade is broken", func(t *testing.T) {
fallbackCache, err := g.GenerateBackfillingBrokenObjects(inputCacheStores, lastValidCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(serviceFacade)})
fallbackCache, _, err := g.GenerateBackfillingBrokenObjects(inputCacheStores, lastValidCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(serviceFacade)})
require.NoError(t, err)
require.Equal(t, []store.CacheStores{inputCacheStores, lastValidCacheStores}, graphProvider.CacheToGraphLastNCalledWith(2), "expected the generator to call CacheToGraph with the input cache stores")
require.NotSame(t, inputCacheStores, fallbackCache)
Expand All @@ -280,7 +280,7 @@ func TestGenerator_GenerateBackfillingBrokenObjects(t *testing.T) {
})

t.Run("plugin is broken", func(t *testing.T) {
fallbackCache, err := g.GenerateBackfillingBrokenObjects(inputCacheStores, lastValidCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(plugin)})
fallbackCache, _, err := g.GenerateBackfillingBrokenObjects(inputCacheStores, lastValidCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(plugin)})
require.NoError(t, err)
require.Equal(t, []store.CacheStores{inputCacheStores, lastValidCacheStores}, graphProvider.CacheToGraphLastNCalledWith(2), "expected the generator to call CacheToGraph with the input cache stores")
require.NotSame(t, inputCacheStores, fallbackCache)
Expand All @@ -303,7 +303,7 @@ func TestGenerator_GenerateBackfillingBrokenObjects(t *testing.T) {
})

t.Run("multiple objects are broken", func(t *testing.T) {
fallbackCache, err := g.GenerateBackfillingBrokenObjects(inputCacheStores, lastValidCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(ingressClass), fallback.GetObjectHash(service)})
fallbackCache, _, err := g.GenerateBackfillingBrokenObjects(inputCacheStores, lastValidCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(ingressClass), fallback.GetObjectHash(service)})
require.NoError(t, err)
require.Equal(t, []store.CacheStores{inputCacheStores, lastValidCacheStores}, graphProvider.CacheToGraphLastNCalledWith(2), "expected the generator to call CacheToGraph with the input cache stores")
require.NotSame(t, inputCacheStores, fallbackCache)
Expand Down
23 changes: 17 additions & 6 deletions internal/dataplane/kong_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,15 @@ type KongConfigBuilder interface {

// FallbackConfigGenerator generates a fallback configuration based on a cache snapshot and a set of broken objects.
type FallbackConfigGenerator interface {
// NOTE(review): this span is a rendered diff, so each method appears with two
// result lists; presumably the two-value form is the removed signature and the
// three-value form (adding []diagnostics.FallbackDiagnostic) is the new one.

// GenerateExcludingBrokenObjects takes a cache and a list of broken object hashes
// and returns a cache, per-object fallback diagnostics, and an error.
GenerateExcludingBrokenObjects(store.CacheStores, []fallback.ObjectHash) (store.CacheStores, error)
GenerateExcludingBrokenObjects(
store.CacheStores,
[]fallback.ObjectHash,
) (store.CacheStores, []diagnostics.FallbackDiagnostic, error)
// GenerateBackfillingBrokenObjects takes the current cache, the last valid cache
// and a list of broken object hashes, and returns a cache, per-object fallback
// diagnostics, and an error.
GenerateBackfillingBrokenObjects(
currentCache store.CacheStores,
lastValidCache store.CacheStores,
brokenObjects []fallback.ObjectHash,
) (store.CacheStores, error)
) (store.CacheStores, []diagnostics.FallbackDiagnostic, error)
}

// KongClient is a threadsafe high level API client for the Kong data-plane(s)
Expand Down Expand Up @@ -489,7 +492,12 @@ func (c *KongClient) Update(ctx context.Context) error {

// In case of a failure in syncing configuration with Gateways, propagate the error.
if gatewaysSyncErr != nil {
if recoveringErr := c.tryRecoveringFromGatewaysSyncError(ctx, cacheSnapshot, gatewaysSyncErr); recoveringErr != nil {
if recoveringErr := c.tryRecoveringFromGatewaysSyncError(
ctx,
cacheSnapshot,
gatewaysSyncErr,
shas[0],
); recoveringErr != nil {
return fmt.Errorf("failed to recover from gateways sync error: %w", recoveringErr)
}
// Update result is positive only if gateways were successfully synced with the current config, so we still
Expand Down Expand Up @@ -532,11 +540,12 @@ func (c *KongClient) tryRecoveringFromGatewaysSyncError(
ctx context.Context,
cacheSnapshot store.CacheStores,
gatewaysSyncErr error,
hash string,
) error {
// If configuration was rejected by the gateways and FallbackConfiguration is enabled,
// we should generate a fallback configuration and push it to the gateways.
if c.kongConfig.FallbackConfiguration {
recoveringErr := c.tryRecoveringWithFallbackConfiguration(ctx, cacheSnapshot, gatewaysSyncErr)
recoveringErr := c.tryRecoveringWithFallbackConfiguration(ctx, cacheSnapshot, gatewaysSyncErr, hash)
if recoveringErr == nil {
c.logger.Info("Successfully recovered from configuration rejection with fallback configuration")
return nil
Expand Down Expand Up @@ -567,6 +576,7 @@ func (c *KongClient) tryRecoveringWithFallbackConfiguration(
ctx context.Context,
currentCache store.CacheStores,
gatewaysSyncErr error,
hash string,
) error {
// Extract the broken objects from the update error and generate a fallback configuration excluding them.
brokenObjects, err := extractBrokenObjectsFromUpdateError(gatewaysSyncErr)
Expand All @@ -575,10 +585,11 @@ func (c *KongClient) tryRecoveringWithFallbackConfiguration(
}

// Generate a fallback cache snapshot.
fallbackCache, err := c.generateFallbackCache(currentCache, brokenObjects)
fallbackCache, fallbackDiag, err := c.generateFallbackCache(currentCache, brokenObjects)
if err != nil {
return fmt.Errorf("failed to generate fallback configuration: %w", err)
}
c.diagnostic.Fallbacks <- diagnostics.FallbackDiagnosticCollection{ConfigHash: hash, Objects: fallbackDiag}

// Update the KongConfigBuilder with the fallback configuration and build the KongConfig.
c.kongConfigBuilder.UpdateCache(fallbackCache)
Expand Down Expand Up @@ -617,7 +628,7 @@ func (c *KongClient) tryRecoveringWithFallbackConfiguration(
func (c *KongClient) generateFallbackCache(
currentCache store.CacheStores,
brokenObjects []fallback.ObjectHash,
) (s store.CacheStores, err error) {
) (s store.CacheStores, diag []diagnostics.FallbackDiagnostic, err error) {
start := time.Now()
defer func() {
c.prometheusMetrics.RecordFallbackCacheGenerationDuration(time.Since(start), err)
Expand Down
12 changes: 6 additions & 6 deletions internal/dataplane/kong_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ func (m mockConfigurationChangeDetector) HasConfigurationChanged(
// mockFallbackConfigGenerator is a test double for the fallback generator: its
// methods return GenerateResult and record what they were called with.
type mockFallbackConfigGenerator struct {
// GenerateResult is the cache returned by both Generate* methods.
GenerateResult store.CacheStores

// NOTE(review): rendered diff — the Tuple2 line is presumably the removed
// declaration and the Tuple3 line its replacement.
GenerateExcludingBrokenObjectsCalledWith lo.Tuple2[store.CacheStores, []fallback.ObjectHash]
GenerateExcludingBrokenObjectsCalledWith lo.Tuple3[store.CacheStores, []diagnostics.FallbackDiagnostic, []fallback.ObjectHash]
// GenerateBackfillingBrokenObjectsCalledWith holds the current cache, the last
// valid cache and the broken object hashes of the most recent call.
GenerateBackfillingBrokenObjectsCalledWith lo.Tuple3[store.CacheStores, store.CacheStores, []fallback.ObjectHash]
}

Expand All @@ -304,18 +304,18 @@ func newMockFallbackConfigGenerator() *mockFallbackConfigGenerator {
// GenerateExcludingBrokenObjects records its arguments in
// GenerateExcludingBrokenObjectsCalledWith and returns GenerateResult with a nil
// error.
// NOTE(review): rendered diff — the two-value signature/body lines are presumably
// the removed version. In the three-value version an empty diagnostics slice is
// stored in the "called with" tuple although it is not an input; confirm that is
// intended.
func (m *mockFallbackConfigGenerator) GenerateExcludingBrokenObjects(
stores store.CacheStores,
hashes []fallback.ObjectHash,
) (store.CacheStores, error) {
m.GenerateExcludingBrokenObjectsCalledWith = lo.T2(stores, hashes)
return m.GenerateResult, nil
) (store.CacheStores, []diagnostics.FallbackDiagnostic, error) {
m.GenerateExcludingBrokenObjectsCalledWith = lo.T3(stores, []diagnostics.FallbackDiagnostic{}, hashes)
return m.GenerateResult, []diagnostics.FallbackDiagnostic{}, nil
}

// GenerateBackfillingBrokenObjects records its three arguments in
// GenerateBackfillingBrokenObjectsCalledWith and returns GenerateResult with a
// nil error.
// NOTE(review): rendered diff — the two-value signature/return lines are
// presumably the removed version; the three-value version additionally returns
// an empty diagnostics slice.
func (m *mockFallbackConfigGenerator) GenerateBackfillingBrokenObjects(
currentStores store.CacheStores,
lastValidStores store.CacheStores,
brokenObjects []fallback.ObjectHash,
) (store.CacheStores, error) {
) (store.CacheStores, []diagnostics.FallbackDiagnostic, error) {
m.GenerateBackfillingBrokenObjectsCalledWith = lo.T3(currentStores, lastValidStores, brokenObjects)
return m.GenerateResult, nil
return m.GenerateResult, []diagnostics.FallbackDiagnostic{}, nil
}

func TestKongClientUpdate_AllExpectedClientsAreCalledAndErrorIsPropagated(t *testing.T) {
Expand Down
21 changes: 21 additions & 0 deletions internal/diagnostics/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,12 @@ type Server struct {
successfulConfigDump file.Content
failedConfigDump file.Content
problemObjects []AffectedObject
fallback FallbackDiagnosticCollection
failedHash string
successHash string
rawErrBody []byte
configLock *sync.RWMutex
fallbackLock *sync.RWMutex
}

// ServerConfig contains configuration for the diagnostics server.
Expand All @@ -66,6 +68,7 @@ func NewServer(logger logr.Logger, cfg ServerConfig) Server {
s.configDumps = ConfigDumpDiagnostic{
DumpsIncludeSensitive: cfg.DumpSensitiveConfig,
Configs: make(chan ConfigDump, diagnosticConfigBufferDepth),
Fallbacks: make(chan FallbackDiagnosticCollection, diagnosticConfigBufferDepth),
}
}

Expand Down Expand Up @@ -134,6 +137,10 @@ func (s *Server) receiveConfig(ctx context.Context) {
s.successHash = dump.Meta.Hash
}
s.configLock.Unlock()
case fallback := <-s.configDumps.Fallbacks:
s.fallbackLock.Lock()
s.fallback = fallback
s.fallbackLock.Unlock()
case <-ctx.Done():
if err := ctx.Err(); err != nil && !errors.Is(err, context.Canceled) {
s.logger.Error(err, "Shutting down diagnostic config collection: context completed with error")
Expand Down Expand Up @@ -165,6 +172,7 @@ func (s *Server) installDumpHandlers(mux *http.ServeMux) {
mux.HandleFunc("/debug/config/successful", s.handleLastValidConfig)
mux.HandleFunc("/debug/config/failed", s.handleLastFailedConfig)
mux.HandleFunc("/debug/config/problems", s.handleLastFailedProblemObjects)
mux.HandleFunc("/debug/config/fallback", s.handleLastFallback)
mux.HandleFunc("/debug/config/raw-error", s.handleLastErrBody)
}

Expand Down Expand Up @@ -214,6 +222,19 @@ func (s *Server) handleLastFailedProblemObjects(rw http.ResponseWriter, _ *http.
}
}

// handleLastFallback writes the most recently received fallback diagnostics as
// JSON: the config hash and the list of affected objects.
func (s *Server) handleLastFallback(rw http.ResponseWriter, _ *http.Request) {
	rw.Header().Set("Content-Type", "application/json")
	// s.fallback is written under fallbackLock by the config receiver, so it must
	// be read under the same lock; holding configLock here would not exclude that
	// writer.
	s.fallbackLock.RLock()
	defer s.fallbackLock.RUnlock()
	if err := json.NewEncoder(rw).Encode(
		fallbackResponse{
			ConfigHash:      s.fallback.ConfigHash,
			FallbackObjects: s.fallback.Objects,
		}); err != nil {
		// Report the failure instead of a success status. This only takes effect
		// if the encoder failed before anything was written to the response.
		rw.WriteHeader(http.StatusInternalServerError)
	}
}

func (s *Server) handleLastErrBody(rw http.ResponseWriter, _ *http.Request) {
rw.Header().Set("Content-Type", "text/plain")
s.configLock.RLock()
Expand Down
Loading

0 comments on commit e4bf51b

Please sign in to comment.