Skip to content

Commit

Permalink
Enhance error handling of policy reconciler (#1667)
Browse files Browse the repository at this point in the history
The reconciler might fail to install flows due to a transient OVS error.
It should add the IPBlocks to the PolicyRule when it retries, in which
case the "update" method is called. This patch fixes it and adds a unit
test for it.
  • Loading branch information
tnqn authored Dec 21, 2020
1 parent 1cf6172 commit f86c262
Show file tree
Hide file tree
Showing 2 changed files with 197 additions and 4 deletions.
14 changes: 10 additions & 4 deletions pkg/agent/controller/networkpolicy/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,10 +591,10 @@ func (r *reconciler) update(lastRealized *lastRealized, newRule *CompletedRule,
memberByServicesMap, servicesMap := groupMembersByServices(newRule.Services, newRule.ToAddresses)
// Same as the process in `add`, we must ensure the group for the original services is present
// in memberByServicesMap, so that this group won't be removed and its "From" will be updated.
svcKey := normalizeServices(newRule.Services)
if _, exists := memberByServicesMap[svcKey]; !exists {
memberByServicesMap[svcKey] = v1beta2.NewGroupMemberSet()
servicesMap[svcKey] = newRule.Services
originalSvcKey := normalizeServices(newRule.Services)
if _, exists := memberByServicesMap[originalSvcKey]; !exists {
memberByServicesMap[originalSvcKey] = v1beta2.NewGroupMemberSet()
servicesMap[originalSvcKey] = newRule.Services
}
prevMembersByServicesMap, _ := groupMembersByServices(lastRealized.Services, lastRealized.ToAddresses)
for svcKey, members := range memberByServicesMap {
Expand All @@ -612,6 +612,12 @@ func (r *reconciler) update(lastRealized *lastRealized, newRule *CompletedRule,
PolicyRef: newRule.SourceRef,
EnableLogging: newRule.EnableLogging,
}
// If the PolicyRule for the original services doesn't exist and IPBlocks is present, it means the
// reconciler hasn't installed flows for IPBlocks, then it must be added to the new PolicyRule.
if svcKey == originalSvcKey && len(newRule.To.IPBlocks) > 0 {
to := ipBlocksToOFAddresses(newRule.To.IPBlocks, r.ipv4Enabled, r.ipv6Enabled)
ofRule.To = append(ofRule.To, to...)
}
err := r.idAllocator.allocateForRule(ofRule)
if err != nil {
return fmt.Errorf("error allocating Openflow ID")
Expand Down
187 changes: 187 additions & 0 deletions pkg/agent/controller/networkpolicy/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
package networkpolicy

import (
"errors"
"fmt"
"net"
"reflect"
"testing"

"github.com/golang/mock/gomock"
Expand Down Expand Up @@ -86,6 +88,8 @@ var (
Name: "name1",
UID: "uid1",
}

transientError = errors.New("Transient OVS error")
)

func newCIDR(cidrStr string) *net.IPNet {
Expand Down Expand Up @@ -525,6 +529,118 @@ func TestReconcilerReconcile(t *testing.T) {
}
}

// TestReconcileWithTransientError ensures the reconciler can reconcile a rule properly after the first attempt hits a
// transient error.
// The input rule is an egress rule with a named port, applying to 3 Pods and 1 IPBlock. The first 2 Pods have different
// port numbers for the named port and the 3rd Pod cannot resolve it.
// The first reconciliation is supposed to fail without any openflow IDs persisted.
// The second reconciliation is supposed to succeed with proper PolicyRules installed and all openflow IDs persisted.
// The third reconciliation is supposed to do nothing.
func TestReconcileWithTransientError(t *testing.T) {
// Register the interface of the only applied-to Pod (ns1/pod1); its IP 2.2.2.2 is the "From" address of every
// expected PolicyRule below.
ifaceStore := interfacestore.NewInterfaceStore()
ifaceStore.AddInterface(
&interfacestore.InterfaceConfig{
InterfaceName: util.GenerateContainerInterfaceName("pod1", "ns1", "container1"),
IPs: []net.IP{net.ParseIP("2.2.2.2")},
ContainerInterfaceConfig: &interfacestore.ContainerInterfaceConfig{PodName: "pod1", PodNamespace: "ns1", ContainerID: "container1"},
OVSPortConfig: &interfacestore.OVSPortConfig{OFPort: 1}})

ipNet := *newCIDR("10.10.0.0/16")
ipBlock := v1beta2.IPBlock{
CIDR: v1beta2.IPNet{IP: v1beta2.IPAddress(ipNet.IP), PrefixLength: 16},
}
// The 3 pods should result in 3 PolicyRules.
// The IPBlock should be in the same PolicyRule as member3 as they cannot resolve the named port.
// member1 exposes the named port "http" as TCP 80.
member1 := &v1beta2.GroupMember{
IPs: []v1beta2.IPAddress{v1beta2.IPAddress(net.ParseIP("1.1.1.1"))},
Ports: []v1beta2.NamedPort{{Name: "http", Protocol: v1beta2.ProtocolTCP, Port: 80}},
}
// member2 exposes the named port "http" as TCP 443.
member2 := &v1beta2.GroupMember{
IPs: []v1beta2.IPAddress{v1beta2.IPAddress(net.ParseIP("1.1.1.2"))},
Ports: []v1beta2.NamedPort{{Name: "http", Protocol: v1beta2.ProtocolTCP, Port: 443}},
}
// member3 declares no named ports at all.
member3 := &v1beta2.GroupMember{
IPs: []v1beta2.IPAddress{v1beta2.IPAddress(net.ParseIP("1.1.1.3"))},
}

// The rule under test: egress from pod1 to the three members plus the IPBlock.
// NOTE(review): serviceHTTP is presumably the named-port ("http") service and serviceTCP8080 a numeric one, judging
// by the expected PolicyRules below - confirm against their definitions.
egressRule := &CompletedRule{
rule: &rule{
ID: "egress-rule",
Direction: v1beta2.DirectionOut,
SourceRef: &np1,
Services: []v1beta2.Service{serviceHTTP, serviceTCP8080},
To: v1beta2.NetworkPolicyPeer{
IPBlocks: []v1beta2.IPBlock{ipBlock},
},
},
ToAddresses: v1beta2.NewGroupMemberSet(member1, member2, member3),
TargetMembers: v1beta2.NewGroupMemberSet(newAppliedToGroupMember("pod1", "ns1")),
}

controller := gomock.NewController(t)
defer controller.Finish()
mockOFClient := openflowtest.NewMockClient(controller)
// The mock client reports both IP families as enabled for any number of calls.
mockOFClient.EXPECT().IsIPv4Enabled().Return(true).AnyTimes()
mockOFClient.EXPECT().IsIPv6Enabled().Return(true).AnyTimes()
r := newReconciler(mockOFClient, ifaceStore, testAsyncDeleteInterval)
// Set deleteInterval to verify openflow ID is released immediately.
r.idAllocator.deleteInterval = 0

// Make the first call fail.
// Exactly one InstallPolicyRuleFlows call is expected in this attempt, and it returns the transient error.
mockOFClient.EXPECT().InstallPolicyRuleFlows(gomock.Any()).Return(transientError).Times(1)
err := r.Reconcile(egressRule)
assert.Error(t, err)
// Ensure the openflow ID is not persisted in lastRealized and is released to idAllocator upon error.
value, exists := r.lastRealizeds.Load(egressRule.ID)
assert.True(t, exists)
assert.Empty(t, value.(*lastRealized).ofIDs)
assert.Equal(t, 1, r.idAllocator.deleteQueue.Len())

// Make the second call succeed.
// The following PolicyRules are expected to be installed.
policyRules := []*types.PolicyRule{
// member1: the named port resolved to TCP 80.
{
Direction: v1beta2.DirectionOut,
From: ipsToOFAddresses(sets.NewString("2.2.2.2")),
To: ipsToOFAddresses(sets.NewString("1.1.1.1")),
Service: []v1beta2.Service{serviceTCP80, serviceTCP8080},
PolicyRef: &np1,
TableID: openflow.EgressRuleTable,
},
// member2: the named port resolved to TCP 443.
{
Direction: v1beta2.DirectionOut,
From: ipsToOFAddresses(sets.NewString("2.2.2.2")),
To: ipsToOFAddresses(sets.NewString("1.1.1.2")),
Service: []v1beta2.Service{serviceTCP443, serviceTCP8080},
PolicyRef: &np1,
TableID: openflow.EgressRuleTable,
},
// member3 and the IPBlock: the named port cannot be resolved, so only TCP 8080 remains. This is the rule that
// must carry the IPBlock address on the retry.
{
Direction: v1beta2.DirectionOut,
From: ipsToOFAddresses(sets.NewString("2.2.2.2")),
To: append(ipsToOFAddresses(sets.NewString("1.1.1.3")), openflow.NewIPNetAddress(ipNet)),
Service: []v1beta2.Service{serviceTCP8080},
PolicyRef: &np1,
TableID: openflow.EgressRuleTable,
},
}
// Each expected PolicyRule must be installed exactly once; the matcher compares a subset of fields and ignores
// slice element order.
for _, policyRule := range policyRules {
mockOFClient.EXPECT().InstallPolicyRuleFlows(newPolicyRulesMatcher(policyRule)).Return(nil).Times(1)
}
err = r.Reconcile(egressRule)
assert.NoError(t, err)
// Ensure the openflow IDs are persisted in lastRealized and are not released to idAllocator upon success.
value, exists = r.lastRealizeds.Load(egressRule.ID)
assert.True(t, exists)
assert.Len(t, value.(*lastRealized).ofIDs, 3)
// Ensure the number of released IDs doesn't change.
assert.Equal(t, 1, r.idAllocator.deleteQueue.Len())

// Reconciling the same rule should be idempotent.
// No further InstallPolicyRuleFlows expectations are registered and the earlier ones are limited to one call each,
// so any additional install call would not match an available expectation.
err = r.Reconcile(egressRule)
assert.NoError(t, err)
}

func TestReconcilerBatchReconcile(t *testing.T) {
ifaceStore := interfacestore.NewInterfaceStore()
ifaceStore.AddInterface(&interfacestore.InterfaceConfig{
Expand Down Expand Up @@ -1810,3 +1926,74 @@ func BenchmarkGroupPodsByServicesWithNamedPort(b *testing.B) {
func BenchmarkGroupPodsByServicesWithoutNamedPort(b *testing.B) {
benchmarkGroupMembersByServices(b, false)
}

// policyRuleMatcher implements gomock.Matcher.
// It is used to check whether the argument of a mocked method is the expected PolicyRule. It ignores differences in
// slice element order and skips some fields, including "Priority" and "FlowID", which are difficult to predict.
type policyRuleMatcher struct {
// ofPolicyRule is the expected PolicyRule that actual arguments are compared against.
ofPolicyRule *types.PolicyRule
}

// newPolicyRulesMatcher returns a gomock.Matcher whose expected value is ofRule.
func newPolicyRulesMatcher(ofRule *types.PolicyRule) gomock.Matcher {
	var matcher policyRuleMatcher
	matcher.ofPolicyRule = ofRule
	return matcher
}

// Matches reports whether x is a *types.PolicyRule that agrees with the expected rule held by the matcher.
// Service, From and To are compared as unordered collections; TableID, PolicyRef (pointer identity), Direction and
// EnableLogging must be equal. Any other field is not examined.
func (m policyRuleMatcher) Matches(x interface{}) bool {
	actual, isPolicyRule := x.(*types.PolicyRule)
	if !isPolicyRule {
		return false
	}
	expected := m.ofPolicyRule
	// The conjunction short-circuits in the same order as the fields are listed.
	return sliceEqual(expected.Service, actual.Service) &&
		sliceEqual(expected.From, actual.From) &&
		sliceEqual(expected.To, actual.To) &&
		expected.TableID == actual.TableID &&
		expected.PolicyRef == actual.PolicyRef &&
		expected.Direction == actual.Direction &&
		expected.EnableLogging == actual.EnableLogging
}

// sliceEqual reports whether a and b are slices holding the same elements with the same multiplicities, regardless
// of order. Elements are compared with reflect.DeepEqual.
// Two untyped nil arguments are considered equal; a single untyped nil or any non-slice argument makes the result
// false.
func sliceEqual(a, b interface{}) bool {
	switch {
	case a == nil && b == nil:
		return true
	case a == nil || b == nil:
		return false
	}
	if reflect.TypeOf(a).Kind() != reflect.Slice || reflect.TypeOf(b).Kind() != reflect.Slice {
		return false
	}

	left, right := reflect.ValueOf(a), reflect.ValueOf(b)
	size := left.Len()
	if size != right.Len() {
		return false
	}

	// consumed[j] records that right[j] has already been paired with an element of left, so duplicates are
	// matched one-to-one.
	consumed := make([]bool, size)
nextElement:
	for i := 0; i < size; i++ {
		want := left.Index(i).Interface()
		for j := 0; j < size; j++ {
			if consumed[j] {
				continue
			}
			if reflect.DeepEqual(want, right.Index(j).Interface()) {
				consumed[j] = true
				continue nextElement
			}
		}
		// No unused element of right equals left[i].
		return false
	}
	return true
}

// String describes what the matcher matches, using the default formatting of the expected PolicyRule.
func (m policyRuleMatcher) String() string {
	description := "is equal to " + fmt.Sprint(m.ofPolicyRule)
	return description
}

0 comments on commit f86c262

Please sign in to comment.