Skip to content

Commit

Permalink
Introduce the RURuntimeStats (#732)
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <ghzpotato@gmail.com>
  • Loading branch information
JmPotato authored Mar 16, 2023
1 parent c9119d0 commit 9d95090
Show file tree
Hide file tree
Showing 12 changed files with 208 additions and 21 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ require (
github.com/prometheus/client_model v0.3.0
github.com/stretchr/testify v1.8.1
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a
github.com/tikv/pd/client v0.0.0-20230309025512-47cd76ae5d67
github.com/tikv/pd/client v0.0.0-20230313083840-3e3ae55f68ac
github.com/twmb/murmur3 v1.1.3
go.etcd.io/etcd/api/v3 v3.5.2
go.etcd.io/etcd/client/v3 v3.5.2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,8 @@ github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKs
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM=
github.com/tikv/pd/client v0.0.0-20230309025512-47cd76ae5d67 h1:AXgc/Ij348pp0TsMPq/tmQA4O0EOAGntTKzB1imhpcU=
github.com/tikv/pd/client v0.0.0-20230309025512-47cd76ae5d67/go.mod h1:N2QHc05Vll8CofXQor47lpW5d22WDosFC8WPVx9BsbU=
github.com/tikv/pd/client v0.0.0-20230313083840-3e3ae55f68ac h1:0XDlEdxbxEsy6lWfUxjvuO30q7wVavPeLYLNHfLL2E8=
github.com/tikv/pd/client v0.0.0-20230313083840-3e3ae55f68ac/go.mod h1:N2QHc05Vll8CofXQor47lpW5d22WDosFC8WPVx9BsbU=
github.com/twmb/murmur3 v1.1.3 h1:D83U0XYKcHRYwYIpBKf3Pks91Z0Byda/9SJ8B6EMRcA=
github.com/twmb/murmur3 v1.1.3/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/stretchr/testify v1.8.1
github.com/tidwall/gjson v1.14.1
github.com/tikv/client-go/v2 v2.0.6-0.20230228091502-e2da5527026f
github.com/tikv/pd/client v0.0.0-20230309025512-47cd76ae5d67
github.com/tikv/pd/client v0.0.0-20230313083840-3e3ae55f68ac
go.uber.org/goleak v1.2.1
)

Expand Down
4 changes: 2 additions & 2 deletions integration_tests/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -441,8 +441,8 @@ github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tikv/pd/client v0.0.0-20230309025512-47cd76ae5d67 h1:AXgc/Ij348pp0TsMPq/tmQA4O0EOAGntTKzB1imhpcU=
github.com/tikv/pd/client v0.0.0-20230309025512-47cd76ae5d67/go.mod h1:N2QHc05Vll8CofXQor47lpW5d22WDosFC8WPVx9BsbU=
github.com/tikv/pd/client v0.0.0-20230313083840-3e3ae55f68ac h1:0XDlEdxbxEsy6lWfUxjvuO30q7wVavPeLYLNHfLL2E8=
github.com/tikv/pd/client v0.0.0-20230313083840-3e3ae55f68ac/go.mod h1:N2QHc05Vll8CofXQor47lpW5d22WDosFC8WPVx9BsbU=
github.com/tklauser/go-sysconf v0.3.11 h1:89WgdJhk5SNwJfu+GKyYveZ4IaJ7xAkecBo+KdJV0CM=
github.com/tklauser/go-sysconf v0.3.11/go.mod h1:GqXfhXY3kiPa0nAXPDIQIWzJbMCB7AmcWpGR8lSZfqI=
github.com/tklauser/numcpus v0.6.0 h1:kebhY2Qt+3U6RNK7UqpYNA+tJ23IBEGKkB7JQBfDYms=
Expand Down
32 changes: 25 additions & 7 deletions internal/client/client_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ package client

import (
"context"
"sync"
"sync/atomic"
"time"

"github.com/tikv/client-go/v2/internal/resourcecontrol"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/tikvrpc/interceptor"
"github.com/tikv/client-go/v2/util"
resourceControlClient "github.com/tikv/pd/client/resource_group/controller"
)

Expand All @@ -34,17 +36,17 @@ var _ Client = interceptedClient{}

// interceptedClient wraps a Client and carries an optional map of RU runtime
// stats that SendRequest consults by the request's start TS.
type interceptedClient struct {
// Client is the embedded underlying client that requests are forwarded to.
Client
// ruRuntimeStatsMap is looked up by start TS (uint64) and its values are
// asserted to be *util.RURuntimeStats. It may be nil, in which case no stats
// are looked up.
ruRuntimeStatsMap *sync.Map
}

// NewInterceptedClient creates a Client which can execute interceptor.
func NewInterceptedClient(client Client) Client {
return interceptedClient{client}
func NewInterceptedClient(client Client, ruRuntimeStatsMap *sync.Map) Client {
return interceptedClient{client, ruRuntimeStatsMap}
}

func (r interceptedClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
// Build the resource control interceptor.
resourceGroupName := req.GetResourceGroupName()
var finalInterceptor interceptor.RPCInterceptor = buildResourceControlInterceptor(ctx, req, resourceGroupName)
var finalInterceptor interceptor.RPCInterceptor = buildResourceControlInterceptor(ctx, req, r.getRURuntimeStats(req.GetStartTS()))
// Chain the interceptors if there are multiple interceptors.
if it := interceptor.GetRPCInterceptorFromCtx(ctx); it != nil {
if finalInterceptor != nil {
Expand All @@ -61,6 +63,16 @@ func (r interceptedClient) SendRequest(ctx context.Context, addr string, req *ti
return r.Client.SendRequest(ctx, addr, req, timeout)
}

// getRURuntimeStats looks up the RU runtime stats stored under startTS.
// It returns nil when the client has no stats map, when startTS is zero, or
// when nothing is stored for startTS.
func (r interceptedClient) getRURuntimeStats(startTS uint64) *util.RURuntimeStats {
	if startTS == 0 || r.ruRuntimeStatsMap == nil {
		return nil
	}
	stored, found := r.ruRuntimeStatsMap.Load(startTS)
	if !found {
		return nil
	}
	return stored.(*util.RURuntimeStats)
}

var (
// ResourceControlSwitch is used to control whether to enable the resource control.
ResourceControlSwitch atomic.Value
Expand All @@ -73,11 +85,12 @@ var (
func buildResourceControlInterceptor(
ctx context.Context,
req *tikvrpc.Request,
resourceGroupName string,
ruRuntimeStats *util.RURuntimeStats,
) interceptor.RPCInterceptor {
if !ResourceControlSwitch.Load().(bool) {
return nil
}
resourceGroupName := req.GetResourceGroupName()
// When the group name is empty or "default", we don't need to
// perform the resource control.
if len(resourceGroupName) == 0 || resourceGroupName == "default" {
Expand All @@ -92,14 +105,19 @@ func buildResourceControlInterceptor(
// Build the interceptor.
return func(next interceptor.RPCInterceptorFunc) interceptor.RPCInterceptorFunc {
return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
err := ResourceControlInterceptor.OnRequestWait(ctx, resourceGroupName, reqInfo)
consumption, err := ResourceControlInterceptor.OnRequestWait(ctx, resourceGroupName, reqInfo)
if err != nil {
return nil, err
}
ruRuntimeStats.Update(consumption)
resp, err := next(target, req)
if resp != nil {
respInfo := resourcecontrol.MakeResponseInfo(resp)
ResourceControlInterceptor.OnResponse(ctx, resourceGroupName, reqInfo, respInfo)
consumption, err = ResourceControlInterceptor.OnResponse(resourceGroupName, reqInfo, respInfo)
if err != nil {
return nil, err
}
ruRuntimeStats.Update(consumption)
}
return resp, err
}
Expand Down
2 changes: 1 addition & 1 deletion internal/client/client_interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (c emptyClient) CloseAddr(addr string) error {

func TestInterceptedClient(t *testing.T) {
executed := false
client := NewInterceptedClient(emptyClient{})
client := NewInterceptedClient(emptyClient{}, nil)
ctx := interceptor.WithRPCInterceptor(context.Background(), func(next interceptor.RPCInterceptorFunc) interceptor.RPCInterceptorFunc {
return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
executed = true
Expand Down
59 changes: 53 additions & 6 deletions tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,15 @@ import (
"google.golang.org/grpc/keepalive"
)

// DCLabelKey indicates the key of label which represents the dc for Store.
const DCLabelKey = "zone"
const (
// DCLabelKey indicates the key of label which represents the dc for Store.
DCLabelKey = "zone"
safeTSUpdateInterval = time.Second * 2
// Since the default max transaction TTL is 1 hour, we can use this to
// clean up the RU runtime stats as well.
ruRuntimeStatsCleanThreshold = time.Hour
ruRuntimeStatsCleanInterval = ruRuntimeStatsCleanThreshold / 2
)

func createEtcdKV(addrs []string, tlsConfig *tls.Config) (*clientv3.Client, error) {
cfg := config.GetGlobalConfig()
Expand Down Expand Up @@ -127,6 +134,9 @@ type KVStore struct {

replicaReadSeed uint32 // this is used to load balance followers / learners when replica read is enabled

// StartTS -> RURuntimeStats, stores the RU runtime stats for a certain transaction.
ruRuntimeStatsMap sync.Map

ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
Expand Down Expand Up @@ -209,13 +219,14 @@ func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, tikvclient Cl
cancel: cancel,
gP: NewSpool(128, 10*time.Second),
}
store.clientMu.client = client.NewReqCollapse(client.NewInterceptedClient(tikvclient))
store.clientMu.client = client.NewReqCollapse(client.NewInterceptedClient(tikvclient, &store.ruRuntimeStatsMap))
store.lockResolver = txnlock.NewLockResolver(store)
loadOption(store, opt...)

store.wg.Add(2)
store.wg.Add(3)
go store.runSafePointChecker()
go store.safeTSUpdater()
go store.ruRuntimeStatsMapCleaner()

return store, nil
}
Expand Down Expand Up @@ -531,14 +542,14 @@ func (s *KVStore) updateMinSafeTS(txnScope string, storeIDs []uint64) {

func (s *KVStore) safeTSUpdater() {
defer s.wg.Done()
t := time.NewTicker(time.Second * 2)
t := time.NewTicker(safeTSUpdateInterval)
defer t.Stop()
ctx, cancel := context.WithCancel(s.ctx)
ctx = util.WithInternalSourceType(ctx, util.InternalTxnGC)
defer cancel()
for {
select {
case <-s.ctx.Done():
case <-ctx.Done():
return
case <-t.C:
s.updateSafeTS(ctx)
Expand Down Expand Up @@ -600,6 +611,42 @@ func (s *KVStore) updateSafeTS(ctx context.Context) {
wg.Wait()
}

// ruRuntimeStatsMapCleaner runs until the store's context is canceled. On
// every tick it walks ruRuntimeStatsMap and deletes each entry whose start TS
// is at least the clean threshold older than the tick time.
//
// When the "mockFastRURuntimeStatsMapClean" failpoint is enabled at start-up,
// the tick interval becomes 100ms and the threshold 1ms.
func (s *KVStore) ruRuntimeStatsMapCleaner() {
	defer s.wg.Done()
	ticker := time.NewTicker(ruRuntimeStatsCleanInterval)
	defer ticker.Stop()
	ctx, cancel := context.WithCancel(s.ctx)
	ctx = util.WithInternalSourceType(ctx, util.InternalTxnGC)
	defer cancel()

	threshold := ruRuntimeStatsCleanThreshold
	if _, err := util.EvalFailpoint("mockFastRURuntimeStatsMapClean"); err == nil {
		ticker.Reset(time.Millisecond * 100)
		threshold = time.Millisecond
	}

	// dropExpired removes every entry whose start TS is old enough relative
	// to the given tick time.
	dropExpired := func(tick time.Time) {
		s.ruRuntimeStatsMap.Range(func(startTS, _ interface{}) bool {
			age := tick.Sub(oracle.GetTimeFromTS(startTS.(uint64)))
			if age >= threshold {
				s.ruRuntimeStatsMap.Delete(startTS)
			}
			// Always continue so that the whole map is visited.
			return true
		})
	}

	for {
		select {
		case <-ctx.Done():
			return
		case tick := <-ticker.C:
			dropExpired(tick)
		}
	}
}

// CreateRURuntimeStats returns the RURuntimeStats registered for startTS.
// If none is registered yet, a newly created one is stored and returned;
// otherwise the already stored value is returned.
func (s *KVStore) CreateRURuntimeStats(startTS uint64) *util.RURuntimeStats {
	fresh := util.NewRURuntimeStats()
	actual, _ := s.ruRuntimeStatsMap.LoadOrStore(startTS, fresh)
	return actual.(*util.RURuntimeStats)
}

// EnableResourceControl enables the resource control.
func EnableResourceControl() {
client.ResourceControlSwitch.Store(true)
Expand Down
28 changes: 28 additions & 0 deletions tikv/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,19 @@ import (
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/stretchr/testify/suite"
"github.com/tikv/client-go/v2/internal/mockstore/mocktikv"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/testutils"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/util"
)

// TestKV enables failpoint evaluation and then runs the testKVSuite suite.
func TestKV(t *testing.T) {
	util.EnableFailpoints()
	kvSuite := &testKVSuite{}
	suite.Run(t, kvSuite)
}

Expand Down Expand Up @@ -122,3 +125,28 @@ func (s *testKVSuite) TestMinSafeTs() {
s.Require().GreaterOrEqual(atomic.LoadInt32(&mockClient.requestCount), int32(2))
s.Require().Equal(uint64(80), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
}

// TestRURuntimeStatsCleanUp checks that an entry created through
// CreateRURuntimeStats is removed from ruRuntimeStatsMap by the background
// cleaner while the mockFastRURuntimeStatsMapClean failpoint is enabled.
func (s *testKVSuite) TestRURuntimeStatsCleanUp() {
// With this failpoint the cleaner ticks every 100ms and uses a 1ms threshold.
// NOTE(review): the cleaner evaluates the failpoint once, before its loop;
// presumably the store under test starts its cleaner after this Enable call —
// confirm against the suite's setup.
s.Nil(failpoint.Enable("tikvclient/mockFastRURuntimeStatsMapClean", `return()`))
defer func() {
s.Nil(failpoint.Disable("tikvclient/mockFastRURuntimeStatsMapClean"))
}()

mockClient := storeSafeTsMockClient{
Client: s.store.GetTiKVClient(),
testSuite: s,
}
s.store.SetTiKVClient(&mockClient)

// Create a ruRuntimeStats first.
// The start TS is composed from the current physical time with logical part 0.
startTS := oracle.ComposeTS(oracle.GetPhysical(time.Now()), 0)
ruRuntimeStats := s.store.CreateRURuntimeStats(startTS)
s.NotNil(ruRuntimeStats)
// Wait for the cleanup goroutine to clean up the ruRuntimeStatsMap.
// 150ms is longer than the 100ms fast-clean tick interval.
time.Sleep(time.Millisecond * 150)
// The ruRuntimeStatsMap should be cleaned up.
// Any entry still visited by Range fails the test.
s.store.ruRuntimeStatsMap.Range(func(key, value interface{}) bool {
s.Fail("ruRuntimeStatsMap should be cleaned up")
return true
})
}
48 changes: 48 additions & 0 deletions tikvrpc/tikvrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1289,3 +1289,51 @@ func (req *Request) IsRawWriteRequest() bool {

// ResourceGroupTagger is used to fill the ResourceGroupTag in the kvrpcpb.Context.
type ResourceGroupTagger func(req *Request)

// GetStartTS returns the `start_ts` of the request.
//
// The value is read from the type-specific field of the underlying request:
// Version for Get/Scan/BatchGet, StartVersion for most transactional
// commands, MaxVersion for ScanLock, LockTs for CheckTxnStatus, and StartTs
// for flashback, coprocessor and MvccGetByStartTs requests. Any request type
// that is not listed below yields 0.
func (req *Request) GetStartTS() uint64 {
	switch req.Type {
	case CmdGet:
		return req.Get().GetVersion()
	case CmdScan:
		return req.Scan().GetVersion()
	case CmdPrewrite:
		return req.Prewrite().GetStartVersion()
	case CmdCommit:
		return req.Commit().GetStartVersion()
	case CmdCleanup:
		return req.Cleanup().GetStartVersion()
	case CmdBatchGet:
		return req.BatchGet().GetVersion()
	case CmdBatchRollback:
		return req.BatchRollback().GetStartVersion()
	case CmdScanLock:
		return req.ScanLock().GetMaxVersion()
	case CmdResolveLock:
		return req.ResolveLock().GetStartVersion()
	case CmdPessimisticLock:
		return req.PessimisticLock().GetStartVersion()
	case CmdPessimisticRollback:
		return req.PessimisticRollback().GetStartVersion()
	case CmdTxnHeartBeat:
		return req.TxnHeartBeat().GetStartVersion()
	case CmdCheckTxnStatus:
		return req.CheckTxnStatus().GetLockTs()
	case CmdCheckSecondaryLocks:
		return req.CheckSecondaryLocks().GetStartVersion()
	case CmdFlashbackToVersion:
		return req.FlashbackToVersion().GetStartTs()
	case CmdPrepareFlashbackToVersion:
		// Go switch cases do not fall through, so the value must be returned
		// explicitly here like in every other case.
		return req.PrepareFlashbackToVersion().GetStartTs()
	case CmdCop, CmdCopStream:
		// Both command types read the start TS through the Cop() accessor.
		return req.Cop().GetStartTs()
	case CmdBatchCop:
		return req.BatchCop().GetStartTs()
	case CmdMvccGetByStartTs:
		return req.MvccGetByStartTs().GetStartTs()
	}
	// Request types without a known start TS field.
	return 0
}
2 changes: 1 addition & 1 deletion txnkv/transaction/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ type KVTxn struct {
interceptor interceptor.RPCInterceptor
assertionLevel kvrpcpb.AssertionLevel
*util.RequestSource
// resourceGroupName is the name of tenent resource group.
// resourceGroupName is the name of tenant resource group.
resourceGroupName string

aggressiveLockingContext *aggressiveLockingContext
Expand Down
1 change: 1 addition & 0 deletions txnkv/txnlock/lock_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *retry.Backoffer, locks []*Lock, lo

req := tikvrpc.NewRequest(tikvrpc.CmdResolveLock, &kvrpcpb.ResolveLockRequest{TxnInfos: listTxnInfos},
kvrpcpb.Context{
// TODO: how to pass the `start_ts` here?
RequestSource: util.RequestSourceFromCtx(bo.GetCtx()),
ResourceGroupName: util.ResourceGroupNameFromCtx(bo.GetCtx()),
},
Expand Down
Loading

0 comments on commit 9d95090

Please sign in to comment.