[SPARK-20588][SQL] Cache TimeZone instances.
## What changes were proposed in this pull request?

Because the method `TimeZone.getTimeZone(String ID)` is synchronized on the `TimeZone` class, concurrent calls to it become a bottleneck.
This shows up in particular when casting a string value that contains timezone info to a timestamp value, which goes through `DateTimeUtils.stringToTimestamp()` and obtains a `TimeZone` instance at the call site.
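
For illustration only (not part of the patch), a job along the following lines hits that path: each string value carrying an explicit zone offset is parsed by `DateTimeUtils.stringToTimestamp()`, which resolves a `TimeZone` such as `GMT+09:00` per value. The session setup and data below are hypothetical, just a sketch of the hot path:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("tz-cast-sketch").getOrCreate()
import spark.implicits._

// A column of timestamp strings with an explicit zone offset. Casting the column
// parses every value with DateTimeUtils.stringToTimestamp(), and each parse looks
// up a TimeZone ("GMT+09:00" here) -- before this change via the synchronized
// TimeZone.getTimeZone, which concurrent tasks all serialize on.
val df = Seq.fill(100000)("2017-05-15T10:30:00.000+09:00").toDF("s")
df.selectExpr("max(cast(s as timestamp)) AS latest").show()
```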

This PR caches the generated `TimeZone` instances to avoid the synchronized call.
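
The cache added to `DateTimeUtils` (see the diff below) is a `ConcurrentHashMap` keyed by timezone ID, so the synchronized `TimeZone.getTimeZone` call happens at most once per distinct ID. A minimal standalone sketch of the same pattern (the `TimeZoneCache` name is illustrative; the real helper is `DateTimeUtils.getTimeZone`):

```scala
import java.util.TimeZone
import java.util.concurrent.ConcurrentHashMap
import java.util.function.{Function => JFunction}

object TimeZoneCache {
  // TimeZone instances keyed by ID; only a cache miss reaches the synchronized
  // TimeZone.getTimeZone, and hits are non-blocking ConcurrentHashMap reads.
  private val computedTimeZones = new ConcurrentHashMap[String, TimeZone]
  private val computeTimeZone = new JFunction[String, TimeZone] {
    override def apply(timeZoneId: String): TimeZone = TimeZone.getTimeZone(timeZoneId)
  }

  def getTimeZone(timeZoneId: String): TimeZone =
    computedTimeZones.computeIfAbsent(timeZoneId, computeTimeZone)
}
```

The explicit `JFunction` instance is allocated once instead of creating a closure per lookup, and it sidesteps the lack of SAM conversion in Scala 2.11, which Spark was still built against at the time.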

## How was this patch tested?

Existing tests.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #17933 from ueshin/issues/SPARK-20588.
ueshin authored and gatorsmile committed May 15, 2017
1 parent bbd163d commit c8c878a
Showing 9 changed files with 34 additions and 22 deletions.

@@ -43,7 +43,7 @@ trait TimeZoneAwareExpression extends Expression {
/** Returns a copy of this expression with the specified timeZoneId. */
def withTimeZone(timeZoneId: String): TimeZoneAwareExpression

- @transient lazy val timeZone: TimeZone = TimeZone.getTimeZone(timeZoneId.get)
+ @transient lazy val timeZone: TimeZone = DateTimeUtils.getTimeZone(timeZoneId.get)
}

/**

@@ -416,7 +416,7 @@ case class WeekOfYear(child: Expression) extends UnaryExpression with ImplicitCa
override def dataType: DataType = IntegerType

@transient private lazy val c = {
- val c = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
+ val c = Calendar.getInstance(DateTimeUtils.getTimeZone("UTC"))
c.setFirstDayOfWeek(Calendar.MONDAY)
c.setMinimalDaysInFirstWeek(4)
c

@@ -431,9 +431,10 @@ case class WeekOfYear(child: Expression) extends UnaryExpression with ImplicitCa
nullSafeCodeGen(ctx, ev, time => {
val cal = classOf[Calendar].getName
val c = ctx.freshName("cal")
+ val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
ctx.addMutableState(cal, c,
s"""
- $c = $cal.getInstance(java.util.TimeZone.getTimeZone("UTC"));
+ $c = $cal.getInstance($dtu.getTimeZone("UTC"));
$c.setFirstDayOfWeek($cal.MONDAY);
$c.setMinimalDaysInFirstWeek(4);
""")

@@ -954,8 +955,9 @@ case class FromUTCTimestamp(left: Expression, right: Expression)
val tzTerm = ctx.freshName("tz")
val utcTerm = ctx.freshName("utc")
val tzClass = classOf[TimeZone].getName
- ctx.addMutableState(tzClass, tzTerm, s"""$tzTerm = $tzClass.getTimeZone("$tz");""")
- ctx.addMutableState(tzClass, utcTerm, s"""$utcTerm = $tzClass.getTimeZone("UTC");""")
+ val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
+ ctx.addMutableState(tzClass, tzTerm, s"""$tzTerm = $dtu.getTimeZone("$tz");""")
+ ctx.addMutableState(tzClass, utcTerm, s"""$utcTerm = $dtu.getTimeZone("UTC");""")
val eval = left.genCode(ctx)
ev.copy(code = s"""
|${eval.code}

@@ -1125,8 +1127,9 @@ case class ToUTCTimestamp(left: Expression, right: Expression)
val tzTerm = ctx.freshName("tz")
val utcTerm = ctx.freshName("utc")
val tzClass = classOf[TimeZone].getName
- ctx.addMutableState(tzClass, tzTerm, s"""$tzTerm = $tzClass.getTimeZone("$tz");""")
- ctx.addMutableState(tzClass, utcTerm, s"""$utcTerm = $tzClass.getTimeZone("UTC");""")
+ val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
+ ctx.addMutableState(tzClass, tzTerm, s"""$tzTerm = $dtu.getTimeZone("$tz");""")
+ ctx.addMutableState(tzClass, utcTerm, s"""$utcTerm = $dtu.getTimeZone("UTC");""")
val eval = left.genCode(ctx)
ev.copy(code = s"""
|${eval.code}

@@ -70,7 +70,7 @@ private[sql] class JSONOptions(
val columnNameOfCorruptRecord =
parameters.getOrElse("columnNameOfCorruptRecord", defaultColumnNameOfCorruptRecord)

- val timeZone: TimeZone = TimeZone.getTimeZone(
+ val timeZone: TimeZone = DateTimeUtils.getTimeZone(
parameters.getOrElse(DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId))

// Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe.

@@ -17,8 +17,6 @@

package org.apache.spark.sql.catalyst.optimizer

- import java.util.TimeZone
-
import scala.collection.mutable

import org.apache.spark.sql.catalyst.catalog.SessionCatalog

@@ -55,7 +53,7 @@ object ComputeCurrentTime extends Rule[LogicalPlan] {
case CurrentDate(Some(timeZoneId)) =>
currentDates.getOrElseUpdate(timeZoneId, {
Literal.create(
- DateTimeUtils.millisToDays(timestamp / 1000L, TimeZone.getTimeZone(timeZoneId)),
+ DateTimeUtils.millisToDays(timestamp / 1000L, DateTimeUtils.getTimeZone(timeZoneId)),
DateType)
})
case CurrentTimestamp() => currentTime

@@ -20,6 +20,8 @@ package org.apache.spark.sql.catalyst.util
import java.sql.{Date, Timestamp}
import java.text.{DateFormat, SimpleDateFormat}
import java.util.{Calendar, Locale, TimeZone}
+ import java.util.concurrent.ConcurrentHashMap
+ import java.util.function.{Function => JFunction}
import javax.xml.bind.DatatypeConverter

import scala.annotation.tailrec

@@ -98,6 +100,15 @@ object DateTimeUtils {
sdf
}

+ private val computedTimeZones = new ConcurrentHashMap[String, TimeZone]
+ private val computeTimeZone = new JFunction[String, TimeZone] {
+   override def apply(timeZoneId: String): TimeZone = TimeZone.getTimeZone(timeZoneId)
+ }
+
+ def getTimeZone(timeZoneId: String): TimeZone = {
+   computedTimeZones.computeIfAbsent(timeZoneId, computeTimeZone)
+ }
+
def newDateFormat(formatString: String, timeZone: TimeZone): DateFormat = {
val sdf = new SimpleDateFormat(formatString, Locale.US)
sdf.setTimeZone(timeZone)

@@ -407,7 +418,7 @@ object DateTimeUtils {
Calendar.getInstance(timeZone)
} else {
Calendar.getInstance(
- TimeZone.getTimeZone(f"GMT${tz.get.toChar}${segments(7)}%02d:${segments(8)}%02d"))
+ getTimeZone(f"GMT${tz.get.toChar}${segments(7)}%02d:${segments(8)}%02d"))
}
c.set(Calendar.MILLISECOND, 0)


@@ -1027,15 +1038,15 @@ object DateTimeUtils {
* representation in their timezone.
*/
def fromUTCTime(time: SQLTimestamp, timeZone: String): SQLTimestamp = {
- convertTz(time, TimeZoneGMT, TimeZone.getTimeZone(timeZone))
+ convertTz(time, TimeZoneGMT, getTimeZone(timeZone))
}

/**
* Returns a utc timestamp from a given timestamp from a given timezone, with the same
* string representation in their timezone.
*/
def toUTCTime(time: SQLTimestamp, timeZone: String): SQLTimestamp = {
- convertTz(time, TimeZone.getTimeZone(timeZone), TimeZoneGMT)
+ convertTz(time, getTimeZone(timeZone), TimeZoneGMT)
}

/**
4 changes: 2 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala

@@ -19,7 +19,6 @@ package org.apache.spark.sql

import java.io.CharArrayWriter
import java.sql.{Date, Timestamp}
- import java.util.TimeZone

import scala.collection.JavaConverters._
import scala.language.implicitConversions

@@ -249,7 +248,8 @@ class Dataset[T] private[sql](
val hasMoreData = takeResult.length > numRows
val data = takeResult.take(numRows)

- lazy val timeZone = TimeZone.getTimeZone(sparkSession.sessionState.conf.sessionLocalTimeZone)
+ lazy val timeZone =
+   DateTimeUtils.getTimeZone(sparkSession.sessionState.conf.sessionLocalTimeZone)

// For array values, replace Seq and Array with square brackets
// For cells that are beyond `truncate` characters, replace it with the

@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution

import java.nio.charset.StandardCharsets
import java.sql.{Date, Timestamp}
- import java.util.TimeZone

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}

@@ -187,7 +186,7 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
DateTimeUtils.dateToString(DateTimeUtils.fromJavaDate(d))
case (t: Timestamp, TimestampType) =>
DateTimeUtils.timestampToString(DateTimeUtils.fromJavaTimestamp(t),
- TimeZone.getTimeZone(sparkSession.sessionState.conf.sessionLocalTimeZone))
+ DateTimeUtils.getTimeZone(sparkSession.sessionState.conf.sessionLocalTimeZone))
case (bin: Array[Byte], BinaryType) => new String(bin, StandardCharsets.UTF_8)
case (decimal: java.math.BigDecimal, DecimalType()) => formatDecimal(decimal)
case (other, tpe) if primitiveTypes.contains(tpe) => other.toString

@@ -94,7 +94,7 @@ object PartitioningUtils {
typeInference: Boolean,
basePaths: Set[Path],
timeZoneId: String): PartitionSpec = {
- parsePartitions(paths, typeInference, basePaths, TimeZone.getTimeZone(timeZoneId))
+ parsePartitions(paths, typeInference, basePaths, DateTimeUtils.getTimeZone(timeZoneId))
}

private[datasources] def parsePartitions(

@@ -117,7 +117,7 @@ class CSVOptions(
name.map(CompressionCodecs.getCodecClassName)
}

- val timeZone: TimeZone = TimeZone.getTimeZone(
+ val timeZone: TimeZone = DateTimeUtils.getTimeZone(
parameters.getOrElse(DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId))

// Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe.

@@ -18,14 +18,15 @@
package org.apache.spark.sql.execution.streaming

import java.text.SimpleDateFormat
- import java.util.{Date, TimeZone, UUID}
+ import java.util.{Date, UUID}

import scala.collection.mutable
import scala.collection.JavaConverters._

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan}
+ import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent

@@ -82,7 +83,7 @@ trait ProgressReporter extends Logging {
private var lastNoDataProgressEventTime = Long.MinValue

private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
- timestampFormat.setTimeZone(TimeZone.getTimeZone("UTC"))
+ timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC"))

@volatile
protected var currentStatus: StreamingQueryStatus = {