From e31657a1b65ab73d52651c4e0b018d457e44d47f Mon Sep 17 00:00:00 2001
From: Imran Rashid
Date: Tue, 25 Apr 2017 11:46:39 -0500
Subject: [PATCH] review feedback

---
 .../parquet/ParquetRowConverter.scala         |   8 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala |   6 +-
 .../hive/ParquetHiveCompatibilitySuite.scala  | 147 +++++++++++-------
 3 files changed, 92 insertions(+), 69 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 2f9a7329c6029..d52ff62d93b26 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -267,8 +267,8 @@ private[parquet] class ParquetRowConverter(
       case TimestampType =>
         // TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that.
         // If the table has a timezone property, apply the correct conversions.  See SPARK-12297.
-        val sessionTsString = hadoopConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)
-        val sessionTz = Option(sessionTsString).map(TimeZone.getTimeZone(_))
+        val sessionTzString = hadoopConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)
+        val sessionTz = Option(sessionTzString).map(TimeZone.getTimeZone(_))
           .getOrElse(DateTimeUtils.defaultTimeZone())
         val storageTzString = hadoopConf.get(ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY)
         val storageTz = Option(storageTzString).map(TimeZone.getTimeZone(_)).getOrElse(sessionTz)
@@ -685,13 +685,13 @@ private[parquet] object ParquetRowConverter {
    * actually store it as micros since epoch, which is why we have to apply a conversion when
    * timezones change.
    *
-   * @param binary
+   * @param binary a parquet Binary which holds one int96
    * @param sessionTz the session timezone.  This will be used to determine how to display the time,
    *                  and compute functions on the timestamp which involve a timezone, e.g. extract
    *                  the hour.
    * @param storageTz the timezone which was used to store the timestamp.  This should come from the
    *                  timestamp table property, or else assume it's the same as the sessionTz
-   * @return
+   * @return a timestamp (micros since epoch) which will render correctly in the sessionTz
    */
   def binaryToSQLTimestamp(
       binary: Binary,
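The javadoc above frames the conversion as shifting the decoded micros-since-epoch value by the difference between the storage and session zone offsets, so that the wall-clock rendering stays fixed. A minimal standalone sketch of that idea follows; the helper name, sign convention, and sample values are illustrative only and are not the patch's actual code (which lives in binaryToSQLTimestamp and also handles decoding the int96 itself):

    import java.util.TimeZone

    // Shift a micros-since-epoch value so a timestamp written relative to `storageTz`
    // keeps the same wall-clock reading when it is rendered in `sessionTz`.
    def shiftTimezone(micros: Long, storageTz: TimeZone, sessionTz: TimeZone): Long = {
      val millis = micros / 1000L
      // offset difference at that instant, in millis
      val offsetDiff = storageTz.getOffset(millis) - sessionTz.getOffset(millis)
      micros + offsetDiff * 1000L
    }

    val utc = TimeZone.getTimeZone("UTC")
    val la = TimeZone.getTimeZone("America/Los_Angeles")
    // 2016-01-01 00:00:00 UTC in micros; the shifted value renders with the same wall clock
    // ("2016-01-01 00:00:00") when the session timezone is America/Los_Angeles.
    val shifted = shiftTimezone(1451606400L * 1000000L, storageTz = utc, sessionTz = la)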
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 490bce11c6a8f..99171195453bd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -228,11 +228,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     // We add the table timezone to the relation options, which automatically gets injected into the
     // hadoopConf for the Parquet Converters
     val storageTzKey = ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY
-    val storageTz = relation.tableMeta.properties.getOrElse(storageTzKey, "")
-    val sessionTz = sparkSession.sessionState.conf.sessionLocalTimeZone
-    Map(
-      storageTzKey -> storageTz
-    )
+    relation.tableMeta.properties.get(storageTzKey).map(storageTzKey -> _).toMap
   }
 
   private def inferIfNeeded(
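The replacement line above uses Option's map followed by toMap so the property is forwarded only when it is actually set on the table, instead of always materializing an empty-string value. A standalone illustration of that idiom, with a stand-in key and properties map (the real key is the ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY constant):

    // stand-in values, not the real constant or real table metadata
    val storageTzKey = "example.parquet.timezone"
    val tableProperties = Map("some.other.prop" -> "x", storageTzKey -> "UTC")

    // present  -> Map(storageTzKey -> "UTC"); absent -> Map.empty
    val options: Map[String, String] =
      tableProperties.get(storageTzKey).map(storageTzKey -> _).toMap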
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
index 1d0309db42cab..ef5dff31c84a6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.sql.hive
 
+import java.io.File
 import java.sql.Timestamp
 import java.util.TimeZone
 
@@ -52,15 +53,6 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
       """.stripMargin)
   }
 
-  override def afterEach(): Unit = {
-    try {
-      // drop all databases, tables and functions after each test
-      spark.sessionState.catalog.reset()
-    } finally {
-      super.afterEach()
-    }
-  }
-
   private def testParquetHiveCompatibility(row: Row, hiveTypes: String*): Unit = {
     withTable("parquet_compat") {
       withTempPath { dir =>
@@ -180,11 +172,7 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
     testCreateWriteRead(sparkSession, "local", Some(localTz), sessionTzOpt)
     // check with a variety of timezones.  The unit tests currently are configured to always use
     // America/Los_Angeles, but even if they didn't, we'd be sure to cover a non-local timezone.
-    Seq(
-      "UTC" -> "UTC",
-      "LA" -> "America/Los_Angeles",
-      "Berlin" -> "Europe/Berlin"
-    ).foreach { case (tableName, zone) =>
+    testTimezones.foreach { case (tableName, zone) =>
       if (zone != localTz) {
         testCreateWriteRead(sparkSession, tableName, Some(zone), sessionTzOpt)
       }
@@ -201,7 +189,7 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
     testReadTablesWithTimezone(sparkSession, baseTable, explicitTz, sessionTzOpt)
   }
 
-  private def checkHasTz(table: String, tz: Option[String]): Unit = {
+  private def checkHasTz(spark: SparkSession, table: String, tz: Option[String]): Unit = {
     val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(table))
     assert(tableMetadata.properties.get(ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY) === tz)
   }
@@ -214,9 +202,7 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
     test(s"SPARK-12297: Create and Alter Parquet tables and timezones; explicitTz = $explicitTz; " +
       s"sessionTzOpt = $sessionTzOpt") {
       val key = ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY
-      withTable(baseTable, s"like_$baseTable", s"select_$baseTable") {
-        val localTz = TimeZone.getDefault()
-        val localTzId = localTz.getID()
+      withTable(baseTable, s"like_$baseTable", s"select_$baseTable", s"partitioned_$baseTable") {
         // If we ever add a property to set the table timezone by default, defaultTz would change
         val defaultTz = None
         // check that created tables have correct TBLPROPERTIES
@@ -231,28 +217,37 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
             | $tblProperties
           """.stripMargin)
         val expectedTableTz = explicitTz.orElse(defaultTz)
-        checkHasTz(baseTable, expectedTableTz)
+        checkHasTz(spark, baseTable, expectedTableTz)
+        spark.sql(
+          raw"""CREATE TABLE partitioned_$baseTable (
+            | x int
+            | )
+            | PARTITIONED BY (y int)
+            | STORED AS PARQUET
+            | $tblProperties
+          """.stripMargin)
+        checkHasTz(spark, s"partitioned_$baseTable", expectedTableTz)
         spark.sql(s"CREATE TABLE like_$baseTable LIKE $baseTable")
-        checkHasTz(s"like_$baseTable", expectedTableTz)
+        checkHasTz(spark, s"like_$baseTable", expectedTableTz)
         spark.sql(
           raw"""CREATE TABLE select_$baseTable
             | STORED AS PARQUET
            | AS
            | SELECT * from $baseTable
           """.stripMargin)
-        checkHasTz(s"select_$baseTable", defaultTz)
+        checkHasTz(spark, s"select_$baseTable", defaultTz)
         // check alter table, setting, unsetting, resetting the property
         spark.sql(
           raw"""ALTER TABLE $baseTable SET TBLPROPERTIES ($key="America/Los_Angeles")""")
-        checkHasTz(baseTable, Some("America/Los_Angeles"))
-        spark.sql( raw"""ALTER TABLE $baseTable SET TBLPROPERTIES ($key="UTC")""")
-        checkHasTz(baseTable, Some("UTC"))
-        spark.sql( raw"""ALTER TABLE $baseTable UNSET TBLPROPERTIES ($key)""")
-        checkHasTz(baseTable, None)
+        checkHasTz(spark, baseTable, Some("America/Los_Angeles"))
+        spark.sql(raw"""ALTER TABLE $baseTable SET TBLPROPERTIES ($key="UTC")""")
+        checkHasTz(spark, baseTable, Some("UTC"))
+        spark.sql(raw"""ALTER TABLE $baseTable UNSET TBLPROPERTIES ($key)""")
+        checkHasTz(spark, baseTable, None)
         explicitTz.foreach { tz =>
-          spark.sql( raw"""ALTER TABLE $baseTable SET TBLPROPERTIES ($key="$tz")""")
-          checkHasTz(baseTable, expectedTableTz)
+          spark.sql(raw"""ALTER TABLE $baseTable SET TBLPROPERTIES ($key="$tz")""")
+          checkHasTz(spark, baseTable, expectedTableTz)
         }
       }
     }
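The hunk above checks that the timezone table property survives CREATE, CREATE ... LIKE, CTAS, and ALTER TABLE. For readers unfamiliar with the mechanics, a tiny standalone sketch of driving the same kind of property through SQL; the key string below is a placeholder, the suite itself uses the ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY constant:

    // placeholder key; requires a SparkSession `spark` with Hive support
    val key = "example.parquet.timezone"
    spark.sql(s"""CREATE TABLE tz_example (x INT) STORED AS PARQUET TBLPROPERTIES ($key = "UTC")""")
    spark.sql(s"SHOW TBLPROPERTIES tz_example").show()  // the property round-trips via the metastore
    spark.sql(s"ALTER TABLE tz_example UNSET TBLPROPERTIES ($key)")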
@@ -269,31 +264,29 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
   // we manually adjust the timezone just to determine what the desired millis (since epoch, in
   // UTC) is for various "wall-clock" times in different timezones, and then we can compare
   // against those in our tests.
-  val originalTz = TimeZone.getDefault
-  val timestampTimezoneToMillis = try {
-    (for {
-      timestampString <- desiredTimestampStrings
-      timezone <- Seq("America/Los_Angeles", "Europe/Berlin", "UTC").map {
-        TimeZone.getTimeZone(_)
-      }
-    } yield {
-      TimeZone.setDefault(timezone)
-      val timestamp = Timestamp.valueOf(timestampString)
-      (timestampString, timezone.getID()) -> timestamp.getTime()
-    }).toMap
-  } finally {
-    TimeZone.setDefault(originalTz)
+  val timestampTimezoneToMillis = {
+    val originalTz = TimeZone.getDefault
+    try {
+      (for {
+        timestampString <- desiredTimestampStrings
+        timezone <- Seq("America/Los_Angeles", "Europe/Berlin", "UTC").map {
+          TimeZone.getTimeZone(_)
+        }
+      } yield {
+        TimeZone.setDefault(timezone)
+        val timestamp = Timestamp.valueOf(timestampString)
+        (timestampString, timezone.getID()) -> timestamp.getTime()
+      }).toMap
+    } finally {
+      TimeZone.setDefault(originalTz)
+    }
   }
 
   private def createRawData(spark: SparkSession): Dataset[(String, Timestamp)] = {
-    val rowRdd = spark.sparkContext.parallelize(desiredTimestampStrings, 1).map(Row(_))
-    val schema = StructType(Seq(
-      StructField("display", StringType, true)
-    ))
-    val df = spark.createDataFrame(rowRdd, schema)
+    import spark.implicits._
+    val df = desiredTimestampStrings.toDF("display")
     // this will get the millis corresponding to the display time given the current *session*
     // timezone.
-    import spark.implicits._
     df.withColumn("ts", expr("cast(display as timestamp)")).as[(String, Timestamp)]
   }
@@ -319,7 +312,7 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
         // We write data into our tables, and then check the raw parquet files to see whether
         // the correct conversion was applied.
         rawData.write.saveAsTable(s"saveAsTable_$baseTable")
-        checkHasTz(s"saveAsTable_$baseTable", None)
+        checkHasTz(spark, s"saveAsTable_$baseTable", None)
         spark.sql(
           raw"""CREATE TABLE insert_$baseTable (
             | display string,
@@ -328,7 +321,7 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
             | STORED AS PARQUET
             | $tblProperties
           """.stripMargin)
-        checkHasTz(s"insert_$baseTable", explicitTz)
+        checkHasTz(spark, s"insert_$baseTable", explicitTz)
         rawData.write.insertInto(s"insert_$baseTable")
         // no matter what, round-tripping via the table should leave the data unchanged
         val readFromTable = spark.table(s"insert_$baseTable").collect()
@@ -368,7 +361,7 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
     val key = ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY
     test(s"SPARK-12297: Read from Parquet tables with Timestamps; explicitTz = $explicitTz; " +
       s"sessionTzOpt = $sessionTzOpt") {
-      withTable(s"external_$baseTable") {
+      withTable(s"external_$baseTable", s"partitioned_$baseTable") {
         // we intentionally save this data directly, without creating a table, so we can
         // see that the data is read back differently depending on table properties.
         // we'll save with adjusted millis, so that it should be the correct millis after reading
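The timestampTimezoneToMillis block above derives the expected epoch millis for each wall-clock string by temporarily swapping the JVM default timezone, because java.sql.Timestamp.valueOf parses in the default zone. A compressed standalone sketch of that technique (the strings and zone list here are illustrative):

    import java.sql.Timestamp
    import java.util.TimeZone

    val original = TimeZone.getDefault
    val expectedMillis = try {
      Seq("UTC", "America/Los_Angeles").map { zone =>
        TimeZone.setDefault(TimeZone.getTimeZone(zone))
        // Timestamp.valueOf parses in the JVM default timezone, hence the swap above
        zone -> Timestamp.valueOf("2016-01-01 00:00:00").getTime
      }.toMap
    } finally {
      TimeZone.setDefault(original)
    }
    // expectedMillis("UTC") and expectedMillis("America/Los_Angeles") differ by the 8 hour offset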
@@ -386,9 +379,11 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
           case _ =>
             rawData
         }).withColumnRenamed("_1", "display").withColumnRenamed("_2", "ts")
-        withTempPath { path =>
-          adjustedRawData.write.parquet(path.getCanonicalPath)
-          val options = Map("path" -> path.getCanonicalPath) ++
+        withTempPath { basePath =>
+          val unpartitionedPath = new File(basePath, "flat")
+          val partitionedPath = new File(basePath, "partitioned")
+          adjustedRawData.write.parquet(unpartitionedPath.getCanonicalPath)
+          val options = Map("path" -> unpartitionedPath.getCanonicalPath) ++
             explicitTz.map { tz => Map(key -> tz) }.getOrElse(Map())
 
           spark.catalog.createTable(
@@ -397,13 +392,38 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
             schema = new StructType().add("display", StringType).add("ts", TimestampType),
             options = options
           )
-          Seq(false, true).foreach { vectorized =>
-            withClue(s"vectorized = $vectorized;") {
+
+          // also write out a partitioned table, to make sure we can access that correctly.
+          // add a column we can partition by (value doesn't particularly matter).
+          val partitionedData = adjustedRawData.withColumn("id", monotonicallyIncreasingId)
+          partitionedData.write.partitionBy("id")
+            .parquet(partitionedPath.getCanonicalPath)
+          // unfortunately, catalog.createTable() doesn't let us specify partitioning, so just use
+          // a "CREATE TABLE" statement.
+          val tblOpts = explicitTz.map { tz => raw"""TBLPROPERTIES ($key="$tz")""" }.getOrElse("")
+          spark.sql(raw"""CREATE EXTERNAL TABLE partitioned_$baseTable (
+            | display string,
+            | ts timestamp
+            |)
+            |PARTITIONED BY (id bigint)
+            |STORED AS parquet
+            |LOCATION 'file:${partitionedPath.getCanonicalPath}'
+            |$tblOpts
+            """.stripMargin)
+          spark.sql(s"msck repair table partitioned_$baseTable")
+
+          for {
+            vectorized <- Seq(false, true)
+            partitioned <- Seq(false, true)
+          } {
+            withClue(s"vectorized = $vectorized; partitioned = $partitioned") {
               spark.conf.set(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, vectorized)
               val sessionTz = sessionTzOpt.getOrElse(TimeZone.getDefault().getID())
+              val table = if (partitioned) s"partitioned_$baseTable" else s"external_$baseTable"
               val query = s"select display, cast(ts as string) as ts_as_string, ts " +
-                s"from external_$baseTable"
+                s"from $table"
               val collectedFromExternal = spark.sql(query).collect()
+              assert(collectedFromExternal.size === 4)
               collectedFromExternal.foreach { row =>
                 val displayTime = row.getAs[String](0)
                 // the timestamp should still display the same, despite the changes in timezones
@@ -433,8 +453,15 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
               val hadoopConf = sparkContext.hadoopConfiguration
               val fs = FileSystem.get(hadoopConf)
-              val parts = fs.listStatus(new Path(path.getCanonicalPath))
-                .filter(_.getPath().getName().endsWith(".parquet"))
+              val parts = if (partitioned) {
+                val subdirs = fs.listStatus(new Path(partitionedPath.getCanonicalPath))
+                  .filter(_.getPath().getName().startsWith("id="))
+                fs.listStatus(subdirs.head.getPath())
+                  .filter(_.getPath().getName().endsWith(".parquet"))
+              } else {
+                fs.listStatus(new Path(unpartitionedPath.getCanonicalPath))
+                  .filter(_.getPath().getName().endsWith(".parquet"))
+              }
               // grab the metadata from the parquet file.  The next section of asserts just make
               // sure the test is configured correctly.
               assert(parts.size == 1)
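As background for the partitioned variant above: registering an external partitioned Hive table is a two-step affair, first write Hive-style id=<value> directories, then run MSCK REPAIR so the metastore learns about them. A minimal standalone sketch under assumed names (the table name, path, and sample data are illustrative; requires a SparkSession `spark` with Hive support):

    import spark.implicits._
    val df = Seq(("one", 1L), ("two", 2L)).toDF("display", "id")

    // one subdirectory per distinct id, e.g. .../id=1/part-....parquet
    df.write.partitionBy("id").parquet("/tmp/tz_example_partitioned")

    // the external table only sees those directories as partitions after a repair
    spark.sql(
      """CREATE EXTERNAL TABLE tz_example (display STRING)
        |PARTITIONED BY (id BIGINT)
        |STORED AS PARQUET
        |LOCATION 'file:/tmp/tz_example_partitioned'""".stripMargin)
    spark.sql("MSCK REPAIR TABLE tz_example")  // registers the id=... directories in the metastore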
@@ -458,7 +485,7 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
             "<" -> "2016-01-01 02:00:00"
           ).foreach { case (comparison, value) =>
             val query =
-              s"select ts from external_$baseTable where ts $comparison '$value'"
+              s"select ts from $table where ts $comparison '$value'"
             val countWithFilter = spark.sql(query).count()
             assert(countWithFilter === 4, query)
           }