variable: add variable to control the concurrency of buildSampling #47582

Merged: 6 commits, Oct 13, 2023
executor/analyze_col_v2.go (11 additions, 11 deletions)

@@ -264,14 +264,14 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats(
     }
 
     sc := e.ctx.GetSessionVars().StmtCtx
-    statsConcurrency, err := getBuildStatsConcurrency(e.ctx)
+    samplingStatsConcurrency, err := getBuildSamplingStatsConcurrency(e.ctx)
     if err != nil {
         return 0, nil, nil, nil, nil, err
     }
 
     // Start workers to merge the result from collectors.
-    mergeResultCh := make(chan *samplingMergeResult, statsConcurrency)
-    mergeTaskCh := make(chan []byte, statsConcurrency)
+    mergeResultCh := make(chan *samplingMergeResult, samplingStatsConcurrency)
+    mergeTaskCh := make(chan []byte, samplingStatsConcurrency)
     var taskEg errgroup.Group
     // Start read data from resultHandler and send them to mergeTaskCh.
     taskEg.Go(func() (err error) {
@@ -283,8 +283,8 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats(
         return readDataAndSendTask(e.ctx, e.resultHandler, mergeTaskCh, e.memTracker)
     })
     e.samplingMergeWg = &util.WaitGroupWrapper{}
-    e.samplingMergeWg.Add(statsConcurrency)
-    for i := 0; i < statsConcurrency; i++ {
+    e.samplingMergeWg.Add(samplingStatsConcurrency)
+    for i := 0; i < samplingStatsConcurrency; i++ {
         go e.subMergeWorker(mergeResultCh, mergeTaskCh, l, i)
     }
     // Merge the result from collectors.
@@ -296,7 +296,7 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats(
                 err = getAnalyzePanicErr(r)
             }
         }()
-        for mergeWorkerPanicCnt < statsConcurrency {
+        for mergeWorkerPanicCnt < samplingStatsConcurrency {
             mergeResult, ok := <-mergeResultCh
             if !ok {
                 break
@@ -378,16 +378,16 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats(
     fmSketches = make([]*statistics.FMSketch, 0, totalLen)
     buildResultChan := make(chan error, totalLen)
     buildTaskChan := make(chan *samplingBuildTask, totalLen)
-    if totalLen < statsConcurrency {
-        statsConcurrency = totalLen
+    if totalLen < samplingStatsConcurrency {
+        samplingStatsConcurrency = totalLen
     }
     e.samplingBuilderWg = newNotifyErrorWaitGroupWrapper(buildResultChan)
     sampleCollectors := make([]*statistics.SampleCollector, len(e.colsInfo))
     exitCh := make(chan struct{})
-    e.samplingBuilderWg.Add(statsConcurrency)
+    e.samplingBuilderWg.Add(samplingStatsConcurrency)
 
     // Start workers to build stats.
-    for i := 0; i < statsConcurrency; i++ {
+    for i := 0; i < samplingStatsConcurrency; i++ {
         e.samplingBuilderWg.Run(func() {
             e.subBuildWorker(buildResultChan, buildTaskChan, hists, topns, sampleCollectors, exitCh)
         })
@@ -430,7 +430,7 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats(
     close(buildTaskChan)
 
     panicCnt := 0
-    for panicCnt < statsConcurrency {
+    for panicCnt < samplingStatsConcurrency {
         err1, ok := <-buildResultChan
         if !ok {
             break
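The renaming above leaves the worker-pool shape of buildSamplingStats unchanged: channels are buffered to the configured concurrency and exactly that many goroutines are started. Below is a minimal, self-contained Go sketch of that pattern using simplified stand-in types (mergeResult, runMergeWorkers); it illustrates the shape only and is not the executor code itself.

package main

import (
    "fmt"
    "sync"
)

// mergeResult stands in for *samplingMergeResult in the real executor.
type mergeResult struct{ rows int }

// runMergeWorkers mirrors the shape of buildSamplingStats: task and result
// channels are buffered to the configured concurrency, and exactly that many
// worker goroutines drain the task channel.
func runMergeWorkers(concurrency int, tasks [][]byte) []mergeResult {
    if len(tasks) < concurrency {
        // Same clamp the diff applies to samplingStatsConcurrency before
        // starting the build workers.
        concurrency = len(tasks)
    }
    taskCh := make(chan []byte, concurrency)
    resultCh := make(chan mergeResult, concurrency)

    var wg sync.WaitGroup
    wg.Add(concurrency)
    for i := 0; i < concurrency; i++ {
        go func() {
            defer wg.Done()
            for task := range taskCh {
                // The real workers merge sample collectors here.
                resultCh <- mergeResult{rows: len(task)}
            }
        }()
    }

    // Feed the tasks, then close the result channel once all workers finish.
    go func() {
        for _, t := range tasks {
            taskCh <- t
        }
        close(taskCh)
        wg.Wait()
        close(resultCh)
    }()

    results := make([]mergeResult, 0, len(tasks))
    for r := range resultCh {
        results = append(results, r)
    }
    return results
}

func main() {
    fmt.Println(runMergeWorkers(2, [][]byte{[]byte("a"), []byte("bc"), []byte("def")}))
}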
executor/analyze_utils.go (10 additions, 2 deletions)

@@ -29,16 +29,24 @@ import (
     "go.uber.org/atomic"
 )
 
-func getBuildStatsConcurrency(ctx sessionctx.Context) (int, error) {
+func getIntFromSessionVars(ctx sessionctx.Context, name string) (int, error) {
     sessionVars := ctx.GetSessionVars()
-    concurrency, err := sessionVars.GetSessionOrGlobalSystemVar(context.Background(), variable.TiDBBuildStatsConcurrency)
+    concurrency, err := sessionVars.GetSessionOrGlobalSystemVar(context.Background(), name)
     if err != nil {
         return 0, err
     }
     c, err := strconv.ParseInt(concurrency, 10, 64)
     return int(c), err
 }
 
+func getBuildStatsConcurrency(ctx sessionctx.Context) (int, error) {
+    return getIntFromSessionVars(ctx, variable.TiDBBuildStatsConcurrency)
+}
+
+func getBuildSamplingStatsConcurrency(ctx sessionctx.Context) (int, error) {
+    return getIntFromSessionVars(ctx, variable.TiDBBuildSamplingStatsConcurrency)
+}
+
 var errAnalyzeWorkerPanic = errors.New("analyze worker panic")
 var errAnalyzeOOM = errors.Errorf("analyze panic due to memory quota exceeds, please try with smaller samplerate(refer to %d/count)", config.DefRowsForSampleRate)
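Since the system-variable name is now a parameter of getIntFromSessionVars, any further integer variable only needs a one-line wrapper next to the two above. A hedged sketch follows; "tidb_example_concurrency" is hypothetical and does not exist in TiDB.

// Hypothetical example only: "tidb_example_concurrency" is not a real TiDB
// system variable. It shows how getIntFromSessionVars would be reused if
// another integer concurrency variable were added to this file.
func getExampleConcurrency(ctx sessionctx.Context) (int, error) {
    return getIntFromSessionVars(ctx, "tidb_example_concurrency")
}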
executor/set_test.go (5 additions, 0 deletions)

@@ -297,10 +297,15 @@ func TestSetVar(t *testing.T) {
 
     tk.MustExec("set tidb_build_stats_concurrency = 42")
     tk.MustQuery(`select @@tidb_build_stats_concurrency;`).Check(testkit.Rows("42"))
+    tk.MustExec("set tidb_build_sampling_stats_concurrency = 42")
+    tk.MustQuery(`select @@tidb_build_sampling_stats_concurrency;`).Check(testkit.Rows("42"))
+    require.Error(t, tk.ExecToErr("set tidb_build_sampling_stats_concurrency = 'abc'"))
     require.Error(t, tk.ExecToErr("set tidb_build_stats_concurrency = 'abc'"))
     tk.MustQuery(`select @@tidb_build_stats_concurrency;`).Check(testkit.Rows("42"))
     tk.MustExec("set tidb_build_stats_concurrency = 257")
     tk.MustQuery(`select @@tidb_build_stats_concurrency;`).Check(testkit.Rows(strconv.Itoa(variable.MaxConfigurableConcurrency)))
+    tk.MustExec("set tidb_build_sampling_stats_concurrency = 257")
+    tk.MustQuery(`select @@tidb_build_sampling_stats_concurrency;`).Check(testkit.Rows(strconv.Itoa(variable.MaxConfigurableConcurrency)))
 
     tk.MustExec(`set tidb_partition_prune_mode = "static"`)
     tk.MustQuery(`select @@tidb_partition_prune_mode;`).Check(testkit.Rows("static"))
sessionctx/variable/sysvar.go (1 addition, 0 deletions)

@@ -1654,6 +1654,7 @@ var defaultSysVars = []*SysVar{
         return nil
     }},
     {Scope: ScopeGlobal | ScopeSession, Name: TiDBBuildStatsConcurrency, Value: strconv.Itoa(DefBuildStatsConcurrency), Type: TypeInt, MinValue: 1, MaxValue: MaxConfigurableConcurrency},
+    {Scope: ScopeGlobal | ScopeSession, Name: TiDBBuildSamplingStatsConcurrency, Value: strconv.Itoa(DefBuildSamplingStatsConcurrency), Type: TypeInt, MinValue: 1, MaxValue: MaxConfigurableConcurrency},
     {Scope: ScopeGlobal | ScopeSession, Name: TiDBOptCartesianBCJ, Value: strconv.Itoa(DefOptCartesianBCJ), Type: TypeInt, MinValue: 0, MaxValue: 2, SetSession: func(s *SessionVars, val string) error {
         s.AllowCartesianBCJ = TidbOptInt(val, DefOptCartesianBCJ)
         return nil
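The new entry gives tidb_build_sampling_stats_concurrency the same bounds as tidb_build_stats_concurrency: TypeInt with MinValue 1 and MaxValue MaxConfigurableConcurrency. The set_test.go assertions above rely on out-of-range values being clamped to the nearest bound rather than rejected (257 reads back as the maximum). A minimal sketch of that clamping behavior, assuming this is how the TypeInt bounds are enforced; it is not TiDB's actual validation code.

// clampInt sketches the bound handling the test exercises: values below the
// minimum are raised to it, values above the maximum are lowered to it.
func clampInt(v, lo, hi int64) int64 {
    if v < lo {
        return lo
    }
    if v > hi {
        return hi
    }
    return v
}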
sessionctx/variable/tidb_vars.go (4 additions, 0 deletions)

@@ -289,6 +289,9 @@ const (
     // those indices can be scanned concurrently, with the cost of higher system performance impact.
     TiDBBuildStatsConcurrency = "tidb_build_stats_concurrency"
 
+    // TiDBBuildSamplingStatsConcurrency is used to control the concurrency of the build sampling stats task.
+    TiDBBuildSamplingStatsConcurrency = "tidb_build_sampling_stats_concurrency"
+
     // TiDBDistSQLScanConcurrency is used to set the concurrency of a distsql scan task.
     // A distsql scan task can be a table scan or an index scan, which may be distributed to many TiKV nodes.
     // Higher concurrency may reduce latency, but with the cost of higher memory usage and system performance impact.
@@ -1128,6 +1131,7 @@ const (
     DefIndexLookupSize = 20000
     DefDistSQLScanConcurrency = 15
     DefBuildStatsConcurrency = 4
+    DefBuildSamplingStatsConcurrency = 2
     DefAutoAnalyzeRatio = 0.5
     DefAutoAnalyzeStartTime = "00:00 +0000"
     DefAutoAnalyzeEndTime = "23:59 +0000"