Skip to content

Commit

Permalink
fix: try recovering from gateway sync failure only on UpdateError (#6237
Browse files Browse the repository at this point in the history
) (#6260)

(cherry picked from commit 420b44c)
  • Loading branch information
czeslavo authored Jun 27, 2024
1 parent d05b52d commit 9553a95
Show file tree
Hide file tree
Showing 9 changed files with 494 additions and 189 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ Adding a new version? You'll need three changes:
### Fixed

- Do not try recovering from gateways synchronization errors with fallback configuration
(either generated or the last valid one) when an unexpected error (e.g. 5xx or network issue) occurs.
[#6237](https://github.com/Kong/kubernetes-ingress-controller/pull/6237)
- Admission webhook will accept multiple plugins of the same type associated with a single route-like,
Service, KongConsumer, KongConsumerGroup object to allow plugins to be associated with combinations
of those objects.
Expand Down
22 changes: 11 additions & 11 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ require (
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
github.com/jpillora/backoff v1.0.0
github.com/kong/go-database-reconciler v1.12.1
github.com/kong/go-kong v0.55.0
github.com/kong/kubernetes-telemetry v0.1.3
github.com/kong/kubernetes-testing-framework v0.47.0
github.com/kong/go-database-reconciler v1.12.2
github.com/kong/go-kong v0.56.0
github.com/kong/kubernetes-telemetry v0.1.4
github.com/kong/kubernetes-testing-framework v0.47.1
github.com/lithammer/dedent v1.1.0
github.com/mitchellh/mapstructure v1.5.0
github.com/moul/pb v0.0.0-20220425114252-bca18df4138c
Expand All @@ -50,10 +50,10 @@ require (
github.com/testcontainers/testcontainers-go v0.31.0
go.uber.org/zap v1.27.0
google.golang.org/api v0.183.0
k8s.io/api v0.30.1
k8s.io/api v0.30.2
k8s.io/apiextensions-apiserver v0.30.1
k8s.io/apimachinery v0.30.1
k8s.io/client-go v0.30.1
k8s.io/apimachinery v0.30.2
k8s.io/client-go v0.30.2
k8s.io/component-base v0.30.1
sigs.k8s.io/controller-runtime v0.18.4
sigs.k8s.io/e2e-framework v0.4.0
Expand Down Expand Up @@ -174,7 +174,7 @@ require (
github.com/prometheus/procfs v0.14.0 // indirect
github.com/puzpuzpuz/xsync/v2 v2.5.1 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/shirou/gopsutil/v3 v3.24.4 // indirect
github.com/shirou/gopsutil/v3 v3.24.5 // indirect
github.com/shoenig/go-m1cpu v0.1.6 // indirect
github.com/shopspring/decimal v1.2.0 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
Expand Down Expand Up @@ -207,8 +207,8 @@ require (
golang.org/x/net v0.25.0 // indirect
golang.org/x/oauth2 v0.21.0 // indirect
golang.org/x/sync v0.7.0
golang.org/x/sys v0.20.0 // indirect
golang.org/x/term v0.20.0 // indirect
golang.org/x/sys v0.21.0 // indirect
golang.org/x/term v0.21.0 // indirect
golang.org/x/text v0.15.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.21.0 // indirect
Expand All @@ -227,6 +227,6 @@ require (
k8s.io/kubectl v0.30.1
k8s.io/utils v0.0.0-20240502163921-fe8a2dddb1d0 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/kind v0.22.0 // indirect
sigs.k8s.io/kind v0.23.0 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
)
66 changes: 23 additions & 43 deletions go.sum

Large diffs are not rendered by default.

78 changes: 37 additions & 41 deletions internal/dataplane/kong_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,6 @@ type KongClient struct {
// While lastProcessedSnapshotHash keeps track of the last processed cache snapshot (the one kept in KongClient.cache),
// lastValidCacheSnapshot can also represent the fallback cache snapshot that was successfully synced with gateways.
lastValidCacheSnapshot *store.CacheStores

// brokenObjects is a list of the Kubernetes resources that failed to sync and triggered a fallback sync.
brokenObjects []fallback.ObjectHash
}

// NewKongClient provides a new KongClient object after connecting to the
Expand Down Expand Up @@ -453,6 +450,8 @@ func (c *KongClient) Update(ctx context.Context) error {
}
// Empty snapshot hash means that the cache hasn't changed since the last snapshot was taken. That optimization can be used
// in main code path to avoid unnecessary processing. TODO: https://github.com/Kong/kubernetes-ingress-controller/issues/6095
// TODO: We should short-circuit here only if all the Gateways were successfully synced with the current configuration.
// https://github.com/Kong/kubernetes-ingress-controller/issues/6219
if newSnapshotHash == store.SnapshotHashEmpty {
c.prometheusMetrics.RecordProcessedConfigSnapshotCacheHit()
c.logger.V(util.DebugLevel).Info("No configuration change; pushing config to gateway is not necessary, skipping")
Expand Down Expand Up @@ -493,7 +492,7 @@ func (c *KongClient) Update(ctx context.Context) error {

// In case of a failure in syncing configuration with Gateways, propagate the error.
if gatewaysSyncErr != nil {
if recoveringErr := c.tryRecoveringFromGatewaysSyncError(
if recoveringErr := c.maybeTryRecoveringFromGatewaysSyncError(
ctx,
cacheSnapshot,
gatewaysSyncErr,
Expand Down Expand Up @@ -532,28 +531,45 @@ func (c *KongClient) maybePreserveTheLastValidConfigCache(lastValidCache store.C
}
}

// tryRecoveringFromGatewaysSyncError tries to recover from a configuration rejection by:
// 1. Generating a fallback configuration and pushing it to the gateways if FallbackConfiguration feature is enabled.
// 2. Applying the last valid configuration to the gateways if FallbackConfiguration is disabled or fallback
// configuration generation fails.
func (c *KongClient) tryRecoveringFromGatewaysSyncError(
// maybeTryRecoveringFromGatewaysSyncError tries to recover from a configuration rejection if the error is of the expected
// UpdateError type. Otherwise, we assume the error is non-recoverable by means of fallback configuration generation
// or applying the last valid configuration, and we need to rely on the retry mechanism. It can happen in case of
// transient network issues, internal server errors, etc.
//
// The recovery can be handled in two ways:
// 1. Generating a fallback configuration and pushing it to the gateways if FallbackConfiguration feature is enabled and
// the UpdateError contains at least 1 broken object.
// 2. Applying the last valid configuration to the gateways if FallbackConfiguration is disabled, fallback
// configuration generation fails, or the UpdateError does not contain any broken objects (it can happen if a gateway
// returns 400 with no meaningful entities' errors).
func (c *KongClient) maybeTryRecoveringFromGatewaysSyncError(
ctx context.Context,
cacheSnapshot store.CacheStores,
gatewaysSyncErr error,
) error {
// If configuration was rejected by the gateways and FallbackConfiguration is enabled,
// we should generate a fallback configuration and push it to the gateways.
if c.kongConfig.FallbackConfiguration {
recoveringErr := c.tryRecoveringWithFallbackConfiguration(ctx, cacheSnapshot, gatewaysSyncErr)
// If the error is not of the expected UpdateError type, we should log it and skip the recovery.
updateErr := sendconfig.UpdateError{}
if !errors.As(gatewaysSyncErr, &updateErr) {
c.logger.V(util.DebugLevel).Info("Skipping recovery from gateways sync error - not enough details to recover",
"error", gatewaysSyncErr)
return nil
}

// If configuration was rejected by the gateways, we identified at least one broken object and FallbackConfiguration
// is enabled, we should generate a fallback configuration and push it to the gateways.
brokenObjects := extractBrokenObjectsFromUpdateError(updateErr)
if c.kongConfig.FallbackConfiguration && len(brokenObjects) > 0 {
recoveringErr := c.tryRecoveringWithFallbackConfiguration(ctx, cacheSnapshot, brokenObjects)
if recoveringErr == nil {
c.logger.Info("Successfully recovered from configuration rejection with fallback configuration")
return nil
}
// If we failed to recover using the fallback configuration, we should log the error and carry on.
// If we failed to recover using the fallback configuration, we should log the error and carry on with the last
// valid configuration.
c.logger.Error(recoveringErr, "Failed to recover from configuration rejection with fallback configuration")
}

// If FallbackConfiguration is disabled, or we failed to recover using the fallback configuration, we should
// If FallbackConfiguration is disabled, we skipped or failed to recover using the fallback configuration, we should
// apply the last valid configuration to the gateways.
if state, found := c.kongConfigFetcher.LastValidConfig(); found {
const isFallback = true
Expand All @@ -565,23 +581,13 @@ func (c *KongClient) tryRecoveringFromGatewaysSyncError(
return nil
}

func (c *KongClient) cacheBrokenObjectList(list []fallback.ObjectHash) {
c.brokenObjects = list
}

// tryRecoveringWithFallbackConfiguration tries to recover from a configuration rejection by generating a fallback
// configuration excluding affected objects from the cache.
func (c *KongClient) tryRecoveringWithFallbackConfiguration(
ctx context.Context,
currentCache store.CacheStores,
gatewaysSyncErr error,
brokenObjects []fallback.ObjectHash,
) error {
// Extract the broken objects from the update error and generate a fallback configuration excluding them.
brokenObjects, err := extractBrokenObjectsFromUpdateError(gatewaysSyncErr)
if err != nil {
return fmt.Errorf("failed to extract broken objects from update error: %w", err)
}

// Generate a fallback cache snapshot.
fallbackCache, generatedCacheMetadata, err := c.generateFallbackCache(currentCache, brokenObjects)
if err != nil {
Expand All @@ -607,8 +613,7 @@ func (c *KongClient) tryRecoveringWithFallbackConfiguration(
}

const isFallback = true
c.cacheBrokenObjectList(brokenObjects)
_, gatewaysSyncErr = c.sendOutToGatewayClients(ctx, fallbackParsingResult.KongState, c.kongConfig, isFallback)
_, gatewaysSyncErr := c.sendOutToGatewayClients(ctx, fallbackParsingResult.KongState, c.kongConfig, isFallback)
if gatewaysSyncErr != nil {
return fmt.Errorf("failed to sync fallback configuration with gateways: %w", gatewaysSyncErr)
}
Expand Down Expand Up @@ -647,24 +652,15 @@ func (c *KongClient) generateFallbackCache(
)
}

// extractBrokenObjectsFromUpdateError.
func extractBrokenObjectsFromUpdateError(err error) ([]fallback.ObjectHash, error) {
// extractBrokenObjectsFromUpdateError extracts broken objects from the UpdateError.
func extractBrokenObjectsFromUpdateError(err sendconfig.UpdateError) []fallback.ObjectHash {
var brokenObjects []client.Object

var updateErr sendconfig.UpdateError
if ok := errors.As(err, &updateErr); !ok {
return nil, fmt.Errorf("expected UpdateError, cannot extract broken objects from %T", err) //nolint:errorlint
}
for _, resourceFailure := range updateErr.ResourceFailures() {
for _, resourceFailure := range err.ResourceFailures() {
brokenObjects = append(brokenObjects, resourceFailure.CausingObjects()...)
}
if len(brokenObjects) == 0 {
return nil, fmt.Errorf("no broken objects found in UpdateError")
}

return lo.Map(brokenObjects, func(obj client.Object, _ int) fallback.ObjectHash {
return fallback.GetObjectHash(obj)
}), nil
})
}

// sendOutToGatewayClients will generate deck content (config) from the provided kong state
Expand Down
Loading

0 comments on commit 9553a95

Please sign in to comment.