diff --git a/CHANGELOG.md b/CHANGELOG.md index c4cb761fc6c..afdb5ab91b8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -96,6 +96,9 @@ Adding a new version? You'll need three changes: - Services using `Secret`s containing the same certificate as client certificates by annotation `konghq.com/client-cert` can be correctly translated. [#6228](https://github.com/Kong/kubernetes-ingress-controller/pull/6228) +- Do not try recovering from gateways synchronization errors with fallback configuration + (either generated or the last valid one) when an unexpected error (e.g. 5xx or network issue) occurs. + [#6237](https://github.com/Kong/kubernetes-ingress-controller/pull/6237) ## 3.2.0 diff --git a/internal/dataplane/kong_client.go b/internal/dataplane/kong_client.go index f8f68a74651..56d704a54ce 100644 --- a/internal/dataplane/kong_client.go +++ b/internal/dataplane/kong_client.go @@ -180,9 +180,6 @@ type KongClient struct { // While lastProcessedSnapshotHash keeps track of the last processed cache snapshot (the one kept in KongClient.cache), // lastValidCacheSnapshot can also represent the fallback cache snapshot that was successfully synced with gateways. lastValidCacheSnapshot *store.CacheStores - - // brokenObjects is a list of the Kubernetes resources that failed to sync and triggered a fallback sync. - brokenObjects []fallback.ObjectHash } // NewKongClient provides a new KongClient object after connecting to the @@ -453,6 +450,8 @@ func (c *KongClient) Update(ctx context.Context) error { } // Empty snapshot hash means that the cache hasn't changed since the last snapshot was taken. That optimization can be used // in main code path to avoid unnecessary processing. TODO: https://github.com/Kong/kubernetes-ingress-controller/issues/6095 + // TODO: We should short-circuit here only if all the Gateways were successfully synced with the current configuration. + // https://github.com/Kong/kubernetes-ingress-controller/issues/6219 if newSnapshotHash == store.SnapshotHashEmpty { c.prometheusMetrics.RecordProcessedConfigSnapshotCacheHit() c.logger.V(util.DebugLevel).Info("No configuration change; pushing config to gateway is not necessary, skipping") @@ -493,7 +492,7 @@ func (c *KongClient) Update(ctx context.Context) error { // In case of a failure in syncing configuration with Gateways, propagate the error. if gatewaysSyncErr != nil { - if recoveringErr := c.tryRecoveringFromGatewaysSyncError( + if recoveringErr := c.maybeTryRecoveringFromGatewaysSyncError( ctx, cacheSnapshot, gatewaysSyncErr, @@ -532,28 +531,46 @@ func (c *KongClient) maybePreserveTheLastValidConfigCache(lastValidCache store.C } } -// tryRecoveringFromGatewaysSyncError tries to recover from a configuration rejection by: -// 1. Generating a fallback configuration and pushing it to the gateways if FallbackConfiguration feature is enabled. -// 2. Applying the last valid configuration to the gateways if FallbackConfiguration is disabled or fallback -// configuration generation fails. -func (c *KongClient) tryRecoveringFromGatewaysSyncError( +// maybeTryRecoveringFromGatewaysSyncError tries to recover from a configuration rejection if the error is of the expected +// UpdateError type. Otherwise, we assume the error is non-recoverable by means of fallback configuration generation +// or applying the last valid configuration, and we need to rely on the retry mechanism. It can happen in case of +// transient network issues, internal server errors, etc. +// +// The recovery can be handled in two ways: +// 1. 
Generating a fallback configuration and pushing it to the gateways if FallbackConfiguration feature is enabled and +// the UpdateError contains at least 1 broken object. +// 2. Applying the last valid configuration to the gateways if FallbackConfiguration is disabled, fallback +// configuration generation fails, or the UpdateError does not contain any broken objects (it can happen if a gateway +// returns 400 with no meaningful entities' errors). +func (c *KongClient) maybeTryRecoveringFromGatewaysSyncError( ctx context.Context, cacheSnapshot store.CacheStores, gatewaysSyncErr error, ) error { - // If configuration was rejected by the gateways and FallbackConfiguration is enabled, - // we should generate a fallback configuration and push it to the gateways. - if c.kongConfig.FallbackConfiguration { - recoveringErr := c.tryRecoveringWithFallbackConfiguration(ctx, cacheSnapshot, gatewaysSyncErr) + // If the error is not of the expected UpdateError type, we should log it and skip the recovery. + updateErr := sendconfig.UpdateError{} + if !errors.As(gatewaysSyncErr, &updateErr) { + c.logger.V(util.DebugLevel).Info("Skipping recovery from gateways sync error - unexpected error occurred, relying on retry mechanism", + "error", gatewaysSyncErr, + ) + return nil + } + + // If configuration was rejected by the gateways, we identified at least one broken object and FallbackConfiguration + // is enabled, we should generate a fallback configuration and push it to the gateways. + brokenObjects := extractBrokenObjectsFromUpdateError(updateErr) + if c.kongConfig.FallbackConfiguration && len(brokenObjects) > 0 { + recoveringErr := c.tryRecoveringWithFallbackConfiguration(ctx, cacheSnapshot, brokenObjects) if recoveringErr == nil { c.logger.Info("Successfully recovered from configuration rejection with fallback configuration") return nil } - // If we failed to recover using the fallback configuration, we should log the error and carry on. + // If we failed to recover using the fallback configuration, we should log the error and carry on with the last + // valid configuration. c.logger.Error(recoveringErr, "Failed to recover from configuration rejection with fallback configuration") } - // If FallbackConfiguration is disabled, or we failed to recover using the fallback configuration, we should + // If FallbackConfiguration is disabled, we skipped or failed to recover using the fallback configuration, we should // apply the last valid configuration to the gateways. if state, found := c.kongConfigFetcher.LastValidConfig(); found { const isFallback = true @@ -565,23 +582,13 @@ func (c *KongClient) tryRecoveringFromGatewaysSyncError( return nil } -func (c *KongClient) cacheBrokenObjectList(list []fallback.ObjectHash) { - c.brokenObjects = list -} - // tryRecoveringWithFallbackConfiguration tries to recover from a configuration rejection by generating a fallback // configuration excluding affected objects from the cache. func (c *KongClient) tryRecoveringWithFallbackConfiguration( ctx context.Context, currentCache store.CacheStores, - gatewaysSyncErr error, + brokenObjects []fallback.ObjectHash, ) error { - // Extract the broken objects from the update error and generate a fallback configuration excluding them. - brokenObjects, err := extractBrokenObjectsFromUpdateError(gatewaysSyncErr) - if err != nil { - return fmt.Errorf("failed to extract broken objects from update error: %w", err) - } - // Generate a fallback cache snapshot. 
fallbackCache, generatedCacheMetadata, err := c.generateFallbackCache(currentCache, brokenObjects) if err != nil { @@ -607,8 +614,7 @@ func (c *KongClient) tryRecoveringWithFallbackConfiguration( } const isFallback = true - c.cacheBrokenObjectList(brokenObjects) - _, gatewaysSyncErr = c.sendOutToGatewayClients(ctx, fallbackParsingResult.KongState, c.kongConfig, isFallback) + _, gatewaysSyncErr := c.sendOutToGatewayClients(ctx, fallbackParsingResult.KongState, c.kongConfig, isFallback) if gatewaysSyncErr != nil { return fmt.Errorf("failed to sync fallback configuration with gateways: %w", gatewaysSyncErr) } @@ -647,24 +653,15 @@ func (c *KongClient) generateFallbackCache( ) } -// extractBrokenObjectsFromUpdateError. -func extractBrokenObjectsFromUpdateError(err error) ([]fallback.ObjectHash, error) { +// extractBrokenObjectsFromUpdateError extracts broken objects from the UpdateError. +func extractBrokenObjectsFromUpdateError(err sendconfig.UpdateError) []fallback.ObjectHash { var brokenObjects []client.Object - - var updateErr sendconfig.UpdateError - if ok := errors.As(err, &updateErr); !ok { - return nil, fmt.Errorf("expected UpdateError, cannot extract broken objects from %T", err) //nolint:errorlint - } - for _, resourceFailure := range updateErr.ResourceFailures() { + for _, resourceFailure := range err.ResourceFailures() { brokenObjects = append(brokenObjects, resourceFailure.CausingObjects()...) } - if len(brokenObjects) == 0 { - return nil, fmt.Errorf("no broken objects found in UpdateError") - } - return lo.Map(brokenObjects, func(obj client.Object, _ int) fallback.ObjectHash { return fallback.GetObjectHash(obj) - }), nil + }) } // sendOutToGatewayClients will generate deck content (config) from the provided kong state diff --git a/internal/dataplane/kong_client_test.go b/internal/dataplane/kong_client_test.go index 5fbfe954f77..0d6a2cb2e81 100644 --- a/internal/dataplane/kong_client_test.go +++ b/internal/dataplane/kong_client_test.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "net/http" + "slices" "strings" "sync" "testing" @@ -1175,20 +1176,8 @@ func TestKongClient_FallbackConfiguration_SuccessfulRecovery(t *testing.T) { // We'll use KongConsumer as an example of a broken object, but it could be any supported type // for the purpose of this test as the fallback config generator is mocked anyway. - someConsumer := func(name string) *kongv1.KongConsumer { - return &kongv1.KongConsumer{ - ObjectMeta: metav1.ObjectMeta{ - Name: "name", - Namespace: "namespace", - Annotations: map[string]string{ - annotations.IngressClassKey: annotations.DefaultIngressClass, - }, - }, - Username: name, - } - } - validConsumer := someConsumer("valid") - brokenConsumer := someConsumer("broken") + validConsumer := someConsumer(t, "valid") + brokenConsumer := someConsumer(t, "broken") originalCache := cacheStoresFromObjs(t, validConsumer, brokenConsumer) lastValidCache := cacheStoresFromObjs(t, validConsumer) @@ -1415,19 +1404,7 @@ func TestKongClient_FallbackConfiguration_FailedRecovery(t *testing.T) { // We'll use KongConsumer as an example of a broken object, but it could be any supported type // for the purpose of this test as the fallback config generator is mocked anyway. 
- someConsumer := func(name string) *kongv1.KongConsumer { - return &kongv1.KongConsumer{ - ObjectMeta: metav1.ObjectMeta{ - Name: "name", - Namespace: "namespace", - Annotations: map[string]string{ - annotations.IngressClassKey: annotations.DefaultIngressClass, - }, - }, - Username: name, - } - } - brokenConsumer := someConsumer("broken") + brokenConsumer := someConsumer(t, "broken") originalCache := cacheStoresFromObjs(t, brokenConsumer) kongClient, err := NewKongClient( zapr.NewLogger(zap.NewNop()), @@ -1654,3 +1631,185 @@ func TestKongClient_ConfigDumpSanitization(t *testing.T) { }) } } + +func TestKongClient_RecoveringFromGatewaySyncError(t *testing.T) { + ctx := context.Background() + configChangeDetector := mockConfigurationChangeDetector{hasConfigurationChanged: true} + fallbackConfigGenerator := newMockFallbackConfigGenerator() + originalCache := cacheStoresFromObjs(t) + + testCases := []struct { + name string + errorsFromGateways []error + hasLastValidConfig bool + expectRecoveryByGeneratingFallbackConfig bool + expectRecoveryByApplyingLastValidConfig bool + }{ + { + name: "one of gateways returns UpdateError with entities", + errorsFromGateways: []error{ + sendconfig.NewUpdateError( + []failures.ResourceFailure{ + lo.Must(failures.NewResourceFailure("violated constraint", someConsumer(t, "broken"))), + }, + errors.New("error on update"), + ), + nil, + }, + expectRecoveryByGeneratingFallbackConfig: true, + }, + { + name: "one of gateways returns UpdateError without entities, has last valid config", + errorsFromGateways: []error{ + sendconfig.NewUpdateError(nil, errors.New("error on update")), + nil, + }, + hasLastValidConfig: true, + expectRecoveryByApplyingLastValidConfig: true, + }, + { + name: "one of gateways returns UpdateError without entities, no last valid config", + errorsFromGateways: []error{ + sendconfig.NewUpdateError(nil, errors.New("error on update")), + nil, + }, + hasLastValidConfig: false, + expectRecoveryByGeneratingFallbackConfig: false, + expectRecoveryByApplyingLastValidConfig: false, + }, + { + name: "one of gateways returns unexpected error", + errorsFromGateways: []error{ + errors.New("unexpected error on update"), + nil, + }, + hasLastValidConfig: true, + expectRecoveryByGeneratingFallbackConfig: false, + expectRecoveryByApplyingLastValidConfig: false, + }, + { + name: "one gateway returns UpdateError, another one an unexpected error", + errorsFromGateways: []error{ + sendconfig.NewUpdateError( + []failures.ResourceFailure{ + lo.Must(failures.NewResourceFailure("violated constraint", someConsumer(t, "broken"))), + }, + errors.New("error on update"), + ), + errors.New("unexpected error on update"), + nil, + }, + hasLastValidConfig: true, + expectRecoveryByGeneratingFallbackConfig: true, + expectRecoveryByApplyingLastValidConfig: false, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + t.Logf("Preparing %d gateway clients", len(tc.errorsFromGateways)) + updateStrategyResolver := newMockUpdateStrategyResolver(t) + gwClients := make([]*adminapi.Client, len(tc.errorsFromGateways)) + for i := range gwClients { + gwClients[i] = mustSampleGatewayClient(t) + updateStrategyResolver.returnSpecificErrorOnUpdate(gwClients[i].BaseRootURL(), tc.errorsFromGateways[i]) + } + clientsProvider := mockGatewayClientsProvider{ + gatewayClients: gwClients, + } + + lastValidConfigFetcher := &mockKongLastValidConfigFetcher{} + if tc.hasLastValidConfig { + t.Logf("Setting last valid config to contain a consumer with username 'last-valid'") + 
lastValidConfigFetcher.lastKongState = &kongstate.KongState{ + Consumers: []kongstate.Consumer{ + { + Consumer: kong.Consumer{ + Username: lo.ToPtr("last-valid"), + }, + }, + }, + } + } + + t.Logf("Preparing config builder with a consumer with username 'fallback'") + configBuilder := newMockKongConfigBuilder() + configBuilder.kongState = &kongstate.KongState{ + Consumers: []kongstate.Consumer{ + { + Consumer: kong.Consumer{ + Username: lo.ToPtr("fallback"), + }, + }, + }, + } + + kongClient, err := NewKongClient( + zapr.NewLogger(zap.NewNop()), + time.Second, + diagnostics.ConfigDumpDiagnostic{}, + sendconfig.Config{ + FallbackConfiguration: true, + }, + mocks.NewEventRecorder(), + dpconf.DBModeOff, + clientsProvider, + updateStrategyResolver, + configChangeDetector, + lastValidConfigFetcher, + configBuilder, + originalCache, + fallbackConfigGenerator, + ) + require.NoError(t, err) + + err = kongClient.Update(ctx) + require.Error(t, err) + + expectedUpdatedURLs := lo.Map(gwClients, func(c *adminapi.Client, _ int) string { + return c.BaseRootURL() + }) + if tc.expectRecoveryByGeneratingFallbackConfig || tc.expectRecoveryByApplyingLastValidConfig { + // In case of any recovery method, we expect the update to be called twice for each gateway. + expectedUpdatedURLs = slices.Concat(expectedUpdatedURLs, expectedUpdatedURLs) + } + t.Logf("Ensuring that the update strategy was called %d times", len(expectedUpdatedURLs)) + updateStrategyResolver.assertUpdateCalledForURLs(expectedUpdatedURLs) + + expectedContent := func(consumerUsername string) *file.Content { + return &file.Content{ + FormatVersion: "3.0", + Consumers: []file.FConsumer{ + { + Consumer: kong.Consumer{ + Username: lo.ToPtr(consumerUsername), + }, + }, + }, + } + } + receivedContent, ok := updateStrategyResolver.lastUpdatedContentForURL(expectedUpdatedURLs[0]) + require.True(t, ok) + if tc.expectRecoveryByApplyingLastValidConfig { + t.Log("Verifying that the last valid config was applied") + require.Equal(t, expectedContent("last-valid"), receivedContent.Content) + } + if tc.expectRecoveryByGeneratingFallbackConfig { + t.Log("Verifying that the fallback config was generated and applied") + require.Equal(t, expectedContent("fallback"), receivedContent.Content) + } + }) + } +} + +func someConsumer(t *testing.T, name string) *kongv1.KongConsumer { + return helpers.WithTypeMeta(t, &kongv1.KongConsumer{ + ObjectMeta: metav1.ObjectMeta{ + Name: "name", + Namespace: "namespace", + Annotations: map[string]string{ + annotations.IngressClassKey: annotations.DefaultIngressClass, + }, + }, + Username: name, + }) +} diff --git a/internal/dataplane/sendconfig/inmemory.go b/internal/dataplane/sendconfig/inmemory.go index 21699002f25..f5883117e8b 100644 --- a/internal/dataplane/sendconfig/inmemory.go +++ b/internal/dataplane/sendconfig/inmemory.go @@ -4,12 +4,14 @@ import ( "bytes" "context" "encoding/json" + "errors" "fmt" "io" + "net/http" "github.com/go-logr/logr" "github.com/kong/go-database-reconciler/pkg/file" - + "github.com/kong/go-kong/kong" "github.com/kong/kubernetes-ingress-controller/v3/internal/metrics" "github.com/kong/kubernetes-ingress-controller/v3/internal/util" ) @@ -71,17 +73,28 @@ func (s UpdateStrategyInMemory) Update(ctx context.Context, targetState ContentW } } - if errBody, reloadConfigErr := s.configService.ReloadDeclarativeRawConfig(ctx, bytes.NewReader(config), true, true); reloadConfigErr != nil { - resourceErrors, parseErr := parseFlatEntityErrors(errBody, s.logger) - if parseErr != nil { - return fmt.Errorf("failed to 
parse flat entity errors from error response: %w", parseErr) + if errBody, reloadConfigErr := s.configService.ReloadDeclarativeRawConfig( + ctx, + bytes.NewReader(config), + true, + true, + ); reloadConfigErr != nil { + // If the returned error is an APIError with a 400 status code, we can try to parse the response body to get the + // resource errors and produce an UpdateError with them. + var apiError *kong.APIError + if errors.As(reloadConfigErr, &apiError) && apiError.Code() == http.StatusBadRequest { + resourceErrors, parseErr := parseFlatEntityErrors(errBody, s.logger) + if parseErr != nil { + return fmt.Errorf("failed to parse flat entity errors from error response: %w", parseErr) + } + return NewUpdateErrorWithResponseBody( + errBody, + resourceErrorsToResourceFailures(resourceErrors, s.logger), + reloadConfigErr, + ) } - - return NewUpdateErrorWithResponseBody( - errBody, - resourceErrorsToResourceFailures(resourceErrors, s.logger), - reloadConfigErr, - ) + // ...otherwise, we return the original one. + return fmt.Errorf("failed to reload declarative configuration: %w", reloadConfigErr) } return nil } diff --git a/internal/dataplane/sendconfig/inmemory_test.go b/internal/dataplane/sendconfig/inmemory_test.go new file mode 100644 index 00000000000..bbcd9f783ae --- /dev/null +++ b/internal/dataplane/sendconfig/inmemory_test.go @@ -0,0 +1,153 @@ +package sendconfig_test + +import ( + "context" + "fmt" + "io" + "testing" + + "github.com/go-logr/logr" + "github.com/kong/go-database-reconciler/pkg/file" + "github.com/kong/go-kong/kong" + "github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/failures" + "github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/sendconfig" + "github.com/samber/lo" + "github.com/stretchr/testify/require" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +const validFlattenedErrorsResponse = `{ + "code": 14, + "name": "invalid declarative configuration", + "flattened_errors": [ + { + "entity_name": "ingress.httpbin.httpbin..80", + "entity_tags": [ + "k8s-name:httpbin", + "k8s-namespace:default", + "k8s-kind:Ingress", + "k8s-uid:7b3f3b3b-0b3b-4b3b-8b3b-3b3b3b3b3b3b", + "k8s-group:networking.k8s.io", + "k8s-version:v1" + ], + "errors": [ + { + "field": "methods", + "type": "field", + "message": "cannot set 'methods' when 'protocols' is 'grpc' or 'grpcs'" + } + ], + "entity": { + "regex_priority": 0, + "preserve_host": true, + "name": "ingress.httpbin.httpbin..80", + "protocols": [ + "grpcs" + ], + "https_redirect_status_code": 426, + "request_buffering": true, + "tags": [ + "k8s-name:httpbin", + "k8s-namespace:default", + "k8s-kind:Ingress", + "k8s-uid:7b3f3b3b-0b3b-4b3b-8b3b-3b3b3b3b3b3b", + "k8s-group:networking.k8s.io", + "k8s-version:v1" + ], + "path_handling": "v0", + "response_buffering": true, + "methods": [ + "GET" + ], + "paths": [ + "/bar/", + "~/bar$" + ] + }, + "entity_type": "route" + } + ], + "message": "declarative config is invalid: {}", + "fields": {} +}` + +type mockConfigService struct { + responseBody []byte + err error +} + +func (m *mockConfigService) ReloadDeclarativeRawConfig(context.Context, io.Reader, bool, bool) ([]byte, error) { + return m.responseBody, m.err +} + +type mockConfigConverter struct { + called bool +} + +func (m *mockConfigConverter) Convert(*file.Content) sendconfig.DBLessConfig { + m.called = true + return sendconfig.DBLessConfig{} +} + +func TestUpdateStrategyInMemory(t *testing.T) { + testCases := []struct { + name string + configServiceError error + configServiceResponseBody []byte + 
expectedError error + }{ + { + name: "no error returned from config service", + configServiceError: nil, + expectedError: nil, + }, + { + name: "unexpected error returned from config service", + configServiceError: fmt.Errorf("unexpected error"), // e.g. network error + expectedError: fmt.Errorf("failed to reload declarative configuration: %w", fmt.Errorf("unexpected error")), + }, + { + name: "APIError 500 returned from config service", + configServiceError: kong.NewAPIError(500, "internal error"), + expectedError: fmt.Errorf("failed to reload declarative configuration: %w", kong.NewAPIError(500, "internal error")), + }, + { + name: "APIError 400 with no resource failures returned from config service", + configServiceError: kong.NewAPIError(400, "bad request"), + expectedError: sendconfig.NewUpdateError(nil, kong.NewAPIError(400, "bad request")), + }, + { + name: "APIError 400 with resource failures returned from config service", + configServiceError: kong.NewAPIError(400, "bad request"), + configServiceResponseBody: []byte(validFlattenedErrorsResponse), + expectedError: sendconfig.NewUpdateErrorWithResponseBody( + []byte(validFlattenedErrorsResponse), + []failures.ResourceFailure{ + lo.Must(failures.NewResourceFailure("invalid methods: cannot set 'methods' when 'protocols' is 'grpc' or 'grpcs'", &metav1.PartialObjectMetadata{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "networking.k8s.io/v1", + Kind: "Ingress", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "httpbin", + Namespace: "default", + UID: "7b3f3b3b-0b3b-4b3b-8b3b-3b3b3b3b3b3b", + }, + })), + }, + kong.NewAPIError(400, "bad request"), + ), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + configService := &mockConfigService{responseBody: tc.configServiceResponseBody, err: tc.configServiceError} + configConverter := &mockConfigConverter{} + s := sendconfig.NewUpdateStrategyInMemory(configService, configConverter, logr.Discard()) + err := s.Update(context.Background(), sendconfig.ContentWithHash{}) + require.Equal(t, tc.expectedError, err) + require.True(t, configConverter.called) + }) + } +}
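
For reviewers, a minimal, self-contained sketch of the recovery gate this PR adds in `maybeTryRecoveringFromGatewaysSyncError`: only a `sendconfig.UpdateError` (a 400 rejection from a gateway) can trigger fallback-configuration generation or last-valid-config recovery, while unexpected errors (network issues, 5xx) are left to the retry mechanism. The names below (`updateError`, `maybeRecover`) are simplified, hypothetical stand-ins, not the real `KongClient`/`sendconfig` API:

```go
// Sketch only: updateError and maybeRecover are hypothetical stand-ins for
// sendconfig.UpdateError and KongClient.maybeTryRecoveringFromGatewaysSyncError.
package main

import (
	"errors"
	"fmt"
)

// updateError mimics sendconfig.UpdateError: a gateway rejection that may carry
// the Kubernetes objects which caused it.
type updateError struct{ brokenObjects []string }

func (e updateError) Error() string { return "configuration rejected by gateway" }

// maybeRecover mirrors the decision flow introduced in this PR.
func maybeRecover(gatewaysSyncErr error, fallbackEnabled, hasLastValidConfig bool) string {
	// Unexpected errors (transient network issues, 5xx responses, ...) are not
	// recoverable via fallback configuration; rely on the retry mechanism instead.
	updateErr := updateError{}
	if !errors.As(gatewaysSyncErr, &updateErr) {
		return "skip recovery, rely on retries"
	}
	// A rejection with at least one identified broken object can be recovered by
	// generating a fallback configuration that excludes the broken objects.
	if fallbackEnabled && len(updateErr.brokenObjects) > 0 {
		return "generate fallback configuration"
	}
	// Otherwise (e.g. a 400 with no meaningful entity errors), fall back to the
	// last configuration the gateways accepted, if one is available.
	if hasLastValidConfig {
		return "apply last valid configuration"
	}
	return "no recovery possible"
}

func main() {
	fmt.Println(maybeRecover(errors.New("network timeout"), true, true))                    // skip recovery, rely on retries
	fmt.Println(maybeRecover(updateError{brokenObjects: []string{"consumer"}}, true, true)) // generate fallback configuration
	fmt.Println(maybeRecover(updateError{}, true, true))                                    // apply last valid configuration
}
```

Under these assumptions, a 400 with no identified broken objects falls through to the last valid configuration, matching the "UpdateError without entities" cases in `TestKongClient_RecoveringFromGatewaySyncError` above.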