From 1deae10d98e386664bbbb482e9e7ed49d22dcf0e Mon Sep 17 00:00:00 2001 From: novad03 Date: Wed, 2 Aug 2023 10:45:35 -0400 Subject: [PATCH] Re-create GlobalIngressIP when service is re-created Fixes https://github.com/submariner-io/submariner/issues/1734 Signed-off-by: Tom Pantelis --- pkg/globalnet/controllers/gateway_monitor.go | 6 +- .../controllers/service_controller.go | 16 +-- .../controllers/service_controller_test.go | 98 ++++++++++--------- .../controllers/service_export_controller.go | 7 +- .../service_export_controller_test.go | 7 +- pkg/globalnet/controllers/types.go | 5 +- 6 files changed, 77 insertions(+), 62 deletions(-) diff --git a/pkg/globalnet/controllers/gateway_monitor.go b/pkg/globalnet/controllers/gateway_monitor.go index dd78dbb7..54b0db06 100644 --- a/pkg/globalnet/controllers/gateway_monitor.go +++ b/pkg/globalnet/controllers/gateway_monitor.go @@ -326,15 +326,15 @@ func (g *gatewayMonitor) startControllers() error { return errors.WithMessage(err, "error creating the IngressEndpointsControllers") } - c, err = NewServiceExportController(g.syncerConfig, podControllers, endpointsControllers, + seController, err := NewServiceExportController(g.syncerConfig, podControllers, endpointsControllers, ingressEndpointsControllers) if err != nil { return errors.Wrap(err, "error creating the ServiceExport controller") } - g.controllers = append(g.controllers, c) + g.controllers = append(g.controllers, seController) - c, err = NewServiceController(g.syncerConfig, podControllers) + c, err = NewServiceController(g.syncerConfig, podControllers, seController.GetSyncer()) if err != nil { return errors.Wrap(err, "error creating the Service controller") } diff --git a/pkg/globalnet/controllers/service_controller.go b/pkg/globalnet/controllers/service_controller.go index fd222045..4303a526 100644 --- a/pkg/globalnet/controllers/service_controller.go +++ b/pkg/globalnet/controllers/service_controller.go @@ -31,7 +31,8 @@ import ( "k8s.io/client-go/tools/cache" ) -func NewServiceController(config *syncer.ResourceSyncerConfig, podControllers *IngressPodControllers) (Interface, error) { +func NewServiceController(config *syncer.ResourceSyncerConfig, podControllers *IngressPodControllers, serviceExportSyncer syncer.Interface, +) (Interface, error) { // We'll panic if config is nil, this is intentional var err error @@ -40,6 +41,7 @@ func NewServiceController(config *syncer.ResourceSyncerConfig, podControllers *I controller := &serviceController{ baseSyncerController: newBaseSyncerController(), podControllers: podControllers, + serviceExportSyncer: serviceExportSyncer, } controller.resourceSyncer, err = syncer.NewResourceSyncer(&syncer.ResourceSyncerConfig{ @@ -97,22 +99,24 @@ func (c *serviceController) Start() error { func (c *serviceController) process(from runtime.Object, _ int, op syncer.Operation) (runtime.Object, bool) { service := from.(*corev1.Service) - if service.Spec.Type != corev1.ServiceTypeClusterIP { + if service.Spec.Type != corev1.ServiceTypeClusterIP || op == syncer.Update { return nil, false } + key, _ := cache.MetaNamespaceKeyFunc(service) + logger.Infof("Service %q %sd", key, op) + if op == syncer.Delete { return c.onDelete(service) } + // For Create, requeue the associated ServiceExport, if any, to re-create the GlobalIngressIP. + c.serviceExportSyncer.RequeueResource(service.Name, service.Namespace) + return nil, false } func (c *serviceController) onDelete(service *corev1.Service) (runtime.Object, bool) { - key, _ := cache.MetaNamespaceKeyFunc(service) - - logger.Infof("Service %q deleted", key) - c.podControllers.stopAndCleanup(service.Name, service.Namespace) if service.Spec.ClusterIP == corev1.ClusterIPNone { diff --git a/pkg/globalnet/controllers/service_controller_test.go b/pkg/globalnet/controllers/service_controller_test.go index e940c9cf..57041823 100644 --- a/pkg/globalnet/controllers/service_controller_test.go +++ b/pkg/globalnet/controllers/service_controller_test.go @@ -23,7 +23,6 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" - "github.com/submariner-io/admiral/pkg/syncer" submarinerv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1" "github.com/submariner-io/submariner/pkg/globalnet/controllers" corev1 "k8s.io/api/core/v1" @@ -34,72 +33,80 @@ import ( var _ = Describe("Service controller", func() { t := newServiceControllerTestDriver() - When("an exported cluster IP Service is deleted", func() { + var service *corev1.Service + + When("an exported cluster IP Service is deleted and subsequently re-created", func() { BeforeEach(func() { - t.createServiceExport(t.createService(newClusterIPService())) - t.createGlobalIngressIP(&submarinerv1.GlobalIngressIP{ - ObjectMeta: metav1.ObjectMeta{ - Name: serviceName, - }, - }) + service = newClusterIPService() + t.createServiceExport(t.createService(service)) }) - It("should delete the GlobalIngressIP", func() { - Expect(t.services.Delete(context.TODO(), serviceName, metav1.DeleteOptions{})).To(Succeed()) - t.awaitNoGlobalIngressIP(serviceName) + JustBeforeEach(func() { + t.awaitGlobalIngressIP(service.Name) + }) + + It("should delete the GlobalIngressIP and then re-create it", func() { + By("Deleting the service") + + Expect(t.services.Delete(context.TODO(), service.Name, metav1.DeleteOptions{})).To(Succeed()) + t.awaitNoGlobalIngressIP(service.Name) + + By("Re-creating the service") + + t.createService(service) + t.awaitGlobalIngressIP(service.Name) }) }) - When("an exported headless Service is deleted", func() { - BeforeEach(func() { - t.createServiceExport(t.createService(newHeadlessService())) + When("an exported headless Service is deleted and subsequently re-created", func() { + var backendPod *corev1.Pod - t.createGlobalIngressIP(&submarinerv1.GlobalIngressIP{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod-1", - Labels: map[string]string{ - controllers.ServiceRefLabel: serviceName, - }, - }, - }) + BeforeEach(func() { + service = newHeadlessService() + backendPod = newHeadlessServicePod(service.Name) + t.createServiceExport(t.createService(service)) + }) - t.createGlobalIngressIP(&submarinerv1.GlobalIngressIP{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod-2", - Labels: map[string]string{ - controllers.ServiceRefLabel: serviceName, - }, - }, - }) + JustBeforeEach(func() { + t.createPod(backendPod) + t.awaitHeadlessGlobalIngressIP(service.Name, backendPod.Name) }) - It("should delete the GlobalIngressIP objects associated with the backend Pods", func() { - Expect(t.services.Delete(context.TODO(), serviceName, metav1.DeleteOptions{})).To(Succeed()) + It("should delete the GlobalIngressIP objects associated with the backend Pods and then re-create them", func() { + By("Deleting the service") + + Expect(t.services.Delete(context.TODO(), service.Name, metav1.DeleteOptions{})).To(Succeed()) Eventually(func() []unstructured.Unstructured { list, _ := t.globalIngressIPs.List(context.TODO(), metav1.ListOptions{}) return list.Items }, 5).Should(BeEmpty()) + + By("Re-creating the service") + + t.createService(service) + t.awaitHeadlessGlobalIngressIP(service.Name, backendPod.Name) }) }) When("a GlobalIngressIP is stale on startup due to a missed delete event", func() { Context("for a cluster IP Service", func() { BeforeEach(func() { - t.createServiceExport(newClusterIPService()) + service = newClusterIPService() + t.createServiceExport(service) t.createGlobalIngressIP(&submarinerv1.GlobalIngressIP{ ObjectMeta: metav1.ObjectMeta{ - Name: serviceName, + Name: service.Name, }, Spec: submarinerv1.GlobalIngressIPSpec{ Target: submarinerv1.ClusterIPService, - ServiceRef: &corev1.LocalObjectReference{Name: serviceName}, + ServiceRef: &corev1.LocalObjectReference{Name: service.Name}, }, }) }) It("should delete the GlobalIngressIP on reconciliation", func() { - t.awaitNoGlobalIngressIP(serviceName) + t.awaitNoGlobalIngressIP(service.Name) }) }) @@ -110,12 +117,12 @@ var _ = Describe("Service controller", func() { ObjectMeta: metav1.ObjectMeta{ Name: "pod-one", Labels: map[string]string{ - controllers.ServiceRefLabel: serviceName, + controllers.ServiceRefLabel: service.Name, }, }, Spec: submarinerv1.GlobalIngressIPSpec{ Target: submarinerv1.HeadlessServicePod, - ServiceRef: &corev1.LocalObjectReference{Name: serviceName}, + ServiceRef: &corev1.LocalObjectReference{Name: service.Name}, }, }) }) @@ -150,18 +157,13 @@ func newServiceControllerTestDriver() *serviceControllerTestDriver { } func (t *serviceControllerTestDriver) start() { - var err error - - config := &syncer.ResourceSyncerConfig{ - SourceClient: t.dynClient, - RestMapper: t.restMapper, - Scheme: t.scheme, - } + seTestDriver := &serviceExportControllerTestDriver{} + seTestDriver.testDriverBase = t.testDriverBase + config, podControllers, syncer := seTestDriver.start() - podControllers, err := controllers.NewIngressPodControllers(config) - Expect(err).To(Succeed()) + var err error - t.controller, err = controllers.NewServiceController(config, podControllers) + t.controller, err = controllers.NewServiceController(config, podControllers, syncer) Expect(err).To(Succeed()) Expect(t.controller.Start()).To(Succeed()) diff --git a/pkg/globalnet/controllers/service_export_controller.go b/pkg/globalnet/controllers/service_export_controller.go index ed0d4612..a2f6a04f 100644 --- a/pkg/globalnet/controllers/service_export_controller.go +++ b/pkg/globalnet/controllers/service_export_controller.go @@ -33,10 +33,11 @@ import ( mcsv1a1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" ) +//nolint:revive // Ignore "unexported-return:... which can be annoying to use"; it's only used by unit tests. func NewServiceExportController(config *syncer.ResourceSyncerConfig, podControllers *IngressPodControllers, endpointsControllers *ServiceExportEndpointsControllers, ingressEndpointsControllers *IngressEndpointsControllers, -) (Interface, error) { +) (*serviceExportController, error) { // We'll panic if config is nil, this is intentional var err error @@ -88,6 +89,10 @@ func NewServiceExportController(config *syncer.ResourceSyncerConfig, podControll return controller, nil } +func (c *serviceExportController) GetSyncer() syncer.Interface { + return c.resourceSyncer +} + func (c *serviceExportController) Stop() { c.baseController.Stop() c.podControllers.stopAll() diff --git a/pkg/globalnet/controllers/service_export_controller_test.go b/pkg/globalnet/controllers/service_export_controller_test.go index 44d85fb0..1ebca5c8 100644 --- a/pkg/globalnet/controllers/service_export_controller_test.go +++ b/pkg/globalnet/controllers/service_export_controller_test.go @@ -446,7 +446,7 @@ func newServiceExportControllerTestDriver() *serviceExportControllerTestDriver { return t } -func (t *serviceExportControllerTestDriver) start() { +func (t *serviceExportControllerTestDriver) start() (*syncer.ResourceSyncerConfig, *controllers.IngressPodControllers, syncer.Interface) { var err error t.pool, err = ipam.NewIPPool(t.globalCIDR) @@ -467,8 +467,11 @@ func (t *serviceExportControllerTestDriver) start() { ingressEndpointsControllers, err := controllers.NewIngressEndpointsControllers(config) Expect(err).To(Succeed()) - t.controller, err = controllers.NewServiceExportController(config, podControllers, endpointsControllers, ingressEndpointsControllers) + controller, err := controllers.NewServiceExportController(config, podControllers, endpointsControllers, ingressEndpointsControllers) + t.controller = controller Expect(err).To(Succeed()) Expect(t.controller.Start()).To(Succeed()) + + return config, podControllers, controller.GetSyncer() } diff --git a/pkg/globalnet/controllers/types.go b/pkg/globalnet/controllers/types.go index 1bc056c6..df41c9c3 100644 --- a/pkg/globalnet/controllers/types.go +++ b/pkg/globalnet/controllers/types.go @@ -155,8 +155,9 @@ type serviceExportController struct { type serviceController struct { *baseSyncerController - ingressIPs dynamic.ResourceInterface - podControllers *IngressPodControllers + ingressIPs dynamic.ResourceInterface + podControllers *IngressPodControllers + serviceExportSyncer syncer.Interface } type nodeController struct {