From 761b26535c747be2cf4a6e736c566754bc852b9a Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Mon, 30 Oct 2023 10:27:36 -0400 Subject: [PATCH] Bump admiral and adjust to Federator API changes This also required further propagating ctx params to satisfy the contextcheck linter. Signed-off-by: Tom Pantelis --- go.mod | 2 +- go.sum | 4 +-- pkg/agent/controller/agent.go | 31 +++++++++++------- pkg/agent/controller/cleanup.go | 22 ++++++------- pkg/agent/controller/cleanup_test.go | 2 +- pkg/agent/controller/endpoint_slice.go | 32 +++++++++++-------- .../controller/service_endpoint_slices.go | 18 +++++------ pkg/agent/controller/service_export_client.go | 23 +++++++------ pkg/agent/controller/service_import.go | 28 ++++++++-------- .../controller/service_import_aggregator.go | 22 ++++++------- .../controller/service_import_migrator.go | 6 ++-- pkg/agent/main.go | 5 ++- 12 files changed, 106 insertions(+), 89 deletions(-) diff --git a/go.mod b/go.mod index 2fbe376ba..4418493c7 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/onsi/gomega v1.27.10 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.16.0 - github.com/submariner-io/admiral v0.16.1-0.20231025063702-858d0984799c + github.com/submariner-io/admiral v0.16.1-0.20231030143920-15adb86d8eca github.com/submariner-io/shipyard v0.16.0 github.com/uw-labs/lichen v0.1.7 k8s.io/api v0.27.6 diff --git a/go.sum b/go.sum index f0e0b27d8..f5a074ca9 100644 --- a/go.sum +++ b/go.sum @@ -408,8 +408,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -github.com/submariner-io/admiral v0.16.1-0.20231025063702-858d0984799c h1:zy5mZZrB885JAuLPqpb/RoGhtd9N9tUCFE5OGAZEzWw= -github.com/submariner-io/admiral v0.16.1-0.20231025063702-858d0984799c/go.mod h1:GP0TCJkt444r2ONKVHKBbSPaKjJb0S5Qj0MyNUl2keQ= +github.com/submariner-io/admiral v0.16.1-0.20231030143920-15adb86d8eca h1:O3gUAdYldm4LQJD/IpqaFcOlJVf9ikM+uBWsfVADmjg= +github.com/submariner-io/admiral v0.16.1-0.20231030143920-15adb86d8eca/go.mod h1:GP0TCJkt444r2ONKVHKBbSPaKjJb0S5Qj0MyNUl2keQ= github.com/submariner-io/shipyard v0.16.0 h1:PTvp2aKNBoCkfC8nS38k+DW5ZaXNMq/wzzjGOvsiAQM= github.com/submariner-io/shipyard v0.16.0/go.mod h1:aKCotVktXJO3azjBOmhu/0KbRcYLY3eUcSNSDDJNbxs= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= diff --git a/pkg/agent/controller/agent.go b/pkg/agent/controller/agent.go index ade0a91a8..bc3a625ed 100644 --- a/pkg/agent/controller/agent.go +++ b/pkg/agent/controller/agent.go @@ -19,6 +19,7 @@ limitations under the License. package controller import ( + "context" "encoding/json" "fmt" "reflect" @@ -179,6 +180,8 @@ func (a *Controller) Start(stopCh <-chan struct{}) error { func (a *Controller) serviceExportToServiceImport(obj runtime.Object, _ int, op syncer.Operation) (runtime.Object, bool) { svcExport := obj.(*mcsv1a1.ServiceExport) + ctx := context.Background() + logger.V(log.DEBUG).Infof("ServiceExport %s/%s %sd", svcExport.Namespace, svcExport.Name, op) if op == syncer.Delete { @@ -188,8 +191,9 @@ func (a *Controller) serviceExportToServiceImport(obj runtime.Object, _ int, op obj, found, err := a.serviceSyncer.GetResource(svcExport.Name, svcExport.Namespace) if err != nil { // some other error. Log and requeue - a.serviceExportClient.updateStatusConditions(svcExport.Name, svcExport.Namespace, newServiceExportCondition(mcsv1a1.ServiceExportValid, - corev1.ConditionUnknown, "ServiceRetrievalFailed", fmt.Sprintf("Error retrieving the Service: %v", err))) + a.serviceExportClient.updateStatusConditions(ctx, svcExport.Name, svcExport.Namespace, + newServiceExportCondition(mcsv1a1.ServiceExportValid, corev1.ConditionUnknown, "ServiceRetrievalFailed", + fmt.Sprintf("Error retrieving the Service: %v", err))) logger.Errorf(err, "Error retrieving Service %s/%s", svcExport.Namespace, svcExport.Name) return nil, true @@ -197,8 +201,9 @@ func (a *Controller) serviceExportToServiceImport(obj runtime.Object, _ int, op if !found { logger.V(log.DEBUG).Infof("Service to be exported (%s/%s) doesn't exist", svcExport.Namespace, svcExport.Name) - a.serviceExportClient.updateStatusConditions(svcExport.Name, svcExport.Namespace, newServiceExportCondition(mcsv1a1.ServiceExportValid, - corev1.ConditionFalse, serviceUnavailable, "Service to be exported doesn't exist")) + a.serviceExportClient.updateStatusConditions(ctx, svcExport.Name, svcExport.Namespace, + newServiceExportCondition(mcsv1a1.ServiceExportValid, corev1.ConditionFalse, serviceUnavailable, + "Service to be exported doesn't exist")) return nil, false } @@ -208,11 +213,12 @@ func (a *Controller) serviceExportToServiceImport(obj runtime.Object, _ int, op svcType, ok := getServiceImportType(svc) if !ok { - a.serviceExportClient.updateStatusConditions(svcExport.Name, svcExport.Namespace, newServiceExportCondition(mcsv1a1.ServiceExportValid, - corev1.ConditionFalse, invalidServiceType, fmt.Sprintf("Service of type %v not supported", svc.Spec.Type))) + a.serviceExportClient.updateStatusConditions(ctx, svcExport.Name, svcExport.Namespace, + newServiceExportCondition(mcsv1a1.ServiceExportValid, corev1.ConditionFalse, invalidServiceType, + fmt.Sprintf("Service of type %v not supported", svc.Spec.Type))) logger.Errorf(nil, "Service type %q not supported for Service (%s/%s)", svc.Spec.Type, svcExport.Namespace, svcExport.Name) - err = a.localServiceImportFederator.Delete(a.newServiceImport(svcExport.Name, svcExport.Namespace)) + err = a.localServiceImportFederator.Delete(ctx, a.newServiceImport(svcExport.Name, svcExport.Namespace)) if err == nil || apierrors.IsNotFound(err) { return nil, false } @@ -246,7 +252,7 @@ func (a *Controller) serviceExportToServiceImport(obj runtime.Object, _ int, op logger.V(log.DEBUG).Infof("Service to be exported (%s/%s) doesn't have a global IP yet", svcExport.Namespace, svcExport.Name) // Globalnet enabled but service doesn't have globalIp yet, Update the status and requeue - a.serviceExportClient.updateStatusConditions(svcExport.Name, svcExport.Namespace, + a.serviceExportClient.updateStatusConditions(ctx, svcExport.Name, svcExport.Namespace, newServiceExportCondition(mcsv1a1.ServiceExportValid, corev1.ConditionFalse, reason, msg)) return nil, true @@ -260,8 +266,8 @@ func (a *Controller) serviceExportToServiceImport(obj runtime.Object, _ int, op serviceImport.Spec.Ports = a.getPortsForService(svc) } - a.serviceExportClient.updateStatusConditions(svcExport.Name, svcExport.Namespace, newServiceExportCondition(mcsv1a1.ServiceExportValid, - corev1.ConditionTrue, "", "")) + a.serviceExportClient.updateStatusConditions(ctx, svcExport.Name, svcExport.Namespace, + newServiceExportCondition(mcsv1a1.ServiceExportValid, corev1.ConditionTrue, "", "")) logger.V(log.DEBUG).Infof("Returning ServiceImport %s/%s: %s", svcExport.Namespace, svcExport.Name, serviceImportStringer{serviceImport}) @@ -329,8 +335,9 @@ func (a *Controller) serviceToRemoteServiceImport(obj runtime.Object, _ int, op serviceImport := a.newServiceImport(svc.Name, svc.Namespace) // Update the status and requeue - a.serviceExportClient.updateStatusConditions(svc.Name, svc.Namespace, newServiceExportCondition(mcsv1a1.ServiceExportValid, - corev1.ConditionFalse, serviceUnavailable, "Service to be exported doesn't exist")) + a.serviceExportClient.updateStatusConditions(context.Background(), svc.Name, svc.Namespace, + newServiceExportCondition(mcsv1a1.ServiceExportValid, corev1.ConditionFalse, serviceUnavailable, + "Service to be exported doesn't exist")) return serviceImport, false } diff --git a/pkg/agent/controller/cleanup.go b/pkg/agent/controller/cleanup.go index 6ece41726..af82d7977 100644 --- a/pkg/agent/controller/cleanup.go +++ b/pkg/agent/controller/cleanup.go @@ -48,12 +48,12 @@ var ( } ) -func (a *Controller) Cleanup() error { +func (a *Controller) Cleanup(ctx context.Context) error { // Delete all ServiceImports from the local cluster skipping those in the broker namespace if the broker is on the // local cluster. siClient := a.serviceImportController.localClient.Resource(serviceImportGVR) - list, err := listResources(siClient, metav1.NamespaceAll, + list, err := listResources(ctx, siClient, metav1.NamespaceAll, &metav1.ListOptions{ FieldSelector: fields.OneTermNotEqualSelector("metadata.namespace", a.serviceImportController.serviceImportAggregator.brokerNamespace).String(), @@ -65,13 +65,13 @@ func (a *Controller) Cleanup() error { for i := range list { _, ok := list[i].GetLabels()[mcsv1a1.LabelServiceName] if ok { - err = a.serviceImportController.Delete(&list[i]) + err = a.serviceImportController.Delete(ctx, &list[i]) if err != nil && !apierrors.IsNotFound(err) { return err } } - err = siClient.Namespace(list[i].GetNamespace()).Delete(context.TODO(), list[i].GetName(), metav1.DeleteOptions{}) + err = siClient.Namespace(list[i].GetNamespace()).Delete(ctx, list[i].GetName(), metav1.DeleteOptions{}) if err != nil && !apierrors.IsNotFound(err) { return err //nolint:wrapcheck // Let the caller wrap @@ -80,7 +80,7 @@ func (a *Controller) Cleanup() error { // Delete all EndpointSlices from the local cluster skipping those in the broker namespace if the broker is on the // local cluster. - err = deleteResources(a.endpointSliceController.syncer.GetLocalClient().Resource(endpointSliceGVR), metav1.NamespaceAll, + err = deleteResources(ctx, a.endpointSliceController.syncer.GetLocalClient().Resource(endpointSliceGVR), metav1.NamespaceAll, &metav1.ListOptions{ FieldSelector: fields.OneTermNotEqualSelector("metadata.namespace", a.serviceImportController.serviceImportAggregator.brokerNamespace).String(), @@ -91,7 +91,7 @@ func (a *Controller) Cleanup() error { } // Delete all local EndpointSlices from the broker. - err = deleteResources(a.endpointSliceController.syncer.GetBrokerClient().Resource(endpointSliceGVR), + err = deleteResources(ctx, a.endpointSliceController.syncer.GetBrokerClient().Resource(endpointSliceGVR), a.endpointSliceController.syncer.GetBrokerNamespace(), &metav1.ListOptions{ LabelSelector: labels.Set(map[string]string{constants.MCSLabelSourceCluster: a.clusterID}).String(), @@ -100,14 +100,14 @@ func (a *Controller) Cleanup() error { return errors.Wrap(err, "error deleting remote EndpointSlices") } -func deleteResources(client dynamic.NamespaceableResourceInterface, ns string, options *metav1.ListOptions) error { - list, err := listResources(client, ns, options) +func deleteResources(ctx context.Context, client dynamic.NamespaceableResourceInterface, ns string, options *metav1.ListOptions) error { + list, err := listResources(ctx, client, ns, options) if err != nil { return err } for i := range list { - err = client.Namespace(list[i].GetNamespace()).Delete(context.TODO(), list[i].GetName(), metav1.DeleteOptions{}) + err = client.Namespace(list[i].GetNamespace()).Delete(ctx, list[i].GetName(), metav1.DeleteOptions{}) if err != nil && !apierrors.IsNotFound(err) { return err //nolint:wrapcheck // Let the caller wrap } @@ -116,10 +116,10 @@ func deleteResources(client dynamic.NamespaceableResourceInterface, ns string, o return nil } -func listResources(client dynamic.NamespaceableResourceInterface, ns string, +func listResources(ctx context.Context, client dynamic.NamespaceableResourceInterface, ns string, options *metav1.ListOptions, ) ([]unstructured.Unstructured, error) { - list, err := client.Namespace(ns).List(context.TODO(), *options) + list, err := client.Namespace(ns).List(ctx, *options) if err != nil && !apierrors.IsNotFound(err) { return nil, err //nolint:wrapcheck // Let the caller wrap } diff --git a/pkg/agent/controller/cleanup_test.go b/pkg/agent/controller/cleanup_test.go index bf993cd37..8d4c4b4c8 100644 --- a/pkg/agent/controller/cleanup_test.go +++ b/pkg/agent/controller/cleanup_test.go @@ -228,7 +228,7 @@ var _ = Describe("Cleanup", func() { }) It("should correctly remove local and remote ServiceImports and EndpointSlices", func() { - Expect(t.cluster1.agentController.Cleanup()).To(Succeed()) + Expect(t.cluster1.agentController.Cleanup(context.Background())).To(Succeed()) test.AwaitNoResource(t.cluster1.localServiceImportClient.Namespace(test.LocalNamespace), localServiceImport1.Name) test.AwaitNoResource(t.cluster1.localServiceImportClient.Namespace(serviceNamespace), localAggregatedServiceImport1.Name) diff --git a/pkg/agent/controller/endpoint_slice.go b/pkg/agent/controller/endpoint_slice.go index 1494915ed..0499eaeee 100644 --- a/pkg/agent/controller/endpoint_slice.go +++ b/pkg/agent/controller/endpoint_slice.go @@ -65,7 +65,7 @@ func newEndpointSliceController(spec *AgentSpecification, syncerConfig broker.Sy BrokerResourceType: &discovery.EndpointSlice{}, TransformBrokerToLocal: c.onRemoteEndpointSlice, OnSuccessfulSyncFromBroker: func(obj runtime.Object, op syncer.Operation) bool { - c.enqueueForConflictCheck(obj.(*discovery.EndpointSlice), op) + c.enqueueForConflictCheck(context.TODO(), obj.(*discovery.EndpointSlice), op) return false }, BrokerResyncPeriod: BrokerResyncPeriod, @@ -102,12 +102,13 @@ func (c *EndpointSliceController) start(stopCh <-chan struct{}) error { func (c *EndpointSliceController) onLocalEndpointSlice(obj runtime.Object, _ int, op syncer.Operation) (runtime.Object, bool) { endpointSlice := obj.(*discovery.EndpointSlice) + ctx := context.TODO() if op != syncer.Delete && isLegacyEndpointSlice(endpointSlice) { logger.Infof("Found legacy EndpointSlice %s/%s - deleting it", endpointSlice.Namespace, endpointSlice.Name) - err := c.syncer.GetLocalFederator().Delete(endpointSlice) + err := c.syncer.GetLocalFederator().Delete(ctx, endpointSlice) if err != nil { logger.Errorf(err, "Error deleting legacy EndpointSlice %s/%s", endpointSlice.Namespace, endpointSlice.Name) } @@ -135,6 +136,7 @@ func (c *EndpointSliceController) onRemoteEndpointSlice(obj runtime.Object, _ in func (c *EndpointSliceController) onLocalEndpointSliceSynced(obj runtime.Object, op syncer.Operation) bool { endpointSlice := obj.(*discovery.EndpointSlice) + ctx := context.TODO() serviceName := endpointSlice.Labels[mcsv1a1.LabelServiceName] serviceNamespace := endpointSlice.Labels[constants.LabelSourceNamespace] @@ -153,18 +155,20 @@ func (c *EndpointSliceController) onLocalEndpointSliceSynced(obj runtime.Object, if op == syncer.Delete { if c.hasNoRemainingEndpointSlices(endpointSlice) { - err = c.serviceImportAggregator.updateOnDelete(serviceName, serviceNamespace) + err = c.serviceImportAggregator.updateOnDelete(ctx, serviceName, serviceNamespace) } } else { - err = c.serviceImportAggregator.updateOnCreateOrUpdate(serviceName, serviceNamespace) + err = c.serviceImportAggregator.updateOnCreateOrUpdate(ctx, serviceName, serviceNamespace) if err != nil { - c.serviceExportClient.updateStatusConditions(serviceName, serviceNamespace, newServiceExportCondition(constants.ServiceExportReady, - corev1.ConditionFalse, exportFailedReason, fmt.Sprintf("Unable to export: %v", err))) + c.serviceExportClient.updateStatusConditions(ctx, serviceName, serviceNamespace, + newServiceExportCondition(constants.ServiceExportReady, corev1.ConditionFalse, exportFailedReason, + fmt.Sprintf("Unable to export: %v", err))) } else { - c.serviceExportClient.updateStatusConditions(serviceName, serviceNamespace, newServiceExportCondition(constants.ServiceExportReady, - corev1.ConditionTrue, "", "Service was successfully exported to the broker")) + c.serviceExportClient.updateStatusConditions(ctx, serviceName, serviceNamespace, + newServiceExportCondition(constants.ServiceExportReady, corev1.ConditionTrue, "", + "Service was successfully exported to the broker")) - c.enqueueForConflictCheck(endpointSlice, op) + c.enqueueForConflictCheck(ctx, endpointSlice, op) } } @@ -201,6 +205,8 @@ func (c *EndpointSliceController) hasNoRemainingEndpointSlices(endpointSlice *di } func (c *EndpointSliceController) checkForConflicts(_, name, namespace string) (bool, error) { + ctx := context.TODO() + localServiceExport := c.serviceExportClient.getLocalInstance(name, namespace) if localServiceExport == nil { return false, nil @@ -230,26 +236,26 @@ func (c *EndpointSliceController) checkForConflicts(_, name, namespace string) ( } if conflict { - c.serviceExportClient.updateStatusConditions(name, namespace, newServiceExportCondition( + c.serviceExportClient.updateStatusConditions(ctx, name, namespace, newServiceExportCondition( mcsv1a1.ServiceExportConflict, corev1.ConditionTrue, portConflictReason, fmt.Sprintf("The service ports conflict between the constituent clusters %s. "+ "The service will expose the intersection of all the ports: %s", fmt.Sprintf("[%s]", strings.Join(clusterNames, ", ")), servicePortsToString(intersectedServicePorts)))) } else if FindServiceExportStatusCondition(localServiceExport.Status.Conditions, mcsv1a1.ServiceExportConflict) != nil { - c.serviceExportClient.removeStatusCondition(name, namespace, mcsv1a1.ServiceExportConflict, portConflictReason) + c.serviceExportClient.removeStatusCondition(ctx, name, namespace, mcsv1a1.ServiceExportConflict, portConflictReason) } return false, nil } -func (c *EndpointSliceController) enqueueForConflictCheck(eps *discovery.EndpointSlice, op syncer.Operation) { +func (c *EndpointSliceController) enqueueForConflictCheck(ctx context.Context, eps *discovery.EndpointSlice, op syncer.Operation) { if eps.Labels[constants.LabelIsHeadless] != "false" { return } // Since the conflict checking works off of the local cache for efficiency, wait a little bit here for the local cache to be updated // with the latest state of the EndpointSlice. - _ = wait.PollUntilContextTimeout(context.Background(), 10*time.Millisecond, 100*time.Millisecond, true, + _ = wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 100*time.Millisecond, true, func(_ context.Context) (bool, error) { _, found, _ := c.syncer.GetLocalResource(eps.Name, eps.Namespace, eps) return (op == syncer.Delete && !found) || (op != syncer.Delete && found), nil diff --git a/pkg/agent/controller/service_endpoint_slices.go b/pkg/agent/controller/service_endpoint_slices.go index 9c7d8d5d3..cc2eb138e 100644 --- a/pkg/agent/controller/service_endpoint_slices.go +++ b/pkg/agent/controller/service_endpoint_slices.go @@ -99,7 +99,7 @@ func (c *ServiceEndpointSliceController) stop() { }) } -func (c *ServiceEndpointSliceController) cleanup() (bool, error) { +func (c *ServiceEndpointSliceController) cleanup(ctx context.Context) (bool, error) { listOptions := metav1.ListOptions{ LabelSelector: k8slabels.SelectorFromSet(map[string]string{ discovery.LabelManagedBy: constants.LabelValueManagedBy, @@ -109,7 +109,7 @@ func (c *ServiceEndpointSliceController) cleanup() (bool, error) { }).String(), } - list, err := c.localClient.List(context.Background(), listOptions) + list, err := c.localClient.List(ctx, listOptions) if err != nil { return false, errors.Wrapf(err, "error listing the EndpointSlices associated with service %s/%s", c.serviceNamespace, c.serviceName) @@ -119,7 +119,7 @@ func (c *ServiceEndpointSliceController) cleanup() (bool, error) { return false, nil } - err = c.localClient.DeleteCollection(context.Background(), metav1.DeleteOptions{}, listOptions) + err = c.localClient.DeleteCollection(ctx, metav1.DeleteOptions{}, listOptions) if err != nil && !apierrors.IsNotFound(err) { return false, errors.Wrapf(err, "error deleting the EndpointSlices associated with service %s/%s", @@ -155,7 +155,7 @@ func (c *ServiceEndpointSliceController) onServiceEndpointSlice(obj runtime.Obje } if op == syncer.Delete { - list, err := c.localClient.List(context.Background(), metav1.ListOptions{ + list, err := c.localClient.List(context.TODO(), metav1.ListOptions{ LabelSelector: k8slabels.SelectorFromSet(returnEPS.Labels).String(), }) if err != nil { @@ -320,18 +320,18 @@ func (c *ServiceEndpointSliceController) isHeadless() bool { return c.serviceImportSpec.Type == mcsv1a1.Headless } -func (c *ServiceEndpointSliceController) Distribute(obj runtime.Object) error { - return c.federator.Distribute(obj) //nolint:wrapcheck // No need to wrap here +func (c *ServiceEndpointSliceController) Distribute(ctx context.Context, obj runtime.Object) error { + return c.federator.Distribute(ctx, obj) //nolint:wrapcheck // No need to wrap here } -func (c *ServiceEndpointSliceController) Delete(obj runtime.Object) error { +func (c *ServiceEndpointSliceController) Delete(ctx context.Context, obj runtime.Object) error { if c.isHeadless() { - return c.federator.Delete(obj) //nolint:wrapcheck // No need to wrap here + return c.federator.Delete(ctx, obj) //nolint:wrapcheck // No need to wrap here } // For a non-headless service, we never delete the single exported EPS - we update its endpoint condition based on // the backend service EPS's as they are created/updated/deleted. - return c.Distribute(obj) + return c.Distribute(ctx, obj) } type endpointSliceStringer struct { diff --git a/pkg/agent/controller/service_export_client.go b/pkg/agent/controller/service_export_client.go index cf92132b5..3743f7b38 100644 --- a/pkg/agent/controller/service_export_client.go +++ b/pkg/agent/controller/service_export_client.go @@ -32,8 +32,10 @@ import ( mcsv1a1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" ) -func (c *ServiceExportClient) removeStatusCondition(name, namespace string, condType mcsv1a1.ServiceExportConditionType, reason string) { - c.doUpdate(name, namespace, func(toUpdate *mcsv1a1.ServiceExport) bool { +func (c *ServiceExportClient) removeStatusCondition(ctx context.Context, name, namespace string, + condType mcsv1a1.ServiceExportConditionType, reason string, +) { + c.doUpdate(ctx, name, namespace, func(toUpdate *mcsv1a1.ServiceExport) bool { condition := FindServiceExportStatusCondition(toUpdate.Status.Conditions, condType) if condition != nil && reflect.DeepEqual(condition.Reason, &reason) { logger.V(log.DEBUG).Infof("Removing status condition (Type: %q, Reason: %q) from ServiceExport (%s/%s)", @@ -51,11 +53,13 @@ func (c *ServiceExportClient) removeStatusCondition(name, namespace string, cond }) } -func (c *ServiceExportClient) updateStatusConditions(name, namespace string, conditions ...mcsv1a1.ServiceExportCondition) { - c.tryUpdateStatusConditions(name, namespace, true, conditions...) +func (c *ServiceExportClient) updateStatusConditions(ctx context.Context, name, namespace string, + conditions ...mcsv1a1.ServiceExportCondition, +) { + c.tryUpdateStatusConditions(ctx, name, namespace, true, conditions...) } -func (c *ServiceExportClient) tryUpdateStatusConditions(name, namespace string, canReplace bool, +func (c *ServiceExportClient) tryUpdateStatusConditions(ctx context.Context, name, namespace string, canReplace bool, conditions ...mcsv1a1.ServiceExportCondition, ) { findStatusCondition := func(conditions []mcsv1a1.ServiceExportCondition, condType mcsv1a1.ServiceExportConditionType, @@ -71,7 +75,7 @@ func (c *ServiceExportClient) tryUpdateStatusConditions(name, namespace string, return cond } - c.doUpdate(name, namespace, func(toUpdate *mcsv1a1.ServiceExport) bool { + c.doUpdate(ctx, name, namespace, func(toUpdate *mcsv1a1.ServiceExport) bool { updated := false for i := range conditions { @@ -100,9 +104,9 @@ func (c *ServiceExportClient) tryUpdateStatusConditions(name, namespace string, }) } -func (c *ServiceExportClient) doUpdate(name, namespace string, update func(toUpdate *mcsv1a1.ServiceExport) bool) { +func (c *ServiceExportClient) doUpdate(ctx context.Context, name, namespace string, update func(toUpdate *mcsv1a1.ServiceExport) bool) { err := retry.RetryOnConflict(retry.DefaultRetry, func() error { - obj, err := c.Namespace(namespace).Get(context.Background(), name, metav1.GetOptions{}) + obj, err := c.Namespace(namespace).Get(ctx, name, metav1.GetOptions{}) if apierrors.IsNotFound(err) { logger.V(log.TRACE).Infof("ServiceExport (%s/%s) not found - unable to update status", namespace, name) return nil @@ -117,8 +121,7 @@ func (c *ServiceExportClient) doUpdate(name, namespace string, update func(toUpd return nil } - _, err = c.Namespace(toUpdate.Namespace).UpdateStatus(context.TODO(), - c.toUnstructured(toUpdate), metav1.UpdateOptions{}) + _, err = c.Namespace(toUpdate.Namespace).UpdateStatus(ctx, c.toUnstructured(toUpdate), metav1.UpdateOptions{}) return errors.Wrap(err, "error from UpdateStatus") }) diff --git a/pkg/agent/controller/service_import.go b/pkg/agent/controller/service_import.go index ed4949b25..54a424325 100644 --- a/pkg/agent/controller/service_import.go +++ b/pkg/agent/controller/service_import.go @@ -232,12 +232,12 @@ func (c *ServiceImportController) startEndpointsController(serviceImport *mcsv1a return nil } -func (c *ServiceImportController) stopEndpointsController(key string) (bool, error) { +func (c *ServiceImportController) stopEndpointsController(ctx context.Context, key string) (bool, error) { if obj, found := c.endpointControllers.Load(key); found { endpointController := obj.(*ServiceEndpointSliceController) endpointController.stop() - found, err := endpointController.cleanup() + found, err := endpointController.cleanup(ctx) if err == nil { c.endpointControllers.Delete(key) } @@ -251,6 +251,7 @@ func (c *ServiceImportController) stopEndpointsController(key string) (bool, err func (c *ServiceImportController) onLocalServiceImport(obj runtime.Object, _ int, op syncer.Operation) (runtime.Object, bool) { serviceImport := obj.(*mcsv1a1.ServiceImport) key, _ := cache.MetaNamespaceKeyFunc(serviceImport) + ctx := context.TODO() serviceName := serviceImportSourceName(serviceImport) @@ -262,13 +263,13 @@ func (c *ServiceImportController) onLocalServiceImport(obj runtime.Object, _ int logger.V(log.DEBUG).Infof("Local ServiceImport %q %sd", key, op) if op == syncer.Delete { - c.serviceExportClient.updateStatusConditions(serviceName, serviceImport.Labels[constants.LabelSourceNamespace], + c.serviceExportClient.updateStatusConditions(ctx, serviceName, serviceImport.Labels[constants.LabelSourceNamespace], newServiceExportCondition(constants.ServiceExportReady, corev1.ConditionFalse, "NoServiceImport", "ServiceImport was deleted")) return obj, false } else if op == syncer.Create { - c.serviceExportClient.tryUpdateStatusConditions(serviceName, serviceImport.Labels[constants.LabelSourceNamespace], + c.serviceExportClient.tryUpdateStatusConditions(ctx, serviceName, serviceImport.Labels[constants.LabelSourceNamespace], false, newServiceExportCondition(constants.ServiceExportReady, corev1.ConditionFalse, "AwaitingExport", fmt.Sprintf("ServiceImport %sd - awaiting aggregation on the broker", op))) } @@ -276,7 +277,7 @@ func (c *ServiceImportController) onLocalServiceImport(obj runtime.Object, _ int return obj, false } -func (c *ServiceImportController) Distribute(obj runtime.Object) error { +func (c *ServiceImportController) Distribute(ctx context.Context, obj runtime.Object) error { localServiceImport := c.converter.toServiceImport(obj) key, _ := cache.MetaNamespaceKeyFunc(localServiceImport) @@ -306,7 +307,7 @@ func (c *ServiceImportController) Distribute(obj runtime.Object) error { // is determined from the constituent clusters' EndpointSlices, thus each cluster must have a consistent view of all // the EndpointSlices in order for the aggregated port information to be eventually consistent. - result, err := util.CreateOrUpdate(context.Background(), resource.ForDynamic(c.serviceImportAggregator.brokerServiceImportClient()), + result, err := util.CreateOrUpdate(ctx, resource.ForDynamic(c.serviceImportAggregator.brokerServiceImportClient()), c.converter.toUnstructured(aggregate), func(obj runtime.Object) (runtime.Object, error) { existing := c.converter.toServiceImport(obj) @@ -318,11 +319,12 @@ func (c *ServiceImportController) Distribute(obj runtime.Object) error { fmt.Sprintf("The service type %q does not match the type (%q) of the existing service export", localServiceImport.Spec.Type, existing.Spec.Type)) - c.serviceExportClient.updateStatusConditions(serviceName, serviceNamespace, conflictCondition, + c.serviceExportClient.updateStatusConditions(ctx, serviceName, serviceNamespace, conflictCondition, newServiceExportCondition(constants.ServiceExportReady, corev1.ConditionFalse, exportFailedReason, "Unable to export due to an irresolvable conflict")) } else { - c.serviceExportClient.removeStatusCondition(serviceName, serviceNamespace, mcsv1a1.ServiceExportConflict, typeConflictReason) + c.serviceExportClient.removeStatusCondition(ctx, serviceName, serviceNamespace, mcsv1a1.ServiceExportConflict, + typeConflictReason) } return obj, nil @@ -332,7 +334,7 @@ func (c *ServiceImportController) Distribute(obj runtime.Object) error { } if err != nil { - c.serviceExportClient.updateStatusConditions(serviceName, serviceNamespace, + c.serviceExportClient.updateStatusConditions(ctx, serviceName, serviceNamespace, newServiceExportCondition(constants.ServiceExportReady, corev1.ConditionFalse, exportFailedReason, fmt.Sprintf("Unable to export: %v", err))) } @@ -344,7 +346,7 @@ func (c *ServiceImportController) Distribute(obj runtime.Object) error { return err } -func (c *ServiceImportController) Delete(obj runtime.Object) error { +func (c *ServiceImportController) Delete(ctx context.Context, obj runtime.Object) error { localServiceImport := c.converter.toServiceImport(obj) key, _ := cache.MetaNamespaceKeyFunc(localServiceImport) @@ -355,13 +357,13 @@ func (c *ServiceImportController) Delete(obj runtime.Object) error { // was never started or if there are no local EndpointSlices, which can happen during reconciliation on startup or // during clean up on uninstall, then we handle removal here. - found, err := c.stopEndpointsController(key) + found, err := c.stopEndpointsController(ctx, key) if err != nil { return err } if !found { - err = c.serviceImportAggregator.updateOnDelete(serviceImportSourceName(localServiceImport), + err = c.serviceImportAggregator.updateOnDelete(ctx, serviceImportSourceName(localServiceImport), localServiceImport.Labels[constants.LabelSourceNamespace]) } @@ -369,7 +371,7 @@ func (c *ServiceImportController) Delete(obj runtime.Object) error { return err } - return c.serviceImportMigrator.onLocalServiceImportDeleted(localServiceImport) + return c.serviceImportMigrator.onLocalServiceImportDeleted(ctx, localServiceImport) } func (c *ServiceImportController) onRemoteServiceImport(obj runtime.Object, _ int, _ syncer.Operation) (runtime.Object, bool) { diff --git a/pkg/agent/controller/service_import_aggregator.go b/pkg/agent/controller/service_import_aggregator.go index cae4e7671..b285b7e0c 100644 --- a/pkg/agent/controller/service_import_aggregator.go +++ b/pkg/agent/controller/service_import_aggregator.go @@ -48,8 +48,8 @@ func newServiceImportAggregator(brokerClient dynamic.Interface, brokerNamespace, } } -func (a *ServiceImportAggregator) updateOnCreateOrUpdate(name, namespace string) error { - return a.update(name, namespace, func(existing *mcsv1a1.ServiceImport) error { +func (a *ServiceImportAggregator) updateOnCreateOrUpdate(ctx context.Context, name, namespace string) error { + return a.update(ctx, name, namespace, func(existing *mcsv1a1.ServiceImport) error { var added bool existing.Status.Clusters, added = slices.AppendIfNotPresent(existing.Status.Clusters, @@ -60,11 +60,11 @@ func (a *ServiceImportAggregator) updateOnCreateOrUpdate(name, namespace string) a.clusterID, existing.Name, existing.Status.Clusters) } - return a.setServicePorts(existing) + return a.setServicePorts(ctx, existing) }) } -func (a *ServiceImportAggregator) setServicePorts(si *mcsv1a1.ServiceImport) error { +func (a *ServiceImportAggregator) setServicePorts(ctx context.Context, si *mcsv1a1.ServiceImport) error { // We don't set the port info for headless services. if si.Spec.Type != mcsv1a1.ClusterSetIP { return nil @@ -73,7 +73,7 @@ func (a *ServiceImportAggregator) setServicePorts(si *mcsv1a1.ServiceImport) err serviceName := si.Annotations[mcsv1a1.LabelServiceName] serviceNamespace := si.Annotations[constants.LabelSourceNamespace] - list, err := a.brokerClient.Resource(endpointSliceGVR).Namespace(a.brokerNamespace).List(context.Background(), metav1.ListOptions{ + list, err := a.brokerClient.Resource(endpointSliceGVR).Namespace(a.brokerNamespace).List(ctx, metav1.ListOptions{ LabelSelector: labels.SelectorFromSet(map[string]string{ discovery.LabelManagedBy: constants.LabelValueManagedBy, constants.LabelSourceNamespace: serviceNamespace, @@ -97,8 +97,8 @@ func (a *ServiceImportAggregator) setServicePorts(si *mcsv1a1.ServiceImport) err return nil } -func (a *ServiceImportAggregator) updateOnDelete(name, namespace string) error { - return a.update(name, namespace, func(existing *mcsv1a1.ServiceImport) error { +func (a *ServiceImportAggregator) updateOnDelete(ctx context.Context, name, namespace string) error { + return a.update(ctx, name, namespace, func(existing *mcsv1a1.ServiceImport) error { var removed bool existing.Status.Clusters, removed = slices.Remove(existing.Status.Clusters, mcsv1a1.ClusterStatus{Cluster: a.clusterID}, @@ -110,11 +110,11 @@ func (a *ServiceImportAggregator) updateOnDelete(name, namespace string) error { logger.V(log.DEBUG).Infof("Removed cluster name %q from aggregated ServiceImport %q. New status: %#v", a.clusterID, existing.Name, existing.Status.Clusters) - return a.setServicePorts(existing) + return a.setServicePorts(ctx, existing) }) } -func (a *ServiceImportAggregator) update(name, namespace string, mutate func(*mcsv1a1.ServiceImport) error) error { +func (a *ServiceImportAggregator) update(ctx context.Context, name, namespace string, mutate func(*mcsv1a1.ServiceImport) error) error { aggregate := &mcsv1a1.ServiceImport{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("%s-%s", name, namespace), @@ -122,7 +122,7 @@ func (a *ServiceImportAggregator) update(name, namespace string, mutate func(*mc } //nolint:wrapcheck // No need to wrap the return error. - return util.Update(context.Background(), resource.ForDynamic(a.brokerServiceImportClient()), a.converter.toUnstructured(aggregate), + return util.Update(ctx, resource.ForDynamic(a.brokerServiceImportClient()), a.converter.toUnstructured(aggregate), func(obj runtime.Object) (runtime.Object, error) { existing := a.converter.toServiceImport(obj) @@ -134,7 +134,7 @@ func (a *ServiceImportAggregator) update(name, namespace string, mutate func(*mc if len(existing.Status.Clusters) == 0 { logger.V(log.DEBUG).Infof("Deleting aggregated ServiceImport %q", existing.Name) - err := a.brokerServiceImportClient().Delete(context.Background(), existing.Name, metav1.DeleteOptions{ + err := a.brokerServiceImportClient().Delete(ctx, existing.Name, metav1.DeleteOptions{ Preconditions: &metav1.Preconditions{ ResourceVersion: ptr.To(existing.ResourceVersion), }, diff --git a/pkg/agent/controller/service_import_migrator.go b/pkg/agent/controller/service_import_migrator.go index bbd903e7e..f75509794 100644 --- a/pkg/agent/controller/service_import_migrator.go +++ b/pkg/agent/controller/service_import_migrator.go @@ -110,7 +110,7 @@ func (c *ServiceImportMigrator) onSuccessfulSyncFromBroker(obj runtime.Object, o logger.Infof("All remote clusters have been upgraded for service \"%s/%s\" - removing local ServiceImport %q from the broker", aggregatedServiceImport.Namespace, aggregatedServiceImport.Name, localServiceImportName) - err := c.brokerClient.Delete(context.Background(), localServiceImportName, metav1.DeleteOptions{}) + err := c.brokerClient.Delete(context.TODO(), localServiceImportName, metav1.DeleteOptions{}) if err != nil && !apierrors.IsNotFound(err) { logger.Error(err, "error deleting legacy ServiceImport from the broker") return true @@ -122,14 +122,14 @@ func (c *ServiceImportMigrator) onSuccessfulSyncFromBroker(obj runtime.Object, o return false } -func (c *ServiceImportMigrator) onLocalServiceImportDeleted(serviceImport *mcsv1a1.ServiceImport) error { +func (c *ServiceImportMigrator) onLocalServiceImportDeleted(ctx context.Context, serviceImport *mcsv1a1.ServiceImport) error { if serviceImport.Labels[LegacySourceClusterLabel] != c.clusterID { return nil } logger.Infof("Legacy local ServiceImport %q deleted - removing from the broker", serviceImport.Name) - err := c.brokerClient.Delete(context.Background(), serviceImport.Name, metav1.DeleteOptions{}) + err := c.brokerClient.Delete(ctx, serviceImport.Name, metav1.DeleteOptions{}) if apierrors.IsNotFound(err) { err = nil } diff --git a/pkg/agent/main.go b/pkg/agent/main.go index 1dea4eab8..e969c46e2 100644 --- a/pkg/agent/main.go +++ b/pkg/agent/main.go @@ -19,7 +19,6 @@ limitations under the License. package main import ( - "context" "errors" "flag" "fmt" @@ -141,7 +140,7 @@ func main() { if agentSpec.Uninstall { logger.Info("Uninstalling lighthouse") - err := lightHouseAgent.Cleanup() + err := lightHouseAgent.Cleanup(ctx) exitOnError(err, "Error cleaning up the lighthouse agent controller") return @@ -156,7 +155,7 @@ func main() { logger.Info("All controllers stopped or exited. Stopping main loop") - if err := httpServer.Shutdown(context.TODO()); err != nil { + if err := httpServer.Shutdown(ctx); err != nil { logger.Error(err, "Error shutting down metrics HTTP server") } }