Skip to content

Commit

Permalink
Add stale controller for leader cluster to GC MemberClusterAnnounce
Browse files Browse the repository at this point in the history
Signed-off-by: hujiajing <hjiajing@vmware.com>
  • Loading branch information
hjiajing committed Aug 2, 2022
1 parent 694fc39 commit c32fd04
Show file tree
Hide file tree
Showing 6 changed files with 195 additions and 10 deletions.
11 changes: 11 additions & 0 deletions multicluster/cmd/multicluster-controller/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
multiclusterv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1"
multiclustercontrollers "antrea.io/antrea/multicluster/controllers/multicluster"
"antrea.io/antrea/pkg/log"
"antrea.io/antrea/pkg/signals"
"antrea.io/antrea/pkg/util/env"
)

Expand Down Expand Up @@ -87,6 +88,16 @@ func runLeader(o *Options) error {
if err = (&multiclusterv1alpha1.ResourceExport{}).SetupWebhookWithManager(mgr); err != nil {
return fmt.Errorf("error creating ResourceExport webhook: %v", err)
}
stopCh := signals.RegisterSignalHandlers()
staleController := multiclustercontrollers.NewStaleResCleanupController(
mgr.GetClient(),
mgr.GetScheme(),
env.GetPodNamespace(),
nil,
multiclustercontrollers.LeaderCluster,
)

go staleController.Run(stopCh)

klog.InfoS("Leader MC Controller Starting Manager")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
Expand Down
4 changes: 3 additions & 1 deletion multicluster/cmd/multicluster-controller/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,9 @@ func runMember(o *Options) error {
mgr.GetClient(),
mgr.GetScheme(),
env.GetPodNamespace(),
commonAreaGetter)
commonAreaGetter,
multiclustercontrollers.MemberCluster,
)

go staleController.Run(stopCh)
// Member runs ResourceImportReconciler from RemoteCommonArea only
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func (r *remoteCommonArea) SendMemberAnnounce() error {
}
// Add timestamp to force update on MemberClusterAnnounce. Leader cluster requires
// periodic updates to detect connectivity. Without this, no-op updates will be ignored.
localClusterMemberAnnounce.Annotations[TimestampAnnotationKey] = time.Now().String()
localClusterMemberAnnounce.Annotations[TimestampAnnotationKey] = time.Now().Format(time.RFC3339)
if err := r.Update(context.TODO(), &localClusterMemberAnnounce, &client.UpdateOptions{}); err != nil {
klog.ErrorS(err, "Error updating MemberClusterAnnounce", "cluster", r.GetClusterID())
return err
Expand Down
59 changes: 56 additions & 3 deletions multicluster/controllers/multicluster/stale_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,26 @@ import (
crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1"
)

const (
LeaderCluster = "leader"
MemberCluster = "member"

memberClusterAnnounceStaleTime = 5 * time.Minute
)

// StaleResCleanupController will clean up ServiceImport, MC Service, ACNP, ClusterInfoImport resources
// if no corresponding ResourceImports in the leader cluster and remove stale ResourceExports
// in the leader cluster if no corresponding ServiceExport or Gateway in the member cluster.
// It will only run in the member cluster.
// in the leader cluster if no corresponding ServiceExport or Gateway in the member cluster when it runs in
// the member cluster.
// It will clean up stale MemberClusterAnnounce resources in the leader cluster if no corresponding member
// cluster in the ClusterSet.Spec.Members when it runs in the leader cluster.
type StaleResCleanupController struct {
client.Client
Scheme *runtime.Scheme
localClusterID string
commonAreaGetter RemoteCommonAreaGetter
namespace string
clusterRole string
// queue only ever has one item, but it has nice error handling backoff/retry semantics
queue workqueue.RateLimitingInterface
}
Expand All @@ -53,12 +63,15 @@ func NewStaleResCleanupController(
Client client.Client,
Scheme *runtime.Scheme,
namespace string,
commonAreaGetter RemoteCommonAreaGetter) *StaleResCleanupController {
commonAreaGetter RemoteCommonAreaGetter,
clusterRole string,
) *StaleResCleanupController {
reconciler := &StaleResCleanupController{
Client: Client,
Scheme: Scheme,
namespace: namespace,
commonAreaGetter: commonAreaGetter,
clusterRole: clusterRole,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "StaleResCleanupController"),
}
return reconciler
Expand All @@ -70,6 +83,20 @@ func NewStaleResCleanupController(
//+kubebuilder:rbac:groups=multicluster.crd.antrea.io,resources=resourceexports,verbs=get;list;watch;delete

func (c *StaleResCleanupController) cleanup() error {
switch c.clusterRole {
case LeaderCluster:
return c.cleanupStaleResourcesOnLeader()
case MemberCluster:
return c.cleanupStaleResourcesOnMember()
}
return nil
}

func (c *StaleResCleanupController) cleanupStaleResourcesOnLeader() error {
return c.cleanupMemberClusterAnnounces()
}

func (c *StaleResCleanupController) cleanupStaleResourcesOnMember() error {
var err error
var commonArea commonarea.RemoteCommonArea
commonArea, c.localClusterID, err = c.commonAreaGetter.GetRemoteCommonAreaAndLocalID()
Expand Down Expand Up @@ -270,6 +297,32 @@ func (c *StaleResCleanupController) cleanupClusterInfoResourceExport(commonArea
return nil
}

func (c *StaleResCleanupController) cleanupMemberClusterAnnounces() error {
memberClusterAnnounceList := &mcsv1alpha1.MemberClusterAnnounceList{}
if err := c.List(ctx, memberClusterAnnounceList, &client.ListOptions{}); err != nil {
return err
}

for _, m := range memberClusterAnnounceList.Items {
memberClusterAnnounce := m
lastUpdateTime, err := time.Parse(time.RFC3339, memberClusterAnnounce.Annotations[commonarea.TimestampAnnotationKey])
if err == nil && time.Now().Sub(lastUpdateTime) < memberClusterAnnounceStaleTime {
continue
}
if err == nil {
klog.InfoS("Cleaning up stale MemberClusterAnnounce. It has not been updated within the agreed period", "MemberClusterAnnounce", klog.KObj(&memberClusterAnnounce), "agreedPeriod", memberClusterAnnounceStaleTime)
} else {
klog.InfoS("Cleaning up stale MemberClusterAnnounce. The latest update time is not in RFC3339 format", "MemberClusterAnnounce", klog.KObj(&memberClusterAnnounce))
}

if err := c.Client.Delete(ctx, &memberClusterAnnounce, &client.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) {
klog.ErrorS(err, "Failed to delete stale MemberClusterAnnounce", "MemberClusterAnnounce", klog.KObj(&memberClusterAnnounce))
return err
}
}
return nil
}

// Enqueue will be called after StaleResCleanupController is initialized.
func (c *StaleResCleanupController) Enqueue() {
// The key can be anything as we only have single item.
Expand Down
125 changes: 121 additions & 4 deletions multicluster/controllers/multicluster/stale_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ package multicluster
import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
Expand Down Expand Up @@ -95,7 +97,7 @@ func TestStaleController_CleanupService(t *testing.T) {
commonArea := commonarea.NewFakeRemoteCommonArea(scheme, fakeRemoteClient, "leader-cluster", localClusterID, "default")
mcReconciler := NewMemberClusterSetReconciler(fakeClient, scheme, "default")
mcReconciler.SetRemoteCommonArea(commonArea)
c := NewStaleResCleanupController(fakeClient, scheme, "default", mcReconciler)
c := NewStaleResCleanupController(fakeClient, scheme, "default", mcReconciler, MemberCluster)
if err := c.cleanup(); err != nil {
t.Errorf("StaleController.cleanup() should clean up all stale Service and ServiceImport but got err = %v", err)
}
Expand Down Expand Up @@ -190,7 +192,7 @@ func TestStaleController_CleanupACNP(t *testing.T) {

mcReconciler := NewMemberClusterSetReconciler(fakeClient, scheme, "default")
mcReconciler.SetRemoteCommonArea(commonArea)
c := NewStaleResCleanupController(fakeClient, scheme, "default", mcReconciler)
c := NewStaleResCleanupController(fakeClient, scheme, "default", mcReconciler, MemberCluster)
if err := c.cleanup(); err != nil {
t.Errorf("StaleController.cleanup() should clean up all stale ACNPs but got err = %v", err)
}
Expand Down Expand Up @@ -319,7 +321,7 @@ func TestStaleController_CleanupResourceExport(t *testing.T) {

mcReconciler := NewMemberClusterSetReconciler(fakeClient, scheme, "default")
mcReconciler.SetRemoteCommonArea(commonArea)
c := NewStaleResCleanupController(fakeClient, scheme, "default", mcReconciler)
c := NewStaleResCleanupController(fakeClient, scheme, "default", mcReconciler, MemberCluster)
if err := c.cleanup(); err != nil {
t.Errorf("StaleController.cleanup() should clean up all stale ResourceExports but got err = %v", err)
}
Expand Down Expand Up @@ -400,7 +402,7 @@ func TestStaleController_CleanupClusterInfoImport(t *testing.T) {

mcReconciler := NewMemberClusterSetReconciler(fakeClient, scheme, "default")
mcReconciler.SetRemoteCommonArea(commonarea)
c := NewStaleResCleanupController(fakeClient, scheme, "default", mcReconciler)
c := NewStaleResCleanupController(fakeClient, scheme, "default", mcReconciler, MemberCluster)
if err := c.cleanup(); err != nil {
t.Errorf("StaleController.cleanup() should clean up all stale ClusterInfoImport but got err = %v", err)
}
Expand All @@ -418,3 +420,118 @@ func TestStaleController_CleanupClusterInfoImport(t *testing.T) {
})
}
}

func TestStaleController_CleanupMemberClusterAnnounce(t *testing.T) {
tests := []struct {
name string
memberClusterAnnounceList *mcsv1alpha1.MemberClusterAnnounceList
clusterSet *mcsv1alpha1.ClusterSetList
exceptMemberClusterAnnounceNumber int
}{
{
name: "no MemberClusterAnnounce to clean up when there is no resource",
clusterSet: &mcsv1alpha1.ClusterSetList{},
memberClusterAnnounceList: &mcsv1alpha1.MemberClusterAnnounceList{},
exceptMemberClusterAnnounceNumber: 0,
},
{
name: "no MemberClusterAnnounce to clean up when the resource has a valid update time",
exceptMemberClusterAnnounceNumber: 1,
clusterSet: &mcsv1alpha1.ClusterSetList{
Items: []mcsv1alpha1.ClusterSet{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "clusterset",
},
Spec: mcsv1alpha1.ClusterSetSpec{
Members: []mcsv1alpha1.MemberCluster{
{
ClusterID: "cluster-a",
},
},
},
},
},
},
memberClusterAnnounceList: &mcsv1alpha1.MemberClusterAnnounceList{
Items: []mcsv1alpha1.MemberClusterAnnounce{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "member-cluster-from-cluster-a",
Annotations: map[string]string{
commonarea.TimestampAnnotationKey: time.Now().Format(time.RFC3339),
},
},
ClusterID: "cluster-a",
},
},
},
},
{
name: "clean up outdated MemberClusterAnnounce",
exceptMemberClusterAnnounceNumber: 1,
clusterSet: &mcsv1alpha1.ClusterSetList{
Items: []mcsv1alpha1.ClusterSet{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "clusterset",
},
Spec: mcsv1alpha1.ClusterSetSpec{
Members: []mcsv1alpha1.MemberCluster{
{
ClusterID: "cluster-a",
},
{
ClusterID: "cluster-outdated",
},
},
},
},
},
},
memberClusterAnnounceList: &mcsv1alpha1.MemberClusterAnnounceList{
Items: []mcsv1alpha1.MemberClusterAnnounce{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "member-cluster-from-cluster-a",
Annotations: map[string]string{
commonarea.TimestampAnnotationKey: time.Now().Format(time.RFC3339),
},
},
ClusterID: "cluster-a",
},
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "member-cluster-from-cluster-outdated",
Annotations: map[string]string{
commonarea.TimestampAnnotationKey: time.Now().Add(-1 * time.Hour).Format(time.RFC3339),
},
},
ClusterID: "cluster-outdated",
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithLists(tt.memberClusterAnnounceList).WithLists(tt.clusterSet).Build()

mcReconciler := NewMemberClusterSetReconciler(fakeClient, scheme, "default")
c := NewStaleResCleanupController(fakeClient, scheme, "default", mcReconciler, LeaderCluster)
assert.Equal(t, nil, c.cleanup())

memberClusterAnnounceList := &mcsv1alpha1.MemberClusterAnnounceList{}
if err := fakeClient.List(context.TODO(), memberClusterAnnounceList, &client.ListOptions{}); err != nil {
t.Errorf("Should list MemberClusterAnnounce successfully but got err = %v", err)
}

assert.Equal(t, tt.exceptMemberClusterAnnounceNumber, len(memberClusterAnnounceList.Items))
})
}
}
4 changes: 3 additions & 1 deletion multicluster/test/integration/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,9 @@ var _ = BeforeSuite(func() {
k8sManager.GetClient(),
k8sManager.GetScheme(),
"default",
clusterSetReconciler)
clusterSetReconciler,
multiclustercontrollers.MemberCluster,
)

go staleController.Run(stopCh)
// Make sure to trigger clean up process every 5 seconds
Expand Down

0 comments on commit c32fd04

Please sign in to comment.