From 34191e66360b987d5415397bd0b8a0c4c1c3dfef Mon Sep 17 00:00:00 2001
From: Marco Gaido
Date: Tue, 14 Aug 2018 10:25:29 -0700
Subject: [PATCH] [SPARK-25051][SQL] FixNullability should not stop on AnalysisBarrier

## What changes were proposed in this pull request?

The introduction of `AnalysisBarrier` prevented `FixNullability` from going through all the nodes in the plan. This introduced a bug which can lead to wrong results, because the nullability of the output attributes of an outer join can be wrong.

The PR makes `FixNullability` go through `AnalysisBarrier`s.

## How was this patch tested?

added UT

Author: Marco Gaido

Closes #22102 from mgaido91/SPARK-25051.
---
 .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 1 +
 .../test/scala/org/apache/spark/sql/DataFrameSuite.scala  | 6 ++++++
 2 files changed, 7 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 5963c1467e254..531f3d468185d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1704,6 +1704,7 @@ class Analyzer(
 
     def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
       case p if !p.resolved => p // Skip unresolved nodes.
+      case ab: AnalysisBarrier => apply(ab.child)
       case p: LogicalPlan if p.resolved =>
         val childrenOutput = p.children.flatMap(c => c.output).groupBy(_.exprId).flatMap {
           case (exprId, attributes) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 9f8d337ead09d..3640f6a4d0c62 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -2300,4 +2300,10 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
       checkAnswer(aggPlusFilter1, aggPlusFilter2.collect())
     }
   }
+
+  test("SPARK-25051: fix nullabilities of outer join attributes doesn't stop on AnalysisBarrier") {
+    val df1 = spark.range(4).selectExpr("id", "cast(id as string) as name")
+    val df2 = spark.range(3).selectExpr("id")
+    assert(df1.join(df2, Seq("id"), "left_outer").where(df2("id").isNull).collect().length == 1)
+  }
 }
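
For reviewers who want to reproduce the issue outside the test suite, below is a minimal sketch of the affected query shape. It assumes a running `SparkSession` named `spark` (as in spark-shell) and simply mirrors the unit test added in `DataFrameSuite` above; it is illustrative only and not part of the patch.

```scala
// Minimal reproduction sketch for SPARK-25051 (assumes a SparkSession named `spark`,
// e.g. in spark-shell). Mirrors the unit test added in DataFrameSuite above.
val df1 = spark.range(4).selectExpr("id", "cast(id as string) as name") // ids 0..3
val df2 = spark.range(3).selectExpr("id")                               // ids 0..2

// After a left outer join, df2's `id` must become nullable (id = 3 has no match on the right).
// Before this fix, FixNullability stopped at the AnalysisBarrier wrapping the child plans,
// so the attribute could keep nullable = false and the isNull filter could be simplified
// away, producing 0 rows instead of the expected single row.
val joined = df1.join(df2, Seq("id"), "left_outer")
joined.where(df2("id").isNull).show() // expected: one row, for id = 3
```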