[SPARK-39453][SQL] DS V2 supports push down misc non-aggregate functions (non ANSI)

### What changes were proposed in this pull request?
#36039 added DS V2 push-down for the misc non-aggregate functions defined by the ANSI standard. Spark also has many commonly used misc non-aggregate functions that are not part of the ANSI standard:
https://github.com/apache/spark/blob/2f8613f22c0750c00cf1dcfb2f31c431d8dc1be7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala#L362

Mainstream database support for these functions is shown below; a usage sketch follows the table.
|  Function name   | PostgreSQL  | ClickHouse  | H2  | MySQL  | Oracle  | Redshift  | Presto  | Teradata  | Snowflake  | DB2  | Vertica  | Exasol  | SqlServer  | Yellowbrick  | Impala  | Mariadb | Druid | Pig | Singlestore | ElasticSearch | SQLite | Influxdata | Sybase |
|  ----  | ----  | ----  | ----  | ----  | ----  | ----  | ----  | ----  | ----  | ----  | ----  | ----  | ----  | ----  | ----  | ----  | ----  | ----  | ----  | ----  | ----  | ----  | ----  |
| `GREATEST` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | No | No | No |
| `LEAST` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | No | No | No |
| `IF` | No | Yes | No | Yes | No | No | Yes | No | Yes | No | No | Yes | No | Yes | Yes | Yes | No | No | Yes | Yes | Yes | No | No |
| `RAND` | No | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | No | Yes |
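
As a rough usage sketch (not part of this commit), the catalog setup below mirrors the JDBC test suite and assumes an in-memory H2 database that already contains a `test.employee` table; the predicate uses the newly pushed functions:

```scala
import org.apache.spark.sql.SparkSession

// Sketch: expose an H2 database as a DS V2 catalog so function push-down can kick in.
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.catalog.h2",
    "org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
  .config("spark.sql.catalog.h2.url", "jdbc:h2:mem:testdb")  // assumed in-memory H2 instance
  .config("spark.sql.catalog.h2.driver", "org.h2.Driver")
  .getOrCreate()

val df = spark.sql(
  """
    |SELECT * FROM h2.test.employee
    |WHERE GREATEST(bonus, 1100) > 1200 AND LEAST(salary, 10000) > 9000 AND RAND(1) < 1
    |""".stripMargin)

// When push-down succeeds, the formatted plan lists the compiled functions under PushedFilters.
df.explain("formatted")
```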

### Why are the changes needed?
This lets DS V2 push down commonly used misc non-aggregate functions that are supported by mainstream databases.

### Does this PR introduce _any_ user-facing change?
'No'.
New feature.

### How was this patch tested?
New tests.

Closes #36830 from beliefer/SPARK-38761_followup.

Authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
beliefer authored and cloud-fan committed Jun 24, 2022
1 parent 4ad7386 commit 1bb272d
Showing 5 changed files with 71 additions and 2 deletions.
@@ -106,6 +106,24 @@
* <li>Since version: 3.3.0</li>
* </ul>
* </li>
* <li>Name: <code>GREATEST</code>
* <ul>
* <li>SQL semantic: <code>GREATEST(expr, ...)</code></li>
* <li>Since version: 3.4.0</li>
* </ul>
* </li>
* <li>Name: <code>LEAST</code>
* <ul>
* <li>SQL semantic: <code>LEAST(expr, ...)</code></li>
* <li>Since version: 3.4.0</li>
* </ul>
* </li>
* <li>Name: <code>RAND</code>
* <ul>
* <li>SQL semantic: <code>RAND([seed])</code></li>
* <li>Since version: 3.4.0</li>
* </ul>
* </li>
* <li>Name: <code>LN</code>
* <ul>
* <li>SQL semantic: <code>LN(expr)</code></li>
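The names documented above are what a connector receives on a `GeneralScalarExpression`. As a hedged illustration (the `canTranslate` helper is hypothetical and not part of this commit), a source could gate its support like this:

```scala
import org.apache.spark.sql.connector.expressions.{Expression, GeneralScalarExpression}

// Hypothetical connector-side check: accept only the scalar functions this source can render.
def canTranslate(expr: Expression): Boolean = expr match {
  case g: GeneralScalarExpression =>
    g.name() match {
      case "GREATEST" | "LEAST" => g.children().nonEmpty     // variadic: GREATEST(expr, ...)
      case "RAND"               => g.children().length <= 1  // optional seed: RAND([seed])
      case _                    => false
    }
  case _ => false
}
```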
@@ -97,6 +97,9 @@ public String build(Expression expr) {
return visitUnaryArithmetic(name, inputToSQL(e.children()[0]));
case "ABS":
case "COALESCE":
case "GREATEST":
case "LEAST":
case "RAND":
case "LN":
case "EXP":
case "POWER":
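For these names the builder falls through to the generic function-call rendering. A small sketch of exercising that path through the public `JdbcDialect.compileExpression` API (the URL and column name are illustrative, and the exact quoting of the output depends on the dialect):

```scala
import org.apache.spark.sql.connector.expressions.{Expression, Expressions, GeneralScalarExpression}
import org.apache.spark.sql.jdbc.JdbcDialects

// Build a V2 GREATEST(BONUS, 1100.0) expression and ask the H2 dialect to compile it to SQL text.
val greatest = new GeneralScalarExpression("GREATEST",
  Array[Expression](Expressions.column("BONUS"), Expressions.literal(1100.0)))
val h2Dialect = JdbcDialects.get("jdbc:h2:mem:testdb")

// Expected: Some(...) containing a GREATEST(...) call in H2 SQL when the dialect supports GREATEST,
// None when the expression cannot be compiled.
println(h2Dialect.compileExpression(greatest))
```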
@@ -99,6 +99,27 @@ class V2ExpressionBuilder(e: Expression, isPredicate: Boolean = false) {
} else {
None
}
case Greatest(children) =>
val childrenExpressions = children.flatMap(generateExpression(_))
if (children.length == childrenExpressions.length) {
Some(new GeneralScalarExpression("GREATEST", childrenExpressions.toArray[V2Expression]))
} else {
None
}
case Least(children) =>
val childrenExpressions = children.flatMap(generateExpression(_))
if (children.length == childrenExpressions.length) {
Some(new GeneralScalarExpression("LEAST", childrenExpressions.toArray[V2Expression]))
} else {
None
}
case Rand(child, hideSeed) =>
if (hideSeed) {
Some(new GeneralScalarExpression("RAND", Array.empty[V2Expression]))
} else {
generateExpression(child)
.map(v => new GeneralScalarExpression("RAND", Array[V2Expression](v)))
}
case Log(child) => generateExpression(child)
.map(v => new GeneralScalarExpression("LN", Array[V2Expression](v)))
case Exp(child) => generateExpression(child)
@@ -195,6 +216,13 @@ class V2ExpressionBuilder(e: Expression, isPredicate: Boolean = false) {
} else {
None
}
case iff: If =>
val childrenExpressions = iff.children.flatMap(generateExpression(_))
if (iff.children.length == childrenExpressions.length) {
Some(new GeneralScalarExpression("CASE_WHEN", childrenExpressions.toArray[V2Expression]))
} else {
None
}
case substring: Substring =>
val children = if (substring.len == Literal(Integer.MAX_VALUE)) {
Seq(substring.str, substring.pos)
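The cases added above only translate Catalyst expressions (`Greatest`, `Least`, `Rand`, `If`) into V2 expressions; whether they reach the remote database also depends on the dialect (next file). Assuming the H2 catalog from the earlier sketch, the effect is easy to observe:

```scala
// Each query exercises one of the new translations; the PushedFilters entry of the
// formatted plan shows the V2 expression handed to the JDBC source.
Seq(
  "SELECT * FROM h2.test.employee WHERE GREATEST(bonus, 1100) > 1200",            // Greatest -> GREATEST
  "SELECT * FROM h2.test.employee WHERE LEAST(salary, 10000) > 9000",             // Least -> LEAST
  "SELECT * FROM h2.test.employee WHERE RAND(1) < 1",                             // Rand -> RAND(seed)
  "SELECT * FROM h2.test.employee WHERE IF(salary > 10000, salary, bonus) > 1200" // If -> CASE_WHEN
).foreach(q => spark.sql(q).explain("formatted"))
```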
@@ -35,8 +35,8 @@ private[sql] object H2Dialect extends JdbcDialect {
url.toLowerCase(Locale.ROOT).startsWith("jdbc:h2")

private val supportedFunctions =
Set("ABS", "COALESCE", "LN", "EXP", "POWER", "SQRT", "FLOOR", "CEIL",
"SUBSTRING", "UPPER", "LOWER", "TRANSLATE", "TRIM")
Set("ABS", "COALESCE", "GREATEST", "LEAST", "RAND", "LN", "EXP", "POWER", "SQRT", "FLOOR",
"CEIL", "SUBSTRING", "UPPER", "LOWER", "TRANSLATE", "TRIM")

override def isSupportedFunction(funcName: String): Boolean =
supportedFunctions.contains(funcName)
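
Other JDBC dialects opt in the same way by listing the function names they can render. A hypothetical dialect, for example pasted into spark-shell (the URL prefix and function set below are made up for illustration):

```scala
import java.util.Locale

import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}

// Hypothetical dialect: only the names reported as supported are eligible for push-down;
// everything else stays in Spark and is evaluated after the scan.
object MyDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean =
    url.toLowerCase(Locale.ROOT).startsWith("jdbc:mydb")

  private val supportedFunctions =
    Set("ABS", "COALESCE", "GREATEST", "LEAST", "RAND")

  override def isSupportedFunction(funcName: String): Boolean =
    supportedFunctions.contains(funcName)
}

JdbcDialects.registerDialect(MyDialect)
```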
@@ -775,6 +775,26 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
checkFiltersRemoved(df10)
checkPushedInfo(df10, "PushedFilters: [ID IS NOT NULL, ID > 1], ")
checkAnswer(df10, Row("mary", 2))

val df11 = sql(
"""
|SELECT * FROM h2.test.employee
|WHERE GREATEST(bonus, 1100) > 1200 AND LEAST(salary, 10000) > 9000 AND RAND(1) < 1
|""".stripMargin)
checkFiltersRemoved(df11)
checkPushedInfo(df11, "PushedFilters: " +
"[(GREATEST(BONUS, 1100.0)) > 1200.0, (LEAST(SALARY, 10000.00)) > 9000.00, RAND(1) < 1.0]")
checkAnswer(df11, Row(2, "david", 10000, 1300, true))

val df12 = sql(
"""
|SELECT * FROM h2.test.employee
|WHERE IF(SALARY > 10000, SALARY, LEAST(SALARY, 1000)) > 1200
|""".stripMargin)
checkFiltersRemoved(df12)
checkPushedInfo(df12, "PushedFilters: " +
"[(CASE WHEN SALARY > 10000.00 THEN SALARY ELSE LEAST(SALARY, 1000.00) END) > 1200.00]")
checkAnswer(df12, Seq(Row(2, "alex", 12000, 1200, false), Row(6, "jen", 12000, 1200, true)))
}

test("scan with filter push-down with ansi mode") {
