*: sync wait stats loading for stable plan (#30026)
chrysan authored Dec 31, 2021
1 parent 0943e10 commit 48fce5e
Showing 20 changed files with 1,145 additions and 35 deletions.
38 changes: 29 additions & 9 deletions config/config.go
@@ -65,6 +65,14 @@ const (
DefTableColumnCountLimit = 1017
// DefMaxOfTableColumnCountLimit is maximum limitation of the number of columns in a table
DefMaxOfTableColumnCountLimit = 4096
// DefStatsLoadConcurrencyLimit is the lower limit of stats-load concurrency
DefStatsLoadConcurrencyLimit = 1
// DefMaxOfStatsLoadConcurrencyLimit is the upper limit of stats-load concurrency
DefMaxOfStatsLoadConcurrencyLimit = 128
// DefStatsLoadQueueSizeLimit is the lower limit of the stats-load request queue size
DefStatsLoadQueueSizeLimit = 1
// DefMaxOfStatsLoadQueueSizeLimit is the upper limit of the stats-load request queue size
DefMaxOfStatsLoadQueueSizeLimit = 100000
)

// Valid config maps
@@ -483,11 +491,13 @@ type Performance struct {
CommitterConcurrency int `toml:"committer-concurrency" json:"committer-concurrency"`
MaxTxnTTL uint64 `toml:"max-txn-ttl" json:"max-txn-ttl"`
// Deprecated
MemProfileInterval string `toml:"-" json:"-"`
IndexUsageSyncLease string `toml:"index-usage-sync-lease" json:"index-usage-sync-lease"`
PlanReplayerGCLease string `toml:"plan-replayer-gc-lease" json:"plan-replayer-gc-lease"`
GOGC int `toml:"gogc" json:"gogc"`
EnforceMPP bool `toml:"enforce-mpp" json:"enforce-mpp"`
StatsLoadConcurrency uint `toml:"stats-load-concurrency" json:"stats-load-concurrency"`
StatsLoadQueueSize uint `toml:"stats-load-queue-size" json:"stats-load-queue-size"`
}

// PlanCache is the PlanCache section of the config.
@@ -702,10 +712,12 @@ var defaultConf = Config{
CommitterConcurrency: defTiKVCfg.CommitterConcurrency,
MaxTxnTTL: defTiKVCfg.MaxTxnTTL, // 1hour
// TODO: set indexUsageSyncLease to 60s.
IndexUsageSyncLease: "0s",
GOGC: 100,
EnforceMPP: false,
PlanReplayerGCLease: "10m",
StatsLoadConcurrency: 5,
StatsLoadQueueSize: 1000,
},
ProxyProtocol: ProxyProtocol{
Networks: "",
@@ -1001,6 +1013,14 @@ func (c *Config) Valid() error {
c.Security.SpilledFileEncryptionMethod, SpilledFileEncryptionMethodPlaintext, SpilledFileEncryptionMethodAES128CTR)
}

// check stats load config
if c.Performance.StatsLoadConcurrency < DefStatsLoadConcurrencyLimit || c.Performance.StatsLoadConcurrency > DefMaxOfStatsLoadConcurrencyLimit {
return fmt.Errorf("stats-load-concurrency should be [%d, %d]", DefStatsLoadConcurrencyLimit, DefMaxOfStatsLoadConcurrencyLimit)
}
if c.Performance.StatsLoadQueueSize < DefStatsLoadQueueSizeLimit || c.Performance.StatsLoadQueueSize > DefMaxOfStatsLoadQueueSizeLimit {
return fmt.Errorf("stats-load-queue-size should be [%d, %d]", DefStatsLoadQueueSizeLimit, DefMaxOfStatsLoadQueueSizeLimit)
}

// test log level
l := zap.NewAtomicLevel()
return l.UnmarshalText([]byte(c.Log.Level))
21 changes: 21 additions & 0 deletions config/config_test.go
@@ -674,3 +674,24 @@ func TestConfigExample(t *testing.T) {
}
}
}

func TestStatsLoadLimit(t *testing.T) {
conf := NewConfig()
checkConcurrencyValid := func(concurrency int, shouldBeValid bool) {
conf.Performance.StatsLoadConcurrency = uint(concurrency)
require.Equal(t, shouldBeValid, conf.Valid() == nil)
}
checkConcurrencyValid(DefStatsLoadConcurrencyLimit, true)
checkConcurrencyValid(DefStatsLoadConcurrencyLimit-1, false)
checkConcurrencyValid(DefMaxOfStatsLoadConcurrencyLimit, true)
checkConcurrencyValid(DefMaxOfStatsLoadConcurrencyLimit+1, false)
conf = NewConfig()
checkQueueSizeValid := func(queueSize int, shouldBeValid bool) {
conf.Performance.StatsLoadQueueSize = uint(queueSize)
require.Equal(t, shouldBeValid, conf.Valid() == nil)
}
checkQueueSizeValid(DefStatsLoadQueueSizeLimit, true)
checkQueueSizeValid(DefStatsLoadQueueSizeLimit-1, false)
checkQueueSizeValid(DefMaxOfStatsLoadQueueSizeLimit, true)
checkQueueSizeValid(DefMaxOfStatsLoadQueueSizeLimit+1, false)
}
10 changes: 10 additions & 0 deletions domain/domain.go
@@ -1296,6 +1296,16 @@ func (do *Domain) UpdateTableStatsLoop(ctx sessionctx.Context) error {
return nil
}

// StartLoadStatsSubWorkers starts sub workers with new sessions to load stats concurrently
func (do *Domain) StartLoadStatsSubWorkers(ctxList []sessionctx.Context) {
statsHandle := do.StatsHandle()
for i, ctx := range ctxList {
statsHandle.StatsLoad.SubCtxs[i] = ctx
do.wg.Add(1)
go statsHandle.SubLoadWorker(ctx, do.exit, &do.wg)
}
}

func (do *Domain) newOwnerManager(prompt, ownerKey string) owner.Manager {
id := do.ddl.OwnerManager().ID()
var statsOwner owner.Manager
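The body of SubLoadWorker lives in the statistics handle and is not part of this hunk. What StartLoadStatsSubWorkers sets up is a fixed-size pool of goroutines, each with its own session, draining a shared request queue until the domain's exit channel closes. A minimal, self-contained sketch of that pattern; every type and name below is illustrative, not the actual statistics.Handle API:

package main

import (
	"fmt"
	"sync"
	"time"
)

// loadTask is an illustrative stand-in for one column's stats-load request.
type loadTask struct{ tableID, columnID int64 }

// subLoadWorker drains the shared task channel until exit closes,
// mirroring the shape of statsHandle.SubLoadWorker in the commit.
func subLoadWorker(id int, tasks <-chan loadTask, exit <-chan struct{}, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		select {
		case task := <-tasks:
			// Here TiDB would read the column's histogram/TopN/CMSketch
			// from storage and merge it into the stats cache.
			fmt.Printf("worker %d loading stats for table %d, column %d\n", id, task.tableID, task.columnID)
		case <-exit:
			return
		}
	}
}

func main() {
	tasks := make(chan loadTask, 1000) // cf. stats-load-queue-size
	exit := make(chan struct{})
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ { // cf. stats-load-concurrency
		wg.Add(1)
		go subLoadWorker(i, tasks, exit, &wg)
	}
	tasks <- loadTask{tableID: 42, columnID: 1}
	time.Sleep(10 * time.Millisecond) // let the task be picked up before shutdown
	close(exit)
	wg.Wait()
}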
9 changes: 9 additions & 0 deletions domain/sysvar_cache.go
@@ -245,6 +245,15 @@ func (do *Domain) checkEnableServerGlobalVar(name, sVal string) {
variable.PersistAnalyzeOptions.Store(variable.TiDBOptOn(sVal))
case variable.TiDBEnableColumnTracking:
variable.EnableColumnTracking.Store(variable.TiDBOptOn(sVal))
case variable.TiDBStatsLoadSyncWait:
var val int64
val, err = strconv.ParseInt(sVal, 10, 64)
if err != nil {
break
}
variable.StatsLoadSyncWait.Store(val)
case variable.TiDBStatsLoadPseudoTimeout:
variable.StatsLoadPseudoTimeout.Store(variable.TiDBOptOn(sVal))
}
if err != nil {
logutil.BgLogger().Error(fmt.Sprintf("load global variable %s error", name), zap.Error(err))
4 changes: 4 additions & 0 deletions metrics/metrics.go
@@ -96,6 +96,10 @@ func RegisterMetrics() {
prometheus.MustRegister(HandleJobHistogram)
prometheus.MustRegister(SignificantFeedbackCounter)
prometheus.MustRegister(FastAnalyzeHistogram)
prometheus.MustRegister(SyncLoadCounter)
prometheus.MustRegister(SyncLoadTimeoutCounter)
prometheus.MustRegister(SyncLoadHistogram)
prometheus.MustRegister(ReadStatsHistogram)
prometheus.MustRegister(JobsGauge)
prometheus.MustRegister(KeepAliveCounter)
prometheus.MustRegister(LoadPrivilegeCounter)
34 changes: 34 additions & 0 deletions metrics/stats.go
@@ -94,4 +94,38 @@ var (
Help: "Bucketed histogram of some stats in fast analyze.",
Buckets: prometheus.ExponentialBuckets(1, 2, 16),
}, []string{LblSQLType, LblType})

SyncLoadCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "tidb",
Subsystem: "statistics",
Name: "sync_load_total",
Help: "Counter of sync load.",
})

SyncLoadTimeoutCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "tidb",
Subsystem: "statistics",
Name: "sync_load_timeout_total",
Help: "Counter of sync load timeout.",
})

SyncLoadHistogram = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "tidb",
Subsystem: "statistics",
Name: "sync_load_latency_millis",
Help: "Bucketed histogram of latency time (ms) of sync load.",
Buckets: prometheus.ExponentialBuckets(1, 2, 22), // 1ms ~ 1h
})

ReadStatsHistogram = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "tidb",
Subsystem: "statistics",
Name: "read_stats_latency_millis",
Help: "Bucketed histogram of latency time (ms) of stats read during sync-load.",
Buckets: prometheus.ExponentialBuckets(1, 2, 22), // 1ms ~ 1h
})
)
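The call sites that drive these four metrics live in the statistics handle rather than in this file. A hedged sketch of the intended flow around one sync-load wait — recordSyncLoad and its wait callback are illustrative helpers, not part of the commit. Note that both histograms are bucketed in milliseconds (1ms ~ 1h), so durations must be converted before observing:

package handle

import (
	"time"

	"github.com/pingcap/tidb/metrics"
)

// recordSyncLoad is an illustrative helper (not in the commit) showing the
// intended accounting for one sync-load wait: count every request, count
// timeouts separately, and record end-to-end latency in milliseconds.
func recordSyncLoad(wait func() bool) {
	start := time.Now()
	metrics.SyncLoadCounter.Inc()
	if !wait() { // e.g. SyncWaitStatsLoad returning false on timeout
		metrics.SyncLoadTimeoutCounter.Inc()
	}
	metrics.SyncLoadHistogram.Observe(float64(time.Since(start).Milliseconds()))
}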
11 changes: 6 additions & 5 deletions planner/core/optimizer.go
@@ -62,8 +62,10 @@ const (
flagPredicatePushDown
flagEliminateOuterJoin
flagPartitionProcessor
flagCollectPredicateColumnsPoint
flagPushDownAgg
flagPushDownTopN
flagSyncWaitStatsLoadPoint
flagJoinReOrder
flagPrunColumnsAgain
)
@@ -80,8 +82,10 @@ var optRuleList = []logicalOptRule{
&ppdSolver{},
&outerJoinEliminator{},
&partitionProcessor{},
&collectPredicateColumnsPoint{},
&aggregationPushDownSolver{},
&pushDownTopNOptimizer{},
&syncWaitStatsLoadPoint{},
&joinReOrderSolver{},
&columnPruner{}, // column pruning again at last, note it will mess up the results of buildKeySolver
}
@@ -257,18 +261,15 @@ func checkStableResultMode(sctx sessionctx.Context) bool {

// DoOptimize optimizes a logical plan to a physical plan.
func DoOptimize(ctx context.Context, sctx sessionctx.Context, flag uint64, logic LogicalPlan) (PhysicalPlan, float64, error) {
// TODO: move it to the logic of sync load hist-needed columns.
if variable.EnableColumnTracking.Load() {
predicateColumns, _ := CollectColumnStatsUsage(logic, true, false)
sctx.UpdateColStatsUsage(predicateColumns)
}
// if there is something after flagPrunColumns, do flagPrunColumnsAgain
if flag&flagPrunColumns > 0 && flag-flagPrunColumns > flagPrunColumns {
flag |= flagPrunColumnsAgain
}
if checkStableResultMode(sctx) {
flag |= flagStabilizeResults
}
flag |= flagCollectPredicateColumnsPoint
flag |= flagSyncWaitStatsLoadPoint
logic, err := logicalOptimize(ctx, flag, logic)
if err != nil {
return nil, 0, err
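The placement of the two new rules is deliberate: collectPredicateColumnsPoint runs early, right after partitionProcessor, so load requests for hist-needed columns go out as soon as they are known, while syncWaitStatsLoadPoint runs only after aggregation and TopN pushdown, letting the storage reads overlap with the optimization work in between. The column-tracking block formerly inlined in DoOptimize moves into collectPredicateColumnsPoint (see plan_stats.go below), and DoOptimize now sets both flags unconditionally. A conceptual, channel-based sketch of this request-early, wait-late shape — a stand-in, not the TiDB API:

package main

import (
	"fmt"
	"time"
)

func main() {
	loaded := make(chan struct{})
	go func() { // collectPredicateColumnsPoint: fire async load requests
		time.Sleep(5 * time.Millisecond) // pretend to read stats from storage
		close(loaded)
	}()
	// ... aggregation pushdown, TopN pushdown, etc. run in the meantime ...
	select { // syncWaitStatsLoadPoint: block until loaded or timed out
	case <-loaded:
		fmt.Println("full stats available before physical optimization")
	case <-time.After(100 * time.Millisecond):
		fmt.Println("timeout: warn and fall back to pseudo stats, or fail")
	}
}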
112 changes: 112 additions & 0 deletions planner/core/plan_stats.go
@@ -0,0 +1,112 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package core

import (
"context"
"time"

"github.com/cznic/mathutil"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
)

type collectPredicateColumnsPoint struct{}

func (c collectPredicateColumnsPoint) optimize(ctx context.Context, plan LogicalPlan, op *logicalOptimizeOp) (LogicalPlan, error) {
if plan.SCtx().GetSessionVars().InRestrictedSQL {
return plan, nil
}
predicateNeeded := variable.EnableColumnTracking.Load()
syncWait := plan.SCtx().GetSessionVars().StatsLoadSyncWait * time.Millisecond.Nanoseconds()
histNeeded := syncWait > 0
predicateColumns, histNeededColumns := CollectColumnStatsUsage(plan, predicateNeeded, histNeeded)
if len(predicateColumns) > 0 {
plan.SCtx().UpdateColStatsUsage(predicateColumns)
}
if histNeeded && len(histNeededColumns) > 0 {
err := RequestLoadColumnStats(plan.SCtx(), histNeededColumns, syncWait)
return plan, err
}
return plan, nil
}

func (c collectPredicateColumnsPoint) name() string {
return "collect_predicate_columns_point"
}

type syncWaitStatsLoadPoint struct{}

func (s syncWaitStatsLoadPoint) optimize(ctx context.Context, plan LogicalPlan, op *logicalOptimizeOp) (LogicalPlan, error) {
if plan.SCtx().GetSessionVars().InRestrictedSQL {
return plan, nil
}
_, err := SyncWaitStatsLoad(plan)
return plan, err
}

func (s syncWaitStatsLoadPoint) name() string {
return "sync_wait_stats_load_point"
}

const maxDuration = 1<<63 - 1

// RequestLoadColumnStats sends requests to the stats handle
func RequestLoadColumnStats(ctx sessionctx.Context, neededColumns []model.TableColumnID, syncWait int64) error {
stmtCtx := ctx.GetSessionVars().StmtCtx
hintMaxExecutionTime := int64(stmtCtx.MaxExecutionTime)
if hintMaxExecutionTime <= 0 {
hintMaxExecutionTime = maxDuration
}
sessMaxExecutionTime := int64(ctx.GetSessionVars().MaxExecutionTime)
if sessMaxExecutionTime <= 0 {
sessMaxExecutionTime = maxDuration
}
waitTime := mathutil.MinInt64(syncWait, mathutil.MinInt64(hintMaxExecutionTime, sessMaxExecutionTime))
var timeout = time.Duration(waitTime)
err := domain.GetDomain(ctx).StatsHandle().SendLoadRequests(stmtCtx, neededColumns, timeout)
if err != nil {
return handleTimeout(stmtCtx)
}
return nil
}

// SyncWaitStatsLoad waits synchronously for the stats to load, until timeout
func SyncWaitStatsLoad(plan LogicalPlan) (bool, error) {
stmtCtx := plan.SCtx().GetSessionVars().StmtCtx
if stmtCtx.StatsLoad.Fallback {
return false, nil
}
success := domain.GetDomain(plan.SCtx()).StatsHandle().SyncWaitStatsLoad(stmtCtx)
if success {
return true, nil
}
err := handleTimeout(stmtCtx)
return false, err
}

func handleTimeout(stmtCtx *stmtctx.StatementContext) error {
err := errors.New("Timeout when sync-load full stats for needed columns")
if variable.StatsLoadPseudoTimeout.Load() {
stmtCtx.AppendWarning(err)
stmtCtx.StatsLoad.Fallback = true
return nil
}
return err
}
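RequestLoadColumnStats clamps the wait to the tightest of three budgets: tidb_stats_load_sync_wait, the statement's MAX_EXECUTION_TIME hint, and the session's max execution time, promoting any non-positive limit to maxDuration so that zero means "no limit". A worked example with illustrative values:

package main

import (
	"fmt"
	"time"

	"github.com/cznic/mathutil"
)

const maxDuration = 1<<63 - 1

func main() {
	syncWait := 100 * time.Millisecond.Nanoseconds() // tidb_stats_load_sync_wait = 100, as nanoseconds
	hintMaxExecutionTime := int64(0)                 // no MAX_EXECUTION_TIME hint on the statement
	if hintMaxExecutionTime <= 0 {
		hintMaxExecutionTime = maxDuration // non-positive means "no limit"
	}
	sessMaxExecutionTime := int64(0) // session max_execution_time unset
	if sessMaxExecutionTime <= 0 {
		sessMaxExecutionTime = maxDuration
	}
	waitTime := mathutil.MinInt64(syncWait, mathutil.MinInt64(hintMaxExecutionTime, sessMaxExecutionTime))
	fmt.Println(time.Duration(waitTime)) // 100ms: the sync-wait budget is the binding limit
}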