From c1ea5bf4053254c785cbf17da67f7e87728df2bd Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Mon, 20 Feb 2023 17:16:18 +0800 Subject: [PATCH 01/10] exeuctor: make sure SelectResult.Close() is called for IndexMerge Signed-off-by: guo-shaoge --- executor/index_merge_reader.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/executor/index_merge_reader.go b/executor/index_merge_reader.go index c4da6edc9f5cb..99b043f47b615 100644 --- a/executor/index_merge_reader.go +++ b/executor/index_merge_reader.go @@ -361,6 +361,13 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, SetFromInfoSchema(e.ctx.GetInfoSchema()). SetClosestReplicaReadAdjuster(newClosestReadAdjuster(e.ctx, &builder.Request, e.partialNetDataSizes[workID])) + selectResults := make([]distsql.SelectResult, 0, len(keyRanges)) + defer func() { + // Goroutine may panic and SelectResult.Close() will be ignored unexpectedly. + for _, s := range selectResults { + terror.Call(s.Close) + } + }() for parTblIdx, keyRange := range keyRanges { // check if this executor is closed select { @@ -384,6 +391,7 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, syncErr(ctx, e.finished, fetchCh, err) return } + selectResults = append(selectResults, result) worker.batchSize = e.maxChunkSize if worker.batchSize > worker.maxBatchSize { worker.batchSize = worker.maxBatchSize @@ -401,6 +409,7 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, if err := result.Close(); err != nil { logutil.Logger(ctx).Error("close Select result failed:", zap.Error(err)) } + selectResults = selectResults[:len(selectResults)-1] cancel() e.ctx.StoreQueryFeedback(e.feedbacks[workID]) if fetchErr != nil { @@ -475,6 +484,13 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context, partialTableReader.dagPB = e.dagPBs[workID] } + var tableReaderClosed bool + defer func() { + // Goroutine may panic and SelectResult.Close() will be ignored unexpectedly. + if !tableReaderClosed { + terror.Call(worker.tableReader.Close) + } + }() for parTblIdx, tbl := range tbls { // check if this executor is closed select { @@ -490,6 +506,7 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context, syncErr(ctx, e.finished, fetchCh, err) break } + tableReaderClosed = false worker.batchSize = e.maxChunkSize if worker.batchSize > worker.maxBatchSize { worker.batchSize = worker.maxBatchSize @@ -510,6 +527,7 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context, if err = worker.tableReader.Close(); err != nil { logutil.Logger(ctx).Error("close Select result failed:", zap.Error(err)) } + tableReaderClosed = true e.ctx.StoreQueryFeedback(e.feedbacks[workID]) if fetchErr != nil { break From 2f1ff229753e01b5e37128f54d9ff9750db95e79 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Mon, 20 Feb 2023 21:50:54 +0800 Subject: [PATCH 02/10] refine comments Signed-off-by: guo-shaoge --- executor/index_merge_reader.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/executor/index_merge_reader.go b/executor/index_merge_reader.go index 99b043f47b615..79ee029dae73b 100644 --- a/executor/index_merge_reader.go +++ b/executor/index_merge_reader.go @@ -363,7 +363,7 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, selectResults := make([]distsql.SelectResult, 0, len(keyRanges)) defer func() { - // Goroutine may panic and SelectResult.Close() will be ignored unexpectedly. + // To make sure SelectResult.Close() is called even got panic in fetchHandles(). for _, s := range selectResults { terror.Call(s.Close) } @@ -486,7 +486,7 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context, var tableReaderClosed bool defer func() { - // Goroutine may panic and SelectResult.Close() will be ignored unexpectedly. + // To make sure SelectResult.Close() is called even got panic in fetchHandles(). if !tableReaderClosed { terror.Call(worker.tableReader.Close) } From d608f4d6148f88e28d3e4785e013279aa5b4ebc4 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Tue, 21 Feb 2023 16:50:04 +0800 Subject: [PATCH 03/10] add case Signed-off-by: guo-shaoge --- executor/index_merge_reader.go | 8 +++++++ executor/index_merge_reader_test.go | 34 +++++++++++++++++++++++++++++ 2 files changed, 42 insertions(+) diff --git a/executor/index_merge_reader.go b/executor/index_merge_reader.go index 79ee029dae73b..59741af6c9dd5 100644 --- a/executor/index_merge_reader.go +++ b/executor/index_merge_reader.go @@ -391,6 +391,10 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, syncErr(ctx, e.finished, fetchCh, err) return } + failpoint.Inject("testIndexMergePartialIndexWorkerCoprLeak", func(v failpoint.Value) { + time.Sleep(time.Duration(v.(int))) + panic("testIndexMergePartialIndexWorkerCoprLeak") + }) selectResults = append(selectResults, result) worker.batchSize = e.maxChunkSize if worker.batchSize > worker.maxBatchSize { @@ -506,6 +510,10 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context, syncErr(ctx, e.finished, fetchCh, err) break } + failpoint.Inject("testIndexMergePartialTableWorkerCoprLeak", func(v failpoint.Value) { + time.Sleep(time.Duration(v.(int))) + panic("testIndexMergePartialTableWorkerCoprLeak") + }) tableReaderClosed = false worker.batchSize = e.maxChunkSize if worker.batchSize > worker.maxBatchSize { diff --git a/executor/index_merge_reader_test.go b/executor/index_merge_reader_test.go index be1ff66a163ab..328a41bcddc25 100644 --- a/executor/index_merge_reader_test.go +++ b/executor/index_merge_reader_test.go @@ -881,3 +881,37 @@ func TestIndexMergePanic(t *testing.T) { require.NoError(t, failpoint.Disable(fp)) } } + +func TestIndexMergeCoprGoroutinesLeak(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + + tk.MustExec("use test") + tk.MustExec("drop table if exists t1") + tk.MustExec("create table t1(c1 int, c2 bigint, c3 bigint, primary key(c1), key(c2), key(c3));") + insertStr := "insert into t1 values(0, 0, 0)" + for i := 1; i < 1000; i++ { + insertStr += fmt.Sprintf(", (%d, %d, %d)", i, i, i) + } + tk.MustExec(insertStr) + tk.MustExec("analyze table t1;") + tk.MustExec("set tidb_partition_prune_mode = 'dynamic'") + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergePartialTableWorkerCoprLeak", "return(3)")) + + sql := fmt.Sprintf("select /*+ use_index_merge(t1) */ c1 from t1 where c1 < 900 or c2 < 1000;") + res := tk.MustQuery("explain " + sql).Rows() + require.Contains(t, res[1][0], "IndexMerge") + + err := tk.QueryToErr(sql) + require.Contains(t, err.Error(), "testIndexMergePartialTableWorkerCoprLeak") + + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testIndexMergePartialTableWorkerCoprLeak")) + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergePartialIndexWorkerCoprLeak", "return(3)")) + + err = tk.QueryToErr(sql) + require.Contains(t, err.Error(), "testIndexMergePartialIndexWorkerCoprLeak") + + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testIndexMergePartialIndexWorkerCoprLeak")) +} From 3a86ab3103d3c3de5cce7ba93046aa87c3da2e3b Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Tue, 21 Feb 2023 16:54:41 +0800 Subject: [PATCH 04/10] refine comment Signed-off-by: guo-shaoge --- executor/index_merge_reader_test.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/executor/index_merge_reader_test.go b/executor/index_merge_reader_test.go index 328a41bcddc25..fc8bf68e3acc5 100644 --- a/executor/index_merge_reader_test.go +++ b/executor/index_merge_reader_test.go @@ -897,21 +897,17 @@ func TestIndexMergeCoprGoroutinesLeak(t *testing.T) { tk.MustExec("analyze table t1;") tk.MustExec("set tidb_partition_prune_mode = 'dynamic'") + // If got goroutines leak in coprocessor, ci will fail. require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergePartialTableWorkerCoprLeak", "return(3)")) - sql := fmt.Sprintf("select /*+ use_index_merge(t1) */ c1 from t1 where c1 < 900 or c2 < 1000;") res := tk.MustQuery("explain " + sql).Rows() require.Contains(t, res[1][0], "IndexMerge") - err := tk.QueryToErr(sql) require.Contains(t, err.Error(), "testIndexMergePartialTableWorkerCoprLeak") - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testIndexMergePartialTableWorkerCoprLeak")) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergePartialIndexWorkerCoprLeak", "return(3)")) - err = tk.QueryToErr(sql) require.Contains(t, err.Error(), "testIndexMergePartialIndexWorkerCoprLeak") - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testIndexMergePartialIndexWorkerCoprLeak")) } From 1200adb029e655bd4adb0f750536e52fbae90582 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Tue, 21 Feb 2023 17:05:09 +0800 Subject: [PATCH 05/10] fix unit-test Signed-off-by: guo-shaoge --- executor/index_merge_reader.go | 2 +- executor/index_merge_reader_test.go | 8 +++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/executor/index_merge_reader.go b/executor/index_merge_reader.go index 59741af6c9dd5..f1a3a82296cd9 100644 --- a/executor/index_merge_reader.go +++ b/executor/index_merge_reader.go @@ -391,11 +391,11 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, syncErr(ctx, e.finished, fetchCh, err) return } + selectResults = append(selectResults, result) failpoint.Inject("testIndexMergePartialIndexWorkerCoprLeak", func(v failpoint.Value) { time.Sleep(time.Duration(v.(int))) panic("testIndexMergePartialIndexWorkerCoprLeak") }) - selectResults = append(selectResults, result) worker.batchSize = e.maxChunkSize if worker.batchSize > worker.maxBatchSize { worker.batchSize = worker.maxBatchSize diff --git a/executor/index_merge_reader_test.go b/executor/index_merge_reader_test.go index fc8bf68e3acc5..333d47e709915 100644 --- a/executor/index_merge_reader_test.go +++ b/executor/index_merge_reader_test.go @@ -897,12 +897,14 @@ func TestIndexMergeCoprGoroutinesLeak(t *testing.T) { tk.MustExec("analyze table t1;") tk.MustExec("set tidb_partition_prune_mode = 'dynamic'") - // If got goroutines leak in coprocessor, ci will fail. - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergePartialTableWorkerCoprLeak", "return(3)")) + var err error sql := fmt.Sprintf("select /*+ use_index_merge(t1) */ c1 from t1 where c1 < 900 or c2 < 1000;") res := tk.MustQuery("explain " + sql).Rows() require.Contains(t, res[1][0], "IndexMerge") - err := tk.QueryToErr(sql) + + // If got goroutines leak in coprocessor, ci will fail. + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergePartialTableWorkerCoprLeak", "return(3)")) + err = tk.QueryToErr(sql) require.Contains(t, err.Error(), "testIndexMergePartialTableWorkerCoprLeak") require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testIndexMergePartialTableWorkerCoprLeak")) From 15ed8cdcce21233c40c3d6d04ff88b4673ff87cb Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Tue, 21 Feb 2023 17:08:48 +0800 Subject: [PATCH 06/10] fix case build Signed-off-by: guo-shaoge --- executor/index_merge_reader_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executor/index_merge_reader_test.go b/executor/index_merge_reader_test.go index 333d47e709915..47f5961760f3a 100644 --- a/executor/index_merge_reader_test.go +++ b/executor/index_merge_reader_test.go @@ -898,7 +898,7 @@ func TestIndexMergeCoprGoroutinesLeak(t *testing.T) { tk.MustExec("set tidb_partition_prune_mode = 'dynamic'") var err error - sql := fmt.Sprintf("select /*+ use_index_merge(t1) */ c1 from t1 where c1 < 900 or c2 < 1000;") + sql := "select /*+ use_index_merge(t1) */ c1 from t1 where c1 < 900 or c2 < 1000;" res := tk.MustQuery("explain " + sql).Rows() require.Contains(t, res[1][0], "IndexMerge") From 29238526b8dc05c3af29541a9c32562a132292b6 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Wed, 22 Feb 2023 11:45:04 +0800 Subject: [PATCH 07/10] fix sleep second Signed-off-by: guo-shaoge --- executor/index_merge_reader.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/executor/index_merge_reader.go b/executor/index_merge_reader.go index f1a3a82296cd9..2ba627b87f641 100644 --- a/executor/index_merge_reader.go +++ b/executor/index_merge_reader.go @@ -393,7 +393,7 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, } selectResults = append(selectResults, result) failpoint.Inject("testIndexMergePartialIndexWorkerCoprLeak", func(v failpoint.Value) { - time.Sleep(time.Duration(v.(int))) + time.Sleep(time.Second * time.Duration(v.(int))) panic("testIndexMergePartialIndexWorkerCoprLeak") }) worker.batchSize = e.maxChunkSize @@ -511,7 +511,7 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context, break } failpoint.Inject("testIndexMergePartialTableWorkerCoprLeak", func(v failpoint.Value) { - time.Sleep(time.Duration(v.(int))) + time.Sleep(time.Second * time.Duration(v.(int))) panic("testIndexMergePartialTableWorkerCoprLeak") }) tableReaderClosed = false From 56342311142752dcec05c08603c2199b778db469 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Wed, 22 Feb 2023 11:55:32 +0800 Subject: [PATCH 08/10] remove slice Signed-off-by: guo-shaoge --- executor/index_merge_reader.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/executor/index_merge_reader.go b/executor/index_merge_reader.go index 2ba627b87f641..af5f76bee081f 100644 --- a/executor/index_merge_reader.go +++ b/executor/index_merge_reader.go @@ -361,11 +361,10 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, SetFromInfoSchema(e.ctx.GetInfoSchema()). SetClosestReplicaReadAdjuster(newClosestReadAdjuster(e.ctx, &builder.Request, e.partialNetDataSizes[workID])) - selectResults := make([]distsql.SelectResult, 0, len(keyRanges)) + var notClosedSelectResult distsql.SelectResult defer func() { - // To make sure SelectResult.Close() is called even got panic in fetchHandles(). - for _, s := range selectResults { - terror.Call(s.Close) + if notClosedSelectResult != nil { + terror.Call(notClosedSelectResult.Close) } }() for parTblIdx, keyRange := range keyRanges { @@ -391,7 +390,7 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, syncErr(ctx, e.finished, fetchCh, err) return } - selectResults = append(selectResults, result) + notClosedSelectResult = result failpoint.Inject("testIndexMergePartialIndexWorkerCoprLeak", func(v failpoint.Value) { time.Sleep(time.Second * time.Duration(v.(int))) panic("testIndexMergePartialIndexWorkerCoprLeak") @@ -413,7 +412,7 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, if err := result.Close(); err != nil { logutil.Logger(ctx).Error("close Select result failed:", zap.Error(err)) } - selectResults = selectResults[:len(selectResults)-1] + notClosedSelectResult = nil cancel() e.ctx.StoreQueryFeedback(e.feedbacks[workID]) if fetchErr != nil { From a8445b7c45aae27fe2e493e1b8837c80e965db02 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Wed, 22 Feb 2023 12:39:34 +0800 Subject: [PATCH 09/10] fix Signed-off-by: guo-shaoge --- executor/index_merge_reader.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/executor/index_merge_reader.go b/executor/index_merge_reader.go index af5f76bee081f..10c25f4e6e0b2 100644 --- a/executor/index_merge_reader.go +++ b/executor/index_merge_reader.go @@ -409,10 +409,10 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, if fetchErr != nil { // this error is synced in fetchHandles(), don't sync it again e.feedbacks[workID].Invalidate() } + notClosedSelectResult = nil if err := result.Close(); err != nil { logutil.Logger(ctx).Error("close Select result failed:", zap.Error(err)) } - notClosedSelectResult = nil cancel() e.ctx.StoreQueryFeedback(e.feedbacks[workID]) if fetchErr != nil { @@ -531,10 +531,10 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context, // release related resources cancel() + tableReaderClosed = true if err = worker.tableReader.Close(); err != nil { logutil.Logger(ctx).Error("close Select result failed:", zap.Error(err)) } - tableReaderClosed = true e.ctx.StoreQueryFeedback(e.feedbacks[workID]) if fetchErr != nil { break From 94066e4857c6af6dbc6b24997e4b6e6ad286f764 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Wed, 22 Feb 2023 14:39:50 +0800 Subject: [PATCH 10/10] fix build Signed-off-by: guo-shaoge --- executor/index_merge_reader.go | 11 +++-------- executor/index_merge_reader_test.go | 4 ++-- 2 files changed, 5 insertions(+), 10 deletions(-) diff --git a/executor/index_merge_reader.go b/executor/index_merge_reader.go index 10c25f4e6e0b2..e958f2fe43a94 100644 --- a/executor/index_merge_reader.go +++ b/executor/index_merge_reader.go @@ -363,6 +363,7 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, var notClosedSelectResult distsql.SelectResult defer func() { + // To make sure SelectResult.Close() is called even got panic in fetchHandles(). if notClosedSelectResult != nil { terror.Call(notClosedSelectResult.Close) } @@ -391,10 +392,7 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, return } notClosedSelectResult = result - failpoint.Inject("testIndexMergePartialIndexWorkerCoprLeak", func(v failpoint.Value) { - time.Sleep(time.Second * time.Duration(v.(int))) - panic("testIndexMergePartialIndexWorkerCoprLeak") - }) + failpoint.Inject("testIndexMergePartialIndexWorkerCoprLeak", nil) worker.batchSize = e.maxChunkSize if worker.batchSize > worker.maxBatchSize { worker.batchSize = worker.maxBatchSize @@ -509,10 +507,7 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context, syncErr(ctx, e.finished, fetchCh, err) break } - failpoint.Inject("testIndexMergePartialTableWorkerCoprLeak", func(v failpoint.Value) { - time.Sleep(time.Second * time.Duration(v.(int))) - panic("testIndexMergePartialTableWorkerCoprLeak") - }) + failpoint.Inject("testIndexMergePartialTableWorkerCoprLeak", nil) tableReaderClosed = false worker.batchSize = e.maxChunkSize if worker.batchSize > worker.maxBatchSize { diff --git a/executor/index_merge_reader_test.go b/executor/index_merge_reader_test.go index 47f5961760f3a..d30fce71a180e 100644 --- a/executor/index_merge_reader_test.go +++ b/executor/index_merge_reader_test.go @@ -903,12 +903,12 @@ func TestIndexMergeCoprGoroutinesLeak(t *testing.T) { require.Contains(t, res[1][0], "IndexMerge") // If got goroutines leak in coprocessor, ci will fail. - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergePartialTableWorkerCoprLeak", "return(3)")) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergePartialTableWorkerCoprLeak", `panic("testIndexMergePartialTableWorkerCoprLeak")`)) err = tk.QueryToErr(sql) require.Contains(t, err.Error(), "testIndexMergePartialTableWorkerCoprLeak") require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testIndexMergePartialTableWorkerCoprLeak")) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergePartialIndexWorkerCoprLeak", "return(3)")) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergePartialIndexWorkerCoprLeak", `panic("testIndexMergePartialIndexWorkerCoprLeak")`)) err = tk.QueryToErr(sql) require.Contains(t, err.Error(), "testIndexMergePartialIndexWorkerCoprLeak") require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testIndexMergePartialIndexWorkerCoprLeak"))