From 136785b2feb3cd3ec4ae2bf38f3eabb8d3c94d17 Mon Sep 17 00:00:00 2001 From: Wenying Dong Date: Wed, 25 Sep 2024 14:29:26 +0800 Subject: [PATCH] [Windows] CNI Server installs OpenFlow entries after PortStatus message is received This change introduces a worker in `podConfigurator` that listens for the OpenFlow PortStatus message sent when a new OpenFlow port is allocated in OVS. After receiving the message, the Windows antrea-agent installs the Pod-related OpenFlow entries. If the OpenFlow port is not allocated within 30s after the CmdAdd request completes, an event with type "NetworkNotReady" is added on the Pod; whenever the Pod network forwarding rules are installed, an event with type "NetworkReady" is added. Signed-off-by: Wenying Dong --- cmd/antrea-agent/agent.go | 1 + go.mod | 9 +- go.sum | 18 +- hack/update-codegen-dockerized.sh | 3 +- pkg/agent/cniserver/pod_configuration.go | 155 +++++- .../cniserver/pod_configuration_linux.go | 9 + .../cniserver/pod_configuration_linux_test.go | 2 +- pkg/agent/cniserver/pod_configuration_test.go | 469 ++++++++++++++++++ .../cniserver/pod_configuration_windows.go | 109 +++- pkg/agent/cniserver/secondary.go | 2 +- pkg/agent/cniserver/server.go | 10 +- pkg/agent/cniserver/server_linux_test.go | 12 +- pkg/agent/cniserver/server_windows_test.go | 177 +++---- pkg/agent/interfacestore/interface_cache.go | 5 + .../interfacestore/interface_cache_test.go | 48 ++ .../testing/mock_interfacestore.go | 12 + pkg/agent/interfacestore/types.go | 6 + .../interfacestore/zz_generated.deepcopy.go | 155 ++++++ pkg/agent/openflow/client.go | 7 + pkg/agent/openflow/client_test.go | 11 + pkg/agent/openflow/testing/mock_openflow.go | 13 + pkg/ovs/openflow/interfaces.go | 3 + pkg/ovs/openflow/ofctrl_bridge.go | 32 +- pkg/ovs/openflow/testing/mock_openflow.go | 12 + test/integration/agent/cniserver_test.go | 3 + 25 files changed, 1137 insertions(+), 146 deletions(-) create mode 100644 pkg/agent/cniserver/pod_configuration_test.go create mode 
100644 pkg/agent/interfacestore/zz_generated.deepcopy.go diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 60bd61503aa..162e4458237 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -600,6 +600,7 @@ func run(o *Options) error { o.config.CNISocket, o.config.HostProcPathPrefix, nodeConfig, + localPodInformer.Get(), k8sClient, routeClient, isChaining, diff --git a/go.mod b/go.mod index 639898c0693..55be4d9f210 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,8 @@ module antrea.io/antrea go 1.23.0 require ( - antrea.io/libOpenflow v0.14.0 - antrea.io/ofnet v0.12.0 + antrea.io/libOpenflow v0.15.0 + antrea.io/ofnet v0.14.0 github.com/ClickHouse/clickhouse-go/v2 v2.6.1 github.com/DATA-DOG/go-sqlmock v1.5.2 github.com/Mellanox/sriovnet v1.1.0 @@ -113,10 +113,9 @@ require ( github.com/aws/smithy-go v1.12.1 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/blang/semver/v4 v4.0.0 // indirect - github.com/cenk/hub v1.0.1 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect - github.com/cenkalti/hub v1.0.1 // indirect - github.com/cenkalti/rpc2 v0.0.0-20180727162946-9642ea02d0aa // indirect + github.com/cenkalti/hub v1.0.2 // indirect + github.com/cenkalti/rpc2 v1.0.3 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/chai2010/gettext-go v1.0.2 // indirect github.com/containerd/cgroups v1.1.0 // indirect diff --git a/go.sum b/go.sum index ebe91ef77e4..9a639da28e4 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,7 @@ -antrea.io/libOpenflow v0.14.0 h1:6MS1E52nGQyz/AJ8j3OrotgAc/1ubef5vc5i8ytIxFE= -antrea.io/libOpenflow v0.14.0/go.mod h1:U8YR0ithWrjwLdUUhyeCsYnlKg/mEFjG5CbPNt1V+j0= -antrea.io/ofnet v0.12.0 h1:pgygAsEZJUPK/kGmeuIesDh5hoGLpYeavSLdG/Dp8Ao= -antrea.io/ofnet v0.12.0/go.mod h1:MB3qaInEimj+T8qtpBcIQK+EqeNiY1S/WbUdGk+TzNg= +antrea.io/libOpenflow v0.15.0 h1:wGk+IVCf8piGZgC4+lbf4qfGrJG5ikzfq5Y1T5LzqmI= +antrea.io/libOpenflow v0.15.0/go.mod h1:Mq1JEjYrb6eTVA7qjZRj9plVTKcsLM8wnQ87sLLYuiQ= 
+antrea.io/ofnet v0.14.0 h1:BGOqg5DdRkvxpBqyoEgWmvGd4EvpacdU/Py1s6qOvSc= +antrea.io/ofnet v0.14.0/go.mod h1:W5JPYFFcRM7tLwsItgmsKqIhtW/QofyIeNsUIecFaBo= cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU= @@ -114,14 +114,12 @@ github.com/blang/semver v3.5.1+incompatible h1:cQNTCjp13qL8KC3Nbxr/y2Bqb63oX6wdn github.com/blang/semver v3.5.1+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk= github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM= github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ= -github.com/cenk/hub v1.0.1 h1:RBwXNOF4a8KjD8BJ08XqN8KbrqaGiQLDrgvUGJSHuPA= -github.com/cenk/hub v1.0.1/go.mod h1:rJM1LNAW0ppT8FMMuPK6c2NP/R2nH/UthtuRySSaf6Y= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= -github.com/cenkalti/hub v1.0.1 h1:UMtjc6dHSaOQTO15SVA50MBIR9zQwvsukQupDrkIRtg= -github.com/cenkalti/hub v1.0.1/go.mod h1:tcYwtS3a2d9NO/0xDXVJWx3IedurUjYCqFCmpi0lpHs= -github.com/cenkalti/rpc2 v0.0.0-20180727162946-9642ea02d0aa h1:t+iWhuJE2aropY4uxKMVbyP+IJ29o422f7YAd73aTjg= -github.com/cenkalti/rpc2 v0.0.0-20180727162946-9642ea02d0aa/go.mod h1:v2npkhrXyk5BCnkNIiPdRI23Uq6uWPUQGL2hnRcRr/M= +github.com/cenkalti/hub v1.0.2 h1:Nqv9TNaA9boeO2wQFW8o87BY3zKthtnzXmWGmJqhAV8= +github.com/cenkalti/hub v1.0.2/go.mod h1:8LAFAZcCasb83vfxatMUnZHRoQcffho2ELpHb+kaTJU= +github.com/cenkalti/rpc2 v1.0.3 h1:OkMsNP/sP9seN1VRCLqhX1xkVGHPoLwWS6fZR14Ji/k= +github.com/cenkalti/rpc2 v1.0.3/go.mod h1:2yfU5b86vOr16+iY1jN3MvT6Kxc9Nf8j5iZWwUf7iaw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash v1.1.0/go.mod 
h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= diff --git a/hack/update-codegen-dockerized.sh b/hack/update-codegen-dockerized.sh index b421dc28cdf..c0e552be17a 100755 --- a/hack/update-codegen-dockerized.sh +++ b/hack/update-codegen-dockerized.sh @@ -169,7 +169,8 @@ function generate_antrea_client_code { "${ANTREA_PKG}/pkg/apis/crd/v1alpha2" \ "${ANTREA_PKG}/pkg/apis/crd/v1beta1" \ "${ANTREA_PKG}/pkg/apis/stats" \ - "${ANTREA_PKG}/pkg/apis/stats/v1alpha1" + "${ANTREA_PKG}/pkg/apis/stats/v1alpha1" \ + "${ANTREA_PKG}/pkg/agent/interfacestore" $GOPATH/bin/conversion-gen \ --output-file zz_generated.conversion.go \ diff --git a/pkg/agent/cniserver/pod_configuration.go b/pkg/agent/cniserver/pod_configuration.go index e44e739fd61..e1ad05752eb 100644 --- a/pkg/agent/cniserver/pod_configuration.go +++ b/pkg/agent/cniserver/pod_configuration.go @@ -15,16 +15,24 @@ package cniserver import ( + "bytes" "encoding/json" "fmt" "net" "strings" "sync" + "time" + "antrea.io/libOpenflow/openflow15" current "github.com/containernetworking/cni/pkg/types/100" "github.com/containernetworking/cni/pkg/version" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/sets" + clientset "k8s.io/client-go/kubernetes" + v1 "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" "antrea.io/antrea/pkg/agent/cniserver/ipam" @@ -61,6 +69,10 @@ const ( var ( getNSPath = util.GetNSPath + // retryInterval is the interval to re-install Pod OpenFlow entries if any error happened. + // Note, using a variable rather than constant for retryInterval because we may use a shorter time in the + // test code. 
+ retryInterval = 5 * time.Second ) type podConfigurator struct { @@ -76,9 +88,19 @@ type podConfigurator struct { // isSecondaryNetwork is true if this instance of podConfigurator is used to configure // Pod secondary network interfaces. isSecondaryNetwork bool + + containerAccess *containerAccessArbitrator + eventBroadcaster record.EventBroadcaster + recorder record.EventRecorder + podListerSynced cache.InformerSynced + podLister v1.PodLister + kubeClient clientset.Interface + unreadyPortQueue workqueue.TypedDelayingInterface[string] + statusCh chan *openflow15.PortStatus } func newPodConfigurator( + kubeClient clientset.Interface, ovsBridgeClient ovsconfig.OVSBridgeClient, ofClient openflow.Client, routeClient route.Interface, @@ -88,12 +110,14 @@ func newPodConfigurator( isOvsHardwareOffloadEnabled bool, disableTXChecksumOffload bool, podUpdateNotifier channel.Notifier, + podInformer cache.SharedIndexInformer, + containerAccess *containerAccessArbitrator, ) (*podConfigurator, error) { ifConfigurator, err := newInterfaceConfigurator(ovsDatapathType, isOvsHardwareOffloadEnabled, disableTXChecksumOffload) if err != nil { return nil, err } - return &podConfigurator{ + pc := &podConfigurator{ ovsBridgeClient: ovsBridgeClient, ofClient: ofClient, routeClient: routeClient, @@ -101,7 +125,12 @@ func newPodConfigurator( gatewayMAC: gatewayMAC, ifConfigurator: ifConfigurator, podUpdateNotifier: podUpdateNotifier, - }, nil + kubeClient: kubeClient, + containerAccess: containerAccess, + } + // Initiate the PortStatus message listener. This function is a no-op except on Windows. + pc.initPortStatusMonitor(podInformer) + return pc, nil } func parseContainerIPs(ipcs []*current.IPConfig) ([]net.IP, error) { @@ -166,13 +195,13 @@ func getContainerIPsString(ips []net.IP) string { // not created for a Pod interface. 
func ParseOVSPortInterfaceConfig(portData *ovsconfig.OVSPortData, portConfig *interfacestore.OVSPortConfig) *interfacestore.InterfaceConfig { if portData.ExternalIDs == nil { - klog.V(2).Infof("OVS port %s has no external_ids", portData.Name) + klog.V(2).InfoS("OVS port has no external_ids", "port", portData.Name) return nil } containerID, found := portData.ExternalIDs[ovsExternalIDContainerID] if !found { - klog.V(2).Infof("OVS port %s has no %s in external_ids", portData.Name, ovsExternalIDContainerID) + klog.V(2).InfoS("OVS port has no containerID in external_ids", "port", portData.Name) return nil } @@ -187,8 +216,7 @@ func ParseOVSPortInterfaceConfig(portData *ovsconfig.OVSPortData, portConfig *in containerMAC, err := net.ParseMAC(portData.ExternalIDs[ovsExternalIDMAC]) if err != nil { - klog.Errorf("Failed to parse MAC address from OVS external config %s: %v", - portData.ExternalIDs[ovsExternalIDMAC], err) + klog.ErrorS(err, "Failed to parse MAC address from OVS external config") } podName, _ := portData.ExternalIDs[ovsExternalIDPodName] podNamespace, _ := portData.ExternalIDs[ovsExternalIDPodNamespace] @@ -279,7 +307,7 @@ func (pc *podConfigurator) createOVSPort(ovsPortName string, ovsAttachInfo map[s func (pc *podConfigurator) removeInterfaces(containerID string) error { containerConfig, found := pc.ifaceStore.GetContainerInterface(containerID) if !found { - klog.V(2).Infof("Did not find the port for container %s in local cache", containerID) + klog.V(2).InfoS("Did not find the port for container in local cache", "container", containerID) return nil } @@ -498,7 +526,7 @@ func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *contain // disconnectInterfaceFromOVS disconnects an existing interface from ovs br-int. 
func (pc *podConfigurator) disconnectInterfaceFromOVS(containerConfig *interfacestore.InterfaceConfig) error { containerID := containerConfig.ContainerID - klog.V(2).Infof("Deleting Openflow entries for container %s", containerID) + klog.V(2).InfoS("Deleting Openflow entries for container", "container", containerID) if !pc.isSecondaryNetwork { if err := pc.ofClient.UninstallPodFlows(containerConfig.InterfaceName); err != nil { return fmt.Errorf("failed to delete Openflow entries for container %s: %v", containerID, err) @@ -513,6 +541,7 @@ func (pc *podConfigurator) disconnectInterfaceFromOVS(containerConfig *interface if err := pc.ovsBridgeClient.DeletePort(containerConfig.PortUUID); err != nil { return fmt.Errorf("failed to delete OVS port for container %s interface %s: %v", containerID, containerConfig.InterfaceName, err) } + // Remove container configuration from cache. pc.ifaceStore.DeleteInterface(containerConfig) if !pc.isSecondaryNetwork { @@ -558,7 +587,7 @@ func (pc *podConfigurator) connectInterceptedInterface( func (pc *podConfigurator) disconnectInterceptedInterface(podName, podNamespace, containerID string) error { containerConfig, found := pc.ifaceStore.GetContainerInterface(containerID) if !found { - klog.V(2).Infof("Did not find the port for container %s in local cache", containerID) + klog.V(2).InfoS("Did not find the port for container in local cache", "container", containerID) return nil } for _, ip := range containerConfig.IPs { @@ -570,3 +599,111 @@ func (pc *podConfigurator) disconnectInterceptedInterface(podName, podNamespace, return pc.disconnectInterfaceFromOVS(containerConfig) // TODO recover pre-connect state? repatch vethpair to original bridge etc ?? to make first CNI happy?? 
} + +func (pc *podConfigurator) processNextWorkItem() bool { + key, quit := pc.unreadyPortQueue.Get() + if quit { + return false + } + defer pc.unreadyPortQueue.Done(key) + + if err := pc.updateUnreadyPod(key); err != nil { + klog.ErrorS(err, "Failed install OpenFlow entries for OVS port interface", "name", key) + // Put the item back on the workqueue to handle any transient errors. + pc.unreadyPortQueue.AddAfter(key, retryInterval) + } + return true +} + +func (pc *podConfigurator) updateUnreadyPod(ovsPort string) error { + ifConfig, found := pc.ifaceStore.GetInterfaceByName(ovsPort) + if !found { + klog.InfoS("Interface config is not found, skip processing the port", "name", ovsPort) + return nil + } + + pc.containerAccess.lockContainer(ifConfig.ContainerID) + defer pc.containerAccess.unlockContainer(ifConfig.ContainerID) + // Get the InterfaceConfig again after the lock to avoid race conditions. + ifConfig, found = pc.ifaceStore.GetInterfaceByName(ovsPort) + if !found { + klog.InfoS("Interface config is not found, skip processing the port", "name", ovsPort) + return nil + } + + if ifConfig.OFPort == 0 { + // Add Pod not-ready event if the pod flows are not successfully installed, and the OpenFlow port is not allocated. + // Returns error so that we can have a retry after 5s. + pc.recordPodEvent(ifConfig, false) + return fmt.Errorf("pod's OpenFlow port is not ready yet") + } + + // Install OpenFlow entries for the Pod. + klog.V(2).InfoS("Setting up Openflow entries for OVS port", "port", ovsPort) + if err := pc.ofClient.InstallPodFlows(ovsPort, ifConfig.IPs, ifConfig.MAC, uint32(ifConfig.OFPort), ifConfig.VLANID, nil); err != nil { + // Add Pod not-ready event if the pod flows installation fails. + // Returns error so that we can have a retry after 5s. + pc.recordPodEvent(ifConfig, false) + return fmt.Errorf("failed to add Openflow entries for OVS port %s: %v", ovsPort, err) + } + + // Notify the Pod update event to required components. 
+ event := agenttypes.PodUpdate{ + PodName: ifConfig.PodName, + PodNamespace: ifConfig.PodNamespace, + IsAdd: true, + ContainerID: ifConfig.ContainerID, + } + pc.podUpdateNotifier.Notify(event) + + pc.recordPodEvent(ifConfig, true) + return nil +} + +func (pc *podConfigurator) recordPodEvent(ifConfig *interfacestore.InterfaceConfig, installed bool) { + pod, err := pc.podLister.Pods(ifConfig.PodNamespace).Get(ifConfig.PodName) + if err != nil { + klog.InfoS("Unable to get Pod, skip recording Pod event", "Pod", klog.KRef(ifConfig.PodNamespace, ifConfig.PodName)) + return + } + + if installed { + // Add normal event to record Pod network is ready. + pc.recorder.Eventf(pod, corev1.EventTypeNormal, "NetworkReady", "Installed Pod network forwarding rules") + return + } + + pc.recorder.Eventf(pod, corev1.EventTypeWarning, "NetworkNotReady", "Pod network forwarding rules not installed") +} + +func (pc *podConfigurator) processPortStatusMessage(status *openflow15.PortStatus) { + // Update Pod OpenFlow entries only after the OpenFlow port state is live. + if status.Desc.State != openflow15.PS_LIVE { + return + } + ovsPort := string(bytes.Trim(status.Desc.Name, "\x00")) + ofPort := status.Desc.PortNo + + ifConfig, found := pc.ifaceStore.GetInterfaceByName(ovsPort) + if !found { + klog.InfoS("Interface config is not found", "ovsPort", ovsPort) + return + } + + func() { + pc.containerAccess.lockContainer(ifConfig.ContainerID) + defer pc.containerAccess.unlockContainer(ifConfig.ContainerID) + // Get the InterfaceConfig again after the lock to avoid race conditions. + ifConfig, found = pc.ifaceStore.GetInterfaceByName(ovsPort) + if !found { + klog.InfoS("Interface config is not found", "ovsPort", ovsPort) + return + } + // Update interface config with the ofPort. 
+ newIfConfig := ifConfig.DeepCopy() + newIfConfig.OVSPortConfig.OFPort = int32(ofPort) + pc.ifaceStore.UpdateInterface(newIfConfig) + }() + + pc.unreadyPortQueue.Add(ovsPort) +} diff --git a/pkg/agent/cniserver/pod_configuration_linux.go b/pkg/agent/cniserver/pod_configuration_linux.go index fe281f06330..ef59a6beeab 100644 --- a/pkg/agent/cniserver/pod_configuration_linux.go +++ b/pkg/agent/cniserver/pod_configuration_linux.go @@ -21,6 +21,7 @@ import ( "fmt" current "github.com/containernetworking/cni/pkg/types/100" + "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" "antrea.io/antrea/pkg/agent/cniserver/ipam" @@ -113,3 +114,11 @@ func (pc *podConfigurator) reconcileMissingPods(ifConfigs []*interfacestore.Inte klog.Warningf("Interface for Pod %s/%s not found in the interface store", ifaceConfig.PodNamespace, ifaceConfig.PodName) } } + +func (pc *podConfigurator) initPortStatusMonitor(_ cache.SharedIndexInformer) { + +} + +func (pc *podConfigurator) Run(stopCh <-chan struct{}) { + <-stopCh +} diff --git a/pkg/agent/cniserver/pod_configuration_linux_test.go b/pkg/agent/cniserver/pod_configuration_linux_test.go index 4f51bd4892b..bd7703424b1 100644 --- a/pkg/agent/cniserver/pod_configuration_linux_test.go +++ b/pkg/agent/cniserver/pod_configuration_linux_test.go @@ -687,7 +687,7 @@ func createPodConfigurator(controller *gomock.Controller, testIfaceConfigurator mockOFClient = openflowtest.NewMockClient(controller) ifaceStore = interfacestore.NewInterfaceStore() mockRoute = routetest.NewMockInterface(controller) - configurator, _ := newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100)) + configurator, _ := newPodConfigurator(nil, mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100), nil, nil) configurator.ifConfigurator = testIfaceConfigurator return configurator } diff --git 
a/pkg/agent/cniserver/pod_configuration_test.go b/pkg/agent/cniserver/pod_configuration_test.go new file mode 100644 index 00000000000..b7bf70170d8 --- /dev/null +++ b/pkg/agent/cniserver/pod_configuration_test.go @@ -0,0 +1,469 @@ +// Copyright 2024 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cniserver + +import ( + "fmt" + "net" + "testing" + "time" + + "antrea.io/libOpenflow/openflow15" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/runtime" + coreinformers "k8s.io/client-go/informers/core/v1" + fakeclientset "k8s.io/client-go/kubernetes/fake" + corelisters "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" + + "antrea.io/antrea/pkg/agent/interfacestore" + openflowtest "antrea.io/antrea/pkg/agent/openflow/testing" + "antrea.io/antrea/pkg/agent/types" + "antrea.io/antrea/pkg/util/channel" +) + +var ( + podIfName = "test" + podIPs = []net.IP{net.ParseIP("192.168.9.10")} + podMac, _ = net.ParseMAC("00:15:5D:B2:6F:38") + podInfraContainerID = "261a1970-5b6c-11ed-8caf-000c294e5d03" + + pod = &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: testPodNameA, + Namespace: testPodNamespace, + }, + Spec: corev1.PodSpec{ + NodeName: nodeName, + }, + } 
+ + portStatusMsg = &openflow15.PortStatus{ + Reason: openflow15.PR_MODIFY, + Desc: openflow15.Port{ + PortNo: 1, + Length: 72, + Name: []byte(fmt.Sprintf("%s\x00", podIfName)), + State: openflow15.PS_LIVE, + }, + } +) + +type mockClients struct { + kubeClient *fakeclientset.Clientset + localPodInformer cache.SharedIndexInformer + podLister corelisters.PodLister + podListerSynced cache.InformerSynced + ofClient *openflowtest.MockClient + recorder *record.FakeRecorder +} + +func newMockClients(ctrl *gomock.Controller, nodeName string, objects ...runtime.Object) *mockClients { + kubeClient := fakeclientset.NewClientset() + for _, obj := range objects { + if obj != nil { + kubeClient.Tracker().Add(obj) + } + } + + localPodInformer := coreinformers.NewFilteredPodInformer( + kubeClient, + metav1.NamespaceAll, + 0, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, + func(options *metav1.ListOptions) { + options.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", nodeName).String() + }, + ) + podLister := corelisters.NewPodLister(localPodInformer.GetIndexer()) + ofClient := openflowtest.NewMockClient(ctrl) + recorder := record.NewFakeRecorder(100) + recorder.IncludeObject = false + + return &mockClients{ + kubeClient: kubeClient, + localPodInformer: localPodInformer, + podLister: podLister, + podListerSynced: localPodInformer.HasSynced, + ofClient: ofClient, + recorder: recorder, + } +} + +func (c *mockClients) startInformers(stopCh chan struct{}) { + go c.localPodInformer.Run(stopCh) + cache.WaitForCacheSync(stopCh, c.localPodInformer.HasSynced) +} + +type asyncWaiter struct { + podName string + containerID string + waitCh chan struct{} + notifier *channel.SubscribableChannel +} + +func (w *asyncWaiter) notify(e interface{}) { + podUpdate := e.(types.PodUpdate) + if podUpdate.PodName == w.podName && podUpdate.ContainerID == w.containerID { + w.waitCh <- struct{}{} + } +} + +func (w *asyncWaiter) waitUntil(timeout time.Duration) bool { + 
select { + case <-w.waitCh: + return true + case <-time.After(timeout): + return false + } +} + +func newAsyncWaiter(podName, containerID string, stopCh chan struct{}) *asyncWaiter { + waiter := &asyncWaiter{ + podName: podName, + containerID: containerID, + waitCh: make(chan struct{}), + notifier: channel.NewSubscribableChannel("PodUpdate", 100), + } + waiter.notifier.Subscribe(waiter.notify) + go waiter.notifier.Run(stopCh) + return waiter +} + +func mockRetryInterval(t *testing.T) { + oriRetryInterval := retryInterval + retryInterval = time.Millisecond * 500 + t.Cleanup(func() { + retryInterval = oriRetryInterval + }) +} + +func newTestPodConfigurator(testClients *mockClients, waiter *asyncWaiter) *podConfigurator { + interfaceStore := interfacestore.NewInterfaceStore() + eventBroadcaster := record.NewBroadcaster() + queue := workqueue.NewTypedDelayingQueueWithConfig[string]( + workqueue.TypedDelayingQueueConfig[string]{ + Name: "podConfigurator", + }, + ) + podCfg := &podConfigurator{ + kubeClient: testClients.kubeClient, + ofClient: testClients.ofClient, + podLister: testClients.podLister, + podListerSynced: testClients.podListerSynced, + ifaceStore: interfaceStore, + eventBroadcaster: eventBroadcaster, + recorder: testClients.recorder, + unreadyPortQueue: queue, + containerAccess: newContainerAccessArbitrator(), + } + if waiter != nil { + podCfg.podUpdateNotifier = waiter.notifier + } + return podCfg +} + +func TestUpdateUnreadyPod(t *testing.T) { + mockRetryInterval(t) + + for _, tc := range []struct { + name string + ofPortAssigned bool + podIfaceIsCached bool + installFlow bool + flowInstalled bool + installOpenFlowErr error + expErr string + expEvent string + }{ + { + name: "updated Port is not in interface store", + podIfaceIsCached: false, + installFlow: false, + }, { + name: "OpenFlow port is not assigned", + podIfaceIsCached: true, + ofPortAssigned: false, + installFlow: false, + expErr: "pod's OpenFlow port is not ready yet", + expEvent: "Warning 
NetworkNotReady Pod network forwarding rules not installed", + }, { + name: "failed to install OpenFlow entries for updated Port", + podIfaceIsCached: true, + ofPortAssigned: true, + installFlow: true, + installOpenFlowErr: fmt.Errorf("failure to install flow"), + expErr: "failed to add Openflow entries for OVS port test: failure to install flow", + expEvent: "Warning NetworkNotReady Pod network forwarding rules not installed", + }, { + name: "succeeded", + podIfaceIsCached: true, + ofPortAssigned: true, + installFlow: true, + installOpenFlowErr: nil, + expEvent: "Normal NetworkReady Installed Pod network forwarding rules", + }, + } { + t.Run(tc.name, func(t *testing.T) { + controller := gomock.NewController(t) + stopCh := make(chan struct{}) + defer close(stopCh) + + waiter := newAsyncWaiter(testPodNameA, podInfraContainerID, stopCh) + + testClients := newMockClients(controller, nodeName, pod) + testClients.startInformers(stopCh) + fakeOFClient := testClients.ofClient + + configurator := newTestPodConfigurator(testClients, waiter) + + flowInstalled := false + + ifConfig := interfacestore.InterfaceConfig{ + InterfaceName: podIfName, + ContainerInterfaceConfig: &interfacestore.ContainerInterfaceConfig{ + PodNamespace: testPodNamespace, + PodName: testPodNameA, + ContainerID: podInfraContainerID, + }, + OVSPortConfig: &interfacestore.OVSPortConfig{ + PortUUID: "test-port-uuid", + }, + IPs: podIPs, + MAC: podMac, + } + + if tc.ofPortAssigned { + ifConfig.OVSPortConfig.OFPort = int32(1) + } + + if tc.podIfaceIsCached { + configurator.ifaceStore.AddInterface(&ifConfig) + } + + if tc.installFlow { + fakeOFClient.EXPECT().InstallPodFlows(podIfName, podIPs, podMac, portStatusMsg.Desc.PortNo, uint16(0), nil).Times(1).Return(tc.installOpenFlowErr) + if tc.installOpenFlowErr == nil { + flowInstalled = true + } + } + + err := configurator.updateUnreadyPod(podIfName) + if tc.expErr == "" { + require.NoError(t, err) + } else { + require.EqualError(t, err, tc.expErr) + } + + if 
flowInstalled { + assert.True(t, waiter.waitUntil(5*time.Second)) + } + + if tc.expEvent != "" { + actEvt := <-testClients.recorder.Events + require.Equal(t, tc.expEvent, actEvt) + } + }) + } +} + +func TestProcessNextWorkItem(t *testing.T) { + mockRetryInterval(t) + + for _, tc := range []struct { + name string + installOpenFlowErr error + expEvent string + expRequeue bool + }{ + { + name: "failed to install OpenFlow entries for updated Port", + installOpenFlowErr: fmt.Errorf("failure to install flow"), + expRequeue: true, + }, { + name: "succeeded", + installOpenFlowErr: nil, + expRequeue: false, + }, + } { + t.Run(tc.name, func(t *testing.T) { + controller := gomock.NewController(t) + stopCh := make(chan struct{}) + defer close(stopCh) + + waiter := newAsyncWaiter(testPodNameA, podInfraContainerID, stopCh) + + testClients := newMockClients(controller, nodeName, pod) + testClients.startInformers(stopCh) + fakeOFClient := testClients.ofClient + + configurator := newTestPodConfigurator(testClients, waiter) + defer configurator.unreadyPortQueue.ShutDown() + + configurator.ifaceStore.AddInterface(&interfacestore.InterfaceConfig{ + InterfaceName: podIfName, + ContainerInterfaceConfig: &interfacestore.ContainerInterfaceConfig{ + PodNamespace: testPodNamespace, + PodName: testPodNameA, + ContainerID: podInfraContainerID, + }, + OVSPortConfig: &interfacestore.OVSPortConfig{ + PortUUID: "test-port-uuid", + OFPort: int32(1), + }, + IPs: podIPs, + MAC: podMac, + }) + + fakeOFClient.EXPECT().InstallPodFlows(podIfName, podIPs, podMac, portStatusMsg.Desc.PortNo, uint16(0), nil).Times(1).Return(tc.installOpenFlowErr) + configurator.unreadyPortQueue.Add(podIfName) + + configurator.processNextWorkItem() + + time.Sleep(600 * time.Millisecond) + if tc.installOpenFlowErr != nil { + require.Equal(t, 1, configurator.unreadyPortQueue.Len()) + key, _ := configurator.unreadyPortQueue.Get() + assert.Equal(t, key, podIfName) + } else { + require.Equal(t, 0, 
configurator.unreadyPortQueue.Len()) + } + }) + } +} + +func TestProcessPortStatusMessage(t *testing.T) { + validOFPort := int32(1) + invalidOFPort := int32(0) + for _, tc := range []struct { + name string + status *openflow15.PortStatus + ovsPortName string + ifaceInStore bool + expEnqueue bool + expOFportNumber *int32 + }{ + { + name: "Add OF port if port status is live", + status: &openflow15.PortStatus{ + Desc: openflow15.Port{ + PortNo: 1, + Length: 72, + Name: []byte(podIfName), + State: openflow15.PS_LIVE, + }, + }, + ovsPortName: podIfName, + ifaceInStore: true, + expEnqueue: true, + expOFportNumber: &validOFPort, + }, { + name: "Add OF port with suffix in name", + status: &openflow15.PortStatus{ + Desc: openflow15.Port{ + PortNo: 1, + Length: 72, + Name: []byte(fmt.Sprintf("%s\x00", podIfName)), + State: openflow15.PS_LIVE, + }, + }, + ovsPortName: podIfName, + ifaceInStore: true, + expEnqueue: true, + expOFportNumber: &validOFPort, + }, { + name: "Ignore OF port if port is not live", + status: &openflow15.PortStatus{ + Desc: openflow15.Port{ + PortNo: 1, + Length: 72, + Name: []byte(fmt.Sprintf("%s\x00", podIfName)), + State: openflow15.PS_LINK_DOWN, + }, + }, + ovsPortName: podIfName, + ifaceInStore: true, + expEnqueue: false, + expOFportNumber: &invalidOFPort, + }, { + name: "Not enqueue OF port status message if the interface config does not exist", + status: &openflow15.PortStatus{ + Desc: openflow15.Port{ + PortNo: 1, + Length: 72, + Name: []byte(podIfName), + State: openflow15.PS_LIVE, + }, + }, + ovsPortName: podIfName, + ifaceInStore: false, + expEnqueue: false, + expOFportNumber: nil, + }, + } { + t.Run(tc.name, func(t *testing.T) { + queue := workqueue.NewTypedDelayingQueueWithConfig[string]( + workqueue.TypedDelayingQueueConfig[string]{ + Name: "podMonitor", + }) + podCfg := &podConfigurator{ + ifaceStore: interfacestore.NewInterfaceStore(), + statusCh: make(chan *openflow15.PortStatus), + unreadyPortQueue: queue, + containerAccess: 
newContainerAccessArbitrator(), + } + defer podCfg.unreadyPortQueue.ShutDown() + + if tc.ifaceInStore { + podCfg.ifaceStore.AddInterface(&interfacestore.InterfaceConfig{ + InterfaceName: podIfName, + ContainerInterfaceConfig: &interfacestore.ContainerInterfaceConfig{ + PodNamespace: testPodNamespace, + PodName: testPodNameA, + ContainerID: podInfraContainerID, + }, + OVSPortConfig: &interfacestore.OVSPortConfig{ + PortUUID: "test-port-uuid", + }, + IPs: podIPs, + MAC: podMac, + }) + } + + podCfg.processPortStatusMessage(tc.status) + time.Sleep(600 * time.Millisecond) + if tc.expEnqueue { + require.Equal(t, 1, queue.Len()) + key, _ := queue.Get() + assert.Equal(t, tc.ovsPortName, key) + } else { + require.Equal(t, 0, queue.Len()) + } + + if tc.expOFportNumber != nil { + ifaceCfg, ok := podCfg.ifaceStore.GetInterfaceByName(podIfName) + require.True(t, ok) + assert.Equal(t, *tc.expOFportNumber, ifaceCfg.OFPort) + } + }) + } +} diff --git a/pkg/agent/cniserver/pod_configuration_windows.go b/pkg/agent/cniserver/pod_configuration_windows.go index 44734e4f20e..e21998ed6a5 100644 --- a/pkg/agent/cniserver/pod_configuration_windows.go +++ b/pkg/agent/cniserver/pod_configuration_windows.go @@ -18,15 +18,30 @@ package cniserver import ( - "fmt" + "time" + "antrea.io/libOpenflow/openflow15" current "github.com/containernetworking/cni/pkg/types/100" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes/scheme" + typedv1 "k8s.io/client-go/kubernetes/typed/core/v1" + v1 "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" "antrea.io/antrea/pkg/agent/cniserver/ipam" "antrea.io/antrea/pkg/agent/interfacestore" - "antrea.io/antrea/pkg/agent/types" - "antrea.io/antrea/pkg/util/k8s" +) + +var ( + workerName = "podConfigurator" +) + +const ( + podNotReadyTimeInSeconds = 30 * time.Second ) // connectInterfaceToOVSAsync waits for an interface 
to be created and connects it to OVS br-int asynchronously @@ -34,29 +49,16 @@ import ( // CNI call completes. func (pc *podConfigurator) connectInterfaceToOVSAsync(ifConfig *interfacestore.InterfaceConfig, containerAccess *containerAccessArbitrator) error { ovsPortName := ifConfig.InterfaceName + // Add the OVS port into the queue after 30s in case the OFPort is still not ready. This + // operation is performed before we update OVSDB, otherwise we + // need to think about the race condition between the current goroutine with the listener. + // It may generate a duplicated PodIsReady event if the Pod's OpenFlow entries are installed + // before the time, then the library shall merge the event. + pc.unreadyPortQueue.AddAfter(ovsPortName, podNotReadyTimeInSeconds) return pc.ifConfigurator.addPostInterfaceCreateHook(ifConfig.ContainerID, ovsPortName, containerAccess, func() error { if err := pc.ovsBridgeClient.SetInterfaceType(ovsPortName, "internal"); err != nil { return err } - ofPort, err := pc.ovsBridgeClient.GetOFPort(ovsPortName, true) - if err != nil { - return err - } - containerID := ifConfig.ContainerID - klog.V(2).Infof("Setting up Openflow entries for container %s", containerID) - if err := pc.ofClient.InstallPodFlows(ovsPortName, ifConfig.IPs, ifConfig.MAC, uint32(ofPort), ifConfig.VLANID, nil); err != nil { - return fmt.Errorf("failed to add Openflow entries for container %s: %v", containerID, err) - } - // Update interface config with the ofPort. - ifConfig.OVSPortConfig.OFPort = ofPort - // Notify the Pod update event to required components. 
- event := types.PodUpdate{ - PodName: ifConfig.PodName, - PodNamespace: ifConfig.PodNamespace, - IsAdd: true, - ContainerID: ifConfig.ContainerID, - } - pc.podUpdateNotifier.Notify(event) return nil }) } @@ -75,7 +77,7 @@ func (pc *podConfigurator) connectInterfaceToOVS( // Because of this, we need to wait asynchronously for the interface to be created: we create the OVS port // and set the OVS Interface type "" first, and change the OVS Interface type to "internal" to connect to the // container interface after it is created. After OVS connects to the container interface, an OFPort is allocated. - klog.V(2).Infof("Adding OVS port %s for container %s", ovsPortName, containerID) + klog.V(2).InfoS("Adding OVS port for container", "port", ovsPortName, "container", containerID) ovsAttachInfo := BuildOVSPortExternalIDs(containerConfig) portUUID, err := pc.createOVSPort(ovsPortName, ovsAttachInfo, containerConfig.VLANID) if err != nil { @@ -105,7 +107,7 @@ func (pc *podConfigurator) configureInterfaces( // See: https://github.com/kubernetes/kubernetes/issues/57253#issuecomment-358897721. 
interfaceConfig, found := pc.ifaceStore.GetContainerInterface(containerID) if found { - klog.V(2).Infof("Found an existing OVS port for container %s, returning", containerID) + klog.V(2).InfoS("Found an existing OVS port for container, returning", "container", containerID) mac := interfaceConfig.MAC.String() hostIface := &current.Interface{ Name: interfaceConfig.InterfaceName, @@ -128,9 +130,64 @@ func (pc *podConfigurator) configureInterfaces( func (pc *podConfigurator) reconcileMissingPods(ifConfigs []*interfacestore.InterfaceConfig, containerAccess *containerAccessArbitrator) { for i := range ifConfigs { ifaceConfig := ifConfigs[i] - pod := k8s.NamespacedName(ifaceConfig.PodNamespace, ifaceConfig.PodName) if err := pc.connectInterfaceToOVSAsync(ifaceConfig, containerAccess); err != nil { - klog.Errorf("Failed to reconcile Pod %s: %v", pod, err) + klog.ErrorS(err, "Failed to reconcile Pod", "Pod", klog.KRef(ifaceConfig.PodNamespace, ifaceConfig.PodName)) } } } + +// initPortStatusMonitor has subscribed a channel to listen for the OpenFlow PortStatus message, and it also +// initiates the Pod recorder.
+func (pc *podConfigurator) initPortStatusMonitor(podInformer cache.SharedIndexInformer) { + pc.podLister = v1.NewPodLister(podInformer.GetIndexer()) + pc.podListerSynced = podInformer.HasSynced + pc.unreadyPortQueue = workqueue.NewTypedDelayingQueueWithConfig[string]( + workqueue.TypedDelayingQueueConfig[string]{ + Name: workerName, + }, + ) + eventBroadcaster := record.NewBroadcaster() + pc.eventBroadcaster = eventBroadcaster + pc.recorder = eventBroadcaster.NewRecorder( + scheme.Scheme, + corev1.EventSource{Component: "AntreaPodConfigurator"}, + ) + pc.statusCh = make(chan *openflow15.PortStatus, 100) + pc.ofClient.SubscribeOFPortStatusMessage(pc.statusCh) +} + +func (pc *podConfigurator) Run(stopCh <-chan struct{}) { + defer pc.unreadyPortQueue.ShutDown() + + klog.Infof("Starting %s", workerName) + defer klog.Infof("Shutting down %s", workerName) + + if !cache.WaitForNamedCacheSync("podConfigurator", stopCh, pc.podListerSynced) { + return + } + pc.eventBroadcaster.StartStructuredLogging(0) + pc.eventBroadcaster.StartRecordingToSink(&typedv1.EventSinkImpl{ + Interface: pc.kubeClient.CoreV1().Events(""), + }) + defer pc.eventBroadcaster.Shutdown() + + go wait.Until(pc.worker, time.Second, stopCh) + + for { + select { + case status := <-pc.statusCh: + klog.V(2).InfoS("Received PortStatus message", "message", status) + // Update Pod OpenFlow entries only after the OpenFlow port state is live. + pc.processPortStatusMessage(status) + case <-stopCh: + return + } + } +} + +// worker is a long-running function that will continually call the processNextWorkItem function in +// order to read and process a message on the workqueue. 
+func (pc *podConfigurator) worker() { + for pc.processNextWorkItem() { + } +} diff --git a/pkg/agent/cniserver/secondary.go b/pkg/agent/cniserver/secondary.go index a4176baba15..70e95c93ca2 100644 --- a/pkg/agent/cniserver/secondary.go +++ b/pkg/agent/cniserver/secondary.go @@ -26,7 +26,7 @@ import ( ) func NewSecondaryInterfaceConfigurator(ovsBridgeClient ovsconfig.OVSBridgeClient, interfaceStore interfacestore.InterfaceStore) (*podConfigurator, error) { - pc, err := newPodConfigurator(ovsBridgeClient, nil, nil, interfaceStore, nil, ovsconfig.OVSDatapathSystem, false, false, nil) + pc, err := newPodConfigurator(nil, ovsBridgeClient, nil, nil, interfaceStore, nil, ovsconfig.OVSDatapathSystem, false, false, nil, nil, nil) if err == nil { pc.isSecondaryNetwork = true } diff --git a/pkg/agent/cniserver/server.go b/pkg/agent/cniserver/server.go index bc77dcc7224..253c9ec2065 100644 --- a/pkg/agent/cniserver/server.go +++ b/pkg/agent/cniserver/server.go @@ -31,6 +31,7 @@ import ( "google.golang.org/grpc" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" "antrea.io/antrea/pkg/agent/cniserver/ipam" @@ -111,6 +112,7 @@ type CNIServer struct { serverVersion string nodeConfig *config.NodeConfig hostProcPathPrefix string + podInformer cache.SharedIndexInformer kubeClient clientset.Interface containerAccess *containerAccessArbitrator podConfigurator *podConfigurator @@ -628,6 +630,7 @@ func (s *CNIServer) CmdCheck(_ context.Context, request *cnipb.CniCmdRequest) ( func New( cniSocket, hostProcPathPrefix string, nodeConfig *config.NodeConfig, + podInformer cache.SharedIndexInformer, kubeClient clientset.Interface, routeClient route.Interface, isChaining, enableBridgingMode, enableSecondaryNetworkIPAM, disableTXChecksumOffload bool, @@ -639,6 +642,7 @@ func New( serverVersion: cni.AntreaCNIVersion, nodeConfig: nodeConfig, hostProcPathPrefix: hostProcPathPrefix, + podInformer: podInformer, 
kubeClient: kubeClient, containerAccess: newContainerAccessArbitrator(), routeClient: routeClient, @@ -660,9 +664,9 @@ func (s *CNIServer) Initialize( ) error { var err error s.podConfigurator, err = newPodConfigurator( - ovsBridgeClient, ofClient, s.routeClient, ifaceStore, s.nodeConfig.GatewayConfig.MAC, + s.kubeClient, ovsBridgeClient, ofClient, s.routeClient, ifaceStore, s.nodeConfig.GatewayConfig.MAC, ovsBridgeClient.GetOVSDatapathType(), ovsBridgeClient.IsHardwareOffloadEnabled(), - s.disableTXChecksumOffload, podUpdateNotifier) + s.disableTXChecksumOffload, podUpdateNotifier, s.podInformer, s.containerAccess) if err != nil { return fmt.Errorf("error during initialize podConfigurator: %v", err) } @@ -676,6 +680,8 @@ func (s *CNIServer) Run(stopCh <-chan struct{}) { klog.InfoS("Starting CNI server") defer klog.InfoS("Shutting down CNI server") + go s.podConfigurator.Run(stopCh) + listener, err := util.ListenLocalSocket(s.cniSocket) if err != nil { klog.Fatalf("Failed to bind on %s: %v", s.cniSocket, err) diff --git a/pkg/agent/cniserver/server_linux_test.go b/pkg/agent/cniserver/server_linux_test.go index 856024dd32c..1914fdba965 100644 --- a/pkg/agent/cniserver/server_linux_test.go +++ b/pkg/agent/cniserver/server_linux_test.go @@ -95,7 +95,7 @@ func TestValidatePrevResult(t *testing.T) { cniConfig.Ifname = ifname cniConfig.Netns = "invalid_netns" sriovVFDeviceID := "" - cniServer.podConfigurator, _ = newPodConfigurator(nil, nil, nil, nil, nil, "", false, false, channel.NewSubscribableChannel("PodUpdate", 100)) + cniServer.podConfigurator, _ = newPodConfigurator(nil, nil, nil, nil, nil, nil, "", false, false, channel.NewSubscribableChannel("PodUpdate", 100), nil, nil) response := cniServer.validatePrevResult(cniConfig.CniCmdArgs, prevResult, sriovVFDeviceID) checkErrorResponse(t, response, cnipb.ErrorCode_CHECK_INTERFACE_FAILURE, "") }) @@ -106,7 +106,7 @@ func TestValidatePrevResult(t *testing.T) { cniConfig.Netns = "invalid_netns" sriovVFDeviceID := 
"0000:03:00.6" prevResult.Interfaces = []*current.Interface{hostIface, containerIface} - cniServer.podConfigurator, _ = newPodConfigurator(nil, nil, nil, nil, nil, "", true, false, channel.NewSubscribableChannel("PodUpdate", 100)) + cniServer.podConfigurator, _ = newPodConfigurator(nil, nil, nil, nil, nil, nil, "", true, false, channel.NewSubscribableChannel("PodUpdate", 100), nil, nil) response := cniServer.validatePrevResult(cniConfig.CniCmdArgs, prevResult, sriovVFDeviceID) checkErrorResponse(t, response, cnipb.ErrorCode_CHECK_INTERFACE_FAILURE, "") }) @@ -119,7 +119,7 @@ func TestRemoveInterface(t *testing.T) { ifaceStore = interfacestore.NewInterfaceStore() mockRoute = routetest.NewMockInterface(controller) gwMAC, _ := net.ParseMAC("00:00:11:11:11:11") - podConfigurator, err := newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100)) + podConfigurator, err := newPodConfigurator(nil, mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100), nil, nil) require.Nil(t, err, "No error expected in podConfigurator constructor") containerMAC, _ := net.ParseMAC("aa:bb:cc:dd:ee:ff") @@ -201,7 +201,7 @@ func newMockCNIServer(t *testing.T, controller *gomock.Controller, ipamDriver ip gwMAC, _ := net.ParseMAC("00:00:11:11:11:11") gateway := &config.GatewayConfig{Name: "", IPv4: gwIPv4, MAC: gwMAC} cniServer.nodeConfig = &config.NodeConfig{Name: "node1", PodIPv4CIDR: nodePodCIDRv4, GatewayConfig: gateway} - cniServer.podConfigurator, _ = newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100)) + cniServer.podConfigurator, _ = newPodConfigurator(nil, mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100), nil, nil) 
cniServer.enableSecondaryNetworkIPAM = enableSecondaryNetworkIPAM cniServer.isChaining = isChaining cniServer.networkConfig = &config.NetworkConfig{InterfaceMTU: 1450} @@ -612,18 +612,18 @@ func TestCmdCheck(t *testing.T) { func TestReconcile(t *testing.T) { controller := gomock.NewController(t) + kubeClient := fakeclientset.NewClientset(pod1, pod2, pod3) mockOVSBridgeClient = ovsconfigtest.NewMockOVSBridgeClient(controller) mockOFClient = openflowtest.NewMockClient(controller) ifaceStore = interfacestore.NewInterfaceStore() mockRoute = routetest.NewMockInterface(controller) cniServer := newCNIServer(t) cniServer.routeClient = mockRoute - cniServer.podConfigurator, _ = newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100)) + cniServer.podConfigurator, _ = newPodConfigurator(nil, mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100), nil, nil) cniServer.podConfigurator.ifConfigurator = newTestInterfaceConfigurator() cniServer.nodeConfig = &config.NodeConfig{ Name: nodeName, } - kubeClient := fakeclientset.NewSimpleClientset(pod1, pod2, pod3) cniServer.kubeClient = kubeClient for _, containerIface := range []*interfacestore.InterfaceConfig{normalInterface, staleInterface, unconnectedInterface} { ifaceStore.AddInterface(containerIface) diff --git a/pkg/agent/cniserver/server_windows_test.go b/pkg/agent/cniserver/server_windows_test.go index 70487b083e8..e64a255256c 100644 --- a/pkg/agent/cniserver/server_windows_test.go +++ b/pkg/agent/cniserver/server_windows_test.go @@ -22,13 +22,15 @@ import ( "testing" "time" + "antrea.io/libOpenflow/openflow15" "github.com/Microsoft/hcsshim" "github.com/Microsoft/hcsshim/hcn" cnitypes "github.com/containernetworking/cni/pkg/types" current "github.com/containernetworking/cni/pkg/types/100" "github.com/stretchr/testify/assert" 
"go.uber.org/mock/gomock" - "k8s.io/apimachinery/pkg/util/wait" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" fakeclientset "k8s.io/client-go/kubernetes/fake" "antrea.io/antrea/pkg/agent/cniserver/ipam" @@ -37,12 +39,11 @@ import ( "antrea.io/antrea/pkg/agent/cniserver/types" "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/agent/interfacestore" - openflowtest "antrea.io/antrea/pkg/agent/openflow/testing" routetest "antrea.io/antrea/pkg/agent/route/testing" - agenttypes "antrea.io/antrea/pkg/agent/types" "antrea.io/antrea/pkg/agent/util" winnettest "antrea.io/antrea/pkg/agent/util/winnet/testing" cnipb "antrea.io/antrea/pkg/apis/cni/v1beta1" + "antrea.io/antrea/pkg/ovs/ovsconfig" ovsconfigtest "antrea.io/antrea/pkg/ovs/ovsconfig/testing" "antrea.io/antrea/pkg/util/channel" ) @@ -173,17 +174,15 @@ type hnsTestUtil struct { hnsEndpoint *hcsshim.HNSEndpoint hcnEndpoint *hcn.HostComputeEndpoint isDocker bool - isAttached bool hnsEndpointCreatErr error endpointAttachErr error } -func newHnsTestUtil(endpointID string, existingHnsEndpoints []hcsshim.HNSEndpoint, isDocker, isAttached bool, hnsEndpointCreatErr, endpointAttachErr error) *hnsTestUtil { +func newHnsTestUtil(endpointID string, existingHnsEndpoints []hcsshim.HNSEndpoint, isDocker bool, hnsEndpointCreatErr, endpointAttachErr error) *hnsTestUtil { return &hnsTestUtil{ endpointID: endpointID, existingHnsEndpoints: existingHnsEndpoints, isDocker: isDocker, - isAttached: isAttached, hnsEndpointCreatErr: hnsEndpointCreatErr, endpointAttachErr: endpointAttachErr, } @@ -217,9 +216,6 @@ func (t *hnsTestUtil) deleteHnsEndpoint(endpoint *hcsshim.HNSEndpoint) (*hcsshim func (t *hnsTestUtil) attachEndpointInNamespace(ep *hcn.HostComputeEndpoint, namespace string) error { t.hcnEndpoint.HostComputeNamespace = namespace - if t.endpointAttachErr == nil { - t.addHostInterface() - } return t.endpointAttachErr } @@ -257,9 +253,10 @@ func (t *hnsTestUtil) addHostInterface() { }() } -func 
newMockCNIServer(t *testing.T, controller *gomock.Controller, podUpdateNotifier *channel.SubscribableChannel) *CNIServer { +func newMockCNIServer(t *testing.T, controller *gomock.Controller, clients *mockClients, podUpdateNotifier *channel.SubscribableChannel) *CNIServer { + kubeClient := fakeclientset.NewClientset() mockOVSBridgeClient = ovsconfigtest.NewMockOVSBridgeClient(controller) - mockOFClient = openflowtest.NewMockClient(controller) + mockOFClient = clients.ofClient mockRoute = routetest.NewMockInterface(controller) mockWinnet = winnettest.NewMockInterface(controller) ifaceStore = interfacestore.NewInterfaceStore() @@ -269,7 +266,8 @@ func newMockCNIServer(t *testing.T, controller *gomock.Controller, podUpdateNoti gwMAC, _ := net.ParseMAC("00:00:11:11:11:11") gateway := &config.GatewayConfig{Name: "", IPv4: gwIPv4, MAC: gwMAC} cniServer.nodeConfig = &config.NodeConfig{Name: "node1", PodIPv4CIDR: nodePodCIDRv4, GatewayConfig: gateway} - cniServer.podConfigurator, _ = newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, podUpdateNotifier) + mockOFClient.EXPECT().SubscribeOFPortStatusMessage(gomock.Any()).AnyTimes() + cniServer.podConfigurator, _ = newPodConfigurator(kubeClient, mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, podUpdateNotifier, clients.localPodInformer, cniServer.containerAccess) cniServer.podConfigurator.ifConfigurator.(*ifConfigurator).winnet = mockWinnet return cniServer } @@ -315,7 +313,6 @@ func TestCmdAdd(t *testing.T) { hnsEndpointCreateErr error endpointAttachErr error ifaceExist bool - isAttached bool existingHnsEndpoints []hcsshim.HNSEndpoint endpointExists bool connectOVS bool @@ -341,7 +338,6 @@ func TestCmdAdd(t *testing.T) { oriIPAMResult: oriIPAMResult, connectOVS: true, containerIfaceExist: true, - isAttached: true, }, { name: "containerd-attach-failure", podName: "pod10", @@ -364,13 +360,23 @@ func TestCmdAdd(t *testing.T) { 
ipamType := "windows-test" ipamMock := ipamtest.NewMockIPAMDriver(controller) ipam.ResetIPAMDriver(ipamType, ipamMock) + stopCh := make(chan struct{}) + defer close(stopCh) isDocker := isDockerContainer(tc.netns) - testUtil := newHnsTestUtil(generateUUID(), tc.existingHnsEndpoints, isDocker, tc.isAttached, tc.hnsEndpointCreateErr, tc.endpointAttachErr) + testUtil := newHnsTestUtil(generateUUID(), tc.existingHnsEndpoints, isDocker, tc.hnsEndpointCreateErr, tc.endpointAttachErr) testUtil.setFunctions() defer testUtil.restore() - waiter := newAsyncWaiter(tc.podName, tc.infraContainerID) - server := newMockCNIServer(t, controller, waiter.notifier) + waiter := newAsyncWaiter(tc.podName, tc.infraContainerID, stopCh) + clients := newMockClients(controller, nodeName, &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: tc.podName, Namespace: testPodNamespace}, + Spec: corev1.PodSpec{NodeName: nodeName}, + }) + clients.startInformers(stopCh) + + server := newMockCNIServer(t, controller, clients, waiter.notifier) + go server.podConfigurator.Run(stopCh) + requestMsg, ovsPortName := prepareSetup(t, ipamType, tc.podName, tc.containerID, tc.infraContainerID, tc.netns, nil) if tc.endpointExists { server.podConfigurator.ifConfigurator.(*ifConfigurator).addEndpoint(getHnsEndpoint(generateUUID(), ovsPortName)) @@ -389,10 +395,29 @@ func TestCmdAdd(t *testing.T) { } ovsPortID := generateUUID() if tc.connectOVS { + ofPortNumber := uint32(100) + portStatusCh := server.podConfigurator.statusCh mockOVSBridgeClient.EXPECT().CreatePort(ovsPortName, ovsPortName, gomock.Any()).Return(ovsPortID, nil).Times(1) - mockOVSBridgeClient.EXPECT().SetInterfaceType(ovsPortName, "internal").Return(nil).Times(1) - mockOVSBridgeClient.EXPECT().GetOFPort(ovsPortName, true).Return(int32(100), nil).Times(1) - mockOFClient.EXPECT().InstallPodFlows(ovsPortName, gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) + mockOVSBridgeClient.EXPECT().SetInterfaceType(ovsPortName, 
"internal").Return(nil).Times(1).Do( + func(name, ifType string) ovsconfig.Error { + go func() { + time.Sleep(time.Millisecond * 50) + // Simulate OVS successfully connects to the vNIC, then a PortStatus message is + // supposed to receive. + portStatusCh <- &openflow15.PortStatus{ + Reason: openflow15.PR_MODIFY, + Desc: openflow15.Port{ + PortNo: ofPortNumber, + Length: 72, + Name: []byte(name), + State: openflow15.PS_LIVE, + }, + } + }() + return nil + }, + ) + mockOFClient.EXPECT().InstallPodFlows(ovsPortName, gomock.Any(), gomock.Any(), uint32(ofPortNumber), gomock.Any(), gomock.Any()).Return(nil) mockRoute.EXPECT().AddLocalAntreaFlexibleIPAMPodRule(gomock.Any()).Return(nil).Times(1) } resp, err := server.CmdAdd(ctx, requestMsg) @@ -421,19 +446,9 @@ func TestCmdAdd(t *testing.T) { _, exists := ifaceStore.GetContainerInterface(containerID) assert.Equal(t, exists, tc.containerIfaceExist) if tc.connectOVS { - waiter.wait() - // Wait for the completion of async function "setInterfaceMTUFunc", otherwise it may lead to the - // race condition failure. 
- wait.PollUntilContextTimeout(context.Background(), time.Millisecond*10, time.Second, true, - func(ctx context.Context) (done bool, err error) { - mtuSet, exist := hostIfaces.Load(ovsPortName) - if !exist { - return false, nil - } - return mtuSet.(bool), nil - }) + testUtil.addHostInterface() + assert.True(t, waiter.waitUntil(5*time.Second)) } - waiter.close() }) } } @@ -480,6 +495,8 @@ func TestCmdDel(t *testing.T) { } { t.Run(tc.name, func(t *testing.T) { controller := gomock.NewController(t) + stopCh := make(chan struct{}) + defer close(stopCh) ipamType := "windows-test" ipamMock := ipamtest.NewMockIPAMDriver(controller) ipam.ResetIPAMDriver(ipamType, ipamMock) @@ -491,11 +508,13 @@ func TestCmdDel(t *testing.T) { if tc.endpointExists { existingHnsEndpoints = append(existingHnsEndpoints, *hnsEndpoint) } - testUtil := newHnsTestUtil(hnsEndpoint.Id, existingHnsEndpoints, isDocker, true, nil, nil) + testUtil := newHnsTestUtil(hnsEndpoint.Id, existingHnsEndpoints, isDocker, nil, nil) testUtil.setFunctions() defer testUtil.restore() - waiter := newAsyncWaiter(testPodNameA, containerID) - server := newMockCNIServer(t, controller, waiter.notifier) + waiter := newAsyncWaiter(testPodNameA, containerID, stopCh) + clients := newMockClients(controller, nodeName) + clients.startInformers(stopCh) + server := newMockCNIServer(t, controller, clients, waiter.notifier) ovsPortID := generateUUID() if tc.endpointExists { server.podConfigurator.ifConfigurator.(*ifConfigurator).addEndpoint(hnsEndpoint) @@ -530,9 +549,8 @@ func TestCmdDel(t *testing.T) { assert.False(t, exists) } if tc.disconnectOVS { - waiter.wait() + assert.True(t, waiter.waitUntil(5*time.Second)) } - waiter.close() }) } } @@ -665,12 +683,16 @@ func TestCmdCheck(t *testing.T) { } { t.Run(tc.name, func(t *testing.T) { controller := gomock.NewController(t) + stopCh := make(chan struct{}) + defer close(stopCh) ipamType := "windows-test" ipamMock := ipamtest.NewMockIPAMDriver(controller) ipam.ResetIPAMDriver(ipamType, 
ipamMock) defer mockGetNetInterfaceByName(tc.netInterface)() - cniserver := newMockCNIServer(t, controller, channel.NewSubscribableChannel("podUpdate", 100)) + clients := newMockClients(controller, nodeName) + clients.startInformers(stopCh) + cniserver := newMockCNIServer(t, controller, clients, channel.NewSubscribableChannel("podUpdate", 100)) requestMsg, _ := prepareSetup(t, ipamType, tc.podName, tc.containerID, tc.containerID, tc.netns, tc.prevResult) ipamMock.EXPECT().Check(gomock.Any(), gomock.Any(), gomock.Any()).Return(true, nil).Times(1) ifaceStore.AddInterface(tc.existingIface) @@ -685,68 +707,38 @@ func TestCmdCheck(t *testing.T) { } } -type asyncWaiter struct { - podName string - containerID string - waitCh chan struct{} - stopCh chan struct{} - notifier *channel.SubscribableChannel -} - -func (w *asyncWaiter) notify(e interface{}) { - podUpdate := e.(agenttypes.PodUpdate) - if podUpdate.PodName == w.podName && podUpdate.ContainerID == w.containerID { - w.waitCh <- struct{}{} - } -} - -func (w *asyncWaiter) wait() { - <-w.waitCh -} - -func (w *asyncWaiter) close() { - close(w.waitCh) - close(w.stopCh) -} - -func newAsyncWaiter(podName, containerID string) *asyncWaiter { - waiter := &asyncWaiter{ - podName: podName, - containerID: containerID, - waitCh: make(chan struct{}), - stopCh: make(chan struct{}), - notifier: channel.NewSubscribableChannel("PodUpdate", 100), - } - waiter.notifier.Subscribe(waiter.notify) - go waiter.notifier.Run(waiter.stopCh) - return waiter -} - func TestReconcile(t *testing.T) { controller := gomock.NewController(t) + stopCh := make(chan struct{}) + defer close(stopCh) + + clients := newMockClients(controller, nodeName, pod1, pod2, pod3) + clients.startInformers(stopCh) + kubeClient := clients.kubeClient mockOVSBridgeClient = ovsconfigtest.NewMockOVSBridgeClient(controller) - mockOFClient = openflowtest.NewMockClient(controller) + mockOFClient = clients.ofClient ifaceStore = interfacestore.NewInterfaceStore() mockRoute = 
routetest.NewMockInterface(controller) defer mockHostInterfaceExists()() defer mockGetHnsNetworkByName()() missingEndpoint := getHnsEndpoint(generateUUID(), "iface4") - testUtil := newHnsTestUtil(missingEndpoint.Id, []hcsshim.HNSEndpoint{*missingEndpoint}, false, true, nil, nil) + testUtil := newHnsTestUtil(missingEndpoint.Id, []hcsshim.HNSEndpoint{*missingEndpoint}, false, nil, nil) testUtil.createHnsEndpoint(missingEndpoint) testUtil.setFunctions() defer testUtil.restore() + mockOFClient.EXPECT().SubscribeOFPortStatusMessage(gomock.Any()).AnyTimes() cniServer := newCNIServer(t) cniServer.routeClient = mockRoute - kubeClient := fakeclientset.NewSimpleClientset(pod1, pod2, pod3) cniServer.kubeClient = kubeClient for _, containerIface := range []*interfacestore.InterfaceConfig{normalInterface, staleInterface, unconnectedInterface} { ifaceStore.AddInterface(containerIface) } - waiter := newAsyncWaiter(unconnectedInterface.PodName, unconnectedInterface.ContainerID) - cniServer.podConfigurator, _ = newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, waiter.notifier) + waiter := newAsyncWaiter(unconnectedInterface.PodName, unconnectedInterface.ContainerID, stopCh) + cniServer.podConfigurator, _ = newPodConfigurator(kubeClient, mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, waiter.notifier, clients.localPodInformer, cniServer.containerAccess) cniServer.nodeConfig = &config.NodeConfig{Name: nodeName} + go cniServer.podConfigurator.Run(stopCh) // Re-install Pod1 flows podFlowsInstalled := make(chan string, 2) @@ -760,8 +752,26 @@ func TestReconcile(t *testing.T) { mockRoute.EXPECT().DeleteLocalAntreaFlexibleIPAMPodRule(gomock.Any()).Return(nil).Times(1) // Re-connect to Pod4 hostIfaces.Store(fmt.Sprintf("vEthernet (%s)", unconnectedInterface.InterfaceName), true) - mockOVSBridgeClient.EXPECT().SetInterfaceType(unconnectedInterface.InterfaceName, 
"internal").Return(nil).Times(1) - mockOVSBridgeClient.EXPECT().GetOFPort(unconnectedInterface.InterfaceName, true).Return(int32(5), nil).Times(1) + mockOVSBridgeClient.EXPECT().SetInterfaceType(unconnectedInterface.InterfaceName, "internal").Return(nil).Times(1).Do( + func(name, ifType string) ovsconfig.Error { + // Simulate OVS successfully connects to the vNIC, then a PortStatus message is + // supposed to receive. + select { + case <-time.After(time.Millisecond * 50): + portStatusCh := cniServer.podConfigurator.statusCh + portStatusCh <- &openflow15.PortStatus{ + Reason: openflow15.PR_MODIFY, + Desc: openflow15.Port{ + PortNo: uint32(5), + Length: 72, + Name: []byte(name), + State: openflow15.PS_LIVE, + }, + } + } + return nil + }, + ) mockOFClient.EXPECT().InstallPodFlows(unconnectedInterface.InterfaceName, unconnectedInterface.IPs, unconnectedInterface.MAC, uint32(5), uint16(0), nil). Do(func(interfaceName string, _ []net.IP, _ net.HardwareAddr, _ uint32, _ uint16, _ *uint32) { podFlowsInstalled <- interfaceName @@ -778,8 +788,7 @@ func TestReconcile(t *testing.T) { break } } - waiter.wait() - waiter.close() + assert.True(t, waiter.waitUntil(5*time.Second)) } func getHnsEndpoint(id, name string) *hcsshim.HNSEndpoint { diff --git a/pkg/agent/interfacestore/interface_cache.go b/pkg/agent/interfacestore/interface_cache.go index bba8cf2068a..5971ee4f008 100644 --- a/pkg/agent/interfacestore/interface_cache.go +++ b/pkg/agent/interfacestore/interface_cache.go @@ -105,6 +105,11 @@ func (c *interfaceCache) AddInterface(interfaceConfig *InterfaceConfig) { } } +// UpdateInterface updates interfaceConfig into local cache. +func (c *interfaceCache) UpdateInterface(interfaceConfig *InterfaceConfig) { + c.cache.Update(interfaceConfig) +} + // DeleteInterface deletes interface from local cache. 
func (c *interfaceCache) DeleteInterface(interfaceConfig *InterfaceConfig) { c.cache.Delete(interfaceConfig) diff --git a/pkg/agent/interfacestore/interface_cache_test.go b/pkg/agent/interfacestore/interface_cache_test.go index bfbf2cacca6..69ef7227056 100644 --- a/pkg/agent/interfacestore/interface_cache_test.go +++ b/pkg/agent/interfacestore/interface_cache_test.go @@ -294,3 +294,51 @@ func newExternalEntityInterface(name string, entityIPs []net.IP, entityName stri }, } } + +func TestUpdateStore(t *testing.T) { + store := NewInterfaceStore() + mac, _ := net.ParseMAC("aa:aa:aa:aa:aa:aa") + ifConfig := &InterfaceConfig{ + Type: ContainerInterface, + InterfaceName: "interface1", + IPs: []net.IP{net.ParseIP("1.1.1.1"), net.ParseIP("2.2.2.2")}, + MAC: mac, + VLANID: 1023, + OVSPortConfig: &OVSPortConfig{ + PortUUID: "12345678", + }, + ContainerInterfaceConfig: &ContainerInterfaceConfig{ + ContainerID: "aaaaaaa", + PodNamespace: "default", + PodName: "p1", + IFDev: "eth0", + }, + TunnelInterfaceConfig: &TunnelInterfaceConfig{ + Type: AntreaIPsecTunnel, + NodeName: "n1", + LocalIP: net.ParseIP("10.10.10.10"), + RemoteIP: net.ParseIP("20.20.20.20"), + DestinationPort: 443, + RemoteName: "n2", + PSK: "abcdefg", + Csum: true, + }, + EntityInterfaceConfig: &EntityInterfaceConfig{ + EntityName: "e1", + EntityNamespace: "ns1", + UplinkPort: &OVSPortConfig{ + PortUUID: "87654321", + OFPort: 1025, + }, + }, + } + store.AddInterface(ifConfig) + oldCfg, ok := store.GetInterfaceByName("interface1") + require.True(t, ok) + newCfg := oldCfg.DeepCopy() + newCfg.OVSPortConfig.OFPort = 1024 + store.UpdateInterface(newCfg) + ifConfig2, ok := store.GetInterfaceByName("interface1") + require.True(t, ok) + assert.Equal(t, int32(1024), ifConfig2.OFPort) +} diff --git a/pkg/agent/interfacestore/testing/mock_interfacestore.go b/pkg/agent/interfacestore/testing/mock_interfacestore.go index cf6ad7f2c2d..64347d6709b 100644 --- a/pkg/agent/interfacestore/testing/mock_interfacestore.go +++ 
b/pkg/agent/interfacestore/testing/mock_interfacestore.go @@ -278,3 +278,15 @@ func (mr *MockInterfaceStoreMockRecorder) ListInterfaces() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListInterfaces", reflect.TypeOf((*MockInterfaceStore)(nil).ListInterfaces)) } + +// UpdateInterface mocks base method. +func (m *MockInterfaceStore) UpdateInterface(interfaceConfig *interfacestore.InterfaceConfig) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "UpdateInterface", interfaceConfig) +} + +// UpdateInterface indicates an expected call of UpdateInterface. +func (mr *MockInterfaceStoreMockRecorder) UpdateInterface(interfaceConfig any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateInterface", reflect.TypeOf((*MockInterfaceStore)(nil).UpdateInterface), interfaceConfig) +} diff --git a/pkg/agent/interfacestore/types.go b/pkg/agent/interfacestore/types.go index 9d0ca2afe57..fcbbb8fc063 100644 --- a/pkg/agent/interfacestore/types.go +++ b/pkg/agent/interfacestore/types.go @@ -55,11 +55,13 @@ func (t InterfaceType) String() string { return strconv.Itoa(int(t)) } +// +k8s:deepcopy-gen=true type OVSPortConfig struct { PortUUID string OFPort int32 } +// +k8s:deepcopy-gen=true type ContainerInterfaceConfig struct { ContainerID string PodName string @@ -68,6 +70,7 @@ type ContainerInterfaceConfig struct { IFDev string } +// +k8s:deepcopy-gen=true type TunnelInterfaceConfig struct { Type ovsconfig.TunnelType // Name of the remote Node. @@ -87,6 +90,7 @@ type TunnelInterfaceConfig struct { Csum bool } +// +k8s:deepcopy-gen=true type EntityInterfaceConfig struct { EntityName string EntityNamespace string @@ -94,6 +98,7 @@ type EntityInterfaceConfig struct { UplinkPort *OVSPortConfig } +// +k8s:deepcopy-gen=true type InterfaceConfig struct { Type InterfaceType // Unique name of the interface, also used for the OVS port name. 
@@ -113,6 +118,7 @@ type InterfaceConfig struct { type InterfaceStore interface { Initialize(interfaces []*InterfaceConfig) AddInterface(interfaceConfig *InterfaceConfig) + UpdateInterface(interfaceConfig *InterfaceConfig) ListInterfaces() []*InterfaceConfig DeleteInterface(interfaceConfig *InterfaceConfig) GetInterface(interfaceKey string) (*InterfaceConfig, bool) diff --git a/pkg/agent/interfacestore/zz_generated.deepcopy.go b/pkg/agent/interfacestore/zz_generated.deepcopy.go new file mode 100644 index 00000000000..5071c0555c4 --- /dev/null +++ b/pkg/agent/interfacestore/zz_generated.deepcopy.go @@ -0,0 +1,155 @@ +//go:build !ignore_autogenerated +// +build !ignore_autogenerated + +// Copyright 2024 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Code generated by deepcopy-gen. DO NOT EDIT. + +package interfacestore + +import ( + net "net" +) + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ContainerInterfaceConfig) DeepCopyInto(out *ContainerInterfaceConfig) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ContainerInterfaceConfig. 
+func (in *ContainerInterfaceConfig) DeepCopy() *ContainerInterfaceConfig { + if in == nil { + return nil + } + out := new(ContainerInterfaceConfig) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *EntityInterfaceConfig) DeepCopyInto(out *EntityInterfaceConfig) { + *out = *in + if in.UplinkPort != nil { + in, out := &in.UplinkPort, &out.UplinkPort + *out = new(OVSPortConfig) + **out = **in + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new EntityInterfaceConfig. +func (in *EntityInterfaceConfig) DeepCopy() *EntityInterfaceConfig { + if in == nil { + return nil + } + out := new(EntityInterfaceConfig) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *InterfaceConfig) DeepCopyInto(out *InterfaceConfig) { + *out = *in + if in.IPs != nil { + in, out := &in.IPs, &out.IPs + *out = make([]net.IP, len(*in)) + for i := range *in { + if (*in)[i] != nil { + in, out := &(*in)[i], &(*out)[i] + *out = make(net.IP, len(*in)) + copy(*out, *in) + } + } + } + if in.MAC != nil { + in, out := &in.MAC, &out.MAC + *out = make(net.HardwareAddr, len(*in)) + copy(*out, *in) + } + if in.OVSPortConfig != nil { + in, out := &in.OVSPortConfig, &out.OVSPortConfig + *out = new(OVSPortConfig) + **out = **in + } + if in.ContainerInterfaceConfig != nil { + in, out := &in.ContainerInterfaceConfig, &out.ContainerInterfaceConfig + *out = new(ContainerInterfaceConfig) + **out = **in + } + if in.TunnelInterfaceConfig != nil { + in, out := &in.TunnelInterfaceConfig, &out.TunnelInterfaceConfig + *out = new(TunnelInterfaceConfig) + (*in).DeepCopyInto(*out) + } + if in.EntityInterfaceConfig != nil { + in, out := &in.EntityInterfaceConfig, &out.EntityInterfaceConfig + *out = new(EntityInterfaceConfig) + 
(*in).DeepCopyInto(*out) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new InterfaceConfig. +func (in *InterfaceConfig) DeepCopy() *InterfaceConfig { + if in == nil { + return nil + } + out := new(InterfaceConfig) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *OVSPortConfig) DeepCopyInto(out *OVSPortConfig) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OVSPortConfig. +func (in *OVSPortConfig) DeepCopy() *OVSPortConfig { + if in == nil { + return nil + } + out := new(OVSPortConfig) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TunnelInterfaceConfig) DeepCopyInto(out *TunnelInterfaceConfig) { + *out = *in + if in.LocalIP != nil { + in, out := &in.LocalIP, &out.LocalIP + *out = make(net.IP, len(*in)) + copy(*out, *in) + } + if in.RemoteIP != nil { + in, out := &in.RemoteIP, &out.RemoteIP + *out = make(net.IP, len(*in)) + copy(*out, *in) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TunnelInterfaceConfig. +func (in *TunnelInterfaceConfig) DeepCopy() *TunnelInterfaceConfig { + if in == nil { + return nil + } + out := new(TunnelInterfaceConfig) + in.DeepCopyInto(out) + return out +} diff --git a/pkg/agent/openflow/client.go b/pkg/agent/openflow/client.go index e6e77d0cebd..4cd1134a457 100644 --- a/pkg/agent/openflow/client.go +++ b/pkg/agent/openflow/client.go @@ -408,6 +408,9 @@ type Client interface { // or ip, port, protocol and direction. It is used to bypass NetworkPolicy enforcement on a VM for the particular // traffic. 
InstallPolicyBypassFlows(protocol binding.Protocol, ipNet *net.IPNet, port uint16, isIngress bool) error + + // SubscribeOFPortStatusMessage registers a channel to listen the OpenFlow PortStatus message. + SubscribeOFPortStatusMessage(statusCh chan *openflow15.PortStatus) } // GetFlowTableStatus returns an array of flow table status. @@ -1697,3 +1700,7 @@ func (c *client) getMeterStats() { klog.ErrorS(err, "Failed to get OVS meter stats") } } + +func (c *client) SubscribeOFPortStatusMessage(statusCh chan *openflow15.PortStatus) { + c.bridge.SubscribePortStatusConsumer(statusCh) +} diff --git a/pkg/agent/openflow/client_test.go b/pkg/agent/openflow/client_test.go index 658193b1220..ab5bd20dcca 100644 --- a/pkg/agent/openflow/client_test.go +++ b/pkg/agent/openflow/client_test.go @@ -2906,3 +2906,14 @@ func TestCachedFlowIsDrop(t *testing.T) { msg = flows[0].GetMessage().(*openflow15.FlowMod) assert.False(t, isDropFlow(msg)) } + +func TestSubscribeOFPortStatusMessage(t *testing.T) { + ctrl := gomock.NewController(t) + ch := make(chan *openflow15.PortStatus) + bridge := ovsoftest.NewMockBridge(ctrl) + c := client{ + bridge: bridge, + } + bridge.EXPECT().SubscribePortStatusConsumer(ch).Times(1) + c.SubscribeOFPortStatusMessage(ch) +} diff --git a/pkg/agent/openflow/testing/mock_openflow.go b/pkg/agent/openflow/testing/mock_openflow.go index bd3ee7b835a..eafaa90f344 100644 --- a/pkg/agent/openflow/testing/mock_openflow.go +++ b/pkg/agent/openflow/testing/mock_openflow.go @@ -36,6 +36,7 @@ import ( openflow0 "antrea.io/antrea/pkg/ovs/openflow" ip "antrea.io/antrea/pkg/util/ip" proxy "antrea.io/antrea/third_party/proxy" + openflow15 "antrea.io/libOpenflow/openflow15" protocol "antrea.io/libOpenflow/protocol" util "antrea.io/libOpenflow/util" ofctrl "antrea.io/ofnet/ofctrl" @@ -849,6 +850,18 @@ func (mr *MockClientMockRecorder) StartPacketInHandler(stopCh any) *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StartPacketInHandler", 
reflect.TypeOf((*MockClient)(nil).StartPacketInHandler), stopCh) } +// SubscribeOFPortStatusMessage mocks base method. +func (m *MockClient) SubscribeOFPortStatusMessage(statusCh chan *openflow15.PortStatus) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SubscribeOFPortStatusMessage", statusCh) +} + +// SubscribeOFPortStatusMessage indicates an expected call of SubscribeOFPortStatusMessage. +func (mr *MockClientMockRecorder) SubscribeOFPortStatusMessage(statusCh any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscribeOFPortStatusMessage", reflect.TypeOf((*MockClient)(nil).SubscribeOFPortStatusMessage), statusCh) +} + // SubscribePacketIn mocks base method. func (m *MockClient) SubscribePacketIn(reason uint8, pktInQueue *openflow0.PacketInQueue) error { m.ctrl.T.Helper() diff --git a/pkg/ovs/openflow/interfaces.go b/pkg/ovs/openflow/interfaces.go index 1c5c60fd22d..edf8ced802b 100644 --- a/pkg/ovs/openflow/interfaces.go +++ b/pkg/ovs/openflow/interfaces.go @@ -146,6 +146,9 @@ type Bridge interface { ResumePacket(packetIn *ofctrl.PacketIn) error // BuildPacketOut returns a new PacketOutBuilder. BuildPacketOut() PacketOutBuilder + // SubscribePortStatusConsumer registers a consumer to listen to OpenFlow PortStatus message. + // We only support a single consumer for now. + SubscribePortStatusConsumer(statusCh chan *openflow15.PortStatus) } // TableStatus represents the status of a specific flow table. The status is useful for debugging. diff --git a/pkg/ovs/openflow/ofctrl_bridge.go b/pkg/ovs/openflow/ofctrl_bridge.go index c0f6ede4daa..da46f7e5ee8 100644 --- a/pkg/ovs/openflow/ofctrl_bridge.go +++ b/pkg/ovs/openflow/ofctrl_bridge.go @@ -202,9 +202,14 @@ type OFBridge struct { // pktConsumers is a map from PacketIn category to the channel that is used to publish the PacketIn message. 
pktConsumers sync.Map + // portStatusConsumerCh is a channel to notify agent a PortStatus message is received + portStatusConsumerCh chan *openflow15.PortStatus + // portStatusMutex is to guard the access on portStatusConsumerCh. + portStatusMutex sync.RWMutex + mpReplyChsMutex sync.RWMutex mpReplyChs map[uint32]chan *openflow15.MultipartReply - // tunMetadataLengthMap is used to store the tlv-map settings on the OVS bridge. Key is the index of tunnel metedata, + // tunMetadataLengthMap is used to store the tlv-map settings on the OVS bridge. Key is the index of tunnel metadata, // and value is the length configured in this tunnel metadata. tunMetadataLengthMap map[uint16]uint8 } @@ -718,6 +723,31 @@ func (b *OFBridge) RetryInterval() time.Duration { return b.retryInterval } +func (b *OFBridge) PortStatusRcvd(status *openflow15.PortStatus) { + b.portStatusMutex.RLock() + defer b.portStatusMutex.RUnlock() + if b.portStatusConsumerCh == nil { + return + } + // Correspond to MessageStream.outbound log level. + if klog.V(7).Enabled() { + klog.InfoS("Received PortStatus", "portStatus", status) + } else { + klog.V(4).InfoS("Received PortStatus") + } + switch status.Reason { + // We only process add/modified status for now. 
+ case openflow15.PR_ADD, openflow15.PR_MODIFY: + b.portStatusConsumerCh <- status + } +} + +func (b *OFBridge) SubscribePortStatusConsumer(statusCh chan *openflow15.PortStatus) { + b.portStatusMutex.Lock() + defer b.portStatusMutex.Unlock() + b.portStatusConsumerCh = statusCh +} + func (b *OFBridge) setPacketInFormatTo2() { b.ofSwitch.SetPacketInFormat(openflow15.OFPUTIL_PACKET_IN_NXT2) } diff --git a/pkg/ovs/openflow/testing/mock_openflow.go b/pkg/ovs/openflow/testing/mock_openflow.go index af5033b0b9a..6cc0ab612da 100644 --- a/pkg/ovs/openflow/testing/mock_openflow.go +++ b/pkg/ovs/openflow/testing/mock_openflow.go @@ -342,6 +342,18 @@ func (mr *MockBridgeMockRecorder) SubscribePacketIn(category, pktInQueue any) *g return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscribePacketIn", reflect.TypeOf((*MockBridge)(nil).SubscribePacketIn), category, pktInQueue) } +// SubscribePortStatusConsumer mocks base method. +func (m *MockBridge) SubscribePortStatusConsumer(statusCh chan *openflow15.PortStatus) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SubscribePortStatusConsumer", statusCh) +} + +// SubscribePortStatusConsumer indicates an expected call of SubscribePortStatusConsumer. +func (mr *MockBridgeMockRecorder) SubscribePortStatusConsumer(statusCh any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscribePortStatusConsumer", reflect.TypeOf((*MockBridge)(nil).SubscribePortStatusConsumer), statusCh) +} + // MockTable is a mock of Table interface. 
type MockTable struct { ctrl *gomock.Controller diff --git a/test/integration/agent/cniserver_test.go b/test/integration/agent/cniserver_test.go index 3ce87ad6b10..046e6640ec0 100644 --- a/test/integration/agent/cniserver_test.go +++ b/test/integration/agent/cniserver_test.go @@ -575,6 +575,7 @@ func newTester() *cmdAddDelTester { tester.server = cniserver.New(testSock, "", getTestNodeConfig(false), + nil, k8sFake.NewSimpleClientset(), routeMock, false, false, false, false, &config.NetworkConfig{InterfaceMTU: 1450}, @@ -743,6 +744,7 @@ func setupChainTest( server = cniserver.New(testSock, "", testNodeConfig, + nil, k8sFake.NewSimpleClientset(), routeMock, true, false, false, false, &config.NetworkConfig{InterfaceMTU: 1450}, @@ -933,6 +935,7 @@ func TestCNIServerGCForHostLocalIPAM(t *testing.T) { testSock, "", testNodeConfig, + nil, k8sClient, routeMock, false, false, false, false, &config.NetworkConfig{InterfaceMTU: 1450},