diff --git a/executor/analyze_col_v2.go b/executor/analyze_col_v2.go index ed08d5f718e6e..5b42975ed895a 100644 --- a/executor/analyze_col_v2.go +++ b/executor/analyze_col_v2.go @@ -264,14 +264,14 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats( } sc := e.ctx.GetSessionVars().StmtCtx - statsConcurrency, err := getBuildStatsConcurrency(e.ctx) + samplingStatsConcurrency, err := getBuildSamplingStatsConcurrency(e.ctx) if err != nil { return 0, nil, nil, nil, nil, err } // Start workers to merge the result from collectors. - mergeResultCh := make(chan *samplingMergeResult, statsConcurrency) - mergeTaskCh := make(chan []byte, statsConcurrency) + mergeResultCh := make(chan *samplingMergeResult, samplingStatsConcurrency) + mergeTaskCh := make(chan []byte, samplingStatsConcurrency) var taskEg errgroup.Group // Start read data from resultHandler and send them to mergeTaskCh. taskEg.Go(func() (err error) { @@ -283,8 +283,8 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats( return readDataAndSendTask(e.ctx, e.resultHandler, mergeTaskCh, e.memTracker) }) e.samplingMergeWg = &util.WaitGroupWrapper{} - e.samplingMergeWg.Add(statsConcurrency) - for i := 0; i < statsConcurrency; i++ { + e.samplingMergeWg.Add(samplingStatsConcurrency) + for i := 0; i < samplingStatsConcurrency; i++ { go e.subMergeWorker(mergeResultCh, mergeTaskCh, l, i) } // Merge the result from collectors. @@ -296,7 +296,7 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats( err = getAnalyzePanicErr(r) } }() - for mergeWorkerPanicCnt < statsConcurrency { + for mergeWorkerPanicCnt < samplingStatsConcurrency { mergeResult, ok := <-mergeResultCh if !ok { break @@ -378,16 +378,16 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats( fmSketches = make([]*statistics.FMSketch, 0, totalLen) buildResultChan := make(chan error, totalLen) buildTaskChan := make(chan *samplingBuildTask, totalLen) - if totalLen < statsConcurrency { - statsConcurrency = totalLen + if totalLen < samplingStatsConcurrency { + samplingStatsConcurrency = totalLen } e.samplingBuilderWg = newNotifyErrorWaitGroupWrapper(buildResultChan) sampleCollectors := make([]*statistics.SampleCollector, len(e.colsInfo)) exitCh := make(chan struct{}) - e.samplingBuilderWg.Add(statsConcurrency) + e.samplingBuilderWg.Add(samplingStatsConcurrency) // Start workers to build stats. - for i := 0; i < statsConcurrency; i++ { + for i := 0; i < samplingStatsConcurrency; i++ { e.samplingBuilderWg.Run(func() { e.subBuildWorker(buildResultChan, buildTaskChan, hists, topns, sampleCollectors, exitCh) }) @@ -430,7 +430,7 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats( close(buildTaskChan) panicCnt := 0 - for panicCnt < statsConcurrency { + for panicCnt < samplingStatsConcurrency { err1, ok := <-buildResultChan if !ok { break diff --git a/executor/analyze_utils.go b/executor/analyze_utils.go index 4bcdeca61b543..f13507ebc0796 100644 --- a/executor/analyze_utils.go +++ b/executor/analyze_utils.go @@ -29,9 +29,9 @@ import ( "go.uber.org/atomic" ) -func getBuildStatsConcurrency(ctx sessionctx.Context) (int, error) { +func getIntFromSessionVars(ctx sessionctx.Context, name string) (int, error) { sessionVars := ctx.GetSessionVars() - concurrency, err := sessionVars.GetSessionOrGlobalSystemVar(context.Background(), variable.TiDBBuildStatsConcurrency) + concurrency, err := sessionVars.GetSessionOrGlobalSystemVar(context.Background(), name) if err != nil { return 0, err } @@ -39,6 +39,14 @@ func getBuildStatsConcurrency(ctx sessionctx.Context) (int, error) { return int(c), err } +func getBuildStatsConcurrency(ctx sessionctx.Context) (int, error) { + return getIntFromSessionVars(ctx, variable.TiDBBuildStatsConcurrency) +} + +func getBuildSamplingStatsConcurrency(ctx sessionctx.Context) (int, error) { + return getIntFromSessionVars(ctx, variable.TiDBBuildSamplingStatsConcurrency) +} + var errAnalyzeWorkerPanic = errors.New("analyze worker panic") var errAnalyzeOOM = errors.Errorf("analyze panic due to memory quota exceeds, please try with smaller samplerate(refer to %d/count)", config.DefRowsForSampleRate) diff --git a/executor/set_test.go b/executor/set_test.go index fec2ff9dfbf24..9e04db43ad1e2 100644 --- a/executor/set_test.go +++ b/executor/set_test.go @@ -297,10 +297,15 @@ func TestSetVar(t *testing.T) { tk.MustExec("set tidb_build_stats_concurrency = 42") tk.MustQuery(`select @@tidb_build_stats_concurrency;`).Check(testkit.Rows("42")) + tk.MustExec("set tidb_build_sampling_stats_concurrency = 42") + tk.MustQuery(`select @@tidb_build_sampling_stats_concurrency;`).Check(testkit.Rows("42")) + require.Error(t, tk.ExecToErr("set tidb_build_sampling_stats_concurrency = 'abc'")) require.Error(t, tk.ExecToErr("set tidb_build_stats_concurrency = 'abc'")) tk.MustQuery(`select @@tidb_build_stats_concurrency;`).Check(testkit.Rows("42")) tk.MustExec("set tidb_build_stats_concurrency = 257") tk.MustQuery(`select @@tidb_build_stats_concurrency;`).Check(testkit.Rows(strconv.Itoa(variable.MaxConfigurableConcurrency))) + tk.MustExec("set tidb_build_sampling_stats_concurrency = 257") + tk.MustQuery(`select @@tidb_build_sampling_stats_concurrency;`).Check(testkit.Rows(strconv.Itoa(variable.MaxConfigurableConcurrency))) tk.MustExec(`set tidb_partition_prune_mode = "static"`) tk.MustQuery(`select @@tidb_partition_prune_mode;`).Check(testkit.Rows("static")) diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 67c399904cca4..d90894791874e 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -1654,6 +1654,7 @@ var defaultSysVars = []*SysVar{ return nil }}, {Scope: ScopeGlobal | ScopeSession, Name: TiDBBuildStatsConcurrency, Value: strconv.Itoa(DefBuildStatsConcurrency), Type: TypeInt, MinValue: 1, MaxValue: MaxConfigurableConcurrency}, + {Scope: ScopeGlobal | ScopeSession, Name: TiDBBuildSamplingStatsConcurrency, Value: strconv.Itoa(DefBuildSamplingStatsConcurrency), Type: TypeInt, MinValue: 1, MaxValue: MaxConfigurableConcurrency}, {Scope: ScopeGlobal | ScopeSession, Name: TiDBOptCartesianBCJ, Value: strconv.Itoa(DefOptCartesianBCJ), Type: TypeInt, MinValue: 0, MaxValue: 2, SetSession: func(s *SessionVars, val string) error { s.AllowCartesianBCJ = TidbOptInt(val, DefOptCartesianBCJ) return nil diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 79c8adbdb0828..5269ff957fdf1 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -289,6 +289,9 @@ const ( // those indices can be scanned concurrently, with the cost of higher system performance impact. TiDBBuildStatsConcurrency = "tidb_build_stats_concurrency" + // TiDBBuildSamplingStatsConcurrency is used to control the concurrency of build sampling stats task. + TiDBBuildSamplingStatsConcurrency = "tidb_build_sampling_stats_concurrency" + // TiDBDistSQLScanConcurrency is used to set the concurrency of a distsql scan task. // A distsql scan task can be a table scan or a index scan, which may be distributed to many TiKV nodes. // Higher concurrency may reduce latency, but with the cost of higher memory usage and system performance impact. @@ -1128,6 +1131,7 @@ const ( DefIndexLookupSize = 20000 DefDistSQLScanConcurrency = 15 DefBuildStatsConcurrency = 4 + DefBuildSamplingStatsConcurrency = 2 DefAutoAnalyzeRatio = 0.5 DefAutoAnalyzeStartTime = "00:00 +0000" DefAutoAnalyzeEndTime = "23:59 +0000"