Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] PR for early feedback on the interface for neg dual-stack-migrator. #2065

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 95 additions & 0 deletions pkg/neg/syncers/dual_stack_migrator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
Copyright 2023 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package syncers

import "k8s.io/ingress-gce/pkg/neg/types"

// DualStackMigrator exposes functions to control the migration of single-stack
// NEG endpoints to dual-stack NEG endpoints (and vice versa)
//
// # Single-stack vs Dual-stack
//
// - A NEG endpoint is said to be single-stack if it just has an IPv4 or IPv6
// address (but not both.)
//
// - A NEG endpoint is said to be dual-stack if it has both IPv4 and IPv6
// address.
//
// # Migration endpoint
//
// An endpoint is said to be a migration-endpoint if its current state is
// single-stack but desired state is dual-stack (and vice versa.)
//
// TODO(gauravkghildiyal): Add details about the heuristics as we go on
// implementing.
type DualStackMigrator struct {
// Setting this to false will make all exported functions a no-op.
enableDualStackNEG bool
}

// Filter will modify the `addEndpoints` and `removeEndpoints` in TWO DISTINCT
// ways:
// 1. Remove all migration-endpoints, irrespective of whether the migrator is
// paused or not.
// 2. If the migrator is not currently paused, it will also start the
// detachment of a subset of migration-endpoints from a single zone.
//
// The returned string represents the zone for which detachment was started. An
// empty return value signifies that detachment was not started (which is the
// case when there were no migration-endpoints to begin with, or the migrator
// was paused.)
//
// Refer the comment on [DualStackMigrator] for further details and
// terminologies.
func (d *DualStackMigrator) Filter(addEndpoints, removeEndpoints, committedEndpoints map[string]types.NetworkEndpointSet) string {
if !d.enableDualStackNEG {
return ""
}
return ""
}

// Pause will prevent any subsequent Filter() invocations from starting
// detachment of migration-endpoints. Pause should be invoked before starting
// any NEG-endpoint detach operations that include migration-endpoints.
//
// Invoking Pause on a migrator which is already paused will be a no-op.
//
// Pause is usually paired with a Continue() invocation once the NEG-endpoint
// detach operation completes.
func (d *DualStackMigrator) Pause() {
if !d.enableDualStackNEG {
return
}
}

// Continue will unpause the migration. It expects an error as input which
// specifies the result of the NEG-endpoint detach operation. Depending on
// whether the detach operation passed or failed, the effect of unpause could be
// delayed:
// - If the NEG detach operation succeeded, a 1 minute timer will be started,
// which upon completion will unpause the migration and also trigger another
// sync. Continue will not keep the caller blocked for the completion of the
// timer.
// - If the NEG detach operation failed, the migration will be unpaused
// immediately before Continue returns. This would allow any resyncs to
// reattempt the migration. The migrator itself doesn't trigger any sync in
// this case.
func (d *DualStackMigrator) Continue(err error) {
if !d.enableDualStackNEG {
return
}
}
67 changes: 47 additions & 20 deletions pkg/neg/syncers/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,7 @@ type transactionSyncer struct {
// podLabelPropagationConfig configures the pod label to be propagated to NEG endpoints
podLabelPropagationConfig labels.PodLabelPropagationConfig

// enableDualStackNEG indicates if IPv6 endpoints should be considered while
// reconciling NEGs.
enableDualStackNEG bool
dsMigrator *DualStackMigrator
}

func NewTransactionSyncer(
Expand Down Expand Up @@ -163,7 +161,7 @@ func NewTransactionSyncer(
logger: logger,
enableDegradedMode: flags.F.EnableDegradedMode,
podLabelPropagationConfig: lpConfig,
enableDualStackNEG: enableDualStackNEG,
dsMigrator: &DualStackMigrator{enableDualStackNEG: enableDualStackNEG},
}
// Syncer implements life cycle logic
syncer := newSyncer(negSyncerKey, serviceLister, recorder, ts, logger)
Expand Down Expand Up @@ -294,6 +292,12 @@ func (s *transactionSyncer) syncInternalImpl() error {
addEndpoints, removeEndpoints := calculateNetworkEndpointDifference(targetMap, currentMap)
// Calculate Pods that are already in the NEG
_, committedEndpoints := calculateNetworkEndpointDifference(addEndpoints, targetMap)

// Filtering of migration endpoints should happen before we filter out
// the transaction endpoints. Not doing so could result in an attempt to
// attach an endpoint which is still undergoing detachment-due-to-migration.
migrationZone := s.dsMigrator.Filter(addEndpoints, removeEndpoints, committedEndpoints)

// Filter out the endpoints with existing transaction
// This mostly happens when transaction entry require reconciliation but the transaction is still progress
// e.g. endpoint A is in the process of adding to NEG N, and the new desire state is not to have A in N.
Expand All @@ -314,7 +318,7 @@ func (s *transactionSyncer) syncInternalImpl() error {
s.logEndpoints(addEndpoints, "adding endpoint")
s.logEndpoints(removeEndpoints, "removing endpoint")

return s.syncNetworkEndpoints(addEndpoints, removeEndpoints)
return s.syncNetworkEndpoints(addEndpoints, removeEndpoints, migrationZone)
}

// syncLock must already be acquired before execution
Expand Down Expand Up @@ -402,11 +406,12 @@ func (s *transactionSyncer) ValidateEndpointBatch(err error, operation transacti
}

// syncNetworkEndpoints spins off go routines to execute NEG operations
func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints map[string]negtypes.NetworkEndpointSet) error {
func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints map[string]negtypes.NetworkEndpointSet, migrationZone string) error {
var wg sync.WaitGroup

syncFunc := func(endpointMap map[string]negtypes.NetworkEndpointSet, operation transactionOp) error {
for zone, endpointSet := range endpointMap {
zone := zone
if endpointSet.Len() == 0 {
s.logger.V(2).Info("0 endpoints in the endpoint list. Skipping operation", "operation", attachOp, "negSyncerKey", s.NegSyncerKey.String(), "zone", zone)
continue
Expand All @@ -428,10 +433,24 @@ func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints m
}

if operation == attachOp {
s.attachNetworkEndpoints(zone, batch, &wg)
wg.Add(1)
go func() {
defer wg.Done()
_ = s.attachNetworkEndpoints(zone, batch)
}()
}

if operation == detachOp {
s.detachNetworkEndpoints(zone, batch, &wg)
wg.Add(1)

if zone == migrationZone {
s.dsMigrator.Pause()
}

go func() {
defer wg.Done()
_ = s.detachNetworkEndpoints(zone, batch, zone == migrationZone)
}()
}
}
return nil
Expand Down Expand Up @@ -471,25 +490,34 @@ func (s *transactionSyncer) collectSyncResult(wg *sync.WaitGroup) {
s.syncCollector.UpdateSyncer(s.NegSyncerKey, syncResult)
}

// attachNetworkEndpoints creates go routine to run operations for attaching network endpoints
func (s *transactionSyncer) attachNetworkEndpoints(zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint, wg *sync.WaitGroup) {
// attachNetworkEndpoints runs operation for attaching network endpoints.
func (s *transactionSyncer) attachNetworkEndpoints(zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint) error {
s.logger.V(2).Info("Attaching endpoints to NEG.", "countOfEndpointsBeingAttached", len(networkEndpointMap), "negSyncerKey", s.NegSyncerKey.String(), "zone", zone)
wg.Add(1)
go s.operationInternal(attachOp, zone, networkEndpointMap, wg)
err := s.operationInternal(attachOp, zone, networkEndpointMap)

// WARNING: commitTransaction must be called at last for analyzing the operation result
s.commitTransaction(err, networkEndpointMap)
return err
}

// detachNetworkEndpoints creates go routine to run operations for detaching network endpoints
func (s *transactionSyncer) detachNetworkEndpoints(zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint, wg *sync.WaitGroup) {
// detachNetworkEndpoints runs operation for detaching network endpoints.
func (s *transactionSyncer) detachNetworkEndpoints(zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint, hasMigrationDetachments bool) error {
s.logger.V(2).Info("Detaching endpoints from NEG.", "countOfEndpointsBeingDetached", len(networkEndpointMap), "negSyncerKey", s.NegSyncerKey.String(), "zone", zone)
wg.Add(1)
go s.operationInternal(detachOp, zone, networkEndpointMap, wg)
err := s.operationInternal(detachOp, zone, networkEndpointMap)

if hasMigrationDetachments {
s.dsMigrator.Continue(err)
}

// WARNING: commitTransaction must be called at last for analyzing the operation result
s.commitTransaction(err, networkEndpointMap)
return err
}

// operationInternal executes NEG API call and commits the transactions
// It will record events when operations are completed
// If error occurs or any transaction entry requires reconciliation, it will trigger resync
func (s *transactionSyncer) operationInternal(operation transactionOp, zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint, wg *sync.WaitGroup) {
defer wg.Done()
func (s *transactionSyncer) operationInternal(operation transactionOp, zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint) error {
var err error
start := time.Now()
networkEndpoints := []*composite.NetworkEndpoint{}
Expand All @@ -516,9 +544,8 @@ func (s *transactionSyncer) operationInternal(operation transactionOp, zone stri
}
}

// WARNING: commitTransaction must be called at last for analyzing the operation result
s.commitTransaction(err, networkEndpointMap)
metrics.PublishNegOperationMetrics(operation.String(), string(s.NegSyncerKey.NegType), string(s.NegSyncerKey.GetAPIVersion()), err, len(networkEndpointMap), start)
return err
}

func (s *transactionSyncer) recordEvent(eventType, reason, eventDesc string) {
Expand Down
6 changes: 4 additions & 2 deletions pkg/neg/syncers/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,9 @@ func TestTransactionSyncNetworkEndpoints(t *testing.T) {
}

for _, tc := range testCases {
err := transactionSyncer.syncNetworkEndpoints(tc.addEndpoints, tc.removeEndpoints)
// TODO(gauravkghildiyal): Update tests when we have a finalized the API
// and concrete implementation of the [DualStackMigrator].
err := transactionSyncer.syncNetworkEndpoints(tc.addEndpoints, tc.removeEndpoints, "")
if err != nil {
t.Errorf("For case %q, syncNetworkEndpoints() got %v, want nil", tc.desc, err)
}
Expand Down Expand Up @@ -1391,7 +1393,7 @@ func TestUnknownNodes(t *testing.T) {

func newL4ILBTestTransactionSyncer(fakeGCE negtypes.NetworkEndpointGroupCloud, mode negtypes.EndpointsCalculatorMode) (negtypes.NegSyncer, *transactionSyncer) {
negsyncer, ts := newTestTransactionSyncer(fakeGCE, negtypes.VmIpEndpointType, false)
ts.endpointsCalculator = GetEndpointsCalculator(ts.nodeLister, ts.podLister, ts.zoneGetter, ts.NegSyncerKey, mode, klog.TODO(), ts.enableDualStackNEG)
ts.endpointsCalculator = GetEndpointsCalculator(ts.nodeLister, ts.podLister, ts.zoneGetter, ts.NegSyncerKey, mode, klog.TODO(), false)
return negsyncer, ts
}

Expand Down