diff --git a/pkg/neg/metrics/neg_metrics_collector.go b/pkg/neg/metrics/neg_metrics_collector.go index b8b8f81dd1..5d33f3ab2f 100644 --- a/pkg/neg/metrics/neg_metrics_collector.go +++ b/pkg/neg/metrics/neg_metrics_collector.go @@ -87,7 +87,7 @@ func (sm *SyncerMetrics) UpdateSyncer(key negtypes.NegSyncerKey, syncResult *neg sm.syncerStatusMap = make(map[negtypes.NegSyncerKey]string) sm.logger.V(3).Info("Syncer Metrics failed to initialize correctly, reinitializing syncerStatusMap: %v", sm.syncerStatusMap) } - sm.syncerStatusMap[key] = syncResult.Result + sm.syncerStatusMap[key] = string(syncResult.Result) } // SetSyncerEPMetrics update the endpoint count based on the endpointStat diff --git a/pkg/neg/syncers/endpoints_calculator.go b/pkg/neg/syncers/endpoints_calculator.go index d114faa898..dea6ef8633 100644 --- a/pkg/neg/syncers/endpoints_calculator.go +++ b/pkg/neg/syncers/endpoints_calculator.go @@ -109,6 +109,12 @@ func (l *LocalL4ILBEndpointsCalculator) CalculateEndpoints(eds []types.Endpoints return subsetMap, nil, 0, err } +func (l *LocalL4ILBEndpointsCalculator) CalculateEndpointsDegradedMode(_ []types.EndpointsData, currentMap map[string]types.NetworkEndpointSet) (map[string]types.NetworkEndpointSet, types.EndpointPodMap, error) { + // this should be the same as CalculateEndpoints for L4 ec + subsetMap, _, _, err := l.CalculateEndpoints(nil, currentMap) + return subsetMap, nil, err +} + func (l *LocalL4ILBEndpointsCalculator) ValidateEndpoints(endpointData []types.EndpointsData, endpointPodMap types.EndpointPodMap, dupCount int) error { // this should be a no-op for now return nil @@ -166,6 +172,12 @@ func (l *ClusterL4ILBEndpointsCalculator) CalculateEndpoints(_ []types.Endpoints return subsetMap, nil, 0, err } +func (l *ClusterL4ILBEndpointsCalculator) CalculateEndpointsDegradedMode(_ []types.EndpointsData, currentMap map[string]types.NetworkEndpointSet) (map[string]types.NetworkEndpointSet, types.EndpointPodMap, error) { + // this should be the same as 
CalculateEndpoints for L4 ec + subsetMap, _, _, err := l.CalculateEndpoints(nil, currentMap) + return subsetMap, nil, err +} + func (l *ClusterL4ILBEndpointsCalculator) ValidateEndpoints(endpointData []types.EndpointsData, endpointPodMap types.EndpointPodMap, dupCount int) error { // this should be a no-op for now return nil @@ -176,16 +188,18 @@ type L7EndpointsCalculator struct { zoneGetter types.ZoneGetter servicePortName string podLister cache.Indexer + nodeLister cache.Indexer networkEndpointType types.NetworkEndpointType enableDualStackNEG bool logger klog.Logger } -func NewL7EndpointsCalculator(zoneGetter types.ZoneGetter, podLister cache.Indexer, svcPortName string, endpointType types.NetworkEndpointType, logger klog.Logger, enableDualStackNEG bool) *L7EndpointsCalculator { +func NewL7EndpointsCalculator(zoneGetter types.ZoneGetter, podLister, nodeLister cache.Indexer, svcPortName string, endpointType types.NetworkEndpointType, logger klog.Logger, enableDualStackNEG bool) *L7EndpointsCalculator { return &L7EndpointsCalculator{ zoneGetter: zoneGetter, servicePortName: svcPortName, podLister: podLister, + nodeLister: nodeLister, networkEndpointType: endpointType, enableDualStackNEG: enableDualStackNEG, logger: logger.WithName("L7EndpointsCalculator"), @@ -199,7 +213,14 @@ func (l *L7EndpointsCalculator) Mode() types.EndpointsCalculatorMode { // CalculateEndpoints determines the endpoints in the NEGs based on the current service endpoints and the current NEGs. 
func (l *L7EndpointsCalculator) CalculateEndpoints(eds []types.EndpointsData, _ map[string]types.NetworkEndpointSet) (map[string]types.NetworkEndpointSet, types.EndpointPodMap, int, error) { - return toZoneNetworkEndpointMap(eds, l.zoneGetter, l.servicePortName, l.networkEndpointType) + result, err := toZoneNetworkEndpointMap(eds, l.zoneGetter, l.podLister, l.servicePortName, l.networkEndpointType) + return result.NetworkEndpointSet, result.EndpointPodMap, result.DupCount, err +} + +// CalculateEndpointsDegradedMode determines the endpoints in the NEGs based on the current service endpoints and the current NEGs. +func (l *L7EndpointsCalculator) CalculateEndpointsDegradedMode(eds []types.EndpointsData, _ map[string]types.NetworkEndpointSet) (map[string]types.NetworkEndpointSet, types.EndpointPodMap, error) { + result := toZoneNetworkEndpointMapDegradedMode(eds, l.zoneGetter, l.podLister, l.nodeLister, l.servicePortName, l.networkEndpointType) + return result.NetworkEndpointSet, result.EndpointPodMap, nil } func nodeMapToString(nodeMap map[string][]*v1.Node) string { @@ -235,7 +256,7 @@ func (l *L7EndpointsCalculator) ValidateEndpoints(endpointData []types.Endpoints } if countFromEndpointData != countFromPodMap { - l.logger.Info("Detected error when comparing endpoint counts", "endpointData", endpointData, "endpointPodMap", endpointPodMap, "dupCount", dupCount) + l.logger.Info("Detected error when comparing endpoint counts", "countFromEndpointData", countFromEndpointData, "countFromPodMap", countFromPodMap, "endpointData", endpointData, "endpointPodMap", endpointPodMap, "dupCount", dupCount) return types.ErrEPCountsDiffer } return nil diff --git a/pkg/neg/syncers/endpoints_calculator_test.go b/pkg/neg/syncers/endpoints_calculator_test.go index 3272416259..75daab7b4e 100644 --- a/pkg/neg/syncers/endpoints_calculator_test.go +++ b/pkg/neg/syncers/endpoints_calculator_test.go @@ -232,10 +232,10 @@ func TestValidateEndpoints(t *testing.T) { zoneGetter := negtypes.NewFakeZoneGetter() 
testContext := negtypes.NewTestContext() podLister := testContext.PodInformer.GetIndexer() - nodeLister := listers.NewNodeLister(testContext.NodeInformer.GetIndexer()) - L7EndpointsCalculator := NewL7EndpointsCalculator(zoneGetter, podLister, testPortName, negtypes.VmIpPortEndpointType, klog.TODO(), testContext.EnableDualStackNEG) - L4LocalEndpointCalculator := NewLocalL4ILBEndpointsCalculator(nodeLister, zoneGetter, svcKey, klog.TODO()) - L4ClusterEndpointCalculator := NewClusterL4ILBEndpointsCalculator(nodeLister, zoneGetter, svcKey, klog.TODO()) + nodeLister := testContext.NodeInformer.GetIndexer() + L7EndpointsCalculator := NewL7EndpointsCalculator(zoneGetter, podLister, nodeLister, testPortName, negtypes.VmIpPortEndpointType, klog.TODO(), testContext.EnableDualStackNEG) + L4LocalEndpointCalculator := NewLocalL4ILBEndpointsCalculator(listers.NewNodeLister(nodeLister), zoneGetter, svcKey, klog.TODO()) + L4ClusterEndpointCalculator := NewClusterL4ILBEndpointsCalculator(listers.NewNodeLister(nodeLister), zoneGetter, svcKey, klog.TODO()) testEndpointPodMap := map[negtypes.NetworkEndpoint]types.NamespacedName{ { diff --git a/pkg/neg/syncers/transaction.go b/pkg/neg/syncers/transaction.go index 6a2ef955b8..c0a186f10e 100644 --- a/pkg/neg/syncers/transaction.go +++ b/pkg/neg/syncers/transaction.go @@ -18,7 +18,6 @@ package syncers import ( "context" - "errors" "fmt" "net/http" "strings" @@ -187,6 +186,7 @@ func GetEndpointsCalculator(nodeLister, podLister cache.Indexer, zoneGetter negt return NewL7EndpointsCalculator( zoneGetter, podLister, + nodeLister, syncerKey.PortTuple.Name, syncerKey.NegType, logger, @@ -220,17 +220,16 @@ func (s *transactionSyncer) syncInternal() error { } func (s *transactionSyncer) syncInternalImpl() error { + if s.syncer.IsStopped() || s.syncer.IsShuttingDown() { + s.logger.V(3).Info("Skip syncing NEG", "negSyncerKey", s.NegSyncerKey.String()) + return nil + } if s.needInit || s.isZoneChange() { if err := s.ensureNetworkEndpointGroups(); err 
!= nil { return err } s.needInit = false } - - if s.syncer.IsStopped() || s.syncer.IsShuttingDown() { - s.logger.V(3).Info("Skip syncing NEG", "negSyncerKey", s.NegSyncerKey.String()) - return nil - } s.logger.V(2).Info("Sync NEG", "negSyncerKey", s.NegSyncerKey.String(), "endpointsCalculatorMode", s.endpointsCalculator.Mode()) currentMap, err := retrieveExistingZoneNetworkEndpointMap(s.NegSyncerKey.NegName, s.zoneGetter, s.cloud, s.NegSyncerKey.GetAPIVersion(), s.endpointsCalculator.Mode()) @@ -246,8 +245,6 @@ func (s *transactionSyncer) syncInternalImpl() error { var targetMap map[string]negtypes.NetworkEndpointSet var endpointPodMap negtypes.EndpointPodMap - var dupCount int - slices, err := s.endpointSliceLister.ByIndex(endpointslices.EndpointSlicesByServiceIndex, endpointslices.FormatEndpointSlicesServiceKey(s.Namespace, s.Name)) if err != nil { return err @@ -256,37 +253,31 @@ func (s *transactionSyncer) syncInternalImpl() error { s.logger.Error(nil, "Endpoint slices for the service doesn't exist. 
Skipping NEG sync") return nil } - endpointSlices := make([]*discovery.EndpointSlice, len(slices)) - negCR, err := getNegFromStore(s.svcNegLister, s.Namespace, s.NegSyncerKey.NegName) - for i, slice := range slices { - endpointslice := slice.(*discovery.EndpointSlice) - endpointSlices[i] = endpointslice - if err != nil { - s.logger.Error(err, "unable to retrieve neg from the store", "neg", klog.KRef(s.Namespace, s.NegName)) - continue - } - lastSyncTimestamp := negCR.Status.LastSyncTime - epsCreationTimestamp := endpointslice.ObjectMeta.CreationTimestamp - - epsStaleness := time.Since(lastSyncTimestamp.Time) - // if this endpoint slice is newly created/created after last sync - if lastSyncTimestamp.Before(&epsCreationTimestamp) { - epsStaleness = time.Since(epsCreationTimestamp.Time) - } - metrics.PublishNegEPSStalenessMetrics(epsStaleness) - s.logger.V(3).Info("Endpoint slice syncs", "Namespace", endpointslice.Namespace, "Name", endpointslice.Name, "staleness", epsStaleness) + endpointSlices := convertUntypedToEPS(slices) + s.computeEPSStaleness(endpointSlices) - } endpointsData := negtypes.EndpointsDataFromEndpointSlices(endpointSlices) - targetMap, endpointPodMap, dupCount, err = s.endpointsCalculator.CalculateEndpoints(endpointsData, currentMap) - if err != nil { + targetMap, endpointPodMap, err = s.getEndpointsCalculation(endpointsData, currentMap) + + if !s.enableDegradedMode && err != nil { return err - } - err = s.endpointsCalculator.ValidateEndpoints(endpointsData, endpointPodMap, dupCount) - if err != nil { - // TODO(cheungdavid): return error from ValidateEndpoint after degraded mode is implemented - // for now we don't return error so it won't break the sync - s.setErrorState(s.getErrorStateReason(err)) + } else if s.enableDegradedMode { + if !s.inErrorState() && err != nil { + return err // if we encounter an error, we will return and run the next sync in degraded mode + } + degradedTargetMap, degradedPodMap, degradedModeErr := 
s.endpointsCalculator.CalculateEndpointsDegradedMode(endpointsData, currentMap) + if degradedModeErr != nil { + return degradedModeErr + } + notInDegraded, onlyInDegraded := calculateNetworkEndpointDifference(targetMap, degradedTargetMap) + if s.inErrorState() { + targetMap = degradedTargetMap + endpointPodMap = degradedPodMap + if len(notInDegraded) == 0 && len(onlyInDegraded) == 0 { + s.resetErrorState() + } + } + // TODO(cheungdavid): in the else branch, publish metrics if we don't encounter error and we are not in error state } s.logStats(targetMap, "desired NEG endpoints") @@ -317,6 +308,23 @@ func (s *transactionSyncer) syncInternalImpl() error { return s.syncNetworkEndpoints(addEndpoints, removeEndpoints) } +func (s *transactionSyncer) getEndpointsCalculation( + endpointsData []negtypes.EndpointsData, + currentMap map[string]negtypes.NetworkEndpointSet, +) (map[string]negtypes.NetworkEndpointSet, negtypes.EndpointPodMap, error) { + targetMap, endpointPodMap, dupCount, err := s.endpointsCalculator.CalculateEndpoints(endpointsData, currentMap) + if err != nil { + return nil, nil, err + } + if s.enableDegradedMode { + err = s.endpointsCalculator.ValidateEndpoints(endpointsData, endpointPodMap, dupCount) + if err != nil { + return nil, nil, err + } + } + return targetMap, endpointPodMap, nil +} + // syncLock must already be acquired before execution func (s *transactionSyncer) inErrorState() bool { return s.errorState != "" @@ -327,6 +335,11 @@ func (s *transactionSyncer) setErrorState(errorState string) { s.errorState = errorState } +// syncLock must already be acquired before execution +func (s *transactionSyncer) resetErrorState() { + s.errorState = "" +} + // ensureNetworkEndpointGroups ensures NEGs are created and configured correctly in the corresponding zones. 
func (s *transactionSyncer) ensureNetworkEndpointGroups() error { var err error @@ -376,7 +389,7 @@ func (s *transactionSyncer) getErrorStateReason(err error) string { return "" } if result, contains := negtypes.ErrorStateResult[err]; contains { - return result + return string(result) } return "" } @@ -403,8 +416,6 @@ func (s *transactionSyncer) ValidateEndpointBatch(err error, operation transacti // syncNetworkEndpoints spins off go routines to execute NEG operations func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints map[string]negtypes.NetworkEndpointSet) error { - var wg sync.WaitGroup - syncFunc := func(endpointMap map[string]negtypes.NetworkEndpointSet, operation transactionOp) error { for zone, endpointSet := range endpointMap { if endpointSet.Len() == 0 { @@ -428,10 +439,10 @@ func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints m } if operation == attachOp { - s.attachNetworkEndpoints(zone, batch, &wg) + s.attachNetworkEndpoints(zone, batch) } if operation == detachOp { - s.detachNetworkEndpoints(zone, batch, &wg) + s.detachNetworkEndpoints(zone, batch) } } return nil @@ -444,52 +455,25 @@ func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints m if err := syncFunc(removeEndpoints, detachOp); err != nil { return err } - go s.collectSyncResult(&wg) return nil } -// collectSyncResult collects the result of the sync and emits the metrics for sync result -func (s *transactionSyncer) collectSyncResult(wg *sync.WaitGroup) { - wg.Wait() - s.syncLock.Lock() - defer s.syncLock.Unlock() - - var syncResult *negtypes.NegSyncResult - switch s.errorState { - case "": - syncResult = negtypes.NewNegSyncResult(nil, negtypes.ResultSuccess) - case negtypes.ResultInvalidAPIResponse: - syncResult = negtypes.NewNegSyncResult(negtypes.ErrInvalidAPIResponse, negtypes.ResultInvalidAPIResponse) - case negtypes.ResultInvalidEPAttach: - syncResult = negtypes.NewNegSyncResult(negtypes.ErrInvalidEPAttach, 
negtypes.ResultInvalidEPAttach) - case negtypes.ResultInvalidEPDetach: - syncResult = negtypes.NewNegSyncResult(negtypes.ErrInvalidEPDetach, negtypes.ResultInvalidEPDetach) - default: - syncResult = negtypes.NewNegSyncResult(errors.New("Unknown error state value"), negtypes.ResultOtherError) - } - - s.syncCollector.UpdateSyncer(s.NegSyncerKey, syncResult) -} - // attachNetworkEndpoints creates go routine to run operations for attaching network endpoints -func (s *transactionSyncer) attachNetworkEndpoints(zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint, wg *sync.WaitGroup) { +func (s *transactionSyncer) attachNetworkEndpoints(zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint) { s.logger.V(2).Info("Attaching endpoints to NEG.", "countOfEndpointsBeingAttached", len(networkEndpointMap), "negSyncerKey", s.NegSyncerKey.String(), "zone", zone) - wg.Add(1) - go s.operationInternal(attachOp, zone, networkEndpointMap, wg) + go s.operationInternal(attachOp, zone, networkEndpointMap) } // detachNetworkEndpoints creates go routine to run operations for detaching network endpoints -func (s *transactionSyncer) detachNetworkEndpoints(zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint, wg *sync.WaitGroup) { +func (s *transactionSyncer) detachNetworkEndpoints(zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint) { s.logger.V(2).Info("Detaching endpoints from NEG.", "countOfEndpointsBeingDetached", len(networkEndpointMap), "negSyncerKey", s.NegSyncerKey.String(), "zone", zone) - wg.Add(1) - go s.operationInternal(detachOp, zone, networkEndpointMap, wg) + go s.operationInternal(detachOp, zone, networkEndpointMap) } // operationInternal executes NEG API call and commits the transactions // It will record events when operations are completed // If error occurs or any transaction entry requires reconciliation, it will trigger resync -func 
(s *transactionSyncer) operationInternal(operation transactionOp, zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint, wg *sync.WaitGroup) { - defer wg.Done() +func (s *transactionSyncer) operationInternal(operation transactionOp, zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint) { var err error start := time.Now() networkEndpoints := []*composite.NetworkEndpoint{} @@ -734,6 +718,35 @@ func (s *transactionSyncer) updateStatus(syncErr error) { } } +func convertUntypedToEPS(endpointSliceUntyped []interface{}) []*discovery.EndpointSlice { + endpointSlices := make([]*discovery.EndpointSlice, len(endpointSliceUntyped)) + for i, slice := range endpointSliceUntyped { + endpointslice := slice.(*discovery.EndpointSlice) + endpointSlices[i] = endpointslice + } + return endpointSlices +} + +func (s *transactionSyncer) computeEPSStaleness(endpointSlices []*discovery.EndpointSlice) { + negCR, err := getNegFromStore(s.svcNegLister, s.Namespace, s.NegSyncerKey.NegName) + if err != nil { + s.logger.Error(err, "unable to retrieve neg from the store", "neg", klog.KRef(s.Namespace, s.NegName)) + return + } + lastSyncTimestamp := negCR.Status.LastSyncTime + for _, endpointSlice := range endpointSlices { + epsCreationTimestamp := endpointSlice.ObjectMeta.CreationTimestamp + + epsStaleness := time.Since(lastSyncTimestamp.Time) + // if this endpoint slice is newly created/created after last sync + if lastSyncTimestamp.Before(&epsCreationTimestamp) { + epsStaleness = time.Since(epsCreationTimestamp.Time) + } + metrics.PublishNegEPSStalenessMetrics(epsStaleness) + s.logger.V(3).Info("Endpoint slice syncs", "Namespace", endpointSlice.Namespace, "Name", endpointSlice.Name, "staleness", epsStaleness) + } +} + // getNegFromStore returns the neg associated with the provided namespace and neg name if it exists otherwise throws an error func getNegFromStore(svcNegLister cache.Indexer, namespace, negName string) 
(*negv1beta1.ServiceNetworkEndpointGroup, error) { n, exists, err := svcNegLister.GetByKey(fmt.Sprintf("%s/%s", namespace, negName)) diff --git a/pkg/neg/syncers/transaction_test.go b/pkg/neg/syncers/transaction_test.go index 3329de962d..a2fc37fe99 100644 --- a/pkg/neg/syncers/transaction_test.go +++ b/pkg/neg/syncers/transaction_test.go @@ -18,6 +18,7 @@ package syncers import ( context2 "context" + "errors" "fmt" "net" "reflect" @@ -28,6 +29,7 @@ import ( "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud" "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" corev1 "k8s.io/api/core/v1" + discovery "k8s.io/api/discovery/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -59,8 +61,6 @@ const ( testInstance4 = "instance4" testInstance5 = "instance5" testInstance6 = "instance6" - testNamespace = "ns" - testService = "svc" ) func TestTransactionSyncNetworkEndpoints(t *testing.T) { @@ -900,8 +900,8 @@ func TestTransactionSyncerWithNegCR(t *testing.T) { negExists: true, negDesc: utils.NegDescription{ ClusterUID: kubeSystemUID, - Namespace: testNamespace, - ServiceName: testService, + Namespace: testServiceNamespace, + ServiceName: testServiceName, Port: "80", }.String(), crStatusPopulated: true, @@ -919,8 +919,8 @@ func TestTransactionSyncerWithNegCR(t *testing.T) { negExists: true, negDesc: utils.NegDescription{ ClusterUID: kubeSystemUID, - Namespace: testNamespace, - ServiceName: testService, + Namespace: testServiceNamespace, + ServiceName: testServiceName, Port: "80", }.String(), crStatusPopulated: false, @@ -931,8 +931,8 @@ func TestTransactionSyncerWithNegCR(t *testing.T) { negExists: true, negDesc: utils.NegDescription{ ClusterUID: "cluster-2", - Namespace: testNamespace, - ServiceName: testService, + Namespace: testServiceNamespace, + ServiceName: testServiceName, Port: "80", }.String(), crStatusPopulated: false, @@ -944,7 +944,7 @@ func 
TestTransactionSyncerWithNegCR(t *testing.T) { negDesc: utils.NegDescription{ ClusterUID: kubeSystemUID, Namespace: "namespace-2", - ServiceName: testService, + ServiceName: testServiceName, Port: "80", }.String(), crStatusPopulated: false, @@ -955,7 +955,7 @@ func TestTransactionSyncerWithNegCR(t *testing.T) { negExists: true, negDesc: utils.NegDescription{ ClusterUID: kubeSystemUID, - Namespace: testNamespace, + Namespace: testServiceNamespace, ServiceName: "service-2", Port: "80", }.String(), @@ -967,8 +967,8 @@ func TestTransactionSyncerWithNegCR(t *testing.T) { negExists: true, negDesc: utils.NegDescription{ ClusterUID: kubeSystemUID, - Namespace: testNamespace, - ServiceName: testService, + Namespace: testServiceNamespace, + ServiceName: testServiceName, Port: "81", }.String(), crStatusPopulated: false, @@ -980,8 +980,8 @@ func TestTransactionSyncerWithNegCR(t *testing.T) { // Cause error by having a conflicting neg description negDesc: utils.NegDescription{ ClusterUID: kubeSystemUID, - Namespace: testNamespace, - ServiceName: testService, + Namespace: testServiceNamespace, + ServiceName: testServiceName, Port: "81", }.String(), crStatusPopulated: true, @@ -1022,7 +1022,7 @@ func TestTransactionSyncerWithNegCR(t *testing.T) { creationTS := v1.Date(2020, time.July, 23, 0, 0, 0, 0, time.UTC) //Create NEG CR for Syncer to update status on origCR := createNegCR(testNegName, creationTS, tc.crStatusPopulated, tc.crStatusPopulated, refs) - neg, err := negClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(testNamespace).Create(context2.Background(), origCR, v1.CreateOptions{}) + neg, err := negClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(testServiceNamespace).Create(context2.Background(), origCR, v1.CreateOptions{}) if err != nil { t.Errorf("Failed to create test NEG CR: %s", err) } @@ -1035,7 +1035,7 @@ func TestTransactionSyncerWithNegCR(t *testing.T) { t.Errorf("Expected error, but got none") } - negCR, err := 
negClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(testNamespace).Get(context2.Background(), testNegName, v1.GetOptions{}) + negCR, err := negClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(testServiceNamespace).Get(context2.Background(), testNegName, v1.GetOptions{}) if err != nil { t.Errorf("Failed to get NEG from neg client: %s", err) } @@ -1080,7 +1080,7 @@ func TestTransactionSyncerWithNegCR(t *testing.T) { } }) - negClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(testNamespace).Delete(context2.TODO(), testNegName, v1.DeleteOptions{}) + negClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(testServiceNamespace).Delete(context2.TODO(), testNegName, v1.DeleteOptions{}) syncer.cloud.DeleteNetworkEndpointGroup(testNegName, negtypes.TestZone1, syncer.NegSyncerKey.GetAPIVersion()) syncer.cloud.DeleteNetworkEndpointGroup(testNegName, negtypes.TestZone2, syncer.NegSyncerKey.GetAPIVersion()) @@ -1186,7 +1186,7 @@ func TestUpdateStatus(t *testing.T) { // Since timestamp gets truncated to the second, there is a chance that the timestamps will be the same as LastTransitionTime or LastSyncTime so use creation TS from an earlier date creationTS := v1.Date(2020, time.July, 23, 0, 0, 0, 0, time.UTC) origCR := createNegCR(testNegName, creationTS, tc.populateConditions[negv1beta1.Initialized], tc.populateConditions[negv1beta1.Synced], tc.negRefs) - origCR, err := svcNegClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(testNamespace).Create(context2.Background(), origCR, v1.CreateOptions{}) + origCR, err := svcNegClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(testServiceNamespace).Create(context2.Background(), origCR, v1.CreateOptions{}) if err != nil { t.Errorf("Failed to create test NEG CR: %s", err) } @@ -1194,7 +1194,7 @@ func TestUpdateStatus(t *testing.T) { syncer.updateStatus(syncErr) - negCR, err := svcNegClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(testNamespace).Get(context2.Background(), testNegName, 
v1.GetOptions{}) + negCR, err := svcNegClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(testServiceNamespace).Get(context2.Background(), testNegName, v1.GetOptions{}) if err != nil { t.Errorf("Failed to create test NEG CR: %s", err) } @@ -1307,7 +1307,7 @@ func TestUnknownNodes(t *testing.T) { testIP3 := "10.100.2.1" testPort := int64(80) - testEndpointSlices := getTestEndpointSlices(testService, testNamespace) + testEndpointSlices := getDefaultEndpointSlices() testEndpointSlices[0].Endpoints[0].NodeName = utilpointer.StringPtr("unknown-node") testEndpointMap := map[string]*composite.NetworkEndpoint{ negtypes.TestZone1: &composite.NetworkEndpoint{ @@ -1344,7 +1344,7 @@ func TestUnknownNodes(t *testing.T) { neg := &negv1beta1.ServiceNetworkEndpointGroup{ ObjectMeta: metav1.ObjectMeta{ Name: testNegName, - Namespace: testNamespace, + Namespace: testServiceNamespace, }, Status: negv1beta1.ServiceNetworkEndpointGroupStatus{ NetworkEndpointGroups: objRefs, @@ -1389,6 +1389,245 @@ func TestUnknownNodes(t *testing.T) { } } +// TestEnableDegradedMode verifies if DegradedMode has been correctly enabled for L7 endpoint calculator +func TestEnableDegradedMode(t *testing.T) { + t.Parallel() + zoneGetter := negtypes.NewFakeZoneGetter() + testNetwork := cloud.ResourcePath("network", &meta.Key{Name: "test-network"}) + testSubnetwork := cloud.ResourcePath("subnetwork", &meta.Key{Name: "test-subnetwork"}) + fakeCloud := negtypes.NewFakeNetworkEndpointGroupCloud(testSubnetwork, testNetwork) + + testIP1 := "10.100.1.1" + testIP2 := "10.100.1.2" + testIP3 := "10.100.2.1" + testPort := int64(80) + + // only include matching port endpoints so we won't encounter error when validatingEndpoints + invalidEndpointSlices := getDefaultEndpointSlices()[:1] + invalidEndpointSlices[0].Endpoints[0].NodeName = nil + validEndpointSlice := getDefaultEndpointSlices()[:1] + testEndpointMap := map[string]*composite.NetworkEndpoint{ + negtypes.TestZone1: { + Instance: negtypes.TestInstance1, + 
IpAddress: testIP1, + Port: testPort, + }, + negtypes.TestZone2: { + Instance: negtypes.TestInstance3, + IpAddress: testIP2, + Port: testPort, + }, + negtypes.TestZone4: { + Instance: negtypes.TestUpgradeInstance1, + IpAddress: testIP3, + Port: testPort, + }, + } + + initialEndpoints := map[string]negtypes.NetworkEndpointSet{ + negtypes.TestZone1: negtypes.NewNetworkEndpointSet( + networkEndpointFromEncodedEndpoint("10.100.1.1||instance1||80"), + ), + negtypes.TestZone2: negtypes.NewNetworkEndpointSet( + networkEndpointFromEncodedEndpoint("10.100.1.2||instance3||80"), + ), + negtypes.TestZone4: negtypes.NewNetworkEndpointSet( + networkEndpointFromEncodedEndpoint("10.100.2.1||upgrade-instance1||80"), + ), + } + + updatedEndpoints := map[string]negtypes.NetworkEndpointSet{ + negtypes.TestZone1: negtypes.NewNetworkEndpointSet( + networkEndpointFromEncodedEndpoint("10.100.1.1||instance1||80"), + networkEndpointFromEncodedEndpoint("10.100.1.2||instance1||80"), + networkEndpointFromEncodedEndpoint("10.100.2.1||instance2||80"), + networkEndpointFromEncodedEndpoint("10.100.1.3||instance1||80"), + networkEndpointFromEncodedEndpoint("10.100.1.4||instance1||80")), + negtypes.TestZone2: negtypes.NewNetworkEndpointSet( + networkEndpointFromEncodedEndpoint("10.100.3.1||instance3||80")), + negtypes.TestZone4: negtypes.NewNetworkEndpointSet(), + } + + testCases := []struct { + desc string + modify func(ts *transactionSyncer) + negName string // to distinguish endpoints in different NEGs + testEndpointSlices []*discovery.EndpointSlice + expectedEndpoints map[string]negtypes.NetworkEndpointSet + expectedInErrorState bool + expectErr error + }{ + { + desc: "enable degraded mode, not error state, include invalid endpoint that would trigger error state", + modify: func(ts *transactionSyncer) { + ts.enableDegradedMode = true + ts.resetErrorState() + }, + negName: "neg-1", + testEndpointSlices: invalidEndpointSlices, + expectedEndpoints: initialEndpoints, + expectedInErrorState: true, + 
expectErr: negtypes.ErrEPNodeMissing, + }, + { + desc: "enable degraded mode, in error state, include invalid endpoints that would trigger error state", + modify: func(ts *transactionSyncer) { + ts.enableDegradedMode = true + ts.setErrorState(string(negtypes.ResultEPNodeMissing)) + }, + negName: "neg-2", + testEndpointSlices: invalidEndpointSlices, + expectedEndpoints: updatedEndpoints, + expectedInErrorState: true, + expectErr: nil, + }, + { + desc: "enable degraded mode, not error state, no invalid endpoints", + modify: func(ts *transactionSyncer) { + ts.enableDegradedMode = true + ts.resetErrorState() + }, + negName: "neg-3", + testEndpointSlices: validEndpointSlice, + expectedEndpoints: updatedEndpoints, + expectedInErrorState: false, + expectErr: nil, + }, + { + desc: "enable degraded mode, in error state, no invalid endpoints", + modify: func(ts *transactionSyncer) { + ts.enableDegradedMode = true + ts.setErrorState(string(negtypes.ResultEPNodeMissing)) + }, + negName: "neg-4", + testEndpointSlices: validEndpointSlice, + expectedEndpoints: updatedEndpoints, + expectedInErrorState: false, // we should reset error state + expectErr: nil, + }, + { + desc: "disable degraded mode, not error state, include invalid endpoints that would trigger error state", + modify: func(ts *transactionSyncer) { + ts.enableDegradedMode = false + ts.resetErrorState() + }, + negName: "neg-5", + testEndpointSlices: invalidEndpointSlices, + expectedEndpoints: initialEndpoints, + expectedInErrorState: true, + expectErr: negtypes.ErrEPNodeMissing, + }, + { + desc: "disable degraded mode, and in error state, include invalid endpoints that would trigger error state", + modify: func(ts *transactionSyncer) { + ts.enableDegradedMode = false + ts.setErrorState(string(negtypes.ResultEPNodeMissing)) + }, + negName: "neg-6", + testEndpointSlices: invalidEndpointSlices, + expectedEndpoints: initialEndpoints, + expectedInErrorState: true, + expectErr: negtypes.ErrEPNodeMissing, + }, + { + desc: 
"disable degraded mode, and not error state, no invalid endpoints", + modify: func(ts *transactionSyncer) { + ts.enableDegradedMode = false + ts.resetErrorState() + }, + negName: "neg-7", + testEndpointSlices: validEndpointSlice, + expectedEndpoints: updatedEndpoints, + expectedInErrorState: false, + expectErr: nil, + }, + { + desc: "disable degraded mode, and in error state, no invalid endpoints", + modify: func(ts *transactionSyncer) { + ts.enableDegradedMode = false + ts.setErrorState(string(negtypes.ResultEPNodeMissing)) + }, + negName: "neg-8", + testEndpointSlices: validEndpointSlice, + expectedEndpoints: updatedEndpoints, + expectedInErrorState: true, // if degraded mode is disabled, we don't reset error state, but we won't have different behaviors based on error state either + expectErr: nil, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + // Create initial NetworkEndpointGroups in cloud + var objRefs []negv1beta1.NegObjectReference + for zone, endpoint := range testEndpointMap { + fakeCloud.CreateNetworkEndpointGroup(&composite.NetworkEndpointGroup{Name: tc.negName, Version: meta.VersionGA}, zone) + fakeCloud.AttachNetworkEndpoints(tc.negName, zone, []*composite.NetworkEndpoint{endpoint}, meta.VersionGA) + neg, err := fakeCloud.GetNetworkEndpointGroup(tc.negName, zone, meta.VersionGA) + if err != nil { + t.Fatalf("failed to get neg from fake cloud: %s", err) + } + objRefs = append(objRefs, negv1beta1.NegObjectReference{SelfLink: neg.SelfLink}) + } + neg := &negv1beta1.ServiceNetworkEndpointGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: tc.negName, + Namespace: testServiceNamespace, + }, + Status: negv1beta1.ServiceNetworkEndpointGroupStatus{ + NetworkEndpointGroups: objRefs, + }, + } + _, s := newTestTransactionSyncer(fakeCloud, negtypes.VmIpPortEndpointType, false) + s.NegSyncerKey.NegName = tc.negName + s.needInit = false + addPodsToLister(s.podLister) + for i := 1; i <= 4; i++ { + s.nodeLister.Add(&corev1.Node{ + 
ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("instance%v", i), + }, + }) + } + for _, eps := range tc.testEndpointSlices { + s.endpointSliceLister.Add(eps) + } + s.svcNegLister.Add(neg) + // mark syncer as started without starting the syncer routine + (s.syncer.(*syncer)).stopped = false + tc.modify(s) + + out, err := retrieveExistingZoneNetworkEndpointMap(tc.negName, zoneGetter, fakeCloud, meta.VersionGA, negtypes.L7Mode) + if err != nil { + t.Errorf("errored retrieving existing network endpoints") + } + if !reflect.DeepEqual(initialEndpoints, out) { + t.Errorf("endpoints should not be changed before sync:\ngot %+v,\n expected %+v", out, tc.expectedEndpoints) + } + + err = s.syncInternal() + if !errors.Is(err, tc.expectErr) { + t.Errorf("syncInternal returned %v, expected %v", err, tc.expectErr) + } + if s.inErrorState() != tc.expectedInErrorState { + t.Errorf("after syncInternal, error state is %v, expected to be %v", s.inErrorState(), tc.expectedInErrorState) + } + err = wait.PollImmediate(time.Second, 3*time.Second, func() (bool, error) { + out, err = retrieveExistingZoneNetworkEndpointMap(tc.negName, zoneGetter, fakeCloud, meta.VersionGA, negtypes.L7Mode) + if err != nil { + return false, err + } + if !reflect.DeepEqual(tc.expectedEndpoints, out) { + return false, err + } + return true, nil + }) + if err != nil { + t.Errorf("endpoints are different from expected:\ngot %+v,\n expected %+v", out, tc.expectedEndpoints) + } + }) + } +} + func newL4ILBTestTransactionSyncer(fakeGCE negtypes.NetworkEndpointGroupCloud, mode negtypes.EndpointsCalculatorMode) (negtypes.NegSyncer, *transactionSyncer) { negsyncer, ts := newTestTransactionSyncer(fakeGCE, negtypes.VmIpEndpointType, false) ts.endpointsCalculator = GetEndpointsCalculator(ts.nodeLister, ts.podLister, ts.zoneGetter, ts.NegSyncerKey, mode, klog.TODO(), ts.enableDualStackNEG) @@ -1398,8 +1637,8 @@ func newL4ILBTestTransactionSyncer(fakeGCE negtypes.NetworkEndpointGroupCloud, m func 
newTestTransactionSyncer(fakeGCE negtypes.NetworkEndpointGroupCloud, negType negtypes.NetworkEndpointType, customName bool) (negtypes.NegSyncer, *transactionSyncer) { testContext := negtypes.NewTestContext() svcPort := negtypes.NegSyncerKey{ - Namespace: testNamespace, - Name: testService, + Namespace: testServiceNamespace, + Name: testServiceName, NegType: negType, PortTuple: negtypes.SvcPortTuple{ Port: 80, @@ -1481,7 +1720,7 @@ func generateEndpointSetAndMap(initialIp net.IP, num int, instance string, targe endpoint := negtypes.NetworkEndpoint{IP: ip.String(), Node: instance, Port: targetPort} retSet.Insert(endpoint) - retMap[endpoint] = types.NamespacedName{Namespace: testNamespace, Name: fmt.Sprintf("pod-%s-%d", instance, i)} + retMap[endpoint] = types.NamespacedName{Namespace: testServiceNamespace, Name: fmt.Sprintf("pod-%s-%d", instance, i)} } return retSet, retMap } @@ -1656,7 +1895,7 @@ func createNegCR(testNegName string, creationTS metav1.Time, populateInitialized neg := &negv1beta1.ServiceNetworkEndpointGroup{ ObjectMeta: metav1.ObjectMeta{ Name: testNegName, - Namespace: testNamespace, + Namespace: testServiceNamespace, CreationTimestamp: creationTS, }, } diff --git a/pkg/neg/syncers/utils.go b/pkg/neg/syncers/utils.go index 739fbe99b4..a17b1f69da 100644 --- a/pkg/neg/syncers/utils.go +++ b/pkg/neg/syncers/utils.go @@ -31,6 +31,7 @@ import ( "k8s.io/client-go/tools/record" negv1beta1 "k8s.io/ingress-gce/pkg/apis/svcneg/v1beta1" "k8s.io/ingress-gce/pkg/composite" + "k8s.io/ingress-gce/pkg/flags" negtypes "k8s.io/ingress-gce/pkg/neg/types" "k8s.io/ingress-gce/pkg/utils" "k8s.io/klog/v2" @@ -217,14 +218,30 @@ func ensureNetworkEndpointGroup(svcNamespace, svcName, negName, zone, negService return negRef, nil } +type ZoneNetworkEndpointMapResult struct { + NetworkEndpointSet map[string]negtypes.NetworkEndpointSet + EndpointPodMap negtypes.EndpointPodMap + DupCount int +} + // toZoneNetworkEndpointMap translates addresses in endpoints object into zone and 
endpoints map, and also return the count for duplicated endpoints -func toZoneNetworkEndpointMap(eds []negtypes.EndpointsData, zoneGetter negtypes.ZoneGetter, servicePortName string, networkEndpointType negtypes.NetworkEndpointType) (map[string]negtypes.NetworkEndpointSet, negtypes.EndpointPodMap, int, error) { +func toZoneNetworkEndpointMap( + eds []negtypes.EndpointsData, + zoneGetter negtypes.ZoneGetter, + podLister cache.Indexer, + servicePortName string, + networkEndpointType negtypes.NetworkEndpointType, +) (ZoneNetworkEndpointMapResult, error) { zoneNetworkEndpointMap := map[string]negtypes.NetworkEndpointSet{} networkEndpointPodMap := negtypes.EndpointPodMap{} dupCount := 0 if eds == nil { klog.Errorf("Endpoint object is nil") - return zoneNetworkEndpointMap, networkEndpointPodMap, dupCount, nil + return ZoneNetworkEndpointMapResult{ + NetworkEndpointSet: zoneNetworkEndpointMap, + EndpointPodMap: networkEndpointPodMap, + DupCount: dupCount, + }, nil } var foundMatchingPort bool for _, ed := range eds { @@ -249,22 +266,29 @@ func toZoneNetworkEndpointMap(eds []negtypes.EndpointsData, zoneGetter negtypes. klog.Infof("Skipping non IPv4 address: %q, in endpoint slice %s/%s", endpointAddress.Addresses, ed.Meta.Namespace, ed.Meta.Name) continue } - if endpointAddress.NodeName == nil || len(*endpointAddress.NodeName) == 0 { - klog.V(2).Infof("Detected unexpected error when checking missing nodeName. Endpoint %q in Endpoints %s/%s does not have an associated node. 
Skipping", endpointAddress.Addresses, ed.Meta.Namespace, ed.Meta.Name) - return nil, nil, dupCount, negtypes.ErrEPMissingNodeName + zone, getZoneErr := getEndpointZone(endpointAddress, zoneGetter) + if getZoneErr != nil { + klog.Errorf("Detected unexpected error when getting zone, err: %v", getZoneErr) + return ZoneNetworkEndpointMapResult{ + NetworkEndpointSet: nil, + EndpointPodMap: nil, + DupCount: dupCount, + }, getZoneErr } - if endpointAddress.TargetRef == nil { + // pod is used for label propagation + _, getPodErr := getEndpointPod(endpointAddress, podLister) + if getPodErr != nil { + if flags.F.EnableDegradedMode { + klog.Errorf("Detected unexpected error when getting pod, err: %v", getPodErr) + return ZoneNetworkEndpointMapResult{ + NetworkEndpointSet: nil, + EndpointPodMap: nil, + DupCount: dupCount, + }, getPodErr // when degraded mode is enabled, we want to trigger degraded mode so return the error + } klog.V(2).Infof("Endpoint %q in Endpoints %s/%s does not have an associated pod. Skipping", endpointAddress.Addresses, ed.Meta.Namespace, ed.Meta.Name) continue } - zone, err := zoneGetter.GetZoneForNode(*endpointAddress.NodeName) - if err != nil { - return nil, nil, dupCount, negtypes.ErrNodeNotFound - } - if zone == "" { - klog.V(2).Info("Detected unexpected error when checking missing zone") - return nil, nil, dupCount, negtypes.ErrEPMissingZone - } if zoneNetworkEndpointMap[zone] == nil { zoneNetworkEndpointMap[zone] = negtypes.NewNetworkEndpointSet() } @@ -277,11 +301,12 @@ func toZoneNetworkEndpointMap(eds []negtypes.EndpointsData, zoneGetter negtypes. 
} zoneNetworkEndpointMap[zone].Insert(networkEndpoint) - // increment the count for duplicate endpoint + // if existing name is alphabetically lower than current one, continue and don't replace if existingPod, contains := networkEndpointPodMap[networkEndpoint]; contains { dupCount += 1 if existingPod.Name < endpointAddress.TargetRef.Name { - continue // if existing name is alphabetically lower than current one, continue and don't replace + klog.Infof("Found duplicate endpoints for %q, save the pod information from the alphabetically higher pod", address) + continue } } networkEndpointPodMap[networkEndpoint] = types.NamespacedName{Namespace: endpointAddress.TargetRef.Namespace, Name: endpointAddress.TargetRef.Name} @@ -295,15 +320,60 @@ func toZoneNetworkEndpointMap(eds []negtypes.EndpointsData, zoneGetter negtypes. if len(zoneNetworkEndpointMap) == 0 || len(networkEndpointPodMap) == 0 { klog.V(3).Infof("Generated empty endpoint maps (zoneNetworkEndpointMap: %+v, networkEndpointPodMap: %v) from Endpoints object: %+v", zoneNetworkEndpointMap, networkEndpointPodMap, eds) } - return zoneNetworkEndpointMap, networkEndpointPodMap, dupCount, nil + return ZoneNetworkEndpointMapResult{ + NetworkEndpointSet: zoneNetworkEndpointMap, + EndpointPodMap: networkEndpointPodMap, + DupCount: dupCount, + }, nil } -func toZoneNetworkEndpointMapDegradedMode(eds []negtypes.EndpointsData, zoneGetter negtypes.ZoneGetter, - podLister, nodeLister cache.Indexer, servicePortName string, - networkEndpointType negtypes.NetworkEndpointType) (map[string]negtypes.NetworkEndpointSet, negtypes.EndpointPodMap, int, error) { - targetMap := map[string]negtypes.NetworkEndpointSet{} - endpointPodMap := negtypes.EndpointPodMap{} - var dupCount int +// getEndpointZone use an endpoint's node information to get its corresponding zone +func getEndpointZone( + endpointAddress negtypes.AddressData, + zoneGetter negtypes.ZoneGetter, +) (string, error) { + if endpointAddress.NodeName == nil || 
len(*endpointAddress.NodeName) == 0 { + return "", negtypes.ErrEPNodeMissing + } + zone, err := zoneGetter.GetZoneForNode(*endpointAddress.NodeName) + if err != nil { + return zone, negtypes.ErrEPNodeNotFound + } + if zone == "" { + return zone, negtypes.ErrEPZoneMissing + } + return zone, nil +} + +// getEndpointPod use an endpoint's pod information to get its corresponding pod object +func getEndpointPod( + endpointAddress negtypes.AddressData, + podLister cache.Indexer, +) (*apiv1.Pod, error) { + if endpointAddress.TargetRef == nil { + return nil, negtypes.ErrEPPodMissing + } + key := fmt.Sprintf("%s/%s", endpointAddress.TargetRef.Namespace, endpointAddress.TargetRef.Name) + obj, exists, err := podLister.GetByKey(key) + if err != nil || !exists { + return nil, negtypes.ErrEPPodNotFound + } + pod, ok := obj.(*apiv1.Pod) + if !ok { + return nil, negtypes.ErrEPPodTypeAssertionFailed + } + return pod, nil +} + +func toZoneNetworkEndpointMapDegradedMode( + eds []negtypes.EndpointsData, + zoneGetter negtypes.ZoneGetter, + podLister, nodeLister cache.Indexer, + servicePortName string, + networkEndpointType negtypes.NetworkEndpointType, +) ZoneNetworkEndpointMapResult { + zoneNetworkEndpointMap := map[string]negtypes.NetworkEndpointSet{} + networkEndpointPodMap := negtypes.EndpointPodMap{} for _, ed := range eds { matchPort := "" for _, port := range ed.Ports { @@ -320,58 +390,47 @@ func toZoneNetworkEndpointMapDegradedMode(eds []negtypes.EndpointsData, zoneGett klog.Infof("Skipping non IPv4 address in degraded mode: %q, in endpoint slice %s/%s", endpointAddress.Addresses, ed.Meta.Namespace, ed.Meta.Name) continue } - if endpointAddress.TargetRef == nil { - klog.V(2).Infof("Endpoint %q in Endpoints %s/%s does not have an associated pod. 
Skipping", endpointAddress.Addresses, ed.Meta.Namespace, ed.Meta.Name) + pod, getPodErr := getEndpointPod(endpointAddress, podLister) + if getPodErr != nil { + klog.Errorf("Endpoint %q in Endpoints %s/%s receives error when getting pod, err: %v, skipping", endpointAddress.Addresses, ed.Meta.Namespace, ed.Meta.Name, getPodErr) continue } - dupCount += validateAndAddEndpoints(endpointAddress, zoneGetter, podLister, nodeLister, matchPort, networkEndpointType, targetMap, endpointPodMap) - } - } - return targetMap, endpointPodMap, dupCount, nil -} - -// validateAndAddEndpoints fills in missing information and creates network endpoint for each endpoint addresss -func validateAndAddEndpoints(ep negtypes.AddressData, zoneGetter negtypes.ZoneGetter, podLister, nodeLister cache.Indexer, matchPort string, endpointType negtypes.NetworkEndpointType, targetMap map[string]negtypes.NetworkEndpointSet, endpointPodMap negtypes.EndpointPodMap) int { - var dupCount int - for _, address := range ep.Addresses { - key := fmt.Sprintf("%s/%s", ep.TargetRef.Namespace, ep.TargetRef.Name) - obj, exists, err := podLister.GetByKey(key) - if err != nil || !exists { - klog.V(2).Infof("Endpoint %q does not correspond to an existing pod. Skipping", address) - continue - } - pod, ok := obj.(*apiv1.Pod) - if !ok { - klog.V(2).Infof("Endpoint %q does not correspond to a pod object. Skipping", address) - continue - } - if !validatePod(pod, nodeLister) { - klog.V(2).Infof("Endpoint %q does not correspond to a valid pod resource. Skipping", address) - continue - } - nodeName := pod.Spec.NodeName - zone, err := zoneGetter.GetZoneForNode(nodeName) - if err != nil { - klog.V(2).Infof("Endpoint %q does not have valid zone information. 
Skipping", address) - continue - } + if !validatePod(pod, nodeLister) { + klog.Errorf("Endpoint %q in Endpoints %s/%s correponds to an invalid pod: %v, skipping", endpointAddress.Addresses, ed.Meta.Namespace, ed.Meta.Name, getPodErr) + continue + } + nodeName := pod.Spec.NodeName + zone, err := zoneGetter.GetZoneForNode(nodeName) + if err != nil { + klog.V(2).Infof("For endpoint %q in pod %q, its corresponding node %q does not have valid zone information, skipping", endpointAddress.Addresses, pod.ObjectMeta.Name, nodeName) + continue + } + if zoneNetworkEndpointMap[zone] == nil { + zoneNetworkEndpointMap[zone] = negtypes.NewNetworkEndpointSet() + } + for _, address := range endpointAddress.Addresses { + networkEndpoint := negtypes.NetworkEndpoint{IP: address, Port: matchPort, Node: nodeName} + if networkEndpointType == negtypes.NonGCPPrivateEndpointType { + // Non-GCP network endpoints don't have associated nodes. + networkEndpoint.Node = "" + } + zoneNetworkEndpointMap[zone].Insert(networkEndpoint) - if endpointType == negtypes.NonGCPPrivateEndpointType { - // Non-GCP network endpoints don't have associated nodes. 
- nodeName = "" - } - networkEndpoint := negtypes.NetworkEndpoint{IP: address, Port: matchPort, Node: nodeName} - if targetMap[zone] == nil { - targetMap[zone] = negtypes.NewNetworkEndpointSet() - } - targetMap[zone].Insert(networkEndpoint) - // increment the count for duplicated endpoint - if _, contains := endpointPodMap[networkEndpoint]; contains { - dupCount += 1 + if existingPod, contains := networkEndpointPodMap[networkEndpoint]; contains { + // if existing name is alphabetically lower than current one, continue and don't replace + if existingPod.Name < endpointAddress.TargetRef.Name { + klog.Infof("Found duplicate endpoints for %q, save the pod information from the alphabetically higher pod", address) + continue + } + } + networkEndpointPodMap[networkEndpoint] = types.NamespacedName{Namespace: endpointAddress.TargetRef.Namespace, Name: endpointAddress.TargetRef.Name} + } } - endpointPodMap[networkEndpoint] = types.NamespacedName{Namespace: ep.TargetRef.Namespace, Name: ep.TargetRef.Name} } - return dupCount + return ZoneNetworkEndpointMapResult{ + NetworkEndpointSet: zoneNetworkEndpointMap, + EndpointPodMap: networkEndpointPodMap, + } } // validatePod checks if this pod is a valid pod resource @@ -382,17 +441,17 @@ func validatePod(pod *apiv1.Pod, nodeLister cache.Indexer) bool { // Terminal Pod means a pod is in PodFailed or PodSucceeded phase phase := pod.Status.Phase if phase == apiv1.PodFailed || phase == apiv1.PodSucceeded { - klog.V(2).Info("Pod %s/%s is a terminal pod with status %v, skipping", pod.ObjectMeta.Namespace, pod.ObjectMeta.Name, phase) + klog.V(2).Infof("Pod %s/%s is a terminal pod with status %v, skipping", pod.ObjectMeta.Namespace, pod.ObjectMeta.Name, phase) return false } obj, exists, err := nodeLister.GetByKey(pod.Spec.NodeName) if err != nil || !exists { - klog.V(2).Info("Pod %s/%s corresponds to a non-existing node %s, skipping", pod.ObjectMeta.Namespace, pod.ObjectMeta.Name, pod.Spec.NodeName) + klog.V(2).Infof("Pod %s/%s 
corresponds to a non-existing node %s, skipping", pod.ObjectMeta.Namespace, pod.ObjectMeta.Name, pod.Spec.NodeName) return false } _, isNode := obj.(*apiv1.Node) if !isNode { - klog.V(2).Info("Pod %s/%s does not correspond to a valid node resource, skipping", pod.ObjectMeta.Namespace, pod.ObjectMeta.Name) + klog.V(2).Infof("Pod %s/%s does not correspond to a valid node resource, skipping", pod.ObjectMeta.Namespace, pod.ObjectMeta.Name) return false } return true diff --git a/pkg/neg/syncers/utils_test.go b/pkg/neg/syncers/utils_test.go index db833f2172..6901ad6b55 100644 --- a/pkg/neg/syncers/utils_test.go +++ b/pkg/neg/syncers/utils_test.go @@ -444,6 +444,8 @@ func TestEnsureNetworkEndpointGroup(t *testing.T) { func TestToZoneNetworkEndpointMapUtil(t *testing.T) { t.Parallel() zoneGetter := negtypes.NewFakeZoneGetter() + podLister := negtypes.NewTestContext().PodInformer.GetIndexer() + addPodsToLister(podLister) testCases := []struct { desc string portName string @@ -530,17 +532,17 @@ func TestToZoneNetworkEndpointMapUtil(t *testing.T) { } for _, tc := range testCases { - retSet, retMap, _, err := toZoneNetworkEndpointMap(negtypes.EndpointsDataFromEndpointSlices(getDefaultEndpointSlices()), zoneGetter, tc.portName, tc.networkEndpointType) + result, err := toZoneNetworkEndpointMap(negtypes.EndpointsDataFromEndpointSlices(getDefaultEndpointSlices()), zoneGetter, podLister, tc.portName, tc.networkEndpointType) if err != nil { t.Errorf("For case %q, expect nil error, but got %v.", tc.desc, err) } - if !reflect.DeepEqual(retSet, tc.endpointSets) { - t.Errorf("For case %q, expecting endpoint set %v, but got %v.", tc.desc, tc.endpointSets, retSet) + if !reflect.DeepEqual(result.NetworkEndpointSet, tc.endpointSets) { + t.Errorf("For case %q, expecting endpoint set %v, but got %v.", tc.desc, tc.endpointSets, result.NetworkEndpointSet) } - if !reflect.DeepEqual(retMap, tc.expectMap) { - t.Errorf("For case %q, expecting endpoint map %v, but got %v.", tc.desc, tc.expectMap, 
retMap) + if !reflect.DeepEqual(result.EndpointPodMap, tc.expectMap) { + t.Errorf("For case %q, expecting endpoint map %v, but got %v.", tc.desc, tc.expectMap, result.EndpointPodMap) } } } @@ -1212,9 +1214,9 @@ func TestToZoneNetworkEndpointMapDegradedMode(t *testing.T) { testNonExistPort := "non-exists" testEmptyNamedPort := "" testNamedPort := "named-Port" - testCases := []struct { desc string + testEndpointSlices []*discovery.EndpointSlice portName string expectedEndpointMap map[string]negtypes.NetworkEndpointSet expectedPodMap negtypes.EndpointPodMap @@ -1222,14 +1224,16 @@ func TestToZoneNetworkEndpointMapDegradedMode(t *testing.T) { }{ { desc: "non exist target port", + testEndpointSlices: getDefaultEndpointSlices(), portName: testNonExistPort, expectedEndpointMap: map[string]negtypes.NetworkEndpointSet{}, expectedPodMap: negtypes.EndpointPodMap{}, networkEndpointType: negtypes.VmIpPortEndpointType, }, { - desc: "empty named port", - portName: testEmptyNamedPort, + desc: "empty named port", + testEndpointSlices: getDefaultEndpointSlices(), + portName: testEmptyNamedPort, expectedEndpointMap: map[string]negtypes.NetworkEndpointSet{ negtypes.TestZone1: negtypes.NewNetworkEndpointSet( networkEndpointFromEncodedEndpoint("10.100.1.1||instance1||80"), @@ -1241,18 +1245,19 @@ func TestToZoneNetworkEndpointMapDegradedMode(t *testing.T) { networkEndpointFromEncodedEndpoint("10.100.3.1||instance3||80")), }, expectedPodMap: negtypes.EndpointPodMap{ - networkEndpointFromEncodedEndpoint("10.100.1.1||instance1||80"): types.NamespacedName{Namespace: testNamespace, Name: "pod1"}, - networkEndpointFromEncodedEndpoint("10.100.1.2||instance1||80"): types.NamespacedName{Namespace: testNamespace, Name: "pod2"}, - networkEndpointFromEncodedEndpoint("10.100.2.1||instance2||80"): types.NamespacedName{Namespace: testNamespace, Name: "pod3"}, - networkEndpointFromEncodedEndpoint("10.100.3.1||instance3||80"): types.NamespacedName{Namespace: testNamespace, Name: "pod4"}, - 
networkEndpointFromEncodedEndpoint("10.100.1.3||instance1||80"): types.NamespacedName{Namespace: testNamespace, Name: "pod5"}, - networkEndpointFromEncodedEndpoint("10.100.1.4||instance1||80"): types.NamespacedName{Namespace: testNamespace, Name: "pod6"}, + networkEndpointFromEncodedEndpoint("10.100.1.1||instance1||80"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod1"}, + networkEndpointFromEncodedEndpoint("10.100.1.2||instance1||80"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod2"}, + networkEndpointFromEncodedEndpoint("10.100.2.1||instance2||80"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod3"}, + networkEndpointFromEncodedEndpoint("10.100.3.1||instance3||80"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod4"}, + networkEndpointFromEncodedEndpoint("10.100.1.3||instance1||80"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod5"}, + networkEndpointFromEncodedEndpoint("10.100.1.4||instance1||80"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod6"}, }, networkEndpointType: negtypes.VmIpPortEndpointType, }, { - desc: "named target port", - portName: testNamedPort, + desc: "named target port", + testEndpointSlices: getDefaultEndpointSlices(), + portName: testNamedPort, expectedEndpointMap: map[string]negtypes.NetworkEndpointSet{ negtypes.TestZone1: negtypes.NewNetworkEndpointSet( networkEndpointFromEncodedEndpoint("10.100.2.2||instance2||81")), @@ -1264,18 +1269,19 @@ func TestToZoneNetworkEndpointMapDegradedMode(t *testing.T) { networkEndpointFromEncodedEndpoint("10.100.4.4||instance4||8081")), }, expectedPodMap: negtypes.EndpointPodMap{ - networkEndpointFromEncodedEndpoint("10.100.2.2||instance2||81"): types.NamespacedName{Namespace: testNamespace, Name: "pod7"}, - networkEndpointFromEncodedEndpoint("10.100.4.1||instance4||81"): types.NamespacedName{Namespace: testNamespace, Name: "pod8"}, - networkEndpointFromEncodedEndpoint("10.100.4.3||instance4||81"): 
types.NamespacedName{Namespace: testNamespace, Name: "pod9"}, - networkEndpointFromEncodedEndpoint("10.100.3.2||instance3||8081"): types.NamespacedName{Namespace: testNamespace, Name: "pod10"}, - networkEndpointFromEncodedEndpoint("10.100.4.2||instance4||8081"): types.NamespacedName{Namespace: testNamespace, Name: "pod11"}, - networkEndpointFromEncodedEndpoint("10.100.4.4||instance4||8081"): types.NamespacedName{Namespace: testNamespace, Name: "pod12"}, + networkEndpointFromEncodedEndpoint("10.100.2.2||instance2||81"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod7"}, + networkEndpointFromEncodedEndpoint("10.100.4.1||instance4||81"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod8"}, + networkEndpointFromEncodedEndpoint("10.100.4.3||instance4||81"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod9"}, + networkEndpointFromEncodedEndpoint("10.100.3.2||instance3||8081"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod10"}, + networkEndpointFromEncodedEndpoint("10.100.4.2||instance4||8081"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod11"}, + networkEndpointFromEncodedEndpoint("10.100.4.4||instance4||8081"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod12"}, }, networkEndpointType: negtypes.VmIpPortEndpointType, }, { - desc: "Non-GCP network endpoints", - portName: testEmptyNamedPort, + desc: "Non-GCP network endpoints", + testEndpointSlices: getDefaultEndpointSlices(), + portName: testEmptyNamedPort, expectedEndpointMap: map[string]negtypes.NetworkEndpointSet{ negtypes.TestZone1: negtypes.NewNetworkEndpointSet( networkEndpointFromEncodedEndpoint("10.100.1.1||||80"), @@ -1287,49 +1293,36 @@ func TestToZoneNetworkEndpointMapDegradedMode(t *testing.T) { networkEndpointFromEncodedEndpoint("10.100.3.1||||80")), }, expectedPodMap: negtypes.EndpointPodMap{ - networkEndpointFromEncodedEndpoint("10.100.1.1||||80"): types.NamespacedName{Namespace: testNamespace, Name: 
"pod1"}, - networkEndpointFromEncodedEndpoint("10.100.1.2||||80"): types.NamespacedName{Namespace: testNamespace, Name: "pod2"}, - networkEndpointFromEncodedEndpoint("10.100.2.1||||80"): types.NamespacedName{Namespace: testNamespace, Name: "pod3"}, - networkEndpointFromEncodedEndpoint("10.100.3.1||||80"): types.NamespacedName{Namespace: testNamespace, Name: "pod4"}, - networkEndpointFromEncodedEndpoint("10.100.1.3||||80"): types.NamespacedName{Namespace: testNamespace, Name: "pod5"}, - networkEndpointFromEncodedEndpoint("10.100.1.4||||80"): types.NamespacedName{Namespace: testNamespace, Name: "pod6"}, + networkEndpointFromEncodedEndpoint("10.100.1.1||||80"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod1"}, + networkEndpointFromEncodedEndpoint("10.100.1.2||||80"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod2"}, + networkEndpointFromEncodedEndpoint("10.100.2.1||||80"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod3"}, + networkEndpointFromEncodedEndpoint("10.100.3.1||||80"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod4"}, + networkEndpointFromEncodedEndpoint("10.100.1.3||||80"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod5"}, + networkEndpointFromEncodedEndpoint("10.100.1.4||||80"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod6"}, }, networkEndpointType: negtypes.NonGCPPrivateEndpointType, }, } for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { - testEndpointSlices := getTestEndpointSlices(testService, testNamespace) - - targetMap, endpointPodMap, _, err := toZoneNetworkEndpointMapDegradedMode(negtypes.EndpointsDataFromEndpointSlices(testEndpointSlices), fakeZoneGetter, podLister, nodeLister, tc.portName, tc.networkEndpointType) - if err != nil { - t.Errorf("expected error=nil, but got %v", err) - } - - if !reflect.DeepEqual(targetMap, tc.expectedEndpointMap) { - t.Errorf("degraded mode endpoint set is not calculated 
correctly:\ngot %+v,\n expected %+v", targetMap, tc.expectedEndpointMap) + result := toZoneNetworkEndpointMapDegradedMode(negtypes.EndpointsDataFromEndpointSlices(tc.testEndpointSlices), fakeZoneGetter, podLister, nodeLister, tc.portName, tc.networkEndpointType) + if !reflect.DeepEqual(result.NetworkEndpointSet, tc.expectedEndpointMap) { + t.Errorf("degraded mode endpoint set is not calculated correctly:\ngot %+v,\n expected %+v", result.NetworkEndpointSet, tc.expectedEndpointMap) } - if !reflect.DeepEqual(endpointPodMap, tc.expectedPodMap) { - t.Errorf("degraded mode endpoint map is not calculated correctly:\ngot %+v,\n expected %+v", endpointPodMap, tc.expectedPodMap) + if !reflect.DeepEqual(result.EndpointPodMap, tc.expectedPodMap) { + t.Errorf("degraded mode endpoint map is not calculated correctly:\ngot %+v,\n expected %+v", result.EndpointPodMap, tc.expectedPodMap) } }) } } -func TestValidateAndAddEndpoints(t *testing.T) { +func TestDegradedModeValidateEndpointInfo(t *testing.T) { t.Parallel() - matchPort := strconv.Itoa(int(80)) + emptyNamedPort := "" emptyNodeName := "" - ready := true + port80 := int32(80) + protocolTCP := v1.ProtocolTCP instance1 := negtypes.TestInstance1 - endpointMap := map[string]negtypes.NetworkEndpointSet{ - negtypes.TestZone1: negtypes.NewNetworkEndpointSet( - networkEndpointFromEncodedEndpoint("10.100.1.1||instance1||80")), - } - podMap := negtypes.EndpointPodMap{ - networkEndpointFromEncodedEndpoint("10.100.1.1||instance1||80"): types.NamespacedName{Namespace: testNamespace, Name: "pod1"}, - } - fakeZoneGetter := negtypes.NewFakeZoneGetter() testContext := negtypes.NewTestContext() podLister := testContext.PodInformer.GetIndexer() @@ -1342,38 +1335,62 @@ func TestValidateAndAddEndpoints(t *testing.T) { }, }) + endpointMap := map[string]negtypes.NetworkEndpointSet{ + negtypes.TestZone1: negtypes.NewNetworkEndpointSet( + networkEndpointFromEncodedEndpoint("10.100.1.1||instance1||80"), + 
networkEndpointFromEncodedEndpoint("10.100.1.2||instance1||80"), + ), + } + podMap := negtypes.EndpointPodMap{ + networkEndpointFromEncodedEndpoint("10.100.1.1||instance1||80"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod1"}, + networkEndpointFromEncodedEndpoint("10.100.1.2||instance1||80"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod2"}, + } + testCases := []struct { desc string - ep negtypes.AddressData + testEndpointSlices []*discovery.EndpointSlice endpointType negtypes.NetworkEndpointType expectedEndpointMap map[string]negtypes.NetworkEndpointSet expectedPodMap negtypes.EndpointPodMap }{ - { - desc: "endpoint with nodeName", - ep: negtypes.AddressData{ - Addresses: []string{"10.100.1.1"}, - NodeName: &instance1, - TargetRef: &corev1.ObjectReference{ - Namespace: testNamespace, - Name: "pod1", - }, - Ready: ready, - }, - endpointType: negtypes.VmIpPortEndpointType, - expectedEndpointMap: endpointMap, - expectedPodMap: podMap, - }, { desc: "endpoint without nodeName, nodeName should be filled", - ep: negtypes.AddressData{ - Addresses: []string{"10.100.1.1"}, - NodeName: nil, - TargetRef: &corev1.ObjectReference{ - Namespace: testNamespace, - Name: "pod1", + testEndpointSlices: []*discovery.EndpointSlice{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: testServiceName + "-1", + Namespace: testServiceNamespace, + Labels: map[string]string{ + discovery.LabelServiceName: testServiceName, + }, + }, + AddressType: "IPv4", + Endpoints: []discovery.Endpoint{ + { + Addresses: []string{"10.100.1.1"}, + NodeName: nil, + TargetRef: &v1.ObjectReference{ + Namespace: testServiceNamespace, + Name: "pod1", + }, + }, + { + Addresses: []string{"10.100.1.2"}, + NodeName: &instance1, + TargetRef: &v1.ObjectReference{ + Namespace: testServiceNamespace, + Name: "pod2", + }, + }, + }, + Ports: []discovery.EndpointPort{ + { + Name: &emptyNamedPort, + Port: &port80, + Protocol: &protocolTCP, + }, + }, }, - Ready: ready, }, endpointType: 
negtypes.VmIpPortEndpointType, expectedEndpointMap: endpointMap, @@ -1381,51 +1398,56 @@ func TestValidateAndAddEndpoints(t *testing.T) { }, { desc: "endpoint with empty nodeName, nodeName should be filled", - ep: negtypes.AddressData{ - Addresses: []string{"10.100.1.1"}, - NodeName: &emptyNodeName, - TargetRef: &corev1.ObjectReference{ - Namespace: testNamespace, - Name: "pod1", + testEndpointSlices: []*discovery.EndpointSlice{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: testServiceName + "-1", + Namespace: testServiceNamespace, + Labels: map[string]string{ + discovery.LabelServiceName: testServiceName, + }, + }, + AddressType: "IPv4", + Endpoints: []discovery.Endpoint{ + { + Addresses: []string{"10.100.1.1"}, + NodeName: &emptyNodeName, + TargetRef: &v1.ObjectReference{ + Namespace: testServiceNamespace, + Name: "pod1", + }, + }, + { + Addresses: []string{"10.100.1.2"}, + NodeName: &instance1, + TargetRef: &v1.ObjectReference{ + Namespace: testServiceNamespace, + Name: "pod2", + }, + }, + }, + Ports: []discovery.EndpointPort{ + { + Name: &emptyNamedPort, + Port: &port80, + Protocol: &protocolTCP, + }, + }, }, - Ready: ready, }, endpointType: negtypes.VmIpPortEndpointType, expectedEndpointMap: endpointMap, expectedPodMap: podMap, }, - { - desc: "Non-GCP network endpoint", - ep: negtypes.AddressData{ - TargetRef: &corev1.ObjectReference{ - Namespace: testNamespace, - Name: "pod1", - }, - NodeName: &emptyNodeName, - Addresses: []string{"10.100.1.1"}, - Ready: ready, - }, - - endpointType: negtypes.NonGCPPrivateEndpointType, - expectedEndpointMap: map[string]negtypes.NetworkEndpointSet{ - negtypes.TestZone1: negtypes.NewNetworkEndpointSet( - networkEndpointFromEncodedEndpoint("10.100.1.1||||80")), - }, - expectedPodMap: negtypes.EndpointPodMap{ - networkEndpointFromEncodedEndpoint("10.100.1.1||||80"): types.NamespacedName{Namespace: testNamespace, Name: "pod1"}}, - }, } for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { - targetMap := 
map[string]negtypes.NetworkEndpointSet{} - endpointPodMap := negtypes.EndpointPodMap{} - validateAndAddEndpoints(tc.ep, fakeZoneGetter, podLister, nodeLister, matchPort, tc.endpointType, targetMap, endpointPodMap) - - if !reflect.DeepEqual(targetMap, tc.expectedEndpointMap) { - t.Errorf("degraded mode endpoint set is not calculated correctly:\ngot %+v,\n expected %+v", targetMap, tc.expectedEndpointMap) + result := toZoneNetworkEndpointMapDegradedMode(negtypes.EndpointsDataFromEndpointSlices(tc.testEndpointSlices), fakeZoneGetter, podLister, nodeLister, emptyNamedPort, tc.endpointType) + if !reflect.DeepEqual(result.NetworkEndpointSet, tc.expectedEndpointMap) { + t.Errorf("degraded mode endpoint set is not calculated correctly:\ngot %+v,\n expected %+v", result.NetworkEndpointSet, tc.expectedEndpointMap) } - if !reflect.DeepEqual(endpointPodMap, tc.expectedPodMap) { - t.Errorf("degraded mode endpoint map is not calculated correctly:\ngot %+v,\n expected %+v", endpointPodMap, tc.expectedPodMap) + if !reflect.DeepEqual(result.EndpointPodMap, tc.expectedPodMap) { + t.Errorf("degraded mode endpoint map is not calculated correctly:\ngot %+v,\n expected %+v", result.EndpointPodMap, tc.expectedPodMap) } }) } @@ -1452,7 +1474,7 @@ func TestValidatePod(t *testing.T) { desc: "a valid pod with phase running", pod: &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ - Namespace: testNamespace, + Namespace: testServiceNamespace, Name: "pod1", }, Status: v1.PodStatus{ @@ -1468,7 +1490,7 @@ func TestValidatePod(t *testing.T) { desc: "a terminal pod with phase failed", pod: &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ - Namespace: testNamespace, + Namespace: testServiceNamespace, Name: "pod2", }, Status: v1.PodStatus{ @@ -1484,7 +1506,7 @@ func TestValidatePod(t *testing.T) { desc: "a terminal pod with phase succeeded", pod: &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ - Namespace: testNamespace, + Namespace: testServiceNamespace, Name: "pod3", }, Status: v1.PodStatus{ @@ -1500,7 +1522,7 @@ func 
TestValidatePod(t *testing.T) { desc: "a pod from non-existent node", pod: &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ - Namespace: testNamespace, + Namespace: testServiceNamespace, Name: "pod4", }, Status: corev1.PodStatus{ @@ -1527,7 +1549,7 @@ func addPodsToLister(podLister cache.Indexer) { for i := 1; i <= 6; i++ { podLister.Add(&corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ - Namespace: testNamespace, + Namespace: testServiceNamespace, Name: fmt.Sprintf("pod%v", i), }, Status: corev1.PodStatus{ @@ -1541,7 +1563,7 @@ func addPodsToLister(podLister cache.Indexer) { for i := 7; i <= 12; i++ { podLister.Add(&corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ - Namespace: testNamespace, + Namespace: testServiceNamespace, Name: fmt.Sprintf("pod%v", i), }, Status: corev1.PodStatus{ @@ -1555,7 +1577,7 @@ func addPodsToLister(podLister cache.Indexer) { podLister.Update(&corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ - Namespace: testNamespace, + Namespace: testServiceNamespace, Name: "pod3", }, Status: corev1.PodStatus{ @@ -1567,7 +1589,7 @@ func addPodsToLister(podLister cache.Indexer) { }) podLister.Update(&corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ - Namespace: testNamespace, + Namespace: testServiceNamespace, Name: "pod4", }, Status: corev1.PodStatus{ @@ -1579,7 +1601,7 @@ func addPodsToLister(podLister cache.Indexer) { }) podLister.Update(&corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ - Namespace: testNamespace, + Namespace: testServiceNamespace, Name: "pod7", }, Status: corev1.PodStatus{ @@ -1591,7 +1613,7 @@ func addPodsToLister(podLister cache.Indexer) { }) podLister.Update(&corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ - Namespace: testNamespace, + Namespace: testServiceNamespace, Name: "pod10", }, Status: corev1.PodStatus{ diff --git a/pkg/neg/types/interfaces.go b/pkg/neg/types/interfaces.go index cf5709e0e3..1cb2f5bd0c 100644 --- a/pkg/neg/types/interfaces.go +++ b/pkg/neg/types/interfaces.go @@ -84,6 +84,8 @@ type NetworkEndpointsCalculator interface { // CalculateEndpoints 
computes the NEG endpoints based on service endpoints and the current NEG state and returns a // map of zone name to network endpoint set CalculateEndpoints(eds []EndpointsData, currentMap map[string]NetworkEndpointSet) (map[string]NetworkEndpointSet, EndpointPodMap, int, error) + // CalculateEndpointsDegradedMode computes the NEG endpoints using degraded mode calculation + CalculateEndpointsDegradedMode(eds []EndpointsData, currentMap map[string]NetworkEndpointSet) (map[string]NetworkEndpointSet, EndpointPodMap, error) // Mode indicates the mode that the EndpointsCalculator is operating in. Mode() EndpointsCalculatorMode // ValidateEndpoints validates the NEG endpoint information is correct diff --git a/pkg/neg/types/sync_results.go b/pkg/neg/types/sync_results.go index b32fd57a50..31817adfa0 100644 --- a/pkg/neg/types/sync_results.go +++ b/pkg/neg/types/sync_results.go @@ -15,56 +15,68 @@ package types import "errors" +type Result string + const ( - ResultEPCountsDiffer = "EPCountsDiffer" - ResultEPMissingNodeName = "EPMissingNodeName" - ResultNodeNotFound = "NodeNotFound" - ResultEPMissingZone = "EPMissingZone" - ResultEPSEndpointCountZero = "EPSEndpointCountZero" - ResultEPCalculationCountZero = "EPCalculationCountZero" - ResultInvalidAPIResponse = "InvalidAPIResponse" - ResultInvalidEPAttach = "InvalidEPAttach" - ResultInvalidEPDetach = "InvalidEPDetach" + ResultEPCountsDiffer = Result("EPCountsDiffer") + ResultEPNodeMissing = Result("EPNodeMissing") + ResultEPNodeNotFound = Result("EPNodeNotFound") + ResultEPPodMissing = Result("EPPodMissing") + ResultEPPodNotFound = Result("EPPodNotFound") + ResultEPPodTypeAssertionFailed = Result("EPPodTypeAssertionFailed") + ResultEPZoneMissing = Result("EPZoneMissing") + ResultEPSEndpointCountZero = Result("EPSEndpointCountZero") + ResultEPCalculationCountZero = Result("EPCalculationCountZero") + ResultInvalidAPIResponse = Result("InvalidAPIResponse") + ResultInvalidEPAttach = Result("InvalidEPAttach") + 
ResultInvalidEPDetach = Result("InvalidEPDetach") // these results have their own errors - ResultNegNotFound = "NegNotFound" - ResultCurrentEPNotFound = "CurrentEPNotFound" - ResultEPSNotFound = "EPSNotFound" - ResultOtherError = "OtherError" - ResultInProgress = "InProgress" - ResultSuccess = "Success" + ResultNegNotFound = Result("NegNotFound") + ResultCurrentNegEPNotFound = Result("CurrentNegEPNotFound") + ResultEPSNotFound = Result("EPSNotFound") + ResultOtherError = Result("OtherError") + ResultInProgress = Result("InProgress") + ResultSuccess = Result("Success") ) var ( - ErrEPCountsDiffer = errors.New("endpoint counts from endpointData and endpointPodMap differ") - ErrEPMissingNodeName = errors.New("endpoint has empty nodeName field") - ErrNodeNotFound = errors.New("failed to retrieve associated zone of node") - ErrEPMissingZone = errors.New("endpoint has empty zone field") - ErrEPSEndpointCountZero = errors.New("endpoint count from endpointData cannot be zero") - ErrEPCalculationCountZero = errors.New("endpoint count from endpointPodMap cannot be zero") - ErrInvalidAPIResponse = errors.New("received response error doesn't match googleapi.Error type") - ErrInvalidEPAttach = errors.New("endpoint information for attach operation is incorrect") - ErrInvalidEPDetach = errors.New("endpoint information for detach operation is incorrect") + ErrEPCountsDiffer = errors.New("endpoint counts from endpointData and endpointPodMap differ") + ErrEPNodeMissing = errors.New("endpoint has missing nodeName field") + ErrEPNodeNotFound = errors.New("endpoint corresponds to an non-existing node") + ErrEPPodMissing = errors.New("endpoint has missing pod field") + ErrEPPodNotFound = errors.New("endpoint corresponds to an non-existing pod") + ErrEPPodTypeAssertionFailed = errors.New("endpoint corresponds to an object that fails pod type assertion") + ErrEPZoneMissing = errors.New("endpoint has missing zone field") + ErrEPSEndpointCountZero = errors.New("endpoint count from 
endpointData cannot be zero") + ErrEPCalculationCountZero = errors.New("endpoint count from endpointPodMap cannot be zero") + ErrInvalidAPIResponse = errors.New("received response error doesn't match googleapi.Error type") + ErrInvalidEPAttach = errors.New("endpoint information for attach operation is incorrect") + ErrInvalidEPDetach = errors.New("endpoint information for detach operation is incorrect") // use this map for conversion between errors and sync results - ErrorStateResult = map[error]string{ - ErrEPMissingNodeName: ResultEPMissingNodeName, - ErrEPMissingZone: ResultEPMissingZone, - ErrEPCalculationCountZero: ResultEPCalculationCountZero, - ErrEPSEndpointCountZero: ResultEPSEndpointCountZero, - ErrEPCountsDiffer: ResultEPCountsDiffer, - ErrInvalidAPIResponse: ResultInvalidAPIResponse, - ErrInvalidEPAttach: ResultInvalidEPAttach, - ErrInvalidEPDetach: ResultInvalidEPDetach, + ErrorStateResult = map[error]Result{ + ErrEPNodeMissing: ResultEPNodeMissing, + ErrEPNodeNotFound: ResultEPNodeNotFound, + ErrEPPodMissing: ResultEPPodMissing, + ErrEPPodNotFound: ResultEPPodNotFound, + ErrEPPodTypeAssertionFailed: ResultEPPodTypeAssertionFailed, + ErrEPZoneMissing: ResultEPZoneMissing, + ErrEPCalculationCountZero: ResultEPCalculationCountZero, + ErrEPSEndpointCountZero: ResultEPSEndpointCountZero, + ErrEPCountsDiffer: ResultEPCountsDiffer, + ErrInvalidAPIResponse: ResultInvalidAPIResponse, + ErrInvalidEPAttach: ResultInvalidEPAttach, + ErrInvalidEPDetach: ResultInvalidEPDetach, } ) type NegSyncResult struct { Error error - Result string + Result Result } -func NewNegSyncResult(err error, result string) *NegSyncResult { +func NewNegSyncResult(err error, result Result) *NegSyncResult { return &NegSyncResult{ Error: err, Result: result,