Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-29145][SQL] Support sub-queries in join conditions #25854

Closed
wants to merge 20 commits into from

Conversation

AngersZhuuuu
Copy link
Contributor

@AngersZhuuuu AngersZhuuuu commented Sep 19, 2019

What changes were proposed in this pull request?

Support SparkSQL use iN/EXISTS with subquery in JOIN condition.

Why are the changes needed?

Support SQL use iN/EXISTS with subquery in JOIN condition.

Does this PR introduce any user-facing change?

This PR is for enable user use subquery in JOIN's ON condition. such as we have create three table

CREATE TABLE A(id String);
CREATE TABLE B(id String);
CREATE TABLE C(id String);

we can do query like :

SELECT A.id  from  A JOIN B ON A.id = B.id and A.id IN (select C.id from C)

How was this patch tested?

ADDED UT

@dongjoon-hyun
Copy link
Member

ok to test

@SparkQA
Copy link

SparkQA commented Sep 19, 2019

Test build #111015 has finished for PR 25854 at commit 5aa2ed6.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Copy link
Member

maropu commented Sep 20, 2019

Does this PR introduce any user-facing change?

No? It seems ths pr intends to accept a new statement in DataFrame/SQL?

@maropu
Copy link
Member

maropu commented Sep 20, 2019

Also, can you add end-to-end tests in SQLQueryTestSuite or somewhere?

@AngersZhuuuu
Copy link
Contributor Author

AngersZhuuuu commented Sep 20, 2019

end-to-end tests in SQLQ

Ok, I will do this. End-to-end UT add to SubQuerySuit

@SparkQA
Copy link

SparkQA commented Sep 20, 2019

Test build #111029 has finished for PR 25854 at commit fa55b3a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Sep 20, 2019

Test build #111037 has finished for PR 25854 at commit bd7c098.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Oct 15, 2019

Test build #112100 has finished for PR 25854 at commit 3108da2.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Oct 15, 2019

Test build #112101 has finished for PR 25854 at commit 6b58893.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AngersZhuuuu
Copy link
Contributor Author

gentle ping @maropu @wangyum @cloud-fan

@@ -602,7 +602,7 @@ trait CheckAnalysis extends PredicateHelper {

case inSubqueryOrExistsSubquery =>
plan match {
case _: Filter | _: SupportsSubquery => // Ok
case _: Filter | _: SupportsSubquery | _: Join => // Ok
case _ =>
failAnalysis(s"IN/EXISTS predicate sub-queries can only be used in" +
s" Filter and a few commands: $plan")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's update the message: Filter/Join and a few commands

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's update the message: Filter/Join and a few commands

Done

Seq(3, 4, 6, 9).toDF("id").createOrReplaceTempView("s3")

checkAnswer(
sql("SELECT s1.id from s1 JOIN s2 ON s1.id = s2.id and s1.id IN (select 9)"),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we put correlated subquery in join condition?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we put correlated subquery in join condition?

Subquery is in join condition, LogicalPlan as below:

== Parsed Logical Plan ==
'Project ['s1.id]
+- 'Join Inner, (('s1.id = 's2.id) AND 's1.id IN (list#258 []))
   :  +- 'Project [unresolvedalias(9, None)]
   :     +- OneRowRelation
   :- 'UnresolvedRelation [s1]
   +- 'UnresolvedRelation [s2]

== Analyzed Logical Plan ==
id: int
Project [id#244]
+- Join Inner, ((id#244 = id#250) AND id#244 IN (list#258 []))
   :  +- Project [9 AS 9#259]
   :     +- OneRowRelation
   :- SubqueryAlias `s1`
   :  +- Project [value#241 AS id#244]
   :     +- LocalRelation [value#241]
   +- SubqueryAlias `s2`
      +- Project [value#247 AS id#250]
         +- LocalRelation [value#247]

== Optimized Logical Plan ==
Project [id#244]
+- Join Inner, (id#244 = id#250)
   :- Project [value#241 AS id#244]
   :  +- Join LeftSemi, (value#241 = 9#259)
   :     :- LocalRelation [value#241]
   :     +- Project [9 AS 9#259]
   :        +- OneRowRelation
   +- Project [value#247 AS id#250]
      +- Join LeftSemi, (value#247 = 9#259)
         :- LocalRelation [value#247]
         +- Project [9 AS 9#259]
            +- OneRowRelation

== Physical Plan ==
*(4) Project [id#244]
+- *(4) BroadcastHashJoin [id#244], [id#250], Inner, BuildRight
   :- *(4) Project [value#241 AS id#244]
   :  +- *(4) BroadcastHashJoin [value#241], [9#259], LeftSemi, BuildRight
   :     :- *(4) LocalTableScan [value#241]
   :     +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))), [id=#145]
   :        +- *(1) Project [9 AS 9#259]
   :           +- *(1) Scan OneRowRelation[]
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))), [id=#173]
      +- *(3) Project [value#247 AS id#250]
         +- *(3) BroadcastHashJoin [value#247], [9#259], LeftSemi, BuildRight
            :- *(3) LocalTableScan [value#247]
            +- ReusedExchange [9#259], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))), [id=#145]

Row(1) :: Row(3) :: Nil)

checkAnswer(
sql("SELECT s1.id from s1 JOIN s2 ON s1.id = s2.id and s1.id IN (select id from s3)"),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for example, do we support
SELECT s1.id from s1 JOIN s2 ON s1.id = s2.id and s1.id IN (select id from s3 where s3.id = s2.id)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for example, do we support
SELECT s1.id from s1 JOIN s2 ON s1.id = s2.id and s1.id IN (select id from s3 where s3.id = s2.id)

Cann't since strategy's idempotence is broken. Seem write sql like this is not reasonable...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also cc @dilipbiswal

I checked with pgsql and it's supported. We need to update RewriteCorrelatedScalarSubquery to support it in Spark.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also cc @dilipbiswal

I checked with pgsql and it's supported. We need to update RewriteCorrelatedScalarSubquery to support it in Spark.

We should support it, checking on this issue.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to address the support in this pr? I think its ok to do in another jira. kindly ping @dilipbiswal

@SparkQA
Copy link

SparkQA commented Oct 16, 2019

Test build #112140 has finished for PR 25854 at commit 25f31dc.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AngersZhuuuu
Copy link
Contributor Author

@AngersZhuuuu I just quickly checked the plan for following query :

query

SELECT s1.id from s1 JOIN s2 ON s1.id = s2.id and s1.id NOT IN (select id from s3)

plan

Project [id#244]
+- Join Inner, (id#244 = id#250)
  :- Project [value#241 AS id#244]
  :  +- Join LeftAnti, ((value#241 = id#256) OR isnull((value#241 = id#256)))
  :     :- LocalRelation [value#241]
  :     +- Project [value#253 AS id#256]
  :        +- LocalRelation [value#253]
  +- Project [value#247 AS id#250]
     +- Join LeftAnti, ((value#247 = id#256) OR isnull((value#247 = id#256)))
        :- LocalRelation [value#247]
        +- Project [value#253 AS id#256]
           +- LocalRelation [value#253]

Thats the reason i asked to test out the outer joins. Lets please make sure that in case of Outer joins we preserve the full join condition in the main join. Lets add few tests to make sure please.

Check whole process, you show is optimized plan, in analyzed plan, join condition is still in main join, after optimize, it was pushed down.

@AngersZhuuuu
Copy link
Contributor Author

@dilipbiswal
Add more UT, result is ok, I wonder if it covers all the cases you want

@SparkQA
Copy link

SparkQA commented Oct 23, 2019

Test build #112521 has finished for PR 25854 at commit 2ead378.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Oct 23, 2019

Test build #112534 has finished for PR 25854 at commit 3db4aaf.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dilipbiswal
Copy link
Contributor

@AngersZhuuuu Great.. Thanks a lot for adding the UTs. Looks good to me.

Row(3) :: Row(9) :: Nil)

checkAnswer(
sql("SELECT s1.id as id2 from s1 LEFT SEMI JOIN s2 " +
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can you follow the format of the other tests? In multi-line cases, the format seems to be like this;

      sql("""
            |
            | ...
            ...
            |  ) """.stripMargin)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed.

@maropu
Copy link
Member

maropu commented Oct 24, 2019

Can you update the title? Support sub-queries in join conditions?

@AngersZhuuuu AngersZhuuuu changed the title [SPARK-29145][SQL] Spark SQL cannot handle "NOT IN" condition when using "JOIN" [SPARK-29145][SQL] Support sub-queries in join conditions Oct 24, 2019
@SparkQA
Copy link

SparkQA commented Oct 24, 2019

Test build #112585 has finished for PR 25854 at commit 307802a.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Copy link
Member

maropu commented Oct 24, 2019

retest this please

@SparkQA
Copy link

SparkQA commented Oct 24, 2019

Test build #112593 has finished for PR 25854 at commit 307802a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu maropu closed this in 67cf043 Oct 24, 2019
@maropu
Copy link
Member

maropu commented Oct 24, 2019

Thanks! Merged to master.

@@ -204,6 +204,154 @@ class SubquerySuite extends QueryTest with SharedSparkSession {
}
}

test("SPARK-29145: JOIN Condition use QueryList") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we move it to SQLQueryTestSuite?

It sounds like it does not contain any test case that check the EXISTS subquery? Could you also add it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we move it to SQLQueryTestSuite?

It sounds like it does not contain any test case that check the EXISTS subquery? Could you also add it?

Ok, will raise a pr follow your comment.

dongjoon-hyun pushed a commit that referenced this pull request Nov 13, 2019
…query/in-subquery/in-joins.sql`

### What changes were proposed in this pull request?
Follow comment of #25854 (comment)

### Why are the changes needed?
NO

### Does this PR introduce any user-facing change?
NO

### How was this patch tested?
ADD TEST CASE

Closes #26406 from AngersZhuuuu/SPARK-29145-FOLLOWUP.

Authored-by: angerszhu <angers.zhu@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
@@ -1697,6 +1697,8 @@ class Analyzer(
// Only a few unary nodes (Project/Filter/Aggregate) can contain subqueries.
case q: UnaryNode if q.childrenResolved =>
resolveSubQueries(q, q.children)
case j: Join if j.childrenResolved =>
resolveSubQueries(j, Seq(j, j.left, j.right))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't recall the details, but why it's not Seq(j.left, j.right)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't recall the details, but why it's not Seq(j.left, j.right)?

Should be a mistake, raise a pr and remove this?

cloud-fan pushed a commit that referenced this pull request May 12, 2021
…in join conditions

### What changes were proposed in this pull request?
According to discuss #25854 (comment)

### Why are the changes needed?
Clean code

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existed UT

Closes #32499 from AngersZhuuuu/SPARK-29145-fix.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants