[SPARK-38978][SQL] DS V2 supports push down OFFSET operator #36295
Conversation
ping @huaxingao cc @cloud-fan
* @group typedrel
* @since 3.4.0
*/
def limitAndOffset(n: Int, m: Int): Dataset[T] = withTypedPlan {
This does not match the SQL API. Shall we just have `def offset(...)`?
#36319 moves the check of Offset from analysis to finish analysis, so we can add `def offset(...)`.
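For reference, a hedged sketch of how the API would read once `offset` lands on `Dataset`, mirroring `limit` (assumes an active `spark` session; this is an illustration, not the PR's final code):

```scala
val df = spark.range(10)            // rows 0..9
df.offset(3).show()                 // skips the first 3 rows, returns 3..9
df.limit(5).offset(3).show()        // takes the first 5 rows, then skips 3 of them -> 3, 4
```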
* @since 3.4.0
*/
@Evolving
public interface SupportsPushDownLimitAndOffset {
I have a different opinion about this API design. For top-N pushdown, the rationale is that we shouldn't push down sort; instead the data source should report its ordering, and pushing top-N is more reasonable. For offset, the current limitation is that it must be used together with limit, but this can change in the future. I don't see why we can't push down offset alone.
@@ -433,13 +433,6 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {

case Offset(offsetExpr, _) => checkLimitLikeClause("offset", offsetExpr)

case o if !o.isInstanceOf[GlobalLimit] && !o.isInstanceOf[LocalLimit]
can we have a separate PR for this?
* @group typedrel
* @since 3.4.0
*/
def offset(n: Int): Dataset[T] = withTypedPlan {
please open a separate PR for it. This is adding a new user-facing API and needs more visibility.
OK
@@ -44,6 +44,7 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper wit
pushDownFilters,
pushDownAggregates,
pushDownLimits,
pushDownOffsets,
shall we push down offset before limit? The natural operator order in SQL is offset then limit.
OK
according to the doc, OFFSET should be pushed after LIMIT now.
/**
* A mix-in interface for {@link ScanBuilder}. Data sources can implement this interface to
* push down OFFSET. Please note that the combination of OFFSET with other operations
* such as AGGREGATE, GROUP BY, SORT BY, CLUSTER BY, DISTRIBUTE BY, etc. is NOT pushed down.
I understand that this is copied from other pushdown interfaces, but I find it really hard to follow. We can push down OFFSET with many other operators if they follow the operator order we defined in `ScanBuilder`'s class doc.
BTW we need to update `ScanBuilder`'s classdoc for new pushdown support.
@@ -23,7 +23,7 @@
* An interface for building the {@link Scan}. Implementations can mixin SupportsPushDownXYZ
* interfaces to do operator push down, and keep the operator push down result in the returned
* {@link Scan}. When pushing down operators, the push down order is:
* sample -> filter -> aggregate -> limit -> column pruning.
* sample -> filter -> aggregate -> offset -> limit or top N -> column pruning.
top n is a bit tricky as it's sort + limit. how about `aggregate -> limit/top-N(sort + limit) -> offset`? the order of limit and offset doesn't matter as we can always switch the order and adjust the value. And this order matches the physical plan more.
OK
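A quick check of that claim on plain Scala collections (not the Spark API): limit and offset commute once the values are adjusted, which is why a fixed pushdown order works.

```scala
val data = Seq(1, 2, 3, 4, 5, 6)
val offsetThenLimit = data.drop(2).take(3)      // OFFSET 2 then LIMIT 3 -> Seq(3, 4, 5)
val limitThenOffset = data.take(3 + 2).drop(2)  // LIMIT (3 + 2) then OFFSET 2 -> Seq(3, 4, 5)
assert(offsetThenLimit == limitThenOffset)
```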
* push down LIMIT. Please note that the combination of LIMIT with other operations
* such as AGGREGATE, GROUP BY, SORT BY, CLUSTER BY, DISTRIBUTE BY, etc. is NOT pushed down.
* push down LIMIT. We can push down LIMIT with many other operations if they follow the
* operator order we defined in {@link ScanBuilder}'s class doc.
let's open a backport PR for 3.3 to fix the classdoc later.
OK
checkLimitRemoved(df1)
checkOffsetRemoved(df1)
checkPushedInfo(df1,
  "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], PushedLimit: LIMIT 1, PushedOffset: OFFSET 1,")
This does not match https://github.com/apache/spark/pull/36295/files#diff-85c754089fc8e0db142a16714e92b127001bab9e6433684d1e3a15af04cb219aR26.

Assume that I have a local array data source. According to the API doc, Spark pushes down LIMIT first. For this query, I'll do `array.take(1).drop(1)`. This is wrong and doesn't match the query `df.limit(2).offset(1)`.

We should either fix the API doc, or fix the pushdown logic.
OK
@@ -23,7 +23,8 @@
* An interface for building the {@link Scan}. Implementations can mixin SupportsPushDownXYZ
* interfaces to do operator push down, and keep the operator push down result in the returned
* {@link Scan}. When pushing down operators, the push down order is:
* sample -> filter -> aggregate -> limit -> column pruning.
* sample -> filter -> aggregate -> offset -> limit/top-n(sort + limit) ->
We can switch the order of LIMIT and OFFSET to match this defined order, but we can't switch the order of ORDER BY and OFFSET. What if the SQL query is `ORDER BY ... LIMIT ... OFFSET ...`? Nothing can be pushed down?
case other => (other, false)
}

def pushDownOffsets(plan: LogicalPlan): LogicalPlan = plan.transform {
I think the code can be much easier to read if we push down limit and offset together:

def pushDownLimitAndOffset(plan: LogicalPlan): LogicalPlan = plan.transform {
  case Limit...
  case Offset...
  case LimitAndOffset...
  ...
}

The code should be similar to the planner rule SpecialLimits.
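A hedged, compilable expansion of that sketch. The extractor names follow the sketch above and the `OffsetAndLimit` pattern used later in this PR; the combined patterns must be matched before the plain ones, and the bodies here just return the nodes unchanged, standing in for the real pushdown logic:

```scala
import org.apache.spark.sql.catalyst.expressions.IntegerLiteral
import org.apache.spark.sql.catalyst.planning.{LimitAndOffset, OffsetAndLimit}
import org.apache.spark.sql.catalyst.plans.logical.{Limit, LogicalPlan, Offset}

def pushDownLimitAndOffset(plan: LogicalPlan): LogicalPlan = plan.transform {
  case lo @ LimitAndOffset(_, _, _) => lo               // LIMIT on top of OFFSET
  case ol @ OffsetAndLimit(_, _, _) => ol               // OFFSET on top of LIMIT
  case limit @ Limit(IntegerLiteral(_), _) => limit     // plain LIMIT
  case offset @ Offset(IntegerLiteral(_), _) => offset  // plain OFFSET
}
```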
pushedDownOperators.limit.map(value => "PushedLimit" -> s"LIMIT $value")
}
if (pushedDownOperators.offset.isDefined) {
  Map("PushedPaging" -> s"$topNStr OFFSET ${pushedDownOperators.offset.get}")
do we need to combine them into one metric? I think we can have `PushedTopN` and `PushedOffset` together in the metrics.
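A minimal sketch of that suggestion, assuming the `pushedDownOperators.offset: Option[Int]` field shown above (the helper name is hypothetical) — report OFFSET as its own entry rather than folding it into one paging string:

```scala
// One metadata entry per pushed operator.
def offsetInfo(pushedOffset: Option[Int]): Map[String, String] =
  pushedOffset.map(value => "PushedOffset" -> s"OFFSET $value").toMap
```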
@@ -44,6 +44,7 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper wit
pushDownFilters,
pushDownAggregates,
pushDownLimits,
why do we still keep it?
LIMIT should be pushed by `pushDownLimitAndOffset` as well.
OK
private def pushDownOffset(
    plan: LogicalPlan,
    offset: Int): (LogicalPlan, Boolean) = plan match {
  case operation @ ScanOperation(_, filter, sHolder: ScanBuilderHolder) if filter.isEmpty =>
    val isPushed = PushDownUtils.pushOffset(sHolder.builder, offset)
    if (isPushed) {
      sHolder.pushedOffset = Some(offset)
    }
    (operation, isPushed)
  case p: Project =>
    val (newChild, isPushed) = pushDownOffset(p.child, offset)
    (p.withNewChildren(Seq(newChild)), isPushed)
  case other => (other, false)
}

def pushDownLimitAndOffset(plan: LogicalPlan): LogicalPlan = plan.transform {
  case offset @ Offset(IntegerLiteral(n), child) =>
    // For `dataset.offset(n)`, try to push down `OFFSET n`.
    val (newChild, isPushed) = pushDownOffset(child, n)
    if (isPushed) {
      newChild
    } else {
      offset
    }
Suggested change:

private def pushDownOffset(
    plan: LogicalPlan,
    offset: Int): Boolean = plan match {
  case sHolder: ScanBuilderHolder =>
    val isPushed = PushDownUtils.pushOffset(sHolder.builder, offset)
    if (isPushed) {
      sHolder.pushedOffset = Some(offset)
    }
    isPushed
  case p: Project =>
    pushDownOffset(p.child, offset)
  case _ => false
}

def pushDownLimitAndOffset(plan: LogicalPlan): LogicalPlan = plan.transform {
  case offset @ Offset(IntegerLiteral(n), child) =>
    // For `dataset.offset(n)`, try to push down `OFFSET n`.
    val isPushed = pushDownOffset(child, n)
    if (isPushed) {
      child
    } else {
      offset
    }
@@ -143,13 +143,15 @@ case class RowDataSourceScanExec(

val topNOrLimitInfo =
seems we don't need to change anything in topNOrLimitInfo
I see.
if (canRemoveLimit) {
  // If we can remove limit, it indicates data source only have one partition.
  // For `dataset.limit(m).offset(n)`, try to push down `LIMIT (m - n) OFFSET n`.
  // For example, `dataset.limit(5).offset(3)`, we can push down `LIMIT 2 OFFSET 3`.
This comment makes no sense here. We are pushing down operators, not pushing down a SQL query to JDBC. It's only a problem in JDBC that `LIMIT x OFFSET y` means doing OFFSET first, then LIMIT.
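The arithmetic behind `LIMIT (m - n) OFFSET n`, checked on a plain sequence: `df.limit(5).offset(3)` keeps rows 4 and 5, which is exactly what JDBC's offset-first `LIMIT 2 OFFSET 3` returns.

```scala
val rows = (1 to 10).toSeq
val sparkSide = rows.take(5).drop(3)  // limit(5) then offset(3) -> Seq(4, 5)
val jdbcSide = rows.drop(3).take(2)   // JDBC `LIMIT 2 OFFSET 3`: skip 3, take 2 -> Seq(4, 5)
assert(sparkSide == jdbcSide)
```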
      offset
    }
  case globalLimit @ OffsetAndLimit(offset, limitValue, child) =>
    val isPushed = pushDownOffset(child, offset)
no, we can't push OFFSET first. We need to follow the API doc and push LIMIT first. Although OFFSET appears first in the query plan, we can still push down LIMIT by updating the limit value to `limit + offset`.
val df5 = spark.read
  .table("h2.test.employee")
  .groupBy("DEPT").sum("SALARY")
can we test the same query where the agg can be fully pushed down? then offset can be pushed as well.
In fact, limit has the same issue. I think we can support them in a single PR.
val df10 = spark.read
  .table("h2.test.employee")
  .groupBy("DEPT").sum("SALARY")
ditto
case globalLimit @ OffsetAndLimit(offset, limit, child) =>
  val (newChild, canRemoveLimit) = pushDownLimit(child, limit + offset)
  if (canRemoveLimit) {
    // If we can remove limit, it indicates data source only have one partition.
ditto:
// Try to push down OFFSET only if the LIMIT operator has been pushed and can be removed.
if (isPushed) {
  newChild
} else {
  // For `dataset.offset(n).limit(m)`, try to push down `limit(m + n)`.
This comment seems to be in the wrong place.
    Offset(Literal(offset), newChild)
  }
} else {
  // For `dataset.offset(n).limit(m)`, try to push down `offset(n)`.
// For `df.offset(n).limit(m)`, since we can't push down `limit(m+n)`, try to push down `offset(n)` here.
  // Spark still do `limit(m)`.
  val isPushed = pushDownOffset(child, offset)
  if (isPushed) {
    globalLimit.withNewChildren(Seq(child))
is this correct? seems we may lose the local limit operator?
Maybe `Limit(limit, child)`?
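A hedged sketch of that reply: the catalyst `Limit(...)` constructor expands to `GlobalLimit(LocalLimit(...))`, so rebuilding it on top of the pushed child keeps the local limit that `globalLimit.withNewChildren(Seq(child))` would drop.

```scala
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.{Limit, LogicalPlan}

// Rebuild both GlobalLimit and LocalLimit over the child whose OFFSET was pushed.
def keepLimit(limit: Int, child: LogicalPlan): LogicalPlan =
  Limit(Literal(limit), child)
```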
//    For example, `dataset.limit(5).offset(3)`, we can push down `LIMIT 2 OFFSET 3`.
// 2. For `dataset.offset(n).limit(m)`, try to push down `LIMIT m OFFSET n`.
//    For example, `dataset.offset(3).limit(5)`, we can push down `LIMIT 5 OFFSET 3`.
// 3. For `dataset.offset(n)`, try to push down `OFFSET n`.
I don't think the 1,2,3 comments make sense here. They're about the internal details of the Spark pushdown rule, and data sources shouldn't care about it.
OK
…rces/v2/V2ScanRelationPushDown.scala Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
…rces/v2/V2ScanRelationPushDown.scala Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
…rces/v2/V2ScanRelationPushDown.scala Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
…rces/v2/V2ScanRelationPushDown.scala Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
thanks, merging to master!
@cloud-fan Thank you for the hard work!
### What changes were proposed in this pull request?
Currently, DS V2 push-down supports `LIMIT` but not `OFFSET`. If we can push down `OFFSET` to the JDBC data source, performance will be better.

### Why are the changes needed?
Pushing down `OFFSET` could improve the performance.

### Does this PR introduce _any_ user-facing change?
'No'. New feature.

### How was this patch tested?
New tests.

Closes apache#36295 from beliefer/SPARK-38978.

Authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
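For data source authors, a minimal sketch of opting in to this pushdown. It assumes the final interface is a standalone `SupportsPushDownOffset` mix-in with a single `pushOffset(int): Boolean` method, as the `PushDownUtils.pushOffset` call above suggests; `MyScanBuilder` and `createScan` are hypothetical.

```scala
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownOffset}

// Hypothetical builder: only the OFFSET handling is the point here.
class MyScanBuilder(createScan: Int => Scan) extends ScanBuilder with SupportsPushDownOffset {
  private var pushedOffset: Int = 0

  // Return true only if the source can actually skip `offset` rows itself;
  // returning false makes Spark keep the Offset operator in its own plan.
  override def pushOffset(offset: Int): Boolean = {
    pushedOffset = offset
    true
  }

  override def build(): Scan = createScan(pushedOffset)
}
```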
…e#491) * [SPARK-28330][SQL] Support ANSI SQL: result offset clause in query expression ### What changes were proposed in this pull request? This is a ANSI SQL and feature id is `F861` ``` <query expression> ::= [ <with clause> ] <query expression body> [ <order by clause> ] [ <result offset clause> ] [ <fetch first clause> ] <result offset clause> ::= OFFSET <offset row count> { ROW | ROWS } ``` For example: ``` SELECT customer_name, customer_gender FROM customer_dimension WHERE occupation='Dancer' AND customer_city = 'San Francisco' ORDER BY customer_name; customer_name | customer_gender ----------------------+----------------- Amy X. Lang | Female Anna H. Li | Female Brian O. Weaver | Male Craig O. Pavlov | Male Doug Z. Goldberg | Male Harold S. Jones | Male Jack E. Perkins | Male Joseph W. Overstreet | Male Kevin . Campbell | Male Raja Y. Wilson | Male Samantha O. Brown | Female Steve H. Gauthier | Male William . Nielson | Male William Z. Roy | Male (14 rows) SELECT customer_name, customer_gender FROM customer_dimension WHERE occupation='Dancer' AND customer_city = 'San Francisco' ORDER BY customer_name OFFSET 8; customer_name | customer_gender -------------------+----------------- Kevin . Campbell | Male Raja Y. Wilson | Male Samantha O. Brown | Female Steve H. Gauthier | Male William . Nielson | Male William Z. Roy | Male (6 rows) ``` There are some mainstream database support the syntax. **Druid** https://druid.apache.org/docs/latest/querying/sql.html#offset **Kylin** http://kylin.apache.org/docs/tutorial/sql_reference.html#QUERYSYNTAX **Exasol** https://docs.exasol.com/sql/select.htm **Greenplum** http://docs.greenplum.org/6-8/ref_guide/sql_commands/SELECT.html **MySQL** https://dev.mysql.com/doc/refman/5.6/en/select.html **Monetdb** https://www.monetdb.org/Documentation/SQLreference/SQLSyntaxOverview#SELECT **PostgreSQL** https://www.postgresql.org/docs/11/queries-limit.html **Sqlite** https://www.sqlite.org/lang_select.html **Vertica** https://www.vertica.com/docs/9.2.x/HTML/Content/Authoring/SQLReferenceManual/Statements/SELECT/OFFSETClause.htm?zoom_highlight=offset The description for design: **1**. Consider `OFFSET` as the special case of `LIMIT`. For example: `SELECT * FROM a limit 10;` similar to `SELECT * FROM a limit 10 offset 0;` `SELECT * FROM a offset 10;` similar to `SELECT * FROM a limit -1 offset 10;` **2**. Because the current implement of `LIMIT` has good performance. For example: `SELECT * FROM a limit 10;` parsed to the logic plan as below: ``` GlobalLimit (limit = 10) |--LocalLimit (limit = 10) ``` and then the physical plan as below: ``` GlobalLimitExec (limit = 10) // Take the first 10 rows globally |--LocalLimitExec (limit = 10) // Take the first 10 rows locally ``` This operator reduce massive shuffle and has good performance. Sometimes, the logic plan transformed to the physical plan as: ``` CollectLimitExec (limit = 10) // Take the first 10 rows globally ``` If the SQL contains order by, such as `SELECT * FROM a order by c limit 10;`. This SQL will be transformed to the physical plan as below: ``` TakeOrderedAndProjectExec (limit = 10) // Take the first 10 rows after sort globally ``` Based on this situation, this PR produces the following operations. 
For example: `SELECT * FROM a limit 10 offset 10;` parsed to the logic plan as below: ``` GlobalLimit (limit = 10) |--LocalLimit (limit = 10) |--Offset (offset = 10) ``` After optimization, the above logic plan will be transformed to: ``` GlobalLimitAndOffset (limit = 10, offset = 10) // Limit clause accompanied by offset clause |--LocalLimit (limit = 20) // 10 + offset = 20 ``` and then the physical plan as below: ``` GlobalLimitAndOffsetExec (limit = 10, offset = 10) // Skip the first 10 rows and take the next 10 rows globally |--LocalLimitExec (limit = 20) // Take the first 20(limit + offset) rows locally ``` Sometimes, the logic plan transformed to the physical plan as: ``` CollectLimitExec (limit = 10, offset = 10) // Skip the first 10 rows and take the next 10 rows globally ``` If the SQL contains order by, such as `SELECT * FROM a order by c limit 10 offset 10;`. This SQL will be transformed to the physical plan as below: ``` TakeOrderedAndProjectExec (limit = 10, offset 10) // Skip the first 10 rows and take the next 10 rows after sort globally ``` **3**.In addition to the above, there is a special case that is only offset but no limit. For example: `SELECT * FROM a offset 10;` parsed to the logic plan as below: ``` Offset (offset = 10) // Only offset clause ``` If offset is very large, will generate a lot of overhead. So this PR will refuse use offset clause without limit clause, although we can parse, transform and execute it. A balanced idea is add a configuration item `spark.sql.forceUsingOffsetWithoutLimit` to force running query when user knows the offset is small enough. The default value of `spark.sql.forceUsingOffsetWithoutLimit` is false. This PR just came up with the idea so that it could be implemented at a better time in the future. Note: The origin PR to support this feature is apache#25416. Because the origin PR too old, there exists massive conflict which is hard to resolve. So I open this new PR to support this feature. ### Why are the changes needed? new feature ### Does this PR introduce any user-facing change? 'No' ### How was this patch tested? Exists and new UT Closes apache#35975 from beliefer/SPARK-28330. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-39057][SQL] Offset could work without Limit ### What changes were proposed in this pull request? Currently, `Offset` must work with `Limit`. The behavior not allow to use offset alone and add offset API into `DataFrame`. If we use `Offset` alone, there are two situations: 1. If `Offset` is the last operator, collect the result to the driver and then drop/skip the first n (offset value) rows. Users can test or debug `Offset` in the way. 2. If `Offset` is the intermediate operator, shuffle all the result to one task and drop/skip the first n (offset value) rows and the result will be passed to the downstream operator. 
For example, `SELECT * FROM a offset 10; ` parsed to the logic plan as below: ``` Offset (offset = 10) // Only offset clause |--Relation ``` and then the physical plan as below: ``` CollectLimitExec(limit = -1, offset = 10) // Collect the result to the driver and skip the first 10 rows |--JDBCRelation ``` or ``` GlobalLimitAndOffsetExec(limit = -1, offset = 10) // Collect the result and skip the first 10 rows |--JDBCRelation ``` After this PR merged, users could input the SQL show below: ``` SELECT '' AS ten, unique1, unique2, stringu1 FROM onek ORDER BY unique1 OFFSET 990; ``` Note: apache#35975 supports offset clause, it create a logical node named `GlobalLimitAndOffset`. In fact, we can avoid use this node and use `Offset` instead and the latter is good with unify name. ### Why are the changes needed? Improve the implement of offset clause. ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? Exists test cases. Closes apache#36417 from beliefer/SPARK-28330_followup2. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-39159][SQL] Add new Dataset API for Offset ### What changes were proposed in this pull request? Currently, Spark added `Offset` operator. This PR try to add `offset` API into `Dataset`. ### Why are the changes needed? `offset` API is very useful and construct test case more easily. ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? New tests. Closes apache#36519 from beliefer/SPARK-39159. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-39180][SQL] Simplify the planning of limit and offset ### What changes were proposed in this pull request? This PR simplifies the planning of limit and offset: 1. Unify the semantics of physical plans that need to deal with limit + offset. These physical plans always do limit first, then offset. The planner rule should set limit and offset properly, for different plans, such as limit + offset and offset + limit. 2. Refactor the planner rule `SpecialLimit` to reuse the code of planning `TakeOrderedAndProjectExec`. 3. Let `GlobalLimitExec` to handle offset as well, so that we can remove `GlobalLimitAndOffsetExec`. This matches `CollectLimitExec`. ### Why are the changes needed? code simplification ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? existing tests Closes apache#36541 from cloud-fan/offset. Lead-authored-by: Wenchen Fan <cloud0fan@gmail.com> Co-authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-39037][SQL] DS V2 aggregate push-down supports order by expressions ### What changes were proposed in this pull request? Currently, Spark DS V2 aggregate push-down only supports order by column. But the SQL show below is very useful and common. ``` SELECT CASE WHEN 'SALARY' > 8000.00 AND 'SALARY' < 10000.00 THEN 'SALARY' ELSE 0.00 END AS key, dept, name FROM "test"."employee" ORDER BY key ``` ### Why are the changes needed? Let DS V2 aggregate push-down supports order by expressions ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? New tests Closes apache#36370 from beliefer/SPARK-39037. 
Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-38978][SQL] DS V2 supports push down OFFSET operator ### What changes were proposed in this pull request? Currently, DS V2 push-down supports `LIMIT` but `OFFSET`. If we can pushing down `OFFSET` to JDBC data source, it will be better performance. ### Why are the changes needed? push down `OFFSET` could improves the performance. ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? New tests. Closes apache#36295 from beliefer/SPARK-38978. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * fix ut * [SPARK-39340][SQL] DS v2 agg pushdown should allow dots in the name of top-level columns ### What changes were proposed in this pull request? It turns out that I was wrong in apache#36727 . We still have the limitation (column name cannot contain dot) in master and 3.3 braches, in a very implicit way: The `V2ExpressionBuilder` has a boolean flag `nestedPredicatePushdownEnabled` whose default value is false. When it's false, it uses `PushableColumnWithoutNestedColumn` to match columns, which doesn't support dot in names. `V2ExpressionBuilder` is only used in 2 places: 1. `PushableExpression`. This is a pattern match that is only used in v2 agg pushdown 2. `PushablePredicate`. This is a pattern match that is used in various places, but all the caller sides set `nestedPredicatePushdownEnabled` to true. This PR removes the `nestedPredicatePushdownEnabled` flag from `V2ExpressionBuilder`, and makes it always support nested fields. `PushablePredicate` is also updated accordingly to remove the boolean flag, as it's always true. ### Why are the changes needed? Fix a mistake to eliminate an unexpected limitation in DS v2 pushdown. ### Does this PR introduce _any_ user-facing change? No for end users. For data source developers, they can trigger agg pushdowm more often. ### How was this patch tested? a new test Closes apache#36945 from cloud-fan/dsv2. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-39453][SQL] DS V2 supports push down misc non-aggregate functions(non ANSI) ### What changes were proposed in this pull request? apache#36039 makes DS V2 supports push down misc non-aggregate functions are claimed by ANSI standard. Spark have a lot common used misc non-aggregate functions are not claimed by ANSI standard. https://github.com/apache/spark/blob/2f8613f22c0750c00cf1dcfb2f31c431d8dc1be7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala#L362. The mainstream databases support these functions show below. 
| Function name | PostgreSQL | ClickHouse | H2 | MySQL | Oracle | Redshift | Presto | Teradata | Snowflake | DB2 | Vertica | Exasol | SqlServer | Yellowbrick | Impala | Mariadb | Druid | Pig | Singlestore | ElasticSearch | SQLite | Influxdata | Sybase | | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | | `GREATEST` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | No | No | No | | `LEAST` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | No | No | No | | `IF` | No | Yes | No | Yes | No | No | Yes | No | Yes | No | No | Yes | No | Yes | Yes | Yes | No | No | Yes | Yes | Yes | No | No | | `RAND` | No | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | No | Yes | ### Why are the changes needed? DS V2 supports push down misc non-aggregate functions supported by mainstream databases. ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? New tests. Closes apache#36830 from beliefer/SPARK-38761_followup. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-39479][SQL] DS V2 supports push down math functions(non ANSI) ### What changes were proposed in this pull request? apache#36140 makes DS V2 supports push down math functions are claimed by ANSI standard. Spark have a lot common used math functions are not claimed by ANSI standard. https://github.com/apache/spark/blob/2f8613f22c0750c00cf1dcfb2f31c431d8dc1be7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala#L388 The mainstream databases support these functions show below. 
| Function name | PostgreSQL | ClickHouse | H2 | MySQL | Oracle | Redshift | Presto | Teradata | Snowflake | DB2 | Vertica | Exasol | SqlServer | Yellowbrick | Impala | Mariadb | Druid | Pig | Singlestore | ElasticSearch | SQLite | Influxdata | Sybase | | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | | `SIN` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `SINH` | Yes | Yes | Yes | No | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | No | No | Yes | No | No | Yes | No | Yes | Yes | Yes | No | | `COS` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `COSH` | Yes | Yes | Yes | No | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | No | No | Yes | No | No | Yes | No | Yes | Yes | Yes | No | | `TAN` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `TANH` | Yes | No | Yes | No | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | No | No | Yes | No | No | Yes | No | No | No | Yes | No | | `COT` | Yes | No | Yes | Yes | No | Yes | No | No | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | No | Yes | Yes | No | No | Yes | | `ASIN` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `ASINH` | Yes | Yes | No | No | No | No | No | Yes | Yes | No | No | No | No | No | No | No | No | No | No | No | Yes | Yes | No | | `ACOS` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `ACOSH` | Yes | Yes | No | No | No | No | No | Yes | Yes | No | No | No | No | No | No | No | No | No | No | No | Yes | Yes | No | | `ATAN` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `ATAN2` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes | | `ATANH` | Yes | Yes | No | No | No | No | No | Yes | Yes | Yes | No | No | No | No | No | No | No | No | No | No | Yes | Yes | No | | `LOG` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `LOG10` | Yes | Yes | Yes | Yes | No | No | Yes | Yes | No | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `LOG2` | No | Yes | No | Yes | No | No | Yes | Yes | No | No | No | Yes | No | No | Yes | Yes | No | No | Yes | No | Yes | Yes | No | | `CBRT` | Yes | Yes | No | No | No | Yes | Yes | No | Yes | No | Yes | No | No | Yes | No | No | No | Yes | No | Yes | No | Yes | No | | `DEGREES` | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | No | Yes | | `RADIANS` | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | No | Yes | | `ROUND` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | | `SIGN` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | No | Yes 
| Yes | No | No | Yes | ### Why are the changes needed? DS V2 supports push down math functions supported by mainstream databases. ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? New tests. Closes apache#36877 from beliefer/SPARK-39479. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> Co-authored-by: Jiaan Geng <beliefer@163.com> Co-authored-by: Wenchen Fan <cloud0fan@gmail.com> Co-authored-by: Wenchen Fan <wenchen@databricks.com>
* [SPARK-28330][SQL] Support ANSI SQL: result offset clause in query expression This is a ANSI SQL and feature id is `F861` ``` <query expression> ::= [ <with clause> ] <query expression body> [ <order by clause> ] [ <result offset clause> ] [ <fetch first clause> ] <result offset clause> ::= OFFSET <offset row count> { ROW | ROWS } ``` For example: ``` SELECT customer_name, customer_gender FROM customer_dimension WHERE occupation='Dancer' AND customer_city = 'San Francisco' ORDER BY customer_name; customer_name | customer_gender ----------------------+----------------- Amy X. Lang | Female Anna H. Li | Female Brian O. Weaver | Male Craig O. Pavlov | Male Doug Z. Goldberg | Male Harold S. Jones | Male Jack E. Perkins | Male Joseph W. Overstreet | Male Kevin . Campbell | Male Raja Y. Wilson | Male Samantha O. Brown | Female Steve H. Gauthier | Male William . Nielson | Male William Z. Roy | Male (14 rows) SELECT customer_name, customer_gender FROM customer_dimension WHERE occupation='Dancer' AND customer_city = 'San Francisco' ORDER BY customer_name OFFSET 8; customer_name | customer_gender -------------------+----------------- Kevin . Campbell | Male Raja Y. Wilson | Male Samantha O. Brown | Female Steve H. Gauthier | Male William . Nielson | Male William Z. Roy | Male (6 rows) ``` There are some mainstream database support the syntax. **Druid** https://druid.apache.org/docs/latest/querying/sql.html#offset **Kylin** http://kylin.apache.org/docs/tutorial/sql_reference.html#QUERYSYNTAX **Exasol** https://docs.exasol.com/sql/select.htm **Greenplum** http://docs.greenplum.org/6-8/ref_guide/sql_commands/SELECT.html **MySQL** https://dev.mysql.com/doc/refman/5.6/en/select.html **Monetdb** https://www.monetdb.org/Documentation/SQLreference/SQLSyntaxOverview#SELECT **PostgreSQL** https://www.postgresql.org/docs/11/queries-limit.html **Sqlite** https://www.sqlite.org/lang_select.html **Vertica** https://www.vertica.com/docs/9.2.x/HTML/Content/Authoring/SQLReferenceManual/Statements/SELECT/OFFSETClause.htm?zoom_highlight=offset The description for design: **1**. Consider `OFFSET` as the special case of `LIMIT`. For example: `SELECT * FROM a limit 10;` similar to `SELECT * FROM a limit 10 offset 0;` `SELECT * FROM a offset 10;` similar to `SELECT * FROM a limit -1 offset 10;` **2**. Because the current implement of `LIMIT` has good performance. For example: `SELECT * FROM a limit 10;` parsed to the logic plan as below: ``` GlobalLimit (limit = 10) |--LocalLimit (limit = 10) ``` and then the physical plan as below: ``` GlobalLimitExec (limit = 10) // Take the first 10 rows globally |--LocalLimitExec (limit = 10) // Take the first 10 rows locally ``` This operator reduce massive shuffle and has good performance. Sometimes, the logic plan transformed to the physical plan as: ``` CollectLimitExec (limit = 10) // Take the first 10 rows globally ``` If the SQL contains order by, such as `SELECT * FROM a order by c limit 10;`. This SQL will be transformed to the physical plan as below: ``` TakeOrderedAndProjectExec (limit = 10) // Take the first 10 rows after sort globally ``` Based on this situation, this PR produces the following operations. 
For example: `SELECT * FROM a limit 10 offset 10;` parsed to the logic plan as below: ``` GlobalLimit (limit = 10) |--LocalLimit (limit = 10) |--Offset (offset = 10) ``` After optimization, the above logic plan will be transformed to: ``` GlobalLimitAndOffset (limit = 10, offset = 10) // Limit clause accompanied by offset clause |--LocalLimit (limit = 20) // 10 + offset = 20 ``` and then the physical plan as below: ``` GlobalLimitAndOffsetExec (limit = 10, offset = 10) // Skip the first 10 rows and take the next 10 rows globally |--LocalLimitExec (limit = 20) // Take the first 20(limit + offset) rows locally ``` Sometimes, the logic plan transformed to the physical plan as: ``` CollectLimitExec (limit = 10, offset = 10) // Skip the first 10 rows and take the next 10 rows globally ``` If the SQL contains order by, such as `SELECT * FROM a order by c limit 10 offset 10;`. This SQL will be transformed to the physical plan as below: ``` TakeOrderedAndProjectExec (limit = 10, offset 10) // Skip the first 10 rows and take the next 10 rows after sort globally ``` **3**.In addition to the above, there is a special case that is only offset but no limit. For example: `SELECT * FROM a offset 10;` parsed to the logic plan as below: ``` Offset (offset = 10) // Only offset clause ``` If offset is very large, will generate a lot of overhead. So this PR will refuse use offset clause without limit clause, although we can parse, transform and execute it. A balanced idea is add a configuration item `spark.sql.forceUsingOffsetWithoutLimit` to force running query when user knows the offset is small enough. The default value of `spark.sql.forceUsingOffsetWithoutLimit` is false. This PR just came up with the idea so that it could be implemented at a better time in the future. Note: The origin PR to support this feature is apache#25416. Because the origin PR too old, there exists massive conflict which is hard to resolve. So I open this new PR to support this feature. new feature 'No' Exists and new UT Closes apache#35975 from beliefer/SPARK-28330. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-39057][SQL] Offset could work without Limit Currently, `Offset` must work with `Limit`. The behavior not allow to use offset alone and add offset API into `DataFrame`. If we use `Offset` alone, there are two situations: 1. If `Offset` is the last operator, collect the result to the driver and then drop/skip the first n (offset value) rows. Users can test or debug `Offset` in the way. 2. If `Offset` is the intermediate operator, shuffle all the result to one task and drop/skip the first n (offset value) rows and the result will be passed to the downstream operator. For example, `SELECT * FROM a offset 10; ` parsed to the logic plan as below: ``` Offset (offset = 10) // Only offset clause |--Relation ``` and then the physical plan as below: ``` CollectLimitExec(limit = -1, offset = 10) // Collect the result to the driver and skip the first 10 rows |--JDBCRelation ``` or ``` GlobalLimitAndOffsetExec(limit = -1, offset = 10) // Collect the result and skip the first 10 rows |--JDBCRelation ``` After this PR merged, users could input the SQL show below: ``` SELECT '' AS ten, unique1, unique2, stringu1 FROM onek ORDER BY unique1 OFFSET 990; ``` Note: apache#35975 supports offset clause, it create a logical node named `GlobalLimitAndOffset`. In fact, we can avoid use this node and use `Offset` instead and the latter is good with unify name. 
Improve the implement of offset clause. 'No'. New feature. Exists test cases. Closes apache#36417 from beliefer/SPARK-28330_followup2. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-39159][SQL] Add new Dataset API for Offset Currently, Spark added `Offset` operator. This PR try to add `offset` API into `Dataset`. `offset` API is very useful and construct test case more easily. 'No'. New feature. New tests. Closes apache#36519 from beliefer/SPARK-39159. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-39180][SQL] Simplify the planning of limit and offset This PR simplifies the planning of limit and offset: 1. Unify the semantics of physical plans that need to deal with limit + offset. These physical plans always do limit first, then offset. The planner rule should set limit and offset properly, for different plans, such as limit + offset and offset + limit. 2. Refactor the planner rule `SpecialLimit` to reuse the code of planning `TakeOrderedAndProjectExec`. 3. Let `GlobalLimitExec` to handle offset as well, so that we can remove `GlobalLimitAndOffsetExec`. This matches `CollectLimitExec`. code simplification no existing tests Closes apache#36541 from cloud-fan/offset. Lead-authored-by: Wenchen Fan <cloud0fan@gmail.com> Co-authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-39037][SQL] DS V2 aggregate push-down supports order by expressions Currently, Spark DS V2 aggregate push-down only supports order by column. But the SQL show below is very useful and common. ``` SELECT CASE WHEN 'SALARY' > 8000.00 AND 'SALARY' < 10000.00 THEN 'SALARY' ELSE 0.00 END AS key, dept, name FROM "test"."employee" ORDER BY key ``` Let DS V2 aggregate push-down supports order by expressions 'No'. New feature. New tests Closes apache#36370 from beliefer/SPARK-39037. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-38978][SQL] DS V2 supports push down OFFSET operator Currently, DS V2 push-down supports `LIMIT` but `OFFSET`. If we can pushing down `OFFSET` to JDBC data source, it will be better performance. push down `OFFSET` could improves the performance. 'No'. New feature. New tests. Closes apache#36295 from beliefer/SPARK-38978. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * fix ut * [SPARK-39340][SQL] DS v2 agg pushdown should allow dots in the name of top-level columns It turns out that I was wrong in apache#36727 . We still have the limitation (column name cannot contain dot) in master and 3.3 braches, in a very implicit way: The `V2ExpressionBuilder` has a boolean flag `nestedPredicatePushdownEnabled` whose default value is false. When it's false, it uses `PushableColumnWithoutNestedColumn` to match columns, which doesn't support dot in names. `V2ExpressionBuilder` is only used in 2 places: 1. `PushableExpression`. This is a pattern match that is only used in v2 agg pushdown 2. `PushablePredicate`. This is a pattern match that is used in various places, but all the caller sides set `nestedPredicatePushdownEnabled` to true. This PR removes the `nestedPredicatePushdownEnabled` flag from `V2ExpressionBuilder`, and makes it always support nested fields. `PushablePredicate` is also updated accordingly to remove the boolean flag, as it's always true. Fix a mistake to eliminate an unexpected limitation in DS v2 pushdown. No for end users. 
* [SPARK-39180][SQL] Simplify the planning of limit and offset

This PR simplifies the planning of limit and offset:
1. Unify the semantics of physical plans that need to deal with limit + offset. These physical plans always do limit first, then offset. The planner rule should set limit and offset properly for the different plans, such as limit + offset and offset + limit.
2. Refactor the planner rule `SpecialLimits` to reuse the code for planning `TakeOrderedAndProjectExec`.
3. Let `GlobalLimitExec` handle offset as well, so that we can remove `GlobalLimitAndOffsetExec`. This matches `CollectLimitExec`.

code simplification

no

existing tests

Closes apache#36541 from cloud-fan/offset.

Lead-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Co-authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

* [SPARK-39037][SQL] DS V2 aggregate push-down supports order by expressions

Currently, Spark DS V2 aggregate push-down only supports ORDER BY on columns, but SQL like the statement below is very useful and common:
```
SELECT CASE WHEN 'SALARY' > 8000.00 AND 'SALARY' < 10000.00 THEN 'SALARY' ELSE 0.00 END AS key,
       dept, name
FROM "test"."employee"
ORDER BY key
```
Let DS V2 aggregate push-down support ORDER BY expressions.

'No'.

New feature.

New tests.

Closes apache#36370 from beliefer/SPARK-39037.

Authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

* [SPARK-38978][SQL] DS V2 supports push down OFFSET operator

Currently, DS V2 push-down supports `LIMIT` but not `OFFSET`. If we can push down `OFFSET` to the JDBC data source, performance will be better.

Pushing down `OFFSET` improves the performance.

'No'.

New feature.

New tests.

Closes apache#36295 from beliefer/SPARK-38978.

Authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

* fix ut

* [SPARK-39340][SQL] DS v2 agg pushdown should allow dots in the name of top-level columns

It turns out that I was wrong in apache#36727. We still have the limitation (column names cannot contain dots) in the master and 3.3 branches, in a very implicit way: `V2ExpressionBuilder` has a boolean flag `nestedPredicatePushdownEnabled` whose default value is false. When it is false, it uses `PushableColumnWithoutNestedColumn` to match columns, which doesn't support dots in names. `V2ExpressionBuilder` is only used in two places:
1. `PushableExpression`: a pattern match that is only used in v2 aggregate pushdown.
2. `PushablePredicate`: a pattern match that is used in various places, but all caller sides set `nestedPredicatePushdownEnabled` to true.

This PR removes the `nestedPredicatePushdownEnabled` flag from `V2ExpressionBuilder` and makes it always support nested fields. `PushablePredicate` is updated accordingly to remove the boolean flag, as it is always true.

Fix a mistake to eliminate an unexpected limitation in DS v2 pushdown.

No for end users. For data source developers, they can trigger aggregate pushdown more often.

a new test

Closes apache#36945 from cloud-fan/dsv2.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

* [SPARK-39453][SQL] DS V2 supports push down misc non-aggregate functions (non-ANSI)

apache#36039 makes DS V2 support pushing down the misc non-aggregate functions that are claimed by the ANSI standard. Spark has a lot of commonly used misc non-aggregate functions that are not claimed by the ANSI standard: https://github.com/apache/spark/blob/2f8613f22c0750c00cf1dcfb2f31c431d8dc1be7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala#L362.

The mainstream databases' support for these functions is shown below.

| Function name | PostgreSQL | ClickHouse | H2 | MySQL | Oracle | Redshift | Presto | Teradata | Snowflake | DB2 | Vertica | Exasol | SqlServer | Yellowbrick | Impala | Mariadb | Druid | Pig | Singlestore | ElasticSearch | SQLite | Influxdata | Sybase |
| ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- |
| `GREATEST` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | No | No | No |
| `LEAST` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | No | No | No |
| `IF` | No | Yes | No | Yes | No | No | Yes | No | Yes | No | No | Yes | No | Yes | Yes | Yes | No | No | Yes | Yes | Yes | No | No |
| `RAND` | No | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | No | Yes |

Let DS V2 push down the misc non-aggregate functions supported by mainstream databases.

'No'.

New feature.

New tests.

Closes apache#36830 from beliefer/SPARK-38761_followup.

Authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

* [SPARK-39479][SQL] DS V2 supports push down math functions (non-ANSI)

apache#36140 makes DS V2 support pushing down the math functions that are claimed by the ANSI standard. Spark has a lot of commonly used math functions that are not claimed by the ANSI standard: https://github.com/apache/spark/blob/2f8613f22c0750c00cf1dcfb2f31c431d8dc1be7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala#L388

The mainstream databases' support for these functions is shown below.
| Function name | PostgreSQL | ClickHouse | H2 | MySQL | Oracle | Redshift | Presto | Teradata | Snowflake | DB2 | Vertica | Exasol | SqlServer | Yellowbrick | Impala | Mariadb | Druid | Pig | Singlestore | ElasticSearch | SQLite | Influxdata | Sybase |
| ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- |
| `SIN` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes |
| `SINH` | Yes | Yes | Yes | No | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | No | No | Yes | No | No | Yes | No | Yes | Yes | Yes | No |
| `COS` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes |
| `COSH` | Yes | Yes | Yes | No | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | No | No | Yes | No | No | Yes | No | Yes | Yes | Yes | No |
| `TAN` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes |
| `TANH` | Yes | No | Yes | No | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | No | No | Yes | No | No | Yes | No | No | No | Yes | No |
| `COT` | Yes | No | Yes | Yes | No | Yes | No | No | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | No | Yes | Yes | No | No | Yes |
| `ASIN` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes |
| `ASINH` | Yes | Yes | No | No | No | No | No | Yes | Yes | No | No | No | No | No | No | No | No | No | No | No | Yes | Yes | No |
| `ACOS` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes |
| `ACOSH` | Yes | Yes | No | No | No | No | No | Yes | Yes | No | No | No | No | No | No | No | No | No | No | No | Yes | Yes | No |
| `ATAN` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes |
| `ATAN2` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes |
| `ATANH` | Yes | Yes | No | No | No | No | No | Yes | Yes | Yes | No | No | No | No | No | No | No | No | No | No | Yes | Yes | No |
| `LOG` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes |
| `LOG10` | Yes | Yes | Yes | Yes | No | No | Yes | Yes | No | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes |
| `LOG2` | No | Yes | No | Yes | No | No | Yes | Yes | No | No | No | Yes | No | No | Yes | Yes | No | No | Yes | No | Yes | Yes | No |
| `CBRT` | Yes | Yes | No | No | No | Yes | Yes | No | Yes | No | Yes | No | No | Yes | No | No | No | Yes | No | Yes | No | Yes | No |
| `DEGREES` | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | No | Yes |
| `RADIANS` | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | No | Yes |
| `ROUND` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes |
| `SIGN` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | No | Yes | Yes | No | No | Yes |

Let DS V2 push down the math functions supported by mainstream databases.

'No'.

New feature.

New tests.

Closes apache#36877 from beliefer/SPARK-39479.

Authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Co-authored-by: Jiaan Geng <beliefer@163.com>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Co-authored-by: Wenchen Fan <wenchen@databricks.com>
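A hedged sketch of what these function pushdowns look like from the user side (the `h2` catalog name and the `test.employee` table with `name`, `dept`, and `bonus` columns are assumptions; the exact explain text may differ by version and dialect):

```scala
// With a JDBC v2 catalog configured, a predicate built from GREATEST or SIN may be
// compiled into the remote SQL instead of being evaluated by Spark after a full scan.
val q = spark.sql(
  """SELECT name FROM h2.test.employee
    |WHERE GREATEST(bonus, 1000.0) > 1200.0 AND SIN(dept) < 1.0""".stripMargin)

// If the dialect supports these functions, the scan node's pushed-filter info should
// include the compiled predicate; otherwise Spark evaluates the filter itself.
q.explain()
```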
What changes were proposed in this pull request?
Currently, DS V2 push-down supports `LIMIT` but not `OFFSET`. If we can push down `OFFSET` to the JDBC data source, performance will be better.

Why are the changes needed?
Pushing down `OFFSET` improves the performance.

Does this PR introduce any user-facing change?
'No'.
New feature.
How was this patch tested?
New tests.
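A minimal sketch of the behavior this PR enables (the `h2` catalog and `test.employee` table are assumptions borrowed from the kind of JDBC v2 test setup used elsewhere in this PR; the exact scan-description strings may differ):

```scala
// Both LIMIT and OFFSET are candidates for pushdown to the JDBC source.
val df = spark.sql("SELECT name FROM h2.test.employee LIMIT 5 OFFSET 3")

// With this change, the v2 scan description is expected to report the pushed
// operators (e.g. a pushed limit and a pushed offset entry) instead of Spark
// fetching all rows and applying OFFSET itself.
df.explain()
```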