Skip to content

Commit

Permalink
planner: fix join resolveIndex won't find its column from children sc…
Browse files Browse the repository at this point in the history
…hema and amend join's lused and rused logic for reversed column ref from join schema to its children (#51258)

close #42588
  • Loading branch information
ti-chi-bot authored Feb 23, 2024
1 parent e73d335 commit 206723f
Show file tree
Hide file tree
Showing 12 changed files with 490 additions and 141 deletions.
100 changes: 86 additions & 14 deletions pkg/executor/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
package executor

import (
"cmp"
"context"
"encoding/base64"
"fmt"
"math/rand"
"os"
"slices"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -884,6 +886,44 @@ func defaultHashJoinTestCase(cols []*types.FieldType, joinType core.JoinType, us
return tc
}

func prepareResolveIndices(joinSchema, lSchema, rSchema *expression.Schema, joinType core.JoinType) *expression.Schema {
colsNeedResolving := joinSchema.Len()
// The last output column of this two join is the generated column to indicate whether the row is matched or not.
if joinType == core.LeftOuterSemiJoin || joinType == core.AntiLeftOuterSemiJoin {
colsNeedResolving--
}
mergedSchema := expression.MergeSchema(lSchema, rSchema)
// To avoid that two plan shares the same column slice.
shallowColSlice := make([]*expression.Column, joinSchema.Len())
copy(shallowColSlice, joinSchema.Columns)
joinSchema = expression.NewSchema(shallowColSlice...)
foundCnt := 0
// Here we want to resolve all join schema columns directly as a merged schema, and you know same name
// col in join schema should be separately redirected to corresponded same col in child schema. But two
// column sets are **NOT** always ordered, see comment: https://github.com/pingcap/tidb/pull/45831#discussion_r1481031471
// we are using mapping mechanism instead of moving j forward.
marked := make([]bool, mergedSchema.Len())
for i := 0; i < colsNeedResolving; i++ {
findIdx := -1
for j := 0; j < len(mergedSchema.Columns); j++ {
if !joinSchema.Columns[i].Equal(nil, mergedSchema.Columns[j]) || marked[j] {
continue
}
// resolve to a same unique id one, and it not being marked.
findIdx = j
break
}
if findIdx != -1 {
// valid one.
joinSchema.Columns[i] = joinSchema.Columns[i].Clone().(*expression.Column)
joinSchema.Columns[i].Index = findIdx
marked[findIdx] = true
foundCnt++
}
}
return joinSchema
}

func prepare4HashJoin(testCase *hashJoinTestCase, innerExec, outerExec exec.Executor) *HashJoinExec {
if testCase.useOuterToBuild {
innerExec, outerExec = outerExec, innerExec
Expand All @@ -907,6 +947,10 @@ func prepare4HashJoin(testCase *hashJoinTestCase, innerExec, outerExec exec.Exec
joinSchema.Append(cols0...)
joinSchema.Append(cols1...)
}
// todo: need systematic way to protect.
// physical join should resolveIndices to get right schema column index.
// otherwise, markChildrenUsedColsForTest will fail below.
joinSchema = prepareResolveIndices(joinSchema, innerExec.Schema(), outerExec.Schema(), core.InnerJoin)

joinKeysColIdx := make([]int, 0, len(testCase.keyIdx))
joinKeysColIdx = append(joinKeysColIdx, testCase.keyIdx...)
Expand Down Expand Up @@ -962,25 +1006,39 @@ func prepare4HashJoin(testCase *hashJoinTestCase, innerExec, outerExec exec.Exec

// markChildrenUsedColsForTest compares each child with the output schema, and mark
// each column of the child is used by output or not.
func markChildrenUsedColsForTest(outputSchema *expression.Schema, childSchemas ...*expression.Schema) (childrenUsed [][]bool) {
childrenUsed = make([][]bool, 0, len(childSchemas))
markedOffsets := make(map[int]struct{})
for _, col := range outputSchema.Columns {
markedOffsets[col.Index] = struct{}{}
func markChildrenUsedColsForTest(outputSchema *expression.Schema, childSchemas ...*expression.Schema) (childrenUsed [][]int) {
childrenUsed = make([][]int, 0, len(childSchemas))
markedOffsets := make(map[int]int)
for originalIdx, col := range outputSchema.Columns {
markedOffsets[col.Index] = originalIdx
}
prefixLen := 0
type intPair struct {
first int
second int
}
// for example here.
// left child schema: [col11]
// right child schema: [col21, col22]
// output schema is [col11, col22, col21], if not records the original derived order after physical resolve index.
// the lused will be [0], the rused will be [0,1], while the actual order is dismissed, [1,0] is correct for rused.
for _, childSchema := range childSchemas {
used := make([]bool, len(childSchema.Columns))
usedIdxPair := make([]intPair, 0, len(childSchema.Columns))
for i := range childSchema.Columns {
if _, ok := markedOffsets[prefixLen+i]; ok {
used[i] = true
if originalIdx, ok := markedOffsets[prefixLen+i]; ok {
usedIdxPair = append(usedIdxPair, intPair{first: originalIdx, second: i})
}
}
childrenUsed = append(childrenUsed, used)
}
for _, child := range childSchemas {
used := expression.GetUsedList(outputSchema.Columns, child)
childrenUsed = append(childrenUsed, used)
// sort the used idxes according their original indexes derived after resolveIndex.
slices.SortFunc(usedIdxPair, func(a, b intPair) int {
return cmp.Compare(a.first, b.first)
})
usedIdx := make([]int, 0, len(childSchema.Columns))
for _, one := range usedIdxPair {
usedIdx = append(usedIdx, one.second)
}
childrenUsed = append(childrenUsed, usedIdx)
prefixLen += childSchema.Len()
}
return
}
Expand Down Expand Up @@ -1582,6 +1640,20 @@ func prepareMergeJoinExec(tc *mergeJoinTestCase, joinSchema *expression.Schema,
isOuterJoin: false,
}

var usedIdx [][]int
if tc.childrenUsedSchema != nil {
usedIdx = make([][]int, 0, len(tc.childrenUsedSchema))
for _, childSchema := range tc.childrenUsedSchema {
used := make([]int, 0, len(childSchema))
for idx, one := range childSchema {
if one {
used = append(used, idx)
}
}
usedIdx = append(usedIdx, used)
}
}

mergeJoinExec.joiner = newJoiner(
tc.ctx,
0,
Expand All @@ -1590,7 +1662,7 @@ func prepareMergeJoinExec(tc *mergeJoinTestCase, joinSchema *expression.Schema,
nil,
exec.RetTypes(leftExec),
exec.RetTypes(rightExec),
tc.childrenUsedSchema,
usedIdx,
false,
)

Expand Down
45 changes: 30 additions & 15 deletions pkg/executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -798,14 +798,11 @@ func (b *executorBuilder) buildLimit(v *plannercore.PhysicalLimit) exec.Executor
end: v.Offset + v.Count,
}

childUsedSchemaLen := v.Children()[0].Schema().Len()
childUsedSchema := markChildrenUsedCols(v.Schema().Columns, v.Children()[0].Schema())[0]
e.columnIdxsUsedByChild = make([]int, 0, len(childUsedSchema))
for i, used := range childUsedSchema {
if used {
e.columnIdxsUsedByChild = append(e.columnIdxsUsedByChild, i)
}
}
if len(e.columnIdxsUsedByChild) == len(childUsedSchema) {
e.columnIdxsUsedByChild = append(e.columnIdxsUsedByChild, childUsedSchema...)
if len(e.columnIdxsUsedByChild) == childUsedSchemaLen {
e.columnIdxsUsedByChild = nil // indicates that all columns are used. LimitExec will improve performance for this condition.
}
return e
Expand Down Expand Up @@ -2914,21 +2911,39 @@ func (b *executorBuilder) buildAnalyze(v *plannercore.Analyze) exec.Executor {

// markChildrenUsedCols compares each child with the output schema, and mark
// each column of the child is used by output or not.
func markChildrenUsedCols(outputCols []*expression.Column, childSchemas ...*expression.Schema) (childrenUsed [][]bool) {
childrenUsed = make([][]bool, 0, len(childSchemas))
markedOffsets := make(map[int]struct{})
for _, col := range outputCols {
markedOffsets[col.Index] = struct{}{}
func markChildrenUsedCols(outputCols []*expression.Column, childSchemas ...*expression.Schema) (childrenUsed [][]int) {
childrenUsed = make([][]int, 0, len(childSchemas))
markedOffsets := make(map[int]int)
// keep the original maybe reversed order.
for originalIdx, col := range outputCols {
markedOffsets[col.Index] = originalIdx
}
prefixLen := 0
type intPair struct {
first int
second int
}
// for example here.
// left child schema: [col11]
// right child schema: [col21, col22]
// output schema is [col11, col22, col21], if not records the original derived order after physical resolve index.
// the lused will be [0], the rused will be [0,1], while the actual order is dismissed, [1,0] is correct for rused.
for _, childSchema := range childSchemas {
used := make([]bool, len(childSchema.Columns))
usedIdxPair := make([]intPair, 0, len(childSchema.Columns))
for i := range childSchema.Columns {
if _, ok := markedOffsets[prefixLen+i]; ok {
used[i] = true
if originalIdx, ok := markedOffsets[prefixLen+i]; ok {
usedIdxPair = append(usedIdxPair, intPair{first: originalIdx, second: i})
}
}
childrenUsed = append(childrenUsed, used)
// sort the used idxes according their original indexes derived after resolveIndex.
slices.SortFunc(usedIdxPair, func(a, b intPair) int {
return cmp.Compare(a.first, b.first)
})
usedIdx := make([]int, 0, len(childSchema.Columns))
for _, one := range usedIdxPair {
usedIdx = append(usedIdx, one.second)
}
childrenUsed = append(childrenUsed, usedIdx)
prefixLen += childSchema.Len()
}
return
Expand Down
17 changes: 6 additions & 11 deletions pkg/executor/joiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func JoinerType(j joiner) plannercore.JoinType {

func newJoiner(ctx sessionctx.Context, joinType plannercore.JoinType,
outerIsRight bool, defaultInner []types.Datum, filter []expression.Expression,
lhsColTypes, rhsColTypes []*types.FieldType, childrenUsed [][]bool, isNA bool) joiner {
lhsColTypes, rhsColTypes []*types.FieldType, childrenUsed [][]int, isNA bool) joiner {
base := baseJoiner{
ctx: ctx,
conditions: filter,
Expand All @@ -141,19 +141,14 @@ func newJoiner(ctx sessionctx.Context, joinType plannercore.JoinType,
}
base.selected = make([]bool, 0, chunk.InitialCapacity)
base.isNull = make([]bool, 0, chunk.InitialCapacity)
// lused and rused should be followed with its original order.
// the case is that is join schema rely on the reversed order
// of child's schema, here we should keep it original order.
if childrenUsed != nil {
base.lUsed = make([]int, 0, len(childrenUsed[0])) // make it non-nil
for i, used := range childrenUsed[0] {
if used {
base.lUsed = append(base.lUsed, i)
}
}
base.lUsed = append(base.lUsed, childrenUsed[0]...)
base.rUsed = make([]int, 0, len(childrenUsed[1])) // make it non-nil
for i, used := range childrenUsed[1] {
if used {
base.rUsed = append(base.rUsed, i)
}
}
base.rUsed = append(base.rUsed, childrenUsed[1]...)
logutil.BgLogger().Debug("InlineProjection",
zap.Ints("lUsed", base.lUsed), zap.Ints("rUsed", base.rUsed),
zap.Int("lCount", len(lhsColTypes)), zap.Int("rCount", len(rhsColTypes)))
Expand Down
26 changes: 13 additions & 13 deletions pkg/executor/test/tiflashtest/tiflash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1446,19 +1446,19 @@ func TestDisaggregatedTiFlashQuery(t *testing.T) {
err = domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
require.NoError(t, err)
tk.MustQuery("explain select * from t1 where c1 < 2").Check(testkit.Rows(
"PartitionUnion_10 9970.00 root ",
"├─TableReader_15 3323.33 root MppVersion: 2, data:ExchangeSender_14",
"│ └─ExchangeSender_14 3323.33 mpp[tiflash] ExchangeType: PassThrough",
"│ └─Selection_13 3323.33 mpp[tiflash] lt(test.t1.c1, 2)",
"│ └─TableFullScan_12 10000.00 mpp[tiflash] table:t1, partition:p0 pushed down filter:empty, keep order:false, stats:pseudo",
"├─TableReader_19 3323.33 root MppVersion: 2, data:ExchangeSender_18",
"│ └─ExchangeSender_18 3323.33 mpp[tiflash] ExchangeType: PassThrough",
"│ └─Selection_17 3323.33 mpp[tiflash] lt(test.t1.c1, 2)",
"│ └─TableFullScan_16 10000.00 mpp[tiflash] table:t1, partition:p1 pushed down filter:empty, keep order:false, stats:pseudo",
"└─TableReader_23 3323.33 root MppVersion: 2, data:ExchangeSender_22",
" └─ExchangeSender_22 3323.33 mpp[tiflash] ExchangeType: PassThrough",
" └─Selection_21 3323.33 mpp[tiflash] lt(test.t1.c1, 2)",
" └─TableFullScan_20 10000.00 mpp[tiflash] table:t1, partition:p2 pushed down filter:empty, keep order:false, stats:pseudo"))
"PartitionUnion_11 9970.00 root ",
"├─TableReader_16 3323.33 root MppVersion: 2, data:ExchangeSender_15",
"│ └─ExchangeSender_15 3323.33 mpp[tiflash] ExchangeType: PassThrough",
"│ └─Selection_14 3323.33 mpp[tiflash] lt(test.t1.c1, 2)",
"│ └─TableFullScan_13 10000.00 mpp[tiflash] table:t1, partition:p0 pushed down filter:empty, keep order:false, stats:pseudo",
"├─TableReader_20 3323.33 root MppVersion: 2, data:ExchangeSender_19",
"│ └─ExchangeSender_19 3323.33 mpp[tiflash] ExchangeType: PassThrough",
"│ └─Selection_18 3323.33 mpp[tiflash] lt(test.t1.c1, 2)",
"│ └─TableFullScan_17 10000.00 mpp[tiflash] table:t1, partition:p1 pushed down filter:empty, keep order:false, stats:pseudo",
"└─TableReader_24 3323.33 root MppVersion: 2, data:ExchangeSender_23",
" └─ExchangeSender_23 3323.33 mpp[tiflash] ExchangeType: PassThrough",
" └─Selection_22 3323.33 mpp[tiflash] lt(test.t1.c1, 2)",
" └─TableFullScan_21 10000.00 mpp[tiflash] table:t1, partition:p2 pushed down filter:empty, keep order:false, stats:pseudo"))
}

func TestMPPMemoryTracker(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/planner/cascades/optimize.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (opt *Optimizer) FindBestPlan(sctx sessionctx.Context, logical plannercore.
}

func (*Optimizer) onPhasePreprocessing(_ sessionctx.Context, plan plannercore.LogicalPlan) (plannercore.LogicalPlan, error) {
err := plan.PruneColumns(plan.Schema().Columns, nil)
plan, err := plan.PruneColumns(plan.Schema().Columns, nil)
if err != nil {
return nil, err
}
Expand Down
15 changes: 10 additions & 5 deletions pkg/planner/core/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,8 +256,8 @@ type LogicalPlan interface {
// Because it might change the root if the having clause exists, we need to return a plan that represents a new root.
PredicatePushDown([]expression.Expression, *logicalOptimizeOp) ([]expression.Expression, LogicalPlan)

// PruneColumns prunes the unused columns.
PruneColumns([]*expression.Column, *logicalOptimizeOp) error
// PruneColumns prunes the unused columns, and return the new logical plan if changed, otherwise it's same.
PruneColumns([]*expression.Column, *logicalOptimizeOp) (LogicalPlan, error)

// findBestTask converts the logical plan to the physical plan. It's a new interface.
// It is called recursively from the parent to the children to create the result physical plan.
Expand Down Expand Up @@ -759,11 +759,16 @@ func (*baseLogicalPlan) ExtractCorrelatedCols() []*expression.CorrelatedColumn {
}

// PruneColumns implements LogicalPlan interface.
func (p *baseLogicalPlan) PruneColumns(parentUsedCols []*expression.Column, opt *logicalOptimizeOp) error {
func (p *baseLogicalPlan) PruneColumns(parentUsedCols []*expression.Column, opt *logicalOptimizeOp) (LogicalPlan, error) {
if len(p.children) == 0 {
return nil
return p.self, nil
}
var err error
p.children[0], err = p.children[0].PruneColumns(parentUsedCols, opt)
if err != nil {
return nil, err
}
return p.children[0].PruneColumns(parentUsedCols, opt)
return p.self, nil
}

// Schema implements Plan Schema interface.
Expand Down
37 changes: 22 additions & 15 deletions pkg/planner/core/resolve_indices.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,26 +141,33 @@ func (p *PhysicalHashJoin) ResolveIndicesItself() (err error) {
copy(shallowColSlice, p.schema.Columns)
p.schema = expression.NewSchema(shallowColSlice...)
foundCnt := 0
// The two column sets are all ordered. And the colsNeedResolving is the subset of the mergedSchema.
// So we can just move forward j if there's no matching is found.
// We don't use the normal ResolvIndices here since there might be duplicate columns in the schema.
// e.g. The schema of child_0 is [col0, col0, col1]
// ResolveIndices will only resolve all col0 reference of the current plan to the first col0.
for i, j := 0, 0; i < colsNeedResolving && j < len(mergedSchema.Columns); {
if !p.schema.Columns[i].Equal(nil, mergedSchema.Columns[j]) {
j++
continue

// Here we want to resolve all join schema columns directly as a merged schema, and you know same name
// col in join schema should be separately redirected to corresponded same col in child schema. But two
// column sets are **NOT** always ordered, see comment: https://github.com/pingcap/tidb/pull/45831#discussion_r1481031471
// we are using mapping mechanism instead of moving j forward.
marked := make([]bool, mergedSchema.Len())
for i := 0; i < colsNeedResolving; i++ {
findIdx := -1
for j := 0; j < len(mergedSchema.Columns); j++ {
if !p.schema.Columns[i].Equal(p.SCtx(), mergedSchema.Columns[j]) || marked[j] {
continue
}
// resolve to a same unique id one, and it not being marked.
findIdx = j
break
}
if findIdx != -1 {
// valid one.
p.schema.Columns[i] = p.schema.Columns[i].Clone().(*expression.Column)
p.schema.Columns[i].Index = findIdx
marked[findIdx] = true
foundCnt++
}
p.schema.Columns[i] = p.schema.Columns[i].Clone().(*expression.Column)
p.schema.Columns[i].Index = j
i++
j++
foundCnt++
}
if foundCnt < colsNeedResolving {
return errors.Errorf("Some columns of %v cannot find the reference from its child(ren)", p.ExplainID().String())
}

return
}

Expand Down
Loading

0 comments on commit 206723f

Please sign in to comment.