From 67915aae2e6e46fc6668ea5ac4a689389f6f2d2d Mon Sep 17 00:00:00 2001
From: David Cheung
Date: Thu, 2 Feb 2023 20:14:10 +0000
Subject: [PATCH] Add metrics to track endpointslice staleness

Added metrics to track the sync staleness of endpoint slices, where
staleness is defined as how long it has been since the slice was last
processed.
---
 pkg/neg/metrics/metrics.go     | 17 +++++++++
 pkg/neg/syncers/transaction.go | 67 +++++++++++++++++++++++-----------
 2 files changed, 63 insertions(+), 21 deletions(-)

diff --git a/pkg/neg/metrics/metrics.go b/pkg/neg/metrics/metrics.go
index b5fe7de76f..9dba1848cd 100644
--- a/pkg/neg/metrics/metrics.go
+++ b/pkg/neg/metrics/metrics.go
@@ -31,6 +31,7 @@ const (
 	negOpLatencyKey      = "neg_operation_duration_seconds"
 	negOpEndpointsKey    = "neg_operation_endpoints"
 	lastSyncTimestampKey = "sync_timestamp"
+	epsStalenessKey      = "endpointslice_staleness"
 
 	resultSuccess = "success"
 	resultError   = "error"
@@ -127,6 +128,17 @@ var (
 			Help:      "The timestamp of the last execution of NEG controller sync loop.",
 		},
 	)
+
+	// EPSStaleness tracks, for every endpoint slice, how long it has been since it was last processed
+	EPSStaleness = prometheus.NewHistogram(
+		prometheus.HistogramOpts{
+			Subsystem: negControllerSubsystem,
+			Name:      epsStalenessKey,
+			Help:      "The duration since an endpoint slice was last processed by the syncer",
+			// custom buckets - [1s, 2s, 4s, 8s, 16s, 32s, 64s, 128s, 256s(~4min), 512s(~8min), 1024s(~17min), 2048s(~34min), 4096s(~68min), +Inf]
+			Buckets: prometheus.ExponentialBuckets(1, 2, 13),
+		},
+	)
 )
 
 var register sync.Once
@@ -139,6 +151,7 @@ func RegisterMetrics() {
 		prometheus.MustRegister(SyncerSyncLatency)
 		prometheus.MustRegister(LastSyncTimestamp)
 		prometheus.MustRegister(InitializationLatency)
+		prometheus.MustRegister(EPSStaleness)
 	})
 }
 
@@ -168,6 +181,10 @@ func PublishNegInitializationMetrics(latency time.Duration) {
 	InitializationLatency.Observe(latency.Seconds())
 }
 
+func PublishNegEPSStalenessMetrics(epsStaleness time.Duration) {
+	EPSStaleness.Observe(epsStaleness.Seconds())
+}
+
 func getResult(err error) string {
 	if err != nil {
 		return resultError
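A note on the bucket layout above: `prometheus.ExponentialBuckets(1, 2, 13)` generates 13 upper bounds starting at 1s and doubling up to 4096s, with the implicit +Inf bucket catching anything staler. The sketch below (not part of the patch) illustrates how the new histogram behaves in isolation; the subsystem string is an assumption, since the value of the `negControllerSubsystem` constant is not shown in this diff.

```go
package main

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// The 13 upper bounds used by EPSStaleness:
	// [1 2 4 8 16 32 64 128 256 512 1024 2048 4096]
	fmt.Println(prometheus.ExponentialBuckets(1, 2, 13))

	// "neg_controller" is an assumed stand-in for the
	// negControllerSubsystem constant, whose value this diff omits.
	staleness := prometheus.NewHistogram(prometheus.HistogramOpts{
		Subsystem: "neg_controller",
		Name:      "endpointslice_staleness",
		Help:      "The duration since an endpoint slice was last processed by the syncer",
		Buckets:   prometheus.ExponentialBuckets(1, 2, 13),
	})

	// Mirrors PublishNegEPSStalenessMetrics: durations are recorded in
	// seconds, so a 90s staleness lands in the le="128" bucket.
	staleness.Observe((90 * time.Second).Seconds())
}
```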
diff --git a/pkg/neg/syncers/transaction.go b/pkg/neg/syncers/transaction.go
index 2cc107acf5..541cb50095 100644
--- a/pkg/neg/syncers/transaction.go
+++ b/pkg/neg/syncers/transaction.go
@@ -97,6 +97,12 @@ type transactionSyncer struct {
 
 	logger klog.Logger
 
+	// lastSyncTimestamp tracks the timestamp of the last sync executed by this syncer
+	lastSyncTimestamp time.Time
+
+	// epsLastSyncTimestampMap tracks, for every endpoint slice, the timestamp at which it was last processed
+	epsLastSyncTimestampMap map[types.UID]time.Time
+
 	// inError indicates if the syncer is in any of 4 error scenarios
 	// 1. Endpoint counts from EPS is different from calculated endpoint list
 	// 2. EndpointSlice has missing or invalid data
@@ -129,26 +135,27 @@ func NewTransactionSyncer(
 
 	// TransactionSyncer implements the syncer core
 	ts := &transactionSyncer{
-		NegSyncerKey:         negSyncerKey,
-		needInit:             true,
-		transactions:         NewTransactionTable(),
-		nodeLister:           nodeLister,
-		podLister:            podLister,
-		serviceLister:        serviceLister,
-		endpointLister:       endpointLister,
-		endpointSliceLister:  endpointSliceLister,
-		svcNegLister:         svcNegLister,
-		recorder:             recorder,
-		cloud:                cloud,
-		zoneGetter:           zoneGetter,
-		endpointsCalculator:  epc,
-		reflector:            reflector,
-		kubeSystemUID:        kubeSystemUID,
-		svcNegClient:         svcNegClient,
-		customName:           customName,
-		enableEndpointSlices: enableEndpointSlices,
-		inError:              false,
-		logger:               logger,
+		NegSyncerKey:            negSyncerKey,
+		needInit:                true,
+		transactions:            NewTransactionTable(),
+		nodeLister:              nodeLister,
+		podLister:               podLister,
+		serviceLister:           serviceLister,
+		endpointLister:          endpointLister,
+		endpointSliceLister:     endpointSliceLister,
+		svcNegLister:            svcNegLister,
+		recorder:                recorder,
+		cloud:                   cloud,
+		zoneGetter:              zoneGetter,
+		endpointsCalculator:     epc,
+		reflector:               reflector,
+		kubeSystemUID:           kubeSystemUID,
+		svcNegClient:            svcNegClient,
+		customName:              customName,
+		enableEndpointSlices:    enableEndpointSlices,
+		inError:                 false,
+		logger:                  logger,
+		epsLastSyncTimestampMap: make(map[types.UID]time.Time),
 	}
 	// Syncer implements life cycle logic
 	syncer := newSyncer(negSyncerKey, serviceLister, recorder, ts, logger)
@@ -240,9 +247,27 @@ func (s *transactionSyncer) syncInternalImpl() error {
 			return nil
 		}
 		endpointSlices := make([]*discovery.EndpointSlice, len(slices))
+		currTime := time.Now()
+		newLastSyncMap := make(map[types.UID]time.Time)
 		for i, slice := range slices {
-			endpointSlices[i] = slice.(*discovery.EndpointSlice)
+			endpointslice := slice.(*discovery.EndpointSlice)
+			endpointSlices[i] = endpointslice
+			key := endpointslice.ObjectMeta.UID
+			if lastSyncTime, contains := s.epsLastSyncTimestampMap[key]; contains {
+				metrics.PublishNegEPSStalenessMetrics(currTime.Sub(lastSyncTime))
+				s.logger.V(3).Info("Endpoint slice syncs", "Namespace", endpointslice.ObjectMeta.Namespace, "Name", endpointslice.ObjectMeta.Name, "staleness", currTime.Sub(lastSyncTime).Seconds())
+			}
+			newLastSyncMap[key] = currTime
 		}
+		// Emit staleness for endpoint slices that were previously tracked but have since been removed
+		for key, lastSyncTime := range s.epsLastSyncTimestampMap {
+			if _, contains := newLastSyncMap[key]; !contains {
+				metrics.PublishNegEPSStalenessMetrics(currTime.Sub(lastSyncTime))
+				s.logger.V(3).Info("Endpoint slice syncs", "key", key, "staleness", currTime.Sub(lastSyncTime).Seconds())
+			}
+		}
+		s.epsLastSyncTimestampMap = newLastSyncMap
+
 		endpointsData := negtypes.EndpointsDataFromEndpointSlices(endpointSlices)
 		targetMap, endpointPodMap, dupCount, err = s.endpointsCalculator.CalculateEndpoints(endpointsData, currentMap)
 		if !s.isValidEPField(err) || !s.isValidEndpointInfo(endpointsData, endpointPodMap, dupCount) {
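The core of the transaction.go change is a swap-and-compare over a UID-keyed timestamp map: every slice seen in this sync reports the time since its previous visit, and slices that disappeared since the last sync report their final staleness once before being dropped with the old map. Below is a standalone sketch of that bookkeeping, with simplified types; `observe` and `recordStaleness` are hypothetical names standing in for metrics.PublishNegEPSStalenessMetrics and the inline logic in syncInternalImpl.

```go
package main

import (
	"fmt"
	"time"
)

type uid string

// observe stands in for metrics.PublishNegEPSStalenessMetrics.
func observe(staleness time.Duration) {
	fmt.Printf("staleness: %vs\n", staleness.Seconds())
}

// recordStaleness mirrors the bookkeeping in syncInternalImpl: given the
// previous timestamp map and the slices seen in this sync, it reports
// staleness and returns the replacement for epsLastSyncTimestampMap.
func recordStaleness(prev map[uid]time.Time, seen []uid, now time.Time) map[uid]time.Time {
	next := make(map[uid]time.Time, len(seen))
	for _, key := range seen {
		// A slice seen before reports the time since its previous sync.
		if last, ok := prev[key]; ok {
			observe(now.Sub(last))
		}
		next[key] = now
	}
	// A slice that was tracked previously but is gone now reports once
	// more before its entry is discarded along with the old map.
	for key, last := range prev {
		if _, ok := next[key]; !ok {
			observe(now.Sub(last))
		}
	}
	return next
}

func main() {
	t0 := time.Now()
	m := recordStaleness(nil, []uid{"a", "b"}, t0)             // first sync: nothing to report
	m = recordStaleness(m, []uid{"a"}, t0.Add(5*time.Second))  // reports "a" (5s) and removed "b" (5s)
	_ = m
}
```

Two consequences of this design worth noting: a slice's first appearance records no sample, since there is no previous timestamp to compare against; and because EPSStaleness is a plain histogram with no labels, per-slice staleness is only visible in the V(3) logs, not in the metric itself.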