diff --git a/pkg/neg/syncers/transaction.go b/pkg/neg/syncers/transaction.go index 9a17804924..2631de9c7d 100644 --- a/pkg/neg/syncers/transaction.go +++ b/pkg/neg/syncers/transaction.go @@ -50,19 +50,28 @@ import ( ) var ( + ErrEPCountsDiffer = errors.New("endpoint counts from endpointData and endpointPodMap differ") + ErrEPMissingNodeName = errors.New("endpoint has empty nodeName field") + ErrNodeNotFound = errors.New("failed to retrieve associated zone of node") + ErrEPMissingZone = errors.New("endpoint has empty zone field") + ErrEPSEndpointCountZero = errors.New("endpoint count from endpointData cannot be zero") + ErrEPCalculationCountZero = errors.New("endpoint count from endpointPodMap cannot be zero") + ResultEPCountsDiffer = "EPCountsDiffer" ResultEPMissingNodeName = "EPMissingNodeName" ResultEPMissingZone = "EPMissingZone" - ResultInvalidEPAttach = "InvalidEPAttach" - ResultInvalidEPDetach = "InvalidEPDetach" ResultEPSEndpointCountZero = "EPSEndpointCountZero" ResultEPCalculationCountZero = "EPCalculationCountZero" - ResultNegNotFound = "NegNotFound" - ResultCurrentEPNotFound = "CurrentEPNotFound" - ResultEPSNotFound = "EPSNotFound" - ResultNodeNotFound = "NodeNotFound" - ResultOther = "OtherResult" - ResultSuccess = "Success" + + // these results have their own errors + ResultInvalidEPAttach = "InvalidEPAttach" + ResultInvalidEPDetach = "InvalidEPDetach" + ResultNegNotFound = "NegNotFound" + ResultCurrentEPNotFound = "CurrentEPNotFound" + ResultEPSNotFound = "EPSNotFound" + ResultNodeNotFound = "NodeNotFound" + ResultOther = "OtherResult" + ResultSuccess = "Success" ) type transactionSyncer struct { @@ -120,6 +129,9 @@ type transactionSyncer struct { // 4. Endpoint count from EPS or calculated endpoint list is 0 // Need to grab syncLock first for any reads or writes based on this value inError bool + + // syncCollector collect sync related metrics + syncCollector metrics.SyncerMetricsCollector } func NewTransactionSyncer( @@ -137,6 +149,7 @@ func NewTransactionSyncer( epc negtypes.NetworkEndpointsCalculator, kubeSystemUID string, svcNegClient svcnegclient.Interface, + syncerMetrics *metrics.SyncerMetrics, customName bool, enableEndpointSlices bool, log klog.Logger) negtypes.NegSyncer { @@ -161,6 +174,7 @@ func NewTransactionSyncer( reflector: reflector, kubeSystemUID: kubeSystemUID, svcNegClient: svcNegClient, + syncCollector: syncerMetrics, customName: customName, enableEndpointSlices: enableEndpointSlices, inError: false, @@ -204,14 +218,17 @@ func (s *transactionSyncer) syncInternal() error { defer s.syncLock.Unlock() start := time.Now() - err := s.syncInternalImpl() + result := s.syncInternalImpl() - s.updateStatus(err) - metrics.PublishNegSyncMetrics(string(s.NegSyncerKey.NegType), string(s.endpointsCalculator.Mode()), err, start) - return err + s.updateStatus(result.Error) + metrics.PublishNegSyncMetrics(string(s.NegSyncerKey.NegType), string(s.endpointsCalculator.Mode()), result.Error, start) + if s.enableEndpointSlices { + s.syncCollector.UpdateSyncer(s.NegSyncerKey, result) + } + return result.Error } -func (s *transactionSyncer) syncInternalImpl() error { +func (s *transactionSyncer) syncInternalImpl() *metrics.NegSyncResult { // TODO(cheungdavid): for now we reset the boolean so it is a no-op, but // in the future, it will be used to trigger degraded mode if the syncer is in error state. if s.inErrorState() { @@ -220,20 +237,20 @@ func (s *transactionSyncer) syncInternalImpl() error { if s.needInit || s.isZoneChange() { if err := s.ensureNetworkEndpointGroups(); err != nil { - return err + return metrics.NewNegSyncResult(err, ResultNegNotFound) } s.needInit = false } if s.syncer.IsStopped() || s.syncer.IsShuttingDown() { s.logger.V(3).Info("Skip syncing NEG", "negSyncerKey", s.NegSyncerKey.String()) - return nil + return metrics.NewNegSyncResult(nil, ResultSuccess) } s.logger.V(2).Info("Sync NEG", "negSyncerKey", s.NegSyncerKey.String(), "endpointsCalculatorMode", s.endpointsCalculator.Mode()) currentMap, err := retrieveExistingZoneNetworkEndpointMap(s.NegSyncerKey.NegName, s.zoneGetter, s.cloud, s.NegSyncerKey.GetAPIVersion(), s.endpointsCalculator.Mode()) if err != nil { - return err + return metrics.NewNegSyncResult(err, ResultCurrentEPNotFound) } s.logStats(currentMap, "current NEG endpoints") @@ -249,11 +266,11 @@ func (s *transactionSyncer) syncInternalImpl() error { if s.enableEndpointSlices { slices, err := s.endpointSliceLister.ByIndex(endpointslices.EndpointSlicesByServiceIndex, endpointslices.FormatEndpointSlicesServiceKey(s.Namespace, s.Name)) if err != nil { - return err + return metrics.NewNegSyncResult(err, ResultEPSNotFound) } if len(slices) < 1 { s.logger.Error(nil, "Endpoint slices for the service doesn't exist. Skipping NEG sync") - return nil + return metrics.NewNegSyncResult(nil, ResultSuccess) } endpointSlices := make([]*discovery.EndpointSlice, len(slices)) for i, slice := range slices { @@ -261,11 +278,17 @@ func (s *transactionSyncer) syncInternalImpl() error { } endpointsData := negtypes.EndpointsDataFromEndpointSlices(endpointSlices) targetMap, endpointPodMap, dupCount, err = s.endpointsCalculator.CalculateEndpoints(endpointsData, currentMap) - if !s.isValidEPField(err) || !s.isValidEndpointInfo(endpointsData, endpointPodMap, dupCount) { + syncResult := s.checkValidEPField(err) + if syncResult.Result == ResultEPMissingNodeName || syncResult.Result == ResultEPMissingZone { s.setErrorState() } - if err != nil { - return fmt.Errorf("endpoints calculation error in mode %q, err: %w", s.endpointsCalculator.Mode(), err) + if syncResult.Error != nil { + return syncResult + } + syncResult = s.checkEndpointInfo(endpointsData, endpointPodMap, dupCount) + if syncResult.Error != nil { + s.setErrorState() + return syncResult } } else { ep, exists, err := s.endpointLister.Get( @@ -277,16 +300,16 @@ func (s *transactionSyncer) syncInternalImpl() error { }, ) if err != nil { - return err + return metrics.NewNegSyncResult(err, ResultOther) } if !exists { s.logger.Info("Endpoint does not exist. Skipping NEG sync", "endpoint", klog.KRef(s.Namespace, s.Name)) - return nil + return metrics.NewNegSyncResult(nil, ResultSuccess) } endpointsData := negtypes.EndpointsDataFromEndpoints(ep.(*apiv1.Endpoints)) targetMap, endpointPodMap, _, err = s.endpointsCalculator.CalculateEndpoints(endpointsData, currentMap) if err != nil { - return fmt.Errorf("endpoints calculation error in mode %q, err: %w", s.endpointsCalculator.Mode(), err) + return metrics.NewNegSyncResult(fmt.Errorf("endpoints calculation error in mode %q, err: %w", s.endpointsCalculator.Mode(), err), ResultOther) } } @@ -311,12 +334,17 @@ func (s *transactionSyncer) syncInternalImpl() error { if len(addEndpoints) == 0 && len(removeEndpoints) == 0 { s.logger.V(3).Info("No endpoint change. Skip syncing NEG. ", s.Namespace, s.Name) - return nil + return metrics.NewNegSyncResult(nil, ResultSuccess) } s.logEndpoints(addEndpoints, "adding endpoint") s.logEndpoints(removeEndpoints, "removing endpoint") - return s.syncNetworkEndpoints(addEndpoints, removeEndpoints) + err = s.syncNetworkEndpoints(addEndpoints, removeEndpoints) + if err != nil { + return metrics.NewNegSyncResult(err, ResultOther) + } else { + return metrics.NewNegSyncResult(nil, ResultSuccess) + } } // syncLock must already be acquired before execution @@ -375,19 +403,19 @@ func (s *transactionSyncer) ensureNetworkEndpointGroups() error { return utilerrors.NewAggregate(errList) } -// isValidEndpointInfo checks if endpoint information is correct. -// It returns false if one of the two checks fails: +// checkEndpointInfo checks if endpoint information is correct. +// It returns error and corresponding reason if any of the two checks fails: // // 1. The endpoint count from endpointData doesn't equal to the one from endpointPodMap: // endpiontPodMap removes the duplicated endpoints, and dupCount stores the number of duplicated it removed // and we compare the endpoint counts with duplicates // 2. The endpoint count from endpointData or the one from endpointPodMap is 0 -func (s *transactionSyncer) isValidEndpointInfo(eds []negtypes.EndpointsData, endpointPodMap negtypes.EndpointPodMap, dupCount int) bool { +func (s *transactionSyncer) checkEndpointInfo(eds []negtypes.EndpointsData, endpointPodMap negtypes.EndpointPodMap, dupCount int) *metrics.NegSyncResult { // Endpoint count from EndpointPodMap countFromPodMap := len(endpointPodMap) + dupCount if countFromPodMap == 0 { s.logger.Info("Detected endpoint count from endpointPodMap going to zero", "endpointPodMap", endpointPodMap) - return false + return metrics.NewNegSyncResult(ErrEPCalculationCountZero, ResultEPCalculationCountZero) } // Endpoint count from EndpointData @@ -397,34 +425,42 @@ func (s *transactionSyncer) isValidEndpointInfo(eds []negtypes.EndpointsData, en } if countFromEndpointData == 0 { s.logger.Info("Detected endpoint count from endpointData going to zero", "endpointData", eds) - return false + return metrics.NewNegSyncResult(ErrEPSEndpointCountZero, ResultEPSEndpointCountZero) } if countFromEndpointData != countFromPodMap { s.logger.Info("Detected error when comparing endpoint counts", "endpointData", eds, "endpointPodMap", endpointPodMap, "dupCount", dupCount) - return false + return metrics.NewNegSyncResult(ErrEPCountsDiffer, ResultEPCountsDiffer) } - return true + return metrics.NewNegSyncResult(nil, ResultSuccess) } -// isValidEPField returns false if there is endpoint with missing zone or nodeName -func (s *transactionSyncer) isValidEPField(err error) bool { +// checkValidEPField checks the error from endpoint calculation and return the corresponding syncResult +func (s *transactionSyncer) checkValidEPField(err error) *metrics.NegSyncResult { if errors.Is(err, ErrEPMissingNodeName) { s.logger.Info("Detected unexpected error when checking missing nodeName", "error", err) - return false + return metrics.NewNegSyncResult(err, ResultEPMissingNodeName) } if errors.Is(err, ErrEPMissingZone) { s.logger.Info("Detected unexpected error when checking missing zone", "error", err) - return false + return metrics.NewNegSyncResult(err, ResultEPMissingZone) + } + if errors.Is(err, ErrNodeNotFound) { + return metrics.NewNegSyncResult(err, ResultNodeNotFound) + } + if err != nil { + return metrics.NewNegSyncResult(err, ResultOther) + } else { + return metrics.NewNegSyncResult(nil, ResultSuccess) + } - return true } // isValidEPBatch returns false if the error from endpoint batch response is due to bad request func (s *transactionSyncer) isValidEPBatch(err error, operation transactionOp, networkEndpoints []*composite.NetworkEndpoint) bool { apiErr, ok := err.(*googleapi.Error) if !ok { - s.logger.Info("Detected error when parsing batch request error", "operation", operation, "error", err) + s.logger.Info("Detected error when parsing batch response error", "operation", operation, "error", err) return false } errCode := apiErr.Code @@ -515,6 +551,14 @@ func (s *transactionSyncer) operationInternal(operation transactionOp, zone stri s.recordEvent(apiv1.EventTypeWarning, operation.String()+"Failed", fmt.Sprintf("Failed to %s %d network endpoint(s) (NEG %q in zone %q): %v", operation.String(), len(networkEndpointMap), s.NegSyncerKey.NegName, zone, err)) if !s.isValidEPBatch(err, operation, networkEndpoints) { s.setErrorState() + var syncResult *metrics.NegSyncResult + if operation == attachOp { + syncResult = metrics.NewNegSyncResult(err, ResultInvalidEPAttach) + } + if operation == detachOp { + syncResult = metrics.NewNegSyncResult(err, ResultInvalidEPDetach) + } + s.syncCollector.UpdateSyncer(s.NegSyncerKey, syncResult) } } diff --git a/pkg/neg/syncers/utils.go b/pkg/neg/syncers/utils.go index c3f9291587..d120c89a74 100644 --- a/pkg/neg/syncers/utils.go +++ b/pkg/neg/syncers/utils.go @@ -17,7 +17,6 @@ limitations under the License. package syncers import ( - "errors" "fmt" "strconv" "strings" @@ -48,12 +47,6 @@ const ( separator = "||" ) -var ( - ErrEPMissingNodeName = errors.New("endpoint has empty nodeName field") - ErrNodeNotFound = errors.New("failed to retrieve associated zone of node") - ErrEPMissingZone = errors.New("endpoint has empty zone field") -) - // encodeEndpoint encodes ip and instance into a single string func encodeEndpoint(ip, instance, port string) string { return strings.Join([]string{ip, instance, port}, separator)