Skip to content

Commit

Permalink
exeuctor: fix coprocessor goroutine leak for IndexMerge (#41610) (#41655
Browse files Browse the repository at this point in the history
)

close #41605
  • Loading branch information
ti-chi-bot authored Feb 27, 2023
1 parent f913141 commit 32ab7eb
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 0 deletions.
20 changes: 20 additions & 0 deletions executor/index_merge_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,13 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context,
SetFromInfoSchema(e.ctx.GetInfoSchema()).
SetClosestReplicaReadAdjuster(newClosestReadAdjuster(e.ctx, &builder.Request, e.partialNetDataSizes[workID]))

var notClosedSelectResult distsql.SelectResult
defer func() {
// To make sure SelectResult.Close() is called even got panic in fetchHandles().
if notClosedSelectResult != nil {
terror.Call(notClosedSelectResult.Close)
}
}()
for parTblIdx, keyRange := range keyRanges {
// check if this executor is closed
select {
Expand All @@ -388,6 +395,8 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context,
syncErr(ctx, e.finished, fetchCh, err)
return
}
notClosedSelectResult = result
failpoint.Inject("testIndexMergePartialIndexWorkerCoprLeak", nil)
worker.batchSize = e.maxChunkSize
if worker.batchSize > worker.maxBatchSize {
worker.batchSize = worker.maxBatchSize
Expand All @@ -402,6 +411,7 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context,
if fetchErr != nil { // this error is synced in fetchHandles(), don't sync it again
e.feedbacks[workID].Invalidate()
}
notClosedSelectResult = nil
if err := result.Close(); err != nil {
logutil.Logger(ctx).Error("close Select result failed:", zap.Error(err))
}
Expand Down Expand Up @@ -479,6 +489,13 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context,
partialTableReader.dagPB = e.dagPBs[workID]
}

var tableReaderClosed bool
defer func() {
// To make sure SelectResult.Close() is called even got panic in fetchHandles().
if !tableReaderClosed {
terror.Call(worker.tableReader.Close)
}
}()
for parTblIdx, tbl := range tbls {
// check if this executor is closed
select {
Expand All @@ -494,6 +511,8 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context,
syncErr(ctx, e.finished, fetchCh, err)
break
}
failpoint.Inject("testIndexMergePartialTableWorkerCoprLeak", nil)
tableReaderClosed = false
worker.batchSize = e.maxChunkSize
if worker.batchSize > worker.maxBatchSize {
worker.batchSize = worker.maxBatchSize
Expand All @@ -511,6 +530,7 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context,

// release related resources
cancel()
tableReaderClosed = true
if err = worker.tableReader.Close(); err != nil {
logutil.Logger(ctx).Error("close Select result failed:", zap.Error(err))
}
Expand Down
32 changes: 32 additions & 0 deletions executor/index_merge_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -881,3 +881,35 @@ func TestIndexMergePanic(t *testing.T) {
require.NoError(t, failpoint.Disable(fp))
}
}

func TestIndexMergeCoprGoroutinesLeak(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1(c1 int, c2 bigint, c3 bigint, primary key(c1), key(c2), key(c3));")
insertStr := "insert into t1 values(0, 0, 0)"
for i := 1; i < 1000; i++ {
insertStr += fmt.Sprintf(", (%d, %d, %d)", i, i, i)
}
tk.MustExec(insertStr)
tk.MustExec("analyze table t1;")
tk.MustExec("set tidb_partition_prune_mode = 'dynamic'")

var err error
sql := "select /*+ use_index_merge(t1) */ c1 from t1 where c1 < 900 or c2 < 1000;"
res := tk.MustQuery("explain " + sql).Rows()
require.Contains(t, res[1][0], "IndexMerge")

// If got goroutines leak in coprocessor, ci will fail.
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergePartialTableWorkerCoprLeak", `panic("testIndexMergePartialTableWorkerCoprLeak")`))
err = tk.QueryToErr(sql)
require.Contains(t, err.Error(), "testIndexMergePartialTableWorkerCoprLeak")
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testIndexMergePartialTableWorkerCoprLeak"))

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergePartialIndexWorkerCoprLeak", `panic("testIndexMergePartialIndexWorkerCoprLeak")`))
err = tk.QueryToErr(sql)
require.Contains(t, err.Error(), "testIndexMergePartialIndexWorkerCoprLeak")
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testIndexMergePartialIndexWorkerCoprLeak"))
}

0 comments on commit 32ab7eb

Please sign in to comment.