[SPARK-12297][SQL] Hive compatibility for Parquet Timestamps #16781
Conversation
Test build #72287 has finished for PR 16781 at commit
Test build #72288 has finished for PR 16781 at commit
Test build #74031 has finished for PR 16781 at commit
Test build #74042 has finished for PR 16781 at commit
Please update the pull request description, because the one dated Feb 2 does not correspond to the fix any more.
// The conf is sometimes null in tests.
String tzString =
    conf == null ? null : conf.get(ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY());
if (tzString == null || tzString == "") {
This is Java code, not Scala; you probably meant tzString.equals("") instead of tzString == "".
Or even better, isEmpty().
@@ -674,6 +674,12 @@ object SQLConf {
    .stringConf
    .createWithDefault(TimeZone.getDefault().getID())

  val PARQUET_TABLE_INCLUDE_TIMEZONE =
    buildConf("spark.sql.session.parquet.timeZone")
      .doc("""Enables inclusion of parquet timezone property in newly created parquet tables""")
There should be a config option for writing "UTC" to the table property when creating tables, not for writing the local timezone.
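One way to read this suggestion is a string-valued conf whose value is the zone id to record in new tables (e.g. "UTC"), rather than a flag that records the local timezone. A minimal sketch, modelled on the existing buildConf entries above; the name spark.sql.parquet.newTableTimeZone and its wording are hypothetical, not what the patch settled on:

  // Hypothetical conf: the zone id written into the Parquet timezone table property
  // of newly created tables; an empty default means no property is written.
  val PARQUET_NEW_TABLE_TIMEZONE =
    buildConf("spark.sql.parquet.newTableTimeZone")
      .doc("Time zone id (e.g. UTC) recorded in the Parquet timezone table property " +
        "of newly created tables; empty means the property is not written.")
      .stringConf
      .createWithDefault("")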
testParquetHiveCompatibility(
  Row(Seq(Row(1))),
  "ARRAY<STRUCT<array_element: INT>>")
}

test(s"SPARK-12297: Parquet Timestamp & Hive timezone") { |
I think it would be better to have separate test cases for adjustments when reading, adjustments when writing and setting the table property when creating tables.
@ueshin thanks for taking a look. Yes, that understanding is correct. Another way to think about it is to compare those same operations with different file formats, e.g. textfile. Those work more like parquet does after this patch. I had that explanation in a comment on the jira -- I just updated the jira description to include it. I'll address your comments; they also are making me take a closer look at a couple of things. I should push an update tomorrow.
@ueshin I've pushed an update which addresses your comments. I also realized that partitioned tables weren't handled correctly! I fixed that as well.
)
Seq(false, true).foreach { vectorized =>
  withClue(s"vectorized = $vectorized;") {
    spark.conf.set(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, vectorized)
I was initially using SQLTestUtils.withSQLConf, but I discovered that it wasn't actually taking any effect. I don't know if that is because TestHiveSingleton does something strange, or maybe I'm doing something else weird in this test by creating many new Spark sessions. But I did that because it was the only way I could get the conf changes applied consistently.
Since I am creating new sessions, I don't think this has any risk of a failed test not cleaning up and triggering failures in other tests outside of this suite. But it still seems like I might be doing something wrong ...
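For reference, a minimal sketch of the withSQLConf pattern being discussed, assuming the suite mixes in SQLTestUtils; in the test as written the conf is instead set directly on each newly created session, as in the diff above:

  // Sets the vectorized-reader flag for the duration of the block and restores the
  // previous value afterwards -- the behavior that didn't seem to take effect here.
  withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString) {
    // read the table and check timestamps under this vectorization setting
  }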
Test build #75966 has finished for PR 16781 at commit
@squito Thank you for working on this!
I checked updates and added some comments.
Btw, can you fix the partitioned tables?
// hadoopConf for the Parquet Converters
val storageTzKey = ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY
val storageTz = relation.tableMeta.properties.getOrElse(storageTzKey, "")
val sessionTz = sparkSession.sessionState.conf.sessionLocalTimeZone
sessionTz isn't used.
val sessionTz = sparkSession.sessionState.conf.sessionLocalTimeZone
Map(
  storageTzKey -> storageTz
)
Should this return Map.empty if the value isn't included in the table properties?
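A sketch of the change being suggested, reusing the names from the diff above (the local val name options is just for illustration): only pass the property through when the table actually defines it.

  // Only propagate the storage timezone when the table property is set, instead of
  // passing an empty string through to the Parquet readers.
  val options = relation.tableMeta.properties.get(storageTzKey) match {
    case Some(storageTz) => Map(storageTzKey -> storageTz)
    case None => Map.empty[String, String]
  }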
val schema = StructType(Seq(
  StructField("display", StringType, true)
))
val df = spark.createDataFrame(rowRdd, schema)
We can use val df = desiredTimestampStrings.toDF("display") after import spark.implicits._.
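Assembled, the suggested simplification looks roughly like this (assuming desiredTimestampStrings is the Seq[String] defined earlier in the test):

  // Build the single-column DataFrame directly from the strings instead of going
  // through an explicit RDD[Row] and StructType.
  import spark.implicits._
  val df = desiredTimestampStrings.toDF("display")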
thanks, appreciate the help simplifying this. I had a feeling it was more complex than it needed to be :)
// is for various "wall-clock" times in different timezones, and then we can compare against those
// in our tests.
val originalTz = TimeZone.getDefault
val timestampTimezoneToMillis = try {
Shall we initialize this in the block like this?
val timestampTimezoneToMillis = {
val originalTz = TimeZone.getDefault
try {
...
} finally {
TimeZone.setDefault(originalTz)
}
}
"UTC" -> "UTC", | ||
"LA" -> "America/Los_Angeles", | ||
"Berlin" -> "Europe/Berlin" | ||
).foreach { case (tableName, zone) => |
Should this be testTimezones.foreach { ... }?
@@ -42,6 +52,15 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
  """.stripMargin)
}

override def afterEach(): Unit = {
Why do we need this?
I was probably just being a little paranoid; perhaps I had missed a withTable somewhere. In the current code, things work just fine if I remove them.
 * the hour.
 * @param storageTz the timezone which was used to store the timestamp. This should come from the
 *                  timestamp table property, or else assume its the same as the sessionTz
 * @return
Can you also add descriptions for @param binary and @return?
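A sketch of what the completed scaladoc might look like; the wording is illustrative, based only on what the surrounding doc already says about the parameters:

  /**
   * ...
   * @param binary    the Parquet int96 value holding the raw timestamp to convert
   * @param sessionTz the session time zone
   * @param storageTz the timezone which was used to store the timestamp; from the table
   *                  property, or else assumed to be the same as the sessionTz
   * @return          the timestamp value, adjusted between the storage and session time zones
   */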
}

private def checkHasTz(table: String, tz: Option[String]): Unit = {
  val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(table))
Should we explicitly pass sparkSession and use it here?
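Roughly the shape being suggested (the assertion line is a guess at what the original body checks):

  // Take the session as a parameter instead of relying on the suite-level `spark`,
  // since this test creates several new sessions of its own.
  private def checkHasTz(spark: SparkSession, table: String, tz: Option[String]): Unit = {
    val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(table))
    assert(tableMetadata.properties.get(ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY) === tz)
  }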
val key = ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY
withTable(baseTable, s"like_$baseTable", s"select_$baseTable") {
  val localTz = TimeZone.getDefault()
  val localTzId = localTz.getID()
localTz and localTzId aren't used.
spark.sql(
  raw"""ALTER TABLE $baseTable SET TBLPROPERTIES ($key="America/Los_Angeles")""")
checkHasTz(baseTable, Some("America/Los_Angeles"))
spark.sql( raw"""ALTER TABLE $baseTable SET TBLPROPERTIES ($key="UTC")""")
nit: remove extra white space, and two more below this.
@ueshin updated per your feedback. I should have explained that the last update did handle partition tables (it added the second call to ...).
Test build #76141 has finished for PR 16781 at commit
class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHiveSingleton {
class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHiveSingleton
    with BeforeAndAfterEach {
We don't need BeforeAndAfterEach anymore.
| display string,
| ts timestamp
|)
|PARTITIONED BY (id bigint)
Should we also test a partitioned table like PARTITIONED BY (ts timestamp)?
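A sketch of the partitioned-by-timestamp variant being asked about, following the DDL style already used in the suite; the table name and the STORED AS clause are assumptions here:

  spark.sql(
    """CREATE TABLE partitioned_by_ts (
      |  display string
      |)
      |PARTITIONED BY (ts timestamp)
      |STORED AS parquet
    """.stripMargin)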
@ueshin sorry it took me a while to figure out how a table partitioned by timestamps works (I didn't even realize that was possible; I don't think it is in Hive?) and I was traveling. The good news is that partitioning by timestamp works just fine. Since the ts is stored as a string anyway, and converted using the session tz already, it already works. I added one minimal test on this -- when the partitioned table is written, the correct partition dirs are created regardless of the timezone combinations. In particular, it doesn't make sense to do tests like the existing ones, where we write or read "unadjusted" data, bypassing the hive tables, and then make sure the right adjustments are applied when you perform the reverse action via the hive table; the partition values are correct whether you use the hive table & adjustment property or not. Let me know if you think more tests are required.
Test build #76391 has finished for PR 16781 at commit
@squito Thank you for working on this!
The behavior looks good to me.
I left some minor comments.
Thanks!
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.scalatest.BeforeAndAfterEach
nit: unnecessary import.
val defaultTz = None
// check that created tables have correct TBLPROPERTIES
val tblProperties = explicitTz.map {
  tz => raw"""TBLPROPERTIES ($key="$tz")"""
Let's use s""
instead of raw""
if possible. And also elsewhere in the same way.
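Concretely, for the TBLPROPERTIES string above the two interpolators produce the same text, since nothing in it relies on raw backslash handling:

  // raw"" only differs from s"" in how backslash escapes are treated; this string has
  // none, so the plain s interpolator is enough.
  val tblProperties = explicitTz.map {
    tz => s"""TBLPROPERTIES ($key="$tz")"""
  }.getOrElse("")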
val timestampTimezoneToMillis = {
  val originalTz = TimeZone.getDefault
  try {
    (for {
Let's use flatMap { .. map { ... } }.
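The shape being suggested, written against hypothetical collections since the actual generators of the for-comprehension aren't shown here:

  // A for-comprehension over two generators desugars to flatMap + map, so the
  // suggestion is simply to write that form directly.
  val pairs = timezones.flatMap { tz =>
    desiredTimestampStrings.map { ts => (tz, ts) }
  }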
@@ -29,6 +29,8 @@ import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.internal.SQLConf
nit: unnecessary import.
tz => raw"""TBLPROPERTIES ($key="$tz")""" | ||
}.getOrElse("") | ||
|
||
|
nit: remove extra line.
baseTable: String,
explicitTz: Option[String],
sessionTzOpt: Option[String]): Unit = {
val key = ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY
nit: indent
Test build #76419 has finished for PR 16781 at commit
Jenkins, retest this please.
LGTM, pending Jenkins.
Test build #76556 has finished for PR 16781 at commit
Thanks! Merging to master.
great! thanks @ueshin
Did we conduct any performance tests on this patch?
What changes were proposed in this pull request?
This change allows timestamps in Parquet-based Hive tables to behave as a "floating time", without a timezone, as timestamps are for other file formats. If the storage timezone is the same as the session timezone, this conversion is a no-op. When data is read from a Hive table, the table property is always respected. This allows Spark to keep its behavior unchanged when reading old data, while reading newly written data correctly (whatever the source of the data is).
Spark inherited the original behavior from Hive, but Hive is also updating behavior to use the same scheme in HIVE-12767 / HIVE-16231.
The default for Spark remains unchanged; created tables do not include the new table property.
This will only apply to hive tables; nothing is added to parquet metadata to indicate the timezone, so data that is read or written directly from parquet files will never have any conversions applied.
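For illustration, the table property can be set on an existing table the same way the test suite does it; ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY holds the property key, and the table name here is made up:

  // Record that this table's Parquet timestamps were written relative to UTC; readers
  // then adjust values between UTC and the session time zone.
  val key = ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY
  spark.sql(s"""ALTER TABLE my_parquet_table SET TBLPROPERTIES ($key="UTC")""")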
How was this patch tested?
Added a unit test which creates tables, reads and writes data, under a variety of permutations (different storage timezones, different session timezones, vectorized reading on and off).