diff --git a/pkg/agent/controller/networkpolicy/reconciler.go b/pkg/agent/controller/networkpolicy/reconciler.go index 8d2f3d1a772..1cc8a3fbb31 100644 --- a/pkg/agent/controller/networkpolicy/reconciler.go +++ b/pkg/agent/controller/networkpolicy/reconciler.go @@ -591,10 +591,10 @@ func (r *reconciler) update(lastRealized *lastRealized, newRule *CompletedRule, memberByServicesMap, servicesMap := groupMembersByServices(newRule.Services, newRule.ToAddresses) // Same as the process in `add`, we must ensure the group for the original services is present // in memberByServicesMap, so that this group won't be removed and its "From" will be updated. - svcKey := normalizeServices(newRule.Services) - if _, exists := memberByServicesMap[svcKey]; !exists { - memberByServicesMap[svcKey] = v1beta2.NewGroupMemberSet() - servicesMap[svcKey] = newRule.Services + originalSvcKey := normalizeServices(newRule.Services) + if _, exists := memberByServicesMap[originalSvcKey]; !exists { + memberByServicesMap[originalSvcKey] = v1beta2.NewGroupMemberSet() + servicesMap[originalSvcKey] = newRule.Services } prevMembersByServicesMap, _ := groupMembersByServices(lastRealized.Services, lastRealized.ToAddresses) for svcKey, members := range memberByServicesMap { @@ -612,6 +612,12 @@ func (r *reconciler) update(lastRealized *lastRealized, newRule *CompletedRule, PolicyRef: newRule.SourceRef, EnableLogging: newRule.EnableLogging, } + // If the PolicyRule for the original services doesn't exist and IPBlocks is present, it means the + // reconciler hasn't installed flows for IPBlocks, then it must be added to the new PolicyRule. + if svcKey == originalSvcKey && len(newRule.To.IPBlocks) > 0 { + to := ipBlocksToOFAddresses(newRule.To.IPBlocks, r.ipv4Enabled, r.ipv6Enabled) + ofRule.To = append(ofRule.To, to...) + } err := r.idAllocator.allocateForRule(ofRule) if err != nil { return fmt.Errorf("error allocating Openflow ID") diff --git a/pkg/agent/controller/networkpolicy/reconciler_test.go b/pkg/agent/controller/networkpolicy/reconciler_test.go index 2c025d65e6d..c5a061065a1 100644 --- a/pkg/agent/controller/networkpolicy/reconciler_test.go +++ b/pkg/agent/controller/networkpolicy/reconciler_test.go @@ -15,8 +15,10 @@ package networkpolicy import ( + "errors" "fmt" "net" + "reflect" "testing" "github.com/golang/mock/gomock" @@ -86,6 +88,8 @@ var ( Name: "name1", UID: "uid1", } + + transientError = errors.New("Transient OVS error") ) func newCIDR(cidrStr string) *net.IPNet { @@ -525,6 +529,118 @@ func TestReconcilerReconcile(t *testing.T) { } } +// TestReconcileWithTransientError ensures the reconciler can reconcile a rule properly after the first attempt meets +// transient error. +// The input rule is an egress rule with named port, applying to 3 Pods and 1 IPBlock. The first 2 Pods have different +// port numbers for the named port and the 3rd Pod cannot resolve it. +// The first reconciling is supposed to fail without any openflow IDs persisted. +// The second reconciling is supposed to succeed with proper PolicyRules installed and all openflow IDs persisted. +// The third reconciling is supposed to do nothing. +func TestReconcileWithTransientError(t *testing.T) { + ifaceStore := interfacestore.NewInterfaceStore() + ifaceStore.AddInterface( + &interfacestore.InterfaceConfig{ + InterfaceName: util.GenerateContainerInterfaceName("pod1", "ns1", "container1"), + IPs: []net.IP{net.ParseIP("2.2.2.2")}, + ContainerInterfaceConfig: &interfacestore.ContainerInterfaceConfig{PodName: "pod1", PodNamespace: "ns1", ContainerID: "container1"}, + OVSPortConfig: &interfacestore.OVSPortConfig{OFPort: 1}}) + + ipNet := *newCIDR("10.10.0.0/16") + ipBlock := v1beta2.IPBlock{ + CIDR: v1beta2.IPNet{IP: v1beta2.IPAddress(ipNet.IP), PrefixLength: 16}, + } + // The 3 pods should result in 3 PolicyRules. + // The IPBlock should be in the same PolicyRule as member3 as they cannot resolve the named port. + member1 := &v1beta2.GroupMember{ + IPs: []v1beta2.IPAddress{v1beta2.IPAddress(net.ParseIP("1.1.1.1"))}, + Ports: []v1beta2.NamedPort{{Name: "http", Protocol: v1beta2.ProtocolTCP, Port: 80}}, + } + member2 := &v1beta2.GroupMember{ + IPs: []v1beta2.IPAddress{v1beta2.IPAddress(net.ParseIP("1.1.1.2"))}, + Ports: []v1beta2.NamedPort{{Name: "http", Protocol: v1beta2.ProtocolTCP, Port: 443}}, + } + member3 := &v1beta2.GroupMember{ + IPs: []v1beta2.IPAddress{v1beta2.IPAddress(net.ParseIP("1.1.1.3"))}, + } + + egressRule := &CompletedRule{ + rule: &rule{ + ID: "egress-rule", + Direction: v1beta2.DirectionOut, + SourceRef: &np1, + Services: []v1beta2.Service{serviceHTTP, serviceTCP8080}, + To: v1beta2.NetworkPolicyPeer{ + IPBlocks: []v1beta2.IPBlock{ipBlock}, + }, + }, + ToAddresses: v1beta2.NewGroupMemberSet(member1, member2, member3), + TargetMembers: v1beta2.NewGroupMemberSet(newAppliedToGroupMember("pod1", "ns1")), + } + + controller := gomock.NewController(t) + defer controller.Finish() + mockOFClient := openflowtest.NewMockClient(controller) + mockOFClient.EXPECT().IsIPv4Enabled().Return(true).AnyTimes() + mockOFClient.EXPECT().IsIPv6Enabled().Return(true).AnyTimes() + r := newReconciler(mockOFClient, ifaceStore, testAsyncDeleteInterval) + // Set deleteInterval to verify openflow ID is released immediately. + r.idAllocator.deleteInterval = 0 + + // Make the first call fail. + mockOFClient.EXPECT().InstallPolicyRuleFlows(gomock.Any()).Return(transientError).Times(1) + err := r.Reconcile(egressRule) + assert.Error(t, err) + // Ensure the openflow ID is not persistent in lastRealized and is released to idAllocator upon error. + value, exists := r.lastRealizeds.Load(egressRule.ID) + assert.True(t, exists) + assert.Empty(t, value.(*lastRealized).ofIDs) + assert.Equal(t, 1, r.idAllocator.deleteQueue.Len()) + + // Make the second call success. + // The following PolicyRules are expected to be installed. + policyRules := []*types.PolicyRule{ + { + Direction: v1beta2.DirectionOut, + From: ipsToOFAddresses(sets.NewString("2.2.2.2")), + To: ipsToOFAddresses(sets.NewString("1.1.1.1")), + Service: []v1beta2.Service{serviceTCP80, serviceTCP8080}, + PolicyRef: &np1, + TableID: openflow.EgressRuleTable, + }, + { + Direction: v1beta2.DirectionOut, + From: ipsToOFAddresses(sets.NewString("2.2.2.2")), + To: ipsToOFAddresses(sets.NewString("1.1.1.2")), + Service: []v1beta2.Service{serviceTCP443, serviceTCP8080}, + PolicyRef: &np1, + TableID: openflow.EgressRuleTable, + }, + { + Direction: v1beta2.DirectionOut, + From: ipsToOFAddresses(sets.NewString("2.2.2.2")), + To: append(ipsToOFAddresses(sets.NewString("1.1.1.3")), openflow.NewIPNetAddress(ipNet)), + Service: []v1beta2.Service{serviceTCP8080}, + PolicyRef: &np1, + TableID: openflow.EgressRuleTable, + }, + } + for _, policyRule := range policyRules { + mockOFClient.EXPECT().InstallPolicyRuleFlows(newPolicyRulesMatcher(policyRule)).Return(nil).Times(1) + } + err = r.Reconcile(egressRule) + assert.NoError(t, err) + // Ensure the openflow IDs are persistent in lastRealized and are not released to idAllocator upon success. + value, exists = r.lastRealizeds.Load(egressRule.ID) + assert.True(t, exists) + assert.Len(t, value.(*lastRealized).ofIDs, 3) + // Ensure the number of released IDs doesn't change. + assert.Equal(t, 1, r.idAllocator.deleteQueue.Len()) + + // Reconciling the same rule should be idempotent. + err = r.Reconcile(egressRule) + assert.NoError(t, err) +} + func TestReconcilerBatchReconcile(t *testing.T) { ifaceStore := interfacestore.NewInterfaceStore() ifaceStore.AddInterface(&interfacestore.InterfaceConfig{ @@ -1810,3 +1926,74 @@ func BenchmarkGroupPodsByServicesWithNamedPort(b *testing.B) { func BenchmarkGroupPodsByServicesWithoutNamedPort(b *testing.B) { benchmarkGroupMembersByServices(b, false) } + +// policyRuleMatcher implements gomock.Matcher. +// It is used to check whether the argument of the mocked method is expected. It ignores differences in slice element +// order and some fields including "Priority" and "FlowID" which are a little difficult to predict. +type policyRuleMatcher struct { + ofPolicyRule *types.PolicyRule +} + +func newPolicyRulesMatcher(ofRule *types.PolicyRule) gomock.Matcher { + return policyRuleMatcher{ofPolicyRule: ofRule} +} + +func (m policyRuleMatcher) Matches(x interface{}) bool { + b, ok := x.(*types.PolicyRule) + if !ok { + return false + } + a := m.ofPolicyRule + if !sliceEqual(a.Service, b.Service) || + !sliceEqual(a.From, b.From) || + !sliceEqual(a.To, b.To) || + a.TableID != b.TableID || + a.PolicyRef != b.PolicyRef || + a.Direction != b.Direction || + a.EnableLogging != b.EnableLogging { + return false + } + return true +} + +func sliceEqual(a, b interface{}) bool { + if a == nil && b == nil { + return true + } + if a == nil || b == nil { + return false + } + aKind := reflect.TypeOf(a).Kind() + bKind := reflect.TypeOf(b).Kind() + if aKind != reflect.Slice || bKind != reflect.Slice { + return false + } + aValue := reflect.ValueOf(a) + bValue := reflect.ValueOf(b) + if aValue.Len() != bValue.Len() { + return false + } + + visited := make([]bool, aValue.Len()) + for i := 0; i < aValue.Len(); i++ { + found := false + for j := 0; j < bValue.Len(); j++ { + if visited[j] { + continue + } + if reflect.DeepEqual(aValue.Index(i).Interface(), bValue.Index(j).Interface()) { + visited[j] = true + found = true + break + } + } + if !found { + return false + } + } + return true +} + +func (m policyRuleMatcher) String() string { + return fmt.Sprintf("is equal to %v", m.ofPolicyRule) +}