diff --git a/pkg/neg/syncers/dual_stack_migrator.go b/pkg/neg/syncers/dual_stack_migrator.go new file mode 100644 index 0000000000..1395b6bc56 --- /dev/null +++ b/pkg/neg/syncers/dual_stack_migrator.go @@ -0,0 +1,95 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package syncers + +import "k8s.io/ingress-gce/pkg/neg/types" + +// DualStackMigrator exposes functions to control the migration of single-stack +// NEG endpoints to dual-stack NEG endpoints (and vice versa) +// +// # Single-stack vs Dual-stack +// +// - A NEG endpoint is said to be single-stack if it just has an IPv4 or IPv6 +// address (but not both.) +// +// - A NEG endpoint is said to be dual-stack if it has both IPv4 and IPv6 +// address. +// +// # Migration endpoint +// +// An endpoint is said to be a migration-endpoint if its current state is +// single-stack but desired state is dual-stack (and vice versa.) +// +// TODO(gauravkghildiyal): Add details about the heuristics as we go on +// implementing. +type DualStackMigrator struct { + // Setting this to false will make all exported functions a no-op. + enableDualStackNEG bool +} + +// Filter will modify the `addEndpoints` and `removeEndpoints` in TWO DISTINCT +// ways: +// 1. Remove all migration-endpoints, irrespective of whether the migrator is +// paused or not. +// 2. If the migrator is not currently paused, it will also start the +// detachment of a subset of migration-endpoints from a single zone. +// +// The returned string represents the zone for which detachment was started. An +// empty return value signifies that detachment was not started (which is the +// case when there were no migration-endpoints to begin with, or the migrator +// was paused.) +// +// Refer the comment on [DualStackMigrator] for further details and +// terminologies. +func (d *DualStackMigrator) Filter(addEndpoints, removeEndpoints, committedEndpoints map[string]types.NetworkEndpointSet) string { + if !d.enableDualStackNEG { + return "" + } + return "" +} + +// Pause will prevent any subsequent Filter() invocations from starting +// detachment of migration-endpoints. Pause should be invoked before starting +// any NEG-endpoint detach operations that include migration-endpoints. +// +// Invoking Pause on a migrator which is already paused will be a no-op. +// +// Pause is usually paired with a Continue() invocation once the NEG-endpoint +// detach operation completes. +func (d *DualStackMigrator) Pause() { + if !d.enableDualStackNEG { + return + } +} + +// Continue will unpause the migration. It expects an error as input which +// specifies the result of the NEG-endpoint detach operation. Depending on +// whether the detach operation passed or failed, the effect of unpause could be +// delayed: +// - If the NEG detach operation succeeded, a 1 minute timer will be started, +// which upon completion will unpause the migration and also trigger another +// sync. Continue will not keep the caller blocked for the completion of the +// timer. +// - If the NEG detach operation failed, the migration will be unpaused +// immediately before Continue returns. This would allow any resyncs to +// reattempt the migration. The migrator itself doesn't trigger any sync in +// this case. +func (d *DualStackMigrator) Continue(err error) { + if !d.enableDualStackNEG { + return + } +} diff --git a/pkg/neg/syncers/transaction.go b/pkg/neg/syncers/transaction.go index 6a2ef955b8..27a99c8f48 100644 --- a/pkg/neg/syncers/transaction.go +++ b/pkg/neg/syncers/transaction.go @@ -113,9 +113,7 @@ type transactionSyncer struct { // podLabelPropagationConfig configures the pod label to be propagated to NEG endpoints podLabelPropagationConfig labels.PodLabelPropagationConfig - // enableDualStackNEG indicates if IPv6 endpoints should be considered while - // reconciling NEGs. - enableDualStackNEG bool + dsMigrator *DualStackMigrator } func NewTransactionSyncer( @@ -163,7 +161,7 @@ func NewTransactionSyncer( logger: logger, enableDegradedMode: flags.F.EnableDegradedMode, podLabelPropagationConfig: lpConfig, - enableDualStackNEG: enableDualStackNEG, + dsMigrator: &DualStackMigrator{enableDualStackNEG: enableDualStackNEG}, } // Syncer implements life cycle logic syncer := newSyncer(negSyncerKey, serviceLister, recorder, ts, logger) @@ -294,6 +292,12 @@ func (s *transactionSyncer) syncInternalImpl() error { addEndpoints, removeEndpoints := calculateNetworkEndpointDifference(targetMap, currentMap) // Calculate Pods that are already in the NEG _, committedEndpoints := calculateNetworkEndpointDifference(addEndpoints, targetMap) + + // Filtering of migration endpoints should happen before we filter out + // the transaction endpoints. Not doing so could result in an attempt to + // attach an endpoint which is still undergoing detachment-due-to-migration. + migrationZone := s.dsMigrator.Filter(addEndpoints, removeEndpoints, committedEndpoints) + // Filter out the endpoints with existing transaction // This mostly happens when transaction entry require reconciliation but the transaction is still progress // e.g. endpoint A is in the process of adding to NEG N, and the new desire state is not to have A in N. @@ -314,7 +318,7 @@ func (s *transactionSyncer) syncInternalImpl() error { s.logEndpoints(addEndpoints, "adding endpoint") s.logEndpoints(removeEndpoints, "removing endpoint") - return s.syncNetworkEndpoints(addEndpoints, removeEndpoints) + return s.syncNetworkEndpoints(addEndpoints, removeEndpoints, migrationZone) } // syncLock must already be acquired before execution @@ -402,11 +406,12 @@ func (s *transactionSyncer) ValidateEndpointBatch(err error, operation transacti } // syncNetworkEndpoints spins off go routines to execute NEG operations -func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints map[string]negtypes.NetworkEndpointSet) error { +func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints map[string]negtypes.NetworkEndpointSet, migrationZone string) error { var wg sync.WaitGroup syncFunc := func(endpointMap map[string]negtypes.NetworkEndpointSet, operation transactionOp) error { for zone, endpointSet := range endpointMap { + zone := zone if endpointSet.Len() == 0 { s.logger.V(2).Info("0 endpoints in the endpoint list. Skipping operation", "operation", attachOp, "negSyncerKey", s.NegSyncerKey.String(), "zone", zone) continue @@ -428,10 +433,24 @@ func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints m } if operation == attachOp { - s.attachNetworkEndpoints(zone, batch, &wg) + wg.Add(1) + go func() { + defer wg.Done() + _ = s.attachNetworkEndpoints(zone, batch) + }() } + if operation == detachOp { - s.detachNetworkEndpoints(zone, batch, &wg) + wg.Add(1) + + if zone == migrationZone { + s.dsMigrator.Pause() + } + + go func() { + defer wg.Done() + _ = s.detachNetworkEndpoints(zone, batch, zone == migrationZone) + }() } } return nil @@ -471,25 +490,34 @@ func (s *transactionSyncer) collectSyncResult(wg *sync.WaitGroup) { s.syncCollector.UpdateSyncer(s.NegSyncerKey, syncResult) } -// attachNetworkEndpoints creates go routine to run operations for attaching network endpoints -func (s *transactionSyncer) attachNetworkEndpoints(zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint, wg *sync.WaitGroup) { +// attachNetworkEndpoints runs operation for attaching network endpoints. +func (s *transactionSyncer) attachNetworkEndpoints(zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint) error { s.logger.V(2).Info("Attaching endpoints to NEG.", "countOfEndpointsBeingAttached", len(networkEndpointMap), "negSyncerKey", s.NegSyncerKey.String(), "zone", zone) - wg.Add(1) - go s.operationInternal(attachOp, zone, networkEndpointMap, wg) + err := s.operationInternal(attachOp, zone, networkEndpointMap) + + // WARNING: commitTransaction must be called at last for analyzing the operation result + s.commitTransaction(err, networkEndpointMap) + return err } -// detachNetworkEndpoints creates go routine to run operations for detaching network endpoints -func (s *transactionSyncer) detachNetworkEndpoints(zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint, wg *sync.WaitGroup) { +// detachNetworkEndpoints runs operation for detaching network endpoints. +func (s *transactionSyncer) detachNetworkEndpoints(zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint, hasMigrationDetachments bool) error { s.logger.V(2).Info("Detaching endpoints from NEG.", "countOfEndpointsBeingDetached", len(networkEndpointMap), "negSyncerKey", s.NegSyncerKey.String(), "zone", zone) - wg.Add(1) - go s.operationInternal(detachOp, zone, networkEndpointMap, wg) + err := s.operationInternal(detachOp, zone, networkEndpointMap) + + if hasMigrationDetachments { + s.dsMigrator.Continue(err) + } + + // WARNING: commitTransaction must be called at last for analyzing the operation result + s.commitTransaction(err, networkEndpointMap) + return err } // operationInternal executes NEG API call and commits the transactions // It will record events when operations are completed // If error occurs or any transaction entry requires reconciliation, it will trigger resync -func (s *transactionSyncer) operationInternal(operation transactionOp, zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint, wg *sync.WaitGroup) { - defer wg.Done() +func (s *transactionSyncer) operationInternal(operation transactionOp, zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint) error { var err error start := time.Now() networkEndpoints := []*composite.NetworkEndpoint{} @@ -516,9 +544,8 @@ func (s *transactionSyncer) operationInternal(operation transactionOp, zone stri } } - // WARNING: commitTransaction must be called at last for analyzing the operation result - s.commitTransaction(err, networkEndpointMap) metrics.PublishNegOperationMetrics(operation.String(), string(s.NegSyncerKey.NegType), string(s.NegSyncerKey.GetAPIVersion()), err, len(networkEndpointMap), start) + return err } func (s *transactionSyncer) recordEvent(eventType, reason, eventDesc string) { diff --git a/pkg/neg/syncers/transaction_test.go b/pkg/neg/syncers/transaction_test.go index 3329de962d..984c133715 100644 --- a/pkg/neg/syncers/transaction_test.go +++ b/pkg/neg/syncers/transaction_test.go @@ -202,7 +202,9 @@ func TestTransactionSyncNetworkEndpoints(t *testing.T) { } for _, tc := range testCases { - err := transactionSyncer.syncNetworkEndpoints(tc.addEndpoints, tc.removeEndpoints) + // TODO(gauravkghildiyal): Update tests when we have a finalized the API + // and concrete implementation of the [DualStackMigrator]. + err := transactionSyncer.syncNetworkEndpoints(tc.addEndpoints, tc.removeEndpoints, "") if err != nil { t.Errorf("For case %q, syncNetworkEndpoints() got %v, want nil", tc.desc, err) } @@ -1391,7 +1393,7 @@ func TestUnknownNodes(t *testing.T) { func newL4ILBTestTransactionSyncer(fakeGCE negtypes.NetworkEndpointGroupCloud, mode negtypes.EndpointsCalculatorMode) (negtypes.NegSyncer, *transactionSyncer) { negsyncer, ts := newTestTransactionSyncer(fakeGCE, negtypes.VmIpEndpointType, false) - ts.endpointsCalculator = GetEndpointsCalculator(ts.nodeLister, ts.podLister, ts.zoneGetter, ts.NegSyncerKey, mode, klog.TODO(), ts.enableDualStackNEG) + ts.endpointsCalculator = GetEndpointsCalculator(ts.nodeLister, ts.podLister, ts.zoneGetter, ts.NegSyncerKey, mode, klog.TODO(), false) return negsyncer, ts }