Skip to content

Commit

Permalink
Cleanup error handling in datastoresyncer (submariner-io#70)
Browse files Browse the repository at this point in the history
- Propagate or log errors instead of calling Fatalf
- Propagate errors for functions that return error
- Improved logging messages where appropriate

Signed-off-by: Tom Pantelis <tompantelis@gmail.com>

Co-authored-by: Miguel Angel Ajo Pelayo <miguelangel@ajo.es>
  • Loading branch information
tpantelis and mangelajo authored Jan 28, 2020
1 parent 73e442e commit 87b9209
Showing 1 changed file with 49 additions and 41 deletions.
90 changes: 49 additions & 41 deletions pkg/controllers/datastoresyncer/datastoresyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,33 +73,35 @@ func NewDatastoreSyncer(thisClusterID string, objectNamespace string, kubeClient
return &newDatastoreSyncer
}

func (d *DatastoreSyncer) ensureExclusiveEndpoint() {
func (d *DatastoreSyncer) ensureExclusiveEndpoint() error {
klog.V(4).Infof("Ensuring we are the only endpoint active for this cluster")
endpoints, err := d.datastore.GetEndpoints(d.localCluster.ID)
if err != nil {
klog.Fatalf("Error while retrieving endpoints %v", err)
return fmt.Errorf("error retrieving endpoints %v", err)
}

for _, endpoint := range endpoints {
if !util.CompareEndpointSpec(endpoint.Spec, d.localEndpoint.Spec) {
endpointCrdName, err := util.GetEndpointCRDName(&endpoint)
if err != nil {
klog.Errorf("Error while converting endpoint to CRD Name %s", endpoint.Spec.CableName)
break
klog.Errorf("error converting endpoint %#v to CRD Name: %v", endpoint, err)
continue
}
// we need to remove this endpoint
klog.V(4).Infof("Found endpoint (%s) that wasn't us but is part of our cluster, triggered delete in central datastore as well as removing CRD", endpointCrdName)
err = d.submarinerClientset.SubmarinerV1().Endpoints(d.objectNamespace).Delete(endpointCrdName, &metav1.DeleteOptions{})
if err != nil {
klog.Errorf("Error while deleting endpoint CRD for %s: %v", endpointCrdName, err)
klog.Errorf("error deleting endpoint CRD for %s: %v", endpointCrdName, err)
}
err = d.datastore.RemoveEndpoint(d.localCluster.ID, endpoint.Spec.CableName)
if err != nil {
klog.Errorf("Error while removing endpoint in remote datastore for %s: %v", endpoint.Spec.CableName, d.localCluster.ID)
klog.Errorf("error removing endpoint in remote datastore for %s: %v", endpoint.Spec.CableName, d.localCluster.ID)
}
klog.V(4).Infof("Removed endpoint %s", endpointCrdName)
}
}

return nil
}

func (d *DatastoreSyncer) enqueueCluster(obj interface{}) {
Expand Down Expand Up @@ -135,16 +137,16 @@ func (d *DatastoreSyncer) Run(stopCh <-chan struct{}) error {
return fmt.Errorf("failed to wait for caches to sync")
}

d.ensureExclusiveEndpoint()
if err := d.ensureExclusiveEndpoint(); err != nil {
return fmt.Errorf("could not ensure exclusive endpoint: %v", err)
}

err := d.reconcileClusterCRD(&d.localCluster, false)
if err != nil {
return fmt.Errorf("Error reconciling local Cluster CRD: %v", err)
if err := d.reconcileClusterCRD(&d.localCluster, false); err != nil {
return fmt.Errorf("error reconciling local Cluster CRD: %v", err)
}

err = d.reconcileEndpointCRD(&d.localEndpoint, false)
if err != nil {
return fmt.Errorf("Error reconciling local Endpoint CRD: %v", err)
if err := d.reconcileEndpointCRD(&d.localEndpoint, false); err != nil {
return fmt.Errorf("error reconciling local Endpoint CRD: %v", err)
}

go utilruntime.HandleError(d.datastore.WatchClusters(context.TODO(), d.thisClusterID, d.colorCodes, d.reconcileClusterCRD))
Expand Down Expand Up @@ -178,31 +180,33 @@ func (d *DatastoreSyncer) processNextClusterWorkItem() bool {
klog.V(8).Infof("Processing cluster object: %v", obj)
ns, key, err := cache.SplitMetaNamespaceKey(obj.(string))
if err != nil {
klog.Errorf("error while splitting meta namespace key: %v", err)
return nil
d.clusterWorkqueue.Forget(obj)
return fmt.Errorf("error splitting meta namespace key for %s: %v", obj, err)
}

if d.thisClusterID != key {
klog.V(6).Infof("The updated cluster object was not for this cluster, skipping updating the datastore")
klog.V(6).Infof("The updated cluster object for key %s was not for this cluster, skipping updating the datastore", key)
// not actually an error but we should forget about this and return
d.clusterWorkqueue.Forget(obj)
return nil
}

cluster, err := d.submarinerClusterInformer.Lister().Clusters(ns).Get(key)
if err != nil {
klog.Errorf("Error while retrieving submariner cluster object %s", obj)
d.clusterWorkqueue.Forget(obj)
return nil
return fmt.Errorf("error retrieving submariner cluster object for %s: %v", key, err)
}
myCluster := types.SubmarinerCluster{
ID: cluster.Name,
Spec: cluster.Spec,
}
klog.V(4).Infof("Attempting to trigger an update of the central datastore with the updated CRD")
err = d.datastore.SetCluster(&myCluster)
klog.V(4).Infof("Update of cluster in central datastore was successful")
if err != nil {
klog.Errorf("There was an error updating the cluster in the central datastore, error: %v", err)
if err = d.datastore.SetCluster(&myCluster); err != nil {
d.clusterWorkqueue.Forget(obj)
return fmt.Errorf("error updating the cluster %#v in the central datastore: %v", myCluster, err)
}

klog.V(4).Infof("Update of cluster %#v in central datastore was successful", myCluster)
d.clusterWorkqueue.Forget(obj)
return nil
}()
Expand All @@ -229,36 +233,39 @@ func (d *DatastoreSyncer) processNextEndpointWorkItem() bool {
klog.V(8).Infof("Processing endpoint object: %v", obj)
ns, key, err := cache.SplitMetaNamespaceKey(obj.(string))
if err != nil {
klog.Errorf("error while splitting meta namespace key: %v", err)
return nil
d.endpointWorkqueue.Forget(obj)
return fmt.Errorf("error splitting meta namespace key for %s: %v", obj, err)
}

endpoint, err := d.submarinerClientset.SubmarinerV1().Endpoints(ns).Get(key, metav1.GetOptions{})
if err != nil {
klog.Errorf("Error while retrieving submariner endpoint object %s", obj)
d.endpointWorkqueue.Forget(obj)
return nil
return fmt.Errorf("error retrieving submariner endpoint object for %s: %v", key, err)
}

if d.thisClusterID != endpoint.Spec.ClusterID {
klog.V(4).Infof("The updated endpoint object was not for this cluster, skipping updating the datastore")
// not actually an error but we should forget about this and return
d.endpointWorkqueue.Forget(obj)
return nil
}

if d.localEndpoint.Spec.CableName != endpoint.Spec.CableName {
klog.V(4).Infof("This endpoint is not me, not updating central datastore")
d.endpointWorkqueue.Forget(obj)
return nil
}

myEndpoint := types.SubmarinerEndpoint{
Spec: endpoint.Spec,
}
klog.V(4).Infof("Attempting to trigger an update of the central datastore with the updated endpoint CRD")
err = d.datastore.SetEndpoint(&myEndpoint)
if err != nil {
klog.Errorf("There was an error updating the endpoint in the central datastore, error: %v", err)
} else {
klog.V(4).Infof("Update of endpoint in central datastore was successful")
if err = d.datastore.SetEndpoint(&myEndpoint); err != nil {
d.endpointWorkqueue.Forget(obj)
return fmt.Errorf("error updating the endpoint %#v in the central datastore: %v", myEndpoint, err)
}

klog.V(4).Infof("Update of endpoint %#v in central datastore was successful", myEndpoint)
d.endpointWorkqueue.Forget(obj)
return nil
}()
Expand All @@ -273,8 +280,9 @@ func (d *DatastoreSyncer) processNextEndpointWorkItem() bool {
func (d *DatastoreSyncer) reconcileClusterCRD(localCluster *types.SubmarinerCluster, delete bool) error {
clusterCRDName, err := util.GetClusterCRDName(localCluster)
if err != nil {
return fmt.Errorf("Error converting the Cluster CRD name: %v", err)
return fmt.Errorf("error extracting the Cluster CRD name for %#v: %v", localCluster, err)
}

var found bool
cluster, err := d.submarinerClientset.SubmarinerV1().Clusters(d.objectNamespace).Get(clusterCRDName, metav1.GetOptions{})
if err != nil {
Expand All @@ -290,7 +298,7 @@ func (d *DatastoreSyncer) reconcileClusterCRD(localCluster *types.SubmarinerClus
klog.V(6).Infof("Attempting to delete Cluster CRD %s from the local datastore", clusterCRDName)
err = d.submarinerClientset.SubmarinerV1().Clusters(d.objectNamespace).Delete(clusterCRDName, &metav1.DeleteOptions{})
if err != nil {
return fmt.Errorf("Error deleting Cluster CRD %s from the local datastore: %v", clusterCRDName, err)
return fmt.Errorf("error deleting Cluster CRD %s from the local datastore: %v", clusterCRDName, err)
}
} else {
klog.V(6).Infof("Cluster CRD %s was not found for deletion", clusterCRDName)
Expand All @@ -305,7 +313,7 @@ func (d *DatastoreSyncer) reconcileClusterCRD(localCluster *types.SubmarinerClus
}
_, err = d.submarinerClientset.SubmarinerV1().Clusters(d.objectNamespace).Create(cluster)
if err != nil {
return fmt.Errorf("Error creating Cluster CRD %s in the local datastore: %v", clusterCRDName, err)
return fmt.Errorf("error creating Cluster CRD %s in the local datastore: %v", clusterCRDName, err)
}
} else {
if reflect.DeepEqual(cluster.Spec, localCluster.Spec) {
Expand All @@ -315,14 +323,14 @@ func (d *DatastoreSyncer) reconcileClusterCRD(localCluster *types.SubmarinerClus
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
result, getErr := d.submarinerClientset.SubmarinerV1().Clusters(d.objectNamespace).Get(clusterCRDName, metav1.GetOptions{})
if getErr != nil {
return fmt.Errorf("Error retrieving latest version of Cluster %s: %v", clusterCRDName, getErr)
return fmt.Errorf("error retrieving latest version of Cluster %s: %v", clusterCRDName, getErr)
}
result.Spec = localCluster.Spec
_, updateErr := d.submarinerClientset.SubmarinerV1().Clusters(d.objectNamespace).Update(result)
return updateErr
})
if retryErr != nil {
return fmt.Errorf("Error updating cluster CRD %s: %v", clusterCRDName, retryErr)
return fmt.Errorf("error updating cluster CRD %s: %v", clusterCRDName, retryErr)
}
}
}
Expand All @@ -332,7 +340,7 @@ func (d *DatastoreSyncer) reconcileClusterCRD(localCluster *types.SubmarinerClus
func (d *DatastoreSyncer) reconcileEndpointCRD(rawEndpoint *types.SubmarinerEndpoint, delete bool) error {
endpointName, err := util.GetEndpointCRDName(rawEndpoint)
if err != nil {
return fmt.Errorf("Error converting the Enndpoint CRD name: %v", err)
return fmt.Errorf("error extracting the Endpoint CRD name for %#v: %v", rawEndpoint, err)
}

var found bool
Expand All @@ -350,7 +358,7 @@ func (d *DatastoreSyncer) reconcileEndpointCRD(rawEndpoint *types.SubmarinerEndp
klog.V(6).Infof("Attempting to delete Endpoint CRD %s from local datastore", endpointName)
err = d.submarinerClientset.SubmarinerV1().Endpoints(d.objectNamespace).Delete(endpointName, &metav1.DeleteOptions{})
if err != nil {
return fmt.Errorf("Error deleting Endpoint CRD %s from the local datastore: %v", endpointName, err)
return fmt.Errorf("error deleting Endpoint CRD %s from the local datastore: %v", endpointName, err)
}
} else {
klog.V(6).Infof("Endpoint CRD %s was not found for deletion", endpointName)
Expand All @@ -365,7 +373,7 @@ func (d *DatastoreSyncer) reconcileEndpointCRD(rawEndpoint *types.SubmarinerEndp
}
_, err = d.submarinerClientset.SubmarinerV1().Endpoints(d.objectNamespace).Create(endpoint)
if err != nil {
return fmt.Errorf("Error creating Endpoint CRD %s in the local datastore: %v", endpointName, err)
return fmt.Errorf("error creating Endpoint CRD %s in the local datastore: %v", endpointName, err)
}
} else {
if reflect.DeepEqual(endpoint.Spec, rawEndpoint.Spec) {
Expand All @@ -375,14 +383,14 @@ func (d *DatastoreSyncer) reconcileEndpointCRD(rawEndpoint *types.SubmarinerEndp
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
result, getErr := d.submarinerClientset.SubmarinerV1().Endpoints(d.objectNamespace).Get(endpointName, metav1.GetOptions{})
if getErr != nil {
return fmt.Errorf("Error retrieving latest version of Endpoint %s: %v", endpointName, getErr)
return fmt.Errorf("error retrieving latest version of Endpoint %s: %v", endpointName, getErr)
}
result.Spec = rawEndpoint.Spec
_, updateErr := d.submarinerClientset.SubmarinerV1().Endpoints(d.objectNamespace).Update(result)
return updateErr
})
if retryErr != nil {
return fmt.Errorf("Error updating endpoint CRD %s: %v", endpointName, retryErr)
return fmt.Errorf("error updating endpoint CRD %s: %v", endpointName, retryErr)
}
}
}
Expand Down

0 comments on commit 87b9209

Please sign in to comment.