Skip to content

Commit

Permalink
[enhancement](Nereids) two phase read for topn
Browse files Browse the repository at this point in the history
  • Loading branch information
morrySnow committed Apr 19, 2023
1 parent 0b379de commit e5361b4
Show file tree
Hide file tree
Showing 14 changed files with 246 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public class SlotDescriptor {
private boolean isAgg;
private boolean isMultiRef;
// If set to false, then such slots will be ignored during
// materialize them.Used to optmize to read less data and less memory usage
// materialize them.Used to optimize to read less data and less memory usage
private boolean needMaterialize = true;

public SlotDescriptor(SlotId id, TupleDescriptor parent) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,21 +176,23 @@
* </STRONG>
*/
public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, PlanTranslatorContext> {

private static final Logger LOG = LogManager.getLogger(PhysicalPlanTranslator.class);
protected StatsErrorEstimator statsErrorEstimator;
PlanTranslatorContext context;

private final StatsErrorEstimator statsErrorEstimator;
private final PlanTranslatorContext context;

public PhysicalPlanTranslator() {
this(null, null);
}

public PhysicalPlanTranslator(PlanTranslatorContext context, StatsErrorEstimator statsErrorEstimator) {
this.context = context;
this.statsErrorEstimator = statsErrorEstimator;
public PhysicalPlanTranslator(PlanTranslatorContext context) {
this(context, null);
}

public PlanFragment translatePlan(PhysicalPlan physicalPlan, PlanTranslatorContext context) {
public PhysicalPlanTranslator(PlanTranslatorContext context, StatsErrorEstimator statsErrorEstimator) {
this.context = context;
return translatePlan(physicalPlan);
this.statsErrorEstimator = statsErrorEstimator;
}

/**
Expand Down Expand Up @@ -497,8 +499,16 @@ public PlanFragment visitPhysicalOlapScan(PhysicalOlapScan olapScan, PlanTransla
.addAll(olapScan.getOutput())
.addAll(filterSlotsOfSelectedIndex(olapScan.getNonUserVisibleOutput(), olapScan))
.build();
Set<ExprId> deferredMaterializedExprIds = Collections.emptySet();
if (olapScan.getMutableState(PhysicalOlapScan.DEFERRED_MATERIALIZED_SLOTS).isPresent()) {
deferredMaterializedExprIds = (Set<ExprId>) (olapScan
.getMutableState(PhysicalOlapScan.DEFERRED_MATERIALIZED_SLOTS).get());
}
OlapTable olapTable = olapScan.getTable();
TupleDescriptor tupleDescriptor = generateTupleDesc(slotList, olapTable, context);
TupleDescriptor tupleDescriptor = generateTupleDesc(slotList, olapTable, deferredMaterializedExprIds, context);
if (olapScan.getMutableState(PhysicalOlapScan.DEFERRED_MATERIALIZED_SLOTS).isPresent()) {
injectRowIdColumnSlot(tupleDescriptor);
}

// Use column with the same name in selected materialized index meta for slot desc,
// to get the correct col unique id.
Expand All @@ -513,7 +523,6 @@ public PlanFragment visitPhysicalOlapScan(PhysicalOlapScan olapScan, PlanTransla
}
});
}

tupleDescriptor.setTable(olapTable);

OlapScanNode olapScanNode = new OlapScanNode(context.nextPlanNodeId(), tupleDescriptor, "OlapScanNode");
Expand Down Expand Up @@ -875,7 +884,7 @@ public PlanFragment visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, PlanTra
SortNode sortNode = translateSortNode(topN, inputFragment.getPlanRoot(), context);
sortNode.setOffset(topN.getOffset());
sortNode.setLimit(topN.getLimit());
if (topN.getMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER).isPresent()) {
if (topN.getMutableState(PhysicalTopN.TOPN_OPT).isPresent()) {
sortNode.setUseTopnOpt(true);
PlanNode child = sortNode.getChild(0);
Preconditions.checkArgument(child instanceof OlapScanNode,
Expand Down Expand Up @@ -929,6 +938,15 @@ private SortNode translateSortNode(AbstractPhysicalSort<? extends Plan> sort, Pl
SortInfo sortInfo = new SortInfo(newOrderingExprList, ascOrderList, nullsFirstParamList, tupleDesc);
SortNode sortNode = new SortNode(context.nextPlanNodeId(), childNode, sortInfo, true);
sortNode.finalizeForNereids(tupleDesc, sortTupleOutputList, oldOrderingExprList);
if (sort.getMutableState(PhysicalTopN.TWO_PHASE_READ_OPT).isPresent()) {
sortNode.setUseTwoPhaseReadOpt(true);
sortNode.getSortInfo().setUseTwoPhaseRead();
injectRowIdColumnSlot(sortNode.getSortInfo().getSortTupleDescriptor());
TupleDescriptor childTuple = childNode.getOutputTupleDesc() != null
? childNode.getOutputTupleDesc() : context.getTupleDesc(childNode.getTupleIds().get(0));
SlotDescriptor childRowIdDesc = childTuple.getSlots().get(childTuple.getSlots().size() - 1);
sortNode.getResolvedTupleExprs().add(new SlotRef(childRowIdDesc));
}
if (sort.getStats() != null) {
sortNode.setCardinality((long) sort.getStats().getRowCount());
}
Expand Down Expand Up @@ -1405,6 +1423,17 @@ public PlanFragment visitPhysicalProject(PhysicalProject<? extends Plan> project
TupleDescriptor tupleDescriptor = generateTupleDesc(slotList, null, context);
inputPlanNode.setProjectList(execExprList);
inputPlanNode.setOutputTupleDesc(tupleDescriptor);
// TODO: this is a temporary scheme to support two phase read when has project.
// we need to refactor all topn opt into rbo stage.
if (inputPlanNode instanceof OlapScanNode) {
ArrayList<SlotDescriptor> slots = context.getTupleDesc(inputPlanNode.getTupleIds().get(0)).getSlots();
SlotDescriptor lastSlot = slots.get(slots.size() - 1);
if (lastSlot.getColumn().getName().equals(Column.ROWID_COL)) {
inputPlanNode.getProjectList().add(new SlotRef(lastSlot));
}
injectRowIdColumnSlot(tupleDescriptor);
requiredSlotIdSet.add(lastSlot.getId());
}

if (inputPlanNode instanceof ScanNode) {
updateChildSlotsMaterialization(inputPlanNode, requiredSlotIdSet, requiredByProjectSlotIdSet, context);
Expand Down Expand Up @@ -1739,6 +1768,19 @@ private void extractExecSlot(Expr root, Set<SlotId> slotIdList) {
}
}

/**
 * Creates a new tuple descriptor for {@code table} holding one slot descriptor per entry of
 * {@code slotList}. Slots whose expr id appears in {@code deferredMaterializedExprIds} are
 * flagged with {@code setNeedMaterialize(false)}.
 *
 * @param slotList slots to turn into slot descriptors; each one is cast to {@code SlotReference}
 * @param table table attached to the tuple and passed on when creating each slot descriptor
 * @param deferredMaterializedExprIds expr ids of the slots that must not be materialized
 * @param context translator context used to allocate the tuple and slot descriptors
 * @return the populated tuple descriptor
 */
private TupleDescriptor generateTupleDesc(List<Slot> slotList, TableIf table,
        Set<ExprId> deferredMaterializedExprIds, PlanTranslatorContext context) {
    TupleDescriptor tupleDesc = context.generateTupleDesc();
    tupleDesc.setTable(table);
    slotList.forEach(slot -> {
        SlotDescriptor created = context.createSlotDesc(tupleDesc, (SlotReference) slot, table);
        boolean deferred = deferredMaterializedExprIds.contains(slot.getExprId());
        if (deferred) {
            created.setNeedMaterialize(false);
        }
    });
    return tupleDesc;
}

private TupleDescriptor generateTupleDesc(List<Slot> slotList, TableIf table, PlanTranslatorContext context) {
TupleDescriptor tupleDescriptor = context.generateTupleDesc();
tupleDescriptor.setTable(table);
Expand Down Expand Up @@ -2153,4 +2195,16 @@ private void updateLegacyPlanIdToPhysicalPlan(PlanNode planNode, AbstractPlan ph
statsErrorEstimator.updateLegacyPlanIdToPhysicalPlan(planNode, physicalPlan);
}
}

/**
 * Appends a new slot to {@code tupleDesc} and configures it as a non-nullable, materialized
 * STRING slot bound to a column named {@code Column.ROWID_COL}.
 *
 * @param tupleDesc tuple descriptor that receives the extra slot
 * @return the newly added slot descriptor
 */
private SlotDescriptor injectRowIdColumnSlot(TupleDescriptor tupleDesc) {
    SlotDescriptor rowIdSlot = context.addSlotDesc(tupleDesc);
    // Logged before the slot is configured, exactly as the slot comes out of addSlotDesc.
    LOG.debug("inject slot {}", rowIdSlot);
    Column rowIdColumn = new Column(Column.ROWID_COL, Type.STRING, false, null, false, "", "rowid column");
    rowIdSlot.setType(Type.STRING);
    rowIdSlot.setColumn(rowIdColumn);
    rowIdSlot.setIsNullable(false);
    rowIdSlot.setIsMaterialized(true);
    return rowIdSlot;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public List<PlanPostProcessor> getProcessors() {
}
builder.add(new Validator());
builder.add(new TopNScanOpt());
builder.add(new TwoPhaseReadOpt());
return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.SortPhase;
import org.apache.doris.nereids.trees.plans.algebra.Filter;
Expand All @@ -36,45 +35,52 @@
*/

public class TopNScanOpt extends PlanPostProcessor {

@Override
public PhysicalTopN visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, CascadesContext ctx) {
topN.child().accept(this, ctx);
Plan child = topN.child();
if (topN.getSortPhase() != SortPhase.LOCAL_SORT) {
return topN;
}
long threshold = getTopNOptLimitThreshold();
if (threshold == -1 || topN.getLimit() > threshold) {
return topN;
}
if (topN.getOrderKeys().isEmpty()) {
return topN;
}
Expression firstKey = topN.getOrderKeys().get(0).getExpr();

// topn opt
long topNOptLimitThreshold = getTopNOptLimitThreshold();
if (topNOptLimitThreshold == -1 || topN.getLimit() > topNOptLimitThreshold) {
return topN;
}
// if firstKey's column is not present, it means the firstKey is not an original column from scan node
// for example: "select cast(k1 as INT) as id from tbl1 order by id limit 2;" the firstKey "id" is
// a cast expr which is not from tbl1 and its column is not present.
// On the other hand "select k1 as id from tbl1 order by id limit 2;" the firstKey "id" is just an alias of k1
// so its column is present which is valid for topN optimize
// see Alias::toSlot() method to get how column info is passed around by alias of slotReference
if (!(firstKey instanceof SlotReference) || !((SlotReference) firstKey).getColumn().isPresent()) {
Expression firstKey = topN.getOrderKeys().get(0).getExpr();
if (!firstKey.isColumnFromTable()) {
return topN;
}
if (firstKey.getDataType().isStringLikeType()
|| firstKey.getDataType().isFloatType()
|| firstKey.getDataType().isDoubleType()) {
return topN;
}
while (child != null && (child instanceof Project || child instanceof Filter)) {

PhysicalOlapScan olapScan;
while (child instanceof Project || child instanceof Filter) {
child = child.child(0);
}
if (child instanceof PhysicalOlapScan) {
PhysicalOlapScan scan = (PhysicalOlapScan) child;
if (scan.getTable().isDupKeysOrMergeOnWrite()) {
topN.setMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER, true);
}
if (!(child instanceof PhysicalOlapScan)) {
return topN;
}
olapScan = (PhysicalOlapScan) child;

if (olapScan.getTable().isDupKeysOrMergeOnWrite()) {
topN.setMutableState(PhysicalTopN.TOPN_OPT, true);
}

return topN;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.processor.post;

import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.properties.OrderKey;
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.SortPhase;
import org.apache.doris.nereids.trees.plans.algebra.Filter;
import org.apache.doris.nereids.trees.plans.algebra.Project;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN;
import org.apache.doris.qe.ConnectContext;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import java.util.Map;
import java.util.Objects;
import java.util.Set;

/**
 * two phase read opt
 *
 * <p>Post processor that inspects every local-sort {@link PhysicalTopN}. When the TopN sits on top of
 * a {@link PhysicalOlapScan} (optionally through filters), it sets
 * {@link PhysicalTopN#TWO_PHASE_READ_OPT} on the TopN and stores in
 * {@link PhysicalOlapScan#DEFERRED_MATERIALIZED_SLOTS} the scan output expr ids that are used
 * neither by a filter nor by an order key.
 *
 * <p>refer to:
 * https://github.com/apache/doris/pull/15642
 * https://github.com/apache/doris/pull/16460
 * https://github.com/apache/doris/pull/16848
 */

public class TwoPhaseReadOpt extends PlanPostProcessor {

    /**
     * Visits the child first, then decides whether this TopN qualifies for two phase read.
     *
     * <p>The TopN is returned untouched unless all of the following hold:
     * it is a LOCAL_SORT TopN with at least one order key, the optimization is enabled and the
     * limit does not exceed the session threshold, every order key is a column from a table,
     * the plan below consists only of filters ending at an olap scan, and the scanned table has
     * light schema change enabled.
     *
     * @param topN the TopN node being visited
     * @param ctx cascades context, forwarded to the child visit
     * @return the same {@code topN} instance, possibly with mutable state set on it and on the scan
     */
    @Override
    public PhysicalTopN visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, CascadesContext ctx) {
        topN.child().accept(this, ctx);
        Plan child = topN.child();
        if (topN.getSortPhase() != SortPhase.LOCAL_SORT) {
            return topN;
        }
        if (topN.getOrderKeys().isEmpty()) {
            return topN;
        }

        // -1 means the optimization is disabled or no session is available; reject it explicitly
        // instead of relying on "limit > -1" being true.
        long topNOptLimitThreshold = getTopNOptLimitThreshold();
        if (topNOptLimitThreshold == -1 || topN.getLimit() > topNOptLimitThreshold) {
            return topN;
        }
        // all order keys must be columns from table
        if (!topN.getOrderKeys().stream().map(OrderKey::getExpr).allMatch(Expression::isColumnFromTable)) {
            return topN;
        }

        PhysicalOlapScan olapScan;
        PhysicalProject<Plan> project = null;
        // Input slots of EVERY filter between the TopN and the scan. Keeping only the last filter
        // would let slots read by an upper filter be marked as deferred.
        Set<ExprId> filterInputExprIds = Sets.newHashSet();
        while (child instanceof Project || child instanceof Filter) {
            if (child instanceof Filter) {
                PhysicalFilter<Plan> filter = (PhysicalFilter<Plan>) child;
                filter.getConjuncts().forEach(e -> filterInputExprIds.addAll(e.getInputSlotExprIds()));
            }
            if (child instanceof Project) {
                project = (PhysicalProject<Plan>) child;
                // TODO: remove this after fix two phase read on project core
                return topN;
            }
            child = child.child(0);
        }
        if (!(child instanceof PhysicalOlapScan)) {
            return topN;
        }
        olapScan = (PhysicalOlapScan) child;

        // the scanned table must have light schema change enabled
        if (!olapScan.getTable().getEnableLightSchemaChange()) {
            return topN;
        }

        // Maps a project output expr id back to the expr id of the slot it forwards or aliases.
        // Currently always empty because a Project makes the loop above bail out (see TODO).
        Map<ExprId, ExprId> projectRevertedMap = Maps.newHashMap();
        if (project != null) {
            for (Expression e : project.getProjects()) {
                if (e.isSlot()) {
                    Slot slot = (Slot) e;
                    projectRevertedMap.put(slot.getExprId(), slot.getExprId());
                } else if (e instanceof Alias) {
                    Alias alias = (Alias) e;
                    if (alias.child().isSlot()) {
                        Slot slot = (Slot) alias.child();
                        projectRevertedMap.put(alias.getExprId(), slot.getExprId());
                    }
                }
            }
        }

        // Start from every scan output, then keep out anything a filter or an order key needs.
        Set<ExprId> deferredMaterializedExprIds = Sets.newHashSet(olapScan.getOutputExprIdSet());
        deferredMaterializedExprIds.removeAll(filterInputExprIds);
        // order keys, resolved through the project mapping
        topN.getOrderKeys().stream()
                .map(OrderKey::getExpr)
                .map(Slot.class::cast)
                .map(NamedExpression::getExprId)
                .map(projectRevertedMap::get)
                .filter(Objects::nonNull)
                .forEach(deferredMaterializedExprIds::remove);
        // order keys, by their own expr id
        topN.getOrderKeys().stream()
                .map(OrderKey::getExpr)
                .map(Slot.class::cast)
                .map(NamedExpression::getExprId)
                .forEach(deferredMaterializedExprIds::remove);
        topN.setMutableState(PhysicalTopN.TWO_PHASE_READ_OPT, true);
        olapScan.setMutableState(PhysicalOlapScan.DEFERRED_MATERIALIZED_SLOTS, deferredMaterializedExprIds);

        return topN;
    }

    /**
     * Reads the limit threshold from the current session.
     *
     * @return the session's {@code topnOptLimitThreshold}, or -1 when there is no connect context,
     *         no session variable, or {@code enableTwoPhaseReadOpt} is off
     */
    private long getTopNOptLimitThreshold() {
        if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable() != null) {
            if (!ConnectContext.get().getSessionVariable().enableTwoPhaseReadOpt) {
                return -1;
            }
            return ConnectContext.get().getSessionVariable().topnOptLimitThreshold;
        }
        return -1;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ public boolean isSlot() {
return this instanceof Slot;
}

/**
 * Tells whether this expression is a {@code SlotReference} whose column is present.
 *
 * @return true only for a {@code SlotReference} with a present column
 */
public boolean isColumnFromTable() {
    if (!(this instanceof SlotReference)) {
        return false;
    }
    SlotReference slotReference = (SlotReference) this;
    return slotReference.getColumn().isPresent();
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
*/
public class PhysicalOlapScan extends PhysicalRelation implements OlapScan {

public static final String DEFERRED_MATERIALIZED_SLOTS = "deferred_materialized_slots";

private final OlapTable olapTable;
private final DistributionSpec distributionSpec;
private final long selectedIndexId;
Expand Down
Loading

0 comments on commit e5361b4

Please sign in to comment.