From ede4b7da6932c756f26a93eb1006819366fdbb70 Mon Sep 17 00:00:00 2001
From: Jiaan Geng
Date: Thu, 30 Jun 2022 13:55:04 -0700
Subject: [PATCH] [SPARK-39339][SQL][FOLLOWUP] Fix bug: TimestampNTZ type in JDBC data source is incorrect

### What changes were proposed in this pull request?
https://github.com/apache/spark/pull/36726 added support for the TimestampNTZ type in the JDBC data source, but the implementation is incorrect. This PR extends an existing test case to cover the problem; without the accompanying code changes, the extended test fails. The test case is shown below.
```
test("SPARK-39339: TimestampNTZType with different local time zones") {
  val tableName = "timestamp_ntz_diff_tz_support_table"

  DateTimeTestUtils.outstandingZoneIds.foreach { zoneId =>
    DateTimeTestUtils.withDefaultTimeZone(zoneId) {
      Seq(
        "1972-07-04 03:30:00",
        "2019-01-20 12:00:00.502",
        "2019-01-20T00:00:00.123456",
        "1500-01-20T00:00:00.123456"
      ).foreach { case datetime =>
        val df = spark.sql(s"select timestamp_ntz '$datetime'")
        df.write.format("jdbc")
          .mode("overwrite")
          .option("url", urlWithUserAndPass)
          .option("dbtable", tableName)
          .save()

        DateTimeTestUtils.outstandingZoneIds.foreach { zoneId =>
          DateTimeTestUtils.withDefaultTimeZone(zoneId) {
            val res = spark.read.format("jdbc")
              .option("inferTimestampNTZType", "true")
              .option("url", urlWithUserAndPass)
              .option("dbtable", tableName)
              .load()

            checkAnswer(res, df)
          }
        }
      }
    }
  }
}
```
The failure output of the test case is shown below.
```
Results do not match for query:
Timezone: sun.util.calendar.ZoneInfo[id="Africa/Dakar",offset=0,dstSavings=0,useDaylight=false,transitions=3,lastRule=null]
Timezone Env:

== Parsed Logical Plan ==
Relation [TIMESTAMP_NTZ '1500-01-20 00:00:00.123456'#253] JDBCRelation(timestamp_ntz_diff_tz_support_table) [numPartitions=1]

== Analyzed Logical Plan ==
TIMESTAMP_NTZ '1500-01-20 00:00:00.123456': timestamp_ntz
Relation [TIMESTAMP_NTZ '1500-01-20 00:00:00.123456'#253] JDBCRelation(timestamp_ntz_diff_tz_support_table) [numPartitions=1]

== Optimized Logical Plan ==
Relation [TIMESTAMP_NTZ '1500-01-20 00:00:00.123456'#253] JDBCRelation(timestamp_ntz_diff_tz_support_table) [numPartitions=1]

== Physical Plan ==
*(1) Scan JDBCRelation(timestamp_ntz_diff_tz_support_table) [numPartitions=1] [TIMESTAMP_NTZ '1500-01-20 00:00:00.123456'#253] PushedFilters: [], ReadSchema: struct

== Results ==

== Results ==
!== Correct Answer - 1 ==       == Spark Answer - 1 ==
 struct                         struct
![1500-01-20T00:00:00.123456]   [1500-01-20T00:16:08.123456]

ScalaTestFailureLocation: org.apache.spark.sql.QueryTest$ at (QueryTest.scala:243)
org.scalatest.exceptions.TestFailedException:
```

### Why are the changes needed?
This fixes an implementation bug. The root cause is the use of `toJavaTimestamp` and `fromJavaTimestamp`, which rebase the timestamp using the JVM default time zone, so a TimestampNTZ value written under one default time zone and read back under another is shifted. A minimal sketch of the failure mode is included at the end of this description.

### Does this PR introduce _any_ user-facing change?
'No'.
New feature.

### How was this patch tested?
New test case.

Closes #37013 from beliefer/SPARK-39339_followup.
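For illustration only (not part of the patch): a minimal sketch of the failure mode, runnable in `spark-shell`. It assumes, as the failing test suggests, that the one-argument rebase path in `DateTimeUtils` uses the JVM default time zone, and it uses `microsToLocalDateTime`, which is available in recent Spark versions.
```
import java.time.LocalDateTime
import java.util.TimeZone

import org.apache.spark.sql.catalyst.util.DateTimeUtils._

// A wall-clock value with no time zone, far before the Gregorian cutover.
val micros = localDateTimeToMicros(LocalDateTime.parse("1500-01-20T00:00:00.123456"))

// "Write" side: toJavaTimestamp rebases to the hybrid Julian calendar using
// whatever the JVM default time zone happens to be at write time.
TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"))
val ts = toJavaTimestamp(micros)

// "Read" side: a different default zone rebases back with a different shift.
TimeZone.setDefault(TimeZone.getTimeZone("Africa/Dakar"))
val restored = fromJavaTimestamp(ts)

// For such old timestamps the two wall clocks typically no longer match,
// e.g. a shift like the 00:16:08 seen in the failure above.
microsToLocalDateTime(micros)
microsToLocalDateTime(restored)
```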
Authored-by: Jiaan Geng
Signed-off-by: Gengliang Wang
---
 .../sql/catalyst/util/DateTimeUtils.scala     | 32 ++++++++++++++-----
 .../datasources/jdbc/JdbcUtils.scala          | 26 ++++++++-------
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 18 +++++++----
 3 files changed, 49 insertions(+), 27 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index cc61491dc95d7..5045d1479f207 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -158,11 +158,19 @@ object DateTimeUtils {
    * @param micros The number of microseconds since 1970-01-01T00:00:00.000000Z.
    * @return A `java.sql.Timestamp` from number of micros since epoch.
    */
-  def toJavaTimestamp(micros: Long): Timestamp = {
-    val rebasedMicros = rebaseGregorianToJulianMicros(micros)
-    val seconds = Math.floorDiv(rebasedMicros, MICROS_PER_SECOND)
+  def toJavaTimestamp(micros: Long): Timestamp =
+    toJavaTimestampNoRebase(rebaseGregorianToJulianMicros(micros))
+
+  /**
+   * Converts microseconds since the epoch to an instance of `java.sql.Timestamp`.
+   *
+   * @param micros The number of microseconds since 1970-01-01T00:00:00.000000Z.
+   * @return A `java.sql.Timestamp` from number of micros since epoch.
+   */
+  def toJavaTimestampNoRebase(micros: Long): Timestamp = {
+    val seconds = Math.floorDiv(micros, MICROS_PER_SECOND)
     val ts = new Timestamp(seconds * MILLIS_PER_SECOND)
-    val nanos = (rebasedMicros - seconds * MICROS_PER_SECOND) * NANOS_PER_MICROS
+    val nanos = (micros - seconds * MICROS_PER_SECOND) * NANOS_PER_MICROS
     ts.setNanos(nanos.toInt)
     ts
   }
@@ -186,10 +194,18 @@ object DateTimeUtils {
    * Gregorian calendars.
    * @return The number of micros since epoch from `java.sql.Timestamp`.
    */
-  def fromJavaTimestamp(t: Timestamp): Long = {
-    val micros = millisToMicros(t.getTime) + (t.getNanos / NANOS_PER_MICROS) % MICROS_PER_MILLIS
-    rebaseJulianToGregorianMicros(micros)
-  }
+  def fromJavaTimestamp(t: Timestamp): Long =
+    rebaseJulianToGregorianMicros(fromJavaTimestampNoRebase(t))
+
+  /**
+   * Converts an instance of `java.sql.Timestamp` to the number of microseconds since
+   * 1970-01-01T00:00:00.000000Z.
+   *
+   * @param t an instance of `java.sql.Timestamp`.
+   * @return The number of micros since epoch from `java.sql.Timestamp`.
+   */
+  def fromJavaTimestampNoRebase(t: Timestamp): Long =
+    millisToMicros(t.getTime) + (t.getNanos / NANOS_PER_MICROS) % MICROS_PER_MILLIS
 
   /**
    * Converts an Java object to microseconds.
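Side note, illustration only and not part of the patch: the new `NoRebase` helpers above are plain arithmetic on micros and nanos, so a round trip through `java.sql.Timestamp` should no longer depend on the JVM default time zone. A minimal `spark-shell` sketch of that property:
```
import java.time.LocalDateTime
import java.util.TimeZone

import org.apache.spark.sql.catalyst.util.DateTimeUtils._

val micros = localDateTimeToMicros(LocalDateTime.parse("1500-01-20T00:00:00.123456"))

// The NoRebase variants consult no calendar and no time zone, so the
// wall-clock value should survive the round trip under any default zone.
Seq("America/Los_Angeles", "Africa/Dakar", "UTC").foreach { zone =>
  TimeZone.setDefault(TimeZone.getTimeZone(zone))
  assert(fromJavaTimestampNoRebase(toJavaTimestampNoRebase(micros)) == micros)
}
```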
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index cc8746ea5c407..fa4c032fcb012 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils, GenericArrayData}
-import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localDateTimeToMicros, localDateToDays, toJavaDate, toJavaTimestamp}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localDateTimeToMicros, localDateToDays, toJavaDate, toJavaTimestamp, toJavaTimestampNoRebase}
 import org.apache.spark.sql.connector.catalog.TableChange
 import org.apache.spark.sql.connector.catalog.index.{SupportsIndex, TableIndex}
 import org.apache.spark.sql.connector.expressions.NamedReference
@@ -473,7 +473,7 @@ object JdbcUtils extends Logging with SQLConfHelper {
         }
       }

-    case TimestampType | TimestampNTZType =>
+    case TimestampType =>
      (rs: ResultSet, row: InternalRow, pos: Int) =>
        val t = rs.getTimestamp(pos + 1)
        if (t != null) {
@@ -482,6 +482,15 @@ object JdbcUtils extends Logging with SQLConfHelper {
          row.update(pos, null)
        }

+    case TimestampNTZType =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
+        val t = rs.getTimestamp(pos + 1)
+        if (t != null) {
+          row.setLong(pos, DateTimeUtils.fromJavaTimestampNoRebase(t))
+        } else {
+          row.update(pos, null)
+        }
+
     case BinaryType =>
      (rs: ResultSet, row: InternalRow, pos: Int) =>
        row.update(pos, rs.getBytes(pos + 1))
@@ -594,16 +603,9 @@ object JdbcUtils extends Logging with SQLConfHelper {
      }

     case TimestampNTZType =>
-      if (conf.datetimeJava8ApiEnabled) {
-        (stmt: PreparedStatement, row: Row, pos: Int) =>
-          stmt.setTimestamp(pos + 1, toJavaTimestamp(instantToMicros(row.getAs[Instant](pos))))
-      } else {
-        (stmt: PreparedStatement, row: Row, pos: Int) =>
-          stmt.setTimestamp(
-            pos + 1,
-            toJavaTimestamp(localDateTimeToMicros(row.getAs[java.time.LocalDateTime](pos)))
-          )
-      }
+      (stmt: PreparedStatement, row: Row, pos: Int) =>
+        val micros = localDateTimeToMicros(row.getAs[java.time.LocalDateTime](pos))
+        stmt.setTimestamp(pos + 1, toJavaTimestampNoRebase(micros))

     case DateType =>
      if (conf.datetimeJava8ApiEnabled) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 9c28e20429621..494ae6d548784 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -1950,13 +1950,17 @@ class JDBCSuite extends QueryTest
           .option("dbtable", tableName)
           .save()

-        val res = spark.read.format("jdbc")
-          .option("inferTimestampNTZType", "true")
-          .option("url", urlWithUserAndPass)
-          .option("dbtable", tableName)
-          .load()
-
-        checkAnswer(res, df)
+        DateTimeTestUtils.outstandingZoneIds.foreach { zoneId =>
+          DateTimeTestUtils.withDefaultTimeZone(zoneId) {
+            val res = spark.read.format("jdbc")
+              .option("inferTimestampNTZType", "true")
+              .option("url", urlWithUserAndPass)
+              .option("dbtable", tableName)
+              .load()
+
+            checkAnswer(res, df)
+          }
+        }
       }
     }
   }