[SPARK-2184][SQL] AddExchange isn't idempotent
Don't bind partitioning expressions as that breaks comparison with requiredPartitioning.

Author: Michael Armbrust <michael@databricks.com>

Closes apache#1122 from marmbrus/fixAddExchange and squashes the following commits:

3417537 [Michael Armbrust] Don't bind partitioning expressions as that breaks comparison with requiredPartitioning.
marmbrus authored and conviva-zz committed Sep 4, 2014
1 parent af3bf1b commit 9d9ed2b
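
In brief, the bug: Exchange's partitioning expressions were rewritten to BoundReferences when the plan was bound, so newPartitioning no longer compared equal to the unbound requiredPartitioning, and every pass of AddExchange inserted a fresh Exchange node. A minimal sketch of the failing comparison, assuming the catalyst API of this vintage (the column and partition count are made up):

  import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference}
  import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
  import org.apache.spark.sql.catalyst.types.IntegerType

  val key = AttributeReference("key", IntegerType, nullable = true)()
  val required = HashPartitioning(Seq(key), 200)                    // unbound, as the planner requires it
  val bound    = HashPartitioning(Seq(BoundReference(0, key)), 200) // what binding used to produce
  assert(required != bound) // mismatch => another Exchange on every pass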
Showing 3 changed files with 9 additions and 6 deletions.
@@ -68,7 +68,7 @@ class BindReferences[TreeNode <: QueryPlan[TreeNode]] extends Rule[TreeNode] {
 }
 
 object BindReferences extends Logging {
-  def bindReference(expression: Expression, input: Seq[Attribute]): Expression = {
+  def bindReference[A <: Expression](expression: A, input: Seq[Attribute]): A = {
     expression.transform { case a: AttributeReference =>
       attachTree(a, "Binding attribute") {
         val ordinal = input.indexWhere(_.exprId == a.exprId)
@@ -83,6 +83,6 @@ object BindReferences extends Logging {
           BoundReference(ordinal, a)
         }
       }
-    }
+    }.asInstanceOf[A] // Kind of a hack, but safe. TODO: Tighten return type when possible.
   }
 }
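
The widened signature preserves the static type of whatever is bound; e.g., binding a SortOrder now yields a SortOrder rather than a bare Expression, which is exactly what the new RowOrdering constructor below relies on. A hedged sketch (key and inputSchema are hypothetical stand-ins for an attribute and its Seq[Attribute] schema):

  val order: SortOrder = SortOrder(key, Ascending)
  val bound: SortOrder = BindReferences.bindReference(order, inputSchema)
  // Before this patch the result was typed Expression and needed a cast.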
@@ -208,6 +208,9 @@ class GenericMutableRow(size: Int) extends GenericRow(size) with MutableRow {
 
 
 class RowOrdering(ordering: Seq[SortOrder]) extends Ordering[Row] {
+  def this(ordering: Seq[SortOrder], inputSchema: Seq[Attribute]) =
+    this(ordering.map(BindReferences.bindReference(_, inputSchema)))
+
   def compare(a: Row, b: Row): Int = {
     var i = 0
     while (i < ordering.size) {
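
The auxiliary constructor is shorthand for binding each SortOrder by hand; the two forms below are equivalent (schema stands in for the caller's Seq[Attribute]):

  new RowOrdering(sortingExpressions, schema)
  new RowOrdering(sortingExpressions.map(BindReferences.bindReference(_, schema)))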
@@ -22,7 +22,7 @@ import org.apache.spark.{HashPartitioner, RangePartitioner, SparkConf}
 import org.apache.spark.rdd.ShuffledRDD
 import org.apache.spark.sql.{SQLConf, SQLContext, Row}
 import org.apache.spark.sql.catalyst.errors.attachTree
-import org.apache.spark.sql.catalyst.expressions.{MutableProjection, RowOrdering}
+import org.apache.spark.sql.catalyst.expressions.{NoBind, MutableProjection, RowOrdering}
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.util.MutablePair
@@ -31,7 +31,7 @@ import org.apache.spark.util.MutablePair
  * :: DeveloperApi ::
  */
 @DeveloperApi
-case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends UnaryNode {
+case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends UnaryNode with NoBind {
 
   override def outputPartitioning = newPartitioning
 
@@ -42,7 +42,7 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends UnaryNode {
     case HashPartitioning(expressions, numPartitions) =>
       // TODO: Eliminate redundant expressions in grouping key and value.
       val rdd = child.execute().mapPartitions { iter =>
-        val hashExpressions = new MutableProjection(expressions)
+        val hashExpressions = new MutableProjection(expressions, child.output)
         val mutablePair = new MutablePair[Row, Row]()
         iter.map(r => mutablePair.update(hashExpressions(r), r))
       }
@@ -53,7 +53,7 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends UnaryNode {
 
     case RangePartitioning(sortingExpressions, numPartitions) =>
       // TODO: RangePartitioner should take an Ordering.
-      implicit val ordering = new RowOrdering(sortingExpressions)
+      implicit val ordering = new RowOrdering(sortingExpressions, child.output)
 
       val rdd = child.execute().mapPartitions { iter =>
         val mutablePair = new MutablePair[Row, Null](null, null)
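
With Exchange marked NoBind, newPartitioning keeps its unbound AttributeReferences, and binding now happens explicitly against child.output at execution time. A second pass of AddExchange therefore sees an outputPartitioning that already matches what it would request and changes nothing. The restored property, sketched with a hypothetical addExchange standing in for one application of the rule:

  val once  = addExchange(plan)
  val twice = addExchange(once)
  assert(once == twice) // AddExchange is idempotent again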
