Skip to content

Commit

Permalink
Fix data race in serviceexport_controller
Browse files Browse the repository at this point in the history
Data race can occur if multiple workers read or write r.localClusterID
at the same time. Refactored the Reconcile loop so that the
localClusterID is only set when it is missing.

Signed-off-by: Dyanngg <dingyang@vmware.com>
  • Loading branch information
Dyanngg committed Oct 17, 2022
1 parent cd4ccdd commit 0293390
Showing 1 changed file with 27 additions and 12 deletions.
39 changes: 27 additions & 12 deletions multicluster/controllers/multicluster/serviceexport_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package multicluster
import (
"context"
"reflect"
"sync"

corev1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
Expand Down Expand Up @@ -60,8 +61,10 @@ type (
// ServiceExportReconciler reconciles a ServiceExport object in the member cluster.
ServiceExportReconciler struct {
client.Client
mutex sync.Mutex
Scheme *runtime.Scheme
commonAreaGetter RemoteCommonAreaGetter
remoteCommonArea commonarea.RemoteCommonArea
installedSvcs cache.Indexer
installedEps cache.Indexer
leaderNamespace string
Expand Down Expand Up @@ -131,15 +134,9 @@ func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Reques
klog.InfoS("Skip reconciling, no corresponding ServiceExport")
return ctrl.Result{}, nil
}
var commonArea commonarea.RemoteCommonArea
commonArea, r.localClusterID, err = r.commonAreaGetter.GetRemoteCommonAreaAndLocalID()
if commonArea == nil {
return ctrl.Result{Requeue: true}, err
if requeue := r.checkRemoteCommonArea(); requeue {
return ctrl.Result{Requeue: true}, nil
}

r.leaderNamespace = commonArea.GetNamespace()
r.leaderClusterID = string(commonArea.GetClusterID())

var svcExport k8smcsv1alpha1.ServiceExport
svcObj, svcInstalled, _ := r.installedSvcs.GetByKey(req.String())
epsObj, epsInstalled, _ := r.installedEps.GetByKey(req.String())
Expand All @@ -150,11 +147,11 @@ func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Reques
// When controller restarts, the Service is not in cache, but it is still possible
// we need to remove ResourceExports. So leave it to the caller to check the 'svcInstalled'
// before deletion or try to delete any way.
err = r.handleServiceDeleteEvent(ctx, req, commonArea)
err = r.handleServiceDeleteEvent(ctx, req, r.remoteCommonArea)
if err != nil {
return err
}
err = r.handleEndpointDeleteEvent(ctx, req, commonArea)
err = r.handleEndpointDeleteEvent(ctx, req, r.remoteCommonArea)
if err != nil {
return err
}
Expand Down Expand Up @@ -299,7 +296,7 @@ func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Reques
if !svcNoChange {
klog.InfoS("Service has new changes, update ResourceExport", "service", req.String(),
"resourceexport", svcExportNSName)
err := r.serviceHandler(ctx, req, svc, svcResExportName, re, commonArea)
err := r.serviceHandler(ctx, req, svc, svcResExportName, re, r.remoteCommonArea)
if err != nil {
klog.ErrorS(err, "Failed to handle Service change", "service", req.String())
return ctrl.Result{}, err
Expand All @@ -314,7 +311,7 @@ func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Reques
if !epNoChange {
klog.InfoS("Endpoints have new change, update ResourceExport", "endpoints",
req.String(), "resourceexport", epExportNSName)
err = r.endpointsHandler(ctx, req, eps, epResExportName, re, commonArea)
err = r.endpointsHandler(ctx, req, eps, epResExportName, re, r.remoteCommonArea)
if err != nil {
klog.ErrorS(err, "Failed to handle Endpoints change", "endpoints", req.String())
return ctrl.Result{}, err
Expand All @@ -324,6 +321,24 @@ func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Reques
return ctrl.Result{}, nil
}

// checkRemoteCommonArea initializes remoteCommonArea for the reconciler if necessary,
// or tells the Reconcile function to requeue if the remoteCommonArea is not ready.
func (r *ServiceExportReconciler) checkRemoteCommonArea() bool {
r.mutex.Lock()
defer r.mutex.Unlock()

if r.remoteCommonArea == nil {
commonArea, localClusterID, _ := r.commonAreaGetter.GetRemoteCommonAreaAndLocalID()
if commonArea == nil {
return true
}
r.leaderClusterID, r.localClusterID = string(commonArea.GetClusterID()), localClusterID
r.leaderNamespace = commonArea.GetNamespace()
r.remoteCommonArea = commonArea
}
return false
}

func (r *ServiceExportReconciler) handleServiceDeleteEvent(ctx context.Context, req ctrl.Request,
commonArea commonarea.RemoteCommonArea) error {
svcResExportName := getResourceExportName(r.localClusterID, req, "service")
Expand Down

0 comments on commit 0293390

Please sign in to comment.