From bca6ee4940d72295324bac9f3ca13055ece32909 Mon Sep 17 00:00:00 2001 From: Gaurav Ghildiyal Date: Fri, 7 Apr 2023 15:21:51 -0700 Subject: [PATCH] Define the interface for NEG dual stack migration handler --- pkg/neg/syncers/dualstack/migrator.go | 120 +++++++++++++++++++++ pkg/neg/syncers/dualstack/migrator_test.go | 86 +++++++++++++++ pkg/neg/syncers/transaction.go | 39 +++---- pkg/neg/syncers/transaction_test.go | 9 +- 4 files changed, 233 insertions(+), 21 deletions(-) create mode 100644 pkg/neg/syncers/dualstack/migrator.go create mode 100644 pkg/neg/syncers/dualstack/migrator_test.go diff --git a/pkg/neg/syncers/dualstack/migrator.go b/pkg/neg/syncers/dualstack/migrator.go new file mode 100644 index 0000000000..3bcba7bf1c --- /dev/null +++ b/pkg/neg/syncers/dualstack/migrator.go @@ -0,0 +1,120 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dualstack + +import ( + "time" + + "k8s.io/ingress-gce/pkg/neg/types" +) + +const ( + defaultMigrationWaitDuration = 1 * time.Minute +) + +// Migrator exposes functions to control the migration of single-stack +// NEG endpoints to dual-stack NEG endpoints (and vice versa) +// +// # Single-stack vs Dual-stack +// +// - A NEG endpoint is said to be single-stack if it just has an IPv4 or IPv6 +// address (but not both.) +// +// - A NEG endpoint is said to be dual-stack if it has both IPv4 and IPv6 +// address. +// +// # Migration endpoint +// +// An endpoint is said to be a migration-endpoint if its current state is +// single-stack but desired state is dual-stack (and vice versa.) +// +// TODO(gauravkghildiyal): Add details about the heuristics as we go on +// implementing. +type Migrator struct { + // Setting this to false will make all exported functions a no-op. + enableDualStack bool +} + +func NewMigrator(enableDualStack bool) *Migrator { + return &Migrator{enableDualStack: enableDualStack} +} + +// Filter will modify the `addEndpoints` and `removeEndpoints` in TWO DISTINCT +// ways: +// 1. Remove all migration-endpoints, irrespective of whether the migrator is +// paused or not. +// 2. If the migrator is not currently paused, it will also start the +// detachment of a subset of migration-endpoints from a single zone. +// +// The returned string represents the zone for which detachment was started. An +// empty return value signifies that detachment was not started (which is the +// case when there were no migration-endpoints to begin with, or the migrator +// was paused.) +// +// Refer the comment on [Migrator] for further details and +// terminologies. +func (d *Migrator) Filter(addEndpoints, removeEndpoints, committedEndpoints map[string]types.NetworkEndpointSet) string { + if !d.enableDualStack { + return "" + } + + _, migrationEndpointsInRemoveSet := findAndFilterMigrationEndpoints(addEndpoints, removeEndpoints) + + // TODO(gauravkghildiyal): Implement rate limited migration-detachment. + for zone, endpointSet := range migrationEndpointsInRemoveSet { + if endpointSet.Len() != 0 { + removeEndpoints[zone] = removeEndpoints[zone].Union(endpointSet) + return zone + } + } + + return "" +} + +// Pause will prevent any subsequent Filter() invocations from starting +// detachment of migration-endpoints. Pause should be invoked before starting +// any NEG-endpoint detach operations that include migration-endpoints. +// +// Invoking Pause on a migrator which is already paused will be a no-op. +// +// Pause is usually paired with a Continue() invocation once the NEG-endpoint +// detach operation completes. +func (d *Migrator) Pause() { + if !d.enableDualStack { + return + } +} + +// Continue will unpause the migration. It expects an error as input which +// specifies the result of the NEG-endpoint detach operation. Depending on +// whether the detach operation passed or failed, the effect of unpause could be +// delayed: +// - If the NEG detach operation failed, the migration will be unpaused +// immediately before Continue returns. This would allow any resyncs to +// reattempt the migration. The migrator itself doesn't trigger any sync in +// this case. +// - If the NEG detach operation succeeded, a migrationWaitDuration timer will +// be started, which upon completion will unpause the migration and also +// trigger another sync. Continue will not keep the caller blocked for the +// completion of the timer. If Continue is invoked multiple times, only the +// first continue will trigger a resync. +func (d *Migrator) Continue(err error) { + if !d.enableDualStack { + return + } +} + diff --git a/pkg/neg/syncers/dualstack/migrator_test.go b/pkg/neg/syncers/dualstack/migrator_test.go new file mode 100644 index 0000000000..5f92551dcc --- /dev/null +++ b/pkg/neg/syncers/dualstack/migrator_test.go @@ -0,0 +1,86 @@ +package dualstack + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + "k8s.io/ingress-gce/pkg/neg/types" +) + +func TestFilter(t *testing.T) { + testCases := []struct { + desc string + migrator *Migrator + addEndpoints map[string]types.NetworkEndpointSet + removeEndpoints map[string]types.NetworkEndpointSet + committedEndpoints map[string]types.NetworkEndpointSet + wantAddEndpoints map[string]types.NetworkEndpointSet + wantRemoveEndpoints map[string]types.NetworkEndpointSet + wantMigrationZone bool + }{ + { + desc: "migrator should do nothing if enableDualStack is false", + migrator: &Migrator{enableDualStack: false}, + addEndpoints: map[string]types.NetworkEndpointSet{ + "zone1": types.NewNetworkEndpointSet([]types.NetworkEndpoint{ + {IP: "a", IPv6: "A"}, // migrating + {IP: "b"}, + }...), + }, + removeEndpoints: map[string]types.NetworkEndpointSet{ + "zone1": types.NewNetworkEndpointSet([]types.NetworkEndpoint{ + {IP: "a"}, // migrating + {IP: "c", IPv6: "C"}, + }...), + }, + committedEndpoints: map[string]types.NetworkEndpointSet{ + "zone1": types.NewNetworkEndpointSet([]types.NetworkEndpoint{ + {IPv6: "D"}, + }...), + }, + wantAddEndpoints: map[string]types.NetworkEndpointSet{ + "zone1": types.NewNetworkEndpointSet([]types.NetworkEndpoint{ + {IP: "a", IPv6: "A"}, // migrating + {IP: "b"}, + }...), + }, + wantRemoveEndpoints: map[string]types.NetworkEndpointSet{ + "zone1": types.NewNetworkEndpointSet([]types.NetworkEndpoint{ + {IP: "a"}, // migrating + {IP: "c", IPv6: "C"}, + }...), + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + gotAddEndpoints := cloneZoneNetworkEndpointsMap(tc.addEndpoints) + gotRemoveEndpoints := cloneZoneNetworkEndpointsMap(tc.removeEndpoints) + gotCommittedEndpoints := cloneZoneNetworkEndpointsMap(tc.committedEndpoints) + gotMigrationZone := tc.migrator.Filter(gotAddEndpoints, gotRemoveEndpoints, gotCommittedEndpoints) + + if tc.wantMigrationZone && gotMigrationZone == "" { + t.Errorf("Filter() returned empty migrationZone; want non empty migrationZone") + } + + if diff := cmp.Diff(tc.wantAddEndpoints, gotAddEndpoints); diff != "" { + t.Errorf("Filter() returned unexpected diff in addEndpoints (-want +got):\n%s", diff) + } + if diff := cmp.Diff(tc.wantRemoveEndpoints, gotRemoveEndpoints); diff != "" { + t.Errorf("Filter() returned unexpected diff in removeEndpoints (-want +got):\n%s", diff) + } + if diff := cmp.Diff(tc.committedEndpoints, gotCommittedEndpoints); diff != "" { + t.Errorf("Filter() returned unexpected diff in committedEndpoints; want no diff; (-want +got):\n%s", diff) + } + }) + } +} + +func cloneZoneNetworkEndpointsMap(m map[string]types.NetworkEndpointSet) map[string]types.NetworkEndpointSet { + clone := make(map[string]types.NetworkEndpointSet) + for zone, endpointSet := range m { + clone[zone] = types.NewNetworkEndpointSet(endpointSet.List()...) + } + return clone +} diff --git a/pkg/neg/syncers/transaction.go b/pkg/neg/syncers/transaction.go index 9f5c8f0572..837f6b2d74 100644 --- a/pkg/neg/syncers/transaction.go +++ b/pkg/neg/syncers/transaction.go @@ -44,6 +44,7 @@ import ( "k8s.io/ingress-gce/pkg/composite" "k8s.io/ingress-gce/pkg/neg/metrics" "k8s.io/ingress-gce/pkg/neg/readiness" + "k8s.io/ingress-gce/pkg/neg/syncers/dualstack" "k8s.io/ingress-gce/pkg/neg/syncers/labels" negtypes "k8s.io/ingress-gce/pkg/neg/types" svcnegclient "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned" @@ -113,9 +114,7 @@ type transactionSyncer struct { // podLabelPropagationConfig configures the pod label to be propagated to NEG endpoints podLabelPropagationConfig labels.PodLabelPropagationConfig - // enableDualStackNEG indicates if IPv6 endpoints should be considered while - // reconciling NEGs. - enableDualStackNEG bool + dsMigrator *dualstack.Migrator } func NewTransactionSyncer( @@ -163,7 +162,7 @@ func NewTransactionSyncer( logger: logger, enableDegradedMode: flags.F.EnableDegradedMode, podLabelPropagationConfig: lpConfig, - enableDualStackNEG: enableDualStackNEG, + dsMigrator: dualstack.NewMigrator(enableDualStackNEG), } // Syncer implements life cycle logic syncer := newSyncer(negSyncerKey, serviceLister, recorder, ts, logger) @@ -287,17 +286,10 @@ func (s *transactionSyncer) syncInternalImpl() error { // Calculate Pods that are already in the NEG _, committedEndpoints := calculateNetworkEndpointDifference(addEndpoints, targetMap) - if s.enableDualStackNEG { - // Identification of migration endpoints should happen before we filter out - // the transaction endpoints. Not doing so could result in an attempt to - // attach an endpoint which is still undergoing detachment-due-to-migration. - _, migratingEndpointsInRemoveSet := findAndFilterMigrationEndpoints(addEndpoints, removeEndpoints) - - // TODO(gauravkghildiyal): Implement rate limited migration-detachment. - for zone, endpointSet := range migratingEndpointsInRemoveSet { - removeEndpoints[zone] = removeEndpoints[zone].Union(endpointSet) - } - } + // Filtering of migration endpoints should happen before we filter out + // the transaction endpoints. Not doing so could result in an attempt to + // attach an endpoint which is still undergoing detachment-due-to-migration. + migrationZone := s.dsMigrator.Filter(addEndpoints, removeEndpoints, committedEndpoints) // Filter out the endpoints with existing transaction // This mostly happens when transaction entry require reconciliation but the transaction is still progress @@ -325,7 +317,7 @@ func (s *transactionSyncer) syncInternalImpl() error { s.logEndpoints(addEndpoints, "adding endpoint") s.logEndpoints(removeEndpoints, "removing endpoint") - return s.syncNetworkEndpoints(addEndpoints, removeEndpoints, endpointPodLabelMap) + return s.syncNetworkEndpoints(addEndpoints, removeEndpoints, endpointPodLabelMap, migrationZone) } func (s *transactionSyncer) getEndpointsCalculation( @@ -435,7 +427,7 @@ func (s *transactionSyncer) ValidateEndpointBatch(err error, operation transacti } // syncNetworkEndpoints spins off go routines to execute NEG operations -func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints map[string]negtypes.NetworkEndpointSet, endpointPodLabelMap labels.EndpointPodLabelMap) error { +func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints map[string]negtypes.NetworkEndpointSet, endpointPodLabelMap labels.EndpointPodLabelMap, migrationZone string) error { syncFunc := func(endpointMap map[string]negtypes.NetworkEndpointSet, operation transactionOp) error { for zone, endpointSet := range endpointMap { zone := zone @@ -463,7 +455,12 @@ func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints m go s.attachNetworkEndpoints(zone, batch) } if operation == detachOp { - go s.detachNetworkEndpoints(zone, batch) + if zone == migrationZone { + // Prevent any further migration-detachments from starting while one + // is already in progress. + s.dsMigrator.Pause() + } + go s.detachNetworkEndpoints(zone, batch, zone == migrationZone) } } return nil @@ -493,6 +490,12 @@ func (s *transactionSyncer) detachNetworkEndpoints(zone string, networkEndpointM s.logger.V(2).Info("Detaching endpoints from NEG.", "countOfEndpointsBeingDetached", len(networkEndpointMap), "negSyncerKey", s.NegSyncerKey.String(), "zone", zone) err := s.operationInternal(detachOp, zone, networkEndpointMap) + if hasMigrationDetachments { + // Unpause the migration since the ongoing migration-detachments have + // concluded. + s.dsMigrator.Continue(err) + } + // WARNING: commitTransaction must be called at last for analyzing the operation result s.commitTransaction(err, networkEndpointMap) } diff --git a/pkg/neg/syncers/transaction_test.go b/pkg/neg/syncers/transaction_test.go index ef87b251fb..18ed4a731d 100644 --- a/pkg/neg/syncers/transaction_test.go +++ b/pkg/neg/syncers/transaction_test.go @@ -204,7 +204,10 @@ func TestTransactionSyncNetworkEndpoints(t *testing.T) { } for _, tc := range testCases { - err := transactionSyncer.syncNetworkEndpoints(tc.addEndpoints, tc.removeEndpoints, labels.EndpointPodLabelMap{}) + // TODO(gauravkghildiyal): When the DualStack Migrator is fully + // implemented, check if we need to cover scenarios where `migrationZone` + // is not empty. + err := transactionSyncer.syncNetworkEndpoints(tc.addEndpoints, tc.removeEndpoints, labels.EndpointPodLabelMap{}, "") if err != nil { t.Errorf("For case %q, syncNetworkEndpoints() got %v, want nil", tc.desc, err) } @@ -380,7 +383,7 @@ func TestSyncNetworkEndpointLabel(t *testing.T) { if err := transactionSyncer.ensureNetworkEndpointGroups(); err != nil { t.Errorf("Expect error == nil, but got %v", err) } - err := transactionSyncer.syncNetworkEndpoints(tc.addEndpoints, map[string]negtypes.NetworkEndpointSet{}, tc.endpointPodLabelMap) + err := transactionSyncer.syncNetworkEndpoints(tc.addEndpoints, map[string]negtypes.NetworkEndpointSet{}, tc.endpointPodLabelMap, "") if err != nil { t.Errorf("For case %q, syncNetworkEndpoints() got %v, want nil", tc.desc, err) } @@ -1912,7 +1915,7 @@ func TestGetEndpointPodLabelMap(t *testing.T) { func newL4ILBTestTransactionSyncer(fakeGCE negtypes.NetworkEndpointGroupCloud, mode negtypes.EndpointsCalculatorMode) (negtypes.NegSyncer, *transactionSyncer) { negsyncer, ts := newTestTransactionSyncer(fakeGCE, negtypes.VmIpEndpointType, false) - ts.endpointsCalculator = GetEndpointsCalculator(ts.nodeLister, ts.podLister, ts.zoneGetter, ts.NegSyncerKey, mode, klog.TODO(), ts.enableDualStackNEG) + ts.endpointsCalculator = GetEndpointsCalculator(ts.nodeLister, ts.podLister, ts.zoneGetter, ts.NegSyncerKey, mode, klog.TODO(), false) return negsyncer, ts }