Skip to content

Commit

Permalink
Add metrics to track endpointslice staleness
Browse files Browse the repository at this point in the history
Added metrics to track the sync staleness of endpoint slices, where
staleness is defined as how long it has been since the slice was last processed.
  • Loading branch information
sawsa307 committed Feb 2, 2023
1 parent e246ee3 commit 67915aa
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 21 deletions.
17 changes: 17 additions & 0 deletions pkg/neg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (
negOpLatencyKey = "neg_operation_duration_seconds"
negOpEndpointsKey = "neg_operation_endpoints"
lastSyncTimestampKey = "sync_timestamp"
epsStalenessKey = "endpointslice_staleness"

resultSuccess = "success"
resultError = "error"
Expand Down Expand Up @@ -127,6 +128,17 @@ var (
Help: "The timestamp of the last execution of NEG controller sync loop.",
},
)

	// EPSStaleness tracks, for every endpoint slice, how long it has been since
	// the slice was last processed by a syncer. An observation is recorded each
	// time a slice is processed (and once more when a tracked slice disappears).
	EPSStaleness = prometheus.NewHistogram(
		prometheus.HistogramOpts{
			Subsystem: negControllerSubsystem,
			Name:      epsStalenessKey,
			Help:      "The duration for an endpoint slice since it was last processed by syncer",
			// Custom buckets - [1s, 2s, 4s, 8s, 16s, 32s, 64s, 128s, 256s(~4min), 512s(~8min), 1024s(~17min), 2048s(~34min), 4096s(~68min), +Inf]
			Buckets: prometheus.ExponentialBuckets(1, 2, 13),
		},
	)
)

var register sync.Once
Expand All @@ -139,6 +151,7 @@ func RegisterMetrics() {
prometheus.MustRegister(SyncerSyncLatency)
prometheus.MustRegister(LastSyncTimestamp)
prometheus.MustRegister(InitializationLatency)
prometheus.MustRegister(EPSStaleness)
})
}

Expand Down Expand Up @@ -168,6 +181,10 @@ func PublishNegInitializationMetrics(latency time.Duration) {
InitializationLatency.Observe(latency.Seconds())
}

// PublishNegEPSStalenessMetrics records one staleness observation for a single
// endpoint slice in the EPSStaleness histogram, where epsStaleness is the
// elapsed time since the slice was last processed by a syncer.
func PublishNegEPSStalenessMetrics(epsStaleness time.Duration) {
	EPSStaleness.Observe(epsStaleness.Seconds())
}

func getResult(err error) string {
if err != nil {
return resultError
Expand Down
67 changes: 46 additions & 21 deletions pkg/neg/syncers/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,12 @@ type transactionSyncer struct {

logger klog.Logger

// lastSyncTimestamp tracks the timestamp of the last execution of sync operation by this syncer
lastSyncTimestamp time.Time

// epsLastSyncTimestampMap tracks for every endpoint slice, the timestamp of when it is last processed
epsLastSyncTimestampMap map[types.UID]time.Time

// inError indicates if the syncer is in any of 4 error scenarios
// 1. Endpoint counts from EPS is different from calculated endpoint list
// 2. EndpointSlice has missing or invalid data
Expand Down Expand Up @@ -129,26 +135,27 @@ func NewTransactionSyncer(

// TransactionSyncer implements the syncer core
ts := &transactionSyncer{
NegSyncerKey: negSyncerKey,
needInit: true,
transactions: NewTransactionTable(),
nodeLister: nodeLister,
podLister: podLister,
serviceLister: serviceLister,
endpointLister: endpointLister,
endpointSliceLister: endpointSliceLister,
svcNegLister: svcNegLister,
recorder: recorder,
cloud: cloud,
zoneGetter: zoneGetter,
endpointsCalculator: epc,
reflector: reflector,
kubeSystemUID: kubeSystemUID,
svcNegClient: svcNegClient,
customName: customName,
enableEndpointSlices: enableEndpointSlices,
inError: false,
logger: logger,
NegSyncerKey: negSyncerKey,
needInit: true,
transactions: NewTransactionTable(),
nodeLister: nodeLister,
podLister: podLister,
serviceLister: serviceLister,
endpointLister: endpointLister,
endpointSliceLister: endpointSliceLister,
svcNegLister: svcNegLister,
recorder: recorder,
cloud: cloud,
zoneGetter: zoneGetter,
endpointsCalculator: epc,
reflector: reflector,
kubeSystemUID: kubeSystemUID,
svcNegClient: svcNegClient,
customName: customName,
enableEndpointSlices: enableEndpointSlices,
inError: false,
logger: logger,
epsLastSyncTimestampMap: make(map[types.UID]time.Time),
}
// Syncer implements life cycle logic
syncer := newSyncer(negSyncerKey, serviceLister, recorder, ts, logger)
Expand Down Expand Up @@ -240,9 +247,27 @@ func (s *transactionSyncer) syncInternalImpl() error {
return nil
}
endpointSlices := make([]*discovery.EndpointSlice, len(slices))
currTime := time.Now()
newLastSyncMap := make(map[types.UID]time.Time)
for i, slice := range slices {
endpointSlices[i] = slice.(*discovery.EndpointSlice)
endpointslice := slice.(*discovery.EndpointSlice)
endpointSlices[i] = endpointslice
key := endpointslice.ObjectMeta.UID
if lastSyncTime, contains := s.epsLastSyncTimestampMap[key]; contains {
metrics.PublishNegEPSStalenessMetrics(currTime.Sub(lastSyncTime))
s.logger.V(3).Info("Endpoint slice syncs", "Namespace", endpointslice.ObjectMeta.Namespace, "Name", endpointslice.ObjectMeta.Name, "staleness", currTime.Sub(lastSyncTime).Seconds())
}
newLastSyncMap[key] = currTime
}
// Publish a final staleness observation for endpoint slices that were previously tracked but have since been removed.
for key, lastSyncTime := range s.epsLastSyncTimestampMap {
if _, contains := newLastSyncMap[key]; !contains {
metrics.PublishNegEPSStalenessMetrics(currTime.Sub(lastSyncTime))
s.logger.V(3).Info("Endpoint slice syncs", "key", key, "staleness", currTime.Sub(lastSyncTime).Seconds())
}
}
s.epsLastSyncTimestampMap = newLastSyncMap

endpointsData := negtypes.EndpointsDataFromEndpointSlices(endpointSlices)
targetMap, endpointPodMap, dupCount, err = s.endpointsCalculator.CalculateEndpoints(endpointsData, currentMap)
if !s.isValidEPField(err) || !s.isValidEndpointInfo(endpointsData, endpointPodMap, dupCount) {
Expand Down

0 comments on commit 67915aa

Please sign in to comment.