Skip to content

Commit

Permalink
Add metrics for sync result
Browse files Browse the repository at this point in the history
Created sync metrics collector to collect sync related metrics. Added
metrics to collect the cumulative count of sync results.
  • Loading branch information
sawsa307 committed Jan 31, 2023
1 parent 9221b53 commit bb394ad
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 46 deletions.
122 changes: 83 additions & 39 deletions pkg/neg/syncers/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,19 +50,28 @@ import (
)

var (
ErrEPCountsDiffer = errors.New("endpoint counts from endpointData and endpointPodMap differ")
ErrEPMissingNodeName = errors.New("endpoint has empty nodeName field")
ErrNodeNotFound = errors.New("failed to retrieve associated zone of node")
ErrEPMissingZone = errors.New("endpoint has empty zone field")
ErrEPSEndpointCountZero = errors.New("endpoint count from endpointData cannot be zero")
ErrEPCalculationCountZero = errors.New("endpoint count from endpointPodMap cannot be zero")

ResultEPCountsDiffer = "EPCountsDiffer"
ResultEPMissingNodeName = "EPMissingNodeName"
ResultEPMissingZone = "EPMissingZone"
ResultInvalidEPAttach = "InvalidEPAttach"
ResultInvalidEPDetach = "InvalidEPDetach"
ResultEPSEndpointCountZero = "EPSEndpointCountZero"
ResultEPCalculationCountZero = "EPCalculationCountZero"
ResultNegNotFound = "NegNotFound"
ResultCurrentEPNotFound = "CurrentEPNotFound"
ResultEPSNotFound = "EPSNotFound"
ResultNodeNotFound = "NodeNotFound"
ResultOther = "OtherResult"
ResultSuccess = "Success"

// these results have their own errors
ResultInvalidEPAttach = "InvalidEPAttach"
ResultInvalidEPDetach = "InvalidEPDetach"
ResultNegNotFound = "NegNotFound"
ResultCurrentEPNotFound = "CurrentEPNotFound"
ResultEPSNotFound = "EPSNotFound"
ResultNodeNotFound = "NodeNotFound"
ResultOther = "OtherResult"
ResultSuccess = "Success"
)

type transactionSyncer struct {
Expand Down Expand Up @@ -120,6 +129,9 @@ type transactionSyncer struct {
// 4. Endpoint count from EPS or calculated endpoint list is 0
// Need to grab syncLock first for any reads or writes based on this value
inError bool

// syncCollector collect sync related metrics
syncCollector metrics.SyncerMetricsCollector
}

func NewTransactionSyncer(
Expand All @@ -137,6 +149,7 @@ func NewTransactionSyncer(
epc negtypes.NetworkEndpointsCalculator,
kubeSystemUID string,
svcNegClient svcnegclient.Interface,
syncerMetrics *metrics.SyncerMetrics,
customName bool,
enableEndpointSlices bool,
log klog.Logger) negtypes.NegSyncer {
Expand All @@ -161,6 +174,7 @@ func NewTransactionSyncer(
reflector: reflector,
kubeSystemUID: kubeSystemUID,
svcNegClient: svcNegClient,
syncCollector: syncerMetrics,
customName: customName,
enableEndpointSlices: enableEndpointSlices,
inError: false,
Expand Down Expand Up @@ -204,14 +218,17 @@ func (s *transactionSyncer) syncInternal() error {
defer s.syncLock.Unlock()

start := time.Now()
err := s.syncInternalImpl()
result := s.syncInternalImpl()

s.updateStatus(err)
metrics.PublishNegSyncMetrics(string(s.NegSyncerKey.NegType), string(s.endpointsCalculator.Mode()), err, start)
return err
s.updateStatus(result.Error)
metrics.PublishNegSyncMetrics(string(s.NegSyncerKey.NegType), string(s.endpointsCalculator.Mode()), result.Error, start)
if s.enableEndpointSlices {
s.syncCollector.UpdateSyncer(s.NegSyncerKey, result)
}
return result.Error
}

func (s *transactionSyncer) syncInternalImpl() error {
func (s *transactionSyncer) syncInternalImpl() *metrics.NegSyncResult {
// TODO(cheungdavid): for now we reset the boolean so it is a no-op, but
// in the future, it will be used to trigger degraded mode if the syncer is in error state.
if s.inErrorState() {
Expand All @@ -220,20 +237,20 @@ func (s *transactionSyncer) syncInternalImpl() error {

if s.needInit || s.isZoneChange() {
if err := s.ensureNetworkEndpointGroups(); err != nil {
return err
return metrics.NewNegSyncResult(err, ResultNegNotFound)
}
s.needInit = false
}

if s.syncer.IsStopped() || s.syncer.IsShuttingDown() {
s.logger.V(3).Info("Skip syncing NEG", "negSyncerKey", s.NegSyncerKey.String())
return nil
return metrics.NewNegSyncResult(nil, ResultSuccess)
}
s.logger.V(2).Info("Sync NEG", "negSyncerKey", s.NegSyncerKey.String(), "endpointsCalculatorMode", s.endpointsCalculator.Mode())

currentMap, err := retrieveExistingZoneNetworkEndpointMap(s.NegSyncerKey.NegName, s.zoneGetter, s.cloud, s.NegSyncerKey.GetAPIVersion(), s.endpointsCalculator.Mode())
if err != nil {
return err
return metrics.NewNegSyncResult(err, ResultCurrentEPNotFound)
}
s.logStats(currentMap, "current NEG endpoints")

Expand All @@ -249,23 +266,29 @@ func (s *transactionSyncer) syncInternalImpl() error {
if s.enableEndpointSlices {
slices, err := s.endpointSliceLister.ByIndex(endpointslices.EndpointSlicesByServiceIndex, endpointslices.FormatEndpointSlicesServiceKey(s.Namespace, s.Name))
if err != nil {
return err
return metrics.NewNegSyncResult(err, ResultEPSNotFound)
}
if len(slices) < 1 {
s.logger.Error(nil, "Endpoint slices for the service doesn't exist. Skipping NEG sync")
return nil
return metrics.NewNegSyncResult(nil, ResultSuccess)
}
endpointSlices := make([]*discovery.EndpointSlice, len(slices))
for i, slice := range slices {
endpointSlices[i] = slice.(*discovery.EndpointSlice)
}
endpointsData := negtypes.EndpointsDataFromEndpointSlices(endpointSlices)
targetMap, endpointPodMap, dupCount, err = s.endpointsCalculator.CalculateEndpoints(endpointsData, currentMap)
if !s.isValidEPField(err) || !s.isValidEndpointInfo(endpointsData, endpointPodMap, dupCount) {
syncResult := s.checkValidEPField(err)
if syncResult.Result == ResultEPMissingNodeName || syncResult.Result == ResultEPMissingZone {
s.setErrorState()
}
if err != nil {
return fmt.Errorf("endpoints calculation error in mode %q, err: %w", s.endpointsCalculator.Mode(), err)
if syncResult.Error != nil {
return syncResult
}
syncResult = s.checkEndpointInfo(endpointsData, endpointPodMap, dupCount)
if syncResult.Error != nil {
s.setErrorState()
return syncResult
}
} else {
ep, exists, err := s.endpointLister.Get(
Expand All @@ -277,16 +300,16 @@ func (s *transactionSyncer) syncInternalImpl() error {
},
)
if err != nil {
return err
return metrics.NewNegSyncResult(err, ResultOther)
}
if !exists {
s.logger.Info("Endpoint does not exist. Skipping NEG sync", "endpoint", klog.KRef(s.Namespace, s.Name))
return nil
return metrics.NewNegSyncResult(nil, ResultSuccess)
}
endpointsData := negtypes.EndpointsDataFromEndpoints(ep.(*apiv1.Endpoints))
targetMap, endpointPodMap, _, err = s.endpointsCalculator.CalculateEndpoints(endpointsData, currentMap)
if err != nil {
return fmt.Errorf("endpoints calculation error in mode %q, err: %w", s.endpointsCalculator.Mode(), err)
return metrics.NewNegSyncResult(fmt.Errorf("endpoints calculation error in mode %q, err: %w", s.endpointsCalculator.Mode(), err), ResultOther)
}
}

Expand All @@ -311,12 +334,17 @@ func (s *transactionSyncer) syncInternalImpl() error {

if len(addEndpoints) == 0 && len(removeEndpoints) == 0 {
s.logger.V(3).Info("No endpoint change. Skip syncing NEG. ", s.Namespace, s.Name)
return nil
return metrics.NewNegSyncResult(nil, ResultSuccess)
}
s.logEndpoints(addEndpoints, "adding endpoint")
s.logEndpoints(removeEndpoints, "removing endpoint")

return s.syncNetworkEndpoints(addEndpoints, removeEndpoints)
err = s.syncNetworkEndpoints(addEndpoints, removeEndpoints)
if err != nil {
return metrics.NewNegSyncResult(err, ResultOther)
} else {
return metrics.NewNegSyncResult(nil, ResultSuccess)
}
}

// syncLock must already be acquired before execution
Expand Down Expand Up @@ -375,19 +403,19 @@ func (s *transactionSyncer) ensureNetworkEndpointGroups() error {
return utilerrors.NewAggregate(errList)
}

// isValidEndpointInfo checks if endpoint information is correct.
// It returns false if one of the two checks fails:
// checkEndpointInfo checks if endpoint information is correct.
// It returns error and corresponding reason if any of the two checks fails:
//
// 1. The endpoint count from endpointData doesn't equal to the one from endpointPodMap:
// endpiontPodMap removes the duplicated endpoints, and dupCount stores the number of duplicated it removed
// and we compare the endpoint counts with duplicates
// 2. The endpoint count from endpointData or the one from endpointPodMap is 0
func (s *transactionSyncer) isValidEndpointInfo(eds []negtypes.EndpointsData, endpointPodMap negtypes.EndpointPodMap, dupCount int) bool {
func (s *transactionSyncer) checkEndpointInfo(eds []negtypes.EndpointsData, endpointPodMap negtypes.EndpointPodMap, dupCount int) *metrics.NegSyncResult {
// Endpoint count from EndpointPodMap
countFromPodMap := len(endpointPodMap) + dupCount
if countFromPodMap == 0 {
s.logger.Info("Detected endpoint count from endpointPodMap going to zero", "endpointPodMap", endpointPodMap)
return false
return metrics.NewNegSyncResult(ErrEPCalculationCountZero, ResultEPCalculationCountZero)
}

// Endpoint count from EndpointData
Expand All @@ -397,34 +425,42 @@ func (s *transactionSyncer) isValidEndpointInfo(eds []negtypes.EndpointsData, en
}
if countFromEndpointData == 0 {
s.logger.Info("Detected endpoint count from endpointData going to zero", "endpointData", eds)
return false
return metrics.NewNegSyncResult(ErrEPSEndpointCountZero, ResultEPSEndpointCountZero)
}

if countFromEndpointData != countFromPodMap {
s.logger.Info("Detected error when comparing endpoint counts", "endpointData", eds, "endpointPodMap", endpointPodMap, "dupCount", dupCount)
return false
return metrics.NewNegSyncResult(ErrEPCountsDiffer, ResultEPCountsDiffer)
}
return true
return metrics.NewNegSyncResult(nil, ResultSuccess)
}

// isValidEPField returns false if there is endpoint with missing zone or nodeName
func (s *transactionSyncer) isValidEPField(err error) bool {
// checkValidEPField checks the error from endpoint calculation and return the corresponding syncResult
func (s *transactionSyncer) checkValidEPField(err error) *metrics.NegSyncResult {
if errors.Is(err, ErrEPMissingNodeName) {
s.logger.Info("Detected unexpected error when checking missing nodeName", "error", err)
return false
return metrics.NewNegSyncResult(err, ResultEPMissingNodeName)
}
if errors.Is(err, ErrEPMissingZone) {
s.logger.Info("Detected unexpected error when checking missing zone", "error", err)
return false
return metrics.NewNegSyncResult(err, ResultEPMissingZone)
}
if errors.Is(err, ErrNodeNotFound) {
return metrics.NewNegSyncResult(err, ResultNodeNotFound)
}
if err != nil {
return metrics.NewNegSyncResult(err, ResultOther)
} else {
return metrics.NewNegSyncResult(nil, ResultSuccess)

}
return true
}

// isValidEPBatch returns false if the error from endpoint batch response is due to bad request
func (s *transactionSyncer) isValidEPBatch(err error, operation transactionOp, networkEndpoints []*composite.NetworkEndpoint) bool {
apiErr, ok := err.(*googleapi.Error)
if !ok {
s.logger.Info("Detected error when parsing batch request error", "operation", operation, "error", err)
s.logger.Info("Detected error when parsing batch response error", "operation", operation, "error", err)
return false
}
errCode := apiErr.Code
Expand Down Expand Up @@ -515,6 +551,14 @@ func (s *transactionSyncer) operationInternal(operation transactionOp, zone stri
s.recordEvent(apiv1.EventTypeWarning, operation.String()+"Failed", fmt.Sprintf("Failed to %s %d network endpoint(s) (NEG %q in zone %q): %v", operation.String(), len(networkEndpointMap), s.NegSyncerKey.NegName, zone, err))
if !s.isValidEPBatch(err, operation, networkEndpoints) {
s.setErrorState()
var syncResult *metrics.NegSyncResult
if operation == attachOp {
syncResult = metrics.NewNegSyncResult(err, ResultInvalidEPAttach)
}
if operation == detachOp {
syncResult = metrics.NewNegSyncResult(err, ResultInvalidEPDetach)
}
s.syncCollector.UpdateSyncer(s.NegSyncerKey, syncResult)
}
}

Expand Down
7 changes: 0 additions & 7 deletions pkg/neg/syncers/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package syncers

import (
"errors"
"fmt"
"strconv"
"strings"
Expand Down Expand Up @@ -48,12 +47,6 @@ const (
separator = "||"
)

var (
ErrEPMissingNodeName = errors.New("endpoint has empty nodeName field")
ErrNodeNotFound = errors.New("failed to retrieve associated zone of node")
ErrEPMissingZone = errors.New("endpoint has empty zone field")
)

// encodeEndpoint encodes ip and instance into a single string
func encodeEndpoint(ip, instance, port string) string {
return strings.Join([]string{ip, instance, port}, separator)
Expand Down

0 comments on commit bb394ad

Please sign in to comment.