diff --git a/pkg/neg/syncers/transaction.go b/pkg/neg/syncers/transaction.go index 219230a8d6..2cc107acf5 100644 --- a/pkg/neg/syncers/transaction.go +++ b/pkg/neg/syncers/transaction.go @@ -18,6 +18,7 @@ package syncers import ( "context" + "errors" "fmt" "net/http" "strings" @@ -244,7 +245,7 @@ func (s *transactionSyncer) syncInternalImpl() error { } endpointsData := negtypes.EndpointsDataFromEndpointSlices(endpointSlices) targetMap, endpointPodMap, dupCount, err = s.endpointsCalculator.CalculateEndpoints(endpointsData, currentMap) - if s.invalidEndpointInfo(endpointsData, endpointPodMap, dupCount) || s.isZoneMissing(targetMap) { + if !s.isValidEPField(err) || !s.isValidEndpointInfo(endpointsData, endpointPodMap, dupCount) { s.setErrorState() } if err != nil { @@ -358,67 +359,64 @@ func (s *transactionSyncer) ensureNetworkEndpointGroups() error { return utilerrors.NewAggregate(errList) } -// invalidEndpointInfo checks if endpoint information is correct. -// It returns true if any of the following checks fails: +// isValidEndpointInfo checks if endpoint information is correct. +// It returns false if one of the two checks fails: // // 1. The endpoint count from endpointData doesn't equal to the one from endpointPodMap: // endpiontPodMap removes the duplicated endpoints, and dupCount stores the number of duplicated it removed // and we compare the endpoint counts with duplicates -// 2. There is at least one endpoint in endpointData with missing nodeName -// 3. The endpoint count from endpointData or the one from endpointPodMap is 0 -func (s *transactionSyncer) invalidEndpointInfo(eds []negtypes.EndpointsData, endpointPodMap negtypes.EndpointPodMap, dupCount int) bool { +// 2. The endpoint count from endpointData or the one from endpointPodMap is 0 +func (s *transactionSyncer) isValidEndpointInfo(eds []negtypes.EndpointsData, endpointPodMap negtypes.EndpointPodMap, dupCount int) bool { // Endpoint count from EndpointPodMap countFromPodMap := len(endpointPodMap) + dupCount if countFromPodMap == 0 { s.logger.Info("Detected endpoint count from endpointPodMap going to zero", "endpointPodMap", endpointPodMap) - return true + return false } // Endpoint count from EndpointData countFromEndpointData := 0 for _, ed := range eds { countFromEndpointData += len(ed.Addresses) - for _, endpointAddress := range ed.Addresses { - nodeName := endpointAddress.NodeName - if nodeName == nil || len(*nodeName) == 0 { - s.logger.Info("Detected error when checking nodeName", "endpoint", endpointAddress, "endpointData", eds) - return true - } - } } if countFromEndpointData == 0 { s.logger.Info("Detected endpoint count from endpointData going to zero", "endpointData", eds) - return true + return false } if countFromEndpointData != countFromPodMap { s.logger.Info("Detected error when comparing endpoint counts", "endpointData", eds, "endpointPodMap", endpointPodMap, "dupCount", dupCount) - return true + return false } - return false + return true } -// isZoneMissing returns true if there is invalid(empty) zone in zoneNetworkEndpointMap -func (s *transactionSyncer) isZoneMissing(zoneNetworkEndpointMap map[string]negtypes.NetworkEndpointSet) bool { - if _, isPresent := zoneNetworkEndpointMap[""]; isPresent { - s.logger.Info("Detected error when checking missing zone", "zoneNetworkEndpointMap", zoneNetworkEndpointMap) - return true +// isValidEPField returns false if there is endpoint with missing zone or nodeName +func (s *transactionSyncer) isValidEPField(err error) bool { + if errors.Is(err, ErrEPMissingNodeName) { + s.logger.Info("Detected unexpected error when checking missing nodeName", "error", err) + return false + } + if errors.Is(err, ErrEPMissingZone) { + s.logger.Info("Detected unexpected error when checking missing zone", "error", err) + return false } - return false + return true } -func (s *transactionSyncer) isInvalidEPBatch(err error, operation transactionOp, networkEndpoints []*composite.NetworkEndpoint) bool { +// isValidEPBatch returns false if the error from endpoint batch response is due to bad request +func (s *transactionSyncer) isValidEPBatch(err error, operation transactionOp, networkEndpoints []*composite.NetworkEndpoint) bool { apiErr, ok := err.(*googleapi.Error) if !ok { s.logger.Info("Detected error when parsing batch request error", "operation", operation, "error", err) - return true + return false } errCode := apiErr.Code if errCode == http.StatusBadRequest { s.logger.Info("Detected error when sending endpoint batch information", "operation", operation, "errorCode", errCode) - return true + return false } - return false + return true } // syncNetworkEndpoints spins off go routines to execute NEG operations @@ -499,7 +497,7 @@ func (s *transactionSyncer) operationInternal(operation transactionOp, zone stri s.recordEvent(apiv1.EventTypeNormal, operation.String(), fmt.Sprintf("%s %d network endpoint(s) (NEG %q in zone %q)", operation.String(), len(networkEndpointMap), s.NegSyncerKey.NegName, zone)) } else { s.recordEvent(apiv1.EventTypeWarning, operation.String()+"Failed", fmt.Sprintf("Failed to %s %d network endpoint(s) (NEG %q in zone %q): %v", operation.String(), len(networkEndpointMap), s.NegSyncerKey.NegName, zone, err)) - if s.isInvalidEPBatch(err, operation, networkEndpoints) { + if !s.isValidEPBatch(err, operation, networkEndpoints) { s.setErrorState() } } diff --git a/pkg/neg/syncers/transaction_test.go b/pkg/neg/syncers/transaction_test.go index 47b686182b..a7aa82c487 100644 --- a/pkg/neg/syncers/transaction_test.go +++ b/pkg/neg/syncers/transaction_test.go @@ -1412,14 +1412,14 @@ func TestUnknownNodes(t *testing.T) { } } -func TestInvalidEndpointInfo(t *testing.T) { +func TestIsValidEndpointInfo(t *testing.T) { t.Parallel() _, transactionSyncer := newTestTransactionSyncer(negtypes.NewAdapter(gce.NewFakeGCECloud(gce.DefaultTestClusterValues())), negtypes.VmIpPortEndpointType, false, true) instance1 := testInstance1 instance2 := testInstance2 instance4 := testInstance4 - testPortName := "port1" + testPortName := "" testServiceNamespace := "namespace" port80 := int32(80) port81 := int32(81) @@ -1544,7 +1544,7 @@ func TestInvalidEndpointInfo(t *testing.T) { }, endpointPodMap: testEndpointPodMap, dupCount: 0, - expect: false, + expect: true, }, { desc: "counts equal, endpointData has duplicated endpoints", @@ -1625,7 +1625,7 @@ func TestInvalidEndpointInfo(t *testing.T) { }, endpointPodMap: testEndpointPodMap, dupCount: 1, - expect: false, + expect: true, }, { desc: "counts not equal, endpointData has no duplicated endpoints", @@ -1688,7 +1688,7 @@ func TestInvalidEndpointInfo(t *testing.T) { }, endpointPodMap: testEndpointPodMap, dupCount: 0, - expect: true, + expect: false, }, { desc: "counts not equal, endpointData has duplicated endpoints", @@ -1760,10 +1760,44 @@ func TestInvalidEndpointInfo(t *testing.T) { }, endpointPodMap: testEndpointPodMap, dupCount: 1, - expect: true, + expect: false, }, { - desc: "no missing nodeNames", + desc: "endpointData has zero endpoint", + endpointsData: []negtypes.EndpointsData{ + { + Meta: &metav1.ObjectMeta{ + Name: testServiceName + "-1", + Namespace: testServiceNamespace, + }, + Ports: []negtypes.PortData{ + { + Name: testPortName, + Port: port80, + }, + }, + Addresses: []negtypes.AddressData{}, + }, + { + Meta: &metav1.ObjectMeta{ + Name: testServiceName + "-2", + Namespace: testServiceNamespace, + }, + Ports: []negtypes.PortData{ + { + Name: testPortName, + Port: port81, + }, + }, + Addresses: []negtypes.AddressData{}, + }, + }, + endpointPodMap: testEndpointPodMap, + dupCount: 0, + expect: false, + }, + { + desc: "endpointPodMap has zero endpoint", endpointsData: []negtypes.EndpointsData{ { Meta: &metav1.ObjectMeta{ @@ -1830,12 +1864,46 @@ func TestInvalidEndpointInfo(t *testing.T) { }, }, }, - endpointPodMap: testEndpointPodMap, + endpointPodMap: map[negtypes.NetworkEndpoint]types.NamespacedName{}, dupCount: 0, expect: false, }, { - desc: "at least one endpoint is missing a nodeName", + desc: "endpointData and endpointPodMap both have zero endpoint", + endpointsData: []negtypes.EndpointsData{ + { + Meta: &metav1.ObjectMeta{ + Name: testServiceName + "-1", + Namespace: testServiceNamespace, + }, + Ports: []negtypes.PortData{ + { + Name: testPortName, + Port: port80, + }, + }, + Addresses: []negtypes.AddressData{}, + }, + { + Meta: &metav1.ObjectMeta{ + Name: testServiceName + "-2", + Namespace: testServiceNamespace, + }, + Ports: []negtypes.PortData{ + { + Name: testPortName, + Port: port81, + }, + }, + Addresses: []negtypes.AddressData{}, + }, + }, + endpointPodMap: map[negtypes.NetworkEndpoint]types.NamespacedName{}, + dupCount: 0, + expect: false, + }, + { + desc: "endpointData and endpointPodMap both have non-zero endpoints", endpointsData: []negtypes.EndpointsData{ { Meta: &metav1.ObjectMeta{ @@ -1854,7 +1922,7 @@ func TestInvalidEndpointInfo(t *testing.T) { Namespace: testServiceNamespace, Name: testPodName1, }, - NodeName: nil, + NodeName: &instance1, Addresses: []string{testIP1}, Ready: true, }, @@ -1906,8 +1974,50 @@ func TestInvalidEndpointInfo(t *testing.T) { dupCount: 0, expect: true, }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + if got := transactionSyncer.isValidEndpointInfo(tc.endpointsData, tc.endpointPodMap, tc.dupCount); got != tc.expect { + t.Errorf("invalidEndpointInfo() = %t, expected %t", got, tc.expect) + } + }) + } +} + +func TestIsValidEPField(t *testing.T) { + t.Parallel() + _, transactionSyncer := newTestTransactionSyncer(negtypes.NewAdapter(gce.NewFakeGCECloud(gce.DefaultTestClusterValues())), negtypes.VmIpPortEndpointType, false, true) + + instance1 := testInstance1 + instance2 := testInstance2 + instance4 := testInstance4 + testPortName := "" + testServiceNamespace := "namespace" + port80 := int32(80) + port81 := int32(81) + testIP1 := "10.100.1.1" + testIP2 := "10.100.1.2" + testIP3 := "10.100.2.2" + testIP4 := "10.100.4.1" + testPodName1 := "pod1" + testPodName2 := "pod2" + testPodName3 := "pod3" + testPodName4 := "pod4" + emptyZoneNodeName := "empty-zone-node" + emptyNodeName := "" + fakeZoneGetter := transactionSyncer.zoneGetter.(*negtypes.FakeZoneGetter) + if err := fakeZoneGetter.AddZone("", emptyZoneNodeName); err != nil { + t.Errorf("error when adding zone, expected nil, err: %v", err) + } + + testCases := []struct { + desc string + endpointsData []negtypes.EndpointsData + expect bool + }{ { - desc: "endpointData has zero endpoint", + desc: "no missing zone or nodeName", endpointsData: []negtypes.EndpointsData{ { Meta: &metav1.ObjectMeta{ @@ -1920,7 +2030,26 @@ func TestInvalidEndpointInfo(t *testing.T) { Port: port80, }, }, - Addresses: []negtypes.AddressData{}, + Addresses: []negtypes.AddressData{ + { + TargetRef: &corev1.ObjectReference{ + Namespace: testServiceNamespace, + Name: testPodName1, + }, + NodeName: &instance1, + Addresses: []string{testIP1}, + Ready: true, + }, + { + TargetRef: &corev1.ObjectReference{ + Namespace: testServiceNamespace, + Name: testPodName2, + }, + NodeName: &instance1, + Addresses: []string{testIP2}, + Ready: true, + }, + }, }, { Meta: &metav1.ObjectMeta{ @@ -1933,15 +2062,32 @@ func TestInvalidEndpointInfo(t *testing.T) { Port: port81, }, }, - Addresses: []negtypes.AddressData{}, + Addresses: []negtypes.AddressData{ + { + TargetRef: &corev1.ObjectReference{ + Namespace: testServiceNamespace, + Name: testPodName3, + }, + NodeName: &instance2, + Addresses: []string{testIP3}, + Ready: true, + }, + { + TargetRef: &corev1.ObjectReference{ + Namespace: testServiceNamespace, + Name: testPodName4, + }, + NodeName: &instance4, + Addresses: []string{testIP4}, + Ready: true, + }, + }, }, }, - endpointPodMap: testEndpointPodMap, - dupCount: 0, - expect: true, + expect: true, }, { - desc: "endpointPodMap has zero endpoint", + desc: "contains one missing nodeName", endpointsData: []negtypes.EndpointsData{ { Meta: &metav1.ObjectMeta{ @@ -1960,7 +2106,7 @@ func TestInvalidEndpointInfo(t *testing.T) { Namespace: testServiceNamespace, Name: testPodName1, }, - NodeName: &instance1, + NodeName: nil, Addresses: []string{testIP1}, Ready: true, }, @@ -2008,12 +2154,10 @@ func TestInvalidEndpointInfo(t *testing.T) { }, }, }, - endpointPodMap: map[negtypes.NetworkEndpoint]types.NamespacedName{}, - dupCount: 0, - expect: true, + expect: false, }, { - desc: "endpointData and endpointPodMap both have zero endpoint", + desc: "contains one empty nodeName", endpointsData: []negtypes.EndpointsData{ { Meta: &metav1.ObjectMeta{ @@ -2026,7 +2170,26 @@ func TestInvalidEndpointInfo(t *testing.T) { Port: port80, }, }, - Addresses: []negtypes.AddressData{}, + Addresses: []negtypes.AddressData{ + { + TargetRef: &corev1.ObjectReference{ + Namespace: testServiceNamespace, + Name: testPodName1, + }, + NodeName: &emptyNodeName, + Addresses: []string{testIP1}, + Ready: true, + }, + { + TargetRef: &corev1.ObjectReference{ + Namespace: testServiceNamespace, + Name: testPodName2, + }, + NodeName: &instance1, + Addresses: []string{testIP2}, + Ready: true, + }, + }, }, { Meta: &metav1.ObjectMeta{ @@ -2039,15 +2202,32 @@ func TestInvalidEndpointInfo(t *testing.T) { Port: port81, }, }, - Addresses: []negtypes.AddressData{}, + Addresses: []negtypes.AddressData{ + { + TargetRef: &corev1.ObjectReference{ + Namespace: testServiceNamespace, + Name: testPodName3, + }, + NodeName: &instance2, + Addresses: []string{testIP3}, + Ready: true, + }, + { + TargetRef: &corev1.ObjectReference{ + Namespace: testServiceNamespace, + Name: testPodName4, + }, + NodeName: &instance4, + Addresses: []string{testIP4}, + Ready: true, + }, + }, }, }, - endpointPodMap: map[negtypes.NetworkEndpoint]types.NamespacedName{}, - dupCount: 0, - expect: true, + expect: false, }, { - desc: "endpointData and endpointPodMap both have non-zero endpoints", + desc: "contains one missing zone", endpointsData: []negtypes.EndpointsData{ { Meta: &metav1.ObjectMeta{ @@ -2066,7 +2246,7 @@ func TestInvalidEndpointInfo(t *testing.T) { Namespace: testServiceNamespace, Name: testPodName1, }, - NodeName: &instance1, + NodeName: &emptyZoneNodeName, Addresses: []string{testIP1}, Ready: true, }, @@ -2114,96 +2294,20 @@ func TestInvalidEndpointInfo(t *testing.T) { }, }, }, - endpointPodMap: testEndpointPodMap, - dupCount: 0, - expect: false, - }, - } - - for _, tc := range testCases { - t.Run(tc.desc, func(t *testing.T) { - if got := transactionSyncer.invalidEndpointInfo(tc.endpointsData, tc.endpointPodMap, tc.dupCount); got != tc.expect { - t.Errorf("invalidEndpointInfo() = %t, expected %t", got, tc.expect) - } - }) - } -} - -func TestIsZoneMissing(t *testing.T) { - t.Parallel() - _, transactionSyncer := newTestTransactionSyncer(negtypes.NewAdapter(gce.NewFakeGCECloud(gce.DefaultTestClusterValues())), negtypes.VmIpPortEndpointType, false, true) - instance1 := testInstance1 - instance2 := testInstance2 - - zone1 := "us-central1-a" - zone2 := "us-central1-b" - - testCases := []struct { - desc string - zoneNetworkEndpointMap map[string]negtypes.NetworkEndpointSet - expect bool - }{ - { - desc: "no missing zones", - zoneNetworkEndpointMap: map[string]negtypes.NetworkEndpointSet{ - zone1: negtypes.NewNetworkEndpointSet( - negtypes.NetworkEndpoint{ - IP: "10.100.1.2", - Port: "80", - Node: instance1, - }, - negtypes.NetworkEndpoint{ - IP: "10.100.1.3", - Port: "80", - Node: instance1, - }, - ), - zone2: negtypes.NewNetworkEndpointSet( - negtypes.NetworkEndpoint{ - IP: "10.100.2.2", - Port: "81", - Node: instance2, - }, - ), - }, expect: false, }, - { - desc: "contains one missing zone", - zoneNetworkEndpointMap: map[string]negtypes.NetworkEndpointSet{ - zone1: negtypes.NewNetworkEndpointSet( - negtypes.NetworkEndpoint{ - IP: "10.100.1.2", - Port: "80", - Node: instance1, - }, - negtypes.NetworkEndpoint{ - IP: "10.100.1.3", - Port: "80", - Node: instance1, - }, - ), - "": negtypes.NewNetworkEndpointSet( - negtypes.NetworkEndpoint{ - IP: "10.100.2.2", - Port: "81", - Node: instance2, - }, - ), - }, - expect: true, - }, } for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { - if got := transactionSyncer.isZoneMissing(tc.zoneNetworkEndpointMap); got != tc.expect { - t.Errorf("isZoneMissing() = %t, expected %t", got, tc.expect) + _, _, _, err := transactionSyncer.endpointsCalculator.CalculateEndpoints(tc.endpointsData, nil) + if got := transactionSyncer.isValidEPField(err); got != tc.expect { + t.Errorf("isValidEPField() = %t, expected %t, err: %v, ", got, tc.expect, err) } }) } } -func TestIsInvalidEPBatch(t *testing.T) { +func TestIsValidEPBatch(t *testing.T) { fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues()) fakeCloud := negtypes.NewAdapter(fakeGCE) zone := "us-central1-a" @@ -2215,14 +2319,14 @@ func TestIsInvalidEPBatch(t *testing.T) { expect bool }{ { - desc: "NEG API call no error, status code 200", + desc: "NEG API call server error, status code 500", HttpStatusCode: http.StatusOK, - expect: false, + expect: true, }, { - desc: "NEG API call error, status code 400", + desc: "NEG API call request error, status code 400", HttpStatusCode: http.StatusBadRequest, - expect: true, + expect: false, }, } @@ -2237,7 +2341,7 @@ func TestIsInvalidEPBatch(t *testing.T) { _, transactionSyncer := newTestTransactionSyncer(fakeCloud, negtypes.VmIpPortEndpointType, false, true) err := transactionSyncer.cloud.AttachNetworkEndpoints(transactionSyncer.NegSyncerKey.NegName, zone, networkEndpoints, transactionSyncer.NegSyncerKey.GetAPIVersion()) - if got := transactionSyncer.isInvalidEPBatch(err, attachOp, networkEndpoints); got != tc.expect { + if got := transactionSyncer.isValidEPBatch(err, attachOp, networkEndpoints); got != tc.expect { t.Errorf("isInvalidEPBatch() = %t, expected %t", got, tc.expect) } }) @@ -2275,11 +2379,12 @@ func newTestTransactionSyncer(fakeGCE negtypes.NetworkEndpointGroupCloud, negTyp // TODO(freehan): use real readiness reflector reflector := &readiness.NoopReflector{} + fakeZoneGetter := negtypes.NewFakeZoneGetter() negsyncer := NewTransactionSyncer(svcPort, record.NewFakeRecorder(100), fakeGCE, - negtypes.NewFakeZoneGetter(), + fakeZoneGetter, testContext.PodInformer.GetIndexer(), testContext.ServiceInformer.GetIndexer(), testContext.EndpointInformer.GetIndexer(), @@ -2287,7 +2392,7 @@ func newTestTransactionSyncer(fakeGCE negtypes.NetworkEndpointGroupCloud, negTyp testContext.NodeInformer.GetIndexer(), testContext.SvcNegInformer.GetIndexer(), reflector, - GetEndpointsCalculator(testContext.NodeInformer.GetIndexer(), testContext.PodInformer.GetIndexer(), negtypes.NewFakeZoneGetter(), + GetEndpointsCalculator(testContext.NodeInformer.GetIndexer(), testContext.PodInformer.GetIndexer(), fakeZoneGetter, svcPort, mode, klog.TODO()), string(kubeSystemUID), testContext.SvcNegClient, diff --git a/pkg/neg/syncers/utils.go b/pkg/neg/syncers/utils.go index a077a26c27..c3f9291587 100644 --- a/pkg/neg/syncers/utils.go +++ b/pkg/neg/syncers/utils.go @@ -17,6 +17,7 @@ limitations under the License. package syncers import ( + "errors" "fmt" "strconv" "strings" @@ -47,6 +48,12 @@ const ( separator = "||" ) +var ( + ErrEPMissingNodeName = errors.New("endpoint has empty nodeName field") + ErrNodeNotFound = errors.New("failed to retrieve associated zone of node") + ErrEPMissingZone = errors.New("endpoint has empty zone field") +) + // encodeEndpoint encodes ip and instance into a single string func encodeEndpoint(ip, instance, port string) string { return strings.Join([]string{ip, instance, port}, separator) @@ -257,9 +264,9 @@ func toZoneNetworkEndpointMap(eds []negtypes.EndpointsData, zoneGetter negtypes. continue } } - if endpointAddress.NodeName == nil { + if endpointAddress.NodeName == nil || len(*endpointAddress.NodeName) == 0 { klog.V(2).Infof("Endpoint %q in Endpoints %s/%s does not have an associated node. Skipping", endpointAddress.Addresses, ed.Meta.Namespace, ed.Meta.Name) - continue + return nil, nil, dupCount, ErrEPMissingNodeName } if endpointAddress.TargetRef == nil { klog.V(2).Infof("Endpoint %q in Endpoints %s/%s does not have an associated pod. Skipping", endpointAddress.Addresses, ed.Meta.Namespace, ed.Meta.Name) @@ -267,7 +274,10 @@ func toZoneNetworkEndpointMap(eds []negtypes.EndpointsData, zoneGetter negtypes. } zone, err := zoneGetter.GetZoneForNode(*endpointAddress.NodeName) if err != nil { - return nil, nil, dupCount, fmt.Errorf("failed to retrieve associated zone of node %q: %w", *endpointAddress.NodeName, err) + return nil, nil, dupCount, ErrNodeNotFound + } + if zone == "" { + return nil, nil, dupCount, ErrEPMissingZone } if zoneNetworkEndpointMap[zone] == nil { zoneNetworkEndpointMap[zone] = negtypes.NewNetworkEndpointSet()