From a216475ca1000ad26659de09a64edff4090b8bc2 Mon Sep 17 00:00:00 2001 From: Clayton Gonsalves <101868649+clayton-gonsalves@users.noreply.github.com> Date: Tue, 24 Oct 2023 20:40:33 +0200 Subject: [PATCH] add support for endpoint slices (#5745) Adds support for EndpointSlices, gated by the feature gate "useEndpointSlices". Fixes #2696. Signed-off-by: Clayton Gonsalves --- .github/workflows/build_daily.yaml | 35 + apis/projectcontour/v1alpha1/contourconfig.go | 11 + .../v1alpha1/contourconfig_helpers.go | 23 + .../v1alpha1/contourconfig_helpers_test.go | 37 + .../v1alpha1/zz_generated.deepcopy.go | 24 + .../5745-clayton-gonsalves-minor.md | 6 + cmd/contour/serve.go | 40 +- cmd/contour/servecontext.go | 1 + examples/contour/01-crds.yaml | 16 + examples/contour/02-role-contour.yaml | 8 + examples/gateway-provisioner/01-roles.yaml | 8 + examples/render/contour-deployment.yaml | 24 + .../render/contour-gateway-provisioner.yaml | 24 + examples/render/contour-gateway.yaml | 24 + examples/render/contour.yaml | 24 + internal/k8s/rbac.go | 3 + .../objects/rbac/clusterrole/cluster_role.go | 4 + internal/xdscache/v3/contour_test.go | 17 + .../xdscache/v3/endpointslicetranslator.go | 465 +++++++ .../v3/endpointslicetranslator_test.go | 1198 +++++++++++++++++ internal/xdscache/v3/endpointstranslator.go | 32 +- internal/xdscache/v3/server_test.go | 38 + pkg/config/parameters.go | 11 + .../docs/main/config/api-reference.html | 46 + site/content/docs/main/configuration.md | 3 +- test/e2e/fixtures.go | 60 +- test/e2e/framework.go | 25 + test/e2e/infra/endpointslice_test.go | 129 ++ test/e2e/infra/infra_test.go | 18 + 29 files changed, 2326 insertions(+), 28 deletions(-) create mode 100644 changelogs/unreleased/5745-clayton-gonsalves-minor.md create mode 100644 internal/xdscache/v3/endpointslicetranslator.go create mode 100644 internal/xdscache/v3/endpointslicetranslator_test.go create mode 100644 test/e2e/infra/endpointslice_test.go diff --git a/.github/workflows/build_daily.yaml b/.github/workflows/build_daily.yaml index 021a3a0b10e..ab7ababfa6d 100644 --- a/.github/workflows/build_daily.yaml +++ b/.github/workflows/build_daily.yaml @@ -152,3 +152,38 @@ jobs: steps: ${{ toJson(steps) }} channel: '#contour-ci-notifications' if: ${{ failure() && github.ref == 'refs/heads/main' }} + e2e-endpoint-slices: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - uses: actions/cache@v3 + with: + # * Module download cache + # * Build cache (Linux) + path: | + ~/go/pkg/mod + ~/.cache/go-build + key: ${{ runner.os }}-${{ github.job }}-go-${{ hashFiles('**/go.sum') }} + restore-keys: | + ${{ runner.os }}-${{ github.job }}-go- + - uses: actions/setup-go@v4 + with: + go-version: ${{ env.GO_VERSION }} + cache: false + - name: add deps to path + run: | + ./hack/actions/install-kubernetes-toolchain.sh $GITHUB_WORKSPACE/bin + echo "$GITHUB_WORKSPACE/bin" >> $GITHUB_PATH + - name: e2e tests + env: + CONTOUR_E2E_IMAGE: ghcr.io/projectcontour/contour:main + CONTOUR_E2E_USE_ENDPOINT_SLICES: true + run: | + make setup-kind-cluster run-e2e cleanup-kind + - uses: act10ns/slack@v2 + with: + status: ${{ job.status }} + steps: ${{ toJson(steps) }} + channel: '#contour-ci-notifications' + if: ${{ failure() && github.ref == 'refs/heads/main' }} + diff --git a/apis/projectcontour/v1alpha1/contourconfig.go b/apis/projectcontour/v1alpha1/contourconfig.go index d231d19bbaa..015bcb8b6e2 100644 --- a/apis/projectcontour/v1alpha1/contourconfig.go +++ b/apis/projectcontour/v1alpha1/contourconfig.go @@ -84,8 +84,19 @@ type ContourConfigurationSpec struct { // Tracing defines properties for exporting trace data to OpenTelemetry. Tracing *TracingConfig `json:"tracing,omitempty"` + + // FeatureFlags defines toggle to enable new contour features. + // Available toggles are: + // useEndpointSlices - configures contour to fetch endpoint data + // from k8s endpoint slices. defaults to false and reading endpoint + // data from the k8s endpoints. + FeatureFlags FeatureFlags `json:"featureFlags,omitempty"` } +// FeatureFlags defines the set of feature flags +// to toggle new contour features. +type FeatureFlags []string + // XDSServerType is the type of xDS server implementation. type XDSServerType string diff --git a/apis/projectcontour/v1alpha1/contourconfig_helpers.go b/apis/projectcontour/v1alpha1/contourconfig_helpers.go index b2229f187ab..03e7be080f6 100644 --- a/apis/projectcontour/v1alpha1/contourconfig_helpers.go +++ b/apis/projectcontour/v1alpha1/contourconfig_helpers.go @@ -17,9 +17,18 @@ import ( "fmt" "strconv" + "golang.org/x/exp/slices" "k8s.io/apimachinery/pkg/util/sets" ) +const ( + featureFlagUseEndpointSlices string = "useEndpointSlices" +) + +var featureFlagsMap = map[string]bool{ + featureFlagUseEndpointSlices: true, +} + // Validate configuration that is not already covered by CRD validation. func (c *ContourConfigurationSpec) Validate() error { // Validation of root configuration fields. @@ -215,6 +224,20 @@ func (e *EnvoyTLS) SanitizedCipherSuites() []string { return validatedCiphers } +func (f FeatureFlags) Validate() error { + for _, featureFlag := range f { + if _, found := featureFlagsMap[featureFlag]; !found { + return fmt.Errorf("invalid contour configuration, unknown feature flag:%s", featureFlag) + } + } + + return nil +} + +func (f FeatureFlags) IsEndpointSliceEnabled() bool { + return slices.Contains(f, featureFlagUseEndpointSlices) +} + // Validate ensures that exactly one of ControllerName or GatewayRef are specified. func (g *GatewayConfig) Validate() error { if g == nil { diff --git a/apis/projectcontour/v1alpha1/contourconfig_helpers_test.go b/apis/projectcontour/v1alpha1/contourconfig_helpers_test.go index 5e230f8a87d..290256e4d6c 100644 --- a/apis/projectcontour/v1alpha1/contourconfig_helpers_test.go +++ b/apis/projectcontour/v1alpha1/contourconfig_helpers_test.go @@ -14,6 +14,7 @@ package v1alpha1_test import ( + "fmt" "testing" "github.com/projectcontour/contour/apis/projectcontour/v1alpha1" @@ -294,3 +295,39 @@ func TestAccessLogFormatExtensions(t *testing.T) { } assert.Empty(t, e3.AccessLogFormatterExtensions()) } + +func TestFeatureFlagsValidate(t *testing.T) { + tests := []struct { + name string + flags v1alpha1.FeatureFlags + expected error + }{ + { + name: "valid flag", + flags: v1alpha1.FeatureFlags{"useEndpointSlices"}, + expected: nil, + }, + { + name: "invalid flag", + flags: v1alpha1.FeatureFlags{"invalidFlag"}, + expected: fmt.Errorf("invalid contour configuration, unknown feature flag:invalidFlag"), + }, + { + name: "mix of valid and invalid flags", + flags: v1alpha1.FeatureFlags{"useEndpointSlices", "invalidFlag"}, + expected: fmt.Errorf("invalid contour configuration, unknown feature flag:invalidFlag"), + }, + { + name: "empty flags", + flags: v1alpha1.FeatureFlags{}, + expected: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := tt.flags.Validate() + assert.Equal(t, tt.expected, err) + }) + } +} diff --git a/apis/projectcontour/v1alpha1/zz_generated.deepcopy.go b/apis/projectcontour/v1alpha1/zz_generated.deepcopy.go index ff4f84765b9..883fb38a893 100644 --- a/apis/projectcontour/v1alpha1/zz_generated.deepcopy.go +++ b/apis/projectcontour/v1alpha1/zz_generated.deepcopy.go @@ -199,6 +199,11 @@ func (in *ContourConfigurationSpec) DeepCopyInto(out *ContourConfigurationSpec) *out = new(TracingConfig) (*in).DeepCopyInto(*out) } + if in.FeatureFlags != nil { + in, out := &in.FeatureFlags, &out.FeatureFlags + *out = make(FeatureFlags, len(*in)) + copy(*out, *in) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ContourConfigurationSpec. @@ -837,6 +842,25 @@ func (in *ExtensionServiceTarget) DeepCopy() *ExtensionServiceTarget { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in FeatureFlags) DeepCopyInto(out *FeatureFlags) { + { + in := &in + *out = make(FeatureFlags, len(*in)) + copy(*out, *in) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FeatureFlags. +func (in FeatureFlags) DeepCopy() FeatureFlags { + if in == nil { + return nil + } + out := new(FeatureFlags) + in.DeepCopyInto(out) + return *out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *GatewayConfig) DeepCopyInto(out *GatewayConfig) { *out = *in diff --git a/changelogs/unreleased/5745-clayton-gonsalves-minor.md b/changelogs/unreleased/5745-clayton-gonsalves-minor.md new file mode 100644 index 00000000000..38cc1b50d73 --- /dev/null +++ b/changelogs/unreleased/5745-clayton-gonsalves-minor.md @@ -0,0 +1,6 @@ +## Add Kubernetes Endpoint Slice support + +This change optionally enables Contour to consume the kubernetes endpointslice API to determine the endpoints to configure Envoy with. +Note: This change is off by default and is gated by the feature flag `useEndpointSlices`. + +This feature will be enabled by default in a future version on Contour once it has had sufficient bake time in production environments. diff --git a/cmd/contour/serve.go b/cmd/contour/serve.go index 85490f7caa4..8d59c721121 100644 --- a/cmd/contour/serve.go +++ b/cmd/contour/serve.go @@ -64,6 +64,7 @@ import ( "github.com/projectcontour/contour/internal/xdscache" xdscache_v3 "github.com/projectcontour/contour/internal/xdscache/v3" "github.com/projectcontour/contour/pkg/config" + discoveryv1 "k8s.io/api/discovery/v1" ) const ( @@ -190,6 +191,12 @@ type Server struct { handlerCacheSyncs []cache.InformerSynced } +type EndpointsTranslator interface { + cache.ResourceEventHandler + xdscache.ResourceCache + SetObserver(observer contour.Observer) +} + // NewServer returns a Server object which contains the initial configuration // objects required to start an instance of Contour. func NewServer(log logrus.FieldLogger, ctx *serveContext) (*Server, error) { @@ -456,9 +463,13 @@ func (s *Server) doServe() error { contourMetrics := metrics.NewMetrics(s.registry) - // Endpoints updates are handled directly by the EndpointsTranslator - // due to their high update rate and their orthogonal nature. - endpointHandler := xdscache_v3.NewEndpointsTranslator(s.log.WithField("context", "endpointstranslator")) + // Endpoints updates are handled directly by the EndpointsTranslator/EndpointSliceTranslator due to the high update volume. + var endpointHandler EndpointsTranslator + if contourConfiguration.FeatureFlags.IsEndpointSliceEnabled() { + endpointHandler = xdscache_v3.NewEndpointSliceTranslator(s.log.WithField("context", "endpointslicetranslator")) + } else { + endpointHandler = xdscache_v3.NewEndpointsTranslator(s.log.WithField("context", "endpointstranslator")) + } resources := []xdscache.ResourceCache{ xdscache_v3.NewListenerCache(listenerConfig, *contourConfiguration.Envoy.Metrics, *contourConfiguration.Envoy.Health, *contourConfiguration.Envoy.Network.EnvoyAdminPort), @@ -475,7 +486,7 @@ func (s *Server) doServe() error { snapshotHandler := xdscache.NewSnapshotHandler(resources, s.log.WithField("context", "snapshotHandler")) // register observer for endpoints updates. - endpointHandler.Observer = contour.ComposeObservers(snapshotHandler) + endpointHandler.SetObserver(contour.ComposeObservers(snapshotHandler)) // Log that we're using the fallback certificate if configured. if contourConfiguration.HTTPProxy.FallbackCertificate != nil { @@ -608,12 +619,21 @@ func (s *Server) doServe() error { s.log.WithError(err).WithField("resource", "secrets").Fatal("failed to create informer") } - // Inform on endpoints. - if err := s.informOnResource(&corev1.Endpoints{}, &contour.EventRecorder{ - Next: endpointHandler, - Counter: contourMetrics.EventHandlerOperations, - }); err != nil { - s.log.WithError(err).WithField("resource", "endpoints").Fatal("failed to create informer") + // Inform on endpoints/endpointSlices. + if contourConfiguration.FeatureFlags.IsEndpointSliceEnabled() { + if err := s.informOnResource(&discoveryv1.EndpointSlice{}, &contour.EventRecorder{ + Next: endpointHandler, + Counter: contourMetrics.EventHandlerOperations, + }); err != nil { + s.log.WithError(err).WithField("resource", "endpointslices").Fatal("failed to create informer") + } + } else { + if err := s.informOnResource(&corev1.Endpoints{}, &contour.EventRecorder{ + Next: endpointHandler, + Counter: contourMetrics.EventHandlerOperations, + }); err != nil { + s.log.WithError(err).WithField("resource", "endpoints").Fatal("failed to create informer") + } } // Register our event handler with the manager. diff --git a/cmd/contour/servecontext.go b/cmd/contour/servecontext.go index 7d3f72312ad..7d14c2a94f4 100644 --- a/cmd/contour/servecontext.go +++ b/cmd/contour/servecontext.go @@ -590,6 +590,7 @@ func (ctx *serveContext) convertToContourConfigurationSpec() contour_api_v1alpha Policy: policy, Metrics: &contourMetrics, Tracing: tracingConfig, + FeatureFlags: ctx.Config.FeatureFlags, } xdsServerType := contour_api_v1alpha1.ContourServerType diff --git a/examples/contour/01-crds.yaml b/examples/contour/01-crds.yaml index 940a1b288bf..783ff7574f5 100644 --- a/examples/contour/01-crds.yaml +++ b/examples/contour/01-crds.yaml @@ -471,6 +471,14 @@ spec: type: string type: object type: object + featureFlags: + description: 'FeatureFlags defines toggle to enable new contour features. + Available toggles are: useEndpointSlices - configures contour to + fetch endpoint data from k8s endpoint slices. defaults to false + and reading endpoint data from the k8s endpoints.' + items: + type: string + type: array gateway: description: Gateway contains parameters for the gateway-api Gateway that Contour is configured to serve traffic. @@ -3945,6 +3953,14 @@ spec: type: string type: object type: object + featureFlags: + description: 'FeatureFlags defines toggle to enable new contour + features. Available toggles are: useEndpointSlices - configures + contour to fetch endpoint data from k8s endpoint slices. defaults + to false and reading endpoint data from the k8s endpoints.' + items: + type: string + type: array gateway: description: Gateway contains parameters for the gateway-api Gateway that Contour is configured to serve traffic. diff --git a/examples/contour/02-role-contour.yaml b/examples/contour/02-role-contour.yaml index 589f3244763..35e9386e719 100644 --- a/examples/contour/02-role-contour.yaml +++ b/examples/contour/02-role-contour.yaml @@ -18,6 +18,14 @@ rules: - get - list - watch +- apiGroups: + - discovery.k8s.io + resources: + - endpointslices + verbs: + - get + - list + - watch - apiGroups: - gateway.networking.k8s.io resources: diff --git a/examples/gateway-provisioner/01-roles.yaml b/examples/gateway-provisioner/01-roles.yaml index 178405f701d..b43bba73e28 100644 --- a/examples/gateway-provisioner/01-roles.yaml +++ b/examples/gateway-provisioner/01-roles.yaml @@ -59,6 +59,14 @@ rules: - create - get - update +- apiGroups: + - discovery.k8s.io + resources: + - endpointslices + verbs: + - get + - list + - watch - apiGroups: - gateway.networking.k8s.io resources: diff --git a/examples/render/contour-deployment.yaml b/examples/render/contour-deployment.yaml index 52f38741ac6..2020135a384 100644 --- a/examples/render/contour-deployment.yaml +++ b/examples/render/contour-deployment.yaml @@ -690,6 +690,14 @@ spec: type: string type: object type: object + featureFlags: + description: 'FeatureFlags defines toggle to enable new contour features. + Available toggles are: useEndpointSlices - configures contour to + fetch endpoint data from k8s endpoint slices. defaults to false + and reading endpoint data from the k8s endpoints.' + items: + type: string + type: array gateway: description: Gateway contains parameters for the gateway-api Gateway that Contour is configured to serve traffic. @@ -4164,6 +4172,14 @@ spec: type: string type: object type: object + featureFlags: + description: 'FeatureFlags defines toggle to enable new contour + features. Available toggles are: useEndpointSlices - configures + contour to fetch endpoint data from k8s endpoint slices. defaults + to false and reading endpoint data from the k8s endpoints.' + items: + type: string + type: array gateway: description: Gateway contains parameters for the gateway-api Gateway that Contour is configured to serve traffic. @@ -8435,6 +8451,14 @@ rules: - get - list - watch +- apiGroups: + - discovery.k8s.io + resources: + - endpointslices + verbs: + - get + - list + - watch - apiGroups: - gateway.networking.k8s.io resources: diff --git a/examples/render/contour-gateway-provisioner.yaml b/examples/render/contour-gateway-provisioner.yaml index 7bc48cb9ae8..4dc8215d1cb 100644 --- a/examples/render/contour-gateway-provisioner.yaml +++ b/examples/render/contour-gateway-provisioner.yaml @@ -482,6 +482,14 @@ spec: type: string type: object type: object + featureFlags: + description: 'FeatureFlags defines toggle to enable new contour features. + Available toggles are: useEndpointSlices - configures contour to + fetch endpoint data from k8s endpoint slices. defaults to false + and reading endpoint data from the k8s endpoints.' + items: + type: string + type: array gateway: description: Gateway contains parameters for the gateway-api Gateway that Contour is configured to serve traffic. @@ -3956,6 +3964,14 @@ spec: type: string type: object type: object + featureFlags: + description: 'FeatureFlags defines toggle to enable new contour + features. Available toggles are: useEndpointSlices - configures + contour to fetch endpoint data from k8s endpoint slices. defaults + to false and reading endpoint data from the k8s endpoints.' + items: + type: string + type: array gateway: description: Gateway contains parameters for the gateway-api Gateway that Contour is configured to serve traffic. @@ -18997,6 +19013,14 @@ rules: - create - get - update +- apiGroups: + - discovery.k8s.io + resources: + - endpointslices + verbs: + - get + - list + - watch - apiGroups: - gateway.networking.k8s.io resources: diff --git a/examples/render/contour-gateway.yaml b/examples/render/contour-gateway.yaml index 7c04958013c..089e75b887d 100644 --- a/examples/render/contour-gateway.yaml +++ b/examples/render/contour-gateway.yaml @@ -693,6 +693,14 @@ spec: type: string type: object type: object + featureFlags: + description: 'FeatureFlags defines toggle to enable new contour features. + Available toggles are: useEndpointSlices - configures contour to + fetch endpoint data from k8s endpoint slices. defaults to false + and reading endpoint data from the k8s endpoints.' + items: + type: string + type: array gateway: description: Gateway contains parameters for the gateway-api Gateway that Contour is configured to serve traffic. @@ -4167,6 +4175,14 @@ spec: type: string type: object type: object + featureFlags: + description: 'FeatureFlags defines toggle to enable new contour + features. Available toggles are: useEndpointSlices - configures + contour to fetch endpoint data from k8s endpoint slices. defaults + to false and reading endpoint data from the k8s endpoints.' + items: + type: string + type: array gateway: description: Gateway contains parameters for the gateway-api Gateway that Contour is configured to serve traffic. @@ -8438,6 +8454,14 @@ rules: - get - list - watch +- apiGroups: + - discovery.k8s.io + resources: + - endpointslices + verbs: + - get + - list + - watch - apiGroups: - gateway.networking.k8s.io resources: diff --git a/examples/render/contour.yaml b/examples/render/contour.yaml index 53ca1b2b248..3f24568201b 100644 --- a/examples/render/contour.yaml +++ b/examples/render/contour.yaml @@ -690,6 +690,14 @@ spec: type: string type: object type: object + featureFlags: + description: 'FeatureFlags defines toggle to enable new contour features. + Available toggles are: useEndpointSlices - configures contour to + fetch endpoint data from k8s endpoint slices. defaults to false + and reading endpoint data from the k8s endpoints.' + items: + type: string + type: array gateway: description: Gateway contains parameters for the gateway-api Gateway that Contour is configured to serve traffic. @@ -4164,6 +4172,14 @@ spec: type: string type: object type: object + featureFlags: + description: 'FeatureFlags defines toggle to enable new contour + features. Available toggles are: useEndpointSlices - configures + contour to fetch endpoint data from k8s endpoint slices. defaults + to false and reading endpoint data from the k8s endpoints.' + items: + type: string + type: array gateway: description: Gateway contains parameters for the gateway-api Gateway that Contour is configured to serve traffic. @@ -8435,6 +8451,14 @@ rules: - get - list - watch +- apiGroups: + - discovery.k8s.io + resources: + - endpointslices + verbs: + - get + - list + - watch - apiGroups: - gateway.networking.k8s.io resources: diff --git a/internal/k8s/rbac.go b/internal/k8s/rbac.go index f61ead80180..2eccadecc89 100644 --- a/internal/k8s/rbac.go +++ b/internal/k8s/rbac.go @@ -27,3 +27,6 @@ package k8s // Add RBAC policy to support leader election. // +kubebuilder:rbac:groups="",resources=events,verbs=create;get;update,namespace=projectcontour // +kubebuilder:rbac:groups="coordination.k8s.io",resources=leases,verbs=create;get;update,namespace=projectcontour + +// Add RBAC policy for endpoint slices +// +kubebuilder:rbac:groups="discovery.k8s.io",resources=endpointslices,verbs=list;get;watch diff --git a/internal/provisioner/objects/rbac/clusterrole/cluster_role.go b/internal/provisioner/objects/rbac/clusterrole/cluster_role.go index 3eeb3457048..4641a2af794 100644 --- a/internal/provisioner/objects/rbac/clusterrole/cluster_role.go +++ b/internal/provisioner/objects/rbac/clusterrole/cluster_role.go @@ -22,6 +22,7 @@ import ( "github.com/projectcontour/contour/internal/provisioner/model" "github.com/projectcontour/contour/internal/provisioner/objects" corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" networkingv1 "k8s.io/api/networking/v1" rbacv1 "k8s.io/api/rbac/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -75,6 +76,9 @@ func desiredClusterRole(name string, contour *model.Contour) *rbacv1.ClusterRole // Core Contour-watched resources. policyRuleFor(corev1.GroupName, getListWatch, "secrets", "endpoints", "services", "namespaces"), + // Discovery Contour-watched resources. + policyRuleFor(discoveryv1.GroupName, getListWatch, "endpointslices"), + // Gateway API resources. // Note, ReferenceGrant does not currently have a .status field so it's omitted from the status rule. policyRuleFor(gatewayv1alpha2.GroupName, getListWatch, "gatewayclasses", "gateways", "httproutes", "tlsroutes", "grpcroutes", "tcproutes", "referencegrants"), diff --git a/internal/xdscache/v3/contour_test.go b/internal/xdscache/v3/contour_test.go index 6619ff94fdf..60d37c6152f 100644 --- a/internal/xdscache/v3/contour_test.go +++ b/internal/xdscache/v3/contour_test.go @@ -15,6 +15,7 @@ package v3 import ( v1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -35,3 +36,19 @@ func addresses(ips ...string) []v1.EndpointAddress { } return addrs } + +func endpointSlice(ns, name, service string, addressType discoveryv1.AddressType, endpoints []discoveryv1.Endpoint, ports []discoveryv1.EndpointPort) *discoveryv1.EndpointSlice { + return &discoveryv1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: ns, + Labels: map[string]string{ + discoveryv1.LabelServiceName: service, + }, + }, + + AddressType: addressType, + Endpoints: endpoints, + Ports: ports, + } +} diff --git a/internal/xdscache/v3/endpointslicetranslator.go b/internal/xdscache/v3/endpointslicetranslator.go new file mode 100644 index 00000000000..f930fc42c00 --- /dev/null +++ b/internal/xdscache/v3/endpointslicetranslator.go @@ -0,0 +1,465 @@ +// Copyright Project Contour Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package v3 + +import ( + "fmt" + "sort" + "sync" + + envoy_endpoint_v3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" + resource "github.com/envoyproxy/go-control-plane/pkg/resource/v3" + "github.com/projectcontour/contour/internal/contour" + "github.com/projectcontour/contour/internal/dag" + envoy_v3 "github.com/projectcontour/contour/internal/envoy/v3" + "github.com/projectcontour/contour/internal/k8s" + "github.com/projectcontour/contour/internal/protobuf" + "github.com/projectcontour/contour/internal/sorter" + "github.com/sirupsen/logrus" + "google.golang.org/protobuf/proto" + v1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/cache" +) + +// RecalculateEndpoints generates a slice of LoadBalancingEndpoint +// resources by matching the given service port to the given discoveryv1.EndpointSlice. +// endpointSliceMap may be nil, in which case, the result is also nil. +func (c *EndpointSliceCache) RecalculateEndpoints(port, healthPort v1.ServicePort, endpointSliceMap map[string]*discoveryv1.EndpointSlice) []*LoadBalancingEndpoint { + var lb []*LoadBalancingEndpoint + uniqueEndpoints := make(map[string]struct{}, 0) + var healthCheckPort int32 + + for _, endpointSlice := range endpointSliceMap { + sort.Slice(endpointSlice.Endpoints, func(i, j int) bool { + return endpointSlice.Endpoints[i].Addresses[0] < endpointSlice.Endpoints[j].Addresses[0] + }) + + for _, endpoint := range endpointSlice.Endpoints { + // Skip if the endpointSlice is not marked as ready. + if endpoint.Conditions.Ready != nil && !*endpoint.Conditions.Ready { + continue + } + + // Range over each port. We want the resultant endpoints to be a + // a cartesian product MxN where M are the endpoints and N are the ports. + for _, p := range endpointSlice.Ports { + // Nil check for the port. + if p.Port == nil { + continue + } + + if p.Protocol == nil || (healthPort.Protocol != *p.Protocol || port.Protocol != *p.Protocol) && *p.Protocol != v1.ProtocolTCP { + continue + } + + // Set healthCheckPort only when port and healthPort are different. + if p.Name != nil && (healthPort.Name != "" && healthPort.Name == *p.Name && port.Name != healthPort.Name) { + healthCheckPort = *p.Port + } + + // Match by port name. + if port.Name != "" && p.Name != nil && port.Name != *p.Name { + continue + } + + // we can safely take the first element here. + // Refer k8s API description: + // The contents of this field are interpreted according to + // the corresponding EndpointSlice addressType field. + // Consumers must handle different types of addresses in the context + // of their own capabilities. This must contain at least one + // address but no more than 100. These are all assumed to be fungible + // and clients may choose to only use the first element. + // Refer to: https://issue.k8s.io/106267 + addr := envoy_v3.SocketAddress(endpoint.Addresses[0], int(*p.Port)) + + // as per note on https://kubernetes.io/docs/concepts/services-networking/endpoint-slices/ + // Clients of the EndpointSlice API must iterate through all the existing EndpointSlices associated to + // a Service and build a complete list of unique network endpoints. It is important to mention that + // endpoints may be duplicated in different EndpointSlices. + // Hence, we need to ensure that the endpoints we add to []*LoadBalancingEndpoint aren't duplicated. + endpointKey := fmt.Sprintf("%s:%d", endpoint.Addresses[0], *p.Port) + if _, exists := uniqueEndpoints[endpointKey]; !exists { + lb = append(lb, envoy_v3.LBEndpoint(addr)) + uniqueEndpoints[endpointKey] = struct{}{} + } + } + } + } + + if healthCheckPort > 0 { + for _, lbEndpoint := range lb { + lbEndpoint.GetEndpoint().HealthCheckConfig = envoy_v3.HealthCheckConfig(healthCheckPort) + } + } + + return lb +} + +// EndpointSliceCache is a cache of EndpointSlice and ServiceCluster objects. +type EndpointSliceCache struct { + mu sync.Mutex // Protects all fields. + + // Slice of stale clusters. A stale cluster is one that + // needs to be recalculated. Clusters can be added to the stale + // slice due to changes in EndpointSlices or due to a DAG rebuild. + stale []*dag.ServiceCluster + + // Index of ServiceClusters. ServiceClusters are indexed + // by the name of their Kubernetes Services. This makes it + // easy to determine which Endpoints affect which ServiceCluster. + services map[types.NamespacedName][]*dag.ServiceCluster + + // Cache of endpointsSlices, indexed by Namespaced name of the associated service. + // the Inner map is a map[k,v] where k is the endpoint slice name and v is the + // endpoint slice itself. + endpointSlices map[types.NamespacedName]map[string]*discoveryv1.EndpointSlice +} + +// Recalculate regenerates all the ClusterLoadAssignments from the +// cached EndpointSlices and stale ServiceClusters. A ClusterLoadAssignment +// will be generated for every stale ServerCluster, however, if there +// are no endpointSlices for the Services in the ServiceCluster, the +// ClusterLoadAssignment will be empty. +func (c *EndpointSliceCache) Recalculate() map[string]*envoy_endpoint_v3.ClusterLoadAssignment { + c.mu.Lock() + defer c.mu.Unlock() + + assignments := map[string]*envoy_endpoint_v3.ClusterLoadAssignment{} + for _, cluster := range c.stale { + // Clusters can be in the stale list multiple times; + // skip to avoid duplicate recalculations. + if _, ok := assignments[cluster.ClusterName]; ok { + continue + } + + cla := envoy_endpoint_v3.ClusterLoadAssignment{ + ClusterName: cluster.ClusterName, + Endpoints: nil, + Policy: nil, + } + + // Look up each service, and if we have endpointSlice for that service, + // attach them as a new LocalityEndpoints resource. + for _, w := range cluster.Services { + n := types.NamespacedName{Namespace: w.ServiceNamespace, Name: w.ServiceName} + if lb := c.RecalculateEndpoints(w.ServicePort, w.HealthPort, c.endpointSlices[n]); lb != nil { + // Append the new set of endpoints. Users are allowed to set the load + // balancing weight to 0, which we reflect to Envoy as nil in order to + // assign no load to that locality. + cla.Endpoints = append( + cla.Endpoints, + &LocalityEndpoints{ + LbEndpoints: lb, + LoadBalancingWeight: protobuf.UInt32OrNil(w.Weight), + }, + ) + } + } + + assignments[cla.ClusterName] = &cla + } + + c.stale = nil + return assignments +} + +// SetClusters replaces the cache of ServiceCluster resources. All +// the added clusters will be marked stale. +func (c *EndpointSliceCache) SetClusters(clusters []*dag.ServiceCluster) error { + c.mu.Lock() + defer c.mu.Unlock() + + // Keep a local index to start with so that errors don't cause + // partial failure. + serviceIndex := map[types.NamespacedName][]*dag.ServiceCluster{} + + // Reindex the cluster so that we can find them by service name. + for _, cluster := range clusters { + if err := cluster.Validate(); err != nil { + return fmt.Errorf("invalid ServiceCluster %q: %w", cluster.ClusterName, err) + } + + // Make sure service clusters with default weights are balanced. + cluster.Rebalance() + + for _, s := range cluster.Services { + name := types.NamespacedName{ + Namespace: s.ServiceNamespace, + Name: s.ServiceName, + } + + // Create the slice entry if we have not indexed this service yet. + entry := serviceIndex[name] + if entry == nil { + entry = []*dag.ServiceCluster{} + } + + serviceIndex[name] = append(entry, cluster) + } + } + + c.stale = clusters + c.services = serviceIndex + + return nil +} + +// UpdateEndpointSlice adds endpointSlice to the cache, or replaces it if it is +// already cached. Any ServiceClusters that are backed by a Service +// that endpointSlice belongs become stale. Returns a boolean indicating whether +// any ServiceClusters use endpointSlice or not. +func (c *EndpointSliceCache) UpdateEndpointSlice(endpointSlice *discoveryv1.EndpointSlice) bool { + c.mu.Lock() + defer c.mu.Unlock() + + name := types.NamespacedName{Namespace: endpointSlice.Namespace, Name: endpointSlice.Labels[discoveryv1.LabelServiceName]} + + if c.endpointSlices[name] == nil { + c.endpointSlices[name] = make(map[string]*discoveryv1.EndpointSlice) + } + c.endpointSlices[name][endpointSlice.Name] = endpointSlice.DeepCopy() + + // If any service clusters include this endpointSlice, mark them + // all as stale. + if affected := c.services[name]; len(affected) > 0 { + c.stale = append(c.stale, affected...) + return true + } + + return false +} + +// DeleteEndpointSlice deletes endpointSlice from the cache. Any ServiceClusters +// that are backed by a Service that endpointSlice belongs to, become stale. Returns +// a boolean indicating whether any ServiceClusters use endpointSlice or not. +func (c *EndpointSliceCache) DeleteEndpointSlice(endpointSlice *discoveryv1.EndpointSlice) bool { + c.mu.Lock() + defer c.mu.Unlock() + + name := types.NamespacedName{Namespace: endpointSlice.Namespace, Name: endpointSlice.Labels[discoveryv1.LabelServiceName]} + delete(c.endpointSlices[name], endpointSlice.Name) + + // If any service clusters include this endpointSlice, mark them + // all as stale. + if affected := c.services[name]; len(affected) > 0 { + c.stale = append(c.stale, affected...) + return true + } + + return false +} + +// NewEndpointSliceTranslator allocates a new endpointsSlice translator. +func NewEndpointSliceTranslator(log logrus.FieldLogger) *EndpointSliceTranslator { + return &EndpointSliceTranslator{ + Cond: contour.Cond{}, + FieldLogger: log, + entries: map[string]*envoy_endpoint_v3.ClusterLoadAssignment{}, + cache: EndpointSliceCache{ + stale: nil, + services: map[types.NamespacedName][]*dag.ServiceCluster{}, + endpointSlices: map[types.NamespacedName]map[string]*discoveryv1.EndpointSlice{}, + }, + } +} + +// A EndpointsSliceTranslator translates Kubernetes EndpointSlice objects into Envoy +// ClusterLoadAssignment resources. +type EndpointSliceTranslator struct { + // Observer notifies when the endpointSlice cache has been updated. + Observer contour.Observer + + contour.Cond + logrus.FieldLogger + + cache EndpointSliceCache + + mu sync.Mutex // Protects entries. + entries map[string]*envoy_endpoint_v3.ClusterLoadAssignment +} + +// Merge combines the given entries with the existing entries in the +// EndpointSliceTranslator. If the same key exists in both maps, an existing entry +// is replaced. +func (e *EndpointSliceTranslator) Merge(entries map[string]*envoy_endpoint_v3.ClusterLoadAssignment) { + e.mu.Lock() + defer e.mu.Unlock() + + for k, v := range entries { + e.entries[k] = v + } +} + +// OnChange observes DAG rebuild events. +func (e *EndpointSliceTranslator) OnChange(root *dag.DAG) { + clusters := []*dag.ServiceCluster{} + names := map[string]bool{} + + for _, svc := range root.GetServiceClusters() { + if err := svc.Validate(); err != nil { + e.WithError(err).Errorf("dropping invalid service cluster %q", svc.ClusterName) + } else if _, ok := names[svc.ClusterName]; ok { + e.Debugf("dropping service cluster with duplicate name %q", svc.ClusterName) + } else { + e.Debugf("added ServiceCluster %q from DAG", svc.ClusterName) + clusters = append(clusters, svc.DeepCopy()) + names[svc.ClusterName] = true + } + } + + // Update the cache with the new clusters. + if err := e.cache.SetClusters(clusters); err != nil { + e.WithError(err).Error("failed to cache service clusters") + } + + // After rebuilding the DAG, the service cluster could be + // completely different. Some could be added, and some could + // be removed. Since we reset the cluster cache above, all + // the load assignments will be recalculated and we can just + // set the entries rather than merging them. + entries := e.cache.Recalculate() + + // Only update and notify if entries has changed. + changed := false + + e.mu.Lock() + if !equal(e.entries, entries) { + e.entries = entries + changed = true + } + e.mu.Unlock() + + if changed { + e.Debug("cluster load assignments changed, notifying waiters") + e.Notify() + } else { + e.Debug("cluster load assignments did not change") + } +} + +func (e *EndpointSliceTranslator) OnAdd(obj any, isInInitialList bool) { + switch obj := obj.(type) { + case *discoveryv1.EndpointSlice: + if !e.cache.UpdateEndpointSlice(obj) { + return + } + + e.WithField("endpointSlice", k8s.NamespacedNameOf(obj)).Debug("EndpointSlice is in use by a ServiceCluster, recalculating ClusterLoadAssignments") + e.Merge(e.cache.Recalculate()) + e.Notify() + if e.Observer != nil { + e.Observer.Refresh() + } + default: + e.Errorf("OnAdd unexpected type %T: %#v", obj, obj) + } +} + +func (e *EndpointSliceTranslator) OnUpdate(oldObj, newObj any) { + switch newObj := newObj.(type) { + case *discoveryv1.EndpointSlice: + oldObj, ok := oldObj.(*discoveryv1.EndpointSlice) + if !ok { + e.Errorf("OnUpdate endpointSlice %#v received invalid oldObj %T; %#v", newObj, oldObj, oldObj) + return + } + + // Skip computation if either old and new services or + // endpointSlice are equal (thus also handling nil). + if oldObj == newObj { + return + } + + // If there are no endpointSlice in this object, and the old + // object also had zero endpointSlice, ignore this update + // to avoid sending a noop notification to watchers. + if len(oldObj.Endpoints) == 0 && len(newObj.Endpoints) == 0 { + return + } + + if !e.cache.UpdateEndpointSlice(newObj) { + return + } + + e.WithField("endpointSlice", k8s.NamespacedNameOf(newObj)).Debug("EndpointSlice is in use by a ServiceCluster, recalculating ClusterLoadAssignments") + e.Merge(e.cache.Recalculate()) + e.Notify() + if e.Observer != nil { + e.Observer.Refresh() + } + default: + e.Errorf("OnUpdate unexpected type %T: %#v", newObj, newObj) + } +} + +func (e *EndpointSliceTranslator) OnDelete(obj any) { + switch obj := obj.(type) { + case *discoveryv1.EndpointSlice: + if !e.cache.DeleteEndpointSlice(obj) { + return + } + + e.WithField("endpointSlice", k8s.NamespacedNameOf(obj)).Debug("EndpointSlice was in use by a ServiceCluster, recalculating ClusterLoadAssignments") + e.Merge(e.cache.Recalculate()) + e.Notify() + if e.Observer != nil { + e.Observer.Refresh() + } + case cache.DeletedFinalStateUnknown: + e.OnDelete(obj.Obj) // recurse into ourselves with the tombstoned value + default: + e.Errorf("OnDelete unexpected type %T: %#v", obj, obj) + } +} + +// Contents returns a copy of the contents of the cache. +func (e *EndpointSliceTranslator) Contents() []proto.Message { + e.mu.Lock() + defer e.mu.Unlock() + + values := make([]*envoy_endpoint_v3.ClusterLoadAssignment, 0, len(e.entries)) + for _, v := range e.entries { + values = append(values, v) + } + + sort.Stable(sorter.For(values)) + return protobuf.AsMessages(values) +} + +func (e *EndpointSliceTranslator) Query(names []string) []proto.Message { + e.mu.Lock() + defer e.mu.Unlock() + + values := make([]*envoy_endpoint_v3.ClusterLoadAssignment, 0, len(names)) + for _, n := range names { + v, ok := e.entries[n] + if !ok { + e.Debugf("no cache entry for %q", n) + v = &envoy_endpoint_v3.ClusterLoadAssignment{ + ClusterName: n, + } + } + values = append(values, v) + } + + sort.Stable(sorter.For(values)) + return protobuf.AsMessages(values) +} + +func (*EndpointSliceTranslator) TypeURL() string { return resource.EndpointType } + +func (e *EndpointSliceTranslator) SetObserver(observer contour.Observer) { e.Observer = observer } diff --git a/internal/xdscache/v3/endpointslicetranslator_test.go b/internal/xdscache/v3/endpointslicetranslator_test.go new file mode 100644 index 00000000000..8eabc16ccc5 --- /dev/null +++ b/internal/xdscache/v3/endpointslicetranslator_test.go @@ -0,0 +1,1198 @@ +// Copyright Project Contour Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package v3 + +import ( + "testing" + + envoy_endpoint_v3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" + "github.com/projectcontour/contour/internal/dag" + envoy_v3 "github.com/projectcontour/contour/internal/envoy/v3" + "github.com/projectcontour/contour/internal/fixture" + "github.com/projectcontour/contour/internal/protobuf" + "github.com/projectcontour/contour/internal/ref" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" + v1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" +) + +func TestEndpointSliceTranslatorContents(t *testing.T) { + tests := map[string]struct { + contents map[string]*envoy_endpoint_v3.ClusterLoadAssignment + want []proto.Message + }{ + "empty": { + contents: nil, + want: nil, + }, + "simple": { + contents: clusterloadassignments( + envoy_v3.ClusterLoadAssignment("default/httpbin-org", + envoy_v3.SocketAddress("10.10.10.10", 80), + ), + ), + want: []proto.Message{ + envoy_v3.ClusterLoadAssignment("default/httpbin-org", + envoy_v3.SocketAddress("10.10.10.10", 80), + ), + }, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + endpointSliceTranslator := NewEndpointSliceTranslator(fixture.NewTestLogger(t)) + endpointSliceTranslator.entries = tc.contents + got := endpointSliceTranslator.Contents() + protobuf.ExpectEqual(t, tc.want, got) + }) + } +} + +func TestEndpointSliceCacheQuery(t *testing.T) { + tests := map[string]struct { + contents map[string]*envoy_endpoint_v3.ClusterLoadAssignment + query []string + want []proto.Message + }{ + "exact match": { + contents: clusterloadassignments( + envoy_v3.ClusterLoadAssignment("default/httpbin-org", + envoy_v3.SocketAddress("10.10.10.10", 80), + ), + ), + query: []string{"default/httpbin-org"}, + want: []proto.Message{ + envoy_v3.ClusterLoadAssignment("default/httpbin-org", + envoy_v3.SocketAddress("10.10.10.10", 80), + ), + }, + }, + "partial match": { + contents: clusterloadassignments( + envoy_v3.ClusterLoadAssignment("default/httpbin-org", + envoy_v3.SocketAddress("10.10.10.10", 80), + ), + ), + query: []string{"default/kuard/8080", "default/httpbin-org"}, + want: []proto.Message{ + envoy_v3.ClusterLoadAssignment("default/httpbin-org", + envoy_v3.SocketAddress("10.10.10.10", 80), + ), + envoy_v3.ClusterLoadAssignment("default/kuard/8080"), + }, + }, + "no match": { + contents: clusterloadassignments( + envoy_v3.ClusterLoadAssignment("default/httpbin-org", + envoy_v3.SocketAddress("10.10.10.10", 80), + ), + ), + query: []string{"default/kuard/8080"}, + want: []proto.Message{ + envoy_v3.ClusterLoadAssignment("default/kuard/8080"), + }, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + endpointSliceTranslator := NewEndpointSliceTranslator(fixture.NewTestLogger(t)) + endpointSliceTranslator.entries = tc.contents + got := endpointSliceTranslator.Query(tc.query) + protobuf.ExpectEqual(t, tc.want, got) + }) + } +} + +func TestEndpointSliceTranslatorAddEndpoints(t *testing.T) { + clusters := []*dag.ServiceCluster{ + { + ClusterName: "default/httpbin-org/a", + Services: []dag.WeightedService{ + { + Weight: 1, + ServiceName: "httpbin-org", + ServiceNamespace: "default", + ServicePort: v1.ServicePort{Name: "a"}, + }, + }, + }, + { + ClusterName: "default/httpbin-org/b", + Services: []dag.WeightedService{ + { + Weight: 1, + ServiceName: "httpbin-org", + ServiceNamespace: "default", + ServicePort: v1.ServicePort{Name: "b"}, + }, + }, + }, + { + ClusterName: "default/simple", + Services: []dag.WeightedService{ + { + Weight: 1, + ServiceName: "simple", + ServiceNamespace: "default", + ServicePort: v1.ServicePort{}, + }, + }, + }, + { + ClusterName: "default/healthcheck-port", + Services: []dag.WeightedService{ + { + Weight: 1, + ServiceName: "healthcheck-port", + ServiceNamespace: "default", + ServicePort: v1.ServicePort{Name: "a"}, + HealthPort: v1.ServicePort{Name: "health", Port: 8998}, + }, + }, + }, + } + + tests := map[string]struct { + endpointSlice *discoveryv1.EndpointSlice + want []proto.Message + wantUpdate bool + }{ + "simple": { + endpointSlice: endpointSlice("default", "simple-eps-fs9du", "simple", discoveryv1.AddressTypeIPv4, []discoveryv1.Endpoint{ + { + Addresses: []string{"192.168.183.24"}, + }, + }, []discoveryv1.EndpointPort{ + { + Port: ref.To[int32](8080), + Protocol: ref.To[v1.Protocol]("TCP"), + }, + }, + ), + want: []proto.Message{ + &envoy_endpoint_v3.ClusterLoadAssignment{ClusterName: "default/healthcheck-port"}, + &envoy_endpoint_v3.ClusterLoadAssignment{ClusterName: "default/httpbin-org/a"}, + &envoy_endpoint_v3.ClusterLoadAssignment{ClusterName: "default/httpbin-org/b"}, + &envoy_endpoint_v3.ClusterLoadAssignment{ + ClusterName: "default/simple", + Endpoints: envoy_v3.WeightedEndpoints(1, envoy_v3.SocketAddress("192.168.183.24", 8080)), + }, + }, + wantUpdate: true, + }, + "adding an endpoint slice not used by a cluster should not trigger a calculation": { + endpointSlice: endpointSlice("default", "not-used-eps-sdf8s", "not-used-endpoint", discoveryv1.AddressTypeIPv4, []discoveryv1.Endpoint{ + { + Addresses: []string{"192.168.183.24"}, + }, + }, []discoveryv1.EndpointPort{ + { + Port: ref.To[int32](8080), + Protocol: ref.To[v1.Protocol]("TCP"), + }, + }, + ), + want: nil, + wantUpdate: false, + }, + "single slice, multiple addresses": { + endpointSlice: endpointSlice("default", "simple-eps-fs9du", "simple", discoveryv1.AddressTypeIPv4, []discoveryv1.Endpoint{ + { + Addresses: []string{ + "50.17.206.192", + }, + }, + }, []discoveryv1.EndpointPort{ + { + Port: ref.To[int32](80), + Protocol: ref.To[v1.Protocol]("TCP"), + }, + }, + ), + want: []proto.Message{ + &envoy_endpoint_v3.ClusterLoadAssignment{ClusterName: "default/healthcheck-port"}, + &envoy_endpoint_v3.ClusterLoadAssignment{ClusterName: "default/httpbin-org/a"}, + &envoy_endpoint_v3.ClusterLoadAssignment{ClusterName: "default/httpbin-org/b"}, + &envoy_endpoint_v3.ClusterLoadAssignment{ + ClusterName: "default/simple", + Endpoints: envoy_v3.WeightedEndpoints(1, + envoy_v3.SocketAddress("50.17.206.192", 80), + ), + }, + }, + wantUpdate: true, + }, + "multiple slices": { + endpointSlice: endpointSlice("default", "simple-eps-fs9du", "simple", discoveryv1.AddressTypeIPv4, []discoveryv1.Endpoint{ + { + Addresses: []string{ + "50.17.206.192", + }, + }, + { + Addresses: []string{ + "23.23.247.89", + }, + }, + { + Addresses: []string{ + "50.17.192.147", + }, + }, + { + Addresses: []string{ + "50.19.99.160", + }, + }, + }, []discoveryv1.EndpointPort{ + { + Port: ref.To[int32](80), + Protocol: ref.To[v1.Protocol]("TCP"), + }, + }, + ), + want: []proto.Message{ + &envoy_endpoint_v3.ClusterLoadAssignment{ClusterName: "default/healthcheck-port"}, + &envoy_endpoint_v3.ClusterLoadAssignment{ClusterName: "default/httpbin-org/a"}, + &envoy_endpoint_v3.ClusterLoadAssignment{ClusterName: "default/httpbin-org/b"}, + &envoy_endpoint_v3.ClusterLoadAssignment{ + ClusterName: "default/simple", + Endpoints: envoy_v3.WeightedEndpoints(1, + envoy_v3.SocketAddress("23.23.247.89", 80), // addresses should be sorted + envoy_v3.SocketAddress("50.17.192.147", 80), + envoy_v3.SocketAddress("50.17.206.192", 80), + envoy_v3.SocketAddress("50.19.99.160", 80), + ), + }, + }, + wantUpdate: true, + }, + "multiple ports": { + endpointSlice: endpointSlice("default", "httpbin-org-s9d8f", "httpbin-org", discoveryv1.AddressTypeIPv4, []discoveryv1.Endpoint{ + { + Addresses: []string{ + "10.10.1.1", + }, + }, + }, []discoveryv1.EndpointPort{ + { + Name: ref.To[string]("a"), + Port: ref.To[int32](8675), + Protocol: ref.To[v1.Protocol]("TCP"), + }, + { + Name: ref.To[string]("b"), + Port: ref.To[int32](309), + Protocol: ref.To[v1.Protocol]("TCP"), + }, + }, + ), + want: []proto.Message{ + &envoy_endpoint_v3.ClusterLoadAssignment{ClusterName: "default/healthcheck-port"}, + // Results should be sorted by cluster name. + &envoy_endpoint_v3.ClusterLoadAssignment{ + ClusterName: "default/httpbin-org/a", + Endpoints: envoy_v3.WeightedEndpoints(1, envoy_v3.SocketAddress("10.10.1.1", 8675)), + }, + &envoy_endpoint_v3.ClusterLoadAssignment{ + ClusterName: "default/httpbin-org/b", + Endpoints: envoy_v3.WeightedEndpoints(1, envoy_v3.SocketAddress("10.10.1.1", 309)), + }, + &envoy_endpoint_v3.ClusterLoadAssignment{ClusterName: "default/simple"}, + }, + wantUpdate: true, + }, + "cartesian product": { + endpointSlice: endpointSlice("default", "httpbin-org-s9d8f", "httpbin-org", discoveryv1.AddressTypeIPv4, []discoveryv1.Endpoint{ + { + Addresses: []string{ + "10.10.1.1", + }, + }, + { + Addresses: []string{ + "10.10.2.2", + }, + }, + }, []discoveryv1.EndpointPort{ + { + Name: ref.To[string]("a"), + Port: ref.To[int32](8675), + Protocol: ref.To[v1.Protocol]("TCP"), + }, + { + Name: ref.To[string]("b"), + Port: ref.To[int32](309), + Protocol: ref.To[v1.Protocol]("TCP"), + }, + }, + ), + want: []proto.Message{ + &envoy_endpoint_v3.ClusterLoadAssignment{ClusterName: "default/healthcheck-port"}, + &envoy_endpoint_v3.ClusterLoadAssignment{ + ClusterName: "default/httpbin-org/a", + Endpoints: envoy_v3.WeightedEndpoints(1, + envoy_v3.SocketAddress("10.10.1.1", 8675), // addresses should be sorted + envoy_v3.SocketAddress("10.10.2.2", 8675), + ), + }, + &envoy_endpoint_v3.ClusterLoadAssignment{ + ClusterName: "default/httpbin-org/b", + Endpoints: envoy_v3.WeightedEndpoints(1, + envoy_v3.SocketAddress("10.10.1.1", 309), + envoy_v3.SocketAddress("10.10.2.2", 309), + ), + }, + &envoy_endpoint_v3.ClusterLoadAssignment{ClusterName: "default/simple"}, + }, + wantUpdate: true, + }, + "not ready": { + endpointSlice: endpointSlice("default", "httpbin-org-s9d8f", "httpbin-org", discoveryv1.AddressTypeIPv4, []discoveryv1.Endpoint{ + { + Addresses: []string{ + "10.10.1.1", + }, + Conditions: discoveryv1.EndpointConditions{ + Ready: ref.To[bool](false), + }, + }, + { + Addresses: []string{ + "10.10.2.2", + }, + Conditions: discoveryv1.EndpointConditions{ + Ready: ref.To[bool](true), + }, + }, + }, []discoveryv1.EndpointPort{ + { + Name: ref.To[string]("a"), + Port: ref.To[int32](8675), + Protocol: ref.To[v1.Protocol]("TCP"), + }, + { + Name: ref.To[string]("b"), + Port: ref.To[int32](309), + Protocol: ref.To[v1.Protocol]("TCP"), + }, + }, + ), + want: []proto.Message{ + &envoy_endpoint_v3.ClusterLoadAssignment{ClusterName: "default/healthcheck-port"}, + &envoy_endpoint_v3.ClusterLoadAssignment{ + ClusterName: "default/httpbin-org/a", + Endpoints: envoy_v3.WeightedEndpoints(1, + envoy_v3.SocketAddress("10.10.2.2", 8675), + ), + }, + &envoy_endpoint_v3.ClusterLoadAssignment{ + ClusterName: "default/httpbin-org/b", + Endpoints: envoy_v3.WeightedEndpoints(1, + envoy_v3.SocketAddress("10.10.2.2", 309), + ), + }, + &envoy_endpoint_v3.ClusterLoadAssignment{ClusterName: "default/simple"}, + }, + wantUpdate: true, + }, + "health port": { + endpointSlice: endpointSlice("default", "healthcheck-port-s9d8f", "healthcheck-port", discoveryv1.AddressTypeIPv4, []discoveryv1.Endpoint{ + { + Addresses: []string{ + "10.10.1.1", + }, + }, + }, []discoveryv1.EndpointPort{ + { + Name: ref.To[string]("health"), + Port: ref.To[int32](8998), + Protocol: ref.To[v1.Protocol]("TCP"), + }, + { + Name: ref.To[string]("a"), + Port: ref.To[int32](309), + Protocol: ref.To[v1.Protocol]("TCP"), + }, + }, + ), + want: []proto.Message{ + &envoy_endpoint_v3.ClusterLoadAssignment{ + ClusterName: "default/healthcheck-port", + Endpoints: weightedHealthcheckEndpoints(1, 8998, + envoy_v3.SocketAddress("10.10.1.1", 309), + ), + }, + &envoy_endpoint_v3.ClusterLoadAssignment{ClusterName: "default/httpbin-org/a"}, + &envoy_endpoint_v3.ClusterLoadAssignment{ClusterName: "default/httpbin-org/b"}, + &envoy_endpoint_v3.ClusterLoadAssignment{ClusterName: "default/simple"}, + }, + wantUpdate: true, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + endpointSliceTranslator := NewEndpointSliceTranslator(fixture.NewTestLogger(t)) + observer := &simpleObserver{} + endpointSliceTranslator.Observer = observer + + require.NoError(t, endpointSliceTranslator.cache.SetClusters(clusters)) + endpointSliceTranslator.OnAdd(tc.endpointSlice, false) + got := endpointSliceTranslator.Contents() + protobuf.ExpectEqual(t, tc.want, got) + require.Equal(t, tc.wantUpdate, observer.updated) + }) + } +} + +func TestEndpointSliceTranslatorRemoveEndpoints(t *testing.T) { + clusters := []*dag.ServiceCluster{ + { + ClusterName: "default/simple", + Services: []dag.WeightedService{ + { + Weight: 1, + ServiceName: "simple", + ServiceNamespace: "default", + ServicePort: v1.ServicePort{}, + }, + }, + }, + { + ClusterName: "super-long-namespace-name-oh-boy/what-a-descriptive-service-name-you-must-be-so-proud/http", + Services: []dag.WeightedService{ + { + Weight: 1, + ServiceName: "what-a-descriptive-service-name-you-must-be-so-proud", + ServiceNamespace: "super-long-namespace-name-oh-boy", + ServicePort: v1.ServicePort{Name: "http"}, + }, + }, + }, + { + ClusterName: "super-long-namespace-name-oh-boy/what-a-descriptive-service-name-you-must-be-so-proud/https", + Services: []dag.WeightedService{ + { + Weight: 1, + ServiceName: "what-a-descriptive-service-name-you-must-be-so-proud", + ServiceNamespace: "super-long-namespace-name-oh-boy", + ServicePort: v1.ServicePort{Name: "https"}, + }, + }, + }, + } + + tests := map[string]struct { + setup func(*EndpointSliceTranslator) + endpointSlice *discoveryv1.EndpointSlice + want []proto.Message + wantUpdate bool + }{ + "remove existing": { + setup: func(endpointSliceTranslator *EndpointSliceTranslator) { + endpointSliceTranslator.OnAdd(endpointSlice("default", "simple-eps-fs9du", "simple", discoveryv1.AddressTypeIPv4, []discoveryv1.Endpoint{ + { + Addresses: []string{"192.168.183.24"}, + }, + }, []discoveryv1.EndpointPort{ + { + Port: ref.To[int32](8080), + Protocol: ref.To[v1.Protocol]("TCP"), + }, + }, + ), false) + }, + endpointSlice: endpointSlice("default", "simple-eps-fs9du", "simple", discoveryv1.AddressTypeIPv4, []discoveryv1.Endpoint{ + { + Addresses: []string{"192.168.183.24"}, + }, + }, []discoveryv1.EndpointPort{ + { + Port: ref.To[int32](8080), + Protocol: ref.To[v1.Protocol]("TCP"), + }, + }, + ), + want: []proto.Message{ + envoy_v3.ClusterLoadAssignment("default/simple"), + envoy_v3.ClusterLoadAssignment("super-long-namespace-name-oh-boy/what-a-descriptive-service-name-you-must-be-so-proud/http"), + envoy_v3.ClusterLoadAssignment("super-long-namespace-name-oh-boy/what-a-descriptive-service-name-you-must-be-so-proud/https"), + }, + wantUpdate: true, + }, + "removing an Endpoints not used by a ServiceCluster should not trigger a recalculation": { + setup: func(endpointSliceTranslator *EndpointSliceTranslator) { + endpointSliceTranslator.OnAdd(endpointSlice("default", "simple-eps-fs9du", "simple", discoveryv1.AddressTypeIPv4, []discoveryv1.Endpoint{ + { + Addresses: []string{"192.168.183.24"}, + }, + }, []discoveryv1.EndpointPort{ + { + Port: ref.To[int32](8080), + Protocol: ref.To[v1.Protocol]("TCP"), + }, + }, + ), false) + }, + endpointSlice: endpointSlice("default", "different-fs9du", "different", discoveryv1.AddressTypeIPv4, []discoveryv1.Endpoint{ + { + Addresses: []string{"192.168.183.24"}, + }, + }, []discoveryv1.EndpointPort{ + { + Port: ref.To[int32](8080), + Protocol: ref.To[v1.Protocol]("TCP"), + }, + }, + ), + want: []proto.Message{ + &envoy_endpoint_v3.ClusterLoadAssignment{ + ClusterName: "default/simple", + Endpoints: envoy_v3.WeightedEndpoints(1, envoy_v3.SocketAddress("192.168.183.24", 8080)), + }, + envoy_v3.ClusterLoadAssignment("super-long-namespace-name-oh-boy/what-a-descriptive-service-name-you-must-be-so-proud/http"), + envoy_v3.ClusterLoadAssignment("super-long-namespace-name-oh-boy/what-a-descriptive-service-name-you-must-be-so-proud/https"), + }, + wantUpdate: false, + }, + "remove non existent": { + setup: func(*EndpointSliceTranslator) {}, + endpointSlice: endpointSlice("default", "simple-eps-fs9du", "simple", discoveryv1.AddressTypeIPv4, []discoveryv1.Endpoint{ + { + Addresses: []string{"192.168.183.24"}, + }, + }, []discoveryv1.EndpointPort{ + { + Port: ref.To[int32](8080), + Protocol: ref.To[v1.Protocol]("TCP"), + }, + }, + ), + want: []proto.Message{ + envoy_v3.ClusterLoadAssignment("default/simple"), + envoy_v3.ClusterLoadAssignment("super-long-namespace-name-oh-boy/what-a-descriptive-service-name-you-must-be-so-proud/http"), + envoy_v3.ClusterLoadAssignment("super-long-namespace-name-oh-boy/what-a-descriptive-service-name-you-must-be-so-proud/https"), + }, + wantUpdate: true, + }, + "remove long name": { + setup: func(endpointSliceTranslator *EndpointSliceTranslator) { + e1 := endpointSlice( + "super-long-namespace-name-oh-boy", + "what-a-descriptive-service-name-you-must-be-so-proud-9d8f8", + "what-a-descriptive-service-name-you-must-be-so-proud", + discoveryv1.AddressTypeIPv4, []discoveryv1.Endpoint{ + { + Addresses: []string{"172.16.0.2"}, + }, + { + Addresses: []string{"172.16.0.1"}, + }, + }, []discoveryv1.EndpointPort{ + { + Name: ref.To[string]("http"), + Port: ref.To[int32](8080), + Protocol: ref.To[v1.Protocol]("TCP"), + }, + { + Name: ref.To[string]("https"), + Port: ref.To[int32](8443), + Protocol: ref.To[v1.Protocol]("TCP"), + }, + }, + ) + endpointSliceTranslator.OnAdd(e1, false) + }, + endpointSlice: endpointSlice( + "super-long-namespace-name-oh-boy", + "what-a-descriptive-service-name-you-must-be-so-proud-9d8f8", + "what-a-descriptive-service-name-you-must-be-so-proud", + discoveryv1.AddressTypeIPv4, []discoveryv1.Endpoint{ + { + Addresses: []string{"172.16.0.2"}, + }, + { + Addresses: []string{"172.16.0.1"}, + }, + }, []discoveryv1.EndpointPort{ + { + Name: ref.To[string]("http"), + Port: ref.To[int32](8080), + Protocol: ref.To[v1.Protocol]("TCP"), + }, + { + Name: ref.To[string]("https"), + Port: ref.To[int32](8443), + Protocol: ref.To[v1.Protocol]("TCP"), + }, + }, + ), + want: []proto.Message{ + envoy_v3.ClusterLoadAssignment("default/simple"), + envoy_v3.ClusterLoadAssignment("super-long-namespace-name-oh-boy/what-a-descriptive-service-name-you-must-be-so-proud/http"), + envoy_v3.ClusterLoadAssignment("super-long-namespace-name-oh-boy/what-a-descriptive-service-name-you-must-be-so-proud/https"), + }, + wantUpdate: true, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + endpointSliceTranslator := NewEndpointSliceTranslator(fixture.NewTestLogger(t)) + require.NoError(t, endpointSliceTranslator.cache.SetClusters(clusters)) + tc.setup(endpointSliceTranslator) + + // add the dummy observer after setting things up + // so we only get notified if the deletion triggers + // changes, not if the setup additions trigger changes. + observer := &simpleObserver{} + endpointSliceTranslator.Observer = observer + + endpointSliceTranslator.OnDelete(tc.endpointSlice) + got := endpointSliceTranslator.Contents() + protobuf.ExpectEqual(t, tc.want, got) + require.Equal(t, tc.wantUpdate, observer.updated) + }) + } +} + +func TestEndpointSliceTranslatorUpdateEndpoints(t *testing.T) { + clusters := []*dag.ServiceCluster{ + { + ClusterName: "default/simple", + Services: []dag.WeightedService{ + { + Weight: 1, + ServiceName: "simple", + ServiceNamespace: "default", + ServicePort: v1.ServicePort{}, + }, + }, + }, + } + + tests := map[string]struct { + setup func(*EndpointSliceTranslator) + old, new *discoveryv1.EndpointSlice + want []proto.Message + wantUpdate bool + }{ + "update existing": { + setup: func(endpointSliceTranslator *EndpointSliceTranslator) { + endpointSliceTranslator.OnAdd(endpointSlice("default", "simple-sdf8s", "simple", discoveryv1.AddressTypeIPv4, []discoveryv1.Endpoint{ + { + Addresses: []string{"192.168.183.24"}, + }, + }, []discoveryv1.EndpointPort{ + { + Port: ref.To[int32](8080), + Protocol: ref.To[v1.Protocol]("TCP"), + }, + }, + ), false) + }, + old: endpointSlice("default", "simple-sdf8s", "simple", discoveryv1.AddressTypeIPv4, []discoveryv1.Endpoint{ + { + Addresses: []string{"192.168.183.24"}, + }, + }, []discoveryv1.EndpointPort{ + { + Port: ref.To[int32](8080), + Protocol: ref.To[v1.Protocol]("TCP"), + }, + }, + ), + new: endpointSlice("default", "simple-sdf8s", "simple", discoveryv1.AddressTypeIPv4, []discoveryv1.Endpoint{ + { + Addresses: []string{"192.168.183.25"}, + }, + }, []discoveryv1.EndpointPort{ + { + Port: ref.To[int32](8081), + Protocol: ref.To[v1.Protocol]("TCP"), + }, + }, + ), + want: []proto.Message{ + &envoy_endpoint_v3.ClusterLoadAssignment{ + ClusterName: "default/simple", + Endpoints: envoy_v3.WeightedEndpoints(1, envoy_v3.SocketAddress("192.168.183.25", 8081)), + }, + }, + wantUpdate: true, + }, + "getting an update for an Endpoints not used by a ServiceCluster should not trigger a recalculation": { + setup: func(endpointSliceTranslator *EndpointSliceTranslator) { + endpointSliceTranslator.OnAdd(endpointSlice("default", "simple-sdf8s", "simple", discoveryv1.AddressTypeIPv4, []discoveryv1.Endpoint{ + { + Addresses: []string{"192.168.183.24"}, + }, + }, []discoveryv1.EndpointPort{ + { + Port: ref.To[int32](8080), + Protocol: ref.To[v1.Protocol]("TCP"), + }, + }, + ), false) + }, + old: endpointSlice("default", "different-eps-fs9du", "different", discoveryv1.AddressTypeIPv4, []discoveryv1.Endpoint{ + { + Addresses: []string{"192.168.183.24"}, + }, + }, []discoveryv1.EndpointPort{ + { + Port: ref.To[int32](8080), + Protocol: ref.To[v1.Protocol]("TCP"), + }, + }, + ), + new: endpointSlice("default", "different-eps-fs9du", "different", discoveryv1.AddressTypeIPv4, []discoveryv1.Endpoint{ + { + Addresses: []string{"192.168.183.25"}, + }, + }, []discoveryv1.EndpointPort{ + { + Port: ref.To[int32](8081), + Protocol: ref.To[v1.Protocol]("TCP"), + }, + }, + ), + want: []proto.Message{ + &envoy_endpoint_v3.ClusterLoadAssignment{ + ClusterName: "default/simple", + Endpoints: envoy_v3.WeightedEndpoints(1, envoy_v3.SocketAddress("192.168.183.24", 8080)), + }, + }, + wantUpdate: false, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + endpointSliceTranslator := NewEndpointSliceTranslator(fixture.NewTestLogger(t)) + require.NoError(t, endpointSliceTranslator.cache.SetClusters(clusters)) + tc.setup(endpointSliceTranslator) + + // add the dummy observer after setting things up + // so we only get notified if the update triggers + // changes, not if the setup additions trigger changes. + observer := &simpleObserver{} + endpointSliceTranslator.Observer = observer + + endpointSliceTranslator.OnUpdate(tc.old, tc.new) + got := endpointSliceTranslator.Contents() + protobuf.ExpectEqual(t, tc.want, got) + require.Equal(t, tc.wantUpdate, observer.updated) + }) + } +} + +func TestEndpointSliceTranslatorRecomputeClusterLoadAssignment(t *testing.T) { + tests := map[string]struct { + cluster dag.ServiceCluster + endpointSlice *discoveryv1.EndpointSlice + want []proto.Message + }{ + "simple": { + cluster: dag.ServiceCluster{ + ClusterName: "default/simple", + Services: []dag.WeightedService{{ + Weight: 1, + ServiceName: "simple", + ServiceNamespace: "default", + }}, + }, + endpointSlice: endpointSlice("default", "simple-eps-fs9du", "simple", discoveryv1.AddressTypeIPv4, []discoveryv1.Endpoint{ + { + Addresses: []string{"192.168.183.24"}, + }, + }, []discoveryv1.EndpointPort{ + { + Port: ref.To[int32](8080), + Protocol: ref.To[v1.Protocol]("TCP"), + }, + }, + ), + want: []proto.Message{ + &envoy_endpoint_v3.ClusterLoadAssignment{ + ClusterName: "default/simple", + Endpoints: envoy_v3.WeightedEndpoints(1, + envoy_v3.SocketAddress("192.168.183.24", 8080)), + }, + }, + }, + "multiple addresses": { + cluster: dag.ServiceCluster{ + ClusterName: "default/httpbin-org", + Services: []dag.WeightedService{{ + Weight: 1, + ServiceName: "httpbin-org", + ServiceNamespace: "default", + }}, + }, + endpointSlice: endpointSlice("default", "httpbin-org-fs9du", "httpbin-org", discoveryv1.AddressTypeIPv4, []discoveryv1.Endpoint{ + { + Addresses: []string{"50.17.192.147"}, + }, + { + Addresses: []string{"23.23.247.89"}, + }, + { + Addresses: []string{"50.17.206.192"}, + }, + { + Addresses: []string{"50.19.99.160"}, + }, + }, []discoveryv1.EndpointPort{ + { + Port: ref.To[int32](80), + Protocol: ref.To[v1.Protocol]("TCP"), + }, + }, + ), + want: []proto.Message{ + &envoy_endpoint_v3.ClusterLoadAssignment{ + ClusterName: "default/httpbin-org", + Endpoints: envoy_v3.WeightedEndpoints(1, + envoy_v3.SocketAddress("23.23.247.89", 80), + envoy_v3.SocketAddress("50.17.192.147", 80), + envoy_v3.SocketAddress("50.17.206.192", 80), + envoy_v3.SocketAddress("50.19.99.160", 80), + ), + }, + }, + }, + "named container port": { + cluster: dag.ServiceCluster{ + ClusterName: "default/secure/https", + Services: []dag.WeightedService{{ + Weight: 1, + ServiceName: "secure", + ServiceNamespace: "default", + ServicePort: v1.ServicePort{Name: "https"}, + }}, + }, + endpointSlice: endpointSlice("default", "secure-fs9du", "secure", discoveryv1.AddressTypeIPv4, []discoveryv1.Endpoint{ + { + Addresses: []string{"192.168.183.24"}, + }, + }, []discoveryv1.EndpointPort{ + { + Port: ref.To[int32](8443), + Protocol: ref.To[v1.Protocol]("TCP"), + Name: ref.To[string]("https"), + }, + }, + ), + want: []proto.Message{ + &envoy_endpoint_v3.ClusterLoadAssignment{ + ClusterName: "default/secure/https", + Endpoints: envoy_v3.WeightedEndpoints(1, + envoy_v3.SocketAddress("192.168.183.24", 8443)), + }, + }, + }, + "multiple addresses and healthcheck port": { + cluster: dag.ServiceCluster{ + ClusterName: "default/httpbin-org", + Services: []dag.WeightedService{{ + Weight: 1, + ServiceName: "httpbin-org", + ServiceNamespace: "default", + HealthPort: v1.ServicePort{Name: "health", Port: 8998}, + ServicePort: v1.ServicePort{Name: "a", Port: 80}, + }}, + }, + endpointSlice: endpointSlice("default", "httpbin-org-fs9du", "httpbin-org", discoveryv1.AddressTypeIPv4, []discoveryv1.Endpoint{ + { + Addresses: []string{"50.17.192.147"}, + }, + { + Addresses: []string{"23.23.247.89"}, + }, + { + Addresses: []string{"50.17.206.192"}, + }, + { + Addresses: []string{"50.19.99.160"}, + }, + }, []discoveryv1.EndpointPort{ + { + Port: ref.To[int32](80), + Protocol: ref.To[v1.Protocol]("TCP"), + Name: ref.To[string]("a"), + }, + { + Port: ref.To[int32](8998), + Protocol: ref.To[v1.Protocol]("TCP"), + Name: ref.To[string]("health"), + }, + }, + ), + want: []proto.Message{ + &envoy_endpoint_v3.ClusterLoadAssignment{ + ClusterName: "default/httpbin-org", + Endpoints: weightedHealthcheckEndpoints(1, 8998, + envoy_v3.SocketAddress("23.23.247.89", 80), + envoy_v3.SocketAddress("50.17.192.147", 80), + envoy_v3.SocketAddress("50.17.206.192", 80), + envoy_v3.SocketAddress("50.19.99.160", 80), + ), + }, + }, + }, + "health port is the same as service port": { + cluster: dag.ServiceCluster{ + ClusterName: "default/httpbin-org", + Services: []dag.WeightedService{{ + Weight: 1, + ServiceName: "httpbin-org", + ServiceNamespace: "default", + HealthPort: v1.ServicePort{Name: "a", Port: 80}, + ServicePort: v1.ServicePort{Name: "a", Port: 80}, + }}, + }, + endpointSlice: endpointSlice("default", "httpbin-org-fs9du", "httpbin-org", discoveryv1.AddressTypeIPv4, []discoveryv1.Endpoint{ + { + Addresses: []string{"50.17.192.147"}, + }, + { + Addresses: []string{"23.23.247.89"}, + }, + { + Addresses: []string{"50.17.206.192"}, + }, + { + Addresses: []string{"50.19.99.160"}, + }, + }, []discoveryv1.EndpointPort{ + { + Port: ref.To[int32](80), + Protocol: ref.To[v1.Protocol]("TCP"), + Name: ref.To[string]("a"), + }, + { + Port: ref.To[int32](8998), + Protocol: ref.To[v1.Protocol]("TCP"), + Name: ref.To[string]("health"), + }, + }, + ), + want: []proto.Message{ + &envoy_endpoint_v3.ClusterLoadAssignment{ + ClusterName: "default/httpbin-org", + Endpoints: envoy_v3.WeightedEndpoints(1, + envoy_v3.SocketAddress("23.23.247.89", 80), + envoy_v3.SocketAddress("50.17.192.147", 80), + envoy_v3.SocketAddress("50.17.206.192", 80), + envoy_v3.SocketAddress("50.19.99.160", 80), + ), + }, + }, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + endpointSliceTranslator := NewEndpointSliceTranslator(fixture.NewTestLogger(t)) + require.NoError(t, endpointSliceTranslator.cache.SetClusters([]*dag.ServiceCluster{&tc.cluster})) + endpointSliceTranslator.OnAdd(tc.endpointSlice, false) + got := endpointSliceTranslator.Contents() + protobuf.ExpectEqual(t, tc.want, got) + }) + } +} + +func TestEndpointSliceTranslatorScaleToZeroEndpoints(t *testing.T) { + endpointSliceTranslator := NewEndpointSliceTranslator(fixture.NewTestLogger(t)) + + require.NoError(t, endpointSliceTranslator.cache.SetClusters([]*dag.ServiceCluster{ + { + ClusterName: "default/simple", + Services: []dag.WeightedService{{ + Weight: 1, + ServiceName: "simple", + ServiceNamespace: "default", + ServicePort: v1.ServicePort{}, + }}, + }, + })) + + e1 := endpointSlice("default", "simple-eps-fs9du", "simple", discoveryv1.AddressTypeIPv4, []discoveryv1.Endpoint{ + { + Addresses: []string{"192.168.183.24"}, + }, + }, []discoveryv1.EndpointPort{ + { + Port: ref.To[int32](8080), + Protocol: ref.To[v1.Protocol]("TCP"), + }, + }, + ) + endpointSliceTranslator.OnAdd(e1, false) + + // Assert endpoint was added + want := []proto.Message{ + &envoy_endpoint_v3.ClusterLoadAssignment{ + ClusterName: "default/simple", + Endpoints: envoy_v3.WeightedEndpoints(1, envoy_v3.SocketAddress("192.168.183.24", 8080)), + }, + } + + protobuf.RequireEqual(t, want, endpointSliceTranslator.Contents()) + + // e2 is the same as e1, but without endpoint subsets + e2 := endpointSlice("default", "simple-eps-fs9du", "simple", discoveryv1.AddressTypeIPv4, nil, nil) + endpointSliceTranslator.OnUpdate(e1, e2) + + // Assert endpoints are removed + want = []proto.Message{ + &envoy_endpoint_v3.ClusterLoadAssignment{ClusterName: "default/simple"}, + } + + protobuf.RequireEqual(t, want, endpointSliceTranslator.Contents()) +} + +func TestEndpointSliceTranslatorWeightedService(t *testing.T) { + endpointSliceTranslator := NewEndpointSliceTranslator(fixture.NewTestLogger(t)) + clusters := []*dag.ServiceCluster{ + { + ClusterName: "default/weighted", + Services: []dag.WeightedService{ + { + Weight: 0, + ServiceName: "weight0", + ServiceNamespace: "default", + ServicePort: v1.ServicePort{}, + }, + { + Weight: 1, + ServiceName: "weight1", + ServiceNamespace: "default", + ServicePort: v1.ServicePort{}, + }, + { + Weight: 2, + ServiceName: "weight2", + ServiceNamespace: "default", + ServicePort: v1.ServicePort{}, + }, + }, + }, + } + + require.NoError(t, endpointSliceTranslator.cache.SetClusters(clusters)) + + endpoints := []discoveryv1.Endpoint{ + { + Addresses: []string{"192.168.183.24"}, + }, + } + + ports := []discoveryv1.EndpointPort{ + { + Port: ref.To[int32](8080), + Protocol: ref.To[v1.Protocol]("TCP"), + }, + } + + endpointSliceTranslator.OnAdd(endpointSlice("default", "weight0-eps-fs23r", "weight0", discoveryv1.AddressTypeIPv4, endpoints, ports), false) + endpointSliceTranslator.OnAdd(endpointSlice("default", "weight0-eps-sdf9f", "weight1", discoveryv1.AddressTypeIPv4, endpoints, ports), false) + endpointSliceTranslator.OnAdd(endpointSlice("default", "weight0-eps-v9drg", "weight2", discoveryv1.AddressTypeIPv4, endpoints, ports), false) + + // Each helper builds a `LocalityLbEndpoints` with one + // entry, so we can compose the final result by reaching + // in an taking the first element of each slice. + w0 := envoy_v3.Endpoints(envoy_v3.SocketAddress("192.168.183.24", 8080)) + w1 := envoy_v3.WeightedEndpoints(1, envoy_v3.SocketAddress("192.168.183.24", 8080)) + w2 := envoy_v3.WeightedEndpoints(2, envoy_v3.SocketAddress("192.168.183.24", 8080)) + + want := []proto.Message{ + &envoy_endpoint_v3.ClusterLoadAssignment{ + ClusterName: "default/weighted", + Endpoints: []*envoy_endpoint_v3.LocalityLbEndpoints{ + w0[0], w1[0], w2[0], + }, + }, + } + + protobuf.ExpectEqual(t, want, endpointSliceTranslator.Contents()) +} + +func TestEndpointSliceTranslatorDefaultWeightedService(t *testing.T) { + endpointSliceTranslator := NewEndpointSliceTranslator(fixture.NewTestLogger(t)) + clusters := []*dag.ServiceCluster{ + { + ClusterName: "default/weighted", + Services: []dag.WeightedService{ + { + ServiceName: "weight0", + ServiceNamespace: "default", + ServicePort: v1.ServicePort{}, + }, + { + ServiceName: "weight1", + ServiceNamespace: "default", + ServicePort: v1.ServicePort{}, + }, + { + ServiceName: "weight2", + ServiceNamespace: "default", + ServicePort: v1.ServicePort{}, + }, + }, + }, + } + + require.NoError(t, endpointSliceTranslator.cache.SetClusters(clusters)) + + endpoints := []discoveryv1.Endpoint{ + { + Addresses: []string{"192.168.183.24"}, + }, + } + + ports := []discoveryv1.EndpointPort{ + { + Port: ref.To[int32](8080), + Protocol: ref.To[v1.Protocol]("TCP"), + }, + } + + endpointSliceTranslator.OnAdd(endpointSlice("default", "weight0-eps-fs23r", "weight0", discoveryv1.AddressTypeIPv4, endpoints, ports), false) + endpointSliceTranslator.OnAdd(endpointSlice("default", "weight0-eps-sdf9f", "weight1", discoveryv1.AddressTypeIPv4, endpoints, ports), false) + endpointSliceTranslator.OnAdd(endpointSlice("default", "weight0-eps-v9drg", "weight2", discoveryv1.AddressTypeIPv4, endpoints, ports), false) + + // Each helper builds a `LocalityLbEndpoints` with one + // entry, so we can compose the final result by reaching + // in an taking the first element of each slice. + w0 := envoy_v3.WeightedEndpoints(1, envoy_v3.SocketAddress("192.168.183.24", 8080)) + w1 := envoy_v3.WeightedEndpoints(1, envoy_v3.SocketAddress("192.168.183.24", 8080)) + w2 := envoy_v3.WeightedEndpoints(1, envoy_v3.SocketAddress("192.168.183.24", 8080)) + + want := []proto.Message{ + &envoy_endpoint_v3.ClusterLoadAssignment{ + ClusterName: "default/weighted", + Endpoints: []*envoy_endpoint_v3.LocalityLbEndpoints{ + w0[0], w1[0], w2[0], + }, + }, + } + + protobuf.ExpectEqual(t, want, endpointSliceTranslator.Contents()) +} diff --git a/internal/xdscache/v3/endpointstranslator.go b/internal/xdscache/v3/endpointstranslator.go index fe15c3feb09..83e1d7309f8 100644 --- a/internal/xdscache/v3/endpointstranslator.go +++ b/internal/xdscache/v3/endpointstranslator.go @@ -38,16 +38,16 @@ type LoadBalancingEndpoint = envoy_endpoint_v3.LbEndpoint // RecalculateEndpoints generates a slice of LoadBalancingEndpoint // resources by matching the given service port to the given v1.Endpoints. -// ep may be nil, in which case, the result is also nil. -func RecalculateEndpoints(port, healthPort v1.ServicePort, ep *v1.Endpoints) []*LoadBalancingEndpoint { - if ep == nil { +// eps may be nil, in which case, the result is also nil. +func RecalculateEndpoints(port, healthPort v1.ServicePort, eps *v1.Endpoints) []*LoadBalancingEndpoint { + if eps == nil { return nil } var lb []*LoadBalancingEndpoint var healthCheckPort int32 - for _, s := range ep.Subsets { + for _, s := range eps.Subsets { // Skip subsets without ready addresses. if len(s.Addresses) < 1 { continue @@ -199,16 +199,16 @@ func (c *EndpointsCache) SetClusters(clusters []*dag.ServiceCluster) error { return nil } -// UpdateEndpoint adds ep to the cache, or replaces it if it is +// UpdateEndpoint adds eps to the cache, or replaces it if it is // already cached. Any ServiceClusters that are backed by a Service -// that ep belongs become stale. Returns a boolean indicating whether -// any ServiceClusters use ep or not. -func (c *EndpointsCache) UpdateEndpoint(ep *v1.Endpoints) bool { +// that eps belongs become stale. Returns a boolean indicating whether +// any ServiceClusters use eps or not. +func (c *EndpointsCache) UpdateEndpoint(eps *v1.Endpoints) bool { c.mu.Lock() defer c.mu.Unlock() - name := k8s.NamespacedNameOf(ep) - c.endpoints[name] = ep.DeepCopy() + name := k8s.NamespacedNameOf(eps) + c.endpoints[name] = eps.DeepCopy() // If any service clusters include this endpoint, mark them // all as stale. @@ -220,14 +220,14 @@ func (c *EndpointsCache) UpdateEndpoint(ep *v1.Endpoints) bool { return false } -// DeleteEndpoint deletes ep from the cache. Any ServiceClusters -// that are backed by a Service that ep belongs become stale. Returns -// a boolean indicating whether any ServiceClusters use ep or not. -func (c *EndpointsCache) DeleteEndpoint(ep *v1.Endpoints) bool { +// DeleteEndpoint deletes eps from the cache. Any ServiceClusters +// that are backed by a Service that eps belongs become stale. Returns +// a boolean indicating whether any ServiceClusters use eps or not. +func (c *EndpointsCache) DeleteEndpoint(eps *v1.Endpoints) bool { c.mu.Lock() defer c.mu.Unlock() - name := k8s.NamespacedNameOf(ep) + name := k8s.NamespacedNameOf(eps) delete(c.endpoints, name) // If any service clusters include this endpoint, mark them @@ -458,3 +458,5 @@ func (e *EndpointsTranslator) Query(names []string) []proto.Message { } func (*EndpointsTranslator) TypeURL() string { return resource.EndpointType } + +func (e *EndpointsTranslator) SetObserver(observer contour.Observer) { e.Observer = observer } diff --git a/internal/xdscache/v3/server_test.go b/internal/xdscache/v3/server_test.go index b1716839613..e22e0d250c2 100644 --- a/internal/xdscache/v3/server_test.go +++ b/internal/xdscache/v3/server_test.go @@ -33,6 +33,7 @@ import ( "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/status" v1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" networking_v1 "k8s.io/api/networking/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" @@ -40,6 +41,7 @@ import ( "github.com/projectcontour/contour/internal/contour" "github.com/projectcontour/contour/internal/dag" "github.com/projectcontour/contour/internal/fixture" + "github.com/projectcontour/contour/internal/ref" "github.com/projectcontour/contour/internal/xds" contour_xds_v3 "github.com/projectcontour/contour/internal/xds/v3" "github.com/projectcontour/contour/internal/xdscache" @@ -49,6 +51,7 @@ func TestGRPC(t *testing.T) { // tr and et is recreated before the start of each test. var et *EndpointsTranslator var eh *contour.EventHandler + var est *EndpointSliceTranslator tests := map[string]func(*testing.T, *grpc.ClientConn){ "StreamClusters": func(t *testing.T, cc *grpc.ClientConn) { @@ -105,6 +108,39 @@ func TestGRPC(t *testing.T) { checkrecv(t, stream) // check we receive one notification checktimeout(t, stream) // check that the second receive times out }, + "StreamEndpointSlices": func(t *testing.T, cc *grpc.ClientConn) { + et.OnAdd(&discoveryv1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kube-scheduler", + Namespace: "kube-system", + }, + AddressType: discoveryv1.AddressTypeIPv4, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{ + "130.211.139.167", + }, + }, + }, + Ports: []discoveryv1.EndpointPort{ + { + Port: ref.To[int32](80), + }, + { + Port: ref.To[int32](80), + }, + }, + }, false) + + eds := envoy_service_endpoint_v3.NewEndpointDiscoveryServiceClient(cc) + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + stream, err := eds.StreamEndpoints(ctx) + require.NoError(t, err) + sendreq(t, stream, resource.EndpointType) // send initial notification + checkrecv(t, stream) // check we receive one notification + checktimeout(t, stream) // check that the second receive times out + }, "StreamListeners": func(t *testing.T, cc *grpc.ClientConn) { // add an ingress, which will create a non tls listener eh.OnAdd(&networking_v1.Ingress{ @@ -201,12 +237,14 @@ func TestGRPC(t *testing.T) { for name, fn := range tests { t.Run(name, func(t *testing.T) { et = NewEndpointsTranslator(fixture.NewTestLogger(t)) + est = NewEndpointSliceTranslator(fixture.NewTestLogger(t)) resources := []xdscache.ResourceCache{ &ListenerCache{}, &SecretCache{}, &RouteCache{}, &ClusterCache{}, + est, et, NewRuntimeCache(ConfigurableRuntimeSettings{}), } diff --git a/pkg/config/parameters.go b/pkg/config/parameters.go index 9046c6485b9..10179cc1668 100644 --- a/pkg/config/parameters.go +++ b/pkg/config/parameters.go @@ -674,6 +674,13 @@ type Parameters struct { // Tracing holds the relevant configuration for exporting trace data to OpenTelemetry. Tracing *Tracing `yaml:"tracing,omitempty"` + + // FeatureFlags defines toggle to enable new contour features. + // available toggles are + // useEndpointSlices - configures contour to fetch endpoint data + // from k8s endpoint slices. defaults to false and reading endpoint + // data from the k8s endpoints. + FeatureFlags []string `yaml:"featureFlags,omitempty"` } // Tracing defines properties for exporting trace data to OpenTelemetry. @@ -841,6 +848,10 @@ type MetricsServerParameters struct { CABundle string `yaml:"ca-certificate-path,omitempty"` } +// FeatureFlags defines the set of feature flags +// to toggle new contour features. +type FeatureFlags []string + func (p *MetricsParameters) Validate() error { if err := p.Contour.Validate(); err != nil { return fmt.Errorf("metrics.contour: %v", err) diff --git a/site/content/docs/main/config/api-reference.html b/site/content/docs/main/config/api-reference.html index b988b1ac283..2c5bc508f24 100644 --- a/site/content/docs/main/config/api-reference.html +++ b/site/content/docs/main/config/api-reference.html @@ -5133,6 +5133,24 @@

ContourConfiguration

Tracing defines properties for exporting trace data to OpenTelemetry.

+ + +featureFlags +
+ + +FeatureFlags + + + + +

FeatureFlags defines toggle to enable new contour features. +Available toggles are: +useEndpointSlices - configures contour to fetch endpoint data +from k8s endpoint slices. defaults to false and reading endpoint +data from the k8s endpoints.

+ + @@ -5875,6 +5893,24 @@

ContourConfiguratio

Tracing defines properties for exporting trace data to OpenTelemetry.

+ + +featureFlags +
+ + +FeatureFlags + + + + +

FeatureFlags defines toggle to enable new contour features. +Available toggles are: +useEndpointSlices - configures contour to fetch endpoint data +from k8s endpoint slices. defaults to false and reading endpoint +data from the k8s endpoints.

+ +

ContourConfigurationStatus @@ -7464,6 +7500,16 @@

ExtensionServiceTarge +

FeatureFlags +([]string alias)

+

+(Appears on: +ContourConfigurationSpec) +

+

+

FeatureFlags defines the set of feature flags +to toggle new contour features.

+

GatewayConfig

diff --git a/site/content/docs/main/configuration.md b/site/content/docs/main/configuration.md index d09df20caaa..06eea341d61 100644 --- a/site/content/docs/main/configuration.md +++ b/site/content/docs/main/configuration.md @@ -103,6 +103,7 @@ Where Contour settings can also be specified with command-line flags, the comman | rateLimitService | RateLimitServiceConfig | | The [rate limit service configuration](#rate-limit-service-configuration). | | enableExternalNameService | boolean | `false` | Enable ExternalName Service processing. Enabling this has security implications. Please see the [advisory](https://github.com/projectcontour/contour/security/advisories/GHSA-5ph6-qq5x-7jwc) for more details. | | metrics | MetricsParameters | | The [metrics configuration](#metrics-configuration) | +| featureFlags | string array | `[]` | Defines the toggle to enable new contour features. Available toggles are:
1. `useEndpointSlices` - configures contour to fetch endpoint data from k8s endpoint slices. | ### TLS Configuration @@ -111,7 +112,7 @@ Contour should provision TLS hosts. | Field Name | Type | Default | Description | | ------------------------ | -------- | ----------------------------------------------------------------------------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| minimum-protocol-version | string | `1.2` | This field specifies the minimum TLS protocol version that is allowed. Valid options are `1.2` (default) and `1.3`. Any other value defaults to TLS 1.2. +| minimum-protocol-version | string | `1.2` | This field specifies the minimum TLS protocol version that is allowed. Valid options are `1.2` (default) and `1.3`. Any other value defaults to TLS 1.2. | maximum-protocol-version | string | `1.3` | This field specifies the maximum TLS protocol version that is allowed. Valid options are `1.2` and `1.3`. Any other value defaults to TLS 1.3. | | fallback-certificate | | | [Fallback certificate configuration](#fallback-certificate). | | envoy-client-certificate | | | [Client certificate configuration for Envoy](#envoy-client-certificate). | diff --git a/test/e2e/fixtures.go b/test/e2e/fixtures.go index b95fb0778e9..e23cec58d5d 100644 --- a/test/e2e/fixtures.go +++ b/test/e2e/fixtures.go @@ -19,6 +19,7 @@ import ( "context" "io" "os" + "time" "github.com/onsi/ginkgo/v2" contour_api_v1alpha1 "github.com/projectcontour/contour/apis/projectcontour/v1alpha1" @@ -29,6 +30,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" @@ -66,7 +68,7 @@ type Echo struct { } // Deploy runs DeployN with a default of 1 replica. -func (e *Echo) Deploy(ns, name string) func() { +func (e *Echo) Deploy(ns, name string) (func(), *appsv1.Deployment) { return e.DeployN(ns, name, 1) } @@ -76,7 +78,7 @@ func (e *Echo) Deploy(ns, name string) func() { // can be configured. Namespace is defaulted to "default" // and name is defaulted to "ingress-conformance-echo" if not provided. Returns // a cleanup function. -func (e *Echo) DeployN(ns, name string, replicas int32) func() { +func (e *Echo) DeployN(ns, name string, replicas int32) (func(), *appsv1.Deployment) { ns = valOrDefault(ns, "default") name = valOrDefault(name, "ingress-conformance-echo") @@ -179,7 +181,51 @@ func (e *Echo) DeployN(ns, name string, replicas int32) func() { return func() { require.NoError(e.t, e.client.Delete(context.TODO(), service)) require.NoError(e.t, e.client.Delete(context.TODO(), deployment)) + }, deployment +} + +func (e *Echo) ScaleAndWaitDeployment(name, ns string, replicas int32) { + deployment := &appsv1.Deployment{} + key := types.NamespacedName{ + Namespace: ns, + Name: name, } + + require.NoError(e.t, e.client.Get(context.TODO(), key, deployment)) + + deployment.Spec.Replicas = &replicas + + updateAndWaitFor(e.t, e.client, deployment, func(d *appsv1.Deployment) bool { + err := e.client.Get(context.Background(), key, deployment) + if err != nil { + return false + } + if deployment.Status.Replicas == replicas && deployment.Status.ReadyReplicas == replicas { + return true + } + return false + }, time.Second, time.Second*10) +} + +func (e *Echo) ListPodIPs(ns, name string) ([]string, error) { + ns = valOrDefault(ns, "default") + name = valOrDefault(name, "ingress-conformance-echo") + + pods := new(corev1.PodList) + podListOptions := &client.ListOptions{ + LabelSelector: labels.SelectorFromSet(map[string]string{"app.kubernetes.io/name": name}), + Namespace: ns, + } + if err := e.client.List(context.TODO(), pods, podListOptions); err != nil { + return nil, err + } + + podIPs := make([]string, 0) + for _, pod := range pods.Items { + podIPs = append(podIPs, pod.Status.PodIP) + } + + return podIPs, nil } // DumpEchoLogs returns logs of the "conformance-echo" container in @@ -528,6 +574,7 @@ func DefaultContourConfiguration() *contour_api_v1alpha1.ContourConfiguration { Address: listenAllAddress(), Port: 8000, }, + FeatureFlags: UseFeatureFlagsFromEnv(), Envoy: &contour_api_v1alpha1.EnvoyConfig{ DefaultHTTPVersions: []contour_api_v1alpha1.HTTPVersionType{ "HTTP/1.1", "HTTP/2", @@ -600,6 +647,15 @@ func XDSServerTypeFromEnv() contour_api_v1alpha1.XDSServerType { return serverType } +func UseFeatureFlagsFromEnv() []string { + flags := make([]string, 0) + _, found := os.LookupEnv("CONTOUR_E2E_USE_ENDPOINT_SLICES") + if found { + flags = append(flags, "useEndpointSlices") + } + return flags +} + func valOrDefault(val, defaultVal string) string { if val != "" { return val diff --git a/test/e2e/framework.go b/test/e2e/framework.go index 3af1dd02c8d..2bb9f028341 100644 --- a/test/e2e/framework.go +++ b/test/e2e/framework.go @@ -330,6 +330,31 @@ func createAndWaitFor[T client.Object](t require.TestingT, client client.Client, return obj, true } +func updateAndWaitFor[T client.Object](t require.TestingT, client client.Client, obj T, condition func(T) bool, interval, timeout time.Duration) (T, bool) { + require.NoError(t, client.Update(context.Background(), obj)) + + key := types.NamespacedName{ + Namespace: obj.GetNamespace(), + Name: obj.GetName(), + } + + if err := wait.PollUntilContextTimeout(context.Background(), interval, timeout, true, func(ctx context.Context) (bool, error) { + if err := client.Get(ctx, key, obj); err != nil { + // if there was an error, we want to keep + // retrying, so just return false, not an + // error. + return false, nil + } + + return condition(obj), nil + }); err != nil { + // return the last response for logging/debugging purposes + return obj, false + } + + return obj, true +} + // CreateHTTPProxyAndWaitFor creates the provided HTTPProxy in the Kubernetes API // and then waits for the specified condition to be true. func (f *Framework) CreateHTTPProxyAndWaitFor(proxy *contourv1.HTTPProxy, condition func(*contourv1.HTTPProxy) bool) (*contourv1.HTTPProxy, bool) { diff --git a/test/e2e/infra/endpointslice_test.go b/test/e2e/infra/endpointslice_test.go new file mode 100644 index 00000000000..b5ba2e445ff --- /dev/null +++ b/test/e2e/infra/endpointslice_test.go @@ -0,0 +1,129 @@ +// Copyright Project Contour Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build e2e + +package infra + +import ( + "time" + + envoy_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/admin/v3" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/encoding/protojson" + + . "github.com/onsi/ginkgo/v2" + contourv1 "github.com/projectcontour/contour/apis/projectcontour/v1" + "github.com/projectcontour/contour/test/e2e" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func testSimpleEndpointSlice(namespace string) { + Specify("test endpoint slices", func() { + f.Fixtures.Echo.DeployN(namespace, "echo", 1) + + p := &contourv1.HTTPProxy{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: "endpoint-slice", + }, + Spec: contourv1.HTTPProxySpec{ + VirtualHost: &contourv1.VirtualHost{ + Fqdn: "eps.projectcontour.io", + }, + Routes: []contourv1.Route{ + { + Conditions: []contourv1.MatchCondition{ + { + Prefix: "/", + }, + }, + Services: []contourv1.Service{ + { + Name: "echo", + Port: 80, + }, + }, + }, + }, + }, + } + + f.CreateHTTPProxyAndWaitFor(p, e2e.HTTPProxyValid) + time.Sleep(time.Second * 10) + + k8sPodIPs, err := f.Fixtures.Echo.ListPodIPs(namespace, "echo") + require.NoError(f.T(), err) + envoyEndpoints, err := GetIPsFromAdminRequest() + require.NoError(f.T(), err) + require.ElementsMatch(f.T(), k8sPodIPs, envoyEndpoints) + + // scale up to 10 pods + f.Fixtures.Echo.ScaleAndWaitDeployment("echo", namespace, 10) + // give time for changes to be propagated by envoy and contour + time.Sleep(time.Second * 10) + k8sPodIPs, err = f.Fixtures.Echo.ListPodIPs(namespace, "echo") + require.NoError(f.T(), err) + envoyEndpoints, err = GetIPsFromAdminRequest() + require.NoError(f.T(), err) + require.ElementsMatch(f.T(), k8sPodIPs, envoyEndpoints) + + // scale down to 2 pods + f.Fixtures.Echo.ScaleAndWaitDeployment("echo", namespace, 2) + // give time for changes to be propagated by envoy and contour + time.Sleep(time.Second * 10) + k8sPodIPs, err = f.Fixtures.Echo.ListPodIPs(namespace, "echo") + require.NoError(f.T(), err) + envoyEndpoints, err = GetIPsFromAdminRequest() + require.NoError(f.T(), err) + require.ElementsMatch(f.T(), k8sPodIPs, envoyEndpoints) + + // scale to 0 + f.Fixtures.Echo.ScaleAndWaitDeployment("echo", namespace, 0) + // give time for changes to be propagated by envoy and contour + time.Sleep(time.Second * 10) + k8sPodIPs, err = f.Fixtures.Echo.ListPodIPs(namespace, "echo") + require.NoError(f.T(), err) + envoyEndpoints, err = GetIPsFromAdminRequest() + require.NoError(f.T(), err) + require.ElementsMatch(f.T(), k8sPodIPs, envoyEndpoints) + + }) +} + +// GetIPsFromAdminRequest makes a call to the envoy admin endpoint and parses +// all the IPs as a list from the echo cluster +func GetIPsFromAdminRequest() ([]string, error) { + resp, _ := f.HTTP.AdminRequestUntil(&e2e.HTTPRequestOpts{ + Path: "/clusters?format=json", + Condition: e2e.HasStatusCode(200), + }) + + ips := make([]string, 0) + + clusters := &envoy_cluster_v3.Clusters{} + err := protojson.Unmarshal(resp.Body, clusters) + if err != nil { + return nil, err + } + + for _, cluster := range clusters.ClusterStatuses { + if cluster.Name == "simple-endpoint-slice/echo/80/da39a3ee5e" { + for _, host := range cluster.HostStatuses { + ips = append(ips, host.Address.GetSocketAddress().Address) + } + } + } + + return ips, nil +} diff --git a/test/e2e/infra/infra_test.go b/test/e2e/infra/infra_test.go index 21ec33a229e..8753fe55d48 100644 --- a/test/e2e/infra/infra_test.go +++ b/test/e2e/infra/infra_test.go @@ -149,4 +149,22 @@ var _ = Describe("Infra", func() { }) f.Test(testAdminInterface) + + Context("contour with endpoint slices", func() { + withEndpointSlicesEnabled := func(body e2e.NamespacedTestBody) e2e.NamespacedTestBody { + return func(namespace string) { + Context("with endpoint slice enabled", func() { + BeforeEach(func() { + contourConfig.FeatureFlags = []string{ + "useEndpointSlices", + } + }) + + body(namespace) + }) + } + } + + f.NamespacedTest("simple-endpoint-slice", withEndpointSlicesEnabled(testSimpleEndpointSlice)) + }) })