-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-39057][SQL] Offset could work without Limit #36417
Conversation
ping @dtenedor cc @cloud-fan |
*/ | ||
object RewriteOffsets extends Rule[LogicalPlan] { | ||
def apply(plan: LogicalPlan): LogicalPlan = plan transform { | ||
case GlobalLimit(le, Offset(oe, grandChild)) => | ||
GlobalLimitAndOffset(le, oe, grandChild) | ||
case localLimit @ LocalLimit(le, Offset(oe, grandChild)) => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we move this logic to LimitPushDown
? then we don't need this rule.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK
@@ -92,15 +92,17 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { | |||
CollectLimitExec(limit, planLater(child)) :: Nil | |||
case LimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset), | |||
Sort(order, true, child)) if limit + offset < conf.topKSortFallbackThreshold => | |||
TakeOrderedAndProjectExec( | |||
limit, order, child.output, planLater(child), Some(offset)) :: Nil | |||
TakeOrderedAndProjectExec( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the new indentation is incorrect.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm dazzled
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
Show resolved
Hide resolved
val locallyLimited = | ||
offsetOpt.map(offset => childRDD.mapPartitionsInternal(_.take(limit + offset))) | ||
.getOrElse(childRDD.mapPartitionsInternal(_.take(limit))) | ||
val locallyLimited = if (offset > 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We also need to take care of the case if limit = -1
limit: Int, | ||
offset: Int, | ||
limit: Int = -1, | ||
offset: Int = 0, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need a default value for offset
. If there is no offset, we should use GlobalLimitExec
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and we can put a assert(offset > 0)
@@ -250,17 +295,19 @@ case class TakeOrderedAndProjectExec( | |||
limit: Int, | |||
sortOrder: Seq[SortOrder], | |||
projectList: Seq[NamedExpression], | |||
child: SparkPlan, offsetOpt: Option[Int] = None) extends UnaryExecNode { | |||
child: SparkPlan, offset: Int = 0) extends UnaryExecNode { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
style nit: put the parameter in a new line
OFFSET 1) | ||
GROUP BY t1b | ||
ORDER BY t1b NULLS last | ||
OFFSET 1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we have tests for non-subquery offset-only queries?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Uncommented in postgreSQL/limit.sql
@@ -750,6 +749,11 @@ object LimitPushDown extends Rule[LogicalPlan] { | |||
Limit(le, Project(a.aggregateExpressions, LocalLimit(le, a.child))) | |||
case Limit(le @ IntegerLiteral(1), p @ Project(_, a: Aggregate)) if a.groupOnly => | |||
Limit(le, p.copy(child = Project(a.aggregateExpressions, LocalLimit(le, a.child)))) | |||
// Merge offset value and limit value into LocalLimit and pushes down LocalLimit through Offset. | |||
case LocalLimit(le, Offset(oe @ IntegerLiteral(offset), grandChild)) if offset > 0 => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the pushdown is correct even if offset == 0
. We don't need if offset > 0
case LocalLimit(le, Offset(oe @ IntegerLiteral(offset), grandChild)) if offset > 0 => | ||
Offset(oe, LocalLimit(Add(le, oe), grandChild)) | ||
// Eliminate Offset if offset value equals 0. | ||
case Offset(IntegerLiteral(0), child) => child |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can add a new rule EliminateOffsets
for this. And there is one more idea: if child.maxRows < offset
, then we can simply turn the query into a empty local relation.
child | ||
case Offset(oe, child) | ||
if oe.foldable && child.maxRows.exists(_ <= oe.eval().asInstanceOf[Int]) => | ||
LocalRelation(child.output, data = Seq.empty, isStreaming = plan.isStreaming) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LocalRelation(child.output, data = Seq.empty, isStreaming = plan.isStreaming) | |
LocalRelation(child.output, data = Seq.empty, isStreaming = child.isStreaming) |
override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter => | ||
iter.take(limit + offset).drop(offset) | ||
override def doExecute(): RDD[InternalRow] = if (limit >= 0) { | ||
child.execute().mapPartitions(iter => iter.take(limit + offset).drop(offset)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not related to this PR, but since we are touching here, let's use mapPartitionsInternal
which is more efficient.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK
.map(offset => child.execute().map(_.copy()).takeOrdered(limit + offset)(ord).drop(offset)) | ||
.getOrElse(child.execute().map(_.copy()).takeOrdered(limit)(ord)) | ||
val data = if (offset > 0) { | ||
child.execute().map(_.copy()).takeOrdered(limit + offset)(ord).drop(offset) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto, use mapPartitionsInternal
Utils.takeOrdered(iter.map(_.copy()), limit)(ord) | ||
}) | ||
val localTopK = if (offset > 0) { | ||
childRDD.mapPartitions { iter => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
thanks, merging to master! |
@cloud-fan Thank you very much ! |
### What changes were proposed in this pull request? Currently, `Offset` must work with `Limit`. The behavior not allow to use offset alone and add offset API into `DataFrame`. If we use `Offset` alone, there are two situations: 1. If `Offset` is the last operator, collect the result to the driver and then drop/skip the first n (offset value) rows. Users can test or debug `Offset` in the way. 2. If `Offset` is the intermediate operator, shuffle all the result to one task and drop/skip the first n (offset value) rows and the result will be passed to the downstream operator. For example, `SELECT * FROM a offset 10; ` parsed to the logic plan as below: ``` Offset (offset = 10) // Only offset clause |--Relation ``` and then the physical plan as below: ``` CollectLimitExec(limit = -1, offset = 10) // Collect the result to the driver and skip the first 10 rows |--JDBCRelation ``` or ``` GlobalLimitAndOffsetExec(limit = -1, offset = 10) // Collect the result and skip the first 10 rows |--JDBCRelation ``` After this PR merged, users could input the SQL show below: ``` SELECT '' AS ten, unique1, unique2, stringu1 FROM onek ORDER BY unique1 OFFSET 990; ``` Note: apache#35975 supports offset clause, it create a logical node named `GlobalLimitAndOffset`. In fact, we can avoid use this node and use `Offset` instead and the latter is good with unify name. ### Why are the changes needed? Improve the implement of offset clause. ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? Exists test cases. Closes apache#36417 from beliefer/SPARK-28330_followup2. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request? Currently, `Offset` must work with `Limit`. The behavior not allow to use offset alone and add offset API into `DataFrame`. If we use `Offset` alone, there are two situations: 1. If `Offset` is the last operator, collect the result to the driver and then drop/skip the first n (offset value) rows. Users can test or debug `Offset` in the way. 2. If `Offset` is the intermediate operator, shuffle all the result to one task and drop/skip the first n (offset value) rows and the result will be passed to the downstream operator. For example, `SELECT * FROM a offset 10; ` parsed to the logic plan as below: ``` Offset (offset = 10) // Only offset clause |--Relation ``` and then the physical plan as below: ``` CollectLimitExec(limit = -1, offset = 10) // Collect the result to the driver and skip the first 10 rows |--JDBCRelation ``` or ``` GlobalLimitAndOffsetExec(limit = -1, offset = 10) // Collect the result and skip the first 10 rows |--JDBCRelation ``` After this PR merged, users could input the SQL show below: ``` SELECT '' AS ten, unique1, unique2, stringu1 FROM onek ORDER BY unique1 OFFSET 990; ``` Note: apache#35975 supports offset clause, it create a logical node named `GlobalLimitAndOffset`. In fact, we can avoid use this node and use `Offset` instead and the latter is good with unify name. ### Why are the changes needed? Improve the implement of offset clause. ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? Exists test cases. Closes apache#36417 from beliefer/SPARK-28330_followup2. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
* [SPARK-38997][SQL] DS V2 aggregate push-down supports group by expressions ### What changes were proposed in this pull request? Currently, Spark DS V2 aggregate push-down only supports group by column. But the SQL show below is very useful and common. ``` SELECT CASE WHEN 'SALARY' > 8000.00 AND 'SALARY' < 10000.00 THEN 'SALARY' ELSE 0.00 END AS key, SUM('SALARY') FROM "test"."employee" GROUP BY key ``` ### Why are the changes needed? Let DS V2 aggregate push-down supports group by expressions ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? New tests Closes apache#36325 from beliefer/SPARK-38997. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> fix ut * [SPARK-39135][SQL] DS V2 aggregate partial push-down should supports group by without aggregate functions ### What changes were proposed in this pull request? Currently, the SQL show below not supported by DS V2 aggregate partial push-down. `select key from tab group by key` ### Why are the changes needed? Make DS V2 aggregate partial push-down supports group by without aggregate functions. ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? New tests Closes apache#36492 from beliefer/SPARK-39135. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-39157][SQL] H2Dialect should override getJDBCType so as make the data type is correct ### What changes were proposed in this pull request? Currently, `H2Dialect` not implement `getJDBCType` of `JdbcDialect`, so the DS V2 push-down will throw exception show below: ``` Job aborted due to stage failure: Task 0 in stage 13.0 failed 1 times, most recent failure: Lost task 0.0 in stage 13.0 (TID 13) (jiaan-gengdembp executor driver): org.h2.jdbc.JdbcSQLNonTransientException: Unknown data type: "STRING"; SQL statement: SELECT "DEPT","NAME","SALARY","BONUS","IS_MANAGER" FROM "test"."employee" WHERE ("BONUS" IS NOT NULL) AND ("DEPT" IS NOT NULL) AND (CAST("BONUS" AS string) LIKE '%30%') AND (CAST("DEPT" AS byte) > 1) AND (CAST("DEPT" AS short) > 1) AND (CAST("BONUS" AS decimal(20,2)) > 1200.00) [50004-210] ``` H2Dialect should implement `getJDBCType` of `JdbcDialect`. ### Why are the changes needed? make the H2 data type is correct. ### Does this PR introduce _any_ user-facing change? 'Yes'. Fix a bug for `H2Dialect`. ### How was this patch tested? New tests. Closes apache#36516 from beliefer/SPARK-39157. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-39162][SQL] Jdbc dialect should decide which function could be pushed down ### What changes were proposed in this pull request? Regardless of whether the functions are ANSI or not, most databases are actually unsure of their support. So we should add a new API into `JdbcDialect` so that jdbc dialect decide which function could be pushed down. ### Why are the changes needed? Let function push-down more flexible. ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? Exists tests. Closes apache#36521 from beliefer/SPARK-39162. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: huaxingao <huaxin_gao@apple.com> * [SPARK-38897][SQL] DS V2 supports push down string functions ### What changes were proposed in this pull request? Currently, Spark have some string functions of ANSI standard. Please refer https://github.com/apache/spark/blob/2f8613f22c0750c00cf1dcfb2f31c431d8dc1be7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala#L503 These functions show below: `SUBSTRING,` `UPPER`, `LOWER`, `TRANSLATE`, `TRIM`, `OVERLAY` The mainstream databases support these functions show below. Function | PostgreSQL | ClickHouse | H2 | MySQL | Oracle | Redshift | Presto | Teradata | Snowflake | DB2 | Vertica | Exasol | SqlServer | Yellowbrick | Impala | Mariadb | Druid | Pig | SQLite | Influxdata | Singlestore | ElasticSearch -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- `SUBSTRING` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes `UPPER` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes `LOWER` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | YES | Yes | Yes | Yes | Yes `TRIM` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes `TRANSLATE` | Yes | No | Yes | No | Yes | Yes | No | No | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | No | No | No | No | No | No `OVERLAY` | Yes | No | No | No | Yes | No | No | No | No | Yes | Yes | No | No | No | No | No | No | No | No | No | No | No DS V2 should supports push down these string functions. ### Why are the changes needed? DS V2 supports push down string functions ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? New tests. Closes apache#36330 from chenzhx/spark-master. Authored-by: chenzhx <chen@apache.org> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-28330][SQL] Support ANSI SQL: result offset clause in query expression ### What changes were proposed in this pull request? This is a ANSI SQL and feature id is `F861` ``` <query expression> ::= [ <with clause> ] <query expression body> [ <order by clause> ] [ <result offset clause> ] [ <fetch first clause> ] <result offset clause> ::= OFFSET <offset row count> { ROW | ROWS } ``` For example: ``` SELECT customer_name, customer_gender FROM customer_dimension WHERE occupation='Dancer' AND customer_city = 'San Francisco' ORDER BY customer_name; customer_name | customer_gender ----------------------+----------------- Amy X. Lang | Female Anna H. Li | Female Brian O. Weaver | Male Craig O. Pavlov | Male Doug Z. Goldberg | Male Harold S. Jones | Male Jack E. Perkins | Male Joseph W. Overstreet | Male Kevin . Campbell | Male Raja Y. Wilson | Male Samantha O. Brown | Female Steve H. Gauthier | Male William . Nielson | Male William Z. Roy | Male (14 rows) SELECT customer_name, customer_gender FROM customer_dimension WHERE occupation='Dancer' AND customer_city = 'San Francisco' ORDER BY customer_name OFFSET 8; customer_name | customer_gender -------------------+----------------- Kevin . Campbell | Male Raja Y. Wilson | Male Samantha O. Brown | Female Steve H. Gauthier | Male William . Nielson | Male William Z. Roy | Male (6 rows) ``` There are some mainstream database support the syntax. **Druid** https://druid.apache.org/docs/latest/querying/sql.html#offset **Kylin** http://kylin.apache.org/docs/tutorial/sql_reference.html#QUERYSYNTAX **Exasol** https://docs.exasol.com/sql/select.htm **Greenplum** http://docs.greenplum.org/6-8/ref_guide/sql_commands/SELECT.html **MySQL** https://dev.mysql.com/doc/refman/5.6/en/select.html **Monetdb** https://www.monetdb.org/Documentation/SQLreference/SQLSyntaxOverview#SELECT **PostgreSQL** https://www.postgresql.org/docs/11/queries-limit.html **Sqlite** https://www.sqlite.org/lang_select.html **Vertica** https://www.vertica.com/docs/9.2.x/HTML/Content/Authoring/SQLReferenceManual/Statements/SELECT/OFFSETClause.htm?zoom_highlight=offset The description for design: **1**. Consider `OFFSET` as the special case of `LIMIT`. For example: `SELECT * FROM a limit 10;` similar to `SELECT * FROM a limit 10 offset 0;` `SELECT * FROM a offset 10;` similar to `SELECT * FROM a limit -1 offset 10;` **2**. Because the current implement of `LIMIT` has good performance. For example: `SELECT * FROM a limit 10;` parsed to the logic plan as below: ``` GlobalLimit (limit = 10) |--LocalLimit (limit = 10) ``` and then the physical plan as below: ``` GlobalLimitExec (limit = 10) // Take the first 10 rows globally |--LocalLimitExec (limit = 10) // Take the first 10 rows locally ``` This operator reduce massive shuffle and has good performance. Sometimes, the logic plan transformed to the physical plan as: ``` CollectLimitExec (limit = 10) // Take the first 10 rows globally ``` If the SQL contains order by, such as `SELECT * FROM a order by c limit 10;`. This SQL will be transformed to the physical plan as below: ``` TakeOrderedAndProjectExec (limit = 10) // Take the first 10 rows after sort globally ``` Based on this situation, this PR produces the following operations. For example: `SELECT * FROM a limit 10 offset 10;` parsed to the logic plan as below: ``` GlobalLimit (limit = 10) |--LocalLimit (limit = 10) |--Offset (offset = 10) ``` After optimization, the above logic plan will be transformed to: ``` GlobalLimitAndOffset (limit = 10, offset = 10) // Limit clause accompanied by offset clause |--LocalLimit (limit = 20) // 10 + offset = 20 ``` and then the physical plan as below: ``` GlobalLimitAndOffsetExec (limit = 10, offset = 10) // Skip the first 10 rows and take the next 10 rows globally |--LocalLimitExec (limit = 20) // Take the first 20(limit + offset) rows locally ``` Sometimes, the logic plan transformed to the physical plan as: ``` CollectLimitExec (limit = 10, offset = 10) // Skip the first 10 rows and take the next 10 rows globally ``` If the SQL contains order by, such as `SELECT * FROM a order by c limit 10 offset 10;`. This SQL will be transformed to the physical plan as below: ``` TakeOrderedAndProjectExec (limit = 10, offset 10) // Skip the first 10 rows and take the next 10 rows after sort globally ``` **3**.In addition to the above, there is a special case that is only offset but no limit. For example: `SELECT * FROM a offset 10;` parsed to the logic plan as below: ``` Offset (offset = 10) // Only offset clause ``` If offset is very large, will generate a lot of overhead. So this PR will refuse use offset clause without limit clause, although we can parse, transform and execute it. A balanced idea is add a configuration item `spark.sql.forceUsingOffsetWithoutLimit` to force running query when user knows the offset is small enough. The default value of `spark.sql.forceUsingOffsetWithoutLimit` is false. This PR just came up with the idea so that it could be implemented at a better time in the future. Note: The origin PR to support this feature is apache#25416. Because the origin PR too old, there exists massive conflict which is hard to resolve. So I open this new PR to support this feature. ### Why are the changes needed? new feature ### Does this PR introduce any user-facing change? 'No' ### How was this patch tested? Exists and new UT Closes apache#35975 from beliefer/SPARK-28330. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-39057][SQL] Offset could work without Limit ### What changes were proposed in this pull request? Currently, `Offset` must work with `Limit`. The behavior not allow to use offset alone and add offset API into `DataFrame`. If we use `Offset` alone, there are two situations: 1. If `Offset` is the last operator, collect the result to the driver and then drop/skip the first n (offset value) rows. Users can test or debug `Offset` in the way. 2. If `Offset` is the intermediate operator, shuffle all the result to one task and drop/skip the first n (offset value) rows and the result will be passed to the downstream operator. For example, `SELECT * FROM a offset 10; ` parsed to the logic plan as below: ``` Offset (offset = 10) // Only offset clause |--Relation ``` and then the physical plan as below: ``` CollectLimitExec(limit = -1, offset = 10) // Collect the result to the driver and skip the first 10 rows |--JDBCRelation ``` or ``` GlobalLimitAndOffsetExec(limit = -1, offset = 10) // Collect the result and skip the first 10 rows |--JDBCRelation ``` After this PR merged, users could input the SQL show below: ``` SELECT '' AS ten, unique1, unique2, stringu1 FROM onek ORDER BY unique1 OFFSET 990; ``` Note: apache#35975 supports offset clause, it create a logical node named `GlobalLimitAndOffset`. In fact, we can avoid use this node and use `Offset` instead and the latter is good with unify name. ### Why are the changes needed? Improve the implement of offset clause. ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? Exists test cases. Closes apache#36417 from beliefer/SPARK-28330_followup2. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-39159][SQL] Add new Dataset API for Offset ### What changes were proposed in this pull request? Currently, Spark added `Offset` operator. This PR try to add `offset` API into `Dataset`. ### Why are the changes needed? `offset` API is very useful and construct test case more easily. ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? New tests. Closes apache#36519 from beliefer/SPARK-39159. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * fix ut * update spark version Co-authored-by: Jiaan Geng <beliefer@163.com>
### What changes were proposed in this pull request? Currently, `Offset` must work with `Limit`. The behavior not allow to use offset alone and add offset API into `DataFrame`. If we use `Offset` alone, there are two situations: 1. If `Offset` is the last operator, collect the result to the driver and then drop/skip the first n (offset value) rows. Users can test or debug `Offset` in the way. 2. If `Offset` is the intermediate operator, shuffle all the result to one task and drop/skip the first n (offset value) rows and the result will be passed to the downstream operator. For example, `SELECT * FROM a offset 10; ` parsed to the logic plan as below: ``` Offset (offset = 10) // Only offset clause |--Relation ``` and then the physical plan as below: ``` CollectLimitExec(limit = -1, offset = 10) // Collect the result to the driver and skip the first 10 rows |--JDBCRelation ``` or ``` GlobalLimitAndOffsetExec(limit = -1, offset = 10) // Collect the result and skip the first 10 rows |--JDBCRelation ``` After this PR merged, users could input the SQL show below: ``` SELECT '' AS ten, unique1, unique2, stringu1 FROM onek ORDER BY unique1 OFFSET 990; ``` Note: apache#35975 supports offset clause, it create a logical node named `GlobalLimitAndOffset`. In fact, we can avoid use this node and use `Offset` instead and the latter is good with unify name. ### Why are the changes needed? Improve the implement of offset clause. ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? Exists test cases. Closes apache#36417 from beliefer/SPARK-28330_followup2. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
* [SPARK-28330][SQL] Support ANSI SQL: result offset clause in query expression ### What changes were proposed in this pull request? This is a ANSI SQL and feature id is `F861` ``` <query expression> ::= [ <with clause> ] <query expression body> [ <order by clause> ] [ <result offset clause> ] [ <fetch first clause> ] <result offset clause> ::= OFFSET <offset row count> { ROW | ROWS } ``` For example: ``` SELECT customer_name, customer_gender FROM customer_dimension WHERE occupation='Dancer' AND customer_city = 'San Francisco' ORDER BY customer_name; customer_name | customer_gender ----------------------+----------------- Amy X. Lang | Female Anna H. Li | Female Brian O. Weaver | Male Craig O. Pavlov | Male Doug Z. Goldberg | Male Harold S. Jones | Male Jack E. Perkins | Male Joseph W. Overstreet | Male Kevin . Campbell | Male Raja Y. Wilson | Male Samantha O. Brown | Female Steve H. Gauthier | Male William . Nielson | Male William Z. Roy | Male (14 rows) SELECT customer_name, customer_gender FROM customer_dimension WHERE occupation='Dancer' AND customer_city = 'San Francisco' ORDER BY customer_name OFFSET 8; customer_name | customer_gender -------------------+----------------- Kevin . Campbell | Male Raja Y. Wilson | Male Samantha O. Brown | Female Steve H. Gauthier | Male William . Nielson | Male William Z. Roy | Male (6 rows) ``` There are some mainstream database support the syntax. **Druid** https://druid.apache.org/docs/latest/querying/sql.html#offset **Kylin** http://kylin.apache.org/docs/tutorial/sql_reference.html#QUERYSYNTAX **Exasol** https://docs.exasol.com/sql/select.htm **Greenplum** http://docs.greenplum.org/6-8/ref_guide/sql_commands/SELECT.html **MySQL** https://dev.mysql.com/doc/refman/5.6/en/select.html **Monetdb** https://www.monetdb.org/Documentation/SQLreference/SQLSyntaxOverview#SELECT **PostgreSQL** https://www.postgresql.org/docs/11/queries-limit.html **Sqlite** https://www.sqlite.org/lang_select.html **Vertica** https://www.vertica.com/docs/9.2.x/HTML/Content/Authoring/SQLReferenceManual/Statements/SELECT/OFFSETClause.htm?zoom_highlight=offset The description for design: **1**. Consider `OFFSET` as the special case of `LIMIT`. For example: `SELECT * FROM a limit 10;` similar to `SELECT * FROM a limit 10 offset 0;` `SELECT * FROM a offset 10;` similar to `SELECT * FROM a limit -1 offset 10;` **2**. Because the current implement of `LIMIT` has good performance. For example: `SELECT * FROM a limit 10;` parsed to the logic plan as below: ``` GlobalLimit (limit = 10) |--LocalLimit (limit = 10) ``` and then the physical plan as below: ``` GlobalLimitExec (limit = 10) // Take the first 10 rows globally |--LocalLimitExec (limit = 10) // Take the first 10 rows locally ``` This operator reduce massive shuffle and has good performance. Sometimes, the logic plan transformed to the physical plan as: ``` CollectLimitExec (limit = 10) // Take the first 10 rows globally ``` If the SQL contains order by, such as `SELECT * FROM a order by c limit 10;`. This SQL will be transformed to the physical plan as below: ``` TakeOrderedAndProjectExec (limit = 10) // Take the first 10 rows after sort globally ``` Based on this situation, this PR produces the following operations. For example: `SELECT * FROM a limit 10 offset 10;` parsed to the logic plan as below: ``` GlobalLimit (limit = 10) |--LocalLimit (limit = 10) |--Offset (offset = 10) ``` After optimization, the above logic plan will be transformed to: ``` GlobalLimitAndOffset (limit = 10, offset = 10) // Limit clause accompanied by offset clause |--LocalLimit (limit = 20) // 10 + offset = 20 ``` and then the physical plan as below: ``` GlobalLimitAndOffsetExec (limit = 10, offset = 10) // Skip the first 10 rows and take the next 10 rows globally |--LocalLimitExec (limit = 20) // Take the first 20(limit + offset) rows locally ``` Sometimes, the logic plan transformed to the physical plan as: ``` CollectLimitExec (limit = 10, offset = 10) // Skip the first 10 rows and take the next 10 rows globally ``` If the SQL contains order by, such as `SELECT * FROM a order by c limit 10 offset 10;`. This SQL will be transformed to the physical plan as below: ``` TakeOrderedAndProjectExec (limit = 10, offset 10) // Skip the first 10 rows and take the next 10 rows after sort globally ``` **3**.In addition to the above, there is a special case that is only offset but no limit. For example: `SELECT * FROM a offset 10;` parsed to the logic plan as below: ``` Offset (offset = 10) // Only offset clause ``` If offset is very large, will generate a lot of overhead. So this PR will refuse use offset clause without limit clause, although we can parse, transform and execute it. A balanced idea is add a configuration item `spark.sql.forceUsingOffsetWithoutLimit` to force running query when user knows the offset is small enough. The default value of `spark.sql.forceUsingOffsetWithoutLimit` is false. This PR just came up with the idea so that it could be implemented at a better time in the future. Note: The origin PR to support this feature is apache#25416. Because the origin PR too old, there exists massive conflict which is hard to resolve. So I open this new PR to support this feature. ### Why are the changes needed? new feature ### Does this PR introduce any user-facing change? 'No' ### How was this patch tested? Exists and new UT Closes apache#35975 from beliefer/SPARK-28330. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-39057][SQL] Offset could work without Limit ### What changes were proposed in this pull request? Currently, `Offset` must work with `Limit`. The behavior not allow to use offset alone and add offset API into `DataFrame`. If we use `Offset` alone, there are two situations: 1. If `Offset` is the last operator, collect the result to the driver and then drop/skip the first n (offset value) rows. Users can test or debug `Offset` in the way. 2. If `Offset` is the intermediate operator, shuffle all the result to one task and drop/skip the first n (offset value) rows and the result will be passed to the downstream operator. For example, `SELECT * FROM a offset 10; ` parsed to the logic plan as below: ``` Offset (offset = 10) // Only offset clause |--Relation ``` and then the physical plan as below: ``` CollectLimitExec(limit = -1, offset = 10) // Collect the result to the driver and skip the first 10 rows |--JDBCRelation ``` or ``` GlobalLimitAndOffsetExec(limit = -1, offset = 10) // Collect the result and skip the first 10 rows |--JDBCRelation ``` After this PR merged, users could input the SQL show below: ``` SELECT '' AS ten, unique1, unique2, stringu1 FROM onek ORDER BY unique1 OFFSET 990; ``` Note: apache#35975 supports offset clause, it create a logical node named `GlobalLimitAndOffset`. In fact, we can avoid use this node and use `Offset` instead and the latter is good with unify name. ### Why are the changes needed? Improve the implement of offset clause. ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? Exists test cases. Closes apache#36417 from beliefer/SPARK-28330_followup2. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-39159][SQL] Add new Dataset API for Offset ### What changes were proposed in this pull request? Currently, Spark added `Offset` operator. This PR try to add `offset` API into `Dataset`. ### Why are the changes needed? `offset` API is very useful and construct test case more easily. ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? New tests. Closes apache#36519 from beliefer/SPARK-39159. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-39180][SQL] Simplify the planning of limit and offset ### What changes were proposed in this pull request? This PR simplifies the planning of limit and offset: 1. Unify the semantics of physical plans that need to deal with limit + offset. These physical plans always do limit first, then offset. The planner rule should set limit and offset properly, for different plans, such as limit + offset and offset + limit. 2. Refactor the planner rule `SpecialLimit` to reuse the code of planning `TakeOrderedAndProjectExec`. 3. Let `GlobalLimitExec` to handle offset as well, so that we can remove `GlobalLimitAndOffsetExec`. This matches `CollectLimitExec`. ### Why are the changes needed? code simplification ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? existing tests Closes apache#36541 from cloud-fan/offset. Lead-authored-by: Wenchen Fan <cloud0fan@gmail.com> Co-authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-39037][SQL] DS V2 aggregate push-down supports order by expressions ### What changes were proposed in this pull request? Currently, Spark DS V2 aggregate push-down only supports order by column. But the SQL show below is very useful and common. ``` SELECT CASE WHEN 'SALARY' > 8000.00 AND 'SALARY' < 10000.00 THEN 'SALARY' ELSE 0.00 END AS key, dept, name FROM "test"."employee" ORDER BY key ``` ### Why are the changes needed? Let DS V2 aggregate push-down supports order by expressions ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? New tests Closes apache#36370 from beliefer/SPARK-39037. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-38978][SQL] DS V2 supports push down OFFSET operator ### What changes were proposed in this pull request? Currently, DS V2 push-down supports `LIMIT` but `OFFSET`. If we can pushing down `OFFSET` to JDBC data source, it will be better performance. ### Why are the changes needed? push down `OFFSET` could improves the performance. ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? New tests. Closes apache#36295 from beliefer/SPARK-38978. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * fix ut * [SPARK-39340][SQL] DS v2 agg pushdown should allow dots in the name of top-level columns ### What changes were proposed in this pull request? It turns out that I was wrong in apache#36727 . We still have the limitation (column name cannot contain dot) in master and 3.3 braches, in a very implicit way: The `V2ExpressionBuilder` has a boolean flag `nestedPredicatePushdownEnabled` whose default value is false. When it's false, it uses `PushableColumnWithoutNestedColumn` to match columns, which doesn't support dot in names. `V2ExpressionBuilder` is only used in 2 places: 1. `PushableExpression`. This is a pattern match that is only used in v2 agg pushdown 2. `PushablePredicate`. This is a pattern match that is used in various places, but all the caller sides set `nestedPredicatePushdownEnabled` to true. This PR removes the `nestedPredicatePushdownEnabled` flag from `V2ExpressionBuilder`, and makes it always support nested fields. `PushablePredicate` is also updated accordingly to remove the boolean flag, as it's always true. ### Why are the changes needed? Fix a mistake to eliminate an unexpected limitation in DS v2 pushdown. ### Does this PR introduce _any_ user-facing change? No for end users. For data source developers, they can trigger agg pushdowm more often. ### How was this patch tested? a new test Closes apache#36945 from cloud-fan/dsv2. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-39453][SQL] DS V2 supports push down misc non-aggregate functions(non ANSI) ### What changes were proposed in this pull request? apache#36039 makes DS V2 supports push down misc non-aggregate functions are claimed by ANSI standard. Spark have a lot common used misc non-aggregate functions are not claimed by ANSI standard. https://github.com/apache/spark/blob/2f8613f22c0750c00cf1dcfb2f31c431d8dc1be7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala#L362. The mainstream databases support these functions show below. | Function name | PostgreSQL | ClickHouse | H2 | MySQL | Oracle | Redshift | Presto | Teradata | Snowflake | DB2 | Vertica | Exasol | SqlServer | Yellowbrick | Impala | Mariadb | Druid | Pig | Singlestore | ElasticSearch | SQLite | Influxdata | Sybase | | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | | `GREATEST` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | No | No | No | | `LEAST` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | No | No | No | | `IF` | No | Yes | No | Yes | No | No | Yes | No | Yes | No | No | Yes | No | Yes | Yes | Yes | No | No | Yes | Yes | Yes | No | No | | `RAND` | No | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | No | Yes | ### Why are the changes needed? DS V2 supports push down misc non-aggregate functions supported by mainstream databases. ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? New tests. Closes apache#36830 from beliefer/SPARK-38761_followup. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-39479][SQL] DS V2 supports push down math functions(non ANSI) ### What changes were proposed in this pull request? apache#36140 makes DS V2 supports push down math functions are claimed by ANSI standard. Spark have a lot common used math functions are not claimed by ANSI standard. https://github.com/apache/spark/blob/2f8613f22c0750c00cf1dcfb2f31c431d8dc1be7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala#L388 The mainstream databases support these functions show below. | Function name | PostgreSQL | ClickHouse | H2 | MySQL | Oracle | Redshift | Presto | Teradata | Snowflake | DB2 | Vertica | Exasol | SqlServer | Yellowbrick | Impala | Mariadb | Druid | Pig | Singlestore | ElasticSearch | SQLite | Influxdata | Sybase | | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | | `SIN` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `SINH` | Yes | Yes | Yes | No | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | No | No | Yes | No | No | Yes | No | Yes | Yes | Yes | No | | `COS` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `COSH` | Yes | Yes | Yes | No | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | No | No | Yes | No | No | Yes | No | Yes | Yes | Yes | No | | `TAN` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `TANH` | Yes | No | Yes | No | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | No | No | Yes | No | No | Yes | No | No | No | Yes | No | | `COT` | Yes | No | Yes | Yes | No | Yes | No | No | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | No | Yes | Yes | No | No | Yes | | `ASIN` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `ASINH` | Yes | Yes | No | No | No | No | No | Yes | Yes | No | No | No | No | No | No | No | No | No | No | No | Yes | Yes | No | | `ACOS` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `ACOSH` | Yes | Yes | No | No | No | No | No | Yes | Yes | No | No | No | No | No | No | No | No | No | No | No | Yes | Yes | No | | `ATAN` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `ATAN2` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes | | `ATANH` | Yes | Yes | No | No | No | No | No | Yes | Yes | Yes | No | No | No | No | No | No | No | No | No | No | Yes | Yes | No | | `LOG` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `LOG10` | Yes | Yes | Yes | Yes | No | No | Yes | Yes | No | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `LOG2` | No | Yes | No | Yes | No | No | Yes | Yes | No | No | No | Yes | No | No | Yes | Yes | No | No | Yes | No | Yes | Yes | No | | `CBRT` | Yes | Yes | No | No | No | Yes | Yes | No | Yes | No | Yes | No | No | Yes | No | No | No | Yes | No | Yes | No | Yes | No | | `DEGREES` | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | No | Yes | | `RADIANS` | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | No | Yes | | `ROUND` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | | `SIGN` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | No | Yes | Yes | No | No | Yes | ### Why are the changes needed? DS V2 supports push down math functions supported by mainstream databases. ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? New tests. Closes apache#36877 from beliefer/SPARK-39479. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> Co-authored-by: Jiaan Geng <beliefer@163.com> Co-authored-by: Wenchen Fan <cloud0fan@gmail.com> Co-authored-by: Wenchen Fan <wenchen@databricks.com>
…e#491) * [SPARK-28330][SQL] Support ANSI SQL: result offset clause in query expression ### What changes were proposed in this pull request? This is a ANSI SQL and feature id is `F861` ``` <query expression> ::= [ <with clause> ] <query expression body> [ <order by clause> ] [ <result offset clause> ] [ <fetch first clause> ] <result offset clause> ::= OFFSET <offset row count> { ROW | ROWS } ``` For example: ``` SELECT customer_name, customer_gender FROM customer_dimension WHERE occupation='Dancer' AND customer_city = 'San Francisco' ORDER BY customer_name; customer_name | customer_gender ----------------------+----------------- Amy X. Lang | Female Anna H. Li | Female Brian O. Weaver | Male Craig O. Pavlov | Male Doug Z. Goldberg | Male Harold S. Jones | Male Jack E. Perkins | Male Joseph W. Overstreet | Male Kevin . Campbell | Male Raja Y. Wilson | Male Samantha O. Brown | Female Steve H. Gauthier | Male William . Nielson | Male William Z. Roy | Male (14 rows) SELECT customer_name, customer_gender FROM customer_dimension WHERE occupation='Dancer' AND customer_city = 'San Francisco' ORDER BY customer_name OFFSET 8; customer_name | customer_gender -------------------+----------------- Kevin . Campbell | Male Raja Y. Wilson | Male Samantha O. Brown | Female Steve H. Gauthier | Male William . Nielson | Male William Z. Roy | Male (6 rows) ``` There are some mainstream database support the syntax. **Druid** https://druid.apache.org/docs/latest/querying/sql.html#offset **Kylin** http://kylin.apache.org/docs/tutorial/sql_reference.html#QUERYSYNTAX **Exasol** https://docs.exasol.com/sql/select.htm **Greenplum** http://docs.greenplum.org/6-8/ref_guide/sql_commands/SELECT.html **MySQL** https://dev.mysql.com/doc/refman/5.6/en/select.html **Monetdb** https://www.monetdb.org/Documentation/SQLreference/SQLSyntaxOverview#SELECT **PostgreSQL** https://www.postgresql.org/docs/11/queries-limit.html **Sqlite** https://www.sqlite.org/lang_select.html **Vertica** https://www.vertica.com/docs/9.2.x/HTML/Content/Authoring/SQLReferenceManual/Statements/SELECT/OFFSETClause.htm?zoom_highlight=offset The description for design: **1**. Consider `OFFSET` as the special case of `LIMIT`. For example: `SELECT * FROM a limit 10;` similar to `SELECT * FROM a limit 10 offset 0;` `SELECT * FROM a offset 10;` similar to `SELECT * FROM a limit -1 offset 10;` **2**. Because the current implement of `LIMIT` has good performance. For example: `SELECT * FROM a limit 10;` parsed to the logic plan as below: ``` GlobalLimit (limit = 10) |--LocalLimit (limit = 10) ``` and then the physical plan as below: ``` GlobalLimitExec (limit = 10) // Take the first 10 rows globally |--LocalLimitExec (limit = 10) // Take the first 10 rows locally ``` This operator reduce massive shuffle and has good performance. Sometimes, the logic plan transformed to the physical plan as: ``` CollectLimitExec (limit = 10) // Take the first 10 rows globally ``` If the SQL contains order by, such as `SELECT * FROM a order by c limit 10;`. This SQL will be transformed to the physical plan as below: ``` TakeOrderedAndProjectExec (limit = 10) // Take the first 10 rows after sort globally ``` Based on this situation, this PR produces the following operations. For example: `SELECT * FROM a limit 10 offset 10;` parsed to the logic plan as below: ``` GlobalLimit (limit = 10) |--LocalLimit (limit = 10) |--Offset (offset = 10) ``` After optimization, the above logic plan will be transformed to: ``` GlobalLimitAndOffset (limit = 10, offset = 10) // Limit clause accompanied by offset clause |--LocalLimit (limit = 20) // 10 + offset = 20 ``` and then the physical plan as below: ``` GlobalLimitAndOffsetExec (limit = 10, offset = 10) // Skip the first 10 rows and take the next 10 rows globally |--LocalLimitExec (limit = 20) // Take the first 20(limit + offset) rows locally ``` Sometimes, the logic plan transformed to the physical plan as: ``` CollectLimitExec (limit = 10, offset = 10) // Skip the first 10 rows and take the next 10 rows globally ``` If the SQL contains order by, such as `SELECT * FROM a order by c limit 10 offset 10;`. This SQL will be transformed to the physical plan as below: ``` TakeOrderedAndProjectExec (limit = 10, offset 10) // Skip the first 10 rows and take the next 10 rows after sort globally ``` **3**.In addition to the above, there is a special case that is only offset but no limit. For example: `SELECT * FROM a offset 10;` parsed to the logic plan as below: ``` Offset (offset = 10) // Only offset clause ``` If offset is very large, will generate a lot of overhead. So this PR will refuse use offset clause without limit clause, although we can parse, transform and execute it. A balanced idea is add a configuration item `spark.sql.forceUsingOffsetWithoutLimit` to force running query when user knows the offset is small enough. The default value of `spark.sql.forceUsingOffsetWithoutLimit` is false. This PR just came up with the idea so that it could be implemented at a better time in the future. Note: The origin PR to support this feature is apache#25416. Because the origin PR too old, there exists massive conflict which is hard to resolve. So I open this new PR to support this feature. ### Why are the changes needed? new feature ### Does this PR introduce any user-facing change? 'No' ### How was this patch tested? Exists and new UT Closes apache#35975 from beliefer/SPARK-28330. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-39057][SQL] Offset could work without Limit ### What changes were proposed in this pull request? Currently, `Offset` must work with `Limit`. The behavior not allow to use offset alone and add offset API into `DataFrame`. If we use `Offset` alone, there are two situations: 1. If `Offset` is the last operator, collect the result to the driver and then drop/skip the first n (offset value) rows. Users can test or debug `Offset` in the way. 2. If `Offset` is the intermediate operator, shuffle all the result to one task and drop/skip the first n (offset value) rows and the result will be passed to the downstream operator. For example, `SELECT * FROM a offset 10; ` parsed to the logic plan as below: ``` Offset (offset = 10) // Only offset clause |--Relation ``` and then the physical plan as below: ``` CollectLimitExec(limit = -1, offset = 10) // Collect the result to the driver and skip the first 10 rows |--JDBCRelation ``` or ``` GlobalLimitAndOffsetExec(limit = -1, offset = 10) // Collect the result and skip the first 10 rows |--JDBCRelation ``` After this PR merged, users could input the SQL show below: ``` SELECT '' AS ten, unique1, unique2, stringu1 FROM onek ORDER BY unique1 OFFSET 990; ``` Note: apache#35975 supports offset clause, it create a logical node named `GlobalLimitAndOffset`. In fact, we can avoid use this node and use `Offset` instead and the latter is good with unify name. ### Why are the changes needed? Improve the implement of offset clause. ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? Exists test cases. Closes apache#36417 from beliefer/SPARK-28330_followup2. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-39159][SQL] Add new Dataset API for Offset ### What changes were proposed in this pull request? Currently, Spark added `Offset` operator. This PR try to add `offset` API into `Dataset`. ### Why are the changes needed? `offset` API is very useful and construct test case more easily. ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? New tests. Closes apache#36519 from beliefer/SPARK-39159. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-39180][SQL] Simplify the planning of limit and offset ### What changes were proposed in this pull request? This PR simplifies the planning of limit and offset: 1. Unify the semantics of physical plans that need to deal with limit + offset. These physical plans always do limit first, then offset. The planner rule should set limit and offset properly, for different plans, such as limit + offset and offset + limit. 2. Refactor the planner rule `SpecialLimit` to reuse the code of planning `TakeOrderedAndProjectExec`. 3. Let `GlobalLimitExec` to handle offset as well, so that we can remove `GlobalLimitAndOffsetExec`. This matches `CollectLimitExec`. ### Why are the changes needed? code simplification ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? existing tests Closes apache#36541 from cloud-fan/offset. Lead-authored-by: Wenchen Fan <cloud0fan@gmail.com> Co-authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-39037][SQL] DS V2 aggregate push-down supports order by expressions ### What changes were proposed in this pull request? Currently, Spark DS V2 aggregate push-down only supports order by column. But the SQL show below is very useful and common. ``` SELECT CASE WHEN 'SALARY' > 8000.00 AND 'SALARY' < 10000.00 THEN 'SALARY' ELSE 0.00 END AS key, dept, name FROM "test"."employee" ORDER BY key ``` ### Why are the changes needed? Let DS V2 aggregate push-down supports order by expressions ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? New tests Closes apache#36370 from beliefer/SPARK-39037. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-38978][SQL] DS V2 supports push down OFFSET operator ### What changes were proposed in this pull request? Currently, DS V2 push-down supports `LIMIT` but `OFFSET`. If we can pushing down `OFFSET` to JDBC data source, it will be better performance. ### Why are the changes needed? push down `OFFSET` could improves the performance. ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? New tests. Closes apache#36295 from beliefer/SPARK-38978. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * fix ut * [SPARK-39340][SQL] DS v2 agg pushdown should allow dots in the name of top-level columns ### What changes were proposed in this pull request? It turns out that I was wrong in apache#36727 . We still have the limitation (column name cannot contain dot) in master and 3.3 braches, in a very implicit way: The `V2ExpressionBuilder` has a boolean flag `nestedPredicatePushdownEnabled` whose default value is false. When it's false, it uses `PushableColumnWithoutNestedColumn` to match columns, which doesn't support dot in names. `V2ExpressionBuilder` is only used in 2 places: 1. `PushableExpression`. This is a pattern match that is only used in v2 agg pushdown 2. `PushablePredicate`. This is a pattern match that is used in various places, but all the caller sides set `nestedPredicatePushdownEnabled` to true. This PR removes the `nestedPredicatePushdownEnabled` flag from `V2ExpressionBuilder`, and makes it always support nested fields. `PushablePredicate` is also updated accordingly to remove the boolean flag, as it's always true. ### Why are the changes needed? Fix a mistake to eliminate an unexpected limitation in DS v2 pushdown. ### Does this PR introduce _any_ user-facing change? No for end users. For data source developers, they can trigger agg pushdowm more often. ### How was this patch tested? a new test Closes apache#36945 from cloud-fan/dsv2. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-39453][SQL] DS V2 supports push down misc non-aggregate functions(non ANSI) ### What changes were proposed in this pull request? apache#36039 makes DS V2 supports push down misc non-aggregate functions are claimed by ANSI standard. Spark have a lot common used misc non-aggregate functions are not claimed by ANSI standard. https://github.com/apache/spark/blob/2f8613f22c0750c00cf1dcfb2f31c431d8dc1be7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala#L362. The mainstream databases support these functions show below. | Function name | PostgreSQL | ClickHouse | H2 | MySQL | Oracle | Redshift | Presto | Teradata | Snowflake | DB2 | Vertica | Exasol | SqlServer | Yellowbrick | Impala | Mariadb | Druid | Pig | Singlestore | ElasticSearch | SQLite | Influxdata | Sybase | | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | | `GREATEST` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | No | No | No | | `LEAST` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | No | No | No | | `IF` | No | Yes | No | Yes | No | No | Yes | No | Yes | No | No | Yes | No | Yes | Yes | Yes | No | No | Yes | Yes | Yes | No | No | | `RAND` | No | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | No | Yes | ### Why are the changes needed? DS V2 supports push down misc non-aggregate functions supported by mainstream databases. ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? New tests. Closes apache#36830 from beliefer/SPARK-38761_followup. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-39479][SQL] DS V2 supports push down math functions(non ANSI) ### What changes were proposed in this pull request? apache#36140 makes DS V2 supports push down math functions are claimed by ANSI standard. Spark have a lot common used math functions are not claimed by ANSI standard. https://github.com/apache/spark/blob/2f8613f22c0750c00cf1dcfb2f31c431d8dc1be7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala#L388 The mainstream databases support these functions show below. | Function name | PostgreSQL | ClickHouse | H2 | MySQL | Oracle | Redshift | Presto | Teradata | Snowflake | DB2 | Vertica | Exasol | SqlServer | Yellowbrick | Impala | Mariadb | Druid | Pig | Singlestore | ElasticSearch | SQLite | Influxdata | Sybase | | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | | `SIN` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `SINH` | Yes | Yes | Yes | No | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | No | No | Yes | No | No | Yes | No | Yes | Yes | Yes | No | | `COS` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `COSH` | Yes | Yes | Yes | No | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | No | No | Yes | No | No | Yes | No | Yes | Yes | Yes | No | | `TAN` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `TANH` | Yes | No | Yes | No | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | No | No | Yes | No | No | Yes | No | No | No | Yes | No | | `COT` | Yes | No | Yes | Yes | No | Yes | No | No | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | No | Yes | Yes | No | No | Yes | | `ASIN` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `ASINH` | Yes | Yes | No | No | No | No | No | Yes | Yes | No | No | No | No | No | No | No | No | No | No | No | Yes | Yes | No | | `ACOS` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `ACOSH` | Yes | Yes | No | No | No | No | No | Yes | Yes | No | No | No | No | No | No | No | No | No | No | No | Yes | Yes | No | | `ATAN` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `ATAN2` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes | | `ATANH` | Yes | Yes | No | No | No | No | No | Yes | Yes | Yes | No | No | No | No | No | No | No | No | No | No | Yes | Yes | No | | `LOG` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `LOG10` | Yes | Yes | Yes | Yes | No | No | Yes | Yes | No | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `LOG2` | No | Yes | No | Yes | No | No | Yes | Yes | No | No | No | Yes | No | No | Yes | Yes | No | No | Yes | No | Yes | Yes | No | | `CBRT` | Yes | Yes | No | No | No | Yes | Yes | No | Yes | No | Yes | No | No | Yes | No | No | No | Yes | No | Yes | No | Yes | No | | `DEGREES` | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | No | Yes | | `RADIANS` | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | No | Yes | | `ROUND` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | | `SIGN` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | No | Yes | Yes | No | No | Yes | ### Why are the changes needed? DS V2 supports push down math functions supported by mainstream databases. ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? New tests. Closes apache#36877 from beliefer/SPARK-39479. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> Co-authored-by: Jiaan Geng <beliefer@163.com> Co-authored-by: Wenchen Fan <cloud0fan@gmail.com> Co-authored-by: Wenchen Fan <wenchen@databricks.com>
* [SPARK-28330][SQL] Support ANSI SQL: result offset clause in query expression ### What changes were proposed in this pull request? This is a ANSI SQL and feature id is `F861` ``` <query expression> ::= [ <with clause> ] <query expression body> [ <order by clause> ] [ <result offset clause> ] [ <fetch first clause> ] <result offset clause> ::= OFFSET <offset row count> { ROW | ROWS } ``` For example: ``` SELECT customer_name, customer_gender FROM customer_dimension WHERE occupation='Dancer' AND customer_city = 'San Francisco' ORDER BY customer_name; customer_name | customer_gender ----------------------+----------------- Amy X. Lang | Female Anna H. Li | Female Brian O. Weaver | Male Craig O. Pavlov | Male Doug Z. Goldberg | Male Harold S. Jones | Male Jack E. Perkins | Male Joseph W. Overstreet | Male Kevin . Campbell | Male Raja Y. Wilson | Male Samantha O. Brown | Female Steve H. Gauthier | Male William . Nielson | Male William Z. Roy | Male (14 rows) SELECT customer_name, customer_gender FROM customer_dimension WHERE occupation='Dancer' AND customer_city = 'San Francisco' ORDER BY customer_name OFFSET 8; customer_name | customer_gender -------------------+----------------- Kevin . Campbell | Male Raja Y. Wilson | Male Samantha O. Brown | Female Steve H. Gauthier | Male William . Nielson | Male William Z. Roy | Male (6 rows) ``` There are some mainstream database support the syntax. **Druid** https://druid.apache.org/docs/latest/querying/sql.html#offset **Kylin** http://kylin.apache.org/docs/tutorial/sql_reference.html#QUERYSYNTAX **Exasol** https://docs.exasol.com/sql/select.htm **Greenplum** http://docs.greenplum.org/6-8/ref_guide/sql_commands/SELECT.html **MySQL** https://dev.mysql.com/doc/refman/5.6/en/select.html **Monetdb** https://www.monetdb.org/Documentation/SQLreference/SQLSyntaxOverview#SELECT **PostgreSQL** https://www.postgresql.org/docs/11/queries-limit.html **Sqlite** https://www.sqlite.org/lang_select.html **Vertica** https://www.vertica.com/docs/9.2.x/HTML/Content/Authoring/SQLReferenceManual/Statements/SELECT/OFFSETClause.htm?zoom_highlight=offset The description for design: **1**. Consider `OFFSET` as the special case of `LIMIT`. For example: `SELECT * FROM a limit 10;` similar to `SELECT * FROM a limit 10 offset 0;` `SELECT * FROM a offset 10;` similar to `SELECT * FROM a limit -1 offset 10;` **2**. Because the current implement of `LIMIT` has good performance. For example: `SELECT * FROM a limit 10;` parsed to the logic plan as below: ``` GlobalLimit (limit = 10) |--LocalLimit (limit = 10) ``` and then the physical plan as below: ``` GlobalLimitExec (limit = 10) // Take the first 10 rows globally |--LocalLimitExec (limit = 10) // Take the first 10 rows locally ``` This operator reduce massive shuffle and has good performance. Sometimes, the logic plan transformed to the physical plan as: ``` CollectLimitExec (limit = 10) // Take the first 10 rows globally ``` If the SQL contains order by, such as `SELECT * FROM a order by c limit 10;`. This SQL will be transformed to the physical plan as below: ``` TakeOrderedAndProjectExec (limit = 10) // Take the first 10 rows after sort globally ``` Based on this situation, this PR produces the following operations. For example: `SELECT * FROM a limit 10 offset 10;` parsed to the logic plan as below: ``` GlobalLimit (limit = 10) |--LocalLimit (limit = 10) |--Offset (offset = 10) ``` After optimization, the above logic plan will be transformed to: ``` GlobalLimitAndOffset (limit = 10, offset = 10) // Limit clause accompanied by offset clause |--LocalLimit (limit = 20) // 10 + offset = 20 ``` and then the physical plan as below: ``` GlobalLimitAndOffsetExec (limit = 10, offset = 10) // Skip the first 10 rows and take the next 10 rows globally |--LocalLimitExec (limit = 20) // Take the first 20(limit + offset) rows locally ``` Sometimes, the logic plan transformed to the physical plan as: ``` CollectLimitExec (limit = 10, offset = 10) // Skip the first 10 rows and take the next 10 rows globally ``` If the SQL contains order by, such as `SELECT * FROM a order by c limit 10 offset 10;`. This SQL will be transformed to the physical plan as below: ``` TakeOrderedAndProjectExec (limit = 10, offset 10) // Skip the first 10 rows and take the next 10 rows after sort globally ``` **3**.In addition to the above, there is a special case that is only offset but no limit. For example: `SELECT * FROM a offset 10;` parsed to the logic plan as below: ``` Offset (offset = 10) // Only offset clause ``` If offset is very large, will generate a lot of overhead. So this PR will refuse use offset clause without limit clause, although we can parse, transform and execute it. A balanced idea is add a configuration item `spark.sql.forceUsingOffsetWithoutLimit` to force running query when user knows the offset is small enough. The default value of `spark.sql.forceUsingOffsetWithoutLimit` is false. This PR just came up with the idea so that it could be implemented at a better time in the future. Note: The origin PR to support this feature is apache#25416. Because the origin PR too old, there exists massive conflict which is hard to resolve. So I open this new PR to support this feature. ### Why are the changes needed? new feature ### Does this PR introduce any user-facing change? 'No' ### How was this patch tested? Exists and new UT Closes apache#35975 from beliefer/SPARK-28330. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-39057][SQL] Offset could work without Limit ### What changes were proposed in this pull request? Currently, `Offset` must work with `Limit`. The behavior not allow to use offset alone and add offset API into `DataFrame`. If we use `Offset` alone, there are two situations: 1. If `Offset` is the last operator, collect the result to the driver and then drop/skip the first n (offset value) rows. Users can test or debug `Offset` in the way. 2. If `Offset` is the intermediate operator, shuffle all the result to one task and drop/skip the first n (offset value) rows and the result will be passed to the downstream operator. For example, `SELECT * FROM a offset 10; ` parsed to the logic plan as below: ``` Offset (offset = 10) // Only offset clause |--Relation ``` and then the physical plan as below: ``` CollectLimitExec(limit = -1, offset = 10) // Collect the result to the driver and skip the first 10 rows |--JDBCRelation ``` or ``` GlobalLimitAndOffsetExec(limit = -1, offset = 10) // Collect the result and skip the first 10 rows |--JDBCRelation ``` After this PR merged, users could input the SQL show below: ``` SELECT '' AS ten, unique1, unique2, stringu1 FROM onek ORDER BY unique1 OFFSET 990; ``` Note: apache#35975 supports offset clause, it create a logical node named `GlobalLimitAndOffset`. In fact, we can avoid use this node and use `Offset` instead and the latter is good with unify name. ### Why are the changes needed? Improve the implement of offset clause. ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? Exists test cases. Closes apache#36417 from beliefer/SPARK-28330_followup2. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-39159][SQL] Add new Dataset API for Offset ### What changes were proposed in this pull request? Currently, Spark added `Offset` operator. This PR try to add `offset` API into `Dataset`. ### Why are the changes needed? `offset` API is very useful and construct test case more easily. ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? New tests. Closes apache#36519 from beliefer/SPARK-39159. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-39180][SQL] Simplify the planning of limit and offset ### What changes were proposed in this pull request? This PR simplifies the planning of limit and offset: 1. Unify the semantics of physical plans that need to deal with limit + offset. These physical plans always do limit first, then offset. The planner rule should set limit and offset properly, for different plans, such as limit + offset and offset + limit. 2. Refactor the planner rule `SpecialLimit` to reuse the code of planning `TakeOrderedAndProjectExec`. 3. Let `GlobalLimitExec` to handle offset as well, so that we can remove `GlobalLimitAndOffsetExec`. This matches `CollectLimitExec`. ### Why are the changes needed? code simplification ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? existing tests Closes apache#36541 from cloud-fan/offset. Lead-authored-by: Wenchen Fan <cloud0fan@gmail.com> Co-authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-39037][SQL] DS V2 aggregate push-down supports order by expressions ### What changes were proposed in this pull request? Currently, Spark DS V2 aggregate push-down only supports order by column. But the SQL show below is very useful and common. ``` SELECT CASE WHEN 'SALARY' > 8000.00 AND 'SALARY' < 10000.00 THEN 'SALARY' ELSE 0.00 END AS key, dept, name FROM "test"."employee" ORDER BY key ``` ### Why are the changes needed? Let DS V2 aggregate push-down supports order by expressions ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? New tests Closes apache#36370 from beliefer/SPARK-39037. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-38978][SQL] DS V2 supports push down OFFSET operator ### What changes were proposed in this pull request? Currently, DS V2 push-down supports `LIMIT` but `OFFSET`. If we can pushing down `OFFSET` to JDBC data source, it will be better performance. ### Why are the changes needed? push down `OFFSET` could improves the performance. ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? New tests. Closes apache#36295 from beliefer/SPARK-38978. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * fix ut * [SPARK-39340][SQL] DS v2 agg pushdown should allow dots in the name of top-level columns ### What changes were proposed in this pull request? It turns out that I was wrong in apache#36727 . We still have the limitation (column name cannot contain dot) in master and 3.3 braches, in a very implicit way: The `V2ExpressionBuilder` has a boolean flag `nestedPredicatePushdownEnabled` whose default value is false. When it's false, it uses `PushableColumnWithoutNestedColumn` to match columns, which doesn't support dot in names. `V2ExpressionBuilder` is only used in 2 places: 1. `PushableExpression`. This is a pattern match that is only used in v2 agg pushdown 2. `PushablePredicate`. This is a pattern match that is used in various places, but all the caller sides set `nestedPredicatePushdownEnabled` to true. This PR removes the `nestedPredicatePushdownEnabled` flag from `V2ExpressionBuilder`, and makes it always support nested fields. `PushablePredicate` is also updated accordingly to remove the boolean flag, as it's always true. ### Why are the changes needed? Fix a mistake to eliminate an unexpected limitation in DS v2 pushdown. ### Does this PR introduce _any_ user-facing change? No for end users. For data source developers, they can trigger agg pushdowm more often. ### How was this patch tested? a new test Closes apache#36945 from cloud-fan/dsv2. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-39453][SQL] DS V2 supports push down misc non-aggregate functions(non ANSI) ### What changes were proposed in this pull request? apache#36039 makes DS V2 supports push down misc non-aggregate functions are claimed by ANSI standard. Spark have a lot common used misc non-aggregate functions are not claimed by ANSI standard. https://github.com/apache/spark/blob/2f8613f22c0750c00cf1dcfb2f31c431d8dc1be7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala#L362. The mainstream databases support these functions show below. | Function name | PostgreSQL | ClickHouse | H2 | MySQL | Oracle | Redshift | Presto | Teradata | Snowflake | DB2 | Vertica | Exasol | SqlServer | Yellowbrick | Impala | Mariadb | Druid | Pig | Singlestore | ElasticSearch | SQLite | Influxdata | Sybase | | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | | `GREATEST` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | No | No | No | | `LEAST` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | No | No | No | | `IF` | No | Yes | No | Yes | No | No | Yes | No | Yes | No | No | Yes | No | Yes | Yes | Yes | No | No | Yes | Yes | Yes | No | No | | `RAND` | No | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | No | Yes | ### Why are the changes needed? DS V2 supports push down misc non-aggregate functions supported by mainstream databases. ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? New tests. Closes apache#36830 from beliefer/SPARK-38761_followup. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-39479][SQL] DS V2 supports push down math functions(non ANSI) ### What changes were proposed in this pull request? apache#36140 makes DS V2 supports push down math functions are claimed by ANSI standard. Spark have a lot common used math functions are not claimed by ANSI standard. https://github.com/apache/spark/blob/2f8613f22c0750c00cf1dcfb2f31c431d8dc1be7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala#L388 The mainstream databases support these functions show below. | Function name | PostgreSQL | ClickHouse | H2 | MySQL | Oracle | Redshift | Presto | Teradata | Snowflake | DB2 | Vertica | Exasol | SqlServer | Yellowbrick | Impala | Mariadb | Druid | Pig | Singlestore | ElasticSearch | SQLite | Influxdata | Sybase | | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | | `SIN` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `SINH` | Yes | Yes | Yes | No | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | No | No | Yes | No | No | Yes | No | Yes | Yes | Yes | No | | `COS` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `COSH` | Yes | Yes | Yes | No | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | No | No | Yes | No | No | Yes | No | Yes | Yes | Yes | No | | `TAN` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `TANH` | Yes | No | Yes | No | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | No | No | Yes | No | No | Yes | No | No | No | Yes | No | | `COT` | Yes | No | Yes | Yes | No | Yes | No | No | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | No | Yes | Yes | No | No | Yes | | `ASIN` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `ASINH` | Yes | Yes | No | No | No | No | No | Yes | Yes | No | No | No | No | No | No | No | No | No | No | No | Yes | Yes | No | | `ACOS` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `ACOSH` | Yes | Yes | No | No | No | No | No | Yes | Yes | No | No | No | No | No | No | No | No | No | No | No | Yes | Yes | No | | `ATAN` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `ATAN2` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes | | `ATANH` | Yes | Yes | No | No | No | No | No | Yes | Yes | Yes | No | No | No | No | No | No | No | No | No | No | Yes | Yes | No | | `LOG` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `LOG10` | Yes | Yes | Yes | Yes | No | No | Yes | Yes | No | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `LOG2` | No | Yes | No | Yes | No | No | Yes | Yes | No | No | No | Yes | No | No | Yes | Yes | No | No | Yes | No | Yes | Yes | No | | `CBRT` | Yes | Yes | No | No | No | Yes | Yes | No | Yes | No | Yes | No | No | Yes | No | No | No | Yes | No | Yes | No | Yes | No | | `DEGREES` | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | No | Yes | | `RADIANS` | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | No | Yes | | `ROUND` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | | `SIGN` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | No | Yes | Yes | No | No | Yes | ### Why are the changes needed? DS V2 supports push down math functions supported by mainstream databases. ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? New tests. Closes apache#36877 from beliefer/SPARK-39479. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> Co-authored-by: Jiaan Geng <beliefer@163.com> Co-authored-by: Wenchen Fan <cloud0fan@gmail.com> Co-authored-by: Wenchen Fan <wenchen@databricks.com>
* [SPARK-28330][SQL] Support ANSI SQL: result offset clause in query expression This is a ANSI SQL and feature id is `F861` ``` <query expression> ::= [ <with clause> ] <query expression body> [ <order by clause> ] [ <result offset clause> ] [ <fetch first clause> ] <result offset clause> ::= OFFSET <offset row count> { ROW | ROWS } ``` For example: ``` SELECT customer_name, customer_gender FROM customer_dimension WHERE occupation='Dancer' AND customer_city = 'San Francisco' ORDER BY customer_name; customer_name | customer_gender ----------------------+----------------- Amy X. Lang | Female Anna H. Li | Female Brian O. Weaver | Male Craig O. Pavlov | Male Doug Z. Goldberg | Male Harold S. Jones | Male Jack E. Perkins | Male Joseph W. Overstreet | Male Kevin . Campbell | Male Raja Y. Wilson | Male Samantha O. Brown | Female Steve H. Gauthier | Male William . Nielson | Male William Z. Roy | Male (14 rows) SELECT customer_name, customer_gender FROM customer_dimension WHERE occupation='Dancer' AND customer_city = 'San Francisco' ORDER BY customer_name OFFSET 8; customer_name | customer_gender -------------------+----------------- Kevin . Campbell | Male Raja Y. Wilson | Male Samantha O. Brown | Female Steve H. Gauthier | Male William . Nielson | Male William Z. Roy | Male (6 rows) ``` There are some mainstream database support the syntax. **Druid** https://druid.apache.org/docs/latest/querying/sql.html#offset **Kylin** http://kylin.apache.org/docs/tutorial/sql_reference.html#QUERYSYNTAX **Exasol** https://docs.exasol.com/sql/select.htm **Greenplum** http://docs.greenplum.org/6-8/ref_guide/sql_commands/SELECT.html **MySQL** https://dev.mysql.com/doc/refman/5.6/en/select.html **Monetdb** https://www.monetdb.org/Documentation/SQLreference/SQLSyntaxOverview#SELECT **PostgreSQL** https://www.postgresql.org/docs/11/queries-limit.html **Sqlite** https://www.sqlite.org/lang_select.html **Vertica** https://www.vertica.com/docs/9.2.x/HTML/Content/Authoring/SQLReferenceManual/Statements/SELECT/OFFSETClause.htm?zoom_highlight=offset The description for design: **1**. Consider `OFFSET` as the special case of `LIMIT`. For example: `SELECT * FROM a limit 10;` similar to `SELECT * FROM a limit 10 offset 0;` `SELECT * FROM a offset 10;` similar to `SELECT * FROM a limit -1 offset 10;` **2**. Because the current implement of `LIMIT` has good performance. For example: `SELECT * FROM a limit 10;` parsed to the logic plan as below: ``` GlobalLimit (limit = 10) |--LocalLimit (limit = 10) ``` and then the physical plan as below: ``` GlobalLimitExec (limit = 10) // Take the first 10 rows globally |--LocalLimitExec (limit = 10) // Take the first 10 rows locally ``` This operator reduce massive shuffle and has good performance. Sometimes, the logic plan transformed to the physical plan as: ``` CollectLimitExec (limit = 10) // Take the first 10 rows globally ``` If the SQL contains order by, such as `SELECT * FROM a order by c limit 10;`. This SQL will be transformed to the physical plan as below: ``` TakeOrderedAndProjectExec (limit = 10) // Take the first 10 rows after sort globally ``` Based on this situation, this PR produces the following operations. For example: `SELECT * FROM a limit 10 offset 10;` parsed to the logic plan as below: ``` GlobalLimit (limit = 10) |--LocalLimit (limit = 10) |--Offset (offset = 10) ``` After optimization, the above logic plan will be transformed to: ``` GlobalLimitAndOffset (limit = 10, offset = 10) // Limit clause accompanied by offset clause |--LocalLimit (limit = 20) // 10 + offset = 20 ``` and then the physical plan as below: ``` GlobalLimitAndOffsetExec (limit = 10, offset = 10) // Skip the first 10 rows and take the next 10 rows globally |--LocalLimitExec (limit = 20) // Take the first 20(limit + offset) rows locally ``` Sometimes, the logic plan transformed to the physical plan as: ``` CollectLimitExec (limit = 10, offset = 10) // Skip the first 10 rows and take the next 10 rows globally ``` If the SQL contains order by, such as `SELECT * FROM a order by c limit 10 offset 10;`. This SQL will be transformed to the physical plan as below: ``` TakeOrderedAndProjectExec (limit = 10, offset 10) // Skip the first 10 rows and take the next 10 rows after sort globally ``` **3**.In addition to the above, there is a special case that is only offset but no limit. For example: `SELECT * FROM a offset 10;` parsed to the logic plan as below: ``` Offset (offset = 10) // Only offset clause ``` If offset is very large, will generate a lot of overhead. So this PR will refuse use offset clause without limit clause, although we can parse, transform and execute it. A balanced idea is add a configuration item `spark.sql.forceUsingOffsetWithoutLimit` to force running query when user knows the offset is small enough. The default value of `spark.sql.forceUsingOffsetWithoutLimit` is false. This PR just came up with the idea so that it could be implemented at a better time in the future. Note: The origin PR to support this feature is apache#25416. Because the origin PR too old, there exists massive conflict which is hard to resolve. So I open this new PR to support this feature. new feature 'No' Exists and new UT Closes apache#35975 from beliefer/SPARK-28330. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-39057][SQL] Offset could work without Limit Currently, `Offset` must work with `Limit`. The behavior not allow to use offset alone and add offset API into `DataFrame`. If we use `Offset` alone, there are two situations: 1. If `Offset` is the last operator, collect the result to the driver and then drop/skip the first n (offset value) rows. Users can test or debug `Offset` in the way. 2. If `Offset` is the intermediate operator, shuffle all the result to one task and drop/skip the first n (offset value) rows and the result will be passed to the downstream operator. For example, `SELECT * FROM a offset 10; ` parsed to the logic plan as below: ``` Offset (offset = 10) // Only offset clause |--Relation ``` and then the physical plan as below: ``` CollectLimitExec(limit = -1, offset = 10) // Collect the result to the driver and skip the first 10 rows |--JDBCRelation ``` or ``` GlobalLimitAndOffsetExec(limit = -1, offset = 10) // Collect the result and skip the first 10 rows |--JDBCRelation ``` After this PR merged, users could input the SQL show below: ``` SELECT '' AS ten, unique1, unique2, stringu1 FROM onek ORDER BY unique1 OFFSET 990; ``` Note: apache#35975 supports offset clause, it create a logical node named `GlobalLimitAndOffset`. In fact, we can avoid use this node and use `Offset` instead and the latter is good with unify name. Improve the implement of offset clause. 'No'. New feature. Exists test cases. Closes apache#36417 from beliefer/SPARK-28330_followup2. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-39159][SQL] Add new Dataset API for Offset Currently, Spark added `Offset` operator. This PR try to add `offset` API into `Dataset`. `offset` API is very useful and construct test case more easily. 'No'. New feature. New tests. Closes apache#36519 from beliefer/SPARK-39159. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-39180][SQL] Simplify the planning of limit and offset This PR simplifies the planning of limit and offset: 1. Unify the semantics of physical plans that need to deal with limit + offset. These physical plans always do limit first, then offset. The planner rule should set limit and offset properly, for different plans, such as limit + offset and offset + limit. 2. Refactor the planner rule `SpecialLimit` to reuse the code of planning `TakeOrderedAndProjectExec`. 3. Let `GlobalLimitExec` to handle offset as well, so that we can remove `GlobalLimitAndOffsetExec`. This matches `CollectLimitExec`. code simplification no existing tests Closes apache#36541 from cloud-fan/offset. Lead-authored-by: Wenchen Fan <cloud0fan@gmail.com> Co-authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-39037][SQL] DS V2 aggregate push-down supports order by expressions Currently, Spark DS V2 aggregate push-down only supports order by column. But the SQL show below is very useful and common. ``` SELECT CASE WHEN 'SALARY' > 8000.00 AND 'SALARY' < 10000.00 THEN 'SALARY' ELSE 0.00 END AS key, dept, name FROM "test"."employee" ORDER BY key ``` Let DS V2 aggregate push-down supports order by expressions 'No'. New feature. New tests Closes apache#36370 from beliefer/SPARK-39037. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-38978][SQL] DS V2 supports push down OFFSET operator Currently, DS V2 push-down supports `LIMIT` but `OFFSET`. If we can pushing down `OFFSET` to JDBC data source, it will be better performance. push down `OFFSET` could improves the performance. 'No'. New feature. New tests. Closes apache#36295 from beliefer/SPARK-38978. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * fix ut * [SPARK-39340][SQL] DS v2 agg pushdown should allow dots in the name of top-level columns It turns out that I was wrong in apache#36727 . We still have the limitation (column name cannot contain dot) in master and 3.3 braches, in a very implicit way: The `V2ExpressionBuilder` has a boolean flag `nestedPredicatePushdownEnabled` whose default value is false. When it's false, it uses `PushableColumnWithoutNestedColumn` to match columns, which doesn't support dot in names. `V2ExpressionBuilder` is only used in 2 places: 1. `PushableExpression`. This is a pattern match that is only used in v2 agg pushdown 2. `PushablePredicate`. This is a pattern match that is used in various places, but all the caller sides set `nestedPredicatePushdownEnabled` to true. This PR removes the `nestedPredicatePushdownEnabled` flag from `V2ExpressionBuilder`, and makes it always support nested fields. `PushablePredicate` is also updated accordingly to remove the boolean flag, as it's always true. Fix a mistake to eliminate an unexpected limitation in DS v2 pushdown. No for end users. For data source developers, they can trigger agg pushdowm more often. a new test Closes apache#36945 from cloud-fan/dsv2. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-39453][SQL] DS V2 supports push down misc non-aggregate functions(non ANSI) apache#36039 makes DS V2 supports push down misc non-aggregate functions are claimed by ANSI standard. Spark have a lot common used misc non-aggregate functions are not claimed by ANSI standard. https://github.com/apache/spark/blob/2f8613f22c0750c00cf1dcfb2f31c431d8dc1be7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala#L362. The mainstream databases support these functions show below. | Function name | PostgreSQL | ClickHouse | H2 | MySQL | Oracle | Redshift | Presto | Teradata | Snowflake | DB2 | Vertica | Exasol | SqlServer | Yellowbrick | Impala | Mariadb | Druid | Pig | Singlestore | ElasticSearch | SQLite | Influxdata | Sybase | | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | | `GREATEST` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | No | No | No | | `LEAST` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | No | No | No | | `IF` | No | Yes | No | Yes | No | No | Yes | No | Yes | No | No | Yes | No | Yes | Yes | Yes | No | No | Yes | Yes | Yes | No | No | | `RAND` | No | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | No | Yes | DS V2 supports push down misc non-aggregate functions supported by mainstream databases. 'No'. New feature. New tests. Closes apache#36830 from beliefer/SPARK-38761_followup. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-39479][SQL] DS V2 supports push down math functions(non ANSI) apache#36140 makes DS V2 supports push down math functions are claimed by ANSI standard. Spark have a lot common used math functions are not claimed by ANSI standard. https://github.com/apache/spark/blob/2f8613f22c0750c00cf1dcfb2f31c431d8dc1be7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala#L388 The mainstream databases support these functions show below. | Function name | PostgreSQL | ClickHouse | H2 | MySQL | Oracle | Redshift | Presto | Teradata | Snowflake | DB2 | Vertica | Exasol | SqlServer | Yellowbrick | Impala | Mariadb | Druid | Pig | Singlestore | ElasticSearch | SQLite | Influxdata | Sybase | | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | | `SIN` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `SINH` | Yes | Yes | Yes | No | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | No | No | Yes | No | No | Yes | No | Yes | Yes | Yes | No | | `COS` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `COSH` | Yes | Yes | Yes | No | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | No | No | Yes | No | No | Yes | No | Yes | Yes | Yes | No | | `TAN` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `TANH` | Yes | No | Yes | No | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | No | No | Yes | No | No | Yes | No | No | No | Yes | No | | `COT` | Yes | No | Yes | Yes | No | Yes | No | No | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | No | Yes | Yes | No | No | Yes | | `ASIN` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `ASINH` | Yes | Yes | No | No | No | No | No | Yes | Yes | No | No | No | No | No | No | No | No | No | No | No | Yes | Yes | No | | `ACOS` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `ACOSH` | Yes | Yes | No | No | No | No | No | Yes | Yes | No | No | No | No | No | No | No | No | No | No | No | Yes | Yes | No | | `ATAN` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `ATAN2` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes | | `ATANH` | Yes | Yes | No | No | No | No | No | Yes | Yes | Yes | No | No | No | No | No | No | No | No | No | No | Yes | Yes | No | | `LOG` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `LOG10` | Yes | Yes | Yes | Yes | No | No | Yes | Yes | No | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `LOG2` | No | Yes | No | Yes | No | No | Yes | Yes | No | No | No | Yes | No | No | Yes | Yes | No | No | Yes | No | Yes | Yes | No | | `CBRT` | Yes | Yes | No | No | No | Yes | Yes | No | Yes | No | Yes | No | No | Yes | No | No | No | Yes | No | Yes | No | Yes | No | | `DEGREES` | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | No | Yes | | `RADIANS` | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | No | Yes | | `ROUND` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | | `SIGN` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | No | Yes | Yes | No | No | Yes | DS V2 supports push down math functions supported by mainstream databases. 'No'. New feature. New tests. Closes apache#36877 from beliefer/SPARK-39479. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> Co-authored-by: Jiaan Geng <beliefer@163.com> Co-authored-by: Wenchen Fan <cloud0fan@gmail.com> Co-authored-by: Wenchen Fan <wenchen@databricks.com>
* [SPARK-28330][SQL] Support ANSI SQL: result offset clause in query expression ### What changes were proposed in this pull request? This is a ANSI SQL and feature id is `F861` ``` <query expression> ::= [ <with clause> ] <query expression body> [ <order by clause> ] [ <result offset clause> ] [ <fetch first clause> ] <result offset clause> ::= OFFSET <offset row count> { ROW | ROWS } ``` For example: ``` SELECT customer_name, customer_gender FROM customer_dimension WHERE occupation='Dancer' AND customer_city = 'San Francisco' ORDER BY customer_name; customer_name | customer_gender ----------------------+----------------- Amy X. Lang | Female Anna H. Li | Female Brian O. Weaver | Male Craig O. Pavlov | Male Doug Z. Goldberg | Male Harold S. Jones | Male Jack E. Perkins | Male Joseph W. Overstreet | Male Kevin . Campbell | Male Raja Y. Wilson | Male Samantha O. Brown | Female Steve H. Gauthier | Male William . Nielson | Male William Z. Roy | Male (14 rows) SELECT customer_name, customer_gender FROM customer_dimension WHERE occupation='Dancer' AND customer_city = 'San Francisco' ORDER BY customer_name OFFSET 8; customer_name | customer_gender -------------------+----------------- Kevin . Campbell | Male Raja Y. Wilson | Male Samantha O. Brown | Female Steve H. Gauthier | Male William . Nielson | Male William Z. Roy | Male (6 rows) ``` There are some mainstream database support the syntax. **Druid** https://druid.apache.org/docs/latest/querying/sql.html#offset **Kylin** http://kylin.apache.org/docs/tutorial/sql_reference.html#QUERYSYNTAX **Exasol** https://docs.exasol.com/sql/select.htm **Greenplum** http://docs.greenplum.org/6-8/ref_guide/sql_commands/SELECT.html **MySQL** https://dev.mysql.com/doc/refman/5.6/en/select.html **Monetdb** https://www.monetdb.org/Documentation/SQLreference/SQLSyntaxOverview#SELECT **PostgreSQL** https://www.postgresql.org/docs/11/queries-limit.html **Sqlite** https://www.sqlite.org/lang_select.html **Vertica** https://www.vertica.com/docs/9.2.x/HTML/Content/Authoring/SQLReferenceManual/Statements/SELECT/OFFSETClause.htm?zoom_highlight=offset The description for design: **1**. Consider `OFFSET` as the special case of `LIMIT`. For example: `SELECT * FROM a limit 10;` similar to `SELECT * FROM a limit 10 offset 0;` `SELECT * FROM a offset 10;` similar to `SELECT * FROM a limit -1 offset 10;` **2**. Because the current implement of `LIMIT` has good performance. For example: `SELECT * FROM a limit 10;` parsed to the logic plan as below: ``` GlobalLimit (limit = 10) |--LocalLimit (limit = 10) ``` and then the physical plan as below: ``` GlobalLimitExec (limit = 10) // Take the first 10 rows globally |--LocalLimitExec (limit = 10) // Take the first 10 rows locally ``` This operator reduce massive shuffle and has good performance. Sometimes, the logic plan transformed to the physical plan as: ``` CollectLimitExec (limit = 10) // Take the first 10 rows globally ``` If the SQL contains order by, such as `SELECT * FROM a order by c limit 10;`. This SQL will be transformed to the physical plan as below: ``` TakeOrderedAndProjectExec (limit = 10) // Take the first 10 rows after sort globally ``` Based on this situation, this PR produces the following operations. For example: `SELECT * FROM a limit 10 offset 10;` parsed to the logic plan as below: ``` GlobalLimit (limit = 10) |--LocalLimit (limit = 10) |--Offset (offset = 10) ``` After optimization, the above logic plan will be transformed to: ``` GlobalLimitAndOffset (limit = 10, offset = 10) // Limit clause accompanied by offset clause |--LocalLimit (limit = 20) // 10 + offset = 20 ``` and then the physical plan as below: ``` GlobalLimitAndOffsetExec (limit = 10, offset = 10) // Skip the first 10 rows and take the next 10 rows globally |--LocalLimitExec (limit = 20) // Take the first 20(limit + offset) rows locally ``` Sometimes, the logic plan transformed to the physical plan as: ``` CollectLimitExec (limit = 10, offset = 10) // Skip the first 10 rows and take the next 10 rows globally ``` If the SQL contains order by, such as `SELECT * FROM a order by c limit 10 offset 10;`. This SQL will be transformed to the physical plan as below: ``` TakeOrderedAndProjectExec (limit = 10, offset 10) // Skip the first 10 rows and take the next 10 rows after sort globally ``` **3**.In addition to the above, there is a special case that is only offset but no limit. For example: `SELECT * FROM a offset 10;` parsed to the logic plan as below: ``` Offset (offset = 10) // Only offset clause ``` If offset is very large, will generate a lot of overhead. So this PR will refuse use offset clause without limit clause, although we can parse, transform and execute it. A balanced idea is add a configuration item `spark.sql.forceUsingOffsetWithoutLimit` to force running query when user knows the offset is small enough. The default value of `spark.sql.forceUsingOffsetWithoutLimit` is false. This PR just came up with the idea so that it could be implemented at a better time in the future. Note: The origin PR to support this feature is apache#25416. Because the origin PR too old, there exists massive conflict which is hard to resolve. So I open this new PR to support this feature. ### Why are the changes needed? new feature ### Does this PR introduce any user-facing change? 'No' ### How was this patch tested? Exists and new UT Closes apache#35975 from beliefer/SPARK-28330. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-39057][SQL] Offset could work without Limit ### What changes were proposed in this pull request? Currently, `Offset` must work with `Limit`. The behavior not allow to use offset alone and add offset API into `DataFrame`. If we use `Offset` alone, there are two situations: 1. If `Offset` is the last operator, collect the result to the driver and then drop/skip the first n (offset value) rows. Users can test or debug `Offset` in the way. 2. If `Offset` is the intermediate operator, shuffle all the result to one task and drop/skip the first n (offset value) rows and the result will be passed to the downstream operator. For example, `SELECT * FROM a offset 10; ` parsed to the logic plan as below: ``` Offset (offset = 10) // Only offset clause |--Relation ``` and then the physical plan as below: ``` CollectLimitExec(limit = -1, offset = 10) // Collect the result to the driver and skip the first 10 rows |--JDBCRelation ``` or ``` GlobalLimitAndOffsetExec(limit = -1, offset = 10) // Collect the result and skip the first 10 rows |--JDBCRelation ``` After this PR merged, users could input the SQL show below: ``` SELECT '' AS ten, unique1, unique2, stringu1 FROM onek ORDER BY unique1 OFFSET 990; ``` Note: apache#35975 supports offset clause, it create a logical node named `GlobalLimitAndOffset`. In fact, we can avoid use this node and use `Offset` instead and the latter is good with unify name. ### Why are the changes needed? Improve the implement of offset clause. ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? Exists test cases. Closes apache#36417 from beliefer/SPARK-28330_followup2. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-39159][SQL] Add new Dataset API for Offset ### What changes were proposed in this pull request? Currently, Spark added `Offset` operator. This PR try to add `offset` API into `Dataset`. ### Why are the changes needed? `offset` API is very useful and construct test case more easily. ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? New tests. Closes apache#36519 from beliefer/SPARK-39159. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-39180][SQL] Simplify the planning of limit and offset ### What changes were proposed in this pull request? This PR simplifies the planning of limit and offset: 1. Unify the semantics of physical plans that need to deal with limit + offset. These physical plans always do limit first, then offset. The planner rule should set limit and offset properly, for different plans, such as limit + offset and offset + limit. 2. Refactor the planner rule `SpecialLimit` to reuse the code of planning `TakeOrderedAndProjectExec`. 3. Let `GlobalLimitExec` to handle offset as well, so that we can remove `GlobalLimitAndOffsetExec`. This matches `CollectLimitExec`. ### Why are the changes needed? code simplification ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? existing tests Closes apache#36541 from cloud-fan/offset. Lead-authored-by: Wenchen Fan <cloud0fan@gmail.com> Co-authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-39037][SQL] DS V2 aggregate push-down supports order by expressions ### What changes were proposed in this pull request? Currently, Spark DS V2 aggregate push-down only supports order by column. But the SQL show below is very useful and common. ``` SELECT CASE WHEN 'SALARY' > 8000.00 AND 'SALARY' < 10000.00 THEN 'SALARY' ELSE 0.00 END AS key, dept, name FROM "test"."employee" ORDER BY key ``` ### Why are the changes needed? Let DS V2 aggregate push-down supports order by expressions ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? New tests Closes apache#36370 from beliefer/SPARK-39037. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-38978][SQL] DS V2 supports push down OFFSET operator ### What changes were proposed in this pull request? Currently, DS V2 push-down supports `LIMIT` but `OFFSET`. If we can pushing down `OFFSET` to JDBC data source, it will be better performance. ### Why are the changes needed? push down `OFFSET` could improves the performance. ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? New tests. Closes apache#36295 from beliefer/SPARK-38978. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * fix ut * [SPARK-39340][SQL] DS v2 agg pushdown should allow dots in the name of top-level columns ### What changes were proposed in this pull request? It turns out that I was wrong in apache#36727 . We still have the limitation (column name cannot contain dot) in master and 3.3 braches, in a very implicit way: The `V2ExpressionBuilder` has a boolean flag `nestedPredicatePushdownEnabled` whose default value is false. When it's false, it uses `PushableColumnWithoutNestedColumn` to match columns, which doesn't support dot in names. `V2ExpressionBuilder` is only used in 2 places: 1. `PushableExpression`. This is a pattern match that is only used in v2 agg pushdown 2. `PushablePredicate`. This is a pattern match that is used in various places, but all the caller sides set `nestedPredicatePushdownEnabled` to true. This PR removes the `nestedPredicatePushdownEnabled` flag from `V2ExpressionBuilder`, and makes it always support nested fields. `PushablePredicate` is also updated accordingly to remove the boolean flag, as it's always true. ### Why are the changes needed? Fix a mistake to eliminate an unexpected limitation in DS v2 pushdown. ### Does this PR introduce _any_ user-facing change? No for end users. For data source developers, they can trigger agg pushdowm more often. ### How was this patch tested? a new test Closes apache#36945 from cloud-fan/dsv2. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-39453][SQL] DS V2 supports push down misc non-aggregate functions(non ANSI) ### What changes were proposed in this pull request? apache#36039 makes DS V2 supports push down misc non-aggregate functions are claimed by ANSI standard. Spark have a lot common used misc non-aggregate functions are not claimed by ANSI standard. https://github.com/apache/spark/blob/2f8613f22c0750c00cf1dcfb2f31c431d8dc1be7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala#L362. The mainstream databases support these functions show below. | Function name | PostgreSQL | ClickHouse | H2 | MySQL | Oracle | Redshift | Presto | Teradata | Snowflake | DB2 | Vertica | Exasol | SqlServer | Yellowbrick | Impala | Mariadb | Druid | Pig | Singlestore | ElasticSearch | SQLite | Influxdata | Sybase | | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | | `GREATEST` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | No | No | No | | `LEAST` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | No | No | No | | `IF` | No | Yes | No | Yes | No | No | Yes | No | Yes | No | No | Yes | No | Yes | Yes | Yes | No | No | Yes | Yes | Yes | No | No | | `RAND` | No | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | No | Yes | ### Why are the changes needed? DS V2 supports push down misc non-aggregate functions supported by mainstream databases. ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? New tests. Closes apache#36830 from beliefer/SPARK-38761_followup. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-39479][SQL] DS V2 supports push down math functions(non ANSI) ### What changes were proposed in this pull request? apache#36140 makes DS V2 supports push down math functions are claimed by ANSI standard. Spark have a lot common used math functions are not claimed by ANSI standard. https://github.com/apache/spark/blob/2f8613f22c0750c00cf1dcfb2f31c431d8dc1be7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala#L388 The mainstream databases support these functions show below. | Function name | PostgreSQL | ClickHouse | H2 | MySQL | Oracle | Redshift | Presto | Teradata | Snowflake | DB2 | Vertica | Exasol | SqlServer | Yellowbrick | Impala | Mariadb | Druid | Pig | Singlestore | ElasticSearch | SQLite | Influxdata | Sybase | | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | | `SIN` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `SINH` | Yes | Yes | Yes | No | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | No | No | Yes | No | No | Yes | No | Yes | Yes | Yes | No | | `COS` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `COSH` | Yes | Yes | Yes | No | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | No | No | Yes | No | No | Yes | No | Yes | Yes | Yes | No | | `TAN` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `TANH` | Yes | No | Yes | No | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | No | No | Yes | No | No | Yes | No | No | No | Yes | No | | `COT` | Yes | No | Yes | Yes | No | Yes | No | No | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | No | Yes | Yes | No | No | Yes | | `ASIN` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `ASINH` | Yes | Yes | No | No | No | No | No | Yes | Yes | No | No | No | No | No | No | No | No | No | No | No | Yes | Yes | No | | `ACOS` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `ACOSH` | Yes | Yes | No | No | No | No | No | Yes | Yes | No | No | No | No | No | No | No | No | No | No | No | Yes | Yes | No | | `ATAN` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `ATAN2` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes | | `ATANH` | Yes | Yes | No | No | No | No | No | Yes | Yes | Yes | No | No | No | No | No | No | No | No | No | No | Yes | Yes | No | | `LOG` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `LOG10` | Yes | Yes | Yes | Yes | No | No | Yes | Yes | No | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `LOG2` | No | Yes | No | Yes | No | No | Yes | Yes | No | No | No | Yes | No | No | Yes | Yes | No | No | Yes | No | Yes | Yes | No | | `CBRT` | Yes | Yes | No | No | No | Yes | Yes | No | Yes | No | Yes | No | No | Yes | No | No | No | Yes | No | Yes | No | Yes | No | | `DEGREES` | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | No | Yes | | `RADIANS` | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | No | Yes | | `ROUND` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | | `SIGN` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | No | Yes | Yes | No | No | Yes | ### Why are the changes needed? DS V2 supports push down math functions supported by mainstream databases. ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? New tests. Closes apache#36877 from beliefer/SPARK-39479. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> Co-authored-by: Jiaan Geng <beliefer@163.com> Co-authored-by: Wenchen Fan <cloud0fan@gmail.com> Co-authored-by: Wenchen Fan <wenchen@databricks.com>
* [SPARK-28330][SQL] Support ANSI SQL: result offset clause in query expression This is a ANSI SQL and feature id is `F861` ``` <query expression> ::= [ <with clause> ] <query expression body> [ <order by clause> ] [ <result offset clause> ] [ <fetch first clause> ] <result offset clause> ::= OFFSET <offset row count> { ROW | ROWS } ``` For example: ``` SELECT customer_name, customer_gender FROM customer_dimension WHERE occupation='Dancer' AND customer_city = 'San Francisco' ORDER BY customer_name; customer_name | customer_gender ----------------------+----------------- Amy X. Lang | Female Anna H. Li | Female Brian O. Weaver | Male Craig O. Pavlov | Male Doug Z. Goldberg | Male Harold S. Jones | Male Jack E. Perkins | Male Joseph W. Overstreet | Male Kevin . Campbell | Male Raja Y. Wilson | Male Samantha O. Brown | Female Steve H. Gauthier | Male William . Nielson | Male William Z. Roy | Male (14 rows) SELECT customer_name, customer_gender FROM customer_dimension WHERE occupation='Dancer' AND customer_city = 'San Francisco' ORDER BY customer_name OFFSET 8; customer_name | customer_gender -------------------+----------------- Kevin . Campbell | Male Raja Y. Wilson | Male Samantha O. Brown | Female Steve H. Gauthier | Male William . Nielson | Male William Z. Roy | Male (6 rows) ``` There are some mainstream database support the syntax. **Druid** https://druid.apache.org/docs/latest/querying/sql.html#offset **Kylin** http://kylin.apache.org/docs/tutorial/sql_reference.html#QUERYSYNTAX **Exasol** https://docs.exasol.com/sql/select.htm **Greenplum** http://docs.greenplum.org/6-8/ref_guide/sql_commands/SELECT.html **MySQL** https://dev.mysql.com/doc/refman/5.6/en/select.html **Monetdb** https://www.monetdb.org/Documentation/SQLreference/SQLSyntaxOverview#SELECT **PostgreSQL** https://www.postgresql.org/docs/11/queries-limit.html **Sqlite** https://www.sqlite.org/lang_select.html **Vertica** https://www.vertica.com/docs/9.2.x/HTML/Content/Authoring/SQLReferenceManual/Statements/SELECT/OFFSETClause.htm?zoom_highlight=offset The description for design: **1**. Consider `OFFSET` as the special case of `LIMIT`. For example: `SELECT * FROM a limit 10;` similar to `SELECT * FROM a limit 10 offset 0;` `SELECT * FROM a offset 10;` similar to `SELECT * FROM a limit -1 offset 10;` **2**. Because the current implement of `LIMIT` has good performance. For example: `SELECT * FROM a limit 10;` parsed to the logic plan as below: ``` GlobalLimit (limit = 10) |--LocalLimit (limit = 10) ``` and then the physical plan as below: ``` GlobalLimitExec (limit = 10) // Take the first 10 rows globally |--LocalLimitExec (limit = 10) // Take the first 10 rows locally ``` This operator reduce massive shuffle and has good performance. Sometimes, the logic plan transformed to the physical plan as: ``` CollectLimitExec (limit = 10) // Take the first 10 rows globally ``` If the SQL contains order by, such as `SELECT * FROM a order by c limit 10;`. This SQL will be transformed to the physical plan as below: ``` TakeOrderedAndProjectExec (limit = 10) // Take the first 10 rows after sort globally ``` Based on this situation, this PR produces the following operations. For example: `SELECT * FROM a limit 10 offset 10;` parsed to the logic plan as below: ``` GlobalLimit (limit = 10) |--LocalLimit (limit = 10) |--Offset (offset = 10) ``` After optimization, the above logic plan will be transformed to: ``` GlobalLimitAndOffset (limit = 10, offset = 10) // Limit clause accompanied by offset clause |--LocalLimit (limit = 20) // 10 + offset = 20 ``` and then the physical plan as below: ``` GlobalLimitAndOffsetExec (limit = 10, offset = 10) // Skip the first 10 rows and take the next 10 rows globally |--LocalLimitExec (limit = 20) // Take the first 20(limit + offset) rows locally ``` Sometimes, the logic plan transformed to the physical plan as: ``` CollectLimitExec (limit = 10, offset = 10) // Skip the first 10 rows and take the next 10 rows globally ``` If the SQL contains order by, such as `SELECT * FROM a order by c limit 10 offset 10;`. This SQL will be transformed to the physical plan as below: ``` TakeOrderedAndProjectExec (limit = 10, offset 10) // Skip the first 10 rows and take the next 10 rows after sort globally ``` **3**.In addition to the above, there is a special case that is only offset but no limit. For example: `SELECT * FROM a offset 10;` parsed to the logic plan as below: ``` Offset (offset = 10) // Only offset clause ``` If offset is very large, will generate a lot of overhead. So this PR will refuse use offset clause without limit clause, although we can parse, transform and execute it. A balanced idea is add a configuration item `spark.sql.forceUsingOffsetWithoutLimit` to force running query when user knows the offset is small enough. The default value of `spark.sql.forceUsingOffsetWithoutLimit` is false. This PR just came up with the idea so that it could be implemented at a better time in the future. Note: The origin PR to support this feature is apache#25416. Because the origin PR too old, there exists massive conflict which is hard to resolve. So I open this new PR to support this feature. new feature 'No' Exists and new UT Closes apache#35975 from beliefer/SPARK-28330. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-39057][SQL] Offset could work without Limit Currently, `Offset` must work with `Limit`. The behavior not allow to use offset alone and add offset API into `DataFrame`. If we use `Offset` alone, there are two situations: 1. If `Offset` is the last operator, collect the result to the driver and then drop/skip the first n (offset value) rows. Users can test or debug `Offset` in the way. 2. If `Offset` is the intermediate operator, shuffle all the result to one task and drop/skip the first n (offset value) rows and the result will be passed to the downstream operator. For example, `SELECT * FROM a offset 10; ` parsed to the logic plan as below: ``` Offset (offset = 10) // Only offset clause |--Relation ``` and then the physical plan as below: ``` CollectLimitExec(limit = -1, offset = 10) // Collect the result to the driver and skip the first 10 rows |--JDBCRelation ``` or ``` GlobalLimitAndOffsetExec(limit = -1, offset = 10) // Collect the result and skip the first 10 rows |--JDBCRelation ``` After this PR merged, users could input the SQL show below: ``` SELECT '' AS ten, unique1, unique2, stringu1 FROM onek ORDER BY unique1 OFFSET 990; ``` Note: apache#35975 supports offset clause, it create a logical node named `GlobalLimitAndOffset`. In fact, we can avoid use this node and use `Offset` instead and the latter is good with unify name. Improve the implement of offset clause. 'No'. New feature. Exists test cases. Closes apache#36417 from beliefer/SPARK-28330_followup2. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-39159][SQL] Add new Dataset API for Offset Currently, Spark added `Offset` operator. This PR try to add `offset` API into `Dataset`. `offset` API is very useful and construct test case more easily. 'No'. New feature. New tests. Closes apache#36519 from beliefer/SPARK-39159. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-39180][SQL] Simplify the planning of limit and offset This PR simplifies the planning of limit and offset: 1. Unify the semantics of physical plans that need to deal with limit + offset. These physical plans always do limit first, then offset. The planner rule should set limit and offset properly, for different plans, such as limit + offset and offset + limit. 2. Refactor the planner rule `SpecialLimit` to reuse the code of planning `TakeOrderedAndProjectExec`. 3. Let `GlobalLimitExec` to handle offset as well, so that we can remove `GlobalLimitAndOffsetExec`. This matches `CollectLimitExec`. code simplification no existing tests Closes apache#36541 from cloud-fan/offset. Lead-authored-by: Wenchen Fan <cloud0fan@gmail.com> Co-authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-39037][SQL] DS V2 aggregate push-down supports order by expressions Currently, Spark DS V2 aggregate push-down only supports order by column. But the SQL show below is very useful and common. ``` SELECT CASE WHEN 'SALARY' > 8000.00 AND 'SALARY' < 10000.00 THEN 'SALARY' ELSE 0.00 END AS key, dept, name FROM "test"."employee" ORDER BY key ``` Let DS V2 aggregate push-down supports order by expressions 'No'. New feature. New tests Closes apache#36370 from beliefer/SPARK-39037. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-38978][SQL] DS V2 supports push down OFFSET operator Currently, DS V2 push-down supports `LIMIT` but `OFFSET`. If we can pushing down `OFFSET` to JDBC data source, it will be better performance. push down `OFFSET` could improves the performance. 'No'. New feature. New tests. Closes apache#36295 from beliefer/SPARK-38978. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * fix ut * [SPARK-39340][SQL] DS v2 agg pushdown should allow dots in the name of top-level columns It turns out that I was wrong in apache#36727 . We still have the limitation (column name cannot contain dot) in master and 3.3 braches, in a very implicit way: The `V2ExpressionBuilder` has a boolean flag `nestedPredicatePushdownEnabled` whose default value is false. When it's false, it uses `PushableColumnWithoutNestedColumn` to match columns, which doesn't support dot in names. `V2ExpressionBuilder` is only used in 2 places: 1. `PushableExpression`. This is a pattern match that is only used in v2 agg pushdown 2. `PushablePredicate`. This is a pattern match that is used in various places, but all the caller sides set `nestedPredicatePushdownEnabled` to true. This PR removes the `nestedPredicatePushdownEnabled` flag from `V2ExpressionBuilder`, and makes it always support nested fields. `PushablePredicate` is also updated accordingly to remove the boolean flag, as it's always true. Fix a mistake to eliminate an unexpected limitation in DS v2 pushdown. No for end users. For data source developers, they can trigger agg pushdowm more often. a new test Closes apache#36945 from cloud-fan/dsv2. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-39453][SQL] DS V2 supports push down misc non-aggregate functions(non ANSI) apache#36039 makes DS V2 supports push down misc non-aggregate functions are claimed by ANSI standard. Spark have a lot common used misc non-aggregate functions are not claimed by ANSI standard. https://github.com/apache/spark/blob/2f8613f22c0750c00cf1dcfb2f31c431d8dc1be7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala#L362. The mainstream databases support these functions show below. | Function name | PostgreSQL | ClickHouse | H2 | MySQL | Oracle | Redshift | Presto | Teradata | Snowflake | DB2 | Vertica | Exasol | SqlServer | Yellowbrick | Impala | Mariadb | Druid | Pig | Singlestore | ElasticSearch | SQLite | Influxdata | Sybase | | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | | `GREATEST` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | No | No | No | | `LEAST` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | No | No | No | | `IF` | No | Yes | No | Yes | No | No | Yes | No | Yes | No | No | Yes | No | Yes | Yes | Yes | No | No | Yes | Yes | Yes | No | No | | `RAND` | No | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | No | Yes | DS V2 supports push down misc non-aggregate functions supported by mainstream databases. 'No'. New feature. New tests. Closes apache#36830 from beliefer/SPARK-38761_followup. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-39479][SQL] DS V2 supports push down math functions(non ANSI) apache#36140 makes DS V2 supports push down math functions are claimed by ANSI standard. Spark have a lot common used math functions are not claimed by ANSI standard. https://github.com/apache/spark/blob/2f8613f22c0750c00cf1dcfb2f31c431d8dc1be7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala#L388 The mainstream databases support these functions show below. | Function name | PostgreSQL | ClickHouse | H2 | MySQL | Oracle | Redshift | Presto | Teradata | Snowflake | DB2 | Vertica | Exasol | SqlServer | Yellowbrick | Impala | Mariadb | Druid | Pig | Singlestore | ElasticSearch | SQLite | Influxdata | Sybase | | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | | `SIN` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `SINH` | Yes | Yes | Yes | No | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | No | No | Yes | No | No | Yes | No | Yes | Yes | Yes | No | | `COS` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `COSH` | Yes | Yes | Yes | No | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | No | No | Yes | No | No | Yes | No | Yes | Yes | Yes | No | | `TAN` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `TANH` | Yes | No | Yes | No | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | No | No | Yes | No | No | Yes | No | No | No | Yes | No | | `COT` | Yes | No | Yes | Yes | No | Yes | No | No | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | No | Yes | Yes | No | No | Yes | | `ASIN` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `ASINH` | Yes | Yes | No | No | No | No | No | Yes | Yes | No | No | No | No | No | No | No | No | No | No | No | Yes | Yes | No | | `ACOS` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `ACOSH` | Yes | Yes | No | No | No | No | No | Yes | Yes | No | No | No | No | No | No | No | No | No | No | No | Yes | Yes | No | | `ATAN` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `ATAN2` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes | | `ATANH` | Yes | Yes | No | No | No | No | No | Yes | Yes | Yes | No | No | No | No | No | No | No | No | No | No | Yes | Yes | No | | `LOG` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `LOG10` | Yes | Yes | Yes | Yes | No | No | Yes | Yes | No | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `LOG2` | No | Yes | No | Yes | No | No | Yes | Yes | No | No | No | Yes | No | No | Yes | Yes | No | No | Yes | No | Yes | Yes | No | | `CBRT` | Yes | Yes | No | No | No | Yes | Yes | No | Yes | No | Yes | No | No | Yes | No | No | No | Yes | No | Yes | No | Yes | No | | `DEGREES` | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | No | Yes | | `RADIANS` | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | No | Yes | | `ROUND` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | | `SIGN` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | No | Yes | Yes | No | No | Yes | DS V2 supports push down math functions supported by mainstream databases. 'No'. New feature. New tests. Closes apache#36877 from beliefer/SPARK-39479. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> Co-authored-by: Jiaan Geng <beliefer@163.com> Co-authored-by: Wenchen Fan <cloud0fan@gmail.com> Co-authored-by: Wenchen Fan <wenchen@databricks.com>
What changes were proposed in this pull request?
Currently,
Offset
must work withLimit
. The behavior not allow to use offset alone and add offset API intoDataFrame
.If we use
Offset
alone, there are two situations:Offset
is the last operator, collect the result to the driver and then drop/skip the first n (offset value) rows. Users can test or debugOffset
in the way.Offset
is the intermediate operator, shuffle all the result to one task and drop/skip the first n (offset value) rows and the result will be passed to the downstream operator.For example,
SELECT * FROM a offset 10;
parsed to the logic plan as below:and then the physical plan as below:
or
After this PR merged, users could input the SQL show below:
Note: #35975 supports offset clause, it create a logical node named
GlobalLimitAndOffset
. In fact, we can avoid use this node and useOffset
instead and the latter is good with unify name.Why are the changes needed?
Improve the implement of offset clause.
Does this PR introduce any user-facing change?
'No'.
New feature.
How was this patch tested?
Exists test cases.