-
Notifications
You must be signed in to change notification settings - Fork 5.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
planner: support push part of order property down to the partition table #36108
Changes from 5 commits
7c9e327
3993e00
3106166
2c08dde
39bca97
896a4e0
9f57b9c
6ccdc30
423f599
f940c71
2bc49ff
af0cf54
0575b4c
5006949
027eca6
4b00f0a
174254c
27324cb
bdf9854
51853c7
e9da563
921e702
7a60b86
09fb930
e1b4211
e285406
953273f
6b0699f
2794d3e
1d38fcc
743265c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,6 +30,7 @@ import ( | |
"github.com/pingcap/tidb/planner/util" | ||
"github.com/pingcap/tidb/sessionctx" | ||
"github.com/pingcap/tidb/statistics" | ||
"github.com/pingcap/tidb/table/tables" | ||
"github.com/pingcap/tidb/types" | ||
"github.com/pingcap/tidb/util/chunk" | ||
"github.com/pingcap/tidb/util/collate" | ||
|
@@ -1011,6 +1012,10 @@ func (p *PhysicalTopN) attach2Task(tasks ...task) task { | |
} | ||
needPushDown := len(cols) > 0 | ||
if copTask, ok := t.(*copTask); ok && needPushDown && p.canPushDown(copTask.getStoreType()) && len(copTask.rootTaskConds) == 0 { | ||
newTask, changed := p.pushTopNDownToDynamicPartition(copTask) | ||
if changed { | ||
return newTask | ||
} | ||
// If all columns in topN are from index plan, we push it to index plan, otherwise we finish the index plan and | ||
// push it to table plan. | ||
var pushedDownTopN *PhysicalTopN | ||
|
@@ -1035,6 +1040,156 @@ func (p *PhysicalTopN) attach2Task(tasks ...task) task { | |
return attachPlan2Task(p, rootTask) | ||
} | ||
|
||
// pushTopNDownToDynamicPartition is a temp solution for partition table. It actually does the same thing as DataSource's isMatchProp. | ||
func (p *PhysicalTopN) pushTopNDownToDynamicPartition(copTsk *copTask) (task, bool) { | ||
var err error | ||
copTsk = copTsk.copy().(*copTask) | ||
if err != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the err isn't assigned anything before |
||
return nil, false | ||
} | ||
if len(copTsk.rootTaskConds) > 0 { | ||
return nil, false | ||
} | ||
colsProp, ok := GetPropByOrderByItems(p.ByItems) | ||
if !ok { | ||
return nil, false | ||
} | ||
allSameOrder, isDesc := colsProp.AllSameOrder() | ||
if !allSameOrder { | ||
return nil, false | ||
} | ||
checkIndexMatchProp := func(idxCols []*expression.Column, idxColLens []int, constColsByCond []bool, colsProp *property.PhysicalProperty) bool { | ||
// If the number of the by-items is bigger than the index columns. We cannot push down since it must not keep order. | ||
if len(idxCols) < len(colsProp.SortItems) { | ||
return false | ||
} | ||
idxPos := 0 | ||
for _, byItem := range colsProp.SortItems { | ||
found := false | ||
for ; idxPos < len(idxCols); idxPos++ { | ||
if idxColLens[idxPos] == types.UnspecifiedLength && idxCols[idxPos].Equal(p.SCtx(), byItem.Col) { | ||
found = true | ||
idxPos++ | ||
break | ||
} | ||
if len(constColsByCond) == 0 || idxPos > len(constColsByCond) || !constColsByCond[idxPos] { | ||
found = false | ||
break | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. missed set found = true at the end? (indicating that we are sure that this col is constant, then just go to outer's continued column) In: order by (a,c), once index(a,b,c) exists and b=1 is what actually it already does. |
||
} | ||
if !found { | ||
return false | ||
} | ||
} | ||
return true | ||
} | ||
var ( | ||
idxScan *PhysicalIndexScan | ||
tblScan *PhysicalTableScan | ||
) | ||
if copTsk.indexPlan != nil { | ||
copTsk.indexPlan, err = copTsk.indexPlan.Clone() | ||
if err != nil { | ||
return nil, false | ||
} | ||
finalIdxScanPlan := copTsk.indexPlan | ||
for len(finalIdxScanPlan.Children()) > 0 && finalIdxScanPlan.Children()[0] != nil { | ||
AilinKid marked this conversation as resolved.
Show resolved
Hide resolved
|
||
finalIdxScanPlan = finalIdxScanPlan.Children()[0] | ||
} | ||
idxScan = finalIdxScanPlan.(*PhysicalIndexScan) | ||
} | ||
if copTsk.tablePlan != nil { | ||
copTsk.tablePlan, err = copTsk.tablePlan.Clone() | ||
if err != nil { | ||
return nil, false | ||
} | ||
finalTblScanPlan := copTsk.tablePlan | ||
if len(finalTblScanPlan.Children()) > 0 { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. s/ if / for? |
||
finalTblScanPlan = finalTblScanPlan.Children()[0] | ||
} | ||
tblScan = finalTblScanPlan.(*PhysicalTableScan) | ||
} | ||
if !copTsk.indexPlanFinished { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We didn't consider that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When |
||
// If indexPlan side isn't finished, there's no selection on the table side. | ||
|
||
if idxScan.Table.GetPartitionInfo() == nil { | ||
return nil, false | ||
} | ||
|
||
propMatched := checkIndexMatchProp(idxScan.IdxCols, idxScan.IdxColLens, idxScan.constColsByCond, colsProp) | ||
if !propMatched { | ||
return nil, false | ||
} | ||
|
||
// If we reach here, the index matches the order property of the top-n, we could push a limit down. | ||
copTsk.keepOrder = true | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems like we don't need to set There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. for partitions maybe There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
This func uses it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh actually we don't need the keep order, remove it. |
||
idxScan.KeepOrder = true | ||
idxScan.Desc = isDesc | ||
childProfile := copTsk.plan().statsInfo() | ||
newCount := p.Offset + p.Count | ||
stats := deriveLimitStats(childProfile, float64(newCount)) | ||
pushedLimit := PhysicalLimit{ | ||
Count: newCount, | ||
}.Init(p.SCtx(), stats, p.SelectBlockOffset()) | ||
pushedLimit.SetSchema(copTsk.indexPlan.Schema()) | ||
copTsk = attachPlan2Task(pushedLimit, copTsk).(*copTask) | ||
pushedLimit.cost = copTsk.cost() | ||
if copTsk.tablePlan != nil && !idxScan.Table.IsCommonHandle && copTsk.extraHandleCol == nil { | ||
// The clustered index would always add handle, so we don't need to consider that case. | ||
copTsk.extraHandleCol = &expression.Column{ | ||
RetType: types.NewFieldType(mysql.TypeLonglong), | ||
ID: model.ExtraHandleID, | ||
UniqueID: idxScan.ctx.GetSessionVars().AllocPlanColumnID(), | ||
} | ||
tblScan.Schema().Append(copTsk.extraHandleCol) | ||
tblScan.Columns = append(tblScan.Columns, model.NewExtraHandleColInfo()) | ||
copTsk.needExtraProj = true | ||
copTsk.originSchema = idxScan.dataSourceSchema | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We missed the case that the int primary key is not |
||
} else if copTsk.indexPlan == nil { | ||
|
||
if tblScan.Table.GetPartitionInfo() == nil { | ||
return nil, false | ||
} | ||
|
||
if tblScan.HandleCols == nil { | ||
return nil, false | ||
} | ||
|
||
if tblScan.HandleCols.IsInt() { | ||
pk := tblScan.HandleCols.GetCol(0) | ||
if len(colsProp.SortItems) != 1 || !colsProp.SortItems[0].Col.Equal(p.SCtx(), pk) { | ||
Comment on lines
+1090
to
+1092
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is different from There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This change only affects tikv part. |
||
return nil, false | ||
} | ||
} else { | ||
idxCols, idxColLens := expression.IndexInfo2PrefixCols(tblScan.Columns, tblScan.Schema().Columns, tables.FindPrimaryIndex(tblScan.Table)) | ||
matched := checkIndexMatchProp(idxCols, idxColLens, nil, colsProp) | ||
if !matched { | ||
return nil, false | ||
} | ||
} | ||
copTsk.keepOrder = true | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto. |
||
tblScan.KeepOrder = true | ||
tblScan.Desc = isDesc | ||
childProfile := copTsk.plan().statsInfo() | ||
newCount := p.Offset + p.Count | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should we consider there's a selection on the table side? the count here may won't pass the selection There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In such case, the plan should be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. get!make sense |
||
stats := deriveLimitStats(childProfile, float64(newCount)) | ||
pushedLimit := PhysicalLimit{ | ||
Count: newCount, | ||
}.Init(p.SCtx(), stats, p.SelectBlockOffset()) | ||
pushedLimit.SetSchema(copTsk.tablePlan.Schema()) | ||
copTsk = attachPlan2Task(pushedLimit, copTsk).(*copTask) | ||
pushedLimit.cost = copTsk.cost() | ||
} else { | ||
return nil, false | ||
} | ||
|
||
rootTask := copTsk.convertToRootTask(p.ctx) | ||
rootTask.addCost(p.GetCost(rootTask.count(), true)) | ||
p.cost = rootTask.cost() | ||
return attachPlan2Task(p, rootTask), true | ||
} | ||
|
||
func (p *PhysicalProjection) attach2Task(tasks ...task) task { | ||
t := tasks[0].copy() | ||
if cop, ok := t.(*copTask); ok { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need more detailed comments since it's a tricky way to modify the plan.
I think we should clarify what the resulting plan would be like and why we have to use this method as a temporary solution.