From 891b95ba1b29e9ea586d8e634f996da5e76fb130 Mon Sep 17 00:00:00 2001
From: Quan Tian
Date: Mon, 15 Aug 2022 23:44:42 +0800
Subject: [PATCH] Do not realize AntreaNetworkPolicy applied to Pods in other Namespaces

When an AntreaNetworkPolicy uses a Group as AppliedTo, the Group should
not select Pods in other Namespaces; otherwise the policy would be
applied to those Namespaces. This was previously prevented by a
validation performed when creating the AppliedToGroup for a Group,
which ensured that the Group doesn't have a NamespaceSelector. However,
the validation could be bypassed in several ways, the most
straightforward being to use a parent Group as AppliedTo and make one
of its child Groups use a NamespaceSelector. Because Group definitions
and memberships are dynamic, it's hard to cover all cases if the
validation only happens when the AppliedToGroup is created.

This patch implements a validation when syncing an AppliedToGroup. The
validation ensures that an AppliedToGroup derived from a namespaced
Group cannot have any members in other Namespaces, regardless of how
the members are selected. An error encountered when syncing an
AppliedToGroup is reflected in the statuses of the NetworkPolicies that
use the AppliedToGroup.

This patch also unifies the behavior of ClusterNetworkPolicy and
AntreaNetworkPolicy when a ClusterGroup/Group used as AppliedTo
contains only IPBlocks: it is treated like an empty AppliedTo in both
cases.

Fixes #4116

Signed-off-by: Quan Tian
---
 .../networkpolicy/antreanetworkpolicy.go      |  67 +----
 .../networkpolicy/antreanetworkpolicy_test.go |  77 ------
 .../networkpolicy/clusternetworkpolicy.go     |  21 +-
 .../clusternetworkpolicy_test.go              |  67 -----
 pkg/controller/networkpolicy/crd_utils.go     |  23 ++
 .../networkpolicy/crd_utils_test.go           |  86 +++++++
 .../networkpolicy/networkpolicy_controller.go | 136 ++++++----
 .../networkpolicy_controller_test.go          | 242 +++++++++++++++++-
 .../networkpolicy/status_controller.go        |  46 ++--
 pkg/controller/types/networkpolicy.go         |   7 +-
 10 files changed, 467 insertions(+), 305 deletions(-)

diff --git a/pkg/controller/networkpolicy/antreanetworkpolicy.go b/pkg/controller/networkpolicy/antreanetworkpolicy.go
index 1af2ce40c0f..25e0277cd5b 100644
--- a/pkg/controller/networkpolicy/antreanetworkpolicy.go
+++ b/pkg/controller/networkpolicy/antreanetworkpolicy.go
@@ -24,7 +24,6 @@ import (
 	"antrea.io/antrea/pkg/apis/controlplane"
 	crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1"
 	antreatypes "antrea.io/antrea/pkg/controller/types"
-	"antrea.io/antrea/pkg/util/k8s"
 )

 func getANPReference(anp *crdv1alpha1.NetworkPolicy) *controlplane.NetworkPolicyReference {
@@ -87,35 +86,15 @@ func (n *NetworkPolicyController) processAntreaNetworkPolicy(np *crdv1alpha1.Net
 	appliedToGroups := map[string]*antreatypes.AppliedToGroup{}
 	addressGroups := map[string]*antreatypes.AddressGroup{}
 	rules := make([]controlplane.NetworkPolicyRule, 0, len(np.Spec.Ingress)+len(np.Spec.Egress))
-	newUnrealizableInternalNetworkPolicy := func(err error) *antreatypes.NetworkPolicy {
-		return &antreatypes.NetworkPolicy{
-			SourceRef: &controlplane.NetworkPolicyReference{
-				Type:      controlplane.AntreaNetworkPolicy,
-				Namespace: np.Namespace,
-				Name:      np.Name,
-				UID:       np.UID,
-			},
-			Name:             internalNetworkPolicyKeyFunc(np),
-			UID:              np.UID,
-			Generation:       np.Generation,
-			RealizationError: err,
-		}
-	}
 	// Create AppliedToGroup for each AppliedTo present in AntreaNetworkPolicy spec.
- atgs, err := n.processAppliedTo(np.Namespace, np.Spec.AppliedTo) - if err != nil { - return newUnrealizableInternalNetworkPolicy(err), nil, nil - } + atgs := n.processAppliedTo(np.Namespace, np.Spec.AppliedTo) appliedToGroups = mergeAppliedToGroups(appliedToGroups, atgs...) // Compute NetworkPolicyRule for Ingress Rule. for idx, ingressRule := range np.Spec.Ingress { // Set default action to ALLOW to allow traffic. services, namedPortExists := toAntreaServicesForCRD(ingressRule.Ports, ingressRule.Protocols) // Create AppliedToGroup for each AppliedTo present in the ingress rule. - atgs, err := n.processAppliedTo(np.Namespace, ingressRule.AppliedTo) - if err != nil { - return newUnrealizableInternalNetworkPolicy(err), nil, nil - } + atgs := n.processAppliedTo(np.Namespace, ingressRule.AppliedTo) appliedToGroups = mergeAppliedToGroups(appliedToGroups, atgs...) peer, ags := n.toAntreaPeerForCRD(ingressRule.From, np, controlplane.DirectionIn, namedPortExists) addressGroups = mergeAddressGroups(addressGroups, ags...) @@ -135,10 +114,7 @@ func (n *NetworkPolicyController) processAntreaNetworkPolicy(np *crdv1alpha1.Net // Set default action to ALLOW to allow traffic. services, namedPortExists := toAntreaServicesForCRD(egressRule.Ports, egressRule.Protocols) // Create AppliedToGroup for each AppliedTo present in the egress rule. - atgs, err := n.processAppliedTo(np.Namespace, egressRule.AppliedTo) - if err != nil { - return newUnrealizableInternalNetworkPolicy(err), nil, nil - } + atgs := n.processAppliedTo(np.Namespace, egressRule.AppliedTo) appliedToGroups = mergeAppliedToGroups(appliedToGroups, atgs...) var peer *controlplane.NetworkPolicyPeer if egressRule.ToServices != nil { @@ -179,16 +155,12 @@ func (n *NetworkPolicyController) processAntreaNetworkPolicy(np *crdv1alpha1.Net return internalNetworkPolicy, appliedToGroups, addressGroups } -func (n *NetworkPolicyController) processAppliedTo(namespace string, appliedTo []crdv1alpha1.NetworkPolicyPeer) ([]*antreatypes.AppliedToGroup, error) { +func (n *NetworkPolicyController) processAppliedTo(namespace string, appliedTo []crdv1alpha1.NetworkPolicyPeer) []*antreatypes.AppliedToGroup { var appliedToGroups []*antreatypes.AppliedToGroup for _, at := range appliedTo { var atg *antreatypes.AppliedToGroup if at.Group != "" { - var err error - atg, err = n.createAppliedToGroupForNamespacedGroup(namespace, at.Group) - if err != nil { - return nil, err - } + atg = n.createAppliedToGroupForGroup(namespace, at.Group) } else { atg = n.createAppliedToGroup(namespace, at.PodSelector, at.NamespaceSelector, at.ExternalEntitySelector) } @@ -196,37 +168,16 @@ func (n *NetworkPolicyController) processAppliedTo(namespace string, appliedTo [ appliedToGroups = append(appliedToGroups, atg) } } - return appliedToGroups, nil + return appliedToGroups } // ErrNetworkPolicyAppliedToUnsupportedGroup is an error response when -// a Group with IPBlocks or NamespaceSelector is used as AppliedTo. +// a Group with Pods in other Namespaces is used as AppliedTo. type ErrNetworkPolicyAppliedToUnsupportedGroup struct { namespace string groupName string } -func (e ErrNetworkPolicyAppliedToUnsupportedGroup) Error() string { - return fmt.Sprintf("group %s/%s with IPBlocks or NamespaceSelector can not be used as AppliedTo", e.namespace, e.groupName) -} - -func (n *NetworkPolicyController) createAppliedToGroupForNamespacedGroup(namespace, groupName string) (*antreatypes.AppliedToGroup, error) { - // Namespaced group uses NAMESPACE/NAME as the key of the corresponding internal group. 
- key := k8s.NamespacedName(namespace, groupName) - // Find the internal Group corresponding to this Group. - // There is no need to check if the namespaced group exists in groupLister because its existence will eventually be - // reflected in internalGroupStore. - ig, found, _ := n.internalGroupStore.Get(key) - if !found { - // Internal Group is not found, which means the corresponding namespaced group is either not created yet or not - // processed yet. Once the internal Group is created and processed, the sync worker for internal group will - // re-enqueue the ClusterNetworkPolicy processing which will trigger the creation of AppliedToGroup. - return nil, nil - } - intGrp := ig.(*antreatypes.Group) - if len(intGrp.IPBlocks) > 0 || (intGrp.Selector != nil && intGrp.Selector.NamespaceSelector != nil) { - klog.V(2).InfoS("Group with IPBlocks or NamespaceSelector can not be used as AppliedTo", "Group", key) - return nil, ErrNetworkPolicyAppliedToUnsupportedGroup{namespace: namespace, groupName: groupName} - } - return &antreatypes.AppliedToGroup{UID: intGrp.UID, Name: key}, nil +func (e *ErrNetworkPolicyAppliedToUnsupportedGroup) Error() string { + return fmt.Sprintf("group %s/%s with Pods in other Namespaces can not be used as AppliedTo", e.namespace, e.groupName) } diff --git a/pkg/controller/networkpolicy/antreanetworkpolicy_test.go b/pkg/controller/networkpolicy/antreanetworkpolicy_test.go index 8d579d686a7..296461a2566 100644 --- a/pkg/controller/networkpolicy/antreanetworkpolicy_test.go +++ b/pkg/controller/networkpolicy/antreanetworkpolicy_test.go @@ -15,7 +15,6 @@ package networkpolicy import ( - "fmt" "testing" "github.com/stretchr/testify/assert" @@ -25,7 +24,6 @@ import ( "antrea.io/antrea/pkg/apis/controlplane" crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1" - crdv1alpha3 "antrea.io/antrea/pkg/apis/crd/v1alpha3" antreatypes "antrea.io/antrea/pkg/controller/types" ) @@ -620,81 +618,6 @@ func TestDeleteANP(t *testing.T) { assert.False(t, done) } -func TestProcessAppliedToGroupsForGroup(t *testing.T) { - selectorA := metav1.LabelSelector{MatchLabels: map[string]string{"foo1": "bar1"}} - cidr := "10.0.0.0/24" - // gA with selector present in cache - gA := crdv1alpha3.Group{ - ObjectMeta: metav1.ObjectMeta{Namespace: "nsA", Name: "gA", UID: "uidA"}, - Spec: crdv1alpha3.GroupSpec{ - PodSelector: &selectorA, - }, - } - // gB with IPBlock present in cache - gB := crdv1alpha3.Group{ - ObjectMeta: metav1.ObjectMeta{Namespace: "nsB", Name: "gB", UID: "uidB"}, - Spec: crdv1alpha3.GroupSpec{ - IPBlocks: []crdv1alpha1.IPBlock{ - { - CIDR: cidr, - }, - }, - }, - } - // gC not found in cache - gC := crdv1alpha3.Group{ - ObjectMeta: metav1.ObjectMeta{Namespace: "nsC", Name: "gC", UID: "uidC"}, - Spec: crdv1alpha3.GroupSpec{ - NamespaceSelector: &selectorA, - }, - } - _, npc := newController() - npc.addGroup(&gA) - npc.addGroup(&gB) - npc.gStore.Add(&gA) - npc.gStore.Add(&gB) - tests := []struct { - name string - namespace string - inputG string - expectedAG *antreatypes.AppliedToGroup - expectedErr error - }{ - { - name: "empty-grp-no-result", - namespace: "nsA", - inputG: "", - expectedAG: nil, - }, - { - name: "ipblock-grp-no-result", - namespace: "nsB", - inputG: gB.Name, - expectedAG: nil, - expectedErr: ErrNetworkPolicyAppliedToUnsupportedGroup{namespace: "nsB", groupName: gB.Name}, - }, - { - name: "selector-grp-missing-no-result", - namespace: "nsC", - inputG: gC.Name, - expectedAG: nil, - }, - { - name: "selector-grp", - namespace: "nsA", - inputG: gA.Name, - expectedAG: 
&antreatypes.AppliedToGroup{UID: gA.UID, Name: fmt.Sprintf("%s/%s", gA.Namespace, gA.Name)}, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - actualAG, actualErr := npc.createAppliedToGroupForNamespacedGroup(tt.namespace, tt.inputG) - assert.Equal(t, tt.expectedAG, actualAG, "appliedToGroup list does not match") - assert.ErrorIs(t, actualErr, tt.expectedErr) - }) - } -} - // util functions for testing. func getANP() *crdv1alpha1.NetworkPolicy { p10 := float64(10) diff --git a/pkg/controller/networkpolicy/clusternetworkpolicy.go b/pkg/controller/networkpolicy/clusternetworkpolicy.go index 401235dad75..442003c0cb6 100644 --- a/pkg/controller/networkpolicy/clusternetworkpolicy.go +++ b/pkg/controller/networkpolicy/clusternetworkpolicy.go @@ -490,7 +490,7 @@ func (n *NetworkPolicyController) processClusterAppliedTo(appliedTo []crdv1alpha for _, at := range appliedTo { var atg *antreatypes.AppliedToGroup if at.Group != "" { - atg = n.createAppliedToGroupForCG(at.Group) + atg = n.createAppliedToGroupForGroup("", at.Group) } else if at.Service != nil { atg = n.createAppliedToGroupForService(at.Service) } else if at.ServiceAccount != nil { @@ -602,22 +602,3 @@ func (n *NetworkPolicyController) processRefGroupOrClusterGroup(g, namespace str } return nil, ipb } - -func (n *NetworkPolicyController) createAppliedToGroupForCG(clusterGroupName string) *antreatypes.AppliedToGroup { - // Find the internal Group corresponding to this ClusterGroup - // There is no need to check if the ClusterGroup exists in clusterGroupLister because its existence will eventually - // be reflected in internalGroupStore. - ig, found, _ := n.internalGroupStore.Get(clusterGroupName) - if !found { - // Internal Group was not found. Once the internal Group is created, the sync - // worker for internal group will re-enqueue the ClusterNetworkPolicy processing - // which will trigger the creation of AddressGroup. 
- return nil - } - intGrp := ig.(*antreatypes.Group) - if len(intGrp.IPBlocks) > 0 { - klog.V(2).InfoS("ClusterGroup with IPBlocks will not be processed as AppliedTo", "ClusterGroup", clusterGroupName) - return nil - } - return &antreatypes.AppliedToGroup{UID: intGrp.UID, Name: clusterGroupName} -} diff --git a/pkg/controller/networkpolicy/clusternetworkpolicy_test.go b/pkg/controller/networkpolicy/clusternetworkpolicy_test.go index a3446b6aa92..d4967ff2b56 100644 --- a/pkg/controller/networkpolicy/clusternetworkpolicy_test.go +++ b/pkg/controller/networkpolicy/clusternetworkpolicy_test.go @@ -1795,73 +1795,6 @@ func TestProcessRefGroupOrClusterGroup(t *testing.T) { } } -func TestProcessAppliedToGroupsForCGs(t *testing.T) { - selectorA := metav1.LabelSelector{MatchLabels: map[string]string{"foo1": "bar1"}} - cidr := "10.0.0.0/24" - // cgA with selector present in cache - cgA := crdv1alpha3.ClusterGroup{ - ObjectMeta: metav1.ObjectMeta{Name: "cgA", UID: "uidA"}, - Spec: crdv1alpha3.GroupSpec{ - NamespaceSelector: &selectorA, - }, - } - // cgB with IPBlock present in cache - cgB := crdv1alpha3.ClusterGroup{ - ObjectMeta: metav1.ObjectMeta{Name: "cgB", UID: "uidB"}, - Spec: crdv1alpha3.GroupSpec{ - IPBlocks: []crdv1alpha1.IPBlock{ - { - CIDR: cidr, - }, - }, - }, - } - // cgC not found in cache - cgC := crdv1alpha3.ClusterGroup{ - ObjectMeta: metav1.ObjectMeta{Name: "cgC", UID: "uidC"}, - Spec: crdv1alpha3.GroupSpec{ - NamespaceSelector: &selectorA, - }, - } - _, npc := newController() - npc.addClusterGroup(&cgA) - npc.addClusterGroup(&cgB) - npc.cgStore.Add(&cgA) - npc.cgStore.Add(&cgB) - tests := []struct { - name string - inputCG string - expectedAG *antreatypes.AppliedToGroup - }{ - { - name: "empty-cgs-no-result", - inputCG: "", - expectedAG: nil, - }, - { - name: "ipblock-cgs-no-result", - inputCG: cgB.Name, - expectedAG: nil, - }, - { - name: "selector-cgs-missing-no-result", - inputCG: cgC.Name, - expectedAG: nil, - }, - { - name: "selector-cgs", - inputCG: cgA.Name, - expectedAG: &antreatypes.AppliedToGroup{UID: cgA.UID, Name: cgA.Name}, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - actualAG := npc.createAppliedToGroupForCG(tt.inputCG) - assert.Equal(t, tt.expectedAG, actualAG, "appliedToGroup list does not match") - }) - } -} - // util functions for testing. func getCNP() *crdv1alpha1.ClusterNetworkPolicy { diff --git a/pkg/controller/networkpolicy/crd_utils.go b/pkg/controller/networkpolicy/crd_utils.go index 7f57d09d245..f30bb2166a6 100644 --- a/pkg/controller/networkpolicy/crd_utils.go +++ b/pkg/controller/networkpolicy/crd_utils.go @@ -229,6 +229,29 @@ func (n *NetworkPolicyController) createAppliedToGroupForService(service *v1alph return appliedToGroup } +// createAppliedToGroupForGroup creates an AppliedToGroup object corresponding to a ClusterGroup or a Group. +// The namespace parameter is only provided when the group is namespace scoped. +func (n *NetworkPolicyController) createAppliedToGroupForGroup(namespace, group string) *antreatypes.AppliedToGroup { + // Cluster group uses NAME and Namespaced group uses NAMESPACE/NAME as the key of the corresponding internal group. + key := k8s.NamespacedName(namespace, group) + // Find the internal Group corresponding to this ClusterGroup/Group. + // There is no need to check if the ClusterGroup/Group exists in clusterGroupLister/groupLister because its + // existence will eventually be reflected in internalGroupStore. 
+ ig, found, _ := n.internalGroupStore.Get(key) + if !found { + // Internal Group was not found. Once the internal Group is created, the sync worker for internal group will + // re-enqueue the ClusterNetworkPolicy/AntreaNetworkPolicy processing which will call this method again. So it's + // fine to ignore NotFound case. + return nil + } + intGrp := ig.(*antreatypes.Group) + if len(intGrp.IPBlocks) > 0 { + klog.V(2).InfoS("Group with IPBlocks can not be used as AppliedTo", "Group", key) + return nil + } + return &antreatypes.AppliedToGroup{UID: intGrp.UID, Name: key} +} + // getTierPriority retrieves the priority associated with the input Tier name. // If the Tier name is empty, by default, the lowest priority Application Tier // is returned. diff --git a/pkg/controller/networkpolicy/crd_utils_test.go b/pkg/controller/networkpolicy/crd_utils_test.go index 3e2bb22c2f9..c82b260630d 100644 --- a/pkg/controller/networkpolicy/crd_utils_test.go +++ b/pkg/controller/networkpolicy/crd_utils_test.go @@ -443,3 +443,89 @@ func TestToAntreaPeerForCRD(t *testing.T) { }) } } + +func TestCreateAppliedToGroupsForGroup(t *testing.T) { + selector := metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}} + cidr := "10.0.0.0/24" + // cgA with selector present in cache + clusterGroupWithSelector := &crdv1alpha3.ClusterGroup{ + ObjectMeta: metav1.ObjectMeta{Name: "cgA", UID: "uidA"}, + Spec: crdv1alpha3.GroupSpec{NamespaceSelector: &selector}, + } + // cgB with IPBlock present in cache + clusterGroupWithIPBlock := &crdv1alpha3.ClusterGroup{ + ObjectMeta: metav1.ObjectMeta{Name: "cgB", UID: "uidB"}, + Spec: crdv1alpha3.GroupSpec{IPBlocks: []crdv1alpha1.IPBlock{{CIDR: cidr}}}, + } + groupWithSelector := &crdv1alpha3.Group{ + ObjectMeta: metav1.ObjectMeta{Namespace: "nsA", Name: "gA", UID: "uidA"}, + Spec: crdv1alpha3.GroupSpec{PodSelector: &selector}, + } + // gB with IPBlock present in cache + groupWithIPBlock := &crdv1alpha3.Group{ + ObjectMeta: metav1.ObjectMeta{Namespace: "nsB", Name: "gB", UID: "uidB"}, + Spec: crdv1alpha3.GroupSpec{IPBlocks: []crdv1alpha1.IPBlock{{CIDR: cidr}}}, + } + _, npc := newController() + npc.addClusterGroup(clusterGroupWithSelector) + npc.addClusterGroup(clusterGroupWithIPBlock) + npc.addGroup(groupWithSelector) + npc.addGroup(groupWithIPBlock) + tests := []struct { + name string + inputNamespace string + inputGroup string + expectedATG *antreatypes.AppliedToGroup + }{ + { + name: "empty cluster group name", + inputGroup: "", + expectedATG: nil, + }, + { + name: "cluster group with IPBlock", + inputGroup: clusterGroupWithIPBlock.Name, + expectedATG: nil, + }, + { + name: "non-existing cluster group", + inputGroup: "foo", + expectedATG: nil, + }, + { + name: "cluster group with selectors", + inputGroup: clusterGroupWithSelector.Name, + expectedATG: &antreatypes.AppliedToGroup{UID: clusterGroupWithSelector.UID, Name: clusterGroupWithSelector.Name}, + }, + { + name: "empty group name", + inputNamespace: "default", + inputGroup: "", + expectedATG: nil, + }, + { + name: "group with IPBlock", + inputNamespace: groupWithIPBlock.Namespace, + inputGroup: groupWithIPBlock.Name, + expectedATG: nil, + }, + { + name: "non-existing group", + inputNamespace: "foo", + inputGroup: "bar", + expectedATG: nil, + }, + { + name: "group with selectors", + inputNamespace: groupWithSelector.Namespace, + inputGroup: groupWithSelector.Name, + expectedATG: &antreatypes.AppliedToGroup{UID: groupWithSelector.UID, Name: fmt.Sprintf("%s/%s", groupWithSelector.Namespace, groupWithSelector.Name)}, + }, + 
} + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + actualATG := npc.createAppliedToGroupForGroup(tt.inputNamespace, tt.inputGroup) + assert.Equal(t, tt.expectedATG, actualATG, "appliedToGroup does not match") + }) + } +} diff --git a/pkg/controller/networkpolicy/networkpolicy_controller.go b/pkg/controller/networkpolicy/networkpolicy_controller.go index 88313e70e0e..cdf89089f83 100644 --- a/pkg/controller/networkpolicy/networkpolicy_controller.go +++ b/pkg/controller/networkpolicy/networkpolicy_controller.go @@ -1243,47 +1243,57 @@ func (n *NetworkPolicyController) syncAppliedToGroup(key string) error { } klog.V(2).Infof("Updating existing AppliedToGroup %s on %d Nodes", key, appGroupNodeNames.Len()) } else { - scheduledPodNum, scheduledExtEntityNum := 0, 0 - pods, externalEntities := n.getAppliedToWorkloads(appliedToGroup) - for _, pod := range pods { - if pod.Spec.NodeName == "" || pod.Spec.HostNetwork == true { - // No need to process Pod when it's not scheduled. - // HostNetwork Pods will not be applied to by policies. - continue + pods, externalEntities, err := n.getAppliedToWorkloads(appliedToGroup) + if err != nil { + klog.ErrorS(err, "Error when getting AppliedTo workloads for AppliedToGroup", "AppliedToGroup", appliedToGroup.Name) + updatedAppliedToGroup = &antreatypes.AppliedToGroup{ + UID: appliedToGroup.UID, + Name: appliedToGroup.Name, + Selector: appliedToGroup.Selector, + SyncError: err, } - scheduledPodNum++ - podSet := memberSetByNode[pod.Spec.NodeName] - if podSet == nil { - podSet = controlplane.GroupMemberSet{} + } else { + scheduledPodNum, scheduledExtEntityNum := 0, 0 + for _, pod := range pods { + if pod.Spec.NodeName == "" || pod.Spec.HostNetwork == true { + // No need to process Pod when it's not scheduled. + // HostNetwork Pods will not be applied to by policies. + continue + } + scheduledPodNum++ + podSet := memberSetByNode[pod.Spec.NodeName] + if podSet == nil { + podSet = controlplane.GroupMemberSet{} + } + podSet.Insert(podToGroupMember(pod, false)) + // Update the Pod references by Node. + memberSetByNode[pod.Spec.NodeName] = podSet + // Update the NodeNames in order to set the SpanMeta for AppliedToGroup. + appGroupNodeNames.Insert(pod.Spec.NodeName) } - podSet.Insert(podToGroupMember(pod, false)) - // Update the Pod references by Node. - memberSetByNode[pod.Spec.NodeName] = podSet - // Update the NodeNames in order to set the SpanMeta for AppliedToGroup. 
- appGroupNodeNames.Insert(pod.Spec.NodeName) - } - for _, extEntity := range externalEntities { - if extEntity.Spec.ExternalNode == "" { - continue + for _, extEntity := range externalEntities { + if extEntity.Spec.ExternalNode == "" { + continue + } + scheduledExtEntityNum++ + entitySet := memberSetByNode[extEntity.Spec.ExternalNode] + if entitySet == nil { + entitySet = controlplane.GroupMemberSet{} + } + entitySet.Insert(externalEntityToGroupMember(extEntity, false)) + memberSetByNode[extEntity.Spec.ExternalNode] = entitySet + appGroupNodeNames.Insert(extEntity.Spec.ExternalNode) } - scheduledExtEntityNum++ - entitySet := memberSetByNode[extEntity.Spec.ExternalNode] - if entitySet == nil { - entitySet = controlplane.GroupMemberSet{} + updatedAppliedToGroup = &antreatypes.AppliedToGroup{ + UID: appliedToGroup.UID, + Name: appliedToGroup.Name, + Selector: appliedToGroup.Selector, + GroupMemberByNode: memberSetByNode, + SpanMeta: antreatypes.SpanMeta{NodeNames: appGroupNodeNames}, } - entitySet.Insert(externalEntityToGroupMember(extEntity, false)) - memberSetByNode[extEntity.Spec.ExternalNode] = entitySet - appGroupNodeNames.Insert(extEntity.Spec.ExternalNode) + klog.V(2).Infof("Updating existing AppliedToGroup %s with %d Pods and %d External Entities on %d Nodes", + key, scheduledPodNum, scheduledExtEntityNum, appGroupNodeNames.Len()) } - updatedAppliedToGroup = &antreatypes.AppliedToGroup{ - UID: appliedToGroup.UID, - Name: appliedToGroup.Name, - Selector: appliedToGroup.Selector, - GroupMemberByNode: memberSetByNode, - SpanMeta: antreatypes.SpanMeta{NodeNames: appGroupNodeNames}, - } - klog.V(2).Infof("Updating existing AppliedToGroup %s with %d Pods and %d External Entities on %d Nodes", - key, scheduledPodNum, scheduledExtEntityNum, appGroupNodeNames.Len()) } n.appliedToGroupStore.Update(updatedAppliedToGroup) // Get all internal NetworkPolicy objects that refers this AppliedToGroup. @@ -1304,15 +1314,30 @@ func (n *NetworkPolicyController) syncAppliedToGroup(key string) error { // getAppliedToWorkloads returns a list of workloads (Pods and ExternalEntities) selected by an AppliedToGroup // for standalone selectors or corresponding to a ClusterGroup. -func (n *NetworkPolicyController) getAppliedToWorkloads(g *antreatypes.AppliedToGroup) ([]*v1.Pod, []*v1alpha2.ExternalEntity) { - // Check if an internal Group object exists corresponding to this AppliedToGroup. +func (n *NetworkPolicyController) getAppliedToWorkloads(g *antreatypes.AppliedToGroup) ([]*v1.Pod, []*v1alpha2.ExternalEntity, error) { + // Check if an internal Group object exists corresponding to this AppliedToGroup group, found, _ := n.internalGroupStore.Get(g.Name) if found { // This AppliedToGroup is derived from a ClusterGroup. grp := group.(*antreatypes.Group) - return n.getInternalGroupWorkloads(grp) + pods, ees := n.getInternalGroupWorkloads(grp) + // Namespaced Group can only select entities in the same Namespace as the Group when used as AppliedTo. 
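+		// Members in any other Namespace mean that the Group, or one of its child Groups, selects workloads
+		// across Namespaces (e.g. via a NamespaceSelector), in which case the Group must be rejected as AppliedTo.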
+ if grp.SourceReference.Namespace != "" { + for _, pod := range pods { + if pod.Namespace != grp.SourceReference.Namespace { + return nil, nil, &ErrNetworkPolicyAppliedToUnsupportedGroup{groupName: grp.SourceReference.Name, namespace: grp.SourceReference.Namespace} + } + } + for _, ee := range ees { + if ee.Namespace != grp.SourceReference.Namespace { + return nil, nil, &ErrNetworkPolicyAppliedToUnsupportedGroup{groupName: grp.SourceReference.Name, namespace: grp.SourceReference.Namespace} + } + } + } + return pods, ees, nil } - return n.groupingInterface.GetEntities(appliedToGroupType, g.Name) + pods, ees := n.groupingInterface.GetEntities(appliedToGroupType, g.Name) + return pods, ees, nil } // getInternalGroupWorkloads returns a list of workloads (Pods and ExternalEntities) selected by a ClusterGroup. @@ -1386,18 +1411,29 @@ func (n *NetworkPolicyController) syncInternalNetworkPolicy(key *controlplane.Ne newInternalNetworkPolicy, newAppliedToGroups, newAddressGroups = n.processNetworkPolicy(knp) } - newNodeNames := sets.NewString() - // Calculate the set of Node names based on the span of the - // AppliedToGroups referenced by this NetworkPolicy. - for appliedToGroupName := range newAppliedToGroups { - appGroupObj, found, _ := n.appliedToGroupStore.Get(appliedToGroupName) - if !found { - continue + newNodeNames, err := func() (sets.String, error) { + nodeNames := sets.NewString() + // Calculate the set of Node names based on the span of the + // AppliedToGroups referenced by this NetworkPolicy. + for appliedToGroupName := range newAppliedToGroups { + appGroupObj, found, _ := n.appliedToGroupStore.Get(appliedToGroupName) + if !found { + continue + } + appGroup := appGroupObj.(*antreatypes.AppliedToGroup) + if appGroup.SyncError != nil { + return nil, appGroup.SyncError + } + utilsets.MergeString(nodeNames, appGroup.SpanMeta.NodeNames) } - appGroup := appGroupObj.(*antreatypes.AppliedToGroup) - utilsets.MergeString(newNodeNames, appGroup.SpanMeta.NodeNames) + return nodeNames, nil + }() + if err != nil { + klog.ErrorS(err, "Error when processing AppliedToGroups for internal NetworkPolicy", "key", key) + newInternalNetworkPolicy.SyncError = err + } else { + newInternalNetworkPolicy.NodeNames = newNodeNames } - newInternalNetworkPolicy.NodeNames = newNodeNames var oldInternalNetworkPolicy *antreatypes.NetworkPolicy oldInternalNetworkPolicyObj, oldInternalPolicyExists, _ := n.internalNetworkPolicyStore.Get(internalNetworkPolicyName) diff --git a/pkg/controller/networkpolicy/networkpolicy_controller_test.go b/pkg/controller/networkpolicy/networkpolicy_controller_test.go index 0a5bf1f6945..704606cb71e 100644 --- a/pkg/controller/networkpolicy/networkpolicy_controller_test.go +++ b/pkg/controller/networkpolicy/networkpolicy_controller_test.go @@ -81,6 +81,7 @@ type networkPolicyController struct { serviceStore cache.Store networkPolicyStore cache.Store cnpStore cache.Store + anpStore cache.Store tierStore cache.Store cgStore cache.Store gStore cache.Store @@ -142,6 +143,7 @@ func newController(objects ...runtime.Object) (*fake.Clientset, *networkPolicyCo informerFactory.Core().V1().Services().Informer().GetStore(), informerFactory.Networking().V1().NetworkPolicies().Informer().GetStore(), crdInformerFactory.Crd().V1alpha1().ClusterNetworkPolicies().Informer().GetStore(), + crdInformerFactory.Crd().V1alpha1().NetworkPolicies().Informer().GetStore(), crdInformerFactory.Crd().V1alpha1().Tiers().Informer().GetStore(), crdInformerFactory.Crd().V1alpha3().ClusterGroups().Informer().GetStore(), 
crdInformerFactory.Crd().V1alpha3().Groups().Informer().GetStore(), @@ -211,6 +213,7 @@ func newControllerWithoutEventHandler(k8sObjects, crdObjects []runtime.Object) ( informerFactory.Core().V1().Services().Informer().GetStore(), informerFactory.Networking().V1().NetworkPolicies().Informer().GetStore(), cnpInformer.Informer().GetStore(), + anpInformer.Informer().GetStore(), tierInformer.Informer().GetStore(), cgInformer.Informer().GetStore(), groupInformer.Informer().GetStore(), @@ -2600,7 +2603,8 @@ func TestGetAppliedToWorkloads(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - actualPods, actualEEs := c.getAppliedToWorkloads(tt.inATG) + actualPods, actualEEs, actualErr := c.getAppliedToWorkloads(tt.inATG) + assert.NoError(t, actualErr) assert.Equal(t, tt.expEEs, actualEEs) assert.Equal(t, tt.expPods, actualPods) }) @@ -3139,6 +3143,242 @@ func TestSyncInternalNetworkPolicyConcurrently(t *testing.T) { checkGroupItemExistence(t, c.appliedToGroupStore) } +func TestSyncInternalNetworkPolicyWithGroups(t *testing.T) { + p10 := float64(10) + allowAction := crdv1alpha1.RuleActionAllow + podA := getPod("podA", "nsA", "nodeA", "10.0.0.1", false) + podA.Labels = selectorA.MatchLabels + podB := getPod("podB", "nsB", "nodeB", "10.0.0.2", false) + podB.Labels = selectorA.MatchLabels + selectorBGroup := getNormalizedUID(antreatypes.NewGroupSelector("nsA", &selectorB, nil, nil, nil).NormalizedName) + + tests := []struct { + name string + groups []*v1alpha3.Group + inputPolicy *crdv1alpha1.NetworkPolicy + expectedPolicy *antreatypes.NetworkPolicy + }{ + { + name: "anp with valid group", + groups: []*v1alpha3.Group{ + { + ObjectMeta: metav1.ObjectMeta{Namespace: "nsA", Name: "groupA"}, + Spec: v1alpha3.GroupSpec{PodSelector: &selectorA}, + }, + }, + inputPolicy: &crdv1alpha1.NetworkPolicy{ + ObjectMeta: metav1.ObjectMeta{Namespace: "nsA", Name: "anpA", UID: "uidA"}, + Spec: crdv1alpha1.NetworkPolicySpec{ + AppliedTo: []crdv1alpha1.NetworkPolicyPeer{ + {Group: "groupA"}, + }, + Priority: p10, + Ingress: []crdv1alpha1.Rule{ + { + From: []crdv1alpha1.NetworkPolicyPeer{{PodSelector: &selectorB}}, + Action: &allowAction, + }, + }, + }, + }, + expectedPolicy: &antreatypes.NetworkPolicy{ + UID: "uidA", + Name: "uidA", + SpanMeta: antreatypes.SpanMeta{NodeNames: sets.NewString("nodeA")}, + SourceRef: &controlplane.NetworkPolicyReference{ + Type: controlplane.AntreaNetworkPolicy, + Namespace: "nsA", + Name: "anpA", + UID: "uidA", + }, + Priority: &p10, + TierPriority: &DefaultTierPriority, + Rules: []controlplane.NetworkPolicyRule{ + { + Direction: controlplane.DirectionIn, + From: controlplane.NetworkPolicyPeer{AddressGroups: []string{selectorBGroup}}, + Action: &allowAction, + }, + }, + AppliedToGroups: []string{"nsA/groupA"}, + }, + }, + { + name: "anp with valid parent group", + groups: []*v1alpha3.Group{ + { + ObjectMeta: metav1.ObjectMeta{Namespace: "nsA", Name: "parentGroup"}, + Spec: v1alpha3.GroupSpec{ChildGroups: []v1alpha3.ClusterGroupReference{"groupA"}}, + }, + { + ObjectMeta: metav1.ObjectMeta{Namespace: "nsA", Name: "groupA"}, + Spec: v1alpha3.GroupSpec{PodSelector: &selectorA}, + }, + }, + inputPolicy: &crdv1alpha1.NetworkPolicy{ + ObjectMeta: metav1.ObjectMeta{Namespace: "nsA", Name: "anpA", UID: "uidA"}, + Spec: crdv1alpha1.NetworkPolicySpec{ + AppliedTo: []crdv1alpha1.NetworkPolicyPeer{ + {Group: "parentGroup"}, + }, + Priority: p10, + Ingress: []crdv1alpha1.Rule{ + { + From: []crdv1alpha1.NetworkPolicyPeer{{PodSelector: &selectorB}}, + Action: &allowAction, + 
}, + }, + }, + }, + expectedPolicy: &antreatypes.NetworkPolicy{ + UID: "uidA", + Name: "uidA", + SpanMeta: antreatypes.SpanMeta{NodeNames: sets.NewString("nodeA")}, + SourceRef: &controlplane.NetworkPolicyReference{ + Type: controlplane.AntreaNetworkPolicy, + Namespace: "nsA", + Name: "anpA", + UID: "uidA", + }, + Priority: &p10, + TierPriority: &DefaultTierPriority, + Rules: []controlplane.NetworkPolicyRule{ + { + Direction: controlplane.DirectionIn, + From: controlplane.NetworkPolicyPeer{AddressGroups: []string{selectorBGroup}}, + Action: &allowAction, + }, + }, + AppliedToGroups: []string{"nsA/parentGroup"}, + }, + }, + { + name: "anp with invalid group", + groups: []*v1alpha3.Group{ + { + ObjectMeta: metav1.ObjectMeta{Namespace: "nsA", Name: "groupA"}, + Spec: v1alpha3.GroupSpec{NamespaceSelector: &metav1.LabelSelector{}, PodSelector: &selectorA}, + }, + }, + inputPolicy: &crdv1alpha1.NetworkPolicy{ + ObjectMeta: metav1.ObjectMeta{Namespace: "nsA", Name: "anpA", UID: "uidA"}, + Spec: crdv1alpha1.NetworkPolicySpec{ + AppliedTo: []crdv1alpha1.NetworkPolicyPeer{ + {Group: "groupA"}, + }, + Priority: p10, + Ingress: []crdv1alpha1.Rule{ + { + From: []crdv1alpha1.NetworkPolicyPeer{{PodSelector: &selectorB}}, + Action: &allowAction, + }, + }, + }, + }, + expectedPolicy: &antreatypes.NetworkPolicy{ + UID: "uidA", + Name: "uidA", + SourceRef: &controlplane.NetworkPolicyReference{ + Type: controlplane.AntreaNetworkPolicy, + Namespace: "nsA", + Name: "anpA", + UID: "uidA", + }, + Priority: &p10, + TierPriority: &DefaultTierPriority, + Rules: []controlplane.NetworkPolicyRule{ + { + Direction: controlplane.DirectionIn, + From: controlplane.NetworkPolicyPeer{AddressGroups: []string{selectorBGroup}}, + Action: &allowAction, + }, + }, + AppliedToGroups: []string{"nsA/groupA"}, + SyncError: &ErrNetworkPolicyAppliedToUnsupportedGroup{groupName: "groupA", namespace: "nsA"}, + }, + }, + { + name: "anp with invalid parent group", + groups: []*v1alpha3.Group{ + { + ObjectMeta: metav1.ObjectMeta{Namespace: "nsA", Name: "parentGroup"}, + Spec: v1alpha3.GroupSpec{ChildGroups: []v1alpha3.ClusterGroupReference{"groupA"}}, + }, + { + ObjectMeta: metav1.ObjectMeta{Namespace: "nsA", Name: "groupA"}, + Spec: v1alpha3.GroupSpec{NamespaceSelector: &metav1.LabelSelector{}, PodSelector: &selectorA}, + }, + }, + inputPolicy: &crdv1alpha1.NetworkPolicy{ + ObjectMeta: metav1.ObjectMeta{Namespace: "nsA", Name: "anpA", UID: "uidA"}, + Spec: crdv1alpha1.NetworkPolicySpec{ + AppliedTo: []crdv1alpha1.NetworkPolicyPeer{ + {Group: "parentGroup"}, + }, + Priority: p10, + Ingress: []crdv1alpha1.Rule{ + { + From: []crdv1alpha1.NetworkPolicyPeer{{PodSelector: &selectorB}}, + Action: &allowAction, + }, + }, + }, + }, + expectedPolicy: &antreatypes.NetworkPolicy{ + UID: "uidA", + Name: "uidA", + SourceRef: &controlplane.NetworkPolicyReference{ + Type: controlplane.AntreaNetworkPolicy, + Namespace: "nsA", + Name: "anpA", + UID: "uidA", + }, + Priority: &p10, + TierPriority: &DefaultTierPriority, + Rules: []controlplane.NetworkPolicyRule{ + { + Direction: controlplane.DirectionIn, + From: controlplane.NetworkPolicyPeer{AddressGroups: []string{selectorBGroup}}, + Action: &allowAction, + }, + }, + AppliedToGroups: []string{"nsA/parentGroup"}, + SyncError: &ErrNetworkPolicyAppliedToUnsupportedGroup{groupName: "parentGroup", namespace: "nsA"}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, c := newController(podA, podB) + stopCh := make(chan struct{}) + defer close(stopCh) + 
c.informerFactory.Start(stopCh) + c.crdInformerFactory.Start(stopCh) + c.informerFactory.WaitForCacheSync(stopCh) + c.crdInformerFactory.WaitForCacheSync(stopCh) + go c.groupingInterface.Run(stopCh) + go c.groupingController.Run(stopCh) + go c.Run(stopCh) + + for _, group := range tt.groups { + c.crdClient.CrdV1alpha3().Groups(group.Namespace).Create(context.TODO(), group, metav1.CreateOptions{}) + } + c.crdClient.CrdV1alpha1().NetworkPolicies(tt.inputPolicy.Namespace).Create(context.TODO(), tt.inputPolicy, metav1.CreateOptions{}) + + var gotPolicy *antreatypes.NetworkPolicy + err := wait.PollImmediate(100*time.Millisecond, 3*time.Second, func() (done bool, err error) { + obj, exists, _ := c.internalNetworkPolicyStore.Get(tt.expectedPolicy.Name) + if !exists { + return false, nil + } + gotPolicy = obj.(*antreatypes.NetworkPolicy) + return reflect.DeepEqual(tt.expectedPolicy, gotPolicy), nil + }) + assert.NoError(t, err, "Expected %#v\ngot %#v", tt.expectedPolicy, gotPolicy) + }) + } +} + func checkQueueItemExistence(t *testing.T, queue workqueue.RateLimitingInterface, items ...string) { require.Equal(t, len(items), queue.Len()) expectedItems := sets.NewString(items...) diff --git a/pkg/controller/networkpolicy/status_controller.go b/pkg/controller/networkpolicy/status_controller.go index 265489e6858..446fe5c7665 100644 --- a/pkg/controller/networkpolicy/status_controller.go +++ b/pkg/controller/networkpolicy/status_controller.go @@ -268,14 +268,15 @@ func (c *StatusController) syncHandler(key string) error { } internalNP := internalNPObj.(*antreatypes.NetworkPolicy) - // It means the NetworkPolicy hasn't been processed once. Set it to Pending to differentiate from NetworkPolicies - // that spans 0 Node. - if internalNP.SpanMeta.NodeNames == nil { + updateStatus := func(phase crdv1alpha1.NetworkPolicyPhase, currentNodes, desiredNodes int) error { status := &crdv1alpha1.NetworkPolicyStatus{ - Phase: crdv1alpha1.NetworkPolicyPending, - ObservedGeneration: internalNP.Generation, - Conditions: GenerateNetworkPolicyCondition(nil), + Phase: phase, + ObservedGeneration: internalNP.Generation, + CurrentNodesRealized: int32(currentNodes), + DesiredNodesRealized: int32(desiredNodes), + Conditions: GenerateNetworkPolicyCondition(internalNP.SyncError), } + klog.V(2).Infof("Updating NetworkPolicy %s status: %v", internalNP.SourceRef.ToString(), status) if internalNP.SourceRef.Type == controlplane.AntreaNetworkPolicy { return c.npControlInterface.UpdateAntreaNetworkPolicyStatus(internalNP.SourceRef.Namespace, internalNP.SourceRef.Name, status) } @@ -284,16 +285,14 @@ func (c *StatusController) syncHandler(key string) error { // It means the NetworkPolicy has been processed, and marked as unrealizable. It will enter unrealizable phase // instead of being further realized. Antrea-agents will not process further. 
-	if internalNP.RealizationError != nil {
-		status := &crdv1alpha1.NetworkPolicyStatus{
-			Phase:              crdv1alpha1.NetworkPolicyPending,
-			ObservedGeneration: internalNP.Generation,
-			Conditions:         GenerateNetworkPolicyCondition(internalNP.RealizationError),
-		}
-		if internalNP.SourceRef.Type == controlplane.AntreaNetworkPolicy {
-			return c.npControlInterface.UpdateAntreaNetworkPolicyStatus(internalNP.SourceRef.Namespace, internalNP.SourceRef.Name, status)
-		}
-		return c.npControlInterface.UpdateAntreaClusterNetworkPolicyStatus(internalNP.SourceRef.Name, status)
+	if internalNP.SyncError != nil {
+		return updateStatus(crdv1alpha1.NetworkPolicyPending, 0, 0)
+	}
+
+	// The NetworkPolicy hasn't been processed yet. Set it to Pending to differentiate from NetworkPolicies
+	// that span 0 Nodes.
+	if internalNP.SpanMeta.NodeNames == nil {
+		return updateStatus(crdv1alpha1.NetworkPolicyPending, 0, 0)
 	}

 	desiredNodes := len(internalNP.SpanMeta.NodeNames)
@@ -315,18 +314,7 @@ func (c *StatusController) syncHandler(key string) error {
 		phase = crdv1alpha1.NetworkPolicyRealized
 	}

-	status := &crdv1alpha1.NetworkPolicyStatus{
-		Phase:                phase,
-		ObservedGeneration:   internalNP.Generation,
-		CurrentNodesRealized: int32(currentNodes),
-		DesiredNodesRealized: int32(desiredNodes),
-		Conditions:           GenerateNetworkPolicyCondition(nil),
-	}
-	klog.V(2).Infof("Updating NetworkPolicy %s status: %v", internalNP.SourceRef.ToString(), status)
-	if internalNP.SourceRef.Type == controlplane.AntreaNetworkPolicy {
-		return c.npControlInterface.UpdateAntreaNetworkPolicyStatus(internalNP.SourceRef.Namespace, internalNP.SourceRef.Name, status)
-	}
-	return c.npControlInterface.UpdateAntreaClusterNetworkPolicyStatus(internalNP.SourceRef.Name, status)
+	return updateStatus(phase, currentNodes, desiredNodes)
 }

 // networkPolicyControlInterface is an interface that knows how to update Antrea NetworkPolicy status.
@@ -419,7 +407,7 @@ func GenerateNetworkPolicyCondition(err error) []crdv1alpha1.NetworkPolicyCondit
 			Status:             v1.ConditionTrue,
 			LastTransitionTime: v1.Now(),
 		})
-	case ErrNetworkPolicyAppliedToUnsupportedGroup:
+	case *ErrNetworkPolicyAppliedToUnsupportedGroup:
 		conditions = append(conditions, crdv1alpha1.NetworkPolicyCondition{
 			Type:               crdv1alpha1.NetworkPolicyConditionRealizable,
 			Status:             v1.ConditionFalse,
diff --git a/pkg/controller/types/networkpolicy.go b/pkg/controller/types/networkpolicy.go
index 6c77fa86ea4..6a1f4099fca 100644
--- a/pkg/controller/types/networkpolicy.go
+++ b/pkg/controller/types/networkpolicy.go
@@ -58,6 +58,8 @@ type AppliedToGroup struct {
 	// It will be converted to a slice of GroupMember for transferring according
 	// to client's selection.
 	GroupMemberByNode map[string]controlplane.GroupMemberSet
+	// SyncError is the error encountered when syncing this AppliedToGroup.
+	SyncError error
 }

 // AddressGroup describes a set of addresses used as source or destination of Network Policy rules.
@@ -101,9 +103,8 @@ type NetworkPolicy struct {
 	// AppliedToPerRule tracks if appliedTo is set per rule basis rather than in policy spec.
 	// Must be false for K8s NetworkPolicy.
 	AppliedToPerRule bool
-	// RealizationError stores realization error of the internal Network Policy.
-	// It is set when processing the original Network Policy.
-	RealizationError error
+	// SyncError is the error encountered when syncing this NetworkPolicy.
+	SyncError error
 }

 // GetAddressGroups returns AddressGroups used by this NetworkPolicy.
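
For illustration, below is a minimal sketch (not part of the patch) of the
bypass scenario described in the commit message, using the CRD types this
patch touches. The Group names "parentGroup" and "childGroup" are
hypothetical. The parent Group's own spec has no NamespaceSelector, so the
old create-time validation accepted it as AppliedTo, yet its child Group can
select Pods outside nsA:

    package example

    import (
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

        crdv1alpha3 "antrea.io/antrea/pkg/apis/crd/v1alpha3"
    )

    // parentGroup has no NamespaceSelector itself; only its child Group does,
    // which is what allowed it to slip past the create-time validation.
    var parentGroup = &crdv1alpha3.Group{
        ObjectMeta: metav1.ObjectMeta{Namespace: "nsA", Name: "parentGroup"},
        Spec: crdv1alpha3.GroupSpec{
            ChildGroups: []crdv1alpha3.ClusterGroupReference{"childGroup"},
        },
    }

    // childGroup's empty NamespaceSelector matches all Namespaces, so the
    // parent Group's members may live outside nsA.
    var childGroup = &crdv1alpha3.Group{
        ObjectMeta: metav1.ObjectMeta{Namespace: "nsA", Name: "childGroup"},
        Spec: crdv1alpha3.GroupSpec{
            NamespaceSelector: &metav1.LabelSelector{},
        },
    }

With this patch, syncing the AppliedToGroup derived from nsA/parentGroup
detects the members outside nsA and records an
ErrNetworkPolicyAppliedToUnsupportedGroup as its SyncError; a policy using it
is reported as Pending with a Realizable=False condition instead of being
realized.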