From b9b4e5f13bd0e32c2cd0bea49c86f670e5752009 Mon Sep 17 00:00:00 2001 From: Wenying Dong Date: Wed, 6 Nov 2024 20:07:38 -0800 Subject: [PATCH 1/4] Install Pod flows after PortStatus message is received (#6763) This change introduces a worker in `podConfigurator` to listen for the OpenFlow PortStatus messages generated when new OpenFlow ports are allocated in OVS. After receiving a message, the Windows antrea-agent installs the Pod-related OpenFlow entries. If the OpenFlow port is not allocated within 30s after the CmdAdd request is handled, a K8s event with type "NetworkNotReady" is added to the Pod. Whenever the Pod network forwarding rules are installed, a K8s event with type "NetworkReady" is added. Signed-off-by: Wenying Dong --- cmd/antrea-agent/agent.go | 1 + go.mod | 13 +- go.sum | 27 +- hack/update-codegen-dockerized.sh | 1 + pkg/agent/cniserver/pod_configuration.go | 158 +++++- .../cniserver/pod_configuration_linux.go | 9 + .../cniserver/pod_configuration_linux_test.go | 2 +- pkg/agent/cniserver/pod_configuration_test.go | 459 ++++++++++++++++++ .../cniserver/pod_configuration_windows.go | 106 +++- pkg/agent/cniserver/secondary.go | 2 +- pkg/agent/cniserver/server.go | 10 +- pkg/agent/cniserver/server_linux_test.go | 12 +- pkg/agent/cniserver/server_windows_test.go | 175 +++---- pkg/agent/interfacestore/interface_cache.go | 5 + .../interfacestore/interface_cache_test.go | 48 ++ .../testing/mock_interfacestore.go | 14 +- pkg/agent/interfacestore/types.go | 6 + .../interfacestore/zz_generated.deepcopy.go | 155 ++++++ pkg/agent/openflow/client.go | 7 + pkg/agent/openflow/client_test.go | 11 + pkg/agent/openflow/testing/mock_openflow.go | 15 +- pkg/ovs/openflow/interfaces.go | 3 + pkg/ovs/openflow/ofctrl_bridge.go | 32 +- pkg/ovs/openflow/testing/mock_openflow.go | 14 +- test/integration/agent/cniserver_test.go | 3 + 25 files changed, 1133 insertions(+), 155 deletions(-) create mode 100644 pkg/agent/cniserver/pod_configuration_test.go create mode 100644 pkg/agent/interfacestore/zz_generated.deepcopy.go diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index ec67c49a235..c761a8ffc61 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -598,6 +598,7 @@ func run(o *Options) error { o.config.CNISocket, o.config.HostProcPathPrefix, nodeConfig, + localPodInformer.Get(), k8sClient, routeClient, isChaining, diff --git a/go.mod b/go.mod index b69d2732e6b..aac52ffb2e2 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,8 @@ module antrea.io/antrea go 1.23.0 require ( - antrea.io/libOpenflow v0.14.0 - antrea.io/ofnet v0.12.0 + antrea.io/libOpenflow v0.15.0 + antrea.io/ofnet v0.14.0 github.com/ClickHouse/clickhouse-go/v2 v2.6.1 github.com/DATA-DOG/go-sqlmock v1.5.2 github.com/Mellanox/sriovnet v1.1.0 @@ -73,7 +73,7 @@ require ( k8s.io/apiserver v0.29.2 k8s.io/client-go v0.29.2 k8s.io/component-base v0.29.2 - k8s.io/klog/v2 v2.110.1 + k8s.io/klog/v2 v2.130.1 k8s.io/kube-aggregator v0.29.2 k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 k8s.io/kubectl v0.29.2 @@ -113,10 +113,9 @@ require ( github.com/aws/smithy-go v1.12.1 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/blang/semver/v4 v4.0.0 // indirect - github.com/cenk/hub v1.0.1 // indirect - github.com/cenkalti/backoff/v4 v4.2.1 // indirect - github.com/cenkalti/hub v1.0.1 // indirect - github.com/cenkalti/rpc2 v0.0.0-20180727162946-9642ea02d0aa // indirect + github.com/cenkalti/backoff/v4 v4.3.0 // indirect + github.com/cenkalti/hub v1.0.2 // indirect + github.com/cenkalti/rpc2
v1.0.3 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/chai2010/gettext-go v1.0.2 // indirect github.com/containerd/cgroups v1.1.0 // indirect diff --git a/go.sum b/go.sum index 5c3e83b0a90..22753d9bfff 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,7 @@ -antrea.io/libOpenflow v0.14.0 h1:6MS1E52nGQyz/AJ8j3OrotgAc/1ubef5vc5i8ytIxFE= -antrea.io/libOpenflow v0.14.0/go.mod h1:U8YR0ithWrjwLdUUhyeCsYnlKg/mEFjG5CbPNt1V+j0= -antrea.io/ofnet v0.12.0 h1:pgygAsEZJUPK/kGmeuIesDh5hoGLpYeavSLdG/Dp8Ao= -antrea.io/ofnet v0.12.0/go.mod h1:MB3qaInEimj+T8qtpBcIQK+EqeNiY1S/WbUdGk+TzNg= +antrea.io/libOpenflow v0.15.0 h1:wGk+IVCf8piGZgC4+lbf4qfGrJG5ikzfq5Y1T5LzqmI= +antrea.io/libOpenflow v0.15.0/go.mod h1:Mq1JEjYrb6eTVA7qjZRj9plVTKcsLM8wnQ87sLLYuiQ= +antrea.io/ofnet v0.14.0 h1:BGOqg5DdRkvxpBqyoEgWmvGd4EvpacdU/Py1s6qOvSc= +antrea.io/ofnet v0.14.0/go.mod h1:W5JPYFFcRM7tLwsItgmsKqIhtW/QofyIeNsUIecFaBo= cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU= @@ -120,14 +120,12 @@ github.com/blang/semver v3.5.1+incompatible h1:cQNTCjp13qL8KC3Nbxr/y2Bqb63oX6wdn github.com/blang/semver v3.5.1+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk= github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM= github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ= -github.com/cenk/hub v1.0.1 h1:RBwXNOF4a8KjD8BJ08XqN8KbrqaGiQLDrgvUGJSHuPA= -github.com/cenk/hub v1.0.1/go.mod h1:rJM1LNAW0ppT8FMMuPK6c2NP/R2nH/UthtuRySSaf6Y= -github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= -github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= -github.com/cenkalti/hub v1.0.1 h1:UMtjc6dHSaOQTO15SVA50MBIR9zQwvsukQupDrkIRtg= -github.com/cenkalti/hub v1.0.1/go.mod h1:tcYwtS3a2d9NO/0xDXVJWx3IedurUjYCqFCmpi0lpHs= -github.com/cenkalti/rpc2 v0.0.0-20180727162946-9642ea02d0aa h1:t+iWhuJE2aropY4uxKMVbyP+IJ29o422f7YAd73aTjg= -github.com/cenkalti/rpc2 v0.0.0-20180727162946-9642ea02d0aa/go.mod h1:v2npkhrXyk5BCnkNIiPdRI23Uq6uWPUQGL2hnRcRr/M= +github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= +github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= +github.com/cenkalti/hub v1.0.2 h1:Nqv9TNaA9boeO2wQFW8o87BY3zKthtnzXmWGmJqhAV8= +github.com/cenkalti/hub v1.0.2/go.mod h1:8LAFAZcCasb83vfxatMUnZHRoQcffho2ELpHb+kaTJU= +github.com/cenkalti/rpc2 v1.0.3 h1:OkMsNP/sP9seN1VRCLqhX1xkVGHPoLwWS6fZR14Ji/k= +github.com/cenkalti/rpc2 v1.0.3/go.mod h1:2yfU5b86vOr16+iY1jN3MvT6Kxc9Nf8j5iZWwUf7iaw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= @@ -252,7 +250,6 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9 github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= -github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/logr v1.4.2 
h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= @@ -1143,8 +1140,8 @@ k8s.io/klog v0.0.0-20181102134211-b9b56d5dfc92/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUc k8s.io/klog v0.3.0/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk= k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I= k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE= -k8s.io/klog/v2 v2.110.1 h1:U/Af64HJf7FcwMcXyKm2RPM22WZzyR7OSpYj5tg3cL0= -k8s.io/klog/v2 v2.110.1/go.mod h1:YGtd1984u+GgbuZ7e08/yBuAfKLSO0+uR1Fhi6ExXjo= +k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= +k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= k8s.io/kms v0.29.2 h1:MDsbp98gSlEQs7K7dqLKNNTwKFQRYYvO4UOlBOjNy6Y= k8s.io/kms v0.29.2/go.mod h1:s/9RC4sYRZ/6Tn6yhNjbfJuZdb8LzlXhdlBnKizeFDo= k8s.io/kube-aggregator v0.29.2 h1:z9qJn5wlGmGaX6EfM7OEhr6fq6SBjDKR6tPRZ/qgxeY= diff --git a/hack/update-codegen-dockerized.sh b/hack/update-codegen-dockerized.sh index cd5232fa825..a91f378d80c 100755 --- a/hack/update-codegen-dockerized.sh +++ b/hack/update-codegen-dockerized.sh @@ -148,6 +148,7 @@ function generate_antrea_client_code { --input-dirs "${ANTREA_PKG}/pkg/apis/crd/v1beta1" \ --input-dirs "${ANTREA_PKG}/pkg/apis/stats" \ --input-dirs "${ANTREA_PKG}/pkg/apis/stats/v1alpha1" \ + --input-dirs "${ANTREA_PKG}/pkg/agent/interfacestore" \ -O zz_generated.deepcopy \ --go-header-file hack/boilerplate/license_header.go.txt diff --git a/pkg/agent/cniserver/pod_configuration.go b/pkg/agent/cniserver/pod_configuration.go index e44e739fd61..93b75b5d34d 100644 --- a/pkg/agent/cniserver/pod_configuration.go +++ b/pkg/agent/cniserver/pod_configuration.go @@ -15,16 +15,24 @@ package cniserver import ( + "bytes" "encoding/json" "fmt" "net" "strings" "sync" + "time" + "antrea.io/libOpenflow/openflow15" current "github.com/containernetworking/cni/pkg/types/100" "github.com/containernetworking/cni/pkg/version" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/sets" + clientset "k8s.io/client-go/kubernetes" + v1 "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" "antrea.io/antrea/pkg/agent/cniserver/ipam" @@ -57,10 +65,17 @@ const ( const ( defaultIFDevName = "eth0" + + minRetryDelay = 5 * time.Second + maxRetryDelay = 300 * time.Second ) var ( getNSPath = util.GetNSPath + // retryInterval is the interval to re-install Pod OpenFlow entries if any error happened. + // Note, using a variable rather than constant for retryInterval because we may use a shorter time in the + // test code. + retryInterval = 5 * time.Second ) type podConfigurator struct { @@ -76,9 +91,19 @@ type podConfigurator struct { // isSecondaryNetwork is true if this instance of podConfigurator is used to configure // Pod secondary network interfaces. 
isSecondaryNetwork bool + + containerAccess *containerAccessArbitrator + eventBroadcaster record.EventBroadcaster + recorder record.EventRecorder + podListerSynced cache.InformerSynced + podLister v1.PodLister + kubeClient clientset.Interface + unreadyPortQueue workqueue.RateLimitingInterface + statusCh chan *openflow15.PortStatus } func newPodConfigurator( + kubeClient clientset.Interface, ovsBridgeClient ovsconfig.OVSBridgeClient, ofClient openflow.Client, routeClient route.Interface, @@ -88,12 +113,14 @@ func newPodConfigurator( isOvsHardwareOffloadEnabled bool, disableTXChecksumOffload bool, podUpdateNotifier channel.Notifier, + podInformer cache.SharedIndexInformer, + containerAccess *containerAccessArbitrator, ) (*podConfigurator, error) { ifConfigurator, err := newInterfaceConfigurator(ovsDatapathType, isOvsHardwareOffloadEnabled, disableTXChecksumOffload) if err != nil { return nil, err } - return &podConfigurator{ + pc := &podConfigurator{ ovsBridgeClient: ovsBridgeClient, ofClient: ofClient, routeClient: routeClient, @@ -101,7 +128,12 @@ func newPodConfigurator( gatewayMAC: gatewayMAC, ifConfigurator: ifConfigurator, podUpdateNotifier: podUpdateNotifier, - }, nil + kubeClient: kubeClient, + containerAccess: containerAccess, + } + // Initiate the PortStatus message listener. This function is a no-op except on Windows. + pc.initPortStatusMonitor(podInformer) + return pc, nil } func parseContainerIPs(ipcs []*current.IPConfig) ([]net.IP, error) { @@ -166,13 +198,13 @@ func getContainerIPsString(ips []net.IP) string { // not created for a Pod interface. func ParseOVSPortInterfaceConfig(portData *ovsconfig.OVSPortData, portConfig *interfacestore.OVSPortConfig) *interfacestore.InterfaceConfig { if portData.ExternalIDs == nil { - klog.V(2).Infof("OVS port %s has no external_ids", portData.Name) + klog.V(2).InfoS("OVS port has no external_ids", "port", portData.Name) return nil } containerID, found := portData.ExternalIDs[ovsExternalIDContainerID] if !found { - klog.V(2).Infof("OVS port %s has no %s in external_ids", portData.Name, ovsExternalIDContainerID) + klog.V(2).InfoS("OVS port has no containerID in external_ids", "port", portData.Name) return nil } @@ -187,8 +219,7 @@ func ParseOVSPortInterfaceConfig(portData *ovsconfig.OVSPortData, portConfig *in containerMAC, err := net.ParseMAC(portData.ExternalIDs[ovsExternalIDMAC]) if err != nil { - klog.Errorf("Failed to parse MAC address from OVS external config %s: %v", - portData.ExternalIDs[ovsExternalIDMAC], err) + klog.ErrorS(err, "Failed to parse MAC address from OVS external config") } podName, _ := portData.ExternalIDs[ovsExternalIDPodName] podNamespace, _ := portData.ExternalIDs[ovsExternalIDPodNamespace] @@ -279,7 +310,7 @@ func (pc *podConfigurator) createOVSPort(ovsPortName string, ovsAttachInfo map[s func (pc *podConfigurator) removeInterfaces(containerID string) error { containerConfig, found := pc.ifaceStore.GetContainerInterface(containerID) if !found { - klog.V(2).Infof("Did not find the port for container %s in local cache", containerID) + klog.V(2).InfoS("Did not find the port for container in local cache", "container", containerID) return nil } @@ -498,7 +529,7 @@ func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *contain // disconnectInterfaceFromOVS disconnects an existing interface from ovs br-int. 
func (pc *podConfigurator) disconnectInterfaceFromOVS(containerConfig *interfacestore.InterfaceConfig) error { containerID := containerConfig.ContainerID - klog.V(2).Infof("Deleting Openflow entries for container %s", containerID) + klog.V(2).InfoS("Deleting Openflow entries for container", "container", containerID) if !pc.isSecondaryNetwork { if err := pc.ofClient.UninstallPodFlows(containerConfig.InterfaceName); err != nil { return fmt.Errorf("failed to delete Openflow entries for container %s: %v", containerID, err) @@ -513,6 +544,7 @@ func (pc *podConfigurator) disconnectInterfaceFromOVS(containerConfig *interface if err := pc.ovsBridgeClient.DeletePort(containerConfig.PortUUID); err != nil { return fmt.Errorf("failed to delete OVS port for container %s interface %s: %v", containerID, containerConfig.InterfaceName, err) } + // Remove container configuration from cache. pc.ifaceStore.DeleteInterface(containerConfig) if !pc.isSecondaryNetwork { @@ -558,7 +590,7 @@ func (pc *podConfigurator) connectInterceptedInterface( func (pc *podConfigurator) disconnectInterceptedInterface(podName, podNamespace, containerID string) error { containerConfig, found := pc.ifaceStore.GetContainerInterface(containerID) if !found { - klog.V(2).Infof("Did not find the port for container %s in local cache", containerID) + klog.V(2).InfoS("Did not find the port for container in local cache", "container", containerID) return nil } for _, ip := range containerConfig.IPs { @@ -570,3 +602,111 @@ func (pc *podConfigurator) disconnectInterceptedInterface(podName, podNamespace, return pc.disconnectInterfaceFromOVS(containerConfig) // TODO recover pre-connect state? repatch vethpair to original bridge etc ?? to make first CNI happy?? } + +func (pc *podConfigurator) processNextWorkItem() bool { + key, quit := pc.unreadyPortQueue.Get() + if quit { + return false + } + defer pc.unreadyPortQueue.Done(key) + + if err := pc.updateUnreadyPod(key.(string)); err != nil { + klog.ErrorS(err, "Failed install OpenFlow entries for OVS port interface", "name", key) + // Put the item back on the workqueue to handle any transient errors. + pc.unreadyPortQueue.AddAfter(key, retryInterval) + } + return true +} + +func (pc *podConfigurator) updateUnreadyPod(ovsPort string) error { + ifConfig, found := pc.ifaceStore.GetInterfaceByName(ovsPort) + if !found { + klog.InfoS("Interface config is not found, skip processing the port", "name", ovsPort) + return nil + } + + pc.containerAccess.lockContainer(ifConfig.ContainerID) + defer pc.containerAccess.unlockContainer(ifConfig.ContainerID) + // Get the InterfaceConfig again after the lock to avoid race conditions. + ifConfig, found = pc.ifaceStore.GetInterfaceByName(ovsPort) + if !found { + klog.InfoS("Interface config is not found, skip processing the port", "name", ovsPort) + return nil + } + + if ifConfig.OFPort == 0 { + // Add Pod not-ready event if the pod flows are not successfully installed, and the OpenFlow port is not allocated. + // Returns error so that we can have a retry after 5s. + pc.recordPodEvent(ifConfig, false) + return fmt.Errorf("pod's OpenFlow port is not ready yet") + } + + // Install OpenFlow entries for the Pod. + klog.V(2).InfoS("Setting up Openflow entries for OVS port", "port", ovsPort) + if err := pc.ofClient.InstallPodFlows(ovsPort, ifConfig.IPs, ifConfig.MAC, uint32(ifConfig.OFPort), ifConfig.VLANID, nil); err != nil { + // Add Pod not-ready event if the pod flows installation fails. + // Returns error so that we can have a retry after 5s. 
+ pc.recordPodEvent(ifConfig, false) + return fmt.Errorf("failed to add Openflow entries for OVS port %s: %v", ovsPort, err) + } + + // Notify the Pod update event to required components. + event := agenttypes.PodUpdate{ + PodName: ifConfig.PodName, + PodNamespace: ifConfig.PodNamespace, + IsAdd: true, + ContainerID: ifConfig.ContainerID, + } + pc.podUpdateNotifier.Notify(event) + + pc.recordPodEvent(ifConfig, true) + return nil +} + +func (pc *podConfigurator) recordPodEvent(ifConfig *interfacestore.InterfaceConfig, installed bool) { + pod, err := pc.podLister.Pods(ifConfig.PodNamespace).Get(ifConfig.PodName) + if err != nil { + klog.InfoS("Unable to get Pod, skip recording Pod event", "Pod", klog.KRef(ifConfig.PodNamespace, ifConfig.PodName)) + return + } + + if installed { + // Add normal event to record Pod network is ready. + pc.recorder.Eventf(pod, corev1.EventTypeNormal, "NetworkReady", "Installed Pod network forwarding rules") + return + } + + pc.recorder.Eventf(pod, corev1.EventTypeWarning, "NetworkNotReady", "Pod network forwarding rules not installed") +} + +func (pc *podConfigurator) processPortStatusMessage(status *openflow15.PortStatus) { + // Update Pod OpenFlow entries only after the OpenFlow port state is live. + if status.Desc.State != openflow15.PS_LIVE { + return + } + ovsPort := string(bytes.Trim(status.Desc.Name, "\x00")) + ofPort := status.Desc.PortNo + + ifConfig, found := pc.ifaceStore.GetInterfaceByName(ovsPort) + if !found { + klog.InfoS("Interface config is not found", "ovsPort", ovsPort) + return + } + + func() { + pc.containerAccess.lockContainer(ifConfig.ContainerID) + defer pc.containerAccess.unlockContainer(ifConfig.ContainerID) + // Get the InterfaceConfig again after the lock to avoid race conditions. + ifConfig, found = pc.ifaceStore.GetInterfaceByName(ovsPort) + if !found { + klog.InfoS("Interface config is not found", "ovsPort", ovsPort) + return + } + // Update interface config with the ofPort. 
+ newIfConfig := ifConfig.DeepCopy() + newIfConfig.OVSPortConfig.OFPort = int32(ofPort) + pc.ifaceStore.UpdateInterface(newIfConfig) + }() + + pc.unreadyPortQueue.Add(ovsPort) +} diff --git a/pkg/agent/cniserver/pod_configuration_linux.go b/pkg/agent/cniserver/pod_configuration_linux.go index fe281f06330..ef59a6beeab 100644 --- a/pkg/agent/cniserver/pod_configuration_linux.go +++ b/pkg/agent/cniserver/pod_configuration_linux.go @@ -21,6 +21,7 @@ import ( "fmt" current "github.com/containernetworking/cni/pkg/types/100" + "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" "antrea.io/antrea/pkg/agent/cniserver/ipam" @@ -113,3 +114,11 @@ func (pc *podConfigurator) reconcileMissingPods(ifConfigs []*interfacestore.Inte klog.Warningf("Interface for Pod %s/%s not found in the interface store", ifaceConfig.PodNamespace, ifaceConfig.PodName) } } + +func (pc *podConfigurator) initPortStatusMonitor(_ cache.SharedIndexInformer) { + +} + +func (pc *podConfigurator) Run(stopCh <-chan struct{}) { + <-stopCh +} diff --git a/pkg/agent/cniserver/pod_configuration_linux_test.go b/pkg/agent/cniserver/pod_configuration_linux_test.go index 4f51bd4892b..bd7703424b1 100644 --- a/pkg/agent/cniserver/pod_configuration_linux_test.go +++ b/pkg/agent/cniserver/pod_configuration_linux_test.go @@ -687,7 +687,7 @@ func createPodConfigurator(controller *gomock.Controller, testIfaceConfigurator mockOFClient = openflowtest.NewMockClient(controller) ifaceStore = interfacestore.NewInterfaceStore() mockRoute = routetest.NewMockInterface(controller) - configurator, _ := newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100)) + configurator, _ := newPodConfigurator(nil, mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100), nil, nil) configurator.ifConfigurator = testIfaceConfigurator return configurator } diff --git a/pkg/agent/cniserver/pod_configuration_test.go b/pkg/agent/cniserver/pod_configuration_test.go new file mode 100644 index 00000000000..10457b97175 --- /dev/null +++ b/pkg/agent/cniserver/pod_configuration_test.go @@ -0,0 +1,459 @@ +// Copyright 2024 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package cniserver + +import ( + "fmt" + "net" + "testing" + "time" + + "antrea.io/libOpenflow/openflow15" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/runtime" + coreinformers "k8s.io/client-go/informers/core/v1" + fakeclientset "k8s.io/client-go/kubernetes/fake" + corelisters "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" + + "antrea.io/antrea/pkg/agent/interfacestore" + openflowtest "antrea.io/antrea/pkg/agent/openflow/testing" + "antrea.io/antrea/pkg/agent/types" + "antrea.io/antrea/pkg/util/channel" +) + +var ( + podIfName = "test" + podIPs = []net.IP{net.ParseIP("192.168.9.10")} + podMac, _ = net.ParseMAC("00:15:5D:B2:6F:38") + podInfraContainerID = "261a1970-5b6c-11ed-8caf-000c294e5d03" + + pod = &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: testPodNameA, + Namespace: testPodNamespace, + }, + Spec: corev1.PodSpec{ + NodeName: nodeName, + }, + } + + portStatusMsg = &openflow15.PortStatus{ + Reason: openflow15.PR_MODIFY, + Desc: openflow15.Port{ + PortNo: 1, + Length: 72, + Name: []byte(fmt.Sprintf("%s\x00", podIfName)), + State: openflow15.PS_LIVE, + }, + } +) + +type mockClients struct { + kubeClient *fakeclientset.Clientset + localPodInformer cache.SharedIndexInformer + podLister corelisters.PodLister + podListerSynced cache.InformerSynced + ofClient *openflowtest.MockClient + recorder *record.FakeRecorder +} + +func newMockClients(ctrl *gomock.Controller, nodeName string, objects ...runtime.Object) *mockClients { + kubeClient := fakeclientset.NewSimpleClientset(objects...) 
+ + localPodInformer := coreinformers.NewFilteredPodInformer( + kubeClient, + metav1.NamespaceAll, + 0, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, + func(options *metav1.ListOptions) { + options.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", nodeName).String() + }, + ) + podLister := corelisters.NewPodLister(localPodInformer.GetIndexer()) + ofClient := openflowtest.NewMockClient(ctrl) + recorder := record.NewFakeRecorder(100) + recorder.IncludeObject = false + + return &mockClients{ + kubeClient: kubeClient, + localPodInformer: localPodInformer, + podLister: podLister, + podListerSynced: localPodInformer.HasSynced, + ofClient: ofClient, + recorder: recorder, + } +} + +func (c *mockClients) startInformers(stopCh chan struct{}) { + go c.localPodInformer.Run(stopCh) + cache.WaitForCacheSync(stopCh, c.localPodInformer.HasSynced) +} + +type asyncWaiter struct { + podName string + containerID string + waitCh chan struct{} + notifier *channel.SubscribableChannel +} + +func (w *asyncWaiter) notify(e interface{}) { + podUpdate := e.(types.PodUpdate) + if podUpdate.PodName == w.podName && podUpdate.ContainerID == w.containerID { + w.waitCh <- struct{}{} + } +} + +func (w *asyncWaiter) waitUntil(timeout time.Duration) bool { + select { + case <-w.waitCh: + return true + case <-time.After(timeout): + return false + } +} + +func newAsyncWaiter(podName, containerID string, stopCh chan struct{}) *asyncWaiter { + waiter := &asyncWaiter{ + podName: podName, + containerID: containerID, + waitCh: make(chan struct{}), + notifier: channel.NewSubscribableChannel("PodUpdate", 100), + } + waiter.notifier.Subscribe(waiter.notify) + go waiter.notifier.Run(stopCh) + return waiter +} + +func mockRetryInterval(t *testing.T) { + oriRetryInterval := retryInterval + retryInterval = -1 + t.Cleanup(func() { + retryInterval = oriRetryInterval + }) +} + +func newTestPodConfigurator(testClients *mockClients, waiter *asyncWaiter) *podConfigurator { + interfaceStore := interfacestore.NewInterfaceStore() + eventBroadcaster := record.NewBroadcaster() + queue := workqueue.NewNamedRateLimitingQueue( + workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "podConfigurator") + podCfg := &podConfigurator{ + kubeClient: testClients.kubeClient, + ofClient: testClients.ofClient, + podLister: testClients.podLister, + podListerSynced: testClients.podListerSynced, + ifaceStore: interfaceStore, + eventBroadcaster: eventBroadcaster, + recorder: testClients.recorder, + unreadyPortQueue: queue, + containerAccess: newContainerAccessArbitrator(), + } + if waiter != nil { + podCfg.podUpdateNotifier = waiter.notifier + } + return podCfg +} + +func TestUpdateUnreadyPod(t *testing.T) { + mockRetryInterval(t) + + for _, tc := range []struct { + name string + ofPortAssigned bool + podIfaceIsCached bool + installFlow bool + flowInstalled bool + installOpenFlowErr error + expErr string + expEvent string + }{ + { + name: "updated Port is not in interface store", + podIfaceIsCached: false, + installFlow: false, + }, { + name: "OpenFlow port is not assigned", + podIfaceIsCached: true, + ofPortAssigned: false, + installFlow: false, + expErr: "pod's OpenFlow port is not ready yet", + expEvent: "Warning NetworkNotReady Pod network forwarding rules not installed", + }, { + name: "failed to install OpenFlow entries for updated Port", + podIfaceIsCached: true, + ofPortAssigned: true, + installFlow: true, + installOpenFlowErr: fmt.Errorf("failure to install flow"), + expErr: "failed to add Openflow 
entries for OVS port test: failure to install flow", + expEvent: "Warning NetworkNotReady Pod network forwarding rules not installed", + }, { + name: "succeeded", + podIfaceIsCached: true, + ofPortAssigned: true, + installFlow: true, + installOpenFlowErr: nil, + expEvent: "Normal NetworkReady Installed Pod network forwarding rules", + }, + } { + t.Run(tc.name, func(t *testing.T) { + controller := gomock.NewController(t) + stopCh := make(chan struct{}) + defer close(stopCh) + + waiter := newAsyncWaiter(testPodNameA, podInfraContainerID, stopCh) + + testClients := newMockClients(controller, nodeName, pod) + testClients.startInformers(stopCh) + fakeOFClient := testClients.ofClient + + configurator := newTestPodConfigurator(testClients, waiter) + + flowInstalled := false + + ifConfig := interfacestore.InterfaceConfig{ + InterfaceName: podIfName, + ContainerInterfaceConfig: &interfacestore.ContainerInterfaceConfig{ + PodNamespace: testPodNamespace, + PodName: testPodNameA, + ContainerID: podInfraContainerID, + }, + OVSPortConfig: &interfacestore.OVSPortConfig{ + PortUUID: "test-port-uuid", + }, + IPs: podIPs, + MAC: podMac, + } + + if tc.ofPortAssigned { + ifConfig.OVSPortConfig.OFPort = int32(1) + } + + if tc.podIfaceIsCached { + configurator.ifaceStore.AddInterface(&ifConfig) + } + + if tc.installFlow { + fakeOFClient.EXPECT().InstallPodFlows(podIfName, podIPs, podMac, portStatusMsg.Desc.PortNo, uint16(0), nil).Times(1).Return(tc.installOpenFlowErr) + if tc.installOpenFlowErr == nil { + flowInstalled = true + } + } + + err := configurator.updateUnreadyPod(podIfName) + if tc.expErr == "" { + require.NoError(t, err) + } else { + require.EqualError(t, err, tc.expErr) + } + + if flowInstalled { + assert.True(t, waiter.waitUntil(5*time.Second)) + } + + var gotEvent string + select { + case gotEvent = <-testClients.recorder.Events: + default: + } + require.Equal(t, tc.expEvent, gotEvent) + }) + } +} + +func TestProcessNextWorkItem(t *testing.T) { + mockRetryInterval(t) + + for _, tc := range []struct { + name string + installOpenFlowErr error + expEvent string + expRequeue bool + }{ + { + name: "failed to install OpenFlow entries for updated Port", + installOpenFlowErr: fmt.Errorf("failure to install flow"), + expRequeue: true, + }, { + name: "succeeded", + installOpenFlowErr: nil, + expRequeue: false, + }, + } { + t.Run(tc.name, func(t *testing.T) { + controller := gomock.NewController(t) + stopCh := make(chan struct{}) + defer close(stopCh) + + waiter := newAsyncWaiter(testPodNameA, podInfraContainerID, stopCh) + + testClients := newMockClients(controller, nodeName, pod) + testClients.startInformers(stopCh) + fakeOFClient := testClients.ofClient + + configurator := newTestPodConfigurator(testClients, waiter) + defer configurator.unreadyPortQueue.ShutDown() + + configurator.ifaceStore.AddInterface(&interfacestore.InterfaceConfig{ + InterfaceName: podIfName, + ContainerInterfaceConfig: &interfacestore.ContainerInterfaceConfig{ + PodNamespace: testPodNamespace, + PodName: testPodNameA, + ContainerID: podInfraContainerID, + }, + OVSPortConfig: &interfacestore.OVSPortConfig{ + PortUUID: "test-port-uuid", + OFPort: int32(1), + }, + IPs: podIPs, + MAC: podMac, + }) + + fakeOFClient.EXPECT().InstallPodFlows(podIfName, podIPs, podMac, portStatusMsg.Desc.PortNo, uint16(0), nil).Times(1).Return(tc.installOpenFlowErr) + configurator.unreadyPortQueue.Add(podIfName) + + configurator.processNextWorkItem() + + if tc.installOpenFlowErr != nil { + require.Equal(t, 1, configurator.unreadyPortQueue.Len()) + key, _ 
:= configurator.unreadyPortQueue.Get() + assert.Equal(t, key, podIfName) + } else { + require.Equal(t, 0, configurator.unreadyPortQueue.Len()) + } + }) + } +} + +func TestProcessPortStatusMessage(t *testing.T) { + validOFPort := int32(1) + invalidOFPort := int32(0) + for _, tc := range []struct { + name string + status *openflow15.PortStatus + ovsPortName string + ifaceInStore bool + expEnqueue bool + expOFportNumber *int32 + }{ + { + name: "Add OF port if port status is live", + status: &openflow15.PortStatus{ + Desc: openflow15.Port{ + PortNo: 1, + Length: 72, + Name: []byte(podIfName), + State: openflow15.PS_LIVE, + }, + }, + ovsPortName: podIfName, + ifaceInStore: true, + expEnqueue: true, + expOFportNumber: &validOFPort, + }, { + name: "Add OF port with suffix in name", + status: &openflow15.PortStatus{ + Desc: openflow15.Port{ + PortNo: 1, + Length: 72, + Name: []byte(fmt.Sprintf("%s\x00", podIfName)), + State: openflow15.PS_LIVE, + }, + }, + ovsPortName: podIfName, + ifaceInStore: true, + expEnqueue: true, + expOFportNumber: &validOFPort, + }, { + name: "Ignore OF port if port is not live", + status: &openflow15.PortStatus{ + Desc: openflow15.Port{ + PortNo: 1, + Length: 72, + Name: []byte(fmt.Sprintf("%s\x00", podIfName)), + State: openflow15.PS_LINK_DOWN, + }, + }, + ovsPortName: podIfName, + ifaceInStore: true, + expEnqueue: false, + expOFportNumber: &invalidOFPort, + }, { + name: "Not enqueue OF port status message if the interface config does not exist", + status: &openflow15.PortStatus{ + Desc: openflow15.Port{ + PortNo: 1, + Length: 72, + Name: []byte(podIfName), + State: openflow15.PS_LIVE, + }, + }, + ovsPortName: podIfName, + ifaceInStore: false, + expEnqueue: false, + expOFportNumber: nil, + }, + } { + t.Run(tc.name, func(t *testing.T) { + queue := workqueue.NewNamedRateLimitingQueue( + workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "podMonitor") + podCfg := &podConfigurator{ + ifaceStore: interfacestore.NewInterfaceStore(), + statusCh: make(chan *openflow15.PortStatus), + unreadyPortQueue: queue, + containerAccess: newContainerAccessArbitrator(), + } + defer podCfg.unreadyPortQueue.ShutDown() + + if tc.ifaceInStore { + podCfg.ifaceStore.AddInterface(&interfacestore.InterfaceConfig{ + InterfaceName: podIfName, + ContainerInterfaceConfig: &interfacestore.ContainerInterfaceConfig{ + PodNamespace: testPodNamespace, + PodName: testPodNameA, + ContainerID: podInfraContainerID, + }, + OVSPortConfig: &interfacestore.OVSPortConfig{ + PortUUID: "test-port-uuid", + }, + IPs: podIPs, + MAC: podMac, + }) + } + + podCfg.processPortStatusMessage(tc.status) + if tc.expEnqueue { + require.Equal(t, 1, queue.Len()) + key, _ := queue.Get() + assert.Equal(t, tc.ovsPortName, key) + } else { + require.Equal(t, 0, queue.Len()) + } + + if tc.expOFportNumber != nil { + ifaceCfg, ok := podCfg.ifaceStore.GetInterfaceByName(podIfName) + require.True(t, ok) + assert.Equal(t, *tc.expOFportNumber, ifaceCfg.OFPort) + } + }) + } +} diff --git a/pkg/agent/cniserver/pod_configuration_windows.go b/pkg/agent/cniserver/pod_configuration_windows.go index 44734e4f20e..ff24e220690 100644 --- a/pkg/agent/cniserver/pod_configuration_windows.go +++ b/pkg/agent/cniserver/pod_configuration_windows.go @@ -18,15 +18,30 @@ package cniserver import ( - "fmt" + "time" + "antrea.io/libOpenflow/openflow15" current "github.com/containernetworking/cni/pkg/types/100" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes/scheme" + typedv1 
"k8s.io/client-go/kubernetes/typed/core/v1" + v1 "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" "antrea.io/antrea/pkg/agent/cniserver/ipam" "antrea.io/antrea/pkg/agent/interfacestore" - "antrea.io/antrea/pkg/agent/types" - "antrea.io/antrea/pkg/util/k8s" +) + +var ( + workerName = "podConfigurator" +) + +const ( + podNotReadyTimeInSeconds = 30 * time.Second ) // connectInterfaceToOVSAsync waits for an interface to be created and connects it to OVS br-int asynchronously @@ -34,29 +49,16 @@ import ( // CNI call completes. func (pc *podConfigurator) connectInterfaceToOVSAsync(ifConfig *interfacestore.InterfaceConfig, containerAccess *containerAccessArbitrator) error { ovsPortName := ifConfig.InterfaceName + // Add the OVS port into the queue after 30s in case the OFPort is still not ready. This + // operation is performed before we update OVSDB, otherwise we + // need to think about the race condition between the current goroutine with the listener. + // It may generate a duplicated PodIsReady event if the Pod's OpenFlow entries are installed + // before the time, then the library shall merge the event. + pc.unreadyPortQueue.AddAfter(ovsPortName, podNotReadyTimeInSeconds) return pc.ifConfigurator.addPostInterfaceCreateHook(ifConfig.ContainerID, ovsPortName, containerAccess, func() error { if err := pc.ovsBridgeClient.SetInterfaceType(ovsPortName, "internal"); err != nil { return err } - ofPort, err := pc.ovsBridgeClient.GetOFPort(ovsPortName, true) - if err != nil { - return err - } - containerID := ifConfig.ContainerID - klog.V(2).Infof("Setting up Openflow entries for container %s", containerID) - if err := pc.ofClient.InstallPodFlows(ovsPortName, ifConfig.IPs, ifConfig.MAC, uint32(ofPort), ifConfig.VLANID, nil); err != nil { - return fmt.Errorf("failed to add Openflow entries for container %s: %v", containerID, err) - } - // Update interface config with the ofPort. - ifConfig.OVSPortConfig.OFPort = ofPort - // Notify the Pod update event to required components. - event := types.PodUpdate{ - PodName: ifConfig.PodName, - PodNamespace: ifConfig.PodNamespace, - IsAdd: true, - ContainerID: ifConfig.ContainerID, - } - pc.podUpdateNotifier.Notify(event) return nil }) } @@ -75,7 +77,7 @@ func (pc *podConfigurator) connectInterfaceToOVS( // Because of this, we need to wait asynchronously for the interface to be created: we create the OVS port // and set the OVS Interface type "" first, and change the OVS Interface type to "internal" to connect to the // container interface after it is created. After OVS connects to the container interface, an OFPort is allocated. - klog.V(2).Infof("Adding OVS port %s for container %s", ovsPortName, containerID) + klog.V(2).InfoS("Adding OVS port for container", "port", ovsPortName, "container", containerID) ovsAttachInfo := BuildOVSPortExternalIDs(containerConfig) portUUID, err := pc.createOVSPort(ovsPortName, ovsAttachInfo, containerConfig.VLANID) if err != nil { @@ -105,7 +107,7 @@ func (pc *podConfigurator) configureInterfaces( // See: https://github.com/kubernetes/kubernetes/issues/57253#issuecomment-358897721. 
interfaceConfig, found := pc.ifaceStore.GetContainerInterface(containerID) if found { - klog.V(2).Infof("Found an existing OVS port for container %s, returning", containerID) + klog.V(2).InfoS("Found an existing OVS port for container, returning", "container", containerID) mac := interfaceConfig.MAC.String() hostIface := &current.Interface{ Name: interfaceConfig.InterfaceName, @@ -128,9 +130,61 @@ func (pc *podConfigurator) configureInterfaces( func (pc *podConfigurator) reconcileMissingPods(ifConfigs []*interfacestore.InterfaceConfig, containerAccess *containerAccessArbitrator) { for i := range ifConfigs { ifaceConfig := ifConfigs[i] - pod := k8s.NamespacedName(ifaceConfig.PodNamespace, ifaceConfig.PodName) if err := pc.connectInterfaceToOVSAsync(ifaceConfig, containerAccess); err != nil { - klog.Errorf("Failed to reconcile Pod %s: %v", pod, err) + klog.ErrorS(err, "Failed to reconcile Pod", "Pod", klog.KRef(ifaceConfig.PodNamespace, ifaceConfig.PodName)) } } } + +// initPortStatusMonitor subscribes a channel to listen for OpenFlow PortStatus messages, and also +// initializes the Pod event recorder. +func (pc *podConfigurator) initPortStatusMonitor(podInformer cache.SharedIndexInformer) { + pc.podLister = v1.NewPodLister(podInformer.GetIndexer()) + pc.podListerSynced = podInformer.HasSynced + pc.unreadyPortQueue = workqueue.NewNamedRateLimitingQueue( + workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), workerName) + eventBroadcaster := record.NewBroadcaster() + pc.eventBroadcaster = eventBroadcaster + pc.recorder = eventBroadcaster.NewRecorder( + scheme.Scheme, + corev1.EventSource{Component: "AntreaPodConfigurator"}, + ) + pc.statusCh = make(chan *openflow15.PortStatus, 100) + pc.ofClient.SubscribeOFPortStatusMessage(pc.statusCh) +} + +func (pc *podConfigurator) Run(stopCh <-chan struct{}) { + defer pc.unreadyPortQueue.ShutDown() + + klog.Infof("Starting %s", workerName) + defer klog.Infof("Shutting down %s", workerName) + + if !cache.WaitForNamedCacheSync("podConfigurator", stopCh, pc.podListerSynced) { + return + } + pc.eventBroadcaster.StartStructuredLogging(0) + pc.eventBroadcaster.StartRecordingToSink(&typedv1.EventSinkImpl{ + Interface: pc.kubeClient.CoreV1().Events(""), + }) + defer pc.eventBroadcaster.Shutdown() + + go wait.Until(pc.worker, time.Second, stopCh) + + for { + select { + case status := <-pc.statusCh: + klog.V(2).InfoS("Received PortStatus message", "message", status) + // Update Pod OpenFlow entries only after the OpenFlow port state is live. + pc.processPortStatusMessage(status) + case <-stopCh: + return + } + } +} + +// worker is a long-running function that will continually call the processNextWorkItem function in +// order to read and process a message on the workqueue.
+func (pc *podConfigurator) worker() { + for pc.processNextWorkItem() { + } +} diff --git a/pkg/agent/cniserver/secondary.go b/pkg/agent/cniserver/secondary.go index a4176baba15..70e95c93ca2 100644 --- a/pkg/agent/cniserver/secondary.go +++ b/pkg/agent/cniserver/secondary.go @@ -26,7 +26,7 @@ import ( ) func NewSecondaryInterfaceConfigurator(ovsBridgeClient ovsconfig.OVSBridgeClient, interfaceStore interfacestore.InterfaceStore) (*podConfigurator, error) { - pc, err := newPodConfigurator(ovsBridgeClient, nil, nil, interfaceStore, nil, ovsconfig.OVSDatapathSystem, false, false, nil) + pc, err := newPodConfigurator(nil, ovsBridgeClient, nil, nil, interfaceStore, nil, ovsconfig.OVSDatapathSystem, false, false, nil, nil, nil) if err == nil { pc.isSecondaryNetwork = true } diff --git a/pkg/agent/cniserver/server.go b/pkg/agent/cniserver/server.go index bc77dcc7224..253c9ec2065 100644 --- a/pkg/agent/cniserver/server.go +++ b/pkg/agent/cniserver/server.go @@ -31,6 +31,7 @@ import ( "google.golang.org/grpc" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" "antrea.io/antrea/pkg/agent/cniserver/ipam" @@ -111,6 +112,7 @@ type CNIServer struct { serverVersion string nodeConfig *config.NodeConfig hostProcPathPrefix string + podInformer cache.SharedIndexInformer kubeClient clientset.Interface containerAccess *containerAccessArbitrator podConfigurator *podConfigurator @@ -628,6 +630,7 @@ func (s *CNIServer) CmdCheck(_ context.Context, request *cnipb.CniCmdRequest) ( func New( cniSocket, hostProcPathPrefix string, nodeConfig *config.NodeConfig, + podInformer cache.SharedIndexInformer, kubeClient clientset.Interface, routeClient route.Interface, isChaining, enableBridgingMode, enableSecondaryNetworkIPAM, disableTXChecksumOffload bool, @@ -639,6 +642,7 @@ func New( serverVersion: cni.AntreaCNIVersion, nodeConfig: nodeConfig, hostProcPathPrefix: hostProcPathPrefix, + podInformer: podInformer, kubeClient: kubeClient, containerAccess: newContainerAccessArbitrator(), routeClient: routeClient, @@ -660,9 +664,9 @@ func (s *CNIServer) Initialize( ) error { var err error s.podConfigurator, err = newPodConfigurator( - ovsBridgeClient, ofClient, s.routeClient, ifaceStore, s.nodeConfig.GatewayConfig.MAC, + s.kubeClient, ovsBridgeClient, ofClient, s.routeClient, ifaceStore, s.nodeConfig.GatewayConfig.MAC, ovsBridgeClient.GetOVSDatapathType(), ovsBridgeClient.IsHardwareOffloadEnabled(), - s.disableTXChecksumOffload, podUpdateNotifier) + s.disableTXChecksumOffload, podUpdateNotifier, s.podInformer, s.containerAccess) if err != nil { return fmt.Errorf("error during initialize podConfigurator: %v", err) } @@ -676,6 +680,8 @@ func (s *CNIServer) Run(stopCh <-chan struct{}) { klog.InfoS("Starting CNI server") defer klog.InfoS("Shutting down CNI server") + go s.podConfigurator.Run(stopCh) + listener, err := util.ListenLocalSocket(s.cniSocket) if err != nil { klog.Fatalf("Failed to bind on %s: %v", s.cniSocket, err) diff --git a/pkg/agent/cniserver/server_linux_test.go b/pkg/agent/cniserver/server_linux_test.go index 856024dd32c..1c5161c9b51 100644 --- a/pkg/agent/cniserver/server_linux_test.go +++ b/pkg/agent/cniserver/server_linux_test.go @@ -95,7 +95,7 @@ func TestValidatePrevResult(t *testing.T) { cniConfig.Ifname = ifname cniConfig.Netns = "invalid_netns" sriovVFDeviceID := "" - cniServer.podConfigurator, _ = newPodConfigurator(nil, nil, nil, nil, nil, "", false, false, channel.NewSubscribableChannel("PodUpdate", 100)) + 
cniServer.podConfigurator, _ = newPodConfigurator(nil, nil, nil, nil, nil, nil, "", false, false, channel.NewSubscribableChannel("PodUpdate", 100), nil, nil) response := cniServer.validatePrevResult(cniConfig.CniCmdArgs, prevResult, sriovVFDeviceID) checkErrorResponse(t, response, cnipb.ErrorCode_CHECK_INTERFACE_FAILURE, "") }) @@ -106,7 +106,7 @@ func TestValidatePrevResult(t *testing.T) { cniConfig.Netns = "invalid_netns" sriovVFDeviceID := "0000:03:00.6" prevResult.Interfaces = []*current.Interface{hostIface, containerIface} - cniServer.podConfigurator, _ = newPodConfigurator(nil, nil, nil, nil, nil, "", true, false, channel.NewSubscribableChannel("PodUpdate", 100)) + cniServer.podConfigurator, _ = newPodConfigurator(nil, nil, nil, nil, nil, nil, "", true, false, channel.NewSubscribableChannel("PodUpdate", 100), nil, nil) response := cniServer.validatePrevResult(cniConfig.CniCmdArgs, prevResult, sriovVFDeviceID) checkErrorResponse(t, response, cnipb.ErrorCode_CHECK_INTERFACE_FAILURE, "") }) @@ -119,7 +119,7 @@ func TestRemoveInterface(t *testing.T) { ifaceStore = interfacestore.NewInterfaceStore() mockRoute = routetest.NewMockInterface(controller) gwMAC, _ := net.ParseMAC("00:00:11:11:11:11") - podConfigurator, err := newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100)) + podConfigurator, err := newPodConfigurator(nil, mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100), nil, nil) require.Nil(t, err, "No error expected in podConfigurator constructor") containerMAC, _ := net.ParseMAC("aa:bb:cc:dd:ee:ff") @@ -201,7 +201,7 @@ func newMockCNIServer(t *testing.T, controller *gomock.Controller, ipamDriver ip gwMAC, _ := net.ParseMAC("00:00:11:11:11:11") gateway := &config.GatewayConfig{Name: "", IPv4: gwIPv4, MAC: gwMAC} cniServer.nodeConfig = &config.NodeConfig{Name: "node1", PodIPv4CIDR: nodePodCIDRv4, GatewayConfig: gateway} - cniServer.podConfigurator, _ = newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100)) + cniServer.podConfigurator, _ = newPodConfigurator(nil, mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100), nil, nil) cniServer.enableSecondaryNetworkIPAM = enableSecondaryNetworkIPAM cniServer.isChaining = isChaining cniServer.networkConfig = &config.NetworkConfig{InterfaceMTU: 1450} @@ -612,18 +612,18 @@ func TestCmdCheck(t *testing.T) { func TestReconcile(t *testing.T) { controller := gomock.NewController(t) + kubeClient := fakeclientset.NewSimpleClientset(pod1, pod2, pod3) mockOVSBridgeClient = ovsconfigtest.NewMockOVSBridgeClient(controller) mockOFClient = openflowtest.NewMockClient(controller) ifaceStore = interfacestore.NewInterfaceStore() mockRoute = routetest.NewMockInterface(controller) cniServer := newCNIServer(t) cniServer.routeClient = mockRoute - cniServer.podConfigurator, _ = newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100)) + cniServer.podConfigurator, _ = newPodConfigurator(nil, mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100), nil, nil) cniServer.podConfigurator.ifConfigurator = 
newTestInterfaceConfigurator() cniServer.nodeConfig = &config.NodeConfig{ Name: nodeName, } - kubeClient := fakeclientset.NewSimpleClientset(pod1, pod2, pod3) cniServer.kubeClient = kubeClient for _, containerIface := range []*interfacestore.InterfaceConfig{normalInterface, staleInterface, unconnectedInterface} { ifaceStore.AddInterface(containerIface) diff --git a/pkg/agent/cniserver/server_windows_test.go b/pkg/agent/cniserver/server_windows_test.go index 70487b083e8..916084aca90 100644 --- a/pkg/agent/cniserver/server_windows_test.go +++ b/pkg/agent/cniserver/server_windows_test.go @@ -22,13 +22,15 @@ import ( "testing" "time" + "antrea.io/libOpenflow/openflow15" "github.com/Microsoft/hcsshim" "github.com/Microsoft/hcsshim/hcn" cnitypes "github.com/containernetworking/cni/pkg/types" current "github.com/containernetworking/cni/pkg/types/100" "github.com/stretchr/testify/assert" "go.uber.org/mock/gomock" - "k8s.io/apimachinery/pkg/util/wait" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" fakeclientset "k8s.io/client-go/kubernetes/fake" "antrea.io/antrea/pkg/agent/cniserver/ipam" @@ -37,12 +39,11 @@ import ( "antrea.io/antrea/pkg/agent/cniserver/types" "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/agent/interfacestore" - openflowtest "antrea.io/antrea/pkg/agent/openflow/testing" routetest "antrea.io/antrea/pkg/agent/route/testing" - agenttypes "antrea.io/antrea/pkg/agent/types" "antrea.io/antrea/pkg/agent/util" winnettest "antrea.io/antrea/pkg/agent/util/winnet/testing" cnipb "antrea.io/antrea/pkg/apis/cni/v1beta1" + "antrea.io/antrea/pkg/ovs/ovsconfig" ovsconfigtest "antrea.io/antrea/pkg/ovs/ovsconfig/testing" "antrea.io/antrea/pkg/util/channel" ) @@ -173,17 +174,15 @@ type hnsTestUtil struct { hnsEndpoint *hcsshim.HNSEndpoint hcnEndpoint *hcn.HostComputeEndpoint isDocker bool - isAttached bool hnsEndpointCreatErr error endpointAttachErr error } -func newHnsTestUtil(endpointID string, existingHnsEndpoints []hcsshim.HNSEndpoint, isDocker, isAttached bool, hnsEndpointCreatErr, endpointAttachErr error) *hnsTestUtil { +func newHnsTestUtil(endpointID string, existingHnsEndpoints []hcsshim.HNSEndpoint, isDocker bool, hnsEndpointCreatErr, endpointAttachErr error) *hnsTestUtil { return &hnsTestUtil{ endpointID: endpointID, existingHnsEndpoints: existingHnsEndpoints, isDocker: isDocker, - isAttached: isAttached, hnsEndpointCreatErr: hnsEndpointCreatErr, endpointAttachErr: endpointAttachErr, } @@ -217,9 +216,6 @@ func (t *hnsTestUtil) deleteHnsEndpoint(endpoint *hcsshim.HNSEndpoint) (*hcsshim func (t *hnsTestUtil) attachEndpointInNamespace(ep *hcn.HostComputeEndpoint, namespace string) error { t.hcnEndpoint.HostComputeNamespace = namespace - if t.endpointAttachErr == nil { - t.addHostInterface() - } return t.endpointAttachErr } @@ -257,9 +253,10 @@ func (t *hnsTestUtil) addHostInterface() { }() } -func newMockCNIServer(t *testing.T, controller *gomock.Controller, podUpdateNotifier *channel.SubscribableChannel) *CNIServer { +func newMockCNIServer(t *testing.T, controller *gomock.Controller, clients *mockClients, podUpdateNotifier *channel.SubscribableChannel) *CNIServer { + kubeClient := fakeclientset.NewSimpleClientset() mockOVSBridgeClient = ovsconfigtest.NewMockOVSBridgeClient(controller) - mockOFClient = openflowtest.NewMockClient(controller) + mockOFClient = clients.ofClient mockRoute = routetest.NewMockInterface(controller) mockWinnet = winnettest.NewMockInterface(controller) ifaceStore = interfacestore.NewInterfaceStore() @@ -269,7 +266,8 @@ func 
newMockCNIServer(t *testing.T, controller *gomock.Controller, podUpdateNoti gwMAC, _ := net.ParseMAC("00:00:11:11:11:11") gateway := &config.GatewayConfig{Name: "", IPv4: gwIPv4, MAC: gwMAC} cniServer.nodeConfig = &config.NodeConfig{Name: "node1", PodIPv4CIDR: nodePodCIDRv4, GatewayConfig: gateway} - cniServer.podConfigurator, _ = newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, podUpdateNotifier) + mockOFClient.EXPECT().SubscribeOFPortStatusMessage(gomock.Any()).AnyTimes() + cniServer.podConfigurator, _ = newPodConfigurator(kubeClient, mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, podUpdateNotifier, clients.localPodInformer, cniServer.containerAccess) cniServer.podConfigurator.ifConfigurator.(*ifConfigurator).winnet = mockWinnet return cniServer } @@ -315,7 +313,6 @@ func TestCmdAdd(t *testing.T) { hnsEndpointCreateErr error endpointAttachErr error ifaceExist bool - isAttached bool existingHnsEndpoints []hcsshim.HNSEndpoint endpointExists bool connectOVS bool @@ -341,7 +338,6 @@ func TestCmdAdd(t *testing.T) { oriIPAMResult: oriIPAMResult, connectOVS: true, containerIfaceExist: true, - isAttached: true, }, { name: "containerd-attach-failure", podName: "pod10", @@ -364,13 +360,23 @@ func TestCmdAdd(t *testing.T) { ipamType := "windows-test" ipamMock := ipamtest.NewMockIPAMDriver(controller) ipam.ResetIPAMDriver(ipamType, ipamMock) + stopCh := make(chan struct{}) + defer close(stopCh) isDocker := isDockerContainer(tc.netns) - testUtil := newHnsTestUtil(generateUUID(), tc.existingHnsEndpoints, isDocker, tc.isAttached, tc.hnsEndpointCreateErr, tc.endpointAttachErr) + testUtil := newHnsTestUtil(generateUUID(), tc.existingHnsEndpoints, isDocker, tc.hnsEndpointCreateErr, tc.endpointAttachErr) testUtil.setFunctions() defer testUtil.restore() - waiter := newAsyncWaiter(tc.podName, tc.infraContainerID) - server := newMockCNIServer(t, controller, waiter.notifier) + waiter := newAsyncWaiter(tc.podName, tc.infraContainerID, stopCh) + clients := newMockClients(controller, nodeName, &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: tc.podName, Namespace: testPodNamespace}, + Spec: corev1.PodSpec{NodeName: nodeName}, + }) + clients.startInformers(stopCh) + + server := newMockCNIServer(t, controller, clients, waiter.notifier) + go server.podConfigurator.Run(stopCh) + requestMsg, ovsPortName := prepareSetup(t, ipamType, tc.podName, tc.containerID, tc.infraContainerID, tc.netns, nil) if tc.endpointExists { server.podConfigurator.ifConfigurator.(*ifConfigurator).addEndpoint(getHnsEndpoint(generateUUID(), ovsPortName)) @@ -389,10 +395,29 @@ func TestCmdAdd(t *testing.T) { } ovsPortID := generateUUID() if tc.connectOVS { + ofPortNumber := uint32(100) + portStatusCh := server.podConfigurator.statusCh mockOVSBridgeClient.EXPECT().CreatePort(ovsPortName, ovsPortName, gomock.Any()).Return(ovsPortID, nil).Times(1) - mockOVSBridgeClient.EXPECT().SetInterfaceType(ovsPortName, "internal").Return(nil).Times(1) - mockOVSBridgeClient.EXPECT().GetOFPort(ovsPortName, true).Return(int32(100), nil).Times(1) - mockOFClient.EXPECT().InstallPodFlows(ovsPortName, gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) + mockOVSBridgeClient.EXPECT().SetInterfaceType(ovsPortName, "internal").Return(nil).Times(1).Do( + func(name, ifType string) ovsconfig.Error { + go func() { + time.Sleep(time.Millisecond * 50) + // Simulate OVS successfully connects to the vNIC, then a PortStatus message is + // supposed 
to receive. + portStatusCh <- &openflow15.PortStatus{ + Reason: openflow15.PR_MODIFY, + Desc: openflow15.Port{ + PortNo: ofPortNumber, + Length: 72, + Name: []byte(name), + State: openflow15.PS_LIVE, + }, + } + }() + return nil + }, + ) + mockOFClient.EXPECT().InstallPodFlows(ovsPortName, gomock.Any(), gomock.Any(), uint32(ofPortNumber), gomock.Any(), gomock.Any()).Return(nil) mockRoute.EXPECT().AddLocalAntreaFlexibleIPAMPodRule(gomock.Any()).Return(nil).Times(1) } resp, err := server.CmdAdd(ctx, requestMsg) @@ -421,19 +446,9 @@ func TestCmdAdd(t *testing.T) { _, exists := ifaceStore.GetContainerInterface(containerID) assert.Equal(t, exists, tc.containerIfaceExist) if tc.connectOVS { - waiter.wait() - // Wait for the completion of async function "setInterfaceMTUFunc", otherwise it may lead to the - // race condition failure. - wait.PollUntilContextTimeout(context.Background(), time.Millisecond*10, time.Second, true, - func(ctx context.Context) (done bool, err error) { - mtuSet, exist := hostIfaces.Load(ovsPortName) - if !exist { - return false, nil - } - return mtuSet.(bool), nil - }) + testUtil.addHostInterface() + assert.True(t, waiter.waitUntil(5*time.Second)) } - waiter.close() }) } } @@ -480,6 +495,8 @@ func TestCmdDel(t *testing.T) { } { t.Run(tc.name, func(t *testing.T) { controller := gomock.NewController(t) + stopCh := make(chan struct{}) + defer close(stopCh) ipamType := "windows-test" ipamMock := ipamtest.NewMockIPAMDriver(controller) ipam.ResetIPAMDriver(ipamType, ipamMock) @@ -491,11 +508,13 @@ func TestCmdDel(t *testing.T) { if tc.endpointExists { existingHnsEndpoints = append(existingHnsEndpoints, *hnsEndpoint) } - testUtil := newHnsTestUtil(hnsEndpoint.Id, existingHnsEndpoints, isDocker, true, nil, nil) + testUtil := newHnsTestUtil(hnsEndpoint.Id, existingHnsEndpoints, isDocker, nil, nil) testUtil.setFunctions() defer testUtil.restore() - waiter := newAsyncWaiter(testPodNameA, containerID) - server := newMockCNIServer(t, controller, waiter.notifier) + waiter := newAsyncWaiter(testPodNameA, containerID, stopCh) + clients := newMockClients(controller, nodeName) + clients.startInformers(stopCh) + server := newMockCNIServer(t, controller, clients, waiter.notifier) ovsPortID := generateUUID() if tc.endpointExists { server.podConfigurator.ifConfigurator.(*ifConfigurator).addEndpoint(hnsEndpoint) @@ -530,9 +549,8 @@ func TestCmdDel(t *testing.T) { assert.False(t, exists) } if tc.disconnectOVS { - waiter.wait() + assert.True(t, waiter.waitUntil(5*time.Second)) } - waiter.close() }) } } @@ -665,12 +683,16 @@ func TestCmdCheck(t *testing.T) { } { t.Run(tc.name, func(t *testing.T) { controller := gomock.NewController(t) + stopCh := make(chan struct{}) + defer close(stopCh) ipamType := "windows-test" ipamMock := ipamtest.NewMockIPAMDriver(controller) ipam.ResetIPAMDriver(ipamType, ipamMock) defer mockGetNetInterfaceByName(tc.netInterface)() - cniserver := newMockCNIServer(t, controller, channel.NewSubscribableChannel("podUpdate", 100)) + clients := newMockClients(controller, nodeName) + clients.startInformers(stopCh) + cniserver := newMockCNIServer(t, controller, clients, channel.NewSubscribableChannel("podUpdate", 100)) requestMsg, _ := prepareSetup(t, ipamType, tc.podName, tc.containerID, tc.containerID, tc.netns, tc.prevResult) ipamMock.EXPECT().Check(gomock.Any(), gomock.Any(), gomock.Any()).Return(true, nil).Times(1) ifaceStore.AddInterface(tc.existingIface) @@ -685,68 +707,38 @@ func TestCmdCheck(t *testing.T) { } } -type asyncWaiter struct { - podName string - containerID string 
- waitCh chan struct{} - stopCh chan struct{} - notifier *channel.SubscribableChannel -} - -func (w *asyncWaiter) notify(e interface{}) { - podUpdate := e.(agenttypes.PodUpdate) - if podUpdate.PodName == w.podName && podUpdate.ContainerID == w.containerID { - w.waitCh <- struct{}{} - } -} - -func (w *asyncWaiter) wait() { - <-w.waitCh -} - -func (w *asyncWaiter) close() { - close(w.waitCh) - close(w.stopCh) -} - -func newAsyncWaiter(podName, containerID string) *asyncWaiter { - waiter := &asyncWaiter{ - podName: podName, - containerID: containerID, - waitCh: make(chan struct{}), - stopCh: make(chan struct{}), - notifier: channel.NewSubscribableChannel("PodUpdate", 100), - } - waiter.notifier.Subscribe(waiter.notify) - go waiter.notifier.Run(waiter.stopCh) - return waiter -} - func TestReconcile(t *testing.T) { controller := gomock.NewController(t) + stopCh := make(chan struct{}) + defer close(stopCh) + + clients := newMockClients(controller, nodeName, pod1, pod2, pod3) + clients.startInformers(stopCh) + kubeClient := clients.kubeClient mockOVSBridgeClient = ovsconfigtest.NewMockOVSBridgeClient(controller) - mockOFClient = openflowtest.NewMockClient(controller) + mockOFClient = clients.ofClient ifaceStore = interfacestore.NewInterfaceStore() mockRoute = routetest.NewMockInterface(controller) defer mockHostInterfaceExists()() defer mockGetHnsNetworkByName()() missingEndpoint := getHnsEndpoint(generateUUID(), "iface4") - testUtil := newHnsTestUtil(missingEndpoint.Id, []hcsshim.HNSEndpoint{*missingEndpoint}, false, true, nil, nil) + testUtil := newHnsTestUtil(missingEndpoint.Id, []hcsshim.HNSEndpoint{*missingEndpoint}, false, nil, nil) testUtil.createHnsEndpoint(missingEndpoint) testUtil.setFunctions() defer testUtil.restore() + mockOFClient.EXPECT().SubscribeOFPortStatusMessage(gomock.Any()).AnyTimes() cniServer := newCNIServer(t) cniServer.routeClient = mockRoute - kubeClient := fakeclientset.NewSimpleClientset(pod1, pod2, pod3) cniServer.kubeClient = kubeClient for _, containerIface := range []*interfacestore.InterfaceConfig{normalInterface, staleInterface, unconnectedInterface} { ifaceStore.AddInterface(containerIface) } - waiter := newAsyncWaiter(unconnectedInterface.PodName, unconnectedInterface.ContainerID) - cniServer.podConfigurator, _ = newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, waiter.notifier) + waiter := newAsyncWaiter(unconnectedInterface.PodName, unconnectedInterface.ContainerID, stopCh) + cniServer.podConfigurator, _ = newPodConfigurator(kubeClient, mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, waiter.notifier, clients.localPodInformer, cniServer.containerAccess) cniServer.nodeConfig = &config.NodeConfig{Name: nodeName} + go cniServer.podConfigurator.Run(stopCh) // Re-install Pod1 flows podFlowsInstalled := make(chan string, 2) @@ -760,8 +752,24 @@ func TestReconcile(t *testing.T) { mockRoute.EXPECT().DeleteLocalAntreaFlexibleIPAMPodRule(gomock.Any()).Return(nil).Times(1) // Re-connect to Pod4 hostIfaces.Store(fmt.Sprintf("vEthernet (%s)", unconnectedInterface.InterfaceName), true) - mockOVSBridgeClient.EXPECT().SetInterfaceType(unconnectedInterface.InterfaceName, "internal").Return(nil).Times(1) - mockOVSBridgeClient.EXPECT().GetOFPort(unconnectedInterface.InterfaceName, true).Return(int32(5), nil).Times(1) + mockOVSBridgeClient.EXPECT().SetInterfaceType(unconnectedInterface.InterfaceName, "internal").Return(nil).Times(1).Do( + func(name, ifType string) ovsconfig.Error { 
+ // Simulate OVS successfully connects to the vNIC, then a PortStatus message is + // supposed to receive. + time.Sleep(time.Millisecond * 50) + portStatusCh := cniServer.podConfigurator.statusCh + portStatusCh <- &openflow15.PortStatus{ + Reason: openflow15.PR_MODIFY, + Desc: openflow15.Port{ + PortNo: uint32(5), + Length: 72, + Name: []byte(name), + State: openflow15.PS_LIVE, + }, + } + return nil + }, + ) mockOFClient.EXPECT().InstallPodFlows(unconnectedInterface.InterfaceName, unconnectedInterface.IPs, unconnectedInterface.MAC, uint32(5), uint16(0), nil). Do(func(interfaceName string, _ []net.IP, _ net.HardwareAddr, _ uint32, _ uint16, _ *uint32) { podFlowsInstalled <- interfaceName @@ -778,8 +786,7 @@ func TestReconcile(t *testing.T) { break } } - waiter.wait() - waiter.close() + assert.True(t, waiter.waitUntil(5*time.Second)) } func getHnsEndpoint(id, name string) *hcsshim.HNSEndpoint { diff --git a/pkg/agent/interfacestore/interface_cache.go b/pkg/agent/interfacestore/interface_cache.go index bba8cf2068a..5971ee4f008 100644 --- a/pkg/agent/interfacestore/interface_cache.go +++ b/pkg/agent/interfacestore/interface_cache.go @@ -105,6 +105,11 @@ func (c *interfaceCache) AddInterface(interfaceConfig *InterfaceConfig) { } } +// UpdateInterface updates interfaceConfig into local cache. +func (c *interfaceCache) UpdateInterface(interfaceConfig *InterfaceConfig) { + c.cache.Update(interfaceConfig) +} + // DeleteInterface deletes interface from local cache. func (c *interfaceCache) DeleteInterface(interfaceConfig *InterfaceConfig) { c.cache.Delete(interfaceConfig) diff --git a/pkg/agent/interfacestore/interface_cache_test.go b/pkg/agent/interfacestore/interface_cache_test.go index bfbf2cacca6..69ef7227056 100644 --- a/pkg/agent/interfacestore/interface_cache_test.go +++ b/pkg/agent/interfacestore/interface_cache_test.go @@ -294,3 +294,51 @@ func newExternalEntityInterface(name string, entityIPs []net.IP, entityName stri }, } } + +func TestUpdateStore(t *testing.T) { + store := NewInterfaceStore() + mac, _ := net.ParseMAC("aa:aa:aa:aa:aa:aa") + ifConfig := &InterfaceConfig{ + Type: ContainerInterface, + InterfaceName: "interface1", + IPs: []net.IP{net.ParseIP("1.1.1.1"), net.ParseIP("2.2.2.2")}, + MAC: mac, + VLANID: 1023, + OVSPortConfig: &OVSPortConfig{ + PortUUID: "12345678", + }, + ContainerInterfaceConfig: &ContainerInterfaceConfig{ + ContainerID: "aaaaaaa", + PodNamespace: "default", + PodName: "p1", + IFDev: "eth0", + }, + TunnelInterfaceConfig: &TunnelInterfaceConfig{ + Type: AntreaIPsecTunnel, + NodeName: "n1", + LocalIP: net.ParseIP("10.10.10.10"), + RemoteIP: net.ParseIP("20.20.20.20"), + DestinationPort: 443, + RemoteName: "n2", + PSK: "abcdefg", + Csum: true, + }, + EntityInterfaceConfig: &EntityInterfaceConfig{ + EntityName: "e1", + EntityNamespace: "ns1", + UplinkPort: &OVSPortConfig{ + PortUUID: "87654321", + OFPort: 1025, + }, + }, + } + store.AddInterface(ifConfig) + oldCfg, ok := store.GetInterfaceByName("interface1") + require.True(t, ok) + newCfg := oldCfg.DeepCopy() + newCfg.OVSPortConfig.OFPort = 1024 + store.UpdateInterface(newCfg) + ifConfig2, ok := store.GetInterfaceByName("interface1") + require.True(t, ok) + assert.Equal(t, int32(1024), ifConfig2.OFPort) +} diff --git a/pkg/agent/interfacestore/testing/mock_interfacestore.go b/pkg/agent/interfacestore/testing/mock_interfacestore.go index e45da47f514..af84f45feee 100644 --- a/pkg/agent/interfacestore/testing/mock_interfacestore.go +++ b/pkg/agent/interfacestore/testing/mock_interfacestore.go @@ -1,4 +1,4 @@ -// 
Copyright 2024 Antrea Authors +// Copyright 2025 Antrea Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -277,3 +277,15 @@ func (mr *MockInterfaceStoreMockRecorder) ListInterfaces() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListInterfaces", reflect.TypeOf((*MockInterfaceStore)(nil).ListInterfaces)) } + +// UpdateInterface mocks base method. +func (m *MockInterfaceStore) UpdateInterface(arg0 *interfacestore.InterfaceConfig) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "UpdateInterface", arg0) +} + +// UpdateInterface indicates an expected call of UpdateInterface. +func (mr *MockInterfaceStoreMockRecorder) UpdateInterface(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateInterface", reflect.TypeOf((*MockInterfaceStore)(nil).UpdateInterface), arg0) +} diff --git a/pkg/agent/interfacestore/types.go b/pkg/agent/interfacestore/types.go index 9d0ca2afe57..fcbbb8fc063 100644 --- a/pkg/agent/interfacestore/types.go +++ b/pkg/agent/interfacestore/types.go @@ -55,11 +55,13 @@ func (t InterfaceType) String() string { return strconv.Itoa(int(t)) } +// +k8s:deepcopy-gen=true type OVSPortConfig struct { PortUUID string OFPort int32 } +// +k8s:deepcopy-gen=true type ContainerInterfaceConfig struct { ContainerID string PodName string @@ -68,6 +70,7 @@ type ContainerInterfaceConfig struct { IFDev string } +// +k8s:deepcopy-gen=true type TunnelInterfaceConfig struct { Type ovsconfig.TunnelType // Name of the remote Node. @@ -87,6 +90,7 @@ type TunnelInterfaceConfig struct { Csum bool } +// +k8s:deepcopy-gen=true type EntityInterfaceConfig struct { EntityName string EntityNamespace string @@ -94,6 +98,7 @@ type EntityInterfaceConfig struct { UplinkPort *OVSPortConfig } +// +k8s:deepcopy-gen=true type InterfaceConfig struct { Type InterfaceType // Unique name of the interface, also used for the OVS port name. @@ -113,6 +118,7 @@ type InterfaceConfig struct { type InterfaceStore interface { Initialize(interfaces []*InterfaceConfig) AddInterface(interfaceConfig *InterfaceConfig) + UpdateInterface(interfaceConfig *InterfaceConfig) ListInterfaces() []*InterfaceConfig DeleteInterface(interfaceConfig *InterfaceConfig) GetInterface(interfaceKey string) (*InterfaceConfig, bool) diff --git a/pkg/agent/interfacestore/zz_generated.deepcopy.go b/pkg/agent/interfacestore/zz_generated.deepcopy.go new file mode 100644 index 00000000000..5071c0555c4 --- /dev/null +++ b/pkg/agent/interfacestore/zz_generated.deepcopy.go @@ -0,0 +1,155 @@ +//go:build !ignore_autogenerated +// +build !ignore_autogenerated + +// Copyright 2024 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Code generated by deepcopy-gen. DO NOT EDIT. + +package interfacestore + +import ( + net "net" +) + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *ContainerInterfaceConfig) DeepCopyInto(out *ContainerInterfaceConfig) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ContainerInterfaceConfig. +func (in *ContainerInterfaceConfig) DeepCopy() *ContainerInterfaceConfig { + if in == nil { + return nil + } + out := new(ContainerInterfaceConfig) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *EntityInterfaceConfig) DeepCopyInto(out *EntityInterfaceConfig) { + *out = *in + if in.UplinkPort != nil { + in, out := &in.UplinkPort, &out.UplinkPort + *out = new(OVSPortConfig) + **out = **in + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new EntityInterfaceConfig. +func (in *EntityInterfaceConfig) DeepCopy() *EntityInterfaceConfig { + if in == nil { + return nil + } + out := new(EntityInterfaceConfig) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *InterfaceConfig) DeepCopyInto(out *InterfaceConfig) { + *out = *in + if in.IPs != nil { + in, out := &in.IPs, &out.IPs + *out = make([]net.IP, len(*in)) + for i := range *in { + if (*in)[i] != nil { + in, out := &(*in)[i], &(*out)[i] + *out = make(net.IP, len(*in)) + copy(*out, *in) + } + } + } + if in.MAC != nil { + in, out := &in.MAC, &out.MAC + *out = make(net.HardwareAddr, len(*in)) + copy(*out, *in) + } + if in.OVSPortConfig != nil { + in, out := &in.OVSPortConfig, &out.OVSPortConfig + *out = new(OVSPortConfig) + **out = **in + } + if in.ContainerInterfaceConfig != nil { + in, out := &in.ContainerInterfaceConfig, &out.ContainerInterfaceConfig + *out = new(ContainerInterfaceConfig) + **out = **in + } + if in.TunnelInterfaceConfig != nil { + in, out := &in.TunnelInterfaceConfig, &out.TunnelInterfaceConfig + *out = new(TunnelInterfaceConfig) + (*in).DeepCopyInto(*out) + } + if in.EntityInterfaceConfig != nil { + in, out := &in.EntityInterfaceConfig, &out.EntityInterfaceConfig + *out = new(EntityInterfaceConfig) + (*in).DeepCopyInto(*out) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new InterfaceConfig. +func (in *InterfaceConfig) DeepCopy() *InterfaceConfig { + if in == nil { + return nil + } + out := new(InterfaceConfig) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *OVSPortConfig) DeepCopyInto(out *OVSPortConfig) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OVSPortConfig. +func (in *OVSPortConfig) DeepCopy() *OVSPortConfig { + if in == nil { + return nil + } + out := new(OVSPortConfig) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TunnelInterfaceConfig) DeepCopyInto(out *TunnelInterfaceConfig) { + *out = *in + if in.LocalIP != nil { + in, out := &in.LocalIP, &out.LocalIP + *out = make(net.IP, len(*in)) + copy(*out, *in) + } + if in.RemoteIP != nil { + in, out := &in.RemoteIP, &out.RemoteIP + *out = make(net.IP, len(*in)) + copy(*out, *in) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TunnelInterfaceConfig. 
+func (in *TunnelInterfaceConfig) DeepCopy() *TunnelInterfaceConfig { + if in == nil { + return nil + } + out := new(TunnelInterfaceConfig) + in.DeepCopyInto(out) + return out +} diff --git a/pkg/agent/openflow/client.go b/pkg/agent/openflow/client.go index e6d3ec5ef87..08e066b07c9 100644 --- a/pkg/agent/openflow/client.go +++ b/pkg/agent/openflow/client.go @@ -408,6 +408,9 @@ type Client interface { // or ip, port, protocol and direction. It is used to bypass NetworkPolicy enforcement on a VM for the particular // traffic. InstallPolicyBypassFlows(protocol binding.Protocol, ipNet *net.IPNet, port uint16, isIngress bool) error + + // SubscribeOFPortStatusMessage registers a channel to listen the OpenFlow PortStatus message. + SubscribeOFPortStatusMessage(statusCh chan *openflow15.PortStatus) } // GetFlowTableStatus returns an array of flow table status. @@ -1697,3 +1700,7 @@ func (c *client) getMeterStats() { klog.ErrorS(err, "Failed to get OVS meter stats") } } + +func (c *client) SubscribeOFPortStatusMessage(statusCh chan *openflow15.PortStatus) { + c.bridge.SubscribePortStatusConsumer(statusCh) +} diff --git a/pkg/agent/openflow/client_test.go b/pkg/agent/openflow/client_test.go index 658193b1220..ab5bd20dcca 100644 --- a/pkg/agent/openflow/client_test.go +++ b/pkg/agent/openflow/client_test.go @@ -2906,3 +2906,14 @@ func TestCachedFlowIsDrop(t *testing.T) { msg = flows[0].GetMessage().(*openflow15.FlowMod) assert.False(t, isDropFlow(msg)) } + +func TestSubscribeOFPortStatusMessage(t *testing.T) { + ctrl := gomock.NewController(t) + ch := make(chan *openflow15.PortStatus) + bridge := ovsoftest.NewMockBridge(ctrl) + c := client{ + bridge: bridge, + } + bridge.EXPECT().SubscribePortStatusConsumer(ch).Times(1) + c.SubscribeOFPortStatusMessage(ch) +} diff --git a/pkg/agent/openflow/testing/mock_openflow.go b/pkg/agent/openflow/testing/mock_openflow.go index 99c21e47119..9bfe4b78970 100644 --- a/pkg/agent/openflow/testing/mock_openflow.go +++ b/pkg/agent/openflow/testing/mock_openflow.go @@ -1,4 +1,4 @@ -// Copyright 2024 Antrea Authors +// Copyright 2025 Antrea Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -36,6 +36,7 @@ import ( openflow0 "antrea.io/antrea/pkg/ovs/openflow" ip "antrea.io/antrea/pkg/util/ip" proxy "antrea.io/antrea/third_party/proxy" + openflow15 "antrea.io/libOpenflow/openflow15" protocol "antrea.io/libOpenflow/protocol" util "antrea.io/libOpenflow/util" ofctrl "antrea.io/ofnet/ofctrl" @@ -848,6 +849,18 @@ func (mr *MockClientMockRecorder) StartPacketInHandler(arg0 any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StartPacketInHandler", reflect.TypeOf((*MockClient)(nil).StartPacketInHandler), arg0) } +// SubscribeOFPortStatusMessage mocks base method. +func (m *MockClient) SubscribeOFPortStatusMessage(arg0 chan *openflow15.PortStatus) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SubscribeOFPortStatusMessage", arg0) +} + +// SubscribeOFPortStatusMessage indicates an expected call of SubscribeOFPortStatusMessage. +func (mr *MockClientMockRecorder) SubscribeOFPortStatusMessage(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscribeOFPortStatusMessage", reflect.TypeOf((*MockClient)(nil).SubscribeOFPortStatusMessage), arg0) +} + // SubscribePacketIn mocks base method. 
func (m *MockClient) SubscribePacketIn(arg0 byte, arg1 *openflow0.PacketInQueue) error { m.ctrl.T.Helper() diff --git a/pkg/ovs/openflow/interfaces.go b/pkg/ovs/openflow/interfaces.go index 1c5c60fd22d..edf8ced802b 100644 --- a/pkg/ovs/openflow/interfaces.go +++ b/pkg/ovs/openflow/interfaces.go @@ -146,6 +146,9 @@ type Bridge interface { ResumePacket(packetIn *ofctrl.PacketIn) error // BuildPacketOut returns a new PacketOutBuilder. BuildPacketOut() PacketOutBuilder + // SubscribePortStatusConsumer registers a consumer to listen to OpenFlow PortStatus message. + // We only support a single consumer for now. + SubscribePortStatusConsumer(statusCh chan *openflow15.PortStatus) } // TableStatus represents the status of a specific flow table. The status is useful for debugging. diff --git a/pkg/ovs/openflow/ofctrl_bridge.go b/pkg/ovs/openflow/ofctrl_bridge.go index c0f6ede4daa..da46f7e5ee8 100644 --- a/pkg/ovs/openflow/ofctrl_bridge.go +++ b/pkg/ovs/openflow/ofctrl_bridge.go @@ -202,9 +202,14 @@ type OFBridge struct { // pktConsumers is a map from PacketIn category to the channel that is used to publish the PacketIn message. pktConsumers sync.Map + // portStatusConsumerCh is a channel to notify agent a PortStatus message is received + portStatusConsumerCh chan *openflow15.PortStatus + // portStatusMutex is to guard the access on portStatusConsumerCh. + portStatusMutex sync.RWMutex + mpReplyChsMutex sync.RWMutex mpReplyChs map[uint32]chan *openflow15.MultipartReply - // tunMetadataLengthMap is used to store the tlv-map settings on the OVS bridge. Key is the index of tunnel metedata, + // tunMetadataLengthMap is used to store the tlv-map settings on the OVS bridge. Key is the index of tunnel metadata, // and value is the length configured in this tunnel metadata. tunMetadataLengthMap map[uint16]uint8 } @@ -718,6 +723,31 @@ func (b *OFBridge) RetryInterval() time.Duration { return b.retryInterval } +func (b *OFBridge) PortStatusRcvd(status *openflow15.PortStatus) { + b.portStatusMutex.RLock() + defer b.portStatusMutex.RUnlock() + if b.portStatusConsumerCh == nil { + return + } + // Correspond to MessageStream.outbound log level. + if klog.V(7).Enabled() { + klog.InfoS("Received PortStatus", "portStatus", status) + } else { + klog.V(4).InfoS("Received PortStatus") + } + switch status.Reason { + // We only process add/modified status for now. + case openflow15.PR_ADD, openflow15.PR_MODIFY: + b.portStatusConsumerCh <- status + } +} + +func (b *OFBridge) SubscribePortStatusConsumer(statusCh chan *openflow15.PortStatus) { + b.portStatusMutex.Lock() + defer b.portStatusMutex.Unlock() + b.portStatusConsumerCh = statusCh +} + func (b *OFBridge) setPacketInFormatTo2() { b.ofSwitch.SetPacketInFormat(openflow15.OFPUTIL_PACKET_IN_NXT2) } diff --git a/pkg/ovs/openflow/testing/mock_openflow.go b/pkg/ovs/openflow/testing/mock_openflow.go index 1241cf1d888..570f885b772 100644 --- a/pkg/ovs/openflow/testing/mock_openflow.go +++ b/pkg/ovs/openflow/testing/mock_openflow.go @@ -1,4 +1,4 @@ -// Copyright 2024 Antrea Authors +// Copyright 2025 Antrea Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -341,6 +341,18 @@ func (mr *MockBridgeMockRecorder) SubscribePacketIn(arg0, arg1 any) *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscribePacketIn", reflect.TypeOf((*MockBridge)(nil).SubscribePacketIn), arg0, arg1) } +// SubscribePortStatusConsumer mocks base method. 
+func (m *MockBridge) SubscribePortStatusConsumer(arg0 chan *openflow15.PortStatus) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SubscribePortStatusConsumer", arg0) +} + +// SubscribePortStatusConsumer indicates an expected call of SubscribePortStatusConsumer. +func (mr *MockBridgeMockRecorder) SubscribePortStatusConsumer(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscribePortStatusConsumer", reflect.TypeOf((*MockBridge)(nil).SubscribePortStatusConsumer), arg0) +} + // MockTable is a mock of Table interface. type MockTable struct { ctrl *gomock.Controller diff --git a/test/integration/agent/cniserver_test.go b/test/integration/agent/cniserver_test.go index 3ce87ad6b10..046e6640ec0 100644 --- a/test/integration/agent/cniserver_test.go +++ b/test/integration/agent/cniserver_test.go @@ -575,6 +575,7 @@ func newTester() *cmdAddDelTester { tester.server = cniserver.New(testSock, "", getTestNodeConfig(false), + nil, k8sFake.NewSimpleClientset(), routeMock, false, false, false, false, &config.NetworkConfig{InterfaceMTU: 1450}, @@ -743,6 +744,7 @@ func setupChainTest( server = cniserver.New(testSock, "", testNodeConfig, + nil, k8sFake.NewSimpleClientset(), routeMock, true, false, false, false, &config.NetworkConfig{InterfaceMTU: 1450}, @@ -933,6 +935,7 @@ func TestCNIServerGCForHostLocalIPAM(t *testing.T) { testSock, "", testNodeConfig, + nil, k8sClient, routeMock, false, false, false, false, &config.NetworkConfig{InterfaceMTU: 1450}, From 076894defcdc25685bbc6cc7315f27aef49f2971 Mon Sep 17 00:00:00 2001 From: Wenying Dong Date: Mon, 30 Dec 2024 19:15:01 -0800 Subject: [PATCH 2/4] [Windows] Support OF port state "down" when installing Pod forwarding rules (#6889) On Windows, OVS has an issue where it doesn't correctly update the port status after an OpenFlow port is successfully installed, so OVS may send out a PortStatus message with the port state as LINK_DOWN. This issue doesn't affect datapath forwarding. This change is a workaround to ensure the Pod's OpenFlow entries are installed as long as the OpenFlow port is allocated. Signed-off-by: Wenying Dong --- pkg/agent/cniserver/pod_configuration.go | 20 ++++++++++++++++--- pkg/agent/cniserver/pod_configuration_test.go | 2 +- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/pkg/agent/cniserver/pod_configuration.go b/pkg/agent/cniserver/pod_configuration.go index 93b75b5d34d..95f8ca3de4c 100644 --- a/pkg/agent/cniserver/pod_configuration.go +++ b/pkg/agent/cniserver/pod_configuration.go @@ -680,12 +680,26 @@ func (pc *podConfigurator) recordPodEvent(ifConfig *interfacestore.InterfaceConf } func (pc *podConfigurator) processPortStatusMessage(status *openflow15.PortStatus) { - // Update Pod OpenFlow entries only after the OpenFlow port state is live. - if status.Desc.State != openflow15.PS_LIVE { + ofPort := status.Desc.PortNo + state := status.Desc.State + // Update Pod OpenFlow entries only after the OpenFlow port state is live or link-down. + // Accepting port state "openflow15.PS_LINK_DOWN" is a workaround for the Windows OVS issue https://github.com/openvswitch/ovs-issues/issues/351, + // in which OVS does not correctly implement netdev_windows_update_flags and therefore doesn't update ifp_flags + // after a new OpenFlow port is successfully installed. Since this OVS issue has no impact on datapath + // packet forwarding, antrea-agent ignores the bad state to ensure the Pod's OpenFlow entries are installed as + // long as the port number is allocated.
+ if state != openflow15.PS_LIVE && state != openflow15.PS_LINK_DOWN { + klog.InfoS("Ignoring the OVS port status message with undesired state", "ofPort", ofPort, "state", state) + return + } + + if ofPort == 0 { + klog.InfoS("Ignoring the OVS port status message with undesired port number", "ofPort", ofPort, "state", state) return } + ovsPort := string(bytes.Trim(status.Desc.Name, "\x00")) - ofPort := status.Desc.PortNo + klog.InfoS("Processing OVS port status message", "ovsPort", ovsPort, "ofPort", ofPort, "state", state) ifConfig, found := pc.ifaceStore.GetInterfaceByName(ovsPort) if !found { diff --git a/pkg/agent/cniserver/pod_configuration_test.go b/pkg/agent/cniserver/pod_configuration_test.go index 10457b97175..21dd929250f 100644 --- a/pkg/agent/cniserver/pod_configuration_test.go +++ b/pkg/agent/cniserver/pod_configuration_test.go @@ -390,7 +390,7 @@ func TestProcessPortStatusMessage(t *testing.T) { PortNo: 1, Length: 72, Name: []byte(fmt.Sprintf("%s\x00", podIfName)), - State: openflow15.PS_LINK_DOWN, + State: openflow15.PS_BLOCKED, }, }, ovsPortName: podIfName, From 85c9631e7968dc6a2ea9f7720ee861b5f6a8d1a9 Mon Sep 17 00:00:00 2001 From: Gavin Xin Date: Tue, 29 Oct 2024 05:02:26 +0800 Subject: [PATCH 3/4] Update Jenkins Scripts for Testbed Migration (#6751) * Upgraded Windows images to restore CI on the latest kubernetes cluster * Updated external node e2e test and jenkins script for testbed migration. Signed-off-by: Gavin Xin --- ci/jenkins/test-vm.sh | 15 ++++++++++----- ci/jenkins/test.sh | 28 ++++++++-------------------- test/e2e/vmagent_test.go | 2 +- 3 files changed, 19 insertions(+), 26 deletions(-) diff --git a/ci/jenkins/test-vm.sh b/ci/jenkins/test-vm.sh index 380c8cbd4a6..5ccd4640c0a 100755 --- a/ci/jenkins/test-vm.sh +++ b/ci/jenkins/test-vm.sh @@ -48,10 +48,10 @@ Run K8s e2e community tests (Conformance & Network Policy) or Antrea e2e tests o # VM configuration declare -A LINUX_HOSTS_TO_IP declare -A WINDOWS_HOSTS_TO_IP -declare -a LIN_HOSTNAMES=("vmbmtest0-1" "vmbmtest0-redhat-0") -declare -a WIN_HOSTNAMES=("vmbmtest0-win-0") -declare -A LINUX_HOSTS_TO_USERNAME=(["vmbmtest0-1"]="ubuntu" ["vmbmtest0-redhat-0"]="root") -declare -A WINDOWS_HOSTS_TO_USERNAME=(["vmbmtest0-win-0"]="Administrator") +declare -a LIN_HOSTNAMES=("vmbm0-1" "vmbm0-redhat-0") +declare -a WIN_HOSTNAMES=("vmbm0-win-0") +declare -A LINUX_HOSTS_TO_USERNAME=(["vmbm0-1"]="ubuntu" ["vmbm0-redhat-0"]="root") +declare -A WINDOWS_HOSTS_TO_USERNAME=(["vmbm0-win-0"]="Administrator") # To run kubectl cmds export KUBECONFIG=${KUBECONFIG_PATH} @@ -143,10 +143,15 @@ function apply_antrea { fi TEMP_ANTREA_TAR="antrea-image.tar" docker save antrea/antrea-agent-ubuntu:latest antrea/antrea-controller-ubuntu:latest -o $TEMP_ANTREA_TAR - ctr -n k8s.io image import $TEMP_ANTREA_TAR + kubectl get nodes --selector=kubernetes.io/os=linux --no-headers=true -o custom-columns=IP:.status.addresses[0].address | while read -r IP; do + rsync -avr --progress --inplace -e "ssh -o StrictHostKeyChecking=no" $TEMP_ANTREA_TAR jenkins@${IP}:${WORKDIR}/$TEMP_ANTREA_TAR + ssh -o StrictHostKeyChecking=no -n jenkins@${IP} "crictl rmi --prune; crictl ps --state Exited; ctr -n=k8s.io images import ${WORKDIR}/$TEMP_ANTREA_TAR" || true + done rm $TEMP_ANTREA_TAR echo "====== Applying Antrea yaml ======" ./hack/generate-manifest.sh --feature-gates ExternalNode=true,SupportBundleCollection=true --extra-helm-values "controller.apiNodePort=32767" > ${WORKDIR}/antrea.yml + IP=$(kubectl get nodes --selector=node-role.kubernetes.io/control-plane= 
--no-headers=true -o custom-columns=IP:.status.addresses[0].address) + rsync -avr --progress --inplace -e "ssh -o StrictHostKeyChecking=no" ${WORKDIR}/*.yml jenkins@${IP}:${WORKDIR}/ kubectl apply -f ${WORKDIR}/antrea.yml } diff --git a/ci/jenkins/test.sh b/ci/jenkins/test.sh index 0428b1e26cf..1f49250f1b6 100755 --- a/ci/jenkins/test.sh +++ b/ci/jenkins/test.sh @@ -429,12 +429,10 @@ function deliver_antrea_linux { DOCKER_REGISTRY="${DOCKER_REGISTRY}" ./hack/build-antrea-linux-all.sh --pull docker save -o antrea-ubuntu.tar antrea/antrea-agent-ubuntu:latest antrea/antrea-controller-ubuntu:latest echo "===== Pull necessary images on Control-Plane node =====" - harbor_images=("agnhost:2.13" "nginx:1.15-alpine") - antrea_images=("e2eteam/agnhost:2.13" "docker.io/library/nginx:1.15-alpine") - common_images=("registry.k8s.io/e2e-test-images/agnhost:2.29") - k8s_images=("registry.k8s.io/e2e-test-images/agnhost:2.45" "registry.k8s.io/e2e-test-images/jessie-dnsutils:1.5" "registry.k8s.io/e2e-test-images/nginx:1.14-2") - conformance_images=("k8sprow.azurecr.io/kubernetes-e2e-test-images/agnhost:2.45" "k8sprow.azurecr.io/kubernetes-e2e-test-images/jessie-dnsutils:1.5" "k8sprow.azurecr.io/kubernetes-e2e-test-images/nginx:1.14-2") - e2e_images=("toolbox:1.3-0" "nginx:1.21.6-alpine") + common_images=("registry.k8s.io/e2e-test-images/agnhost:2.40") + k8s_images=("registry.k8s.io/e2e-test-images/agnhost:2.52" "registry.k8s.io/e2e-test-images/jessie-dnsutils:1.5" "registry.k8s.io/e2e-test-images/nginx:1.14-2") + conformance_images=("k8sprow.azurecr.io/kubernetes-e2e-test-images/agnhost:2.52" "k8sprow.azurecr.io/kubernetes-e2e-test-images/jessie-dnsutils:1.5" "k8sprow.azurecr.io/kubernetes-e2e-test-images/nginx:1.14-2") + e2e_images=("toolbox:1.4-0" "nginx:1.21.6-alpine") echo "===== Deliver Antrea YAML to Controller nodes =====" IP=$(kubectl get nodes -o wide --no-headers=true | awk -v role="$CONTROL_PLANE_NODE_ROLE" '$3 ~ role {print $6}') @@ -456,9 +454,6 @@ function deliver_antrea_linux { rsync -avr --progress --inplace -e "ssh -o StrictHostKeyChecking=no" antrea-ubuntu.tar jenkins@${IP}:${WORKDIR}/antrea-ubuntu.tar ssh -o StrictHostKeyChecking=no -n jenkins@${IP} "${CLEAN_STALE_IMAGES_CONTAINERD}; ${PRINT_CONTAINERD_STATUS}; ctr -n=k8s.io images import ${WORKDIR}/antrea-ubuntu.tar" || true - for i in "${!harbor_images[@]}"; do - ssh -o StrictHostKeyChecking=no -n jenkins@${IP} "ctr -n=k8s.io images pull --user ${DOCKER_USERNAME}:${DOCKER_PASSWORD} ${DOCKER_REGISTRY}/antrea/${harbor_images[i]} && ctr -n=k8s.io images tag ${DOCKER_REGISTRY}/antrea/${harbor_images[i]} ${antrea_images[i]}" || true - done # Pull necessary images in advance to avoid transient error for image in "${common_images[@]}"; do ssh -o StrictHostKeyChecking=no -n jenkins@${IP} "ctr -n=k8s.io images pull --user ${DOCKER_USERNAME}:${DOCKER_PASSWORD} ${image}" || true @@ -488,22 +483,15 @@ function deliver_antrea_windows { sed -i 's/if (!(Test-Path $AntreaAgentConfigPath))/if ($true)/' hack/windows/Helper.psm1 kubectl get nodes -o wide --no-headers=true | awk -v role="$CONTROL_PLANE_NODE_ROLE" '$3 !~ role && $1 ~ /win/ {print $1}' | while read WORKER_NAME; do revert_snapshot_windows ${WORKER_NAME} - # Some tests need us.gcr.io/k8s-artifacts-prod/e2e-test-images/agnhost:2.13 image but it is not for windows/amd64 10.0.17763 - # Use e2eteam/agnhost:2.13 instead - harbor_images=("agnhost:2.13" "agnhost:2.13" "agnhost:2.29" "e2eteam-jessie-dnsutils:1.0" "e2eteam-pause:3.2") - antrea_images=("e2eteam/agnhost:2.13" 
"us.gcr.io/k8s-artifacts-prod/e2e-test-images/agnhost:2.13" "registry.k8s.io/e2e-test-images/agnhost:2.29" "e2eteam/jessie-dnsutils:1.0" "e2eteam/pause:3.2") - k8s_images=("registry.k8s.io/e2e-test-images/agnhost:2.45" "registry.k8s.io/e2e-test-images/jessie-dnsutils:1.5" "registry.k8s.io/e2e-test-images/nginx:1.14-2" "registry.k8s.io/pause:3.8") - conformance_images=("k8sprow.azurecr.io/kubernetes-e2e-test-images/agnhost:2.45" "k8sprow.azurecr.io/kubernetes-e2e-test-images/jessie-dnsutils:1.5" "k8sprow.azurecr.io/kubernetes-e2e-test-images/nginx:1.14-2" "k8sprow.azurecr.io/kubernetes-e2e-test-images/pause:3.8") - e2e_images=("toolbox:1.3-0") + k8s_images=("registry.k8s.io/e2e-test-images/agnhost:2.52" "registry.k8s.io/e2e-test-images/jessie-dnsutils:1.5" "registry.k8s.io/e2e-test-images/nginx:1.14-2" "registry.k8s.io/pause:3.10") + conformance_images=("k8sprow.azurecr.io/kubernetes-e2e-test-images/agnhost:2.52" "k8sprow.azurecr.io/kubernetes-e2e-test-images/jessie-dnsutils:1.5" "k8sprow.azurecr.io/kubernetes-e2e-test-images/nginx:1.14-2" "registry.k8s.io/e2e-test-images/pause:3.10") + e2e_images=("${DOCKER_REGISTRY}/antrea/toolbox:1.4-0" "registry.k8s.io/e2e-test-images/agnhost:2.40") # Pull necessary images in advance to avoid transient error - for i in "${!harbor_images[@]}"; do - ssh -o StrictHostKeyChecking=no -n Administrator@${IP} "ctr -n k8s.io images pull --user ${DOCKER_USERNAME}:${DOCKER_PASSWORD} ${DOCKER_REGISTRY}/antrea/${harbor_images[i]} && ctr -n k8s.io images tag ${DOCKER_REGISTRY}/antrea/${harbor_images[i]} ${antrea_images[i]}" || true - done for i in "${!k8s_images[@]}"; do ssh -o StrictHostKeyChecking=no -n Administrator@${IP} "ctr -n k8s.io images pull --user ${DOCKER_USERNAME}:${DOCKER_PASSWORD} ${k8s_images[i]} && ctr -n k8s.io images tag ${k8s_images[i]} ${conformance_images[i]}" || true done for image in "${e2e_images[@]}"; do - ssh -o StrictHostKeyChecking=no -n Administrator@${IP} "ctr -n k8s.io images pull --user ${DOCKER_USERNAME}:${DOCKER_PASSWORD} ${DOCKER_REGISTRY}/antrea/${image}" || true + ssh -o StrictHostKeyChecking=no -n Administrator@${IP} "ctr -n k8s.io images pull --user ${DOCKER_USERNAME}:${DOCKER_PASSWORD} ${image}" || true done for i in `seq 2`; do diff --git a/test/e2e/vmagent_test.go b/test/e2e/vmagent_test.go index 673d315288f..2e1d75682b0 100644 --- a/test/e2e/vmagent_test.go +++ b/test/e2e/vmagent_test.go @@ -525,7 +525,7 @@ func testANPOnVMs(t *testing.T, data *TestData, vmList []vmInfo, osType string) }) // Test FQDN rules in ANP t.Run("testANPOnExternalNodeWithFQDN", func(t *testing.T) { - testANPWithFQDN(t, data, "anp-vmagent-fqdn", namespace, *appliedToVM, []string{"www.facebook.com"}, []string{"docs.google.com"}, []string{"github.com"}) + testANPWithFQDN(t, data, "anp-vmagent-fqdn", namespace, *appliedToVM, []string{"docs.amazon.com"}, []string{"docs.google.com"}, []string{"github.com"}) }) } From 8f08909c509370681725dd7c966a106539ce8081 Mon Sep 17 00:00:00 2001 From: Gavin Xin Date: Fri, 9 Aug 2024 02:22:51 +0800 Subject: [PATCH 4/4] Match dstIP in Classifier to address windows promiscuous mode issue (#6528) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When promiscuous mode is enabled, OVS incorrectly forwards packets destined for non-local IP addresses from the uplink to the host interface. Due to IP forwarding being enabled, these packets are re-sent to br-int according to the default route and are eventually forwarded to the uplink. 
Since the source MAC of these packets has been changed to the local host’s MAC, this can potentially cause errors on the switch. This patch matches the dstIP field in the ClassifierTable to ensure proper packet handling and prevent unintended forwarding. Signed-off-by: Gavin Xin --- pkg/agent/openflow/pipeline.go | 5 ++--- pkg/agent/openflow/pipeline_other.go | 4 ++++ pkg/agent/openflow/pipeline_windows.go | 5 +++++ pkg/agent/openflow/pod_connectivity_test.go | 4 ++-- 4 files changed, 13 insertions(+), 5 deletions(-) diff --git a/pkg/agent/openflow/pipeline.go b/pkg/agent/openflow/pipeline.go index b5f75df9327..601e910f186 100644 --- a/pkg/agent/openflow/pipeline.go +++ b/pkg/agent/openflow/pipeline.go @@ -2967,9 +2967,8 @@ func (f *featurePodConnectivity) hostBridgeLocalFlows() []binding.Flow { cookieID := f.cookieAllocator.Request(f.category).Raw() return []binding.Flow{ // This generates the flow to forward the packets from uplink port to bridge local port. - ClassifierTable.ofTable.BuildFlow(priorityNormal). - Cookie(cookieID). - MatchInPort(f.uplinkPort). + f.matchUplinkInPortInClassifierTable(ClassifierTable.ofTable.BuildFlow(priorityNormal). + Cookie(cookieID)). Action().Output(f.hostIfacePort). Done(), // This generates the flow to forward the packets from bridge local port to uplink port. diff --git a/pkg/agent/openflow/pipeline_other.go b/pkg/agent/openflow/pipeline_other.go index 60459fba878..44df8f357ba 100644 --- a/pkg/agent/openflow/pipeline_other.go +++ b/pkg/agent/openflow/pipeline_other.go @@ -25,6 +25,10 @@ import ( binding "antrea.io/antrea/pkg/ovs/openflow" ) +func (f *featurePodConnectivity) matchUplinkInPortInClassifierTable(flowBuilder binding.FlowBuilder) binding.FlowBuilder { + return flowBuilder.MatchInPort(f.uplinkPort) +} + // hostBridgeUplinkFlows generates the flows that forward traffic between the bridge local port and the uplink port to // support the host traffic. // TODO(gran): sync latest changes from pipeline_windows.go diff --git a/pkg/agent/openflow/pipeline_windows.go b/pkg/agent/openflow/pipeline_windows.go index 2557d3f6ffb..fb2fe300ad0 100644 --- a/pkg/agent/openflow/pipeline_windows.go +++ b/pkg/agent/openflow/pipeline_windows.go @@ -23,6 +23,11 @@ import ( binding "antrea.io/antrea/pkg/ovs/openflow" ) +// matchUplinkInPortInClassifierTable matches the dstIP field to prevent unintended forwarding when promiscuous mode is enabled on Windows. +func (f *featurePodConnectivity) matchUplinkInPortInClassifierTable(flowBuilder binding.FlowBuilder) binding.FlowBuilder { + return flowBuilder.MatchInPort(f.uplinkPort).MatchProtocol(binding.ProtocolIP).MatchDstIP(f.nodeConfig.NodeTransportIPv4Addr.IP) +} + // hostBridgeUplinkFlows generates the flows that forward traffic between the bridge local port and the uplink port to // support the host traffic with outside.
func (f *featurePodConnectivity) hostBridgeUplinkFlows() []binding.Flow { diff --git a/pkg/agent/openflow/pod_connectivity_test.go b/pkg/agent/openflow/pod_connectivity_test.go index 327372ebb08..77e3aba9a73 100644 --- a/pkg/agent/openflow/pod_connectivity_test.go +++ b/pkg/agent/openflow/pod_connectivity_test.go @@ -90,7 +90,7 @@ func podConnectivityInitFlows( flows = append(flows, "cookie=0x1010000000000, table=ARPSpoofGuard, priority=200,in_port=32770 actions=output:4294967294", "cookie=0x1010000000000, table=ARPSpoofGuard, priority=200,in_port=4294967294 actions=output:32770", - "cookie=0x1010000000000, table=Classifier, priority=200,in_port=32770 actions=output:4294967294", + "cookie=0x1010000000000, table=Classifier, priority=200,ip,in_port=32770,nw_dst=192.168.77.100 actions=output:4294967294", "cookie=0x1010000000000, table=Classifier, priority=200,in_port=4294967294 actions=output:32770", "cookie=0x1010000000000, table=IngressSecurityClassifier, priority=210,ct_state=-rpl+trk,ip,nw_src=10.10.0.1 actions=goto_table:ConntrackCommit", ) @@ -161,7 +161,7 @@ func podConnectivityInitFlows( "cookie=0x1010000000000, table=ARPSpoofGuard, priority=200,in_port=32770 actions=output:4294967294", "cookie=0x1010000000000, table=ARPSpoofGuard, priority=200,in_port=4294967294 actions=output:32770", "cookie=0x1010000000000, table=Classifier, priority=210,ip,in_port=32770,nw_dst=10.10.0.0/24 actions=set_field:0x4/0xf->reg0,set_field:0x200/0x200->reg0,goto_table:UnSNAT", - "cookie=0x1010000000000, table=Classifier, priority=200,in_port=32770 actions=output:4294967294", + "cookie=0x1010000000000, table=Classifier, priority=200,ip,in_port=32770,nw_dst=192.168.77.100 actions=output:4294967294", "cookie=0x1010000000000, table=Classifier, priority=200,in_port=4294967294 actions=output:32770", "cookie=0x1010000000000, table=SpoofGuard, priority=200,ip,in_port=32769 actions=goto_table:UnSNAT", "cookie=0x1010000000000, table=ConntrackZone, priority=200,ip actions=ct(table=ConntrackState,zone=65520,nat)",