diff --git a/cmd/antrea-agent-simulator/simulator.go b/cmd/antrea-agent-simulator/simulator.go
index 755642e6ac3..f9410edfc37 100644
--- a/cmd/antrea-agent-simulator/simulator.go
+++ b/cmd/antrea-agent-simulator/simulator.go
@@ -29,7 +29,7 @@ import (
 	componentbaseconfig "k8s.io/component-base/config"
 	"k8s.io/klog/v2"
 
-	"antrea.io/antrea/pkg/agent"
+	"antrea.io/antrea/pkg/agent/client"
 	"antrea.io/antrea/pkg/signals"
 	"antrea.io/antrea/pkg/util/env"
 	"antrea.io/antrea/pkg/util/k8s"
@@ -49,7 +49,10 @@ func run() error {
 	}
 
 	// Create Antrea Clientset for the given config.
-	antreaClientProvider := agent.NewAntreaClientProvider(componentbaseconfig.ClientConnectionConfiguration{}, k8sClient)
+	antreaClientProvider, err := client.NewAntreaClientProvider(componentbaseconfig.ClientConnectionConfiguration{}, k8sClient)
+	if err != nil {
+		return err
+	}
 
 	if err = antreaClientProvider.RunOnce(); err != nil {
 		return err
diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go
index 246a1f19a0c..bf621481bde 100644
--- a/cmd/antrea-agent/agent.go
+++ b/cmd/antrea-agent/agent.go
@@ -34,6 +34,7 @@ import (
 	mcinformers "antrea.io/antrea/multicluster/pkg/client/informers/externalversions"
 	"antrea.io/antrea/pkg/agent"
 	"antrea.io/antrea/pkg/agent/apiserver"
+	"antrea.io/antrea/pkg/agent/client"
 	"antrea.io/antrea/pkg/agent/cniserver"
 	"antrea.io/antrea/pkg/agent/cniserver/ipam"
 	"antrea.io/antrea/pkg/agent/config"
@@ -127,7 +128,10 @@ func run(o *Options) error {
 	nodeLatencyMonitorInformer := crdInformerFactory.Crd().V1alpha1().NodeLatencyMonitors()
 
 	// Create Antrea Clientset for the given config.
-	antreaClientProvider := agent.NewAntreaClientProvider(o.config.AntreaClientConnection, k8sClient)
+	antreaClientProvider, err := client.NewAntreaClientProvider(o.config.AntreaClientConnection, k8sClient)
+	if err != nil {
+		return fmt.Errorf("failed to create Antrea client provider: %w", err)
+	}
 
 	// Register Antrea Agent metrics if EnablePrometheusMetrics is set
 	if *o.config.EnablePrometheusMetrics {
@@ -794,8 +798,6 @@ func run(o *Options) error {
 		}
 	}
 
-	// NetworkPolicyController and EgressController accesses the "antrea" Service via its ClusterIP.
-	// Run them after AntreaProxy is ready.
 	go networkPolicyController.Run(stopCh)
 	if o.enableEgress {
 		go egressController.Run(stopCh)
diff --git a/pkg/agent/client.go b/pkg/agent/client/client.go
similarity index 58%
rename from pkg/agent/client.go
rename to pkg/agent/client/client.go
index 57d87baa92e..63b13e78de0 100644
--- a/pkg/agent/client.go
+++ b/pkg/agent/client/client.go
@@ -12,13 +12,13 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package agent
+package client
 
 import (
 	"context"
 	"fmt"
-	"net"
 	"os"
+	"strconv"
 	"sync"
 
 	"k8s.io/apiserver/pkg/server/dynamiccertificates"
@@ -40,7 +40,9 @@ type AntreaClientProvider interface {
 	GetAntreaClient() (versioned.Interface, error)
 }
 
-// antreaClientProvider provides an AntreaClientProvider that can dynamically react to ConfigMap changes.
+// antreaClientProvider provides an AntreaClientProvider that can dynamically react to CA bundle
+// ConfigMap changes, as well as directly resolve the Antrea Service Endpoint when running inside a K8s cluster.
+// The consumers of antreaClientProvider are supposed to always call GetAntreaClient() to get a client and not cache it.
 type antreaClientProvider struct {
 	config config.ClientConnectionConfiguration
 	// mutex protects client.
@@ -49,27 +51,62 @@ type antreaClientProvider struct {
 	client versioned.Interface
 	// caContentProvider provides the very latest content of the ca bundle.
 	caContentProvider *dynamiccertificates.ConfigMapCAController
+	// endpointResolver provides a known Endpoint for the Antrea Service. There is usually a
+	// single Endpoint at any given time, given that the Antrea Controller runs as a
+	// single-replica Deployment. By resolving the Endpoint manually and accessing it directly,
+	// instead of depending on the ClusterIP functionality provided by the K8s proxy, we get
+	// more flexibility when initializing the Antrea Agent. For example, we can retrieve
+	// NetworkPolicies from the Controller even if the proxy is not (yet) available.
+	// endpointResolver is only used when no kubeconfig is provided (otherwise we honor the
+	// provided config).
+	endpointResolver *EndpointResolver
 }
 
+// antreaClientProvider must implement the dynamiccertificates.Listener interface to be notified of
+// CA bundle updates.
 var _ dynamiccertificates.Listener = &antreaClientProvider{}
 
-func NewAntreaClientProvider(config config.ClientConnectionConfiguration, kubeClient kubernetes.Interface) *antreaClientProvider {
-	// The key "ca.crt" may not exist at the beginning, no need to fail as the CA provider will watch the ConfigMap
-	// and notify antreaClientProvider of any update. The consumers of antreaClientProvider are supposed to always
-	// call GetAntreaClient() to get a client and not cache it.
-	antreaCAProvider, _ := dynamiccertificates.NewDynamicCAFromConfigMapController(
+// antreaClientProvider must implement the Listener interface to be notified of an Endpoint change
+// for the Antrea Service.
+var _ Listener = &antreaClientProvider{}
+
+func NewAntreaClientProvider(config config.ClientConnectionConfiguration, kubeClient kubernetes.Interface) (*antreaClientProvider, error) {
+	antreaCAProvider, err := dynamiccertificates.NewDynamicCAFromConfigMapController(
 		"antrea-ca",
 		cert.GetCAConfigMapNamespace(),
 		apis.AntreaCAConfigMapName,
 		apis.CAConfigMapKey,
 		kubeClient)
+	if err != nil {
+		return nil, err
+	}
+
+	var endpointResolver *EndpointResolver
+	if len(config.Kubeconfig) == 0 {
+		klog.InfoS("No Antrea kubeconfig file was specified. Falling back to in-cluster config")
+		port := os.Getenv("ANTREA_SERVICE_PORT")
+		if len(port) == 0 {
+			return nil, fmt.Errorf("unable to create Endpoint resolver for Antrea Service, ANTREA_SERVICE_PORT must be defined for in-cluster config")
+		}
+		servicePort, err := strconv.ParseInt(port, 10, 32)
+		if err != nil {
+			return nil, fmt.Errorf("invalid port number stored in ANTREA_SERVICE_PORT: %w", err)
+		}
+		endpointResolver = NewEndpointResolver(kubeClient, env.GetAntreaNamespace(), apis.AntreaServiceName, int32(servicePort))
+	}
+
 	antreaClientProvider := &antreaClientProvider{
 		config:            config,
 		caContentProvider: antreaCAProvider,
+		endpointResolver:  endpointResolver,
 	}
 	antreaCAProvider.AddListener(antreaClientProvider)
-	return antreaClientProvider
+	if endpointResolver != nil {
+		endpointResolver.AddListener(antreaClientProvider)
+	}
+
+	return antreaClientProvider, nil
 }
 
 // RunOnce runs the task a single time synchronously, ensuring client is initialized if kubeconfig is specified.
@@ -80,14 +117,18 @@ func (p *antreaClientProvider) RunOnce() error {
 
 // Run starts the caContentProvider, which watches the ConfigMap and notifies changes
 // by calling Enqueue.
 func (p *antreaClientProvider) Run(ctx context.Context) {
-	p.caContentProvider.Run(ctx, 1)
+	go p.caContentProvider.Run(ctx, 1)
+	if p.endpointResolver != nil {
+		go p.endpointResolver.Run(ctx)
+	}
+	<-ctx.Done()
 }
 
 // Enqueue implements dynamiccertificates.Listener. It will be called by caContentProvider
 // when caBundle is updated.
 func (p *antreaClientProvider) Enqueue() {
 	if err := p.updateAntreaClient(); err != nil {
-		klog.Errorf("Failed to update Antrea client: %v", err)
+		klog.ErrorS(err, "Failed to update Antrea client")
 	}
 }
 
@@ -105,13 +146,17 @@ func (p *antreaClientProvider) updateAntreaClient() error {
 	var kubeConfig *rest.Config
 	var err error
 	if len(p.config.Kubeconfig) == 0 {
-		klog.Info("No antrea kubeconfig file was specified. Falling back to in-cluster config")
 		caBundle := p.caContentProvider.CurrentCABundleContent()
 		if caBundle == nil {
-			klog.Info("Didn't get CA certificate, skip updating Antrea Client")
+			klog.InfoS("Didn't get CA certificate, skip updating Antrea Client")
 			return nil
 		}
-		kubeConfig, err = inClusterConfig(caBundle)
+		endpointURL := p.endpointResolver.CurrentEndpointURL()
+		if endpointURL == nil {
+			klog.InfoS("Didn't get Endpoint URL for Antrea Service, skip updating Antrea Client")
+			return nil
+		}
+		kubeConfig, err = inClusterConfig(caBundle, endpointURL.String())
 	} else {
 		kubeConfig, err = clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
 			&clientcmd.ClientConfigLoadingRules{ExplicitPath: p.config.Kubeconfig},
@@ -138,17 +183,12 @@ func (p *antreaClientProvider) updateAntreaClient() error {
 	return nil
 }
 
-// inClusterConfig returns a config object which uses the service account
-// kubernetes gives to pods. It's intended for clients that expect to be
-// running inside a pod running on kubernetes. It will return error
-// if called from a process not running in a kubernetes environment.
-func inClusterConfig(caBundle []byte) (*rest.Config, error) {
+// inClusterConfig returns a config object which uses the service account Kubernetes gives to
+// Pods. It's intended for clients that expect to be running inside a Pod running on Kubernetes. It
+// will return error if called from a process not running in a Kubernetes environment.
+func inClusterConfig(caBundle []byte, endpoint string) (*rest.Config, error) {
 	// #nosec G101: false positive triggered by variable name which includes "token"
 	const tokenFile = "/var/run/secrets/kubernetes.io/serviceaccount/token"
-	host, port := os.Getenv("ANTREA_SERVICE_HOST"), os.Getenv("ANTREA_SERVICE_PORT")
-	if len(host) == 0 || len(port) == 0 {
-		return nil, fmt.Errorf("unable to load in-cluster configuration, ANTREA_SERVICE_HOST and ANTREA_SERVICE_PORT must be defined")
-	}
 
 	token, err := os.ReadFile(tokenFile)
 	if err != nil {
@@ -161,7 +201,7 @@ func inClusterConfig(caBundle []byte) (*rest.Config, error) {
 	}
 
 	return &rest.Config{
-		Host:            "https://" + net.JoinHostPort(host, port),
+		Host:            endpoint,
 		TLSClientConfig: tlsClientConfig,
 		BearerToken:     string(token),
 		BearerTokenFile: tokenFile,
diff --git a/pkg/agent/client/endpoint_resolver.go b/pkg/agent/client/endpoint_resolver.go
new file mode 100644
index 00000000000..9e4eef36fd7
--- /dev/null
+++ b/pkg/agent/client/endpoint_resolver.go
@@ -0,0 +1,292 @@
+// Copyright 2024 Antrea Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package client
+
+import (
+	"context"
+	"fmt"
+	"net/url"
+	"reflect"
+	"sync/atomic"
+	"time"
+
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/fields"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/apiserver/pkg/util/proxy"
+	"k8s.io/client-go/informers"
+	"k8s.io/client-go/kubernetes"
+	corev1listers "k8s.io/client-go/listers/core/v1"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/util/workqueue"
+	"k8s.io/klog/v2"
+)
+
+const (
+	// informerDefaultResync is the default resync period if a handler doesn't specify one.
+	// Use the same default value as kube-controller-manager:
+	// https://github.com/kubernetes/kubernetes/blob/release-1.17/pkg/controller/apis/config/v1alpha1/defaults.go#L120
+	informerDefaultResync = 12 * time.Hour
+
+	minRetryDelay = 100 * time.Millisecond
+	maxRetryDelay = 30 * time.Second
+)
+
+// Listener defines the interface which needs to be implemented by clients which want to subscribe
+// to Endpoint updates.
+type Listener interface {
+	Enqueue()
+}
+
+// EndpointResolver is in charge of resolving a specific Service Endpoint, which can then be
+// accessed directly instead of depending on the ClusterIP functionality provided by K8s proxies
+// (whether it's kube-proxy or AntreaProxy). A new Endpoint is resolved every time the Service's
+// Spec or the Endpoints' Subsets are updated, and registered listeners are notified. While this
+// EndpointResolver is somewhat generic, at the moment it is only meant to be used for the Antrea
+// Service.
+type EndpointResolver struct {
+	// name is the name of the controller in charge of Endpoint resolution.
+	name        string
+	namespace   string
+	serviceName string
+	servicePort int32
+	// informerFactory is stored here so it can be started in the Run() method.
+	informerFactory informers.SharedInformerFactory
+	// serviceLister is used to retrieve the Service when selecting an Endpoint.
+	serviceLister       corev1listers.ServiceLister
+	serviceListerSynced cache.InformerSynced
+	// endpointsLister is used to retrieve the Endpoints for the Service during Endpoint selection.
+	endpointsLister       corev1listers.EndpointsLister
+	endpointsListerSynced cache.InformerSynced
+	queue                 workqueue.RateLimitingInterface
+	// listeners need to implement the Listener interface and will get notified when the
+	// current Endpoint URL changes.
+	listeners   []Listener
+	endpointURL atomic.Pointer[url.URL]
+}
+
+func NewEndpointResolver(kubeClient kubernetes.Interface, namespace, serviceName string, servicePort int32) *EndpointResolver {
+	key := namespace + "/" + serviceName
+	controllerName := fmt.Sprintf("ServiceEndpointResolver:%s", key)
+
+	// We only need a specific Service and corresponding Endpoints resource, so we create our
+	// own informer factory, and we filter by namespace and name.
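+	// Because an Endpoints resource shares its Service's name, the single metadata.name
+	// field selector below is enough to scope both watches to the one object we care about.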
+	informerFactory := informers.NewSharedInformerFactoryWithOptions(
+		kubeClient,
+		informerDefaultResync,
+		informers.WithNamespace(namespace),
+		informers.WithTweakListOptions(func(listOptions *metav1.ListOptions) {
+			listOptions.FieldSelector = fields.OneTermEqualSelector("metadata.name", serviceName).String()
+		}),
+	)
+	serviceInformer := informerFactory.Core().V1().Services()
+	endpointsInformer := informerFactory.Core().V1().Endpoints()
+
+	resolver := &EndpointResolver{
+		name:                  controllerName,
+		namespace:             namespace,
+		serviceName:           serviceName,
+		servicePort:           servicePort,
+		informerFactory:       informerFactory,
+		serviceLister:         serviceInformer.Lister(),
+		serviceListerSynced:   serviceInformer.Informer().HasSynced,
+		endpointsLister:       endpointsInformer.Lister(),
+		endpointsListerSynced: endpointsInformer.Informer().HasSynced,
+		queue:                 workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), controllerName),
+	}
+
+	serviceInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
+		// FilterFunc ignores all Service events which do not relate to the named Service.
+		// It should be redundant given the filtering that we already do at the informer level.
+		FilterFunc: func(obj interface{}) bool {
+			if service, ok := obj.(*corev1.Service); ok {
+				return service.Namespace == namespace && service.Name == serviceName
+			}
+			if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
+				if service, ok := tombstone.Obj.(*corev1.Service); ok {
+					return service.Namespace == namespace && service.Name == serviceName
+				}
+			}
+			return false
+		},
+		Handler: cache.ResourceEventHandlerFuncs{
+			AddFunc: func(obj interface{}) {
+				resolver.queue.Add(key)
+			},
+			UpdateFunc: func(oldObj, newObj interface{}) {
+				// This should not happen: both objects should be Services in the
+				// update event handler.
+				oldSvc, ok := oldObj.(*corev1.Service)
+				if !ok {
+					return
+				}
+				newSvc, ok := newObj.(*corev1.Service)
+				if !ok {
+					return
+				}
+				// Ignore changes to metadata or status.
+				if reflect.DeepEqual(newSvc.Spec, oldSvc.Spec) {
+					return
+				}
+				resolver.queue.Add(key)
+			},
+			DeleteFunc: func(obj interface{}) {
+				resolver.queue.Add(key)
+			},
+		},
+	})
+	endpointsInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
+		// FilterFunc ignores all Endpoints events which do not relate to the named Service.
+		// It should be redundant given the filtering that we already do at the informer level.
+		FilterFunc: func(obj interface{}) bool {
+			// The Endpoints resource for a Service has the same name as the Service.
+			if endpoints, ok := obj.(*corev1.Endpoints); ok {
+				return endpoints.Namespace == namespace && endpoints.Name == serviceName
+			}
+			if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
+				if endpoints, ok := tombstone.Obj.(*corev1.Endpoints); ok {
+					return endpoints.Namespace == namespace && endpoints.Name == serviceName
+				}
+			}
+			return false
+		},
+		Handler: cache.ResourceEventHandlerFuncs{
+			AddFunc: func(obj interface{}) {
+				resolver.queue.Add(key)
+			},
+			UpdateFunc: func(oldObj, newObj interface{}) {
+				// This should not happen: both objects should be Endpoints in the
+				// update event handler.
+				oldEndpoints, ok := oldObj.(*corev1.Endpoints)
+				if !ok {
+					return
+				}
+				newEndpoints, ok := newObj.(*corev1.Endpoints)
+				if !ok {
+					return
+				}
+				// Ignore changes to metadata.
+				if reflect.DeepEqual(newEndpoints.Subsets, oldEndpoints.Subsets) {
+					return
+				}
+				resolver.queue.Add(key)
+			},
+			DeleteFunc: func(obj interface{}) {
+				resolver.queue.Add(key)
+			},
+		},
+	})
+	return resolver
+}
+
+func (r *EndpointResolver) Run(ctx context.Context) {
+	defer r.queue.ShutDown()
+
+	klog.InfoS("Starting controller", "name", r.name)
+	defer klog.InfoS("Shutting down controller", "name", r.name)
+
+	r.informerFactory.Start(ctx.Done())
+	defer r.informerFactory.Shutdown()
+
+	if !cache.WaitForNamedCacheSync(r.name, ctx.Done(), r.serviceListerSynced, r.endpointsListerSynced) {
+		return
+	}
+
+	// We only start one worker for this controller.
+	go wait.Until(r.runWorker, time.Second, ctx.Done())
+
+	<-ctx.Done()
+}
+
+func (r *EndpointResolver) runWorker() {
+	for r.processNextWorkItem() {
+	}
+}
+
+func (r *EndpointResolver) processNextWorkItem() bool {
+	key, quit := r.queue.Get()
+	if quit {
+		return false
+	}
+	defer r.queue.Done(key)
+
+	if err := r.resolveEndpoint(); err == nil {
+		r.queue.Forget(key)
+	} else {
+		klog.ErrorS(err, "Failed to resolve Service Endpoint, requeuing", "key", key)
+		r.queue.AddRateLimited(key)
+	}
+
+	return true
+}
+
+func (r *EndpointResolver) resolveEndpoint() error {
+	klog.V(2).InfoS("Resolving Endpoint", "service", klog.KRef(r.namespace, r.serviceName))
+	endpointURL, err := proxy.ResolveEndpoint(r.serviceLister, r.endpointsLister, r.namespace, r.serviceName, r.servicePort)
+	// Typically we will get one of these 2 errors (unavailable or not found).
+	// In this case, it makes sense to reset the Endpoint URL to nil and notify listeners.
+	// There is also no need to retry, as we won't find a suitable Endpoint until the Service or
+	// the Endpoints resource is updated in a way that will cause this function to be called again.
+	if errors.IsServiceUnavailable(err) {
+		klog.ErrorS(err, "Cannot resolve endpoint because Service is unavailable", "service", klog.KRef(r.namespace, r.serviceName))
+		r.updateEndpointIfNeeded(nil)
+		return nil
+	}
+	if errors.IsNotFound(err) {
+		klog.ErrorS(err, "Cannot resolve endpoint because of missing resource", "service", klog.KRef(r.namespace, r.serviceName))
+		r.updateEndpointIfNeeded(nil)
+		return nil
+	}
+	if err != nil {
+		// Unknown error: we err on the side of caution.
+		// Do not reset the URL or notify listeners, and trigger a retry.
+		return err
+	}
+	klog.V(2).InfoS("Resolved Endpoint", "service", klog.KRef(r.namespace, r.serviceName), "url", endpointURL)
+	r.updateEndpointIfNeeded(endpointURL)
+	return nil
+}
+
+func (r *EndpointResolver) updateEndpointIfNeeded(endpointURL *url.URL) {
+	// The separate Load and Store calls are safe because there is a single writer for r.endpointURL.
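+	// Note that the comparison below is on the String() form: two distinct *url.URL values
+	// count as equal as long as they render to the same URL.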
+	currentEndpointURL := r.endpointURL.Load()
+	updateNeeded := func() bool {
+		if endpointURL == nil && currentEndpointURL == nil {
+			return false
+		}
+		if endpointURL == nil || currentEndpointURL == nil {
+			return true
+		}
+		return endpointURL.String() != currentEndpointURL.String()
+	}
+	if !updateNeeded() {
+		klog.V(2).InfoS("No change to Endpoint for Service, no need to notify listeners", "service", klog.KRef(r.namespace, r.serviceName))
+		return
+	}
+	if endpointURL != nil {
+		klog.InfoS("Selected a new Endpoint for Service, notifying listeners", "service", klog.KRef(r.namespace, r.serviceName), "url", endpointURL)
+	} else {
+		klog.InfoS("Selected no Endpoint for Service, notifying listeners", "service", klog.KRef(r.namespace, r.serviceName))
+	}
+	r.endpointURL.Store(endpointURL)
+	for _, listener := range r.listeners {
+		listener.Enqueue()
+	}
+}
+
+func (r *EndpointResolver) AddListener(listener Listener) {
+	r.listeners = append(r.listeners, listener)
+}
+
+func (r *EndpointResolver) CurrentEndpointURL() *url.URL {
+	return r.endpointURL.Load()
+}
diff --git a/pkg/agent/client/endpoint_resolver_test.go b/pkg/agent/client/endpoint_resolver_test.go
new file mode 100644
index 00000000000..e3fc6017667
--- /dev/null
+++ b/pkg/agent/client/endpoint_resolver_test.go
@@ -0,0 +1,159 @@
+// Copyright 2024 Antrea Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package client
+
+import (
+	"context"
+	"fmt"
+	"net"
+	"net/url"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/util/intstr"
+	"k8s.io/client-go/kubernetes/fake"
+)
+
+const (
+	testNamespace   = "kube-system"
+	testServiceName = "antrea"
+	testServicePort = 443
+	testTargetPort  = 10349
+	testEndpointIP1 = "172.18.0.3"
+	testEndpointIP2 = "172.18.0.4"
+)
+
+func getTestObjects() (*corev1.Service, *corev1.Endpoints) {
+	svc := &corev1.Service{
+		ObjectMeta: metav1.ObjectMeta{
+			Namespace: testNamespace,
+			Name:      testServiceName,
+		},
+		Spec: corev1.ServiceSpec{
+			Selector: map[string]string{
+				"app":       "antrea",
+				"component": "antrea-controller",
+			},
+			Type: corev1.ServiceTypeClusterIP,
+			Ports: []corev1.ServicePort{
+				{
+					Port:       443,
+					Protocol:   corev1.ProtocolTCP,
+					TargetPort: intstr.FromInt32(testTargetPort),
+				},
+			},
+		},
+	}
+	endpoints := &corev1.Endpoints{
+		ObjectMeta: metav1.ObjectMeta{
+			Namespace: testNamespace,
+			Name:      testServiceName,
+		},
+		Subsets: []corev1.EndpointSubset{
+			{
+				Addresses: []corev1.EndpointAddress{
+					{
+						IP: testEndpointIP1,
+					},
+				},
+				Ports: []corev1.EndpointPort{
+					{
+						Port:     testTargetPort,
+						Protocol: corev1.ProtocolTCP,
+					},
+				},
+			},
+		},
+	}
+	return svc, endpoints
+}
+
+func getEndpointURL(ip string) *url.URL {
+	return &url.URL{
+		Scheme: "https",
+		Host:   net.JoinHostPort(ip, fmt.Sprint(testTargetPort)),
+	}
+}
+
+func runTestEndpointResolver(ctx context.Context, objects ...runtime.Object) (*fake.Clientset, *EndpointResolver) {
+	k8sClient := fake.NewSimpleClientset(objects...)
+	resolver := NewEndpointResolver(k8sClient, testNamespace, testServiceName, testServicePort)
+	go resolver.Run(ctx)
+	return k8sClient, resolver
+}
+
+func TestEndpointResolver(t *testing.T) {
+	t.Run("add Service and Endpoints", func(t *testing.T) {
+		ctx, cancelFn := context.WithCancel(context.Background())
+		defer cancelFn()
+		k8sClient, resolver := runTestEndpointResolver(ctx)
+		require.Nil(t, resolver.CurrentEndpointURL())
+		svc, endpoints := getTestObjects()
+		k8sClient.CoreV1().Services(testNamespace).Create(ctx, svc, metav1.CreateOptions{})
+		k8sClient.CoreV1().Endpoints(testNamespace).Create(ctx, endpoints, metav1.CreateOptions{})
+		assert.EventuallyWithT(t, func(t *assert.CollectT) {
+			assert.Equal(t, getEndpointURL(testEndpointIP1), resolver.CurrentEndpointURL())
+		}, 2*time.Second, 50*time.Millisecond)
+	})
+
+	t.Run("update Endpoint address", func(t *testing.T) {
+		ctx, cancelFn := context.WithCancel(context.Background())
+		defer cancelFn()
+		svc, endpoints := getTestObjects()
+		k8sClient, resolver := runTestEndpointResolver(ctx, svc, endpoints)
+		assert.EventuallyWithT(t, func(t *assert.CollectT) {
+			assert.Equal(t, getEndpointURL(testEndpointIP1), resolver.CurrentEndpointURL())
+		}, 2*time.Second, 50*time.Millisecond)
+		endpoints.Subsets[0].Addresses[0].IP = testEndpointIP2
+		k8sClient.CoreV1().Endpoints(testNamespace).Update(ctx, endpoints, metav1.UpdateOptions{})
+		assert.EventuallyWithT(t, func(t *assert.CollectT) {
+			assert.Equal(t, getEndpointURL(testEndpointIP2), resolver.CurrentEndpointURL())
+		}, 2*time.Second, 50*time.Millisecond)
+	})
+
+	t.Run("remove Endpoint address", func(t *testing.T) {
+		ctx, cancelFn := context.WithCancel(context.Background())
+		defer cancelFn()
+		svc, endpoints := getTestObjects()
+		k8sClient, resolver := runTestEndpointResolver(ctx, svc, endpoints)
+		assert.EventuallyWithT(t, func(t *assert.CollectT) {
+			assert.Equal(t, getEndpointURL(testEndpointIP1), resolver.CurrentEndpointURL())
+		}, 2*time.Second, 50*time.Millisecond)
+		endpoints.Subsets = nil
+		k8sClient.CoreV1().Endpoints(testNamespace).Update(ctx, endpoints, metav1.UpdateOptions{})
+		assert.EventuallyWithT(t, func(t *assert.CollectT) {
+			assert.Nil(t, resolver.CurrentEndpointURL())
+		}, 2*time.Second, 50*time.Millisecond)
+	})
+
+	t.Run("delete Service", func(t *testing.T) {
+		ctx, cancelFn := context.WithCancel(context.Background())
+		defer cancelFn()
+		svc, endpoints := getTestObjects()
+		k8sClient, resolver := runTestEndpointResolver(ctx, svc, endpoints)
+		assert.EventuallyWithT(t, func(t *assert.CollectT) {
+			assert.Equal(t, getEndpointURL(testEndpointIP1), resolver.CurrentEndpointURL())
+		}, 2*time.Second, 50*time.Millisecond)
+		k8sClient.CoreV1().Services(testNamespace).Delete(ctx, testServiceName, metav1.DeleteOptions{})
+		assert.EventuallyWithT(t, func(t *assert.CollectT) {
+			assert.Nil(t, resolver.CurrentEndpointURL())
+		}, 2*time.Second, 50*time.Millisecond)
+	})
+}
diff --git a/pkg/agent/controller/egress/egress_controller.go b/pkg/agent/controller/egress/egress_controller.go
index a6213897d40..7d50da0bf45 100644
--- a/pkg/agent/controller/egress/egress_controller.go
+++ b/pkg/agent/controller/egress/egress_controller.go
@@ -41,7 +41,7 @@ import (
 	"k8s.io/client-go/util/workqueue"
 	"k8s.io/klog/v2"
 
-	"antrea.io/antrea/pkg/agent"
+	"antrea.io/antrea/pkg/agent/client"
 	"antrea.io/antrea/pkg/agent/interfacestore"
 	"antrea.io/antrea/pkg/agent/ipassigner"
 	"antrea.io/antrea/pkg/agent/memberlist"
@@ -155,7 +155,7 @@ type EgressController struct {
 	routeClient          route.Interface
 	k8sClient            kubernetes.Interface
 	crdClient            clientsetversioned.Interface
-	antreaClientProvider agent.AntreaClientProvider
+	antreaClientProvider client.AntreaClientProvider
 
 	egressInformer cache.SharedIndexInformer
 	egressLister   crdlisters.EgressLister
@@ -210,7 +210,7 @@ type EgressController struct {
 func NewEgressController(
 	ofClient openflow.Client,
 	k8sClient kubernetes.Interface,
-	antreaClientGetter agent.AntreaClientProvider,
+	antreaClientGetter client.AntreaClientProvider,
 	crdClient clientsetversioned.Interface,
 	ifaceStore interfacestore.InterfaceStore,
 	routeClient route.Interface,
diff --git a/pkg/agent/controller/networkpolicy/networkpolicy_controller.go b/pkg/agent/controller/networkpolicy/networkpolicy_controller.go
index 63f1be42f4a..b95a8e1ec0f 100644
--- a/pkg/agent/controller/networkpolicy/networkpolicy_controller.go
+++ b/pkg/agent/controller/networkpolicy/networkpolicy_controller.go
@@ -34,7 +34,7 @@ import (
 	"k8s.io/client-go/util/workqueue"
 	"k8s.io/klog/v2"
 
-	"antrea.io/antrea/pkg/agent"
+	"antrea.io/antrea/pkg/agent/client"
 	"antrea.io/antrea/pkg/agent/config"
 	"antrea.io/antrea/pkg/agent/controller/networkpolicy/l7engine"
 	"antrea.io/antrea/pkg/agent/flowexporter/connections"
@@ -113,14 +113,17 @@ type Controller struct {
 	multicastEnabled bool
 	// nodeType indicates type of the Node where Antrea Agent is running on.
 	nodeType config.NodeType
-	// antreaClientProvider provides interfaces to get antreaClient, which can be
-	// used to watch Antrea AddressGroups, AppliedToGroups, and NetworkPolicies.
-	// We need to get antreaClient dynamically because the apiserver cert can be
-	// rotated and we need a new client with the updated CA cert.
+	// antreaClientProvider provides interfaces to get antreaClient, which
+	// can be used to watch Antrea AddressGroups, AppliedToGroups, and
+	// NetworkPolicies. We need to get antreaClient dynamically because we
+	// are not relying on the ClusterIP to access the Antrea Service (we
+	// resolve the endpoint directly, and the endpoint can change if the
+	// antrea-controller Pod is rescheduled), and because the apiserver cert
+	// can be rotated and we need a new client with the updated CA cert.
 	// Verifying server certificate only takes place for new requests and existing
 	// watches won't be interrupted by rotating cert. The new client will be used
 	// after the existing watches expire.
-	antreaClientProvider agent.AntreaClientProvider
+	antreaClientProvider client.AntreaClientProvider
 	// queue maintains the NetworkPolicy ruleIDs that need to be synced.
 	queue workqueue.RateLimitingInterface
 	// ruleCache maintains the desired state of NetworkPolicy rules.
@@ -167,7 +170,7 @@ type Controller struct {
 }
 
 // NewNetworkPolicyController returns a new *Controller.
-func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider,
+func NewNetworkPolicyController(antreaClientGetter client.AntreaClientProvider,
 	ofClient openflow.Client,
 	routeClient route.Interface,
 	ifaceStore interfacestore.InterfaceStore,
@@ -594,7 +597,13 @@ func (c *Controller) SetDenyConnStore(denyConnStore *connections.DenyConnectionStore) {
 // Run will not return until stopCh is closed.
 func (c *Controller) Run(stopCh <-chan struct{}) {
 	attempts := 0
-	if err := wait.PollUntilContextCancel(wait.ContextForChannel(stopCh), 200*time.Millisecond, true, func(ctx context.Context) (bool, error) {
+	// If Antrea client is not ready within 5s, we assume that the Antrea Controller is not
+	// available. We proceed with our watches, which are likely to fail. In turn, this will
+	// trigger the fallback mechanism.
+	// 5s should be more than enough if the Antrea Controller is running correctly.
+	ctx, cancel := context.WithTimeout(wait.ContextForChannel(stopCh), 5*time.Second)
+	defer cancel()
+	if err := wait.PollUntilContextCancel(ctx, 200*time.Millisecond, true, func(ctx context.Context) (bool, error) {
 		if attempts%10 == 0 {
 			klog.Info("Waiting for Antrea client to be ready")
 		}
@@ -605,9 +614,9 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
 		return true, nil
 	}); err != nil {
 		klog.Info("Stopped waiting for Antrea client")
-		return
+	} else {
+		klog.Info("Antrea client is ready")
 	}
-	klog.Info("Antrea client is ready")
 
 	// Use NonSlidingUntil so that normal reconnection (disconnected after
 	// running a while) can reconnect immediately while abnormal reconnection
diff --git a/pkg/agent/controller/networkpolicy/status_controller.go b/pkg/agent/controller/networkpolicy/status_controller.go
index f062c41accc..9307048c69b 100644
--- a/pkg/agent/controller/networkpolicy/status_controller.go
+++ b/pkg/agent/controller/networkpolicy/status_controller.go
@@ -27,7 +27,7 @@ import (
 	"k8s.io/client-go/util/workqueue"
 	"k8s.io/klog/v2"
 
-	"antrea.io/antrea/pkg/agent"
+	"antrea.io/antrea/pkg/agent/client"
 	"antrea.io/antrea/pkg/apis/controlplane/v1beta2"
 )
 
@@ -82,7 +82,7 @@ func realizedRulePolicyIndexFunc(obj interface{}) ([]string, error) {
 	return []string{string(rule.policyID)}, nil
 }
 
-func newStatusController(antreaClientProvider agent.AntreaClientProvider, nodeName string, ruleCache *ruleCache) *StatusController {
+func newStatusController(antreaClientProvider client.AntreaClientProvider, nodeName string, ruleCache *ruleCache) *StatusController {
 	return &StatusController{
 		statusControlInterface: &networkPolicyStatusControl{antreaClientProvider: antreaClientProvider},
 		nodeName:               nodeName,
@@ -223,7 +223,7 @@ type networkPolicyStatusControlInterface interface {
 }
 
 type networkPolicyStatusControl struct {
-	antreaClientProvider agent.AntreaClientProvider
+	antreaClientProvider client.AntreaClientProvider
 }
 
 func (c *networkPolicyStatusControl) UpdateNetworkPolicyStatus(name string, status *v1beta2.NetworkPolicyStatus) error {
diff --git a/pkg/agent/stats/collector.go b/pkg/agent/stats/collector.go
index 2e1f19ffe8b..13543e271e6 100644
--- a/pkg/agent/stats/collector.go
+++ b/pkg/agent/stats/collector.go
@@ -23,7 +23,7 @@ import (
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/klog/v2"
 
-	"antrea.io/antrea/pkg/agent"
+	"antrea.io/antrea/pkg/agent/client"
 	"antrea.io/antrea/pkg/agent/multicast"
 	"antrea.io/antrea/pkg/agent/openflow"
 	agenttypes "antrea.io/antrea/pkg/agent/types"
@@ -57,7 +57,7 @@ type Collector struct {
 	nodeName string
 	// antreaClientProvider provides interfaces to get antreaClient, which will be used to report the statistics to the
 	// antrea-controller.
-	antreaClientProvider agent.AntreaClientProvider
+	antreaClientProvider client.AntreaClientProvider
 	// ofClient is the Openflow interface that can fetch the statistic of the Openflow entries.
 	ofClient             openflow.Client
 	networkPolicyQuerier querier.AgentNetworkPolicyInfoQuerier
@@ -68,7 +68,7 @@ type Collector struct {
 	multicastEnabled bool
 }
 
-func NewCollector(antreaClientProvider agent.AntreaClientProvider, ofClient openflow.Client, npQuerier querier.AgentNetworkPolicyInfoQuerier, mcQuerier *multicast.Controller) *Collector {
+func NewCollector(antreaClientProvider client.AntreaClientProvider, ofClient openflow.Client, npQuerier querier.AgentNetworkPolicyInfoQuerier, mcQuerier *multicast.Controller) *Collector {
 	nodeName, _ := env.GetNodeName()
 	manager := &Collector{
 		nodeName: nodeName,
diff --git a/pkg/agent/supportbundlecollection/support_bundle_controller.go b/pkg/agent/supportbundlecollection/support_bundle_controller.go
index 160c236afdf..1202448019e 100644
--- a/pkg/agent/supportbundlecollection/support_bundle_controller.go
+++ b/pkg/agent/supportbundlecollection/support_bundle_controller.go
@@ -35,7 +35,7 @@ import (
 	"k8s.io/klog/v2"
 	"k8s.io/utils/exec"
 
-	"antrea.io/antrea/pkg/agent"
+	"antrea.io/antrea/pkg/agent/client"
 	agentquerier "antrea.io/antrea/pkg/agent/querier"
 	"antrea.io/antrea/pkg/apis/controlplane"
 	cpv1b2 "antrea.io/antrea/pkg/apis/controlplane/v1beta2"
@@ -69,7 +69,7 @@ type SupportBundleController struct {
 	nodeName                     string
 	supportBundleNodeType        controlplane.SupportBundleCollectionNodeType
 	namespace                    string
-	antreaClientGetter           agent.AntreaClientProvider
+	antreaClientGetter           client.AntreaClientProvider
 	queue                        workqueue.Interface
 	supportBundleCollection      *cpv1b2.SupportBundleCollection
 	supportBundleCollectionMutex sync.RWMutex
@@ -84,7 +84,7 @@ type SupportBundleController struct {
 func NewSupportBundleController(nodeName string,
 	supportBundleNodeType controlplane.SupportBundleCollectionNodeType,
 	namespace string,
-	antreaClientGetter agent.AntreaClientProvider,
+	antreaClientGetter client.AntreaClientProvider,
 	ovsCtlClient ovsctl.OVSCtlClient,
 	aq agentquerier.AgentQuerier,
 	npq querier.AgentNetworkPolicyInfoQuerier,
diff --git a/test/e2e/networkpolicy_test.go b/test/e2e/networkpolicy_test.go
index 080c879c1a9..bef78a76cb0 100644
--- a/test/e2e/networkpolicy_test.go
+++ b/test/e2e/networkpolicy_test.go
@@ -23,6 +23,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	corev1 "k8s.io/api/core/v1"
 	networkingv1 "k8s.io/api/networking/v1"
@@ -743,17 +744,17 @@ func testNetworkPolicyAfterAgentRestart(t *testing.T, data *TestData) {
 	require.NoError(t, err)
 	t.Cleanup(func() { data.deleteNetworkpolicy(netpol) })
 
-	checkFunc := func(testPod string, testPodIPs *PodIPs, expectErr bool) {
+	checkFunc := func(t assert.TestingT, testPod string, testPodIPs *PodIPs, expectErr bool) {
 		var wg sync.WaitGroup
 		checkOne := func(clientPod, serverPod string, serverIP *net.IP) {
 			defer wg.Done()
 			if serverIP != nil {
 				cmd := []string{"wget", "-O", "-", serverIP.String(), "-T", "1"}
 				_, _, err := data.RunCommandFromPod(data.testNamespace, clientPod, nginxContainerName, cmd)
-				if expectErr && err == nil {
-					t.Errorf("Pod %s should not be able to connect %s, but was able to connect", clientPod, serverPod)
-				} else if !expectErr && err != nil {
-					t.Errorf("Pod %s should be able to connect %s, but was not able to connect, err: %v", clientPod, serverPod, err)
+				if expectErr {
+					assert.Error(t, err, "Pod %s should not be able to connect %s, but was able to connect", clientPod, serverPod)
+				} else {
+					assert.NoError(t, err, "Pod %s should be able to connect %s, but was not able to connect", clientPod, serverPod)
 				}
 			}
 		}
@@ -783,7 +784,7 @@ func testNetworkPolicyAfterAgentRestart(t *testing.T, data *TestData) {
 
 	// While the new antrea-agent starts, the denied Pod should never connect to the isolated Pod successfully.
 	for i := 0; i < 5; i++ {
-		checkFunc(deniedPod, deniedPodIPs, true)
+		checkFunc(t, deniedPod, deniedPodIPs, true)
 	}
 
 	antreaPod, err := data.getAntreaPodOnNode(workerNode)
@@ -792,15 +793,21 @@ func testNetworkPolicyAfterAgentRestart(t *testing.T, data *TestData) {
 	waitForAgentCondition(t, data, antreaPod, v1beta1.ControllerConnectionUp, corev1.ConditionFalse)
 	waitForAgentCondition(t, data, antreaPod, v1beta1.OpenflowConnectionUp, corev1.ConditionTrue)
 	// Even the new antrea-agent can't connect to antrea-controller, the previous policy should continue working.
-	checkFunc(deniedPod, deniedPodIPs, true)
-	checkFunc(allowedPod, allowedPodIPs, false)
+	checkFunc(t, deniedPod, deniedPodIPs, true)
+	// It may take some time for the antrea-agent to fall back to locally-saved policies. Until
+	// it happens, allowed traffic may be dropped. So we use polling to tolerate some delay.
+	// The important part is that traffic that should be denied is always denied, which we have
+	// already validated at that point.
+	assert.EventuallyWithT(t, func(t *assert.CollectT) {
+		checkFunc(t, allowedPod, allowedPodIPs, false)
+	}, 10*time.Second, 1*time.Second)
 
 	// Scale antrea-controller to 1 so antrea-agent will connect to antrea-controller.
 	scaleFunc(1)
 	// Make sure antrea-agent connects to antrea-controller.
 	waitForAgentCondition(t, data, antreaPod, v1beta1.ControllerConnectionUp, corev1.ConditionTrue)
-	checkFunc(deniedPod, deniedPodIPs, true)
-	checkFunc(allowedPod, allowedPodIPs, false)
+	checkFunc(t, deniedPod, deniedPodIPs, true)
+	checkFunc(t, allowedPod, allowedPodIPs, false)
 }
 
 func testIngressPolicyWithoutPortNumber(t *testing.T, data *TestData) {
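For context, here is a minimal sketch of how a consumer is expected to use the provider introduced above. It is illustrative only and not part of the patch: the example package and the watchPolicies helper are hypothetical, and only GetAntreaClient is taken from the diff.

package example // illustrative only, not part of the patch

import (
	"antrea.io/antrea/pkg/agent/client"
)

// watchPolicies is a hypothetical consumer: it fetches the client at the point of use, so a
// client rebuilt after a CA bundle rotation or an Endpoint change is picked up automatically.
func watchPolicies(provider client.AntreaClientProvider) error {
	antreaClient, err := provider.GetAntreaClient()
	if err != nil {
		// The client may not be ready yet, e.g. before the CA bundle ConfigMap or the
		// Antrea Service Endpoint has been resolved; callers are expected to retry.
		return err
	}
	// Use antreaClient for this one operation (e.g. starting a watch); never cache it.
	_ = antreaClient
	return nil
}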