From 5514a9b49efd41dca3d43008faf08cdab246bd3d Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Fri, 20 Oct 2023 00:04:59 +0800 Subject: [PATCH] statstics: correctly handle error when merging global stats (#47770) (#47812) close pingcap/tidb#47771 --- pkg/statistics/handle/globalstats/BUILD.bazel | 2 +- .../handle/globalstats/global_stats_async.go | 107 +++++++++++++----- .../handle/globalstats/globalstats_test.go | 36 ++++++ 3 files changed, 114 insertions(+), 31 deletions(-) diff --git a/pkg/statistics/handle/globalstats/BUILD.bazel b/pkg/statistics/handle/globalstats/BUILD.bazel index 478cd6c2047df..18e2e0052c7fa 100644 --- a/pkg/statistics/handle/globalstats/BUILD.bazel +++ b/pkg/statistics/handle/globalstats/BUILD.bazel @@ -41,7 +41,7 @@ go_test( "topn_bench_test.go", ], flaky = True, - shard_count = 14, + shard_count = 18, deps = [ ":globalstats", "//pkg/config", diff --git a/pkg/statistics/handle/globalstats/global_stats_async.go b/pkg/statistics/handle/globalstats/global_stats_async.go index ad78f4b308973..2d8dcc6dbabf3 100644 --- a/pkg/statistics/handle/globalstats/global_stats_async.go +++ b/pkg/statistics/handle/globalstats/global_stats_async.go @@ -31,7 +31,9 @@ import ( "github.com/pingcap/tidb/pkg/statistics/handle/storage" "github.com/pingcap/tidb/pkg/statistics/handle/util" "github.com/pingcap/tidb/pkg/types" + "github.com/pingcap/tidb/pkg/util/logutil" "github.com/tiancaiamao/gp" + "go.uber.org/zap" "golang.org/x/sync/errgroup" ) @@ -81,10 +83,13 @@ type AsyncMergePartitionStats2GlobalStats struct { PartitionDefinition map[int64]model.PartitionDefinition tableInfo map[int64]*model.TableInfo // key is partition id and histID - skipPartition map[skipItem]struct{} + skipPartition map[skipItem]struct{} + // ioWorker meet error, it will close this channel to notify cpuWorker. + ioWorkerExitWhenErrChan chan struct{} + // cpuWorker exit, it will close this channel to notify ioWorker. + cpuWorkerExitChan chan struct{} getTableByPhysicalIDFn getTableByPhysicalIDFunc callWithSCtxFunc callWithSCtxFunc - exitWhenErrChan chan struct{} globalTableInfo *model.TableInfo histIDs []int64 globalStatsNDV []int64 @@ -103,22 +108,23 @@ func NewAsyncMergePartitionStats2GlobalStats( callWithSCtxFunc callWithSCtxFunc) (*AsyncMergePartitionStats2GlobalStats, error) { partitionNum := len(globalTableInfo.Partition.Definitions) return &AsyncMergePartitionStats2GlobalStats{ - callWithSCtxFunc: callWithSCtxFunc, - cmsketch: make(chan mergeItem[*statistics.CMSketch], 5), - fmsketch: make(chan mergeItem[*statistics.FMSketch], 5), - histogramAndTopn: make(chan mergeItem[*StatsWrapper]), - PartitionDefinition: make(map[int64]model.PartitionDefinition), - tableInfo: make(map[int64]*model.TableInfo), - partitionIDs: make([]int64, 0, partitionNum), - exitWhenErrChan: make(chan struct{}), - skipPartition: make(map[skipItem]struct{}), - gpool: gpool, - allPartitionStats: make(map[int64]*statistics.Table), - globalTableInfo: globalTableInfo, - getTableByPhysicalIDFn: getTableByPhysicalIDFn, - histIDs: histIDs, - is: is, - partitionNum: partitionNum, + callWithSCtxFunc: callWithSCtxFunc, + cmsketch: make(chan mergeItem[*statistics.CMSketch], 5), + fmsketch: make(chan mergeItem[*statistics.FMSketch], 5), + histogramAndTopn: make(chan mergeItem[*StatsWrapper]), + PartitionDefinition: make(map[int64]model.PartitionDefinition), + tableInfo: make(map[int64]*model.TableInfo), + partitionIDs: make([]int64, 0, partitionNum), + ioWorkerExitWhenErrChan: make(chan struct{}), + cpuWorkerExitChan: make(chan struct{}), + skipPartition: make(map[skipItem]struct{}), + gpool: gpool, + allPartitionStats: make(map[int64]*statistics.Table), + globalTableInfo: globalTableInfo, + getTableByPhysicalIDFn: getTableByPhysicalIDFn, + histIDs: histIDs, + is: is, + partitionNum: partitionNum, }, nil } @@ -226,25 +232,32 @@ func (a *AsyncMergePartitionStats2GlobalStats) dealErrPartitionColumnStatsMissin func (a *AsyncMergePartitionStats2GlobalStats) ioWorker(sctx sessionctx.Context, isIndex bool) (err error) { defer func() { if r := recover(); r != nil { - close(a.exitWhenErrChan) + logutil.BgLogger().Warn("ioWorker panic", zap.Stack("stack"), zap.Any("error", r), zap.String("category", "stats")) + close(a.ioWorkerExitWhenErrChan) err = errors.New(fmt.Sprint(r)) } }() err = a.loadFmsketch(sctx, isIndex) if err != nil { - close(a.exitWhenErrChan) + close(a.ioWorkerExitWhenErrChan) return err } close(a.fmsketch) err = a.loadCMsketch(sctx, isIndex) if err != nil { - close(a.exitWhenErrChan) + close(a.ioWorkerExitWhenErrChan) return err } close(a.cmsketch) + failpoint.Inject("PanicSameTime", func(val failpoint.Value) { + if val, _ := val.(bool); val { + time.Sleep(1 * time.Second) + panic("test for PanicSameTime") + } + }) err = a.loadHistogramAndTopN(sctx, a.globalTableInfo, isIndex) if err != nil { - close(a.exitWhenErrChan) + close(a.ioWorkerExitWhenErrChan) return err } close(a.histogramAndTopn) @@ -254,13 +267,14 @@ func (a *AsyncMergePartitionStats2GlobalStats) ioWorker(sctx sessionctx.Context, func (a *AsyncMergePartitionStats2GlobalStats) cpuWorker(stmtCtx *stmtctx.StatementContext, sctx sessionctx.Context, opts map[ast.AnalyzeOptionType]uint64, isIndex bool, tz *time.Location, analyzeVersion int) (err error) { defer func() { if r := recover(); r != nil { - close(a.exitWhenErrChan) + logutil.BgLogger().Warn("cpuWorker panic", zap.Stack("stack"), zap.Any("error", r), zap.String("category", "stats")) err = errors.New(fmt.Sprint(r)) } + close(a.cpuWorkerExitChan) }() a.dealFMSketch() select { - case <-a.exitWhenErrChan: + case <-a.ioWorkerExitWhenErrChan: return nil default: for i := 0; i < a.globalStats.Num; i++ { @@ -275,10 +289,18 @@ func (a *AsyncMergePartitionStats2GlobalStats) cpuWorker(stmtCtx *stmtctx.Statem } err = a.dealCMSketch() if err != nil { + logutil.BgLogger().Warn("dealCMSketch failed", zap.Error(err), zap.String("category", "stats")) return err } + failpoint.Inject("PanicSameTime", func(val failpoint.Value) { + if val, _ := val.(bool); val { + time.Sleep(1 * time.Second) + panic("test for PanicSameTime") + } + }) err = a.dealHistogramAndTopN(stmtCtx, sctx, opts, isIndex, tz, analyzeVersion) if err != nil { + logutil.BgLogger().Warn("dealHistogramAndTopN failed", zap.Error(err), zap.String("category", "stats")) return err } return nil @@ -345,7 +367,8 @@ func (a *AsyncMergePartitionStats2GlobalStats) loadFmsketch(sctx sessionctx.Cont case a.fmsketch <- mergeItem[*statistics.FMSketch]{ fmsketch, i, }: - case <-a.exitWhenErrChan: + case <-a.cpuWorkerExitChan: + logutil.BgLogger().Warn("ioWorker detects CPUWorker has exited", zap.String("category", "stats")) return nil } } @@ -375,7 +398,8 @@ func (a *AsyncMergePartitionStats2GlobalStats) loadCMsketch(sctx sessionctx.Cont case a.cmsketch <- mergeItem[*statistics.CMSketch]{ cmsketch, i, }: - case <-a.exitWhenErrChan: + case <-a.cpuWorkerExitChan: + logutil.BgLogger().Warn("ioWorker detects CPUWorker has exited", zap.String("category", "stats")) return nil } } @@ -384,6 +408,12 @@ func (a *AsyncMergePartitionStats2GlobalStats) loadCMsketch(sctx sessionctx.Cont } func (a *AsyncMergePartitionStats2GlobalStats) loadHistogramAndTopN(sctx sessionctx.Context, tableInfo *model.TableInfo, isIndex bool) error { + failpoint.Inject("ErrorSameTime", func(val failpoint.Value) { + if val, _ := val.(bool); val { + time.Sleep(1 * time.Second) + failpoint.Return(errors.New("ErrorSameTime returned error")) + } + }) for i := 0; i < a.globalStats.Num; i++ { hists := make([]*statistics.Histogram, 0, a.partitionNum) topn := make([]*statistics.TopN, 0, a.partitionNum) @@ -410,7 +440,8 @@ func (a *AsyncMergePartitionStats2GlobalStats) loadHistogramAndTopN(sctx session case a.histogramAndTopn <- mergeItem[*StatsWrapper]{ NewStatsWrapper(hists, topn), i, }: - case <-a.exitWhenErrChan: + case <-a.cpuWorkerExitChan: + logutil.BgLogger().Warn("ioWorker detects CPUWorker has exited", zap.String("category", "stats")) return nil } } @@ -430,13 +461,18 @@ func (a *AsyncMergePartitionStats2GlobalStats) dealFMSketch() { } else { a.globalStats.Fms[fms.idx].MergeFMSketch(fms.item) } - case <-a.exitWhenErrChan: + case <-a.ioWorkerExitWhenErrChan: return } } } func (a *AsyncMergePartitionStats2GlobalStats) dealCMSketch() error { + failpoint.Inject("dealCMSketchErr", func(val failpoint.Value) { + if val, _ := val.(bool); val { + failpoint.Return(errors.New("dealCMSketch returned error")) + } + }) for { select { case cms, ok := <-a.cmsketch: @@ -451,13 +487,24 @@ func (a *AsyncMergePartitionStats2GlobalStats) dealCMSketch() error { return err } } - case <-a.exitWhenErrChan: + case <-a.ioWorkerExitWhenErrChan: return nil } } } func (a *AsyncMergePartitionStats2GlobalStats) dealHistogramAndTopN(stmtCtx *stmtctx.StatementContext, sctx sessionctx.Context, opts map[ast.AnalyzeOptionType]uint64, isIndex bool, tz *time.Location, analyzeVersion int) (err error) { + failpoint.Inject("dealHistogramAndTopNErr", func(val failpoint.Value) { + if val, _ := val.(bool); val { + failpoint.Return(errors.New("dealHistogramAndTopNErr returned error")) + } + }) + failpoint.Inject("ErrorSameTime", func(val failpoint.Value) { + if val, _ := val.(bool); val { + time.Sleep(1 * time.Second) + failpoint.Return(errors.New("ErrorSameTime returned error")) + } + }) for { select { case item, ok := <-a.histogramAndTopn: @@ -486,7 +533,7 @@ func (a *AsyncMergePartitionStats2GlobalStats) dealHistogramAndTopN(stmtCtx *stm a.globalStats.Hg[item.idx].Buckets[j].NDV = 0 } a.globalStats.Hg[item.idx].NDV = a.globalStatsNDV[item.idx] - case <-a.exitWhenErrChan: + case <-a.ioWorkerExitWhenErrChan: return nil } } diff --git a/pkg/statistics/handle/globalstats/globalstats_test.go b/pkg/statistics/handle/globalstats/globalstats_test.go index 663667e5735d8..84a534517dd0a 100644 --- a/pkg/statistics/handle/globalstats/globalstats_test.go +++ b/pkg/statistics/handle/globalstats/globalstats_test.go @@ -76,6 +76,24 @@ func TestGlobalStatsPanicInIOWorker(t *testing.T) { simpleTest(t) } +func TestGlobalStatsWithCMSketchErr(t *testing.T) { + fpName := "github.com/pingcap/tidb/pkg/statistics/handle/globalstats/dealCMSketchErr" + require.NoError(t, failpoint.Enable(fpName, `return(true)`)) + defer func() { + require.NoError(t, failpoint.Disable(fpName)) + }() + simpleTest(t) +} + +func TestGlobalStatsWithHistogramAndTopNErr(t *testing.T) { + fpName := "github.com/pingcap/tidb/pkg/statistics/handle/globalstats/dealHistogramAndTopNErr" + require.NoError(t, failpoint.Enable(fpName, `return(true)`)) + defer func() { + require.NoError(t, failpoint.Disable(fpName)) + }() + simpleTest(t) +} + func TestGlobalStatsPanicInCPUWorker(t *testing.T) { fpName := "github.com/pingcap/tidb/pkg/statistics/handle/globalstats/PanicInCPUWorker" require.NoError(t, failpoint.Enable(fpName, "panic(\"inject panic\")")) @@ -85,6 +103,24 @@ func TestGlobalStatsPanicInCPUWorker(t *testing.T) { simpleTest(t) } +func TestGlobalStatsPanicSametime(t *testing.T) { + fpName := "github.com/pingcap/tidb/pkg/statistics/handle/globalstats/PanicSameTime" + require.NoError(t, failpoint.Enable(fpName, `return(true)`)) + defer func() { + require.NoError(t, failpoint.Disable(fpName)) + }() + simpleTest(t) +} + +func TestGlobalStatsErrorSametime(t *testing.T) { + fpName := "github.com/pingcap/tidb/pkg/statistics/handle/globalstats/ErrorSameTime" + require.NoError(t, failpoint.Enable(fpName, `return(true)`)) + defer func() { + require.NoError(t, failpoint.Disable(fpName)) + }() + simpleTest(t) +} + func TestBuildGlobalLevelStats(t *testing.T) { store := testkit.CreateMockStore(t) testKit := testkit.NewTestKit(t, store)