From e16464e9a25c2af43340e5ef1ef6eae3decfc9b2 Mon Sep 17 00:00:00 2001
From: Spike Curtis
Date: Fri, 3 Aug 2018 11:49:08 -0700
Subject: [PATCH] NodeController simplified/stateless

This commit changes the node controller to close race conditions in
which we might delete a Calico node for reasons other than the deletion
of the corresponding Kubernetes node. It simplifies the controller by
removing its caches; instead, it runs a simple sync routine that
queries the data sources directly.

It also removes the FV test expecting that a Calico node written
without a corresponding K8s node is removed. This is a change in
behavior; we no longer need to remove such nodes, because startup.go
always ensures the K8s node exists before the Calico node is written.

Signed-off-by: Spike Curtis
---
 pkg/controllers/node/node_controller.go | 312 +++++++++---------------
 tests/fv/fv_test.go                     |  23 --
 2 files changed, 115 insertions(+), 220 deletions(-)
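Reviewer note: the new design funnels every Kubernetes delete event into a
schedule channel and coalesces them, so a burst of deletes produces at most
one running sync plus at most one queued re-sync. Distilled out of the
controller, the pattern looks roughly like the sketch below (a minimal,
standalone approximation; the coalescer/kick names are illustrative and
appear nowhere in this patch):

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    // coalescer collapses any number of overlapping sync requests into at
    // most one running sync plus at most one pending re-sync.
    type coalescer struct {
        m              sync.Mutex
        syncInProgress bool
        syncScheduled  bool
    }

    // kick requests a sync, starting the loop goroutine if one isn't running.
    func (c *coalescer) kick(syncFn func() error) {
        c.m.Lock()
        defer c.m.Unlock()
        c.syncScheduled = true
        if c.syncInProgress {
            // The running loop will notice syncScheduled and go around again.
            return
        }
        c.syncInProgress = true
        go c.loop(syncFn)
    }

    // loop runs syncs until no further sync has been scheduled.
    func (c *coalescer) loop(syncFn func() error) {
        for {
            c.m.Lock()
            if !c.syncScheduled {
                c.syncInProgress = false
                c.m.Unlock()
                return
            }
            c.syncScheduled = false
            c.m.Unlock()
            if err := syncFn(); err != nil {
                // Re-schedule on failure; the real controller relies on rate
                // limiting inside the sync itself to avoid spinning.
                c.m.Lock()
                c.syncScheduled = true
                c.m.Unlock()
            }
        }
    }

    func main() {
        c := &coalescer{}
        // Five rapid kicks coalesce into roughly two syncs: one running, one queued.
        for i := 0; i < 5; i++ {
            c.kick(func() error {
                fmt.Println("syncing")
                time.Sleep(100 * time.Millisecond)
                return nil
            })
        }
        time.Sleep(time.Second)
    }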
diff --git a/pkg/controllers/node/node_controller.go b/pkg/controllers/node/node_controller.go
index af82b6e0..51ef3898 100644
--- a/pkg/controllers/node/node_controller.go
+++ b/pkg/controllers/node/node_controller.go
@@ -16,19 +16,18 @@ package node
 
 import (
 	"context"
-	"fmt"
-	"reflect"
 	"sync"
 	"time"
 
 	log "github.com/sirupsen/logrus"
 	"k8s.io/api/core/v1"
+	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/fields"
 	uruntime "k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/client-go/kubernetes"
-	corecache "k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/util/workqueue"
 
-	calicocache "github.com/projectcalico/kube-controllers/pkg/cache"
 	"github.com/projectcalico/kube-controllers/pkg/controllers/controller"
 	api "github.com/projectcalico/libcalico-go/lib/apis/v3"
 	client "github.com/projectcalico/libcalico-go/lib/clientv3"
@@ -36,91 +35,47 @@ import (
 	"github.com/projectcalico/libcalico-go/lib/options"
 )
 
-// This cache maps a kubernetesNodeName to its corresponding calicoNode.
-type cache struct {
-	sync.RWMutex
-	nodes map[string]string
-}
-
 // NodeController implements the Controller interface. It is responsible for monitoring
 // kubernetes nodes and responding to delete events by removing them from the Calico datastore.
-// It keeps a cache of known calico nodes and their corresponding kubernetes nodes to
-// accomplish this.
 type NodeController struct {
-	ctx              context.Context
-	informer         corecache.Controller
-	k8sResourceCache calicocache.ResourceCache
-	nodeLookupCache  *cache
-	calicoClient     client.Interface
-	k8sClientset     *kubernetes.Clientset
-}
-
-type nodeData struct {
+	ctx          context.Context
+	informer     cache.Controller
+	calicoClient client.Interface
+	k8sClientset *kubernetes.Clientset
+	rl           workqueue.RateLimiter
+	schedule     chan interface{}
+
+	// The two bools below are protected by the mutex m.
+	m              sync.Mutex
+	syncInProgress bool
+	syncScheduled  bool
 }
 
 // NewNodeController Constructor for NodeController
 func NewNodeController(ctx context.Context, k8sClientset *kubernetes.Clientset, calicoClient client.Interface) controller.Controller {
-	cacheArgs := calicocache.ResourceCacheArgs{
-		ObjectType: reflect.TypeOf(nodeData{}),
-		ListFunc: func() (map[string]interface{}, error) {
-			// Get all nodes from the Calico datastore
-			calicoNodes, err := calicoClient.Nodes().List(ctx, options.ListOptions{})
-			if err != nil {
-				return nil, err
-			}
-
-			// Iterate through and store the k8s nodes in our cache.
-			m := make(map[string]interface{})
-			for _, calicoNode := range calicoNodes.Items {
-				// find its kubernetes orchRef
-				if k8sNodeName := getK8sNodeName(calicoNode); k8sNodeName != "" {
-					m[k8sNodeName] = nodeData{}
-				}
-			}
-
-			log.Debugf("Found %d nodes in Calico datastore:", len(m))
-			return m, nil
-		},
-		ReconcilerConfig: calicocache.ReconcilerConfig{
-			DisableMissingInDatastore: true,
-			DisableMissingInCache:     false,
-			DisableUpdateOnChange:     false,
-		},
-	}
-
-	k8sResourceCache := calicocache.NewResourceCache(cacheArgs)
-	nodeLookupCache := cache{nodes: make(map[string]string)}
+	// Channel used to kick the controller into scheduling a sync.
+	schedule := make(chan interface{})
 
 	// Create a Node watcher.
-	listWatcher := corecache.NewListWatchFromClient(k8sClientset.Core().RESTClient(), "nodes", "", fields.Everything())
+	listWatcher := cache.NewListWatchFromClient(k8sClientset.CoreV1().RESTClient(), "nodes", "", fields.Everything())
 
-	// Bind the Calico cache to kubernetes cache with the help of an informer. This way we make sure that
-	// whenever the kubernetes cache is updated, changes get reflected in the Calico cache as well.
-	_, informer := corecache.NewIndexerInformer(listWatcher, &v1.Node{}, 0, corecache.ResourceEventHandlerFuncs{
+	// The informer manages the watch and signals us when nodes are deleted.
+	_, informer := cache.NewIndexerInformer(listWatcher, &v1.Node{}, 0, cache.ResourceEventHandlerFuncs{
 		DeleteFunc: func(obj interface{}) {
-			nodeName, err := extractK8sNodeName(obj)
-			if err != nil {
-				log.WithError(err).Errorf("Error while converting %#v to k8s node", obj)
-				return
-			}
-			log.Debugf("Got DELETE event for node: %s", nodeName)
-			k8sResourceCache.Delete(nodeName)
+			// Just kick the controller to wake up and perform a sync. We don't
+			// need to know which node it was, since we sync everything.
+			schedule <- nil
 		},
-
-		AddFunc: func(obj interface{}) {
-			nodeName, err := extractK8sNodeName(obj)
-			if err != nil {
-				log.WithError(err).Errorf("Error while converting %#v to k8s node", nodeName)
-				return
-			}
-			// Use an empty value here because the only thing we care about is the kuberneteNodeName,
-			// so there's no other relevant information we want to store in the cache besides the name (which
-			// is unavailable at this time because the calicoNode is created after the k8sNode).
-			k8sResourceCache.Set(nodeName, nodeData{})
-		},
-	}, corecache.Indexers{})
-
-	return &NodeController{ctx, informer, k8sResourceCache, &nodeLookupCache, calicoClient, k8sClientset}
+	}, cache.Indexers{})
+
+	return &NodeController{
+		ctx:          ctx,
+		informer:     informer,
+		calicoClient: calicoClient,
+		k8sClientset: k8sClientset,
+		rl:           workqueue.DefaultControllerRateLimiter(),
+		schedule:     schedule,
+	}
 }
 
 // getK8sNodeName is a helper method that searches a calicoNode for its kubernetes nodeRef.
@@ -134,23 +89,13 @@ func getK8sNodeName(calicoNode api.Node) string {
 }
 
 // Run starts the node controller. It does start-of-day preparation
-// and then launches worker threads.
+// and then launches worker threads. We ignore reconcilerPeriod and threadiness,
+// as this controller does not use a cache and runs only a single worker.
 func (c *NodeController) Run(threadiness int, reconcilerPeriod string, stopCh chan struct{}) {
 	defer uruntime.HandleCrash()
 
-	// Let the workers stop when we are done
-	workqueue := c.k8sResourceCache.GetQueue()
-	defer workqueue.ShutDown()
-
 	log.Info("Starting Node controller")
 
-	// Load node cache. Retry when failed.
-	log.Debug("Loading node cache at start of day")
-	for err := c.populateNodeLookupCache(); err != nil; {
-		log.WithError(err).Errorf("Failed to load Node cache, retrying in 5s")
-		time.Sleep(5 * time.Second)
-	}
-
 	// Wait till k8s cache is synced
 	go c.informer.Run(stopCh)
 	log.Debug("Waiting to sync with Kubernetes API (Nodes)")
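Aside: getK8sNodeName's body is not shown in this diff (only its signature
appears as hunk context). Judging from the OrchRef shape used in the FV tests
below, it plausibly scans the Calico node's OrchRefs for the "k8s"
orchestrator, along these lines (an assumption, not code from this patch):

    package main

    import (
        "fmt"

        api "github.com/projectcalico/libcalico-go/lib/apis/v3"
    )

    // k8sNodeName returns the Kubernetes node name recorded in the Calico
    // node's OrchRefs, or "" if no "k8s" orchestrator reference exists.
    func k8sNodeName(calicoNode api.Node) string {
        for _, ref := range calicoNode.Spec.OrchRefs {
            if ref.Orchestrator == "k8s" {
                return ref.NodeName
            }
        }
        return ""
    }

    func main() {
        n := api.Node{}
        n.Spec.OrchRefs = []api.OrchRef{{NodeName: "my-k8s-node", Orchestrator: "k8s"}}
        fmt.Println(k8sNodeName(n)) // prints "my-k8s-node"
    }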
@@ -159,139 +104,112 @@ func (c *NodeController) Run(threadiness int, reconcilerPeriod string, stopCh ch
 	log.Debug("Finished syncing with Kubernetes API (Nodes)")
 
-	// Start Calico cache.
-	c.k8sResourceCache.Run(reconcilerPeriod)
+	// Start a goroutine to accept and schedule sync requests.
+	go c.acceptScheduleRequests(stopCh)
 
-	// Start a number of worker threads to read from the queue.
-	for i := 0; i < threadiness; i++ {
-		go c.runWorker()
-	}
 	log.Info("Node controller is now running")
 
+	// Kick off a start of day sync.
+	c.schedule <- nil
+
 	<-stopCh
 	log.Info("Stopping Node controller")
 }
 
-func (c *NodeController) runWorker() {
-	for c.processNextItem() {
+// acceptScheduleRequests monitors the schedule channel for kicks to wake up
+// and schedule syncs.
+func (c *NodeController) acceptScheduleRequests(stopCh <-chan struct{}) {
+	for {
+		// Wait until something wakes us up, or we are stopped.
+		select {
+		case <-c.schedule:
+			c.doSchedule(stopCh)
+		case <-stopCh:
+			return
+		}
 	}
 }
 
-func (c *NodeController) processNextItem() bool {
-	// Wait until there is a new item in the work queue.
-	workqueue := c.k8sResourceCache.GetQueue()
-	key, quit := workqueue.Get()
-	if quit {
-		return false
+// doSchedule actually performs the scheduling of syncs. It is a separate method
+// so that we don't introduce locking into the acceptScheduleRequests method.
+func (c *NodeController) doSchedule(stopCh <-chan struct{}) {
+	c.m.Lock()
+	defer c.m.Unlock()
+	c.syncScheduled = true
+	if c.syncInProgress {
+		return
 	}
+	c.syncInProgress = true
+	go c.syncUntilDone(stopCh)
+}
 
-	// Sync the object to the Calico datastore.
-	if err := c.syncToCalico(key.(string)); err != nil {
-		c.handleErr(err, key.(string))
+// syncUntilDone kicks off the sync and handles re-syncing if something schedules
+// a sync while one is in progress. This method assumes the syncInProgress
+// and syncScheduled flags are set when it is called.
+func (c *NodeController) syncUntilDone(stopCh <-chan struct{}) {
+	for {
+		// Maybe stop?
+		select {
+		case <-stopCh:
+			return
+		default:
+			c.m.Lock()
+			if c.syncScheduled {
+				c.syncScheduled = false
+				c.m.Unlock()
+				err := c.syncDelete()
+				if err != nil {
+					// If we hit an error, reschedule another sync. syncDelete
+					// handles its own rate limiting.
+					c.m.Lock()
+					c.syncScheduled = true
+					c.m.Unlock()
+				}
+			} else {
+				c.syncInProgress = false
+				c.m.Unlock()
+				return
+			}
+		}
 	}
-
-	// Indicate that we're done processing this key, allowing for safe parallel processing such that
-	// two objects with the same key are never processed in parallel.
-	workqueue.Done(key)
-	return true
 }
 
-// populateNodeLookupCache fills the nodeLookupCache with initial data
-// by querying the existing data stored in Calico.
-func (c *NodeController) populateNodeLookupCache() error {
-	nodes, err := c.calicoClient.Nodes().List(c.ctx, options.ListOptions{})
+// syncDelete is the main work routine of the controller. It queries Calico and
+// K8s, and deletes any Calico nodes which do not exist in K8s.
+func (c *NodeController) syncDelete() error {
+	// Possibly rate limit calls to Calico.
+	time.Sleep(c.rl.When("calico-list"))
+	cNodes, err := c.calicoClient.Nodes().List(c.ctx, options.ListOptions{})
 	if err != nil {
+		log.WithError(err).Error("Error listing Calico nodes")
 		return err
 	}
+	c.rl.Forget("calico-list")
 
-	c.nodeLookupCache.Lock()
-	for _, node := range nodes.Items {
-		if k8sNodeName := getK8sNodeName(node); k8sNodeName != "" {
-			c.nodeLookupCache.nodes[k8sNodeName] = node.Name
-		}
+	time.Sleep(c.rl.When("k8s"))
+	kNodes, err := c.k8sClientset.CoreV1().Nodes().List(meta_v1.ListOptions{})
+	if err != nil {
+		log.WithError(err).Error("Error listing K8s nodes")
+		return err
+	}
+	c.rl.Forget("k8s")
+	kNodeIdx := make(map[string]bool)
+	for _, node := range kNodes.Items {
+		kNodeIdx[node.Name] = true
 	}
-	c.nodeLookupCache.Unlock()
-	return nil
-}
-
-// syncToCalico syncs the given update to the Calico datastore.
-func (c *NodeController) syncToCalico(key string) error {
-	// Check if it exists in the controller's cache.
-	_, exists := c.k8sResourceCache.Get(key)
-	if !exists {
-		// The object no longer exists - delete from the datastore.
-		c.nodeLookupCache.RLock()
-		calicoNodeName, ok := c.nodeLookupCache.nodes[key]
-		c.nodeLookupCache.RUnlock()
-		clog := log.WithField("node", calicoNodeName)
-
-		if !ok {
-			clog.Warnf("No corresponding Node in cache, re-loading cache from datastore")
-
-			if err := c.populateNodeLookupCache(); err != nil {
-				clog.WithError(err).Error("Failed to load nodeLookup cache")
-				return err
-			}
-			c.nodeLookupCache.RLock()
-			calicoNodeName, ok = c.nodeLookupCache.nodes[key]
-			c.nodeLookupCache.RUnlock()
-			clog = log.WithField("node", calicoNodeName)
-		}
-		if ok {
-			clog.Infof("Deleting node from Calico datastore.")
-			_, err := c.calicoClient.Nodes().Delete(c.ctx, calicoNodeName, options.DeleteOptions{})
-			if _, ok := err.(errors.ErrorResourceDoesNotExist); !ok {
+	for _, node := range cNodes.Items {
+		k8sNodeName := getK8sNodeName(node)
+		if k8sNodeName != "" && !kNodeIdx[k8sNodeName] {
+			// No matching Kubernetes node with that name; delete the Calico node.
+			time.Sleep(c.rl.When("calico-delete"))
+			_, err := c.calicoClient.Nodes().Delete(c.ctx, node.Name, options.DeleteOptions{})
+			if _, doesNotExist := err.(errors.ErrorResourceDoesNotExist); err != nil && !doesNotExist {
 				// We hit an error other than "does not exist".
+				log.WithError(err).Errorf("Error deleting Calico node: %s", node.Name)
 				return err
-			} else {
-				// Remove from the node lookup cache.
-				c.nodeLookupCache.Lock()
-				delete(c.nodeLookupCache.nodes, key)
-				c.nodeLookupCache.Unlock()
 			}
+			c.rl.Forget("calico-delete")
 		}
 	}
 	return nil
 }
-
-// handleErr checks if an error happened and makes sure we will retry later.
-func (c *NodeController) handleErr(err error, key string) {
-	workqueue := c.k8sResourceCache.GetQueue()
-	if err == nil {
-		// Forget about the #AddRateLimited history of the key on every successful synchronization.
-		// This ensures that future processing of updates for this key is not delayed because of
-		// an outdated error history.
-		workqueue.Forget(key)
-		return
-	}
-
-	// This controller retries 5 times if something goes wrong. After that, it stops trying.
-	if workqueue.NumRequeues(key) < 5 {
-		// Re-enqueue the key rate limited. Based on the rate limiter on the
-		// queue and the re-enqueue history, the key will be processed later again.
-		log.WithError(err).Errorf("Error syncing Policy %v: %v", key, err)
-		workqueue.AddRateLimited(key)
-		return
-	}
-	workqueue.Forget(key)
-
-	// Report to an external entity that, even after several retries, we could not successfully process this key
-	uruntime.HandleError(err)
-	log.WithError(err).Errorf("Dropping Policy %q out of the queue: %v", key, err)
-}
-
-func extractK8sNodeName(k8sObj interface{}) (string, error) {
-	node, ok := k8sObj.(*v1.Node)
-
-	if !ok {
-		tombstone, ok := k8sObj.(corecache.DeletedFinalStateUnknown)
-		if !ok {
-			return "", fmt.Errorf("couldn't get object from tombstone %+v", k8sObj)
-		}
-		node, ok = tombstone.Obj.(*v1.Node)
-		if !ok {
-			return "", fmt.Errorf("tombstone contained object that is not a Node %+v", k8sObj)
-		}
-	}
-	return node.ObjectMeta.Name, nil
-}
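Aside: syncDelete above backs off retries with a single client-go rate limiter
keyed by operation name. When(key) returns how long to wait before the next
attempt for that key (growing on repeated attempts), and Forget(key) resets
the key's history after a success. The same pattern in isolation:

    package main

    import (
        "fmt"
        "time"

        "k8s.io/client-go/util/workqueue"
    )

    func main() {
        // Same default exponential-backoff limiter the controller constructs.
        rl := workqueue.DefaultControllerRateLimiter()

        // A stand-in for the Calico list call; always fails here to show backoff.
        listNodes := func() error { return fmt.Errorf("transient API error") }

        for attempt := 0; attempt < 3; attempt++ {
            // Sleep however long the limiter demands for this key.
            delay := rl.When("calico-list")
            fmt.Printf("attempt %d after %v\n", attempt, delay)
            time.Sleep(delay)

            if err := listNodes(); err != nil {
                // Keep the key's history so the next When backs off further.
                continue
            }
            // Success: reset backoff for this key.
            rl.Forget("calico-list")
        }
    }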
diff --git a/tests/fv/fv_test.go b/tests/fv/fv_test.go
index 0a8966b5..cf6a7f07 100644
--- a/tests/fv/fv_test.go
+++ b/tests/fv/fv_test.go
@@ -193,29 +193,6 @@ var _ = Describe("kube-controllers FV tests", func() {
 		}, time.Second*2, 500*time.Millisecond).Should(BeNil())
 	})
 
-	It("should be removed if they reference a k8sNode that doesn't exist", func() {
-		cn := &api.Node{
-			ObjectMeta: metav1.ObjectMeta{
-				Name: cNodeName,
-			},
-			Spec: api.NodeSpec{
-				OrchRefs: []api.OrchRef{
-					{
-						NodeName:     "k8sfakenode",
-						Orchestrator: "k8s",
-					},
-				},
-			},
-		}
-		_, err := calicoClient.Nodes().Create(context.Background(), cn, options.SetOptions{})
-		Expect(err).NotTo(HaveOccurred())
-
-		Eventually(func() *api.Node {
-			node, _ := calicoClient.Nodes().Get(context.Background(), cNodeName, options.GetOptions{})
-			return node
-		}, time.Second*15, 500*time.Millisecond).Should(BeNil())
-	})
-
 	It("should not be removed in response to a k8s node delete if another orchestrator owns it", func() {
 		kn := &v1.Node{
 			ObjectMeta: metav1.ObjectMeta{