[SPARK-7440][SQL] Remove physical Distinct operator in favor of Aggregate.

Distinct is very similar to Aggregate, which is an important operator to optimize for.
This patch replaces Distinct with Aggregate in the optimizer, so Distinct will become
more efficient over time as we optimize Aggregate.
rxin committed Jun 4, 2015
1 parent 9982d45 commit 87e4741
Showing 6 changed files with 59 additions and 34 deletions.
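In Catalyst terms, the rewrite turns Distinct(child) into Aggregate(child.output, child.output, child): a grouping on every output column that projects those same columns and computes no aggregate functions, i.e. SELECT DISTINCT expressed as a GROUP BY over all columns. A minimal sketch of that equivalence, built with the same Catalyst DSL the new test suite below uses (the object name and two-column relation are illustrative, not part of the patch):

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Distinct, LocalRelation}

// Illustrative sketch only: a two-column relation standing in for any child plan.
object DistinctAsAggregateSketch extends App {
  val t = LocalRelation('a.int, 'b.int)

  // Roughly what SELECT DISTINCT a, b FROM t looks like before optimization ...
  val before = Distinct(t)

  // ... and what ReplaceDistinctWithAggregate rewrites it to: group by all output
  // columns, project the same columns, and compute no aggregate functions.
  val after = Aggregate(t.output, t.output, t)

  println(before)
  println(after)
}

Running ReplaceDistinctWithAggregate over the first plan (as the new test suite does via Optimize.execute) should yield a plan equal to the second.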

@@ -36,6 +36,8 @@ object DefaultOptimizer extends Optimizer {
    // SubQueries are only needed for analysis and can be removed before execution.
    Batch("Remove SubQueries", FixedPoint(100),
      EliminateSubQueries) ::
+   Batch("Distinct", FixedPoint(100),
+     ReplaceDistinctWithAggregate) ::
    Batch("Operator Reordering", FixedPoint(100),
      UnionPushdown,
      CombineFilters,
@@ -696,3 +698,12 @@ object ConvertToLocalRelation extends Rule[LogicalPlan] {
      LocalRelation(projectList.map(_.toAttribute), data.map(projection))
  }
}

+/**
+ * Replaces logical [[Distinct]] operator with an [[Aggregate]] operator.
+ */
+object ReplaceDistinctWithAggregate extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case Distinct(child) => Aggregate(child.output, child.output, child)
+  }
+}

@@ -339,6 +339,9 @@ case class Sample(
  override def output: Seq[Attribute] = child.output
}

+/**
+ * Returns a new logical plan that dedups input rows.
+ */
case class Distinct(child: LogicalPlan) extends UnaryNode {
  override def output: Seq[Attribute] = child.output
}

@@ -0,0 +1,42 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Distinct, LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor

class ReplaceDistinctWithAggregateSuite extends PlanTest {

  object Optimize extends RuleExecutor[LogicalPlan] {
    val batches = Batch("ProjectCollapsing", Once, ReplaceDistinctWithAggregate) :: Nil
  }

  test("replace distinct with aggregate") {
    val input = LocalRelation('a.int, 'b.int)

    val query = Distinct(input)
    val optimized = Optimize.execute(query.analyze)

    val correctAnswer = Aggregate(input.output, input.output, input)

    comparePlans(optimized, correctAnswer)
  }
}

@@ -1295,7 +1295,7 @@ class DataFrame private[sql](
   * @group dfops
   * @since 1.3.0
   */
- override def distinct: DataFrame = Distinct(logicalPlan)
+ override def distinct: DataFrame = dropDuplicates()

  /**
   * @group basic
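On the DataFrame side, distinct now simply delegates to dropDuplicates(), and because the logical Distinct node is rewritten during optimization, the physical plan for either call should contain an Aggregate instead of the removed execution.Distinct operator. A hypothetical end-to-end check against the Spark 1.4-era API (the object name and sample data are illustrative, not part of the patch):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object DistinctPlanSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("distinct-plan-sketch").setMaster("local[2]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Duplicate rows on purpose so deduplication is visible.
    val df = sc.parallelize(Seq((1, "a"), (1, "a"), (2, "b"))).toDF("id", "name")

    df.distinct.explain()          // expect an Aggregate node in the printed physical plan
    df.dropDuplicates().explain()  // same plan as distinct

    sc.stop()
  }
}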

@@ -284,8 +284,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
      case r: RunnableCommand => ExecutedCommand(r) :: Nil

      case logical.Distinct(child) =>
-       execution.Distinct(partial = false,
-         execution.Distinct(partial = true, planLater(child))) :: Nil
+       throw new IllegalStateException(
+         "logical distinct operator should have been replaced by aggregate in the optimizer")
      case logical.Repartition(numPartitions, shuffle, child) =>
        execution.Repartition(numPartitions, shuffle, planLater(child)) :: Nil
      case logical.SortPartitions(sortExprs, child) =>

@@ -230,37 +230,6 @@ case class ExternalSort(
  override def outputOrdering: Seq[SortOrder] = sortOrder
}

-/**
- * :: DeveloperApi ::
- * Computes the set of distinct input rows using a HashSet.
- * @param partial when true the distinct operation is performed partially, per partition, without
- *                shuffling the data.
- * @param child the input query plan.
- */
-@DeveloperApi
-case class Distinct(partial: Boolean, child: SparkPlan) extends UnaryNode {
- override def output: Seq[Attribute] = child.output

- override def requiredChildDistribution: Seq[Distribution] =
-   if (partial) UnspecifiedDistribution :: Nil else ClusteredDistribution(child.output) :: Nil

- protected override def doExecute(): RDD[Row] = {
-   child.execute().mapPartitions { iter =>
-     val hashSet = new scala.collection.mutable.HashSet[Row]()

-     var currentRow: Row = null
-     while (iter.hasNext) {
-       currentRow = iter.next()
-       if (!hashSet.contains(currentRow)) {
-         hashSet.add(currentRow.copy())
-       }
-     }

-     hashSet.iterator
-   }
- }
-}

/**
 * :: DeveloperApi ::
 * Return a new RDD that has exactly `numPartitions` partitions.
