[SPARK-22153][SQL] Rename ShuffleExchange -> ShuffleExchangeExec
rxin committed Sep 28, 2017
1 parent 3b117d6 commit 9e9627d
Showing 14 changed files with 89 additions and 89 deletions.
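For downstream code that pattern-matches on Spark's physical plan nodes, the rename is mechanical: update the import and the match cases from ShuffleExchange to ShuffleExchangeExec. A minimal sketch of the before/after (countShuffles is an illustrative helper, not part of this commit; it mirrors the test-suite helpers further down):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

// Count the shuffle exchanges in a DataFrame's executed plan; before this
// commit the case below would have matched ShuffleExchange instead.
def countShuffles(df: DataFrame): Int =
  df.queryExecution.executedPlan.collect { case e: ShuffleExchangeExec => e }.size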
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec}
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.exchange.ShuffleExchange
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight}
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.internal.SQLConf
@@ -411,7 +411,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {

case logical.Repartition(numPartitions, shuffle, child) =>
if (shuffle) {
ShuffleExchange(RoundRobinPartitioning(numPartitions), planLater(child)) :: Nil
ShuffleExchangeExec(RoundRobinPartitioning(numPartitions), planLater(child)) :: Nil
} else {
execution.CoalesceExec(numPartitions, planLater(child)) :: Nil
}
@@ -446,7 +446,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case r: logical.Range =>
execution.RangeExec(r) :: Nil
case logical.RepartitionByExpression(expressions, child, numPartitions) =>
exchange.ShuffleExchange(HashPartitioning(
exchange.ShuffleExchangeExec(HashPartitioning(
expressions, numPartitions), planLater(child)) :: Nil
case ExternalRDD(outputObjAttr, rdd) => ExternalRDDScanExec(outputObjAttr, rdd) :: Nil
case r: LogicalRDD =>
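A rough illustration of the strategies above (a sketch, assuming an active SparkSession named spark): repartition(n) plans a ShuffleExchangeExec with round-robin partitioning, while repartitioning by an expression plans hash partitioning.

import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.functions.col

// repartition(n) -> round-robin shuffle
val byCount = spark.range(100).repartition(8)
byCount.queryExecution.executedPlan.collect { case e: ShuffleExchangeExec => e.newPartitioning }
// roughly: one RoundRobinPartitioning with 8 partitions

// repartition by expression -> hash-partitioned shuffle
val byExpr = spark.range(100).repartition(8, col("id"))
byExpr.queryExecution.executedPlan.collect { case e: ShuffleExchangeExec => e.newPartitioning }
// roughly: one HashPartitioning over id with 8 partitions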
@@ -27,7 +27,7 @@ import org.apache.spark.sql.internal.SQLConf
* Ensures that the [[org.apache.spark.sql.catalyst.plans.physical.Partitioning Partitioning]]
* of input data meets the
* [[org.apache.spark.sql.catalyst.plans.physical.Distribution Distribution]] requirements for
* each operator by inserting [[ShuffleExchange]] Operators where required. Also ensure that the
* each operator by inserting [[ShuffleExchangeExec]] Operators where required. Also ensure that the
* input partition ordering requirements are met.
*/
case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
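A hedged illustration of the rule's effect (same assumed spark session as above): an aggregation requires its input to be clustered by the grouping keys, so when the child's partitioning does not already satisfy that distribution, EnsureRequirements inserts a ShuffleExchangeExec with hash partitioning between the partial and final aggregates.

import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.functions.col

val grouped = spark.range(1000).groupBy(col("id") % 10).count()
grouped.queryExecution.executedPlan.collect { case e: ShuffleExchangeExec => e.newPartitioning }
// typically a single HashPartitioning over (id % 10)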
@@ -57,17 +57,17 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
}

/**
* Adds [[ExchangeCoordinator]] to [[ShuffleExchange]]s if adaptive query execution is enabled
* and partitioning schemes of these [[ShuffleExchange]]s support [[ExchangeCoordinator]].
* Adds [[ExchangeCoordinator]] to [[ShuffleExchangeExec]]s if adaptive query execution is enabled
* and partitioning schemes of these [[ShuffleExchangeExec]]s support [[ExchangeCoordinator]].
*/
private def withExchangeCoordinator(
children: Seq[SparkPlan],
requiredChildDistributions: Seq[Distribution]): Seq[SparkPlan] = {
val supportsCoordinator =
if (children.exists(_.isInstanceOf[ShuffleExchange])) {
if (children.exists(_.isInstanceOf[ShuffleExchangeExec])) {
// Right now, ExchangeCoordinator only support HashPartitionings.
children.forall {
case e @ ShuffleExchange(hash: HashPartitioning, _, _) => true
case e @ ShuffleExchangeExec(hash: HashPartitioning, _, _) => true
case child =>
child.outputPartitioning match {
case hash: HashPartitioning => true
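A sketch of the configuration that activates this path (the keys below are the Spark 2.x names as best I recall; verify against SQLConf for your version). The coordinator is only attached when adaptive execution is on and every child either already shuffles with hash partitioning or is itself hash partitioned.

// Enable adaptive query execution so EnsureRequirements attaches an
// ExchangeCoordinator to eligible ShuffleExchangeExec nodes.
spark.conf.set("spark.sql.adaptive.enabled", "true")
// Advisory per-partition input size the coordinator targets when packing
// pre-shuffle partitions into post-shuffle partitions (64 MB here).
spark.conf.set("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", (64L * 1024 * 1024).toString)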
@@ -94,7 +94,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
targetPostShuffleInputSize,
minNumPostShufflePartitions)
children.zip(requiredChildDistributions).map {
case (e: ShuffleExchange, _) =>
case (e: ShuffleExchangeExec, _) =>
// This child is an Exchange, we need to add the coordinator.
e.copy(coordinator = Some(coordinator))
case (child, distribution) =>
@@ -138,7 +138,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
val targetPartitioning =
createPartitioning(distribution, defaultNumPreShufflePartitions)
assert(targetPartitioning.isInstanceOf[HashPartitioning])
ShuffleExchange(targetPartitioning, child, Some(coordinator))
ShuffleExchangeExec(targetPartitioning, child, Some(coordinator))
}
} else {
// If we do not need ExchangeCoordinator, the original children are returned.
@@ -162,7 +162,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
case (child, BroadcastDistribution(mode)) =>
BroadcastExchangeExec(mode, child)
case (child, distribution) =>
ShuffleExchange(createPartitioning(distribution, defaultNumPreShufflePartitions), child)
ShuffleExchangeExec(createPartitioning(distribution, defaultNumPreShufflePartitions), child)
}

// If the operator has multiple children and specifies child output distributions (e.g. join),
@@ -215,8 +215,8 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
child match {
// If child is an exchange, we replace it with
// a new one having targetPartitioning.
case ShuffleExchange(_, c, _) => ShuffleExchange(targetPartitioning, c)
case _ => ShuffleExchange(targetPartitioning, child)
case ShuffleExchangeExec(_, c, _) => ShuffleExchangeExec(targetPartitioning, c)
case _ => ShuffleExchangeExec(targetPartitioning, child)
}
}
}
@@ -246,9 +246,9 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
}

def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
case operator @ ShuffleExchange(partitioning, child, _) =>
case operator @ ShuffleExchangeExec(partitioning, child, _) =>
child.children match {
case ShuffleExchange(childPartitioning, baseChild, _)::Nil =>
case ShuffleExchangeExec(childPartitioning, baseChild, _)::Nil =>
if (childPartitioning.guarantees(partitioning)) child else operator
case _ => operator
}
@@ -35,9 +35,9 @@ import org.apache.spark.sql.execution.{ShuffledRowRDD, SparkPlan}
*
* A coordinator is constructed with three parameters, `numExchanges`,
* `targetPostShuffleInputSize`, and `minNumPostShufflePartitions`.
* - `numExchanges` is used to indicated that how many [[ShuffleExchange]]s that will be registered
* - `numExchanges` is used to indicated that how many [[ShuffleExchangeExec]]s that will be registered
* to this coordinator. So, when we start to do any actual work, we have a way to make sure that
* we have got expected number of [[ShuffleExchange]]s.
* we have got expected number of [[ShuffleExchangeExec]]s.
* - `targetPostShuffleInputSize` is the targeted size of a post-shuffle partition's
* input data size. With this parameter, we can estimate the number of post-shuffle partitions.
* This parameter is configured through
@@ -47,27 +47,27 @@ import org.apache.spark.sql.execution.{ShuffledRowRDD, SparkPlan}
* partitions.
*
* The workflow of this coordinator is described as follows:
* - Before the execution of a [[SparkPlan]], for a [[ShuffleExchange]] operator,
* - Before the execution of a [[SparkPlan]], for a [[ShuffleExchangeExec]] operator,
* if an [[ExchangeCoordinator]] is assigned to it, it registers itself to this coordinator.
* This happens in the `doPrepare` method.
* - Once we start to execute a physical plan, a [[ShuffleExchange]] registered to this
* - Once we start to execute a physical plan, a [[ShuffleExchangeExec]] registered to this
* coordinator will call `postShuffleRDD` to get its corresponding post-shuffle
* [[ShuffledRowRDD]].
* If this coordinator has made the decision on how to shuffle data, this [[ShuffleExchange]]
* If this coordinator has made the decision on how to shuffle data, this [[ShuffleExchangeExec]]
* will immediately get its corresponding post-shuffle [[ShuffledRowRDD]].
* - If this coordinator has not made the decision on how to shuffle data, it will ask those
* registered [[ShuffleExchange]]s to submit their pre-shuffle stages. Then, based on the
* registered [[ShuffleExchangeExec]]s to submit their pre-shuffle stages. Then, based on the
* size statistics of pre-shuffle partitions, this coordinator will determine the number of
* post-shuffle partitions and pack multiple pre-shuffle partitions with continuous indices
* to a single post-shuffle partition whenever necessary.
* - Finally, this coordinator will create post-shuffle [[ShuffledRowRDD]]s for all registered
* [[ShuffleExchange]]s. So, when a [[ShuffleExchange]] calls `postShuffleRDD`, this coordinator
* [[ShuffleExchangeExec]]s. So, when a [[ShuffleExchangeExec]] calls `postShuffleRDD`, this coordinator
* can lookup the corresponding [[RDD]].
*
* The strategy used to determine the number of post-shuffle partitions is described as follows.
* To determine the number of post-shuffle partitions, we have a target input size for a
* post-shuffle partition. Once we have size statistics of pre-shuffle partitions from stages
* corresponding to the registered [[ShuffleExchange]]s, we will do a pass of those statistics and
* corresponding to the registered [[ShuffleExchangeExec]]s, we will do a pass of those statistics and
* pack pre-shuffle partitions with continuous indices to a single post-shuffle partition until
* adding another pre-shuffle partition would cause the size of a post-shuffle partition to be
* greater than the target size.
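The lifecycle described above, condensed into a comment-only sketch (not runnable on its own; exchange and coordinator stand for instances wired up by the planner):

// 1. exchange.doPrepare()
//      coordinator.registerExchange(exchange)        // before execution starts
// 2. exchange.doExecute() -> coordinator.postShuffleRDD(exchange)
//      - the first caller triggers estimation:
//          * submit all registered pre-shuffle map stages
//          * read their map-output size statistics
//          * pack pre-shuffle partitions with continuous indices until adding
//            one more would exceed targetPostShuffleInputSize
//      - every registered exchange then receives its post-shuffle ShuffledRowRDD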
@@ -89,23 +89,23 @@ class ExchangeCoordinator(
extends Logging {

// The registered Exchange operators.
private[this] val exchanges = ArrayBuffer[ShuffleExchange]()
private[this] val exchanges = ArrayBuffer[ShuffleExchangeExec]()

// This map is used to lookup the post-shuffle ShuffledRowRDD for an Exchange operator.
private[this] val postShuffleRDDs: JMap[ShuffleExchange, ShuffledRowRDD] =
new JHashMap[ShuffleExchange, ShuffledRowRDD](numExchanges)
private[this] val postShuffleRDDs: JMap[ShuffleExchangeExec, ShuffledRowRDD] =
new JHashMap[ShuffleExchangeExec, ShuffledRowRDD](numExchanges)

// A boolean that indicates if this coordinator has made decision on how to shuffle data.
// This variable will only be updated by doEstimationIfNecessary, which is protected by
// synchronized.
@volatile private[this] var estimated: Boolean = false

/**
* Registers a [[ShuffleExchange]] operator to this coordinator. This method is only allowed to
* be called in the `doPrepare` method of a [[ShuffleExchange]] operator.
* Registers a [[ShuffleExchangeExec]] operator to this coordinator. This method is only allowed to
* be called in the `doPrepare` method of a [[ShuffleExchangeExec]] operator.
*/
@GuardedBy("this")
def registerExchange(exchange: ShuffleExchange): Unit = synchronized {
def registerExchange(exchange: ShuffleExchangeExec): Unit = synchronized {
exchanges += exchange
}

@@ -200,7 +200,7 @@ class ExchangeCoordinator(
// Make sure we have the expected number of registered Exchange operators.
assert(exchanges.length == numExchanges)

val newPostShuffleRDDs = new JHashMap[ShuffleExchange, ShuffledRowRDD](numExchanges)
val newPostShuffleRDDs = new JHashMap[ShuffleExchangeExec, ShuffledRowRDD](numExchanges)

// Submit all map stages
val shuffleDependencies = ArrayBuffer[ShuffleDependency[Int, InternalRow, InternalRow]]()
@@ -255,7 +255,7 @@ class ExchangeCoordinator(
}
}

def postShuffleRDD(exchange: ShuffleExchange): ShuffledRowRDD = {
def postShuffleRDD(exchange: ShuffleExchangeExec): ShuffledRowRDD = {
doEstimationIfNecessary()

if (!postShuffleRDDs.containsKey(exchange)) {
@@ -35,7 +35,7 @@ import org.apache.spark.util.MutablePair
/**
* Performs a shuffle that will result in the desired `newPartitioning`.
*/
case class ShuffleExchange(
case class ShuffleExchangeExec(
var newPartitioning: Partitioning,
child: SparkPlan,
@transient coordinator: Option[ExchangeCoordinator]) extends Exchange {
@@ -84,7 +84,7 @@ case class ShuffleExchange(
*/
private[exchange] def prepareShuffleDependency()
: ShuffleDependency[Int, InternalRow, InternalRow] = {
ShuffleExchange.prepareShuffleDependency(
ShuffleExchangeExec.prepareShuffleDependency(
child.execute(), child.output, newPartitioning, serializer)
}

@@ -129,9 +129,9 @@ case class ShuffleExchange(
}
}

object ShuffleExchange {
def apply(newPartitioning: Partitioning, child: SparkPlan): ShuffleExchange = {
ShuffleExchange(newPartitioning, child, coordinator = Option.empty[ExchangeCoordinator])
object ShuffleExchangeExec {
def apply(newPartitioning: Partitioning, child: SparkPlan): ShuffleExchangeExec = {
ShuffleExchangeExec(newPartitioning, child, coordinator = Option.empty[ExchangeCoordinator])
}

/**
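A sketch of the two construction paths visible in this file (buildExchange is a hypothetical helper; childPlan and maybeCoordinator are supplied by planner code, and some of these classes are only accessible from within org.apache.spark.sql):

import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.exchange.{ExchangeCoordinator, ShuffleExchangeExec}

def buildExchange(childPlan: SparkPlan,
                  maybeCoordinator: Option[ExchangeCoordinator]): ShuffleExchangeExec = {
  // The two-argument apply leaves the coordinator empty (non-adaptive path).
  val exchange = ShuffleExchangeExec(SinglePartition, childPlan)
  // On the adaptive path, EnsureRequirements copies a shared coordinator in.
  maybeCoordinator.fold(exchange)(c => exchange.copy(coordinator = Some(c)))
}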
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, LazilyGeneratedOrdering}
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution.exchange.ShuffleExchange
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.util.Utils

/**
@@ -40,7 +40,7 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode
protected override def doExecute(): RDD[InternalRow] = {
val locallyLimited = child.execute().mapPartitionsInternal(_.take(limit))
val shuffled = new ShuffledRowRDD(
ShuffleExchange.prepareShuffleDependency(
ShuffleExchangeExec.prepareShuffleDependency(
locallyLimited, child.output, SinglePartition, serializer))
shuffled.mapPartitionsInternal(_.take(limit))
}
@@ -153,7 +153,7 @@ case class TakeOrderedAndProjectExec(
}
}
val shuffled = new ShuffledRowRDD(
ShuffleExchange.prepareShuffleDependency(
ShuffleExchangeExec.prepareShuffleDependency(
localTopK, child.output, SinglePartition, serializer))
shuffled.mapPartitions { iter =>
val topK = org.apache.spark.util.collection.Utils.takeOrdered(iter.map(_.copy()), limit)(ord)
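A user-level view of the pattern in the two operators above (a sketch, assuming the spark session and imports from earlier): a global limit or top-k first reduces each partition locally, then uses ShuffleExchangeExec.prepareShuffleDependency to move the partial results into a single partition before taking the final answer.

import org.apache.spark.sql.functions.col

val top5 = spark.range(1, 1000000).orderBy(col("id").desc).limit(5)
top5.queryExecution.executedPlan   // typically contains TakeOrderedAndProjectExec
top5.collect()                     // runs the local top-k + single-partition shuffle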
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, HashPartitioning, SinglePartition}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, SparkPlanner, UnaryExecNode}
import org.apache.spark.sql.execution.exchange.ShuffleExchange
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.streaming.OutputMode

/**
@@ -155,7 +155,7 @@ object EnsureStatefulOpPartitioning extends Rule[SparkPlan] {
child.execute().getNumPartitions == expectedPartitioning.numPartitions) {
child
} else {
ShuffleExchange(expectedPartitioning, child)
ShuffleExchangeExec(expectedPartitioning, child)
}
}
so.withNewChildren(children)
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
import org.apache.spark.sql.execution.{RDDScanExec, SparkPlan}
import org.apache.spark.sql.execution.columnar._
import org.apache.spark.sql.execution.exchange.ShuffleExchange
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
import org.apache.spark.storage.{RDDBlockId, StorageLevel}
@@ -420,7 +420,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
* Verifies that the plan for `df` contains `expected` number of Exchange operators.
*/
private def verifyNumExchanges(df: DataFrame, expected: Int): Unit = {
assert(df.queryExecution.executedPlan.collect { case e: ShuffleExchange => e }.size == expected)
assert(df.queryExecution.executedPlan.collect { case e: ShuffleExchangeExec => e }.size == expected)
}

test("A cached table preserves the partitioning and ordering of its cached SparkPlan") {
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.plans.logical.{Filter, OneRowRelation, Union}
import org.apache.spark.sql.execution.{FilterExec, QueryExecution}
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchange}
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchangeExec}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.{ExamplePoint, ExamplePointUDT, SharedSQLContext}
@@ -1529,7 +1529,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
fail("Should not have back to back Aggregates")
}
atFirstAgg = true
case e: ShuffleExchange => atFirstAgg = false
case e: ShuffleExchangeExec => atFirstAgg = false
case _ =>
}
}
@@ -1710,14 +1710,14 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
val plan = join.queryExecution.executedPlan
checkAnswer(join, df)
assert(
join.queryExecution.executedPlan.collect { case e: ShuffleExchange => true }.size === 1)
join.queryExecution.executedPlan.collect { case e: ShuffleExchangeExec => true }.size === 1)
assert(
join.queryExecution.executedPlan.collect { case e: ReusedExchangeExec => true }.size === 1)
val broadcasted = broadcast(join)
val join2 = join.join(broadcasted, "id").join(broadcasted, "id")
checkAnswer(join2, df)
assert(
join2.queryExecution.executedPlan.collect { case e: ShuffleExchange => true }.size === 1)
join2.queryExecution.executedPlan.collect { case e: ShuffleExchangeExec => true }.size === 1)
assert(
join2.queryExecution.executedPlan
.collect { case e: BroadcastExchangeExec => true }.size === 1)
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.encoders.{OuterScopes, RowEncoder}
import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi}
import org.apache.spark.sql.catalyst.util.sideBySide
import org.apache.spark.sql.execution.{LogicalRDD, RDDScanExec}
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchange}
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
@@ -1206,7 +1206,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
val agg = cp.groupBy('id % 2).agg(count('id))

agg.queryExecution.executedPlan.collectFirst {
case ShuffleExchange(_, _: RDDScanExec, _) =>
case ShuffleExchangeExec(_, _: RDDScanExec, _) =>
case BroadcastExchangeExec(_, _: RDDScanExec) =>
}.foreach { _ =>
fail(