Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: add warn log for sync stats #36956

Merged
merged 16 commits into from
Aug 22, 2022
5 changes: 5 additions & 0 deletions planner/core/plan_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,12 @@ func (c collectPredicateColumnsPoint) optimize(_ context.Context, plan LogicalPl
if len(predicateColumns) > 0 {
plan.SCtx().UpdateColStatsUsage(predicateColumns)
}
if !histNeeded {
return plan, nil
}
histNeededIndices := collectSyncIndices(plan.SCtx(), histNeededColumns)
histNeededItems := collectHistNeededItems(histNeededColumns, histNeededIndices)
// For now, RequestLoadStats won't return an error, because handleTimeout handles the timeout error and converts it into a warning.
if histNeeded && len(histNeededItems) > 0 {
err := RequestLoadStats(plan.SCtx(), histNeededItems, syncWait)
return plan, err
Expand All @@ -60,6 +64,7 @@ func (s syncWaitStatsLoadPoint) optimize(_ context.Context, plan LogicalPlan, _
if plan.SCtx().GetSessionVars().InRestrictedSQL {
return plan, nil
}
// For now, SyncWaitStatsLoad won't return an error, because handleTimeout handles the timeout error and converts it into a warning.
_, err := SyncWaitStatsLoad(plan)
return plan, err
}
Expand Down
9 changes: 8 additions & 1 deletion planner/core/plan_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/testkit"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -261,7 +262,13 @@ func TestPlanStatsLoadTimeout(t *testing.T) {
neededColumn := model.TableItemID{TableID: tableInfo.ID, ID: tableInfo.Columns[0].ID, IsIndex: false}
resultCh := make(chan model.TableItemID, 1)
timeout := time.Duration(1<<63 - 1)
dom.StatsHandle().AppendNeededItem(neededColumn, resultCh, timeout) // make channel queue full
task := &handle.NeededItemTask{
TableItemID: neededColumn,
ResultCh: resultCh,
ToTimeout: time.Now().Local().Add(timeout),
Digest: &parser.Digest{},
}
dom.StatsHandle().AppendNeededItem(task, timeout) // make channel queue full
stmt, err := p.ParseOneStmt("select * from t where c>1", "", "")
require.NoError(t, err)
tk.MustExec("set global tidb_stats_load_pseudo_timeout=false")
Expand Down
3 changes: 0 additions & 3 deletions statistics/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,6 @@ func (c *Column) IsInvalid(sctx sessionctx.Context, collPseudo bool) bool {
}
if sctx != nil {
stmtctx := sctx.GetSessionVars().StmtCtx
if stmtctx != nil && stmtctx.StatsLoad.Fallback {
return true
}
if c.IsLoadNeeded() && stmtctx != nil {
if stmtctx.StatsLoad.Timeout > 0 {
logutil.BgLogger().Warn("Hist for column should already be loaded as sync but not found.",
Expand Down
35 changes: 30 additions & 5 deletions statistics/handle/handle_hist.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/sessionctx"
Expand Down Expand Up @@ -54,6 +55,7 @@ type NeededItemTask struct {
TableItemID model.TableItemID
ToTimeout time.Time
ResultCh chan model.TableItemID
Digest *parser.Digest
}

// SendLoadRequests sends load requests for neededHistItems.
Expand All @@ -65,9 +67,22 @@ func (h *Handle) SendLoadRequests(sc *stmtctx.StatementContext, neededHistItems
sc.StatsLoad.Timeout = timeout
sc.StatsLoad.NeededItems = remainedItems
sc.StatsLoad.ResultCh = make(chan model.TableItemID, len(remainedItems))
for _, col := range remainedItems {
err := h.AppendNeededItem(col, sc.StatsLoad.ResultCh, timeout)
for _, item := range remainedItems {
_, digest := sc.SQLDigest()
task := &NeededItemTask{
TableItemID: item,
ToTimeout: time.Now().Local().Add(timeout),
ResultCh: sc.StatsLoad.ResultCh,
Digest: digest,
}
err := h.AppendNeededItem(task, timeout)
if err != nil {
logutil.BgLogger().Warn("SendLoadRequests failed",
zap.Int64("tableID", item.TableID),
zap.Int64("id", item.ID),
zap.Bool("isIndex", item.IsIndex),
zap.String("digest", digest.String()),
zap.Error(err))
return err
}
}
Expand Down Expand Up @@ -104,6 +119,8 @@ func (h *Handle) SyncWaitStatsLoad(sc *stmtctx.StatementContext) bool {
}
case <-timer.C:
metrics.SyncLoadTimeoutCounter.Inc()
_, digest := sc.SQLDigest()
logutil.BgLogger().Warn("SyncWaitStatsLoad timeout", zap.String("digest", digest.String()))
return false
}
}
Expand Down Expand Up @@ -134,9 +151,7 @@ func (h *Handle) removeHistLoadedColumns(neededItems []model.TableItemID) []mode
}

// AppendNeededItem appends needed columns/indices to ch; if an item already exists, the duplicate is not appended.
func (h *Handle) AppendNeededItem(item model.TableItemID, resultCh chan model.TableItemID, timeout time.Duration) error {
toTimout := time.Now().Local().Add(timeout)
task := &NeededItemTask{TableItemID: item, ToTimeout: toTimout, ResultCh: resultCh}
func (h *Handle) AppendNeededItem(task *NeededItemTask, timeout time.Duration) error {
return h.writeToChanWithTimeout(h.StatsLoad.NeededItemsCh, task, timeout)
}

Expand Down Expand Up @@ -238,6 +253,11 @@ func (h *Handle) handleOneItemTask(task *NeededItemTask, readerCtx *StatsReaderC
needUpdate := false
wrapper, err = h.readStatsForOneItem(item, wrapper, readerCtx.reader)
if err != nil {
logutil.BgLogger().Warn("read stats error",
zap.Int64("tableID", task.TableItemID.TableID),
zap.Int64("id", task.TableItemID.ID),
zap.Bool("isIndex", task.TableItemID.IsIndex),
zap.String("digest", task.Digest.String()))
return task, err
}
if item.IsIndex {
Expand Down Expand Up @@ -378,6 +398,11 @@ func (h *Handle) drainColTask(exit chan struct{}) (*NeededItemTask, error) {
// if the task has already timed out, no SQL is sync-waiting for it,
// so do not handle it right now; put it into another channel with lower priority
if time.Now().After(task.ToTimeout) {
logutil.BgLogger().Warn("task timeout",
zap.Int64("tableID", task.TableItemID.TableID),
zap.Int64("id", task.TableItemID.ID),
zap.Bool("isIndex", task.TableItemID.IsIndex),
zap.String("digest", task.Digest.String()))
h.writeToTimeoutChan(h.StatsLoad.TimeoutItemsCh, task)
continue
}
Expand Down