Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-29145][SQL] Support sub-queries in join conditions #25854

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1697,6 +1697,8 @@ class Analyzer(
// Only a few unary nodes (Project/Filter/Aggregate) can contain subqueries.
case q: UnaryNode if q.childrenResolved =>
resolveSubQueries(q, q.children)
case j: Join if j.childrenResolved =>
resolveSubQueries(j, Seq(j, j.left, j.right))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't recall the details, but why isn't it Seq(j.left, j.right)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't recall the details, but why isn't it Seq(j.left, j.right)?

This looks like a mistake. Should I raise a PR to remove it?

case s: SupportsSubquery if s.childrenResolved =>
resolveSubQueries(s, s.children)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -601,10 +601,10 @@ trait CheckAnalysis extends PredicateHelper {

case inSubqueryOrExistsSubquery =>
plan match {
case _: Filter | _: SupportsSubquery => // Ok
case _: Filter | _: SupportsSubquery | _: Join => // Ok
case _ =>
failAnalysis(s"IN/EXISTS predicate sub-queries can only be used in" +
s" Filter and a few commands: $plan")
s" Filter/Join and a few commands: $plan")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.{InSubquery, ListQuery}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation, Project}
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.logical._

/**
* Unit tests for [[ResolveSubquery]].
Expand All @@ -29,8 +30,10 @@ class ResolveSubquerySuite extends AnalysisTest {

val a = 'a.int
val b = 'b.int
val c = 'c.int
val t1 = LocalRelation(a)
val t2 = LocalRelation(b)
val t3 = LocalRelation(c)

test("SPARK-17251 Improve `OuterReference` to be `NamedExpression`") {
val expr = Filter(
Expand All @@ -41,4 +44,13 @@ class ResolveSubquerySuite extends AnalysisTest {
assert(m.contains(
"Expressions referencing the outer query are not supported outside of WHERE/HAVING clauses"))
}

test("SPARK-29145 Support subquery in join condition") {
  // Join condition: a IN (SELECT c FROM t3), where `c` is supplied as an
  // unresolved attribute and must be bound during analysis.
  val listQuery = ListQuery(Project(Seq(UnresolvedAttribute("c")), t3))
  val joinCondition = InSubquery(Seq(a), listQuery)
  val joinPlan = Join(t1, t2, Inner, Some(joinCondition), JoinHint.NONE)
  // The analyzer is expected to accept an IN-subquery inside the join condition.
  assertAnalysisSuccess(joinPlan)
}
}
148 changes: 148 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,154 @@ class SubquerySuite extends QueryTest with SharedSparkSession {
}
}

test("SPARK-29145: JOIN Condition use QueryList") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we move it to SQLQueryTestSuite?

It looks like this does not contain any test case that checks the EXISTS subquery. Could you also add one?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we move it to SQLQueryTestSuite?

It looks like this does not contain any test case that checks the EXISTS subquery. Could you also add one?

OK, I will raise a follow-up PR to address your comments.

// Exercises IN / NOT IN subqueries placed inside a JOIN ... ON condition, across
// inner, left semi, left anti, left outer, right outer and full outer joins.
withTempView("s1", "s2", "s3") {
// Fixture data, each registered as a temp view with a single `id` column:
//   s1 = {1, 3, 5, 7, 9}, s2 = {1, 3, 4, 6, 9}, s3 = {3, 4, 6, 9}
// The equi-join predicate s1.id = s2.id on its own matches ids 1, 3 and 9;
// every query below ANDs an extra subquery predicate onto that condition.
Seq(1, 3, 5, 7, 9).toDF("id").createOrReplaceTempView("s1")
Seq(1, 3, 4, 6, 9).toDF("id").createOrReplaceTempView("s2")
Seq(3, 4, 6, 9).toDF("id").createOrReplaceTempView("s3")

// Inner join, IN over a constant one-row subquery: only id 9 survives.
checkAnswer(
sql(
"""
| SELECT s1.id FROM s1
| JOIN s2 ON s1.id = s2.id
| AND s1.id IN (SELECT 9)
""".stripMargin),
Row(9) :: Nil)

// Inner join, NOT IN over the same constant subquery: 9 is excluded, 1 and 3 remain.
checkAnswer(
sql(
"""
| SELECT s1.id FROM s1
| JOIN s2 ON s1.id = s2.id
| AND s1.id NOT IN (SELECT 9)
""".stripMargin),
Row(1) :: Row(3) :: Nil)

// case `IN`
// Inner join: of the equi-join matches {1, 3, 9}, only 3 and 9 are in s3.
checkAnswer(
sql(
"""
| SELECT s1.id FROM s1
| JOIN s2 ON s1.id = s2.id
| AND s1.id IN (SELECT id FROM s3)
""".stripMargin),
Row(3) :: Row(9) :: Nil)

// Left semi join: keeps the s1 rows that satisfy the full condition (3 and 9).
checkAnswer(
sql(
"""
| SELECT s1.id AS id2 FROM s1
| LEFT SEMI JOIN s2
| ON s1.id = s2.id
| AND s1.id IN (SELECT id FROM s3)
""".stripMargin),
Row(3) :: Row(9) :: Nil)

// Left anti join: keeps the s1 rows with no match under the full condition.
// Id 1 is kept because it equi-matches s2 but is not in s3.
checkAnswer(
sql(
"""
| SELECT s1.id as id2 FROM s1
| LEFT ANTI JOIN s2
| ON s1.id = s2.id
| AND s1.id IN (SELECT id FROM s3)
""".stripMargin),
Row(1) :: Row(5) :: Row(7) :: Nil)

// Left outer join: every s1 row is emitted; the s2 side is null unless the
// full condition holds (ids 3 and 9).
checkAnswer(
sql(
"""
| SELECT s1.id, s2.id as id2 FROM s1
| LEFT OUTER JOIN s2
| ON s1.id = s2.id
| AND s1.id IN (SELECT id FROM s3)
""".stripMargin),
Row(1, null) :: Row(3, 3) :: Row(5, null) :: Row(7, null) :: Row(9, 9) :: Nil)

// Right outer join: every s2 row is emitted; the s1 side is null unless the
// full condition holds (ids 3 and 9).
checkAnswer(
sql(
"""
| SELECT s1.id, s2.id as id2 FROM s1
| RIGHT OUTER JOIN s2
| ON s1.id = s2.id
| AND s1.id IN (SELECT id FROM s3)
""".stripMargin),
Row(null, 1) :: Row(3, 3) :: Row(null, 4) :: Row(null, 6) :: Row(9, 9) :: Nil)

// Full outer join: matched pairs for 3 and 9, plus the unmatched rows of
// both sides padded with null.
checkAnswer(
sql(
"""
| SELECT s1.id, s2.id AS id2 FROM s1
| FULL OUTER JOIN s2
| ON s1.id = s2.id
| AND s1.id IN (SELECT id FROM s3)
""".stripMargin),
Row(1, null) :: Row(3, 3) :: Row(5, null) :: Row(7, null) :: Row(9, 9) ::
Row(null, 1) :: Row(null, 4) :: Row(null, 6) :: Nil)

// case `NOT IN`
// Inner join: of the equi-join matches {1, 3, 9}, only 1 is absent from s3.
checkAnswer(
sql(
"""
| SELECT s1.id FROM s1
| JOIN s2 ON s1.id = s2.id
| AND s1.id NOT IN (SELECT id FROM s3)
""".stripMargin),
Row(1) :: Nil)

// Left semi join: only s1 row 1 satisfies the full condition.
checkAnswer(
sql(
"""
| SELECT s1.id AS id2 FROM s1
| LEFT SEMI JOIN s2
| ON s1.id = s2.id
| AND s1.id NOT IN (SELECT id FROM s3)
""".stripMargin),
Row(1) :: Nil)

// Left anti join: every s1 row except 1 has no match under the full condition.
checkAnswer(
sql(
"""
| SELECT s1.id AS id2 FROM s1
| LEFT ANTI JOIN s2
| ON s1.id = s2.id
| AND s1.id NOT IN (SELECT id FROM s3)
""".stripMargin),
Row(3) :: Row(5) :: Row(7) :: Row(9) :: Nil)

// Left outer join: only id 1 gets a non-null s2 side.
checkAnswer(
sql(
"""
| SELECT s1.id, s2.id AS id2 FROM s1
| LEFT OUTER JOIN s2
| ON s1.id = s2.id
| AND s1.id NOT IN (SELECT id FROM s3)
""".stripMargin),
Row(1, 1) :: Row(3, null) :: Row(5, null) :: Row(7, null) :: Row(9, null) :: Nil)

// Right outer join: only id 1 gets a non-null s1 side.
checkAnswer(
sql(
"""
| SELECT s1.id, s2.id AS id2 FROM s1
| RIGHT OUTER JOIN s2
| ON s1.id = s2.id
| AND s1.id NOT IN (SELECT id FROM s3)
""".stripMargin),
Row(1, 1) :: Row(null, 3) :: Row(null, 4) :: Row(null, 6) :: Row(null, 9) :: Nil)

// Full outer join: the single matched pair (1, 1), plus the unmatched rows of
// both sides padded with null.
checkAnswer(
sql(
"""
| SELECT s1.id, s2.id AS id2 FROM s1
| FULL OUTER JOIN s2
| ON s1.id = s2.id
| AND s1.id NOT IN (SELECT id FROM s3)
""".stripMargin),
Row(1, 1) :: Row(3, null) :: Row(5, null) :: Row(7, null) :: Row(9, null) ::
Row(null, 3) :: Row(null, 4) :: Row(null, 6) :: Row(null, 9) :: Nil)
}
}

test("SPARK-14791: scalar subquery inside broadcast join") {
val df = sql("select a, sum(b) as s from l group by a having a > (select avg(a) from l)")
val expected = Row(3, 2.0, 3, 3.0) :: Row(6, null, 6, null) :: Nil
Expand Down