From 43597eb391d8448d8963dc8c4cf8d825f95a18c9 Mon Sep 17 00:00:00 2001
From: yumwang
Date: Fri, 15 Apr 2022 10:26:04 +0800
Subject: [PATCH] [CARMEL-5933] Runtime Filter supports pruning side has window
 (#909)

* Runtime Filter supports pruning side has window

* Update DynamicDataPruningSuite.scala
---
 .../dynamicpruning/PartitionPruning.scala     | 32 ++++++++---
 .../spark/sql/DynamicDataPruningSuite.scala   | 56 +++++++++++++++++++
 2 files changed, 79 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala
index 2d89b3b9cc664..81b4360ef308e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala
@@ -190,19 +190,33 @@ object PartitionPruning extends Rule[LogicalPlan]
     }
   }
 
+  // Make sure the injected filter can be pushed through shuffles; see PushPredicateThroughNonJoin.
+  private def probablyPushThroughShuffle(exp: Expression, plan: LogicalPlan): Boolean = {
+    plan match {
+      case j: Join if !canPlanAsBroadcastHashJoin(j, conf) => true
+      case a @ Aggregate(groupingExps, aggExps, child)
+          if aggExps.forall(_.deterministic) && groupingExps.nonEmpty &&
+            replaceAlias(exp, getAliasMap(a)).references.subsetOf(child.outputSet) => true
+      case w: Window
+          if w.partitionSpec.forall(_.isInstanceOf[AttributeReference]) &&
+            exp.references.subsetOf(AttributeSet(w.partitionSpec.flatMap(_.references))) => true
+      case p: Project =>
+        probablyPushThroughShuffle(replaceAlias(exp, getAliasMap(p)), p.child)
+      case other =>
+        other.children.exists { p =>
+          if (exp.references.subsetOf(p.outputSet)) probablyPushThroughShuffle(exp, p) else false
+        }
+    }
+  }
+
   private def dataPruningHasBenefit(
       prunRelation: LogicalRelation,
+      exp: Expression,
       prunPlan: LogicalPlan,
       otherPlan: LogicalPlan,
       canBuildBroadcast: Boolean): Boolean = {
     if (canBuildBroadcast) {
-      val shuffleStages = prunPlan.collect {
-        case j @ Join(left, right, _, _, hint)
-            if !canBroadcastBySize(left, SQLConf.get) && !canBroadcastBySize(right, SQLConf.get)
-              && !hintToBroadcastLeft(hint) && !hintToBroadcastRight(hint) => j
-        case a: Aggregate => a
-      }
-      shuffleStages.exists(_.collectLeaves().exists(_.equals(prunRelation))) &&
+      probablyPushThroughShuffle(exp, prunPlan) &&
         prunRelation.stats.sizeInBytes >= SQLConf.get.dynamicDataPruningSideThreshold
     } else {
       val estimatePruningSideSize =
@@ -251,7 +265,7 @@ object PartitionPruning extends Rule[LogicalPlan]
               canPruneLeft(joinType) &&
               supportDynamicPruning(right) &&
               (canBroadcastBySize(right, conf) || hintToBroadcastRight(hint)) &&
-              dataPruningHasBenefit(scan.logicalRelation, left, right,
+              dataPruningHasBenefit(scan.logicalRelation, l, left, right,
                 canBuildBroadcastRight(joinType)) =>
             newLeft = insertDataPredicate(l, newLeft, r, right, rightKeys)
           case _ =>
@@ -269,7 +283,7 @@ object PartitionPruning extends Rule[LogicalPlan]
               canPruneRight(joinType) &&
               supportDynamicPruning(left) &&
               (canBroadcastBySize(left, conf) || hintToBroadcastLeft(hint)) &&
-              dataPruningHasBenefit(scan.logicalRelation, right, left,
+              dataPruningHasBenefit(scan.logicalRelation, r, right, left,
                 canBuildBroadcastLeft(joinType)) =>
             newRight = insertDataPredicate(r, newRight, l, left, leftKeys)
           case _ =>
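For orientation: the new check says the injected runtime filter only pays off on the pruning side if it can be pushed below at least one shuffle-introducing operator (a non-broadcast Join, an Aggregate over its grouping keys, or, newly in this patch, a Window over its PARTITION BY columns) on its way down to the pruned relation. The sketch below models that shape with hypothetical Node/Agg/Win/Proj/Relation types, covering only the Aggregate and Window cases and omitting alias substitution and the other Catalyst details; it is an illustration of the idea, not Spark's actual classes.

// Hypothetical, simplified plan nodes; NOT Spark's Catalyst classes.
sealed trait Node { def children: Seq[Node]; def output: Set[String] }
case class Relation(output: Set[String]) extends Node { val children: Seq[Node] = Nil }
case class Proj(output: Set[String], child: Node) extends Node { val children: Seq[Node] = Seq(child) }
case class Agg(groupBy: Set[String], child: Node) extends Node {
  val children: Seq[Node] = Seq(child); val output: Set[String] = groupBy
}
case class Win(partitionBy: Set[String], child: Node) extends Node {
  val children: Seq[Node] = Seq(child); val output: Set[String] = child.output
}

// Can a filter over `filterRefs` be pushed below a shuffle-introducing operator?
def pushableThroughShuffle(filterRefs: Set[String], plan: Node): Boolean = plan match {
  // A predicate over the grouping keys can run below the Aggregate's shuffle.
  case Agg(groupBy, _) => filterRefs.subsetOf(groupBy)
  // A predicate over the PARTITION BY columns drops whole partitions, so it can
  // run below the Window's shuffle without changing any window result.
  case Win(partitionBy, _) => filterRefs.subsetOf(partitionBy)
  case Relation(_) => false // reached the scan without crossing a shuffle
  case other => other.children.exists { c =>
    filterRefs.subsetOf(c.output) && pushableThroughShuffle(filterRefs, c)
  }
}

val scan = Relation(Set("a", "b"))
assert(pushableThroughShuffle(Set("a"), Win(Set("a"), scan)))      // PARTITION BY a, filter on a
assert(!pushableThroughShuffle(Set("b"), Win(Set("a"), scan)))     // filter column not in the spec
assert(pushableThroughShuffle(Set("a"), Agg(Set("a", "b"), scan))) // DISTINCT-style aggregate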
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicDataPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicDataPruningSuite.scala
index 48f1e5ddd467e..3a8002509f3ef 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicDataPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicDataPruningSuite.scala
@@ -624,6 +624,62 @@ abstract class DynamicDataPruningSuiteBase
     }
   }
 
+  test("Aggregate should trigger shuffle pruning") {
+    withSQLConf(SQLConf.DYNAMIC_DATA_PRUNING_ENABLED.key -> "true",
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "3K",
+      SQLConf.DYNAMIC_DATA_PRUNING_SIDE_THRESHOLD.key -> "10K") {
+      val df = sql(
+        """
+          |SELECT t11.a,
+          |       t11.b
+          |FROM (SELECT DISTINCT * FROM t1) t11
+          |  JOIN t3
+          |    ON t11.a = t3.a AND t3.b < 2
+          |""".stripMargin)
+
+      checkDataPruningPredicate(df, false, true)
+      checkAnswer(df, Row(0, 0) :: Row(1, 1) :: Nil)
+    }
+  }
+
+  test("Window should trigger shuffle pruning") {
+    withSQLConf(SQLConf.DYNAMIC_DATA_PRUNING_ENABLED.key -> "true",
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "3K",
+      SQLConf.DYNAMIC_DATA_PRUNING_SIDE_THRESHOLD.key -> "10K") {
+      val df = sql(
+        """
+          |SELECT t11.a,
+          |       t11.b,
+          |       t11.rn
+          |FROM (SELECT *, Row_number() OVER (PARTITION BY a ORDER BY b) rn FROM t1) t11
+          |  JOIN t3
+          |    ON t11.a = t3.a AND t3.b < 2
+          |""".stripMargin)
+
+      checkDataPruningPredicate(df, false, true)
+      checkAnswer(df, Row(0, 0, 1) :: Row(1, 1, 1) :: Nil)
+    }
+  }
+
+  test("Window should not trigger shuffle pruning if the filter can't push through shuffle") {
+    withSQLConf(SQLConf.DYNAMIC_DATA_PRUNING_ENABLED.key -> "true",
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "3K",
+      SQLConf.DYNAMIC_DATA_PRUNING_SIDE_THRESHOLD.key -> "10K") {
+      val df = sql(
+        """
+          |SELECT t11.a,
+          |       t11.b,
+          |       t11.rn
+          |FROM (SELECT *, Row_number() OVER (PARTITION BY b ORDER BY a) rn FROM t1) t11
+          |  JOIN t3
+          |    ON t11.a = t3.a AND t3.b < 2
+          |""".stripMargin)
+
+      checkDataPruningPredicate(df, false, false)
+      checkAnswer(df, Row(0, 0, 1) :: Row(1, 1, 1) :: Nil)
+    }
+  }
+
   test("CARMEL-5442: Fall back to true if InSet size exceed DYNAMIC_PRUNING_MAX_INSET_NUM") {
     withSQLConf(SQLConf.DYNAMIC_DATA_PRUNING_ENABLED.key -> "true",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "3K",
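The contrast between the last two tests is the heart of the Window rule: the injected filter on the join key `a` is only safe to evaluate below the window when `a` is the PARTITION BY column. The following standalone snippet (made-up data, plain Spark DataFrame APIs, independent of this patch) shows why: a filter on the PARTITION BY column commutes with row_number(), while a filter on another column does not.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

val spark = SparkSession.builder().master("local[2]").appName("window-commute").getOrCreate()
import spark.implicits._

val t1 = Seq((0, 0), (0, 5), (1, 1)).toDF("a", "b")
val byA = Window.partitionBy("a").orderBy("b")
val ranked = t1.withColumn("rn", row_number().over(byA))

// A filter on `a` (the PARTITION BY column) drops whole partitions: running it
// before or after the window produces identical results.
val afterWindow  = ranked.filter($"a" === 0)
val beforeWindow = t1.filter($"a" === 0).withColumn("rn", row_number().over(byA))
assert(afterWindow.collect().toSet == beforeWindow.collect().toSet)

// A filter on `b` removes rows *within* a partition, which changes row_number():
// (0, 5) ranks second when all rows are present, first when (0, 0) is filtered out.
val onBAfter  = ranked.filter($"b" >= 5)
val onBBefore = t1.filter($"b" >= 5).withColumn("rn", row_number().over(byA))
assert(onBAfter.collect().toSet != onBBefore.collect().toSet)

spark.stop()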