Skip to content

Commit

Permalink
executor: return error as expected when indexHashJoin occur error or …
Browse files Browse the repository at this point in the history
…panic in handleTask (#31323)

close #31129
  • Loading branch information
XuHuaiyu authored Jan 7, 2022
1 parent 7a5b715 commit c27f8f6
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 19 deletions.
70 changes: 51 additions & 19 deletions executor/index_lookup_hash_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,10 @@ func (ow *indexHashJoinOuterWorker) run(ctx context.Context) {
if err != nil {
task = &indexHashJoinTask{err: err}
if ow.keepOuterOrder {
task.keepOuterOrder, task.resultCh = true, make(chan *indexHashJoinResult, 1)
// The outerBuilder and innerFetcher run concurrently, we may
// get 2 errors at simultaneously. Thus the capacity of task.resultCh
// needs to be initialized to 2 to avoid waiting.
task.keepOuterOrder, task.resultCh = true, make(chan *indexHashJoinResult, 2)
ow.pushToChan(ctx, task, ow.taskCh)
}
ow.pushToChan(ctx, task, ow.innerCh)
Expand Down Expand Up @@ -520,19 +523,26 @@ func (iw *indexHashJoinInnerWorker) run(ctx context.Context, cancelFunc context.
failpoint.Inject("testIndexHashJoinInnerWorkerErr", func() {
joinResult.err = errors.New("mockIndexHashJoinInnerWorkerErr")
})
if joinResult.err != nil {
resultCh <- joinResult
return
}
// When task.keepOuterOrder is TRUE(resultCh != iw.resultCh), the last
// joinResult will be checked when the a task has been processed, thus we do
// not need to check it here again.
if resultCh == iw.resultCh && joinResult.chk != nil && joinResult.chk.NumRows() > 0 {
select {
case resultCh <- joinResult:
case <-ctx.Done():
// When task.keepOuterOrder is TRUE (resultCh != iw.resultCh):
// - the last joinResult will be handled when the task has been processed,
// thus we DO NOT need to check it here again.
// - we DO NOT check the error here neither, because:
// - if the error is from task.err, the main thread will check the error of each task
// - if the error is from handleTask, the error will be handled in handleTask
// We should not check `task != nil && !task.keepOuterOrder` here since it's
// possible that `join.chk.NumRows > 0` is true even if task == nil.
if resultCh == iw.resultCh {
if joinResult.err != nil {
resultCh <- joinResult
return
}
if joinResult.chk != nil && joinResult.chk.NumRows() > 0 {
select {
case resultCh <- joinResult:
case <-ctx.Done():
return
}
}
}
}

Expand All @@ -550,6 +560,7 @@ func (iw *indexHashJoinInnerWorker) getNewJoinResult(ctx context.Context) (*inde
}

func (iw *indexHashJoinInnerWorker) buildHashTableForOuterResult(ctx context.Context, task *indexHashJoinTask, h hash.Hash64) {
failpoint.Inject("IndexHashJoinBuildHashTablePanic", nil)
if iw.stats != nil {
start := time.Now()
defer func() {
Expand Down Expand Up @@ -599,19 +610,26 @@ func (iw *indexHashJoinInnerWorker) fetchInnerResults(ctx context.Context, task
return iw.innerWorker.fetchInnerResults(ctx, task, lookUpContents)
}

func (iw *indexHashJoinInnerWorker) handleHashJoinInnerWorkerPanic(r interface{}) {
func (iw *indexHashJoinInnerWorker) handleHashJoinInnerWorkerPanic(resultCh chan *indexHashJoinResult, err error) {
defer func() {
iw.wg.Done()
iw.lookup.workerWg.Done()
}()
if r != nil {
iw.resultCh <- &indexHashJoinResult{err: errors.Errorf("%v", r)}
if err != nil {
resultCh <- &indexHashJoinResult{err: err}
}
}

func (iw *indexHashJoinInnerWorker) handleTask(ctx context.Context, task *indexHashJoinTask, joinResult *indexHashJoinResult, h hash.Hash64, resultCh chan *indexHashJoinResult) error {
func (iw *indexHashJoinInnerWorker) handleTask(ctx context.Context, task *indexHashJoinTask, joinResult *indexHashJoinResult, h hash.Hash64, resultCh chan *indexHashJoinResult) (err error) {
defer func() {
iw.memTracker.Consume(-iw.memTracker.BytesConsumed())
if task.keepOuterOrder {
if err != nil {
joinResult.err = err
resultCh <- joinResult
}
close(resultCh)
}
}()
var joinStartTime time.Time
if iw.stats != nil {
Expand All @@ -631,9 +649,21 @@ func (iw *indexHashJoinInnerWorker) handleTask(ctx context.Context, task *indexH
iw.lookup.workerWg.Add(1)
iw.buildHashTableForOuterResult(ctx, task, h)
},
iw.handleHashJoinInnerWorkerPanic)
err := iw.fetchInnerResults(ctx, task.lookUpJoinTask)
func(r interface{}) {
var err error
if r != nil {
err = errors.Errorf("%v", r)
}
iw.handleHashJoinInnerWorkerPanic(resultCh, err)
},
)
err = iw.fetchInnerResults(ctx, task.lookUpJoinTask)
iw.wg.Wait()
// check error after wg.Wait to make sure error message can be sent to
// resultCh even if panic happen in buildHashTableForOuterResult.
failpoint.Inject("IndexHashJoinFetchInnerResultsErr", func() {
err = errors.New("IndexHashJoinFetchInnerResultsErr")
})
if err != nil {
return err
}
Expand Down Expand Up @@ -783,13 +813,15 @@ func (iw *indexHashJoinInnerWorker) doJoinInOrder(ctx context.Context, task *ind
joinResult.src <- joinResult.chk
}
}
close(resultCh)
}()
for i, numChunks := 0, task.innerResult.NumChunks(); i < numChunks; i++ {
for j, chk := 0, task.innerResult.GetChunk(i); j < chk.NumRows(); j++ {
row := chk.GetRow(j)
ptr := chunk.RowPtr{ChkIdx: uint32(i), RowIdx: uint32(j)}
err = iw.collectMatchedInnerPtrs4OuterRows(ctx, row, ptr, task, h, iw.joinKeyBuf)
failpoint.Inject("TestIssue31129", func() {
err = errors.New("TestIssue31129")
})
if err != nil {
return err
}
Expand Down
49 changes: 49 additions & 0 deletions executor/join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2679,3 +2679,52 @@ func (s *testSuiteJoinSerial) TestIssue30211(c *C) {
err = tk.QueryToErr("select /*+ inl_hash_join(t1) */ * from t1 join t2 on t1.a = t2.a;").Error()
c.Assert(strings.Contains(err, "Out Of Memory Quota"), IsTrue)
}

func (s *testSuiteJoinSerial) TestIssue31129(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("set @@tidb_init_chunk_size=2")
tk.MustExec("set @@tidb_index_join_batch_size=10")
tk.MustExec("DROP TABLE IF EXISTS t, s")
tk.Se.GetSessionVars().EnableClusteredIndex = variable.ClusteredIndexDefModeIntOnly
tk.MustExec("create table t(pk int primary key, a int)")
for i := 0; i < 100; i++ {
tk.MustExec(fmt.Sprintf("insert into t values(%d, %d)", i, i))
}
tk.MustExec("create table s(a int primary key)")
for i := 0; i < 100; i++ {
tk.MustExec(fmt.Sprintf("insert into s values(%d)", i))
}
tk.MustExec("analyze table t")
tk.MustExec("analyze table s")

// Test IndexNestedLoopHashJoin keepOrder.
fpName := "github.com/pingcap/tidb/executor/TestIssue31129"
c.Assert(failpoint.Enable(fpName, "return"), IsNil)
err := tk.QueryToErr("select /*+ INL_HASH_JOIN(s) */ * from t left join s on t.a=s.a order by t.pk")
c.Assert(strings.Contains(err.Error(), "TestIssue31129"), IsTrue)
c.Assert(failpoint.Disable(fpName), IsNil)

// Test IndexNestedLoopHashJoin build hash table panic.
fpName = "github.com/pingcap/tidb/executor/IndexHashJoinBuildHashTablePanic"
c.Assert(failpoint.Enable(fpName, `panic("IndexHashJoinBuildHashTablePanic")`), IsNil)
err = tk.QueryToErr("select /*+ INL_HASH_JOIN(s) */ * from t left join s on t.a=s.a order by t.pk")
c.Assert(strings.Contains(err.Error(), "IndexHashJoinBuildHashTablePanic"), IsTrue)
c.Assert(failpoint.Disable(fpName), IsNil)

// Test IndexNestedLoopHashJoin fetch inner fail.
fpName = "github.com/pingcap/tidb/executor/IndexHashJoinFetchInnerResultsErr"
c.Assert(failpoint.Enable(fpName, "return"), IsNil)
err = tk.QueryToErr("select /*+ INL_HASH_JOIN(s) */ * from t left join s on t.a=s.a order by t.pk")
c.Assert(strings.Contains(err.Error(), "IndexHashJoinFetchInnerResultsErr"), IsTrue)
c.Assert(failpoint.Disable(fpName), IsNil)

// Test IndexNestedLoopHashJoin build hash table panic and IndexNestedLoopHashJoin fetch inner fail at the same time.
fpName1, fpName2 := "github.com/pingcap/tidb/executor/IndexHashJoinBuildHashTablePanic", "github.com/pingcap/tidb/executor/IndexHashJoinFetchInnerResultsErr"
c.Assert(failpoint.Enable(fpName1, `panic("IndexHashJoinBuildHashTablePanic")`), IsNil)
c.Assert(failpoint.Enable(fpName2, "return"), IsNil)
err = tk.QueryToErr("select /*+ INL_HASH_JOIN(s) */ * from t left join s on t.a=s.a order by t.pk")
c.Assert(strings.Contains(err.Error(), "IndexHashJoinBuildHashTablePanic"), IsTrue)
c.Assert(failpoint.Disable(fpName1), IsNil)
c.Assert(failpoint.Disable(fpName2), IsNil)
}

0 comments on commit c27f8f6

Please sign in to comment.