Skip to content

Commit

Permalink
Remove no benefit inferred DPP on partition column when ${SQLConf.EXC…
Browse files Browse the repository at this point in the history
…HANGE_REUSE_ENABLED.key} is disabled
  • Loading branch information
wangyum committed Aug 24, 2020
1 parent baa7796 commit 4901af4
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class SparkOptimizer(
PushDownPredicates) :+
Batch("Cleanup filters that cannot be pushed down", Once,
CleanupDynamicPruningFilters,
BooleanSimplification,
PruneFilters)) ++
postHocOptimizationBatches :+
Batch("Extract Python UDFs", Once,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.execution.dynamicpruning.PartitionPruning._
import org.apache.spark.sql.internal.SQLConf

/**
Expand All @@ -32,10 +33,12 @@ import org.apache.spark.sql.internal.SQLConf
*/
object CleanupDynamicPruningFilters extends Rule[LogicalPlan] with PredicateHelper {

private def isFilterOnNonPartition(condition: Expression, child: LogicalPlan): Boolean = {
// Check whether the condition contains an inferred DPP filter that needs to be removed.
private def isRemoveInferred(condition: Expression, child: LogicalPlan): Boolean = {
splitConjunctivePredicates(condition).exists {
case DynamicPruningSubquery(pruningKey, _, _, _, _, _) =>
PartitionPruning.getPartitionTableScan(pruningKey, child).isEmpty
case DynamicPruningSubquery(pruningKey, buildQuery, buildKeys, index, _, _) =>
getPartitionTableScan(pruningKey, child).isEmpty || (!SQLConf.get.exchangeReuseEnabled &&
!pruningHasBenefit(pruningKey, child, buildKeys(index), buildQuery))
case _ => false
}
}
Expand All @@ -46,12 +49,17 @@ object CleanupDynamicPruningFilters extends Rule[LogicalPlan] with PredicateHelp
}

plan.transform {
// Remove any Filters with DynamicPruning that didn't filter on partition column`.
// This is inferred by Infer Filters from PartitionPruning.
case f @ Filter(condition, child) if isFilterOnNonPartition(condition, child) =>
// Remove any DynamicPruning Filters that do not filter on a partition column, or that
// have no benefit when exchange reuse is disabled. These are inferred by Infer Filters
// from PartitionPruning.
case f @ Filter(condition, child)
if SQLConf.get.constraintPropagationEnabled && isRemoveInferred(condition, child) =>
val newCondition = condition.transform {
case DynamicPruningSubquery(pruningKey, _, _, _, _, _)
if PartitionPruning.getPartitionTableScan(pruningKey, child).isEmpty =>
if getPartitionTableScan(pruningKey, child).isEmpty =>
TrueLiteral
case DynamicPruningSubquery(pruningKey, buildQuery, buildKeys, index, _, _)
if !SQLConf.get.exchangeReuseEnabled &&
!pruningHasBenefit(pruningKey, child, buildKeys(index), buildQuery) =>
TrueLiteral
}
f.copy(condition = newCondition)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper {
* using column statistics if they are available, otherwise we use the config value of
* `spark.sql.optimizer.joinFilterRatio`.
*/
private def pruningHasBenefit(
def pruningHasBenefit(
partExpr: Expression,
partPlan: LogicalPlan,
otherExpr: Expression,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1305,57 +1305,135 @@ abstract class DynamicPartitionPruningSuiteBase
}
}

// Per its name, this test expects a dynamic partition pruning (DPP) filter to be
// inferred for a second partition column.
// NOTE(review): presumably fact_stats and code_stats are both partitioned by store_id;
// confirm against the suite's table setup, which is not visible here.
test("Infer DPP filter to other partition column") {
// fact_stats is joined with code_stats on store_id; that result is then joined with
// product on store_id, with product restricted to product_id < 3.
val df = sql(
"""
|SELECT t11.store_id,
| t11.code,
| t3.product_id
|FROM (SELECT t1.store_id,
| t2.code
| FROM fact_stats t1
| JOIN code_stats t2
| ON t1.store_id = t2.store_id) t11
| JOIN product t3
| ON t11.store_id = t3.store_id AND t3.product_id < 3
|""".stripMargin)

// Exactly two dynamic pruning expressions are expected in the executed plan.
assert(collectDynamicPruningExpressions(df.queryExecution.executedPlan).size === 2)
// NOTE(review): the helpers below are defined elsewhere in the suite; the arguments
// presumably mean "one distinct subquery" and "no subquery-based filter, but a
// broadcast-based one" — confirm against their definitions.
checkDistinctSubqueries(df, 1)
checkPartitionPruningPredicate(df, false, true)
assert(!checkUnpushedFilters(df), "Inferred dynamic pruning expression has been pushed down.")

// Three rows are expected; the row for store_id 2 appears twice.
checkAnswer(df,
Row(2, 20, 2) ::
Row(2, 20, 2) ::
Row(1, 10, 1) :: Nil
)
}
test("Infer filters from DPP") {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") {
withTable("df1", "df2", "df3", "df4") {
spark.range(1000)
.select(col("id"), col("id").as("k"))
.write
.partitionBy("k")
.format(tableFormat)
.mode("overwrite")
.saveAsTable("df1")

test("Should not infer DPP filter to other non-partition column") {
val df = sql(
"""
|SELECT t11.store_id,
| t11.country,
| t3.product_id
|FROM (SELECT t1.store_id,
| t2.country
| FROM fact_stats t1
| JOIN dim_stats t2
| ON t1.store_id = t2.store_id) t11
| JOIN product t3
| ON t11.store_id = t3.store_id AND t3.product_id < 3
|""".stripMargin)

assert(collectDynamicPruningExpressions(df.queryExecution.executedPlan).size === 1)
checkPartitionPruningPredicate(df, false, true)
assert(!checkUnpushedFilters(df), "Inferred dynamic pruning expression should be removed.")

checkAnswer(df,
Row(2, "NL", 2) ::
Row(2, "NL", 2) ::
Row(1, "NL", 1) :: Nil
)
spark.range(1000)
.select(col("id"), col("id").as("k"))
.write
.partitionBy("k")
.format(tableFormat)
.mode("overwrite")
.saveAsTable("df2")

spark.range(5)
.select(col("id"), col("id").as("k"))
.write
.partitionBy("k")
.format(tableFormat)
.mode("overwrite")
.saveAsTable("df3")

spark.range(100)
.select(col("id"), col("id").as("k"))
.write
.format(tableFormat)
.mode("overwrite")
.saveAsTable("df4")

spark.range(1000)
.select(col("id"), col("id").as("k"))
.write
.format(tableFormat)
.mode("overwrite")
.saveAsTable("df5")

Given(s"Inferred DPP on partition column")
Seq(true, false).foreach { infer =>
withSQLConf(SQLConf.CONSTRAINT_PROPAGATION_ENABLED.key -> s"$infer") {
val df = sql(
"""
|SELECT t1.id,
| df4.k
|FROM (SELECT df2.id,
| df1.k
| FROM df1
| JOIN df2
| ON df1.k = df2.k) t1
| JOIN df4
| ON t1.k = df4.k AND df4.id < 2
|""".stripMargin)
if (infer) {
assert(collectDynamicPruningExpressions(df.queryExecution.executedPlan).size === 2)
} else {
assert(collectDynamicPruningExpressions(df.queryExecution.executedPlan).size === 1)
}
checkDistinctSubqueries(df, 1)
checkPartitionPruningPredicate(df, false, true)
assert(!checkUnpushedFilters(df))

checkAnswer(df, Row(0, 0) :: Row(1, 1) :: Nil)
}
}

Given("Remove no benefit inferred DPP on partition column " +
s"when ${SQLConf.EXCHANGE_REUSE_ENABLED.key} is disabled")
Seq(true, false).foreach { reuse =>
Seq(true, false).foreach { broadcastOnly =>
withSQLConf(
SQLConf.EXCHANGE_REUSE_ENABLED.key -> s"$reuse",
SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> s"$broadcastOnly") {
val df = sql(
"""
|SELECT t1.id,
| df4.k
|FROM (SELECT df3.id,
| df1.k
| FROM df1
| JOIN df3
| ON df1.k = df3.k) t1
| JOIN df4
| ON t1.k = df4.k AND df4.id < 2
|""".stripMargin)

if (!reuse) {
assert(collectDynamicPruningExpressions(df.queryExecution.executedPlan).size === 1)
checkDistinctSubqueries(df, 0)
checkPartitionPruningPredicate(df, !broadcastOnly, false)
} else {
assert(collectDynamicPruningExpressions(df.queryExecution.executedPlan).size === 2)
checkDistinctSubqueries(df, 1)
checkPartitionPruningPredicate(df, false, true)
}
assert(!checkUnpushedFilters(df))

checkAnswer(df, Row(0, 0) :: Row(1, 1) :: Nil)
}
}
}

Given("Remove inferred DPP on non-partition column")
withSQLConf(SQLConf.CONSTRAINT_PROPAGATION_ENABLED.key -> "true") {
val df = sql(
"""
|SELECT t1.id,
| df4.k
|FROM (SELECT df5.id,
| df1.k
| FROM df1
| JOIN df5
| ON df1.k = df5.k) t1
| JOIN df4
| ON t1.k = df4.k AND df4.id < 2
|""".stripMargin)

assert(collectDynamicPruningExpressions(df.queryExecution.executedPlan).size === 1)
checkDistinctSubqueries(df, 1)
checkPartitionPruningPredicate(df, false, true)
assert(!checkUnpushedFilters(df))

checkAnswer(df, Row(0, 0) :: Row(1, 1) :: Nil)
}
}
}
}
}

Expand Down

0 comments on commit 4901af4

Please sign in to comment.