From c3ec8488d945712d6e2a19987e205f5aee5146de Mon Sep 17 00:00:00 2001
From: angerszhu
Date: Fri, 6 Nov 2020 17:29:30 +0800
Subject: [PATCH 1/4] [SPARK-33302][SQL] Failed to push down filters through Expand

---
 .../sql/catalyst/optimizer/Optimizer.scala    |  1 +
 .../optimizer/FilterPushdownSuite.scala       | 32 ++++++++++++++++++-
 2 files changed, 32 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 9519a56c2817a..51f7799b1e427 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1269,6 +1269,7 @@ object PushPredicateThroughNonJoin extends Rule[LogicalPlan] with PredicateHelpe
     case _: Sort => true
     case _: BatchEvalPython => true
     case _: ArrowEvalPython => true
+    case _: Expand => true
     case _ => false
   }
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 156313300eef9..5e151c7b8ffdb 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{BooleanType, IntegerType, TimestampType}
+import org.apache.spark.sql.types.{BooleanType, IntegerType, StringType, TimestampType}
 import org.apache.spark.unsafe.types.CalendarInterval
 
 class FilterPushdownSuite extends PlanTest {
@@ -1208,6 +1208,36 @@ class FilterPushdownSuite extends PlanTest {
       checkAnalysis = false)
   }
 
+
+  test("push down predicate through expand") {
+    val input = LocalRelation('a.int, 'b.string, 'c.double)
+    val query =
+      Aggregate(
+        Seq('a, 'b),
+        Seq(sum('c).as("sum")),
+        Filter('a > 1,
+          Expand(
+            Seq(
+              Seq('a, 'b, 'c, Literal.create(null, StringType), 1),
+              Seq('a, 'b, 'c, 'a, 2)),
+            Seq('a, 'b, 'c),
+            input))).analyze
+    val optimized = Optimize.execute(query)
+
+    val expected =
+      Aggregate(
+        Seq('a, 'b),
+        Seq(sum('c).as("sum")),
+        Expand(
+          Seq(
+            Seq('a, 'b, 'c, Literal.create(null, StringType), 1),
+            Seq('a, 'b, 'c, 'a, 2)),
+          Seq('a, 'b, 'c),
+          Filter('a > 1, input))).analyze
+
+    comparePlans(optimized, expected)
+  }
+
   test("SPARK-28345: PythonUDF predicate should be able to pushdown to join") {
     val pythonUDFJoinCond = {
       val pythonUDF = PythonUDF("pythonUDF", null,

From 803bf0a6ac5f68e634fbc0980c75342ffee3f51a Mon Sep 17 00:00:00 2001
From: angerszhu
Date: Mon, 9 Nov 2020 20:56:58 +0800
Subject: [PATCH 2/4] Update FilterPushdownSuite.scala

---
 .../sql/catalyst/optimizer/FilterPushdownSuite.scala | 10 ++--------
 1 file changed, 2 insertions(+), 8 deletions(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 5e151c7b8ffdb..e96b3f69bfa69 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -1212,28 +1212,22 @@ class FilterPushdownSuite extends PlanTest {
   test("push down predicate through expand") {
     val input = LocalRelation('a.int, 'b.string, 'c.double)
     val query =
-      Aggregate(
-        Seq('a, 'b),
-        Seq(sum('c).as("sum")),
         Filter('a > 1,
           Expand(
             Seq(
               Seq('a, 'b, 'c, Literal.create(null, StringType), 1),
               Seq('a, 'b, 'c, 'a, 2)),
             Seq('a, 'b, 'c),
-            input))).analyze
+            input)).analyze
     val optimized = Optimize.execute(query)
 
     val expected =
-      Aggregate(
-        Seq('a, 'b),
-        Seq(sum('c).as("sum")),
         Expand(
           Seq(
             Seq('a, 'b, 'c, Literal.create(null, StringType), 1),
             Seq('a, 'b, 'c, 'a, 2)),
           Seq('a, 'b, 'c),
-          Filter('a > 1, input))).analyze
+          Filter('a > 1, input)).analyze
 
     comparePlans(optimized, expected)
   }

From 77d6e45ceb7b21dcc8fabf05b9b40102bd0b645b Mon Sep 17 00:00:00 2001
From: angerszhu
Date: Tue, 10 Nov 2020 00:00:17 +0800
Subject: [PATCH 3/4] Update LeftSemiAntiJoinPushDownSuite.scala

---
 .../optimizer/LeftSemiAntiJoinPushDownSuite.scala | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LeftSemiAntiJoinPushDownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LeftSemiAntiJoinPushDownSuite.scala
index a3da9f73ebd40..729a1e9f06ca5 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LeftSemiAntiJoinPushDownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LeftSemiAntiJoinPushDownSuite.scala
@@ -315,6 +315,21 @@ class LeftSemiPushdownSuite extends PlanTest {
     comparePlans(optimized, originalQuery.analyze)
   }
 
+  test("Unary: LeftSemi join push down through Expand") {
+    val expand = Expand(Seq(Seq('a, 'b, "null"), Seq('a, "null", 'c)),
+      Seq('a, 'b, 'c), testRelation)
+    val originalQuery = expand
+      .join(testRelation1, joinType = LeftSemi, condition = Some('b === 'd && 'b === 1))
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+    val correctAnswer = Expand(Seq(Seq('a, 'b, "null"), Seq('a, "null", 'c)),
+      Seq('a, 'b, 'c), testRelation
+      .join(testRelation1, joinType = LeftSemi, condition = Some('b === 'd && 'b === 1)))
+      .analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
   Seq(Some('d === 'e), None).foreach { case innerJoinCond =>
     Seq(LeftSemi, LeftAnti).foreach { case outerJT =>
       Seq(Inner, LeftOuter, Cross, RightOuter).foreach { case innerJT =>

From a09f8364cf4b9853cdf1ff4dc00d761b3a5b6291 Mon Sep 17 00:00:00 2001
From: angerszhu
Date: Tue, 10 Nov 2020 07:52:48 +0800
Subject: [PATCH 4/4] Update FilterPushdownSuite.scala

---
 .../spark/sql/catalyst/optimizer/FilterPushdownSuite.scala | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index e96b3f69bfa69..11ec037c94f73 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -1208,9 +1208,7 @@ class FilterPushdownSuite extends PlanTest {
       checkAnalysis = false)
   }
 
-
   test("push down predicate through expand") {
-    val input = LocalRelation('a.int, 'b.string, 'c.double)
     val query =
         Filter('a > 1,
           Expand(
             Seq(
               Seq('a, 'b, 'c, Literal.create(null, StringType), 1),
               Seq('a, 'b, 'c, 'a, 2)),
             Seq('a, 'b, 'c),
-            input)).analyze
+            testRelation)).analyze
     val optimized = Optimize.execute(query)
 
     val expected =
         Expand(
           Seq(
             Seq('a, 'b, 'c, Literal.create(null, StringType), 1),
             Seq('a, 'b, 'c, 'a, 2)),
           Seq('a, 'b, 'c),
-          Filter('a > 1, input)).analyze
+          Filter('a > 1, testRelation)).analyze
 
     comparePlans(optimized, expected)
   }
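
Note (not part of the patch series): a minimal standalone sketch of the rewrite
that the new `case _: Expand => true` branch in PushPredicateThroughNonJoin
enables. It reuses the plan shapes from the final test above, built with the
real Catalyst DSL; the object name is hypothetical, and running it assumes a
spark-catalyst build containing this patch on the classpath.

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughNonJoin
import org.apache.spark.sql.catalyst.plans.logical.{Expand, Filter, LocalRelation}
import org.apache.spark.sql.types.StringType

// Hypothetical driver object, for illustration only.
object ExpandPushdownSketch extends App {
  val testRelation = LocalRelation('a.int, 'b.string, 'c.double)

  // A Filter sitting directly on top of an Expand, as in the test above.
  val query = Filter('a > 1,
    Expand(
      Seq(
        Seq('a, 'b, 'c, Literal.create(null, StringType), 1),
        Seq('a, 'b, 'c, 'a, 2)),
      Seq('a, 'b, 'c),
      testRelation)).analyze

  // With this patch, the rule moves the Filter below the Expand, so each
  // input row is filtered once instead of once per output projection;
  // without it, canPushThrough returns false for Expand and the plan is
  // left unchanged.
  println(PushPredicateThroughNonJoin(query))
}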