[SPARK-31405][SQL][3.0] Fail by default when reading/writing legacy datetime values from/to Parquet/Avro files

### What changes were proposed in this pull request?

When reading/writing datetime values that fall before the rebase switch day from/to Avro/Parquet files, fail by default and ask users to set a config to explicitly choose whether to rebase or not.

### Why are the changes needed?

Rebasing and not rebasing produce different results, and users should make that choice explicitly. In most cases users won't hit this exception, since it only affects ancient datetime values.
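
For context, the rebase policy used throughout this diff is the existing SQLConf.LegacyBehaviorPolicy enumeration (imported in the files below); roughly:

// SQLConf.LegacyBehaviorPolicy (existing enum, reproduced here for reference):
//   EXCEPTION - fail on datetime values from before the calendar switch (the new default)
//   LEGACY    - rebase between the hybrid Julian/Gregorian and Proleptic Gregorian calendars
//   CORRECTED - use the stored days/micros as-is, without rebasing
object LegacyBehaviorPolicy extends Enumeration {
  val EXCEPTION, CORRECTED, LEGACY = Value
}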

### Does this PR introduce _any_ user-facing change?

Yes. Users will now see an error when reading/writing dates before 1582-10-15 or timestamps before 1900-01-01 from/to Parquet/Avro files, with an error message asking them to set a config.
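
As a sketch of how a user responds to the new error (assuming the 3.0 config keys spark.sql.legacy.avro.datetimeRebaseModeInRead/InWrite backing the SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ/IN_WRITE entries in this diff; the Parquet configs follow the same pattern):

// Files written by Spark 2.4 and earlier: rebase from the hybrid calendar.
spark.conf.set("spark.sql.legacy.avro.datetimeRebaseModeInRead", "LEGACY")
// Files known to already hold Proleptic Gregorian values: read them as-is.
spark.conf.set("spark.sql.legacy.avro.datetimeRebaseModeInRead", "CORRECTED")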

### How was this patch tested?

Updated existing tests.

Closes #28526 from cloud-fan/backport.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cloud-fan committed May 17, 2020
1 parent 3b2d394 commit afe2247
Showing 27 changed files with 519 additions and 319 deletions.
core/src/main/scala/org/apache/spark/SparkException.scala (2 changes: 1 addition & 1 deletion)

@@ -48,5 +48,5 @@ private[spark] case class ExecutorDeadException(message: String)
  * Exception thrown when Spark returns different result after upgrading to a new version.
  */
 private[spark] class SparkUpgradeException(version: String, message: String, cause: Throwable)
-  extends SparkException("You may get a different result due to the upgrading of Spark" +
+  extends RuntimeException("You may get a different result due to the upgrading of Spark" +
     s" $version: $message", cause)

external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala

@@ -34,22 +34,33 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArrayData}
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, DateTimeUtils, GenericArrayData}
 import org.apache.spark.sql.catalyst.util.DateTimeConstants.MILLIS_PER_DAY
-import org.apache.spark.sql.catalyst.util.RebaseDateTime._
+import org.apache.spark.sql.execution.datasources.DataSourceUtils
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String

 /**
  * A deserializer to deserialize data in avro format to data in catalyst format.
  */
-class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType, rebaseDateTime: Boolean) {
+class AvroDeserializer(
+    rootAvroType: Schema,
+    rootCatalystType: DataType,
+    datetimeRebaseMode: LegacyBehaviorPolicy.Value) {

   def this(rootAvroType: Schema, rootCatalystType: DataType) {
     this(rootAvroType, rootCatalystType,
-      SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ))
+      LegacyBehaviorPolicy.withName(
+        SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ)))
   }

   private lazy val decimalConversions = new DecimalConversion()

+  private val dateRebaseFunc = DataSourceUtils.creteDateRebaseFuncInRead(
+    datetimeRebaseMode, "Avro")
+
+  private val timestampRebaseFunc = DataSourceUtils.creteTimestampRebaseFuncInRead(
+    datetimeRebaseMode, "Avro")
+
   private val converter: Any => Any = rootCatalystType match {
     // A shortcut for empty schema.
     case st: StructType if st.isEmpty =>

@@ -96,36 +107,22 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType, rebaseDateTime: Boolean) {
     case (INT, IntegerType) => (updater, ordinal, value) =>
       updater.setInt(ordinal, value.asInstanceOf[Int])

-    case (INT, DateType) if rebaseDateTime => (updater, ordinal, value) =>
-      val days = value.asInstanceOf[Int]
-      val rebasedDays = rebaseJulianToGregorianDays(days)
-      updater.setInt(ordinal, rebasedDays)
-
     case (INT, DateType) => (updater, ordinal, value) =>
-      updater.setInt(ordinal, value.asInstanceOf[Int])
+      updater.setInt(ordinal, dateRebaseFunc(value.asInstanceOf[Int]))

     case (LONG, LongType) => (updater, ordinal, value) =>
       updater.setLong(ordinal, value.asInstanceOf[Long])

     case (LONG, TimestampType) => avroType.getLogicalType match {
       // For backward compatibility, if the Avro type is Long and it is not logical type
       // (the `null` case), the value is processed as timestamp type with millisecond precision.
-      case null | _: TimestampMillis if rebaseDateTime => (updater, ordinal, value) =>
-        val millis = value.asInstanceOf[Long]
-        val micros = DateTimeUtils.fromMillis(millis)
-        val rebasedMicros = rebaseJulianToGregorianMicros(micros)
-        updater.setLong(ordinal, rebasedMicros)
       case null | _: TimestampMillis => (updater, ordinal, value) =>
         val millis = value.asInstanceOf[Long]
         val micros = DateTimeUtils.fromMillis(millis)
-        updater.setLong(ordinal, micros)
-      case _: TimestampMicros if rebaseDateTime => (updater, ordinal, value) =>
-        val micros = value.asInstanceOf[Long]
-        val rebasedMicros = rebaseJulianToGregorianMicros(micros)
-        updater.setLong(ordinal, rebasedMicros)
+        updater.setLong(ordinal, timestampRebaseFunc(micros))
       case _: TimestampMicros => (updater, ordinal, value) =>
         val micros = value.asInstanceOf[Long]
-        updater.setLong(ordinal, micros)
+        updater.setLong(ordinal, timestampRebaseFunc(micros))
       case other => throw new IncompatibleSchemaException(
         s"Cannot convert Avro logical type ${other} to Catalyst Timestamp type.")
     }
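
The new dateRebaseFunc/timestampRebaseFunc helpers come from DataSourceUtils, which is not among the files shown on this page (the crete* spelling is as in the commit). A minimal sketch of the read-side date helper, assuming it follows the EXCEPTION/LEGACY/CORRECTED semantics described above and the existing RebaseDateTime utilities:

// Hypothetical reconstruction, not the verbatim implementation.
def creteDateRebaseFuncInRead(
    rebaseMode: LegacyBehaviorPolicy.Value,
    format: String): Int => Int = rebaseMode match {
  case LegacyBehaviorPolicy.EXCEPTION => days: Int =>
    // Fail for days before the Julian -> Gregorian switch (1582-10-15),
    // forcing the user to choose a mode explicitly.
    if (days < RebaseDateTime.lastSwitchJulianDay) {
      throw new SparkUpgradeException("3.0",
        s"reading ancient dates from $format files can be ambiguous; " +
          "set the datetime rebase mode config to LEGACY or CORRECTED", null)
    }
    days
  case LegacyBehaviorPolicy.LEGACY => RebaseDateTime.rebaseJulianToGregorianDays
  case LegacyBehaviorPolicy.CORRECTED => identity[Int]
}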

external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala

@@ -124,12 +124,12 @@ private[sql] class AvroFileFormat extends FileFormat
       reader.sync(file.start)
       val stop = file.start + file.length

-      val rebaseDateTime = DataSourceUtils.needRebaseDateTime(
-        reader.asInstanceOf[DataFileReader[_]].getMetaString).getOrElse {
-        SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ)
-      }
+      val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
+        reader.asInstanceOf[DataFileReader[_]].getMetaString,
+        SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ))

       val deserializer = new AvroDeserializer(
-        userProvidedSchema.getOrElse(reader.getSchema), requiredSchema, rebaseDateTime)
+        userProvidedSchema.getOrElse(reader.getSchema), requiredSchema, datetimeRebaseMode)

       new Iterator[InternalRow] {
         private[this] var completed = false
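
DataSourceUtils.datetimeRebaseMode (also not shown on this page) replaces the old needRebaseDateTime: instead of a Boolean it derives a policy from the writer metadata, falling back to the session config only for files of unknown provenance. A sketch of that logic, assuming the SPARK_VERSION_METADATA_KEY and SPARK_LEGACY_DATETIME keys that AvroOutputWriter stamps below:

// Hypothetical reconstruction, not the verbatim implementation.
def datetimeRebaseMode(
    lookupFileMeta: String => String,
    modeByConfig: String): LegacyBehaviorPolicy.Value = {
  Option(lookupFileMeta(SPARK_VERSION_METADATA_KEY)).map { version =>
    // Spark 2.4 and earlier always wrote hybrid-calendar values; 3.0+ files
    // carry the SPARK_LEGACY_DATETIME marker only when written in LEGACY mode.
    if (version < "3.0.0" || lookupFileMeta(SPARK_LEGACY_DATETIME) != null) {
      LegacyBehaviorPolicy.LEGACY
    } else {
      LegacyBehaviorPolicy.CORRECTED
    }
  }.getOrElse(LegacyBehaviorPolicy.withName(modeByConfig))
}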

external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala

@@ -33,6 +33,7 @@ import org.apache.spark.sql.{SPARK_LEGACY_DATETIME, SPARK_VERSION_METADATA_KEY}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.OutputWriter
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.types._

 // NOTE: This class is instantiated and used on executor side only, no need to be serializable.

@@ -43,20 +44,24 @@ private[avro] class AvroOutputWriter(
     avroSchema: Schema) extends OutputWriter {

   // Whether to rebase datetimes from Gregorian to Julian calendar in write
-  private val rebaseDateTime: Boolean =
-    SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE)
+  private val datetimeRebaseMode = LegacyBehaviorPolicy.withName(
+    SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE))

   // The input rows will never be null.
   private lazy val serializer =
-    new AvroSerializer(schema, avroSchema, nullable = false, rebaseDateTime)
+    new AvroSerializer(schema, avroSchema, nullable = false, datetimeRebaseMode)

   /**
    * Overrides the couple of methods responsible for generating the output streams / files so
    * that the data can be correctly partitioned
    */
   private val recordWriter: RecordWriter[AvroKey[GenericRecord], NullWritable] = {
     val fileMeta = Map(SPARK_VERSION_METADATA_KEY -> SPARK_VERSION_SHORT) ++ {
-      if (rebaseDateTime) Some(SPARK_LEGACY_DATETIME -> "") else None
+      if (datetimeRebaseMode == LegacyBehaviorPolicy.LEGACY) {
+        Some(SPARK_LEGACY_DATETIME -> "")
+      } else {
+        None
+      }
     }

     new SparkAvroKeyOutputFormat(fileMeta.asJava) {
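
A usage sketch tying the two sides together (hypothetical df and path; assumes the write-side key spark.sql.legacy.avro.datetimeRebaseModeInWrite): writing in LEGACY mode stamps SPARK_LEGACY_DATETIME into the file metadata, so a later read picks the right calendar without any reader-side config:

// Rebase on write; the output file is tagged with the legacy-datetime marker.
spark.conf.set("spark.sql.legacy.avro.datetimeRebaseModeInWrite", "LEGACY")
df.write.format("avro").save("/tmp/ancient_dates")

// datetimeRebaseMode sees the marker and returns LEGACY, so this read works
// even with the default EXCEPTION setting for the *InRead config.
val readBack = spark.read.format("avro").load("/tmp/ancient_dates")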

external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala

@@ -35,8 +35,9 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{SpecializedGetters, SpecificInternalRow}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.catalyst.util.RebaseDateTime._
+import org.apache.spark.sql.execution.datasources.DataSourceUtils
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.types._

 /**

@@ -46,17 +47,24 @@ class AvroSerializer(
     rootCatalystType: DataType,
     rootAvroType: Schema,
     nullable: Boolean,
-    rebaseDateTime: Boolean) extends Logging {
+    datetimeRebaseMode: LegacyBehaviorPolicy.Value) extends Logging {

   def this(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean) {
     this(rootCatalystType, rootAvroType, nullable,
-      SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE))
+      LegacyBehaviorPolicy.withName(SQLConf.get.getConf(
+        SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE)))
   }

   def serialize(catalystData: Any): Any = {
     converter.apply(catalystData)
   }

+  private val dateRebaseFunc = DataSourceUtils.creteDateRebaseFuncInWrite(
+    datetimeRebaseMode, "Avro")
+
+  private val timestampRebaseFunc = DataSourceUtils.creteTimestampRebaseFuncInWrite(
+    datetimeRebaseMode, "Avro")
+
   private val converter: Any => Any = {
     val actualAvroType = resolveNullableType(rootAvroType, nullable)
     val baseConverter = rootCatalystType match {

@@ -146,24 +154,16 @@ class AvroSerializer(
     case (BinaryType, BYTES) =>
       (getter, ordinal) => ByteBuffer.wrap(getter.getBinary(ordinal))

-    case (DateType, INT) if rebaseDateTime =>
-      (getter, ordinal) => rebaseGregorianToJulianDays(getter.getInt(ordinal))
-
     case (DateType, INT) =>
-      (getter, ordinal) => getter.getInt(ordinal)
+      (getter, ordinal) => dateRebaseFunc(getter.getInt(ordinal))

     case (TimestampType, LONG) => avroType.getLogicalType match {
       // For backward compatibility, if the Avro type is Long and it is not logical type
       // (the `null` case), output the timestamp value as with millisecond precision.
-      case null | _: TimestampMillis if rebaseDateTime => (getter, ordinal) =>
-        val micros = getter.getLong(ordinal)
-        val rebasedMicros = rebaseGregorianToJulianMicros(micros)
-        DateTimeUtils.toMillis(rebasedMicros)
       case null | _: TimestampMillis => (getter, ordinal) =>
-        DateTimeUtils.toMillis(getter.getLong(ordinal))
-      case _: TimestampMicros if rebaseDateTime => (getter, ordinal) =>
-        rebaseGregorianToJulianMicros(getter.getLong(ordinal))
-      case _: TimestampMicros => (getter, ordinal) => getter.getLong(ordinal)
+        DateTimeUtils.toMillis(timestampRebaseFunc(getter.getLong(ordinal)))
+      case _: TimestampMicros => (getter, ordinal) =>
+        timestampRebaseFunc(getter.getLong(ordinal))
       case other => throw new IncompatibleSchemaException(
         s"Cannot convert Catalyst Timestamp type to Avro logical type ${other}")
     }
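
The write-side helpers mirror the read side but rebase from Proleptic Gregorian back to the hybrid calendar; note that per the description above the timestamp cutoff is 1900-01-01 rather than 1582-10-15. A sketch of the timestamp variant, assuming a RebaseDateTime.lastSwitchGregorianTs constant for that instant:

// Hypothetical reconstruction, not the verbatim implementation.
def creteTimestampRebaseFuncInWrite(
    rebaseMode: LegacyBehaviorPolicy.Value,
    format: String): Long => Long = rebaseMode match {
  case LegacyBehaviorPolicy.EXCEPTION => micros: Long =>
    // Fail instead of silently picking a calendar for ancient timestamps.
    if (micros < RebaseDateTime.lastSwitchGregorianTs) {
      throw new SparkUpgradeException("3.0",
        s"writing ancient timestamps to $format files can be ambiguous; " +
          "set the datetime rebase mode config to LEGACY or CORRECTED", null)
    }
    micros
  case LegacyBehaviorPolicy.LEGACY => RebaseDateTime.rebaseGregorianToJulianMicros
  case LegacyBehaviorPolicy.CORRECTED => identity[Long]
}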

external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala

@@ -88,12 +88,11 @@ case class AvroPartitionReaderFactory(
       reader.sync(partitionedFile.start)
       val stop = partitionedFile.start + partitionedFile.length

-      val rebaseDateTime = DataSourceUtils.needRebaseDateTime(
-        reader.asInstanceOf[DataFileReader[_]].getMetaString).getOrElse {
-        SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ)
-      }
+      val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
+        reader.asInstanceOf[DataFileReader[_]].getMetaString,
+        SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ))
       val deserializer = new AvroDeserializer(
-        userProvidedSchema.getOrElse(reader.getSchema), readDataSchema, rebaseDateTime)
+        userProvidedSchema.getOrElse(reader.getSchema), readDataSchema, datetimeRebaseMode)

       val fileReader = new PartitionReader[InternalRow] {
         private[this] var completed = false

external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala

@@ -288,7 +288,7 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite
""".stripMargin
val avroSchema = new Schema.Parser().parse(jsonFormatSchema)
val dataType = SchemaConverters.toSqlType(avroSchema).dataType
val deserializer = new AvroDeserializer(avroSchema, dataType, rebaseDateTime = false)
val deserializer = new AvroDeserializer(avroSchema, dataType)

def checkDeserialization(data: GenericData.Record, expected: Any): Unit = {
assert(checkResult(
Expand Down