From c8c878a4166415728f6e940504766a099a2f6744 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN <ueshin@databricks.com> Date: Mon, 15 May 2017 16:52:22 -0700 Subject: [PATCH] [SPARK-20588][SQL] Cache TimeZone instances. ## What changes were proposed in this pull request? Because the method `TimeZone.getTimeZone(String ID)` is synchronized on the TimeZone class, concurrent call of this method will become a bottleneck. This especially happens when casting from string value containing timezone info to timestamp value, which uses `DateTimeUtils.stringToTimestamp()` and gets TimeZone instance on the site. This pr makes a cache of the generated TimeZone instances to avoid the synchronization. ## How was this patch tested? Existing tests. Author: Takuya UESHIN <ueshin@databricks.com> Closes #17933 from ueshin/issues/SPARK-20588. --- .../expressions/datetimeExpressions.scala | 17 ++++++++++------- .../spark/sql/catalyst/json/JSONOptions.scala | 2 +- .../sql/catalyst/optimizer/finishAnalysis.scala | 4 +--- .../spark/sql/catalyst/util/DateTimeUtils.scala | 17 ++++++++++++++--- .../scala/org/apache/spark/sql/Dataset.scala | 4 ++-- .../spark/sql/execution/QueryExecution.scala | 3 +-- .../datasources/PartitioningUtils.scala | 2 +- .../execution/datasources/csv/CSVOptions.scala | 2 +- .../execution/streaming/ProgressReporter.scala | 5 +++-- 9 files changed, 34 insertions(+), 22 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala index de4c94d12a..43ca2cff58 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala @@ -43,7 +43,7 @@ trait TimeZoneAwareExpression extends Expression { /** Returns a copy of this expression with the specified timeZoneId. */ def withTimeZone(timeZoneId: String): TimeZoneAwareExpression - @transient lazy val timeZone: TimeZone = TimeZone.getTimeZone(timeZoneId.get) + @transient lazy val timeZone: TimeZone = DateTimeUtils.getTimeZone(timeZoneId.get) } /** @@ -416,7 +416,7 @@ case class WeekOfYear(child: Expression) extends UnaryExpression with ImplicitCa override def dataType: DataType = IntegerType @transient private lazy val c = { - val c = Calendar.getInstance(TimeZone.getTimeZone("UTC")) + val c = Calendar.getInstance(DateTimeUtils.getTimeZone("UTC")) c.setFirstDayOfWeek(Calendar.MONDAY) c.setMinimalDaysInFirstWeek(4) c @@ -431,9 +431,10 @@ case class WeekOfYear(child: Expression) extends UnaryExpression with ImplicitCa nullSafeCodeGen(ctx, ev, time => { val cal = classOf[Calendar].getName val c = ctx.freshName("cal") + val dtu = DateTimeUtils.getClass.getName.stripSuffix("$") ctx.addMutableState(cal, c, s""" - $c = $cal.getInstance(java.util.TimeZone.getTimeZone("UTC")); + $c = $cal.getInstance($dtu.getTimeZone("UTC")); $c.setFirstDayOfWeek($cal.MONDAY); $c.setMinimalDaysInFirstWeek(4); """) @@ -954,8 +955,9 @@ case class FromUTCTimestamp(left: Expression, right: Expression) val tzTerm = ctx.freshName("tz") val utcTerm = ctx.freshName("utc") val tzClass = classOf[TimeZone].getName - ctx.addMutableState(tzClass, tzTerm, s"""$tzTerm = $tzClass.getTimeZone("$tz");""") - ctx.addMutableState(tzClass, utcTerm, s"""$utcTerm = $tzClass.getTimeZone("UTC");""") + val dtu = DateTimeUtils.getClass.getName.stripSuffix("$") + ctx.addMutableState(tzClass, tzTerm, s"""$tzTerm = $dtu.getTimeZone("$tz");""") + ctx.addMutableState(tzClass, utcTerm, s"""$utcTerm = $dtu.getTimeZone("UTC");""") val eval = left.genCode(ctx) ev.copy(code = s""" |${eval.code} @@ -1125,8 +1127,9 @@ case class ToUTCTimestamp(left: Expression, right: Expression) val tzTerm = ctx.freshName("tz") val utcTerm = ctx.freshName("utc") val tzClass = classOf[TimeZone].getName - ctx.addMutableState(tzClass, tzTerm, s"""$tzTerm = $tzClass.getTimeZone("$tz");""") - ctx.addMutableState(tzClass, utcTerm, s"""$utcTerm = $tzClass.getTimeZone("UTC");""") + val dtu = DateTimeUtils.getClass.getName.stripSuffix("$") + ctx.addMutableState(tzClass, tzTerm, s"""$tzTerm = $dtu.getTimeZone("$tz");""") + ctx.addMutableState(tzClass, utcTerm, s"""$utcTerm = $dtu.getTimeZone("UTC");""") val eval = left.genCode(ctx) ev.copy(code = s""" |${eval.code} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala index 23ba5ed4d5..7930515038 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala @@ -70,7 +70,7 @@ private[sql] class JSONOptions( val columnNameOfCorruptRecord = parameters.getOrElse("columnNameOfCorruptRecord", defaultColumnNameOfCorruptRecord) - val timeZone: TimeZone = TimeZone.getTimeZone( + val timeZone: TimeZone = DateTimeUtils.getTimeZone( parameters.getOrElse(DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId)) // Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala index 89e1dc9e32..af0837e36e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.catalyst.optimizer -import java.util.TimeZone - import scala.collection.mutable import org.apache.spark.sql.catalyst.catalog.SessionCatalog @@ -55,7 +53,7 @@ object ComputeCurrentTime extends Rule[LogicalPlan] { case CurrentDate(Some(timeZoneId)) => currentDates.getOrElseUpdate(timeZoneId, { Literal.create( - DateTimeUtils.millisToDays(timestamp / 1000L, TimeZone.getTimeZone(timeZoneId)), + DateTimeUtils.millisToDays(timestamp / 1000L, DateTimeUtils.getTimeZone(timeZoneId)), DateType) }) case CurrentTimestamp() => currentTime diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala index 0a3bb514ad..b9a3d986bc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala @@ -20,6 +20,8 @@ package org.apache.spark.sql.catalyst.util import java.sql.{Date, Timestamp} import java.text.{DateFormat, SimpleDateFormat} import java.util.{Calendar, Locale, TimeZone} +import java.util.concurrent.ConcurrentHashMap +import java.util.function.{Function => JFunction} import javax.xml.bind.DatatypeConverter import scala.annotation.tailrec @@ -98,6 +100,15 @@ object DateTimeUtils { sdf } + private val computedTimeZones = new ConcurrentHashMap[String, TimeZone] + private val computeTimeZone = new JFunction[String, TimeZone] { + override def apply(timeZoneId: String): TimeZone = TimeZone.getTimeZone(timeZoneId) + } + + def getTimeZone(timeZoneId: String): TimeZone = { + computedTimeZones.computeIfAbsent(timeZoneId, computeTimeZone) + } + def newDateFormat(formatString: String, timeZone: TimeZone): DateFormat = { val sdf = new SimpleDateFormat(formatString, Locale.US) sdf.setTimeZone(timeZone) @@ -407,7 +418,7 @@ object DateTimeUtils { Calendar.getInstance(timeZone) } else { Calendar.getInstance( - TimeZone.getTimeZone(f"GMT${tz.get.toChar}${segments(7)}%02d:${segments(8)}%02d")) + getTimeZone(f"GMT${tz.get.toChar}${segments(7)}%02d:${segments(8)}%02d")) } c.set(Calendar.MILLISECOND, 0) @@ -1027,7 +1038,7 @@ object DateTimeUtils { * representation in their timezone. */ def fromUTCTime(time: SQLTimestamp, timeZone: String): SQLTimestamp = { - convertTz(time, TimeZoneGMT, TimeZone.getTimeZone(timeZone)) + convertTz(time, TimeZoneGMT, getTimeZone(timeZone)) } /** @@ -1035,7 +1046,7 @@ object DateTimeUtils { * string representation in their timezone. */ def toUTCTime(time: SQLTimestamp, timeZone: String): SQLTimestamp = { - convertTz(time, TimeZone.getTimeZone(timeZone), TimeZoneGMT) + convertTz(time, getTimeZone(timeZone), TimeZoneGMT) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index c75921e867..53773f18ce 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql import java.io.CharArrayWriter import java.sql.{Date, Timestamp} -import java.util.TimeZone import scala.collection.JavaConverters._ import scala.language.implicitConversions @@ -249,7 +248,8 @@ class Dataset[T] private[sql]( val hasMoreData = takeResult.length > numRows val data = takeResult.take(numRows) - lazy val timeZone = TimeZone.getTimeZone(sparkSession.sessionState.conf.sessionLocalTimeZone) + lazy val timeZone = + DateTimeUtils.getTimeZone(sparkSession.sessionState.conf.sessionLocalTimeZone) // For array values, replace Seq and Array with square brackets // For cells that are beyond `truncate` characters, replace it with the diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index 8e8210e334..2e05e5d659 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.execution import java.nio.charset.StandardCharsets import java.sql.{Date, Timestamp} -import java.util.TimeZone import org.apache.spark.rdd.RDD import org.apache.spark.sql.{AnalysisException, Row, SparkSession} @@ -187,7 +186,7 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) { DateTimeUtils.dateToString(DateTimeUtils.fromJavaDate(d)) case (t: Timestamp, TimestampType) => DateTimeUtils.timestampToString(DateTimeUtils.fromJavaTimestamp(t), - TimeZone.getTimeZone(sparkSession.sessionState.conf.sessionLocalTimeZone)) + DateTimeUtils.getTimeZone(sparkSession.sessionState.conf.sessionLocalTimeZone)) case (bin: Array[Byte], BinaryType) => new String(bin, StandardCharsets.UTF_8) case (decimal: java.math.BigDecimal, DecimalType()) => formatDecimal(decimal) case (other, tpe) if primitiveTypes.contains(tpe) => other.toString diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index 2d70172487..f61c673baa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -94,7 +94,7 @@ object PartitioningUtils { typeInference: Boolean, basePaths: Set[Path], timeZoneId: String): PartitionSpec = { - parsePartitions(paths, typeInference, basePaths, TimeZone.getTimeZone(timeZoneId)) + parsePartitions(paths, typeInference, basePaths, DateTimeUtils.getTimeZone(timeZoneId)) } private[datasources] def parsePartitions( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala index 62e4c6e4b4..78c16b75ee 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala @@ -117,7 +117,7 @@ class CSVOptions( name.map(CompressionCodecs.getCodecClassName) } - val timeZone: TimeZone = TimeZone.getTimeZone( + val timeZone: TimeZone = DateTimeUtils.getTimeZone( parameters.getOrElse(DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId)) // Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala index 693933f95a..a4e4ca8213 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution.streaming import java.text.SimpleDateFormat -import java.util.{Date, TimeZone, UUID} +import java.util.{Date, UUID} import scala.collection.mutable import scala.collection.JavaConverters._ @@ -26,6 +26,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.internal.Logging import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan} +import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.streaming._ import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent @@ -82,7 +83,7 @@ trait ProgressReporter extends Logging { private var lastNoDataProgressEventTime = Long.MinValue private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601 - timestampFormat.setTimeZone(TimeZone.getTimeZone("UTC")) + timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC")) @volatile protected var currentStatus: StreamingQueryStatus = { -- GitLab