Skip to content

Commit

Permalink
executor: fix wrong outer join result when filter outer side using in…
Browse files Browse the repository at this point in the history
…dex merge join (#20407) (#20427)

Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
  • Loading branch information
ti-srebot authored Oct 15, 2020
1 parent c710000 commit 8968d39
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 10 deletions.
22 changes: 12 additions & 10 deletions executor/index_lookup_merge_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ type innerMergeCtx struct {

type lookUpMergeJoinTask struct {
outerResult *chunk.List
outerMatch [][]bool
outerOrderIdx []chunk.RowPtr

innerResult *chunk.Chunk
Expand Down Expand Up @@ -432,25 +433,23 @@ func (imw *innerMergeWorker) run(ctx context.Context, wg *sync.WaitGroup, cancel

func (imw *innerMergeWorker) handleTask(ctx context.Context, task *lookUpMergeJoinTask) (err error) {
numOuterChks := task.outerResult.NumChunks()
var outerMatch [][]bool
if imw.outerMergeCtx.filter != nil {
outerMatch = make([][]bool, numOuterChks)
task.outerMatch = make([][]bool, numOuterChks)
for i := 0; i < numOuterChks; i++ {
chk := task.outerResult.GetChunk(i)
outerMatch[i] = make([]bool, chk.NumRows())
outerMatch[i], err = expression.VectorizedFilter(imw.ctx, imw.outerMergeCtx.filter, chunk.NewIterator4Chunk(chk), outerMatch[i])
task.outerMatch[i] = make([]bool, chk.NumRows())
task.outerMatch[i], err = expression.VectorizedFilter(imw.ctx, imw.outerMergeCtx.filter, chunk.NewIterator4Chunk(chk), task.outerMatch[i])
if err != nil {
return err
}
}
}
task.memTracker.Consume(int64(cap(task.outerMatch)))
task.outerOrderIdx = make([]chunk.RowPtr, 0, task.outerResult.Len())
for i := 0; i < numOuterChks; i++ {
numRow := task.outerResult.GetChunk(i).NumRows()
for j := 0; j < numRow; j++ {
if len(outerMatch) == 0 || outerMatch[i][j] {
task.outerOrderIdx = append(task.outerOrderIdx, chunk.RowPtr{ChkIdx: uint32(i), RowIdx: uint32(j)})
}
task.outerOrderIdx = append(task.outerOrderIdx, chunk.RowPtr{ChkIdx: uint32(i), RowIdx: uint32(j)})
}
}
task.memTracker.Consume(int64(cap(task.outerOrderIdx)))
Expand Down Expand Up @@ -653,8 +652,11 @@ func (imw *innerMergeWorker) constructDatumLookupKeys(task *lookUpMergeJoinTask)
return dLookUpKeys, nil
}

func (imw *innerMergeWorker) constructDatumLookupKey(task *lookUpMergeJoinTask, rowIdx chunk.RowPtr) (*indexJoinLookUpContent, error) {
outerRow := task.outerResult.GetRow(rowIdx)
func (imw *innerMergeWorker) constructDatumLookupKey(task *lookUpMergeJoinTask, idx chunk.RowPtr) (*indexJoinLookUpContent, error) {
if task.outerMatch != nil && !task.outerMatch[idx.ChkIdx][idx.RowIdx] {
return nil, nil
}
outerRow := task.outerResult.GetRow(idx)
sc := imw.ctx.GetSessionVars().StmtCtx
keyLen := len(imw.keyCols)
dLookupKey := make([]types.Datum, 0, keyLen)
Expand Down Expand Up @@ -688,7 +690,7 @@ func (imw *innerMergeWorker) constructDatumLookupKey(task *lookUpMergeJoinTask,
}
dLookupKey = append(dLookupKey, innerValue)
}
return &indexJoinLookUpContent{keys: dLookupKey, row: task.outerResult.GetRow(rowIdx)}, nil
return &indexJoinLookUpContent{keys: dLookupKey, row: task.outerResult.GetRow(idx)}, nil
}

func (imw *innerMergeWorker) dedupDatumLookUpKeys(lookUpContents []*indexJoinLookUpContent) []*indexJoinLookUpContent {
Expand Down
12 changes: 12 additions & 0 deletions executor/index_lookup_merge_join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,15 @@ func (s *testSuiteAgg) TestIndexJoinOnSinglePartitionTable(c *C) {
rows = s.testData.ConvertRowsToStrings(tk.MustQuery("explain " + sql).Rows())
c.Assert(strings.Index(rows[0], "IndexJoin"), Equals, 0)
}

func (s *testSuite9) TestIssue20400(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("drop table if exists t, s")
tk.MustExec("create table s(a int, index(a))")
tk.MustExec("create table t(a int)")
tk.MustExec("insert into t values(1)")
tk.MustQuery("select /*+ hash_join(t,s)*/ * from t left join s on t.a=s.a and t.a>1").Check(
testkit.Rows("1 <nil>"))
tk.MustQuery("select /*+ inl_merge_join(t,s)*/ * from t left join s on t.a=s.a and t.a>1").Check(
testkit.Rows("1 <nil>"))
}

0 comments on commit 8968d39

Please sign in to comment.