Skip to content

Commit

Permalink
address the comment
Browse files Browse the repository at this point in the history
Signed-off-by: yisaer <disxiaofei@163.com>
  • Loading branch information
Yisaer committed Aug 12, 2022
1 parent 305268d commit ec9517b
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 51 deletions.
8 changes: 8 additions & 0 deletions planner/core/plan_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mathutil"
"go.uber.org/zap"
)

type collectPredicateColumnsPoint struct{}
Expand Down Expand Up @@ -90,6 +92,9 @@ func RequestLoadStats(ctx sessionctx.Context, neededHistItems []model.TableItemI
var timeout = time.Duration(waitTime)
err := domain.GetDomain(ctx).StatsHandle().SendLoadRequests(stmtCtx, neededHistItems, timeout)
if err != nil {
_, digest := stmtCtx.SQLDigest()
logutil.BgLogger().Warn("SendLoadRequests failed",
zap.String("digest", digest.String()), zap.Error(err))
return handleTimeout(stmtCtx)
}
return nil
Expand All @@ -105,6 +110,9 @@ func SyncWaitStatsLoad(plan LogicalPlan) (bool, error) {
if success {
return true, nil
}
_, digest := stmtCtx.SQLDigest()
logutil.BgLogger().Warn("SyncWaitStatsLoad failed",
zap.String("digest", digest.String()))
err := handleTimeout(stmtCtx)
return false, err
}
Expand Down
13 changes: 8 additions & 5 deletions planner/core/plan_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/planner"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/testkit"
Expand Down Expand Up @@ -260,13 +261,15 @@ func TestPlanStatsLoadTimeout(t *testing.T) {
require.NoError(t, err)
tableInfo := tbl.Meta()
neededColumn := model.TableItemID{TableID: tableInfo.ID, ID: tableInfo.Columns[0].ID, IsIndex: false}
resultCh := make(chan model.TableItemID, 1)
resultCh := make(chan stmtctx.StatsLoadResult, 1)
timeout := time.Duration(1<<63 - 1)
task := &handle.NeededItemTask{
TableItemID: neededColumn,
ResultCh: resultCh,
ToTimeout: time.Now().Local().Add(timeout),
Digest: &parser.Digest{},
TableItemResult: stmtctx.StatsLoadResult{
Item: neededColumn,
},
ResultCh: resultCh,
ToTimeout: time.Now().Local().Add(timeout),
Digest: &parser.Digest{},
}
dom.StatsHandle().AppendNeededItem(task, timeout) // make channel queue full
stmt, err := p.ParseOneStmt("select * from t where c>1", "", "")
Expand Down
37 changes: 36 additions & 1 deletion sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package stmtctx

import (
"bytes"
"encoding/json"
"math"
"strconv"
Expand Down Expand Up @@ -293,7 +294,7 @@ type StatementContext struct {
// NeededItems stores the columns/indices whose stats are needed for planner.
NeededItems []model.TableItemID
// ResultCh to receive stats loading results
ResultCh chan model.TableItemID
ResultCh chan StatsLoadResult
// Fallback indicates if the planner uses full-loaded stats or fallback all to pseudo/simple.
Fallback bool
// LoadStartTime is to record the load start time to calculate latency
Expand Down Expand Up @@ -1014,3 +1015,37 @@ func (d *CopTasksDetails) ToZapFields() (fields []zap.Field) {
fields = append(fields, zap.String("wait_max_addr", d.MaxWaitAddress))
return fields
}

// StatsLoadResult indicates result for StatsLoad
type StatsLoadResult struct {
	// Item identifies the column/index whose stats were requested.
	Item model.TableItemID
	// Error is the hard error encountered while loading stats, if any.
	Error error
	// Warn carries a non-fatal warning message (e.g. a drain/queue timeout), if any.
	Warn string
}

// HasErrorOrWarn reports whether this result carries either an error
// or a non-empty warning message.
func (r StatsLoadResult) HasErrorOrWarn() bool {
	if r.Error != nil {
		return true
	}
	return r.Warn != ""
}

// ErrorAndWarn renders the error/warning information of this result as a
// human-readable string, including the item identity; it returns "" when
// there is neither an error nor a warning to report.
func (r StatsLoadResult) ErrorAndWarn() string {
	if r.Error == nil && r.Warn == "" {
		return ""
	}
	msg := "tableID:" + strconv.FormatInt(r.Item.TableID, 10) +
		", id:" + strconv.FormatInt(r.Item.ID, 10) +
		", isIndex:" + strconv.FormatBool(r.Item.IsIndex)
	if r.Error != nil {
		msg += ", err:" + r.Error.Error()
	}
	if r.Warn != "" {
		msg += ", warn:" + r.Warn
	}
	return msg
}
2 changes: 1 addition & 1 deletion sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -961,7 +961,7 @@ const (
DefTiDBPersistAnalyzeOptions = true
DefTiDBEnableColumnTracking = false
DefTiDBStatsLoadSyncWait = 0
DefTiDBStatsLoadPseudoTimeout = false
DefTiDBStatsLoadPseudoTimeout = true
DefSysdateIsNow = false
DefTiDBEnableMutationChecker = false
DefTiDBTxnAssertionLevel = AssertionOffStr
Expand Down
3 changes: 3 additions & 0 deletions statistics/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ func (c *Column) IsInvalid(sctx sessionctx.Context, collPseudo bool) bool {
}
if sctx != nil {
stmtctx := sctx.GetSessionVars().StmtCtx
if stmtctx != nil && stmtctx.StatsLoad.Fallback {
return true
}
if c.IsLoadNeeded() && stmtctx != nil {
if stmtctx.StatsLoad.Timeout > 0 {
logutil.BgLogger().Warn("Hist for column should already be loaded as sync but not found.",
Expand Down
2 changes: 1 addition & 1 deletion statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func NewHandle(ctx sessionctx.Context, lease time.Duration, pool sessionPool, tr
handle.StatsLoad.SubCtxs = make([]sessionctx.Context, cfg.Performance.StatsLoadConcurrency)
handle.StatsLoad.NeededItemsCh = make(chan *NeededItemTask, cfg.Performance.StatsLoadQueueSize)
handle.StatsLoad.TimeoutItemsCh = make(chan *NeededItemTask, cfg.Performance.StatsLoadQueueSize)
handle.StatsLoad.WorkingColMap = map[model.TableItemID][]chan model.TableItemID{}
handle.StatsLoad.WorkingColMap = map[model.TableItemID][]chan stmtctx.StatsLoadResult{}
err := handle.RefreshVars()
if err != nil {
return nil, err
Expand Down
84 changes: 41 additions & 43 deletions statistics/handle/handle_hist.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@ type StatsLoad struct {
SubCtxs []sessionctx.Context
NeededItemsCh chan *NeededItemTask
TimeoutItemsCh chan *NeededItemTask
WorkingColMap map[model.TableItemID][]chan model.TableItemID
WorkingColMap map[model.TableItemID][]chan stmtctx.StatsLoadResult
}

// NeededItemTask represents one needed column/indices with expire time.
type NeededItemTask struct {
TableItemID model.TableItemID
ToTimeout time.Time
ResultCh chan model.TableItemID
Digest *parser.Digest
TableItemResult stmtctx.StatsLoadResult
ToTimeout time.Time
ResultCh chan stmtctx.StatsLoadResult
Digest *parser.Digest
}

// SendLoadRequests send neededColumns requests
Expand All @@ -66,23 +66,19 @@ func (h *Handle) SendLoadRequests(sc *stmtctx.StatementContext, neededHistItems
}
sc.StatsLoad.Timeout = timeout
sc.StatsLoad.NeededItems = remainedItems
sc.StatsLoad.ResultCh = make(chan model.TableItemID, len(remainedItems))
sc.StatsLoad.ResultCh = make(chan stmtctx.StatsLoadResult, len(remainedItems))
for _, item := range remainedItems {
_, digest := sc.SQLDigest()
task := &NeededItemTask{
TableItemID: item,
ToTimeout: time.Now().Local().Add(timeout),
ResultCh: sc.StatsLoad.ResultCh,
Digest: digest,
TableItemResult: stmtctx.StatsLoadResult{
Item: item,
},
ToTimeout: time.Now().Local().Add(timeout),
ResultCh: sc.StatsLoad.ResultCh,
Digest: digest,
}
err := h.AppendNeededItem(task, timeout)
if err != nil {
logutil.BgLogger().Warn("SendLoadRequests failed",
zap.Int64("tableID", item.TableID),
zap.Int64("id", item.ID),
zap.Bool("isIndex", item.IsIndex),
zap.String("digest", digest.String()),
zap.Error(err))
return err
}
}
Expand All @@ -95,7 +91,14 @@ func (h *Handle) SyncWaitStatsLoad(sc *stmtctx.StatementContext) bool {
if len(sc.StatsLoad.NeededItems) <= 0 {
return true
}
_, digest := sc.SQLDigest()
var errorMsgs []string
defer func() {
if len(errorMsgs) > 0 {
logutil.BgLogger().Warn("SyncWaitStatsLoad meets error",
zap.Strings("errors", errorMsgs),
zap.String("digest", digest.String()))
}
sc.StatsLoad.NeededItems = nil
}()
resultCheckMap := map[model.TableItemID]struct{}{}
Expand All @@ -109,7 +112,10 @@ func (h *Handle) SyncWaitStatsLoad(sc *stmtctx.StatementContext) bool {
select {
case result, ok := <-sc.StatsLoad.ResultCh:
if ok {
delete(resultCheckMap, result)
if result.HasErrorOrWarn() {
errorMsgs = append(errorMsgs, result.ErrorAndWarn())
}
delete(resultCheckMap, result.Item)
if len(resultCheckMap) == 0 {
metrics.SyncLoadHistogram.Observe(float64(time.Since(sc.StatsLoad.LoadStartTime).Milliseconds()))
return true
Expand All @@ -119,7 +125,6 @@ func (h *Handle) SyncWaitStatsLoad(sc *stmtctx.StatementContext) bool {
}
case <-timer.C:
metrics.SyncLoadTimeoutCounter.Inc()
_, digest := sc.SQLDigest()
logutil.BgLogger().Warn("SyncWaitStatsLoad timeout", zap.String("digest", digest.String()))
return false
}
Expand Down Expand Up @@ -217,34 +222,35 @@ func (h *Handle) HandleOneTask(lastTask *NeededItemTask, readerCtx *StatsReaderC
}

func (h *Handle) handleOneItemTask(task *NeededItemTask, readerCtx *StatsReaderContext, ctx sqlexec.RestrictedSQLExecutor) (*NeededItemTask, error) {
item := task.TableItemID
result := task.TableItemResult
item := result.Item
oldCache := h.statsCache.Load().(statsCache)
tbl, ok := oldCache.Get(item.TableID)
if !ok {
h.writeToResultChan(task.ResultCh, item)
h.writeToResultChan(task.ResultCh, result)
return nil, nil
}
var err error
wrapper := &statsWrapper{}
if item.IsIndex {
index, ok := tbl.Indices[item.ID]
if !ok || index.IsFullLoad() {
h.writeToResultChan(task.ResultCh, item)
h.writeToResultChan(task.ResultCh, result)
return nil, nil
}
wrapper.idx = index
} else {
col, ok := tbl.Columns[item.ID]
if !ok || col.IsFullLoad() {
h.writeToResultChan(task.ResultCh, item)
h.writeToResultChan(task.ResultCh, result)
return nil, nil
}
wrapper.col = col
}
// to avoid duplicated handling in concurrent scenario
working := h.setWorking(task.TableItemID, task.ResultCh)
working := h.setWorking(result.Item, task.ResultCh)
if !working {
h.writeToResultChan(task.ResultCh, item)
h.writeToResultChan(task.ResultCh, result)
return nil, nil
}
// refresh statsReader to get latest stats
Expand All @@ -253,11 +259,7 @@ func (h *Handle) handleOneItemTask(task *NeededItemTask, readerCtx *StatsReaderC
needUpdate := false
wrapper, err = h.readStatsForOneItem(item, wrapper, readerCtx.reader)
if err != nil {
logutil.BgLogger().Warn("read stats error",
zap.Int64("tableID", task.TableItemID.TableID),
zap.Int64("id", task.TableItemID.ID),
zap.Bool("isIndex", task.TableItemID.IsIndex),
zap.String("digest", task.Digest.String()))
task.TableItemResult.Error = err
return task, err
}
if item.IsIndex {
Expand All @@ -271,9 +273,9 @@ func (h *Handle) handleOneItemTask(task *NeededItemTask, readerCtx *StatsReaderC
}
metrics.ReadStatsHistogram.Observe(float64(time.Since(t).Milliseconds()))
if needUpdate && h.updateCachedItem(item, wrapper.col, wrapper.idx) {
h.writeToResultChan(task.ResultCh, item)
h.writeToResultChan(task.ResultCh, result)
}
h.finishWorking(item)
h.finishWorking(result)
return nil, nil
}

Expand Down Expand Up @@ -398,11 +400,7 @@ func (h *Handle) drainColTask(exit chan struct{}) (*NeededItemTask, error) {
// if the task has already timeout, no sql is sync-waiting for it,
// so do not handle it just now, put it to another channel with lower priority
if time.Now().After(task.ToTimeout) {
logutil.BgLogger().Warn("task timeout",
zap.Int64("tableID", task.TableItemID.TableID),
zap.Int64("id", task.TableItemID.ID),
zap.Bool("isIndex", task.TableItemID.IsIndex),
zap.String("digest", task.Digest.String()))
task.TableItemResult.Warn = "drainColTask timeout"
h.writeToTimeoutChan(h.StatsLoad.TimeoutItemsCh, task)
continue
}
Expand Down Expand Up @@ -450,7 +448,7 @@ func (h *Handle) writeToChanWithTimeout(taskCh chan *NeededItemTask, task *Neede
}

// writeToResultChan safe-writes with panic-recover so one write-fail will not have big impact.
func (h *Handle) writeToResultChan(resultCh chan model.TableItemID, rs model.TableItemID) {
func (h *Handle) writeToResultChan(resultCh chan stmtctx.StatsLoadResult, rs stmtctx.StatsLoadResult) {
defer func() {
if r := recover(); r != nil {
logutil.BgLogger().Error("writeToResultChan panicked", zap.Any("error", r), zap.Stack("stack"))
Expand Down Expand Up @@ -491,7 +489,7 @@ func (h *Handle) updateCachedItem(item model.TableItemID, colHist *statistics.Co
return h.updateStatsCache(oldCache.update([]*statistics.Table{tbl}, nil, oldCache.version, WithTableStatsByQuery()))
}

func (h *Handle) setWorking(item model.TableItemID, resultCh chan model.TableItemID) bool {
func (h *Handle) setWorking(item model.TableItemID, resultCh chan stmtctx.StatsLoadResult) bool {
h.StatsLoad.Lock()
defer h.StatsLoad.Unlock()
chList, ok := h.StatsLoad.WorkingColMap[item]
Expand All @@ -502,20 +500,20 @@ func (h *Handle) setWorking(item model.TableItemID, resultCh chan model.TableIte
h.StatsLoad.WorkingColMap[item] = append(chList, resultCh)
return false
}
chList = []chan model.TableItemID{}
chList = []chan stmtctx.StatsLoadResult{}
chList = append(chList, resultCh)
h.StatsLoad.WorkingColMap[item] = chList
return true
}

func (h *Handle) finishWorking(item model.TableItemID) {
func (h *Handle) finishWorking(result stmtctx.StatsLoadResult) {
h.StatsLoad.Lock()
defer h.StatsLoad.Unlock()
if chList, ok := h.StatsLoad.WorkingColMap[item]; ok {
if chList, ok := h.StatsLoad.WorkingColMap[result.Item]; ok {
list := chList[1:]
for _, ch := range list {
h.writeToResultChan(ch, item)
h.writeToResultChan(ch, result)
}
}
delete(h.StatsLoad.WorkingColMap, item)
delete(h.StatsLoad.WorkingColMap, result.Item)
}

0 comments on commit ec9517b

Please sign in to comment.