[SPARK-31405][SQL][3.0] Fail by default when reading/writing legacy datetime values from/to Parquet/Avro files #28526

Closed · wants to merge 1 commit
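This backport changes the default handling of pre-Gregorian dates and timestamps in Parquet/Avro from silent rebasing to failing with SparkUpgradeException. The behavior is selected through SQLConf's LegacyBehaviorPolicy, which replaces the boolean rebaseDateTime flag throughout the diff below. For reference, a minimal sketch of the enumeration as it appears in org.apache.spark.sql.internal.SQLConf:

// EXCEPTION (the new default) fails on values that are ambiguous between the
// Julian and proleptic Gregorian calendars, LEGACY rebases them, and
// CORRECTED reads/writes them as-is.
object LegacyBehaviorPolicy extends Enumeration {
  val EXCEPTION, LEGACY, CORRECTED = Value
}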
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkException.scala
@@ -48,5 +48,5 @@ private[spark] case class ExecutorDeadException(message: String)
  * Exception thrown when Spark returns different result after upgrading to a new version.
  */
 private[spark] class SparkUpgradeException(version: String, message: String, cause: Throwable)
-  extends SparkException("You may get a different result due to the upgrading of Spark" +
+  extends RuntimeException("You may get a different result due to the upgrading of Spark" +
     s" $version: $message", cause)
external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -34,22 +34,33 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArrayData}
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, DateTimeUtils, GenericArrayData}
 import org.apache.spark.sql.catalyst.util.DateTimeConstants.MILLIS_PER_DAY
-import org.apache.spark.sql.catalyst.util.RebaseDateTime._
+import org.apache.spark.sql.execution.datasources.DataSourceUtils
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String

 /**
  * A deserializer to deserialize data in avro format to data in catalyst format.
  */
-class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType, rebaseDateTime: Boolean) {
+class AvroDeserializer(
+    rootAvroType: Schema,
+    rootCatalystType: DataType,
+    datetimeRebaseMode: LegacyBehaviorPolicy.Value) {

   def this(rootAvroType: Schema, rootCatalystType: DataType) {
     this(rootAvroType, rootCatalystType,
-      SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ))
+      LegacyBehaviorPolicy.withName(
+        SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ)))
   }

   private lazy val decimalConversions = new DecimalConversion()

+  private val dateRebaseFunc = DataSourceUtils.creteDateRebaseFuncInRead(
+    datetimeRebaseMode, "Avro")
+
+  private val timestampRebaseFunc = DataSourceUtils.creteTimestampRebaseFuncInRead(
+    datetimeRebaseMode, "Avro")
+
   private val converter: Any => Any = rootCatalystType match {
     // A shortcut for empty schema.
     case st: StructType if st.isEmpty =>
@@ -96,36 +107,22 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType, rebaseD
       case (INT, IntegerType) => (updater, ordinal, value) =>
         updater.setInt(ordinal, value.asInstanceOf[Int])

-      case (INT, DateType) if rebaseDateTime => (updater, ordinal, value) =>
-        val days = value.asInstanceOf[Int]
-        val rebasedDays = rebaseJulianToGregorianDays(days)
-        updater.setInt(ordinal, rebasedDays)
-
       case (INT, DateType) => (updater, ordinal, value) =>
-        updater.setInt(ordinal, value.asInstanceOf[Int])
+        updater.setInt(ordinal, dateRebaseFunc(value.asInstanceOf[Int]))

       case (LONG, LongType) => (updater, ordinal, value) =>
         updater.setLong(ordinal, value.asInstanceOf[Long])

       case (LONG, TimestampType) => avroType.getLogicalType match {
         // For backward compatibility, if the Avro type is Long and it is not logical type
         // (the `null` case), the value is processed as timestamp type with millisecond precision.
-        case null | _: TimestampMillis if rebaseDateTime => (updater, ordinal, value) =>
-          val millis = value.asInstanceOf[Long]
-          val micros = DateTimeUtils.fromMillis(millis)
-          val rebasedMicros = rebaseJulianToGregorianMicros(micros)
-          updater.setLong(ordinal, rebasedMicros)
         case null | _: TimestampMillis => (updater, ordinal, value) =>
           val millis = value.asInstanceOf[Long]
           val micros = DateTimeUtils.fromMillis(millis)
-          updater.setLong(ordinal, micros)
-        case _: TimestampMicros if rebaseDateTime => (updater, ordinal, value) =>
-          val micros = value.asInstanceOf[Long]
-          val rebasedMicros = rebaseJulianToGregorianMicros(micros)
-          updater.setLong(ordinal, rebasedMicros)
+          updater.setLong(ordinal, timestampRebaseFunc(micros))
         case _: TimestampMicros => (updater, ordinal, value) =>
           val micros = value.asInstanceOf[Long]
-          updater.setLong(ordinal, micros)
+          updater.setLong(ordinal, timestampRebaseFunc(micros))
         case other => throw new IncompatibleSchemaException(
           s"Cannot convert Avro logical type ${other} to Catalyst Timestamp type.")
       }
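dateRebaseFunc and timestampRebaseFunc above come from factory methods in DataSourceUtils (the crete* spelling matches the identifiers in this diff). A minimal sketch of the read-side date factory, assuming it maps each policy to a days-since-epoch transform and that RebaseDateTime.lastSwitchJulianDay marks the 1582-10-15 cutover:

def creteDateRebaseFuncInRead(
    rebaseMode: LegacyBehaviorPolicy.Value,
    format: String): Int => Int = rebaseMode match {
  case LegacyBehaviorPolicy.EXCEPTION => days: Int =>
    // Only dates before the Julian/Gregorian switch are ambiguous.
    if (days < RebaseDateTime.lastSwitchJulianDay) {
      throw DataSourceUtils.newRebaseExceptionInRead(format)
    }
    days
  case LegacyBehaviorPolicy.LEGACY => RebaseDateTime.rebaseJulianToGregorianDays
  case LegacyBehaviorPolicy.CORRECTED => identity[Int]
}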
external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
@@ -124,12 +124,12 @@ private[sql] class AvroFileFormat extends FileFormat
       reader.sync(file.start)
       val stop = file.start + file.length

-      val rebaseDateTime = DataSourceUtils.needRebaseDateTime(
-        reader.asInstanceOf[DataFileReader[_]].getMetaString).getOrElse {
-        SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ)
-      }
+      val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
+        reader.asInstanceOf[DataFileReader[_]].getMetaString,
+        SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ))

       val deserializer = new AvroDeserializer(
-        userProvidedSchema.getOrElse(reader.getSchema), requiredSchema, rebaseDateTime)
+        userProvidedSchema.getOrElse(reader.getSchema), requiredSchema, datetimeRebaseMode)

       new Iterator[InternalRow] {
         private[this] var completed = false
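datetimeRebaseMode resolves the policy per file: version metadata written by Spark 2.4 and earlier, or the SPARK_LEGACY_DATETIME marker, force LEGACY; unmarked 3.0+ files read as CORRECTED; only files with no Spark version metadata at all fall back to the session config. A sketch of the expected logic (version comparison simplified):

def datetimeRebaseMode(
    lookupFileMeta: String => String,
    modeByConfig: String): LegacyBehaviorPolicy.Value = {
  Option(lookupFileMeta(SPARK_VERSION_METADATA_KEY)).map { version =>
    // Files written by Spark 2.4- used the hybrid Julian calendar; files
    // written by Spark 3.0+ in LEGACY mode carry an explicit marker.
    if (version < "3.0.0" || lookupFileMeta(SPARK_LEGACY_DATETIME) != null) {
      LegacyBehaviorPolicy.LEGACY
    } else {
      LegacyBehaviorPolicy.CORRECTED
    }
  }.getOrElse(LegacyBehaviorPolicy.withName(modeByConfig))
}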
external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.{SPARK_LEGACY_DATETIME, SPARK_VERSION_METADATA_KEY}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.OutputWriter
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.types._

 // NOTE: This class is instantiated and used on executor side only, no need to be serializable.
@@ -43,20 +44,24 @@ private[avro] class AvroOutputWriter(
     avroSchema: Schema) extends OutputWriter {

   // Whether to rebase datetimes from Gregorian to Julian calendar in write
-  private val rebaseDateTime: Boolean =
-    SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE)
+  private val datetimeRebaseMode = LegacyBehaviorPolicy.withName(
+    SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE))

   // The input rows will never be null.
   private lazy val serializer =
-    new AvroSerializer(schema, avroSchema, nullable = false, rebaseDateTime)
+    new AvroSerializer(schema, avroSchema, nullable = false, datetimeRebaseMode)

   /**
    * Overrides the couple of methods responsible for generating the output streams / files so
    * that the data can be correctly partitioned
    */
   private val recordWriter: RecordWriter[AvroKey[GenericRecord], NullWritable] = {
     val fileMeta = Map(SPARK_VERSION_METADATA_KEY -> SPARK_VERSION_SHORT) ++ {
-      if (rebaseDateTime) Some(SPARK_LEGACY_DATETIME -> "") else None
+      if (datetimeRebaseMode == LegacyBehaviorPolicy.LEGACY) {
+        Some(SPARK_LEGACY_DATETIME -> "")
+      } else {
+        None
+      }
     }

     new SparkAvroKeyOutputFormat(fileMeta.asJava) {
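Only LEGACY mode tags the output with SPARK_LEGACY_DATETIME, which lets future readers auto-detect the calendar without any config. To produce files that Spark 2.4 reads back correctly, a user would opt in explicitly; the config key below is the branch-3.0 name and the path is illustrative:

// Hypothetical session: rebase on write so 2.4's hybrid-calendar readers
// see the expected local datetime values.
spark.conf.set("spark.sql.legacy.avro.datetimeRebaseModeInWrite", "LEGACY")
df.write.format("avro").save("/tmp/readable_by_spark_2_4")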
external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
@@ -35,8 +35,9 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{SpecializedGetters, SpecificInternalRow}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.catalyst.util.RebaseDateTime._
+import org.apache.spark.sql.execution.datasources.DataSourceUtils
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.types._

 /**
@@ -46,17 +47,24 @@ class AvroSerializer(
     rootCatalystType: DataType,
     rootAvroType: Schema,
     nullable: Boolean,
-    rebaseDateTime: Boolean) extends Logging {
+    datetimeRebaseMode: LegacyBehaviorPolicy.Value) extends Logging {

   def this(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean) {
     this(rootCatalystType, rootAvroType, nullable,
-      SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE))
+      LegacyBehaviorPolicy.withName(SQLConf.get.getConf(
+        SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE)))
   }

   def serialize(catalystData: Any): Any = {
     converter.apply(catalystData)
   }

+  private val dateRebaseFunc = DataSourceUtils.creteDateRebaseFuncInWrite(
+    datetimeRebaseMode, "Avro")
+
+  private val timestampRebaseFunc = DataSourceUtils.creteTimestampRebaseFuncInWrite(
+    datetimeRebaseMode, "Avro")
+
   private val converter: Any => Any = {
     val actualAvroType = resolveNullableType(rootAvroType, nullable)
     val baseConverter = rootCatalystType match {
@@ -146,24 +154,16 @@ class AvroSerializer(
       case (BinaryType, BYTES) =>
         (getter, ordinal) => ByteBuffer.wrap(getter.getBinary(ordinal))

-      case (DateType, INT) if rebaseDateTime =>
-        (getter, ordinal) => rebaseGregorianToJulianDays(getter.getInt(ordinal))
-
       case (DateType, INT) =>
-        (getter, ordinal) => getter.getInt(ordinal)
+        (getter, ordinal) => dateRebaseFunc(getter.getInt(ordinal))

       case (TimestampType, LONG) => avroType.getLogicalType match {
         // For backward compatibility, if the Avro type is Long and it is not logical type
         // (the `null` case), output the timestamp value as with millisecond precision.
-        case null | _: TimestampMillis if rebaseDateTime => (getter, ordinal) =>
-          val micros = getter.getLong(ordinal)
-          val rebasedMicros = rebaseGregorianToJulianMicros(micros)
-          DateTimeUtils.toMillis(rebasedMicros)
         case null | _: TimestampMillis => (getter, ordinal) =>
-          DateTimeUtils.toMillis(getter.getLong(ordinal))
-        case _: TimestampMicros if rebaseDateTime => (getter, ordinal) =>
-          rebaseGregorianToJulianMicros(getter.getLong(ordinal))
-        case _: TimestampMicros => (getter, ordinal) => getter.getLong(ordinal)
+          DateTimeUtils.toMillis(timestampRebaseFunc(getter.getLong(ordinal)))
+        case _: TimestampMicros => (getter, ordinal) =>
+          timestampRebaseFunc(getter.getLong(ordinal))
         case other => throw new IncompatibleSchemaException(
           s"Cannot convert Catalyst Timestamp type to Avro logical type ${other}")
       }
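The write-side factories mirror the read side, with the Gregorian cutover as the ambiguity threshold. A sketch of the timestamp variant under the same assumptions as the read-side sketch above (lastSwitchGregorianTs would be the first unambiguous proleptic-Gregorian instant):

def creteTimestampRebaseFuncInWrite(
    rebaseMode: LegacyBehaviorPolicy.Value,
    format: String): Long => Long = rebaseMode match {
  case LegacyBehaviorPolicy.EXCEPTION => micros: Long =>
    if (micros < RebaseDateTime.lastSwitchGregorianTs) {
      throw DataSourceUtils.newRebaseExceptionInWrite(format)
    }
    micros
  case LegacyBehaviorPolicy.LEGACY => RebaseDateTime.rebaseGregorianToJulianMicros
  case LegacyBehaviorPolicy.CORRECTED => identity[Long]
}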
external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala
@@ -88,12 +88,11 @@ case class AvroPartitionReaderFactory(
      reader.sync(partitionedFile.start)
      val stop = partitionedFile.start + partitionedFile.length

-      val rebaseDateTime = DataSourceUtils.needRebaseDateTime(
-        reader.asInstanceOf[DataFileReader[_]].getMetaString).getOrElse {
-        SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ)
-      }
+      val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
+        reader.asInstanceOf[DataFileReader[_]].getMetaString,
+        SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ))
      val deserializer = new AvroDeserializer(
-        userProvidedSchema.getOrElse(reader.getSchema), readDataSchema, rebaseDateTime)
+        userProvidedSchema.getOrElse(reader.getSchema), readDataSchema, datetimeRebaseMode)

      val fileReader = new PartitionReader[InternalRow] {
        private[this] var completed = false
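From the user's side, reading old Avro files that carry no usable Spark metadata now fails under the default EXCEPTION policy instead of returning silently shifted values. An illustrative recovery (config key as defined by this diff's SQLConf entries; the path is hypothetical):

// If a read fails with SparkUpgradeException, decide which calendar the
// files were written with, then pick LEGACY (rebase) or CORRECTED (as-is).
spark.conf.set("spark.sql.legacy.avro.datetimeRebaseModeInRead", "LEGACY")
val df = spark.read.format("avro").load("/data/old_events")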
external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala
@@ -288,7 +288,7 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite
      """.stripMargin
    val avroSchema = new Schema.Parser().parse(jsonFormatSchema)
    val dataType = SchemaConverters.toSqlType(avroSchema).dataType
-    val deserializer = new AvroDeserializer(avroSchema, dataType, rebaseDateTime = false)
+    val deserializer = new AvroDeserializer(avroSchema, dataType)

    def checkDeserialization(data: GenericData.Record, expected: Any): Unit = {
      assert(checkResult(
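With the boolean flag gone, the suite exercises the secondary constructor, which picks the policy up from the session conf. Hypothetically, a test that needs a specific policy could pin it first (withSQLConf is the usual Spark test helper; its availability in this suite is assumed):

// Hypothetical: force as-is reads regardless of the default policy.
withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> "CORRECTED") {
  val deserializer = new AvroDeserializer(avroSchema, dataType)
}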