Skip to content

Commit

Permalink
Re-create GlobalIngressIP when service is re-created
Browse files Browse the repository at this point in the history
Fixes submariner-io#1734

Signed-off-by: Tom Pantelis <tompantelis@gmail.com>
  • Loading branch information
tpantelis committed Aug 2, 2023
1 parent a8f43d4 commit b44d319
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 65 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ require (
github.com/projectcalico/api v0.0.0-20230602153125-fb7148692637
github.com/prometheus-community/pro-bing v0.3.0
github.com/prometheus/client_golang v1.16.0
github.com/submariner-io/admiral v0.16.0-m3
github.com/submariner-io/admiral v0.16.0-m3.0.20230802132903-9d8a8b93dc58
github.com/submariner-io/shipyard v0.16.0-m3
github.com/uw-labs/lichen v0.1.7
github.com/vishvananda/netlink v1.2.1-beta.2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -506,8 +506,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/submariner-io/admiral v0.16.0-m3 h1:qHpRw8mE80/Q3e/2z0ZPdqPVBESYWPPXRmJxUn5azeY=
github.com/submariner-io/admiral v0.16.0-m3/go.mod h1:wiU8hC/soJ6C3g634CFvaGq/Hptgy4D2Xpf8IlQ5y5A=
github.com/submariner-io/admiral v0.16.0-m3.0.20230802132903-9d8a8b93dc58 h1:qM5Tr6cuIODfNRminvU7nx2Vf720lLVJuMNBWhR5sLs=
github.com/submariner-io/admiral v0.16.0-m3.0.20230802132903-9d8a8b93dc58/go.mod h1:wiU8hC/soJ6C3g634CFvaGq/Hptgy4D2Xpf8IlQ5y5A=
github.com/submariner-io/shipyard v0.16.0-m3 h1:795gM5zCjszEjQ5UM9LY/7vRTldHt16PdETTzjY265A=
github.com/submariner-io/shipyard v0.16.0-m3/go.mod h1:P6zHeYDcQMS24/8Z7NN2WP4Ydqdu4CB4HC+VRn3l2MA=
github.com/syndtr/gocapability v0.0.0-20200815063812-42c35b437635/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww=
Expand Down
6 changes: 3 additions & 3 deletions pkg/globalnet/controllers/gateway_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,15 +326,15 @@ func (g *gatewayMonitor) startControllers() error {
return errors.WithMessage(err, "error creating the IngressEndpointsControllers")
}

c, err = NewServiceExportController(g.syncerConfig, podControllers, endpointsControllers,
seController, err := NewServiceExportController(g.syncerConfig, podControllers, endpointsControllers,
ingressEndpointsControllers)
if err != nil {
return errors.Wrap(err, "error creating the ServiceExport controller")
}

g.controllers = append(g.controllers, c)
g.controllers = append(g.controllers, seController)

c, err = NewServiceController(g.syncerConfig, podControllers)
c, err = NewServiceController(g.syncerConfig, podControllers, seController.GetSyncer())
if err != nil {
return errors.Wrap(err, "error creating the Service controller")
}
Expand Down
16 changes: 10 additions & 6 deletions pkg/globalnet/controllers/service_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ import (
"k8s.io/client-go/tools/cache"
)

func NewServiceController(config *syncer.ResourceSyncerConfig, podControllers *IngressPodControllers) (Interface, error) {
func NewServiceController(config *syncer.ResourceSyncerConfig, podControllers *IngressPodControllers, serviceExportSyncer syncer.Interface,
) (Interface, error) {
// We'll panic if config is nil, this is intentional
var err error

Expand All @@ -40,6 +41,7 @@ func NewServiceController(config *syncer.ResourceSyncerConfig, podControllers *I
controller := &serviceController{
baseSyncerController: newBaseSyncerController(),
podControllers: podControllers,
serviceExportSyncer: serviceExportSyncer,
}

controller.resourceSyncer, err = syncer.NewResourceSyncer(&syncer.ResourceSyncerConfig{
Expand Down Expand Up @@ -97,22 +99,24 @@ func (c *serviceController) Start() error {
func (c *serviceController) process(from runtime.Object, _ int, op syncer.Operation) (runtime.Object, bool) {
service := from.(*corev1.Service)

if service.Spec.Type != corev1.ServiceTypeClusterIP {
if service.Spec.Type != corev1.ServiceTypeClusterIP || op == syncer.Update {
return nil, false
}

key, _ := cache.MetaNamespaceKeyFunc(service)
logger.Infof("Service %q %sd", key, op)

if op == syncer.Delete {
return c.onDelete(service)
}

// For Create, requeue the associated ServiceExport, if any, to re-create the GlobalIngressIP.
c.serviceExportSyncer.RequeueResource(service.Name, service.Namespace)

return nil, false
}

func (c *serviceController) onDelete(service *corev1.Service) (runtime.Object, bool) {
key, _ := cache.MetaNamespaceKeyFunc(service)

logger.Infof("Service %q deleted", key)

c.podControllers.stopAndCleanup(service.Name, service.Namespace)

if service.Spec.ClusterIP == corev1.ClusterIPNone {
Expand Down
98 changes: 50 additions & 48 deletions pkg/globalnet/controllers/service_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/submariner-io/admiral/pkg/syncer"
submarinerv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1"
"github.com/submariner-io/submariner/pkg/globalnet/controllers"
corev1 "k8s.io/api/core/v1"
Expand All @@ -34,72 +33,80 @@ import (
var _ = Describe("Service controller", func() {
t := newServiceControllerTestDriver()

When("an exported cluster IP Service is deleted", func() {
var service *corev1.Service

When("an exported cluster IP Service is deleted and subsequently re-created", func() {
BeforeEach(func() {
t.createServiceExport(t.createService(newClusterIPService()))
t.createGlobalIngressIP(&submarinerv1.GlobalIngressIP{
ObjectMeta: metav1.ObjectMeta{
Name: serviceName,
},
})
service = newClusterIPService()
t.createServiceExport(t.createService(service))
})

It("should delete the GlobalIngressIP", func() {
Expect(t.services.Delete(context.TODO(), serviceName, metav1.DeleteOptions{})).To(Succeed())
t.awaitNoGlobalIngressIP(serviceName)
JustBeforeEach(func() {
t.awaitGlobalIngressIP(service.Name)
})

It("should delete the GlobalIngressIP and then re-create it", func() {
By("Deleting the service")

Expect(t.services.Delete(context.TODO(), service.Name, metav1.DeleteOptions{})).To(Succeed())
t.awaitNoGlobalIngressIP(service.Name)

By("Re-creating the service")

t.createService(service)
t.awaitGlobalIngressIP(service.Name)
})
})

When("an exported headless Service is deleted", func() {
BeforeEach(func() {
t.createServiceExport(t.createService(newHeadlessService()))
When("an exported headless Service is deleted and subsequently re-created", func() {
var backendPod *corev1.Pod

t.createGlobalIngressIP(&submarinerv1.GlobalIngressIP{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-1",
Labels: map[string]string{
controllers.ServiceRefLabel: serviceName,
},
},
})
BeforeEach(func() {
service = newHeadlessService()
backendPod = newHeadlessServicePod(service.Name)
t.createServiceExport(t.createService(service))
})

t.createGlobalIngressIP(&submarinerv1.GlobalIngressIP{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-2",
Labels: map[string]string{
controllers.ServiceRefLabel: serviceName,
},
},
})
JustBeforeEach(func() {
t.createPod(backendPod)
t.awaitHeadlessGlobalIngressIP(service.Name, backendPod.Name)
})

It("should delete the GlobalIngressIP objects associated with the backend Pods", func() {
Expect(t.services.Delete(context.TODO(), serviceName, metav1.DeleteOptions{})).To(Succeed())
It("should delete the GlobalIngressIP objects associated with the backend Pods and then re-create them", func() {
By("Deleting the service")

Expect(t.services.Delete(context.TODO(), service.Name, metav1.DeleteOptions{})).To(Succeed())

Eventually(func() []unstructured.Unstructured {
list, _ := t.globalIngressIPs.List(context.TODO(), metav1.ListOptions{})
return list.Items
}, 5).Should(BeEmpty())

By("Re-creating the service")

t.createService(service)
t.awaitHeadlessGlobalIngressIP(service.Name, backendPod.Name)
})
})

When("a GlobalIngressIP is stale on startup due to a missed delete event", func() {
Context("for a cluster IP Service", func() {
BeforeEach(func() {
t.createServiceExport(newClusterIPService())
service = newClusterIPService()
t.createServiceExport(service)
t.createGlobalIngressIP(&submarinerv1.GlobalIngressIP{
ObjectMeta: metav1.ObjectMeta{
Name: serviceName,
Name: service.Name,
},
Spec: submarinerv1.GlobalIngressIPSpec{
Target: submarinerv1.ClusterIPService,
ServiceRef: &corev1.LocalObjectReference{Name: serviceName},
ServiceRef: &corev1.LocalObjectReference{Name: service.Name},
},
})
})

It("should delete the GlobalIngressIP on reconciliation", func() {
t.awaitNoGlobalIngressIP(serviceName)
t.awaitNoGlobalIngressIP(service.Name)
})
})

Expand All @@ -110,12 +117,12 @@ var _ = Describe("Service controller", func() {
ObjectMeta: metav1.ObjectMeta{
Name: "pod-one",
Labels: map[string]string{
controllers.ServiceRefLabel: serviceName,
controllers.ServiceRefLabel: service.Name,
},
},
Spec: submarinerv1.GlobalIngressIPSpec{
Target: submarinerv1.HeadlessServicePod,
ServiceRef: &corev1.LocalObjectReference{Name: serviceName},
ServiceRef: &corev1.LocalObjectReference{Name: service.Name},
},
})
})
Expand Down Expand Up @@ -150,18 +157,13 @@ func newServiceControllerTestDriver() *serviceControllerTestDriver {
}

func (t *serviceControllerTestDriver) start() {
var err error

config := &syncer.ResourceSyncerConfig{
SourceClient: t.dynClient,
RestMapper: t.restMapper,
Scheme: t.scheme,
}
seTestDriver := &serviceExportControllerTestDriver{}
seTestDriver.testDriverBase = t.testDriverBase
config, podControllers, syncer := seTestDriver.start()

podControllers, err := controllers.NewIngressPodControllers(config)
Expect(err).To(Succeed())
var err error

t.controller, err = controllers.NewServiceController(config, podControllers)
t.controller, err = controllers.NewServiceController(config, podControllers, syncer)

Expect(err).To(Succeed())
Expect(t.controller.Start()).To(Succeed())
Expand Down
7 changes: 6 additions & 1 deletion pkg/globalnet/controllers/service_export_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@ import (
mcsv1a1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"
)

//nolint:revive // Ignore "unexported-return:... which can be annoying to use"; it's only used by unit tests.
func NewServiceExportController(config *syncer.ResourceSyncerConfig, podControllers *IngressPodControllers,
endpointsControllers *ServiceExportEndpointsControllers,
ingressEndpointsControllers *IngressEndpointsControllers,
) (Interface, error) {
) (*serviceExportController, error) {
// We'll panic if config is nil, this is intentional
var err error

Expand Down Expand Up @@ -88,6 +89,10 @@ func NewServiceExportController(config *syncer.ResourceSyncerConfig, podControll
return controller, nil
}

func (c *serviceExportController) GetSyncer() syncer.Interface {
return c.resourceSyncer
}

func (c *serviceExportController) Stop() {
c.baseController.Stop()
c.podControllers.stopAll()
Expand Down
7 changes: 5 additions & 2 deletions pkg/globalnet/controllers/service_export_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ func newServiceExportControllerTestDriver() *serviceExportControllerTestDriver {
return t
}

func (t *serviceExportControllerTestDriver) start() {
func (t *serviceExportControllerTestDriver) start() (*syncer.ResourceSyncerConfig, *controllers.IngressPodControllers, syncer.Interface) {
var err error

t.pool, err = ipam.NewIPPool(t.globalCIDR)
Expand All @@ -467,8 +467,11 @@ func (t *serviceExportControllerTestDriver) start() {
ingressEndpointsControllers, err := controllers.NewIngressEndpointsControllers(config)
Expect(err).To(Succeed())

t.controller, err = controllers.NewServiceExportController(config, podControllers, endpointsControllers, ingressEndpointsControllers)
controller, err := controllers.NewServiceExportController(config, podControllers, endpointsControllers, ingressEndpointsControllers)
t.controller = controller

Expect(err).To(Succeed())
Expect(t.controller.Start()).To(Succeed())

return config, podControllers, controller.GetSyncer()
}
5 changes: 3 additions & 2 deletions pkg/globalnet/controllers/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,9 @@ type serviceExportController struct {

type serviceController struct {
*baseSyncerController
ingressIPs dynamic.ResourceInterface
podControllers *IngressPodControllers
ingressIPs dynamic.ResourceInterface
podControllers *IngressPodControllers
serviceExportSyncer syncer.Interface
}

type nodeController struct {
Expand Down

0 comments on commit b44d319

Please sign in to comment.