From 9a6c39837fa4bf1fba4bf4ee48c13d4f40132267 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Wed, 12 Jun 2019 16:43:00 -0400 Subject: [PATCH] Cleanup error handling in datastoresyncer - Propagate or log errors instead of calling Fatalf - Propagate errors for functions that return error - Improved logging messages where appropriate Signed-off-by: Tom Pantelis --- .../datastoresyncer/datastoresyncer.go | 90 ++++++++++--------- 1 file changed, 49 insertions(+), 41 deletions(-) diff --git a/pkg/controllers/datastoresyncer/datastoresyncer.go b/pkg/controllers/datastoresyncer/datastoresyncer.go index 5b40d590a..c62e50c83 100644 --- a/pkg/controllers/datastoresyncer/datastoresyncer.go +++ b/pkg/controllers/datastoresyncer/datastoresyncer.go @@ -73,33 +73,35 @@ func NewDatastoreSyncer(thisClusterID string, objectNamespace string, kubeClient return &newDatastoreSyncer } -func (d *DatastoreSyncer) ensureExclusiveEndpoint() { +func (d *DatastoreSyncer) ensureExclusiveEndpoint() error { klog.V(4).Infof("Ensuring we are the only endpoint active for this cluster") endpoints, err := d.datastore.GetEndpoints(d.localCluster.ID) if err != nil { - klog.Fatalf("Error while retrieving endpoints %v", err) + return fmt.Errorf("error retrieving endpoints %v", err) } for _, endpoint := range endpoints { if !util.CompareEndpointSpec(endpoint.Spec, d.localEndpoint.Spec) { endpointCrdName, err := util.GetEndpointCRDName(&endpoint) if err != nil { - klog.Errorf("Error while converting endpoint to CRD Name %s", endpoint.Spec.CableName) - break + klog.Errorf("error converting endpoint %#v to CRD Name: %v", endpoint, err) + continue } // we need to remove this endpoint klog.V(4).Infof("Found endpoint (%s) that wasn't us but is part of our cluster, triggered delete in central datastore as well as removing CRD", endpointCrdName) err = d.submarinerClientset.SubmarinerV1().Endpoints(d.objectNamespace).Delete(endpointCrdName, &metav1.DeleteOptions{}) if err != nil { - klog.Errorf("Error while deleting endpoint CRD for %s: %v", endpointCrdName, err) + klog.Errorf("error deleting endpoint CRD for %s: %v", endpointCrdName, err) } err = d.datastore.RemoveEndpoint(d.localCluster.ID, endpoint.Spec.CableName) if err != nil { - klog.Errorf("Error while removing endpoint in remote datastore for %s: %v", endpoint.Spec.CableName, d.localCluster.ID) + klog.Errorf("error removing endpoint in remote datastore for %s: %v", endpoint.Spec.CableName, d.localCluster.ID) } klog.V(4).Infof("Removed endpoint %s", endpointCrdName) } } + + return nil } func (d *DatastoreSyncer) enqueueCluster(obj interface{}) { @@ -135,16 +137,16 @@ func (d *DatastoreSyncer) Run(stopCh <-chan struct{}) error { return fmt.Errorf("failed to wait for caches to sync") } - d.ensureExclusiveEndpoint() + if err := d.ensureExclusiveEndpoint(); err != nil { + return fmt.Errorf("could not ensure exclusive endpoint: %v", err) + } - err := d.reconcileClusterCRD(&d.localCluster, false) - if err != nil { - return fmt.Errorf("Error reconciling local Cluster CRD: %v", err) + if err := d.reconcileClusterCRD(&d.localCluster, false); err != nil { + return fmt.Errorf("error reconciling local Cluster CRD: %v", err) } - err = d.reconcileEndpointCRD(&d.localEndpoint, false) - if err != nil { - return fmt.Errorf("Error reconciling local Endpoint CRD: %v", err) + if err := d.reconcileEndpointCRD(&d.localEndpoint, false); err != nil { + return fmt.Errorf("error reconciling local Endpoint CRD: %v", err) } go utilruntime.HandleError(d.datastore.WatchClusters(context.TODO(), d.thisClusterID, d.colorCodes, d.reconcileClusterCRD)) @@ -178,31 +180,33 @@ func (d *DatastoreSyncer) processNextClusterWorkItem() bool { klog.V(8).Infof("Processing cluster object: %v", obj) ns, key, err := cache.SplitMetaNamespaceKey(obj.(string)) if err != nil { - klog.Errorf("error while splitting meta namespace key: %v", err) - return nil + d.clusterWorkqueue.Forget(obj) + return fmt.Errorf("error splitting meta namespace key for %s: %v", obj, err) } + if d.thisClusterID != key { - klog.V(6).Infof("The updated cluster object was not for this cluster, skipping updating the datastore") + klog.V(6).Infof("The updated cluster object for key %s was not for this cluster, skipping updating the datastore", key) // not actually an error but we should forget about this and return d.clusterWorkqueue.Forget(obj) return nil } + cluster, err := d.submarinerClusterInformer.Lister().Clusters(ns).Get(key) if err != nil { - klog.Errorf("Error while retrieving submariner cluster object %s", obj) d.clusterWorkqueue.Forget(obj) - return nil + return fmt.Errorf("error retrieving submariner cluster object for %s: %v", key, err) } myCluster := types.SubmarinerCluster{ ID: cluster.Name, Spec: cluster.Spec, } klog.V(4).Infof("Attempting to trigger an update of the central datastore with the updated CRD") - err = d.datastore.SetCluster(&myCluster) - klog.V(4).Infof("Update of cluster in central datastore was successful") - if err != nil { - klog.Errorf("There was an error updating the cluster in the central datastore, error: %v", err) + if err = d.datastore.SetCluster(&myCluster); err != nil { + d.clusterWorkqueue.Forget(obj) + return fmt.Errorf("error updating the cluster %#v in the central datastore: %v", myCluster, err) } + + klog.V(4).Infof("Update of cluster %#v in central datastore was successful", myCluster) d.clusterWorkqueue.Forget(obj) return nil }() @@ -229,36 +233,39 @@ func (d *DatastoreSyncer) processNextEndpointWorkItem() bool { klog.V(8).Infof("Processing endpoint object: %v", obj) ns, key, err := cache.SplitMetaNamespaceKey(obj.(string)) if err != nil { - klog.Errorf("error while splitting meta namespace key: %v", err) - return nil + d.endpointWorkqueue.Forget(obj) + return fmt.Errorf("error splitting meta namespace key for %s: %v", obj, err) } + endpoint, err := d.submarinerClientset.SubmarinerV1().Endpoints(ns).Get(key, metav1.GetOptions{}) if err != nil { - klog.Errorf("Error while retrieving submariner endpoint object %s", obj) d.endpointWorkqueue.Forget(obj) - return nil + return fmt.Errorf("error retrieving submariner endpoint object for %s: %v", key, err) } + if d.thisClusterID != endpoint.Spec.ClusterID { klog.V(4).Infof("The updated endpoint object was not for this cluster, skipping updating the datastore") // not actually an error but we should forget about this and return d.endpointWorkqueue.Forget(obj) return nil } + if d.localEndpoint.Spec.CableName != endpoint.Spec.CableName { klog.V(4).Infof("This endpoint is not me, not updating central datastore") d.endpointWorkqueue.Forget(obj) return nil } + myEndpoint := types.SubmarinerEndpoint{ Spec: endpoint.Spec, } klog.V(4).Infof("Attempting to trigger an update of the central datastore with the updated endpoint CRD") - err = d.datastore.SetEndpoint(&myEndpoint) - if err != nil { - klog.Errorf("There was an error updating the endpoint in the central datastore, error: %v", err) - } else { - klog.V(4).Infof("Update of endpoint in central datastore was successful") + if err = d.datastore.SetEndpoint(&myEndpoint); err != nil { + d.endpointWorkqueue.Forget(obj) + return fmt.Errorf("error updating the endpoint %#v in the central datastore: %v", myEndpoint, err) } + + klog.V(4).Infof("Update of endpoint %#v in central datastore was successful", myEndpoint) d.endpointWorkqueue.Forget(obj) return nil }() @@ -273,8 +280,9 @@ func (d *DatastoreSyncer) processNextEndpointWorkItem() bool { func (d *DatastoreSyncer) reconcileClusterCRD(localCluster *types.SubmarinerCluster, delete bool) error { clusterCRDName, err := util.GetClusterCRDName(localCluster) if err != nil { - return fmt.Errorf("Error converting the Cluster CRD name: %v", err) + return fmt.Errorf("error extracting the Cluster CRD name for %#v: %v", localCluster, err) } + var found bool cluster, err := d.submarinerClientset.SubmarinerV1().Clusters(d.objectNamespace).Get(clusterCRDName, metav1.GetOptions{}) if err != nil { @@ -290,7 +298,7 @@ func (d *DatastoreSyncer) reconcileClusterCRD(localCluster *types.SubmarinerClus klog.V(6).Infof("Attempting to delete Cluster CRD %s from the local datastore", clusterCRDName) err = d.submarinerClientset.SubmarinerV1().Clusters(d.objectNamespace).Delete(clusterCRDName, &metav1.DeleteOptions{}) if err != nil { - return fmt.Errorf("Error deleting Cluster CRD %s from the local datastore: %v", clusterCRDName, err) + return fmt.Errorf("error deleting Cluster CRD %s from the local datastore: %v", clusterCRDName, err) } } else { klog.V(6).Infof("Cluster CRD %s was not found for deletion", clusterCRDName) @@ -305,7 +313,7 @@ func (d *DatastoreSyncer) reconcileClusterCRD(localCluster *types.SubmarinerClus } _, err = d.submarinerClientset.SubmarinerV1().Clusters(d.objectNamespace).Create(cluster) if err != nil { - return fmt.Errorf("Error creating Cluster CRD %s in the local datastore: %v", clusterCRDName, err) + return fmt.Errorf("error creating Cluster CRD %s in the local datastore: %v", clusterCRDName, err) } } else { if reflect.DeepEqual(cluster.Spec, localCluster.Spec) { @@ -315,14 +323,14 @@ func (d *DatastoreSyncer) reconcileClusterCRD(localCluster *types.SubmarinerClus retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { result, getErr := d.submarinerClientset.SubmarinerV1().Clusters(d.objectNamespace).Get(clusterCRDName, metav1.GetOptions{}) if getErr != nil { - return fmt.Errorf("Error retrieving latest version of Cluster %s: %v", clusterCRDName, getErr) + return fmt.Errorf("error retrieving latest version of Cluster %s: %v", clusterCRDName, getErr) } result.Spec = localCluster.Spec _, updateErr := d.submarinerClientset.SubmarinerV1().Clusters(d.objectNamespace).Update(result) return updateErr }) if retryErr != nil { - return fmt.Errorf("Error updating cluster CRD %s: %v", clusterCRDName, retryErr) + return fmt.Errorf("error updating cluster CRD %s: %v", clusterCRDName, retryErr) } } } @@ -332,7 +340,7 @@ func (d *DatastoreSyncer) reconcileClusterCRD(localCluster *types.SubmarinerClus func (d *DatastoreSyncer) reconcileEndpointCRD(rawEndpoint *types.SubmarinerEndpoint, delete bool) error { endpointName, err := util.GetEndpointCRDName(rawEndpoint) if err != nil { - return fmt.Errorf("Error converting the Enndpoint CRD name: %v", err) + return fmt.Errorf("error extracting the Endpoint CRD name for %#v: %v", rawEndpoint, err) } var found bool @@ -350,7 +358,7 @@ func (d *DatastoreSyncer) reconcileEndpointCRD(rawEndpoint *types.SubmarinerEndp klog.V(6).Infof("Attempting to delete Endpoint CRD %s from local datastore", endpointName) err = d.submarinerClientset.SubmarinerV1().Endpoints(d.objectNamespace).Delete(endpointName, &metav1.DeleteOptions{}) if err != nil { - return fmt.Errorf("Error deleting Endpoint CRD %s from the local datastore: %v", endpointName, err) + return fmt.Errorf("error deleting Endpoint CRD %s from the local datastore: %v", endpointName, err) } } else { klog.V(6).Infof("Endpoint CRD %s was not found for deletion", endpointName) @@ -365,7 +373,7 @@ func (d *DatastoreSyncer) reconcileEndpointCRD(rawEndpoint *types.SubmarinerEndp } _, err = d.submarinerClientset.SubmarinerV1().Endpoints(d.objectNamespace).Create(endpoint) if err != nil { - return fmt.Errorf("Error creating Endpoint CRD %s in the local datastore: %v", endpointName, err) + return fmt.Errorf("error creating Endpoint CRD %s in the local datastore: %v", endpointName, err) } } else { if reflect.DeepEqual(endpoint.Spec, rawEndpoint.Spec) { @@ -375,14 +383,14 @@ func (d *DatastoreSyncer) reconcileEndpointCRD(rawEndpoint *types.SubmarinerEndp retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { result, getErr := d.submarinerClientset.SubmarinerV1().Endpoints(d.objectNamespace).Get(endpointName, metav1.GetOptions{}) if getErr != nil { - return fmt.Errorf("Error retrieving latest version of Endpoint %s: %v", endpointName, getErr) + return fmt.Errorf("error retrieving latest version of Endpoint %s: %v", endpointName, getErr) } result.Spec = rawEndpoint.Spec _, updateErr := d.submarinerClientset.SubmarinerV1().Endpoints(d.objectNamespace).Update(result) return updateErr }) if retryErr != nil { - return fmt.Errorf("Error updating endpoint CRD %s: %v", endpointName, retryErr) + return fmt.Errorf("error updating endpoint CRD %s: %v", endpointName, retryErr) } } }