[SPARK-12297][SQL] Hive compatibility for Parquet Timestamps #16781

Status: Closed (wants to merge 52 commits; the diff below shows changes from 1 commit)

Commits
53d0744
very basic test for adjusting read parquet data
squito Jan 27, 2017
69a3c8c
wip
squito Jan 27, 2017
51e24f2
working version for non-vectorized read -- lots of garbage too
squito Jan 31, 2017
7e61841
working for vectorized reads -- not sure about all code paths
squito Jan 31, 2017
9fbde13
more tests for write path
squito Feb 1, 2017
bac9eb0
expand tests; fix some metastore interaction; cleanup a lot of garbage
squito Feb 1, 2017
1b05978
more cleanup
squito Feb 1, 2017
b622d27
handle bad timezones; include unit test
squito Feb 1, 2017
0604403
write support; lots more unit tests
squito Feb 2, 2017
f45516d
add tests for alter table
squito Feb 2, 2017
d4511a6
utc or gmt; cleanup
squito Feb 2, 2017
223ce2c
more cleanup
squito Feb 2, 2017
5b49ae0
fix compatibility
squito Feb 2, 2017
9ef60a4
Merge branch 'master' into SPARK-12297
squito Mar 1, 2017
0b6883c
Merge branch 'master' into SPARK-12297
squito Mar 2, 2017
69b8142
wip
squito Mar 3, 2017
7ca2c86
fix
squito Mar 3, 2017
6f982d3
fixes; passes tests now
squito Mar 6, 2017
1ad2f83
Merge branch 'master' into SPARK-12297
squito Mar 6, 2017
2c8a228
fix merge
squito Mar 6, 2017
f0b89fd
fix
squito Mar 6, 2017
db0216f
Merge branch 'master' into SPARK-12297
squito Mar 13, 2017
46fab8d
refactor the test
squito Mar 13, 2017
c242fb8
cleanup
squito Mar 13, 2017
c87a573
reset timezone property after tests; make tests more granular
squito Mar 14, 2017
db7e514
Merge branch 'master' into SPARK-12297
squito Mar 14, 2017
f4dca27
separate tests for reading & writing
squito Mar 15, 2017
2891582
Merge branch 'master' into SPARK-12297
squito Mar 15, 2017
d951443
fix merge
squito Mar 15, 2017
38e19cd
Merge branch 'master' into SPARK-12297
squito Mar 16, 2017
1e3b768
Merge branch 'master' into SPARK-12297
squito Mar 23, 2017
39f506c
cleanup
squito Mar 23, 2017
f33bc91
Merge branch 'master' into SPARK-12297
squito Mar 24, 2017
17565e8
remove config for setting table time zone automatically
squito Mar 24, 2017
a96806f
fixup
squito Mar 24, 2017
7582b2c
predicate pushdown tests
squito Apr 4, 2017
5817064
minor cleanup
squito Apr 6, 2017
773704a
Merge branch 'master' into SPARK-12297
squito Apr 10, 2017
be134be
fix merge
squito Apr 10, 2017
d15b660
swap conversion logic
squito Apr 10, 2017
283b1c7
update tests a bunch; tests session timezones, and tests are easier t…
squito Apr 11, 2017
6ccaa92
session timezones
squito Apr 11, 2017
71c7e60
cleanup
squito Apr 11, 2017
75e8579
Merge branch 'master' into SPARK-12297
squito Apr 11, 2017
e4e88a5
partial review feedback
squito Apr 19, 2017
44a8bbb
better param names and docs
squito Apr 20, 2017
e31657a
review feedback
squito Apr 25, 2017
d4ff9fd
Merge branch 'master' into SPARK-12297
squito May 2, 2017
acc72ea
add check for partitioned tables
squito May 2, 2017
b9c03e9
fix typo
squito May 2, 2017
fc17a2e
review feedback
squito May 2, 2017
2537437
review feedback
squito May 3, 2017
ParquetRowConverter.scala

@@ -267,8 +267,8 @@ private[parquet] class ParquetRowConverter(
case TimestampType =>
// TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that.
// If the table has a timezone property, apply the correct conversions. See SPARK-12297.
val sessionTsString = hadoopConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)
val sessionTz = Option(sessionTsString).map(TimeZone.getTimeZone(_))
val sessionTzString = hadoopConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)
val sessionTz = Option(sessionTzString).map(TimeZone.getTimeZone(_))
.getOrElse(DateTimeUtils.defaultTimeZone())
val storageTzString = hadoopConf.get(ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY)
val storageTz = Option(storageTzString).map(TimeZone.getTimeZone(_)).getOrElse(sessionTz)
@@ -685,13 +685,13 @@ private[parquet] object ParquetRowConverter {
* actually store it as micros since epoch, why we have to apply a conversion when timezones
* change.
*
* @param binary
* @param binary a parquet Binary which holds one int96
* @param sessionTz the session timezone. This will be used to determine how to display the time,
* and compute functions on the timestamp which involve a timezone, eg. extract
* the hour.
* @param storageTz the timezone which was used to store the timestamp. This should come from the
* timestamp table property, or else assume its the same as the sessionTz
* @return
* @return a timestamp (millis since epoch) which will render correctly in the sessionTz
*/
Review comment (Member): Can you add @fromTz, @toTz and descriptions of these?

def binaryToSQLTimestamp(
binary: Binary,
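For illustration, a minimal sketch of the offset-shifting idea the scaladoc above describes, assuming the int96 has already been decoded to millis since epoch. The helper name is hypothetical and this is not the PR's exact binaryToSQLTimestamp body; the direction of the shift depends on whether it is applied on the read or the write path.

import java.util.TimeZone

// Reinterpret `millis` so that a value whose wall-clock rendering was correct in
// storageTz renders with the same wall-clock fields in sessionTz.
// Offsets are looked up at the given instant, so DST transitions are respected.
def shiftTimezone(millis: Long, storageTz: TimeZone, sessionTz: TimeZone): Long =
  millis + storageTz.getOffset(millis) - sessionTz.getOffset(millis)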
HiveMetastoreCatalog.scala

@@ -228,11 +228,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
// We add the table timezone to the relation options, which automatically gets injected into the
// hadoopConf for the Parquet Converters
val storageTzKey = ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY
val storageTz = relation.tableMeta.properties.getOrElse(storageTzKey, "")
val sessionTz = sparkSession.sessionState.conf.sessionLocalTimeZone
Map(
storageTzKey -> storageTz
)
relation.tableMeta.properties.get(storageTzKey).map(storageTzKey -> _).toMap
}

private def inferIfNeeded(
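A small standalone illustration (hypothetical helper and placeholder key name, not code from the PR) of the idiom used in the HiveMetastoreCatalog change above: the timezone table property is propagated into the relation options only when it is actually set, instead of always emitting the key with an empty-string default.

// Returns a one-entry map when the property is present and an empty map otherwise,
// so downstream hadoopConf consumers never see a bogus "" timezone.
def propagateIfSet(props: Map[String, String], key: String): Map[String, String] =
  props.get(key).map(key -> _).toMap

// propagateIfSet(Map("tz.property" -> "UTC"), "tz.property") == Map("tz.property" -> "UTC")
// propagateIfSet(Map.empty, "tz.property") == Map.empty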
ParquetHiveCompatibilitySuite.scala

@@ -17,6 +17,7 @@

package org.apache.spark.sql.hive

import java.io.File
import java.sql.Timestamp
import java.util.TimeZone

@@ -52,15 +53,6 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
""".stripMargin)
}

override def afterEach(): Unit = {
try {
// drop all databases, tables and functions after each test
spark.sessionState.catalog.reset()
} finally {
super.afterEach()
}
}

private def testParquetHiveCompatibility(row: Row, hiveTypes: String*): Unit = {
withTable("parquet_compat") {
withTempPath { dir =>
@@ -180,11 +172,7 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
testCreateWriteRead(sparkSession, "local", Some(localTz), sessionTzOpt)
// check with a variety of timezones. The unit tests currently are configured to always use
// America/Los_Angeles, but even if they didn't, we'd be sure to cover a non-local timezone.
Seq(
"UTC" -> "UTC",
"LA" -> "America/Los_Angeles",
"Berlin" -> "Europe/Berlin"
).foreach { case (tableName, zone) =>
testTimezones.foreach { case (tableName, zone) =>
if (zone != localTz) {
testCreateWriteRead(sparkSession, tableName, Some(zone), sessionTzOpt)
}
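The inline Seq removed above has presumably been lifted into a shared val elsewhere in the suite, along the lines of (shape reconstructed from the deleted lines, not verified against the final diff):

val testTimezones = Seq(
  "UTC" -> "UTC",
  "LA" -> "America/Los_Angeles",
  "Berlin" -> "Europe/Berlin"
)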
@@ -201,7 +189,7 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
testReadTablesWithTimezone(sparkSession, baseTable, explicitTz, sessionTzOpt)
}

private def checkHasTz(table: String, tz: Option[String]): Unit = {
private def checkHasTz(spark: SparkSession, table: String, tz: Option[String]): Unit = {
val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(table))
Review comment (Member): Should explicitly pass sparkSession and use it here?

assert(tableMetadata.properties.get(ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY) === tz)
}
@@ -214,9 +202,7 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
test(s"SPARK-12297: Create and Alter Parquet tables and timezones; explicitTz = $explicitTz; " +
s"sessionTzOpt = $sessionTzOpt") {
val key = ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY
withTable(baseTable, s"like_$baseTable", s"select_$baseTable") {
val localTz = TimeZone.getDefault()
val localTzId = localTz.getID()
withTable(baseTable, s"like_$baseTable", s"select_$baseTable", s"partitioned_$baseTable") {
// If we ever add a property to set the table timezone by default, defaultTz would change
val defaultTz = None
// check that created tables have correct TBLPROPERTIES
@@ -231,28 +217,37 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
| $tblProperties
""".stripMargin)
val expectedTableTz = explicitTz.orElse(defaultTz)
checkHasTz(baseTable, expectedTableTz)
checkHasTz(spark, baseTable, expectedTableTz)
spark.sql(
raw"""CREATE TABLE partitioned_$baseTable (
| x int
| )
| PARTITIONED BY (y int)
| STORED AS PARQUET
| $tblProperties
""".stripMargin)
checkHasTz(spark, s"partitioned_$baseTable", expectedTableTz)
spark.sql(s"CREATE TABLE like_$baseTable LIKE $baseTable")
checkHasTz(s"like_$baseTable", expectedTableTz)
checkHasTz(spark, s"like_$baseTable", expectedTableTz)
spark.sql(
raw"""CREATE TABLE select_$baseTable
| STORED AS PARQUET
| AS
| SELECT * from $baseTable
""".stripMargin)
checkHasTz(s"select_$baseTable", defaultTz)
checkHasTz(spark, s"select_$baseTable", defaultTz)

// check alter table, setting, unsetting, resetting the property
spark.sql(
raw"""ALTER TABLE $baseTable SET TBLPROPERTIES ($key="America/Los_Angeles")""")
checkHasTz(baseTable, Some("America/Los_Angeles"))
spark.sql( raw"""ALTER TABLE $baseTable SET TBLPROPERTIES ($key="UTC")""")
checkHasTz(baseTable, Some("UTC"))
spark.sql( raw"""ALTER TABLE $baseTable UNSET TBLPROPERTIES ($key)""")
checkHasTz(baseTable, None)
checkHasTz(spark, baseTable, Some("America/Los_Angeles"))
spark.sql(raw"""ALTER TABLE $baseTable SET TBLPROPERTIES ($key="UTC")""")
checkHasTz(spark, baseTable, Some("UTC"))
spark.sql(raw"""ALTER TABLE $baseTable UNSET TBLPROPERTIES ($key)""")
checkHasTz(spark, baseTable, None)
explicitTz.foreach { tz =>
spark.sql( raw"""ALTER TABLE $baseTable SET TBLPROPERTIES ($key="$tz")""")
checkHasTz(baseTable, expectedTableTz)
spark.sql(raw"""ALTER TABLE $baseTable SET TBLPROPERTIES ($key="$tz")""")
checkHasTz(spark, baseTable, expectedTableTz)
}
}
}
@@ -269,31 +264,29 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
// we manually adjust the timezone just to determine what the desired millis (since epoch, in utc)
// is for various "wall-clock" times in different timezones, and then we can compare against those
// in our tests.
val originalTz = TimeZone.getDefault
val timestampTimezoneToMillis = try {
(for {
timestampString <- desiredTimestampStrings
timezone <- Seq("America/Los_Angeles", "Europe/Berlin", "UTC").map {
TimeZone.getTimeZone(_)
}
} yield {
TimeZone.setDefault(timezone)
val timestamp = Timestamp.valueOf(timestampString)
(timestampString, timezone.getID()) -> timestamp.getTime()
}).toMap
} finally {
TimeZone.setDefault(originalTz)
val timestampTimezoneToMillis = {
val originalTz = TimeZone.getDefault
try {
(for {
Review comment (Member): Let's use flatMap { .. map { ... } }.

timestampString <- desiredTimestampStrings
timezone <- Seq("America/Los_Angeles", "Europe/Berlin", "UTC").map {
TimeZone.getTimeZone(_)
}
} yield {
TimeZone.setDefault(timezone)
val timestamp = Timestamp.valueOf(timestampString)
(timestampString, timezone.getID()) -> timestamp.getTime()
}).toMap
} finally {
TimeZone.setDefault(originalTz)
}
}
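For reference, the reviewer's flatMap suggestion would desugar the for-comprehension above roughly as follows (a sketch of the suggested shape, not necessarily what was merged); desiredTimestampStrings is assumed to be the suite's Seq[String] of wall-clock strings, and Timestamp/TimeZone come from the imports already shown above:

val timestampTimezoneToMillisAlt: Map[(String, String), Long] = {
  val originalTz = TimeZone.getDefault
  try {
    desiredTimestampStrings.flatMap { timestampString =>
      Seq("America/Los_Angeles", "Europe/Berlin", "UTC").map(TimeZone.getTimeZone).map { timezone =>
        // setDefault so that Timestamp.valueOf interprets the string in this timezone
        TimeZone.setDefault(timezone)
        (timestampString, timezone.getID()) -> Timestamp.valueOf(timestampString).getTime()
      }
    }.toMap
  } finally {
    TimeZone.setDefault(originalTz)
  }
}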

private def createRawData(spark: SparkSession): Dataset[(String, Timestamp)] = {
val rowRdd = spark.sparkContext.parallelize(desiredTimestampStrings, 1).map(Row(_))
val schema = StructType(Seq(
StructField("display", StringType, true)
))
val df = spark.createDataFrame(rowRdd, schema)
import spark.implicits._
val df = desiredTimestampStrings.toDF("display")
// this will get the millis corresponding to the display time given the current *session*
// timezone.
import spark.implicits._
df.withColumn("ts", expr("cast(display as timestamp)")).as[(String, Timestamp)]
}
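To make the session-timezone dependence concrete, an illustrative example (numbers computed by hand, not taken from the PR):

// With spark.sql.session.timeZone = "UTC":
//   cast('2016-01-01 00:00:00' as timestamp) -> 1451606400000 millis since epoch
// With spark.sql.session.timeZone = "America/Los_Angeles" (UTC-8 in January):
//   cast('2016-01-01 00:00:00' as timestamp) -> 1451635200000 millis since epoch (8 hours later)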

@@ -319,7 +312,7 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
// We write data into our tables, and then check the raw parquet files to see whether
// the correct conversion was applied.
rawData.write.saveAsTable(s"saveAsTable_$baseTable")
checkHasTz(s"saveAsTable_$baseTable", None)
checkHasTz(spark, s"saveAsTable_$baseTable", None)
spark.sql(
raw"""CREATE TABLE insert_$baseTable (
| display string,
@@ -328,7 +321,7 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
| STORED AS PARQUET
| $tblProperties
""".stripMargin)
checkHasTz(s"insert_$baseTable", explicitTz)
checkHasTz(spark, s"insert_$baseTable", explicitTz)
rawData.write.insertInto(s"insert_$baseTable")
// no matter what, roundtripping via the table should leave the data unchanged
val readFromTable = spark.table(s"insert_$baseTable").collect()
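The roundtrip assertion itself falls outside this hunk; one way it might look (a sketch, not the PR's exact check) is to compare the collected rows against the original dataset:

// The table's timezone property must not change what a Spark reader gets back
// after a Spark write, so the (display, ts) pairs should match exactly.
assert(readFromTable.map(r => (r.getString(0), r.getTimestamp(1))).toSet ===
  rawData.collect().toSet)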
@@ -368,7 +361,7 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
val key = ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY
Review comment (Member): nit: indent

test(s"SPARK-12297: Read from Parquet tables with Timestamps; explicitTz = $explicitTz; " +
s"sessionTzOpt = $sessionTzOpt") {
withTable(s"external_$baseTable") {
withTable(s"external_$baseTable", s"partitioned_$baseTable") {
// we intentionally save this data directly, without creating a table, so we can
// see that the data is read back differently depending on table properties.
// we'll save with adjusted millis, so that it should be the correct millis after reading
@@ -386,9 +379,11 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
case _ =>
rawData
}).withColumnRenamed("_1", "display").withColumnRenamed("_2", "ts")
withTempPath { path =>
adjustedRawData.write.parquet(path.getCanonicalPath)
val options = Map("path" -> path.getCanonicalPath) ++
withTempPath { basePath =>
val unpartitionedPath = new File(basePath, "flat")
val partitionedPath = new File(basePath, "partitioned")
adjustedRawData.write.parquet(unpartitionedPath.getCanonicalPath)
val options = Map("path" -> unpartitionedPath.getCanonicalPath) ++
explicitTz.map { tz => Map(key -> tz) }.getOrElse(Map())

spark.catalog.createTable(
@@ -397,13 +392,38 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
schema = new StructType().add("display", StringType).add("ts", TimestampType),
options = options
)
Seq(false, true).foreach { vectorized =>
withClue(s"vectorized = $vectorized;") {

// also write out a partitioned table, to make sure we can access that correctly.
// add a column we can partition by (value doesn't particularly matter).
val partitionedData = adjustedRawData.withColumn("id", monotonicallyIncreasingId)
partitionedData.write.partitionBy("id")
.parquet(partitionedPath.getCanonicalPath)
// unfortunately, catalog.createTable() doesn't let us specify partitioning, so just use
// a "CREATE TABLE" stmt.
val tblOpts = explicitTz.map { tz => raw"""TBLPROPERTIES ($key="$tz")""" }.getOrElse("")
spark.sql(raw"""CREATE EXTERNAL TABLE partitioned_$baseTable (
| display string,
| ts timestamp
|)
|PARTITIONED BY (id bigint)
Review comment (Member): We should test for the partitioned table like PARTITIONED BY (ts timestamp)?

|STORED AS parquet
|LOCATION 'file:${partitionedPath.getCanonicalPath}'
|$tblOpts
""".stripMargin)
spark.sql(s"msck repair table partitioned_$baseTable")

for {
vectorized <- Seq(false, true)
partitioned <- Seq(false, true)
} {
withClue(s"vectorized = $vectorized; partitioned = $partitioned") {
spark.conf.set(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, vectorized)
Review comment (Contributor, author): I was initially using SQLTestUtils.withSQLConf, but I discovered that it wasn't actually taking any effect. I dunno if that is because TestHiveSingleton does something strange, or maybe I'm doing something else weird in this test by creating many new spark sessions. But I did that because it was the only way I could get the conf changes applied consistently.

Since I am creating new sessions, I don't think this has any risk of a failed test not cleaning and triggering failures in other tests outside of this suite. But it still seems like I might be doing something wrong ...
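For comparison, the SQLTestUtils helper mentioned above is normally used like this (sketch of typical usage; per the comment it did not take effect in this suite, which is why the explicit spark.conf.set call is used instead):

withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString) {
  // the conf is set for the duration of this block and restored afterwards
  runReadChecks()  // hypothetical placeholder for the assertions that follow below
}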

val sessionTz = sessionTzOpt.getOrElse(TimeZone.getDefault().getID())
val table = if (partitioned) s"partitioned_$baseTable" else s"external_$baseTable"
val query = s"select display, cast(ts as string) as ts_as_string, ts " +
s"from external_$baseTable"
s"from $table"
val collectedFromExternal = spark.sql(query).collect()
assert( collectedFromExternal.size === 4)
collectedFromExternal.foreach { row =>
val displayTime = row.getAs[String](0)
// the timestamp should still display the same, despite the changes in timezones
@@ -433,8 +453,15 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi

val hadoopConf = sparkContext.hadoopConfiguration
val fs = FileSystem.get(hadoopConf)
val parts = fs.listStatus(new Path(path.getCanonicalPath))
.filter(_.getPath().getName().endsWith(".parquet"))
val parts = if (partitioned) {
val subdirs = fs.listStatus(new Path(partitionedPath.getCanonicalPath))
.filter(_.getPath().getName().startsWith("id="))
fs.listStatus(subdirs.head.getPath())
.filter(_.getPath().getName().endsWith(".parquet"))
} else {
fs.listStatus(new Path(unpartitionedPath.getCanonicalPath))
.filter(_.getPath().getName().endsWith(".parquet"))
}
// grab the meta data from the parquet file. The next section of asserts just make
// sure the test is configured correctly.
assert(parts.size == 1)
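The metadata inspection referenced in the comment above is collapsed out of this hunk; a sketch of how the footer of the single parquet part file might be examined (assumed parquet-mr API usage, not the PR's exact code):

import org.apache.parquet.hadoop.ParquetFileReader

val footer = ParquetFileReader.readFooter(hadoopConf, parts.head.getPath())
// sanity check that the file really contains the ts column before asserting on its values
val schema = footer.getFileMetaData().getSchema()
assert(schema.containsField("ts"))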
@@ -458,7 +485,7 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
"<" -> "2016-01-01 02:00:00"
).foreach { case (comparison, value) =>
val query =
s"select ts from external_$baseTable where ts $comparison '$value'"
s"select ts from $table where ts $comparison '$value'"
val countWithFilter = spark.sql(query).count()
assert(countWithFilter === 4, query)
}