Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug](sort) disable 2phase read for sort by expressions exclude slotref #16460

Merged
merged 1 commit into from
Feb 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ public class SelectStmt extends QueryStmt {
// For quick get condition for point query
private Map<SlotRef, Expr> eqPredicates;

boolean isTwoPhaseOptEnabled = false;

public SelectStmt(ValueList valueList, ArrayList<OrderByElement> orderByElement, LimitElement limitElement) {
super(orderByElement, limitElement);
this.valueList = valueList;
Expand Down Expand Up @@ -603,6 +605,7 @@ public void analyze(Analyzer analyzer) throws UserException {
// rest of resultExprs will be marked as `INVALID`, such columns will
// be prevent from reading from ScanNode.Those columns will be finally
// read by the second fetch phase
isTwoPhaseOptEnabled = true;
LOG.debug("two phase read optimize enabled");
// Expr.analyze(resultExprs, analyzer);
Set<SlotRef> resultSlots = Sets.newHashSet();
Expand All @@ -620,10 +623,12 @@ public void analyze(Analyzer analyzer) throws UserException {
// invalid slots will be pruned from reading from ScanNode
slot.setInvalid();
}

LOG.debug("resultsSlots {}", resultSlots);
LOG.debug("orderingSlots {}", orderingSlots);
LOG.debug("conjuntSlots {}", conjuntSlots);
}
checkAndSetPointQuery();
if (evaluateOrderBy) {
createSortTupleInfo(analyzer);
}
Expand All @@ -648,6 +653,10 @@ public void analyze(Analyzer analyzer) throws UserException {
}
}

public boolean isTwoPhaseReadOptEnabled() {
return isTwoPhaseOptEnabled;
}

// Check whether enable two phase read optimize, if enabled query will be devieded into two phase read:
// 1. read conjuncts columns and order by columns along with an extra RowId column from ScanNode
// 2. sort and filter data, and get final RowId column, spawn RPC to other BE to fetch final data
Expand Down Expand Up @@ -707,11 +716,12 @@ public boolean checkEnableTwoPhaseRead(Analyzer analyzer) {
// Rethink? implement more generic to support all exprs
LOG.debug("getOrderingExprs {}", sortInfo.getOrderingExprs());
LOG.debug("getOrderByElements {}", getOrderByElements());
for (OrderByElement orderby : getOrderByElements()) {
if (!(orderby.getExpr() instanceof SlotRef)) {
for (Expr sortExpr : sortInfo.getOrderingExprs()) {
if (!(sortExpr instanceof SlotRef)) {
return false;
}
}
isTwoPhaseOptEnabled = true;
return true;
}

Expand Down Expand Up @@ -2376,6 +2386,10 @@ public Map<SlotRef, Expr> getPointQueryEQPredicates() {
return eqPredicates;
}

public boolean isPointQueryShortCircuit() {
return isPointQuery;
}

// Check if it is a point query and set EQUAL predicates
public boolean checkAndSetPointQuery() {
if (isPointQuery) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ public void createPlanFragments(StatementBase statement, Analyzer analyzer, TQue
LOG.debug("this isn't block query");
}
// Check SelectStatement if optimization condition satisfied
if (selectStmt.checkAndSetPointQuery()) {
if (selectStmt.isPointQueryShortCircuit()) {
// Optimize for point query like: SELECT * FROM t1 WHERE pk1 = 1 and pk2 = 2
// such query will use direct RPC to do point query
LOG.debug("it's a point query");
Expand All @@ -276,7 +276,7 @@ public void createPlanFragments(StatementBase statement, Analyzer analyzer, TQue
analyzer.getPrepareStmt().cacheSerializedDescriptorTable(olapScanNode.getDescTable());
analyzer.getPrepareStmt().cacheSerializedOutputExprs(rootFragment.getOutputExprs());
}
} else if (selectStmt.checkEnableTwoPhaseRead(analyzer)) {
} else if (selectStmt.isTwoPhaseReadOptEnabled()) {
// Optimize query like `SELECT ... FROM <tbl> WHERE ... ORDER BY ... LIMIT ...`
injectRowIdColumnSlot();
}
Expand Down
32 changes: 32 additions & 0 deletions regression-test/data/query_p0/sort/sort.out
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,35 @@ true

-- !sql --

-- !sql --
1
2
3
4

-- !sql --
0
0
1
1

-- !sql --
1
2
3
4

-- !sql --
1
2
3
4

-- !sql --
1
2

-- !sql --
1
2

13 changes: 13 additions & 0 deletions regression-test/suites/query_p0/sort/sort.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,17 @@ suite("sort") {
LIMIT 110
OFFSET 130
"""

sql """drop table if exists tbl1"""
sql """create table tbl1 (k1 varchar(100), k2 string) distributed by hash(k1) buckets 1 properties("replication_num" = "1");"""
sql """insert into tbl1 values(1, "alice");"""
sql """insert into tbl1 values(2, "bob");"""
sql """insert into tbl1 values(3, "mark");"""
sql """insert into tbl1 values(4, "thor");"""
qt_sql """select cast(k1 as INT) as id from tbl1 order by id;"""
qt_sql """select cast(k1 as INT) % 2 as id from tbl1 order by id;"""
qt_sql """select cast(k1 as BIGINT) as id from tbl1 order by id;"""
qt_sql """select cast(k1 as STRING) as id from tbl1 order by id;"""
qt_sql """select cast(k1 as INT) as id from tbl1 order by id limit 2"""
qt_sql """select cast(k1 as STRING) as id from tbl1 order by id limit 2"""
}