Skip to content

Commit

Permalink
[WIP] Add excepts for egress to avoid SNAT
Browse files Browse the repository at this point in the history
Avoid SNAT for the assigned IP blocks when the Pod wants to communicate
with them directly; this can improve network performance.

Fixes: antrea-io#2707

Signed-off-by: Yang Li yang.li@transwarp.io
  • Loading branch information
leonstack committed Sep 8, 2021
1 parent 589e1f7 commit 89d232e
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 13 deletions.
8 changes: 8 additions & 0 deletions build/yamls/base/crds.yml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,14 @@ spec:
oneOf:
- format: ipv4
- format: ipv6
excepts:
items:
properties:
cidr:
type: string
format: cidr
type: object
type: array
externalIPPool:
type: string
status:
Expand Down
27 changes: 23 additions & 4 deletions pkg/agent/controller/egress/egress_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"net"
"reflect"
"sort"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -81,6 +82,8 @@ type egressState struct {
egressIP string
// The actual datapath mark of this Egress. Used to check if the mark changes since last process.
mark uint32
// The excepts of this Egress, i.e. the IP blocks whose traffic bypasses the SNAT flows.
excepts []crdv1a2.Except
// The actual openflow ports for which we have installed SNAT rules. Used to identify stale openflow ports when
// updating or deleting an Egress.
ofPorts sets.Int32
Expand Down Expand Up @@ -464,11 +467,12 @@ func (c *EgressController) deleteEgressState(egressName string) {
delete(c.egressStates, egressName)
}

func (c *EgressController) newEgressState(egressName string, egressIP string) *egressState {
func (c *EgressController) newEgressState(egressName string, egressIP string, excepts []crdv1a2.Except) *egressState {
c.egressStatesMutex.Lock()
defer c.egressStatesMutex.Unlock()
state := &egressState{
egressIP: egressIP,
excepts: excepts,
ofPorts: sets.NewInt32(),
pods: sets.NewString(),
}
Expand Down Expand Up @@ -592,7 +596,7 @@ func (c *EgressController) syncEgress(egressName string) error {
return nil
}
if !exist {
eState = c.newEgressState(egressName, egress.Spec.EgressIP)
eState = c.newEgressState(egressName, egress.Spec.EgressIP, egress.Spec.Excepts)
}

localNodeSelected, err := c.cluster.ShouldSelectEgress(egress)
Expand All @@ -619,12 +623,13 @@ func (c *EgressController) syncEgress(egressName string) error {

// If the mark or the excepts change, uninstall all of the Egress's Pod flows first, then install them with the new
// mark and excepts. The mark could change when the Egress IP is added to or removed from the Node.
if eState.mark != mark {
if eState.mark != mark || !Equal(eState.excepts, egress.Spec.Excepts) {
// Uninstall all of its Pod flows.
if err := c.uninstallPodFlows(egressName, eState, eState.ofPorts, eState.pods); err != nil {
return err
}
eState.mark = mark
eState.excepts = egress.Spec.Excepts
}

if err := c.updateEgressStatus(egress, c.localIPDetector.IsLocalIP(egress.Spec.EgressIP)); err != nil {
Expand Down Expand Up @@ -671,7 +676,7 @@ func (c *EgressController) syncEgress(egressName string) error {
staleOFPorts.Delete(ofPort)
continue
}
if err := c.ofClient.InstallPodSNATFlows(uint32(ofPort), egressIP, mark); err != nil {
if err := c.ofClient.InstallPodSNATFlows(uint32(ofPort), egressIP, mark, eState.excepts); err != nil {
return err
}
eState.ofPorts.Insert(ofPort)
Expand Down Expand Up @@ -872,3 +877,17 @@ func (c *EgressController) deleteEgressGroup(group *cpv1b2.EgressGroup) {
delete(c.egressGroups, group.Name)
c.queue.Add(group.Name)
}

// Equal reports whether a and b contain the same Except entries, ignoring
// order. Duplicates are significant: each entry must occur the same number of
// times in both slices.
//
// The inputs are not modified. The comparison works on sorted copies because
// callers pass slices they do not exclusively own (for example the Spec of an
// Egress object), and sorting those in place would silently reorder them.
func Equal(a, b []crdv1a2.Except) bool {
	if len(a) != len(b) {
		return false
	}
	sortedA := sortedExcepts(a)
	sortedB := sortedExcepts(b)
	for i := range sortedA {
		if sortedA[i] != sortedB[i] {
			return false
		}
	}
	return true
}

// sortedExcepts returns a copy of excepts ordered by CIDR string, leaving the
// original slice untouched.
func sortedExcepts(excepts []crdv1a2.Except) []crdv1a2.Except {
	sorted := make([]crdv1a2.Except, len(excepts))
	copy(sorted, excepts)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i].CIDR < sorted[j].CIDR })
	return sorted
}
7 changes: 4 additions & 3 deletions pkg/agent/openflow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"antrea.io/antrea/pkg/agent/openflow/cookie"
"antrea.io/antrea/pkg/agent/types"
"antrea.io/antrea/pkg/agent/util"
crdv1a2 "antrea.io/antrea/pkg/apis/crd/v1alpha2"
binding "antrea.io/antrea/pkg/ovs/openflow"
utilip "antrea.io/antrea/pkg/util/ip"
"antrea.io/antrea/third_party/proxy"
Expand Down Expand Up @@ -176,7 +177,7 @@ type Client interface {
// tunnel destination, and the packets should be SNAT'd on the remote
// Node. As of now, a Pod can be configured to use only a single SNAT
// IP in a single address family (IPv4 or IPv6).
InstallPodSNATFlows(ofPort uint32, snatIP net.IP, snatMark uint32) error
InstallPodSNATFlows(ofPort uint32, snatIP net.IP, snatMark uint32, excepts []crdv1a2.Except) error

// UninstallPodSNATFlows removes the SNAT flows for the local Pod.
UninstallPodSNATFlows(ofPort uint32) error
Expand Down Expand Up @@ -806,8 +807,8 @@ func (c *client) UninstallSNATMarkFlows(mark uint32) error {
return c.deleteFlows(c.snatFlowCache, cacheKey)
}

func (c *client) InstallPodSNATFlows(ofPort uint32, snatIP net.IP, snatMark uint32) error {
flows := []binding.Flow{c.snatRuleFlow(ofPort, snatIP, snatMark, c.nodeConfig.GatewayConfig.MAC)}
func (c *client) InstallPodSNATFlows(ofPort uint32, snatIP net.IP, snatMark uint32, excepts []crdv1a2.Except) error {
flows := c.snatRuleFlows(ofPort, snatIP, snatMark, c.nodeConfig.GatewayConfig.MAC, excepts)
cacheKey := fmt.Sprintf("p%x", ofPort)
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
Expand Down
27 changes: 21 additions & 6 deletions pkg/agent/openflow/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"antrea.io/antrea/pkg/agent/metrics"
"antrea.io/antrea/pkg/agent/openflow/cookie"
"antrea.io/antrea/pkg/agent/types"
crdv1a2 "antrea.io/antrea/pkg/apis/crd/v1alpha2"
binding "antrea.io/antrea/pkg/ovs/openflow"
"antrea.io/antrea/pkg/ovs/ovsconfig"
"antrea.io/antrea/pkg/ovs/ovsctl"
Expand Down Expand Up @@ -1927,26 +1928,39 @@ func (c *client) snatIPFromTunnelFlow(snatIP net.IP, mark uint32) binding.Flow {
Done()
}

// snatRuleFlow generates a flow that applies the SNAT rule for a local Pod. If
// snatRuleFlows generates flows that apply the SNAT rule for a local Pod. If
// the SNAT IP exists on the local Node, it sets the packet mark with the ID of
// the SNAT IP, for the traffic from the ofPort to external; if the SNAT IP is
// on a remote Node, it tunnels the packets to the SNAT IP.
func (c *client) snatRuleFlow(ofPort uint32, snatIP net.IP, snatMark uint32, localGatewayMAC net.HardwareAddr) binding.Flow {
func (c *client) snatRuleFlows(ofPort uint32, snatIP net.IP, snatMark uint32, localGatewayMAC net.HardwareAddr, excepts []crdv1a2.Except) []binding.Flow {
ipProto := getIPProtocol(snatIP)
snatTable := c.pipeline[snatTable]
l3FwdTable := c.pipeline[l3ForwardingTable]
nextTable := l3FwdTable.GetNext()
snatFlows := []binding.Flow{}
if snatMark != 0 {
// Local SNAT IP.
return snatTable.BuildFlow(priorityNormal).
return append(snatFlows, snatTable.BuildFlow(priorityNormal).
MatchProtocol(ipProto).
MatchCTStateNew(true).MatchCTStateTrk(true).
MatchInPort(ofPort).
Action().LoadPktMarkRange(snatMark, snatPktMarkRange).
Action().GotoTable(snatTable.GetNext()).
Cookie(c.cookieAllocator.Request(cookie.SNAT).Raw()).
Done()
Done())
}
for _, except := range excepts {
_, excidr, _ := net.ParseCIDR(except.CIDR)
snatFlows = append(snatFlows, l3FwdTable.BuildFlow(priorityNormal).
MatchInPort(ofPort).
MatchProtocol(ipProto).
MatchDstIPNet(*excidr).
Action().GotoTable(nextTable).
Cookie(c.cookieAllocator.Request(cookie.SNAT).Raw()).
Done())
}
// SNAT IP should be on a remote Node.
return snatTable.BuildFlow(priorityNormal).
snatFlows = append(snatFlows, snatTable.BuildFlow(priorityNormal).
MatchProtocol(ipProto).
MatchInPort(ofPort).
Action().SetSrcMAC(localGatewayMAC).
Expand All @@ -1955,7 +1969,8 @@ func (c *client) snatRuleFlow(ofPort uint32, snatIP net.IP, snatMark uint32, loc
Action().SetTunnelDst(snatIP).
Action().GotoTable(l3DecTTLTable).
Cookie(c.cookieAllocator.Request(cookie.SNAT).Raw()).
Done()
Done())
return snatFlows
}

// loadBalancerServiceFromOutsideFlow generates the flow to forward LoadBalancer service traffic from outside node
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/crd/v1alpha2/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,10 @@ type EgressStatus struct {
EgressNode string `json:"egressNode"`
}

// Except describes a destination IP block that is excepted from Egress SNAT.
type Except struct {
// CIDR is the excepted IP block in CIDR notation, e.g. "10.0.0.0/8".
CIDR string `json:"cidr"`
}

// EgressSpec defines the desired state for Egress.
type EgressSpec struct {
// AppliedTo selects Pods to which the Egress will be applied.
Expand All @@ -226,6 +230,7 @@ type EgressSpec struct {
// If it is non-empty, the EgressIP will be assigned to a Node specified by the pool automatically and will failover
// to a different Node when the Node becomes unreachable.
ExternalIPPool string `json:"externalIPPool"`
Excepts []Except `json:"excepts"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down

0 comments on commit 89d232e

Please sign in to comment.