Use buffered channel instead of Mutex
Signed-off-by: Spike Curtis <spike@tigera.io>
Spike Curtis committed Aug 6, 2018
1 parent e16464e commit bb6b487
Showing 1 changed file with 41 additions and 62 deletions.
pkg/controllers/node/node_controller.go: 41 additions & 62 deletions
@@ -16,7 +16,6 @@ package node

import (
"context"
"sync"
"time"

log "github.com/sirupsen/logrus"
@@ -35,6 +34,12 @@ import (
"github.com/projectcalico/libcalico-go/lib/options"
)

const (
RateLimitCalicoList = "calico-list"
RateLimitK8s = "k8s"
RateLimitCalicoDelete = "calico-delete"
)

// NodeController implements the Controller interface. It is responsible for monitoring
// kubernetes nodes and responding to delete events by removing them from the Calico datastore.
type NodeController struct {
@@ -44,17 +49,14 @@ type NodeController struct {
k8sClientset *kubernetes.Clientset
rl workqueue.RateLimiter
schedule chan interface{}

// the two bools are protected by the Mutex.
m sync.Mutex
syncInProgress bool
syncScheduled bool
}

// NewNodeController Constructor for NodeController
func NewNodeController(ctx context.Context, k8sClientset *kubernetes.Clientset, calicoClient client.Interface) controller.Controller {
// channel used to kick the controller into scheduling a sync
schedule := make(chan interface{})
// channel used to kick the controller into scheduling a sync. It has length
// 1 so that we coalesce multiple kicks while a sync is happening down to
// just one additional sync.
schedule := make(chan interface{}, 1)

// Create a Node watcher.
listWatcher := cache.NewListWatchFromClient(k8sClientset.CoreV1().RESTClient(), "nodes", "", fields.Everything())
@@ -64,7 +66,7 @@ func NewNodeController(ctx context.Context, k8sClientset *kubernetes.Clientset,
DeleteFunc: func(obj interface{}) {
// Just kick controller to wake up and perform a sync. No need to bother what node it was
// as we sync everything.
schedule <- nil
kick(schedule)
},
}, cache.Indexers{})

@@ -100,6 +102,7 @@ func (c *NodeController) Run(threadiness int, reconcilerPeriod string, stopCh ch
go c.informer.Run(stopCh)
log.Debug("Waiting to sync with Kubernetes API (Nodes)")
for !c.informer.HasSynced() {
time.Sleep(100 * time.Millisecond)
}
log.Debug("Finished syncing with Kubernetes API (Nodes)")

@@ -108,8 +111,9 @@ func (c *NodeController) Run(threadiness int, reconcilerPeriod string, stopCh ch

log.Info("Node controller is now running")

// Kick off a start of day sync.
c.schedule <- nil
// Kick off a start of day sync. Write non-blocking so that if a sync is
// already scheduled, we don't schedule another.
kick(c.schedule)

<-stopCh
log.Info("Stopping Node controller")
@@ -122,53 +126,15 @@ func (c *NodeController) acceptScheduleRequests(stopCh <-chan struct{}) {
// Wait until something wakes us up, or we are stopped
select {
case <-c.schedule:
c.doSchedule(stopCh)
err := c.syncDelete()
if err != nil {
// Reschedule the sync since we hit an error. Note that
// syncDelete() does its own rate limiting, so it's fine to
// reschedule immediately.
kick(c.schedule)
}
case <-stopCh:
return
}
}
}

// doSchedule actually performs the scheduling of syncs. It is a separate method
// so that we don't introduce locking into the acceptScheduleRequests method.
func (c *NodeController) doSchedule(stopCh <-chan struct{}) {
c.m.Lock()
defer c.m.Unlock()
c.syncScheduled = true
if c.syncInProgress {
return
}
c.syncInProgress = true
go c.syncUntilDone(stopCh)
}

// syncUntilDone kicks off the sync and handles re-synching if something schedules
// a sync while one is in progress. This method assumes the syncInProgress
// and syncScheduled flags are set when it is called.
func (c *NodeController) syncUntilDone(stopCh <-chan struct{}) {
for {
// Maybe stop?
select {
case <-stopCh:
return
default:
c.m.Lock()
if c.syncScheduled {
c.syncScheduled = false
c.m.Unlock()
err := c.syncDelete()
if err != nil {
// If we hit an error, reschedule another sync. SyncDelete
// handles its own rate limiting.
c.m.Lock()
c.syncScheduled = true
c.m.Unlock()
}
} else {
c.syncInProgress = false
c.m.Unlock()
return
}
}
}
}
@@ -177,21 +143,21 @@ func (c *NodeController) syncUntilDone(stopCh <-chan struct{}) {
// K8s, and deletes any Calico nodes which do not exist in K8s.
func (c *NodeController) syncDelete() error {
// Possibly rate limit calls to Calico
time.Sleep(c.rl.When("calico-list"))
time.Sleep(c.rl.When(RateLimitCalicoList))
cNodes, err := c.calicoClient.Nodes().List(c.ctx, options.ListOptions{})
if err != nil {
log.WithError(err).Errorf("Error listing Calico nodes", err)
return err
}
c.rl.Forget("calico")
c.rl.Forget(RateLimitCalicoList)

time.Sleep(c.rl.When("k8s"))
time.Sleep(c.rl.When(RateLimitK8s))
kNodes, err := c.k8sClientset.CoreV1().Nodes().List(meta_v1.ListOptions{})
if err != nil {
log.WithError(err).Errorf("Error listing K8s nodes", err)
return err
}
c.rl.Forget("k8s")
c.rl.Forget(RateLimitK8s)
kNodeIdx := make(map[string]bool)
for _, node := range kNodes.Items {
kNodeIdx[node.Name] = true
@@ -201,15 +167,28 @@ func (c *NodeController) syncDelete() error {
k8sNodeName := getK8sNodeName(node)
if k8sNodeName != "" && !kNodeIdx[k8sNodeName] {
// No matching Kubernetes node with that name
time.Sleep(c.rl.When("calico-delete"))
time.Sleep(c.rl.When(RateLimitCalicoDelete))
_, err := c.calicoClient.Nodes().Delete(c.ctx, node.Name, options.DeleteOptions{})
if _, doesNotExist := err.(errors.ErrorResourceDoesNotExist); err != nil && !doesNotExist {
// We hit an error other than "does not exist".
log.WithError(err).Errorf("Error deleting Calico node: %v", node.Name, err)
return err
}
c.rl.Forget("calico-delete")
c.rl.Forget(RateLimitCalicoDelete)
}
}
return nil
}

// kick puts an item on the channel in non-blocking write. This means if there
// is already something pending, it has no effect. This allows us to coalesce
// multiple requests into a single pending request.
func kick(c chan<- interface{}) {
select {
case c <- nil:
// pass
default:
// pass
}

}
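
As a standalone sketch of the coalescing pattern this commit adopts (the main harness, timings, and output below are illustrative and not code from the repository): a buffered channel of capacity 1 plus a non-blocking send replaces the Mutex-guarded syncScheduled/syncInProgress flags, so kicks that arrive while a sync is in flight collapse into at most one pending sync.

package main

import (
	"fmt"
	"time"
)

// kick performs a non-blocking send; if a request is already pending,
// the new request is coalesced into it.
func kick(c chan<- interface{}) {
	select {
	case c <- nil:
	default:
	}
}

func main() {
	// Capacity 1: at most one sync request can be pending at any time.
	schedule := make(chan interface{}, 1)
	done := make(chan struct{})

	// Worker: each received kick triggers one (simulated) sync.
	go func() {
		for range schedule {
			time.Sleep(50 * time.Millisecond)
			fmt.Println("synced")
		}
		close(done)
	}()

	// Ten rapid kicks while a sync is running produce at most two syncs:
	// the one already in progress plus one coalesced re-sync.
	for i := 0; i < 10; i++ {
		kick(schedule)
	}

	time.Sleep(300 * time.Millisecond)
	close(schedule)
	<-done
}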
