diff --git a/distsql/distsql.go b/distsql/distsql.go index 2f952da2a7d3c..77b75efc480fd 100644 --- a/distsql/distsql.go +++ b/distsql/distsql.go @@ -23,17 +23,20 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessionctx/stmtctx" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/logutil" - "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/trxevents" "github.com/pingcap/tipb/go-tipb" + "github.com/tikv/client-go/v2/tikvrpc/interceptor" "go.uber.org/zap" ) // DispatchMPPTasks dispatches all tasks and returns an iterator. func DispatchMPPTasks(ctx context.Context, sctx sessionctx.Context, tasks []*kv.MPPDispatchRequest, fieldTypes []*types.FieldType, planIDs []int, rootID int) (SelectResult, error) { + ctx = WithSQLKvExecCounterInterceptor(ctx, sctx.GetSessionVars().StmtCtx) _, allowTiFlashFallback := sctx.GetSessionVars().AllowFallbackToTiKV[kv.TiFlash] resp := sctx.GetMPPClient().DispatchMPPTasks(ctx, sctx.GetSessionVars().KVVars, tasks, allowTiFlashFallback) if resp == nil { @@ -88,6 +91,8 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie zap.String("stmt", originalSQL)) } } + + ctx = WithSQLKvExecCounterInterceptor(ctx, sctx.GetSessionVars().StmtCtx) resp := sctx.GetClient().Send(ctx, kvReq, sctx.GetSessionVars().KVVars, sctx.GetSessionVars().StmtCtx.MemTracker, enabledRateLimitAction, eventCb) if resp == nil { return nil, errors.New("client returns nil response") @@ -149,8 +154,9 @@ func SelectWithRuntimeStats(ctx context.Context, sctx sessionctx.Context, kvReq // Analyze do a analyze request. func Analyze(ctx context.Context, client kv.Client, kvReq *kv.Request, vars interface{}, - isRestrict bool, sessionMemTracker *memory.Tracker) (SelectResult, error) { - resp := client.Send(ctx, kvReq, vars, sessionMemTracker, false, nil) + isRestrict bool, stmtCtx *stmtctx.StatementContext) (SelectResult, error) { + ctx = WithSQLKvExecCounterInterceptor(ctx, stmtCtx) + resp := client.Send(ctx, kvReq, vars, stmtCtx.MemTracker, false, nil) if resp == nil { return nil, errors.New("client returns nil response") } @@ -244,3 +250,15 @@ func init() { systemEndian = tipb.Endian_LittleEndian } } + +// WithSQLKvExecCounterInterceptor binds an interceptor for client-go to count the +// number of SQL executions of each TiKV (if any). +func WithSQLKvExecCounterInterceptor(ctx context.Context, stmtCtx *stmtctx.StatementContext) context.Context { + if variable.TopSQLEnabled() && stmtCtx.KvExecCounter != nil { + // Unlike calling Transaction or Snapshot interface, in distsql package we directly + // face tikv Request. So we need to manually bind RPCInterceptor to ctx. Instead of + // calling SetRPCInterceptor on Transaction or Snapshot. + return interceptor.WithRPCInterceptor(ctx, stmtCtx.KvExecCounter.RPCInterceptor()) + } + return ctx +} diff --git a/distsql/distsql_test.go b/distsql/distsql_test.go index 6ff4a5cb284e5..9be2738da2251 100644 --- a/distsql/distsql_test.go +++ b/distsql/distsql_test.go @@ -191,7 +191,7 @@ func TestAnalyze(t *testing.T) { Build() require.NoError(t, err) - response, err := Analyze(context.TODO(), sctx.GetClient(), request, tikvstore.DefaultVars, true, sctx.GetSessionVars().StmtCtx.MemTracker) + response, err := Analyze(context.TODO(), sctx.GetClient(), request, tikvstore.DefaultVars, true, sctx.GetSessionVars().StmtCtx) require.NoError(t, err) result, ok := response.(*selectResult) diff --git a/executor/adapter.go b/executor/adapter.go index 87f87a9712516..39e660099ed3f 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -233,6 +233,7 @@ func (a *ExecStmt) PointGet(ctx context.Context, is infoschema.InfoSchema) (*rec ctx = opentracing.ContextWithSpan(ctx, span1) } ctx = a.setPlanLabelForTopSQL(ctx) + a.observeStmtBeginForTopSQL() startTs := uint64(math.MaxUint64) err := a.Ctx.InitTxnWithStartTS(startTs) if err != nil { @@ -383,6 +384,7 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) { } // ExecuteExec will rewrite `a.Plan`, so set plan label should be executed after `a.buildExecutor`. ctx = a.setPlanLabelForTopSQL(ctx) + a.observeStmtBeginForTopSQL() if err = e.Open(ctx); err != nil { terror.Call(e.Close) @@ -896,6 +898,7 @@ func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, err error, hasMoreResults boo // `LowSlowQuery` and `SummaryStmt` must be called before recording `PrevStmt`. a.LogSlowQuery(txnTS, succ, hasMoreResults) a.SummaryStmt(succ) + a.observeStmtFinishedForTopSQL() if sessVars.StmtCtx.IsTiFlash.Load() { if succ { totalTiFlashQuerySuccCounter.Inc() @@ -1247,3 +1250,31 @@ func (a *ExecStmt) GetTextToLog() string { } return sql } + +func (a *ExecStmt) observeStmtBeginForTopSQL() { + if vars := a.Ctx.GetSessionVars(); variable.TopSQLEnabled() && vars.StmtStats != nil { + sqlDigest, planDigest := a.getSQLPlanDigest() + vars.StmtStats.OnExecutionBegin(sqlDigest, planDigest) + // This is a special logic prepared for TiKV's SQLExecCount. + vars.StmtCtx.KvExecCounter = vars.StmtStats.CreateKvExecCounter(sqlDigest, planDigest) + } +} + +func (a *ExecStmt) observeStmtFinishedForTopSQL() { + if vars := a.Ctx.GetSessionVars(); variable.TopSQLEnabled() && vars.StmtStats != nil { + sqlDigest, planDigest := a.getSQLPlanDigest() + vars.StmtStats.OnExecutionFinished(sqlDigest, planDigest) + } +} + +func (a *ExecStmt) getSQLPlanDigest() ([]byte, []byte) { + var sqlDigest, planDigest []byte + vars := a.Ctx.GetSessionVars() + if _, d := vars.StmtCtx.SQLDigest(); d != nil { + sqlDigest = d.Bytes() + } + if _, d := vars.StmtCtx.GetPlanDigest(); d != nil { + planDigest = d.Bytes() + } + return sqlDigest, planDigest +} diff --git a/executor/analyze.go b/executor/analyze.go index 5397c1ee0608c..c8f91ccc13f63 100644 --- a/executor/analyze.go +++ b/executor/analyze.go @@ -400,7 +400,7 @@ func (e *AnalyzeIndexExec) fetchAnalyzeResult(ranges []*ranger.Range, isNullRang return err } ctx := context.TODO() - result, err := distsql.Analyze(ctx, e.ctx.GetClient(), kvReq, e.ctx.GetSessionVars().KVVars, e.ctx.GetSessionVars().InRestrictedSQL, e.ctx.GetSessionVars().StmtCtx.MemTracker) + result, err := distsql.Analyze(ctx, e.ctx.GetClient(), kvReq, e.ctx.GetSessionVars().KVVars, e.ctx.GetSessionVars().InRestrictedSQL, e.ctx.GetSessionVars().StmtCtx) if err != nil { return err } @@ -763,7 +763,7 @@ func (e *AnalyzeColumnsExec) buildResp(ranges []*ranger.Range) (distsql.SelectRe return nil, err } ctx := context.TODO() - result, err := distsql.Analyze(ctx, e.ctx.GetClient(), kvReq, e.ctx.GetSessionVars().KVVars, e.ctx.GetSessionVars().InRestrictedSQL, e.ctx.GetSessionVars().StmtCtx.MemTracker) + result, err := distsql.Analyze(ctx, e.ctx.GetClient(), kvReq, e.ctx.GetSessionVars().KVVars, e.ctx.GetSessionVars().InRestrictedSQL, e.ctx.GetSessionVars().StmtCtx) if err != nil { return nil, err } @@ -1854,6 +1854,7 @@ func (e *AnalyzeFastExec) handleScanTasks(bo *tikv.Backoffer) (keysSize int, err snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) } setResourceGroupTaggerForTxn(e.ctx.GetSessionVars().StmtCtx, snapshot) + setRPCInterceptorOfExecCounterForTxn(e.ctx.GetSessionVars(), snapshot) for _, t := range e.scanTasks { iter, err := snapshot.Iter(kv.Key(t.StartKey), kv.Key(t.EndKey)) if err != nil { @@ -1875,6 +1876,7 @@ func (e *AnalyzeFastExec) handleSampTasks(workID int, step uint32, err *error) { snapshot.SetOption(kv.IsolationLevel, kv.SI) snapshot.SetOption(kv.Priority, kv.PriorityLow) setResourceGroupTaggerForTxn(e.ctx.GetSessionVars().StmtCtx, snapshot) + setRPCInterceptorOfExecCounterForTxn(e.ctx.GetSessionVars(), snapshot) readReplicaType := e.ctx.GetSessionVars().GetReplicaRead() if readReplicaType.IsFollowerRead() { snapshot.SetOption(kv.ReplicaRead, readReplicaType) diff --git a/executor/batch_point_get.go b/executor/batch_point_get.go index c30bf507d6d9d..f642cf92d56ff 100644 --- a/executor/batch_point_get.go +++ b/executor/batch_point_get.go @@ -150,6 +150,7 @@ func (e *BatchPointGetExec) Open(context.Context) error { }) } setResourceGroupTaggerForTxn(stmtCtx, snapshot) + setRPCInterceptorOfExecCounterForTxn(sessVars, snapshot) var batchGetter kv.BatchGetter = snapshot if txn.Valid() { lock := e.tblInfo.Lock diff --git a/executor/checksum.go b/executor/checksum.go index 69fd6ed319e75..013fd3be2226f 100644 --- a/executor/checksum.go +++ b/executor/checksum.go @@ -128,7 +128,7 @@ func (e *ChecksumTableExec) checksumWorker(taskCh <-chan *checksumTask, resultCh } func (e *ChecksumTableExec) handleChecksumRequest(req *kv.Request) (resp *tipb.ChecksumResponse, err error) { - ctx := context.TODO() + ctx := distsql.WithSQLKvExecCounterInterceptor(context.TODO(), e.ctx.GetSessionVars().StmtCtx) res, err := distsql.Checksum(ctx, e.ctx.GetClient(), req, e.ctx.GetSessionVars().KVVars) if err != nil { return nil, err diff --git a/executor/executor.go b/executor/executor.go index eee07f8774ed0..4338529cea8ac 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -1914,3 +1914,11 @@ func setResourceGroupTaggerForTxn(sc *stmtctx.StatementContext, snapshot kv.Snap snapshot.SetOption(kv.ResourceGroupTagger, sc.GetResourceGroupTagger()) } } + +// setRPCInterceptorOfExecCounterForTxn binds an interceptor for client-go to count +// the number of SQL executions of each TiKV. +func setRPCInterceptorOfExecCounterForTxn(vars *variable.SessionVars, snapshot kv.Snapshot) { + if snapshot != nil && variable.TopSQLEnabled() && vars.StmtCtx.KvExecCounter != nil { + snapshot.SetOption(kv.RPCInterceptor, vars.StmtCtx.KvExecCounter.RPCInterceptor()) + } +} diff --git a/executor/insert.go b/executor/insert.go index 2862416ddf04a..7b0758c7ff076 100644 --- a/executor/insert.go +++ b/executor/insert.go @@ -67,6 +67,7 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error { return err } setResourceGroupTaggerForTxn(sessVars.StmtCtx, txn) + setRPCInterceptorOfExecCounterForTxn(sessVars, txn) txnSize := txn.Size() sessVars.StmtCtx.AddRecordRows(uint64(len(rows))) // If you use the IGNORE keyword, duplicate-key error that occurs while executing the INSERT statement are ignored. diff --git a/executor/point_get.go b/executor/point_get.go index 45f3fa76e263f..698626b4e1403 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -191,6 +191,7 @@ func (e *PointGetExecutor) Open(context.Context) error { } }) setResourceGroupTaggerForTxn(e.ctx.GetSessionVars().StmtCtx, e.snapshot) + setRPCInterceptorOfExecCounterForTxn(e.ctx.GetSessionVars(), e.snapshot) return nil } diff --git a/executor/replace.go b/executor/replace.go index 78e0085aa520e..fe1930639f446 100644 --- a/executor/replace.go +++ b/executor/replace.go @@ -223,6 +223,7 @@ func (e *ReplaceExec) exec(ctx context.Context, newRows [][]types.Datum) error { } } setResourceGroupTaggerForTxn(e.ctx.GetSessionVars().StmtCtx, txn) + setRPCInterceptorOfExecCounterForTxn(e.ctx.GetSessionVars(), txn) prefetchStart := time.Now() // Use BatchGet to fill cache. // It's an optimization and could be removed without affecting correctness. diff --git a/executor/update.go b/executor/update.go index 7df144b28196c..16024bc403fa1 100644 --- a/executor/update.go +++ b/executor/update.go @@ -275,6 +275,10 @@ func (e *UpdateExec) updateRows(ctx context.Context) (int, error) { txn, err := e.ctx.Txn(true) if err == nil { txn.SetOption(kv.ResourceGroupTagger, e.ctx.GetSessionVars().StmtCtx.GetResourceGroupTagger()) + if e.ctx.GetSessionVars().StmtCtx.KvExecCounter != nil { + // Bind an interceptor for client-go to count the number of SQL executions of each TiKV. + txn.SetOption(kv.RPCInterceptor, e.ctx.GetSessionVars().StmtCtx.KvExecCounter.RPCInterceptor()) + } } } for rowIdx := 0; rowIdx < chk.NumRows(); rowIdx++ { diff --git a/go.mod b/go.mod index 3c2868df8b118..6f2a22ed5dd5d 100644 --- a/go.mod +++ b/go.mod @@ -65,7 +65,7 @@ require ( github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.7.0 github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 - github.com/tikv/client-go/v2 v2.0.0-rc.0.20211214093715-605f49d3ba50 + github.com/tikv/client-go/v2 v2.0.0-rc.0.20211218050306-6165dbaa95d0 github.com/tikv/pd v1.1.0-beta.0.20211118054146-02848d2660ee github.com/twmb/murmur3 v1.1.3 github.com/uber/jaeger-client-go v2.22.1+incompatible diff --git a/go.sum b/go.sum index a6b2d60e60a01..36c8604668b6b 100644 --- a/go.sum +++ b/go.sum @@ -712,8 +712,8 @@ github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfK github.com/tidwall/gjson v1.3.5/go.mod h1:P256ACg0Mn+j1RXIDXoss50DeIABTYK1PULOJHhxOls= github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= -github.com/tikv/client-go/v2 v2.0.0-rc.0.20211214093715-605f49d3ba50 h1:B+cAIm2P1/SNsVV1vL9/mRaGUVl/vdgV8MU03O0vY28= -github.com/tikv/client-go/v2 v2.0.0-rc.0.20211214093715-605f49d3ba50/go.mod h1:wRuh+W35daKTiYBld0oBlT6PSkzEVr+pB/vChzJZk+8= +github.com/tikv/client-go/v2 v2.0.0-rc.0.20211218050306-6165dbaa95d0 h1:38Jst/O36MKXAt7aD1Ipnx4nKwclG66ifkcmi4f0NZ4= +github.com/tikv/client-go/v2 v2.0.0-rc.0.20211218050306-6165dbaa95d0/go.mod h1:wRuh+W35daKTiYBld0oBlT6PSkzEVr+pB/vChzJZk+8= github.com/tikv/pd v1.1.0-beta.0.20211029083450-e65f0c55b6ae/go.mod h1:varH0IE0jJ9E9WN2Ei/N6pajMlPkcXdDEf7f5mmsUVQ= github.com/tikv/pd v1.1.0-beta.0.20211118054146-02848d2660ee h1:rAAdvQ8Hh36syHr92g0VmZEpkH+40RGQBpFL2121xMs= github.com/tikv/pd v1.1.0-beta.0.20211118054146-02848d2660ee/go.mod h1:lRbwxBAhnTQR5vqbTzeI/Bj62bD2OvYYuFezo2vrmeI= diff --git a/kv/option.go b/kv/option.go index 2a7a17fedcb6c..f3247b5b31b2c 100644 --- a/kv/option.go +++ b/kv/option.go @@ -68,12 +68,14 @@ const ( ResourceGroupTagger // KVFilter indicates the filter to ignore key-values in the transaction's memory buffer. KVFilter - // SnapInterceptor is used for setting the interceptor for snapshot SnapInterceptor // CommitTSUpperBoundChec is used by cached table // The commitTS must be greater than all the write lock lease of the visited cached table. CommitTSUpperBoundCheck + // RPCInterceptor is interceptor.RPCInterceptor on Transaction or Snapshot, used to decorate + // additional logic before and after the underlying client-go RPC request. + RPCInterceptor ) // ReplicaReadType is the type of replica to read data from diff --git a/session/session.go b/session/session.go index a7976a5001bee..b7171a79c9525 100644 --- a/session/session.go +++ b/session/session.go @@ -548,6 +548,10 @@ func (s *session) doCommit(ctx context.Context) error { s.txn.SetOption(kv.EnableAsyncCommit, sessVars.EnableAsyncCommit) s.txn.SetOption(kv.Enable1PC, sessVars.Enable1PC) s.txn.SetOption(kv.ResourceGroupTagger, sessVars.StmtCtx.GetResourceGroupTagger()) + if sessVars.StmtCtx.KvExecCounter != nil { + // Bind an interceptor for client-go to count the number of SQL executions of each TiKV. + s.txn.SetOption(kv.RPCInterceptor, sessVars.StmtCtx.KvExecCounter.RPCInterceptor()) + } // priority of the sysvar is lower than `start transaction with causal consistency only` if val := s.txn.GetOption(kv.GuaranteeLinearizability); val == nil || val.(bool) { // We needn't ask the TiKV client to guarantee linearizability for auto-commit transactions @@ -2311,6 +2315,9 @@ func (s *session) Close() { s.RollbackTxn(ctx) if s.sessionVars != nil { s.sessionVars.WithdrawAllPreparedStmt() + if s.sessionVars.StmtStats != nil { + s.sessionVars.StmtStats.SetFinished() + } } s.ClearDiskFullOpt() } diff --git a/sessionctx/stmtctx/stmtctx.go b/sessionctx/stmtctx/stmtctx.go index e41eb4766b47b..a064f891dd854 100644 --- a/sessionctx/stmtctx/stmtctx.go +++ b/sessionctx/stmtctx/stmtctx.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/resourcegrouptag" + "github.com/pingcap/tidb/util/topsql/stmtstats" "github.com/pingcap/tidb/util/tracing" "github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/util" @@ -207,6 +208,12 @@ type StatementContext struct { // WaitLockLeaseTime is the duration of cached table read lease expiration time. WaitLockLeaseTime time.Duration + + // KvExecCounter is created from SessionVars.StmtStats to count the number of SQL + // executions of the kv layer during the current execution of the statement. + // Its life cycle is limited to this execution, and a new KvExecCounter is + // always created during each statement execution. + KvExecCounter *stmtstats.KvExecCounter } // StmtHints are SessionVars related sql hints. diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 8169eaa5c2d66..d03cbdce86fde 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -29,8 +29,6 @@ import ( "sync/atomic" "time" - utilMath "github.com/pingcap/tidb/util/math" - "github.com/pingcap/errors" pumpcli "github.com/pingcap/tidb-tools/tidb-binlog/pump_client" "github.com/pingcap/tidb/config" @@ -48,10 +46,12 @@ import ( "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/execdetails" + utilMath "github.com/pingcap/tidb/util/math" "github.com/pingcap/tidb/util/rowcodec" "github.com/pingcap/tidb/util/stringutil" "github.com/pingcap/tidb/util/tableutil" "github.com/pingcap/tidb/util/timeutil" + "github.com/pingcap/tidb/util/topsql/stmtstats" tikvstore "github.com/tikv/client-go/v2/kv" "github.com/tikv/client-go/v2/oracle" "github.com/twmb/murmur3" @@ -969,6 +969,13 @@ type SessionVars struct { // EnablePaging indicates whether enable paging in coprocessor requests. EnablePaging bool + + // StmtStats is used to count various indicators of each SQL in this session + // at each point in time. These data will be periodically taken away by the + // background goroutine. The background goroutine will continue to aggregate + // all the local data in each session, and finally report them to the remote + // regularly. + StmtStats *stmtstats.StatementStats } // InitStatementContext initializes a StatementContext, the object is reused to reduce allocation. @@ -1203,6 +1210,7 @@ func NewSessionVars() *SessionVars { MPPStoreFailTTL: DefTiDBMPPStoreFailTTL, EnablePlacementChecks: DefEnablePlacementCheck, Rng: utilMath.NewWithTime(), + StmtStats: stmtstats.CreateStatementStats(), } vars.KVVars = tikvstore.NewVariables(&vars.Killed) vars.Concurrency = Concurrency{ diff --git a/store/driver/txn/snapshot.go b/store/driver/txn/snapshot.go index 3c372bae83725..28b73e15e3228 100644 --- a/store/driver/txn/snapshot.go +++ b/store/driver/txn/snapshot.go @@ -23,6 +23,7 @@ import ( derr "github.com/pingcap/tidb/store/driver/error" "github.com/pingcap/tidb/store/driver/options" "github.com/tikv/client-go/v2/tikvrpc" + "github.com/tikv/client-go/v2/tikvrpc/interceptor" "github.com/tikv/client-go/v2/txnkv/txnsnapshot" "github.com/tikv/client-go/v2/txnkv/txnutil" ) @@ -120,6 +121,8 @@ func (s *tikvSnapshot) SetOption(opt int, val interface{}) { s.KVSnapshot.SetReadReplicaScope(val.(string)) case kv.SnapInterceptor: s.interceptor = val.(kv.SnapshotInterceptor) + case kv.RPCInterceptor: + s.KVSnapshot.SetRPCInterceptor(val.(interceptor.RPCInterceptor)) } } diff --git a/store/driver/txn/txn_driver.go b/store/driver/txn/txn_driver.go index 717bf3b154761..bb9e38a4f3c03 100644 --- a/store/driver/txn/txn_driver.go +++ b/store/driver/txn/txn_driver.go @@ -32,6 +32,7 @@ import ( tikvstore "github.com/tikv/client-go/v2/kv" "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/tikvrpc" + "github.com/tikv/client-go/v2/tikvrpc/interceptor" "github.com/tikv/client-go/v2/txnkv/txnsnapshot" ) @@ -232,6 +233,8 @@ func (txn *tikvTxn) SetOption(opt int, val interface{}) { txn.snapshotInterceptor = val.(kv.SnapshotInterceptor) case kv.CommitTSUpperBoundCheck: txn.KVTxn.SetCommitTSUpperBoundCheck(val.(func(commitTS uint64) bool)) + case kv.RPCInterceptor: + txn.KVTxn.SetRPCInterceptor(val.(interceptor.RPCInterceptor)) } } diff --git a/util/topsql/stmtstats/aggregator.go b/util/topsql/stmtstats/aggregator.go new file mode 100644 index 0000000000000..d78ed7b62dafb --- /dev/null +++ b/util/topsql/stmtstats/aggregator.go @@ -0,0 +1,156 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stmtstats + +import ( + "context" + "sync" + "time" + + "go.uber.org/atomic" +) + +// globalAggregator is global *aggregator. +var globalAggregator = newAggregator() + +// StatementStatsRecord is the merged StatementStatsMap with timestamp. +type StatementStatsRecord struct { + Timestamp int64 + Data StatementStatsMap +} + +// aggregator is used to collect and aggregate data from all StatementStats. +// It is responsible for collecting data from all StatementStats, aggregating +// them together, uploading them and regularly cleaning up the closed StatementStats. +type aggregator struct { + ctx context.Context + cancel context.CancelFunc + statsSet sync.Map // map[*StatementStats]struct{} + collectors sync.Map // map[Collector]struct{} + running *atomic.Bool +} + +// newAggregator creates an empty aggregator. +func newAggregator() *aggregator { + return &aggregator{running: atomic.NewBool(false)} +} + +// run will block the current goroutine and execute the main loop of aggregator. +func (m *aggregator) run() { + m.ctx, m.cancel = context.WithCancel(context.Background()) + m.running.Store(true) + defer func() { + m.running.Store(false) + }() + tick := time.NewTicker(time.Second) + defer tick.Stop() + for { + select { + case <-m.ctx.Done(): + return + case <-tick.C: + m.aggregate() + } + } +} + +// aggregate data from all associated StatementStats. +// If StatementStats has been closed, collect will remove it from the map. +func (m *aggregator) aggregate() { + r := StatementStatsRecord{ + Timestamp: time.Now().Unix(), + Data: StatementStatsMap{}, + } + m.statsSet.Range(func(statsR, _ interface{}) bool { + stats := statsR.(*StatementStats) + if stats.Finished() { + m.unregister(stats) + } + r.Data.Merge(stats.Take()) + return true + }) + m.collectors.Range(func(c, _ interface{}) bool { + c.(Collector).CollectStmtStatsRecords([]StatementStatsRecord{r}) + return true + }) +} + +// register binds StatementStats to aggregator. +// register is thread-safe. +func (m *aggregator) register(stats *StatementStats) { + m.statsSet.Store(stats, struct{}{}) +} + +// unregister removes StatementStats from aggregator. +// unregister is thread-safe. +func (m *aggregator) unregister(stats *StatementStats) { + m.statsSet.Delete(stats) +} + +// registerCollector binds a Collector to aggregator. +// registerCollector is thread-safe. +func (m *aggregator) registerCollector(collector Collector) { + m.collectors.Store(collector, struct{}{}) +} + +// unregisterCollector removes Collector from aggregator. +// unregisterCollector is thread-safe. +func (m *aggregator) unregisterCollector(collector Collector) { + m.collectors.Delete(collector) +} + +// close ends the execution of the current aggregator. +func (m *aggregator) close() { + m.cancel() +} + +// closed returns whether the aggregator has been closed. +func (m *aggregator) closed() bool { + return !m.running.Load() +} + +// SetupAggregator is used to initialize the background aggregator goroutine of the stmtstats module. +// SetupAggregator is **not** thread-safe. +func SetupAggregator() { + if globalAggregator.closed() { + go globalAggregator.run() + } +} + +// CloseAggregator is used to stop the background aggregator goroutine of the stmtstats module. +// SetupAggregator is **not** thread-safe. +func CloseAggregator() { + if !globalAggregator.closed() { + globalAggregator.close() + } +} + +// RegisterCollector binds a Collector to globalAggregator. +// RegisterCollector is thread-safe. +func RegisterCollector(collector Collector) { + globalAggregator.registerCollector(collector) +} + +// UnregisterCollector removes Collector from globalAggregator. +// UnregisterCollector is thread-safe. +func UnregisterCollector(collector Collector) { + globalAggregator.unregisterCollector(collector) +} + +// Collector is used to collect StatementStatsRecord. +type Collector interface { + // CollectStmtStatsRecords is used to collect list of StatementStatsRecord. + CollectStmtStatsRecords([]StatementStatsRecord) +} diff --git a/util/topsql/stmtstats/aggregator_test.go b/util/topsql/stmtstats/aggregator_test.go new file mode 100644 index 0000000000000..24a72bb89131d --- /dev/null +++ b/util/topsql/stmtstats/aggregator_test.go @@ -0,0 +1,93 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stmtstats + +import ( + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.uber.org/atomic" +) + +func Test_SetupCloseAggregator(t *testing.T) { + for n := 0; n < 3; n++ { + SetupAggregator() + time.Sleep(100 * time.Millisecond) + assert.False(t, globalAggregator.closed()) + CloseAggregator() + time.Sleep(100 * time.Millisecond) + assert.True(t, globalAggregator.closed()) + } +} + +func Test_RegisterUnregisterCollector(t *testing.T) { + SetupAggregator() + defer CloseAggregator() + time.Sleep(100 * time.Millisecond) + collector := newMockCollector(func(records []StatementStatsRecord) {}) + RegisterCollector(collector) + _, ok := globalAggregator.collectors.Load(collector) + assert.True(t, ok) + UnregisterCollector(collector) + _, ok = globalAggregator.collectors.Load(collector) + assert.False(t, ok) +} + +func Test_aggregator_register_collect(t *testing.T) { + a := newAggregator() + stats := &StatementStats{ + data: StatementStatsMap{}, + finished: atomic.NewBool(false), + } + a.register(stats) + stats.OnExecutionBegin([]byte("SQL-1"), []byte("")) + var records []StatementStatsRecord + a.registerCollector(newMockCollector(func(rs []StatementStatsRecord) { + records = append(records, rs...) + })) + a.aggregate() + assert.NotEmpty(t, records) + assert.Equal(t, uint64(1), records[0].Data[SQLPlanDigest{SQLDigest: "SQL-1"}].ExecCount) +} + +func Test_aggregator_run_close(t *testing.T) { + wg := sync.WaitGroup{} + a := newAggregator() + assert.True(t, a.closed()) + wg.Add(1) + go func() { + a.run() + wg.Done() + }() + time.Sleep(100 * time.Millisecond) + assert.False(t, a.closed()) + a.close() + wg.Wait() + assert.True(t, a.closed()) +} + +type mockCollector struct { + f func(records []StatementStatsRecord) +} + +func newMockCollector(f func(records []StatementStatsRecord)) Collector { + return &mockCollector{f: f} +} + +func (c *mockCollector) CollectStmtStatsRecords(records []StatementStatsRecord) { + c.f(records) +} diff --git a/util/topsql/stmtstats/kv_exec_count.go b/util/topsql/stmtstats/kv_exec_count.go new file mode 100644 index 0000000000000..7da4dc8eebdcd --- /dev/null +++ b/util/topsql/stmtstats/kv_exec_count.go @@ -0,0 +1,73 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stmtstats + +import ( + "sync" + + "github.com/tikv/client-go/v2/tikvrpc" + "github.com/tikv/client-go/v2/tikvrpc/interceptor" +) + +// CreateKvExecCounter creates an associated KvExecCounter from StatementStats. +// The created KvExecCounter can only be used during a single statement execution +// and cannot be reused. +func (s *StatementStats) CreateKvExecCounter(sqlDigest, planDigest []byte) *KvExecCounter { + return &KvExecCounter{ + stats: s, + digest: SQLPlanDigest{SQLDigest: BinaryDigest(sqlDigest), PlanDigest: BinaryDigest(planDigest)}, + marked: map[string]struct{}{}, + } +} + +// KvExecCounter is used to count the number of SQL executions of the kv layer. +// It internally calls addKvExecCount of StatementStats at the right time, to +// ensure the semantic of "SQL execution count of TiKV". +type KvExecCounter struct { + stats *StatementStats + digest SQLPlanDigest + mu sync.Mutex + marked map[string]struct{} // HashSet +} + +// RPCInterceptor returns an interceptor.RPCInterceptor for client-go. +// The returned interceptor is generally expected to be bind to transaction or +// snapshot. In this way, the logic preset by KvExecCounter will be executed before +// each RPC request is initiated, in order to count the number of SQL executions of +// the TiKV dimension. +func (c *KvExecCounter) RPCInterceptor() interceptor.RPCInterceptor { + return func(next interceptor.RPCInterceptorFunc) interceptor.RPCInterceptorFunc { + return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) { + c.mark(target) + return next(target, req) + } + } +} + +// mark this target during the current execution of statement. +// If this target is marked for the first time, then increase the number of execution. +// mark is thread-safe. +func (c *KvExecCounter) mark(target string) { + firstMark := false + c.mu.Lock() + if _, ok := c.marked[target]; !ok { + c.marked[target] = struct{}{} + firstMark = true + } + c.mu.Unlock() + if firstMark { + c.stats.addKvExecCount([]byte(c.digest.SQLDigest), []byte(c.digest.PlanDigest), target, 1) + } +} diff --git a/util/topsql/stmtstats/kv_exec_count_test.go b/util/topsql/stmtstats/kv_exec_count_test.go new file mode 100644 index 0000000000000..c55a5300c0891 --- /dev/null +++ b/util/topsql/stmtstats/kv_exec_count_test.go @@ -0,0 +1,43 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stmtstats + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/tikv/client-go/v2/tikvrpc" +) + +func TestKvExecCounter(t *testing.T) { + stats := CreateStatementStats() + counter := stats.CreateKvExecCounter([]byte("SQL-1"), []byte("")) + interceptor := counter.RPCInterceptor() + for n := 0; n < 10; n++ { + _, _ = interceptor(func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) { + return nil, nil + })("TIKV-1", nil) + } + for n := 0; n < 10; n++ { + _, _ = interceptor(func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) { + return nil, nil + })("TIKV-2", nil) + } + assert.Len(t, counter.marked, 2) + assert.Contains(t, counter.marked, "TIKV-1") + assert.Contains(t, counter.marked, "TIKV-2") + assert.NotNil(t, stats.data[SQLPlanDigest{SQLDigest: "SQL-1"}]) + assert.Equal(t, uint64(1), stats.data[SQLPlanDigest{SQLDigest: "SQL-1"}].KvStatsItem.KvExecCount["TIKV-1"]) +} diff --git a/util/topsql/stmtstats/main_test.go b/util/topsql/stmtstats/main_test.go new file mode 100644 index 0000000000000..24f6c2574c522 --- /dev/null +++ b/util/topsql/stmtstats/main_test.go @@ -0,0 +1,27 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stmtstats + +import ( + "testing" + + "github.com/pingcap/tidb/util/testbridge" + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + testbridge.WorkaroundGoCheckFlags() + goleak.VerifyTestMain(m) +} diff --git a/util/topsql/stmtstats/stmtstats.go b/util/topsql/stmtstats/stmtstats.go new file mode 100644 index 0000000000000..24faa93899cce --- /dev/null +++ b/util/topsql/stmtstats/stmtstats.go @@ -0,0 +1,219 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stmtstats + +import ( + "sync" + + "go.uber.org/atomic" +) + +var _ StatementObserver = &StatementStats{} + +// StatementObserver is an abstract interface as a callback to the corresponding +// position of TiDB's SQL statement execution process. StatementStats implements +// StatementObserver and performs counting such as SQLExecCount/SQLDuration internally. +// The caller only needs to be responsible for calling different methods at the +// corresponding locations, without paying attention to implementation details. +type StatementObserver interface { + // OnExecutionBegin should be called before statement execution. + OnExecutionBegin(sqlDigest, planDigest []byte) + + // OnExecutionFinished should be called after the statement is executed. + OnExecutionFinished(sqlDigest, planDigest []byte) +} + +// StatementStats is a counter used locally in each session. +// We can use StatementStats to count data such as "the number of SQL executions", +// and it is expected that these statistics will eventually be collected and merged +// in the background. +type StatementStats struct { + mu sync.Mutex + data StatementStatsMap + finished *atomic.Bool +} + +// CreateStatementStats try to create and register an StatementStats. +func CreateStatementStats() *StatementStats { + stats := &StatementStats{ + data: StatementStatsMap{}, + finished: atomic.NewBool(false), + } + globalAggregator.register(stats) + return stats +} + +// OnExecutionBegin implements StatementObserver.OnExecutionBegin. +func (s *StatementStats) OnExecutionBegin(sqlDigest, planDigest []byte) { + s.mu.Lock() + defer s.mu.Unlock() + item := s.GetOrCreateStatementStatsItem(sqlDigest, planDigest) + + item.ExecCount++ + // Count more data here. +} + +// OnExecutionFinished implements StatementObserver.OnExecutionFinished. +func (s *StatementStats) OnExecutionFinished(sqlDigest, planDigest []byte) { + // Count more data here. +} + +// GetOrCreateStatementStatsItem creates the corresponding StatementStatsItem +// for the specified SQLPlanDigest and timestamp if it does not exist before. +// GetOrCreateStatementStatsItem is just a helper function, not responsible for +// concurrency control, so GetOrCreateStatementStatsItem is **not** thread-safe. +func (s *StatementStats) GetOrCreateStatementStatsItem(sqlDigest, planDigest []byte) *StatementStatsItem { + key := SQLPlanDigest{SQLDigest: BinaryDigest(sqlDigest), PlanDigest: BinaryDigest(planDigest)} + item, ok := s.data[key] + if !ok { + s.data[key] = NewStatementStatsItem() + item = s.data[key] + } + return item +} + +// addKvExecCount is used to count the number of executions of a certain SQLPlanDigest for a certain target. +// addKvExecCount is thread-safe. +func (s *StatementStats) addKvExecCount(sqlDigest, planDigest []byte, target string, n uint64) { + s.mu.Lock() + defer s.mu.Unlock() + item := s.GetOrCreateStatementStatsItem(sqlDigest, planDigest) + item.KvStatsItem.KvExecCount[target] += n +} + +// Take takes out all existing StatementStatsMap data from StatementStats. +// Take is thread-safe. +func (s *StatementStats) Take() StatementStatsMap { + s.mu.Lock() + defer s.mu.Unlock() + data := s.data + s.data = StatementStatsMap{} + return data +} + +// SetFinished marks this StatementStats as "finished" and no more counting or +// aggregation should happen. Associated resources will be cleaned up, like background +// aggregators. +// Generally, as the StatementStats is created when a session starts, SetFinished +// should be called when the session ends. +func (s *StatementStats) SetFinished() { + s.finished.Store(true) +} + +// Finished returns whether the StatementStats has been finished. +func (s *StatementStats) Finished() bool { + return s.finished.Load() +} + +// BinaryDigest is converted from parser.Digest.Bytes(), and the purpose +// is to be used as the key of the map. +type BinaryDigest string + +// SQLPlanDigest is used as the key of StatementStatsMap to +// distinguish different sql. +type SQLPlanDigest struct { + SQLDigest BinaryDigest + PlanDigest BinaryDigest +} + +// StatementStatsMap is the local data type of StatementStats. +type StatementStatsMap map[SQLPlanDigest]*StatementStatsItem + +// Merge merges other into StatementStatsMap. +// Values with the same SQLPlanDigest will be merged. +// +// After executing Merge, some pointers in other may be referenced +// by m. So after calling Merge, it is best not to continue to use +// other unless you understand what you are doing. +func (m StatementStatsMap) Merge(other StatementStatsMap) { + if m == nil || other == nil { + return + } + for newDigest, newItem := range other { + item, ok := m[newDigest] + if !ok { + m[newDigest] = newItem + continue + } + item.Merge(newItem) + } +} + +// StatementStatsItem represents a set of mergeable statistics. +// StatementStatsItem is used in a larger data structure to represent +// the stats of a certain SQLPlanDigest under a certain timestamp. +// If there are more indicators that need to be added in the future, +// please add it in StatementStatsItem and implement its aggregation +// in the Merge method. +type StatementStatsItem struct { + // ExecCount represents the number of SQL executions of TiDB. + ExecCount uint64 + + // KvStatsItem contains all indicators of kv layer. + KvStatsItem KvStatementStatsItem +} + +// NewStatementStatsItem creates an empty StatementStatsItem. +func NewStatementStatsItem() *StatementStatsItem { + return &StatementStatsItem{ + KvStatsItem: NewKvStatementStatsItem(), + } +} + +// Merge merges other into StatementStatsItem. +// +// After executing Merge, some pointers in other may be referenced +// by i. So after calling Merge, it is best not to continue to use +// other unless you understand what you are doing. +// +// If you add additional indicators, you need to add their merge code here. +func (i *StatementStatsItem) Merge(other *StatementStatsItem) { + if i == nil || other == nil { + return + } + i.ExecCount += other.ExecCount + i.KvStatsItem.Merge(other.KvStatsItem) +} + +// KvStatementStatsItem is part of StatementStatsItem, it only contains +// indicators of kv layer. +type KvStatementStatsItem struct { + // KvExecCount represents the number of SQL executions of TiKV. + KvExecCount map[string]uint64 +} + +// NewKvStatementStatsItem creates an empty KvStatementStatsItem. +func NewKvStatementStatsItem() KvStatementStatsItem { + return KvStatementStatsItem{ + KvExecCount: map[string]uint64{}, + } +} + +// Merge merges other into KvStatementStatsItem. +// +// After executing Merge, some pointers in other may be referenced +// by i. So after calling Merge, it is best not to continue to use +// other unless you understand what you are doing. +// +// If you add additional indicators, you need to add their merge code here. +func (i *KvStatementStatsItem) Merge(other KvStatementStatsItem) { + if i.KvExecCount == nil { + i.KvExecCount = other.KvExecCount + } else { + for target, count := range other.KvExecCount { + i.KvExecCount[target] += count + } + } +} diff --git a/util/topsql/stmtstats/stmtstats_test.go b/util/topsql/stmtstats/stmtstats_test.go new file mode 100644 index 0000000000000..b78208d918d76 --- /dev/null +++ b/util/topsql/stmtstats/stmtstats_test.go @@ -0,0 +1,187 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stmtstats + +import ( + "bytes" + "encoding/json" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +// String is only used for debugging. +func (d SQLPlanDigest) String() string { + bs := bytes.NewBufferString("") + if len(d.SQLDigest) >= 5 { + bs.Write([]byte(d.SQLDigest)[:5]) + } + if len(d.PlanDigest) >= 5 { + bs.WriteRune('-') + bs.Write([]byte(d.PlanDigest)[:5]) + } + return bs.String() +} + +// String is only used for debugging. +func (m StatementStatsMap) String() string { + if len(m) == 0 { + return "StatementStatsMap {}" + } + bs := bytes.NewBufferString("") + bs.WriteString("StatementStatsMap {\n") + for k, v := range m { + bs.WriteString(fmt.Sprintf(" %s => %s\n", k, v)) + } + bs.WriteString("}") + return bs.String() +} + +// String is only used for debugging. +func (i *StatementStatsItem) String() string { + if i == nil { + return "" + } + b, _ := json.Marshal(i) + return string(b) +} + +func TestKvStatementStatsItem_Merge(t *testing.T) { + item1 := KvStatementStatsItem{ + KvExecCount: map[string]uint64{ + "127.0.0.1:10001": 1, + "127.0.0.1:10002": 2, + }, + } + item2 := KvStatementStatsItem{ + KvExecCount: map[string]uint64{ + "127.0.0.1:10002": 2, + "127.0.0.1:10003": 3, + }, + } + assert.Len(t, item1.KvExecCount, 2) + assert.Len(t, item2.KvExecCount, 2) + item1.Merge(item2) + assert.Len(t, item1.KvExecCount, 3) + assert.Len(t, item2.KvExecCount, 2) + assert.Equal(t, uint64(1), item1.KvExecCount["127.0.0.1:10001"]) + assert.Equal(t, uint64(3), item1.KvExecCount["127.0.0.1:10003"]) + assert.Equal(t, uint64(3), item1.KvExecCount["127.0.0.1:10003"]) +} + +func TestStatementsStatsItem_Merge(t *testing.T) { + item1 := &StatementStatsItem{ + ExecCount: 1, + KvStatsItem: NewKvStatementStatsItem(), + } + item2 := &StatementStatsItem{ + ExecCount: 2, + KvStatsItem: NewKvStatementStatsItem(), + } + item1.Merge(item2) + assert.Equal(t, uint64(3), item1.ExecCount) +} + +func TestStatementStatsMap_Merge(t *testing.T) { + m1 := StatementStatsMap{ + SQLPlanDigest{SQLDigest: "SQL-1"}: &StatementStatsItem{ + ExecCount: 1, + KvStatsItem: KvStatementStatsItem{ + KvExecCount: map[string]uint64{ + "KV-1": 1, + "KV-2": 2, + }, + }, + }, + SQLPlanDigest{SQLDigest: "SQL-2"}: &StatementStatsItem{ + ExecCount: 1, + KvStatsItem: KvStatementStatsItem{ + KvExecCount: map[string]uint64{ + "KV-1": 1, + "KV-2": 2, + }, + }, + }, + } + m2 := StatementStatsMap{ + SQLPlanDigest{SQLDigest: "SQL-2"}: &StatementStatsItem{ + ExecCount: 1, + KvStatsItem: KvStatementStatsItem{ + KvExecCount: map[string]uint64{ + "KV-1": 1, + "KV-2": 2, + }, + }, + }, + SQLPlanDigest{SQLDigest: "SQL-3"}: &StatementStatsItem{ + ExecCount: 1, + KvStatsItem: KvStatementStatsItem{ + KvExecCount: map[string]uint64{ + "KV-1": 1, + "KV-2": 2, + }, + }, + }, + } + assert.Len(t, m1, 2) + assert.Len(t, m2, 2) + m1.Merge(m2) + assert.Len(t, m1, 3) + assert.Len(t, m2, 2) + assert.Equal(t, uint64(1), m1[SQLPlanDigest{SQLDigest: "SQL-1"}].ExecCount) + assert.Equal(t, uint64(2), m1[SQLPlanDigest{SQLDigest: "SQL-2"}].ExecCount) + assert.Equal(t, uint64(1), m1[SQLPlanDigest{SQLDigest: "SQL-3"}].ExecCount) + assert.Equal(t, uint64(1), m1[SQLPlanDigest{SQLDigest: "SQL-1"}].KvStatsItem.KvExecCount["KV-1"]) + assert.Equal(t, uint64(2), m1[SQLPlanDigest{SQLDigest: "SQL-1"}].KvStatsItem.KvExecCount["KV-2"]) + assert.Equal(t, uint64(2), m1[SQLPlanDigest{SQLDigest: "SQL-2"}].KvStatsItem.KvExecCount["KV-1"]) + assert.Equal(t, uint64(4), m1[SQLPlanDigest{SQLDigest: "SQL-2"}].KvStatsItem.KvExecCount["KV-2"]) + assert.Equal(t, uint64(1), m1[SQLPlanDigest{SQLDigest: "SQL-3"}].KvStatsItem.KvExecCount["KV-1"]) + assert.Equal(t, uint64(2), m1[SQLPlanDigest{SQLDigest: "SQL-3"}].KvStatsItem.KvExecCount["KV-2"]) + m1.Merge(nil) + assert.Len(t, m1, 3) +} + +func TestCreateStatementStats(t *testing.T) { + stats := CreateStatementStats() + assert.NotNil(t, stats) + _, ok := globalAggregator.statsSet.Load(stats) + assert.True(t, ok) + assert.False(t, stats.Finished()) + stats.SetFinished() + assert.True(t, stats.Finished()) +} + +func TestExecCounter_AddExecCount_Take(t *testing.T) { + stats := CreateStatementStats() + m := stats.Take() + assert.Len(t, m, 0) + for n := 0; n < 1; n++ { + stats.OnExecutionBegin([]byte("SQL-1"), []byte("")) + } + for n := 0; n < 2; n++ { + stats.OnExecutionBegin([]byte("SQL-2"), []byte("")) + } + for n := 0; n < 3; n++ { + stats.OnExecutionBegin([]byte("SQL-3"), []byte("")) + } + m = stats.Take() + assert.Len(t, m, 3) + assert.Equal(t, uint64(1), m[SQLPlanDigest{SQLDigest: "SQL-1"}].ExecCount) + assert.Equal(t, uint64(2), m[SQLPlanDigest{SQLDigest: "SQL-2"}].ExecCount) + assert.Equal(t, uint64(3), m[SQLPlanDigest{SQLDigest: "SQL-3"}].ExecCount) + m = stats.Take() + assert.Len(t, m, 0) +} diff --git a/util/topsql/stmtstats/stmtstatstest/main_test.go b/util/topsql/stmtstats/stmtstatstest/main_test.go new file mode 100644 index 0000000000000..ecf1220642ecf --- /dev/null +++ b/util/topsql/stmtstats/stmtstatstest/main_test.go @@ -0,0 +1,31 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stmtstatstest + +import ( + "testing" + + "github.com/pingcap/tidb/util/testbridge" + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + testbridge.WorkaroundGoCheckFlags() + opts := []goleak.Option{ + goleak.IgnoreTopFunction("go.etcd.io/etcd/pkg/logutil.(*MergeLogger).outputLoop"), + goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), + } + goleak.VerifyTestMain(m, opts...) +} diff --git a/util/topsql/stmtstats/stmtstatstest/stmtstats_test.go b/util/topsql/stmtstats/stmtstatstest/stmtstats_test.go new file mode 100644 index 0000000000000..d37de52178e0e --- /dev/null +++ b/util/topsql/stmtstats/stmtstatstest/stmtstats_test.go @@ -0,0 +1,150 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stmtstatstest + +import ( + "fmt" + "sync" + "testing" + "time" + + "github.com/pingcap/tidb/config" + "github.com/pingcap/tidb/parser" + "github.com/pingcap/tidb/session" + "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/store/mockstore" + "github.com/pingcap/tidb/testkit" + "github.com/pingcap/tidb/util/topsql/stmtstats" + "github.com/stretchr/testify/assert" + "github.com/tikv/client-go/v2/testutils" +) + +func TestExecCount(t *testing.T) { + // Prepare stmt stats. + stmtstats.SetupAggregator() + defer stmtstats.CloseAggregator() + + // Register stmt stats collector. + var mu sync.Mutex + total := stmtstats.StatementStatsMap{} + stmtstats.RegisterCollector(newMockCollector(func(rs []stmtstats.StatementStatsRecord) { + mu.Lock() + defer mu.Unlock() + for _, r := range rs { + total.Merge(r.Data) + } + })) + + // Create mock store. + store, err := mockstore.NewMockStore(mockstore.WithClusterInspector(func(c testutils.Cluster) { + mockstore.BootstrapWithSingleStore(c) + })) + assert.NoError(t, err) + defer func() { + assert.NoError(t, store.Close()) + }() + + // Prepare mock store. + session.SetSchemaLease(0) + session.DisableStats4Test() + d, err := session.BootstrapSession(store) + assert.NoError(t, err) + defer d.Close() + d.SetStatsUpdating(true) + config.UpdateGlobal(func(conf *config.Config) { + conf.OOMAction = config.OOMActionLog + }) + + // Create table for testing. + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t;") + tk.MustExec("create table t(a int);") + + // Enable TopSQL + variable.TopSQLVariable.Enable.Store(true) + config.UpdateGlobal(func(conf *config.Config) { + conf.TopSQL.ReceiverAddress = "mock-agent" + }) + + // Execute CRUD. + const ExecCountPerSQL = 100 + _, insertSQLDigest := parser.NormalizeDigest("insert into t values (0);") + for n := 0; n < ExecCountPerSQL; n++ { + tk.MustExec(fmt.Sprintf("insert into t values (%d);", n)) + } + _, updateSQLDigest := parser.NormalizeDigest("update t set a = 0 where a = 0;") + for n := 0; n < ExecCountPerSQL; n++ { + tk.MustExec(fmt.Sprintf("update t set a = %d where a = %d;", n, n)) + } + _, selectSQLDigest := parser.NormalizeDigest("select a from t where a = 0;") + for n := 0; n < ExecCountPerSQL; n++ { + tk.MustQuery(fmt.Sprintf("select a from t where a = %d;", n)) + } + _, deleteSQLDigest := parser.NormalizeDigest("delete from t where a = 0;") + for n := 1; n <= ExecCountPerSQL; n++ { + tk.MustExec(fmt.Sprintf("delete from t where a = %d;", n)) + } + + // Wait for collect. + time.Sleep(2 * time.Second) + + // Assertion. + func() { + mu.Lock() + defer mu.Unlock() + + assert.NotEmpty(t, total) + sqlDigests := map[stmtstats.BinaryDigest]struct{}{ + stmtstats.BinaryDigest(insertSQLDigest.Bytes()): {}, + stmtstats.BinaryDigest(updateSQLDigest.Bytes()): {}, + stmtstats.BinaryDigest(selectSQLDigest.Bytes()): {}, + stmtstats.BinaryDigest(deleteSQLDigest.Bytes()): {}, + } + found := 0 + for digest, item := range total { + if _, ok := sqlDigests[digest.SQLDigest]; ok { + found++ + assert.Equal(t, uint64(ExecCountPerSQL), item.ExecCount) + var kvSum uint64 + for _, kvCount := range item.KvStatsItem.KvExecCount { + kvSum += kvCount + } + assert.Equal(t, uint64(ExecCountPerSQL), kvSum) + } + } + assert.Equal(t, 4, found) // insert, update, select, delete + }() + + // Drop table. + tk.MustExec("use test") + r := tk.MustQuery("show tables") + for _, tb := range r.Rows() { + tableName := tb[0] + tk.MustExec(fmt.Sprintf("drop table %v", tableName)) + } +} + +type mockCollector struct { + f func(records []stmtstats.StatementStatsRecord) +} + +func newMockCollector(f func(records []stmtstats.StatementStatsRecord)) stmtstats.Collector { + return &mockCollector{f: f} +} + +func (c *mockCollector) CollectStmtStatsRecords(records []stmtstats.StatementStatsRecord) { + c.f(records) +} diff --git a/util/topsql/topsql.go b/util/topsql/topsql.go index 8f2aac1566642..12a9da430e7c0 100644 --- a/util/topsql/topsql.go +++ b/util/topsql/topsql.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/plancodec" "github.com/pingcap/tidb/util/topsql/reporter" + "github.com/pingcap/tidb/util/topsql/stmtstats" "github.com/pingcap/tidb/util/topsql/tracecpu" "go.uber.org/zap" ) @@ -50,6 +51,7 @@ func SetupTopSQL() { tracecpu.GlobalSQLCPUProfiler.SetCollector(remoteReporter) tracecpu.GlobalSQLCPUProfiler.Run() + stmtstats.SetupAggregator() } // Close uses to close and release the top sql resource. @@ -60,6 +62,7 @@ func Close() { if globalTopSQLReport != nil { globalTopSQLReport.Close() } + stmtstats.CloseAggregator() } // AttachSQLInfo attach the sql information info top sql.