NodeController simplified/stateless
This commit changes the node controller to close race conditions
where we might delete a Calico node for reasons other than a
delete of the corresponding Kubernetes node.

It simplifies the controller by removing its caches; instead,
it runs a simple sync routine that queries the data sources.

It also removes the FV test that expects Calico Nodes written
without a corresponding K8s node to be cleaned up. This is a
change in behavior, but one we don't actually need: startup.go
always ensures the K8s node exists before writing the Calico
node.

Signed-off-by: Spike Curtis <spike@tigera.io>
Spike Curtis committed Aug 3, 2018
1 parent 307a3ed commit e16464e
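
The sync routine described above reduces to comparing the two node lists and deleting any Calico node whose Kubernetes counterpart is gone, while leaving Calico nodes with no Kubernetes reference untouched. Below is a minimal, self-contained sketch of that comparison; the calicoNode type and the names in it are illustrative stand-ins, not the types used in the diff that follows.

package main

import "fmt"

// calicoNode is a hypothetical stand-in for the libcalico-go Node resource;
// only the fields this sketch needs are modelled.
type calicoNode struct {
	Name        string // Calico node name
	K8sNodeName string // Kubernetes node it was created for; "" if none
}

// orphanedCalicoNodes returns the names of Calico nodes whose Kubernetes
// counterpart no longer exists. Calico nodes with no Kubernetes reference
// are left alone, matching the behavior described in the commit message.
func orphanedCalicoNodes(calicoNodes []calicoNode, k8sNodeNames []string) []string {
	known := make(map[string]bool, len(k8sNodeNames))
	for _, name := range k8sNodeNames {
		known[name] = true
	}
	var orphans []string
	for _, cn := range calicoNodes {
		if cn.K8sNodeName != "" && !known[cn.K8sNodeName] {
			orphans = append(orphans, cn.Name)
		}
	}
	return orphans
}

func main() {
	calico := []calicoNode{
		{Name: "calico-node-a", K8sNodeName: "node-a"},
		{Name: "calico-node-b", K8sNodeName: "node-b"},
		{Name: "bare-metal-host"}, // no Kubernetes reference; never deleted
	}
	k8s := []string{"node-a"} // node-b has been deleted from Kubernetes
	fmt.Println(orphanedCalicoNodes(calico, k8s)) // prints [calico-node-b]
}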
Showing 2 changed files with 115 additions and 220 deletions.
312 changes: 115 additions & 197 deletions pkg/controllers/node/node_controller.go
@@ -16,111 +16,66 @@ package node

import (
"context"
"fmt"
"reflect"
"sync"
"time"

log "github.com/sirupsen/logrus"
"k8s.io/api/core/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
uruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/kubernetes"
corecache "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"

calicocache "github.com/projectcalico/kube-controllers/pkg/cache"
"github.com/projectcalico/kube-controllers/pkg/controllers/controller"
api "github.com/projectcalico/libcalico-go/lib/apis/v3"
client "github.com/projectcalico/libcalico-go/lib/clientv3"
"github.com/projectcalico/libcalico-go/lib/errors"
"github.com/projectcalico/libcalico-go/lib/options"
)

// This cache maps a kubernetesNodeName to its corresponding calicoNode.
type cache struct {
sync.RWMutex
nodes map[string]string
}

// NodeController implements the Controller interface. It is responsible for monitoring
// kubernetes nodes and responding to delete events by removing them from the Calico datastore.
// It keeps a cache of known calico nodes and their corresponding kubernetes nodes to
// accomplish this.
type NodeController struct {
ctx context.Context
informer corecache.Controller
k8sResourceCache calicocache.ResourceCache
nodeLookupCache *cache
calicoClient client.Interface
k8sClientset *kubernetes.Clientset
}

type nodeData struct {
ctx context.Context
informer cache.Controller
calicoClient client.Interface
k8sClientset *kubernetes.Clientset
rl workqueue.RateLimiter
schedule chan interface{}

// the two bools are protected by the Mutex.
m sync.Mutex
syncInProgress bool
syncScheduled bool
}

// NewNodeController constructs a new NodeController.
func NewNodeController(ctx context.Context, k8sClientset *kubernetes.Clientset, calicoClient client.Interface) controller.Controller {
cacheArgs := calicocache.ResourceCacheArgs{
ObjectType: reflect.TypeOf(nodeData{}),
ListFunc: func() (map[string]interface{}, error) {
// Get all nodes from the Calico datastore
calicoNodes, err := calicoClient.Nodes().List(ctx, options.ListOptions{})
if err != nil {
return nil, err
}

// Iterate through and store the k8s nodes in our cache.
m := make(map[string]interface{})
for _, calicoNode := range calicoNodes.Items {
// find its kubernetes orchRef
if k8sNodeName := getK8sNodeName(calicoNode); k8sNodeName != "" {
m[k8sNodeName] = nodeData{}
}
}

log.Debugf("Found %d nodes in Calico datastore:", len(m))
return m, nil
},
ReconcilerConfig: calicocache.ReconcilerConfig{
DisableMissingInDatastore: true,
DisableMissingInCache: false,
DisableUpdateOnChange: false,
},
}

k8sResourceCache := calicocache.NewResourceCache(cacheArgs)
nodeLookupCache := cache{nodes: make(map[string]string)}
// channel used to kick the controller into scheduling a sync
schedule := make(chan interface{})

// Create a Node watcher.
listWatcher := corecache.NewListWatchFromClient(k8sClientset.Core().RESTClient(), "nodes", "", fields.Everything())
listWatcher := cache.NewListWatchFromClient(k8sClientset.CoreV1().RESTClient(), "nodes", "", fields.Everything())

// Bind the Calico cache to kubernetes cache with the help of an informer. This way we make sure that
// whenever the kubernetes cache is updated, changes get reflected in the Calico cache as well.
_, informer := corecache.NewIndexerInformer(listWatcher, &v1.Node{}, 0, corecache.ResourceEventHandlerFuncs{
// Informer handles managing the watch and signals us when nodes are deleted.
_, informer := cache.NewIndexerInformer(listWatcher, &v1.Node{}, 0, cache.ResourceEventHandlerFuncs{
DeleteFunc: func(obj interface{}) {
nodeName, err := extractK8sNodeName(obj)
if err != nil {
log.WithError(err).Errorf("Error while converting %#v to k8s node", obj)
return
}
log.Debugf("Got DELETE event for node: %s", nodeName)
k8sResourceCache.Delete(nodeName)
// Just kick the controller to wake up and perform a sync. We don't need to know which
// node it was, since we sync everything.
schedule <- nil
},

AddFunc: func(obj interface{}) {
nodeName, err := extractK8sNodeName(obj)
if err != nil {
log.WithError(err).Errorf("Error while converting %#v to k8s node", nodeName)
return
}
// Use an empty value here because the only thing we care about is the kubernetesNodeName,
// so there's no other relevant information we want to store in the cache besides the name (which
// is unavailable at this time because the calicoNode is created after the k8sNode).
k8sResourceCache.Set(nodeName, nodeData{})
},
}, corecache.Indexers{})

return &NodeController{ctx, informer, k8sResourceCache, &nodeLookupCache, calicoClient, k8sClientset}
}, cache.Indexers{})

return &NodeController{
ctx: ctx,
informer: informer,
calicoClient: calicoClient,
k8sClientset: k8sClientset,
rl: workqueue.DefaultControllerRateLimiter(),
schedule: schedule,
}
}

// getK8sNodeName is a helper method that searches a calicoNode for its kubernetes nodeRef.
@@ -134,23 +89,13 @@ func getK8sNodeName(calicoNode api.Node) string {
}

// Run starts the node controller. It does start-of-day preparation
// and then launches worker threads.
// and then launches worker threads. We ignore reconcilerPeriod and threadiness
// as this controller does not use a cache and runs only one worker thread.
func (c *NodeController) Run(threadiness int, reconcilerPeriod string, stopCh chan struct{}) {
defer uruntime.HandleCrash()

// Let the workers stop when we are done
workqueue := c.k8sResourceCache.GetQueue()
defer workqueue.ShutDown()

log.Info("Starting Node controller")

// Load node cache. Retry when failed.
log.Debug("Loading node cache at start of day")
for err := c.populateNodeLookupCache(); err != nil; {
log.WithError(err).Errorf("Failed to load Node cache, retrying in 5s")
time.Sleep(5 * time.Second)
}

// Wait till k8s cache is synced
go c.informer.Run(stopCh)
log.Debug("Waiting to sync with Kubernetes API (Nodes)")
@@ -159,139 +104,112 @@ func (c *NodeController) Run(threadiness int, reconcilerPeriod string, stopCh ch
log.Debug("Finished syncing with Kubernetes API (Nodes)")

// Start Calico cache.
c.k8sResourceCache.Run(reconcilerPeriod)
go c.acceptScheduleRequests(stopCh)

// Start a number of worker threads to read from the queue.
for i := 0; i < threadiness; i++ {
go c.runWorker()
}
log.Info("Node controller is now running")

// Kick off a start of day sync.
c.schedule <- nil

<-stopCh
log.Info("Stopping Node controller")
}

func (c *NodeController) runWorker() {
for c.processNextItem() {
// acceptScheduleRequests monitors the schedule channel for kicks to wake up
// and schedule syncs.
func (c *NodeController) acceptScheduleRequests(stopCh <-chan struct{}) {
for {
// Wait until something wakes us up, or we are stopped
select {
case <-c.schedule:
c.doSchedule(stopCh)
case <-stopCh:
return
}
}
}

func (c *NodeController) processNextItem() bool {
// Wait until there is a new item in the work queue.
workqueue := c.k8sResourceCache.GetQueue()
key, quit := workqueue.Get()
if quit {
return false
// doSchedule actually performs the scheduling of syncs. It is a separate method
// so that we don't introduce locking into the acceptScheduleRequests method.
func (c *NodeController) doSchedule(stopCh <-chan struct{}) {
c.m.Lock()
defer c.m.Unlock()
c.syncScheduled = true
if c.syncInProgress {
return
}
c.syncInProgress = true
go c.syncUntilDone(stopCh)
}

// Sync the object to the Calico datastore.
if err := c.syncToCalico(key.(string)); err != nil {
c.handleErr(err, key.(string))
// syncUntilDone kicks off the sync and handles re-syncing if something schedules
// a sync while one is in progress. This method assumes the syncInProgress
// and syncScheduled flags are set when it is called.
func (c *NodeController) syncUntilDone(stopCh <-chan struct{}) {
for {
// Maybe stop?
select {
case <-stopCh:
return
default:
c.m.Lock()
if c.syncScheduled {
c.syncScheduled = false
c.m.Unlock()
err := c.syncDelete()
if err != nil {
// If we hit an error, reschedule another sync. syncDelete
// handles its own rate limiting.
c.m.Lock()
c.syncScheduled = true
c.m.Unlock()
}
} else {
c.syncInProgress = false
c.m.Unlock()
return
}
}
}

// Indicate that we're done processing this key, allowing for safe parallel processing such that
// two objects with the same key are never processed in parallel.
workqueue.Done(key)
return true
}

// populateNodeLookupCache fills the nodeLookupCache with initial data
// by querying the existing data stored in Calico.
func (c *NodeController) populateNodeLookupCache() error {
nodes, err := c.calicoClient.Nodes().List(c.ctx, options.ListOptions{})
// syncDelete is the main work routine of the controller. It queries Calico and
// K8s, and deletes any Calico nodes which do not exist in K8s.
func (c *NodeController) syncDelete() error {
// Possibly rate limit calls to Calico
time.Sleep(c.rl.When("calico-list"))
cNodes, err := c.calicoClient.Nodes().List(c.ctx, options.ListOptions{})
if err != nil {
log.WithError(err).Errorf("Error listing Calico nodes", err)
return err
}
c.rl.Forget("calico")

c.nodeLookupCache.Lock()
for _, node := range nodes.Items {
if k8sNodeName := getK8sNodeName(node); k8sNodeName != "" {
c.nodeLookupCache.nodes[k8sNodeName] = node.Name
}
time.Sleep(c.rl.When("k8s"))
kNodes, err := c.k8sClientset.CoreV1().Nodes().List(meta_v1.ListOptions{})
if err != nil {
log.WithError(err).Errorf("Error listing K8s nodes", err)
return err
}
c.rl.Forget("k8s")
kNodeIdx := make(map[string]bool)
for _, node := range kNodes.Items {
kNodeIdx[node.Name] = true
}
c.nodeLookupCache.Unlock()
return nil
}

// syncToCalico syncs the given update to the Calico datastore.
func (c *NodeController) syncToCalico(key string) error {
// Check if it exists in the controller's cache.
_, exists := c.k8sResourceCache.Get(key)
if !exists {
// The object no longer exists - delete from the datastore.
c.nodeLookupCache.RLock()
calicoNodeName, ok := c.nodeLookupCache.nodes[key]
c.nodeLookupCache.RUnlock()
clog := log.WithField("node", calicoNodeName)

if !ok {
clog.Warnf("No corresponding Node in cache, re-loading cache from datastore")

if err := c.populateNodeLookupCache(); err != nil {
clog.WithError(err).Error("Failed to load nodeLookup cache")
return err
}
c.nodeLookupCache.RLock()
calicoNodeName, ok = c.nodeLookupCache.nodes[key]
c.nodeLookupCache.RUnlock()
clog = log.WithField("node", calicoNodeName)
}

if ok {
clog.Infof("Deleting node from Calico datastore.")
_, err := c.calicoClient.Nodes().Delete(c.ctx, calicoNodeName, options.DeleteOptions{})
if _, ok := err.(errors.ErrorResourceDoesNotExist); !ok {
for _, node := range cNodes.Items {
k8sNodeName := getK8sNodeName(node)
if k8sNodeName != "" && !kNodeIdx[k8sNodeName] {
// No matching Kubernetes node with that name
time.Sleep(c.rl.When("calico-delete"))
_, err := c.calicoClient.Nodes().Delete(c.ctx, node.Name, options.DeleteOptions{})
if _, doesNotExist := err.(errors.ErrorResourceDoesNotExist); err != nil && !doesNotExist {
// We hit an error other than "does not exist".
log.WithError(err).Errorf("Error deleting Calico node: %v", node.Name, err)
return err
} else {
// Remove from the node lookup cache.
c.nodeLookupCache.Lock()
delete(c.nodeLookupCache.nodes, key)
c.nodeLookupCache.Unlock()
}
c.rl.Forget("calico-delete")
}
}
return nil
}

// handleErr checks if an error happened and makes sure we will retry later.
func (c *NodeController) handleErr(err error, key string) {
workqueue := c.k8sResourceCache.GetQueue()
if err == nil {
// Forget about the #AddRateLimited history of the key on every successful synchronization.
// This ensures that future processing of updates for this key is not delayed because of
// an outdated error history.
workqueue.Forget(key)
return
}

// This controller retries 5 times if something goes wrong. After that, it stops trying.
if workqueue.NumRequeues(key) < 5 {
// Re-enqueue the key rate limited. Based on the rate limiter on the
// queue and the re-enqueue history, the key will be processed later again.
log.WithError(err).Errorf("Error syncing Policy %v: %v", key, err)
workqueue.AddRateLimited(key)
return
}
workqueue.Forget(key)

// Report to an external entity that, even after several retries, we could not successfully process this key
uruntime.HandleError(err)
log.WithError(err).Errorf("Dropping Policy %q out of the queue: %v", key, err)
}

func extractK8sNodeName(k8sObj interface{}) (string, error) {
node, ok := k8sObj.(*v1.Node)

if !ok {
tombstone, ok := k8sObj.(corecache.DeletedFinalStateUnknown)
if !ok {
return "", fmt.Errorf("couldn't get object from tombstone %+v", k8sObj)
}
node, ok = tombstone.Obj.(*v1.Node)
if !ok {
return "", fmt.Errorf("tombstone contained object that is not a Node %+v", k8sObj)
}
}
return node.ObjectMeta.Name, nil
}
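
The doSchedule/syncUntilDone pair in the diff above forms a small coalescing scheduler: any number of kicks on the schedule channel during a sync collapse into a single follow-up sync, and at most one sync runs at a time. Here is a rough, self-contained sketch of the same pattern, assuming a hypothetical coalescer type and a fake syncFn; neither is part of this commit.

package main

import (
	"fmt"
	"sync"
	"time"
)

// coalescer runs at most one sync at a time and collapses any number of
// schedule() calls made while a sync is running into a single follow-up sync.
type coalescer struct {
	m              sync.Mutex
	syncInProgress bool
	syncScheduled  bool
	syncFn         func() error
}

// schedule records that a sync is wanted and starts the loop if one is not
// already running.
func (c *coalescer) schedule() {
	c.m.Lock()
	defer c.m.Unlock()
	c.syncScheduled = true
	if c.syncInProgress {
		return // the running loop will notice syncScheduled and go again
	}
	c.syncInProgress = true
	go c.loop()
}

// loop keeps syncing until no further sync has been scheduled, then exits.
func (c *coalescer) loop() {
	for {
		c.m.Lock()
		if !c.syncScheduled {
			c.syncInProgress = false
			c.m.Unlock()
			return
		}
		c.syncScheduled = false
		c.m.Unlock()
		if err := c.syncFn(); err != nil {
			// On failure, leave a sync scheduled so the loop tries again.
			c.m.Lock()
			c.syncScheduled = true
			c.m.Unlock()
		}
	}
}

func main() {
	c := &coalescer{syncFn: func() error {
		fmt.Println("syncing")
		time.Sleep(100 * time.Millisecond)
		return nil
	}}
	// A burst of schedule() calls coalesces into roughly two syncs: the one
	// already running plus a single follow-up.
	for i := 0; i < 5; i++ {
		c.schedule()
	}
	time.Sleep(time.Second)
}

Unlike the real syncDelete, the fake syncFn here does no rate limiting, so a persistently failing sync would retry in a tight loop.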