Skip to content

Commit

Permalink
Added sync.Mutex for protecting isGatewayNode and vxlanDevice
Browse files Browse the repository at this point in the history
  • Loading branch information
sridhargaddam committed Sep 13, 2019
1 parent abe5b01 commit 3a2ab8e
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 37 deletions.
29 changes: 14 additions & 15 deletions pkg/routeagent/controllers/route/iptables.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,18 +80,17 @@ func (r *Controller) prependUnique(ipt *iptables.IPTables, table string, chain s
return fmt.Errorf("error listing the rules in %s chain: %v", chain, err)
}

/* Submariner requires certain iptable rules to be programmed at the beginning of an iptables Chain so that we can
preserve the sourceIP for inter-cluster traffic and avoid K8s SDN making changes to the traffic.
In this API, we check if the required iptable rule is present at the beginning of the chain. If the rule is
already present and there are no stale[1] flows, we simply return. If not, we create one.
[1] Sometimes after we program the rule at the beginning of the chain, K8s SDN might insert some new rules
ahead of the rule that we programmed. In such cases, the rule that we programmed will not be the first rule
to hit and Submariner behavior might get affected. So, we query the rules in the chain to see if the rule
slipped its position, and if so, delete all such occurrences. We then re-program a new rule at the beginning
of the chain as required.
*/
// Submariner requires certain iptables rules to be programmed at the beginning of an iptables chain
// so that we can preserve the source IP for inter-cluster traffic and avoid the K8s SDN making
// changes to the traffic.
// In this API, we check if the required iptables rule is present at the beginning of the chain.
// If the rule is already present and there are no stale[1] occurrences of it, we simply return.
// If not, we create one.
// [1] Sometimes after we program the rule at the beginning of the chain, the K8s SDN might insert
// some new rules ahead of the rule that we programmed. In such cases, the rule that we programmed
// will not be the first rule to hit and Submariner behavior might get affected. So, we query the
// rules in the chain to see if the rule slipped its position, and if so, delete all such occurrences.
// We then re-program a new rule at the beginning of the chain as required.

isPresentAtRequiredPosition := false
numOccurrences := 0
for index, rule := range rules {
Expand All @@ -105,8 +104,8 @@ func (r *Controller) prependUnique(ipt *iptables.IPTables, table string, chain s
}
}

/* The required rule is present in the Chain, but either there are multiple occurrences or its not at the desired
location */
// The required rule is present in the chain, but either there are multiple occurrences or it's
// not at the desired location.
if numOccurrences > 1 || isPresentAtRequiredPosition == false {
for i := 0; i < numOccurrences; i++ {
if err = ipt.Delete(table, chain, ruleSpec...); err != nil {
Expand All @@ -115,7 +114,7 @@ func (r *Controller) prependUnique(ipt *iptables.IPTables, table string, chain s
}
}

/* The required rule is present only once and is at the desired location */
// The required rule is present only once and is at the desired location
if numOccurrences == 1 && isPresentAtRequiredPosition == true {
klog.V(4).Infof("In %s table, iptables rule \"%s\", already exists.", table, strings.Join(ruleSpec, " "))
return nil
Expand Down
61 changes: 40 additions & 21 deletions pkg/routeagent/controllers/route/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,9 @@ type Controller struct {
localServiceCidr []string
remoteSubnets *util.StringSet

vxlanDevice *vxLanIface
remoteVTEPs *util.StringSet
gwVxLanMutex *sync.Mutex
vxlanDevice *vxLanIface
remoteVTEPs *util.StringSet

isGatewayNode bool
defaultHostIface *net.Interface
Expand All @@ -67,23 +68,24 @@ const (
VxInterfaceGateway = 1
VxLANPort = 4800

/* Why VxLANVTepNetworkPrefix is 240?
On VxLAN interfaces we need a unique IPAddress which does not collide with the
host ip-address. This is going to be tricky as currently there is no specific
CIDR in K8s that can be used for this purpose. If really required, this can be
taken as an input from the user (i.e., as a configuration parameter).
But we want to avoid any additional inputs particularly if there is a way to automate it.
So, the approach we are taking is to derive the VxLAN ip from the hostIPAddress as shown below.
Example: Say the host ipaddress is "192.168.1.100/16", we prepend 240 to the host-ip address
and configure it on the VxLAN interface (i.e., 240.168.1.100/16).
The reason behind choosing 240 is that "240.0.0.0/4" is a Reserved IPAddress [*] which
normally will not be assigned on any of the hosts. Also, note that the VxLAN IPs that are
so configured are only used within the local cluster and traffic will not leave the cluster
with the VxLAN ipaddress.
[*] https://en.wikipedia.org/wiki/Reserved_IP_addresses */
// Why is VxLANVTepNetworkPrefix 240?
// On VxLAN interfaces we need a unique IP address which does not collide with the
// host IP address. This is tricky as there is currently no specific CIDR in K8s
// that can be used for this purpose. One option is to take this as an input from
// the user (i.e., as a configuration parameter), but we want to avoid any
// additional inputs, particularly if there is a way to automate it.

// So, the approach we are taking is to derive the VxLAN IP from the host IP address
// as shown below.
// For example: say the host IP address is "192.168.1.100/16"; we replace the first
// octet of the host IP address with 240 to derive the VxLAN VTEP IP
// (i.e., 240.168.1.100/16) and configure it on the VxLAN interface.

// The reason behind choosing 240 is that "240.0.0.0/4" is a reserved IP address
// range [*] which normally will not be assigned to any of the hosts. Also, note
// that the VxLAN IPs are only used within the local cluster and traffic will not
// leave the cluster with a VxLAN IP address.
// [*] https://en.wikipedia.org/wiki/Reserved_IP_addresses

VxLANVTepNetworkPrefix = 240
SmPostRoutingChain = "SUBMARINER-POSTROUTING"
Expand All @@ -106,6 +108,7 @@ func NewController(clusterID string, ClusterCidr []string, ServiceCidr []string,
clustersSynced: config.ClusterInformer.Informer().HasSynced,
endpointsSynced: config.EndpointInformer.Informer().HasSynced,
smRouteAgentPodsSynced: config.PodInformer.Informer().HasSynced,
gwVxLanMutex: &sync.Mutex{},
clusterWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Clusters"),
endpointWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Endpoints"),
podWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Pods"),
Expand Down Expand Up @@ -388,6 +391,9 @@ func (r *Controller) processNextPod() bool {
klog.V(4).Infof("In processNextPod, POD HostIP is %s", pod.Status.HostIP)
r.populateRemoteVtepIps(pod.Status.PodIP)

r.gwVxLanMutex.Lock()
defer r.gwVxLanMutex.Unlock()

// A new Node (identified via a Submariner-route-agent daemonset pod event) is added to the cluster.
// On the GatewayDevice, update the vxlan fdb entry (i.e., remote Vtep) for the newly added node.
if r.isGatewayNode {
Expand Down Expand Up @@ -452,8 +458,12 @@ func (r *Controller) processNextEndpoint() bool {
// If the endpoint hostname matches with our hostname, it implies we are on gateway node
if endpoint.Spec.Hostname == hostname {
r.cleanRoutes()
r.gwVxLanMutex.Lock()
defer r.gwVxLanMutex.Unlock()

r.isGatewayNode = true
if r.createVxLANInterface(VxInterfaceGateway, nil) != nil {
err = r.createVxLANInterface(VxInterfaceGateway, nil)
if err != nil {
klog.Fatalf("Unable to create VxLAN interface on GatewayNode (%s): %v", hostname, err)
}
klog.V(6).Infof("not reconciling routes because we appear to be the gateway host")
Expand All @@ -466,10 +476,13 @@ func (r *Controller) processNextEndpoint() bool {
return fmt.Errorf("failed to derive the remoteVtepIP %v", err)
}

r.gwVxLanMutex.Lock()
r.isGatewayNode = false
if r.createVxLANInterface(VxInterfaceWorker, localClusterGwNodeIP) != nil {
err = r.createVxLANInterface(VxInterfaceWorker, localClusterGwNodeIP)
if err != nil {
klog.Fatalf("Unable to create VxLAN interface on non-GatewayNode (%s): %v", endpoint.Spec.Hostname, err)
}
r.gwVxLanMutex.Unlock()

r.cleanXfrmPolicies()
err = r.reconcileRoutes(remoteVtepIP)
Expand Down Expand Up @@ -557,6 +570,9 @@ func (r *Controller) handleRemovedEndpoint(obj interface{}) {
}

if object.Spec.ClusterID == r.clusterID {
r.gwVxLanMutex.Lock()
defer r.gwVxLanMutex.Unlock()

klog.V(6).Infof("Endpoint matches the cluster ID of this cluster")
err := r.vxlanDevice.deleteVxLanIface()
if err != nil {
Expand All @@ -579,6 +595,9 @@ func (r *Controller) handleRemovedPod(obj interface{}) {

if r.remoteVTEPs.Contains(pod.Status.HostIP) {
r.remoteVTEPs.Delete(pod.Status.HostIP)
r.gwVxLanMutex.Lock()
defer r.gwVxLanMutex.Unlock()

if r.isGatewayNode && r.vxlanDevice != nil {
ret := r.vxlanDevice.DelFDB(net.ParseIP(pod.Status.PodIP), "00:00:00:00:00:00")
if ret != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/routeagent/controllers/route/vxlan.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func createVxLanIface(iface *vxLanIface) error {
// Get the properties of existing vxlan interface
existing, err := netlink.LinkByName(iface.link.Name)
if err != nil {
return err
return fmt.Errorf("failed to retrieve link info: %v", err)
}

if isVxlanConfigTheSame(iface.link, existing) {
Expand Down

0 comments on commit 3a2ab8e

Please sign in to comment.