Skip to content

Commit

Permalink
planner: add warn log for sync stats (#36956)
Browse files Browse the repository at this point in the history
  • Loading branch information
Yisaer authored Aug 22, 2022
1 parent aaf0613 commit 4cf7eee
Show file tree
Hide file tree
Showing 11 changed files with 114 additions and 32 deletions.
8 changes: 8 additions & 0 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -1296,7 +1296,15 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool, hasMoreResults bool) {
ExecRetryCount: a.retryCount,
IsExplicitTxn: sessVars.TxnCtx.IsExplicit,
IsWriteCacheTable: stmtCtx.WaitLockLeaseTime > 0,
IsSyncStatsFailed: stmtCtx.IsSyncStatsFailed,
}
failpoint.Inject("assertSyncStatsFailed", func(val failpoint.Value) {
if val.(bool) {
if !slowItems.IsSyncStatsFailed {
panic("isSyncStatsFailed should be true")
}
}
})
if a.retryCount > 0 {
slowItems.ExecRetryTime = costTime - sessVars.DurationParse - sessVars.DurationCompile - time.Since(a.retryStartTime)
}
Expand Down
1 change: 1 addition & 0 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1925,6 +1925,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
sc.EnableOptimizeTrace = false
sc.OptimizeTracer = nil
sc.OptimizerCETrace = nil
sc.IsSyncStatsFailed = false

sc.SysdateIsNow = ctx.GetSessionVars().SysdateIsNow

Expand Down
9 changes: 9 additions & 0 deletions planner/core/plan_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mathutil"
"go.uber.org/zap"
)

type collectPredicateColumnsPoint struct{}
Expand All @@ -41,6 +43,9 @@ func (c collectPredicateColumnsPoint) optimize(_ context.Context, plan LogicalPl
if len(predicateColumns) > 0 {
plan.SCtx().UpdateColStatsUsage(predicateColumns)
}
if !histNeeded {
return plan, nil
}
histNeededIndices := collectSyncIndices(plan.SCtx(), histNeededColumns)
histNeededItems := collectHistNeededItems(histNeededColumns, histNeededIndices)
if histNeeded && len(histNeededItems) > 0 {
Expand Down Expand Up @@ -85,6 +90,8 @@ func RequestLoadStats(ctx sessionctx.Context, neededHistItems []model.TableItemI
var timeout = time.Duration(waitTime)
err := domain.GetDomain(ctx).StatsHandle().SendLoadRequests(stmtCtx, neededHistItems, timeout)
if err != nil {
logutil.BgLogger().Warn("SendLoadRequests failed", zap.Error(err))
stmtCtx.IsSyncStatsFailed = true
return handleTimeout(stmtCtx)
}
return nil
Expand All @@ -100,6 +107,8 @@ func SyncWaitStatsLoad(plan LogicalPlan) (bool, error) {
if success {
return true, nil
}
logutil.BgLogger().Warn("SyncWaitStatsLoad failed")
stmtCtx.IsSyncStatsFailed = true
err := handleTimeout(stmtCtx)
return false, err
}
Expand Down
20 changes: 17 additions & 3 deletions planner/core/plan_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,17 @@ import (
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/planner"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/testkit"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -259,15 +262,26 @@ func TestPlanStatsLoadTimeout(t *testing.T) {
require.NoError(t, err)
tableInfo := tbl.Meta()
neededColumn := model.TableItemID{TableID: tableInfo.ID, ID: tableInfo.Columns[0].ID, IsIndex: false}
resultCh := make(chan model.TableItemID, 1)
resultCh := make(chan stmtctx.StatsLoadResult, 1)
timeout := time.Duration(1<<63 - 1)
dom.StatsHandle().AppendNeededItem(neededColumn, resultCh, timeout) // make channel queue full
stmt, err := p.ParseOneStmt("select * from t where c>1", "", "")
task := &handle.NeededItemTask{
TableItemID: neededColumn,
ResultCh: resultCh,
ToTimeout: time.Now().Local().Add(timeout),
}
dom.StatsHandle().AppendNeededItem(task, timeout) // make channel queue full
sql := "select * from t where c>1"
stmt, err := p.ParseOneStmt(sql, "", "")
require.NoError(t, err)
tk.MustExec("set global tidb_stats_load_pseudo_timeout=false")
_, _, err = planner.Optimize(context.TODO(), ctx, stmt, is)
require.Error(t, err) // fail sql for timeout when pseudo=false

tk.MustExec("set global tidb_stats_load_pseudo_timeout=true")
require.NoError(t, failpoint.Enable("github.com/pingcap/executor/assertSyncStatsFailed", `return(true)`))
tk.MustExec(sql) // not fail sql for timeout when pseudo=true
failpoint.Disable("github.com/pingcap/executor/assertSyncStatsFailed")

plan, _, err := planner.Optimize(context.TODO(), ctx, stmt, is)
require.NoError(t, err) // not fail sql for timeout when pseudo=true
switch pp := plan.(type) {
Expand Down
33 changes: 32 additions & 1 deletion sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package stmtctx

import (
"bytes"
"encoding/json"
"math"
"strconv"
Expand Down Expand Up @@ -293,7 +294,7 @@ type StatementContext struct {
// NeededItems stores the columns/indices whose stats are needed for planner.
NeededItems []model.TableItemID
// ResultCh to receive stats loading results
ResultCh chan model.TableItemID
ResultCh chan StatsLoadResult
// Fallback indicates if the planner uses full-loaded stats or fallback all to pseudo/simple.
Fallback bool
// LoadStartTime is to record the load start time to calculate latency
Expand All @@ -310,6 +311,9 @@ type StatementContext struct {
IsSQLRegistered atomic2.Bool
// IsSQLAndPlanRegistered uses to indicate whether the SQL and plan has been registered for TopSQL.
IsSQLAndPlanRegistered atomic2.Bool

// IsSyncStatsFailed indicates whether any failure happened during sync stats
IsSyncStatsFailed bool
}

// StmtHints are SessionVars related sql hints.
Expand Down Expand Up @@ -1014,3 +1018,30 @@ func (d *CopTasksDetails) ToZapFields() (fields []zap.Field) {
fields = append(fields, zap.String("wait_max_addr", d.MaxWaitAddress))
return fields
}

// StatsLoadResult indicates result for StatsLoad
type StatsLoadResult struct {
Item model.TableItemID
Error error
}

// HasError returns whether result has error
func (r StatsLoadResult) HasError() bool {
return r.Error != nil
}

// ErrorMsg returns StatsLoadResult err msg
func (r StatsLoadResult) ErrorMsg() string {
if r.Error == nil {
return ""
}
b := bytes.NewBufferString("tableID:")
b.WriteString(strconv.FormatInt(r.Item.TableID, 10))
b.WriteString(", id:")
b.WriteString(strconv.FormatInt(r.Item.ID, 10))
b.WriteString(", isIndex:")
b.WriteString(strconv.FormatBool(r.Item.IsIndex))
b.WriteString(", err:")
b.WriteString(r.Error.Error())
return b.String()
}
4 changes: 4 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2526,6 +2526,8 @@ const (
SlowLogIsExplicitTxn = "IsExplicitTxn"
// SlowLogIsWriteCacheTable is used to indicate whether writing to the cache table need to wait for the read lock to expire.
SlowLogIsWriteCacheTable = "IsWriteCacheTable"
// SlowLogIsSyncStatsFailed is used to indicate whether any failure happen during sync stats
SlowLogIsSyncStatsFailed = "IsSyncStatsFailed"
)

// GenerateBinaryPlan decides whether we should record binary plan in slow log and stmt summary.
Expand Down Expand Up @@ -2568,6 +2570,7 @@ type SlowQueryLogItems struct {
ResultRows int64
IsExplicitTxn bool
IsWriteCacheTable bool
IsSyncStatsFailed bool
}

// SlowLogFormat uses for formatting slow log.
Expand Down Expand Up @@ -2732,6 +2735,7 @@ func (s *SessionVars) SlowLogFormat(logItems *SlowQueryLogItems) string {
writeSlowLogItem(&buf, SlowLogResultRows, strconv.FormatInt(logItems.ResultRows, 10))
writeSlowLogItem(&buf, SlowLogSucc, strconv.FormatBool(logItems.Succ))
writeSlowLogItem(&buf, SlowLogIsExplicitTxn, strconv.FormatBool(logItems.IsExplicitTxn))
writeSlowLogItem(&buf, SlowLogIsSyncStatsFailed, strconv.FormatBool(logItems.IsSyncStatsFailed))
if s.StmtCtx.WaitLockLeaseTime > 0 {
writeSlowLogItem(&buf, SlowLogIsWriteCacheTable, strconv.FormatBool(logItems.IsWriteCacheTable))
}
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ func TestSlowLogFormat(t *testing.T) {
# Result_rows: 12345
# Succ: true
# IsExplicitTxn: true
# IsSyncStatsFailed: false
# IsWriteCacheTable: true`
sql := "select * from t;"
_, digest := parser.NormalizeDigest(sql)
Expand Down
2 changes: 1 addition & 1 deletion sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -971,7 +971,7 @@ const (
DefTiDBPersistAnalyzeOptions = true
DefTiDBEnableColumnTracking = false
DefTiDBStatsLoadSyncWait = 0
DefTiDBStatsLoadPseudoTimeout = false
DefTiDBStatsLoadPseudoTimeout = true
DefSysdateIsNow = false
DefTiDBEnableMutationChecker = false
DefTiDBTxnAssertionLevel = AssertionOffStr
Expand Down
2 changes: 1 addition & 1 deletion statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func NewHandle(ctx sessionctx.Context, lease time.Duration, pool sessionPool, tr
handle.StatsLoad.SubCtxs = make([]sessionctx.Context, cfg.Performance.StatsLoadConcurrency)
handle.StatsLoad.NeededItemsCh = make(chan *NeededItemTask, cfg.Performance.StatsLoadQueueSize)
handle.StatsLoad.TimeoutItemsCh = make(chan *NeededItemTask, cfg.Performance.StatsLoadQueueSize)
handle.StatsLoad.WorkingColMap = map[model.TableItemID][]chan model.TableItemID{}
handle.StatsLoad.WorkingColMap = map[model.TableItemID][]chan stmtctx.StatsLoadResult{}
err := handle.RefreshVars()
if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit 4cf7eee

Please sign in to comment.