Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow custom timestamp with Spark timezone property #621

Merged
merged 17 commits into from
Jan 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ Defaults to `false`. New in 0.11.0.
* `timestampFormat`: Specifies an additional timestamp format that will be tried when parsing values as `TimestampType`
columns. The format is specified as described in [DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html).
Defaults to try several formats, including [ISO_INSTANT](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html#ISO_INSTANT),
including variations with offset timezones or no timezone (defaults to UTC). New in 0.12.0.
including variations with offset timezones or no timezone (defaults to UTC). New in 0.12.0. As of 0.16.0, if a custom format pattern is used without a timezone, the default Spark timezone specified by `spark.sql.session.timeZone` will be used.
* `dateFormat`: Specifies an additional date format that will be tried when parsing values as `DateType`
columns. The format is specified as described in [DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html).
Defaults to [ISO_DATE](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html#ISO_DATE). New in 0.12.0.
Expand All @@ -83,7 +83,7 @@ When writing files the API accepts several options:
* `compression`: compression codec to use when saving to file. Should be the fully qualified name of a class implementing `org.apache.hadoop.io.compress.CompressionCodec` or one of case-insensitive shorten names (`bzip2`, `gzip`, `lz4`, and `snappy`). Defaults to no compression when a codec is not specified.
* `timestampFormat`: Controls the format used to write `TimestampType` format columns.
The format is specified as described in [DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html).
Defaults to [ISO_INSTANT](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html#ISO_INSTANT). New in 0.12.0.
Defaults to [ISO_INSTANT](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html#ISO_INSTANT). New in 0.12.0. As of 0.16.0, if a custom format pattern is used without a timezone, the default Spark timezone specified by `spark.sql.session.timeZone` will be used.
* `dateFormat`: Controls the format used to write `DateType` format columns.
The format is specified as described in [DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html).
Defaults to [ISO_DATE](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html#ISO_DATE). New in 0.12.0.
Expand Down
8 changes: 7 additions & 1 deletion src/main/scala/com/databricks/spark/xml/DefaultSource.scala
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,16 @@ class DefaultSource
(options.charset, options.rowTag)
}

val paramsWithTZ =
sqlContext.sparkContext.getConf.getOption("spark.sql.session.timeZone") match {
case Some(tz) => parameters.updated("timezone", tz)
case None => parameters
}

XmlRelation(
() => XmlFile.withCharset(sqlContext.sparkContext, path, charset, rowTag),
Some(path),
parameters,
paramsWithTZ,
schema)(sqlContext)
}

Expand Down
1 change: 1 addition & 0 deletions src/main/scala/com/databricks/spark/xml/XmlOptions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ private[xml] class XmlOptions(
parameters.getOrElse("wildcardColName", XmlOptions.DEFAULT_WILDCARD_COL_NAME)
val ignoreNamespace = parameters.get("ignoreNamespace").map(_.toBoolean).getOrElse(false)
val timestampFormat = parameters.get("timestampFormat")
val timezone = parameters.get("timezone")
val dateFormat = parameters.get("dateFormat")
}

Expand Down
34 changes: 26 additions & 8 deletions src/main/scala/com/databricks/spark/xml/util/TypeCast.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,17 @@ package com.databricks.spark.xml.util

import java.math.BigDecimal
import java.sql.{Date, Timestamp}
import java.text.NumberFormat
import java.time.{LocalDate, ZoneId, ZonedDateTime}
import java.text.{NumberFormat, ParsePosition}
import java.time.{Instant, LocalDate, ZoneId}
import java.time.format.{DateTimeFormatter, DateTimeFormatterBuilder}
import java.util.Locale

import scala.util.Try
import scala.util.control.Exception._

import org.apache.spark.sql.types._
import com.databricks.spark.xml.XmlOptions

import java.time.temporal.TemporalQueries

/**
* Utility functions for type casting
*/
Expand Down Expand Up @@ -116,11 +116,29 @@ private[xml] object TypeCast {
)

private def parseXmlTimestamp(value: String, options: XmlOptions): Timestamp = {
val formatters = options.timestampFormat.map(DateTimeFormatter.ofPattern).
map(supportedXmlTimestampFormatters :+ _).getOrElse(supportedXmlTimestampFormatters)
formatters.foreach { format =>
supportedXmlTimestampFormatters.foreach { format =>
try {
return Timestamp.from(Instant.from(format.parse(value)))
} catch {
case _: Exception => // continue
}
}
options.timestampFormat.foreach { formatString =>
// Check if there is offset or timezone and apply Spark timeZone if not
// Useful to support Java 8 and Java 11+ as they prioritize zone and offset differently
val hasTemporalInformation = formatString.indexOf("V") +
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure we need this - I found that the docs for "withZone" say that it's ignored if the pattern contains timezone info. So it seems like it will just be a default. OK to write a test for that though!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I also saw this, but then I encountered some problems between Java 8 and Java 11. We can see in Java 8 here that zone is prioritized over offset, but in Java 9 here, offset is prioritized over zone. I made an example below to see the differences:

  • Java 8: image
  • Java 11: image

So I don't think we can always apply withZone(), and that's why I wanted to use hasTemporalInformation. unfortunately, I haven't been able to think of a cleaner way to do this

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, let's leave it this way for now. If you have a sec, throw in a comment about the Java version

formatString.indexOf("z") +
formatString.indexOf("O") +
formatString.indexOf("X") +
formatString.indexOf("x") +
formatString.indexOf("Z") != (-6)
val format = if (hasTemporalInformation) {
DateTimeFormatter.ofPattern(formatString)
} else {
DateTimeFormatter.ofPattern(formatString).withZone(options.timezone.map(ZoneId.of).orNull)
}
try {
return Timestamp.from(ZonedDateTime.parse(value, format).toInstant)
return Timestamp.from(Instant.from(format.parse(value)))
} catch {
case _: Exception => // continue
}
Expand Down
2 changes: 2 additions & 0 deletions src/test/resources/time.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,6 @@
<author>John Smith</author>
<time>2011-12-03T10:15:30Z</time>
<time2>12-03-2011 10:15:30 PST</time2>
<time3>2011/12/03 06:15:30</time3>
<time4>2011/12/03 16:15:30 +1000</time4>
</book>
53 changes: 51 additions & 2 deletions src/test/scala/com/databricks/spark/xml/XmlSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1371,7 +1371,13 @@ final class XmlSuite extends AnyFunSuite with BeforeAndAfterAll {
.option("rowTag", "book")
.xml(resDir + "time.xml")
val expectedSchema =
buildSchema(field("author"), field("time", TimestampType), field("time2", StringType))
buildSchema(
field("author"),
field("time", TimestampType),
field("time2", StringType),
field("time3", StringType),
field("time4", StringType)
)
assert(df.schema === expectedSchema)
assert(df.collect().head.getAs[Timestamp](1).getTime === 1322907330000L)
}
Expand All @@ -1393,11 +1399,54 @@ final class XmlSuite extends AnyFunSuite with BeforeAndAfterAll {
.option("timestampFormat", "MM-dd-yyyy HH:mm:ss z")
.xml(resDir + "time.xml")
val expectedSchema =
buildSchema(field("author"), field("time", TimestampType), field("time2", TimestampType))
buildSchema(
field("author"),
field("time", TimestampType),
field("time2", TimestampType),
field("time3", StringType),
field("time4", StringType),
)
assert(df.schema === expectedSchema)
assert(df.collect().head.getAs[Timestamp](1).getTime === 1322907330000L)
assert(df.collect().head.getAs[Timestamp](2).getTime === 1322936130000L)
}

test("Test custom timestampFormat without timezone") {
  // Pattern carries no zone/offset designators, so only time3 (zoneless
  // wall-clock text) is inferred as TimestampType; the zone-bearing columns
  // stay StringType. Parsing presumably falls back to the session timezone
  // (spark.sql.session.timeZone) — see the README note in this change.
  val df = spark.read
    .option("rowTag", "book")
    .option("timestampFormat", "yyyy/MM/dd HH:mm:ss")
    .xml(resDir + "time.xml")
  val expectedSchema =
    buildSchema(
      field("author"),
      field("time", TimestampType),
      field("time2", StringType),
      field("time3", TimestampType),
      field("time4", StringType)
    )
  assert(df.schema === expectedSchema)
  // Collect once: each collect() re-executes the Spark job.
  val row = df.collect().head
  assert(row.getAs[Timestamp](1).getTime === 1322907330000L)
  assert(row.getAs[Timestamp](3).getTime === 1322892930000L)
}

test("Test custom timestampFormat with offset") {
  // Pattern includes an offset designator (xx), so only time4 — which carries
  // "+1000" — is inferred as TimestampType; the other time columns stay
  // StringType because they do not match the pattern.
  val df = spark.read
    .option("rowTag", "book")
    .option("timestampFormat", "yyyy/MM/dd HH:mm:ss xx")
    .xml(resDir + "time.xml")
  val expectedSchema =
    buildSchema(
      field("author"),
      field("time", TimestampType),
      field("time2", StringType),
      field("time3", StringType),
      field("time4", TimestampType)
    )
  assert(df.schema === expectedSchema)
  // Collect once: each collect() re-executes the Spark job.
  val row = df.collect().head
  assert(row.getAs[Timestamp](1).getTime === 1322907330000L)
  assert(row.getAs[Timestamp](4).getTime === 1322892930000L)
}

test("Test null number type is null not 0.0") {
val schema = buildSchema(
struct("Header",
Expand Down
69 changes: 69 additions & 0 deletions src/test/scala/com/databricks/spark/xml/util/TypeCastSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -162,4 +162,73 @@ final class TypeCastSuite extends AnyFunSuite {
Locale.setDefault(defaultLocale)
}
}

test("Parsing built-in timestamp formatters") {
  val options = XmlOptions(Map())
  // All four representations denote the same instant: 2002-05-30T21:46:54Z.
  val expectedResult = Timestamp.from(
    ZonedDateTime.of(2002, 5, 30, 21, 46, 54, 0, ZoneId.of("UTC")).toInstant
  )
  // Each built-in formatter variant must resolve to the identical Timestamp.
  Seq(
    "2002-05-30 21:46:54",
    "2002-05-30T21:46:54",
    "2002-05-30T21:46:54+00:00",
    "2002-05-30T21:46:54.0000Z"
  ).foreach { repr =>
    assert(TypeCast.castTo(repr, TimestampType, options) === expectedResult)
  }
}

test("Custom timestamp format is used to parse correctly") {
  // Expected Timestamp for wall-clock 2011-12-03 10:15:30 interpreted in `zone`.
  def expectedAt(zone: String): Timestamp =
    Timestamp.from(
      ZonedDateTime.of(2011, 12, 3, 10, 15, 30, 0, ZoneId.of(zone)).toInstant
    )

  // Zoneless pattern + explicit UTC timezone option.
  val utcDashOptions =
    XmlOptions(Map("timestampFormat" -> "MM-dd-yyyy HH:mm:ss", "timezone" -> "UTC"))
  assert(
    TypeCast.castTo("12-03-2011 10:15:30", TimestampType, utcDashOptions) ===
      expectedAt("UTC")
  )

  val utcSlashOptions =
    XmlOptions(Map("timestampFormat" -> "yyyy/MM/dd HH:mm:ss", "timezone" -> "UTC"))
  assert(
    TypeCast.castTo("2011/12/03 10:15:30", TimestampType, utcSlashOptions) ===
      expectedAt("UTC")
  )

  // One options instance for both Shanghai assertions (the original built the
  // same XmlOptions twice).
  val shanghaiOptions =
    XmlOptions(Map("timestampFormat" -> "yyyy/MM/dd HH:mm:ss",
      "timezone" -> "Asia/Shanghai"))
  // The Shanghai interpretation must NOT equal the UTC instant...
  assert(
    TypeCast.castTo("2011/12/03 10:15:30", TimestampType, shanghaiOptions) !==
      expectedAt("UTC")
  )
  // ...and must equal the same wall-clock time interpreted in Asia/Shanghai.
  assert(
    TypeCast.castTo("2011/12/03 10:15:30", TimestampType, shanghaiOptions) ===
      expectedAt("Asia/Shanghai")
  )

  // With neither a zone in the pattern nor a timezone option, the cast cannot
  // resolve an instant. The original wrapped a discarded `=== ...` comparison
  // inside intercept; only the castTo call itself is expected to throw.
  val zonelessOptions = XmlOptions(Map("timestampFormat" -> "yyyy/MM/dd HH:mm:ss"))
  intercept[IllegalArgumentException] {
    TypeCast.castTo("2011/12/03 10:15:30", TimestampType, zonelessOptions)
  }
}
}