Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: add warn log for sync stats #36956

Merged
merged 16 commits into from
Aug 22, 2022
11 changes: 11 additions & 0 deletions planner/core/plan_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mathutil"
"go.uber.org/zap"
)

type collectPredicateColumnsPoint struct{}
Expand All @@ -41,6 +43,9 @@ func (c collectPredicateColumnsPoint) optimize(_ context.Context, plan LogicalPl
if len(predicateColumns) > 0 {
plan.SCtx().UpdateColStatsUsage(predicateColumns)
}
if !histNeeded {
return plan, nil
}
histNeededIndices := collectSyncIndices(plan.SCtx(), histNeededColumns)
histNeededItems := collectHistNeededItems(histNeededColumns, histNeededIndices)
if histNeeded && len(histNeededItems) > 0 {
Expand Down Expand Up @@ -85,6 +90,9 @@ func RequestLoadStats(ctx sessionctx.Context, neededHistItems []model.TableItemI
var timeout = time.Duration(waitTime)
err := domain.GetDomain(ctx).StatsHandle().SendLoadRequests(stmtCtx, neededHistItems, timeout)
if err != nil {
_, digest := stmtCtx.SQLDigest()
logutil.BgLogger().Warn("SendLoadRequests failed",
zap.String("digest", digest.String()), zap.Error(err))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about writing this warning message to the slow log or the statement summary?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think recording the error detail in the slow log is a good choice, but I think it's appropriate to record a flag in the slow log if sync stats loading failed.

return handleTimeout(stmtCtx)
}
return nil
Expand All @@ -100,6 +108,9 @@ func SyncWaitStatsLoad(plan LogicalPlan) (bool, error) {
if success {
return true, nil
}
_, digest := stmtCtx.SQLDigest()
logutil.BgLogger().Warn("SyncWaitStatsLoad failed",
zap.String("digest", digest.String()))
err := handleTimeout(stmtCtx)
return false, err
}
Expand Down
13 changes: 11 additions & 2 deletions planner/core/plan_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ import (
"github.com/pingcap/tidb/planner"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/testkit"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -259,9 +261,16 @@ func TestPlanStatsLoadTimeout(t *testing.T) {
require.NoError(t, err)
tableInfo := tbl.Meta()
neededColumn := model.TableItemID{TableID: tableInfo.ID, ID: tableInfo.Columns[0].ID, IsIndex: false}
resultCh := make(chan model.TableItemID, 1)
resultCh := make(chan stmtctx.StatsLoadResult, 1)
timeout := time.Duration(1<<63 - 1)
dom.StatsHandle().AppendNeededItem(neededColumn, resultCh, timeout) // make channel queue full
task := &handle.NeededItemTask{
TableItemResult: stmtctx.StatsLoadResult{
Item: neededColumn,
},
ResultCh: resultCh,
ToTimeout: time.Now().Local().Add(timeout),
}
dom.StatsHandle().AppendNeededItem(task, timeout) // make channel queue full
stmt, err := p.ParseOneStmt("select * from t where c>1", "", "")
require.NoError(t, err)
tk.MustExec("set global tidb_stats_load_pseudo_timeout=false")
Expand Down
37 changes: 36 additions & 1 deletion sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package stmtctx

import (
"bytes"
"encoding/json"
"math"
"strconv"
Expand Down Expand Up @@ -293,7 +294,7 @@ type StatementContext struct {
// NeededItems stores the columns/indices whose stats are needed for planner.
NeededItems []model.TableItemID
// ResultCh to receive stats loading results
ResultCh chan model.TableItemID
ResultCh chan StatsLoadResult
// Fallback indicates if the planner uses full-loaded stats or fallback all to pseudo/simple.
Fallback bool
// LoadStartTime is to record the load start time to calculate latency
Expand Down Expand Up @@ -1014,3 +1015,37 @@ func (d *CopTasksDetails) ToZapFields() (fields []zap.Field) {
fields = append(fields, zap.String("wait_max_addr", d.MaxWaitAddress))
return fields
}

// StatsLoadResult is the outcome of a stats-load request for a single
// column or index. It is the element type carried by the stats-loading
// result channels.
type StatsLoadResult struct {
// Item identifies the column/index the result belongs to.
Item model.TableItemID
// Error is the error hit while loading the item's stats; nil when there was none.
Error error
// Warn is a warning message attached to the result; empty when there is none.
Warn string
}

// HasErrorOrWarn reports whether the result carries a non-nil Error or a
// non-empty Warn message.
func (r StatsLoadResult) HasErrorOrWarn() bool {
	if r.Error != nil {
		return true
	}
	return r.Warn != ""
}

// ErrorAndWarn formats the result's error and/or warning as one message.
//
// It returns "" when the result has neither an error nor a warning.
// Otherwise the message starts with the item identity
// ("tableID:<n>, id:<n>, isIndex:<bool>"), followed by ", err:<error text>"
// when Error is set and ", warn:<text>" when Warn is non-empty.
func (r StatsLoadResult) ErrorAndWarn() string {
	hasErr, hasWarn := r.Error != nil, r.Warn != ""
	if !hasErr && !hasWarn {
		return ""
	}

	var msg bytes.Buffer
	// Identify the column/index first so each message is self-describing.
	msg.WriteString("tableID:")
	msg.WriteString(strconv.FormatInt(r.Item.TableID, 10))
	msg.WriteString(", id:")
	msg.WriteString(strconv.FormatInt(r.Item.ID, 10))
	msg.WriteString(", isIndex:")
	msg.WriteString(strconv.FormatBool(r.Item.IsIndex))

	if hasErr {
		msg.WriteString(", err:")
		msg.WriteString(r.Error.Error())
	}
	if hasWarn {
		msg.WriteString(", warn:")
		msg.WriteString(r.Warn)
	}
	return msg.String()
}
2 changes: 1 addition & 1 deletion sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -961,7 +961,7 @@ const (
DefTiDBPersistAnalyzeOptions = true
DefTiDBEnableColumnTracking = false
DefTiDBStatsLoadSyncWait = 0
DefTiDBStatsLoadPseudoTimeout = false
DefTiDBStatsLoadPseudoTimeout = true
DefSysdateIsNow = false
DefTiDBEnableMutationChecker = false
DefTiDBTxnAssertionLevel = AssertionOffStr
Expand Down
2 changes: 1 addition & 1 deletion statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func NewHandle(ctx sessionctx.Context, lease time.Duration, pool sessionPool, tr
handle.StatsLoad.SubCtxs = make([]sessionctx.Context, cfg.Performance.StatsLoadConcurrency)
handle.StatsLoad.NeededItemsCh = make(chan *NeededItemTask, cfg.Performance.StatsLoadQueueSize)
handle.StatsLoad.TimeoutItemsCh = make(chan *NeededItemTask, cfg.Performance.StatsLoadQueueSize)
handle.StatsLoad.WorkingColMap = map[model.TableItemID][]chan model.TableItemID{}
handle.StatsLoad.WorkingColMap = map[model.TableItemID][]chan stmtctx.StatsLoadResult{}
err := handle.RefreshVars()
if err != nil {
return nil, err
Expand Down
71 changes: 45 additions & 26 deletions statistics/handle/handle_hist.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,14 @@ type StatsLoad struct {
SubCtxs []sessionctx.Context
NeededItemsCh chan *NeededItemTask
TimeoutItemsCh chan *NeededItemTask
WorkingColMap map[model.TableItemID][]chan model.TableItemID
WorkingColMap map[model.TableItemID][]chan stmtctx.StatsLoadResult
}

// NeededItemTask represents one needed column/indices with expire time.
type NeededItemTask struct {
TableItemID model.TableItemID
ToTimeout time.Time
ResultCh chan model.TableItemID
TableItemResult stmtctx.StatsLoadResult
ToTimeout time.Time
ResultCh chan stmtctx.StatsLoadResult
}

// SendLoadRequests send neededColumns requests
Expand All @@ -64,9 +64,16 @@ func (h *Handle) SendLoadRequests(sc *stmtctx.StatementContext, neededHistItems
}
sc.StatsLoad.Timeout = timeout
sc.StatsLoad.NeededItems = remainedItems
sc.StatsLoad.ResultCh = make(chan model.TableItemID, len(remainedItems))
for _, col := range remainedItems {
err := h.AppendNeededItem(col, sc.StatsLoad.ResultCh, timeout)
sc.StatsLoad.ResultCh = make(chan stmtctx.StatsLoadResult, len(remainedItems))
for _, item := range remainedItems {
task := &NeededItemTask{
TableItemResult: stmtctx.StatsLoadResult{
Item: item,
},
ToTimeout: time.Now().Local().Add(timeout),
ResultCh: sc.StatsLoad.ResultCh,
}
err := h.AppendNeededItem(task, timeout)
if err != nil {
return err
}
Expand All @@ -80,7 +87,14 @@ func (h *Handle) SyncWaitStatsLoad(sc *stmtctx.StatementContext) bool {
if len(sc.StatsLoad.NeededItems) <= 0 {
return true
}
_, digest := sc.SQLDigest()
var errorMsgs []string
defer func() {
if len(errorMsgs) > 0 {
logutil.BgLogger().Warn("SyncWaitStatsLoad meets error",
zap.Strings("errors", errorMsgs),
zap.String("digest", digest.String()))
}
sc.StatsLoad.NeededItems = nil
}()
resultCheckMap := map[model.TableItemID]struct{}{}
Expand All @@ -94,7 +108,10 @@ func (h *Handle) SyncWaitStatsLoad(sc *stmtctx.StatementContext) bool {
select {
case result, ok := <-sc.StatsLoad.ResultCh:
if ok {
delete(resultCheckMap, result)
if result.HasErrorOrWarn() {
errorMsgs = append(errorMsgs, result.ErrorAndWarn())
}
delete(resultCheckMap, result.Item)
if len(resultCheckMap) == 0 {
metrics.SyncLoadHistogram.Observe(float64(time.Since(sc.StatsLoad.LoadStartTime).Milliseconds()))
return true
Expand All @@ -104,6 +121,7 @@ func (h *Handle) SyncWaitStatsLoad(sc *stmtctx.StatementContext) bool {
}
case <-timer.C:
metrics.SyncLoadTimeoutCounter.Inc()
logutil.BgLogger().Warn("SyncWaitStatsLoad timeout", zap.String("digest", digest.String()))
return false
}
}
Expand Down Expand Up @@ -134,9 +152,7 @@ func (h *Handle) removeHistLoadedColumns(neededItems []model.TableItemID) []mode
}

// AppendNeededItem appends needed columns/indices to ch, if exists, do not append the duplicated one.
func (h *Handle) AppendNeededItem(item model.TableItemID, resultCh chan model.TableItemID, timeout time.Duration) error {
toTimout := time.Now().Local().Add(timeout)
task := &NeededItemTask{TableItemID: item, ToTimeout: toTimout, ResultCh: resultCh}
func (h *Handle) AppendNeededItem(task *NeededItemTask, timeout time.Duration) error {
return h.writeToChanWithTimeout(h.StatsLoad.NeededItemsCh, task, timeout)
}

Expand Down Expand Up @@ -202,34 +218,35 @@ func (h *Handle) HandleOneTask(lastTask *NeededItemTask, readerCtx *StatsReaderC
}

func (h *Handle) handleOneItemTask(task *NeededItemTask, readerCtx *StatsReaderContext, ctx sqlexec.RestrictedSQLExecutor) (*NeededItemTask, error) {
item := task.TableItemID
result := task.TableItemResult
item := result.Item
oldCache := h.statsCache.Load().(statsCache)
tbl, ok := oldCache.Get(item.TableID)
if !ok {
h.writeToResultChan(task.ResultCh, item)
h.writeToResultChan(task.ResultCh, result)
return nil, nil
}
var err error
wrapper := &statsWrapper{}
if item.IsIndex {
index, ok := tbl.Indices[item.ID]
if !ok || index.IsFullLoad() {
h.writeToResultChan(task.ResultCh, item)
h.writeToResultChan(task.ResultCh, result)
return nil, nil
}
wrapper.idx = index
} else {
col, ok := tbl.Columns[item.ID]
if !ok || col.IsFullLoad() {
h.writeToResultChan(task.ResultCh, item)
h.writeToResultChan(task.ResultCh, result)
return nil, nil
}
wrapper.col = col
}
// to avoid duplicated handling in concurrent scenario
working := h.setWorking(task.TableItemID, task.ResultCh)
working := h.setWorking(result.Item, task.ResultCh)
if !working {
h.writeToResultChan(task.ResultCh, item)
h.writeToResultChan(task.ResultCh, result)
return nil, nil
}
// refresh statsReader to get latest stats
Expand All @@ -238,6 +255,7 @@ func (h *Handle) handleOneItemTask(task *NeededItemTask, readerCtx *StatsReaderC
needUpdate := false
wrapper, err = h.readStatsForOneItem(item, wrapper, readerCtx.reader)
if err != nil {
task.TableItemResult.Error = err
return task, err
}
if item.IsIndex {
Expand All @@ -251,9 +269,9 @@ func (h *Handle) handleOneItemTask(task *NeededItemTask, readerCtx *StatsReaderC
}
metrics.ReadStatsHistogram.Observe(float64(time.Since(t).Milliseconds()))
if needUpdate && h.updateCachedItem(item, wrapper.col, wrapper.idx) {
h.writeToResultChan(task.ResultCh, item)
h.writeToResultChan(task.ResultCh, result)
}
h.finishWorking(item)
h.finishWorking(result)
return nil, nil
}

Expand Down Expand Up @@ -378,6 +396,7 @@ func (h *Handle) drainColTask(exit chan struct{}) (*NeededItemTask, error) {
// if the task has already timeout, no sql is sync-waiting for it,
// so do not handle it just now, put it to another channel with lower priority
if time.Now().After(task.ToTimeout) {
task.TableItemResult.Warn = "drainColTask timeout"
h.writeToTimeoutChan(h.StatsLoad.TimeoutItemsCh, task)
continue
}
Expand Down Expand Up @@ -425,7 +444,7 @@ func (h *Handle) writeToChanWithTimeout(taskCh chan *NeededItemTask, task *Neede
}

// writeToResultChan safe-writes with panic-recover so one write-fail will not have big impact.
func (h *Handle) writeToResultChan(resultCh chan model.TableItemID, rs model.TableItemID) {
func (h *Handle) writeToResultChan(resultCh chan stmtctx.StatsLoadResult, rs stmtctx.StatsLoadResult) {
defer func() {
if r := recover(); r != nil {
logutil.BgLogger().Error("writeToResultChan panicked", zap.Any("error", r), zap.Stack("stack"))
Expand Down Expand Up @@ -466,7 +485,7 @@ func (h *Handle) updateCachedItem(item model.TableItemID, colHist *statistics.Co
return h.updateStatsCache(oldCache.update([]*statistics.Table{tbl}, nil, oldCache.version, WithTableStatsByQuery()))
}

func (h *Handle) setWorking(item model.TableItemID, resultCh chan model.TableItemID) bool {
func (h *Handle) setWorking(item model.TableItemID, resultCh chan stmtctx.StatsLoadResult) bool {
h.StatsLoad.Lock()
defer h.StatsLoad.Unlock()
chList, ok := h.StatsLoad.WorkingColMap[item]
Expand All @@ -477,20 +496,20 @@ func (h *Handle) setWorking(item model.TableItemID, resultCh chan model.TableIte
h.StatsLoad.WorkingColMap[item] = append(chList, resultCh)
return false
}
chList = []chan model.TableItemID{}
chList = []chan stmtctx.StatsLoadResult{}
chList = append(chList, resultCh)
h.StatsLoad.WorkingColMap[item] = chList
return true
}

func (h *Handle) finishWorking(item model.TableItemID) {
func (h *Handle) finishWorking(result stmtctx.StatsLoadResult) {
h.StatsLoad.Lock()
defer h.StatsLoad.Unlock()
if chList, ok := h.StatsLoad.WorkingColMap[item]; ok {
if chList, ok := h.StatsLoad.WorkingColMap[result.Item]; ok {
list := chList[1:]
for _, ch := range list {
h.writeToResultChan(ch, item)
h.writeToResultChan(ch, result)
}
}
delete(h.StatsLoad.WorkingColMap, item)
delete(h.StatsLoad.WorkingColMap, result.Item)
}
4 changes: 2 additions & 2 deletions statistics/handle/handle_hist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,10 +221,10 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) {

rs1, ok1 := <-stmtCtx1.StatsLoad.ResultCh
require.True(t, ok1)
require.Equal(t, neededColumns[0], rs1)
require.Equal(t, neededColumns[0], rs1.Item)
rs2, ok2 := <-stmtCtx2.StatsLoad.ResultCh
require.True(t, ok2)
require.Equal(t, neededColumns[0], rs2)
require.Equal(t, neededColumns[0], rs2.Item)

stat = h.GetTableStats(tableInfo)
hg = stat.Columns[tableInfo.Columns[2].ID].Histogram
Expand Down