Skip to content

Commit

Permalink
Merge pull request #1982 from sawsa307/revert-1911-neg-sync-result-me…
Browse files Browse the repository at this point in the history
…tric

Revert "Add metrics for sync result"
  • Loading branch information
k8s-ci-robot authored Feb 28, 2023
2 parents 1a13d16 + 451c47f commit 8da6eb2
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 110 deletions.
1 change: 0 additions & 1 deletion pkg/neg/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,6 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
wait.Until(c.gc, c.gcPeriod, stopCh)
}()
go c.reflector.Run(stopCh)
go c.syncerMetrics.Run(stopCh)
<-stopCh
}

Expand Down
22 changes: 0 additions & 22 deletions pkg/neg/metrics/neg_metrics_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,11 @@ import (
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"k8s.io/apimachinery/pkg/util/wait"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
"k8s.io/klog/v2"
)

var (
syncResultLabel = "result"
syncResultKey = "sync_result"

// syncerSyncResult tracks the count for each sync result
syncerSyncResult = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: negControllerSubsystem,
Name: syncResultKey,
Help: "Current count for each sync result",
},
[]string{syncResultLabel},
)
)

type SyncerMetricsCollector interface {
UpdateSyncer(key negtypes.NegSyncerKey, result *negtypes.NegSyncResult)
SetSyncerEPMetrics(key negtypes.NegSyncerKey, epState *negtypes.SyncerEPStat)
Expand Down Expand Up @@ -79,7 +63,6 @@ func FakeSyncerMetrics() *SyncerMetrics {

// RegisterSyncerMetrics registers syncer related metrics
func RegisterSyncerMetrics() {
prometheus.MustRegister(syncerSyncResult)
}

func (sm *SyncerMetrics) Run(stopCh <-chan struct{}) {
Expand All @@ -98,11 +81,6 @@ func (sm *SyncerMetrics) export() {

// UpdateSyncer update the status of corresponding syncer based on the syncResult.
func (sm *SyncerMetrics) UpdateSyncer(key negtypes.NegSyncerKey, syncResult *negtypes.NegSyncResult) {
if syncResult.Result == negtypes.ResultInProgress {
return
}
syncerSyncResult.WithLabelValues(syncResult.Result).Inc()

sm.mu.Lock()
defer sm.mu.Unlock()
if sm.syncerStatusMap == nil {
Expand Down
81 changes: 34 additions & 47 deletions pkg/neg/syncers/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,17 +186,14 @@ func (s *transactionSyncer) syncInternal() error {
defer s.syncLock.Unlock()

start := time.Now()
result := s.syncInternalImpl()
err := s.syncInternalImpl()

s.updateStatus(result.Error)
metrics.PublishNegSyncMetrics(string(s.NegSyncerKey.NegType), string(s.endpointsCalculator.Mode()), result.Error, start)
if result.Error != nil {
s.syncCollector.UpdateSyncer(s.NegSyncerKey, result)
}
return result.Error
s.updateStatus(err)
metrics.PublishNegSyncMetrics(string(s.NegSyncerKey.NegType), string(s.endpointsCalculator.Mode()), err, start)
return err
}

func (s *transactionSyncer) syncInternalImpl() *negtypes.NegSyncResult {
func (s *transactionSyncer) syncInternalImpl() error {
// TODO(cheungdavid): for now we reset the boolean so it is a no-op, but
// in the future, it will be used to trigger degraded mode if the syncer is in error state.
if s.inErrorState() {
Expand All @@ -205,20 +202,20 @@ func (s *transactionSyncer) syncInternalImpl() *negtypes.NegSyncResult {

if s.needInit || s.isZoneChange() {
if err := s.ensureNetworkEndpointGroups(); err != nil {
return negtypes.NewNegSyncResult(err, negtypes.ResultNegNotFound)
return err
}
s.needInit = false
}

if s.syncer.IsStopped() || s.syncer.IsShuttingDown() {
s.logger.V(3).Info("Skip syncing NEG", "negSyncerKey", s.NegSyncerKey.String())
return negtypes.NewNegSyncResult(nil, negtypes.ResultSuccess)
return nil
}
s.logger.V(2).Info("Sync NEG", "negSyncerKey", s.NegSyncerKey.String(), "endpointsCalculatorMode", s.endpointsCalculator.Mode())

currentMap, err := retrieveExistingZoneNetworkEndpointMap(s.NegSyncerKey.NegName, s.zoneGetter, s.cloud, s.NegSyncerKey.GetAPIVersion(), s.endpointsCalculator.Mode())
if err != nil {
return negtypes.NewNegSyncResult(err, negtypes.ResultCurrentEPNotFound)
return err
}
s.logStats(currentMap, "current NEG endpoints")

Expand All @@ -233,11 +230,11 @@ func (s *transactionSyncer) syncInternalImpl() *negtypes.NegSyncResult {

slices, err := s.endpointSliceLister.ByIndex(endpointslices.EndpointSlicesByServiceIndex, endpointslices.FormatEndpointSlicesServiceKey(s.Namespace, s.Name))
if err != nil {
return negtypes.NewNegSyncResult(err, negtypes.ResultEPSNotFound)
return err
}
if len(slices) < 1 {
s.logger.Error(nil, "Endpoint slices for the service doesn't exist. Skipping NEG sync")
return negtypes.NewNegSyncResult(nil, negtypes.ResultSuccess)
return nil
}
endpointSlices := make([]*discovery.EndpointSlice, len(slices))
negCR, err := getNegFromStore(s.svcNegLister, s.Namespace, s.NegSyncerKey.NegName)
Expand All @@ -262,16 +259,14 @@ func (s *transactionSyncer) syncInternalImpl() *negtypes.NegSyncResult {
}
endpointsData := negtypes.EndpointsDataFromEndpointSlices(endpointSlices)
targetMap, endpointPodMap, dupCount, err = s.endpointsCalculator.CalculateEndpoints(endpointsData, currentMap)
if valid, result := s.CheckEPField(err); !valid {
s.setErrorState(result.Result)
return result
if valid, reason := s.isValidEPField(err); !valid {
s.setErrorState(reason)
}
if valid, result := s.CheckEndpointInfo(endpointsData, endpointPodMap, dupCount); !valid {
s.setErrorState(result.Result)
return result
if valid, reason := s.isValidEndpointInfo(endpointsData, endpointPodMap, dupCount); !valid {
s.setErrorState(reason)
}
if err != nil {
return negtypes.NewNegSyncResult(fmt.Errorf("endpoints calculation error in mode %q, err: %w", s.endpointsCalculator.Mode(), err), negtypes.ResultOtherError)
return fmt.Errorf("endpoints calculation error in mode %q, err: %w", s.endpointsCalculator.Mode(), err)
}

s.logStats(targetMap, "desired NEG endpoints")
Expand All @@ -295,7 +290,7 @@ func (s *transactionSyncer) syncInternalImpl() *negtypes.NegSyncResult {

if len(addEndpoints) == 0 && len(removeEndpoints) == 0 {
s.logger.V(3).Info("No endpoint change. Skip syncing NEG. ", s.Namespace, s.Name)
return negtypes.NewNegSyncResult(nil, negtypes.ResultSuccess)
return nil
}
s.logEndpoints(addEndpoints, "adding endpoint")
s.logEndpoints(removeEndpoints, "removing endpoint")
Expand Down Expand Up @@ -359,19 +354,19 @@ func (s *transactionSyncer) ensureNetworkEndpointGroups() error {
return utilerrors.NewAggregate(errList)
}

// CheckEndpointInfo checks if endpoint information is correct.
// It returns false with the corresponding reason if one of the two checks fails:
// isValidEndpointInfo checks if endpoint information is correct.
// It returns false and the corresponding reason if one of the two checks fails:
//
// 1. The endpoint count from endpointData doesn't equal to the one from endpointPodMap:
// endpiontPodMap removes the duplicated endpoints, and dupCount stores the number of duplicated it removed
// and we compare the endpoint counts with duplicates
// 2. The endpoint count from endpointData or the one from endpointPodMap is 0
func (s *transactionSyncer) CheckEndpointInfo(eds []negtypes.EndpointsData, endpointPodMap negtypes.EndpointPodMap, dupCount int) (bool, *negtypes.NegSyncResult) {
func (s *transactionSyncer) isValidEndpointInfo(eds []negtypes.EndpointsData, endpointPodMap negtypes.EndpointPodMap, dupCount int) (bool, string) {
// Endpoint count from EndpointPodMap
countFromPodMap := len(endpointPodMap) + dupCount
if countFromPodMap == 0 {
s.logger.Info("Detected endpoint count from endpointPodMap going to zero", "endpointPodMap", endpointPodMap)
return false, negtypes.NewNegSyncResult(negtypes.ErrEPCalculationCountZero, negtypes.ResultEPCalculationCountZero)
return false, negtypes.ResultEPCalculationCountZero
}

// Endpoint count from EndpointData
Expand All @@ -381,35 +376,27 @@ func (s *transactionSyncer) CheckEndpointInfo(eds []negtypes.EndpointsData, endp
}
if countFromEndpointData == 0 {
s.logger.Info("Detected endpoint count from endpointData going to zero", "endpointData", eds)
return false, negtypes.NewNegSyncResult(negtypes.ErrEPSEndpointCountZero, negtypes.ResultEPSEndpointCountZero)
return false, negtypes.ResultEPSEndpointCountZero
}

if countFromEndpointData != countFromPodMap {
s.logger.Info("Detected error when comparing endpoint counts", "endpointData", eds, "endpointPodMap", endpointPodMap, "dupCount", dupCount)
return false, negtypes.NewNegSyncResult(negtypes.ErrEPCountsDiffer, negtypes.ResultEPCountsDiffer)
return false, negtypes.ResultEPCountsDiffer
}
return true, negtypes.NewNegSyncResult(nil, negtypes.ResultSuccess)
return true, negtypes.ResultSuccess
}

// CheckEPField checks if endpoints have valid field
// It return the result boolean with the corresponding reason
func (s *transactionSyncer) CheckEPField(err error) (bool, *negtypes.NegSyncResult) {
if errors.Is(err, negtypes.ErrEPMissingNodeName) {
// isValidEPField returns false and the corresponding reason if there is endpoint with missing zone or nodeName
func (s *transactionSyncer) isValidEPField(err error) (bool, string) {
if errors.Is(err, ErrEPMissingNodeName) {
s.logger.Info("Detected unexpected error when checking missing nodeName", "error", err)
return false, negtypes.NewNegSyncResult(err, negtypes.ResultEPMissingNodeName)
return false, negtypes.ResultEPMissingNodeName
}
if errors.Is(err, negtypes.ErrEPMissingZone) {
if errors.Is(err, ErrEPMissingZone) {
s.logger.Info("Detected unexpected error when checking missing zone", "error", err)
return false, negtypes.NewNegSyncResult(err, negtypes.ResultEPMissingZone)
}
if errors.Is(err, negtypes.ErrNodeNotFound) {
return true, negtypes.NewNegSyncResult(err, negtypes.ResultNodeNotFound)
}
if err != nil {
return true, negtypes.NewNegSyncResult(nil, negtypes.ResultOtherError)
} else {
return true, negtypes.NewNegSyncResult(nil, negtypes.ResultSuccess)
return false, negtypes.ResultEPMissingZone
}
return true, negtypes.ResultSuccess
}

// isValidEPBatch returns false and the corresponding reason if the error from endpoint batch response is due to bad request
Expand All @@ -433,7 +420,7 @@ func (s *transactionSyncer) isValidEPBatch(err error, operation transactionOp, n
}

// syncNetworkEndpoints spins off go routines to execute NEG operations
func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints map[string]negtypes.NetworkEndpointSet) *negtypes.NegSyncResult {
func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints map[string]negtypes.NetworkEndpointSet) error {
var wg sync.WaitGroup

syncFunc := func(endpointMap map[string]negtypes.NetworkEndpointSet, operation transactionOp) error {
Expand Down Expand Up @@ -469,14 +456,14 @@ func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints m
}

if err := syncFunc(addEndpoints, attachOp); err != nil {
return negtypes.NewNegSyncResult(err, negtypes.ResultOtherError)
return err
}

if err := syncFunc(removeEndpoints, detachOp); err != nil {
return negtypes.NewNegSyncResult(err, negtypes.ResultOtherError)
return err
}
go s.collectSyncResult(&wg)
return negtypes.NewNegSyncResult(nil, negtypes.ResultInProgress)
return nil
}

// collectSyncResult collects the result of the sync and emits the metrics for sync result
Expand Down
Loading

0 comments on commit 8da6eb2

Please sign in to comment.