[Protocol, Spark] UTC normalize timestamp partition values (#3378)
## Description

Currently, the Delta protocol does not store timestamp partition values
with their time zone. This leads to unexpected behavior when querying
across systems configured with different time zones (e.g. different
Spark sessions). In Spark, for instance, the timestamp value is adjusted
to the session time zone and written to the Delta log partition values
without any time zone information. If someone then queries the same
timestamp from a session with a different time zone, partition pruning
can fail to surface matching results.
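
A minimal sketch of the failure mode, assuming a Delta-enabled Spark session
(the table and column names are illustrative):

```scala
// Writer session: America/Los_Angeles. The instant 2024-01-01 00:00:00 PST is serialized
// into the partition value in the writer's session zone, with no time zone marker.
spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
spark.sql("CREATE TABLE events (id INT, ts TIMESTAMP) USING delta PARTITIONED BY (ts)")
spark.sql("INSERT INTO events VALUES (1, TIMESTAMP'2024-01-01 00:00:00')")

// Reader session: UTC. The stored string "2024-01-01 00:00:00" is now re-interpreted as
// 2024-01-01 00:00:00 UTC, so filtering on the row's actual instant (08:00:00 UTC) can be
// pruned away and return no rows even though the row exists.
spark.conf.set("spark.sql.session.timeZone", "UTC")
spark.sql("SELECT * FROM events WHERE ts = TIMESTAMP'2024-01-01 08:00:00'").show()
```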

This change proposes that the Delta Lake protocol allow timestamp
partition values to be adjusted to UTC and stored explicitly in
partition values with a UTC suffix. The original encoding remains
supported for compatibility, but newer writers are encouraged to write
the UTC-suffixed form.
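
For illustration (values hypothetical), the same instant under the two
encodings, as it would appear in the Delta log's partition values:

```scala
// Both strings describe the same instant; the writer's session zone is assumed to be
// America/Los_Angeles (PDT, UTC-7).
val legacyValue = "2021-06-01 07:00:00"            // adjusted to the writer's session zone, no TZ
val utcValue    = "2021-06-01T14:00:00.000000Z"    // adjusted to UTC, ISO 8601 with a Z suffix
```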

This is also important for Iceberg Uniform conversion, because Iceberg
timestamps must be UTC-adjusted. Now that Delta has a well-defined UTC
format, string partition values can be converted to Iceberg longs so
that Uniform conversion succeeds.
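
A sketch of that conversion, assuming the UTC-suffixed string form; the helper
below is illustrative, not part of this PR:

```scala
import java.time.{Instant, OffsetDateTime}
import java.time.temporal.ChronoUnit

// Iceberg stores timestamp partition values as UTC-adjusted microseconds since the epoch.
// With the UTC-suffixed Delta encoding, the long can be derived directly from the string.
def toIcebergMicros(deltaPartitionValue: String): Long = {
  val instant = OffsetDateTime.parse(deltaPartitionValue).toInstant // e.g. "1970-01-01T00:00:00.123456Z"
  ChronoUnit.MICROS.between(Instant.EPOCH, instant)
}
```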

This change updates the Spark-Delta integration to write out the UTC
adjusted values for timestamp types.

This also addresses an issue with microsecond-precision partitions:
previously, partitioning at microsecond granularity (not recommended,
but technically allowed) did not work because values were truncated to
seconds.
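
The fix widens the partition timestamp pattern (see the
`timestampPartitionPattern` change in the diff below). A standalone sketch of
the difference, using `java.time` directly rather than Delta's own formatter:

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

val oldPattern = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss[.S]")
val newPattern = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss[.SSSSSS][.S]")

// The new pattern accepts a full six-digit fraction, so microsecond partition values survive.
LocalDateTime.parse("2024-01-01 00:00:00.123456", newPattern)
// The old single-digit fraction pattern cannot parse the same value:
// LocalDateTime.parse("2024-01-01 00:00:00.123456", oldPattern) // DateTimeParseException
```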

## How was this patch tested?

Added unit tests for the following cases:

1.) UTC timestamp partition values round trip across different session
time zones (a minimal sketch of this case is shown after this list)
2.) A Delta log with a mix of non-UTC and UTC partition values round
trips within the same session time zone
3.) Timestamp without time zone round trips across time zones (somewhat
of a tautology, but important to make sure that timestamp_ntz values do
not get written as UTC timestamps unintentionally)
4.) Timestamp round trips within the same session time zone: UTC
normalized
5.) Timestamp round trips within the same session time zone: session
time normalized (this case worked before this change, so it's important
that it keeps working after this change)

The tests mix microsecond- and second-level precision and use dates
before the epoch (to verify that negative epoch values also work).
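
A minimal sketch of case 1, assuming a Spark SQL test harness with
`withSQLConf`/`sql` helpers and a Delta table `events(id INT, ts TIMESTAMP)`
partitioned by `ts` (not the actual test suite):

```scala
// Write under one session time zone...
withSQLConf("spark.sql.session.timeZone" -> "Asia/Tokyo") {
  sql("INSERT INTO events VALUES (1, TIMESTAMP'2021-06-01 00:00:00')")
}
// ...and read the same instant back under another. With UTC-normalized partition values,
// pruning resolves to the correct file and the row is returned.
withSQLConf("spark.sql.session.timeZone" -> "UTC") {
  assert(sql("SELECT * FROM events WHERE ts = TIMESTAMP'2021-05-31 15:00:00'").count() == 1)
}
```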

## Does this PR introduce _any_ user-facing changes?
Yes, in the sense that newly written timestamp partition values will be
UTC-normalized.
amogh-jahagirdar authored Aug 20, 2024
1 parent 8f1b297 commit e213023
Showing 7 changed files with 316 additions and 27 deletions.
9 changes: 6 additions & 3 deletions PROTOCOL.md
@@ -1791,13 +1791,16 @@ Type | Serialization Format
string | No translation required
numeric types | The string representation of the number
date | Encoded as `{year}-{month}-{day}`. For example, `1970-01-01`
timestamp | Encoded as `{year}-{month}-{day} {hour}:{minute}:{second}` or `{year}-{month}-{day} {hour}:{minute}:{second}.{microsecond}` For example: `1970-01-01 00:00:00`, or `1970-01-01 00:00:00.123456`
timestamp | Encoded as `{year}-{month}-{day} {hour}:{minute}:{second}` or `{year}-{month}-{day} {hour}:{minute}:{second}.{microsecond}`. For example: `1970-01-01 00:00:00`, or `1970-01-01 00:00:00.123456`. Timestamps may also be encoded as an ISO8601 formatted timestamp adjusted to UTC timestamp such as `1970-01-01T00:00:00.123456Z`
timestamp without timezone | Encoded as `{year}-{month}-{day} {hour}:{minute}:{second}` or `{year}-{month}-{day} {hour}:{minute}:{second}.{microsecond}` For example: `1970-01-01 00:00:00`, or `1970-01-01 00:00:00.123456` To use this type, a table must support a feature `timestampNtz`. See section [Timestamp without timezone (TimestampNtz)](#timestamp-without-timezone-timestampNtz) for more information.
boolean | Encoded as the string "true" or "false"
binary | Encoded as a string of escaped binary values. For example, `"\u0001\u0002\u0003"`

Note: A `timestamp` value in a partition value doesn't store the time zone due to historical reasons.
It means its behavior looks similar to `timestamp without time zone` when it is used in a partition column.
Note: A timestamp value in a partition value may be stored in one of the following ways:
1. Without a timezone, where the timestamp should be interpreted using the time zone of the system which wrote to the table.
2. Adjusted to UTC and stored in ISO8601 format.

It is highly recommended that modern writers adjust the timestamp to UTC and store the timestamp in ISO8601 format as outlined in 2.

## Schema Serialization Format

@@ -33,7 +33,9 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
import org.apache.spark.sql.catalyst.expressions.Cast
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.delta.files.DeltaFileFormatWriter.PartitionedTaskAttemptContextImpl
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.types.{DataType, StringType, TimestampType}

/**
* Writes out the files to `path` and returns a list of them in `addedStatuses`. Includes
@@ -67,7 +69,7 @@ class DelayedCommitProtocol(
// since there's no guarantee the stats will exist.
@transient val addedStatuses = new ArrayBuffer[AddFile]

val timestampPartitionPattern = "yyyy-MM-dd HH:mm:ss[.S]"
val timestampPartitionPattern = "yyyy-MM-dd HH:mm:ss[.SSSSSS][.S]"

// Constants for CDC partition manipulation. Used only in newTaskTempFile(), but we define them
// here to avoid building a new redundant regex for every file.
@@ -123,23 +125,48 @@
}
}

protected def parsePartitions(dir: String): Map[String, String] = {
// TODO: timezones?
protected def parsePartitions(
dir: String,
taskContext: TaskAttemptContext): Map[String, String] = {
// TODO: enable validatePartitionColumns?
val useUtcNormalizedTimestamps = taskContext match {
case _: PartitionedTaskAttemptContextImpl => taskContext.getConfiguration.getBoolean(
DeltaSQLConf.UTC_TIMESTAMP_PARTITION_VALUES.key, true)
case _ => false
}

val partitionColumnToDataType: Map[String, DataType] =
taskContext.asInstanceOf[PartitionedTaskAttemptContextImpl]
.partitionColToDataType
.filter(partitionCol => partitionCol._2 == TimestampType)

val dateFormatter = DateFormatter()
val timestampFormatter =
TimestampFormatter(timestampPartitionPattern, java.util.TimeZone.getDefault)

/**
* ToDo: Remove the use of this PartitionUtils API with type inference logic
* since the types are already known from the Delta metadata!
*
* Currently types are passed to the PartitionUtils.parsePartition API to facilitate
* timestamp conversion to UTC. In all other cases, the type is just inferred as a String.
* Note: the passed in timestampFormatter and timezone detail
* is used for parsing from the string timestamp.
* If utc normalization is enabled the parsed partition value will be adjusted to UTC
* and output in iso8601 format.
*/
val parsedPartition =
PartitionUtils
.parsePartition(
new Path(dir),
typeInference = false,
Set.empty,
Map.empty,
userSpecifiedDataTypes = partitionColumnToDataType,
validatePartitionColumns = false,
java.util.TimeZone.getDefault,
dateFormatter,
timestampFormatter)
timestampFormatter,
useUtcNormalizedTimestamps)
._1
.get
parsedPartition
@@ -164,7 +191,8 @@
*/
override def newTaskTempFile(
taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
val partitionValues = dir.map(parsePartitions).getOrElse(Map.empty[String, String])
val partitionValues = dir.map(dir => parsePartitions(dir, taskContext))
.getOrElse(Map.empty[String, String])
val filename = getFileName(taskContext, ext, partitionValues)
val relativePath = randomPrefixLength.map { prefixLength =>
DeltaUtils.getRandomPrefix(prefixLength) // Generate a random prefix as a first choice
@@ -44,6 +44,7 @@ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.FileFormatWriter._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.DataType
import org.apache.spark.util.{SerializableConfiguration, Utils}

/**
@@ -261,6 +262,8 @@ object DeltaFileFormatWriter extends LoggingShims {

val jobTrackerID = SparkHadoopWriterUtils.createJobTrackerID(new Date())
val ret = new Array[WriteTaskResult](rddWithNonEmptyPartitions.partitions.length)
val partitionColumnToDataType = description.partitionColumns
.map(attr => (attr.name, attr.dataType)).toMap
sparkSession.sparkContext.runJob(
rddWithNonEmptyPartitions,
(taskContext: TaskContext, iter: Iterator[InternalRow]) => {
Expand All @@ -272,7 +275,8 @@ object DeltaFileFormatWriter extends LoggingShims {
sparkAttemptNumber = taskContext.taskAttemptId().toInt & Integer.MAX_VALUE,
committer,
iterator = iter,
concurrentOutputWriterSpec = concurrentOutputWriterSpec
concurrentOutputWriterSpec = concurrentOutputWriterSpec,
partitionColumnToDataType
)
},
rddWithNonEmptyPartitions.partitions.indices,
@@ -377,6 +381,14 @@
}
}

class PartitionedTaskAttemptContextImpl(
conf: Configuration,
taskId: TaskAttemptID,
partitionColumnToDataType: Map[String, DataType])
extends TaskAttemptContextImpl(conf, taskId) {
val partitionColToDataType: Map[String, DataType] = partitionColumnToDataType
}

/** Writes data out in a single Spark task. */
private def executeTask(
description: WriteJobDescription,
@@ -386,7 +398,8 @@ object DeltaFileFormatWriter extends LoggingShims {
sparkAttemptNumber: Int,
committer: FileCommitProtocol,
iterator: Iterator[InternalRow],
concurrentOutputWriterSpec: Option[ConcurrentOutputWriterSpec]): WriteTaskResult = {
concurrentOutputWriterSpec: Option[ConcurrentOutputWriterSpec],
partitionColumnToDataType: Map[String, DataType]): WriteTaskResult = {

val jobId = SparkHadoopWriterUtils.createJobID(jobTrackerID, sparkStageId)
val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
@@ -402,7 +415,11 @@ object DeltaFileFormatWriter extends LoggingShims {
hadoopConf.setBoolean("mapreduce.task.ismap", true)
hadoopConf.setInt("mapreduce.task.partition", 0)

new TaskAttemptContextImpl(hadoopConf, taskAttemptId)
if (partitionColumnToDataType.isEmpty) {
new TaskAttemptContextImpl(hadoopConf, taskAttemptId)
} else {
new PartitionedTaskAttemptContextImpl(hadoopConf, taskAttemptId, partitionColumnToDataType)
}
}

committer.setupTask(taskAttemptContext)
@@ -2025,6 +2025,18 @@ trait DeltaSQLConfBase {
.doc("If true, post-commit hooks will by default throw an exception when they fail.")
.booleanConf
.createWithDefault(Utils.isTesting)

///////////
// UTC TIMESTAMP PARTITION VALUES
///////////////////
val UTC_TIMESTAMP_PARTITION_VALUES = buildConf("write.utcTimestampPartitionValues")
.internal()
.doc(
"""
| If true, write UTC normalized timestamp partition values to Delta Log.
|""".stripMargin)
.booleanConf
.createWithDefault(true)
}

object DeltaSQLConf extends DeltaSQLConfBase
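
Hypothetical usage of the new flag, assuming Delta's usual
`spark.databricks.delta.` prefix applied by `buildConf`; disabling it falls
back to the legacy session-zone encoding:

```scala
// Internal conf; defaults to true, i.e. UTC-normalized timestamp partition values are written.
spark.conf.set("spark.databricks.delta.write.utcTimestampPartitionValues", "false")
```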
@@ -40,6 +40,7 @@ package org.apache.spark.sql.delta.util

import java.lang.{Double => JDouble, Long => JLong}
import java.math.{BigDecimal => JBigDecimal}
import java.time.ZoneId
import java.util.{Locale, TimeZone}

import scala.collection.mutable
@@ -48,6 +49,7 @@ import scala.util.Try

import org.apache.spark.sql.delta.{DeltaAnalysisException, DeltaErrors}
import org.apache.hadoop.fs.Path
import org.apache.spark.unsafe.types.UTF8String

// scalastyle:off import.ordering.noEmptyLine
import org.apache.spark.sql.AnalysisException
@@ -117,7 +119,8 @@ object PartitionSpec {

private[delta] object PartitionUtils {

val timestampPartitionPattern = "yyyy-MM-dd HH:mm:ss[.S]"
lazy val timestampPartitionPattern = "yyyy-MM-dd HH:mm:ss[.S]"
lazy val utcFormatter = TimestampFormatter("yyyy-MM-dd'T'HH:mm:ss.SSSSSSz", ZoneId.of("Z"))

case class PartitionValues(columnNames: Seq[String], literals: Seq[Literal])
{
@@ -279,7 +282,8 @@ private[delta] object PartitionUtils {
validatePartitionColumns: Boolean,
timeZone: TimeZone,
dateFormatter: DateFormatter,
timestampFormatter: TimestampFormatter): (Option[PartitionValues], Option[Path]) = {
timestampFormatter: TimestampFormatter,
useUtcNormalizedTimestamp: Boolean = false): (Option[PartitionValues], Option[Path]) = {
val columns = ArrayBuffer.empty[(String, Literal)]
// Old Hadoop versions don't have `Path.isRoot`
var finished = path.getParent == null
@@ -301,7 +305,8 @@
// Once we get the string, we try to parse it and find the partition column and value.
val maybeColumn =
parsePartitionColumn(currentPath.getName, typeInference, userSpecifiedDataTypes,
validatePartitionColumns, timeZone, dateFormatter, timestampFormatter)
validatePartitionColumns, timeZone, dateFormatter, timestampFormatter,
useUtcNormalizedTimestamp)
maybeColumn.foreach(columns += _)

// Now, we determine if we should stop.
@@ -338,7 +343,8 @@
validatePartitionColumns: Boolean,
timeZone: TimeZone,
dateFormatter: DateFormatter,
timestampFormatter: TimestampFormatter): Option[(String, Literal)] = {
timestampFormatter: TimestampFormatter,
useUtcNormalizedTimestamp: Boolean = false): Option[(String, Literal)] = {
val equalSignIndex = columnSpec.indexOf('=')
if (equalSignIndex == -1) {
None
@@ -360,12 +366,25 @@
dateFormatter,
timestampFormatter)
val columnValue = columnValueLiteral.eval()
val castedValue = Cast(columnValueLiteral, dataType, Option(timeZone.getID)).eval()
if (validatePartitionColumns && columnValue != null && castedValue == null) {
throw DeltaErrors.partitionColumnCastFailed(
columnValue.toString, dataType.toString, columnName)
if (dataType == DataTypes.TimestampType) {
if (useUtcNormalizedTimestamp) {
Try {
Literal.create(
utcFormatter.format(
timestampFormatter.parse(columnValue.asInstanceOf[UTF8String].toString)),
StringType)
}.getOrElse(columnValueLiteral)
} else {
columnValueLiteral
}
} else {
val castedValue = Cast(columnValueLiteral, dataType, Option(timeZone.getID)).eval()
if (validatePartitionColumns && columnValue != null && castedValue == null) {
throw DeltaErrors.partitionColumnCastFailed(
columnValue.toString, dataType.toString, columnName)
}
Literal.create(castedValue, dataType)
}
Literal.create(castedValue, dataType)
} else {
inferPartitionColumnValue(
rawColumnValue,
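
For reference, a standalone sketch of the normalization that
`parsePartitionColumn` now applies to timestamp partition values, written
against `java.time` rather than the Delta formatters above (pattern strings
taken from this PR):

```scala
import java.time.{LocalDateTime, ZoneId, ZoneOffset}
import java.time.format.DateTimeFormatter

// Parse the raw partition string in the writer's (session) zone, then re-render the same
// instant adjusted to UTC in ISO 8601 form with microsecond precision.
def normalizeToUtc(raw: String, sessionZone: ZoneId): String = {
  val parsePattern = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss[.SSSSSS][.S]")
  val utcPattern = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSSXXX").withZone(ZoneOffset.UTC)
  val instant = LocalDateTime.parse(raw, parsePattern).atZone(sessionZone).toInstant
  utcPattern.format(instant)
}

// normalizeToUtc("1970-01-01 00:00:00.123456", ZoneId.of("America/Los_Angeles"))
//   == "1970-01-01T08:00:00.123456Z"
```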
@@ -67,15 +67,15 @@ sealed trait TimestampFormatter extends Serializable {

class Iso8601TimestampFormatter(
pattern: String,
timeZone: TimeZone,
timeZone: ZoneId,
locale: Locale) extends TimestampFormatter with DateTimeFormatterHelper {
@transient
protected lazy val formatter = getOrCreateFormatter(pattern, locale)

private def toInstant(s: String): Instant = {
val temporalAccessor = formatter.parse(s)
if (temporalAccessor.query(TemporalQueries.offset()) == null) {
toInstantWithZoneId(temporalAccessor, timeZone.toZoneId)
toInstantWithZoneId(temporalAccessor, timeZone)
} else {
Instant.from(temporalAccessor)
}
@@ -85,7 +85,7 @@ class Iso8601TimestampFormatter(

override def format(us: Long): String = {
val instant = DateTimeUtils.microsToInstant(us)
formatter.withZone(timeZone.toZoneId).format(instant)
formatter.withZone(timeZone).format(instant)
}
}

@@ -98,7 +98,7 @@
* @param timeZone the time zone in which the formatter parses or format timestamps
*/
class FractionTimestampFormatter(timeZone: TimeZone)
extends Iso8601TimestampFormatter("", timeZone, TimestampFormatter.defaultLocale) {
extends Iso8601TimestampFormatter("", timeZone.toZoneId, TimestampFormatter.defaultLocale) {

@transient
override protected lazy val formatter = DateTimeFormatterHelper.fractionFormatter
@@ -108,8 +108,12 @@ object TimestampFormatter {
val defaultPattern: String = "yyyy-MM-dd HH:mm:ss"
val defaultLocale: Locale = Locale.US

def apply(format: String, zoneId: ZoneId): TimestampFormatter = {
new Iso8601TimestampFormatter(format, zoneId, defaultLocale)
}

def apply(format: String, timeZone: TimeZone, locale: Locale): TimestampFormatter = {
new Iso8601TimestampFormatter(format, timeZone, locale)
new Iso8601TimestampFormatter(format, timeZone.toZoneId, locale)
}

def apply(format: String, timeZone: TimeZone): TimestampFormatter = {
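
Illustrative usage of the new `ZoneId` overload added above, mirroring how
`PartitionUtils.utcFormatter` is constructed in this PR (assumes the delta util
`TimestampFormatter` is in scope):

```scala
import java.time.ZoneId

val utcFormatter = TimestampFormatter("yyyy-MM-dd'T'HH:mm:ss.SSSSSSz", ZoneId.of("Z"))
// format() takes microseconds since the epoch and renders them in the formatter's zone.
utcFormatter.format(123456L) // "1970-01-01T00:00:00.123456Z"
```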