
Commit

fix: try recovering from gateway sync failure only on UpdateError
czeslavo committed Jun 24, 2024
1 parent 07fa4b1 commit 389d60a
Showing 5 changed files with 404 additions and 79 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -96,6 +96,9 @@ Adding a new version? You'll need three changes:
- Services using `Secret`s containing the same certificate as client certificates
by annotation `konghq.com/client-cert` can be correctly translated.
[#6228](https://github.com/Kong/kubernetes-ingress-controller/pull/6228)
- Do not attempt to recover from gateway synchronization errors with a fallback configuration
(either a generated one or the last valid one) when an unexpected error (e.g. a 5xx response or a network issue) occurs.
[#6237](https://github.com/Kong/kubernetes-ingress-controller/pull/6237)

## 3.2.0

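The gist of the changelog entry above: a gateway sync error only triggers fallback recovery when it is a configuration rejection, which is detected by its concrete error type. Below is a minimal, self-contained sketch of that gating pattern; `rejectionError` and `shouldAttemptRecovery` are hypothetical stand-ins for the controller's `sendconfig.UpdateError` and its handling, not the actual API.

```go
package main

import (
	"errors"
	"fmt"
)

// rejectionError is an illustrative stand-in for sendconfig.UpdateError,
// i.e. the error returned when a gateway rejects a configuration.
type rejectionError struct{ reason string }

func (e rejectionError) Error() string { return "configuration rejected: " + e.reason }

// shouldAttemptRecovery returns true only for configuration rejections;
// unexpected errors (5xx responses, network issues) are left to the retry loop.
func shouldAttemptRecovery(syncErr error) bool {
	var rejection rejectionError
	return errors.As(syncErr, &rejection)
}

func main() {
	fmt.Println(shouldAttemptRecovery(rejectionError{reason: "invalid route"})) // true
	fmt.Println(shouldAttemptRecovery(errors.New("502 bad gateway")))           // false
}
```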
79 changes: 38 additions & 41 deletions internal/dataplane/kong_client.go
@@ -180,9 +180,6 @@ type KongClient struct {
// While lastProcessedSnapshotHash keeps track of the last processed cache snapshot (the one kept in KongClient.cache),
// lastValidCacheSnapshot can also represent the fallback cache snapshot that was successfully synced with gateways.
lastValidCacheSnapshot *store.CacheStores

// brokenObjects is a list of the Kubernetes resources that failed to sync and triggered a fallback sync.
brokenObjects []fallback.ObjectHash
}

// NewKongClient provides a new KongClient object after connecting to the
@@ -453,6 +450,8 @@ func (c *KongClient) Update(ctx context.Context) error {
}
// Empty snapshot hash means that the cache hasn't changed since the last snapshot was taken. That optimization can be used
// in the main code path to avoid unnecessary processing. TODO: https://github.com/Kong/kubernetes-ingress-controller/issues/6095
// TODO: We should short-circuit here only if all the Gateways were successfully synced with the current configuration.
// https://github.com/Kong/kubernetes-ingress-controller/issues/6219
if newSnapshotHash == store.SnapshotHashEmpty {
c.prometheusMetrics.RecordProcessedConfigSnapshotCacheHit()
c.logger.V(util.DebugLevel).Info("No configuration change; pushing config to gateway is not necessary, skipping")
@@ -493,7 +492,7 @@ func (c *KongClient) Update(ctx context.Context) error {

// In case of a failure in syncing configuration with Gateways, propagate the error.
if gatewaysSyncErr != nil {
if recoveringErr := c.tryRecoveringFromGatewaysSyncError(
if recoveringErr := c.maybeTryRecoveringFromGatewaysSyncError(
ctx,
cacheSnapshot,
gatewaysSyncErr,
@@ -532,28 +531,46 @@ func (c *KongClient) maybePreserveTheLastValidConfigCache(lastValidCache store.C
}
}

// tryRecoveringFromGatewaysSyncError tries to recover from a configuration rejection by:
// 1. Generating a fallback configuration and pushing it to the gateways if FallbackConfiguration feature is enabled.
// 2. Applying the last valid configuration to the gateways if FallbackConfiguration is disabled or fallback
// configuration generation fails.
func (c *KongClient) tryRecoveringFromGatewaysSyncError(
// maybeTryRecoveringFromGatewaysSyncError tries to recover from a configuration rejection if the error is of the expected
// UpdateError type. Otherwise, we assume the error is non-recoverable by means of fallback configuration generation
// or applying the last valid configuration, and we need to rely on the retry mechanism. This can happen in the case of
// transient network issues, internal server errors, etc.
//
// The recovery can be handled in two ways:
// 1. Generating a fallback configuration and pushing it to the gateways if the FallbackConfiguration feature is enabled and
// the UpdateError contains at least 1 broken object.
// 2. Applying the last valid configuration to the gateways if FallbackConfiguration is disabled, fallback
// configuration generation fails, or the UpdateError does not contain any broken objects (this can happen when a gateway
// returns 400 with no meaningful entity errors).
func (c *KongClient) maybeTryRecoveringFromGatewaysSyncError(
ctx context.Context,
cacheSnapshot store.CacheStores,
gatewaysSyncErr error,
) error {
// If configuration was rejected by the gateways and FallbackConfiguration is enabled,
// we should generate a fallback configuration and push it to the gateways.
if c.kongConfig.FallbackConfiguration {
recoveringErr := c.tryRecoveringWithFallbackConfiguration(ctx, cacheSnapshot, gatewaysSyncErr)
// If the error is not of the expected UpdateError type, we should log it and skip the recovery.
updateErr := sendconfig.UpdateError{}
if !errors.As(gatewaysSyncErr, &updateErr) {
c.logger.V(util.DebugLevel).Info("Skipping recovery from gateways sync error - unexpected error occurred, relying on retry mechanism",
"error", gatewaysSyncErr,
)
return nil
}

// If the configuration was rejected by the gateways, we identified at least one broken object, and FallbackConfiguration
// is enabled, we should generate a fallback configuration and push it to the gateways.
brokenObjects := extractBrokenObjectsFromUpdateError(updateErr)
if c.kongConfig.FallbackConfiguration && len(brokenObjects) > 0 {
recoveringErr := c.tryRecoveringWithFallbackConfiguration(ctx, cacheSnapshot, brokenObjects)
if recoveringErr == nil {
c.logger.Info("Successfully recovered from configuration rejection with fallback configuration")
return nil
}
// If we failed to recover using the fallback configuration, we should log the error and carry on.
// If we failed to recover using the fallback configuration, we should log the error and carry on with the last
// valid configuration.
c.logger.Error(recoveringErr, "Failed to recover from configuration rejection with fallback configuration")
}

// If FallbackConfiguration is disabled, or we failed to recover using the fallback configuration, we should
// If FallbackConfiguration is disabled, or we skipped or failed to recover using the fallback configuration, we should
// apply the last valid configuration to the gateways.
if state, found := c.kongConfigFetcher.LastValidConfig(); found {
const isFallback = true
@@ -565,23 +582,13 @@ func (c *KongClient) tryRecoveringFromGatewaysSyncError(
return nil
}

func (c *KongClient) cacheBrokenObjectList(list []fallback.ObjectHash) {
c.brokenObjects = list
}

// tryRecoveringWithFallbackConfiguration tries to recover from a configuration rejection by generating a fallback
// configuration excluding affected objects from the cache.
func (c *KongClient) tryRecoveringWithFallbackConfiguration(
ctx context.Context,
currentCache store.CacheStores,
gatewaysSyncErr error,
brokenObjects []fallback.ObjectHash,
) error {
// Extract the broken objects from the update error and generate a fallback configuration excluding them.
brokenObjects, err := extractBrokenObjectsFromUpdateError(gatewaysSyncErr)
if err != nil {
return fmt.Errorf("failed to extract broken objects from update error: %w", err)
}

// Generate a fallback cache snapshot.
fallbackCache, generatedCacheMetadata, err := c.generateFallbackCache(currentCache, brokenObjects)
if err != nil {
@@ -607,8 +614,7 @@ func (c *KongClient) tryRecoveringWithFallbackConfiguration(
}

const isFallback = true
c.cacheBrokenObjectList(brokenObjects)
_, gatewaysSyncErr = c.sendOutToGatewayClients(ctx, fallbackParsingResult.KongState, c.kongConfig, isFallback)
_, gatewaysSyncErr := c.sendOutToGatewayClients(ctx, fallbackParsingResult.KongState, c.kongConfig, isFallback)
if gatewaysSyncErr != nil {
return fmt.Errorf("failed to sync fallback configuration with gateways: %w", gatewaysSyncErr)
}
@@ -647,24 +653,15 @@ func (c *KongClient) generateFallbackCache(
)
}

// extractBrokenObjectsFromUpdateError.
func extractBrokenObjectsFromUpdateError(err error) ([]fallback.ObjectHash, error) {
// extractBrokenObjectsFromUpdateError extracts broken objects from the UpdateError.
func extractBrokenObjectsFromUpdateError(err sendconfig.UpdateError) []fallback.ObjectHash {
var brokenObjects []client.Object

var updateErr sendconfig.UpdateError
if ok := errors.As(err, &updateErr); !ok {
return nil, fmt.Errorf("expected UpdateError, cannot extract broken objects from %T", err) //nolint:errorlint
}
for _, resourceFailure := range updateErr.ResourceFailures() {
for _, resourceFailure := range err.ResourceFailures() {
brokenObjects = append(brokenObjects, resourceFailure.CausingObjects()...)
}
if len(brokenObjects) == 0 {
return nil, fmt.Errorf("no broken objects found in UpdateError")
}

return lo.Map(brokenObjects, func(obj client.Object, _ int) fallback.ObjectHash {
return fallback.GetObjectHash(obj)
}), nil
})
}

// sendOutToGatewayClients will generate deck content (config) from the provided kong state
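To summarize the flow documented on `maybeTryRecoveringFromGatewaysSyncError` above: once a sync error has been identified as an `UpdateError`, the broken objects are extracted from its resource failures, a fallback configuration is generated when the feature is enabled and at least one broken object is known, and otherwise the last valid configuration (if any) is applied. The sketch below is a simplified, assumed model — `updateError`, `resourceFailure`, and `chooseRecovery` are made-up types and helpers, not the controller's real ones — and it omits the case where fallback generation itself fails and the code falls through to the last valid configuration.

```go
package main

import "fmt"

// resourceFailure and updateError are simplified stand-ins for
// failures.ResourceFailure and sendconfig.UpdateError from the diff above.
type resourceFailure struct {
	reason         string
	causingObjects []string
}

type updateError struct {
	resourceFailures []resourceFailure
}

func (e updateError) Error() string { return "configuration rejected by gateway" }

// extractBrokenObjects mirrors the idea of extractBrokenObjectsFromUpdateError:
// flatten the causing objects of every resource failure into a single list.
func extractBrokenObjects(err updateError) []string {
	var broken []string
	for _, rf := range err.resourceFailures {
		broken = append(broken, rf.causingObjects...)
	}
	return broken
}

// chooseRecovery sketches how a recovery path could be selected once a sync
// error has already been identified as a configuration rejection.
func chooseRecovery(updErr updateError, fallbackEnabled, hasLastValidConfig bool) string {
	if broken := extractBrokenObjects(updErr); fallbackEnabled && len(broken) > 0 {
		return fmt.Sprintf("generate fallback config excluding %v", broken)
	}
	// Reached when the feature is disabled or the rejection carries no
	// entity-level failures (e.g. a bare 400 response).
	if hasLastValidConfig {
		return "apply last valid config"
	}
	return "no recovery possible"
}

func main() {
	rejection := updateError{resourceFailures: []resourceFailure{
		{reason: "violated constraint", causingObjects: []string{"KongConsumer/broken"}},
	}}
	fmt.Println(chooseRecovery(rejection, true, true))      // fallback config path
	fmt.Println(chooseRecovery(updateError{}, true, true))  // last valid config path
	fmt.Println(chooseRecovery(updateError{}, true, false)) // no recovery
}
```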
213 changes: 186 additions & 27 deletions internal/dataplane/kong_client_test.go
@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"net/http"
"slices"
"strings"
"sync"
"testing"
@@ -1175,20 +1176,8 @@ func TestKongClient_FallbackConfiguration_SuccessfulRecovery(t *testing.T) {

// We'll use KongConsumer as an example of a broken object, but it could be any supported type
// for the purpose of this test as the fallback config generator is mocked anyway.
someConsumer := func(name string) *kongv1.KongConsumer {
return &kongv1.KongConsumer{
ObjectMeta: metav1.ObjectMeta{
Name: "name",
Namespace: "namespace",
Annotations: map[string]string{
annotations.IngressClassKey: annotations.DefaultIngressClass,
},
},
Username: name,
}
}
validConsumer := someConsumer("valid")
brokenConsumer := someConsumer("broken")
validConsumer := someConsumer(t, "valid")
brokenConsumer := someConsumer(t, "broken")
originalCache := cacheStoresFromObjs(t, validConsumer, brokenConsumer)
lastValidCache := cacheStoresFromObjs(t, validConsumer)

@@ -1415,19 +1404,7 @@ func TestKongClient_FallbackConfiguration_FailedRecovery(t *testing.T) {

// We'll use KongConsumer as an example of a broken object, but it could be any supported type
// for the purpose of this test as the fallback config generator is mocked anyway.
someConsumer := func(name string) *kongv1.KongConsumer {
return &kongv1.KongConsumer{
ObjectMeta: metav1.ObjectMeta{
Name: "name",
Namespace: "namespace",
Annotations: map[string]string{
annotations.IngressClassKey: annotations.DefaultIngressClass,
},
},
Username: name,
}
}
brokenConsumer := someConsumer("broken")
brokenConsumer := someConsumer(t, "broken")
originalCache := cacheStoresFromObjs(t, brokenConsumer)
kongClient, err := NewKongClient(
zapr.NewLogger(zap.NewNop()),
@@ -1654,3 +1631,185 @@ func TestKongClient_ConfigDumpSanitization(t *testing.T) {
})
}
}

func TestKongClient_RecoveringFromGatewaySyncError(t *testing.T) {
ctx := context.Background()
configChangeDetector := mockConfigurationChangeDetector{hasConfigurationChanged: true}
fallbackConfigGenerator := newMockFallbackConfigGenerator()
originalCache := cacheStoresFromObjs(t)

testCases := []struct {
name string
errorsFromGateways []error
hasLastValidConfig bool
expectRecoveryByGeneratingFallbackConfig bool
expectRecoveryByApplyingLastValidConfig bool
}{
{
name: "one of gateways returns UpdateError with entities",
errorsFromGateways: []error{
sendconfig.NewUpdateError(
[]failures.ResourceFailure{
lo.Must(failures.NewResourceFailure("violated constraint", someConsumer(t, "broken"))),
},
errors.New("error on update"),
),
nil,
},
expectRecoveryByGeneratingFallbackConfig: true,
},
{
name: "one of gateways returns UpdateError without entities, has last valid config",
errorsFromGateways: []error{
sendconfig.NewUpdateError(nil, errors.New("error on update")),
nil,
},
hasLastValidConfig: true,
expectRecoveryByApplyingLastValidConfig: true,
},
{
name: "one of gateways returns UpdateError without entities, no last valid config",
errorsFromGateways: []error{
sendconfig.NewUpdateError(nil, errors.New("error on update")),
nil,
},
hasLastValidConfig: false,
expectRecoveryByGeneratingFallbackConfig: false,
expectRecoveryByApplyingLastValidConfig: false,
},
{
name: "one of gateways returns unexpected error",
errorsFromGateways: []error{
errors.New("unexpected error on update"),
nil,
},
hasLastValidConfig: true,
expectRecoveryByGeneratingFallbackConfig: false,
expectRecoveryByApplyingLastValidConfig: false,
},
{
name: "one gateway returns UpdateError, another one an unexpected error",
errorsFromGateways: []error{
sendconfig.NewUpdateError(
[]failures.ResourceFailure{
lo.Must(failures.NewResourceFailure("violated constraint", someConsumer(t, "broken"))),
},
errors.New("error on update"),
),
errors.New("unexpected error on update"),
nil,
},
hasLastValidConfig: true,
expectRecoveryByGeneratingFallbackConfig: true,
expectRecoveryByApplyingLastValidConfig: false,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
t.Logf("Preparing %d gateway clients", len(tc.errorsFromGateways))
updateStrategyResolver := newMockUpdateStrategyResolver(t)
gwClients := make([]*adminapi.Client, len(tc.errorsFromGateways))
for i := range gwClients {
gwClients[i] = mustSampleGatewayClient(t)
updateStrategyResolver.returnSpecificErrorOnUpdate(gwClients[i].BaseRootURL(), tc.errorsFromGateways[i])
}
clientsProvider := mockGatewayClientsProvider{
gatewayClients: gwClients,
}

lastValidConfigFetcher := &mockKongLastValidConfigFetcher{}
if tc.hasLastValidConfig {
t.Logf("Setting last valid config to contain a consumer with username 'last-valid'")
lastValidConfigFetcher.lastKongState = &kongstate.KongState{
Consumers: []kongstate.Consumer{
{
Consumer: kong.Consumer{
Username: lo.ToPtr("last-valid"),
},
},
},
}
}

t.Logf("Preparing config builder with a consumer with username 'fallback'")
configBuilder := newMockKongConfigBuilder()
configBuilder.kongState = &kongstate.KongState{
Consumers: []kongstate.Consumer{
{
Consumer: kong.Consumer{
Username: lo.ToPtr("fallback"),
},
},
},
}

kongClient, err := NewKongClient(
zapr.NewLogger(zap.NewNop()),
time.Second,
diagnostics.ConfigDumpDiagnostic{},
sendconfig.Config{
FallbackConfiguration: true,
},
mocks.NewEventRecorder(),
dpconf.DBModeOff,
clientsProvider,
updateStrategyResolver,
configChangeDetector,
lastValidConfigFetcher,
configBuilder,
originalCache,
fallbackConfigGenerator,
)
require.NoError(t, err)

err = kongClient.Update(ctx)
require.Error(t, err)

expectedUpdatedURLs := lo.Map(gwClients, func(c *adminapi.Client, _ int) string {
return c.BaseRootURL()
})
if tc.expectRecoveryByGeneratingFallbackConfig || tc.expectRecoveryByApplyingLastValidConfig {
// In case of any recovery method, we expect the update to be called twice for each gateway.
expectedUpdatedURLs = slices.Concat(expectedUpdatedURLs, expectedUpdatedURLs)
}
t.Logf("Ensuring that the update strategy was called %d times", len(expectedUpdatedURLs))
updateStrategyResolver.assertUpdateCalledForURLs(expectedUpdatedURLs)

expectedContent := func(consumerUsername string) *file.Content {
return &file.Content{
FormatVersion: "3.0",
Consumers: []file.FConsumer{
{
Consumer: kong.Consumer{
Username: lo.ToPtr(consumerUsername),
},
},
},
}
}
receivedContent, ok := updateStrategyResolver.lastUpdatedContentForURL(expectedUpdatedURLs[0])
require.True(t, ok)
if tc.expectRecoveryByApplyingLastValidConfig {
t.Log("Verifying that the last valid config was applied")
require.Equal(t, expectedContent("last-valid"), receivedContent.Content)
}
if tc.expectRecoveryByGeneratingFallbackConfig {
t.Log("Verifying that the fallback config was generated and applied")
require.Equal(t, expectedContent("fallback"), receivedContent.Content)
}
})
}
}

func someConsumer(t *testing.T, name string) *kongv1.KongConsumer {
return helpers.WithTypeMeta(t, &kongv1.KongConsumer{
ObjectMeta: metav1.ObjectMeta{
Name: "name",
Namespace: "namespace",
Annotations: map[string]string{
annotations.IngressClassKey: annotations.DefaultIngressClass,
},
},
Username: name,
})
}
