diff --git a/pkg/agent/controller/clusterip_service_test.go b/pkg/agent/controller/clusterip_service_test.go
index 595e0e3a..9e61af96 100644
--- a/pkg/agent/controller/clusterip_service_test.go
+++ b/pkg/agent/controller/clusterip_service_test.go
@@ -21,6 +21,7 @@ package controller_test
 import (
 	"fmt"
 	"strconv"
+	"time"
 
 	. "github.com/onsi/ginkgo/v2"
 	"github.com/submariner-io/admiral/pkg/fake"
@@ -201,6 +202,9 @@ func testClusterIPServiceInOneCluster() {
 			t.cluster1.service.Spec.SessionAffinityConfig = &corev1.SessionAffinityConfig{
 				ClientIP: &corev1.ClientIPConfig{TimeoutSeconds: ptr.To(int32(10))},
 			}
+
+			t.aggregatedSessionAffinity = t.cluster1.service.Spec.SessionAffinity
+			t.aggregatedSessionAffinityConfig = t.cluster1.service.Spec.SessionAffinityConfig
 		})
 
 		It("should be propagated to the ServiceImport", func() {
@@ -388,11 +392,6 @@ func testClusterIPServiceInTwoClusters() {
 
 	BeforeEach(func() {
 		t = newTestDiver()
-
-		t.cluster2.service.Spec.SessionAffinity = corev1.ServiceAffinityClientIP
-		t.cluster2.service.Spec.SessionAffinityConfig = &corev1.SessionAffinityConfig{
-			ClientIP: &corev1.ClientIPConfig{TimeoutSeconds: ptr.To(int32(10))},
-		}
 	})
 
 	JustBeforeEach(func() {
@@ -400,27 +399,48 @@ func testClusterIPServiceInTwoClusters() {
 		t.cluster1.createService()
 		t.cluster1.createServiceExport()
-		t.justBeforeEach()
+		t.cluster1.start(t, *t.syncerConfig)
+
+		// Sleep a little before starting the second cluster to ensure its resource CreationTimestamps are
+		// later than the first cluster's, so that conflict checking is deterministic.
+		time.Sleep(100 * time.Millisecond)
 
 		t.cluster2.createServiceEndpointSlices()
 		t.cluster2.createService()
 		t.cluster2.createServiceExport()
+
+		t.cluster2.start(t, *t.syncerConfig)
 	})
 
 	AfterEach(func() {
 		t.afterEach()
 	})
 
-	It("should export the service in both clusters", func() {
-		t.awaitNonHeadlessServiceExported(&t.cluster1, &t.cluster2)
-		t.cluster1.ensureLastServiceExportCondition(newServiceExportReadyCondition(corev1.ConditionTrue, ""))
-		t.cluster1.ensureLastServiceExportCondition(newServiceExportValidCondition(corev1.ConditionTrue, ""))
-		t.cluster1.ensureNoServiceExportCondition(mcsv1a1.ServiceExportConflict)
-		t.cluster2.ensureNoServiceExportCondition(mcsv1a1.ServiceExportConflict)
+	Context("with matching service SessionAffinity and SessionAffinityConfig", func() {
+		BeforeEach(func() {
+			t.cluster1.service.Spec.SessionAffinity = corev1.ServiceAffinityClientIP
+			t.cluster1.service.Spec.SessionAffinityConfig = &corev1.SessionAffinityConfig{
+				ClientIP: &corev1.ClientIPConfig{TimeoutSeconds: ptr.To(int32(10))},
+			}
+
+			t.cluster2.service.Spec.SessionAffinity = t.cluster1.service.Spec.SessionAffinity
+			t.cluster2.service.Spec.SessionAffinityConfig = t.cluster1.service.Spec.SessionAffinityConfig
 
-		By("Ensure conflict checking does not try to unnecessarily update the ServiceExport status")
+			t.aggregatedSessionAffinity = t.cluster1.service.Spec.SessionAffinity
+			t.aggregatedSessionAffinityConfig = t.cluster1.service.Spec.SessionAffinityConfig
+		})
 
-		t.cluster1.ensureNoServiceExportActions()
+		It("should export the service in both clusters", func() {
+			t.awaitNonHeadlessServiceExported(&t.cluster1, &t.cluster2)
+			t.cluster1.ensureLastServiceExportCondition(newServiceExportReadyCondition(corev1.ConditionTrue, ""))
+			t.cluster1.ensureLastServiceExportCondition(newServiceExportValidCondition(corev1.ConditionTrue, ""))
+			t.cluster1.ensureNoServiceExportCondition(mcsv1a1.ServiceExportConflict)
+			t.cluster2.ensureNoServiceExportCondition(mcsv1a1.ServiceExportConflict)
+
+			By("Ensure conflict checking does not try to unnecessarily update the ServiceExport status")
+
+			t.cluster1.ensureNoServiceExportActions()
+		})
 	})
 
 	Context("with differing ports", func() {
@@ -492,6 +512,165 @@ func testClusterIPServiceInTwoClusters() {
 			})
 		})
 	})
+
+	Context("with differing service SessionAffinity", func() {
+		BeforeEach(func() {
+			t.cluster1.service.Spec.SessionAffinity = corev1.ServiceAffinityClientIP
+			t.aggregatedSessionAffinity = t.cluster1.service.Spec.SessionAffinity
+		})
+
+		It("should resolve the conflict and set the Conflict status condition on the conflicting cluster", func() {
+			t.awaitAggregatedServiceImport(mcsv1a1.ClusterSetIP, t.cluster1.service.Name, t.cluster1.service.Namespace,
+				&t.cluster1, &t.cluster2)
+
+			t.cluster2.awaitServiceExportCondition(newServiceExportConflictCondition(
+				controller.SessionAffinityConflictReason))
+			t.cluster1.ensureNoServiceExportCondition(mcsv1a1.ServiceExportConflict)
+		})
+
+		Context("initially and after updating the SessionAffinity on the conflicting cluster to match", func() {
+			It("should clear the Conflict status condition on the conflicting cluster", func() {
+				t.cluster2.awaitServiceExportCondition(newServiceExportConflictCondition(
+					controller.SessionAffinityConflictReason))
+
+				By("Updating the SessionAffinity on the service")
+
+				t.cluster2.service.Spec.SessionAffinity = t.cluster1.service.Spec.SessionAffinity
+				t.cluster2.updateService()
+
+				t.cluster2.awaitServiceExportCondition(noConflictCondition)
+			})
+		})
+
+		Context("initially and after updating the SessionAffinity on the oldest exporting cluster to match", func() {
+			It("should clear the Conflict status condition on the conflicting cluster", func() {
+				t.cluster2.awaitServiceExportCondition(newServiceExportConflictCondition(
+					controller.SessionAffinityConflictReason))
+
+				By("Updating the SessionAffinity on the service")
+
+				t.cluster1.service.Spec.SessionAffinity = t.cluster2.service.Spec.SessionAffinity
+				t.cluster1.updateService()
+
+				t.aggregatedSessionAffinity = t.cluster1.service.Spec.SessionAffinity
+				t.awaitAggregatedServiceImport(mcsv1a1.ClusterSetIP, t.cluster1.service.Name, t.cluster1.service.Namespace,
+					&t.cluster1, &t.cluster2)
+
+				t.cluster2.awaitServiceExportCondition(noConflictCondition)
+			})
+		})
+
+		Context("initially and after the service on the oldest exporting cluster is unexported", func() {
+			It("should update the SessionAffinity on the aggregated ServiceImport and clear the Conflict status condition", func() {
+				t.cluster2.awaitServiceExportCondition(newServiceExportConflictCondition(
+					controller.SessionAffinityConflictReason))
+
+				By("Unexporting the service")
+
+				t.cluster1.deleteServiceExport()
+
+				t.aggregatedSessionAffinity = t.cluster2.service.Spec.SessionAffinity
+				t.awaitAggregatedServiceImport(mcsv1a1.ClusterSetIP, t.cluster1.service.Name, t.cluster1.service.Namespace, &t.cluster2)
+				t.cluster2.awaitServiceExportCondition(noConflictCondition)
+			})
+		})
+	})
+
+	Context("with differing service SessionAffinityConfig", func() {
+		BeforeEach(func() {
+			t.cluster1.service.Spec.SessionAffinity = corev1.ServiceAffinityClientIP
+			t.cluster2.service.Spec.SessionAffinity = corev1.ServiceAffinityClientIP
+			t.aggregatedSessionAffinity = t.cluster1.service.Spec.SessionAffinity
+
+			t.cluster1.service.Spec.SessionAffinityConfig = &corev1.SessionAffinityConfig{
+				ClientIP: &corev1.ClientIPConfig{TimeoutSeconds: ptr.To(int32(10))},
+			}
+			t.aggregatedSessionAffinityConfig = t.cluster1.service.Spec.SessionAffinityConfig
+		})
+
+		It("should resolve the conflict and set the Conflict status condition on the conflicting cluster", func() {
+			t.awaitAggregatedServiceImport(mcsv1a1.ClusterSetIP, t.cluster1.service.Name, t.cluster1.service.Namespace,
+				&t.cluster1, &t.cluster2)
+
+			t.cluster2.awaitServiceExportCondition(newServiceExportConflictCondition(
+				controller.SessionAffinityConfigConflictReason))
+			t.cluster1.ensureNoServiceExportCondition(mcsv1a1.ServiceExportConflict)
+		})
+
+		Context("initially and after updating the SessionAffinityConfig on the conflicting cluster to match", func() {
+			It("should clear the Conflict status condition on the conflicting cluster", func() {
+				t.cluster2.awaitServiceExportCondition(newServiceExportConflictCondition(
+					controller.SessionAffinityConfigConflictReason))
+
+				By("Updating the SessionAffinityConfig on the service")
+
+				t.cluster2.service.Spec.SessionAffinityConfig = t.cluster1.service.Spec.SessionAffinityConfig
+				t.cluster2.updateService()
+
+				t.cluster2.awaitServiceExportCondition(noConflictCondition)
+			})
+		})
+
+		Context("initially and after updating the SessionAffinityConfig on the oldest exporting cluster to match", func() {
+			It("should clear the Conflict status condition on the conflicting cluster", func() {
+				t.cluster2.awaitServiceExportCondition(newServiceExportConflictCondition(
+					controller.SessionAffinityConfigConflictReason))
+
+				By("Updating the SessionAffinityConfig on the service")
+
+				t.cluster1.service.Spec.SessionAffinityConfig = t.cluster2.service.Spec.SessionAffinityConfig
+				t.cluster1.updateService()
+
+				t.aggregatedSessionAffinityConfig = t.cluster1.service.Spec.SessionAffinityConfig
+				t.awaitAggregatedServiceImport(mcsv1a1.ClusterSetIP, t.cluster1.service.Name, t.cluster1.service.Namespace,
+					&t.cluster1, &t.cluster2)
+
+				t.cluster2.awaitServiceExportCondition(noConflictCondition)
+			})
+		})
+
+		Context("initially and after the service on the oldest exporting cluster is unexported", func() {
+			BeforeEach(func() {
+				t.cluster2.service.Spec.SessionAffinityConfig = &corev1.SessionAffinityConfig{
+					ClientIP: &corev1.ClientIPConfig{TimeoutSeconds: ptr.To(int32(20))},
+				}
+			})
+
+			It("should update the SessionAffinityConfig on the aggregated ServiceImport and clear the Conflict status condition", func() {
+				t.cluster2.awaitServiceExportCondition(newServiceExportConflictCondition(
+					controller.SessionAffinityConfigConflictReason))
+
+				By("Unexporting the service")
+
+				t.cluster1.deleteServiceExport()
+
+				t.aggregatedSessionAffinityConfig = t.cluster2.service.Spec.SessionAffinityConfig
+				t.awaitAggregatedServiceImport(mcsv1a1.ClusterSetIP, t.cluster1.service.Name, t.cluster1.service.Namespace, &t.cluster2)
+				t.cluster2.awaitServiceExportCondition(noConflictCondition)
+			})
+		})
+	})
+
+	Context("with differing service SessionAffinity and SessionAffinityConfig", func() {
+		BeforeEach(func() {
+			t.cluster1.service.Spec.SessionAffinity = corev1.ServiceAffinityClientIP
+			t.aggregatedSessionAffinity = t.cluster1.service.Spec.SessionAffinity
+
+			t.cluster1.service.Spec.SessionAffinityConfig = &corev1.SessionAffinityConfig{
+				ClientIP: &corev1.ClientIPConfig{TimeoutSeconds: ptr.To(int32(10))},
+			}
+			t.aggregatedSessionAffinityConfig = t.cluster1.service.Spec.SessionAffinityConfig
+		})
+
+		It("should resolve the conflicts and set the Conflict status condition on the conflicting cluster", func() {
+			t.awaitAggregatedServiceImport(mcsv1a1.ClusterSetIP, t.cluster1.service.Name, t.cluster1.service.Namespace,
+				&t.cluster1, &t.cluster2)
+
+			t.cluster2.awaitServiceExportCondition(newServiceExportConflictCondition(
fmt.Sprintf("%s,%s", controller.SessionAffinityConflictReason, controller.SessionAffinityConfigConflictReason))) + t.cluster1.ensureNoServiceExportCondition(mcsv1a1.ServiceExportConflict) + }) + }) } func testClusterIPServiceWithMultipleEPS() { diff --git a/pkg/agent/controller/controller_suite_test.go b/pkg/agent/controller/controller_suite_test.go index 576ad76f..472c46f6 100644 --- a/pkg/agent/controller/controller_suite_test.go +++ b/pkg/agent/controller/controller_suite_test.go @@ -135,16 +135,18 @@ type cluster struct { } type testDriver struct { - cluster1 cluster - cluster2 cluster - brokerServiceImportClient dynamic.NamespaceableResourceInterface - brokerEndpointSliceClient dynamic.ResourceInterface - brokerEndpointSliceReactor *fake.FailingReactor - stopCh chan struct{} - syncerConfig *broker.SyncerConfig - doStart bool - brokerServiceImportReactor *fake.FailingReactor - aggregatedServicePorts []mcsv1a1.ServicePort + cluster1 cluster + cluster2 cluster + brokerServiceImportClient dynamic.NamespaceableResourceInterface + brokerEndpointSliceClient dynamic.ResourceInterface + brokerEndpointSliceReactor *fake.FailingReactor + stopCh chan struct{} + syncerConfig *broker.SyncerConfig + doStart bool + brokerServiceImportReactor *fake.FailingReactor + aggregatedServicePorts []mcsv1a1.ServicePort + aggregatedSessionAffinity corev1.ServiceAffinity + aggregatedSessionAffinityConfig *corev1.SessionAffinityConfig } func newTestDiver() *testDriver { @@ -163,7 +165,8 @@ func newTestDiver() *testDriver { fake.AddBasicReactors(&brokerClient.Fake) t := &testDriver{ - aggregatedServicePorts: []mcsv1a1.ServicePort{port1, port2}, + aggregatedServicePorts: []mcsv1a1.ServicePort{port1, port2}, + aggregatedSessionAffinity: corev1.ServiceAffinityNone, cluster1: cluster{ clusterID: clusterID1, agentSpec: controller.AgentSpecification{ @@ -595,8 +598,7 @@ func (c *cluster) findLocalServiceImport() *mcsv1a1.ServiceImport { for i := range list.Items { if list.Items[i].GetLabels()[mcsv1a1.LabelServiceName] == c.service.Name && list.Items[i].GetLabels()[constants.LabelSourceNamespace] == c.service.Namespace { - serviceImport := &mcsv1a1.ServiceImport{} - Expect(scheme.Scheme.Convert(&list.Items[i], serviceImport, nil)).To(Succeed()) + serviceImport := toServiceImport(&list.Items[i]) return serviceImport } @@ -645,8 +647,7 @@ func awaitServiceImport(client dynamic.NamespaceableResourceInterface, expected return false, nil } - serviceImport = &mcsv1a1.ServiceImport{} - Expect(scheme.Scheme.Convert(obj, serviceImport, nil)).To(Succeed()) + serviceImport = toServiceImport(obj) sortSlices(serviceImport) @@ -667,6 +668,13 @@ func awaitServiceImport(client dynamic.NamespaceableResourceInterface, expected Expect(serviceImport.Labels).To(BeEmpty()) } +func getServiceImport(client dynamic.NamespaceableResourceInterface, namespace, name string) *mcsv1a1.ServiceImport { + obj, err := client.Namespace(namespace).Get(context.TODO(), name, metav1.GetOptions{}) + Expect(err).To(Succeed()) + + return toServiceImport(obj) +} + func findEndpointSlices(client dynamic.ResourceInterface, namespace, name, clusterID string) []*discovery.EndpointSlice { list, err := client.List(context.TODO(), metav1.ListOptions{}) if resource.IsMissingNamespaceErr(err) { @@ -777,9 +785,10 @@ func (t *testDriver) awaitAggregatedServiceImport(sType mcsv1a1.ServiceImportTyp Namespace: test.RemoteNamespace, }, Spec: mcsv1a1.ServiceImportSpec{ - Type: sType, - Ports: []mcsv1a1.ServicePort{}, - SessionAffinity: corev1.ServiceAffinityNone, + Type: sType, + 
Ports: []mcsv1a1.ServicePort{}, + SessionAffinity: t.aggregatedSessionAffinity, + SessionAffinityConfig: t.aggregatedSessionAffinityConfig, }, } @@ -791,14 +800,6 @@ func (t *testDriver) awaitAggregatedServiceImport(sType mcsv1a1.ServiceImportTyp for _, c := range clusters { expServiceImport.Status.Clusters = append(expServiceImport.Status.Clusters, mcsv1a1.ClusterStatus{Cluster: c.clusterID}) - - if c.service.Spec.SessionAffinity != corev1.ServiceAffinityNone { - expServiceImport.Spec.SessionAffinity = c.service.Spec.SessionAffinity - } - - if c.service.Spec.SessionAffinityConfig != nil { - expServiceImport.Spec.SessionAffinityConfig = c.service.Spec.SessionAffinityConfig - } } } @@ -938,6 +939,13 @@ func toServiceExport(obj interface{}) *mcsv1a1.ServiceExport { return se } +func toServiceImport(obj interface{}) *mcsv1a1.ServiceImport { + si := &mcsv1a1.ServiceImport{} + Expect(scheme.Scheme.Convert(obj, si, nil)).To(Succeed()) + + return si +} + func (t *testDriver) awaitNonHeadlessServiceExported(clusters ...*cluster) { t.awaitServiceExported(mcsv1a1.ClusterSetIP, clusters...) } diff --git a/pkg/agent/controller/endpoint_slice.go b/pkg/agent/controller/endpoint_slice.go index 6ef1b35c..a2c40192 100644 --- a/pkg/agent/controller/endpoint_slice.go +++ b/pkg/agent/controller/endpoint_slice.go @@ -268,7 +268,7 @@ func (c *EndpointSliceController) checkForConflicts(_, name, namespace string) ( fmt.Sprintf("The service ports conflict between the constituent clusters %s. "+ "The service will expose the intersection of all the ports: %s", fmt.Sprintf("[%s]", strings.Join(clusterNames, ", ")), servicePortsToString(intersectedServicePorts)))) - } else if FindServiceExportStatusCondition(localServiceExport.Status.Conditions, mcsv1a1.ServiceExportConflict) != nil { + } else if c.serviceExportClient.hasCondition(name, namespace, mcsv1a1.ServiceExportConflict, PortConflictReason) { c.serviceExportClient.UpdateStatusConditions(ctx, name, namespace, newServiceExportCondition( mcsv1a1.ServiceExportConflict, corev1.ConditionFalse, PortConflictReason, "")) } diff --git a/pkg/agent/controller/reconciliation_test.go b/pkg/agent/controller/reconciliation_test.go index 5d30ae10..2ab140ad 100644 --- a/pkg/agent/controller/reconciliation_test.go +++ b/pkg/agent/controller/reconciliation_test.go @@ -35,7 +35,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/client-go/dynamic/fake" - "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/testing" mcsv1a1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" ) @@ -174,12 +173,7 @@ var _ = Describe("Reconciliation", func() { When("a remote aggregated ServiceImport is stale in the local datastore on startup", func() { It("should delete it from the local datastore on reconciliation", func() { - obj, err := t.cluster2.localServiceImportClient.Namespace(t.cluster1.service.Namespace).Get(context.TODO(), - t.cluster1.service.Name, metav1.GetOptions{}) - Expect(err).To(Succeed()) - - serviceImport := &mcsv1a1.ServiceImport{} - Expect(scheme.Scheme.Convert(obj, serviceImport, nil)).To(Succeed()) + serviceImport := getServiceImport(t.cluster2.localServiceImportClient, t.cluster1.service.Namespace, t.cluster1.service.Name) t.afterEach() t = newTestDiver() diff --git a/pkg/agent/controller/service_export_client.go b/pkg/agent/controller/service_export_client.go index ea939ec6..49c41d60 100644 --- a/pkg/agent/controller/service_export_client.go +++ b/pkg/agent/controller/service_export_client.go @@ -58,6 +58,10 @@ func (c 
*ServiceExportClient) UpdateStatusConditions(ctx context.Context, name, func (c *ServiceExportClient) tryUpdateStatusConditions(ctx context.Context, name, namespace string, canReplace bool, conditions ...mcsv1a1.ServiceExportCondition, ) { + if len(conditions) == 0 { + return + } + findStatusCondition := func(conditions []mcsv1a1.ServiceExportCondition, condType mcsv1a1.ServiceExportConditionType, ) *mcsv1a1.ServiceExportCondition { cond := FindServiceExportStatusCondition(conditions, condType) @@ -81,6 +85,7 @@ func (c *ServiceExportClient) tryUpdateStatusConditions(ctx context.Context, nam if prevCond == nil { if condition.Type == mcsv1a1.ServiceExportConflict && condition.Status == corev1.ConditionFalse { + // The caller intends to clear the Conflict condition so don't add it. continue } @@ -90,12 +95,14 @@ func (c *ServiceExportClient) tryUpdateStatusConditions(ctx context.Context, nam toUpdate.Status.Conditions = append(toUpdate.Status.Conditions, *condition) updated = true } else if condition.Type == mcsv1a1.ServiceExportConflict { - updated = updated || c.mergeConflictCondition(prevCond, condition) - if updated { + condUpdated := c.mergeConflictCondition(prevCond, condition) + if condUpdated { logger.V(log.DEBUG).Infof( "Update status condition for ServiceExport (%s/%s): Type: %q, Status: %q, Reason: %q, Message: %q", namespace, name, condition.Type, prevCond.Status, *prevCond.Reason, *prevCond.Message) } + + updated = updated || condUpdated } else if serviceExportConditionEqual(prevCond, condition) { logger.V(log.TRACE).Infof("Last ServiceExportCondition for (%s/%s) is equal - not updating status: %#v", namespace, name, prevCond) @@ -116,11 +123,11 @@ func (c *ServiceExportClient) mergeConflictCondition(to, from *mcsv1a1.ServiceEx var reasons, messages []string if ptr.Deref(to.Reason, "") != "" { - reasons = strings.Split(ptr.Deref(to.Reason, ""), ",") + reasons = strings.Split(*to.Reason, ",") } if ptr.Deref(to.Message, "") != "" { - messages = strings.Split(ptr.Deref(to.Message, ""), "\n") + messages = strings.Split(*to.Message, "\n") } index := goslices.Index(reasons, *from.Reason) @@ -196,6 +203,18 @@ func (c *ServiceExportClient) getLocalInstance(name, namespace string) *mcsv1a1. 
return obj.(*mcsv1a1.ServiceExport) } +//nolint:unparam // Ignore `condType` always receives `mcsv1a1.ServiceExportConflict` +func (c *ServiceExportClient) hasCondition(name, namespace string, condType mcsv1a1.ServiceExportConditionType, reason string) bool { + se := c.getLocalInstance(name, namespace) + if se == nil { + return false + } + + cond := FindServiceExportStatusCondition(se.Status.Conditions, condType) + + return cond != nil && strings.Contains(ptr.Deref(cond.Reason, ""), reason) +} + func serviceExportConditionEqual(c1, c2 *mcsv1a1.ServiceExportCondition) bool { return c1.Type == c2.Type && c1.Status == c2.Status && reflect.DeepEqual(c1.Reason, c2.Reason) && reflect.DeepEqual(c1.Message, c2.Message) diff --git a/pkg/agent/controller/service_import.go b/pkg/agent/controller/service_import.go index a53b8c13..b584a39a 100644 --- a/pkg/agent/controller/service_import.go +++ b/pkg/agent/controller/service_import.go @@ -21,6 +21,10 @@ package controller import ( "context" "fmt" + "math" + "reflect" + "strconv" + "strings" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -36,13 +40,17 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + k8slabels "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/validation" "k8s.io/client-go/dynamic" "k8s.io/client-go/tools/cache" "k8s.io/utils/set" mcsv1a1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" ) +const timestampAnnotationPrefix = "timestamp.submariner.io/" + //nolint:gocritic // (hugeParam) This function modifies syncerConf so we don't want to pass by pointer. func newServiceImportController(spec *AgentSpecification, syncerMetricNames AgentConfig, syncerConfig broker.SyncerConfig, brokerClient dynamic.Interface, brokerNamespace string, serviceExportClient *ServiceExportClient, @@ -97,7 +105,7 @@ func newServiceImportController(spec *AgentSpecification, syncerMetricNames Agen Federator: federate.NewCreateOrUpdateFederator(syncerConfig.LocalClient, syncerConfig.RestMapper, corev1.NamespaceAll, ""), ResourceType: &mcsv1a1.ServiceImport{}, Transform: controller.onRemoteServiceImport, - OnSuccessfulSync: controller.serviceImportMigrator.onSuccessfulSyncFromBroker, + OnSuccessfulSync: controller.onSuccessfulSyncFromBroker, Scheme: syncerConfig.Scheme, NamespaceInformer: syncerConfig.NamespaceInformer, SyncCounterOpts: &prometheus.GaugeOpts{ @@ -290,12 +298,25 @@ func (c *ServiceImportController) Distribute(ctx context.Context, obj runtime.Ob serviceName := serviceImportSourceName(localServiceImport) serviceNamespace := localServiceImport.Labels[constants.LabelSourceNamespace] + localTimestamp := strconv.FormatInt(int64(math.MaxInt64-1), 10) + + // As per the MCS spec, a conflict will be resolved by assigning precedence based on each ServiceExport's + // creationTimestamp, from oldest to newest. We don't have access to other cluster's ServiceExports so + // instead add our ServiceExport's creationTimestamp as an annotation on the aggregated ServiceImport. 
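+	// For example: "timestamp.submariner.io/cluster1": "1712345678000000000" (an illustrative UnixNano value).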
+	localServiceExport := c.serviceExportClient.getLocalInstance(serviceName, serviceNamespace)
+	if localServiceExport != nil {
+		localTimestamp = strconv.FormatInt(localServiceExport.CreationTimestamp.UTC().UnixNano(), 10)
+	}
+
+	timestampAnnotationKey := makeTimestampAnnotationKey(c.clusterID)
+
 	aggregate := &mcsv1a1.ServiceImport{
 		ObjectMeta: metav1.ObjectMeta{
 			Name: fmt.Sprintf("%s-%s", serviceName, serviceNamespace),
 			Annotations: map[string]string{
 				mcsv1a1.LabelServiceName:       serviceName,
 				constants.LabelSourceNamespace: serviceNamespace,
+				timestampAnnotationKey:         localTimestamp,
 			},
 		},
 		Spec: mcsv1a1.ServiceImportSpec{
@@ -313,12 +334,13 @@ func (c *ServiceImportController) Distribute(ctx context.Context, obj runtime.Object) error {
 		},
 	}
 
-	conflict := false
+	typeConflict := false
 
-	// Here we just create the aggregated ServiceImport on the broker. We don't merge the local service info until we've
-	// successfully synced our local EndpointSlice to the broker. This is mainly done b/c the aggregated port information
-	// is determined from the constituent clusters' EndpointSlices, thus each cluster must have a consistent view of all
-	// the EndpointSlices in order for the aggregated port information to be eventually consistent.
+	// Here we create the aggregated ServiceImport on the broker or update the existing instance with our local service
+	// info, but we don't add/merge our local service ports until we've successfully synced our local EndpointSlice to
+	// the broker. This is mainly done because the aggregated port information is determined from the constituent
+	// clusters' EndpointSlices, thus each cluster must have a consistent view of all the EndpointSlices in order for
+	// the aggregated port information to be eventually consistent.
 
 	result, err := util.CreateOrUpdate(ctx,
 		resource.ForDynamic(c.serviceImportAggregator.brokerServiceImportClient()),
@@ -327,27 +349,32 @@ func (c *ServiceImportController) Distribute(ctx context.Context, obj runtime.Object) error {
 			existing := c.converter.toServiceImport(obj)
 
 			if localServiceImport.Spec.Type != existing.Spec.Type {
-				conflict = true
+				typeConflict = true
 				conflictCondition := newServiceExportCondition(
 					mcsv1a1.ServiceExportConflict, corev1.ConditionTrue, TypeConflictReason,
-					fmt.Sprintf("The service type %q does not match the type (%q) of the existing service export",
+					fmt.Sprintf("The local service type (%q) does not match the type (%q) of the existing exported service",
 						localServiceImport.Spec.Type, existing.Spec.Type))
 				c.serviceExportClient.UpdateStatusConditions(ctx, serviceName, serviceNamespace, conflictCondition,
 					newServiceExportCondition(constants.ServiceExportReady, corev1.ConditionFalse, ExportFailedReason,
 						"Unable to export due to an irresolvable conflict"))
 			} else {
-				c.serviceExportClient.UpdateStatusConditions(ctx, serviceName, serviceNamespace, newServiceExportCondition(
-					mcsv1a1.ServiceExportConflict, corev1.ConditionFalse, TypeConflictReason, ""))
-
-				if existing.Spec.SessionAffinity == "" || existing.Spec.SessionAffinity == corev1.ServiceAffinityNone {
-					existing.Spec.SessionAffinity = localServiceImport.Spec.SessionAffinity
+				if c.serviceExportClient.hasCondition(serviceName, serviceNamespace, mcsv1a1.ServiceExportConflict, TypeConflictReason) {
+					c.serviceExportClient.UpdateStatusConditions(ctx, serviceName, serviceNamespace, newServiceExportCondition(
+						mcsv1a1.ServiceExportConflict, corev1.ConditionFalse, TypeConflictReason, ""))
 				}
 
-				if existing.Spec.SessionAffinityConfig == nil {
-					existing.Spec.SessionAffinityConfig = localServiceImport.Spec.SessionAffinityConfig
+				if existing.Annotations == nil {
+					existing.Annotations = map[string]string{}
 				}
 
+				existing.Annotations[timestampAnnotationKey] = localTimestamp
+
+				// Update the appropriate aggregated ServiceImport fields if we're the oldest exporting cluster.
+				_ = c.updateAggregatedServiceImport(existing, localServiceImport)
+
+				c.checkConflicts(ctx, existing, localServiceImport)
+
 				var added bool
 
 				existing.Status.Clusters, added = slices.AppendIfNotPresent(existing.Status.Clusters,
@@ -361,7 +388,7 @@ func (c *ServiceImportController) Distribute(ctx context.Context, obj runtime.Object) error {
 			return c.converter.toUnstructured(existing), nil
 		})
 
-	if err == nil && !conflict {
+	if err == nil && !typeConflict {
 		err = c.startEndpointsController(localServiceImport)
 	}
@@ -424,6 +451,101 @@ func (c *ServiceImportController) onRemoteServiceImport(obj runtime.Object, _ int, _ syncer.Operation) (runtime.Object, bool) {
 	return c.serviceImportMigrator.onRemoteServiceImport(serviceImport)
 }
 
+func (c *ServiceImportController) onSuccessfulSyncFromBroker(synced runtime.Object, op syncer.Operation) bool {
+	ctx := context.TODO()
+
+	retry := c.serviceImportMigrator.onSuccessfulSyncFromBroker(synced, op)
+
+	aggregatedServiceImport := synced.(*mcsv1a1.ServiceImport)
+
+	// Check for conflicts with the local ServiceImport.
+
+	siList := c.localSyncer.ListResourcesBySelector(k8slabels.SelectorFromSet(map[string]string{
+		mcsv1a1.LabelServiceName:        aggregatedServiceImport.Name,
+		constants.LabelSourceNamespace:  aggregatedServiceImport.Namespace,
+		constants.MCSLabelSourceCluster: c.clusterID,
+	}))
+
+	if len(siList) == 0 {
+		// Service not exported locally.
+		return retry
+	}
+
+	localServiceImport := siList[0].(*mcsv1a1.ServiceImport)
+
+	// This handles the case where the previously oldest exporting cluster has unexported its service. If we're now
+	// the oldest exporting cluster, update the appropriate aggregated ServiceImport fields to match those of our
+	// service.
+	if c.updateAggregatedServiceImport(aggregatedServiceImport, localServiceImport) {
+		err := c.serviceImportAggregator.update(ctx, aggregatedServiceImport.Name, aggregatedServiceImport.Namespace,
+			func(aggregated *mcsv1a1.ServiceImport) error {
+				aggregated.Spec.SessionAffinity = localServiceImport.Spec.SessionAffinity
+				aggregated.Spec.SessionAffinityConfig = localServiceImport.Spec.SessionAffinityConfig
+
+				return nil
+			})
+		if err != nil {
+			logger.Errorf(err, "Error updating aggregated ServiceImport on broker sync")
+
+			return true
+		}
+	}
+
+	c.checkConflicts(ctx, aggregatedServiceImport, localServiceImport)
+
+	return retry
+}
+
+func (c *ServiceImportController) checkConflicts(ctx context.Context, aggregated, local *mcsv1a1.ServiceImport) {
+	var conditions []mcsv1a1.ServiceExportCondition
+
+	serviceName := local.Labels[mcsv1a1.LabelServiceName]
+	serviceNamespace := local.Labels[constants.LabelSourceNamespace]
+
+	precedentCluster := findClusterWithOldestTimestamp(aggregated.Annotations)
+
+	if local.Spec.SessionAffinity != aggregated.Spec.SessionAffinity {
+		conditions = append(conditions, newServiceExportCondition(mcsv1a1.ServiceExportConflict, corev1.ConditionTrue,
+			SessionAffinityConflictReason,
+			fmt.Sprintf("The local service SessionAffinity %q conflicts with other constituent clusters. "+
+				"Using SessionAffinity %q from the oldest exported service in cluster %q.",
+				local.Spec.SessionAffinity, aggregated.Spec.SessionAffinity, precedentCluster)))
+	} else if c.serviceExportClient.hasCondition(serviceName, serviceNamespace, mcsv1a1.ServiceExportConflict,
+		SessionAffinityConflictReason) {
+		conditions = append(conditions, newServiceExportCondition(
+			mcsv1a1.ServiceExportConflict, corev1.ConditionFalse, SessionAffinityConflictReason, ""))
+	}
+
+	if !reflect.DeepEqual(local.Spec.SessionAffinityConfig, aggregated.Spec.SessionAffinityConfig) {
+		conditions = append(conditions, newServiceExportCondition(mcsv1a1.ServiceExportConflict, corev1.ConditionTrue,
+			SessionAffinityConfigConflictReason,
+			fmt.Sprintf("The local service SessionAffinityConfig %q conflicts with other constituent clusters. "+
+				"Using SessionAffinityConfig %q from the oldest exported service in cluster %q.",
+				toSessionAffinityConfigString(local.Spec.SessionAffinityConfig),
+				toSessionAffinityConfigString(aggregated.Spec.SessionAffinityConfig), precedentCluster)))
+	} else if c.serviceExportClient.hasCondition(serviceName, serviceNamespace, mcsv1a1.ServiceExportConflict,
+		SessionAffinityConfigConflictReason) {
+		conditions = append(conditions, newServiceExportCondition(
+			mcsv1a1.ServiceExportConflict, corev1.ConditionFalse, SessionAffinityConfigConflictReason, ""))
+	}
+
+	c.serviceExportClient.UpdateStatusConditions(ctx, serviceName, serviceNamespace, conditions...)
+}
+
+func (c *ServiceImportController) updateAggregatedServiceImport(aggregated, local *mcsv1a1.ServiceImport) bool {
+	oldestCluster := findClusterWithOldestTimestamp(aggregated.Annotations)
+	if oldestCluster != sanitizeClusterID(c.clusterID) {
+		return false
+	}
+
+	origSpec := aggregated.Spec
+
+	aggregated.Spec.SessionAffinity = local.Spec.SessionAffinity
+	aggregated.Spec.SessionAffinityConfig = local.Spec.SessionAffinityConfig
+
+	return !reflect.DeepEqual(origSpec, aggregated.Spec)
+}
+
 func (c *ServiceImportController) localServiceImportLister(transform func(si *mcsv1a1.ServiceImport) runtime.Object) []runtime.Object {
 	siList := c.localSyncer.ListResources()
@@ -442,3 +564,48 @@ func (c *ServiceImportController) localServiceImportLister(transform func(si *mcsv1a1.ServiceImport) runtime.Object) []runtime.Object {
 
 	return retList
 }
+
+func findClusterWithOldestTimestamp(from map[string]string) string {
+	oldest := int64(math.MaxInt64)
+	foundCluster := ""
+
+	for k, v := range from {
+		cluster, found := strings.CutPrefix(k, timestampAnnotationPrefix)
+		if !found {
+			continue
+		}
+
+		t, err := strconv.ParseInt(v, 10, 64)
+		if err != nil {
+			logger.Warningf("Invalid timestamp annotation value %q for cluster %q", v, cluster)
+			continue
+		}
+
+		if t < oldest || (t == oldest && cluster < foundCluster) {
+			foundCluster = cluster
+			oldest = t
+		}
+	}
+
+	return foundCluster
+}
+
+func toSessionAffinityConfigString(c *corev1.SessionAffinityConfig) string {
+	if c != nil && c.ClientIP != nil && c.ClientIP.TimeoutSeconds != nil {
+		return fmt.Sprintf("ClientIP TimeoutSeconds: %d", *c.ClientIP.TimeoutSeconds)
+	}
+
+	return "none"
+}
+
+func makeTimestampAnnotationKey(clusterID string) string {
+	return timestampAnnotationPrefix + sanitizeClusterID(clusterID)
+}
+
+func sanitizeClusterID(clusterID string) string {
+	if len(clusterID) > validation.DNS1123LabelMaxLength {
+		clusterID = clusterID[:validation.DNS1123LabelMaxLength]
+	}
+
+	return resource.EnsureValidName(clusterID)
+}
diff --git a/pkg/agent/controller/service_import_aggregator.go b/pkg/agent/controller/service_import_aggregator.go
index 4202ae61..ef1c446f 100644
--- a/pkg/agent/controller/service_import_aggregator.go
+++ b/pkg/agent/controller/service_import_aggregator.go
@@ -101,6 +101,8 @@ func (a *ServiceImportAggregator) updateOnDelete(ctx context.Context, name, namespace string) error {
 		logger.V(log.DEBUG).Infof("Removed cluster name %q from aggregated ServiceImport %q. New status: %#v",
 			a.clusterID, existing.Name, existing.Status.Clusters)
 
+		delete(existing.Annotations, makeTimestampAnnotationKey(a.clusterID))
+
 		return a.setServicePorts(ctx, existing)
 	})
 }
diff --git a/pkg/agent/controller/service_import_migration_test.go b/pkg/agent/controller/service_import_migration_test.go
index 6e221da7..5400702a 100644
--- a/pkg/agent/controller/service_import_migration_test.go
+++ b/pkg/agent/controller/service_import_migration_test.go
@@ -32,7 +32,6 @@ import (
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/dynamic"
-	"k8s.io/client-go/kubernetes/scheme"
 	mcsv1a1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"
 )
@@ -171,12 +170,8 @@ func testServiceImportMigration() {
 
 			// Get the aggregated ServiceImport on the broker.
 
-			obj, err := t.brokerServiceImportClient.Namespace(test.RemoteNamespace).Get(context.Background(),
-				fmt.Sprintf("%s-%s", t.cluster1.service.Name, t.cluster1.service.Namespace), metav1.GetOptions{})
-			Expect(err).To(Succeed())
-
-			aggregatedServiceImport := &mcsv1a1.ServiceImport{}
-			Expect(scheme.Scheme.Convert(obj, aggregatedServiceImport, nil)).To(Succeed())
+			aggregatedServiceImport := getServiceImport(t.brokerServiceImportClient, test.RemoteNamespace,
+				fmt.Sprintf("%s-%s", t.cluster1.service.Name, t.cluster1.service.Namespace))
 
 			By(fmt.Sprintf("Upgrade the first remote cluster %q", remoteServiceImport1.Status.Clusters[0].Cluster))
diff --git a/pkg/agent/controller/types.go b/pkg/agent/controller/types.go
index f81c02ae..8254de4a 100644
--- a/pkg/agent/controller/types.go
+++ b/pkg/agent/controller/types.go
@@ -36,9 +36,11 @@ import (
 )
 
 const (
-	ExportFailedReason = "ExportFailed"
-	TypeConflictReason = "ConflictingType"
-	PortConflictReason = "ConflictingPorts"
+	ExportFailedReason                  = "ExportFailed"
+	TypeConflictReason                  = "ConflictingType"
+	PortConflictReason                  = "ConflictingPorts"
+	SessionAffinityConflictReason       = "ConflictingSessionAffinity"
+	SessionAffinityConfigConflictReason = "ConflictingSessionAffinityConfig"
 )
 
 type EndpointSliceListerFn func(selector k8slabels.Selector) []runtime.Object