diff --git a/pkg/neg/syncers/transaction.go b/pkg/neg/syncers/transaction.go index e27da180f0..052f9793de 100644 --- a/pkg/neg/syncers/transaction.go +++ b/pkg/neg/syncers/transaction.go @@ -235,6 +235,9 @@ func (s *transactionSyncer) syncInternalImpl() error { } endpointsData := negtypes.EndpointsDataFromEndpointSlices(endpointSlices) targetMap, endpointPodMap, err = s.endpointsCalculator.CalculateEndpoints(endpointsData, currentMap) + if s.isNodeNameMissing(endpointsData) || s.isZoneMissing(targetMap) { + s.setErrorState() + } if err != nil { return fmt.Errorf("endpoints calculation error in mode %q, err: %w", s.endpointsCalculator.Mode(), err) } @@ -346,6 +349,27 @@ func (s *transactionSyncer) ensureNetworkEndpointGroups() error { return utilerrors.NewAggregate(errList) } +// isNodeNameMissing returns true if there is endpoint missing nodeName +func (s *transactionSyncer) isNodeNameMissing(eds []negtypes.EndpointsData) bool { + for _, ed := range eds { + for _, endpointAddress := range ed.Addresses { + nodeName := endpointAddress.NodeName + if nodeName == nil || len(*nodeName) == 0 { + return true + } + } + } + return false +} + +// isZoneMissing returns true if there is invalid(empty) zone in zoneNetworkEndpointMap +func (s *transactionSyncer) isZoneMissing(zoneNetworkEndpointMap map[string]negtypes.NetworkEndpointSet) bool { + if _, isPresent := zoneNetworkEndpointMap[""]; isPresent { + return true + } + return false +} + // syncNetworkEndpoints spins off go routines to execute NEG operations func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints map[string]negtypes.NetworkEndpointSet) error { syncFunc := func(endpointMap map[string]negtypes.NetworkEndpointSet, operation transactionOp) error { diff --git a/pkg/neg/syncers/transaction_test.go b/pkg/neg/syncers/transaction_test.go index 3fbef9f631..9ce78ee86f 100644 --- a/pkg/neg/syncers/transaction_test.go +++ b/pkg/neg/syncers/transaction_test.go @@ -1409,6 +1409,262 @@ func TestUnknownNodes(t *testing.T) { } } +func TestIsNodeNameMissing(t *testing.T) { + t.Parallel() + _, transactionSyncer := newTestTransactionSyncer(negtypes.NewAdapter(gce.NewFakeGCECloud(gce.DefaultTestClusterValues())), negtypes.VmIpPortEndpointType, false, true) + + instance1 := testInstance1 + instance2 := testInstance2 + instance4 := testInstance4 + testPortName := "port1" + testServiceNamespace := "namespace" + port80 := int32(80) + port81 := int32(81) + + testCases := []struct { + desc string + endpointsData []negtypes.EndpointsData + expect bool + }{ + { + desc: "no missing nodeNames", + endpointsData: []negtypes.EndpointsData{ + { + Meta: &metav1.ObjectMeta{ + Name: testServiceName + "-1", + Namespace: testServiceNamespace, + }, + Ports: []negtypes.PortData{ + { + Name: testPortName, + Port: port80, + }, + }, + Addresses: []negtypes.AddressData{ + { + TargetRef: &corev1.ObjectReference{ + Namespace: testServiceNamespace, + Name: "pod1", + }, + NodeName: &instance1, + Addresses: []string{"10.100.1.1"}, + Ready: true, + }, + { + TargetRef: &corev1.ObjectReference{ + Namespace: testServiceNamespace, + Name: "pod2", + }, + NodeName: &instance1, + Addresses: []string{"10.100.1.2"}, + Ready: true, + }, + }, + }, + { + Meta: &metav1.ObjectMeta{ + Name: testServiceName + "-2", + Namespace: testServiceNamespace, + }, + Ports: []negtypes.PortData{ + { + Name: testPortName, + Port: port81, + }, + }, + Addresses: []negtypes.AddressData{ + { + TargetRef: &corev1.ObjectReference{ + Namespace: testServiceNamespace, + Name: "pod7", + }, + NodeName: &instance2, + Addresses: []string{"10.100.2.2"}, + Ready: true, + }, + { + TargetRef: &corev1.ObjectReference{ + Namespace: testServiceNamespace, + Name: "pod8", + }, + NodeName: &instance4, + Addresses: []string{"10.100.4.1"}, + Ready: true, + }, + { + TargetRef: &corev1.ObjectReference{ + Namespace: testServiceNamespace, + Name: "pod9", + }, + NodeName: &instance4, + Addresses: []string{"10.100.4.3"}, + Ready: true, + }, + }, + }, + }, + expect: false, + }, + { + desc: "contains missing nodeName", + endpointsData: []negtypes.EndpointsData{ + { + Meta: &metav1.ObjectMeta{ + Name: testServiceName + "-1", + Namespace: testServiceNamespace, + }, + Ports: []negtypes.PortData{ + { + Name: testPortName, + Port: port80, + }, + }, + Addresses: []negtypes.AddressData{ + { + TargetRef: &corev1.ObjectReference{ + Namespace: testServiceNamespace, + Name: "pod1", + }, + NodeName: nil, + Addresses: []string{"10.100.1.1"}, + Ready: true, + }, + { + TargetRef: &corev1.ObjectReference{ + Namespace: testServiceNamespace, + Name: "pod2", + }, + NodeName: &instance1, + Addresses: []string{"10.100.1.2"}, + Ready: true, + }, + }, + }, + { + Meta: &metav1.ObjectMeta{ + Name: testServiceName + "-2", + Namespace: testServiceNamespace, + }, + Ports: []negtypes.PortData{ + { + Name: testPortName, + Port: port81, + }, + }, + Addresses: []negtypes.AddressData{ + { + TargetRef: &corev1.ObjectReference{ + Namespace: testServiceNamespace, + Name: "pod7", + }, + NodeName: &instance2, + Addresses: []string{"10.100.2.2"}, + Ready: true, + }, + { + TargetRef: &corev1.ObjectReference{ + Namespace: testServiceNamespace, + Name: "pod8", + }, + NodeName: &instance4, + Addresses: []string{"10.100.4.1"}, + Ready: true, + }, + { + TargetRef: &corev1.ObjectReference{ + Namespace: testServiceNamespace, + Name: "pod9", + }, + NodeName: &instance4, + Addresses: []string{"10.100.4.3"}, + Ready: true, + }, + }, + }, + }, + expect: true, + }, + } + + for _, tc := range testCases { + if got := transactionSyncer.isNodeNameMissing(tc.endpointsData); got != tc.expect { + t.Errorf("For case %q, expect equal to be %v, but got %v", tc.desc, tc.expect, got) + } + } +} + +func TestIsZoneMissing(t *testing.T) { + t.Parallel() + _, transactionSyncer := newTestTransactionSyncer(negtypes.NewAdapter(gce.NewFakeGCECloud(gce.DefaultTestClusterValues())), negtypes.VmIpPortEndpointType, false, true) + instance1 := testInstance1 + instance2 := testInstance2 + + zone1 := "us-central1-a" + zone2 := "us-central1-b" + + testCases := []struct { + desc string + zoneNetworkEndpointMap map[string]negtypes.NetworkEndpointSet + expect bool + }{ + { + desc: "no missing zones", + zoneNetworkEndpointMap: map[string]negtypes.NetworkEndpointSet{ + zone1: negtypes.NewNetworkEndpointSet( + negtypes.NetworkEndpoint{ + IP: "10.100.1.2", + Port: "80", + Node: instance1, + }, + negtypes.NetworkEndpoint{ + IP: "10.100.1.3", + Port: "80", + Node: instance1, + }, + ), + zone2: negtypes.NewNetworkEndpointSet( + negtypes.NetworkEndpoint{ + IP: "10.100.2.2", + Port: "81", + Node: instance2, + }, + ), + }, + expect: false, + }, + { + desc: "one missing zone", + zoneNetworkEndpointMap: map[string]negtypes.NetworkEndpointSet{ + zone1: negtypes.NewNetworkEndpointSet( + negtypes.NetworkEndpoint{ + IP: "10.100.1.2", + Port: "80", + Node: instance1, + }, + negtypes.NetworkEndpoint{ + IP: "10.100.1.3", + Port: "80", + Node: instance1, + }, + ), + "": negtypes.NewNetworkEndpointSet( + negtypes.NetworkEndpoint{ + IP: "10.100.2.2", + Port: "81", + Node: instance2, + }, + ), + }, + expect: true, + }, + } + for _, tc := range testCases { + if got := transactionSyncer.isZoneMissing(tc.zoneNetworkEndpointMap); got != tc.expect { + t.Errorf("For case %q, expect equal to be %v, but got %v", tc.desc, tc.expect, got) + } + } +} + func newL4ILBTestTransactionSyncer(fakeGCE negtypes.NetworkEndpointGroupCloud, mode negtypes.EndpointsCalculatorMode, enableEndpointSlices bool) (negtypes.NegSyncer, *transactionSyncer) { negsyncer, ts := newTestTransactionSyncer(fakeGCE, negtypes.VmIpEndpointType, false, enableEndpointSlices) ts.endpointsCalculator = GetEndpointsCalculator(ts.nodeLister, ts.podLister, ts.zoneGetter, ts.NegSyncerKey, mode, klog.TODO())