Skip to content

Commit

Permalink
Fix reject loop issue and add error handling
Browse files Browse the repository at this point in the history
Fixes #3559
Skip the reject response generation when neither src nor dst are on
current Node.

Re-write the MAC address for `RejectPodLocal` reject type no matter
AntreaIPAM is on or not. And send the packetOut directly to the
dstPod instead of L3Forwarding table.

Signed-off-by: wgrayson <wgrayson@vmware.com>
  • Loading branch information
GraysonWu committed Apr 1, 2022
1 parent ac50657 commit 7de9380
Show file tree
Hide file tree
Showing 2 changed files with 197 additions and 8 deletions.
37 changes: 29 additions & 8 deletions pkg/agent/controller/networkpolicy/reject.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package networkpolicy

import (
"encoding/binary"
"fmt"

"antrea.io/libOpenflow/protocol"
"antrea.io/ofnet/ofctrl"
Expand Down Expand Up @@ -73,6 +74,9 @@ const (
// Service traffic, when AntreaProxy is disabled. The EndpointPod is on a remote
// Node and the dstPod of the reject response is on the local Node.
RejectNoAPServiceRemoteToLocal
// ErrorStatus represents that Antrea couldn't generate packetOut for current
// packetIn.
ErrorStatus
)

// rejectRequest sends reject response to the requesting client, based on the
Expand Down Expand Up @@ -133,6 +137,18 @@ func (c *Controller) rejectRequest(pktIn *ofctrl.PacketIn) error {
return dstFound && dstMAC == gwIfaces[0].MAC.String()
}
packetOutType := getRejectType(isServiceTraffic(), c.antreaProxyEnabled, srcFound, dstFound)
if packetOutType == ErrorStatus {
return fmt.Errorf("error when generating reject response for the packet from: %s to %s: neither source nor destination are on this Node", dstIP, srcIP)
}
// When in AntreaIPAM mode, even srcPod and dstPod are on the same Node, MAC will
// still be re-written in L3ForwardingTable. During rejection, the reject response
// will be directly sent to the dst OF port without go thru L3ForwardingTable. So
// we need to re-write MAC here. There is no need to check whether AntreaIPAM mode
// is on. Because if AntreaIPAM mode is off, this re-write doesn't change anything.
if packetOutType == RejectPodLocal {
srcMAC = sIface.MAC.String()
dstMAC = dIface.MAC.String()
}
inPort, outPort := getRejectOFPorts(packetOutType, sIface, dIface)
mutateFunc := getRejectPacketOutMutateFunc(packetOutType)

Expand Down Expand Up @@ -195,21 +211,30 @@ func getRejectType(isServiceTraffic, antreaProxyEnabled, srcIsLocal, dstIsLocal
}
return RejectLocalToRemote
}
return RejectPodRemoteToLocal
if dstIsLocal {
return RejectPodRemoteToLocal
}
return ErrorStatus
}
if !antreaProxyEnabled {
if srcIsLocal {
return RejectNoAPServiceLocal
}
return RejectNoAPServiceRemoteToLocal
if dstIsLocal {
return RejectNoAPServiceRemoteToLocal
}
return ErrorStatus
}
if srcIsLocal {
if dstIsLocal {
return RejectServiceLocal
}
return RejectLocalToRemote
}
return RejectServiceRemoteToLocal
if dstIsLocal {
return RejectServiceRemoteToLocal
}
return ErrorStatus
}

// getRejectOFPorts returns the inPort and outPort of a packetOut based on the RejectType.
Expand All @@ -219,6 +244,7 @@ func getRejectOFPorts(rejectType RejectType, sIface, dIface *interfacestore.Inte
switch rejectType {
case RejectPodLocal:
inPort = uint32(sIface.OFPort)
outPort = uint32(dIface.OFPort)
case RejectServiceLocal:
inPort = uint32(sIface.OFPort)
case RejectPodRemoteToLocal:
Expand Down Expand Up @@ -247,11 +273,6 @@ func getRejectPacketOutMutateFunc(rejectType RejectType) func(binding.PacketOutB
mutatePacketOut = func(packetOutBuilder binding.PacketOutBuilder) binding.PacketOutBuilder {
return packetOutBuilder.AddResubmitAction(nil, &tableID)
}
case RejectPodLocal:
// When in AntreaIPAM mode, even srcPod and dstPod are on the same Node, MAC will
// still be re-written in L3ForwardingTable. So during rejection, we also need to
// let the reject response go thru L3ForwardingTable to re-write MAC.
fallthrough
case RejectLocalToRemote:
tableID := openflow.L3ForwardingTable.GetID()
mutatePacketOut = func(packetOutBuilder binding.PacketOutBuilder) binding.PacketOutBuilder {
Expand Down
168 changes: 168 additions & 0 deletions test/e2e/antreapolicy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1695,6 +1695,173 @@ func testRejectServiceTraffic(t *testing.T, data *TestData) {
time.Sleep(networkPolicyDelay)
}

// testRejectLoopTraffic tests that a double direction rejection won't cause an infinite rejection loop.
func testRejectLoopTraffic(t *testing.T, data *TestData) {
clientName := "agnhost-client"
require.NoError(t, data.createAgnhostPodOnNode(clientName, testNamespace, nodeName(0), false))
defer data.deletePodAndWait(defaultTimeout, clientName, testNamespace)
_, err := data.podWaitForIPs(defaultTimeout, clientName, testNamespace)
require.NoError(t, err)

_, server0IP, cleanupFunc := createAndWaitForPod(t, data, data.createNginxPodOnNode, "server", nodeName(0), testNamespace, false)
defer cleanupFunc()

_, server1IP, cleanupFunc := createAndWaitForPod(t, data, data.createNginxPodOnNode, "server", nodeName(1), testNamespace, false)
defer cleanupFunc()

var testcases []podToAddrTestStep
if clusterInfo.podV4NetworkCIDR != "" {
testcases = append(testcases, []podToAddrTestStep{
{
"antrea-test/agnhost-client",
server0IP.ipv4.String(),
80,
Rejected,
},
{
"antrea-test/agnhost-client",
server1IP.ipv4.String(),
80,
Rejected,
},
}...)
}
if clusterInfo.podV6NetworkCIDR != "" {
testcases = append(testcases, []podToAddrTestStep{
{
"antrea-test/agnhost-client",
server0IP.ipv6.String(),
80,
Rejected,
},
{
"antrea-test/agnhost-client",
server1IP.ipv6.String(),
80,
Rejected,
},
}...)
}

// Test client and server reject traffic that ingress from each other.
builder1 := &ClusterNetworkPolicySpecBuilder{}
builder1 = builder1.SetName("acnp-reject-ingress-double-dir").
SetPriority(1.0)
builder1.AddIngress(v1.ProtocolTCP, nil, nil, nil, nil, map[string]string{"app": "nginx"}, nil,
nil, nil, false, []ACNPAppliedToSpec{{PodSelector: map[string]string{"antrea-e2e": clientName}}}, crdv1alpha1.RuleActionReject, "", "", nil)
builder1.AddIngress(v1.ProtocolTCP, nil, nil, nil, nil, map[string]string{"antrea-e2e": clientName}, nil,
nil, nil, false, []ACNPAppliedToSpec{{PodSelector: map[string]string{"app": "nginx"}}}, crdv1alpha1.RuleActionReject, "", "", nil)

acnpIngress := builder1.Get()
k8sUtils.CreateOrUpdateACNP(acnpIngress)
failOnError(waitForResourceReady(acnpIngress, timeout), t)
time.Sleep(networkPolicyDelay)

for _, tc := range testcases {
log.Tracef("Probing: %s -> %s:%d", tc.clientPod.PodName(), tc.destAddr, tc.destPort)
connectivity, err := k8sUtils.ProbeAddr(tc.clientPod.Namespace(), "antrea-e2e", tc.clientPod.PodName(), tc.destAddr, tc.destPort, v1.ProtocolTCP)
if err != nil {
t.Errorf("failure -- could not complete probe: %v", err)
}
if connectivity != tc.expectedConnectivity {
t.Errorf("failure -- wrong results for probe: Source %s/%s --> Dest %s:%d connectivity: %v, expected: %v",
tc.clientPod.Namespace(), tc.clientPod.PodName(), tc.destAddr, tc.destPort, connectivity, tc.expectedConnectivity)
}
}
failOnError(k8sUtils.DeleteACNP(builder1.Name), t)
failOnError(waitForResourceDelete("", builder1.Name, resourceACNP, timeout), t)
time.Sleep(networkPolicyDelay)

// Test client and server reject traffic that egress to each other.
builder2 := &ClusterNetworkPolicySpecBuilder{}
builder2 = builder2.SetName("acnp-reject-egress-double-dir").
SetPriority(1.0)
builder2.AddEgress(v1.ProtocolTCP, nil, nil, nil, nil, map[string]string{"app": "nginx"}, nil,
nil, nil, false, []ACNPAppliedToSpec{{PodSelector: map[string]string{"antrea-e2e": clientName}}}, crdv1alpha1.RuleActionReject, "", "", nil)
builder2.AddEgress(v1.ProtocolTCP, nil, nil, nil, nil, map[string]string{"antrea-e2e": clientName}, nil,
nil, nil, false, []ACNPAppliedToSpec{{PodSelector: map[string]string{"app": "nginx"}}}, crdv1alpha1.RuleActionReject, "", "", nil)

acnpEgress := builder2.Get()
k8sUtils.CreateOrUpdateACNP(acnpEgress)
failOnError(waitForResourceReady(acnpEgress, timeout), t)
time.Sleep(networkPolicyDelay)

for _, tc := range testcases {
log.Tracef("Probing: %s -> %s:%d", tc.clientPod.PodName(), tc.destAddr, tc.destPort)
connectivity, err := k8sUtils.ProbeAddr(tc.clientPod.Namespace(), "antrea-e2e", tc.clientPod.PodName(), tc.destAddr, tc.destPort, v1.ProtocolTCP)
if err != nil {
t.Errorf("failure -- could not complete probe: %v", err)
}
if connectivity != tc.expectedConnectivity {
t.Errorf("failure -- wrong results for probe: Source %s/%s --> Dest %s:%d connectivity: %v, expected: %v",
tc.clientPod.Namespace(), tc.clientPod.PodName(), tc.destAddr, tc.destPort, connectivity, tc.expectedConnectivity)
}
}
failOnError(k8sUtils.DeleteACNP(builder2.Name), t)
failOnError(waitForResourceDelete("", builder2.Name, resourceACNP, timeout), t)
time.Sleep(networkPolicyDelay)

// Test server reject traffic that egress to client and ingress from client.
builder3 := &ClusterNetworkPolicySpecBuilder{}
builder3 = builder3.SetName("acnp-reject-server-double-dir").
SetPriority(1.0).
SetAppliedToGroup([]ACNPAppliedToSpec{{PodSelector: map[string]string{"app": "nginx"}}})
builder3.AddIngress(v1.ProtocolTCP, nil, nil, nil, nil, map[string]string{"antrea-e2e": clientName}, nil,
nil, nil, false, nil, crdv1alpha1.RuleActionReject, "", "", nil)
builder3.AddEgress(v1.ProtocolTCP, nil, nil, nil, nil, map[string]string{"antrea-e2e": clientName}, nil,
nil, nil, false, nil, crdv1alpha1.RuleActionReject, "", "", nil)

acnpServer := builder3.Get()
k8sUtils.CreateOrUpdateACNP(acnpServer)
failOnError(waitForResourceReady(acnpServer, timeout), t)
time.Sleep(networkPolicyDelay)

for _, tc := range testcases {
log.Tracef("Probing: %s -> %s:%d", tc.clientPod.PodName(), tc.destAddr, tc.destPort)
connectivity, err := k8sUtils.ProbeAddr(tc.clientPod.Namespace(), "antrea-e2e", tc.clientPod.PodName(), tc.destAddr, tc.destPort, v1.ProtocolTCP)
if err != nil {
t.Errorf("failure -- could not complete probe: %v", err)
}
if connectivity != tc.expectedConnectivity {
t.Errorf("failure -- wrong results for probe: Source %s/%s --> Dest %s:%d connectivity: %v, expected: %v",
tc.clientPod.Namespace(), tc.clientPod.PodName(), tc.destAddr, tc.destPort, connectivity, tc.expectedConnectivity)
}
}
failOnError(k8sUtils.DeleteACNP(builder3.Name), t)
failOnError(waitForResourceDelete("", builder3.Name, resourceACNP, timeout), t)
time.Sleep(networkPolicyDelay)

// Test client reject traffic that egress to server and ingress from server.
builder4 := &ClusterNetworkPolicySpecBuilder{}
builder4 = builder4.SetName("acnp-reject-client-double-dir").
SetPriority(1.0).
SetAppliedToGroup([]ACNPAppliedToSpec{{PodSelector: map[string]string{"antrea-e2e": clientName}}})
builder4.AddIngress(v1.ProtocolTCP, nil, nil, nil, nil, map[string]string{"app": "nginx"}, nil,
nil, nil, false, nil, crdv1alpha1.RuleActionReject, "", "", nil)
builder4.AddEgress(v1.ProtocolTCP, nil, nil, nil, nil, map[string]string{"app": "nginx"}, nil,
nil, nil, false, nil, crdv1alpha1.RuleActionReject, "", "", nil)

acnpClient := builder4.Get()
k8sUtils.CreateOrUpdateACNP(acnpClient)
failOnError(waitForResourceReady(acnpClient, timeout), t)
time.Sleep(networkPolicyDelay)

for _, tc := range testcases {
log.Tracef("Probing: %s -> %s:%d", tc.clientPod.PodName(), tc.destAddr, tc.destPort)
connectivity, err := k8sUtils.ProbeAddr(tc.clientPod.Namespace(), "antrea-e2e", tc.clientPod.PodName(), tc.destAddr, tc.destPort, v1.ProtocolTCP)
if err != nil {
t.Errorf("failure -- could not complete probe: %v", err)
}
if connectivity != tc.expectedConnectivity {
t.Errorf("failure -- wrong results for probe: Source %s/%s --> Dest %s:%d connectivity: %v, expected: %v",
tc.clientPod.Namespace(), tc.clientPod.PodName(), tc.destAddr, tc.destPort, connectivity, tc.expectedConnectivity)
}
}
failOnError(k8sUtils.DeleteACNP(builder4.Name), t)
failOnError(waitForResourceDelete("", builder4.Name, resourceACNP, timeout), t)
time.Sleep(networkPolicyDelay)
}

// testANPPortRange tests the port range in a ANP can work.
func testANPPortRange(t *testing.T) {
builder := &AntreaNetworkPolicySpecBuilder{}
Expand Down Expand Up @@ -3106,6 +3273,7 @@ func TestAntreaPolicy(t *testing.T) {
t.Run("Case=ACNPRejectIngress", func(t *testing.T) { testACNPRejectIngress(t, v1.ProtocolTCP) })
t.Run("Case=ACNPRejectIngressUDP", func(t *testing.T) { testACNPRejectIngress(t, v1.ProtocolUDP) })
t.Run("Case=RejectServiceTraffic", func(t *testing.T) { testRejectServiceTraffic(t, data) })
t.Run("Case=RejectLoopTraffic", func(t *testing.T) { testRejectLoopTraffic(t, data) })
t.Run("Case=ACNPNoEffectOnOtherProtocols", func(t *testing.T) { testACNPNoEffectOnOtherProtocols(t) })
t.Run("Case=ACNPBaselinePolicy", func(t *testing.T) { testBaselineNamespaceIsolation(t) })
t.Run("Case=ACNPPriorityOverride", func(t *testing.T) { testACNPPriorityOverride(t) })
Expand Down

0 comments on commit 7de9380

Please sign in to comment.