From af1ce523fe4ec802e768976bcc00176674688799 Mon Sep 17 00:00:00 2001 From: Gaurav Ghildiyal Date: Thu, 30 Mar 2023 13:20:45 -0700 Subject: [PATCH 1/3] refactor: keep attach/detch functions synchronous and move async behaviour to caller --- pkg/neg/syncers/transaction.go | 31 +++++++++++++++++++------------ 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/pkg/neg/syncers/transaction.go b/pkg/neg/syncers/transaction.go index 6a2ef955b8..8e0567aa0c 100644 --- a/pkg/neg/syncers/transaction.go +++ b/pkg/neg/syncers/transaction.go @@ -407,6 +407,7 @@ func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints m syncFunc := func(endpointMap map[string]negtypes.NetworkEndpointSet, operation transactionOp) error { for zone, endpointSet := range endpointMap { + zone := zone if endpointSet.Len() == 0 { s.logger.V(2).Info("0 endpoints in the endpoint list. Skipping operation", "operation", attachOp, "negSyncerKey", s.NegSyncerKey.String(), "zone", zone) continue @@ -428,10 +429,18 @@ func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints m } if operation == attachOp { - s.attachNetworkEndpoints(zone, batch, &wg) + wg.Add(1) + go func() { + defer wg.Done() + _ = s.attachNetworkEndpoints(zone, batch) + }() } if operation == detachOp { - s.detachNetworkEndpoints(zone, batch, &wg) + wg.Add(1) + go func() { + defer wg.Done() + _ = s.detachNetworkEndpoints(zone, batch) + }() } } return nil @@ -471,25 +480,22 @@ func (s *transactionSyncer) collectSyncResult(wg *sync.WaitGroup) { s.syncCollector.UpdateSyncer(s.NegSyncerKey, syncResult) } -// attachNetworkEndpoints creates go routine to run operations for attaching network endpoints -func (s *transactionSyncer) attachNetworkEndpoints(zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint, wg *sync.WaitGroup) { +// attachNetworkEndpoints runs operation for attaching network endpoints. 
+func (s *transactionSyncer) attachNetworkEndpoints(zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint) error { s.logger.V(2).Info("Attaching endpoints to NEG.", "countOfEndpointsBeingAttached", len(networkEndpointMap), "negSyncerKey", s.NegSyncerKey.String(), "zone", zone) - wg.Add(1) - go s.operationInternal(attachOp, zone, networkEndpointMap, wg) + return s.operationInternal(attachOp, zone, networkEndpointMap) } -// detachNetworkEndpoints creates go routine to run operations for detaching network endpoints -func (s *transactionSyncer) detachNetworkEndpoints(zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint, wg *sync.WaitGroup) { +// detachNetworkEndpoints runs operation for detaching network endpoints. +func (s *transactionSyncer) detachNetworkEndpoints(zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint) error { s.logger.V(2).Info("Detaching endpoints from NEG.", "countOfEndpointsBeingDetached", len(networkEndpointMap), "negSyncerKey", s.NegSyncerKey.String(), "zone", zone) - wg.Add(1) - go s.operationInternal(detachOp, zone, networkEndpointMap, wg) + return s.operationInternal(detachOp, zone, networkEndpointMap) } // operationInternal executes NEG API call and commits the transactions // It will record events when operations are completed // If error occurs or any transaction entry requires reconciliation, it will trigger resync -func (s *transactionSyncer) operationInternal(operation transactionOp, zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint, wg *sync.WaitGroup) { - defer wg.Done() +func (s *transactionSyncer) operationInternal(operation transactionOp, zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint) error { var err error start := time.Now() networkEndpoints := []*composite.NetworkEndpoint{} @@ -519,6 +525,7 @@ func (s *transactionSyncer) operationInternal(operation 
transactionOp, zone stri // WARNING: commitTransaction must be called at last for analyzing the operation result s.commitTransaction(err, networkEndpointMap) metrics.PublishNegOperationMetrics(operation.String(), string(s.NegSyncerKey.NegType), string(s.NegSyncerKey.GetAPIVersion()), err, len(networkEndpointMap), start) + return err } func (s *transactionSyncer) recordEvent(eventType, reason, eventDesc string) { From 81ec30709b70269f2378257d68a17d2a0823d561 Mon Sep 17 00:00:00 2001 From: Gaurav Ghildiyal Date: Thu, 30 Mar 2023 11:44:22 -0700 Subject: [PATCH 2/3] Define the interface for the NEG dual stack migration handler. --- pkg/neg/syncers/dual_stack_migrator.go | 98 ++++++++++++++++++++++++++ pkg/neg/syncers/transaction.go | 42 ++++++++--- pkg/neg/syncers/transaction_test.go | 2 +- 3 files changed, 131 insertions(+), 11 deletions(-) create mode 100644 pkg/neg/syncers/dual_stack_migrator.go diff --git a/pkg/neg/syncers/dual_stack_migrator.go b/pkg/neg/syncers/dual_stack_migrator.go new file mode 100644 index 0000000000..42e25c1af7 --- /dev/null +++ b/pkg/neg/syncers/dual_stack_migrator.go @@ -0,0 +1,98 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package syncers + +import "k8s.io/ingress-gce/pkg/neg/types" + +// DualStackMigrator exposes functions to control the migration of single-stack +// NEG endpoints to dual-stack NEG endpoints (and vice versa) +// +// # Single-stack vs Dual-stack +// +// - A NEG endpoint is said to be single-stack if it just has an IPv4 or IPv6 +// address (but not both.) +// +// - A NEG endpoint is said to be dual-stack if it has both IPv4 and IPv6 +// address. +// +// # Migration endpoint +// +// An endpoint is said to be a migration-endpoint if its current state is +// single-stack but desired state is dual-stack (and vice versa.) +// +// TODO(gauravkghildiyal): Add details about the heuristics as we go on +// implementing. +type DualStackMigrator struct { + // Setting this to false will make all exported functions a no-op. + enableDualStackNEG bool +} + +// Filter will modify the `addEndpoints` and `removeEndpoints` in TWO DISTINCT +// ways: +// 1. Remove all migration-endpoints, irrespective of whether the migrator is +// paused or not. +// 2. If the migrator is not currently paused, it will also start the +// detachment of a subset of migration-endpoints. +// +// Refer the comment on [DualStackMigrator] for further details and +// terminologies. +func (d *DualStackMigrator) Filter(addEndpoints, removeEndpoints, committedEndpoints map[string]types.NetworkEndpointSet) { + if !d.enableDualStackNEG { + return + } +} + +// Pause will prevent any subsequent Filter() invocations from starting +// detachment of migration-endpoints. Pause should be invoked before starting +// any NEG-endpoint detach operations that include migration-endpoints. +// +// Invoking Pause on a migrator which is already paused will be a no-op. +// +// Pause is usually paired with a Continue() invocation once the NEG-endpoint +// detach operation completes. +func (d *DualStackMigrator) Pause() { + if !d.enableDualStackNEG { + return + } +} + +// IsPaused returns whether the migrator is paused or not. 
+func (d *DualStackMigrator) IsPaused() bool { + if !d.enableDualStackNEG { + return true + } + // TODO(gauravkghildiyal): Returns true until implemented. + return true +} + +// Continue will unpause the migration. It expects an error as input which +// specifies the result of the NEG-endpoint detach operation. Depending on +// whether the detach operation passed or failed, the effect of unpause could be +// delayed: +// - If the NEG detach operation succeeded, a 1 minute timer will be started, +// which upon completion will unpause the migration and also trigger another +// sync. Continue will not keep the caller blocked for the completion of the +// timer. +// - If the NEG detach operation failed, the migration will be unpaused +// immediately before Continue returns. This would allow any resyncs to +// reattempt the migration. The migrator itself doesn't trigger any sync in +// this case. +func (d *DualStackMigrator) Continue(err error) { + if !d.enableDualStackNEG { + return + } +} diff --git a/pkg/neg/syncers/transaction.go b/pkg/neg/syncers/transaction.go index 8e0567aa0c..79c7ccd6f9 100644 --- a/pkg/neg/syncers/transaction.go +++ b/pkg/neg/syncers/transaction.go @@ -113,9 +113,7 @@ type transactionSyncer struct { // podLabelPropagationConfig configures the pod label to be propagated to NEG endpoints podLabelPropagationConfig labels.PodLabelPropagationConfig - // enableDualStackNEG indicates if IPv6 endpoints should be considered while - // reconciling NEGs. 
- enableDualStackNEG bool + dsMigrator *DualStackMigrator } func NewTransactionSyncer( @@ -163,7 +161,7 @@ func NewTransactionSyncer( logger: logger, enableDegradedMode: flags.F.EnableDegradedMode, podLabelPropagationConfig: lpConfig, - enableDualStackNEG: enableDualStackNEG, + dsMigrator: &DualStackMigrator{enableDualStackNEG: enableDualStackNEG}, } // Syncer implements life cycle logic syncer := newSyncer(negSyncerKey, serviceLister, recorder, ts, logger) @@ -294,6 +292,12 @@ func (s *transactionSyncer) syncInternalImpl() error { addEndpoints, removeEndpoints := calculateNetworkEndpointDifference(targetMap, currentMap) // Calculate Pods that are already in the NEG _, committedEndpoints := calculateNetworkEndpointDifference(addEndpoints, targetMap) + + // Filtering of migration endpoints should happen before we filter out + // the transaction endpoints. Not doing so could result in an attempt to + // attach an endpoint which is still undergoing detachment-due-to-migration. + s.dsMigrator.Filter(addEndpoints, removeEndpoints, committedEndpoints) + // Filter out the endpoints with existing transaction // This mostly happens when transaction entry require reconciliation but the transaction is still progress // e.g. endpoint A is in the process of adding to NEG N, and the new desire state is not to have A in N. @@ -435,11 +439,19 @@ func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints m _ = s.attachNetworkEndpoints(zone, batch) }() } + if operation == detachOp { wg.Add(1) + + hasMigrationDetachments := false + if !s.dsMigrator.IsPaused() { + s.dsMigrator.Pause() + hasMigrationDetachments = true + } + go func() { defer wg.Done() - _ = s.detachNetworkEndpoints(zone, batch) + _ = s.detachNetworkEndpoints(zone, batch, hasMigrationDetachments) }() } } @@ -483,13 +495,25 @@ func (s *transactionSyncer) collectSyncResult(wg *sync.WaitGroup) { // attachNetworkEndpoints runs operation for attaching network endpoints. 
func (s *transactionSyncer) attachNetworkEndpoints(zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint) error { s.logger.V(2).Info("Attaching endpoints to NEG.", "countOfEndpointsBeingAttached", len(networkEndpointMap), "negSyncerKey", s.NegSyncerKey.String(), "zone", zone) - return s.operationInternal(attachOp, zone, networkEndpointMap) + err := s.operationInternal(attachOp, zone, networkEndpointMap) + + // WARNING: commitTransaction must be called at last for analyzing the operation result + s.commitTransaction(err, networkEndpointMap) + return err } // detachNetworkEndpoints runs operation for detaching network endpoints. -func (s *transactionSyncer) detachNetworkEndpoints(zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint) error { +func (s *transactionSyncer) detachNetworkEndpoints(zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint, hasMigrationDetachments bool) error { s.logger.V(2).Info("Detaching endpoints from NEG.", "countOfEndpointsBeingDetached", len(networkEndpointMap), "negSyncerKey", s.NegSyncerKey.String(), "zone", zone) - return s.operationInternal(detachOp, zone, networkEndpointMap) + err := s.operationInternal(detachOp, zone, networkEndpointMap) + + if hasMigrationDetachments { + s.dsMigrator.Continue(err) + } + + // WARNING: commitTransaction must be called at last for analyzing the operation result + s.commitTransaction(err, networkEndpointMap) + return err } // operationInternal executes NEG API call and commits the transactions @@ -522,8 +546,6 @@ func (s *transactionSyncer) operationInternal(operation transactionOp, zone stri } } - // WARNING: commitTransaction must be called at last for analyzing the operation result - s.commitTransaction(err, networkEndpointMap) metrics.PublishNegOperationMetrics(operation.String(), string(s.NegSyncerKey.NegType), string(s.NegSyncerKey.GetAPIVersion()), err, len(networkEndpointMap), start) return 
err } diff --git a/pkg/neg/syncers/transaction_test.go b/pkg/neg/syncers/transaction_test.go index 3329de962d..fc120aa216 100644 --- a/pkg/neg/syncers/transaction_test.go +++ b/pkg/neg/syncers/transaction_test.go @@ -1391,7 +1391,7 @@ func TestUnknownNodes(t *testing.T) { func newL4ILBTestTransactionSyncer(fakeGCE negtypes.NetworkEndpointGroupCloud, mode negtypes.EndpointsCalculatorMode) (negtypes.NegSyncer, *transactionSyncer) { negsyncer, ts := newTestTransactionSyncer(fakeGCE, negtypes.VmIpEndpointType, false) - ts.endpointsCalculator = GetEndpointsCalculator(ts.nodeLister, ts.podLister, ts.zoneGetter, ts.NegSyncerKey, mode, klog.TODO(), ts.enableDualStackNEG) + ts.endpointsCalculator = GetEndpointsCalculator(ts.nodeLister, ts.podLister, ts.zoneGetter, ts.NegSyncerKey, mode, klog.TODO(), false) return negsyncer, ts } From 9e33221c77a2f96c64d7591e05c072565d9a5033 Mon Sep 17 00:00:00 2001 From: Gaurav Ghildiyal Date: Mon, 3 Apr 2023 15:42:32 -0700 Subject: [PATCH 3/3] v2 Define the interface for the NEG dual stack migration handler. --- pkg/neg/syncers/dual_stack_migrator.go | 21 +++++++++------------ pkg/neg/syncers/transaction.go | 12 +++++------- pkg/neg/syncers/transaction_test.go | 4 +++- 3 files changed, 17 insertions(+), 20 deletions(-) diff --git a/pkg/neg/syncers/dual_stack_migrator.go b/pkg/neg/syncers/dual_stack_migrator.go index 42e25c1af7..1395b6bc56 100644 --- a/pkg/neg/syncers/dual_stack_migrator.go +++ b/pkg/neg/syncers/dual_stack_migrator.go @@ -46,14 +46,20 @@ type DualStackMigrator struct { // 1. Remove all migration-endpoints, irrespective of whether the migrator is // paused or not. // 2. If the migrator is not currently paused, it will also start the -// detachment of a subset of migration-endpoints. +// detachment of a subset of migration-endpoints from a single zone. +// +// The returned string represents the zone for which detachment was started. 
An +// empty return value signifies that detachment was not started (which is the +// case when there were no migration-endpoints to begin with, or the migrator +// was paused.) // // Refer the comment on [DualStackMigrator] for further details and // terminologies. -func (d *DualStackMigrator) Filter(addEndpoints, removeEndpoints, committedEndpoints map[string]types.NetworkEndpointSet) { +func (d *DualStackMigrator) Filter(addEndpoints, removeEndpoints, committedEndpoints map[string]types.NetworkEndpointSet) string { if !d.enableDualStackNEG { - return + return "" } + return "" } // Pause will prevent any subsequent Filter() invocations from starting @@ -70,15 +76,6 @@ func (d *DualStackMigrator) Pause() { } } -// IsPaused returns whether the migrator is paused or not. -func (d *DualStackMigrator) IsPaused() bool { - if !d.enableDualStackNEG { - return true - } - // TODO(gauravkghildiyal): Returns true until implemented. - return true -} - // Continue will unpause the migration. It expects an error as input which // specifies the result of the NEG-endpoint detach operation. Depending on // whether the detach operation passed or failed, the effect of unpause could be diff --git a/pkg/neg/syncers/transaction.go b/pkg/neg/syncers/transaction.go index 79c7ccd6f9..27a99c8f48 100644 --- a/pkg/neg/syncers/transaction.go +++ b/pkg/neg/syncers/transaction.go @@ -296,7 +296,7 @@ func (s *transactionSyncer) syncInternalImpl() error { // Filtering of migration endpoints should happen before we filter out // the transaction endpoints. Not doing so could result in an attempt to // attach an endpoint which is still undergoing detachment-due-to-migration. 
- s.dsMigrator.Filter(addEndpoints, removeEndpoints, committedEndpoints) + migrationZone := s.dsMigrator.Filter(addEndpoints, removeEndpoints, committedEndpoints) // Filter out the endpoints with existing transaction // This mostly happens when transaction entry require reconciliation but the transaction is still progress @@ -318,7 +318,7 @@ func (s *transactionSyncer) syncInternalImpl() error { s.logEndpoints(addEndpoints, "adding endpoint") s.logEndpoints(removeEndpoints, "removing endpoint") - return s.syncNetworkEndpoints(addEndpoints, removeEndpoints) + return s.syncNetworkEndpoints(addEndpoints, removeEndpoints, migrationZone) } // syncLock must already be acquired before execution @@ -406,7 +406,7 @@ func (s *transactionSyncer) ValidateEndpointBatch(err error, operation transacti } // syncNetworkEndpoints spins off go routines to execute NEG operations -func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints map[string]negtypes.NetworkEndpointSet) error { +func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints map[string]negtypes.NetworkEndpointSet, migrationZone string) error { var wg sync.WaitGroup syncFunc := func(endpointMap map[string]negtypes.NetworkEndpointSet, operation transactionOp) error { @@ -443,15 +443,13 @@ func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints m if operation == detachOp { wg.Add(1) - hasMigrationDetachments := false - if !s.dsMigrator.IsPaused() { + if zone == migrationZone { s.dsMigrator.Pause() - hasMigrationDetachments = true } go func() { defer wg.Done() - _ = s.detachNetworkEndpoints(zone, batch, hasMigrationDetachments) + _ = s.detachNetworkEndpoints(zone, batch, zone == migrationZone) }() } } diff --git a/pkg/neg/syncers/transaction_test.go b/pkg/neg/syncers/transaction_test.go index fc120aa216..984c133715 100644 --- a/pkg/neg/syncers/transaction_test.go +++ b/pkg/neg/syncers/transaction_test.go @@ -202,7 +202,9 @@ func 
TestTransactionSyncNetworkEndpoints(t *testing.T) { } for _, tc := range testCases { - err := transactionSyncer.syncNetworkEndpoints(tc.addEndpoints, tc.removeEndpoints) + // TODO(gauravkghildiyal): Update tests when we have finalized the API + // and concrete implementation of the [DualStackMigrator]. + err := transactionSyncer.syncNetworkEndpoints(tc.addEndpoints, tc.removeEndpoints, "") if err != nil { t.Errorf("For case %q, syncNetworkEndpoints() got %v, want nil", tc.desc, err) }