diff --git a/pkg/controller/networkpolicy/antreanetworkpolicy.go b/pkg/controller/networkpolicy/antreanetworkpolicy.go index 1af2ce40c0f..25e0277cd5b 100644 --- a/pkg/controller/networkpolicy/antreanetworkpolicy.go +++ b/pkg/controller/networkpolicy/antreanetworkpolicy.go @@ -24,7 +24,6 @@ import ( "antrea.io/antrea/pkg/apis/controlplane" crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1" antreatypes "antrea.io/antrea/pkg/controller/types" - "antrea.io/antrea/pkg/util/k8s" ) func getANPReference(anp *crdv1alpha1.NetworkPolicy) *controlplane.NetworkPolicyReference { @@ -87,35 +86,15 @@ func (n *NetworkPolicyController) processAntreaNetworkPolicy(np *crdv1alpha1.Net appliedToGroups := map[string]*antreatypes.AppliedToGroup{} addressGroups := map[string]*antreatypes.AddressGroup{} rules := make([]controlplane.NetworkPolicyRule, 0, len(np.Spec.Ingress)+len(np.Spec.Egress)) - newUnrealizableInternalNetworkPolicy := func(err error) *antreatypes.NetworkPolicy { - return &antreatypes.NetworkPolicy{ - SourceRef: &controlplane.NetworkPolicyReference{ - Type: controlplane.AntreaNetworkPolicy, - Namespace: np.Namespace, - Name: np.Name, - UID: np.UID, - }, - Name: internalNetworkPolicyKeyFunc(np), - UID: np.UID, - Generation: np.Generation, - RealizationError: err, - } - } // Create AppliedToGroup for each AppliedTo present in AntreaNetworkPolicy spec. - atgs, err := n.processAppliedTo(np.Namespace, np.Spec.AppliedTo) - if err != nil { - return newUnrealizableInternalNetworkPolicy(err), nil, nil - } + atgs := n.processAppliedTo(np.Namespace, np.Spec.AppliedTo) appliedToGroups = mergeAppliedToGroups(appliedToGroups, atgs...) // Compute NetworkPolicyRule for Ingress Rule. for idx, ingressRule := range np.Spec.Ingress { // Set default action to ALLOW to allow traffic. services, namedPortExists := toAntreaServicesForCRD(ingressRule.Ports, ingressRule.Protocols) // Create AppliedToGroup for each AppliedTo present in the ingress rule. - atgs, err := n.processAppliedTo(np.Namespace, ingressRule.AppliedTo) - if err != nil { - return newUnrealizableInternalNetworkPolicy(err), nil, nil - } + atgs := n.processAppliedTo(np.Namespace, ingressRule.AppliedTo) appliedToGroups = mergeAppliedToGroups(appliedToGroups, atgs...) peer, ags := n.toAntreaPeerForCRD(ingressRule.From, np, controlplane.DirectionIn, namedPortExists) addressGroups = mergeAddressGroups(addressGroups, ags...) @@ -135,10 +114,7 @@ func (n *NetworkPolicyController) processAntreaNetworkPolicy(np *crdv1alpha1.Net // Set default action to ALLOW to allow traffic. services, namedPortExists := toAntreaServicesForCRD(egressRule.Ports, egressRule.Protocols) // Create AppliedToGroup for each AppliedTo present in the egress rule. - atgs, err := n.processAppliedTo(np.Namespace, egressRule.AppliedTo) - if err != nil { - return newUnrealizableInternalNetworkPolicy(err), nil, nil - } + atgs := n.processAppliedTo(np.Namespace, egressRule.AppliedTo) appliedToGroups = mergeAppliedToGroups(appliedToGroups, atgs...) var peer *controlplane.NetworkPolicyPeer if egressRule.ToServices != nil { @@ -179,16 +155,12 @@ func (n *NetworkPolicyController) processAntreaNetworkPolicy(np *crdv1alpha1.Net return internalNetworkPolicy, appliedToGroups, addressGroups } -func (n *NetworkPolicyController) processAppliedTo(namespace string, appliedTo []crdv1alpha1.NetworkPolicyPeer) ([]*antreatypes.AppliedToGroup, error) { +func (n *NetworkPolicyController) processAppliedTo(namespace string, appliedTo []crdv1alpha1.NetworkPolicyPeer) []*antreatypes.AppliedToGroup { var appliedToGroups []*antreatypes.AppliedToGroup for _, at := range appliedTo { var atg *antreatypes.AppliedToGroup if at.Group != "" { - var err error - atg, err = n.createAppliedToGroupForNamespacedGroup(namespace, at.Group) - if err != nil { - return nil, err - } + atg = n.createAppliedToGroupForGroup(namespace, at.Group) } else { atg = n.createAppliedToGroup(namespace, at.PodSelector, at.NamespaceSelector, at.ExternalEntitySelector) } @@ -196,37 +168,16 @@ func (n *NetworkPolicyController) processAppliedTo(namespace string, appliedTo [ appliedToGroups = append(appliedToGroups, atg) } } - return appliedToGroups, nil + return appliedToGroups } // ErrNetworkPolicyAppliedToUnsupportedGroup is an error response when -// a Group with IPBlocks or NamespaceSelector is used as AppliedTo. +// a Group with Pods in other Namespaces is used as AppliedTo. type ErrNetworkPolicyAppliedToUnsupportedGroup struct { namespace string groupName string } -func (e ErrNetworkPolicyAppliedToUnsupportedGroup) Error() string { - return fmt.Sprintf("group %s/%s with IPBlocks or NamespaceSelector can not be used as AppliedTo", e.namespace, e.groupName) -} - -func (n *NetworkPolicyController) createAppliedToGroupForNamespacedGroup(namespace, groupName string) (*antreatypes.AppliedToGroup, error) { - // Namespaced group uses NAMESPACE/NAME as the key of the corresponding internal group. - key := k8s.NamespacedName(namespace, groupName) - // Find the internal Group corresponding to this Group. - // There is no need to check if the namespaced group exists in groupLister because its existence will eventually be - // reflected in internalGroupStore. - ig, found, _ := n.internalGroupStore.Get(key) - if !found { - // Internal Group is not found, which means the corresponding namespaced group is either not created yet or not - // processed yet. Once the internal Group is created and processed, the sync worker for internal group will - // re-enqueue the ClusterNetworkPolicy processing which will trigger the creation of AppliedToGroup. - return nil, nil - } - intGrp := ig.(*antreatypes.Group) - if len(intGrp.IPBlocks) > 0 || (intGrp.Selector != nil && intGrp.Selector.NamespaceSelector != nil) { - klog.V(2).InfoS("Group with IPBlocks or NamespaceSelector can not be used as AppliedTo", "Group", key) - return nil, ErrNetworkPolicyAppliedToUnsupportedGroup{namespace: namespace, groupName: groupName} - } - return &antreatypes.AppliedToGroup{UID: intGrp.UID, Name: key}, nil +func (e *ErrNetworkPolicyAppliedToUnsupportedGroup) Error() string { + return fmt.Sprintf("group %s/%s with Pods in other Namespaces can not be used as AppliedTo", e.namespace, e.groupName) } diff --git a/pkg/controller/networkpolicy/antreanetworkpolicy_test.go b/pkg/controller/networkpolicy/antreanetworkpolicy_test.go index 8d579d686a7..296461a2566 100644 --- a/pkg/controller/networkpolicy/antreanetworkpolicy_test.go +++ b/pkg/controller/networkpolicy/antreanetworkpolicy_test.go @@ -15,7 +15,6 @@ package networkpolicy import ( - "fmt" "testing" "github.com/stretchr/testify/assert" @@ -25,7 +24,6 @@ import ( "antrea.io/antrea/pkg/apis/controlplane" crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1" - crdv1alpha3 "antrea.io/antrea/pkg/apis/crd/v1alpha3" antreatypes "antrea.io/antrea/pkg/controller/types" ) @@ -620,81 +618,6 @@ func TestDeleteANP(t *testing.T) { assert.False(t, done) } -func TestProcessAppliedToGroupsForGroup(t *testing.T) { - selectorA := metav1.LabelSelector{MatchLabels: map[string]string{"foo1": "bar1"}} - cidr := "10.0.0.0/24" - // gA with selector present in cache - gA := crdv1alpha3.Group{ - ObjectMeta: metav1.ObjectMeta{Namespace: "nsA", Name: "gA", UID: "uidA"}, - Spec: crdv1alpha3.GroupSpec{ - PodSelector: &selectorA, - }, - } - // gB with IPBlock present in cache - gB := crdv1alpha3.Group{ - ObjectMeta: metav1.ObjectMeta{Namespace: "nsB", Name: "gB", UID: "uidB"}, - Spec: crdv1alpha3.GroupSpec{ - IPBlocks: []crdv1alpha1.IPBlock{ - { - CIDR: cidr, - }, - }, - }, - } - // gC not found in cache - gC := crdv1alpha3.Group{ - ObjectMeta: metav1.ObjectMeta{Namespace: "nsC", Name: "gC", UID: "uidC"}, - Spec: crdv1alpha3.GroupSpec{ - NamespaceSelector: &selectorA, - }, - } - _, npc := newController() - npc.addGroup(&gA) - npc.addGroup(&gB) - npc.gStore.Add(&gA) - npc.gStore.Add(&gB) - tests := []struct { - name string - namespace string - inputG string - expectedAG *antreatypes.AppliedToGroup - expectedErr error - }{ - { - name: "empty-grp-no-result", - namespace: "nsA", - inputG: "", - expectedAG: nil, - }, - { - name: "ipblock-grp-no-result", - namespace: "nsB", - inputG: gB.Name, - expectedAG: nil, - expectedErr: ErrNetworkPolicyAppliedToUnsupportedGroup{namespace: "nsB", groupName: gB.Name}, - }, - { - name: "selector-grp-missing-no-result", - namespace: "nsC", - inputG: gC.Name, - expectedAG: nil, - }, - { - name: "selector-grp", - namespace: "nsA", - inputG: gA.Name, - expectedAG: &antreatypes.AppliedToGroup{UID: gA.UID, Name: fmt.Sprintf("%s/%s", gA.Namespace, gA.Name)}, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - actualAG, actualErr := npc.createAppliedToGroupForNamespacedGroup(tt.namespace, tt.inputG) - assert.Equal(t, tt.expectedAG, actualAG, "appliedToGroup list does not match") - assert.ErrorIs(t, actualErr, tt.expectedErr) - }) - } -} - // util functions for testing. func getANP() *crdv1alpha1.NetworkPolicy { p10 := float64(10) diff --git a/pkg/controller/networkpolicy/clusternetworkpolicy.go b/pkg/controller/networkpolicy/clusternetworkpolicy.go index 401235dad75..442003c0cb6 100644 --- a/pkg/controller/networkpolicy/clusternetworkpolicy.go +++ b/pkg/controller/networkpolicy/clusternetworkpolicy.go @@ -490,7 +490,7 @@ func (n *NetworkPolicyController) processClusterAppliedTo(appliedTo []crdv1alpha for _, at := range appliedTo { var atg *antreatypes.AppliedToGroup if at.Group != "" { - atg = n.createAppliedToGroupForCG(at.Group) + atg = n.createAppliedToGroupForGroup("", at.Group) } else if at.Service != nil { atg = n.createAppliedToGroupForService(at.Service) } else if at.ServiceAccount != nil { @@ -602,22 +602,3 @@ func (n *NetworkPolicyController) processRefGroupOrClusterGroup(g, namespace str } return nil, ipb } - -func (n *NetworkPolicyController) createAppliedToGroupForCG(clusterGroupName string) *antreatypes.AppliedToGroup { - // Find the internal Group corresponding to this ClusterGroup - // There is no need to check if the ClusterGroup exists in clusterGroupLister because its existence will eventually - // be reflected in internalGroupStore. - ig, found, _ := n.internalGroupStore.Get(clusterGroupName) - if !found { - // Internal Group was not found. Once the internal Group is created, the sync - // worker for internal group will re-enqueue the ClusterNetworkPolicy processing - // which will trigger the creation of AddressGroup. - return nil - } - intGrp := ig.(*antreatypes.Group) - if len(intGrp.IPBlocks) > 0 { - klog.V(2).InfoS("ClusterGroup with IPBlocks will not be processed as AppliedTo", "ClusterGroup", clusterGroupName) - return nil - } - return &antreatypes.AppliedToGroup{UID: intGrp.UID, Name: clusterGroupName} -} diff --git a/pkg/controller/networkpolicy/clusternetworkpolicy_test.go b/pkg/controller/networkpolicy/clusternetworkpolicy_test.go index a3446b6aa92..d4967ff2b56 100644 --- a/pkg/controller/networkpolicy/clusternetworkpolicy_test.go +++ b/pkg/controller/networkpolicy/clusternetworkpolicy_test.go @@ -1795,73 +1795,6 @@ func TestProcessRefGroupOrClusterGroup(t *testing.T) { } } -func TestProcessAppliedToGroupsForCGs(t *testing.T) { - selectorA := metav1.LabelSelector{MatchLabels: map[string]string{"foo1": "bar1"}} - cidr := "10.0.0.0/24" - // cgA with selector present in cache - cgA := crdv1alpha3.ClusterGroup{ - ObjectMeta: metav1.ObjectMeta{Name: "cgA", UID: "uidA"}, - Spec: crdv1alpha3.GroupSpec{ - NamespaceSelector: &selectorA, - }, - } - // cgB with IPBlock present in cache - cgB := crdv1alpha3.ClusterGroup{ - ObjectMeta: metav1.ObjectMeta{Name: "cgB", UID: "uidB"}, - Spec: crdv1alpha3.GroupSpec{ - IPBlocks: []crdv1alpha1.IPBlock{ - { - CIDR: cidr, - }, - }, - }, - } - // cgC not found in cache - cgC := crdv1alpha3.ClusterGroup{ - ObjectMeta: metav1.ObjectMeta{Name: "cgC", UID: "uidC"}, - Spec: crdv1alpha3.GroupSpec{ - NamespaceSelector: &selectorA, - }, - } - _, npc := newController() - npc.addClusterGroup(&cgA) - npc.addClusterGroup(&cgB) - npc.cgStore.Add(&cgA) - npc.cgStore.Add(&cgB) - tests := []struct { - name string - inputCG string - expectedAG *antreatypes.AppliedToGroup - }{ - { - name: "empty-cgs-no-result", - inputCG: "", - expectedAG: nil, - }, - { - name: "ipblock-cgs-no-result", - inputCG: cgB.Name, - expectedAG: nil, - }, - { - name: "selector-cgs-missing-no-result", - inputCG: cgC.Name, - expectedAG: nil, - }, - { - name: "selector-cgs", - inputCG: cgA.Name, - expectedAG: &antreatypes.AppliedToGroup{UID: cgA.UID, Name: cgA.Name}, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - actualAG := npc.createAppliedToGroupForCG(tt.inputCG) - assert.Equal(t, tt.expectedAG, actualAG, "appliedToGroup list does not match") - }) - } -} - // util functions for testing. func getCNP() *crdv1alpha1.ClusterNetworkPolicy { diff --git a/pkg/controller/networkpolicy/crd_utils.go b/pkg/controller/networkpolicy/crd_utils.go index 7f57d09d245..f30bb2166a6 100644 --- a/pkg/controller/networkpolicy/crd_utils.go +++ b/pkg/controller/networkpolicy/crd_utils.go @@ -229,6 +229,29 @@ func (n *NetworkPolicyController) createAppliedToGroupForService(service *v1alph return appliedToGroup } +// createAppliedToGroupForGroup creates an AppliedToGroup object corresponding to a ClusterGroup or a Group. +// The namespace parameter is only provided when the group is namespace scoped. +func (n *NetworkPolicyController) createAppliedToGroupForGroup(namespace, group string) *antreatypes.AppliedToGroup { + // Cluster group uses NAME and Namespaced group uses NAMESPACE/NAME as the key of the corresponding internal group. + key := k8s.NamespacedName(namespace, group) + // Find the internal Group corresponding to this ClusterGroup/Group. + // There is no need to check if the ClusterGroup/Group exists in clusterGroupLister/groupLister because its + // existence will eventually be reflected in internalGroupStore. + ig, found, _ := n.internalGroupStore.Get(key) + if !found { + // Internal Group was not found. Once the internal Group is created, the sync worker for internal group will + // re-enqueue the ClusterNetworkPolicy/AntreaNetworkPolicy processing which will call this method again. So it's + // fine to ignore NotFound case. + return nil + } + intGrp := ig.(*antreatypes.Group) + if len(intGrp.IPBlocks) > 0 { + klog.V(2).InfoS("Group with IPBlocks can not be used as AppliedTo", "Group", key) + return nil + } + return &antreatypes.AppliedToGroup{UID: intGrp.UID, Name: key} +} + // getTierPriority retrieves the priority associated with the input Tier name. // If the Tier name is empty, by default, the lowest priority Application Tier // is returned. diff --git a/pkg/controller/networkpolicy/crd_utils_test.go b/pkg/controller/networkpolicy/crd_utils_test.go index 3e2bb22c2f9..c82b260630d 100644 --- a/pkg/controller/networkpolicy/crd_utils_test.go +++ b/pkg/controller/networkpolicy/crd_utils_test.go @@ -443,3 +443,89 @@ func TestToAntreaPeerForCRD(t *testing.T) { }) } } + +func TestCreateAppliedToGroupsForGroup(t *testing.T) { + selector := metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}} + cidr := "10.0.0.0/24" + // cgA with selector present in cache + clusterGroupWithSelector := &crdv1alpha3.ClusterGroup{ + ObjectMeta: metav1.ObjectMeta{Name: "cgA", UID: "uidA"}, + Spec: crdv1alpha3.GroupSpec{NamespaceSelector: &selector}, + } + // cgB with IPBlock present in cache + clusterGroupWithIPBlock := &crdv1alpha3.ClusterGroup{ + ObjectMeta: metav1.ObjectMeta{Name: "cgB", UID: "uidB"}, + Spec: crdv1alpha3.GroupSpec{IPBlocks: []crdv1alpha1.IPBlock{{CIDR: cidr}}}, + } + groupWithSelector := &crdv1alpha3.Group{ + ObjectMeta: metav1.ObjectMeta{Namespace: "nsA", Name: "gA", UID: "uidA"}, + Spec: crdv1alpha3.GroupSpec{PodSelector: &selector}, + } + // gB with IPBlock present in cache + groupWithIPBlock := &crdv1alpha3.Group{ + ObjectMeta: metav1.ObjectMeta{Namespace: "nsB", Name: "gB", UID: "uidB"}, + Spec: crdv1alpha3.GroupSpec{IPBlocks: []crdv1alpha1.IPBlock{{CIDR: cidr}}}, + } + _, npc := newController() + npc.addClusterGroup(clusterGroupWithSelector) + npc.addClusterGroup(clusterGroupWithIPBlock) + npc.addGroup(groupWithSelector) + npc.addGroup(groupWithIPBlock) + tests := []struct { + name string + inputNamespace string + inputGroup string + expectedATG *antreatypes.AppliedToGroup + }{ + { + name: "empty cluster group name", + inputGroup: "", + expectedATG: nil, + }, + { + name: "cluster group with IPBlock", + inputGroup: clusterGroupWithIPBlock.Name, + expectedATG: nil, + }, + { + name: "non-existing cluster group", + inputGroup: "foo", + expectedATG: nil, + }, + { + name: "cluster group with selectors", + inputGroup: clusterGroupWithSelector.Name, + expectedATG: &antreatypes.AppliedToGroup{UID: clusterGroupWithSelector.UID, Name: clusterGroupWithSelector.Name}, + }, + { + name: "empty group name", + inputNamespace: "default", + inputGroup: "", + expectedATG: nil, + }, + { + name: "group with IPBlock", + inputNamespace: groupWithIPBlock.Namespace, + inputGroup: groupWithIPBlock.Name, + expectedATG: nil, + }, + { + name: "non-existing group", + inputNamespace: "foo", + inputGroup: "bar", + expectedATG: nil, + }, + { + name: "group with selectors", + inputNamespace: groupWithSelector.Namespace, + inputGroup: groupWithSelector.Name, + expectedATG: &antreatypes.AppliedToGroup{UID: groupWithSelector.UID, Name: fmt.Sprintf("%s/%s", groupWithSelector.Namespace, groupWithSelector.Name)}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + actualATG := npc.createAppliedToGroupForGroup(tt.inputNamespace, tt.inputGroup) + assert.Equal(t, tt.expectedATG, actualATG, "appliedToGroup does not match") + }) + } +} diff --git a/pkg/controller/networkpolicy/networkpolicy_controller.go b/pkg/controller/networkpolicy/networkpolicy_controller.go index 88313e70e0e..7bb0de5aacb 100644 --- a/pkg/controller/networkpolicy/networkpolicy_controller.go +++ b/pkg/controller/networkpolicy/networkpolicy_controller.go @@ -1243,47 +1243,57 @@ func (n *NetworkPolicyController) syncAppliedToGroup(key string) error { } klog.V(2).Infof("Updating existing AppliedToGroup %s on %d Nodes", key, appGroupNodeNames.Len()) } else { - scheduledPodNum, scheduledExtEntityNum := 0, 0 - pods, externalEntities := n.getAppliedToWorkloads(appliedToGroup) - for _, pod := range pods { - if pod.Spec.NodeName == "" || pod.Spec.HostNetwork == true { - // No need to process Pod when it's not scheduled. - // HostNetwork Pods will not be applied to by policies. - continue + pods, externalEntities, err := n.getAppliedToWorkloads(appliedToGroup) + if err != nil { + klog.ErrorS(err, "Error when getting AppliedTo workloads for AppliedToGroup", "AppliedToGroup", appliedToGroup.Name) + updatedAppliedToGroup = &antreatypes.AppliedToGroup{ + UID: appliedToGroup.UID, + Name: appliedToGroup.Name, + Selector: appliedToGroup.Selector, + SyncError: err, } - scheduledPodNum++ - podSet := memberSetByNode[pod.Spec.NodeName] - if podSet == nil { - podSet = controlplane.GroupMemberSet{} + } else { + scheduledPodNum, scheduledExtEntityNum := 0, 0 + for _, pod := range pods { + if pod.Spec.NodeName == "" || pod.Spec.HostNetwork == true { + // No need to process Pod when it's not scheduled. + // HostNetwork Pods will not be applied to by policies. + continue + } + scheduledPodNum++ + podSet := memberSetByNode[pod.Spec.NodeName] + if podSet == nil { + podSet = controlplane.GroupMemberSet{} + } + podSet.Insert(podToGroupMember(pod, false)) + // Update the Pod references by Node. + memberSetByNode[pod.Spec.NodeName] = podSet + // Update the NodeNames in order to set the SpanMeta for AppliedToGroup. + appGroupNodeNames.Insert(pod.Spec.NodeName) } - podSet.Insert(podToGroupMember(pod, false)) - // Update the Pod references by Node. - memberSetByNode[pod.Spec.NodeName] = podSet - // Update the NodeNames in order to set the SpanMeta for AppliedToGroup. - appGroupNodeNames.Insert(pod.Spec.NodeName) - } - for _, extEntity := range externalEntities { - if extEntity.Spec.ExternalNode == "" { - continue + for _, extEntity := range externalEntities { + if extEntity.Spec.ExternalNode == "" { + continue + } + scheduledExtEntityNum++ + entitySet := memberSetByNode[extEntity.Spec.ExternalNode] + if entitySet == nil { + entitySet = controlplane.GroupMemberSet{} + } + entitySet.Insert(externalEntityToGroupMember(extEntity, false)) + memberSetByNode[extEntity.Spec.ExternalNode] = entitySet + appGroupNodeNames.Insert(extEntity.Spec.ExternalNode) } - scheduledExtEntityNum++ - entitySet := memberSetByNode[extEntity.Spec.ExternalNode] - if entitySet == nil { - entitySet = controlplane.GroupMemberSet{} + updatedAppliedToGroup = &antreatypes.AppliedToGroup{ + UID: appliedToGroup.UID, + Name: appliedToGroup.Name, + Selector: appliedToGroup.Selector, + GroupMemberByNode: memberSetByNode, + SpanMeta: antreatypes.SpanMeta{NodeNames: appGroupNodeNames}, } - entitySet.Insert(externalEntityToGroupMember(extEntity, false)) - memberSetByNode[extEntity.Spec.ExternalNode] = entitySet - appGroupNodeNames.Insert(extEntity.Spec.ExternalNode) + klog.V(2).Infof("Updating existing AppliedToGroup %s with %d Pods and %d External Entities on %d Nodes", + key, scheduledPodNum, scheduledExtEntityNum, appGroupNodeNames.Len()) } - updatedAppliedToGroup = &antreatypes.AppliedToGroup{ - UID: appliedToGroup.UID, - Name: appliedToGroup.Name, - Selector: appliedToGroup.Selector, - GroupMemberByNode: memberSetByNode, - SpanMeta: antreatypes.SpanMeta{NodeNames: appGroupNodeNames}, - } - klog.V(2).Infof("Updating existing AppliedToGroup %s with %d Pods and %d External Entities on %d Nodes", - key, scheduledPodNum, scheduledExtEntityNum, appGroupNodeNames.Len()) } n.appliedToGroupStore.Update(updatedAppliedToGroup) // Get all internal NetworkPolicy objects that refers this AppliedToGroup. @@ -1304,15 +1314,31 @@ func (n *NetworkPolicyController) syncAppliedToGroup(key string) error { // getAppliedToWorkloads returns a list of workloads (Pods and ExternalEntities) selected by an AppliedToGroup // for standalone selectors or corresponding to a ClusterGroup. -func (n *NetworkPolicyController) getAppliedToWorkloads(g *antreatypes.AppliedToGroup) ([]*v1.Pod, []*v1alpha2.ExternalEntity) { - // Check if an internal Group object exists corresponding to this AppliedToGroup. +func (n *NetworkPolicyController) getAppliedToWorkloads(g *antreatypes.AppliedToGroup) ([]*v1.Pod, []*v1alpha2.ExternalEntity, error) { + // Check if an internal Group object exists corresponding to this AppliedToGroup group, found, _ := n.internalGroupStore.Get(g.Name) if found { // This AppliedToGroup is derived from a ClusterGroup. grp := group.(*antreatypes.Group) - return n.getInternalGroupWorkloads(grp) + pods, ees := n.getInternalGroupWorkloads(grp) + if !grp.IsClusterGroup() { + // Namespaced Group can only select Pods and ExternalEntities in the same Namespace as the Group when used + // for AppliedTo. + for _, pod := range pods { + if pod.Namespace != grp.SourceReference.Namespace { + return nil, nil, &ErrNetworkPolicyAppliedToUnsupportedGroup{groupName: grp.SourceReference.Name, namespace: grp.SourceReference.Namespace} + } + } + for _, ee := range ees { + if ee.Namespace != grp.SourceReference.Namespace { + return nil, nil, &ErrNetworkPolicyAppliedToUnsupportedGroup{groupName: grp.SourceReference.Name, namespace: grp.SourceReference.Namespace} + } + } + } + return pods, ees, nil } - return n.groupingInterface.GetEntities(appliedToGroupType, g.Name) + pods, ees := n.groupingInterface.GetEntities(appliedToGroupType, g.Name) + return pods, ees, nil } // getInternalGroupWorkloads returns a list of workloads (Pods and ExternalEntities) selected by a ClusterGroup. @@ -1386,18 +1412,29 @@ func (n *NetworkPolicyController) syncInternalNetworkPolicy(key *controlplane.Ne newInternalNetworkPolicy, newAppliedToGroups, newAddressGroups = n.processNetworkPolicy(knp) } - newNodeNames := sets.NewString() - // Calculate the set of Node names based on the span of the - // AppliedToGroups referenced by this NetworkPolicy. - for appliedToGroupName := range newAppliedToGroups { - appGroupObj, found, _ := n.appliedToGroupStore.Get(appliedToGroupName) - if !found { - continue + newNodeNames, err := func() (sets.String, error) { + nodeNames := sets.NewString() + // Calculate the set of Node names based on the span of the + // AppliedToGroups referenced by this NetworkPolicy. + for appliedToGroupName := range newAppliedToGroups { + appGroupObj, found, _ := n.appliedToGroupStore.Get(appliedToGroupName) + if !found { + continue + } + appGroup := appGroupObj.(*antreatypes.AppliedToGroup) + if appGroup.SyncError != nil { + return nil, appGroup.SyncError + } + utilsets.MergeString(nodeNames, appGroup.SpanMeta.NodeNames) } - appGroup := appGroupObj.(*antreatypes.AppliedToGroup) - utilsets.MergeString(newNodeNames, appGroup.SpanMeta.NodeNames) + return nodeNames, nil + }() + if err != nil { + klog.ErrorS(err, "Error when processing AppliedToGroups for internal NetworkPolicy", "key", key) + newInternalNetworkPolicy.SyncError = err + } else { + newInternalNetworkPolicy.NodeNames = newNodeNames } - newInternalNetworkPolicy.NodeNames = newNodeNames var oldInternalNetworkPolicy *antreatypes.NetworkPolicy oldInternalNetworkPolicyObj, oldInternalPolicyExists, _ := n.internalNetworkPolicyStore.Get(internalNetworkPolicyName) diff --git a/pkg/controller/networkpolicy/networkpolicy_controller_test.go b/pkg/controller/networkpolicy/networkpolicy_controller_test.go index 0a5bf1f6945..704606cb71e 100644 --- a/pkg/controller/networkpolicy/networkpolicy_controller_test.go +++ b/pkg/controller/networkpolicy/networkpolicy_controller_test.go @@ -81,6 +81,7 @@ type networkPolicyController struct { serviceStore cache.Store networkPolicyStore cache.Store cnpStore cache.Store + anpStore cache.Store tierStore cache.Store cgStore cache.Store gStore cache.Store @@ -142,6 +143,7 @@ func newController(objects ...runtime.Object) (*fake.Clientset, *networkPolicyCo informerFactory.Core().V1().Services().Informer().GetStore(), informerFactory.Networking().V1().NetworkPolicies().Informer().GetStore(), crdInformerFactory.Crd().V1alpha1().ClusterNetworkPolicies().Informer().GetStore(), + crdInformerFactory.Crd().V1alpha1().NetworkPolicies().Informer().GetStore(), crdInformerFactory.Crd().V1alpha1().Tiers().Informer().GetStore(), crdInformerFactory.Crd().V1alpha3().ClusterGroups().Informer().GetStore(), crdInformerFactory.Crd().V1alpha3().Groups().Informer().GetStore(), @@ -211,6 +213,7 @@ func newControllerWithoutEventHandler(k8sObjects, crdObjects []runtime.Object) ( informerFactory.Core().V1().Services().Informer().GetStore(), informerFactory.Networking().V1().NetworkPolicies().Informer().GetStore(), cnpInformer.Informer().GetStore(), + anpInformer.Informer().GetStore(), tierInformer.Informer().GetStore(), cgInformer.Informer().GetStore(), groupInformer.Informer().GetStore(), @@ -2600,7 +2603,8 @@ func TestGetAppliedToWorkloads(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - actualPods, actualEEs := c.getAppliedToWorkloads(tt.inATG) + actualPods, actualEEs, actualErr := c.getAppliedToWorkloads(tt.inATG) + assert.NoError(t, actualErr) assert.Equal(t, tt.expEEs, actualEEs) assert.Equal(t, tt.expPods, actualPods) }) @@ -3139,6 +3143,242 @@ func TestSyncInternalNetworkPolicyConcurrently(t *testing.T) { checkGroupItemExistence(t, c.appliedToGroupStore) } +func TestSyncInternalNetworkPolicyWithGroups(t *testing.T) { + p10 := float64(10) + allowAction := crdv1alpha1.RuleActionAllow + podA := getPod("podA", "nsA", "nodeA", "10.0.0.1", false) + podA.Labels = selectorA.MatchLabels + podB := getPod("podB", "nsB", "nodeB", "10.0.0.2", false) + podB.Labels = selectorA.MatchLabels + selectorBGroup := getNormalizedUID(antreatypes.NewGroupSelector("nsA", &selectorB, nil, nil, nil).NormalizedName) + + tests := []struct { + name string + groups []*v1alpha3.Group + inputPolicy *crdv1alpha1.NetworkPolicy + expectedPolicy *antreatypes.NetworkPolicy + }{ + { + name: "anp with valid group", + groups: []*v1alpha3.Group{ + { + ObjectMeta: metav1.ObjectMeta{Namespace: "nsA", Name: "groupA"}, + Spec: v1alpha3.GroupSpec{PodSelector: &selectorA}, + }, + }, + inputPolicy: &crdv1alpha1.NetworkPolicy{ + ObjectMeta: metav1.ObjectMeta{Namespace: "nsA", Name: "anpA", UID: "uidA"}, + Spec: crdv1alpha1.NetworkPolicySpec{ + AppliedTo: []crdv1alpha1.NetworkPolicyPeer{ + {Group: "groupA"}, + }, + Priority: p10, + Ingress: []crdv1alpha1.Rule{ + { + From: []crdv1alpha1.NetworkPolicyPeer{{PodSelector: &selectorB}}, + Action: &allowAction, + }, + }, + }, + }, + expectedPolicy: &antreatypes.NetworkPolicy{ + UID: "uidA", + Name: "uidA", + SpanMeta: antreatypes.SpanMeta{NodeNames: sets.NewString("nodeA")}, + SourceRef: &controlplane.NetworkPolicyReference{ + Type: controlplane.AntreaNetworkPolicy, + Namespace: "nsA", + Name: "anpA", + UID: "uidA", + }, + Priority: &p10, + TierPriority: &DefaultTierPriority, + Rules: []controlplane.NetworkPolicyRule{ + { + Direction: controlplane.DirectionIn, + From: controlplane.NetworkPolicyPeer{AddressGroups: []string{selectorBGroup}}, + Action: &allowAction, + }, + }, + AppliedToGroups: []string{"nsA/groupA"}, + }, + }, + { + name: "anp with valid parent group", + groups: []*v1alpha3.Group{ + { + ObjectMeta: metav1.ObjectMeta{Namespace: "nsA", Name: "parentGroup"}, + Spec: v1alpha3.GroupSpec{ChildGroups: []v1alpha3.ClusterGroupReference{"groupA"}}, + }, + { + ObjectMeta: metav1.ObjectMeta{Namespace: "nsA", Name: "groupA"}, + Spec: v1alpha3.GroupSpec{PodSelector: &selectorA}, + }, + }, + inputPolicy: &crdv1alpha1.NetworkPolicy{ + ObjectMeta: metav1.ObjectMeta{Namespace: "nsA", Name: "anpA", UID: "uidA"}, + Spec: crdv1alpha1.NetworkPolicySpec{ + AppliedTo: []crdv1alpha1.NetworkPolicyPeer{ + {Group: "parentGroup"}, + }, + Priority: p10, + Ingress: []crdv1alpha1.Rule{ + { + From: []crdv1alpha1.NetworkPolicyPeer{{PodSelector: &selectorB}}, + Action: &allowAction, + }, + }, + }, + }, + expectedPolicy: &antreatypes.NetworkPolicy{ + UID: "uidA", + Name: "uidA", + SpanMeta: antreatypes.SpanMeta{NodeNames: sets.NewString("nodeA")}, + SourceRef: &controlplane.NetworkPolicyReference{ + Type: controlplane.AntreaNetworkPolicy, + Namespace: "nsA", + Name: "anpA", + UID: "uidA", + }, + Priority: &p10, + TierPriority: &DefaultTierPriority, + Rules: []controlplane.NetworkPolicyRule{ + { + Direction: controlplane.DirectionIn, + From: controlplane.NetworkPolicyPeer{AddressGroups: []string{selectorBGroup}}, + Action: &allowAction, + }, + }, + AppliedToGroups: []string{"nsA/parentGroup"}, + }, + }, + { + name: "anp with invalid group", + groups: []*v1alpha3.Group{ + { + ObjectMeta: metav1.ObjectMeta{Namespace: "nsA", Name: "groupA"}, + Spec: v1alpha3.GroupSpec{NamespaceSelector: &metav1.LabelSelector{}, PodSelector: &selectorA}, + }, + }, + inputPolicy: &crdv1alpha1.NetworkPolicy{ + ObjectMeta: metav1.ObjectMeta{Namespace: "nsA", Name: "anpA", UID: "uidA"}, + Spec: crdv1alpha1.NetworkPolicySpec{ + AppliedTo: []crdv1alpha1.NetworkPolicyPeer{ + {Group: "groupA"}, + }, + Priority: p10, + Ingress: []crdv1alpha1.Rule{ + { + From: []crdv1alpha1.NetworkPolicyPeer{{PodSelector: &selectorB}}, + Action: &allowAction, + }, + }, + }, + }, + expectedPolicy: &antreatypes.NetworkPolicy{ + UID: "uidA", + Name: "uidA", + SourceRef: &controlplane.NetworkPolicyReference{ + Type: controlplane.AntreaNetworkPolicy, + Namespace: "nsA", + Name: "anpA", + UID: "uidA", + }, + Priority: &p10, + TierPriority: &DefaultTierPriority, + Rules: []controlplane.NetworkPolicyRule{ + { + Direction: controlplane.DirectionIn, + From: controlplane.NetworkPolicyPeer{AddressGroups: []string{selectorBGroup}}, + Action: &allowAction, + }, + }, + AppliedToGroups: []string{"nsA/groupA"}, + SyncError: &ErrNetworkPolicyAppliedToUnsupportedGroup{groupName: "groupA", namespace: "nsA"}, + }, + }, + { + name: "anp with invalid parent group", + groups: []*v1alpha3.Group{ + { + ObjectMeta: metav1.ObjectMeta{Namespace: "nsA", Name: "parentGroup"}, + Spec: v1alpha3.GroupSpec{ChildGroups: []v1alpha3.ClusterGroupReference{"groupA"}}, + }, + { + ObjectMeta: metav1.ObjectMeta{Namespace: "nsA", Name: "groupA"}, + Spec: v1alpha3.GroupSpec{NamespaceSelector: &metav1.LabelSelector{}, PodSelector: &selectorA}, + }, + }, + inputPolicy: &crdv1alpha1.NetworkPolicy{ + ObjectMeta: metav1.ObjectMeta{Namespace: "nsA", Name: "anpA", UID: "uidA"}, + Spec: crdv1alpha1.NetworkPolicySpec{ + AppliedTo: []crdv1alpha1.NetworkPolicyPeer{ + {Group: "parentGroup"}, + }, + Priority: p10, + Ingress: []crdv1alpha1.Rule{ + { + From: []crdv1alpha1.NetworkPolicyPeer{{PodSelector: &selectorB}}, + Action: &allowAction, + }, + }, + }, + }, + expectedPolicy: &antreatypes.NetworkPolicy{ + UID: "uidA", + Name: "uidA", + SourceRef: &controlplane.NetworkPolicyReference{ + Type: controlplane.AntreaNetworkPolicy, + Namespace: "nsA", + Name: "anpA", + UID: "uidA", + }, + Priority: &p10, + TierPriority: &DefaultTierPriority, + Rules: []controlplane.NetworkPolicyRule{ + { + Direction: controlplane.DirectionIn, + From: controlplane.NetworkPolicyPeer{AddressGroups: []string{selectorBGroup}}, + Action: &allowAction, + }, + }, + AppliedToGroups: []string{"nsA/parentGroup"}, + SyncError: &ErrNetworkPolicyAppliedToUnsupportedGroup{groupName: "parentGroup", namespace: "nsA"}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, c := newController(podA, podB) + stopCh := make(chan struct{}) + defer close(stopCh) + c.informerFactory.Start(stopCh) + c.crdInformerFactory.Start(stopCh) + c.informerFactory.WaitForCacheSync(stopCh) + c.crdInformerFactory.WaitForCacheSync(stopCh) + go c.groupingInterface.Run(stopCh) + go c.groupingController.Run(stopCh) + go c.Run(stopCh) + + for _, group := range tt.groups { + c.crdClient.CrdV1alpha3().Groups(group.Namespace).Create(context.TODO(), group, metav1.CreateOptions{}) + } + c.crdClient.CrdV1alpha1().NetworkPolicies(tt.inputPolicy.Namespace).Create(context.TODO(), tt.inputPolicy, metav1.CreateOptions{}) + + var gotPolicy *antreatypes.NetworkPolicy + err := wait.PollImmediate(100*time.Millisecond, 3*time.Second, func() (done bool, err error) { + obj, exists, _ := c.internalNetworkPolicyStore.Get(tt.expectedPolicy.Name) + if !exists { + return false, nil + } + gotPolicy = obj.(*antreatypes.NetworkPolicy) + return reflect.DeepEqual(tt.expectedPolicy, gotPolicy), nil + }) + assert.NoError(t, err, "Expected %#v\ngot %#v", tt.expectedPolicy, gotPolicy) + }) + } +} + func checkQueueItemExistence(t *testing.T, queue workqueue.RateLimitingInterface, items ...string) { require.Equal(t, len(items), queue.Len()) expectedItems := sets.NewString(items...) diff --git a/pkg/controller/networkpolicy/status_controller.go b/pkg/controller/networkpolicy/status_controller.go index 265489e6858..4a225d861f8 100644 --- a/pkg/controller/networkpolicy/status_controller.go +++ b/pkg/controller/networkpolicy/status_controller.go @@ -268,13 +268,13 @@ func (c *StatusController) syncHandler(key string) error { } internalNP := internalNPObj.(*antreatypes.NetworkPolicy) - // It means the NetworkPolicy hasn't been processed once. Set it to Pending to differentiate from NetworkPolicies - // that spans 0 Node. - if internalNP.SpanMeta.NodeNames == nil { + // It means the NetworkPolicy has been processed, and marked as unrealizable. It will enter unrealizable phase + // instead of being further realized. Antrea-agents will not process further. + if internalNP.SyncError != nil { status := &crdv1alpha1.NetworkPolicyStatus{ Phase: crdv1alpha1.NetworkPolicyPending, ObservedGeneration: internalNP.Generation, - Conditions: GenerateNetworkPolicyCondition(nil), + Conditions: GenerateNetworkPolicyCondition(internalNP.SyncError), } if internalNP.SourceRef.Type == controlplane.AntreaNetworkPolicy { return c.npControlInterface.UpdateAntreaNetworkPolicyStatus(internalNP.SourceRef.Namespace, internalNP.SourceRef.Name, status) @@ -282,13 +282,13 @@ func (c *StatusController) syncHandler(key string) error { return c.npControlInterface.UpdateAntreaClusterNetworkPolicyStatus(internalNP.SourceRef.Name, status) } - // It means the NetworkPolicy has been processed, and marked as unrealizable. It will enter unrealizable phase - // instead of being further realized. Antrea-agents will not process further. - if internalNP.RealizationError != nil { + // It means the NetworkPolicy hasn't been processed once. Set it to Pending to differentiate from NetworkPolicies + // that spans 0 Node. + if internalNP.SpanMeta.NodeNames == nil { status := &crdv1alpha1.NetworkPolicyStatus{ Phase: crdv1alpha1.NetworkPolicyPending, ObservedGeneration: internalNP.Generation, - Conditions: GenerateNetworkPolicyCondition(internalNP.RealizationError), + Conditions: GenerateNetworkPolicyCondition(nil), } if internalNP.SourceRef.Type == controlplane.AntreaNetworkPolicy { return c.npControlInterface.UpdateAntreaNetworkPolicyStatus(internalNP.SourceRef.Namespace, internalNP.SourceRef.Name, status) @@ -419,7 +419,7 @@ func GenerateNetworkPolicyCondition(err error) []crdv1alpha1.NetworkPolicyCondit Status: v1.ConditionTrue, LastTransitionTime: v1.Now(), }) - case ErrNetworkPolicyAppliedToUnsupportedGroup: + case *ErrNetworkPolicyAppliedToUnsupportedGroup: conditions = append(conditions, crdv1alpha1.NetworkPolicyCondition{ Type: crdv1alpha1.NetworkPolicyConditionRealizable, Status: v1.ConditionFalse, diff --git a/pkg/controller/types/group.go b/pkg/controller/types/group.go index ae43b687559..9e7fe41b050 100644 --- a/pkg/controller/types/group.go +++ b/pkg/controller/types/group.go @@ -131,3 +131,7 @@ type Group struct { // ChildGroups is the list of Group names that belong to this Group. ChildGroups []string } + +func (g *Group) IsClusterGroup() bool { + return g.SourceReference.Namespace == "" +} diff --git a/pkg/controller/types/networkpolicy.go b/pkg/controller/types/networkpolicy.go index 6c77fa86ea4..6a1f4099fca 100644 --- a/pkg/controller/types/networkpolicy.go +++ b/pkg/controller/types/networkpolicy.go @@ -58,6 +58,8 @@ type AppliedToGroup struct { // It will be converted to a slice of GroupMember for transferring according // to client's selection. GroupMemberByNode map[string]controlplane.GroupMemberSet + // SyncError is the Error encountered when syncing this AppliedToGroup. + SyncError error } // AddressGroup describes a set of addresses used as source or destination of Network Policy rules. @@ -101,9 +103,8 @@ type NetworkPolicy struct { // AppliedToPerRule tracks if appliedTo is set per rule basis rather than in policy spec. // Must be false for K8s NetworkPolicy. AppliedToPerRule bool - // RealizationError stores realization error of the internal Network Policy. - // It is set when processing the original Network Policy. - RealizationError error + // SyncError is the Error encountered when syncing this NetworkPolicy. + SyncError error } // GetAddressGroups returns AddressGroups used by this NetworkPolicy.