Skip to content

Commit

Permalink
Merge pull request #140 from sridhargaddam/use-vxlan-tunnels
Browse files Browse the repository at this point in the history
Use VxLAN overlay tunnels for inter-cluster traffic
  • Loading branch information
mangelajo authored Sep 16, 2019
2 parents 5306204 + dce88ec commit 4b7def5
Show file tree
Hide file tree
Showing 10 changed files with 857 additions and 211 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ require (
github.com/vishvananda/netlink v1.0.0
github.com/vishvananda/netns v0.0.0-20180720170159-13995c7128cc // indirect
golang.org/x/oauth2 v0.0.0-20170412232759-a6bd8cefa181 // indirect
golang.org/x/sys v0.0.0-20190606165138-5da285871e9c
golang.org/x/time v0.0.0-20161028155119-f51c12702a4d // indirect
google.golang.org/appengine v1.6.1 // indirect
gopkg.in/inf.v0 v0.0.0-20150911125757-3887ee99ecf0 // indirect
Expand Down
120 changes: 1 addition & 119 deletions pkg/cableengine/ipsec/ipsec.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,9 @@ package ipsec

import (
"fmt"
"net"
"reflect"
"strings"
"sync"

"github.com/coreos/go-iptables/iptables"
"github.com/submariner-io/submariner/pkg/cableengine"
"github.com/submariner-io/submariner/pkg/types"
"github.com/submariner-io/submariner/pkg/util"
Expand Down Expand Up @@ -39,62 +36,6 @@ func NewEngine(localSubnets []string, localCluster types.SubmarinerCluster, loca

func (i *engine) StartEngine() error {
klog.Infof("Starting IPSec Engine (Charon)")
ifi, err := util.GetDefaultGatewayInterface()
if err != nil {
return err
}

klog.V(8).Infof("Device of default gateway interface was %s", ifi.Name)
ipt, err := iptables.New()
if err != nil {
return fmt.Errorf("error while initializing iptables: %v", err)
}

klog.V(6).Infof("Installing/ensuring the SUBMARINER-POSTROUTING and SUBMARINER-FORWARD chains")
if err = ipt.NewChain("nat", "SUBMARINER-POSTROUTING"); err != nil {
klog.Errorf("Unable to create SUBMARINER-POSTROUTING chain in iptables: %v", err)
}

if err = ipt.NewChain("filter", "SUBMARINER-FORWARD"); err != nil {
klog.Errorf("Unable to create SUBMARINER-FORWARD chain in iptables: %v", err)
}

forwardToSubPostroutingRuleSpec := []string{"-j", "SUBMARINER-POSTROUTING"}
if err = ipt.AppendUnique("nat", "POSTROUTING", forwardToSubPostroutingRuleSpec...); err != nil {
klog.Errorf("Unable to append iptables rule \"%s\": %v\n", strings.Join(forwardToSubPostroutingRuleSpec, " "), err)
}

forwardToSubForwardRuleSpec := []string{"-j", "SUBMARINER-FORWARD"}
rules, err := ipt.List("filter", "FORWARD")
if err != nil {
return fmt.Errorf("error listing the rules in FORWARD chain: %v", err)
}

appendAt := len(rules) + 1
insertAt := appendAt
for i, rule := range rules {
if rule == "-A FORWARD -j SUBMARINER-FORWARD" {
insertAt = -1
break
} else if rule == "-A FORWARD -j REJECT --reject-with icmp-host-prohibited" {
insertAt = i
break
}
}

if insertAt == appendAt {
// Append the rule at the end of FORWARD Chain.
if err = ipt.Append("filter", "FORWARD", forwardToSubForwardRuleSpec...); err != nil {
klog.Errorf("Unable to append iptables rule \"%s\": %v\n", strings.Join(forwardToSubForwardRuleSpec, " "), err)
}
} else if insertAt > 0 {
// Insert the rule in the FORWARD Chain.
if err = ipt.Insert("filter", "FORWARD", insertAt, forwardToSubForwardRuleSpec...); err != nil {
klog.Errorf("Unable to insert iptables rule \"%s\" at position %d: %v\n", strings.Join(forwardToSubForwardRuleSpec, " "),
insertAt, err)
}
}

return i.driver.Init()
}

Expand Down Expand Up @@ -131,66 +72,7 @@ func (i *engine) InstallCable(endpoint types.SubmarinerEndpoint) error {
if err != nil {
return err
}

ifi, err := util.GetDefaultGatewayInterface()
if err != nil {
return err
}
klog.V(4).Infof("Device of default gateway interface was %s", ifi.Name)
ipt, err := iptables.New()
if err != nil {
return fmt.Errorf("error while initializing iptables: %v", err)
}

addresses, err := ifi.Addrs()
if err != nil {
return err
}

for _, addr := range addresses {
ipAddr, ipNet, err := net.ParseCIDR(addr.String())
if err != nil {
klog.Errorf("Error while parsing CIDR %s: %v", addr.String(), err)
continue
}

if ipAddr.To4() != nil {
for _, subnet := range endpoint.Spec.Subnets {
ruleSpec := []string{"-s", ipNet.String(), "-d", subnet, "-i", ifi.Name, "-j", "ACCEPT"}
klog.V(8).Infof("Installing iptables rule: %s", strings.Join(ruleSpec, " "))
if err = ipt.AppendUnique("filter", "SUBMARINER-FORWARD", ruleSpec...); err != nil {
klog.Errorf("error appending iptables rule \"%s\": %v\n", strings.Join(ruleSpec, " "), err)
}

ruleSpec = []string{"-d", ipNet.String(), "-s", subnet, "-i", ifi.Name, "-j", "ACCEPT"}
klog.V(8).Infof("Installing iptables rule: %v", ruleSpec)
if err = ipt.AppendUnique("filter", "SUBMARINER-FORWARD", ruleSpec...); err != nil {
klog.Errorf("error appending iptables rule \"%s\": %v\n", strings.Join(ruleSpec, " "), err)
}

// -t nat -I POSTROUTING -s <local-network-cidr> -d <remote-cidr> -j SNAT --to-source <this-local-ip>
ruleSpec = []string{"-s", ipNet.String(), "-d", subnet, "-j", "SNAT", "--to-source", ipAddr.String()}
klog.V(8).Infof("Installing iptables rule: %s", strings.Join(ruleSpec, " "))
if err = ipt.AppendUnique("nat", "SUBMARINER-POSTROUTING", ruleSpec...); err != nil {
klog.Errorf("error appending iptables rule \"%s\": %v\n", strings.Join(ruleSpec, " "), err)
}
}
} else {
klog.V(6).Infof("Skipping adding rule because IPv6 network %s found", ipNet.String())
}
}

// MASQUERADE (on the GatewayNode) the incoming traffic from the remote cluster (i.e, remoteEndpointIP)
// and destined to the local PODs (i.e., localSubnet) scheduled on the non-gateway node.
// This will make the return traffic from the POD to go via the GatewayNode.
for _, localSubnet := range i.localSubnets {
ruleSpec := []string{"-s", remoteEndpointIP, "-d", localSubnet, "-j", "MASQUERADE"}
klog.V(8).Infof("Installing iptables rule for MASQ incoming traffic: %v", ruleSpec)
if err = ipt.AppendUnique("nat", "SUBMARINER-POSTROUTING", ruleSpec...); err != nil {
klog.Errorf("error appending iptables MASQ rule \"%s\": %v\n", strings.Join(ruleSpec, " "), err)
}
}

klog.V(4).Infof("Connected to remoteEndpointIP %s", remoteEndpointIP)
return nil
}

Expand Down
144 changes: 144 additions & 0 deletions pkg/routeagent/controllers/route/iptables.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package route

import (
"fmt"
"strconv"
"strings"

"github.com/coreos/go-iptables/iptables"
"k8s.io/klog"
)

func (r *Controller) createIPTableChains() error {
ipt, err := iptables.New()
if err != nil {
return fmt.Errorf("error initializing iptables: %v", err)
}

klog.V(4).Infof("Install/ensure %s chain exists", SmPostRoutingChain)
if err = r.createChainIfNotExists(ipt, "nat", SmPostRoutingChain); err != nil {
return fmt.Errorf("Unable to create %s chain in iptables: %v", SmPostRoutingChain, err)
}

klog.V(4).Infof("Insert %s rule that has rules for inter-cluster traffic", SmPostRoutingChain)
forwardToSubPostroutingRuleSpec := []string{"-j", SmPostRoutingChain}
if err = r.prependUnique(ipt, "nat", "POSTROUTING", forwardToSubPostroutingRuleSpec); err != nil {
klog.Errorf("Unable to insert iptable rule in NAT table, POSTROUTING chain: %v", err)
}

klog.V(4).Infof("Install/ensure SUBMARINER-INPUT chain exists")
if err = r.createChainIfNotExists(ipt, "filter", "SUBMARINER-INPUT"); err != nil {
return fmt.Errorf("Unable to create SUBMARINER-INPUT chain in iptables: %v", err)
}

forwardToSubInputRuleSpec := []string{"-p", "udp", "-m", "udp", "-j", "SUBMARINER-INPUT"}
if err = ipt.AppendUnique("filter", "INPUT", forwardToSubInputRuleSpec...); err != nil {
klog.Errorf("Unable to append iptables rule \"%s\": %v\n", strings.Join(forwardToSubInputRuleSpec, " "), err)
}

klog.V(4).Infof("Allow VxLAN incoming traffic in SUBMARINER-INPUT Chain")
ruleSpec := []string{"-p", "udp", "-m", "udp", "--dport", strconv.Itoa(VxLANPort), "-j", "ACCEPT"}
if err = ipt.AppendUnique("filter", "SUBMARINER-INPUT", ruleSpec...); err != nil {
klog.Errorf("Unable to append iptables rule \"%s\": %v\n", strings.Join(ruleSpec, " "), err)
}

klog.V(4).Infof("Insert rule to allow traffic over %s interface in FORWARDing Chain", VxLANIface)
ruleSpec = []string{"-o", VxLANIface, "-j", "ACCEPT"}
if err = r.prependUnique(ipt, "filter", "FORWARD", ruleSpec); err != nil {
klog.Errorf("Unable to insert iptable rule in filter table to allow vxlan traffic: %v", err)
}

return nil
}

func (r *Controller) programIptableRulesForInterClusterTraffic(remoteCidrBlock string) error {
ipt, err := iptables.New()
if err != nil {
return fmt.Errorf("error initializing iptables: %v", err)
}

for _, localClusterCidr := range r.localClusterCidr {
ruleSpec := []string{"-s", localClusterCidr, "-d", remoteCidrBlock, "-j", "ACCEPT"}
klog.V(4).Infof("Installing iptables rule for outgoing traffic: %s", strings.Join(ruleSpec, " "))
if err = ipt.AppendUnique("nat", SmPostRoutingChain, ruleSpec...); err != nil {
return fmt.Errorf("error appending iptables rule \"%s\": %v\n", strings.Join(ruleSpec, " "), err)
}

// TODO: revisit, we only have to program rules to allow traffic from the podCidr
ruleSpec = []string{"-s", remoteCidrBlock, "-d", localClusterCidr, "-j", "ACCEPT"}
klog.V(4).Infof("Installing iptables rule for incoming traffic: %s", strings.Join(ruleSpec, " "))
if err = ipt.AppendUnique("nat", SmPostRoutingChain, ruleSpec...); err != nil {
return fmt.Errorf("error appending iptables rule \"%s\": %v\n", strings.Join(ruleSpec, " "), err)
}
}
return nil
}

func (r *Controller) prependUnique(ipt *iptables.IPTables, table string, chain string, ruleSpec []string) error {
rules, err := ipt.List(table, chain)
if err != nil {
return fmt.Errorf("error listing the rules in %s chain: %v", chain, err)
}

// Submariner requires certain iptable rules to be programmed at the beginning of an iptables Chain
// so that we can preserve the sourceIP for inter-cluster traffic and avoid K8s SDN making changes
// to the traffic.
// In this API, we check if the required iptable rule is present at the beginning of the chain.
// If the rule is already present and there are no stale[1] flows, we simply return. If not, we create one.
// [1] Sometimes after we program the rule at the beginning of the chain, K8s SDN might insert some
// new rules ahead of the rule that we programmed. In such cases, the rule that we programmed will
// not be the first rule to hit and Submariner behavior might get affected. So, we query the rules
// in the chain to see if the rule slipped its position, and if so, delete all such occurrences.
// We then re-program a new rule at the beginning of the chain as required.

isPresentAtRequiredPosition := false
numOccurrences := 0
for index, rule := range rules {
if strings.Contains(rule, strings.Join(ruleSpec, " ")) {
klog.V(4).Infof("In %s table, iptables rule \"%s\", exists at index %d.", table, strings.Join(ruleSpec, " "), index)
numOccurrences++

if index == 1 {
isPresentAtRequiredPosition = true
}
}
}

// The required rule is present in the Chain, but either there are multiple occurrences or its
// not at the desired location
if numOccurrences > 1 || !isPresentAtRequiredPosition {
for i := 0; i < numOccurrences; i++ {
if err = ipt.Delete(table, chain, ruleSpec...); err != nil {
return fmt.Errorf("error deleting stale iptable rule \"%s\": %v", strings.Join(ruleSpec, " "), err)
}
}
}

// The required rule is present only once and is at the desired location
if numOccurrences == 1 && isPresentAtRequiredPosition {
klog.V(4).Infof("In %s table, iptables rule \"%s\", already exists.", table, strings.Join(ruleSpec, " "))
return nil
} else {
if err = ipt.Insert(table, chain, 1, ruleSpec...); err != nil {
return err
}
}

return nil
}

func (r *Controller) createChainIfNotExists(ipt *iptables.IPTables, table, chain string) error {
existingChains, err := ipt.ListChains(table)
if err != nil {
return err
}

for _, val := range existingChains {
if val == chain {
// Chain already exists
return nil
}
}

return ipt.NewChain(table, chain)
}
Loading

0 comments on commit 4b7def5

Please sign in to comment.