Skip to content

Commit

Permalink
Use VxLAN overlay tunnels for inter-cluster traffic
Browse files Browse the repository at this point in the history
As part of supporting Network policies and for ease of debugging, this
patch implements the following.

1. Creates VxLAN tunnels in the local Cluster between the worker nodes and
   the Cluster Gateway Node.
2. Programs the necessary iptables rules on the Cluster nodes to allow
   inter-cluster traffic.
3. Avoids SNAT/MASQUERADE for inter-cluster traffic, thereby preserving
   the original source IP of the pod all the way to the destination pod.
4. Programs the routing rules on the worker nodes to forward remote-cluster
   traffic over the VxLAN interface that is created between each worker node
   and the Cluster Gateway Node.

This patch depends on the following other patches:

Depends-On: submariner-io#135
Depends-On: submariner-io/submariner-charts#3
Depends-On: submariner-io/submariner-charts#4
  • Loading branch information
sridhargaddam committed Sep 3, 2019
1 parent 2661f34 commit 29ff1ae
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 110 deletions.
23 changes: 12 additions & 11 deletions pkg/routeagent/controllers/route/iptables.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package route

import (
"fmt"
"strconv"
"strings"

"github.com/coreos/go-iptables/iptables"
Expand All @@ -15,13 +16,13 @@ func (r *Controller) createIPTableChains() error {
return err
}

klog.V(4).Infof("Install/ensure %s chain exists", SM_POSTROUTING_CHAIN)
if err = ipt.NewChain("nat", SM_POSTROUTING_CHAIN); err != nil {
klog.Errorf("Unable to create %s chain in iptables: %v", SM_POSTROUTING_CHAIN, err)
klog.V(4).Infof("Install/ensure %s chain exists", SmPostRoutingChain)
if err = ipt.NewChain("nat", SmPostRoutingChain); err != nil {
klog.Errorf("Unable to create %s chain in iptables: %v", SmPostRoutingChain, err)
}

klog.V(4).Infof("Insert %s rule that has rules for inter-cluster traffic", SM_POSTROUTING_CHAIN)
forwardToSubPostroutingRuleSpec := []string{"-j", SM_POSTROUTING_CHAIN}
klog.V(4).Infof("Insert %s rule that has rules for inter-cluster traffic", SmPostRoutingChain)
forwardToSubPostroutingRuleSpec := []string{"-j", SmPostRoutingChain}
if err = r.insertUnique(ipt, "nat", "POSTROUTING", 1, forwardToSubPostroutingRuleSpec); err != nil {
klog.Errorf("Unable to insert iptable rule in NAT table, POSTROUTING chain: %v", err)
}
Expand All @@ -37,13 +38,13 @@ func (r *Controller) createIPTableChains() error {
}

klog.V(4).Infof("Allow VxLAN incoming traffic in SUBMARINER-INPUT Chain")
ruleSpec := []string{"-p", "udp", "-m", "udp", "--dport", VXLAN_PORT, "-j", "ACCEPT"}
ruleSpec := []string{"-p", "udp", "-m", "udp", "--dport", strconv.Itoa(VxLANPort), "-j", "ACCEPT"}
if err = ipt.AppendUnique("filter", "SUBMARINER-INPUT", ruleSpec...); err != nil {
klog.Errorf("Unable to append iptables rule \"%s\": %v\n", strings.Join(ruleSpec, " "), err)
}

klog.V(4).Infof("Insert rule to allow traffic over %s interface in FORWARDing Chain", VXLAN_IFACE)
ruleSpec = []string{"-o", VXLAN_IFACE, "-j", "ACCEPT"}
klog.V(4).Infof("Insert rule to allow traffic over %s interface in FORWARDing Chain", VxLANIface)
ruleSpec = []string{"-o", VxLANIface, "-j", "ACCEPT"}
if err = r.insertUnique(ipt, "filter", "FORWARD", 1, ruleSpec); err != nil {
klog.Errorf("Unable to insert iptable rule in filter table to allow vxlan traffic: %v", err)
}
Expand All @@ -60,14 +61,14 @@ func (r *Controller) programIptableRulesForInterClusterTraffic(remoteCidrBlock s
for _, localClusterCidr := range r.localClusterCidr {
ruleSpec := []string{"-s", localClusterCidr, "-d", remoteCidrBlock, "-j", "ACCEPT"}
klog.V(4).Infof("Installing iptables rule for outgoing traffic: %s", strings.Join(ruleSpec, " "))
if err = ipt.AppendUnique("nat", SM_POSTROUTING_CHAIN, ruleSpec...); err != nil {
if err = ipt.AppendUnique("nat", SmPostRoutingChain, ruleSpec...); err != nil {
klog.Errorf("error appending iptables rule \"%s\": %v\n", strings.Join(ruleSpec, " "), err)
}

// Todo: revisit we only have to program to the PODCidr
// Todo: revisit, we only have to program rules to allow traffic from the podCidr
ruleSpec = []string{"-s", remoteCidrBlock, "-d", localClusterCidr, "-j", "ACCEPT"}
klog.V(4).Infof("Installing iptables rule for incoming traffic: %s", strings.Join(ruleSpec, " "))
if err = ipt.AppendUnique("nat", SM_POSTROUTING_CHAIN, ruleSpec...); err != nil {
if err = ipt.AppendUnique("nat", SmPostRoutingChain, ruleSpec...); err != nil {
klog.Errorf("error appending iptables rule \"%s\": %v\n", strings.Join(ruleSpec, " "), err)
}
}
Expand Down
126 changes: 55 additions & 71 deletions pkg/routeagent/controllers/route/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,21 +44,23 @@ type Controller struct {
gatewayNodeIP net.IP
localClusterCidr []string
localServiceCidr []string
remoteSubnets []string
remoteSubnets *StringSet

vxlanDevice *vxLanIface
vxlanGw net.IP
remoteVTEPs []string
remoteVTEPs *StringSet

isGatewayNode bool
link *net.Interface
}

const VXLAN_IFACE = "vxlan100"
const VXLAN_PORT = "4800"
const VXLAN_VTEP_NETWORK_PREFIX = "240"
const SM_POSTROUTING_CHAIN = "SUBMARINER-POSTROUTING"
const SM_ROUTE_AGENT_FILTER = "app=submariner-routeagent"
const (
VxLANIface = "vxlan100"
VxLANPort = 4800
VxLANVTepNetworkPrefix = 240
SmPostRoutingChain = "SUBMARINER-POSTROUTING"
SmRouteAgentFilter = "app=submariner-routeagent"
)

func NewController(clusterID string, ClusterCidr []string, ServiceCidr []string, objectNamespace string, link *net.Interface, submarinerClientSet clientset.Interface, clientSet *kubernetes.Clientset, clusterInformer informers.ClusterInformer, endpointInformer informers.EndpointInformer, podInformer podinformer.PodInformer) *Controller {
controller := Controller{
Expand All @@ -70,6 +72,8 @@ func NewController(clusterID string, ClusterCidr []string, ServiceCidr []string,
clientSet: clientSet,
link: link,
isGatewayNode: false,
remoteSubnets: NewStringSet(),
remoteVTEPs: NewStringSet(),
clustersSynced: clusterInformer.Informer().HasSynced,
endpointsSynced: endpointInformer.Informer().HasSynced,
smRouteAgentPodsSynced: podInformer.Informer().HasSynced,
Expand Down Expand Up @@ -139,7 +143,7 @@ func (r *Controller) Run(stopCh <-chan struct{}) error {
}

// Query all the submariner-route-agent daemonSet PODs running in the local cluster.
podList, err := r.clientSet.CoreV1().Pods(r.objectNamespace).List(metav1.ListOptions{LabelSelector: SM_ROUTE_AGENT_FILTER})
podList, err := r.clientSet.CoreV1().Pods(r.objectNamespace).List(metav1.ListOptions{LabelSelector: SmRouteAgentFilter})
if err != nil {
klog.Fatalf("error while retrieving all submariner-route-agent pods: %v", err)
}
Expand Down Expand Up @@ -178,35 +182,20 @@ func (r *Controller) runPodWorker() {
}

func (r *Controller) updateIptableRulesForInterclusterTraffic(inputCidrBlocks []string) {
for _, remoteCidrBlock := range inputCidrBlocks {
if !containsString(r.remoteSubnets, remoteCidrBlock) {
r.remoteSubnets = append(r.remoteSubnets, remoteCidrBlock)
r.programIptableRulesForInterClusterTraffic(remoteCidrBlock)
for _, inputCidrBlock := range inputCidrBlocks {
if !r.remoteSubnets.Contains(inputCidrBlock) {
r.remoteSubnets.Add(inputCidrBlock)
r.programIptableRulesForInterClusterTraffic(inputCidrBlock)
}
}
}

func (r *Controller) populateRemoteVtepIps(vtepIP string) {
if !containsString(r.remoteVTEPs, vtepIP) {
r.remoteVTEPs = append(r.remoteVTEPs, vtepIP)
if !r.remoteVTEPs.Contains(vtepIP) {
r.remoteVTEPs.Add(vtepIP)
}
}

// deleteRemoteVtepIp drops vtepIP from the controller's remoteVTEPs list.
// It does nothing when vtepIP is not currently present in the list.
func (r *Controller) deleteRemoteVtepIp(vtepIP string) {
	if !containsString(r.remoteVTEPs, vtepIP) {
		// Nothing cached for this VTEP, so there is nothing to remove.
		return
	}
	r.remoteVTEPs = r.deleteVepEntry(r.remoteVTEPs, vtepIP)
}

// deleteVepEntry returns vtepList with the first occurrence of entryToDelete
// removed. When entryToDelete is not present, vtepList is returned unchanged.
//
// The removal is done with append on a sub-slice of vtepList, so the result
// shares (and shifts elements within) the input's backing array; callers
// should use the returned slice in place of the one they passed in.
func (r *Controller) deleteVepEntry(vtepList []string, entryToDelete string) []string {
	for idx := range vtepList {
		if vtepList[idx] != entryToDelete {
			continue
		}
		// Splice out the matching element and stop at the first match.
		return append(vtepList[:idx], vtepList[idx+1:]...)
	}
	return vtepList
}

func (r *Controller) processNextCluster() bool {
obj, shutdown := r.clusterWorkqueue.Get()
if shutdown {
Expand Down Expand Up @@ -246,17 +235,15 @@ func (r *Controller) processNextCluster() bool {
return true
}

func (r *Controller) getVxlanVtepIPAddress() (net.IP, *net.IPNet, error) {
ipAddr, ipNetwork, err := r.getHostIfaceIPAddress()
if err != nil {
klog.Errorf("Unable to retrieve the IPv4 address on the Host %v", err)
return nil, nil, err
func (r *Controller) getVxlanVtepIPAddress(ipAddr string) (net.IP, error) {
ipSlice := strings.Split(ipAddr, ".")
if len(ipSlice) < 4 {
return nil, fmt.Errorf("Invalid ipAddr [%s]", ipAddr)
}

ipSlice := strings.Split(ipAddr.String(), ".")
ipSlice[0] = VXLAN_VTEP_NETWORK_PREFIX
ipSlice[0] = strconv.Itoa(VxLANVTepNetworkPrefix)
vxlanIP := net.ParseIP(strings.Join(ipSlice, "."))
return vxlanIP, ipNetwork, nil
return vxlanIP, nil
}

func (r *Controller) getHostIfaceIPAddress() (net.IP, *net.IPNet, error) {
Expand All @@ -276,19 +263,25 @@ func (r *Controller) getHostIfaceIPAddress() (net.IP, *net.IPNet, error) {
}

func (r *Controller) createVxLANInterface(isGatewayDevice bool) error {
vtepIP, vtepMask, err := r.getVxlanVtepIPAddress()
ipAddr, vtepMask, err := r.getHostIfaceIPAddress()
if err != nil {
klog.Errorf("Unable to retrieve the IPv4 address on the Host %v", err)
return err
}

vtepIP, err := r.getVxlanVtepIPAddress(ipAddr.String())
if err != nil {
klog.Fatalf("Failed to derive the vxlan vtepIP on the Gateway Node %v", err)
klog.Errorf("Failed to derive the vxlan vtepIP %v", err)
return err
}

if isGatewayDevice {
vtepPort, _ := strconv.Atoi(VXLAN_PORT)
attrs := &vxLanAttributes{
name: VXLAN_IFACE,
name: VxLANIface,
vxlanId: 100,
group: nil,
srcAddr: nil,
vtepPort: vtepPort,
vtepPort: VxLANPort,
mtu: 1450,
}

Expand All @@ -297,30 +290,30 @@ func (r *Controller) createVxLANInterface(isGatewayDevice bool) error {
klog.Fatalf("Failed to create vxlan interface on Gateway Node: %v", err)
}

for _, fdbAddress := range r.remoteVTEPs {
for fdbAddress, _ := range r.remoteVTEPs.set {
err = r.vxlanDevice.AddFDB(net.ParseIP(fdbAddress), "00:00:00:00:00:00")
if err != nil {
klog.Fatalf("Failed to add FDB entry on the Gateway Node vxlan iface %v", err)
}
}

// Enable loose mode (rp_filter=2) reverse path filtering on the vxlan interface.
err = ioutil.WriteFile("/proc/sys/net/ipv4/conf/"+VXLAN_IFACE+"/rp_filter", []byte("2"), 0644)
err = ioutil.WriteFile("/proc/sys/net/ipv4/conf/"+VxLANIface+"/rp_filter", []byte("2"), 0644)
if err != nil {
klog.Errorf("Unable to update proc entry, err: %s", err)
return err
} else {
klog.Errorf("Successfully updated proc entry ")
}

} else {
// non-Gateway/Worker Node
vtepPort, _ := strconv.Atoi(VXLAN_PORT)
attrs := &vxLanAttributes{
name: VXLAN_IFACE,
name: VxLANIface,
vxlanId: 100,
group: r.gatewayNodeIP,
srcAddr: vtepIP,
vtepPort: vtepPort,
vtepPort: VxLANPort,
mtu: 1450,
}

Expand Down Expand Up @@ -350,7 +343,7 @@ func (r *Controller) processNextPod() bool {
klog.V(4).Infof("In processNextPod, POD HostIP is %s", pod.Status.HostIP)
r.populateRemoteVtepIps(pod.Status.PodIP)

// A new Node (identified via a Submariner-route-agent daemonset pod event) is added to the cluster
// A new Node (identified via a Submariner-route-agent daemonset pod event) is added to the cluster.
// On the GatewayDevice, update the vxlan fdb entry (i.e., remote Vtep) for the newly added node.
if r.isGatewayNode {
ret := r.vxlanDevice.AddFDB(net.ParseIP(pod.Status.PodIP), "00:00:00:00:00:00")
Expand Down Expand Up @@ -406,7 +399,7 @@ func (r *Controller) processNextEndpoint() bool {
r.gatewayNodeIP = net.ParseIP(endpoint.Spec.PrivateIP)

ipSlice := strings.Split(r.gatewayNodeIP.String(), ".")
ipSlice[0] = VXLAN_VTEP_NETWORK_PREFIX
ipSlice[0] = strconv.Itoa(VxLANVTepNetworkPrefix)
// remoteVtepIP is used while programming the routing rules
remoteVtepIP := net.ParseIP(strings.Join(ipSlice, "."))
r.vxlanGw = remoteVtepIP
Expand Down Expand Up @@ -471,7 +464,7 @@ func (r *Controller) enqueuePod(obj interface{}) {
pod := obj.(*k8sv1.Pod)

// Add the POD event to the workqueue only if the sm-route-agent podIP does not exist in the local cache.
if !containsString(r.remoteVTEPs, pod.Status.HostIP) {
if !r.remoteVTEPs.Contains(pod.Status.HostIP) {
r.podWorkqueue.AddRateLimited(obj)
}
}
Expand Down Expand Up @@ -513,8 +506,8 @@ func (r *Controller) handleRemovedPod(obj interface{}) {
klog.V(6).Infof("Removing podIP in route controller %v", obj)
pod := obj.(*k8sv1.Pod)

if containsString(r.remoteVTEPs, pod.Status.HostIP) {
r.deleteRemoteVtepIp(pod.Status.HostIP)
if r.remoteVTEPs.Contains(pod.Status.HostIP) {
r.remoteVTEPs.Delete(pod.Status.HostIP)
if r.isGatewayNode {
ret := r.vxlanDevice.DelFDB(net.ParseIP(pod.Status.PodIP), "00:00:00:00:00:00")
if ret != nil {
Expand All @@ -525,22 +518,22 @@ func (r *Controller) handleRemovedPod(obj interface{}) {
}

func (r *Controller) cleanRoutes() {
link, err := netlink.LinkByName(VXLAN_IFACE)
link, err := netlink.LinkByName(VxLANIface)
if err != nil {
klog.Errorf("Error retrieving link by name %s: %v", VXLAN_IFACE, err)
klog.Errorf("Error retrieving link by name %s: %v", VxLANIface, err)
return
}
currentRouteList, err := netlink.RouteList(link, syscall.AF_INET)
if err != nil {
klog.Errorf("Error retrieving routes on the link %s: %v", VXLAN_IFACE, err)
klog.Errorf("Error retrieving routes on the link %s: %v", VxLANIface, err)
return
}
for _, route := range currentRouteList {
klog.V(6).Infof("Processing route %v", route)
if route.Dst == nil || route.Gw == nil {
klog.V(6).Infof("Found nil gw or dst")
} else {
if containsString(r.remoteSubnets, route.Dst.String()) {
if r.remoteSubnets.Contains(route.Dst.String()) {
klog.V(6).Infof("Removing route %s", route.String())
if err = netlink.RouteDel(&route); err != nil {
klog.Errorf("Error removing route %s: %v", route.String(), err)
Expand Down Expand Up @@ -569,15 +562,15 @@ func (r *Controller) cleanXfrmPolicies() {

// Reconcile the routes installed on this device using rtnetlink
func (r *Controller) reconcileRoutes() error {
link, err := netlink.LinkByName(VXLAN_IFACE)
link, err := netlink.LinkByName(VxLANIface)
if err != nil {
return fmt.Errorf("Error retrieving link by name %s: %v", VXLAN_IFACE, err)
return fmt.Errorf("Error retrieving link by name %s: %v", VxLANIface, err)
}

currentRouteList, err := netlink.RouteList(link, syscall.AF_INET)

if err != nil {
return fmt.Errorf("Error retrieving routes for link %s: %v", VXLAN_IFACE, err)
return fmt.Errorf("Error retrieving routes for link %s: %v", VxLANIface, err)
}

// First lets delete all of the routes that don't match
Expand All @@ -587,7 +580,7 @@ func (r *Controller) reconcileRoutes() error {
if route.Dst == nil || route.Gw == nil {
klog.V(6).Infof("Found nil gw or dst")
} else {
if containsString(r.remoteSubnets, route.Dst.String()) && route.Gw.Equal(r.vxlanGw) {
if r.remoteSubnets.Contains(route.Dst.String()) && route.Gw.Equal(r.vxlanGw) {
klog.V(6).Infof("Found route %s with gw %s already installed", route.String(), route.Gw.String())
} else {
klog.V(6).Infof("Removing route %s", route.String())
Expand All @@ -601,11 +594,11 @@ func (r *Controller) reconcileRoutes() error {
currentRouteList, err = netlink.RouteList(link, syscall.AF_INET)

if err != nil {
return fmt.Errorf("Error retrieving routes for link %s: %v", VXLAN_IFACE, err)
return fmt.Errorf("Error retrieving routes for link %s: %v", VxLANIface, err)
}

// let's now add the routes that are missing
for _, cidrBlock := range r.remoteSubnets {
for cidrBlock, _ := range r.remoteSubnets.set {
_, dst, err := net.ParseCIDR(cidrBlock)
if err != nil {
klog.Errorf("Error parsing cidr block %s: %v", cidrBlock, err)
Expand Down Expand Up @@ -639,12 +632,3 @@ func (r *Controller) reconcileRoutes() error {
}
return nil
}

// containsString reports whether s is equal to any element of c.
// A nil or empty c never contains s.
func containsString(c []string, s string) bool {
	for i := 0; i < len(c); i++ {
		if c[i] == s {
			return true
		}
	}
	return false
}
Loading

0 comments on commit 29ff1ae

Please sign in to comment.