diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 34371c9659423..73e4bfd78e577 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -46,7 +46,9 @@ object MimaExcludes {
               "org.apache.spark.api.java.JavaRDDLike.partitioner"),
             // Mima false positive (was a private[spark] class)
             ProblemFilters.exclude[MissingClassProblem](
-              "org.apache.spark.util.collection.PairIterator")
+              "org.apache.spark.util.collection.PairIterator"),
+            // SQL execution is considered private.
+            excludePackage("org.apache.spark.sql.execution")
           )
         case v if v.startsWith("1.4") =>
           Seq(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 5c6379b8d44b0..0a17b10c521e5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -36,6 +36,8 @@ object DefaultOptimizer extends Optimizer {
     // SubQueries are only needed for analysis and can be removed before execution.
     Batch("Remove SubQueries", FixedPoint(100),
       EliminateSubQueries) ::
+    Batch("Distinct", FixedPoint(100),
+      ReplaceDistinctWithAggregate) ::
     Batch("Operator Reordering", FixedPoint(100),
       UnionPushdown,
       CombineFilters,
@@ -696,3 +698,15 @@ object ConvertToLocalRelation extends Rule[LogicalPlan] {
       LocalRelation(projectList.map(_.toAttribute), data.map(projection))
   }
 }
+
+/**
+ * Replaces logical [[Distinct]] operator with an [[Aggregate]] operator.
+ * {{{
+ *   SELECT DISTINCT f1, f2 FROM t ==> SELECT f1, f2 FROM t GROUP BY f1, f2
+ * }}}
+ */
+object ReplaceDistinctWithAggregate extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case Distinct(child) => Aggregate(child.output, child.output, child)
+  }
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 33a9e55a47dee..e77e5c27b687a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -339,6 +339,9 @@ case class Sample(
   override def output: Seq[Attribute] = child.output
 }
 
+/**
+ * Returns a new logical plan that deduplicates input rows.
+ */
 case class Distinct(child: LogicalPlan) extends UnaryNode {
   override def output: Seq[Attribute] = child.output
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceDistinctWithAggregateSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceDistinctWithAggregateSuite.scala
new file mode 100644
index 0000000000000..df29a62ff0e15
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceDistinctWithAggregateSuite.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Distinct, LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+
+class ReplaceDistinctWithAggregateSuite extends PlanTest {
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches = Batch("ReplaceDistinctWithAggregate", Once, ReplaceDistinctWithAggregate) :: Nil
+  }
+
+  test("replace distinct with aggregate") {
+    val input = LocalRelation('a.int, 'b.int)
+
+    val query = Distinct(input)
+    val optimized = Optimize.execute(query.analyze)
+
+    val correctAnswer = Aggregate(input.output, input.output, input)
+
+    comparePlans(optimized, correctAnswer)
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index d1a54ada7b191..4a224153e1a37 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1311,7 +1311,7 @@ class DataFrame private[sql](
    * @group dfops
    * @since 1.3.0
    */
-  override def distinct: DataFrame = Distinct(logicalPlan)
+  override def distinct: DataFrame = dropDuplicates()
 
   /**
    * @group basic
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index d0a1ad00560d3..7a1331a39151a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -284,8 +284,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case r: RunnableCommand => ExecutedCommand(r) :: Nil
 
       case logical.Distinct(child) =>
-        execution.Distinct(partial = false,
-          execution.Distinct(partial = true, planLater(child))) :: Nil
+        throw new IllegalStateException(
+          "logical distinct operator should have been replaced by aggregate in the optimizer")
       case logical.Repartition(numPartitions, shuffle, child) =>
         execution.Repartition(numPartitions, shuffle, planLater(child)) :: Nil
       case logical.SortPartitions(sortExprs, child) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index a30ade86441ca..fb42072f9d5a7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -230,37 +230,6 @@ case class ExternalSort(
   override def outputOrdering: Seq[SortOrder] = sortOrder
 }
 
-/**
- * :: DeveloperApi ::
- * Computes the set of distinct input rows using a HashSet.
- * @param partial when true the distinct operation is performed partially, per partition, without
- *                shuffling the data.
- * @param child the input query plan.
- */
-@DeveloperApi
-case class Distinct(partial: Boolean, child: SparkPlan) extends UnaryNode {
-  override def output: Seq[Attribute] = child.output
-
-  override def requiredChildDistribution: Seq[Distribution] =
-    if (partial) UnspecifiedDistribution :: Nil else ClusteredDistribution(child.output) :: Nil
-
-  protected override def doExecute(): RDD[Row] = {
-    child.execute().mapPartitions { iter =>
-      val hashSet = new scala.collection.mutable.HashSet[Row]()
-
-      var currentRow: Row = null
-      while (iter.hasNext) {
-        currentRow = iter.next()
-        if (!hashSet.contains(currentRow)) {
-          hashSet.add(currentRow.copy())
-        }
-      }
-
-      hashSet.iterator
-    }
-  }
-}
-
 /**
  * :: DeveloperApi ::
  * Return a new RDD that has exactly `numPartitions` partitions.
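
Note on the rewrite: grouping a relation by all of its output columns and projecting those same columns is semantically equivalent to SELECT DISTINCT, which is why the logical Distinct operator can be routed through the existing Aggregate machinery instead of the hash-set based physical operator removed above. The sketch below is a minimal, self-contained Scala illustration of that idea on a toy plan ADT; every name in it (Plan, Relation, ReplaceDistinctDemo, and so on) is illustrative only and is not part of Spark's Catalyst API. The real rule also applies via `plan transform`, which rewrites every matching node in the tree, whereas this sketch only inspects the root.

object ReplaceDistinctDemo extends App {
  // Toy logical-plan ADT: `output` models a plan's output attributes as strings.
  sealed trait Plan { def output: Seq[String] }
  case class Relation(output: Seq[String]) extends Plan
  case class Distinct(child: Plan) extends Plan {
    def output: Seq[String] = child.output
  }
  case class Aggregate(grouping: Seq[String], resultExprs: Seq[String], child: Plan)
      extends Plan {
    def output: Seq[String] = resultExprs
  }

  // The rewrite: group by every output column and project those same columns.
  // Rows that agree on all columns fall into one group, so exactly one row per
  // distinct combination of values survives, which is exactly DISTINCT semantics.
  def replaceDistinctWithAggregate(plan: Plan): Plan = plan match {
    case Distinct(child) => Aggregate(child.output, child.output, child)
    case other => other
  }

  // SELECT DISTINCT f1, f2 FROM t  ==>  SELECT f1, f2 FROM t GROUP BY f1, f2
  val t = Relation(Seq("f1", "f2"))
  println(replaceDistinctWithAggregate(Distinct(t)))
  // prints: Aggregate(List(f1, f2),List(f1, f2),Relation(List(f1, f2)))
}

One consequence of this consolidation, reflected in the SparkStrategies change above: a logical Distinct should never survive optimization, so the planner now treats encountering one as an internal error, and distinct queries automatically pick up whatever execution strategy the planner chooses for equivalent aggregates.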