diff --git a/pkg/neg/syncers/transaction.go b/pkg/neg/syncers/transaction.go index c0a186f10e..7922cd9578 100644 --- a/pkg/neg/syncers/transaction.go +++ b/pkg/neg/syncers/transaction.go @@ -285,6 +285,14 @@ func (s *transactionSyncer) syncInternalImpl() error { addEndpoints, removeEndpoints := calculateNetworkEndpointDifference(targetMap, currentMap) // Calculate Pods that are already in the NEG _, committedEndpoints := calculateNetworkEndpointDifference(addEndpoints, targetMap) + + if s.enableDualStackNEG { + // Identification of migration endpoints should happen before we filter out + // the transaction endpoints. Not doing so could result in an attempt to + // attach an endpoint which is still undergoing detachment-due-to-migration. + findAndFilterMigrationEndpoints(addEndpoints, removeEndpoints) + } + // Filter out the endpoints with existing transaction // This mostly happens when transaction entry require reconciliation but the transaction is still progress // e.g. endpoint A is in the process of adding to NEG N, and the new desire state is not to have A in N. diff --git a/pkg/neg/syncers/utils.go b/pkg/neg/syncers/utils.go index 223cba2745..831c2622f3 100644 --- a/pkg/neg/syncers/utils.go +++ b/pkg/neg/syncers/utils.go @@ -99,6 +99,75 @@ func calculateNetworkEndpointDifference(targetMap, currentMap map[string]negtype return addSet, removeSet } +// findAndFilterMigrationEndpoints will filter out the migration endpoints from +// the `addEndpoints` and `removeEndpoints` sets. The passed sets will get +// modified. The returned value will be two endpoints sets which will contain +// the values that were filtered out from the `addEndpoints` and +// `removeEndpoints` sets respectively. +// +// TODO(gauravkghildiyal): This function should be moved alongside +// DualStackMigrator once that gets merged. +func findAndFilterMigrationEndpoints(addEndpoints, removeEndpoints map[string]negtypes.NetworkEndpointSet) (map[string]negtypes.NetworkEndpointSet, map[string]negtypes.NetworkEndpointSet) { + allEndpoints := make(map[string]negtypes.NetworkEndpointSet) + for zone, endpointSet := range addEndpoints { + allEndpoints[zone] = allEndpoints[zone].Union(endpointSet) + } + for zone, endpointSet := range removeEndpoints { + allEndpoints[zone] = allEndpoints[zone].Union(endpointSet) + } + + migrationEndpointsInAddSet := make(map[string]negtypes.NetworkEndpointSet) + migrationEndpointsInRemoveSet := make(map[string]negtypes.NetworkEndpointSet) + for zone, endpointSet := range allEndpoints { + for endpoint := range endpointSet { + if endpoint.IP == "" || endpoint.IPv6 == "" { + // Endpoint is not dual-stack so continue. + continue + } + + // At this point, we know that `endpoint` is a dual-stack endpoint. An + // endpoint can be identified as migrating if the IPs from the dual-stack + // endpoint exist as individual single-stack endpoint inside + // `addEndpoints` or `removeEndpoints`. + + ipv4Only := endpoint + ipv4Only.IPv6 = "" + ipv6Only := endpoint + ipv6Only.IP = "" + + isMigrating := false + isMigrating = isMigrating || moveEndpoint(ipv4Only, addEndpoints, migrationEndpointsInAddSet, zone) + isMigrating = isMigrating || moveEndpoint(ipv6Only, addEndpoints, migrationEndpointsInAddSet, zone) + isMigrating = isMigrating || moveEndpoint(ipv4Only, removeEndpoints, migrationEndpointsInRemoveSet, zone) + isMigrating = isMigrating || moveEndpoint(ipv6Only, removeEndpoints, migrationEndpointsInRemoveSet, zone) + + if isMigrating { + moveEndpoint(endpoint, addEndpoints, migrationEndpointsInAddSet, zone) + moveEndpoint(endpoint, removeEndpoints, migrationEndpointsInRemoveSet, zone) + } + } + } + + return migrationEndpointsInAddSet, migrationEndpointsInRemoveSet +} + +// moveEndpoint deletes endpoint `e` from `source[zone]` and adds it to +// `dest[zone]`. If the move was successful, `true` is returned. +func moveEndpoint(e negtypes.NetworkEndpoint, source, dest map[string]negtypes.NetworkEndpointSet, zone string) bool { + if source == nil || dest == nil { + return false + } + if source[zone].Has(e) { + source[zone].Delete(e) + if dest[zone] == nil { + dest[zone] = negtypes.NewNetworkEndpointSet() + } + dest[zone].Insert(e) + return true + } + return false +} + // getService retrieves service object from serviceLister based on the input Namespace and Name func getService(serviceLister cache.Indexer, namespace, name string) *apiv1.Service { if serviceLister == nil { diff --git a/pkg/neg/syncers/utils_test.go b/pkg/neg/syncers/utils_test.go index 8bab128aa8..22886c011e 100644 --- a/pkg/neg/syncers/utils_test.go +++ b/pkg/neg/syncers/utils_test.go @@ -300,6 +300,236 @@ func TestNetworkEndpointCalculateDifference(t *testing.T) { } } +func TestFindAndFilterMigrationEndpoints(t *testing.T) { + t.Parallel() + + testCases := []struct { + desc string + addEndpoints map[string]negtypes.NetworkEndpointSet + removeEndpoints map[string]negtypes.NetworkEndpointSet + wantMigrationEndpointsInAddSet map[string]negtypes.NetworkEndpointSet + wantMigrationEndpointsInRemoveSet map[string]negtypes.NetworkEndpointSet + }{ + { + desc: "normal", + addEndpoints: map[string]negtypes.NetworkEndpointSet{ + "zone1": negtypes.NewNetworkEndpointSet([]negtypes.NetworkEndpoint{ + {IP: "a", IPv6: "A"}, // migrating + {IP: "b"}, + {IP: "c", IPv6: "C"}, + {IP: "d"}, // migrating + }...), + "zone2": negtypes.NewNetworkEndpointSet([]negtypes.NetworkEndpoint{ + {IP: "e", IPv6: "E"}, // migrating + }...), + }, + removeEndpoints: map[string]negtypes.NetworkEndpointSet{ + "zone1": negtypes.NewNetworkEndpointSet([]negtypes.NetworkEndpoint{ + {IP: "a"}, // migrating + {IP: "e", IPv6: "E"}, + {IP: "d", IPv6: "D"}, // migrating + }...), + "zone2": negtypes.NewNetworkEndpointSet([]negtypes.NetworkEndpoint{ + {IPv6: "E"}, // migrating + }...), + }, + wantMigrationEndpointsInAddSet: map[string]negtypes.NetworkEndpointSet{ + "zone1": negtypes.NewNetworkEndpointSet([]negtypes.NetworkEndpoint{ + {IP: "a", IPv6: "A"}, + {IP: "d"}, + }...), + "zone2": negtypes.NewNetworkEndpointSet([]negtypes.NetworkEndpoint{ + {IP: "e", IPv6: "E"}, + }...), + }, + wantMigrationEndpointsInRemoveSet: map[string]negtypes.NetworkEndpointSet{ + "zone1": negtypes.NewNetworkEndpointSet([]negtypes.NetworkEndpoint{ + {IP: "a"}, + {IP: "d", IPv6: "D"}, + }...), + "zone2": negtypes.NewNetworkEndpointSet([]negtypes.NetworkEndpoint{ + {IPv6: "E"}, + }...), + }, + }, + { + desc: "partial IP change without stack change is not considered migrating", + addEndpoints: map[string]negtypes.NetworkEndpointSet{ + "zone1": negtypes.NewNetworkEndpointSet([]negtypes.NetworkEndpoint{ + {IP: "a", IPv6: "A"}, + }...), + }, + removeEndpoints: map[string]negtypes.NetworkEndpointSet{ + "zone1": negtypes.NewNetworkEndpointSet([]negtypes.NetworkEndpoint{ + {IP: "a", Port: "B"}, + }...), + }, + wantMigrationEndpointsInAddSet: map[string]negtypes.NetworkEndpointSet{}, + wantMigrationEndpointsInRemoveSet: map[string]negtypes.NetworkEndpointSet{}, + }, + { + desc: "difference in port or node is not considered migrating", + addEndpoints: map[string]negtypes.NetworkEndpointSet{ + "zone1": negtypes.NewNetworkEndpointSet([]negtypes.NetworkEndpoint{ + {IP: "a", IPv6: "A", Port: "80"}, + {IP: "b", Node: "node2"}, + }...), + }, + removeEndpoints: map[string]negtypes.NetworkEndpointSet{ + "zone1": negtypes.NewNetworkEndpointSet([]negtypes.NetworkEndpoint{ + {IP: "a", Port: "81"}, + {IP: "b", IPv6: "B", Node: "node1"}, + }...), + }, + wantMigrationEndpointsInAddSet: map[string]negtypes.NetworkEndpointSet{}, + wantMigrationEndpointsInRemoveSet: map[string]negtypes.NetworkEndpointSet{}, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + gotMigrationEndpointsInAddSet, gotMigrationEndpointsInRemoveSet := findAndFilterMigrationEndpoints(tc.addEndpoints, tc.removeEndpoints) + + if diff := cmp.Diff(tc.wantMigrationEndpointsInAddSet, gotMigrationEndpointsInAddSet); diff != "" { + t.Errorf("findAndFilterMigrationEndpoints(tc.addEndpoints, tc.removeEndpoints) returned unexpected diff for migrationEndpointsInAddSet (-want +got):\n%s", diff) + } + if diff := cmp.Diff(tc.wantMigrationEndpointsInRemoveSet, gotMigrationEndpointsInRemoveSet); diff != "" { + t.Errorf("findAndFilterMigrationEndpoints(tc.addEndpoints, tc.removeEndpoints) returned unexpected diff for migrationEndpointsInRemoveSet (-want +got):\n%s", diff) + } + }) + } +} + +func TestMoveEndpoint(t *testing.T) { + t.Parallel() + + testCases := []struct { + desc string + endpoint negtypes.NetworkEndpoint + inputSource map[string]negtypes.NetworkEndpointSet + inputDest map[string]negtypes.NetworkEndpointSet + wantSource map[string]negtypes.NetworkEndpointSet + wantDest map[string]negtypes.NetworkEndpointSet + zone string + wantSuccess bool + }{ + { + desc: "normal", + endpoint: negtypes.NetworkEndpoint{IP: "a", IPv6: "A"}, + inputSource: map[string]negtypes.NetworkEndpointSet{ + "zone1": negtypes.NewNetworkEndpointSet([]negtypes.NetworkEndpoint{ + {IP: "a", IPv6: "A"}, + {IP: "b", IPv6: "B"}, + }...), + }, + inputDest: map[string]negtypes.NetworkEndpointSet{ + "zone1": negtypes.NewNetworkEndpointSet([]negtypes.NetworkEndpoint{ + {IP: "c", IPv6: "C"}, + }...), + }, + wantSource: map[string]negtypes.NetworkEndpointSet{ + "zone1": negtypes.NewNetworkEndpointSet([]negtypes.NetworkEndpoint{ + {IP: "b", IPv6: "B"}, + }...), + }, + wantDest: map[string]negtypes.NetworkEndpointSet{ + "zone1": negtypes.NewNetworkEndpointSet([]negtypes.NetworkEndpoint{ + {IP: "a", IPv6: "A"}, + {IP: "c", IPv6: "C"}, + }...), + }, + zone: "zone1", + wantSuccess: true, + }, + { + desc: "zone does not exist in source", + endpoint: negtypes.NetworkEndpoint{IP: "a", IPv6: "A"}, + inputSource: map[string]negtypes.NetworkEndpointSet{ + "zone1": negtypes.NewNetworkEndpointSet([]negtypes.NetworkEndpoint{ + {IP: "a", IPv6: "A"}, + }...), + }, + inputDest: map[string]negtypes.NetworkEndpointSet{ + "zone3": negtypes.NewNetworkEndpointSet([]negtypes.NetworkEndpoint{ + {IP: "c", IPv6: "C"}, + }...), + }, + wantSource: map[string]negtypes.NetworkEndpointSet{ + "zone1": negtypes.NewNetworkEndpointSet([]negtypes.NetworkEndpoint{ + {IP: "a", IPv6: "A"}, + }...), + }, + wantDest: map[string]negtypes.NetworkEndpointSet{ + "zone3": negtypes.NewNetworkEndpointSet([]negtypes.NetworkEndpoint{ + {IP: "c", IPv6: "C"}, + }...), + }, + zone: "zone3", + }, + { + desc: "zone does not exist in destination", + endpoint: negtypes.NetworkEndpoint{IP: "a", IPv6: "A"}, + inputSource: map[string]negtypes.NetworkEndpointSet{ + "zone1": negtypes.NewNetworkEndpointSet([]negtypes.NetworkEndpoint{ + {IP: "a", IPv6: "A"}, + {IP: "b", IPv6: "B"}, + }...), + }, + inputDest: map[string]negtypes.NetworkEndpointSet{ + "zone2": negtypes.NewNetworkEndpointSet([]negtypes.NetworkEndpoint{ + {IP: "c", IPv6: "C"}, + }...), + }, + wantSource: map[string]negtypes.NetworkEndpointSet{ + "zone1": negtypes.NewNetworkEndpointSet([]negtypes.NetworkEndpoint{ + {IP: "b", IPv6: "B"}, + }...), + }, + wantDest: map[string]negtypes.NetworkEndpointSet{ + "zone1": negtypes.NewNetworkEndpointSet([]negtypes.NetworkEndpoint{ + {IP: "a", IPv6: "A"}, + }...), + "zone2": negtypes.NewNetworkEndpointSet([]negtypes.NetworkEndpoint{ + {IP: "c", IPv6: "C"}, + }...), + }, + zone: "zone1", + wantSuccess: true, + }, + { + desc: "source is nil", + endpoint: negtypes.NetworkEndpoint{IP: "a", IPv6: "A"}, + inputDest: map[string]negtypes.NetworkEndpointSet{ + "zone3": negtypes.NewNetworkEndpointSet([]negtypes.NetworkEndpoint{ + {IP: "c", IPv6: "C"}, + }...), + }, + wantDest: map[string]negtypes.NetworkEndpointSet{ + "zone3": negtypes.NewNetworkEndpointSet([]negtypes.NetworkEndpoint{ + {IP: "c", IPv6: "C"}, + }...), + }, + zone: "zone3", + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + gotSuccess := moveEndpoint(tc.endpoint, tc.inputSource, tc.inputDest, tc.zone) + + if gotSuccess != tc.wantSuccess { + t.Errorf("moveEndpoint(%v, ...) = %v, want = %v", tc.endpoint, gotSuccess, tc.wantSuccess) + } + if diff := cmp.Diff(tc.wantSource, tc.inputSource); diff != "" { + t.Errorf("moveEndpoint(%v, ...) returned unexpected diff for source (-want +got):\n%s", tc.endpoint, diff) + } + if diff := cmp.Diff(tc.wantDest, tc.inputDest); diff != "" { + t.Errorf("moveEndpoint(%v, ...) returned unexpected diff for destination (-want +got):\n%s", tc.endpoint, diff) + } + }) + } +} + func TestEnsureNetworkEndpointGroup(t *testing.T) { var ( testZone = "test-zone"