Define the interface for NEG dual stack migration handler
gauravkghildiyal committed Apr 17, 2023
1 parent 6216d71 commit bca6ee4
Showing 4 changed files with 233 additions and 21 deletions.
120 changes: 120 additions & 0 deletions pkg/neg/syncers/dualstack/migrator.go
@@ -0,0 +1,120 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package dualstack

import (
"time"

"k8s.io/ingress-gce/pkg/neg/types"
)

const (
defaultMigrationWaitDuration = 1 * time.Minute
)

// Migrator exposes functions to control the migration of single-stack
// NEG endpoints to dual-stack NEG endpoints (and vice versa).
//
// # Single-stack vs Dual-stack
//
// - A NEG endpoint is said to be single-stack if it has either an IPv4 or an
// IPv6 address (but not both).
//
// - A NEG endpoint is said to be dual-stack if it has both an IPv4 and an
// IPv6 address.
//
// # Migration endpoint
//
// An endpoint is said to be a migration-endpoint if its current state is
// single-stack but its desired state is dual-stack (and vice versa).
//
// TODO(gauravkghildiyal): Add details about the heuristics as we go on
// implementing.
type Migrator struct {
// Setting this to false makes all exported functions no-ops.
enableDualStack bool
}

func NewMigrator(enableDualStack bool) *Migrator {
return &Migrator{enableDualStack: enableDualStack}
}

// Filter will modify the `addEndpoints` and `removeEndpoints` in TWO DISTINCT
// ways:
// 1. Remove all migration-endpoints, irrespective of whether the migrator is
// paused or not.
// 2. If the migrator is not currently paused, it will also start the
// detachment of a subset of migration-endpoints from a single zone.
//
// The returned string represents the zone for which detachment was started. An
// empty return value signifies that detachment was not started (which is the
// case when there were no migration-endpoints to begin with, or the migrator
// was paused).
//
// Refer to the comment on [Migrator] for further details and
// terminology.
func (d *Migrator) Filter(addEndpoints, removeEndpoints, committedEndpoints map[string]types.NetworkEndpointSet) string {
if !d.enableDualStack {
return ""
}

_, migrationEndpointsInRemoveSet := findAndFilterMigrationEndpoints(addEndpoints, removeEndpoints)

// TODO(gauravkghildiyal): Implement rate limited migration-detachment.
for zone, endpointSet := range migrationEndpointsInRemoveSet {
if endpointSet.Len() != 0 {
removeEndpoints[zone] = removeEndpoints[zone].Union(endpointSet)
return zone
}
}

return ""
}

// Pause will prevent any subsequent Filter() invocations from starting
// detachment of migration-endpoints. Pause should be invoked before starting
// any NEG-endpoint detach operations that include migration-endpoints.
//
// Invoking Pause on a migrator that is already paused is a no-op.
//
// Pause is usually paired with a Continue() invocation once the NEG-endpoint
// detach operation completes.
func (d *Migrator) Pause() {
if !d.enableDualStack {
return
}
}

// Continue will unpause the migration. It expects an error as input which
// specifies the result of the NEG-endpoint detach operation. Depending on
// whether the detach operation succeeded or failed, the effect of unpausing
// may be delayed:
// - If the NEG detach operation failed, the migration will be unpaused
// immediately, before Continue returns. This allows subsequent resyncs to
// reattempt the migration. The migrator itself doesn't trigger any sync in
// this case.
// - If the NEG detach operation succeeded, a migrationWaitDuration timer will
// be started, which upon completion will unpause the migration and also
// trigger another sync. Continue will not keep the caller blocked until the
// timer completes. If Continue is invoked multiple times, only the first
// invocation triggers a resync.
func (d *Migrator) Continue(err error) {
if !d.enableDualStack {
return
}
}
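
The Continue contract is the most intricate part of this interface, and this commit intentionally leaves its body (and Pause's) as a stub. The following is a minimal sketch of the documented semantics, assuming hypothetical unpause and resync hooks (invented names, not part of this commit):

// Sketch only: models the behavior documented on Continue; not code from
// this commit. A real implementation would also need to guard against
// repeated invocations (e.g. with a paused flag), since only the first
// Continue should trigger a resync.
func continueSketch(detachErr error, unpause, resync func()) {
	if detachErr != nil {
		// Detach failed: unpause immediately so the next resync can
		// reattempt the migration; don't trigger a sync here.
		unpause()
		return
	}
	// Detach succeeded: wait out the migration window without blocking the
	// caller, then unpause and trigger one more sync.
	time.AfterFunc(defaultMigrationWaitDuration, func() {
		unpause()
		resync()
	})
}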

86 changes: 86 additions & 0 deletions pkg/neg/syncers/dualstack/migrator_test.go
@@ -0,0 +1,86 @@
package dualstack

import (
"testing"

"github.com/google/go-cmp/cmp"
"k8s.io/ingress-gce/pkg/neg/types"
)

func TestFilter(t *testing.T) {
testCases := []struct {
desc string
migrator *Migrator
addEndpoints map[string]types.NetworkEndpointSet
removeEndpoints map[string]types.NetworkEndpointSet
committedEndpoints map[string]types.NetworkEndpointSet
wantAddEndpoints map[string]types.NetworkEndpointSet
wantRemoveEndpoints map[string]types.NetworkEndpointSet
wantMigrationZone bool
}{
{
desc: "migrator should do nothing if enableDualStack is false",
migrator: &Migrator{enableDualStack: false},
addEndpoints: map[string]types.NetworkEndpointSet{
"zone1": types.NewNetworkEndpointSet([]types.NetworkEndpoint{
{IP: "a", IPv6: "A"}, // migrating
{IP: "b"},
}...),
},
removeEndpoints: map[string]types.NetworkEndpointSet{
"zone1": types.NewNetworkEndpointSet([]types.NetworkEndpoint{
{IP: "a"}, // migrating
{IP: "c", IPv6: "C"},
}...),
},
committedEndpoints: map[string]types.NetworkEndpointSet{
"zone1": types.NewNetworkEndpointSet([]types.NetworkEndpoint{
{IPv6: "D"},
}...),
},
wantAddEndpoints: map[string]types.NetworkEndpointSet{
"zone1": types.NewNetworkEndpointSet([]types.NetworkEndpoint{
{IP: "a", IPv6: "A"}, // migrating
{IP: "b"},
}...),
},
wantRemoveEndpoints: map[string]types.NetworkEndpointSet{
"zone1": types.NewNetworkEndpointSet([]types.NetworkEndpoint{
{IP: "a"}, // migrating
{IP: "c", IPv6: "C"},
}...),
},
},
}

for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
gotAddEndpoints := cloneZoneNetworkEndpointsMap(tc.addEndpoints)
gotRemoveEndpoints := cloneZoneNetworkEndpointsMap(tc.removeEndpoints)
gotCommittedEndpoints := cloneZoneNetworkEndpointsMap(tc.committedEndpoints)
gotMigrationZone := tc.migrator.Filter(gotAddEndpoints, gotRemoveEndpoints, gotCommittedEndpoints)

if tc.wantMigrationZone && gotMigrationZone == "" {
t.Errorf("Filter() returned empty migrationZone; want non empty migrationZone")
}

if diff := cmp.Diff(tc.wantAddEndpoints, gotAddEndpoints); diff != "" {
t.Errorf("Filter() returned unexpected diff in addEndpoints (-want +got):\n%s", diff)
}
if diff := cmp.Diff(tc.wantRemoveEndpoints, gotRemoveEndpoints); diff != "" {
t.Errorf("Filter() returned unexpected diff in removeEndpoints (-want +got):\n%s", diff)
}
if diff := cmp.Diff(tc.committedEndpoints, gotCommittedEndpoints); diff != "" {
t.Errorf("Filter() returned unexpected diff in committedEndpoints; want no diff; (-want +got):\n%s", diff)
}
})
}
}

func cloneZoneNetworkEndpointsMap(m map[string]types.NetworkEndpointSet) map[string]types.NetworkEndpointSet {
clone := make(map[string]types.NetworkEndpointSet)
for zone, endpointSet := range m {
clone[zone] = types.NewNetworkEndpointSet(endpointSet.List()...)
}
return clone
}
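
For reference, the test data above encodes what a migration pair looks like: {IP: "a", IPv6: "A"} queued for attach together with {IP: "a"} queued for detach. The actual matching is done by findAndFilterMigrationEndpoints, which predates this commit and does not appear in the diff; the predicate below is only an assumed sketch of its core check:

// Assumed sketch; not the real findAndFilterMigrationEndpoints logic. Two
// endpoints form a migration pair when they share an address and exactly
// one of the two is dual-stack.
func isMigrationPair(a, b types.NetworkEndpoint) bool {
	sameAddr := (a.IP != "" && a.IP == b.IP) || (a.IPv6 != "" && a.IPv6 == b.IPv6)
	aDual := a.IP != "" && a.IPv6 != ""
	bDual := b.IP != "" && b.IPv6 != ""
	return sameAddr && aDual != bDual
}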
39 changes: 21 additions & 18 deletions pkg/neg/syncers/transaction.go
@@ -44,6 +44,7 @@ import (
"k8s.io/ingress-gce/pkg/composite"
"k8s.io/ingress-gce/pkg/neg/metrics"
"k8s.io/ingress-gce/pkg/neg/readiness"
"k8s.io/ingress-gce/pkg/neg/syncers/dualstack"
"k8s.io/ingress-gce/pkg/neg/syncers/labels"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
svcnegclient "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned"
@@ -113,9 +114,7 @@ type transactionSyncer struct {
// podLabelPropagationConfig configures the pod label to be propagated to NEG endpoints
podLabelPropagationConfig labels.PodLabelPropagationConfig

// enableDualStackNEG indicates if IPv6 endpoints should be considered while
// reconciling NEGs.
enableDualStackNEG bool
dsMigrator *dualstack.Migrator
}

func NewTransactionSyncer(
@@ -163,7 +162,7 @@
logger: logger,
enableDegradedMode: flags.F.EnableDegradedMode,
podLabelPropagationConfig: lpConfig,
enableDualStackNEG: enableDualStackNEG,
dsMigrator: dualstack.NewMigrator(enableDualStackNEG),
}
// Syncer implements life cycle logic
syncer := newSyncer(negSyncerKey, serviceLister, recorder, ts, logger)
@@ -287,17 +286,10 @@ func (s *transactionSyncer) syncInternalImpl() error {
// Calculate Pods that are already in the NEG
_, committedEndpoints := calculateNetworkEndpointDifference(addEndpoints, targetMap)

if s.enableDualStackNEG {
// Identification of migration endpoints should happen before we filter out
// the transaction endpoints. Not doing so could result in an attempt to
// attach an endpoint which is still undergoing detachment-due-to-migration.
_, migratingEndpointsInRemoveSet := findAndFilterMigrationEndpoints(addEndpoints, removeEndpoints)

// TODO(gauravkghildiyal): Implement rate limited migration-detachment.
for zone, endpointSet := range migratingEndpointsInRemoveSet {
removeEndpoints[zone] = removeEndpoints[zone].Union(endpointSet)
}
}
// Filtering of migration endpoints should happen before we filter out
// the transaction endpoints. Not doing so could result in an attempt to
// attach an endpoint which is still undergoing detachment-due-to-migration.
migrationZone := s.dsMigrator.Filter(addEndpoints, removeEndpoints, committedEndpoints)

// Filter out the endpoints with existing transaction
// This mostly happens when a transaction entry requires reconciliation but the transaction is still in progress
@@ -325,7 +317,7 @@ func (s *transactionSyncer) syncInternalImpl() error {
s.logEndpoints(addEndpoints, "adding endpoint")
s.logEndpoints(removeEndpoints, "removing endpoint")

return s.syncNetworkEndpoints(addEndpoints, removeEndpoints, endpointPodLabelMap)
return s.syncNetworkEndpoints(addEndpoints, removeEndpoints, endpointPodLabelMap, migrationZone)
}

func (s *transactionSyncer) getEndpointsCalculation(
@@ -435,7 +427,7 @@ func (s *transactionSyncer) ValidateEndpointBatch(err error, operation transacti
}

// syncNetworkEndpoints spins off go routines to execute NEG operations
func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints map[string]negtypes.NetworkEndpointSet, endpointPodLabelMap labels.EndpointPodLabelMap) error {
func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints map[string]negtypes.NetworkEndpointSet, endpointPodLabelMap labels.EndpointPodLabelMap, migrationZone string) error {
syncFunc := func(endpointMap map[string]negtypes.NetworkEndpointSet, operation transactionOp) error {
for zone, endpointSet := range endpointMap {
zone := zone
@@ -463,7 +455,12 @@ func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints m
go s.attachNetworkEndpoints(zone, batch)
}
if operation == detachOp {
go s.detachNetworkEndpoints(zone, batch)
if zone == migrationZone {
// Prevent any further migration-detachments from starting while one
// is already in progress.
s.dsMigrator.Pause()
}
go s.detachNetworkEndpoints(zone, batch, zone == migrationZone)
}
}
return nil
@@ -493,6 +490,12 @@ func (s *transactionSyncer) detachNetworkEndpoints(zone string, networkEndpointM
s.logger.V(2).Info("Detaching endpoints from NEG.", "countOfEndpointsBeingDetached", len(networkEndpointMap), "negSyncerKey", s.NegSyncerKey.String(), "zone", zone)
err := s.operationInternal(detachOp, zone, networkEndpointMap)

if hasMigrationDetachments {
// Unpause the migration since the ongoing migration-detachments have
// concluded.
s.dsMigrator.Continue(err)
}

// WARNING: commitTransaction must be called at last for analyzing the operation result
s.commitTransaction(err, networkEndpointMap)
}
9 changes: 6 additions & 3 deletions pkg/neg/syncers/transaction_test.go
@@ -204,7 +204,10 @@ func TestTransactionSyncNetworkEndpoints(t *testing.T) {
}

for _, tc := range testCases {
err := transactionSyncer.syncNetworkEndpoints(tc.addEndpoints, tc.removeEndpoints, labels.EndpointPodLabelMap{})
// TODO(gauravkghildiyal): When the DualStack Migrator is fully
// implemented, check if we need to cover scenarios where `migrationZone`
// is not empty.
err := transactionSyncer.syncNetworkEndpoints(tc.addEndpoints, tc.removeEndpoints, labels.EndpointPodLabelMap{}, "")
if err != nil {
t.Errorf("For case %q, syncNetworkEndpoints() got %v, want nil", tc.desc, err)
}
@@ -380,7 +383,7 @@ func TestSyncNetworkEndpointLabel(t *testing.T) {
if err := transactionSyncer.ensureNetworkEndpointGroups(); err != nil {
t.Errorf("Expect error == nil, but got %v", err)
}
err := transactionSyncer.syncNetworkEndpoints(tc.addEndpoints, map[string]negtypes.NetworkEndpointSet{}, tc.endpointPodLabelMap)
err := transactionSyncer.syncNetworkEndpoints(tc.addEndpoints, map[string]negtypes.NetworkEndpointSet{}, tc.endpointPodLabelMap, "")
if err != nil {
t.Errorf("For case %q, syncNetworkEndpoints() got %v, want nil", tc.desc, err)
}
@@ -1912,7 +1915,7 @@ func TestGetEndpointPodLabelMap(t *testing.T) {

func newL4ILBTestTransactionSyncer(fakeGCE negtypes.NetworkEndpointGroupCloud, mode negtypes.EndpointsCalculatorMode) (negtypes.NegSyncer, *transactionSyncer) {
negsyncer, ts := newTestTransactionSyncer(fakeGCE, negtypes.VmIpEndpointType, false)
ts.endpointsCalculator = GetEndpointsCalculator(ts.nodeLister, ts.podLister, ts.zoneGetter, ts.NegSyncerKey, mode, klog.TODO(), ts.enableDualStackNEG)
ts.endpointsCalculator = GetEndpointsCalculator(ts.nodeLister, ts.podLister, ts.zoneGetter, ts.NegSyncerKey, mode, klog.TODO(), false)
return negsyncer, ts
}

