From 79a29bfbc5bd214f9c660e98dfa6cd6b7062784a Mon Sep 17 00:00:00 2001 From: Sridhar Gaddam Date: Mon, 2 Sep 2019 20:23:58 +0530 Subject: [PATCH 1/9] Enhance Submariner to use VxLAN Overlay Tunnels for inter-cluster traffic As part of supporting Network policies and for ease of debugging, this patch implements the following. 1. Creates VxLAN tunnels in the local Cluster between the worker nodes and the Cluster Gateway Node. 2. Programs the necessary iptables rules on the Cluster nodes to allow inter-cluster traffic. 3. This patch also avoids SNAT/MASQ for inter-cluster traffic, thereby preserving the original source IP of the POD all the way until the destination POD. 4. Programs the routing rules on the workerNodes to forward the remoteCluster traffic over the VxLAN interface that is created between the worker node and Cluster GatewayNode. This patch depends on the following other patches Depends-On: https://github.com/submariner-io/submariner/pull/135 Depends-On: https://github.com/submariner-io/submariner-charts/pull/3 Depends-On: https://github.com/submariner-io/submariner-charts/pull/4 --- go.mod | 1 + pkg/cableengine/ipsec/ipsec.go | 120 +------ pkg/routeagent/controllers/route/iptables.go | 91 +++++ pkg/routeagent/controllers/route/route.go | 334 +++++++++++++++--- .../controllers/route/route_test.go | 12 +- pkg/routeagent/controllers/route/vxlan.go | 178 ++++++++++ pkg/routeagent/main.go | 25 +- test/e2e/framework/network_pods.go | 2 +- 8 files changed, 590 insertions(+), 173 deletions(-) create mode 100644 pkg/routeagent/controllers/route/iptables.go create mode 100644 pkg/routeagent/controllers/route/vxlan.go diff --git a/go.mod b/go.mod index da02f1549..16d7f8842 100644 --- a/go.mod +++ b/go.mod @@ -29,6 +29,7 @@ require ( github.com/vishvananda/netlink v1.0.0 github.com/vishvananda/netns v0.0.0-20180720170159-13995c7128cc // indirect golang.org/x/oauth2 v0.0.0-20170412232759-a6bd8cefa181 // indirect + golang.org/x/sys v0.0.0-20190606165138-5da285871e9c 
golang.org/x/time v0.0.0-20161028155119-f51c12702a4d // indirect google.golang.org/appengine v1.6.1 // indirect gopkg.in/inf.v0 v0.0.0-20150911125757-3887ee99ecf0 // indirect diff --git a/pkg/cableengine/ipsec/ipsec.go b/pkg/cableengine/ipsec/ipsec.go index 248c29eff..6e88be251 100644 --- a/pkg/cableengine/ipsec/ipsec.go +++ b/pkg/cableengine/ipsec/ipsec.go @@ -2,12 +2,9 @@ package ipsec import ( "fmt" - "net" "reflect" - "strings" "sync" - "github.com/coreos/go-iptables/iptables" "github.com/submariner-io/submariner/pkg/cableengine" "github.com/submariner-io/submariner/pkg/types" "github.com/submariner-io/submariner/pkg/util" @@ -39,62 +36,6 @@ func NewEngine(localSubnets []string, localCluster types.SubmarinerCluster, loca func (i *engine) StartEngine() error { klog.Infof("Starting IPSec Engine (Charon)") - ifi, err := util.GetDefaultGatewayInterface() - if err != nil { - return err - } - - klog.V(8).Infof("Device of default gateway interface was %s", ifi.Name) - ipt, err := iptables.New() - if err != nil { - return fmt.Errorf("error while initializing iptables: %v", err) - } - - klog.V(6).Infof("Installing/ensuring the SUBMARINER-POSTROUTING and SUBMARINER-FORWARD chains") - if err = ipt.NewChain("nat", "SUBMARINER-POSTROUTING"); err != nil { - klog.Errorf("Unable to create SUBMARINER-POSTROUTING chain in iptables: %v", err) - } - - if err = ipt.NewChain("filter", "SUBMARINER-FORWARD"); err != nil { - klog.Errorf("Unable to create SUBMARINER-FORWARD chain in iptables: %v", err) - } - - forwardToSubPostroutingRuleSpec := []string{"-j", "SUBMARINER-POSTROUTING"} - if err = ipt.AppendUnique("nat", "POSTROUTING", forwardToSubPostroutingRuleSpec...); err != nil { - klog.Errorf("Unable to append iptables rule \"%s\": %v\n", strings.Join(forwardToSubPostroutingRuleSpec, " "), err) - } - - forwardToSubForwardRuleSpec := []string{"-j", "SUBMARINER-FORWARD"} - rules, err := ipt.List("filter", "FORWARD") - if err != nil { - return fmt.Errorf("error listing the rules in 
FORWARD chain: %v", err) - } - - appendAt := len(rules) + 1 - insertAt := appendAt - for i, rule := range rules { - if rule == "-A FORWARD -j SUBMARINER-FORWARD" { - insertAt = -1 - break - } else if rule == "-A FORWARD -j REJECT --reject-with icmp-host-prohibited" { - insertAt = i - break - } - } - - if insertAt == appendAt { - // Append the rule at the end of FORWARD Chain. - if err = ipt.Append("filter", "FORWARD", forwardToSubForwardRuleSpec...); err != nil { - klog.Errorf("Unable to append iptables rule \"%s\": %v\n", strings.Join(forwardToSubForwardRuleSpec, " "), err) - } - } else if insertAt > 0 { - // Insert the rule in the FORWARD Chain. - if err = ipt.Insert("filter", "FORWARD", insertAt, forwardToSubForwardRuleSpec...); err != nil { - klog.Errorf("Unable to insert iptables rule \"%s\" at position %d: %v\n", strings.Join(forwardToSubForwardRuleSpec, " "), - insertAt, err) - } - } - return i.driver.Init() } @@ -131,66 +72,7 @@ func (i *engine) InstallCable(endpoint types.SubmarinerEndpoint) error { if err != nil { return err } - - ifi, err := util.GetDefaultGatewayInterface() - if err != nil { - return err - } - klog.V(4).Infof("Device of default gateway interface was %s", ifi.Name) - ipt, err := iptables.New() - if err != nil { - return fmt.Errorf("error while initializing iptables: %v", err) - } - - addresses, err := ifi.Addrs() - if err != nil { - return err - } - - for _, addr := range addresses { - ipAddr, ipNet, err := net.ParseCIDR(addr.String()) - if err != nil { - klog.Errorf("Error while parsing CIDR %s: %v", addr.String(), err) - continue - } - - if ipAddr.To4() != nil { - for _, subnet := range endpoint.Spec.Subnets { - ruleSpec := []string{"-s", ipNet.String(), "-d", subnet, "-i", ifi.Name, "-j", "ACCEPT"} - klog.V(8).Infof("Installing iptables rule: %s", strings.Join(ruleSpec, " ")) - if err = ipt.AppendUnique("filter", "SUBMARINER-FORWARD", ruleSpec...); err != nil { - klog.Errorf("error appending iptables rule \"%s\": %v\n", 
strings.Join(ruleSpec, " "), err) - } - - ruleSpec = []string{"-d", ipNet.String(), "-s", subnet, "-i", ifi.Name, "-j", "ACCEPT"} - klog.V(8).Infof("Installing iptables rule: %v", ruleSpec) - if err = ipt.AppendUnique("filter", "SUBMARINER-FORWARD", ruleSpec...); err != nil { - klog.Errorf("error appending iptables rule \"%s\": %v\n", strings.Join(ruleSpec, " "), err) - } - - // -t nat -I POSTROUTING -s -d -j SNAT --to-source - ruleSpec = []string{"-s", ipNet.String(), "-d", subnet, "-j", "SNAT", "--to-source", ipAddr.String()} - klog.V(8).Infof("Installing iptables rule: %s", strings.Join(ruleSpec, " ")) - if err = ipt.AppendUnique("nat", "SUBMARINER-POSTROUTING", ruleSpec...); err != nil { - klog.Errorf("error appending iptables rule \"%s\": %v\n", strings.Join(ruleSpec, " "), err) - } - } - } else { - klog.V(6).Infof("Skipping adding rule because IPv6 network %s found", ipNet.String()) - } - } - - // MASQUERADE (on the GatewayNode) the incoming traffic from the remote cluster (i.e, remoteEndpointIP) - // and destined to the local PODs (i.e., localSubnet) scheduled on the non-gateway node. - // This will make the return traffic from the POD to go via the GatewayNode. 
- for _, localSubnet := range i.localSubnets { - ruleSpec := []string{"-s", remoteEndpointIP, "-d", localSubnet, "-j", "MASQUERADE"} - klog.V(8).Infof("Installing iptables rule for MASQ incoming traffic: %v", ruleSpec) - if err = ipt.AppendUnique("nat", "SUBMARINER-POSTROUTING", ruleSpec...); err != nil { - klog.Errorf("error appending iptables MASQ rule \"%s\": %v\n", strings.Join(ruleSpec, " "), err) - } - } - + klog.V(4).Infof("Connected to remoteEndpointIP %s", remoteEndpointIP) return nil } diff --git a/pkg/routeagent/controllers/route/iptables.go b/pkg/routeagent/controllers/route/iptables.go new file mode 100644 index 000000000..9255cf906 --- /dev/null +++ b/pkg/routeagent/controllers/route/iptables.go @@ -0,0 +1,91 @@ +package route + +import ( + "fmt" + "strings" + + "github.com/coreos/go-iptables/iptables" + "k8s.io/klog" +) + +func (r *Controller) createIPTableChains() error { + ipt, err := iptables.New() + if err != nil { + klog.Errorf("Error while initializing iptables: %v", err) + return err + } + + klog.V(4).Infof("Install/ensure %s chain exists", SM_POSTROUTING_CHAIN) + if err = ipt.NewChain("nat", SM_POSTROUTING_CHAIN); err != nil { + klog.Errorf("Unable to create %s chain in iptables: %v", SM_POSTROUTING_CHAIN, err) + } + + klog.V(4).Infof("Insert %s rule that has rules for inter-cluster traffic", SM_POSTROUTING_CHAIN) + forwardToSubPostroutingRuleSpec := []string{"-j", SM_POSTROUTING_CHAIN} + if err = r.insertUnique(ipt, "nat", "POSTROUTING", 1, forwardToSubPostroutingRuleSpec); err != nil { + klog.Errorf("Unable to insert iptable rule in NAT table, POSTROUTING chain: %v", err) + } + + klog.V(4).Infof("Install/ensure SUBMARINER-INPUT chain exists") + if err = ipt.NewChain("filter", "SUBMARINER-INPUT"); err != nil { + klog.Errorf("Unable to create SUBMARINER-INPUT chain in iptables: %v", err) + } + + forwardToSubInputRuleSpec := []string{"-p", "udp", "-m", "udp", "-j", "SUBMARINER-INPUT"} + if err = ipt.AppendUnique("filter", "INPUT", 
forwardToSubInputRuleSpec...); err != nil { + klog.Errorf("Unable to append iptables rule \"%s\": %v\n", strings.Join(forwardToSubInputRuleSpec, " "), err) + } + + klog.V(4).Infof("Allow VxLAN incoming traffic in SUBMARINER-INPUT Chain") + ruleSpec := []string{"-p", "udp", "-m", "udp", "--dport", VXLAN_PORT, "-j", "ACCEPT"} + if err = ipt.AppendUnique("filter", "SUBMARINER-INPUT", ruleSpec...); err != nil { + klog.Errorf("Unable to append iptables rule \"%s\": %v\n", strings.Join(ruleSpec, " "), err) + } + + klog.V(4).Infof("Insert rule to allow traffic over %s interface in FORWARDing Chain", VXLAN_IFACE) + ruleSpec = []string{"-o", VXLAN_IFACE, "-j", "ACCEPT"} + if err = r.insertUnique(ipt, "filter", "FORWARD", 1, ruleSpec); err != nil { + klog.Errorf("Unable to insert iptable rule in filter table to allow vxlan traffic: %v", err) + } + + return nil +} + +func (r *Controller) programIptableRulesForInterClusterTraffic(remoteCidrBlock string) { + ipt, err := iptables.New() + if err != nil { + klog.Errorf("error while initializing iptables: %v", err) + } + + for _, localClusterCidr := range r.localClusterCidr { + ruleSpec := []string{"-s", localClusterCidr, "-d", remoteCidrBlock, "-j", "ACCEPT"} + klog.V(4).Infof("Installing iptables rule for outgoing traffic: %s", strings.Join(ruleSpec, " ")) + if err = ipt.AppendUnique("nat", SM_POSTROUTING_CHAIN, ruleSpec...); err != nil { + klog.Errorf("error appending iptables rule \"%s\": %v\n", strings.Join(ruleSpec, " "), err) + } + + // Todo: revisit we only have to program to the PODCidr + ruleSpec = []string{"-s", remoteCidrBlock, "-d", localClusterCidr, "-j", "ACCEPT"} + klog.V(4).Infof("Installing iptables rule for incoming traffic: %s", strings.Join(ruleSpec, " ")) + if err = ipt.AppendUnique("nat", SM_POSTROUTING_CHAIN, ruleSpec...); err != nil { + klog.Errorf("error appending iptables rule \"%s\": %v\n", strings.Join(ruleSpec, " "), err) + } + } +} + +func (r *Controller) insertUnique(ipt *iptables.IPTables, table 
string, chain string, position int, ruleSpec []string) error { + rules, err := ipt.List(table, chain) + if err != nil { + return fmt.Errorf("error listing the rules in %s chain: %v", chain, err) + } + + if strings.Contains(rules[position], strings.Join(ruleSpec, " ")) { + klog.V(4).Infof("In %s table, iptables rule \"%s\", already exists.", table, strings.Join(ruleSpec, " ")) + return nil + } else { + if err = ipt.Insert(table, chain, position, ruleSpec...); err != nil { + klog.Errorf("In %s table, unable to insert iptables rule \"%s\": %v\n", table, strings.Join(ruleSpec, " "), err) + } + } + return nil +} diff --git a/pkg/routeagent/controllers/route/route.go b/pkg/routeagent/controllers/route/route.go index f22707ec7..859a54c4c 100644 --- a/pkg/routeagent/controllers/route/route.go +++ b/pkg/routeagent/controllers/route/route.go @@ -2,8 +2,11 @@ package route import ( "fmt" + "io/ioutil" "net" "os" + "strconv" + "strings" "sync" "syscall" "time" @@ -12,9 +15,13 @@ import ( clientset "github.com/submariner-io/submariner/pkg/client/clientset/versioned" informers "github.com/submariner-io/submariner/pkg/client/informers/externalversions/submariner.io/v1" "github.com/vishvananda/netlink" + "golang.org/x/sys/unix" + k8sv1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" + podinformer "k8s.io/client-go/informers/core/v1" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "k8s.io/klog" @@ -24,30 +31,51 @@ type Controller struct { clusterID string objectNamespace string - submarinerClientSet clientset.Interface - clustersSynced cache.InformerSynced - endpointsSynced cache.InformerSynced + submarinerClientSet clientset.Interface + clientSet *kubernetes.Clientset + clustersSynced cache.InformerSynced + endpointsSynced cache.InformerSynced + smRouteAgentPodsSynced cache.InformerSynced clusterWorkqueue 
workqueue.RateLimitingInterface endpointWorkqueue workqueue.RateLimitingInterface + podWorkqueue workqueue.RateLimitingInterface - gw net.IP - subnets []string + gatewayNodeIP net.IP + localClusterCidr []string + localServiceCidr []string + remoteSubnets []string - link *net.Interface + vxlanDevice *vxLanIface + vxlanGw net.IP + remoteVTEPs []string + + isGatewayNode bool + link *net.Interface } -func NewController(clusterID string, objectNamespace string, link *net.Interface, submarinerClientSet clientset.Interface, - clusterInformer informers.ClusterInformer, endpointInformer informers.EndpointInformer) *Controller { +const VXLAN_IFACE = "vxlan100" +const VXLAN_PORT = "4800" +const VXLAN_VTEP_NETWORK_PREFIX = "240" +const SM_POSTROUTING_CHAIN = "SUBMARINER-POSTROUTING" +const SM_ROUTE_AGENT_FILTER = "app=submariner-routeagent" + +func NewController(clusterID string, ClusterCidr []string, ServiceCidr []string, objectNamespace string, link *net.Interface, submarinerClientSet clientset.Interface, clientSet *kubernetes.Clientset, clusterInformer informers.ClusterInformer, endpointInformer informers.EndpointInformer, podInformer podinformer.PodInformer) *Controller { controller := Controller{ - clusterID: clusterID, - objectNamespace: objectNamespace, - submarinerClientSet: submarinerClientSet, - link: link, - clustersSynced: clusterInformer.Informer().HasSynced, - endpointsSynced: endpointInformer.Informer().HasSynced, - clusterWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Clusters"), - endpointWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Endpoints"), + clusterID: clusterID, + objectNamespace: objectNamespace, + localClusterCidr: ClusterCidr, + localServiceCidr: ServiceCidr, + submarinerClientSet: submarinerClientSet, + clientSet: clientSet, + link: link, + isGatewayNode: false, + clustersSynced: clusterInformer.Informer().HasSynced, + endpointsSynced: 
endpointInformer.Informer().HasSynced, + smRouteAgentPodsSynced: podInformer.Informer().HasSynced, + clusterWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Clusters"), + endpointWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Endpoints"), + podWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Pods"), } clusterInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -66,6 +94,14 @@ func NewController(clusterID string, objectNamespace string, link *net.Interface DeleteFunc: controller.handleRemovedEndpoint, }) + podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: controller.enqueuePod, + UpdateFunc: func(old, new interface{}) { + controller.enqueuePod(new) + }, + DeleteFunc: controller.handleRemovedPod, + }) + return &controller } @@ -75,31 +111,48 @@ func (r *Controller) Run(stopCh <-chan struct{}) error { defer utilruntime.HandleCrash() // Start the informer factories to begin populating the informer caches - klog.Info("Starting Route Controller") + klog.V(4).Infof("Starting Route Controller. ClusterID: %s, localClusterCIDR: %v, localServiceCIDR: %v", r.clusterID, r.localClusterCidr, r.localServiceCidr) // Wait for the caches to be synced before starting workers - klog.Info("Waiting for endpoint informer caches to sync") - if ok := cache.WaitForCacheSync(stopCh, r.endpointsSynced, r.clustersSynced); !ok { + klog.Info("Waiting for endpoint informer caches to sync.") + if ok := cache.WaitForCacheSync(stopCh, r.endpointsSynced, r.clustersSynced, r.smRouteAgentPodsSynced); !ok { return fmt.Errorf("failed to wait for caches to sync") } - // let's go ahead and pre-populate clusters + // Create the necessary IPTable chains in the filter and nat tables. 
+ err := r.createIPTableChains() + if err != nil { + return fmt.Errorf("Failed to program the necessary iptable rules.") + } + // let's go ahead and pre-populate clusters clusters, err := r.submarinerClientSet.SubmarinerV1().Clusters(r.objectNamespace).List(metav1.ListOptions{}) - if err != nil { klog.Fatalf("error while retrieving all clusters: %v", err) } + // Program iptables rules for traffic destined to all the remote cluster CIDRs for _, cluster := range clusters.Items { if cluster.Spec.ClusterID != r.clusterID { - r.populateCidrBlockList(append(cluster.Spec.ClusterCIDR, cluster.Spec.ServiceCIDR...)) + r.updateIptableRulesForInterclusterTraffic(append(cluster.Spec.ClusterCIDR, cluster.Spec.ServiceCIDR...)) } } + // Query all the submariner-route-agent daemonSet PODs running in the local cluster. + podList, err := r.clientSet.CoreV1().Pods(r.objectNamespace).List(metav1.ListOptions{LabelSelector: SM_ROUTE_AGENT_FILTER}) + if err != nil { + klog.Fatalf("error while retrieving all submariner-route-agent pods: %v", err) + } + + for index, pod := range podList.Items { + klog.V(4).Infof("In %s, podIP of submariner-route-agent[%d] is %s", r.clusterID, index, pod.Status.PodIP) + r.populateRemoteVtepIps(pod.Status.PodIP) + } + klog.Info("Starting workers") go wait.Until(r.runClusterWorker, time.Second, stopCh) go wait.Until(r.runEndpointWorker, time.Second, stopCh) + go wait.Until(r.runPodWorker, time.Second, stopCh) wg.Wait() <-stopCh klog.Info("Shutting down workers") @@ -118,12 +171,40 @@ func (r *Controller) runEndpointWorker() { } } -func (r *Controller) populateCidrBlockList(inputCidrBlocks []string) { - for _, cidrBlock := range inputCidrBlocks { - if !containsString(r.subnets, cidrBlock) { - r.subnets = append(r.subnets, cidrBlock) +func (r *Controller) runPodWorker() { + for r.processNextPod() { + + } +} + +func (r *Controller) updateIptableRulesForInterclusterTraffic(inputCidrBlocks []string) { + for _, remoteCidrBlock := range inputCidrBlocks { + if 
!containsString(r.remoteSubnets, remoteCidrBlock) { + r.remoteSubnets = append(r.remoteSubnets, remoteCidrBlock) + r.programIptableRulesForInterClusterTraffic(remoteCidrBlock) + } + } +} + +func (r *Controller) populateRemoteVtepIps(vtepIP string) { + if !containsString(r.remoteVTEPs, vtepIP) { + r.remoteVTEPs = append(r.remoteVTEPs, vtepIP) + } +} + +func (r *Controller) deleteRemoteVtepIp(vtepIP string) { + if containsString(r.remoteVTEPs, vtepIP) { + r.remoteVTEPs = r.deleteVepEntry(r.remoteVTEPs, vtepIP) + } +} + +func (r *Controller) deleteVepEntry(vtepList []string, entryToDelete string) []string { + for i, v := range vtepList { + if v == entryToDelete { + return append(vtepList[:i], vtepList[i+1:]...) } } + return vtepList } func (r *Controller) processNextCluster() bool { @@ -150,7 +231,7 @@ func (r *Controller) processNextCluster() bool { // no need to reconcile because this endpoint isn't ours } - r.populateCidrBlockList(append(cluster.Spec.ClusterCIDR, cluster.Spec.ServiceCIDR...)) + r.updateIptableRulesForInterclusterTraffic(append(cluster.Spec.ClusterCIDR, cluster.Spec.ServiceCIDR...)) r.clusterWorkqueue.Forget(obj) klog.V(4).Infof("cluster processed by route controller") @@ -165,6 +246,132 @@ func (r *Controller) processNextCluster() bool { return true } +func (r *Controller) getVxlanVtepIPAddress() (net.IP, *net.IPNet, error) { + ipAddr, ipNetwork, err := r.getHostIfaceIPAddress() + if err != nil { + klog.Errorf("Unable to retrieve the IPv4 address on the Host %v", err) + return nil, nil, err + } + + ipSlice := strings.Split(ipAddr.String(), ".") + ipSlice[0] = VXLAN_VTEP_NETWORK_PREFIX + vxlanIP := net.ParseIP(strings.Join(ipSlice, ".")) + return vxlanIP, ipNetwork, nil +} + +func (r *Controller) getHostIfaceIPAddress() (net.IP, *net.IPNet, error) { + addrs, err := r.link.Addrs() + if len(addrs) > 0 { + for i := range addrs { + ipAddr, ipNetwork, err := net.ParseCIDR(addrs[i].String()) + if err != nil { + klog.Errorf("Unable to ParseCIDR : %v\n", 
addrs) + } + if ipAddr.To4() != nil { + return ipAddr, ipNetwork, nil + } + } + } + return nil, nil, err +} + +func (r *Controller) createVxLANInterface(isGatewayDevice bool) error { + vtepIP, vtepMask, err := r.getVxlanVtepIPAddress() + if err != nil { + klog.Fatalf("Failed to derive the vxlan vtepIP on the Gateway Node %v", err) + } + + if isGatewayDevice { + vtepPort, _ := strconv.Atoi(VXLAN_PORT) + attrs := &vxLanAttributes{ + name: VXLAN_IFACE, + vxlanId: 100, + group: nil, + srcAddr: nil, + vtepPort: vtepPort, + mtu: 1450, + } + + r.vxlanDevice, err = newVxlanIface(attrs) + if err != nil { + klog.Fatalf("Failed to create vxlan interface on Gateway Node: %v", err) + } + + for _, fdbAddress := range r.remoteVTEPs { + err = r.vxlanDevice.AddFDB(net.ParseIP(fdbAddress), "00:00:00:00:00:00") + if err != nil { + klog.Fatalf("Failed to add FDB entry on the Gateway Node vxlan iface %v", err) + } + } + + // Enable loose mode (rp_filter=2) reverse path filtering on the vxlan interface. + err = ioutil.WriteFile("/proc/sys/net/ipv4/conf/"+VXLAN_IFACE+"/rp_filter", []byte("2"), 0644) + if err != nil { + klog.Errorf("Unable to update proc entry, err: %s", err) + } else { + klog.Errorf("Successfully updated proc entry ") + } + + } else { + // non-Gateway/Worker Node + vtepPort, _ := strconv.Atoi(VXLAN_PORT) + attrs := &vxLanAttributes{ + name: VXLAN_IFACE, + vxlanId: 100, + group: r.gatewayNodeIP, + srcAddr: vtepIP, + vtepPort: vtepPort, + mtu: 1450, + } + + r.vxlanDevice, err = newVxlanIface(attrs) + if err != nil { + klog.Fatalf("Failed to create vxlan interface on non-Gateway Node: %v", err) + } + } + + err = r.vxlanDevice.configureIPAddress(vtepIP, vtepMask.Mask) + if err != nil { + klog.Fatalf("Failed to configure vxlan interface ipaddress on the Gateway Node %v", err) + } + + return nil +} + +func (r *Controller) processNextPod() bool { + pod, shutdown := r.podWorkqueue.Get() + if shutdown { + return false + } + err := func() error { + defer r.podWorkqueue.Done(pod) + 
pod := pod.(*k8sv1.Pod) + + klog.V(4).Infof("In processNextPod, POD HostIP is %s", pod.Status.HostIP) + r.populateRemoteVtepIps(pod.Status.PodIP) + + // A new Node (identified via a Submariner-route-agent daemonset pod event) is added to the cluster + // On the GatewayDevice, update the vxlan fdb entry (i.e., remote Vtep) for the newly added node. + if r.isGatewayNode { + ret := r.vxlanDevice.AddFDB(net.ParseIP(pod.Status.PodIP), "00:00:00:00:00:00") + if ret != nil { + klog.Errorf("Failed to add FDB entry on the Gateway Node vxlan iface %v", ret) + } + } + + r.podWorkqueue.Forget(pod) + klog.V(4).Infof("Pod event processed by route controller") + return nil + }() + + if err != nil { + utilruntime.HandleError(err) + return true + } + + return true +} + func (r *Controller) processNextEndpoint() bool { obj, shutdown := r.endpointWorkqueue.Get() if shutdown { @@ -195,18 +402,32 @@ func (r *Controller) processNextEndpoint() bool { klog.Fatalf("unable to determine hostname: %v", err) } + klog.V(6).Infof("Local Cluster Gateway Node IP is %s", endpoint.Spec.PrivateIP) + r.gatewayNodeIP = net.ParseIP(endpoint.Spec.PrivateIP) + + ipSlice := strings.Split(r.gatewayNodeIP.String(), ".") + ipSlice[0] = VXLAN_VTEP_NETWORK_PREFIX + // remoteVtepIP is used while programming the routing rules + remoteVtepIP := net.ParseIP(strings.Join(ipSlice, ".")) + r.vxlanGw = remoteVtepIP + if endpoint.Spec.Hostname == hostname { r.cleanRoutes() + r.isGatewayNode = true + if r.createVxLANInterface(true) != nil { + klog.Fatalf("Unable to create VxLAN interface on GatewayNode (%s): %v", hostname, err) + } klog.V(6).Infof("not reconciling routes because we appear to be the gateway host") return nil } - klog.V(6).Infof("Setting gateway to gw: %s", endpoint.Spec.PrivateIP) + r.isGatewayNode = false + if r.createVxLANInterface(false) != nil { + klog.Fatalf("Unable to create VxLAN interface on non-GatewayNode (%s): %v", endpoint.Spec.Hostname, err) + } - r.gw = net.ParseIP(endpoint.Spec.PrivateIP) 
r.cleanXfrmPolicies() err = r.reconcileRoutes() - if err != nil { r.endpointWorkqueue.AddRateLimited(obj) return fmt.Errorf("Error while reconciling routes %v", err) @@ -232,7 +453,6 @@ func (r *Controller) enqueueCluster(obj interface{}) { utilruntime.HandleError(err) return } - klog.V(4).Infof("Enqueueing cluster for route controller %v", obj) r.clusterWorkqueue.AddRateLimited(key) } @@ -243,10 +463,19 @@ func (r *Controller) enqueueEndpoint(obj interface{}) { utilruntime.HandleError(err) return } - klog.V(4).Infof("Enqueueing endpoint for route controller %v", obj) r.endpointWorkqueue.AddRateLimited(key) } +func (r *Controller) enqueuePod(obj interface{}) { + klog.V(6).Infof("Enqueueing pod for route controller %v", obj) + pod := obj.(*k8sv1.Pod) + + // Add the POD event to the workqueue only if the sm-route-agent podIP does not exist in the local cache. + if !containsString(r.remoteVTEPs, pod.Status.HostIP) { + r.podWorkqueue.AddRateLimited(obj) + } +} + func (r *Controller) handleRemovedEndpoint(obj interface{}) { // ideally we should attempt to remove all routes if the endpoint matches our cluster ID var object *v1.Endpoint @@ -280,15 +509,30 @@ func (r *Controller) handleRemovedCluster(obj interface{}) { // ideally we should attempt to remove all routes if the endpoint matches our cluster ID } +func (r *Controller) handleRemovedPod(obj interface{}) { + klog.V(6).Infof("Removing podIP in route controller %v", obj) + pod := obj.(*k8sv1.Pod) + + if containsString(r.remoteVTEPs, pod.Status.HostIP) { + r.deleteRemoteVtepIp(pod.Status.HostIP) + if r.isGatewayNode { + ret := r.vxlanDevice.DelFDB(net.ParseIP(pod.Status.PodIP), "00:00:00:00:00:00") + if ret != nil { + klog.Errorf("Failed to delete FDB entry on the Gateway Node vxlan iface %v", ret) + } + } + } +} + func (r *Controller) cleanRoutes() { - link, err := netlink.LinkByName(r.link.Name) + link, err := netlink.LinkByName(VXLAN_IFACE) if err != nil { - klog.Errorf("Error retrieving link by name %s: %v", 
r.link.Name, err) + klog.Errorf("Error retrieving link by name %s: %v", VXLAN_IFACE, err) return } currentRouteList, err := netlink.RouteList(link, syscall.AF_INET) if err != nil { - klog.Errorf("Error retrieving routes: %v", err) + klog.Errorf("Error retrieving routes on the link %s: %v", VXLAN_IFACE, err) return } for _, route := range currentRouteList { @@ -296,7 +540,7 @@ func (r *Controller) cleanRoutes() { if route.Dst == nil || route.Gw == nil { klog.V(6).Infof("Found nil gw or dst") } else { - if containsString(r.subnets, route.Dst.String()) { + if containsString(r.remoteSubnets, route.Dst.String()) { klog.V(6).Infof("Removing route %s", route.String()) if err = netlink.RouteDel(&route); err != nil { klog.Errorf("Error removing route %s: %v", route.String(), err) @@ -325,15 +569,15 @@ func (r *Controller) cleanXfrmPolicies() { // Reconcile the routes installed on this device using rtnetlink func (r *Controller) reconcileRoutes() error { - link, err := netlink.LinkByName(r.link.Name) + link, err := netlink.LinkByName(VXLAN_IFACE) if err != nil { - return fmt.Errorf("Error retrieving link by name %s: %v", r.link.Name, err) + return fmt.Errorf("Error retrieving link by name %s: %v", VXLAN_IFACE, err) } currentRouteList, err := netlink.RouteList(link, syscall.AF_INET) if err != nil { - return fmt.Errorf("Error retrieving routes for link %s: %v", r.link.Name, err) + return fmt.Errorf("Error retrieving routes for link %s: %v", VXLAN_IFACE, err) } // First lets delete all of the routes that don't match @@ -343,7 +587,7 @@ func (r *Controller) reconcileRoutes() error { if route.Dst == nil || route.Gw == nil { klog.V(6).Infof("Found nil gw or dst") } else { - if containsString(r.subnets, route.Dst.String()) && route.Gw.Equal(r.gw) { + if containsString(r.remoteSubnets, route.Dst.String()) && route.Gw.Equal(r.vxlanGw) { klog.V(6).Infof("Found route %s with gw %s already installed", route.String(), route.Gw.String()) } else { klog.V(6).Infof("Removing route %s", 
route.String()) @@ -357,11 +601,11 @@ func (r *Controller) reconcileRoutes() error { currentRouteList, err = netlink.RouteList(link, syscall.AF_INET) if err != nil { - return fmt.Errorf("Error retrieving routes for link %s: %v", r.link.Name, err) + return fmt.Errorf("Error retrieving routes for link %s: %v", VXLAN_IFACE, err) } // let's now add the routes that are missing - for _, cidrBlock := range r.subnets { + for _, cidrBlock := range r.remoteSubnets { _, dst, err := net.ParseCIDR(cidrBlock) if err != nil { klog.Errorf("Error parsing cidr block %s: %v", cidrBlock, err) @@ -369,15 +613,17 @@ func (r *Controller) reconcileRoutes() error { } route := netlink.Route{ Dst: dst, - Gw: r.gw, + Gw: r.vxlanGw, + Scope: unix.RT_SCOPE_UNIVERSE, LinkIndex: link.Attrs().Index, + Protocol: 4, } found := false for _, curRoute := range currentRouteList { if curRoute.Gw == nil || curRoute.Dst == nil { } else { - if curRoute.Gw.Equal(route.Gw) && curRoute.Dst == route.Dst { + if curRoute.Gw.Equal(route.Gw) && curRoute.Dst.String() == route.Dst.String() { klog.V(6).Infof("Found equivalent route, not adding") found = true } diff --git a/pkg/routeagent/controllers/route/route_test.go b/pkg/routeagent/controllers/route/route_test.go index edde3256c..1a7560fdf 100644 --- a/pkg/routeagent/controllers/route/route_test.go +++ b/pkg/routeagent/controllers/route/route_test.go @@ -11,18 +11,18 @@ var _ = Describe("Route", func() { Describe("Function populateCidrBlockList", func() { Context("When input CIDR blocks are not present in the existing subnets", func() { It("Should append the CIDR blocks to subnets", func() { - routeController := Controller{subnets: []string{"192.168.1.0/24"}} - routeController.populateCidrBlockList([]string{"10.10.10.0/24", "192.168.1.0/24"}) + routeController := Controller{remoteSubnets: []string{"192.168.1.0/24"}} + routeController.updateIptableRulesForInterclusterTraffic([]string{"10.10.10.0/24", "192.168.1.0/24"}) want := []string{"192.168.1.0/24", 
"10.10.10.0/24"} - Expect(routeController.subnets).To(Equal(want)) + Expect(routeController.remoteSubnets).To(Equal(want)) }) }) Context("When input CIDR blocks are present in the existing subnets", func() { It("Should not append the CIDR blocks to subnets", func() { - routeController := Controller{subnets: []string{"10.10.10.0/24"}} - routeController.populateCidrBlockList([]string{"10.10.10.0/24", "192.168.1.0/24"}) + routeController := Controller{remoteSubnets: []string{"10.10.10.0/24"}} + routeController.updateIptableRulesForInterclusterTraffic([]string{"10.10.10.0/24", "192.168.1.0/24"}) want := []string{"10.10.10.0/24", "192.168.1.0/24"} - Expect(routeController.subnets).To(Equal(want)) + Expect(routeController.remoteSubnets).To(Equal(want)) }) }) }) diff --git a/pkg/routeagent/controllers/route/vxlan.go b/pkg/routeagent/controllers/route/vxlan.go new file mode 100644 index 000000000..37f9c322c --- /dev/null +++ b/pkg/routeagent/controllers/route/vxlan.go @@ -0,0 +1,178 @@ +package route + +import ( + "net" + "syscall" + + "github.com/vishvananda/netlink" + "golang.org/x/sys/unix" + "k8s.io/klog" +) + +type vxLanAttributes struct { + name string + vxlanId int + group net.IP + srcAddr net.IP + vtepPort int + mtu int +} + +type vxLanIface struct { + link *netlink.Vxlan +} + +func newVxlanIface(attrs *vxLanAttributes) (*vxLanIface, error) { + iface := &netlink.Vxlan{ + LinkAttrs: netlink.LinkAttrs{ + Name: attrs.name, + MTU: attrs.mtu, + Flags: net.FlagUp, + }, + VxlanId: attrs.vxlanId, + SrcAddr: attrs.srcAddr, + Group: attrs.group, + Port: attrs.vtepPort, + } + + vxLANIface := &vxLanIface{ + link: iface, + } + + if err := createVxLanIface(vxLANIface); err != nil { + return nil, err + } + + return vxLANIface, nil +} + +func createVxLanIface(iface *vxLanIface) error { + err := netlink.LinkAdd(iface.link) + if err == syscall.EEXIST { + // Get the properties of existing vxlan interface + existing, err := netlink.LinkByName(iface.link.Name) + if err != nil { + 
return err + } + + if isVxlanConfigTheSame(iface.link, existing) { + klog.V(4).Infof("VxLAN interface already exists with same configuration.") + iface.link = existing.(*netlink.Vxlan) + return nil + } + + // Config does not match, delete the existing interface and re-create it. + if err = netlink.LinkDel(existing); err != nil { + klog.V(4).Infof("Failed to delete the existing vxlan interface: %v", err) + return err + } + + if err = netlink.LinkAdd(iface.link); err != nil { + klog.V(4).Infof("Failed to re-create the the vxlan interface: %v", err) + return err + } + } else if err != nil { + klog.V(4).Infof("Failed to create the the vxlan interface: %v", err) + return err + } + + return nil +} + +func isVxlanConfigTheSame(new, current netlink.Link) bool { + + required := new.(*netlink.Vxlan) + existing := current.(*netlink.Vxlan) + + if required.VxlanId != existing.VxlanId { + klog.V(4).Infof("VxlanId of existing interface (%d) does not match with required VxlanId (%d)", existing.VxlanId, required.VxlanId) + return false + } + + if len(required.Group) > 0 && len(existing.Group) > 0 && !required.Group.Equal(existing.Group) { + klog.V(4).Infof("Vxlan Group of existing interface (%v) does not match with required Group (%v)", existing.Group, required.Group) + return false + } + + if len(required.SrcAddr) > 0 && len(existing.SrcAddr) > 0 && !required.SrcAddr.Equal(existing.SrcAddr) { + klog.V(4).Infof("Vxlan SrcAddr of existing interface (%v) does not match with required SrcAddr (%v)", existing.SrcAddr, required.SrcAddr) + return false + } + + if required.Port > 0 && existing.Port > 0 && required.Port != existing.Port { + klog.V(4).Infof("Vxlan Port of existing interface (%d) does not match with required Port (%d)", existing.Port, required.Port) + return false + } + + return true +} + +func (iface *vxLanIface) configureIPAddress(ipAddress net.IP, mask net.IPMask) error { + ipConfig := &netlink.Addr{IPNet: &net.IPNet{ + IP: ipAddress, + Mask: mask, + }} + + err := 
netlink.AddrAdd(iface.link, ipConfig) + if err == syscall.EEXIST { + return nil + } else if err != nil { + klog.Errorf("Unable to configure address (%s) on vxlan interface (%s). %v", ipAddress, iface.link.Name, err) + return err + } + return nil +} + +func (iface *vxLanIface) AddFDB(ipAddress net.IP, hwAddr string) error { + macAddr, err := net.ParseMAC(hwAddr) + if err != nil { + klog.Errorf("Invalid MAC Address (%s) supplied. %v", hwAddr, err) + return err + } + + neigh := &netlink.Neigh{ + LinkIndex: iface.link.Index, + Family: unix.AF_BRIDGE, + Flags: netlink.NTF_SELF, + Type: netlink.NDA_DST, + IP: ipAddress, + State: netlink.NUD_PERMANENT | netlink.NUD_NOARP, + HardwareAddr: macAddr, + } + + err = netlink.NeighAppend(neigh) + if err != nil { + klog.Errorf("Unable to add the bridge fdb entry %v, err: %s", neigh, err) + return err + } else { + klog.V(4).Infof("Successfully added the bridge fdb entry %v", neigh) + } + return nil +} + +func (iface *vxLanIface) DelFDB(ipAddress net.IP, hwAddr string) error { + macAddr, err := net.ParseMAC(hwAddr) + if err != nil { + klog.Errorf("Invalid MAC Address (%s) supplied. 
%v", hwAddr, err) + return err + } + + neigh := &netlink.Neigh{ + LinkIndex: iface.link.Index, + Family: unix.AF_BRIDGE, + Flags: netlink.NTF_SELF, + Type: netlink.NDA_DST, + IP: ipAddress, + State: netlink.NUD_PERMANENT | netlink.NUD_NOARP, + HardwareAddr: macAddr, + } + + err = netlink.NeighDel(neigh) + if err != nil { + klog.Errorf("Unable to delete the bridge fdb entry %v, err: %s", neigh, err) + return err + } else { + klog.V(4).Infof("Successfully deleted the bridge fdb entry %v", neigh) + } + return nil +} diff --git a/pkg/routeagent/main.go b/pkg/routeagent/main.go index 298241083..fedef88d6 100644 --- a/pkg/routeagent/main.go +++ b/pkg/routeagent/main.go @@ -6,6 +6,10 @@ import ( "time" "github.com/kelseyhightower/envconfig" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "github.com/submariner-io/submariner/pkg/routeagent/controllers/route" "github.com/submariner-io/submariner/pkg/util" @@ -23,8 +27,14 @@ var ( ) type SubmarinerRouteControllerSpecification struct { - ClusterID string - Namespace string + ClusterID string + Namespace string + ClusterCidr []string + ServiceCidr []string +} + +func filterRouteAgentPods(options *v1.ListOptions) { + options.LabelSelector = route.SM_ROUTE_AGENT_FILTER } func main() { @@ -53,10 +63,19 @@ func main() { submarinerInformerFactory := submarinerInformers.NewSharedInformerFactoryWithOptions(submarinerClient, time.Second*30, submarinerInformers.WithNamespace(srcs.Namespace)) + clientSet, err := kubernetes.NewForConfig(cfg) + if err != nil { + klog.Errorf("Error building clientset: %s", err.Error()) + return + } + + informerFactory := informers.NewSharedInformerFactoryWithOptions(clientSet, time.Second*60, informers.WithNamespace(srcs.Namespace), informers.WithTweakListOptions(filterRouteAgentPods)) + defLink, err := util.GetDefaultGatewayInterface() - routeController := route.NewController(srcs.ClusterID, srcs.Namespace, defLink, submarinerClient, 
submarinerInformerFactory.Submariner().V1().Clusters(), submarinerInformerFactory.Submariner().V1().Endpoints()) + routeController := route.NewController(srcs.ClusterID, srcs.ClusterCidr, srcs.ServiceCidr, srcs.Namespace, defLink, submarinerClient, clientSet, submarinerInformerFactory.Submariner().V1().Clusters(), submarinerInformerFactory.Submariner().V1().Endpoints(), informerFactory.Core().V1().Pods()) submarinerInformerFactory.Start(stopCh) + informerFactory.Start(stopCh) var wg sync.WaitGroup diff --git a/test/e2e/framework/network_pods.go b/test/e2e/framework/network_pods.go index e9aa9cf9c..bb07145fb 100644 --- a/test/e2e/framework/network_pods.go +++ b/test/e2e/framework/network_pods.go @@ -70,7 +70,7 @@ func (f *Framework) CreateTCPCheckConnectorPod(cluster int, remoteCheckPod *v1.P Image: "busybox", // We send the string 50 times to put more pressure on the TCP connection and avoid limited // resource environments from not sending at least some data before timeout. - Command: []string{"sh", "-c", "for in in $(seq 50); do echo connector says $SEND_STRING; done | nc -v $REMOTE_IP $REMOTE_PORT -w 5 >/dev/termination-log 2>&1"}, + Command: []string{"sh", "-c", "for in in $(seq 50); do echo connector says $SEND_STRING; done | nc -v $REMOTE_IP $REMOTE_PORT -w 8 >/dev/termination-log 2>&1"}, Env: []v1.EnvVar{ {Name: "REMOTE_PORT", Value: strconv.Itoa(TestPort)}, {Name: "SEND_STRING", Value: sendString}, From 28f1b1d95bc1cd0651cbf01259f1ecd7705f6ed0 Mon Sep 17 00:00:00 2001 From: Sridhar Gaddam Date: Wed, 4 Sep 2019 01:21:26 +0530 Subject: [PATCH 2/9] Use VxLAN overlay tunnels for inter-cluster traffic As part of supporting Network policies and for ease of debugging, this patch implements the following. 1. Creates VxLAN tunnels in the local Cluster between the worker nodes and the Cluster Gateway Node. 2. Programs the necessary iptable rules on the Cluster nodes to allow inter-cluster traffic. 3.
This patch also avoids SNAT/MASQ for inter-cluster traffic, thereby preserving the original source ip of the POD all the way until the destination POD. 4. Programs the routing rules on the workerNodes to forward the remoteCluster traffic over the VxLAN interface that is created between the worker node and Cluster GatewayNode. This patch depends on the following other patches Depends-On: https://github.com/submariner-io/submariner/pull/135 Depends-On: https://github.com/submariner-io/submariner-charts/pull/3 Depends-On: https://github.com/submariner-io/submariner-charts/pull/4 --- pkg/routeagent/controllers/route/iptables.go | 23 ++-- pkg/routeagent/controllers/route/route.go | 126 ++++++++---------- .../controllers/route/route_test.go | 40 ++---- pkg/routeagent/controllers/route/stringset.go | 28 ++++ .../controllers/route/stringset_test.go | 55 ++++++++ pkg/routeagent/main.go | 2 +- 6 files changed, 164 insertions(+), 110 deletions(-) create mode 100644 pkg/routeagent/controllers/route/stringset.go create mode 100644 pkg/routeagent/controllers/route/stringset_test.go diff --git a/pkg/routeagent/controllers/route/iptables.go b/pkg/routeagent/controllers/route/iptables.go index 9255cf906..efc84041c 100644 --- a/pkg/routeagent/controllers/route/iptables.go +++ b/pkg/routeagent/controllers/route/iptables.go @@ -2,6 +2,7 @@ package route import ( "fmt" + "strconv" "strings" "github.com/coreos/go-iptables/iptables" @@ -15,13 +16,13 @@ func (r *Controller) createIPTableChains() error { return err } - klog.V(4).Infof("Install/ensure %s chain exists", SM_POSTROUTING_CHAIN) - if err = ipt.NewChain("nat", SM_POSTROUTING_CHAIN); err != nil { - klog.Errorf("Unable to create %s chain in iptables: %v", SM_POSTROUTING_CHAIN, err) + klog.V(4).Infof("Install/ensure %s chain exists", SmPostRoutingChain) + if err = ipt.NewChain("nat", SmPostRoutingChain); err != nil { + klog.Errorf("Unable to create %s chain in iptables: %v", SmPostRoutingChain, err) } - klog.V(4).Infof("Insert %s 
rule that has rules for inter-cluster traffic", SM_POSTROUTING_CHAIN) - forwardToSubPostroutingRuleSpec := []string{"-j", SM_POSTROUTING_CHAIN} + klog.V(4).Infof("Insert %s rule that has rules for inter-cluster traffic", SmPostRoutingChain) + forwardToSubPostroutingRuleSpec := []string{"-j", SmPostRoutingChain} if err = r.insertUnique(ipt, "nat", "POSTROUTING", 1, forwardToSubPostroutingRuleSpec); err != nil { klog.Errorf("Unable to insert iptable rule in NAT table, POSTROUTING chain: %v", err) } @@ -37,13 +38,13 @@ func (r *Controller) createIPTableChains() error { } klog.V(4).Infof("Allow VxLAN incoming traffic in SUBMARINER-INPUT Chain") - ruleSpec := []string{"-p", "udp", "-m", "udp", "--dport", VXLAN_PORT, "-j", "ACCEPT"} + ruleSpec := []string{"-p", "udp", "-m", "udp", "--dport", strconv.Itoa(VxLANPort), "-j", "ACCEPT"} if err = ipt.AppendUnique("filter", "SUBMARINER-INPUT", ruleSpec...); err != nil { klog.Errorf("Unable to append iptables rule \"%s\": %v\n", strings.Join(ruleSpec, " "), err) } - klog.V(4).Infof("Insert rule to allow traffic over %s interface in FORWARDing Chain", VXLAN_IFACE) - ruleSpec = []string{"-o", VXLAN_IFACE, "-j", "ACCEPT"} + klog.V(4).Infof("Insert rule to allow traffic over %s interface in FORWARDing Chain", VxLANIface) + ruleSpec = []string{"-o", VxLANIface, "-j", "ACCEPT"} if err = r.insertUnique(ipt, "filter", "FORWARD", 1, ruleSpec); err != nil { klog.Errorf("Unable to insert iptable rule in filter table to allow vxlan traffic: %v", err) } @@ -60,14 +61,14 @@ func (r *Controller) programIptableRulesForInterClusterTraffic(remoteCidrBlock s for _, localClusterCidr := range r.localClusterCidr { ruleSpec := []string{"-s", localClusterCidr, "-d", remoteCidrBlock, "-j", "ACCEPT"} klog.V(4).Infof("Installing iptables rule for outgoing traffic: %s", strings.Join(ruleSpec, " ")) - if err = ipt.AppendUnique("nat", SM_POSTROUTING_CHAIN, ruleSpec...); err != nil { + if err = ipt.AppendUnique("nat", SmPostRoutingChain, ruleSpec...); err != 
nil { klog.Errorf("error appending iptables rule \"%s\": %v\n", strings.Join(ruleSpec, " "), err) } - // Todo: revisit we only have to program to the PODCidr + // Todo: revisit, we only have to program rules to allow traffic from the podCidr ruleSpec = []string{"-s", remoteCidrBlock, "-d", localClusterCidr, "-j", "ACCEPT"} klog.V(4).Infof("Installing iptables rule for incoming traffic: %s", strings.Join(ruleSpec, " ")) - if err = ipt.AppendUnique("nat", SM_POSTROUTING_CHAIN, ruleSpec...); err != nil { + if err = ipt.AppendUnique("nat", SmPostRoutingChain, ruleSpec...); err != nil { klog.Errorf("error appending iptables rule \"%s\": %v\n", strings.Join(ruleSpec, " "), err) } } diff --git a/pkg/routeagent/controllers/route/route.go b/pkg/routeagent/controllers/route/route.go index 859a54c4c..537c00b28 100644 --- a/pkg/routeagent/controllers/route/route.go +++ b/pkg/routeagent/controllers/route/route.go @@ -44,21 +44,23 @@ type Controller struct { gatewayNodeIP net.IP localClusterCidr []string localServiceCidr []string - remoteSubnets []string + remoteSubnets *StringSet vxlanDevice *vxLanIface vxlanGw net.IP - remoteVTEPs []string + remoteVTEPs *StringSet isGatewayNode bool link *net.Interface } -const VXLAN_IFACE = "vxlan100" -const VXLAN_PORT = "4800" -const VXLAN_VTEP_NETWORK_PREFIX = "240" -const SM_POSTROUTING_CHAIN = "SUBMARINER-POSTROUTING" -const SM_ROUTE_AGENT_FILTER = "app=submariner-routeagent" +const ( + VxLANIface = "vxlan100" + VxLANPort = 4800 + VxLANVTepNetworkPrefix = 240 + SmPostRoutingChain = "SUBMARINER-POSTROUTING" + SmRouteAgentFilter = "app=submariner-routeagent" +) func NewController(clusterID string, ClusterCidr []string, ServiceCidr []string, objectNamespace string, link *net.Interface, submarinerClientSet clientset.Interface, clientSet *kubernetes.Clientset, clusterInformer informers.ClusterInformer, endpointInformer informers.EndpointInformer, podInformer podinformer.PodInformer) *Controller { controller := Controller{ @@ -70,6 +72,8 @@ 
func NewController(clusterID string, ClusterCidr []string, ServiceCidr []string, clientSet: clientSet, link: link, isGatewayNode: false, + remoteSubnets: NewStringSet(), + remoteVTEPs: NewStringSet(), clustersSynced: clusterInformer.Informer().HasSynced, endpointsSynced: endpointInformer.Informer().HasSynced, smRouteAgentPodsSynced: podInformer.Informer().HasSynced, @@ -139,7 +143,7 @@ func (r *Controller) Run(stopCh <-chan struct{}) error { } // Query all the submariner-route-agent daemonSet PODs running in the local cluster. - podList, err := r.clientSet.CoreV1().Pods(r.objectNamespace).List(metav1.ListOptions{LabelSelector: SM_ROUTE_AGENT_FILTER}) + podList, err := r.clientSet.CoreV1().Pods(r.objectNamespace).List(metav1.ListOptions{LabelSelector: SmRouteAgentFilter}) if err != nil { klog.Fatalf("error while retrieving all submariner-route-agent pods: %v", err) } @@ -178,35 +182,20 @@ func (r *Controller) runPodWorker() { } func (r *Controller) updateIptableRulesForInterclusterTraffic(inputCidrBlocks []string) { - for _, remoteCidrBlock := range inputCidrBlocks { - if !containsString(r.remoteSubnets, remoteCidrBlock) { - r.remoteSubnets = append(r.remoteSubnets, remoteCidrBlock) - r.programIptableRulesForInterClusterTraffic(remoteCidrBlock) + for _, inputCidrBlock := range inputCidrBlocks { + if !r.remoteSubnets.Contains(inputCidrBlock) { + r.remoteSubnets.Add(inputCidrBlock) + r.programIptableRulesForInterClusterTraffic(inputCidrBlock) } } } func (r *Controller) populateRemoteVtepIps(vtepIP string) { - if !containsString(r.remoteVTEPs, vtepIP) { - r.remoteVTEPs = append(r.remoteVTEPs, vtepIP) + if !r.remoteVTEPs.Contains(vtepIP) { + r.remoteVTEPs.Add(vtepIP) } } -func (r *Controller) deleteRemoteVtepIp(vtepIP string) { - if containsString(r.remoteVTEPs, vtepIP) { - r.remoteVTEPs = r.deleteVepEntry(r.remoteVTEPs, vtepIP) - } -} - -func (r *Controller) deleteVepEntry(vtepList []string, entryToDelete string) []string { - for i, v := range vtepList { - if v == 
entryToDelete { - return append(vtepList[:i], vtepList[i+1:]...) - } - } - return vtepList -} - func (r *Controller) processNextCluster() bool { obj, shutdown := r.clusterWorkqueue.Get() if shutdown { @@ -246,17 +235,15 @@ func (r *Controller) processNextCluster() bool { return true } -func (r *Controller) getVxlanVtepIPAddress() (net.IP, *net.IPNet, error) { - ipAddr, ipNetwork, err := r.getHostIfaceIPAddress() - if err != nil { - klog.Errorf("Unable to retrieve the IPv4 address on the Host %v", err) - return nil, nil, err +func (r *Controller) getVxlanVtepIPAddress(ipAddr string) (net.IP, error) { + ipSlice := strings.Split(ipAddr, ".") + if len(ipSlice) < 4 { + return nil, fmt.Errorf("Invalid ipAddr [%s]", ipAddr) } - ipSlice := strings.Split(ipAddr.String(), ".") - ipSlice[0] = VXLAN_VTEP_NETWORK_PREFIX + ipSlice[0] = strconv.Itoa(VxLANVTepNetworkPrefix) vxlanIP := net.ParseIP(strings.Join(ipSlice, ".")) - return vxlanIP, ipNetwork, nil + return vxlanIP, nil } func (r *Controller) getHostIfaceIPAddress() (net.IP, *net.IPNet, error) { @@ -276,19 +263,25 @@ func (r *Controller) getHostIfaceIPAddress() (net.IP, *net.IPNet, error) { } func (r *Controller) createVxLANInterface(isGatewayDevice bool) error { - vtepIP, vtepMask, err := r.getVxlanVtepIPAddress() + ipAddr, vtepMask, err := r.getHostIfaceIPAddress() + if err != nil { + klog.Errorf("Unable to retrieve the IPv4 address on the Host %v", err) + return err + } + + vtepIP, err := r.getVxlanVtepIPAddress(ipAddr.String()) if err != nil { - klog.Fatalf("Failed to derive the vxlan vtepIP on the Gateway Node %v", err) + klog.Errorf("Failed to derive the vxlan vtepIP %v", err) + return err } if isGatewayDevice { - vtepPort, _ := strconv.Atoi(VXLAN_PORT) attrs := &vxLanAttributes{ - name: VXLAN_IFACE, + name: VxLANIface, vxlanId: 100, group: nil, srcAddr: nil, - vtepPort: vtepPort, + vtepPort: VxLANPort, mtu: 1450, } @@ -297,7 +290,7 @@ func (r *Controller) createVxLANInterface(isGatewayDevice bool) error { 
klog.Fatalf("Failed to create vxlan interface on Gateway Node: %v", err) } - for _, fdbAddress := range r.remoteVTEPs { + for fdbAddress, _ := range r.remoteVTEPs.set { err = r.vxlanDevice.AddFDB(net.ParseIP(fdbAddress), "00:00:00:00:00:00") if err != nil { klog.Fatalf("Failed to add FDB entry on the Gateway Node vxlan iface %v", err) @@ -305,22 +298,22 @@ func (r *Controller) createVxLANInterface(isGatewayDevice bool) error { } // Enable loose mode (rp_filter=2) reverse path filtering on the vxlan interface. - err = ioutil.WriteFile("/proc/sys/net/ipv4/conf/"+VXLAN_IFACE+"/rp_filter", []byte("2"), 0644) + err = ioutil.WriteFile("/proc/sys/net/ipv4/conf/"+VxLANIface+"/rp_filter", []byte("2"), 0644) if err != nil { klog.Errorf("Unable to update proc entry, err: %s", err) + return err } else { klog.Errorf("Successfully updated proc entry ") } } else { // non-Gateway/Worker Node - vtepPort, _ := strconv.Atoi(VXLAN_PORT) attrs := &vxLanAttributes{ - name: VXLAN_IFACE, + name: VxLANIface, vxlanId: 100, group: r.gatewayNodeIP, srcAddr: vtepIP, - vtepPort: vtepPort, + vtepPort: VxLANPort, mtu: 1450, } @@ -350,7 +343,7 @@ func (r *Controller) processNextPod() bool { klog.V(4).Infof("In processNextPod, POD HostIP is %s", pod.Status.HostIP) r.populateRemoteVtepIps(pod.Status.PodIP) - // A new Node (identified via a Submariner-route-agent daemonset pod event) is added to the cluster + // A new Node (identified via a Submariner-route-agent daemonset pod event) is added to the cluster. // On the GatewayDevice, update the vxlan fdb entry (i.e., remote Vtep) for the newly added node. 
if r.isGatewayNode { ret := r.vxlanDevice.AddFDB(net.ParseIP(pod.Status.PodIP), "00:00:00:00:00:00") @@ -406,7 +399,7 @@ func (r *Controller) processNextEndpoint() bool { r.gatewayNodeIP = net.ParseIP(endpoint.Spec.PrivateIP) ipSlice := strings.Split(r.gatewayNodeIP.String(), ".") - ipSlice[0] = VXLAN_VTEP_NETWORK_PREFIX + ipSlice[0] = strconv.Itoa(VxLANVTepNetworkPrefix) // remoteVtepIP is used while programming the routing rules remoteVtepIP := net.ParseIP(strings.Join(ipSlice, ".")) r.vxlanGw = remoteVtepIP @@ -471,7 +464,7 @@ func (r *Controller) enqueuePod(obj interface{}) { pod := obj.(*k8sv1.Pod) // Add the POD event to the workqueue only if the sm-route-agent podIP does not exist in the local cache. - if !containsString(r.remoteVTEPs, pod.Status.HostIP) { + if !r.remoteVTEPs.Contains(pod.Status.HostIP) { r.podWorkqueue.AddRateLimited(obj) } } @@ -513,8 +506,8 @@ func (r *Controller) handleRemovedPod(obj interface{}) { klog.V(6).Infof("Removing podIP in route controller %v", obj) pod := obj.(*k8sv1.Pod) - if containsString(r.remoteVTEPs, pod.Status.HostIP) { - r.deleteRemoteVtepIp(pod.Status.HostIP) + if r.remoteVTEPs.Contains(pod.Status.HostIP) { + r.remoteVTEPs.Delete(pod.Status.HostIP) if r.isGatewayNode { ret := r.vxlanDevice.DelFDB(net.ParseIP(pod.Status.PodIP), "00:00:00:00:00:00") if ret != nil { @@ -525,14 +518,14 @@ func (r *Controller) handleRemovedPod(obj interface{}) { } func (r *Controller) cleanRoutes() { - link, err := netlink.LinkByName(VXLAN_IFACE) + link, err := netlink.LinkByName(VxLANIface) if err != nil { - klog.Errorf("Error retrieving link by name %s: %v", VXLAN_IFACE, err) + klog.Errorf("Error retrieving link by name %s: %v", VxLANIface, err) return } currentRouteList, err := netlink.RouteList(link, syscall.AF_INET) if err != nil { - klog.Errorf("Error retrieving routes on the link %s: %v", VXLAN_IFACE, err) + klog.Errorf("Error retrieving routes on the link %s: %v", VxLANIface, err) return } for _, route := range currentRouteList { 
@@ -540,7 +533,7 @@ func (r *Controller) cleanRoutes() { if route.Dst == nil || route.Gw == nil { klog.V(6).Infof("Found nil gw or dst") } else { - if containsString(r.remoteSubnets, route.Dst.String()) { + if r.remoteSubnets.Contains(route.Dst.String()) { klog.V(6).Infof("Removing route %s", route.String()) if err = netlink.RouteDel(&route); err != nil { klog.Errorf("Error removing route %s: %v", route.String(), err) @@ -569,15 +562,15 @@ func (r *Controller) cleanXfrmPolicies() { // Reconcile the routes installed on this device using rtnetlink func (r *Controller) reconcileRoutes() error { - link, err := netlink.LinkByName(VXLAN_IFACE) + link, err := netlink.LinkByName(VxLANIface) if err != nil { - return fmt.Errorf("Error retrieving link by name %s: %v", VXLAN_IFACE, err) + return fmt.Errorf("Error retrieving link by name %s: %v", VxLANIface, err) } currentRouteList, err := netlink.RouteList(link, syscall.AF_INET) if err != nil { - return fmt.Errorf("Error retrieving routes for link %s: %v", VXLAN_IFACE, err) + return fmt.Errorf("Error retrieving routes for link %s: %v", VxLANIface, err) } // First lets delete all of the routes that don't match @@ -587,7 +580,7 @@ func (r *Controller) reconcileRoutes() error { if route.Dst == nil || route.Gw == nil { klog.V(6).Infof("Found nil gw or dst") } else { - if containsString(r.remoteSubnets, route.Dst.String()) && route.Gw.Equal(r.vxlanGw) { + if r.remoteSubnets.Contains(route.Dst.String()) && route.Gw.Equal(r.vxlanGw) { klog.V(6).Infof("Found route %s with gw %s already installed", route.String(), route.Gw.String()) } else { klog.V(6).Infof("Removing route %s", route.String()) @@ -601,11 +594,11 @@ func (r *Controller) reconcileRoutes() error { currentRouteList, err = netlink.RouteList(link, syscall.AF_INET) if err != nil { - return fmt.Errorf("Error retrieving routes for link %s: %v", VXLAN_IFACE, err) + return fmt.Errorf("Error retrieving routes for link %s: %v", VxLANIface, err) } // let's now add the routes that 
are missing - for _, cidrBlock := range r.remoteSubnets { + for cidrBlock, _ := range r.remoteSubnets.set { _, dst, err := net.ParseCIDR(cidrBlock) if err != nil { klog.Errorf("Error parsing cidr block %s: %v", cidrBlock, err) @@ -639,12 +632,3 @@ func (r *Controller) reconcileRoutes() error { } return nil } - -func containsString(c []string, s string) bool { - for _, v := range c { - if v == s { - return true - } - } - return false -} diff --git a/pkg/routeagent/controllers/route/route_test.go b/pkg/routeagent/controllers/route/route_test.go index 1a7560fdf..ca1c4d810 100644 --- a/pkg/routeagent/controllers/route/route_test.go +++ b/pkg/routeagent/controllers/route/route_test.go @@ -1,44 +1,30 @@ package route import ( + "strconv" "testing" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" ) -var _ = Describe("Route", func() { - Describe("Function populateCidrBlockList", func() { - Context("When input CIDR blocks are not present in the existing subnets", func() { - It("Should append the CIDR blocks to subnets", func() { - routeController := Controller{remoteSubnets: []string{"192.168.1.0/24"}} - routeController.updateIptableRulesForInterclusterTraffic([]string{"10.10.10.0/24", "192.168.1.0/24"}) - want := []string{"192.168.1.0/24", "10.10.10.0/24"} - Expect(routeController.remoteSubnets).To(Equal(want)) +var _ = Describe("getVxlanVtepIPAddress", func() { + Describe("Unit tests for getVxlanVtepIPAddress", func() { + Context("When a valid ipaddress is provided to getVxlanVtepIPAddress", func() { + It("Should return the VxLAN VtepIP that can be configured", func() { + routeController := Controller{} + vtepIP, _ := routeController.getVxlanVtepIPAddress("192.168.100.24") + Expect(vtepIP.String()).Should(Equal(strconv.Itoa(VxLANVTepNetworkPrefix) + ".168.100.24")) }) }) - Context("When input CIDR blocks are present in the existing subnets", func() { - It("Should not append the CIDR blocks to subnets", func() { - routeController := Controller{remoteSubnets: 
[]string{"10.10.10.0/24"}} - routeController.updateIptableRulesForInterclusterTraffic([]string{"10.10.10.0/24", "192.168.1.0/24"}) - want := []string{"10.10.10.0/24", "192.168.1.0/24"} - Expect(routeController.remoteSubnets).To(Equal(want)) - }) - }) - }) - Describe("Function containsString", func() { - Context("When the given array of strings contains specified string", func() { - It("Should return true", func() { - Expect(containsString([]string{"unit", "test"}, "unit")).To(BeTrue()) + Context("When an invalid ipaddress is provided to getVxlanVtepIPAddress", func() { + It("Should return an error", func() { + routeController := Controller{} + _, err := routeController.getVxlanVtepIPAddress("10.0.0") + Expect(err).ShouldNot(Equal(nil)) }) }) - Context("When the given array of strings does not contain specified string", func() { - It("Should return false", func() { - Expect(containsString([]string{"unit", "test"}, "ginkgo")).To(BeFalse()) - }) - - }) }) }) diff --git a/pkg/routeagent/controllers/route/stringset.go b/pkg/routeagent/controllers/route/stringset.go new file mode 100644 index 000000000..42f69a3d7 --- /dev/null +++ b/pkg/routeagent/controllers/route/stringset.go @@ -0,0 +1,28 @@ +package route + +type StringSet struct { + set map[string]bool +} + +func NewStringSet() *StringSet { + return &StringSet{make(map[string]bool)} +} + +func (set *StringSet) Add(s string) bool { + _, found := set.set[s] + set.set[s] = true + return !found +} + +func (set *StringSet) Contains(s string) bool { + _, found := set.set[s] + return found +} + +func (set *StringSet) Size() int { + return len(set.set) +} + +func (set *StringSet) Delete(s string) { + delete(set.set, s) +} diff --git a/pkg/routeagent/controllers/route/stringset_test.go b/pkg/routeagent/controllers/route/stringset_test.go new file mode 100644 index 000000000..d003148a2 --- /dev/null +++ b/pkg/routeagent/controllers/route/stringset_test.go @@ -0,0 +1,55 @@ +package route + +import ( + "testing" + + . 
"github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var _ = Describe("StringSet", func() { + Describe("Unit tests for NewStringSet", func() { + Context("When subnetList contains a specified string", func() { + It("Should return true", func() { + subnetList := NewStringSet() + subnetList.Add("192.168.1.0/24") + subnetList.Add("192.168.2.0/24") + Expect(subnetList.Contains("192.168.2.0/24")).Should(Equal(true)) + }) + }) + Context("When subnetList does not contain a specified string", func() { + It("Should return false", func() { + subnetList := NewStringSet() + subnetList.Add("192.168.1.0/24") + subnetList.Add("192.168.2.0/24") + Expect(subnetList.Contains("192.168.3.0/24")).Should(Equal(false)) + }) + }) + Context("When subnetList already has an entry", func() { + It("Should not append to the list", func() { + subnetList := NewStringSet() + subnetList.Add("192.168.1.0/24") + subnetList.Add("192.168.2.0/24") + subnetList.Add("192.168.3.0/24") + subnetList.Add("192.168.2.0/24") + Expect(subnetList.Size()).Should(Equal(3)) + }) + }) + Context("When an entry is deleted from subnetList", func() { + It("Should be removed from the list", func() { + subnetList := NewStringSet() + subnetList.Add("192.168.1.0/24") + subnetList.Add("192.168.2.0/24") + subnetList.Add("192.168.3.0/24") + subnetList.Delete("192.168.2.0/24") + Expect(subnetList.Contains("192.168.2.0/24")).Should(Equal(false)) + Expect(subnetList.Size()).Should(Equal(2)) + }) + }) + }) +}) + +func TestStringSet(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "StringSet Suite") +} diff --git a/pkg/routeagent/main.go b/pkg/routeagent/main.go index fedef88d6..4457794db 100644 --- a/pkg/routeagent/main.go +++ b/pkg/routeagent/main.go @@ -34,7 +34,7 @@ type SubmarinerRouteControllerSpecification struct { } func filterRouteAgentPods(options *v1.ListOptions) { - options.LabelSelector = route.SM_ROUTE_AGENT_FILTER + options.LabelSelector = route.SmRouteAgentFilter } func main() { From 
eaeccced70c96ff4c9e3b43855a40d2f9603924e Mon Sep 17 00:00:00 2001 From: Sridhar Gaddam Date: Wed, 4 Sep 2019 12:47:14 +0530 Subject: [PATCH 3/9] Moved StringSet to util package --- pkg/routeagent/controllers/route/route.go | 13 +++++++------ .../controllers/route => util}/stringset.go | 14 +++++++------- .../controllers/route => util}/stringset_test.go | 2 +- 3 files changed, 15 insertions(+), 14 deletions(-) rename pkg/{routeagent/controllers/route => util}/stringset.go (68%) rename pkg/{routeagent/controllers/route => util}/stringset_test.go (99%) diff --git a/pkg/routeagent/controllers/route/route.go b/pkg/routeagent/controllers/route/route.go index 537c00b28..be4344852 100644 --- a/pkg/routeagent/controllers/route/route.go +++ b/pkg/routeagent/controllers/route/route.go @@ -14,6 +14,7 @@ import ( v1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1" clientset "github.com/submariner-io/submariner/pkg/client/clientset/versioned" informers "github.com/submariner-io/submariner/pkg/client/informers/externalversions/submariner.io/v1" + "github.com/submariner-io/submariner/pkg/util" "github.com/vishvananda/netlink" "golang.org/x/sys/unix" k8sv1 "k8s.io/api/core/v1" @@ -44,11 +45,11 @@ type Controller struct { gatewayNodeIP net.IP localClusterCidr []string localServiceCidr []string - remoteSubnets *StringSet + remoteSubnets *util.StringSet vxlanDevice *vxLanIface vxlanGw net.IP - remoteVTEPs *StringSet + remoteVTEPs *util.StringSet isGatewayNode bool link *net.Interface @@ -72,8 +73,8 @@ func NewController(clusterID string, ClusterCidr []string, ServiceCidr []string, clientSet: clientSet, link: link, isGatewayNode: false, - remoteSubnets: NewStringSet(), - remoteVTEPs: NewStringSet(), + remoteSubnets: util.NewStringSet(), + remoteVTEPs: util.NewStringSet(), clustersSynced: clusterInformer.Informer().HasSynced, endpointsSynced: endpointInformer.Informer().HasSynced, smRouteAgentPodsSynced: podInformer.Informer().HasSynced, @@ -290,7 +291,7 @@ func (r 
*Controller) createVxLANInterface(isGatewayDevice bool) error { klog.Fatalf("Failed to create vxlan interface on Gateway Node: %v", err) } - for fdbAddress, _ := range r.remoteVTEPs.set { + for fdbAddress, _ := range r.remoteVTEPs.Set { err = r.vxlanDevice.AddFDB(net.ParseIP(fdbAddress), "00:00:00:00:00:00") if err != nil { klog.Fatalf("Failed to add FDB entry on the Gateway Node vxlan iface %v", err) @@ -598,7 +599,7 @@ func (r *Controller) reconcileRoutes() error { } // let's now add the routes that are missing - for cidrBlock, _ := range r.remoteSubnets.set { + for cidrBlock, _ := range r.remoteSubnets.Set { _, dst, err := net.ParseCIDR(cidrBlock) if err != nil { klog.Errorf("Error parsing cidr block %s: %v", cidrBlock, err) diff --git a/pkg/routeagent/controllers/route/stringset.go b/pkg/util/stringset.go similarity index 68% rename from pkg/routeagent/controllers/route/stringset.go rename to pkg/util/stringset.go index 42f69a3d7..389a52b87 100644 --- a/pkg/routeagent/controllers/route/stringset.go +++ b/pkg/util/stringset.go @@ -1,7 +1,7 @@ -package route +package util type StringSet struct { - set map[string]bool + Set map[string]bool } func NewStringSet() *StringSet { @@ -9,20 +9,20 @@ func NewStringSet() *StringSet { } func (set *StringSet) Add(s string) bool { - _, found := set.set[s] - set.set[s] = true + _, found := set.Set[s] + set.Set[s] = true return !found } func (set *StringSet) Contains(s string) bool { - _, found := set.set[s] + _, found := set.Set[s] return found } func (set *StringSet) Size() int { - return len(set.set) + return len(set.Set) } func (set *StringSet) Delete(s string) { - delete(set.set, s) + delete(set.Set, s) } diff --git a/pkg/routeagent/controllers/route/stringset_test.go b/pkg/util/stringset_test.go similarity index 99% rename from pkg/routeagent/controllers/route/stringset_test.go rename to pkg/util/stringset_test.go index d003148a2..0d3e6959b 100644 --- a/pkg/routeagent/controllers/route/stringset_test.go +++ 
b/pkg/util/stringset_test.go @@ -1,4 +1,4 @@ -package route +package util import ( "testing" From 3475e7802d520576a5337584a5f24f9b911b109f Mon Sep 17 00:00:00 2001 From: Sridhar Gaddam Date: Thu, 5 Sep 2019 14:54:07 +0530 Subject: [PATCH 4/9] Mutex is used to protect the contents of critical section. --- pkg/routeagent/controllers/route/iptables.go | 2 +- pkg/routeagent/controllers/route/route.go | 23 ++++++++++++++++++-- pkg/routeagent/controllers/route/vxlan.go | 6 ++--- pkg/util/stringset.go | 23 ++++++++++++++++++-- 4 files changed, 46 insertions(+), 8 deletions(-) diff --git a/pkg/routeagent/controllers/route/iptables.go b/pkg/routeagent/controllers/route/iptables.go index efc84041c..d7432cec4 100644 --- a/pkg/routeagent/controllers/route/iptables.go +++ b/pkg/routeagent/controllers/route/iptables.go @@ -65,7 +65,7 @@ func (r *Controller) programIptableRulesForInterClusterTraffic(remoteCidrBlock s klog.Errorf("error appending iptables rule \"%s\": %v\n", strings.Join(ruleSpec, " "), err) } - // Todo: revisit, we only have to program rules to allow traffic from the podCidr + // TODO: revisit, we only have to program rules to allow traffic from the podCidr ruleSpec = []string{"-s", remoteCidrBlock, "-d", localClusterCidr, "-j", "ACCEPT"} klog.V(4).Infof("Installing iptables rule for incoming traffic: %s", strings.Join(ruleSpec, " ")) if err = ipt.AppendUnique("nat", SmPostRoutingChain, ruleSpec...); err != nil { diff --git a/pkg/routeagent/controllers/route/route.go b/pkg/routeagent/controllers/route/route.go index be4344852..f1926ad48 100644 --- a/pkg/routeagent/controllers/route/route.go +++ b/pkg/routeagent/controllers/route/route.go @@ -56,8 +56,27 @@ type Controller struct { } const ( - VxLANIface = "vxlan100" - VxLANPort = 4800 + VxLANIface = "vxlan100" + VxLANPort = 4800 + + /* Why VxLANVTepNetworkPrefix is 240? + On VxLAN interfaces we need a unique IPAddress which does not collide with the + host ip-address. 
This is going to be tricky as currently there is no specific + CIDR in K8s that can be used for this purpose. If really required, this can be + taken as an input from the user (i.e., as a configuration parameter). + But we want to avoid any additional inputs particularly if there is a way to automate it. + + So, the approach we are taking is to derive the VxLAN ip from the hostIPAddress as shown below. + Example: Say the host ipaddress is "192.168.1.100/16", we prepend 240 to the host-ip address + and configure it on the VxLAN interface (i.e., 240.168.1.100/16). + + The reason behind choosing 240 is that "240.0.0.0/4" is a Reserved IPAddress [*] which + normally will not be assigned on any of the hosts. Also, note that the VxLAN IPs that are + so configured are only used within the local cluster and traffic will not leave the cluster + with the VxLAN ipaddress. + + [*] https://en.wikipedia.org/wiki/Reserved_IP_addresses */ + VxLANVTepNetworkPrefix = 240 SmPostRoutingChain = "SUBMARINER-POSTROUTING" SmRouteAgentFilter = "app=submariner-routeagent" diff --git a/pkg/routeagent/controllers/route/vxlan.go b/pkg/routeagent/controllers/route/vxlan.go index 37f9c322c..8e42074ae 100644 --- a/pkg/routeagent/controllers/route/vxlan.go +++ b/pkg/routeagent/controllers/route/vxlan.go @@ -63,16 +63,16 @@ func createVxLanIface(iface *vxLanIface) error { // Config does not match, delete the existing interface and re-create it. 
if err = netlink.LinkDel(existing); err != nil { - klog.V(4).Infof("Failed to delete the existing vxlan interface: %v", err) + klog.Errorf("Failed to delete the existing vxlan interface: %v", err) return err } if err = netlink.LinkAdd(iface.link); err != nil { - klog.V(4).Infof("Failed to re-create the the vxlan interface: %v", err) + klog.Errorf("Failed to re-create the the vxlan interface: %v", err) return err } } else if err != nil { - klog.V(4).Infof("Failed to create the the vxlan interface: %v", err) + klog.Errorf("Failed to create the the vxlan interface: %v", err) return err } diff --git a/pkg/util/stringset.go b/pkg/util/stringset.go index 389a52b87..9ff06af62 100644 --- a/pkg/util/stringset.go +++ b/pkg/util/stringset.go @@ -1,28 +1,47 @@ package util +import ( + "sync" +) + type StringSet struct { - Set map[string]bool + syncMutex *sync.Mutex + Set map[string]bool } func NewStringSet() *StringSet { - return &StringSet{make(map[string]bool)} + return &StringSet{ + syncMutex: &sync.Mutex{}, + Set: make(map[string]bool)} } func (set *StringSet) Add(s string) bool { + set.syncMutex.Lock() + defer set.syncMutex.Unlock() + _, found := set.Set[s] set.Set[s] = true return !found } func (set *StringSet) Contains(s string) bool { + set.syncMutex.Lock() + defer set.syncMutex.Unlock() + _, found := set.Set[s] return found } func (set *StringSet) Size() int { + set.syncMutex.Lock() + defer set.syncMutex.Unlock() + return len(set.Set) } func (set *StringSet) Delete(s string) { + set.syncMutex.Lock() + defer set.syncMutex.Unlock() + delete(set.Set, s) } From 20e59843732eb0790887b08933c1ebca1ffb0585 Mon Sep 17 00:00:00 2001 From: Sridhar Gaddam Date: Fri, 6 Sep 2019 22:48:57 +0530 Subject: [PATCH 5/9] Incorporated review comments. 1. Modified the vxlan interface to vx-submariner 2. error handling in iptables.go and route.go files 3. 
endpoint delete event --- pkg/routeagent/controllers/route/iptables.go | 48 ++++-- pkg/routeagent/controllers/route/route.go | 147 ++++++++++++------- pkg/routeagent/controllers/route/vxlan.go | 10 ++ 3 files changed, 135 insertions(+), 70 deletions(-) diff --git a/pkg/routeagent/controllers/route/iptables.go b/pkg/routeagent/controllers/route/iptables.go index d7432cec4..aa3def0ec 100644 --- a/pkg/routeagent/controllers/route/iptables.go +++ b/pkg/routeagent/controllers/route/iptables.go @@ -12,24 +12,23 @@ import ( func (r *Controller) createIPTableChains() error { ipt, err := iptables.New() if err != nil { - klog.Errorf("Error while initializing iptables: %v", err) - return err + return fmt.Errorf("error initializing iptables: %v", err) } klog.V(4).Infof("Install/ensure %s chain exists", SmPostRoutingChain) - if err = ipt.NewChain("nat", SmPostRoutingChain); err != nil { - klog.Errorf("Unable to create %s chain in iptables: %v", SmPostRoutingChain, err) + if err = r.createChainIfNotExists(ipt, "nat", SmPostRoutingChain); err != nil { + return fmt.Errorf("Unable to create %s chain in iptables: %v", SmPostRoutingChain, err) } klog.V(4).Infof("Insert %s rule that has rules for inter-cluster traffic", SmPostRoutingChain) forwardToSubPostroutingRuleSpec := []string{"-j", SmPostRoutingChain} - if err = r.insertUnique(ipt, "nat", "POSTROUTING", 1, forwardToSubPostroutingRuleSpec); err != nil { + if err = r.prependUnique(ipt, "nat", "POSTROUTING", forwardToSubPostroutingRuleSpec); err != nil { klog.Errorf("Unable to insert iptable rule in NAT table, POSTROUTING chain: %v", err) } klog.V(4).Infof("Install/ensure SUBMARINER-INPUT chain exists") - if err = ipt.NewChain("filter", "SUBMARINER-INPUT"); err != nil { - klog.Errorf("Unable to create SUBMARINER-INPUT chain in iptables: %v", err) + if err = r.createChainIfNotExists(ipt, "filter", "SUBMARINER-INPUT"); err != nil { + return fmt.Errorf("Unable to create SUBMARINER-INPUT chain in iptables: %v", err) } 
forwardToSubInputRuleSpec := []string{"-p", "udp", "-m", "udp", "-j", "SUBMARINER-INPUT"} @@ -45,48 +44,65 @@ func (r *Controller) createIPTableChains() error { klog.V(4).Infof("Insert rule to allow traffic over %s interface in FORWARDing Chain", VxLANIface) ruleSpec = []string{"-o", VxLANIface, "-j", "ACCEPT"} - if err = r.insertUnique(ipt, "filter", "FORWARD", 1, ruleSpec); err != nil { + if err = r.prependUnique(ipt, "filter", "FORWARD", ruleSpec); err != nil { klog.Errorf("Unable to insert iptable rule in filter table to allow vxlan traffic: %v", err) } return nil } -func (r *Controller) programIptableRulesForInterClusterTraffic(remoteCidrBlock string) { +func (r *Controller) programIptableRulesForInterClusterTraffic(remoteCidrBlock string) error { ipt, err := iptables.New() if err != nil { - klog.Errorf("error while initializing iptables: %v", err) + return fmt.Errorf("error initializing iptables: %v", err) } for _, localClusterCidr := range r.localClusterCidr { ruleSpec := []string{"-s", localClusterCidr, "-d", remoteCidrBlock, "-j", "ACCEPT"} klog.V(4).Infof("Installing iptables rule for outgoing traffic: %s", strings.Join(ruleSpec, " ")) if err = ipt.AppendUnique("nat", SmPostRoutingChain, ruleSpec...); err != nil { - klog.Errorf("error appending iptables rule \"%s\": %v\n", strings.Join(ruleSpec, " "), err) + return fmt.Errorf("error appending iptables rule \"%s\": %v\n", strings.Join(ruleSpec, " "), err) } // TODO: revisit, we only have to program rules to allow traffic from the podCidr ruleSpec = []string{"-s", remoteCidrBlock, "-d", localClusterCidr, "-j", "ACCEPT"} klog.V(4).Infof("Installing iptables rule for incoming traffic: %s", strings.Join(ruleSpec, " ")) if err = ipt.AppendUnique("nat", SmPostRoutingChain, ruleSpec...); err != nil { - klog.Errorf("error appending iptables rule \"%s\": %v\n", strings.Join(ruleSpec, " "), err) + return fmt.Errorf("error appending iptables rule \"%s\": %v\n", strings.Join(ruleSpec, " "), err) } } + return nil } 
-func (r *Controller) insertUnique(ipt *iptables.IPTables, table string, chain string, position int, ruleSpec []string) error { +func (r *Controller) prependUnique(ipt *iptables.IPTables, table string, chain string, ruleSpec []string) error { rules, err := ipt.List(table, chain) if err != nil { return fmt.Errorf("error listing the rules in %s chain: %v", chain, err) } - if strings.Contains(rules[position], strings.Join(ruleSpec, " ")) { + if strings.Contains(rules[1], strings.Join(ruleSpec, " ")) { klog.V(4).Infof("In %s table, iptables rule \"%s\", already exists.", table, strings.Join(ruleSpec, " ")) return nil } else { - if err = ipt.Insert(table, chain, position, ruleSpec...); err != nil { - klog.Errorf("In %s table, unable to insert iptables rule \"%s\": %v\n", table, strings.Join(ruleSpec, " "), err) + if err = ipt.Insert(table, chain, 1, ruleSpec...); err != nil { + return err } } return nil } + +func (r *Controller) createChainIfNotExists(ipt *iptables.IPTables, table, chain string) error { + existingChains, err := ipt.ListChains(table) + if err != nil { + return err + } + + for _, val := range existingChains { + if val == chain { + // Chain already exists + return nil + } + } + + return ipt.NewChain(table, chain) +} diff --git a/pkg/routeagent/controllers/route/route.go b/pkg/routeagent/controllers/route/route.go index f1926ad48..699926c75 100644 --- a/pkg/routeagent/controllers/route/route.go +++ b/pkg/routeagent/controllers/route/route.go @@ -42,22 +42,22 @@ type Controller struct { endpointWorkqueue workqueue.RateLimitingInterface podWorkqueue workqueue.RateLimitingInterface - gatewayNodeIP net.IP localClusterCidr []string localServiceCidr []string remoteSubnets *util.StringSet vxlanDevice *vxLanIface - vxlanGw net.IP remoteVTEPs *util.StringSet - isGatewayNode bool - link *net.Interface + isGatewayNode bool + defaultHostIface *net.Interface } const ( - VxLANIface = "vxlan100" - VxLANPort = 4800 + VxLANIface = "vx-submariner" + VxInterfaceWorker = 0 + 
VxInterfaceGateway = 1 + VxLANPort = 4800 /* Why VxLANVTepNetworkPrefix is 240? On VxLAN interfaces we need a unique IPAddress which does not collide with the @@ -90,7 +90,7 @@ func NewController(clusterID string, ClusterCidr []string, ServiceCidr []string, localServiceCidr: ServiceCidr, submarinerClientSet: submarinerClientSet, clientSet: clientSet, - link: link, + defaultHostIface: link, isGatewayNode: false, remoteSubnets: util.NewStringSet(), remoteVTEPs: util.NewStringSet(), @@ -135,7 +135,7 @@ func (r *Controller) Run(stopCh <-chan struct{}) error { defer utilruntime.HandleCrash() // Start the informer factories to begin populating the informer caches - klog.V(4).Infof("Starting Route Controller. ClusterID: %s, localClusterCIDR: %v, localServiceCIDR: %v", r.clusterID, r.localClusterCidr, r.localServiceCidr) + klog.Infof("Starting Route Controller. ClusterID: %s, localClusterCIDR: %v, localServiceCIDR: %v", r.clusterID, r.localClusterCidr, r.localServiceCidr) // Wait for the caches to be synced before starting workers klog.Info("Waiting for endpoint informer caches to sync.") @@ -146,13 +146,13 @@ func (r *Controller) Run(stopCh <-chan struct{}) error { // Create the necessary IPTable chains in the filter and nat tables. err := r.createIPTableChains() if err != nil { - return fmt.Errorf("Failed to program the necessary iptable rules.") + return fmt.Errorf("createIPTableChains returned error. %v", err) } // let's go ahead and pre-populate clusters clusters, err := r.submarinerClientSet.SubmarinerV1().Clusters(r.objectNamespace).List(metav1.ListOptions{}) if err != nil { - klog.Fatalf("error while retrieving all clusters: %v", err) + return fmt.Errorf("error while retrieving all clusters: %v", err) } // Program iptables rules for traffic destined to all the remote cluster CIDRs @@ -165,7 +165,7 @@ func (r *Controller) Run(stopCh <-chan struct{}) error { // Query all the submariner-route-agent daemonSet PODs running in the local cluster. 
podList, err := r.clientSet.CoreV1().Pods(r.objectNamespace).List(metav1.ListOptions{LabelSelector: SmRouteAgentFilter}) if err != nil { - klog.Fatalf("error while retrieving all submariner-route-agent pods: %v", err) + return fmt.Errorf("error while retrieving submariner-route-agent pods: %v", err) } for index, pod := range podList.Items { @@ -205,7 +205,10 @@ func (r *Controller) updateIptableRulesForInterclusterTraffic(inputCidrBlocks [] for _, inputCidrBlock := range inputCidrBlocks { if !r.remoteSubnets.Contains(inputCidrBlock) { r.remoteSubnets.Add(inputCidrBlock) - r.programIptableRulesForInterClusterTraffic(inputCidrBlock) + err := r.programIptableRulesForInterClusterTraffic(inputCidrBlock) + if err != nil { + klog.Errorf("Failed to program iptable rule. %v", err) + } } } } @@ -267,7 +270,11 @@ func (r *Controller) getVxlanVtepIPAddress(ipAddr string) (net.IP, error) { } func (r *Controller) getHostIfaceIPAddress() (net.IP, *net.IPNet, error) { - addrs, err := r.link.Addrs() + addrs, err := r.defaultHostIface.Addrs() + if err != nil { + return nil, nil, err + } + if len(addrs) > 0 { for i := range addrs { ipAddr, ipNetwork, err := net.ParseCIDR(addrs[i].String()) @@ -282,20 +289,18 @@ func (r *Controller) getHostIfaceIPAddress() (net.IP, *net.IPNet, error) { return nil, nil, err } -func (r *Controller) createVxLANInterface(isGatewayDevice bool) error { +func (r *Controller) createVxLANInterface(ifaceType int, gatewayNodeIP net.IP) error { ipAddr, vtepMask, err := r.getHostIfaceIPAddress() if err != nil { - klog.Errorf("Unable to retrieve the IPv4 address on the Host %v", err) - return err + return fmt.Errorf("Unable to retrieve the IPv4 address on the Host %v", err) } vtepIP, err := r.getVxlanVtepIPAddress(ipAddr.String()) if err != nil { - klog.Errorf("Failed to derive the vxlan vtepIP %v", err) - return err + return fmt.Errorf("Failed to derive the vxlan vtepIP %v", err) } - if isGatewayDevice { + if ifaceType == VxInterfaceGateway { attrs := 
&vxLanAttributes{ name: VxLANIface, vxlanId: 100, @@ -307,31 +312,30 @@ func (r *Controller) createVxLANInterface(isGatewayDevice bool) error { r.vxlanDevice, err = newVxlanIface(attrs) if err != nil { - klog.Fatalf("Failed to create vxlan interface on Gateway Node: %v", err) + return fmt.Errorf("Failed to create vxlan interface on Gateway Node: %v", err) } - for fdbAddress, _ := range r.remoteVTEPs.Set { + for fdbAddress := range r.remoteVTEPs.Set { err = r.vxlanDevice.AddFDB(net.ParseIP(fdbAddress), "00:00:00:00:00:00") if err != nil { - klog.Fatalf("Failed to add FDB entry on the Gateway Node vxlan iface %v", err) + return fmt.Errorf("Failed to add FDB entry on the Gateway Node vxlan iface %v", err) } } // Enable loose mode (rp_filter=2) reverse path filtering on the vxlan interface. err = ioutil.WriteFile("/proc/sys/net/ipv4/conf/"+VxLANIface+"/rp_filter", []byte("2"), 0644) if err != nil { - klog.Errorf("Unable to update proc entry, err: %s", err) - return err + return fmt.Errorf("Unable to update vxlan rp_filter proc entry, err: %s", err) } else { - klog.Errorf("Successfully updated proc entry ") + klog.Info("Successfully configured rp_filter to loose mode(2) ") } - } else { + } else if ifaceType == VxInterfaceWorker { // non-Gateway/Worker Node attrs := &vxLanAttributes{ name: VxLANIface, vxlanId: 100, - group: r.gatewayNodeIP, + group: gatewayNodeIP, srcAddr: vtepIP, vtepPort: VxLANPort, mtu: 1450, @@ -339,26 +343,36 @@ func (r *Controller) createVxLANInterface(isGatewayDevice bool) error { r.vxlanDevice, err = newVxlanIface(attrs) if err != nil { - klog.Fatalf("Failed to create vxlan interface on non-Gateway Node: %v", err) + return fmt.Errorf("Failed to create vxlan interface on non-Gateway Node: %v", err) } } err = r.vxlanDevice.configureIPAddress(vtepIP, vtepMask.Mask) if err != nil { - klog.Fatalf("Failed to configure vxlan interface ipaddress on the Gateway Node %v", err) + return fmt.Errorf("Failed to configure vxlan interface ipaddress on the 
Gateway Node %v", err) } return nil } func (r *Controller) processNextPod() bool { - pod, shutdown := r.podWorkqueue.Get() + obj, shutdown := r.podWorkqueue.Get() if shutdown { return false } err := func() error { - defer r.podWorkqueue.Done(pod) - pod := pod.(*k8sv1.Pod) + defer r.podWorkqueue.Done(obj) + + key := obj.(string) + ns, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + return fmt.Errorf("Error while splitting meta namespace key %s: %v", key, err) + } + + pod, err := r.clientSet.CoreV1().Pods(ns).Get(name, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("Error retrieving submariner-route-agent pod object %s: %v", name, err) + } klog.V(4).Infof("In processNextPod, POD HostIP is %s", pod.Status.HostIP) r.populateRemoteVtepIps(pod.Status.PodIP) @@ -366,13 +380,19 @@ func (r *Controller) processNextPod() bool { // A new Node (identified via a Submariner-route-agent daemonset pod event) is added to the cluster. // On the GatewayDevice, update the vxlan fdb entry (i.e., remote Vtep) for the newly added node. 
if r.isGatewayNode { - ret := r.vxlanDevice.AddFDB(net.ParseIP(pod.Status.PodIP), "00:00:00:00:00:00") - if ret != nil { - klog.Errorf("Failed to add FDB entry on the Gateway Node vxlan iface %v", ret) + if r.vxlanDevice != nil { + err := r.vxlanDevice.AddFDB(net.ParseIP(pod.Status.PodIP), "00:00:00:00:00:00") + if err != nil { + return fmt.Errorf("Failed to add FDB entry on the Gateway Node vxlan iface %v", err) + } + } else { + r.podWorkqueue.AddRateLimited(obj) + klog.Errorf("vxlanDevice is not yet created on the Gateway node") + return nil } } - r.podWorkqueue.Forget(pod) + r.podWorkqueue.Forget(obj) klog.V(4).Infof("Pod event processed by route controller") return nil }() @@ -416,31 +436,31 @@ func (r *Controller) processNextEndpoint() bool { } klog.V(6).Infof("Local Cluster Gateway Node IP is %s", endpoint.Spec.PrivateIP) - r.gatewayNodeIP = net.ParseIP(endpoint.Spec.PrivateIP) - - ipSlice := strings.Split(r.gatewayNodeIP.String(), ".") - ipSlice[0] = strconv.Itoa(VxLANVTepNetworkPrefix) - // remoteVtepIP is used while programming the routing rules - remoteVtepIP := net.ParseIP(strings.Join(ipSlice, ".")) - r.vxlanGw = remoteVtepIP + // If the endpoint hostname matches with our hostname, it implies we are on gateway node if endpoint.Spec.Hostname == hostname { r.cleanRoutes() r.isGatewayNode = true - if r.createVxLANInterface(true) != nil { + if r.createVxLANInterface(VxInterfaceGateway, nil) != nil { klog.Fatalf("Unable to create VxLAN interface on GatewayNode (%s): %v", hostname, err) } klog.V(6).Infof("not reconciling routes because we appear to be the gateway host") return nil } + localClusterGwNodeIP := net.ParseIP(endpoint.Spec.PrivateIP) + remoteVtepIP, err := r.getVxlanVtepIPAddress(localClusterGwNodeIP.String()) + if err != nil { + return fmt.Errorf("Failed to derive the remoteVtepIP %v", err) + } + r.isGatewayNode = false - if r.createVxLANInterface(false) != nil { + if r.createVxLANInterface(VxInterfaceWorker, localClusterGwNodeIP) != nil { 
klog.Fatalf("Unable to create VxLAN interface on non-GatewayNode (%s): %v", endpoint.Spec.Hostname, err) } r.cleanXfrmPolicies() - err = r.reconcileRoutes() + err = r.reconcileRoutes(remoteVtepIP) if err != nil { r.endpointWorkqueue.AddRateLimited(obj) return fmt.Errorf("Error while reconciling routes %v", err) @@ -466,6 +486,7 @@ func (r *Controller) enqueueCluster(obj interface{}) { utilruntime.HandleError(err) return } + klog.V(4).Infof("Enqueueing cluster for route controller %v", obj) r.clusterWorkqueue.AddRateLimited(key) } @@ -476,16 +497,23 @@ func (r *Controller) enqueueEndpoint(obj interface{}) { utilruntime.HandleError(err) return } + klog.V(4).Infof("Enqueueing endpoint for route controller %v", obj) r.endpointWorkqueue.AddRateLimited(key) } func (r *Controller) enqueuePod(obj interface{}) { - klog.V(6).Infof("Enqueueing pod for route controller %v", obj) - pod := obj.(*k8sv1.Pod) + var key string + var err error + if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { + utilruntime.HandleError(err) + return + } + pod := obj.(*k8sv1.Pod) // Add the POD event to the workqueue only if the sm-route-agent podIP does not exist in the local cache. 
if !r.remoteVTEPs.Contains(pod.Status.HostIP) { - r.podWorkqueue.AddRateLimited(obj) + klog.V(4).Infof("Enqueueing sm-route-agent-pod event, ip: %s", pod.Status.HostIP) + r.podWorkqueue.AddRateLimited(key) } } @@ -515,6 +543,17 @@ func (r *Controller) handleRemovedEndpoint(obj interface{}) { if object.Spec.Hostname == hostname { r.cleanRoutes() } + + if object.Spec.ClusterID == r.clusterID { + klog.V(6).Infof("Endpoint matches the cluster ID of this cluster") + err := r.vxlanDevice.deleteVxLanIface() + if err != nil { + klog.Errorf("Failed to delete the the vxlan interface on endpoint removal: %v", err) + return + } + r.vxlanDevice = nil + } + klog.V(4).Infof("Removed routes from host") } @@ -528,7 +567,7 @@ func (r *Controller) handleRemovedPod(obj interface{}) { if r.remoteVTEPs.Contains(pod.Status.HostIP) { r.remoteVTEPs.Delete(pod.Status.HostIP) - if r.isGatewayNode { + if r.isGatewayNode && r.vxlanDevice != nil { ret := r.vxlanDevice.DelFDB(net.ParseIP(pod.Status.PodIP), "00:00:00:00:00:00") if ret != nil { klog.Errorf("Failed to delete FDB entry on the Gateway Node vxlan iface %v", ret) @@ -581,7 +620,7 @@ func (r *Controller) cleanXfrmPolicies() { } // Reconcile the routes installed on this device using rtnetlink -func (r *Controller) reconcileRoutes() error { +func (r *Controller) reconcileRoutes(vxlanGw net.IP) error { link, err := netlink.LinkByName(VxLANIface) if err != nil { return fmt.Errorf("Error retrieving link by name %s: %v", VxLANIface, err) @@ -600,7 +639,7 @@ func (r *Controller) reconcileRoutes() error { if route.Dst == nil || route.Gw == nil { klog.V(6).Infof("Found nil gw or dst") } else { - if r.remoteSubnets.Contains(route.Dst.String()) && route.Gw.Equal(r.vxlanGw) { + if r.remoteSubnets.Contains(route.Dst.String()) && route.Gw.Equal(vxlanGw) { klog.V(6).Infof("Found route %s with gw %s already installed", route.String(), route.Gw.String()) } else { klog.V(6).Infof("Removing route %s", route.String()) @@ -618,7 +657,7 @@ func (r 
*Controller) reconcileRoutes() error { } // let's now add the routes that are missing - for cidrBlock, _ := range r.remoteSubnets.Set { + for cidrBlock := range r.remoteSubnets.Set { _, dst, err := net.ParseCIDR(cidrBlock) if err != nil { klog.Errorf("Error parsing cidr block %s: %v", cidrBlock, err) @@ -626,7 +665,7 @@ func (r *Controller) reconcileRoutes() error { } route := netlink.Route{ Dst: dst, - Gw: r.vxlanGw, + Gw: vxlanGw, Scope: unix.RT_SCOPE_UNIVERSE, LinkIndex: link.Attrs().Index, Protocol: 4, diff --git a/pkg/routeagent/controllers/route/vxlan.go b/pkg/routeagent/controllers/route/vxlan.go index 8e42074ae..a8e45f8ea 100644 --- a/pkg/routeagent/controllers/route/vxlan.go +++ b/pkg/routeagent/controllers/route/vxlan.go @@ -79,6 +79,16 @@ func createVxLanIface(iface *vxLanIface) error { return nil } +func (iface *vxLanIface) deleteVxLanIface() error { + err := netlink.LinkDel(iface.link) + if err != nil { + klog.Errorf("Failed to delete the the vxlan interface: %v", err) + return err + } + + return nil +} + func isVxlanConfigTheSame(new, current netlink.Link) bool { required := new.(*netlink.Vxlan) From 4ca4ec8016baaa2790dc1f0353b816808064b3ad Mon Sep 17 00:00:00 2001 From: Sridhar Gaddam Date: Tue, 10 Sep 2019 15:54:47 +0530 Subject: [PATCH 6/9] Updated the code based on the review comments. Mostly formatting fixes, passing args and error handling. 
--- pkg/routeagent/controllers/route/route.go | 74 +++++++++++++---------- pkg/routeagent/controllers/route/vxlan.go | 36 +++++------ pkg/routeagent/main.go | 18 +++++- 3 files changed, 72 insertions(+), 56 deletions(-) diff --git a/pkg/routeagent/controllers/route/route.go b/pkg/routeagent/controllers/route/route.go index 699926c75..142550386 100644 --- a/pkg/routeagent/controllers/route/route.go +++ b/pkg/routeagent/controllers/route/route.go @@ -28,12 +28,20 @@ import ( "k8s.io/klog" ) +type InformerConfigStruct struct { + SubmarinerClientSet clientset.Interface + ClientSet kubernetes.Interface + ClusterInformer informers.ClusterInformer + EndpointInformer informers.EndpointInformer + PodInformer podinformer.PodInformer +} + type Controller struct { clusterID string objectNamespace string submarinerClientSet clientset.Interface - clientSet *kubernetes.Clientset + clientSet kubernetes.Interface clustersSynced cache.InformerSynced endpointsSynced cache.InformerSynced smRouteAgentPodsSynced cache.InformerSynced @@ -82,27 +90,28 @@ const ( SmRouteAgentFilter = "app=submariner-routeagent" ) -func NewController(clusterID string, ClusterCidr []string, ServiceCidr []string, objectNamespace string, link *net.Interface, submarinerClientSet clientset.Interface, clientSet *kubernetes.Clientset, clusterInformer informers.ClusterInformer, endpointInformer informers.EndpointInformer, podInformer podinformer.PodInformer) *Controller { +func NewController(clusterID string, ClusterCidr []string, ServiceCidr []string, objectNamespace string, + link *net.Interface, config InformerConfigStruct) *Controller { controller := Controller{ clusterID: clusterID, objectNamespace: objectNamespace, localClusterCidr: ClusterCidr, localServiceCidr: ServiceCidr, - submarinerClientSet: submarinerClientSet, - clientSet: clientSet, + submarinerClientSet: config.SubmarinerClientSet, + clientSet: config.ClientSet, defaultHostIface: link, isGatewayNode: false, remoteSubnets: util.NewStringSet(), 
remoteVTEPs: util.NewStringSet(), - clustersSynced: clusterInformer.Informer().HasSynced, - endpointsSynced: endpointInformer.Informer().HasSynced, - smRouteAgentPodsSynced: podInformer.Informer().HasSynced, + clustersSynced: config.ClusterInformer.Informer().HasSynced, + endpointsSynced: config.EndpointInformer.Informer().HasSynced, + smRouteAgentPodsSynced: config.PodInformer.Informer().HasSynced, clusterWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Clusters"), endpointWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Endpoints"), podWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Pods"), } - clusterInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + config.ClusterInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: controller.enqueueCluster, UpdateFunc: func(old, new interface{}) { controller.enqueueCluster(new) @@ -110,7 +119,7 @@ func NewController(clusterID string, ClusterCidr []string, ServiceCidr []string, DeleteFunc: controller.handleRemovedCluster, }) - endpointInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + config.EndpointInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: controller.enqueueEndpoint, UpdateFunc: func(old, new interface{}) { controller.enqueueEndpoint(new) @@ -118,7 +127,7 @@ func NewController(clusterID string, ClusterCidr []string, ServiceCidr []string, DeleteFunc: controller.handleRemovedEndpoint, }) - podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + config.PodInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: controller.enqueuePod, UpdateFunc: func(old, new interface{}) { controller.enqueuePod(new) @@ -230,11 +239,11 @@ func (r *Controller) processNextCluster() bool { key := obj.(string) ns, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { - return 
fmt.Errorf("Error while splitting meta namespace key %s: %v", key, err) + return fmt.Errorf("error while splitting meta namespace key %s: %v", key, err) } cluster, err := r.submarinerClientSet.SubmarinerV1().Clusters(ns).Get(name, metav1.GetOptions{}) if err != nil { - return fmt.Errorf("Error retrieving submariner cluster object %s: %v", name, err) + return fmt.Errorf("error retrieving submariner cluster object %s: %v", name, err) } if cluster.Spec.ClusterID == r.clusterID { @@ -261,7 +270,7 @@ func (r *Controller) processNextCluster() bool { func (r *Controller) getVxlanVtepIPAddress(ipAddr string) (net.IP, error) { ipSlice := strings.Split(ipAddr, ".") if len(ipSlice) < 4 { - return nil, fmt.Errorf("Invalid ipAddr [%s]", ipAddr) + return nil, fmt.Errorf("invalid ipAddr [%s]", ipAddr) } ipSlice[0] = strconv.Itoa(VxLANVTepNetworkPrefix) @@ -286,18 +295,18 @@ func (r *Controller) getHostIfaceIPAddress() (net.IP, *net.IPNet, error) { } } } - return nil, nil, err + return nil, nil, nil } func (r *Controller) createVxLANInterface(ifaceType int, gatewayNodeIP net.IP) error { ipAddr, vtepMask, err := r.getHostIfaceIPAddress() if err != nil { - return fmt.Errorf("Unable to retrieve the IPv4 address on the Host %v", err) + return fmt.Errorf("unable to retrieve the IPv4 address on the Host %v", err) } vtepIP, err := r.getVxlanVtepIPAddress(ipAddr.String()) if err != nil { - return fmt.Errorf("Failed to derive the vxlan vtepIP %v", err) + return fmt.Errorf("failed to derive the vxlan vtepIP for %s, %v", ipAddr, err) } if ifaceType == VxInterfaceGateway { @@ -312,20 +321,20 @@ func (r *Controller) createVxLANInterface(ifaceType int, gatewayNodeIP net.IP) e r.vxlanDevice, err = newVxlanIface(attrs) if err != nil { - return fmt.Errorf("Failed to create vxlan interface on Gateway Node: %v", err) + return fmt.Errorf("failed to create vxlan interface on Gateway Node: %v", err) } for fdbAddress := range r.remoteVTEPs.Set { err = r.vxlanDevice.AddFDB(net.ParseIP(fdbAddress), 
"00:00:00:00:00:00") if err != nil { - return fmt.Errorf("Failed to add FDB entry on the Gateway Node vxlan iface %v", err) + return fmt.Errorf("failed to add FDB entry on the Gateway Node vxlan iface %v", err) } } // Enable loose mode (rp_filter=2) reverse path filtering on the vxlan interface. err = ioutil.WriteFile("/proc/sys/net/ipv4/conf/"+VxLANIface+"/rp_filter", []byte("2"), 0644) if err != nil { - return fmt.Errorf("Unable to update vxlan rp_filter proc entry, err: %s", err) + return fmt.Errorf("unable to update vxlan rp_filter proc entry, err: %s", err) } else { klog.Info("Successfully configured rp_filter to loose mode(2) ") } @@ -343,13 +352,13 @@ func (r *Controller) createVxLANInterface(ifaceType int, gatewayNodeIP net.IP) e r.vxlanDevice, err = newVxlanIface(attrs) if err != nil { - return fmt.Errorf("Failed to create vxlan interface on non-Gateway Node: %v", err) + return fmt.Errorf("failed to create vxlan interface on non-Gateway Node: %v", err) } } err = r.vxlanDevice.configureIPAddress(vtepIP, vtepMask.Mask) if err != nil { - return fmt.Errorf("Failed to configure vxlan interface ipaddress on the Gateway Node %v", err) + return fmt.Errorf("failed to configure vxlan interface ipaddress on the Gateway Node %v", err) } return nil @@ -366,12 +375,14 @@ func (r *Controller) processNextPod() bool { key := obj.(string) ns, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { - return fmt.Errorf("Error while splitting meta namespace key %s: %v", key, err) + r.podWorkqueue.Forget(obj) + return fmt.Errorf("error while splitting meta namespace key %s: %v", key, err) } pod, err := r.clientSet.CoreV1().Pods(ns).Get(name, metav1.GetOptions{}) if err != nil { - return fmt.Errorf("Error retrieving submariner-route-agent pod object %s: %v", name, err) + r.podWorkqueue.Forget(obj) + return fmt.Errorf("error retrieving submariner-route-agent pod object %s: %v", name, err) } klog.V(4).Infof("In processNextPod, POD HostIP is %s", pod.Status.HostIP) @@ -383,7 
+394,8 @@ func (r *Controller) processNextPod() bool { if r.vxlanDevice != nil { err := r.vxlanDevice.AddFDB(net.ParseIP(pod.Status.PodIP), "00:00:00:00:00:00") if err != nil { - return fmt.Errorf("Failed to add FDB entry on the Gateway Node vxlan iface %v", err) + r.podWorkqueue.Forget(obj) + return fmt.Errorf("failed to add FDB entry on the Gateway Node vxlan iface %v", err) } } else { r.podWorkqueue.AddRateLimited(obj) @@ -417,11 +429,11 @@ func (r *Controller) processNextEndpoint() bool { key := obj.(string) ns, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { - return fmt.Errorf("Error while splitting meta namespace key %s: %v", key, err) + return fmt.Errorf("error while splitting meta namespace key %s: %v", key, err) } endpoint, err := r.submarinerClientSet.SubmarinerV1().Endpoints(ns).Get(name, metav1.GetOptions{}) if err != nil { - return fmt.Errorf("Error retrieving submariner endpoint object %s: %v", name, err) + return fmt.Errorf("error retrieving submariner endpoint object %s: %v", name, err) } if endpoint.Spec.ClusterID != r.clusterID { @@ -451,7 +463,7 @@ func (r *Controller) processNextEndpoint() bool { localClusterGwNodeIP := net.ParseIP(endpoint.Spec.PrivateIP) remoteVtepIP, err := r.getVxlanVtepIPAddress(localClusterGwNodeIP.String()) if err != nil { - return fmt.Errorf("Failed to derive the remoteVtepIP %v", err) + return fmt.Errorf("failed to derive the remoteVtepIP %v", err) } r.isGatewayNode = false @@ -463,7 +475,7 @@ func (r *Controller) processNextEndpoint() bool { err = r.reconcileRoutes(remoteVtepIP) if err != nil { r.endpointWorkqueue.AddRateLimited(obj) - return fmt.Errorf("Error while reconciling routes %v", err) + return fmt.Errorf("error while reconciling routes %v", err) } r.endpointWorkqueue.Forget(obj) @@ -623,13 +635,13 @@ func (r *Controller) cleanXfrmPolicies() { func (r *Controller) reconcileRoutes(vxlanGw net.IP) error { link, err := netlink.LinkByName(VxLANIface) if err != nil { - return fmt.Errorf("Error 
retrieving link by name %s: %v", VxLANIface, err) + return fmt.Errorf("error retrieving link by name %s: %v", VxLANIface, err) } currentRouteList, err := netlink.RouteList(link, syscall.AF_INET) if err != nil { - return fmt.Errorf("Error retrieving routes for link %s: %v", VxLANIface, err) + return fmt.Errorf("error retrieving routes for link %s: %v", VxLANIface, err) } // First lets delete all of the routes that don't match @@ -653,7 +665,7 @@ func (r *Controller) reconcileRoutes(vxlanGw net.IP) error { currentRouteList, err = netlink.RouteList(link, syscall.AF_INET) if err != nil { - return fmt.Errorf("Error retrieving routes for link %s: %v", VxLANIface, err) + return fmt.Errorf("error retrieving routes for link %s: %v", VxLANIface, err) } // let's now add the routes that are missing diff --git a/pkg/routeagent/controllers/route/vxlan.go b/pkg/routeagent/controllers/route/vxlan.go index a8e45f8ea..0754821d1 100644 --- a/pkg/routeagent/controllers/route/vxlan.go +++ b/pkg/routeagent/controllers/route/vxlan.go @@ -1,6 +1,7 @@ package route import ( + "fmt" "net" "syscall" @@ -56,24 +57,21 @@ func createVxLanIface(iface *vxLanIface) error { } if isVxlanConfigTheSame(iface.link, existing) { - klog.V(4).Infof("VxLAN interface already exists with same configuration.") + klog.V(6).Infof("VxLAN interface already exists with same configuration.") iface.link = existing.(*netlink.Vxlan) return nil } // Config does not match, delete the existing interface and re-create it. 
if err = netlink.LinkDel(existing); err != nil { - klog.Errorf("Failed to delete the existing vxlan interface: %v", err) - return err + return fmt.Errorf("failed to delete the existing vxlan interface: %v", err) } if err = netlink.LinkAdd(iface.link); err != nil { - klog.Errorf("Failed to re-create the the vxlan interface: %v", err) - return err + return fmt.Errorf("failed to re-create the the vxlan interface: %v", err) } } else if err != nil { - klog.Errorf("Failed to create the the vxlan interface: %v", err) - return err + return fmt.Errorf("failed to create the the vxlan interface: %v", err) } return nil @@ -82,8 +80,7 @@ func createVxLanIface(iface *vxLanIface) error { func (iface *vxLanIface) deleteVxLanIface() error { err := netlink.LinkDel(iface.link) if err != nil { - klog.Errorf("Failed to delete the the vxlan interface: %v", err) - return err + return fmt.Errorf("failed to delete the the vxlan interface: %v", err) } return nil @@ -100,17 +97,17 @@ func isVxlanConfigTheSame(new, current netlink.Link) bool { } if len(required.Group) > 0 && len(existing.Group) > 0 && !required.Group.Equal(existing.Group) { - klog.V(4).Infof("Vxlan Group of existing interface (%v) does not match with required Group (%v)", existing.Group, required.Group) + klog.V(4).Infof("Vxlan Group (%v) of existing interface does not match with required Group (%v)", existing.Group, required.Group) return false } if len(required.SrcAddr) > 0 && len(existing.SrcAddr) > 0 && !required.SrcAddr.Equal(existing.SrcAddr) { - klog.V(4).Infof("Vxlan SrcAddr of existing interface (%v) does not match with required SrcAddr (%v)", existing.SrcAddr, required.SrcAddr) + klog.V(4).Infof("Vxlan SrcAddr (%v) of existing interface does not match with required SrcAddr (%v)", existing.SrcAddr, required.SrcAddr) return false } if required.Port > 0 && existing.Port > 0 && required.Port != existing.Port { - klog.V(4).Infof("Vxlan Port of existing interface (%d) does not match with required Port (%d)", 
existing.Port, required.Port) + klog.V(4).Infof("Vxlan Port (%d) of existing interface does not match with required Port (%d)", existing.Port, required.Port) return false } @@ -127,8 +124,7 @@ func (iface *vxLanIface) configureIPAddress(ipAddress net.IP, mask net.IPMask) e if err == syscall.EEXIST { return nil } else if err != nil { - klog.Errorf("Unable to configure address (%s) on vxlan interface (%s). %v", ipAddress, iface.link.Name, err) - return err + return fmt.Errorf("unable to configure address (%s) on vxlan interface (%s). %v", ipAddress, iface.link.Name, err) } return nil } @@ -136,8 +132,7 @@ func (iface *vxLanIface) configureIPAddress(ipAddress net.IP, mask net.IPMask) e func (iface *vxLanIface) AddFDB(ipAddress net.IP, hwAddr string) error { macAddr, err := net.ParseMAC(hwAddr) if err != nil { - klog.Errorf("Invalid MAC Address (%s) supplied. %v", hwAddr, err) - return err + return fmt.Errorf("invalid MAC Address (%s) supplied. %v", hwAddr, err) } neigh := &netlink.Neigh{ @@ -152,8 +147,7 @@ func (iface *vxLanIface) AddFDB(ipAddress net.IP, hwAddr string) error { err = netlink.NeighAppend(neigh) if err != nil { - klog.Errorf("Unable to add the bridge fdb entry %v, err: %s", neigh, err) - return err + return fmt.Errorf("unable to add the bridge fdb entry %v, err: %s", neigh, err) } else { klog.V(4).Infof("Successfully added the bridge fdb entry %v", neigh) } @@ -163,8 +157,7 @@ func (iface *vxLanIface) AddFDB(ipAddress net.IP, hwAddr string) error { func (iface *vxLanIface) DelFDB(ipAddress net.IP, hwAddr string) error { macAddr, err := net.ParseMAC(hwAddr) if err != nil { - klog.Errorf("Invalid MAC Address (%s) supplied. %v", hwAddr, err) - return err + return fmt.Errorf("invalid MAC Address (%s) supplied. 
%v", hwAddr, err) } neigh := &netlink.Neigh{ @@ -179,8 +172,7 @@ func (iface *vxLanIface) DelFDB(ipAddress net.IP, hwAddr string) error { err = netlink.NeighDel(neigh) if err != nil { - klog.Errorf("Unable to delete the bridge fdb entry %v, err: %s", neigh, err) - return err + return fmt.Errorf("unable to delete the bridge fdb entry %v, err: %s", neigh, err) } else { klog.V(4).Infof("Successfully deleted the bridge fdb entry %v", neigh) } diff --git a/pkg/routeagent/main.go b/pkg/routeagent/main.go index 4457794db..135a8159c 100644 --- a/pkg/routeagent/main.go +++ b/pkg/routeagent/main.go @@ -65,14 +65,26 @@ func main() { clientSet, err := kubernetes.NewForConfig(cfg) if err != nil { - klog.Errorf("Error building clientset: %s", err.Error()) - return + klog.Fatalf("Error building clientset: %s", err.Error()) } informerFactory := informers.NewSharedInformerFactoryWithOptions(clientSet, time.Second*60, informers.WithNamespace(srcs.Namespace), informers.WithTweakListOptions(filterRouteAgentPods)) + informerConfig := route.InformerConfigStruct{ + SubmarinerClientSet: submarinerClient, + ClientSet: clientSet, + ClusterInformer: submarinerInformerFactory.Submariner().V1().Clusters(), + EndpointInformer: submarinerInformerFactory.Submariner().V1().Endpoints(), + PodInformer: informerFactory.Core().V1().Pods(), + } + defLink, err := util.GetDefaultGatewayInterface() - routeController := route.NewController(srcs.ClusterID, srcs.ClusterCidr, srcs.ServiceCidr, srcs.Namespace, defLink, submarinerClient, clientSet, submarinerInformerFactory.Submariner().V1().Clusters(), submarinerInformerFactory.Submariner().V1().Endpoints(), informerFactory.Core().V1().Pods()) + if err != nil { + klog.Errorf("Unable to find the default interface on host: %s", err.Error()) + return + } + + routeController := route.NewController(srcs.ClusterID, srcs.ClusterCidr, srcs.ServiceCidr, srcs.Namespace, defLink, informerConfig) submarinerInformerFactory.Start(stopCh) informerFactory.Start(stopCh) From 
abe5b0123ac1c8c73edea4cb5e0d35867ee58141 Mon Sep 17 00:00:00 2001 From: Sridhar Gaddam Date: Tue, 10 Sep 2019 20:56:39 +0530 Subject: [PATCH 7/9] Updated prependUnique API to remove any stale flows --- pkg/routeagent/controllers/route/iptables.go | 39 +++++++++++++++++++- 1 file changed, 38 insertions(+), 1 deletion(-) diff --git a/pkg/routeagent/controllers/route/iptables.go b/pkg/routeagent/controllers/route/iptables.go index aa3def0ec..75b2ebe09 100644 --- a/pkg/routeagent/controllers/route/iptables.go +++ b/pkg/routeagent/controllers/route/iptables.go @@ -80,7 +80,43 @@ func (r *Controller) prependUnique(ipt *iptables.IPTables, table string, chain s return fmt.Errorf("error listing the rules in %s chain: %v", chain, err) } - if strings.Contains(rules[1], strings.Join(ruleSpec, " ")) { + /* Submariner requires certain iptable rules to be programmed at the beginning of an iptables Chain so that we can + preserve the sourceIP for inter-cluster traffic and avoid K8s SDN making changes to the traffic. + + In this API, we check if the required iptable rule is present at the beginning of the chain. If the rule is + already present and there are no stale[1] flows, we simply return. If not, we create one. + + [1] Sometimes after we program the rule at the beginning of the chain, K8s SDN might insert some new rules + ahead of the rule that we programmed. In such cases, the rule that we programmed will not be the first rule + to hit and Submariner behavior might get affected. So, we query the rules in the chain to see if the rule + slipped its position, and if so, delete all such occurrences. We then re-program a new rule at the beginning + of the chain as required. 
+ */ + isPresentAtRequiredPosition := false + numOccurrences := 0 + for index, rule := range rules { + if strings.Contains(rule, strings.Join(ruleSpec, " ")) { + klog.V(4).Infof("In %s table, iptables rule \"%s\", exists at index %d.", index, table, strings.Join(ruleSpec, " ")) + numOccurrences++ + + if index == 1 { + isPresentAtRequiredPosition = true + } + } + } + + /* The required rule is present in the Chain, but either there are multiple occurrences or its not at the desired + location */ + if numOccurrences > 1 || isPresentAtRequiredPosition == false { + for i := 0; i < numOccurrences; i++ { + if err = ipt.Delete(table, chain, ruleSpec...); err != nil { + return fmt.Errorf("error deleting stale iptable rule \"%s\": %v", strings.Join(ruleSpec, " "), err) + } + } + } + + /* The required rule is present only once and is at the desired location */ + if numOccurrences == 1 && isPresentAtRequiredPosition == true { klog.V(4).Infof("In %s table, iptables rule \"%s\", already exists.", table, strings.Join(ruleSpec, " ")) return nil } else { @@ -88,6 +124,7 @@ func (r *Controller) prependUnique(ipt *iptables.IPTables, table string, chain s return err } } + return nil } From 3a2ab8e6a7da69f7b250e5fabeaac0cf2deef8df Mon Sep 17 00:00:00 2001 From: Sridhar Gaddam Date: Fri, 13 Sep 2019 23:14:08 +0530 Subject: [PATCH 8/9] Added sync.Mutex for protecting isGatewayNode and vxlanDevice --- pkg/routeagent/controllers/route/iptables.go | 29 +++++----- pkg/routeagent/controllers/route/route.go | 61 +++++++++++++------- pkg/routeagent/controllers/route/vxlan.go | 2 +- 3 files changed, 55 insertions(+), 37 deletions(-) diff --git a/pkg/routeagent/controllers/route/iptables.go b/pkg/routeagent/controllers/route/iptables.go index 75b2ebe09..e1ae1d0f2 100644 --- a/pkg/routeagent/controllers/route/iptables.go +++ b/pkg/routeagent/controllers/route/iptables.go @@ -80,18 +80,17 @@ func (r *Controller) prependUnique(ipt *iptables.IPTables, table string, chain s return fmt.Errorf("error 
listing the rules in %s chain: %v", chain, err) } - /* Submariner requires certain iptable rules to be programmed at the beginning of an iptables Chain so that we can - preserve the sourceIP for inter-cluster traffic and avoid K8s SDN making changes to the traffic. - - In this API, we check if the required iptable rule is present at the beginning of the chain. If the rule is - already present and there are no stale[1] flows, we simply return. If not, we create one. - - [1] Sometimes after we program the rule at the beginning of the chain, K8s SDN might insert some new rules - ahead of the rule that we programmed. In such cases, the rule that we programmed will not be the first rule - to hit and Submariner behavior might get affected. So, we query the rules in the chain to see if the rule - slipped its position, and if so, delete all such occurrences. We then re-program a new rule at the beginning - of the chain as required. - */ + // Submariner requires certain iptable rules to be programmed at the beginning of an iptables Chain + // so that we can preserve the sourceIP for inter-cluster traffic and avoid K8s SDN making changes + // to the traffic. + // In this API, we check if the required iptable rule is present at the beginning of the chain. + // If the rule is already present and there are no stale[1] flows, we simply return. If not, we create one. + // [1] Sometimes after we program the rule at the beginning of the chain, K8s SDN might insert some + // new rules ahead of the rule that we programmed. In such cases, the rule that we programmed will + // not be the first rule to hit and Submariner behavior might get affected. So, we query the rules + // in the chain to see if the rule slipped its position, and if so, delete all such occurrences. + // We then re-program a new rule at the beginning of the chain as required. 
+ isPresentAtRequiredPosition := false numOccurrences := 0 for index, rule := range rules { @@ -105,8 +104,8 @@ func (r *Controller) prependUnique(ipt *iptables.IPTables, table string, chain s } } - /* The required rule is present in the Chain, but either there are multiple occurrences or its not at the desired - location */ + // The required rule is present in the Chain, but either there are multiple occurrences or its + // not at the desired location if numOccurrences > 1 || isPresentAtRequiredPosition == false { for i := 0; i < numOccurrences; i++ { if err = ipt.Delete(table, chain, ruleSpec...); err != nil { @@ -115,7 +114,7 @@ func (r *Controller) prependUnique(ipt *iptables.IPTables, table string, chain s } } - /* The required rule is present only once and is at the desired location */ + // The required rule is present only once and is at the desired location if numOccurrences == 1 && isPresentAtRequiredPosition == true { klog.V(4).Infof("In %s table, iptables rule \"%s\", already exists.", table, strings.Join(ruleSpec, " ")) return nil diff --git a/pkg/routeagent/controllers/route/route.go b/pkg/routeagent/controllers/route/route.go index 142550386..d7d982067 100644 --- a/pkg/routeagent/controllers/route/route.go +++ b/pkg/routeagent/controllers/route/route.go @@ -54,8 +54,9 @@ type Controller struct { localServiceCidr []string remoteSubnets *util.StringSet - vxlanDevice *vxLanIface - remoteVTEPs *util.StringSet + gwVxLanMutex *sync.Mutex + vxlanDevice *vxLanIface + remoteVTEPs *util.StringSet isGatewayNode bool defaultHostIface *net.Interface @@ -67,23 +68,24 @@ const ( VxInterfaceGateway = 1 VxLANPort = 4800 - /* Why VxLANVTepNetworkPrefix is 240? - On VxLAN interfaces we need a unique IPAddress which does not collide with the - host ip-address. This is going to be tricky as currently there is no specific - CIDR in K8s that can be used for this purpose. If really required, this can be - taken as an input from the user (i.e., as a configuration parameter). 
- But we want to avoid any additional inputs particularly if there is a way to automate it. - - So, the approach we are taking is to derive the VxLAN ip from the hostIPAddress as shown below. - Example: Say the host ipaddress is "192.168.1.100/16", we prepend 240 to the host-ip address - and configure it on the VxLAN interface (i.e., 240.168.1.100/16). - - The reason behind choosing 240 is that "240.0.0.0/4" is a Reserved IPAddress [*] which - normally will not be assigned on any of the hosts. Also, note that the VxLAN IPs that are - so configured are only used within the local cluster and traffic will not leave the cluster - with the VxLAN ipaddress. - - [*] https://en.wikipedia.org/wiki/Reserved_IP_addresses */ + // Why VxLANVTepNetworkPrefix is 240? + // On VxLAN interfaces we need a unique IPAddress which does not collide with the + // host ip-address. This is going to be tricky as currently there is no specific + // CIDR in K8s that can be used for this purpose. One option is to take this as an + // input from the user (i.e., as a configuration parameter), but we want to avoid + // any additional inputs particularly if there is a way to automate it. + + // So, the approach we are taking is to derive the VxLAN ip from the hostIPAddress + // as shown below. + // For example: Say, the host ipaddress is "192.168.1.100/16", we prepend 240 to the + // host-ip address, derive the vxlan vtepIP (i.e., 240.168.1.100/16) and configure it + // on the VxLAN interface. + + // The reason behind choosing 240 is that "240.0.0.0/4" is a Reserved IPAddress [*] + // which normally will not be assigned on any of the hosts. Also, note that the VxLAN + // IPs are only used within the local cluster and traffic will not leave the cluster + // with the VxLAN ipaddress. 
+ // [*] https://en.wikipedia.org/wiki/Reserved_IP_addresses VxLANVTepNetworkPrefix = 240 SmPostRoutingChain = "SUBMARINER-POSTROUTING" @@ -106,6 +108,7 @@ func NewController(clusterID string, ClusterCidr []string, ServiceCidr []string, clustersSynced: config.ClusterInformer.Informer().HasSynced, endpointsSynced: config.EndpointInformer.Informer().HasSynced, smRouteAgentPodsSynced: config.PodInformer.Informer().HasSynced, + gwVxLanMutex: &sync.Mutex{}, clusterWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Clusters"), endpointWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Endpoints"), podWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Pods"), @@ -388,6 +391,9 @@ func (r *Controller) processNextPod() bool { klog.V(4).Infof("In processNextPod, POD HostIP is %s", pod.Status.HostIP) r.populateRemoteVtepIps(pod.Status.PodIP) + r.gwVxLanMutex.Lock() + defer r.gwVxLanMutex.Unlock() + // A new Node (identified via a Submariner-route-agent daemonset pod event) is added to the cluster. // On the GatewayDevice, update the vxlan fdb entry (i.e., remote Vtep) for the newly added node. 
if r.isGatewayNode { @@ -452,8 +458,12 @@ func (r *Controller) processNextEndpoint() bool { // If the endpoint hostname matches with our hostname, it implies we are on gateway node if endpoint.Spec.Hostname == hostname { r.cleanRoutes() + r.gwVxLanMutex.Lock() + defer r.gwVxLanMutex.Unlock() + r.isGatewayNode = true - if r.createVxLANInterface(VxInterfaceGateway, nil) != nil { + err = r.createVxLANInterface(VxInterfaceGateway, nil) + if err != nil { klog.Fatalf("Unable to create VxLAN interface on GatewayNode (%s): %v", hostname, err) } klog.V(6).Infof("not reconciling routes because we appear to be the gateway host") @@ -466,10 +476,13 @@ func (r *Controller) processNextEndpoint() bool { return fmt.Errorf("failed to derive the remoteVtepIP %v", err) } + r.gwVxLanMutex.Lock() r.isGatewayNode = false - if r.createVxLANInterface(VxInterfaceWorker, localClusterGwNodeIP) != nil { + err = r.createVxLANInterface(VxInterfaceWorker, localClusterGwNodeIP) + if err != nil { klog.Fatalf("Unable to create VxLAN interface on non-GatewayNode (%s): %v", endpoint.Spec.Hostname, err) } + r.gwVxLanMutex.Unlock() r.cleanXfrmPolicies() err = r.reconcileRoutes(remoteVtepIP) @@ -557,6 +570,9 @@ func (r *Controller) handleRemovedEndpoint(obj interface{}) { } if object.Spec.ClusterID == r.clusterID { + r.gwVxLanMutex.Lock() + defer r.gwVxLanMutex.Unlock() + klog.V(6).Infof("Endpoint matches the cluster ID of this cluster") err := r.vxlanDevice.deleteVxLanIface() if err != nil { @@ -579,6 +595,9 @@ func (r *Controller) handleRemovedPod(obj interface{}) { if r.remoteVTEPs.Contains(pod.Status.HostIP) { r.remoteVTEPs.Delete(pod.Status.HostIP) + r.gwVxLanMutex.Lock() + defer r.gwVxLanMutex.Unlock() + if r.isGatewayNode && r.vxlanDevice != nil { ret := r.vxlanDevice.DelFDB(net.ParseIP(pod.Status.PodIP), "00:00:00:00:00:00") if ret != nil { diff --git a/pkg/routeagent/controllers/route/vxlan.go b/pkg/routeagent/controllers/route/vxlan.go index 0754821d1..f7a81cd51 100644 --- 
a/pkg/routeagent/controllers/route/vxlan.go +++ b/pkg/routeagent/controllers/route/vxlan.go @@ -53,7 +53,7 @@ func createVxLanIface(iface *vxLanIface) error { // Get the properties of existing vxlan interface existing, err := netlink.LinkByName(iface.link.Name) if err != nil { - return err + return fmt.Errorf("failed to retrieve link info: %v", err) } if isVxlanConfigTheSame(iface.link, existing) { From dce88ec2ee334f83e7a06c64440cf8898734c60d Mon Sep 17 00:00:00 2001 From: Sridhar Gaddam Date: Mon, 16 Sep 2019 09:16:36 +0530 Subject: [PATCH 9/9] Addressed golangci-lint errors --- pkg/routeagent/controllers/route/iptables.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/routeagent/controllers/route/iptables.go b/pkg/routeagent/controllers/route/iptables.go index e1ae1d0f2..20fa82e34 100644 --- a/pkg/routeagent/controllers/route/iptables.go +++ b/pkg/routeagent/controllers/route/iptables.go @@ -95,7 +95,7 @@ func (r *Controller) prependUnique(ipt *iptables.IPTables, table string, chain s numOccurrences := 0 for index, rule := range rules { if strings.Contains(rule, strings.Join(ruleSpec, " ")) { - klog.V(4).Infof("In %s table, iptables rule \"%s\", exists at index %d.", index, table, strings.Join(ruleSpec, " ")) + klog.V(4).Infof("In %s table, iptables rule \"%s\", exists at index %d.", table, strings.Join(ruleSpec, " "), index) numOccurrences++ if index == 1 { @@ -106,7 +106,7 @@ func (r *Controller) prependUnique(ipt *iptables.IPTables, table string, chain s // The required rule is present in the Chain, but either there are multiple occurrences or its // not at the desired location - if numOccurrences > 1 || isPresentAtRequiredPosition == false { + if numOccurrences > 1 || !isPresentAtRequiredPosition { for i := 0; i < numOccurrences; i++ { if err = ipt.Delete(table, chain, ruleSpec...); err != nil { return fmt.Errorf("error deleting stale iptable rule \"%s\": %v", strings.Join(ruleSpec, " "), err) @@ -115,7 +115,7 @@ func (r 
*Controller) prependUnique(ipt *iptables.IPTables, table string, chain s } // The required rule is present only once and is at the desired location - if numOccurrences == 1 && isPresentAtRequiredPosition == true { + if numOccurrences == 1 && isPresentAtRequiredPosition { klog.V(4).Infof("In %s table, iptables rule \"%s\", already exists.", table, strings.Join(ruleSpec, " ")) return nil } else {