Skip to content

Commit

Permalink
Add NegControllerErrorCount metrics
Browse files Browse the repository at this point in the history
Add NegControllerErrorCount metrics to track the count of all errors
from NEG controller, and counts of server errors from GCE/K8s.
  • Loading branch information
sawsa307 committed May 17, 2023
1 parent c7c7171 commit 2a5ba46
Show file tree
Hide file tree
Showing 12 changed files with 124 additions and 0 deletions.
9 changes: 9 additions & 0 deletions pkg/neg/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,12 @@ func NewController(
err := scheme.AddToScheme(negScheme)
if err != nil {
logger.Error(err, "Errored adding default scheme to event recorder")
metrics.PublishNegControllerErrorCountMetrics(err, true)
}
err = svcnegv1beta1.AddToScheme(negScheme)
if err != nil {
logger.Error(err, "Errored adding NEG CRD scheme to event recorder")
metrics.PublishNegControllerErrorCountMetrics(err, true)
}
recorder := eventBroadcaster.NewRecorder(negScheme,
apiv1.EventSource{Component: "neg-controller"})
Expand Down Expand Up @@ -401,6 +403,7 @@ func (c *Controller) processEndpoint(key string) {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
c.logger.Error(err, "Failed to split endpoint namespaced key", "key", key)
metrics.PublishNegControllerErrorCountMetrics(err, true)
return
}
c.manager.Sync(namespace, name)
Expand All @@ -416,6 +419,7 @@ func (c *Controller) serviceWorker() {
defer c.serviceQueue.Done(key)
err := c.processService(key.(string))
c.handleErr(err, key)
metrics.PublishNegControllerErrorCountMetrics(err, false)
}()
}
}
Expand Down Expand Up @@ -713,6 +717,7 @@ func (c *Controller) handleErr(err error, key interface{}) {
c.logger.Error(nil, msg)
if service, exists, err := c.serviceLister.GetByKey(key.(string)); err != nil {
c.logger.Error(err, "Failed to retrieve service from store", "service", key.(string))
metrics.PublishNegControllerErrorCountMetrics(err, true)
} else if exists {
c.recorder.Eventf(service.(*apiv1.Service), apiv1.EventTypeWarning, "ProcessServiceFailed", msg)
}
Expand All @@ -735,6 +740,7 @@ func (c *Controller) enqueueEndpointSlice(obj interface{}) {
key, err := endpointslices.EndpointSlicesServiceKey(endpointSlice)
if err != nil {
c.logger.Error(err, "Failed to find a service label inside endpoint slice", "endpointSlice", klog.KObj(endpointSlice))
metrics.PublishNegControllerErrorCountMetrics(err, true)
return
}
c.logger.V(3).Info("Adding EndpointSlice to endpointQueue for processing", "endpointSlice", key)
Expand All @@ -745,6 +751,7 @@ func (c *Controller) enqueueNode(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
c.logger.Error(err, "Failed to generate node key")
metrics.PublishNegControllerErrorCountMetrics(err, true)
return
}
c.logger.V(3).Info("Adding Node to nodeQueue for processing", "node", key)
Expand All @@ -755,6 +762,7 @@ func (c *Controller) enqueueService(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
c.logger.Error(err, "Failed to generate service key")
metrics.PublishNegControllerErrorCountMetrics(err, true)
return
}
c.logger.V(3).Info("Adding Service to serviceQueue for processing", "service", key)
Expand All @@ -777,6 +785,7 @@ func (c *Controller) enqueueIngressServices(ing *v1.Ingress) {
// gc runs one garbage collection pass through the syncer manager. A failure
// is logged and published to the NEG controller error metrics as an ignored
// error; nothing is returned to the caller.
func (c *Controller) gc() {
	gcErr := c.manager.GC()
	if gcErr == nil {
		return
	}
	c.logger.Error(gcErr, "NEG controller garbage collection failed")
	metrics.PublishNegControllerErrorCountMetrics(gcErr, true)
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/neg/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ func updateZoneMap(existingZoneMap *map[string]struct{}, candidateNodePredicate
zones, err := zoneGetter.ListZones(candidateNodePredicate)
if err != nil {
logger.Error(err, "Unable to list zones")
metrics.PublishNegControllerErrorCountMetrics(err, true)
return false
}

Expand Down Expand Up @@ -407,6 +408,7 @@ func (manager *syncerManager) ReadinessGateEnabledNegs(namespace string, podLabe
obj, exists, err := manager.serviceLister.GetByKey(svcKey.Key())
if err != nil {
manager.logger.Error(err, "Failed to retrieve service from store", "service", svcKey.Key())
metrics.PublishNegControllerErrorCountMetrics(err, true)
continue
}

Expand Down Expand Up @@ -682,6 +684,7 @@ func (manager *syncerManager) ensureDeleteNetworkEndpointGroup(name, zone string
if err != nil {
if utils.IsNotFoundError(err) || utils.IsHTTPErrorCode(err, http.StatusBadRequest) {
manager.logger.V(2).Info("Ignoring error when querying for neg during GC", "negName", name, "zone", zone, "err", err)
metrics.PublishNegControllerErrorCountMetrics(err, true)
return nil
}
return err
Expand Down
42 changes: 42 additions & 0 deletions pkg/neg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/prometheus/client_golang/prometheus"
"k8s.io/ingress-gce/pkg/utils"
)

const (
Expand Down Expand Up @@ -49,6 +50,12 @@ const (
ipv6EndpointType = "IPv6"
dualStackEndpointType = "DualStack"
migrationEndpointType = "Migration"

gceServerError = "GCE_server_error"
k8sServerError = "K8s_server_error"
ignoredControllerError = "ignored_controller_error"
otherControllerError = "other_controller_error"
totalNegControllerError = "total_neg_controller_error"
)

type syncType string
Expand Down Expand Up @@ -211,6 +218,17 @@ var (
},
[]string{"endpoint_type"},
)

// NegControllerErrorCount counts NEG controller errors, labeled by error_type:
// GCE/K8s server errors, ignored errors, other errors, and the overall total.
NegControllerErrorCount = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: negControllerSubsystem,
Name: "error_count",
Help: "Counts of server errors and NEG controller errors.",
},
[]string{"error_type"},
)
)

var register sync.Once
Expand All @@ -234,6 +252,7 @@ func RegisterMetrics() {
prometheus.MustRegister(DualStackMigrationLongestUnfinishedDuration)
prometheus.MustRegister(DualStackMigrationServiceCount)
prometheus.MustRegister(SyncerCountByEndpointType)
prometheus.MustRegister(NegControllerErrorCount)

RegisterSyncerMetrics()
})
Expand Down Expand Up @@ -279,9 +298,32 @@ func PublishDegradedModeCorrectnessMetrics(count int, endpointType string, negTy
DegradeModeCorrectness.WithLabelValues(negType, endpointType).Observe(float64(count))
}

// PublishNegControllerErrorCountMetrics records err in NegControllerErrorCount.
// A nil err is a no-op. Every non-nil err increments two series: the running
// total, and the series for the label chosen by getErrorLabel(err, isIgnored).
func PublishNegControllerErrorCountMetrics(err error, isIgnored bool) {
	if err != nil {
		counters := NegControllerErrorCount
		counters.WithLabelValues(totalNegControllerError).Inc()

		errorType := getErrorLabel(err, isIgnored)
		counters.WithLabelValues(errorType).Inc()
	}
}

func getResult(err error) string {
if err != nil {
return resultError
}
return resultSuccess
}

// getErrorLabel picks the error_type label value for err. The isIgnored flag
// takes precedence over any classification of err itself; otherwise err is
// checked as a GCE server error first, then as a K8s server error, and falls
// back to otherControllerError when neither check matches.
func getErrorLabel(err error, isIgnored bool) string {
	switch {
	case isIgnored:
		return ignoredControllerError
	case utils.IsGCEServerError(err):
		return gceServerError
	case utils.IsK8sServerError(err):
		return k8sServerError
	default:
		return otherControllerError
	}
}
3 changes: 3 additions & 0 deletions pkg/neg/readiness/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/tools/cache"
"k8s.io/ingress-gce/pkg/composite"
"k8s.io/ingress-gce/pkg/neg/metrics"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
Expand Down Expand Up @@ -173,6 +174,7 @@ func (p *poller) Poll(key negMeta) (retry bool, err error) {
}

retry, err = p.processHealthStatus(key, res)
metrics.PublishNegControllerErrorCountMetrics(err, true)
if retry {
<-p.clock.After(hcRetryDelay)
}
Expand Down Expand Up @@ -293,6 +295,7 @@ func getHealthyBackendService(healthStatus *composite.NetworkEndpointWithHealthS
id, err := cloud.ParseResourceURL(hs.BackendService.BackendService)
if err != nil {
logger.Error(err, "Failed to parse backend service reference from a Network Endpoint health status", "healthStatus", healthStatus)
metrics.PublishNegControllerErrorCountMetrics(err, true)
continue
}
if id != nil {
Expand Down
3 changes: 3 additions & 0 deletions pkg/neg/readiness/reflector.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/ingress-gce/pkg/neg/metrics"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
"k8s.io/ingress-gce/pkg/neg/types/shared"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -225,6 +226,7 @@ func (r *readinessReflector) SyncPod(pod *v1.Pod) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(pod)
if err != nil {
r.logger.Error(err, "Failed to generate pod key")
metrics.PublishNegControllerErrorCountMetrics(err, true)
return
}

Expand Down Expand Up @@ -260,6 +262,7 @@ func (r *readinessReflector) pollNeg(key negMeta) {
retry, err := r.poller.Poll(key)
if err != nil {
r.logger.Error(err, "Failed to poll neg", "neg", key)
metrics.PublishNegControllerErrorCountMetrics(err, true)
}
if retry {
r.poll()
Expand Down
2 changes: 2 additions & 0 deletions pkg/neg/readiness/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"k8s.io/apimachinery/pkg/types"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/ingress-gce/pkg/neg/metrics"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
"k8s.io/ingress-gce/pkg/neg/types/shared"
"k8s.io/ingress-gce/pkg/utils/patch"
Expand Down Expand Up @@ -136,6 +137,7 @@ func removeIrrelevantEndpoints(endpointMap negtypes.EndpointPodMap, podLister ca
pod, exists, err := getPodFromStore(podLister, namespacedName.Namespace, namespacedName.Name)
if err != nil {
klog.Warningf("Failed to retrieve pod %q from store: %v", namespacedName.String(), err)
metrics.PublishNegControllerErrorCountMetrics(err, true)
}
if err == nil && exists && needToProcess(pod) {
continue
Expand Down
2 changes: 2 additions & 0 deletions pkg/neg/syncers/dualstack/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"sync"
"time"

"k8s.io/ingress-gce/pkg/neg/metrics"
"k8s.io/ingress-gce/pkg/neg/types"
"k8s.io/klog/v2"
)
Expand Down Expand Up @@ -186,6 +187,7 @@ func (d *Migrator) Continue(err error) {
}

if err != nil {
metrics.PublishNegControllerErrorCountMetrics(err, true)
// NEG Detach failed; unpause immediately.
d.paused = false
return
Expand Down
3 changes: 3 additions & 0 deletions pkg/neg/syncers/endpoints_calculator.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ func (l *LocalL4ILBEndpointsCalculator) CalculateEndpoints(eds []types.Endpoints
node, err := l.nodeLister.Get(*addr.NodeName)
if err != nil {
l.logger.Error(err, "failed to retrieve node object", "nodeName", *addr.NodeName)
metrics.PublishNegControllerErrorCountMetrics(err, true)
continue
}
if ok := candidateNodeCheck(node); !ok {
Expand All @@ -96,6 +97,7 @@ func (l *LocalL4ILBEndpointsCalculator) CalculateEndpoints(eds []types.Endpoints
zone, err := l.zoneGetter.GetZoneForNode(node.Name)
if err != nil {
l.logger.Error(err, "Unable to find zone for node, skipping", "nodeName", node.Name)
metrics.PublishNegControllerErrorCountMetrics(err, true)
continue
}
zoneNodeMap[zone] = append(zoneNodeMap[zone], node)
Expand Down Expand Up @@ -164,6 +166,7 @@ func (l *ClusterL4ILBEndpointsCalculator) CalculateEndpoints(_ []types.Endpoints
zone, err := l.zoneGetter.GetZoneForNode(node.Name)
if err != nil {
l.logger.Error(err, "Unable to find zone for node skipping", "nodeName", node.Name)
metrics.PublishNegControllerErrorCountMetrics(err, true)
continue
}
zoneNodeMap[zone] = append(zoneNodeMap[zone], node)
Expand Down
2 changes: 2 additions & 0 deletions pkg/neg/syncers/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
apiv1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/ingress-gce/pkg/neg/metrics"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
Expand Down Expand Up @@ -89,6 +90,7 @@ func (s *syncer) Start() error {
retryCh := make(<-chan time.Time)
err := s.core.sync()
if err != nil {
metrics.PublishNegControllerErrorCountMetrics(err, false)
delay, retryErr := s.backoff.NextRetryDelay()
retryMsg := ""
if retryErr == ErrRetriesExceeded {
Expand Down
12 changes: 12 additions & 0 deletions pkg/neg/syncers/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,7 @@ func (s *transactionSyncer) commitTransaction(err error, networkEndpointMap map[
// This is to prevent if the NEG object is deleted or misconfigured by user
s.needInit = true
needRetry = true
metrics.PublishNegControllerErrorCountMetrics(err, true)
}

for networkEndpoint := range networkEndpointMap {
Expand All @@ -588,6 +589,7 @@ func (s *transactionSyncer) commitTransaction(err error, networkEndpointMap map[
if needRetry {
if retryErr := s.retry.Retry(); retryErr != nil {
s.recordEvent(apiv1.EventTypeWarning, "RetryFailed", fmt.Sprintf("Failed to retry NEG sync for %q: %v", s.NegSyncerKey.String(), retryErr))
metrics.PublishNegControllerErrorCountMetrics(retryErr, false)
}
return
}
Expand Down Expand Up @@ -624,6 +626,7 @@ func (s *transactionSyncer) isZoneChange() bool {
negCR, err := getNegFromStore(s.svcNegLister, s.Namespace, s.NegSyncerKey.NegName)
if err != nil {
s.logger.Error(err, "unable to retrieve neg from the store", "neg", klog.KRef(s.Namespace, s.NegName))
metrics.PublishNegControllerErrorCountMetrics(err, true)
return false
}

Expand All @@ -632,6 +635,7 @@ func (s *transactionSyncer) isZoneChange() bool {
id, err := cloud.ParseResourceURL(ref.SelfLink)
if err != nil {
s.logger.Error(err, "unable to parse selflink", "selfLink", ref.SelfLink)
metrics.PublishNegControllerErrorCountMetrics(err, true)
continue
}
existingZones.Insert(id.Key.Zone)
Expand All @@ -640,6 +644,7 @@ func (s *transactionSyncer) isZoneChange() bool {
zones, err := s.zoneGetter.ListZones(negtypes.NodePredicateForEndpointCalculatorMode(s.EpCalculatorMode))
if err != nil {
s.logger.Error(err, "unable to list zones")
metrics.PublishNegControllerErrorCountMetrics(err, true)
return false
}
currZones := sets.NewString(zones...)
Expand Down Expand Up @@ -712,6 +717,7 @@ func (s *transactionSyncer) updateInitStatus(negObjRefs []negv1beta1.NegObjectRe
origNeg, err := getNegFromStore(s.svcNegLister, s.Namespace, s.NegSyncerKey.NegName)
if err != nil {
s.logger.Error(err, "Error updating init status for neg, failed to get neg from store.")
metrics.PublishNegControllerErrorCountMetrics(err, true)
return
}

Expand All @@ -728,6 +734,7 @@ func (s *transactionSyncer) updateInitStatus(negObjRefs []negv1beta1.NegObjectRe
_, err = patchNegStatus(s.svcNegClient, origNeg.Status, neg.Status, s.Namespace, s.NegSyncerKey.NegName)
if err != nil {
s.logger.Error(err, "Error updating Neg CR")
metrics.PublishNegControllerErrorCountMetrics(err, true)
}
}

Expand All @@ -739,6 +746,7 @@ func (s *transactionSyncer) updateStatus(syncErr error) {
origNeg, err := getNegFromStore(s.svcNegLister, s.Namespace, s.NegSyncerKey.NegName)
if err != nil {
s.logger.Error(err, "Error updating status for neg, failed to get neg from store")
metrics.PublishNegControllerErrorCountMetrics(err, true)
return
}
neg := origNeg.DeepCopy()
Expand All @@ -759,6 +767,7 @@ func (s *transactionSyncer) updateStatus(syncErr error) {
_, err = patchNegStatus(s.svcNegClient, origNeg.Status, neg.Status, s.Namespace, s.NegSyncerKey.NegName)
if err != nil {
s.logger.Error(err, "Error updating Neg CR")
metrics.PublishNegControllerErrorCountMetrics(err, true)
}
}

Expand All @@ -775,6 +784,7 @@ func (s *transactionSyncer) computeEPSStaleness(endpointSlices []*discovery.Endp
negCR, err := getNegFromStore(s.svcNegLister, s.Namespace, s.NegSyncerKey.NegName)
if err != nil {
s.logger.Error(err, "unable to retrieve neg from the store", "neg", klog.KRef(s.Namespace, s.NegName))
metrics.PublishNegControllerErrorCountMetrics(err, true)
return
}
lastSyncTimestamp := negCR.Status.LastSyncTime
Expand Down Expand Up @@ -906,6 +916,7 @@ func getEndpointPodLabelMap(endpoints map[string]negtypes.NetworkEndpointSet, en
if err != nil || !ok {
metrics.PublishLabelPropagationError(labels.OtherError)
logger.Error(err, "getEndpointPodLabelMap: error getting pod", "pod", key, "exist", ok)
metrics.PublishNegControllerErrorCountMetrics(err, true)
continue
}
pod, ok := obj.(*v1.Pod)
Expand All @@ -917,6 +928,7 @@ func getEndpointPodLabelMap(endpoints map[string]negtypes.NetworkEndpointSet, en
labelMap, err := labels.GetPodLabelMap(pod, lpConfig)
if err != nil {
recorder.Eventf(pod, apiv1.EventTypeWarning, "LabelsExceededLimit", "Label Propagation Error: %v", err)
metrics.PublishNegControllerErrorCountMetrics(err, true)
}
endpointPodLabelMap[endpoint] = labelMap
}
Expand Down
Loading

0 comments on commit 2a5ba46

Please sign in to comment.