From f46014fec7deed067a97235ef65a7c0700f378b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Grzegorz=20Burzy=C5=84ski?= Date: Mon, 1 Jul 2024 15:17:22 +0200 Subject: [PATCH] fix(fallback): do not skip config update when gateways out of sync (#6271) (#6274) (cherry picked from commit 3b547a9feaba331763601cb87612480cf515f850) --- CHANGELOG.md | 16 +- internal/adminapi/client.go | 12 + internal/dataplane/kong_client.go | 39 ++-- internal/dataplane/kong_client_golden_test.go | 2 +- internal/dataplane/kong_client_test.go | 213 ++++++++++++------ internal/dataplane/sendconfig/sendconfig.go | 6 +- internal/manager/run.go | 2 +- 7 files changed, 206 insertions(+), 84 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 88ec0c3928..fee67f7d44 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ Adding a new version? You'll need three changes: * Add the diff link, like "[2.7.0]: https://github.com/kong/kubernetes-ingress-controller/compare/v1.2.2...v1.2.3". This is all the way at the bottom. It's the thing we always forget. ---> + - [3.2.2](#322) - [3.2.1](#321) - [3.2.0](#320) - [3.1.6](#316) @@ -90,9 +91,21 @@ Adding a new version? You'll need three changes: - [0.0.5](#005) - [0.0.4 and prior](#004-and-prior) +## 3.2.2 + +> Release date: 2024-07-01 + +### Fixed + +- Fixed an issue where new gateways were not being populated with the current configuration when + `FallbackConfiguration` feature gate was turned on. Previously, configuration updates were skipped + if the Kubernetes config cache did not change, leading to inconsistencies. Now, the system ensures + that all gateways are populated with the latest configuration regardless of cache changes. + [#6271](https://github.com/Kong/kubernetes-ingress-controller/pull/6271) + ## 3.2.1 -> Release date: 2024-06-28 +> Release date: 2024-06-28 ### Fixed @@ -3559,6 +3572,7 @@ Please read the changelog and test in your environment. - The initial versions were rapildy iterated to deliver a working ingress controller. +[3.2.2]: https://github.com/kong/kubernetes-ingress-controller/compare/v3.2.1...v3.2.2 [3.2.1]: https://github.com/kong/kubernetes-ingress-controller/compare/v3.2.0...v3.2.1 [3.2.0]: https://github.com/kong/kubernetes-ingress-controller/compare/v3.1.6...v3.2.0 [3.1.6]: https://github.com/kong/kubernetes-ingress-controller/compare/v3.1.5...v3.1.6 diff --git a/internal/adminapi/client.go b/internal/adminapi/client.go index 74142c805d..35e2deb6a1 100644 --- a/internal/adminapi/client.go +++ b/internal/adminapi/client.go @@ -10,6 +10,7 @@ import ( "github.com/samber/lo" k8stypes "k8s.io/apimachinery/pkg/types" + "github.com/kong/kubernetes-ingress-controller/v3/internal/store" "github.com/kong/kubernetes-ingress-controller/v3/internal/util" "github.com/kong/kubernetes-ingress-controller/v3/internal/util/clock" ) @@ -25,6 +26,7 @@ type Client struct { isKonnect bool konnectControlPlane string lastConfigSHA []byte + lastCacheStoresHash store.SnapshotHash // podRef (optional) describes the Pod that the Client communicates with. podRef *k8stypes.NamespacedName @@ -154,6 +156,16 @@ func (c *Client) KonnectControlPlane() string { return c.konnectControlPlane } +// SetLastCacheStoresHash overrides last cache stores hash. +func (c *Client) SetLastCacheStoresHash(s store.SnapshotHash) { + c.lastCacheStoresHash = s +} + +// LastCacheStoresHash returns a checksum of the last successful cache stores push. 
+func (c *Client) LastCacheStoresHash() store.SnapshotHash { + return c.lastCacheStoresHash +} + // SetLastConfigSHA overrides last config SHA. func (c *Client) SetLastConfigSHA(s []byte) { c.lastConfigSHA = s diff --git a/internal/dataplane/kong_client.go b/internal/dataplane/kong_client.go index 00ccc65140..0641b540bf 100644 --- a/internal/dataplane/kong_client.go +++ b/internal/dataplane/kong_client.go @@ -196,7 +196,7 @@ func NewKongClient( configChangeDetector sendconfig.ConfigurationChangeDetector, kongConfigFetcher configfetcher.LastValidConfigFetcher, kongConfigBuilder KongConfigBuilder, - cacheStores store.CacheStores, + cacheStores *store.CacheStores, fallbackConfigGenerator FallbackConfigGenerator, ) (*KongClient, error) { c := &KongClient{ @@ -204,7 +204,7 @@ func NewKongClient( requestTimeout: timeout, diagnostic: diagnostic, prometheusMetrics: metrics.NewCtrlFuncMetrics(), - cache: &cacheStores, + cache: cacheStores, kongConfig: kongConfig, eventRecorder: eventRecorder, dbmode: dbMode, @@ -444,23 +444,30 @@ func (c *KongClient) Update(ctx context.Context) error { if c.kongConfig.FallbackConfiguration { var newSnapshotHash store.SnapshotHash var err error + // Empty snapshot hash means that the cache hasn't changed since the last snapshot was taken. That optimization can be used + // in main code path to avoid unnecessary processing. TODO: https://github.com/Kong/kubernetes-ingress-controller/issues/6095 cacheSnapshot, newSnapshotHash, err = c.cache.TakeSnapshotIfChanged(c.lastProcessedSnapshotHash) if err != nil { return fmt.Errorf("failed to take snapshot of cache: %w", err) } - // Empty snapshot hash means that the cache hasn't changed since the last snapshot was taken. That optimization can be used - // in main code path to avoid unnecessary processing. TODO: https://github.com/Kong/kubernetes-ingress-controller/issues/6095 - // TODO: We should short-circuit here only if all the Gateways were successfully synced with the current configuration. 
- // https://github.com/Kong/kubernetes-ingress-controller/issues/6219 - if newSnapshotHash == store.SnapshotHashEmpty { + hasNewSnapshotToBeProcessed := newSnapshotHash != store.SnapshotHashEmpty + if !hasNewSnapshotToBeProcessed { c.prometheusMetrics.RecordProcessedConfigSnapshotCacheHit() - c.logger.V(util.DebugLevel).Info("No configuration change; pushing config to gateway is not necessary, skipping") - return nil + } else { + c.prometheusMetrics.RecordProcessedConfigSnapshotCacheMiss() + } + if hasNewSnapshotToBeProcessed { + c.logger.V(util.DebugLevel).Info("New configuration snapshot detected", "hash", newSnapshotHash) + c.lastProcessedSnapshotHash = newSnapshotHash + c.kongConfigBuilder.UpdateCache(cacheSnapshot) } - c.prometheusMetrics.RecordProcessedConfigSnapshotCacheMiss() - c.lastProcessedSnapshotHash = newSnapshotHash - c.kongConfigBuilder.UpdateCache(cacheSnapshot) + if allGatewaysAreInSync := lo.EveryBy(c.clientsProvider.GatewayClientsToConfigure(), func(cl *adminapi.Client) bool { + return cl.LastCacheStoresHash() == c.lastProcessedSnapshotHash + }); allGatewaysAreInSync { + c.logger.V(util.DebugLevel).Info("All gateways are in sync; pushing config is not necessary, skipping") + return nil + } } c.logger.V(util.DebugLevel).Info("Parsing kubernetes objects into data-plane configuration") @@ -700,6 +707,12 @@ func (c *KongClient) sendOutToGatewayClients( len(gatewayClients) > 1 { for _, client := range gatewayClients { client.SetLastConfigSHA([]byte(shas[0])) + + // If the last processed snapshot hash is not empty, we should set it to the clients as well. + // It can be empty when FallbackConfiguration feature gate is off. + if c.lastProcessedSnapshotHash != "" { + client.SetLastCacheStoresHash(c.lastProcessedSnapshotHash) + } } } @@ -840,7 +853,7 @@ func (c *KongClient) sendToClient( sendDiagnostic(diagnostics.DumpMeta{Failed: false, Hash: string(newConfigSHA)}, nil) // No error occurred. 
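The hunks above contain the heart of the fix: `Update` no longer returns early just because the cache snapshot hash is unchanged; it returns early only when every gateway client already reports that hash. A minimal sketch of that predicate, assuming the `adminapi` and `store` packages plus the `samber/lo` helper used in the diff (the function name `allGatewaysInSync` is illustrative, not part of the patch):

```go
// allGatewaysInSync reports whether every gateway already received the cache
// snapshot identified by latest. It mirrors the lo.EveryBy check added to
// KongClient.Update above.
func allGatewaysInSync(clients []*adminapi.Client, latest store.SnapshotHash) bool {
	return lo.EveryBy(clients, func(cl *adminapi.Client) bool {
		// A client counts as in sync only if the last snapshot it successfully
		// received equals the snapshot the controller processed most recently.
		return cl.LastCacheStoresHash() == latest
	})
}
```

In `Update` this predicate is evaluated over `GatewayClientsToConfigure()`, so a gateway discovered after the last push (whose hash is still empty) makes it false and forces a configuration push even when the Kubernetes cache itself has not changed.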
// update the lastConfigSHA with the new updated checksum client.SetLastConfigSHA(newConfigSHA) - + client.SetLastCacheStoresHash(c.lastProcessedSnapshotHash) return string(newConfigSHA), nil } diff --git a/internal/dataplane/kong_client_golden_test.go b/internal/dataplane/kong_client_golden_test.go index 60d7ae8b65..a6c3dbacd0 100644 --- a/internal/dataplane/kong_client_golden_test.go +++ b/internal/dataplane/kong_client_golden_test.go @@ -307,7 +307,7 @@ func runKongClientGoldenTest(t *testing.T, tc kongClientGoldenTestCase) { sendconfig.NewDefaultConfigurationChangeDetector(logger), lastValidConfigFetcher, p, - cacheStores, + &cacheStores, fallbackConfigGenerator, ) require.NoError(t, err) diff --git a/internal/dataplane/kong_client_test.go b/internal/dataplane/kong_client_test.go index 6ad4bef24a..1c326f419f 100644 --- a/internal/dataplane/kong_client_test.go +++ b/internal/dataplane/kong_client_test.go @@ -26,6 +26,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + k8stypes "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/client" @@ -153,15 +154,15 @@ type mockGatewayClientsProvider struct { dbMode dpconf.DBMode } -func (p mockGatewayClientsProvider) KonnectClient() *adminapi.KonnectClient { +func (p *mockGatewayClientsProvider) KonnectClient() *adminapi.KonnectClient { return p.konnectClient } -func (p mockGatewayClientsProvider) GatewayClients() []*adminapi.Client { +func (p *mockGatewayClientsProvider) GatewayClients() []*adminapi.Client { return p.gatewayClients } -func (p mockGatewayClientsProvider) GatewayClientsToConfigure() []*adminapi.Client { +func (p *mockGatewayClientsProvider) GatewayClientsToConfigure() []*adminapi.Client { if p.dbMode.IsDBLessMode() { return p.gatewayClients } @@ -384,7 +385,7 @@ func TestKongClientUpdate_AllExpectedClientsAreCalledAndErrorIsPropagated(t *tes for _, tc := range testCases { tc := tc t.Run(tc.name, func(t *testing.T) { - clientsProvider := mockGatewayClientsProvider{ + clientsProvider := &mockGatewayClientsProvider{ gatewayClients: tc.gatewayClients, konnectClient: tc.konnectClient, } @@ -415,7 +416,7 @@ func TestKongClientUpdate_AllExpectedClientsAreCalledAndErrorIsPropagated(t *tes } func TestKongClientUpdate_WhenNoChangeInConfigNoClientGetsCalled(t *testing.T) { - clientsProvider := mockGatewayClientsProvider{ + clientsProvider := &mockGatewayClientsProvider{ gatewayClients: []*adminapi.Client{ mustSampleGatewayClient(t), mustSampleGatewayClient(t), @@ -531,7 +532,7 @@ func TestKongClientUpdate_ConfigStatusIsNotified(t *testing.T) { testKonnectClient = mustSampleKonnectClient(t) testGatewayClient = mustSampleGatewayClient(t) - clientsProvider = mockGatewayClientsProvider{ + clientsProvider = &mockGatewayClientsProvider{ gatewayClients: []*adminapi.Client{testGatewayClient}, konnectClient: testKonnectClient, } @@ -616,7 +617,7 @@ func TestKongClientUpdate_ConfigStatusIsNotified(t *testing.T) { func TestKongClient_ApplyConfigurationEvents(t *testing.T) { testGatewayClient := mustSampleGatewayClient(t) - clientsProvider := mockGatewayClientsProvider{ + clientsProvider := &mockGatewayClientsProvider{ gatewayClients: []*adminapi.Client{testGatewayClient}, } updateStrategyResolver := newMockUpdateStrategyResolver(t) @@ -677,10 +678,6 @@ func TestKongClient_KubernetesEvents(t *testing.T) { t.Setenv("POD_NAME", "test-pod") ctx := context.Background() - testGatewayClient := mustSampleGatewayClient(t) - 
clientsProvider := mockGatewayClientsProvider{ - gatewayClients: []*adminapi.Client{testGatewayClient}, - } configChangeDetector := mockConfigurationChangeDetector{hasConfigurationChanged: true} testIngress := helpers.WithTypeMeta(t, &netv1.Ingress{ ObjectMeta: metav1.ObjectMeta{ @@ -794,6 +791,10 @@ func TestKongClient_KubernetesEvents(t *testing.T) { configBuilder := newMockKongConfigBuilder() eventRecorder := mocks.NewEventRecorder() lastValidConfigFetcher := &mockKongLastValidConfigFetcher{} + testGatewayClient := mustSampleGatewayClient(t) + clientsProvider := &mockGatewayClientsProvider{ + gatewayClients: []*adminapi.Client{testGatewayClient}, + } kongClient := setupTestKongClient(t, updateStrategyResolver, clientsProvider, configChangeDetector, configBuilder, eventRecorder, lastValidConfigFetcher) kongClient.kongConfig.FallbackConfiguration = tc.fallbackConfiguration @@ -855,7 +856,7 @@ func TestKongClient_EmptyConfigUpdate(t *testing.T) { testKonnectClient = mustSampleKonnectClient(t) testGatewayClient = mustSampleGatewayClient(t) - clientsProvider = mockGatewayClientsProvider{ + clientsProvider = &mockGatewayClientsProvider{ gatewayClients: []*adminapi.Client{testGatewayClient}, konnectClient: testKonnectClient, } @@ -909,7 +910,7 @@ func TestKongClient_EmptyConfigUpdate(t *testing.T) { func setupTestKongClient( t *testing.T, updateStrategyResolver *mockUpdateStrategyResolver, - clientsProvider mockGatewayClientsProvider, + clientsProvider *mockGatewayClientsProvider, configChangeDetector sendconfig.ConfigurationChangeDetector, configBuilder *mockKongConfigBuilder, eventRecorder record.EventRecorder, @@ -926,6 +927,7 @@ func setupTestKongClient( eventRecorder = mocks.NewEventRecorder() } + cacheStores := store.NewCacheStores() kongClient, err := NewKongClient( logger, timeout, @@ -938,7 +940,7 @@ func setupTestKongClient( configChangeDetector, kongRawStateGetter, configBuilder, - store.NewCacheStores(), + &cacheStores, newMockFallbackConfigGenerator(), ) require.NoError(t, err) @@ -962,7 +964,7 @@ func mustSampleKonnectClient(t *testing.T) *adminapi.KonnectClient { return adminapi.NewKonnectClient(c, rgID) } -func mapClientsToUrls(clients mockGatewayClientsProvider) []string { +func mapClientsToUrls(clients *mockGatewayClientsProvider) []string { urls := lo.Map(clients.GatewayClients(), func(c *adminapi.Client, _ int) string { return c.BaseRootURL() }) @@ -999,7 +1001,7 @@ func TestKongClientUpdate_FetchStoreAndPushLastValidConfig(t *testing.T) { var ( ctx = context.Background() - clientsProvider = mockGatewayClientsProvider{ + clientsProvider = &mockGatewayClientsProvider{ gatewayClients: []*adminapi.Client{ mustSampleGatewayClient(t), mustSampleGatewayClient(t), @@ -1128,7 +1130,7 @@ func TestKongClientUpdate_FetchStoreAndPushLastValidConfig(t *testing.T) { func TestKongClientUpdate_KonnectUpdatesAreSanitized(t *testing.T) { ctx := context.Background() - clientsProvider := mockGatewayClientsProvider{ + clientsProvider := &mockGatewayClientsProvider{ gatewayClients: []*adminapi.Client{mustSampleGatewayClient(t)}, konnectClient: mustSampleKonnectClient(t), } @@ -1169,12 +1171,6 @@ func TestKongClientUpdate_KonnectUpdatesAreSanitized(t *testing.T) { func TestKongClient_FallbackConfiguration_SuccessfulRecovery(t *testing.T) { ctx := context.Background() - gwClient := mustSampleGatewayClient(t) - konnectClient := mustSampleKonnectClient(t) - clientsProvider := mockGatewayClientsProvider{ - gatewayClients: []*adminapi.Client{gwClient}, - konnectClient: konnectClient, - } 
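The test scaffolding changes interleaved here follow the same theme: `mockGatewayClientsProvider` switches to pointer receivers and the tests build it with `&`, which appears to be what lets a test simulate gateway discovery by mutating the provider after the `KongClient` has been constructed. Roughly, using only identifiers that appear elsewhere in this diff:

```go
// Build the mock once and pass the same pointer into NewKongClient via the
// clientsProvider argument...
clientsProvider := &mockGatewayClientsProvider{
	gatewayClients: []*adminapi.Client{gwClient},
	konnectClient:  konnectClient,
}

// ...so that later mutations are observed through the provider on the next
// Update call, e.g. simulating a newly discovered gateway pod:
clientsProvider.gatewayClients = append(clientsProvider.gatewayClients, newGwClient)
```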
configChangeDetector := mockConfigurationChangeDetector{hasConfigurationChanged: true} lastValidConfigFetcher := &mockKongLastValidConfigFetcher{} diagnosticsCh := make(chan diagnostics.ConfigDump, 10) // make it buffered to avoid blocking @@ -1208,6 +1204,12 @@ func TestKongClient_FallbackConfiguration_SuccessfulRecovery(t *testing.T) { updateStrategyResolver := newMockUpdateStrategyResolver(t) configBuilder := newMockKongConfigBuilder() fallbackConfigGenerator := newMockFallbackConfigGenerator() + gwClient := mustSampleGatewayClient(t) + konnectClient := mustSampleKonnectClient(t) + clientsProvider := &mockGatewayClientsProvider{ + gatewayClients: []*adminapi.Client{gwClient}, + konnectClient: konnectClient, + } kongClient, err := NewKongClient( zapr.NewLogger(zap.NewNop()), time.Second, @@ -1225,7 +1227,7 @@ func TestKongClient_FallbackConfiguration_SuccessfulRecovery(t *testing.T) { configChangeDetector, lastValidConfigFetcher, configBuilder, - originalCache, + &originalCache, fallbackConfigGenerator, ) require.NoError(t, err) @@ -1327,11 +1329,11 @@ func TestKongClient_FallbackConfiguration_SuccessfulRecovery(t *testing.T) { require.True(t, dump.Meta.Fallback) } -func TestKongClient_FallbackConfiguration_SkipMakingRedundantSnapshot(t *testing.T) { +func TestKongClient_FallbackConfiguration_SkipsUpdateWhenInSync(t *testing.T) { ctx := context.Background() gwClient := mustSampleGatewayClient(t) konnectClient := mustSampleKonnectClient(t) - clientsProvider := mockGatewayClientsProvider{ + clientsProvider := &mockGatewayClientsProvider{ gatewayClients: []*adminapi.Client{gwClient}, konnectClient: konnectClient, } @@ -1344,20 +1346,7 @@ func TestKongClient_FallbackConfiguration_SkipMakingRedundantSnapshot(t *testing // We'll use KongConsumer as an example of an object, but it could be any supported type // for the purpose of this test as the fallback config generator is mocked anyway. 
- someConsumer := func(name string) *kongv1.KongConsumer { - return &kongv1.KongConsumer{ - ObjectMeta: metav1.ObjectMeta{ - Name: "name", - Namespace: "namespace", - Annotations: map[string]string{ - annotations.IngressClassKey: annotations.DefaultIngressClass, - }, - }, - Username: name, - } - } - - originalCache := cacheStoresFromObjs(t, someConsumer("valid")) + originalCache := cacheStoresFromObjs(t, someConsumer(t, "valid")) kongClient, err := NewKongClient( zapr.NewLogger(zap.NewNop()), time.Second, @@ -1374,29 +1363,125 @@ func TestKongClient_FallbackConfiguration_SkipMakingRedundantSnapshot(t *testing configChangeDetector, lastValidConfigFetcher, configBuilder, - originalCache, + &originalCache, fallbackConfigGenerator, ) require.NoError(t, err) - t.Log("Calling KongClient.Update") - require.NoError(t, kongClient.Update(ctx)) + t.Run("on first update clients are updated", func(t *testing.T) { + t.Log("Calling KongClient.Update") + require.NoError(t, kongClient.Update(ctx)) - t.Log("Verifying that the config builder cache was updated once") - require.Len(t, configBuilder.updateCacheCalls, 1) + t.Log("Verifying that the config builder cache was updated once") + require.Len(t, configBuilder.updateCacheCalls, 1) - t.Log("Calling KongClient.Update again") - require.NoError(t, kongClient.Update(ctx)) + t.Log("Verifying that the update strategy was called once for gateway and Konnect") + updateStrategyResolver.assertUpdateCalledForURLs([]string{gwClient.BaseRootURL(), konnectClient.BaseRootURL()}) + }) + + t.Run("without clients change, on second update clients are not updated", func(t *testing.T) { + t.Log("Calling KongClient.Update again") + require.NoError(t, kongClient.Update(ctx)) + + t.Log("Verifying that the config builder cache was not updated when config was not changed") + require.Len(t, configBuilder.updateCacheCalls, 1) + + t.Log("Verifying that the update strategy was not called again") + updateStrategyResolver.assertUpdateCalledForURLs([]string{gwClient.BaseRootURL(), konnectClient.BaseRootURL()}) + }) + + newGwClient := mustSampleGatewayClient(t) + t.Run("when new client is discovered, it is updated", func(t *testing.T) { + t.Log("Injecting a new client to the provider") + clientsProvider.gatewayClients = append(clientsProvider.gatewayClients, newGwClient) + + t.Log("Calling KongClient.Update again") + require.NoError(t, kongClient.Update(ctx)) - t.Log("Verifying that the config builder cache was not updated when config was not changed") - require.Len(t, configBuilder.updateCacheCalls, 1) + t.Log("Verifying that the config builder cache is not updated as there was no config change") + require.Len(t, configBuilder.updateCacheCalls, 1) + + t.Log("Verifying that the update strategies were called for the client that was added") + updateStrategyResolver.assertUpdateCalledForURLs([]string{ + gwClient.BaseRootURL(), konnectClient.BaseRootURL(), // First series of updates + gwClient.BaseRootURL(), newGwClient.BaseRootURL(), konnectClient.BaseRootURL(), // Second series of updates + }) + }) + + t.Run("when generating fallback, all clients are updated", func(t *testing.T) { + t.Log("Changing configuration") + require.NoError(t, originalCache.Add(someConsumer(t, "broken"))) // Add a consumer to cache to change the cache hash. 
+ + t.Log("Setting update strategy to return an error on the first call to trigger fallback configuration generation") + updateStrategyResolver.returnSpecificErrorOnUpdate(gwClient.BaseRootURL(), sendconfig.NewUpdateError( + []failures.ResourceFailure{ + lo.Must(failures.NewResourceFailure("violated constraint", someConsumer(t, "invalid"))), + }, + errors.New("error on update"), + )) + + t.Log("Calling KongClient.Update") + require.Error(t, kongClient.Update(ctx)) + + t.Log("Verifying that the update strategy was called again for all gateways and Konnect") + updateStrategyResolver.assertUpdateCalledForURLs([]string{ + gwClient.BaseRootURL(), konnectClient.BaseRootURL(), // First series of updates + gwClient.BaseRootURL(), newGwClient.BaseRootURL(), konnectClient.BaseRootURL(), // Second series of updates + gwClient.BaseRootURL(), newGwClient.BaseRootURL(), konnectClient.BaseRootURL(), // Rejected series of updates + gwClient.BaseRootURL(), newGwClient.BaseRootURL(), konnectClient.BaseRootURL(), // Fallback series of updates + }) + }) + + anotherNewGwClient := mustSampleGatewayClient(t) + t.Run("when fallback was used before and config is still broken, after discovering a new client, all clients are updated", func(t *testing.T) { + t.Log("Adding a new client to the provider") + clientsProvider.gatewayClients = append(clientsProvider.gatewayClients, anotherNewGwClient) + updateStrategyResolver.returnSpecificErrorOnUpdate(gwClient.BaseRootURL(), sendconfig.NewUpdateError( + []failures.ResourceFailure{ + lo.Must(failures.NewResourceFailure("violated constraint", someConsumer(t, "invalid"))), + }, + errors.New("error on update"), + )) + + t.Log("Calling KongClient.Update again") + require.Error(t, kongClient.Update(ctx)) + + t.Log("Verifying that the update strategy was called again for all gateways and Konnect") + updateStrategyResolver.assertUpdateCalledForURLs([]string{ + gwClient.BaseRootURL(), konnectClient.BaseRootURL(), // First series of updates + gwClient.BaseRootURL(), newGwClient.BaseRootURL(), konnectClient.BaseRootURL(), // Second series of updates + gwClient.BaseRootURL(), newGwClient.BaseRootURL(), konnectClient.BaseRootURL(), // Rejected series of updates + gwClient.BaseRootURL(), newGwClient.BaseRootURL(), konnectClient.BaseRootURL(), // Fallback series of updates + gwClient.BaseRootURL(), newGwClient.BaseRootURL(), anotherNewGwClient.BaseRootURL(), konnectClient.BaseRootURL(), // Second rejected series of updates + gwClient.BaseRootURL(), newGwClient.BaseRootURL(), anotherNewGwClient.BaseRootURL(), konnectClient.BaseRootURL(), // Second fallback series of updates + }) + }) + + t.Run("after fallback, when new config is correct, all clients are updated", func(t *testing.T) { + t.Log("Changing configuration") + require.NoError(t, originalCache.Consumer.Add(someConsumer(t, "valid"))) // Add a consumer to cache to change the cache hash. 
+ + t.Log("Calling KongClient.Update") + require.NoError(t, kongClient.Update(ctx)) + + t.Log("Verifying that the update strategy was called again for all gateways and Konnect") + updateStrategyResolver.assertUpdateCalledForURLs([]string{ + gwClient.BaseRootURL(), konnectClient.BaseRootURL(), // First series of updates + gwClient.BaseRootURL(), newGwClient.BaseRootURL(), konnectClient.BaseRootURL(), // Second series of updates + gwClient.BaseRootURL(), newGwClient.BaseRootURL(), konnectClient.BaseRootURL(), // Rejected series of updates + gwClient.BaseRootURL(), newGwClient.BaseRootURL(), konnectClient.BaseRootURL(), // Fallback series of updates + gwClient.BaseRootURL(), newGwClient.BaseRootURL(), anotherNewGwClient.BaseRootURL(), konnectClient.BaseRootURL(), // Second rejected series of updates + gwClient.BaseRootURL(), newGwClient.BaseRootURL(), anotherNewGwClient.BaseRootURL(), konnectClient.BaseRootURL(), // Second fallback series of updates + gwClient.BaseRootURL(), newGwClient.BaseRootURL(), anotherNewGwClient.BaseRootURL(), konnectClient.BaseRootURL(), // Third series of updates + }) + }) } func TestKongClient_FallbackConfiguration_FailedRecovery(t *testing.T) { ctx := context.Background() gwClient := mustSampleGatewayClient(t) konnectClient := mustSampleKonnectClient(t) - clientsProvider := mockGatewayClientsProvider{ + clientsProvider := &mockGatewayClientsProvider{ gatewayClients: []*adminapi.Client{gwClient}, konnectClient: konnectClient, } @@ -1427,7 +1512,7 @@ func TestKongClient_FallbackConfiguration_FailedRecovery(t *testing.T) { configChangeDetector, lastValidConfigFetcher, configBuilder, - originalCache, + &originalCache, fallbackConfigGenerator, ) require.NoError(t, err) @@ -1475,15 +1560,7 @@ func TestKongClient_FallbackConfiguration_FailedRecovery(t *testing.T) { func TestKongClient_LastValidCacheSnapshot(t *testing.T) { var ( - ctx = context.Background() - testKonnectClient = mustSampleKonnectClient(t) - testGatewayClient = mustSampleGatewayClient(t) - - clientsProvider = mockGatewayClientsProvider{ - gatewayClients: []*adminapi.Client{testGatewayClient}, - konnectClient: testKonnectClient, - } - + ctx = context.Background() updateStrategyResolver = newMockUpdateStrategyResolver(t) configChangeDetector = mockConfigurationChangeDetector{hasConfigurationChanged: true} configBuilder = newMockKongConfigBuilder() @@ -1525,6 +1602,13 @@ func TestKongClient_LastValidCacheSnapshot(t *testing.T) { } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { + testKonnectClient := mustSampleKonnectClient(t) + testGatewayClient := mustSampleGatewayClient(t) + clientsProvider := &mockGatewayClientsProvider{ + gatewayClients: []*adminapi.Client{testGatewayClient}, + konnectClient: testKonnectClient, + } + kongClient, err := NewKongClient( zapr.NewLogger(zap.NewNop()), time.Second, @@ -1540,7 +1624,7 @@ func TestKongClient_LastValidCacheSnapshot(t *testing.T) { configChangeDetector, lastValidConfigFetcher, configBuilder, - originalCache, + &originalCache, fallbackConfigGenerator, ) require.NoError(t, err) @@ -1572,7 +1656,7 @@ func cacheStoresFromObjs(t *testing.T, objs ...runtime.Object) store.CacheStores } func TestKongClient_ConfigDumpSanitization(t *testing.T) { - clientsProvider := mockGatewayClientsProvider{ + clientsProvider := &mockGatewayClientsProvider{ gatewayClients: []*adminapi.Client{ mustSampleGatewayClient(t), }, @@ -1718,7 +1802,7 @@ func TestKongClient_RecoveringFromGatewaySyncError(t *testing.T) { gwClients[i] = mustSampleGatewayClient(t) 
updateStrategyResolver.returnSpecificErrorOnUpdate(gwClients[i].BaseRootURL(), tc.errorsFromGateways[i]) } - clientsProvider := mockGatewayClientsProvider{ + clientsProvider := &mockGatewayClientsProvider{ gatewayClients: gwClients, } @@ -1762,7 +1846,7 @@ func TestKongClient_RecoveringFromGatewaySyncError(t *testing.T) { configChangeDetector, lastValidConfigFetcher, configBuilder, - originalCache, + &originalCache, fallbackConfigGenerator, ) require.NoError(t, err) @@ -1814,6 +1898,7 @@ func someConsumer(t *testing.T, name string) *kongv1.KongConsumer { Annotations: map[string]string{ annotations.IngressClassKey: annotations.DefaultIngressClass, }, + UID: k8stypes.UID(uuid.NewString()), }, Username: name, }) diff --git a/internal/dataplane/sendconfig/sendconfig.go b/internal/dataplane/sendconfig/sendconfig.go index 7946ce21b7..adf114f31a 100644 --- a/internal/dataplane/sendconfig/sendconfig.go +++ b/internal/dataplane/sendconfig/sendconfig.go @@ -12,6 +12,7 @@ import ( "github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/deckgen" "github.com/kong/kubernetes-ingress-controller/v3/internal/metrics" + "github.com/kong/kubernetes-ingress-controller/v3/internal/store" "github.com/kong/kubernetes-ingress-controller/v3/internal/util" ) @@ -27,6 +28,7 @@ type AdminAPIClient interface { AdminAPIClient() *kong.Client LastConfigSHA() []byte SetLastConfigSHA([]byte) + SetLastCacheStoresHash(store.SnapshotHash) BaseRootURL() string PluginSchemaStore() *util.PluginSchemaStore @@ -110,7 +112,3 @@ func PerformUpdate( return newSHA, nil } - -// ----------------------------------------------------------------------------- -// Sendconfig - Private Functions -// ----------------------------------------------------------------------------- diff --git a/internal/manager/run.go b/internal/manager/run.go index c10b4573cf..d37f7ef87d 100644 --- a/internal/manager/run.go +++ b/internal/manager/run.go @@ -206,7 +206,7 @@ func Run( configurationChangeDetector, kongConfigFetcher, configTranslator, - cache, + &cache, fallbackConfigGenerator, ) if err != nil {
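Taken together with the `adminapi.Client` changes at the top of the patch, the bookkeeping is symmetric: a successful push stamps each client with the snapshot hash it now reflects, and `Update` skips work only while every known gateway still carries that stamp. A condensed view of the success path in `sendToClient` as changed here, assuming the surrounding `KongClient` receiver:

```go
// On a successful push the client records both the declarative config checksum
// and the cache snapshot hash it now reflects (simplified from sendToClient).
client.SetLastConfigSHA(newConfigSHA)
client.SetLastCacheStoresHash(c.lastProcessedSnapshotHash)
```

A gateway that joins later starts with a zero-value `LastCacheStoresHash`, so the in-sync check fails for it and the next `Update` pushes the current configuration instead of skipping, which is exactly the regression covered by `TestKongClient_FallbackConfiguration_SkipsUpdateWhenInSync`.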