diff --git a/pkg/distsql/distsql_test.go b/pkg/distsql/distsql_test.go
index 5385041b47474..0f5a9dcf4b37e 100644
--- a/pkg/distsql/distsql_test.go
+++ b/pkg/distsql/distsql_test.go
@@ -105,7 +105,7 @@ func TestSelectWithRuntimeStats(t *testing.T) {
 
 func TestSelectResultRuntimeStats(t *testing.T) {
 	stmtStats := execdetails.NewRuntimeStatsColl(nil)
-	basic := stmtStats.GetBasicRuntimeStats(1)
+	basic := stmtStats.GetBasicRuntimeStats(1, true)
 	basic.Record(time.Second, 20)
 	s1 := &selectResultRuntimeStats{
 		backoffSleep: map[string]time.Duration{"RegionMiss": time.Millisecond},
diff --git a/pkg/executor/adapter.go b/pkg/executor/adapter.go
index 061d7e42199e3..4e741647a5f23 100644
--- a/pkg/executor/adapter.go
+++ b/pkg/executor/adapter.go
@@ -326,7 +326,7 @@ func (a *ExecStmt) PointGet(ctx context.Context) (*recordSet, error) {
 	} else {
 		// CachedPlan type is already checked in last step
 		pointGetPlan := a.Plan.(*plannercore.PointGetPlan)
-		exec.Init(pointGetPlan)
+		exec.Recreated(pointGetPlan)
 		a.PsStmt.PointGet.Executor = exec
 		executor = exec
 	}
diff --git a/pkg/executor/builder.go b/pkg/executor/builder.go
index 1e84b48db7d30..d52e1201fb303 100644
--- a/pkg/executor/builder.go
+++ b/pkg/executor/builder.go
@@ -2471,19 +2471,23 @@ func (b *executorBuilder) buildApply(v *plannercore.PhysicalApply) exec.Executor
 	}
 	tupleJoiner := join.NewJoiner(b.ctx, v.JoinType, v.InnerChildIdx == 0,
 		defaultValues, otherConditions, exec.RetTypes(leftChild), exec.RetTypes(rightChild), nil, false)
-	serialExec := &join.NestedLoopApplyExec{
-		BaseExecutor: exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID(), outerExec, innerExec),
-		InnerExec:    innerExec,
-		OuterExec:    outerExec,
-		OuterFilter:  outerFilter,
-		InnerFilter:  innerFilter,
-		Outer:        v.JoinType != logicalop.InnerJoin,
-		Joiner:       tupleJoiner,
-		OuterSchema:  v.OuterSchema,
-		Sctx:         b.ctx,
-		CanUseCache:  v.CanUseCache,
-	}
-	executor_metrics.ExecutorCounterNestedLoopApplyExec.Inc()
+
+	constructSerialExec := func() exec.Executor {
+		serialExec := &join.NestedLoopApplyExec{
+			BaseExecutor: exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID(), outerExec, innerExec),
+			InnerExec:    innerExec,
+			OuterExec:    outerExec,
+			OuterFilter:  outerFilter,
+			InnerFilter:  innerFilter,
+			Outer:        v.JoinType != logicalop.InnerJoin,
+			Joiner:       tupleJoiner,
+			OuterSchema:  v.OuterSchema,
+			Sctx:         b.ctx,
+			CanUseCache:  v.CanUseCache,
+		}
+		executor_metrics.ExecutorCounterNestedLoopApplyExec.Inc()
+		return serialExec
+	}
 
 	// try parallel mode
 	if v.Concurrency > 1 {
@@ -2495,13 +2499,13 @@ func (b *executorBuilder) buildApply(v *plannercore.PhysicalApply) exec.Executor
 			clonedInnerPlan, err := plannercore.SafeClone(v.SCtx(), innerPlan)
 			if err != nil {
 				b.err = nil
-				return serialExec
+				return constructSerialExec()
 			}
 			corCol := coreusage.ExtractCorColumnsBySchema4PhysicalPlan(clonedInnerPlan, outerPlan.Schema())
 			clonedInnerExec := b.build(clonedInnerPlan)
 			if b.err != nil {
 				b.err = nil
-				return serialExec
+				return constructSerialExec()
 			}
 			innerExecs = append(innerExecs, clonedInnerExec)
 			corCols = append(corCols, corCol)
@@ -2525,7 +2529,7 @@ func (b *executorBuilder) buildApply(v *plannercore.PhysicalApply) exec.Executor
 			useCache:    v.CanUseCache,
 		}
 	}
-	return serialExec
+	return constructSerialExec()
 }
 
 func (b *executorBuilder) buildMaxOneRow(v *plannercore.PhysicalMaxOneRow) exec.Executor {
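The buildApply hunks above defer building the serial NestedLoopApplyExec behind a constructSerialExec closure, so the fallback executor and its ExecutorCounterNestedLoopApplyExec increment only happen on the paths that actually return it. Below is a minimal, self-contained sketch of that lazy-constructor pattern; the types and the counter variable are made-up stand-ins, not the real TiDB executor interfaces.

package main

import "fmt"

// counter stands in for the metrics counter; it should only be bumped
// when the serial fallback is actually chosen.
var counter int

type executor interface{ name() string }

type serialExec struct{}

func (serialExec) name() string { return "serial" }

type parallelExec struct{}

func (parallelExec) name() string { return "parallel" }

// build mirrors the shape of buildApply: the serial executor is wrapped in a
// closure so that constructing (and counting) it is deferred until one of the
// fallback paths is taken.
func build(concurrency int, cloneOK bool) executor {
	constructSerial := func() executor {
		counter++ // metric is incremented only on the serial path
		return serialExec{}
	}
	if concurrency > 1 {
		if !cloneOK {
			return constructSerial() // fallback: cloning the inner plan failed
		}
		return parallelExec{}
	}
	return constructSerial()
}

func main() {
	fmt.Println(build(4, true).name(), counter)  // parallel 0
	fmt.Println(build(4, false).name(), counter) // serial 1
}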
diff --git a/pkg/executor/distsql.go b/pkg/executor/distsql.go
index ef1f426adaa5b..38d288555caf1 100644
--- a/pkg/executor/distsql.go
+++ b/pkg/executor/distsql.go
@@ -1073,7 +1073,7 @@ func (w *indexWorker) fetchHandles(ctx context.Context, results []distsql.Select
 	idxID := w.idxLookup.getIndexPlanRootID()
 	if w.idxLookup.stmtRuntimeStatsColl != nil {
 		if idxID != w.idxLookup.ID() && w.idxLookup.stats != nil {
-			w.idxLookup.stats.indexScanBasicStats = w.idxLookup.stmtRuntimeStatsColl.GetBasicRuntimeStats(idxID)
+			w.idxLookup.stats.indexScanBasicStats = w.idxLookup.stmtRuntimeStatsColl.GetBasicRuntimeStats(idxID, true)
 		}
 	}
 	for i := 0; i < len(results); {
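The explain_test.go changes that follow read the build-side row with a second regular expression anchored on the new total_time: prefix. A small standalone sketch of that extraction is shown here; the sample execution-info string is invented for illustration.

package main

import (
	"fmt"
	"regexp"
	"time"
)

func main() {
	// Sample execution-info cell, in the shape the new prefix produces.
	info := "total_time:1.5s, total_open:2ms, loops:3"

	// Same pattern the test compiles for the build-side child row.
	extract := regexp.MustCompile("^total_time:(.*?),")
	m := extract.FindStringSubmatch(info)
	if m == nil {
		fmt.Println("no match")
		return
	}
	d, err := time.ParseDuration(m[1])
	fmt.Println(d, err) // 1.5s <nil>
}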
diff --git a/pkg/executor/explain_test.go b/pkg/executor/explain_test.go
index 51c7fb5d468b4..bd94ca044a513 100644
--- a/pkg/executor/explain_test.go
+++ b/pkg/executor/explain_test.go
@@ -322,7 +322,8 @@ func TestIssue35911(t *testing.T) {
 	timeStr1 := extractTime.FindStringSubmatch(rows[4][5].(string))[1]
 	time1, err := time.ParseDuration(timeStr1)
 	require.NoError(t, err)
-	timeStr2 := extractTime.FindStringSubmatch(rows[5][5].(string))[1]
+	extractTime2, _ := regexp.Compile("^total_time:(.*?),")
+	timeStr2 := extractTime2.FindStringSubmatch(rows[5][5].(string))[1]
 	time2, err := time.ParseDuration(timeStr2)
 	require.NoError(t, err)
 	// The duration of IndexLookUp should be longer than its build side child
@@ -341,6 +342,48 @@ func TestIssue35911(t *testing.T) {
 	require.EqualValues(t, 5, concurrency)
 }
 
+func TestTotalTimeCases(t *testing.T) {
+	store := testkit.CreateMockStore(t)
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("use test")
+	tk.MustExec("drop table if exists t1")
+	tk.MustExec("create table t1 (c1 bigint, c2 int, c3 int, c4 int, primary key(c1, c2), index (c3));")
+	lineNum := 1000
+	for i := 0; i < lineNum; i++ {
+		tk.MustExec(fmt.Sprintf("insert into t1 values(%d, %d, %d, %d);", i, i+1, i+2, i+3))
+	}
+	tk.MustExec("analyze table t1")
+	tk.MustExec("set @@tidb_executor_concurrency = 5;")
+
+	tk.MustExec("set @@tidb_enable_parallel_apply = 0;")
+	rows := tk.MustQuery("explain analyze select (select /*+ NO_DECORRELATE() */ sum(c4) from t1 where t1.c3 = alias.c3) from t1 alias where alias.c1 = 1;").Rows()
+	require.True(t, len(rows) == 11)
+
+	// Line 3 is a tikv_task; all other lines should show walltime.
+	for i := 0; i < 11; i++ {
+		if i != 3 {
+			require.True(t, strings.HasPrefix(rows[i][5].(string), "time:"))
+		}
+	}
+
+	// use parallel_apply
+	tk.MustExec("set @@tidb_enable_parallel_apply = 1;")
+	rows = tk.MustQuery("explain analyze select (select /*+ NO_DECORRELATE() */ sum(c4) from t1 where t1.c3 = alias.c3) from t1 alias where alias.c1 = 1;").Rows()
+	require.True(t, len(rows) == 11)
+	// Lines 0-2 are walltime and line 3 is a tikv_task. Lines 9 and 10 are special: they show total time in an integration
+	// environment but walltime in unit tests, because only one IndexLookUp executor is actually opened. All other lines should show total_time.
+	for i := 0; i < 11; i++ {
+		if i == 9 || i == 10 {
+			continue
+		}
+		if i < 3 {
+			require.True(t, strings.HasPrefix(rows[i][5].(string), "time:"))
+		} else if i > 3 {
+			require.True(t, strings.HasPrefix(rows[i][5].(string), "total_time:"))
+		}
+	}
+}
+
 func flatJSONPlan(j *plannercore.ExplainInfoForEncode) (res []*plannercore.ExplainInfoForEncode) {
 	if j == nil {
 		return
diff --git a/pkg/executor/index_merge_reader.go b/pkg/executor/index_merge_reader.go
index 78c95001138b3..cd88db82049da 100644
--- a/pkg/executor/index_merge_reader.go
+++ b/pkg/executor/index_merge_reader.go
@@ -376,6 +376,9 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context,
 		byItems:     is.ByItems,
 		pushedLimit: pushedIndexLimit,
 	}
+	if worker.stats != nil && worker.idxID != 0 {
+		worker.sc.GetSessionVars().StmtCtx.RuntimeStatsColl.GetBasicRuntimeStats(worker.idxID, true)
+	}
 	if e.isCorColInPartialFilters[workID] {
 		// We got correlated column, so need to refresh Selection operator.
 		var err error
@@ -1800,7 +1803,7 @@ func (w *partialIndexWorker) extractTaskHandles(ctx context.Context, chk *chunk.
 		return nil, nil, err
 	}
 	if w.stats != nil && w.idxID != 0 {
-		w.sc.GetSessionVars().StmtCtx.RuntimeStatsColl.GetBasicRuntimeStats(w.idxID).Record(time.Since(start), chk.NumRows())
+		w.sc.GetSessionVars().StmtCtx.RuntimeStatsColl.GetBasicRuntimeStats(w.idxID, false).Record(time.Since(start), chk.NumRows())
 	}
 	if chk.NumRows() == 0 {
 		failpoint.Inject("testIndexMergeErrorPartialIndexWorker", func(v failpoint.Value) {
diff --git a/pkg/executor/internal/exec/executor.go b/pkg/executor/internal/exec/executor.go
index 12e80a8d92fa0..3b3673dc1edee 100644
--- a/pkg/executor/internal/exec/executor.go
+++ b/pkg/executor/internal/exec/executor.go
@@ -236,7 +236,7 @@ func newExecutorStats(stmtCtx *stmtctx.StatementContext, id int) executorStats {
 
 	if stmtCtx.RuntimeStatsColl != nil {
 		if id > 0 {
-			e.runtimeStats = stmtCtx.RuntimeStatsColl.GetBasicRuntimeStats(id)
+			e.runtimeStats = stmtCtx.RuntimeStatsColl.GetBasicRuntimeStats(id, true)
 		}
 	}
 
@@ -338,7 +338,7 @@ func (e *BaseExecutorV2) BuildNewBaseExecutorV2(stmtRuntimeStatsColl *execdetail
 	newExecutorStats := e.executorStats
 	if stmtRuntimeStatsColl != nil {
 		if id > 0 {
-			newExecutorStats.runtimeStats = stmtRuntimeStatsColl.GetBasicRuntimeStats(id)
+			newExecutorStats.runtimeStats = stmtRuntimeStatsColl.GetBasicRuntimeStats(id, true)
 		}
 	}
 
diff --git a/pkg/executor/internal/exec/indexusage.go b/pkg/executor/internal/exec/indexusage.go
index 223149042844d..23ca78a01b437 100644
--- a/pkg/executor/internal/exec/indexusage.go
+++ b/pkg/executor/internal/exec/indexusage.go
@@ -106,7 +106,7 @@ func (e *IndexUsageReporter) ReportPointGetIndexUsage(tableID int64, physicalTab
 		return
 	}
 
-	basic := e.runtimeStatsColl.GetBasicRuntimeStats(planID)
+	basic := e.runtimeStatsColl.GetBasicRuntimeStats(planID, false)
 	if basic == nil {
 		return
 	}
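The ReportPointGetIndexUsage hunk above now passes false, a read-only lookup that returns nil rather than registering a new stats entry, which is why the nil check matters. A short sketch of both lookup modes against the RuntimeStatsColl API as changed in this patch, assuming the github.com/pingcap/tidb/pkg/util/execdetails import path of the current pkg layout:

package main

import (
	"fmt"
	"time"

	"github.com/pingcap/tidb/pkg/util/execdetails"
)

func main() {
	coll := execdetails.NewRuntimeStatsColl(nil)

	// A read-only lookup (false) for an unregistered plan ID returns nil,
	// which is why ReportPointGetIndexUsage checks for nil before recording.
	if coll.GetBasicRuntimeStats(42, false) == nil {
		fmt.Println("no stats yet")
	}

	// A creating lookup (true) registers the executor and bumps executorCount.
	coll.GetBasicRuntimeStats(42, true).Record(time.Second, 10)
	fmt.Println(coll.GetBasicRuntimeStats(42, false).GetActRows()) // 10
}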
diff --git a/pkg/executor/internal/exec/indexusage_test.go b/pkg/executor/internal/exec/indexusage_test.go
index 6e59dcb70f1e9..2e787fbf0ed7e 100644
--- a/pkg/executor/internal/exec/indexusage_test.go
+++ b/pkg/executor/internal/exec/indexusage_test.go
@@ -53,7 +53,7 @@ func TestIndexUsageReporter(t *testing.T) {
 
 	// For PointGet and BatchPointGet
 	planID := 3
-	runtimeStatsColl.GetBasicRuntimeStats(planID).Record(time.Second, 2024)
+	runtimeStatsColl.GetBasicRuntimeStats(planID, true).Record(time.Second, 2024)
 	reporter.ReportPointGetIndexUsage(tableID, tableID, indexID, planID, 1)
 
 	require.Eventually(t, func() bool {
@@ -88,7 +88,7 @@ func TestIndexUsageReporter(t *testing.T) {
 		RealtimeCount: 100,
 	})
 	planID = 4
-	runtimeStatsColl.GetBasicRuntimeStats(planID).Record(time.Second, 2024)
+	runtimeStatsColl.GetBasicRuntimeStats(planID, true).Record(time.Second, 2024)
 	reporter.ReportPointGetIndexUsage(tableID, tableID, indexID, planID, 1)
 
 	require.Eventually(t, func() bool {
diff --git a/pkg/executor/point_get.go b/pkg/executor/point_get.go
index 4add1d108e991..486edb3547c2b 100644
--- a/pkg/executor/point_get.go
+++ b/pkg/executor/point_get.go
@@ -198,6 +198,15 @@ func matchPartitionNames(pid int64, partitionNames []pmodel.CIStr, pi *model.Par
 	return false
 }
 
+// Recreated is based on Init, but it also resets the baseExecutor fields.
+func (e *PointGetExecutor) Recreated(p *plannercore.PointGetPlan) {
+	e.Init(p)
+	// It's necessary to at least reset the `runtimeStats` of the `BaseExecutor`.
+	// As the `StmtCtx` may have changed, a new index usage reporter should also be created.
+	e.BaseExecutor = exec.NewBaseExecutor(e.Ctx(), p.Schema(), p.ID())
+	e.indexUsageReporter = buildIndexUsageReporter(e.Ctx(), p)
+}
+
 // Init set fields needed for PointGetExecutor reuse, this does NOT change baseExecutor field
 func (e *PointGetExecutor) Init(p *plannercore.PointGetPlan) {
 	decoder := NewRowDecoder(e.Ctx(), p.Schema(), p.TblInfo)
@@ -218,11 +227,6 @@ func (e *PointGetExecutor) Init(p *plannercore.PointGetPlan) {
 	e.partitionDefIdx = p.PartitionIdx
 	e.columns = p.Columns
 	e.buildVirtualColumnInfo()
-
-	// It's necessary to at least reset the `runtimeStats` of the `BaseExecutor`.
-	// As the `StmtCtx` may have changed, a new index usage reporter should also be created.
-	e.BaseExecutor = exec.NewBaseExecutor(e.Ctx(), p.Schema(), p.ID())
-	e.indexUsageReporter = buildIndexUsageReporter(e.Ctx(), p)
 }
 
 // buildVirtualColumnInfo saves virtual column indices and sort them in definition order
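The point_get.go hunk above splits executor reuse into two paths: Init keeps the BaseExecutor (and its runtime stats) intact for reuse, while Recreated additionally rebuilds the per-statement pieces. A toy sketch of that split follows, using hypothetical stand-in types (stmtCtx, pointGet are not from the TiDB code).

package main

import "fmt"

// stmtCtx stands in for the per-statement context owned by BaseExecutor.
type stmtCtx struct{ id int }

type pointGet struct {
	plan string   // field reset by init on every reuse
	base *stmtCtx // per-statement state, only rebuilt by recreated
}

// init resets only the plan-dependent fields; the base state is kept.
func (e *pointGet) init(plan string) { e.plan = plan }

// recreated builds on init but also swaps in a fresh per-statement base,
// mirroring how PointGetExecutor.Recreated rebuilds BaseExecutor and the
// index usage reporter while Init leaves them untouched.
func (e *pointGet) recreated(plan string, ctx *stmtCtx) {
	e.init(plan)
	e.base = ctx
}

func main() {
	e := &pointGet{}
	e.recreated("select * from t where id = 1", &stmtCtx{id: 1})
	e.init("select * from t where id = 2") // reuse: base (and its runtime stats) kept
	fmt.Println(e.plan, e.base.id)
}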
diff --git a/pkg/util/execdetails/execdetails.go b/pkg/util/execdetails/execdetails.go
index c677a17a938fb..97c08d4676bf0 100644
--- a/pkg/util/execdetails/execdetails.go
+++ b/pkg/util/execdetails/execdetails.go
@@ -1303,6 +1303,8 @@ func (waitSummary *TiFlashWaitSummary) CanBeIgnored() bool {
 
 // BasicRuntimeStats is the basic runtime stats.
 type BasicRuntimeStats struct {
+	// the count of executors that share the same plan id
+	executorCount atomic.Int32
 	// executor's Next() called times.
 	loop atomic.Int32
 	// executor consume time, including open, next, and close time.
@@ -1323,6 +1325,7 @@ func (e *BasicRuntimeStats) GetActRows() int64 {
 // Clone implements the RuntimeStats interface.
 func (e *BasicRuntimeStats) Clone() RuntimeStats {
 	result := &BasicRuntimeStats{}
+	result.executorCount.Store(e.executorCount.Load())
 	result.loop.Store(e.loop.Load())
 	result.consume.Store(e.consume.Load())
 	result.open.Store(e.open.Load())
@@ -1419,17 +1422,21 @@ func (e *BasicRuntimeStats) String() string {
 		return ""
 	}
 	var str strings.Builder
+	timePrefix := ""
+	if e.executorCount.Load() > 1 {
+		timePrefix = "total_"
+	}
 	totalTime := e.consume.Load()
 	openTime := e.open.Load()
 	closeTime := e.close.Load()
-	str.WriteString("time:")
+	str.WriteString(fmt.Sprintf("%stime:", timePrefix))
 	str.WriteString(FormatDuration(time.Duration(totalTime)))
 	if openTime >= int64(time.Millisecond) {
-		str.WriteString(", open:")
+		str.WriteString(fmt.Sprintf(", %sopen:", timePrefix))
 		str.WriteString(FormatDuration(time.Duration(openTime)))
 	}
 	if closeTime >= int64(time.Millisecond) {
-		str.WriteString(", close:")
+		str.WriteString(fmt.Sprintf(", %sclose:", timePrefix))
 		str.WriteString(FormatDuration(time.Duration(closeTime)))
 	}
 	str.WriteString(", loops:")
@@ -1494,17 +1501,27 @@ func (e *RuntimeStatsColl) RegisterStats(planID int, info RuntimeStats) {
 	}
 }
 
-// GetBasicRuntimeStats gets basicRuntimeStats for a executor.
-func (e *RuntimeStatsColl) GetBasicRuntimeStats(planID int) *BasicRuntimeStats {
+// GetBasicRuntimeStats gets the basicRuntimeStats for an executor.
+// When the rootStats entry or its basicRuntimeStats is nil, the behavior is decided by the initNewExecutorStats argument:
+// 1. If true, it creates a new one and increases basicRuntimeStats' executorCount.
+// 2. Otherwise, it returns nil.
+func (e *RuntimeStatsColl) GetBasicRuntimeStats(planID int, initNewExecutorStats bool) *BasicRuntimeStats {
 	e.mu.Lock()
 	defer e.mu.Unlock()
 	stats, ok := e.rootStats[planID]
-	if !ok {
+	if !ok && initNewExecutorStats {
 		stats = NewRootRuntimeStats()
 		e.rootStats[planID] = stats
 	}
-	if stats.basic == nil {
+	if stats == nil {
+		return nil
+	}
+
+	if stats.basic == nil && initNewExecutorStats {
 		stats.basic = &BasicRuntimeStats{}
+		stats.basic.executorCount.Add(1)
+	} else if stats.basic != nil && initNewExecutorStats {
+		stats.basic.executorCount.Add(1)
 	}
 	return stats.basic
 }
diff --git a/pkg/util/execdetails/execdetails_test.go b/pkg/util/execdetails/execdetails_test.go
index addf86dde0af5..6128d085389f1 100644
--- a/pkg/util/execdetails/execdetails_test.go
+++ b/pkg/util/execdetails/execdetails_test.go
@@ -452,8 +452,8 @@ func TestRuntimeStatsWithCommit(t *testing.T) {
 func TestRootRuntimeStats(t *testing.T) {
 	pid := 1
 	stmtStats := NewRuntimeStatsColl(nil)
-	basic1 := stmtStats.GetBasicRuntimeStats(pid)
-	basic2 := stmtStats.GetBasicRuntimeStats(pid)
+	basic1 := stmtStats.GetBasicRuntimeStats(pid, true)
+	basic2 := stmtStats.GetBasicRuntimeStats(pid, true)
 	basic1.RecordOpen(time.Millisecond * 10)
 	basic1.Record(time.Second, 20)
 	basic2.Record(time.Second*2, 30)
@@ -474,7 +474,7 @@
 		Commit: commitDetail,
 	})
 	stats := stmtStats.GetRootStats(1)
-	expect := "time:3.11s, open:10ms, close:100ms, loops:2, worker:15, commit_txn: {prewrite:1s, get_commit_ts:1s, commit:1s, region_num:5, write_keys:3, write_byte:66, txn_retry:2}"
+	expect := "total_time:3.11s, total_open:10ms, total_close:100ms, loops:2, worker:15, commit_txn: {prewrite:1s, get_commit_ts:1s, commit:1s, region_num:5, write_keys:3, write_byte:66, txn_retry:2}"
 	require.Equal(t, expect, stats.String())
 }
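With executorCount tracked as above, BasicRuntimeStats.String switches its labels to total_time/total_open/total_close once more than one executor shares a plan ID, which is what the updated TestRootRuntimeStats expectation asserts. A small sketch of that behavior against the changed API, assuming the github.com/pingcap/tidb/pkg/util/execdetails import path; the output comment is approximate.

package main

import (
	"fmt"
	"time"

	"github.com/pingcap/tidb/pkg/util/execdetails"
)

func main() {
	coll := execdetails.NewRuntimeStatsColl(nil)

	// Two executors share plan ID 1, e.g. a parallel apply and its clone:
	// each creating lookup bumps executorCount past 1 ...
	a := coll.GetBasicRuntimeStats(1, true)
	b := coll.GetBasicRuntimeStats(1, true)
	a.Record(time.Second, 20)
	b.Record(2*time.Second, 30)

	// ... so the summed duration is labelled total_time instead of time.
	fmt.Println(coll.GetRootStats(1).String()) // total_time:3s, loops:2
}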