diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
index de4c94d12abdc787d1941c812189866ef7c139a3..43ca2cff588254de485f48d01a01df1bebb4361e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
@@ -43,7 +43,7 @@ trait TimeZoneAwareExpression extends Expression {
   /** Returns a copy of this expression with the specified timeZoneId. */
   def withTimeZone(timeZoneId: String): TimeZoneAwareExpression
 
-  @transient lazy val timeZone: TimeZone = TimeZone.getTimeZone(timeZoneId.get)
+  @transient lazy val timeZone: TimeZone = DateTimeUtils.getTimeZone(timeZoneId.get)
 }
 
 /**
@@ -416,7 +416,7 @@ case class WeekOfYear(child: Expression) extends UnaryExpression with ImplicitCa
   override def dataType: DataType = IntegerType
 
   @transient private lazy val c = {
-    val c = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
+    val c = Calendar.getInstance(DateTimeUtils.getTimeZone("UTC"))
     c.setFirstDayOfWeek(Calendar.MONDAY)
     c.setMinimalDaysInFirstWeek(4)
     c
@@ -431,9 +431,10 @@ case class WeekOfYear(child: Expression) extends UnaryExpression with ImplicitCa
     nullSafeCodeGen(ctx, ev, time => {
       val cal = classOf[Calendar].getName
       val c = ctx.freshName("cal")
+      val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
       ctx.addMutableState(cal, c,
         s"""
-          $c = $cal.getInstance(java.util.TimeZone.getTimeZone("UTC"));
+          $c = $cal.getInstance($dtu.getTimeZone("UTC"));
           $c.setFirstDayOfWeek($cal.MONDAY);
           $c.setMinimalDaysInFirstWeek(4);
          """)
@@ -954,8 +955,9 @@ case class FromUTCTimestamp(left: Expression, right: Expression)
         val tzTerm = ctx.freshName("tz")
         val utcTerm = ctx.freshName("utc")
         val tzClass = classOf[TimeZone].getName
-        ctx.addMutableState(tzClass, tzTerm, s"""$tzTerm = $tzClass.getTimeZone("$tz");""")
-        ctx.addMutableState(tzClass, utcTerm, s"""$utcTerm = $tzClass.getTimeZone("UTC");""")
+        val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
+        ctx.addMutableState(tzClass, tzTerm, s"""$tzTerm = $dtu.getTimeZone("$tz");""")
+        ctx.addMutableState(tzClass, utcTerm, s"""$utcTerm = $dtu.getTimeZone("UTC");""")
         val eval = left.genCode(ctx)
         ev.copy(code = s"""
            |${eval.code}
@@ -1125,8 +1127,9 @@ case class ToUTCTimestamp(left: Expression, right: Expression)
         val tzTerm = ctx.freshName("tz")
         val utcTerm = ctx.freshName("utc")
         val tzClass = classOf[TimeZone].getName
-        ctx.addMutableState(tzClass, tzTerm, s"""$tzTerm = $tzClass.getTimeZone("$tz");""")
-        ctx.addMutableState(tzClass, utcTerm, s"""$utcTerm = $tzClass.getTimeZone("UTC");""")
+        val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
+        ctx.addMutableState(tzClass, tzTerm, s"""$tzTerm = $dtu.getTimeZone("$tz");""")
+        ctx.addMutableState(tzClass, utcTerm, s"""$utcTerm = $dtu.getTimeZone("UTC");""")
         val eval = left.genCode(ctx)
         ev.copy(code = s"""
            |${eval.code}
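
Note on the codegen hunks above: in the Java source Catalyst generates, a Scala `object` cannot be referenced by the name `getClass.getName` returns, because that is the module class name ending in `$`; generated code must instead compile against the companion class carrying the static forwarder methods, which is why the patch strips the trailing `$`. A minimal, self-contained sketch of that naming rule follows; `DateTimeUtilsLike` is a hypothetical stand-in, not a name from the patch:

```scala
// Sketch of the stripSuffix("$") trick used when splicing a Scala
// object's name into generated Java source.
object DateTimeUtilsLike {
  def getTimeZone(id: String): java.util.TimeZone =
    java.util.TimeZone.getTimeZone(id)
}

object NameDemo {
  def main(args: Array[String]): Unit = {
    // Prints "DateTimeUtilsLike$": the runtime module class.
    println(DateTimeUtilsLike.getClass.getName)
    // Prints "DateTimeUtilsLike": the class with the static forwarders,
    // i.e. the name that generated Java code such as
    //   DateTimeUtilsLike.getTimeZone("UTC")
    // actually compiles against.
    println(DateTimeUtilsLike.getClass.getName.stripSuffix("$"))
  }
}
```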
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
index 23ba5ed4d50dc5080ba3165ce181f37666ac2494..7930515038355bd6955fe850d07bb0c250313958 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
@@ -70,7 +70,7 @@ private[sql] class JSONOptions(
   val columnNameOfCorruptRecord =
     parameters.getOrElse("columnNameOfCorruptRecord", defaultColumnNameOfCorruptRecord)
 
-  val timeZone: TimeZone = TimeZone.getTimeZone(
+  val timeZone: TimeZone = DateTimeUtils.getTimeZone(
     parameters.getOrElse(DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId))
 
   // Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
index 89e1dc9e322e05e8a949591ad25f39c26d63d474..af0837e36e8adf08d88c7146825e41e88c0dad81 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
-import java.util.TimeZone
-
 import scala.collection.mutable
 
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
@@ -55,7 +53,7 @@ object ComputeCurrentTime extends Rule[LogicalPlan] {
       case CurrentDate(Some(timeZoneId)) =>
         currentDates.getOrElseUpdate(timeZoneId, {
           Literal.create(
-            DateTimeUtils.millisToDays(timestamp / 1000L, TimeZone.getTimeZone(timeZoneId)),
+            DateTimeUtils.millisToDays(timestamp / 1000L, DateTimeUtils.getTimeZone(timeZoneId)),
             DateType)
         })
       case CurrentTimestamp() => currentTime
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 0a3bb514ad214e0e26a59aa1a4e5a002a2925777..b9a3d986bc58aa18fdbd3370862d41ed293420ae 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql.catalyst.util
 import java.sql.{Date, Timestamp}
 import java.text.{DateFormat, SimpleDateFormat}
 import java.util.{Calendar, Locale, TimeZone}
+import java.util.concurrent.ConcurrentHashMap
+import java.util.function.{Function => JFunction}
 import javax.xml.bind.DatatypeConverter
 
 import scala.annotation.tailrec
@@ -98,6 +100,15 @@ object DateTimeUtils {
     sdf
   }
 
+  private val computedTimeZones = new ConcurrentHashMap[String, TimeZone]
+  private val computeTimeZone = new JFunction[String, TimeZone] {
+    override def apply(timeZoneId: String): TimeZone = TimeZone.getTimeZone(timeZoneId)
+  }
+
+  def getTimeZone(timeZoneId: String): TimeZone = {
+    computedTimeZones.computeIfAbsent(timeZoneId, computeTimeZone)
+  }
+
   def newDateFormat(formatString: String, timeZone: TimeZone): DateFormat = {
     val sdf = new SimpleDateFormat(formatString, Locale.US)
     sdf.setTimeZone(timeZone)
@@ -407,7 +418,7 @@ object DateTimeUtils {
       Calendar.getInstance(timeZone)
     } else {
       Calendar.getInstance(
-        TimeZone.getTimeZone(f"GMT${tz.get.toChar}${segments(7)}%02d:${segments(8)}%02d"))
+        getTimeZone(f"GMT${tz.get.toChar}${segments(7)}%02d:${segments(8)}%02d"))
     }
     c.set(Calendar.MILLISECOND, 0)
 
@@ -1027,7 +1038,7 @@ object DateTimeUtils {
    * representation in their timezone.
    */
   def fromUTCTime(time: SQLTimestamp, timeZone: String): SQLTimestamp = {
-    convertTz(time, TimeZoneGMT, TimeZone.getTimeZone(timeZone))
+    convertTz(time, TimeZoneGMT, getTimeZone(timeZone))
   }
 
   /**
@@ -1035,7 +1046,7 @@ object DateTimeUtils {
    * string representation in their timezone.
    */
   def toUTCTime(time: SQLTimestamp, timeZone: String): SQLTimestamp = {
-    convertTz(time, TimeZone.getTimeZone(timeZone), TimeZoneGMT)
+    convertTz(time, getTimeZone(timeZone), TimeZoneGMT)
   }
 
   /**
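
The hunk above is the heart of the change: `TimeZone.getTimeZone` does non-trivial work on every call, so the new `DateTimeUtils.getTimeZone` memoizes results in a `ConcurrentHashMap`, turning repeated lookups of the same zone ID into a plain concurrent map read via `computeIfAbsent`. The explicit `JFunction` wrapper exists because this code targets Scala 2.11, where a Scala lambda does not convert to a `java.util.function.Function`. A self-contained sketch of the same pattern, under the illustrative name `TimeZoneCache` (not from the patch):

```scala
// Sketch of the memoization introduced in DateTimeUtils.getTimeZone.
import java.util.TimeZone
import java.util.concurrent.ConcurrentHashMap
import java.util.function.{Function => JFunction}

object TimeZoneCache {
  private val cache = new ConcurrentHashMap[String, TimeZone]

  // Explicit anonymous class rather than a lambda: under Scala 2.11 a
  // Scala function value is not a java.util.function.Function.
  private val compute = new JFunction[String, TimeZone] {
    override def apply(id: String): TimeZone = TimeZone.getTimeZone(id)
  }

  // The first call per ID pays the TimeZone.getTimeZone cost; later
  // calls return the cached instance without locking on the hit path.
  def getTimeZone(id: String): TimeZone =
    cache.computeIfAbsent(id, compute)
}
```

Usage note: `TimeZoneCache.getTimeZone("UTC") eq TimeZoneCache.getTimeZone("UTC")` holds, since `computeIfAbsent` installs at most one value per key. The call sites rewritten in the rest of this patch (codegen initializers, per-session formatting, partition parsing) are exactly the hot paths that benefit. One caveat the shared cache implies: `TimeZone` instances are mutable, so callers must never mutate a returned instance.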
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index c75921e867f647e5d9faca92bcfdb12fc603b355..53773f18ce553831756a6840d9d1f278764db2b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql
 
 import java.io.CharArrayWriter
 import java.sql.{Date, Timestamp}
-import java.util.TimeZone
 
 import scala.collection.JavaConverters._
 import scala.language.implicitConversions
@@ -249,7 +248,8 @@ class Dataset[T] private[sql](
     val hasMoreData = takeResult.length > numRows
     val data = takeResult.take(numRows)
 
-    lazy val timeZone = TimeZone.getTimeZone(sparkSession.sessionState.conf.sessionLocalTimeZone)
+    lazy val timeZone =
+      DateTimeUtils.getTimeZone(sparkSession.sessionState.conf.sessionLocalTimeZone)
 
     // For array values, replace Seq and Array with square brackets
     // For cells that are beyond `truncate` characters, replace it with the
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 8e8210e334a1dd99bc6e87828c05e99461d4012d..2e05e5d65923c6d6f8e35606c6c52518016fd77b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution
 
 import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}
-import java.util.TimeZone
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
@@ -187,7 +186,7 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
         DateTimeUtils.dateToString(DateTimeUtils.fromJavaDate(d))
       case (t: Timestamp, TimestampType) =>
         DateTimeUtils.timestampToString(DateTimeUtils.fromJavaTimestamp(t),
-          TimeZone.getTimeZone(sparkSession.sessionState.conf.sessionLocalTimeZone))
+          DateTimeUtils.getTimeZone(sparkSession.sessionState.conf.sessionLocalTimeZone))
       case (bin: Array[Byte], BinaryType) => new String(bin, StandardCharsets.UTF_8)
       case (decimal: java.math.BigDecimal, DecimalType()) => formatDecimal(decimal)
       case (other, tpe) if primitiveTypes.contains(tpe) => other.toString
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 2d70172487e17ad33a4c9b53db49e21195f0f332..f61c673baaa58dd37aabcf64877e8c188117068b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -94,7 +94,7 @@ object PartitioningUtils {
       typeInference: Boolean,
       basePaths: Set[Path],
       timeZoneId: String): PartitionSpec = {
-    parsePartitions(paths, typeInference, basePaths, TimeZone.getTimeZone(timeZoneId))
+    parsePartitions(paths, typeInference, basePaths, DateTimeUtils.getTimeZone(timeZoneId))
   }
 
   private[datasources] def parsePartitions(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
index 62e4c6e4b4ea0e77f0b75ca5bd454c165e118639..78c16b75ee68430346c24faad3e1d92e39c111d1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -117,7 +117,7 @@ class CSVOptions(
     name.map(CompressionCodecs.getCodecClassName)
   }
 
-  val timeZone: TimeZone = TimeZone.getTimeZone(
+  val timeZone: TimeZone = DateTimeUtils.getTimeZone(
     parameters.getOrElse(DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId))
 
   // Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 693933f95a231d5b5e4e4b70494e0a01b00d9303..a4e4ca821374ca9fd6705bf95b244b3df9f5808e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution.streaming
 
 import java.text.SimpleDateFormat
-import java.util.{Date, TimeZone, UUID}
+import java.util.{Date, UUID}
 
 import scala.collection.mutable
 import scala.collection.JavaConverters._
@@ -26,6 +26,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.streaming._
 import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent
@@ -82,7 +83,7 @@ trait ProgressReporter extends Logging {
   private var lastNoDataProgressEventTime = Long.MinValue
 
   private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
-  timestampFormat.setTimeZone(TimeZone.getTimeZone("UTC"))
+  timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC"))
 
   @volatile
   protected var currentStatus: StreamingQueryStatus = {