[Metrics] Supply extra metrics to monitor the flows under prefer-leader mode. #716

Merged 12 commits on Mar 13, 2023
41 changes: 41 additions & 0 deletions internal/locate/region_cache.go

@@ -459,6 +459,7 @@ func NewRegionCache(pdClient pd.Client) *RegionCache {
c.enableForwarding = config.GetGlobalConfig().EnableForwarding
// By default, use 15s as the update interval.
go c.asyncUpdateStoreSlowScore(time.Duration(interval/4) * time.Second)
go c.asyncReportStoreReplicaFlows(time.Duration(interval/2) * time.Second)
return c
}

@@ -2277,6 +2278,8 @@ type Store struct {

// A statistic for counting the request latency to this store
slowScore SlowScoreStat
// A statistic for counting the flows of different replicas on this store
replicaFlowsStats [numReplicaFlowsType]uint64
}

type resolveState uint64
@@ -2718,6 +2721,7 @@ func (s *Store) recordSlowScoreStat(timecost time.Duration) {
s.slowScore.recordSlowScoreStat(timecost)
}

// markAlreadySlow marks the related store as already slow.
func (s *Store) markAlreadySlow() {
s.slowScore.markAlreadySlow()
}
@@ -2737,6 +2741,7 @@ func (c *RegionCache) asyncUpdateStoreSlowScore(interval time.Duration) {
}
}

// checkAndUpdateStoreSlowScores checks and updates slowScore on each store.
func (c *RegionCache) checkAndUpdateStoreSlowScores() {
defer func() {
r := recover()
@@ -2758,6 +2763,42 @@
}
}

// getReplicaFlowsStats returns the statistics on the related replicaFlowsType.
func (s *Store) getReplicaFlowsStats(destType replicaFlowsType) uint64 {
return atomic.LoadUint64(&s.replicaFlowsStats[destType])
}

// resetReplicaFlowsStats resets the statistics on the related replicaFlowsType.
func (s *Store) resetReplicaFlowsStats(destType replicaFlowsType) {
atomic.StoreUint64(&s.replicaFlowsStats[destType], 0)
}

// recordReplicaFlowsStats records the statistics on the related replicaFlowsType.
func (s *Store) recordReplicaFlowsStats(destType replicaFlowsType) {
atomic.AddUint64(&s.replicaFlowsStats[destType], 1)
}

// asyncReportStoreReplicaFlows periodically reports each store's replica-flow statistics as metrics and resets the counters.
func (c *RegionCache) asyncReportStoreReplicaFlows(interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-c.ctx.Done():
return
case <-ticker.C:
c.storeMu.RLock()
for _, store := range c.storeMu.stores {
for destType := toLeader; destType < numReplicaFlowsType; destType++ {
metrics.TiKVPreferLeaderFlowsGauge.WithLabelValues(destType.String(), store.addr).Set(float64(store.getReplicaFlowsStats(destType)))
store.resetReplicaFlowsStats(destType)
}
}
c.storeMu.RUnlock()
}
}
}
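
The reporting loop above publishes each counter to the gauge and then resets it with a separate store; the load/store pair is not one atomic swap, so an increment landing between the two steps can be lost, which is generally acceptable for a monitoring gauge (atomic.SwapUint64 would close that window). Below is a minimal, self-contained sketch of this record/report/reset cycle; the names flowCounter and drain are illustrative, not from this PR.

package main

import (
	"fmt"
	"sync/atomic"
)

// flowCounter mirrors the per-store array of uint64 counters above;
// index 0 counts leader-bound requests, index 1 follower-bound ones.
type flowCounter [2]uint64

// record atomically bumps the counter for one destination.
func (c *flowCounter) record(dest int) { atomic.AddUint64(&c[dest], 1) }

// drain reads then resets a counter, as the reporting loop does. The
// load/store pair is not atomic as a whole, matching the PR's behavior.
func (c *flowCounter) drain(dest int) uint64 {
	v := atomic.LoadUint64(&c[dest])
	atomic.StoreUint64(&c[dest], 0)
	return v
}

func main() {
	var c flowCounter
	c.record(0)
	c.record(0)
	c.record(1)
	fmt.Println(c.drain(0), c.drain(1)) // prints: 2 1
}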

func createKVHealthClient(ctx context.Context, addr string) (*grpc.ClientConn, healthpb.HealthClient, error) {
// Temporarily load the config directly from the global config, though it's not a good idea for RegionCache
// to access it.

8 changes: 8 additions & 0 deletions internal/locate/region_request.go

@@ -594,6 +594,14 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
state.lastIdx = state.leaderIdx
selector.targetIdx = state.leaderIdx
}
// Record the flow destination if the selector is under `ReplicaReadPreferLeader` mode.
if state.option.preferLeader {
if selector.targetIdx != state.leaderIdx {
selector.replicas[selector.targetIdx].store.recordReplicaFlowsStats(toFollower)
} else {
selector.replicas[selector.targetIdx].store.recordReplicaFlowsStats(toLeader)
}
}
return selector.buildRPCContext(bo)
}
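
A hedged restatement of the branch above: the destination type reduces to comparing the selected replica index with the leader index. destType is a hypothetical helper, not in the PR, and assumes the replicaFlowsType constants introduced in slow_score.go below.

// destType classifies a request by comparing the selected replica index
// with the leader index, mirroring the branch above.
func destType(targetIdx, leaderIdx int) replicaFlowsType {
	if targetIdx == leaderIdx {
		return toLeader
	}
	return toFollower
}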

24 changes: 24 additions & 0 deletions internal/locate/slow_score.go

@@ -15,6 +15,7 @@
package locate

import (
"fmt"
"math"
"sync/atomic"
"time"
@@ -155,3 +156,26 @@ func (ss *SlowScoreStat) markAlreadySlow() {
func (ss *SlowScoreStat) isSlow() bool {
return ss.getSlowScore() >= slowScoreThreshold
}

// replicaFlowsType indicates the type of the destination replica of flows.
type replicaFlowsType int

const (
// toLeader indicates that flows are sent to leader replica.
toLeader replicaFlowsType = iota
// toFollower indicates that flows are sent to follower replicas.
toFollower
// numReplicaFlowsType is reserved to keep the max replicaFlowsType value.
numReplicaFlowsType
)

func (a replicaFlowsType) String() string {
switch a {
case toLeader:
return "ToLeader"
case toFollower:
return "ToFollower"
default:
return fmt.Sprintf("%d", a)
}
}
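
The trailing sentinel makes it cheap to iterate every concrete destination type, which is what the reporting loop in region_cache.go does. A self-contained sketch of the pattern, duplicating the enum locally for illustration:

package main

import "fmt"

type replicaFlowsType int

const (
	toLeader replicaFlowsType = iota
	toFollower
	numReplicaFlowsType // sentinel: exclusive upper bound for iteration
)

func (a replicaFlowsType) String() string {
	switch a {
	case toLeader:
		return "ToLeader"
	case toFollower:
		return "ToFollower"
	default:
		return fmt.Sprintf("%d", a)
	}
}

func main() {
	// fmt invokes String() automatically via the Stringer interface.
	for t := toLeader; t < numReplicaFlowsType; t++ {
		fmt.Println(t)
	}
	// Output:
	// ToLeader
	// ToFollower
}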

10 changes: 10 additions & 0 deletions metrics/metrics.go

@@ -99,6 +99,7 @@ var (
TiKVGrpcConnectionState *prometheus.GaugeVec
TiKVAggressiveLockedKeysCounter *prometheus.CounterVec
TiKVStoreSlowScoreGauge *prometheus.GaugeVec
TiKVPreferLeaderFlowsGauge *prometheus.GaugeVec
)

// Label constants.
@@ -619,6 +620,14 @@ func initMetrics(namespace, subsystem string) {
Help: "Slow scores of each tikv node based on RPC timecosts",
}, []string{LblStore})

TiKVPreferLeaderFlowsGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "prefer_leader_flows_gauge",
Help: "Counter of flows under PreferLeader mode.",
}, []string{LblType, LblStore})

initShortcuts()
}

@@ -692,6 +701,7 @@ func RegisterMetrics() {
prometheus.MustRegister(TiKVGrpcConnectionState)
prometheus.MustRegister(TiKVAggressiveLockedKeysCounter)
prometheus.MustRegister(TiKVStoreSlowScoreGauge)
prometheus.MustRegister(TiKVPreferLeaderFlowsGauge)
}
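
One way to sanity-check the new gauge in a unit test, sketched with prometheus/client_golang's testutil package. The test builds a local GaugeVec mirroring the definition above instead of importing client-go, so all names and label values here are illustrative.

package metrics_test

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func TestPreferLeaderFlowsGauge(t *testing.T) {
	// Mirrors the GaugeVec defined in initMetrics above.
	g := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "tikv",
		Subsystem: "client_go",
		Name:      "prefer_leader_flows_gauge",
		Help:      "Counter of flows under PreferLeader mode.",
	}, []string{"type", "store"})

	// The reporting loop labels each sample with the destination type
	// and the store address.
	g.WithLabelValues("ToLeader", "127.0.0.1:20160").Set(42)

	if got := testutil.ToFloat64(g.WithLabelValues("ToLeader", "127.0.0.1:20160")); got != 42 {
		t.Fatalf("expected 42, got %v", got)
	}
}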

// readCounter reads the value of a prometheus.Counter.