From 9f14144d191c8c41d8b3bd9585d78eb7ddae8407 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 14 May 2020 12:32:40 +0900 Subject: [PATCH] [SPARK-31405][SQL] Fail by default when reading/writing legacy datetime values from/to Parquet/Avro files When reading/writing datetime values that before the rebase switch day, from/to Avro/Parquet files, fail by default and ask users to set a config to explicitly do rebase or not. Rebase or not rebase have different behaviors and we should let users decide it explicitly. In most cases, users won't hit this exception as it only affects ancient datetime values. Yes, now users will see an error when reading/writing dates before 1582-10-15 or timestamps before 1900-01-01 from/to Parquet/Avro files, with an error message to ask setting a config. updated tests Closes #28477 from cloud-fan/rebase. Authored-by: Wenchen Fan Signed-off-by: HyukjinKwon --- .../org/apache/spark/SparkException.scala | 2 +- .../spark/sql/avro/AvroDeserializer.scala | 37 +++---- .../spark/sql/avro/AvroFileFormat.scala | 10 +- .../spark/sql/avro/AvroOutputWriter.scala | 13 ++- .../spark/sql/avro/AvroSerializer.scala | 30 ++--- .../v2/avro/AvroPartitionReaderFactory.scala | 9 +- .../AvroCatalystDataConversionSuite.scala | 2 +- .../org/apache/spark/sql/avro/AvroSuite.scala | 91 +++++++++++----- .../sql/catalyst/json/JacksonParser.scala | 1 + .../sql/catalyst/util/RebaseDateTime.scala | 4 +- .../apache/spark/sql/internal/SQLConf.scala | 100 ++++++++--------- .../parquet/VectorizedColumnReader.java | 91 +++++++++++----- .../VectorizedParquetRecordReader.java | 12 +- .../parquet/VectorizedPlainValuesReader.java | 29 +++-- .../parquet/VectorizedRleValuesReader.java | 31 +++--- .../parquet/VectorizedValuesReader.java | 4 +- .../datasources/DataSourceUtils.scala | 103 +++++++++++++++++- .../execution/datasources/FileScanRDD.scala | 6 +- .../parquet/ParquetFileFormat.scala | 11 +- .../parquet/ParquetReadSupport.scala | 7 +- .../parquet/ParquetRecordMaterializer.scala | 9 +- .../parquet/ParquetRowConverter.scala | 68 +++++------- .../parquet/ParquetWriteSupport.scala | 46 ++++---- .../ParquetPartitionReaderFactory.scala | 23 ++-- .../benchmark/DateTimeRebaseBenchmark.scala | 4 +- .../datasources/parquet/ParquetIOSuite.scala | 89 +++++++++------ .../sql/sources/HadoopFsRelationTest.scala | 6 +- 27 files changed, 519 insertions(+), 319 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkException.scala b/core/src/main/scala/org/apache/spark/SparkException.scala index 81c087e314be1..41382133bd84c 100644 --- a/core/src/main/scala/org/apache/spark/SparkException.scala +++ b/core/src/main/scala/org/apache/spark/SparkException.scala @@ -48,5 +48,5 @@ private[spark] case class ExecutorDeadException(message: String) * Exception thrown when Spark returns different result after upgrading to a new version. 
*/ private[spark] class SparkUpgradeException(version: String, message: String, cause: Throwable) - extends SparkException("You may get a different result due to the upgrading of Spark" + + extends RuntimeException("You may get a different result due to the upgrading of Spark" + s" $version: $message", cause) diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala index 27206edb287f9..4fc804067d54f 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala @@ -34,22 +34,33 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArrayData} import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, DateTimeUtils, GenericArrayData} import org.apache.spark.sql.catalyst.util.DateTimeConstants.MILLIS_PER_DAY -import org.apache.spark.sql.catalyst.util.RebaseDateTime._ +import org.apache.spark.sql.execution.datasources.DataSourceUtils import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String /** * A deserializer to deserialize data in avro format to data in catalyst format. */ -class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType, rebaseDateTime: Boolean) { +class AvroDeserializer( + rootAvroType: Schema, + rootCatalystType: DataType, + datetimeRebaseMode: LegacyBehaviorPolicy.Value) { def this(rootAvroType: Schema, rootCatalystType: DataType) { this(rootAvroType, rootCatalystType, - SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ)) + LegacyBehaviorPolicy.withName( + SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ))) } private lazy val decimalConversions = new DecimalConversion() + private val dateRebaseFunc = DataSourceUtils.creteDateRebaseFuncInRead( + datetimeRebaseMode, "Avro") + + private val timestampRebaseFunc = DataSourceUtils.creteTimestampRebaseFuncInRead( + datetimeRebaseMode, "Avro") + private val converter: Any => Any = rootCatalystType match { // A shortcut for empty schema. case st: StructType if st.isEmpty => @@ -96,13 +107,8 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType, rebaseD case (INT, IntegerType) => (updater, ordinal, value) => updater.setInt(ordinal, value.asInstanceOf[Int]) - case (INT, DateType) if rebaseDateTime => (updater, ordinal, value) => - val days = value.asInstanceOf[Int] - val rebasedDays = rebaseJulianToGregorianDays(days) - updater.setInt(ordinal, rebasedDays) - case (INT, DateType) => (updater, ordinal, value) => - updater.setInt(ordinal, value.asInstanceOf[Int]) + updater.setInt(ordinal, dateRebaseFunc(value.asInstanceOf[Int])) case (LONG, LongType) => (updater, ordinal, value) => updater.setLong(ordinal, value.asInstanceOf[Long]) @@ -110,22 +116,13 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType, rebaseD case (LONG, TimestampType) => avroType.getLogicalType match { // For backward compatibility, if the Avro type is Long and it is not logical type // (the `null` case), the value is processed as timestamp type with millisecond precision. 
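For context, the dateRebaseFunc/timestampRebaseFunc values bound above follow the mode-dispatch pattern of DataSourceUtils.creteTimestampRebaseFuncInRead, which appears in full later in this patch. A simplified Scala sketch (the "Avro" format tag is fixed here only for brevity):

  import org.apache.spark.sql.catalyst.util.RebaseDateTime
  import org.apache.spark.sql.execution.datasources.DataSourceUtils
  import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy

  // Sketch: the rebase function is chosen once per mode, then applied per value.
  def timestampRebaseInRead(mode: LegacyBehaviorPolicy.Value): Long => Long = mode match {
    case LegacyBehaviorPolicy.EXCEPTION => micros: Long =>
      // Only values before the Julian/Gregorian switch point are ambiguous between calendars.
      if (micros < RebaseDateTime.lastSwitchJulianTs) {
        throw DataSourceUtils.newRebaseExceptionInRead("Avro")
      }
      micros
    case LegacyBehaviorPolicy.LEGACY => RebaseDateTime.rebaseJulianToGregorianMicros
    case LegacyBehaviorPolicy.CORRECTED => identity[Long]
  }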
- case null | _: TimestampMillis if rebaseDateTime => (updater, ordinal, value) => - val millis = value.asInstanceOf[Long] - val micros = DateTimeUtils.fromMillis(millis) - val rebasedMicros = rebaseJulianToGregorianMicros(micros) - updater.setLong(ordinal, rebasedMicros) case null | _: TimestampMillis => (updater, ordinal, value) => val millis = value.asInstanceOf[Long] val micros = DateTimeUtils.fromMillis(millis) - updater.setLong(ordinal, micros) - case _: TimestampMicros if rebaseDateTime => (updater, ordinal, value) => - val micros = value.asInstanceOf[Long] - val rebasedMicros = rebaseJulianToGregorianMicros(micros) - updater.setLong(ordinal, rebasedMicros) + updater.setLong(ordinal, timestampRebaseFunc(micros)) case _: TimestampMicros => (updater, ordinal, value) => val micros = value.asInstanceOf[Long] - updater.setLong(ordinal, micros) + updater.setLong(ordinal, timestampRebaseFunc(micros)) case other => throw new IncompatibleSchemaException( s"Cannot convert Avro logical type ${other} to Catalyst Timestamp type.") } diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala index e69c95b797c73..59d54bc433f8b 100755 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala @@ -124,12 +124,12 @@ private[sql] class AvroFileFormat extends FileFormat reader.sync(file.start) val stop = file.start + file.length - val rebaseDateTime = DataSourceUtils.needRebaseDateTime( - reader.asInstanceOf[DataFileReader[_]].getMetaString).getOrElse { - SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ) - } + val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode( + reader.asInstanceOf[DataFileReader[_]].getMetaString, + SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ)) + val deserializer = new AvroDeserializer( - userProvidedSchema.getOrElse(reader.getSchema), requiredSchema, rebaseDateTime) + userProvidedSchema.getOrElse(reader.getSchema), requiredSchema, datetimeRebaseMode) new Iterator[InternalRow] { private[this] var completed = false diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala index 82a568049990e..ac9608c867937 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala @@ -33,6 +33,7 @@ import org.apache.spark.sql.{SPARK_LEGACY_DATETIME, SPARK_VERSION_METADATA_KEY} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.datasources.OutputWriter import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy import org.apache.spark.sql.types._ // NOTE: This class is instantiated and used on executor side only, no need to be serializable. @@ -43,12 +44,12 @@ private[avro] class AvroOutputWriter( avroSchema: Schema) extends OutputWriter { // Whether to rebase datetimes from Gregorian to Julian calendar in write - private val rebaseDateTime: Boolean = - SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE) + private val datetimeRebaseMode = LegacyBehaviorPolicy.withName( + SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE)) // The input rows will never be null. 
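From the user's perspective, the new behavior is controlled by the session configs introduced in SQLConf below. A minimal usage sketch (it assumes an active `spark` session and a DataFrame `df` holding pre-1582-10-15 dates; the paths are made up):

  // Write without rebasing, if the files will only ever be read by Spark 3.0+.
  spark.conf.set("spark.sql.legacy.avro.datetimeRebaseModeInWrite", "CORRECTED")
  df.write.format("avro").save("/tmp/ancient_dates")

  // Read files whose writer info is unknown, treating their values as hybrid-calendar data.
  spark.conf.set("spark.sql.legacy.avro.datetimeRebaseModeInRead", "LEGACY")
  spark.read.format("avro").load("/tmp/legacy_avro").show()

  // The Parquet counterparts follow the same pattern:
  //   spark.sql.legacy.parquet.datetimeRebaseModeInWrite
  //   spark.sql.legacy.parquet.datetimeRebaseModeInRead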
private lazy val serializer = - new AvroSerializer(schema, avroSchema, nullable = false, rebaseDateTime) + new AvroSerializer(schema, avroSchema, nullable = false, datetimeRebaseMode) /** * Overrides the couple of methods responsible for generating the output streams / files so @@ -56,7 +57,11 @@ private[avro] class AvroOutputWriter( */ private val recordWriter: RecordWriter[AvroKey[GenericRecord], NullWritable] = { val fileMeta = Map(SPARK_VERSION_METADATA_KEY -> SPARK_VERSION_SHORT) ++ { - if (rebaseDateTime) Some(SPARK_LEGACY_DATETIME -> "") else None + if (datetimeRebaseMode == LegacyBehaviorPolicy.LEGACY) { + Some(SPARK_LEGACY_DATETIME -> "") + } else { + None + } } new SparkAvroKeyOutputFormat(fileMeta.asJava) { diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala index dc232168fd241..d6cfbc5ffe758 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala @@ -35,8 +35,9 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{SpecializedGetters, SpecificInternalRow} import org.apache.spark.sql.catalyst.util.DateTimeUtils -import org.apache.spark.sql.catalyst.util.RebaseDateTime._ +import org.apache.spark.sql.execution.datasources.DataSourceUtils import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy import org.apache.spark.sql.types._ /** @@ -46,17 +47,24 @@ class AvroSerializer( rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean, - rebaseDateTime: Boolean) extends Logging { + datetimeRebaseMode: LegacyBehaviorPolicy.Value) extends Logging { def this(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean) { this(rootCatalystType, rootAvroType, nullable, - SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE)) + LegacyBehaviorPolicy.withName(SQLConf.get.getConf( + SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE))) } def serialize(catalystData: Any): Any = { converter.apply(catalystData) } + private val dateRebaseFunc = DataSourceUtils.creteDateRebaseFuncInWrite( + datetimeRebaseMode, "Avro") + + private val timestampRebaseFunc = DataSourceUtils.creteTimestampRebaseFuncInWrite( + datetimeRebaseMode, "Avro") + private val converter: Any => Any = { val actualAvroType = resolveNullableType(rootAvroType, nullable) val baseConverter = rootCatalystType match { @@ -146,24 +154,16 @@ class AvroSerializer( case (BinaryType, BYTES) => (getter, ordinal) => ByteBuffer.wrap(getter.getBinary(ordinal)) - case (DateType, INT) if rebaseDateTime => - (getter, ordinal) => rebaseGregorianToJulianDays(getter.getInt(ordinal)) - case (DateType, INT) => - (getter, ordinal) => getter.getInt(ordinal) + (getter, ordinal) => dateRebaseFunc(getter.getInt(ordinal)) case (TimestampType, LONG) => avroType.getLogicalType match { // For backward compatibility, if the Avro type is Long and it is not logical type // (the `null` case), output the timestamp value as with millisecond precision. 
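The write path uses the symmetric helper, but its EXCEPTION check compares against the Gregorian switch point (RebaseDateTime.lastSwitchGregorianTs, made public in this patch) rather than the Julian one. A simplified sketch of DataSourceUtils.creteTimestampRebaseFuncInWrite, shown in full further down (imports as in the read-side sketch above):

  def timestampRebaseInWrite(mode: LegacyBehaviorPolicy.Value): Long => Long = mode match {
    case LegacyBehaviorPolicy.EXCEPTION => micros: Long =>
      // Fail on values that a hybrid-calendar reader (Spark 2.x, legacy Hive) would reinterpret.
      if (micros < RebaseDateTime.lastSwitchGregorianTs) {
        throw DataSourceUtils.newRebaseExceptionInWrite("Avro")
      }
      micros
    case LegacyBehaviorPolicy.LEGACY => RebaseDateTime.rebaseGregorianToJulianMicros
    case LegacyBehaviorPolicy.CORRECTED => identity[Long]
  }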
- case null | _: TimestampMillis if rebaseDateTime => (getter, ordinal) => - val micros = getter.getLong(ordinal) - val rebasedMicros = rebaseGregorianToJulianMicros(micros) - DateTimeUtils.toMillis(rebasedMicros) case null | _: TimestampMillis => (getter, ordinal) => - DateTimeUtils.toMillis(getter.getLong(ordinal)) - case _: TimestampMicros if rebaseDateTime => (getter, ordinal) => - rebaseGregorianToJulianMicros(getter.getLong(ordinal)) - case _: TimestampMicros => (getter, ordinal) => getter.getLong(ordinal) + DateTimeUtils.toMillis(timestampRebaseFunc(getter.getLong(ordinal))) + case _: TimestampMicros => (getter, ordinal) => + timestampRebaseFunc(getter.getLong(ordinal)) case other => throw new IncompatibleSchemaException( s"Cannot convert Catalyst Timestamp type to Avro logical type ${other}") } diff --git a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala index 712aec6acbd56..15918f46a83bb 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala @@ -88,12 +88,11 @@ case class AvroPartitionReaderFactory( reader.sync(partitionedFile.start) val stop = partitionedFile.start + partitionedFile.length - val rebaseDateTime = DataSourceUtils.needRebaseDateTime( - reader.asInstanceOf[DataFileReader[_]].getMetaString).getOrElse { - SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ) - } + val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode( + reader.asInstanceOf[DataFileReader[_]].getMetaString, + SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ)) val deserializer = new AvroDeserializer( - userProvidedSchema.getOrElse(reader.getSchema), readDataSchema, rebaseDateTime) + userProvidedSchema.getOrElse(reader.getSchema), readDataSchema, datetimeRebaseMode) val fileReader = new PartitionReader[InternalRow] { private[this] var completed = false diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala index 64d790bc4acd4..c8a1f670bda9e 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala @@ -288,7 +288,7 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite """.stripMargin val avroSchema = new Schema.Parser().parse(jsonFormatSchema) val dataType = SchemaConverters.toSqlType(avroSchema).dataType - val deserializer = new AvroDeserializer(avroSchema, dataType, rebaseDateTime = false) + val deserializer = new AvroDeserializer(avroSchema, dataType) def checkDeserialization(data: GenericData.Record, expected: Any): Unit = { assert(checkResult( diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index 3e754f02911dc..a5c1fb15add5c 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -33,7 +33,7 @@ import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWri import org.apache.avro.generic.GenericData.{EnumSymbol, Fixed} import org.apache.commons.io.FileUtils -import 
org.apache.spark.{SPARK_VERSION_SHORT, SparkConf, SparkException} +import org.apache.spark.{SPARK_VERSION_SHORT, SparkConf, SparkException, SparkUpgradeException} import org.apache.spark.sql._ import org.apache.spark.sql.TestingUDT.IntervalData import org.apache.spark.sql.catalyst.expressions.AttributeReference @@ -43,6 +43,7 @@ import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.datasources.{DataSource, FilePartition} import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._ import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ import org.apache.spark.sql.v2.avro.AvroScan @@ -1538,13 +1539,28 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession { val path3_0_rebase = paths(1).getCanonicalPath if (dt == "date") { val df = Seq(dataStr).toDF("str").select($"str".cast("date").as("date")) - df.write.format("avro").save(path3_0) - withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE.key -> "true") { + + // By default we should fail to write ancient datetime values. + var e = intercept[SparkException](df.write.format("avro").save(path3_0)) + assert(e.getCause.getCause.getCause.isInstanceOf[SparkUpgradeException]) + // By default we should fail to read ancient datetime values. + e = intercept[SparkException](spark.read.format("avro").load(path2_4).collect()) + assert(e.getCause.isInstanceOf[SparkUpgradeException]) + + withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key -> CORRECTED.toString) { + df.write.format("avro").mode("overwrite").save(path3_0) + } + withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key -> LEGACY.toString) { df.write.format("avro").save(path3_0_rebase) } - checkAnswer( - spark.read.format("avro").load(path2_4, path3_0, path3_0_rebase), - 1.to(3).map(_ => Row(java.sql.Date.valueOf(dataStr)))) + + // For Avro files written by Spark 3.0, we know the writer info and don't need the config + // to guide the rebase behavior. + withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> LEGACY.toString) { + checkAnswer( + spark.read.format("avro").load(path2_4, path3_0, path3_0_rebase), + 1.to(3).map(_ => Row(java.sql.Date.valueOf(dataStr)))) + } } else { val df = Seq(dataStr).toDF("str").select($"str".cast("timestamp").as("ts")) val avroSchema = @@ -1556,24 +1572,39 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession { | {"name": "ts", "type": {"type": "long", "logicalType": "$dt"}} | ] |}""".stripMargin - df.write.format("avro").option("avroSchema", avroSchema).save(path3_0) - withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE.key -> "true") { + + // By default we should fail to write ancient datetime values. + var e = intercept[SparkException] { + df.write.format("avro").option("avroSchema", avroSchema).save(path3_0) + } + assert(e.getCause.getCause.getCause.isInstanceOf[SparkUpgradeException]) + // By default we should fail to read ancient datetime values. 
+ e = intercept[SparkException](spark.read.format("avro").load(path2_4).collect()) + assert(e.getCause.isInstanceOf[SparkUpgradeException]) + + withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key -> CORRECTED.toString) { + df.write.format("avro").option("avroSchema", avroSchema).mode("overwrite").save(path3_0) + } + withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key -> LEGACY.toString) { df.write.format("avro").option("avroSchema", avroSchema).save(path3_0_rebase) } - checkAnswer( - spark.read.format("avro").load(path2_4, path3_0, path3_0_rebase), - 1.to(3).map(_ => Row(java.sql.Timestamp.valueOf(dataStr)))) + + // For Avro files written by Spark 3.0, we know the writer info and don't need the config + // to guide the rebase behavior. + withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> LEGACY.toString) { + checkAnswer( + spark.read.format("avro").load(path2_4, path3_0, path3_0_rebase), + 1.to(3).map(_ => Row(java.sql.Timestamp.valueOf(dataStr)))) + } } } } - withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> "true") { - checkReadMixedFiles("before_1582_date_v2_4.avro", "date", "1001-01-01") - checkReadMixedFiles( - "before_1582_ts_micros_v2_4.avro", "timestamp-micros", "1001-01-01 01:02:03.123456") - checkReadMixedFiles( - "before_1582_ts_millis_v2_4.avro", "timestamp-millis", "1001-01-01 01:02:03.124") - } + checkReadMixedFiles("before_1582_date_v2_4.avro", "date", "1001-01-01") + checkReadMixedFiles( + "before_1582_ts_micros_v2_4.avro", "timestamp-micros", "1001-01-01 01:02:03.123456") + checkReadMixedFiles( + "before_1582_ts_millis_v2_4.avro", "timestamp-millis", "1001-01-01 01:02:03.124") } test("SPARK-31183: rebasing microseconds timestamps in write") { @@ -1581,7 +1612,7 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession { val nonRebased = "1001-01-07 01:09:05.123456" withTempPath { dir => val path = dir.getAbsolutePath - withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE.key -> "true") { + withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key -> LEGACY.toString) { Seq(tsStr).toDF("tsS") .select($"tsS".cast("timestamp").as("ts")) .write.format("avro") @@ -1589,9 +1620,9 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession { } // The file metadata indicates if it needs rebase or not, so we can always get the correct - // result regardless of the "rebaseInRead" config. - Seq(true, false).foreach { rebase => - withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> rebase.toString) { + // result regardless of the "rebase mode" config. + Seq(LEGACY, CORRECTED, EXCEPTION).foreach { mode => + withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> mode.toString) { checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(tsStr))) } } @@ -1622,7 +1653,7 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession { |}""".stripMargin withTempPath { dir => val path = dir.getAbsolutePath - withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE.key -> "true") { + withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key -> LEGACY.toString) { Seq(tsStr).toDF("tsS") .select($"tsS".cast("timestamp").as("ts")) .write @@ -1632,9 +1663,9 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession { } // The file metadata indicates if it needs rebase or not, so we can always get the correct - // result regardless of the "rebaseInRead" config. 
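The "writer info" the test comments refer to is resolved by the new DataSourceUtils.datetimeRebaseMode helper, whose full definition appears later in this patch. A simplified sketch, omitting the spark.test.forceNoRebase shortcut used by tests:

  import org.apache.spark.sql.{SPARK_LEGACY_DATETIME, SPARK_VERSION_METADATA_KEY}
  import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy

  // File metadata wins over the config: only files with an unknown writer need the config.
  def datetimeRebaseMode(
      lookupFileMeta: String => String,
      modeByConfig: String): LegacyBehaviorPolicy.Value = {
    Option(lookupFileMeta(SPARK_VERSION_METADATA_KEY)).map { version =>
      // 2.4 files, and 3.0 files written under LEGACY mode, hold hybrid-calendar values.
      if (version < "3.0.0" || lookupFileMeta(SPARK_LEGACY_DATETIME) != null) {
        LegacyBehaviorPolicy.LEGACY
      } else {
        LegacyBehaviorPolicy.CORRECTED
      }
    }.getOrElse(LegacyBehaviorPolicy.withName(modeByConfig)) // unknown writer: obey the config
  }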
- Seq(true, false).foreach { rebase => - withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> rebase.toString) { + // result regardless of the "rebase mode" config. + Seq(LEGACY, CORRECTED, EXCEPTION).foreach { mode => + withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> mode.toString) { checkAnswer( spark.read.schema("ts timestamp").format("avro").load(path), Row(Timestamp.valueOf(rebased))) @@ -1655,7 +1686,7 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession { test("SPARK-31183: rebasing dates in write") { withTempPath { dir => val path = dir.getAbsolutePath - withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE.key -> "true") { + withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key -> LEGACY.toString) { Seq("1001-01-01").toDF("dateS") .select($"dateS".cast("date").as("date")) .write.format("avro") @@ -1663,9 +1694,9 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession { } // The file metadata indicates if it needs rebase or not, so we can always get the correct - // result regardless of the "rebaseInRead" config. - Seq(true, false).foreach { rebase => - withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> rebase.toString) { + // result regardless of the "rebase mode" config. + Seq(LEGACY, CORRECTED, EXCEPTION).foreach { mode => + withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> mode.toString) { checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-01"))) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index a52c3450e83df..ef987931e928a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -456,6 +456,7 @@ class JacksonParser( } } } catch { + case e: SparkUpgradeException => throw e case e @ (_: RuntimeException | _: JsonProcessingException | _: MalformedInputException) => // JSON parser currently doesn't support partial results for corrupted records. // For such records, all fields other than the field configured by diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RebaseDateTime.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RebaseDateTime.scala index eb67ff71f6694..e29fa4b37844d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RebaseDateTime.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RebaseDateTime.scala @@ -146,6 +146,8 @@ object RebaseDateTime { -354226, -317702, -244653, -208129, -171605, -141436, -141435, -141434, -141433, -141432, -141431, -141430, -141429, -141428, -141427) + final val lastSwitchGregorianDay: Int = gregJulianDiffSwitchDay.last + // The first days of Common Era (CE) which is mapped to the '0001-01-01' date // in Proleptic Gregorian calendar. 
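The one-line JacksonParser change above encodes a simple rule: a SparkUpgradeException must escape the corrupt-record handling instead of being recorded as a bad row. An illustrative sketch with a hypothetical wrapper (parse and handleBadRecord are placeholders, not Spark APIs):

  import org.apache.spark.SparkUpgradeException

  // Hypothetical wrapper showing the ordering of the catch cases.
  def parseOrHandle[T](parse: () => T)(handleBadRecord: Throwable => T): T = {
    try {
      parse()
    } catch {
      case e: SparkUpgradeException => throw e        // never swallowed as a bad record
      case e: RuntimeException => handleBadRecord(e)  // existing corrupt-record path
    }
  }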
private final val gregorianCommonEraStartDay = gregJulianDiffSwitchDay(0) @@ -295,7 +297,7 @@ object RebaseDateTime { } // The switch time point after which all diffs between Gregorian and Julian calendars // across all time zones are zero - private final val lastSwitchGregorianTs: Long = getLastSwitchTs(gregJulianRebaseMap) + final val lastSwitchGregorianTs: Long = getLastSwitchTs(gregJulianRebaseMap) private final val gregorianStartTs = LocalDateTime.of(gregorianStartDate, LocalTime.MIDNIGHT) private final val julianEndTs = LocalDateTime.of( diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 6c18280ce4d55..f0b714a7a439f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2509,57 +2509,63 @@ object SQLConf { .booleanConf .createWithDefault(false) - val LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE = - buildConf("spark.sql.legacy.parquet.rebaseDateTimeInWrite.enabled") - .internal() - .doc("When true, rebase dates/timestamps from Proleptic Gregorian calendar " + - "to the hybrid calendar (Julian + Gregorian) in write. " + - "The rebasing is performed by converting micros/millis/days to " + - "a local date/timestamp in the source calendar, interpreting the resulted date/" + - "timestamp in the target calendar, and getting the number of micros/millis/days " + - "since the epoch 1970-01-01 00:00:00Z.") + val LEGACY_PARQUET_REBASE_MODE_IN_WRITE = + buildConf("spark.sql.legacy.parquet.datetimeRebaseModeInWrite") + .internal() + .doc("When LEGACY, Spark will rebase dates/timestamps from Proleptic Gregorian calendar " + + "to the legacy hybrid (Julian + Gregorian) calendar when writing Parquet files. " + + "When CORRECTED, Spark will not do rebase and write the dates/timestamps as it is. " + + "When EXCEPTION, which is the default, Spark will fail the writing if it sees " + + "ancient dates/timestamps that are ambiguous between the two calendars.") .version("3.0.0") - .booleanConf - .createWithDefault(false) - - val LEGACY_PARQUET_REBASE_DATETIME_IN_READ = - buildConf("spark.sql.legacy.parquet.rebaseDateTimeInRead.enabled") - .internal() - .doc("When true, rebase dates/timestamps " + - "from the hybrid calendar to Proleptic Gregorian calendar in read. " + - "The rebasing is performed by converting micros/millis/days to " + - "a local date/timestamp in the source calendar, interpreting the resulted date/" + - "timestamp in the target calendar, and getting the number of micros/millis/days " + - "since the epoch 1970-01-01 00:00:00Z.") + .stringConf + .transform(_.toUpperCase(Locale.ROOT)) + .checkValues(LegacyBehaviorPolicy.values.map(_.toString)) + .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString) + + val LEGACY_PARQUET_REBASE_MODE_IN_READ = + buildConf("spark.sql.legacy.parquet.datetimeRebaseModeInRead") + .internal() + .doc("When LEGACY, Spark will rebase dates/timestamps from the legacy hybrid (Julian + " + + "Gregorian) calendar to Proleptic Gregorian calendar when reading Parquet files. " + + "When CORRECTED, Spark will not do rebase and read the dates/timestamps as it is. " + + "When EXCEPTION, which is the default, Spark will fail the reading if it sees " + + "ancient dates/timestamps that are ambiguous between the two calendars. 
This config is " + + "only effective if the writer info (like Spark, Hive) of the Parquet files is unknown.") .version("3.0.0") - .booleanConf - .createWithDefault(false) + .stringConf + .transform(_.toUpperCase(Locale.ROOT)) + .checkValues(LegacyBehaviorPolicy.values.map(_.toString)) + .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString) - val LEGACY_AVRO_REBASE_DATETIME_IN_WRITE = - buildConf("spark.sql.legacy.avro.rebaseDateTimeInWrite.enabled") + val LEGACY_AVRO_REBASE_MODE_IN_WRITE = + buildConf("spark.sql.legacy.avro.datetimeRebaseModeInWrite") .internal() - .doc("When true, rebase dates/timestamps from Proleptic Gregorian calendar " + - "to the hybrid calendar (Julian + Gregorian) in write. " + - "The rebasing is performed by converting micros/millis/days to " + - "a local date/timestamp in the source calendar, interpreting the resulted date/" + - "timestamp in the target calendar, and getting the number of micros/millis/days " + - "since the epoch 1970-01-01 00:00:00Z.") + .doc("When LEGACY, Spark will rebase dates/timestamps from Proleptic Gregorian calendar " + + "to the legacy hybrid (Julian + Gregorian) calendar when writing Avro files. " + + "When CORRECTED, Spark will not do rebase and write the dates/timestamps as it is. " + + "When EXCEPTION, which is the default, Spark will fail the writing if it sees " + + "ancient dates/timestamps that are ambiguous between the two calendars.") .version("3.0.0") - .booleanConf - .createWithDefault(false) - - val LEGACY_AVRO_REBASE_DATETIME_IN_READ = - buildConf("spark.sql.legacy.avro.rebaseDateTimeInRead.enabled") - .internal() - .doc("When true, rebase dates/timestamps " + - "from the hybrid calendar to Proleptic Gregorian calendar in read. " + - "The rebasing is performed by converting micros/millis/days to " + - "a local date/timestamp in the source calendar, interpreting the resulted date/" + - "timestamp in the target calendar, and getting the number of micros/millis/days " + - "since the epoch 1970-01-01 00:00:00Z.") + .stringConf + .transform(_.toUpperCase(Locale.ROOT)) + .checkValues(LegacyBehaviorPolicy.values.map(_.toString)) + .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString) + + val LEGACY_AVRO_REBASE_MODE_IN_READ = + buildConf("spark.sql.legacy.avro.datetimeRebaseModeInRead") + .internal() + .doc("When LEGACY, Spark will rebase dates/timestamps from the legacy hybrid (Julian + " + + "Gregorian) calendar to Proleptic Gregorian calendar when reading Avro files. " + + "When CORRECTED, Spark will not do rebase and read the dates/timestamps as it is. " + + "When EXCEPTION, which is the default, Spark will fail the reading if it sees " + + "ancient dates/timestamps that are ambiguous between the two calendars. This config is " + + "only effective if the writer info (like Spark, Hive) of the Avro files is unknown.") .version("3.0.0") - .booleanConf - .createWithDefault(false) + .stringConf + .transform(_.toUpperCase(Locale.ROOT)) + .checkValues(LegacyBehaviorPolicy.values.map(_.toString)) + .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString) /** * Holds information about keys that have been deprecated. @@ -3139,10 +3145,6 @@ class SQLConf extends Serializable with Logging { def csvFilterPushDown: Boolean = getConf(CSV_FILTER_PUSHDOWN_ENABLED) - def parquetRebaseDateTimeInReadEnabled: Boolean = { - getConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ) - } - /** ********************** SQLConf functionality methods ************ */ /** Set Spark SQL configuration properties. 
*/ diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java index 11ce11dd7219c..f264281c02f45 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java @@ -36,6 +36,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils; import org.apache.spark.sql.catalyst.util.RebaseDateTime; +import org.apache.spark.sql.execution.datasources.DataSourceUtils; import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException; import org.apache.spark.sql.execution.vectorized.WritableColumnVector; import org.apache.spark.sql.types.DataTypes; @@ -102,14 +103,14 @@ public class VectorizedColumnReader { // The timezone conversion to apply to int96 timestamps. Null if no conversion. private final ZoneId convertTz; private static final ZoneId UTC = ZoneOffset.UTC; - private final boolean rebaseDateTime; + private final String datetimeRebaseMode; public VectorizedColumnReader( ColumnDescriptor descriptor, OriginalType originalType, PageReader pageReader, ZoneId convertTz, - boolean rebaseDateTime) throws IOException { + String datetimeRebaseMode) throws IOException { this.descriptor = descriptor; this.pageReader = pageReader; this.convertTz = convertTz; @@ -132,7 +133,9 @@ public VectorizedColumnReader( if (totalValueCount == 0) { throw new IOException("totalValueCount == 0"); } - this.rebaseDateTime = rebaseDateTime; + assert "LEGACY".equals(datetimeRebaseMode) || "EXCEPTION".equals(datetimeRebaseMode) || + "CORRECTED".equals(datetimeRebaseMode); + this.datetimeRebaseMode = datetimeRebaseMode; } /** @@ -156,11 +159,11 @@ private boolean isLazyDecodingSupported(PrimitiveType.PrimitiveTypeName typeName boolean isSupported = false; switch (typeName) { case INT32: - isSupported = originalType != OriginalType.DATE || !rebaseDateTime; + isSupported = originalType != OriginalType.DATE || "CORRECTED".equals(datetimeRebaseMode); break; case INT64: if (originalType == OriginalType.TIMESTAMP_MICROS) { - isSupported = !rebaseDateTime; + isSupported = "CORRECTED".equals(datetimeRebaseMode); } else { isSupported = originalType != OriginalType.TIMESTAMP_MILLIS; } @@ -174,6 +177,30 @@ private boolean isLazyDecodingSupported(PrimitiveType.PrimitiveTypeName typeName return isSupported; } + static int rebaseDays(int julianDays, final boolean failIfRebase) { + if (failIfRebase) { + if (julianDays < RebaseDateTime.lastSwitchJulianDay()) { + throw DataSourceUtils.newRebaseExceptionInRead("Parquet"); + } else { + return julianDays; + } + } else { + return RebaseDateTime.rebaseJulianToGregorianDays(julianDays); + } + } + + static long rebaseMicros(long julianMicros, final boolean failIfRebase) { + if (failIfRebase) { + if (julianMicros < RebaseDateTime.lastSwitchJulianTs()) { + throw DataSourceUtils.newRebaseExceptionInRead("Parquet"); + } else { + return julianMicros; + } + } else { + return RebaseDateTime.rebaseJulianToGregorianMicros(julianMicros); + } + } + /** * Reads `total` values from this columnReader into column. 
*/ @@ -283,7 +310,7 @@ private void decodeDictionaryIds( case INT32: if (column.dataType() == DataTypes.IntegerType || DecimalType.is32BitDecimalType(column.dataType()) || - (column.dataType() == DataTypes.DateType && !rebaseDateTime)) { + (column.dataType() == DataTypes.DateType && "CORRECTED".equals(datetimeRebaseMode))) { for (int i = rowId; i < rowId + num; ++i) { if (!column.isNullAt(i)) { column.putInt(i, dictionary.decodeToInt(dictionaryIds.getDictId(i))); @@ -302,11 +329,11 @@ private void decodeDictionaryIds( } } } else if (column.dataType() == DataTypes.DateType) { + final boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode); for (int i = rowId; i < rowId + num; ++i) { if (!column.isNullAt(i)) { int julianDays = dictionary.decodeToInt(dictionaryIds.getDictId(i)); - int gregorianDays = RebaseDateTime.rebaseJulianToGregorianDays(julianDays); - column.putInt(i, gregorianDays); + column.putInt(i, rebaseDays(julianDays, failIfRebase)); } } } else { @@ -317,36 +344,37 @@ private void decodeDictionaryIds( case INT64: if (column.dataType() == DataTypes.LongType || DecimalType.is64BitDecimalType(column.dataType()) || - (originalType == OriginalType.TIMESTAMP_MICROS && !rebaseDateTime)) { + (originalType == OriginalType.TIMESTAMP_MICROS && + "CORRECTED".equals(datetimeRebaseMode))) { for (int i = rowId; i < rowId + num; ++i) { if (!column.isNullAt(i)) { column.putLong(i, dictionary.decodeToLong(dictionaryIds.getDictId(i))); } } } else if (originalType == OriginalType.TIMESTAMP_MILLIS) { - if (rebaseDateTime) { + if ("CORRECTED".equals(datetimeRebaseMode)) { for (int i = rowId; i < rowId + num; ++i) { if (!column.isNullAt(i)) { - long julianMillis = dictionary.decodeToLong(dictionaryIds.getDictId(i)); - long julianMicros = DateTimeUtils.fromMillis(julianMillis); - long gregorianMicros = RebaseDateTime.rebaseJulianToGregorianMicros(julianMicros); - column.putLong(i, gregorianMicros); + long gregorianMillis = dictionary.decodeToLong(dictionaryIds.getDictId(i)); + column.putLong(i, DateTimeUtils.fromMillis(gregorianMillis)); } } } else { + final boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode); for (int i = rowId; i < rowId + num; ++i) { if (!column.isNullAt(i)) { - long gregorianMillis = dictionary.decodeToLong(dictionaryIds.getDictId(i)); - column.putLong(i, DateTimeUtils.fromMillis(gregorianMillis)); + long julianMillis = dictionary.decodeToLong(dictionaryIds.getDictId(i)); + long julianMicros = DateTimeUtils.fromMillis(julianMillis); + column.putLong(i, rebaseMicros(julianMicros, failIfRebase)); } } } } else if (originalType == OriginalType.TIMESTAMP_MICROS) { + final boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode); for (int i = rowId; i < rowId + num; ++i) { if (!column.isNullAt(i)) { long julianMicros = dictionary.decodeToLong(dictionaryIds.getDictId(i)); - long gregorianMicros = RebaseDateTime.rebaseJulianToGregorianMicros(julianMicros); - column.putLong(i, gregorianMicros); + column.putLong(i, rebaseMicros(julianMicros, failIfRebase)); } } } else { @@ -466,12 +494,13 @@ private void readIntBatch(int rowId, int num, WritableColumnVector column) throw defColumn.readShorts( num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); } else if (column.dataType() == DataTypes.DateType ) { - if (rebaseDateTime) { - defColumn.readIntegersWithRebase( + if ("CORRECTED".equals(datetimeRebaseMode)) { + defColumn.readIntegers( num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); } else { - defColumn.readIntegers( - num, column, 
rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); + boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode); + defColumn.readIntegersWithRebase( + num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn, failIfRebase); } } else { throw constructConvertNotSupportedException(descriptor, column); @@ -485,27 +514,29 @@ private void readLongBatch(int rowId, int num, WritableColumnVector column) thro defColumn.readLongs( num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); } else if (originalType == OriginalType.TIMESTAMP_MICROS) { - if (rebaseDateTime) { - defColumn.readLongsWithRebase( - num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); - } else { + if ("CORRECTED".equals(datetimeRebaseMode)) { defColumn.readLongs( num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); + } else { + boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode); + defColumn.readLongsWithRebase( + num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn, failIfRebase); } } else if (originalType == OriginalType.TIMESTAMP_MILLIS) { - if (rebaseDateTime) { + if ("CORRECTED".equals(datetimeRebaseMode)) { for (int i = 0; i < num; i++) { if (defColumn.readInteger() == maxDefLevel) { - long micros = DateTimeUtils.fromMillis(dataColumn.readLong()); - column.putLong(rowId + i, RebaseDateTime.rebaseJulianToGregorianMicros(micros)); + column.putLong(rowId + i, DateTimeUtils.fromMillis(dataColumn.readLong())); } else { column.putNull(rowId + i); } } } else { + final boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode); for (int i = 0; i < num; i++) { if (defColumn.readInteger() == maxDefLevel) { - column.putLong(rowId + i, DateTimeUtils.fromMillis(dataColumn.readLong())); + long julianMicros = DateTimeUtils.fromMillis(dataColumn.readLong()); + column.putLong(rowId + i, rebaseMicros(julianMicros, failIfRebase)); } else { column.putNull(rowId + i); } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java index c9590b97ce9cd..b40cc154d76fe 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java @@ -89,9 +89,9 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa private final ZoneId convertTz; /** - * true if need to rebase date/timestamp from Julian to Proleptic Gregorian calendar. + * The mode of rebasing date/timestamp from Julian to Proleptic Gregorian calendar. */ - private final boolean rebaseDateTime; + private final String datetimeRebaseMode; /** * columnBatch object that is used for batch decoding. This is created on first use and triggers @@ -122,16 +122,16 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa private final MemoryMode MEMORY_MODE; public VectorizedParquetRecordReader( - ZoneId convertTz, boolean rebaseDateTime, boolean useOffHeap, int capacity) { + ZoneId convertTz, String datetimeRebaseMode, boolean useOffHeap, int capacity) { this.convertTz = convertTz; - this.rebaseDateTime = rebaseDateTime; + this.datetimeRebaseMode = datetimeRebaseMode; MEMORY_MODE = useOffHeap ? MemoryMode.OFF_HEAP : MemoryMode.ON_HEAP; this.capacity = capacity; } // For test only. 
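To summarize the branching above: CORRECTED batches take the plain read path untouched, while LEGACY and EXCEPTION batches share the rebase helpers, with failIfRebase deciding between rebasing and throwing. A Scala rendering of the rebaseDays helper, assumed to mirror the Java version above:

  def rebaseDays(julianDays: Int, failIfRebase: Boolean): Int = {
    if (failIfRebase) {
      if (julianDays < RebaseDateTime.lastSwitchJulianDay) {
        throw DataSourceUtils.newRebaseExceptionInRead("Parquet")
      }
      julianDays                                              // unambiguous: both calendars agree
    } else {
      RebaseDateTime.rebaseJulianToGregorianDays(julianDays)  // LEGACY: rebase to Proleptic Gregorian
    }
  }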
public VectorizedParquetRecordReader(boolean useOffHeap, int capacity) { - this(null, false, useOffHeap, capacity); + this(null, "CORRECTED", useOffHeap, capacity); } /** @@ -321,7 +321,7 @@ private void checkEndOfRowGroup() throws IOException { for (int i = 0; i < columns.size(); ++i) { if (missingColumns[i]) continue; columnReaders[i] = new VectorizedColumnReader(columns.get(i), types.get(i).getOriginalType(), - pages.getPageReader(columns.get(i)), convertTz, rebaseDateTime); + pages.getPageReader(columns.get(i)), convertTz, datetimeRebaseMode); } totalCountLoadedSoFar += pages.getRowCount(); } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java index 2ed2e11b60c03..eddbf39178e9a 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java @@ -21,13 +21,14 @@ import java.nio.ByteOrder; import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.column.values.ValuesReader; +import org.apache.parquet.io.api.Binary; import org.apache.parquet.io.ParquetDecodingException; + import org.apache.spark.sql.catalyst.util.RebaseDateTime; +import org.apache.spark.sql.execution.datasources.DataSourceUtils; import org.apache.spark.sql.execution.vectorized.WritableColumnVector; -import org.apache.parquet.column.values.ValuesReader; -import org.apache.parquet.io.api.Binary; - /** * An implementation of the Parquet PLAIN decoder that supports the vectorized interface. */ @@ -86,7 +87,8 @@ public final void readIntegers(int total, WritableColumnVector c, int rowId) { // iterates the values twice: check if we need to rebase first, then go to the optimized branch // if rebase is not needed. @Override - public final void readIntegersWithRebase(int total, WritableColumnVector c, int rowId) { + public final void readIntegersWithRebase( + int total, WritableColumnVector c, int rowId, boolean failIfRebase) { int requiredBytes = total * 4; ByteBuffer buffer = getBuffer(requiredBytes); boolean rebase = false; @@ -94,8 +96,12 @@ public final void readIntegersWithRebase(int total, WritableColumnVector c, int rebase |= buffer.getInt(buffer.position() + i * 4) < RebaseDateTime.lastSwitchJulianDay(); } if (rebase) { - for (int i = 0; i < total; i += 1) { - c.putInt(rowId + i, RebaseDateTime.rebaseJulianToGregorianDays(buffer.getInt())); + if (failIfRebase) { + throw DataSourceUtils.newRebaseExceptionInRead("Parquet"); + } else { + for (int i = 0; i < total; i += 1) { + c.putInt(rowId + i, RebaseDateTime.rebaseJulianToGregorianDays(buffer.getInt())); + } } } else { if (buffer.hasArray()) { @@ -128,7 +134,8 @@ public final void readLongs(int total, WritableColumnVector c, int rowId) { // iterates the values twice: check if we need to rebase first, then go to the optimized branch // if rebase is not needed. 
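The two-pass strategy that comment describes can be sketched in Scala over a plain array (rebaseOrFail and the days array are illustrative only; the real reader works on the column vector in place):

  def rebaseOrFail(days: Array[Int], failIfRebase: Boolean): Array[Int] = {
    // Pass 1: cheap scan to see whether any value falls before the switch day.
    val needsRebase = days.exists(_ < RebaseDateTime.lastSwitchJulianDay)
    if (!needsRebase) {
      days                                    // fast path: bulk copy, no per-value conversion
    } else if (failIfRebase) {
      throw DataSourceUtils.newRebaseExceptionInRead("Parquet")
    } else {
      // Pass 2: per-value rebase, only paid when legacy values are actually present.
      days.map(RebaseDateTime.rebaseJulianToGregorianDays)
    }
  }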
@Override - public final void readLongsWithRebase(int total, WritableColumnVector c, int rowId) { + public final void readLongsWithRebase( + int total, WritableColumnVector c, int rowId, boolean failIfRebase) { int requiredBytes = total * 8; ByteBuffer buffer = getBuffer(requiredBytes); boolean rebase = false; @@ -136,8 +143,12 @@ public final void readLongsWithRebase(int total, WritableColumnVector c, int row rebase |= buffer.getLong(buffer.position() + i * 8) < RebaseDateTime.lastSwitchJulianTs(); } if (rebase) { - for (int i = 0; i < total; i += 1) { - c.putLong(rowId + i, RebaseDateTime.rebaseJulianToGregorianMicros(buffer.getLong())); + if (failIfRebase) { + throw DataSourceUtils.newRebaseExceptionInRead("Parquet"); + } else { + for (int i = 0; i < total; i += 1) { + c.putLong(rowId + i, RebaseDateTime.rebaseJulianToGregorianMicros(buffer.getLong())); + } } } else { if (buffer.hasArray()) { diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java index 4d72a33fcf774..24347a4e3a0c5 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java @@ -17,6 +17,9 @@ package org.apache.spark.sql.execution.datasources.parquet; +import java.io.IOException; +import java.nio.ByteBuffer; + import org.apache.parquet.Preconditions; import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.bytes.BytesUtils; @@ -26,12 +29,8 @@ import org.apache.parquet.io.ParquetDecodingException; import org.apache.parquet.io.api.Binary; -import org.apache.spark.sql.catalyst.util.RebaseDateTime; import org.apache.spark.sql.execution.vectorized.WritableColumnVector; -import java.io.IOException; -import java.nio.ByteBuffer; - /** * A values reader for Parquet's run-length encoded data. 
This is based off of the version in * parquet-mr with these changes: @@ -211,7 +210,8 @@ public void readIntegersWithRebase( WritableColumnVector c, int rowId, int level, - VectorizedValuesReader data) throws IOException { + VectorizedValuesReader data, + final boolean failIfRebase) throws IOException { int left = total; while (left > 0) { if (this.currentCount == 0) this.readNextGroup(); @@ -219,7 +219,7 @@ public void readIntegersWithRebase( switch (mode) { case RLE: if (currentValue == level) { - data.readIntegersWithRebase(n, c, rowId); + data.readIntegersWithRebase(n, c, rowId, failIfRebase); } else { c.putNulls(rowId, n); } @@ -227,8 +227,8 @@ public void readIntegersWithRebase( case PACKED: for (int i = 0; i < n; ++i) { if (currentBuffer[currentBufferIdx++] == level) { - c.putInt(rowId + i, - RebaseDateTime.rebaseJulianToGregorianDays(data.readInteger())); + int julianDays = data.readInteger(); + c.putInt(rowId + i, VectorizedColumnReader.rebaseDays(julianDays, failIfRebase)); } else { c.putNull(rowId + i); } @@ -387,7 +387,8 @@ public void readLongsWithRebase( WritableColumnVector c, int rowId, int level, - VectorizedValuesReader data) throws IOException { + VectorizedValuesReader data, + final boolean failIfRebase) throws IOException { int left = total; while (left > 0) { if (this.currentCount == 0) this.readNextGroup(); @@ -395,7 +396,7 @@ public void readLongsWithRebase( switch (mode) { case RLE: if (currentValue == level) { - data.readLongsWithRebase(n, c, rowId); + data.readLongsWithRebase(n, c, rowId, failIfRebase); } else { c.putNulls(rowId, n); } @@ -403,8 +404,8 @@ public void readLongsWithRebase( case PACKED: for (int i = 0; i < n; ++i) { if (currentBuffer[currentBufferIdx++] == level) { - c.putLong(rowId + i, - RebaseDateTime.rebaseJulianToGregorianMicros(data.readLong())); + long julianMicros = data.readLong(); + c.putLong(rowId + i, VectorizedColumnReader.rebaseMicros(julianMicros, failIfRebase)); } else { c.putNull(rowId + i); } @@ -584,7 +585,8 @@ public void readIntegers(int total, WritableColumnVector c, int rowId) { } @Override - public void readIntegersWithRebase(int total, WritableColumnVector c, int rowId) { + public void readIntegersWithRebase( + int total, WritableColumnVector c, int rowId, boolean failIfRebase) { throw new UnsupportedOperationException("only readInts is valid."); } @@ -604,7 +606,8 @@ public void readLongs(int total, WritableColumnVector c, int rowId) { } @Override - public void readLongsWithRebase(int total, WritableColumnVector c, int rowId) { + public void readLongsWithRebase( + int total, WritableColumnVector c, int rowId, boolean failIfRebase) { throw new UnsupportedOperationException("only readInts is valid."); } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java index 809ac44cc8272..35db8f235ed60 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java @@ -40,9 +40,9 @@ public interface VectorizedValuesReader { void readBooleans(int total, WritableColumnVector c, int rowId); void readBytes(int total, WritableColumnVector c, int rowId); void readIntegers(int total, WritableColumnVector c, int rowId); - void readIntegersWithRebase(int total, WritableColumnVector c, int rowId); + void 
readIntegersWithRebase(int total, WritableColumnVector c, int rowId, boolean failIfRebase); void readLongs(int total, WritableColumnVector c, int rowId); - void readLongsWithRebase(int total, WritableColumnVector c, int rowId); + void readLongsWithRebase(int total, WritableColumnVector c, int rowId, boolean failIfRebase); void readFloats(int total, WritableColumnVector c, int rowId); void readDoubles(int total, WritableColumnVector c, int rowId); void readBinary(int total, WritableColumnVector c, int rowId); diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala index 45a9b1a808cf3..abb74d8d09ec6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala @@ -23,9 +23,12 @@ import org.apache.hadoop.fs.Path import org.json4s.NoTypeHints import org.json4s.jackson.Serialization +import org.apache.spark.SparkUpgradeException import org.apache.spark.sql.{SPARK_LEGACY_DATETIME, SPARK_VERSION_METADATA_KEY} import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.util.RebaseDateTime import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy import org.apache.spark.sql.sources.BaseRelation import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -84,17 +87,107 @@ object DataSourceUtils { case _ => false } - def needRebaseDateTime(lookupFileMeta: String => String): Option[Boolean] = { + def datetimeRebaseMode( + lookupFileMeta: String => String, + modeByConfig: String): LegacyBehaviorPolicy.Value = { if (Utils.isTesting && SQLConf.get.getConfString("spark.test.forceNoRebase", "") == "true") { - return Some(false) + return LegacyBehaviorPolicy.CORRECTED } - // If there is no version, we return None and let the caller side to decide. + // If there is no version, we return the mode specified by the config. Option(lookupFileMeta(SPARK_VERSION_METADATA_KEY)).map { version => // Files written by Spark 2.4 and earlier follow the legacy hybrid calendar and we need to // rebase the datetime values. // Files written by Spark 3.0 and latter may also need the rebase if they were written with - // the "rebaseInWrite" config enabled. - version < "3.0.0" || lookupFileMeta(SPARK_LEGACY_DATETIME) != null + // the "LEGACY" rebase mode. + if (version < "3.0.0" || lookupFileMeta(SPARK_LEGACY_DATETIME) != null) { + LegacyBehaviorPolicy.LEGACY + } else { + LegacyBehaviorPolicy.CORRECTED + } + }.getOrElse(LegacyBehaviorPolicy.withName(modeByConfig)) + } + + def newRebaseExceptionInRead(format: String): SparkUpgradeException = { + val config = if (format == "Parquet") { + SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key + } else if (format == "Avro") { + SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key + } else { + throw new IllegalStateException("unrecognized format " + format) } + new SparkUpgradeException("3.0", "reading dates before 1582-10-15 or timestamps before " + + s"1900-01-01T00:00:00Z from $format files can be ambiguous, as the files may be written by " + + "Spark 2.x or legacy versions of Hive, which uses a legacy hybrid calendar that is " + + "different from Spark 3.0+'s Proleptic Gregorian calendar. See more details in " + + s"SPARK-31404. You can set $config to 'LEGACY' to rebase the datetime values w.r.t. 
" + + s"the calendar difference during reading. Or set $config to 'CORRECTED' to read the " + + "datetime values as it is.", null) + } + + def newRebaseExceptionInWrite(format: String): SparkUpgradeException = { + val config = if (format == "Parquet") { + SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key + } else if (format == "Avro") { + SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key + } else { + throw new IllegalStateException("unrecognized format " + format) + } + new SparkUpgradeException("3.0", "writing dates before 1582-10-15 or timestamps before " + + s"1900-01-01T00:00:00Z into $format files can be dangerous, as the files may be read by " + + "Spark 2.x or legacy versions of Hive later, which uses a legacy hybrid calendar that is " + + "different from Spark 3.0+'s Proleptic Gregorian calendar. See more details in " + + s"SPARK-31404. You can set $config to 'LEGACY' to rebase the datetime values w.r.t. " + + "the calendar difference during writing, to get maximum interoperability. Or set " + + s"$config to 'CORRECTED' to write the datetime values as it is, if you are 100% sure that " + + "the written files will only be read by Spark 3.0+ or other systems that use Proleptic " + + "Gregorian calendar.", null) + } + + def creteDateRebaseFuncInRead( + rebaseMode: LegacyBehaviorPolicy.Value, + format: String): Int => Int = rebaseMode match { + case LegacyBehaviorPolicy.EXCEPTION => days: Int => + if (days < RebaseDateTime.lastSwitchJulianDay) { + throw DataSourceUtils.newRebaseExceptionInRead(format) + } + days + case LegacyBehaviorPolicy.LEGACY => RebaseDateTime.rebaseJulianToGregorianDays + case LegacyBehaviorPolicy.CORRECTED => identity[Int] + } + + def creteDateRebaseFuncInWrite( + rebaseMode: LegacyBehaviorPolicy.Value, + format: String): Int => Int = rebaseMode match { + case LegacyBehaviorPolicy.EXCEPTION => days: Int => + if (days < RebaseDateTime.lastSwitchGregorianDay) { + throw DataSourceUtils.newRebaseExceptionInWrite(format) + } + days + case LegacyBehaviorPolicy.LEGACY => RebaseDateTime.rebaseGregorianToJulianDays + case LegacyBehaviorPolicy.CORRECTED => identity[Int] + } + + def creteTimestampRebaseFuncInRead( + rebaseMode: LegacyBehaviorPolicy.Value, + format: String): Long => Long = rebaseMode match { + case LegacyBehaviorPolicy.EXCEPTION => micros: Long => + if (micros < RebaseDateTime.lastSwitchJulianTs) { + throw DataSourceUtils.newRebaseExceptionInRead(format) + } + micros + case LegacyBehaviorPolicy.LEGACY => RebaseDateTime.rebaseJulianToGregorianMicros + case LegacyBehaviorPolicy.CORRECTED => identity[Long] + } + + def creteTimestampRebaseFuncInWrite( + rebaseMode: LegacyBehaviorPolicy.Value, + format: String): Long => Long = rebaseMode match { + case LegacyBehaviorPolicy.EXCEPTION => micros: Long => + if (micros < RebaseDateTime.lastSwitchGregorianTs) { + throw DataSourceUtils.newRebaseExceptionInWrite(format) + } + micros + case LegacyBehaviorPolicy.LEGACY => RebaseDateTime.rebaseGregorianToJulianMicros + case LegacyBehaviorPolicy.CORRECTED => identity[Long] } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala index 542c996a5342d..fc59336d6107c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala @@ -21,7 +21,7 @@ import java.io.{FileNotFoundException, IOException} import 
org.apache.parquet.io.ParquetDecodingException -import org.apache.spark.{Partition => RDDPartition, TaskContext} +import org.apache.spark.{Partition => RDDPartition, SparkUpgradeException, TaskContext} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.rdd.{InputFileBlockHolder, RDD} import org.apache.spark.sql.SparkSession @@ -178,7 +178,9 @@ class FileScanRDD( s"Expected: ${e.getLogicalType}, Found: ${e.getPhysicalType}" throw new QueryExecutionException(message, e) case e: ParquetDecodingException => - if (e.getMessage.contains("Can not read value at")) { + if (e.getCause.isInstanceOf[SparkUpgradeException]) { + throw e.getCause + } else if (e.getMessage.contains("Can not read value at")) { val message = "Encounter error while reading parquet files. " + "One possible cause: Parquet column cannot be converted in the " + "corresponding files. Details: " diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index c6d9ddf370e22..71874104fcf4f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -300,10 +300,9 @@ class ParquetFileFormat None } - val rebaseDateTime = DataSourceUtils.needRebaseDateTime( - footerFileMetaData.getKeyValueMetaData.get).getOrElse { - SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ) - } + val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode( + footerFileMetaData.getKeyValueMetaData.get, + SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ)) val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) val hadoopAttemptContext = @@ -318,7 +317,7 @@ class ParquetFileFormat if (enableVectorizedReader) { val vectorizedReader = new VectorizedParquetRecordReader( convertTz.orNull, - rebaseDateTime, + datetimeRebaseMode.toString, enableOffHeapColumnVector && taskContext.isDefined, capacity) val iter = new RecordReaderIterator(vectorizedReader) @@ -337,7 +336,7 @@ class ParquetFileFormat logDebug(s"Falling back to parquet-mr") // ParquetRecordReader returns InternalRow val readSupport = new ParquetReadSupport( - convertTz, enableVectorizedReader = false, rebaseDateTime) + convertTz, enableVectorizedReader = false, datetimeRebaseMode) val reader = if (pushed.isDefined && enableRecordFilter) { val parquetFilter = FilterCompat.get(pushed.get, null) new ParquetRecordReader[InternalRow](readSupport, parquetFilter) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala index 28165e0bbecde..a30d1c26b3b2d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala @@ -32,6 +32,7 @@ import org.apache.parquet.schema.Type.Repetition import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy import org.apache.spark.sql.types._ /** @@ -53,7 +54,7 @@ import org.apache.spark.sql.types._ class ParquetReadSupport( val convertTz: Option[ZoneId], 
enableVectorizedReader: Boolean, - rebaseDateTime: Boolean) + datetimeRebaseMode: LegacyBehaviorPolicy.Value) extends ReadSupport[InternalRow] with Logging { private var catalystRequestedSchema: StructType = _ @@ -61,7 +62,7 @@ class ParquetReadSupport( // We need a zero-arg constructor for SpecificParquetRecordReaderBase. But that is only // used in the vectorized reader, where we get the convertTz/rebaseDateTime value directly, // and the values here are ignored. - this(None, enableVectorizedReader = true, rebaseDateTime = false) + this(None, enableVectorizedReader = true, datetimeRebaseMode = LegacyBehaviorPolicy.CORRECTED) } /** @@ -130,7 +131,7 @@ class ParquetReadSupport( ParquetReadSupport.expandUDT(catalystRequestedSchema), new ParquetToSparkSchemaConverter(conf), convertTz, - rebaseDateTime) + datetimeRebaseMode) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala index ec037130aa7e9..bb528d548b6ef 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala @@ -23,6 +23,7 @@ import org.apache.parquet.io.api.{GroupConverter, RecordMaterializer} import org.apache.parquet.schema.MessageType import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy import org.apache.spark.sql.types.StructType /** @@ -32,19 +33,19 @@ import org.apache.spark.sql.types.StructType * @param catalystSchema Catalyst schema of the rows to be constructed * @param schemaConverter A Parquet-Catalyst schema converter that helps initializing row converters * @param convertTz the optional time zone to convert to int96 data - * @param rebaseDateTime true if need to rebase date/timestamp from Julian to Proleptic Gregorian - * calendar + * @param datetimeRebaseMode the mode of rebasing date/timestamp from Julian to Proleptic Gregorian + * calendar */ private[parquet] class ParquetRecordMaterializer( parquetSchema: MessageType, catalystSchema: StructType, schemaConverter: ParquetToSparkSchemaConverter, convertTz: Option[ZoneId], - rebaseDateTime: Boolean) + datetimeRebaseMode: LegacyBehaviorPolicy.Value) extends RecordMaterializer[InternalRow] { private val rootConverter = new ParquetRowConverter( - schemaConverter, parquetSchema, catalystSchema, convertTz, rebaseDateTime, NoopUpdater) + schemaConverter, parquetSchema, catalystSchema, convertTz, datetimeRebaseMode, NoopUpdater) override def getCurrentRecord: InternalRow = rootConverter.currentRecord diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala index 08fbca2995c04..9d37f17a24fb7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala @@ -35,8 +35,9 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, CaseInsensitiveMap, DateTimeUtils, GenericArrayData} import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLTimestamp 
-import org.apache.spark.sql.catalyst.util.RebaseDateTime._ +import org.apache.spark.sql.execution.datasources.DataSourceUtils import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -121,8 +122,8 @@ private[parquet] class ParquetPrimitiveConverter(val updater: ParentContainerUpd * @param catalystType Spark SQL schema that corresponds to the Parquet record type. User-defined * types should have been expanded. * @param convertTz the optional time zone to convert to int96 data - * @param rebaseDateTime true if need to rebase date/timestamp from Julian to Proleptic Gregorian - * calendar + * @param datetimeRebaseMode the mode of rebasing date/timestamp from Julian to Proleptic Gregorian + * calendar * @param updater An updater which propagates converted field values to the parent container */ private[parquet] class ParquetRowConverter( @@ -130,7 +131,7 @@ private[parquet] class ParquetRowConverter( parquetType: GroupType, catalystType: StructType, convertTz: Option[ZoneId], - rebaseDateTime: Boolean, + datetimeRebaseMode: LegacyBehaviorPolicy.Value, updater: ParentContainerUpdater) extends ParquetGroupConverter(updater) with Logging { @@ -181,6 +182,12 @@ private[parquet] class ParquetRowConverter( */ def currentRecord: InternalRow = currentRow + private val dateRebaseFunc = DataSourceUtils.creteDateRebaseFuncInRead( + datetimeRebaseMode, "Parquet") + + private val timestampRebaseFunc = DataSourceUtils.creteTimestampRebaseFuncInRead( + datetimeRebaseMode, "Parquet") + // Converters for each field. private[this] val fieldConverters: Array[Converter with HasParentContainerUpdater] = { // (SPARK-31116) Use case insensitive map if spark.sql.caseSensitive is false @@ -275,35 +282,17 @@ private[parquet] class ParquetRowConverter( new ParquetStringConverter(updater) case TimestampType if parquetType.getOriginalType == OriginalType.TIMESTAMP_MICROS => - if (rebaseDateTime) { - new ParquetPrimitiveConverter(updater) { - override def addLong(value: Long): Unit = { - val rebased = rebaseJulianToGregorianMicros(value) - updater.setLong(rebased) - } - } - } else { - new ParquetPrimitiveConverter(updater) { - override def addLong(value: Long): Unit = { - updater.setLong(value) - } + new ParquetPrimitiveConverter(updater) { + override def addLong(value: Long): Unit = { + updater.setLong(timestampRebaseFunc(value)) } } case TimestampType if parquetType.getOriginalType == OriginalType.TIMESTAMP_MILLIS => - if (rebaseDateTime) { - new ParquetPrimitiveConverter(updater) { - override def addLong(value: Long): Unit = { - val micros = DateTimeUtils.fromMillis(value) - val rebased = rebaseJulianToGregorianMicros(micros) - updater.setLong(rebased) - } - } - } else { - new ParquetPrimitiveConverter(updater) { - override def addLong(value: Long): Unit = { - updater.setLong(DateTimeUtils.fromMillis(value)) - } + new ParquetPrimitiveConverter(updater) { + override def addLong(value: Long): Unit = { + val micros = DateTimeUtils.fromMillis(value) + updater.setLong(timestampRebaseFunc(micros)) } } @@ -328,17 +317,9 @@ private[parquet] class ParquetRowConverter( } case DateType => - if (rebaseDateTime) { - new ParquetPrimitiveConverter(updater) { - override def addInt(value: Int): Unit = { - updater.set(rebaseJulianToGregorianDays(value)) - } - } - } else { - new ParquetPrimitiveConverter(updater) { - override def addInt(value: Int): Unit = { - updater.set(value) - } + new 
ParquetPrimitiveConverter(updater) { + override def addInt(value: Int): Unit = { + updater.set(dateRebaseFunc(value)) } } @@ -386,7 +367,12 @@ private[parquet] class ParquetRowConverter( } } new ParquetRowConverter( - schemaConverter, parquetType.asGroupType(), t, convertTz, rebaseDateTime, wrappedUpdater) + schemaConverter, + parquetType.asGroupType(), + t, + convertTz, + datetimeRebaseMode, + wrappedUpdater) case t => throw new RuntimeException( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala index e367b9cc774f3..4e535c0c5ea99 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala @@ -35,8 +35,9 @@ import org.apache.spark.sql.{SPARK_LEGACY_DATETIME, SPARK_VERSION_METADATA_KEY} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.SpecializedGetters import org.apache.spark.sql.catalyst.util.DateTimeUtils -import org.apache.spark.sql.catalyst.util.RebaseDateTime._ +import org.apache.spark.sql.execution.datasources.DataSourceUtils import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy import org.apache.spark.sql.types._ /** @@ -78,9 +79,14 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging { private val decimalBuffer = new Array[Byte](Decimal.minBytesForPrecision(DecimalType.MAX_PRECISION)) - // Whether to rebase datetimes from Gregorian to Julian calendar in write - private val rebaseDateTime: Boolean = - SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE) + private val datetimeRebaseMode = LegacyBehaviorPolicy.withName( + SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE)) + + private val dateRebaseFunc = DataSourceUtils.creteDateRebaseFuncInWrite( + datetimeRebaseMode, "Parquet") + + private val timestampRebaseFunc = DataSourceUtils.creteTimestampRebaseFuncInWrite( + datetimeRebaseMode, "Parquet") override def init(configuration: Configuration): WriteContext = { val schemaString = configuration.get(ParquetWriteSupport.SPARK_ROW_SCHEMA) @@ -103,7 +109,13 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging { val metadata = Map( SPARK_VERSION_METADATA_KEY -> SPARK_VERSION_SHORT, ParquetReadSupport.SPARK_METADATA_KEY -> schemaString - ) ++ (if (rebaseDateTime) Some(SPARK_LEGACY_DATETIME -> "") else None) + ) ++ { + if (datetimeRebaseMode == LegacyBehaviorPolicy.LEGACY) { + Some(SPARK_LEGACY_DATETIME -> "") + } else { + None + } + } logInfo( s"""Initialized Parquet WriteSupport with Catalyst schema: @@ -152,12 +164,11 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging { (row: SpecializedGetters, ordinal: Int) => recordConsumer.addInteger(row.getShort(ordinal)) - case DateType if rebaseDateTime => + case DateType => (row: SpecializedGetters, ordinal: Int) => - val rebasedDays = rebaseGregorianToJulianDays(row.getInt(ordinal)) - recordConsumer.addInteger(rebasedDays) + recordConsumer.addInteger(dateRebaseFunc(row.getInt(ordinal))) - case IntegerType | DateType => + case IntegerType => (row: SpecializedGetters, ordinal: Int) => recordConsumer.addInteger(row.getInt(ordinal)) @@ -187,24 +198,15 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with 
Logging { buf.order(ByteOrder.LITTLE_ENDIAN).putLong(timeOfDayNanos).putInt(julianDay) recordConsumer.addBinary(Binary.fromReusedByteArray(timestampBuffer)) - case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS if rebaseDateTime => - (row: SpecializedGetters, ordinal: Int) => - val rebasedMicros = rebaseGregorianToJulianMicros(row.getLong(ordinal)) - recordConsumer.addLong(rebasedMicros) - case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS => (row: SpecializedGetters, ordinal: Int) => - recordConsumer.addLong(row.getLong(ordinal)) - - case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MILLIS if rebaseDateTime => - (row: SpecializedGetters, ordinal: Int) => - val rebasedMicros = rebaseGregorianToJulianMicros(row.getLong(ordinal)) - val millis = DateTimeUtils.toMillis(rebasedMicros) - recordConsumer.addLong(millis) + val micros = row.getLong(ordinal) + recordConsumer.addLong(timestampRebaseFunc(micros)) case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MILLIS => (row: SpecializedGetters, ordinal: Int) => - val millis = DateTimeUtils.toMillis(row.getLong(ordinal)) + val micros = row.getLong(ordinal) + val millis = DateTimeUtils.toMillis(timestampRebaseFunc(micros)) recordConsumer.addLong(millis) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala index 1925fa1796d48..3b482b0c8ab62 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala @@ -37,6 +37,7 @@ import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedF import org.apache.spark.sql.execution.datasources.parquet._ import org.apache.spark.sql.execution.datasources.v2._ import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.{AtomicType, StructType} import org.apache.spark.sql.vectorized.ColumnarBatch @@ -116,8 +117,9 @@ case class ParquetPartitionReaderFactory( private def buildReaderBase[T]( file: PartitionedFile, buildReaderFunc: ( - ParquetInputSplit, InternalRow, TaskAttemptContextImpl, Option[FilterPredicate], - Option[ZoneId], Boolean) => RecordReader[Void, T]): RecordReader[Void, T] = { + ParquetInputSplit, InternalRow, TaskAttemptContextImpl, + Option[FilterPredicate], Option[ZoneId], + LegacyBehaviorPolicy.Value) => RecordReader[Void, T]): RecordReader[Void, T] = { val conf = broadcastedConf.value.value val filePath = new Path(new URI(file.filePath)) @@ -169,12 +171,11 @@ case class ParquetPartitionReaderFactory( if (pushed.isDefined) { ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get) } - val rebaseDatetime = DataSourceUtils.needRebaseDateTime( - footerFileMetaData.getKeyValueMetaData.get).getOrElse { - SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ) - } + val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode( + footerFileMetaData.getKeyValueMetaData.get, + SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ)) val reader = buildReaderFunc( - split, file.partitionValues, hadoopAttemptContext, pushed, convertTz, rebaseDatetime) + split, file.partitionValues, hadoopAttemptContext, pushed, convertTz, 
datetimeRebaseMode) reader.initialize(split, hadoopAttemptContext) reader } @@ -189,12 +190,12 @@ case class ParquetPartitionReaderFactory( hadoopAttemptContext: TaskAttemptContextImpl, pushed: Option[FilterPredicate], convertTz: Option[ZoneId], - needDateTimeRebase: Boolean): RecordReader[Void, InternalRow] = { + datetimeRebaseMode: LegacyBehaviorPolicy.Value): RecordReader[Void, InternalRow] = { logDebug(s"Falling back to parquet-mr") val taskContext = Option(TaskContext.get()) // ParquetRecordReader returns InternalRow val readSupport = new ParquetReadSupport( - convertTz, enableVectorizedReader = false, needDateTimeRebase) + convertTz, enableVectorizedReader = false, datetimeRebaseMode) val reader = if (pushed.isDefined && enableRecordFilter) { val parquetFilter = FilterCompat.get(pushed.get, null) new ParquetRecordReader[InternalRow](readSupport, parquetFilter) @@ -220,11 +221,11 @@ case class ParquetPartitionReaderFactory( hadoopAttemptContext: TaskAttemptContextImpl, pushed: Option[FilterPredicate], convertTz: Option[ZoneId], - rebaseDatetime: Boolean): VectorizedParquetRecordReader = { + datetimeRebaseMode: LegacyBehaviorPolicy.Value): VectorizedParquetRecordReader = { val taskContext = Option(TaskContext.get()) val vectorizedReader = new VectorizedParquetRecordReader( convertTz.orNull, - rebaseDatetime, + datetimeRebaseMode.toString, enableOffHeapColumnVector && taskContext.isDefined, capacity) val iter = new RecordReaderIterator(vectorizedReader) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeRebaseBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeRebaseBenchmark.scala index aa47d36fe8c87..d6167f98b5a51 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeRebaseBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeRebaseBenchmark.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.DataFrame import org.apache.spark.sql.catalyst.util.DateTimeConstants.SECONDS_PER_DAY import org.apache.spark.sql.catalyst.util.DateTimeTestUtils.{withDefaultTimeZone, LA} import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._ import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType object DateTime extends Enumeration { @@ -161,9 +162,10 @@ object DateTimeRebaseBenchmark extends SqlBasedBenchmark { Seq(true, false).foreach { modernDates => Seq(false, true).foreach { rebase => benchmark.addCase(caseName(modernDates, dateTime, Some(rebase)), 1) { _ => + val mode = if (rebase) LEGACY else CORRECTED withSQLConf( SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> getOutputType(dateTime), - SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE.key -> rebase.toString) { + SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> mode.toString) { genDF(rowsNum, dateTime, modernDates) .write .mode("overwrite") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index cf2c7c8b2f516..87b4db3fe087a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -41,7 +41,7 @@ import org.apache.parquet.hadoop.util.HadoopInputFile import org.apache.parquet.io.api.RecordConsumer import 
org.apache.parquet.schema.{MessageType, MessageTypeParser} -import org.apache.spark.{SPARK_VERSION_SHORT, SparkException} +import org.apache.spark.{SPARK_VERSION_SHORT, SparkException, SparkUpgradeException} import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection} import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow} @@ -49,6 +49,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._ import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -892,41 +893,67 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession val path3_0_rebase = paths(1).getCanonicalPath if (dt == "date") { val df = Seq(dataStr).toDF("str").select($"str".cast("date").as("date")) - df.write.parquet(path3_0) - withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE.key -> "true") { + + // By default we should fail to write ancient datetime values. + var e = intercept[SparkException](df.write.parquet(path3_0)) + assert(e.getCause.getCause.getCause.isInstanceOf[SparkUpgradeException]) + // By default we should fail to read ancient datetime values. + e = intercept[SparkException](spark.read.parquet(path2_4).collect()) + assert(e.getCause.isInstanceOf[SparkUpgradeException]) + + withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> CORRECTED.toString) { + df.write.mode("overwrite").parquet(path3_0) + } + withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> LEGACY.toString) { df.write.parquet(path3_0_rebase) } - checkAnswer( - spark.read.format("parquet").load(path2_4, path3_0, path3_0_rebase), - 1.to(3).map(_ => Row(java.sql.Date.valueOf(dataStr)))) + + // For Parquet files written by Spark 3.0, we know the writer info and don't need the + // config to guide the rebase behavior. + withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key -> LEGACY.toString) { + checkAnswer( + spark.read.format("parquet").load(path2_4, path3_0, path3_0_rebase), + 1.to(3).map(_ => Row(java.sql.Date.valueOf(dataStr)))) + } } else { val df = Seq(dataStr).toDF("str").select($"str".cast("timestamp").as("ts")) withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> dt) { - df.write.parquet(path3_0) - withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE.key -> "true") { + // By default we should fail to write ancient datetime values. + var e = intercept[SparkException](df.write.parquet(path3_0)) + assert(e.getCause.getCause.getCause.isInstanceOf[SparkUpgradeException]) + // By default we should fail to read ancient datetime values. + e = intercept[SparkException](spark.read.parquet(path2_4).collect()) + assert(e.getCause.isInstanceOf[SparkUpgradeException]) + + withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> CORRECTED.toString) { + df.write.mode("overwrite").parquet(path3_0) + } + withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> LEGACY.toString) { df.write.parquet(path3_0_rebase) } } - checkAnswer( - spark.read.format("parquet").load(path2_4, path3_0, path3_0_rebase), - 1.to(3).map(_ => Row(java.sql.Timestamp.valueOf(dataStr)))) + // For Parquet files written by Spark 3.0, we know the writer info and don't need the + // config to guide the rebase behavior. 
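+ // DataSourceUtils.datetimeRebaseMode resolves LEGACY for files written by Spark 2.4 and
+ // earlier or for files carrying the SPARK_LEGACY_DATETIME marker, CORRECTED for other
+ // Spark-written files, and falls back to the session config only when a file has no
+ // Spark version metadata at all.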
+ withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key -> LEGACY.toString) { + checkAnswer( + spark.read.format("parquet").load(path2_4, path3_0, path3_0_rebase), + 1.to(3).map(_ => Row(java.sql.Timestamp.valueOf(dataStr)))) + } } } } Seq(false, true).foreach { vectorized => withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString) { - withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ.key -> "true") { - checkReadMixedFiles("before_1582_date_v2_4.snappy.parquet", "date", "1001-01-01") - checkReadMixedFiles( - "before_1582_timestamp_micros_v2_4.snappy.parquet", - "TIMESTAMP_MICROS", - "1001-01-01 01:02:03.123456") - checkReadMixedFiles( - "before_1582_timestamp_millis_v2_4.snappy.parquet", - "TIMESTAMP_MILLIS", - "1001-01-01 01:02:03.123") - } + checkReadMixedFiles("before_1582_date_v2_4.snappy.parquet", "date", "1001-01-01") + checkReadMixedFiles( + "before_1582_timestamp_micros_v2_4.snappy.parquet", + "TIMESTAMP_MICROS", + "1001-01-01 01:02:03.123456") + checkReadMixedFiles( + "before_1582_timestamp_millis_v2_4.snappy.parquet", + "TIMESTAMP_MILLIS", + "1001-01-01 01:02:03.123") // INT96 is a legacy timestamp format and we always rebase the seconds for it. checkAnswer(readResourceParquetFile( @@ -948,7 +975,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> outType) { withTempPath { dir => val path = dir.getAbsolutePath - withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE.key -> "true") { + withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> LEGACY.toString) { Seq.tabulate(N)(_ => tsStr).toDF("tsS") .select($"tsS".cast("timestamp").as("ts")) .repartition(1) @@ -960,10 +987,10 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession Seq(false, true).foreach { vectorized => withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString) { // The file metadata indicates if it needs rebase or not, so we can always get the - // correct result regardless of the "rebaseInRead" config. - Seq(true, false).foreach { rebase => + // correct result regardless of the "rebase mode" config. + Seq(LEGACY, CORRECTED, EXCEPTION).foreach { mode => withSQLConf( - SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ.key -> rebase.toString) { + SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key -> mode.toString) { checkAnswer( spark.read.parquet(path), Seq.tabulate(N)(_ => Row(Timestamp.valueOf(tsStr)))) @@ -991,7 +1018,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession Seq(false, true).foreach { dictionaryEncoding => withTempPath { dir => val path = dir.getAbsolutePath - withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE.key -> "true") { + withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> LEGACY.toString) { Seq.tabulate(N)(_ => "1001-01-01").toDF("dateS") .select($"dateS".cast("date").as("date")) .repartition(1) @@ -1002,10 +1029,10 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession Seq(false, true).foreach { vectorized => withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString) { - // The file metadata indicates if it needs rebase or not, so we can always get - // the correct result regardless of the "rebaseInRead" config. 
- Seq(true, false).foreach { rebase => - withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ.key -> rebase.toString) { + // The file metadata indicates if it needs rebase or not, so we can always get the + // correct result regardless of the "rebase mode" config. + Seq(LEGACY, CORRECTED, EXCEPTION).foreach { mode => + withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key -> mode.toString) { checkAnswer( spark.read.parquet(path), Seq.tabulate(N)(_ => Row(Date.valueOf("1001-01-01"))))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala index 42b6862907d8d..cbea74103343e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.execution.DataSourceScanExec import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._ import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.sql.types._ @@ -151,7 +152,10 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes Seq(false) } java8ApiConfValues.foreach { java8Api => - withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString) { + withSQLConf( + SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString, + SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> CORRECTED.toString, + SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key -> CORRECTED.toString) { val dataGenerator = RandomDataGenerator.forType( dataType = dataType, nullable = true,
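Taken together, the behavior exercised by these tests can be sketched roughly as follows. This is an illustrative snippet rather than part of the patch; it assumes an active SparkSession named spark with its implicits imported and uses a hypothetical output path.

    import org.apache.spark.sql.internal.SQLConf
    import spark.implicits._

    val df = Seq("1001-01-01").toDF("str").select($"str".cast("date").as("date"))

    // With the new EXCEPTION default, this write fails, wrapping a SparkUpgradeException:
    // df.write.parquet("/tmp/ancient_dates")

    // Opt in explicitly, e.g. rebase to the hybrid calendar so that Spark 2.x and legacy
    // Hive read back the same local date:
    spark.conf.set(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key, "LEGACY")
    df.write.mode("overwrite").parquet("/tmp/ancient_dates")

    // Files written by Spark 3.0 carry writer metadata, so they read back correctly under any
    // mode; the read config only matters for files without Spark version metadata:
    spark.conf.set(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key, "CORRECTED")
    spark.read.parquet("/tmp/ancient_dates").show()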