From 5f2d5e905c3a7f0d6c70c12337c7d84c0f06d988 Mon Sep 17 00:00:00 2001 From: hujiajing Date: Tue, 28 Jun 2022 23:48:33 +0800 Subject: [PATCH] Auto update ClusterSet in leader cluster Signed-off-by: hujiajing --- .../antrea-multicluster-leader-namespaced.yml | 6 + .../cmd/multicluster-controller/leader.go | 8 +- .../memberclusterannounce_webhook.go | 64 ++++-- .../memberclusterannounce_webhook_test.go | 196 +++++++++++++++++- .../commonarea/remote_common_area.go | 1 + .../memberclusterannounce_controller.go | 127 ++++++++++-- .../memberclusterannounce_controller_test.go | 71 +++++++ 7 files changed, 420 insertions(+), 53 deletions(-) diff --git a/multicluster/build/yamls/antrea-multicluster-leader-namespaced.yml b/multicluster/build/yamls/antrea-multicluster-leader-namespaced.yml index 5ebb7c78cdc..f8a51ff4e1a 100644 --- a/multicluster/build/yamls/antrea-multicluster-leader-namespaced.yml +++ b/multicluster/build/yamls/antrea-multicluster-leader-namespaced.yml @@ -23,6 +23,12 @@ metadata: name: antrea-mc-controller-role namespace: antrea-multicluster rules: +- apiGroups: + - "" + resources: + - serviceaccounts + verbs: + - get - apiGroups: - "" resources: diff --git a/multicluster/cmd/multicluster-controller/leader.go b/multicluster/cmd/multicluster-controller/leader.go index 72addf032e9..2321622d1ea 100644 --- a/multicluster/cmd/multicluster-controller/leader.go +++ b/multicluster/cmd/multicluster-controller/leader.go @@ -21,6 +21,7 @@ import ( "github.com/spf13/cobra" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/webhook" multiclusterv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1" @@ -63,10 +64,15 @@ func runLeader(o *Options) error { if err = memberClusterStatusManager.SetupWithManager(mgr); err != nil { return fmt.Errorf("error creating MemberClusterAnnounce controller: %v", err) } + + noCachedClient, err := client.New(mgr.GetConfig(), client.Options{Scheme: mgr.GetScheme(), Mapper: mgr.GetRESTMapper()}) + if err != nil { + return err + } hookServer := mgr.GetWebhookServer() hookServer.Register("/validate-multicluster-crd-antrea-io-v1alpha1-memberclusterannounce", &webhook.Admission{Handler: &memberClusterAnnounceValidator{ - Client: mgr.GetClient(), + Client: noCachedClient, namespace: env.GetPodNamespace()}}) clusterSetReconciler := &multiclustercontrollers.LeaderClusterSetReconciler{ diff --git a/multicluster/cmd/multicluster-controller/memberclusterannounce_webhook.go b/multicluster/cmd/multicluster-controller/memberclusterannounce_webhook.go index de7253b4a7c..0c2ad4e70df 100644 --- a/multicluster/cmd/multicluster-controller/memberclusterannounce_webhook.go +++ b/multicluster/cmd/multicluster-controller/memberclusterannounce_webhook.go @@ -18,9 +18,11 @@ package main import ( "context" - "fmt" + "encoding/json" "net/http" + admissionv1 "k8s.io/api/admission/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apiserver/pkg/authentication/serviceaccount" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" @@ -54,33 +56,53 @@ func (v *memberClusterAnnounceValidator) Handle(ctx context.Context, req admissi return admission.Errored(http.StatusBadRequest, err) } - // read the ClusterSet info - clusterSetList := &multiclusterv1alpha1.ClusterSetList{} - if err := v.Client.List(context.TODO(), clusterSetList, client.InNamespace(v.namespace)); err != nil { - klog.ErrorS(err, "Error reading ClusterSet", "Namespace", v.namespace) + serviceAccount := &v1.ServiceAccount{} + if err := v.Client.Get(ctx, client.ObjectKey{Namespace: v.namespace, Name: saName}, serviceAccount); err != nil { + klog.ErrorS(err, "Error getting ServiceAccount", "ServiceAccount", saName, "Namespace", v.namespace, "MemberClusterAnnounce", klog.KObj(memberClusterAnnounce)) return admission.Errored(http.StatusPreconditionFailed, err) } - if len(clusterSetList.Items) != 1 { - return admission.Errored(http.StatusPreconditionFailed, - fmt.Errorf("invalid ClusterSet config in the leader cluster, please contact your administrator")) + var newObj *multiclusterv1alpha1.MemberClusterAnnounce + var oldObj *multiclusterv1alpha1.MemberClusterAnnounce + if req.Object.Raw != nil { + if err := json.Unmarshal(req.Object.Raw, &newObj); err != nil { + klog.ErrorS(err, "Error while decoding new MemberClusterAnnounce", "MemberClusterAnnounce", klog.KObj(memberClusterAnnounce)) + return admission.Errored(http.StatusBadRequest, err) + } } - - clusterSet := clusterSetList.Items[0] - if clusterSet.Name == memberClusterAnnounce.ClusterSetID { - for _, member := range clusterSet.Spec.Members { - if member.ClusterID == memberClusterAnnounce.ClusterID { - // validate the ServiceAccount used is correct - if member.ServiceAccount == saName { - return admission.Allowed("") - } else { - return admission.Denied("Member does not have permissions") - } - } + if req.OldObject.Raw != nil { + if err := json.Unmarshal(req.OldObject.Raw, &oldObj); err != nil { + klog.ErrorS(err, "Error while decoding old MemberClusterAnnounce", "MemberClusterAnnounce", klog.KObj(memberClusterAnnounce)) + return admission.Errored(http.StatusBadRequest, err) } } - return admission.Denied("Unknown member") + switch req.Operation { + case admissionv1.Create: + // read the ClusterSet info + clusterSetList := &multiclusterv1alpha1.ClusterSetList{} + if err := v.Client.List(context.TODO(), clusterSetList, client.InNamespace(v.namespace)); err != nil { + klog.ErrorS(err, "Error reading ClusterSet", "Namespace", v.namespace) + return admission.Errored(http.StatusPreconditionFailed, err) + } + + clusterSet := clusterSetList.Items[0] + if clusterSet.Name != memberClusterAnnounce.ClusterSetID { + return admission.Denied("Unknown ClusterSet ID") + } + if clusterSet.Spec.Leaders[0].ClusterID != memberClusterAnnounce.LeaderClusterID { + return admission.Denied("Leader cluster ID in the MemberClusterAnnounce does not match that in the ClusterSet") + } + return admission.Allowed("") + case admissionv1.Update: + // Member cluster will never change ClusterSet ID in MemberClusterAnnounce + if newObj.ClusterSetID != oldObj.ClusterSetID || newObj.LeaderClusterID != oldObj.LeaderClusterID { + return admission.Denied("ClusterSetID or LeaderClusterID cannot be changed") + } + return admission.Allowed("") + default: + return admission.Allowed("") + } } func (v *memberClusterAnnounceValidator) InjectDecoder(d *admission.Decoder) error { diff --git a/multicluster/cmd/multicluster-controller/memberclusterannounce_webhook_test.go b/multicluster/cmd/multicluster-controller/memberclusterannounce_webhook_test.go index c4bdb659716..bb664570fb9 100644 --- a/multicluster/cmd/multicluster-controller/memberclusterannounce_webhook_test.go +++ b/multicluster/cmd/multicluster-controller/memberclusterannounce_webhook_test.go @@ -24,6 +24,7 @@ import ( "github.com/stretchr/testify/assert" v1 "k8s.io/api/admission/v1" authenticationv1 "k8s.io/api/authentication/v1" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -63,11 +64,28 @@ func setup() { }, } + existingServiceAccounts := &corev1.ServiceAccountList{ + Items: []corev1.ServiceAccount{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "mcs1", + Name: "east-access-sa", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "mcs1", + Name: "west-access-sa", + }, + }, + }, + } + newScheme := runtime.NewScheme() utilruntime.Must(clientgoscheme.AddToScheme(newScheme)) utilruntime.Must(k8smcsv1alpha1.AddToScheme(newScheme)) utilruntime.Must(mcsv1alpha1.AddToScheme(newScheme)) - fakeClient := fake.NewClientBuilder().WithScheme(newScheme).WithObjects(existingClusterSet).Build() + fakeClient := fake.NewClientBuilder().WithScheme(newScheme).WithObjects(existingClusterSet).WithLists(existingServiceAccounts).Build() mcaWebhookUnderTest = &memberClusterAnnounceValidator{ Client: fakeClient, @@ -130,7 +148,56 @@ func TestWebhookAllow(t *testing.T) { assert.Equal(t, true, response.Allowed) } -func TestWebhookDeniedUnknownMember(t *testing.T) { +func TestWebhookJoinAllow(t *testing.T) { + setup() + + mca := &mcsv1alpha1.MemberClusterAnnounce{ + ObjectMeta: metav1.ObjectMeta{ + Name: "member-announce-from-south", + Namespace: "mcs1", + }, + ClusterID: "south", + ClusterSetID: "clusterset1", + LeaderClusterID: "leader1", + } + b, _ := j.Marshal(mca) + + req := admission.Request{ + AdmissionRequest: v1.AdmissionRequest{ + UID: "07e52e8d-4513-11e9-a716-42010a800270", + Kind: metav1.GroupVersionKind{ + Group: "multicluster.crd.antrea.io", + Version: "v1alpha1", + Kind: "MemberClusterAnnounce", + }, + Resource: metav1.GroupVersionResource{ + Group: "multicluster.crd.antrea.io", + Version: "v1alpha1", + Resource: "memberclusterannounces", + }, + Name: "member-announce-from-south", + Namespace: "mcs1", + Operation: v1.Create, + Object: runtime.RawExtension{ + Raw: b, + }, + UserInfo: authenticationv1.UserInfo{ + Username: "system:serviceaccount:mcs1:east-access-sa", + UID: "4842eb60-68e3-4e38-adad-3abfd6117241", + Groups: []string{ + "system:serviceaccounts", + "system:serviceaccounts:mcs1", + "system:authenticated", + }, + }, + }, + } + + response := mcaWebhookUnderTest.Handle(context.Background(), req) + assert.Equal(t, true, response.Allowed) +} + +func TestWebhookDeniedDifferentClusterSet(t *testing.T) { setup() mca := &mcsv1alpha1.MemberClusterAnnounce{ @@ -139,7 +206,7 @@ func TestWebhookDeniedUnknownMember(t *testing.T) { Namespace: "mcs1", }, ClusterID: "north", - ClusterSetID: "clusterset1", + ClusterSetID: "another-clusterset", LeaderClusterID: "leader1", } b, _ := j.Marshal(mca) @@ -177,18 +244,66 @@ func TestWebhookDeniedUnknownMember(t *testing.T) { response := mcaWebhookUnderTest.Handle(context.Background(), req) assert.Equal(t, false, response.Allowed) - assert.Equal(t, metav1.StatusReason("Unknown member"), response.Result.Reason) } -func TestWebhookDeniedNoPermission(t *testing.T) { +func TestWebhookDeniedDifferentLeaderCluster(t *testing.T) { setup() mca := &mcsv1alpha1.MemberClusterAnnounce{ ObjectMeta: metav1.ObjectMeta{ - Name: "member-announce-from-east", + Name: "member-announce-from-north", Namespace: "mcs1", }, - ClusterID: "east", + ClusterID: "north", + ClusterSetID: "clusterset1", + LeaderClusterID: "different-leader", + } + b, _ := j.Marshal(mca) + + req := admission.Request{ + AdmissionRequest: v1.AdmissionRequest{ + UID: "07e52e8d-4513-11e9-a716-42010a800270", + Kind: metav1.GroupVersionKind{ + Group: "multicluster.crd.antrea.io", + Version: "v1alpha1", + Kind: "MemberClusterAnnounce", + }, + Resource: metav1.GroupVersionResource{ + Group: "multicluster.crd.antrea.io", + Version: "v1alpha1", + Resource: "memberclusterannounces", + }, + Name: "member-announce-from-north", + Namespace: "mcs1", + Operation: v1.Create, + Object: runtime.RawExtension{ + Raw: b, + }, + UserInfo: authenticationv1.UserInfo{ + Username: "system:serviceaccount:mcs1:east-access-sa", + UID: "4842eb60-68e3-4e38-adad-3abfd6117241", + Groups: []string{ + "system:serviceaccounts", + "system:serviceaccounts:mcs1", + "system:authenticated", + }, + }, + }, + } + + response := mcaWebhookUnderTest.Handle(context.Background(), req) + assert.Equal(t, false, response.Allowed) +} + +func TestWebhookDeniedUnknownServiceAccount(t *testing.T) { + setup() + + mca := &mcsv1alpha1.MemberClusterAnnounce{ + ObjectMeta: metav1.ObjectMeta{ + Name: "member-announce-from-south", + Namespace: "mcs1", + }, + ClusterID: "south", ClusterSetID: "clusterset1", LeaderClusterID: "leader1", } @@ -207,14 +322,76 @@ func TestWebhookDeniedNoPermission(t *testing.T) { Version: "v1alpha1", Resource: "memberclusterannounces", }, - Name: "member-announce-from-east", + Name: "member-announce-from-south", Namespace: "mcs1", Operation: v1.Create, Object: runtime.RawExtension{ Raw: b, }, UserInfo: authenticationv1.UserInfo{ - Username: "system:serviceaccount:mcs1:north-access-sa", + Username: "system:serviceaccount:mcs1:unknown-access-sa", + UID: "4842eb60-68e3-4e38-adad-3abfd6117241", + Groups: []string{ + "system:serviceaccounts", + "system:serviceaccounts:mcs1", + "system:authenticated", + }, + }, + }, + } + + response := mcaWebhookUnderTest.Handle(context.Background(), req) + assert.Equal(t, false, response.Allowed) +} + +func TestUpdateClusterSetID(t *testing.T) { + setup() + + mca := &mcsv1alpha1.MemberClusterAnnounce{ + ObjectMeta: metav1.ObjectMeta{ + Name: "member-announce-from-south", + Namespace: "mcs1", + }, + ClusterID: "south", + ClusterSetID: "clusterset-changed", + LeaderClusterID: "leader1", + } + oldMca := &mcsv1alpha1.MemberClusterAnnounce{ + ObjectMeta: metav1.ObjectMeta{ + Name: "member-announce-from-south", + Namespace: "mcs1", + }, + ClusterID: "south", + ClusterSetID: "clusterset", + LeaderClusterID: "leader1", + } + b, _ := j.Marshal(mca) + old, _ := j.Marshal(oldMca) + + req := admission.Request{ + AdmissionRequest: v1.AdmissionRequest{ + UID: "07e52e8d-4513-11e9-a716-42010a800270", + Kind: metav1.GroupVersionKind{ + Group: "multicluster.crd.antrea.io", + Version: "v1alpha1", + Kind: "MemberClusterAnnounce", + }, + Resource: metav1.GroupVersionResource{ + Group: "multicluster.crd.antrea.io", + Version: "v1alpha1", + Resource: "memberclusterannounces", + }, + Name: "member-announce-from-south", + Namespace: "mcs1", + Operation: v1.Update, + Object: runtime.RawExtension{ + Raw: b, + }, + OldObject: runtime.RawExtension{ + Raw: old, + }, + UserInfo: authenticationv1.UserInfo{ + Username: "system:serviceaccount:mcs1:east-access-sa", UID: "4842eb60-68e3-4e38-adad-3abfd6117241", Groups: []string{ "system:serviceaccounts", @@ -227,5 +404,4 @@ func TestWebhookDeniedNoPermission(t *testing.T) { response := mcaWebhookUnderTest.Handle(context.Background(), req) assert.Equal(t, false, response.Allowed) - assert.Equal(t, metav1.StatusReason("Member does not have permissions"), response.Result.Reason) } diff --git a/multicluster/controllers/multicluster/commonarea/remote_common_area.go b/multicluster/controllers/multicluster/commonarea/remote_common_area.go index eabde6cd519..2282f2b9e6a 100644 --- a/multicluster/controllers/multicluster/commonarea/remote_common_area.go +++ b/multicluster/controllers/multicluster/commonarea/remote_common_area.go @@ -247,6 +247,7 @@ func (r *remoteCommonArea) SendMemberAnnounce() error { localClusterMemberAnnounce.Name = "member-announce-from-" + r.GetLocalClusterID() localClusterMemberAnnounce.Namespace = r.Namespace localClusterMemberAnnounce.ClusterSetID = string(r.ClusterSetID) + localClusterMemberAnnounce.LeaderClusterID = string(r.GetClusterID()) if err := r.Create(context.TODO(), &localClusterMemberAnnounce, &client.CreateOptions{}); err != nil { klog.ErrorS(err, "Error creating MemberClusterAnnounce", "cluster", r.GetClusterID()) return err diff --git a/multicluster/controllers/multicluster/memberclusterannounce_controller.go b/multicluster/controllers/multicluster/memberclusterannounce_controller.go index 9888ed25d39..edea405239b 100644 --- a/multicluster/controllers/multicluster/memberclusterannounce_controller.go +++ b/multicluster/controllers/multicluster/memberclusterannounce_controller.go @@ -27,6 +27,7 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -43,6 +44,8 @@ var ( ReasonConnectedLeader = "ConnectedLeader" ReasonNotLeader = "NotLeader" + MemberClusterAnnounceFinalizer = "memberclusterannounce.finalizer.antrea.io" + TimerInterval = 5 * time.Second ConnectionTimeout = 3 * TimerInterval ) @@ -65,7 +68,7 @@ type MemberClusterAnnounceReconciler struct { client.Client Scheme *runtime.Scheme - mapLock sync.Mutex + mapLock sync.RWMutex memberStatus map[common.ClusterID]*multiclusterv1alpha1.ClusterStatus timerData map[common.ClusterID]*timerData } @@ -106,8 +109,7 @@ func (r *MemberClusterAnnounceReconciler) Reconcile(ctx context.Context, req ctr } r.mapLock.Lock() - defer r.mapLock.Unlock() - + // If the member is not found in timerData, it means the member is not added to the ClusterSet or was deleted. if data, ok := r.timerData[common.ClusterID(memberAnnounce.ClusterID)]; ok { klog.V(2).InfoS("Reset lastUpdateTime", "cluster", memberAnnounce.ClusterID) // Reset lastUpdateTime and connectedLeader for this member. @@ -122,26 +124,51 @@ func (r *MemberClusterAnnounceReconciler) Reconcile(ctx context.Context, req ctr memberAnnounce.ClusterID) data.leaderStatus.reason = ReasonNotLeader // Check whether this local cluster is the leader for this member. - clusterClaimList := &multiclusterv1alpha2.ClusterClaimList{} - if err := r.List(context.TODO(), clusterClaimList, client.InNamespace(req.Namespace)); err == nil { - for _, clusterClaim := range clusterClaimList.Items { - if clusterClaim.Name == multiclusterv1alpha2.WellKnownClusterClaimID && - clusterClaim.Value == memberAnnounce.LeaderClusterID { - data.leaderStatus.connectedLeader = v1.ConditionTrue - data.leaderStatus.message = fmt.Sprintf("Local cluster is the leader of member: %v", - memberAnnounce.ClusterID) - data.leaderStatus.reason = ReasonConnectedLeader - break - } + clusterClaim := &multiclusterv1alpha2.ClusterClaim{} + if err := r.Get(context.TODO(), types.NamespacedName{Name: multiclusterv1alpha2.WellKnownClusterClaimID, Namespace: req.Namespace}, clusterClaim); err == nil { + if clusterClaim.Value == memberAnnounce.LeaderClusterID { + data.leaderStatus.connectedLeader = v1.ConditionTrue + data.leaderStatus.message = fmt.Sprintf("Local cluster is the leader of member: %v", + memberAnnounce.ClusterID) + data.leaderStatus.reason = ReasonConnectedLeader } } + // If err != nil, probably ClusterClaims were deleted during the processing of MemberClusterAnnounce. + // Nothing to handle in this case and MemberClusterAnnounce will also be deleted soon. + // TODO: Add ClusterClaim webhook to make sure it cannot be deleted while ClusterSet is present. + } + } + r.mapLock.Unlock() + + finalizer := fmt.Sprintf("%s/%s", MemberClusterAnnounceFinalizer, memberAnnounce.ClusterID) + if !memberAnnounce.DeletionTimestamp.IsZero() { + if err := r.removeMemberFromClusterSet(memberAnnounce); err != nil { + klog.ErrorS(err, "Failed to remove member cluster from the ClusterSet", "cluster", memberAnnounce.ClusterID) + return ctrl.Result{}, err + } + memberAnnounce.Finalizers = common.RemoveStringFromSlice(memberAnnounce.Finalizers, finalizer) + if err := r.Update(context.TODO(), memberAnnounce); err != nil { + klog.ErrorS(err, "Failed to update MemberClusterAnnounce", "MemberClusterAnnounce", klog.KObj(memberAnnounce)) + return ctrl.Result{}, err } - // If err != nil, probably ClusterClaims were deleted during the processing of MemberClusterAnnounce. - // Nothing to handle in this case and MemberClusterAnnounce will also be deleted soon. - // TODO: Add ClusterClaim webhook to make sure it cannot be deleted while ClusterSet is present. + + return ctrl.Result{}, nil + } + + if common.StringExistsInSlice(memberAnnounce.Finalizers, finalizer) { + return ctrl.Result{}, nil + } + klog.InfoS("Adding member cluster to ClusterSet", "cluster", memberAnnounce.ClusterID) + if err := r.addMemberToClusterSet(memberAnnounce); err != nil { + klog.ErrorS(err, "Failed to add member cluster to ClusterSet", "cluster", memberAnnounce.ClusterID) + return ctrl.Result{}, err + } + klog.InfoS("Adding finalizer to MemberClusterAnnounce", "MemberClusterAnnounce", klog.KObj(memberAnnounce)) + memberAnnounce.Finalizers = append(memberAnnounce.Finalizers, finalizer) + if err := r.Update(context.TODO(), memberAnnounce); err != nil { + klog.ErrorS(err, "Failed to update MemberClusterAnnounce", "MemberClusterAnnounce", klog.KObj(memberAnnounce)) + return ctrl.Result{}, err } - // Member not found. If this happens, the MemberClusterAnnounce should soon be deleted. - // Nothing to do here. return ctrl.Result{}, nil } @@ -272,8 +299,8 @@ func (r *MemberClusterAnnounceReconciler) RemoveMember(memberID common.ClusterID } func (r *MemberClusterAnnounceReconciler) GetMemberClusterStatuses() []multiclusterv1alpha1.ClusterStatus { - r.mapLock.Lock() - defer r.mapLock.Unlock() + r.mapLock.RLock() + defer r.mapLock.RUnlock() status := make([]multiclusterv1alpha1.ClusterStatus, len(r.memberStatus)) @@ -285,3 +312,61 @@ func (r *MemberClusterAnnounceReconciler) GetMemberClusterStatuses() []multiclus return status } + +func (r *MemberClusterAnnounceReconciler) addMemberToClusterSet(memberClusterAnnounce *multiclusterv1alpha1.MemberClusterAnnounce) error { + clusterSetID := memberClusterAnnounce.ClusterSetID + clusterSet := &multiclusterv1alpha1.ClusterSet{} + if err := r.Get(context.TODO(), types.NamespacedName{Namespace: memberClusterAnnounce.Namespace, Name: clusterSetID}, clusterSet); err != nil { + klog.ErrorS(err, "Failed to get ClusterSet in leader cluster", "ClusterSet", clusterSetID) + return err + } + + exist := false + // ClusterSet ID of MemberClusterAnnounce cannot change. It is guaranteed by the MemberClusterAnnounce webhook. + for _, member := range clusterSet.Spec.Members { + if member.ClusterID == memberClusterAnnounce.ClusterID { + exist = true + break + } + } + + if exist { + return fmt.Errorf("member cluster %s already exists in ClusterSet %s", memberClusterAnnounce.ClusterID, clusterSetID) + } + + clusterSet.Spec.Members = append(clusterSet.Spec.Members, multiclusterv1alpha1.MemberCluster{ClusterID: memberClusterAnnounce.ClusterID}) + if err := r.Update(context.TODO(), clusterSet); err != nil { + klog.ErrorS(err, "Failed to update ClusterSet in leader cluster", "ClusterSet", clusterSetID, "Namespace", clusterSet.Namespace) + return err + } + return nil +} + +func (r *MemberClusterAnnounceReconciler) removeMemberFromClusterSet(memberClusterAnnounce *multiclusterv1alpha1.MemberClusterAnnounce) error { + clusterSetID := memberClusterAnnounce.ClusterSetID + clusterSet := &multiclusterv1alpha1.ClusterSet{} + if err := r.Get(context.TODO(), types.NamespacedName{Namespace: memberClusterAnnounce.Namespace, Name: clusterSetID}, clusterSet); err != nil { + klog.ErrorS(err, "Failed to get ClusterSet in leader cluster", "ClusterSet", clusterSetID) + return err + } + + found := false + for i, member := range clusterSet.Spec.Members { + if member.ClusterID == memberClusterAnnounce.ClusterID { + found = true + clusterSet.Spec.Members = append(clusterSet.Spec.Members[:i], clusterSet.Spec.Members[i+1:]...) + break + } + } + if !found { + klog.InfoS("Member cluster not found in ClusterSet", "ClusterSet", clusterSetID, "cluster", memberClusterAnnounce.ClusterID) + return nil + } + + klog.InfoS("Removing member cluster from the ClusterSet", "cluster", memberClusterAnnounce.ClusterID, "ClusterSet", klog.KObj(clusterSet)) + if err := r.Update(context.TODO(), clusterSet); err != nil { + klog.ErrorS(err, "Failed to update ClusterSet in leader cluster", "ClusterSet", clusterSetID, "Namespace", clusterSet.Namespace) + return err + } + return nil +} diff --git a/multicluster/controllers/multicluster/memberclusterannounce_controller_test.go b/multicluster/controllers/multicluster/memberclusterannounce_controller_test.go index 5212878a54f..c7d07777d91 100644 --- a/multicluster/controllers/multicluster/memberclusterannounce_controller_test.go +++ b/multicluster/controllers/multicluster/memberclusterannounce_controller_test.go @@ -16,6 +16,7 @@ package multicluster import ( "context" + "fmt" "testing" "github.com/stretchr/testify/assert" @@ -91,6 +92,76 @@ func setup() { mcaTestFakeRemoteClient, scheme) } +func TestAddMemberToClusterSet(t *testing.T) { + setup() + + memberCluster := &mcsv1alpha1.MemberClusterAnnounce{ + ObjectMeta: metav1.ObjectMeta{ + Name: "member-announce-from-south", + Namespace: "mcs1", + }, + ClusterSetID: "clusterset1", + ClusterID: "south", + } + + err := memberClusterAnnounceReconcilerUnderTest.addMemberToClusterSet(memberCluster) + assert.Equal(t, nil, err) + + clusterSet := &mcsv1alpha1.ClusterSet{} + err = mcaTestFakeRemoteClient.Get(context.TODO(), types.NamespacedName{Namespace: "mcs1", Name: "clusterset1"}, clusterSet) + assert.Equal(t, nil, err) + assert.Equal(t, 3, len(clusterSet.Spec.Members)) +} + +func TestAddDuplicateMember(t *testing.T) { + setup() + + memberCluster := &mcsv1alpha1.MemberClusterAnnounce{ + ObjectMeta: metav1.ObjectMeta{ + Name: "member-announce-from-east", + Namespace: "mcs1", + }, + ClusterSetID: "clusterset1", + ClusterID: "east", + } + + err := memberClusterAnnounceReconcilerUnderTest.addMemberToClusterSet(memberCluster) + exceptErr := fmt.Errorf("member cluster east already exists in ClusterSet clusterset1") + + assert.Equal(t, exceptErr, err) +} + +func TestRemoveMemberCluster(t *testing.T) { + setup() + + memberCluster := &mcsv1alpha1.MemberClusterAnnounce{ + ObjectMeta: metav1.ObjectMeta{ + Name: "member-announce-from-east", + Namespace: "mcs1", + }, + ClusterSetID: "clusterset1", + ClusterID: "east", + } + err := memberClusterAnnounceReconcilerUnderTest.removeMemberFromClusterSet(memberCluster) + assert.Equal(t, nil, err) +} + +func TestRemoveMemberClusterNotExist(t *testing.T) { + setup() + + memberCluster := &mcsv1alpha1.MemberClusterAnnounce{ + ObjectMeta: metav1.ObjectMeta{ + Name: "member-announce-from-not-exist", + Namespace: "mcs1", + }, + ClusterSetID: "clusterset1", + ClusterID: "not-exist", + } + err := memberClusterAnnounceReconcilerUnderTest.removeMemberFromClusterSet(memberCluster) + + assert.Equal(t, nil, err) +} + func TestStatusAfterAdd(t *testing.T) { setup()