Commit b3a3e46

This is an automated cherry-pick of pingcap#41610

Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>

guo-shaoge authored and ti-chi-bot committed Feb 22, 2023
1 parent 73d82e3 commit b3a3e46
Showing 2 changed files with 333 additions and 0 deletions.
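
Both hunks in executor/index_merge_reader.go apply the same defensive pattern: remember the resource that has not been closed yet and close it in a defer, so a panic between opening a SelectResult (or table reader) and the normal close path no longer leaks the coprocessor goroutines behind it. Below is a minimal, self-contained sketch of that pattern; fakeResult, open and the panic trigger are illustrative stand-ins, not TiDB APIs.

package main

import "fmt"

// fakeResult stands in for distsql.SelectResult: closing it is what releases
// the coprocessor goroutines in the real executor.
type fakeResult struct{ id int }

func (r *fakeResult) Close() error {
	fmt.Printf("closed result %d\n", r.id)
	return nil
}

func open(id int) *fakeResult { return &fakeResult{id: id} }

// fetchAll opens one result per range; a panic while processing result
// `panicAt` is still followed by that result being closed, thanks to the defer.
func fetchAll(n, panicAt int) {
	var notClosed *fakeResult
	defer func() {
		if notClosed != nil {
			_ = notClosed.Close() // mirrors terror.Call(notClosedSelectResult.Close)
		}
	}()
	for i := 0; i < n; i++ {
		result := open(i)
		notClosed = result
		if i == panicAt {
			panic("simulated panic while fetching handles")
		}
		notClosed = nil
		_ = result.Close() // normal path
	}
}

func main() {
	defer func() {
		if r := recover(); r != nil {
			fmt.Println("recovered:", r)
		}
	}()
	fetchAll(3, 1)
}
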
24 changes: 24 additions & 0 deletions executor/index_merge_reader.go
@@ -312,6 +312,13 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context,
SetMemTracker(e.memTracker).
SetFromInfoSchema(e.ctx.GetInfoSchema())

var notClosedSelectResult distsql.SelectResult
defer func() {
// Make sure SelectResult.Close() is called even if fetchHandles() panics.
if notClosedSelectResult != nil {
terror.Call(notClosedSelectResult.Close)
}
}()
for parTblIdx, keyRange := range keyRanges {
// check if this executor is closed
select {
@@ -331,6 +338,8 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context,
worker.syncErr(e.resultCh, err)
return
}
notClosedSelectResult = result
failpoint.Inject("testIndexMergePartialIndexWorkerCoprLeak", nil)
worker.batchSize = e.maxChunkSize
if worker.batchSize > worker.maxBatchSize {
worker.batchSize = worker.maxBatchSize
@@ -345,6 +354,7 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context,
if fetchErr != nil { // this error is synced in fetchHandles(), don't sync it again
e.feedbacks[workID].Invalidate()
}
notClosedSelectResult = nil
if err := result.Close(); err != nil {
logutil.Logger(ctx).Error("close Select result failed:", zap.Error(err))
}
@@ -409,7 +419,18 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context,
partialTableReader.dagPB = e.dagPBs[workID]
}

var tableReaderClosed bool
defer func() {
// Make sure worker.tableReader.Close() is called even if fetchHandles() panics.
if !tableReaderClosed {
terror.Call(worker.tableReader.Close)
}
}()
for _, tbl := range tbls {
// check if this executor is closed
select {
case <-e.finished:
@@ -424,6 +445,8 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context,
worker.syncErr(e.resultCh, err)
break
}
failpoint.Inject("testIndexMergePartialTableWorkerCoprLeak", nil)
tableReaderClosed = false
worker.batchSize = e.maxChunkSize
if worker.batchSize > worker.maxBatchSize {
worker.batchSize = worker.maxBatchSize
@@ -441,6 +464,7 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context,

// release related resources
cancel()
tableReaderClosed = true
if err = worker.tableReader.Close(); err != nil {
logutil.Logger(ctx).Error("close Select result failed:", zap.Error(err))
}
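
The new failpoint.Inject markers above are inert until a test enables them; the tests in the next file attach a panic action so the worker crashes right after its SelectResult or table reader is created. A minimal sketch of that mechanism, assuming the github.com/pingcap/failpoint library and calling failpoint.Eval directly (in TiDB the Inject markers are rewritten into Eval calls when failpoints are enabled, e.g. via make failpoint-enable); the "demo/afterOpen" path is hypothetical.

package main

import (
	"fmt"

	"github.com/pingcap/failpoint"
)

// openResource mimics a worker that hits a failpoint right after acquiring a resource.
func openResource() {
	// With the failpoint enabled, the attached action (here: panic) fires inside Eval;
	// when it is not enabled, Eval just returns an error and execution continues.
	_, _ = failpoint.Eval("demo/afterOpen")
	fmt.Println("resource opened normally")
}

func main() {
	if err := failpoint.Enable("demo/afterOpen", `panic("demo/afterOpen")`); err != nil {
		panic(err)
	}
	defer func() { _ = failpoint.Disable("demo/afterOpen") }()

	defer func() {
		if r := recover(); r != nil {
			fmt.Println("recovered:", r) // the simulated crash reached the caller
		}
	}()
	openResource()
}
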
309 changes: 309 additions & 0 deletions executor/index_merge_reader_test.go
@@ -584,3 +584,312 @@ func TestPessimisticLockOnPartitionForIndexMerge(t *testing.T) {

// TODO: add support for index merge reader in dynamic tidb_partition_prune_mode
}

func TestIndexMergeIntersectionConcurrency(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1(c1 int, c2 bigint, c3 bigint, primary key(c1), key(c2), key(c3)) partition by hash(c1) partitions 10;")
tk.MustExec("insert into t1 values(1, 1, 3000), (2, 1, 1)")
tk.MustExec("analyze table t1;")
tk.MustExec("set tidb_partition_prune_mode = 'dynamic'")
res := tk.MustQuery("explain select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Rows()
require.Contains(t, res[1][0], "IndexMerge")

// Default is tidb_executor_concurrency.
res = tk.MustQuery("select @@tidb_executor_concurrency;").Sort().Rows()
defExecCon := res[0][0].(string)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency", fmt.Sprintf("return(%s)", defExecCon)))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency"))
}()
tk.MustQuery("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Check(testkit.Rows("1"))

tk.MustExec("set tidb_executor_concurrency = 10")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency", "return(10)"))
tk.MustQuery("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Check(testkit.Rows("1"))
// workerCnt = min(part_num, concurrency)
tk.MustExec("set tidb_executor_concurrency = 20")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency", "return(10)"))
tk.MustQuery("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Check(testkit.Rows("1"))
tk.MustExec("set tidb_executor_concurrency = 2")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency", "return(2)"))
tk.MustQuery("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Check(testkit.Rows("1"))

tk.MustExec("set tidb_index_merge_intersection_concurrency = 9")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency", "return(9)"))
tk.MustQuery("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Check(testkit.Rows("1"))
tk.MustExec("set tidb_index_merge_intersection_concurrency = 21")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency", "return(10)"))
tk.MustQuery("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Check(testkit.Rows("1"))
tk.MustExec("set tidb_index_merge_intersection_concurrency = 3")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency", "return(3)"))
tk.MustQuery("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Check(testkit.Rows("1"))

// Concurrency only works for partition tables in dynamic pruning mode, so the real concurrency is 1 in static mode.
tk.MustExec("set tidb_partition_prune_mode = 'static'")
tk.MustExec("set tidb_index_merge_intersection_concurrency = 9")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency", "return(1)"))
tk.MustQuery("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Check(testkit.Rows("1"))

// Concurrency only works for partition tables in dynamic pruning mode, so the real concurrency is 1 for a non-partitioned table.
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1(c1 int, c2 bigint, c3 bigint, primary key(c1), key(c2), key(c3));")
tk.MustExec("insert into t1 values(1, 1, 3000), (2, 1, 1)")
tk.MustExec("set tidb_index_merge_intersection_concurrency = 9")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency", "return(1)"))
tk.MustQuery("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Check(testkit.Rows("1"))
}

func TestIntersectionWithDifferentConcurrency(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

var execCon []int
tblSchemas := []string{
// partition table
"create table t1(c1 int, c2 bigint, c3 bigint, primary key(c1), key(c2), key(c3)) partition by hash(c1) partitions 10;",
// non-partition table
"create table t1(c1 int, c2 bigint, c3 bigint, primary key(c1), key(c2), key(c3));",
}

for tblIdx, tblSchema := range tblSchemas {
if tblIdx == 0 {
// Test different intersectionProcessWorker with partition table(10 partitions).
execCon = []int{1, 3, 10, 11, 20}
} else {
// Default concurrency.
execCon = []int{5}
}
tk.MustExec("use test")
tk.MustExec("drop table if exists t1;")
tk.MustExec(tblSchema)

const queryCnt int = 10
const rowCnt int = 1000
curRowCnt := 0
insertStr := "insert into t1 values"
for i := 0; i < rowCnt; i++ {
if i != 0 {
insertStr += ", "
}
insertStr += fmt.Sprintf("(%d, %d, %d)", i, rand.Int(), rand.Int())
curRowCnt++
}
tk.MustExec(insertStr)
tk.MustExec("analyze table t1")

for _, concurrency := range execCon {
tk.MustExec(fmt.Sprintf("set tidb_executor_concurrency = %d", concurrency))
for i := 0; i < 2; i++ {
if i == 0 {
// Dynamic mode.
tk.MustExec("set tidb_partition_prune_mode = 'dynamic'")
res := tk.MustQuery("explain select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024")
require.Contains(t, res.Rows()[1][0], "IndexMerge")
} else {
tk.MustExec("set tidb_partition_prune_mode = 'static'")
res := tk.MustQuery("explain select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024")
if tblIdx == 0 {
// partition table
require.Contains(t, res.Rows()[1][0], "PartitionUnion")
require.Contains(t, res.Rows()[2][0], "IndexMerge")
} else {
require.Contains(t, res.Rows()[1][0], "IndexMerge")
}
}
for i := 0; i < queryCnt; i++ {
c3 := rand.Intn(1024)
res := tk.MustQuery(fmt.Sprintf("select /*+ no_index_merge() */ c1 from t1 where c2 < 1024 and c3 > %d", c3)).Sort().Rows()
tk.MustQuery(fmt.Sprintf("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > %d", c3)).Sort().Check(res)
}

// In transaction.
for i := 0; i < queryCnt; i++ {
tk.MustExec("begin;")
r := rand.Intn(3)
if r == 0 {
tk.MustExec(fmt.Sprintf("update t1 set c3 = %d where c1 = %d", rand.Int(), rand.Intn(rowCnt)))
} else if r == 1 {
tk.MustExec(fmt.Sprintf("delete from t1 where c1 = %d", rand.Intn(rowCnt)))
} else if r == 2 {
tk.MustExec(fmt.Sprintf("insert into t1 values(%d, %d, %d)", curRowCnt, rand.Int(), rand.Int()))
curRowCnt++
}
c3 := rand.Intn(1024)
res := tk.MustQuery(fmt.Sprintf("select /*+ no_index_merge() */ c1 from t1 where c2 < 1024 and c3 > %d", c3)).Sort().Rows()
tk.MustQuery(fmt.Sprintf("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > %d", c3)).Sort().Check(res)
tk.MustExec("commit;")
}
}
}
tk.MustExec("drop table t1")
}
}

func TestIntersectionWorkerPanic(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1(c1 int, c2 bigint, c3 bigint, primary key(c1), key(c2), key(c3)) partition by hash(c1) partitions 10;")
tk.MustExec("insert into t1 values(1, 1, 3000), (2, 1, 1)")
tk.MustExec("analyze table t1;")
tk.MustExec("set tidb_partition_prune_mode = 'dynamic'")
res := tk.MustQuery("explain select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Rows()
require.Contains(t, res[1][0], "IndexMerge")

// Test panic in intersection.
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionWorkerPanic", `panic("testIndexMergeIntersectionWorkerPanic")`))
err := tk.QueryToErr("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024")
require.Contains(t, err.Error(), "testIndexMergeIntersectionWorkerPanic")
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionWorkerPanic"))
}

func TestIntersectionMemQuota(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1(pk varchar(100) primary key, c1 int, c2 int, index idx1(c1), index idx2(c2))")

insertStr := "insert into t1 values"
for i := 0; i < 20; i++ {
if i != 0 {
insertStr += ", "
}
insertStr += fmt.Sprintf("('%s', %d, %d)", testutil.RandStringRunes(100), 1, 1)
}
tk.MustExec(insertStr)
res := tk.MustQuery("explain select /*+ use_index_merge(t1, primary, idx1, idx2) */ c1 from t1 where c1 < 1024 and c2 < 1024").Rows()
require.Contains(t, res[1][0], "IndexMerge")

tk.MustExec("set global tidb_mem_oom_action='CANCEL'")
defer tk.MustExec("set global tidb_mem_oom_action = DEFAULT")
tk.MustExec("set @@tidb_mem_quota_query = 4000")
err := tk.QueryToErr("select /*+ use_index_merge(t1, primary, idx1, idx2) */ c1 from t1 where c1 < 1024 and c2 < 1024")
require.Contains(t, err.Error(), "Out Of Memory Quota!")
}

func TestIndexMergePanic(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1(c1 int, c2 bigint, c3 bigint, primary key(c1), key(c2), key(c3));")
tk.MustExec("insert into t1 values(1, 1, 1), (100, 100, 100)")

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeResultChCloseEarly", "return(true)"))
tk.MustExec("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c1 < 100 or c2 < 100")
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testIndexMergeResultChCloseEarly"))

tk.MustExec("use test")
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1(c1 int, c2 bigint, c3 bigint, primary key(c1), key(c2), key(c3)) partition by hash(c1) partitions 10;")
insertStr := "insert into t1 values(0, 0, 0)"
for i := 1; i < 1000; i++ {
insertStr += fmt.Sprintf(", (%d, %d, %d)", i, i, i)
}
tk.MustExec(insertStr)
tk.MustExec("analyze table t1;")
tk.MustExec("set tidb_partition_prune_mode = 'dynamic'")

minV := 200
maxV := 1000
runSQL := func(fp string) {
var sql string
v1 := rand.Intn(maxV-minV) + minV
v2 := rand.Intn(maxV-minV) + minV
if !strings.Contains(fp, "Intersection") {
sql = fmt.Sprintf("select /*+ use_index_merge(t1) */ c1 from t1 where c1 < %d or c2 < %d;", v1, v2)
} else {
sql = fmt.Sprintf("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c3 < %d and c2 < %d", v1, v2)
}
res := tk.MustQuery("explain " + sql).Rows()
require.Contains(t, res[1][0], "IndexMerge")
err := tk.QueryToErr(sql)
require.Contains(t, err.Error(), fp)
}

packagePath := "github.com/pingcap/tidb/executor/"
panicFPPaths := []string{
packagePath + "testIndexMergePanicPartialIndexWorker",
packagePath + "testIndexMergePanicPartialTableWorker",

packagePath + "testIndexMergePanicProcessWorkerUnion",
packagePath + "testIndexMergePanicProcessWorkerIntersection",
packagePath + "testIndexMergePanicPartitionTableIntersectionWorker",

packagePath + "testIndexMergePanicTableScanWorker",
}
for _, fp := range panicFPPaths {
fmt.Println("handling failpoint: ", fp)
if !strings.Contains(fp, "testIndexMergePanicTableScanWorker") {
// When mockSleepBeforeStartTableReader is enabled, no real data is read, which avoids leaking coprocessor goroutines.
// But mockSleepBeforeStartTableReader must stay disabled for testIndexMergePanicTableScanWorker,
// because finalTableScanWorker needs task.doneCh to pass the error, so partialIndexWorker/partialTableWorker must run normally.
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/mockSleepBeforeStartTableReader", "return(1000)"))
}
for i := 0; i < 1000; i++ {
require.NoError(t, failpoint.Enable(fp, fmt.Sprintf(`panic("%s")`, fp)))
runSQL(fp)
require.NoError(t, failpoint.Disable(fp))
}
if !strings.Contains(fp, "testIndexMergePanicTableScanWorker") {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/mockSleepBeforeStartTableReader"))
}
}

errFPPaths := []string{
packagePath + "testIndexMergeErrorPartialIndexWorker",
packagePath + "testIndexMergeErrorPartialTableWorker",
}
for _, fp := range errFPPaths {
fmt.Println("handling failpoint: ", fp)
require.NoError(t, failpoint.Enable(fp, fmt.Sprintf(`return("%s")`, fp)))
for i := 0; i < 100; i++ {
runSQL(fp)
}
require.NoError(t, failpoint.Disable(fp))
}
}

func TestIndexMergeCoprGoroutinesLeak(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1(c1 int, c2 bigint, c3 bigint, primary key(c1), key(c2), key(c3));")
insertStr := "insert into t1 values(0, 0, 0)"
for i := 1; i < 1000; i++ {
insertStr += fmt.Sprintf(", (%d, %d, %d)", i, i, i)
}
tk.MustExec(insertStr)
tk.MustExec("analyze table t1;")
tk.MustExec("set tidb_partition_prune_mode = 'dynamic'")

var err error
sql := "select /*+ use_index_merge(t1) */ c1 from t1 where c1 < 900 or c2 < 1000;"
res := tk.MustQuery("explain " + sql).Rows()
require.Contains(t, res[1][0], "IndexMerge")

// If coprocessor goroutines leak, CI will fail.
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergePartialTableWorkerCoprLeak", `panic("testIndexMergePartialTableWorkerCoprLeak")`))
err = tk.QueryToErr(sql)
require.Contains(t, err.Error(), "testIndexMergePartialTableWorkerCoprLeak")
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testIndexMergePartialTableWorkerCoprLeak"))

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergePartialIndexWorkerCoprLeak", `panic("testIndexMergePartialIndexWorkerCoprLeak")`))
err = tk.QueryToErr(sql)
require.Contains(t, err.Error(), "testIndexMergePartialIndexWorkerCoprLeak")
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testIndexMergePartialIndexWorkerCoprLeak"))
}
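
The "If coprocessor goroutines leak, CI will fail" comment relies on a goroutine-leak checker in the package's test harness rather than on anything inside this test. A minimal sketch of how such a check is typically wired into a package's TestMain, assuming go.uber.org/goleak (TiDB's shared test setup wraps a similar check); the IgnoreTopFunction filter below is illustrative, not the project's actual allow-list.

package executor_test

import (
	"testing"

	"go.uber.org/goleak"
)

// TestMain fails the whole package run if any goroutine (for example a leaked
// copIterator worker) is still alive once the tests finish.
func TestMain(m *testing.M) {
	goleak.VerifyTestMain(m,
		// Known-benign background goroutines can be excluded like this.
		goleak.IgnoreTopFunction("internal/poll.runtime_pollWait"),
	)
}
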
