diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index d31f3fb8f6046fb19a8a9842db0605150229e202..167833488980af286c22c19b248e37e9e5b6bcb9 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -158,7 +158,8 @@ class DataFrameReader(OptionUtils):
     def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
              allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
              allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
-             mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None):
+             mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None,
+             timeZone=None):
         """
         Loads a JSON file (`JSON Lines text format or newline-delimited JSON
         <http://jsonlines.org/>`_) or an RDD of Strings storing JSON objects (one object per
@@ -204,11 +205,13 @@ class DataFrameReader(OptionUtils):
         :param dateFormat: sets the string that indicates a date format. Custom date formats
                            follow the formats at ``java.text.SimpleDateFormat``. This
                            applies to date type. If None is set, it uses the
-                           default value value, ``yyyy-MM-dd``.
+                           default value, ``yyyy-MM-dd``.
         :param timestampFormat: sets the string that indicates a timestamp format. Custom date
                                 formats follow the formats at ``java.text.SimpleDateFormat``.
                                 This applies to timestamp type. If None is set, it uses the
-                                default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+                                default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+        :param timeZone: sets the string that indicates a timezone to be used to parse timestamps.
+                         If None is set, it uses the default value, session local timezone.
 
         >>> df1 = spark.read.json('python/test_support/sql/people.json')
         >>> df1.dtypes
@@ -225,7 +228,7 @@ class DataFrameReader(OptionUtils):
             allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero,
             allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
             mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
-            timestampFormat=timestampFormat)
+            timestampFormat=timestampFormat, timeZone=timeZone)
         if isinstance(path, basestring):
             path = [path]
         if type(path) == list:
@@ -298,7 +301,7 @@ class DataFrameReader(OptionUtils):
             comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
             ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
             negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
-            maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None):
+            maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, timeZone=None):
         """Loads a CSV file and returns the result as a  :class:`DataFrame`.
 
         This function will go through the input once to determine the input schema if
@@ -341,11 +344,11 @@ class DataFrameReader(OptionUtils):
         :param dateFormat: sets the string that indicates a date format. Custom date formats
                            follow the formats at ``java.text.SimpleDateFormat``. This
                            applies to date type. If None is set, it uses the
-                           default value value, ``yyyy-MM-dd``.
+                           default value, ``yyyy-MM-dd``.
         :param timestampFormat: sets the string that indicates a timestamp format. Custom date
                                 formats follow the formats at ``java.text.SimpleDateFormat``.
                                 This applies to timestamp type. If None is set, it uses the
-                                default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+                                default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
         :param maxColumns: defines a hard limit of how many columns a record can have. If None is
                            set, it uses the default value, ``20480``.
         :param maxCharsPerColumn: defines the maximum number of characters allowed for any given
@@ -357,6 +360,8 @@ class DataFrameReader(OptionUtils):
                                             uses the default value, ``10``.
+        :param timeZone: sets the string that indicates a timezone to be used to parse timestamps.
+                         If None is set, it uses the default value, session local timezone.
         :param mode: allows a mode for dealing with corrupt records during parsing. If None is
                      set, it uses the default value, ``PERMISSIVE``.
 
                 * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted record.
                     When a schema is set by user, it sets ``null`` for extra fields.
@@ -374,7 +379,7 @@ class DataFrameReader(OptionUtils):
             nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf,
             dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns,
             maxCharsPerColumn=maxCharsPerColumn,
-            maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode)
+            maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode, timeZone=timeZone)
         if isinstance(path, basestring):
             path = [path]
         return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
@@ -591,7 +596,8 @@ class DataFrameWriter(OptionUtils):
         self._jwrite.saveAsTable(name)
 
     @since(1.4)
-    def json(self, path, mode=None, compression=None, dateFormat=None, timestampFormat=None):
+    def json(self, path, mode=None, compression=None, dateFormat=None, timestampFormat=None,
+             timeZone=None):
         """Saves the content of the :class:`DataFrame` in JSON format at the specified path.
 
         :param path: the path in any Hadoop supported file system
@@ -607,17 +613,20 @@ class DataFrameWriter(OptionUtils):
         :param dateFormat: sets the string that indicates a date format. Custom date formats
                            follow the formats at ``java.text.SimpleDateFormat``. This
                            applies to date type. If None is set, it uses the
-                           default value value, ``yyyy-MM-dd``.
+                           default value, ``yyyy-MM-dd``.
         :param timestampFormat: sets the string that indicates a timestamp format. Custom date
                                 formats follow the formats at ``java.text.SimpleDateFormat``.
                                 This applies to timestamp type. If None is set, it uses the
-                                default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+                                default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+        :param timeZone: sets the string that indicates a timezone to be used to format timestamps.
+                         If None is set, it uses the default value, session local timezone.
 
         >>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data'))
         """
         self.mode(mode)
         self._set_opts(
-            compression=compression, dateFormat=dateFormat, timestampFormat=timestampFormat)
+            compression=compression, dateFormat=dateFormat, timestampFormat=timestampFormat,
+            timeZone=timeZone)
         self._jwrite.json(path)
 
     @since(1.4)
@@ -664,7 +673,7 @@ class DataFrameWriter(OptionUtils):
     @since(2.0)
     def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None,
             header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None,
-            timestampFormat=None):
+            timestampFormat=None, timeZone=None):
         """Saves the content of the :class:`DataFrame` in CSV format at the specified path.
 
         :param path: the path in any Hadoop supported file system
@@ -699,18 +708,20 @@ class DataFrameWriter(OptionUtils):
         :param dateFormat: sets the string that indicates a date format. Custom date formats
                            follow the formats at ``java.text.SimpleDateFormat``. This
                            applies to date type. If None is set, it uses the
-                           default value value, ``yyyy-MM-dd``.
+                           default value, ``yyyy-MM-dd``.
         :param timestampFormat: sets the string that indicates a timestamp format. Custom date
                                 formats follow the formats at ``java.text.SimpleDateFormat``.
                                 This applies to timestamp type. If None is set, it uses the
-                                default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+                                default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+        :param timeZone: sets the string that indicates a timezone to be used to format timestamps.
+                         If None is set, it uses the default value, session local timezone.
 
         >>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
         """
         self.mode(mode)
         self._set_opts(compression=compression, sep=sep, quote=quote, escape=escape, header=header,
                        nullValue=nullValue, escapeQuotes=escapeQuotes, quoteAll=quoteAll,
-                       dateFormat=dateFormat, timestampFormat=timestampFormat)
+                       dateFormat=dateFormat, timestampFormat=timestampFormat, timeZone=timeZone)
         self._jwrite.csv(path)
 
     @since(1.5)
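
For reference, the new `timeZone` option is passed like any other reader/writer option. A minimal Scala sketch, not part of the patch (paths are placeholders; the equivalent keyword arguments exist on the Python API documented above):

    // Parse timestamps in the input using an explicit timezone instead of the session local one.
    val df = spark.read
      .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss")
      .option("timeZone", "GMT")
      .json("/path/to/input.json")

    // Format timestamps on write with the same option.
    df.write
      .option("timestampFormat", "yyyy/MM/dd HH:mm")
      .option("timeZone", "GMT")
      .csv("/path/to/output")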
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index a10b185cd4c7b979e590e892850a0b3f973ed23e..d988e596a86d9e66ec64b11e04596ce45da6fa09 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -429,7 +429,7 @@ class DataStreamReader(OptionUtils):
              allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
              allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
              mode=None, columnNameOfCorruptRecord=None, dateFormat=None,
-             timestampFormat=None):
+             timestampFormat=None, timeZone=None):
         """
         Loads a JSON file stream (`JSON Lines text format or newline-delimited JSON
         <http://jsonlines.org/>`_) and returns a :class:`DataFrame`.
@@ -476,11 +476,13 @@ class DataStreamReader(OptionUtils):
         :param dateFormat: sets the string that indicates a date format. Custom date formats
                            follow the formats at ``java.text.SimpleDateFormat``. This
                            applies to date type. If None is set, it uses the
-                           default value value, ``yyyy-MM-dd``.
+                           default value, ``yyyy-MM-dd``.
         :param timestampFormat: sets the string that indicates a timestamp format. Custom date
                                 formats follow the formats at ``java.text.SimpleDateFormat``.
                                 This applies to timestamp type. If None is set, it uses the
-                                default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+                                default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+        :param timeZone: sets the string that indicates a timezone to be used to parse timestamps.
+                         If None is set, it uses the default value, session local timezone.
 
         >>> json_sdf = spark.readStream.json(tempfile.mkdtemp(), schema = sdf_schema)
         >>> json_sdf.isStreaming
@@ -494,7 +496,7 @@ class DataStreamReader(OptionUtils):
             allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero,
             allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
             mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
-            timestampFormat=timestampFormat)
+            timestampFormat=timestampFormat, timeZone=timeZone)
         if isinstance(path, basestring):
             return self._df(self._jreader.json(path))
         else:
@@ -552,7 +554,7 @@ class DataStreamReader(OptionUtils):
             comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
             ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
             negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
-            maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None):
+            maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, timeZone=None):
         """Loads a CSV file stream and returns the result as a  :class:`DataFrame`.
 
         This function will go through the input once to determine the input schema if
@@ -597,11 +599,11 @@ class DataStreamReader(OptionUtils):
         :param dateFormat: sets the string that indicates a date format. Custom date formats
                            follow the formats at ``java.text.SimpleDateFormat``. This
                            applies to date type. If None is set, it uses the
-                           default value value, ``yyyy-MM-dd``.
+                           default value, ``yyyy-MM-dd``.
         :param timestampFormat: sets the string that indicates a timestamp format. Custom date
                                 formats follow the formats at ``java.text.SimpleDateFormat``.
                                 This applies to timestamp type. If None is set, it uses the
-                                default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+                                default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
         :param maxColumns: defines a hard limit of how many columns a record can have. If None is
                            set, it uses the default value, ``20480``.
         :param maxCharsPerColumn: defines the maximum number of characters allowed for any given
@@ -609,6 +611,8 @@ class DataStreamReader(OptionUtils):
                                   ``-1`` meaning unlimited length.
+        :param timeZone: sets the string that indicates a timezone to be used to parse timestamps.
+                         If None is set, it uses the default value, session local timezone.
         :param mode: allows a mode for dealing with corrupt records during parsing. If None is
                      set, it uses the default value, ``PERMISSIVE``.
 
                 * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted record.
                     When a schema is set by user, it sets ``null`` for extra fields.
@@ -628,7 +632,7 @@ class DataStreamReader(OptionUtils):
             nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf,
             dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns,
             maxCharsPerColumn=maxCharsPerColumn,
-            maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode)
+            maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode, timeZone=timeZone)
         if isinstance(path, basestring):
             return self._df(self._jreader.csv(path))
         else:
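
The streaming reader takes the same option; a hedged Scala sketch (the schema value and source path are placeholders):

    // Streaming JSON source parsing timestamps in PST rather than the session local timezone.
    val streamingDF = spark.readStream
      .schema(mySchema)  // user-defined schema (placeholder)
      .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss")
      .option("timeZone", "PST")
      .json("/path/to/streaming/source")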
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index c410e7919a35aa552012e4aa277dd2fdc5fc8e81..bd852a50fe71e07a539f4846a207a5ba3e14960f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -482,19 +482,29 @@ case class JsonTuple(children: Seq[Expression])
 /**
  * Converts a JSON input string to a [[StructType]] with the specified schema.
  */
-case class JsonToStruct(schema: StructType, options: Map[String, String], child: Expression)
-  extends UnaryExpression with CodegenFallback with ExpectsInputTypes {
+case class JsonToStruct(
+    schema: StructType,
+    options: Map[String, String],
+    child: Expression,
+    timeZoneId: Option[String] = None)
+  extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback with ExpectsInputTypes {
   override def nullable: Boolean = true
 
+  def this(schema: StructType, options: Map[String, String], child: Expression) =
+    this(schema, options, child, None)
+
   @transient
   lazy val parser =
     new JacksonParser(
       schema,
       "invalid", // Not used since we force fail fast.  Invalid rows will be set to `null`.
-      new JSONOptions(options ++ Map("mode" -> ParseModes.FAIL_FAST_MODE)))
+      new JSONOptions(options ++ Map("mode" -> ParseModes.FAIL_FAST_MODE), timeZoneId.get))
 
   override def dataType: DataType = schema
 
+  override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
+    copy(timeZoneId = Option(timeZoneId))
+
   override def nullSafeEval(json: Any): Any = {
     try parser.parse(json.toString).headOption.orNull catch {
       case _: SparkSQLJsonProcessingException => null
@@ -507,10 +517,15 @@ case class JsonToStruct(schema: StructType, options: Map[String, String], child:
 /**
  * Converts a [[StructType]] to a json output string.
  */
-case class StructToJson(options: Map[String, String], child: Expression)
-  extends UnaryExpression with CodegenFallback with ExpectsInputTypes {
+case class StructToJson(
+    options: Map[String, String],
+    child: Expression,
+    timeZoneId: Option[String] = None)
+  extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback with ExpectsInputTypes {
   override def nullable: Boolean = true
 
+  def this(options: Map[String, String], child: Expression) = this(options, child, None)
+
   @transient
   lazy val writer = new CharArrayWriter()
 
@@ -519,7 +534,7 @@ case class StructToJson(options: Map[String, String], child: Expression)
     new JacksonGenerator(
       child.dataType.asInstanceOf[StructType],
       writer,
-      new JSONOptions(options))
+      new JSONOptions(options, timeZoneId.get))
 
   override def dataType: DataType = StringType
 
@@ -538,6 +553,9 @@ case class StructToJson(options: Map[String, String], child: Expression)
     }
   }
 
+  override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
+    copy(timeZoneId = Option(timeZoneId))
+
   override def nullSafeEval(row: Any): Any = {
     gen.write(row.asInstanceOf[InternalRow])
     gen.flush()
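
At the expression level the timezone travels as the optional `timeZoneId`, which is expected to be filled in via `withTimeZone` (from `TimeZoneAwareExpression`) during analysis. A minimal construction sketch mirroring the test suite changes below (the literals are illustrative only; `structExpr` is a placeholder for some struct-typed expression):

    val schema = StructType(StructField("t", TimestampType) :: Nil)
    // Parse a JSON string into a struct, interpreting bare timestamps as GMT.
    val parsed = JsonToStruct(
      schema,
      Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss"),
      Literal("""{"t": "2016-01-01T00:00:00"}"""),
      Option("GMT"))
    // Render a struct back to JSON, formatting timestamps in PST.
    val rendered = StructToJson(Map.empty, structExpr, Option("PST"))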
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
index 02bd8dede43cd4747b059b44805cd877677c50a2..5307ce1cb711d4554f58aca1a34c087226770853 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.json
 
-import java.util.Locale
+import java.util.{Locale, TimeZone}
 
 import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
 import org.apache.commons.lang3.time.FastDateFormat
@@ -31,10 +31,11 @@ import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs
  * Most of these map directly to Jackson's internal options, specified in [[JsonParser.Feature]].
  */
 private[sql] class JSONOptions(
-    @transient private val parameters: CaseInsensitiveMap[String])
+    @transient private val parameters: CaseInsensitiveMap[String], defaultTimeZoneId: String)
   extends Logging with Serializable  {
 
-  def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))
+  def this(parameters: Map[String, String], defaultTimeZoneId: String) =
+    this(CaseInsensitiveMap(parameters), defaultTimeZoneId)
 
   val samplingRatio =
     parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
@@ -58,13 +59,15 @@ private[sql] class JSONOptions(
   private val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
   val columnNameOfCorruptRecord = parameters.get("columnNameOfCorruptRecord")
 
+  val timeZone: TimeZone = TimeZone.getTimeZone(parameters.getOrElse("timeZone", defaultTimeZoneId))
+
   // Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe.
   val dateFormat: FastDateFormat =
     FastDateFormat.getInstance(parameters.getOrElse("dateFormat", "yyyy-MM-dd"), Locale.US)
 
   val timestampFormat: FastDateFormat =
     FastDateFormat.getInstance(
-      parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"), Locale.US)
+      parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"), timeZone, Locale.US)
 
   // Parse mode flags
   if (!ParseModes.isValidMode(parseMode)) {
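
To make the resolution order concrete: an explicit `timeZone` entry in the options map wins, otherwise the caller-supplied `defaultTimeZoneId` (typically the session local timezone) is used, and `timestampFormat` is bound to the resolved timezone. A small sketch under those assumptions:

    // Explicit option overrides the default passed in by the caller.
    val withOption = new JSONOptions(Map("timeZone" -> "PST"), "GMT")
    assert(withOption.timeZone.getID == "PST")

    // No option: fall back to the default timezone id.
    val withDefault = new JSONOptions(Map.empty[String, String], "GMT")
    assert(withDefault.timeZone.getID == "GMT")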
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
index bf8e3c812ee8f3e6be70f5a1e2028c5facd5f554..dec55279c9fc58949ebcf17fba03dd38308a32d8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.types._
 private[sql] class JacksonGenerator(
     schema: StructType,
     writer: Writer,
-    options: JSONOptions = new JSONOptions(Map.empty[String, String])) {
+    options: JSONOptions) {
   // A `ValueWriter` is responsible for writing a field of an `InternalRow` to appropriate
   // JSON data. Here we are using `SpecializedGetters` rather than `InternalRow` so that
   // we can directly access data in `ArrayData` without the help of `SpecificMutableRow`.
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
index 8e20bd1d9724877ec8ab372906231c6cc765e819..0c46819cdb9cd7c81b6104e786dfa35728afa026 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
@@ -17,10 +17,12 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
+import java.util.Calendar
+
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.util.ParseModes
-import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
+import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils, ParseModes}
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType, TimestampType}
 import org.apache.spark.unsafe.types.UTF8String
 
 class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
@@ -305,51 +307,53 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
   test("json_tuple - hive key 4 - null json") {
     checkJsonTuple(
       JsonTuple(Literal(null) :: jsonTupleQuery),
-      InternalRow.fromSeq(Seq(null, null, null, null, null)))
+      InternalRow(null, null, null, null, null))
   }
 
   test("json_tuple - hive key 5 - null and empty fields") {
     checkJsonTuple(
       JsonTuple(Literal("""{"f1": "", "f5": null}""") :: jsonTupleQuery),
-      InternalRow.fromSeq(Seq(UTF8String.fromString(""), null, null, null, null)))
+      InternalRow(UTF8String.fromString(""), null, null, null, null))
   }
 
   test("json_tuple - hive key 6 - invalid json (array)") {
     checkJsonTuple(
       JsonTuple(Literal("[invalid JSON string]") :: jsonTupleQuery),
-      InternalRow.fromSeq(Seq(null, null, null, null, null)))
+      InternalRow(null, null, null, null, null))
   }
 
   test("json_tuple - invalid json (object start only)") {
     checkJsonTuple(
       JsonTuple(Literal("{") :: jsonTupleQuery),
-      InternalRow.fromSeq(Seq(null, null, null, null, null)))
+      InternalRow(null, null, null, null, null))
   }
 
   test("json_tuple - invalid json (no object end)") {
     checkJsonTuple(
       JsonTuple(Literal("""{"foo": "bar"""") :: jsonTupleQuery),
-      InternalRow.fromSeq(Seq(null, null, null, null, null)))
+      InternalRow(null, null, null, null, null))
   }
 
   test("json_tuple - invalid json (invalid json)") {
     checkJsonTuple(
       JsonTuple(Literal("\\") :: jsonTupleQuery),
-      InternalRow.fromSeq(Seq(null, null, null, null, null)))
+      InternalRow(null, null, null, null, null))
   }
 
   test("json_tuple - preserve newlines") {
     checkJsonTuple(
       JsonTuple(Literal("{\"a\":\"b\nc\"}") :: Literal("a") :: Nil),
-      InternalRow.fromSeq(Seq(UTF8String.fromString("b\nc"))))
+      InternalRow(UTF8String.fromString("b\nc")))
   }
 
+  val gmtId = Option(DateTimeUtils.TimeZoneGMT.getID)
+
   test("from_json") {
     val jsonData = """{"a": 1}"""
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     checkEvaluation(
-      JsonToStruct(schema, Map.empty, Literal(jsonData)),
-      InternalRow.fromSeq(1 :: Nil)
+      JsonToStruct(schema, Map.empty, Literal(jsonData), gmtId),
+      InternalRow(1)
     )
   }
 
@@ -357,13 +361,13 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     val jsonData = """{"a" 1}"""
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     checkEvaluation(
-      JsonToStruct(schema, Map.empty, Literal(jsonData)),
+      JsonToStruct(schema, Map.empty, Literal(jsonData), gmtId),
       null
     )
 
     // Other modes should still return `null`.
     checkEvaluation(
-      JsonToStruct(schema, Map("mode" -> ParseModes.PERMISSIVE_MODE), Literal(jsonData)),
+      JsonToStruct(schema, Map("mode" -> ParseModes.PERMISSIVE_MODE), Literal(jsonData), gmtId),
       null
     )
   }
@@ -371,15 +375,58 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
   test("from_json null input column") {
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     checkEvaluation(
-      JsonToStruct(schema, Map.empty, Literal.create(null, StringType)),
+      JsonToStruct(schema, Map.empty, Literal.create(null, StringType), gmtId),
       null
     )
   }
 
+  test("from_json with timestamp") {
+    val schema = StructType(StructField("t", TimestampType) :: Nil)
+
+    val jsonData1 = """{"t": "2016-01-01T00:00:00.123Z"}"""
+    var c = Calendar.getInstance(DateTimeUtils.TimeZoneGMT)
+    c.set(2016, 0, 1, 0, 0, 0)
+    c.set(Calendar.MILLISECOND, 123)
+    checkEvaluation(
+      JsonToStruct(schema, Map.empty, Literal(jsonData1), gmtId),
+      InternalRow(c.getTimeInMillis * 1000L)
+    )
+    // The result doesn't change because the json string includes a timezone ("Z" here),
+    // which means the string already represents the timestamp in that timezone, regardless
+    // of the timeZoneId parameter.
+    checkEvaluation(
+      JsonToStruct(schema, Map.empty, Literal(jsonData1), Option("PST")),
+      InternalRow(c.getTimeInMillis * 1000L)
+    )
+
+    val jsonData2 = """{"t": "2016-01-01T00:00:00"}"""
+    for (tz <- DateTimeTestUtils.ALL_TIMEZONES) {
+      c = Calendar.getInstance(tz)
+      c.set(2016, 0, 1, 0, 0, 0)
+      c.set(Calendar.MILLISECOND, 0)
+      checkEvaluation(
+        JsonToStruct(
+          schema,
+          Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss"),
+          Literal(jsonData2),
+          Option(tz.getID)),
+        InternalRow(c.getTimeInMillis * 1000L)
+      )
+      checkEvaluation(
+        JsonToStruct(
+          schema,
+          Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss", "timeZone" -> tz.getID),
+          Literal(jsonData2),
+          gmtId),
+        InternalRow(c.getTimeInMillis * 1000L)
+      )
+    }
+  }
+
   test("SPARK-19543: from_json empty input column") {
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     checkEvaluation(
-      JsonToStruct(schema, Map.empty, Literal.create(" ", StringType)),
+      JsonToStruct(schema, Map.empty, Literal.create(" ", StringType), gmtId),
       null
     )
   }
@@ -388,7 +435,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     val struct = Literal.create(create_row(1), schema)
     checkEvaluation(
-      StructToJson(Map.empty, struct),
+      StructToJson(Map.empty, struct, gmtId),
       """{"a":1}"""
     )
   }
@@ -397,8 +444,40 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     val struct = Literal.create(null, schema)
     checkEvaluation(
-      StructToJson(Map.empty, struct),
+      StructToJson(Map.empty, struct, gmtId),
       null
     )
   }
+
+  test("to_json with timestamp") {
+    val schema = StructType(StructField("t", TimestampType) :: Nil)
+    val c = Calendar.getInstance(DateTimeUtils.TimeZoneGMT)
+    c.set(2016, 0, 1, 0, 0, 0)
+    c.set(Calendar.MILLISECOND, 0)
+    val struct = Literal.create(create_row(c.getTimeInMillis * 1000L), schema)
+
+    checkEvaluation(
+      StructToJson(Map.empty, struct, gmtId),
+      """{"t":"2016-01-01T00:00:00.000Z"}"""
+    )
+    checkEvaluation(
+      StructToJson(Map.empty, struct, Option("PST")),
+      """{"t":"2015-12-31T16:00:00.000-08:00"}"""
+    )
+
+    checkEvaluation(
+      StructToJson(
+        Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss", "timeZone" -> gmtId.get),
+        struct,
+        gmtId),
+      """{"t":"2016-01-01T00:00:00"}"""
+    )
+    checkEvaluation(
+      StructToJson(
+        Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss", "timeZone" -> "PST"),
+        struct,
+        gmtId),
+      """{"t":"2015-12-31T16:00:00"}"""
+    )
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 1830839aeebb70a7eb3d753d970a6ebc5c11786f..780fe51ac699dda5b8caae2797d2753af371f8b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -27,6 +27,7 @@ import org.apache.spark.Partition
 import org.apache.spark.annotation.InterfaceStability
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.LogicalRDD
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.DataSource
@@ -298,6 +299,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
    * indicates a timestamp format. Custom date formats follow the formats at
    * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
+   * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
+   * to be used to parse timestamps.</li>
    * </ul>
    *
    * @since 2.0.0
@@ -329,7 +332,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * @since 1.4.0
    */
   def json(jsonRDD: RDD[String]): DataFrame = {
-    val parsedOptions: JSONOptions = new JSONOptions(extraOptions.toMap)
+    val parsedOptions: JSONOptions =
+      new JSONOptions(extraOptions.toMap, sparkSession.sessionState.conf.sessionLocalTimeZone)
     val columnNameOfCorruptRecord =
       parsedOptions.columnNameOfCorruptRecord
         .getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord)
@@ -401,6 +405,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
    * indicates a timestamp format. Custom date formats follow the formats at
    * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
+   * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
+   * to be used to parse timestamps.</li>
    * <li>`maxColumns` (default `20480`): defines a hard limit of how many columns
    * a record can have.</li>
    * <li>`maxCharsPerColumn` (default `-1`): defines the maximum number of characters allowed
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 748ebba3e8eb28737b0561cc543d6b583823fd6a..1d834b18212efbb49f19fd4129eb631d3fe8f03d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -456,6 +456,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
    * indicates a timestamp format. Custom date formats follow the formats at
    * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
+   * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
+   * to be used to format timestamps.</li>
    * </ul>
    *
    * @since 1.4.0
@@ -562,6 +564,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
    * indicates a timestamp format. Custom date formats follow the formats at
    * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
+   * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
+   * to be used to format timestamps.</li>
    * </ul>
    *
    * @since 2.0.0
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 6b80ff48bbc05dab825ab4d38111d23df6c85e96..e62cd9f7bf018dfc2329bf2b8df1605e25cc3bd0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.encoders._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
-import org.apache.spark.sql.catalyst.json.JacksonGenerator
+import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JSONOptions}
 import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans._
@@ -2678,10 +2678,12 @@ class Dataset[T] private[sql](
    */
   def toJSON: Dataset[String] = {
     val rowSchema = this.schema
+    val sessionLocalTimeZone = sparkSession.sessionState.conf.sessionLocalTimeZone
     val rdd: RDD[String] = queryExecution.toRdd.mapPartitions { iter =>
       val writer = new CharArrayWriter()
       // create the Generator without separator inserted between 2 records
-      val gen = new JacksonGenerator(rowSchema, writer)
+      val gen = new JacksonGenerator(rowSchema, writer,
+        new JSONOptions(Map.empty[String, String], sessionLocalTimeZone))
 
       new Iterator[String] {
         override def hasNext: Boolean = iter.hasNext
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index 1d2bf07047a23f3230021bd7a60c6b668674111a..566f40f454393da8922fc78bc71e20becc862276 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -29,7 +29,7 @@ import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.util.CompressionCodecs
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.text.TextFileFormat
 import org.apache.spark.sql.sources._
@@ -55,7 +55,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
       files: Seq[FileStatus]): Option[StructType] = {
     require(files.nonEmpty, "Cannot infer schema from an empty set of files")
 
-    val csvOptions = new CSVOptions(options)
+    val csvOptions = new CSVOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone)
     val paths = files.map(_.getPath.toString)
     val lines: Dataset[String] = createBaseDataset(sparkSession, csvOptions, paths)
     val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
@@ -69,7 +69,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
       dataSchema: StructType): OutputWriterFactory = {
     CSVUtils.verifySchema(dataSchema)
     val conf = job.getConfiguration
-    val csvOptions = new CSVOptions(options)
+    val csvOptions = new CSVOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone)
     csvOptions.compressionCodec.foreach { codec =>
       CompressionCodecs.setCodecConfiguration(conf, codec)
     }
@@ -96,7 +96,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
       filters: Seq[Filter],
       options: Map[String, String],
       hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
-    val csvOptions = new CSVOptions(options)
+    val csvOptions = new CSVOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone)
 
     val broadcastedHadoopConf =
       sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
index 9d79ea6ed1789c27cdd95d8c09b406b5d5229f7b..b7fbaa4f44a62bab77e7fcd3e428d63574ac8040 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution.datasources.csv
 
 import java.nio.charset.StandardCharsets
-import java.util.Locale
+import java.util.{Locale, TimeZone}
 
 import com.univocity.parsers.csv.{CsvParserSettings, CsvWriterSettings, UnescapedQuoteHandling}
 import org.apache.commons.lang3.time.FastDateFormat
@@ -26,10 +26,12 @@ import org.apache.commons.lang3.time.FastDateFormat
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs, ParseModes}
 
-private[csv] class CSVOptions(@transient private val parameters: CaseInsensitiveMap[String])
+private[csv] class CSVOptions(
+    @transient private val parameters: CaseInsensitiveMap[String], defaultTimeZoneId: String)
   extends Logging with Serializable {
 
-  def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))
+  def this(parameters: Map[String, String], defaultTimeZoneId: String) =
+    this(CaseInsensitiveMap(parameters), defaultTimeZoneId)
 
   private def getChar(paramName: String, default: Char): Char = {
     val paramValue = parameters.get(paramName)
@@ -106,13 +108,15 @@ private[csv] class CSVOptions(@transient private val parameters: CaseInsensitive
     name.map(CompressionCodecs.getCodecClassName)
   }
 
+  val timeZone: TimeZone = TimeZone.getTimeZone(parameters.getOrElse("timeZone", defaultTimeZoneId))
+
   // Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe.
   val dateFormat: FastDateFormat =
     FastDateFormat.getInstance(parameters.getOrElse("dateFormat", "yyyy-MM-dd"), Locale.US)
 
   val timestampFormat: FastDateFormat =
     FastDateFormat.getInstance(
-      parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"), Locale.US)
+      parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"), timeZone, Locale.US)
 
   val maxColumns = getInt("maxColumns", 20480)
 
@@ -161,12 +165,3 @@ private[csv] class CSVOptions(@transient private val parameters: CaseInsensitive
     settings
   }
 }
-
-object CSVOptions {
-
-  def apply(): CSVOptions = new CSVOptions(CaseInsensitiveMap(Map.empty))
-
-  def apply(paramName: String, paramValue: String): CSVOptions = {
-    new CSVOptions(Map(paramName -> paramValue))
-  }
-}
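
With the zero-argument and single-pair `CSVOptions.apply` helpers removed, call sites construct the options with an explicit default timezone id; a short migration sketch (mirroring the test updates below):

    // Before: CSVOptions("timestampFormat", "dd/MM/yyyy HH:mm")
    // After:  pass the full options map plus the default timezone id to fall back to.
    val csvOptions = new CSVOptions(Map("timestampFormat" -> "dd/MM/yyyy HH:mm"), "GMT")
    // timestampFormat is now bound to the resolved timezone ("GMT" here).
    val fmt = csvOptions.timestampFormat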
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala
index ee79138c0f19ba9967223630ed7787cda6e0a900..4082a0df8ba75093f949d3c3c27750e168804793 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.types._
 private[csv] class UnivocityGenerator(
     schema: StructType,
     writer: Writer,
-    options: CSVOptions = new CSVOptions(Map.empty[String, String])) {
+    options: CSVOptions) {
   private val writerSettings = options.asWriterSettings
   writerSettings.setHeaders(schema.fieldNames: _*)
   private val gen = new CsvWriter(writer, writerSettings)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
index 3b42aa60b024e1a047f5784842d71d82a5e27d23..2e409b3f5fbfca58b7a14d9ba790a74deb17e01b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
@@ -76,7 +76,7 @@ private[csv] class UnivocityParser(
       name: String,
       dataType: DataType,
       nullable: Boolean = true,
-      options: CSVOptions = CSVOptions()): ValueConverter = dataType match {
+      options: CSVOptions): ValueConverter = dataType match {
     case _: ByteType => (d: String) =>
       nullSafeDatum(d, name, nullable, options)(_.toByte)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index 98ab9d28500032edd26628a177782dacea2021ee..b4a8ff2cf01ad0c607f2e4b0d64c51a396e0a605 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -47,7 +47,8 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
     if (files.isEmpty) {
       None
     } else {
-      val parsedOptions: JSONOptions = new JSONOptions(options)
+      val parsedOptions: JSONOptions =
+        new JSONOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone)
       val columnNameOfCorruptRecord =
         parsedOptions.columnNameOfCorruptRecord
           .getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord)
@@ -67,7 +68,8 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
       options: Map[String, String],
       dataSchema: StructType): OutputWriterFactory = {
     val conf = job.getConfiguration
-    val parsedOptions: JSONOptions = new JSONOptions(options)
+    val parsedOptions: JSONOptions =
+      new JSONOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone)
     parsedOptions.compressionCodec.foreach { codec =>
       CompressionCodecs.setCodecConfiguration(conf, codec)
     }
@@ -97,7 +99,8 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
     val broadcastedHadoopConf =
       sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
 
-    val parsedOptions: JSONOptions = new JSONOptions(options)
+    val parsedOptions: JSONOptions =
+      new JSONOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone)
     val columnNameOfCorruptRecord = parsedOptions.columnNameOfCorruptRecord
       .getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index b7ffb3cddb472ce8d1d2bbcdefaa112841a8a556..4e706da184c0b5c1ea59deeec8afda18c40cb810 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -181,6 +181,8 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
    * indicates a timestamp format. Custom date formats follow the formats at
    * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
+   * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
+   * to be used to parse timestamps.</li>
    * </ul>
    *
    * @since 2.0.0
@@ -230,6 +232,8 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
    * indicates a timestamp format. Custom date formats follow the formats at
    * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
+   * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
+   * to be used to parse timestamps.</li>
    * <li>`maxColumns` (default `20480`): defines a hard limit of how many columns
    * a record can have.</li>
    * <li>`maxCharsPerColumn` (default `-1`): defines the maximum number of characters allowed
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
index d8c6c25504781ddf11a55b2b894549165320411d..661742087112f2eb96b9a42064208a7124261c80 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.types._
 class CSVInferSchemaSuite extends SparkFunSuite {
 
   test("String fields types are inferred correctly from null types") {
-    val options = new CSVOptions(Map.empty[String, String])
+    val options = new CSVOptions(Map.empty[String, String], "GMT")
     assert(CSVInferSchema.inferField(NullType, "", options) == NullType)
     assert(CSVInferSchema.inferField(NullType, null, options) == NullType)
     assert(CSVInferSchema.inferField(NullType, "100000000000", options) == LongType)
@@ -41,7 +41,7 @@ class CSVInferSchemaSuite extends SparkFunSuite {
   }
 
   test("String fields types are inferred correctly from other types") {
-    val options = new CSVOptions(Map.empty[String, String])
+    val options = new CSVOptions(Map.empty[String, String], "GMT")
     assert(CSVInferSchema.inferField(LongType, "1.0", options) == DoubleType)
     assert(CSVInferSchema.inferField(LongType, "test", options) == StringType)
     assert(CSVInferSchema.inferField(IntegerType, "1.0", options) == DoubleType)
@@ -60,21 +60,21 @@ class CSVInferSchemaSuite extends SparkFunSuite {
   }
 
   test("Timestamp field types are inferred correctly via custom data format") {
-    var options = new CSVOptions(Map("timestampFormat" -> "yyyy-mm"))
+    var options = new CSVOptions(Map("timestampFormat" -> "yyyy-mm"), "GMT")
     assert(CSVInferSchema.inferField(TimestampType, "2015-08", options) == TimestampType)
-    options = new CSVOptions(Map("timestampFormat" -> "yyyy"))
+    options = new CSVOptions(Map("timestampFormat" -> "yyyy"), "GMT")
     assert(CSVInferSchema.inferField(TimestampType, "2015", options) == TimestampType)
   }
 
   test("Timestamp field types are inferred correctly from other types") {
-    val options = new CSVOptions(Map.empty[String, String])
+    val options = new CSVOptions(Map.empty[String, String], "GMT")
     assert(CSVInferSchema.inferField(IntegerType, "2015-08-20 14", options) == StringType)
     assert(CSVInferSchema.inferField(DoubleType, "2015-08-20 14:10", options) == StringType)
     assert(CSVInferSchema.inferField(LongType, "2015-08 14:49:00", options) == StringType)
   }
 
   test("Boolean fields types are inferred correctly from other types") {
-    val options = new CSVOptions(Map.empty[String, String])
+    val options = new CSVOptions(Map.empty[String, String], "GMT")
     assert(CSVInferSchema.inferField(LongType, "Fale", options) == StringType)
     assert(CSVInferSchema.inferField(DoubleType, "TRUEe", options) == StringType)
   }
@@ -92,12 +92,12 @@ class CSVInferSchemaSuite extends SparkFunSuite {
   }
 
   test("Null fields are handled properly when a nullValue is specified") {
-    var options = new CSVOptions(Map("nullValue" -> "null"))
+    var options = new CSVOptions(Map("nullValue" -> "null"), "GMT")
     assert(CSVInferSchema.inferField(NullType, "null", options) == NullType)
     assert(CSVInferSchema.inferField(StringType, "null", options) == StringType)
     assert(CSVInferSchema.inferField(LongType, "null", options) == LongType)
 
-    options = new CSVOptions(Map("nullValue" -> "\\N"))
+    options = new CSVOptions(Map("nullValue" -> "\\N"), "GMT")
     assert(CSVInferSchema.inferField(IntegerType, "\\N", options) == IntegerType)
     assert(CSVInferSchema.inferField(DoubleType, "\\N", options) == DoubleType)
     assert(CSVInferSchema.inferField(TimestampType, "\\N", options) == TimestampType)
@@ -111,12 +111,12 @@ class CSVInferSchemaSuite extends SparkFunSuite {
   }
 
   test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
-    val options = new CSVOptions(Map("TiMeStampFormat" -> "yyyy-mm"))
+    val options = new CSVOptions(Map("TiMeStampFormat" -> "yyyy-mm"), "GMT")
     assert(CSVInferSchema.inferField(TimestampType, "2015-08", options) == TimestampType)
   }
 
   test("SPARK-18877: `inferField` on DecimalType should find a common type with `typeSoFar`") {
-    val options = new CSVOptions(Map.empty[String, String])
+    val options = new CSVOptions(Map.empty[String, String], "GMT")
 
     // 9.03E+12 is Decimal(3, -10) and 1.19E+11 is Decimal(3, -9).
     assert(CSVInferSchema.inferField(DecimalType(3, -10), "1.19E+11", options) ==
@@ -134,7 +134,7 @@ class CSVInferSchemaSuite extends SparkFunSuite {
 
   test("DoubleType should be infered when user defined nan/inf are provided") {
     val options = new CSVOptions(Map("nanValue" -> "nan", "negativeInf" -> "-inf",
-      "positiveInf" -> "inf"))
+      "positiveInf" -> "inf"), "GMT")
     assert(CSVInferSchema.inferField(NullType, "nan", options) == DoubleType)
     assert(CSVInferSchema.inferField(NullType, "inf", options) == DoubleType)
     assert(CSVInferSchema.inferField(NullType, "-inf", options) == DoubleType)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index df9cebbe58d5b3c5674e29d7c205100749050380..0c9a7298c3fa0caec237c7f267813bf5b4b6ce17 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -839,7 +839,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
     }
   }
 
-  test("Write timestamps correctly with dateFormat option") {
+  test("Write timestamps correctly with timestampFormat option") {
     withTempDir { dir =>
       // With dateFormat option.
       val timestampsWithFormatPath = s"${dir.getCanonicalPath}/timestampsWithFormat.csv"
@@ -870,6 +870,48 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
     }
   }
 
+  test("Write timestamps correctly with timestampFormat option and timeZone option") {
+    withTempDir { dir =>
+      // With timestampFormat option and timeZone option.
+      val timestampsWithFormatPath = s"${dir.getCanonicalPath}/timestampsWithFormat.csv"
+      val timestampsWithFormat = spark.read
+        .format("csv")
+        .option("header", "true")
+        .option("inferSchema", "true")
+        .option("timestampFormat", "dd/MM/yyyy HH:mm")
+        .load(testFile(datesFile))
+      timestampsWithFormat.write
+        .format("csv")
+        .option("header", "true")
+        .option("timestampFormat", "yyyy/MM/dd HH:mm")
+        .option("timeZone", "GMT")
+        .save(timestampsWithFormatPath)
+
+      // This will load back the timestamps as string.
+      val stringTimestampsWithFormat = spark.read
+        .format("csv")
+        .option("header", "true")
+        .option("inferSchema", "false")
+        .load(timestampsWithFormatPath)
+      val expectedStringTimestampsWithFormat = Seq(
+        Row("2015/08/27 01:00"),
+        Row("2014/10/28 01:30"),
+        Row("2016/01/29 04:00"))
+
+      checkAnswer(stringTimestampsWithFormat, expectedStringTimestampsWithFormat)
+
+      val readBack = spark.read
+        .format("csv")
+        .option("header", "true")
+        .option("inferSchema", "true")
+        .option("timestampFormat", "yyyy/MM/dd HH:mm")
+        .option("timeZone", "GMT")
+        .load(timestampsWithFormatPath)
+
+      checkAnswer(readBack, timestampsWithFormat)
+    }
+  }
+
   test("load duplicated field names consistently with null or empty strings - case sensitive") {
     withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
       withTempPath { path =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala
index 62dae08861df1f8d5c14732a633bfcb5a28593d8..a74b22a4a88a63b42c95f32645ecc06259d5ca59 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.unsafe.types.UTF8String
 
 class UnivocityParserSuite extends SparkFunSuite {
   private val parser =
-    new UnivocityParser(StructType(Seq.empty), new CSVOptions(Map.empty[String, String]))
+    new UnivocityParser(StructType(Seq.empty), new CSVOptions(Map.empty[String, String], "GMT"))
 
   private def assertNull(v: Any) = assert(v == null)
 
@@ -38,7 +38,8 @@ class UnivocityParserSuite extends SparkFunSuite {
 
     stringValues.zip(decimalValues).foreach { case (strVal, decimalVal) =>
       val decimalValue = new BigDecimal(decimalVal.toString)
-      assert(parser.makeConverter("_1", decimalType).apply(strVal) ===
+      val options = new CSVOptions(Map.empty[String, String], "GMT")
+      assert(parser.makeConverter("_1", decimalType, options = options).apply(strVal) ===
         Decimal(decimalValue, decimalType.precision, decimalType.scale))
     }
   }
@@ -50,20 +51,23 @@ class UnivocityParserSuite extends SparkFunSuite {
     // Nullable field with nullValue option.
     types.foreach { t =>
       // Tests that a custom nullValue.
+      val nullValueOptions = new CSVOptions(Map("nullValue" -> "-"), "GMT")
       val converter =
-        parser.makeConverter("_1", t, nullable = true, CSVOptions("nullValue", "-"))
+        parser.makeConverter("_1", t, nullable = true, options = nullValueOptions)
       assertNull(converter.apply("-"))
       assertNull(converter.apply(null))
 
       // Tests that the default nullValue is empty string.
-      assertNull(parser.makeConverter("_1", t, nullable = true).apply(""))
+      val options = new CSVOptions(Map.empty[String, String], "GMT")
+      assertNull(parser.makeConverter("_1", t, nullable = true, options = options).apply(""))
     }
 
     // Not nullable field with nullValue option.
     types.foreach { t =>
       // Casts a null to not nullable field should throw an exception.
+      val options = new CSVOptions(Map("nullValue" -> "-"), "GMT")
       val converter =
-        parser.makeConverter("_1", t, nullable = false, CSVOptions("nullValue", "-"))
+        parser.makeConverter("_1", t, nullable = false, options = options)
       var message = intercept[RuntimeException] {
         converter.apply("-")
       }.getMessage
@@ -77,48 +81,52 @@ class UnivocityParserSuite extends SparkFunSuite {
     // If nullValue is different with empty string, then, empty string should not be casted into
     // null.
     Seq(true, false).foreach { b =>
+      val options = new CSVOptions(Map("nullValue" -> "null"), "GMT")
       val converter =
-        parser.makeConverter("_1", StringType, nullable = b, CSVOptions("nullValue", "null"))
+        parser.makeConverter("_1", StringType, nullable = b, options = options)
       assert(converter.apply("") == UTF8String.fromString(""))
     }
   }
 
   test("Throws exception for empty string with non null type") {
+    val options = new CSVOptions(Map.empty[String, String], "GMT")
     val exception = intercept[RuntimeException]{
-      parser.makeConverter("_1", IntegerType, nullable = false, CSVOptions()).apply("")
+      parser.makeConverter("_1", IntegerType, nullable = false, options = options).apply("")
     }
     assert(exception.getMessage.contains("null value found but field _1 is not nullable."))
   }
 
   test("Types are cast correctly") {
-    assert(parser.makeConverter("_1", ByteType).apply("10") == 10)
-    assert(parser.makeConverter("_1", ShortType).apply("10") == 10)
-    assert(parser.makeConverter("_1", IntegerType).apply("10") == 10)
-    assert(parser.makeConverter("_1", LongType).apply("10") == 10)
-    assert(parser.makeConverter("_1", FloatType).apply("1.00") == 1.0)
-    assert(parser.makeConverter("_1", DoubleType).apply("1.00") == 1.0)
-    assert(parser.makeConverter("_1", BooleanType).apply("true") == true)
-
-    val timestampsOptions = CSVOptions("timestampFormat", "dd/MM/yyyy hh:mm")
+    val options = new CSVOptions(Map.empty[String, String], "GMT")
+    assert(parser.makeConverter("_1", ByteType, options = options).apply("10") == 10)
+    assert(parser.makeConverter("_1", ShortType, options = options).apply("10") == 10)
+    assert(parser.makeConverter("_1", IntegerType, options = options).apply("10") == 10)
+    assert(parser.makeConverter("_1", LongType, options = options).apply("10") == 10)
+    assert(parser.makeConverter("_1", FloatType, options = options).apply("1.00") == 1.0)
+    assert(parser.makeConverter("_1", DoubleType, options = options).apply("1.00") == 1.0)
+    assert(parser.makeConverter("_1", BooleanType, options = options).apply("true") == true)
+
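+    // The converter and the expected value below share the same CSVOptions format object, so the GMT setting is applied consistently to both sides.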
+    val timestampsOptions =
+      new CSVOptions(Map("timestampFormat" -> "dd/MM/yyyy hh:mm"), "GMT")
     val customTimestamp = "31/01/2015 00:00"
     val expectedTime = timestampsOptions.timestampFormat.parse(customTimestamp).getTime
     val castedTimestamp =
-      parser.makeConverter("_1", TimestampType, nullable = true, timestampsOptions)
+      parser.makeConverter("_1", TimestampType, nullable = true, options = timestampsOptions)
         .apply(customTimestamp)
     assert(castedTimestamp == expectedTime * 1000L)
 
     val customDate = "31/01/2015"
-    val dateOptions = CSVOptions("dateFormat", "dd/MM/yyyy")
+    val dateOptions = new CSVOptions(Map("dateFormat" -> "dd/MM/yyyy"), "GMT")
     val expectedDate = dateOptions.dateFormat.parse(customDate).getTime
     val castedDate =
-      parser.makeConverter("_1", DateType, nullable = true, dateOptions)
+      parser.makeConverter("_1", DateType, nullable = true, options = dateOptions)
         .apply(customTimestamp)
     assert(castedDate == DateTimeUtils.millisToDays(expectedDate))
 
     val timestamp = "2015-01-01 00:00:00"
-    assert(parser.makeConverter("_1", TimestampType).apply(timestamp) ==
+    assert(parser.makeConverter("_1", TimestampType, options = options).apply(timestamp) ==
       DateTimeUtils.stringToTime(timestamp).getTime  * 1000L)
-    assert(parser.makeConverter("_1", DateType).apply("2015-01-01") ==
+    assert(parser.makeConverter("_1", DateType, options = options).apply("2015-01-01") ==
       DateTimeUtils.millisToDays(DateTimeUtils.stringToTime("2015-01-01").getTime))
   }
 
@@ -127,16 +135,18 @@ class UnivocityParserSuite extends SparkFunSuite {
     try {
       Locale.setDefault(new Locale("fr", "FR"))
       // Would parse as 1.0 in fr-FR
-      assert(parser.makeConverter("_1", FloatType).apply("1,00") == 100.0)
-      assert(parser.makeConverter("_1", DoubleType).apply("1,00") == 100.0)
+      val options = new CSVOptions(Map.empty[String, String], "GMT")
+      assert(parser.makeConverter("_1", FloatType, options = options).apply("1,00") == 100.0)
+      assert(parser.makeConverter("_1", DoubleType, options = options).apply("1,00") == 100.0)
     } finally {
       Locale.setDefault(originalLocale)
     }
   }
 
   test("Float NaN values are parsed correctly") {
+    val options = new CSVOptions(Map("nanValue" -> "nn"), "GMT")
     val floatVal: Float = parser.makeConverter(
-      "_1", FloatType, nullable = true, CSVOptions("nanValue", "nn")
+      "_1", FloatType, nullable = true, options = options
     ).apply("nn").asInstanceOf[Float]
 
     // Java implements the IEEE-754 floating point standard which guarantees that any comparison
@@ -145,36 +155,41 @@ class UnivocityParserSuite extends SparkFunSuite {
   }
 
   test("Double NaN values are parsed correctly") {
+    val options = new CSVOptions(Map("nanValue" -> "-"), "GMT")
     val doubleVal: Double = parser.makeConverter(
-      "_1", DoubleType, nullable = true, CSVOptions("nanValue", "-")
+      "_1", DoubleType, nullable = true, options = options
     ).apply("-").asInstanceOf[Double]
 
     assert(doubleVal.isNaN)
   }
 
   test("Float infinite values can be parsed") {
+    val negativeInfOptions = new CSVOptions(Map("negativeInf" -> "max"), "GMT")
     val floatVal1 = parser.makeConverter(
-      "_1", FloatType, nullable = true, CSVOptions("negativeInf", "max")
+      "_1", FloatType, nullable = true, options = negativeInfOptions
     ).apply("max").asInstanceOf[Float]
 
     assert(floatVal1 == Float.NegativeInfinity)
 
+    val positiveInfOptions = new CSVOptions(Map("positiveInf" -> "max"), "GMT")
     val floatVal2 = parser.makeConverter(
-      "_1", FloatType, nullable = true, CSVOptions("positiveInf", "max")
+      "_1", FloatType, nullable = true, options = positiveInfOptions
     ).apply("max").asInstanceOf[Float]
 
     assert(floatVal2 == Float.PositiveInfinity)
   }
 
   test("Double infinite values can be parsed") {
+    val negativeInfOptions = new CSVOptions(Map("negativeInf" -> "max"), "GMT")
     val doubleVal1 = parser.makeConverter(
-      "_1", DoubleType, nullable = true, CSVOptions("negativeInf", "max")
+      "_1", DoubleType, nullable = true, options = negativeInfOptions
     ).apply("max").asInstanceOf[Double]
 
     assert(doubleVal1 == Double.NegativeInfinity)
 
+    val positiveInfOptions = new CSVOptions(Map("positiveInf" -> "max"), "GMT")
     val doubleVal2 = parser.makeConverter(
-      "_1", DoubleType, nullable = true, CSVOptions("positiveInf", "max")
+      "_1", DoubleType, nullable = true, options = positiveInfOptions
     ).apply("max").asInstanceOf[Double]
 
     assert(doubleVal2 == Double.PositiveInfinity)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 156fd965b468389909782b13e80ebc11e4becb30..9344aeda00175fd3a213149fea4730c1d8018809 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -62,7 +62,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
         generator.flush()
       }
 
-      val dummyOption = new JSONOptions(Map.empty[String, String])
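+      // JSONOptions now takes a default time zone ID as its second argument.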
+      val dummyOption = new JSONOptions(Map.empty[String, String], "GMT")
       val dummySchema = StructType(Seq.empty)
       val parser = new JacksonParser(dummySchema, "", dummyOption)
 
@@ -1366,7 +1366,8 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
 
   test("SPARK-6245 JsonRDD.inferSchema on empty RDD") {
     // This is really a test that it doesn't throw an exception
-    val emptySchema = JsonInferSchema.infer(empty, "", new JSONOptions(Map.empty[String, String]))
+    val emptySchema = JsonInferSchema.infer(
+      empty, "", new JSONOptions(Map.empty[String, String], "GMT"))
     assert(StructType(Seq()) === emptySchema)
   }
 
@@ -1391,7 +1392,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
 
   test("SPARK-8093 Erase empty structs") {
     val emptySchema = JsonInferSchema.infer(
-      emptyRecords, "", new JSONOptions(Map.empty[String, String]))
+      emptyRecords, "", new JSONOptions(Map.empty[String, String], "GMT"))
     assert(StructType(Seq()) === emptySchema)
   }
 
@@ -1723,7 +1724,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     }
   }
 
-  test("Write timestamps correctly with dateFormat option") {
+  test("Write timestamps correctly with timestampFormat option") {
     val customSchema = new StructType(Array(StructField("date", TimestampType, true)))
     withTempDir { dir =>
       // With dateFormat option.
@@ -1751,6 +1752,43 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     }
   }
 
+  test("Write timestamps correctly with timestampFormat option and timeZone option") {
+    val customSchema = new StructType(Array(StructField("date", TimestampType, true)))
+    withTempDir { dir =>
+      // With timestampFormat option and timeZone option.
+      val timestampsWithFormatPath = s"${dir.getCanonicalPath}/timestampsWithFormat.json"
+      val timestampsWithFormat = spark.read
+        .schema(customSchema)
+        .option("timestampFormat", "dd/MM/yyyy HH:mm")
+        .json(datesRecords)
+      timestampsWithFormat.write
+        .format("json")
+        .option("timestampFormat", "yyyy/MM/dd HH:mm")
+        .option("timeZone", "GMT")
+        .save(timestampsWithFormatPath)
+
+      // This will load back the timestamps as strings.
+      val stringSchema = StructType(StructField("date", StringType, true) :: Nil)
+      val stringTimestampsWithFormat = spark.read
+        .schema(stringSchema)
+        .json(timestampsWithFormatPath)
+      val expectedStringTimestampsWithFormat = Seq(
+        Row("2015/08/27 01:00"),
+        Row("2014/10/28 01:30"),
+        Row("2016/01/29 04:00"))
+
+      checkAnswer(stringTimestampsWithFormat, expectedStringTimestampsWithFormat)
+
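+      // Reading back with the same timestampFormat and timeZone should round-trip to the original DataFrame.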
+      val readBack = spark.read
+        .schema(customSchema)
+        .option("timestampFormat", "yyyy/MM/dd HH:mm")
+        .option("timeZone", "GMT")
+        .json(timestampsWithFormatPath)
+
+      checkAnswer(readBack, timestampsWithFormat)
+    }
+  }
+
   test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
     val records = sparkContext
       .parallelize("""{"a": 3, "b": 1.1}""" :: """{"a": 3.1, "b": 0.000001}""" :: Nil)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
index 76ffb949f12930ec37251dde71c375149323ecf9..9b5e364e512a26765f0e8aedeec58cf997eb6366 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
@@ -19,11 +19,15 @@ package org.apache.spark.sql.sources
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.datasources.DataSource
 
 class ResolvedDataSourceSuite extends SparkFunSuite {
   private def getProvidingClass(name: String): Class[_] =
-    DataSource(sparkSession = null, className = name).providingClass
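+    // DataSource is now given an explicit timeZone option; the JVM default zone suffices since only provider-class resolution is tested here.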
+    DataSource(
+      sparkSession = null,
+      className = name,
+      options = Map("timeZone" -> DateTimeUtils.defaultTimeZone().getID)).providingClass
 
   test("jdbc") {
     assert(