From d9620ef9111c8167f86632c4e4c343e9d70a6375 Mon Sep 17 00:00:00 2001
From: Tejas Patil
Date: Mon, 27 Nov 2017 14:23:42 -0800
Subject: [PATCH] review comments

---
 .../exchange/EnsureRequirements.scala         | 78 ++++++++++++++++++-
 .../spark/sql/sources/BucketedReadSuite.scala | 31 +++++---
 2 files changed, 94 insertions(+), 15 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index 1e148ce06599e..82f0b9f5cd060 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -17,11 +17,14 @@
 package org.apache.spark.sql.execution.exchange
 
+import scala.collection.mutable.ArrayBuffer
+
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.joins.ReorderJoinPredicates
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, ShuffledHashJoinExec,
+  SortMergeJoinExec}
 import org.apache.spark.sql.internal.SQLConf
 
 /**
@@ -32,8 +35,6 @@ import org.apache.spark.sql.internal.SQLConf
  * the input partition ordering requirements are met.
 */
 case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
-  private val reorderJoinPredicates = new ReorderJoinPredicates
-
   private def defaultNumPreShufflePartitions: Int = conf.numShufflePartitions
 
   private def targetPostShuffleInputSize: Long = conf.targetPostShuffleInputSize
@@ -251,6 +252,75 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
     operator.withNewChildren(children)
   }
 
+  /**
+   * When the physical operators are created for JOIN, the ordering of join keys is based on
+   * the order in which the join keys appear in the user query. That might not match the output
+   * partitioning of the join node's children (thus leading to an extra sort / shuffle being
+   * introduced). This rule changes the ordering of the join keys to match the partitioning of
+   * the join node's children.
+   */
+  def reorderJoinPredicates(plan: SparkPlan): SparkPlan = {
+    def reorderJoinKeys(
+        leftKeys: Seq[Expression],
+        rightKeys: Seq[Expression],
+        leftPartitioning: Partitioning,
+        rightPartitioning: Partitioning): (Seq[Expression], Seq[Expression]) = {
+
+      def reorder(expectedOrderOfKeys: Seq[Expression],
+          currentOrderOfKeys: Seq[Expression]): (Seq[Expression], Seq[Expression]) = {
+        val leftKeysBuffer = ArrayBuffer[Expression]()
+        val rightKeysBuffer = ArrayBuffer[Expression]()
+
+        expectedOrderOfKeys.foreach(expression => {
+          val index = currentOrderOfKeys.indexWhere(e => e.semanticEquals(expression))
+          leftKeysBuffer.append(leftKeys(index))
+          rightKeysBuffer.append(rightKeys(index))
+        })
+        (leftKeysBuffer, rightKeysBuffer)
+      }
+
+      if (leftKeys.forall(_.deterministic) && rightKeys.forall(_.deterministic)) {
+        leftPartitioning match {
+          case HashPartitioning(leftExpressions, _)
+            if leftExpressions.length == leftKeys.length &&
+              leftKeys.forall(x => leftExpressions.exists(_.semanticEquals(x))) =>
+            reorder(leftExpressions, leftKeys)
+
+          case _ => rightPartitioning match {
+            case HashPartitioning(rightExpressions, _)
+              if rightExpressions.length == rightKeys.length &&
+                rightKeys.forall(x => rightExpressions.exists(_.semanticEquals(x))) =>
+              reorder(rightExpressions, rightKeys)
+
+            case _ => (leftKeys, rightKeys)
+          }
+        }
+      } else {
+        (leftKeys, rightKeys)
+      }
+    }
+
+    plan.transformUp {
+      case BroadcastHashJoinExec(leftKeys, rightKeys, joinType, buildSide, condition, left,
+          right) =>
+        val (reorderedLeftKeys, reorderedRightKeys) =
+          reorderJoinKeys(leftKeys, rightKeys, left.outputPartitioning, right.outputPartitioning)
+        BroadcastHashJoinExec(reorderedLeftKeys, reorderedRightKeys, joinType, buildSide, condition,
+          left, right)
+
+      case ShuffledHashJoinExec(leftKeys, rightKeys, joinType, buildSide, condition, left, right) =>
+        val (reorderedLeftKeys, reorderedRightKeys) =
+          reorderJoinKeys(leftKeys, rightKeys, left.outputPartitioning, right.outputPartitioning)
+        ShuffledHashJoinExec(reorderedLeftKeys, reorderedRightKeys, joinType, buildSide, condition,
+          left, right)
+
+      case SortMergeJoinExec(leftKeys, rightKeys, joinType, condition, left, right) =>
+        val (reorderedLeftKeys, reorderedRightKeys) =
+          reorderJoinKeys(leftKeys, rightKeys, left.outputPartitioning, right.outputPartitioning)
+        SortMergeJoinExec(reorderedLeftKeys, reorderedRightKeys, joinType, condition, left, right)
+    }
+  }
+
   def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
     case operator @ ShuffleExchangeExec(partitioning, child, _) =>
       child.children match {
@@ -259,6 +329,6 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
         case _ => operator
       }
     case operator: SparkPlan =>
-      ensureDistributionAndOrdering(reorderJoinPredicates.apply(operator))
+      ensureDistributionAndOrdering(reorderJoinPredicates(operator))
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index 13527c88c26b4..9025859e91066 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -609,17 +609,26 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
       df.write.format("parquet").bucketBy(8, "j", "k").saveAsTable("bucketed_table")
 
       withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") {
-        sql("""
-          |SELECT *
-          |FROM (
-          |  SELECT a.i, a.j, a.k
-          |  FROM bucketed_table a
-          |  JOIN table1 b
-          |  ON a.i = b.i
-          |) c
-          |JOIN table2
-          |ON c.i = table2.i
-          |""".stripMargin).explain()
+        checkAnswer(
+          sql("""
+            |SELECT ab.i, ab.j, ab.k, c.i, c.j, c.k
+            |FROM (
+            |  SELECT a.i, a.j, a.k
+            |  FROM bucketed_table a
+            |  JOIN table1 b
+            |  ON a.i = b.i
+            |) ab
+            |JOIN table2 c
+            |ON ab.i = c.i
+            |""".stripMargin),
+          sql("""
+            |SELECT a.i, a.j, a.k, c.i, c.j, c.k
+            |FROM bucketed_table a
+            |JOIN table1 b
+            |ON a.i = b.i
+            |JOIN table2 c
+            |ON a.i = c.i
+            |""".stripMargin))
       }
     }
   }
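
Note: the heart of this change is the key-reordering walk inside reorderJoinKeys(). Below is a minimal standalone sketch of that logic, using plain strings in place of Catalyst Expressions; the object name and sample keys are invented for illustration, and plain equality stands in for Expression.semanticEquals.

    import scala.collection.mutable.ArrayBuffer

    object ReorderSketch {
      // Mirror of the patch's reorder(): walk the keys in the order expected by the
      // child's output partitioning and pick the matching left/right key at each step.
      def reorder(
          leftKeys: Seq[String],
          rightKeys: Seq[String],
          expectedOrderOfKeys: Seq[String],
          currentOrderOfKeys: Seq[String]): (Seq[String], Seq[String]) = {
        val leftKeysBuffer = ArrayBuffer[String]()
        val rightKeysBuffer = ArrayBuffer[String]()
        expectedOrderOfKeys.foreach { expression =>
          // The real code uses Expression.semanticEquals; plain equality suffices here.
          val index = currentOrderOfKeys.indexWhere(_ == expression)
          leftKeysBuffer.append(leftKeys(index))
          rightKeysBuffer.append(rightKeys(index))
        }
        (leftKeysBuffer, rightKeysBuffer)
      }

      def main(args: Array[String]): Unit = {
        // The query said "ON a.k = b.k AND a.j = b.j", but the left child is hash
        // partitioned (e.g. bucketed) by (j, k): reordering the keys to (j, k) lets
        // the join reuse that partitioning instead of shuffling.
        val (l, r) = reorder(
          leftKeys = Seq("a.k", "a.j"),
          rightKeys = Seq("b.k", "b.j"),
          expectedOrderOfKeys = Seq("a.j", "a.k"),
          currentOrderOfKeys = Seq("a.k", "a.j"))
        println(l)  // ArrayBuffer(a.j, a.k)
        println(r)  // ArrayBuffer(b.j, b.k)
      }
    }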
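To see the effect end to end, here is a rough spark-shell sketch of the scenario the rule targets: a self join of a bucketed table with the join keys written in the opposite order from the bucketing columns. The table name, column layout, and config values are assumptions for illustration, not taken from the patch or its test.

    // Disable broadcast joins so a shuffle-based join is planned.
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

    val df = spark.range(100).selectExpr("id % 5 as j", "id % 13 as k", "id as v")
    df.write.format("parquet").bucketBy(8, "j", "k").saveAsTable("t")

    // The table is bucketed by (j, k) but the query lists the keys as (k, j).
    // Without this patch the planner hash-partitions both sides on (k, j),
    // shuffling the already-bucketed scans; with the keys reordered to (j, k),
    // explain() should show a SortMergeJoin with no ShuffleExchange.
    spark.sql("SELECT * FROM t t1 JOIN t t2 ON t1.k = t2.k AND t1.j = t2.j").explain()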