Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Add metrics for sync result" #1982

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion pkg/neg/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,6 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
wait.Until(c.gc, c.gcPeriod, stopCh)
}()
go c.reflector.Run(stopCh)
go c.syncerMetrics.Run(stopCh)
<-stopCh
}

Expand Down
22 changes: 0 additions & 22 deletions pkg/neg/metrics/neg_metrics_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,11 @@ import (
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"k8s.io/apimachinery/pkg/util/wait"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
"k8s.io/klog/v2"
)

var (
syncResultLabel = "result"
syncResultKey = "sync_result"

// syncerSyncResult tracks the count for each sync result
syncerSyncResult = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: negControllerSubsystem,
Name: syncResultKey,
Help: "Current count for each sync result",
},
[]string{syncResultLabel},
)
)

type SyncerMetricsCollector interface {
UpdateSyncer(key negtypes.NegSyncerKey, result *negtypes.NegSyncResult)
SetSyncerEPMetrics(key negtypes.NegSyncerKey, epState *negtypes.SyncerEPStat)
Expand Down Expand Up @@ -79,7 +63,6 @@ func FakeSyncerMetrics() *SyncerMetrics {

// RegisterSyncerMetrics registers syncer related metrics
func RegisterSyncerMetrics() {
prometheus.MustRegister(syncerSyncResult)
}

func (sm *SyncerMetrics) Run(stopCh <-chan struct{}) {
Expand All @@ -98,11 +81,6 @@ func (sm *SyncerMetrics) export() {

// UpdateSyncer updates the status of the corresponding syncer based on the syncResult.
func (sm *SyncerMetrics) UpdateSyncer(key negtypes.NegSyncerKey, syncResult *negtypes.NegSyncResult) {
if syncResult.Result == negtypes.ResultInProgress {
return
}
syncerSyncResult.WithLabelValues(syncResult.Result).Inc()

sm.mu.Lock()
defer sm.mu.Unlock()
if sm.syncerStatusMap == nil {
Expand Down
81 changes: 34 additions & 47 deletions pkg/neg/syncers/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,17 +186,14 @@ func (s *transactionSyncer) syncInternal() error {
defer s.syncLock.Unlock()

start := time.Now()
result := s.syncInternalImpl()
err := s.syncInternalImpl()

s.updateStatus(result.Error)
metrics.PublishNegSyncMetrics(string(s.NegSyncerKey.NegType), string(s.endpointsCalculator.Mode()), result.Error, start)
if result.Error != nil {
s.syncCollector.UpdateSyncer(s.NegSyncerKey, result)
}
return result.Error
s.updateStatus(err)
metrics.PublishNegSyncMetrics(string(s.NegSyncerKey.NegType), string(s.endpointsCalculator.Mode()), err, start)
return err
}

func (s *transactionSyncer) syncInternalImpl() *negtypes.NegSyncResult {
func (s *transactionSyncer) syncInternalImpl() error {
// TODO(cheungdavid): for now we reset the boolean so it is a no-op, but
// in the future, it will be used to trigger degraded mode if the syncer is in error state.
if s.inErrorState() {
Expand All @@ -205,20 +202,20 @@ func (s *transactionSyncer) syncInternalImpl() *negtypes.NegSyncResult {

if s.needInit || s.isZoneChange() {
if err := s.ensureNetworkEndpointGroups(); err != nil {
return negtypes.NewNegSyncResult(err, negtypes.ResultNegNotFound)
return err
}
s.needInit = false
}

if s.syncer.IsStopped() || s.syncer.IsShuttingDown() {
s.logger.V(3).Info("Skip syncing NEG", "negSyncerKey", s.NegSyncerKey.String())
return negtypes.NewNegSyncResult(nil, negtypes.ResultSuccess)
return nil
}
s.logger.V(2).Info("Sync NEG", "negSyncerKey", s.NegSyncerKey.String(), "endpointsCalculatorMode", s.endpointsCalculator.Mode())

currentMap, err := retrieveExistingZoneNetworkEndpointMap(s.NegSyncerKey.NegName, s.zoneGetter, s.cloud, s.NegSyncerKey.GetAPIVersion(), s.endpointsCalculator.Mode())
if err != nil {
return negtypes.NewNegSyncResult(err, negtypes.ResultCurrentEPNotFound)
return err
}
s.logStats(currentMap, "current NEG endpoints")

Expand All @@ -233,11 +230,11 @@ func (s *transactionSyncer) syncInternalImpl() *negtypes.NegSyncResult {

slices, err := s.endpointSliceLister.ByIndex(endpointslices.EndpointSlicesByServiceIndex, endpointslices.FormatEndpointSlicesServiceKey(s.Namespace, s.Name))
if err != nil {
return negtypes.NewNegSyncResult(err, negtypes.ResultEPSNotFound)
return err
}
if len(slices) < 1 {
s.logger.Error(nil, "Endpoint slices for the service doesn't exist. Skipping NEG sync")
return negtypes.NewNegSyncResult(nil, negtypes.ResultSuccess)
return nil
}
endpointSlices := make([]*discovery.EndpointSlice, len(slices))
negCR, err := getNegFromStore(s.svcNegLister, s.Namespace, s.NegSyncerKey.NegName)
Expand All @@ -262,16 +259,14 @@ func (s *transactionSyncer) syncInternalImpl() *negtypes.NegSyncResult {
}
endpointsData := negtypes.EndpointsDataFromEndpointSlices(endpointSlices)
targetMap, endpointPodMap, dupCount, err = s.endpointsCalculator.CalculateEndpoints(endpointsData, currentMap)
if valid, result := s.CheckEPField(err); !valid {
s.setErrorState(result.Result)
return result
if valid, reason := s.isValidEPField(err); !valid {
s.setErrorState(reason)
}
if valid, result := s.CheckEndpointInfo(endpointsData, endpointPodMap, dupCount); !valid {
s.setErrorState(result.Result)
return result
if valid, reason := s.isValidEndpointInfo(endpointsData, endpointPodMap, dupCount); !valid {
s.setErrorState(reason)
}
if err != nil {
return negtypes.NewNegSyncResult(fmt.Errorf("endpoints calculation error in mode %q, err: %w", s.endpointsCalculator.Mode(), err), negtypes.ResultOtherError)
return fmt.Errorf("endpoints calculation error in mode %q, err: %w", s.endpointsCalculator.Mode(), err)
}

s.logStats(targetMap, "desired NEG endpoints")
Expand All @@ -295,7 +290,7 @@ func (s *transactionSyncer) syncInternalImpl() *negtypes.NegSyncResult {

if len(addEndpoints) == 0 && len(removeEndpoints) == 0 {
s.logger.V(3).Info("No endpoint change. Skip syncing NEG. ", s.Namespace, s.Name)
return negtypes.NewNegSyncResult(nil, negtypes.ResultSuccess)
return nil
}
s.logEndpoints(addEndpoints, "adding endpoint")
s.logEndpoints(removeEndpoints, "removing endpoint")
Expand Down Expand Up @@ -359,19 +354,19 @@ func (s *transactionSyncer) ensureNetworkEndpointGroups() error {
return utilerrors.NewAggregate(errList)
}

// CheckEndpointInfo checks if endpoint information is correct.
// It returns false with the corresponding reason if one of the two checks fails:
// isValidEndpointInfo checks if endpoint information is correct.
// It returns false and the corresponding reason if one of the two checks fails:
//
// 1. The endpoint count from endpointData doesn't equal the one from endpointPodMap:
// endpointPodMap removes the duplicated endpoints, and dupCount stores the number of duplicates it removed,
// so we compare the endpoint counts with duplicates included
// 2. The endpoint count from endpointData or the one from endpointPodMap is 0
func (s *transactionSyncer) CheckEndpointInfo(eds []negtypes.EndpointsData, endpointPodMap negtypes.EndpointPodMap, dupCount int) (bool, *negtypes.NegSyncResult) {
func (s *transactionSyncer) isValidEndpointInfo(eds []negtypes.EndpointsData, endpointPodMap negtypes.EndpointPodMap, dupCount int) (bool, string) {
// Endpoint count from EndpointPodMap
countFromPodMap := len(endpointPodMap) + dupCount
if countFromPodMap == 0 {
s.logger.Info("Detected endpoint count from endpointPodMap going to zero", "endpointPodMap", endpointPodMap)
return false, negtypes.NewNegSyncResult(negtypes.ErrEPCalculationCountZero, negtypes.ResultEPCalculationCountZero)
return false, negtypes.ResultEPCalculationCountZero
}

// Endpoint count from EndpointData
Expand All @@ -381,35 +376,27 @@ func (s *transactionSyncer) CheckEndpointInfo(eds []negtypes.EndpointsData, endp
}
if countFromEndpointData == 0 {
s.logger.Info("Detected endpoint count from endpointData going to zero", "endpointData", eds)
return false, negtypes.NewNegSyncResult(negtypes.ErrEPSEndpointCountZero, negtypes.ResultEPSEndpointCountZero)
return false, negtypes.ResultEPSEndpointCountZero
}

if countFromEndpointData != countFromPodMap {
s.logger.Info("Detected error when comparing endpoint counts", "endpointData", eds, "endpointPodMap", endpointPodMap, "dupCount", dupCount)
return false, negtypes.NewNegSyncResult(negtypes.ErrEPCountsDiffer, negtypes.ResultEPCountsDiffer)
return false, negtypes.ResultEPCountsDiffer
}
return true, negtypes.NewNegSyncResult(nil, negtypes.ResultSuccess)
return true, negtypes.ResultSuccess
}

// CheckEPField checks if endpoints have valid fields.
// It returns the result boolean with the corresponding reason.
func (s *transactionSyncer) CheckEPField(err error) (bool, *negtypes.NegSyncResult) {
if errors.Is(err, negtypes.ErrEPMissingNodeName) {
// isValidEPField returns false and the corresponding reason if there is an endpoint with a missing zone or nodeName
func (s *transactionSyncer) isValidEPField(err error) (bool, string) {
if errors.Is(err, ErrEPMissingNodeName) {
s.logger.Info("Detected unexpected error when checking missing nodeName", "error", err)
return false, negtypes.NewNegSyncResult(err, negtypes.ResultEPMissingNodeName)
return false, negtypes.ResultEPMissingNodeName
}
if errors.Is(err, negtypes.ErrEPMissingZone) {
if errors.Is(err, ErrEPMissingZone) {
s.logger.Info("Detected unexpected error when checking missing zone", "error", err)
return false, negtypes.NewNegSyncResult(err, negtypes.ResultEPMissingZone)
}
if errors.Is(err, negtypes.ErrNodeNotFound) {
return true, negtypes.NewNegSyncResult(err, negtypes.ResultNodeNotFound)
}
if err != nil {
return true, negtypes.NewNegSyncResult(nil, negtypes.ResultOtherError)
} else {
return true, negtypes.NewNegSyncResult(nil, negtypes.ResultSuccess)
return false, negtypes.ResultEPMissingZone
}
return true, negtypes.ResultSuccess
}

// isValidEPBatch returns false and the corresponding reason if the error from endpoint batch response is due to bad request
Expand All @@ -433,7 +420,7 @@ func (s *transactionSyncer) isValidEPBatch(err error, operation transactionOp, n
}

// syncNetworkEndpoints spins off go routines to execute NEG operations
func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints map[string]negtypes.NetworkEndpointSet) *negtypes.NegSyncResult {
func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints map[string]negtypes.NetworkEndpointSet) error {
var wg sync.WaitGroup

syncFunc := func(endpointMap map[string]negtypes.NetworkEndpointSet, operation transactionOp) error {
Expand Down Expand Up @@ -469,14 +456,14 @@ func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints m
}

if err := syncFunc(addEndpoints, attachOp); err != nil {
return negtypes.NewNegSyncResult(err, negtypes.ResultOtherError)
return err
}

if err := syncFunc(removeEndpoints, detachOp); err != nil {
return negtypes.NewNegSyncResult(err, negtypes.ResultOtherError)
return err
}
go s.collectSyncResult(&wg)
return negtypes.NewNegSyncResult(nil, negtypes.ResultInProgress)
return nil
}

// collectSyncResult collects the result of the sync and emits the metrics for sync result
Expand Down
Loading