-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-33302][SQL] Push down filters through Expand #30278
Changes from 1 commit
c3ec848
803bf0a
77d6e45
a09f836
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.plans._ | |
import org.apache.spark.sql.catalyst.plans.logical._ | ||
import org.apache.spark.sql.catalyst.rules._ | ||
import org.apache.spark.sql.internal.SQLConf | ||
import org.apache.spark.sql.types.{BooleanType, IntegerType, TimestampType} | ||
import org.apache.spark.sql.types.{BooleanType, IntegerType, StringType, TimestampType} | ||
import org.apache.spark.unsafe.types.CalendarInterval | ||
|
||
class FilterPushdownSuite extends PlanTest { | ||
|
@@ -1208,6 +1208,36 @@ class FilterPushdownSuite extends PlanTest { | |
checkAnalysis = false) | ||
} | ||
|
||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: unnecessary blank. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Done |
||
test("push down predicate through expand") { | ||
val input = LocalRelation('a.int, 'b.string, 'c.double) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: could you use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Done |
||
val query = | ||
Aggregate( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why does this test need an Aggregate? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Not necessary, remove it. |
||
Seq('a, 'b), | ||
Seq(sum('c).as("sum")), | ||
Filter('a > 1, | ||
Expand( | ||
Seq( | ||
Seq('a, 'b, 'c, Literal.create(null, StringType), 1), | ||
Seq('a, 'b, 'c, 'a, 2)), | ||
Seq('a, 'b, 'c), | ||
input))).analyze | ||
val optimized = Optimize.execute(query) | ||
|
||
val expected = | ||
Aggregate( | ||
Seq('a, 'b), | ||
Seq(sum('c).as("sum")), | ||
Expand( | ||
Seq( | ||
Seq('a, 'b, 'c, Literal.create(null, StringType), 1), | ||
Seq('a, 'b, 'c, 'a, 2)), | ||
Seq('a, 'b, 'c), | ||
Filter('a > 1, input))).analyze | ||
|
||
comparePlans(optimized, expected) | ||
} | ||
|
||
test("SPARK-28345: PythonUDF predicate should be able to pushdown to join") { | ||
val pythonUDFJoinCond = { | ||
val pythonUDF = PythonUDF("pythonUDF", null, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change affects the
PushDownLeftSemiAntiJoin
rule, too. So, could you add tests for the case?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Double check the case, seems current master fix this case by some pr, but 3.0 is still as jira desc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you mean by "fix this case"?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have found the pr #29673
Before this pr, SQL
Optimized plan is
After that pr, Optimized plan is
Filter((CASE WHEN (pid#16 = 3) THEN iOS WHEN (pid#16 = 4) THEN Android ELSE Other END = iOS))
is pushed down and won't generateExpand
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how does it related to left semi join?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With test case in
LeftSemiPushdownSuite
originalQuery is
Test result is
Expand will be promoted below Join, so should we ignore this case or add a parameter in
canPushThrough
like belowThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm sorry I didn't get it. What's the issue here? We can't pushdown left-semi join through expand?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The optimized (left-side) plan above looks correct to me...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh..my mistake, I misunderstood some code about
PushDownLeftSemiAntiJoin
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My fault, I misunderstand some code about
PushDownLeftSemiAntiJoin
, test case added ==