diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go
index be5641716a..988223880d 100644
--- a/internal/locate/region_cache.go
+++ b/internal/locate/region_cache.go
@@ -233,8 +233,8 @@ func (r *regionStore) kvPeer(seed uint32, op *storeSelectorOp) AccessIndex {
 func (r *regionStore) filterStoreCandidate(aidx AccessIndex, op *storeSelectorOp) bool {
 	_, s := r.accessStore(tiKVOnly, aidx)
-	// filter label unmatched store
-	return s.IsLabelsMatch(op.labels)
+	// filter out label-unmatched stores, and slow stores when ReplicaReadMode == PreferLeader
+	return s.IsLabelsMatch(op.labels) && (!op.preferLeader || (aidx == r.workTiKVIdx && !s.isSlow()))
 }
 
 func newRegion(bo *retry.Backoffer, c *RegionCache, pdRegion *pd.Region) (*Region, error) {
@@ -430,6 +430,8 @@ func NewRegionCache(pdClient pd.Client) *RegionCache {
 	go c.asyncCheckAndResolveLoop(time.Duration(interval) * time.Second)
 	go c.cacheGC()
 	c.enableForwarding = config.GetGlobalConfig().EnableForwarding
+	// Use 15s as the default update interval (a quarter of the check interval).
+	go c.asyncUpdateStoreSlowScore(time.Duration(interval/4) * time.Second)
 	return c
 }
 
@@ -549,8 +551,9 @@ func (c *RPCContext) String() string {
 }
 
 type storeSelectorOp struct {
-	leaderOnly bool
-	labels     []*metapb.StoreLabel
+	leaderOnly   bool
+	preferLeader bool
+	labels       []*metapb.StoreLabel
 }
 
 // StoreSelectorOption configures storeSelectorOp.
@@ -570,6 +573,13 @@ func WithLeaderOnly() StoreSelectorOption {
 	}
 }
 
+// WithPerferLeader indicates selecting the leader with the highest priority until it becomes inaccessible.
+func WithPerferLeader() StoreSelectorOption {
+	return func(op *storeSelectorOp) {
+		op.preferLeader = true
+	}
+}
+
 // GetTiKVRPCContext returns RPCContext for a region. If it returns nil, the region
 // must be out of date and already dropped from cache.
 func (c *RegionCache) GetTiKVRPCContext(bo *retry.Backoffer, id RegionVerID, replicaRead kv.ReplicaReadType, followerStoreSeed uint32, opts ...StoreSelectorOption) (*RPCContext, error) {
@@ -605,6 +615,9 @@ func (c *RegionCache) GetTiKVRPCContext(bo *retry.Backoffer, id RegionVerID, rep
 		store, peer, accessIdx, storeIdx = cachedRegion.FollowerStorePeer(regionStore, followerStoreSeed, options)
 	case kv.ReplicaReadMixed:
 		store, peer, accessIdx, storeIdx = cachedRegion.AnyStorePeer(regionStore, followerStoreSeed, options)
+	case kv.ReplicaReadPreferLeader:
+		options.preferLeader = true
+		store, peer, accessIdx, storeIdx = cachedRegion.AnyStorePeer(regionStore, followerStoreSeed, options)
 	default:
 		isLeaderReq = true
 		store, peer, accessIdx, storeIdx = cachedRegion.WorkStorePeer(regionStore)
@@ -2230,6 +2243,9 @@ type Store struct {
 	// this mechanism is currently only applicable for TiKV stores.
 	livenessState    uint32
 	unreachableSince time.Time
+
+	// A statistic for counting the request latency to this store
+	slowScore SlowScoreStat
 }
 
 type resolveState uint64
@@ -2352,6 +2368,9 @@ func (s *Store) reResolve(c *RegionCache) (bool, error) {
 		if s.addr != addr || !s.IsSameLabels(store.GetLabels()) {
 			newStore := &Store{storeID: s.storeID, addr: addr, peerAddr: store.GetPeerAddress(), saddr: store.GetStatusAddress(), storeType: storeType, labels: store.GetLabels(), state: uint64(resolved)}
 			c.storeMu.Lock()
+			if s.addr == addr {
+				newStore.slowScore = s.slowScore
+			}
 			c.storeMu.stores[newStore.storeID] = newStore
 			c.storeMu.Unlock()
 			s.setResolveState(deleted)
@@ -2629,6 +2648,66 @@ func invokeKVStatusAPI(addr string, timeout time.Duration) (l livenessState) {
 	return
 }
+
+// getSlowScore returns the slow score of the store.
+func (s *Store) getSlowScore() uint64 {
+	return s.slowScore.getSlowScore()
+}
+
+// isSlow returns whether the current Store is slow or not.
+func (s *Store) isSlow() bool {
+	return s.slowScore.isSlow()
+}
+
+// updateSlowScoreStat updates the slow score of this store according to the timecosts of recent requests.
+func (s *Store) updateSlowScoreStat() {
+	s.slowScore.updateSlowScore()
+}
+
+// recordSlowScoreStat records the timecost of each request.
+func (s *Store) recordSlowScoreStat(timecost time.Duration) {
+	s.slowScore.recordSlowScoreStat(timecost)
+}
+
+func (s *Store) markAlreadySlow() {
+	s.slowScore.markAlreadySlow()
+}
+
+// asyncUpdateStoreSlowScore updates the slow score of each store periodically.
+func (c *RegionCache) asyncUpdateStoreSlowScore(interval time.Duration) {
+	ticker := time.NewTicker(interval)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-c.ctx.Done():
+			return
+		case <-ticker.C:
+			// update store slowScores
+			c.checkAndUpdateStoreSlowScores()
+		}
+	}
+}
+
+func (c *RegionCache) checkAndUpdateStoreSlowScores() {
+	defer func() {
+		r := recover()
+		if r != nil {
+			logutil.BgLogger().Error("panic in the checkAndUpdateStoreSlowScores goroutine",
+				zap.Reflect("r", r),
+				zap.Stack("stack trace"))
+		}
+	}()
+	slowScoreMetrics := make(map[string]float64)
+	c.storeMu.RLock()
+	for _, store := range c.storeMu.stores {
+		store.updateSlowScoreStat()
+		slowScoreMetrics[store.addr] = float64(store.getSlowScore())
+	}
+	c.storeMu.RUnlock()
+	for store, score := range slowScoreMetrics {
+		metrics.TiKVStoreSlowScoreGauge.WithLabelValues(store).Set(score)
+	}
+}
+
 func createKVHealthClient(ctx context.Context, addr string) (*grpc.ClientConn, healthpb.HealthClient, error) {
 	// Temporarily directly load the config from the global config, however it's not a good idea to let RegionCache to
 	// access it.
diff --git a/internal/locate/region_cache_test.go b/internal/locate/region_cache_test.go
index d5b8e79eb0..fe5ff039a6 100644
--- a/internal/locate/region_cache_test.go
+++ b/internal/locate/region_cache_test.go
@@ -1702,3 +1702,29 @@ func (s *testRegionCacheSuite) TestBackgroundCacheGC() {
 	}, 3*time.Second, 200*time.Millisecond)
 	s.checkCache(remaining)
 }
+
+func (s *testRegionCacheSuite) TestSlowScoreStat() {
+	slowScore := SlowScoreStat{
+		avgScore: 1,
+	}
+	s.False(slowScore.isSlow())
+	slowScore.recordSlowScoreStat(time.Millisecond * 1)
+	slowScore.updateSlowScore()
+	s.False(slowScore.isSlow())
+	for i := 2; i <= 100; i++ {
+		slowScore.recordSlowScoreStat(time.Millisecond * time.Duration(i))
+		if i%5 == 0 {
+			slowScore.updateSlowScore()
+			s.False(slowScore.isSlow())
+		}
+	}
+	for i := 100; i >= 2; i-- {
+		slowScore.recordSlowScoreStat(time.Millisecond * time.Duration(i))
+		if i%5 == 0 {
+			slowScore.updateSlowScore()
+			s.False(slowScore.isSlow())
+		}
+	}
+	slowScore.markAlreadySlow()
+	s.True(slowScore.isSlow())
+}
diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go
index f3312650e4..06f2750b9b 100644
--- a/internal/locate/region_request.go
+++ b/internal/locate/region_request.go
@@ -264,9 +264,8 @@ type replicaSelector struct {
 // selectorState is the interface of states of the replicaSelector.
 // Here is the main state transition diagram:
 //
-// exceeding maxReplicaAttempt
-// +-------------------+   || RPC failure && unreachable && no forwarding
-//
+// exceeding maxReplicaAttempt
+// +-------------------+   || RPC failure && unreachable && no forwarding
 // +-------->+ accessKnownLeader +----------------+
 // |         +------+------------+                |
 // |                |                             |
@@ -283,8 +282,7 @@ type replicaSelector struct {
 // | leader becomes          v           +---+---+
 // | reachable         +-----+-----+     all proxies are tried   ^
 // +------------+tryNewProxy+-------------------------+
-//
-// +-----------+
+// +-----------+
 type selectorState interface {
 	next(*retry.Backoffer, *replicaSelector) (*RPCContext, error)
 	onSendSuccess(*replicaSelector)
@@ -516,7 +514,7 @@ func (state *tryNewProxy) onNoLeader(selector *replicaSelector) {
 // If there is no suitable follower, requests will be sent to the leader as a fallback.
 type accessFollower struct {
 	stateBase
-	// If tryLeader is true, the request can also be sent to the leader.
+	// If tryLeader is true, the request can also be sent to the leader when !leader.isSlow().
 	tryLeader         bool
 	isGlobalStaleRead bool
 	option            storeSelectorOp
@@ -551,6 +549,10 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
 			state.lastIdx++
 		}
 
+	// If the selector is under `ReplicaReadPreferLeader` mode, we should choose the leader with high priority.
+	if state.option.preferLeader {
+		state.lastIdx = state.leaderIdx
+	}
 	for i := 0; i < replicaSize && !state.option.leaderOnly; i++ {
 		idx := AccessIndex((int(state.lastIdx) + i) % replicaSize)
 		// If the given store is abnormal to be accessed under `ReplicaReadMixed` mode, we should choose other followers or leader
@@ -592,7 +594,10 @@ func (state *accessFollower) isCandidate(idx AccessIndex, replica *replica) bool
 		// The request can only be sent to the leader.
 		((state.option.leaderOnly && idx == state.leaderIdx) ||
 			// Choose a replica with matched labels.
-			(!state.option.leaderOnly && (state.tryLeader || idx != state.leaderIdx) && replica.store.IsLabelsMatch(state.option.labels) && (!state.learnerOnly || replica.peer.Role == metapb.PeerRole_Learner)))
+			(!state.option.leaderOnly && (state.tryLeader || idx != state.leaderIdx) && replica.store.IsLabelsMatch(state.option.labels) && (!state.learnerOnly || replica.peer.Role == metapb.PeerRole_Learner)) &&
+			// And if the leader store is abnormal to be accessed under `ReplicaReadPreferLeader` mode, we should choose other valid followers
+			// as candidates to serve the Read request.
+			(!state.option.preferLeader || !replica.store.isSlow()))
 }
 
 type invalidStore struct {
@@ -632,6 +637,7 @@ func newReplicaSelector(regionCache *RegionCache, regionID RegionVerID, req *tik
 			attempts: 0,
 		})
 	}
+
 	var state selectorState
 	if !req.ReplicaReadType.IsFollowerRead() {
 		if regionCache.enableForwarding && regionStore.proxyTiKVIdx >= 0 {
@@ -644,8 +650,12 @@ func newReplicaSelector(regionCache *RegionCache, regionID RegionVerID, req *tik
 		for _, op := range opts {
 			op(&option)
 		}
+		if req.ReplicaReadType == kv.ReplicaReadPreferLeader {
+			WithPerferLeader()(&option)
+		}
+		tryLeader := req.ReplicaReadType == kv.ReplicaReadMixed || req.ReplicaReadType == kv.ReplicaReadPreferLeader
 		state = &accessFollower{
-			tryLeader:         req.ReplicaReadType == kv.ReplicaReadMixed,
+			tryLeader:         tryLeader,
 			isGlobalStaleRead: req.IsGlobalStaleRead(),
 			option:            option,
 			leaderIdx:         regionStore.workTiKVIdx,
@@ -805,6 +815,7 @@ func (s *replicaSelector) invalidateReplicaStore(replica *replica, cause error)
 		metrics.RegionCacheCounterWithInvalidateStoreRegionsOK.Inc()
 		// schedule a store addr resolve.
 		store.markNeedCheck(s.regionCache.notifyCheckCh)
+		store.markAlreadySlow()
 	}
 }
 
@@ -1202,6 +1213,10 @@ func (s *RegionRequestSender) sendReqToRegion(bo *retry.Backoffer, rpcCtx *RPCCo
 	if !injectFailOnSend {
 		start := time.Now()
 		resp, err = s.client.SendRequest(ctx, sendToAddr, req, timeout)
+		// Record the timecost of external requests on the related Store when ReplicaReadMode == PreferLeader.
+		if req.ReplicaReadType == kv.ReplicaReadPreferLeader && !util.IsInternalRequest(req.RequestSource) {
+			rpcCtx.Store.recordSlowScoreStat(time.Since(start))
+		}
 		if s.Stats != nil {
 			RecordRegionRequestRuntimeStats(s.Stats, req.Type, time.Since(start))
 			if val, fpErr := util.EvalFailpoint("tikvStoreRespResult"); fpErr == nil {
@@ -1522,6 +1537,12 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext
 	}
 
 	if regionErr.GetServerIsBusy() != nil {
+		// Mark the server as busy (the next incoming READs could be redirected
+		// to the expected followers).
+		if ctx != nil && ctx.Store != nil {
+			ctx.Store.markAlreadySlow()
+		}
+
 		logutil.BgLogger().Warn("tikv reports `ServerIsBusy` retry later",
 			zap.String("reason", regionErr.GetServerIsBusy().GetReason()),
 			zap.Stringer("ctx", ctx))
diff --git a/internal/locate/slow_score.go b/internal/locate/slow_score.go
new file mode 100644
index 0000000000..0da7f145b3
--- /dev/null
+++ b/internal/locate/slow_score.go
@@ -0,0 +1,157 @@
+// Copyright 2023 TiKV Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package locate
+
+import (
+	"math"
+	"sync/atomic"
+	"time"
+)
+
+const (
+	slowScoreInitVal         = 1
+	slowScoreThreshold       = 80
+	slowScoreMax             = 100
+	slowScoreInitTimeoutInUs = 500000   // unit: us
+	slowScoreMaxTimeoutInUs  = 30000000 // max timeout of one txn, unit: us
+	slidingWindowSize        = 10       // default size of sliding window
+)
+
+// CountSlidingWindow represents the statistics on a bunch of sliding windows.
+type CountSlidingWindow struct {
+	avg     uint64
+	sum     uint64
+	history []uint64
+}
+
+// Avg returns the average value of this sliding window.
+func (cnt *CountSlidingWindow) Avg() uint64 {
+	return cnt.avg
+}
+
+// Sum returns the sum value of this sliding window.
+func (cnt *CountSlidingWindow) Sum() uint64 {
+	return cnt.sum
+}
+
+// Append adds one value into this sliding window and returns the gradient.
+func (cnt *CountSlidingWindow) Append(value uint64) (gradient float64) {
+	prevAvg := cnt.avg
+	if len(cnt.history) < slidingWindowSize {
+		cnt.sum += value
+	} else {
+		cnt.sum = cnt.sum - cnt.history[0] + value
+		cnt.history = cnt.history[1:]
+	}
+	cnt.history = append(cnt.history, value)
+	cnt.avg = cnt.sum / (uint64(len(cnt.history)))
+	gradient = 1e-6
+	if prevAvg > 0 && value != prevAvg {
+		gradient = (float64(value) - float64(prevAvg)) / float64(prevAvg)
+	}
+	return gradient
+}
+
+// SlowScoreStat represents the statistics on the busyness of a Store.
+type SlowScoreStat struct {
+	avgScore            uint64
+	avgTimecost         uint64
+	intervalTimecost    uint64             // sum of the timecost in one counting interval. Unit: us
+	intervalUpdCount    uint64             // count of updates in one counting interval.
+	tsCntSlidingWindow  CountSlidingWindow // sliding window on timecost
+	updCntSlidingWindow CountSlidingWindow // sliding window on update count
+}
+
+func (ss *SlowScoreStat) getSlowScore() uint64 {
+	return atomic.LoadUint64(&ss.avgScore)
+}
+
+// updateSlowScore updates the statistics on SlowScore periodically.
+//
+// updateSlowScore updates the SlowScore of each Store according to two factors:
+//   - Requests in one timing tick. This factor can be regarded as the QPS of each store.
+//   - Average timecost of each request in one timing tick. This factor is used to detect
+//     whether the related store is busy processing requests.
+//
+// If one Store is slow, its request count will keep decreasing gradually, while the average
+// timecost will keep ascending. The updating algorithm follows this mechanism and computes
+// the trend of slowness by calculating the gradients of both factors in each tick.
+func (ss *SlowScoreStat) updateSlowScore() {
+	if atomic.LoadUint64(&ss.avgTimecost) == 0 {
+		// Init the whole statistics.
+		atomic.StoreUint64(&ss.avgScore, slowScoreInitVal)
+		atomic.StoreUint64(&ss.avgTimecost, slowScoreInitTimeoutInUs)
+		return
+	}
+
+	avgTimecost := atomic.LoadUint64(&ss.avgTimecost)
+	intervalUpdCount := atomic.LoadUint64(&ss.intervalUpdCount)
+	intervalTimecost := atomic.LoadUint64(&ss.intervalTimecost)
+
+	updGradient := float64(1.0)
+	tsGradient := float64(1.0)
+	if intervalUpdCount > 0 {
+		intervalAvgTimecost := intervalTimecost / intervalUpdCount
+		updGradient = ss.updCntSlidingWindow.Append(intervalUpdCount)
+		tsGradient = ss.tsCntSlidingWindow.Append(intervalAvgTimecost)
+	}
+	// Update avgScore & avgTimecost
+	avgScore := atomic.LoadUint64(&ss.avgScore)
+	if updGradient+0.1 <= float64(1e-9) && tsGradient-0.1 >= float64(1e-9) {
+		risenRatio := math.Min(float64(5.43), math.Abs(tsGradient/updGradient))
+		curAvgScore := math.Ceil(math.Min(float64(avgScore)*risenRatio+float64(1.0), float64(slowScoreMax)))
+		atomic.CompareAndSwapUint64(&ss.avgScore, avgScore, uint64(curAvgScore))
+	} else {
+		costScore := uint64(math.Ceil(math.Max(float64(slowScoreInitVal), math.Min(float64(2.71), 1.0+math.Abs(updGradient)))))
+		if avgScore <= slowScoreInitVal+costScore {
+			atomic.CompareAndSwapUint64(&ss.avgScore, avgScore, slowScoreInitVal)
+		} else {
+			atomic.CompareAndSwapUint64(&ss.avgScore, avgScore, avgScore-costScore)
+		}
+	}
+	atomic.CompareAndSwapUint64(&ss.avgTimecost, avgTimecost, ss.tsCntSlidingWindow.Avg())
+
+	// Reset the counters of interval timecost and update count.
+	atomic.StoreUint64(&ss.intervalTimecost, 0)
+	atomic.StoreUint64(&ss.intervalUpdCount, 0)
+}
+
+// recordSlowScoreStat records the timecost of each request.
+func (ss *SlowScoreStat) recordSlowScoreStat(timecost time.Duration) {
+	atomic.AddUint64(&ss.intervalUpdCount, 1)
+	avgTimecost := atomic.LoadUint64(&ss.avgTimecost)
+	if avgTimecost == 0 {
+		// Init the whole statistics with the original one.
+		atomic.StoreUint64(&ss.avgScore, slowScoreInitVal)
+		atomic.StoreUint64(&ss.avgTimecost, slowScoreInitTimeoutInUs)
+		atomic.StoreUint64(&ss.intervalTimecost, uint64(timecost/time.Microsecond))
+		return
+	}
+	curTimecost := uint64(timecost / time.Microsecond)
+	if curTimecost >= slowScoreMaxTimeoutInUs {
+		// The current query is too slow to serve (>= 30s, the max timeout of a request) in this tick.
+		atomic.StoreUint64(&ss.avgScore, slowScoreMax)
+		return
+	}
+	atomic.AddUint64(&ss.intervalTimecost, curTimecost)
+}
+
+func (ss *SlowScoreStat) markAlreadySlow() {
+	atomic.StoreUint64(&ss.avgScore, slowScoreMax)
+}
+
+func (ss *SlowScoreStat) isSlow() bool {
+	return ss.getSlowScore() >= slowScoreThreshold
+}
diff --git a/kv/store_vars.go b/kv/store_vars.go
index 1f10d3b45f..cce3e146b1 100644
--- a/kv/store_vars.go
+++ b/kv/store_vars.go
@@ -64,6 +64,8 @@ const (
 	ReplicaReadMixed
 	// ReplicaReadLearner stands for 'read from learner'.
 	ReplicaReadLearner
+	// ReplicaReadPreferLeader stands for 'read from leader and auto-turn to followers if the leader is abnormal'.
+	ReplicaReadPreferLeader
 )
 
 // IsFollowerRead checks if follower is going to be used to read data.
diff --git a/metrics/metrics.go b/metrics/metrics.go
index ff948055b2..3b767ceb8c 100644
--- a/metrics/metrics.go
+++ b/metrics/metrics.go
@@ -98,6 +98,7 @@ var (
 	TiKVPrewriteAssertionUsageCounter *prometheus.CounterVec
 	TiKVGrpcConnectionState          *prometheus.GaugeVec
 	TiKVAggressiveLockedKeysCounter  *prometheus.CounterVec
+	TiKVStoreSlowScoreGauge          *prometheus.GaugeVec
 )
 
 // Label constants.
@@ -607,6 +608,14 @@ func initMetrics(namespace, subsystem string) {
 			Help:      "Counter of keys locked in aggressive locking mode",
 		}, []string{LblType})
 
+	TiKVStoreSlowScoreGauge = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: namespace,
+			Subsystem: subsystem,
+			Name:      "store_slow_score",
+			Help:      "Slow scores of each tikv node based on RPC timecosts",
+		}, []string{LblStore})
+
 	initShortcuts()
 }
 
@@ -679,6 +688,7 @@ func RegisterMetrics() {
 	prometheus.MustRegister(TiKVPrewriteAssertionUsageCounter)
 	prometheus.MustRegister(TiKVGrpcConnectionState)
 	prometheus.MustRegister(TiKVAggressiveLockedKeysCounter)
+	prometheus.MustRegister(TiKVStoreSlowScoreGauge)
 }
 
 // readCounter reads the value of a prometheus.Counter.
diff --git a/util/request_source.go b/util/request_source.go
index 144e34dec2..9b0a3dabe7 100644
--- a/util/request_source.go
+++ b/util/request_source.go
@@ -2,6 +2,7 @@ package util
 
 import (
 	"context"
+	"strings"
 )
 
 // RequestSourceTypeKeyType is a dummy type to avoid naming collision in context.
@@ -81,6 +82,11 @@ func RequestSourceFromCtx(ctx context.Context) string {
 	return SourceUnknown
 }
 
+// IsInternalRequest returns whether the request source is an internal request.
+func IsInternalRequest(source string) bool {
+	return strings.HasPrefix(source, InternalRequest)
+}
+
 // ResourceGroupNameKeyType is the context key type of resource group name.
 type resourceGroupNameKeyType struct{}
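Editor's note (not part of the patch): a rough sketch of how code inside the client could ask for the new prefer-leader selection through the region cache API added above. The helper name pickPreferLeaderPeer and the bo/regionID/seed parameters are hypothetical placeholders; the rest of the identifiers come from this diff.

// A hypothetical helper inside package locate, for illustration only.
func pickPreferLeaderPeer(bo *retry.Backoffer, regionCache *RegionCache, regionID RegionVerID, seed uint32) (*RPCContext, error) {
	// kv.ReplicaReadPreferLeader makes GetTiKVRPCContext set preferLeader on the store
	// selector, so filterStoreCandidate only accepts the working leader while its slow
	// score stays below slowScoreThreshold (80).
	rpcCtx, err := regionCache.GetTiKVRPCContext(bo, regionID, kv.ReplicaReadPreferLeader, seed)
	if err != nil {
		return nil, err
	}
	// On the request path, accessFollower.isCandidate handles the fallback: once the leader
	// is marked slow (for example after a ServerIsBusy region error), label-matched
	// followers become candidates instead.
	return rpcCtx, nil
}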
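Editor's note (not part of the patch): a minimal sketch of how the new SlowScoreStat reacts to request timecosts, written as an extra suite test alongside TestSlowScoreStat above. The test name is hypothetical; every other identifier and constant comes from internal/locate/slow_score.go in this diff.

func (s *testRegionCacheSuite) TestSlowScoreStatSketch() {
	slowScore := SlowScoreStat{}
	// A normally loaded store: many fast requests keep the score at its initial value.
	for i := 0; i < 50; i++ {
		slowScore.recordSlowScoreStat(2 * time.Millisecond)
	}
	slowScore.updateSlowScore()
	s.False(slowScore.isSlow())

	// A single request at or above slowScoreMaxTimeoutInUs (30s) pins the score to
	// slowScoreMax, so isSlow() crosses slowScoreThreshold (80) immediately, without
	// waiting for the periodic asyncUpdateStoreSlowScore tick.
	slowScore.recordSlowScoreStat(31 * time.Second)
	s.True(slowScore.isSlow())
}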