Skip to content

Commit

Permalink
Add metrics for sync result
Browse files Browse the repository at this point in the history
Added metrics to collect the cumulative count of sync results.
  • Loading branch information
sawsa307 committed Feb 24, 2023
1 parent 81c79a6 commit ff4751f
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 72 deletions.
1 change: 1 addition & 0 deletions pkg/neg/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
wait.Until(c.gc, c.gcPeriod, stopCh)
}()
go c.reflector.Run(stopCh)
go c.syncerMetrics.Run(stopCh)
<-stopCh
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/neg/metrics/neg_metrics_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ func (sm *SyncerMetrics) export() {

// UpdateSyncer updates the status of the corresponding syncer based on the syncResult.
func (sm *SyncerMetrics) UpdateSyncer(key negtypes.NegSyncerKey, syncResult *negtypes.NegSyncResult) {
if syncResult.Result == negtypes.ResultInProgress {
return
}
syncerSyncResult.WithLabelValues(syncResult.Result).Inc()

sm.mu.Lock()
Expand Down
105 changes: 68 additions & 37 deletions pkg/neg/syncers/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,14 +193,17 @@ func (s *transactionSyncer) syncInternal() error {
defer s.syncLock.Unlock()

start := time.Now()
err := s.syncInternalImpl()
result := s.syncInternalImpl()

s.updateStatus(err)
metrics.PublishNegSyncMetrics(string(s.NegSyncerKey.NegType), string(s.endpointsCalculator.Mode()), err, start)
return err
s.updateStatus(result.Error)
metrics.PublishNegSyncMetrics(string(s.NegSyncerKey.NegType), string(s.endpointsCalculator.Mode()), result.Error, start)
if s.enableEndpointSlices && result.Error != nil {
s.syncCollector.UpdateSyncer(s.NegSyncerKey, result)
}
return result.Error
}

func (s *transactionSyncer) syncInternalImpl() error {
func (s *transactionSyncer) syncInternalImpl() *negtypes.NegSyncResult {
// TODO(cheungdavid): for now we reset the boolean so it is a no-op, but
// in the future, it will be used to trigger degraded mode if the syncer is in error state.
if s.inErrorState() {
Expand All @@ -209,20 +212,20 @@ func (s *transactionSyncer) syncInternalImpl() error {

if s.needInit || s.isZoneChange() {
if err := s.ensureNetworkEndpointGroups(); err != nil {
return err
return negtypes.NewNegSyncResult(err, negtypes.ResultNegNotFound)
}
s.needInit = false
}

if s.syncer.IsStopped() || s.syncer.IsShuttingDown() {
s.logger.V(3).Info("Skip syncing NEG", "negSyncerKey", s.NegSyncerKey.String())
return nil
return negtypes.NewNegSyncResult(nil, negtypes.ResultSuccess)
}
s.logger.V(2).Info("Sync NEG", "negSyncerKey", s.NegSyncerKey.String(), "endpointsCalculatorMode", s.endpointsCalculator.Mode())

currentMap, err := retrieveExistingZoneNetworkEndpointMap(s.NegSyncerKey.NegName, s.zoneGetter, s.cloud, s.NegSyncerKey.GetAPIVersion(), s.endpointsCalculator.Mode())
if err != nil {
return err
return negtypes.NewNegSyncResult(err, negtypes.ResultCurrentEPNotFound)
}
s.logStats(currentMap, "current NEG endpoints")

Expand All @@ -238,11 +241,11 @@ func (s *transactionSyncer) syncInternalImpl() error {
if s.enableEndpointSlices {
slices, err := s.endpointSliceLister.ByIndex(endpointslices.EndpointSlicesByServiceIndex, endpointslices.FormatEndpointSlicesServiceKey(s.Namespace, s.Name))
if err != nil {
return err
return negtypes.NewNegSyncResult(err, negtypes.ResultEPSNotFound)
}
if len(slices) < 1 {
s.logger.Error(nil, "Endpoint slices for the service doesn't exist. Skipping NEG sync")
return nil
return negtypes.NewNegSyncResult(nil, negtypes.ResultSuccess)
}
endpointSlices := make([]*discovery.EndpointSlice, len(slices))
negCR, err := getNegFromStore(s.svcNegLister, s.Namespace, s.NegSyncerKey.NegName)
Expand All @@ -267,14 +270,16 @@ func (s *transactionSyncer) syncInternalImpl() error {
}
endpointsData := negtypes.EndpointsDataFromEndpointSlices(endpointSlices)
targetMap, endpointPodMap, dupCount, err = s.endpointsCalculator.CalculateEndpoints(endpointsData, currentMap)
if valid, reason := s.isValidEPField(err); !valid {
s.setErrorState(reason)
if valid, result := s.CheckEPField(err); !valid {
s.setErrorState(result.Result)
return result
}
if valid, reason := s.isValidEndpointInfo(endpointsData, endpointPodMap, dupCount); !valid {
s.setErrorState(reason)
if valid, result := s.CheckEndpointInfo(endpointsData, endpointPodMap, dupCount); !valid {
s.setErrorState(result.Result)
return result
}
if err != nil {
return fmt.Errorf("endpoints calculation error in mode %q, err: %w", s.endpointsCalculator.Mode(), err)
return negtypes.NewNegSyncResult(fmt.Errorf("endpoints calculation error in mode %q, err: %w", s.endpointsCalculator.Mode(), err), negtypes.ResultOtherError)
}
} else {
ep, exists, err := s.endpointLister.Get(
Expand All @@ -286,16 +291,16 @@ func (s *transactionSyncer) syncInternalImpl() error {
},
)
if err != nil {
return err
return negtypes.NewNegSyncResult(err, negtypes.ResultOtherError)
}
if !exists {
s.logger.Info("Endpoint does not exist. Skipping NEG sync", "endpoint", klog.KRef(s.Namespace, s.Name))
return nil
return negtypes.NewNegSyncResult(nil, negtypes.ResultSuccess)
}
endpointsData := negtypes.EndpointsDataFromEndpoints(ep.(*apiv1.Endpoints))
targetMap, endpointPodMap, _, err = s.endpointsCalculator.CalculateEndpoints(endpointsData, currentMap)
if err != nil {
return fmt.Errorf("endpoints calculation error in mode %q, err: %w", s.endpointsCalculator.Mode(), err)
return negtypes.NewNegSyncResult(fmt.Errorf("endpoints calculation error in mode %q, err: %w", s.endpointsCalculator.Mode(), err), negtypes.ResultOtherError)
}
}

Expand All @@ -320,7 +325,7 @@ func (s *transactionSyncer) syncInternalImpl() error {

if len(addEndpoints) == 0 && len(removeEndpoints) == 0 {
s.logger.V(3).Info("No endpoint change. Skip syncing NEG. ", s.Namespace, s.Name)
return nil
return negtypes.NewNegSyncResult(nil, negtypes.ResultSuccess)
}
s.logEndpoints(addEndpoints, "adding endpoint")
s.logEndpoints(removeEndpoints, "removing endpoint")
Expand Down Expand Up @@ -384,19 +389,19 @@ func (s *transactionSyncer) ensureNetworkEndpointGroups() error {
return utilerrors.NewAggregate(errList)
}

// isValidEndpointInfo checks if endpoint information is correct.
// It returns false and the corresponding reason if one of the two checks fails:
// CheckEndpointInfo checks if endpoint information is correct.
// It returns false with the corresponding reason if one of the two checks fails:
//
// 1. The endpoint count from endpointData doesn't equal to the one from endpointPodMap:
// endpointPodMap removes the duplicated endpoints, and dupCount stores the number of duplicates it removed
// and we compare the endpoint counts with duplicates
// 2. The endpoint count from endpointData or the one from endpointPodMap is 0
func (s *transactionSyncer) isValidEndpointInfo(eds []negtypes.EndpointsData, endpointPodMap negtypes.EndpointPodMap, dupCount int) (bool, string) {
func (s *transactionSyncer) CheckEndpointInfo(eds []negtypes.EndpointsData, endpointPodMap negtypes.EndpointPodMap, dupCount int) (bool, *negtypes.NegSyncResult) {
// Endpoint count from EndpointPodMap
countFromPodMap := len(endpointPodMap) + dupCount
if countFromPodMap == 0 {
s.logger.Info("Detected endpoint count from endpointPodMap going to zero", "endpointPodMap", endpointPodMap)
return false, negtypes.ResultEPCalculationCountZero
return false, negtypes.NewNegSyncResult(negtypes.ErrEPCalculationCountZero, negtypes.ResultEPCalculationCountZero)
}

// Endpoint count from EndpointData
Expand All @@ -406,27 +411,35 @@ func (s *transactionSyncer) isValidEndpointInfo(eds []negtypes.EndpointsData, en
}
if countFromEndpointData == 0 {
s.logger.Info("Detected endpoint count from endpointData going to zero", "endpointData", eds)
return false, negtypes.ResultEPSEndpointCountZero
return false, negtypes.NewNegSyncResult(negtypes.ErrEPSEndpointCountZero, negtypes.ResultEPSEndpointCountZero)
}

if countFromEndpointData != countFromPodMap {
s.logger.Info("Detected error when comparing endpoint counts", "endpointData", eds, "endpointPodMap", endpointPodMap, "dupCount", dupCount)
return false, negtypes.ResultEPCountsDiffer
return false, negtypes.NewNegSyncResult(negtypes.ErrEPCountsDiffer, negtypes.ResultEPCountsDiffer)
}
return true, negtypes.ResultSuccess
return true, negtypes.NewNegSyncResult(nil, negtypes.ResultSuccess)
}

// isValidEPField returns false and the corresponding reason if there is endpoint with missing zone or nodeName
func (s *transactionSyncer) isValidEPField(err error) (bool, string) {
if errors.Is(err, ErrEPMissingNodeName) {
// CheckEPField checks whether the endpoints have valid fields.
// It returns the validity boolean together with the corresponding sync result.
func (s *transactionSyncer) CheckEPField(err error) (bool, *negtypes.NegSyncResult) {
if errors.Is(err, negtypes.ErrEPMissingNodeName) {
s.logger.Info("Detected unexpected error when checking missing nodeName", "error", err)
return false, negtypes.ResultEPMissingNodeName
return false, negtypes.NewNegSyncResult(err, negtypes.ResultEPMissingNodeName)
}
if errors.Is(err, ErrEPMissingZone) {
if errors.Is(err, negtypes.ErrEPMissingZone) {
s.logger.Info("Detected unexpected error when checking missing zone", "error", err)
return false, negtypes.ResultEPMissingZone
return false, negtypes.NewNegSyncResult(err, negtypes.ResultEPMissingZone)
}
if errors.Is(err, negtypes.ErrNodeNotFound) {
return true, negtypes.NewNegSyncResult(err, negtypes.ResultNodeNotFound)
}
if err != nil {
return true, negtypes.NewNegSyncResult(nil, negtypes.ResultOtherError)
} else {
return true, negtypes.NewNegSyncResult(nil, negtypes.ResultSuccess)
}
return true, negtypes.ResultSuccess
}

// isValidEPBatch returns false and the corresponding reason if the error from endpoint batch response is due to bad request
Expand All @@ -450,7 +463,7 @@ func (s *transactionSyncer) isValidEPBatch(err error, operation transactionOp, n
}

// syncNetworkEndpoints spins off go routines to execute NEG operations
func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints map[string]negtypes.NetworkEndpointSet) error {
func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints map[string]negtypes.NetworkEndpointSet) *negtypes.NegSyncResult {
var wg sync.WaitGroup

syncFunc := func(endpointMap map[string]negtypes.NetworkEndpointSet, operation transactionOp) error {
Expand Down Expand Up @@ -486,19 +499,37 @@ func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints m
}

if err := syncFunc(addEndpoints, attachOp); err != nil {
return err
return negtypes.NewNegSyncResult(err, negtypes.ResultOtherError)
}

if err := syncFunc(removeEndpoints, detachOp); err != nil {
return err
return negtypes.NewNegSyncResult(err, negtypes.ResultOtherError)
}
go s.collectSyncResult(&wg)
return nil
return negtypes.NewNegSyncResult(nil, negtypes.ResultInProgress)
}

// collectSyncResult blocks on wg, then maps the syncer's current error state
// to a NegSyncResult and reports it to the sync collector, which records the
// sync result metric.
//
// The syncLock is held while the error state is read and the result is
// reported.
func (s *transactionSyncer) collectSyncResult(wg *sync.WaitGroup) {
	// Do not take the lock until everything tracked by wg has finished.
	wg.Wait()

	s.syncLock.Lock()
	defer s.syncLock.Unlock()

	// Start from the success outcome; every non-empty error state below
	// overrides both the error and the result label.
	var (
		cause   error
		outcome = negtypes.ResultSuccess
	)
	switch s.errorState {
	case "":
		// No error state recorded: keep the success defaults.
	case negtypes.ResultInvalidAPIResponse:
		cause, outcome = negtypes.ErrInvalidAPIResponse, negtypes.ResultInvalidAPIResponse
	case negtypes.ResultInvalidEPAttach:
		cause, outcome = negtypes.ErrInvalidEPAttach, negtypes.ResultInvalidEPAttach
	case negtypes.ResultInvalidEPDetach:
		cause, outcome = negtypes.ErrInvalidEPDetach, negtypes.ResultInvalidEPDetach
	default:
		// Any error state this function does not recognize is reported as a
		// generic failure.
		cause, outcome = errors.New("Unknown error state value"), negtypes.ResultOtherError
	}

	s.syncCollector.UpdateSyncer(s.NegSyncerKey, negtypes.NewNegSyncResult(cause, outcome))
}

// attachNetworkEndpoints creates go routine to run operations for attaching network endpoints
Expand Down
Loading

0 comments on commit ff4751f

Please sign in to comment.