review feedback
squito committed Apr 25, 2017
1 parent 44a8bbb commit e31657a
Expand Up @@ -267,8 +267,8 @@ private[parquet] class ParquetRowConverter(
case TimestampType =>
// TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that.
// If the table has a timezone property, apply the correct conversions. See SPARK-12297.
val sessionTsString = hadoopConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)
val sessionTz = Option(sessionTsString).map(TimeZone.getTimeZone(_))
val sessionTzString = hadoopConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)
val sessionTz = Option(sessionTzString).map(TimeZone.getTimeZone(_))
val storageTzString = hadoopConf.get(ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY)
val storageTz = Option(storageTzString).map(TimeZone.getTimeZone(_)).getOrElse(sessionTz)
Expand Down Expand Up @@ -685,13 +685,13 @@ private[parquet] object ParquetRowConverter {
* actually store it as micros since epoch, why we have to apply a conversion when timezones
* change.
* @param binary
* @param binary a parquet Binary which holds one int96
* @param sessionTz the session timezone. This will be used to determine how to display the time,
* and compute functions on the timestamp which involve a timezone, eg. extract
* the hour.
* @param storageTz the timezone which was used to store the timestamp. This should come from the
* timestamp table property, or else assume its the same as the sessionTz
* @return
* @return a timestamp (millis since epoch) which will render correctly in the sessionTz
def binaryToSQLTimestamp(
binary: Binary,
Original file line number Diff line number Diff line change
Expand Up @@ -228,11 +228,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
// We add the table timezone to the relation options, which automatically gets injected into the
// hadoopConf for the Parquet Converters
val storageTzKey = ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY
val storageTz =, "")
val sessionTz = sparkSession.sessionState.conf.sessionLocalTimeZone
storageTzKey -> storageTz
) -> _).toMap

private def inferIfNeeded(
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.sql.hive

import java.sql.Timestamp
import java.util.TimeZone

Expand Down Expand Up @@ -52,15 +53,6 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi

override def afterEach(): Unit = {
try {
// drop all databases, tables and functions after each test
} finally {

private def testParquetHiveCompatibility(row: Row, hiveTypes: String*): Unit = {
withTable("parquet_compat") {
withTempPath { dir =>
Expand Down Expand Up @@ -180,11 +172,7 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
testCreateWriteRead(sparkSession, "local", Some(localTz), sessionTzOpt)
// check with a variety of timezones. The unit tests currently are configured to always use
// America/Los_Angeles, but even if they didn't, we'd be sure to cover a non-local timezone.
"UTC" -> "UTC",
"LA" -> "America/Los_Angeles",
"Berlin" -> "Europe/Berlin"
).foreach { case (tableName, zone) =>
testTimezones.foreach { case (tableName, zone) =>
if (zone != localTz) {
testCreateWriteRead(sparkSession, tableName, Some(zone), sessionTzOpt)
Expand All @@ -201,7 +189,7 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
testReadTablesWithTimezone(sparkSession, baseTable, explicitTz, sessionTzOpt)

private def checkHasTz(table: String, tz: Option[String]): Unit = {
private def checkHasTz(spark: SparkSession, table: String, tz: Option[String]): Unit = {
val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(table))
assert( === tz)
Expand All @@ -214,9 +202,7 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
test(s"SPARK-12297: Create and Alter Parquet tables and timezones; explicitTz = $explicitTz; " +
s"sessionTzOpt = $sessionTzOpt") {
withTable(baseTable, s"like_$baseTable", s"select_$baseTable") {
val localTz = TimeZone.getDefault()
val localTzId = localTz.getID()
withTable(baseTable, s"like_$baseTable", s"select_$baseTable", s"partitioned_$baseTable") {
// If we ever add a property to set the table timezone by default, defaultTz would change
val defaultTz = None
// check that created tables have correct TBLPROPERTIES
Expand All @@ -231,28 +217,37 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
| $tblProperties
val expectedTableTz = explicitTz.orElse(defaultTz)
checkHasTz(baseTable, expectedTableTz)
checkHasTz(spark, baseTable, expectedTableTz)
raw"""CREATE TABLE partitioned_$baseTable (
| x int
| )
| $tblProperties
checkHasTz(spark, s"partitioned_$baseTable", expectedTableTz)
spark.sql(s"CREATE TABLE like_$baseTable LIKE $baseTable")
checkHasTz(s"like_$baseTable", expectedTableTz)
checkHasTz(spark, s"like_$baseTable", expectedTableTz)
raw"""CREATE TABLE select_$baseTable
| AS
| SELECT * from $baseTable
checkHasTz(s"select_$baseTable", defaultTz)
checkHasTz(spark, s"select_$baseTable", defaultTz)

// check alter table, setting, unsetting, resetting the property
raw"""ALTER TABLE $baseTable SET TBLPROPERTIES ($key="America/Los_Angeles")""")
checkHasTz(baseTable, Some("America/Los_Angeles"))
spark.sql( raw"""ALTER TABLE $baseTable SET TBLPROPERTIES ($key="UTC")""")
checkHasTz(baseTable, Some("UTC"))
spark.sql( raw"""ALTER TABLE $baseTable UNSET TBLPROPERTIES ($key)""")
checkHasTz(baseTable, None)
checkHasTz(spark, baseTable, Some("America/Los_Angeles"))
spark.sql(raw"""ALTER TABLE $baseTable SET TBLPROPERTIES ($key="UTC")""")
checkHasTz(spark, baseTable, Some("UTC"))
spark.sql(raw"""ALTER TABLE $baseTable UNSET TBLPROPERTIES ($key)""")
checkHasTz(spark, baseTable, None)
explicitTz.foreach { tz =>
spark.sql( raw"""ALTER TABLE $baseTable SET TBLPROPERTIES ($key="$tz")""")
checkHasTz(baseTable, expectedTableTz)
spark.sql(raw"""ALTER TABLE $baseTable SET TBLPROPERTIES ($key="$tz")""")
checkHasTz(spark, baseTable, expectedTableTz)
Expand All @@ -269,31 +264,29 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
// we manually adjust the timezone just to determine what the desired millis (since epoch, in utc)
// is for various "wall-clock" times in different timezones, and then we can compare against those
// in our tests.
val originalTz = TimeZone.getDefault
val timestampTimezoneToMillis = try {
(for {
timestampString <- desiredTimestampStrings
timezone <- Seq("America/Los_Angeles", "Europe/Berlin", "UTC").map {
} yield {
val timestamp = Timestamp.valueOf(timestampString)
(timestampString, timezone.getID()) -> timestamp.getTime()
} finally {
val timestampTimezoneToMillis = {
val originalTz = TimeZone.getDefault
try {
(for {
timestampString <- desiredTimestampStrings
timezone <- Seq("America/Los_Angeles", "Europe/Berlin", "UTC").map {
} yield {
val timestamp = Timestamp.valueOf(timestampString)
(timestampString, timezone.getID()) -> timestamp.getTime()
} finally {

private def createRawData(spark: SparkSession): Dataset[(String, Timestamp)] = {
val rowRdd = spark.sparkContext.parallelize(desiredTimestampStrings, 1).map(Row(_))
val schema = StructType(Seq(
StructField("display", StringType, true)
val df = spark.createDataFrame(rowRdd, schema)
import spark.implicits._
val df = desiredTimestampStrings.toDF("display")
// this will get the millis corresponding to the display time given the current *session*
// timezone.
import spark.implicits._
df.withColumn("ts", expr("cast(display as timestamp)")).as[(String, Timestamp)]

Expand All @@ -319,7 +312,7 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
// We write data into our tables, and then check the raw parquet files to see whether
// the correct conversion was applied.
checkHasTz(s"saveAsTable_$baseTable", None)
checkHasTz(spark, s"saveAsTable_$baseTable", None)
raw"""CREATE TABLE insert_$baseTable (
| display string,
Expand All @@ -328,7 +321,7 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
| $tblProperties
checkHasTz(s"insert_$baseTable", explicitTz)
checkHasTz(spark, s"insert_$baseTable", explicitTz)
// no matter what, roundtripping via the table should leave the data unchanged
val readFromTable = spark.table(s"insert_$baseTable").collect()
Expand Down Expand Up @@ -368,7 +361,7 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
test(s"SPARK-12297: Read from Parquet tables with Timestamps; explicitTz = $explicitTz; " +
s"sessionTzOpt = $sessionTzOpt") {
withTable(s"external_$baseTable") {
withTable(s"external_$baseTable", s"partitioned_$baseTable") {
// we intentionally save this data directly, without creating a table, so we can
// see that the data is read back differently depending on table properties.
// we'll save with adjusted millis, so that it should be the correct millis after reading
Expand All @@ -386,9 +379,11 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
case _ =>
}).withColumnRenamed("_1", "display").withColumnRenamed("_2", "ts")
withTempPath { path =>
val options = Map("path" -> path.getCanonicalPath) ++
withTempPath { basePath =>
val unpartitionedPath = new File(basePath, "flat")
val partitionedPath = new File(basePath, "partitioned")
val options = Map("path" -> unpartitionedPath.getCanonicalPath) ++ { tz => Map(key -> tz) }.getOrElse(Map())

Expand All @@ -397,13 +392,38 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
schema = new StructType().add("display", StringType).add("ts", TimestampType),
options = options
Seq(false, true).foreach { vectorized =>
withClue(s"vectorized = $vectorized;") {

// also write out a partitioned table, to make sure we can access that correctly.
// add a column we can partition by (value doesn't particularly matter).
val partitionedData = adjustedRawData.withColumn("id", monotonicallyIncreasingId)
// unfortunately, catalog.createTable() doesn't let us specify partitioning, so just use
// a "CREATE TABLE" stmt.
val tblOpts = { tz => raw"""TBLPROPERTIES ($key="$tz")""" }.getOrElse("")
spark.sql(raw"""CREATE EXTERNAL TABLE partitioned_$baseTable (
| display string,
| ts timestamp
|PARTITIONED BY (id bigint)
|STORED AS parquet
|LOCATION 'file:${partitionedPath.getCanonicalPath}'
spark.sql(s"msck repair table partitioned_$baseTable")

for {
vectorized <- Seq(false, true)
partitioned <- Seq(false, true)
} {
withClue(s"vectorized = $vectorized; partitioned = $partitioned") {
spark.conf.set(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, vectorized)
val sessionTz = sessionTzOpt.getOrElse(TimeZone.getDefault().getID())
val table = if (partitioned) s"partitioned_$baseTable" else s"external_$baseTable"
val query = s"select display, cast(ts as string) as ts_as_string, ts " +
s"from external_$baseTable"
s"from $table"
val collectedFromExternal = spark.sql(query).collect()
assert( collectedFromExternal.size === 4)
collectedFromExternal.foreach { row =>
val displayTime = row.getAs[String](0)
// the timestamp should still display the same, despite the changes in timezones
Expand Down Expand Up @@ -433,8 +453,15 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi

val hadoopConf = sparkContext.hadoopConfiguration
val fs = FileSystem.get(hadoopConf)
val parts = fs.listStatus(new Path(path.getCanonicalPath))
val parts = if (partitioned) {
val subdirs = fs.listStatus(new Path(partitionedPath.getCanonicalPath))
} else {
fs.listStatus(new Path(unpartitionedPath.getCanonicalPath))
// grab the meta data from the parquet file. The next section of asserts just make
// sure the test is configured correctly.
assert(parts.size == 1)
Expand All @@ -458,7 +485,7 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
"<" -> "2016-01-01 02:00:00"
).foreach { case (comparison, value) =>
val query =
s"select ts from external_$baseTable where ts $comparison '$value'"
s"select ts from $table where ts $comparison '$value'"
val countWithFilter = spark.sql(query).count()
assert(countWithFilter === 4, query)
Expand Down

