Skip to content

Commit

Permalink
[SPARK-29145][SQL] Support sub-queries in join conditions
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
Support using IN/EXISTS predicates with subqueries in JOIN conditions in Spark SQL.

### Why are the changes needed?
Spark SQL should support IN/EXISTS predicates with subqueries in JOIN conditions.

### Does this PR introduce any user-facing change?

This PR enables users to use subqueries in a `JOIN`'s ON condition. For example, suppose we have created three tables:
```
CREATE TABLE A(id String);
CREATE TABLE B(id String);
CREATE TABLE C(id String);
```
Then we can run a query like:
```
SELECT A.id  from  A JOIN B ON A.id = B.id and A.id IN (select C.id from C)
```

### How was this patch tested?
Added unit tests.

Closes #25854 from AngersZhuuuu/SPARK-29145.

Lead-authored-by: angerszhu <angers.zhu@gmail.com>
Co-authored-by: AngersZhuuuu <angers.zhu@gmail.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
  • Loading branch information
AngersZhuuuu authored and maropu committed Oct 24, 2019
1 parent 1296bbb commit 67cf043
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1697,6 +1697,8 @@ class Analyzer(
// Only a few unary nodes (Project/Filter/Aggregate), joins, and SupportsSubquery plans can contain subqueries.
case q: UnaryNode if q.childrenResolved =>
resolveSubQueries(q, q.children)
case j: Join if j.childrenResolved =>
resolveSubQueries(j, Seq(j, j.left, j.right))
case s: SupportsSubquery if s.childrenResolved =>
resolveSubQueries(s, s.children)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -601,10 +601,10 @@ trait CheckAnalysis extends PredicateHelper {

case inSubqueryOrExistsSubquery =>
plan match {
case _: Filter | _: SupportsSubquery => // Ok
case _: Filter | _: SupportsSubquery | _: Join => // Ok
case _ =>
failAnalysis(s"IN/EXISTS predicate sub-queries can only be used in" +
s" Filter and a few commands: $plan")
s" Filter/Join and a few commands: $plan")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.{InSubquery, ListQuery}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation, Project}
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.logical._

/**
* Unit tests for [[ResolveSubquery]].
Expand All @@ -29,8 +30,10 @@ class ResolveSubquerySuite extends AnalysisTest {

val a = 'a.int
val b = 'b.int
val c = 'c.int
val t1 = LocalRelation(a)
val t2 = LocalRelation(b)
val t3 = LocalRelation(c)

test("SPARK-17251 Improve `OuterReference` to be `NamedExpression`") {
val expr = Filter(
Expand All @@ -41,4 +44,13 @@ class ResolveSubquerySuite extends AnalysisTest {
assert(m.contains(
"Expressions referencing the outer query are not supported outside of WHERE/HAVING clauses"))
}

test("SPARK-29145 Support subquery in join condition") {
  // Build an inner join of t1 and t2 whose condition is `a IN (SELECT c FROM t3)`.
  // The projected attribute `c` is deliberately left unresolved so that the analyzer
  // has to resolve the subquery plan that lives inside the join condition.
  val subquery = ListQuery(Project(Seq(UnresolvedAttribute("c")), t3))
  val joinCondition = InSubquery(Seq(a), subquery)
  val plan = Join(t1, t2, Inner, Some(joinCondition), JoinHint.NONE)

  // Analysis must succeed now that IN subqueries are accepted in a Join condition.
  assertAnalysisSuccess(plan)
}
}
148 changes: 148 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,154 @@ class SubquerySuite extends QueryTest with SharedSparkSession {
}
}

// End-to-end check that IN / NOT IN predicate subqueries are accepted inside a JOIN's ON
// clause and produce the expected rows for every join type exercised below
// (inner, left semi, left anti, left outer, right outer, full outer).
test("SPARK-29145: JOIN Condition use QueryList") {
withTempView("s1", "s2", "s3") {
// Fixtures: s1 and s2 share the ids {1, 3, 9}; s3 is the subquery source {3, 4, 6, 9}.
Seq(1, 3, 5, 7, 9).toDF("id").createOrReplaceTempView("s1")
Seq(1, 3, 4, 6, 9).toDF("id").createOrReplaceTempView("s2")
Seq(3, 4, 6, 9).toDF("id").createOrReplaceTempView("s3")

// IN over a single-row subquery without a FROM clause: only id 9 survives the join.
checkAnswer(
sql(
"""
| SELECT s1.id FROM s1
| JOIN s2 ON s1.id = s2.id
| AND s1.id IN (SELECT 9)
""".stripMargin),
Row(9) :: Nil)

// NOT IN over the same single-row subquery: the remaining equi-join matches 1 and 3.
checkAnswer(
sql(
"""
| SELECT s1.id FROM s1
| JOIN s2 ON s1.id = s2.id
| AND s1.id NOT IN (SELECT 9)
""".stripMargin),
Row(1) :: Row(3) :: Nil)

// case `IN`: the subquery reads from s3, so a pair matches only when the shared id
// is also in s3, i.e. ids 3 and 9.
// Inner join.
checkAnswer(
sql(
"""
| SELECT s1.id FROM s1
| JOIN s2 ON s1.id = s2.id
| AND s1.id IN (SELECT id FROM s3)
""".stripMargin),
Row(3) :: Row(9) :: Nil)

// Left semi join: s1 rows that have a match under the full condition.
checkAnswer(
sql(
"""
| SELECT s1.id AS id2 FROM s1
| LEFT SEMI JOIN s2
| ON s1.id = s2.id
| AND s1.id IN (SELECT id FROM s3)
""".stripMargin),
Row(3) :: Row(9) :: Nil)

// Left anti join: s1 rows that have no match under the full condition.
checkAnswer(
sql(
"""
| SELECT s1.id as id2 FROM s1
| LEFT ANTI JOIN s2
| ON s1.id = s2.id
| AND s1.id IN (SELECT id FROM s3)
""".stripMargin),
Row(1) :: Row(5) :: Row(7) :: Nil)

// Left outer join: unmatched s1 rows are kept with a null s2 side.
checkAnswer(
sql(
"""
| SELECT s1.id, s2.id as id2 FROM s1
| LEFT OUTER JOIN s2
| ON s1.id = s2.id
| AND s1.id IN (SELECT id FROM s3)
""".stripMargin),
Row(1, null) :: Row(3, 3) :: Row(5, null) :: Row(7, null) :: Row(9, 9) :: Nil)

// Right outer join: unmatched s2 rows are kept with a null s1 side.
checkAnswer(
sql(
"""
| SELECT s1.id, s2.id as id2 FROM s1
| RIGHT OUTER JOIN s2
| ON s1.id = s2.id
| AND s1.id IN (SELECT id FROM s3)
""".stripMargin),
Row(null, 1) :: Row(3, 3) :: Row(null, 4) :: Row(null, 6) :: Row(9, 9) :: Nil)

// Full outer join: unmatched rows from both sides are kept.
checkAnswer(
sql(
"""
| SELECT s1.id, s2.id AS id2 FROM s1
| FULL OUTER JOIN s2
| ON s1.id = s2.id
| AND s1.id IN (SELECT id FROM s3)
""".stripMargin),
Row(1, null) :: Row(3, 3) :: Row(5, null) :: Row(7, null) :: Row(9, 9) ::
Row(null, 1) :: Row(null, 4) :: Row(null, 6) :: Nil)

// case `NOT IN`: a pair matches only when the shared id is absent from s3, i.e. id 1.
// Inner join.
checkAnswer(
sql(
"""
| SELECT s1.id FROM s1
| JOIN s2 ON s1.id = s2.id
| AND s1.id NOT IN (SELECT id FROM s3)
""".stripMargin),
Row(1) :: Nil)

// Left semi join.
checkAnswer(
sql(
"""
| SELECT s1.id AS id2 FROM s1
| LEFT SEMI JOIN s2
| ON s1.id = s2.id
| AND s1.id NOT IN (SELECT id FROM s3)
""".stripMargin),
Row(1) :: Nil)

// Left anti join.
checkAnswer(
sql(
"""
| SELECT s1.id AS id2 FROM s1
| LEFT ANTI JOIN s2
| ON s1.id = s2.id
| AND s1.id NOT IN (SELECT id FROM s3)
""".stripMargin),
Row(3) :: Row(5) :: Row(7) :: Row(9) :: Nil)

// Left outer join.
checkAnswer(
sql(
"""
| SELECT s1.id, s2.id AS id2 FROM s1
| LEFT OUTER JOIN s2
| ON s1.id = s2.id
| AND s1.id NOT IN (SELECT id FROM s3)
""".stripMargin),
Row(1, 1) :: Row(3, null) :: Row(5, null) :: Row(7, null) :: Row(9, null) :: Nil)

// Right outer join.
checkAnswer(
sql(
"""
| SELECT s1.id, s2.id AS id2 FROM s1
| RIGHT OUTER JOIN s2
| ON s1.id = s2.id
| AND s1.id NOT IN (SELECT id FROM s3)
""".stripMargin),
Row(1, 1) :: Row(null, 3) :: Row(null, 4) :: Row(null, 6) :: Row(null, 9) :: Nil)

// Full outer join.
checkAnswer(
sql(
"""
| SELECT s1.id, s2.id AS id2 FROM s1
| FULL OUTER JOIN s2
| ON s1.id = s2.id
| AND s1.id NOT IN (SELECT id FROM s3)
""".stripMargin),
Row(1, 1) :: Row(3, null) :: Row(5, null) :: Row(7, null) :: Row(9, null) ::
Row(null, 3) :: Row(null, 4) :: Row(null, 6) :: Row(null, 9) :: Nil)
}
}

test("SPARK-14791: scalar subquery inside broadcast join") {
val df = sql("select a, sum(b) as s from l group by a having a > (select avg(a) from l)")
val expected = Row(3, 2.0, 3, 3.0) :: Row(6, null, 6, null) :: Nil
Expand Down

0 comments on commit 67cf043

Please sign in to comment.