Skip to content

Commit

Permalink
[Bug](sort) disable 2phase read for sort by expressions other than slotref
Browse files Browse the repository at this point in the history
```
create table tbl1 (k1 varchar(100), k2 string) distributed by hash(k1) buckets 1 properties("replication_num" = "1");

insert into tbl1 values(1, "alice");

select cast(k1 as INT) as id from tbl1 order by id limit 2;
```

The above query could pass `checkEnableTwoPhaseRead` because the order by element is a SlotRef, but it actually refers to a function call expr
  • Loading branch information
eldenmoon committed Feb 7, 2023
1 parent bed1ab7 commit 5095013
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 4 deletions.
18 changes: 16 additions & 2 deletions fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ public class SelectStmt extends QueryStmt {
// For quick get condition for point query
private Map<SlotRef, Expr> eqPredicates;

boolean isTwoPhaseOptEnabled = false;

public SelectStmt(ValueList valueList, ArrayList<OrderByElement> orderByElement, LimitElement limitElement) {
super(orderByElement, limitElement);
this.valueList = valueList;
Expand Down Expand Up @@ -603,6 +605,7 @@ public void analyze(Analyzer analyzer) throws UserException {
// rest of resultExprs will be marked as `INVALID`, such columns will
// be prevent from reading from ScanNode.Those columns will be finally
// read by the second fetch phase
isTwoPhaseOptEnabled = true;
LOG.debug("two phase read optimize enabled");
// Expr.analyze(resultExprs, analyzer);
Set<SlotRef> resultSlots = Sets.newHashSet();
Expand All @@ -620,10 +623,12 @@ public void analyze(Analyzer analyzer) throws UserException {
// invalid slots will be pruned from reading from ScanNode
slot.setInvalid();
}

LOG.debug("resultsSlots {}", resultSlots);
LOG.debug("orderingSlots {}", orderingSlots);
LOG.debug("conjuntSlots {}", conjuntSlots);
}
checkAndSetPointQuery();
if (evaluateOrderBy) {
createSortTupleInfo(analyzer);
}
Expand All @@ -648,6 +653,10 @@ public void analyze(Analyzer analyzer) throws UserException {
}
}

// Returns the isTwoPhaseOptEnabled flag. The flag is false by default and is
// set to true by this class once the two phase read optimize is enabled.
// This getter only reads the flag and does not re-run any check.
public boolean isTwoPhaseReadOptEnabled() {
return isTwoPhaseOptEnabled;
}

// Check whether to enable the two phase read optimization; if enabled, the query will be divided into two read phases:
// 1. read conjuncts columns and order by columns along with an extra RowId column from ScanNode
// 2. sort and filter data, and get final RowId column, spawn RPC to other BE to fetch final data
Expand Down Expand Up @@ -707,11 +716,12 @@ public boolean checkEnableTwoPhaseRead(Analyzer analyzer) {
// Rethink? implement more generic to support all exprs
LOG.debug("getOrderingExprs {}", sortInfo.getOrderingExprs());
LOG.debug("getOrderByElements {}", getOrderByElements());
for (OrderByElement orderby : getOrderByElements()) {
if (!(orderby.getExpr() instanceof SlotRef)) {
for (Expr sortExpr : sortInfo.getOrderingExprs()) {
if (!(sortExpr instanceof SlotRef)) {
return false;
}
}
isTwoPhaseOptEnabled = true;
return true;
}

Expand Down Expand Up @@ -2376,6 +2386,10 @@ public Map<SlotRef, Expr> getPointQueryEQPredicates() {
return eqPredicates;
}

// Returns the current value of the isPointQuery flag. This getter only reads
// the flag; it does not perform the point query check itself
// (see checkAndSetPointQuery for the check).
public boolean isPointQueryShortCircuit() {
return isPointQuery;
}

// Check if it is a point query and set EQUAL predicates
public boolean checkAndSetPointQuery() {
if (isPointQuery) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ public void createPlanFragments(StatementBase statement, Analyzer analyzer, TQue
LOG.debug("this isn't block query");
}
// Check SelectStatement if optimization condition satisfied
if (selectStmt.checkAndSetPointQuery()) {
if (selectStmt.isPointQueryShortCircuit()) {
// Optimize for point query like: SELECT * FROM t1 WHERE pk1 = 1 and pk2 = 2
// such query will use direct RPC to do point query
LOG.debug("it's a point query");
Expand All @@ -276,7 +276,7 @@ public void createPlanFragments(StatementBase statement, Analyzer analyzer, TQue
analyzer.getPrepareStmt().cacheSerializedDescriptorTable(olapScanNode.getDescTable());
analyzer.getPrepareStmt().cacheSerializedOutputExprs(rootFragment.getOutputExprs());
}
} else if (selectStmt.checkEnableTwoPhaseRead(analyzer)) {
} else if (selectStmt.isTwoPhaseReadOptEnabled()) {
// Optimize query like `SELECT ... FROM <tbl> WHERE ... ORDER BY ... LIMIT ...`
injectRowIdColumnSlot();
}
Expand Down
32 changes: 32 additions & 0 deletions regression-test/data/query_p0/sort/sort.out
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,35 @@ true

-- !sql --

-- !sql --
1
2
3
4

-- !sql --
0
0
1
1

-- !sql --
1
2
3
4

-- !sql --
1
2
3
4

-- !sql --
1
2

-- !sql --
1
2

13 changes: 13 additions & 0 deletions regression-test/suites/query_p0/sort/sort.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,17 @@ suite("sort") {
LIMIT 110
OFFSET 130
"""

sql """drop table if exists tbl1"""
sql """create table tbl1 (k1 varchar(100), k2 string) distributed by hash(k1) buckets 1 properties("replication_num" = "1");"""
sql """insert into tbl1 values(1, "alice");"""
sql """insert into tbl1 values(2, "bob");"""
sql """insert into tbl1 values(3, "mark");"""
sql """insert into tbl1 values(4, "thor");"""
qt_sql """select cast(k1 as INT) as id from tbl1 order by id;"""
qt_sql """select cast(k1 as INT) % 2 as id from tbl1 order by id;"""
qt_sql """select cast(k1 as BIGINT) as id from tbl1 order by id;"""
qt_sql """select cast(k1 as STRING) as id from tbl1 order by id;"""
qt_sql """select cast(k1 as INT) as id from tbl1 order by id limit 2"""
qt_sql """select cast(k1 as STRING) as id from tbl1 order by id limit 2"""
}

0 comments on commit 5095013

Please sign in to comment.