Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: add runtime information for point-get executor (#18666) #18817

Merged
merged 11 commits into from
Jul 31, 2020
4 changes: 0 additions & 4 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor/aggfuncs"
Expand Down Expand Up @@ -98,9 +97,6 @@ type MockPhysicalPlan interface {
}

func (b *executorBuilder) build(p plannercore.Plan) Executor {
if config.GetGlobalConfig().EnableCollectExecutionInfo && b.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl == nil {
b.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl = execdetails.NewRuntimeStatsColl()
}
switch v := p.(type) {
case nil:
return nil
Expand Down
12 changes: 11 additions & 1 deletion executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1543,10 +1543,16 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
DiskTracker: disk.NewTracker(stringutil.MemoizeStr(s.Text), -1),
TaskID: stmtctx.AllocateTaskID(),
}
<<<<<<< HEAD
if config.GetGlobalConfig().OOMUseTmpStorage && GlobalDiskUsageTracker != nil {
=======
sc.MemTracker.AttachToGlobalTracker(GlobalMemoryUsageTracker)
globalConfig := config.GetGlobalConfig()
if globalConfig.OOMUseTmpStorage && GlobalDiskUsageTracker != nil {
>>>>>>> 8b19d67... executor: add runtime information for point-get executor (#18666)
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
sc.DiskTracker.AttachToGlobalTracker(GlobalDiskUsageTracker)
}
switch config.GetGlobalConfig().OOMAction {
switch globalConfig.OOMAction {
case config.OOMActionCancel:
action := &memory.PanicOnExceed{ConnID: ctx.GetSessionVars().ConnectionID}
action.SetLogHook(domain.GetDomain(ctx).ExpensiveQueryHandle().LogOnQueryExceedMemQuota)
Expand Down Expand Up @@ -1670,6 +1676,10 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
} else if vars.StmtCtx.InSelectStmt {
sc.PrevAffectedRows = -1
}
if globalConfig.EnableCollectExecutionInfo {
sc.RuntimeStatsColl = execdetails.NewRuntimeStatsColl()
}

sc.TblInfo2UnionScan = make(map[*model.TableInfo]bool)
errCount, warnCount := vars.StmtCtx.NumErrorWarnings()
vars.SysErrorCount = errCount
Expand Down
61 changes: 61 additions & 0 deletions executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,18 @@ import (
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/kv"
plannercore "github.com/pingcap/tidb/planner/core"
<<<<<<< HEAD
=======
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/store/tikv"
>>>>>>> 8b19d67... executor: add runtime information for point-get executor (#18666)
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/rowcodec"
)

Expand Down Expand Up @@ -80,6 +86,8 @@ type PointGetExecutor struct {

// virtualColumnRetFieldTypes records the RetFieldTypes of virtual columns.
virtualColumnRetFieldTypes []*types.FieldType

stats *pointGetRuntimeStats
}

// Init set fields needed for PointGetExecutor reuse, this does NOT change baseExecutor field
Expand Down Expand Up @@ -146,10 +154,41 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
return err
}
}
if e.runtimeStats != nil {
snapshotStats := &tikv.SnapshotRuntimeStats{}
e.stats = &pointGetRuntimeStats{
BasicRuntimeStats: e.runtimeStats,
SnapshotRuntimeStats: snapshotStats,
}
e.snapshot.SetOption(kv.CollectRuntimeStats, snapshotStats)
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id.String(), e.stats)
}
if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() {
e.snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
e.snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID)
<<<<<<< HEAD
=======
return nil
}

// Close implements the Executor interface. When runtime stats are being
// collected for this executor, it removes the runtime-stats hook that was
// installed on the snapshot so collection stops with the statement.
func (e *PointGetExecutor) Close() error {
	if e.runtimeStats == nil {
		// Nothing was registered on the snapshot; nothing to undo.
		return nil
	}
	e.snapshot.DelOption(kv.CollectRuntimeStats)
	return nil
}

// Next implements the Executor interface.
func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
req.Reset()
if e.done {
return nil
}
e.done = true

>>>>>>> 8b19d67... executor: add runtime information for point-get executor (#18666)
var tblID int64
if e.partInfo != nil {
tblID = e.partInfo.ID
Expand Down Expand Up @@ -390,3 +429,25 @@ func getColInfoByID(tbl *model.TableInfo, colID int64) *model.ColumnInfo {
}
return nil
}

// pointGetRuntimeStats bundles the executor-level basic runtime stats with
// the snapshot-level (RPC) runtime stats so both show up in one
// `explain analyze` cell for a point get.
type pointGetRuntimeStats struct {
	*execdetails.BasicRuntimeStats
	*tikv.SnapshotRuntimeStats
}

// String implements fmt.Stringer. It renders the basic stats and the
// snapshot stats joined by ", ", omitting whichever part is empty or unset.
func (e *pointGetRuntimeStats) String() string {
	var basic, rpc string
	if b := e.BasicRuntimeStats; b != nil {
		basic = b.String()
	}
	if s := e.SnapshotRuntimeStats; s != nil {
		rpc = s.String()
	}
	switch {
	case rpc == "":
		return basic
	case basic == "":
		return rpc
	default:
		return basic + ", " + rpc
	}
}
7 changes: 7 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ const (
ReplicaRead
// Set task ID
TaskID
<<<<<<< HEAD
=======
// InfoSchema is schema version used by txn startTS.
InfoSchema
// CollectRuntimeStats is used to enable collect runtime stats.
CollectRuntimeStats
>>>>>>> 8b19d67... executor: add runtime information for point-get executor (#18666)
)

// Priority value for transaction priority.
Expand Down
89 changes: 89 additions & 0 deletions planner/core/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
package core_test

import (
"bytes"
"fmt"
"strings"

. "github.com/pingcap/check"
"github.com/pingcap/errors"
Expand Down Expand Up @@ -1177,3 +1179,90 @@ func (s *testIntegrationSuite) TestIssue16935(c *C) {

tk.MustQuery("SELECT * FROM t0 LEFT JOIN v0 ON TRUE WHERE v0.c0 IS NULL;")
}
<<<<<<< HEAD
=======

// TestAccessPathOnClusterIndex runs the recorded test cases against a table
// with a clustered index, checking both the explain output (chosen access
// path) and the sorted query results.
func (s *testIntegrationSuite) TestAccessPathOnClusterIndex(c *C) {
	tk := testkit.NewTestKit(c, s.store)
	tk.MustExec("use test")
	tk.MustExec("set @@tidb_enable_clustered_index = 1")
	tk.MustExec("drop table if exists t1")
	tk.MustExec("create table t1 (a int, b varchar(20), c decimal(40,10), d int, primary key(a,b), key(c))")
	tk.MustExec(`insert into t1 values (1,"111",1.1,11), (2,"222",2.2,12), (3,"333",3.3,13)`)
	tk.MustExec("analyze table t1")

	var (
		input  []string
		output []struct {
			SQL  string
			Plan []string
			Res  []string
		}
	)
	s.testData.GetTestCases(c, &input, &output)
	for idx, sql := range input {
		// In record mode, refresh the expected plan and result for this case.
		s.testData.OnRecord(func() {
			output[idx].SQL = sql
			output[idx].Plan = s.testData.ConvertRowsToStrings(tk.MustQuery("explain " + sql).Rows())
			output[idx].Res = s.testData.ConvertRowsToStrings(tk.MustQuery(sql).Sort().Rows())
		})
		tk.MustQuery("explain " + sql).Check(testkit.Rows(output[idx].Plan...))
		tk.MustQuery(sql).Sort().Check(testkit.Rows(output[idx].Res...))
	}
}

// TestClusterIndexUniqueDoubleRead verifies that reading a clustered-index
// table through a unique secondary index (forcing an index lookup / double
// read) returns the complete rows.
func (s *testIntegrationSuite) TestClusterIndexUniqueDoubleRead(c *C) {
	tk := testkit.NewTestKit(c, s.store)
	// Run in a dedicated database for isolation; drop it when the test ends.
	tk.MustExec("create database cluster_idx_unique_double_read;")
	tk.MustExec("use cluster_idx_unique_double_read;")
	defer tk.MustExec("drop database cluster_idx_unique_double_read;")
	tk.MustExec("set @@tidb_enable_clustered_index = 1")
	tk.MustExec("drop table if exists t")

	// Composite clustered primary key (a, b) plus a unique key on uk, so a
	// scan via uuk must go back to the clustered index for the full row.
	tk.MustExec("create table t (a varchar(64), b varchar(64), uk int, v int, primary key(a, b), unique key uuk(uk));")
	tk.MustExec("insert t values ('a', 'a1', 1, 11), ('b', 'b1', 2, 22), ('c', 'c1', 3, 33);")
	tk.MustQuery("select * from t use index (uuk);").Check(testkit.Rows("a a1 1 11", "b b1 2 22", "c c1 3 33"))
}

// TestIndexJoinOnClusteredIndex runs the recorded test cases for index joins
// on a clustered-index table, checking both the explain output and the query
// results.
func (s *testIntegrationSuite) TestIndexJoinOnClusteredIndex(c *C) {
	tk := testkit.NewTestKit(c, s.store)
	tk.MustExec("use test")
	tk.MustExec("set @@tidb_enable_clustered_index = 1")
	// Fix: drop the table this test actually creates (`t`), not `t1`;
	// otherwise a leftover `t` from another test makes the CREATE fail.
	tk.MustExec("drop table if exists t")
	tk.MustExec("create table t (a int, b varchar(20), c decimal(40,10), d int, primary key(a,b), key(c))")
	tk.MustExec(`insert into t values (1,"111",1.1,11), (2,"222",2.2,12), (3,"333",3.3,13)`)
	tk.MustExec("analyze table t")

	var input []string
	var output []struct {
		SQL  string
		Plan []string
		Res  []string
	}
	s.testData.GetTestCases(c, &input, &output)
	for i, tt := range input {
		// In record mode, refresh the expected plan and result for this case.
		s.testData.OnRecord(func() {
			output[i].SQL = tt
			output[i].Plan = s.testData.ConvertRowsToStrings(tk.MustQuery("explain " + tt).Rows())
			output[i].Res = s.testData.ConvertRowsToStrings(tk.MustQuery(tt).Rows())
		})
		tk.MustQuery("explain " + tt).Check(testkit.Rows(output[i].Plan...))
		tk.MustQuery(tt).Check(testkit.Rows(output[i].Res...))
	}
}

// TestExplainAnalyzePointGet checks that `explain analyze` of a point-get
// query surfaces the snapshot runtime statistics: the RPC count
// ("Get:{num_rpc:") and the accumulated RPC time ("total_time:").
func (s *testIntegrationSerialSuite) TestExplainAnalyzePointGet(c *C) {
	tk := testkit.NewTestKit(c, s.store)
	tk.MustExec("use test")
	tk.MustExec("drop table if exists t")
	tk.MustExec("create table t(a int primary key, b varchar(20))")
	tk.MustExec("insert into t values (1,1)")

	res := tk.MustQuery("explain analyze select * from t where a=1;")
	// Flatten the explain rows into one string so the assertions can search it.
	var buf bytes.Buffer
	for _, row := range res.Rows() {
		fmt.Fprintf(&buf, "%s\n", row)
	}
	explain := buf.String()
	c.Assert(strings.Contains(explain, "Get:{num_rpc:"), IsTrue, Commentf("%s", explain))
	c.Assert(strings.Contains(explain, "total_time:"), IsTrue, Commentf("%s", explain))
}
>>>>>>> 8b19d67... executor: add runtime information for point-get executor (#18666)
7 changes: 7 additions & 0 deletions store/tikv/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -821,13 +821,19 @@ type clientHelper struct {
*minCommitTSPushed
Client
resolveLite bool
stats map[tikvrpc.CmdType]*RegionRequestRuntimeStats
}

// ResolveLocks wraps the ResolveLocks function and store the resolved result.
func (ch *clientHelper) ResolveLocks(bo *Backoffer, callerStartTS uint64, locks []*Lock) (int64, error) {
var err error
var resolvedLocks []uint64
var msBeforeTxnExpired int64
if ch.stats != nil {
defer func(start time.Time) {
recordRegionRequestRuntimeStats(ch.stats, tikvrpc.CmdResolveLock, time.Since(start))
}(time.Now())
}
if ch.resolveLite {
msBeforeTxnExpired, resolvedLocks, err = ch.LockResolver.resolveLocksLite(bo, callerStartTS, locks)
} else {
Expand All @@ -849,6 +855,7 @@ func (ch *clientHelper) SendReqCtx(bo *Backoffer, req *tikvrpc.Request, regionID
if len(directStoreAddr) > 0 {
sender.storeAddr = directStoreAddr
}
sender.stats = ch.stats
req.Context.ResolvedLocks = ch.minCommitTSPushed.Get()
resp, ctx, err := sender.SendReqCtx(bo, req, regionID, timeout, sType)
return resp, ctx, sender.storeAddr, err
Expand Down
47 changes: 47 additions & 0 deletions store/tikv/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ type RegionRequestSender struct {
storeAddr string
rpcError error
failStoreIDs map[uint64]struct{}
stats map[tikvrpc.CmdType]*RegionRequestRuntimeStats
}

// RegionRequestRuntimeStats records the runtime stats of send region requests.
type RegionRequestRuntimeStats struct {
	// count is how many requests of this command type have been sent.
	count int64
	// Send region request consume time. Accumulated as int64 nanoseconds
	// (cast from time.Duration by recordRegionRequestRuntimeStats).
	consume int64
}

// RegionBatchRequestSender sends BatchCop requests to TiFlash server by stream way.
Expand All @@ -78,7 +86,16 @@ func (ss *RegionBatchRequestSender) sendReqToAddr(bo *Backoffer, ctxs []copTaskA
if e := tikvrpc.SetContext(req, ctx.Meta, ctx.Peer); e != nil {
return nil, false, errors.Trace(e)
}
<<<<<<< HEAD
resp, err = ss.client.SendRequest(bo.ctx, ctx.Addr, req, timout)
=======
if ss.stats != nil {
defer func(start time.Time) {
recordRegionRequestRuntimeStats(ss.stats, req.Type, time.Since(start))
}(time.Now())
}
resp, err = ss.client.SendRequest(ctx, rpcCtx.Addr, req, timout)
>>>>>>> 8b19d67... executor: add runtime information for point-get executor (#18666)
if err != nil {
ss.rpcError = err
for _, failedCtx := range ctxs {
Expand All @@ -93,6 +110,19 @@ func (ss *RegionBatchRequestSender) sendReqToAddr(bo *Backoffer, ctxs []copTaskA
return
}

// recordRegionRequestRuntimeStats accumulates one region-request sample into
// stats, keyed by the command type: it bumps the request count and adds the
// elapsed time d (stored as nanoseconds). The first sample for a command
// creates its entry.
func recordRegionRequestRuntimeStats(stats map[tikvrpc.CmdType]*RegionRequestRuntimeStats, cmd tikvrpc.CmdType, d time.Duration) {
	if entry, found := stats[cmd]; found {
		entry.count++
		entry.consume += int64(d)
		return
	}
	stats[cmd] = &RegionRequestRuntimeStats{
		count:   1,
		consume: int64(d),
	}
}

func (ss *RegionBatchRequestSender) onSendFail(bo *Backoffer, ctx *RPCContext, err error) error {
// If it failed because the context is cancelled by ourself, don't retry.
if errors.Cause(err) == context.Canceled || status.Code(errors.Cause(err)) == codes.Canceled {
Expand Down Expand Up @@ -263,8 +293,25 @@ func (s *RegionRequestSender) sendReqToRegion(bo *Backoffer, ctx *RPCContext, re
}
defer s.releaseStoreToken(ctx.Store)
}
<<<<<<< HEAD
resp, err = s.client.SendRequest(bo.ctx, ctx.Addr, req, timeout)

=======

if s.stats != nil {
defer func(start time.Time) {
recordRegionRequestRuntimeStats(s.stats, req.Type, time.Since(start))
}(time.Now())
}

ctx := bo.ctx
if rawHook := ctx.Value(RPCCancellerCtxKey{}); rawHook != nil {
var cancel context.CancelFunc
ctx, cancel = rawHook.(*RPCCanceller).WithCancel(ctx)
defer cancel()
}
resp, err = s.client.SendRequest(ctx, rpcCtx.Addr, req, timeout)
>>>>>>> 8b19d67... executor: add runtime information for point-get executor (#18666)
if err != nil {
s.rpcError = err
if e := s.onSendFail(bo, ctx, err); e != nil {
Expand Down
Loading