Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track GCE/K8s server error #2097

Merged
merged 1 commit into from
May 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions pkg/neg/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,12 @@ func NewController(
err := scheme.AddToScheme(negScheme)
if err != nil {
logger.Error(err, "Errored adding default scheme to event recorder")
metrics.PublishNegControllerErrorCountMetrics(err, true)
}
err = svcnegv1beta1.AddToScheme(negScheme)
if err != nil {
logger.Error(err, "Errored adding NEG CRD scheme to event recorder")
metrics.PublishNegControllerErrorCountMetrics(err, true)
}
recorder := eventBroadcaster.NewRecorder(negScheme,
apiv1.EventSource{Component: "neg-controller"})
Expand Down Expand Up @@ -401,6 +403,7 @@ func (c *Controller) processEndpoint(key string) {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
c.logger.Error(err, "Failed to split endpoint namespaced key", "key", key)
metrics.PublishNegControllerErrorCountMetrics(err, true)
return
}
c.manager.Sync(namespace, name)
Expand All @@ -416,6 +419,7 @@ func (c *Controller) serviceWorker() {
defer c.serviceQueue.Done(key)
err := c.processService(key.(string))
c.handleErr(err, key)
metrics.PublishNegControllerErrorCountMetrics(err, false)
}()
}
}
Expand Down Expand Up @@ -713,6 +717,7 @@ func (c *Controller) handleErr(err error, key interface{}) {
c.logger.Error(nil, msg)
if service, exists, err := c.serviceLister.GetByKey(key.(string)); err != nil {
c.logger.Error(err, "Failed to retrieve service from store", "service", key.(string))
metrics.PublishNegControllerErrorCountMetrics(err, true)
} else if exists {
c.recorder.Eventf(service.(*apiv1.Service), apiv1.EventTypeWarning, "ProcessServiceFailed", msg)
}
Expand All @@ -735,6 +740,7 @@ func (c *Controller) enqueueEndpointSlice(obj interface{}) {
key, err := endpointslices.EndpointSlicesServiceKey(endpointSlice)
if err != nil {
c.logger.Error(err, "Failed to find a service label inside endpoint slice", "endpointSlice", klog.KObj(endpointSlice))
metrics.PublishNegControllerErrorCountMetrics(err, true)
return
}
c.logger.V(3).Info("Adding EndpointSlice to endpointQueue for processing", "endpointSlice", key)
Expand All @@ -745,6 +751,7 @@ func (c *Controller) enqueueNode(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
c.logger.Error(err, "Failed to generate node key")
metrics.PublishNegControllerErrorCountMetrics(err, true)
return
}
c.logger.V(3).Info("Adding Node to nodeQueue for processing", "node", key)
Expand All @@ -755,6 +762,7 @@ func (c *Controller) enqueueService(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
c.logger.Error(err, "Failed to generate service key")
metrics.PublishNegControllerErrorCountMetrics(err, true)
return
}
c.logger.V(3).Info("Adding Service to serviceQueue for processing", "service", key)
Expand All @@ -777,6 +785,7 @@ func (c *Controller) enqueueIngressServices(ing *v1.Ingress) {
func (c *Controller) gc() {
if err := c.manager.GC(); err != nil {
c.logger.Error(err, "NEG controller garbage collection failed")
metrics.PublishNegControllerErrorCountMetrics(err, true)
}
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/neg/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ func updateZoneMap(existingZoneMap *map[string]struct{}, candidateNodePredicate
zones, err := zoneGetter.ListZones(candidateNodePredicate)
if err != nil {
logger.Error(err, "Unable to list zones")
metrics.PublishNegControllerErrorCountMetrics(err, true)
return false
}

Expand Down Expand Up @@ -407,6 +408,7 @@ func (manager *syncerManager) ReadinessGateEnabledNegs(namespace string, podLabe
obj, exists, err := manager.serviceLister.GetByKey(svcKey.Key())
if err != nil {
manager.logger.Error(err, "Failed to retrieve service from store", "service", svcKey.Key())
metrics.PublishNegControllerErrorCountMetrics(err, true)
continue
}

Expand Down Expand Up @@ -682,6 +684,7 @@ func (manager *syncerManager) ensureDeleteNetworkEndpointGroup(name, zone string
if err != nil {
if utils.IsNotFoundError(err) || utils.IsHTTPErrorCode(err, http.StatusBadRequest) {
manager.logger.V(2).Info("Ignoring error when querying for neg during GC", "negName", name, "zone", zone, "err", err)
metrics.PublishNegControllerErrorCountMetrics(err, true)
return nil
}
return err
Expand Down
42 changes: 42 additions & 0 deletions pkg/neg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/prometheus/client_golang/prometheus"
"k8s.io/ingress-gce/pkg/utils"
)

const (
Expand Down Expand Up @@ -49,6 +50,12 @@ const (
ipv6EndpointType = "IPv6"
dualStackEndpointType = "DualStack"
migrationEndpointType = "Migration"

gceServerError = "GCE_server_error"
k8sServerError = "K8s_server_error"
ignoredError = "ignored_error"
otherError = "other_error"
totalNegError = "total_neg_error"
)

type syncType string
Expand Down Expand Up @@ -211,6 +218,17 @@ var (
},
[]string{"endpoint_type"},
)

// NegControllerErrorCount tracks the count of server errors(GCE/K8s) and
// all errors from NEG controller.
NegControllerErrorCount = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: negControllerSubsystem,
Name: "error_count",
Help: "Counts of server errors and NEG controller errors.",
},
[]string{"error_type"},
)
)

var register sync.Once
Expand All @@ -234,6 +252,7 @@ func RegisterMetrics() {
prometheus.MustRegister(DualStackMigrationLongestUnfinishedDuration)
prometheus.MustRegister(DualStackMigrationServiceCount)
prometheus.MustRegister(SyncerCountByEndpointType)
prometheus.MustRegister(NegControllerErrorCount)

RegisterSyncerMetrics()
})
Expand Down Expand Up @@ -279,9 +298,32 @@ func PublishDegradedModeCorrectnessMetrics(count int, endpointType string, negTy
DegradeModeCorrectness.WithLabelValues(negType, endpointType).Observe(float64(count))
}

// PublishNegControllerErrorCountMetrics publishes collected metrics
// for neg controller errors.
func PublishNegControllerErrorCountMetrics(err error, isIgnored bool) {
if err == nil {
return
}
NegControllerErrorCount.WithLabelValues(totalNegError).Inc()
NegControllerErrorCount.WithLabelValues(getErrorLabel(err, isIgnored)).Inc()
}

func getResult(err error) string {
if err != nil {
return resultError
}
return resultSuccess
}

func getErrorLabel(err error, isIgnored bool) string {
if isIgnored {
return ignoredError
}
if utils.IsGCEServerError(err) {
return gceServerError
}
if utils.IsK8sServerError(err) {
return k8sServerError
}
return otherError
}
3 changes: 3 additions & 0 deletions pkg/neg/readiness/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/tools/cache"
"k8s.io/ingress-gce/pkg/composite"
"k8s.io/ingress-gce/pkg/neg/metrics"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
Expand Down Expand Up @@ -173,6 +174,7 @@ func (p *poller) Poll(key negMeta) (retry bool, err error) {
}

retry, err = p.processHealthStatus(key, res)
metrics.PublishNegControllerErrorCountMetrics(err, true)
if retry {
<-p.clock.After(hcRetryDelay)
}
Expand Down Expand Up @@ -293,6 +295,7 @@ func getHealthyBackendService(healthStatus *composite.NetworkEndpointWithHealthS
id, err := cloud.ParseResourceURL(hs.BackendService.BackendService)
if err != nil {
logger.Error(err, "Failed to parse backend service reference from a Network Endpoint health status", "healthStatus", healthStatus)
metrics.PublishNegControllerErrorCountMetrics(err, true)
continue
}
if id != nil {
Expand Down
3 changes: 3 additions & 0 deletions pkg/neg/readiness/reflector.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/ingress-gce/pkg/neg/metrics"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
"k8s.io/ingress-gce/pkg/neg/types/shared"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -225,6 +226,7 @@ func (r *readinessReflector) SyncPod(pod *v1.Pod) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(pod)
if err != nil {
r.logger.Error(err, "Failed to generate pod key")
metrics.PublishNegControllerErrorCountMetrics(err, true)
return
}

Expand Down Expand Up @@ -260,6 +262,7 @@ func (r *readinessReflector) pollNeg(key negMeta) {
retry, err := r.poller.Poll(key)
if err != nil {
r.logger.Error(err, "Failed to poll neg", "neg", key)
metrics.PublishNegControllerErrorCountMetrics(err, true)
}
if retry {
r.poll()
Expand Down
2 changes: 2 additions & 0 deletions pkg/neg/readiness/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"k8s.io/apimachinery/pkg/types"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/ingress-gce/pkg/neg/metrics"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
"k8s.io/ingress-gce/pkg/neg/types/shared"
"k8s.io/ingress-gce/pkg/utils/patch"
Expand Down Expand Up @@ -136,6 +137,7 @@ func removeIrrelevantEndpoints(endpointMap negtypes.EndpointPodMap, podLister ca
pod, exists, err := getPodFromStore(podLister, namespacedName.Namespace, namespacedName.Name)
if err != nil {
klog.Warningf("Failed to retrieve pod %q from store: %v", namespacedName.String(), err)
metrics.PublishNegControllerErrorCountMetrics(err, true)
}
if err == nil && exists && needToProcess(pod) {
continue
Expand Down
3 changes: 3 additions & 0 deletions pkg/neg/syncers/endpoints_calculator.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ func (l *LocalL4ILBEndpointsCalculator) CalculateEndpoints(eds []types.Endpoints
node, err := l.nodeLister.Get(*addr.NodeName)
if err != nil {
l.logger.Error(err, "failed to retrieve node object", "nodeName", *addr.NodeName)
metrics.PublishNegControllerErrorCountMetrics(err, true)
continue
}
if ok := candidateNodeCheck(node); !ok {
Expand All @@ -96,6 +97,7 @@ func (l *LocalL4ILBEndpointsCalculator) CalculateEndpoints(eds []types.Endpoints
zone, err := l.zoneGetter.GetZoneForNode(node.Name)
if err != nil {
l.logger.Error(err, "Unable to find zone for node, skipping", "nodeName", node.Name)
metrics.PublishNegControllerErrorCountMetrics(err, true)
continue
}
zoneNodeMap[zone] = append(zoneNodeMap[zone], node)
Expand Down Expand Up @@ -164,6 +166,7 @@ func (l *ClusterL4ILBEndpointsCalculator) CalculateEndpoints(_ []types.Endpoints
zone, err := l.zoneGetter.GetZoneForNode(node.Name)
if err != nil {
l.logger.Error(err, "Unable to find zone for node skipping", "nodeName", node.Name)
metrics.PublishNegControllerErrorCountMetrics(err, true)
continue
}
zoneNodeMap[zone] = append(zoneNodeMap[zone], node)
Expand Down
2 changes: 2 additions & 0 deletions pkg/neg/syncers/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
apiv1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/ingress-gce/pkg/neg/metrics"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
Expand Down Expand Up @@ -89,6 +90,7 @@ func (s *syncer) Start() error {
retryCh := make(<-chan time.Time)
err := s.core.sync()
if err != nil {
metrics.PublishNegControllerErrorCountMetrics(err, false)
delay, retryErr := s.backoff.NextRetryDelay()
retryMsg := ""
if retryErr == ErrRetriesExceeded {
Expand Down
12 changes: 12 additions & 0 deletions pkg/neg/syncers/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,7 @@ func (s *transactionSyncer) commitTransaction(err error, networkEndpointMap map[
// This is to prevent if the NEG object is deleted or misconfigured by user
s.needInit = true
needRetry = true
metrics.PublishNegControllerErrorCountMetrics(err, false)
}

for networkEndpoint := range networkEndpointMap {
Expand All @@ -588,6 +589,7 @@ func (s *transactionSyncer) commitTransaction(err error, networkEndpointMap map[
if needRetry {
if retryErr := s.retry.Retry(); retryErr != nil {
s.recordEvent(apiv1.EventTypeWarning, "RetryFailed", fmt.Sprintf("Failed to retry NEG sync for %q: %v", s.NegSyncerKey.String(), retryErr))
metrics.PublishNegControllerErrorCountMetrics(retryErr, false)
}
return
}
Expand Down Expand Up @@ -624,6 +626,7 @@ func (s *transactionSyncer) isZoneChange() bool {
negCR, err := getNegFromStore(s.svcNegLister, s.Namespace, s.NegSyncerKey.NegName)
if err != nil {
s.logger.Error(err, "unable to retrieve neg from the store", "neg", klog.KRef(s.Namespace, s.NegName))
metrics.PublishNegControllerErrorCountMetrics(err, true)
return false
}

Expand All @@ -632,6 +635,7 @@ func (s *transactionSyncer) isZoneChange() bool {
id, err := cloud.ParseResourceURL(ref.SelfLink)
if err != nil {
s.logger.Error(err, "unable to parse selflink", "selfLink", ref.SelfLink)
metrics.PublishNegControllerErrorCountMetrics(err, true)
continue
}
existingZones.Insert(id.Key.Zone)
Expand All @@ -640,6 +644,7 @@ func (s *transactionSyncer) isZoneChange() bool {
zones, err := s.zoneGetter.ListZones(negtypes.NodePredicateForEndpointCalculatorMode(s.EpCalculatorMode))
if err != nil {
s.logger.Error(err, "unable to list zones")
metrics.PublishNegControllerErrorCountMetrics(err, true)
return false
}
currZones := sets.NewString(zones...)
Expand Down Expand Up @@ -712,6 +717,7 @@ func (s *transactionSyncer) updateInitStatus(negObjRefs []negv1beta1.NegObjectRe
origNeg, err := getNegFromStore(s.svcNegLister, s.Namespace, s.NegSyncerKey.NegName)
if err != nil {
s.logger.Error(err, "Error updating init status for neg, failed to get neg from store.")
metrics.PublishNegControllerErrorCountMetrics(err, true)
return
}

Expand All @@ -728,6 +734,7 @@ func (s *transactionSyncer) updateInitStatus(negObjRefs []negv1beta1.NegObjectRe
_, err = patchNegStatus(s.svcNegClient, origNeg.Status, neg.Status, s.Namespace, s.NegSyncerKey.NegName)
if err != nil {
s.logger.Error(err, "Error updating Neg CR")
metrics.PublishNegControllerErrorCountMetrics(err, true)
}
}

Expand All @@ -739,6 +746,7 @@ func (s *transactionSyncer) updateStatus(syncErr error) {
origNeg, err := getNegFromStore(s.svcNegLister, s.Namespace, s.NegSyncerKey.NegName)
if err != nil {
s.logger.Error(err, "Error updating status for neg, failed to get neg from store")
metrics.PublishNegControllerErrorCountMetrics(err, true)
return
}
neg := origNeg.DeepCopy()
Expand All @@ -759,6 +767,7 @@ func (s *transactionSyncer) updateStatus(syncErr error) {
_, err = patchNegStatus(s.svcNegClient, origNeg.Status, neg.Status, s.Namespace, s.NegSyncerKey.NegName)
if err != nil {
s.logger.Error(err, "Error updating Neg CR")
metrics.PublishNegControllerErrorCountMetrics(err, true)
}
}

Expand All @@ -775,6 +784,7 @@ func (s *transactionSyncer) computeEPSStaleness(endpointSlices []*discovery.Endp
negCR, err := getNegFromStore(s.svcNegLister, s.Namespace, s.NegSyncerKey.NegName)
if err != nil {
s.logger.Error(err, "unable to retrieve neg from the store", "neg", klog.KRef(s.Namespace, s.NegName))
metrics.PublishNegControllerErrorCountMetrics(err, true)
return
}
lastSyncTimestamp := negCR.Status.LastSyncTime
Expand Down Expand Up @@ -906,6 +916,7 @@ func getEndpointPodLabelMap(endpoints map[string]negtypes.NetworkEndpointSet, en
if err != nil || !ok {
metrics.PublishLabelPropagationError(labels.OtherError)
logger.Error(err, "getEndpointPodLabelMap: error getting pod", "pod", key, "exist", ok)
metrics.PublishNegControllerErrorCountMetrics(err, true)
continue
}
pod, ok := obj.(*v1.Pod)
Expand All @@ -917,6 +928,7 @@ func getEndpointPodLabelMap(endpoints map[string]negtypes.NetworkEndpointSet, en
labelMap, err := labels.GetPodLabelMap(pod, lpConfig)
if err != nil {
recorder.Eventf(pod, apiv1.EventTypeWarning, "LabelsExceededLimit", "Label Propagation Error: %v", err)
metrics.PublishNegControllerErrorCountMetrics(err, true)
}
endpointPodLabelMap[endpoint] = labelMap
}
Expand Down
Loading