Skip to content

Commit

Permalink
Filter out migrating endpoints
Browse files Browse the repository at this point in the history
  • Loading branch information
gauravkghildiyal committed Apr 11, 2023
1 parent 0e3f583 commit 8c62032
Show file tree
Hide file tree
Showing 3 changed files with 330 additions and 0 deletions.
8 changes: 8 additions & 0 deletions pkg/neg/syncers/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,14 @@ func (s *transactionSyncer) syncInternalImpl() error {
addEndpoints, removeEndpoints := calculateNetworkEndpointDifference(targetMap, currentMap)
// Calculate Pods that are already in the NEG
_, committedEndpoints := calculateNetworkEndpointDifference(addEndpoints, targetMap)

if s.enableDualStackNEG {
// Identification of migration endpoints should happen before we filter out
// the transaction endpoints. Not doing so could result in an attempt to
// attach an endpoint which is still undergoing detachment-due-to-migration.
findAndFilterMigrationEndpoints(addEndpoints, removeEndpoints)
}

// Filter out the endpoints with existing transaction
// This mostly happens when transaction entry require reconciliation but the transaction is still progress
// e.g. endpoint A is in the process of adding to NEG N, and the new desire state is not to have A in N.
Expand Down
77 changes: 77 additions & 0 deletions pkg/neg/syncers/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,83 @@ func calculateNetworkEndpointDifference(targetMap, currentMap map[string]negtype
return addSet, removeSet
}

// findAndFilterMigrationEndpoints will filter out the migration endpoints from
// the `addEndpoints` and `removeEndpoints` sets. The passed sets will get
// modified. The returned value will be two endpoints sets which will contain
// the values that were filtered out from the `addEndpoints` and
// `removeEndpoints` sets respectively.
//
// TODO(gauravkghildiyal): This function should be moved alongside
// DualStackMigrator once that gets merged.
func findAndFilterMigrationEndpoints(addEndpoints, removeEndpoints map[string]negtypes.NetworkEndpointSet) (map[string]negtypes.NetworkEndpointSet, map[string]negtypes.NetworkEndpointSet) {
allEndpoints := make(map[string]negtypes.NetworkEndpointSet)
for zone, endpointSet := range addEndpoints {
allEndpoints[zone] = allEndpoints[zone].Union(endpointSet)
}
for zone, endpointSet := range removeEndpoints {
allEndpoints[zone] = allEndpoints[zone].Union(endpointSet)
}

migrationEndpointsInAddSet := make(map[string]negtypes.NetworkEndpointSet)
migrationEndpointsInRemoveSet := make(map[string]negtypes.NetworkEndpointSet)
for zone, endpointSet := range allEndpoints {
for endpoint := range endpointSet {
if endpoint.IP == "" || endpoint.IPv6 == "" {
// Endpoint is not dual-stack so continue.
continue
}

// At this point, we know that `endpoint` is a dual-stack endpoint. An
// endpoint can be identified as migrating if the IPs from the dual-stack
// endpoint exist as individual single-stack endpoint inside
// `addEndpoints` or `removeEndpoints`.

// Construct single-stack endpoint corresponding to the dual-stack
// endpoint. Their existence will determine if an endpoint is migrating.
ipv4Only := endpoint
ipv4Only.IPv6 = ""
ipv6Only := endpoint
ipv6Only.IP = ""

isMigrating := false
// Check if endpoint is migrating from dual-stack to single-stack
// migration.
isMigrating = isMigrating || moveEndpoint(ipv4Only, addEndpoints, migrationEndpointsInAddSet, zone)
isMigrating = isMigrating || moveEndpoint(ipv6Only, addEndpoints, migrationEndpointsInAddSet, zone)
// Check if endpoint is migrating from single-stack to dual-stack
// migration.
isMigrating = isMigrating || moveEndpoint(ipv4Only, removeEndpoints, migrationEndpointsInRemoveSet, zone)
isMigrating = isMigrating || moveEndpoint(ipv6Only, removeEndpoints, migrationEndpointsInRemoveSet, zone)

if isMigrating {
moveEndpoint(endpoint, addEndpoints, migrationEndpointsInAddSet, zone)
moveEndpoint(endpoint, removeEndpoints, migrationEndpointsInRemoveSet, zone)
}
}
}

return migrationEndpointsInAddSet, migrationEndpointsInRemoveSet
}

// moveEndpoint deletes endpoint `e` from `source[zone]` and adds it to
// `dest[zone]`. If the move was successful, `true` is returned. A return value
// of `false` denotes that nothing was moved and no input variable were
// modified.
func moveEndpoint(e negtypes.NetworkEndpoint, source, dest map[string]negtypes.NetworkEndpointSet, zone string) bool {
if source == nil || dest == nil {
return false
}
if source[zone].Has(e) {
source[zone].Delete(e)
if dest[zone] == nil {
dest[zone] = negtypes.NewNetworkEndpointSet()
}
dest[zone].Insert(e)
return true
}
return false
}

// getService retrieves service object from serviceLister based on the input Namespace and Name
func getService(serviceLister cache.Indexer, namespace, name string) *apiv1.Service {
if serviceLister == nil {
Expand Down
245 changes: 245 additions & 0 deletions pkg/neg/syncers/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,251 @@ func TestNetworkEndpointCalculateDifference(t *testing.T) {
}
}

func TestFindAndFilterMigrationEndpoints(t *testing.T) {
t.Parallel()

testCases := []struct {
name string
addEndpoints map[string]negtypes.NetworkEndpointSet
removeEndpoints map[string]negtypes.NetworkEndpointSet
wantMigrationEndpointsInAddSet map[string]negtypes.NetworkEndpointSet
wantMigrationEndpointsInRemoveSet map[string]negtypes.NetworkEndpointSet
}{
{
name: "detect multiple migrating endpoints",
addEndpoints: map[string]negtypes.NetworkEndpointSet{
"zone1": negtypes.NewNetworkEndpointSet([]negtypes.NetworkEndpoint{
{IP: "a", IPv6: "A"}, // migrating
{IP: "b"},
{IP: "c", IPv6: "C"},
{IP: "d"}, // migrating
}...),
"zone2": negtypes.NewNetworkEndpointSet([]negtypes.NetworkEndpoint{
{IP: "e", IPv6: "E"}, // migrating
}...),
},
removeEndpoints: map[string]negtypes.NetworkEndpointSet{
"zone1": negtypes.NewNetworkEndpointSet([]negtypes.NetworkEndpoint{
{IP: "a"}, // migrating
{IP: "f", IPv6: "F"},
{IP: "d", IPv6: "D"}, // migrating
}...),
"zone2": negtypes.NewNetworkEndpointSet([]negtypes.NetworkEndpoint{
{IPv6: "E"}, // migrating
}...),
},
wantMigrationEndpointsInAddSet: map[string]negtypes.NetworkEndpointSet{
"zone1": negtypes.NewNetworkEndpointSet([]negtypes.NetworkEndpoint{
{IP: "a", IPv6: "A"},
{IP: "d"},
}...),
"zone2": negtypes.NewNetworkEndpointSet([]negtypes.NetworkEndpoint{
{IP: "e", IPv6: "E"},
}...),
},
wantMigrationEndpointsInRemoveSet: map[string]negtypes.NetworkEndpointSet{
"zone1": negtypes.NewNetworkEndpointSet([]negtypes.NetworkEndpoint{
{IP: "a"},
{IP: "d", IPv6: "D"},
}...),
"zone2": negtypes.NewNetworkEndpointSet([]negtypes.NetworkEndpoint{
{IPv6: "E"},
}...),
},
},
{
name: "partial IP change without stack change is not considered migrating",
addEndpoints: map[string]negtypes.NetworkEndpointSet{
"zone1": negtypes.NewNetworkEndpointSet([]negtypes.NetworkEndpoint{
{IP: "a", IPv6: "A"},
}...),
},
removeEndpoints: map[string]negtypes.NetworkEndpointSet{
"zone1": negtypes.NewNetworkEndpointSet([]negtypes.NetworkEndpoint{
{IP: "a", Port: "B"},
}...),
},
wantMigrationEndpointsInAddSet: map[string]negtypes.NetworkEndpointSet{},
wantMigrationEndpointsInRemoveSet: map[string]negtypes.NetworkEndpointSet{},
},
{
name: "difference in port or node is not considered migrating",
addEndpoints: map[string]negtypes.NetworkEndpointSet{
"zone1": negtypes.NewNetworkEndpointSet([]negtypes.NetworkEndpoint{
{IP: "a", IPv6: "A", Port: "80"},
{IP: "b", Node: "node2"},
}...),
},
removeEndpoints: map[string]negtypes.NetworkEndpointSet{
"zone1": negtypes.NewNetworkEndpointSet([]negtypes.NetworkEndpoint{
{IP: "a", Port: "81"},
{IP: "b", IPv6: "B", Node: "node1"},
}...),
},
wantMigrationEndpointsInAddSet: map[string]negtypes.NetworkEndpointSet{},
wantMigrationEndpointsInRemoveSet: map[string]negtypes.NetworkEndpointSet{},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
gotMigrationEndpointsInAddSet, gotMigrationEndpointsInRemoveSet := findAndFilterMigrationEndpoints(tc.addEndpoints, tc.removeEndpoints)

if diff := cmp.Diff(tc.wantMigrationEndpointsInAddSet, gotMigrationEndpointsInAddSet); diff != "" {
t.Errorf("findAndFilterMigrationEndpoints(tc.addEndpoints, tc.removeEndpoints) returned unexpected diff for migrationEndpointsInAddSet (-want +got):\n%s", diff)
}
if diff := cmp.Diff(tc.wantMigrationEndpointsInRemoveSet, gotMigrationEndpointsInRemoveSet); diff != "" {
t.Errorf("findAndFilterMigrationEndpoints(tc.addEndpoints, tc.removeEndpoints) returned unexpected diff for migrationEndpointsInRemoveSet (-want +got):\n%s", diff)
}
})
}
}

func TestMoveEndpoint(t *testing.T) {
t.Parallel()

testCases := []struct {
name string
endpoint negtypes.NetworkEndpoint
inputSource map[string]negtypes.NetworkEndpointSet
inputDest map[string]negtypes.NetworkEndpointSet
wantSource map[string]negtypes.NetworkEndpointSet
wantDest map[string]negtypes.NetworkEndpointSet
zone string
wantSuccess bool
}{
{
name: "normal",
endpoint: negtypes.NetworkEndpoint{IP: "a", IPv6: "A"},
inputSource: map[string]negtypes.NetworkEndpointSet{
"zone1": negtypes.NewNetworkEndpointSet([]negtypes.NetworkEndpoint{
{IP: "a", IPv6: "A"},
{IP: "b", IPv6: "B"},
}...),
},
inputDest: map[string]negtypes.NetworkEndpointSet{
"zone1": negtypes.NewNetworkEndpointSet([]negtypes.NetworkEndpoint{
{IP: "c", IPv6: "C"},
}...),
},
wantSource: map[string]negtypes.NetworkEndpointSet{
"zone1": negtypes.NewNetworkEndpointSet([]negtypes.NetworkEndpoint{
{IP: "b", IPv6: "B"},
}...),
},
wantDest: map[string]negtypes.NetworkEndpointSet{
"zone1": negtypes.NewNetworkEndpointSet([]negtypes.NetworkEndpoint{
{IP: "a", IPv6: "A"},
{IP: "c", IPv6: "C"},
}...),
},
zone: "zone1",
wantSuccess: true,
},
{
name: "zone does not exist in source",
endpoint: negtypes.NetworkEndpoint{IP: "a", IPv6: "A"},
inputSource: map[string]negtypes.NetworkEndpointSet{
"zone1": negtypes.NewNetworkEndpointSet([]negtypes.NetworkEndpoint{
{IP: "a", IPv6: "A"},
}...),
},
inputDest: map[string]negtypes.NetworkEndpointSet{
"zone3": negtypes.NewNetworkEndpointSet([]negtypes.NetworkEndpoint{
{IP: "c", IPv6: "C"},
}...),
},
wantSource: map[string]negtypes.NetworkEndpointSet{
"zone1": negtypes.NewNetworkEndpointSet([]negtypes.NetworkEndpoint{
{IP: "a", IPv6: "A"},
}...),
},
wantDest: map[string]negtypes.NetworkEndpointSet{
"zone3": negtypes.NewNetworkEndpointSet([]negtypes.NetworkEndpoint{
{IP: "c", IPv6: "C"},
}...),
},
zone: "zone3",
},
{
name: "zone does not exist in destination",
endpoint: negtypes.NetworkEndpoint{IP: "a", IPv6: "A"},
inputSource: map[string]negtypes.NetworkEndpointSet{
"zone1": negtypes.NewNetworkEndpointSet([]negtypes.NetworkEndpoint{
{IP: "a", IPv6: "A"},
{IP: "b", IPv6: "B"},
}...),
},
inputDest: map[string]negtypes.NetworkEndpointSet{
"zone2": negtypes.NewNetworkEndpointSet([]negtypes.NetworkEndpoint{
{IP: "c", IPv6: "C"},
}...),
},
wantSource: map[string]negtypes.NetworkEndpointSet{
"zone1": negtypes.NewNetworkEndpointSet([]negtypes.NetworkEndpoint{
{IP: "b", IPv6: "B"},
}...),
},
wantDest: map[string]negtypes.NetworkEndpointSet{
"zone1": negtypes.NewNetworkEndpointSet([]negtypes.NetworkEndpoint{
{IP: "a", IPv6: "A"},
}...),
"zone2": negtypes.NewNetworkEndpointSet([]negtypes.NetworkEndpoint{
{IP: "c", IPv6: "C"},
}...),
},
zone: "zone1",
wantSuccess: true,
},
{
name: "source is nil",
endpoint: negtypes.NetworkEndpoint{IP: "a", IPv6: "A"},
inputDest: map[string]negtypes.NetworkEndpointSet{
"zone3": negtypes.NewNetworkEndpointSet([]negtypes.NetworkEndpoint{
{IP: "c", IPv6: "C"},
}...),
},
wantDest: map[string]negtypes.NetworkEndpointSet{
"zone3": negtypes.NewNetworkEndpointSet([]negtypes.NetworkEndpoint{
{IP: "c", IPv6: "C"},
}...),
},
zone: "zone3",
},
{
name: "destination is nil",
endpoint: negtypes.NetworkEndpoint{IP: "a", IPv6: "A"},
inputSource: map[string]negtypes.NetworkEndpointSet{
"zone3": negtypes.NewNetworkEndpointSet([]negtypes.NetworkEndpoint{
{IP: "c", IPv6: "C"},
}...),
},
wantSource: map[string]negtypes.NetworkEndpointSet{
"zone3": negtypes.NewNetworkEndpointSet([]negtypes.NetworkEndpoint{
{IP: "c", IPv6: "C"},
}...),
},
zone: "zone3",
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
gotSuccess := moveEndpoint(tc.endpoint, tc.inputSource, tc.inputDest, tc.zone)

if gotSuccess != tc.wantSuccess {
t.Errorf("moveEndpoint(%v, ...) = %v, want = %v", tc.endpoint, gotSuccess, tc.wantSuccess)
}
if diff := cmp.Diff(tc.wantSource, tc.inputSource); diff != "" {
t.Errorf("moveEndpoint(%v, ...) returned unexpected diff for source (-want +got):\n%s", tc.endpoint, diff)
}
if diff := cmp.Diff(tc.wantDest, tc.inputDest); diff != "" {
t.Errorf("moveEndpoint(%v, ...) returned unexpected diff for destination (-want +got):\n%s", tc.endpoint, diff)
}
})
}
}

func TestEnsureNetworkEndpointGroup(t *testing.T) {
var (
testZone = "test-zone"
Expand Down

0 comments on commit 8c62032

Please sign in to comment.