[SPARK-27411][SQL] DataSourceV2Strategy should not eliminate subquery
## What changes were proposed in this pull request?

In `DataSourceV2Strategy`, we eliminate subquery filters by mistake after normalizing the filters.
Consider a SQL query with a scalar subquery:

```scala
val plan = spark.sql("select * from t2 where t2a > (select max(t1a) from t1)")
plan.explain(true)
```

The log output from `DataSourceV2Strategy` shows:
```
Pushing operators to csv:examples/src/main/resources/t2.txt
Pushed Filters:
Post-Scan Filters: isnotnull(t2a#30)
Output: t2a#30, t2b#31
```

The `Post-Scan Filters` should contain the scalar subquery, but we eliminate it by mistake, as the query plans below confirm:
```
== Parsed Logical Plan ==
'Project [*]
+- 'Filter ('t2a > scalar-subquery#56 [])
   :  +- 'Project [unresolvedalias('max('t1a), None)]
   :     +- 'UnresolvedRelation `t1`
   +- 'UnresolvedRelation `t2`

== Analyzed Logical Plan ==
t2a: string, t2b: string
Project [t2a#30, t2b#31]
+- Filter (t2a#30 > scalar-subquery#56 [])
   :  +- Aggregate [max(t1a#13) AS max(t1a)#63]
   :     +- SubqueryAlias `t1`
   :        +- RelationV2[t1a#13, t1b#14] csv:examples/src/main/resources/t1.txt
   +- SubqueryAlias `t2`
      +- RelationV2[t2a#30, t2b#31] csv:examples/src/main/resources/t2.txt

== Optimized Logical Plan ==
Filter (isnotnull(t2a#30) && (t2a#30 > scalar-subquery#56 []))
:  +- Aggregate [max(t1a#13) AS max(t1a)#63]
:     +- Project [t1a#13]
:        +- RelationV2[t1a#13, t1b#14] csv:examples/src/main/resources/t1.txt
+- RelationV2[t2a#30, t2b#31] csv:examples/src/main/resources/t2.txt

== Physical Plan ==
*(1) Project [t2a#30, t2b#31]
+- *(1) Filter isnotnull(t2a#30)
   +- *(1) BatchScan[t2a#30, t2b#31] class org.apache.spark.sql.execution.datasources.v2.csv.CSVScan
```
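The root cause: the strategy passes `filters.filterNot(SubqueryExpression.hasSubquery)` into `DataSourceStrategy.normalizeFilters`, so any filter containing a subquery is dropped before pushdown and never added back. The fix partitions the filters first, pushes down only the subquery-free part, and re-attaches the subquery filters as post-scan filters. Below is a minimal, Spark-free sketch of the two behaviors; plain strings and a toy `hasSubquery` stand in for Catalyst expressions:

```scala
// Sketch only: strings stand in for Catalyst filter expressions and a
// substring test stands in for SubqueryExpression.hasSubquery.
object SubqueryFilterSketch extends App {
  val filters = Seq("isnotnull(t2a)", "t2a > scalar-subquery#56")
  def hasSubquery(f: String): Boolean = f.contains("subquery")

  // Buggy behavior: subquery filters are dropped before normalization and
  // never restored, so they vanish from the post-scan filters.
  val postScanBuggy = filters.filterNot(hasSubquery)
  println(postScanBuggy)   // List(isnotnull(t2a)) -- subquery filter lost

  // Fixed behavior: partition first, push down only the subquery-free
  // filters, then re-attach the subquery filters after the scan.
  val (withSubquery, withoutSubquery) = filters.partition(hasSubquery)
  val postScanFixed = withoutSubquery ++ withSubquery
  println(postScanFixed)   // List(isnotnull(t2a), t2a > scalar-subquery#56)
}
```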
## How was this patch tested?

Unit test added in `DataSourceV2Suite` (see the second diff below).

Closes apache#24321 from francis0407/SPARK-27411.

Authored-by: francis0407 <hanmingcong123@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
francis0407 authored and cloud-fan committed Apr 9, 2019
1 parent 3e4cfe9 commit 601fac2
Showing 2 changed files with 18 additions and 2 deletions.
In `DataSourceV2Strategy.scala`:
```diff
@@ -106,13 +106,16 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
     case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
       val scanBuilder = relation.newScanBuilder()
 
+      val (withSubquery, withoutSubquery) = filters.partition(SubqueryExpression.hasSubquery)
       val normalizedFilters = DataSourceStrategy.normalizeFilters(
-        filters.filterNot(SubqueryExpression.hasSubquery), relation.output)
+        withoutSubquery, relation.output)
 
       // `pushedFilters` will be pushed down and evaluated in the underlying data sources.
       // `postScanFilters` need to be evaluated after the scan.
       // `postScanFilters` and `pushedFilters` can overlap, e.g. the parquet row group filter.
-      val (pushedFilters, postScanFilters) = pushFilters(scanBuilder, normalizedFilters)
+      val (pushedFilters, postScanFiltersWithoutSubquery) =
+        pushFilters(scanBuilder, normalizedFilters)
+      val postScanFilters = postScanFiltersWithoutSubquery ++ withSubquery
       val (scan, output) = pruneColumns(scanBuilder, relation, project ++ postScanFilters)
       logInfo(
         s"""
```
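A filter containing a scalar subquery cannot be pushed into a data source, because Spark has to execute the subquery plan first and substitute its result; the change therefore routes such filters into `postScanFilters`, where Spark evaluates them after the scan, rather than teaching sources to handle them.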
In `DataSourceV2Suite.scala`:
```diff
@@ -392,6 +392,19 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
       }
     }
   }
+
+  test("SPARK-27411: DataSourceV2Strategy should not eliminate subquery") {
+    withTempView("t1") {
+      val t2 = spark.read.format(classOf[SimpleDataSourceV2].getName).load()
+      Seq(2, 3).toDF("a").createTempView("t1")
+      val df = t2.where("i < (select max(a) from t1)").select('i)
+      val subqueries = df.queryExecution.executedPlan.collect {
+        case p => p.subqueries
+      }.flatten
+      assert(subqueries.length == 1)
+      checkAnswer(df, (0 until 3).map(i => Row(i)))
+    }
+  }
 }
```
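The test reads from `SimpleDataSourceV2`, a test source defined in this suite that produces rows with `i` in `0 until 10`. The scalar subquery evaluates to `max(a) = 3`, so the test asserts both that exactly one subquery survives into the executed physical plan and that the subquery filter was actually applied, returning `i` = 0, 1, 2.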
