diff --git a/go.mod b/go.mod index 9605140921d..a6b195d652d 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,8 @@ module antrea.io/antrea go 1.23.0 require ( - antrea.io/libOpenflow v0.14.0 - antrea.io/ofnet v0.12.0 + antrea.io/libOpenflow v0.15.0 + antrea.io/ofnet v0.14.0 github.com/ClickHouse/clickhouse-go/v2 v2.6.1 github.com/DATA-DOG/go-sqlmock v1.5.2 github.com/Mellanox/sriovnet v1.1.0 @@ -113,10 +113,9 @@ require ( github.com/aws/smithy-go v1.12.1 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/blang/semver/v4 v4.0.0 // indirect - github.com/cenk/hub v1.0.1 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect - github.com/cenkalti/hub v1.0.1 // indirect - github.com/cenkalti/rpc2 v0.0.0-20180727162946-9642ea02d0aa // indirect + github.com/cenkalti/hub v1.0.2 // indirect + github.com/cenkalti/rpc2 v1.0.3 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/chai2010/gettext-go v1.0.2 // indirect github.com/containerd/cgroups v1.1.0 // indirect diff --git a/go.sum b/go.sum index ba12ed44a0a..52d5f8aacce 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,7 @@ -antrea.io/libOpenflow v0.14.0 h1:6MS1E52nGQyz/AJ8j3OrotgAc/1ubef5vc5i8ytIxFE= -antrea.io/libOpenflow v0.14.0/go.mod h1:U8YR0ithWrjwLdUUhyeCsYnlKg/mEFjG5CbPNt1V+j0= -antrea.io/ofnet v0.12.0 h1:pgygAsEZJUPK/kGmeuIesDh5hoGLpYeavSLdG/Dp8Ao= -antrea.io/ofnet v0.12.0/go.mod h1:MB3qaInEimj+T8qtpBcIQK+EqeNiY1S/WbUdGk+TzNg= +antrea.io/libOpenflow v0.15.0 h1:wGk+IVCf8piGZgC4+lbf4qfGrJG5ikzfq5Y1T5LzqmI= +antrea.io/libOpenflow v0.15.0/go.mod h1:Mq1JEjYrb6eTVA7qjZRj9plVTKcsLM8wnQ87sLLYuiQ= +antrea.io/ofnet v0.14.0 h1:BGOqg5DdRkvxpBqyoEgWmvGd4EvpacdU/Py1s6qOvSc= +antrea.io/ofnet v0.14.0/go.mod h1:W5JPYFFcRM7tLwsItgmsKqIhtW/QofyIeNsUIecFaBo= cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU= @@ -114,14 +114,12 @@ github.com/blang/semver v3.5.1+incompatible h1:cQNTCjp13qL8KC3Nbxr/y2Bqb63oX6wdn github.com/blang/semver v3.5.1+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk= github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM= github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ= -github.com/cenk/hub v1.0.1 h1:RBwXNOF4a8KjD8BJ08XqN8KbrqaGiQLDrgvUGJSHuPA= -github.com/cenk/hub v1.0.1/go.mod h1:rJM1LNAW0ppT8FMMuPK6c2NP/R2nH/UthtuRySSaf6Y= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= -github.com/cenkalti/hub v1.0.1 h1:UMtjc6dHSaOQTO15SVA50MBIR9zQwvsukQupDrkIRtg= -github.com/cenkalti/hub v1.0.1/go.mod h1:tcYwtS3a2d9NO/0xDXVJWx3IedurUjYCqFCmpi0lpHs= -github.com/cenkalti/rpc2 v0.0.0-20180727162946-9642ea02d0aa h1:t+iWhuJE2aropY4uxKMVbyP+IJ29o422f7YAd73aTjg= -github.com/cenkalti/rpc2 v0.0.0-20180727162946-9642ea02d0aa/go.mod h1:v2npkhrXyk5BCnkNIiPdRI23Uq6uWPUQGL2hnRcRr/M= +github.com/cenkalti/hub v1.0.2 h1:Nqv9TNaA9boeO2wQFW8o87BY3zKthtnzXmWGmJqhAV8= +github.com/cenkalti/hub v1.0.2/go.mod h1:8LAFAZcCasb83vfxatMUnZHRoQcffho2ELpHb+kaTJU= +github.com/cenkalti/rpc2 v1.0.3 h1:OkMsNP/sP9seN1VRCLqhX1xkVGHPoLwWS6fZR14Ji/k= +github.com/cenkalti/rpc2 v1.0.3/go.mod h1:2yfU5b86vOr16+iY1jN3MvT6Kxc9Nf8j5iZWwUf7iaw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= diff --git a/pkg/agent/cniserver/pod_configuration.go b/pkg/agent/cniserver/pod_configuration.go index e44e739fd61..9715f95d491 100644 --- a/pkg/agent/cniserver/pod_configuration.go +++ b/pkg/agent/cniserver/pod_configuration.go @@ -25,6 +25,7 @@ import ( "github.com/containernetworking/cni/pkg/version" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/sets" + clientset "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" "antrea.io/antrea/pkg/agent/cniserver/ipam" @@ -76,9 +77,11 @@ type podConfigurator struct { // isSecondaryNetwork is true if this instance of podConfigurator is used to configure // Pod secondary network interfaces. isSecondaryNetwork bool + podIfMonitor *podIfaceMonitor } func newPodConfigurator( + kubeClient clientset.Interface, ovsBridgeClient ovsconfig.OVSBridgeClient, ofClient openflow.Client, routeClient route.Interface, @@ -93,6 +96,7 @@ func newPodConfigurator( if err != nil { return nil, err } + ifMonitor := newPodInterfaceMonitor(kubeClient, ofClient, ifaceStore, podUpdateNotifier) return &podConfigurator{ ovsBridgeClient: ovsBridgeClient, ofClient: ofClient, @@ -101,6 +105,7 @@ func newPodConfigurator( gatewayMAC: gatewayMAC, ifConfigurator: ifConfigurator, podUpdateNotifier: podUpdateNotifier, + podIfMonitor: ifMonitor, }, nil } diff --git a/pkg/agent/cniserver/pod_configuration_linux.go b/pkg/agent/cniserver/pod_configuration_linux.go index fe281f06330..b12267fb18f 100644 --- a/pkg/agent/cniserver/pod_configuration_linux.go +++ b/pkg/agent/cniserver/pod_configuration_linux.go @@ -21,11 +21,14 @@ import ( "fmt" current "github.com/containernetworking/cni/pkg/types/100" + clientset "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" "antrea.io/antrea/pkg/agent/cniserver/ipam" "antrea.io/antrea/pkg/agent/interfacestore" + "antrea.io/antrea/pkg/agent/openflow" agenttypes "antrea.io/antrea/pkg/agent/types" + "antrea.io/antrea/pkg/util/channel" ) // connectInterfaceToOVS connects an existing interface to the OVS bridge. @@ -113,3 +116,16 @@ func (pc *podConfigurator) reconcileMissingPods(ifConfigs []*interfacestore.Inte klog.Warningf("Interface for Pod %s/%s not found in the interface store", ifaceConfig.PodNamespace, ifaceConfig.PodName) } } + +type podIfaceMonitor struct { +} + +func newPodInterfaceMonitor(_ clientset.Interface, + _ openflow.Client, + _ interfacestore.InterfaceStore, + _ channel.Notifier, +) *podIfaceMonitor { + return &podIfaceMonitor{} +} + +func (pc *podIfaceMonitor) monitorUnReadyInterface(stopCh <-chan struct{}) {} diff --git a/pkg/agent/cniserver/pod_configuration_linux_test.go b/pkg/agent/cniserver/pod_configuration_linux_test.go index 4f51bd4892b..87c2f71b61f 100644 --- a/pkg/agent/cniserver/pod_configuration_linux_test.go +++ b/pkg/agent/cniserver/pod_configuration_linux_test.go @@ -24,6 +24,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" + fakeclientset "k8s.io/client-go/kubernetes/fake" "antrea.io/antrea/pkg/agent/cniserver/ipam" ipamtest "antrea.io/antrea/pkg/agent/cniserver/ipam/testing" @@ -682,12 +683,13 @@ func TestDeleteVLANSecondaryInterface(t *testing.T) { } func createPodConfigurator(controller *gomock.Controller, testIfaceConfigurator *fakeInterfaceConfigurator) *podConfigurator { + kubeClient := fakeclientset.NewSimpleClientset() gwMAC, _ := net.ParseMAC("00:00:11:11:11:11") mockOVSBridgeClient = ovsconfigtest.NewMockOVSBridgeClient(controller) mockOFClient = openflowtest.NewMockClient(controller) ifaceStore = interfacestore.NewInterfaceStore() mockRoute = routetest.NewMockInterface(controller) - configurator, _ := newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100)) + configurator, _ := newPodConfigurator(kubeClient, mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100)) configurator.ifConfigurator = testIfaceConfigurator return configurator } diff --git a/pkg/agent/cniserver/pod_configuration_windows.go b/pkg/agent/cniserver/pod_configuration_windows.go index 44734e4f20e..ef9ed3ada99 100644 --- a/pkg/agent/cniserver/pod_configuration_windows.go +++ b/pkg/agent/cniserver/pod_configuration_windows.go @@ -18,45 +18,46 @@ package cniserver import ( + "bytes" + "context" + "encoding/json" "fmt" + "sync" + "time" + "antrea.io/libOpenflow/openflow15" current "github.com/containernetworking/cni/pkg/types/100" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + apitypes "k8s.io/apimachinery/pkg/types" + clientset "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" "antrea.io/antrea/pkg/agent/cniserver/ipam" "antrea.io/antrea/pkg/agent/interfacestore" + "antrea.io/antrea/pkg/agent/openflow" "antrea.io/antrea/pkg/agent/types" + "antrea.io/antrea/pkg/util/channel" "antrea.io/antrea/pkg/util/k8s" ) +const ( + podNotReadyTimeInSeconds = 30 +) + // connectInterfaceToOVSAsync waits for an interface to be created and connects it to OVS br-int asynchronously // in another goroutine. The function is for containerd runtime. The host interface is created after // CNI call completes. func (pc *podConfigurator) connectInterfaceToOVSAsync(ifConfig *interfacestore.InterfaceConfig, containerAccess *containerAccessArbitrator) error { ovsPortName := ifConfig.InterfaceName + // Add the OVS port into unReadyPorts. This operation is performed before we update OVSDB, otherwise we + // need to think about the race condition between the current goroutine with the listener. + // Note, we may add OVS port into "unReadyOVSPorts" map even if the update OVSDB operation is failed, + // because it is also a case that the Pod's networking is not ready. + pc.podIfMonitor.addUnReadyPodInterface(ifConfig) return pc.ifConfigurator.addPostInterfaceCreateHook(ifConfig.ContainerID, ovsPortName, containerAccess, func() error { if err := pc.ovsBridgeClient.SetInterfaceType(ovsPortName, "internal"); err != nil { return err } - ofPort, err := pc.ovsBridgeClient.GetOFPort(ovsPortName, true) - if err != nil { - return err - } - containerID := ifConfig.ContainerID - klog.V(2).Infof("Setting up Openflow entries for container %s", containerID) - if err := pc.ofClient.InstallPodFlows(ovsPortName, ifConfig.IPs, ifConfig.MAC, uint32(ofPort), ifConfig.VLANID, nil); err != nil { - return fmt.Errorf("failed to add Openflow entries for container %s: %v", containerID, err) - } - // Update interface config with the ofPort. - ifConfig.OVSPortConfig.OFPort = ofPort - // Notify the Pod update event to required components. - event := types.PodUpdate{ - PodName: ifConfig.PodName, - PodNamespace: ifConfig.PodNamespace, - IsAdd: true, - ContainerID: ifConfig.ContainerID, - } - pc.podUpdateNotifier.Notify(event) return nil }) } @@ -134,3 +135,169 @@ func (pc *podConfigurator) reconcileMissingPods(ifConfigs []*interfacestore.Inte } } } + +type unReadyPodInfo struct { + podName string + podNamespace string + annotated bool + createTime time.Time +} + +type podIfaceMonitor struct { + kubeClient clientset.Interface + ifaceStore interfacestore.InterfaceStore + ofClient openflow.Client + podUpdateNotifier channel.Notifier + + // unReadyInterfaces is a map to store the OVS ports which is waiting for the PortStatus from OpenFlow switch. + // The key in the map is the OVS port name, and its value is unReadyPodInfo. + // It is used only on Windows now. + unReadyInterfaces sync.Map + statusCh chan *openflow15.PortStatus +} + +func newPodInterfaceMonitor(kubeClient clientset.Interface, + ofClient openflow.Client, + ifaceStore interfacestore.InterfaceStore, + podUpdateNotifier channel.Notifier, +) *podIfaceMonitor { + statusCh := make(chan *openflow15.PortStatus) + ofClient.SubscribeOFPortStatusMessage(statusCh) + return &podIfaceMonitor{ + kubeClient: kubeClient, + ofClient: ofClient, + ifaceStore: ifaceStore, + podUpdateNotifier: podUpdateNotifier, + unReadyInterfaces: sync.Map{}, + statusCh: statusCh, + } +} + +func (m *podIfaceMonitor) monitorUnReadyInterface(stopCh <-chan struct{}) { + klog.Info("Started the monitor to wait for new OpenFlow ports") + go func() { + for { + select { + case <-stopCh: + return + case status := <-m.statusCh: + klog.V(2).InfoS("Received PortStatus message", "message", status) + // Update Pod OpenFlow entries only after the OpenFlow port state is live. + if status.Desc.State == openflow15.PS_LIVE { + m.updateUnReadyPod(status) + } + case <-time.Tick(time.Second * 5): + m.checkUnReadyPods() + } + } + }() +} + +func (m *podIfaceMonitor) updatePodFlows(ifName string, ofPort int32) error { + ifConfig, found := m.ifaceStore.GetInterfaceByName(ifName) + if !found { + klog.Info("Interface config is not found", "name", ifName) + return nil + } + containerID := ifConfig.ContainerID + + // Update interface config with the ofPort. + ifConfig.OVSPortConfig.OFPort = ofPort + m.ifaceStore.UpdateInterface(ifConfig) + + // Install OpenFlow entries for the Pod. + klog.V(2).Infof("Setting up Openflow entries for container %s", containerID) + if err := m.ofClient.InstallPodFlows(ifName, ifConfig.IPs, ifConfig.MAC, uint32(ofPort), ifConfig.VLANID, nil); err != nil { + return fmt.Errorf("failed to add Openflow entries for container %s: %v", containerID, err) + } + + // Notify the Pod update event to required components. + event := types.PodUpdate{ + PodName: ifConfig.PodName, + PodNamespace: ifConfig.PodNamespace, + IsAdd: true, + ContainerID: ifConfig.ContainerID, + } + m.podUpdateNotifier.Notify(event) + + // Remove the annotation from Pod if exists. + m.updatePodUnreadyAnnotation(ifConfig.PodNamespace, ifConfig.PodName, false) + return nil +} + +func (m *podIfaceMonitor) updatePodUnreadyAnnotation(podNamespace, podName string, addAnnotation bool) { + pod, err := m.kubeClient.CoreV1().Pods(podNamespace).Get(context.Background(), podName, metav1.GetOptions{}) + if err != nil { + klog.ErrorS(err, "Failed to get Pod when trying to update 'unready' annotations", "Namespace", podNamespace, "Name", podName) + return + } + + annotated := false + if pod.Annotations != nil { + _, annotated = pod.Annotations[types.PodNotReadyAnnotationKey] + } + + if addAnnotation && !annotated { + // Add the annotation on Pod with '"pod.antrea.io/not-ready": ""' + patch, _ := json.Marshal(map[string]interface{}{ + "metadata": map[string]interface{}{ + "annotations": map[string]interface{}{types.PodNotReadyAnnotationKey: ""}, + }, + }) + m.kubeClient.CoreV1().Pods(podNamespace).Patch(context.Background(), podName, apitypes.MergePatchType, patch, metav1.PatchOptions{}) + } else if !addAnnotation && annotated { + // Remove the annotation on Pod with '"pod.antrea.io/not-ready": ""' + patch, _ := json.Marshal(map[string]interface{}{ + "metadata": map[string]interface{}{ + "annotations": map[string]interface{}{types.PodNotReadyAnnotationKey: nil}, + }, + }) + m.kubeClient.CoreV1().Pods(podNamespace).Patch(context.Background(), podName, apitypes.MergePatchType, patch, metav1.PatchOptions{}) + } +} + +func (m *podIfaceMonitor) updateUnReadyPod(status *openflow15.PortStatus) { + ovsPort := string(bytes.Trim(status.Desc.Name, "\x00")) + obj, found := m.unReadyInterfaces.Load(ovsPort) + if !found { + klog.InfoS("OVS port is not found", "ovsPort", ovsPort) + return + } + podInfo := obj.(*unReadyPodInfo) + ofPort := status.Desc.PortNo + if err := m.updatePodFlows(ovsPort, int32(ofPort)); err != nil { + klog.ErrorS(err, "Failed to update Pod's OpenFlow entries", "PodName", podInfo.podName, "PodNamespace", podInfo.podNamespace, "OVSPort", ovsPort) + return + } + // Delete the Pod from unReadyPods + m.unReadyInterfaces.Delete(ovsPort) +} + +func (m *podIfaceMonitor) checkUnReadyPods() { + m.unReadyInterfaces.Range(func(key, value any) bool { + podInfo := value.(*unReadyPodInfo) + if !podInfo.annotated && time.Now().Sub(podInfo.createTime).Seconds() > podNotReadyTimeInSeconds { + m.updatePodUnreadyAnnotation(podInfo.podNamespace, podInfo.podName, true) + podInfo.annotated = true + m.unReadyInterfaces.Store(key, podInfo) + } + return true + }) +} + +func (m *podIfaceMonitor) addUnReadyPodInterface(ifConfig *interfacestore.InterfaceConfig) { + klog.InfoS("Added OVS port into unready interfaces", "ovsPort", ifConfig.InterfaceName, + "podName", ifConfig.PodName, "podNamespace", ifConfig.PodNamespace) + m.unReadyInterfaces.Store(ifConfig.InterfaceName, &unReadyPodInfo{ + podName: ifConfig.PodName, + podNamespace: ifConfig.PodNamespace, + annotated: false, + createTime: time.Now(), + }) +} + +// getPortStatusCh returns the channel used to receive OpenFlow.PortStatus message. +// This function is added for test. +func (m *podIfaceMonitor) getPortStatusCh() chan *openflow15.PortStatus { + return m.statusCh +} diff --git a/pkg/agent/cniserver/pod_configuration_windows_test.go b/pkg/agent/cniserver/pod_configuration_windows_test.go new file mode 100644 index 00000000000..9e0e14e96b5 --- /dev/null +++ b/pkg/agent/cniserver/pod_configuration_windows_test.go @@ -0,0 +1,333 @@ +//go:build windows +// +build windows + +// Copyright 2024 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cniserver + +import ( + "context" + "fmt" + "net" + "testing" + "time" + + "antrea.io/libOpenflow/openflow15" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + fakeclientset "k8s.io/client-go/kubernetes/fake" + + "antrea.io/antrea/pkg/agent/interfacestore" + openflowtest "antrea.io/antrea/pkg/agent/openflow/testing" + "antrea.io/antrea/pkg/agent/types" +) + +var ( + fakeOFClient *openflowtest.MockClient + + podIPs = []net.IP{net.ParseIP("192.168.9.10")} + podMac, _ = net.ParseMAC("00:15:5D:B2:6F:38") + podIfName = "test" + podName = "iis-7b544f899f-kqdh6" + podNamespace = "default" + podInfraContainerID = "261a1970-5b6c-11ed-8caf-000c294e5d03" + podIfaceConfig = &interfacestore.InterfaceConfig{ + InterfaceName: podIfName, + ContainerInterfaceConfig: &interfacestore.ContainerInterfaceConfig{ + PodNamespace: podNamespace, + PodName: podName, + ContainerID: podInfraContainerID, + }, + OVSPortConfig: &interfacestore.OVSPortConfig{ + PortUUID: "test-port-uuid", + }, + IPs: podIPs, + MAC: podMac, + } +) + +func TestUpdateUnReadyPod(t *testing.T) { + portStatusMsg := &openflow15.PortStatus{ + Reason: openflow15.PR_MODIFY, + Desc: openflow15.Port{ + PortNo: 1, + Length: 72, + Name: []byte("test"), + State: openflow15.PS_LIVE, + }, + } + + for _, tc := range []struct { + name string + podIfaceUnReady bool + podIfaceIsCached bool + installOpenFlow bool + installOpenFlowErr error + ifConfigUpdated bool + }{ + { + name: "updated Port is not in unready state", + podIfaceUnReady: false, + podIfaceIsCached: false, + installOpenFlow: false, + }, { + name: "updated Port is not cached", + podIfaceUnReady: true, + podIfaceIsCached: false, + installOpenFlow: false, + }, { + name: "failed to install OpenFlow entries for updated Port", + podIfaceUnReady: true, + podIfaceIsCached: true, + ifConfigUpdated: true, + installOpenFlow: true, + installOpenFlowErr: fmt.Errorf("failure to install flow"), + }, { + name: "succeeded", + podIfaceUnReady: true, + podIfaceIsCached: true, + ifConfigUpdated: true, + installOpenFlow: true, + installOpenFlowErr: nil, + }, + } { + t.Run(tc.name, func(t *testing.T) { + controller := gomock.NewController(t) + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: podName, + Namespace: podNamespace, + Annotations: map[string]string{ + types.PodNotReadyAnnotationKey: "", + }, + }, + } + fakeKubeClient := fakeclientset.NewClientset(pod) + fakeOFClient = openflowtest.NewMockClient(controller) + fakeOFClient.EXPECT().SubscribeOFPortStatusMessage(gomock.Any()).Times(1) + fakeIfaceStore := interfacestore.NewInterfaceStore() + waiter := newAsyncWaiter(podName, podInfraContainerID) + monitor := newPodInterfaceMonitor(fakeKubeClient, fakeOFClient, fakeIfaceStore, waiter.notifier) + updated := false + if tc.podIfaceIsCached { + fakeIfaceStore.AddInterface(podIfaceConfig) + } + if tc.podIfaceUnReady { + monitor.addUnReadyPodInterface(podIfaceConfig) + } + if tc.installOpenFlow { + fakeOFClient.EXPECT().InstallPodFlows(podIfName, podIPs, podMac, portStatusMsg.Desc.PortNo, uint16(0), nil).Times(1).Return(tc.installOpenFlowErr) + if tc.installOpenFlowErr == nil { + updated = true + } + } + + monitor.updateUnReadyPod(portStatusMsg) + + if tc.ifConfigUpdated { + actCfg, found := fakeIfaceStore.GetContainerInterface(podIfaceConfig.ContainerID) + assert.True(t, found) + assert.Equal(t, int32(portStatusMsg.Desc.PortNo), actCfg.OVSPortConfig.OFPort) + } + if updated { + waiter.wait() + annotated, err := checkAnnotation(fakeKubeClient, podNamespace, podName) + require.NoError(t, err) + require.False(t, annotated) + _, found := monitor.unReadyInterfaces.Load(podIfName) + require.False(t, found) + } + }) + } +} + +func TestCheckUnReadyPods(t *testing.T) { + for _, tc := range []struct { + name string + existingPod *corev1.Pod + podInfo *unReadyPodInfo + annotated bool + }{ + { + name: "unready Pod is already annotated", + existingPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: podName, + Namespace: podNamespace, + Annotations: map[string]string{ + types.PodNotReadyAnnotationKey: "", + }, + }, + }, + podInfo: &unReadyPodInfo{ + podName: podName, + podNamespace: podNamespace, + annotated: true, + }, + annotated: true, + }, { + name: "unready Pod is not annotated and sync time is not up", + existingPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: podName, + Namespace: podNamespace, + }, + }, + podInfo: &unReadyPodInfo{ + podName: podName, + podNamespace: podNamespace, + annotated: false, + createTime: time.Now(), + }, + annotated: false, + }, { + name: "annotate unready Pod", + existingPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: podName, + Namespace: podNamespace, + }, + }, + podInfo: &unReadyPodInfo{ + podName: podName, + podNamespace: podNamespace, + annotated: false, + createTime: time.Now().Add((-40) * time.Second), + }, + annotated: true, + }, + } { + t.Run(tc.name, func(t *testing.T) { + controller := gomock.NewController(t) + fakeKubeClient := fakeclientset.NewClientset(tc.existingPod) + fakeOFClient = openflowtest.NewMockClient(controller) + fakeOFClient.EXPECT().SubscribeOFPortStatusMessage(gomock.Any()).Times(1) + fakeIfaceStore := interfacestore.NewInterfaceStore() + monitor := newPodInterfaceMonitor(fakeKubeClient, fakeOFClient, fakeIfaceStore, nil) + monitor.unReadyInterfaces.Store(podIfName, tc.podInfo) + + monitor.checkUnReadyPods() + + obj, found := monitor.unReadyInterfaces.Load(podIfName) + require.True(t, found) + newPodInfo, _ := obj.(*unReadyPodInfo) + assert.Equal(t, tc.annotated, newPodInfo.annotated) + + annotated, err := checkAnnotation(fakeKubeClient, tc.existingPod.Namespace, tc.existingPod.Name) + require.NoError(t, err) + assert.Equal(t, tc.annotated, annotated) + }) + } +} + +func TestUpdatePodUnreadyAnnotation(t *testing.T) { + for _, tc := range []struct { + name string + existingPod *corev1.Pod + addAnnotation bool + annotated bool + }{ + { + name: "Pod is already annotated", + existingPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: podName, + Namespace: podNamespace, + Annotations: map[string]string{ + types.PodNotReadyAnnotationKey: "", + }, + }, + }, + addAnnotation: true, + annotated: true, + }, { + name: "Pod needs to add annotation", + existingPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: podName, + Namespace: podNamespace, + Annotations: map[string]string{ + "unrelated": "", + }, + }, + }, + addAnnotation: true, + annotated: true, + }, { + name: "Pod needs to add annotation, annotations field doesn't exist", + existingPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: podName, + Namespace: podNamespace, + Annotations: map[string]string{ + "unrelated": "", + }, + }, + }, + addAnnotation: true, + annotated: true, + }, { + name: "Pod has removed annotation", + existingPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: podName, + Namespace: podNamespace, + }, + }, + addAnnotation: false, + annotated: false, + }, { + name: "Pod needs to remove annotation", + existingPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: podName, + Namespace: podNamespace, + Annotations: map[string]string{ + types.PodNotReadyAnnotationKey: "", + }, + }, + }, + addAnnotation: false, + annotated: false, + }, + } { + controller := gomock.NewController(t) + fakeKubeClient := fakeclientset.NewClientset(tc.existingPod) + fakeOFClient = openflowtest.NewMockClient(controller) + fakeOFClient.EXPECT().SubscribeOFPortStatusMessage(gomock.Any()).Times(1) + fakeIfaceStore := interfacestore.NewInterfaceStore() + monitor := newPodInterfaceMonitor(fakeKubeClient, fakeOFClient, fakeIfaceStore, nil) + monitor.updatePodUnreadyAnnotation(tc.existingPod.Namespace, tc.existingPod.Name, tc.addAnnotation) + + annotated, err := checkAnnotation(fakeKubeClient, tc.existingPod.Namespace, tc.existingPod.Name) + require.NoError(t, err) + assert.Equal(t, tc.annotated, annotated) + } +} + +func checkAnnotation(kubeClient *fakeclientset.Clientset, namespace, name string) (bool, error) { + updatedPod, err := kubeClient.CoreV1().Pods(namespace).Get(context.Background(), name, metav1.GetOptions{}) + if err != nil { + return false, err + } + if len(updatedPod.Annotations) == 0 { + return false, nil + } + _, annotated := updatedPod.Annotations[types.PodNotReadyAnnotationKey] + return annotated, nil +} diff --git a/pkg/agent/cniserver/secondary.go b/pkg/agent/cniserver/secondary.go index a4176baba15..807c6bc9462 100644 --- a/pkg/agent/cniserver/secondary.go +++ b/pkg/agent/cniserver/secondary.go @@ -26,7 +26,7 @@ import ( ) func NewSecondaryInterfaceConfigurator(ovsBridgeClient ovsconfig.OVSBridgeClient, interfaceStore interfacestore.InterfaceStore) (*podConfigurator, error) { - pc, err := newPodConfigurator(ovsBridgeClient, nil, nil, interfaceStore, nil, ovsconfig.OVSDatapathSystem, false, false, nil) + pc, err := newPodConfigurator(nil, ovsBridgeClient, nil, nil, interfaceStore, nil, ovsconfig.OVSDatapathSystem, false, false, nil) if err == nil { pc.isSecondaryNetwork = true } diff --git a/pkg/agent/cniserver/server.go b/pkg/agent/cniserver/server.go index bc77dcc7224..64f770bc05d 100644 --- a/pkg/agent/cniserver/server.go +++ b/pkg/agent/cniserver/server.go @@ -660,7 +660,7 @@ func (s *CNIServer) Initialize( ) error { var err error s.podConfigurator, err = newPodConfigurator( - ovsBridgeClient, ofClient, s.routeClient, ifaceStore, s.nodeConfig.GatewayConfig.MAC, + s.kubeClient, ovsBridgeClient, ofClient, s.routeClient, ifaceStore, s.nodeConfig.GatewayConfig.MAC, ovsBridgeClient.GetOVSDatapathType(), ovsBridgeClient.IsHardwareOffloadEnabled(), s.disableTXChecksumOffload, podUpdateNotifier) if err != nil { @@ -676,6 +676,8 @@ func (s *CNIServer) Run(stopCh <-chan struct{}) { klog.InfoS("Starting CNI server") defer klog.InfoS("Shutting down CNI server") + s.podConfigurator.podIfMonitor.monitorUnReadyInterface(stopCh) + listener, err := util.ListenLocalSocket(s.cniSocket) if err != nil { klog.Fatalf("Failed to bind on %s: %v", s.cniSocket, err) diff --git a/pkg/agent/cniserver/server_linux_test.go b/pkg/agent/cniserver/server_linux_test.go index 856024dd32c..6afcfbeadf3 100644 --- a/pkg/agent/cniserver/server_linux_test.go +++ b/pkg/agent/cniserver/server_linux_test.go @@ -95,7 +95,7 @@ func TestValidatePrevResult(t *testing.T) { cniConfig.Ifname = ifname cniConfig.Netns = "invalid_netns" sriovVFDeviceID := "" - cniServer.podConfigurator, _ = newPodConfigurator(nil, nil, nil, nil, nil, "", false, false, channel.NewSubscribableChannel("PodUpdate", 100)) + cniServer.podConfigurator, _ = newPodConfigurator(nil, nil, nil, nil, nil, nil, "", false, false, channel.NewSubscribableChannel("PodUpdate", 100)) response := cniServer.validatePrevResult(cniConfig.CniCmdArgs, prevResult, sriovVFDeviceID) checkErrorResponse(t, response, cnipb.ErrorCode_CHECK_INTERFACE_FAILURE, "") }) @@ -106,7 +106,7 @@ func TestValidatePrevResult(t *testing.T) { cniConfig.Netns = "invalid_netns" sriovVFDeviceID := "0000:03:00.6" prevResult.Interfaces = []*current.Interface{hostIface, containerIface} - cniServer.podConfigurator, _ = newPodConfigurator(nil, nil, nil, nil, nil, "", true, false, channel.NewSubscribableChannel("PodUpdate", 100)) + cniServer.podConfigurator, _ = newPodConfigurator(nil, nil, nil, nil, nil, nil, "", true, false, channel.NewSubscribableChannel("PodUpdate", 100)) response := cniServer.validatePrevResult(cniConfig.CniCmdArgs, prevResult, sriovVFDeviceID) checkErrorResponse(t, response, cnipb.ErrorCode_CHECK_INTERFACE_FAILURE, "") }) @@ -114,12 +114,13 @@ func TestValidatePrevResult(t *testing.T) { func TestRemoveInterface(t *testing.T) { controller := gomock.NewController(t) + kubeClient := fakeclientset.NewClientset() mockOVSBridgeClient = ovsconfigtest.NewMockOVSBridgeClient(controller) mockOFClient = openflowtest.NewMockClient(controller) ifaceStore = interfacestore.NewInterfaceStore() mockRoute = routetest.NewMockInterface(controller) gwMAC, _ := net.ParseMAC("00:00:11:11:11:11") - podConfigurator, err := newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100)) + podConfigurator, err := newPodConfigurator(kubeClient, mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100)) require.Nil(t, err, "No error expected in podConfigurator constructor") containerMAC, _ := net.ParseMAC("aa:bb:cc:dd:ee:ff") @@ -190,6 +191,7 @@ func TestRemoveInterface(t *testing.T) { } func newMockCNIServer(t *testing.T, controller *gomock.Controller, ipamDriver ipam.IPAMDriver, ipamType string, enableSecondaryNetworkIPAM, isChaining bool) *CNIServer { + kubeClient := fakeclientset.NewClientset() mockOVSBridgeClient = ovsconfigtest.NewMockOVSBridgeClient(controller) mockOFClient = openflowtest.NewMockClient(controller) ifaceStore = interfacestore.NewInterfaceStore() @@ -201,7 +203,7 @@ func newMockCNIServer(t *testing.T, controller *gomock.Controller, ipamDriver ip gwMAC, _ := net.ParseMAC("00:00:11:11:11:11") gateway := &config.GatewayConfig{Name: "", IPv4: gwIPv4, MAC: gwMAC} cniServer.nodeConfig = &config.NodeConfig{Name: "node1", PodIPv4CIDR: nodePodCIDRv4, GatewayConfig: gateway} - cniServer.podConfigurator, _ = newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100)) + cniServer.podConfigurator, _ = newPodConfigurator(kubeClient, mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100)) cniServer.enableSecondaryNetworkIPAM = enableSecondaryNetworkIPAM cniServer.isChaining = isChaining cniServer.networkConfig = &config.NetworkConfig{InterfaceMTU: 1450} @@ -612,18 +614,18 @@ func TestCmdCheck(t *testing.T) { func TestReconcile(t *testing.T) { controller := gomock.NewController(t) + kubeClient := fakeclientset.NewClientset(pod1, pod2, pod3) mockOVSBridgeClient = ovsconfigtest.NewMockOVSBridgeClient(controller) mockOFClient = openflowtest.NewMockClient(controller) ifaceStore = interfacestore.NewInterfaceStore() mockRoute = routetest.NewMockInterface(controller) cniServer := newCNIServer(t) cniServer.routeClient = mockRoute - cniServer.podConfigurator, _ = newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100)) + cniServer.podConfigurator, _ = newPodConfigurator(kubeClient, mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100)) cniServer.podConfigurator.ifConfigurator = newTestInterfaceConfigurator() cniServer.nodeConfig = &config.NodeConfig{ Name: nodeName, } - kubeClient := fakeclientset.NewSimpleClientset(pod1, pod2, pod3) cniServer.kubeClient = kubeClient for _, containerIface := range []*interfacestore.InterfaceConfig{normalInterface, staleInterface, unconnectedInterface} { ifaceStore.AddInterface(containerIface) diff --git a/pkg/agent/cniserver/server_windows_test.go b/pkg/agent/cniserver/server_windows_test.go index 70487b083e8..91e8c329d3e 100644 --- a/pkg/agent/cniserver/server_windows_test.go +++ b/pkg/agent/cniserver/server_windows_test.go @@ -22,13 +22,15 @@ import ( "testing" "time" + "antrea.io/libOpenflow/openflow15" "github.com/Microsoft/hcsshim" "github.com/Microsoft/hcsshim/hcn" cnitypes "github.com/containernetworking/cni/pkg/types" current "github.com/containernetworking/cni/pkg/types/100" "github.com/stretchr/testify/assert" "go.uber.org/mock/gomock" - "k8s.io/apimachinery/pkg/util/wait" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" fakeclientset "k8s.io/client-go/kubernetes/fake" "antrea.io/antrea/pkg/agent/cniserver/ipam" @@ -43,6 +45,7 @@ import ( "antrea.io/antrea/pkg/agent/util" winnettest "antrea.io/antrea/pkg/agent/util/winnet/testing" cnipb "antrea.io/antrea/pkg/apis/cni/v1beta1" + "antrea.io/antrea/pkg/ovs/ovsconfig" ovsconfigtest "antrea.io/antrea/pkg/ovs/ovsconfig/testing" "antrea.io/antrea/pkg/util/channel" ) @@ -173,17 +176,15 @@ type hnsTestUtil struct { hnsEndpoint *hcsshim.HNSEndpoint hcnEndpoint *hcn.HostComputeEndpoint isDocker bool - isAttached bool hnsEndpointCreatErr error endpointAttachErr error } -func newHnsTestUtil(endpointID string, existingHnsEndpoints []hcsshim.HNSEndpoint, isDocker, isAttached bool, hnsEndpointCreatErr, endpointAttachErr error) *hnsTestUtil { +func newHnsTestUtil(endpointID string, existingHnsEndpoints []hcsshim.HNSEndpoint, isDocker bool, hnsEndpointCreatErr, endpointAttachErr error) *hnsTestUtil { return &hnsTestUtil{ endpointID: endpointID, existingHnsEndpoints: existingHnsEndpoints, isDocker: isDocker, - isAttached: isAttached, hnsEndpointCreatErr: hnsEndpointCreatErr, endpointAttachErr: endpointAttachErr, } @@ -217,9 +218,6 @@ func (t *hnsTestUtil) deleteHnsEndpoint(endpoint *hcsshim.HNSEndpoint) (*hcsshim func (t *hnsTestUtil) attachEndpointInNamespace(ep *hcn.HostComputeEndpoint, namespace string) error { t.hcnEndpoint.HostComputeNamespace = namespace - if t.endpointAttachErr == nil { - t.addHostInterface() - } return t.endpointAttachErr } @@ -258,6 +256,7 @@ func (t *hnsTestUtil) addHostInterface() { } func newMockCNIServer(t *testing.T, controller *gomock.Controller, podUpdateNotifier *channel.SubscribableChannel) *CNIServer { + kubeClient := fakeclientset.NewClientset() mockOVSBridgeClient = ovsconfigtest.NewMockOVSBridgeClient(controller) mockOFClient = openflowtest.NewMockClient(controller) mockRoute = routetest.NewMockInterface(controller) @@ -269,7 +268,8 @@ func newMockCNIServer(t *testing.T, controller *gomock.Controller, podUpdateNoti gwMAC, _ := net.ParseMAC("00:00:11:11:11:11") gateway := &config.GatewayConfig{Name: "", IPv4: gwIPv4, MAC: gwMAC} cniServer.nodeConfig = &config.NodeConfig{Name: "node1", PodIPv4CIDR: nodePodCIDRv4, GatewayConfig: gateway} - cniServer.podConfigurator, _ = newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, podUpdateNotifier) + mockOFClient.EXPECT().SubscribeOFPortStatusMessage(gomock.Any()).AnyTimes() + cniServer.podConfigurator, _ = newPodConfigurator(kubeClient, mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, podUpdateNotifier) cniServer.podConfigurator.ifConfigurator.(*ifConfigurator).winnet = mockWinnet return cniServer } @@ -315,7 +315,6 @@ func TestCmdAdd(t *testing.T) { hnsEndpointCreateErr error endpointAttachErr error ifaceExist bool - isAttached bool existingHnsEndpoints []hcsshim.HNSEndpoint endpointExists bool connectOVS bool @@ -341,7 +340,6 @@ func TestCmdAdd(t *testing.T) { oriIPAMResult: oriIPAMResult, connectOVS: true, containerIfaceExist: true, - isAttached: true, }, { name: "containerd-attach-failure", podName: "pod10", @@ -364,13 +362,21 @@ func TestCmdAdd(t *testing.T) { ipamType := "windows-test" ipamMock := ipamtest.NewMockIPAMDriver(controller) ipam.ResetIPAMDriver(ipamType, ipamMock) + stopCh := make(chan struct{}) + defer func() { + stopCh <- struct{}{} + }() isDocker := isDockerContainer(tc.netns) - testUtil := newHnsTestUtil(generateUUID(), tc.existingHnsEndpoints, isDocker, tc.isAttached, tc.hnsEndpointCreateErr, tc.endpointAttachErr) + testUtil := newHnsTestUtil(generateUUID(), tc.existingHnsEndpoints, isDocker, tc.hnsEndpointCreateErr, tc.endpointAttachErr) testUtil.setFunctions() defer testUtil.restore() waiter := newAsyncWaiter(tc.podName, tc.infraContainerID) server := newMockCNIServer(t, controller, waiter.notifier) + server.podConfigurator.podIfMonitor.kubeClient.CoreV1().Pods(testPodNamespace).Create(ctx, &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: tc.podName, Namespace: testPodNamespace}}, metav1.CreateOptions{}) + server.podConfigurator.podIfMonitor.monitorUnReadyInterface(stopCh) + requestMsg, ovsPortName := prepareSetup(t, ipamType, tc.podName, tc.containerID, tc.infraContainerID, tc.netns, nil) if tc.endpointExists { server.podConfigurator.ifConfigurator.(*ifConfigurator).addEndpoint(getHnsEndpoint(generateUUID(), ovsPortName)) @@ -389,10 +395,31 @@ func TestCmdAdd(t *testing.T) { } ovsPortID := generateUUID() if tc.connectOVS { + ofPortNumber := uint32(100) + portStatusCh := server.podConfigurator.podIfMonitor.getPortStatusCh() mockOVSBridgeClient.EXPECT().CreatePort(ovsPortName, ovsPortName, gomock.Any()).Return(ovsPortID, nil).Times(1) - mockOVSBridgeClient.EXPECT().SetInterfaceType(ovsPortName, "internal").Return(nil).Times(1) - mockOVSBridgeClient.EXPECT().GetOFPort(ovsPortName, true).Return(int32(100), nil).Times(1) - mockOFClient.EXPECT().InstallPodFlows(ovsPortName, gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) + mockOVSBridgeClient.EXPECT().SetInterfaceType(ovsPortName, "internal").Return(nil).Times(1).Do( + func(name, ifType string) ovsconfig.Error { + go func() { + // Simulate OVS successfully connects to the vNIC, then a PortStatus message is + // supposed to receive. + select { + case <-time.After(time.Millisecond * 50): + portStatusCh <- &openflow15.PortStatus{ + Reason: openflow15.PR_MODIFY, + Desc: openflow15.Port{ + PortNo: ofPortNumber, + Length: 72, + Name: []byte(name), + State: openflow15.PS_LIVE, + }, + } + } + }() + return nil + }, + ) + mockOFClient.EXPECT().InstallPodFlows(ovsPortName, gomock.Any(), gomock.Any(), uint32(ofPortNumber), gomock.Any(), gomock.Any()).Return(nil) mockRoute.EXPECT().AddLocalAntreaFlexibleIPAMPodRule(gomock.Any()).Return(nil).Times(1) } resp, err := server.CmdAdd(ctx, requestMsg) @@ -421,17 +448,8 @@ func TestCmdAdd(t *testing.T) { _, exists := ifaceStore.GetContainerInterface(containerID) assert.Equal(t, exists, tc.containerIfaceExist) if tc.connectOVS { + testUtil.addHostInterface() waiter.wait() - // Wait for the completion of async function "setInterfaceMTUFunc", otherwise it may lead to the - // race condition failure. - wait.PollUntilContextTimeout(context.Background(), time.Millisecond*10, time.Second, true, - func(ctx context.Context) (done bool, err error) { - mtuSet, exist := hostIfaces.Load(ovsPortName) - if !exist { - return false, nil - } - return mtuSet.(bool), nil - }) } waiter.close() }) @@ -491,7 +509,7 @@ func TestCmdDel(t *testing.T) { if tc.endpointExists { existingHnsEndpoints = append(existingHnsEndpoints, *hnsEndpoint) } - testUtil := newHnsTestUtil(hnsEndpoint.Id, existingHnsEndpoints, isDocker, true, nil, nil) + testUtil := newHnsTestUtil(hnsEndpoint.Id, existingHnsEndpoints, isDocker, nil, nil) testUtil.setFunctions() defer testUtil.restore() waiter := newAsyncWaiter(testPodNameA, containerID) @@ -724,29 +742,35 @@ func newAsyncWaiter(podName, containerID string) *asyncWaiter { func TestReconcile(t *testing.T) { controller := gomock.NewController(t) + kubeClient := fakeclientset.NewClientset(pod1, pod2, pod3) mockOVSBridgeClient = ovsconfigtest.NewMockOVSBridgeClient(controller) mockOFClient = openflowtest.NewMockClient(controller) ifaceStore = interfacestore.NewInterfaceStore() mockRoute = routetest.NewMockInterface(controller) + stopCh := make(chan struct{}) + defer func() { + stopCh <- struct{}{} + }() defer mockHostInterfaceExists()() defer mockGetHnsNetworkByName()() missingEndpoint := getHnsEndpoint(generateUUID(), "iface4") - testUtil := newHnsTestUtil(missingEndpoint.Id, []hcsshim.HNSEndpoint{*missingEndpoint}, false, true, nil, nil) + testUtil := newHnsTestUtil(missingEndpoint.Id, []hcsshim.HNSEndpoint{*missingEndpoint}, false, nil, nil) testUtil.createHnsEndpoint(missingEndpoint) testUtil.setFunctions() defer testUtil.restore() + mockOFClient.EXPECT().SubscribeOFPortStatusMessage(gomock.Any()).AnyTimes() cniServer := newCNIServer(t) cniServer.routeClient = mockRoute - kubeClient := fakeclientset.NewSimpleClientset(pod1, pod2, pod3) cniServer.kubeClient = kubeClient for _, containerIface := range []*interfacestore.InterfaceConfig{normalInterface, staleInterface, unconnectedInterface} { ifaceStore.AddInterface(containerIface) } waiter := newAsyncWaiter(unconnectedInterface.PodName, unconnectedInterface.ContainerID) - cniServer.podConfigurator, _ = newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, waiter.notifier) + cniServer.podConfigurator, _ = newPodConfigurator(kubeClient, mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, waiter.notifier) cniServer.nodeConfig = &config.NodeConfig{Name: nodeName} + cniServer.podConfigurator.podIfMonitor.monitorUnReadyInterface(stopCh) // Re-install Pod1 flows podFlowsInstalled := make(chan string, 2) @@ -760,8 +784,26 @@ func TestReconcile(t *testing.T) { mockRoute.EXPECT().DeleteLocalAntreaFlexibleIPAMPodRule(gomock.Any()).Return(nil).Times(1) // Re-connect to Pod4 hostIfaces.Store(fmt.Sprintf("vEthernet (%s)", unconnectedInterface.InterfaceName), true) - mockOVSBridgeClient.EXPECT().SetInterfaceType(unconnectedInterface.InterfaceName, "internal").Return(nil).Times(1) - mockOVSBridgeClient.EXPECT().GetOFPort(unconnectedInterface.InterfaceName, true).Return(int32(5), nil).Times(1) + mockOVSBridgeClient.EXPECT().SetInterfaceType(unconnectedInterface.InterfaceName, "internal").Return(nil).Times(1).Do( + func(name, ifType string) ovsconfig.Error { + // Simulate OVS successfully connects to the vNIC, then a PortStatus message is + // supposed to receive. + select { + case <-time.After(time.Millisecond * 50): + portStatusCh := cniServer.podConfigurator.podIfMonitor.getPortStatusCh() + portStatusCh <- &openflow15.PortStatus{ + Reason: openflow15.PR_MODIFY, + Desc: openflow15.Port{ + PortNo: uint32(5), + Length: 72, + Name: []byte(name), + State: openflow15.PS_LIVE, + }, + } + } + return nil + }, + ) mockOFClient.EXPECT().InstallPodFlows(unconnectedInterface.InterfaceName, unconnectedInterface.IPs, unconnectedInterface.MAC, uint32(5), uint16(0), nil). Do(func(interfaceName string, _ []net.IP, _ net.HardwareAddr, _ uint32, _ uint16, _ *uint32) { podFlowsInstalled <- interfaceName diff --git a/pkg/agent/interfacestore/interface_cache.go b/pkg/agent/interfacestore/interface_cache.go index bba8cf2068a..5971ee4f008 100644 --- a/pkg/agent/interfacestore/interface_cache.go +++ b/pkg/agent/interfacestore/interface_cache.go @@ -105,6 +105,11 @@ func (c *interfaceCache) AddInterface(interfaceConfig *InterfaceConfig) { } } +// UpdateInterface updates interfaceConfig into local cache. +func (c *interfaceCache) UpdateInterface(interfaceConfig *InterfaceConfig) { + c.cache.Update(interfaceConfig) +} + // DeleteInterface deletes interface from local cache. func (c *interfaceCache) DeleteInterface(interfaceConfig *InterfaceConfig) { c.cache.Delete(interfaceConfig) diff --git a/pkg/agent/interfacestore/testing/mock_interfacestore.go b/pkg/agent/interfacestore/testing/mock_interfacestore.go index cf6ad7f2c2d..64347d6709b 100644 --- a/pkg/agent/interfacestore/testing/mock_interfacestore.go +++ b/pkg/agent/interfacestore/testing/mock_interfacestore.go @@ -278,3 +278,15 @@ func (mr *MockInterfaceStoreMockRecorder) ListInterfaces() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListInterfaces", reflect.TypeOf((*MockInterfaceStore)(nil).ListInterfaces)) } + +// UpdateInterface mocks base method. +func (m *MockInterfaceStore) UpdateInterface(interfaceConfig *interfacestore.InterfaceConfig) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "UpdateInterface", interfaceConfig) +} + +// UpdateInterface indicates an expected call of UpdateInterface. +func (mr *MockInterfaceStoreMockRecorder) UpdateInterface(interfaceConfig any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateInterface", reflect.TypeOf((*MockInterfaceStore)(nil).UpdateInterface), interfaceConfig) +} diff --git a/pkg/agent/interfacestore/types.go b/pkg/agent/interfacestore/types.go index 9d0ca2afe57..7f0b26c9fc7 100644 --- a/pkg/agent/interfacestore/types.go +++ b/pkg/agent/interfacestore/types.go @@ -113,6 +113,7 @@ type InterfaceConfig struct { type InterfaceStore interface { Initialize(interfaces []*InterfaceConfig) AddInterface(interfaceConfig *InterfaceConfig) + UpdateInterface(interfaceConfig *InterfaceConfig) ListInterfaces() []*InterfaceConfig DeleteInterface(interfaceConfig *InterfaceConfig) GetInterface(interfaceKey string) (*InterfaceConfig, bool) diff --git a/pkg/agent/openflow/client.go b/pkg/agent/openflow/client.go index e6e77d0cebd..4cd1134a457 100644 --- a/pkg/agent/openflow/client.go +++ b/pkg/agent/openflow/client.go @@ -408,6 +408,9 @@ type Client interface { // or ip, port, protocol and direction. It is used to bypass NetworkPolicy enforcement on a VM for the particular // traffic. InstallPolicyBypassFlows(protocol binding.Protocol, ipNet *net.IPNet, port uint16, isIngress bool) error + + // SubscribeOFPortStatusMessage registers a channel to listen the OpenFlow PortStatus message. + SubscribeOFPortStatusMessage(statusCh chan *openflow15.PortStatus) } // GetFlowTableStatus returns an array of flow table status. @@ -1697,3 +1700,7 @@ func (c *client) getMeterStats() { klog.ErrorS(err, "Failed to get OVS meter stats") } } + +func (c *client) SubscribeOFPortStatusMessage(statusCh chan *openflow15.PortStatus) { + c.bridge.SubscribePortStatusConsumer(statusCh) +} diff --git a/pkg/agent/openflow/client_test.go b/pkg/agent/openflow/client_test.go index 658193b1220..ab5bd20dcca 100644 --- a/pkg/agent/openflow/client_test.go +++ b/pkg/agent/openflow/client_test.go @@ -2906,3 +2906,14 @@ func TestCachedFlowIsDrop(t *testing.T) { msg = flows[0].GetMessage().(*openflow15.FlowMod) assert.False(t, isDropFlow(msg)) } + +func TestSubscribeOFPortStatusMessage(t *testing.T) { + ctrl := gomock.NewController(t) + ch := make(chan *openflow15.PortStatus) + bridge := ovsoftest.NewMockBridge(ctrl) + c := client{ + bridge: bridge, + } + bridge.EXPECT().SubscribePortStatusConsumer(ch).Times(1) + c.SubscribeOFPortStatusMessage(ch) +} diff --git a/pkg/agent/openflow/testing/mock_openflow.go b/pkg/agent/openflow/testing/mock_openflow.go index bd3ee7b835a..eafaa90f344 100644 --- a/pkg/agent/openflow/testing/mock_openflow.go +++ b/pkg/agent/openflow/testing/mock_openflow.go @@ -36,6 +36,7 @@ import ( openflow0 "antrea.io/antrea/pkg/ovs/openflow" ip "antrea.io/antrea/pkg/util/ip" proxy "antrea.io/antrea/third_party/proxy" + openflow15 "antrea.io/libOpenflow/openflow15" protocol "antrea.io/libOpenflow/protocol" util "antrea.io/libOpenflow/util" ofctrl "antrea.io/ofnet/ofctrl" @@ -849,6 +850,18 @@ func (mr *MockClientMockRecorder) StartPacketInHandler(stopCh any) *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StartPacketInHandler", reflect.TypeOf((*MockClient)(nil).StartPacketInHandler), stopCh) } +// SubscribeOFPortStatusMessage mocks base method. +func (m *MockClient) SubscribeOFPortStatusMessage(statusCh chan *openflow15.PortStatus) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SubscribeOFPortStatusMessage", statusCh) +} + +// SubscribeOFPortStatusMessage indicates an expected call of SubscribeOFPortStatusMessage. +func (mr *MockClientMockRecorder) SubscribeOFPortStatusMessage(statusCh any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscribeOFPortStatusMessage", reflect.TypeOf((*MockClient)(nil).SubscribeOFPortStatusMessage), statusCh) +} + // SubscribePacketIn mocks base method. func (m *MockClient) SubscribePacketIn(reason uint8, pktInQueue *openflow0.PacketInQueue) error { m.ctrl.T.Helper() diff --git a/pkg/agent/types/annotations.go b/pkg/agent/types/annotations.go index cc74150f280..cc685682229 100644 --- a/pkg/agent/types/annotations.go +++ b/pkg/agent/types/annotations.go @@ -41,4 +41,8 @@ const ( // L7FlowExporterAnnotationKey is the key of the L7 network flow export annotation that enables L7 network flow export for annotated Pod or Namespace based on the value of annotation which is direction of traffic. L7FlowExporterAnnotationKey string = "visibility.antrea.io/l7-export" + + // PodNotReadyAnnotationKey represents the key of the Pod annotation that specifies the Pod's networking is not ready. + // This annotation is used only on Windows. + PodNotReadyAnnotationKey string = "pod.antrea.io/not-ready" ) diff --git a/pkg/ovs/openflow/interfaces.go b/pkg/ovs/openflow/interfaces.go index 1c5c60fd22d..edf8ced802b 100644 --- a/pkg/ovs/openflow/interfaces.go +++ b/pkg/ovs/openflow/interfaces.go @@ -146,6 +146,9 @@ type Bridge interface { ResumePacket(packetIn *ofctrl.PacketIn) error // BuildPacketOut returns a new PacketOutBuilder. BuildPacketOut() PacketOutBuilder + // SubscribePortStatusConsumer registers a consumer to listen to OpenFlow PortStatus message. + // We only support a single consumer for now. + SubscribePortStatusConsumer(statusCh chan *openflow15.PortStatus) } // TableStatus represents the status of a specific flow table. The status is useful for debugging. diff --git a/pkg/ovs/openflow/ofctrl_bridge.go b/pkg/ovs/openflow/ofctrl_bridge.go index c0f6ede4daa..e2f60e098ca 100644 --- a/pkg/ovs/openflow/ofctrl_bridge.go +++ b/pkg/ovs/openflow/ofctrl_bridge.go @@ -202,6 +202,9 @@ type OFBridge struct { // pktConsumers is a map from PacketIn category to the channel that is used to publish the PacketIn message. pktConsumers sync.Map + // portStatusConsumerCh is a channel to notify agent a PortStatus message is received + portStatusConsumerCh chan *openflow15.PortStatus + mpReplyChsMutex sync.RWMutex mpReplyChs map[uint32]chan *openflow15.MultipartReply // tunMetadataLengthMap is used to store the tlv-map settings on the OVS bridge. Key is the index of tunnel metedata, @@ -718,6 +721,27 @@ func (b *OFBridge) RetryInterval() time.Duration { return b.retryInterval } +func (b *OFBridge) PortStatusRcvd(status *openflow15.PortStatus) { + if b.portStatusConsumerCh == nil { + return + } + // Correspond to MessageStream.outbound log level. + if klog.V(7).Enabled() { + klog.InfoS("Received PortStatus", "portStatus", status) + } else { + klog.V(4).InfoS("Received PortStatus") + } + switch status.Reason { + // We only process add/modified status for now. + case openflow15.PR_ADD, openflow15.PR_MODIFY: + b.portStatusConsumerCh <- status + } +} + +func (b *OFBridge) SubscribePortStatusConsumer(statusCh chan *openflow15.PortStatus) { + b.portStatusConsumerCh = statusCh +} + func (b *OFBridge) setPacketInFormatTo2() { b.ofSwitch.SetPacketInFormat(openflow15.OFPUTIL_PACKET_IN_NXT2) } diff --git a/pkg/ovs/openflow/testing/mock_openflow.go b/pkg/ovs/openflow/testing/mock_openflow.go index af5033b0b9a..6cc0ab612da 100644 --- a/pkg/ovs/openflow/testing/mock_openflow.go +++ b/pkg/ovs/openflow/testing/mock_openflow.go @@ -342,6 +342,18 @@ func (mr *MockBridgeMockRecorder) SubscribePacketIn(category, pktInQueue any) *g return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscribePacketIn", reflect.TypeOf((*MockBridge)(nil).SubscribePacketIn), category, pktInQueue) } +// SubscribePortStatusConsumer mocks base method. +func (m *MockBridge) SubscribePortStatusConsumer(statusCh chan *openflow15.PortStatus) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SubscribePortStatusConsumer", statusCh) +} + +// SubscribePortStatusConsumer indicates an expected call of SubscribePortStatusConsumer. +func (mr *MockBridgeMockRecorder) SubscribePortStatusConsumer(statusCh any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscribePortStatusConsumer", reflect.TypeOf((*MockBridge)(nil).SubscribePortStatusConsumer), statusCh) +} + // MockTable is a mock of Table interface. type MockTable struct { ctrl *gomock.Controller