From 4901af4ac5aa8a5b6130899cfd19c68622d7cf2f Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Mon, 24 Aug 2020 14:37:10 +0800
Subject: [PATCH] Remove inferred DPP on partition column with no pruning
 benefit when spark.sql.exchangeReuseEnabled is disabled

---
 .../spark/sql/execution/SparkOptimizer.scala  |   1 +
 .../CleanupDynamicPruningFilters.scala        |  24 ++-
 .../dynamicpruning/PartitionPruning.scala     |   2 +-
 .../sql/DynamicPartitionPruningSuite.scala    | 178 +++++++++++++-----
 4 files changed, 147 insertions(+), 58 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
index d62a950aff2bc..a7ac92404a057 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
@@ -52,6 +52,7 @@ class SparkOptimizer(
       PushDownPredicates) :+
     Batch("Cleanup filters that cannot be pushed down", Once,
       CleanupDynamicPruningFilters,
+      BooleanSimplification,
       PruneFilters)) ++
     postHocOptimizationBatches :+
     Batch("Extract Python UDFs", Once,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/CleanupDynamicPruningFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/CleanupDynamicPruningFilters.scala
index 5af6436033e56..c9029b4ad60b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/CleanupDynamicPruningFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/CleanupDynamicPruningFilters.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.dynamicpruning.PartitionPruning._
 import org.apache.spark.sql.internal.SQLConf
 
 /**
@@ -32,10 +33,12 @@ import org.apache.spark.sql.internal.SQLConf
  */
 object CleanupDynamicPruningFilters extends Rule[LogicalPlan] with PredicateHelper {
 
-  private def isFilterOnNonPartition(condition: Expression, child: LogicalPlan): Boolean = {
+  // Checks whether we need to remove an inferred DPP filter.
+  private def isRemoveInferred(condition: Expression, child: LogicalPlan): Boolean = {
     splitConjunctivePredicates(condition).exists {
-      case DynamicPruningSubquery(pruningKey, _, _, _, _, _) =>
-        PartitionPruning.getPartitionTableScan(pruningKey, child).isEmpty
+      case DynamicPruningSubquery(pruningKey, buildQuery, buildKeys, index, _, _) =>
+        getPartitionTableScan(pruningKey, child).isEmpty || (!SQLConf.get.exchangeReuseEnabled &&
+          !pruningHasBenefit(pruningKey, child, buildKeys(index), buildQuery))
       case _ => false
     }
   }
@@ -46,12 +49,19 @@ object CleanupDynamicPruningFilters extends Rule[LogicalPlan] with PredicateHelp
     }
 
     plan.transform {
-      // Remove any Filters with DynamicPruning that didn't filter on partition column`.
-      // This is inferred by Infer Filters from PartitionPruning.
-      case f @ Filter(condition, child) if isFilterOnNonPartition(condition, child) =>
+      // Remove any DynamicPruning filters that do not filter on a partition column, or that
+      // have no pruning benefit. These filters are inferred by Infer Filters from PartitionPruning.
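+      // (When exchange reuse is disabled, the pruning subquery has to run as a separate
+      // job, so an inferred filter without enough pruning benefit would only add overhead.)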
+      case f @ Filter(condition, child)
+        if SQLConf.get.constraintPropagationEnabled && isRemoveInferred(condition, child) =>
         val newCondition = condition.transform {
           case DynamicPruningSubquery(pruningKey, _, _, _, _, _)
-            if PartitionPruning.getPartitionTableScan(pruningKey, child).isEmpty =>
+            if getPartitionTableScan(pruningKey, child).isEmpty =>
+            TrueLiteral
+          case DynamicPruningSubquery(pruningKey, buildQuery, buildKeys, index, _, _)
+            if !SQLConf.get.exchangeReuseEnabled &&
+              !pruningHasBenefit(pruningKey, child, buildKeys(index), buildQuery) =>
             TrueLiteral
         }
         f.copy(condition = newCondition)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala
index 34a5b65a58903..68455d8aeb53a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala
@@ -111,7 +111,7 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper {
    * using column statistics if they are available, otherwise we use the config value of
    * `spark.sql.optimizer.joinFilterRatio`.
    */
-  private def pruningHasBenefit(
+  def pruningHasBenefit(
       partExpr: Expression,
       partPlan: LogicalPlan,
       otherExpr: Expression,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
index 7005669cc2596..d902643ae2535 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -1305,57 +1305,135 @@ abstract class DynamicPartitionPruningSuiteBase
     }
   }
 
-  test("Infer DPP filter to other partition column") {
-    val df = sql(
-      """
-        |SELECT t11.store_id,
-        |       t11.code,
-        |       t3.product_id
-        |FROM (SELECT t1.store_id,
-        |             t2.code
-        |      FROM fact_stats t1
-        |      JOIN code_stats t2
-        |      ON t1.store_id = t2.store_id) t11
-        |     JOIN product t3
-        |     ON t11.store_id = t3.store_id AND t3.product_id < 3
-        |""".stripMargin)
-
-    assert(collectDynamicPruningExpressions(df.queryExecution.executedPlan).size === 2)
-    checkDistinctSubqueries(df, 1)
-    checkPartitionPruningPredicate(df, false, true)
-    assert(!checkUnpushedFilters(df), "Inferred dynamic pruning expression has been pushed down.")
-
-    checkAnswer(df,
-      Row(2, 20, 2) ::
-      Row(2, 20, 2) ::
-      Row(1, 10, 1) :: Nil
-    )
-  }
+  test("Infer filters from DPP") {
+    withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") {
+      withTable("df1", "df2", "df3", "df4", "df5") {
+        spark.range(1000)
+          .select(col("id"), col("id").as("k"))
+          .write
+          .partitionBy("k")
+          .format(tableFormat)
+          .mode("overwrite")
+          .saveAsTable("df1")
 
-  test("Should not infer DPP filter to other non-partition column") {
-    val df = sql(
-      """
-        |SELECT t11.store_id,
-        |       t11.country,
-        |       t3.product_id
-        |FROM (SELECT t1.store_id,
-        |             t2.country
-        |      FROM fact_stats t1
-        |      JOIN dim_stats t2
-        |      ON t1.store_id = t2.store_id) t11
-        |     JOIN product t3
-        |     ON t11.store_id = t3.store_id AND t3.product_id < 3
-        |""".stripMargin)
-
-    assert(collectDynamicPruningExpressions(df.queryExecution.executedPlan).size === 1)
-    checkPartitionPruningPredicate(df, false, true)
-    assert(!checkUnpushedFilters(df), "Inferred dynamic pruning expression should be removed.")
-
-    checkAnswer(df,
-      Row(2, "NL", 2) ::
-      Row(2, "NL", 2) ::
-      Row(1, "NL", 1) :: Nil
"NL", 1) :: Nil - ) + spark.range(1000) + .select(col("id"), col("id").as("k")) + .write + .partitionBy("k") + .format(tableFormat) + .mode("overwrite") + .saveAsTable("df2") + + spark.range(5) + .select(col("id"), col("id").as("k")) + .write + .partitionBy("k") + .format(tableFormat) + .mode("overwrite") + .saveAsTable("df3") + + spark.range(100) + .select(col("id"), col("id").as("k")) + .write + .format(tableFormat) + .mode("overwrite") + .saveAsTable("df4") + + spark.range(1000) + .select(col("id"), col("id").as("k")) + .write + .format(tableFormat) + .mode("overwrite") + .saveAsTable("df5") + + Given(s"Inferred DPP on partition column") + Seq(true, false).foreach { infer => + withSQLConf(SQLConf.CONSTRAINT_PROPAGATION_ENABLED.key -> s"$infer") { + val df = sql( + """ + |SELECT t1.id, + | df4.k + |FROM (SELECT df2.id, + | df1.k + | FROM df1 + | JOIN df2 + | ON df1.k = df2.k) t1 + | JOIN df4 + | ON t1.k = df4.k AND df4.id < 2 + |""".stripMargin) + if (infer) { + assert(collectDynamicPruningExpressions(df.queryExecution.executedPlan).size === 2) + } else { + assert(collectDynamicPruningExpressions(df.queryExecution.executedPlan).size === 1) + } + checkDistinctSubqueries(df, 1) + checkPartitionPruningPredicate(df, false, true) + assert(!checkUnpushedFilters(df)) + + checkAnswer(df, Row(0, 0) :: Row(1, 1) :: Nil) + } + } + + Given("Remove no benefit inferred DPP on partition column " + + s"when ${SQLConf.EXCHANGE_REUSE_ENABLED.key} is disabled") + Seq(true, false).foreach { reuse => + Seq(true, false).foreach { broadcastOnly => + withSQLConf( + SQLConf.EXCHANGE_REUSE_ENABLED.key -> s"$reuse", + SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> s"$broadcastOnly") { + val df = sql( + """ + |SELECT t1.id, + | df4.k + |FROM (SELECT df3.id, + | df1.k + | FROM df1 + | JOIN df3 + | ON df1.k = df3.k) t1 + | JOIN df4 + | ON t1.k = df4.k AND df4.id < 2 + |""".stripMargin) + + if (!reuse) { + assert(collectDynamicPruningExpressions(df.queryExecution.executedPlan).size === 1) + checkDistinctSubqueries(df, 0) + checkPartitionPruningPredicate(df, !broadcastOnly, false) + } else { + assert(collectDynamicPruningExpressions(df.queryExecution.executedPlan).size === 2) + checkDistinctSubqueries(df, 1) + checkPartitionPruningPredicate(df, false, true) + } + assert(!checkUnpushedFilters(df)) + + checkAnswer(df, Row(0, 0) :: Row(1, 1) :: Nil) + } + } + } + + Given("Remove inferred DPP on non-partition column") + withSQLConf(SQLConf.CONSTRAINT_PROPAGATION_ENABLED.key -> "true") { + val df = sql( + """ + |SELECT t1.id, + | df4.k + |FROM (SELECT df5.id, + | df1.k + | FROM df1 + | JOIN df5 + | ON df1.k = df5.k) t1 + | JOIN df4 + | ON t1.k = df4.k AND df4.id < 2 + |""".stripMargin) + + assert(collectDynamicPruningExpressions(df.queryExecution.executedPlan).size === 1) + checkDistinctSubqueries(df, 1) + checkPartitionPruningPredicate(df, false, true) + assert(!checkUnpushedFilters(df)) + + checkAnswer(df, Row(0, 0) :: Row(1, 1) :: Nil) + } + } + } } }