Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: join resolveIndex won't find its column and amend join's l/rused logic for reversed column ref from join schema to its children #51282

Merged
99 changes: 85 additions & 14 deletions executor/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tidb/util/stringutil"
"go.uber.org/zap/zapcore"
"golang.org/x/exp/slices"
)

var (
Expand Down Expand Up @@ -882,6 +883,44 @@ func defaultHashJoinTestCase(cols []*types.FieldType, joinType core.JoinType, us
return tc
}

// prepareResolveIndices mimics the physical join's ResolveIndices step for tests:
// it redirects every column of joinSchema to its offset inside the merged child
// schema (lSchema followed by rSchema), so that each column's Index field is a
// valid position in the joined row layout.
//
// The returned schema is a fresh one; the input joinSchema is not mutated.
func prepareResolveIndices(joinSchema, lSchema, rSchema *expression.Schema, joinType core.JoinType) *expression.Schema {
	colsNeedResolving := joinSchema.Len()
	// The last output column of LeftOuterSemiJoin / AntiLeftOuterSemiJoin is the
	// generated match-flag column; it has no counterpart in either child, so it
	// must not be resolved against the merged schema.
	if joinType == core.LeftOuterSemiJoin || joinType == core.AntiLeftOuterSemiJoin {
		colsNeedResolving--
	}
	mergedSchema := expression.MergeSchema(lSchema, rSchema)
	// Copy the column slice so the two plans don't share the same backing array.
	shallowColSlice := make([]*expression.Column, joinSchema.Len())
	copy(shallowColSlice, joinSchema.Columns)
	joinSchema = expression.NewSchema(shallowColSlice...)
	// We want to resolve every join-schema column against the merged child schema,
	// where columns with the same unique id should each be redirected to their own
	// child column. The two column sets are **NOT** always in the same order (see
	// https://github.com/pingcap/tidb/pull/45831#discussion_r1481031471), so we use
	// a marking mechanism instead of moving a shared cursor j forward.
	marked := make([]bool, mergedSchema.Len())
	for i := 0; i < colsNeedResolving; i++ {
		for j := 0; j < len(mergedSchema.Columns); j++ {
			// Resolve to an equal (same unique id) column that is not yet marked.
			if marked[j] || !joinSchema.Columns[i].Equal(nil, mergedSchema.Columns[j]) {
				continue
			}
			// Clone before mutating Index so the original column stays untouched.
			joinSchema.Columns[i] = joinSchema.Columns[i].Clone().(*expression.Column)
			joinSchema.Columns[i].Index = j
			marked[j] = true
			break
		}
	}
	return joinSchema
}

func prepare4HashJoin(testCase *hashJoinTestCase, innerExec, outerExec Executor) *HashJoinExec {
if testCase.useOuterToBuild {
innerExec, outerExec = outerExec, innerExec
Expand All @@ -905,6 +944,10 @@ func prepare4HashJoin(testCase *hashJoinTestCase, innerExec, outerExec Executor)
joinSchema.Append(cols0...)
joinSchema.Append(cols1...)
}
// todo: need systematic way to protect.
// physical join should resolveIndices to get right schema column index.
// otherwise, markChildrenUsedColsForTest will fail below.
joinSchema = prepareResolveIndices(joinSchema, innerExec.Schema(), outerExec.Schema(), core.InnerJoin)

joinKeysColIdx := make([]int, 0, len(testCase.keyIdx))
joinKeysColIdx = append(joinKeysColIdx, testCase.keyIdx...)
Expand Down Expand Up @@ -960,25 +1003,39 @@ func prepare4HashJoin(testCase *hashJoinTestCase, innerExec, outerExec Executor)

// markChildrenUsedColsForTest compares each child with the output schema, and mark
// each column of the child is used by output or not.
func markChildrenUsedColsForTest(outputSchema *expression.Schema, childSchemas ...*expression.Schema) (childrenUsed [][]bool) {
childrenUsed = make([][]bool, 0, len(childSchemas))
markedOffsets := make(map[int]struct{})
for _, col := range outputSchema.Columns {
markedOffsets[col.Index] = struct{}{}
func markChildrenUsedColsForTest(outputSchema *expression.Schema, childSchemas ...*expression.Schema) (childrenUsed [][]int) {
childrenUsed = make([][]int, 0, len(childSchemas))
markedOffsets := make(map[int]int)
for originalIdx, col := range outputSchema.Columns {
markedOffsets[col.Index] = originalIdx
}
prefixLen := 0
type intPair struct {
first int
second int
}
// For example:
// left child schema:  [col11]
// right child schema: [col21, col22]
// output schema: [col11, col22, col21]. If we did not record the original order derived
// after the physical resolve-index step, lused would be [0] and rused would be [0,1],
// losing the actual output order; the correct rused here is [1,0].
for _, childSchema := range childSchemas {
used := make([]bool, len(childSchema.Columns))
usedIdxPair := make([]intPair, 0, len(childSchema.Columns))
for i := range childSchema.Columns {
if _, ok := markedOffsets[prefixLen+i]; ok {
used[i] = true
if originalIdx, ok := markedOffsets[prefixLen+i]; ok {
usedIdxPair = append(usedIdxPair, intPair{first: originalIdx, second: i})
}
}
childrenUsed = append(childrenUsed, used)
}
for _, child := range childSchemas {
used := expression.GetUsedList(outputSchema.Columns, child)
childrenUsed = append(childrenUsed, used)
// Sort the used indexes according to their original positions derived after resolveIndices.
slices.SortFunc(usedIdxPair, func(a, b intPair) bool {
return a.first < b.first
})
usedIdx := make([]int, 0, len(childSchema.Columns))
for _, one := range usedIdxPair {
usedIdx = append(usedIdx, one.second)
}
childrenUsed = append(childrenUsed, usedIdx)
prefixLen += childSchema.Len()
}
return
}
Expand Down Expand Up @@ -1580,6 +1637,20 @@ func prepareMergeJoinExec(tc *mergeJoinTestCase, joinSchema *expression.Schema,
isOuterJoin: false,
}

var usedIdx [][]int
if tc.childrenUsedSchema != nil {
usedIdx = make([][]int, 0, len(tc.childrenUsedSchema))
for _, childSchema := range tc.childrenUsedSchema {
used := make([]int, 0, len(childSchema))
for idx, one := range childSchema {
if one {
used = append(used, idx)
}
}
usedIdx = append(usedIdx, used)
}
}

mergeJoinExec.joiner = newJoiner(
tc.ctx,
0,
Expand All @@ -1588,7 +1659,7 @@ func prepareMergeJoinExec(tc *mergeJoinTestCase, joinSchema *expression.Schema,
nil,
retTypes(leftExec),
retTypes(rightExec),
tc.childrenUsedSchema,
usedIdx,
false,
)

Expand Down
45 changes: 30 additions & 15 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -755,14 +755,11 @@ func (b *executorBuilder) buildLimit(v *plannercore.PhysicalLimit) Executor {
end: v.Offset + v.Count,
}

childUsedSchemaLen := v.Children()[0].Schema().Len()
childUsedSchema := markChildrenUsedCols(v.Schema().Columns, v.Children()[0].Schema())[0]
e.columnIdxsUsedByChild = make([]int, 0, len(childUsedSchema))
for i, used := range childUsedSchema {
if used {
e.columnIdxsUsedByChild = append(e.columnIdxsUsedByChild, i)
}
}
if len(e.columnIdxsUsedByChild) == len(childUsedSchema) {
e.columnIdxsUsedByChild = append(e.columnIdxsUsedByChild, childUsedSchema...)
if len(e.columnIdxsUsedByChild) == childUsedSchemaLen {
e.columnIdxsUsedByChild = nil // indicates that all columns are used. LimitExec will improve performance for this condition.
}
return e
Expand Down Expand Up @@ -3055,21 +3052,39 @@ func constructDistExec(sctx sessionctx.Context, plans []plannercore.PhysicalPlan

// markChildrenUsedCols compares each child with the output schema, and mark
// each column of the child is used by output or not.
func markChildrenUsedCols(outputCols []*expression.Column, childSchemas ...*expression.Schema) (childrenUsed [][]bool) {
childrenUsed = make([][]bool, 0, len(childSchemas))
markedOffsets := make(map[int]struct{})
for _, col := range outputCols {
markedOffsets[col.Index] = struct{}{}
func markChildrenUsedCols(outputCols []*expression.Column, childSchemas ...*expression.Schema) (childrenUsed [][]int) {
childrenUsed = make([][]int, 0, len(childSchemas))
markedOffsets := make(map[int]int)
// keep the original maybe reversed order.
for originalIdx, col := range outputCols {
markedOffsets[col.Index] = originalIdx
}
prefixLen := 0
type intPair struct {
first int
second int
}
// For example:
// left child schema:  [col11]
// right child schema: [col21, col22]
// output schema: [col11, col22, col21]. If we did not record the original order derived
// after the physical resolve-index step, lused would be [0] and rused would be [0,1],
// losing the actual output order; the correct rused here is [1,0].
for _, childSchema := range childSchemas {
used := make([]bool, len(childSchema.Columns))
usedIdxPair := make([]intPair, 0, len(childSchema.Columns))
for i := range childSchema.Columns {
if _, ok := markedOffsets[prefixLen+i]; ok {
used[i] = true
if originalIdx, ok := markedOffsets[prefixLen+i]; ok {
usedIdxPair = append(usedIdxPair, intPair{first: originalIdx, second: i})
}
}
childrenUsed = append(childrenUsed, used)
// Sort the used indexes according to their original positions derived after resolveIndices.
slices.SortFunc(usedIdxPair, func(a, b intPair) bool {
return a.first < b.first
})
usedIdx := make([]int, 0, len(childSchema.Columns))
for _, one := range usedIdxPair {
usedIdx = append(usedIdx, one.second)
}
childrenUsed = append(childrenUsed, usedIdx)
prefixLen += childSchema.Len()
}
return
Expand Down
17 changes: 6 additions & 11 deletions executor/joiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func JoinerType(j joiner) plannercore.JoinType {

func newJoiner(ctx sessionctx.Context, joinType plannercore.JoinType,
outerIsRight bool, defaultInner []types.Datum, filter []expression.Expression,
lhsColTypes, rhsColTypes []*types.FieldType, childrenUsed [][]bool, isNA bool) joiner {
lhsColTypes, rhsColTypes []*types.FieldType, childrenUsed [][]int, isNA bool) joiner {
base := baseJoiner{
ctx: ctx,
conditions: filter,
Expand All @@ -141,19 +141,14 @@ func newJoiner(ctx sessionctx.Context, joinType plannercore.JoinType,
}
base.selected = make([]bool, 0, chunk.InitialCapacity)
base.isNull = make([]bool, 0, chunk.InitialCapacity)
// lUsed and rUsed must keep their original order: the join schema may rely on a
// reversed order of a child's schema columns, so we preserve here the order that
// was computed during resolveIndices.
if childrenUsed != nil {
base.lUsed = make([]int, 0, len(childrenUsed[0])) // make it non-nil
for i, used := range childrenUsed[0] {
if used {
base.lUsed = append(base.lUsed, i)
}
}
base.lUsed = append(base.lUsed, childrenUsed[0]...)
base.rUsed = make([]int, 0, len(childrenUsed[1])) // make it non-nil
for i, used := range childrenUsed[1] {
if used {
base.rUsed = append(base.rUsed, i)
}
}
base.rUsed = append(base.rUsed, childrenUsed[1]...)
logutil.BgLogger().Debug("InlineProjection",
zap.Ints("lUsed", base.lUsed), zap.Ints("rUsed", base.rUsed),
zap.Int("lCount", len(lhsColTypes)), zap.Int("rCount", len(rhsColTypes)))
Expand Down
26 changes: 13 additions & 13 deletions executor/tiflashtest/tiflash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1449,19 +1449,19 @@ func TestDisaggregatedTiFlashQuery(t *testing.T) {
err = domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
require.NoError(t, err)
tk.MustQuery("explain select * from t1 where c1 < 2").Check(testkit.Rows(
"PartitionUnion_10 9970.00 root ",
"├─TableReader_15 3323.33 root MppVersion: 1, data:ExchangeSender_14",
"│ └─ExchangeSender_14 3323.33 mpp[tiflash] ExchangeType: PassThrough",
"│ └─Selection_13 3323.33 mpp[tiflash] lt(test.t1.c1, 2)",
"│ └─TableFullScan_12 10000.00 mpp[tiflash] table:t1, partition:p0 pushed down filter:empty, keep order:false, stats:pseudo",
"├─TableReader_19 3323.33 root MppVersion: 1, data:ExchangeSender_18",
"│ └─ExchangeSender_18 3323.33 mpp[tiflash] ExchangeType: PassThrough",
"│ └─Selection_17 3323.33 mpp[tiflash] lt(test.t1.c1, 2)",
"│ └─TableFullScan_16 10000.00 mpp[tiflash] table:t1, partition:p1 pushed down filter:empty, keep order:false, stats:pseudo",
"└─TableReader_23 3323.33 root MppVersion: 1, data:ExchangeSender_22",
" └─ExchangeSender_22 3323.33 mpp[tiflash] ExchangeType: PassThrough",
" └─Selection_21 3323.33 mpp[tiflash] lt(test.t1.c1, 2)",
" └─TableFullScan_20 10000.00 mpp[tiflash] table:t1, partition:p2 pushed down filter:empty, keep order:false, stats:pseudo"))
"PartitionUnion_11 9970.00 root ",
"├─TableReader_16 3323.33 root MppVersion: 1, data:ExchangeSender_15",
"│ └─ExchangeSender_15 3323.33 mpp[tiflash] ExchangeType: PassThrough",
"│ └─Selection_14 3323.33 mpp[tiflash] lt(test.t1.c1, 2)",
"│ └─TableFullScan_13 10000.00 mpp[tiflash] table:t1, partition:p0 pushed down filter:empty, keep order:false, stats:pseudo",
"├─TableReader_20 3323.33 root MppVersion: 1, data:ExchangeSender_19",
"│ └─ExchangeSender_19 3323.33 mpp[tiflash] ExchangeType: PassThrough",
"│ └─Selection_18 3323.33 mpp[tiflash] lt(test.t1.c1, 2)",
"│ └─TableFullScan_17 10000.00 mpp[tiflash] table:t1, partition:p1 pushed down filter:empty, keep order:false, stats:pseudo",
"└─TableReader_24 3323.33 root MppVersion: 1, data:ExchangeSender_23",
" └─ExchangeSender_23 3323.33 mpp[tiflash] ExchangeType: PassThrough",
" └─Selection_22 3323.33 mpp[tiflash] lt(test.t1.c1, 2)",
" └─TableFullScan_21 10000.00 mpp[tiflash] table:t1, partition:p2 pushed down filter:empty, keep order:false, stats:pseudo"))
}

func TestMPPMemoryTracker(t *testing.T) {
Expand Down
3 changes: 2 additions & 1 deletion planner/cascades/optimize.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ func (opt *Optimizer) FindBestPlan(sctx sessionctx.Context, logical plannercore.
}

func (*Optimizer) onPhasePreprocessing(_ sessionctx.Context, plan plannercore.LogicalPlan) (plannercore.LogicalPlan, error) {
err := plan.PruneColumns(plan.Schema().Columns, nil)
var err error
plan, err = plan.PruneColumns(plan.Schema().Columns, nil)
if err != nil {
return nil, err
}
Expand Down
Loading