diff --git a/cmd/glbc/main.go b/cmd/glbc/main.go index 0e6634fa0c..86955d9c7e 100644 --- a/cmd/glbc/main.go +++ b/cmd/glbc/main.go @@ -198,7 +198,6 @@ func main() { EnableASMConfigMap: flags.F.EnableASMConfigMapBasedConfig, ASMConfigMapNamespace: flags.F.ASMConfigMapBasedConfigNamespace, ASMConfigMapName: flags.F.ASMConfigMapBasedConfigCMName, - EndpointSlicesEnabled: flags.F.EnableEndpointSlices, MaxIGSize: flags.F.MaxIGSize, EnableL4ILBDualStack: flags.F.EnableL4ILBDualStack, EnableL4NetLBDualStack: flags.F.EnableL4NetLBDualStack, @@ -323,7 +322,6 @@ func runControllers(ctx *ingctx.ControllerContext) { ctx.ServiceInformer, ctx.PodInformer, ctx.NodeInformer, - ctx.EndpointInformer, ctx.EndpointSliceInformer, ctx.SvcNegInformer, ctx.HasSynced, @@ -341,7 +339,6 @@ func runControllers(ctx *ingctx.ControllerContext) { flags.F.EnableNonGCPMode, enableAsm, asmServiceNEGSkipNamespaces, - flags.F.EnableEndpointSlices, klog.TODO(), // TODO(#1761): Replace this with a top level logger configuration once one is available. ) diff --git a/pkg/context/context.go b/pkg/context/context.go index 225edce1af..6a7e659bfb 100644 --- a/pkg/context/context.go +++ b/pkg/context/context.go @@ -85,9 +85,7 @@ type ControllerContext struct { FrontendConfigInformer cache.SharedIndexInformer PodInformer cache.SharedIndexInformer NodeInformer cache.SharedIndexInformer - EndpointInformer cache.SharedIndexInformer EndpointSliceInformer cache.SharedIndexInformer - UseEndpointSlices bool ConfigMapInformer cache.SharedIndexInformer SvcNegInformer cache.SharedIndexInformer IngClassInformer cache.SharedIndexInformer @@ -124,7 +122,6 @@ type ControllerContextConfig struct { EnableASMConfigMap bool ASMConfigMapNamespace string ASMConfigMapName string - EndpointSlicesEnabled bool MaxIGSize int EnableL4ILBDualStack bool EnableL4NetLBDualStack bool @@ -180,25 +177,18 @@ func NewControllerContext( context.RegionalCluster = true } - context.UseEndpointSlices = config.EndpointSlicesEnabled - // Do not trigger periodic resync on Endpoints or EndpointSlices object. + // Do not trigger periodic resync on EndpointSlices object. // This aims improve NEG controller performance by avoiding unnecessary NEG sync that triggers for each NEG syncer. // As periodic resync may temporary starve NEG API ratelimit quota. - if config.EndpointSlicesEnabled { - context.EndpointSliceInformer = discoveryinformer.NewEndpointSliceInformer(kubeClient, config.Namespace, 0, - cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc, endpointslices.EndpointSlicesByServiceIndex: endpointslices.EndpointSlicesByServiceFunc}) - } else { - context.EndpointInformer = informerv1.NewEndpointsInformer(kubeClient, config.Namespace, 0, utils.NewNamespaceIndexer()) - } + context.EndpointSliceInformer = discoveryinformer.NewEndpointSliceInformer(kubeClient, config.Namespace, 0, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc, endpointslices.EndpointSlicesByServiceIndex: endpointslices.EndpointSlicesByServiceFunc}) context.Translator = translator.NewTranslator( context.ServiceInformer, context.BackendConfigInformer, context.NodeInformer, context.PodInformer, - context.EndpointInformer, context.EndpointSliceInformer, - context.UseEndpointSlices, context.KubeClient, ) context.InstancePool = instancegroups.NewManager(&instancegroups.ManagerConfig{ @@ -249,14 +239,7 @@ func (ctx *ControllerContext) HasSynced() bool { ctx.PodInformer.HasSynced, ctx.NodeInformer.HasSynced, ctx.SvcNegInformer.HasSynced, - } - - if ctx.EndpointInformer != nil { - funcs = append(funcs, ctx.EndpointInformer.HasSynced) - } - - if ctx.EndpointSliceInformer != nil { - funcs = append(funcs, ctx.EndpointSliceInformer.HasSynced) + ctx.EndpointSliceInformer.HasSynced, } if ctx.FrontendConfigInformer != nil { @@ -336,12 +319,8 @@ func (ctx *ControllerContext) Start(stopCh chan struct{}) { go ctx.ServiceInformer.Run(stopCh) go ctx.PodInformer.Run(stopCh) go ctx.NodeInformer.Run(stopCh) - if ctx.EndpointInformer != nil { - go ctx.EndpointInformer.Run(stopCh) - } - if ctx.EndpointSliceInformer != nil { - go ctx.EndpointSliceInformer.Run(stopCh) - } + go ctx.EndpointSliceInformer.Run(stopCh) + if ctx.BackendConfigInformer != nil { go ctx.BackendConfigInformer.Run(stopCh) } diff --git a/pkg/controller/translator/translator.go b/pkg/controller/translator/translator.go index ba8e60e930..d654f4ea11 100644 --- a/pkg/controller/translator/translator.go +++ b/pkg/controller/translator/translator.go @@ -63,18 +63,14 @@ func NewTranslator(serviceInformer cache.SharedIndexInformer, backendConfigInformer cache.SharedIndexInformer, nodeInformer cache.SharedIndexInformer, podInformer cache.SharedIndexInformer, - endpointInformer cache.SharedIndexInformer, endpointSliceInformer cache.SharedIndexInformer, - useEndpointSlices bool, kubeClient kubernetes.Interface) *Translator { return &Translator{ serviceInformer, backendConfigInformer, nodeInformer, podInformer, - endpointInformer, endpointSliceInformer, - useEndpointSlices, kubeClient, } } @@ -85,9 +81,7 @@ type Translator struct { BackendConfigInformer cache.SharedIndexInformer NodeInformer cache.SharedIndexInformer PodInformer cache.SharedIndexInformer - EndpointInformer cache.SharedIndexInformer EndpointSliceInformer cache.SharedIndexInformer - UseEndpointSlices bool KubeClient kubernetes.Interface } @@ -492,11 +486,7 @@ func (t *Translator) GatherEndpointPorts(svcPorts []utils.ServicePort) []string if p.TargetPort.Type == intstr.Int { endpointPorts = []int{p.TargetPort.IntValue()} } else { - if t.UseEndpointSlices { - endpointPorts = listEndpointTargetPortsFromEndpointSlices(t.EndpointSliceInformer.GetIndexer(), p.ID.Service.Namespace, p.ID.Service.Name, p.PortName) - } else { - endpointPorts = listEndpointTargetPortsFromEndpoints(t.EndpointInformer.GetIndexer(), p.ID.Service.Namespace, p.ID.Service.Name, p.PortName) - } + endpointPorts = listEndpointTargetPortsFromEndpointSlices(t.EndpointSliceInformer.GetIndexer(), p.ID.Service.Namespace, p.ID.Service.Name, p.PortName) } for _, ep := range endpointPorts { portMap[int64(ep)] = true @@ -609,37 +599,6 @@ func listEndpointTargetPortsFromEndpointSlices(indexer cache.Indexer, namespace, return ret } -// finds the actual target port behind named target port, the name of the target port is the same as service port name -func listEndpointTargetPortsFromEndpoints(indexer cache.Indexer, namespace, name, svcPortName string) []int { - ep, exists, err := indexer.Get( - &api_v1.Endpoints{ - ObjectMeta: meta_v1.ObjectMeta{ - Name: name, - Namespace: namespace, - }, - }, - ) - - if !exists { - klog.Errorf("Endpoint object %v/%v does not exist.", namespace, name) - return []int{} - } - if err != nil { - klog.Errorf("Failed to retrieve endpoint object %v/%v: %v", namespace, name, err) - return []int{} - } - - ret := []int{} - for _, subset := range ep.(*api_v1.Endpoints).Subsets { - for _, port := range subset.Ports { - if port.Protocol == api_v1.ProtocolTCP && port.Name == svcPortName { - ret = append(ret, int(port.Port)) - } - } - } - return ret -} - // orderedPods sorts a list of Pods by creation timestamp, using their names as a tie breaker. type orderedPods []*api_v1.Pod diff --git a/pkg/controller/translator/translator_test.go b/pkg/controller/translator/translator_test.go index 8bed73e1f2..077e03d79c 100644 --- a/pkg/controller/translator/translator_test.go +++ b/pkg/controller/translator/translator_test.go @@ -63,10 +63,10 @@ var ( ) func fakeTranslator() *Translator { - return configuredFakeTranslator(false) + return configuredFakeTranslator() } -func configuredFakeTranslator(useEndpointSlices bool) *Translator { +func configuredFakeTranslator() *Translator { client := fake.NewSimpleClientset() backendConfigClient := backendconfigclient.NewSimpleClientset() namespace := apiv1.NamespaceAll @@ -76,22 +76,14 @@ func configuredFakeTranslator(useEndpointSlices bool) *Translator { BackendConfigInformer := informerbackendconfig.NewBackendConfigInformer(backendConfigClient, namespace, resyncPeriod, utils.NewNamespaceIndexer()) PodInformer := informerv1.NewPodInformer(client, namespace, resyncPeriod, utils.NewNamespaceIndexer()) NodeInformer := informerv1.NewNodeInformer(client, resyncPeriod, utils.NewNamespaceIndexer()) - var EndpointSliceInformer cache.SharedIndexInformer - var EndpointInformer cache.SharedIndexInformer - if useEndpointSlices { - EndpointSliceInformer = discoveryinformer.NewEndpointSliceInformer(client, namespace, 0, - cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc, endpointslices.EndpointSlicesByServiceIndex: endpointslices.EndpointSlicesByServiceFunc}) - } else { - EndpointInformer = informerv1.NewEndpointsInformer(client, namespace, 0, utils.NewNamespaceIndexer()) - } + EndpointSliceInformer := discoveryinformer.NewEndpointSliceInformer(client, namespace, 0, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc, endpointslices.EndpointSlicesByServiceIndex: endpointslices.EndpointSlicesByServiceFunc}) return NewTranslator( ServiceInformer, BackendConfigInformer, NodeInformer, PodInformer, - EndpointInformer, EndpointSliceInformer, - useEndpointSlices, client, ) } @@ -933,25 +925,16 @@ func TestGatherEndpointPorts(t *testing.T) { for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { - for _, useSlices := range []bool{false, true} { - translator := configuredFakeTranslator(useSlices) - // Add endpoints or endpoint slices to informers. - if useSlices { - endpointSliceLister := translator.EndpointSliceInformer.GetIndexer() - for _, slice := range tc.endpointSlices { - endpointSliceLister.Add(slice) - } - } else { - endpointLister := translator.EndpointInformer.GetIndexer() - for _, ep := range tc.endpoints { - endpointLister.Add(ep) - } - } + translator := configuredFakeTranslator() + // Add endpoints or endpoint slices to informers. + endpointSliceLister := translator.EndpointSliceInformer.GetIndexer() + for _, slice := range tc.endpointSlices { + endpointSliceLister.Add(slice) + } - gotPorts := translator.GatherEndpointPorts(tc.svcPorts) - if !sets.NewString(gotPorts...).Equal(sets.NewString(tc.expectedPorts...)) { - t.Errorf("GatherEndpointPorts() = %v, expected %v (using slices: %v)", gotPorts, tc.expectedPorts, useSlices) - } + gotPorts := translator.GatherEndpointPorts(tc.svcPorts) + if !sets.NewString(gotPorts...).Equal(sets.NewString(tc.expectedPorts...)) { + t.Errorf("GatherEndpointPorts() = %v, expected %v", gotPorts, tc.expectedPorts) } }) } diff --git a/pkg/flags/flags.go b/pkg/flags/flags.go index 9bc2d0ae42..78b6865c28 100644 --- a/pkg/flags/flags.go +++ b/pkg/flags/flags.go @@ -253,7 +253,6 @@ L7 load balancing. CSV values accepted. Example: -node-port-ranges=80,8080,400-5 flag.StringVar(&F.GKEClusterHash, "gke-cluster-hash", "", "The cluster hash of the GKE cluster this Ingress Controller will be interacting with") flag.StringVar(&F.GKEClusterType, "gke-cluster-type", "ZONAL", "The cluster type of the GKE cluster this Ingress Controller will be interacting with") flag.BoolVar(&F.EnableTrafficScaling, "enable-traffic-scaling", false, "Enable support for Service {max-rate-per-endpoint, capacity-scaler}") - flag.BoolVar(&F.EnableEndpointSlices, "enable-endpoint-slices", false, "Enable using Endpoint Slices API instead of Endpoints API") flag.BoolVar(&F.EnablePinhole, "enable-pinhole", false, "Enable Pinhole firewall feature") flag.BoolVar(&F.EnableL4ILBDualStack, "enable-l4ilb-dual-stack", false, "Enable Dual-Stack handling for L4 Internal Load Balancers") flag.BoolVar(&F.EnableL4NetLBDualStack, "enable-l4netlb-dual-stack", false, "Enable Dual-Stack handling for L4 External Load Balancers") diff --git a/pkg/neg/controller.go b/pkg/neg/controller.go index c1666c4c6c..c66c297c9d 100644 --- a/pkg/neg/controller.go +++ b/pkg/neg/controller.go @@ -111,7 +111,6 @@ func NewController( serviceInformer cache.SharedIndexInformer, podInformer cache.SharedIndexInformer, nodeInformer cache.SharedIndexInformer, - endpointInformer cache.SharedIndexInformer, endpointSliceInformer cache.SharedIndexInformer, svcNegInformer cache.SharedIndexInformer, hasSynced func() bool, @@ -129,7 +128,6 @@ func NewController( enableNonGcpMode bool, enableAsm bool, asmServiceNEGSkipNamespaces []string, - enableEndpointSlices bool, logger klog.Logger, ) *Controller { logger = logger.WithName("NEGController") @@ -153,13 +151,6 @@ func NewController( recorder := eventBroadcaster.NewRecorder(negScheme, apiv1.EventSource{Component: "neg-controller"}) - var endpointIndexer, endpointSliceIndexer cache.Indexer - if enableEndpointSlices { - endpointSliceIndexer = endpointSliceInformer.GetIndexer() - } else { - endpointIndexer = endpointInformer.GetIndexer() - } - syncerMetrics := metrics.NewNegMetricsCollector(flags.F.NegMetricsExportInterval, logger) manager := newSyncerManager( namer, @@ -170,13 +161,11 @@ func NewController( kubeSystemUID, podInformer.GetIndexer(), serviceInformer.GetIndexer(), - endpointIndexer, - endpointSliceIndexer, + endpointSliceInformer.GetIndexer(), nodeInformer.GetIndexer(), svcNegInformer.GetIndexer(), syncerMetrics, enableNonGcpMode, - enableEndpointSlices, logger) var reflector readiness.Reflector @@ -269,24 +258,13 @@ func NewController( negController.enqueueService(cur) }, }) - if enableEndpointSlices { - endpointSliceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: negController.enqueueEndpointSlice, - DeleteFunc: negController.enqueueEndpointSlice, - UpdateFunc: func(old, cur interface{}) { - negController.enqueueEndpointSlice(cur) - }, - }) - } else { - endpointInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: negController.enqueueEndpoint, - DeleteFunc: negController.enqueueEndpoint, - UpdateFunc: func(old, cur interface{}) { - negController.enqueueEndpoint(cur) - }, - }) - } - + endpointSliceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: negController.enqueueEndpointSlice, + DeleteFunc: negController.enqueueEndpointSlice, + UpdateFunc: func(old, cur interface{}) { + negController.enqueueEndpointSlice(cur) + }, + }) nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { node := obj.(*apiv1.Node) @@ -721,16 +699,6 @@ func (c *Controller) handleErr(err error, key interface{}) { c.serviceQueue.AddRateLimited(key) } -func (c *Controller) enqueueEndpoint(obj interface{}) { - key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) - if err != nil { - c.logger.Error(err, "Failed to generate endpoint key") - return - } - c.logger.V(3).Info("Adding Endpoints to endpointQueue for processing", "endpoints", key) - c.endpointQueue.Add(key) -} - func (c *Controller) enqueueEndpointSlice(obj interface{}) { endpointSlice, ok := obj.(*discovery.EndpointSlice) if !ok { diff --git a/pkg/neg/controller_test.go b/pkg/neg/controller_test.go index 7d07aea457..34a9fbac21 100644 --- a/pkg/neg/controller_test.go +++ b/pkg/neg/controller_test.go @@ -109,7 +109,7 @@ var ( } ) -func newTestControllerWithParamsAndContext(kubeClient kubernetes.Interface, testContext *negtypes.TestContext, runL4, enableEndpointSlices bool, enableASM bool) *Controller { +func newTestControllerWithParamsAndContext(kubeClient kubernetes.Interface, testContext *negtypes.TestContext, runL4 bool, enableASM bool) *Controller { if enableASM { kubeClient.CoreV1().ConfigMaps("kube-system").Create(context.TODO(), &apiv1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Namespace: "kube-system", Name: "ingress-controller-config-test"}, Data: map[string]string{"enable-asm": "true"}}, metav1.CreateOptions{}) } @@ -121,7 +121,6 @@ func newTestControllerWithParamsAndContext(kubeClient kubernetes.Interface, test testContext.ServiceInformer, testContext.PodInformer, testContext.NodeInformer, - testContext.EndpointInformer, testContext.EndpointSliceInformer, testContext.SvcNegInformer, func() bool { return true }, @@ -140,17 +139,16 @@ func newTestControllerWithParamsAndContext(kubeClient kubernetes.Interface, test false, //enableNonGcpMode enableASM, //enableAsm []string{}, - enableEndpointSlices, klog.TODO(), ) } func newTestControllerWithASM(kubeClient kubernetes.Interface) *Controller { testContext := negtypes.NewTestContextWithKubeClient(kubeClient) - return newTestControllerWithParamsAndContext(kubeClient, testContext, false, false, true) + return newTestControllerWithParamsAndContext(kubeClient, testContext, false, true) } func newTestController(kubeClient kubernetes.Interface) *Controller { testContext := negtypes.NewTestContextWithKubeClient(kubeClient) - return newTestControllerWithParamsAndContext(kubeClient, testContext, false, false, false) + return newTestControllerWithParamsAndContext(kubeClient, testContext, false, false) } func TestIsHealthy(t *testing.T) { @@ -160,10 +158,6 @@ func TestIsHealthy(t *testing.T) { enableEndpointSlices bool desc string }{ - { - enableEndpointSlices: false, - desc: "Controller with endpoint slices disabled", - }, { enableEndpointSlices: true, desc: "Controller with endpoint slices enabled", @@ -171,7 +165,7 @@ func TestIsHealthy(t *testing.T) { } for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { - controller := newTestControllerWithParamsAndContext(kubeClient, testContext, false, tc.enableEndpointSlices, false) + controller := newTestControllerWithParamsAndContext(kubeClient, testContext, false, false) defer controller.stop() err := controller.IsHealthy() @@ -339,7 +333,7 @@ func TestEnableNEGServiceWithIngress(t *testing.T) { func TestEnableNEGServiceWithL4ILB(t *testing.T) { kubeClient := fake.NewSimpleClientset() testContext := negtypes.NewTestContextWithKubeClient(kubeClient) - controller := newTestControllerWithParamsAndContext(kubeClient, testContext, true, false, false) + controller := newTestControllerWithParamsAndContext(kubeClient, testContext, true, false) manager := controller.manager.(*syncerManager) // L4 ILB NEGs will be created in zones with ready and unready nodes. Zones with upgrading nodes will be skipped. expectZones := []string{negtypes.TestZone1, negtypes.TestZone2, negtypes.TestZone3} @@ -414,7 +408,7 @@ func TestEnableNEGServiceWithL4ILB(t *testing.T) { func TestEnqueueNodeWithILBSubsetting(t *testing.T) { kubeClient := fake.NewSimpleClientset() testContext := negtypes.NewTestContextWithKubeClient(kubeClient) - controller := newTestControllerWithParamsAndContext(kubeClient, testContext, true, false, false) + controller := newTestControllerWithParamsAndContext(kubeClient, testContext, true, false) stopChan := make(chan struct{}, 1) // start the informer directly, without starting the entire controller. go testContext.NodeInformer.Run(stopChan) @@ -1192,25 +1186,12 @@ func TestEnqueueEndpoints(t *testing.T) { t.Parallel() testCases := []struct { desc string - useSlices bool endpoints *v1.Endpoints endpointSlice *discovery.EndpointSlice expectedKey string }{ { - desc: "Enqueue endpoint", - useSlices: false, - endpoints: &v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{ - Name: service, - Namespace: namespace, - }, - }, - expectedKey: fmt.Sprintf("%s/%s", namespace, service), - }, - { - desc: "Enqueue endpoint slices", - useSlices: true, + desc: "Enqueue endpoint slices", endpointSlice: &discovery.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Name: service + "-1", @@ -1221,8 +1202,7 @@ func TestEnqueueEndpoints(t *testing.T) { expectedKey: fmt.Sprintf("%s/%s", namespace, service), }, { - desc: "Enqueue malformed endpoint slices", - useSlices: true, + desc: "Enqueue malformed endpoint slices", endpointSlice: &discovery.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Name: service + "-1", @@ -1237,35 +1217,22 @@ func TestEnqueueEndpoints(t *testing.T) { t.Run(tc.desc, func(t *testing.T) { kubeClient := fake.NewSimpleClientset() testContext := negtypes.NewTestContextWithKubeClient(kubeClient) - controller := newTestControllerWithParamsAndContext(kubeClient, testContext, false, tc.useSlices, false) + controller := newTestControllerWithParamsAndContext(kubeClient, testContext, false, false) stopChan := make(chan struct{}, 1) // start the informer directly, without starting the entire controller. - if tc.useSlices { - go testContext.EndpointSliceInformer.Run(stopChan) - } else { - go testContext.EndpointInformer.Run(stopChan) - } + go testContext.EndpointSliceInformer.Run(stopChan) defer func() { stopChan <- struct{}{} controller.stop() }() ctx := context.Background() var informer cache.SharedIndexInformer - if tc.useSlices { - endpointSliceClient := controller.client.DiscoveryV1().EndpointSlices(namespace) - _, err := endpointSliceClient.Create(ctx, tc.endpointSlice, metav1.CreateOptions{}) - if err != nil { - t.Fatalf("Failed to create test endpoint slice, error - %v", err) - } - informer = testContext.EndpointSliceInformer - } else { - endpointsClient := controller.client.CoreV1().Endpoints(namespace) - _, err := endpointsClient.Create(ctx, tc.endpoints, metav1.CreateOptions{}) - if err != nil { - t.Fatalf("Failed to create test endpoint, error - %v", err) - } - informer = testContext.EndpointInformer + endpointSliceClient := controller.client.DiscoveryV1().EndpointSlices(namespace) + _, err := endpointSliceClient.Create(ctx, tc.endpointSlice, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Failed to create test endpoint slice, error - %v", err) } + informer = testContext.EndpointSliceInformer time.Sleep(5 * time.Second) if list := informer.GetIndexer().List(); len(list) != 1 { t.Errorf("Got list - %v of size %d, want 1 element", list, len(list)) diff --git a/pkg/neg/manager.go b/pkg/neg/manager.go index 743bd52a6a..ec8ebb4e75 100644 --- a/pkg/neg/manager.go +++ b/pkg/neg/manager.go @@ -68,7 +68,6 @@ type syncerManager struct { nodeLister cache.Indexer podLister cache.Indexer serviceLister cache.Indexer - endpointLister cache.Indexer endpointSliceLister cache.Indexer svcNegLister cache.Indexer @@ -94,8 +93,7 @@ type syncerManager struct { // enableNonGcpMode indicates whether nonGcpMode have been enabled // This will make all NEGs created by NEG controller to be NON_GCP_PRIVATE_IP_PORT type. - enableNonGcpMode bool - enableEndpointSlices bool + enableNonGcpMode bool logger klog.Logger @@ -113,13 +111,11 @@ func newSyncerManager(namer negtypes.NetworkEndpointGroupNamer, kubeSystemUID types.UID, podLister cache.Indexer, serviceLister cache.Indexer, - endpointLister cache.Indexer, endpointSliceLister cache.Indexer, nodeLister cache.Indexer, svcNegLister cache.Indexer, syncerMetrics *metrics.SyncerMetrics, enableNonGcpMode bool, - enableEndpointSlices bool, logger klog.Logger) *syncerManager { var vmIpZoneMap, vmIpPortZoneMap map[string]struct{} @@ -127,26 +123,24 @@ func newSyncerManager(namer negtypes.NetworkEndpointGroupNamer, updateZoneMap(&vmIpPortZoneMap, negtypes.NodePredicateForNetworkEndpointType(negtypes.VmIpPortEndpointType), zoneGetter, logger) return &syncerManager{ - namer: namer, - recorder: recorder, - cloud: cloud, - zoneGetter: zoneGetter, - nodeLister: nodeLister, - podLister: podLister, - serviceLister: serviceLister, - endpointLister: endpointLister, - endpointSliceLister: endpointSliceLister, - svcNegLister: svcNegLister, - svcPortMap: make(map[serviceKey]negtypes.PortInfoMap), - syncerMap: make(map[negtypes.NegSyncerKey]negtypes.NegSyncer), - syncerMetrics: syncerMetrics, - svcNegClient: svcNegClient, - kubeSystemUID: kubeSystemUID, - enableNonGcpMode: enableNonGcpMode, - enableEndpointSlices: enableEndpointSlices, - logger: logger, - vmIpZoneMap: vmIpZoneMap, - vmIpPortZoneMap: vmIpPortZoneMap, + namer: namer, + recorder: recorder, + cloud: cloud, + zoneGetter: zoneGetter, + nodeLister: nodeLister, + podLister: podLister, + serviceLister: serviceLister, + endpointSliceLister: endpointSliceLister, + svcNegLister: svcNegLister, + svcPortMap: make(map[serviceKey]negtypes.PortInfoMap), + syncerMap: make(map[negtypes.NegSyncerKey]negtypes.NegSyncer), + syncerMetrics: syncerMetrics, + svcNegClient: svcNegClient, + kubeSystemUID: kubeSystemUID, + enableNonGcpMode: enableNonGcpMode, + logger: logger, + vmIpZoneMap: vmIpZoneMap, + vmIpPortZoneMap: vmIpPortZoneMap, } } @@ -224,7 +218,6 @@ func (manager *syncerManager) EnsureSyncers(namespace, name string, newPorts neg manager.zoneGetter, manager.podLister, manager.serviceLister, - manager.endpointLister, manager.endpointSliceLister, manager.nodeLister, manager.svcNegLister, @@ -234,7 +227,6 @@ func (manager *syncerManager) EnsureSyncers(namespace, name string, newPorts neg manager.svcNegClient, manager.syncerMetrics, !manager.namer.IsNEG(portInfo.NegName), - manager.enableEndpointSlices, manager.logger, ) manager.syncerMap[syncerKey] = syncer diff --git a/pkg/neg/manager_test.go b/pkg/neg/manager_test.go index dd286bf20b..42b8a4f6fb 100644 --- a/pkg/neg/manager_test.go +++ b/pkg/neg/manager_test.go @@ -88,13 +88,11 @@ func NewTestSyncerManager(kubeClient kubernetes.Interface) (*syncerManager, *gce testContext.KubeSystemUID, testContext.PodInformer.GetIndexer(), testContext.ServiceInformer.GetIndexer(), - testContext.EndpointInformer.GetIndexer(), testContext.EndpointSliceInformer.GetIndexer(), testContext.NodeInformer.GetIndexer(), testContext.SvcNegInformer.GetIndexer(), metrics.FakeSyncerMetrics(), false, //enableNonGcpMode - false, //enableEndpointSlices klog.TODO(), ) return manager, testContext.Cloud diff --git a/pkg/neg/syncers/endpoints_calculator.go b/pkg/neg/syncers/endpoints_calculator.go index d3f9767b55..4025f892f7 100644 --- a/pkg/neg/syncers/endpoints_calculator.go +++ b/pkg/neg/syncers/endpoints_calculator.go @@ -187,7 +187,7 @@ func (l *L7EndpointsCalculator) Mode() types.EndpointsCalculatorMode { // CalculateEndpoints determines the endpoints in the NEGs based on the current service endpoints and the current NEGs. func (l *L7EndpointsCalculator) CalculateEndpoints(eds []types.EndpointsData, _ map[string]types.NetworkEndpointSet) (map[string]types.NetworkEndpointSet, types.EndpointPodMap, int, error) { - return toZoneNetworkEndpointMap(eds, l.zoneGetter, l.servicePortName, l.podLister, l.networkEndpointType) + return toZoneNetworkEndpointMap(eds, l.zoneGetter, l.servicePortName, l.networkEndpointType) } func nodeMapToString(nodeMap map[string][]*v1.Node) string { diff --git a/pkg/neg/syncers/endpoints_calculator_test.go b/pkg/neg/syncers/endpoints_calculator_test.go index d6cb70f9d8..6a8c6a5e37 100644 --- a/pkg/neg/syncers/endpoints_calculator_test.go +++ b/pkg/neg/syncers/endpoints_calculator_test.go @@ -36,84 +36,82 @@ import ( func TestLocalGetEndpointSet(t *testing.T) { t.Parallel() mode := negtypes.L4LocalMode - for _, use_endpoint_slices := range []bool{true, false} { - _, transactionSyncer := newL4ILBTestTransactionSyncer(negtypes.NewAdapter(gce.NewFakeGCECloud(gce.DefaultTestClusterValues())), mode, use_endpoint_slices) - zoneGetter := negtypes.NewFakeZoneGetter() - nodeLister := listers.NewNodeLister(transactionSyncer.nodeLister) + _, transactionSyncer := newL4ILBTestTransactionSyncer(negtypes.NewAdapter(gce.NewFakeGCECloud(gce.DefaultTestClusterValues())), mode) + zoneGetter := negtypes.NewFakeZoneGetter() + nodeLister := listers.NewNodeLister(transactionSyncer.nodeLister) - testCases := []struct { - desc string - endpointsData []negtypes.EndpointsData - endpointSets map[string]negtypes.NetworkEndpointSet - networkEndpointType negtypes.NetworkEndpointType - nodeLabelsMap map[string]map[string]string - nodeReadyStatusMap map[string]v1.ConditionStatus - nodeNames []string - }{ - { - desc: "default endpoints", - endpointsData: negtypes.EndpointsDataFromEndpointSlices(getDefaultEndpointSlices()), - // only 4 out of 6 nodes are picked since there are > 4 endpoints, but they are found only on 4 nodes. - endpointSets: map[string]negtypes.NetworkEndpointSet{ - negtypes.TestZone1: negtypes.NewNetworkEndpointSet(negtypes.NetworkEndpoint{IP: "1.2.3.1", Node: testInstance1}, negtypes.NetworkEndpoint{IP: "1.2.3.2", Node: testInstance2}), - negtypes.TestZone2: negtypes.NewNetworkEndpointSet(negtypes.NetworkEndpoint{IP: "1.2.3.3", Node: testInstance3}, negtypes.NetworkEndpoint{IP: "1.2.3.4", Node: testInstance4}), - }, - networkEndpointType: negtypes.VmIpEndpointType, - nodeNames: []string{testInstance1, testInstance2, testInstance3, testInstance4, testInstance5, testInstance6}, + testCases := []struct { + desc string + endpointsData []negtypes.EndpointsData + endpointSets map[string]negtypes.NetworkEndpointSet + networkEndpointType negtypes.NetworkEndpointType + nodeLabelsMap map[string]map[string]string + nodeReadyStatusMap map[string]v1.ConditionStatus + nodeNames []string + }{ + { + desc: "default endpoints", + endpointsData: negtypes.EndpointsDataFromEndpointSlices(getDefaultEndpointSlices()), + // only 4 out of 6 nodes are picked since there are > 4 endpoints, but they are found only on 4 nodes. + endpointSets: map[string]negtypes.NetworkEndpointSet{ + negtypes.TestZone1: negtypes.NewNetworkEndpointSet(negtypes.NetworkEndpoint{IP: "1.2.3.1", Node: testInstance1}, negtypes.NetworkEndpoint{IP: "1.2.3.2", Node: testInstance2}), + negtypes.TestZone2: negtypes.NewNetworkEndpointSet(negtypes.NetworkEndpoint{IP: "1.2.3.3", Node: testInstance3}, negtypes.NetworkEndpoint{IP: "1.2.3.4", Node: testInstance4}), }, - { - desc: "default endpoints, all nodes unready", - endpointsData: negtypes.EndpointsDataFromEndpointSlices(getDefaultEndpointSlices()), - // only 4 out of 6 nodes are picked since there are > 4 endpoints, but they are found only on 4 nodes. - endpointSets: map[string]negtypes.NetworkEndpointSet{ - negtypes.TestZone1: negtypes.NewNetworkEndpointSet(negtypes.NetworkEndpoint{IP: "1.2.3.1", Node: testInstance1}, negtypes.NetworkEndpoint{IP: "1.2.3.2", Node: testInstance2}), - negtypes.TestZone2: negtypes.NewNetworkEndpointSet(negtypes.NetworkEndpoint{IP: "1.2.3.3", Node: testInstance3}, negtypes.NetworkEndpoint{IP: "1.2.3.4", Node: testInstance4}), - }, - networkEndpointType: negtypes.VmIpEndpointType, - nodeNames: []string{testInstance1, testInstance2, testInstance3, testInstance4, testInstance5, testInstance6}, - nodeReadyStatusMap: map[string]v1.ConditionStatus{ - testInstance1: v1.ConditionFalse, testInstance2: v1.ConditionFalse, testInstance3: v1.ConditionFalse, testInstance4: v1.ConditionFalse, testInstance5: v1.ConditionFalse, testInstance6: v1.ConditionFalse, - }, + networkEndpointType: negtypes.VmIpEndpointType, + nodeNames: []string{testInstance1, testInstance2, testInstance3, testInstance4, testInstance5, testInstance6}, + }, + { + desc: "default endpoints, all nodes unready", + endpointsData: negtypes.EndpointsDataFromEndpointSlices(getDefaultEndpointSlices()), + // only 4 out of 6 nodes are picked since there are > 4 endpoints, but they are found only on 4 nodes. + endpointSets: map[string]negtypes.NetworkEndpointSet{ + negtypes.TestZone1: negtypes.NewNetworkEndpointSet(negtypes.NetworkEndpoint{IP: "1.2.3.1", Node: testInstance1}, negtypes.NetworkEndpoint{IP: "1.2.3.2", Node: testInstance2}), + negtypes.TestZone2: negtypes.NewNetworkEndpointSet(negtypes.NetworkEndpoint{IP: "1.2.3.3", Node: testInstance3}, negtypes.NetworkEndpoint{IP: "1.2.3.4", Node: testInstance4}), }, - { - desc: "default endpoints, some nodes marked as non-candidates", - endpointsData: negtypes.EndpointsDataFromEndpointSlices(getDefaultEndpointSlices()), - nodeLabelsMap: map[string]map[string]string{ - testInstance1: {utils.LabelNodeRoleExcludeBalancer: "true"}, - testInstance3: {utils.GKECurrentOperationLabel: utils.NodeDrain}, - // label for testInstance4 will not remove it from endpoints list, since operation value is "random". - testInstance4: {utils.GKECurrentOperationLabel: "random"}, - }, - nodeNames: []string{testInstance1, testInstance2, testInstance3, testInstance4, testInstance5, testInstance6}, - // only 2 out of 6 nodes are picked since there are > 4 endpoints, but they are found only on 4 nodes. 2 out of those 4 are non-candidates. - endpointSets: map[string]negtypes.NetworkEndpointSet{ - negtypes.TestZone1: negtypes.NewNetworkEndpointSet(negtypes.NetworkEndpoint{IP: "1.2.3.2", Node: testInstance2}), - negtypes.TestZone2: negtypes.NewNetworkEndpointSet(negtypes.NetworkEndpoint{IP: "1.2.3.4", Node: testInstance4}), - }, - networkEndpointType: negtypes.VmIpEndpointType, + networkEndpointType: negtypes.VmIpEndpointType, + nodeNames: []string{testInstance1, testInstance2, testInstance3, testInstance4, testInstance5, testInstance6}, + nodeReadyStatusMap: map[string]v1.ConditionStatus{ + testInstance1: v1.ConditionFalse, testInstance2: v1.ConditionFalse, testInstance3: v1.ConditionFalse, testInstance4: v1.ConditionFalse, testInstance5: v1.ConditionFalse, testInstance6: v1.ConditionFalse, + }, + }, + { + desc: "default endpoints, some nodes marked as non-candidates", + endpointsData: negtypes.EndpointsDataFromEndpointSlices(getDefaultEndpointSlices()), + nodeLabelsMap: map[string]map[string]string{ + testInstance1: {utils.LabelNodeRoleExcludeBalancer: "true"}, + testInstance3: {utils.GKECurrentOperationLabel: utils.NodeDrain}, + // label for testInstance4 will not remove it from endpoints list, since operation value is "random". + testInstance4: {utils.GKECurrentOperationLabel: "random"}, }, - { - desc: "no endpoints", - endpointsData: []negtypes.EndpointsData{}, - // No nodes are picked as there are no service endpoints. - endpointSets: nil, - networkEndpointType: negtypes.VmIpEndpointType, - nodeNames: []string{testInstance1, testInstance2, testInstance3, testInstance4, testInstance5, testInstance6}, + nodeNames: []string{testInstance1, testInstance2, testInstance3, testInstance4, testInstance5, testInstance6}, + // only 2 out of 6 nodes are picked since there are > 4 endpoints, but they are found only on 4 nodes. 2 out of those 4 are non-candidates. + endpointSets: map[string]negtypes.NetworkEndpointSet{ + negtypes.TestZone1: negtypes.NewNetworkEndpointSet(negtypes.NetworkEndpoint{IP: "1.2.3.2", Node: testInstance2}), + negtypes.TestZone2: negtypes.NewNetworkEndpointSet(negtypes.NetworkEndpoint{IP: "1.2.3.4", Node: testInstance4}), }, + networkEndpointType: negtypes.VmIpEndpointType, + }, + { + desc: "no endpoints", + endpointsData: []negtypes.EndpointsData{}, + // No nodes are picked as there are no service endpoints. + endpointSets: nil, + networkEndpointType: negtypes.VmIpEndpointType, + nodeNames: []string{testInstance1, testInstance2, testInstance3, testInstance4, testInstance5, testInstance6}, + }, + } + svcKey := fmt.Sprintf("%s/%s", testServiceName, testServiceNamespace) + ec := NewLocalL4ILBEndpointsCalculator(nodeLister, zoneGetter, svcKey, klog.TODO()) + for _, tc := range testCases { + createNodes(t, tc.nodeNames, tc.nodeLabelsMap, tc.nodeReadyStatusMap, transactionSyncer.nodeLister) + retSet, _, _, err := ec.CalculateEndpoints(tc.endpointsData, nil) + if err != nil { + t.Errorf("For case %q, expect nil error, but got %v.", tc.desc, err) } - svcKey := fmt.Sprintf("%s/%s", testServiceName, testServiceNamespace) - ec := NewLocalL4ILBEndpointsCalculator(nodeLister, zoneGetter, svcKey, klog.TODO()) - for _, tc := range testCases { - createNodes(t, tc.nodeNames, tc.nodeLabelsMap, tc.nodeReadyStatusMap, transactionSyncer.nodeLister) - retSet, _, _, err := ec.CalculateEndpoints(tc.endpointsData, nil) - if err != nil { - t.Errorf("For case %q, expect nil error, but got %v.", tc.desc, err) - } - if !reflect.DeepEqual(retSet, tc.endpointSets) { - t.Errorf("For case %q, expecting endpoint set %v, but got %v.", tc.desc, tc.endpointSets, retSet) - } - deleteNodes(t, tc.nodeNames, transactionSyncer.nodeLister) + if !reflect.DeepEqual(retSet, tc.endpointSets) { + t.Errorf("For case %q, expecting endpoint set %v, but got %v.", tc.desc, tc.endpointSets, retSet) } + deleteNodes(t, tc.nodeNames, transactionSyncer.nodeLister) } } @@ -122,7 +120,7 @@ func TestClusterGetEndpointSet(t *testing.T) { t.Parallel() mode := negtypes.L4ClusterMode // The "enableEndpointSlices=false" case is enough since this test does not depend on endpoints at all. - _, transactionSyncer := newL4ILBTestTransactionSyncer(negtypes.NewAdapter(gce.NewFakeGCECloud(gce.DefaultTestClusterValues())), mode, false) + _, transactionSyncer := newL4ILBTestTransactionSyncer(negtypes.NewAdapter(gce.NewFakeGCECloud(gce.DefaultTestClusterValues())), mode) zoneGetter := negtypes.NewFakeZoneGetter() nodeLister := listers.NewNodeLister(transactionSyncer.nodeLister) testCases := []struct { diff --git a/pkg/neg/syncers/transaction.go b/pkg/neg/syncers/transaction.go index fbc1078927..a0666333e6 100644 --- a/pkg/neg/syncers/transaction.go +++ b/pkg/neg/syncers/transaction.go @@ -69,7 +69,6 @@ type transactionSyncer struct { podLister cache.Indexer serviceLister cache.Indexer - endpointLister cache.Indexer endpointSliceLister cache.Indexer nodeLister cache.Indexer svcNegLister cache.Indexer @@ -93,8 +92,6 @@ type transactionSyncer struct { // customName indicates whether the NEG name is a generated one or custom one customName bool - enableEndpointSlices bool - logger klog.Logger // errorState indicates if the syncer is in any of 4 error scenarios @@ -116,7 +113,6 @@ func NewTransactionSyncer( zoneGetter negtypes.ZoneGetter, podLister cache.Indexer, serviceLister cache.Indexer, - endpointLister cache.Indexer, endpointSliceLister cache.Indexer, nodeLister cache.Indexer, svcNegLister cache.Indexer, @@ -126,34 +122,31 @@ func NewTransactionSyncer( svcNegClient svcnegclient.Interface, syncerMetrics *metrics.SyncerMetrics, customName bool, - enableEndpointSlices bool, log klog.Logger) negtypes.NegSyncer { logger := log.WithName("Syncer").WithValues("service", klog.KRef(negSyncerKey.Namespace, negSyncerKey.Name), "negName", negSyncerKey.NegName) // TransactionSyncer implements the syncer core ts := &transactionSyncer{ - NegSyncerKey: negSyncerKey, - needInit: true, - transactions: NewTransactionTable(), - nodeLister: nodeLister, - podLister: podLister, - serviceLister: serviceLister, - endpointLister: endpointLister, - endpointSliceLister: endpointSliceLister, - svcNegLister: svcNegLister, - recorder: recorder, - cloud: cloud, - zoneGetter: zoneGetter, - endpointsCalculator: epc, - reflector: reflector, - kubeSystemUID: kubeSystemUID, - svcNegClient: svcNegClient, - syncCollector: syncerMetrics, - customName: customName, - enableEndpointSlices: enableEndpointSlices, - errorState: "", - logger: logger, + NegSyncerKey: negSyncerKey, + needInit: true, + transactions: NewTransactionTable(), + nodeLister: nodeLister, + podLister: podLister, + serviceLister: serviceLister, + endpointSliceLister: endpointSliceLister, + svcNegLister: svcNegLister, + recorder: recorder, + cloud: cloud, + zoneGetter: zoneGetter, + endpointsCalculator: epc, + reflector: reflector, + kubeSystemUID: kubeSystemUID, + svcNegClient: svcNegClient, + syncCollector: syncerMetrics, + customName: customName, + errorState: "", + logger: logger, } // Syncer implements life cycle logic syncer := newSyncer(negSyncerKey, serviceLister, recorder, ts, logger) @@ -235,68 +228,45 @@ func (s *transactionSyncer) syncInternalImpl() error { var endpointPodMap negtypes.EndpointPodMap var dupCount int - if s.enableEndpointSlices { - slices, err := s.endpointSliceLister.ByIndex(endpointslices.EndpointSlicesByServiceIndex, endpointslices.FormatEndpointSlicesServiceKey(s.Namespace, s.Name)) + slices, err := s.endpointSliceLister.ByIndex(endpointslices.EndpointSlicesByServiceIndex, endpointslices.FormatEndpointSlicesServiceKey(s.Namespace, s.Name)) + if err != nil { + return err + } + if len(slices) < 1 { + s.logger.Error(nil, "Endpoint slices for the service doesn't exist. Skipping NEG sync") + return nil + } + endpointSlices := make([]*discovery.EndpointSlice, len(slices)) + negCR, err := getNegFromStore(s.svcNegLister, s.Namespace, s.NegSyncerKey.NegName) + for i, slice := range slices { + endpointslice := slice.(*discovery.EndpointSlice) + endpointSlices[i] = endpointslice if err != nil { - return err - } - if len(slices) < 1 { - s.logger.Error(nil, "Endpoint slices for the service doesn't exist. Skipping NEG sync") - return nil + s.logger.Error(err, "unable to retrieve neg from the store", "neg", klog.KRef(s.Namespace, s.NegName)) + continue } - endpointSlices := make([]*discovery.EndpointSlice, len(slices)) - negCR, err := getNegFromStore(s.svcNegLister, s.Namespace, s.NegSyncerKey.NegName) - for i, slice := range slices { - endpointslice := slice.(*discovery.EndpointSlice) - endpointSlices[i] = endpointslice - if err != nil { - s.logger.Error(err, "unable to retrieve neg from the store", "neg", klog.KRef(s.Namespace, s.NegName)) - continue - } - lastSyncTimestamp := negCR.Status.LastSyncTime - epsCreationTimestamp := endpointslice.ObjectMeta.CreationTimestamp + lastSyncTimestamp := negCR.Status.LastSyncTime + epsCreationTimestamp := endpointslice.ObjectMeta.CreationTimestamp - epsStaleness := time.Since(lastSyncTimestamp.Time) - // if this endpoint slice is newly created/created after last sync - if lastSyncTimestamp.Before(&epsCreationTimestamp) { - epsStaleness = time.Since(epsCreationTimestamp.Time) - } - metrics.PublishNegEPSStalenessMetrics(epsStaleness) - s.logger.V(3).Info("Endpoint slice syncs", "Namespace", endpointslice.Namespace, "Name", endpointslice.Name, "staleness", epsStaleness) - - } - endpointsData := negtypes.EndpointsDataFromEndpointSlices(endpointSlices) - targetMap, endpointPodMap, dupCount, err = s.endpointsCalculator.CalculateEndpoints(endpointsData, currentMap) - if valid, reason := s.isValidEPField(err); !valid { - s.setErrorState(reason) - } - if valid, reason := s.isValidEndpointInfo(endpointsData, endpointPodMap, dupCount); !valid { - s.setErrorState(reason) - } - if err != nil { - return fmt.Errorf("endpoints calculation error in mode %q, err: %w", s.endpointsCalculator.Mode(), err) - } - } else { - ep, exists, err := s.endpointLister.Get( - &apiv1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{ - Name: s.Name, - Namespace: s.Namespace, - }, - }, - ) - if err != nil { - return err - } - if !exists { - s.logger.Info("Endpoint does not exist. Skipping NEG sync", "endpoint", klog.KRef(s.Namespace, s.Name)) - return nil - } - endpointsData := negtypes.EndpointsDataFromEndpoints(ep.(*apiv1.Endpoints)) - targetMap, endpointPodMap, _, err = s.endpointsCalculator.CalculateEndpoints(endpointsData, currentMap) - if err != nil { - return fmt.Errorf("endpoints calculation error in mode %q, err: %w", s.endpointsCalculator.Mode(), err) + epsStaleness := time.Since(lastSyncTimestamp.Time) + // if this endpoint slice is newly created/created after last sync + if lastSyncTimestamp.Before(&epsCreationTimestamp) { + epsStaleness = time.Since(epsCreationTimestamp.Time) } + metrics.PublishNegEPSStalenessMetrics(epsStaleness) + s.logger.V(3).Info("Endpoint slice syncs", "Namespace", endpointslice.Namespace, "Name", endpointslice.Name, "staleness", epsStaleness) + + } + endpointsData := negtypes.EndpointsDataFromEndpointSlices(endpointSlices) + targetMap, endpointPodMap, dupCount, err = s.endpointsCalculator.CalculateEndpoints(endpointsData, currentMap) + if valid, reason := s.isValidEPField(err); !valid { + s.setErrorState(reason) + } + if valid, reason := s.isValidEndpointInfo(endpointsData, endpointPodMap, dupCount); !valid { + s.setErrorState(reason) + } + if err != nil { + return fmt.Errorf("endpoints calculation error in mode %q, err: %w", s.endpointsCalculator.Mode(), err) } s.logStats(targetMap, "desired NEG endpoints") diff --git a/pkg/neg/syncers/transaction_test.go b/pkg/neg/syncers/transaction_test.go index bf58e56ce9..fb8d49ae81 100644 --- a/pkg/neg/syncers/transaction_test.go +++ b/pkg/neg/syncers/transaction_test.go @@ -77,350 +77,346 @@ func TestTransactionSyncNetworkEndpoints(t *testing.T) { negtypes.VmIpPortEndpointType, } - for _, enableEndpointSlices := range []bool{false, true} { - for _, testNegType := range testNegTypes { - _, transactionSyncer := newTestTransactionSyncer(fakeCloud, testNegType, false, enableEndpointSlices) - if err := transactionSyncer.ensureNetworkEndpointGroups(); err != nil { - t.Errorf("Expect error == nil, but got %v", err) - } - var targetPort string - if testNegType == negtypes.VmIpPortEndpointType { - targetPort = "8080" - } + for _, testNegType := range testNegTypes { + _, transactionSyncer := newTestTransactionSyncer(fakeCloud, testNegType, false) + if err := transactionSyncer.ensureNetworkEndpointGroups(); err != nil { + t.Errorf("Expect error == nil, but got %v", err) + } + var targetPort string + if testNegType == negtypes.VmIpPortEndpointType { + targetPort = "8080" + } - // Verify the NEGs are created as expected - ret, _ := transactionSyncer.cloud.AggregatedListNetworkEndpointGroup(transactionSyncer.NegSyncerKey.GetAPIVersion()) - // Though the test cases below only add instances in zone1 and zone2, NEGs will be created in zone3 or zone4 as well since fakeZoneGetter includes those zones. - var expectZones []string - if testNegType == negtypes.VmIpEndpointType { - expectZones = []string{negtypes.TestZone1, negtypes.TestZone2, negtypes.TestZone3} - } else { - expectZones = []string{negtypes.TestZone1, negtypes.TestZone2, negtypes.TestZone4} - } - retZones := sets.NewString() + // Verify the NEGs are created as expected + ret, _ := transactionSyncer.cloud.AggregatedListNetworkEndpointGroup(transactionSyncer.NegSyncerKey.GetAPIVersion()) + // Though the test cases below only add instances in zone1 and zone2, NEGs will be created in zone3 or zone4 as well since fakeZoneGetter includes those zones. + var expectZones []string + if testNegType == negtypes.VmIpEndpointType { + expectZones = []string{negtypes.TestZone1, negtypes.TestZone2, negtypes.TestZone3} + } else { + expectZones = []string{negtypes.TestZone1, negtypes.TestZone2, negtypes.TestZone4} + } + retZones := sets.NewString() - for key := range ret { - retZones.Insert(key.Zone) + for key := range ret { + retZones.Insert(key.Zone) + } + for _, zone := range expectZones { + _, ok := retZones[zone] + if !ok { + t.Errorf("Failed to find zone %q from ret %v for negType %v", zone, ret, testNegType) + continue } - for _, zone := range expectZones { - _, ok := retZones[zone] - if !ok { - t.Errorf("Failed to find zone %q from ret %v for negType %v", zone, ret, testNegType) - continue - } + } + for _, neg := range ret { + if neg.Name != transactionSyncer.NegName { + t.Errorf("Unexpected neg %q, expected %q", neg.Name, transactionSyncer.NegName) } - for _, neg := range ret { - if neg.Name != transactionSyncer.NegName { - t.Errorf("Unexpected neg %q, expected %q", neg.Name, transactionSyncer.NegName) - } - if neg.NetworkEndpointType != string(testNegType) { - t.Errorf("Unexpected neg type %q, expected %q", neg.Type, testNegType) - } - if neg.Description == "" { - t.Errorf("Neg Description should be populated when NEG CRD is enabled") - } + if neg.NetworkEndpointType != string(testNegType) { + t.Errorf("Unexpected neg type %q, expected %q", neg.Type, testNegType) } - - testCases := []struct { - desc string - addEndpoints map[string]negtypes.NetworkEndpointSet - removeEndpoints map[string]negtypes.NetworkEndpointSet - expectEndpoints map[string]negtypes.NetworkEndpointSet - }{ - { - "empty input", - map[string]negtypes.NetworkEndpointSet{}, - map[string]negtypes.NetworkEndpointSet{}, - map[string]negtypes.NetworkEndpointSet{}, - }, - { - "add some endpoints", - map[string]negtypes.NetworkEndpointSet{ - testZone1: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.1.1"), 10, testInstance1, targetPort)).Union(generateEndpointSet(net.ParseIP("1.1.2.1"), 10, testInstance2, targetPort)), - testZone2: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.3.1"), 10, testInstance3, targetPort)).Union(generateEndpointSet(net.ParseIP("1.1.4.1"), 10, testInstance4, targetPort)), - }, - map[string]negtypes.NetworkEndpointSet{}, - map[string]negtypes.NetworkEndpointSet{ - testZone1: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.1.1"), 10, testInstance1, targetPort)).Union(generateEndpointSet(net.ParseIP("1.1.2.1"), 10, testInstance2, targetPort)), - testZone2: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.3.1"), 10, testInstance3, targetPort)).Union(generateEndpointSet(net.ParseIP("1.1.4.1"), 10, testInstance4, targetPort)), - }, - }, - { - "remove some endpoints", - map[string]negtypes.NetworkEndpointSet{}, - map[string]negtypes.NetworkEndpointSet{ - testZone1: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.1.1"), 10, testInstance1, targetPort)).Union(generateEndpointSet(net.ParseIP("1.1.2.1"), 10, testInstance2, targetPort)), - }, - map[string]negtypes.NetworkEndpointSet{ - testZone2: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.3.1"), 10, testInstance3, targetPort)).Union(generateEndpointSet(net.ParseIP("1.1.4.1"), 10, testInstance4, targetPort)), - }, - }, - { - "add duplicate endpoints", - map[string]negtypes.NetworkEndpointSet{ - testZone2: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.3.1"), 10, testInstance3, targetPort)).Union(generateEndpointSet(net.ParseIP("1.1.4.1"), 10, testInstance4, targetPort)), - }, - map[string]negtypes.NetworkEndpointSet{}, - map[string]negtypes.NetworkEndpointSet{ - testZone2: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.3.1"), 10, testInstance3, targetPort)).Union(generateEndpointSet(net.ParseIP("1.1.4.1"), 10, testInstance4, targetPort)), - }, - }, - { - "add and remove endpoints", - map[string]negtypes.NetworkEndpointSet{ - testZone1: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.1.1"), 10, testInstance1, targetPort)), - }, - map[string]negtypes.NetworkEndpointSet{ - testZone2: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.3.1"), 10, testInstance3, targetPort)).Union(generateEndpointSet(net.ParseIP("1.1.4.1"), 10, testInstance4, targetPort)), - }, - map[string]negtypes.NetworkEndpointSet{ - testZone1: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.1.1"), 10, testInstance1, targetPort)), - }, - }, - { - "add more endpoints", - map[string]negtypes.NetworkEndpointSet{ - testZone2: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.3.1"), 10, testInstance3, targetPort)), - }, - map[string]negtypes.NetworkEndpointSet{}, - map[string]negtypes.NetworkEndpointSet{ - testZone1: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.1.1"), 10, testInstance1, targetPort)), - testZone2: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.3.1"), 10, testInstance3, targetPort)), - }, - }, - { - "add and remove endpoints in both zones", - map[string]negtypes.NetworkEndpointSet{ - testZone1: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.2.1"), 10, testInstance2, targetPort)), - testZone2: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.4.1"), 10, testInstance4, targetPort)), - }, - map[string]negtypes.NetworkEndpointSet{ - testZone1: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.1.1"), 10, testInstance1, targetPort)), - testZone2: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.3.1"), 10, testInstance3, targetPort)), - }, - map[string]negtypes.NetworkEndpointSet{ - testZone1: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.2.1"), 10, testInstance2, targetPort)), - testZone2: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.4.1"), 10, testInstance4, targetPort)), - }, - }, + if neg.Description == "" { + t.Errorf("Neg Description should be populated when NEG CRD is enabled") } - - for _, tc := range testCases { - err := transactionSyncer.syncNetworkEndpoints(tc.addEndpoints, tc.removeEndpoints) - if err != nil { - t.Errorf("For case %q, syncNetworkEndpoints() got %v, want nil", tc.desc, err) - } - - if err := waitForTransactions(transactionSyncer); err != nil { - t.Errorf("For case %q, waitForTransactions() got %v, want nil", tc.desc, err) - } - - for zone, endpoints := range tc.expectEndpoints { - list, err := fakeCloud.ListNetworkEndpoints(transactionSyncer.NegSyncerKey.NegName, zone, false, transactionSyncer.NegSyncerKey.GetAPIVersion()) - if err != nil { - t.Errorf("For case %q, ListNetworkEndpoints() got %v, want nil", tc.desc, err) - } - - endpointSet := negtypes.NewNetworkEndpointSet() - for _, ep := range list { - tmp := negtypes.NetworkEndpoint{IP: ep.NetworkEndpoint.IpAddress, Node: ep.NetworkEndpoint.Instance} - if testNegType == negtypes.VmIpPortEndpointType { - tmp.Port = strconv.FormatInt(ep.NetworkEndpoint.Port, 10) - } - endpointSet.Insert(tmp) - } - - if !endpoints.Equal(endpointSet) { - t.Errorf("For case %q, in zone %q, negType %q, endpointSets endpoints == %v, but got %v, difference: \n(want - got) = %v\n(got - want) = %v", tc.desc, zone, testNegType, endpoints, endpointSet, endpoints.Difference(endpointSet), endpointSet.Difference(endpoints)) - } - } - } - transactionSyncer.cloud.DeleteNetworkEndpointGroup(transactionSyncer.NegName, negtypes.TestZone1, transactionSyncer.NegSyncerKey.GetAPIVersion()) - transactionSyncer.cloud.DeleteNetworkEndpointGroup(transactionSyncer.NegName, negtypes.TestZone2, transactionSyncer.NegSyncerKey.GetAPIVersion()) - transactionSyncer.cloud.DeleteNetworkEndpointGroup(transactionSyncer.NegName, negtypes.TestZone3, transactionSyncer.NegSyncerKey.GetAPIVersion()) - transactionSyncer.cloud.DeleteNetworkEndpointGroup(transactionSyncer.NegName, negtypes.TestZone4, transactionSyncer.NegSyncerKey.GetAPIVersion()) } - } -} - -func TestCommitTransaction(t *testing.T) { - t.Parallel() - for _, enableEndpointSlices := range []bool{false, true} { - s, transactionSyncer := newTestTransactionSyncer(negtypes.NewAdapter(gce.NewFakeGCECloud(gce.DefaultTestClusterValues())), negtypes.VmIpPortEndpointType, false, enableEndpointSlices) - // use testSyncer to track the number of Sync got triggered - testSyncer := &testSyncer{s.(*syncer), 0} - testRetryer := &testRetryHandler{testSyncer, 0} - transactionSyncer.syncer = testSyncer - // assume NEG is initialized - transactionSyncer.needInit = false - transactionSyncer.retry = testRetryer testCases := []struct { - desc string - err error - endpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint - table func() networkEndpointTransactionTable - expect func() networkEndpointTransactionTable - expectSyncCount int - expectRetryCount int - expectNeedInit bool + desc string + addEndpoints map[string]negtypes.NetworkEndpointSet + removeEndpoints map[string]negtypes.NetworkEndpointSet + expectEndpoints map[string]negtypes.NetworkEndpointSet }{ { - "empty inputs", - nil, - map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint{}, - func() networkEndpointTransactionTable { return NewTransactionTable() }, - func() networkEndpointTransactionTable { return NewTransactionTable() }, - 1, - 0, - false, + "empty input", + map[string]negtypes.NetworkEndpointSet{}, + map[string]negtypes.NetworkEndpointSet{}, + map[string]negtypes.NetworkEndpointSet{}, }, { - "attach 10 endpoints on 1 instance successfully", - nil, - generateEndpointBatch(generateEndpointSet(net.ParseIP("1.1.1.1"), 10, testInstance1, "8080")), - func() networkEndpointTransactionTable { - table := NewTransactionTable() - generateTransaction(table, transactionEntry{Zone: testZone1, Operation: attachOp}, net.ParseIP("1.1.1.1"), 10, testInstance1, "8080") - return table + "add some endpoints", + map[string]negtypes.NetworkEndpointSet{ + testZone1: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.1.1"), 10, testInstance1, targetPort)).Union(generateEndpointSet(net.ParseIP("1.1.2.1"), 10, testInstance2, targetPort)), + testZone2: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.3.1"), 10, testInstance3, targetPort)).Union(generateEndpointSet(net.ParseIP("1.1.4.1"), 10, testInstance4, targetPort)), }, - func() networkEndpointTransactionTable { return NewTransactionTable() }, - 2, - 0, - false, - }, - { - "detach 20 endpoints on 2 instances successfully", - nil, - generateEndpointBatch(negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.1.1"), 10, testInstance1, "8080")).Union(generateEndpointSet(net.ParseIP("1.1.2.1"), 10, testInstance2, "8080"))), - func() networkEndpointTransactionTable { - table := NewTransactionTable() - generateTransaction(table, transactionEntry{Zone: testZone1, Operation: detachOp}, net.ParseIP("1.1.1.1"), 10, testInstance1, "8080") - generateTransaction(table, transactionEntry{Zone: testZone1, Operation: detachOp}, net.ParseIP("1.1.2.1"), 10, testInstance2, "8080") - return table + map[string]negtypes.NetworkEndpointSet{}, + map[string]negtypes.NetworkEndpointSet{ + testZone1: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.1.1"), 10, testInstance1, targetPort)).Union(generateEndpointSet(net.ParseIP("1.1.2.1"), 10, testInstance2, targetPort)), + testZone2: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.3.1"), 10, testInstance3, targetPort)).Union(generateEndpointSet(net.ParseIP("1.1.4.1"), 10, testInstance4, targetPort)), }, - func() networkEndpointTransactionTable { return NewTransactionTable() }, - 3, - 0, - false, }, { - "attach 20 endpoints on 2 instances successfully with unrelated 10 entries in the transaction table", - nil, - generateEndpointBatch(negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.1.1"), 10, testInstance1, "8080")).Union(generateEndpointSet(net.ParseIP("1.1.2.1"), 10, testInstance2, "8080"))), - func() networkEndpointTransactionTable { - table := NewTransactionTable() - generateTransaction(table, transactionEntry{Zone: testZone1, Operation: attachOp}, net.ParseIP("1.1.1.1"), 10, testInstance1, "8080") - generateTransaction(table, transactionEntry{Zone: testZone1, Operation: attachOp}, net.ParseIP("1.1.2.1"), 10, testInstance2, "8080") - generateTransaction(table, transactionEntry{Zone: testZone2, Operation: attachOp}, net.ParseIP("1.1.3.1"), 10, testInstance3, "8080") - return table + "remove some endpoints", + map[string]negtypes.NetworkEndpointSet{}, + map[string]negtypes.NetworkEndpointSet{ + testZone1: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.1.1"), 10, testInstance1, targetPort)).Union(generateEndpointSet(net.ParseIP("1.1.2.1"), 10, testInstance2, targetPort)), }, - func() networkEndpointTransactionTable { - table := NewTransactionTable() - generateTransaction(table, transactionEntry{Zone: testZone2, Operation: attachOp}, net.ParseIP("1.1.3.1"), 10, testInstance3, "8080") - return table + map[string]negtypes.NetworkEndpointSet{ + testZone2: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.3.1"), 10, testInstance3, targetPort)).Union(generateEndpointSet(net.ParseIP("1.1.4.1"), 10, testInstance4, targetPort)), }, - 4, - 0, - false, }, { - "error and retry", - fmt.Errorf("dummy error"), - map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint{}, - func() networkEndpointTransactionTable { - table := NewTransactionTable() - generateTransaction(table, transactionEntry{Zone: testZone2, Operation: attachOp}, net.ParseIP("1.1.3.1"), 10, testInstance3, "8080") - return table + "add duplicate endpoints", + map[string]negtypes.NetworkEndpointSet{ + testZone2: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.3.1"), 10, testInstance3, targetPort)).Union(generateEndpointSet(net.ParseIP("1.1.4.1"), 10, testInstance4, targetPort)), }, - func() networkEndpointTransactionTable { - table := NewTransactionTable() - generateTransaction(table, transactionEntry{Zone: testZone2, Operation: attachOp}, net.ParseIP("1.1.3.1"), 10, testInstance3, "8080") - return table + map[string]negtypes.NetworkEndpointSet{}, + map[string]negtypes.NetworkEndpointSet{ + testZone2: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.3.1"), 10, testInstance3, targetPort)).Union(generateEndpointSet(net.ParseIP("1.1.4.1"), 10, testInstance4, targetPort)), }, - 5, - 1, - true, }, { - "error and retry #2", - fmt.Errorf("dummy error"), - generateEndpointBatch(negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.1.1"), 10, testInstance1, "8080")).Union(generateEndpointSet(net.ParseIP("1.1.2.1"), 10, testInstance2, "8080"))), - func() networkEndpointTransactionTable { - table := NewTransactionTable() - generateTransaction(table, transactionEntry{Zone: testZone1, Operation: attachOp}, net.ParseIP("1.1.1.1"), 10, testInstance1, "8080") - generateTransaction(table, transactionEntry{Zone: testZone1, Operation: attachOp}, net.ParseIP("1.1.2.1"), 10, testInstance2, "8080") - generateTransaction(table, transactionEntry{Zone: testZone2, Operation: attachOp}, net.ParseIP("1.1.3.1"), 10, testInstance3, "8080") - return table + "add and remove endpoints", + map[string]negtypes.NetworkEndpointSet{ + testZone1: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.1.1"), 10, testInstance1, targetPort)), + }, + map[string]negtypes.NetworkEndpointSet{ + testZone2: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.3.1"), 10, testInstance3, targetPort)).Union(generateEndpointSet(net.ParseIP("1.1.4.1"), 10, testInstance4, targetPort)), }, - func() networkEndpointTransactionTable { - table := NewTransactionTable() - generateTransaction(table, transactionEntry{Zone: testZone2, Operation: attachOp}, net.ParseIP("1.1.3.1"), 10, testInstance3, "8080") - return table + map[string]negtypes.NetworkEndpointSet{ + testZone1: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.1.1"), 10, testInstance1, targetPort)), }, - 6, - 2, - true, }, { - "detach 20 endpoints on 2 instance but missing transaction entries on 1 instance", - nil, - generateEndpointBatch(negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.1.1"), 10, testInstance1, "8080")).Union(generateEndpointSet(net.ParseIP("1.1.2.1"), 10, testInstance2, "8080"))), - func() networkEndpointTransactionTable { - table := NewTransactionTable() - generateTransaction(table, transactionEntry{Zone: testZone1, Operation: detachOp}, net.ParseIP("1.1.1.1"), 10, testInstance1, "8080") - return table + "add more endpoints", + map[string]negtypes.NetworkEndpointSet{ + testZone2: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.3.1"), 10, testInstance3, targetPort)), + }, + map[string]negtypes.NetworkEndpointSet{}, + map[string]negtypes.NetworkEndpointSet{ + testZone1: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.1.1"), 10, testInstance1, targetPort)), + testZone2: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.3.1"), 10, testInstance3, targetPort)), }, - func() networkEndpointTransactionTable { return NewTransactionTable() }, - 7, - 2, - false, }, { - "detach 20 endpoints on 2 instance but 10 endpoints needs reconcile", - nil, - generateEndpointBatch(negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.1.1"), 10, testInstance1, "8080")).Union(generateEndpointSet(net.ParseIP("1.1.2.1"), 10, testInstance2, "8080"))), - func() networkEndpointTransactionTable { - table := NewTransactionTable() - generateTransaction(table, transactionEntry{Zone: testZone1, Operation: detachOp}, net.ParseIP("1.1.1.1"), 10, testInstance1, "8080") - generateTransaction(table, transactionEntry{Zone: testZone1, Operation: detachOp}, net.ParseIP("1.1.2.1"), 10, testInstance2, "8080") - generateTransaction(table, transactionEntry{Zone: testZone2, Operation: attachOp}, net.ParseIP("1.1.3.1"), 10, testInstance3, "8080") - return table + "add and remove endpoints in both zones", + map[string]negtypes.NetworkEndpointSet{ + testZone1: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.2.1"), 10, testInstance2, targetPort)), + testZone2: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.4.1"), 10, testInstance4, targetPort)), }, - func() networkEndpointTransactionTable { - table := NewTransactionTable() - generateTransaction(table, transactionEntry{Zone: testZone2, Operation: attachOp}, net.ParseIP("1.1.3.1"), 10, testInstance3, "8080") - return table + map[string]negtypes.NetworkEndpointSet{ + testZone1: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.1.1"), 10, testInstance1, targetPort)), + testZone2: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.3.1"), 10, testInstance3, targetPort)), + }, + map[string]negtypes.NetworkEndpointSet{ + testZone1: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.2.1"), 10, testInstance2, targetPort)), + testZone2: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.4.1"), 10, testInstance4, targetPort)), }, - 8, - 2, - false, }, } for _, tc := range testCases { - transactionSyncer.transactions = tc.table() - transactionSyncer.commitTransaction(tc.err, tc.endpointMap) - if transactionSyncer.needInit != tc.expectNeedInit { - t.Errorf("For case %q, endpointSets needInit == %v, but got %v", tc.desc, tc.expectNeedInit, transactionSyncer.needInit) + err := transactionSyncer.syncNetworkEndpoints(tc.addEndpoints, tc.removeEndpoints) + if err != nil { + t.Errorf("For case %q, syncNetworkEndpoints() got %v, want nil", tc.desc, err) } - if transactionSyncer.needInit == true { - transactionSyncer.needInit = false + + if err := waitForTransactions(transactionSyncer); err != nil { + t.Errorf("For case %q, waitForTransactions() got %v, want nil", tc.desc, err) } - validateTransactionTableEquality(t, tc.desc, transactionSyncer.transactions, tc.expect()) - // wait for the sync count to bump - if err := wait.PollImmediate(time.Microsecond, 5*time.Second, func() (bool, error) { - if tc.expectSyncCount == testSyncer.SyncCount && tc.expectRetryCount == testRetryer.RetryCount { - return true, nil + for zone, endpoints := range tc.expectEndpoints { + list, err := fakeCloud.ListNetworkEndpoints(transactionSyncer.NegSyncerKey.NegName, zone, false, transactionSyncer.NegSyncerKey.GetAPIVersion()) + if err != nil { + t.Errorf("For case %q, ListNetworkEndpoints() got %v, want nil", tc.desc, err) + } + + endpointSet := negtypes.NewNetworkEndpointSet() + for _, ep := range list { + tmp := negtypes.NetworkEndpoint{IP: ep.NetworkEndpoint.IpAddress, Node: ep.NetworkEndpoint.Instance} + if testNegType == negtypes.VmIpPortEndpointType { + tmp.Port = strconv.FormatInt(ep.NetworkEndpoint.Port, 10) + } + endpointSet.Insert(tmp) + } + + if !endpoints.Equal(endpointSet) { + t.Errorf("For case %q, in zone %q, negType %q, endpointSets endpoints == %v, but got %v, difference: \n(want - got) = %v\n(got - want) = %v", tc.desc, zone, testNegType, endpoints, endpointSet, endpoints.Difference(endpointSet), endpointSet.Difference(endpoints)) } - return false, nil - }); err != nil { - t.Errorf("For case %q, endpointSets sync count == %v, but got %v", tc.desc, tc.expectSyncCount, testSyncer.SyncCount) - t.Errorf("For case %q, endpointSets retry count == %v, but got %v", tc.desc, tc.expectRetryCount, testRetryer.RetryCount) } + } + transactionSyncer.cloud.DeleteNetworkEndpointGroup(transactionSyncer.NegName, negtypes.TestZone1, transactionSyncer.NegSyncerKey.GetAPIVersion()) + transactionSyncer.cloud.DeleteNetworkEndpointGroup(transactionSyncer.NegName, negtypes.TestZone2, transactionSyncer.NegSyncerKey.GetAPIVersion()) + transactionSyncer.cloud.DeleteNetworkEndpointGroup(transactionSyncer.NegName, negtypes.TestZone3, transactionSyncer.NegSyncerKey.GetAPIVersion()) + transactionSyncer.cloud.DeleteNetworkEndpointGroup(transactionSyncer.NegName, negtypes.TestZone4, transactionSyncer.NegSyncerKey.GetAPIVersion()) + } +} +func TestCommitTransaction(t *testing.T) { + t.Parallel() + s, transactionSyncer := newTestTransactionSyncer(negtypes.NewAdapter(gce.NewFakeGCECloud(gce.DefaultTestClusterValues())), negtypes.VmIpPortEndpointType, false) + // use testSyncer to track the number of Sync got triggered + testSyncer := &testSyncer{s.(*syncer), 0} + testRetryer := &testRetryHandler{testSyncer, 0} + transactionSyncer.syncer = testSyncer + // assume NEG is initialized + transactionSyncer.needInit = false + transactionSyncer.retry = testRetryer + + testCases := []struct { + desc string + err error + endpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint + table func() networkEndpointTransactionTable + expect func() networkEndpointTransactionTable + expectSyncCount int + expectRetryCount int + expectNeedInit bool + }{ + { + "empty inputs", + nil, + map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint{}, + func() networkEndpointTransactionTable { return NewTransactionTable() }, + func() networkEndpointTransactionTable { return NewTransactionTable() }, + 1, + 0, + false, + }, + { + "attach 10 endpoints on 1 instance successfully", + nil, + generateEndpointBatch(generateEndpointSet(net.ParseIP("1.1.1.1"), 10, testInstance1, "8080")), + func() networkEndpointTransactionTable { + table := NewTransactionTable() + generateTransaction(table, transactionEntry{Zone: testZone1, Operation: attachOp}, net.ParseIP("1.1.1.1"), 10, testInstance1, "8080") + return table + }, + func() networkEndpointTransactionTable { return NewTransactionTable() }, + 2, + 0, + false, + }, + { + "detach 20 endpoints on 2 instances successfully", + nil, + generateEndpointBatch(negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.1.1"), 10, testInstance1, "8080")).Union(generateEndpointSet(net.ParseIP("1.1.2.1"), 10, testInstance2, "8080"))), + func() networkEndpointTransactionTable { + table := NewTransactionTable() + generateTransaction(table, transactionEntry{Zone: testZone1, Operation: detachOp}, net.ParseIP("1.1.1.1"), 10, testInstance1, "8080") + generateTransaction(table, transactionEntry{Zone: testZone1, Operation: detachOp}, net.ParseIP("1.1.2.1"), 10, testInstance2, "8080") + return table + }, + func() networkEndpointTransactionTable { return NewTransactionTable() }, + 3, + 0, + false, + }, + { + "attach 20 endpoints on 2 instances successfully with unrelated 10 entries in the transaction table", + nil, + generateEndpointBatch(negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.1.1"), 10, testInstance1, "8080")).Union(generateEndpointSet(net.ParseIP("1.1.2.1"), 10, testInstance2, "8080"))), + func() networkEndpointTransactionTable { + table := NewTransactionTable() + generateTransaction(table, transactionEntry{Zone: testZone1, Operation: attachOp}, net.ParseIP("1.1.1.1"), 10, testInstance1, "8080") + generateTransaction(table, transactionEntry{Zone: testZone1, Operation: attachOp}, net.ParseIP("1.1.2.1"), 10, testInstance2, "8080") + generateTransaction(table, transactionEntry{Zone: testZone2, Operation: attachOp}, net.ParseIP("1.1.3.1"), 10, testInstance3, "8080") + return table + }, + func() networkEndpointTransactionTable { + table := NewTransactionTable() + generateTransaction(table, transactionEntry{Zone: testZone2, Operation: attachOp}, net.ParseIP("1.1.3.1"), 10, testInstance3, "8080") + return table + }, + 4, + 0, + false, + }, + { + "error and retry", + fmt.Errorf("dummy error"), + map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint{}, + func() networkEndpointTransactionTable { + table := NewTransactionTable() + generateTransaction(table, transactionEntry{Zone: testZone2, Operation: attachOp}, net.ParseIP("1.1.3.1"), 10, testInstance3, "8080") + return table + }, + func() networkEndpointTransactionTable { + table := NewTransactionTable() + generateTransaction(table, transactionEntry{Zone: testZone2, Operation: attachOp}, net.ParseIP("1.1.3.1"), 10, testInstance3, "8080") + return table + }, + 5, + 1, + true, + }, + { + "error and retry #2", + fmt.Errorf("dummy error"), + generateEndpointBatch(negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.1.1"), 10, testInstance1, "8080")).Union(generateEndpointSet(net.ParseIP("1.1.2.1"), 10, testInstance2, "8080"))), + func() networkEndpointTransactionTable { + table := NewTransactionTable() + generateTransaction(table, transactionEntry{Zone: testZone1, Operation: attachOp}, net.ParseIP("1.1.1.1"), 10, testInstance1, "8080") + generateTransaction(table, transactionEntry{Zone: testZone1, Operation: attachOp}, net.ParseIP("1.1.2.1"), 10, testInstance2, "8080") + generateTransaction(table, transactionEntry{Zone: testZone2, Operation: attachOp}, net.ParseIP("1.1.3.1"), 10, testInstance3, "8080") + return table + }, + func() networkEndpointTransactionTable { + table := NewTransactionTable() + generateTransaction(table, transactionEntry{Zone: testZone2, Operation: attachOp}, net.ParseIP("1.1.3.1"), 10, testInstance3, "8080") + return table + }, + 6, + 2, + true, + }, + { + "detach 20 endpoints on 2 instance but missing transaction entries on 1 instance", + nil, + generateEndpointBatch(negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.1.1"), 10, testInstance1, "8080")).Union(generateEndpointSet(net.ParseIP("1.1.2.1"), 10, testInstance2, "8080"))), + func() networkEndpointTransactionTable { + table := NewTransactionTable() + generateTransaction(table, transactionEntry{Zone: testZone1, Operation: detachOp}, net.ParseIP("1.1.1.1"), 10, testInstance1, "8080") + return table + }, + func() networkEndpointTransactionTable { return NewTransactionTable() }, + 7, + 2, + false, + }, + { + "detach 20 endpoints on 2 instance but 10 endpoints needs reconcile", + nil, + generateEndpointBatch(negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.1.1"), 10, testInstance1, "8080")).Union(generateEndpointSet(net.ParseIP("1.1.2.1"), 10, testInstance2, "8080"))), + func() networkEndpointTransactionTable { + table := NewTransactionTable() + generateTransaction(table, transactionEntry{Zone: testZone1, Operation: detachOp}, net.ParseIP("1.1.1.1"), 10, testInstance1, "8080") + generateTransaction(table, transactionEntry{Zone: testZone1, Operation: detachOp}, net.ParseIP("1.1.2.1"), 10, testInstance2, "8080") + generateTransaction(table, transactionEntry{Zone: testZone2, Operation: attachOp}, net.ParseIP("1.1.3.1"), 10, testInstance3, "8080") + return table + }, + func() networkEndpointTransactionTable { + table := NewTransactionTable() + generateTransaction(table, transactionEntry{Zone: testZone2, Operation: attachOp}, net.ParseIP("1.1.3.1"), 10, testInstance3, "8080") + return table + }, + 8, + 2, + false, + }, + } + + for _, tc := range testCases { + transactionSyncer.transactions = tc.table() + transactionSyncer.commitTransaction(tc.err, tc.endpointMap) + if transactionSyncer.needInit != tc.expectNeedInit { + t.Errorf("For case %q, endpointSets needInit == %v, but got %v", tc.desc, tc.expectNeedInit, transactionSyncer.needInit) + } + if transactionSyncer.needInit == true { + transactionSyncer.needInit = false } + + validateTransactionTableEquality(t, tc.desc, transactionSyncer.transactions, tc.expect()) + // wait for the sync count to bump + if err := wait.PollImmediate(time.Microsecond, 5*time.Second, func() (bool, error) { + if tc.expectSyncCount == testSyncer.SyncCount && tc.expectRetryCount == testRetryer.RetryCount { + return true, nil + } + return false, nil + }); err != nil { + t.Errorf("For case %q, endpointSets sync count == %v, but got %v", tc.desc, tc.expectSyncCount, testSyncer.SyncCount) + t.Errorf("For case %q, endpointSets retry count == %v, but got %v", tc.desc, tc.expectRetryCount, testRetryer.RetryCount) + } + } } @@ -661,209 +657,207 @@ func TestFilterEndpointByTransaction(t *testing.T) { func TestCommitPods(t *testing.T) { t.Parallel() - for _, enableEndpointSlices := range []bool{false, true} { - _, transactionSyncer := newTestTransactionSyncer(negtypes.NewAdapter(gce.NewFakeGCECloud(gce.DefaultTestClusterValues())), negtypes.VmIpPortEndpointType, false, enableEndpointSlices) - reflector := &testReflector{} - transactionSyncer.reflector = reflector + _, transactionSyncer := newTestTransactionSyncer(negtypes.NewAdapter(gce.NewFakeGCECloud(gce.DefaultTestClusterValues())), negtypes.VmIpPortEndpointType, false) + reflector := &testReflector{} + transactionSyncer.reflector = reflector - for _, tc := range []struct { - desc string - input func() (map[string]negtypes.NetworkEndpointSet, negtypes.EndpointPodMap) - expectOutput func() map[string]negtypes.EndpointPodMap - }{ - { - desc: "empty input", - input: func() (map[string]negtypes.NetworkEndpointSet, negtypes.EndpointPodMap) { - return nil, nil - }, - expectOutput: func() map[string]negtypes.EndpointPodMap { return map[string]negtypes.EndpointPodMap{} }, + for _, tc := range []struct { + desc string + input func() (map[string]negtypes.NetworkEndpointSet, negtypes.EndpointPodMap) + expectOutput func() map[string]negtypes.EndpointPodMap + }{ + { + desc: "empty input", + input: func() (map[string]negtypes.NetworkEndpointSet, negtypes.EndpointPodMap) { + return nil, nil }, - { - desc: "10 endpoints from 1 instance in 1 zone", - input: func() (map[string]negtypes.NetworkEndpointSet, negtypes.EndpointPodMap) { - endpointSet, endpointMap := generateEndpointSetAndMap(net.ParseIP("1.1.1.1"), 10, testInstance1, "8080") - return map[string]negtypes.NetworkEndpointSet{testZone1: endpointSet}, endpointMap - }, - expectOutput: func() map[string]negtypes.EndpointPodMap { - _, endpointMap := generateEndpointSetAndMap(net.ParseIP("1.1.1.1"), 10, testInstance1, "8080") - return map[string]negtypes.EndpointPodMap{testZone1: endpointMap} - }, + expectOutput: func() map[string]negtypes.EndpointPodMap { return map[string]negtypes.EndpointPodMap{} }, + }, + { + desc: "10 endpoints from 1 instance in 1 zone", + input: func() (map[string]negtypes.NetworkEndpointSet, negtypes.EndpointPodMap) { + endpointSet, endpointMap := generateEndpointSetAndMap(net.ParseIP("1.1.1.1"), 10, testInstance1, "8080") + return map[string]negtypes.NetworkEndpointSet{testZone1: endpointSet}, endpointMap }, - { - desc: "40 endpoints from 4 instances in 2 zone", - input: func() (map[string]negtypes.NetworkEndpointSet, negtypes.EndpointPodMap) { - retSet := map[string]negtypes.NetworkEndpointSet{ - testZone1: negtypes.NewNetworkEndpointSet(), - testZone2: negtypes.NewNetworkEndpointSet(), - } - retMap := negtypes.EndpointPodMap{} - endpointSet, endpointMap := generateEndpointSetAndMap(net.ParseIP("1.1.1.1"), 10, testInstance1, "8080") - retSet[testZone1] = retSet[testZone1].Union(endpointSet) - retMap = unionEndpointMap(retMap, endpointMap) - endpointSet, endpointMap = generateEndpointSetAndMap(net.ParseIP("1.1.2.1"), 10, testInstance2, "8080") - retSet[testZone1] = retSet[testZone1].Union(endpointSet) - retMap = unionEndpointMap(retMap, endpointMap) - endpointSet, endpointMap = generateEndpointSetAndMap(net.ParseIP("1.1.3.1"), 10, testInstance3, "8080") - retSet[testZone2] = retSet[testZone2].Union(endpointSet) - retMap = unionEndpointMap(retMap, endpointMap) - endpointSet, endpointMap = generateEndpointSetAndMap(net.ParseIP("1.1.4.1"), 10, testInstance4, "8080") - retSet[testZone2] = retSet[testZone2].Union(endpointSet) - retMap = unionEndpointMap(retMap, endpointMap) - return retSet, retMap - }, - expectOutput: func() map[string]negtypes.EndpointPodMap { - retMap := map[string]negtypes.EndpointPodMap{ - testZone1: {}, - testZone2: {}, - } - _, endpointMap := generateEndpointSetAndMap(net.ParseIP("1.1.1.1"), 10, testInstance1, "8080") - retMap[testZone1] = unionEndpointMap(retMap[testZone1], endpointMap) - _, endpointMap = generateEndpointSetAndMap(net.ParseIP("1.1.2.1"), 10, testInstance2, "8080") - retMap[testZone1] = unionEndpointMap(retMap[testZone1], endpointMap) - _, endpointMap = generateEndpointSetAndMap(net.ParseIP("1.1.3.1"), 10, testInstance3, "8080") - retMap[testZone2] = unionEndpointMap(retMap[testZone2], endpointMap) - _, endpointMap = generateEndpointSetAndMap(net.ParseIP("1.1.4.1"), 10, testInstance4, "8080") - retMap[testZone2] = unionEndpointMap(retMap[testZone2], endpointMap) - return retMap - }, + expectOutput: func() map[string]negtypes.EndpointPodMap { + _, endpointMap := generateEndpointSetAndMap(net.ParseIP("1.1.1.1"), 10, testInstance1, "8080") + return map[string]negtypes.EndpointPodMap{testZone1: endpointMap} }, - { - desc: "40 endpoints from 4 instances in 2 zone, but half of the endpoints does not have corresponding pod mapping", - input: func() (map[string]negtypes.NetworkEndpointSet, negtypes.EndpointPodMap) { - retSet := map[string]negtypes.NetworkEndpointSet{ - testZone1: negtypes.NewNetworkEndpointSet(), - testZone2: negtypes.NewNetworkEndpointSet(), - } - retMap := negtypes.EndpointPodMap{} - - endpointSet, _ := generateEndpointSetAndMap(net.ParseIP("1.1.1.1"), 10, testInstance1, "8080") - retSet[testZone1] = retSet[testZone1].Union(endpointSet) - _, endpointMap := generateEndpointSetAndMap(net.ParseIP("1.1.1.1"), 5, testInstance1, "8080") - retMap = unionEndpointMap(retMap, endpointMap) - - endpointSet, _ = generateEndpointSetAndMap(net.ParseIP("1.1.2.1"), 10, testInstance2, "8080") - retSet[testZone1] = retSet[testZone1].Union(endpointSet) - _, endpointMap = generateEndpointSetAndMap(net.ParseIP("1.1.2.1"), 5, testInstance2, "8080") - retMap = unionEndpointMap(retMap, endpointMap) - - endpointSet, _ = generateEndpointSetAndMap(net.ParseIP("1.1.3.1"), 10, testInstance3, "8080") - retSet[testZone2] = retSet[testZone2].Union(endpointSet) - _, endpointMap = generateEndpointSetAndMap(net.ParseIP("1.1.3.1"), 5, testInstance3, "8080") - retMap = unionEndpointMap(retMap, endpointMap) - - endpointSet, _ = generateEndpointSetAndMap(net.ParseIP("1.1.4.1"), 10, testInstance4, "8080") - retSet[testZone2] = retSet[testZone2].Union(endpointSet) - _, endpointMap = generateEndpointSetAndMap(net.ParseIP("1.1.4.1"), 5, testInstance4, "8080") - retMap = unionEndpointMap(retMap, endpointMap) - return retSet, retMap - }, - expectOutput: func() map[string]negtypes.EndpointPodMap { - retMap := map[string]negtypes.EndpointPodMap{ - testZone1: {}, - testZone2: {}, - } - _, endpointMap := generateEndpointSetAndMap(net.ParseIP("1.1.1.1"), 5, testInstance1, "8080") - retMap[testZone1] = unionEndpointMap(retMap[testZone1], endpointMap) - _, endpointMap = generateEndpointSetAndMap(net.ParseIP("1.1.2.1"), 5, testInstance2, "8080") - retMap[testZone1] = unionEndpointMap(retMap[testZone1], endpointMap) - _, endpointMap = generateEndpointSetAndMap(net.ParseIP("1.1.3.1"), 5, testInstance3, "8080") - retMap[testZone2] = unionEndpointMap(retMap[testZone2], endpointMap) - _, endpointMap = generateEndpointSetAndMap(net.ParseIP("1.1.4.1"), 5, testInstance4, "8080") - retMap[testZone2] = unionEndpointMap(retMap[testZone2], endpointMap) - return retMap - }, + }, + { + desc: "40 endpoints from 4 instances in 2 zone", + input: func() (map[string]negtypes.NetworkEndpointSet, negtypes.EndpointPodMap) { + retSet := map[string]negtypes.NetworkEndpointSet{ + testZone1: negtypes.NewNetworkEndpointSet(), + testZone2: negtypes.NewNetworkEndpointSet(), + } + retMap := negtypes.EndpointPodMap{} + endpointSet, endpointMap := generateEndpointSetAndMap(net.ParseIP("1.1.1.1"), 10, testInstance1, "8080") + retSet[testZone1] = retSet[testZone1].Union(endpointSet) + retMap = unionEndpointMap(retMap, endpointMap) + endpointSet, endpointMap = generateEndpointSetAndMap(net.ParseIP("1.1.2.1"), 10, testInstance2, "8080") + retSet[testZone1] = retSet[testZone1].Union(endpointSet) + retMap = unionEndpointMap(retMap, endpointMap) + endpointSet, endpointMap = generateEndpointSetAndMap(net.ParseIP("1.1.3.1"), 10, testInstance3, "8080") + retSet[testZone2] = retSet[testZone2].Union(endpointSet) + retMap = unionEndpointMap(retMap, endpointMap) + endpointSet, endpointMap = generateEndpointSetAndMap(net.ParseIP("1.1.4.1"), 10, testInstance4, "8080") + retSet[testZone2] = retSet[testZone2].Union(endpointSet) + retMap = unionEndpointMap(retMap, endpointMap) + return retSet, retMap }, - { - desc: "40 endpoints from 4 instances in 2 zone, and more endpoints are in pod mapping", - input: func() (map[string]negtypes.NetworkEndpointSet, negtypes.EndpointPodMap) { - retSet := map[string]negtypes.NetworkEndpointSet{ - testZone1: negtypes.NewNetworkEndpointSet(), - testZone2: negtypes.NewNetworkEndpointSet(), - } - retMap := negtypes.EndpointPodMap{} - - endpointSet, _ := generateEndpointSetAndMap(net.ParseIP("1.1.1.1"), 10, testInstance1, "8080") - retSet[testZone1] = retSet[testZone1].Union(endpointSet) - _, endpointMap := generateEndpointSetAndMap(net.ParseIP("1.1.1.1"), 15, testInstance1, "8080") - retMap = unionEndpointMap(retMap, endpointMap) - - endpointSet, _ = generateEndpointSetAndMap(net.ParseIP("1.1.2.1"), 10, testInstance2, "8080") - retSet[testZone1] = retSet[testZone1].Union(endpointSet) - _, endpointMap = generateEndpointSetAndMap(net.ParseIP("1.1.2.1"), 15, testInstance2, "8080") - retMap = unionEndpointMap(retMap, endpointMap) - - endpointSet, _ = generateEndpointSetAndMap(net.ParseIP("1.1.3.1"), 10, testInstance3, "8080") - retSet[testZone2] = retSet[testZone2].Union(endpointSet) - _, endpointMap = generateEndpointSetAndMap(net.ParseIP("1.1.3.1"), 15, testInstance3, "8080") - retMap = unionEndpointMap(retMap, endpointMap) - - endpointSet, _ = generateEndpointSetAndMap(net.ParseIP("1.1.4.1"), 10, testInstance4, "8080") - retSet[testZone2] = retSet[testZone2].Union(endpointSet) - _, endpointMap = generateEndpointSetAndMap(net.ParseIP("1.1.4.1"), 15, testInstance4, "8080") - retMap = unionEndpointMap(retMap, endpointMap) - return retSet, retMap - }, - expectOutput: func() map[string]negtypes.EndpointPodMap { - retMap := map[string]negtypes.EndpointPodMap{ - testZone1: {}, - testZone2: {}, - } - _, endpointMap := generateEndpointSetAndMap(net.ParseIP("1.1.1.1"), 10, testInstance1, "8080") - retMap[testZone1] = unionEndpointMap(retMap[testZone1], endpointMap) - _, endpointMap = generateEndpointSetAndMap(net.ParseIP("1.1.2.1"), 10, testInstance2, "8080") - retMap[testZone1] = unionEndpointMap(retMap[testZone1], endpointMap) - _, endpointMap = generateEndpointSetAndMap(net.ParseIP("1.1.3.1"), 10, testInstance3, "8080") - retMap[testZone2] = unionEndpointMap(retMap[testZone2], endpointMap) - _, endpointMap = generateEndpointSetAndMap(net.ParseIP("1.1.4.1"), 10, testInstance4, "8080") - retMap[testZone2] = unionEndpointMap(retMap[testZone2], endpointMap) - return retMap - }, + expectOutput: func() map[string]negtypes.EndpointPodMap { + retMap := map[string]negtypes.EndpointPodMap{ + testZone1: {}, + testZone2: {}, + } + _, endpointMap := generateEndpointSetAndMap(net.ParseIP("1.1.1.1"), 10, testInstance1, "8080") + retMap[testZone1] = unionEndpointMap(retMap[testZone1], endpointMap) + _, endpointMap = generateEndpointSetAndMap(net.ParseIP("1.1.2.1"), 10, testInstance2, "8080") + retMap[testZone1] = unionEndpointMap(retMap[testZone1], endpointMap) + _, endpointMap = generateEndpointSetAndMap(net.ParseIP("1.1.3.1"), 10, testInstance3, "8080") + retMap[testZone2] = unionEndpointMap(retMap[testZone2], endpointMap) + _, endpointMap = generateEndpointSetAndMap(net.ParseIP("1.1.4.1"), 10, testInstance4, "8080") + retMap[testZone2] = unionEndpointMap(retMap[testZone2], endpointMap) + return retMap }, - { - desc: "40 endpoints from 4 instances in 2 zone, but some nodes do not have endpoint pod mapping", - input: func() (map[string]negtypes.NetworkEndpointSet, negtypes.EndpointPodMap) { - retSet := map[string]negtypes.NetworkEndpointSet{ - testZone1: negtypes.NewNetworkEndpointSet(), - testZone2: negtypes.NewNetworkEndpointSet(), - } - retMap := negtypes.EndpointPodMap{} - endpointSet, endpointMap := generateEndpointSetAndMap(net.ParseIP("1.1.1.1"), 10, testInstance1, "8080") - retSet[testZone1] = retSet[testZone1].Union(endpointSet) - retMap = unionEndpointMap(retMap, endpointMap) - endpointSet, endpointMap = generateEndpointSetAndMap(net.ParseIP("1.1.2.1"), 10, testInstance2, "8080") - retSet[testZone1] = retSet[testZone1].Union(endpointSet) - endpointSet, endpointMap = generateEndpointSetAndMap(net.ParseIP("1.1.3.1"), 10, testInstance3, "8080") - retSet[testZone2] = retSet[testZone2].Union(endpointSet) - retMap = unionEndpointMap(retMap, endpointMap) - endpointSet, endpointMap = generateEndpointSetAndMap(net.ParseIP("1.1.4.1"), 10, testInstance4, "8080") - retSet[testZone2] = retSet[testZone2].Union(endpointSet) - return retSet, retMap - }, - expectOutput: func() map[string]negtypes.EndpointPodMap { - retMap := map[string]negtypes.EndpointPodMap{ - testZone1: {}, - testZone2: {}, - } - _, endpointMap := generateEndpointSetAndMap(net.ParseIP("1.1.1.1"), 10, testInstance1, "8080") - retMap[testZone1] = unionEndpointMap(retMap[testZone1], endpointMap) - _, endpointMap = generateEndpointSetAndMap(net.ParseIP("1.1.3.1"), 10, testInstance3, "8080") - retMap[testZone2] = unionEndpointMap(retMap[testZone2], endpointMap) - return retMap - }, + }, + { + desc: "40 endpoints from 4 instances in 2 zone, but half of the endpoints does not have corresponding pod mapping", + input: func() (map[string]negtypes.NetworkEndpointSet, negtypes.EndpointPodMap) { + retSet := map[string]negtypes.NetworkEndpointSet{ + testZone1: negtypes.NewNetworkEndpointSet(), + testZone2: negtypes.NewNetworkEndpointSet(), + } + retMap := negtypes.EndpointPodMap{} + + endpointSet, _ := generateEndpointSetAndMap(net.ParseIP("1.1.1.1"), 10, testInstance1, "8080") + retSet[testZone1] = retSet[testZone1].Union(endpointSet) + _, endpointMap := generateEndpointSetAndMap(net.ParseIP("1.1.1.1"), 5, testInstance1, "8080") + retMap = unionEndpointMap(retMap, endpointMap) + + endpointSet, _ = generateEndpointSetAndMap(net.ParseIP("1.1.2.1"), 10, testInstance2, "8080") + retSet[testZone1] = retSet[testZone1].Union(endpointSet) + _, endpointMap = generateEndpointSetAndMap(net.ParseIP("1.1.2.1"), 5, testInstance2, "8080") + retMap = unionEndpointMap(retMap, endpointMap) + + endpointSet, _ = generateEndpointSetAndMap(net.ParseIP("1.1.3.1"), 10, testInstance3, "8080") + retSet[testZone2] = retSet[testZone2].Union(endpointSet) + _, endpointMap = generateEndpointSetAndMap(net.ParseIP("1.1.3.1"), 5, testInstance3, "8080") + retMap = unionEndpointMap(retMap, endpointMap) + + endpointSet, _ = generateEndpointSetAndMap(net.ParseIP("1.1.4.1"), 10, testInstance4, "8080") + retSet[testZone2] = retSet[testZone2].Union(endpointSet) + _, endpointMap = generateEndpointSetAndMap(net.ParseIP("1.1.4.1"), 5, testInstance4, "8080") + retMap = unionEndpointMap(retMap, endpointMap) + return retSet, retMap }, - } { - reflector.Flush() - endpointMap, endpointPodMap := tc.input() - expectOutput := tc.expectOutput() - transactionSyncer.commitPods(endpointMap, endpointPodMap) - negNameSet := sets.NewString(reflector.negNames...) - if len(expectOutput) != 0 && !(negNameSet.Len() == 1 && negNameSet.Has(transactionSyncer.NegSyncerKey.NegName)) { - t.Errorf("For test case %q, expect neg name to be %v, but got %v", tc.desc, transactionSyncer.NegSyncerKey.NegName, negNameSet.List()) - } + expectOutput: func() map[string]negtypes.EndpointPodMap { + retMap := map[string]negtypes.EndpointPodMap{ + testZone1: {}, + testZone2: {}, + } + _, endpointMap := generateEndpointSetAndMap(net.ParseIP("1.1.1.1"), 5, testInstance1, "8080") + retMap[testZone1] = unionEndpointMap(retMap[testZone1], endpointMap) + _, endpointMap = generateEndpointSetAndMap(net.ParseIP("1.1.2.1"), 5, testInstance2, "8080") + retMap[testZone1] = unionEndpointMap(retMap[testZone1], endpointMap) + _, endpointMap = generateEndpointSetAndMap(net.ParseIP("1.1.3.1"), 5, testInstance3, "8080") + retMap[testZone2] = unionEndpointMap(retMap[testZone2], endpointMap) + _, endpointMap = generateEndpointSetAndMap(net.ParseIP("1.1.4.1"), 5, testInstance4, "8080") + retMap[testZone2] = unionEndpointMap(retMap[testZone2], endpointMap) + return retMap + }, + }, + { + desc: "40 endpoints from 4 instances in 2 zone, and more endpoints are in pod mapping", + input: func() (map[string]negtypes.NetworkEndpointSet, negtypes.EndpointPodMap) { + retSet := map[string]negtypes.NetworkEndpointSet{ + testZone1: negtypes.NewNetworkEndpointSet(), + testZone2: negtypes.NewNetworkEndpointSet(), + } + retMap := negtypes.EndpointPodMap{} + + endpointSet, _ := generateEndpointSetAndMap(net.ParseIP("1.1.1.1"), 10, testInstance1, "8080") + retSet[testZone1] = retSet[testZone1].Union(endpointSet) + _, endpointMap := generateEndpointSetAndMap(net.ParseIP("1.1.1.1"), 15, testInstance1, "8080") + retMap = unionEndpointMap(retMap, endpointMap) + + endpointSet, _ = generateEndpointSetAndMap(net.ParseIP("1.1.2.1"), 10, testInstance2, "8080") + retSet[testZone1] = retSet[testZone1].Union(endpointSet) + _, endpointMap = generateEndpointSetAndMap(net.ParseIP("1.1.2.1"), 15, testInstance2, "8080") + retMap = unionEndpointMap(retMap, endpointMap) + + endpointSet, _ = generateEndpointSetAndMap(net.ParseIP("1.1.3.1"), 10, testInstance3, "8080") + retSet[testZone2] = retSet[testZone2].Union(endpointSet) + _, endpointMap = generateEndpointSetAndMap(net.ParseIP("1.1.3.1"), 15, testInstance3, "8080") + retMap = unionEndpointMap(retMap, endpointMap) + + endpointSet, _ = generateEndpointSetAndMap(net.ParseIP("1.1.4.1"), 10, testInstance4, "8080") + retSet[testZone2] = retSet[testZone2].Union(endpointSet) + _, endpointMap = generateEndpointSetAndMap(net.ParseIP("1.1.4.1"), 15, testInstance4, "8080") + retMap = unionEndpointMap(retMap, endpointMap) + return retSet, retMap + }, + expectOutput: func() map[string]negtypes.EndpointPodMap { + retMap := map[string]negtypes.EndpointPodMap{ + testZone1: {}, + testZone2: {}, + } + _, endpointMap := generateEndpointSetAndMap(net.ParseIP("1.1.1.1"), 10, testInstance1, "8080") + retMap[testZone1] = unionEndpointMap(retMap[testZone1], endpointMap) + _, endpointMap = generateEndpointSetAndMap(net.ParseIP("1.1.2.1"), 10, testInstance2, "8080") + retMap[testZone1] = unionEndpointMap(retMap[testZone1], endpointMap) + _, endpointMap = generateEndpointSetAndMap(net.ParseIP("1.1.3.1"), 10, testInstance3, "8080") + retMap[testZone2] = unionEndpointMap(retMap[testZone2], endpointMap) + _, endpointMap = generateEndpointSetAndMap(net.ParseIP("1.1.4.1"), 10, testInstance4, "8080") + retMap[testZone2] = unionEndpointMap(retMap[testZone2], endpointMap) + return retMap + }, + }, + { + desc: "40 endpoints from 4 instances in 2 zone, but some nodes do not have endpoint pod mapping", + input: func() (map[string]negtypes.NetworkEndpointSet, negtypes.EndpointPodMap) { + retSet := map[string]negtypes.NetworkEndpointSet{ + testZone1: negtypes.NewNetworkEndpointSet(), + testZone2: negtypes.NewNetworkEndpointSet(), + } + retMap := negtypes.EndpointPodMap{} + endpointSet, endpointMap := generateEndpointSetAndMap(net.ParseIP("1.1.1.1"), 10, testInstance1, "8080") + retSet[testZone1] = retSet[testZone1].Union(endpointSet) + retMap = unionEndpointMap(retMap, endpointMap) + endpointSet, endpointMap = generateEndpointSetAndMap(net.ParseIP("1.1.2.1"), 10, testInstance2, "8080") + retSet[testZone1] = retSet[testZone1].Union(endpointSet) + endpointSet, endpointMap = generateEndpointSetAndMap(net.ParseIP("1.1.3.1"), 10, testInstance3, "8080") + retSet[testZone2] = retSet[testZone2].Union(endpointSet) + retMap = unionEndpointMap(retMap, endpointMap) + endpointSet, endpointMap = generateEndpointSetAndMap(net.ParseIP("1.1.4.1"), 10, testInstance4, "8080") + retSet[testZone2] = retSet[testZone2].Union(endpointSet) + return retSet, retMap + }, + expectOutput: func() map[string]negtypes.EndpointPodMap { + retMap := map[string]negtypes.EndpointPodMap{ + testZone1: {}, + testZone2: {}, + } + _, endpointMap := generateEndpointSetAndMap(net.ParseIP("1.1.1.1"), 10, testInstance1, "8080") + retMap[testZone1] = unionEndpointMap(retMap[testZone1], endpointMap) + _, endpointMap = generateEndpointSetAndMap(net.ParseIP("1.1.3.1"), 10, testInstance3, "8080") + retMap[testZone2] = unionEndpointMap(retMap[testZone2], endpointMap) + return retMap + }, + }, + } { + reflector.Flush() + endpointMap, endpointPodMap := tc.input() + expectOutput := tc.expectOutput() + transactionSyncer.commitPods(endpointMap, endpointPodMap) + negNameSet := sets.NewString(reflector.negNames...) + if len(expectOutput) != 0 && !(negNameSet.Len() == 1 && negNameSet.Has(transactionSyncer.NegSyncerKey.NegName)) { + t.Errorf("For test case %q, expect neg name to be %v, but got %v", tc.desc, transactionSyncer.NegSyncerKey.NegName, negNameSet.List()) + } - if !reflect.DeepEqual(expectOutput, reflector.endpointMaps) { - t.Errorf("For test case %q, expect endpoint map to be %v, but got %v", tc.desc, expectOutput, reflector.endpointMaps) - } + if !reflect.DeepEqual(expectOutput, reflector.endpointMaps) { + t.Errorf("For test case %q, expect endpoint map to be %v, but got %v", tc.desc, expectOutput, reflector.endpointMaps) } } } @@ -998,106 +992,104 @@ func TestTransactionSyncerWithNegCR(t *testing.T) { }, } - for _, enableEndpointSlices := range []bool{false, true} { - for _, tc := range testCases { - _, syncer := newTestTransactionSyncer(fakeCloud, testNegType, tc.customName, enableEndpointSlices) - negClient := syncer.svcNegClient - t.Run(tc.desc, func(t *testing.T) { - // fakeZoneGetter will list 3 zones for VM_IP_PORT NEGs. - expectZones := sets.NewString(negtypes.TestZone1, negtypes.TestZone2, negtypes.TestZone4) - - var expectedNegRefs map[string]negv1beta1.NegObjectReference - if tc.negExists { - for zone := range expectZones { - fakeCloud.CreateNetworkEndpointGroup(&composite.NetworkEndpointGroup{ - Version: syncer.NegSyncerKey.GetAPIVersion(), - Name: testNegName, - NetworkEndpointType: string(syncer.NegSyncerKey.NegType), - Network: fakeCloud.NetworkURL(), - Subnetwork: fakeCloud.SubnetworkURL(), - Description: tc.negDesc, - }, zone) - } - ret, _ := fakeCloud.AggregatedListNetworkEndpointGroup(syncer.NegSyncerKey.GetAPIVersion()) - expectedNegRefs = negObjectReferences(ret) - } - var refs []negv1beta1.NegObjectReference - if tc.crStatusPopulated { - for _, neg := range expectedNegRefs { - refs = append(refs, neg) - } + for _, tc := range testCases { + _, syncer := newTestTransactionSyncer(fakeCloud, testNegType, tc.customName) + negClient := syncer.svcNegClient + t.Run(tc.desc, func(t *testing.T) { + // fakeZoneGetter will list 3 zones for VM_IP_PORT NEGs. + expectZones := sets.NewString(negtypes.TestZone1, negtypes.TestZone2, negtypes.TestZone4) + + var expectedNegRefs map[string]negv1beta1.NegObjectReference + if tc.negExists { + for zone := range expectZones { + fakeCloud.CreateNetworkEndpointGroup(&composite.NetworkEndpointGroup{ + Version: syncer.NegSyncerKey.GetAPIVersion(), + Name: testNegName, + NetworkEndpointType: string(syncer.NegSyncerKey.NegType), + Network: fakeCloud.NetworkURL(), + Subnetwork: fakeCloud.SubnetworkURL(), + Description: tc.negDesc, + }, zone) } - - // Since timestamp gets truncated to the second, there is a chance that the timestamps will be the same as LastTransitionTime or LastSyncTime so use creation TS from an earlier date. - creationTS := v1.Date(2020, time.July, 23, 0, 0, 0, 0, time.UTC) - //Create NEG CR for Syncer to update status on - origCR := createNegCR(testNegName, creationTS, tc.crStatusPopulated, tc.crStatusPopulated, refs) - neg, err := negClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(testNamespace).Create(context2.Background(), origCR, v1.CreateOptions{}) - if err != nil { - t.Errorf("Failed to create test NEG CR: %s", err) + ret, _ := fakeCloud.AggregatedListNetworkEndpointGroup(syncer.NegSyncerKey.GetAPIVersion()) + expectedNegRefs = negObjectReferences(ret) + } + var refs []negv1beta1.NegObjectReference + if tc.crStatusPopulated { + for _, neg := range expectedNegRefs { + refs = append(refs, neg) } - syncer.svcNegLister.Add(neg) + } - err = syncer.ensureNetworkEndpointGroups() - if !tc.expectErr && err != nil { - t.Errorf("Expected no error, but got: %v", err) - } else if tc.expectErr && err == nil { - t.Errorf("Expected error, but got none") - } + // Since timestamp gets truncated to the second, there is a chance that the timestamps will be the same as LastTransitionTime or LastSyncTime so use creation TS from an earlier date. + creationTS := v1.Date(2020, time.July, 23, 0, 0, 0, 0, time.UTC) + //Create NEG CR for Syncer to update status on + origCR := createNegCR(testNegName, creationTS, tc.crStatusPopulated, tc.crStatusPopulated, refs) + neg, err := negClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(testNamespace).Create(context2.Background(), origCR, v1.CreateOptions{}) + if err != nil { + t.Errorf("Failed to create test NEG CR: %s", err) + } + syncer.svcNegLister.Add(neg) - negCR, err := negClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(testNamespace).Get(context2.Background(), testNegName, v1.GetOptions{}) - if err != nil { - t.Errorf("Failed to get NEG from neg client: %s", err) - } - ret, _ := fakeCloud.AggregatedListNetworkEndpointGroup(syncer.NegSyncerKey.GetAPIVersion()) - if len(expectedNegRefs) == 0 && !tc.expectErr { - expectedNegRefs = negObjectReferences(ret) - } - // if error occurs, expect that neg object references are not populated - if tc.expectErr && !tc.crStatusPopulated { - expectedNegRefs = nil - } + err = syncer.ensureNetworkEndpointGroups() + if !tc.expectErr && err != nil { + t.Errorf("Expected no error, but got: %v", err) + } else if tc.expectErr && err == nil { + t.Errorf("Expected error, but got none") + } - checkNegCR(t, negCR, creationTS, expectZones, expectedNegRefs, false, tc.expectErr) - if tc.expectErr { - // If status is already populated, expect no change even when error occurs - checkCondition(t, negCR.Status.Conditions, negv1beta1.Initialized, creationTS, corev1.ConditionFalse, true) - } else if tc.crStatusPopulated { - checkCondition(t, negCR.Status.Conditions, negv1beta1.Initialized, creationTS, corev1.ConditionTrue, false) - } else { - checkCondition(t, negCR.Status.Conditions, negv1beta1.Initialized, creationTS, corev1.ConditionTrue, true) - } + negCR, err := negClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(testNamespace).Get(context2.Background(), testNegName, v1.GetOptions{}) + if err != nil { + t.Errorf("Failed to get NEG from neg client: %s", err) + } + ret, _ := fakeCloud.AggregatedListNetworkEndpointGroup(syncer.NegSyncerKey.GetAPIVersion()) + if len(expectedNegRefs) == 0 && !tc.expectErr { + expectedNegRefs = negObjectReferences(ret) + } + // if error occurs, expect that neg object references are not populated + if tc.expectErr && !tc.crStatusPopulated { + expectedNegRefs = nil + } - if tc.expectErr || tc.negExists { - // Errored, so no expectation on created negs or negs were created beforehand - return - } + checkNegCR(t, negCR, creationTS, expectZones, expectedNegRefs, false, tc.expectErr) + if tc.expectErr { + // If status is already populated, expect no change even when error occurs + checkCondition(t, negCR.Status.Conditions, negv1beta1.Initialized, creationTS, corev1.ConditionFalse, true) + } else if tc.crStatusPopulated { + checkCondition(t, negCR.Status.Conditions, negv1beta1.Initialized, creationTS, corev1.ConditionTrue, false) + } else { + checkCondition(t, negCR.Status.Conditions, negv1beta1.Initialized, creationTS, corev1.ConditionTrue, true) + } - // Verify the NEGs are created as expected - retZones := sets.NewString() + if tc.expectErr || tc.negExists { + // Errored, so no expectation on created negs or negs were created beforehand + return + } - for key, neg := range ret { - retZones.Insert(key.Zone) - if neg.Name != testNegName { - t.Errorf("Unexpected neg %q, expected %q", neg.Name, testNegName) - } + // Verify the NEGs are created as expected + retZones := sets.NewString() - checkNegDescription(t, syncer, neg.Description) + for key, neg := range ret { + retZones.Insert(key.Zone) + if neg.Name != testNegName { + t.Errorf("Unexpected neg %q, expected %q", neg.Name, testNegName) } - if !expectZones.Equal(retZones) { - t.Errorf("Expected to find these zones: %+v, instead found: %+v", expectZones, retZones) - } - }) + checkNegDescription(t, syncer, neg.Description) + } - negClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(testNamespace).Delete(context2.TODO(), testNegName, v1.DeleteOptions{}) + if !expectZones.Equal(retZones) { + t.Errorf("Expected to find these zones: %+v, instead found: %+v", expectZones, retZones) + } + }) - syncer.cloud.DeleteNetworkEndpointGroup(testNegName, negtypes.TestZone1, syncer.NegSyncerKey.GetAPIVersion()) - syncer.cloud.DeleteNetworkEndpointGroup(testNegName, negtypes.TestZone2, syncer.NegSyncerKey.GetAPIVersion()) - syncer.cloud.DeleteNetworkEndpointGroup(testNegName, negtypes.TestZone3, syncer.NegSyncerKey.GetAPIVersion()) - syncer.cloud.DeleteNetworkEndpointGroup(testNegName, negtypes.TestZone4, syncer.NegSyncerKey.GetAPIVersion()) + negClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(testNamespace).Delete(context2.TODO(), testNegName, v1.DeleteOptions{}) + + syncer.cloud.DeleteNetworkEndpointGroup(testNegName, negtypes.TestZone1, syncer.NegSyncerKey.GetAPIVersion()) + syncer.cloud.DeleteNetworkEndpointGroup(testNegName, negtypes.TestZone2, syncer.NegSyncerKey.GetAPIVersion()) + syncer.cloud.DeleteNetworkEndpointGroup(testNegName, negtypes.TestZone3, syncer.NegSyncerKey.GetAPIVersion()) + syncer.cloud.DeleteNetworkEndpointGroup(testNegName, negtypes.TestZone4, syncer.NegSyncerKey.GetAPIVersion()) - } } } @@ -1171,63 +1163,61 @@ func TestUpdateStatus(t *testing.T) { expectedNeedInit: true, }, } - for _, enableEndpointSlices := range []bool{false, true} { - for _, syncErr := range []error{nil, fmt.Errorf("error")} { - for _, tc := range testCases { - t.Run(tc.desc, func(t *testing.T) { - fakeCloud := negtypes.NewFakeNetworkEndpointGroupCloud(testSubnetwork, testNetwork) - _, syncer := newTestTransactionSyncer(fakeCloud, testNegType, false, enableEndpointSlices) - svcNegClient := syncer.svcNegClient - syncer.needInit = false - if len(tc.negRefs) == 0 { - err := fakeCloud.CreateNetworkEndpointGroup(&composite.NetworkEndpointGroup{ - Version: syncer.NegSyncerKey.GetAPIVersion(), - Name: testNegName, - NetworkEndpointType: string(syncer.NegSyncerKey.NegType), - Network: fakeCloud.NetworkURL(), - Subnetwork: fakeCloud.SubnetworkURL(), - Description: "", - }, testZone1) - - _, err = fakeCloud.GetNetworkEndpointGroup(testNegName, testZone1, syncer.NegSyncerKey.GetAPIVersion()) - if err != nil { - t.Errorf("failed to get neg from cloud: %s ", err) - } - } - - // Since timestamp gets truncated to the second, there is a chance that the timestamps will be the same as LastTransitionTime or LastSyncTime so use creation TS from an earlier date - creationTS := v1.Date(2020, time.July, 23, 0, 0, 0, 0, time.UTC) - origCR := createNegCR(testNegName, creationTS, tc.populateConditions[negv1beta1.Initialized], tc.populateConditions[negv1beta1.Synced], tc.negRefs) - origCR, err := svcNegClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(testNamespace).Create(context2.Background(), origCR, v1.CreateOptions{}) + for _, syncErr := range []error{nil, fmt.Errorf("error")} { + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + fakeCloud := negtypes.NewFakeNetworkEndpointGroupCloud(testSubnetwork, testNetwork) + _, syncer := newTestTransactionSyncer(fakeCloud, testNegType, false) + svcNegClient := syncer.svcNegClient + syncer.needInit = false + if len(tc.negRefs) == 0 { + err := fakeCloud.CreateNetworkEndpointGroup(&composite.NetworkEndpointGroup{ + Version: syncer.NegSyncerKey.GetAPIVersion(), + Name: testNegName, + NetworkEndpointType: string(syncer.NegSyncerKey.NegType), + Network: fakeCloud.NetworkURL(), + Subnetwork: fakeCloud.SubnetworkURL(), + Description: "", + }, testZone1) + + _, err = fakeCloud.GetNetworkEndpointGroup(testNegName, testZone1, syncer.NegSyncerKey.GetAPIVersion()) if err != nil { - t.Errorf("Failed to create test NEG CR: %s", err) + t.Errorf("failed to get neg from cloud: %s ", err) } - syncer.svcNegLister.Add(origCR) + } - syncer.updateStatus(syncErr) + // Since timestamp gets truncated to the second, there is a chance that the timestamps will be the same as LastTransitionTime or LastSyncTime so use creation TS from an earlier date + creationTS := v1.Date(2020, time.July, 23, 0, 0, 0, 0, time.UTC) + origCR := createNegCR(testNegName, creationTS, tc.populateConditions[negv1beta1.Initialized], tc.populateConditions[negv1beta1.Synced], tc.negRefs) + origCR, err := svcNegClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(testNamespace).Create(context2.Background(), origCR, v1.CreateOptions{}) + if err != nil { + t.Errorf("Failed to create test NEG CR: %s", err) + } + syncer.svcNegLister.Add(origCR) - negCR, err := svcNegClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(testNamespace).Get(context2.Background(), testNegName, v1.GetOptions{}) - if err != nil { - t.Errorf("Failed to create test NEG CR: %s", err) - } + syncer.updateStatus(syncErr) - if syncErr != nil { - checkCondition(t, negCR.Status.Conditions, negv1beta1.Synced, creationTS, corev1.ConditionFalse, true) - } else if tc.populateConditions[negv1beta1.Synced] { - checkCondition(t, negCR.Status.Conditions, negv1beta1.Synced, creationTS, corev1.ConditionTrue, false) - } else { - checkCondition(t, negCR.Status.Conditions, negv1beta1.Synced, creationTS, corev1.ConditionTrue, true) - } + negCR, err := svcNegClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(testNamespace).Get(context2.Background(), testNegName, v1.GetOptions{}) + if err != nil { + t.Errorf("Failed to create test NEG CR: %s", err) + } - if syncer.needInit != tc.expectedNeedInit { - t.Errorf("expected manager.needInit to be %t, but was %t", tc.expectedNeedInit, syncer.needInit) - } + if syncErr != nil { + checkCondition(t, negCR.Status.Conditions, negv1beta1.Synced, creationTS, corev1.ConditionFalse, true) + } else if tc.populateConditions[negv1beta1.Synced] { + checkCondition(t, negCR.Status.Conditions, negv1beta1.Synced, creationTS, corev1.ConditionTrue, false) + } else { + checkCondition(t, negCR.Status.Conditions, negv1beta1.Synced, creationTS, corev1.ConditionTrue, true) + } - if !creationTS.Before(&negCR.Status.LastSyncTime) { - t.Errorf("neg cr should have an updated LastSyncTime") - } - }) - } + if syncer.needInit != tc.expectedNeedInit { + t.Errorf("expected manager.needInit to be %t, but was %t", tc.expectedNeedInit, syncer.needInit) + } + + if !creationTS.Before(&negCR.Status.LastSyncTime) { + t.Errorf("neg cr should have an updated LastSyncTime") + } + }) } } } @@ -1262,7 +1252,7 @@ func TestIsZoneChange(t *testing.T) { for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { - _, syncer := newTestTransactionSyncer(fakeCloud, testNegType, false, false) + _, syncer := newTestTransactionSyncer(fakeCloud, testNegType, false) fakeZoneGetter := syncer.zoneGetter.(*negtypes.FakeZoneGetter) origZones, err := fakeZoneGetter.ListZones(negtypes.NodePredicateForEndpointCalculatorMode(syncer.EpCalculatorMode)) @@ -1320,9 +1310,6 @@ func TestUnknownNodes(t *testing.T) { testIP3 := "10.100.2.1" testPort := int64(80) - testEndpoint := getTestEndpoint(testService, testNamespace) - testEndpoint.Subsets[0].Addresses[0].NodeName = utilpointer.StringPtr("unknown-node") - testEndpointSlices := getTestEndpointSlices(testService, testNamespace) testEndpointSlices[0].Endpoints[0].NodeName = utilpointer.StringPtr("unknown-node") testEndpointMap := map[string]*composite.NetworkEndpoint{ @@ -1367,56 +1354,47 @@ func TestUnknownNodes(t *testing.T) { }, } - for _, enableEndpointSlice := range []bool{true, false} { - _, s := newTestTransactionSyncer(fakeCloud, negtypes.VmIpPortEndpointType, false, enableEndpointSlice) - s.needInit = false - if enableEndpointSlice { - for _, e := range testEndpointSlices { - s.endpointSliceLister.Add(e) - } - } else { - s.endpointLister.Add(testEndpoint) - } + _, s := newTestTransactionSyncer(fakeCloud, negtypes.VmIpPortEndpointType, false) + s.needInit = false - for _, eps := range testEndpointSlices { - s.endpointSliceLister.Add(eps) - } - s.svcNegLister.Add(neg) - // mark syncer as started without starting the syncer routine - (s.syncer.(*syncer)).stopped = false + for _, eps := range testEndpointSlices { + s.endpointSliceLister.Add(eps) + } + s.svcNegLister.Add(neg) + // mark syncer as started without starting the syncer routine + (s.syncer.(*syncer)).stopped = false - err := s.syncInternal() - if err == nil { - t.Errorf("syncInternal returned nil, expected an error") - } + err := s.syncInternal() + if err == nil { + t.Errorf("syncInternal returned nil, expected an error") + } - // Check that unknown zone did not cause endpoints to be removed - out, err := retrieveExistingZoneNetworkEndpointMap(testNegName, zoneGetter, fakeCloud, meta.VersionGA, negtypes.L7Mode) - if err != nil { - t.Errorf("errored retrieving existing network endpoints") - } + // Check that unknown zone did not cause endpoints to be removed + out, err := retrieveExistingZoneNetworkEndpointMap(testNegName, zoneGetter, fakeCloud, meta.VersionGA, negtypes.L7Mode) + if err != nil { + t.Errorf("errored retrieving existing network endpoints") + } - expectedEndpoints := map[string]negtypes.NetworkEndpointSet{ - negtypes.TestZone1: negtypes.NewNetworkEndpointSet( - negtypes.NetworkEndpoint{IP: testIP1, Node: negtypes.TestInstance1, Port: strconv.Itoa(int(testPort))}, - ), - negtypes.TestZone2: negtypes.NewNetworkEndpointSet( - negtypes.NetworkEndpoint{IP: testIP2, Node: negtypes.TestInstance3, Port: strconv.Itoa(int(testPort))}, - ), - negtypes.TestZone4: negtypes.NewNetworkEndpointSet( - negtypes.NetworkEndpoint{IP: testIP3, Node: negtypes.TestUpgradeInstance1, Port: strconv.Itoa(int(testPort))}, - ), - } + expectedEndpoints := map[string]negtypes.NetworkEndpointSet{ + negtypes.TestZone1: negtypes.NewNetworkEndpointSet( + negtypes.NetworkEndpoint{IP: testIP1, Node: negtypes.TestInstance1, Port: strconv.Itoa(int(testPort))}, + ), + negtypes.TestZone2: negtypes.NewNetworkEndpointSet( + negtypes.NetworkEndpoint{IP: testIP2, Node: negtypes.TestInstance3, Port: strconv.Itoa(int(testPort))}, + ), + negtypes.TestZone4: negtypes.NewNetworkEndpointSet( + negtypes.NetworkEndpoint{IP: testIP3, Node: negtypes.TestUpgradeInstance1, Port: strconv.Itoa(int(testPort))}, + ), + } - if !reflect.DeepEqual(expectedEndpoints, out) { - t.Errorf("endpoints were modified after syncInternal:\ngot %+v,\n expected %+v", out, expectedEndpoints) - } + if !reflect.DeepEqual(expectedEndpoints, out) { + t.Errorf("endpoints were modified after syncInternal:\ngot %+v,\n expected %+v", out, expectedEndpoints) } } func TestIsValidEndpointInfo(t *testing.T) { t.Parallel() - _, transactionSyncer := newTestTransactionSyncer(negtypes.NewAdapter(gce.NewFakeGCECloud(gce.DefaultTestClusterValues())), negtypes.VmIpPortEndpointType, false, true) + _, transactionSyncer := newTestTransactionSyncer(negtypes.NewAdapter(gce.NewFakeGCECloud(gce.DefaultTestClusterValues())), negtypes.VmIpPortEndpointType, false) instance1 := testInstance1 instance2 := testInstance2 @@ -1998,7 +1976,7 @@ func TestIsValidEndpointInfo(t *testing.T) { func TestIsValidEPField(t *testing.T) { t.Parallel() - _, transactionSyncer := newTestTransactionSyncer(negtypes.NewAdapter(gce.NewFakeGCECloud(gce.DefaultTestClusterValues())), negtypes.VmIpPortEndpointType, false, true) + _, transactionSyncer := newTestTransactionSyncer(negtypes.NewAdapter(gce.NewFakeGCECloud(gce.DefaultTestClusterValues())), negtypes.VmIpPortEndpointType, false) instance1 := testInstance1 instance2 := testInstance2 @@ -2365,7 +2343,7 @@ func TestIsValidEPBatch(t *testing.T) { mockGCE.MockNetworkEndpointGroups.AttachNetworkEndpointsHook = func(ctx context2.Context, key *meta.Key, arg0 *compute.NetworkEndpointGroupsAttachEndpointsRequest, neg *cloud.MockNetworkEndpointGroups) error { return tc.APIResponse } - _, transactionSyncer := newTestTransactionSyncer(fakeCloud, negtypes.VmIpPortEndpointType, false, true) + _, transactionSyncer := newTestTransactionSyncer(fakeCloud, negtypes.VmIpPortEndpointType, false) err := transactionSyncer.cloud.AttachNetworkEndpoints(transactionSyncer.NegSyncerKey.NegName, zone, networkEndpoints, transactionSyncer.NegSyncerKey.GetAPIVersion()) if got, reason := transactionSyncer.isValidEPBatch(err, attachOp, networkEndpoints); got != tc.expect && reason != tc.expectedReason { @@ -2375,13 +2353,13 @@ func TestIsValidEPBatch(t *testing.T) { } } -func newL4ILBTestTransactionSyncer(fakeGCE negtypes.NetworkEndpointGroupCloud, mode negtypes.EndpointsCalculatorMode, enableEndpointSlices bool) (negtypes.NegSyncer, *transactionSyncer) { - negsyncer, ts := newTestTransactionSyncer(fakeGCE, negtypes.VmIpEndpointType, false, enableEndpointSlices) +func newL4ILBTestTransactionSyncer(fakeGCE negtypes.NetworkEndpointGroupCloud, mode negtypes.EndpointsCalculatorMode) (negtypes.NegSyncer, *transactionSyncer) { + negsyncer, ts := newTestTransactionSyncer(fakeGCE, negtypes.VmIpEndpointType, false) ts.endpointsCalculator = GetEndpointsCalculator(ts.nodeLister, ts.podLister, ts.zoneGetter, ts.NegSyncerKey, mode, klog.TODO()) return negsyncer, ts } -func newTestTransactionSyncer(fakeGCE negtypes.NetworkEndpointGroupCloud, negType negtypes.NetworkEndpointType, customName bool, enableEndpointSlices bool) (negtypes.NegSyncer, *transactionSyncer) { +func newTestTransactionSyncer(fakeGCE negtypes.NetworkEndpointGroupCloud, negType negtypes.NetworkEndpointType, customName bool) (negtypes.NegSyncer, *transactionSyncer) { testContext := negtypes.NewTestContext() svcPort := negtypes.NegSyncerKey{ Namespace: testNamespace, @@ -2414,7 +2392,6 @@ func newTestTransactionSyncer(fakeGCE negtypes.NetworkEndpointGroupCloud, negTyp fakeZoneGetter, testContext.PodInformer.GetIndexer(), testContext.ServiceInformer.GetIndexer(), - testContext.EndpointInformer.GetIndexer(), testContext.EndpointSliceInformer.GetIndexer(), testContext.NodeInformer.GetIndexer(), testContext.SvcNegInformer.GetIndexer(), @@ -2425,7 +2402,6 @@ func newTestTransactionSyncer(fakeGCE negtypes.NetworkEndpointGroupCloud, negTyp testContext.SvcNegClient, metrics.FakeSyncerMetrics(), customName, - enableEndpointSlices, klog.TODO(), ) transactionSyncer := negsyncer.(*syncer).core.(*transactionSyncer) diff --git a/pkg/neg/syncers/utils.go b/pkg/neg/syncers/utils.go index 4c2b0b2dc3..6c17e46f09 100644 --- a/pkg/neg/syncers/utils.go +++ b/pkg/neg/syncers/utils.go @@ -25,7 +25,6 @@ import ( "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" apiv1 "k8s.io/api/core/v1" - v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/tools/cache" @@ -225,7 +224,7 @@ func ensureNetworkEndpointGroup(svcNamespace, svcName, negName, zone, negService } // toZoneNetworkEndpointMap translates addresses in endpoints object into zone and endpoints map, and also return the count for duplicated endpoints -func toZoneNetworkEndpointMap(eds []negtypes.EndpointsData, zoneGetter negtypes.ZoneGetter, servicePortName string, podLister cache.Indexer, networkEndpointType negtypes.NetworkEndpointType) (map[string]negtypes.NetworkEndpointSet, negtypes.EndpointPodMap, int, error) { +func toZoneNetworkEndpointMap(eds []negtypes.EndpointsData, zoneGetter negtypes.ZoneGetter, servicePortName string, networkEndpointType negtypes.NetworkEndpointType) (map[string]negtypes.NetworkEndpointSet, negtypes.EndpointPodMap, int, error) { zoneNetworkEndpointMap := map[string]negtypes.NetworkEndpointSet{} networkEndpointPodMap := negtypes.EndpointPodMap{} dupCount := 0 @@ -271,26 +270,19 @@ func toZoneNetworkEndpointMap(eds []negtypes.EndpointsData, zoneGetter negtypes. zoneNetworkEndpointMap[zone] = negtypes.NewNetworkEndpointSet() } - // TODO: This check and EndpointsData.Ready field may be deleted once Endpoints support is removed. - // The purpose of this check is to handle endpoints in terminating state. - // The Endpoints API doesn't have terminating field. Terminating endpoints are marked as not ready. - // This check support this case. For not ready endpoints it checks if the endpoint is not yet ready or terminating. - // The EndpointSlices API has terminating field which solves this problem. - if endpointAddress.Ready || shouldPodBeInNeg(podLister, endpointAddress.TargetRef.Namespace, endpointAddress.TargetRef.Name) { - for _, address := range endpointAddress.Addresses { - networkEndpoint := negtypes.NetworkEndpoint{IP: address, Port: matchPort, Node: *endpointAddress.NodeName} - if networkEndpointType == negtypes.NonGCPPrivateEndpointType { - // Non-GCP network endpoints don't have associated nodes. - networkEndpoint.Node = "" - } - zoneNetworkEndpointMap[zone].Insert(networkEndpoint) - - // increment the count for duplicated endpoint - if _, contains := networkEndpointPodMap[networkEndpoint]; contains { - dupCount += 1 - } - networkEndpointPodMap[networkEndpoint] = types.NamespacedName{Namespace: endpointAddress.TargetRef.Namespace, Name: endpointAddress.TargetRef.Name} + for _, address := range endpointAddress.Addresses { + networkEndpoint := negtypes.NetworkEndpoint{IP: address, Port: matchPort, Node: *endpointAddress.NodeName} + if networkEndpointType == negtypes.NonGCPPrivateEndpointType { + // Non-GCP network endpoints don't have associated nodes. + networkEndpoint.Node = "" } + zoneNetworkEndpointMap[zone].Insert(networkEndpoint) + + // increment the count for duplicated endpoint + if _, contains := networkEndpointPodMap[networkEndpoint]; contains { + dupCount += 1 + } + networkEndpointPodMap[networkEndpoint] = types.NamespacedName{Namespace: endpointAddress.TargetRef.Namespace, Name: endpointAddress.TargetRef.Name} } } } @@ -372,34 +364,3 @@ func makeEndpointBatch(endpoints negtypes.NetworkEndpointSet, negType negtypes.N } return endpointBatch, nil } - -func keyFunc(namespace, name string) string { - return fmt.Sprintf("%s/%s", namespace, name) -} - -// shouldPodBeInNeg returns true if pod is not in graceful termination state -func shouldPodBeInNeg(podLister cache.Indexer, namespace, name string) bool { - if podLister == nil { - return false - } - key := keyFunc(namespace, name) - obj, exists, err := podLister.GetByKey(key) - if err != nil { - klog.Errorf("Failed to retrieve pod %s from pod lister: %v", key, err) - return false - } - if !exists { - return false - } - pod, ok := obj.(*v1.Pod) - if !ok { - klog.Errorf("Failed to convert obj %s to v1.Pod. The object type is %T", key, obj) - return false - } - - // if pod has DeletionTimestamp, that means pod is in graceful termination state. - if pod.DeletionTimestamp != nil { - return false - } - return true -} diff --git a/pkg/neg/syncers/utils_test.go b/pkg/neg/syncers/utils_test.go index 0aa82a6aa5..74c9a9ad6a 100644 --- a/pkg/neg/syncers/utils_test.go +++ b/pkg/neg/syncers/utils_test.go @@ -441,35 +441,6 @@ func TestEnsureNetworkEndpointGroup(t *testing.T) { func TestToZoneNetworkEndpointMapUtil(t *testing.T) { t.Parallel() - _, transactionSyncer := newTestTransactionSyncer(negtypes.NewAdapter(gce.NewFakeGCECloud(gce.DefaultTestClusterValues())), negtypes.VmIpPortEndpointType, false, false) - podLister := transactionSyncer.podLister - - // add all pods in default endpoint into podLister - for i := 1; i <= 12; i++ { - podLister.Add(&v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: testServiceNamespace, - Name: fmt.Sprintf("pod%v", i), - }, - }) - } - // pod6 is deleted - podLister.Update(&v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: testServiceNamespace, - Name: "pod6", - DeletionTimestamp: &metav1.Time{}, - }, - }) - - podLister.Update(&v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: testServiceNamespace, - Name: "pod12", - DeletionTimestamp: &metav1.Time{}, - }, - }) - zoneGetter := negtypes.NewFakeZoneGetter() testCases := []struct { desc string @@ -492,8 +463,9 @@ func TestToZoneNetworkEndpointMapUtil(t *testing.T) { negtypes.TestZone1: negtypes.NewNetworkEndpointSet( networkEndpointFromEncodedEndpoint("10.100.1.1||instance1||80"), networkEndpointFromEncodedEndpoint("10.100.1.2||instance1||80"), - networkEndpointFromEncodedEndpoint("10.100.2.1||instance2||80"), - networkEndpointFromEncodedEndpoint("10.100.1.3||instance1||80")), + networkEndpointFromEncodedEndpoint("10.100.1.3||instance1||80"), + networkEndpointFromEncodedEndpoint("10.100.1.4||instance1||80"), + networkEndpointFromEncodedEndpoint("10.100.2.1||instance2||80")), negtypes.TestZone2: negtypes.NewNetworkEndpointSet( networkEndpointFromEncodedEndpoint("10.100.3.1||instance3||80")), }, @@ -503,6 +475,7 @@ func TestToZoneNetworkEndpointMapUtil(t *testing.T) { networkEndpointFromEncodedEndpoint("10.100.2.1||instance2||80"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod3"}, networkEndpointFromEncodedEndpoint("10.100.3.1||instance3||80"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod4"}, networkEndpointFromEncodedEndpoint("10.100.1.3||instance1||80"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod5"}, + networkEndpointFromEncodedEndpoint("10.100.1.4||instance1||80"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod6"}, }, networkEndpointType: negtypes.VmIpPortEndpointType, }, @@ -516,7 +489,8 @@ func TestToZoneNetworkEndpointMapUtil(t *testing.T) { networkEndpointFromEncodedEndpoint("10.100.4.1||instance4||81"), networkEndpointFromEncodedEndpoint("10.100.3.2||instance3||8081"), networkEndpointFromEncodedEndpoint("10.100.4.2||instance4||8081"), - networkEndpointFromEncodedEndpoint("10.100.4.3||instance4||81")), + networkEndpointFromEncodedEndpoint("10.100.4.3||instance4||81"), + networkEndpointFromEncodedEndpoint("10.100.4.4||instance4||8081")), }, expectMap: negtypes.EndpointPodMap{ networkEndpointFromEncodedEndpoint("10.100.2.2||instance2||81"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod7"}, @@ -524,6 +498,7 @@ func TestToZoneNetworkEndpointMapUtil(t *testing.T) { networkEndpointFromEncodedEndpoint("10.100.4.3||instance4||81"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod9"}, networkEndpointFromEncodedEndpoint("10.100.3.2||instance3||8081"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod10"}, networkEndpointFromEncodedEndpoint("10.100.4.2||instance4||8081"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod11"}, + networkEndpointFromEncodedEndpoint("10.100.4.4||instance4||8081"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod12"}, }, networkEndpointType: negtypes.VmIpPortEndpointType, }, @@ -534,8 +509,9 @@ func TestToZoneNetworkEndpointMapUtil(t *testing.T) { negtypes.TestZone1: negtypes.NewNetworkEndpointSet( networkEndpointFromEncodedEndpoint("10.100.1.1||||80"), networkEndpointFromEncodedEndpoint("10.100.1.2||||80"), - networkEndpointFromEncodedEndpoint("10.100.2.1||||80"), - networkEndpointFromEncodedEndpoint("10.100.1.3||||80")), + networkEndpointFromEncodedEndpoint("10.100.1.3||||80"), + networkEndpointFromEncodedEndpoint("10.100.1.4||||80"), + networkEndpointFromEncodedEndpoint("10.100.2.1||||80")), negtypes.TestZone2: negtypes.NewNetworkEndpointSet( networkEndpointFromEncodedEndpoint("10.100.3.1||||80")), }, @@ -545,13 +521,14 @@ func TestToZoneNetworkEndpointMapUtil(t *testing.T) { networkEndpointFromEncodedEndpoint("10.100.2.1||||80"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod3"}, networkEndpointFromEncodedEndpoint("10.100.3.1||||80"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod4"}, networkEndpointFromEncodedEndpoint("10.100.1.3||||80"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod5"}, + networkEndpointFromEncodedEndpoint("10.100.1.4||||80"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod6"}, }, networkEndpointType: negtypes.NonGCPPrivateEndpointType, }, } for _, tc := range testCases { - retSet, retMap, _, err := toZoneNetworkEndpointMap(negtypes.EndpointsDataFromEndpoints(getDefaultEndpoint()), zoneGetter, tc.portName, podLister, tc.networkEndpointType) + retSet, retMap, _, err := toZoneNetworkEndpointMap(negtypes.EndpointsDataFromEndpointSlices(getDefaultEndpointSlices()), zoneGetter, tc.portName, tc.networkEndpointType) if err != nil { t.Errorf("For case %q, expect nil error, but got %v.", tc.desc, err) } @@ -883,82 +860,6 @@ func TestMakeEndpointBatch(t *testing.T) { } } -func TestShouldPodBeInNeg(t *testing.T) { - t.Parallel() - - _, transactionSyncer := newTestTransactionSyncer(negtypes.NewAdapter(gce.NewFakeGCECloud(gce.DefaultTestClusterValues())), negtypes.VmIpPortEndpointType, false, false) - - podLister := transactionSyncer.podLister - - namespace1 := "ns1" - namespace2 := "ns2" - name1 := "n1" - name2 := "n2" - - podLister.Add(&v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: namespace1, - Name: name1, - }, - }) - - // deleted pod - podLister.Add(&v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: namespace1, - Name: name2, - DeletionTimestamp: &metav1.Time{}, - }, - }) - - podLister.Add(&v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: namespace2, - Name: name2, - }, - }) - - for _, tc := range []struct { - desc string - namespace string - name string - expect bool - }{ - { - desc: "empty input", - }, - { - desc: "non exists pod", - namespace: "non exists", - name: "non exists", - expect: false, - }, - { - desc: "pod exists and not deleted", - namespace: namespace1, - name: name1, - expect: true, - }, - { - desc: "pod exists and deleted", - namespace: namespace1, - name: name2, - expect: false, - }, - { - desc: "pod exists and not deleted 2", - namespace: namespace2, - name: name2, - expect: true, - }, - } { - ret := shouldPodBeInNeg(podLister, tc.namespace, tc.name) - if ret != tc.expect { - t.Errorf("For test case %q, endpointSets output = %+v, but got %+v", tc.desc, tc.expect, ret) - } - } -} - func TestNameUniqueness(t *testing.T) { var ( testZone = "test-zone" @@ -1513,157 +1414,3 @@ func getTestEndpointSlices(name, namespace string) []*discovery.EndpointSlice { }, } } - -func getTestEndpoint(name, namespace string) *v1.Endpoints { - instance1 := negtypes.TestInstance1 - instance2 := negtypes.TestInstance2 - instance3 := negtypes.TestInstance3 - instance4 := negtypes.TestInstance4 - return &v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: namespace, - }, - Subsets: []v1.EndpointSubset{ - { - Addresses: []v1.EndpointAddress{ - { - IP: "10.100.1.1", - NodeName: &instance1, - TargetRef: &v1.ObjectReference{ - Namespace: namespace, - Name: "pod1", - }, - }, - { - IP: "10.100.1.2", - NodeName: &instance1, - TargetRef: &v1.ObjectReference{ - Namespace: namespace, - Name: "pod2", - }, - }, - { - IP: "10.100.2.1", - NodeName: &instance2, - TargetRef: &v1.ObjectReference{ - Namespace: namespace, - Name: "pod3", - }, - }, - { - IP: "10.100.3.1", - NodeName: &instance3, - TargetRef: &v1.ObjectReference{ - Namespace: namespace, - Name: "pod4", - }, - }, - }, - NotReadyAddresses: []v1.EndpointAddress{ - { - IP: "10.100.1.3", - NodeName: &instance1, - TargetRef: &v1.ObjectReference{ - Namespace: namespace, - Name: "pod5", - }, - }, - { - IP: "10.100.1.4", - NodeName: &instance1, - TargetRef: &v1.ObjectReference{ - Namespace: namespace, - Name: "pod6", - }, - }, - }, - Ports: []v1.EndpointPort{ - { - Name: "", - Port: int32(80), - Protocol: v1.ProtocolTCP, - }, - }, - }, - { - Addresses: []v1.EndpointAddress{ - { - IP: "10.100.2.2", - NodeName: &instance2, - TargetRef: &v1.ObjectReference{ - Namespace: namespace, - Name: "pod7", - }, - }, - { - IP: "10.100.4.1", - NodeName: &instance4, - TargetRef: &v1.ObjectReference{ - Namespace: namespace, - Name: "pod8", - }, - }, - }, - NotReadyAddresses: []v1.EndpointAddress{ - { - IP: "10.100.4.3", - NodeName: &instance4, - TargetRef: &v1.ObjectReference{ - Namespace: namespace, - Name: "pod9", - }, - }, - }, - Ports: []v1.EndpointPort{ - { - Name: testNamedPort, - Port: int32(81), - Protocol: v1.ProtocolTCP, - }, - }, - }, - { - Addresses: []v1.EndpointAddress{ - { - IP: "10.100.3.2", - NodeName: &instance3, - TargetRef: &v1.ObjectReference{ - Namespace: namespace, - Name: "pod10", - }, - }, - { - IP: "10.100.4.2", - NodeName: &instance4, - TargetRef: &v1.ObjectReference{ - Namespace: namespace, - Name: "pod11", - }, - }, - }, - NotReadyAddresses: []v1.EndpointAddress{ - { - IP: "10.100.4.4", - NodeName: &instance4, - TargetRef: &v1.ObjectReference{ - Namespace: namespace, - Name: "pod12", - }, - }, - }, - Ports: []v1.EndpointPort{ - { - Name: testNamedPort, - Port: int32(8081), - Protocol: v1.ProtocolTCP, - }, - }, - }, - }, - } -} - -func getDefaultEndpoint() *v1.Endpoints { - return getTestEndpoint(testServiceName, testServiceNamespace) -} diff --git a/pkg/neg/types/types.go b/pkg/neg/types/types.go index 0c7144adce..ded5284364 100644 --- a/pkg/neg/types/types.go +++ b/pkg/neg/types/types.go @@ -314,27 +314,6 @@ type AddressData struct { Ready bool } -// Converts API Endpoints to the EndpointsData abstraction. -// All endpoints are converted, not ready addresses are converted with Ready=false. -func EndpointsDataFromEndpoints(ep *apiv1.Endpoints) []EndpointsData { - result := make([]EndpointsData, 0, len(ep.Subsets)) - for _, subset := range ep.Subsets { - ports := make([]PortData, 0) - addresses := make([]AddressData, 0) - for _, port := range subset.Ports { - ports = append(ports, PortData{Name: port.Name, Port: port.Port}) - } - for _, addr := range subset.Addresses { - addresses = append(addresses, AddressData{TargetRef: addr.TargetRef, NodeName: addr.NodeName, Addresses: []string{addr.IP}, Ready: true}) - } - for _, addr := range subset.NotReadyAddresses { - addresses = append(addresses, AddressData{TargetRef: addr.TargetRef, NodeName: addr.NodeName, Addresses: []string{addr.IP}, Ready: false}) - } - result = append(result, EndpointsData{Meta: &ep.ObjectMeta, Ports: ports, Addresses: addresses}) - } - return result -} - // Converts API EndpointSlice list to the EndpointsData abstraction. // Terminating endpoints are ignored. func EndpointsDataFromEndpointSlices(slices []*discovery.EndpointSlice) []EndpointsData { diff --git a/pkg/neg/types/types_test.go b/pkg/neg/types/types_test.go index 66b32255c6..6f0b3776a1 100644 --- a/pkg/neg/types/types_test.go +++ b/pkg/neg/types/types_test.go @@ -435,121 +435,6 @@ func TestCustomNamedNegs(t *testing.T) { } } -func TestEndpointsDataFromEndpoints(t *testing.T) { - t.Parallel() - instance1 := TestInstance1 - instance2 := TestInstance2 - instance4 := TestInstance4 - testServiceName := "service" - testServiceNamespace := "namespace" - testNamedPort := "port1" - endpoints := &v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{ - Name: testServiceName, - Namespace: testServiceNamespace, - }, - Subsets: []v1.EndpointSubset{ - { - Addresses: []v1.EndpointAddress{ - { - IP: "10.100.1.1", - NodeName: &instance1, - TargetRef: &v1.ObjectReference{ - Namespace: testServiceNamespace, - Name: "pod1", - }, - }, - { - IP: "10.100.1.2", - NodeName: &instance1, - TargetRef: &v1.ObjectReference{ - Namespace: testServiceNamespace, - Name: "pod2", - }, - }, - }, - NotReadyAddresses: []v1.EndpointAddress{ - { - IP: "10.100.1.3", - NodeName: &instance1, - TargetRef: &v1.ObjectReference{ - Namespace: testServiceNamespace, - Name: "pod5", - }, - }, - { - IP: "10.100.1.4", - NodeName: &instance1, - TargetRef: &v1.ObjectReference{ - Namespace: testServiceNamespace, - Name: "pod6", - }, - }, - }, - Ports: []v1.EndpointPort{ - { - Name: "", - Port: int32(80), - Protocol: v1.ProtocolTCP, - }, - }, - }, - { - Addresses: []v1.EndpointAddress{ - { - IP: "10.100.2.2", - NodeName: &instance2, - TargetRef: &v1.ObjectReference{ - Namespace: testServiceNamespace, - Name: "pod7", - }, - }, - { - IP: "10.100.4.1", - NodeName: &instance4, - TargetRef: &v1.ObjectReference{ - Namespace: testServiceNamespace, - Name: "pod8", - }, - }, - }, - NotReadyAddresses: []v1.EndpointAddress{ - { - IP: "10.100.4.3", - NodeName: &instance4, - TargetRef: &v1.ObjectReference{ - Namespace: testServiceNamespace, - Name: "pod9", - }, - }, - }, - Ports: []v1.EndpointPort{ - { - Name: testNamedPort, - Port: int32(81), - Protocol: v1.ProtocolTCP, - }, - }, - }, - }, - } - endpointsData := EndpointsDataFromEndpoints(endpoints) - - if len(endpointsData) != 2 { - t.Errorf("Expected the same number of endpoints subsets and endpoints data, got %d endpoints data for 2 subsets", len(endpointsData)) - } - for i, subset := range endpoints.Subsets { - for j, port := range subset.Ports { - ValidatePortData(endpointsData[i].Ports[j], port.Port, port.Name, t) - } - ValidateAddressDataForEndpointsAddresses(endpointsData[i].Addresses, subset.Addresses, true, t) - ValidateAddressDataForEndpointsAddresses(endpointsData[i].Addresses, subset.NotReadyAddresses, false, t) - if len(endpointsData[i].Addresses) != len(subset.Addresses)+len(subset.NotReadyAddresses) { - t.Errorf("Unexpected len of endpointsData addresses, got %d, expected %d", len(endpointsData[i].Addresses), len(subset.Addresses)+len(subset.NotReadyAddresses)) - } - } -} - func TestEndpointsDataFromEndpointSlices(t *testing.T) { t.Parallel() instance1 := TestInstance1