[SPARK-31405][SQL] Fail by default when reading/writing legacy datetime values from/to Parquet/Avro files #28477
Conversation
@@ -48,5 +48,5 @@ private[spark] case class ExecutorDeadException(message: String)
  * Exception thrown when Spark returns different result after upgrading to a new version.
  */
 private[spark] class SparkUpgradeException(version: String, message: String, cause: Throwable)
-  extends SparkException("You may get a different result due to the upgrading of Spark" +
+  extends RuntimeException("You may get a different result due to the upgrading of Spark" +
We need to throw this exception in the vectorized Parquet reader, which only allows IOException to be thrown, so we change it to an unchecked exception. cc @xuanyuanking
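For reference, a minimal sketch of the changed class; the message tail after the concatenation is truncated in the diff above, so the exact wording here is an assumption:

package org.apache.spark

// Sketch only: extending RuntimeException makes the exception unchecked, so the
// vectorized Parquet reader, whose methods declare only `throws IOException`,
// can still throw it.
private[spark] class SparkUpgradeException(version: String, message: String, cause: Throwable)
  extends RuntimeException("You may get a different result due to the upgrading of Spark" +
    s" $version: $message", cause)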
Copy that.
@@ -102,14 +103,14 @@
 // The timezone conversion to apply to int96 timestamps. Null if no conversion.
 private final ZoneId convertTz;
 private static final ZoneId UTC = ZoneOffset.UTC;
-private final boolean rebaseDateTime;
+private final String datetimeRebaseMode;
It's very hard to use a Scala enum in Java, so I use a string instead.
Did you consider passing the enum's id, like LegacyBehaviorPolicy.EXCEPTION.id? This could be less expensive.
I don't think it affects perf, and the code would be less readable if we saw mode == 0 instead of "LEGACY".equals(mode).
Test build #122407 has finished for PR 28477 at commit
@@ -46,17 +47,40 @@ class AvroSerializer(
 rootCatalystType: DataType,
 rootAvroType: Schema,
 nullable: Boolean,
-    rebaseDateTime: Boolean) extends Logging {
+    datetimeRebaseMode: LegacyBehaviorPolicy.Value) extends Logging {
How about adding a type alias to LegacyBehaviorPolicy:

object LegacyBehaviorPolicy extends Enumeration {
  type LegacyBehaviorPolicy = Value
  val EXCEPTION, LEGACY, CORRECTED = Value
}
This doesn't help much, as we need to access both the object LegacyBehaviorPolicy and the type LegacyBehaviorPolicy, and we still need some prefix to distinguish them.
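To make the trade-off concrete, a sketch of the proposed alias with a hypothetical caller (the describe function is illustrative only):

object LegacyBehaviorPolicy extends Enumeration {
  type LegacyBehaviorPolicy = Value
  val EXCEPTION, LEGACY, CORRECTED = Value
}

// The alias lets a signature use the bare type name, but only after an import,
// which is the prefix mentioned above:
import LegacyBehaviorPolicy._

def describe(mode: LegacyBehaviorPolicy): String = mode match {
  case EXCEPTION => "fail on ambiguous ancient values"
  case LEGACY => "rebase between the hybrid and Proleptic Gregorian calendars"
  case CORRECTED => "read/write the values as-is"
}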
def newRebaseExceptionInRead(format: String): SparkUpgradeException = {
  new SparkUpgradeException("3.0", "reading dates before 1582-10-15 or timestamps before " +
    s"1900-01-01 from $format files can be ambiguous, as the files may be written by Spark 2.x " +
1900-01-01 is a floating time point (a local date). We could point out a concrete timestamp: 1900-01-01 00:00:00Z.
val LEGACY_PARQUET_REBASE_MODE_IN_READ =
  buildConf("spark.sql.legacy.parquet.datetimeRebaseModeInRead")
    .internal()
    .doc("When LEGACY, Spark will rebase dates/timestamps from Proleptic Gregorian calendar " +
Actually, it's the opposite: "from the hybrid to the Proleptic Gregorian calendar".
val LEGACY_AVRO_REBASE_MODE_IN_READ =
  buildConf("spark.sql.legacy.avro.datetimeRebaseModeInRead")
    .internal()
    .doc("When LEGACY, Spark will rebase dates/timestamps from Proleptic Gregorian calendar " +
Please fix this one too.
"from Spark 3.0+'s Proleptic Gregorian calendar. See more details in SPARK-31404. You can " + | ||
s"set ${SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key} to 'LEGACY' to rebase the datetime " + | ||
"values w.r.t. the calendar switch during writing, to get maximum interoperability, or set " + | ||
s"${SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key} to 'CORRECTED' to write the datetime " + |
This is the write-side message, so it should reference the IN_WRITE config key too.
s"1900-01-01 from $format files can be ambiguous, as the files may be written by Spark 2.x " + | ||
"or legacy versions of Hive, which uses a legacy hybrid calendar that is different from " + | ||
"Spark 3.0+'s Proleptic Gregorian calendar. See more details in SPARK-31404. You can set " + | ||
s"${SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key} to 'LEGACY' to rebase the datetime " + |
The function can be called from Avro too, right? The config may not be relevant for Avro.
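A hypothetical way to keep the message format-aware would be to derive the config key from the format; the helper name below is illustrative only, and both keys are the ones added by this PR:

// Assumes org.apache.spark.sql.internal.SQLConf is in scope.
def rebaseModeConfKey(format: String): String = format match {
  case "Parquet" => SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key
  case "Avro" => SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key
  case other => throw new IllegalArgumentException(s"Unknown format: $other")
}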
private val dateRebaseFunc: Int => Int = datetimeRebaseMode match {
  case LegacyBehaviorPolicy.EXCEPTION =>
    days: Int =>
      if (days < RebaseDateTime.lastSwitchGregorianDay) {
        throw DataSourceUtils.newRebaseExceptionInWrite("Parquet")
      }
      days
  case LegacyBehaviorPolicy.LEGACY => RebaseDateTime.rebaseGregorianToJulianDays
  case LegacyBehaviorPolicy.CORRECTED => identity[Int]
}
private val timestampRebaseFunc: Long => Long = datetimeRebaseMode match {
  case LegacyBehaviorPolicy.EXCEPTION =>
    micros: Long =>
      if (micros < RebaseDateTime.lastSwitchGregorianTs) {
        throw DataSourceUtils.newRebaseExceptionInWrite("Parquet")
      }
      micros
  case LegacyBehaviorPolicy.LEGACY => RebaseDateTime.rebaseGregorianToJulianMicros
  case LegacyBehaviorPolicy.CORRECTED => identity[Long]
}
This code is repeated again. Maybe move it to some common place somehow.
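One possible shape for that common place, as a sketch only: the object name and placement are hypothetical, and the bodies simply mirror the two blocks above, parameterized by mode and format:

object RebaseFuncs {
  def dateRebaseFunc(mode: LegacyBehaviorPolicy.Value, format: String): Int => Int =
    mode match {
      case LegacyBehaviorPolicy.EXCEPTION =>
        days: Int =>
          if (days < RebaseDateTime.lastSwitchGregorianDay) {
            throw DataSourceUtils.newRebaseExceptionInWrite(format)
          }
          days
      case LegacyBehaviorPolicy.LEGACY => RebaseDateTime.rebaseGregorianToJulianDays
      case LegacyBehaviorPolicy.CORRECTED => identity[Int]
    }

  def timestampRebaseFunc(mode: LegacyBehaviorPolicy.Value, format: String): Long => Long =
    mode match {
      case LegacyBehaviorPolicy.EXCEPTION =>
        micros: Long =>
          if (micros < RebaseDateTime.lastSwitchGregorianTs) {
            throw DataSourceUtils.newRebaseExceptionInWrite(format)
          }
          micros
      case LegacyBehaviorPolicy.LEGACY => RebaseDateTime.rebaseGregorianToJulianMicros
      case LegacyBehaviorPolicy.CORRECTED => identity[Long]
    }
}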
-      SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE.key -> rebase.toString) {
+      SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> mode.toString) {
mode already has the String type, so there is no need to call toString().
    }
  }
}
-    Seq(false, true).foreach { vectorized =>
+    Seq(true).foreach { vectorized =>
Is it just for debugging?
"When CORRECTED, Spark will not do rebase and read the dates/timestamps as it is. " + | ||
"When EXCEPTION, which is the default, Spark will fail the reading if it sees " + | ||
"ancient dates/timestamps that are ambiguous between the two calendars. This config is " + | ||
"only affective if the writer info (like Spark, Hive) of the Parquet files is unknown.") |
affective -> effective ?
"When CORRECTED, Spark will not do rebase and read the dates/timestamps as it is. " + | ||
"When EXCEPTION, which is the default, Spark will fail the reading if it sees " + | ||
"ancient dates/timestamps that are ambiguous between the two calendars. This config is " + | ||
"only affective if the writer info (like Spark, Hive) of the Avro files is unknown.") |
affective -> effective ?
@cloud-fan I have a question from a user's standpoint. I know that we are not really expecting this exception, as it only happens when the data has really old dates/timestamps. But say I get this error while running an existing workload during a read. What are my options? Should I be choosing the LEGACY or CORRECTED option? Are we in a position to recommend one option over the other?
This needs knowledge from users that Spark doesn't have. If the files were written by Spark 2.x, choose LEGACY. If the files were written by Impala or other systems that use the standard calendar, choose CORRECTED. For the write side, it's similar: it depends on who will read the files later.
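For instance, a minimal usage sketch of that guidance (the path is illustrative and spark is an existing SparkSession; the config key is the one added by this PR):

// Files written by Spark 2.x or legacy Hive: rebase while reading.
spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "LEGACY")
// Files written by systems on the Proleptic Gregorian calendar (e.g. Impala):
// spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "CORRECTED")
val df = spark.read.parquet("/path/to/files-with-ancient-dates")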
@cloud-fan Thanks for the explanation.
Test build #122431 has finished for PR 28477 at commit
Test build #122435 has finished for PR 28477 at commit
Test build #122489 has finished for PR 28477 at commit
  .options(extraOptions)
  .save(path)
withSQLConf(
  SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> "CORRECTED",
LegacyBehaviorPolicy.CORRECTED.toString ?
@@ -161,9 +161,10 @@ object DateTimeRebaseBenchmark extends SqlBasedBenchmark {
 Seq(true, false).foreach { modernDates =>
   Seq(false, true).foreach { rebase =>
     benchmark.addCase(caseName(modernDates, dateTime, Some(rebase)), 1) { _ =>
+      val mode = if (rebase) "LEGACY" else "CORRECTED"
LEGACY.toString?
Test build #122495 has finished for PR 28477 at commit
@@ -211,24 +212,25 @@ public void readIntegersWithRebase(
 WritableColumnVector c,
 int rowId,
 int level,
-    VectorizedValuesReader data) throws IOException {
+    VectorizedValuesReader data,
+    final boolean failIfRebase) throws IOException {
Just in case, what is the purpose of final here?
It means it's immutable, and it may help the JVM optimize the if (failIfRebase) check.
LGTM
Test build #122494 has finished for PR 28477 at commit
Test build #122496 has finished for PR 28477 at commit
LGTM too. Some nits
Test build #122535 has finished for PR 28477 at commit
retest this please
Test build #122539 has finished for PR 28477 at commit
retest this please
Test build #122541 has finished for PR 28477 at commit
retest this please
Test build #122546 has finished for PR 28477 at commit
retest this please
Test build #122549 has finished for PR 28477 at commit
Test build #122552 has finished for PR 28477 at commit
Test build #5001 has finished for PR 28477 at commit
Test build #122571 has finished for PR 28477 at commit
Test build #122576 has finished for PR 28477 at commit
Test build #5002 has finished for PR 28477 at commit
retest this please
Test build #122603 has finished for PR 28477 at commit
I am just going to merge to unblock Spark 3.0. All tests passed and the last change was only nits. I also manually ran the tests here to double-check.
Merged to master.
@cloud-fan, can you open a backport PR? Seems there are some conflicts.
…me values from/to Parquet/Avro files

When reading/writing datetime values that are before the rebase switch day, from/to Avro/Parquet files, fail by default and ask users to set a config to explicitly rebase or not. Rebasing or not rebasing leads to different behaviors, and we should let users decide explicitly. In most cases, users won't hit this exception, as it only affects ancient datetime values. Now users will see an error when reading/writing dates before 1582-10-15 or timestamps before 1900-01-01 from/to Parquet/Avro files, with an error message asking them to set a config. Updated tests.

Closes apache#28477 from cloud-fan/rebase.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
What changes were proposed in this pull request?
When reading/writing datetime values that are before the rebase switch day, from/to Avro/Parquet files, fail by default and ask users to set a config to explicitly rebase or not.
Why are the changes needed?
Rebasing or not rebasing leads to different behaviors, and we should let users decide explicitly. In most cases, users won't hit this exception, as it only affects ancient datetime values.
Does this PR introduce any user-facing change?
Yes, now users will see an error when reading/writing dates before 1582-10-15 or timestamps before 1900-01-01 from/to Parquet/Avro files, with an error message asking them to set a config.
How was this patch tested?
Updated tests.