diff --git a/go.mod b/go.mod index d2d43d4ee6..f3772efed0 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,7 @@ require ( github.com/prometheus/client_model v0.3.0 github.com/stretchr/testify v1.8.1 github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a - github.com/tikv/pd/client v0.0.0-20230309025512-47cd76ae5d67 + github.com/tikv/pd/client v0.0.0-20230313083840-3e3ae55f68ac github.com/twmb/murmur3 v1.1.3 go.etcd.io/etcd/api/v3 v3.5.2 go.etcd.io/etcd/client/v3 v3.5.2 diff --git a/go.sum b/go.sum index 25e37eb020..d15ddbaf41 100644 --- a/go.sum +++ b/go.sum @@ -202,8 +202,8 @@ github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKs github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4= github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM= -github.com/tikv/pd/client v0.0.0-20230309025512-47cd76ae5d67 h1:AXgc/Ij348pp0TsMPq/tmQA4O0EOAGntTKzB1imhpcU= -github.com/tikv/pd/client v0.0.0-20230309025512-47cd76ae5d67/go.mod h1:N2QHc05Vll8CofXQor47lpW5d22WDosFC8WPVx9BsbU= +github.com/tikv/pd/client v0.0.0-20230313083840-3e3ae55f68ac h1:0XDlEdxbxEsy6lWfUxjvuO30q7wVavPeLYLNHfLL2E8= +github.com/tikv/pd/client v0.0.0-20230313083840-3e3ae55f68ac/go.mod h1:N2QHc05Vll8CofXQor47lpW5d22WDosFC8WPVx9BsbU= github.com/twmb/murmur3 v1.1.3 h1:D83U0XYKcHRYwYIpBKf3Pks91Z0Byda/9SJ8B6EMRcA= github.com/twmb/murmur3 v1.1.3/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= diff --git a/integration_tests/go.mod b/integration_tests/go.mod index 7ee8ee2248..f90b85454c 100644 --- a/integration_tests/go.mod +++ b/integration_tests/go.mod @@ -12,7 +12,7 @@ require ( github.com/stretchr/testify v1.8.1 github.com/tidwall/gjson v1.14.1 github.com/tikv/client-go/v2 v2.0.6-0.20230228091502-e2da5527026f - github.com/tikv/pd/client v0.0.0-20230309025512-47cd76ae5d67 + github.com/tikv/pd/client v0.0.0-20230313083840-3e3ae55f68ac go.uber.org/goleak v1.2.1 ) diff --git a/integration_tests/go.sum b/integration_tests/go.sum index 477647c52e..aacf1c2181 100644 --- a/integration_tests/go.sum +++ b/integration_tests/go.sum @@ -441,8 +441,8 @@ github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs= github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= -github.com/tikv/pd/client v0.0.0-20230309025512-47cd76ae5d67 h1:AXgc/Ij348pp0TsMPq/tmQA4O0EOAGntTKzB1imhpcU= -github.com/tikv/pd/client v0.0.0-20230309025512-47cd76ae5d67/go.mod h1:N2QHc05Vll8CofXQor47lpW5d22WDosFC8WPVx9BsbU= +github.com/tikv/pd/client v0.0.0-20230313083840-3e3ae55f68ac h1:0XDlEdxbxEsy6lWfUxjvuO30q7wVavPeLYLNHfLL2E8= +github.com/tikv/pd/client v0.0.0-20230313083840-3e3ae55f68ac/go.mod h1:N2QHc05Vll8CofXQor47lpW5d22WDosFC8WPVx9BsbU= github.com/tklauser/go-sysconf v0.3.11 h1:89WgdJhk5SNwJfu+GKyYveZ4IaJ7xAkecBo+KdJV0CM= github.com/tklauser/go-sysconf v0.3.11/go.mod h1:GqXfhXY3kiPa0nAXPDIQIWzJbMCB7AmcWpGR8lSZfqI= github.com/tklauser/numcpus v0.6.0 h1:kebhY2Qt+3U6RNK7UqpYNA+tJ23IBEGKkB7JQBfDYms= diff --git a/internal/client/client_interceptor.go b/internal/client/client_interceptor.go index c7047c677b..546953aa76 100644 --- a/internal/client/client_interceptor.go +++ b/internal/client/client_interceptor.go @@ -16,12 +16,14 @@ package client import ( "context" + "sync" "sync/atomic" "time" "github.com/tikv/client-go/v2/internal/resourcecontrol" "github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/tikvrpc/interceptor" + "github.com/tikv/client-go/v2/util" resourceControlClient "github.com/tikv/pd/client/resource_group/controller" ) @@ -34,17 +36,17 @@ var _ Client = interceptedClient{} type interceptedClient struct { Client + ruRuntimeStatsMap *sync.Map } // NewInterceptedClient creates a Client which can execute interceptor. -func NewInterceptedClient(client Client) Client { - return interceptedClient{client} +func NewInterceptedClient(client Client, ruRuntimeStatsMap *sync.Map) Client { + return interceptedClient{client, ruRuntimeStatsMap} } func (r interceptedClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { // Build the resource control interceptor. - resourceGroupName := req.GetResourceGroupName() - var finalInterceptor interceptor.RPCInterceptor = buildResourceControlInterceptor(ctx, req, resourceGroupName) + var finalInterceptor interceptor.RPCInterceptor = buildResourceControlInterceptor(ctx, req, r.getRURuntimeStats(req.GetStartTS())) // Chain the interceptors if there are multiple interceptors. if it := interceptor.GetRPCInterceptorFromCtx(ctx); it != nil { if finalInterceptor != nil { @@ -61,6 +63,16 @@ func (r interceptedClient) SendRequest(ctx context.Context, addr string, req *ti return r.Client.SendRequest(ctx, addr, req, timeout) } +func (r interceptedClient) getRURuntimeStats(startTS uint64) *util.RURuntimeStats { + if r.ruRuntimeStatsMap == nil || startTS == 0 { + return nil + } + if v, ok := r.ruRuntimeStatsMap.Load(startTS); ok { + return v.(*util.RURuntimeStats) + } + return nil +} + var ( // ResourceControlSwitch is used to control whether to enable the resource control. ResourceControlSwitch atomic.Value @@ -73,11 +85,12 @@ var ( func buildResourceControlInterceptor( ctx context.Context, req *tikvrpc.Request, - resourceGroupName string, + ruRuntimeStats *util.RURuntimeStats, ) interceptor.RPCInterceptor { if !ResourceControlSwitch.Load().(bool) { return nil } + resourceGroupName := req.GetResourceGroupName() // When the group name is empty or "default", we don't need to // perform the resource control. if len(resourceGroupName) == 0 || resourceGroupName == "default" { @@ -92,14 +105,19 @@ func buildResourceControlInterceptor( // Build the interceptor. return func(next interceptor.RPCInterceptorFunc) interceptor.RPCInterceptorFunc { return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) { - err := ResourceControlInterceptor.OnRequestWait(ctx, resourceGroupName, reqInfo) + consumption, err := ResourceControlInterceptor.OnRequestWait(ctx, resourceGroupName, reqInfo) if err != nil { return nil, err } + ruRuntimeStats.Update(consumption) resp, err := next(target, req) if resp != nil { respInfo := resourcecontrol.MakeResponseInfo(resp) - ResourceControlInterceptor.OnResponse(ctx, resourceGroupName, reqInfo, respInfo) + consumption, err = ResourceControlInterceptor.OnResponse(resourceGroupName, reqInfo, respInfo) + if err != nil { + return nil, err + } + ruRuntimeStats.Update(consumption) } return resp, err } diff --git a/internal/client/client_interceptor_test.go b/internal/client/client_interceptor_test.go index 2e1cca2230..88442fe238 100644 --- a/internal/client/client_interceptor_test.go +++ b/internal/client/client_interceptor_test.go @@ -40,7 +40,7 @@ func (c emptyClient) CloseAddr(addr string) error { func TestInterceptedClient(t *testing.T) { executed := false - client := NewInterceptedClient(emptyClient{}) + client := NewInterceptedClient(emptyClient{}, nil) ctx := interceptor.WithRPCInterceptor(context.Background(), func(next interceptor.RPCInterceptorFunc) interceptor.RPCInterceptorFunc { return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) { executed = true diff --git a/tikv/kv.go b/tikv/kv.go index 3776014e85..86f02a0243 100644 --- a/tikv/kv.go +++ b/tikv/kv.go @@ -74,8 +74,15 @@ import ( "google.golang.org/grpc/keepalive" ) -// DCLabelKey indicates the key of label which represents the dc for Store. -const DCLabelKey = "zone" +const ( + // DCLabelKey indicates the key of label which represents the dc for Store. + DCLabelKey = "zone" + safeTSUpdateInterval = time.Second * 2 + // Since the default max transaction TTL is 1 hour, we can use this to + // clean up the RU runtime stats as well. + ruRuntimeStatsCleanThreshold = time.Hour + ruRuntimeStatsCleanInterval = ruRuntimeStatsCleanThreshold / 2 +) func createEtcdKV(addrs []string, tlsConfig *tls.Config) (*clientv3.Client, error) { cfg := config.GetGlobalConfig() @@ -127,6 +134,9 @@ type KVStore struct { replicaReadSeed uint32 // this is used to load balance followers / learners when replica read is enabled + // StartTS -> RURuntimeStats, stores the RU runtime stats for certain transaction. + ruRuntimeStatsMap sync.Map + ctx context.Context cancel context.CancelFunc wg sync.WaitGroup @@ -209,13 +219,14 @@ func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, tikvclient Cl cancel: cancel, gP: NewSpool(128, 10*time.Second), } - store.clientMu.client = client.NewReqCollapse(client.NewInterceptedClient(tikvclient)) + store.clientMu.client = client.NewReqCollapse(client.NewInterceptedClient(tikvclient, &store.ruRuntimeStatsMap)) store.lockResolver = txnlock.NewLockResolver(store) loadOption(store, opt...) - store.wg.Add(2) + store.wg.Add(3) go store.runSafePointChecker() go store.safeTSUpdater() + go store.ruRuntimeStatsMapCleaner() return store, nil } @@ -531,14 +542,14 @@ func (s *KVStore) updateMinSafeTS(txnScope string, storeIDs []uint64) { func (s *KVStore) safeTSUpdater() { defer s.wg.Done() - t := time.NewTicker(time.Second * 2) + t := time.NewTicker(safeTSUpdateInterval) defer t.Stop() ctx, cancel := context.WithCancel(s.ctx) ctx = util.WithInternalSourceType(ctx, util.InternalTxnGC) defer cancel() for { select { - case <-s.ctx.Done(): + case <-ctx.Done(): return case <-t.C: s.updateSafeTS(ctx) @@ -600,6 +611,42 @@ func (s *KVStore) updateSafeTS(ctx context.Context) { wg.Wait() } +func (s *KVStore) ruRuntimeStatsMapCleaner() { + defer s.wg.Done() + t := time.NewTicker(ruRuntimeStatsCleanInterval) + defer t.Stop() + ctx, cancel := context.WithCancel(s.ctx) + ctx = util.WithInternalSourceType(ctx, util.InternalTxnGC) + defer cancel() + + cleanThreshold := ruRuntimeStatsCleanThreshold + if _, e := util.EvalFailpoint("mockFastRURuntimeStatsMapClean"); e == nil { + t.Reset(time.Millisecond * 100) + cleanThreshold = time.Millisecond + } + + for { + select { + case <-ctx.Done(): + return + case now := <-t.C: + s.ruRuntimeStatsMap.Range(func(key, _ interface{}) bool { + startTSTime := oracle.GetTimeFromTS(key.(uint64)) + if now.Sub(startTSTime) >= cleanThreshold { + s.ruRuntimeStatsMap.Delete(key) + } + return true + }) + } + } +} + +// CreateRURuntimeStats creates a RURuntimeStats for the startTS and returns it. +func (s *KVStore) CreateRURuntimeStats(startTS uint64) *util.RURuntimeStats { + rrs, _ := s.ruRuntimeStatsMap.LoadOrStore(startTS, util.NewRURuntimeStats()) + return rrs.(*util.RURuntimeStats) +} + // EnableResourceControl enables the resource control. func EnableResourceControl() { client.ResourceControlSwitch.Store(true) diff --git a/tikv/kv_test.go b/tikv/kv_test.go index a027cd8262..801948bb85 100644 --- a/tikv/kv_test.go +++ b/tikv/kv_test.go @@ -21,6 +21,7 @@ import ( "testing" "time" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/metapb" "github.com/stretchr/testify/suite" @@ -28,9 +29,11 @@ import ( "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/testutils" "github.com/tikv/client-go/v2/tikvrpc" + "github.com/tikv/client-go/v2/util" ) func TestKV(t *testing.T) { + util.EnableFailpoints() suite.Run(t, new(testKVSuite)) } @@ -122,3 +125,28 @@ func (s *testKVSuite) TestMinSafeTs() { s.Require().GreaterOrEqual(atomic.LoadInt32(&mockClient.requestCount), int32(2)) s.Require().Equal(uint64(80), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) } + +func (s *testKVSuite) TestRURuntimeStatsCleanUp() { + s.Nil(failpoint.Enable("tikvclient/mockFastRURuntimeStatsMapClean", `return()`)) + defer func() { + s.Nil(failpoint.Disable("tikvclient/mockFastRURuntimeStatsMapClean")) + }() + + mockClient := storeSafeTsMockClient{ + Client: s.store.GetTiKVClient(), + testSuite: s, + } + s.store.SetTiKVClient(&mockClient) + + // Create a ruRuntimeStats first. + startTS := oracle.ComposeTS(oracle.GetPhysical(time.Now()), 0) + ruRuntimeStats := s.store.CreateRURuntimeStats(startTS) + s.NotNil(ruRuntimeStats) + // Wait for the cleanup goroutine to clean up the ruRuntimeStatsMap. + time.Sleep(time.Millisecond * 150) + // The ruRuntimeStatsMap should be cleaned up. + s.store.ruRuntimeStatsMap.Range(func(key, value interface{}) bool { + s.Fail("ruRuntimeStatsMap should be cleaned up") + return true + }) +} diff --git a/tikvrpc/tikvrpc.go b/tikvrpc/tikvrpc.go index 40f33f0dfb..9993543410 100644 --- a/tikvrpc/tikvrpc.go +++ b/tikvrpc/tikvrpc.go @@ -1289,3 +1289,51 @@ func (req *Request) IsRawWriteRequest() bool { // ResourceGroupTagger is used to fill the ResourceGroupTag in the kvrpcpb.Context. type ResourceGroupTagger func(req *Request) + +// GetStartTS returns the `start_ts` of the request. +func (req *Request) GetStartTS() uint64 { + switch req.Type { + case CmdGet: + return req.Get().GetVersion() + case CmdScan: + return req.Scan().GetVersion() + case CmdPrewrite: + return req.Prewrite().GetStartVersion() + case CmdCommit: + return req.Commit().GetStartVersion() + case CmdCleanup: + return req.Cleanup().GetStartVersion() + case CmdBatchGet: + return req.BatchGet().GetVersion() + case CmdBatchRollback: + return req.BatchRollback().GetStartVersion() + case CmdScanLock: + return req.ScanLock().GetMaxVersion() + case CmdResolveLock: + return req.ResolveLock().GetStartVersion() + case CmdPessimisticLock: + return req.PessimisticLock().GetStartVersion() + case CmdPessimisticRollback: + return req.PessimisticRollback().GetStartVersion() + case CmdTxnHeartBeat: + return req.TxnHeartBeat().GetStartVersion() + case CmdCheckTxnStatus: + return req.CheckTxnStatus().GetLockTs() + case CmdCheckSecondaryLocks: + return req.CheckSecondaryLocks().GetStartVersion() + case CmdFlashbackToVersion: + return req.FlashbackToVersion().GetStartTs() + case CmdPrepareFlashbackToVersion: + req.PrepareFlashbackToVersion().GetStartTs() + case CmdCop: + return req.Cop().GetStartTs() + case CmdCopStream: + return req.Cop().GetStartTs() + case CmdBatchCop: + return req.BatchCop().GetStartTs() + case CmdMvccGetByStartTs: + return req.MvccGetByStartTs().GetStartTs() + default: + } + return 0 +} diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go index 2bc42d62b2..7074b967bc 100644 --- a/txnkv/transaction/txn.go +++ b/txnkv/transaction/txn.go @@ -151,7 +151,7 @@ type KVTxn struct { interceptor interceptor.RPCInterceptor assertionLevel kvrpcpb.AssertionLevel *util.RequestSource - // resourceGroupName is the name of tenent resource group. + // resourceGroupName is the name of tenant resource group. resourceGroupName string aggressiveLockingContext *aggressiveLockingContext diff --git a/txnkv/txnlock/lock_resolver.go b/txnkv/txnlock/lock_resolver.go index 8005909e4c..c5882e39af 100644 --- a/txnkv/txnlock/lock_resolver.go +++ b/txnkv/txnlock/lock_resolver.go @@ -288,6 +288,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *retry.Backoffer, locks []*Lock, lo req := tikvrpc.NewRequest(tikvrpc.CmdResolveLock, &kvrpcpb.ResolveLockRequest{TxnInfos: listTxnInfos}, kvrpcpb.Context{ + // TODO: how to pass the `start_ts` here? RequestSource: util.RequestSourceFromCtx(bo.GetCtx()), ResourceGroupName: util.ResourceGroupNameFromCtx(bo.GetCtx()), }, diff --git a/util/execdetails.go b/util/execdetails.go index 2e6c904b2f..fa764553a9 100644 --- a/util/execdetails.go +++ b/util/execdetails.go @@ -37,6 +37,7 @@ package util import ( "bytes" "context" + "fmt" "math" "strconv" "sync" @@ -44,6 +45,8 @@ import ( "time" "github.com/pingcap/kvproto/pkg/kvrpcpb" + rmpb "github.com/pingcap/kvproto/pkg/resource_manager" + uatomic "go.uber.org/atomic" ) type commitDetailCtxKeyType struct{} @@ -678,3 +681,45 @@ type ResolveLockDetail struct { func (rd *ResolveLockDetail) Merge(resolveLock *ResolveLockDetail) { rd.ResolveLockTime += resolveLock.ResolveLockTime } + +// RURuntimeStats is the runtime stats collector for RU. +type RURuntimeStats struct { + readRU *uatomic.Float64 + writeRU *uatomic.Float64 +} + +// NewRURuntimeStats creates a new RURuntimeStats. +func NewRURuntimeStats() *RURuntimeStats { + return &RURuntimeStats{ + readRU: uatomic.NewFloat64(0), + writeRU: uatomic.NewFloat64(0), + } +} + +// Clone implements the RuntimeStats interface. +func (rs *RURuntimeStats) Clone() *RURuntimeStats { + return &RURuntimeStats{ + readRU: uatomic.NewFloat64(rs.readRU.Load()), + writeRU: uatomic.NewFloat64(rs.writeRU.Load()), + } +} + +// Merge implements the RuntimeStats interface. +func (rs *RURuntimeStats) Merge(other *RURuntimeStats) { + rs.readRU.Add(other.readRU.Load()) + rs.writeRU.Add(other.writeRU.Load()) +} + +// String implements fmt.Stringer interface. +func (rs *RURuntimeStats) String() string { + return fmt.Sprintf("RRU:%f, WRU:%f", rs.readRU.Load(), rs.writeRU.Load()) +} + +// Update updates the RU runtime stats with the given consumption info. +func (rs *RURuntimeStats) Update(consumption *rmpb.Consumption) { + if rs == nil || consumption == nil { + return + } + rs.readRU.Add(consumption.RRU) + rs.writeRU.Add(consumption.WRU) +}