From 04975a68b583a6175f93da52374108e5d4754d9a Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Sun, 24 Sep 2017 00:05:17 +0900 Subject: [PATCH] [SPARK-22109][SQL] Resolves type conflicts between strings and timestamps in partition column ## What changes were proposed in this pull request? This PR proposes to resolve the type conflicts in strings and timestamps in partition column values. It looks we need to set the timezone as it needs a cast between strings and timestamps. ```scala val df = Seq((1, "2015-01-01 00:00:00"), (2, "2014-01-01 00:00:00"), (3, "blah")).toDF("i", "str") val path = "/tmp/test.parquet" df.write.format("parquet").partitionBy("str").save(path) spark.read.parquet(path).show() ``` **Before** ``` java.util.NoSuchElementException: None.get at scala.None$.get(Option.scala:347) at scala.None$.get(Option.scala:345) at org.apache.spark.sql.catalyst.expressions.TimeZoneAwareExpression$class.timeZone(datetimeExpressions.scala:46) at org.apache.spark.sql.catalyst.expressions.Cast.timeZone$lzycompute(Cast.scala:172) at org.apache.spark.sql.catalyst.expressions.Cast.timeZone(Cast.scala:172) at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToString$3$$anonfun$apply$16.apply(Cast.scala:208) at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToString$3$$anonfun$apply$16.apply(Cast.scala:208) at org.apache.spark.sql.catalyst.expressions.Cast.org$apache$spark$sql$catalyst$expressions$Cast$$buildCast(Cast.scala:201) at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToString$3.apply(Cast.scala:207) at org.apache.spark.sql.catalyst.expressions.Cast.nullSafeEval(Cast.scala:533) at org.apache.spark.sql.catalyst.expressions.UnaryExpression.eval(Expression.scala:331) at org.apache.spark.sql.execution.datasources.PartitioningUtils$$anonfun$org$apache$spark$sql$execution$datasources$PartitioningUtils$$resolveTypeConflicts$1.apply(PartitioningUtils.scala:481) at org.apache.spark.sql.execution.datasources.PartitioningUtils$$anonfun$org$apache$spark$sql$execution$datasources$PartitioningUtils$$resolveTypeConflicts$1.apply(PartitioningUtils.scala:480) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) ``` **After** ``` +---+-------------------+ | i| str| +---+-------------------+ | 2|2014-01-01 00:00:00| | 1|2015-01-01 00:00:00| | 3| blah| +---+-------------------+ ``` ## How was this patch tested? Unit tests added in `ParquetPartitionDiscoverySuite` and manual tests. Author: hyukjinkwon Closes #19331 from HyukjinKwon/SPARK-22109. --- .../execution/datasources/PartitioningUtils.scala | 11 ++++++----- .../parquet/ParquetPartitionDiscoverySuite.scala | 12 ++++++++++++ 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index 92358da6d6c67..1c00c9ebb4144 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -139,7 +139,7 @@ object PartitioningUtils { "root directory of the table. If there are multiple root directories, " + "please load them separately and then union them.") - val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues) + val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues, timeZone) // Creates the StructType which represents the partition columns. val fields = { @@ -318,7 +318,8 @@ object PartitioningUtils { * }}} */ def resolvePartitions( - pathsWithPartitionValues: Seq[(Path, PartitionValues)]): Seq[PartitionValues] = { + pathsWithPartitionValues: Seq[(Path, PartitionValues)], + timeZone: TimeZone): Seq[PartitionValues] = { if (pathsWithPartitionValues.isEmpty) { Seq.empty } else { @@ -333,7 +334,7 @@ object PartitioningUtils { val values = pathsWithPartitionValues.map(_._2) val columnCount = values.head.columnNames.size val resolvedValues = (0 until columnCount).map { i => - resolveTypeConflicts(values.map(_.literals(i))) + resolveTypeConflicts(values.map(_.literals(i)), timeZone) } // Fills resolved literals back to each partition @@ -470,7 +471,7 @@ object PartitioningUtils { * Given a collection of [[Literal]]s, resolves possible type conflicts by up-casting "lower" * types. */ - private def resolveTypeConflicts(literals: Seq[Literal]): Seq[Literal] = { + private def resolveTypeConflicts(literals: Seq[Literal], timeZone: TimeZone): Seq[Literal] = { val desiredType = { val topType = literals.map(_.dataType).maxBy(upCastingOrder.indexOf(_)) // Falls back to string if all values of this column are null or empty string @@ -478,7 +479,7 @@ object PartitioningUtils { } literals.map { case l @ Literal(_, dataType) => - Literal.create(Cast(l, desiredType).eval(), desiredType) + Literal.create(Cast(l, desiredType, Some(timeZone.getID)).eval(), desiredType) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala index 837a0872d7b71..f79b92b804c70 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala @@ -1055,4 +1055,16 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha } } } + + test("SPARK-22109: Resolve type conflicts between strings and timestamps in partition column") { + val df = Seq( + (1, "2015-01-01 00:00:00"), + (2, "2014-01-01 00:00:00"), + (3, "blah")).toDF("i", "str") + + withTempPath { path => + df.write.format("parquet").partitionBy("str").save(path.getAbsolutePath) + checkAnswer(spark.read.load(path.getAbsolutePath), df) + } + } }