From d74dee1336e7152cc0fb7d2b3bf1a44f4f452025 Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Thu, 28 Sep 2017 09:20:37 -0700
Subject: [PATCH] [SPARK-22153][SQL] Rename ShuffleExchange -> ShuffleExchangeExec

## What changes were proposed in this pull request?

For some reason, when we added the Exec suffix to all physical operators, we missed this one. I was looking for this physical operator today and couldn't find it because I was searching for ExchangeExec.

## How was this patch tested?

This is a simple rename and should be covered by existing tests.

Author: Reynold Xin

Closes #19376 from rxin/SPARK-22153.
---
 .../spark/sql/execution/SparkStrategies.scala |  6 +--
 .../exchange/EnsureRequirements.scala         | 26 ++++++------
 .../exchange/ExchangeCoordinator.scala        | 38 +++++++++----------
 ...change.scala => ShuffleExchangeExec.scala} | 10 ++---
 .../apache/spark/sql/execution/limit.scala    |  6 +--
 .../streaming/IncrementalExecution.scala      |  4 +-
 .../apache/spark/sql/CachedTableSuite.scala   |  5 ++-
 .../org/apache/spark/sql/DataFrameSuite.scala | 10 ++---
 .../org/apache/spark/sql/DatasetSuite.scala   |  4 +-
 .../execution/ExchangeCoordinatorSuite.scala  | 22 +++++------
 .../spark/sql/execution/ExchangeSuite.scala   | 12 +++---
 .../spark/sql/execution/PlannerSuite.scala    | 32 ++++++++--------
 .../spark/sql/sources/BucketedReadSuite.scala | 10 ++---
 .../EnsureStatefulOpPartitioningSuite.scala   |  4 +-
 14 files changed, 95 insertions(+), 94 deletions(-)
 rename sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/{ShuffleExchange.scala => ShuffleExchangeExec.scala} (98%)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 4da7a73469537..92eaab5cd8f81 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec}
 import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.exchange.ShuffleExchange
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight}
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.internal.SQLConf
@@ -411,7 +411,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
 
       case logical.Repartition(numPartitions, shuffle, child) =>
         if (shuffle) {
-          ShuffleExchange(RoundRobinPartitioning(numPartitions), planLater(child)) :: Nil
+          ShuffleExchangeExec(RoundRobinPartitioning(numPartitions), planLater(child)) :: Nil
         } else {
           execution.CoalesceExec(numPartitions, planLater(child)) :: Nil
         }
@@ -446,7 +446,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case r: logical.Range =>
         execution.RangeExec(r) :: Nil
       case logical.RepartitionByExpression(expressions, child, numPartitions) =>
-        exchange.ShuffleExchange(HashPartitioning(
+        exchange.ShuffleExchangeExec(HashPartitioning(
           expressions, numPartitions), planLater(child)) :: Nil
       case ExternalRDD(outputObjAttr, rdd) => ExternalRDDScanExec(outputObjAttr, rdd) :: Nil
      case r: LogicalRDD =>
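The rename is a one-for-one substitution for downstream code. A minimal sketch of the kind of plan inspection the test suites below rely on (the `numShuffles` helper itself is hypothetical; the pattern match mirrors the updated call sites):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

// Hypothetical helper: count the shuffle exchanges in a DataFrame's physical plan.
// Before this patch the pattern would have matched ShuffleExchange instead.
def numShuffles(df: DataFrame): Int =
  df.queryExecution.executedPlan.collect { case e: ShuffleExchangeExec => e }.size
```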
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index 1da72f2e92329..d28ce60e276d5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -27,8 +27,8 @@ import org.apache.spark.sql.internal.SQLConf
  * Ensures that the [[org.apache.spark.sql.catalyst.plans.physical.Partitioning Partitioning]]
  * of input data meets the
  * [[org.apache.spark.sql.catalyst.plans.physical.Distribution Distribution]] requirements for
- * each operator by inserting [[ShuffleExchange]] Operators where required. Also ensure that the
- * input partition ordering requirements are met.
+ * each operator by inserting [[ShuffleExchangeExec]] operators where required. Also ensures that
+ * the input partition ordering requirements are met.
  */
 case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
   private def defaultNumPreShufflePartitions: Int = conf.numShufflePartitions
@@ -57,17 +57,17 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
   }
 
   /**
-   * Adds [[ExchangeCoordinator]] to [[ShuffleExchange]]s if adaptive query execution is enabled
-   * and partitioning schemes of these [[ShuffleExchange]]s support [[ExchangeCoordinator]].
+   * Adds [[ExchangeCoordinator]] to [[ShuffleExchangeExec]]s if adaptive query execution is enabled
+   * and partitioning schemes of these [[ShuffleExchangeExec]]s support [[ExchangeCoordinator]].
    */
   private def withExchangeCoordinator(
       children: Seq[SparkPlan],
       requiredChildDistributions: Seq[Distribution]): Seq[SparkPlan] = {
     val supportsCoordinator =
-      if (children.exists(_.isInstanceOf[ShuffleExchange])) {
+      if (children.exists(_.isInstanceOf[ShuffleExchangeExec])) {
         // Right now, ExchangeCoordinator only support HashPartitionings.
         children.forall {
-          case e @ ShuffleExchange(hash: HashPartitioning, _, _) => true
+          case e @ ShuffleExchangeExec(hash: HashPartitioning, _, _) => true
           case child =>
             child.outputPartitioning match {
               case hash: HashPartitioning => true
@@ -94,7 +94,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
           targetPostShuffleInputSize,
           minNumPostShufflePartitions)
       children.zip(requiredChildDistributions).map {
-        case (e: ShuffleExchange, _) =>
+        case (e: ShuffleExchangeExec, _) =>
           // This child is an Exchange, we need to add the coordinator.
           e.copy(coordinator = Some(coordinator))
         case (child, distribution) =>
@@ -138,7 +138,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
             val targetPartitioning = createPartitioning(distribution, defaultNumPreShufflePartitions)
             assert(targetPartitioning.isInstanceOf[HashPartitioning])
-            ShuffleExchange(targetPartitioning, child, Some(coordinator))
+            ShuffleExchangeExec(targetPartitioning, child, Some(coordinator))
           }
       } else {
         // If we do not need ExchangeCoordinator, the original children are returned.
@@ -162,7 +162,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
       case (child, BroadcastDistribution(mode)) =>
         BroadcastExchangeExec(mode, child)
       case (child, distribution) =>
-        ShuffleExchange(createPartitioning(distribution, defaultNumPreShufflePartitions), child)
+        ShuffleExchangeExec(createPartitioning(distribution, defaultNumPreShufflePartitions), child)
     }
 
     // If the operator has multiple children and specifies child output distributions (e.g. join),
@@ -215,8 +215,8 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
           child match {
             // If child is an exchange, we replace it with
             // a new one having targetPartitioning.
-            case ShuffleExchange(_, c, _) => ShuffleExchange(targetPartitioning, c)
-            case _ => ShuffleExchange(targetPartitioning, child)
+            case ShuffleExchangeExec(_, c, _) => ShuffleExchangeExec(targetPartitioning, c)
+            case _ => ShuffleExchangeExec(targetPartitioning, child)
           }
         }
       }
@@ -246,9 +246,9 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
   }
 
   def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
-    case operator @ ShuffleExchange(partitioning, child, _) =>
+    case operator @ ShuffleExchangeExec(partitioning, child, _) =>
       child.children match {
-        case ShuffleExchange(childPartitioning, baseChild, _)::Nil =>
+        case ShuffleExchangeExec(childPartitioning, baseChild, _)::Nil =>
           if (childPartitioning.guarantees(partitioning)) child else operator
         case _ => operator
       }
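To make the renamed rule concrete: EnsureRequirements wraps a child in a shuffle only when the child's output partitioning does not already satisfy the operator's required distribution. A minimal sketch of that decision, with hypothetical `child` and `keys` arguments and an arbitrary partition count:

```scala
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, HashPartitioning}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

// Hypothetical sketch of the wrap-in-exchange decision; 200 stands in for the
// default of spark.sql.shuffle.partitions and is arbitrary here.
def ensureClustered(child: SparkPlan, keys: Seq[Expression]): SparkPlan = {
  val required = ClusteredDistribution(keys)
  if (child.outputPartitioning.satisfies(required)) child
  else ShuffleExchangeExec(HashPartitioning(keys, 200), child)
}
```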
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala
index 9fc4ffb651ec8..78f11ca8d8c78 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala
@@ -35,9 +35,9 @@ import org.apache.spark.sql.execution.{ShuffledRowRDD, SparkPlan}
  *
  * A coordinator is constructed with three parameters, `numExchanges`,
  * `targetPostShuffleInputSize`, and `minNumPostShufflePartitions`.
- * - `numExchanges` is used to indicated that how many [[ShuffleExchange]]s that will be registered
- *   to this coordinator. So, when we start to do any actual work, we have a way to make sure that
- *   we have got expected number of [[ShuffleExchange]]s.
+ * - `numExchanges` indicates how many [[ShuffleExchangeExec]]s will be registered to this
+ *   coordinator, so when we start to do any actual work, we have a way to make sure that we
+ *   have the expected number of [[ShuffleExchangeExec]]s.
  * - `targetPostShuffleInputSize` is the targeted size of a post-shuffle partition's
  *   input data size. With this parameter, we can estimate the number of post-shuffle partitions.
  *   This parameter is configured through
@@ -47,28 +47,28 @@ import org.apache.spark.sql.execution.{ShuffledRowRDD, SparkPlan}
  *   partitions.
  *
  * The workflow of this coordinator is described as follows:
- * - Before the execution of a [[SparkPlan]], for a [[ShuffleExchange]] operator,
+ * - Before the execution of a [[SparkPlan]], for a [[ShuffleExchangeExec]] operator,
  *   if an [[ExchangeCoordinator]] is assigned to it, it registers itself to this coordinator.
  *   This happens in the `doPrepare` method.
- * - Once we start to execute a physical plan, a [[ShuffleExchange]] registered to this
+ * - Once we start to execute a physical plan, a [[ShuffleExchangeExec]] registered to this
  *   coordinator will call `postShuffleRDD` to get its corresponding post-shuffle
  *   [[ShuffledRowRDD]].
- *   If this coordinator has made the decision on how to shuffle data, this [[ShuffleExchange]]
+ *   If this coordinator has made the decision on how to shuffle data, this [[ShuffleExchangeExec]]
  *   will immediately get its corresponding post-shuffle [[ShuffledRowRDD]].
  * - If this coordinator has not made the decision on how to shuffle data, it will ask those
- *   registered [[ShuffleExchange]]s to submit their pre-shuffle stages. Then, based on the
+ *   registered [[ShuffleExchangeExec]]s to submit their pre-shuffle stages. Then, based on the
  *   size statistics of pre-shuffle partitions, this coordinator will determine the number of
  *   post-shuffle partitions and pack multiple pre-shuffle partitions with continuous indices
  *   to a single post-shuffle partition whenever necessary.
  * - Finally, this coordinator will create post-shuffle [[ShuffledRowRDD]]s for all registered
- *   [[ShuffleExchange]]s. So, when a [[ShuffleExchange]] calls `postShuffleRDD`, this coordinator
- *   can lookup the corresponding [[RDD]].
+ *   [[ShuffleExchangeExec]]s. So, when a [[ShuffleExchangeExec]] calls `postShuffleRDD`, this
+ *   coordinator can look up the corresponding [[RDD]].
  *
  * The strategy used to determine the number of post-shuffle partitions is described as follows.
  * To determine the number of post-shuffle partitions, we have a target input size for a
  * post-shuffle partition. Once we have size statistics of pre-shuffle partitions from stages
- * corresponding to the registered [[ShuffleExchange]]s, we will do a pass of those statistics and
- * pack pre-shuffle partitions with continuous indices to a single post-shuffle partition until
+ * corresponding to the registered [[ShuffleExchangeExec]]s, we will do a pass of those statistics
+ * and pack pre-shuffle partitions with continuous indices to a single post-shuffle partition until
  * adding another pre-shuffle partition would cause the size of a post-shuffle partition to be
  * greater than the target size.
  *
@@ -89,11 +89,11 @@ class ExchangeCoordinator(
   extends Logging {
 
   // The registered Exchange operators.
-  private[this] val exchanges = ArrayBuffer[ShuffleExchange]()
+  private[this] val exchanges = ArrayBuffer[ShuffleExchangeExec]()
 
   // This map is used to lookup the post-shuffle ShuffledRowRDD for an Exchange operator.
-  private[this] val postShuffleRDDs: JMap[ShuffleExchange, ShuffledRowRDD] =
-    new JHashMap[ShuffleExchange, ShuffledRowRDD](numExchanges)
+  private[this] val postShuffleRDDs: JMap[ShuffleExchangeExec, ShuffledRowRDD] =
+    new JHashMap[ShuffleExchangeExec, ShuffledRowRDD](numExchanges)
 
   // A boolean that indicates if this coordinator has made decision on how to shuffle data.
   // This variable will only be updated by doEstimationIfNecessary, which is protected by
@@ -101,11 +101,11 @@ class ExchangeCoordinator(
   @volatile private[this] var estimated: Boolean = false
 
   /**
-   * Registers a [[ShuffleExchange]] operator to this coordinator. This method is only allowed to
-   * be called in the `doPrepare` method of a [[ShuffleExchange]] operator.
+   * Registers a [[ShuffleExchangeExec]] operator to this coordinator. This method is only allowed
+   * to be called in the `doPrepare` method of a [[ShuffleExchangeExec]] operator.
    */
   @GuardedBy("this")
-  def registerExchange(exchange: ShuffleExchange): Unit = synchronized {
+  def registerExchange(exchange: ShuffleExchangeExec): Unit = synchronized {
     exchanges += exchange
   }
 
@@ -200,7 +200,7 @@ class ExchangeCoordinator(
     // Make sure we have the expected number of registered Exchange operators.
     assert(exchanges.length == numExchanges)
 
-    val newPostShuffleRDDs = new JHashMap[ShuffleExchange, ShuffledRowRDD](numExchanges)
+    val newPostShuffleRDDs = new JHashMap[ShuffleExchangeExec, ShuffledRowRDD](numExchanges)
 
     // Submit all map stages
     val shuffleDependencies = ArrayBuffer[ShuffleDependency[Int, InternalRow, InternalRow]]()
@@ -255,7 +255,7 @@ class ExchangeCoordinator(
     }
   }
 
-  def postShuffleRDD(exchange: ShuffleExchange): ShuffledRowRDD = {
+  def postShuffleRDD(exchange: ShuffleExchangeExec): ShuffledRowRDD = {
     doEstimationIfNecessary()
 
     if (!postShuffleRDDs.containsKey(exchange)) {
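The scaladoc above describes the coordinator's packing strategy in prose. A simplified, hypothetical sketch of just the packing loop (the real coordinator derives sizes from MapOutputStatistics across all registered exchanges and also honors `minNumPostShufflePartitions`):

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical sketch: given each pre-shuffle partition's size in bytes, compute the
// start indices of post-shuffle partitions by packing partitions with continuous
// indices until adding the next one would exceed the target input size.
def packPartitions(preShuffleSizes: Array[Long], targetSize: Long): Seq[Int] = {
  val startIndices = ArrayBuffer(0)
  var currentSize = 0L
  for (i <- preShuffleSizes.indices) {
    if (currentSize > 0 && currentSize + preShuffleSizes(i) > targetSize) {
      startIndices += i  // begin a new post-shuffle partition at pre-shuffle index i
      currentSize = 0L
    }
    currentSize += preShuffleSizes(i)
  }
  startIndices
}

// With a 64-byte target, sizes of 10, 20, 50 and 60 pack into post-shuffle
// partitions starting at pre-shuffle indices 0, 2 and 3:
// packPartitions(Array(10L, 20L, 50L, 60L), 64L) == ArrayBuffer(0, 2, 3)
```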
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
similarity index 98%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
index 0d06d83fb2f3c..11c4aa9b4acf0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
@@ -35,7 +35,7 @@ import org.apache.spark.util.MutablePair
 /**
  * Performs a shuffle that will result in the desired `newPartitioning`.
  */
-case class ShuffleExchange(
+case class ShuffleExchangeExec(
     var newPartitioning: Partitioning,
     child: SparkPlan,
     @transient coordinator: Option[ExchangeCoordinator]) extends Exchange {
@@ -84,7 +84,7 @@ case class ShuffleExchange(
    */
   private[exchange] def prepareShuffleDependency()
     : ShuffleDependency[Int, InternalRow, InternalRow] = {
-    ShuffleExchange.prepareShuffleDependency(
+    ShuffleExchangeExec.prepareShuffleDependency(
       child.execute(), child.output, newPartitioning, serializer)
   }
 
@@ -129,9 +129,9 @@ case class ShuffleExchange(
   }
 }
 
-object ShuffleExchange {
-  def apply(newPartitioning: Partitioning, child: SparkPlan): ShuffleExchange = {
-    ShuffleExchange(newPartitioning, child, coordinator = Option.empty[ExchangeCoordinator])
+object ShuffleExchangeExec {
+  def apply(newPartitioning: Partitioning, child: SparkPlan): ShuffleExchangeExec = {
+    ShuffleExchangeExec(newPartitioning, child, coordinator = Option.empty[ExchangeCoordinator])
   }
 
   /**
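After the rename, the companion object's two-argument `apply` still builds the operator with no coordinator. Short hypothetical helpers mirroring call sites this patch touches in SparkStrategies and ExchangeSuite:

```scala
import org.apache.spark.sql.catalyst.plans.physical.{RoundRobinPartitioning, SinglePartition}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

// The two-argument apply defaults the ExchangeCoordinator to None.
def repartitionRoundRobin(child: SparkPlan, numPartitions: Int): SparkPlan =
  ShuffleExchangeExec(RoundRobinPartitioning(numPartitions), child)

def collapseToSinglePartition(child: SparkPlan): SparkPlan =
  ShuffleExchangeExec(SinglePartition, child)
```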
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
index 1f515e29b4af5..13da4b26a5dcb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, LazilyGeneratedOrdering}
 import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.execution.exchange.ShuffleExchange
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.util.Utils
 
 /**
@@ -40,7 +40,7 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode
   protected override def doExecute(): RDD[InternalRow] = {
     val locallyLimited = child.execute().mapPartitionsInternal(_.take(limit))
     val shuffled = new ShuffledRowRDD(
-      ShuffleExchange.prepareShuffleDependency(
+      ShuffleExchangeExec.prepareShuffleDependency(
         locallyLimited, child.output, SinglePartition, serializer))
     shuffled.mapPartitionsInternal(_.take(limit))
   }
@@ -153,7 +153,7 @@ case class TakeOrderedAndProjectExec(
       }
     }
     val shuffled = new ShuffledRowRDD(
-      ShuffleExchange.prepareShuffleDependency(
+      ShuffleExchangeExec.prepareShuffleDependency(
         localTopK, child.output, SinglePartition, serializer))
     shuffled.mapPartitions { iter =>
       val topK = org.apache.spark.util.collection.Utils.takeOrdered(iter.map(_.copy()), limit)(ord)
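CollectLimitExec above implements a two-phase limit: cap each partition locally, shuffle the survivors into a single partition via `prepareShuffleDependency`, then cap again. An RDD-level analogy of the same idea (a sketch, not Spark SQL's actual code path):

```scala
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

// Sketch: the per-partition take bounds how much data reaches the single final
// partition, which is what makes the global take cheap.
def twoPhaseLimit[T: ClassTag](rdd: RDD[T], limit: Int): RDD[T] =
  rdd.mapPartitions(_.take(limit))   // phase 1: local cap in each partition
    .coalesce(1, shuffle = true)     // shuffle survivors into one partition
    .mapPartitions(_.take(limit))    // phase 2: final global cap
```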
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index 8e0aae39cabb6..82f879c763c2b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, HashPartitioning, SinglePartition}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, SparkPlanner, UnaryExecNode}
-import org.apache.spark.sql.execution.exchange.ShuffleExchange
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.streaming.OutputMode
 
 /**
@@ -155,7 +155,7 @@ object EnsureStatefulOpPartitioning extends Rule[SparkPlan] {
           child.execute().getNumPartitions == expectedPartitioning.numPartitions) {
           child
         } else {
-          ShuffleExchange(expectedPartitioning, child)
+          ShuffleExchangeExec(expectedPartitioning, child)
         }
       }
       so.withNewChildren(children)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 3e4f619431599..1e52445f28fc1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
 import org.apache.spark.sql.execution.{RDDScanExec, SparkPlan}
 import org.apache.spark.sql.execution.columnar._
-import org.apache.spark.sql.execution.exchange.ShuffleExchange
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
 import org.apache.spark.storage.{RDDBlockId, StorageLevel}
@@ -420,7 +420,8 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
    * Verifies that the plan for `df` contains `expected` number of Exchange operators.
    */
   private def verifyNumExchanges(df: DataFrame, expected: Int): Unit = {
-    assert(df.queryExecution.executedPlan.collect { case e: ShuffleExchange => e }.size == expected)
+    assert(
+      df.queryExecution.executedPlan.collect { case e: ShuffleExchangeExec => e }.size == expected)
   }
 
   test("A cached table preserves the partitioning and ordering of its cached SparkPlan") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 6178661cf7b2b..0e2f2e5a193e1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, OneRowRelation, Union}
 import org.apache.spark.sql.execution.{FilterExec, QueryExecution}
 import org.apache.spark.sql.execution.aggregate.HashAggregateExec
-import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchange}
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchangeExec}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.{ExamplePoint, ExamplePointUDT, SharedSQLContext}
@@ -1529,7 +1529,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
             fail("Should not have back to back Aggregates")
           }
           atFirstAgg = true
-        case e: ShuffleExchange => atFirstAgg = false
+        case e: ShuffleExchangeExec => atFirstAgg = false
         case _ =>
       }
     }
@@ -1710,19 +1710,19 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
       val plan = join.queryExecution.executedPlan
       checkAnswer(join, df)
       assert(
-        join.queryExecution.executedPlan.collect { case e: ShuffleExchange => true }.size === 1)
+        join.queryExecution.executedPlan.collect { case e: ShuffleExchangeExec => true }.size === 1)
       assert(
         join.queryExecution.executedPlan.collect { case e: ReusedExchangeExec => true }.size === 1)
       val broadcasted = broadcast(join)
       val join2 = join.join(broadcasted, "id").join(broadcasted, "id")
       checkAnswer(join2, df)
       assert(
-        join2.queryExecution.executedPlan.collect { case e: ShuffleExchange => true }.size === 1)
+        join2.queryExecution.executedPlan.collect { case e: ShuffleExchangeExec => true }.size == 1)
       assert(
         join2.queryExecution.executedPlan
           .collect { case e: BroadcastExchangeExec => true }.size === 1)
       assert(
-        join2.queryExecution.executedPlan.collect { case e: ReusedExchangeExec => true }.size === 4)
+        join2.queryExecution.executedPlan.collect { case e: ReusedExchangeExec => true }.size == 4)
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 5015f3709f131..dace6825ee40e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.encoders.{OuterScopes, RowEncoder}
 import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi}
 import org.apache.spark.sql.catalyst.util.sideBySide
 import org.apache.spark.sql.execution.{LogicalRDD, RDDScanExec}
-import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchange}
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
@@ -1206,7 +1206,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
 
     val agg = cp.groupBy('id % 2).agg(count('id))
     agg.queryExecution.executedPlan.collectFirst {
-      case ShuffleExchange(_, _: RDDScanExec, _) =>
+      case ShuffleExchangeExec(_, _: RDDScanExec, _) =>
       case BroadcastExchangeExec(_, _: RDDScanExec) =>
     }.foreach { _ =>
       fail(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala
index f1b5e3be5b63f..737eeb0af586e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala
@@ -21,7 +21,7 @@ import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.{MapOutputStatistics, SparkConf, SparkFunSuite}
 import org.apache.spark.sql._
-import org.apache.spark.sql.execution.exchange.{ExchangeCoordinator, ShuffleExchange}
+import org.apache.spark.sql.execution.exchange.{ExchangeCoordinator, ShuffleExchangeExec}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 
@@ -300,13 +300,13 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
         // Then, let's look at the number of post-shuffle partitions estimated
         // by the ExchangeCoordinator.
         val exchanges = agg.queryExecution.executedPlan.collect {
-          case e: ShuffleExchange => e
+          case e: ShuffleExchangeExec => e
         }
         assert(exchanges.length === 1)
         minNumPostShufflePartitions match {
           case Some(numPartitions) =>
             exchanges.foreach {
-              case e: ShuffleExchange =>
+              case e: ShuffleExchangeExec =>
                 assert(e.coordinator.isDefined)
                 assert(e.outputPartitioning.numPartitions === 5)
               case o =>
@@ -314,7 +314,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
 
           case None =>
             exchanges.foreach {
-              case e: ShuffleExchange =>
+              case e: ShuffleExchangeExec =>
                 assert(e.coordinator.isDefined)
                 assert(e.outputPartitioning.numPartitions === 3)
               case o =>
@@ -351,13 +351,13 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
         // Then, let's look at the number of post-shuffle partitions estimated
         // by the ExchangeCoordinator.
         val exchanges = join.queryExecution.executedPlan.collect {
-          case e: ShuffleExchange => e
+          case e: ShuffleExchangeExec => e
         }
         assert(exchanges.length === 2)
         minNumPostShufflePartitions match {
           case Some(numPartitions) =>
             exchanges.foreach {
-              case e: ShuffleExchange =>
+              case e: ShuffleExchangeExec =>
                 assert(e.coordinator.isDefined)
                 assert(e.outputPartitioning.numPartitions === 5)
               case o =>
@@ -365,7 +365,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
 
           case None =>
             exchanges.foreach {
-              case e: ShuffleExchange =>
+              case e: ShuffleExchangeExec =>
                 assert(e.coordinator.isDefined)
                 assert(e.outputPartitioning.numPartitions === 2)
               case o =>
@@ -407,13 +407,13 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
         // Then, let's look at the number of post-shuffle partitions estimated
         // by the ExchangeCoordinator.
         val exchanges = join.queryExecution.executedPlan.collect {
-          case e: ShuffleExchange => e
+          case e: ShuffleExchangeExec => e
         }
         assert(exchanges.length === 4)
         minNumPostShufflePartitions match {
           case Some(numPartitions) =>
             exchanges.foreach {
-              case e: ShuffleExchange =>
+              case e: ShuffleExchangeExec =>
                 assert(e.coordinator.isDefined)
                 assert(e.outputPartitioning.numPartitions === 5)
               case o =>
@@ -459,13 +459,13 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
         // Then, let's look at the number of post-shuffle partitions estimated
         // by the ExchangeCoordinator.
         val exchanges = join.queryExecution.executedPlan.collect {
-          case e: ShuffleExchange => e
+          case e: ShuffleExchangeExec => e
         }
         assert(exchanges.length === 3)
         minNumPostShufflePartitions match {
           case Some(numPartitions) =>
             exchanges.foreach {
-              case e: ShuffleExchange =>
+              case e: ShuffleExchangeExec =>
                 assert(e.coordinator.isDefined)
                 assert(e.outputPartitioning.numPartitions === 5)
               case o =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala
index 59eaf4d1c29b7..aac8d56ba6201 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, IdentityBroadcastMode, SinglePartition}
-import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchange}
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode
 import org.apache.spark.sql.test.SharedSQLContext
 
@@ -31,7 +31,7 @@ class ExchangeSuite extends SparkPlanTest with SharedSQLContext {
     val input = (1 to 1000).map(Tuple1.apply)
     checkAnswer(
       input.toDF(),
-      plan => ShuffleExchange(SinglePartition, plan),
+      plan => ShuffleExchangeExec(SinglePartition, plan),
       input.map(Row.fromTuple)
     )
   }
@@ -81,12 +81,12 @@ class ExchangeSuite extends SparkPlanTest with SharedSQLContext {
     assert(plan sameResult plan)
 
     val part1 = HashPartitioning(output, 1)
-    val exchange1 = ShuffleExchange(part1, plan)
-    val exchange2 = ShuffleExchange(part1, plan)
+    val exchange1 = ShuffleExchangeExec(part1, plan)
+    val exchange2 = ShuffleExchangeExec(part1, plan)
     val part2 = HashPartitioning(output, 2)
-    val exchange3 = ShuffleExchange(part2, plan)
+    val exchange3 = ShuffleExchangeExec(part2, plan)
     val part3 = HashPartitioning(output ++ output, 2)
-    val exchange4 = ShuffleExchange(part3, plan)
+    val exchange4 = ShuffleExchangeExec(part3, plan)
     val exchange5 = ReusedExchangeExec(output, exchange4)
 
     assert(exchange1 sameResult exchange1)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index 63e17c7f372b0..86066362da9dd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.plans.{Cross, FullOuter, Inner, LeftOuter,
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Repartition}
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
-import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReusedExchangeExec, ReuseExchange, ShuffleExchange}
+import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReusedExchangeExec, ReuseExchange, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
@@ -214,7 +214,7 @@ class PlannerSuite extends SharedSQLContext {
           |  JOIN tiny ON (small.key = tiny.key)
         """.stripMargin
       ).queryExecution.executedPlan.collect {
-        case exchange: ShuffleExchange => exchange
+        case exchange: ShuffleExchangeExec => exchange
       }.length
       assert(numExchanges === 5)
     }
@@ -229,7 +229,7 @@ class PlannerSuite extends SharedSQLContext {
           |  JOIN tiny ON (normal.key = tiny.key)
         """.stripMargin
       ).queryExecution.executedPlan.collect {
-        case exchange: ShuffleExchange => exchange
+        case exchange: ShuffleExchangeExec => exchange
       }.length
      assert(numExchanges === 5)
     }
@@ -300,7 +300,7 @@ class PlannerSuite extends SharedSQLContext {
       )
       val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan)
       assertDistributionRequirementsAreSatisfied(outputPlan)
-      if (outputPlan.collect { case e: ShuffleExchange => true }.isEmpty) {
+      if (outputPlan.collect { case e: ShuffleExchangeExec => true }.isEmpty) {
         fail(s"Exchange should have been added:\n$outputPlan")
       }
     }
@@ -338,7 +338,7 @@ class PlannerSuite extends SharedSQLContext {
       )
       val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan)
       assertDistributionRequirementsAreSatisfied(outputPlan)
-      if (outputPlan.collect { case e: ShuffleExchange => true }.isEmpty) {
+      if (outputPlan.collect { case e: ShuffleExchangeExec => true }.isEmpty) {
         fail(s"Exchange should have been added:\n$outputPlan")
       }
     }
@@ -358,7 +358,7 @@ class PlannerSuite extends SharedSQLContext {
       )
       val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan)
       assertDistributionRequirementsAreSatisfied(outputPlan)
-      if (outputPlan.collect { case e: ShuffleExchange => true }.nonEmpty) {
+      if (outputPlan.collect { case e: ShuffleExchangeExec => true }.nonEmpty) {
         fail(s"Exchange should not have been added:\n$outputPlan")
       }
     }
@@ -381,7 +381,7 @@ class PlannerSuite extends SharedSQLContext {
       )
       val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan)
       assertDistributionRequirementsAreSatisfied(outputPlan)
-      if (outputPlan.collect { case e: ShuffleExchange => true }.nonEmpty) {
+      if (outputPlan.collect { case e: ShuffleExchangeExec => true }.nonEmpty) {
         fail(s"No Exchanges should have been added:\n$outputPlan")
       }
     }
@@ -391,7 +391,7 @@ class PlannerSuite extends SharedSQLContext {
     val finalPartitioning = HashPartitioning(Literal(1) :: Nil, 5)
     val childPartitioning = HashPartitioning(Literal(2) :: Nil, 5)
     assert(!childPartitioning.satisfies(distribution))
-    val inputPlan = ShuffleExchange(finalPartitioning,
+    val inputPlan = ShuffleExchangeExec(finalPartitioning,
       DummySparkPlan(
         children = DummySparkPlan(outputPartitioning = childPartitioning) :: Nil,
         requiredChildDistribution = Seq(distribution),
@@ -400,7 +400,7 @@ class PlannerSuite extends SharedSQLContext {
 
     val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan)
     assertDistributionRequirementsAreSatisfied(outputPlan)
-    if (outputPlan.collect { case e: ShuffleExchange => true }.size == 2) {
+    if (outputPlan.collect { case e: ShuffleExchangeExec => true }.size == 2) {
       fail(s"Topmost Exchange should have been eliminated:\n$outputPlan")
     }
   }
@@ -411,7 +411,7 @@ class PlannerSuite extends SharedSQLContext {
     val finalPartitioning = HashPartitioning(Literal(1) :: Nil, 8)
     val childPartitioning = HashPartitioning(Literal(2) :: Nil, 5)
     assert(!childPartitioning.satisfies(distribution))
-    val inputPlan = ShuffleExchange(finalPartitioning,
+    val inputPlan = ShuffleExchangeExec(finalPartitioning,
       DummySparkPlan(
         children = DummySparkPlan(outputPartitioning = childPartitioning) :: Nil,
         requiredChildDistribution = Seq(distribution),
@@ -420,7 +420,7 @@ class PlannerSuite extends SharedSQLContext {
 
     val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan)
     assertDistributionRequirementsAreSatisfied(outputPlan)
-    if (outputPlan.collect { case e: ShuffleExchange => true }.size == 1) {
+    if (outputPlan.collect { case e: ShuffleExchangeExec => true }.size == 1) {
       fail(s"Topmost Exchange should not have been eliminated:\n$outputPlan")
     }
   }
@@ -430,7 +430,7 @@ class PlannerSuite extends SharedSQLContext {
     val finalPartitioning = HashPartitioning(Literal(1) :: Nil, 5)
     val childPartitioning = HashPartitioning(Literal(2) :: Nil, 5)
     assert(!childPartitioning.satisfies(distribution))
-    val shuffle = ShuffleExchange(finalPartitioning,
+    val shuffle = ShuffleExchangeExec(finalPartitioning,
      DummySparkPlan(
         children = DummySparkPlan(outputPartitioning = childPartitioning) :: Nil,
         requiredChildDistribution = Seq(distribution),
@@ -449,7 +449,7 @@ class PlannerSuite extends SharedSQLContext {
     if (outputPlan.collect { case e: ReusedExchangeExec => true }.size != 1) {
       fail(s"Should re-use the shuffle:\n$outputPlan")
     }
-    if (outputPlan.collect { case e: ShuffleExchange => true }.size != 1) {
+    if (outputPlan.collect { case e: ShuffleExchangeExec => true }.size != 1) {
       fail(s"Should have only one shuffle:\n$outputPlan")
     }
 
@@ -459,14 +459,14 @@ class PlannerSuite extends SharedSQLContext {
       Literal(1) :: Nil,
       Inner,
       None,
-      ShuffleExchange(finalPartitioning, inputPlan),
-      ShuffleExchange(finalPartitioning, inputPlan))
+      ShuffleExchangeExec(finalPartitioning, inputPlan),
+      ShuffleExchangeExec(finalPartitioning, inputPlan))
     val outputPlan2 = ReuseExchange(spark.sessionState.conf).apply(inputPlan2)
 
     if (outputPlan2.collect { case e: ReusedExchangeExec => true }.size != 2) {
       fail(s"Should re-use the two shuffles:\n$outputPlan2")
     }
-    if (outputPlan2.collect { case e: ShuffleExchange => true }.size != 2) {
+    if (outputPlan2.collect { case e: ShuffleExchangeExec => true }.size != 2) {
       fail(s"Should have only two shuffles:\n$outputPlan")
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index eb9e6458fc61c..ab18905e2ddb2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.execution.{DataSourceScanExec, SortExec}
 import org.apache.spark.sql.execution.datasources.DataSourceStrategy
-import org.apache.spark.sql.execution.exchange.ShuffleExchange
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.execution.joins.SortMergeJoinExec
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
@@ -302,10 +302,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
 
       // check existence of shuffle
       assert(
-        joinOperator.left.find(_.isInstanceOf[ShuffleExchange]).isDefined == shuffleLeft,
+        joinOperator.left.find(_.isInstanceOf[ShuffleExchangeExec]).isDefined == shuffleLeft,
         s"expected shuffle in plan to be $shuffleLeft but found\n${joinOperator.left}")
       assert(
-        joinOperator.right.find(_.isInstanceOf[ShuffleExchange]).isDefined == shuffleRight,
+        joinOperator.right.find(_.isInstanceOf[ShuffleExchangeExec]).isDefined == shuffleRight,
         s"expected shuffle in plan to be $shuffleRight but found\n${joinOperator.right}")
 
       // check existence of sort
@@ -506,7 +506,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
         agged.sort("i", "j"),
         df1.groupBy("i", "j").agg(max("k")).sort("i", "j"))
 
-      assert(agged.queryExecution.executedPlan.find(_.isInstanceOf[ShuffleExchange]).isEmpty)
+      assert(agged.queryExecution.executedPlan.find(_.isInstanceOf[ShuffleExchangeExec]).isEmpty)
     }
   }
 
@@ -520,7 +520,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
         agged.sort("i", "j"),
         df1.groupBy("i", "j").agg(max("k")).sort("i", "j"))
 
-      assert(agged.queryExecution.executedPlan.find(_.isInstanceOf[ShuffleExchange]).isEmpty)
+      assert(agged.queryExecution.executedPlan.find(_.isInstanceOf[ShuffleExchangeExec]).isEmpty)
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EnsureStatefulOpPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EnsureStatefulOpPartitioningSuite.scala
index 044bb03480aa4..ed9823fbddfda 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EnsureStatefulOpPartitioningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EnsureStatefulOpPartitioningSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.{SparkPlan, SparkPlanTest, UnaryExecNode}
-import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchange}
+import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata, StatefulOperator, StatefulOperatorStateInfo}
 import org.apache.spark.sql.test.SharedSQLContext
 
@@ -93,7 +93,7 @@ class EnsureStatefulOpPartitioningSuite extends SparkPlanTest with SharedSQLCont
       fail(s"Was expecting an exchange but didn't get one in:\n$executed")
     }
     assert(exchange.get ===
-      ShuffleExchange(expectedPartitioning(inputPlan.output.take(1)), inputPlan),
+      ShuffleExchangeExec(expectedPartitioning(inputPlan.output.take(1)), inputPlan),
       s"Exchange didn't have expected properties:\n${exchange.get}")
     } else {
      assert(!executed.children.exists(_.isInstanceOf[Exchange]),