Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add metrics for sync result #1911

Merged
merged 3 commits into from
Feb 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/neg/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
wait.Until(c.gc, c.gcPeriod, stopCh)
}()
go c.reflector.Run(stopCh)
go c.syncerMetrics.Run(stopCh)
<-stopCh
}

Expand Down
22 changes: 22 additions & 0 deletions pkg/neg/metrics/neg_metrics_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,27 @@ import (
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"k8s.io/apimachinery/pkg/util/wait"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
"k8s.io/klog/v2"
)

var (
syncResultLabel = "result"
syncResultKey = "sync_result"

// syncerSyncResult tracks the count for each sync result
syncerSyncResult = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: negControllerSubsystem,
Name: syncResultKey,
Help: "Current count for each sync result",
},
[]string{syncResultLabel},
)
)

type SyncerMetricsCollector interface {
UpdateSyncer(key negtypes.NegSyncerKey, result *negtypes.NegSyncResult)
}
Expand Down Expand Up @@ -56,6 +72,7 @@ func FakeSyncerMetrics() *SyncerMetrics {

// RegisterSyncerMetrics registers syncer related metrics
func RegisterSyncerMetrics() {
prometheus.MustRegister(syncerSyncResult)
}

func (sm *SyncerMetrics) Run(stopCh <-chan struct{}) {
Expand All @@ -74,6 +91,11 @@ func (sm *SyncerMetrics) export() {

// UpdateSyncer update the status of corresponding syncer based on the syncResult.
func (sm *SyncerMetrics) UpdateSyncer(key negtypes.NegSyncerKey, syncResult *negtypes.NegSyncResult) {
if syncResult.Result == negtypes.ResultInProgress {
return
}
syncerSyncResult.WithLabelValues(syncResult.Result).Inc()

sm.mu.Lock()
defer sm.mu.Unlock()
if sm.syncerStatusMap == nil {
Expand Down
105 changes: 68 additions & 37 deletions pkg/neg/syncers/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,14 +193,17 @@ func (s *transactionSyncer) syncInternal() error {
defer s.syncLock.Unlock()

start := time.Now()
err := s.syncInternalImpl()
result := s.syncInternalImpl()

s.updateStatus(err)
metrics.PublishNegSyncMetrics(string(s.NegSyncerKey.NegType), string(s.endpointsCalculator.Mode()), err, start)
return err
s.updateStatus(result.Error)
metrics.PublishNegSyncMetrics(string(s.NegSyncerKey.NegType), string(s.endpointsCalculator.Mode()), result.Error, start)
if s.enableEndpointSlices && result.Error != nil {
s.syncCollector.UpdateSyncer(s.NegSyncerKey, result)
}
return result.Error
}

func (s *transactionSyncer) syncInternalImpl() error {
func (s *transactionSyncer) syncInternalImpl() *negtypes.NegSyncResult {
// TODO(cheungdavid): for now we reset the boolean so it is a no-op, but
// in the future, it will be used to trigger degraded mode if the syncer is in error state.
if s.inErrorState() {
Expand All @@ -209,20 +212,20 @@ func (s *transactionSyncer) syncInternalImpl() error {

if s.needInit || s.isZoneChange() {
if err := s.ensureNetworkEndpointGroups(); err != nil {
return err
return negtypes.NewNegSyncResult(err, negtypes.ResultNegNotFound)
}
s.needInit = false
}

if s.syncer.IsStopped() || s.syncer.IsShuttingDown() {
s.logger.V(3).Info("Skip syncing NEG", "negSyncerKey", s.NegSyncerKey.String())
return nil
return negtypes.NewNegSyncResult(nil, negtypes.ResultSuccess)
}
s.logger.V(2).Info("Sync NEG", "negSyncerKey", s.NegSyncerKey.String(), "endpointsCalculatorMode", s.endpointsCalculator.Mode())

currentMap, err := retrieveExistingZoneNetworkEndpointMap(s.NegSyncerKey.NegName, s.zoneGetter, s.cloud, s.NegSyncerKey.GetAPIVersion(), s.endpointsCalculator.Mode())
if err != nil {
return err
return negtypes.NewNegSyncResult(err, negtypes.ResultCurrentEPNotFound)
}
s.logStats(currentMap, "current NEG endpoints")

Expand All @@ -238,11 +241,11 @@ func (s *transactionSyncer) syncInternalImpl() error {
if s.enableEndpointSlices {
slices, err := s.endpointSliceLister.ByIndex(endpointslices.EndpointSlicesByServiceIndex, endpointslices.FormatEndpointSlicesServiceKey(s.Namespace, s.Name))
if err != nil {
return err
return negtypes.NewNegSyncResult(err, negtypes.ResultEPSNotFound)
}
if len(slices) < 1 {
s.logger.Error(nil, "Endpoint slices for the service doesn't exist. Skipping NEG sync")
return nil
return negtypes.NewNegSyncResult(nil, negtypes.ResultSuccess)
}
endpointSlices := make([]*discovery.EndpointSlice, len(slices))
negCR, err := getNegFromStore(s.svcNegLister, s.Namespace, s.NegSyncerKey.NegName)
Expand All @@ -267,14 +270,16 @@ func (s *transactionSyncer) syncInternalImpl() error {
}
endpointsData := negtypes.EndpointsDataFromEndpointSlices(endpointSlices)
targetMap, endpointPodMap, dupCount, err = s.endpointsCalculator.CalculateEndpoints(endpointsData, currentMap)
if valid, reason := s.isValidEPField(err); !valid {
s.setErrorState(reason)
if valid, result := s.CheckEPField(err); !valid {
s.setErrorState(result.Result)
return result
}
if valid, reason := s.isValidEndpointInfo(endpointsData, endpointPodMap, dupCount); !valid {
s.setErrorState(reason)
if valid, result := s.CheckEndpointInfo(endpointsData, endpointPodMap, dupCount); !valid {
s.setErrorState(result.Result)
return result
}
if err != nil {
return fmt.Errorf("endpoints calculation error in mode %q, err: %w", s.endpointsCalculator.Mode(), err)
return negtypes.NewNegSyncResult(fmt.Errorf("endpoints calculation error in mode %q, err: %w", s.endpointsCalculator.Mode(), err), negtypes.ResultOtherError)
}
} else {
ep, exists, err := s.endpointLister.Get(
Expand All @@ -286,16 +291,16 @@ func (s *transactionSyncer) syncInternalImpl() error {
},
)
if err != nil {
return err
return negtypes.NewNegSyncResult(err, negtypes.ResultOtherError)
}
if !exists {
s.logger.Info("Endpoint does not exist. Skipping NEG sync", "endpoint", klog.KRef(s.Namespace, s.Name))
return nil
return negtypes.NewNegSyncResult(nil, negtypes.ResultSuccess)
}
endpointsData := negtypes.EndpointsDataFromEndpoints(ep.(*apiv1.Endpoints))
targetMap, endpointPodMap, _, err = s.endpointsCalculator.CalculateEndpoints(endpointsData, currentMap)
if err != nil {
return fmt.Errorf("endpoints calculation error in mode %q, err: %w", s.endpointsCalculator.Mode(), err)
return negtypes.NewNegSyncResult(fmt.Errorf("endpoints calculation error in mode %q, err: %w", s.endpointsCalculator.Mode(), err), negtypes.ResultOtherError)
}
}

Expand All @@ -320,7 +325,7 @@ func (s *transactionSyncer) syncInternalImpl() error {

if len(addEndpoints) == 0 && len(removeEndpoints) == 0 {
s.logger.V(3).Info("No endpoint change. Skip syncing NEG. ", s.Namespace, s.Name)
return nil
return negtypes.NewNegSyncResult(nil, negtypes.ResultSuccess)
}
s.logEndpoints(addEndpoints, "adding endpoint")
s.logEndpoints(removeEndpoints, "removing endpoint")
Expand Down Expand Up @@ -384,19 +389,19 @@ func (s *transactionSyncer) ensureNetworkEndpointGroups() error {
return utilerrors.NewAggregate(errList)
}

// isValidEndpointInfo checks if endpoint information is correct.
// It returns false and the corresponding reason if one of the two checks fails:
// CheckEndpointInfo checks if endpoint information is correct.
// It returns false with the corresponding reason if one of the two checks fails:
//
// 1. The endpoint count from endpointData doesn't equal to the one from endpointPodMap:
// endpiontPodMap removes the duplicated endpoints, and dupCount stores the number of duplicated it removed
// and we compare the endpoint counts with duplicates
// 2. The endpoint count from endpointData or the one from endpointPodMap is 0
func (s *transactionSyncer) isValidEndpointInfo(eds []negtypes.EndpointsData, endpointPodMap negtypes.EndpointPodMap, dupCount int) (bool, string) {
func (s *transactionSyncer) CheckEndpointInfo(eds []negtypes.EndpointsData, endpointPodMap negtypes.EndpointPodMap, dupCount int) (bool, *negtypes.NegSyncResult) {
// Endpoint count from EndpointPodMap
countFromPodMap := len(endpointPodMap) + dupCount
if countFromPodMap == 0 {
s.logger.Info("Detected endpoint count from endpointPodMap going to zero", "endpointPodMap", endpointPodMap)
return false, negtypes.ResultEPCalculationCountZero
return false, negtypes.NewNegSyncResult(negtypes.ErrEPCalculationCountZero, negtypes.ResultEPCalculationCountZero)
}

// Endpoint count from EndpointData
Expand All @@ -406,27 +411,35 @@ func (s *transactionSyncer) isValidEndpointInfo(eds []negtypes.EndpointsData, en
}
if countFromEndpointData == 0 {
s.logger.Info("Detected endpoint count from endpointData going to zero", "endpointData", eds)
return false, negtypes.ResultEPSEndpointCountZero
return false, negtypes.NewNegSyncResult(negtypes.ErrEPSEndpointCountZero, negtypes.ResultEPSEndpointCountZero)
}

if countFromEndpointData != countFromPodMap {
s.logger.Info("Detected error when comparing endpoint counts", "endpointData", eds, "endpointPodMap", endpointPodMap, "dupCount", dupCount)
return false, negtypes.ResultEPCountsDiffer
return false, negtypes.NewNegSyncResult(negtypes.ErrEPCountsDiffer, negtypes.ResultEPCountsDiffer)
}
return true, negtypes.ResultSuccess
return true, negtypes.NewNegSyncResult(nil, negtypes.ResultSuccess)
}

// isValidEPField returns false and the corresponding reason if there is endpoint with missing zone or nodeName
func (s *transactionSyncer) isValidEPField(err error) (bool, string) {
if errors.Is(err, ErrEPMissingNodeName) {
// CheckEPField checks if endpoints have valid field
// It return the result boolean with the corresponding reason
func (s *transactionSyncer) CheckEPField(err error) (bool, *negtypes.NegSyncResult) {
if errors.Is(err, negtypes.ErrEPMissingNodeName) {
s.logger.Info("Detected unexpected error when checking missing nodeName", "error", err)
return false, negtypes.ResultEPMissingNodeName
return false, negtypes.NewNegSyncResult(err, negtypes.ResultEPMissingNodeName)
}
if errors.Is(err, ErrEPMissingZone) {
if errors.Is(err, negtypes.ErrEPMissingZone) {
s.logger.Info("Detected unexpected error when checking missing zone", "error", err)
return false, negtypes.ResultEPMissingZone
return false, negtypes.NewNegSyncResult(err, negtypes.ResultEPMissingZone)
}
if errors.Is(err, negtypes.ErrNodeNotFound) {
return true, negtypes.NewNegSyncResult(err, negtypes.ResultNodeNotFound)
}
if err != nil {
return true, negtypes.NewNegSyncResult(nil, negtypes.ResultOtherError)
} else {
return true, negtypes.NewNegSyncResult(nil, negtypes.ResultSuccess)
}
return true, negtypes.ResultSuccess
}

// isValidEPBatch returns false and the corresponding reason if the error from endpoint batch response is due to bad request
Expand All @@ -450,7 +463,7 @@ func (s *transactionSyncer) isValidEPBatch(err error, operation transactionOp, n
}

// syncNetworkEndpoints spins off go routines to execute NEG operations
func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints map[string]negtypes.NetworkEndpointSet) error {
func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints map[string]negtypes.NetworkEndpointSet) *negtypes.NegSyncResult {
var wg sync.WaitGroup

syncFunc := func(endpointMap map[string]negtypes.NetworkEndpointSet, operation transactionOp) error {
Expand Down Expand Up @@ -486,19 +499,37 @@ func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints m
}

if err := syncFunc(addEndpoints, attachOp); err != nil {
return err
return negtypes.NewNegSyncResult(err, negtypes.ResultOtherError)
}

if err := syncFunc(removeEndpoints, detachOp); err != nil {
return err
return negtypes.NewNegSyncResult(err, negtypes.ResultOtherError)
}
go s.collectSyncResult(&wg)
return nil
return negtypes.NewNegSyncResult(nil, negtypes.ResultInProgress)
}

// collectSyncResult collects the result of the sync and emits the metrics for sync result
func (s *transactionSyncer) collectSyncResult(wg *sync.WaitGroup) {
wg.Wait()
s.syncLock.Lock()
defer s.syncLock.Unlock()

var syncResult *negtypes.NegSyncResult
switch s.errorState {
case "":
syncResult = negtypes.NewNegSyncResult(nil, negtypes.ResultSuccess)
case negtypes.ResultInvalidAPIResponse:
syncResult = negtypes.NewNegSyncResult(negtypes.ErrInvalidAPIResponse, negtypes.ResultInvalidAPIResponse)
case negtypes.ResultInvalidEPAttach:
syncResult = negtypes.NewNegSyncResult(negtypes.ErrInvalidEPAttach, negtypes.ResultInvalidEPAttach)
case negtypes.ResultInvalidEPDetach:
syncResult = negtypes.NewNegSyncResult(negtypes.ErrInvalidEPDetach, negtypes.ResultInvalidEPDetach)
default:
syncResult = negtypes.NewNegSyncResult(errors.New("Unknown error state value"), negtypes.ResultOtherError)
}

s.syncCollector.UpdateSyncer(s.NegSyncerKey, syncResult)
}

// attachNetworkEndpoints creates go routine to run operations for attaching network endpoints
Expand Down
Loading