[SPARK-39339][SQL][FOLLOWUP] Fix bug TimestampNTZ type in JDBC data source is incorrect

### What changes were proposed in this pull request?
#36726 added support for the TimestampNTZ type in the JDBC data source, but the implementation is incorrect. Extending the existing test case as shown below, with no other change, makes it fail:
```
  test("SPARK-39339: TimestampNTZType with different local time zones") {
    val tableName = "timestamp_ntz_diff_tz_support_table"

    DateTimeTestUtils.outstandingZoneIds.foreach { zoneId =>
      DateTimeTestUtils.withDefaultTimeZone(zoneId) {
        Seq(
          "1972-07-04 03:30:00",
          "2019-01-20 12:00:00.502",
          "2019-01-20T00:00:00.123456",
          "1500-01-20T00:00:00.123456"
        ).foreach { case datetime =>
          val df = spark.sql(s"select timestamp_ntz '$datetime'")
          df.write.format("jdbc")
            .mode("overwrite")
            .option("url", urlWithUserAndPass)
            .option("dbtable", tableName)
            .save()

          DateTimeTestUtils.outstandingZoneIds.foreach { zoneId =>
            DateTimeTestUtils.withDefaultTimeZone(zoneId) {
              val res = spark.read.format("jdbc")
                .option("inferTimestampNTZType", "true")
                .option("url", urlWithUserAndPass)
                .option("dbtable", tableName)
                .load()

              checkAnswer(res, df)
            }
          }
        }
      }
    }
  }
```
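For context, the outer and inner loops pin the JVM default time zone to each of a set of representative zone ids. The exact contents of `DateTimeTestUtils.outstandingZoneIds` are not reproduced in this commit; the stand-in list below is an assumption of mine (apart from `Africa/Dakar`, which appears in the failure output below):
```
import java.time.ZoneId

// Hypothetical stand-in for DateTimeTestUtils.outstandingZoneIds: region ids
// spanning negative, zero, and positive UTC offsets, including zones with
// unusual historical (local mean time) offsets.
object ZoneFixtures {
  val outstandingZoneIds: Seq[ZoneId] = Seq(
    "UTC", "Africa/Dakar", "America/Los_Angeles",
    "Antarctica/Vostok", "Asia/Hong_Kong", "Europe/Amsterdam"
  ).map(ZoneId.of)
}
```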

The test failure output is shown below:
```
Results do not match for query:
Timezone: sun.util.calendar.ZoneInfo[id="Africa/Dakar",offset=0,dstSavings=0,useDaylight=false,transitions=3,lastRule=null]
Timezone Env:

== Parsed Logical Plan ==
Relation [TIMESTAMP_NTZ '1500-01-20 00:00:00.123456'#253] JDBCRelation(timestamp_ntz_diff_tz_support_table) [numPartitions=1]

== Analyzed Logical Plan ==
TIMESTAMP_NTZ '1500-01-20 00:00:00.123456': timestamp_ntz
Relation [TIMESTAMP_NTZ '1500-01-20 00:00:00.123456'#253] JDBCRelation(timestamp_ntz_diff_tz_support_table) [numPartitions=1]

== Optimized Logical Plan ==
Relation [TIMESTAMP_NTZ '1500-01-20 00:00:00.123456'#253] JDBCRelation(timestamp_ntz_diff_tz_support_table) [numPartitions=1]

== Physical Plan ==
*(1) Scan JDBCRelation(timestamp_ntz_diff_tz_support_table) [numPartitions=1] [TIMESTAMP_NTZ '1500-01-20 00:00:00.123456'#253] PushedFilters: [], ReadSchema: struct<TIMESTAMP_NTZ '1500-01-20 00:00:00.123456':timestamp_ntz>

== Results ==

== Results ==
!== Correct Answer - 1 ==                                           == Spark Answer - 1 ==
 struct<TIMESTAMP_NTZ '1500-01-20 00:00:00.123456':timestamp_ntz>   struct<TIMESTAMP_NTZ '1500-01-20 00:00:00.123456':timestamp_ntz>
![1500-01-20T00:00:00.123456]                                       [1500-01-20T00:16:08.123456]

ScalaTestFailureLocation: org.apache.spark.sql.QueryTest$ at (QueryTest.scala:243)
org.scalatest.exceptions.TestFailedException:
```

### Why are the changes needed?
This fixes an implementation bug. The root cause is the use of `toJavaTimestamp` and `fromJavaTimestamp`: both rebase the microseconds between the Proleptic Gregorian and the hybrid Julian calendar, and that rebasing is resolved against the JVM default time zone. A TimestampNTZ value is supposed to be time-zone-agnostic, so writing it under one default time zone and reading it under another shifts the result, as the failure above shows.
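A minimal sketch of the failure mode, independent of Spark and of this patch (the zone choices are illustrative; `Africa/Dakar` is taken from the failing log above). It shows why any round trip through a zone-sensitive `java.sql.Timestamp` conversion cannot preserve a wall-clock-only value across default-zone changes:
```
import java.sql.Timestamp
import java.util.TimeZone

object ZoneDependenceDemo {
  def main(args: Array[String]): Unit = {
    // "Write" side: the wall-clock fields are interpreted in the current
    // default zone to produce an epoch instant.
    TimeZone.setDefault(TimeZone.getTimeZone("UTC"))
    val written = Timestamp.valueOf("1500-01-20 00:00:00")
    val instantMillis = written.getTime

    // "Read" side: a different default zone turns the same instant back
    // into different wall-clock fields.
    TimeZone.setDefault(TimeZone.getTimeZone("Africa/Dakar"))
    println(new Timestamp(instantMillis)) // not 1500-01-20 00:00:00 any more
  }
}
```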

### Does this PR introduce _any_ user-facing change?
No. TimestampNTZ support in the JDBC data source is a new, as-yet-unreleased feature.

### How was this patch tested?
New test case.

Closes #37013 from beliefer/SPARK-39339_followup.

Authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
beliefer authored and gengliangwang committed Jun 30, 2022
1 parent a39fc87 commit ede4b7d
Showing 3 changed files with 49 additions and 27 deletions.
DateTimeUtils.scala

```
@@ -158,11 +158,19 @@ object DateTimeUtils {
    * @param micros The number of microseconds since 1970-01-01T00:00:00.000000Z.
    * @return A `java.sql.Timestamp` from number of micros since epoch.
    */
-  def toJavaTimestamp(micros: Long): Timestamp = {
-    val rebasedMicros = rebaseGregorianToJulianMicros(micros)
-    val seconds = Math.floorDiv(rebasedMicros, MICROS_PER_SECOND)
+  def toJavaTimestamp(micros: Long): Timestamp =
+    toJavaTimestampNoRebase(rebaseGregorianToJulianMicros(micros))
+
+  /**
+   * Converts microseconds since the epoch to an instance of `java.sql.Timestamp`.
+   *
+   * @param micros The number of microseconds since 1970-01-01T00:00:00.000000Z.
+   * @return A `java.sql.Timestamp` from number of micros since epoch.
+   */
+  def toJavaTimestampNoRebase(micros: Long): Timestamp = {
+    val seconds = Math.floorDiv(micros, MICROS_PER_SECOND)
     val ts = new Timestamp(seconds * MILLIS_PER_SECOND)
-    val nanos = (rebasedMicros - seconds * MICROS_PER_SECOND) * NANOS_PER_MICROS
+    val nanos = (micros - seconds * MICROS_PER_SECOND) * NANOS_PER_MICROS
     ts.setNanos(nanos.toInt)
     ts
   }
@@ -186,10 +194,18 @@ object DateTimeUtils {
    * Gregorian calendars.
    * @return The number of micros since epoch from `java.sql.Timestamp`.
    */
-  def fromJavaTimestamp(t: Timestamp): Long = {
-    val micros = millisToMicros(t.getTime) + (t.getNanos / NANOS_PER_MICROS) % MICROS_PER_MILLIS
-    rebaseJulianToGregorianMicros(micros)
-  }
+  def fromJavaTimestamp(t: Timestamp): Long =
+    rebaseJulianToGregorianMicros(fromJavaTimestampNoRebase(t))
+
+  /**
+   * Converts an instance of `java.sql.Timestamp` to the number of microseconds since
+   * 1970-01-01T00:00:00.000000Z.
+   *
+   * @param t an instance of `java.sql.Timestamp`.
+   * @return The number of micros since epoch from `java.sql.Timestamp`.
+   */
+  def fromJavaTimestampNoRebase(t: Timestamp): Long =
+    millisToMicros(t.getTime) + (t.getNanos / NANOS_PER_MICROS) % MICROS_PER_MILLIS
 
   /**
    * Converts an Java object to microseconds.
```
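The new `NoRebase` helpers are a pair of exact inverses. Below is a standalone sketch of their arithmetic with the unit constants inlined so it compiles on its own (the names and values are the obvious micro/milli/nano ratios; the real code presumably takes them from Spark's date-time constants):
```
import java.sql.Timestamp

object MicrosConversions {
  val MICROS_PER_SECOND = 1000000L
  val MILLIS_PER_SECOND = 1000L
  val MICROS_PER_MILLIS = 1000L
  val NANOS_PER_MICROS = 1000L

  def toJavaTimestampNoRebase(micros: Long): Timestamp = {
    // floorDiv, not /, so pre-1970 (negative) micros round toward -infinity
    // and the nanos-of-second part stays non-negative.
    val seconds = Math.floorDiv(micros, MICROS_PER_SECOND)
    val ts = new Timestamp(seconds * MILLIS_PER_SECOND)
    ts.setNanos(((micros - seconds * MICROS_PER_SECOND) * NANOS_PER_MICROS).toInt)
    ts
  }

  def fromJavaTimestampNoRebase(t: Timestamp): Long =
    // getTime already carries millisecond precision; the % keeps only the
    // sub-millisecond part of the nanos field to avoid double counting.
    t.getTime * MICROS_PER_MILLIS + (t.getNanos / NANOS_PER_MICROS) % MICROS_PER_MILLIS

  def main(args: Array[String]): Unit = {
    // Round-trips exactly, including the pre-epoch edge case:
    // -1L micros is 1969-12-31T23:59:59.999999Z.
    assert(fromJavaTimestampNoRebase(toJavaTimestampNoRebase(-1L)) == -1L)
  }
}
```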
JdbcUtils.scala

```
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils, GenericArrayData}
-import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localDateTimeToMicros, localDateToDays, toJavaDate, toJavaTimestamp}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localDateTimeToMicros, localDateToDays, toJavaDate, toJavaTimestamp, toJavaTimestampNoRebase}
 import org.apache.spark.sql.connector.catalog.TableChange
 import org.apache.spark.sql.connector.catalog.index.{SupportsIndex, TableIndex}
 import org.apache.spark.sql.connector.expressions.NamedReference
@@ -473,7 +473,7 @@ object JdbcUtils extends Logging with SQLConfHelper {
        }
      }
 
-    case TimestampType | TimestampNTZType =>
+    case TimestampType =>
       (rs: ResultSet, row: InternalRow, pos: Int) =>
         val t = rs.getTimestamp(pos + 1)
         if (t != null) {
@@ -482,6 +482,15 @@ object JdbcUtils extends Logging with SQLConfHelper {
           row.update(pos, null)
         }
 
+    case TimestampNTZType =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
+        val t = rs.getTimestamp(pos + 1)
+        if (t != null) {
+          row.setLong(pos, DateTimeUtils.fromJavaTimestampNoRebase(t))
+        } else {
+          row.update(pos, null)
+        }
+
     case BinaryType =>
       (rs: ResultSet, row: InternalRow, pos: Int) =>
         row.update(pos, rs.getBytes(pos + 1))
```
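The read path now distinguishes the two timestamp types: `TimestampType` keeps the rebasing `fromJavaTimestamp`, while `TimestampNTZType` uses the new zone-free variant. To make the pattern concrete, here is a heavily simplified sketch of the per-type getter idea; `RowSink`, `JDBCValueGetter`, and `ntzGetter` are hypothetical stand-ins of mine, not Spark's API (Spark uses `InternalRow` and builds these closures in `makeGetter`):
```
import java.sql.{ResultSet, Timestamp}

object GetterSketch {
  // Stand-in for Spark's mutable InternalRow.
  trait RowSink {
    def setLong(pos: Int, value: Long): Unit
    def setNull(pos: Int): Unit
  }

  // One closure per column: copy a single ResultSet column into the row.
  type JDBCValueGetter = (ResultSet, RowSink, Int) => Unit

  def ntzGetter(fromJavaTimestampNoRebase: Timestamp => Long): JDBCValueGetter =
    (rs: ResultSet, row: RowSink, pos: Int) => {
      val t = rs.getTimestamp(pos + 1) // JDBC columns are 1-based, rows 0-based
      if (t != null) row.setLong(pos, fromJavaTimestampNoRebase(t))
      else row.setNull(pos)
    }
}
```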
JdbcUtils.scala (continued)

```
@@ -594,16 +603,9 @@
       }
 
     case TimestampNTZType =>
-      if (conf.datetimeJava8ApiEnabled) {
-        (stmt: PreparedStatement, row: Row, pos: Int) =>
-          stmt.setTimestamp(pos + 1, toJavaTimestamp(instantToMicros(row.getAs[Instant](pos))))
-      } else {
-        (stmt: PreparedStatement, row: Row, pos: Int) =>
-          stmt.setTimestamp(
-            pos + 1,
-            toJavaTimestamp(localDateTimeToMicros(row.getAs[java.time.LocalDateTime](pos)))
-          )
-      }
+      (stmt: PreparedStatement, row: Row, pos: Int) =>
+        val micros = localDateTimeToMicros(row.getAs[java.time.LocalDateTime](pos))
+        stmt.setTimestamp(pos + 1, toJavaTimestampNoRebase(micros))
 
     case DateType =>
       if (conf.datetimeJava8ApiEnabled) {
```
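On the write path, the setter no longer branches on `conf.datetimeJava8ApiEnabled` and no longer rebases: the value is read as a `java.time.LocalDateTime` and converted with `toJavaTimestampNoRebase`. For reference, a sketch of what `localDateTimeToMicros` plausibly computes — my assumption from the name and usage here, not copied from Spark — is a purely zone-free count of microseconds from the epoch wall clock:
```
import java.time.{LocalDateTime, ZoneOffset}

object WriteSideSketch {
  // Treat the wall-clock fields as if they were UTC; no time zone or
  // calendar rebasing is involved at any point.
  def localDateTimeToMicros(ldt: LocalDateTime): Long = {
    val secs = ldt.toEpochSecond(ZoneOffset.UTC)
    secs * 1000000L + ldt.getNano / 1000L
  }
}
```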
sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala (11 additions & 7 deletions)

```
@@ -1950,13 +1950,17 @@ class JDBCSuite extends QueryTest
           .option("dbtable", tableName)
           .save()
 
-        val res = spark.read.format("jdbc")
-          .option("inferTimestampNTZType", "true")
-          .option("url", urlWithUserAndPass)
-          .option("dbtable", tableName)
-          .load()
-
-        checkAnswer(res, df)
+        DateTimeTestUtils.outstandingZoneIds.foreach { zoneId =>
+          DateTimeTestUtils.withDefaultTimeZone(zoneId) {
+            val res = spark.read.format("jdbc")
+              .option("inferTimestampNTZType", "true")
+              .option("url", urlWithUserAndPass)
+              .option("dbtable", tableName)
+              .load()
+
+            checkAnswer(res, df)
+          }
+        }
       }
     }
   }
```
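The updated test re-reads the table under every default time zone, not just the one the value was written under. For readers unfamiliar with the helper, `DateTimeTestUtils.withDefaultTimeZone` presumably has roughly this shape (a sketch of mine, not Spark's exact code):
```
import java.time.ZoneId
import java.util.TimeZone

object TzTestHelper {
  // Temporarily swap the JVM default time zone for the duration of `body`,
  // restoring the previous default even if the body throws.
  def withDefaultTimeZone[T](zoneId: ZoneId)(body: => T): T = {
    val saved = TimeZone.getDefault
    TimeZone.setDefault(TimeZone.getTimeZone(zoneId))
    try body finally TimeZone.setDefault(saved)
  }
}
```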
