From c27f8f697fc5b409af08ea9e6032f6fe3a744d28 Mon Sep 17 00:00:00 2001 From: HuaiyuXu <391585975@qq.com> Date: Fri, 7 Jan 2022 15:14:37 +0800 Subject: [PATCH] executor: return error as expected when indexHashJoin occur error or panic in handleTask (#31323) close pingcap/tidb#31129 --- executor/index_lookup_hash_join.go | 70 ++++++++++++++++++++++-------- executor/join_test.go | 49 +++++++++++++++++++++ 2 files changed, 100 insertions(+), 19 deletions(-) diff --git a/executor/index_lookup_hash_join.go b/executor/index_lookup_hash_join.go index 056e321b8a294..1e0e6282cc77b 100644 --- a/executor/index_lookup_hash_join.go +++ b/executor/index_lookup_hash_join.go @@ -343,7 +343,10 @@ func (ow *indexHashJoinOuterWorker) run(ctx context.Context) { if err != nil { task = &indexHashJoinTask{err: err} if ow.keepOuterOrder { - task.keepOuterOrder, task.resultCh = true, make(chan *indexHashJoinResult, 1) + // The outerBuilder and innerFetcher run concurrently, we may + // get 2 errors at simultaneously. Thus the capacity of task.resultCh + // needs to be initialized to 2 to avoid waiting. + task.keepOuterOrder, task.resultCh = true, make(chan *indexHashJoinResult, 2) ow.pushToChan(ctx, task, ow.taskCh) } ow.pushToChan(ctx, task, ow.innerCh) @@ -520,19 +523,26 @@ func (iw *indexHashJoinInnerWorker) run(ctx context.Context, cancelFunc context. failpoint.Inject("testIndexHashJoinInnerWorkerErr", func() { joinResult.err = errors.New("mockIndexHashJoinInnerWorkerErr") }) - if joinResult.err != nil { - resultCh <- joinResult - return - } - // When task.keepOuterOrder is TRUE(resultCh != iw.resultCh), the last - // joinResult will be checked when the a task has been processed, thus we do - // not need to check it here again. - if resultCh == iw.resultCh && joinResult.chk != nil && joinResult.chk.NumRows() > 0 { - select { - case resultCh <- joinResult: - case <-ctx.Done(): + // When task.keepOuterOrder is TRUE (resultCh != iw.resultCh): + // - the last joinResult will be handled when the task has been processed, + // thus we DO NOT need to check it here again. + // - we DO NOT check the error here neither, because: + // - if the error is from task.err, the main thread will check the error of each task + // - if the error is from handleTask, the error will be handled in handleTask + // We should not check `task != nil && !task.keepOuterOrder` here since it's + // possible that `join.chk.NumRows > 0` is true even if task == nil. + if resultCh == iw.resultCh { + if joinResult.err != nil { + resultCh <- joinResult return } + if joinResult.chk != nil && joinResult.chk.NumRows() > 0 { + select { + case resultCh <- joinResult: + case <-ctx.Done(): + return + } + } } } @@ -550,6 +560,7 @@ func (iw *indexHashJoinInnerWorker) getNewJoinResult(ctx context.Context) (*inde } func (iw *indexHashJoinInnerWorker) buildHashTableForOuterResult(ctx context.Context, task *indexHashJoinTask, h hash.Hash64) { + failpoint.Inject("IndexHashJoinBuildHashTablePanic", nil) if iw.stats != nil { start := time.Now() defer func() { @@ -599,19 +610,26 @@ func (iw *indexHashJoinInnerWorker) fetchInnerResults(ctx context.Context, task return iw.innerWorker.fetchInnerResults(ctx, task, lookUpContents) } -func (iw *indexHashJoinInnerWorker) handleHashJoinInnerWorkerPanic(r interface{}) { +func (iw *indexHashJoinInnerWorker) handleHashJoinInnerWorkerPanic(resultCh chan *indexHashJoinResult, err error) { defer func() { iw.wg.Done() iw.lookup.workerWg.Done() }() - if r != nil { - iw.resultCh <- &indexHashJoinResult{err: errors.Errorf("%v", r)} + if err != nil { + resultCh <- &indexHashJoinResult{err: err} } } -func (iw *indexHashJoinInnerWorker) handleTask(ctx context.Context, task *indexHashJoinTask, joinResult *indexHashJoinResult, h hash.Hash64, resultCh chan *indexHashJoinResult) error { +func (iw *indexHashJoinInnerWorker) handleTask(ctx context.Context, task *indexHashJoinTask, joinResult *indexHashJoinResult, h hash.Hash64, resultCh chan *indexHashJoinResult) (err error) { defer func() { iw.memTracker.Consume(-iw.memTracker.BytesConsumed()) + if task.keepOuterOrder { + if err != nil { + joinResult.err = err + resultCh <- joinResult + } + close(resultCh) + } }() var joinStartTime time.Time if iw.stats != nil { @@ -631,9 +649,21 @@ func (iw *indexHashJoinInnerWorker) handleTask(ctx context.Context, task *indexH iw.lookup.workerWg.Add(1) iw.buildHashTableForOuterResult(ctx, task, h) }, - iw.handleHashJoinInnerWorkerPanic) - err := iw.fetchInnerResults(ctx, task.lookUpJoinTask) + func(r interface{}) { + var err error + if r != nil { + err = errors.Errorf("%v", r) + } + iw.handleHashJoinInnerWorkerPanic(resultCh, err) + }, + ) + err = iw.fetchInnerResults(ctx, task.lookUpJoinTask) iw.wg.Wait() + // check error after wg.Wait to make sure error message can be sent to + // resultCh even if panic happen in buildHashTableForOuterResult. + failpoint.Inject("IndexHashJoinFetchInnerResultsErr", func() { + err = errors.New("IndexHashJoinFetchInnerResultsErr") + }) if err != nil { return err } @@ -783,13 +813,15 @@ func (iw *indexHashJoinInnerWorker) doJoinInOrder(ctx context.Context, task *ind joinResult.src <- joinResult.chk } } - close(resultCh) }() for i, numChunks := 0, task.innerResult.NumChunks(); i < numChunks; i++ { for j, chk := 0, task.innerResult.GetChunk(i); j < chk.NumRows(); j++ { row := chk.GetRow(j) ptr := chunk.RowPtr{ChkIdx: uint32(i), RowIdx: uint32(j)} err = iw.collectMatchedInnerPtrs4OuterRows(ctx, row, ptr, task, h, iw.joinKeyBuf) + failpoint.Inject("TestIssue31129", func() { + err = errors.New("TestIssue31129") + }) if err != nil { return err } diff --git a/executor/join_test.go b/executor/join_test.go index a11e71332d07f..d607a9b3062da 100644 --- a/executor/join_test.go +++ b/executor/join_test.go @@ -2679,3 +2679,52 @@ func (s *testSuiteJoinSerial) TestIssue30211(c *C) { err = tk.QueryToErr("select /*+ inl_hash_join(t1) */ * from t1 join t2 on t1.a = t2.a;").Error() c.Assert(strings.Contains(err, "Out Of Memory Quota"), IsTrue) } + +func (s *testSuiteJoinSerial) TestIssue31129(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("set @@tidb_init_chunk_size=2") + tk.MustExec("set @@tidb_index_join_batch_size=10") + tk.MustExec("DROP TABLE IF EXISTS t, s") + tk.Se.GetSessionVars().EnableClusteredIndex = variable.ClusteredIndexDefModeIntOnly + tk.MustExec("create table t(pk int primary key, a int)") + for i := 0; i < 100; i++ { + tk.MustExec(fmt.Sprintf("insert into t values(%d, %d)", i, i)) + } + tk.MustExec("create table s(a int primary key)") + for i := 0; i < 100; i++ { + tk.MustExec(fmt.Sprintf("insert into s values(%d)", i)) + } + tk.MustExec("analyze table t") + tk.MustExec("analyze table s") + + // Test IndexNestedLoopHashJoin keepOrder. + fpName := "github.com/pingcap/tidb/executor/TestIssue31129" + c.Assert(failpoint.Enable(fpName, "return"), IsNil) + err := tk.QueryToErr("select /*+ INL_HASH_JOIN(s) */ * from t left join s on t.a=s.a order by t.pk") + c.Assert(strings.Contains(err.Error(), "TestIssue31129"), IsTrue) + c.Assert(failpoint.Disable(fpName), IsNil) + + // Test IndexNestedLoopHashJoin build hash table panic. + fpName = "github.com/pingcap/tidb/executor/IndexHashJoinBuildHashTablePanic" + c.Assert(failpoint.Enable(fpName, `panic("IndexHashJoinBuildHashTablePanic")`), IsNil) + err = tk.QueryToErr("select /*+ INL_HASH_JOIN(s) */ * from t left join s on t.a=s.a order by t.pk") + c.Assert(strings.Contains(err.Error(), "IndexHashJoinBuildHashTablePanic"), IsTrue) + c.Assert(failpoint.Disable(fpName), IsNil) + + // Test IndexNestedLoopHashJoin fetch inner fail. + fpName = "github.com/pingcap/tidb/executor/IndexHashJoinFetchInnerResultsErr" + c.Assert(failpoint.Enable(fpName, "return"), IsNil) + err = tk.QueryToErr("select /*+ INL_HASH_JOIN(s) */ * from t left join s on t.a=s.a order by t.pk") + c.Assert(strings.Contains(err.Error(), "IndexHashJoinFetchInnerResultsErr"), IsTrue) + c.Assert(failpoint.Disable(fpName), IsNil) + + // Test IndexNestedLoopHashJoin build hash table panic and IndexNestedLoopHashJoin fetch inner fail at the same time. + fpName1, fpName2 := "github.com/pingcap/tidb/executor/IndexHashJoinBuildHashTablePanic", "github.com/pingcap/tidb/executor/IndexHashJoinFetchInnerResultsErr" + c.Assert(failpoint.Enable(fpName1, `panic("IndexHashJoinBuildHashTablePanic")`), IsNil) + c.Assert(failpoint.Enable(fpName2, "return"), IsNil) + err = tk.QueryToErr("select /*+ INL_HASH_JOIN(s) */ * from t left join s on t.a=s.a order by t.pk") + c.Assert(strings.Contains(err.Error(), "IndexHashJoinBuildHashTablePanic"), IsTrue) + c.Assert(failpoint.Disable(fpName1), IsNil) + c.Assert(failpoint.Disable(fpName2), IsNil) +}