diff --git a/go.mod b/go.mod index da02f1549..16d7f8842 100644 --- a/go.mod +++ b/go.mod @@ -29,6 +29,7 @@ require ( github.com/vishvananda/netlink v1.0.0 github.com/vishvananda/netns v0.0.0-20180720170159-13995c7128cc // indirect golang.org/x/oauth2 v0.0.0-20170412232759-a6bd8cefa181 // indirect + golang.org/x/sys v0.0.0-20190606165138-5da285871e9c golang.org/x/time v0.0.0-20161028155119-f51c12702a4d // indirect google.golang.org/appengine v1.6.1 // indirect gopkg.in/inf.v0 v0.0.0-20150911125757-3887ee99ecf0 // indirect diff --git a/pkg/cableengine/ipsec/ipsec.go b/pkg/cableengine/ipsec/ipsec.go index 248c29eff..6e88be251 100644 --- a/pkg/cableengine/ipsec/ipsec.go +++ b/pkg/cableengine/ipsec/ipsec.go @@ -2,12 +2,9 @@ package ipsec import ( "fmt" - "net" "reflect" - "strings" "sync" - "github.com/coreos/go-iptables/iptables" "github.com/submariner-io/submariner/pkg/cableengine" "github.com/submariner-io/submariner/pkg/types" "github.com/submariner-io/submariner/pkg/util" @@ -39,62 +36,6 @@ func NewEngine(localSubnets []string, localCluster types.SubmarinerCluster, loca func (i *engine) StartEngine() error { klog.Infof("Starting IPSec Engine (Charon)") - ifi, err := util.GetDefaultGatewayInterface() - if err != nil { - return err - } - - klog.V(8).Infof("Device of default gateway interface was %s", ifi.Name) - ipt, err := iptables.New() - if err != nil { - return fmt.Errorf("error while initializing iptables: %v", err) - } - - klog.V(6).Infof("Installing/ensuring the SUBMARINER-POSTROUTING and SUBMARINER-FORWARD chains") - if err = ipt.NewChain("nat", "SUBMARINER-POSTROUTING"); err != nil { - klog.Errorf("Unable to create SUBMARINER-POSTROUTING chain in iptables: %v", err) - } - - if err = ipt.NewChain("filter", "SUBMARINER-FORWARD"); err != nil { - klog.Errorf("Unable to create SUBMARINER-FORWARD chain in iptables: %v", err) - } - - forwardToSubPostroutingRuleSpec := []string{"-j", "SUBMARINER-POSTROUTING"} - if err = ipt.AppendUnique("nat", "POSTROUTING", 
forwardToSubPostroutingRuleSpec...); err != nil { - klog.Errorf("Unable to append iptables rule \"%s\": %v\n", strings.Join(forwardToSubPostroutingRuleSpec, " "), err) - } - - forwardToSubForwardRuleSpec := []string{"-j", "SUBMARINER-FORWARD"} - rules, err := ipt.List("filter", "FORWARD") - if err != nil { - return fmt.Errorf("error listing the rules in FORWARD chain: %v", err) - } - - appendAt := len(rules) + 1 - insertAt := appendAt - for i, rule := range rules { - if rule == "-A FORWARD -j SUBMARINER-FORWARD" { - insertAt = -1 - break - } else if rule == "-A FORWARD -j REJECT --reject-with icmp-host-prohibited" { - insertAt = i - break - } - } - - if insertAt == appendAt { - // Append the rule at the end of FORWARD Chain. - if err = ipt.Append("filter", "FORWARD", forwardToSubForwardRuleSpec...); err != nil { - klog.Errorf("Unable to append iptables rule \"%s\": %v\n", strings.Join(forwardToSubForwardRuleSpec, " "), err) - } - } else if insertAt > 0 { - // Insert the rule in the FORWARD Chain. 
- if err = ipt.Insert("filter", "FORWARD", insertAt, forwardToSubForwardRuleSpec...); err != nil { - klog.Errorf("Unable to insert iptables rule \"%s\" at position %d: %v\n", strings.Join(forwardToSubForwardRuleSpec, " "), - insertAt, err) - } - } - return i.driver.Init() } @@ -131,66 +72,7 @@ func (i *engine) InstallCable(endpoint types.SubmarinerEndpoint) error { if err != nil { return err } - - ifi, err := util.GetDefaultGatewayInterface() - if err != nil { - return err - } - klog.V(4).Infof("Device of default gateway interface was %s", ifi.Name) - ipt, err := iptables.New() - if err != nil { - return fmt.Errorf("error while initializing iptables: %v", err) - } - - addresses, err := ifi.Addrs() - if err != nil { - return err - } - - for _, addr := range addresses { - ipAddr, ipNet, err := net.ParseCIDR(addr.String()) - if err != nil { - klog.Errorf("Error while parsing CIDR %s: %v", addr.String(), err) - continue - } - - if ipAddr.To4() != nil { - for _, subnet := range endpoint.Spec.Subnets { - ruleSpec := []string{"-s", ipNet.String(), "-d", subnet, "-i", ifi.Name, "-j", "ACCEPT"} - klog.V(8).Infof("Installing iptables rule: %s", strings.Join(ruleSpec, " ")) - if err = ipt.AppendUnique("filter", "SUBMARINER-FORWARD", ruleSpec...); err != nil { - klog.Errorf("error appending iptables rule \"%s\": %v\n", strings.Join(ruleSpec, " "), err) - } - - ruleSpec = []string{"-d", ipNet.String(), "-s", subnet, "-i", ifi.Name, "-j", "ACCEPT"} - klog.V(8).Infof("Installing iptables rule: %v", ruleSpec) - if err = ipt.AppendUnique("filter", "SUBMARINER-FORWARD", ruleSpec...); err != nil { - klog.Errorf("error appending iptables rule \"%s\": %v\n", strings.Join(ruleSpec, " "), err) - } - - // -t nat -I POSTROUTING -s -d -j SNAT --to-source - ruleSpec = []string{"-s", ipNet.String(), "-d", subnet, "-j", "SNAT", "--to-source", ipAddr.String()} - klog.V(8).Infof("Installing iptables rule: %s", strings.Join(ruleSpec, " ")) - if err = ipt.AppendUnique("nat", 
"SUBMARINER-POSTROUTING", ruleSpec...); err != nil { - klog.Errorf("error appending iptables rule \"%s\": %v\n", strings.Join(ruleSpec, " "), err) - } - } - } else { - klog.V(6).Infof("Skipping adding rule because IPv6 network %s found", ipNet.String()) - } - } - - // MASQUERADE (on the GatewayNode) the incoming traffic from the remote cluster (i.e, remoteEndpointIP) - // and destined to the local PODs (i.e., localSubnet) scheduled on the non-gateway node. - // This will make the return traffic from the POD to go via the GatewayNode. - for _, localSubnet := range i.localSubnets { - ruleSpec := []string{"-s", remoteEndpointIP, "-d", localSubnet, "-j", "MASQUERADE"} - klog.V(8).Infof("Installing iptables rule for MASQ incoming traffic: %v", ruleSpec) - if err = ipt.AppendUnique("nat", "SUBMARINER-POSTROUTING", ruleSpec...); err != nil { - klog.Errorf("error appending iptables MASQ rule \"%s\": %v\n", strings.Join(ruleSpec, " "), err) - } - } - + klog.V(4).Infof("Connected to remoteEndpointIP %s", remoteEndpointIP) return nil } diff --git a/pkg/routeagent/controllers/route/iptables.go b/pkg/routeagent/controllers/route/iptables.go new file mode 100644 index 000000000..20fa82e34 --- /dev/null +++ b/pkg/routeagent/controllers/route/iptables.go @@ -0,0 +1,144 @@ +package route + +import ( + "fmt" + "strconv" + "strings" + + "github.com/coreos/go-iptables/iptables" + "k8s.io/klog" +) + +func (r *Controller) createIPTableChains() error { + ipt, err := iptables.New() + if err != nil { + return fmt.Errorf("error initializing iptables: %v", err) + } + + klog.V(4).Infof("Install/ensure %s chain exists", SmPostRoutingChain) + if err = r.createChainIfNotExists(ipt, "nat", SmPostRoutingChain); err != nil { + return fmt.Errorf("Unable to create %s chain in iptables: %v", SmPostRoutingChain, err) + } + + klog.V(4).Infof("Insert %s rule that has rules for inter-cluster traffic", SmPostRoutingChain) + forwardToSubPostroutingRuleSpec := []string{"-j", SmPostRoutingChain} + if err = 
r.prependUnique(ipt, "nat", "POSTROUTING", forwardToSubPostroutingRuleSpec); err != nil { + klog.Errorf("Unable to insert iptable rule in NAT table, POSTROUTING chain: %v", err) + } + + klog.V(4).Infof("Install/ensure SUBMARINER-INPUT chain exists") + if err = r.createChainIfNotExists(ipt, "filter", "SUBMARINER-INPUT"); err != nil { + return fmt.Errorf("Unable to create SUBMARINER-INPUT chain in iptables: %v", err) + } + + forwardToSubInputRuleSpec := []string{"-p", "udp", "-m", "udp", "-j", "SUBMARINER-INPUT"} + if err = ipt.AppendUnique("filter", "INPUT", forwardToSubInputRuleSpec...); err != nil { + klog.Errorf("Unable to append iptables rule \"%s\": %v\n", strings.Join(forwardToSubInputRuleSpec, " "), err) + } + + klog.V(4).Infof("Allow VxLAN incoming traffic in SUBMARINER-INPUT Chain") + ruleSpec := []string{"-p", "udp", "-m", "udp", "--dport", strconv.Itoa(VxLANPort), "-j", "ACCEPT"} + if err = ipt.AppendUnique("filter", "SUBMARINER-INPUT", ruleSpec...); err != nil { + klog.Errorf("Unable to append iptables rule \"%s\": %v\n", strings.Join(ruleSpec, " "), err) + } + + klog.V(4).Infof("Insert rule to allow traffic over %s interface in FORWARDing Chain", VxLANIface) + ruleSpec = []string{"-o", VxLANIface, "-j", "ACCEPT"} + if err = r.prependUnique(ipt, "filter", "FORWARD", ruleSpec); err != nil { + klog.Errorf("Unable to insert iptable rule in filter table to allow vxlan traffic: %v", err) + } + + return nil +} + +func (r *Controller) programIptableRulesForInterClusterTraffic(remoteCidrBlock string) error { + ipt, err := iptables.New() + if err != nil { + return fmt.Errorf("error initializing iptables: %v", err) + } + + for _, localClusterCidr := range r.localClusterCidr { + ruleSpec := []string{"-s", localClusterCidr, "-d", remoteCidrBlock, "-j", "ACCEPT"} + klog.V(4).Infof("Installing iptables rule for outgoing traffic: %s", strings.Join(ruleSpec, " ")) + if err = ipt.AppendUnique("nat", SmPostRoutingChain, ruleSpec...); err != nil { + return 
fmt.Errorf("error appending iptables rule \"%s\": %v\n", strings.Join(ruleSpec, " "), err) + } + + // TODO: revisit, we only have to program rules to allow traffic from the podCidr + ruleSpec = []string{"-s", remoteCidrBlock, "-d", localClusterCidr, "-j", "ACCEPT"} + klog.V(4).Infof("Installing iptables rule for incoming traffic: %s", strings.Join(ruleSpec, " ")) + if err = ipt.AppendUnique("nat", SmPostRoutingChain, ruleSpec...); err != nil { + return fmt.Errorf("error appending iptables rule \"%s\": %v\n", strings.Join(ruleSpec, " "), err) + } + } + return nil +} + +func (r *Controller) prependUnique(ipt *iptables.IPTables, table string, chain string, ruleSpec []string) error { + rules, err := ipt.List(table, chain) + if err != nil { + return fmt.Errorf("error listing the rules in %s chain: %v", chain, err) + } + + // Submariner requires certain iptable rules to be programmed at the beginning of an iptables Chain + // so that we can preserve the sourceIP for inter-cluster traffic and avoid K8s SDN making changes + // to the traffic. + // In this API, we check if the required iptable rule is present at the beginning of the chain. + // If the rule is already present and there are no stale[1] flows, we simply return. If not, we create one. + // [1] Sometimes after we program the rule at the beginning of the chain, K8s SDN might insert some + // new rules ahead of the rule that we programmed. In such cases, the rule that we programmed will + // not be the first rule to hit and Submariner behavior might get affected. So, we query the rules + // in the chain to see if the rule slipped its position, and if so, delete all such occurrences. + // We then re-program a new rule at the beginning of the chain as required. 
+ + isPresentAtRequiredPosition := false + numOccurrences := 0 + for index, rule := range rules { + if strings.Contains(rule, strings.Join(ruleSpec, " ")) { + klog.V(4).Infof("In %s table, iptables rule \"%s\", exists at index %d.", table, strings.Join(ruleSpec, " "), index) + numOccurrences++ + + if index == 1 { + isPresentAtRequiredPosition = true + } + } + } + + // The required rule is present in the Chain, but either there are multiple occurrences or its + // not at the desired location + if numOccurrences > 1 || !isPresentAtRequiredPosition { + for i := 0; i < numOccurrences; i++ { + if err = ipt.Delete(table, chain, ruleSpec...); err != nil { + return fmt.Errorf("error deleting stale iptable rule \"%s\": %v", strings.Join(ruleSpec, " "), err) + } + } + } + + // The required rule is present only once and is at the desired location + if numOccurrences == 1 && isPresentAtRequiredPosition { + klog.V(4).Infof("In %s table, iptables rule \"%s\", already exists.", table, strings.Join(ruleSpec, " ")) + return nil + } else { + if err = ipt.Insert(table, chain, 1, ruleSpec...); err != nil { + return err + } + } + + return nil +} + +func (r *Controller) createChainIfNotExists(ipt *iptables.IPTables, table, chain string) error { + existingChains, err := ipt.ListChains(table) + if err != nil { + return err + } + + for _, val := range existingChains { + if val == chain { + // Chain already exists + return nil + } + } + + return ipt.NewChain(table, chain) +} diff --git a/pkg/routeagent/controllers/route/route.go b/pkg/routeagent/controllers/route/route.go index f22707ec7..d7d982067 100644 --- a/pkg/routeagent/controllers/route/route.go +++ b/pkg/routeagent/controllers/route/route.go @@ -2,8 +2,11 @@ package route import ( "fmt" + "io/ioutil" "net" "os" + "strconv" + "strings" "sync" "syscall" "time" @@ -11,46 +14,107 @@ import ( v1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1" clientset "github.com/submariner-io/submariner/pkg/client/clientset/versioned" 
informers "github.com/submariner-io/submariner/pkg/client/informers/externalversions/submariner.io/v1" + "github.com/submariner-io/submariner/pkg/util" "github.com/vishvananda/netlink" + "golang.org/x/sys/unix" + k8sv1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" + podinformer "k8s.io/client-go/informers/core/v1" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "k8s.io/klog" ) +type InformerConfigStruct struct { + SubmarinerClientSet clientset.Interface + ClientSet kubernetes.Interface + ClusterInformer informers.ClusterInformer + EndpointInformer informers.EndpointInformer + PodInformer podinformer.PodInformer +} + type Controller struct { clusterID string objectNamespace string - submarinerClientSet clientset.Interface - clustersSynced cache.InformerSynced - endpointsSynced cache.InformerSynced + submarinerClientSet clientset.Interface + clientSet kubernetes.Interface + clustersSynced cache.InformerSynced + endpointsSynced cache.InformerSynced + smRouteAgentPodsSynced cache.InformerSynced clusterWorkqueue workqueue.RateLimitingInterface endpointWorkqueue workqueue.RateLimitingInterface + podWorkqueue workqueue.RateLimitingInterface + + localClusterCidr []string + localServiceCidr []string + remoteSubnets *util.StringSet - gw net.IP - subnets []string + gwVxLanMutex *sync.Mutex + vxlanDevice *vxLanIface + remoteVTEPs *util.StringSet - link *net.Interface + isGatewayNode bool + defaultHostIface *net.Interface } -func NewController(clusterID string, objectNamespace string, link *net.Interface, submarinerClientSet clientset.Interface, - clusterInformer informers.ClusterInformer, endpointInformer informers.EndpointInformer) *Controller { +const ( + VxLANIface = "vx-submariner" + VxInterfaceWorker = 0 + VxInterfaceGateway = 1 + VxLANPort = 4800 + + // Why VxLANVTepNetworkPrefix is 240? 
+ // On VxLAN interfaces we need a unique IPAddress which does not collide with the + // host ip-address. This is going to be tricky as currently there is no specific + // CIDR in K8s that can be used for this purpose. One option is to take this as an + // input from the user (i.e., as a configuration parameter), but we want to avoid + // any additional inputs particularly if there is a way to automate it. + + // So, the approach we are taking is to derive the VxLAN ip from the hostIPAddress + // as shown below. + // For example: Say, the host ipaddress is "192.168.1.100/16", we prepend 240 to the + // host-ip address, derive the vxlan vtepIP (i.e., 240.168.1.100/16) and configure it + // on the VxLAN interface. + + // The reason behind choosing 240 is that "240.0.0.0/4" is a Reserved IPAddress [*] + // which normally will not be assigned on any of the hosts. Also, note that the VxLAN + // IPs are only used within the local cluster and traffic will not leave the cluster + // with the VxLAN ipaddress. 
+ // [*] https://en.wikipedia.org/wiki/Reserved_IP_addresses + + VxLANVTepNetworkPrefix = 240 + SmPostRoutingChain = "SUBMARINER-POSTROUTING" + SmRouteAgentFilter = "app=submariner-routeagent" +) + +func NewController(clusterID string, ClusterCidr []string, ServiceCidr []string, objectNamespace string, + link *net.Interface, config InformerConfigStruct) *Controller { controller := Controller{ - clusterID: clusterID, - objectNamespace: objectNamespace, - submarinerClientSet: submarinerClientSet, - link: link, - clustersSynced: clusterInformer.Informer().HasSynced, - endpointsSynced: endpointInformer.Informer().HasSynced, - clusterWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Clusters"), - endpointWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Endpoints"), + clusterID: clusterID, + objectNamespace: objectNamespace, + localClusterCidr: ClusterCidr, + localServiceCidr: ServiceCidr, + submarinerClientSet: config.SubmarinerClientSet, + clientSet: config.ClientSet, + defaultHostIface: link, + isGatewayNode: false, + remoteSubnets: util.NewStringSet(), + remoteVTEPs: util.NewStringSet(), + clustersSynced: config.ClusterInformer.Informer().HasSynced, + endpointsSynced: config.EndpointInformer.Informer().HasSynced, + smRouteAgentPodsSynced: config.PodInformer.Informer().HasSynced, + gwVxLanMutex: &sync.Mutex{}, + clusterWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Clusters"), + endpointWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Endpoints"), + podWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Pods"), } - clusterInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + config.ClusterInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: controller.enqueueCluster, UpdateFunc: func(old, new interface{}) { 
controller.enqueueCluster(new) @@ -58,7 +122,7 @@ func NewController(clusterID string, objectNamespace string, link *net.Interface DeleteFunc: controller.handleRemovedCluster, }) - endpointInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + config.EndpointInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: controller.enqueueEndpoint, UpdateFunc: func(old, new interface{}) { controller.enqueueEndpoint(new) @@ -66,6 +130,14 @@ func NewController(clusterID string, objectNamespace string, link *net.Interface DeleteFunc: controller.handleRemovedEndpoint, }) + config.PodInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: controller.enqueuePod, + UpdateFunc: func(old, new interface{}) { + controller.enqueuePod(new) + }, + DeleteFunc: controller.handleRemovedPod, + }) + return &controller } @@ -75,31 +147,48 @@ func (r *Controller) Run(stopCh <-chan struct{}) error { defer utilruntime.HandleCrash() // Start the informer factories to begin populating the informer caches - klog.Info("Starting Route Controller") + klog.Infof("Starting Route Controller. ClusterID: %s, localClusterCIDR: %v, localServiceCIDR: %v", r.clusterID, r.localClusterCidr, r.localServiceCidr) // Wait for the caches to be synced before starting workers - klog.Info("Waiting for endpoint informer caches to sync") - if ok := cache.WaitForCacheSync(stopCh, r.endpointsSynced, r.clustersSynced); !ok { + klog.Info("Waiting for endpoint informer caches to sync.") + if ok := cache.WaitForCacheSync(stopCh, r.endpointsSynced, r.clustersSynced, r.smRouteAgentPodsSynced); !ok { return fmt.Errorf("failed to wait for caches to sync") } - // let's go ahead and pre-populate clusters + // Create the necessary IPTable chains in the filter and nat tables. + err := r.createIPTableChains() + if err != nil { + return fmt.Errorf("createIPTableChains returned error. 
%v", err) + } + // let's go ahead and pre-populate clusters clusters, err := r.submarinerClientSet.SubmarinerV1().Clusters(r.objectNamespace).List(metav1.ListOptions{}) - if err != nil { - klog.Fatalf("error while retrieving all clusters: %v", err) + return fmt.Errorf("error while retrieving all clusters: %v", err) } + // Program iptables rules for traffic destined to all the remote cluster CIDRs for _, cluster := range clusters.Items { if cluster.Spec.ClusterID != r.clusterID { - r.populateCidrBlockList(append(cluster.Spec.ClusterCIDR, cluster.Spec.ServiceCIDR...)) + r.updateIptableRulesForInterclusterTraffic(append(cluster.Spec.ClusterCIDR, cluster.Spec.ServiceCIDR...)) } } + // Query all the submariner-route-agent daemonSet PODs running in the local cluster. + podList, err := r.clientSet.CoreV1().Pods(r.objectNamespace).List(metav1.ListOptions{LabelSelector: SmRouteAgentFilter}) + if err != nil { + return fmt.Errorf("error while retrieving submariner-route-agent pods: %v", err) + } + + for index, pod := range podList.Items { + klog.V(4).Infof("In %s, podIP of submariner-route-agent[%d] is %s", r.clusterID, index, pod.Status.PodIP) + r.populateRemoteVtepIps(pod.Status.PodIP) + } + klog.Info("Starting workers") go wait.Until(r.runClusterWorker, time.Second, stopCh) go wait.Until(r.runEndpointWorker, time.Second, stopCh) + go wait.Until(r.runPodWorker, time.Second, stopCh) wg.Wait() <-stopCh klog.Info("Shutting down workers") @@ -118,14 +207,30 @@ func (r *Controller) runEndpointWorker() { } } -func (r *Controller) populateCidrBlockList(inputCidrBlocks []string) { - for _, cidrBlock := range inputCidrBlocks { - if !containsString(r.subnets, cidrBlock) { - r.subnets = append(r.subnets, cidrBlock) +func (r *Controller) runPodWorker() { + for r.processNextPod() { + + } +} + +func (r *Controller) updateIptableRulesForInterclusterTraffic(inputCidrBlocks []string) { + for _, inputCidrBlock := range inputCidrBlocks { + if !r.remoteSubnets.Contains(inputCidrBlock) { + 
r.remoteSubnets.Add(inputCidrBlock) + err := r.programIptableRulesForInterClusterTraffic(inputCidrBlock) + if err != nil { + klog.Errorf("Failed to program iptable rule. %v", err) + } } } } +func (r *Controller) populateRemoteVtepIps(vtepIP string) { + if !r.remoteVTEPs.Contains(vtepIP) { + r.remoteVTEPs.Add(vtepIP) + } +} + func (r *Controller) processNextCluster() bool { obj, shutdown := r.clusterWorkqueue.Get() if shutdown { @@ -137,11 +242,11 @@ func (r *Controller) processNextCluster() bool { key := obj.(string) ns, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { - return fmt.Errorf("Error while splitting meta namespace key %s: %v", key, err) + return fmt.Errorf("error while splitting meta namespace key %s: %v", key, err) } cluster, err := r.submarinerClientSet.SubmarinerV1().Clusters(ns).Get(name, metav1.GetOptions{}) if err != nil { - return fmt.Errorf("Error retrieving submariner cluster object %s: %v", name, err) + return fmt.Errorf("error retrieving submariner cluster object %s: %v", name, err) } if cluster.Spec.ClusterID == r.clusterID { @@ -150,7 +255,7 @@ func (r *Controller) processNextCluster() bool { // no need to reconcile because this endpoint isn't ours } - r.populateCidrBlockList(append(cluster.Spec.ClusterCIDR, cluster.Spec.ServiceCIDR...)) + r.updateIptableRulesForInterclusterTraffic(append(cluster.Spec.ClusterCIDR, cluster.Spec.ServiceCIDR...)) r.clusterWorkqueue.Forget(obj) klog.V(4).Infof("cluster processed by route controller") @@ -165,6 +270,159 @@ func (r *Controller) processNextCluster() bool { return true } +func (r *Controller) getVxlanVtepIPAddress(ipAddr string) (net.IP, error) { + ipSlice := strings.Split(ipAddr, ".") + if len(ipSlice) < 4 { + return nil, fmt.Errorf("invalid ipAddr [%s]", ipAddr) + } + + ipSlice[0] = strconv.Itoa(VxLANVTepNetworkPrefix) + vxlanIP := net.ParseIP(strings.Join(ipSlice, ".")) + return vxlanIP, nil +} + +func (r *Controller) getHostIfaceIPAddress() (net.IP, *net.IPNet, error) { + addrs, 
err := r.defaultHostIface.Addrs() + if err != nil { + return nil, nil, err + } + + if len(addrs) > 0 { + for i := range addrs { + ipAddr, ipNetwork, err := net.ParseCIDR(addrs[i].String()) + if err != nil { + klog.Errorf("Unable to ParseCIDR : %v\n", addrs) + } + if ipAddr.To4() != nil { + return ipAddr, ipNetwork, nil + } + } + } + return nil, nil, nil +} + +func (r *Controller) createVxLANInterface(ifaceType int, gatewayNodeIP net.IP) error { + ipAddr, vtepMask, err := r.getHostIfaceIPAddress() + if err != nil { + return fmt.Errorf("unable to retrieve the IPv4 address on the Host %v", err) + } + + vtepIP, err := r.getVxlanVtepIPAddress(ipAddr.String()) + if err != nil { + return fmt.Errorf("failed to derive the vxlan vtepIP for %s, %v", ipAddr, err) + } + + if ifaceType == VxInterfaceGateway { + attrs := &vxLanAttributes{ + name: VxLANIface, + vxlanId: 100, + group: nil, + srcAddr: nil, + vtepPort: VxLANPort, + mtu: 1450, + } + + r.vxlanDevice, err = newVxlanIface(attrs) + if err != nil { + return fmt.Errorf("failed to create vxlan interface on Gateway Node: %v", err) + } + + for fdbAddress := range r.remoteVTEPs.Set { + err = r.vxlanDevice.AddFDB(net.ParseIP(fdbAddress), "00:00:00:00:00:00") + if err != nil { + return fmt.Errorf("failed to add FDB entry on the Gateway Node vxlan iface %v", err) + } + } + + // Enable loose mode (rp_filter=2) reverse path filtering on the vxlan interface. 
+ err = ioutil.WriteFile("/proc/sys/net/ipv4/conf/"+VxLANIface+"/rp_filter", []byte("2"), 0644) + if err != nil { + return fmt.Errorf("unable to update vxlan rp_filter proc entry, err: %s", err) + } else { + klog.Info("Successfully configured rp_filter to loose mode(2) ") + } + + } else if ifaceType == VxInterfaceWorker { + // non-Gateway/Worker Node + attrs := &vxLanAttributes{ + name: VxLANIface, + vxlanId: 100, + group: gatewayNodeIP, + srcAddr: vtepIP, + vtepPort: VxLANPort, + mtu: 1450, + } + + r.vxlanDevice, err = newVxlanIface(attrs) + if err != nil { + return fmt.Errorf("failed to create vxlan interface on non-Gateway Node: %v", err) + } + } + + err = r.vxlanDevice.configureIPAddress(vtepIP, vtepMask.Mask) + if err != nil { + return fmt.Errorf("failed to configure vxlan interface ipaddress on the Gateway Node %v", err) + } + + return nil +} + +func (r *Controller) processNextPod() bool { + obj, shutdown := r.podWorkqueue.Get() + if shutdown { + return false + } + err := func() error { + defer r.podWorkqueue.Done(obj) + + key := obj.(string) + ns, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + r.podWorkqueue.Forget(obj) + return fmt.Errorf("error while splitting meta namespace key %s: %v", key, err) + } + + pod, err := r.clientSet.CoreV1().Pods(ns).Get(name, metav1.GetOptions{}) + if err != nil { + r.podWorkqueue.Forget(obj) + return fmt.Errorf("error retrieving submariner-route-agent pod object %s: %v", name, err) + } + + klog.V(4).Infof("In processNextPod, POD HostIP is %s", pod.Status.HostIP) + r.populateRemoteVtepIps(pod.Status.PodIP) + + r.gwVxLanMutex.Lock() + defer r.gwVxLanMutex.Unlock() + + // A new Node (identified via a Submariner-route-agent daemonset pod event) is added to the cluster. + // On the GatewayDevice, update the vxlan fdb entry (i.e., remote Vtep) for the newly added node. 
+ if r.isGatewayNode { + if r.vxlanDevice != nil { + err := r.vxlanDevice.AddFDB(net.ParseIP(pod.Status.PodIP), "00:00:00:00:00:00") + if err != nil { + r.podWorkqueue.Forget(obj) + return fmt.Errorf("failed to add FDB entry on the Gateway Node vxlan iface %v", err) + } + } else { + r.podWorkqueue.AddRateLimited(obj) + klog.Errorf("vxlanDevice is not yet created on the Gateway node") + return nil + } + } + + r.podWorkqueue.Forget(obj) + klog.V(4).Infof("Pod event processed by route controller") + return nil + }() + + if err != nil { + utilruntime.HandleError(err) + return true + } + + return true +} + func (r *Controller) processNextEndpoint() bool { obj, shutdown := r.endpointWorkqueue.Get() if shutdown { @@ -177,11 +435,11 @@ func (r *Controller) processNextEndpoint() bool { key := obj.(string) ns, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { - return fmt.Errorf("Error while splitting meta namespace key %s: %v", key, err) + return fmt.Errorf("error while splitting meta namespace key %s: %v", key, err) } endpoint, err := r.submarinerClientSet.SubmarinerV1().Endpoints(ns).Get(name, metav1.GetOptions{}) if err != nil { - return fmt.Errorf("Error retrieving submariner endpoint object %s: %v", name, err) + return fmt.Errorf("error retrieving submariner endpoint object %s: %v", name, err) } if endpoint.Spec.ClusterID != r.clusterID { @@ -195,21 +453,42 @@ func (r *Controller) processNextEndpoint() bool { klog.Fatalf("unable to determine hostname: %v", err) } + klog.V(6).Infof("Local Cluster Gateway Node IP is %s", endpoint.Spec.PrivateIP) + + // If the endpoint hostname matches with our hostname, it implies we are on gateway node if endpoint.Spec.Hostname == hostname { r.cleanRoutes() + r.gwVxLanMutex.Lock() + defer r.gwVxLanMutex.Unlock() + + r.isGatewayNode = true + err = r.createVxLANInterface(VxInterfaceGateway, nil) + if err != nil { + klog.Fatalf("Unable to create VxLAN interface on GatewayNode (%s): %v", hostname, err) + } klog.V(6).Infof("not 
reconciling routes because we appear to be the gateway host") return nil } - klog.V(6).Infof("Setting gateway to gw: %s", endpoint.Spec.PrivateIP) + localClusterGwNodeIP := net.ParseIP(endpoint.Spec.PrivateIP) + remoteVtepIP, err := r.getVxlanVtepIPAddress(localClusterGwNodeIP.String()) + if err != nil { + return fmt.Errorf("failed to derive the remoteVtepIP %v", err) + } - r.gw = net.ParseIP(endpoint.Spec.PrivateIP) - r.cleanXfrmPolicies() - err = r.reconcileRoutes() + r.gwVxLanMutex.Lock() + r.isGatewayNode = false + err = r.createVxLANInterface(VxInterfaceWorker, localClusterGwNodeIP) + if err != nil { + klog.Fatalf("Unable to create VxLAN interface on non-GatewayNode (%s): %v", endpoint.Spec.Hostname, err) + } + r.gwVxLanMutex.Unlock() + r.cleanXfrmPolicies() + err = r.reconcileRoutes(remoteVtepIP) if err != nil { r.endpointWorkqueue.AddRateLimited(obj) - return fmt.Errorf("Error while reconciling routes %v", err) + return fmt.Errorf("error while reconciling routes %v", err) } r.endpointWorkqueue.Forget(obj) @@ -247,6 +526,22 @@ func (r *Controller) enqueueEndpoint(obj interface{}) { r.endpointWorkqueue.AddRateLimited(key) } +func (r *Controller) enqueuePod(obj interface{}) { + var key string + var err error + if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { + utilruntime.HandleError(err) + return + } + + pod := obj.(*k8sv1.Pod) + // Add the POD event to the workqueue only if the sm-route-agent podIP does not exist in the local cache. 
+ if !r.remoteVTEPs.Contains(pod.Status.HostIP) { + klog.V(4).Infof("Enqueueing sm-route-agent-pod event, ip: %s", pod.Status.HostIP) + r.podWorkqueue.AddRateLimited(key) + } +} + func (r *Controller) handleRemovedEndpoint(obj interface{}) { // ideally we should attempt to remove all routes if the endpoint matches our cluster ID var object *v1.Endpoint @@ -273,6 +568,20 @@ func (r *Controller) handleRemovedEndpoint(obj interface{}) { if object.Spec.Hostname == hostname { r.cleanRoutes() } + + if object.Spec.ClusterID == r.clusterID { + r.gwVxLanMutex.Lock() + defer r.gwVxLanMutex.Unlock() + + klog.V(6).Infof("Endpoint matches the cluster ID of this cluster") + err := r.vxlanDevice.deleteVxLanIface() + if err != nil { + klog.Errorf("Failed to delete the the vxlan interface on endpoint removal: %v", err) + return + } + r.vxlanDevice = nil + } + klog.V(4).Infof("Removed routes from host") } @@ -280,15 +589,33 @@ func (r *Controller) handleRemovedCluster(obj interface{}) { // ideally we should attempt to remove all routes if the endpoint matches our cluster ID } +func (r *Controller) handleRemovedPod(obj interface{}) { + klog.V(6).Infof("Removing podIP in route controller %v", obj) + pod := obj.(*k8sv1.Pod) + + if r.remoteVTEPs.Contains(pod.Status.HostIP) { + r.remoteVTEPs.Delete(pod.Status.HostIP) + r.gwVxLanMutex.Lock() + defer r.gwVxLanMutex.Unlock() + + if r.isGatewayNode && r.vxlanDevice != nil { + ret := r.vxlanDevice.DelFDB(net.ParseIP(pod.Status.PodIP), "00:00:00:00:00:00") + if ret != nil { + klog.Errorf("Failed to delete FDB entry on the Gateway Node vxlan iface %v", ret) + } + } + } +} + func (r *Controller) cleanRoutes() { - link, err := netlink.LinkByName(r.link.Name) + link, err := netlink.LinkByName(VxLANIface) if err != nil { - klog.Errorf("Error retrieving link by name %s: %v", r.link.Name, err) + klog.Errorf("Error retrieving link by name %s: %v", VxLANIface, err) return } currentRouteList, err := netlink.RouteList(link, syscall.AF_INET) if err != nil 
{ - klog.Errorf("Error retrieving routes: %v", err) + klog.Errorf("Error retrieving routes on the link %s: %v", VxLANIface, err) return } for _, route := range currentRouteList { @@ -296,7 +623,7 @@ func (r *Controller) cleanRoutes() { if route.Dst == nil || route.Gw == nil { klog.V(6).Infof("Found nil gw or dst") } else { - if containsString(r.subnets, route.Dst.String()) { + if r.remoteSubnets.Contains(route.Dst.String()) { klog.V(6).Infof("Removing route %s", route.String()) if err = netlink.RouteDel(&route); err != nil { klog.Errorf("Error removing route %s: %v", route.String(), err) @@ -324,16 +651,16 @@ func (r *Controller) cleanXfrmPolicies() { } // Reconcile the routes installed on this device using rtnetlink -func (r *Controller) reconcileRoutes() error { - link, err := netlink.LinkByName(r.link.Name) +func (r *Controller) reconcileRoutes(vxlanGw net.IP) error { + link, err := netlink.LinkByName(VxLANIface) if err != nil { - return fmt.Errorf("Error retrieving link by name %s: %v", r.link.Name, err) + return fmt.Errorf("error retrieving link by name %s: %v", VxLANIface, err) } currentRouteList, err := netlink.RouteList(link, syscall.AF_INET) if err != nil { - return fmt.Errorf("Error retrieving routes for link %s: %v", r.link.Name, err) + return fmt.Errorf("error retrieving routes for link %s: %v", VxLANIface, err) } // First lets delete all of the routes that don't match @@ -343,7 +670,7 @@ func (r *Controller) reconcileRoutes() error { if route.Dst == nil || route.Gw == nil { klog.V(6).Infof("Found nil gw or dst") } else { - if containsString(r.subnets, route.Dst.String()) && route.Gw.Equal(r.gw) { + if r.remoteSubnets.Contains(route.Dst.String()) && route.Gw.Equal(vxlanGw) { klog.V(6).Infof("Found route %s with gw %s already installed", route.String(), route.Gw.String()) } else { klog.V(6).Infof("Removing route %s", route.String()) @@ -357,11 +684,11 @@ func (r *Controller) reconcileRoutes() error { currentRouteList, err = netlink.RouteList(link, 
syscall.AF_INET) if err != nil { - return fmt.Errorf("Error retrieving routes for link %s: %v", r.link.Name, err) + return fmt.Errorf("error retrieving routes for link %s: %v", VxLANIface, err) } // let's now add the routes that are missing - for _, cidrBlock := range r.subnets { + for cidrBlock := range r.remoteSubnets.Set { _, dst, err := net.ParseCIDR(cidrBlock) if err != nil { klog.Errorf("Error parsing cidr block %s: %v", cidrBlock, err) @@ -369,15 +696,17 @@ func (r *Controller) reconcileRoutes() error { } route := netlink.Route{ Dst: dst, - Gw: r.gw, + Gw: vxlanGw, + Scope: unix.RT_SCOPE_UNIVERSE, LinkIndex: link.Attrs().Index, + Protocol: 4, } found := false for _, curRoute := range currentRouteList { if curRoute.Gw == nil || curRoute.Dst == nil { } else { - if curRoute.Gw.Equal(route.Gw) && curRoute.Dst == route.Dst { + if curRoute.Gw.Equal(route.Gw) && curRoute.Dst.String() == route.Dst.String() { klog.V(6).Infof("Found equivalent route, not adding") found = true } @@ -393,12 +722,3 @@ func (r *Controller) reconcileRoutes() error { } return nil } - -func containsString(c []string, s string) bool { - for _, v := range c { - if v == s { - return true - } - } - return false -} diff --git a/pkg/routeagent/controllers/route/route_test.go b/pkg/routeagent/controllers/route/route_test.go index edde3256c..ca1c4d810 100644 --- a/pkg/routeagent/controllers/route/route_test.go +++ b/pkg/routeagent/controllers/route/route_test.go @@ -1,44 +1,30 @@ package route import ( + "strconv" "testing" . "github.com/onsi/ginkgo" . 
"github.com/onsi/gomega" ) -var _ = Describe("Route", func() { - Describe("Function populateCidrBlockList", func() { - Context("When input CIDR blocks are not present in the existing subnets", func() { - It("Should append the CIDR blocks to subnets", func() { - routeController := Controller{subnets: []string{"192.168.1.0/24"}} - routeController.populateCidrBlockList([]string{"10.10.10.0/24", "192.168.1.0/24"}) - want := []string{"192.168.1.0/24", "10.10.10.0/24"} - Expect(routeController.subnets).To(Equal(want)) +var _ = Describe("getVxlanVtepIPAddress", func() { + Describe("Unit tests for getVxlanVtepIPAddress", func() { + Context("When a valid ipaddress is provided to getVxlanVtepIPAddress", func() { + It("Should return the VxLAN VtepIP that can be configured", func() { + routeController := Controller{} + vtepIP, _ := routeController.getVxlanVtepIPAddress("192.168.100.24") + Expect(vtepIP.String()).Should(Equal(strconv.Itoa(VxLANVTepNetworkPrefix) + ".168.100.24")) }) }) - Context("When input CIDR blocks are present in the existing subnets", func() { - It("Should not append the CIDR blocks to subnets", func() { - routeController := Controller{subnets: []string{"10.10.10.0/24"}} - routeController.populateCidrBlockList([]string{"10.10.10.0/24", "192.168.1.0/24"}) - want := []string{"10.10.10.0/24", "192.168.1.0/24"} - Expect(routeController.subnets).To(Equal(want)) - }) - }) - }) - Describe("Function containsString", func() { - Context("When the given array of strings contains specified string", func() { - It("Should return true", func() { - Expect(containsString([]string{"unit", "test"}, "unit")).To(BeTrue()) + Context("When an invalid ipaddress is provided to getVxlanVtepIPAddress", func() { + It("Should return an error", func() { + routeController := Controller{} + _, err := routeController.getVxlanVtepIPAddress("10.0.0") + Expect(err).ShouldNot(Equal(nil)) }) }) - Context("When the given array of strings does not contain specified string", func() { - It("Should 
return false", func() { - Expect(containsString([]string{"unit", "test"}, "ginkgo")).To(BeFalse()) - }) - - }) }) }) diff --git a/pkg/routeagent/controllers/route/vxlan.go b/pkg/routeagent/controllers/route/vxlan.go new file mode 100644 index 000000000..f7a81cd51 --- /dev/null +++ b/pkg/routeagent/controllers/route/vxlan.go @@ -0,0 +1,180 @@ +package route + +import ( + "fmt" + "net" + "syscall" + + "github.com/vishvananda/netlink" + "golang.org/x/sys/unix" + "k8s.io/klog" +) + +type vxLanAttributes struct { + name string + vxlanId int + group net.IP + srcAddr net.IP + vtepPort int + mtu int +} + +type vxLanIface struct { + link *netlink.Vxlan +} + +func newVxlanIface(attrs *vxLanAttributes) (*vxLanIface, error) { + iface := &netlink.Vxlan{ + LinkAttrs: netlink.LinkAttrs{ + Name: attrs.name, + MTU: attrs.mtu, + Flags: net.FlagUp, + }, + VxlanId: attrs.vxlanId, + SrcAddr: attrs.srcAddr, + Group: attrs.group, + Port: attrs.vtepPort, + } + + vxLANIface := &vxLanIface{ + link: iface, + } + + if err := createVxLanIface(vxLANIface); err != nil { + return nil, err + } + + return vxLANIface, nil +} + +func createVxLanIface(iface *vxLanIface) error { + err := netlink.LinkAdd(iface.link) + if err == syscall.EEXIST { + // Get the properties of existing vxlan interface + existing, err := netlink.LinkByName(iface.link.Name) + if err != nil { + return fmt.Errorf("failed to retrieve link info: %v", err) + } + + if isVxlanConfigTheSame(iface.link, existing) { + klog.V(6).Infof("VxLAN interface already exists with same configuration.") + iface.link = existing.(*netlink.Vxlan) + return nil + } + + // Config does not match, delete the existing interface and re-create it. 
+ if err = netlink.LinkDel(existing); err != nil { + return fmt.Errorf("failed to delete the existing vxlan interface: %v", err) + } + + if err = netlink.LinkAdd(iface.link); err != nil { + return fmt.Errorf("failed to re-create the vxlan interface: %v", err) + } + } else if err != nil { + return fmt.Errorf("failed to create the vxlan interface: %v", err) + } + + return nil +} + +func (iface *vxLanIface) deleteVxLanIface() error { + err := netlink.LinkDel(iface.link) + if err != nil { + return fmt.Errorf("failed to delete the vxlan interface: %v", err) + } + + return nil +} + +func isVxlanConfigTheSame(new, current netlink.Link) bool { + + required := new.(*netlink.Vxlan) + existing := current.(*netlink.Vxlan) + + if required.VxlanId != existing.VxlanId { + klog.V(4).Infof("VxlanId of existing interface (%d) does not match with required VxlanId (%d)", existing.VxlanId, required.VxlanId) + return false + } + + if len(required.Group) > 0 && len(existing.Group) > 0 && !required.Group.Equal(existing.Group) { + klog.V(4).Infof("Vxlan Group (%v) of existing interface does not match with required Group (%v)", existing.Group, required.Group) + return false + } + + if len(required.SrcAddr) > 0 && len(existing.SrcAddr) > 0 && !required.SrcAddr.Equal(existing.SrcAddr) { + klog.V(4).Infof("Vxlan SrcAddr (%v) of existing interface does not match with required SrcAddr (%v)", existing.SrcAddr, required.SrcAddr) + return false + } + + if required.Port > 0 && existing.Port > 0 && required.Port != existing.Port { + klog.V(4).Infof("Vxlan Port (%d) of existing interface does not match with required Port (%d)", existing.Port, required.Port) + return false + } + + return true +} + +func (iface *vxLanIface) configureIPAddress(ipAddress net.IP, mask net.IPMask) error { + ipConfig := &netlink.Addr{IPNet: &net.IPNet{ + IP: ipAddress, + Mask: mask, + }} + + err := netlink.AddrAdd(iface.link, ipConfig) + if err == syscall.EEXIST { + return nil + } else if err != nil { + return
fmt.Errorf("unable to configure address (%s) on vxlan interface (%s). %v", ipAddress, iface.link.Name, err) + } + return nil +} + +func (iface *vxLanIface) AddFDB(ipAddress net.IP, hwAddr string) error { + macAddr, err := net.ParseMAC(hwAddr) + if err != nil { + return fmt.Errorf("invalid MAC Address (%s) supplied. %v", hwAddr, err) + } + + neigh := &netlink.Neigh{ + LinkIndex: iface.link.Index, + Family: unix.AF_BRIDGE, + Flags: netlink.NTF_SELF, + Type: netlink.NDA_DST, + IP: ipAddress, + State: netlink.NUD_PERMANENT | netlink.NUD_NOARP, + HardwareAddr: macAddr, + } + + err = netlink.NeighAppend(neigh) + if err != nil { + return fmt.Errorf("unable to add the bridge fdb entry %v, err: %s", neigh, err) + } else { + klog.V(4).Infof("Successfully added the bridge fdb entry %v", neigh) + } + return nil +} + +func (iface *vxLanIface) DelFDB(ipAddress net.IP, hwAddr string) error { + macAddr, err := net.ParseMAC(hwAddr) + if err != nil { + return fmt.Errorf("invalid MAC Address (%s) supplied. 
%v", hwAddr, err) + } + + neigh := &netlink.Neigh{ + LinkIndex: iface.link.Index, + Family: unix.AF_BRIDGE, + Flags: netlink.NTF_SELF, + Type: netlink.NDA_DST, + IP: ipAddress, + State: netlink.NUD_PERMANENT | netlink.NUD_NOARP, + HardwareAddr: macAddr, + } + + err = netlink.NeighDel(neigh) + if err != nil { + return fmt.Errorf("unable to delete the bridge fdb entry %v, err: %s", neigh, err) + } else { + klog.V(4).Infof("Successfully deleted the bridge fdb entry %v", neigh) + } + return nil +} diff --git a/pkg/routeagent/main.go b/pkg/routeagent/main.go index 298241083..135a8159c 100644 --- a/pkg/routeagent/main.go +++ b/pkg/routeagent/main.go @@ -6,6 +6,10 @@ import ( "time" "github.com/kelseyhightower/envconfig" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "github.com/submariner-io/submariner/pkg/routeagent/controllers/route" "github.com/submariner-io/submariner/pkg/util" @@ -23,8 +27,14 @@ var ( ) type SubmarinerRouteControllerSpecification struct { - ClusterID string - Namespace string + ClusterID string + Namespace string + ClusterCidr []string + ServiceCidr []string +} + +func filterRouteAgentPods(options *v1.ListOptions) { + options.LabelSelector = route.SmRouteAgentFilter } func main() { @@ -53,10 +63,31 @@ func main() { submarinerInformerFactory := submarinerInformers.NewSharedInformerFactoryWithOptions(submarinerClient, time.Second*30, submarinerInformers.WithNamespace(srcs.Namespace)) + clientSet, err := kubernetes.NewForConfig(cfg) + if err != nil { + klog.Fatalf("Error building clientset: %s", err.Error()) + } + + informerFactory := informers.NewSharedInformerFactoryWithOptions(clientSet, time.Second*60, informers.WithNamespace(srcs.Namespace), informers.WithTweakListOptions(filterRouteAgentPods)) + + informerConfig := route.InformerConfigStruct{ + SubmarinerClientSet: submarinerClient, + ClientSet: clientSet, + ClusterInformer: submarinerInformerFactory.Submariner().V1().Clusters(), + 
EndpointInformer: submarinerInformerFactory.Submariner().V1().Endpoints(), + PodInformer: informerFactory.Core().V1().Pods(), + } + defLink, err := util.GetDefaultGatewayInterface() - routeController := route.NewController(srcs.ClusterID, srcs.Namespace, defLink, submarinerClient, submarinerInformerFactory.Submariner().V1().Clusters(), submarinerInformerFactory.Submariner().V1().Endpoints()) + if err != nil { + klog.Errorf("Unable to find the default interface on host: %s", err.Error()) + return + } + + routeController := route.NewController(srcs.ClusterID, srcs.ClusterCidr, srcs.ServiceCidr, srcs.Namespace, defLink, informerConfig) submarinerInformerFactory.Start(stopCh) + informerFactory.Start(stopCh) var wg sync.WaitGroup diff --git a/pkg/util/stringset.go b/pkg/util/stringset.go new file mode 100644 index 000000000..9ff06af62 --- /dev/null +++ b/pkg/util/stringset.go @@ -0,0 +1,47 @@ +package util + +import ( + "sync" +) + +type StringSet struct { + syncMutex *sync.Mutex + Set map[string]bool +} + +func NewStringSet() *StringSet { + return &StringSet{ + syncMutex: &sync.Mutex{}, + Set: make(map[string]bool)} +} + +func (set *StringSet) Add(s string) bool { + set.syncMutex.Lock() + defer set.syncMutex.Unlock() + + _, found := set.Set[s] + set.Set[s] = true + return !found +} + +func (set *StringSet) Contains(s string) bool { + set.syncMutex.Lock() + defer set.syncMutex.Unlock() + + _, found := set.Set[s] + return found +} + +func (set *StringSet) Size() int { + set.syncMutex.Lock() + defer set.syncMutex.Unlock() + + return len(set.Set) +} + +func (set *StringSet) Delete(s string) { + set.syncMutex.Lock() + defer set.syncMutex.Unlock() + + delete(set.Set, s) +} diff --git a/pkg/util/stringset_test.go b/pkg/util/stringset_test.go new file mode 100644 index 000000000..0d3e6959b --- /dev/null +++ b/pkg/util/stringset_test.go @@ -0,0 +1,55 @@ +package util + +import ( + "testing" + + . "github.com/onsi/ginkgo" + . 
"github.com/onsi/gomega" +) + +var _ = Describe("StringSet", func() { + Describe("Unit tests for NewStringSet", func() { + Context("When subnetList contains a specified string", func() { + It("Should return true", func() { + subnetList := NewStringSet() + subnetList.Add("192.168.1.0/24") + subnetList.Add("192.168.2.0/24") + Expect(subnetList.Contains("192.168.2.0/24")).Should(Equal(true)) + }) + }) + Context("When subnetList does not contain a specified string", func() { + It("Should return false", func() { + subnetList := NewStringSet() + subnetList.Add("192.168.1.0/24") + subnetList.Add("192.168.2.0/24") + Expect(subnetList.Contains("192.168.3.0/24")).Should(Equal(false)) + }) + }) + Context("When subnetList already has an entry", func() { + It("Should not append to the list", func() { + subnetList := NewStringSet() + subnetList.Add("192.168.1.0/24") + subnetList.Add("192.168.2.0/24") + subnetList.Add("192.168.3.0/24") + subnetList.Add("192.168.2.0/24") + Expect(subnetList.Size()).Should(Equal(3)) + }) + }) + Context("When an entry is deleted from subnetList", func() { + It("Should be removed from the list", func() { + subnetList := NewStringSet() + subnetList.Add("192.168.1.0/24") + subnetList.Add("192.168.2.0/24") + subnetList.Add("192.168.3.0/24") + subnetList.Delete("192.168.2.0/24") + Expect(subnetList.Contains("192.168.2.0/24")).Should(Equal(false)) + Expect(subnetList.Size()).Should(Equal(2)) + }) + }) + }) +}) + +func TestStringSet(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "StringSet Suite") +} diff --git a/test/e2e/framework/network_pods.go b/test/e2e/framework/network_pods.go index e9aa9cf9c..bb07145fb 100644 --- a/test/e2e/framework/network_pods.go +++ b/test/e2e/framework/network_pods.go @@ -70,7 +70,7 @@ func (f *Framework) CreateTCPCheckConnectorPod(cluster int, remoteCheckPod *v1.P Image: "busybox", // We send the string 50 times to put more pressure on the TCP connection and avoid limited // resource environments from not sending 
at least some data before timeout. - Command: []string{"sh", "-c", "for in in $(seq 50); do echo connector says $SEND_STRING; done | nc -v $REMOTE_IP $REMOTE_PORT -w 5 >/dev/termination-log 2>&1"}, + Command: []string{"sh", "-c", "for in in $(seq 50); do echo connector says $SEND_STRING; done | nc -v $REMOTE_IP $REMOTE_PORT -w 8 >/dev/termination-log 2>&1"}, Env: []v1.EnvVar{ {Name: "REMOTE_PORT", Value: strconv.Itoa(TestPort)}, {Name: "SEND_STRING", Value: sendString},