Skip to content

Commit

Permalink
variable: add variable to control the concurrency of buildSampling (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkingrei authored Oct 13, 2023
1 parent bf8f570 commit f4b287d
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 13 deletions.
22 changes: 11 additions & 11 deletions executor/analyze_col_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,14 +264,14 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats(
}

sc := e.ctx.GetSessionVars().StmtCtx
statsConcurrency, err := getBuildStatsConcurrency(e.ctx)
samplingStatsConcurrency, err := getBuildSamplingStatsConcurrency(e.ctx)
if err != nil {
return 0, nil, nil, nil, nil, err
}

// Start workers to merge the result from collectors.
mergeResultCh := make(chan *samplingMergeResult, statsConcurrency)
mergeTaskCh := make(chan []byte, statsConcurrency)
mergeResultCh := make(chan *samplingMergeResult, samplingStatsConcurrency)
mergeTaskCh := make(chan []byte, samplingStatsConcurrency)
var taskEg errgroup.Group
// Start reading data from resultHandler and send it to mergeTaskCh.
taskEg.Go(func() (err error) {
Expand All @@ -283,8 +283,8 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats(
return readDataAndSendTask(e.ctx, e.resultHandler, mergeTaskCh, e.memTracker)
})
e.samplingMergeWg = &util.WaitGroupWrapper{}
e.samplingMergeWg.Add(statsConcurrency)
for i := 0; i < statsConcurrency; i++ {
e.samplingMergeWg.Add(samplingStatsConcurrency)
for i := 0; i < samplingStatsConcurrency; i++ {
go e.subMergeWorker(mergeResultCh, mergeTaskCh, l, i)
}
// Merge the result from collectors.
Expand All @@ -296,7 +296,7 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats(
err = getAnalyzePanicErr(r)
}
}()
for mergeWorkerPanicCnt < statsConcurrency {
for mergeWorkerPanicCnt < samplingStatsConcurrency {
mergeResult, ok := <-mergeResultCh
if !ok {
break
Expand Down Expand Up @@ -378,16 +378,16 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats(
fmSketches = make([]*statistics.FMSketch, 0, totalLen)
buildResultChan := make(chan error, totalLen)
buildTaskChan := make(chan *samplingBuildTask, totalLen)
if totalLen < statsConcurrency {
statsConcurrency = totalLen
if totalLen < samplingStatsConcurrency {
samplingStatsConcurrency = totalLen
}
e.samplingBuilderWg = newNotifyErrorWaitGroupWrapper(buildResultChan)
sampleCollectors := make([]*statistics.SampleCollector, len(e.colsInfo))
exitCh := make(chan struct{})
e.samplingBuilderWg.Add(statsConcurrency)
e.samplingBuilderWg.Add(samplingStatsConcurrency)

// Start workers to build stats.
for i := 0; i < statsConcurrency; i++ {
for i := 0; i < samplingStatsConcurrency; i++ {
e.samplingBuilderWg.Run(func() {
e.subBuildWorker(buildResultChan, buildTaskChan, hists, topns, sampleCollectors, exitCh)
})
Expand Down Expand Up @@ -430,7 +430,7 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats(
close(buildTaskChan)

panicCnt := 0
for panicCnt < statsConcurrency {
for panicCnt < samplingStatsConcurrency {
err1, ok := <-buildResultChan
if !ok {
break
Expand Down
12 changes: 10 additions & 2 deletions executor/analyze_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,24 @@ import (
"go.uber.org/atomic"
)

func getBuildStatsConcurrency(ctx sessionctx.Context) (int, error) {
// getIntFromSessionVars fetches a system variable through
// GetSessionOrGlobalSystemVar on ctx's session variables and parses the
// returned string as a base-10 integer.
//
// If the lookup fails it returns (0, err). Otherwise it returns the parsed
// value converted to int, together with whatever error strconv.ParseInt
// reported.
func getIntFromSessionVars(ctx sessionctx.Context, name string) (int, error) {
sessionVars := ctx.GetSessionVars()
// NOTE(review): the next two lines look like the removed and added versions
// of the same statement as rendered by the diff view; only the second one
// uses the name parameter — confirm which one is live in the real file.
concurrency, err := sessionVars.GetSessionOrGlobalSystemVar(context.Background(), variable.TiDBBuildStatsConcurrency)
concurrency, err := sessionVars.GetSessionOrGlobalSystemVar(context.Background(), name)
if err != nil {
return 0, err
}
// The string is parsed with a 64-bit size and then narrowed to int.
c, err := strconv.ParseInt(concurrency, 10, 64)
return int(c), err
}

// getBuildStatsConcurrency returns the integer value of the
// TiDBBuildStatsConcurrency system variable, as resolved by
// getIntFromSessionVars for the given session context.
func getBuildStatsConcurrency(ctx sessionctx.Context) (int, error) {
	const sysVarName = variable.TiDBBuildStatsConcurrency
	return getIntFromSessionVars(ctx, sysVarName)
}

// getBuildSamplingStatsConcurrency returns the integer value of the
// TiDBBuildSamplingStatsConcurrency system variable, as resolved by
// getIntFromSessionVars for the given session context.
func getBuildSamplingStatsConcurrency(ctx sessionctx.Context) (int, error) {
	const sysVarName = variable.TiDBBuildSamplingStatsConcurrency
	return getIntFromSessionVars(ctx, sysVarName)
}

// Package-level error values used by the analyze code.
var (
	// errAnalyzeWorkerPanic carries the fixed "analyze worker panic" message.
	errAnalyzeWorkerPanic = errors.New("analyze worker panic")

	// errAnalyzeOOM reports that the memory quota was exceeded and suggests a
	// smaller sample rate; config.DefRowsForSampleRate is formatted into the
	// hint.
	errAnalyzeOOM = errors.Errorf("analyze panic due to memory quota exceeds, please try with smaller samplerate(refer to %d/count)", config.DefRowsForSampleRate)
)

Expand Down
5 changes: 5 additions & 0 deletions executor/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,10 +297,15 @@ func TestSetVar(t *testing.T) {

tk.MustExec("set tidb_build_stats_concurrency = 42")
tk.MustQuery(`select @@tidb_build_stats_concurrency;`).Check(testkit.Rows("42"))
tk.MustExec("set tidb_build_sampling_stats_concurrency = 42")
tk.MustQuery(`select @@tidb_build_sampling_stats_concurrency;`).Check(testkit.Rows("42"))
require.Error(t, tk.ExecToErr("set tidb_build_sampling_stats_concurrency = 'abc'"))
require.Error(t, tk.ExecToErr("set tidb_build_stats_concurrency = 'abc'"))
tk.MustQuery(`select @@tidb_build_stats_concurrency;`).Check(testkit.Rows("42"))
tk.MustExec("set tidb_build_stats_concurrency = 257")
tk.MustQuery(`select @@tidb_build_stats_concurrency;`).Check(testkit.Rows(strconv.Itoa(variable.MaxConfigurableConcurrency)))
tk.MustExec("set tidb_build_sampling_stats_concurrency = 257")
tk.MustQuery(`select @@tidb_build_sampling_stats_concurrency;`).Check(testkit.Rows(strconv.Itoa(variable.MaxConfigurableConcurrency)))

tk.MustExec(`set tidb_partition_prune_mode = "static"`)
tk.MustQuery(`select @@tidb_partition_prune_mode;`).Check(testkit.Rows("static"))
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -1654,6 +1654,7 @@ var defaultSysVars = []*SysVar{
return nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBBuildStatsConcurrency, Value: strconv.Itoa(DefBuildStatsConcurrency), Type: TypeInt, MinValue: 1, MaxValue: MaxConfigurableConcurrency},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBBuildSamplingStatsConcurrency, Value: strconv.Itoa(DefBuildSamplingStatsConcurrency), Type: TypeInt, MinValue: 1, MaxValue: MaxConfigurableConcurrency},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBOptCartesianBCJ, Value: strconv.Itoa(DefOptCartesianBCJ), Type: TypeInt, MinValue: 0, MaxValue: 2, SetSession: func(s *SessionVars, val string) error {
s.AllowCartesianBCJ = TidbOptInt(val, DefOptCartesianBCJ)
return nil
Expand Down
4 changes: 4 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,9 @@ const (
// those indices can be scanned concurrently, with the cost of higher system performance impact.
TiDBBuildStatsConcurrency = "tidb_build_stats_concurrency"

// TiDBBuildSamplingStatsConcurrency is used to control the concurrency of the build sampling stats task.
TiDBBuildSamplingStatsConcurrency = "tidb_build_sampling_stats_concurrency"

// TiDBDistSQLScanConcurrency is used to set the concurrency of a distsql scan task.
// A distsql scan task can be a table scan or an index scan, which may be distributed to many TiKV nodes.
// Higher concurrency may reduce latency, but with the cost of higher memory usage and system performance impact.
Expand Down Expand Up @@ -1128,6 +1131,7 @@ const (
DefIndexLookupSize = 20000
DefDistSQLScanConcurrency = 15
DefBuildStatsConcurrency = 4
DefBuildSamplingStatsConcurrency = 2
DefAutoAnalyzeRatio = 0.5
DefAutoAnalyzeStartTime = "00:00 +0000"
DefAutoAnalyzeEndTime = "23:59 +0000"
Expand Down

0 comments on commit f4b287d

Please sign in to comment.