From 3c781a21474d250552b7d77c52e0897402fc0c8d Mon Sep 17 00:00:00 2001 From: Lucasliang Date: Fri, 24 Feb 2023 12:07:36 +0800 Subject: [PATCH 1/4] Supply extra metrics for monitoring the flows' directions under PreferLeader mode. Signed-off-by: Lucasliang --- internal/locate/region_cache.go | 41 +++++++++++++++++++++++++++++++ internal/locate/region_request.go | 9 ++++++- internal/locate/slow_score.go | 24 ++++++++++++++++++ metrics/metrics.go | 10 ++++++++ 4 files changed, 83 insertions(+), 1 deletion(-) diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index bf969b5a54..ef4d0ceaa2 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -434,6 +434,7 @@ func NewRegionCache(pdClient pd.Client) *RegionCache { c.enableForwarding = config.GetGlobalConfig().EnableForwarding // Default use 15s as the update inerval. go c.asyncUpdateStoreSlowScore(time.Duration(interval/4) * time.Second) + go c.asyncReportStoreReplicaFlows(time.Duration(interval/2) * time.Second) return c } @@ -2271,6 +2272,8 @@ type Store struct { // A statistic for counting the request latency to this store slowScore SlowScoreStat + // A statistic for counting the flows of different replicas on this store + replicaFlowsStats [numReplicaFlowsType]uint64 } type resolveState uint64 @@ -2712,6 +2715,7 @@ func (s *Store) recordSlowScoreStat(timecost time.Duration) { s.slowScore.recordSlowScoreStat(timecost) } +// markAlreadySlow marks the related store already slow. func (s *Store) markAlreadySlow() { s.slowScore.markAlreadySlow() } @@ -2731,6 +2735,7 @@ func (c *RegionCache) asyncUpdateStoreSlowScore(interval time.Duration) { } } +// checkAndUpdateStoreSlowScores checks and updates slowScore on each store. 
func (c *RegionCache) checkAndUpdateStoreSlowScores() { defer func() { r := recover() @@ -2752,6 +2757,42 @@ func (c *RegionCache) checkAndUpdateStoreSlowScores() { } } +// getReplicaFlowsStats returns the statistics on the related replicaFlowsType. +func (s *Store) getReplicaFlowsStats(destType replicaFlowsType) uint64 { + return atomic.LoadUint64(&s.replicaFlowsStats[destType]) +} + +// resetReplicaFlowsStats resets the statistics on the related replicaFlowsType. +func (s *Store) resetReplicaFlowsStats(destType replicaFlowsType) { + atomic.StoreUint64(&s.replicaFlowsStats[destType], 0) +} + +// recordReplicaFlowsStats records the statistics on the related replicaFlowsType. +func (s *Store) recordReplicaFlowsStats(destType replicaFlowsType) { + atomic.AddUint64(&s.replicaFlowsStats[destType], 1) +} + +// asyncReportStoreReplicaFlows reports the statistics on the related replicaFlowsType. +func (c *RegionCache) asyncReportStoreReplicaFlows(interval time.Duration) { + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-c.ctx.Done(): + return + case <-ticker.C: + c.storeMu.RLock() + for _, store := range c.storeMu.stores { + for destType := toLeader; destType < numReplicaFlowsType; destType++ { + metrics.TiKVPreferLeaderFlowsCounter.WithLabelValues(destType.String(), store.addr).Add(float64(store.getReplicaFlowsStats(destType))) + store.resetReplicaFlowsStats(destType) + } + } + c.storeMu.RUnlock() + } + } +} + func createKVHealthClient(ctx context.Context, addr string) (*grpc.ClientConn, healthpb.HealthClient, error) { // Temporarily directly load the config from the global config, however it's not a good idea to let RegionCache to // access it. 
diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index 00e2f043e4..c3d0f203cc 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -287,7 +287,6 @@ type replicaSelector struct { // | reachable +-----+-----+ all proxies are tried ^ // +------------+tryNewProxy+-------------------------+ // +-----------+ - type selectorState interface { next(*retry.Backoffer, *replicaSelector) (*RPCContext, error) onSendSuccess(*replicaSelector) @@ -594,6 +593,14 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector state.lastIdx = state.leaderIdx selector.targetIdx = state.leaderIdx } + // Monitor the flows destination if selector is under `ReplicaReadPreferLeader` mode. + if state.option.preferLeader { + if selector.targetIdx != state.leaderIdx { + selector.replicas[selector.targetIdx].store.recordReplicaFlowsStats(toFollower) + } else { + selector.replicas[selector.targetIdx].store.recordReplicaFlowsStats(toLeader) + } + } return selector.buildRPCContext(bo) } diff --git a/internal/locate/slow_score.go b/internal/locate/slow_score.go index 0da7f145b3..562c9e9db7 100644 --- a/internal/locate/slow_score.go +++ b/internal/locate/slow_score.go @@ -15,6 +15,7 @@ package locate import ( + "fmt" "math" "sync/atomic" "time" @@ -155,3 +156,26 @@ func (ss *SlowScoreStat) markAlreadySlow() { func (ss *SlowScoreStat) isSlow() bool { return ss.getSlowScore() >= slowScoreThreshold } + +// replicaFlowsType indicates the type of the destination replica of flows. +type replicaFlowsType int + +const ( + // toLeader indicates that flows are sent to leader replica. + toLeader replicaFlowsType = iota + // toFollower indicates that flows are sent to follower replicas. + toFollower + // numReplicaFlowsType is reserved as the count of replicaFlowsType values; keep it last. 
+ numReplicaFlowsType +) + +func (a replicaFlowsType) String() string { + switch a { + case toLeader: + return "ToLeader" + case toFollower: + return "ToFollower" + default: + return fmt.Sprintf("%d", a) + } +} diff --git a/metrics/metrics.go b/metrics/metrics.go index 3b767ceb8c..296380e911 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -99,6 +99,7 @@ var ( TiKVGrpcConnectionState *prometheus.GaugeVec TiKVAggressiveLockedKeysCounter *prometheus.CounterVec TiKVStoreSlowScoreGauge *prometheus.GaugeVec + TiKVPreferLeaderFlowsCounter *prometheus.CounterVec ) // Label constants. @@ -616,6 +617,14 @@ func initMetrics(namespace, subsystem string) { Help: "Slow scores of each tikv node based on RPC timecosts", }, []string{LblStore}) + TiKVPreferLeaderFlowsCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "prefer_leader_flows_counter", + Help: "Counter of flows under PreferLeader mode.", + }, []string{LblType, LblStore}) + initShortcuts() } @@ -689,6 +698,7 @@ func RegisterMetrics() { prometheus.MustRegister(TiKVGrpcConnectionState) prometheus.MustRegister(TiKVAggressiveLockedKeysCounter) prometheus.MustRegister(TiKVStoreSlowScoreGauge) + prometheus.MustRegister(TiKVPreferLeaderFlowsCounter) } // readCounter reads the value of a prometheus.Counter. 
From d98bd47838edaacae0c5a00140dea9ba9cba108f Mon Sep 17 00:00:00 2001 From: Lucasliang Date: Mon, 27 Feb 2023 10:38:12 +0800 Subject: [PATCH 2/4] Polish code-styles Signed-off-by: Lucasliang --- internal/locate/region_request.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index c3d0f203cc..f18a0c3dda 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -287,6 +287,7 @@ type replicaSelector struct { // | reachable +-----+-----+ all proxies are tried ^ // +------------+tryNewProxy+-------------------------+ // +-----------+ + type selectorState interface { next(*retry.Backoffer, *replicaSelector) (*RPCContext, error) onSendSuccess(*replicaSelector) From 1565de273b21f5bdf4f193d59be7c045fcf585e0 Mon Sep 17 00:00:00 2001 From: Lucasliang Date: Tue, 28 Feb 2023 20:41:00 +0800 Subject: [PATCH 3/4] Modify the interval of reporting to make the metrics more consecutive. Signed-off-by: Lucasliang --- internal/locate/region_cache.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index ef4d0ceaa2..4dfad6c553 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -434,7 +434,7 @@ func NewRegionCache(pdClient pd.Client) *RegionCache { c.enableForwarding = config.GetGlobalConfig().EnableForwarding // Default use 15s as the update inerval. go c.asyncUpdateStoreSlowScore(time.Duration(interval/4) * time.Second) - go c.asyncReportStoreReplicaFlows(time.Duration(interval/2) * time.Second) + go c.asyncReportStoreReplicaFlows(time.Duration(interval/4) * time.Second) return c } From 13810086c524c399e1ae1209c114ebf400e12041 Mon Sep 17 00:00:00 2001 From: Lucasliang Date: Tue, 28 Feb 2023 21:11:37 +0800 Subject: [PATCH 4/4] Change the type of metrics. 
Signed-off-by: Lucasliang --- internal/locate/region_cache.go | 4 ++-- metrics/metrics.go | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index 4dfad6c553..64e953764c 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -434,7 +434,7 @@ func NewRegionCache(pdClient pd.Client) *RegionCache { c.enableForwarding = config.GetGlobalConfig().EnableForwarding // Default use 15s as the update inerval. go c.asyncUpdateStoreSlowScore(time.Duration(interval/4) * time.Second) - go c.asyncReportStoreReplicaFlows(time.Duration(interval/4) * time.Second) + go c.asyncReportStoreReplicaFlows(time.Duration(interval/2) * time.Second) return c } @@ -2784,7 +2784,7 @@ func (c *RegionCache) asyncReportStoreReplicaFlows(interval time.Duration) { c.storeMu.RLock() for _, store := range c.storeMu.stores { for destType := toLeader; destType < numReplicaFlowsType; destType++ { - metrics.TiKVPreferLeaderFlowsCounter.WithLabelValues(destType.String(), store.addr).Add(float64(store.getReplicaFlowsStats(destType))) + metrics.TiKVPreferLeaderFlowsGauge.WithLabelValues(destType.String(), store.addr).Set(float64(store.getReplicaFlowsStats(destType))) store.resetReplicaFlowsStats(destType) } } diff --git a/metrics/metrics.go b/metrics/metrics.go index a8ed32b5ba..7d349b59e9 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -99,7 +99,7 @@ var ( TiKVGrpcConnectionState *prometheus.GaugeVec TiKVAggressiveLockedKeysCounter *prometheus.CounterVec TiKVStoreSlowScoreGauge *prometheus.GaugeVec - TiKVPreferLeaderFlowsCounter *prometheus.CounterVec + TiKVPreferLeaderFlowsGauge *prometheus.GaugeVec ) // Label constants. 
@@ -620,11 +620,11 @@ func initMetrics(namespace, subsystem string) { Help: "Slow scores of each tikv node based on RPC timecosts", }, []string{LblStore}) - TiKVPreferLeaderFlowsCounter = prometheus.NewCounterVec( - prometheus.CounterOpts{ + TiKVPreferLeaderFlowsGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ Namespace: namespace, Subsystem: subsystem, - Name: "prefer_leader_flows_counter", + Name: "prefer_leader_flows_gauge", Help: "Counter of flows under PreferLeader mode.", }, []string{LblType, LblStore}) @@ -701,7 +701,7 @@ func RegisterMetrics() { prometheus.MustRegister(TiKVGrpcConnectionState) prometheus.MustRegister(TiKVAggressiveLockedKeysCounter) prometheus.MustRegister(TiKVStoreSlowScoreGauge) - prometheus.MustRegister(TiKVPreferLeaderFlowsCounter) + prometheus.MustRegister(TiKVPreferLeaderFlowsGauge) } // readCounter reads the value of a prometheus.Counter.