diff --git a/pkg/neg/controller.go b/pkg/neg/controller.go index 6e1eb288c4..e94fbc0398 100644 --- a/pkg/neg/controller.go +++ b/pkg/neg/controller.go @@ -149,10 +149,12 @@ func NewController( err := scheme.AddToScheme(negScheme) if err != nil { logger.Error(err, "Errored adding default scheme to event recorder") + metrics.PublishNegControllerErrorCountMetrics(err, true) } err = svcnegv1beta1.AddToScheme(negScheme) if err != nil { logger.Error(err, "Errored adding NEG CRD scheme to event recorder") + metrics.PublishNegControllerErrorCountMetrics(err, true) } recorder := eventBroadcaster.NewRecorder(negScheme, apiv1.EventSource{Component: "neg-controller"}) @@ -401,6 +403,7 @@ func (c *Controller) processEndpoint(key string) { namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { c.logger.Error(err, "Failed to split endpoint namespaced key", "key", key) + metrics.PublishNegControllerErrorCountMetrics(err, true) return } c.manager.Sync(namespace, name) @@ -416,6 +419,7 @@ func (c *Controller) serviceWorker() { defer c.serviceQueue.Done(key) err := c.processService(key.(string)) c.handleErr(err, key) + metrics.PublishNegControllerErrorCountMetrics(err, false) }() } } @@ -713,6 +717,7 @@ func (c *Controller) handleErr(err error, key interface{}) { c.logger.Error(nil, msg) if service, exists, err := c.serviceLister.GetByKey(key.(string)); err != nil { c.logger.Error(err, "Failed to retrieve service from store", "service", key.(string)) + metrics.PublishNegControllerErrorCountMetrics(err, true) } else if exists { c.recorder.Eventf(service.(*apiv1.Service), apiv1.EventTypeWarning, "ProcessServiceFailed", msg) } @@ -735,6 +740,7 @@ func (c *Controller) enqueueEndpointSlice(obj interface{}) { key, err := endpointslices.EndpointSlicesServiceKey(endpointSlice) if err != nil { c.logger.Error(err, "Failed to find a service label inside endpoint slice", "endpointSlice", klog.KObj(endpointSlice)) + metrics.PublishNegControllerErrorCountMetrics(err, true) return 
} c.logger.V(3).Info("Adding EndpointSlice to endpointQueue for processing", "endpointSlice", key) @@ -745,6 +751,7 @@ func (c *Controller) enqueueNode(obj interface{}) { key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) if err != nil { c.logger.Error(err, "Failed to generate node key") + metrics.PublishNegControllerErrorCountMetrics(err, true) return } c.logger.V(3).Info("Adding Node to nodeQueue for processing", "node", key) @@ -755,6 +762,7 @@ func (c *Controller) enqueueService(obj interface{}) { key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) if err != nil { c.logger.Error(err, "Failed to generate service key") + metrics.PublishNegControllerErrorCountMetrics(err, true) return } c.logger.V(3).Info("Adding Service to serviceQueue for processing", "service", key) @@ -777,6 +785,7 @@ func (c *Controller) enqueueIngressServices(ing *v1.Ingress) { func (c *Controller) gc() { if err := c.manager.GC(); err != nil { c.logger.Error(err, "NEG controller garbage collection failed") + metrics.PublishNegControllerErrorCountMetrics(err, true) } } diff --git a/pkg/neg/manager.go b/pkg/neg/manager.go index a75d21e759..34a8277541 100644 --- a/pkg/neg/manager.go +++ b/pkg/neg/manager.go @@ -349,6 +349,7 @@ func updateZoneMap(existingZoneMap *map[string]struct{}, candidateNodePredicate zones, err := zoneGetter.ListZones(candidateNodePredicate) if err != nil { logger.Error(err, "Unable to list zones") + metrics.PublishNegControllerErrorCountMetrics(err, true) return false } @@ -407,6 +408,7 @@ func (manager *syncerManager) ReadinessGateEnabledNegs(namespace string, podLabe obj, exists, err := manager.serviceLister.GetByKey(svcKey.Key()) if err != nil { manager.logger.Error(err, "Failed to retrieve service from store", "service", svcKey.Key()) + metrics.PublishNegControllerErrorCountMetrics(err, true) continue } @@ -682,6 +684,7 @@ func (manager *syncerManager) ensureDeleteNetworkEndpointGroup(name, zone string if err != nil { if utils.IsNotFoundError(err) || 
utils.IsHTTPErrorCode(err, http.StatusBadRequest) { manager.logger.V(2).Info("Ignoring error when querying for neg during GC", "negName", name, "zone", zone, "err", err) + metrics.PublishNegControllerErrorCountMetrics(err, true) return nil } return err diff --git a/pkg/neg/metrics/metrics.go b/pkg/neg/metrics/metrics.go index 5e73bd38af..6378d64bae 100644 --- a/pkg/neg/metrics/metrics.go +++ b/pkg/neg/metrics/metrics.go @@ -21,6 +21,7 @@ import ( "time" "github.com/prometheus/client_golang/prometheus" + "k8s.io/ingress-gce/pkg/utils" ) const ( @@ -49,6 +50,12 @@ const ( ipv6EndpointType = "IPv6" dualStackEndpointType = "DualStack" migrationEndpointType = "Migration" + + gceServerError = "GCE_server_error" + k8sServerError = "K8s_server_error" + ignoredError = "ignored_error" + otherError = "other_error" + totalNegError = "total_neg_error" ) type syncType string @@ -211,6 +218,17 @@ var ( }, []string{"endpoint_type"}, ) + + // NegControllerErrorCount tracks the count of server errors(GCE/K8s) and + // all errors from NEG controller. + NegControllerErrorCount = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Subsystem: negControllerSubsystem, + Name: "error_count", + Help: "Counts of server errors and NEG controller errors.", + }, + []string{"error_type"}, + ) ) var register sync.Once @@ -234,6 +252,7 @@ func RegisterMetrics() { prometheus.MustRegister(DualStackMigrationLongestUnfinishedDuration) prometheus.MustRegister(DualStackMigrationServiceCount) prometheus.MustRegister(SyncerCountByEndpointType) + prometheus.MustRegister(NegControllerErrorCount) RegisterSyncerMetrics() }) @@ -279,9 +298,32 @@ func PublishDegradedModeCorrectnessMetrics(count int, endpointType string, negTy DegradeModeCorrectness.WithLabelValues(negType, endpointType).Observe(float64(count)) } +// PublishNegControllerErrorCountMetrics publishes collected metrics +// for neg controller errors. 
+func PublishNegControllerErrorCountMetrics(err error, isIgnored bool) {
+	if err != nil {
+		NegControllerErrorCount.WithLabelValues(totalNegError).Inc()
+		NegControllerErrorCount.WithLabelValues(getErrorLabel(err, isIgnored)).Inc()
+	}
+}
+
 func getResult(err error) string {
 	if err != nil {
 		return resultError
 	}
 	return resultSuccess
 }
+
+// getErrorLabel picks the error_type label for err: ignoredError when
+// isIgnored is set, else the GCE or K8s server label, else otherError.
+func getErrorLabel(err error, isIgnored bool) string {
+	switch {
+	case isIgnored:
+		return ignoredError
+	case utils.IsGCEServerError(err):
+		return gceServerError
+	case utils.IsK8sServerError(err):
+		return k8sServerError
+	}
+	return otherError
+}
diff --git a/pkg/neg/readiness/poller.go b/pkg/neg/readiness/poller.go index 36f71aff81..509e88b112 100644 --- a/pkg/neg/readiness/poller.go +++ b/pkg/neg/readiness/poller.go @@ -28,6 +28,7 @@ import ( utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/client-go/tools/cache" "k8s.io/ingress-gce/pkg/composite" + "k8s.io/ingress-gce/pkg/neg/metrics" negtypes "k8s.io/ingress-gce/pkg/neg/types" "k8s.io/klog/v2" "k8s.io/utils/clock" @@ -173,6 +174,7 @@ func (p *poller) Poll(key negMeta) (retry bool, err error) { } retry, err = p.processHealthStatus(key, res) + metrics.PublishNegControllerErrorCountMetrics(err, true) if retry { <-p.clock.After(hcRetryDelay) } @@ -293,6 +295,7 @@ func getHealthyBackendService(healthStatus *composite.NetworkEndpointWithHealthS id, err := cloud.ParseResourceURL(hs.BackendService.BackendService) if err != nil { logger.Error(err, "Failed to parse backend service reference from a Network Endpoint health status", "healthStatus", healthStatus) + metrics.PublishNegControllerErrorCountMetrics(err, true) continue } if id != nil { diff --git a/pkg/neg/readiness/reflector.go b/pkg/neg/readiness/reflector.go index 7f13c8c9ca..6e2eee886e 100644 --- a/pkg/neg/readiness/reflector.go +++ b/pkg/neg/readiness/reflector.go @@ -31,6 +31,7 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" +
"k8s.io/ingress-gce/pkg/neg/metrics" negtypes "k8s.io/ingress-gce/pkg/neg/types" "k8s.io/ingress-gce/pkg/neg/types/shared" "k8s.io/klog/v2" @@ -225,6 +226,7 @@ func (r *readinessReflector) SyncPod(pod *v1.Pod) { key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(pod) if err != nil { r.logger.Error(err, "Failed to generate pod key") + metrics.PublishNegControllerErrorCountMetrics(err, true) return } @@ -260,6 +262,7 @@ func (r *readinessReflector) pollNeg(key negMeta) { retry, err := r.poller.Poll(key) if err != nil { r.logger.Error(err, "Failed to poll neg", "neg", key) + metrics.PublishNegControllerErrorCountMetrics(err, true) } if retry { r.poll() diff --git a/pkg/neg/readiness/utils.go b/pkg/neg/readiness/utils.go index 5555a76422..19789fab0e 100644 --- a/pkg/neg/readiness/utils.go +++ b/pkg/neg/readiness/utils.go @@ -25,6 +25,7 @@ import ( "k8s.io/apimachinery/pkg/types" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" + "k8s.io/ingress-gce/pkg/neg/metrics" negtypes "k8s.io/ingress-gce/pkg/neg/types" "k8s.io/ingress-gce/pkg/neg/types/shared" "k8s.io/ingress-gce/pkg/utils/patch" @@ -136,6 +137,7 @@ func removeIrrelevantEndpoints(endpointMap negtypes.EndpointPodMap, podLister ca pod, exists, err := getPodFromStore(podLister, namespacedName.Namespace, namespacedName.Name) if err != nil { klog.Warningf("Failed to retrieve pod %q from store: %v", namespacedName.String(), err) + metrics.PublishNegControllerErrorCountMetrics(err, true) } if err == nil && exists && needToProcess(pod) { continue diff --git a/pkg/neg/syncers/endpoints_calculator.go b/pkg/neg/syncers/endpoints_calculator.go index 913525d328..88869e0e57 100644 --- a/pkg/neg/syncers/endpoints_calculator.go +++ b/pkg/neg/syncers/endpoints_calculator.go @@ -87,6 +87,7 @@ func (l *LocalL4ILBEndpointsCalculator) CalculateEndpoints(eds []types.Endpoints node, err := l.nodeLister.Get(*addr.NodeName) if err != nil { l.logger.Error(err, "failed to retrieve node object", "nodeName", 
*addr.NodeName) + metrics.PublishNegControllerErrorCountMetrics(err, true) continue } if ok := candidateNodeCheck(node); !ok { @@ -96,6 +97,7 @@ func (l *LocalL4ILBEndpointsCalculator) CalculateEndpoints(eds []types.Endpoints zone, err := l.zoneGetter.GetZoneForNode(node.Name) if err != nil { l.logger.Error(err, "Unable to find zone for node, skipping", "nodeName", node.Name) + metrics.PublishNegControllerErrorCountMetrics(err, true) continue } zoneNodeMap[zone] = append(zoneNodeMap[zone], node) @@ -164,6 +166,7 @@ func (l *ClusterL4ILBEndpointsCalculator) CalculateEndpoints(_ []types.Endpoints zone, err := l.zoneGetter.GetZoneForNode(node.Name) if err != nil { l.logger.Error(err, "Unable to find zone for node skipping", "nodeName", node.Name) + metrics.PublishNegControllerErrorCountMetrics(err, true) continue } zoneNodeMap[zone] = append(zoneNodeMap[zone], node) diff --git a/pkg/neg/syncers/syncer.go b/pkg/neg/syncers/syncer.go index 9343117bed..aca6614b05 100644 --- a/pkg/neg/syncers/syncer.go +++ b/pkg/neg/syncers/syncer.go @@ -24,6 +24,7 @@ import ( apiv1 "k8s.io/api/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" + "k8s.io/ingress-gce/pkg/neg/metrics" negtypes "k8s.io/ingress-gce/pkg/neg/types" "k8s.io/klog/v2" "k8s.io/utils/clock" @@ -89,6 +90,7 @@ func (s *syncer) Start() error { retryCh := make(<-chan time.Time) err := s.core.sync() if err != nil { + metrics.PublishNegControllerErrorCountMetrics(err, false) delay, retryErr := s.backoff.NextRetryDelay() retryMsg := "" if retryErr == ErrRetriesExceeded { diff --git a/pkg/neg/syncers/transaction.go b/pkg/neg/syncers/transaction.go index 56582b56ef..963beb5cfd 100644 --- a/pkg/neg/syncers/transaction.go +++ b/pkg/neg/syncers/transaction.go @@ -573,6 +573,7 @@ func (s *transactionSyncer) commitTransaction(err error, networkEndpointMap map[ // This is to prevent if the NEG object is deleted or misconfigured by user s.needInit = true needRetry = true + 
metrics.PublishNegControllerErrorCountMetrics(err, false) } for networkEndpoint := range networkEndpointMap { @@ -588,6 +589,7 @@ func (s *transactionSyncer) commitTransaction(err error, networkEndpointMap map[ if needRetry { if retryErr := s.retry.Retry(); retryErr != nil { s.recordEvent(apiv1.EventTypeWarning, "RetryFailed", fmt.Sprintf("Failed to retry NEG sync for %q: %v", s.NegSyncerKey.String(), retryErr)) + metrics.PublishNegControllerErrorCountMetrics(retryErr, false) } return } @@ -624,6 +626,7 @@ func (s *transactionSyncer) isZoneChange() bool { negCR, err := getNegFromStore(s.svcNegLister, s.Namespace, s.NegSyncerKey.NegName) if err != nil { s.logger.Error(err, "unable to retrieve neg from the store", "neg", klog.KRef(s.Namespace, s.NegName)) + metrics.PublishNegControllerErrorCountMetrics(err, true) return false } @@ -632,6 +635,7 @@ func (s *transactionSyncer) isZoneChange() bool { id, err := cloud.ParseResourceURL(ref.SelfLink) if err != nil { s.logger.Error(err, "unable to parse selflink", "selfLink", ref.SelfLink) + metrics.PublishNegControllerErrorCountMetrics(err, true) continue } existingZones.Insert(id.Key.Zone) @@ -640,6 +644,7 @@ func (s *transactionSyncer) isZoneChange() bool { zones, err := s.zoneGetter.ListZones(negtypes.NodePredicateForEndpointCalculatorMode(s.EpCalculatorMode)) if err != nil { s.logger.Error(err, "unable to list zones") + metrics.PublishNegControllerErrorCountMetrics(err, true) return false } currZones := sets.NewString(zones...) 
@@ -712,6 +717,7 @@ func (s *transactionSyncer) updateInitStatus(negObjRefs []negv1beta1.NegObjectRe origNeg, err := getNegFromStore(s.svcNegLister, s.Namespace, s.NegSyncerKey.NegName) if err != nil { s.logger.Error(err, "Error updating init status for neg, failed to get neg from store.") + metrics.PublishNegControllerErrorCountMetrics(err, true) return } @@ -728,6 +734,7 @@ func (s *transactionSyncer) updateInitStatus(negObjRefs []negv1beta1.NegObjectRe _, err = patchNegStatus(s.svcNegClient, origNeg.Status, neg.Status, s.Namespace, s.NegSyncerKey.NegName) if err != nil { s.logger.Error(err, "Error updating Neg CR") + metrics.PublishNegControllerErrorCountMetrics(err, true) } } @@ -739,6 +746,7 @@ func (s *transactionSyncer) updateStatus(syncErr error) { origNeg, err := getNegFromStore(s.svcNegLister, s.Namespace, s.NegSyncerKey.NegName) if err != nil { s.logger.Error(err, "Error updating status for neg, failed to get neg from store") + metrics.PublishNegControllerErrorCountMetrics(err, true) return } neg := origNeg.DeepCopy() @@ -759,6 +767,7 @@ func (s *transactionSyncer) updateStatus(syncErr error) { _, err = patchNegStatus(s.svcNegClient, origNeg.Status, neg.Status, s.Namespace, s.NegSyncerKey.NegName) if err != nil { s.logger.Error(err, "Error updating Neg CR") + metrics.PublishNegControllerErrorCountMetrics(err, true) } } @@ -775,6 +784,7 @@ func (s *transactionSyncer) computeEPSStaleness(endpointSlices []*discovery.Endp negCR, err := getNegFromStore(s.svcNegLister, s.Namespace, s.NegSyncerKey.NegName) if err != nil { s.logger.Error(err, "unable to retrieve neg from the store", "neg", klog.KRef(s.Namespace, s.NegName)) + metrics.PublishNegControllerErrorCountMetrics(err, true) return } lastSyncTimestamp := negCR.Status.LastSyncTime @@ -906,6 +916,7 @@ func getEndpointPodLabelMap(endpoints map[string]negtypes.NetworkEndpointSet, en if err != nil || !ok { metrics.PublishLabelPropagationError(labels.OtherError) logger.Error(err, "getEndpointPodLabelMap: error 
getting pod", "pod", key, "exist", ok) + metrics.PublishNegControllerErrorCountMetrics(err, true) continue } pod, ok := obj.(*v1.Pod) @@ -917,6 +928,7 @@ func getEndpointPodLabelMap(endpoints map[string]negtypes.NetworkEndpointSet, en labelMap, err := labels.GetPodLabelMap(pod, lpConfig) if err != nil { recorder.Eventf(pod, apiv1.EventTypeWarning, "LabelsExceededLimit", "Label Propagation Error: %v", err) + metrics.PublishNegControllerErrorCountMetrics(err, true) } endpointPodLabelMap[endpoint] = labelMap } diff --git a/pkg/neg/syncers/utils.go b/pkg/neg/syncers/utils.go index e6f0ccb656..4bed990fd4 100644 --- a/pkg/neg/syncers/utils.go +++ b/pkg/neg/syncers/utils.go @@ -33,6 +33,7 @@ import ( negv1beta1 "k8s.io/ingress-gce/pkg/apis/svcneg/v1beta1" "k8s.io/ingress-gce/pkg/composite" "k8s.io/ingress-gce/pkg/flags" + "k8s.io/ingress-gce/pkg/neg/metrics" "k8s.io/ingress-gce/pkg/neg/syncers/labels" negtypes "k8s.io/ingress-gce/pkg/neg/types" "k8s.io/ingress-gce/pkg/network" @@ -116,6 +117,7 @@ func getService(serviceLister cache.Indexer, namespace, name string) *apiv1.Serv } if err != nil { klog.Errorf("Failed to retrieve service %s/%s from store: %v", namespace, name, err) + metrics.PublishNegControllerErrorCountMetrics(err, true) } return nil } @@ -130,6 +132,7 @@ func ensureNetworkEndpointGroup(svcNamespace, svcName, negName, zone, negService return negRef, err } klog.V(4).Infof("Neg %q in zone %q was not found: %s", negName, zone, err) + metrics.PublishNegControllerErrorCountMetrics(err, true) } needToCreate := false @@ -419,6 +422,7 @@ func toZoneNetworkEndpointMapDegradedMode(eds []negtypes.EndpointsData, zoneGett pod, getPodStat, getPodErr := getEndpointPod(endpointAddress, podLister) if getPodErr != nil { klog.Errorf("Endpoint %q in Endpoints %s/%s receives error when getting pod: %v, skipping", endpointAddress.Addresses, ed.Meta.Namespace, ed.Meta.Name, getPodErr) + metrics.PublishNegControllerErrorCountMetrics(getPodErr, true) for state, count := range 
getPodStat { localEPCount[state] += count } @@ -433,6 +437,7 @@ func toZoneNetworkEndpointMapDegradedMode(eds []negtypes.EndpointsData, zoneGett zone, getZoneErr := zoneGetter.GetZoneForNode(nodeName) if getZoneErr != nil { klog.Errorf("For endpoint %q in pod %q, its corresponding node %q does not have valid zone information: %w, skipping", endpointAddress.Addresses, pod.ObjectMeta.Name, nodeName, getZoneErr) + metrics.PublishNegControllerErrorCountMetrics(getZoneErr, true) localEPCount[negtypes.ZoneMissing]++ continue } @@ -457,12 +462,14 @@ func toZoneNetworkEndpointMapDegradedMode(eds []negtypes.EndpointsData, zoneGett checkIPErr := podContainsEndpointAddress(networkEndpoint, pod) if checkIPErr != nil { klog.Errorf("Endpoint %q in Endpoints %s/%s has IP(s) not match to its pod %s: %w, skipping", endpointAddress.Addresses, ed.Meta.Namespace, ed.Meta.Name, pod.Name, checkIPErr) + metrics.PublishNegControllerErrorCountMetrics(checkIPErr, true) localEPCount[negtypes.IPNotFromPod] += 1 continue } validatePodStat, validateErr := validatePod(pod, nodeLister, serviceLister, networkEndpoint, serviceName, isCustomEPS) if validateErr != nil { klog.Errorf("Endpoint %q in Endpoints %s/%s correponds to an invalid pod: %v, skipping", endpointAddress.Addresses, ed.Meta.Namespace, ed.Meta.Name, validateErr) + metrics.PublishNegControllerErrorCountMetrics(validateErr, true) for state, count := range validatePodStat { localEPCount[state] += count } @@ -600,6 +607,7 @@ func nodeContainsPodIP(node *apiv1.Node, networkEndpoint negtypes.NetworkEndpoin _, ipnet, err := net.ParseCIDR(podCIDR) if err != nil { // swallow errors for CIDRs that are invalid + metrics.PublishNegControllerErrorCountMetrics(err, true) continue } ipnets = append(ipnets, ipnet) @@ -661,6 +669,7 @@ func retrieveExistingZoneNetworkEndpointMap(negName string, zoneGetter negtypes. // NEG not found in a candidate zone is an error. 
if utils.IsNotFoundError(err) && !candidateZonesMap.Has(zone) { klog.Infof("Ignoring NotFound error for NEG %q in zone %q", negName, zone) + metrics.PublishNegControllerErrorCountMetrics(err, true) continue } return nil, nil, fmt.Errorf("Failed to lookup NEG in zone %q, candidate zones %v, err - %v", zone, candidateZonesMap, err)
diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go
index b03cd1769a..cfc060165a 100644
--- a/pkg/utils/utils.go
+++ b/pkg/utils/utils.go
@@ -268,6 +268,32 @@ func GetErrorType(err error) string {
 	return ""
 }
 
+// IsGCEServerError reports whether err, or an error it wraps, is a
+// *googleapi.Error whose HTTP status code is 500 or higher.
+func IsGCEServerError(err error) bool {
+	// errors.As returns false for a nil err and otherwise stores the matching
+	// error from the chain in gerr, so no manual unwrapping is needed. A
+	// hand-written errors.Unwrap loop would spin forever when the match sits
+	// behind a multi-error wrapper, for which errors.Unwrap returns nil.
+	var gerr *googleapi.Error
+	if !errors.As(err, &gerr) {
+		return false
+	}
+	return gerr.Code >= http.StatusInternalServerError
+}
+
+// IsK8sServerError reports whether err, or an error it wraps, is a
+// *k8serrors.StatusError whose status code is 500 or higher.
+func IsK8sServerError(err error) bool {
+	// Inspect the error located by errors.As directly rather than re-walking
+	// the chain, which cannot follow multi-error wrappers.
+	var k8serr *k8serrors.StatusError
+	if !errors.As(err, &k8serr) {
+		return false
+	}
+	return k8serr.ErrStatus.Code >= http.StatusInternalServerError
+}
+
 // PrettyJson marshals an object in a human-friendly format.
 func PrettyJson(data interface{}) (string, error) {
 	buffer := new(bytes.Buffer)