Skip to content

Commit

Permalink
executor,distsql,util: distinguish walltime from sum of walltime in e…
Browse files Browse the repository at this point in the history
…xecution info (#57507)

close #56746
  • Loading branch information
yibin authored Nov 28, 2024
1 parent fb7e32f commit 37a1f42
Show file tree
Hide file tree
Showing 12 changed files with 112 additions and 41 deletions.
2 changes: 1 addition & 1 deletion pkg/distsql/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func TestSelectWithRuntimeStats(t *testing.T) {

func TestSelectResultRuntimeStats(t *testing.T) {
stmtStats := execdetails.NewRuntimeStatsColl(nil)
basic := stmtStats.GetBasicRuntimeStats(1)
basic := stmtStats.GetBasicRuntimeStats(1, true)
basic.Record(time.Second, 20)
s1 := &selectResultRuntimeStats{
backoffSleep: map[string]time.Duration{"RegionMiss": time.Millisecond},
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func (a *ExecStmt) PointGet(ctx context.Context) (*recordSet, error) {
} else {
// CachedPlan type is already checked in last step
pointGetPlan := a.Plan.(*plannercore.PointGetPlan)
exec.Init(pointGetPlan)
exec.Recreated(pointGetPlan)
a.PsStmt.PointGet.Executor = exec
executor = exec
}
Expand Down
36 changes: 20 additions & 16 deletions pkg/executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2471,19 +2471,23 @@ func (b *executorBuilder) buildApply(v *plannercore.PhysicalApply) exec.Executor
}
tupleJoiner := join.NewJoiner(b.ctx, v.JoinType, v.InnerChildIdx == 0,
defaultValues, otherConditions, exec.RetTypes(leftChild), exec.RetTypes(rightChild), nil, false)
serialExec := &join.NestedLoopApplyExec{
BaseExecutor: exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID(), outerExec, innerExec),
InnerExec: innerExec,
OuterExec: outerExec,
OuterFilter: outerFilter,
InnerFilter: innerFilter,
Outer: v.JoinType != logicalop.InnerJoin,
Joiner: tupleJoiner,
OuterSchema: v.OuterSchema,
Sctx: b.ctx,
CanUseCache: v.CanUseCache,
}
executor_metrics.ExecutorCounterNestedLoopApplyExec.Inc()

constructSerialExec := func() exec.Executor {
serialExec := &join.NestedLoopApplyExec{
BaseExecutor: exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID(), outerExec, innerExec),
InnerExec: innerExec,
OuterExec: outerExec,
OuterFilter: outerFilter,
InnerFilter: innerFilter,
Outer: v.JoinType != logicalop.InnerJoin,
Joiner: tupleJoiner,
OuterSchema: v.OuterSchema,
Sctx: b.ctx,
CanUseCache: v.CanUseCache,
}
executor_metrics.ExecutorCounterNestedLoopApplyExec.Inc()
return serialExec
}

// try parallel mode
if v.Concurrency > 1 {
Expand All @@ -2495,13 +2499,13 @@ func (b *executorBuilder) buildApply(v *plannercore.PhysicalApply) exec.Executor
clonedInnerPlan, err := plannercore.SafeClone(v.SCtx(), innerPlan)
if err != nil {
b.err = nil
return serialExec
return constructSerialExec()
}
corCol := coreusage.ExtractCorColumnsBySchema4PhysicalPlan(clonedInnerPlan, outerPlan.Schema())
clonedInnerExec := b.build(clonedInnerPlan)
if b.err != nil {
b.err = nil
return serialExec
return constructSerialExec()
}
innerExecs = append(innerExecs, clonedInnerExec)
corCols = append(corCols, corCol)
Expand All @@ -2525,7 +2529,7 @@ func (b *executorBuilder) buildApply(v *plannercore.PhysicalApply) exec.Executor
useCache: v.CanUseCache,
}
}
return serialExec
return constructSerialExec()
}

func (b *executorBuilder) buildMaxOneRow(v *plannercore.PhysicalMaxOneRow) exec.Executor {
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -1073,7 +1073,7 @@ func (w *indexWorker) fetchHandles(ctx context.Context, results []distsql.Select
idxID := w.idxLookup.getIndexPlanRootID()
if w.idxLookup.stmtRuntimeStatsColl != nil {
if idxID != w.idxLookup.ID() && w.idxLookup.stats != nil {
w.idxLookup.stats.indexScanBasicStats = w.idxLookup.stmtRuntimeStatsColl.GetBasicRuntimeStats(idxID)
w.idxLookup.stats.indexScanBasicStats = w.idxLookup.stmtRuntimeStatsColl.GetBasicRuntimeStats(idxID, true)
}
}
for i := 0; i < len(results); {
Expand Down
45 changes: 44 additions & 1 deletion pkg/executor/explain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,8 @@ func TestIssue35911(t *testing.T) {
timeStr1 := extractTime.FindStringSubmatch(rows[4][5].(string))[1]
time1, err := time.ParseDuration(timeStr1)
require.NoError(t, err)
timeStr2 := extractTime.FindStringSubmatch(rows[5][5].(string))[1]
extractTime2, _ := regexp.Compile("^total_time:(.*?),")
timeStr2 := extractTime2.FindStringSubmatch(rows[5][5].(string))[1]
time2, err := time.ParseDuration(timeStr2)
require.NoError(t, err)
// The duration of IndexLookUp should be longer than its build side child
Expand All @@ -341,6 +342,48 @@ func TestIssue35911(t *testing.T) {
require.EqualValues(t, 5, concurrency)
}

func TestTotalTimeCases(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1 (c1 bigint, c2 int, c3 int, c4 int, primary key(c1, c2), index (c3));")
lineNum := 1000
for i := 0; i < lineNum; i++ {
tk.MustExec(fmt.Sprintf("insert into t1 values(%d, %d, %d, %d);", i, i+1, i+2, i+3))
}
tk.MustExec("analyze table t1")
tk.MustExec("set @@tidb_executor_concurrency = 5;")

tk.MustExec("set @@tidb_enable_parallel_apply = 0;")
rows := tk.MustQuery("explain analyze select (select /*+ NO_DECORRELATE() */ sum(c4) from t1 where t1.c3 = alias.c3) from t1 alias where alias.c1 = 1;").Rows()
require.True(t, len(rows) == 11)

// Line3 is tikv_task, others should be all walltime
for i := 0; i < 11; i++ {
if i != 3 {
require.True(t, strings.HasPrefix(rows[i][5].(string), "time:"))
}
}

// use parallel_apply
tk.MustExec("set @@tidb_enable_parallel_apply = 1;")
rows = tk.MustQuery("explain analyze select (select /*+ NO_DECORRELATE() */ sum(c4) from t1 where t1.c3 = alias.c3) from t1 alias where alias.c1 = 1;").Rows()
require.True(t, len(rows) == 11)
// Line0-2 is walltime, Line3 is tikv_task, Line9 Line10 are special, they are total time in integration environment, while
// walltime in uts due to only one IndexLookUp executor is actually open, others should be all total_time.
for i := 0; i < 11; i++ {
if i == 9 || i == 10 {
continue
}
if i < 3 {
require.True(t, strings.HasPrefix(rows[i][5].(string), "time:"))
} else if i > 3 {
require.True(t, strings.HasPrefix(rows[i][5].(string), "total_time:"))
}
}
}

func flatJSONPlan(j *plannercore.ExplainInfoForEncode) (res []*plannercore.ExplainInfoForEncode) {
if j == nil {
return
Expand Down
5 changes: 4 additions & 1 deletion pkg/executor/index_merge_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,9 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context,
byItems: is.ByItems,
pushedLimit: pushedIndexLimit,
}
if worker.stats != nil && worker.idxID != 0 {
worker.sc.GetSessionVars().StmtCtx.RuntimeStatsColl.GetBasicRuntimeStats(worker.idxID, true)
}
if e.isCorColInPartialFilters[workID] {
// We got correlated column, so need to refresh Selection operator.
var err error
Expand Down Expand Up @@ -1800,7 +1803,7 @@ func (w *partialIndexWorker) extractTaskHandles(ctx context.Context, chk *chunk.
return nil, nil, err
}
if w.stats != nil && w.idxID != 0 {
w.sc.GetSessionVars().StmtCtx.RuntimeStatsColl.GetBasicRuntimeStats(w.idxID).Record(time.Since(start), chk.NumRows())
w.sc.GetSessionVars().StmtCtx.RuntimeStatsColl.GetBasicRuntimeStats(w.idxID, false).Record(time.Since(start), chk.NumRows())
}
if chk.NumRows() == 0 {
failpoint.Inject("testIndexMergeErrorPartialIndexWorker", func(v failpoint.Value) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/executor/internal/exec/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func newExecutorStats(stmtCtx *stmtctx.StatementContext, id int) executorStats {

if stmtCtx.RuntimeStatsColl != nil {
if id > 0 {
e.runtimeStats = stmtCtx.RuntimeStatsColl.GetBasicRuntimeStats(id)
e.runtimeStats = stmtCtx.RuntimeStatsColl.GetBasicRuntimeStats(id, true)
}
}

Expand Down Expand Up @@ -338,7 +338,7 @@ func (e *BaseExecutorV2) BuildNewBaseExecutorV2(stmtRuntimeStatsColl *execdetail
newExecutorStats := e.executorStats
if stmtRuntimeStatsColl != nil {
if id > 0 {
newExecutorStats.runtimeStats = stmtRuntimeStatsColl.GetBasicRuntimeStats(id)
newExecutorStats.runtimeStats = stmtRuntimeStatsColl.GetBasicRuntimeStats(id, true)
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/internal/exec/indexusage.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (e *IndexUsageReporter) ReportPointGetIndexUsage(tableID int64, physicalTab
return
}

basic := e.runtimeStatsColl.GetBasicRuntimeStats(planID)
basic := e.runtimeStatsColl.GetBasicRuntimeStats(planID, false)
if basic == nil {
return
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/executor/internal/exec/indexusage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestIndexUsageReporter(t *testing.T) {

// For PointGet and BatchPointGet
planID := 3
runtimeStatsColl.GetBasicRuntimeStats(planID).Record(time.Second, 2024)
runtimeStatsColl.GetBasicRuntimeStats(planID, true).Record(time.Second, 2024)
reporter.ReportPointGetIndexUsage(tableID, tableID, indexID, planID, 1)

require.Eventually(t, func() bool {
Expand Down Expand Up @@ -88,7 +88,7 @@ func TestIndexUsageReporter(t *testing.T) {
RealtimeCount: 100,
})
planID = 4
runtimeStatsColl.GetBasicRuntimeStats(planID).Record(time.Second, 2024)
runtimeStatsColl.GetBasicRuntimeStats(planID, true).Record(time.Second, 2024)
reporter.ReportPointGetIndexUsage(tableID, tableID, indexID, planID, 1)

require.Eventually(t, func() bool {
Expand Down
14 changes: 9 additions & 5 deletions pkg/executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,15 @@ func matchPartitionNames(pid int64, partitionNames []pmodel.CIStr, pi *model.Par
return false
}

// Recreated based on Init, change baseExecutor fields also
func (e *PointGetExecutor) Recreated(p *plannercore.PointGetPlan) {
e.Init(p)
// It's necessary to at least reset the `runtimeStats` of the `BaseExecutor`.
// As the `StmtCtx` may have changed, a new index usage reporter should also be created.
e.BaseExecutor = exec.NewBaseExecutor(e.Ctx(), p.Schema(), p.ID())
e.indexUsageReporter = buildIndexUsageReporter(e.Ctx(), p)
}

// Init set fields needed for PointGetExecutor reuse, this does NOT change baseExecutor field
func (e *PointGetExecutor) Init(p *plannercore.PointGetPlan) {
decoder := NewRowDecoder(e.Ctx(), p.Schema(), p.TblInfo)
Expand All @@ -218,11 +227,6 @@ func (e *PointGetExecutor) Init(p *plannercore.PointGetPlan) {
e.partitionDefIdx = p.PartitionIdx
e.columns = p.Columns
e.buildVirtualColumnInfo()

// It's necessary to at least reset the `runtimeStats` of the `BaseExecutor`.
// As the `StmtCtx` may have changed, a new index usage reporter should also be created.
e.BaseExecutor = exec.NewBaseExecutor(e.Ctx(), p.Schema(), p.ID())
e.indexUsageReporter = buildIndexUsageReporter(e.Ctx(), p)
}

// buildVirtualColumnInfo saves virtual column indices and sort them in definition order
Expand Down
31 changes: 24 additions & 7 deletions pkg/util/execdetails/execdetails.go
Original file line number Diff line number Diff line change
Expand Up @@ -1303,6 +1303,8 @@ func (waitSummary *TiFlashWaitSummary) CanBeIgnored() bool {

// BasicRuntimeStats is the basic runtime stats.
type BasicRuntimeStats struct {
// the count of executors with the same id
executorCount atomic.Int32
// executor's Next() called times.
loop atomic.Int32
// executor consume time, including open, next, and close time.
Expand All @@ -1323,6 +1325,7 @@ func (e *BasicRuntimeStats) GetActRows() int64 {
// Clone implements the RuntimeStats interface.
func (e *BasicRuntimeStats) Clone() RuntimeStats {
result := &BasicRuntimeStats{}
result.executorCount.Store(e.executorCount.Load())
result.loop.Store(e.loop.Load())
result.consume.Store(e.consume.Load())
result.open.Store(e.open.Load())
Expand Down Expand Up @@ -1419,17 +1422,21 @@ func (e *BasicRuntimeStats) String() string {
return ""
}
var str strings.Builder
timePrefix := ""
if e.executorCount.Load() > 1 {
timePrefix = "total_"
}
totalTime := e.consume.Load()
openTime := e.open.Load()
closeTime := e.close.Load()
str.WriteString("time:")
str.WriteString(fmt.Sprintf("%stime:", timePrefix))
str.WriteString(FormatDuration(time.Duration(totalTime)))
if openTime >= int64(time.Millisecond) {
str.WriteString(", open:")
str.WriteString(fmt.Sprintf(", %sopen:", timePrefix))
str.WriteString(FormatDuration(time.Duration(openTime)))
}
if closeTime >= int64(time.Millisecond) {
str.WriteString(", close:")
str.WriteString(fmt.Sprintf(", %sclose:", timePrefix))
str.WriteString(FormatDuration(time.Duration(closeTime)))
}
str.WriteString(", loops:")
Expand Down Expand Up @@ -1494,17 +1501,27 @@ func (e *RuntimeStatsColl) RegisterStats(planID int, info RuntimeStats) {
}
}

// GetBasicRuntimeStats gets basicRuntimeStats for a executor.
func (e *RuntimeStatsColl) GetBasicRuntimeStats(planID int) *BasicRuntimeStats {
// GetBasicRuntimeStats gets basicRuntimeStats for a executor
// When rootStat/rootStat's basicRuntimeStats is nil, the behavior is decided by initNewExecutorStats argument:
// 1. If true, it created a new one, and increase basicRuntimeStats' executorCount
// 2. Else, it returns nil
func (e *RuntimeStatsColl) GetBasicRuntimeStats(planID int, initNewExecutorStats bool) *BasicRuntimeStats {
e.mu.Lock()
defer e.mu.Unlock()
stats, ok := e.rootStats[planID]
if !ok {
if !ok && initNewExecutorStats {
stats = NewRootRuntimeStats()
e.rootStats[planID] = stats
}
if stats.basic == nil {
if stats == nil {
return nil
}

if stats.basic == nil && initNewExecutorStats {
stats.basic = &BasicRuntimeStats{}
stats.basic.executorCount.Add(1)
} else if stats.basic != nil && initNewExecutorStats {
stats.basic.executorCount.Add(1)
}
return stats.basic
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/util/execdetails/execdetails_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,8 +452,8 @@ func TestRuntimeStatsWithCommit(t *testing.T) {
func TestRootRuntimeStats(t *testing.T) {
pid := 1
stmtStats := NewRuntimeStatsColl(nil)
basic1 := stmtStats.GetBasicRuntimeStats(pid)
basic2 := stmtStats.GetBasicRuntimeStats(pid)
basic1 := stmtStats.GetBasicRuntimeStats(pid, true)
basic2 := stmtStats.GetBasicRuntimeStats(pid, true)
basic1.RecordOpen(time.Millisecond * 10)
basic1.Record(time.Second, 20)
basic2.Record(time.Second*2, 30)
Expand All @@ -474,7 +474,7 @@ func TestRootRuntimeStats(t *testing.T) {
Commit: commitDetail,
})
stats := stmtStats.GetRootStats(1)
expect := "time:3.11s, open:10ms, close:100ms, loops:2, worker:15, commit_txn: {prewrite:1s, get_commit_ts:1s, commit:1s, region_num:5, write_keys:3, write_byte:66, txn_retry:2}"
expect := "total_time:3.11s, total_open:10ms, total_close:100ms, loops:2, worker:15, commit_txn: {prewrite:1s, get_commit_ts:1s, commit:1s, region_num:5, write_keys:3, write_byte:66, txn_retry:2}"
require.Equal(t, expect, stats.String())
}

Expand Down

0 comments on commit 37a1f42

Please sign in to comment.