From 42ff5e6ec63a549e9769526c0523ad0049b4b0fa Mon Sep 17 00:00:00 2001 From: HuaiyuXu <391585975@qq.com> Date: Fri, 11 Nov 2022 10:47:54 +0800 Subject: [PATCH] executor: split hashjoin into workers(part1) (#39063) ref pingcap/tidb#39061 --- executor/benchmark_test.go | 10 +- executor/builder.go | 14 +-- executor/join.go | 242 +++++++++++++++++++------------------ 3 files changed, 137 insertions(+), 129 deletions(-) diff --git a/executor/benchmark_test.go b/executor/benchmark_test.go index 5ea706c81fc61..0cb1f7284159a 100644 --- a/executor/benchmark_test.go +++ b/executor/benchmark_test.go @@ -915,14 +915,16 @@ func prepare4HashJoin(testCase *hashJoinTestCase, innerExec, outerExec Executor) probeKeys = append(probeKeys, cols1[keyIdx]) } e := &HashJoinExec{ - baseExecutor: newBaseExecutor(testCase.ctx, joinSchema, 5, innerExec, outerExec), + baseExecutor: newBaseExecutor(testCase.ctx, joinSchema, 5, innerExec, outerExec), + probeSideTupleFetcher: probeSideTupleFetcher{ + probeSideExec: outerExec, + }, concurrency: uint(testCase.concurrency), joinType: testCase.joinType, // 0 for InnerJoin, 1 for LeftOutersJoin, 2 for RightOuterJoin isOuterJoin: false, buildKeys: joinKeys, probeKeys: probeKeys, buildSideExec: innerExec, - probeSideExec: outerExec, buildSideEstCount: float64(testCase.rows), useOuterToBuild: testCase.useOuterToBuild, } @@ -930,9 +932,9 @@ func prepare4HashJoin(testCase *hashJoinTestCase, innerExec, outerExec Executor) childrenUsedSchema := markChildrenUsedCols(e.Schema(), e.children[0].Schema(), e.children[1].Schema()) defaultValues := make([]types.Datum, e.buildSideExec.Schema().Len()) lhsTypes, rhsTypes := retTypes(innerExec), retTypes(outerExec) - e.joiners = make([]joiner, e.concurrency) + e.probeWorker.joiners = make([]joiner, e.concurrency) for i := uint(0); i < e.concurrency; i++ { - e.joiners[i] = newJoiner(testCase.ctx, e.joinType, true, defaultValues, + e.probeWorker.joiners[i] = newJoiner(testCase.ctx, e.joinType, true, defaultValues, nil, lhsTypes, rhsTypes, childrenUsedSchema, false) } memLimit := int64(-1) diff --git a/executor/builder.go b/executor/builder.go index 70d91e64137a4..5c1fd12535e3b 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -1419,25 +1419,25 @@ func (b *executorBuilder) buildHashJoin(v *plannercore.PhysicalHashJoin) Executo // update the buildSideEstCount due to changing the build side if v.InnerChildIdx == 1 { e.buildSideExec, e.buildKeys, e.buildNAKeys = leftExec, v.LeftJoinKeys, v.LeftNAJoinKeys - e.probeSideExec, e.probeKeys, e.probeNAKeys = rightExec, v.RightJoinKeys, v.RightNAJoinKeys + e.probeSideTupleFetcher.probeSideExec, e.probeKeys, e.probeNAKeys = rightExec, v.RightJoinKeys, v.RightNAJoinKeys e.outerFilter = v.LeftConditions } else { e.buildSideExec, e.buildKeys, e.buildNAKeys = rightExec, v.RightJoinKeys, v.RightNAJoinKeys - e.probeSideExec, e.probeKeys, e.probeNAKeys = leftExec, v.LeftJoinKeys, v.LeftNAJoinKeys + e.probeSideTupleFetcher.probeSideExec, e.probeKeys, e.probeNAKeys = leftExec, v.LeftJoinKeys, v.LeftNAJoinKeys e.outerFilter = v.RightConditions leftIsBuildSide = false } if defaultValues == nil { - defaultValues = make([]types.Datum, e.probeSideExec.Schema().Len()) + defaultValues = make([]types.Datum, e.probeSideTupleFetcher.probeSideExec.Schema().Len()) } } else { if v.InnerChildIdx == 0 { e.buildSideExec, e.buildKeys, e.buildNAKeys = leftExec, v.LeftJoinKeys, v.LeftNAJoinKeys - e.probeSideExec, e.probeKeys, e.probeNAKeys = rightExec, v.RightJoinKeys, v.RightNAJoinKeys + 
e.probeSideTupleFetcher.probeSideExec, e.probeKeys, e.probeNAKeys = rightExec, v.RightJoinKeys, v.RightNAJoinKeys e.outerFilter = v.RightConditions } else { e.buildSideExec, e.buildKeys, e.buildNAKeys = rightExec, v.RightJoinKeys, v.RightNAJoinKeys - e.probeSideExec, e.probeKeys, e.probeNAKeys = leftExec, v.LeftJoinKeys, v.LeftNAJoinKeys + e.probeSideTupleFetcher.probeSideExec, e.probeKeys, e.probeNAKeys = leftExec, v.LeftJoinKeys, v.LeftNAJoinKeys e.outerFilter = v.LeftConditions leftIsBuildSide = false } @@ -1448,9 +1448,9 @@ func (b *executorBuilder) buildHashJoin(v *plannercore.PhysicalHashJoin) Executo isNAJoin := len(v.LeftNAJoinKeys) > 0 e.buildSideEstCount = b.buildSideEstCount(v) childrenUsedSchema := markChildrenUsedCols(v.Schema(), v.Children()[0].Schema(), v.Children()[1].Schema()) - e.joiners = make([]joiner, e.concurrency) + e.probeWorker.joiners = make([]joiner, e.concurrency) for i := uint(0); i < e.concurrency; i++ { - e.joiners[i] = newJoiner(b.ctx, v.JoinType, v.InnerChildIdx == 0, defaultValues, + e.probeWorker.joiners[i] = newJoiner(b.ctx, v.JoinType, v.InnerChildIdx == 0, defaultValues, v.OtherConditions, lhsTypes, rhsTypes, childrenUsedSchema, isNAJoin) } executorCountHashJoinExec.Inc() diff --git a/executor/join.go b/executor/join.go index 79f40c05db3a8..456d337830e78 100644 --- a/executor/join.go +++ b/executor/join.go @@ -46,11 +46,35 @@ var ( _ Executor = &NestedLoopApplyExec{} ) +// probeSideTupleFetcher reads tuples from probeSideExec and send them to probeWorkers. +type probeSideTupleFetcher struct { + probeSideExec Executor + probeChkResourceCh chan *probeChkResource + probeResultChs []chan *chunk.Chunk + requiredRows int64 +} + +type probeWorker struct { + // We pre-alloc and reuse the Rows and RowPtrs for each probe goroutine, to avoid allocation frequently + buildSideRows [][]chunk.Row + buildSideRowPtrs [][]chunk.RowPtr + + // We build individual joiner for each join worker when use chunk-based + // execution, to avoid the concurrency of joiner.chk and joiner.selected. + joiners []joiner + rowIters []*chunk.Iterator4Slice + // for every naaj probe worker, pre-allocate the int slice for store the join column index to check. + needCheckBuildRowPos [][]int + needCheckProbeRowPos [][]int + rowContainerForProbe []*hashRowContainer +} + // HashJoinExec implements the hash join algorithm. type HashJoinExec struct { baseExecutor - probeSideExec Executor + probeSideTupleFetcher + probeWorker buildSideExec Executor buildSideEstCount float64 outerFilter expression.CNFExprs @@ -68,21 +92,13 @@ type HashJoinExec struct { buildFinished chan error // closeCh add a lock for closing executor. - closeCh chan struct{} - worker util.WaitGroupWrapper - waiter util.WaitGroupWrapper - joinType plannercore.JoinType - requiredRows int64 + closeCh chan struct{} + worker util.WaitGroupWrapper + waiter util.WaitGroupWrapper + joinType plannercore.JoinType - // We build individual joiner for each join worker when use chunk-based - // execution, to avoid the concurrency of joiner.chk and joiner.selected. - joiners []joiner - - probeChkResourceCh chan *probeChkResource - probeResultChs []chan *chunk.Chunk - joinChkResourceCh []chan *chunk.Chunk - joinResultCh chan *hashjoinWorkerResult - rowContainerForProbe []*hashRowContainer + joinChkResourceCh []chan *chunk.Chunk + joinResultCh chan *hashjoinWorkerResult memTracker *memory.Tracker // track memory usage. diskTracker *disk.Tracker // track disk usage. 
@@ -96,16 +112,6 @@ type HashJoinExec struct { finished atomic.Bool stats *hashJoinRuntimeStats - - // We pre-alloc and reuse the Rows and RowPtrs for each probe goroutine, to avoid allocation frequently - buildSideRows [][]chunk.Row - buildSideRowPtrs [][]chunk.RowPtr - - // for every naaj probe worker, pre-allocate the int slice for store the join column index to check. - needCheckBuildRowPos [][]int - needCheckProbeRowPos [][]int - - rowIters []*chunk.Iterator4Slice } // probeChkResource stores the result of the join probe side fetch worker, @@ -139,27 +145,27 @@ func (e *HashJoinExec) Close() error { if e.joinResultCh != nil { channel.Clear(e.joinResultCh) } - if e.probeChkResourceCh != nil { - close(e.probeChkResourceCh) - channel.Clear(e.probeChkResourceCh) + if e.probeSideTupleFetcher.probeChkResourceCh != nil { + close(e.probeSideTupleFetcher.probeChkResourceCh) + channel.Clear(e.probeSideTupleFetcher.probeChkResourceCh) } - for i := range e.probeResultChs { - channel.Clear(e.probeResultChs[i]) + for i := range e.probeSideTupleFetcher.probeResultChs { + channel.Clear(e.probeSideTupleFetcher.probeResultChs[i]) } for i := range e.joinChkResourceCh { close(e.joinChkResourceCh[i]) channel.Clear(e.joinChkResourceCh[i]) } - e.probeChkResourceCh = nil + e.probeSideTupleFetcher.probeChkResourceCh = nil e.joinChkResourceCh = nil terror.Call(e.rowContainer.Close) e.waiter.Wait() } e.outerMatchedStatus = e.outerMatchedStatus[:0] - e.buildSideRows = nil - e.buildSideRowPtrs = nil - e.needCheckBuildRowPos = nil - e.needCheckProbeRowPos = nil + e.probeWorker.buildSideRows = nil + e.probeWorker.buildSideRowPtrs = nil + e.probeWorker.needCheckBuildRowPos = nil + e.probeWorker.needCheckProbeRowPos = nil if e.stats != nil && e.rowContainer != nil { e.stats.hashStat = *e.rowContainer.stat } @@ -187,14 +193,14 @@ func (e *HashJoinExec) Open(ctx context.Context) error { e.finished.Store(false) if e.probeTypes == nil { - e.probeTypes = retTypes(e.probeSideExec) + e.probeTypes = retTypes(e.probeSideTupleFetcher.probeSideExec) } if e.buildTypes == nil { e.buildTypes = retTypes(e.buildSideExec) } if e.runtimeStats != nil { e.stats = &hashJoinRuntimeStats{ - concurrent: cap(e.joiners), + concurrent: cap(e.probeWorker.joiners), } e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats) } @@ -215,17 +221,17 @@ func (e *HashJoinExec) fetchProbeSideChunks(ctx context.Context) { select { case <-e.closeCh: return - case probeSideResource, ok = <-e.probeChkResourceCh: + case probeSideResource, ok = <-e.probeSideTupleFetcher.probeChkResourceCh: if !ok { return } } probeSideResult := probeSideResource.chk if e.isOuterJoin { - required := int(atomic.LoadInt64(&e.requiredRows)) + required := int(atomic.LoadInt64(&e.probeSideTupleFetcher.requiredRows)) probeSideResult.SetRequiredRows(required, e.maxChunkSize) } - err := Next(ctx, e.probeSideExec, probeSideResult) + err := Next(ctx, e.probeSideTupleFetcher.probeSideExec, probeSideResult) failpoint.Inject("ConsumeRandomPanic", nil) if err != nil { e.joinResultCh <- &hashjoinWorkerResult{ @@ -253,8 +259,8 @@ func (e *HashJoinExec) fetchProbeSideChunks(ctx context.Context) { } // after building is finished. the hash null bucket slice is allocated and determined. // copy it for multi probe worker. 
- for i := range e.rowContainerForProbe { - e.rowContainerForProbe[i].hashNANullBucket = e.rowContainer.hashNANullBucket + for i := range e.probeWorker.rowContainerForProbe { + e.probeWorker.rowContainerForProbe[i].hashNANullBucket = e.rowContainer.hashNANullBucket } hasWaitedForBuild = true } @@ -320,21 +326,21 @@ func (e *HashJoinExec) fetchBuildSideRows(ctx context.Context, chkCh chan<- *chu } func (e *HashJoinExec) initializeForProbe() { - // e.probeResultChs is for transmitting the chunks which store the data of + // e.probeSideTupleFetcher.probeResultChs is for transmitting the chunks which store the data of // probeSideExec, it'll be written by probe side worker goroutine, and read by join // workers. - e.probeResultChs = make([]chan *chunk.Chunk, e.concurrency) + e.probeSideTupleFetcher.probeResultChs = make([]chan *chunk.Chunk, e.concurrency) for i := uint(0); i < e.concurrency; i++ { - e.probeResultChs[i] = make(chan *chunk.Chunk, 1) + e.probeSideTupleFetcher.probeResultChs[i] = make(chan *chunk.Chunk, 1) } // e.probeChkResourceCh is for transmitting the used probeSideExec chunks from // join workers to probeSideExec worker. - e.probeChkResourceCh = make(chan *probeChkResource, e.concurrency) + e.probeSideTupleFetcher.probeChkResourceCh = make(chan *probeChkResource, e.concurrency) for i := uint(0); i < e.concurrency; i++ { - e.probeChkResourceCh <- &probeChkResource{ - chk: newFirstChunk(e.probeSideExec), - dest: e.probeResultChs[i], + e.probeSideTupleFetcher.probeChkResourceCh <- &probeChkResource{ + chk: newFirstChunk(e.probeSideTupleFetcher.probeSideExec), + dest: e.probeSideTupleFetcher.probeResultChs[i], } } @@ -350,10 +356,10 @@ func (e *HashJoinExec) initializeForProbe() { // thread. e.joinResultCh = make(chan *hashjoinWorkerResult, e.concurrency+1) - e.buildSideRows = make([][]chunk.Row, e.concurrency) - e.buildSideRowPtrs = make([][]chunk.RowPtr, e.concurrency) - e.needCheckBuildRowPos = make([][]int, e.concurrency) - e.needCheckProbeRowPos = make([][]int, e.concurrency) + e.probeWorker.buildSideRows = make([][]chunk.Row, e.concurrency) + e.probeWorker.buildSideRowPtrs = make([][]chunk.RowPtr, e.concurrency) + e.probeWorker.needCheckBuildRowPos = make([][]int, e.concurrency) + e.probeWorker.needCheckProbeRowPos = make([][]int, e.concurrency) } func (e *HashJoinExec) fetchAndProbeHashTable(ctx context.Context) { @@ -383,8 +389,8 @@ func (e *HashJoinExec) fetchAndProbeHashTable(ctx context.Context) { } func (e *HashJoinExec) handleProbeSideFetcherPanic(r interface{}) { - for i := range e.probeResultChs { - close(e.probeResultChs[i]) + for i := range e.probeSideTupleFetcher.probeResultChs { + close(e.probeSideTupleFetcher.probeResultChs[i]) } if r != nil { e.joinResultCh <- &hashjoinWorkerResult{err: errors.Errorf("%v", r)} @@ -414,7 +420,7 @@ func (e *HashJoinExec) handleUnmatchedRowsFromHashTable(workerID uint) { } for j := 0; j < chk.NumRows(); j++ { if !e.outerMatchedStatus[i].UnsafeIsSet(j) { // process unmatched outer rows - e.joiners[workerID].onMissMatch(false, chk.GetRow(j), joinResult.chk) + e.probeWorker.joiners[workerID].onMissMatch(false, chk.GetRow(j), joinResult.chk) } if joinResult.chk.IsFull() { e.joinResultCh <- joinResult @@ -469,7 +475,7 @@ func (e *HashJoinExec) runJoinWorker(workerID uint, probeKeyColIdx, probeNAKeyCo // Read and filter probeSideResult, and join the probeSideResult with the build side rows. 
emptyProbeSideResult := &probeChkResource{ - dest: e.probeResultChs[workerID], + dest: e.probeSideTupleFetcher.probeResultChs[workerID], } hCtx := &hashContext{ allTypes: e.probeTypes, @@ -483,7 +489,7 @@ func (e *HashJoinExec) runJoinWorker(workerID uint, probeKeyColIdx, probeNAKeyCo select { case <-e.closeCh: return - case probeSideResult, ok = <-e.probeResultChs[workerID]: + case probeSideResult, ok = <-e.probeSideTupleFetcher.probeResultChs[workerID]: } failpoint.Inject("ConsumeRandomPanic", nil) if !ok { @@ -491,9 +497,9 @@ func (e *HashJoinExec) runJoinWorker(workerID uint, probeKeyColIdx, probeNAKeyCo } start := time.Now() if e.useOuterToBuild { - ok, joinResult = e.join2ChunkForOuterHashJoin(workerID, probeSideResult, hCtx, e.rowContainerForProbe[workerID], joinResult) + ok, joinResult = e.join2ChunkForOuterHashJoin(workerID, probeSideResult, hCtx, e.probeWorker.rowContainerForProbe[workerID], joinResult) } else { - ok, joinResult = e.join2Chunk(workerID, probeSideResult, hCtx, e.rowContainerForProbe[workerID], joinResult, selected) + ok, joinResult = e.join2Chunk(workerID, probeSideResult, hCtx, e.probeWorker.rowContainerForProbe[workerID], joinResult, selected) } probeTime += int64(time.Since(start)) if !ok { @@ -501,7 +507,7 @@ func (e *HashJoinExec) runJoinWorker(workerID uint, probeKeyColIdx, probeNAKeyCo } probeSideResult.Reset() emptyProbeSideResult.chk = probeSideResult - e.probeChkResourceCh <- emptyProbeSideResult + e.probeSideTupleFetcher.probeChkResourceCh <- emptyProbeSideResult } // note joinResult.chk may be nil when getNewJoinResult fails in loops if joinResult == nil { @@ -515,8 +521,8 @@ func (e *HashJoinExec) runJoinWorker(workerID uint, probeKeyColIdx, probeNAKeyCo func (e *HashJoinExec) joinMatchedProbeSideRow2ChunkForOuterHashJoin(workerID uint, probeKey uint64, probeSideRow chunk.Row, hCtx *hashContext, rowContainer *hashRowContainer, joinResult *hashjoinWorkerResult) (bool, *hashjoinWorkerResult) { var err error - e.buildSideRows[workerID], e.buildSideRowPtrs[workerID], err = rowContainer.GetMatchedRowsAndPtrs(probeKey, probeSideRow, hCtx, e.buildSideRows[workerID], e.buildSideRowPtrs[workerID], true) - buildSideRows, rowsPtrs := e.buildSideRows[workerID], e.buildSideRowPtrs[workerID] + e.probeWorker.buildSideRows[workerID], e.probeWorker.buildSideRowPtrs[workerID], err = rowContainer.GetMatchedRowsAndPtrs(probeKey, probeSideRow, hCtx, e.probeWorker.buildSideRows[workerID], e.probeWorker.buildSideRowPtrs[workerID], true) + buildSideRows, rowsPtrs := e.probeWorker.buildSideRows[workerID], e.probeWorker.buildSideRowPtrs[workerID] if err != nil { joinResult.err = err return false, joinResult @@ -525,12 +531,12 @@ func (e *HashJoinExec) joinMatchedProbeSideRow2ChunkForOuterHashJoin(workerID ui return true, joinResult } - iter := e.rowIters[workerID] + iter := e.probeWorker.rowIters[workerID] iter.Reset(buildSideRows) var outerMatchStatus []outerRowStatusFlag rowIdx, ok := 0, false for iter.Begin(); iter.Current() != iter.End(); { - outerMatchStatus, err = e.joiners[workerID].tryToMatchOuters(iter, probeSideRow, joinResult.chk, outerMatchStatus) + outerMatchStatus, err = e.probeWorker.joiners[workerID].tryToMatchOuters(iter, probeSideRow, joinResult.chk, outerMatchStatus) if err != nil { joinResult.err = err return false, joinResult @@ -564,17 +570,17 @@ func (e *HashJoinExec) joinNAALOSJMatchProbeSideRow2Chunk(workerID uint, probeKe // because AntiLeftOuterSemiJoin cares about the scalar value. 
If we both have a match from null // bucket and same key bucket, we should return the result as from same-key bucket // rather than from null bucket. - e.buildSideRows[workerID], err = rowContainer.GetMatchedRows(probeKey, probeSideRow, hCtx, e.buildSideRows[workerID]) - buildSideRows := e.buildSideRows[workerID] + e.probeWorker.buildSideRows[workerID], err = rowContainer.GetMatchedRows(probeKey, probeSideRow, hCtx, e.probeWorker.buildSideRows[workerID]) + buildSideRows := e.probeWorker.buildSideRows[workerID] if err != nil { joinResult.err = err return false, joinResult } if len(buildSideRows) != 0 { - iter1 := e.rowIters[workerID] + iter1 := e.probeWorker.rowIters[workerID] iter1.Reset(buildSideRows) for iter1.Begin(); iter1.Current() != iter1.End(); { - matched, _, err := e.joiners[workerID].tryToMatchInners(probeSideRow, iter1, joinResult.chk, LeftNotNullRightNotNull) + matched, _, err := e.probeWorker.joiners[workerID].tryToMatchInners(probeSideRow, iter1, joinResult.chk, LeftNotNullRightNotNull) if err != nil { joinResult.err = err return false, joinResult @@ -594,8 +600,8 @@ func (e *HashJoinExec) joinNAALOSJMatchProbeSideRow2Chunk(workerID uint, probeKe } } // step2: match the null bucket secondly. - e.buildSideRows[workerID], err = rowContainer.GetNullBucketRows(hCtx, probeSideRow, probeKeyNullBits, e.buildSideRows[workerID], e.needCheckBuildRowPos[workerID], e.needCheckProbeRowPos[workerID]) - buildSideRows = e.buildSideRows[workerID] + e.probeWorker.buildSideRows[workerID], err = rowContainer.GetNullBucketRows(hCtx, probeSideRow, probeKeyNullBits, e.probeWorker.buildSideRows[workerID], e.probeWorker.needCheckBuildRowPos[workerID], e.probeWorker.needCheckProbeRowPos[workerID]) + buildSideRows = e.probeWorker.buildSideRows[workerID] if err != nil { joinResult.err = err return false, joinResult @@ -603,13 +609,13 @@ func (e *HashJoinExec) joinNAALOSJMatchProbeSideRow2Chunk(workerID uint, probeKe if len(buildSideRows) == 0 { // when reach here, it means we couldn't find a valid same key match from same-key bucket yet // and the null bucket is empty. so the result should be . - e.joiners[workerID].onMissMatch(false, probeSideRow, joinResult.chk) + e.probeWorker.joiners[workerID].onMissMatch(false, probeSideRow, joinResult.chk) return true, joinResult } - iter2 := e.rowIters[workerID] + iter2 := e.probeWorker.rowIters[workerID] iter2.Reset(buildSideRows) for iter2.Begin(); iter2.Current() != iter2.End(); { - matched, _, err := e.joiners[workerID].tryToMatchInners(probeSideRow, iter2, joinResult.chk, LeftNotNullRightHasNull) + matched, _, err := e.probeWorker.joiners[workerID].tryToMatchInners(probeSideRow, iter2, joinResult.chk, LeftNotNullRightHasNull) if err != nil { joinResult.err = err return false, joinResult @@ -631,7 +637,7 @@ func (e *HashJoinExec) joinNAALOSJMatchProbeSideRow2Chunk(workerID uint, probeKe // case1: x NOT IN (empty set): if other key bucket don't have the valid rows yet. // case2: x NOT IN (l,m,n...): if other key bucket do have the valid rows. // both cases mean the result should be - e.joiners[workerID].onMissMatch(false, probeSideRow, joinResult.chk) + e.probeWorker.joiners[workerID].onMissMatch(false, probeSideRow, joinResult.chk) return true, joinResult } // when left side has null values, all we want is to find a valid build side rows (past other condition) @@ -639,17 +645,17 @@ func (e *HashJoinExec) joinNAALOSJMatchProbeSideRow2Chunk(workerID uint, probeKe // case1: NOT IN (empty set): ----------------------> result is . 
// case2: NOT IN (at least a valid inner row) ------------------> result is . // Step1: match null bucket (assumption that null bucket is quite smaller than all hash table bucket rows) - e.buildSideRows[workerID], err = rowContainer.GetNullBucketRows(hCtx, probeSideRow, probeKeyNullBits, e.buildSideRows[workerID], e.needCheckBuildRowPos[workerID], e.needCheckProbeRowPos[workerID]) - buildSideRows := e.buildSideRows[workerID] + e.probeWorker.buildSideRows[workerID], err = rowContainer.GetNullBucketRows(hCtx, probeSideRow, probeKeyNullBits, e.probeWorker.buildSideRows[workerID], e.probeWorker.needCheckBuildRowPos[workerID], e.probeWorker.needCheckProbeRowPos[workerID]) + buildSideRows := e.probeWorker.buildSideRows[workerID] if err != nil { joinResult.err = err return false, joinResult } if len(buildSideRows) != 0 { - iter1 := e.rowIters[workerID] + iter1 := e.probeWorker.rowIters[workerID] iter1.Reset(buildSideRows) for iter1.Begin(); iter1.Current() != iter1.End(); { - matched, _, err := e.joiners[workerID].tryToMatchInners(probeSideRow, iter1, joinResult.chk, LeftHasNullRightHasNull) + matched, _, err := e.probeWorker.joiners[workerID].tryToMatchInners(probeSideRow, iter1, joinResult.chk, LeftHasNullRightHasNull) if err != nil { joinResult.err = err return false, joinResult @@ -669,8 +675,8 @@ func (e *HashJoinExec) joinNAALOSJMatchProbeSideRow2Chunk(workerID uint, probeKe } } // Step2: match all hash table bucket build rows (use probeKeyNullBits to filter if any). - e.buildSideRows[workerID], err = rowContainer.GetAllMatchedRows(hCtx, probeSideRow, probeKeyNullBits, e.buildSideRows[workerID], e.needCheckBuildRowPos[workerID], e.needCheckProbeRowPos[workerID]) - buildSideRows = e.buildSideRows[workerID] + e.probeWorker.buildSideRows[workerID], err = rowContainer.GetAllMatchedRows(hCtx, probeSideRow, probeKeyNullBits, e.probeWorker.buildSideRows[workerID], e.probeWorker.needCheckBuildRowPos[workerID], e.probeWorker.needCheckProbeRowPos[workerID]) + buildSideRows = e.probeWorker.buildSideRows[workerID] if err != nil { joinResult.err = err return false, joinResult @@ -678,13 +684,13 @@ func (e *HashJoinExec) joinNAALOSJMatchProbeSideRow2Chunk(workerID uint, probeKe if len(buildSideRows) == 0 { // when reach here, it means we couldn't return it quickly in null bucket, and same-bucket is empty, // which means x NOT IN (empty set) or x NOT IN (l,m,n), the result should be - e.joiners[workerID].onMissMatch(false, probeSideRow, joinResult.chk) + e.probeWorker.joiners[workerID].onMissMatch(false, probeSideRow, joinResult.chk) return true, joinResult } - iter2 := e.rowIters[workerID] + iter2 := e.probeWorker.rowIters[workerID] iter2.Reset(buildSideRows) for iter2.Begin(); iter2.Current() != iter2.End(); { - matched, _, err := e.joiners[workerID].tryToMatchInners(probeSideRow, iter2, joinResult.chk, LeftHasNullRightNotNull) + matched, _, err := e.probeWorker.joiners[workerID].tryToMatchInners(probeSideRow, iter2, joinResult.chk, LeftHasNullRightNotNull) if err != nil { joinResult.err = err return false, joinResult @@ -705,7 +711,7 @@ func (e *HashJoinExec) joinNAALOSJMatchProbeSideRow2Chunk(workerID uint, probeKe // step3: if we couldn't return it quickly in null bucket and all hash bucket, here means only one cases: // case1: NOT IN (empty set): // empty set comes from no rows from all bucket can pass other condition. 
the result should be - e.joiners[workerID].onMissMatch(false, probeSideRow, joinResult.chk) + e.probeWorker.joiners[workerID].onMissMatch(false, probeSideRow, joinResult.chk) return true, joinResult } @@ -719,17 +725,17 @@ func (e *HashJoinExec) joinNAASJMatchProbeSideRow2Chunk(workerID uint, probeKey if probeKeyNullBits == nil { // step1: match null bucket first. // need fetch the "valid" rows every time. (nullBits map check is necessary) - e.buildSideRows[workerID], err = rowContainer.GetNullBucketRows(hCtx, probeSideRow, probeKeyNullBits, e.buildSideRows[workerID], e.needCheckBuildRowPos[workerID], e.needCheckProbeRowPos[workerID]) - buildSideRows := e.buildSideRows[workerID] + e.probeWorker.buildSideRows[workerID], err = rowContainer.GetNullBucketRows(hCtx, probeSideRow, probeKeyNullBits, e.probeWorker.buildSideRows[workerID], e.probeWorker.needCheckBuildRowPos[workerID], e.probeWorker.needCheckProbeRowPos[workerID]) + buildSideRows := e.probeWorker.buildSideRows[workerID] if err != nil { joinResult.err = err return false, joinResult } if len(buildSideRows) != 0 { - iter1 := e.rowIters[workerID] + iter1 := e.probeWorker.rowIters[workerID] iter1.Reset(buildSideRows) for iter1.Begin(); iter1.Current() != iter1.End(); { - matched, _, err := e.joiners[workerID].tryToMatchInners(probeSideRow, iter1, joinResult.chk) + matched, _, err := e.probeWorker.joiners[workerID].tryToMatchInners(probeSideRow, iter1, joinResult.chk) if err != nil { joinResult.err = err return false, joinResult @@ -749,8 +755,8 @@ func (e *HashJoinExec) joinNAASJMatchProbeSideRow2Chunk(workerID uint, probeKey } } // step2: then same key bucket. - e.buildSideRows[workerID], err = rowContainer.GetMatchedRows(probeKey, probeSideRow, hCtx, e.buildSideRows[workerID]) - buildSideRows = e.buildSideRows[workerID] + e.probeWorker.buildSideRows[workerID], err = rowContainer.GetMatchedRows(probeKey, probeSideRow, hCtx, e.probeWorker.buildSideRows[workerID]) + buildSideRows = e.probeWorker.buildSideRows[workerID] if err != nil { joinResult.err = err return false, joinResult @@ -758,13 +764,13 @@ func (e *HashJoinExec) joinNAASJMatchProbeSideRow2Chunk(workerID uint, probeKey if len(buildSideRows) == 0 { // when reach here, it means we couldn't return it quickly in null bucket, and same-bucket is empty, // which means x NOT IN (empty set), accept the rhs row. - e.joiners[workerID].onMissMatch(false, probeSideRow, joinResult.chk) + e.probeWorker.joiners[workerID].onMissMatch(false, probeSideRow, joinResult.chk) return true, joinResult } - iter2 := e.rowIters[workerID] + iter2 := e.probeWorker.rowIters[workerID] iter2.Reset(buildSideRows) for iter2.Begin(); iter2.Current() != iter2.End(); { - matched, _, err := e.joiners[workerID].tryToMatchInners(probeSideRow, iter2, joinResult.chk) + matched, _, err := e.probeWorker.joiners[workerID].tryToMatchInners(probeSideRow, iter2, joinResult.chk) if err != nil { joinResult.err = err return false, joinResult @@ -786,7 +792,7 @@ func (e *HashJoinExec) joinNAASJMatchProbeSideRow2Chunk(workerID uint, probeKey // case1: x NOT IN (empty set): if other key bucket don't have the valid rows yet. // case2: x NOT IN (l,m,n...): if other key bucket do have the valid rows. // both cases should accept the rhs row. 
- e.joiners[workerID].onMissMatch(false, probeSideRow, joinResult.chk) + e.probeWorker.joiners[workerID].onMissMatch(false, probeSideRow, joinResult.chk) return true, joinResult } // when left side has null values, all we want is to find a valid build side rows (passed from other condition) @@ -794,17 +800,17 @@ func (e *HashJoinExec) joinNAASJMatchProbeSideRow2Chunk(workerID uint, probeKey // case1: NOT IN (empty set): ----------------------> accept rhs row. // case2: NOT IN (at least a valid inner row) ------------------> unknown result, refuse rhs row. // Step1: match null bucket (assumption that null bucket is quite smaller than all hash table bucket rows) - e.buildSideRows[workerID], err = rowContainer.GetNullBucketRows(hCtx, probeSideRow, probeKeyNullBits, e.buildSideRows[workerID], e.needCheckBuildRowPos[workerID], e.needCheckProbeRowPos[workerID]) - buildSideRows := e.buildSideRows[workerID] + e.probeWorker.buildSideRows[workerID], err = rowContainer.GetNullBucketRows(hCtx, probeSideRow, probeKeyNullBits, e.probeWorker.buildSideRows[workerID], e.probeWorker.needCheckBuildRowPos[workerID], e.probeWorker.needCheckProbeRowPos[workerID]) + buildSideRows := e.probeWorker.buildSideRows[workerID] if err != nil { joinResult.err = err return false, joinResult } if len(buildSideRows) != 0 { - iter1 := e.rowIters[workerID] + iter1 := e.probeWorker.rowIters[workerID] iter1.Reset(buildSideRows) for iter1.Begin(); iter1.Current() != iter1.End(); { - matched, _, err := e.joiners[workerID].tryToMatchInners(probeSideRow, iter1, joinResult.chk) + matched, _, err := e.probeWorker.joiners[workerID].tryToMatchInners(probeSideRow, iter1, joinResult.chk) if err != nil { joinResult.err = err return false, joinResult @@ -824,8 +830,8 @@ func (e *HashJoinExec) joinNAASJMatchProbeSideRow2Chunk(workerID uint, probeKey } } // Step2: match all hash table bucket build rows. - e.buildSideRows[workerID], err = rowContainer.GetAllMatchedRows(hCtx, probeSideRow, probeKeyNullBits, e.buildSideRows[workerID], e.needCheckBuildRowPos[workerID], e.needCheckProbeRowPos[workerID]) - buildSideRows = e.buildSideRows[workerID] + e.probeWorker.buildSideRows[workerID], err = rowContainer.GetAllMatchedRows(hCtx, probeSideRow, probeKeyNullBits, e.probeWorker.buildSideRows[workerID], e.probeWorker.needCheckBuildRowPos[workerID], e.probeWorker.needCheckProbeRowPos[workerID]) + buildSideRows = e.probeWorker.buildSideRows[workerID] if err != nil { joinResult.err = err return false, joinResult @@ -833,13 +839,13 @@ func (e *HashJoinExec) joinNAASJMatchProbeSideRow2Chunk(workerID uint, probeKey if len(buildSideRows) == 0 { // when reach here, it means we couldn't return it quickly in null bucket, and same-bucket is empty, // which means NOT IN (empty set) or NOT IN (no valid rows) accept the rhs row. 
- e.joiners[workerID].onMissMatch(false, probeSideRow, joinResult.chk) + e.probeWorker.joiners[workerID].onMissMatch(false, probeSideRow, joinResult.chk) return true, joinResult } - iter2 := e.rowIters[workerID] + iter2 := e.probeWorker.rowIters[workerID] iter2.Reset(buildSideRows) for iter2.Begin(); iter2.Current() != iter2.End(); { - matched, _, err := e.joiners[workerID].tryToMatchInners(probeSideRow, iter2, joinResult.chk) + matched, _, err := e.probeWorker.joiners[workerID].tryToMatchInners(probeSideRow, iter2, joinResult.chk) if err != nil { joinResult.err = err return false, joinResult @@ -860,7 +866,7 @@ func (e *HashJoinExec) joinNAASJMatchProbeSideRow2Chunk(workerID uint, probeKey // step3: if we couldn't return it quickly in null bucket and all hash bucket, here means only one cases: // case1: NOT IN (empty set): // empty set comes from no rows from all bucket can pass other condition. we should accept the rhs row. - e.joiners[workerID].onMissMatch(false, probeSideRow, joinResult.chk) + e.probeWorker.joiners[workerID].onMissMatch(false, probeSideRow, joinResult.chk) return true, joinResult } @@ -900,21 +906,21 @@ func (e *HashJoinExec) joinNAAJMatchProbeSideRow2Chunk(workerID uint, probeKey u func (e *HashJoinExec) joinMatchedProbeSideRow2Chunk(workerID uint, probeKey uint64, probeSideRow chunk.Row, hCtx *hashContext, rowContainer *hashRowContainer, joinResult *hashjoinWorkerResult) (bool, *hashjoinWorkerResult) { var err error - e.buildSideRows[workerID], err = rowContainer.GetMatchedRows(probeKey, probeSideRow, hCtx, e.buildSideRows[workerID]) - buildSideRows := e.buildSideRows[workerID] + e.probeWorker.buildSideRows[workerID], err = rowContainer.GetMatchedRows(probeKey, probeSideRow, hCtx, e.probeWorker.buildSideRows[workerID]) + buildSideRows := e.probeWorker.buildSideRows[workerID] if err != nil { joinResult.err = err return false, joinResult } if len(buildSideRows) == 0 { - e.joiners[workerID].onMissMatch(false, probeSideRow, joinResult.chk) + e.probeWorker.joiners[workerID].onMissMatch(false, probeSideRow, joinResult.chk) return true, joinResult } - iter := e.rowIters[workerID] + iter := e.probeWorker.rowIters[workerID] iter.Reset(buildSideRows) hasMatch, hasNull, ok := false, false, false for iter.Begin(); iter.Current() != iter.End(); { - matched, isNull, err := e.joiners[workerID].tryToMatchInners(probeSideRow, iter, joinResult.chk) + matched, isNull, err := e.probeWorker.joiners[workerID].tryToMatchInners(probeSideRow, iter, joinResult.chk) if err != nil { joinResult.err = err return false, joinResult @@ -931,7 +937,7 @@ func (e *HashJoinExec) joinMatchedProbeSideRow2Chunk(workerID uint, probeKey uin } } if !hasMatch { - e.joiners[workerID].onMissMatch(hasNull, probeSideRow, joinResult.chk) + e.probeWorker.joiners[workerID].onMissMatch(hasNull, probeSideRow, joinResult.chk) } return true, joinResult } @@ -1005,7 +1011,7 @@ func (e *HashJoinExec) join2Chunk(workerID uint, probeSideChk *chunk.Chunk, hCtx if isNAAJ { if !selected[i] { // since this is the case of using inner to build, so for an outer row unselected, we should fill the result when it's outer join. 
- e.joiners[workerID].onMissMatch(false, probeSideChk.GetRow(i), joinResult.chk) + e.probeWorker.joiners[workerID].onMissMatch(false, probeSideChk.GetRow(i), joinResult.chk) } if hCtx.naHasNull[i] { // here means the probe join connecting column has null value in it and this is special for matching all the hash buckets @@ -1027,7 +1033,7 @@ func (e *HashJoinExec) join2Chunk(workerID uint, probeSideChk *chunk.Chunk, hCtx } else { // since this is the case of using inner to build, so for an outer row unselected, we should fill the result when it's outer join. if !selected[i] || hCtx.hasNull[i] { // process unmatched probe side rows - e.joiners[workerID].onMissMatch(false, probeSideChk.GetRow(i), joinResult.chk) + e.probeWorker.joiners[workerID].onMissMatch(false, probeSideChk.GetRow(i), joinResult.chk) } else { // process matched probe side rows probeKey, probeRow := hCtx.hashVals[i].Sum64(), probeSideChk.GetRow(i) ok, joinResult = e.joinMatchedProbeSideRow2Chunk(workerID, probeKey, probeRow, hCtx, rowContainer, joinResult) @@ -1106,16 +1112,16 @@ func (e *HashJoinExec) Next(ctx context.Context, req *chunk.Chunk) (err error) { } e.rowContainer = newHashRowContainer(e.ctx, int(e.buildSideEstCount), hCtx, retTypes(e.buildSideExec)) // we shallow copies rowContainer for each probe worker to avoid lock contention - e.rowContainerForProbe = make([]*hashRowContainer, e.concurrency) + e.probeWorker.rowContainerForProbe = make([]*hashRowContainer, e.concurrency) for i := uint(0); i < e.concurrency; i++ { if i == 0 { - e.rowContainerForProbe[i] = e.rowContainer + e.probeWorker.rowContainerForProbe[i] = e.rowContainer } else { - e.rowContainerForProbe[i] = e.rowContainer.ShallowCopy() + e.probeWorker.rowContainerForProbe[i] = e.rowContainer.ShallowCopy() } } for i := uint(0); i < e.concurrency; i++ { - e.rowIters = append(e.rowIters, chunk.NewIterator4Slice([]chunk.Row{}).(*chunk.Iterator4Slice)) + e.probeWorker.rowIters = append(e.probeWorker.rowIters, chunk.NewIterator4Slice([]chunk.Row{}).(*chunk.Iterator4Slice)) } e.worker.RunWithRecover(func() { defer trace.StartRegion(ctx, "HashJoinHashTableBuilder").End() @@ -1125,7 +1131,7 @@ func (e *HashJoinExec) Next(ctx context.Context, req *chunk.Chunk) (err error) { e.prepared = true } if e.isOuterJoin { - atomic.StoreInt64(&e.requiredRows, int64(req.RequiredRows())) + atomic.StoreInt64(&e.probeSideTupleFetcher.requiredRows, int64(req.RequiredRows())) } req.Reset()
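For quick reference, the net effect of the join.go hunks above is a struct split: the probe-side fields move off HashJoinExec into two embedded helper structs, and call sites change from e.joiners / e.probeSideExec to e.probeWorker.joiners / e.probeSideTupleFetcher.probeSideExec. A condensed sketch of the resulting layout, with field sets copied from the hunks (the remaining HashJoinExec fields are omitted, so this is not the full definition):

    // probeSideTupleFetcher reads tuples from probeSideExec and sends them
    // to the probe workers through probeResultChs.
    type probeSideTupleFetcher struct {
        probeSideExec      Executor
        probeChkResourceCh chan *probeChkResource
        probeResultChs     []chan *chunk.Chunk
        requiredRows       int64
    }

    // probeWorker groups the per-worker state that previously lived directly
    // on HashJoinExec (one slot per probe goroutine, indexed by workerID).
    type probeWorker struct {
        buildSideRows        [][]chunk.Row
        buildSideRowPtrs     [][]chunk.RowPtr
        joiners              []joiner
        rowIters             []*chunk.Iterator4Slice
        needCheckBuildRowPos [][]int
        needCheckProbeRowPos [][]int
        rowContainerForProbe []*hashRowContainer
    }

    // HashJoinExec embeds both helper structs.
    type HashJoinExec struct {
        baseExecutor
        probeSideTupleFetcher
        probeWorker
        // ... remaining fields unchanged by this patch ...
    }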