From a2de20c0e6857653de63f46052935784be87d34f Mon Sep 17 00:00:00 2001 From: lijunqing Date: Fri, 27 Dec 2019 11:52:39 +0800 Subject: [PATCH] [SPARK-30036][SQL] Fix: REPARTITION hint does not work with order by ### Why are the changes needed? `EnsureRequirements` adds `ShuffleExchangeExec` (RangePartitioning) after Sort if `RoundRobinPartitioning` is behind it. This will cause 2 shuffles, and the number of partitions in the final stage is not the number specified by `RoundRobinPartitioning`. **Example SQL** ``` SELECT /*+ REPARTITION(5) */ * FROM test ORDER BY a ``` **BEFORE** ``` == Physical Plan == *(1) Sort [a#0 ASC NULLS FIRST], true, 0 +- Exchange rangepartitioning(a#0 ASC NULLS FIRST, 200), true, [id=#11] +- Exchange RoundRobinPartitioning(5), false, [id=#9] +- Scan hive default.test [a#0, b#1], HiveTableRelation `default`.`test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [a#0, b#1] ``` **AFTER** ``` == Physical Plan == *(1) Sort [a#0 ASC NULLS FIRST], true, 0 +- Exchange rangepartitioning(a#0 ASC NULLS FIRST, 5), true, [id=#11] +- Scan hive default.test [a#0, b#1], HiveTableRelation `default`.`test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [a#0, b#1] ``` ### Does this PR introduce any user-facing change? No ### How was this patch tested? Ran the existing test suites and added new tests for this. Closes #26946 from stczwd/RoundRobinPartitioning. 
Lead-authored-by: lijunqing Co-authored-by: stczwd Signed-off-by: Wenchen Fan --- .../exchange/EnsureRequirements.scala | 2 + .../spark/sql/ConfigBehaviorSuite.scala | 8 ++-- .../spark/sql/execution/PlannerSuite.scala | 46 +++++++++++++++++++ 3 files changed, 51 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala index 068e0164443dd..033404ccac44d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala @@ -55,6 +55,8 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] { child case (child, BroadcastDistribution(mode)) => BroadcastExchangeExec(mode, child) + case (ShuffleExchangeExec(partitioning, child, _), distribution: OrderedDistribution) => + ShuffleExchangeExec(distribution.createPartitioning(partitioning.numPartitions), child) case (child, distribution) => val numPartitions = distribution.requiredNumPartitions .getOrElse(defaultNumPreShufflePartitions) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala index 56ae904e83fdb..0e090c6772d41 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala @@ -39,9 +39,7 @@ class ConfigBehaviorSuite extends QueryTest with SharedSparkSession { def computeChiSquareTest(): Double = { val n = 10000 // Trigger a sort - // Range has range partitioning in its output now. To have a range shuffle, we - // need to run a repartition first. 
- val data = spark.range(0, n, 1, 1).repartition(10).sort($"id".desc) + val data = spark.range(0, n, 1, 10).sort($"id".desc) .selectExpr("SPARK_PARTITION_ID() pid", "id").as[(Int, Long)].collect() // Compute histogram for the number of records per partition post sort @@ -55,12 +53,12 @@ class ConfigBehaviorSuite extends QueryTest with SharedSparkSession { withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> numPartitions.toString) { // The default chi-sq value should be low - assert(computeChiSquareTest() < 100) + assert(computeChiSquareTest() < 10) withSQLConf(SQLConf.RANGE_EXCHANGE_SAMPLE_SIZE_PER_PARTITION.key -> "1") { // If we only sample one point, the range boundaries will be pretty bad and the // chi-sq value would be very high. - assert(computeChiSquareTest() > 300) + assert(computeChiSquareTest() > 100) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index 3dea0b1ce937c..017e548809413 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -421,6 +421,52 @@ class PlannerSuite extends SharedSparkSession { } } + test("SPARK-30036: Remove unnecessary RoundRobinPartitioning " + + "if SortExec is followed by RoundRobinPartitioning") { + val distribution = OrderedDistribution(SortOrder(Literal(1), Ascending) :: Nil) + val partitioning = RoundRobinPartitioning(5) + assert(!partitioning.satisfies(distribution)) + + val inputPlan = SortExec(SortOrder(Literal(1), Ascending) :: Nil, + global = true, + child = ShuffleExchangeExec( + partitioning, + DummySparkPlan(outputPartitioning = partitioning))) + val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan) + assert(outputPlan.find { + case ShuffleExchangeExec(_: RoundRobinPartitioning, _, _) => true + case _ => false + }.isEmpty, + "RoundRobinPartitioning should be 
changed to RangePartitioning") + + val query = testData.select('key, 'value).repartition(2).sort('key.asc) + assert(query.rdd.getNumPartitions == 2) + assert(query.rdd.collectPartitions()(0).map(_.get(0)).toSeq == (1 to 50)) + } + + test("SPARK-30036: Remove unnecessary HashPartitioning " + + "if SortExec is followed by HashPartitioning") { + val distribution = OrderedDistribution(SortOrder(Literal(1), Ascending) :: Nil) + val partitioning = HashPartitioning(Literal(1) :: Nil, 5) + assert(!partitioning.satisfies(distribution)) + + val inputPlan = SortExec(SortOrder(Literal(1), Ascending) :: Nil, + global = true, + child = ShuffleExchangeExec( + partitioning, + DummySparkPlan(outputPartitioning = partitioning))) + val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan) + assert(outputPlan.find { + case ShuffleExchangeExec(_: HashPartitioning, _, _) => true + case _ => false + }.isEmpty, + "HashPartitioning should be changed to RangePartitioning") + + val query = testData.select('key, 'value).repartition(5, 'key).sort('key.asc) + assert(query.rdd.getNumPartitions == 5) + assert(query.rdd.collectPartitions()(0).map(_.get(0)).toSeq == (1 to 20)) + } + test("EnsureRequirements does not eliminate Exchange with different partitioning") { val distribution = ClusteredDistribution(Literal(1) :: Nil) val partitioning = HashPartitioning(Literal(2) :: Nil, 5)