Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(fallback): backfill all affected objects, not only broken #6172

Merged
merged 2 commits into from
Jun 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 35 additions & 8 deletions internal/dataplane/fallback/fallback.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,37 @@ func (g *Generator) GenerateBackfillingBrokenObjects(
lastValidCacheSnapshot store.CacheStores,
brokenObjects []ObjectHash,
) (store.CacheStores, error) {
// Generate a fallback cache snapshot excluding the broken objects.
fallbackCache, err := g.GenerateExcludingBrokenObjects(currentCache, brokenObjects)
// Build a graph from the current cache.
currentGraph, err := g.cacheGraphProvider.CacheToGraph(currentCache)
if err != nil {
return store.CacheStores{}, fmt.Errorf("failed to generate fallback cache: %w", err)
return store.CacheStores{}, fmt.Errorf("failed to build current cache graph: %w", err)
}

// Take a snapshot of the current cache to use as a fallback.
fallbackCache, err := currentCache.TakeSnapshot()
if err != nil {
return store.CacheStores{}, fmt.Errorf("failed to take current cache snapshot: %w", err)
}

// Exclude the affected objects from the fallback cache. Also, collect all the affected objects as they will be
// subjects of backfilling.
var affectedObjects []ObjectHash
for _, brokenObject := range brokenObjects {
subgraphObjects, err := currentGraph.SubgraphObjects(brokenObject)
if err != nil {
return store.CacheStores{}, fmt.Errorf("failed to find dependants for %s: %w", brokenObject, err)
}
for _, obj := range subgraphObjects {
if err := fallbackCache.Delete(obj); err != nil {
return store.CacheStores{}, fmt.Errorf("failed to delete %s from the fallback cache: %w", GetObjectHash(obj), err)
}
g.logger.V(util.DebugLevel).Info("Excluded object from fallback cache",
"object_kind", obj.GetObjectKind(),
"object_name", obj.GetName(),
"object_namespace", obj.GetNamespace(),
)
affectedObjects = append(affectedObjects, GetObjectHash(obj))
}
}

// Build a graph from the last valid cache snapshot.
Expand All @@ -79,18 +106,18 @@ func (g *Generator) GenerateBackfillingBrokenObjects(
return store.CacheStores{}, fmt.Errorf("failed to build cache graph: %w", err)
}

// Backfill the broken objects from the last valid cache snapshot.
for _, brokenObject := range brokenObjects {
objectsToBackfill, err := lastValidGraph.SubgraphObjects(brokenObject)
// Backfill the affected objects from the last valid cache snapshot.
for _, affectedObject := range affectedObjects {
objectsToBackfill, err := lastValidGraph.SubgraphObjects(affectedObject)
if err != nil {
return store.CacheStores{}, fmt.Errorf("failed to find dependants for %s: %w", brokenObject, err)
return store.CacheStores{}, fmt.Errorf("failed to find dependants for %s: %w", affectedObject, err)
}

for _, obj := range objectsToBackfill {
if err := fallbackCache.Add(obj); err != nil {
return store.CacheStores{}, fmt.Errorf("failed to add %s to the cache: %w", GetObjectHash(obj), err)
}
g.logger.V(util.DebugLevel).Info("Backfilled object to fallback cache",
g.logger.V(util.DebugLevel).Info("Backfilled object to fallback cache from previous valid cache snapshot",
"object_kind", obj.GetObjectKind(),
"object_name", obj.GetName(),
"object_namespace", obj.GetNamespace(),
Expand Down
57 changes: 30 additions & 27 deletions internal/dataplane/fallback/fallback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/fallback"
"github.com/kong/kubernetes-ingress-controller/v3/internal/store"
kongv1 "github.com/kong/kubernetes-ingress-controller/v3/pkg/apis/configuration/v1"
incubatorv1alpha1 "github.com/kong/kubernetes-ingress-controller/v3/pkg/apis/incubator/v1alpha1"
)

// mockGraphProvider is a mock implementation of the CacheGraphProvider interface.
Expand Down Expand Up @@ -177,40 +176,46 @@ func TestGenerator_GenerateBackfillingBrokenObjects(t *testing.T) {
// Dependency resolving between the objects is tested in TestResolveDependencies_* tests.
//
// Graph structure (edges define dependency -> dependant relationship):
// ┌────────────┐ ┌──────┐
// │ingressClass│ │plugin│
// └──────┬─────┘ └──────┘
// │
// ┌───▼───┐
// │service│
// └───┬───┘
// │
// ┌──────▼──────┐
// │serviceFacade│
// └─────────────┘
graph, err := NewGraphBuilder().
// ┌────────────┐ ┌──────┐
// │ingressClass│ │plugin│
// └──────┬─────┘ └───┬──┘
// │ │
// ├────────────┘
// │
// ┌───▼───┐
// │service│
// └───┬───┘
// │
// ┌──────▼──────┐
// │serviceFacade│
// └─────────────┘
currentGraph, err := NewGraphBuilder().
WithVertices(ingressClass, service, serviceFacade, plugin).
WithEdge(ingressClass, service).
WithEdge(plugin, service).
WithEdge(service, serviceFacade).
Build()
require.NoError(t, err)

// Fallback graph differs from the input graph by lack of the serviceFacade.
// ┌────────────┐ ┌──────┐
// │ingressClass│ │plugin│
// └──────┬─────┘ └──────┘
// │
// ┌───▼───┐
// │service│
// └───────┘
// Last valid graph differs from the input graph by lack of the serviceFacade.
// ┌────────────┐ ┌──────┐
// │ingressClass│ │plugin│
// └──────┬─────┘ └───┬──┘
// │ │
// ├────────────┘
// │
// ┌───▼───┐
// │service│
// └───────┘
lastValidGraph, err := NewGraphBuilder().
WithVertices(lastValidIngressClass, lastValidService, lastValidPlugin).
WithEdge(lastValidIngressClass, lastValidService).
WithEdge(lastValidPlugin, lastValidService).
Build()
require.NoError(t, err)

graphProvider := &mockGraphProvider{}
graphProvider.ReturnGraphOn(inputCacheStores, graph)
graphProvider.ReturnGraphOn(inputCacheStores, currentGraph)
graphProvider.ReturnGraphOn(lastValidCacheStores, lastValidGraph)
g := fallback.NewGenerator(graphProvider, logr.Discard())

Expand Down Expand Up @@ -294,12 +299,10 @@ func TestGenerator_GenerateBackfillingBrokenObjects(t *testing.T) {
requireNotAnnotatedLastValid(t, fallbackIngressClass)

fallbackService, err := getFromStore[*corev1.Service](fallbackCache.Service, service)
require.NoError(t, err)
requireNotAnnotatedLastValid(t, fallbackService)
require.NoError(t, err, "service should be backfilled as it was present in the last valid state and is directly affected by broken plugin")
requireAnnotatedLastValid(t, fallbackService)

fallbackServiceFacade, err := getFromStore[*incubatorv1alpha1.KongServiceFacade](fallbackCache.KongServiceFacade, serviceFacade)
require.NoError(t, err)
requireNotAnnotatedLastValid(t, fallbackServiceFacade)
require.Empty(t, fallbackCache.KongServiceFacade.List(), "serviceFacade shouldn't be recovered as it wasn't in the last valid cache snapshot and it's indirectly affected by broken plugin")
})

t.Run("multiple objects are broken", func(t *testing.T) {
Expand Down
19 changes: 10 additions & 9 deletions internal/manager/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,15 +115,16 @@ func Run(
kongSemVersion := semver.Version{Major: v.Major(), Minor: v.Minor(), Patch: v.Patch()}

kongConfig := sendconfig.Config{
Version: kongSemVersion,
InMemory: dbMode.IsDBLessMode(),
Concurrency: c.Concurrency,
FilterTags: c.FilterTags,
SkipCACertificates: c.SkipCACertificates,
EnableReverseSync: c.EnableReverseSync,
ExpressionRoutes: dpconf.ShouldEnableExpressionRoutes(routerFlavor),
SanitizeKonnectConfigDumps: featureGates.Enabled(featuregates.SanitizeKonnectConfigDumps),
FallbackConfiguration: featureGates.Enabled(featuregates.FallbackConfiguration),
Version: kongSemVersion,
InMemory: dbMode.IsDBLessMode(),
Concurrency: c.Concurrency,
FilterTags: c.FilterTags,
SkipCACertificates: c.SkipCACertificates,
EnableReverseSync: c.EnableReverseSync,
ExpressionRoutes: dpconf.ShouldEnableExpressionRoutes(routerFlavor),
SanitizeKonnectConfigDumps: featureGates.Enabled(featuregates.SanitizeKonnectConfigDumps),
FallbackConfiguration: featureGates.Enabled(featuregates.FallbackConfiguration),
UseLastValidConfigForFallback: c.UseLastValidConfigForFallback,
}

setupLog.Info("Configuring and building the controller manager")
Expand Down
41 changes: 25 additions & 16 deletions test/integration/isolated/examples_httproute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@ import (
"github.com/samber/lo"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
eventsv1 "k8s.io/api/events/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/e2e-framework/pkg/envconf"
"sigs.k8s.io/e2e-framework/pkg/features"
gatewayclient "sigs.k8s.io/gateway-api/pkg/client/clientset/versioned"

"github.com/kong/kubernetes-ingress-controller/v3/internal/annotations"
"github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane"
"github.com/kong/kubernetes-ingress-controller/v3/internal/gatewayapi"
"github.com/kong/kubernetes-ingress-controller/v3/internal/manager/featuregates"
kongv1 "github.com/kong/kubernetes-ingress-controller/v3/pkg/apis/configuration/v1"
Expand Down Expand Up @@ -109,11 +111,15 @@ func TestHTTPRouteUseLastValidConfigWithBrokenPluginFallback(t *testing.T) {
additionalRoutePath,
http.StatusOK,
additionalRouteServiceTarget,
map[string]string{
additionalHeaderKey: additionalHeaderValue,
},
nil,
consts.IngressWait,
consts.WaitTick,
func(resp *http.Response, _ string) (reason string, ok bool) {
if resp.Header.Get(additionalHeaderKey) != additionalHeaderValue {
return fmt.Sprintf("response header %s == %s not found", additionalHeaderKey, additionalHeaderValue), false
}
return "", true
},
)
}

Expand Down Expand Up @@ -145,6 +151,7 @@ func TestHTTPRouteUseLastValidConfigWithBrokenPluginFallback(t *testing.T) {
Name: additionalRouteName,
Annotations: map[string]string{
annotations.AnnotationPrefix + annotations.StripPathKey: "true",
annotations.AnnotationPrefix + annotations.PluginsKey: "response-transformer",
},
},
Spec: gatewayapi.HTTPRouteSpec{
Expand Down Expand Up @@ -191,11 +198,10 @@ func TestHTTPRouteUseLastValidConfigWithBrokenPluginFallback(t *testing.T) {
Name: "response-transformer",
},
PluginName: "response-transformer",
// Misconfigured on purpose.
Config: apiextensionsv1.JSON{
Raw: []byte(fmt.Sprintf(`
{
"config": {"add": {"headers": ["%s:%s"]}}
"add": {"headers": ["%s:%s"]}
}`, additionalHeaderKey, additionalHeaderValue),
),
},
Expand All @@ -208,7 +214,7 @@ func TestHTTPRouteUseLastValidConfigWithBrokenPluginFallback(t *testing.T) {

return ctx
}).
Assess("attach broken plugin to a working route", func(ctx context.Context, t *testing.T, _ *envconf.Config) context.Context {
Assess("break plugin's configuration and wait for event indicating that", func(ctx context.Context, t *testing.T, _ *envconf.Config) context.Context {
client := GetFromCtxForT[*clientset.Clientset](ctx, t)

plugin, err := client.ConfigurationV1().KongPlugins(namespace).Get(ctx, "response-transformer", metav1.GetOptions{})
Expand All @@ -219,16 +225,19 @@ func TestHTTPRouteUseLastValidConfigWithBrokenPluginFallback(t *testing.T) {
_, err = client.ConfigurationV1().KongPlugins(namespace).Update(ctx, plugin, metav1.UpdateOptions{})
require.NoError(t, err)

t.Log("getting a gateway client")
gatewayClient := GetFromCtxForT[*gatewayclient.Clientset](ctx, t)
assert.NoError(t, err)
ctx = SetInCtxForT(ctx, t, gatewayClient)

route, err := gatewayClient.GatewayV1().HTTPRoutes(namespace).Get(ctx, additionalRouteName, metav1.GetOptions{})
assert.NoError(t, err)
route.Annotations[annotations.AnnotationPrefix+annotations.PluginsKey] = plugin.Name
_, err = gatewayClient.GatewayV1().HTTPRoutes(namespace).Update(ctx, route, metav1.UpdateOptions{})
assert.NoError(t, err)
k8sClient := GetClusterFromCtx(ctx).Client()
require.EventuallyWithT(t, func(t *assert.CollectT) {
events, err := k8sClient.EventsV1().Events(namespace).List(ctx, metav1.ListOptions{
FieldSelector: fmt.Sprintf("reason=%s", dataplane.KongConfigurationApplyFailedEventReason),
})
if !assert.NoError(t, err) {
return
}
contains := lo.ContainsBy(events.Items, func(e eventsv1.Event) bool {
return e.Regarding.Name == plugin.Name && e.Regarding.Kind == "KongPlugin"
})
assert.Truef(t, contains, "expected events to contain one for plugin %s, events: %v", plugin.Name, events.Items)
}, consts.IngressWait, consts.WaitTick)

return ctx
}).
Expand Down
16 changes: 7 additions & 9 deletions test/internal/helpers/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,6 @@ func MustHTTPRequest(t *testing.T, method string, host, path string, headers map
return req
}

// MustParseURL parses a string format URL to *url.URL. If error happens, fails the test.
func MustParseURL(t *testing.T, urlStr string) *url.URL {
u, err := url.Parse(urlStr)
require.NoErrorf(t, err, "Failed to parse URL %s: %v", urlStr, err)
return u
}

// -----------------------------------------------------------------------------
// Testing Utility Functions - Various HTTP related
// -----------------------------------------------------------------------------
Expand All @@ -92,9 +85,10 @@ func EventuallyGETPath(
path string,
statusCode int,
bodyContent string,
headers map[string]string,
requestHeaders map[string]string,
waitDuration time.Duration,
waitTick time.Duration,
responseMatchers ...ResponseMatcher,
) {
t.Helper()
var client *http.Client
Expand All @@ -105,7 +99,7 @@ func EventuallyGETPath(
}

require.EventuallyWithT(t, func(c *assert.CollectT) {
resp, err := client.Do(MustHTTPRequest(t, http.MethodGet, host, path, headers))
resp, err := client.Do(MustHTTPRequest(t, http.MethodGet, host, path, requestHeaders))
if !assert.NoError(c, err) {
return
}
Expand All @@ -127,6 +121,10 @@ func EventuallyGETPath(
return
}
assert.Contains(c, b.String(), bodyContent)
for _, matcher := range responseMatchers {
reason, ok := matcher(resp, b.String())
assert.Truef(t, ok, "response did not match %s", reason)
}
}, waitDuration, waitTick)
}

Expand Down
Loading