[SPARK-29800][SQL] Rewrite non-correlated EXISTS subquery to use ScalarSubquery to optimize perf #26437
def updateResult(): Unit = {
  val rows = plan.executeCollect()
The reason why we don't have a physical plan for Exists is: it's not robust. Collecting the entire result of a query plan at the driver side is very likely to hit OOM. That's why we have to convert Exists to a join.
We can make it just return rdd.isEmpty(), since EXISTS only needs to check whether the result is empty.
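A minimal sketch of why an emptiness check is cheaper than collecting everything, written in plain Scala with no Spark dependency. The Partition type and both helpers are hypothetical stand-ins rather than Spark APIs; partitions are modeled as thunks so the number of partitions actually computed can be observed.

```scala
// Toy model only: Partition, collectAll and nonEmpty are hypothetical names, not Spark APIs.
object ExistsCostSketch {
  // A partition is a thunk, so its rows are only produced when it is invoked.
  type Partition = () => Seq[Int]

  // Mimics collecting at the driver: every partition is computed and every row is kept.
  def collectAll(parts: Seq[Partition]): Seq[Int] = parts.flatMap(p => p())

  // Mimics an emptiness check: partitions are tried in order, stopping at the first row found.
  def nonEmpty(parts: Seq[Partition]): Boolean = parts.iterator.exists(p => p().nonEmpty)
}
```

With several non-empty partitions, nonEmpty computes only the first one, while collectAll computes all of them and holds every row in memory, which is the OOM risk raised above.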
sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
def updateResult(): Unit = {
  result = !plan.execute().isEmpty()
This seems like a better way to execute a non-correlated EXISTS subquery. Maybe we should update RewritePredicateSubquery to only handle correlated EXISTS subqueries. @dilipbiswal what do you think?
Yeah, let's wait for his advice.
@cloud-fan @AngersZhuuuu Thanks for pinging me. Just so I understand, since this PR refers to another PR:
We are considering planning subqueries that appear inside an ON clause as a join, right?
Assuming so, if the query was:
SELECT * FROM T1 JOIN T2 ON T1.C1 = T2.C1 AND T1.C1 EXISTS (SELECT 1 FROM T3 WHERE T1.C1 = T3.C1)
we are considering planning it as:
(T1 LeftSemi T3 ON T1.C1 = T3.C1) Join T2 ON T1.C1 = T2.C2
This looks okay to me for inner joins. I am just not sure about outer joins. What do you think, Wenchen?
Now, coming to the non-correlated subqueries: if we keep it as a PlanExpression and execute it, one thing we have to look at is which join strategy gets picked. It's always going to be broadcast nested loop, since it won't be an equi-join, right?
SELECT * FROM T1 JOIN T2 ON T1.C1 = T2.C1 AND T1.C1 EXISTS (SELECT 1 FROM T3 WHERE T1.C1 = T3.C1)
is not correct.
Did you mean the following?
SELECT * FROM T1 JOIN T2 ON T1.C1 = T2.C1 AND EXISTS (SELECT 1 FROM T3 WHERE T1.C1 = T3.C1)
For this type of SQL we need to change RewritePredicateSubquery, as cloud-fan said.
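The equivalence behind planning a correlated EXISTS as a LeftSemi join can be sketched over plain Scala collections. This is only an illustration: Row, filterByExists and leftSemi are hypothetical names, NULL handling is ignored, and nothing here is Catalyst code.

```scala
// Toy model only: these names are illustrative and not part of Spark.
object SemiJoinSketch {
  final case class Row(c1: Int)

  // EXISTS (SELECT 1 FROM T3 WHERE T1.C1 = T3.C1), evaluated once per T1 row.
  def filterByExists(t1: Seq[Row], t3: Seq[Row]): Seq[Row] =
    t1.filter(r => t3.exists(_.c1 == r.c1))

  // T1 LeftSemi T3 ON T1.C1 = T3.C1: a T1 row is kept once if any T3 key matches,
  // no matter how many T3 rows share that key.
  def leftSemi(t1: Seq[Row], t3: Seq[Row]): Seq[Row] = {
    val keys = t3.map(_.c1).toSet
    t1.filter(r => keys.contains(r.c1))
  }
}
```

Both functions return the same T1 rows, and duplicate matches in T3 never multiply the output, which is what distinguishes a semi join from an inner join here.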
@AngersZhuuuu You are right, sorry. I had written it as IN initially and forgot to adjust it to EXISTS :-)
Yeah, we need to change RewritePredicateSubquery, which handles correlated subquery rewrites. The only thing I am not sure about is the outer joins.
Yes, outer joins are complex. If we do this, we need to add end-to-end test cases covering each case to make sure the final plans are as expected.
override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
  prepareResult()
  ExistsSubquery(child, subQuery, result).doGenCode(ctx, ev)
Why do we create ExistsSubquery only to do codegen? Can we put the codegen logic in ExistsExec?
ExecSubqueryExpression and UnaryExpression conflict: both are abstract classes, so one class cannot extend both.
We don't have to extend UnaryExpression and we can still implement codegen, right?
Done
    buildJoin(outerPlan, sub, LeftAnti, joinCond)
  } else {
    Filter(Not(exists), newFilter)
  }
@cloud-fan @dilipbiswal
Changed here so that a non-correlated EXISTS subquery runs as described in this comment: https://github.com/apache/spark/pull/26437/files#r344203937
@AngersZhuuuu I discussed this with Wenchen briefly. Do you think we can safely inject a "LIMIT 1" into our subplan to expedite its execution? Please let us know what you think.
I am also thinking about reducing the execution cost of this subquery. LIMIT 1 is OK.
My direction is to make this execution work like Spark Thrift Server's incremental collect: only execute one partition.
Can we discuss the safety and cost of these two approaches?
@cloud-fan @dilipbiswal @maropu We need to update |
some more thoughts: I think we can rewrite non-correlated EXISTS subquery to a non-correlated scalar subquery. e.g. |
It's ok to do like this, but it will add one more shuffle action for |
Yea true, we can add a LIMIT 1 to the scalar subquery before count, then the result won't be huge. This is also how we implement |
I am not sure if |
@dilipbiswal @cloud-fan With
|
do you mean something like |
Yes, that's what I want.
This way, we can minimize the subquery's cost.
Is this approach OK?
makes sense, let's keep the new EXISTS expression. Can we add a |
Good suggestion, updated. One point I don't support @dilipbiswal For current way, what do you think? |
case (p, Not(Exists(sub, conditions, _))) =>
  val (joinCond, outerPlan) = rewriteExistentialExpr(conditions, p)
  buildJoin(outerPlan, sub, LeftAnti, joinCond)
case (p, exists @ Exists(sub, conditions, _)) =>
can we change the beginning instead?
val (withSubquery, withoutSubquery) = splitConjunctivePredicates(condition).partition { cond =>
SubqueryExpression.hasInOrExistsSubquery(cond) && !isNonCorrelatedExists
}
Or we change hasInOrExistsSubquery to hasInOrCorrelatedExistsSubquery.
Done
 * The physical node of exists-subquery. This is for support use exists in join's on condition,
 * since some join type we can't pushdown exists condition, we plan it here
 */
case class ExistsExec(child: Expression,
nit:
case class A(
para1: T,
para2: T): R ...
Done
    subQuery: String,
    plan: BaseSubqueryExec,
    exprId: ExprId,
    private var resultBroadcast: Broadcast[Boolean] = null)
Why do we need broadcast? Can we follow the physical scalar subquery?
So that it can be used by each partition, and to reduce the size of the data returned during computation or when returning the result.
Test build #4985 has finished for PR 26437 at commit
|
Is the second SQL query wrong ( |
There was an error in the comment I wrote for the added rule. Thanks for the reminder. Could you help trigger a retest?
retest this please. |
@@ -56,7 +56,7 @@ object ReplaceExpressions extends Rule[LogicalPlan] {
 * Rewrite non correlated exists subquery to use ScalarSubquery
 * WHERE EXISTS (SELECT A FROM TABLE B WHERE COL1 > 10)
 * will be rewrite to
nit: rewritten
+1
Got the point, changed...
plan.expressions.map(_.collect {
  case sub: ExecSubqueryExpression => getNumInMemoryTablesRecursively(sub.plan)
}.sum).sum
nit:
plan.expressions.flatMap(_.collect {
case sub: ExecSubqueryExpression =>
getNumInMemoryTablesRecursively(sub.plan)
}).sum
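The nit above relies on summing per-expression totals being the same as flattening first and summing once. A small stand-alone check, assuming plain Scala collections in place of plan.expressions and TreeNode.collect (the data and names below are made up for illustration):

```scala
// Toy stand-in: each inner Seq plays the role of the nodes of one expression tree.
object SumSketch {
  val expressions: Seq[Seq[Any]] = Seq(Seq(1, "x", 2), Seq("y"), Seq(3))

  // Original shape: sum inside each expression, then sum the per-expression totals.
  val nested: Int = expressions.map(_.collect { case n: Int => n }.sum).sum

  // Suggested shape: flatten all matches into one sequence and sum once.
  val flat: Int = expressions.flatMap(_.collect { case n: Int => n }).sum
}
```

The two forms give the same total; the flatMap version just avoids the inner sum.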
|WHERE
|NOT EXISTS (SELECT * FROM t1)
|NOT EXISTS (SELECT * FROM t1 b WHERE a.i = b.i)
Why do we need to change the existing test?
case inMemoryTable @ InMemoryTableScanExec(_, _, relation) =>
  getNumInMemoryTablesRecursively(relation.cachedPlan) +
    getNumInMemoryTablesInSubquery(inMemoryTable) + 1
case p =>
  getNumInMemoryTablesInSubquery(p)
Is this change needed for this PR? Looks like not directly related?
splitConjunctivePredicates(condition)
  .partition(SubqueryExpression.hasInOrCorrelatedExistsSubquery)
Unrelated change?
Oh, nvm, I saw it.
@@ -64,9 +64,10 @@ object SubqueryExpression {
/**
 * Returns true when an expression contains an IN or EXISTS subquery and false otherwise.
We should update the doc too.
@@ -52,6 +52,21 @@ object ReplaceExpressions extends Rule[LogicalPlan] {
    }
  }

/**
 * Rewrite non correlated exists subquery to use ScalarSubquery
non correlated -> uncorrelated
 * will be rewrite to
 * WHERE (SELECT 1 FROM (SELECT A FROM TABLE B WHERE COL1 > 10) LIMIT 1) IS NOT NULL
 */
object RewriteNonCorrelatedExists extends Rule[LogicalPlan] {
Shall we add a test for this rule?
With test case
test("Rewritten uncorrelated exists subquery to use ScalarSubquery") {
val relation = LocalRelation('a.int)
val relExistSubquery = LocalRelation('x.int, 'y.int, 'z.int).where('x > 10)
val query = relation.where(Exists(relExistSubquery)).select('a)
val optimized = Optimize.execute(query.analyze)
val correctAnswer = relation
.where(IsNotNull(ScalarSubquery(Limit(Literal(1),
Project(Seq(Alias(Literal(1), "col")()), relExistSubquery)))))
.analyze
comparePlans(optimized, correctAnswer)
}
I get this error:
[info] RewriteSubquerySuite:
[info] - Rewritten uncorrelated exists subquery to use ScalarSubquery *** FAILED *** (852 milliseconds)
[info] == FAIL: Plans do not match ===
[info] Filter isnotnull(scalar-subquery#0 []) Filter isnotnull(scalar-subquery#0 [])
[info] : +- GlobalLimit 1 : +- GlobalLimit 1
[info] : +- LocalLimit 1 : +- LocalLimit 1
[info] !: +- Project [1 AS col#5] : +- Project [1 AS col#6]
[info] : +- Filter (x#1 > 10) : +- Filter (x#1 > 10)
[info] : +- LocalRelation <empty>, [x#1, y#2, z#3] : +- LocalRelation <empty>, [x#1, y#2, z#3]
[info] +- LocalRelation <empty>, [a#0] +- LocalRelation <empty>, [a#0] (PlanTest.scala:147)
[info] org.scalatest.exceptions.TestFailedException:
[info] at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530)
This is because of the Alias in RewriteNonCorrelatedExists: the two plans differ only in the alias's expression ID (col#5 vs col#6).
Any good advice for the test case? Where can I add a test case that avoids this problem? @cloud-fan @viirya
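The mismatch can be reproduced in miniature. The sketch below is a toy model, not Catalyst: ToyAlias, newId and normalize are hypothetical names that only mimic the idea that every alias instance receives a fresh ID, so two structurally identical plans compare unequal until the IDs are erased.

```scala
import java.util.concurrent.atomic.AtomicLong

// Toy model only: ToyAlias is not Spark's Alias, it just imitates per-instance IDs.
object ExprIdSketch {
  private val counter = new AtomicLong(0)
  def newId(): Long = counter.getAndIncrement()

  // Each instance gets a fresh id by default, like col#5 and col#6 in the failure above.
  final case class ToyAlias(child: Int, name: String, id: Long = newId())

  // Erase the id so that only structure (child and name) takes part in the comparison.
  def normalize(a: ToyAlias): ToyAlias = a.copy(id = 0L)
}
```

Two aliases built the same way are unequal as-is and equal after normalize, which mirrors why the expected plan built in the test does not match the optimized plan even though they look the same.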
I also noticed that in related docs (in Spark code or elsewhere), the Exists expression seems to assume a correlated condition.
For example, the Exists doc says: The [[Exists]] expression checks if a row exists in a subquery given some correlated condition.
Uncorrelated EXISTS queries definitely work now, though, and we also have such queries in tests like exist-basic.sql.
Maybe we should also update the doc.
Test build #116119 has finished for PR 26437 at commit
|
Updated what you have mentioned. |
@@ -52,6 +52,21 @@ object ReplaceExpressions extends Rule[LogicalPlan] {
    }
  }

/**
 * Rewritten uncorrelated exists subquery to use ScalarSubquery
Rewrite non-correlated
updated.
Test build #4986 has finished for PR 26437 at commit
|
retest this please |
Test build #116134 has finished for PR 26437 at commit
|
retest this please |
Test build #116144 has finished for PR 26437 at commit
|
thanks, merging to master! |
What changes were proposed in this pull request?
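Currently, Catalyst rewrites a non-correlated EXISTS subquery to a BroadcastNestedLoopJoin, whose performance is poor. This PR rewrites non-correlated EXISTS subqueries to a ScalarSubquery to improve performance.
We rewrite
WHERE EXISTS (SELECT A FROM TABLE B WHERE COL1 > 10)
to
WHERE (SELECT 1 FROM (SELECT A FROM TABLE B WHERE COL1 > 10) LIMIT 1) IS NOT NULL
to avoid building a join to evaluate the EXISTS expression.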
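The shape of this rewrite can be sketched on a toy expression tree. This is an illustration under stated assumptions, not the rule from the PR: the case classes below are hypothetical simplifications of Catalyst's nodes (the real rule works on LogicalPlan and wraps the literal in an Alias), and "non-correlated" is modeled as an empty list of outer conditions.

```scala
// Toy ADT only: these classes imitate, but are not, Catalyst's plan and expression nodes.
object RewriteSketch {
  sealed trait Plan
  final case class Relation(name: String) extends Plan
  final case class Project(column: Expr, child: Plan) extends Plan
  final case class Limit(n: Int, child: Plan) extends Plan

  sealed trait Expr
  final case class Lit(value: Int) extends Expr
  final case class Exists(sub: Plan, outerConditions: Seq[Expr]) extends Expr
  final case class ScalarSubquery(sub: Plan) extends Expr
  final case class IsNotNull(child: Expr) extends Expr

  // EXISTS (sub) with no outer references becomes (SELECT 1 FROM sub LIMIT 1) IS NOT NULL.
  // Correlated EXISTS is left untouched for the join-based rewrite.
  def rewrite(e: Expr): Expr = e match {
    case Exists(sub, conds) if conds.isEmpty =>
      IsNotNull(ScalarSubquery(Limit(1, Project(Lit(1), sub))))
    case other => other
  }
}
```

The LIMIT 1 keeps the scalar subquery's result to at most one row, so the check is NULL for an empty subquery and 1 otherwise.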
Why are the changes needed?
Optimize EXISTS performance.
Does this PR introduce any user-facing change?
NO
How was this patch tested?
Manually tested.