Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup error handling in datastoresyncer #70

Merged
merged 3 commits into from
Jan 28, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 49 additions & 41 deletions pkg/controllers/datastoresyncer/datastoresyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,33 +73,35 @@ func NewDatastoreSyncer(thisClusterID string, objectNamespace string, kubeClient
return &newDatastoreSyncer
}

func (d *DatastoreSyncer) ensureExclusiveEndpoint() {
func (d *DatastoreSyncer) ensureExclusiveEndpoint() error {
klog.V(4).Infof("Ensuring we are the only endpoint active for this cluster")
endpoints, err := d.datastore.GetEndpoints(d.localCluster.ID)
if err != nil {
klog.Fatalf("Error while retrieving endpoints %v", err)
return fmt.Errorf("error retrieving endpoints %v", err)
}

for _, endpoint := range endpoints {
if !util.CompareEndpointSpec(endpoint.Spec, d.localEndpoint.Spec) {
endpointCrdName, err := util.GetEndpointCRDName(&endpoint)
if err != nil {
klog.Errorf("Error while converting endpoint to CRD Name %s", endpoint.Spec.CableName)
break
klog.Errorf("error converting endpoint %#v to CRD Name: %v", endpoint, err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What’s the reasoning here (and elsewhere in the loop) which leads to logging an error rather than returning the error?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Originally I changed these sites to return error as well but I'm unsure as to the original intent. These are the questions that went through my mind: Why was the error above fatal but not this one and the 2 below it... Why does it break here and not below... Are these errors catastrophic enough to propagate and thus exit the process... When deleting, would it return an error if the object doesn't exist... The latter 2 made me squeamish about changing the behavior.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Oats87 what do you think?

continue
}
// we need to remove this endpoint
klog.V(4).Infof("Found endpoint (%s) that wasn't us but is part of our cluster, triggered delete in central datastore as well as removing CRD", endpointCrdName)
err = d.submarinerClientset.SubmarinerV1().Endpoints(d.objectNamespace).Delete(endpointCrdName, &metav1.DeleteOptions{})
if err != nil {
klog.Errorf("Error while deleting endpoint CRD for %s: %v", endpointCrdName, err)
klog.Errorf("error deleting endpoint CRD for %s: %v", endpointCrdName, err)
}
err = d.datastore.RemoveEndpoint(d.localCluster.ID, endpoint.Spec.CableName)
if err != nil {
klog.Errorf("Error while removing endpoint in remote datastore for %s: %v", endpoint.Spec.CableName, d.localCluster.ID)
klog.Errorf("error removing endpoint in remote datastore for %s: %v", endpoint.Spec.CableName, d.localCluster.ID)
}
klog.V(4).Infof("Removed endpoint %s", endpointCrdName)
}
}

return nil
}

func (d *DatastoreSyncer) enqueueCluster(obj interface{}) {
Expand Down Expand Up @@ -135,16 +137,16 @@ func (d *DatastoreSyncer) Run(stopCh <-chan struct{}) error {
return fmt.Errorf("failed to wait for caches to sync")
}

d.ensureExclusiveEndpoint()
if err := d.ensureExclusiveEndpoint(); err != nil {
return fmt.Errorf("could not ensure exclusive endpoint: %v", err)
}

err := d.reconcileClusterCRD(&d.localCluster, false)
if err != nil {
return fmt.Errorf("Error reconciling local Cluster CRD: %v", err)
if err := d.reconcileClusterCRD(&d.localCluster, false); err != nil {
return fmt.Errorf("error reconciling local Cluster CRD: %v", err)
}

err = d.reconcileEndpointCRD(&d.localEndpoint, false)
if err != nil {
return fmt.Errorf("Error reconciling local Endpoint CRD: %v", err)
if err := d.reconcileEndpointCRD(&d.localEndpoint, false); err != nil {
return fmt.Errorf("error reconciling local Endpoint CRD: %v", err)
}

go utilruntime.HandleError(d.datastore.WatchClusters(context.TODO(), d.thisClusterID, d.colorCodes, d.reconcileClusterCRD))
Expand Down Expand Up @@ -178,31 +180,33 @@ func (d *DatastoreSyncer) processNextClusterWorkItem() bool {
klog.V(8).Infof("Processing cluster object: %v", obj)
ns, key, err := cache.SplitMetaNamespaceKey(obj.(string))
if err != nil {
klog.Errorf("error while splitting meta namespace key: %v", err)
return nil
d.clusterWorkqueue.Forget(obj)
return fmt.Errorf("error splitting meta namespace key for %s: %v", obj, err)
}

if d.thisClusterID != key {
klog.V(6).Infof("The updated cluster object was not for this cluster, skipping updating the datastore")
klog.V(6).Infof("The updated cluster object for key %s was not for this cluster, skipping updating the datastore", key)
// not actually an error but we should forget about this and return
d.clusterWorkqueue.Forget(obj)
return nil
}

cluster, err := d.submarinerClusterInformer.Lister().Clusters(ns).Get(key)
if err != nil {
klog.Errorf("Error while retrieving submariner cluster object %s", obj)
d.clusterWorkqueue.Forget(obj)
return nil
return fmt.Errorf("error retrieving submariner cluster object for %s: %v", key, err)
}
myCluster := types.SubmarinerCluster{
ID: cluster.Name,
Spec: cluster.Spec,
}
klog.V(4).Infof("Attempting to trigger an update of the central datastore with the updated CRD")
err = d.datastore.SetCluster(&myCluster)
klog.V(4).Infof("Update of cluster in central datastore was successful")
if err != nil {
klog.Errorf("There was an error updating the cluster in the central datastore, error: %v", err)
if err = d.datastore.SetCluster(&myCluster); err != nil {
d.clusterWorkqueue.Forget(obj)
return fmt.Errorf("error updating the cluster %#v in the central datastore: %v", myCluster, err)
}

klog.V(4).Infof("Update of cluster %#v in central datastore was successful", myCluster)
d.clusterWorkqueue.Forget(obj)
return nil
}()
Expand All @@ -229,36 +233,39 @@ func (d *DatastoreSyncer) processNextEndpointWorkItem() bool {
klog.V(8).Infof("Processing endpoint object: %v", obj)
ns, key, err := cache.SplitMetaNamespaceKey(obj.(string))
if err != nil {
klog.Errorf("error while splitting meta namespace key: %v", err)
return nil
d.endpointWorkqueue.Forget(obj)
return fmt.Errorf("error splitting meta namespace key for %s: %v", obj, err)
}

endpoint, err := d.submarinerClientset.SubmarinerV1().Endpoints(ns).Get(key, metav1.GetOptions{})
if err != nil {
klog.Errorf("Error while retrieving submariner endpoint object %s", obj)
d.endpointWorkqueue.Forget(obj)
return nil
return fmt.Errorf("error retrieving submariner endpoint object for %s: %v", key, err)
}

if d.thisClusterID != endpoint.Spec.ClusterID {
klog.V(4).Infof("The updated endpoint object was not for this cluster, skipping updating the datastore")
// not actually an error but we should forget about this and return
d.endpointWorkqueue.Forget(obj)
return nil
}

if d.localEndpoint.Spec.CableName != endpoint.Spec.CableName {
klog.V(4).Infof("This endpoint is not me, not updating central datastore")
d.endpointWorkqueue.Forget(obj)
return nil
}

myEndpoint := types.SubmarinerEndpoint{
Spec: endpoint.Spec,
}
klog.V(4).Infof("Attempting to trigger an update of the central datastore with the updated endpoint CRD")
err = d.datastore.SetEndpoint(&myEndpoint)
if err != nil {
klog.Errorf("There was an error updating the endpoint in the central datastore, error: %v", err)
} else {
klog.V(4).Infof("Update of endpoint in central datastore was successful")
if err = d.datastore.SetEndpoint(&myEndpoint); err != nil {
d.endpointWorkqueue.Forget(obj)
return fmt.Errorf("error updating the endpoint %#v in the central datastore: %v", myEndpoint, err)
}

klog.V(4).Infof("Update of endpoint %#v in central datastore was successful", myEndpoint)
d.endpointWorkqueue.Forget(obj)
return nil
}()
Expand All @@ -273,8 +280,9 @@ func (d *DatastoreSyncer) processNextEndpointWorkItem() bool {
func (d *DatastoreSyncer) reconcileClusterCRD(localCluster *types.SubmarinerCluster, delete bool) error {
clusterCRDName, err := util.GetClusterCRDName(localCluster)
if err != nil {
return fmt.Errorf("Error converting the Cluster CRD name: %v", err)
return fmt.Errorf("error extracting the Cluster CRD name for %#v: %v", localCluster, err)
}

var found bool
cluster, err := d.submarinerClientset.SubmarinerV1().Clusters(d.objectNamespace).Get(clusterCRDName, metav1.GetOptions{})
if err != nil {
Expand All @@ -290,7 +298,7 @@ func (d *DatastoreSyncer) reconcileClusterCRD(localCluster *types.SubmarinerClus
klog.V(6).Infof("Attempting to delete Cluster CRD %s from the local datastore", clusterCRDName)
err = d.submarinerClientset.SubmarinerV1().Clusters(d.objectNamespace).Delete(clusterCRDName, &metav1.DeleteOptions{})
if err != nil {
return fmt.Errorf("Error deleting Cluster CRD %s from the local datastore: %v", clusterCRDName, err)
return fmt.Errorf("error deleting Cluster CRD %s from the local datastore: %v", clusterCRDName, err)
}
} else {
klog.V(6).Infof("Cluster CRD %s was not found for deletion", clusterCRDName)
Expand All @@ -305,7 +313,7 @@ func (d *DatastoreSyncer) reconcileClusterCRD(localCluster *types.SubmarinerClus
}
_, err = d.submarinerClientset.SubmarinerV1().Clusters(d.objectNamespace).Create(cluster)
if err != nil {
return fmt.Errorf("Error creating Cluster CRD %s in the local datastore: %v", clusterCRDName, err)
return fmt.Errorf("error creating Cluster CRD %s in the local datastore: %v", clusterCRDName, err)
}
} else {
if reflect.DeepEqual(cluster.Spec, localCluster.Spec) {
Expand All @@ -315,14 +323,14 @@ func (d *DatastoreSyncer) reconcileClusterCRD(localCluster *types.SubmarinerClus
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
result, getErr := d.submarinerClientset.SubmarinerV1().Clusters(d.objectNamespace).Get(clusterCRDName, metav1.GetOptions{})
if getErr != nil {
return fmt.Errorf("Error retrieving latest version of Cluster %s: %v", clusterCRDName, getErr)
return fmt.Errorf("error retrieving latest version of Cluster %s: %v", clusterCRDName, getErr)
}
result.Spec = localCluster.Spec
_, updateErr := d.submarinerClientset.SubmarinerV1().Clusters(d.objectNamespace).Update(result)
return updateErr
})
if retryErr != nil {
return fmt.Errorf("Error updating cluster CRD %s: %v", clusterCRDName, retryErr)
return fmt.Errorf("error updating cluster CRD %s: %v", clusterCRDName, retryErr)
}
}
}
Expand All @@ -332,7 +340,7 @@ func (d *DatastoreSyncer) reconcileClusterCRD(localCluster *types.SubmarinerClus
func (d *DatastoreSyncer) reconcileEndpointCRD(rawEndpoint *types.SubmarinerEndpoint, delete bool) error {
endpointName, err := util.GetEndpointCRDName(rawEndpoint)
if err != nil {
return fmt.Errorf("Error converting the Enndpoint CRD name: %v", err)
return fmt.Errorf("error extracting the Endpoint CRD name for %#v: %v", rawEndpoint, err)
}

var found bool
Expand All @@ -350,7 +358,7 @@ func (d *DatastoreSyncer) reconcileEndpointCRD(rawEndpoint *types.SubmarinerEndp
klog.V(6).Infof("Attempting to delete Endpoint CRD %s from local datastore", endpointName)
err = d.submarinerClientset.SubmarinerV1().Endpoints(d.objectNamespace).Delete(endpointName, &metav1.DeleteOptions{})
if err != nil {
return fmt.Errorf("Error deleting Endpoint CRD %s from the local datastore: %v", endpointName, err)
return fmt.Errorf("error deleting Endpoint CRD %s from the local datastore: %v", endpointName, err)
}
} else {
klog.V(6).Infof("Endpoint CRD %s was not found for deletion", endpointName)
Expand All @@ -365,7 +373,7 @@ func (d *DatastoreSyncer) reconcileEndpointCRD(rawEndpoint *types.SubmarinerEndp
}
_, err = d.submarinerClientset.SubmarinerV1().Endpoints(d.objectNamespace).Create(endpoint)
if err != nil {
return fmt.Errorf("Error creating Endpoint CRD %s in the local datastore: %v", endpointName, err)
return fmt.Errorf("error creating Endpoint CRD %s in the local datastore: %v", endpointName, err)
}
} else {
if reflect.DeepEqual(endpoint.Spec, rawEndpoint.Spec) {
Expand All @@ -375,14 +383,14 @@ func (d *DatastoreSyncer) reconcileEndpointCRD(rawEndpoint *types.SubmarinerEndp
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
result, getErr := d.submarinerClientset.SubmarinerV1().Endpoints(d.objectNamespace).Get(endpointName, metav1.GetOptions{})
if getErr != nil {
return fmt.Errorf("Error retrieving latest version of Endpoint %s: %v", endpointName, getErr)
return fmt.Errorf("error retrieving latest version of Endpoint %s: %v", endpointName, getErr)
}
result.Spec = rawEndpoint.Spec
_, updateErr := d.submarinerClientset.SubmarinerV1().Endpoints(d.objectNamespace).Update(result)
return updateErr
})
if retryErr != nil {
return fmt.Errorf("Error updating endpoint CRD %s: %v", endpointName, retryErr)
return fmt.Errorf("error updating endpoint CRD %s: %v", endpointName, retryErr)
}
}
}
Expand Down