Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
KE-36478 support string function in DS V2 (apache#472)
* [SPARK-38997][SQL] DS V2 aggregate push-down supports group by expressions ### What changes were proposed in this pull request? Currently, Spark DS V2 aggregate push-down only supports group by column. But the SQL show below is very useful and common. ``` SELECT CASE WHEN 'SALARY' > 8000.00 AND 'SALARY' < 10000.00 THEN 'SALARY' ELSE 0.00 END AS key, SUM('SALARY') FROM "test"."employee" GROUP BY key ``` ### Why are the changes needed? Let DS V2 aggregate push-down supports group by expressions ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? New tests Closes apache#36325 from beliefer/SPARK-38997. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> fix ut * [SPARK-39135][SQL] DS V2 aggregate partial push-down should supports group by without aggregate functions ### What changes were proposed in this pull request? Currently, the SQL show below not supported by DS V2 aggregate partial push-down. `select key from tab group by key` ### Why are the changes needed? Make DS V2 aggregate partial push-down supports group by without aggregate functions. ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? New tests Closes apache#36492 from beliefer/SPARK-39135. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-39157][SQL] H2Dialect should override getJDBCType so as make the data type is correct ### What changes were proposed in this pull request? Currently, `H2Dialect` not implement `getJDBCType` of `JdbcDialect`, so the DS V2 push-down will throw exception show below: ``` Job aborted due to stage failure: Task 0 in stage 13.0 failed 1 times, most recent failure: Lost task 0.0 in stage 13.0 (TID 13) (jiaan-gengdembp executor driver): org.h2.jdbc.JdbcSQLNonTransientException: Unknown data type: "STRING"; SQL statement: SELECT "DEPT","NAME","SALARY","BONUS","IS_MANAGER" FROM "test"."employee" WHERE ("BONUS" IS NOT NULL) AND ("DEPT" IS NOT NULL) AND (CAST("BONUS" AS string) LIKE '%30%') AND (CAST("DEPT" AS byte) > 1) AND (CAST("DEPT" AS short) > 1) AND (CAST("BONUS" AS decimal(20,2)) > 1200.00) [50004-210] ``` H2Dialect should implement `getJDBCType` of `JdbcDialect`. ### Why are the changes needed? make the H2 data type is correct. ### Does this PR introduce _any_ user-facing change? 'Yes'. Fix a bug for `H2Dialect`. ### How was this patch tested? New tests. Closes apache#36516 from beliefer/SPARK-39157. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-39162][SQL] Jdbc dialect should decide which function could be pushed down ### What changes were proposed in this pull request? Regardless of whether the functions are ANSI or not, most databases are actually unsure of their support. So we should add a new API into `JdbcDialect` so that jdbc dialect decide which function could be pushed down. ### Why are the changes needed? Let function push-down more flexible. ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? Exists tests. Closes apache#36521 from beliefer/SPARK-39162. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: huaxingao <huaxin_gao@apple.com> * [SPARK-38897][SQL] DS V2 supports push down string functions ### What changes were proposed in this pull request? Currently, Spark have some string functions of ANSI standard. Please refer https://github.com/apache/spark/blob/2f8613f22c0750c00cf1dcfb2f31c431d8dc1be7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala#L503 These functions show below: `SUBSTRING,` `UPPER`, `LOWER`, `TRANSLATE`, `TRIM`, `OVERLAY` The mainstream databases support these functions show below. Function | PostgreSQL | ClickHouse | H2 | MySQL | Oracle | Redshift | Presto | Teradata | Snowflake | DB2 | Vertica | Exasol | SqlServer | Yellowbrick | Impala | Mariadb | Druid | Pig | SQLite | Influxdata | Singlestore | ElasticSearch -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- `SUBSTRING` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes `UPPER` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes `LOWER` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | YES | Yes | Yes | Yes | Yes `TRIM` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes `TRANSLATE` | Yes | No | Yes | No | Yes | Yes | No | No | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | No | No | No | No | No | No `OVERLAY` | Yes | No | No | No | Yes | No | No | No | No | Yes | Yes | No | No | No | No | No | No | No | No | No | No | No DS V2 should supports push down these string functions. ### Why are the changes needed? DS V2 supports push down string functions ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? New tests. Closes apache#36330 from chenzhx/spark-master. Authored-by: chenzhx <chen@apache.org> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-28330][SQL] Support ANSI SQL: result offset clause in query expression ### What changes were proposed in this pull request? This is a ANSI SQL and feature id is `F861` ``` <query expression> ::= [ <with clause> ] <query expression body> [ <order by clause> ] [ <result offset clause> ] [ <fetch first clause> ] <result offset clause> ::= OFFSET <offset row count> { ROW | ROWS } ``` For example: ``` SELECT customer_name, customer_gender FROM customer_dimension WHERE occupation='Dancer' AND customer_city = 'San Francisco' ORDER BY customer_name; customer_name | customer_gender ----------------------+----------------- Amy X. Lang | Female Anna H. Li | Female Brian O. Weaver | Male Craig O. Pavlov | Male Doug Z. Goldberg | Male Harold S. Jones | Male Jack E. Perkins | Male Joseph W. Overstreet | Male Kevin . Campbell | Male Raja Y. Wilson | Male Samantha O. Brown | Female Steve H. Gauthier | Male William . Nielson | Male William Z. Roy | Male (14 rows) SELECT customer_name, customer_gender FROM customer_dimension WHERE occupation='Dancer' AND customer_city = 'San Francisco' ORDER BY customer_name OFFSET 8; customer_name | customer_gender -------------------+----------------- Kevin . Campbell | Male Raja Y. Wilson | Male Samantha O. Brown | Female Steve H. Gauthier | Male William . Nielson | Male William Z. Roy | Male (6 rows) ``` There are some mainstream database support the syntax. **Druid** https://druid.apache.org/docs/latest/querying/sql.html#offset **Kylin** http://kylin.apache.org/docs/tutorial/sql_reference.html#QUERYSYNTAX **Exasol** https://docs.exasol.com/sql/select.htm **Greenplum** http://docs.greenplum.org/6-8/ref_guide/sql_commands/SELECT.html **MySQL** https://dev.mysql.com/doc/refman/5.6/en/select.html **Monetdb** https://www.monetdb.org/Documentation/SQLreference/SQLSyntaxOverview#SELECT **PostgreSQL** https://www.postgresql.org/docs/11/queries-limit.html **Sqlite** https://www.sqlite.org/lang_select.html **Vertica** https://www.vertica.com/docs/9.2.x/HTML/Content/Authoring/SQLReferenceManual/Statements/SELECT/OFFSETClause.htm?zoom_highlight=offset The description for design: **1**. Consider `OFFSET` as the special case of `LIMIT`. For example: `SELECT * FROM a limit 10;` similar to `SELECT * FROM a limit 10 offset 0;` `SELECT * FROM a offset 10;` similar to `SELECT * FROM a limit -1 offset 10;` **2**. Because the current implement of `LIMIT` has good performance. For example: `SELECT * FROM a limit 10;` parsed to the logic plan as below: ``` GlobalLimit (limit = 10) |--LocalLimit (limit = 10) ``` and then the physical plan as below: ``` GlobalLimitExec (limit = 10) // Take the first 10 rows globally |--LocalLimitExec (limit = 10) // Take the first 10 rows locally ``` This operator reduce massive shuffle and has good performance. Sometimes, the logic plan transformed to the physical plan as: ``` CollectLimitExec (limit = 10) // Take the first 10 rows globally ``` If the SQL contains order by, such as `SELECT * FROM a order by c limit 10;`. This SQL will be transformed to the physical plan as below: ``` TakeOrderedAndProjectExec (limit = 10) // Take the first 10 rows after sort globally ``` Based on this situation, this PR produces the following operations. For example: `SELECT * FROM a limit 10 offset 10;` parsed to the logic plan as below: ``` GlobalLimit (limit = 10) |--LocalLimit (limit = 10) |--Offset (offset = 10) ``` After optimization, the above logic plan will be transformed to: ``` GlobalLimitAndOffset (limit = 10, offset = 10) // Limit clause accompanied by offset clause |--LocalLimit (limit = 20) // 10 + offset = 20 ``` and then the physical plan as below: ``` GlobalLimitAndOffsetExec (limit = 10, offset = 10) // Skip the first 10 rows and take the next 10 rows globally |--LocalLimitExec (limit = 20) // Take the first 20(limit + offset) rows locally ``` Sometimes, the logic plan transformed to the physical plan as: ``` CollectLimitExec (limit = 10, offset = 10) // Skip the first 10 rows and take the next 10 rows globally ``` If the SQL contains order by, such as `SELECT * FROM a order by c limit 10 offset 10;`. This SQL will be transformed to the physical plan as below: ``` TakeOrderedAndProjectExec (limit = 10, offset 10) // Skip the first 10 rows and take the next 10 rows after sort globally ``` **3**.In addition to the above, there is a special case that is only offset but no limit. For example: `SELECT * FROM a offset 10;` parsed to the logic plan as below: ``` Offset (offset = 10) // Only offset clause ``` If offset is very large, will generate a lot of overhead. So this PR will refuse use offset clause without limit clause, although we can parse, transform and execute it. A balanced idea is add a configuration item `spark.sql.forceUsingOffsetWithoutLimit` to force running query when user knows the offset is small enough. The default value of `spark.sql.forceUsingOffsetWithoutLimit` is false. This PR just came up with the idea so that it could be implemented at a better time in the future. Note: The origin PR to support this feature is apache#25416. Because the origin PR too old, there exists massive conflict which is hard to resolve. So I open this new PR to support this feature. ### Why are the changes needed? new feature ### Does this PR introduce any user-facing change? 'No' ### How was this patch tested? Exists and new UT Closes apache#35975 from beliefer/SPARK-28330. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-39057][SQL] Offset could work without Limit ### What changes were proposed in this pull request? Currently, `Offset` must work with `Limit`. The behavior not allow to use offset alone and add offset API into `DataFrame`. If we use `Offset` alone, there are two situations: 1. If `Offset` is the last operator, collect the result to the driver and then drop/skip the first n (offset value) rows. Users can test or debug `Offset` in the way. 2. If `Offset` is the intermediate operator, shuffle all the result to one task and drop/skip the first n (offset value) rows and the result will be passed to the downstream operator. For example, `SELECT * FROM a offset 10; ` parsed to the logic plan as below: ``` Offset (offset = 10) // Only offset clause |--Relation ``` and then the physical plan as below: ``` CollectLimitExec(limit = -1, offset = 10) // Collect the result to the driver and skip the first 10 rows |--JDBCRelation ``` or ``` GlobalLimitAndOffsetExec(limit = -1, offset = 10) // Collect the result and skip the first 10 rows |--JDBCRelation ``` After this PR merged, users could input the SQL show below: ``` SELECT '' AS ten, unique1, unique2, stringu1 FROM onek ORDER BY unique1 OFFSET 990; ``` Note: apache#35975 supports offset clause, it create a logical node named `GlobalLimitAndOffset`. In fact, we can avoid use this node and use `Offset` instead and the latter is good with unify name. ### Why are the changes needed? Improve the implement of offset clause. ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? Exists test cases. Closes apache#36417 from beliefer/SPARK-28330_followup2. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-39159][SQL] Add new Dataset API for Offset ### What changes were proposed in this pull request? Currently, Spark added `Offset` operator. This PR try to add `offset` API into `Dataset`. ### Why are the changes needed? `offset` API is very useful and construct test case more easily. ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? New tests. Closes apache#36519 from beliefer/SPARK-39159. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> * fix ut * update spark version Co-authored-by: Jiaan Geng <beliefer@163.com>
- Loading branch information