diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 1bef425d4b5..28f8d607528 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -20,6 +20,7 @@ import ( "net" "time" + "github.com/spf13/afero" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/util/sets" @@ -78,6 +79,7 @@ import ( "antrea.io/antrea/pkg/util/channel" "antrea.io/antrea/pkg/util/k8s" "antrea.io/antrea/pkg/util/podstore" + utilwait "antrea.io/antrea/pkg/util/wait" "antrea.io/antrea/pkg/version" ) @@ -219,9 +221,12 @@ func run(o *Options) error { // Create an ifaceStore that caches network interfaces managed by this node. ifaceStore := interfacestore.NewInterfaceStore() - // networkReadyCh is used to notify that the Node's network is ready. - // Functions that rely on the Node's network should wait for the channel to close. - networkReadyCh := make(chan struct{}) + // podNetworkWait is used to wait and notify that preconditions for Pod network are ready. + // Processes that are supposed to finish before enabling Pod network should increment the wait group and decrement + // it when finished. + // Processes that enable Pod network should wait for it. + podNetworkWait := utilwait.NewGroup() + // set up signal capture: the first SIGTERM / SIGINT signal is handled gracefully and will // cause the stopCh channel to be closed; if another signal is received before the program // exits, we will force exit. 
@@ -266,7 +271,7 @@ func run(o *Options) error { wireguardConfig, egressConfig, serviceConfig, - networkReadyCh, + podNetworkWait, stopCh, o.nodeType, o.config.ExternalNode.ExternalNodeNamespace, @@ -446,6 +451,7 @@ func run(o *Options) error { antreaClientProvider, ofClient, ifaceStore, + afero.NewOsFs(), nodeKey, podUpdateChannel, externalEntityUpdateChannel, @@ -465,6 +471,7 @@ func run(o *Options) error { gwPort, tunPort, nodeConfig, + podNetworkWait, ) if err != nil { return fmt.Errorf("error creating new NetworkPolicy controller: %v", err) @@ -536,7 +543,7 @@ func run(o *Options) error { enableAntreaIPAM, o.config.DisableTXChecksumOffload, networkConfig, - networkReadyCh) + podNetworkWait) if features.DefaultFeatureGate.Enabled(features.SecondaryNetwork) { cniPodInfoStore = cnipodcache.NewCNIPodInfoStore() diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index 6f5a453db93..8a368bcfc32 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -23,7 +23,6 @@ import ( "os" "strconv" "strings" - "sync" "time" "github.com/containernetworking/plugins/pkg/ip" @@ -58,6 +57,7 @@ import ( "antrea.io/antrea/pkg/util/env" utilip "antrea.io/antrea/pkg/util/ip" "antrea.io/antrea/pkg/util/k8s" + utilwait "antrea.io/antrea/pkg/util/wait" ) const ( @@ -119,9 +119,9 @@ type Initializer struct { l7NetworkPolicyConfig *config.L7NetworkPolicyConfig enableL7NetworkPolicy bool connectUplinkToBridge bool - // networkReadyCh should be closed once the Node's network is ready. + // podNetworkWait should be decremented once the Node's network is ready. // The CNI server will wait for it before handling any CNI Add requests. 
- networkReadyCh chan<- struct{} + podNetworkWait *utilwait.Group stopCh <-chan struct{} nodeType config.NodeType externalNodeNamespace string @@ -142,7 +142,7 @@ func NewInitializer( wireGuardConfig *config.WireGuardConfig, egressConfig *config.EgressConfig, serviceConfig *config.ServiceConfig, - networkReadyCh chan<- struct{}, + podNetworkWait *utilwait.Group, stopCh <-chan struct{}, nodeType config.NodeType, externalNodeNamespace string, @@ -165,7 +165,7 @@ func NewInitializer( egressConfig: egressConfig, serviceConfig: serviceConfig, l7NetworkPolicyConfig: &config.L7NetworkPolicyConfig{}, - networkReadyCh: networkReadyCh, + podNetworkWait: podNetworkWait, stopCh: stopCh, nodeType: nodeType, externalNodeNamespace: externalNodeNamespace, @@ -403,9 +403,6 @@ func (i *Initializer) restorePortConfigs() error { // Initialize sets up agent initial configurations. func (i *Initializer) Initialize() error { klog.Info("Setting up node network") - // wg is used to wait for the asynchronous initialization. - var wg sync.WaitGroup - if err := i.initNodeLocalConfig(); err != nil { return err } @@ -481,10 +478,10 @@ func (i *Initializer) Initialize() error { } if i.nodeType == config.K8sNode { - wg.Add(1) + i.podNetworkWait.Increment() // routeClient.Initialize() should be after i.setupOVSBridge() which // creates the host gateway interface. - if err := i.routeClient.Initialize(i.nodeConfig, wg.Done); err != nil { + if err := i.routeClient.Initialize(i.nodeConfig, i.podNetworkWait.Done); err != nil { return err } @@ -492,12 +489,6 @@ func (i *Initializer) Initialize() error { if err := i.initOpenFlowPipeline(); err != nil { return err } - - // The Node's network is ready only when both synchronous and asynchronous initialization are done. - go func() { - wg.Wait() - close(i.networkReadyCh) - }() } else { // Install OpenFlow entries on OVS bridge. 
if err := i.initOpenFlowPipeline(); err != nil { diff --git a/pkg/agent/cniserver/interface_configuration_linux_test.go b/pkg/agent/cniserver/interface_configuration_linux_test.go index 1452dee54c3..e9672866a1b 100644 --- a/pkg/agent/cniserver/interface_configuration_linux_test.go +++ b/pkg/agent/cniserver/interface_configuration_linux_test.go @@ -127,7 +127,7 @@ func (ns *fakeNS) clear() { } func createNS(t *testing.T, waitForComplete bool) *fakeNS { - nsPath := generateUUID(t) + nsPath := generateUUID() fakeNs := &fakeNS{path: nsPath, fd: uintptr(unsafe.Pointer(&nsPath)), waitCompleted: waitForComplete, stopCh: make(chan struct{})} validNSs.Store(nsPath, fakeNs) return fakeNs diff --git a/pkg/agent/cniserver/pod_configuration.go b/pkg/agent/cniserver/pod_configuration.go index 1e0782dfe88..855ff506dc1 100644 --- a/pkg/agent/cniserver/pod_configuration.go +++ b/pkg/agent/cniserver/pod_configuration.go @@ -37,6 +37,7 @@ import ( "antrea.io/antrea/pkg/ovs/ovsconfig" "antrea.io/antrea/pkg/util/channel" "antrea.io/antrea/pkg/util/k8s" + "antrea.io/antrea/pkg/util/wait" ) type vethPair struct { @@ -416,7 +417,7 @@ func parsePrevResult(conf *types.NetworkConfig) error { return nil } -func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *containerAccessArbitrator) error { +func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *containerAccessArbitrator, podNetworkWait *wait.Group) error { // desiredPods is the set of Pods that should be present, based on the // current list of Pods got from the Kubernetes API. desiredPods := sets.New[string]() @@ -441,21 +442,34 @@ func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *contain missingIfConfigs = append(missingIfConfigs, containerConfig) continue } - // This interface matches an existing Pod. - // We rely on the interface cache / store - which is initialized from the persistent - // OVSDB - to map the Pod to its interface configuration. 
The interface - // configuration includes the parameters we need to replay the flows. - klog.V(4).Infof("Syncing interface %s for Pod %s", containerConfig.InterfaceName, namespacedName) - if err := pc.ofClient.InstallPodFlows( - containerConfig.InterfaceName, - containerConfig.IPs, - containerConfig.MAC, - uint32(containerConfig.OFPort), - containerConfig.VLANID, - nil, - ); err != nil { - klog.Errorf("Error when re-installing flows for Pod %s", namespacedName) - } + go func(containerID, pod, namespace string) { + // Do not install Pod flows until all preconditions are met. + podNetworkWait.Wait() + // To avoid race condition with CNIServer CNI event handlers. + containerAccess.lockContainer(containerID) + defer containerAccess.unlockContainer(containerID) + + containerConfig, exists := pc.ifaceStore.GetContainerInterface(containerID) + if !exists { + klog.InfoS("The container interface had been deleted, skip installing flows for Pod", "Pod", klog.KRef(namespace, pod), "containerID", containerID) + return + } + // This interface matches an existing Pod. + // We rely on the interface cache / store - which is initialized from the persistent + // OVSDB - to map the Pod to its interface configuration. The interface + // configuration includes the parameters we need to replay the flows. 
+ klog.V(4).InfoS("Syncing Pod interface", "Pod", klog.KRef(namespace, pod), "iface", containerConfig.InterfaceName) + if err := pc.ofClient.InstallPodFlows( + containerConfig.InterfaceName, + containerConfig.IPs, + containerConfig.MAC, + uint32(containerConfig.OFPort), + containerConfig.VLANID, + nil, + ); err != nil { + klog.ErrorS(err, "Error when re-installing flows for Pod", "Pod", klog.KRef(namespace, pod)) + } + }(containerConfig.ContainerID, containerConfig.PodName, containerConfig.PodNamespace) } else { // clean-up and delete interface klog.V(4).Infof("Deleting interface %s", containerConfig.InterfaceName) diff --git a/pkg/agent/cniserver/pod_configuration_linux_test.go b/pkg/agent/cniserver/pod_configuration_linux_test.go index 66b94f7f6e9..206b7cd7801 100644 --- a/pkg/agent/cniserver/pod_configuration_linux_test.go +++ b/pkg/agent/cniserver/pod_configuration_linux_test.go @@ -136,7 +136,7 @@ func TestConnectInterceptedInterface(t *testing.T) { testPodName := "test-pod" podNamespace := testPodNamespace hostInterfaceName := util.GenerateContainerInterfaceName(testPodName, testPodNamespace, testPodInfraContainerID) - containerID := generateUUID(t) + containerID := generateUUID() containerNetNS := "container-ns" containerDev := "eth0" @@ -210,7 +210,7 @@ func TestConnectInterceptedInterface(t *testing.T) { if tc.migratedRoute { mockRoute.EXPECT().MigrateRoutesToGw(hostInterfaceName).Return(tc.migrateRouteErr) } - ovsPortID := generateUUID(t) + ovsPortID := generateUUID() if tc.connectedOVS { mockOVSBridgeClient.EXPECT().CreatePort(hostInterfaceName, gomock.Any(), gomock.Any()).Return(ovsPortID, tc.createOVSPortErr).Times(1) if tc.createOVSPortErr == nil { @@ -239,7 +239,7 @@ func TestConnectInterceptedInterface(t *testing.T) { func TestCreateOVSPort(t *testing.T) { controller := gomock.NewController(t) - containerID := generateUUID(t) + containerID := generateUUID() podName := "p0" podNamespace := testPodNamespace @@ -271,10 +271,10 @@ func 
TestCreateOVSPort(t *testing.T) { containerConfig := buildContainerConfig(tc.portName, containerID, podName, podNamespace, ¤t.Interface{Mac: "01:02:03:04:05:06"}, ipamResult.IPs, tc.vlanID) attachInfo := BuildOVSPortExternalIDs(containerConfig) if tc.createOVSPort { - mockOVSBridgeClient.EXPECT().CreatePort(tc.portName, tc.portName, attachInfo).Times(1).Return(generateUUID(t), nil) + mockOVSBridgeClient.EXPECT().CreatePort(tc.portName, tc.portName, attachInfo).Times(1).Return(generateUUID(), nil) } if tc.createOVSAccessPort { - mockOVSBridgeClient.EXPECT().CreateAccessPort(tc.portName, tc.portName, attachInfo, tc.vlanID).Times(1).Return(generateUUID(t), nil) + mockOVSBridgeClient.EXPECT().CreateAccessPort(tc.portName, tc.portName, attachInfo, tc.vlanID).Times(1).Return(generateUUID(), nil) } _, err := podConfigurator.createOVSPort(tc.portName, attachInfo, tc.vlanID) assert.NoError(t, err) @@ -283,8 +283,8 @@ func TestCreateOVSPort(t *testing.T) { } func TestParseOVSPortInterfaceConfig(t *testing.T) { - containerID := generateUUID(t) - portUUID := generateUUID(t) + containerID := generateUUID() + portUUID := generateUUID() ofPort := int32(1) containerIPs := "1.1.1.2,aabb:1122::101:102" parsedIPs := []net.IP{net.ParseIP("1.1.1.2"), net.ParseIP("aabb:1122::101:102")} @@ -398,14 +398,14 @@ func TestParseOVSPortInterfaceConfig(t *testing.T) { func TestCheckHostInterface(t *testing.T) { controller := gomock.NewController(t) hostIfaceName := "port1" - containerID := generateUUID(t) + containerID := generateUUID() containerIntf := ¤t.Interface{Name: ifname, Sandbox: netns, Mac: "01:02:03:04:05:06"} interfaces := []*current.Interface{containerIntf, {Name: hostIfaceName}} containeIPs := ipamResult.IPs ifaceMAC, _ := net.ParseMAC("01:02:03:04:05:06") containerInterface := interfacestore.NewContainerInterface(hostIfaceName, containerID, "pod1", testPodNamespace, ifaceMAC, []net.IP{containerIP}, 1) containerInterface.OVSPortConfig = &interfacestore.OVSPortConfig{ - PortUUID: 
generateUUID(t), + PortUUID: generateUUID(), OFPort: int32(10), } @@ -454,7 +454,7 @@ func TestCheckHostInterface(t *testing.T) { func TestConfigureSriovSecondaryInterface(t *testing.T) { controller := gomock.NewController(t) - containerID := generateUUID(t) + containerID := generateUUID() containerNS := "containerNS" for _, tc := range []struct { diff --git a/pkg/agent/cniserver/server.go b/pkg/agent/cniserver/server.go index dc3135d5123..04f1ed1e018 100644 --- a/pkg/agent/cniserver/server.go +++ b/pkg/agent/cniserver/server.go @@ -45,6 +45,7 @@ import ( "antrea.io/antrea/pkg/cni" "antrea.io/antrea/pkg/ovs/ovsconfig" "antrea.io/antrea/pkg/util/channel" + "antrea.io/antrea/pkg/util/wait" ) const ( @@ -118,8 +119,8 @@ type CNIServer struct { disableTXChecksumOffload bool secondaryNetworkEnabled bool networkConfig *config.NetworkConfig - // networkReadyCh notifies that the network is ready so new Pods can be created. Therefore, CmdAdd waits for it. - networkReadyCh <-chan struct{} + // podNetworkWait notifies that the network is ready so new Pods can be created. Therefore, CmdAdd waits for it. 
+ podNetworkWait *wait.Group } var supportedCNIVersionSet map[string]bool @@ -441,11 +442,9 @@ func (s *CNIServer) CmdAdd(ctx context.Context, request *cnipb.CniCmdRequest) (* return resp, err } - select { - case <-time.After(networkReadyTimeout): - klog.ErrorS(nil, "Cannot process CmdAdd request for container because network is not ready", "container", cniConfig.ContainerId, "timeout", networkReadyTimeout) + if err := s.podNetworkWait.WaitWithTimeout(networkReadyTimeout); err != nil { + klog.ErrorS(err, "Cannot process CmdAdd request for container because network is not ready", "container", cniConfig.ContainerId, "timeout", networkReadyTimeout) return s.tryAgainLaterResponse(), nil - case <-s.networkReadyCh: } result := &ipam.IPAMResult{Result: current.Result{CNIVersion: current.ImplementedSpecVersion}} @@ -634,7 +633,7 @@ func New( routeClient route.Interface, isChaining, enableBridgingMode, enableSecondaryNetworkIPAM, disableTXChecksumOffload bool, networkConfig *config.NetworkConfig, - networkReadyCh <-chan struct{}, + podNetworkWait *wait.Group, ) *CNIServer { return &CNIServer{ cniSocket: cniSocket, @@ -650,7 +649,7 @@ func New( disableTXChecksumOffload: disableTXChecksumOffload, enableSecondaryNetworkIPAM: enableSecondaryNetworkIPAM, networkConfig: networkConfig, - networkReadyCh: networkReadyCh, + podNetworkWait: podNetworkWait, } } @@ -773,7 +772,7 @@ func (s *CNIServer) reconcile() error { return fmt.Errorf("failed to list Pods running on Node %s: %v", s.nodeConfig.Name, err) } - return s.podConfigurator.reconcile(pods.Items, s.containerAccess) + return s.podConfigurator.reconcile(pods.Items, s.containerAccess, s.podNetworkWait) } func init() { diff --git a/pkg/agent/cniserver/server_linux_test.go b/pkg/agent/cniserver/server_linux_test.go index 9acabb19443..9eb2f6b808c 100644 --- a/pkg/agent/cniserver/server_linux_test.go +++ b/pkg/agent/cniserver/server_linux_test.go @@ -20,6 +20,7 @@ import ( "fmt" "net" "testing" + "time" cnitypes 
"github.com/containernetworking/cni/pkg/types" current "github.com/containernetworking/cni/pkg/types/100" @@ -27,9 +28,6 @@ import ( "github.com/google/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" fakeclientset "k8s.io/client-go/kubernetes/fake" "antrea.io/antrea/pkg/agent/cniserver/ipam" @@ -348,7 +346,7 @@ func TestCmdAdd(t *testing.T) { if tc.addLocalIPAMRoute { mockRoute.EXPECT().AddLocalAntreaFlexibleIPAMPodRule(gomock.Any()).Return(tc.addLocalIPAMRouteError).Times(1) } - ovsPortID := generateUUID(t) + ovsPortID := generateUUID() if tc.connectOVS { mockOVSBridgeClient.EXPECT().CreatePort(hostInterfaceName, gomock.Any(), gomock.Any()).Return(ovsPortID, nil).Times(1) mockOVSBridgeClient.EXPECT().GetOFPort(hostInterfaceName, false).Return(int32(100), nil).Times(1) @@ -394,7 +392,7 @@ func TestCmdAdd(t *testing.T) { func TestCmdDel(t *testing.T) { controller := gomock.NewController(t) ipamMock := ipamtest.NewMockIPAMDriver(controller) - ovsPortID := generateUUID(t) + ovsPortID := generateUUID() ovsPort := int32(100) ctx := context.TODO() @@ -543,7 +541,7 @@ func TestCmdDel(t *testing.T) { func TestCmdCheck(t *testing.T) { controller := gomock.NewController(t) ipamMock := ipamtest.NewMockIPAMDriver(controller) - ovsPortID := generateUUID(t) + ovsPortID := generateUUID() ovsPort := int32(100) ctx := context.TODO() @@ -635,98 +633,33 @@ func TestReconcile(t *testing.T) { mockOFClient = openflowtest.NewMockClient(controller) ifaceStore = interfacestore.NewInterfaceStore() mockRoute = routetest.NewMockInterface(controller) - nodeName := "node1" cniServer := newCNIServer(t) cniServer.routeClient = mockRoute - gwMAC, _ := net.ParseMAC("00:00:11:11:11:11") cniServer.podConfigurator, _ = newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, channel.NewSubscribableChannel("PodUpdate", 
100), nil, false) cniServer.podConfigurator.ifConfigurator = newTestInterfaceConfigurator() cniServer.nodeConfig = &config.NodeConfig{ Name: nodeName, } - pods := []runtime.Object{ - &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "p1", - Namespace: testPodNamespace, - }, - Spec: v1.PodSpec{ - NodeName: nodeName, - }, - }, - &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "p2", - Namespace: testPodNamespace, - }, - Spec: v1.PodSpec{ - NodeName: nodeName, - HostNetwork: true, - }, - }, - &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "p4", - Namespace: testPodNamespace, - }, - Spec: v1.PodSpec{ - NodeName: nodeName, - }, - }, - } - containerIfaces := map[string]*interfacestore.InterfaceConfig{ - "iface1": { - InterfaceName: "iface1", - Type: interfacestore.ContainerInterface, - OVSPortConfig: &interfacestore.OVSPortConfig{ - PortUUID: generateUUID(t), - OFPort: int32(3), - }, - ContainerInterfaceConfig: &interfacestore.ContainerInterfaceConfig{ - PodName: "p1", - PodNamespace: testPodNamespace, - ContainerID: generateUUID(t), - }, - }, - "iface3": { - InterfaceName: "iface3", - Type: interfacestore.ContainerInterface, - OVSPortConfig: &interfacestore.OVSPortConfig{ - PortUUID: generateUUID(t), - OFPort: int32(4), - }, - ContainerInterfaceConfig: &interfacestore.ContainerInterfaceConfig{ - PodName: "p3", - PodNamespace: testPodNamespace, - ContainerID: generateUUID(t), - }, - }, - "iface4": { - InterfaceName: "iface4", - Type: interfacestore.ContainerInterface, - OVSPortConfig: &interfacestore.OVSPortConfig{ - PortUUID: generateUUID(t), - OFPort: int32(-1), - }, - ContainerInterfaceConfig: &interfacestore.ContainerInterfaceConfig{ - PodName: "p4", - PodNamespace: testPodNamespace, - ContainerID: generateUUID(t), - }, - }, - } - kubeClient := fakeclientset.NewSimpleClientset(pods...) 
+ kubeClient := fakeclientset.NewSimpleClientset(pod1, pod2, pod3) cniServer.kubeClient = kubeClient - for _, containerIface := range containerIfaces { + for _, containerIface := range []*interfacestore.InterfaceConfig{normalInterface, staleInterface, unconnectedInterface} { ifaceStore.AddInterface(containerIface) } - mockOFClient.EXPECT().InstallPodFlows("iface1", gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(1) - iface := containerIfaces["iface3"] - mockOFClient.EXPECT().UninstallPodFlows("iface3").Return(nil).Times(1) - mockOVSBridgeClient.EXPECT().DeletePort(iface.PortUUID).Return(nil).Times(1) + podFlowsInstalled := make(chan struct{}) + mockOFClient.EXPECT().InstallPodFlows(normalInterface.InterfaceName, normalInterface.IPs, normalInterface.MAC, uint32(normalInterface.OFPort), uint16(0), nil). + Do(func(_ string, _ []net.IP, _ net.HardwareAddr, _ uint32, _ uint16, _ *uint32) { + close(podFlowsInstalled) + }).Times(1) + mockOFClient.EXPECT().UninstallPodFlows(staleInterface.InterfaceName).Return(nil).Times(1) + mockOVSBridgeClient.EXPECT().DeletePort(staleInterface.PortUUID).Return(nil).Times(1) mockRoute.EXPECT().DeleteLocalAntreaFlexibleIPAMPodRule(gomock.Any()).Return(nil).Times(1) err := cniServer.reconcile() assert.NoError(t, err) - _, exists := ifaceStore.GetInterfaceByName("iface3") + _, exists := ifaceStore.GetInterfaceByName(staleInterface.InterfaceName) assert.False(t, exists) + select { + case <-podFlowsInstalled: + case <-time.After(500 * time.Millisecond): + t.Errorf("InstallPodFlows for %s should be called but was not", normalInterface.InterfaceName) + } } diff --git a/pkg/agent/cniserver/server_test.go b/pkg/agent/cniserver/server_test.go index 037ee24834e..6ec5edc4ce8 100644 --- a/pkg/agent/cniserver/server_test.go +++ b/pkg/agent/cniserver/server_test.go @@ -29,6 +29,8 @@ import ( "github.com/google/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + v1 "k8s.io/api/core/v1" + metav1 
"k8s.io/apimachinery/pkg/apis/meta/v1" "antrea.io/antrea/pkg/agent/cniserver/ipam" ipamtest "antrea.io/antrea/pkg/agent/cniserver/ipam/testing" @@ -43,6 +45,8 @@ import ( "antrea.io/antrea/pkg/cni" "antrea.io/antrea/pkg/ovs/ovsconfig" ovsconfigtest "antrea.io/antrea/pkg/ovs/ovsconfig/testing" + utilip "antrea.io/antrea/pkg/util/ip" + "antrea.io/antrea/pkg/util/wait" ) const ( @@ -75,6 +79,80 @@ var ( ifaceStore interfacestore.InterfaceStore emptyResponse = &cnipb.CniCmdResponse{CniResult: []byte("")} + + nodeName = "node1" + gwMAC = utilip.MustParseMAC("00:00:11:11:11:11") + pod1 = &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "p1", + Namespace: testPodNamespace, + }, + Spec: v1.PodSpec{ + NodeName: nodeName, + }, + } + pod2 = &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "p2", + Namespace: testPodNamespace, + }, + Spec: v1.PodSpec{ + NodeName: nodeName, + HostNetwork: true, + }, + } + pod3 = &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "p3", + Namespace: testPodNamespace, + }, + Spec: v1.PodSpec{ + NodeName: nodeName, + }, + } + normalInterface = &interfacestore.InterfaceConfig{ + InterfaceName: "iface1", + Type: interfacestore.ContainerInterface, + IPs: []net.IP{net.ParseIP("1.1.1.1")}, + MAC: utilip.MustParseMAC("00:11:22:33:44:01"), + OVSPortConfig: &interfacestore.OVSPortConfig{ + PortUUID: generateUUID(), + OFPort: int32(3), + }, + ContainerInterfaceConfig: &interfacestore.ContainerInterfaceConfig{ + PodName: pod1.Name, + PodNamespace: testPodNamespace, + ContainerID: generateUUID(), + }, + } + staleInterface = &interfacestore.InterfaceConfig{ + InterfaceName: "iface3", + Type: interfacestore.ContainerInterface, + OVSPortConfig: &interfacestore.OVSPortConfig{ + PortUUID: generateUUID(), + OFPort: int32(4), + }, + ContainerInterfaceConfig: &interfacestore.ContainerInterfaceConfig{ + PodName: "non-existing-pod", + PodNamespace: testPodNamespace, + ContainerID: generateUUID(), + }, + } + unconnectedInterface = &interfacestore.InterfaceConfig{ + 
InterfaceName: "iface4", + Type: interfacestore.ContainerInterface, + IPs: []net.IP{net.ParseIP("1.1.1.2")}, + MAC: utilip.MustParseMAC("00:11:22:33:44:02"), + OVSPortConfig: &interfacestore.OVSPortConfig{ + PortUUID: generateUUID(), + OFPort: int32(-1), + }, + ContainerInterfaceConfig: &interfacestore.ContainerInterfaceConfig{ + PodName: pod3.Name, + PodNamespace: testPodNamespace, + ContainerID: generateUUID(), + }, + } ) func TestLoadNetConfig(t *testing.T) { @@ -664,15 +742,13 @@ func translateRawPrevResult(prevResult *current.Result, cniVersion string) (map[ } func newCNIServer(t *testing.T) *CNIServer { - networkReadyCh := make(chan struct{}) cniServer := &CNIServer{ cniSocket: testSocket, nodeConfig: testNodeConfig, serverVersion: cni.AntreaCNIVersion, containerAccess: newContainerAccessArbitrator(), - networkReadyCh: networkReadyCh, + podNetworkWait: wait.NewGroup(), } - close(networkReadyCh) cniServer.supportedCNIVersions = buildVersionSet() cniServer.networkConfig = &config.NetworkConfig{InterfaceMTU: 1450} return cniServer @@ -696,7 +772,7 @@ func generateNetworkConfiguration(name, cniVersion, cniType, ipamType string) *t } func newRequest(args string, netCfg *types.NetworkConfig, path string, t *testing.T) (*cnipb.CniCmdRequest, string) { - containerID := generateUUID(t) + containerID := generateUUID() networkConfig, err := json.Marshal(netCfg) if err != nil { t.Error("Failed to generate Network configuration") @@ -715,11 +791,8 @@ func newRequest(args string, netCfg *types.NetworkConfig, path string, t *testin return cmdRequest, containerID } -func generateUUID(t *testing.T) string { - newID, err := uuid.NewUUID() - if err != nil { - t.Fatal("Failed to generate UUID") - } +func generateUUID() string { + newID, _ := uuid.NewUUID() return newID.String() } diff --git a/pkg/agent/cniserver/server_windows_test.go b/pkg/agent/cniserver/server_windows_test.go index 27b3e68d180..2a7f38f4d72 100644 --- a/pkg/agent/cniserver/server_windows_test.go +++ 
b/pkg/agent/cniserver/server_windows_test.go @@ -28,9 +28,6 @@ import ( current "github.com/containernetworking/cni/pkg/types/100" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/wait" fakeclientset "k8s.io/client-go/kubernetes/fake" @@ -326,8 +323,8 @@ func TestCmdAdd(t *testing.T) { dockerInfraContainer := "261a1970-5b6c-11ed-8caf-000c294e5d03" dockerWorkContainer := "261e579a-5b6c-11ed-8caf-000c294e5d03" - unknownInfraContainer := generateUUID(t) - containerdInfraContainer := generateUUID(t) + unknownInfraContainer := generateUUID() + containerdInfraContainer := generateUUID() defer mockHostInterfaceExists()() defer mockGetHnsNetworkByName()() @@ -450,7 +447,7 @@ func TestCmdAdd(t *testing.T) { podName: "pod8", containerID: containerdInfraContainer, infraContainerID: containerdInfraContainer, - netns: generateUUID(t), + netns: generateUUID(), ipamAdd: true, connectOVS: true, containerIfaceExist: true, @@ -459,7 +456,7 @@ func TestCmdAdd(t *testing.T) { podName: "pod9", containerID: containerdInfraContainer, infraContainerID: containerdInfraContainer, - netns: generateUUID(t), + netns: generateUUID(), oriIPAMResult: oriIPAMResult, connectOVS: true, containerIfaceExist: true, @@ -469,7 +466,7 @@ func TestCmdAdd(t *testing.T) { podName: "pod10", containerID: containerdInfraContainer, infraContainerID: containerdInfraContainer, - netns: generateUUID(t), + netns: generateUUID(), ipamDel: true, oriIPAMResult: oriIPAMResult, endpointAttachErr: fmt.Errorf("unable to attach HnsEndpoint"), @@ -483,14 +480,14 @@ func TestCmdAdd(t *testing.T) { } { t.Run(tc.name, func(t *testing.T) { isDocker := isDockerContainer(tc.netns) - testUtil := newHnsTestUtil(generateUUID(t), tc.existingHnsEndpoints, isDocker, tc.isAttached, tc.hnsEndpointCreateErr, tc.endpointAttachErr) + testUtil := newHnsTestUtil(generateUUID(), 
tc.existingHnsEndpoints, isDocker, tc.isAttached, tc.hnsEndpointCreateErr, tc.endpointAttachErr) testUtil.setFunctions() defer testUtil.restore() waiter := newAsyncWaiter(tc.podName, tc.infraContainerID) server := newMockCNIServer(t, controller, waiter.notifier) requestMsg, ovsPortName := prepareSetup(t, ipamType, tc.podName, tc.containerID, tc.infraContainerID, tc.netns, nil) if tc.endpointExists { - server.podConfigurator.ifConfigurator.(*ifConfigurator).addEndpoint(getHnsEndpoint(generateUUID(t), ovsPortName)) + server.podConfigurator.ifConfigurator.(*ifConfigurator).addEndpoint(getHnsEndpoint(generateUUID(), ovsPortName)) } if tc.oriIPAMResult != nil { ipam.AddIPAMResult(tc.infraContainerID, tc.oriIPAMResult) @@ -501,7 +498,7 @@ func TestCmdAdd(t *testing.T) { if tc.ipamDel { ipamMock.EXPECT().Del(gomock.Any(), gomock.Any(), gomock.Any()).Return(true, nil).Times(1) } - ovsPortID := generateUUID(t) + ovsPortID := generateUUID() if tc.connectOVS { if isDocker { mockOVSBridgeClient.EXPECT().CreateInternalPort(ovsPortName, int32(0), gomock.Any(), gomock.Any()).Return(ovsPortID, nil).Times(1) @@ -615,7 +612,7 @@ func TestCmdDel(t *testing.T) { t.Run(tc.name, func(t *testing.T) { isDocker := isDockerContainer(tc.netns) requestMsg, ovsPortName := prepareSetup(t, ipamType, tc.podName, tc.containerID, tc.containerID, tc.netns, nil) - hnsEndpoint := getHnsEndpoint(generateUUID(t), ovsPortName) + hnsEndpoint := getHnsEndpoint(generateUUID(), ovsPortName) var existingHnsEndpoints []hcsshim.HNSEndpoint if tc.endpointExists { existingHnsEndpoints = append(existingHnsEndpoints, *hnsEndpoint) @@ -625,7 +622,7 @@ func TestCmdDel(t *testing.T) { defer testUtil.restore() waiter := newAsyncWaiter(tc.podName, tc.containerID) server := newMockCNIServer(t, controller, waiter.notifier) - ovsPortID := generateUUID(t) + ovsPortID := generateUUID() if tc.endpointExists { server.podConfigurator.ifConfigurator.(*ifConfigurator).addEndpoint(hnsEndpoint) } @@ -683,7 +680,7 @@ func 
TestCmdCheck(t *testing.T) { defer mockSetInterfaceMTU(nil)() defer mockListHnsEndpoint(nil, nil)() defer mockGetNetInterfaceAddrs(containerIPNet, nil)() - defer mockGetHnsEndpointByName(generateUUID(t), mac)() + defer mockGetHnsEndpointByName(generateUUID(), mac)() wrapperIPAMResult := func(ipamResult current.Result, interfaces []*current.Interface) *current.Result { result := ipamResult @@ -721,7 +718,7 @@ func TestCmdCheck(t *testing.T) { {Name: "pod0-6631b7", Mac: "11:22:33:44:33:22", Sandbox: ""}, {Name: "pod0-6631b7_eth0", Mac: "11:22:33:44:33:22", Sandbox: "none"}, }), - existingIface: wrapperContainerInterface("pod0-6631b7", containerID, "pod0", generateUUID(t), mac, containerIP), + existingIface: wrapperContainerInterface("pod0-6631b7", containerID, "pod0", generateUUID(), mac, containerIP), netInterface: &net.Interface{ Name: "vEthernet (pod0-6631b7)", HardwareAddr: mac, @@ -737,7 +734,7 @@ func TestCmdCheck(t *testing.T) { {Name: "pod1-6631b7", Mac: "11:22:33:44:33:22", Sandbox: ""}, {Name: "pod1-6631b7_eth0", Mac: "11:22:33:44:33:22", Sandbox: "invalid-namespace"}, }), - existingIface: wrapperContainerInterface("pod1-6631b7", containerID, "pod1", generateUUID(t), mac, containerIP), + existingIface: wrapperContainerInterface("pod1-6631b7", containerID, "pod1", generateUUID(), mac, containerIP), netInterface: &net.Interface{ Name: "vEthernet (pod1-6631b7)", HardwareAddr: mac, @@ -759,7 +756,7 @@ func TestCmdCheck(t *testing.T) { {Name: "pod2-6631b7", Mac: "11:22:33:44:33:22", Sandbox: ""}, {Name: "eth0", Mac: "11:22:33:44:33:22", Sandbox: "none"}, }), - existingIface: wrapperContainerInterface("pod2-6631b7", containerID, "pod2", generateUUID(t), mac, containerIP), + existingIface: wrapperContainerInterface("pod2-6631b7", containerID, "pod2", generateUUID(), mac, containerIP), netInterface: &net.Interface{ Name: "vEthernet (pod2-6631b7)", HardwareAddr: mac, @@ -781,7 +778,7 @@ func TestCmdCheck(t *testing.T) { {Name: "pod3-6631b7", Mac: 
"11:22:33:44:33:22", Sandbox: ""}, {Name: "pod3-6631b7_eth0", Mac: "11:22:33:44:33:33", Sandbox: "none"}, }), - existingIface: wrapperContainerInterface("pod3-6631b7", containerID, "pod3", generateUUID(t), mac, containerIP), + existingIface: wrapperContainerInterface("pod3-6631b7", containerID, "pod3", generateUUID(), mac, containerIP), netInterface: &net.Interface{ Name: "vEthernet (pod3-6631b7)", HardwareAddr: mac, @@ -856,10 +853,10 @@ func TestReconcile(t *testing.T) { mockOFClient = openflowtest.NewMockClient(controller) ifaceStore = interfacestore.NewInterfaceStore() mockRoute = routetest.NewMockInterface(controller) - nodeName := "node1" + defer mockHostInterfaceExists()() defer mockGetHnsNetworkByName()() - missingEndpoint := getHnsEndpoint(generateUUID(t), "iface4") + missingEndpoint := getHnsEndpoint(generateUUID(), "iface4") testUtil := newHnsTestUtil(missingEndpoint.Id, []hcsshim.HNSEndpoint{*missingEndpoint}, false, true, nil, nil) testUtil.createHnsEndpoint(missingEndpoint) testUtil.setFunctions() @@ -867,105 +864,45 @@ func TestReconcile(t *testing.T) { cniServer := newCNIServer(t) cniServer.routeClient = mockRoute - gwMAC, _ := net.ParseMAC("00:00:11:11:11:11") - pods := []runtime.Object{ - &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "p1", - Namespace: testPodNamespace, - }, - Spec: v1.PodSpec{ - NodeName: nodeName, - }, - }, - &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "p2", - Namespace: testPodNamespace, - }, - Spec: v1.PodSpec{ - NodeName: nodeName, - HostNetwork: true, - }, - }, - &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "p4", - Namespace: testPodNamespace, - }, - Spec: v1.PodSpec{ - NodeName: nodeName, - }, - }, - } - kubeClient := fakeclientset.NewSimpleClientset(pods...) 
+ kubeClient := fakeclientset.NewSimpleClientset(pod1, pod2, pod3) cniServer.kubeClient = kubeClient - containerIfaces := map[string]*interfacestore.InterfaceConfig{ - "iface1": { - InterfaceName: "iface1", - Type: interfacestore.ContainerInterface, - OVSPortConfig: &interfacestore.OVSPortConfig{ - PortUUID: generateUUID(t), - OFPort: int32(3), - }, - ContainerInterfaceConfig: &interfacestore.ContainerInterfaceConfig{ - PodName: "p1", - PodNamespace: testPodNamespace, - ContainerID: generateUUID(t), - }, - }, - "iface3": { - InterfaceName: "iface3", - Type: interfacestore.ContainerInterface, - OVSPortConfig: &interfacestore.OVSPortConfig{ - PortUUID: generateUUID(t), - OFPort: int32(4), - }, - ContainerInterfaceConfig: &interfacestore.ContainerInterfaceConfig{ - PodName: "p3", - PodNamespace: testPodNamespace, - ContainerID: generateUUID(t), - }, - }, - "iface4": { - InterfaceName: "iface4", - Type: interfacestore.ContainerInterface, - OVSPortConfig: &interfacestore.OVSPortConfig{ - PortUUID: generateUUID(t), - OFPort: int32(-1), - }, - ContainerInterfaceConfig: &interfacestore.ContainerInterfaceConfig{ - PodName: "p4", - PodNamespace: testPodNamespace, - ContainerID: generateUUID(t), - }, - }, - } - for _, containerIface := range containerIfaces { + for _, containerIface := range []*interfacestore.InterfaceConfig{normalInterface, staleInterface, unconnectedInterface} { ifaceStore.AddInterface(containerIface) } - pod4IfaceName := "iface4" - pod4Iface := containerIfaces["iface4"] - waiter := newAsyncWaiter(pod4Iface.PodName, pod4Iface.ContainerID) + waiter := newAsyncWaiter(unconnectedInterface.PodName, unconnectedInterface.ContainerID) cniServer.podConfigurator, _ = newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, waiter.notifier, nil, false) cniServer.nodeConfig = &config.NodeConfig{Name: nodeName} // Re-install Pod1 flows - mockOFClient.EXPECT().InstallPodFlows("iface1", gomock.Any(), gomock.Any(), gomock.Any(), 
gomock.Any(), gomock.Any()).Times(1) + podFlowsInstalled := make(chan string, 2) + mockOFClient.EXPECT().InstallPodFlows(normalInterface.InterfaceName, normalInterface.IPs, normalInterface.MAC, uint32(normalInterface.OFPort), uint16(0), nil). + Do(func(interfaceName string, _ []net.IP, _ net.HardwareAddr, _ uint32, _ uint16, _ *uint32) { + podFlowsInstalled <- interfaceName + }).Times(1) // Uninstall Pod3 flows which is deleted. - iface := containerIfaces["iface3"] - mockOFClient.EXPECT().UninstallPodFlows("iface3").Return(nil).Times(1) - mockOVSBridgeClient.EXPECT().DeletePort(iface.PortUUID).Return(nil).Times(1) + mockOFClient.EXPECT().UninstallPodFlows(staleInterface.InterfaceName).Return(nil).Times(1) + mockOVSBridgeClient.EXPECT().DeletePort(staleInterface.PortUUID).Return(nil).Times(1) mockRoute.EXPECT().DeleteLocalAntreaFlexibleIPAMPodRule(gomock.Any()).Return(nil).Times(1) // Re-connect to Pod4 - hostIfaces.Store(fmt.Sprintf("vEthernet (%s)", pod4IfaceName), true) - mockOVSBridgeClient.EXPECT().SetInterfaceType(pod4IfaceName, "internal").Return(nil).Times(1) - mockOVSBridgeClient.EXPECT().GetOFPort(pod4IfaceName, true).Return(int32(5), nil).Times(1) - mockOFClient.EXPECT().InstallPodFlows(pod4IfaceName, gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(1) + hostIfaces.Store(fmt.Sprintf("vEthernet (%s)", unconnectedInterface.InterfaceName), true) + mockOVSBridgeClient.EXPECT().SetInterfaceType(unconnectedInterface.InterfaceName, "internal").Return(nil).Times(1) + mockOVSBridgeClient.EXPECT().GetOFPort(unconnectedInterface.InterfaceName, true).Return(int32(5), nil).Times(1) + mockOFClient.EXPECT().InstallPodFlows(unconnectedInterface.InterfaceName, unconnectedInterface.IPs, unconnectedInterface.MAC, uint32(5), uint16(0), nil). 
+ Do(func(interfaceName string, _ []net.IP, _ net.HardwareAddr, _ uint32, _ uint16, _ *uint32) { + podFlowsInstalled <- interfaceName + }).Times(1) err := cniServer.reconcile() assert.NoError(t, err) _, exists := ifaceStore.GetInterfaceByName("iface3") assert.False(t, exists) + for i := 0; i < 2; i++ { + select { + case <-podFlowsInstalled: + case <-time.After(500 * time.Millisecond): + t.Errorf("InstallPodFlows should be called 2 times but was only called %d times", i) + break + } + } waiter.wait() waiter.close() } diff --git a/pkg/agent/controller/networkpolicy/cache.go b/pkg/agent/controller/networkpolicy/cache.go index 70dd3a711f0..2eb162f7a37 100644 --- a/pkg/agent/controller/networkpolicy/cache.go +++ b/pkg/agent/controller/networkpolicy/cache.go @@ -551,13 +551,14 @@ func (c *ruleCache) addAddressGroupLocked(group *v1beta.AddressGroup) error { // PatchAddressGroup updates a cached *v1beta.AddressGroup. // The rules referencing it will be regarded as dirty. -func (c *ruleCache) PatchAddressGroup(patch *v1beta.AddressGroupPatch) error { +// It returns a copy of the patched AddressGroup, or an error if the AddressGroup doesn't exist. 
+func (c *ruleCache) PatchAddressGroup(patch *v1beta.AddressGroupPatch) (*v1beta.AddressGroup, error) { c.addressSetLock.Lock() defer c.addressSetLock.Unlock() groupMemberSet, exists := c.addressSetByGroup[patch.Name] if !exists { - return fmt.Errorf("AddressGroup %v doesn't exist in cache, can't be patched", patch.Name) + return nil, fmt.Errorf("AddressGroup %v doesn't exist in cache, can't be patched", patch.Name) } for i := range patch.AddedGroupMembers { groupMemberSet.Insert(&patch.AddedGroupMembers[i]) @@ -567,7 +568,16 @@ func (c *ruleCache) PatchAddressGroup(patch *v1beta.AddressGroupPatch) error { } c.onAddressGroupUpdate(patch.Name) - return nil + + members := make([]v1beta.GroupMember, 0, len(groupMemberSet)) + for _, member := range groupMemberSet { + members = append(members, *member) + } + group := &v1beta.AddressGroup{ + ObjectMeta: patch.ObjectMeta, + GroupMembers: members, + } + return group, nil } // DeleteAddressGroup deletes a cached *v1beta.AddressGroup. @@ -639,13 +649,14 @@ func (c *ruleCache) addAppliedToGroupLocked(group *v1beta.AppliedToGroup) error // PatchAppliedToGroup updates a cached *v1beta.AppliedToGroupPatch. // The rules referencing it will be regarded as dirty. -func (c *ruleCache) PatchAppliedToGroup(patch *v1beta.AppliedToGroupPatch) error { +// It returns a copy of the patched AppliedToGroup, or an error if the AppliedToGroup doesn't exist. 
+func (c *ruleCache) PatchAppliedToGroup(patch *v1beta.AppliedToGroupPatch) (*v1beta.AppliedToGroup, error) { c.appliedToSetLock.Lock() defer c.appliedToSetLock.Unlock() memberSet, exists := c.appliedToSetByGroup[patch.Name] if !exists { - return fmt.Errorf("AppliedToGroup %v doesn't exist in cache, can't be patched", patch.Name) + return nil, fmt.Errorf("AppliedToGroup %v doesn't exist in cache, can't be patched", patch.Name) } for i := range patch.AddedGroupMembers { memberSet.Insert(&patch.AddedGroupMembers[i]) @@ -654,7 +665,16 @@ func (c *ruleCache) PatchAppliedToGroup(patch *v1beta.AppliedToGroupPatch) error memberSet.Delete(&patch.RemovedGroupMembers[i]) } c.onAppliedToGroupUpdate(patch.Name) - return nil + + members := make([]v1beta.GroupMember, 0, len(memberSet)) + for _, member := range memberSet { + members = append(members, *member) + } + group := &v1beta.AppliedToGroup{ + ObjectMeta: patch.ObjectMeta, + GroupMembers: members, + } + return group, nil } // DeleteAppliedToGroup deletes a cached *v1beta.AppliedToGroup. 
diff --git a/pkg/agent/controller/networkpolicy/cache_test.go b/pkg/agent/controller/networkpolicy/cache_test.go index 0ced8235e26..dc68f2f5b13 100644 --- a/pkg/agent/controller/networkpolicy/cache_test.go +++ b/pkg/agent/controller/networkpolicy/cache_test.go @@ -1039,7 +1039,7 @@ func TestRuleCachePatchAppliedToGroup(t *testing.T) { for _, rule := range tt.rules { c.rules.Add(rule) } - err := c.PatchAppliedToGroup(tt.args) + ret, err := c.PatchAppliedToGroup(tt.args) if (err == nil) == tt.expectedErr { t.Fatalf("Got error %v, expected %t", err, tt.expectedErr) } @@ -1048,6 +1048,9 @@ func TestRuleCachePatchAppliedToGroup(t *testing.T) { } actualPods, _ := c.appliedToSetByGroup[tt.args.Name] assert.ElementsMatch(t, tt.expectedPods, actualPods.Items(), "stored Pods not equal") + if !tt.expectedErr { + assert.Equal(t, len(ret.GroupMembers), len(actualPods)) + } }) } } @@ -1116,7 +1119,7 @@ func TestRuleCachePatchAddressGroup(t *testing.T) { for _, rule := range tt.rules { c.rules.Add(rule) } - err := c.PatchAddressGroup(tt.args) + ret, err := c.PatchAddressGroup(tt.args) if (err == nil) == tt.expectedErr { t.Fatalf("Got error %v, expected %t", err, tt.expectedErr) } @@ -1125,6 +1128,9 @@ func TestRuleCachePatchAddressGroup(t *testing.T) { } actualAddresses, _ := c.addressSetByGroup[tt.args.Name] assert.ElementsMatch(t, tt.expectedAddresses, actualAddresses.Items(), "stored addresses not equal") + if !tt.expectedErr { + assert.Equal(t, len(ret.GroupMembers), len(actualAddresses)) + } }) } } diff --git a/pkg/agent/controller/networkpolicy/filestore.go b/pkg/agent/controller/networkpolicy/filestore.go new file mode 100644 index 00000000000..702a6b163f8 --- /dev/null +++ b/pkg/agent/controller/networkpolicy/filestore.go @@ -0,0 +1,134 @@ +// Copyright 2023 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package networkpolicy + +import ( + "fmt" + "io" + "io/fs" + "os" + "path/filepath" + + "github.com/spf13/afero" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/klog/v2" +) + +// fileStore encodes and stores runtime.Objects in files. Each object will be stored in a separate file under the given +// directory. +type fileStore struct { + fs afero.Fs + // The directory to store the files. + dir string + // serializer knows how to encode and decode the objects. + serializer runtime.Serializer +} + +func newFileStore(fs afero.Fs, dir string, serializer runtime.Serializer) (*fileStore, error) { + s := &fileStore{ + fs: fs, + dir: dir, + serializer: serializer, + } + klog.V(2).InfoS("Creating directory for NetworkPolicy cache", "dir", dir) + if err := s.fs.MkdirAll(dir, 0o700); err != nil { + return nil, err + } + return s, nil +} + +// save stores the given object in file with the object's UID as the file name, overwriting any existing content if the +// file already exists. Note the method may update the object's GroupVersionKind in-place during serialization. +func (s fileStore) save(item runtime.Object) error { + object := item.(metav1.Object) + path := filepath.Join(s.dir, string(object.GetUID())) + file, err := s.fs.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0o600) + if err != nil { + return fmt.Errorf("error opening file for writing object %v: %w", object.GetUID(), err) + } + defer file.Close() + // Encode may update the object's GroupVersionKind in-place during serialization.
+ err = s.serializer.Encode(item, file) + if err != nil { + return fmt.Errorf("error writing object %v to file: %w", object.GetUID(), err) + } + return nil +} + +// delete removes the file with the object's UID as the file name if it exists. +func (s fileStore) delete(item runtime.Object) error { + object := item.(metav1.Object) + path := filepath.Join(s.dir, string(object.GetUID())) + err := s.fs.Remove(path) + if err != nil { + if os.IsNotExist(err) { + return nil + } + return err + } + return nil +} + +// replaceAll replaces all files under the directory with the given objects. Existing files not in the given objects +// will be removed. Note the method may update the object's GroupVersionKind in-place during serialization. +func (s fileStore) replaceAll(items []runtime.Object) error { + if err := s.fs.RemoveAll(s.dir); err != nil { + return err + } + if err := s.fs.MkdirAll(s.dir, 0o700); err != nil { + return err + } + for _, item := range items { + if err := s.save(item); err != nil { + return err + } + } + return nil +} + +func (s fileStore) loadAll() ([]runtime.Object, error) { + var objects []runtime.Object + err := afero.Walk(s.fs, s.dir, func(path string, info fs.FileInfo, err error) error { + if info.IsDir() { + return nil + } + file, err2 := s.fs.Open(path) + if err2 != nil { + return err2 + } + defer file.Close() + data, err2 := io.ReadAll(file) + if err2 != nil { + return err2 + } + + object, gkv, err2 := s.serializer.Decode(data, nil, nil) + // If the data is corrupted somehow, we still want to load other data and continue the process. + if err2 != nil { + klog.ErrorS(err2, "Failed to decode data from file, ignore it", "file", path) + return nil + } + // Note: we haven't stored a different version so far but version conversion should be performed when the used + // version is upgraded in the future.
+ klog.V(2).InfoS("Loaded object from file", "gkv", gkv, "object", object) + objects = append(objects, object) + return nil + }) + if err != nil { + return nil, err + } + return objects, nil +} diff --git a/pkg/agent/controller/networkpolicy/filestore_test.go b/pkg/agent/controller/networkpolicy/filestore_test.go new file mode 100644 index 00000000000..71ef6a59c13 --- /dev/null +++ b/pkg/agent/controller/networkpolicy/filestore_test.go @@ -0,0 +1,190 @@ +// Copyright 2023 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package networkpolicy + +import ( + "fmt" + "testing" + + "github.com/google/uuid" + "github.com/spf13/afero" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/serializer/protobuf" + "k8s.io/apimachinery/pkg/types" + + "antrea.io/antrea/pkg/apis/controlplane/v1beta2" +) + +const ( + testDataPath = "/var/run/antrea-test/file-store" +) + +// Set it to NewMemMapFs as the file system may be not writable. +// Change it to NewOsFs to evaluate performance when writing to disk. +var newFS = afero.NewMemMapFs + +func newFakeFileStore(tb testing.TB, dir string) *fileStore { + serializer := protobuf.NewSerializer(scheme, scheme) + codec := codecs.CodecForVersions(serializer, serializer, v1beta2.SchemeGroupVersion, v1beta2.SchemeGroupVersion) + // Create a new FS for every fileStore in case of interaction between tests. 
+ fs := afero.NewBasePathFs(newFS(), testDataPath) + s, err := newFileStore(fs, dir, codec) + assert.NoError(tb, err) + return s +} + +func TestFileStore(t *testing.T) { + policy1 := newNetworkPolicy("policy1", "uid1", []string{"addressGroup1"}, nil, []string{"appliedToGroup1"}, nil) + policy2 := newNetworkPolicy("policy2", "uid2", []string{"addressGroup2"}, nil, []string{"appliedToGroup2"}, nil) + policy3 := newNetworkPolicy("policy3", "uid3", []string{"addressGroup3"}, nil, []string{"appliedToGroup3"}, nil) + updatedPolicy2 := policy2.DeepCopy() + updatedPolicy2.AppliedToGroups = []string{"foo"} + + tests := []struct { + name string + ops func(*fileStore) + expectedObjects []runtime.Object + }{ + { + name: "add", + ops: func(store *fileStore) { + store.save(policy1) + store.save(policy2) + store.save(policy3) + }, + expectedObjects: []runtime.Object{policy1, policy2, policy3}, + }, + { + name: "update", + ops: func(store *fileStore) { + store.save(policy1) + store.save(policy2) + store.save(updatedPolicy2) + }, + expectedObjects: []runtime.Object{policy1, updatedPolicy2}, + }, + { + name: "delete", + ops: func(store *fileStore) { + store.save(policy1) + store.save(policy2) + store.delete(policy2) + }, + expectedObjects: []runtime.Object{policy1}, + }, + { + name: "replace", + ops: func(store *fileStore) { + store.save(policy1) + store.save(policy2) + store.replaceAll([]runtime.Object{updatedPolicy2, policy3}) + }, + expectedObjects: []runtime.Object{updatedPolicy2, policy3}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := newFakeFileStore(t, networkPoliciesDir) + tt.ops(s) + gotObjects, err := s.loadAll() + require.NoError(t, err) + assert.Equal(t, tt.expectedObjects, gotObjects) + }) + } +} + +func BenchmarkFileStoreAddNetworkPolicy(b *testing.B) { + policy := newNetworkPolicy("policy1", types.UID(uuid.New().String()), []string{uuid.New().String()}, nil, []string{uuid.New().String()}, nil) + s := newFakeFileStore(b, 
networkPoliciesDir) + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + s.save(policy) + } +} + +func BenchmarkFileStoreAddAppliedToGroup(b *testing.B) { + members := make([]v1beta2.GroupMember, 0, 100) + for i := 0; i < 100; i++ { + members = append(members, *newAppliedToGroupMemberPod(fmt.Sprintf("pod-%d", i), "namespace")) + } + atg := newAppliedToGroup(uuid.New().String(), members) + s := newFakeFileStore(b, appliedToGroupsDir) + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + s.save(atg) + } +} + +func BenchmarkFileStoreAddAddressGroup(b *testing.B) { + members := make([]v1beta2.GroupMember, 0, 1000) + for i := 0; i < 1000; i++ { + members = append(members, *newAddressGroupPodMember(fmt.Sprintf("pod-%d", i), "namespace", "192.168.0.1")) + } + ag := newAddressGroup(uuid.New().String(), members) + s := newFakeFileStore(b, addressGroupsDir) + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + s.save(ag) + } +} + +func BenchmarkFileStoreReplaceAll(b *testing.B) { + nps := make([]runtime.Object, 0, 1000) + atgs := make([]runtime.Object, 0, 1000) + ags := make([]runtime.Object, 0, 1000) + for i := 0; i < 1000; i++ { + policyName := uuid.New().String() + addressGroupName := uuid.New().String() + appliedToGroupName := uuid.New().String() + nps = append(nps, newNetworkPolicy(policyName, types.UID(policyName), []string{addressGroupName}, nil, []string{appliedToGroupName}, nil)) + + var atgMembers []v1beta2.GroupMember + for j := 0; j < 100; j++ { + atgMembers = append(atgMembers, *newAppliedToGroupMemberPod(fmt.Sprintf("pod-%d", j), "namespace")) + } + atg := newAppliedToGroup(appliedToGroupName, atgMembers) + atgs = append(atgs, atg) + + var agMembers []v1beta2.GroupMember + podNum := 100 + if i < 10 { + podNum = 10000 + } else if i < 110 { + podNum = 1000 + } + for j := 0; j < podNum; j++ { + agMembers = append(agMembers, *newAddressGroupPodMember(fmt.Sprintf("pod-%d", j), "namespace", "192.168.0.1")) + } + ag := 
newAddressGroup(addressGroupName, agMembers) + ags = append(ags, ag) + } + + networkPolicyStore := newFakeFileStore(b, networkPoliciesDir) + appliedToGroupStore := newFakeFileStore(b, appliedToGroupsDir) + addressGroupStore := newFakeFileStore(b, addressGroupsDir) + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + networkPolicyStore.replaceAll(nps) + appliedToGroupStore.replaceAll(atgs) + addressGroupStore.replaceAll(ags) + } +} diff --git a/pkg/agent/controller/networkpolicy/networkpolicy_controller.go b/pkg/agent/controller/networkpolicy/networkpolicy_controller.go index 813b3ab0f07..71b6ecf9003 100644 --- a/pkg/agent/controller/networkpolicy/networkpolicy_controller.go +++ b/pkg/agent/controller/networkpolicy/networkpolicy_controller.go @@ -23,9 +23,12 @@ import ( "time" "antrea.io/ofnet/ofctrl" + "github.com/spf13/afero" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apimachinery/pkg/runtime/serializer/protobuf" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/util/workqueue" @@ -39,9 +42,11 @@ import ( "antrea.io/antrea/pkg/agent/openflow" proxytypes "antrea.io/antrea/pkg/agent/proxy/types" "antrea.io/antrea/pkg/agent/types" + "antrea.io/antrea/pkg/apis/controlplane/install" "antrea.io/antrea/pkg/apis/controlplane/v1beta2" "antrea.io/antrea/pkg/querier" "antrea.io/antrea/pkg/util/channel" + utilwait "antrea.io/antrea/pkg/util/wait" ) const ( @@ -58,6 +63,13 @@ const ( dnsInterceptRuleID = uint32(1) ) +const ( + dataPath = "/var/run/antrea/networkpolicy" + networkPoliciesDir = "network-policies" + appliedToGroupsDir = "applied-to-groups" + addressGroupsDir = "address-groups" +) + type L7RuleReconciler interface { AddRule(ruleID, policyName string, vlanID uint32, l7Protocols []v1beta2.L7Protocol, enableLogging bool) error DeleteRule(ruleID string, vlanID uint32) error @@ -65,6 +77,15 @@ 
type L7RuleReconciler interface { var emptyWatch = watch.NewEmptyWatch() +var ( + scheme = runtime.NewScheme() + codecs = serializer.NewCodecFactory(scheme) +) + +func init() { + install.Install(scheme) +} + type packetInAction func(*ofctrl.PacketIn) error // Controller is responsible for watching Antrea AddressGroups, AppliedToGroups, @@ -125,10 +146,17 @@ type Controller struct { fullSyncGroup sync.WaitGroup ifaceStore interfacestore.InterfaceStore // denyConnStore is for storing deny connections for flow exporter. - denyConnStore *connections.DenyConnectionStore - gwPort uint32 - tunPort uint32 - nodeConfig *config.NodeConfig + denyConnStore *connections.DenyConnectionStore + gwPort uint32 + tunPort uint32 + nodeConfig *config.NodeConfig + podNetworkWait *utilwait.Group + + // The fileStores store runtime.Objects in files and use them as the fallback data source when agent can't connect + // to antrea-controller on startup. + networkPolicyStore *fileStore + appliedToGroupStore *fileStore + addressGroupStore *fileStore logPacketAction packetInAction rejectRequestAction packetInAction @@ -139,6 +167,7 @@ type Controller struct { func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, ofClient openflow.Client, ifaceStore interfacestore.InterfaceStore, + fs afero.Fs, nodeName string, podUpdateSubscriber channel.Subscriber, externalEntityUpdateSubscriber channel.Subscriber, @@ -156,7 +185,8 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, v4Enabled bool, v6Enabled bool, gwPort, tunPort uint32, - nodeConfig *config.NodeConfig) (*Controller, error) { + nodeConfig *config.NodeConfig, + podNetworkWait *utilwait.Group) (*Controller, error) { idAllocator := newIDAllocator(asyncRuleDeleteInterval, dnsInterceptRuleID) c := &Controller{ antreaClientProvider: antreaClientGetter, @@ -172,6 +202,7 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, gwPort: gwPort, tunPort: tunPort, nodeConfig: 
nodeConfig, + podNetworkWait: podNetworkWait.Increment(), } if l7NetworkPolicyEnabled { @@ -179,8 +210,8 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, c.l7VlanIDAllocator = newL7VlanIDAllocator() } + var err error if antreaPolicyEnabled { - var err error if c.fqdnController, err = newFQDNController(ofClient, idAllocator, dnsServerOverride, c.enqueueRule, v4Enabled, v6Enabled, gwPort); err != nil { return nil, err } @@ -192,6 +223,23 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, c.reconciler = newReconciler(ofClient, ifaceStore, idAllocator, c.fqdnController, groupCounters, v4Enabled, v6Enabled, antreaPolicyEnabled, multicastEnabled) c.ruleCache = newRuleCache(c.enqueueRule, podUpdateSubscriber, externalEntityUpdateSubscriber, groupIDUpdates, nodeType) + + serializer := protobuf.NewSerializer(scheme, scheme) + codec := codecs.CodecForVersions(serializer, serializer, v1beta2.SchemeGroupVersion, v1beta2.SchemeGroupVersion) + fs = afero.NewBasePathFs(fs, dataPath) + c.networkPolicyStore, err = newFileStore(fs, networkPoliciesDir, codec) + if err != nil { + return nil, fmt.Errorf("error creating file store for NetworkPolicy: %w", err) + } + c.appliedToGroupStore, err = newFileStore(fs, appliedToGroupsDir, codec) + if err != nil { + return nil, fmt.Errorf("error creating file store for AppliedToGroup: %w", err) + } + c.addressGroupStore, err = newFileStore(fs, addressGroupsDir, codec) + if err != nil { + return nil, fmt.Errorf("error creating file store for AddressGroup: %w", err) + } + if statusManagerEnabled { c.statusManager = newStatusController(antreaClientGetter, nodeName, c.ruleCache) } @@ -238,6 +286,11 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, "policyName", policy.SourceRef.ToString()) return nil } + // Storing the object to file first because its GroupVersionKind can be updated in-place during + // serialization, which may incur data race if we add it 
to ruleCache first. + if err := c.networkPolicyStore.save(policy); err != nil { + klog.ErrorS(err, "Failed to store the NetworkPolicy to file", "policyName", policy.SourceRef.ToString()) + } c.ruleCache.AddNetworkPolicy(policy) klog.InfoS("NetworkPolicy applied to Pods on this Node", "policyName", policy.SourceRef.ToString()) return nil @@ -252,6 +305,11 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, "policyName", policy.SourceRef.ToString()) return nil } + // Storing the object to file first because its GroupVersionKind can be updated in-place during + // serialization, which may incur data race if we add it to ruleCache first. + if err := c.networkPolicyStore.save(policy); err != nil { + klog.ErrorS(err, "Failed to store the NetworkPolicy to file", "policyName", policy.SourceRef.ToString()) + } updated := c.ruleCache.UpdateNetworkPolicy(policy) // If any rule or the generation changes, we ensure statusManager will resync the policy's status once, in // case the changes don't cause any actual rule update but the whole policy's generation is changed. @@ -272,6 +330,9 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, } c.ruleCache.DeleteNetworkPolicy(policy) klog.InfoS("NetworkPolicy no longer applied to Pods on this Node", "policyName", policy.SourceRef.ToString()) + if err := c.networkPolicyStore.delete(policy); err != nil { + klog.ErrorS(err, "Failed to delete the NetworkPolicy from file", "policyName", policy.SourceRef.ToString()) + } return nil }, ReplaceFunc: func(objs []runtime.Object) error { @@ -296,9 +357,15 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, c.statusManager.Resync(policies[i].UID) } } + // Storing the object to file first because its GroupVersionKind can be updated in-place during + // serialization, which may incur data race if we add it to ruleCache first.
+ if err := c.networkPolicyStore.replaceAll(objs); err != nil { + klog.ErrorS(err, "Failed to store the NetworkPolicies to files") + } c.ruleCache.ReplaceNetworkPolicies(policies) return nil }, + FallbackFunc: c.networkPolicyStore.loadAll, fullSyncWaitGroup: &c.fullSyncGroup, fullSynced: false, } @@ -317,15 +384,28 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, if !ok { return fmt.Errorf("cannot convert to *v1beta1.AppliedToGroup: %v", obj) } + // Storing the object to file first because its GroupVersionKind can be updated in-place during + // serialization, which may incur data race if we add it to ruleCache first. + if err := c.appliedToGroupStore.save(group); err != nil { + klog.ErrorS(err, "Failed to store the AppliedToGroup to file", "groupName", group.Name) + } c.ruleCache.AddAppliedToGroup(group) return nil }, UpdateFunc: func(obj runtime.Object) error { - group, ok := obj.(*v1beta2.AppliedToGroupPatch) + patch, ok := obj.(*v1beta2.AppliedToGroupPatch) if !ok { - return fmt.Errorf("cannot convert to *v1beta1.AppliedToGroup: %v", obj) + return fmt.Errorf("cannot convert to *v1beta1.AppliedToGroupPatch: %v", obj) + } + group, err := c.ruleCache.PatchAppliedToGroup(patch) + if err != nil { + return err + } + // It's fine to store the object to file after applying the patch to ruleCache because the returned object + // is newly created, and ruleCache itself doesn't use it. 
+ if err := c.appliedToGroupStore.save(group); err != nil { + klog.ErrorS(err, "Failed to store the AppliedToGroup to file", "groupName", group.Name) } - c.ruleCache.PatchAppliedToGroup(group) return nil }, DeleteFunc: func(obj runtime.Object) error { @@ -334,6 +414,9 @@ return fmt.Errorf("cannot convert to *v1beta1.AppliedToGroup: %v", obj) } c.ruleCache.DeleteAppliedToGroup(group) + if err := c.appliedToGroupStore.delete(group); err != nil { + klog.ErrorS(err, "Failed to delete the AppliedToGroup from file", "groupName", group.Name) + } return nil }, ReplaceFunc: func(objs []runtime.Object) error { @@ -345,9 +428,15 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, return fmt.Errorf("cannot convert to *v1beta1.AppliedToGroup: %v", objs[i]) } } + // Storing the object to file first because its GroupVersionKind can be updated in-place during + // serialization, which may incur data race if we add it to ruleCache first. + if err := c.appliedToGroupStore.replaceAll(objs); err != nil { + klog.ErrorS(err, "Failed to store the AppliedToGroups to files") + } c.ruleCache.ReplaceAppliedToGroups(groups) return nil }, + FallbackFunc: c.appliedToGroupStore.loadAll, fullSyncWaitGroup: &c.fullSyncGroup, fullSynced: false, } @@ -366,15 +455,28 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, if !ok { return fmt.Errorf("cannot convert to *v1beta1.AddressGroup: %v", obj) } + // Storing the object to file first because its GroupVersionKind can be updated in-place during + // serialization, which may incur data race if we add it to ruleCache first.
+ if err := c.addressGroupStore.save(group); err != nil { + klog.ErrorS(err, "Failed to store the AddressGroup to file", "groupName", group.Name) + } c.ruleCache.AddAddressGroup(group) return nil }, UpdateFunc: func(obj runtime.Object) error { - group, ok := obj.(*v1beta2.AddressGroupPatch) + patch, ok := obj.(*v1beta2.AddressGroupPatch) if !ok { - return fmt.Errorf("cannot convert to *v1beta1.AddressGroup: %v", obj) + return fmt.Errorf("cannot convert to *v1beta1.AddressGroupPatch: %v", obj) + } + group, err := c.ruleCache.PatchAddressGroup(patch) + if err != nil { + return err + } + // It's fine to store the object to file after applying the patch to ruleCache because the returned object + // is newly created, and ruleCache itself doesn't use it. + if err := c.addressGroupStore.save(group); err != nil { + klog.ErrorS(err, "Failed to store the AddressGroup to file", "groupName", group.Name) } - c.ruleCache.PatchAddressGroup(group) return nil }, DeleteFunc: func(obj runtime.Object) error { @@ -383,6 +485,9 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, return fmt.Errorf("cannot convert to *v1beta1.AddressGroup: %v", obj) } c.ruleCache.DeleteAddressGroup(group) + if err := c.addressGroupStore.delete(group); err != nil { + klog.ErrorS(err, "Failed to delete the AddressGroup from file", "groupName", group.Name) + } return nil }, ReplaceFunc: func(objs []runtime.Object) error { @@ -394,9 +499,15 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, return fmt.Errorf("cannot convert to *v1beta1.AddressGroup: %v", objs[i]) } } + // Storing the object to file first because its GroupVersionKind can be updated in-place during + // serialization, which may incur data race if we add it to ruleCache first. 
+ if err := c.addressGroupStore.replaceAll(objs); err != nil { + klog.ErrorS(err, "Failed to store the AddressGroups to files") + } c.ruleCache.ReplaceAddressGroups(groups) return nil }, + FallbackFunc: c.addressGroupStore.loadAll, fullSyncWaitGroup: &c.fullSyncGroup, fullSynced: false, } @@ -506,6 +617,7 @@ func (c *Controller) Run(stopCh <-chan struct{}) { klog.Infof("All watchers have completed full sync, installing flows for init events") // Batch install all rules in queue after fullSync is finished. c.processAllItemsInQueue() + c.podNetworkWait.Done() klog.Infof("Starting NetworkPolicy workers now") defer c.queue.ShutDown() @@ -744,6 +856,8 @@ type watcher struct { DeleteFunc func(obj runtime.Object) error // ReplaceFunc is the function that handles init events. ReplaceFunc func(objs []runtime.Object) error + // FallbackFunc is the function that provides the data when it can't start the watch successfully. + FallbackFunc func() ([]runtime.Object, error) // connected represents whether the watch has connected to apiserver successfully. connected bool // lock protects connected. @@ -766,17 +880,46 @@ func (w *watcher) setConnected(connected bool) { w.connected = connected } +// fallback gets init events from the FallbackFunc if the watcher hasn't been synced once. +func (w *watcher) fallback() { + // If the watcher has been synced once, the fallback data source doesn't have newer data, do nothing.
+ if w.fullSynced {
+ return
+ }
+ klog.InfoS("Getting init events from fallback", "objectType", w.objectType)
+ objects, err := w.FallbackFunc()
+ if err != nil {
+ klog.ErrorS(err, "Failed to get init events from fallback", "objectType", w.objectType)
+ return
+ }
+ if err := w.ReplaceFunc(objects); err != nil {
+ klog.ErrorS(err, "Failed to handle init events")
+ return
+ }
+ w.onFullSync()
+}
+
+func (w *watcher) onFullSync() {
+ if !w.fullSynced {
+ w.fullSynced = true
+ // Notify fullSyncWaitGroup that all events before bookmark is handled
+ w.fullSyncWaitGroup.Done()
+ }
+}
+
 func (w *watcher) watch() {
klog.Infof("Starting watch for %s", w.objectType)
watcher, err := w.watchFunc()
if err != nil {
klog.Warningf("Failed to start watch for %s: %v", w.objectType, err)
+ w.fallback()
return
}
// Watch method doesn't return error but "emptyWatch" in case of some partial data errors,
// e.g. timeout error. Make sure that watcher is not empty and log warning otherwise.
if reflect.TypeOf(watcher) == reflect.TypeOf(emptyWatch) {
klog.Warningf("Failed to start watch for %s, please ensure antrea service is reachable for the agent", w.objectType)
+ w.fallback()
return
}

@@ -817,11 +960,7 @@ loop:
klog.Errorf("Failed to handle init events: %v", err)
return
}
- if !w.fullSynced {
- w.fullSynced = true
- // Notify fullSyncWaitGroup that all events before bookmark is handled
- w.fullSyncWaitGroup.Done()
- }
+ w.onFullSync()

for {
select {
diff --git a/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go b/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go
index 54c591c1e91..5eb33646735 100644
--- a/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go
+++ b/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go
@@ -15,17 +15,21 @@ package networkpolicy
import (
+ "encoding/base64"
"fmt"
"net"
+ "os"
"strings"
"sync"
"testing"
"time"

"github.com/prometheus/client_golang/prometheus/testutil"
+ "github.com/spf13/afero"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/watch" @@ -43,6 +47,7 @@ import ( "antrea.io/antrea/pkg/client/clientset/versioned/fake" "antrea.io/antrea/pkg/querier" "antrea.io/antrea/pkg/util/channel" + "antrea.io/antrea/pkg/util/wait" ) const testNamespace = "ns1" @@ -71,7 +76,8 @@ func newTestController() (*Controller, *fake.Clientset, *mockReconciler) { ch2 := make(chan string, 100) groupIDAllocator := openflow.NewGroupAllocator() groupCounters := []proxytypes.GroupCounter{proxytypes.NewGroupCounter(groupIDAllocator, ch2)} - controller, _ := NewNetworkPolicyController(&antreaClientGetter{clientset}, nil, nil, "node1", podUpdateChannel, nil, groupCounters, ch2, true, true, true, true, false, true, testAsyncDeleteInterval, "8.8.8.8:53", config.K8sNode, true, false, config.HostGatewayOFPort, config.DefaultTunOFPort, &config.NodeConfig{}) + fs := afero.NewMemMapFs() + controller, _ := NewNetworkPolicyController(&antreaClientGetter{clientset}, nil, nil, fs, "node1", podUpdateChannel, nil, groupCounters, ch2, true, true, true, true, false, true, testAsyncDeleteInterval, "8.8.8.8:53", config.K8sNode, true, false, config.HostGatewayOFPort, config.DefaultTunOFPort, &config.NodeConfig{}, wait.NewGroup()) reconciler := newMockReconciler() controller.reconciler = reconciler controller.antreaPolicyLogger = nil @@ -146,14 +152,16 @@ var _ Reconciler = &mockReconciler{} func newAddressGroup(name string, addresses []v1beta2.GroupMember) *v1beta2.AddressGroup { return &v1beta2.AddressGroup{ - ObjectMeta: v1.ObjectMeta{Name: name}, + TypeMeta: v1.TypeMeta{Kind: "AddressGroup", APIVersion: "controlplane.antrea.io/v1beta2"}, + ObjectMeta: v1.ObjectMeta{Name: name, UID: types.UID(name)}, GroupMembers: addresses, } } func newAppliedToGroup(name string, pods []v1beta2.GroupMember) 
*v1beta2.AppliedToGroup { return &v1beta2.AppliedToGroup{ - ObjectMeta: v1.ObjectMeta{Name: name}, + TypeMeta: v1.TypeMeta{Kind: "AppliedToGroup", APIVersion: "controlplane.antrea.io/v1beta2"}, + ObjectMeta: v1.ObjectMeta{Name: name, UID: types.UID(name)}, GroupMembers: pods, } } @@ -165,6 +173,7 @@ func newNetworkPolicy(name string, uid types.UID, from, to, appliedTo []string, } networkPolicyRule1 := newPolicyRule(dir, from, to, services) return &v1beta2.NetworkPolicy{ + TypeMeta: v1.TypeMeta{Kind: "NetworkPolicy", APIVersion: "controlplane.antrea.io/v1beta2"}, ObjectMeta: v1.ObjectMeta{UID: uid, Name: string(uid)}, Rules: []v1beta2.NetworkPolicyRule{networkPolicyRule1}, AppliedToGroups: appliedTo, @@ -507,6 +516,176 @@ func TestAddNetworkPolicyWithMultipleRules(t *testing.T) { assert.Equal(t, 1, controller.GetAppliedToGroupNum()) } +func writeToFile(t *testing.T, fs afero.Fs, dir, file string, base64Str string) { + data, err := base64.StdEncoding.DecodeString(base64Str) + require.NoError(t, err) + f, err := fs.OpenFile(dir+"/"+file, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0o600) + require.NoError(t, err) + defer f.Close() + _, err = f.Write(data) + require.NoError(t, err) +} + +func TestFallbackToFileStore(t *testing.T) { + prepareMockTables() + tests := []struct { + name string + initFileStore func(networkPolicyStore, appliedToGroupStore, addressGroupStore *fileStore) + expectedRule *CompletedRule + }{ + { + name: "same storage version", + initFileStore: func(networkPolicyStore, appliedToGroupStore, addressGroupStore *fileStore) { + networkPolicyStore.save(newNetworkPolicy("policy1", "uid1", []string{"addressGroup1"}, nil, []string{"appliedToGroup1"}, nil)) + appliedToGroupStore.save(newAppliedToGroup("appliedToGroup1", []v1beta2.GroupMember{*newAppliedToGroupMemberPod("pod1", "namespace")})) + addressGroupStore.save(newAddressGroup("addressGroup1", []v1beta2.GroupMember{*newAddressGroupPodMember("pod2", "namespace", "192.168.0.1")})) + }, + expectedRule: 
&CompletedRule{ + rule: &rule{ + Direction: v1beta2.DirectionIn, + From: v1beta2.NetworkPolicyPeer{AddressGroups: []string{"addressGroup1"}}, + MaxPriority: -1, + AppliedToGroups: []string{"appliedToGroup1"}, + PolicyUID: "uid1", + PolicyName: "uid1", + SourceRef: &v1beta2.NetworkPolicyReference{ + Type: v1beta2.K8sNetworkPolicy, + Namespace: testNamespace, + Name: "policy1", + UID: "uid1", + }, + }, + FromAddresses: v1beta2.NewGroupMemberSet(newAddressGroupPodMember("pod2", "namespace", "192.168.0.1")), + TargetMembers: v1beta2.NewGroupMemberSet(newAppliedToGroupMemberPod("pod1", "namespace")), + }, + }, + { + // The test is to ensure compatibility with v1beta2 storage version if one day the used version is upgraded. + name: "compatible with v1beta2", + initFileStore: func(networkPolicyStore, appliedToGroupStore, addressGroupStore *fileStore) { + // The bytes of v1beta2 objects serialized in protobuf. + // They are not supposed to be updated when bumping up the used version. + base64EncodedPolicy := "azhzAAovCh5jb250cm9scGxhbmUuYW50cmVhLmlvL3YxYmV0YTISDU5ldHdvcmtQb2xpY3kSdAoYCgR1aWQxEgAaACIAKgR1aWQxMgA4AEIAEh8KAkluEg8KDWFkZHJlc3NHcm91cDEaACgAOABKAFoAGg9hcHBsaWVkVG9Hcm91cDEyJgoQSzhzTmV0d29ya1BvbGljeRIDbnMxGgdwb2xpY3kxIgR1aWQxGgAiAA==" + base64EncodedAppliedToGroup := "azhzAAowCh5jb250cm9scGxhbmUuYW50cmVhLmlvL3YxYmV0YTISDkFwcGxpZWRUb0dyb3VwEkUKLgoPYXBwbGllZFRvR3JvdXAxEgAaACIAKg9hcHBsaWVkVG9Hcm91cDEyADgAQgASEwoRCgRwb2QxEgluYW1lc3BhY2UaACIA" + base64EncodedAddressGroup := "azhzAAouCh5jb250cm9scGxhbmUuYW50cmVhLmlvL3YxYmV0YTISDEFkZHJlc3NHcm91cBJTCioKDWFkZHJlc3NHcm91cDESABoAIgAqDWFkZHJlc3NHcm91cDEyADgAQgASJQoRCgRwb2QyEgluYW1lc3BhY2UaEAAAAAAAAAAAAAD//8CoAAEaACIA" + writeToFile(t, networkPolicyStore.fs, networkPoliciesDir, "uid1", base64EncodedPolicy) + writeToFile(t, appliedToGroupStore.fs, appliedToGroupsDir, "appliedToGroup1", base64EncodedAppliedToGroup) + writeToFile(t, addressGroupStore.fs, addressGroupsDir, "addressGroup1", base64EncodedAddressGroup) + }, + 
expectedRule: &CompletedRule{ + rule: &rule{ + Direction: v1beta2.DirectionIn, + From: v1beta2.NetworkPolicyPeer{AddressGroups: []string{"addressGroup1"}}, + MaxPriority: -1, + AppliedToGroups: []string{"appliedToGroup1"}, + PolicyUID: "uid1", + PolicyName: "uid1", + SourceRef: &v1beta2.NetworkPolicyReference{ + Type: v1beta2.K8sNetworkPolicy, + Namespace: testNamespace, + Name: "policy1", + UID: "uid1", + }, + }, + FromAddresses: v1beta2.NewGroupMemberSet( + &v1beta2.GroupMember{ + Pod: &v1beta2.PodReference{Name: "pod2", Namespace: "namespace"}, + IPs: []v1beta2.IPAddress{v1beta2.IPAddress(net.ParseIP("192.168.0.1"))}, + }, + ), + TargetMembers: v1beta2.NewGroupMemberSet( + &v1beta2.GroupMember{ + Pod: &v1beta2.PodReference{Name: "pod1", Namespace: "namespace"}, + }, + ), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + controller, clientset, reconciler := newTestController() + addressGroupWatcher := watch.NewFake() + appliedToGroupWatcher := watch.NewFake() + networkPolicyWatcher := watch.NewFake() + clientset.AddWatchReactor("addressgroups", k8stesting.DefaultWatchReactor(addressGroupWatcher, fmt.Errorf("network unavailable"))) + clientset.AddWatchReactor("appliedtogroups", k8stesting.DefaultWatchReactor(appliedToGroupWatcher, fmt.Errorf("network unavailable"))) + clientset.AddWatchReactor("networkpolicies", k8stesting.DefaultWatchReactor(networkPolicyWatcher, fmt.Errorf("network unavailable"))) + + tt.initFileStore(controller.networkPolicyStore, controller.appliedToGroupStore, controller.addressGroupStore) + + stopCh := make(chan struct{}) + defer close(stopCh) + go controller.Run(stopCh) + + select { + case ruleID := <-reconciler.updated: + actualRule, _ := reconciler.getLastRealized(ruleID) + // Rule ID is a hash value, we don't care about its exact value. 
+ actualRule.ID = "" + assert.Equal(t, tt.expectedRule, actualRule) + case <-time.After(time.Second): + t.Fatal("Expected one rule update, got timeout") + } + }) + } +} + +func TestOverrideFileStore(t *testing.T) { + prepareMockTables() + controller, clientset, reconciler := newTestController() + addressGroupWatcher := watch.NewFake() + appliedToGroupWatcher := watch.NewFake() + networkPolicyWatcher := watch.NewFake() + clientset.AddWatchReactor("addressgroups", k8stesting.DefaultWatchReactor(addressGroupWatcher, nil)) + clientset.AddWatchReactor("appliedtogroups", k8stesting.DefaultWatchReactor(appliedToGroupWatcher, nil)) + clientset.AddWatchReactor("networkpolicies", k8stesting.DefaultWatchReactor(networkPolicyWatcher, nil)) + + policy1 := newNetworkPolicy("policy1", "uid1", []string{"addressGroup1"}, nil, []string{"appliedToGroup1"}, nil) + policy2 := newNetworkPolicy("policy2", "uid2", []string{"addressGroup2"}, nil, []string{"appliedToGroup2"}, nil) + atgMember1 := newAppliedToGroupMemberPod("pod1", "namespace") + atgMember2 := newAppliedToGroupMemberPod("pod2", "namespace") + agMember1 := newAddressGroupPodMember("pod3", "namespace", "192.168.0.1") + agMember2 := newAddressGroupPodMember("pod4", "namespace", "192.168.0.2") + atg1 := newAppliedToGroup("appliedToGroup1", []v1beta2.GroupMember{*atgMember1}) + atg2 := newAppliedToGroup("appliedToGroup2", []v1beta2.GroupMember{*atgMember2}) + ag1 := newAddressGroup("addressGroup1", []v1beta2.GroupMember{*agMember1}) + ag2 := newAddressGroup("addressGroup2", []v1beta2.GroupMember{*agMember2}) + controller.networkPolicyStore.save(policy1) + controller.appliedToGroupStore.save(atg1) + controller.addressGroupStore.save(ag1) + + stopCh := make(chan struct{}) + defer close(stopCh) + go controller.Run(stopCh) + + networkPolicyWatcher.Add(policy2) + networkPolicyWatcher.Action(watch.Bookmark, nil) + addressGroupWatcher.Add(ag2) + addressGroupWatcher.Action(watch.Bookmark, nil) + appliedToGroupWatcher.Add(atg2) + 
appliedToGroupWatcher.Action(watch.Bookmark, nil) + + select { + case ruleID := <-reconciler.updated: + actualRule, _ := reconciler.getLastRealized(ruleID) + assert.Equal(t, v1beta2.NewGroupMemberSet(atgMember2), actualRule.TargetMembers) + assert.Equal(t, v1beta2.NewGroupMemberSet(agMember2), actualRule.FromAddresses) + assert.Equal(t, policy2.SourceRef, actualRule.SourceRef) + case <-time.After(time.Second): + t.Fatal("Expected one rule update, got timeout") + } + + objects, err := controller.appliedToGroupStore.loadAll() + require.NoError(t, err) + assert.Equal(t, []runtime.Object{atg2}, objects) + objects, err = controller.addressGroupStore.loadAll() + require.NoError(t, err) + assert.Equal(t, []runtime.Object{ag2}, objects) + objects, err = controller.networkPolicyStore.loadAll() + require.NoError(t, err) + assert.Equal(t, []runtime.Object{policy2}, objects) +} + func TestNetworkPolicyMetrics(t *testing.T) { prepareMockTables() // Initialize NetworkPolicy metrics (prometheus) diff --git a/pkg/agent/proxy/proxier_test.go b/pkg/agent/proxy/proxier_test.go index 4480575dace..69dcaede88a 100644 --- a/pkg/agent/proxy/proxier_test.go +++ b/pkg/agent/proxy/proxier_test.go @@ -155,6 +155,7 @@ func makeTestClusterIPService(svcPortName *k8sproxy.ServicePortName, nested bool, labels map[string]string) *corev1.Service { return makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *corev1.Service) { + svc.Spec.Type = corev1.ServiceTypeClusterIP svc.Spec.ClusterIP = clusterIP.String() svc.Spec.Ports = []corev1.ServicePort{{ Name: svcPortName.Port, @@ -2672,6 +2673,7 @@ func testServiceExternalTrafficPolicyUpdate(t *testing.T, nodePortAddresses []net.IP, svcIP net.IP, loadBalancerIP net.IP, + externalIP net.IP, ep1IP net.IP, ep2IP net.IP, svcType corev1.ServiceType, @@ -2683,12 +2685,17 @@ func testServiceExternalTrafficPolicyUpdate(t *testing.T, var svc, updatedSvc *corev1.Service switch svcType { + case corev1.ServiceTypeClusterIP: + // ExternalTrafficPolicy 
defaults to Cluster. + svc = makeTestClusterIPService(&svcPortName, svcIP, []net.IP{externalIP}, int32(svcPort), corev1.ProtocolTCP, nil, nil, false, nil) + updatedSvc = svc.DeepCopy() + updatedSvc.Spec.ExternalTrafficPolicy = corev1.ServiceExternalTrafficPolicyTypeLocal case corev1.ServiceTypeNodePort: - svc = makeTestNodePortService(&svcPortName, svcIP, nil, int32(svcPort), int32(svcNodePort), corev1.ProtocolTCP, nil, corev1.ServiceInternalTrafficPolicyCluster, corev1.ServiceExternalTrafficPolicyTypeCluster) - updatedSvc = makeTestNodePortService(&svcPortName, svcIP, nil, int32(svcPort), int32(svcNodePort), corev1.ProtocolTCP, nil, corev1.ServiceInternalTrafficPolicyCluster, corev1.ServiceExternalTrafficPolicyTypeLocal) + svc = makeTestNodePortService(&svcPortName, svcIP, []net.IP{externalIP}, int32(svcPort), int32(svcNodePort), corev1.ProtocolTCP, nil, corev1.ServiceInternalTrafficPolicyCluster, corev1.ServiceExternalTrafficPolicyTypeCluster) + updatedSvc = makeTestNodePortService(&svcPortName, svcIP, []net.IP{externalIP}, int32(svcPort), int32(svcNodePort), corev1.ProtocolTCP, nil, corev1.ServiceInternalTrafficPolicyCluster, corev1.ServiceExternalTrafficPolicyTypeLocal) case corev1.ServiceTypeLoadBalancer: - svc = makeTestLoadBalancerService(&svcPortName, svcIP, nil, []net.IP{loadBalancerIP}, int32(svcPort), int32(svcNodePort), corev1.ProtocolTCP, nil, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) - updatedSvc = makeTestLoadBalancerService(&svcPortName, svcIP, nil, []net.IP{loadBalancerIP}, int32(svcPort), int32(svcNodePort), corev1.ProtocolTCP, nil, nil, corev1.ServiceExternalTrafficPolicyTypeLocal) + svc = makeTestLoadBalancerService(&svcPortName, svcIP, []net.IP{externalIP}, []net.IP{loadBalancerIP}, int32(svcPort), int32(svcNodePort), corev1.ProtocolTCP, nil, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) + updatedSvc = makeTestLoadBalancerService(&svcPortName, svcIP, []net.IP{externalIP}, []net.IP{loadBalancerIP}, int32(svcPort), 
int32(svcNodePort), corev1.ProtocolTCP, nil, nil, corev1.ServiceExternalTrafficPolicyTypeLocal) } makeServiceMap(fp, svc) @@ -2719,6 +2726,14 @@ func testServiceExternalTrafficPolicyUpdate(t *testing.T, Protocol: bindingProtocol, ClusterGroupID: 1, }).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ + ServiceIP: externalIP, + ServicePort: uint16(svcPort), + Protocol: bindingProtocol, + ClusterGroupID: 1, + IsExternal: true, + }).Times(1) + mockRouteClient.EXPECT().AddExternalIPRoute(externalIP).Times(1) if svcType == corev1.ServiceTypeNodePort || svcType == corev1.ServiceTypeLoadBalancer { mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ @@ -2750,6 +2765,7 @@ func testServiceExternalTrafficPolicyUpdate(t *testing.T, mockOFClient.EXPECT().InstallServiceGroup(binding.GroupIDType(1), false, gomock.InAnyOrder(expectedAllEps)).Times(1) mockOFClient.EXPECT().InstallServiceGroup(binding.GroupIDType(2), false, expectedLocalEps).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), bindingProtocol).Times(1) + mockOFClient.EXPECT().UninstallServiceFlows(externalIP, uint16(svcPort), bindingProtocol).Times(1) mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ ServiceIP: svcIP, ServicePort: uint16(svcPort), @@ -2757,6 +2773,17 @@ func testServiceExternalTrafficPolicyUpdate(t *testing.T, LocalGroupID: 2, ClusterGroupID: 1, }).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ + ServiceIP: externalIP, + ServicePort: uint16(svcPort), + Protocol: bindingProtocol, + LocalGroupID: 2, + ClusterGroupID: 1, + TrafficPolicyLocal: true, + IsExternal: true, + }).Times(1) + mockRouteClient.EXPECT().DeleteExternalIPRoute(externalIP).Times(1) + mockRouteClient.EXPECT().AddExternalIPRoute(externalIP).Times(1) if svcType == corev1.ServiceTypeNodePort || svcType == corev1.ServiceTypeLoadBalancer { s1 := mockOFClient.EXPECT().UninstallServiceFlows(vIP, uint16(svcNodePort), 
bindingProtocol).Times(1) @@ -2798,19 +2825,25 @@ func testServiceExternalTrafficPolicyUpdate(t *testing.T, func TestServiceExternalTrafficPolicyUpdate(t *testing.T) { t.Run("IPv4", func(t *testing.T) { + t.Run("ClusterIP", func(t *testing.T) { + testServiceExternalTrafficPolicyUpdate(t, nil, svc1IPv4, nil, externalIPv4, ep1IPv4, ep2IPv4, corev1.ServiceTypeClusterIP, false) + }) t.Run("NodePort", func(t *testing.T) { - testServiceExternalTrafficPolicyUpdate(t, nodePortAddressesIPv4, svc1IPv4, nil, ep1IPv4, ep2IPv4, corev1.ServiceTypeNodePort, false) + testServiceExternalTrafficPolicyUpdate(t, nodePortAddressesIPv4, svc1IPv4, nil, externalIPv4, ep1IPv4, ep2IPv4, corev1.ServiceTypeNodePort, false) }) t.Run("LoadBalancer", func(t *testing.T) { - testServiceExternalTrafficPolicyUpdate(t, nodePortAddressesIPv4, svc1IPv4, loadBalancerIPv4, ep1IPv4, ep2IPv4, corev1.ServiceTypeLoadBalancer, false) + testServiceExternalTrafficPolicyUpdate(t, nodePortAddressesIPv4, svc1IPv4, loadBalancerIPv4, externalIPv4, ep1IPv4, ep2IPv4, corev1.ServiceTypeLoadBalancer, false) }) }) t.Run("IPv6", func(t *testing.T) { + t.Run("ClusterIP", func(t *testing.T) { + testServiceExternalTrafficPolicyUpdate(t, nil, svc1IPv6, nil, externalIPv6, ep1IPv6, ep2IPv6, corev1.ServiceTypeClusterIP, true) + }) t.Run("NodePort", func(t *testing.T) { - testServiceExternalTrafficPolicyUpdate(t, nodePortAddressesIPv6, svc1IPv6, nil, ep1IPv6, ep2IPv6, corev1.ServiceTypeNodePort, true) + testServiceExternalTrafficPolicyUpdate(t, nodePortAddressesIPv6, svc1IPv6, nil, externalIPv6, ep1IPv6, ep2IPv6, corev1.ServiceTypeNodePort, true) }) t.Run("LoadBalancer", func(t *testing.T) { - testServiceExternalTrafficPolicyUpdate(t, nodePortAddressesIPv6, svc1IPv6, loadBalancerIPv6, ep1IPv6, ep2IPv6, corev1.ServiceTypeLoadBalancer, true) + testServiceExternalTrafficPolicyUpdate(t, nodePortAddressesIPv6, svc1IPv6, loadBalancerIPv6, externalIPv6, ep1IPv6, ep2IPv6, corev1.ServiceTypeLoadBalancer, true) }) }) } diff --git 
a/pkg/agent/route/route_linux.go b/pkg/agent/route/route_linux.go index e4c6c4dfcec..584c777a466 100644 --- a/pkg/agent/route/route_linux.go +++ b/pkg/agent/route/route_linux.go @@ -182,15 +182,17 @@ func (c *Client) Initialize(nodeConfig *config.NodeConfig, done func()) error { return fmt.Errorf("failed to initialize ip routes: %v", err) } + // Ensure IPv4 forwarding is enabled if it is a dual-stack or IPv4-only cluster. + if c.nodeConfig.NodeIPv4Addr != nil { + if err := sysctl.EnsureSysctlNetValue("ipv4/ip_forward", 1); err != nil { + return fmt.Errorf("failed to enable IPv4 forwarding: %w", err) + } + } + // Ensure IPv6 forwarding is enabled if it is a dual-stack or IPv6-only cluster. if c.nodeConfig.NodeIPv6Addr != nil { - sysctlFilename := "ipv6/conf/all/forwarding" - v, err := sysctl.GetSysctlNet(sysctlFilename) - if err != nil { - return fmt.Errorf("failed to read value of sysctl file: %s", sysctlFilename) - } - if v != 1 { - return fmt.Errorf("IPv6 forwarding is not enabled") + if err := sysctl.EnsureSysctlNetValue("ipv6/conf/all/forwarding", 1); err != nil { + return fmt.Errorf("failed to enable IPv6 forwarding: %w", err) } } diff --git a/pkg/util/ip/ip.go b/pkg/util/ip/ip.go index bd82ea18989..58563e2274c 100644 --- a/pkg/util/ip/ip.go +++ b/pkg/util/ip/ip.go @@ -195,6 +195,14 @@ func MustParseCIDR(cidr string) *net.IPNet { return ipNet } +func MustParseMAC(mac string) net.HardwareAddr { + addr, err := net.ParseMAC(mac) + if err != nil { + panic(fmt.Errorf("cannot parse '%v': %v", mac, err)) + } + return addr +} + // IPNetEqual returns if the provided IPNets are the same subnet. 
func IPNetEqual(ipNet1, ipNet2 *net.IPNet) bool { if ipNet1 == nil && ipNet2 == nil { diff --git a/pkg/util/wait/wait.go b/pkg/util/wait/wait.go new file mode 100644 index 00000000000..6897ec2fb24 --- /dev/null +++ b/pkg/util/wait/wait.go @@ -0,0 +1,85 @@ +// Copyright 2023 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package wait + +import ( + "fmt" + "sync" + "time" + + "k8s.io/utils/clock" +) + +// Group allows to wait for a collection of goroutines to finish with a timeout or a stop channel. 
+type Group struct { + wg *sync.WaitGroup + doneCh chan struct{} + once sync.Once + clock clock.Clock +} + +func NewGroup() *Group { + return newGroupWithClock(clock.RealClock{}) +} + +func newGroupWithClock(clock clock.Clock) *Group { + return &Group{ + wg: &sync.WaitGroup{}, + doneCh: make(chan struct{}), + clock: clock, + } +} + +func (g *Group) Increment() *Group { + g.wg.Add(1) + return g +} + +func (g *Group) Done() { + g.wg.Done() +} + +func (g *Group) wait() { + g.once.Do(func() { + go func() { + g.wg.Wait() + close(g.doneCh) + }() + }) +} + +func (g *Group) WaitWithTimeout(timeout time.Duration) error { + g.wait() + select { + case <-g.doneCh: + return nil + case <-g.clock.After(timeout): + return fmt.Errorf("timeout waiting for group") + } +} + +func (g *Group) WaitUntil(stopCh <-chan struct{}) error { + g.wait() + select { + case <-g.doneCh: + return nil + case <-stopCh: + return fmt.Errorf("stopCh closed, stop waiting") + } +} + +func (g *Group) Wait() { + g.wg.Wait() +} diff --git a/pkg/util/wait/wait_test.go b/pkg/util/wait/wait_test.go new file mode 100644 index 00000000000..f471685b1d9 --- /dev/null +++ b/pkg/util/wait/wait_test.go @@ -0,0 +1,133 @@ +// Copyright 2023 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package wait + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + clock "k8s.io/utils/clock/testing" +) + +func TestGroupWaitWithTimeout(t *testing.T) { + const timeout = 100 * time.Millisecond + tests := []struct { + name string + add int + processFn func(group *Group, fakeClock *clock.FakeClock) + expectWaitErr bool + }{ + { + name: "add only", + add: 1, + processFn: func(group *Group, fakeClock *clock.FakeClock) { + fakeClock.Step(timeout) + }, + expectWaitErr: true, + }, + { + name: "add greater than done", + add: 2, + processFn: func(group *Group, fakeClock *clock.FakeClock) { + group.Done() + fakeClock.Step(timeout) + }, + expectWaitErr: true, + }, + { + name: "add equal to done", + add: 2, + processFn: func(group *Group, fakeClock *clock.FakeClock) { + group.Done() + fakeClock.Step(timeout / 2) + group.Done() + }, + expectWaitErr: false, + }, + { + name: "add with delay", + add: 2, + processFn: func(group *Group, fakeClock *clock.FakeClock) { + group.Done() + fakeClock.Step(timeout * 2) + group.Done() + }, + expectWaitErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + fakeClock := clock.NewFakeClock(time.Now()) + g := newGroupWithClock(fakeClock) + for i := 0; i < tt.add; i++ { + g.Increment() + } + resCh := make(chan error, 1) + go func() { + resCh <- g.WaitWithTimeout(timeout) + }() + require.Eventually(t, func() bool { + return fakeClock.HasWaiters() + }, 1*time.Second, 10*time.Millisecond) + tt.processFn(g, fakeClock) + err := <-resCh + if tt.expectWaitErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } +} + +func TestGroupWait(t *testing.T) { + g := NewGroup() + g.Increment() + returnedCh := make(chan struct{}) + go func() { + g.Wait() + close(returnedCh) + }() + select { + case <-time.After(100 * time.Millisecond): + case <-returnedCh: + t.Errorf("Wait should not return before it's done") + } + g.Done() + select { + case 
<-time.After(500 * time.Millisecond): + t.Errorf("Wait should return after it's done") + case <-returnedCh: + } +} + +func TestGroupWaitUntil(t *testing.T) { + g := NewGroup() + g.Increment() + stopCh := make(chan struct{}) + go func() { + time.Sleep(100 * time.Millisecond) + close(stopCh) + }() + err := g.WaitUntil(stopCh) + assert.Error(t, err) + + stopCh = make(chan struct{}) + g.Done() + err = g.WaitUntil(stopCh) + assert.NoError(t, err) +} diff --git a/test/e2e/networkpolicy_test.go b/test/e2e/networkpolicy_test.go index 421be42bd58..0310e5d7c58 100644 --- a/test/e2e/networkpolicy_test.go +++ b/test/e2e/networkpolicy_test.go @@ -96,6 +96,10 @@ func TestNetworkPolicy(t *testing.T) { skipIfProxyDisabled(t) testAllowHairpinService(t, data) }) + t.Run("testNetworkPolicyAfterAgentRestart", func(t *testing.T) { + t.Cleanup(exportLogsForSubtest(t, data)) + testNetworkPolicyAfterAgentRestart(t, data) + }) } func testNetworkPolicyStats(t *testing.T, data *TestData) { @@ -704,6 +708,101 @@ func testNetworkPolicyResyncAfterRestart(t *testing.T, data *TestData) { } } +// The test validates that Pods can't bypass NetworkPolicy when antrea-agent restarts. 
+func testNetworkPolicyAfterAgentRestart(t *testing.T, data *TestData) { + workerNode := workerNodeName(1) + var isolatedPod, deniedPod, allowedPod string + var isolatedPodIPs, deniedPodIPs, allowedPodIPs *PodIPs + var wg sync.WaitGroup + createTestPod := func(prefix string) (string, *PodIPs) { + defer wg.Done() + podName, podIPs, cleanup := createAndWaitForPod(t, data, data.createNginxPodOnNode, prefix, workerNode, data.testNamespace, false) + t.Cleanup(cleanup) + return podName, podIPs + } + wg.Add(3) + go func() { + isolatedPod, isolatedPodIPs = createTestPod("test-isolated") + }() + go func() { + deniedPod, deniedPodIPs = createTestPod("test-denied") + }() + go func() { + allowedPod, allowedPodIPs = createTestPod("test-allowed") + }() + wg.Wait() + + allowedPeer := networkingv1.NetworkPolicyPeer{ + PodSelector: &metav1.LabelSelector{MatchLabels: map[string]string{"antrea-e2e": allowedPod}}, + } + netpol, err := data.createNetworkPolicy("test-isolated", &networkingv1.NetworkPolicySpec{ + PodSelector: metav1.LabelSelector{MatchLabels: map[string]string{"antrea-e2e": isolatedPod}}, + Ingress: []networkingv1.NetworkPolicyIngressRule{{From: []networkingv1.NetworkPolicyPeer{allowedPeer}}}, + Egress: []networkingv1.NetworkPolicyEgressRule{{To: []networkingv1.NetworkPolicyPeer{allowedPeer}}}, + }) + require.NoError(t, err) + t.Cleanup(func() { data.deleteNetworkpolicy(netpol) }) + + checkFunc := func(testPod string, testPodIPs *PodIPs, expectErr bool) { + var wg sync.WaitGroup + checkOne := func(clientPod, serverPod string, serverIP *net.IP) { + defer wg.Done() + if serverIP != nil { + cmd := []string{"wget", "-O", "-", serverIP.String(), "-T", "1"} + _, _, err := data.RunCommandFromPod(data.testNamespace, clientPod, nginxContainerName, cmd) + if expectErr && err == nil { + t.Errorf("Pod %s should not be able to connect %s, but was able to connect", clientPod, serverPod) + } else if !expectErr && err != nil { + t.Errorf("Pod %s should be able to connect %s, but was not 
able to connect, err: %v", clientPod, serverPod, err) + } + } + } + wg.Add(4) + go checkOne(isolatedPod, testPod, testPodIPs.ipv4) + go checkOne(isolatedPod, testPod, testPodIPs.ipv6) + go checkOne(testPod, isolatedPod, isolatedPodIPs.ipv4) + go checkOne(testPod, isolatedPod, isolatedPodIPs.ipv6) + wg.Wait() + } + + scaleFunc := func(replicas int32) { + scale, err := data.clientset.AppsV1().Deployments(antreaNamespace).GetScale(context.TODO(), antreaDeployment, metav1.GetOptions{}) + require.NoError(t, err) + scale.Spec.Replicas = replicas + _, err = data.clientset.AppsV1().Deployments(antreaNamespace).UpdateScale(context.TODO(), antreaDeployment, scale, metav1.UpdateOptions{}) + require.NoError(t, err) + } + + // Scale antrea-controller to 0 so antrea-agent will lose connection with antrea-controller. + scaleFunc(0) + t.Cleanup(func() { scaleFunc(1) }) + + // Restart the antrea-agent. + _, err = data.deleteAntreaAgentOnNode(workerNode, 30, defaultTimeout) + require.NoError(t, err) + + // While the new antrea-agent starts, the denied Pod should never connect to the isolated Pod successfully. + for i := 0; i < 5; i++ { + checkFunc(deniedPod, deniedPodIPs, true) + } + + antreaPod, err := data.getAntreaPodOnNode(workerNode) + require.NoError(t, err) + // Make sure the new antrea-agent disconnects from antrea-controller but connects to OVS. + waitForAgentCondition(t, data, antreaPod, v1beta1.ControllerConnectionUp, corev1.ConditionFalse) + waitForAgentCondition(t, data, antreaPod, v1beta1.OpenflowConnectionUp, corev1.ConditionTrue) + // Even the new antrea-agent can't connect to antrea-controller, the previous policy should continue working. + checkFunc(deniedPod, deniedPodIPs, true) + checkFunc(allowedPod, allowedPodIPs, false) + + // Scale antrea-controller to 1 so antrea-agent will connect to antrea-controller. + scaleFunc(1) + // Make sure antrea-agent connects to antrea-controller. 
+ waitForAgentCondition(t, data, antreaPod, v1beta1.ControllerConnectionUp, corev1.ConditionTrue) + checkFunc(deniedPod, deniedPodIPs, true) + checkFunc(allowedPod, allowedPodIPs, false) +} + func testIngressPolicyWithoutPortNumber(t *testing.T, data *TestData) { serverPort := int32(80) _, serverIPs, cleanupFunc := createAndWaitForPod(t, data, data.createNginxPodOnNode, "test-server-", "", data.testNamespace, false) @@ -1039,8 +1138,9 @@ func waitForAgentCondition(t *testing.T, data *TestData, podName string, conditi t.Logf("cmds: %s", cmds) stdout, _, err := runAntctl(podName, cmds, data) + // The server may not be available yet. if err != nil { - return true, err + return false, nil } var agentInfo agentinfo.AntreaAgentInfoResponse err = json.Unmarshal([]byte(stdout), &agentInfo) diff --git a/test/integration/agent/cniserver_test.go b/test/integration/agent/cniserver_test.go index 0681056a134..f0991828d15 100644 --- a/test/integration/agent/cniserver_test.go +++ b/test/integration/agent/cniserver_test.go @@ -57,6 +57,7 @@ import ( "antrea.io/antrea/pkg/ovs/ovsconfig" ovsconfigtest "antrea.io/antrea/pkg/ovs/ovsconfig/testing" "antrea.io/antrea/pkg/util/channel" + "antrea.io/antrea/pkg/util/wait" ) const ( @@ -295,7 +296,7 @@ type cmdAddDelTester struct { targetNS ns.NetNS request *cnimsg.CniCmdRequest vethName string - networkReadyCh chan struct{} + podNetworkWait *wait.Group } func (tester *cmdAddDelTester) setNS(testNS ns.NetNS, targetNS ns.NetNS) { @@ -564,14 +565,14 @@ func (tester *cmdAddDelTester) cmdDelTest(tc testCase, dataDir string) { func newTester() *cmdAddDelTester { tester := &cmdAddDelTester{} ifaceStore := interfacestore.NewInterfaceStore() - tester.networkReadyCh = make(chan struct{}) + tester.podNetworkWait = wait.NewGroup() tester.server = cniserver.New(testSock, "", testNodeConfig, k8sFake.NewSimpleClientset(), routeMock, false, false, false, false, &config.NetworkConfig{InterfaceMTU: 1450}, - tester.networkReadyCh) + 
tester.podNetworkWait.Increment()) tester.server.Initialize(ovsServiceMock, ofServiceMock, ifaceStore, channel.NewSubscribableChannel("PodUpdate", 100), nil) ctx := context.Background() tester.ctx = ctx @@ -607,7 +608,7 @@ func cmdAddDelCheckTest(testNS ns.NetNS, tc testCase, dataDir string) { ovsServiceMock.EXPECT().GetOFPort(ovsPortname, false).Return(int32(10), nil).AnyTimes() ofServiceMock.EXPECT().InstallPodFlows(ovsPortname, mock.Any(), mock.Any(), mock.Any(), uint16(0), nil).Return(nil) - close(tester.networkReadyCh) + tester.podNetworkWait.Done() // Test ips allocation prevResult, err := tester.cmdAddTest(tc, dataDir) testRequire.Nil(err) @@ -726,15 +727,14 @@ func setupChainTest( if newServer { routeMock = routetest.NewMockInterface(controller) - networkReadyCh := make(chan struct{}) - close(networkReadyCh) + podNetworkWait := wait.NewGroup() server = cniserver.New(testSock, "", testNodeConfig, k8sFake.NewSimpleClientset(), routeMock, true, false, false, false, &config.NetworkConfig{InterfaceMTU: 1450}, - networkReadyCh) + podNetworkWait) } else { server = inServer } diff --git a/third_party/proxy/util/service.go b/third_party/proxy/util/service.go index 883c373d47f..9056f7b4626 100644 --- a/third_party/proxy/util/service.go +++ b/third_party/proxy/util/service.go @@ -37,10 +37,15 @@ package util import v1 "k8s.io/api/core/v1" +func ExternallyAccessible(service *v1.Service) bool { + return service.Spec.Type == v1.ServiceTypeLoadBalancer || + service.Spec.Type == v1.ServiceTypeNodePort || + (service.Spec.Type == v1.ServiceTypeClusterIP && len(service.Spec.ExternalIPs) > 0) +} + // ExternalPolicyLocal checks if service has ETP = Local. func ExternalPolicyLocal(service *v1.Service) bool { - if service.Spec.Type != v1.ServiceTypeLoadBalancer && - service.Spec.Type != v1.ServiceTypeNodePort { + if !ExternallyAccessible(service) { return false } return service.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyTypeLocal