Skip to content

Commit

Permalink
[enhancement](Nereids) two phase read for topn (#18829)
Browse files Browse the repository at this point in the history
Add the two-phase read optimization for TopN to Nereids. The corresponding legacy planner PRs are:
- #15642
- #16460
- #16848

TODO:
We currently forbid limit(sort(project(scan))) because the BE crashes (core dump) when the plan has a project on top of the scan.
We need to remove this restriction after the BE bug is fixed.
  • Loading branch information
morrySnow authored Apr 21, 2023
1 parent fc63747 commit b84bd15
Show file tree
Hide file tree
Showing 15 changed files with 292 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public class SlotDescriptor {
private boolean isAgg;
private boolean isMultiRef;
// If set to false, then such slots will be ignored during
// materialize them.Used to optmize to read less data and less memory usage
// materialize them.Used to optimize to read less data and less memory usage
private boolean needMaterialize = true;

public SlotDescriptor(SlotId id, TupleDescriptor parent) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,21 +176,23 @@
* </STRONG>
*/
public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, PlanTranslatorContext> {

private static final Logger LOG = LogManager.getLogger(PhysicalPlanTranslator.class);
protected StatsErrorEstimator statsErrorEstimator;
PlanTranslatorContext context;

private final StatsErrorEstimator statsErrorEstimator;
private final PlanTranslatorContext context;

public PhysicalPlanTranslator() {
this(null, null);
}

public PhysicalPlanTranslator(PlanTranslatorContext context, StatsErrorEstimator statsErrorEstimator) {
this.context = context;
this.statsErrorEstimator = statsErrorEstimator;
public PhysicalPlanTranslator(PlanTranslatorContext context) {
this(context, null);
}

public PlanFragment translatePlan(PhysicalPlan physicalPlan, PlanTranslatorContext context) {
public PhysicalPlanTranslator(PlanTranslatorContext context, StatsErrorEstimator statsErrorEstimator) {
this.context = context;
return translatePlan(physicalPlan);
this.statsErrorEstimator = statsErrorEstimator;
}

/**
Expand Down Expand Up @@ -497,8 +499,16 @@ public PlanFragment visitPhysicalOlapScan(PhysicalOlapScan olapScan, PlanTransla
.addAll(olapScan.getOutput())
.addAll(filterSlotsOfSelectedIndex(olapScan.getNonUserVisibleOutput(), olapScan))
.build();
Set<ExprId> deferredMaterializedExprIds = Collections.emptySet();
if (olapScan.getMutableState(PhysicalOlapScan.DEFERRED_MATERIALIZED_SLOTS).isPresent()) {
deferredMaterializedExprIds = (Set<ExprId>) (olapScan
.getMutableState(PhysicalOlapScan.DEFERRED_MATERIALIZED_SLOTS).get());
}
OlapTable olapTable = olapScan.getTable();
TupleDescriptor tupleDescriptor = generateTupleDesc(slotList, olapTable, context);
TupleDescriptor tupleDescriptor = generateTupleDesc(slotList, olapTable, deferredMaterializedExprIds, context);
if (olapScan.getMutableState(PhysicalOlapScan.DEFERRED_MATERIALIZED_SLOTS).isPresent()) {
injectRowIdColumnSlot(tupleDescriptor);
}

// Use column with the same name in selected materialized index meta for slot desc,
// to get the correct col unique id.
Expand All @@ -513,7 +523,6 @@ public PlanFragment visitPhysicalOlapScan(PhysicalOlapScan olapScan, PlanTransla
}
});
}

tupleDescriptor.setTable(olapTable);

OlapScanNode olapScanNode = new OlapScanNode(context.nextPlanNodeId(), tupleDescriptor, "OlapScanNode");
Expand Down Expand Up @@ -875,7 +884,7 @@ public PlanFragment visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, PlanTra
SortNode sortNode = translateSortNode(topN, inputFragment.getPlanRoot(), context);
sortNode.setOffset(topN.getOffset());
sortNode.setLimit(topN.getLimit());
if (topN.getMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER).isPresent()) {
if (topN.getMutableState(PhysicalTopN.TOPN_OPT).isPresent()) {
sortNode.setUseTopnOpt(true);
PlanNode child = sortNode.getChild(0);
Preconditions.checkArgument(child instanceof OlapScanNode,
Expand Down Expand Up @@ -929,6 +938,15 @@ private SortNode translateSortNode(AbstractPhysicalSort<? extends Plan> sort, Pl
SortInfo sortInfo = new SortInfo(newOrderingExprList, ascOrderList, nullsFirstParamList, tupleDesc);
SortNode sortNode = new SortNode(context.nextPlanNodeId(), childNode, sortInfo, true);
sortNode.finalizeForNereids(tupleDesc, sortTupleOutputList, oldOrderingExprList);
if (sort.getMutableState(PhysicalTopN.TWO_PHASE_READ_OPT).isPresent()) {
sortNode.setUseTwoPhaseReadOpt(true);
sortNode.getSortInfo().setUseTwoPhaseRead();
injectRowIdColumnSlot(sortNode.getSortInfo().getSortTupleDescriptor());
TupleDescriptor childTuple = childNode.getOutputTupleDesc() != null
? childNode.getOutputTupleDesc() : context.getTupleDesc(childNode.getTupleIds().get(0));
SlotDescriptor childRowIdDesc = childTuple.getSlots().get(childTuple.getSlots().size() - 1);
sortNode.getResolvedTupleExprs().add(new SlotRef(childRowIdDesc));
}
if (sort.getStats() != null) {
sortNode.setCardinality((long) sort.getStats().getRowCount());
}
Expand Down Expand Up @@ -1093,7 +1111,13 @@ public PlanFragment visitPhysicalHashJoin(
continue;
}
SlotReference sf = leftChildOutputMap.get(context.findExprId(leftSlotDescriptor.getId()));
SlotDescriptor sd = context.createSlotDesc(intermediateDescriptor, sf);
SlotDescriptor sd;
if (sf == null && leftSlotDescriptor.getColumn().getName().equals(Column.ROWID_COL)) {
// TODO: temporary code for two phase read, should remove it after refactor
sd = context.getDescTable().copySlotDescriptor(intermediateDescriptor, leftSlotDescriptor);
} else {
sd = context.createSlotDesc(intermediateDescriptor, sf);
}
leftIntermediateSlotDescriptor.add(sd);
}
} else if (hashJoin.getOtherJoinConjuncts().isEmpty()
Expand All @@ -1103,7 +1127,13 @@ public PlanFragment visitPhysicalHashJoin(
continue;
}
SlotReference sf = rightChildOutputMap.get(context.findExprId(rightSlotDescriptor.getId()));
SlotDescriptor sd = context.createSlotDesc(intermediateDescriptor, sf);
SlotDescriptor sd;
if (sf == null && rightSlotDescriptor.getColumn().getName().equals(Column.ROWID_COL)) {
// TODO: temporary code for two phase read, should remove it after refactor
sd = context.getDescTable().copySlotDescriptor(intermediateDescriptor, rightSlotDescriptor);
} else {
sd = context.createSlotDesc(intermediateDescriptor, sf);
}
rightIntermediateSlotDescriptor.add(sd);
}
} else {
Expand All @@ -1112,9 +1142,15 @@ public PlanFragment visitPhysicalHashJoin(
continue;
}
SlotReference sf = leftChildOutputMap.get(context.findExprId(leftSlotDescriptor.getId()));
SlotDescriptor sd = context.createSlotDesc(intermediateDescriptor, sf);
if (hashOutputSlotReferenceMap.get(sf.getExprId()) != null) {
hashJoinNode.addSlotIdToHashOutputSlotIds(leftSlotDescriptor.getId());
SlotDescriptor sd;
if (sf == null && leftSlotDescriptor.getColumn().getName().equals(Column.ROWID_COL)) {
// TODO: temporary code for two phase read, should remove it after refactor
sd = context.getDescTable().copySlotDescriptor(intermediateDescriptor, leftSlotDescriptor);
} else {
sd = context.createSlotDesc(intermediateDescriptor, sf);
if (hashOutputSlotReferenceMap.get(sf.getExprId()) != null) {
hashJoinNode.addSlotIdToHashOutputSlotIds(leftSlotDescriptor.getId());
}
}
leftIntermediateSlotDescriptor.add(sd);
}
Expand All @@ -1123,9 +1159,15 @@ public PlanFragment visitPhysicalHashJoin(
continue;
}
SlotReference sf = rightChildOutputMap.get(context.findExprId(rightSlotDescriptor.getId()));
SlotDescriptor sd = context.createSlotDesc(intermediateDescriptor, sf);
if (hashOutputSlotReferenceMap.get(sf.getExprId()) != null) {
hashJoinNode.addSlotIdToHashOutputSlotIds(rightSlotDescriptor.getId());
SlotDescriptor sd;
if (sf == null && rightSlotDescriptor.getColumn().getName().equals(Column.ROWID_COL)) {
// TODO: temporary code for two phase read, should remove it after refactor
sd = context.getDescTable().copySlotDescriptor(intermediateDescriptor, rightSlotDescriptor);
} else {
sd = context.createSlotDesc(intermediateDescriptor, sf);
if (hashOutputSlotReferenceMap.get(sf.getExprId()) != null) {
hashJoinNode.addSlotIdToHashOutputSlotIds(rightSlotDescriptor.getId());
}
}
rightIntermediateSlotDescriptor.add(sd);
}
Expand Down Expand Up @@ -1280,15 +1322,27 @@ public PlanFragment visitPhysicalNestedLoopJoin(
continue;
}
SlotReference sf = leftChildOutputMap.get(context.findExprId(leftSlotDescriptor.getId()));
SlotDescriptor sd = context.createSlotDesc(intermediateDescriptor, sf);
SlotDescriptor sd;
if (sf == null && leftSlotDescriptor.getColumn().getName().equals(Column.ROWID_COL)) {
// TODO: temporary code for two phase read, should remove it after refactor
sd = context.getDescTable().copySlotDescriptor(intermediateDescriptor, leftSlotDescriptor);
} else {
sd = context.createSlotDesc(intermediateDescriptor, sf);
}
leftIntermediateSlotDescriptor.add(sd);
}
for (SlotDescriptor rightSlotDescriptor : rightSlotDescriptors) {
if (!rightSlotDescriptor.isMaterialized()) {
continue;
}
SlotReference sf = rightChildOutputMap.get(context.findExprId(rightSlotDescriptor.getId()));
SlotDescriptor sd = context.createSlotDesc(intermediateDescriptor, sf);
SlotDescriptor sd;
if (sf == null && rightSlotDescriptor.getColumn().getName().equals(Column.ROWID_COL)) {
// TODO: temporary code for two phase read, should remove it after refactor
sd = context.getDescTable().copySlotDescriptor(intermediateDescriptor, rightSlotDescriptor);
} else {
sd = context.createSlotDesc(intermediateDescriptor, sf);
}
rightIntermediateSlotDescriptor.add(sd);
}

Expand Down Expand Up @@ -1405,6 +1459,17 @@ public PlanFragment visitPhysicalProject(PhysicalProject<? extends Plan> project
TupleDescriptor tupleDescriptor = generateTupleDesc(slotList, null, context);
inputPlanNode.setProjectList(execExprList);
inputPlanNode.setOutputTupleDesc(tupleDescriptor);
// TODO: this is a temporary scheme to support two phase read when has project.
// we need to refactor all topn opt into rbo stage.
if (inputPlanNode instanceof OlapScanNode) {
ArrayList<SlotDescriptor> slots = context.getTupleDesc(inputPlanNode.getTupleIds().get(0)).getSlots();
SlotDescriptor lastSlot = slots.get(slots.size() - 1);
if (lastSlot.getColumn() != null && lastSlot.getColumn().getName().equals(Column.ROWID_COL)) {
inputPlanNode.getProjectList().add(new SlotRef(lastSlot));
injectRowIdColumnSlot(tupleDescriptor);
requiredSlotIdSet.add(lastSlot.getId());
}
}

if (inputPlanNode instanceof ScanNode) {
updateChildSlotsMaterialization(inputPlanNode, requiredSlotIdSet, requiredByProjectSlotIdSet, context);
Expand Down Expand Up @@ -1739,6 +1804,19 @@ private void extractExecSlot(Expr root, Set<SlotId> slotIdList) {
}
}

/**
 * Builds a tuple descriptor for the given slots, flagging deferred slots as not needing
 * materialization.
 *
 * <p>A fresh tuple descriptor is obtained from {@code context}, bound to {@code table}, and one
 * slot descriptor is created in it for every entry of {@code slotList}. Any slot whose expr id is
 * contained in {@code deferredMaterializedExprIds} gets {@code setNeedMaterialize(false)}.
 *
 * @param slotList slots to create descriptors for; every element is cast to {@code SlotReference}
 * @param table table to bind to the tuple descriptor and to pass along when creating each slot
 * @param deferredMaterializedExprIds expr ids of the slots to mark as not needing materialization
 * @param context translator context used to create the tuple and slot descriptors
 * @return the newly created tuple descriptor
 */
private TupleDescriptor generateTupleDesc(List<Slot> slotList, TableIf table,
        Set<ExprId> deferredMaterializedExprIds, PlanTranslatorContext context) {
    final TupleDescriptor result = context.generateTupleDesc();
    result.setTable(table);
    for (Slot current : slotList) {
        final SlotReference reference = (SlotReference) current;
        final SlotDescriptor created = context.createSlotDesc(result, reference, table);
        final boolean deferred = deferredMaterializedExprIds.contains(current.getExprId());
        if (deferred) {
            created.setNeedMaterialize(false);
        }
    }
    return result;
}

private TupleDescriptor generateTupleDesc(List<Slot> slotList, TableIf table, PlanTranslatorContext context) {
TupleDescriptor tupleDescriptor = context.generateTupleDesc();
tupleDescriptor.setTable(table);
Expand Down Expand Up @@ -2153,4 +2231,16 @@ private void updateLegacyPlanIdToPhysicalPlan(PlanNode planNode, AbstractPlan ph
statsErrorEstimator.updateLegacyPlanIdToPhysicalPlan(planNode, physicalPlan);
}
}

/**
 * Adds a row-id slot to the given tuple descriptor.
 *
 * <p>The slot is created through this translator's {@code context} and then configured as a
 * non-nullable, materialized {@code STRING} slot backed by a column named
 * {@code Column.ROWID_COL}.
 *
 * @param tupleDesc tuple descriptor that receives the new slot
 * @return the newly added, fully configured slot descriptor
 */
private SlotDescriptor injectRowIdColumnSlot(TupleDescriptor tupleDesc) {
    final SlotDescriptor rowIdSlot = context.addSlotDesc(tupleDesc);
    // The debug log is issued right after creation, before the slot is configured below.
    LOG.debug("inject slot {}", rowIdSlot);
    final Column rowIdColumn = new Column(Column.ROWID_COL, Type.STRING, false, null, false, "", "rowid column");
    rowIdSlot.setType(Type.STRING);
    rowIdSlot.setColumn(rowIdColumn);
    rowIdSlot.setIsNullable(false);
    rowIdSlot.setIsMaterialized(true);
    return rowIdSlot;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,6 @@ public TupleDescriptor getBufferedTupleForWindow() {
return bufferedTupleForWindow;
}

/**
 * Replaces the buffered tuple descriptor held by this context.
 *
 * @param bufferedTupleForWindow the tuple descriptor to store; it is the value subsequently
 *                               returned by {@code getBufferedTupleForWindow()}
 */
public void setBufferedTupleForWindow(TupleDescriptor bufferedTupleForWindow) {
this.bufferedTupleForWindow = bufferedTupleForWindow;
}

/**
* Create SlotDesc and add it to the mappings from expression to the stales expr.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public List<PlanPostProcessor> getProcessors() {
}
builder.add(new Validator());
builder.add(new TopNScanOpt());
builder.add(new TwoPhaseReadOpt());
return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.SortPhase;
import org.apache.doris.nereids.trees.plans.algebra.Filter;
Expand All @@ -36,45 +35,52 @@
*/

public class TopNScanOpt extends PlanPostProcessor {

@Override
public PhysicalTopN visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, CascadesContext ctx) {
topN.child().accept(this, ctx);
Plan child = topN.child();
if (topN.getSortPhase() != SortPhase.LOCAL_SORT) {
return topN;
}
long threshold = getTopNOptLimitThreshold();
if (threshold == -1 || topN.getLimit() > threshold) {
return topN;
}
if (topN.getOrderKeys().isEmpty()) {
return topN;
}
Expression firstKey = topN.getOrderKeys().get(0).getExpr();

// topn opt
long topNOptLimitThreshold = getTopNOptLimitThreshold();
if (topNOptLimitThreshold == -1 || topN.getLimit() > topNOptLimitThreshold) {
return topN;
}
// if firstKey's column is not present, it means the firstKey is not a original column from scan node
// for example: "select cast(k1 as INT) as id from tbl1 order by id limit 2;" the firstKey "id" is
// a cast expr which is not from tbl1 and its column is not present.
// On the other hand "select k1 as id from tbl1 order by id limit 2;" the firstKey "id" is just an alias of k1
// so its column is present which is valid for topN optimize
// see Alias::toSlot() method to get how column info is passed around by alias of slotReference
if (!(firstKey instanceof SlotReference) || !((SlotReference) firstKey).getColumn().isPresent()) {
Expression firstKey = topN.getOrderKeys().get(0).getExpr();
if (!firstKey.isColumnFromTable()) {
return topN;
}
if (firstKey.getDataType().isStringLikeType()
|| firstKey.getDataType().isFloatType()
|| firstKey.getDataType().isDoubleType()) {
return topN;
}
while (child != null && (child instanceof Project || child instanceof Filter)) {

PhysicalOlapScan olapScan;
while (child instanceof Project || child instanceof Filter) {
child = child.child(0);
}
if (child instanceof PhysicalOlapScan) {
PhysicalOlapScan scan = (PhysicalOlapScan) child;
if (scan.getTable().isDupKeysOrMergeOnWrite()) {
topN.setMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER, true);
}
if (!(child instanceof PhysicalOlapScan)) {
return topN;
}
olapScan = (PhysicalOlapScan) child;

if (olapScan.getTable().isDupKeysOrMergeOnWrite()) {
topN.setMutableState(PhysicalTopN.TOPN_OPT, true);
}

return topN;
}

Expand Down
Loading

0 comments on commit b84bd15

Please sign in to comment.