diff --git a/pkg/neg/syncers/transaction.go b/pkg/neg/syncers/transaction.go index d5058ff6e7..e318d7e58f 100644 --- a/pkg/neg/syncers/transaction.go +++ b/pkg/neg/syncers/transaction.go @@ -242,7 +242,7 @@ func (s *transactionSyncer) syncInternalImpl() error { } endpointsData := negtypes.EndpointsDataFromEndpointSlices(endpointSlices) targetMap, endpointPodMap, dupCount, err = s.endpointsCalculator.CalculateEndpoints(endpointsData, currentMap) - if s.invalidEndpointInfo(endpointsData, endpointPodMap, dupCount) { + if s.invalidEndpointInfo(endpointsData, endpointPodMap, dupCount) || s.isZoneMissing(targetMap) { s.setErrorState() } if err != nil { @@ -362,6 +362,7 @@ func (s *transactionSyncer) ensureNetworkEndpointGroups() error { // 1. The endpoint count from endpointData doesn't equal to the one from endpointPodMap: // endpiontPodMap removes the duplicated endpoints, and dupCount stores the number of duplicated it removed // and we compare the endpoint counts with duplicates +// 2. There is at least one endpoint in endpointData with missing nodeName func (s *transactionSyncer) invalidEndpointInfo(eds []negtypes.EndpointsData, endpointPodMap negtypes.EndpointPodMap, dupCount int) bool { // Endpoint count from EndpointPodMap countFromPodMap := len(endpointPodMap) + dupCount @@ -370,7 +371,15 @@ func (s *transactionSyncer) invalidEndpointInfo(eds []negtypes.EndpointsData, en countFromEndpointData := 0 for _, ed := range eds { countFromEndpointData += len(ed.Addresses) + for _, endpointAddress := range ed.Addresses { + nodeName := endpointAddress.NodeName + if nodeName == nil || len(*nodeName) == 0 { + s.logger.Info("Detected error when checking nodeName, marking syncer in error state", "endpoint", endpointAddress, "endpointData", eds) + return true + } + } } + if countFromEndpointData != countFromPodMap { s.logger.Info("Detected error when comparing endpoint counts, marking syncer in error state", "endpointData", eds, "endpointPodMap", endpointPodMap, "dupCount", dupCount) return true @@ -378,6 +387,15 @@ func (s *transactionSyncer) invalidEndpointInfo(eds []negtypes.EndpointsData, en return false } +// isZoneMissing returns true if there is invalid(empty) zone in zoneNetworkEndpointMap +func (s *transactionSyncer) isZoneMissing(zoneNetworkEndpointMap map[string]negtypes.NetworkEndpointSet) bool { + if _, isPresent := zoneNetworkEndpointMap[""]; isPresent { + s.logger.Info("Detected error when checking missing zone, marking syncer in error state", "zoneNetworkEndpointMap", zoneNetworkEndpointMap) + return true + } + return false +} + // syncNetworkEndpoints spins off go routines to execute NEG operations func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints map[string]negtypes.NetworkEndpointSet) error { syncFunc := func(endpointMap map[string]negtypes.NetworkEndpointSet, operation transactionOp) error { diff --git a/pkg/neg/syncers/transaction_test.go b/pkg/neg/syncers/transaction_test.go index 916ee81737..ef97aaaa08 100644 --- a/pkg/neg/syncers/transaction_test.go +++ b/pkg/neg/syncers/transaction_test.go @@ -1765,6 +1765,291 @@ func TestInvalidEndpointInfoEndpointCountsDiffer(t *testing.T) { } } +func TestInvalidEndpointInfoNodeNameMissing(t *testing.T) { + t.Parallel() + _, transactionSyncer := newTestTransactionSyncer(negtypes.NewAdapter(gce.NewFakeGCECloud(gce.DefaultTestClusterValues())), negtypes.VmIpPortEndpointType, false, true) + + instance1 := testInstance1 + instance2 := testInstance2 + instance4 := testInstance4 + testPortName := "port1" + testServiceNamespace := "namespace" + port80 := int32(80) + port81 := int32(81) + testIP1 := "10.100.1.1" + testIP2 := "10.100.1.2" + testIP3 := "10.100.2.2" + testIP4 := "10.100.4.1" + testPodName1 := "pod1" + testPodName2 := "pod2" + testPodName3 := "pod3" + testPodName4 := "pod4" + + endpointPodMap := map[negtypes.NetworkEndpoint]types.NamespacedName{ + { + IP: testIP1, + Port: "80", + Node: instance1, + }: { + Namespace: testServiceNamespace, + Name: testPodName1, + }, + { + IP: testIP2, + Port: "80", + Node: instance1, + }: { + Namespace: testServiceNamespace, + Name: testPodName2, + }, + { + IP: testIP3, + Port: "81", + Node: instance2, + }: { + Namespace: testServiceNamespace, + Name: testPodName3, + }, + { + IP: testIP4, + Port: "81", + Node: instance4, + }: { + Namespace: testServiceNamespace, + Name: testPodName4, + }, + } + + testCases := []struct { + desc string + endpointsData []negtypes.EndpointsData + expect bool + }{ + { + desc: "no missing nodeNames", + endpointsData: []negtypes.EndpointsData{ + { + Meta: &metav1.ObjectMeta{ + Name: testServiceName + "-1", + Namespace: testServiceNamespace, + }, + Ports: []negtypes.PortData{ + { + Name: testPortName, + Port: port80, + }, + }, + Addresses: []negtypes.AddressData{ + { + TargetRef: &corev1.ObjectReference{ + Namespace: testServiceNamespace, + Name: testPodName1, + }, + NodeName: &instance1, + Addresses: []string{testIP1}, + Ready: true, + }, + { + TargetRef: &corev1.ObjectReference{ + Namespace: testServiceNamespace, + Name: testPodName2, + }, + NodeName: &instance1, + Addresses: []string{testIP2}, + Ready: true, + }, + }, + }, + { + Meta: &metav1.ObjectMeta{ + Name: testServiceName + "-2", + Namespace: testServiceNamespace, + }, + Ports: []negtypes.PortData{ + { + Name: testPortName, + Port: port81, + }, + }, + Addresses: []negtypes.AddressData{ + { + TargetRef: &corev1.ObjectReference{ + Namespace: testServiceNamespace, + Name: testPodName3, + }, + NodeName: &instance2, + Addresses: []string{testIP3}, + Ready: true, + }, + { + TargetRef: &corev1.ObjectReference{ + Namespace: testServiceNamespace, + Name: testPodName4, + }, + NodeName: &instance4, + Addresses: []string{testIP4}, + Ready: true, + }, + }, + }, + }, + expect: false, + }, + { + desc: "at least one endpoint is missing a nodeName", + endpointsData: []negtypes.EndpointsData{ + { + Meta: &metav1.ObjectMeta{ + Name: testServiceName + "-1", + Namespace: testServiceNamespace, + }, + Ports: []negtypes.PortData{ + { + Name: testPortName, + Port: port80, + }, + }, + Addresses: []negtypes.AddressData{ + { + TargetRef: &corev1.ObjectReference{ + Namespace: testServiceNamespace, + Name: testPodName1, + }, + NodeName: nil, + Addresses: []string{testIP1}, + Ready: true, + }, + { + TargetRef: &corev1.ObjectReference{ + Namespace: testServiceNamespace, + Name: testPodName2, + }, + NodeName: &instance1, + Addresses: []string{testIP2}, + Ready: true, + }, + }, + }, + { + Meta: &metav1.ObjectMeta{ + Name: testServiceName + "-2", + Namespace: testServiceNamespace, + }, + Ports: []negtypes.PortData{ + { + Name: testPortName, + Port: port81, + }, + }, + Addresses: []negtypes.AddressData{ + { + TargetRef: &corev1.ObjectReference{ + Namespace: testServiceNamespace, + Name: testPodName3, + }, + NodeName: &instance2, + Addresses: []string{testIP3}, + Ready: true, + }, + { + TargetRef: &corev1.ObjectReference{ + Namespace: testServiceNamespace, + Name: testPodName4, + }, + NodeName: &instance4, + Addresses: []string{testIP4}, + Ready: true, + }, + }, + }, + }, + expect: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + if got := transactionSyncer.invalidEndpointInfo(tc.endpointsData, endpointPodMap, 0); got != tc.expect { + t.Errorf("invalidEndpointInfo() = %t, expected %t", got, tc.expect) + } + }) + } +} + +func TestIsZoneMissing(t *testing.T) { + t.Parallel() + _, transactionSyncer := newTestTransactionSyncer(negtypes.NewAdapter(gce.NewFakeGCECloud(gce.DefaultTestClusterValues())), negtypes.VmIpPortEndpointType, false, true) + instance1 := testInstance1 + instance2 := testInstance2 + + zone1 := "us-central1-a" + zone2 := "us-central1-b" + + testCases := []struct { + desc string + zoneNetworkEndpointMap map[string]negtypes.NetworkEndpointSet + expect bool + }{ + { + desc: "no missing zones", + zoneNetworkEndpointMap: map[string]negtypes.NetworkEndpointSet{ + zone1: negtypes.NewNetworkEndpointSet( + negtypes.NetworkEndpoint{ + IP: "10.100.1.2", + Port: "80", + Node: instance1, + }, + negtypes.NetworkEndpoint{ + IP: "10.100.1.3", + Port: "80", + Node: instance1, + }, + ), + zone2: negtypes.NewNetworkEndpointSet( + negtypes.NetworkEndpoint{ + IP: "10.100.2.2", + Port: "81", + Node: instance2, + }, + ), + }, + expect: false, + }, + { + desc: "contains one missing zone", + zoneNetworkEndpointMap: map[string]negtypes.NetworkEndpointSet{ + zone1: negtypes.NewNetworkEndpointSet( + negtypes.NetworkEndpoint{ + IP: "10.100.1.2", + Port: "80", + Node: instance1, + }, + negtypes.NetworkEndpoint{ + IP: "10.100.1.3", + Port: "80", + Node: instance1, + }, + ), + "": negtypes.NewNetworkEndpointSet( + negtypes.NetworkEndpoint{ + IP: "10.100.2.2", + Port: "81", + Node: instance2, + }, + ), + }, + expect: true, + }, + } + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + if got := transactionSyncer.isZoneMissing(tc.zoneNetworkEndpointMap); got != tc.expect { + t.Errorf("isZoneMissing() = %t, expected %t", got, tc.expect) + } + }) + } +} + func newL4ILBTestTransactionSyncer(fakeGCE negtypes.NetworkEndpointGroupCloud, mode negtypes.EndpointsCalculatorMode, enableEndpointSlices bool) (negtypes.NegSyncer, *transactionSyncer) { negsyncer, ts := newTestTransactionSyncer(fakeGCE, negtypes.VmIpEndpointType, false, enableEndpointSlices) ts.endpointsCalculator = GetEndpointsCalculator(ts.nodeLister, ts.podLister, ts.zoneGetter, ts.NegSyncerKey, mode, klog.TODO())