Skip to content

Commit

Permalink
Re-create GlobalIngressIP when service is re-created
Browse files Browse the repository at this point in the history
Fixes #1734

Signed-off-by: Tom Pantelis <tompantelis@gmail.com>
  • Loading branch information
tpantelis authored and dfarrell07 committed Aug 9, 2023
1 parent b02c024 commit bf8acf1
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 62 deletions.
6 changes: 3 additions & 3 deletions pkg/globalnet/controllers/gateway_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,15 +326,15 @@ func (g *gatewayMonitor) startControllers() error {
return errors.WithMessage(err, "error creating the IngressEndpointsControllers")
}

c, err = NewServiceExportController(g.syncerConfig, podControllers, endpointsControllers,
seController, err := NewServiceExportController(g.syncerConfig, podControllers, endpointsControllers,
ingressEndpointsControllers)
if err != nil {
return errors.Wrap(err, "error creating the ServiceExport controller")
}

g.controllers = append(g.controllers, c)
g.controllers = append(g.controllers, seController)

c, err = NewServiceController(g.syncerConfig, podControllers)
c, err = NewServiceController(g.syncerConfig, podControllers, seController.GetSyncer())
if err != nil {
return errors.Wrap(err, "error creating the Service controller")
}
Expand Down
16 changes: 10 additions & 6 deletions pkg/globalnet/controllers/service_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ import (
"k8s.io/client-go/tools/cache"
)

func NewServiceController(config *syncer.ResourceSyncerConfig, podControllers *IngressPodControllers) (Interface, error) {
func NewServiceController(config *syncer.ResourceSyncerConfig, podControllers *IngressPodControllers, serviceExportSyncer syncer.Interface,
) (Interface, error) {
// We'll panic if config is nil, this is intentional
var err error

Expand All @@ -40,6 +41,7 @@ func NewServiceController(config *syncer.ResourceSyncerConfig, podControllers *I
controller := &serviceController{
baseSyncerController: newBaseSyncerController(),
podControllers: podControllers,
serviceExportSyncer: serviceExportSyncer,
}

controller.resourceSyncer, err = syncer.NewResourceSyncer(&syncer.ResourceSyncerConfig{
Expand Down Expand Up @@ -97,22 +99,24 @@ func (c *serviceController) Start() error {
func (c *serviceController) process(from runtime.Object, _ int, op syncer.Operation) (runtime.Object, bool) {
service := from.(*corev1.Service)

if service.Spec.Type != corev1.ServiceTypeClusterIP {
if service.Spec.Type != corev1.ServiceTypeClusterIP || op == syncer.Update {
return nil, false
}

key, _ := cache.MetaNamespaceKeyFunc(service)
logger.Infof("Service %q %sd", key, op)

if op == syncer.Delete {
return c.onDelete(service)
}

// For Create, requeue the associated ServiceExport, if any, to re-create the GlobalIngressIP.
c.serviceExportSyncer.RequeueResource(service.Name, service.Namespace)

return nil, false
}

func (c *serviceController) onDelete(service *corev1.Service) (runtime.Object, bool) {
key, _ := cache.MetaNamespaceKeyFunc(service)

logger.Infof("Service %q deleted", key)

c.podControllers.stopAndCleanup(service.Name, service.Namespace)

if service.Spec.ClusterIP == corev1.ClusterIPNone {
Expand Down
98 changes: 50 additions & 48 deletions pkg/globalnet/controllers/service_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/submariner-io/admiral/pkg/syncer"
submarinerv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1"
"github.com/submariner-io/submariner/pkg/globalnet/controllers"
corev1 "k8s.io/api/core/v1"
Expand All @@ -34,72 +33,80 @@ import (
var _ = Describe("Service controller", func() {
t := newServiceControllerTestDriver()

When("an exported cluster IP Service is deleted", func() {
var service *corev1.Service

When("an exported cluster IP Service is deleted and subsequently re-created", func() {
BeforeEach(func() {
t.createServiceExport(t.createService(newClusterIPService()))
t.createGlobalIngressIP(&submarinerv1.GlobalIngressIP{
ObjectMeta: metav1.ObjectMeta{
Name: serviceName,
},
})
service = newClusterIPService()
t.createServiceExport(t.createService(service))
})

It("should delete the GlobalIngressIP", func() {
Expect(t.services.Delete(context.TODO(), serviceName, metav1.DeleteOptions{})).To(Succeed())
t.awaitNoGlobalIngressIP(serviceName)
JustBeforeEach(func() {
t.awaitGlobalIngressIP(service.Name)
})

It("should delete the GlobalIngressIP and then re-create it", func() {
By("Deleting the service")

Expect(t.services.Delete(context.TODO(), service.Name, metav1.DeleteOptions{})).To(Succeed())
t.awaitNoGlobalIngressIP(service.Name)

By("Re-creating the service")

t.createService(service)
t.awaitGlobalIngressIP(service.Name)
})
})

When("an exported headless Service is deleted", func() {
BeforeEach(func() {
t.createServiceExport(t.createService(newHeadlessService()))
When("an exported headless Service is deleted and subsequently re-created", func() {
var backendPod *corev1.Pod

t.createGlobalIngressIP(&submarinerv1.GlobalIngressIP{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-1",
Labels: map[string]string{
controllers.ServiceRefLabel: serviceName,
},
},
})
BeforeEach(func() {
service = newHeadlessService()
backendPod = newHeadlessServicePod(service.Name)
t.createServiceExport(t.createService(service))
})

t.createGlobalIngressIP(&submarinerv1.GlobalIngressIP{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-2",
Labels: map[string]string{
controllers.ServiceRefLabel: serviceName,
},
},
})
JustBeforeEach(func() {
t.createPod(backendPod)
t.awaitHeadlessGlobalIngressIP(service.Name, backendPod.Name)
})

It("should delete the GlobalIngressIP objects associated with the backend Pods", func() {
Expect(t.services.Delete(context.TODO(), serviceName, metav1.DeleteOptions{})).To(Succeed())
It("should delete the GlobalIngressIP objects associated with the backend Pods and then re-create them", func() {
By("Deleting the service")

Expect(t.services.Delete(context.TODO(), service.Name, metav1.DeleteOptions{})).To(Succeed())

Eventually(func() []unstructured.Unstructured {
list, _ := t.globalIngressIPs.List(context.TODO(), metav1.ListOptions{})
return list.Items
}, 5).Should(BeEmpty())

By("Re-creating the service")

t.createService(service)
t.awaitHeadlessGlobalIngressIP(service.Name, backendPod.Name)
})
})

When("a GlobalIngressIP is stale on startup due to a missed delete event", func() {
Context("for a cluster IP Service", func() {
BeforeEach(func() {
t.createServiceExport(newClusterIPService())
service = newClusterIPService()
t.createServiceExport(service)
t.createGlobalIngressIP(&submarinerv1.GlobalIngressIP{
ObjectMeta: metav1.ObjectMeta{
Name: serviceName,
Name: service.Name,
},
Spec: submarinerv1.GlobalIngressIPSpec{
Target: submarinerv1.ClusterIPService,
ServiceRef: &corev1.LocalObjectReference{Name: serviceName},
ServiceRef: &corev1.LocalObjectReference{Name: service.Name},
},
})
})

It("should delete the GlobalIngressIP on reconciliation", func() {
t.awaitNoGlobalIngressIP(serviceName)
t.awaitNoGlobalIngressIP(service.Name)
})
})

Expand All @@ -110,12 +117,12 @@ var _ = Describe("Service controller", func() {
ObjectMeta: metav1.ObjectMeta{
Name: "pod-one",
Labels: map[string]string{
controllers.ServiceRefLabel: serviceName,
controllers.ServiceRefLabel: service.Name,
},
},
Spec: submarinerv1.GlobalIngressIPSpec{
Target: submarinerv1.HeadlessServicePod,
ServiceRef: &corev1.LocalObjectReference{Name: serviceName},
ServiceRef: &corev1.LocalObjectReference{Name: service.Name},
},
})
})
Expand Down Expand Up @@ -150,18 +157,13 @@ func newServiceControllerTestDriver() *serviceControllerTestDriver {
}

func (t *serviceControllerTestDriver) start() {
var err error

config := &syncer.ResourceSyncerConfig{
SourceClient: t.dynClient,
RestMapper: t.restMapper,
Scheme: t.scheme,
}
seTestDriver := &serviceExportControllerTestDriver{}
seTestDriver.testDriverBase = t.testDriverBase
config, podControllers, syncer := seTestDriver.start()

podControllers, err := controllers.NewIngressPodControllers(config)
Expect(err).To(Succeed())
var err error

t.controller, err = controllers.NewServiceController(config, podControllers)
t.controller, err = controllers.NewServiceController(config, podControllers, syncer)

Expect(err).To(Succeed())
Expect(t.controller.Start()).To(Succeed())
Expand Down
7 changes: 6 additions & 1 deletion pkg/globalnet/controllers/service_export_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@ import (
mcsv1a1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"
)

//nolint:revive // Ignore "unexported-return:... which can be annoying to use"; it's only used by unit tests.
func NewServiceExportController(config *syncer.ResourceSyncerConfig, podControllers *IngressPodControllers,
endpointsControllers *ServiceExportEndpointsControllers,
ingressEndpointsControllers *IngressEndpointsControllers,
) (Interface, error) {
) (*serviceExportController, error) {
// We'll panic if config is nil, this is intentional
var err error

Expand Down Expand Up @@ -88,6 +89,10 @@ func NewServiceExportController(config *syncer.ResourceSyncerConfig, podControll
return controller, nil
}

func (c *serviceExportController) GetSyncer() syncer.Interface {
return c.resourceSyncer
}

func (c *serviceExportController) Stop() {
c.baseController.Stop()
c.podControllers.stopAll()
Expand Down
7 changes: 5 additions & 2 deletions pkg/globalnet/controllers/service_export_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ func newServiceExportControllerTestDriver() *serviceExportControllerTestDriver {
return t
}

func (t *serviceExportControllerTestDriver) start() {
func (t *serviceExportControllerTestDriver) start() (*syncer.ResourceSyncerConfig, *controllers.IngressPodControllers, syncer.Interface) {
var err error

t.pool, err = ipam.NewIPPool(t.globalCIDR)
Expand All @@ -467,8 +467,11 @@ func (t *serviceExportControllerTestDriver) start() {
ingressEndpointsControllers, err := controllers.NewIngressEndpointsControllers(config)
Expect(err).To(Succeed())

t.controller, err = controllers.NewServiceExportController(config, podControllers, endpointsControllers, ingressEndpointsControllers)
controller, err := controllers.NewServiceExportController(config, podControllers, endpointsControllers, ingressEndpointsControllers)
t.controller = controller

Expect(err).To(Succeed())
Expect(t.controller.Start()).To(Succeed())

return config, podControllers, controller.GetSyncer()
}
5 changes: 3 additions & 2 deletions pkg/globalnet/controllers/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,9 @@ type serviceExportController struct {

type serviceController struct {
*baseSyncerController
ingressIPs dynamic.ResourceInterface
podControllers *IngressPodControllers
ingressIPs dynamic.ResourceInterface
podControllers *IngressPodControllers
serviceExportSyncer syncer.Interface
}

type nodeController struct {
Expand Down

0 comments on commit bf8acf1

Please sign in to comment.