[SPARK-29768][SQL][FOLLOW-UP] Improve handling non-deterministic filter of ScanOperation #27073
Conversation
cc @cloud-fan
// Follow CombineFilters and only keep going if the collected Filters
// are all deterministic and this filter doesn't have common non-deterministic
// Follow CombineFilters and only keep going if 1) the collected Filters
// and this filter are all deterministic or 2) if this filter is non-deterministic,
nit: 2) if this filter is the first collected filter
// expressions with lower Project.
if (filters.forall(_.deterministic) &&
    !hasCommonNonDeterministic(Seq(condition), aliases)) {
if (filters.nonEmpty && filters.forall(_.deterministic)) {
I think this should be filters.nonEmpty && filters.forall(_.deterministic) && condition.deterministic && !hasCommonNonDeterministic(...)
condition is checked below.
how about
val canMergeFilters = (filters.nonEmpty && filters.forall(_.deterministic) &&
condition.deterministic) || filters.isEmpty
if (canMergeFilters && !hasCommonNonDeterministic(...)) {
...
}
looks good!
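For readability, here is a standalone sketch of the guard agreed on above (a hypothetical helper name and signature for illustration, not the code that was committed): a condition may be merged into the collected filters only when nothing has been collected yet, or when everything collected so far and the condition itself are deterministic.

```scala
import org.apache.spark.sql.catalyst.expressions.Expression

// Hypothetical helper mirroring the suggested guard: a new condition can be merged
// into the already-collected filters only if nothing has been collected yet, or if
// every collected filter and the new condition are deterministic.
def canMergeFilters(collected: Seq[Expression], condition: Expression): Boolean =
  (collected.nonEmpty && collected.forall(_.deterministic) && condition.deterministic) ||
    collected.isEmpty
```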
@@ -148,6 +148,7 @@ object FileSourceStrategy extends Strategy with Logging {
    val filterSet = ExpressionSet(filters)

    val normalizedFilters = DataSourceStrategy.normalizeExprs(filters, l.output)
      .filter(_.deterministic)
nit: DataSourceStrategy.normalizeExprs(filters.filter(_.deterministic), l.output)
@@ -17,8 +17,6 @@

package org.apache.spark.sql.catalyst.planning

import scala.collection.mutable
This is an unused import.
Test build #116018 has finished for PR 27073 at commit
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
val substitutedCondition = substitute(aliases)(condition)
// Follow CombineFilters and only keep going if 1) the collected Filters
// and this filter are all deterministic or 2) if this filter is the first
// collected filter which is non-deterministic and doesn't have common
... which is non-deterministic ...
this is not corrected. The first collected filter can be non-deterministic
You mean it can also be deterministic? Is it a typo?
ah sorry I misread it. The only thing that matters is: it should be the first collected filter. It doesn't have to be deterministic or non-deterministic.
I see.
can we update the comment?
Test build #116037 has finished for PR 27073 at commit
@@ -35,7 +33,9 @@ trait OperationHelper {
  })

  protected def substitute(aliases: AttributeMap[Expression])(expr: Expression): Expression = {
    expr.transform {
      // use transformUp instead of transformDown to avoid dead loop
      // in case of there's Alias which recursively alias itself.
there's Alias whose exprId is the same as its child attribute
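For illustration, a minimal self-contained sketch of why a bottom-up rewrite is needed here (the attribute, the alias map, and the simplified substitute rule below are made up for this example and are deliberately simpler than the code in the patch): when the substitution maps an attribute to an expression that contains that same attribute, a top-down rewrite keeps descending into the expression it just produced and never terminates, while a bottom-up rewrite applies the rule to each original node exactly once.

```scala
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.IntegerType

// An attribute a#1 and a substitution that maps a#1 to an expression containing a#1
// again: a self-referencing rewrite, loosely analogous to the self-aliasing case above.
val a = AttributeReference("a", IntegerType)(exprId = ExprId(1))
val aliases = AttributeMap(Seq[(Attribute, Expression)](a -> Add(a, Literal(1))))

// transformUp rewrites children before parents and does not revisit the node a rule
// just produced, so a#1 becomes (a#1 + 1) exactly once. transformDown would descend
// into the freshly produced (a#1 + 1), find a#1 again, substitute again, and loop.
def substitute(aliases: AttributeMap[Expression])(expr: Expression): Expression =
  expr.transformUp {
    case ref: AttributeReference => aliases.getOrElse(ref, ref)
  }

// a#1 > 0  becomes  (a#1 + 1) > 0
val rewritten = substitute(aliases)(GreaterThan(a, Literal(0)))
```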
Test build #116079 has finished for PR 27073 at commit
Test build #116081 has finished for PR 27073 at commit
thanks, merging to master!
What changes were proposed in this pull request?
For ScanOperation, if it collects more than one filter, then all of the collected filters must be deterministic; a filter may be non-deterministic only if it is the single collected filter. FileSourceStrategy should filter out non-deterministic filters, since evaluating a partition-related non-deterministic filter there fails because the expression has not been initialized.
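On the FileSourceStrategy side, the shape of the fix is simply to drop non-deterministic predicates before they are normalized and considered for partition pruning or pushdown. A minimal sketch with an assumed helper name, not the exact committed code:

```scala
import org.apache.spark.sql.catalyst.expressions.Expression

// Only deterministic predicates should be considered for partition pruning or
// pushdown; a non-deterministic expression cannot be evaluated at planning time
// because it has not been initialized yet.
def candidateFilters(filters: Seq[Expression]): Seq[Expression] =
  filters.filter(_.deterministic)
```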
Why are the changes needed?
To strictly follow CombineFilters's behavior, which does not allow combining two filters when non-deterministic predicates are involved, and to avoid hitting the exception for the file source.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Existing tests.