# [UNIFORM] Fix converting non-ISO timestamp partition values to Iceberg (#4012)


#### Which Delta project/connector is this regarding?

- [x] Spark (Uniform)
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other

## Description

This change fixes support for timestamp partition values in Uniform, specifically
when timestamp partition values that are not UTC instants are stored in the
Delta log (the Delta protocol supports both timestamp partition values without a
timezone and ISO-8601 instant timestamps).

With this fix, such partition values are interpreted in the system timezone of
the host performing the metadata conversion and then adjusted to UTC for the
Iceberg metadata translation. The previous code assumed UTC instants, but for
compatibility we want to support the older form of the metadata in the Iceberg
conversion as well.

Example:

Prior to this fix, the code failed with a parsing exception in the non-instant
case: a string like "2021-01-01 10:00:00.123" cannot be parsed by
Instant.parse(str) because it is not a valid ISO-8601 instant.

After this fix, "2021-01-01 10:00:00.123", when conversion runs on a Spark
cluster whose system timezone is UTC-8, is interpreted as
"2021-01-01 10:00:00.123 UTC-08" and then converted to
"2021-01-01 18:00:00.123 UTC", which is stored as microseconds since the epoch
in Iceberg metadata.
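
For illustration, a minimal standalone java.time sketch of the conversion described above (the fixed `-08:00` offset and the pattern are illustrative stand-ins; the actual change goes through Delta's `TimestampFormatter` and `utcFormatter` helpers):

```scala
import java.time.{Instant, LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit

// Parse the timezone-less partition value as a wall-clock time.
val local = LocalDateTime.parse(
  "2021-01-01 10:00:00.123",
  DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"))

// Interpret it in the converting host's timezone (UTC-8 here), adjust to UTC.
val instant: Instant = local.atZone(ZoneOffset.ofHours(-8)).toInstant
assert(instant == Instant.parse("2021-01-01T18:00:00.123Z"))

// Iceberg metadata stores the partition value as microseconds since the epoch.
val micros: Long = ChronoUnit.MICROS.between(Instant.EPOCH, instant)
```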

## How was this patch tested?
Added unit tests.

## Does this PR introduce _any_ user-facing changes?

No
amogh-jahagirdar authored Jan 6, 2025
1 parent 8a4cfd4 commit acfb8df
Showing 6 changed files with 70 additions and 16 deletions.
IcebergTransactionUtils.scala:

@@ -18,6 +18,7 @@ package org.apache.spark.sql.delta.icebergShaded
 
 import java.nio.ByteBuffer
 import java.time.Instant
+import java.time.format.DateTimeParseException
 
 import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
@@ -26,6 +27,8 @@ import org.apache.spark.sql.delta.{DeltaColumnMapping, DeltaConfig, DeltaConfigs
 import org.apache.spark.sql.delta.DeltaConfigs.parseCalendarInterval
 import org.apache.spark.sql.delta.actions.{AddFile, FileAction, RemoveFile}
 import org.apache.spark.sql.delta.metering.DeltaLogging
+import org.apache.spark.sql.delta.util.PartitionUtils.{timestampPartitionPattern, utcFormatter}
+import org.apache.spark.sql.delta.util.TimestampFormatter
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import shadedForDelta.org.apache.iceberg.{DataFile, DataFiles, FileFormat, PartitionSpec, Schema => IcebergSchema}
@@ -207,6 +210,9 @@ object IcebergTransactionUtils
     builder
   }
 
+  private lazy val timestampFormatter =
+    TimestampFormatter(timestampPartitionPattern, java.util.TimeZone.getDefault)
+
   /**
    * Follows deserialization as specified here
    * https://github.com/delta-io/delta/blob/master/PROTOCOL.md#Partition-Value-Serialization
@@ -235,13 +241,25 @@ object IcebergTransactionUtils
       case _: TimestampNTZType =>
         java.sql.Timestamp.valueOf(partitionVal).getNanos/1000.asInstanceOf[Long]
       case _: TimestampType =>
-        Instant.parse(partitionVal).getNano/1000.asInstanceOf[Long]
+        try {
+          getMicrosSinceEpoch(partitionVal)
+        } catch {
+          case _: DateTimeParseException =>
+            // In case of non-ISO timestamps, parse and interpret the timestamp as system time
+            // and then convert to UTC
+            val utcInstant = utcFormatter.format(timestampFormatter.parse(partitionVal))
+            getMicrosSinceEpoch(utcInstant)
+        }
       case _ =>
         throw DeltaErrors.universalFormatConversionFailedException(
           version, "iceberg", "Unexpected partition data type " + elemType)
     }
   }
 
+  private def getMicrosSinceEpoch(instant: String): Long = {
+    Instant.parse(instant).getNano/1000.asInstanceOf[Long]
+  }
+
   private def getMetricsForIcebergDataFile(
       statsParser: String => InternalRow,
       stats: String,
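For reference, the control flow of the new `TimestampType` branch reduced to plain java.time — a sketch assuming the JVM default timezone stands in for the conversion host's timezone, and approximating the partition pattern with a 0-6 digit fraction (the real code delegates to Delta's `timestampFormatter` and `utcFormatter`):

```scala
import java.time.{Instant, LocalDateTime, ZoneId}
import java.time.format.{DateTimeFormatterBuilder, DateTimeParseException}
import java.time.temporal.ChronoField

// Rough stand-in for "yyyy-MM-dd HH:mm:ss[.SSSSSS][.S]": accepts 0-6 fraction digits.
val partitionPattern = new DateTimeFormatterBuilder()
  .appendPattern("yyyy-MM-dd HH:mm:ss")
  .appendFraction(ChronoField.NANO_OF_SECOND, 0, 6, true)
  .toFormatter

def toUtcInstant(partitionVal: String): Instant =
  try {
    // ISO-8601 instants such as "2021-01-01T10:00:00.123Z" parse directly.
    Instant.parse(partitionVal)
  } catch {
    case _: DateTimeParseException =>
      // Non-ISO values are read as wall-clock time in the default timezone,
      // then adjusted to UTC, mirroring the fallback in the diff above.
      LocalDateTime.parse(partitionVal, partitionPattern)
        .atZone(ZoneId.systemDefault())
        .toInstant
  }
```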
DelayedCommitProtocol.scala:

@@ -71,8 +71,6 @@ class DelayedCommitProtocol(
   // since there's no guarantee the stats will exist.
   @transient val addedStatuses = new ArrayBuffer[AddFile]
 
-  val timestampPartitionPattern = "yyyy-MM-dd HH:mm:ss[.SSSSSS][.S]"
-
   // Constants for CDC partition manipulation. Used only in newTaskTempFile(), but we define them
   // here to avoid building a new redundant regex for every file.
   protected val cdcPartitionFalse = s"${CDC_PARTITION_COL}=false"
@@ -145,7 +143,7 @@
 
     val dateFormatter = DateFormatter()
     val timestampFormatter =
-      TimestampFormatter(timestampPartitionPattern, java.util.TimeZone.getDefault)
+      TimestampFormatter(PartitionUtils.timestampPartitionPattern, java.util.TimeZone.getDefault)
 
     /**
      * ToDo: Remove the use of this PartitionUtils API with type inference logic
PartitionUtils.scala:

@@ -119,7 +119,7 @@ object PartitionSpec {
 
 private[delta] object PartitionUtils {
 
-  lazy val timestampPartitionPattern = "yyyy-MM-dd HH:mm:ss[.S]"
+  lazy val timestampPartitionPattern = "yyyy-MM-dd HH:mm:ss[.SSSSSS][.S]"
   lazy val utcFormatter = TimestampFormatter("yyyy-MM-dd'T'HH:mm:ss.SSSSSSz", ZoneId.of("Z"))
 
   case class PartitionValues(columnNames: Seq[String], literals: Seq[Literal])
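A quick check of what the widened pattern accepts, using plain `DateTimeFormatter` semantics where each bracketed section is optional and the number of `S` letters must match exactly (Delta's `TimestampFormatter` may be more lenient, so this is only an approximation):

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss[.SSSSSS][.S]")

// Six-digit (microsecond) fractions now parse, alongside the previously
// supported single-digit and fraction-less forms:
LocalDateTime.parse("2021-06-30 00:00:00.123456", fmt) // micros (new)
LocalDateTime.parse("2021-06-30 00:00:00.1", fmt)      // tenths
LocalDateTime.parse("2021-06-30 00:00:00", fmt)        // no fraction
```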
DeltaInsertIntoTableSuite.scala:

@@ -23,6 +23,7 @@ import java.util.TimeZone
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.delta.DeltaInsertIntoTableSuiteShims._
+import org.apache.spark.sql.delta.DeltaTestUtils.withTimeZone
 import org.apache.spark.sql.delta.schema.InvariantViolationException
 import org.apache.spark.sql.delta.schema.SchemaUtils
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
@@ -1467,16 +1468,6 @@ abstract class DeltaInsertIntoTests(
     }
   }
 
-  private def withTimeZone(zone: String)(f: => Unit): Unit = {
-    val currentDefault = TimeZone.getDefault
-    try {
-      TimeZone.setDefault(TimeZone.getTimeZone(zone))
-      f
-    } finally {
-      TimeZone.setDefault(currentDefault)
-    }
-  }
-
   // This behavior is specific to Delta
   testQuietly("insertInto: schema enforcement") {
     val t1 = "tbl"
DeltaTestUtils.scala:

@@ -18,7 +18,7 @@ package org.apache.spark.sql.delta
 
 import java.io.{BufferedReader, File, InputStreamReader}
 import java.nio.charset.StandardCharsets.UTF_8
-import java.util.Locale
+import java.util.{Locale, TimeZone}
 import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.JavaConverters._
@@ -451,6 +451,16 @@ object DeltaTestUtils extends DeltaTestUtilsBase {
     def fulfillsVersionRequirements(actual: Protocol, requirement: Protocol): Boolean =
       lteq(requirement, actual)
   }
+
+  def withTimeZone(zone: String)(f: => Unit): Unit = {
+    val currentDefault = TimeZone.getDefault
+    try {
+      TimeZone.setDefault(TimeZone.getTimeZone(zone))
+      f
+    } finally {
+      TimeZone.setDefault(currentDefault)
+    }
+  }
 }
 
 trait DeltaTestUtilsForTempViews
UniFormE2EIcebergSuiteBase.scala:

@@ -16,7 +16,12 @@
 
 package org.apache.spark.sql.delta.uniform
 
+import org.apache.spark.sql.delta.DeltaLog
+import org.apache.spark.sql.delta.DeltaTestUtils.withTimeZone
+import org.apache.spark.sql.delta.sources.DeltaSQLConf.UTC_TIMESTAMP_PARTITION_VALUES
+
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.types._
 
 abstract class UniFormE2EIcebergSuiteBase extends UniFormE2ETest {
@@ -108,6 +113,38 @@ abstract class UniFormE2EIcebergSuiteBase extends UniFormE2ETest {
     }
   }
 
+  test("Insert Partitioned Table - UTC Adjustment for Non-ISO Timestamp Partition values") {
+    withTable(testTableName) {
+      withTimeZone("GMT-8") {
+        withSQLConf(UTC_TIMESTAMP_PARTITION_VALUES.key -> "false") {
+          write(
+            s"""CREATE TABLE $testTableName (id int, ts timestamp)
+               | USING DELTA
+               | PARTITIONED BY (ts)
+               | TBLPROPERTIES (
+               |   'delta.columnMapping.mode' = 'name',
+               |   'delta.enableIcebergCompatV2' = 'true',
+               |   'delta.universalFormat.enabledFormats' = 'iceberg'
+               |)""".stripMargin)
+          write(s"INSERT INTO $testTableName" +
+            s" VALUES (1, timestamp'2021-06-30 00:00:00.123456')")
+
+          // Verify partition values in Delta Log
+          val deltaLog = DeltaLog.forTable(spark, TableIdentifier(testTableName))
+          val partitionColName = deltaLog.unsafeVolatileMetadata.physicalPartitionColumns.head
+          val partitionValues = deltaLog.update().allFiles.head.partitionValues
+          assert(partitionValues === Map(partitionColName -> "2021-06-30 00:00:00.123456"))
+
+          // Verify against Delta read and Iceberg read
+          val verificationQuery = s"SELECT id FROM $testTableName " +
+            s"where ts=TIMESTAMP'2021-06-30 08:00:00.123456UTC'"
+          checkAnswer(spark.sql(verificationQuery), Seq(Row(1)))
+          checkAnswer(createReaderSparkSession.sql(verificationQuery), Seq(Row(1)))
+        }
+      }
+    }
+  }
+
   test("CIUD") {
     withTable(testTableName) {
       write(
