planner: add warn log for sync stats #36956

Merged · 16 commits · Aug 22, 2022
1 change: 1 addition & 0 deletions executor/adapter.go
@@ -1296,6 +1296,7 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool, hasMoreResults bool) {
ExecRetryCount: a.retryCount,
IsExplicitTxn: sessVars.TxnCtx.IsExplicit,
IsWriteCacheTable: stmtCtx.WaitLockLeaseTime > 0,
IsSyncStatsFailed: stmtCtx.IsSyncStatsFailed,
}
if a.retryCount > 0 {
slowItems.ExecRetryTime = costTime - sessVars.DurationParse - sessVars.DurationCompile - time.Since(a.retryStartTime)
1 change: 1 addition & 0 deletions executor/executor.go
@@ -1925,6 +1925,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
sc.EnableOptimizeTrace = false
sc.OptimizeTracer = nil
sc.OptimizerCETrace = nil
sc.IsSyncStatsFailed = false

sc.SysdateIsNow = ctx.GetSessionVars().SysdateIsNow

9 changes: 9 additions & 0 deletions planner/core/plan_stats.go
@@ -25,7 +25,9 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mathutil"
"go.uber.org/zap"
)

type collectPredicateColumnsPoint struct{}
@@ -41,6 +43,9 @@ func (c collectPredicateColumnsPoint) optimize(_ context.Context, plan LogicalPl
if len(predicateColumns) > 0 {
plan.SCtx().UpdateColStatsUsage(predicateColumns)
}
if !histNeeded {
return plan, nil
}
histNeededIndices := collectSyncIndices(plan.SCtx(), histNeededColumns)
histNeededItems := collectHistNeededItems(histNeededColumns, histNeededIndices)
if histNeeded && len(histNeededItems) > 0 {
@@ -85,6 +90,8 @@ func RequestLoadStats(ctx sessionctx.Context, neededHistItems []model.TableItemI
var timeout = time.Duration(waitTime)
err := domain.GetDomain(ctx).StatsHandle().SendLoadRequests(stmtCtx, neededHistItems, timeout)
if err != nil {
logutil.BgLogger().Warn("SendLoadRequests failed", zap.Error(err))
stmtCtx.IsSyncStatsFailed = true
return handleTimeout(stmtCtx)
}
return nil
@@ -100,6 +107,8 @@ func SyncWaitStatsLoad(plan LogicalPlan) (bool, error) {
if success {
return true, nil
}
logutil.BgLogger().Warn("SyncWaitStatsLoad failed")
stmtCtx.IsSyncStatsFailed = true
err := handleTimeout(stmtCtx)
return false, err
}
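The two plan_stats.go hunks follow one pattern: warn, flag the statement context, then let the existing timeout handling decide whether to fall back to pseudo stats or fail. A condensed sketch of that shape follows; the types and the handleTimeout behavior are simplified stand-ins (the real decision depends on tidb_stats_load_pseudo_timeout), not the exact TiDB code.

```go
package main

import (
	"errors"
	"log"
)

// statementContext is a placeholder for the relevant fields of
// stmtctx.StatementContext.
type statementContext struct {
	IsSyncStatsFailed bool
	pseudoTimeout     bool // stand-in for the tidb_stats_load_pseudo_timeout switch
}

// handleTimeout is a stand-in: with pseudo-timeout enabled the planner keeps
// going on pseudo/simple stats, otherwise the failure is surfaced.
func handleTimeout(sc *statementContext) error {
	if sc.pseudoTimeout {
		return nil
	}
	return errors.New("sync load stats failed")
}

// requestLoadStats mirrors the shape of RequestLoadStats after this PR:
// failures are logged and recorded on the context instead of passing silently.
func requestLoadStats(sc *statementContext, send func() error) error {
	if err := send(); err != nil {
		log.Printf("[warn] SendLoadRequests failed: %v", err)
		sc.IsSyncStatsFailed = true
		return handleTimeout(sc)
	}
	return nil
}

func main() {
	sc := &statementContext{pseudoTimeout: true}
	_ = requestLoadStats(sc, func() error { return errors.New("channel is full") })
	log.Printf("IsSyncStatsFailed=%v", sc.IsSyncStatsFailed) // true
}
```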
11 changes: 9 additions & 2 deletions planner/core/plan_stats_test.go
@@ -27,7 +27,9 @@ import (
"github.com/pingcap/tidb/planner"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/testkit"
"github.com/stretchr/testify/require"
)
@@ -259,9 +261,14 @@ func TestPlanStatsLoadTimeout(t *testing.T) {
require.NoError(t, err)
tableInfo := tbl.Meta()
neededColumn := model.TableItemID{TableID: tableInfo.ID, ID: tableInfo.Columns[0].ID, IsIndex: false}
resultCh := make(chan model.TableItemID, 1)
resultCh := make(chan stmtctx.StatsLoadResult, 1)
timeout := time.Duration(1<<63 - 1)
dom.StatsHandle().AppendNeededItem(neededColumn, resultCh, timeout) // make channel queue full
task := &handle.NeededItemTask{
TableItemID: neededColumn,
ResultCh: resultCh,
ToTimeout: time.Now().Local().Add(timeout),
}
dom.StatsHandle().AppendNeededItem(task, timeout) // make channel queue full
stmt, err := p.ParseOneStmt("select * from t where c>1", "", "")
require.NoError(t, err)
tk.MustExec("set global tidb_stats_load_pseudo_timeout=false")
35 changes: 34 additions & 1 deletion sessionctx/stmtctx/stmtctx.go
@@ -15,6 +15,7 @@
package stmtctx

import (
"bytes"
"encoding/json"
"math"
"strconv"
@@ -293,7 +294,7 @@ type StatementContext struct {
// NeededItems stores the columns/indices whose stats are needed for planner.
NeededItems []model.TableItemID
// ResultCh to receive stats loading results
ResultCh chan model.TableItemID
ResultCh chan StatsLoadResult
// Fallback indicates if the planner uses full-loaded stats or fallback all to pseudo/simple.
Fallback bool
// LoadStartTime is to record the load start time to calculate latency
@@ -310,6 +311,9 @@ type StatementContext struct {
IsSQLRegistered atomic2.Bool
// IsSQLAndPlanRegistered uses to indicate whether the SQL and plan has been registered for TopSQL.
IsSQLAndPlanRegistered atomic2.Bool

// IsSyncStatsFailed indicates whether any failure happened during sync stats
IsSyncStatsFailed bool
}

// StmtHints are SessionVars related sql hints.
@@ -1014,3 +1018,32 @@ func (d *CopTasksDetails) ToZapFields() (fields []zap.Field) {
fields = append(fields, zap.String("wait_max_addr", d.MaxWaitAddress))
return fields
}

// StatsLoadResult indicates result for StatsLoad
type StatsLoadResult struct {
Item model.TableItemID
Error error
}

// HasError returns whether result has error
func (r StatsLoadResult) HasError() bool {
return r.Error != nil
}

// ErrorMsg returns StatsLoadResult err msg
func (r StatsLoadResult) ErrorMsg() string {
if r.Error == nil {
return ""
}
b := bytes.NewBufferString("tableID:")
b.WriteString(strconv.FormatInt(r.Item.TableID, 10))
b.WriteString(", id:")
b.WriteString(strconv.FormatInt(r.Item.ID, 10))
b.WriteString(", isIndex:")
b.WriteString(strconv.FormatBool(r.Item.IsIndex))
if r.Error != nil {
b.WriteString(", err:")
b.WriteString(r.Error.Error())
}
return b.String()
}
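To make the intent of the new type concrete, here is a minimal, self-contained sketch of how a waiter consumes StatsLoadResult values, mirroring the SyncWaitStatsLoad change in handle_hist.go below; the types are local stand-ins rather than the real stmtctx/model definitions.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for model.TableItemID and stmtctx.StatsLoadResult so the
// sketch compiles on its own.
type TableItemID struct {
	TableID int64
	ID      int64
	IsIndex bool
}

type StatsLoadResult struct {
	Item  TableItemID
	Error error
}

func (r StatsLoadResult) HasError() bool { return r.Error != nil }

func main() {
	resultCh := make(chan StatsLoadResult, 2)
	resultCh <- StatsLoadResult{Item: TableItemID{TableID: 1, ID: 10}}
	resultCh <- StatsLoadResult{Item: TableItemID{TableID: 1, ID: 11}, Error: errors.New("read stats failed")}
	close(resultCh)

	// The waiter can now tell a successful load from a failed one instead of
	// only seeing item IDs come back on the channel.
	var errorMsgs []string
	for r := range resultCh {
		if r.HasError() {
			errorMsgs = append(errorMsgs, r.Error.Error())
		}
	}
	fmt.Println("failed loads:", errorMsgs) // failed loads: [read stats failed]
}
```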
4 changes: 4 additions & 0 deletions sessionctx/variable/session.go
@@ -2526,6 +2526,8 @@ const (
SlowLogIsExplicitTxn = "IsExplicitTxn"
// SlowLogIsWriteCacheTable is used to indicate whether writing to the cache table need to wait for the read lock to expire.
SlowLogIsWriteCacheTable = "IsWriteCacheTable"
// SlowLogIsSyncStatsFailed is used to indicate whether any failure happened during sync stats
SlowLogIsSyncStatsFailed = "IsSyncStatsFailed"
)

// GenerateBinaryPlan decides whether we should record binary plan in slow log and stmt summary.
@@ -2568,6 +2570,7 @@ type SlowQueryLogItems struct {
ResultRows int64
IsExplicitTxn bool
IsWriteCacheTable bool
IsSyncStatsFailed bool
}

// SlowLogFormat uses for formatting slow log.
@@ -2732,6 +2735,7 @@ func (s *SessionVars) SlowLogFormat(logItems *SlowQueryLogItems) string {
writeSlowLogItem(&buf, SlowLogResultRows, strconv.FormatInt(logItems.ResultRows, 10))
writeSlowLogItem(&buf, SlowLogSucc, strconv.FormatBool(logItems.Succ))
writeSlowLogItem(&buf, SlowLogIsExplicitTxn, strconv.FormatBool(logItems.IsExplicitTxn))
writeSlowLogItem(&buf, SlowLogIsSyncStatsFailed, strconv.FormatBool(logItems.IsSyncStatsFailed))
if s.StmtCtx.WaitLockLeaseTime > 0 {
writeSlowLogItem(&buf, SlowLogIsWriteCacheTable, strconv.FormatBool(logItems.IsWriteCacheTable))
}
1 change: 1 addition & 0 deletions sessionctx/variable/session_test.go
@@ -232,6 +232,7 @@ func TestSlowLogFormat(t *testing.T) {
# Result_rows: 12345
# Succ: true
# IsExplicitTxn: true
# IsSyncStatsFailed: false
# IsWriteCacheTable: true`
sql := "select * from t;"
_, digest := parser.NormalizeDigest(sql)
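As a result of the SlowLogFormat change, every slow-log entry now carries the flag in the same "# Key: value" layout the test above expects. A tiny illustrative sketch of that formatting follows; it is not the real writeSlowLogItem helper and may differ from it in detail.

```go
package main

import (
	"bytes"
	"fmt"
	"strconv"
)

// writeItem mimics the "# Key: value" lines in the slow log; the real helper
// is writeSlowLogItem in sessionctx/variable/session.go.
func writeItem(buf *bytes.Buffer, key, value string) {
	buf.WriteString("# " + key + ": " + value + "\n")
}

func main() {
	var buf bytes.Buffer
	writeItem(&buf, "IsExplicitTxn", strconv.FormatBool(true))
	writeItem(&buf, "IsSyncStatsFailed", strconv.FormatBool(true))
	fmt.Print(buf.String())
	// # IsExplicitTxn: true
	// # IsSyncStatsFailed: true
}
```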
2 changes: 1 addition & 1 deletion sessionctx/variable/tidb_vars.go
@@ -971,7 +971,7 @@ const (
DefTiDBPersistAnalyzeOptions = true
DefTiDBEnableColumnTracking = false
DefTiDBStatsLoadSyncWait = 0
DefTiDBStatsLoadPseudoTimeout = false
DefTiDBStatsLoadPseudoTimeout = true
DefSysdateIsNow = false
DefTiDBEnableMutationChecker = false
DefTiDBTxnAssertionLevel = AssertionOffStr
2 changes: 1 addition & 1 deletion statistics/handle/handle.go
@@ -215,7 +215,7 @@ func NewHandle(ctx sessionctx.Context, lease time.Duration, pool sessionPool, tr
handle.StatsLoad.SubCtxs = make([]sessionctx.Context, cfg.Performance.StatsLoadConcurrency)
handle.StatsLoad.NeededItemsCh = make(chan *NeededItemTask, cfg.Performance.StatsLoadQueueSize)
handle.StatsLoad.TimeoutItemsCh = make(chan *NeededItemTask, cfg.Performance.StatsLoadQueueSize)
handle.StatsLoad.WorkingColMap = map[model.TableItemID][]chan model.TableItemID{}
handle.StatsLoad.WorkingColMap = map[model.TableItemID][]chan stmtctx.StatsLoadResult{}
err := handle.RefreshVars()
if err != nil {
return nil, err
62 changes: 38 additions & 24 deletions statistics/handle/handle_hist.go
@@ -46,14 +46,14 @@ type StatsLoad struct {
SubCtxs []sessionctx.Context
NeededItemsCh chan *NeededItemTask
TimeoutItemsCh chan *NeededItemTask
WorkingColMap map[model.TableItemID][]chan model.TableItemID
WorkingColMap map[model.TableItemID][]chan stmtctx.StatsLoadResult
}

// NeededItemTask represents one needed column/indices with expire time.
type NeededItemTask struct {
TableItemID model.TableItemID
ToTimeout time.Time
ResultCh chan model.TableItemID
ResultCh chan stmtctx.StatsLoadResult
}

// SendLoadRequests send neededColumns requests
@@ -64,9 +64,14 @@ func (h *Handle) SendLoadRequests(sc *stmtctx.StatementContext, neededHistItems
}
sc.StatsLoad.Timeout = timeout
sc.StatsLoad.NeededItems = remainedItems
sc.StatsLoad.ResultCh = make(chan model.TableItemID, len(remainedItems))
for _, col := range remainedItems {
err := h.AppendNeededItem(col, sc.StatsLoad.ResultCh, timeout)
sc.StatsLoad.ResultCh = make(chan stmtctx.StatsLoadResult, len(remainedItems))
for _, item := range remainedItems {
task := &NeededItemTask{
TableItemID: item,
ToTimeout: time.Now().Local().Add(timeout),
ResultCh: sc.StatsLoad.ResultCh,
}
err := h.AppendNeededItem(task, timeout)
if err != nil {
return err
}
@@ -80,7 +85,12 @@ func (h *Handle) SyncWaitStatsLoad(sc *stmtctx.StatementContext) bool {
if len(sc.StatsLoad.NeededItems) <= 0 {
return true
}
var errorMsgs []string
defer func() {
if len(errorMsgs) > 0 {
logutil.BgLogger().Warn("SyncWaitStatsLoad meets error",
zap.Strings("errors", errorMsgs))
}
sc.StatsLoad.NeededItems = nil
}()
resultCheckMap := map[model.TableItemID]struct{}{}
@@ -94,7 +104,10 @@ func (h *Handle) SyncWaitStatsLoad(sc *stmtctx.StatementContext) bool {
select {
case result, ok := <-sc.StatsLoad.ResultCh:
if ok {
delete(resultCheckMap, result)
if result.HasError() {
errorMsgs = append(errorMsgs, result.ErrorMsg())
}
delete(resultCheckMap, result.Item)
if len(resultCheckMap) == 0 {
metrics.SyncLoadHistogram.Observe(float64(time.Since(sc.StatsLoad.LoadStartTime).Milliseconds()))
return true
Expand All @@ -104,6 +117,7 @@ func (h *Handle) SyncWaitStatsLoad(sc *stmtctx.StatementContext) bool {
}
case <-timer.C:
metrics.SyncLoadTimeoutCounter.Inc()
logutil.BgLogger().Warn("SyncWaitStatsLoad timeout")
return false
}
}
@@ -134,9 +148,7 @@ func (h *Handle) removeHistLoadedColumns(neededItems []model.TableItemID) []mode
}

// AppendNeededItem appends needed columns/indices to ch, if exists, do not append the duplicated one.
func (h *Handle) AppendNeededItem(item model.TableItemID, resultCh chan model.TableItemID, timeout time.Duration) error {
toTimout := time.Now().Local().Add(timeout)
task := &NeededItemTask{TableItemID: item, ToTimeout: toTimout, ResultCh: resultCh}
func (h *Handle) AppendNeededItem(task *NeededItemTask, timeout time.Duration) error {
return h.writeToChanWithTimeout(h.StatsLoad.NeededItemsCh, task, timeout)
}

@@ -202,34 +214,35 @@ func (h *Handle) HandleOneTask(lastTask *NeededItemTask, readerCtx *StatsReaderC
}

func (h *Handle) handleOneItemTask(task *NeededItemTask, readerCtx *StatsReaderContext, ctx sqlexec.RestrictedSQLExecutor) (*NeededItemTask, error) {
item := task.TableItemID
result := stmtctx.StatsLoadResult{Item: task.TableItemID}
item := result.Item
oldCache := h.statsCache.Load().(statsCache)
tbl, ok := oldCache.Get(item.TableID)
if !ok {
h.writeToResultChan(task.ResultCh, item)
h.writeToResultChan(task.ResultCh, result)
return nil, nil
}
var err error
wrapper := &statsWrapper{}
if item.IsIndex {
index, ok := tbl.Indices[item.ID]
if !ok || index.IsFullLoad() {
h.writeToResultChan(task.ResultCh, item)
h.writeToResultChan(task.ResultCh, result)
return nil, nil
}
wrapper.idx = index
} else {
col, ok := tbl.Columns[item.ID]
if !ok || col.IsFullLoad() {
h.writeToResultChan(task.ResultCh, item)
h.writeToResultChan(task.ResultCh, result)
return nil, nil
}
wrapper.col = col
}
// to avoid duplicated handling in concurrent scenario
working := h.setWorking(task.TableItemID, task.ResultCh)
working := h.setWorking(result.Item, task.ResultCh)
if !working {
h.writeToResultChan(task.ResultCh, item)
h.writeToResultChan(task.ResultCh, result)
return nil, nil
}
// refresh statsReader to get latest stats
@@ -238,6 +251,7 @@ func (h *Handle) handleOneItemTask(task *NeededItemTask, readerCtx *StatsReaderC
needUpdate := false
wrapper, err = h.readStatsForOneItem(item, wrapper, readerCtx.reader)
if err != nil {
result.Error = err
return task, err
}
if item.IsIndex {
@@ -251,9 +265,9 @@ func (h *Handle) handleOneItemTask(task *NeededItemTask, readerCtx *StatsReaderC
}
metrics.ReadStatsHistogram.Observe(float64(time.Since(t).Milliseconds()))
if needUpdate && h.updateCachedItem(item, wrapper.col, wrapper.idx) {
h.writeToResultChan(task.ResultCh, item)
h.writeToResultChan(task.ResultCh, result)
}
h.finishWorking(item)
h.finishWorking(result)
return nil, nil
}

@@ -425,7 +439,7 @@ func (h *Handle) writeToChanWithTimeout(taskCh chan *NeededItemTask, task *Neede
}

// writeToResultChan safe-writes with panic-recover so one write-fail will not have big impact.
func (h *Handle) writeToResultChan(resultCh chan model.TableItemID, rs model.TableItemID) {
func (h *Handle) writeToResultChan(resultCh chan stmtctx.StatsLoadResult, rs stmtctx.StatsLoadResult) {
defer func() {
if r := recover(); r != nil {
logutil.BgLogger().Error("writeToResultChan panicked", zap.Any("error", r), zap.Stack("stack"))
@@ -466,7 +480,7 @@ func (h *Handle) updateCachedItem(item model.TableItemID, colHist *statistics.Co
return h.updateStatsCache(oldCache.update([]*statistics.Table{tbl}, nil, oldCache.version, WithTableStatsByQuery()))
}

func (h *Handle) setWorking(item model.TableItemID, resultCh chan model.TableItemID) bool {
func (h *Handle) setWorking(item model.TableItemID, resultCh chan stmtctx.StatsLoadResult) bool {
h.StatsLoad.Lock()
defer h.StatsLoad.Unlock()
chList, ok := h.StatsLoad.WorkingColMap[item]
@@ -477,20 +491,20 @@ func (h *Handle) setWorking(item model.TableItemID, resultCh chan model.TableIte
h.StatsLoad.WorkingColMap[item] = append(chList, resultCh)
return false
}
chList = []chan model.TableItemID{}
chList = []chan stmtctx.StatsLoadResult{}
chList = append(chList, resultCh)
h.StatsLoad.WorkingColMap[item] = chList
return true
}

func (h *Handle) finishWorking(item model.TableItemID) {
func (h *Handle) finishWorking(result stmtctx.StatsLoadResult) {
h.StatsLoad.Lock()
defer h.StatsLoad.Unlock()
if chList, ok := h.StatsLoad.WorkingColMap[item]; ok {
if chList, ok := h.StatsLoad.WorkingColMap[result.Item]; ok {
list := chList[1:]
for _, ch := range list {
h.writeToResultChan(ch, item)
h.writeToResultChan(ch, result)
}
}
delete(h.StatsLoad.WorkingColMap, item)
delete(h.StatsLoad.WorkingColMap, result.Item)
}
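writeToResultChan keeps its panic-recover guard and only switches the element type to StatsLoadResult. For readers unfamiliar with that idiom, here is a generic sketch of a safe send that survives a closed channel; it is illustrative only, not the TiDB implementation.

```go
package main

import "log"

// safeSend writes rs to ch but recovers from the panic that a send on a
// closed channel raises, so one bad write cannot crash the stats-load worker.
func safeSend[T any](ch chan T, rs T) {
	defer func() {
		if r := recover(); r != nil {
			log.Printf("[error] send on result channel panicked: %v", r)
		}
	}()
	ch <- rs
}

func main() {
	ch := make(chan int, 1)
	close(ch)
	safeSend(ch, 42) // logs the recovered panic instead of crashing
}
```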
4 changes: 2 additions & 2 deletions statistics/handle/handle_hist_test.go
@@ -221,10 +221,10 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) {

rs1, ok1 := <-stmtCtx1.StatsLoad.ResultCh
require.True(t, ok1)
require.Equal(t, neededColumns[0], rs1)
require.Equal(t, neededColumns[0], rs1.Item)
rs2, ok2 := <-stmtCtx2.StatsLoad.ResultCh
require.True(t, ok2)
require.Equal(t, neededColumns[0], rs2)
require.Equal(t, neededColumns[0], rs2.Item)

stat = h.GetTableStats(tableInfo)
hg = stat.Columns[tableInfo.Columns[2].ID].Histogram