Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

statistics: add comments and change isIndex to bool #47062

Merged
merged 2 commits into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion executor/analyze_global_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ func (e *AnalyzeExec) handleGlobalStats(ctx context.Context, globalStatsMap glob
e.Ctx(),
globalOpts, e.Ctx().GetInfoSchema().(infoschema.InfoSchema),
globalStatsID.tableID,
info.isIndex, info.histIDs,
info.isIndex == 1,
info.histIDs,
tableAllPartitionStats,
)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions statistics/handle/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func (h *Handle) updateGlobalStats(tblInfo *model.TableInfo) error {
opts[ast.AnalyzeOptNumBuckets] = uint64(globalColStatsBucketNum)
}
// Generate the new column global-stats
newColGlobalStats, err := h.mergePartitionStats2GlobalStats(opts, is, tblInfo, 0, nil, nil)
newColGlobalStats, err := h.mergePartitionStats2GlobalStats(opts, is, tblInfo, false, nil, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -240,7 +240,7 @@ func (h *Handle) updateGlobalStats(tblInfo *model.TableInfo) error {
if globalIdxStatsBucketNum != 0 {
opts[ast.AnalyzeOptNumBuckets] = uint64(globalIdxStatsBucketNum)
}
newIndexGlobalStats, err := h.mergePartitionStats2GlobalStats(opts, is, tblInfo, 1, []int64{idx.ID}, nil)
newIndexGlobalStats, err := h.mergePartitionStats2GlobalStats(opts, is, tblInfo, true, []int64{idx.ID}, nil)
if err != nil {
return err
}
Expand Down
116 changes: 76 additions & 40 deletions statistics/handle/globalstats/global_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,24 @@ type GlobalStats struct {
ModifyCount int64
}

func newGlobalStats(histCount int) *GlobalStats {
globalStats := new(GlobalStats)
globalStats.Num = histCount
globalStats.Count = 0
globalStats.Hg = make([]*statistics.Histogram, globalStats.Num)
globalStats.Cms = make([]*statistics.CMSketch, globalStats.Num)
globalStats.TopN = make([]*statistics.TopN, globalStats.Num)
globalStats.Fms = make([]*statistics.FMSketch, globalStats.Num)

return globalStats
}

type (
getTableByPhysicalIDFunc func(is infoschema.InfoSchema, physicalID int64) (table.Table, bool)
loadTablePartitionStatsFunc func(tableInfo *model.TableInfo, partitionDef *model.PartitionDefinition) (*statistics.Table, error)
// GlobalStatusHandler is used to handle the global-level stats.
GlobalStatusHandler struct {
// this gpool is used to reuse goroutine in the mergeGlobalStatsTopN.
// This gpool is used to reuse goroutine in the mergeGlobalStatsTopN.
gpool *gp.Pool
}
)
Expand All @@ -69,14 +81,18 @@ func NewGlobalStatusHandler(gpool *gp.Pool) *GlobalStatusHandler {
}

// MergePartitionStats2GlobalStats merge the partition-level stats to global-level stats based on the tableInfo.
func (g *GlobalStatusHandler) MergePartitionStats2GlobalStats(sc sessionctx.Context,
opts map[ast.AnalyzeOptionType]uint64, is infoschema.InfoSchema, globalTableInfo *model.TableInfo,
isIndex int, histIDs []int64,
allPartitionStats map[int64]*statistics.Table, getTableByPhysicalIDFn getTableByPhysicalIDFunc, loadTablePartitionStatsFn loadTablePartitionStatsFunc) (globalStats *GlobalStats, err error) {
func (g *GlobalStatusHandler) MergePartitionStats2GlobalStats(
sc sessionctx.Context,
opts map[ast.AnalyzeOptionType]uint64,
is infoschema.InfoSchema,
globalTableInfo *model.TableInfo,
isIndex bool,
histIDs []int64,
allPartitionStats map[int64]*statistics.Table,
getTableByPhysicalIDFn getTableByPhysicalIDFunc,
loadTablePartitionStatsFn loadTablePartitionStatsFunc,
) (globalStats *GlobalStats, err error) {
partitionNum := len(globalTableInfo.Partition.Definitions)

// initialized the globalStats
globalStats = new(GlobalStats)
if len(histIDs) == 0 {
for _, col := range globalTableInfo.Columns {
// The virtual generated column stats can not be merged to the global stats.
Expand All @@ -86,17 +102,15 @@ func (g *GlobalStatusHandler) MergePartitionStats2GlobalStats(sc sessionctx.Cont
histIDs = append(histIDs, col.ID)
}
}
globalStats.Num = len(histIDs)
globalStats.Count = 0
globalStats.Hg = make([]*statistics.Histogram, globalStats.Num)
globalStats.Cms = make([]*statistics.CMSketch, globalStats.Num)
globalStats.TopN = make([]*statistics.TopN, globalStats.Num)
globalStats.Fms = make([]*statistics.FMSketch, globalStats.Num)

// The first dimension of slice is means the number of column or index stats in the globalStats.
// The second dimension of slice is means the number of partition tables.
// Initialized the globalStats.
globalStats = newGlobalStats(len(histIDs))

// Slice Dimensions Explanation
// First dimension: Column or Index Stats
// Second dimension: Partition Tables
// Because all topN and histograms need to be collected before they can be merged.
// So we should store all of the partition-level stats first, and merge them together.
// So we should store all the partition-level stats first, and merge them together.
allHg := make([][]*statistics.Histogram, globalStats.Num)
allCms := make([][]*statistics.CMSketch, globalStats.Num)
allTopN := make([][]*statistics.TopN, globalStats.Num)
Expand All @@ -116,12 +130,14 @@ func (g *GlobalStatusHandler) MergePartitionStats2GlobalStats(sc sessionctx.Cont
err = errors.Errorf("unknown physical ID %d in stats meta table, maybe it has been dropped", partitionID)
return
}

tableInfo := partitionTable.Meta()
var partitionStats *statistics.Table
if allPartitionStats != nil {
partitionStats, ok = allPartitionStats[partitionID]
}
// If pre-load partition stats isn't provided, then we load partition stats directly and set it into allPartitionStats

// If preload partition stats isn't provided, then we load partition stats directly and set it into allPartitionStats.
if allPartitionStats == nil || partitionStats == nil || !ok {
var err1 error
partitionStats, err1 = loadTablePartitionStatsFn(tableInfo, &def)
Expand All @@ -138,12 +154,13 @@ func (g *GlobalStatusHandler) MergePartitionStats2GlobalStats(sc sessionctx.Cont
}
allPartitionStats[partitionID] = partitionStats
}

for i := 0; i < globalStats.Num; i++ {
hg, cms, topN, fms, analyzed := partitionStats.GetStatsInfo(histIDs[i], isIndex == 1)
hg, cms, topN, fms, analyzed := partitionStats.GetStatsInfo(histIDs[i], isIndex)
skipPartition := false
if !analyzed {
var missingPart string
if isIndex == 0 {
if !isIndex {
missingPart = fmt.Sprintf("partition `%s` column `%s`", def.Name.L, tableInfo.FindColumnNameByID(histIDs[i]))
} else {
missingPart = fmt.Sprintf("partition `%s` index `%s`", def.Name.L, tableInfo.FindIndexNameByID(histIDs[i]))
Expand All @@ -155,10 +172,11 @@ func (g *GlobalStatusHandler) MergePartitionStats2GlobalStats(sc sessionctx.Cont
globalStats.MissingPartitionStats = append(globalStats.MissingPartitionStats, missingPart)
skipPartition = true
}
// partition stats is not empty but column stats(hist, topn) is missing

// Partition stats is not empty but column stats(hist, topN) is missing.
if partitionStats.RealtimeCount > 0 && (hg == nil || hg.TotalRowCount() <= 0) && (topN == nil || topN.TotalCount() <= 0) {
var missingPart string
if isIndex == 0 {
if !isIndex {
missingPart = fmt.Sprintf("partition `%s` column `%s`", def.Name.L, tableInfo.FindColumnNameByID(histIDs[i]))
} else {
missingPart = fmt.Sprintf("partition `%s` index `%s`", def.Name.L, tableInfo.FindIndexNameByID(histIDs[i]))
Expand All @@ -167,14 +185,16 @@ func (g *GlobalStatusHandler) MergePartitionStats2GlobalStats(sc sessionctx.Cont
err = types.ErrPartitionColumnStatsMissing.GenWithStackByArgs(fmt.Sprintf("table `%s` %s", tableInfo.Name.L, missingPart))
return
}
globalStats.MissingPartitionStats = append(globalStats.MissingPartitionStats, missingPart+" hist and topn")
globalStats.MissingPartitionStats = append(globalStats.MissingPartitionStats, missingPart+" hist and topN")
skipPartition = true
}

if i == 0 {
// In a partition, we will only update globalStats.Count once
// In a partition, we will only update globalStats.Count once.
globalStats.Count += partitionStats.RealtimeCount
globalStats.ModifyCount += partitionStats.ModifyCount
}

if !skipPartition {
allHg[i] = append(allHg[i], hg)
allCms[i] = append(allCms[i], cms)
Expand All @@ -184,15 +204,15 @@ func (g *GlobalStatusHandler) MergePartitionStats2GlobalStats(sc sessionctx.Cont
}
}

// After collect all of the statistics from the partition-level stats,
// After collect all the statistics from the partition-level stats,
// we should merge them together.
for i := 0; i < globalStats.Num; i++ {
if len(allHg[i]) == 0 {
// If all partitions have no stats, we skip merging global stats because it may not handle the case `len(allHg[i]) == 0`
// correctly. It can avoid unexpected behaviors such as nil pointer panic.
continue
}
// Merge CMSketch
// Merge CMSketch.
globalStats.Cms[i] = allCms[i][0].Copy()
for j := 1; j < len(allCms[i]); j++ {
err = globalStats.Cms[i].MergeCMSketch(allCms[i][j])
Expand All @@ -201,18 +221,21 @@ func (g *GlobalStatusHandler) MergePartitionStats2GlobalStats(sc sessionctx.Cont
}
}

// Merge topN. We need to merge TopN before merging the histogram.
// Merge topN.
// Note: We need to merge TopN before merging the histogram.
// Because after merging TopN, some numbers will be left.
// These remaining topN numbers will be used as a separate bucket for later histogram merging.
var popedTopN []statistics.TopNMeta
var poppedTopN []statistics.TopNMeta
wrapper := NewStatsWrapper(allHg[i], allTopN[i])
globalStats.TopN[i], popedTopN, allHg[i], err = mergeGlobalStatsTopN(g.gpool, sc, wrapper, sc.GetSessionVars().StmtCtx.TimeZone, sc.GetSessionVars().AnalyzeVersion, uint32(opts[ast.AnalyzeOptNumTopN]), isIndex == 1)
globalStats.TopN[i], poppedTopN, allHg[i], err = mergeGlobalStatsTopN(g.gpool, sc, wrapper,
sc.GetSessionVars().StmtCtx.TimeZone, sc.GetSessionVars().AnalyzeVersion, uint32(opts[ast.AnalyzeOptNumTopN]), isIndex)
if err != nil {
return
}

// Merge histogram
globalStats.Hg[i], err = statistics.MergePartitionHist2GlobalHist(sc.GetSessionVars().StmtCtx, allHg[i], popedTopN, int64(opts[ast.AnalyzeOptNumBuckets]), isIndex == 1)
// Merge histogram.
globalStats.Hg[i], err = statistics.MergePartitionHist2GlobalHist(sc.GetSessionVars().StmtCtx, allHg[i], poppedTopN,
int64(opts[ast.AnalyzeOptNumBuckets]), isIndex)
if err != nil {
return
}
Expand All @@ -222,51 +245,64 @@ func (g *GlobalStatusHandler) MergePartitionStats2GlobalStats(sc sessionctx.Cont
globalStats.Hg[i].Buckets[j].NDV = 0
}

// Update NDV of global-level stats
// Merge FMSketch.
globalStats.Fms[i] = allFms[i][0].Copy()
for j := 1; j < len(allFms[i]); j++ {
globalStats.Fms[i].MergeFMSketch(allFms[i][j])
}

// update the NDV
// Update the global NDV.
globalStatsNDV := globalStats.Fms[i].NDV()
if globalStatsNDV > globalStats.Count {
globalStatsNDV = globalStats.Count
}
globalStats.Hg[i].NDV = globalStatsNDV
}

return
}

// MergePartitionStats2GlobalStatsByTableID merge the partition-level stats to global-level stats based on the tableID.
func (g *GlobalStatusHandler) MergePartitionStats2GlobalStatsByTableID(sc sessionctx.Context,
opts map[ast.AnalyzeOptionType]uint64, is infoschema.InfoSchema,
physicalID int64, isIndex int, histIDs []int64,
tablePartitionStats map[int64]*statistics.Table, getTableByPhysicalIDFn getTableByPhysicalIDFunc, loadTablePartitionStatsFn loadTablePartitionStatsFunc) (globalStats *GlobalStats, err error) {
// get the partition table IDs
func (g *GlobalStatusHandler) MergePartitionStats2GlobalStatsByTableID(
sc sessionctx.Context,
opts map[ast.AnalyzeOptionType]uint64,
is infoschema.InfoSchema,
physicalID int64,
isIndex bool,
histIDs []int64,
tablePartitionStats map[int64]*statistics.Table,
getTableByPhysicalIDFn getTableByPhysicalIDFunc,
loadTablePartitionStatsFn loadTablePartitionStatsFunc,
) (globalStats *GlobalStats, err error) {
// Get the partition table IDs.
globalTable, ok := getTableByPhysicalIDFn(is, physicalID)
if !ok {
err = errors.Errorf("unknown physical ID %d in stats meta table, maybe it has been dropped", physicalID)
return
}

globalTableInfo := globalTable.Meta()
globalStats, err = g.MergePartitionStats2GlobalStats(sc, opts, is, globalTableInfo, isIndex, histIDs, tablePartitionStats, getTableByPhysicalIDFn, loadTablePartitionStatsFn)
globalStats, err = g.MergePartitionStats2GlobalStats(sc, opts, is, globalTableInfo, isIndex, histIDs,
tablePartitionStats, getTableByPhysicalIDFn, loadTablePartitionStatsFn)
if err != nil {
return
}

if len(globalStats.MissingPartitionStats) > 0 {
var item string
if isIndex == 0 {
if !isIndex {
item = "columns"
} else {
item = "index"
if len(histIDs) > 0 {
item += " " + globalTableInfo.FindIndexNameByID(histIDs[0])
}
}

logutil.BgLogger().Warn("missing partition stats when merging global stats", zap.String("table", globalTableInfo.Name.L),
zap.String("item", item), zap.Strings("missing", globalStats.MissingPartitionStats))
}

return
}

Expand Down
21 changes: 15 additions & 6 deletions statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,10 +375,14 @@ func UpdateSCtxVarsForStats(sctx sessionctx.Context) error {
}

// MergePartitionStats2GlobalStatsByTableID merge the partition-level stats to global-level stats based on the tableID.
func (h *Handle) MergePartitionStats2GlobalStatsByTableID(sc sessionctx.Context,
func (h *Handle) MergePartitionStats2GlobalStatsByTableID(
sc sessionctx.Context,
opts map[ast.AnalyzeOptionType]uint64, is infoschema.InfoSchema,
physicalID int64, isIndex int, histIDs []int64,
tablePartitionStats map[int64]*statistics.Table) (globalStats *globalstats.GlobalStats, err error) {
physicalID int64,
isIndex bool,
histIDs []int64,
tablePartitionStats map[int64]*statistics.Table,
) (globalStats *globalstats.GlobalStats, err error) {
return h.globalstatushandler.MergePartitionStats2GlobalStatsByTableID(sc, opts, is, physicalID, isIndex, histIDs, tablePartitionStats, h.getTableByPhysicalID, h.loadTablePartitionStats)
}

Expand All @@ -398,9 +402,14 @@ func (h *Handle) loadTablePartitionStats(tableInfo *model.TableInfo, partitionDe
}

// MergePartitionStats2GlobalStatsByTableID merge the partition-level stats to global-level stats based on the tableInfo.
func (h *Handle) mergePartitionStats2GlobalStats(opts map[ast.AnalyzeOptionType]uint64,
is infoschema.InfoSchema, globalTableInfo *model.TableInfo, isIndex int, histIDs []int64,
allPartitionStats map[int64]*statistics.Table) (globalStats *globalstats.GlobalStats, err error) {
func (h *Handle) mergePartitionStats2GlobalStats(
opts map[ast.AnalyzeOptionType]uint64,
is infoschema.InfoSchema,
globalTableInfo *model.TableInfo,
isIndex bool,
histIDs []int64,
allPartitionStats map[int64]*statistics.Table,
) (globalStats *globalstats.GlobalStats, err error) {
se, err := h.pool.Get()
if err != nil {
return nil, err
Expand Down