-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-25784][SQL] Infer filters from constraints after rewriting predicate subquery #22778
Conversation
Test build #97629 has finished for PR 22778 at commit
|
Test build #97655 has finished for PR 22778 at commit
|
retest this please |
Test build #97669 has finished for PR 22778 at commit
|
Can you put the concrete example of the missing case you described in the PR description? |
Also, to make sure no performance regression in the optimizer, can you check optimizer statistics in TPCDS by running |
CollapseProject, | ||
RemoveRedundantProject) :: Nil | ||
} | ||
|
||
test("Column pruning after rewriting predicate subquery") { | ||
val relation = LocalRelation('a.int, 'b.int) | ||
val relInSubquery = LocalRelation('x.int, 'y.int, 'z.int) | ||
withSQLConf(SQLConf.CONSTRAINT_PROPAGATION_ENABLED.key -> "false") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to modify this existing test?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, spark.sql.constraintPropagation.enabled=false
to test ColumnPruning
.
spark.sql.constraintPropagation.enabled=true
to test ColumnPruning
, InferFiltersFromConstraints
and PushDownPredicate
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, I see. Thanks.
} | ||
} | ||
|
||
test("Infer filters and push down predicate after rewriting predicate subquery") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need the column pruning in the test title?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about making the test title simple, then leaving comments about what's tested clearly here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about refactor these test to:
val relation = LocalRelation('a.int, 'b.int)
val relInSubquery = LocalRelation('x.int, 'y.int, 'z.int)
test("Column pruning") {
withSQLConf(SQLConf.CONSTRAINT_PROPAGATION_ENABLED.key -> "false") {
val query = relation.where('a.in(ListQuery(relInSubquery.select('x)))).select('a)
val optimized = Optimize.execute(query.analyze)
val correctAnswer = relation
.select('a)
.join(relInSubquery.select('x), LeftSemi, Some('a === 'x))
.analyze
comparePlans(optimized, correctAnswer)
}
}
test("Column pruning, infer filters and push down predicate") {
withSQLConf(SQLConf.CONSTRAINT_PROPAGATION_ENABLED.key -> "true") {
val query = relation.where('a.in(ListQuery(relInSubquery.select('x)))).select('a)
val optimized = Optimize.execute(query.analyze)
val correctAnswer = relation
.where(IsNotNull('a)).select('a)
.join(relInSubquery.where(IsNotNull('x)).select('x), LeftSemi, Some('a === 'x))
.analyze
comparePlans(optimized, correctAnswer)
}
}
RewritePredicateSubquery, | ||
ColumnPruning, | ||
InferFiltersFromConstraints, | ||
PushDownPredicate, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks good, cc @gatorsmile @maryannxue
@maropu This is optimizer statistics before and after this patch.
|
We also need to add the |
@@ -171,10 +171,13 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) | |||
// "Extract PythonUDF From JoinCondition". | |||
Batch("Check Cartesian Products", Once, | |||
CheckCartesianProducts) :+ | |||
Batch("RewriteSubquery", Once, | |||
Batch("Rewrite Subquery", Once, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do not have a good answer for this PR. Ideally, we should run the whole batch operatorOptimizationBatch
. However, running the whole batch could be very time consuming. I would suggest to add a new parameter for introducing the time bound limit for each batch.
cc @maryannxue WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gatorsmile Do you think its a good time to revisit Natt's PR to convert subquery expressions to Joins early in the optimization process ? Perhaps then we can take advantage of all the subsequent rules firing after the subquery rewrite ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gatorsmile I think @dilipbiswal's suggestion is the right way to go. If you think of this subquery rewriting as another kind of de-correlation, it should be a pre-optimization rule.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure. That sounds also good to me. @dilipbiswal Could you take the PR #17520 over?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gatorsmile Sure Sean.. Let me give it a try.
Test build #98246 has finished for PR 22778 at commit
|
Let us hold this PR and try to fix #17520 instead. |
Test build #98294 has finished for PR 22778 at commit
|
# Conflicts: # sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala # sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RewriteSubquerySuite.scala
Test build #127986 has finished for PR 22778 at commit
|
What changes were proposed in this pull request?
SPARK-22662 fixed failed to prune columns after rewriting predicate subquery. but infer filters is still missing, this pr fix it.
Before this patch:
After this patch:
How was this patch tested?
unit tests and benchmark tests
1. Benchmark test 1
2. Benchmark test 2
This change affect TPC-DS: q4, q5, q8, q10, q11, q14a, q14b, q16, q23a, q23b, q33, q35, q56, q58, q60, q69, q70, q74, q83, q93, q94, q95, q5a, q10a, q14, q35, q35a, q70a, q74. and this is benchmark result(note that: you can click SQL name to compare the optimized plan before and after):