[SPARK-29145][SQL] Support sub-queries in join conditions #25854

Closed
wants to merge 20 commits into from
Changes from 14 commits
@@ -1696,6 +1696,8 @@ class Analyzer(
// Only a few unary nodes (Project/Filter/Aggregate) can contain subqueries.
case q: UnaryNode if q.childrenResolved =>
resolveSubQueries(q, q.children)
case j: Join if j.childrenResolved =>
resolveSubQueries(j, Seq(j, j.left, j.right))
Contributor

I can't recall the details, but why isn't it Seq(j.left, j.right)?

Contributor Author

> I can't recall the details, but why isn't it Seq(j.left, j.right)?

This looks like a mistake. Should I raise a PR to remove it?

case s: SupportsSubquery if s.childrenResolved =>
resolveSubQueries(s, s.children)
}
@@ -602,7 +602,7 @@ trait CheckAnalysis extends PredicateHelper {

case inSubqueryOrExistsSubquery =>
plan match {
- case _: Filter | _: SupportsSubquery => // Ok
+ case _: Filter | _: SupportsSubquery | _: Join => // Ok
case _ =>
failAnalysis(s"IN/EXISTS predicate sub-queries can only be used in" +
s" Filter and a few commands: $plan")
Contributor

Let's update the message: Filter/Join and a few commands.

Contributor Author

> Let's update the message: Filter/Join and a few commands.

Done

@@ -20,7 +20,8 @@ package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.{InSubquery, ListQuery}
- import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation, Project}
+ import org.apache.spark.sql.catalyst.plans.Inner
+ import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LocalRelation, Project}

/**
* Unit tests for [[ResolveSubquery]].
@@ -29,8 +30,10 @@ class ResolveSubquerySuite extends AnalysisTest {

val a = 'a.int
val b = 'b.int
val c = 'c.int
val t1 = LocalRelation(a)
val t2 = LocalRelation(b)
val t3 = LocalRelation(c)

test("SPARK-17251 Improve `OuterReference` to be `NamedExpression`") {
val expr = Filter(
@@ -41,4 +44,13 @@ class ResolveSubquerySuite extends AnalysisTest {
assert(m.contains(
"Expressions referencing the outer query are not supported outside of WHERE/HAVING clauses"))
}

test("SPARK-29145 Support subquery in join condition") {
val expr = Join(t1,
t2,
Inner,
Some(InSubquery(Seq(a), ListQuery(Project(Seq(UnresolvedAttribute("c")), t3)))),
null)
Contributor

nit: JoinHint.NONE instead of null.

Contributor Author

> nit: JoinHint.NONE instead of null.

Done

assertAnalysisSuccess(expr)
}
}
24 changes: 24 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -204,6 +204,30 @@ class SubquerySuite extends QueryTest with SharedSparkSession {
}
}

test("SPARK-29145: JOIN Condition use QueryList") {
Member

Can we move it to SQLQueryTestSuite?

It looks like there is no test case that checks the EXISTS subquery. Could you also add one?

Contributor Author

> Can we move it to SQLQueryTestSuite?
>
> It looks like there is no test case that checks the EXISTS subquery. Could you also add one?

OK, I will raise a PR following your comment.

withTempView("s1", "s2", "s3") {
Seq(1, 3, 5, 7, 9).toDF("id").createOrReplaceTempView("s1")
Seq(1, 3, 4, 6, 9).toDF("id").createOrReplaceTempView("s2")
Seq(3, 4, 6, 9).toDF("id").createOrReplaceTempView("s3")

checkAnswer(
sql("SELECT s1.id from s1 JOIN s2 ON s1.id = s2.id and s1.id IN (select 9)"),
Contributor

Can we put a correlated subquery in a join condition?

Contributor Author

> Can we put a correlated subquery in a join condition?

The subquery is in the join condition. The plans for the query in this test are as follows:

== Parsed Logical Plan ==
'Project ['s1.id]
+- 'Join Inner, (('s1.id = 's2.id) AND 's1.id IN (list#258 []))
   :  +- 'Project [unresolvedalias(9, None)]
   :     +- OneRowRelation
   :- 'UnresolvedRelation [s1]
   +- 'UnresolvedRelation [s2]

== Analyzed Logical Plan ==
id: int
Project [id#244]
+- Join Inner, ((id#244 = id#250) AND id#244 IN (list#258 []))
   :  +- Project [9 AS 9#259]
   :     +- OneRowRelation
   :- SubqueryAlias `s1`
   :  +- Project [value#241 AS id#244]
   :     +- LocalRelation [value#241]
   +- SubqueryAlias `s2`
      +- Project [value#247 AS id#250]
         +- LocalRelation [value#247]

== Optimized Logical Plan ==
Project [id#244]
+- Join Inner, (id#244 = id#250)
   :- Project [value#241 AS id#244]
   :  +- Join LeftSemi, (value#241 = 9#259)
   :     :- LocalRelation [value#241]
   :     +- Project [9 AS 9#259]
   :        +- OneRowRelation
   +- Project [value#247 AS id#250]
      +- Join LeftSemi, (value#247 = 9#259)
         :- LocalRelation [value#247]
         +- Project [9 AS 9#259]
            +- OneRowRelation

== Physical Plan ==
*(4) Project [id#244]
+- *(4) BroadcastHashJoin [id#244], [id#250], Inner, BuildRight
   :- *(4) Project [value#241 AS id#244]
   :  +- *(4) BroadcastHashJoin [value#241], [9#259], LeftSemi, BuildRight
   :     :- *(4) LocalTableScan [value#241]
   :     +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))), [id=#145]
   :        +- *(1) Project [9 AS 9#259]
   :           +- *(1) Scan OneRowRelation[]
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))), [id=#173]
      +- *(3) Project [value#247 AS id#250]
         +- *(3) BroadcastHashJoin [value#247], [9#259], LeftSemi, BuildRight
            :- *(3) LocalTableScan [value#247]
            +- ReusedExchange [9#259], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))), [id=#145]
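
As a reading aid for the optimized plan above, here is a minimal sketch in plain Scala (no Spark involved) of why the two shapes return the same rows. It assumes the `s1` and `s2` data from this test, models `select 9` as a one-element collection, and uses hypothetical names (`SemiJoinRewriteModel`, `leftSemi`) that do not exist in Spark. It only mirrors the shape of the plans, not how Catalyst performs the rewrite.

```scala
// Illustration only: plain collections standing in for the temp views.
object SemiJoinRewriteModel {
  val s1: Seq[Int] = Seq(1, 3, 5, 7, 9)
  val s2: Seq[Int] = Seq(1, 3, 4, 6, 9)
  val sub: Seq[Int] = Seq(9) // models the subquery `select 9`

  // Left semi join: keep each left row that has at least one match on the right.
  def leftSemi(left: Seq[Int], right: Seq[Int]): Seq[Int] =
    left.filter(l => right.contains(l))

  // Analyzed shape: one inner join whose condition carries the IN predicate.
  val analyzed: Seq[Int] =
    for {
      a <- s1
      b <- s2
      if a == b && sub.contains(a)
    } yield a

  // Optimized shape: a semi join on each input, then an inner join on equality only.
  val optimized: Seq[Int] =
    for {
      a <- leftSemi(s1, sub)
      b <- leftSemi(s2, sub)
      if a == b
    } yield a

  def main(args: Array[String]): Unit = {
    println(analyzed)  // List(9)
    println(optimized) // List(9)
  }
}
```

Both forms yield the single row 9, which matches the `Row(9) :: Nil` expectation of this `checkAnswer` call.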

Row(9) :: Nil)

checkAnswer(
sql("SELECT s1.id from s1 JOIN s2 ON s1.id = s2.id and s1.id NOT IN (select 9)"),
Row(1) :: Row(3) :: Nil)

checkAnswer(
sql("SELECT s1.id from s1 JOIN s2 ON s1.id = s2.id and s1.id IN (select id from s3)"),
Contributor

For example, do we support this query?
SELECT s1.id from s1 JOIN s2 ON s1.id = s2.id and s1.id IN (select id from s3 where s3.id = s2.id)

Contributor Author

> For example, do we support this query?
> SELECT s1.id from s1 JOIN s2 ON s1.id = s2.id and s1.id IN (select id from s3 where s3.id = s2.id)

No, we can't, because it breaks the strategy's idempotence. Writing SQL like this does not seem reasonable.

Contributor

Also cc @dilipbiswal.

I checked with pgsql and it's supported. We need to update RewriteCorrelatedScalarSubquery to support it in Spark.

Contributor Author

> Also cc @dilipbiswal.
>
> I checked with pgsql and it's supported. We need to update RewriteCorrelatedScalarSubquery to support it in Spark.

We should support it. I am looking into this issue.

Member

Do we need to address this in this PR? I think it's OK to do it in another JIRA. Kindly ping @dilipbiswal.
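
To make the queries discussed in this thread concrete, the following is a minimal sketch that models the three temp views as plain Scala collections and evaluates each join condition directly. It is not Spark code: `JoinConditionSubqueryModel` and `joinIds` are hypothetical names, NULL handling is ignored (the test data contains no NULLs), and the correlated cases only show what the queries mean under ordinary SQL semantics, not what Spark returns at this commit.

```scala
// Illustration only: plain collections standing in for the temp views s1, s2, s3.
object JoinConditionSubqueryModel {
  val s1: Seq[Int] = Seq(1, 3, 5, 7, 9)
  val s2: Seq[Int] = Seq(1, 3, 4, 6, 9)
  val s3: Seq[Int] = Seq(3, 4, 6, 9)

  // Models: SELECT s1.id FROM s1 JOIN s2 ON s1.id = s2.id AND extra(s1.id, s2.id)
  def joinIds(extra: (Int, Int) => Boolean): Seq[Int] =
    for {
      a <- s1
      b <- s2
      if a == b && extra(a, b)
    } yield a

  // ... AND s1.id IN (select id from s3)
  val inS3: Seq[Int] = joinIds((a, _) => s3.contains(a))

  // ... AND s1.id NOT IN (select id from s3); plain negation is enough without NULLs
  val notInS3: Seq[Int] = joinIds((a, _) => !s3.contains(a))

  // Correlated: ... AND s1.id IN (select id from s3 where s3.id = s2.id)
  val correlatedIn: Seq[Int] = joinIds((a, b) => s3.filter(_ == b).contains(a))

  // Correlated EXISTS (hypothetical query): ... AND EXISTS (select 1 from s3 where s3.id = s2.id)
  val correlatedExists: Seq[Int] = joinIds((_, b) => s3.exists(_ == b))

  def main(args: Array[String]): Unit = {
    println(inS3)             // List(3, 9)
    println(notInS3)          // List(1)
    println(correlatedIn)     // List(3, 9)
    println(correlatedExists) // List(3, 9)
  }
}
```

The first two results agree with the `checkAnswer` expectations in this test (`Row(3) :: Row(9)` and `Row(1)`). The correlated variants are the cases the reviewers suggest handling in a follow-up.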

Row(3) :: Row(9) :: Nil)

checkAnswer(
sql("SELECT s1.id from s1 JOIN s2 ON s1.id = s2.id and s1.id NOT IN (select id from s3)"),
Row(1) :: Nil)
}
}

test("SPARK-14791: scalar subquery inside broadcast join") {
val df = sql("select a, sum(b) as s from l group by a having a > (select avg(a) from l)")
val expected = Row(3, 2.0, 3, 3.0) :: Row(6, null, 6, null) :: Nil