diff --git a/pkg/statistics/handle/syncload/BUILD.bazel b/pkg/statistics/handle/syncload/BUILD.bazel new file mode 100644 index 0000000000000..ed6e310786a2a --- /dev/null +++ b/pkg/statistics/handle/syncload/BUILD.bazel @@ -0,0 +1,46 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "syncload", + srcs = ["stats_syncload.go"], + importpath = "github.com/pingcap/tidb/pkg/statistics/handle/syncload", + visibility = ["//visibility:public"], + deps = [ + "//pkg/config", + "//pkg/metrics", + "//pkg/parser/model", + "//pkg/parser/mysql", + "//pkg/sessionctx", + "//pkg/sessionctx/stmtctx", + "//pkg/statistics", + "//pkg/statistics/handle/storage", + "//pkg/statistics/handle/types", + "//pkg/types", + "//pkg/util", + "//pkg/util/logutil", + "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_failpoint//:failpoint", + "@org_uber_go_zap//:zap", + ], +) + +go_test( + name = "syncload_test", + timeout = "short", + srcs = ["stats_syncload_test.go"], + flaky = True, + race = "on", + shard_count = 5, + deps = [ + ":syncload", + "//pkg/config", + "//pkg/parser/model", + "//pkg/sessionctx", + "//pkg/sessionctx/stmtctx", + "//pkg/statistics/handle/types", + "//pkg/testkit", + "//pkg/util/mathutil", + "@com_github_pingcap_failpoint//:failpoint", + "@com_github_stretchr_testify//require", + ], +) diff --git a/pkg/statistics/handle/syncload/stats_syncload.go b/pkg/statistics/handle/syncload/stats_syncload.go new file mode 100644 index 0000000000000..723a2d77b3084 --- /dev/null +++ b/pkg/statistics/handle/syncload/stats_syncload.go @@ -0,0 +1,548 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package syncload + +import ( + "fmt" + "math/rand" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/pkg/config" + "github.com/pingcap/tidb/pkg/metrics" + "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tidb/pkg/parser/mysql" + "github.com/pingcap/tidb/pkg/sessionctx" + "github.com/pingcap/tidb/pkg/sessionctx/stmtctx" + "github.com/pingcap/tidb/pkg/statistics" + "github.com/pingcap/tidb/pkg/statistics/handle/storage" + statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types" + "github.com/pingcap/tidb/pkg/types" + "github.com/pingcap/tidb/pkg/util" + "github.com/pingcap/tidb/pkg/util/logutil" + "go.uber.org/zap" +) + +// RetryCount is the max retry count for a sync load task. +const RetryCount = 3 + +type statsSyncLoad struct { + statsHandle statstypes.StatsHandle + StatsLoad statstypes.StatsLoad +} + +// NewStatsSyncLoad creates a new StatsSyncLoad. 
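+//
+// A rough wiring sketch (hypothetical; h, sctx, exit, wg, sc, and items are
+// illustrative placeholders, not part of this API):
+//
+//	load := NewStatsSyncLoad(h)
+//	go load.SubLoadWorker(sctx, exit, wg)             // one per configured worker
+//	_ = load.SendLoadRequests(sc, items, time.Second) // optimizer side: enqueue needed items
+//	_ = load.SyncWaitStatsLoad(sc)                    // block until loaded or timeout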
+func NewStatsSyncLoad(statsHandle statstypes.StatsHandle) statstypes.StatsSyncLoad {
+	s := &statsSyncLoad{statsHandle: statsHandle}
+	cfg := config.GetGlobalConfig()
+	s.StatsLoad.NeededItemsCh = make(chan *statstypes.NeededItemTask, cfg.Performance.StatsLoadQueueSize)
+	s.StatsLoad.TimeoutItemsCh = make(chan *statstypes.NeededItemTask, cfg.Performance.StatsLoadQueueSize)
+	return s
+}
+
+type statsWrapper struct {
+	colInfo *model.ColumnInfo
+	idxInfo *model.IndexInfo
+	col     *statistics.Column
+	idx     *statistics.Index
+}
+
+// SendLoadRequests sends load requests for the needed columns/indices.
+func (s *statsSyncLoad) SendLoadRequests(sc *stmtctx.StatementContext, neededHistItems []model.StatsLoadItem, timeout time.Duration) error {
+	remainedItems := s.removeHistLoadedColumns(neededHistItems)
+
+	failpoint.Inject("assertSyncLoadItems", func(val failpoint.Value) {
+		if sc.OptimizeTracer != nil {
+			count := val.(int)
+			if len(remainedItems) != count {
+				panic("remained items count wrong")
+			}
+		}
+	})
+
+	if len(remainedItems) <= 0 {
+		return nil
+	}
+	sc.StatsLoad.Timeout = timeout
+	sc.StatsLoad.NeededItems = remainedItems
+	sc.StatsLoad.ResultCh = make(chan stmtctx.StatsLoadResult, len(remainedItems))
+	tasks := make([]*statstypes.NeededItemTask, 0)
+	for _, item := range remainedItems {
+		task := &statstypes.NeededItemTask{
+			Item:      item,
+			ToTimeout: time.Now().Local().Add(timeout),
+			ResultCh:  sc.StatsLoad.ResultCh,
+		}
+		tasks = append(tasks, task)
+	}
+	timer := time.NewTimer(timeout)
+	defer timer.Stop()
+	for _, task := range tasks {
+		select {
+		case s.StatsLoad.NeededItemsCh <- task:
+			continue
+		case <-timer.C:
+			return errors.New("sync load stats channel is full and timeout sending task to channel")
+		}
+	}
+	sc.StatsLoad.LoadStartTime = time.Now()
+	return nil
+}
+
+// SyncWaitStatsLoad waits until all the requested stats are loaded, and returns an error on timeout.
+func (*statsSyncLoad) SyncWaitStatsLoad(sc *stmtctx.StatementContext) error {
+	if len(sc.StatsLoad.NeededItems) <= 0 {
+		return nil
+	}
+	var errorMsgs []string
+	defer func() {
+		if len(errorMsgs) > 0 {
+			logutil.BgLogger().Warn("SyncWaitStatsLoad meets error",
+				zap.Strings("errors", errorMsgs))
+		}
+		sc.StatsLoad.NeededItems = nil
+	}()
+	resultCheckMap := map[model.TableItemID]struct{}{}
+	for _, col := range sc.StatsLoad.NeededItems {
+		resultCheckMap[col.TableItemID] = struct{}{}
+	}
+	metrics.SyncLoadCounter.Inc()
+	timer := time.NewTimer(sc.StatsLoad.Timeout)
+	defer timer.Stop()
+	for {
+		select {
+		case result, ok := <-sc.StatsLoad.ResultCh:
+			if !ok {
+				return errors.New("sync load stats channel closed unexpectedly")
+			}
+			if result.HasError() {
+				errorMsgs = append(errorMsgs, result.ErrorMsg())
+			}
+			delete(resultCheckMap, result.Item)
+			if len(resultCheckMap) == 0 {
+				metrics.SyncLoadHistogram.Observe(float64(time.Since(sc.StatsLoad.LoadStartTime).Milliseconds()))
+				return nil
+			}
+		case <-timer.C:
+			metrics.SyncLoadTimeoutCounter.Inc()
+			return errors.New("sync load stats timeout")
+		}
+	}
+}
+
+// removeHistLoadedColumns filters out the items whose histograms are already loaded, based on the stats cache.
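+// For example, a column item is kept only when ColumnIsLoadNeeded reports that
+// a load is needed at the requested level (meta-only vs. full load); everything
+// else can be answered from the cache and never enters the load queue.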
+func (s *statsSyncLoad) removeHistLoadedColumns(neededItems []model.StatsLoadItem) []model.StatsLoadItem {
+	remainedItems := make([]model.StatsLoadItem, 0, len(neededItems))
+	for _, item := range neededItems {
+		tbl, ok := s.statsHandle.Get(item.TableID)
+		if !ok {
+			continue
+		}
+		if item.IsIndex {
+			_, loadNeeded := tbl.IndexIsLoadNeeded(item.ID)
+			if loadNeeded {
+				remainedItems = append(remainedItems, item)
+			}
+			continue
+		}
+		_, loadNeeded, _ := tbl.ColumnIsLoadNeeded(item.ID, item.FullLoad)
+		if loadNeeded {
+			remainedItems = append(remainedItems, item)
+		}
+	}
+	return remainedItems
+}
+
+// AppendNeededItem appends a needed column/index task to the channel. It is only used in tests.
+func (s *statsSyncLoad) AppendNeededItem(task *statstypes.NeededItemTask, timeout time.Duration) error {
+	timer := time.NewTimer(timeout)
+	defer timer.Stop()
+	select {
+	case s.StatsLoad.NeededItemsCh <- task:
+	case <-timer.C:
+		return errors.New("Channel is full and timeout writing to channel")
+	}
+	return nil
+}
+
+var errExit = errors.New("Stop loading since domain is closed")
+
+// SubLoadWorker is the worker loop that loads hist data for the requested columns/indices.
+func (s *statsSyncLoad) SubLoadWorker(sctx sessionctx.Context, exit chan struct{}, exitWg *util.WaitGroupEnhancedWrapper) {
+	defer func() {
+		exitWg.Done()
+		logutil.BgLogger().Info("SubLoadWorker exited.")
+	}()
+	// If the last task was not handled successfully in the previous round because of
+	// an error or a panic, pass it to this round to retry.
+	var lastTask *statstypes.NeededItemTask
+	for {
+		task, err := s.HandleOneTask(sctx, lastTask, exit)
+		lastTask = task
+		if err != nil {
+			switch err {
+			case errExit:
+				return
+			default:
+				// Sleep a random duration to avoid the thundering herd effect:
+				// all workers retrying a large number of requests simultaneously when a problem occurs.
+				r := rand.Intn(500)
+				time.Sleep(s.statsHandle.Lease()/10 + time.Duration(r)*time.Microsecond)
+				continue
+			}
+		}
+	}
+}
+
+// HandleOneTask handles the last task if it is not nil; otherwise it drains a new task from the channel.
+// It returns the current task when the handling fails somewhere.
+// - If the task is handled successfully, return nil, nil.
+// - If the task times out, return the task and nil. The caller should retry the task without sleeping.
+// - If the task fails, return the task and the error. The caller should sleep before retrying it.
+func (s *statsSyncLoad) HandleOneTask(sctx sessionctx.Context, lastTask *statstypes.NeededItemTask, exit chan struct{}) (task *statstypes.NeededItemTask, err error) {
+	defer func() {
+		// recover for each task, so that the worker keeps working
+		if r := recover(); r != nil {
+			logutil.BgLogger().Error("stats loading panicked", zap.Any("error", r), zap.Stack("stack"))
+			err = errors.Errorf("stats loading panicked: %v", r)
+		}
+	}()
+	if lastTask == nil {
+		task, err = s.drainColTask(sctx, exit)
+		if err != nil {
+			if err != errExit {
+				logutil.BgLogger().Error("Fail to drain task for stats loading.", zap.Error(err))
+			}
+			return task, err
+		}
+	} else {
+		task = lastTask
+	}
+	result := stmtctx.StatsLoadResult{Item: task.Item.TableItemID}
+	resultChan := s.StatsLoad.Singleflight.DoChan(task.Item.Key(), func() (any, error) {
+		err := s.handleOneItemTask(sctx, task)
+		return nil, err
+	})
+	timeout := time.Until(task.ToTimeout)
+	select {
+	case sr := <-resultChan:
+		// sr.Val is always nil.
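+		// Three outcomes are distinguished below: success (deliver the result and finish),
+		// a failure that has exhausted its retry budget (deliver the error to the waiting SQL),
+		// and a retryable failure (hand the task back to the caller, which sleeps before retrying).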
+		if sr.Err == nil {
+			task.ResultCh <- result
+			return nil, nil
+		}
+		if !isValidForRetry(task) {
+			result.Error = sr.Err
+			task.ResultCh <- result
+			return nil, nil
+		}
+		return task, sr.Err
+	case <-time.After(timeout):
+		if !isValidForRetry(task) {
+			result.Error = errors.New("stats loading timeout")
+			task.ResultCh <- result
+			return nil, nil
+		}
+		// Push the deadline back so the retry gets a fresh timeout window.
+		task.ToTimeout = task.ToTimeout.Add(time.Duration(sctx.GetSessionVars().StatsLoadSyncWait.Load()) * time.Microsecond)
+		return task, nil
+	}
+}
+
+func isValidForRetry(task *statstypes.NeededItemTask) bool {
+	task.Retry++
+	return task.Retry <= RetryCount
+}
+
+func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statstypes.NeededItemTask) (err error) {
+	defer func() {
+		// recover for each task, so that the worker keeps working
+		if r := recover(); r != nil {
+			logutil.BgLogger().Error("handleOneItemTask panicked", zap.Any("recover", r), zap.Stack("stack"))
+			err = errors.Errorf("stats loading panicked: %v", r)
+		}
+	}()
+	item := task.Item.TableItemID
+	tbl, ok := s.statsHandle.Get(item.TableID)
+	if !ok {
+		return nil
+	}
+	wrapper := &statsWrapper{}
+	if item.IsIndex {
+		index, loadNeeded := tbl.IndexIsLoadNeeded(item.ID)
+		if !loadNeeded {
+			return nil
+		}
+		if index != nil {
+			wrapper.idxInfo = index.Info
+		} else {
+			wrapper.idxInfo = tbl.ColAndIdxExistenceMap.GetIndex(item.ID)
+		}
+	} else {
+		col, loadNeeded, analyzed := tbl.ColumnIsLoadNeeded(item.ID, task.Item.FullLoad)
+		if !loadNeeded {
+			return nil
+		}
+		if col != nil {
+			wrapper.colInfo = col.Info
+		} else {
+			wrapper.colInfo = tbl.ColAndIdxExistenceMap.GetCol(item.ID)
+		}
+		// If this column is not analyzed yet and we don't have it in memory,
+		// we create a fake one for the pseudo estimation.
+		if loadNeeded && !analyzed {
+			wrapper.col = &statistics.Column{
+				PhysicalID: item.TableID,
+				Info:       wrapper.colInfo,
+				Histogram:  *statistics.NewHistogram(item.ID, 0, 0, 0, &wrapper.colInfo.FieldType, 0, 0),
+				IsHandle:   tbl.IsPkIsHandle && mysql.HasPriKeyFlag(wrapper.colInfo.GetFlag()),
+			}
+			s.updateCachedItem(item, wrapper.col, wrapper.idx, task.Item.FullLoad)
+			return nil
+		}
+	}
+	t := time.Now()
+	needUpdate := false
+	wrapper, err = s.readStatsForOneItem(sctx, item, wrapper, tbl.IsPkIsHandle, task.Item.FullLoad)
+	if err != nil {
+		return err
+	}
+	if item.IsIndex {
+		if wrapper.idxInfo != nil {
+			needUpdate = true
+		}
+	} else {
+		if wrapper.colInfo != nil {
+			needUpdate = true
+		}
+	}
+	metrics.ReadStatsHistogram.Observe(float64(time.Since(t).Milliseconds()))
+	if needUpdate {
+		s.updateCachedItem(item, wrapper.col, wrapper.idx, task.Item.FullLoad)
+	}
+	return nil
+}
+
+// readStatsForOneItem reads the histogram for one column/index. TODO: load data via kv-get asynchronously.
+func (*statsSyncLoad) readStatsForOneItem(sctx sessionctx.Context, item model.TableItemID, w *statsWrapper, isPkIsHandle bool, fullLoad bool) (*statsWrapper, error) {
+	failpoint.Inject("mockReadStatsForOnePanic", nil)
+	failpoint.Inject("mockReadStatsForOneFail", func(val failpoint.Value) {
+		if val.(bool) {
+			failpoint.Return(nil, errors.New("gofail ReadStatsForOne error"))
+		}
+	})
+	loadFMSketch := config.GetGlobalConfig().Performance.EnableLoadFMSketch
+	var hg *statistics.Histogram
+	var err error
+	isIndexFlag := int64(0)
+	hg, lastAnalyzePos, statsVer, flag, err := storage.HistMetaFromStorage(sctx, &item, w.colInfo)
+	if err != nil {
+		return nil, err
+	}
+	if hg == nil {
+		logutil.BgLogger().Error("fail to get hist meta for this histogram, possibly a deleted one", zap.Int64("table_id", item.TableID),
+			zap.Int64("hist_id", item.ID), zap.Bool("is_index", item.IsIndex))
+		return nil, errors.Trace(fmt.Errorf("fail to get hist meta for this histogram, table_id:%v, hist_id:%v, is_index:%v", item.TableID, item.ID, item.IsIndex))
+	}
+	if item.IsIndex {
+		isIndexFlag = 1
+	}
+	var cms *statistics.CMSketch
+	var topN *statistics.TopN
+	var fms *statistics.FMSketch
+	if fullLoad {
+		if item.IsIndex {
+			hg, err = storage.HistogramFromStorage(sctx, item.TableID, item.ID, types.NewFieldType(mysql.TypeBlob), hg.NDV, int(isIndexFlag), hg.LastUpdateVersion, hg.NullCount, hg.TotColSize, hg.Correlation)
+			if err != nil {
+				return nil, errors.Trace(err)
+			}
+		} else {
+			hg, err = storage.HistogramFromStorage(sctx, item.TableID, item.ID, &w.colInfo.FieldType, hg.NDV, int(isIndexFlag), hg.LastUpdateVersion, hg.NullCount, hg.TotColSize, hg.Correlation)
+			if err != nil {
+				return nil, errors.Trace(err)
+			}
+		}
+		cms, topN, err = storage.CMSketchAndTopNFromStorage(sctx, item.TableID, isIndexFlag, item.ID)
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+		if loadFMSketch {
+			fms, err = storage.FMSketchFromStorage(sctx, item.TableID, isIndexFlag, item.ID)
+			if err != nil {
+				return nil, errors.Trace(err)
+			}
+		}
+	}
+	if item.IsIndex {
+		idxHist := &statistics.Index{
+			Histogram:  *hg,
+			CMSketch:   cms,
+			TopN:       topN,
+			FMSketch:   fms,
+			Info:       w.idxInfo,
+			StatsVer:   statsVer,
+			Flag:       flag,
+			PhysicalID: item.TableID,
+		}
+		if statsVer != statistics.Version0 {
+			if fullLoad {
+				idxHist.StatsLoadedStatus = statistics.NewStatsFullLoadStatus()
+			} else {
+				idxHist.StatsLoadedStatus = statistics.NewStatsAllEvictedStatus()
+			}
+		}
+		lastAnalyzePos.Copy(&idxHist.LastAnalyzePos)
+		w.idx = idxHist
+	} else {
+		colHist := &statistics.Column{
+			PhysicalID: item.TableID,
+			Histogram:  *hg,
+			Info:       w.colInfo,
+			CMSketch:   cms,
+			TopN:       topN,
+			FMSketch:   fms,
+			IsHandle:   isPkIsHandle && mysql.HasPriKeyFlag(w.colInfo.GetFlag()),
+			StatsVer:   statsVer,
+		}
+		if colHist.StatsAvailable() {
+			if fullLoad {
+				colHist.StatsLoadedStatus = statistics.NewStatsFullLoadStatus()
+			} else {
+				colHist.StatsLoadedStatus = statistics.NewStatsAllEvictedStatus()
+			}
+		}
+		w.col = colHist
+	}
+	return w, nil
+}
+
+// drainColTask blocks until a column/index task can be returned; it returns either a task or an error.
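+// Tasks in NeededItemsCh take priority; TimeoutItemsCh is drained only when
+// NeededItemsCh is empty, since a timed-out task has no SQL sync-waiting for it
+// and can be handled with lower priority.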
+func (s *statsSyncLoad) drainColTask(sctx sessionctx.Context, exit chan struct{}) (*statstypes.NeededItemTask, error) {
+	// select NeededItemsCh firstly; if there is no task, then select TimeoutItemsCh
+	for {
+		select {
+		case <-exit:
+			return nil, errExit
+		case task, ok := <-s.StatsLoad.NeededItemsCh:
+			if !ok {
+				return nil, errors.New("drainColTask: cannot read from NeededItemsCh, maybe the chan is closed")
+			}
+			// If the task has already timed out, no SQL is sync-waiting for it,
+			// so do not handle it just now; put it into another channel with lower priority.
+			if time.Now().After(task.ToTimeout) {
+				task.ToTimeout = task.ToTimeout.Add(time.Duration(sctx.GetSessionVars().StatsLoadSyncWait.Load()) * time.Microsecond)
+				s.writeToTimeoutChan(s.StatsLoad.TimeoutItemsCh, task)
+				continue
+			}
+			return task, nil
+		case task, ok := <-s.StatsLoad.TimeoutItemsCh:
+			select {
+			case <-exit:
+				return nil, errExit
+			case task0, ok0 := <-s.StatsLoad.NeededItemsCh:
+				if !ok0 {
+					return nil, errors.New("drainColTask: cannot read from NeededItemsCh, maybe the chan is closed")
+				}
+				// send task back to TimeoutItemsCh and return the task drained from NeededItemsCh
+				s.writeToTimeoutChan(s.StatsLoad.TimeoutItemsCh, task)
+				return task0, nil
+			default:
+				if !ok {
+					return nil, errors.New("drainColTask: cannot read from TimeoutItemsCh, maybe the chan is closed")
+				}
+				// NeededItemsCh is empty now, handle task from TimeoutItemsCh
+				return task, nil
+			}
+		}
+	}
+}
+
+// writeToTimeoutChan writes in a nonblocking way, and if the channel queue is full, it's ok to drop the task.
+func (*statsSyncLoad) writeToTimeoutChan(taskCh chan *statstypes.NeededItemTask, task *statstypes.NeededItemTask) {
+	select {
+	case taskCh <- task:
+	default:
+	}
+}
+
+// writeToChanWithTimeout writes a task to a channel and blocks until timeout.
+func (*statsSyncLoad) writeToChanWithTimeout(taskCh chan *statstypes.NeededItemTask, task *statstypes.NeededItemTask, timeout time.Duration) error {
+	timer := time.NewTimer(timeout)
+	defer timer.Stop()
+	select {
+	case taskCh <- task:
+	case <-timer.C:
+		return errors.New("Channel is full and timeout writing to channel")
+	}
+	return nil
+}
+
+// writeToResultChan safe-writes with panic-recover so one failed write will not have a big impact.
+func (*statsSyncLoad) writeToResultChan(resultCh chan stmtctx.StatsLoadResult, rs stmtctx.StatsLoadResult) {
+	defer func() {
+		if r := recover(); r != nil {
+			logutil.BgLogger().Error("writeToResultChan panicked", zap.Any("error", r), zap.Stack("stack"))
+		}
+	}()
+	select {
+	case resultCh <- rs:
+	default:
+	}
+}
+
+// updateCachedItem updates the column/index hist in the global stats cache.
+func (s *statsSyncLoad) updateCachedItem(item model.TableItemID, colHist *statistics.Column, idxHist *statistics.Index, fullLoaded bool) (updated bool) {
+	s.StatsLoad.Lock()
+	defer s.StatsLoad.Unlock()
+	// Reload the latest stats cache, otherwise the `updateStatsCache` may fail with high probability, because functions
+	// like `GetPartitionStats` called in `fmSketchFromStorage` would have modified the stats cache already.
+	tbl, ok := s.statsHandle.Get(item.TableID)
+	if !ok {
+		return false
+	}
+	if !item.IsIndex && colHist != nil {
+		c, ok := tbl.Columns[item.ID]
+		// Skip updating when the cached stats are already good enough:
+		// - the stats are fully loaded, or
+		// - the stats are meta-loaded and we also just need the meta.
+		if ok && (c.IsFullLoad() || !fullLoaded) {
+			return false
+		}
+		tbl = tbl.Copy()
+		tbl.Columns[item.ID] = colHist
+		// If the column is analyzed we refresh the map for the possible change.
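+		// The refresh happens on the copied table, so readers holding the old
+		// cache entry are unaffected until UpdateStatsCache swaps it in below.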
+		if colHist.StatsAvailable() {
+			tbl.ColAndIdxExistenceMap.InsertCol(item.ID, colHist.Info, true)
+		}
+		// All the objects share the same stats version. Update it here.
+		if colHist.StatsVer != statistics.Version0 {
+			tbl.StatsVer = int(colHist.StatsVer)
+		}
+	} else if item.IsIndex && idxHist != nil {
+		index, ok := tbl.Indices[item.ID]
+		// Skip updating when the cached stats are already good enough:
+		// - the stats are fully loaded, or
+		// - the stats are meta-loaded and we also just need the meta.
+		if ok && (index.IsFullLoad() || !fullLoaded) {
+			return true
+		}
+		tbl = tbl.Copy()
+		tbl.Indices[item.ID] = idxHist
+		// If the index is analyzed we refresh the map for the possible change.
+		if idxHist.IsAnalyzed() {
+			tbl.ColAndIdxExistenceMap.InsertIndex(item.ID, idxHist.Info, true)
+			// All the objects share the same stats version. Update it here.
+			tbl.StatsVer = int(idxHist.StatsVer)
+		}
+	}
+	s.statsHandle.UpdateStatsCache([]*statistics.Table{tbl}, nil)
+	return true
+}
diff --git a/pkg/statistics/handle/types/interfaces.go b/pkg/statistics/handle/types/interfaces.go
new file mode 100644
index 0000000000000..5c1b41d7fbd65
--- /dev/null
+++ b/pkg/statistics/handle/types/interfaces.go
@@ -0,0 +1,494 @@
+// Copyright 2023 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package types
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	"github.com/pingcap/tidb/pkg/infoschema"
+	"github.com/pingcap/tidb/pkg/parser/ast"
+	"github.com/pingcap/tidb/pkg/parser/model"
+	"github.com/pingcap/tidb/pkg/sessionctx"
+	"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
+	"github.com/pingcap/tidb/pkg/statistics"
+	"github.com/pingcap/tidb/pkg/statistics/handle/usage/indexusage"
+	statsutil "github.com/pingcap/tidb/pkg/statistics/handle/util"
+	"github.com/pingcap/tidb/pkg/types"
+	"github.com/pingcap/tidb/pkg/util"
+	"github.com/pingcap/tidb/pkg/util/sqlexec"
+	"golang.org/x/sync/singleflight"
+)
+
+// StatsGC is used to GC unnecessary stats.
+type StatsGC interface {
+	// GCStats will garbage collect the useless stats' info.
+	// For dropped tables, we will first update their version
+	// so that other tidb could know that table is deleted.
+	GCStats(is infoschema.InfoSchema, ddlLease time.Duration) (err error)
+
+	// ClearOutdatedHistoryStats clears outdated historical stats.
+	// Only for test.
+	ClearOutdatedHistoryStats() error
+
+	// DeleteTableStatsFromKV deletes table statistics from kv.
+	// A statsID refers to the statistics of a table or a partition.
+	DeleteTableStatsFromKV(statsIDs []int64) (err error)
+}
+
+// ColStatsTimeInfo records usage information of this column stats.
+type ColStatsTimeInfo struct {
+	LastUsedAt     *types.Time // last time the column is used
+	LastAnalyzedAt *types.Time // last time the column is analyzed
+}
+
+// StatsUsage is used to track the usage of column / index statistics.
+type StatsUsage interface {
+	// Below methods are for predicate columns.
+
+	// LoadColumnStatsUsage returns all columns' usage information.
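+	// The returned map is keyed by the column's TableItemID; the given
+	// time.Location controls the time zone of the returned ColStatsTimeInfo timestamps.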
+	LoadColumnStatsUsage(loc *time.Location) (map[model.TableItemID]ColStatsTimeInfo, error)
+
+	// GetPredicateColumns returns IDs of predicate columns, which are the columns whose stats are used (needed) when generating query plans.
+	GetPredicateColumns(tableID int64) ([]int64, error)
+
+	// CollectColumnsInExtendedStats returns IDs of the columns involved in extended stats.
+	CollectColumnsInExtendedStats(tableID int64) ([]int64, error)
+
+	IndexUsage
+
+	// TODO: extract these functions into a new interface only for delta/stats usage, like `IndexUsage`.
+	// Below methods are for table delta and stats usage.
+
+	// NewSessionStatsItem allocates a stats collector for a session.
+	// TODO: use interface{} to avoid cycle import, remove this interface{}.
+	NewSessionStatsItem() any
+
+	// ResetSessionStatsList resets the session stats list.
+	ResetSessionStatsList()
+
+	// DumpStatsDeltaToKV sweeps the whole list and updates the global map, then dumps every table held in the map to KV.
+	DumpStatsDeltaToKV(dumpAll bool) error
+
+	// DumpColStatsUsageToKV sweeps the whole list, updates the column stats usage map and dumps it to KV.
+	DumpColStatsUsageToKV() error
+}
+
+// IndexUsage is an interface to define the function of collecting index usage stats.
+type IndexUsage interface {
+	// NewSessionIndexUsageCollector creates a new Collector for a session.
+	NewSessionIndexUsageCollector() *indexusage.SessionIndexUsageCollector
+
+	// GCIndexUsage removes unnecessary index usage data.
+	GCIndexUsage() error
+
+	// StartWorker starts the collector worker.
+	StartWorker()
+
+	// Close closes and waits for the index usage collector worker.
+	Close()
+
+	// GetIndexUsage returns the index usage information.
+	GetIndexUsage(tableID int64, indexID int64) indexusage.Sample
+}
+
+// StatsHistory is used to manage historical stats.
+type StatsHistory interface {
+	// RecordHistoricalStatsMeta records stats meta of the specified version to stats_meta_history.
+	RecordHistoricalStatsMeta(tableID int64, version uint64, source string, enforce bool)
+
+	// CheckHistoricalStatsEnable checks whether historical stats is enabled.
+	CheckHistoricalStatsEnable() (enable bool, err error)
+
+	// RecordHistoricalStatsToStorage records the given table's stats data to mysql.stats_history.
+	RecordHistoricalStatsToStorage(dbName string, tableInfo *model.TableInfo, physicalID int64, isPartition bool) (uint64, error)
+}
+
+// StatsAnalyze is used to handle auto-analyze and manage analyze jobs.
+type StatsAnalyze interface {
+	// InsertAnalyzeJob inserts an analyze job into mysql.analyze_jobs and gets the job ID for further updates to the job.
+	InsertAnalyzeJob(job *statistics.AnalyzeJob, instance string, procID uint64) error
+
+	// DeleteAnalyzeJobs deletes the analyze jobs whose update time is earlier than updateTime.
+	DeleteAnalyzeJobs(updateTime time.Time) error
+
+	// CleanupCorruptedAnalyzeJobsOnCurrentInstance cleans up the corrupted analyze jobs.
+	// A corrupted analyze job is one that is in a 'pending' or 'running' state,
+	// but is associated with a TiDB instance that is either not currently running or has been restarted.
+	// We use the current running analyze jobs to check whether an analyze job is corrupted.
+	CleanupCorruptedAnalyzeJobsOnCurrentInstance(currentRunningProcessIDs map[uint64]struct{}) error
+
+	// CleanupCorruptedAnalyzeJobsOnDeadInstances purges analyze jobs that are associated with non-existent instances.
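+	// (For example, jobs left behind by a TiDB node that has been scaled in.)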
+	// This function is specifically designed to handle jobs that have become corrupted due to
+	// their associated instances being removed from the current cluster.
+	CleanupCorruptedAnalyzeJobsOnDeadInstances() error
+
+	// HandleAutoAnalyze analyzes the outdated tables (those whose change percent exceeds the threshold).
+	// It also analyzes newly created tables and newly added indexes.
+	HandleAutoAnalyze() (analyzed bool)
+
+	// CheckAnalyzeVersion checks whether all the statistics versions of this table's columns and indexes are the same.
+	CheckAnalyzeVersion(tblInfo *model.TableInfo, physicalIDs []int64, version *int) bool
+}
+
+// StatsCache is used to manage all table statistics in memory.
+type StatsCache interface {
+	// Close closes this cache.
+	Close()
+
+	// Clear clears this cache.
+	Clear()
+
+	// Update reads stats meta from store and updates the stats map.
+	Update(is infoschema.InfoSchema) error
+
+	// MemConsumed returns its memory usage.
+	MemConsumed() (size int64)
+
+	// Get returns the specified table's stats.
+	Get(tableID int64) (*statistics.Table, bool)
+
+	// Put puts this table stats into the cache.
+	Put(tableID int64, t *statistics.Table)
+
+	// UpdateStatsCache updates the cache.
+	UpdateStatsCache(addedTables []*statistics.Table, deletedTableIDs []int64)
+
+	// MaxTableStatsVersion returns the version of the current cache, which is defined as
+	// the max table stats version the cache has in its lifecycle.
+	MaxTableStatsVersion() uint64
+
+	// Values returns all values in this cache.
+	Values() []*statistics.Table
+
+	// Len returns the length of this cache.
+	Len() int
+
+	// SetStatsCacheCapacity sets the cache's capacity.
+	SetStatsCacheCapacity(capBytes int64)
+
+	// Replace replaces this cache.
+	Replace(cache StatsCache)
+
+	// UpdateStatsHealthyMetrics updates stats healthy distribution metrics according to stats cache.
+	UpdateStatsHealthyMetrics()
+}
+
+// StatsLockTable is the table info of which will be locked.
+type StatsLockTable struct {
+	PartitionInfo map[int64]string
+	// schema name + table name.
+	FullName string
+}
+
+// StatsLock is used to manage locked stats.
+type StatsLock interface {
+	// LockTables adds the locked table IDs to the store.
+	// - tables: tables that will be locked.
+	// Returns the message of skipped tables and an error.
+	LockTables(tables map[int64]*StatsLockTable) (skipped string, err error)
+
+	// LockPartitions adds the locked partition IDs to the store.
+	// - tid: table ID of which will be locked.
+	// - tableName: table name of which will be locked.
+	// - pidNames: partition IDs of which will be locked.
+	// Returns the message of skipped tables and an error.
+	// Note: If the whole table is locked, all partitions of the table are skipped.
+	LockPartitions(
+		tid int64,
+		tableName string,
+		pidNames map[int64]string,
+	) (skipped string, err error)
+
+	// RemoveLockedTables removes tables from the table locked records.
+	// - tables: tables of which will be unlocked.
+	// Returns the message of skipped tables and an error.
+	RemoveLockedTables(tables map[int64]*StatsLockTable) (skipped string, err error)
+
+	// RemoveLockedPartitions removes partitions from the table locked records.
+	// - tid: table ID of which will be unlocked.
+	// - tableName: table name of which will be unlocked.
+	// - pidNames: partition IDs of which will be unlocked.
+	// Note: If the whole table is locked, all partitions of the table are skipped.
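+	// In other words, unlocking individual partitions of a fully locked table is a no-op.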
+	RemoveLockedPartitions(
+		tid int64,
+		tableName string,
+		pidNames map[int64]string,
+	) (skipped string, err error)
+
+	// GetLockedTables returns the locked status of the given tables.
+	// Note: This function queries locked tables from the store, so please try to batch the query.
+	GetLockedTables(tableIDs ...int64) (map[int64]struct{}, error)
+
+	// GetTableLockedAndClearForTest is for unit test only.
+	GetTableLockedAndClearForTest() (map[int64]struct{}, error)
+}
+
+// PartitionStatisticLoadTask currently records a partition-level jsontable.
+type PartitionStatisticLoadTask struct {
+	JSONTable  *statsutil.JSONTable
+	PhysicalID int64
+}
+
+// PersistFunc is used to persist JSONTable in the partition level.
+type PersistFunc func(ctx context.Context, jsonTable *statsutil.JSONTable, physicalID int64) error
+
+// StatsReadWriter is used to read and write stats to the storage.
+// TODO: merge and remove some methods.
+type StatsReadWriter interface {
+	// TableStatsFromStorage loads table stats info from storage.
+	TableStatsFromStorage(tableInfo *model.TableInfo, physicalID int64, loadAll bool, snapshot uint64) (statsTbl *statistics.Table, err error)
+
+	// LoadTablePartitionStats loads partition stats info from storage.
+	LoadTablePartitionStats(tableInfo *model.TableInfo, partitionDef *model.PartitionDefinition) (*statistics.Table, error)
+
+	// StatsMetaCountAndModifyCount reads count and modify_count for the given table from mysql.stats_meta.
+	StatsMetaCountAndModifyCount(tableID int64) (count, modifyCount int64, err error)
+
+	// LoadNeededHistograms will load histograms for those needed columns/indices and put them into the cache.
+	LoadNeededHistograms() (err error)
+
+	// ReloadExtendedStatistics drops the cache for extended statistics and reloads data from mysql.stats_extended.
+	ReloadExtendedStatistics() error
+
+	// SaveStatsToStorage saves the stats data to the storage.
+	SaveStatsToStorage(tableID int64, count, modifyCount int64, isIndex int, hg *statistics.Histogram,
+		cms *statistics.CMSketch, topN *statistics.TopN, statsVersion int, isAnalyzed int64, updateAnalyzeTime bool, source string) (err error)
+
+	// SaveTableStatsToStorage saves the stats of a table to storage.
+	SaveTableStatsToStorage(results *statistics.AnalyzeResults, analyzeSnapshot bool, source string) (err error)
+
+	// SaveMetaToStorage saves the stats meta of a table to storage.
+	SaveMetaToStorage(tableID, count, modifyCount int64, source string) (err error)
+
+	// InsertColStats2KV inserts column stats to kv.
+	InsertColStats2KV(physicalID int64, colInfos []*model.ColumnInfo) (err error)
+
+	// InsertTableStats2KV inserts a record standing for a new table to stats_meta and inserts some records standing for the
+	// new columns and indices which belong to this table.
+	InsertTableStats2KV(info *model.TableInfo, physicalID int64) (err error)
+
+	// UpdateStatsVersion will set the statistics version to the newest TS, and then
+	// tidb-server will reload the stats automatically.
+	UpdateStatsVersion() error
+
+	// UpdateStatsMetaVersionForGC updates the version of mysql.stats_meta,
+	// ensuring it is greater than the last garbage collection (GC) time.
+	// The GC worker deletes old stats based on a safe time point,
+	// calculated as now() - 10 * max(stats lease, ddl lease).
+	// The range [last GC time, safe time point) is chosen to prevent
+	// the simultaneous deletion of numerous stats, minimizing potential
+	// performance issues.
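+	// In other words, as the worked example below shows, a version that still lies
+	// at or before the last GC time would never be revisited by the GC worker.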
+	// This function ensures the version is updated beyond the last GC time,
+	// allowing the GC worker to delete outdated stats.
+	//
+	// Explanation:
+	// - ddl lease: 10
+	// - stats lease: 3
+	// - safe time point: now() - 10 * 10 = now() - 100
+	// - now: 200
+	// - last GC time: 90
+	// - [last GC time, safe time point) = [90, 100)
+	// - To trigger stats deletion, the version must be updated beyond 90.
+	//
+	// This safeguards scenarios where a table remains unchanged for an extended period.
+	// For instance, if a table was created at time 90, and it's now time 200,
+	// with the last GC time at 95 and the safe time point at 100,
+	// updating the version beyond 95 ensures eventual deletion of stats.
+	UpdateStatsMetaVersionForGC(physicalID int64) (err error)
+
+	// ChangeGlobalStatsID changes the global stats ID.
+	ChangeGlobalStatsID(from, to int64) (err error)
+
+	// TableStatsToJSON dumps table stats to JSON.
+	TableStatsToJSON(dbName string, tableInfo *model.TableInfo, physicalID int64, snapshot uint64) (*statsutil.JSONTable, error)
+
+	// DumpStatsToJSON dumps statistics to JSON.
+	DumpStatsToJSON(dbName string, tableInfo *model.TableInfo,
+		historyStatsExec sqlexec.RestrictedSQLExecutor, dumpPartitionStats bool) (*statsutil.JSONTable, error)
+
+	// DumpHistoricalStatsBySnapshot dumps JSON tables from mysql.stats_meta_history and mysql.stats_history.
+	// As implemented in getTableHistoricalStatsToJSONWithFallback, if historical stats are nonexistent, it will fall back
+	// to the latest stats, and these table names (and partition names) will be returned in fallbackTbls.
+	DumpHistoricalStatsBySnapshot(
+		dbName string,
+		tableInfo *model.TableInfo,
+		snapshot uint64,
+	) (
+		jt *statsutil.JSONTable,
+		fallbackTbls []string,
+		err error,
+	)
+
+	// DumpStatsToJSONBySnapshot dumps statistics to JSON at the given snapshot.
+	DumpStatsToJSONBySnapshot(dbName string, tableInfo *model.TableInfo, snapshot uint64, dumpPartitionStats bool) (*statsutil.JSONTable, error)
+
+	// PersistStatsBySnapshot dumps statistics to JSON and calls the given function for each partition statistic to persist.
+	// Notice:
+	//  1. It might call the function `persist` with a nil jsontable.
+	//  2. It is only used by BR, so partitions' statistics are always dumped.
+	PersistStatsBySnapshot(ctx context.Context, dbName string, tableInfo *model.TableInfo, snapshot uint64, persist PersistFunc) error
+
+	// LoadStatsFromJSONConcurrently consumes the statistics tasks from `taskCh` concurrently.
+	LoadStatsFromJSONConcurrently(ctx context.Context, tableInfo *model.TableInfo, taskCh chan *PartitionStatisticLoadTask, concurrencyForPartition int) error
+
+	// LoadStatsFromJSON will load statistics from JSONTable and save them to the storage.
+	// Finally, it will also update the stats cache.
+	LoadStatsFromJSON(ctx context.Context, is infoschema.InfoSchema, jsonTbl *statsutil.JSONTable, concurrencyForPartition int) error
+
+	// LoadStatsFromJSONNoUpdate will load statistics from JSONTable and save them to the storage.
+	LoadStatsFromJSONNoUpdate(ctx context.Context, is infoschema.InfoSchema, jsonTbl *statsutil.JSONTable, concurrencyForPartition int) error
+
+	// Methods for extended stats.
+
+	// InsertExtendedStats inserts a record into mysql.stats_extended and updates the version in mysql.stats_meta.
+	InsertExtendedStats(statsName string, colIDs []int64, tp int, tableID int64, ifNotExists bool) (err error)
+
+	// MarkExtendedStatsDeleted updates the status of mysql.stats_extended to be `deleted` and the version of mysql.stats_meta.
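+	// (ifExists presumably mirrors DROP ... IF EXISTS semantics for a missing record; this is an assumption from the signature.)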
+	MarkExtendedStatsDeleted(statsName string, tableID int64, ifExists bool) (err error)
+
+	// SaveExtendedStatsToStorage writes extended stats of a table into mysql.stats_extended.
+	SaveExtendedStatsToStorage(tableID int64, extStats *statistics.ExtendedStatsColl, isLoad bool) (err error)
+}
+
+// NeededItemTask represents one needed column/index task with its expiry time.
+type NeededItemTask struct {
+	ToTimeout time.Time
+	ResultCh  chan stmtctx.StatsLoadResult
+	Item      model.StatsLoadItem
+	Retry     int
+}
+
+// StatsLoad is used to load stats concurrently.
+// TODO(hawkingrei): Our implementation of loading statistics is flawed.
+// Currently, we enqueue tasks that require loading statistics into a channel,
+// from which workers retrieve tasks to process. Then, using the singleflight mechanism,
+// we filter out duplicate tasks. However, the issue with this approach is that it does
+// not filter out all duplicate tasks, but only the duplicates within the number of workers.
+// Such an implementation is not reasonable.
+//
+// We should first filter all tasks through singleflight as shown in the diagram, and then use workers to load stats.
+//
+// ┌─────────▼──────────▼─────────────▼──────────────▼────────────────▼────────────────────┐
+// │                                                                                        │
+// │                                    singleflight                                        │
+// │                                                                                        │
+// └────────────────────────────────────────────────────────────────────────────────────────┘
+//
+//	        │                                          │
+//	┌───────▼───────────┐                  ┌───────────▼───────┐
+//	│                   │                  │                   │
+//	│  syncload worker  │                  │  syncload worker  │
+//	│                   │                  │                   │
+//	└───────────────────┘                  └───────────────────┘
+type StatsLoad struct {
+	NeededItemsCh  chan *NeededItemTask
+	TimeoutItemsCh chan *NeededItemTask
+	Singleflight   singleflight.Group
+	sync.Mutex
+}
+
+// StatsSyncLoad implements the sync-load feature.
+type StatsSyncLoad interface {
+	// SendLoadRequests sends load requests to the channel.
+	SendLoadRequests(sc *stmtctx.StatementContext, neededHistItems []model.StatsLoadItem, timeout time.Duration) error
+
+	// SyncWaitStatsLoad will wait for the load requests to finish.
+	SyncWaitStatsLoad(sc *stmtctx.StatementContext) error
+
+	// AppendNeededItem appends a needed item to the channel.
+	AppendNeededItem(task *NeededItemTask, timeout time.Duration) error
+
+	// SubLoadWorker is run as a goroutine to handle the load requests.
+	SubLoadWorker(sctx sessionctx.Context, exit chan struct{}, exitWg *util.WaitGroupEnhancedWrapper)
+
+	// HandleOneTask will handle one task.
+	HandleOneTask(sctx sessionctx.Context, lastTask *NeededItemTask, exit chan struct{}) (task *NeededItemTask, err error)
+}
+
+// StatsGlobal is used to manage partition table global stats.
+type StatsGlobal interface {
+	// MergePartitionStats2GlobalStatsByTableID merges partition stats to global stats by table ID.
+	MergePartitionStats2GlobalStatsByTableID(sctx sessionctx.Context,
+		opts map[ast.AnalyzeOptionType]uint64, is infoschema.InfoSchema,
+		physicalID int64,
+		isIndex bool,
+		histIDs []int64,
+	) (globalStats any, err error)
+}
+
+// DDL is used to handle ddl events.
+type DDL interface {
+	// HandleDDLEvent handles ddl events.
+	HandleDDLEvent(event *statsutil.DDLEvent) error
+	// DDLEventCh returns ddl events channel in handle.
+	DDLEventCh() chan *statsutil.DDLEvent
+}
+
+// StatsHandle is used to manage TiDB Statistics.
+type StatsHandle interface {
+	// Pool is used to get a session or a goroutine to execute stats updating.
+	statsutil.Pool
+
+	// AutoAnalyzeProcIDGenerator is used to generate auto analyze proc ID.
+ statsutil.AutoAnalyzeProcIDGenerator + + // LeaseGetter is used to get stats lease. + statsutil.LeaseGetter + + // TableInfoGetter is used to get table meta info. + statsutil.TableInfoGetter + + // GetTableStats retrieves the statistics table from cache, and the cache will be updated by a goroutine. + GetTableStats(tblInfo *model.TableInfo) *statistics.Table + + // GetTableStatsForAutoAnalyze retrieves the statistics table from cache, but it will not return pseudo. + GetTableStatsForAutoAnalyze(tblInfo *model.TableInfo) *statistics.Table + + // GetPartitionStats retrieves the partition stats from cache. + GetPartitionStats(tblInfo *model.TableInfo, pid int64) *statistics.Table + + // GetPartitionStatsForAutoAnalyze retrieves the partition stats from cache, but it will not return pseudo. + GetPartitionStatsForAutoAnalyze(tblInfo *model.TableInfo, pid int64) *statistics.Table + + // StatsGC is used to do the GC job. + StatsGC + + // StatsUsage is used to handle table delta and stats usage. + StatsUsage + + // StatsHistory is used to manage historical stats. + StatsHistory + + // StatsAnalyze is used to handle auto-analyze and manage analyze jobs. + StatsAnalyze + + // StatsCache is used to manage all table statistics in memory. + StatsCache + + // StatsLock is used to manage locked stats. + StatsLock + + // StatsReadWriter is used to read and write stats to the storage. + StatsReadWriter + + // StatsGlobal is used to manage partition table global stats. + StatsGlobal + + // DDL is used to handle ddl events. + DDL +} diff --git a/statistics/handle/handle_hist_test.go b/statistics/handle/handle_hist_test.go index c4f30e6ef0de7..aa622557b0c59 100644 --- a/statistics/handle/handle_hist_test.go +++ b/statistics/handle/handle_hist_test.go @@ -209,6 +209,19 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) { require.Error(t, err1) require.NotNil(t, task1) + select { + case <-stmtCtx1.StatsLoad.ResultCh: + t.Logf("stmtCtx1.ResultCh should not get anything") + t.FailNow() + case <-stmtCtx2.StatsLoad.ResultCh: + t.Logf("stmtCtx2.ResultCh should not get anything") + t.FailNow() + case <-task1.ResultCh: + t.Logf("task1.ResultCh should not get anything") + t.FailNow() + default: + } + require.NoError(t, failpoint.Disable(fp.failPath)) task3, err3 := h.HandleOneTask(task1, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh) require.NoError(t, err3) @@ -231,3 +244,80 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) { require.Greater(t, hg.Len()+topn.Num(), 0) } } + +func TestRetry(t *testing.T) { + originConfig := config.GetGlobalConfig() + newConfig := config.NewConfig() + newConfig.Performance.StatsLoadConcurrency = 0 // no worker to consume channel + config.StoreGlobalConfig(newConfig) + defer config.StoreGlobalConfig(originConfig) + store, dom := testkit.CreateMockStoreAndDomain(t) + + testKit := testkit.NewTestKit(t, store) + testKit.MustExec("use test") + testKit.MustExec("drop table if exists t") + testKit.MustExec("set @@session.tidb_analyze_version=2") + testKit.MustExec("create table t(a int, b int, c int, primary key(a), key idx(b))") + testKit.MustExec("insert into t values (1,1,1),(2,2,2),(3,3,3)") + + oriLease := dom.StatsHandle().Lease() + dom.StatsHandle().SetLease(1) + defer func() { + dom.StatsHandle().SetLease(oriLease) + }() + testKit.MustExec("analyze table t") + + is := dom.InfoSchema() + tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + tableInfo := tbl.Meta() + + h := dom.StatsHandle() + + 
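+	// Request a full load of column c. The timeout is effectively infinite here, so
+	// only the injected failpoint (not the clock) can fail the task, and each
+	// HandleOneTask call below consumes exactly one retry of the budget.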
+	neededColumns := make([]model.StatsLoadItem, 1)
+	neededColumns[0] = model.StatsLoadItem{TableItemID: model.TableItemID{TableID: tableInfo.ID, ID: tableInfo.Columns[2].ID, IsIndex: false}, FullLoad: true}
+	timeout := time.Nanosecond * mathutil.MaxInt
+
+	// clear statsCache
+	h.Clear()
+	require.NoError(t, dom.StatsHandle().Update(is))
+
+	// no stats at beginning
+	stat := h.GetTableStats(tableInfo)
+	c, ok := stat.Columns[tableInfo.Columns[2].ID]
+	require.True(t, !ok || (c.Histogram.Len()+c.TopN.Num() == 0))
+
+	stmtCtx1 := stmtctx.NewStmtCtx()
+	h.SendLoadRequests(stmtCtx1, neededColumns, timeout)
+	stmtCtx2 := stmtctx.NewStmtCtx()
+	h.SendLoadRequests(stmtCtx2, neededColumns, timeout)
+
+	exitCh := make(chan struct{})
+	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/statistics/handle/syncload/mockReadStatsForOneFail", "return(true)"))
+	var (
+		task1 *types.NeededItemTask
+		err1  error
+	)
+
+	for i := 0; i < syncload.RetryCount; i++ {
+		task1, err1 = h.HandleOneTask(testKit.Session().(sessionctx.Context), task1, exitCh)
+		require.Error(t, err1)
+		require.NotNil(t, task1)
+		select {
+		case <-task1.ResultCh:
+			t.Logf("task1.ResultCh should not receive anything while retries remain")
+			t.FailNow()
+		default:
+		}
+	}
+	result, err1 := h.HandleOneTask(testKit.Session().(sessionctx.Context), task1, exitCh)
+	require.NoError(t, err1)
+	require.Nil(t, result)
+	select {
+	case <-task1.ResultCh:
+	default:
+		t.Logf("task1.ResultCh should have received a result after the retries are exhausted")
+		t.FailNow()
+	}
+	require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/statistics/handle/syncload/mockReadStatsForOneFail"))
+}