Skip to content

Commit

Permalink
Infer filters from DPP
Browse files Browse the repository at this point in the history
  • Loading branch information
wangyum committed Jul 26, 2020
1 parent 83ffef7 commit 69be17b
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,14 @@ object SubqueryExpression {
}

/**
* Returns true when an expression contains a subquery that has outer reference(s). The outer
* Returns true when an expression contains a subquery that has outer reference(s), ignoring any
* [[org.apache.spark.sql.catalyst.expressions.DynamicPruningSubquery]]. The outer
* reference attributes are kept as children of subquery expression by
* [[org.apache.spark.sql.catalyst.analysis.Analyzer.ResolveSubquery]]
*/
def hasCorrelatedSubquery(e: Expression): Boolean = {
e.find {
case _: DynamicPruningSubquery => false
case s: SubqueryExpression => s.children.nonEmpty
case _ => false
}.isDefined
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ class SparkOptimizer(
Batch("PartitionPruning", Once,
PartitionPruning,
OptimizeSubqueries) :+
Batch("Pushdown Filters from PartitionPruning", fixedPoint,
PushDownPredicates) :+
Batch("Pushdown and infer Filters from PartitionPruning", fixedPoint,
PushDownPredicates,
InferFiltersFromConstraints) :+
Batch("Cleanup filters that cannot be pushed down", Once,
CleanupDynamicPruningFilters,
PruneFilters)) ++
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1280,6 +1280,57 @@ abstract class DynamicPartitionPruningSuiteBase
)
}
}

test("Infer DPP filter to other partition column") {
  // fact_stats is joined to code_stats on store_id, and that result is joined to product on
  // store_id with an extra `product_id < 3` filter on the product side.
  // NOTE(review): presumably code_stats is partitioned on store_id like fact_stats, which is
  // why a second pruning filter is expected below -- confirm against the table setup.
  val query =
    """
|SELECT t11.store_id,
| t11.code,
| t3.product_id
|FROM (SELECT t1.store_id,
| t2.code
| FROM fact_stats t1
| JOIN code_stats t2
| ON t1.store_id = t2.store_id) t11
| JOIN product t3
| ON t11.store_id = t3.store_id AND t3.product_id < 3
|""".stripMargin
  val result = sql(query)

  // The executed plan must carry exactly two dynamic pruning expressions ...
  val pruningExprs = collectDynamicPruningExpressions(result.queryExecution.executedPlan)
  assert(pruningExprs.size === 2)
  // ... while the helper is asked to confirm a single distinct subquery.
  checkDistinctSubqueries(result, 1)
  checkPartitionPruningPredicate(result, false, true)

  val expectedRows = Seq(
    Row(2, 20, 2),
    Row(2, 20, 2),
    Row(1, 10, 1))
  checkAnswer(result, expectedRows)
}

test("Should not infer DPP filter to other non-partition column") {
  // Same join shape as a fact_stats/product pruning query, but the inner join goes through
  // dim_stats instead.
  // NOTE(review): presumably dim_stats.store_id is not a partition column, which is why only
  // one pruning filter is expected -- confirm against the table setup.
  val query =
    """
|SELECT t11.store_id,
| t11.country,
| t3.product_id
|FROM (SELECT t1.store_id,
| t2.country
| FROM fact_stats t1
| JOIN dim_stats t2
| ON t1.store_id = t2.store_id) t11
| JOIN product t3
| ON t11.store_id = t3.store_id AND t3.product_id < 3
|""".stripMargin
  val result = sql(query)

  // Exactly one dynamic pruning expression may appear in the executed plan.
  val pruningExprs = collectDynamicPruningExpressions(result.queryExecution.executedPlan)
  assert(pruningExprs.size === 1)
  checkPartitionPruningPredicate(result, false, true)

  val expectedRows = Seq(
    Row(2, "NL", 2),
    Row(2, "NL", 2),
    Row(1, "NL", 1))
  checkAnswer(result, expectedRows)
}
}

class DynamicPartitionPruningSuiteAEOff extends DynamicPartitionPruningSuiteBase {
Expand Down

0 comments on commit 69be17b

Please sign in to comment.