Skip to content

Commit

Permalink
[SPARK-25496][SQL] Deprecate from_utc_timestamp and to_utc_timestamp
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

In the PR, I propose to deprecate the `from_utc_timestamp()` and `to_utc_timestamp`, and disable them by default. The functions can be enabled back via the SQL config `spark.sql.legacy.utcTimestampFunc.enabled`. By default, any calls of the functions throw an analysis exception.

One of the reason for deprecation is functions violate semantic of `TimestampType` which is number of microseconds since epoch in UTC time zone. Shifting microseconds since epoch by time zone offset doesn't make sense because the result doesn't represent microseconds since epoch in UTC time zone any more, and cannot be considered as `TimestampType`.

## How was this patch tested?

The changes were tested by `DateExpressionsSuite` and `DateFunctionsSuite`.

Closes #24195 from MaxGekk/conv-utc-timestamp-deprecate.

Lead-authored-by: Maxim Gekk <max.gekk@gmail.com>
Co-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Co-authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
3 people authored and cloud-fan committed Apr 3, 2019
1 parent b8b5acd commit 1d20d13
Show file tree
Hide file tree
Showing 11 changed files with 246 additions and 152 deletions.
2 changes: 2 additions & 0 deletions R/pkg/R/functions.R
Original file line number Diff line number Diff line change
Expand Up @@ -2459,6 +2459,7 @@ setMethod("schema_of_csv", signature(x = "characterOrColumn"),
#' @note from_utc_timestamp since 1.5.0
setMethod("from_utc_timestamp", signature(y = "Column", x = "character"),
function(y, x) {
.Deprecated(msg = "from_utc_timestamp is deprecated. See SPARK-25496.")
jc <- callJStatic("org.apache.spark.sql.functions", "from_utc_timestamp", y@jc, x)
column(jc)
})
Expand Down Expand Up @@ -2517,6 +2518,7 @@ setMethod("next_day", signature(y = "Column", x = "character"),
#' @note to_utc_timestamp since 1.5.0
setMethod("to_utc_timestamp", signature(y = "Column", x = "character"),
function(y, x) {
.Deprecated(msg = "to_utc_timestamp is deprecated. See SPARK-25496.")
jc <- callJStatic("org.apache.spark.sql.functions", "to_utc_timestamp", y@jc, x)
column(jc)
})
Expand Down
18 changes: 14 additions & 4 deletions R/pkg/tests/fulltests/test_sparkSQL.R
Original file line number Diff line number Diff line change
Expand Up @@ -1905,10 +1905,20 @@ test_that("date functions on a DataFrame", {
df2 <- createDataFrame(l2)
expect_equal(collect(select(df2, minute(df2$b)))[, 1], c(34, 24))
expect_equal(collect(select(df2, second(df2$b)))[, 1], c(0, 34))
expect_equal(collect(select(df2, from_utc_timestamp(df2$b, "JST")))[, 1],
c(as.POSIXct("2012-12-13 21:34:00 UTC"), as.POSIXct("2014-12-15 10:24:34 UTC")))
expect_equal(collect(select(df2, to_utc_timestamp(df2$b, "JST")))[, 1],
c(as.POSIXct("2012-12-13 03:34:00 UTC"), as.POSIXct("2014-12-14 16:24:34 UTC")))
conf <- callJMethod(sparkSession, "conf")
isUtcTimestampFuncEnabled <- callJMethod(conf, "get", "spark.sql.legacy.utcTimestampFunc.enabled")
callJMethod(conf, "set", "spark.sql.legacy.utcTimestampFunc.enabled", "true")
tryCatch({
# Both from_utc_timestamp and to_utc_timestamp are deprecated as of SPARK-25496
expect_equal(suppressWarnings(collect(select(df2, from_utc_timestamp(df2$b, "JST"))))[, 1],
c(as.POSIXct("2012-12-13 21:34:00 UTC"), as.POSIXct("2014-12-15 10:24:34 UTC")))
expect_equal(suppressWarnings(collect(select(df2, to_utc_timestamp(df2$b, "JST"))))[, 1],
c(as.POSIXct("2012-12-13 03:34:00 UTC"), as.POSIXct("2014-12-14 16:24:34 UTC")))
},
finally = {
# Reverting the conf back
callJMethod(conf, "set", "spark.sql.legacy.utcTimestampFunc.enabled", isUtcTimestampFuncEnabled)
})
expect_gt(collect(select(df2, unix_timestamp()))[1, 1], 0)
expect_gt(collect(select(df2, unix_timestamp(df2$b)))[1, 1], 0)
expect_gt(collect(select(df2, unix_timestamp(lit("2015-01-01"), "yyyy-MM-dd")))[1, 1], 0)
Expand Down
10 changes: 10 additions & 0 deletions python/pyspark/sql/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -1306,7 +1306,10 @@ def from_utc_timestamp(timestamp, tz):
[Row(local_time=datetime.datetime(1997, 2, 28, 2, 30))]
>>> df.select(from_utc_timestamp(df.ts, df.tz).alias('local_time')).collect()
[Row(local_time=datetime.datetime(1997, 2, 28, 19, 30))]
.. note:: Deprecated in 3.0. See SPARK-25496
"""
warnings.warn("Deprecated in 3.0. See SPARK-25496", DeprecationWarning)
sc = SparkContext._active_spark_context
if isinstance(tz, Column):
tz = _to_java_column(tz)
Expand Down Expand Up @@ -1340,7 +1343,10 @@ def to_utc_timestamp(timestamp, tz):
[Row(utc_time=datetime.datetime(1997, 2, 28, 18, 30))]
>>> df.select(to_utc_timestamp(df.ts, df.tz).alias('utc_time')).collect()
[Row(utc_time=datetime.datetime(1997, 2, 28, 1, 30))]
.. note:: Deprecated in 3.0. See SPARK-25496
"""
warnings.warn("Deprecated in 3.0. See SPARK-25496", DeprecationWarning)
sc = SparkContext._active_spark_context
if isinstance(tz, Column):
tz = _to_java_column(tz)
Expand Down Expand Up @@ -3191,9 +3197,13 @@ def _test():
globs['sc'] = sc
globs['spark'] = spark
globs['df'] = spark.createDataFrame([Row(name='Alice', age=2), Row(name='Bob', age=5)])

spark.conf.set("spark.sql.legacy.utcTimestampFunc.enabled", "true")
(failure_count, test_count) = doctest.testmod(
pyspark.sql.functions, globs=globs,
optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE)
spark.conf.unset("spark.sql.legacy.utcTimestampFunc.enabled")

spark.stop()
if failure_count:
sys.exit(-1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ import scala.util.control.NonFatal

import org.apache.commons.lang3.StringEscapeUtils

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.util.{DateTimeUtils, TimestampFormatter}
import org.apache.spark.sql.catalyst.util.DateTimeUtils._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}

Expand Down Expand Up @@ -1021,6 +1023,11 @@ case class TimeAdd(start: Expression, interval: Expression, timeZoneId: Option[S
case class FromUTCTimestamp(left: Expression, right: Expression)
extends BinaryExpression with ImplicitCastInputTypes {

if (!SQLConf.get.utcTimestampFuncEnabled) {
throw new AnalysisException(s"The $prettyName function has been disabled since Spark 3.0." +
s"Set ${SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key} to true to enable this function.")
}

override def inputTypes: Seq[AbstractDataType] = Seq(TimestampType, StringType)
override def dataType: DataType = TimestampType
override def prettyName: String = "from_utc_timestamp"
Expand Down Expand Up @@ -1227,6 +1234,11 @@ case class MonthsBetween(
case class ToUTCTimestamp(left: Expression, right: Expression)
extends BinaryExpression with ImplicitCastInputTypes {

if (!SQLConf.get.utcTimestampFuncEnabled) {
throw new AnalysisException(s"The $prettyName function has been disabled since Spark 3.0. " +
s"Set ${SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key} to true to enable this function.")
}

override def inputTypes: Seq[AbstractDataType] = Seq(TimestampType, StringType)
override def dataType: DataType = TimestampType
override def prettyName: String = "to_utc_timestamp"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1723,6 +1723,12 @@ object SQLConf {
"and java.sql.Date are used for the same purpose.")
.booleanConf
.createWithDefault(false)

val UTC_TIMESTAMP_FUNC_ENABLED = buildConf("spark.sql.legacy.utcTimestampFunc.enabled")
.doc("The configuration property enables the to_utc_timestamp() " +
"and from_utc_timestamp() functions.")
.booleanConf
.createWithDefault(false)
}

/**
Expand Down Expand Up @@ -1916,6 +1922,8 @@ class SQLConf extends Serializable with Logging {

def datetimeJava8ApiEnabled: Boolean = getConf(DATETIME_JAVA8API_ENABLED)

def utcTimestampFuncEnabled: Boolean = getConf(UTC_TIMESTAMP_FUNC_ENABLED)

/**
* Returns the [[Resolver]] for the current configuration, which can be used to determine if two
* identifiers are equal.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.expressions

import java.sql.Timestamp

import org.apache.log4j.{Appender, AppenderSkeleton, Logger}
import org.apache.log4j.AppenderSkeleton
import org.apache.log4j.spi.LoggingEvent

import org.apache.spark.SparkFunSuite
Expand All @@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.expressions.objects._
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.ThreadUtils
Expand Down Expand Up @@ -189,36 +190,42 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
}

test("SPARK-17702: split wide constructor into blocks due to JVM code size limit") {
val length = 5000
val expressions = Seq.fill(length) {
ToUTCTimestamp(
Literal.create(Timestamp.valueOf("2015-07-24 00:00:00"), TimestampType),
Literal.create("PST", StringType))
}
val plan = GenerateMutableProjection.generate(expressions)
val actual = plan(new GenericInternalRow(length)).toSeq(expressions.map(_.dataType))
val expected = Seq.fill(length)(
DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2015-07-24 07:00:00")))

if (actual != expected) {
fail(s"Incorrect Evaluation: expressions: $expressions, actual: $actual, expected: $expected")
withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") {
val length = 5000
val expressions = Seq.fill(length) {
ToUTCTimestamp(
Literal.create(Timestamp.valueOf("2015-07-24 00:00:00"), TimestampType),
Literal.create("PST", StringType))
}
val plan = GenerateMutableProjection.generate(expressions)
val actual = plan(new GenericInternalRow(length)).toSeq(expressions.map(_.dataType))
val expected = Seq.fill(length)(
DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2015-07-24 07:00:00")))

if (actual != expected) {
fail(
s"Incorrect Evaluation: expressions: $expressions, actual: $actual, expected: $expected")
}
}
}

test("SPARK-22226: group splitted expressions into one method per nested class") {
val length = 10000
val expressions = Seq.fill(length) {
ToUTCTimestamp(
Literal.create(Timestamp.valueOf("2017-10-10 00:00:00"), TimestampType),
Literal.create("PST", StringType))
}
val plan = GenerateMutableProjection.generate(expressions)
val actual = plan(new GenericInternalRow(length)).toSeq(expressions.map(_.dataType))
val expected = Seq.fill(length)(
DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2017-10-10 07:00:00")))

if (actual != expected) {
fail(s"Incorrect Evaluation: expressions: $expressions, actual: $actual, expected: $expected")
withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") {
val length = 10000
val expressions = Seq.fill(length) {
ToUTCTimestamp(
Literal.create(Timestamp.valueOf("2017-10-10 00:00:00"), TimestampType),
Literal.create("PST", StringType))
}
val plan = GenerateMutableProjection.generate(expressions)
val actual = plan(new GenericInternalRow(length)).toSeq(expressions.map(_.dataType))
val expected = Seq.fill(length)(
DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2017-10-10 07:00:00")))

if (actual != expected) {
fail(
s"Incorrect Evaluation: expressions: $expressions, actual: $actual, expected: $expected")
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeUnit._

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.util.{DateTimeUtils, TimestampFormatter}
import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._
import org.apache.spark.sql.catalyst.util.DateTimeUtils.TimeZoneGMT
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.CalendarInterval

Expand Down Expand Up @@ -816,21 +818,29 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
NonFoldableLiteral.create(tz, StringType)),
if (expected != null) Timestamp.valueOf(expected) else null)
}
test("2015-07-24 00:00:00", "PST", "2015-07-24 07:00:00")
test("2015-01-24 00:00:00", "PST", "2015-01-24 08:00:00")
test(null, "UTC", null)
test("2015-07-24 00:00:00", null, null)
test(null, null, null)
withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") {
test("2015-07-24 00:00:00", "PST", "2015-07-24 07:00:00")
test("2015-01-24 00:00:00", "PST", "2015-01-24 08:00:00")
test(null, "UTC", null)
test("2015-07-24 00:00:00", null, null)
test(null, null, null)
}
val msg = intercept[AnalysisException] {
test("2015-07-24 00:00:00", "PST", "2015-07-24 07:00:00")
}.getMessage
assert(msg.contains(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key))
}

test("to_utc_timestamp - invalid time zone id") {
Seq("Invalid time zone", "\"quote", "UTC*42").foreach { invalidTz =>
val msg = intercept[java.time.DateTimeException] {
GenerateUnsafeProjection.generate(
ToUTCTimestamp(
Literal(Timestamp.valueOf("2015-07-24 00:00:00")), Literal(invalidTz)) :: Nil)
}.getMessage
assert(msg.contains(invalidTz))
withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") {
Seq("Invalid time zone", "\"quote", "UTC*42").foreach { invalidTz =>
val msg = intercept[java.time.DateTimeException] {
GenerateUnsafeProjection.generate(
ToUTCTimestamp(
Literal(Timestamp.valueOf("2015-07-24 00:00:00")), Literal(invalidTz)) :: Nil)
}.getMessage
assert(msg.contains(invalidTz))
}
}
}

Expand All @@ -847,19 +857,28 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
NonFoldableLiteral.create(tz, StringType)),
if (expected != null) Timestamp.valueOf(expected) else null)
}
test("2015-07-24 00:00:00", "PST", "2015-07-23 17:00:00")
test("2015-01-24 00:00:00", "PST", "2015-01-23 16:00:00")
test(null, "UTC", null)
test("2015-07-24 00:00:00", null, null)
test(null, null, null)
withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") {
test("2015-07-24 00:00:00", "PST", "2015-07-23 17:00:00")
test("2015-01-24 00:00:00", "PST", "2015-01-23 16:00:00")
test(null, "UTC", null)
test("2015-07-24 00:00:00", null, null)
test(null, null, null)
}
val msg = intercept[AnalysisException] {
test("2015-07-24 00:00:00", "PST", "2015-07-23 17:00:00")
}.getMessage
assert(msg.contains(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key))
}

test("from_utc_timestamp - invalid time zone id") {
Seq("Invalid time zone", "\"quote", "UTC*42").foreach { invalidTz =>
val msg = intercept[java.time.DateTimeException] {
GenerateUnsafeProjection.generate(FromUTCTimestamp(Literal(0), Literal(invalidTz)) :: Nil)
}.getMessage
assert(msg.contains(invalidTz))
withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") {
Seq("Invalid time zone", "\"quote", "UTC*42").foreach { invalidTz =>
val msg = intercept[java.time.DateTimeException] {
GenerateUnsafeProjection.generate(
FromUTCTimestamp(Literal(0), Literal(invalidTz)) :: Nil)
}.getMessage
assert(msg.contains(invalidTz))
}
}
}
}
4 changes: 4 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/functions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2988,6 +2988,7 @@ object functions {
* @group datetime_funcs
* @since 1.5.0
*/
@deprecated("This function is deprecated and will be removed in future versions.", "3.0.0")
def from_utc_timestamp(ts: Column, tz: String): Column = withExpr {
FromUTCTimestamp(ts.expr, Literal(tz))
}
Expand All @@ -2999,6 +3000,7 @@ object functions {
* @group datetime_funcs
* @since 2.4.0
*/
@deprecated("This function is deprecated and will be removed in future versions.", "3.0.0")
def from_utc_timestamp(ts: Column, tz: Column): Column = withExpr {
FromUTCTimestamp(ts.expr, tz.expr)
}
Expand All @@ -3017,6 +3019,7 @@ object functions {
* @group datetime_funcs
* @since 1.5.0
*/
@deprecated("This function is deprecated and will be removed in future versions.", "3.0.0")
def to_utc_timestamp(ts: Column, tz: String): Column = withExpr {
ToUTCTimestamp(ts.expr, Literal(tz))
}
Expand All @@ -3028,6 +3031,7 @@ object functions {
* @group datetime_funcs
* @since 2.4.0
*/
@deprecated("This function is deprecated and will be removed in future versions.", "3.0.0")
def to_utc_timestamp(ts: Column, tz: Column): Column = withExpr {
ToUTCTimestamp(ts.expr, tz.expr)
}
Expand Down
Loading

0 comments on commit 1d20d13

Please sign in to comment.