Skip to content

Commit

Permalink
statstics: correctly handle error when merging global stats (#47770) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Oct 19, 2023
1 parent 84332a7 commit 5514a9b
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 31 deletions.
2 changes: 1 addition & 1 deletion pkg/statistics/handle/globalstats/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ go_test(
"topn_bench_test.go",
],
flaky = True,
shard_count = 14,
shard_count = 18,
deps = [
":globalstats",
"//pkg/config",
Expand Down
107 changes: 77 additions & 30 deletions pkg/statistics/handle/globalstats/global_stats_async.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ import (
"github.com/pingcap/tidb/pkg/statistics/handle/storage"
"github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/tiancaiamao/gp"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

Expand Down Expand Up @@ -81,10 +83,13 @@ type AsyncMergePartitionStats2GlobalStats struct {
PartitionDefinition map[int64]model.PartitionDefinition
tableInfo map[int64]*model.TableInfo
// key is partition id and histID
skipPartition map[skipItem]struct{}
skipPartition map[skipItem]struct{}
// ioWorker meet error, it will close this channel to notify cpuWorker.
ioWorkerExitWhenErrChan chan struct{}
// cpuWorker exit, it will close this channel to notify ioWorker.
cpuWorkerExitChan chan struct{}
getTableByPhysicalIDFn getTableByPhysicalIDFunc
callWithSCtxFunc callWithSCtxFunc
exitWhenErrChan chan struct{}
globalTableInfo *model.TableInfo
histIDs []int64
globalStatsNDV []int64
Expand All @@ -103,22 +108,23 @@ func NewAsyncMergePartitionStats2GlobalStats(
callWithSCtxFunc callWithSCtxFunc) (*AsyncMergePartitionStats2GlobalStats, error) {
partitionNum := len(globalTableInfo.Partition.Definitions)
return &AsyncMergePartitionStats2GlobalStats{
callWithSCtxFunc: callWithSCtxFunc,
cmsketch: make(chan mergeItem[*statistics.CMSketch], 5),
fmsketch: make(chan mergeItem[*statistics.FMSketch], 5),
histogramAndTopn: make(chan mergeItem[*StatsWrapper]),
PartitionDefinition: make(map[int64]model.PartitionDefinition),
tableInfo: make(map[int64]*model.TableInfo),
partitionIDs: make([]int64, 0, partitionNum),
exitWhenErrChan: make(chan struct{}),
skipPartition: make(map[skipItem]struct{}),
gpool: gpool,
allPartitionStats: make(map[int64]*statistics.Table),
globalTableInfo: globalTableInfo,
getTableByPhysicalIDFn: getTableByPhysicalIDFn,
histIDs: histIDs,
is: is,
partitionNum: partitionNum,
callWithSCtxFunc: callWithSCtxFunc,
cmsketch: make(chan mergeItem[*statistics.CMSketch], 5),
fmsketch: make(chan mergeItem[*statistics.FMSketch], 5),
histogramAndTopn: make(chan mergeItem[*StatsWrapper]),
PartitionDefinition: make(map[int64]model.PartitionDefinition),
tableInfo: make(map[int64]*model.TableInfo),
partitionIDs: make([]int64, 0, partitionNum),
ioWorkerExitWhenErrChan: make(chan struct{}),
cpuWorkerExitChan: make(chan struct{}),
skipPartition: make(map[skipItem]struct{}),
gpool: gpool,
allPartitionStats: make(map[int64]*statistics.Table),
globalTableInfo: globalTableInfo,
getTableByPhysicalIDFn: getTableByPhysicalIDFn,
histIDs: histIDs,
is: is,
partitionNum: partitionNum,
}, nil
}

Expand Down Expand Up @@ -226,25 +232,32 @@ func (a *AsyncMergePartitionStats2GlobalStats) dealErrPartitionColumnStatsMissin
func (a *AsyncMergePartitionStats2GlobalStats) ioWorker(sctx sessionctx.Context, isIndex bool) (err error) {
defer func() {
if r := recover(); r != nil {
close(a.exitWhenErrChan)
logutil.BgLogger().Warn("ioWorker panic", zap.Stack("stack"), zap.Any("error", r), zap.String("category", "stats"))
close(a.ioWorkerExitWhenErrChan)
err = errors.New(fmt.Sprint(r))
}
}()
err = a.loadFmsketch(sctx, isIndex)
if err != nil {
close(a.exitWhenErrChan)
close(a.ioWorkerExitWhenErrChan)
return err
}
close(a.fmsketch)
err = a.loadCMsketch(sctx, isIndex)
if err != nil {
close(a.exitWhenErrChan)
close(a.ioWorkerExitWhenErrChan)
return err
}
close(a.cmsketch)
failpoint.Inject("PanicSameTime", func(val failpoint.Value) {
if val, _ := val.(bool); val {
time.Sleep(1 * time.Second)
panic("test for PanicSameTime")
}
})
err = a.loadHistogramAndTopN(sctx, a.globalTableInfo, isIndex)
if err != nil {
close(a.exitWhenErrChan)
close(a.ioWorkerExitWhenErrChan)
return err
}
close(a.histogramAndTopn)
Expand All @@ -254,13 +267,14 @@ func (a *AsyncMergePartitionStats2GlobalStats) ioWorker(sctx sessionctx.Context,
func (a *AsyncMergePartitionStats2GlobalStats) cpuWorker(stmtCtx *stmtctx.StatementContext, sctx sessionctx.Context, opts map[ast.AnalyzeOptionType]uint64, isIndex bool, tz *time.Location, analyzeVersion int) (err error) {
defer func() {
if r := recover(); r != nil {
close(a.exitWhenErrChan)
logutil.BgLogger().Warn("cpuWorker panic", zap.Stack("stack"), zap.Any("error", r), zap.String("category", "stats"))
err = errors.New(fmt.Sprint(r))
}
close(a.cpuWorkerExitChan)
}()
a.dealFMSketch()
select {
case <-a.exitWhenErrChan:
case <-a.ioWorkerExitWhenErrChan:
return nil
default:
for i := 0; i < a.globalStats.Num; i++ {
Expand All @@ -275,10 +289,18 @@ func (a *AsyncMergePartitionStats2GlobalStats) cpuWorker(stmtCtx *stmtctx.Statem
}
err = a.dealCMSketch()
if err != nil {
logutil.BgLogger().Warn("dealCMSketch failed", zap.Error(err), zap.String("category", "stats"))
return err
}
failpoint.Inject("PanicSameTime", func(val failpoint.Value) {
if val, _ := val.(bool); val {
time.Sleep(1 * time.Second)
panic("test for PanicSameTime")
}
})
err = a.dealHistogramAndTopN(stmtCtx, sctx, opts, isIndex, tz, analyzeVersion)
if err != nil {
logutil.BgLogger().Warn("dealHistogramAndTopN failed", zap.Error(err), zap.String("category", "stats"))
return err
}
return nil
Expand Down Expand Up @@ -345,7 +367,8 @@ func (a *AsyncMergePartitionStats2GlobalStats) loadFmsketch(sctx sessionctx.Cont
case a.fmsketch <- mergeItem[*statistics.FMSketch]{
fmsketch, i,
}:
case <-a.exitWhenErrChan:
case <-a.cpuWorkerExitChan:
logutil.BgLogger().Warn("ioWorker detects CPUWorker has exited", zap.String("category", "stats"))
return nil
}
}
Expand Down Expand Up @@ -375,7 +398,8 @@ func (a *AsyncMergePartitionStats2GlobalStats) loadCMsketch(sctx sessionctx.Cont
case a.cmsketch <- mergeItem[*statistics.CMSketch]{
cmsketch, i,
}:
case <-a.exitWhenErrChan:
case <-a.cpuWorkerExitChan:
logutil.BgLogger().Warn("ioWorker detects CPUWorker has exited", zap.String("category", "stats"))
return nil
}
}
Expand All @@ -384,6 +408,12 @@ func (a *AsyncMergePartitionStats2GlobalStats) loadCMsketch(sctx sessionctx.Cont
}

func (a *AsyncMergePartitionStats2GlobalStats) loadHistogramAndTopN(sctx sessionctx.Context, tableInfo *model.TableInfo, isIndex bool) error {
failpoint.Inject("ErrorSameTime", func(val failpoint.Value) {
if val, _ := val.(bool); val {
time.Sleep(1 * time.Second)
failpoint.Return(errors.New("ErrorSameTime returned error"))
}
})
for i := 0; i < a.globalStats.Num; i++ {
hists := make([]*statistics.Histogram, 0, a.partitionNum)
topn := make([]*statistics.TopN, 0, a.partitionNum)
Expand All @@ -410,7 +440,8 @@ func (a *AsyncMergePartitionStats2GlobalStats) loadHistogramAndTopN(sctx session
case a.histogramAndTopn <- mergeItem[*StatsWrapper]{
NewStatsWrapper(hists, topn), i,
}:
case <-a.exitWhenErrChan:
case <-a.cpuWorkerExitChan:
logutil.BgLogger().Warn("ioWorker detects CPUWorker has exited", zap.String("category", "stats"))
return nil
}
}
Expand All @@ -430,13 +461,18 @@ func (a *AsyncMergePartitionStats2GlobalStats) dealFMSketch() {
} else {
a.globalStats.Fms[fms.idx].MergeFMSketch(fms.item)
}
case <-a.exitWhenErrChan:
case <-a.ioWorkerExitWhenErrChan:
return
}
}
}

func (a *AsyncMergePartitionStats2GlobalStats) dealCMSketch() error {
failpoint.Inject("dealCMSketchErr", func(val failpoint.Value) {
if val, _ := val.(bool); val {
failpoint.Return(errors.New("dealCMSketch returned error"))
}
})
for {
select {
case cms, ok := <-a.cmsketch:
Expand All @@ -451,13 +487,24 @@ func (a *AsyncMergePartitionStats2GlobalStats) dealCMSketch() error {
return err
}
}
case <-a.exitWhenErrChan:
case <-a.ioWorkerExitWhenErrChan:
return nil
}
}
}

func (a *AsyncMergePartitionStats2GlobalStats) dealHistogramAndTopN(stmtCtx *stmtctx.StatementContext, sctx sessionctx.Context, opts map[ast.AnalyzeOptionType]uint64, isIndex bool, tz *time.Location, analyzeVersion int) (err error) {
failpoint.Inject("dealHistogramAndTopNErr", func(val failpoint.Value) {
if val, _ := val.(bool); val {
failpoint.Return(errors.New("dealHistogramAndTopNErr returned error"))
}
})
failpoint.Inject("ErrorSameTime", func(val failpoint.Value) {
if val, _ := val.(bool); val {
time.Sleep(1 * time.Second)
failpoint.Return(errors.New("ErrorSameTime returned error"))
}
})
for {
select {
case item, ok := <-a.histogramAndTopn:
Expand Down Expand Up @@ -486,7 +533,7 @@ func (a *AsyncMergePartitionStats2GlobalStats) dealHistogramAndTopN(stmtCtx *stm
a.globalStats.Hg[item.idx].Buckets[j].NDV = 0
}
a.globalStats.Hg[item.idx].NDV = a.globalStatsNDV[item.idx]
case <-a.exitWhenErrChan:
case <-a.ioWorkerExitWhenErrChan:
return nil
}
}
Expand Down
36 changes: 36 additions & 0 deletions pkg/statistics/handle/globalstats/globalstats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,24 @@ func TestGlobalStatsPanicInIOWorker(t *testing.T) {
simpleTest(t)
}

func TestGlobalStatsWithCMSketchErr(t *testing.T) {
fpName := "github.com/pingcap/tidb/pkg/statistics/handle/globalstats/dealCMSketchErr"
require.NoError(t, failpoint.Enable(fpName, `return(true)`))
defer func() {
require.NoError(t, failpoint.Disable(fpName))
}()
simpleTest(t)
}

func TestGlobalStatsWithHistogramAndTopNErr(t *testing.T) {
fpName := "github.com/pingcap/tidb/pkg/statistics/handle/globalstats/dealHistogramAndTopNErr"
require.NoError(t, failpoint.Enable(fpName, `return(true)`))
defer func() {
require.NoError(t, failpoint.Disable(fpName))
}()
simpleTest(t)
}

func TestGlobalStatsPanicInCPUWorker(t *testing.T) {
fpName := "github.com/pingcap/tidb/pkg/statistics/handle/globalstats/PanicInCPUWorker"
require.NoError(t, failpoint.Enable(fpName, "panic(\"inject panic\")"))
Expand All @@ -85,6 +103,24 @@ func TestGlobalStatsPanicInCPUWorker(t *testing.T) {
simpleTest(t)
}

func TestGlobalStatsPanicSametime(t *testing.T) {
fpName := "github.com/pingcap/tidb/pkg/statistics/handle/globalstats/PanicSameTime"
require.NoError(t, failpoint.Enable(fpName, `return(true)`))
defer func() {
require.NoError(t, failpoint.Disable(fpName))
}()
simpleTest(t)
}

func TestGlobalStatsErrorSametime(t *testing.T) {
fpName := "github.com/pingcap/tidb/pkg/statistics/handle/globalstats/ErrorSameTime"
require.NoError(t, failpoint.Enable(fpName, `return(true)`))
defer func() {
require.NoError(t, failpoint.Disable(fpName))
}()
simpleTest(t)
}

func TestBuildGlobalLevelStats(t *testing.T) {
store := testkit.CreateMockStore(t)
testKit := testkit.NewTestKit(t, store)
Expand Down

0 comments on commit 5514a9b

Please sign in to comment.