From 31ab910269950d4adb4f1b12d778ef55431e7a79 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Tue, 27 Aug 2024 19:48:23 -0400 Subject: [PATCH] Handle namespace recreated in resource syncer If a namespace is deleted and the namespace was previously seen, ie there was at least one resource previously synced in that namespace, then retrieve all the cached resource keys in the namespace and add them to the missingNamespaces set. If the namespace is later recreated, then those resources will be re-queued and re-synced. Signed-off-by: Tom Pantelis --- pkg/federate/fake/federator.go | 33 ++++++++-- pkg/syncer/resource_syncer.go | 63 +++++++++++++++--- pkg/syncer/resource_syncer_test.go | 102 +++++++++++++++++++++-------- 3 files changed, 156 insertions(+), 42 deletions(-) diff --git a/pkg/federate/fake/federator.go b/pkg/federate/fake/federator.go index 057724a2..b9da275e 100644 --- a/pkg/federate/fake/federator.go +++ b/pkg/federate/fake/federator.go @@ -26,6 +26,7 @@ import ( "time" . "github.com/onsi/gomega" + "github.com/submariner-io/admiral/pkg/federate" "github.com/submariner-io/admiral/pkg/resource" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" @@ -35,6 +36,7 @@ type Federator struct { lock sync.Mutex distribute chan *unstructured.Unstructured delete chan *unstructured.Unstructured + delegator federate.Federator failOnDistribute error failOnDelete error ResetOnFailure atomic.Bool @@ -50,6 +52,13 @@ func New() *Federator { return f } +func (f *Federator) SetDelegator(d federate.Federator) { + f.lock.Lock() + defer f.lock.Unlock() + + f.delegator = d +} + func (f *Federator) FailOnDistribute(err error) { f.lock.Lock() defer f.lock.Unlock() @@ -64,7 +73,7 @@ func (f *Federator) FailOnDelete(err error) { f.failOnDelete = err } -func (f *Federator) Distribute(_ context.Context, obj runtime.Object) error { +func (f *Federator) Distribute(ctx context.Context, obj runtime.Object) error { f.lock.Lock() defer f.lock.Unlock() @@ -77,12 +86,18 @@ func (f *Federator) Distribute(_ context.Context, obj runtime.Object) error { return err } - f.distribute <- resource.MustToUnstructured(obj) + if f.delegator != nil { + err = f.delegator.Distribute(ctx, obj) + } - return nil + if err == nil { + f.distribute <- resource.MustToUnstructured(obj) + } + + return err } -func (f *Federator) Delete(_ context.Context, obj runtime.Object) error { +func (f *Federator) Delete(ctx context.Context, obj runtime.Object) error { f.lock.Lock() defer f.lock.Unlock() @@ -95,9 +110,15 @@ func (f *Federator) Delete(_ context.Context, obj runtime.Object) error { return err } - f.delete <- resource.MustToUnstructured(obj) + if f.delegator != nil { + err = f.delegator.Delete(ctx, obj) + } + + if err == nil { + f.delete <- resource.MustToUnstructured(obj) + } - return nil + return err } func (f *Federator) VerifyDistribute(expected runtime.Object) { diff --git a/pkg/syncer/resource_syncer.go b/pkg/syncer/resource_syncer.go index eb3e8bda..72cf929a 100644 --- a/pkg/syncer/resource_syncer.go +++ b/pkg/syncer/resource_syncer.go @@ -48,7 +48,8 @@ import ( const ( OrigNamespaceLabelKey = "submariner-io/originatingNamespace" - namespaceKey = "$namespace$" + namespaceAddedKey = "$namespace-added$" + namespaceDeletedKey = "$namespace-deleted$" ) type SyncDirection int @@ -316,7 +317,13 @@ func newResourceSyncer(config *ResourceSyncerConfig) (*resourceSyncer, error) { if config.NamespaceInformer != nil { _, err := config.NamespaceInformer.AddEventHandler(cache.ResourceEventHandlerDetailedFuncs{ AddFunc: func(obj interface{}, _ bool) { - syncer.workQueue.Enqueue(cache.ExplicitKey(cache.NewObjectName(namespaceKey, resourceUtil.MustToMeta(obj).GetName()).String())) + syncer.workQueue.Enqueue(cache.ExplicitKey(cache.NewObjectName(namespaceAddedKey, resourceUtil.MustToMeta(obj).GetName()).String())) + }, + DeleteFunc: func(obj interface{}) { + objName, err := cache.DeletionHandlingObjectToName(obj) + utilruntime.Must(err) + + syncer.workQueue.Enqueue(cache.ExplicitKey(cache.NewObjectName(namespaceDeletedKey, objName.Name).String())) }, }) if err != nil { @@ -520,11 +527,16 @@ func (r *resourceSyncer) runIfCacheSynced(defaultReturn any, run func() any) any } func (r *resourceSyncer) processNextWorkItem(key, name, ns string) (bool, error) { - if ns == namespaceKey { + if ns == namespaceAddedKey { r.handleNamespaceAdded(name) return false, nil } + if ns == namespaceDeletedKey { + r.handleNamespaceDeleted(name) + return false, nil + } + var ( requeue bool err error @@ -600,6 +612,8 @@ func (r *resourceSyncer) handleCreatedOrUpdated(key string, created *unstructure return true, errors.Wrapf(err, "error distributing resource %q", key) } + r.recordNamespaceSeen(resource.GetNamespace()) + if r.syncCounter != nil { r.syncCounter.With(prometheus.Labels{ DirectionLabel: r.config.Direction.String(), @@ -796,6 +810,13 @@ func (r *resourceSyncer) assertUnstructured(obj interface{}) *unstructured.Unstr return u } +func (r *resourceSyncer) recordNamespaceSeen(namespace string) { + _, ok := r.missingNamespaces[namespace] + if !ok { + r.missingNamespaces[namespace] = set.New[string]() + } +} + func (r *resourceSyncer) handleMissingNamespace(key, namespace string) { r.log.Warningf("Syncer %q: Unable to distribute resource %q due to missing namespace %q", r.config.Name, key, namespace) @@ -803,13 +824,8 @@ func (r *resourceSyncer) handleMissingNamespace(key, namespace string) { return } - keys, ok := r.missingNamespaces[namespace] - if !ok { - keys = set.New[string]() - r.missingNamespaces[namespace] = keys - } - - keys.Insert(key) + r.recordNamespaceSeen(namespace) + r.missingNamespaces[namespace].Insert(key) } func (r *resourceSyncer) handleNamespaceAdded(namespace string) { @@ -826,6 +842,33 @@ func (r *resourceSyncer) handleNamespaceAdded(namespace string) { } } +func (r *resourceSyncer) handleNamespaceDeleted(namespace string) { + keys, ok := r.missingNamespaces[namespace] + if !ok { + return + } + + for _, key := range r.store.ListKeys() { + obj, exists, _ := r.store.GetByKey(key) + if !exists { + continue + } + + resource, _, _ := r.transform(r.assertUnstructured(obj), key, Create) + if resource == nil { + continue + } + + if resource.GetNamespace() == namespace { + keys.Insert(key) + } + } + + if keys.Len() > 0 { + r.log.Infof("Syncer %q: namespace %q deleted - recorded %d missing resources", r.config.Name, namespace, keys.Len()) + } +} + func getClusterIDLabel(resource runtime.Object) (string, bool) { clusterID, found := resourceUtil.MustToMeta(resource).GetLabels()[federate.ClusterIDLabelKey] return clusterID, found diff --git a/pkg/syncer/resource_syncer_test.go b/pkg/syncer/resource_syncer_test.go index 2aca96ba..04676162 100644 --- a/pkg/syncer/resource_syncer_test.go +++ b/pkg/syncer/resource_syncer_test.go @@ -27,6 +27,8 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "github.com/prometheus/client_golang/prometheus" + fakereactor "github.com/submariner-io/admiral/pkg/fake" + "github.com/submariner-io/admiral/pkg/federate" "github.com/submariner-io/admiral/pkg/federate/fake" . "github.com/submariner-io/admiral/pkg/gomega" resourceutils "github.com/submariner-io/admiral/pkg/resource" @@ -43,11 +45,9 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/dynamic" fakeClient "k8s.io/client-go/dynamic/fake" - "k8s.io/client-go/informers" - "k8s.io/client-go/kubernetes" - fakeK8s "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/tools/cache" "k8s.io/utils/ptr" ) @@ -1236,50 +1236,66 @@ func testWithSharedInformer() { } func testWithMissingNamespace() { - const transformedNamespace = "transformed-ns" + const ( + transformedNamespace = "transformed-ns" + noTransform = "no-transform" + ) d := newTestDriver(test.LocalNamespace, "", syncer.LocalToRemote) - var ( - k8sClient kubernetes.Interface - nsInformerFactory informers.SharedInformerFactory - ) + namespaceClient := func() dynamic.ResourceInterface { + return d.config.SourceClient.Resource(corev1.SchemeGroupVersion.WithResource("namespaces")).Namespace(metav1.NamespaceNone) + } + + createNamespace := func(name string) { + test.CreateResource(namespaceClient(), &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + }) + } BeforeEach(func() { d.config.Transform = func(obj runtime.Object, _ int, _ syncer.Operation) (runtime.Object, bool) { - obj = obj.DeepCopyObject() - resourceutils.MustToMeta(obj).SetNamespace(transformedNamespace) + if resourceutils.MustToMeta(obj).GetName() == noTransform { + return nil, false + } else if resourceutils.MustToMeta(obj).GetNamespace() == test.LocalNamespace { + obj = obj.DeepCopyObject() + resourceutils.MustToMeta(obj).SetNamespace(transformedNamespace) + } return obj, false } - d.federator.FailOnDistribute(apierrors.NewNotFound(schema.GroupResource{ - Resource: "namespaces", - }, transformedNamespace)) - - k8sClient = fakeK8s.NewClientset() - nsInformerFactory = informers.NewSharedInformerFactory(k8sClient, 0) - d.config.NamespaceInformer = nsInformerFactory.Core().V1().Namespaces().Informer() + d.config.NamespaceInformer = cache.NewSharedInformer(&cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return namespaceClient().List(context.TODO(), options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return namespaceClient().Watch(context.TODO(), options) + }, + }, resourceutils.MustToUnstructured(&corev1.Namespace{}), 0) }) JustBeforeEach(func() { - nsInformerFactory.Start(d.stopCh) + d.federator.SetDelegator(federate.NewCreateFederator(d.config.SourceClient, d.config.RestMapper, transformedNamespace)) + + createNamespace(test.LocalNamespace) + + fakereactor.AddVerifyNamespaceReactor(&d.config.SourceClient.(*fakeClient.FakeDynamicClient).Fake, "pods") + + if d.config.NamespaceInformer != nil { + go d.config.NamespaceInformer.Run(d.stopCh) + } }) Specify("distribute should eventually succeed when the namespace is created", func() { resource := test.CreateResource(d.sourceClient, d.resource) d.federator.VerifyNoDistribute() - d.federator.FailOnDistribute(nil) - By("Creating namespace") - _, err := k8sClient.CoreV1().Namespaces().Create(context.TODO(), &corev1.Namespace{ - ObjectMeta: metav1.ObjectMeta{ - Name: transformedNamespace, - }, - }, metav1.CreateOptions{}) - Expect(err).To(Succeed()) + createNamespace(transformedNamespace) resource.SetNamespace(transformedNamespace) d.federator.VerifyDistribute(resource) @@ -1295,6 +1311,40 @@ func testWithMissingNamespace() { d.federator.VerifyNoDistribute() }) }) + + Context("after a namespace is created and distribute succeeds", func() { + const otherNS = "other-ns" + + JustBeforeEach(func() { + createNamespace(transformedNamespace) + createNamespace(otherNS) + }) + + It("should eventually redistribute when the namespace is recreated", func() { + resource := test.CreateResource(d.sourceClient, d.resource) + resource.SetNamespace(transformedNamespace) + d.federator.VerifyDistribute(resource) + + other := d.resource.DeepCopy() + other.Name = noTransform + test.CreateResource(d.sourceClient, other) + + other = d.resource.DeepCopy() + other.Namespace = otherNS + test.CreateResource(d.config.SourceClient.Resource( + *test.GetGroupVersionResourceFor(d.config.RestMapper, other)).Namespace(otherNS), other) + + By("Deleting namespace") + + err := namespaceClient().Delete(context.TODO(), transformedNamespace, metav1.DeleteOptions{}) + Expect(err).To(Succeed()) + + By("Recreating namespace") + + createNamespace(transformedNamespace) + d.federator.VerifyDistribute(resource) + }) + }) } func testEventOrdering() {