Skip to content

Commit

Permalink
Add metrics for sync result
Browse files Browse the repository at this point in the history
Added metrics to collect the cumulative count of sync results.
  • Loading branch information
sawsa307 committed Feb 15, 2023
1 parent ca267ab commit b14be83
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 62 deletions.
1 change: 1 addition & 0 deletions pkg/neg/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
wait.Until(c.gc, c.gcPeriod, stopCh)
}()
go c.reflector.Run(stopCh)
go c.syncerMetrics.Run(stopCh)
<-stopCh
}

Expand Down
93 changes: 61 additions & 32 deletions pkg/neg/syncers/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,14 +193,17 @@ func (s *transactionSyncer) syncInternal() error {
defer s.syncLock.Unlock()

start := time.Now()
err := s.syncInternalImpl()
result := s.syncInternalImpl()

s.updateStatus(err)
metrics.PublishNegSyncMetrics(string(s.NegSyncerKey.NegType), string(s.endpointsCalculator.Mode()), err, start)
return err
s.updateStatus(result.Error)
metrics.PublishNegSyncMetrics(string(s.NegSyncerKey.NegType), string(s.endpointsCalculator.Mode()), result.Error, start)
if s.enableEndpointSlices {
s.syncCollector.UpdateSyncer(s.NegSyncerKey, result)
}
return result.Error
}

func (s *transactionSyncer) syncInternalImpl() error {
func (s *transactionSyncer) syncInternalImpl() *negtypes.NegSyncResult {
// TODO(cheungdavid): for now we reset the boolean so it is a no-op, but
// in the future, it will be used to trigger degraded mode if the syncer is in error state.
if s.inErrorState() {
Expand All @@ -209,20 +212,20 @@ func (s *transactionSyncer) syncInternalImpl() error {

if s.needInit || s.isZoneChange() {
if err := s.ensureNetworkEndpointGroups(); err != nil {
return err
return negtypes.NewNegSyncResult(err, negtypes.ResultNegNotFound)
}
s.needInit = false
}

if s.syncer.IsStopped() || s.syncer.IsShuttingDown() {
s.logger.V(3).Info("Skip syncing NEG", "negSyncerKey", s.NegSyncerKey.String())
return nil
return negtypes.NewNegSyncResult(nil, negtypes.ResultSuccess)
}
s.logger.V(2).Info("Sync NEG", "negSyncerKey", s.NegSyncerKey.String(), "endpointsCalculatorMode", s.endpointsCalculator.Mode())

currentMap, err := retrieveExistingZoneNetworkEndpointMap(s.NegSyncerKey.NegName, s.zoneGetter, s.cloud, s.NegSyncerKey.GetAPIVersion(), s.endpointsCalculator.Mode())
if err != nil {
return err
return negtypes.NewNegSyncResult(err, negtypes.ResultCurrentEPNotFound)
}
s.logStats(currentMap, "current NEG endpoints")

Expand All @@ -238,23 +241,29 @@ func (s *transactionSyncer) syncInternalImpl() error {
if s.enableEndpointSlices {
slices, err := s.endpointSliceLister.ByIndex(endpointslices.EndpointSlicesByServiceIndex, endpointslices.FormatEndpointSlicesServiceKey(s.Namespace, s.Name))
if err != nil {
return err
return negtypes.NewNegSyncResult(err, negtypes.ResultEPSNotFound)
}
if len(slices) < 1 {
s.logger.Error(nil, "Endpoint slices for the service doesn't exist. Skipping NEG sync")
return nil
return negtypes.NewNegSyncResult(nil, negtypes.ResultSuccess)
}
endpointSlices := make([]*discovery.EndpointSlice, len(slices))
for i, slice := range slices {
endpointSlices[i] = slice.(*discovery.EndpointSlice)
}
endpointsData := negtypes.EndpointsDataFromEndpointSlices(endpointSlices)
targetMap, endpointPodMap, dupCount, err = s.endpointsCalculator.CalculateEndpoints(endpointsData, currentMap)
if !s.isValidEPField(err) || !s.isValidEndpointInfo(endpointsData, endpointPodMap, dupCount) {
syncResult := s.checkValidEPField(err)
if syncResult.Result == negtypes.ResultEPMissingNodeName || syncResult.Result == negtypes.ResultEPMissingZone {
s.setErrorState()
}
if err != nil {
return fmt.Errorf("endpoints calculation error in mode %q, err: %w", s.endpointsCalculator.Mode(), err)
if syncResult.Error != nil {
return syncResult
}
syncResult = s.checkEndpointInfo(endpointsData, endpointPodMap, dupCount)
if syncResult.Error != nil {
s.setErrorState()
return syncResult
}
} else {
ep, exists, err := s.endpointLister.Get(
Expand All @@ -266,16 +275,16 @@ func (s *transactionSyncer) syncInternalImpl() error {
},
)
if err != nil {
return err
return negtypes.NewNegSyncResult(err, negtypes.ResultOtherError)
}
if !exists {
s.logger.Info("Endpoint does not exist. Skipping NEG sync", "endpoint", klog.KRef(s.Namespace, s.Name))
return nil
return negtypes.NewNegSyncResult(nil, negtypes.ResultSuccess)
}
endpointsData := negtypes.EndpointsDataFromEndpoints(ep.(*apiv1.Endpoints))
targetMap, endpointPodMap, _, err = s.endpointsCalculator.CalculateEndpoints(endpointsData, currentMap)
if err != nil {
return fmt.Errorf("endpoints calculation error in mode %q, err: %w", s.endpointsCalculator.Mode(), err)
return negtypes.NewNegSyncResult(fmt.Errorf("endpoints calculation error in mode %q, err: %w", s.endpointsCalculator.Mode(), err), negtypes.ResultOtherError)
}
}

Expand All @@ -300,12 +309,17 @@ func (s *transactionSyncer) syncInternalImpl() error {

if len(addEndpoints) == 0 && len(removeEndpoints) == 0 {
s.logger.V(3).Info("No endpoint change. Skip syncing NEG. ", s.Namespace, s.Name)
return nil
return negtypes.NewNegSyncResult(nil, negtypes.ResultSuccess)
}
s.logEndpoints(addEndpoints, "adding endpoint")
s.logEndpoints(removeEndpoints, "removing endpoint")

return s.syncNetworkEndpoints(addEndpoints, removeEndpoints)
err = s.syncNetworkEndpoints(addEndpoints, removeEndpoints)
if err != nil {
return negtypes.NewNegSyncResult(err, negtypes.ResultOtherError)
} else {
return negtypes.NewNegSyncResult(nil, negtypes.ResultInProgress)
}
}

// syncLock must already be acquired before execution
Expand Down Expand Up @@ -364,19 +378,19 @@ func (s *transactionSyncer) ensureNetworkEndpointGroups() error {
return utilerrors.NewAggregate(errList)
}

// isValidEndpointInfo checks if endpoint information is correct.
// It returns false if one of the two checks fails:
// checkEndpointInfo checks if endpoint information is correct.
// It returns NegSyncResult with error and corresponding reason if any of the two checks fails:
//
// 1. The endpoint count from endpointData doesn't equal to the one from endpointPodMap:
// endpiontPodMap removes the duplicated endpoints, and dupCount stores the number of duplicated it removed
// and we compare the endpoint counts with duplicates
// 2. The endpoint count from endpointData or the one from endpointPodMap is 0
func (s *transactionSyncer) isValidEndpointInfo(eds []negtypes.EndpointsData, endpointPodMap negtypes.EndpointPodMap, dupCount int) bool {
func (s *transactionSyncer) checkEndpointInfo(eds []negtypes.EndpointsData, endpointPodMap negtypes.EndpointPodMap, dupCount int) *negtypes.NegSyncResult {
// Endpoint count from EndpointPodMap
countFromPodMap := len(endpointPodMap) + dupCount
if countFromPodMap == 0 {
s.logger.Info("Detected endpoint count from endpointPodMap going to zero", "endpointPodMap", endpointPodMap)
return false
return negtypes.NewNegSyncResult(negtypes.ErrEPCalculationCountZero, negtypes.ResultEPCalculationCountZero)
}

// Endpoint count from EndpointData
Expand All @@ -386,27 +400,34 @@ func (s *transactionSyncer) isValidEndpointInfo(eds []negtypes.EndpointsData, en
}
if countFromEndpointData == 0 {
s.logger.Info("Detected endpoint count from endpointData going to zero", "endpointData", eds)
return false
return negtypes.NewNegSyncResult(negtypes.ErrEPSEndpointCountZero, negtypes.ResultEPSEndpointCountZero)
}

if countFromEndpointData != countFromPodMap {
s.logger.Info("Detected error when comparing endpoint counts", "endpointData", eds, "endpointPodMap", endpointPodMap, "dupCount", dupCount)
return false
return negtypes.NewNegSyncResult(negtypes.ErrEPCountsDiffer, negtypes.ResultEPCountsDiffer)
}
return true
return negtypes.NewNegSyncResult(nil, negtypes.ResultSuccess)
}

// isValidEPField returns false if there is endpoint with missing zone or nodeName
func (s *transactionSyncer) isValidEPField(err error) bool {
if errors.Is(err, ErrEPMissingNodeName) {
// checkValidEPField checks the error from endpoint calculation and return the corresponding syncResult
func (s *transactionSyncer) checkValidEPField(err error) *negtypes.NegSyncResult {
if errors.Is(err, negtypes.ErrEPMissingNodeName) {
s.logger.Info("Detected unexpected error when checking missing nodeName", "error", err)
return false
return negtypes.NewNegSyncResult(err, negtypes.ResultEPMissingNodeName)
}
if errors.Is(err, ErrEPMissingZone) {
if errors.Is(err, negtypes.ErrEPMissingZone) {
s.logger.Info("Detected unexpected error when checking missing zone", "error", err)
return false
return negtypes.NewNegSyncResult(err, negtypes.ResultEPMissingZone)
}
if errors.Is(err, negtypes.ErrNodeNotFound) {
return negtypes.NewNegSyncResult(err, negtypes.ResultNodeNotFound)
}
if err != nil {
return negtypes.NewNegSyncResult(err, negtypes.ResultOtherError)
} else {
return negtypes.NewNegSyncResult(nil, negtypes.ResultSuccess)
}
return true
}

// isValidEPBatch returns false if the error from endpoint batch response is due to bad request
Expand Down Expand Up @@ -504,6 +525,14 @@ func (s *transactionSyncer) operationInternal(operation transactionOp, zone stri
s.recordEvent(apiv1.EventTypeWarning, operation.String()+"Failed", fmt.Sprintf("Failed to %s %d network endpoint(s) (NEG %q in zone %q): %v", operation.String(), len(networkEndpointMap), s.NegSyncerKey.NegName, zone, err))
if !s.isValidEPBatch(err, operation, networkEndpoints) {
s.setErrorState()
var syncResult *negtypes.NegSyncResult
if operation == attachOp {
syncResult = negtypes.NewNegSyncResult(err, negtypes.ResultInvalidEPAttach)
}
if operation == detachOp {
syncResult = negtypes.NewNegSyncResult(err, negtypes.ResultInvalidEPDetach)
}
s.syncCollector.UpdateSyncer(s.NegSyncerKey, syncResult)
}
}

Expand Down
41 changes: 21 additions & 20 deletions pkg/neg/syncers/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package syncers

import (
context2 "context"
"errors"
"fmt"
"net"
"net/http"
Expand Down Expand Up @@ -1413,7 +1414,7 @@ func TestUnknownNodes(t *testing.T) {
}
}

func TestIsValidEndpointInfo(t *testing.T) {
func TestCheckEndpointInfo(t *testing.T) {
t.Parallel()
_, transactionSyncer := newTestTransactionSyncer(negtypes.NewAdapter(gce.NewFakeGCECloud(gce.DefaultTestClusterValues())), negtypes.VmIpPortEndpointType, false, true)

Expand Down Expand Up @@ -1473,7 +1474,7 @@ func TestIsValidEndpointInfo(t *testing.T) {
endpointsData []negtypes.EndpointsData
endpointPodMap map[negtypes.NetworkEndpoint]types.NamespacedName
dupCount int
expect bool
expect *negtypes.NegSyncResult
}{
{
desc: "counts equal, endpointData has no duplicated endpoints",
Expand Down Expand Up @@ -1545,7 +1546,7 @@ func TestIsValidEndpointInfo(t *testing.T) {
},
endpointPodMap: testEndpointPodMap,
dupCount: 0,
expect: true,
expect: negtypes.NewNegSyncResult(nil, negtypes.ResultSuccess),
},
{
desc: "counts equal, endpointData has duplicated endpoints",
Expand Down Expand Up @@ -1626,7 +1627,7 @@ func TestIsValidEndpointInfo(t *testing.T) {
},
endpointPodMap: testEndpointPodMap,
dupCount: 1,
expect: true,
expect: negtypes.NewNegSyncResult(nil, negtypes.ResultSuccess),
},
{
desc: "counts not equal, endpointData has no duplicated endpoints",
Expand Down Expand Up @@ -1689,7 +1690,7 @@ func TestIsValidEndpointInfo(t *testing.T) {
},
endpointPodMap: testEndpointPodMap,
dupCount: 0,
expect: false,
expect: negtypes.NewNegSyncResult(negtypes.ErrEPCountsDiffer, negtypes.ResultEPCountsDiffer),
},
{
desc: "counts not equal, endpointData has duplicated endpoints",
Expand Down Expand Up @@ -1761,7 +1762,7 @@ func TestIsValidEndpointInfo(t *testing.T) {
},
endpointPodMap: testEndpointPodMap,
dupCount: 1,
expect: false,
expect: negtypes.NewNegSyncResult(negtypes.ErrEPCountsDiffer, negtypes.ResultEPCountsDiffer),
},
{
desc: "endpointData has zero endpoint",
Expand Down Expand Up @@ -1795,7 +1796,7 @@ func TestIsValidEndpointInfo(t *testing.T) {
},
endpointPodMap: testEndpointPodMap,
dupCount: 0,
expect: false,
expect: negtypes.NewNegSyncResult(negtypes.ErrEPSEndpointCountZero, negtypes.ResultEPSEndpointCountZero),
},
{
desc: "endpointPodMap has zero endpoint",
Expand Down Expand Up @@ -1867,7 +1868,7 @@ func TestIsValidEndpointInfo(t *testing.T) {
},
endpointPodMap: map[negtypes.NetworkEndpoint]types.NamespacedName{},
dupCount: 0,
expect: false,
expect: negtypes.NewNegSyncResult(negtypes.ErrEPCalculationCountZero, negtypes.ResultEPCalculationCountZero),
},
{
desc: "endpointData and endpointPodMap both have zero endpoint",
Expand Down Expand Up @@ -1901,7 +1902,7 @@ func TestIsValidEndpointInfo(t *testing.T) {
},
endpointPodMap: map[negtypes.NetworkEndpoint]types.NamespacedName{},
dupCount: 0,
expect: false,
expect: negtypes.NewNegSyncResult(negtypes.ErrEPCalculationCountZero, negtypes.ResultEPCalculationCountZero), // PodMap count is check and returned first,
},
{
desc: "endpointData and endpointPodMap both have non-zero endpoints",
Expand Down Expand Up @@ -1973,20 +1974,20 @@ func TestIsValidEndpointInfo(t *testing.T) {
},
endpointPodMap: testEndpointPodMap,
dupCount: 0,
expect: true,
expect: negtypes.NewNegSyncResult(nil, negtypes.ResultSuccess),
},
}

for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
if got := transactionSyncer.isValidEndpointInfo(tc.endpointsData, tc.endpointPodMap, tc.dupCount); got != tc.expect {
t.Errorf("invalidEndpointInfo() = %t, expected %t", got, tc.expect)
if got := transactionSyncer.checkEndpointInfo(tc.endpointsData, tc.endpointPodMap, tc.dupCount); !errors.Is(got.Error, tc.expect.Error) || got.Result != tc.expect.Result {
t.Errorf("checkEndpointInfo() = %v, expected %v", got, tc.expect)
}
})
}
}

func TestIsValidEPField(t *testing.T) {
func TestCheckValidEPField(t *testing.T) {
t.Parallel()
_, transactionSyncer := newTestTransactionSyncer(negtypes.NewAdapter(gce.NewFakeGCECloud(gce.DefaultTestClusterValues())), negtypes.VmIpPortEndpointType, false, true)

Expand Down Expand Up @@ -2015,7 +2016,7 @@ func TestIsValidEPField(t *testing.T) {
testCases := []struct {
desc string
endpointsData []negtypes.EndpointsData
expect bool
expect *negtypes.NegSyncResult
}{
{
desc: "no missing zone or nodeName",
Expand Down Expand Up @@ -2085,7 +2086,7 @@ func TestIsValidEPField(t *testing.T) {
},
},
},
expect: true,
expect: negtypes.NewNegSyncResult(nil, negtypes.ResultSuccess),
},
{
desc: "contains one missing nodeName",
Expand Down Expand Up @@ -2155,7 +2156,7 @@ func TestIsValidEPField(t *testing.T) {
},
},
},
expect: false,
expect: negtypes.NewNegSyncResult(negtypes.ErrEPMissingNodeName, negtypes.ResultEPMissingNodeName),
},
{
desc: "contains one empty nodeName",
Expand Down Expand Up @@ -2225,7 +2226,7 @@ func TestIsValidEPField(t *testing.T) {
},
},
},
expect: false,
expect: negtypes.NewNegSyncResult(negtypes.ErrEPMissingNodeName, negtypes.ResultEPMissingNodeName),
},
{
desc: "contains one missing zone",
Expand Down Expand Up @@ -2295,14 +2296,14 @@ func TestIsValidEPField(t *testing.T) {
},
},
},
expect: false,
expect: negtypes.NewNegSyncResult(negtypes.ErrEPMissingZone, negtypes.ResultEPMissingZone),
},
}
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
_, _, _, err := transactionSyncer.endpointsCalculator.CalculateEndpoints(tc.endpointsData, nil)
if got := transactionSyncer.isValidEPField(err); got != tc.expect {
t.Errorf("isValidEPField() = %t, expected %t, err: %v, ", got, tc.expect, err)
if got := transactionSyncer.checkValidEPField(err); !errors.Is(got.Error, tc.expect.Error) || got.Result != tc.expect.Result {
t.Errorf("checkValidEPField() = %v, expected %v", got.Error, tc.expect)
}
})
}
Expand Down
Loading

0 comments on commit b14be83

Please sign in to comment.