Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-12660] [SPARK-14967] [SQL] Implement Except Distinct by Left Anti Join #12736

Closed
wants to merge 10 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,8 @@ class Analyzer(
j.copy(right = dedupRight(left, right))
case i @ Intersect(left, right) if !i.duplicateResolved =>
i.copy(right = dedupRight(left, right))
case i @ Except(left, right) if !i.duplicateResolved =>
i.copy(right = dedupRight(left, right))

// When resolve `SortOrder`s in Sort based on child, don't report errors as
// we still have chance to resolve it based on its descendants
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,16 @@ trait CheckAnalysis extends PredicateHelper {
|Failure when resolving conflicting references in Intersect:
|$plan
|Conflicting attributes: ${conflictingAttributes.mkString(",")}
|""".stripMargin)
""".stripMargin)

case e: Except if !e.duplicateResolved =>
val conflictingAttributes = e.left.outputSet.intersect(e.right.outputSet)
failAnalysis(
s"""
|Failure when resolving conflicting references in Except:
|$plan
|Conflicting attributes: ${conflictingAttributes.mkString(",")}
""".stripMargin)

case o if !o.resolved =>
failAnalysis(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
CombineUnions) ::
Batch("Replace Operators", fixedPoint,
ReplaceIntersectWithSemiJoin,
ReplaceExceptWithAntiJoin,
ReplaceDistinctWithAggregate) ::
Batch("Aggregate", fixedPoint,
RemoveLiteralFromGroupExpressions) ::
Expand Down Expand Up @@ -232,17 +233,12 @@ object LimitPushDown extends Rule[LogicalPlan] {
}

/**
* Pushes certain operations to both sides of a Union or Except operator.
* Pushes certain operations to both sides of a Union operator.
* Operations that are safe to pushdown are listed as follows.
* Union:
* Right now, Union means UNION ALL, which does not de-duplicate rows. So, it is
* safe to pushdown Filters and Projections through it. Once we add UNION DISTINCT,
* we will not be able to pushdown Projections.
*
* Except:
* It is not safe to pushdown Projections through it because we need to get the
* intersect of rows by comparing the entire rows. It is fine to pushdown Filters
* with deterministic condition.
*/
object SetOperationPushDown extends Rule[LogicalPlan] with PredicateHelper {

Expand Down Expand Up @@ -310,17 +306,6 @@ object SetOperationPushDown extends Rule[LogicalPlan] with PredicateHelper {
Filter(pushToRight(deterministic, rewrites), child)
}
Filter(nondeterministic, Union(newFirstChild +: newOtherChildren))

// Push down filter through EXCEPT
case Filter(condition, Except(left, right)) =>
val (deterministic, nondeterministic) = partitionByDeterministic(condition)
val rewrites = buildRewrites(left, right)
Filter(nondeterministic,
Except(
Filter(deterministic, left),
Filter(pushToRight(deterministic, rewrites), right)
)
)
}
}

Expand Down Expand Up @@ -1007,38 +992,32 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
filter
}

case filter @ Filter(condition, child)
if child.isInstanceOf[Union] || child.isInstanceOf[Intersect] =>
// Union/Intersect could change the rows, so non-deterministic predicate can't be pushed down
case filter @ Filter(condition, union: Union) =>
// Union could change the rows, so non-deterministic predicate can't be pushed down
val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { cond =>
cond.deterministic
}
if (pushDown.nonEmpty) {
val pushDownCond = pushDown.reduceLeft(And)
val output = child.output
val newGrandChildren = child.children.map { grandchild =>
val output = union.output
val newGrandChildren = union.children.map { grandchild =>
val newCond = pushDownCond transform {
case e if output.exists(_.semanticEquals(e)) =>
grandchild.output(output.indexWhere(_.semanticEquals(e)))
}
assert(newCond.references.subsetOf(grandchild.outputSet))
Filter(newCond, grandchild)
}
val newChild = child.withNewChildren(newGrandChildren)
val newUnion = union.withNewChildren(newGrandChildren)
if (stayUp.nonEmpty) {
Filter(stayUp.reduceLeft(And), newChild)
Filter(stayUp.reduceLeft(And), newUnion)
} else {
newChild
newUnion
}
} else {
filter
}

case filter @ Filter(condition, e @ Except(left, _)) =>
pushDownPredicate(filter, e.left) { predicate =>
e.copy(left = Filter(predicate, left))
}

// two filters should be combine together by other rules
case filter @ Filter(_, f: Filter) => filter
// should not push predicates through sample, or will generate different results.
Expand Down Expand Up @@ -1404,6 +1383,27 @@ object ReplaceIntersectWithSemiJoin extends Rule[LogicalPlan] {
}
}

/**
* Replaces logical [[Except]] operator with a left-anti [[Join]] operator.
* {{{
* SELECT a1, a2 FROM Tab1 EXCEPT SELECT b1, b2 FROM Tab2
* ==> SELECT DISTINCT a1, a2 FROM Tab1 LEFT ANTI JOIN Tab2 ON a1<=>b1 AND a2<=>b2
* }}}
*
* Note:
* 1. This rule is only applicable to EXCEPT DISTINCT. Do not use it for EXCEPT ALL.
* 2. This rule has to be done after de-duplicating the attributes; otherwise, the generated
* join conditions will be incorrect.
*/
object ReplaceExceptWithAntiJoin extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case Except(left, right) =>
assert(left.output.size == right.output.size)
val joinCond = left.output.zip(right.output).map { case (l, r) => EqualNullSafe(l, r) }
Distinct(Join(left, right, LeftAnti, joinCond.reduceLeftOption(And)))
}
}

/**
* Removes literals from group expressions in [[Aggregate]], as they have no effect to the result
* but only makes the grouping key bigger.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,9 @@ case class Intersect(left: LogicalPlan, right: LogicalPlan) extends SetOperation
}

case class Except(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right) {

def duplicateResolved: Boolean = left.outputSet.intersect(right.outputSet).isEmpty

/** We don't use right.output because those rows get excluded from the set. */
override def output: Seq[Attribute] = left.output

Expand All @@ -173,7 +176,8 @@ case class Except(left: LogicalPlan, right: LogicalPlan) extends SetOperation(le
override lazy val resolved: Boolean =
childrenResolved &&
left.output.length == right.output.length &&
left.output.zip(right.output).forall { case (l, r) => l.dataType == r.dataType }
left.output.zip(right.output).forall { case (l, r) => l.dataType == r.dataType } &&
duplicateResolved

override def statistics: Statistics = {
Statistics(sizeInBytes = left.statistics.sizeInBytes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -488,14 +488,6 @@ class HiveTypeCoercionSuite extends PlanTest {
assert(r1.right.isInstanceOf[Project])
assert(r2.left.isInstanceOf[Project])
assert(r2.right.isInstanceOf[Project])

val r3 = wt(Except(firstTable, firstTable)).asInstanceOf[Except]
checkOutput(r3.left, Seq(IntegerType, DecimalType.SYSTEM_DEFAULT, ByteType, DoubleType))
checkOutput(r3.right, Seq(IntegerType, DecimalType.SYSTEM_DEFAULT, ByteType, DoubleType))

// Check if no Project is added
assert(r3.left.isInstanceOf[LocalRelation])
assert(r3.right.isInstanceOf[LocalRelation])
}

test("WidenSetOperationTypes for union") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -710,40 +710,6 @@ class FilterPushdownSuite extends PlanTest {
comparePlans(optimized, correctAnswer)
}

test("intersect") {
val testRelation2 = LocalRelation('d.int, 'e.int, 'f.int)

val originalQuery = Intersect(testRelation, testRelation2)
.where('a === 2L && 'b + Rand(10).as("rnd") === 3)

val optimized = Optimize.execute(originalQuery.analyze)

val correctAnswer = Intersect(
testRelation.where('a === 2L),
testRelation2.where('d === 2L))
.where('b + Rand(10).as("rnd") === 3)
.analyze

comparePlans(optimized, correctAnswer)
}

test("except") {
val testRelation2 = LocalRelation('d.int, 'e.int, 'f.int)

val originalQuery = Except(testRelation, testRelation2)
.where('a === 2L && 'b + Rand(10).as("rnd") === 3)

val optimized = Optimize.execute(originalQuery.analyze)

val correctAnswer = Except(
testRelation.where('a === 2L),
testRelation2)
.where('b + Rand(10).as("rnd") === 3)
.analyze

comparePlans(optimized, correctAnswer)
}

test("expand") {
val agg = testRelation
.groupBy(Cube(Seq('a, 'b)))('a, 'b, sum('c))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.plans.{LeftSemi, PlanTest}
import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi, PlanTest}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.RuleExecutor

Expand All @@ -29,6 +29,7 @@ class ReplaceOperatorSuite extends PlanTest {
val batches =
Batch("Replace Operators", FixedPoint(100),
ReplaceDistinctWithAggregate,
ReplaceExceptWithAntiJoin,
ReplaceIntersectWithSemiJoin) :: Nil
}

Expand All @@ -46,6 +47,20 @@ class ReplaceOperatorSuite extends PlanTest {
comparePlans(optimized, correctAnswer)
}

test("replace Except with Left-anti Join") {
val table1 = LocalRelation('a.int, 'b.int)
val table2 = LocalRelation('c.int, 'd.int)

val query = Except(table1, table2)
val optimized = Optimize.execute(query.analyze)

val correctAnswer =
Aggregate(table1.output, table1.output,
Join(table1, table2, LeftAnti, Option('a <=> 'c && 'b <=> 'd))).analyze

comparePlans(optimized, correctAnswer)
}

test("replace Distinct with Aggregate") {
val input = LocalRelation('a.int, 'b.int)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ class SetOperationSuite extends PlanTest {
val testRelation2 = LocalRelation('d.int, 'e.int, 'f.int)
val testRelation3 = LocalRelation('g.int, 'h.int, 'i.int)
val testUnion = Union(testRelation :: testRelation2 :: testRelation3 :: Nil)
val testExcept = Except(testRelation, testRelation2)

test("union: combine unions into one unions") {
val unionQuery1 = Union(Union(testRelation, testRelation2), testRelation)
Expand All @@ -56,15 +55,6 @@ class SetOperationSuite extends PlanTest {
comparePlans(combinedUnionsOptimized, unionOptimized3)
}

test("except: filter to each side") {
val exceptQuery = testExcept.where('c >= 5)
val exceptOptimized = Optimize.execute(exceptQuery.analyze)
val exceptCorrectAnswer =
Except(testRelation.where('c >= 5), testRelation2.where('f >= 5)).analyze

comparePlans(exceptOptimized, exceptCorrectAnswer)
}

test("union: filter to each side") {
val unionQuery = testUnion.where('a === 1)
val unionOptimized = Optimize.execute(unionQuery.analyze)
Expand All @@ -85,10 +75,4 @@ class SetOperationSuite extends PlanTest {
testRelation3.select('g) :: Nil).analyze
comparePlans(unionOptimized, unionCorrectAnswer)
}

test("SPARK-10539: Project should not be pushed down through Intersect or Except") {
val exceptQuery = testExcept.select('a, 'b, 'c)
val exceptOptimized = Optimize.execute(exceptQuery.analyze)
comparePlans(exceptOptimized, exceptQuery.analyze)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,9 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case logical.Intersect(left, right) =>
throw new IllegalStateException(
"logical intersect operator should have been replaced by semi-join in the optimizer")
case logical.Except(left, right) =>
throw new IllegalStateException(
"logical except operator should have been replaced by anti-join in the optimizer")

case logical.DeserializeToObject(deserializer, objAttr, child) =>
execution.DeserializeToObject(deserializer, objAttr, planLater(child)) :: Nil
Expand Down Expand Up @@ -347,8 +350,6 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
execution.GlobalLimitExec(limit, planLater(child)) :: Nil
case logical.Union(unionChildren) =>
execution.UnionExec(unionChildren.map(planLater)) :: Nil
case logical.Except(left, right) =>
execution.ExceptExec(planLater(left), planLater(right)) :: Nil
case g @ logical.Generate(generator, join, outer, _, _, child) =>
execution.GenerateExec(
generator, join = join, outer = outer, g.output, planLater(child)) :: Nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -491,18 +491,6 @@ case class CoalesceExec(numPartitions: Int, child: SparkPlan) extends UnaryExecN
}
}

/**
* Physical plan for returning a table with the elements from left that are not in right using
* the built-in spark subtract function.
*/
case class ExceptExec(left: SparkPlan, right: SparkPlan) extends BinaryExecNode {
override def output: Seq[Attribute] = left.output

protected override def doExecute(): RDD[InternalRow] = {
left.execute().map(_.copy()).subtract(right.execute().map(_.copy()))
}
}

/**
* A plan node that does nothing but lie about the output of its child. Used to spice a
* (hopefully structurally equivalent) tree from a different optimization sequence into an already
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ public void testSetOperation() {
unioned.collectAsList());

Dataset<String> subtracted = ds.except(ds2);
Assert.assertEquals(Arrays.asList("abc", "abc"), subtracted.collectAsList());
Assert.assertEquals(Arrays.asList("abc"), subtracted.collectAsList());
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is pretty strange. I will check the exact behavior of our current Except operator. It sounds like it does not remove the duplicate.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Except apparently does not remove duplicates:

range(0, 10).registerTempTable("a")
range(5, 15).registerTempTable("b")
sql("(select * from a union all select * from a) except select * from b")

results in:

+---+
| id|
+---+
|  2|
|  2|
|  0|
|  0|
|  3|
|  3|
|  4|
|  4|
|  1|
|  1|
+---+

So that is weird.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have to decide what we should do next? This PR is doing Except Distinct.

If we want to keep the existing behavior, which is Except All, we need to change the external API in Dataset.

However, the current SQL interface for Except is wrong. We need to correct it at least.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current implementation (before this PR) is somewhere between EXCEPT and EXCEPT ALL it will will remove all rows if it finds a match (essentially eliminating duplicates), but it does not remove duplicates where there is no match. Lets follow the principle of least surprise and create a correct EXCEPT (one that removes duplicates).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Uh, I see... Let me add a test case to cover it for ensuring it will not be broken again

Copy link
Member Author

@gatorsmile gatorsmile Apr 27, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the example to show why the current master is wrong.

  test("except") {
    val df_left = Seq(1, 2, 2, 3, 3, 4).toDF("id")
    val df_right = Seq(1, 3).toDF("id")

    checkAnswer(
      df_left.except(df_right),
      Row(2) :: Row(2) :: Row(4) :: Nil
    )
  }

For EXCEPT ALL, we should output

Row(2) :: Row(2) :: Row(3) :: Row(4) :: Nil

For EXCEPT DISTINCT, we should output

Row(2) :: Row(4) :: Nil

Copy link
Contributor

@cloud-fan cloud-fan Apr 28, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did this PR also fix the semantic of Except, or it only added the optimization?

Copy link
Member Author

@gatorsmile gatorsmile Apr 28, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. After this PR, the behavior of EXCEPT is changed to the standard behavior of EXCEPT DISTINCT.

}

private static <T> Set<T> toSet(List<T> records) {
Expand Down
Loading