diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index cc0cbba275b81b0a77248dc6571c9c64c31bac44..c39017ebbfe60cb16c113b341c18985167778a72 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -132,10 +132,10 @@ case class CatalogTablePartition(
   /**
    * Given the partition schema, returns a row with that schema holding the partition values.
    */
-  def toRow(partitionSchema: StructType, defaultTimeZondId: String): InternalRow = {
+  def toRow(partitionSchema: StructType, defaultTimeZoneId: String): InternalRow = {
     val caseInsensitiveProperties = CaseInsensitiveMap(storage.properties)
     val timeZoneId = caseInsensitiveProperties.getOrElse(
-      DateTimeUtils.TIMEZONE_OPTION, defaultTimeZondId)
+      DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId)
     InternalRow.fromSeq(partitionSchema.map { field =>
       val partValue = if (spec(field.name) == ExternalCatalogUtils.DEFAULT_PARTITION_NAME) {
         null
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 6c1592fd8881dfc85be94a7b1242241da1035f1d..bf596fa0a89db553f1bd6f1452b97b770a1800e3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -498,6 +498,11 @@ object DateTimeUtils {
     false
   }
 
+  lazy val validTimezones: Set[String] = TimeZone.getAvailableIDs().toSet
+  def isValidTimezone(timezoneId: String): Boolean = {
+    validTimezones.contains(timezoneId)
+  }
+
   /**
    * Returns the microseconds since year zero (-17999) from microseconds since epoch.
    */
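Note: the new helper simply checks membership in the JVM's TimeZone.getAvailableIDs(). A minimal usage sketch (the invalid ID below is made up for illustration):

    import org.apache.spark.sql.catalyst.util.DateTimeUtils

    DateTimeUtils.isValidTimezone("UTC")                  // true
    DateTimeUtils.isValidTimezone("America/Los_Angeles")  // true
    DateTimeUtils.isValidTimezone("Mars/Olympus_Mons")    // false: not a known timezone ID
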
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index 9d641b528723a6dda5d1fe6b343b6e414d53042d..dabbc2b6387e4c32e9c9ea86a2c5e1ac6d91453b 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -18,7 +18,9 @@
 package org.apache.spark.sql.execution.datasources.parquet;
 
 import java.io.IOException;
+import java.util.TimeZone;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.Dictionary;
@@ -30,6 +32,7 @@ import org.apache.parquet.schema.PrimitiveType;
 
 import org.apache.spark.sql.catalyst.util.DateTimeUtils;
 import org.apache.spark.sql.execution.vectorized.ColumnVector;
+import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.DecimalType;
 
@@ -90,11 +93,30 @@ public class VectorizedColumnReader {
 
   private final PageReader pageReader;
   private final ColumnDescriptor descriptor;
+  private final TimeZone storageTz;
+  private final TimeZone sessionTz;
 
-  public VectorizedColumnReader(ColumnDescriptor descriptor, PageReader pageReader)
+  public VectorizedColumnReader(ColumnDescriptor descriptor, PageReader pageReader,
+                                Configuration conf)
       throws IOException {
     this.descriptor = descriptor;
     this.pageReader = pageReader;
+    // If the table has a timezone property, apply the correct conversions.  See SPARK-12297.
+    // The conf is sometimes null in tests.
+    String sessionTzString =
+        conf == null ? null : conf.get(SQLConf.SESSION_LOCAL_TIMEZONE().key());
+    if (sessionTzString == null || sessionTzString.isEmpty()) {
+      sessionTz = DateTimeUtils.defaultTimeZone();
+    } else {
+      sessionTz = TimeZone.getTimeZone(sessionTzString);
+    }
+    String storageTzString =
+        conf == null ? null : conf.get(ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY());
+    if (storageTzString == null || storageTzString.isEmpty()) {
+      storageTz = sessionTz;
+    } else {
+      storageTz = TimeZone.getTimeZone(storageTzString);
+    }
     this.maxDefLevel = descriptor.getMaxDefinitionLevel();
 
     DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
@@ -289,7 +311,7 @@ public class VectorizedColumnReader {
             // TODO: Convert dictionary of Binaries to dictionary of Longs
             if (!column.isNullAt(i)) {
               Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
-              column.putLong(i, ParquetRowConverter.binaryToSQLTimestamp(v));
+              column.putLong(i, ParquetRowConverter.binaryToSQLTimestamp(v, sessionTz, storageTz));
             }
           }
         } else {
@@ -422,7 +444,7 @@ public class VectorizedColumnReader {
         if (defColumn.readInteger() == maxDefLevel) {
           column.putLong(rowId + i,
               // Read 12 bytes for INT96
-              ParquetRowConverter.binaryToSQLTimestamp(data.readBinary(12)));
+              ParquetRowConverter.binaryToSQLTimestamp(data.readBinary(12), sessionTz, storageTz));
         } else {
           column.putNull(rowId + i);
         }
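Note: the constructor above resolves two timezones from the Hadoop configuration, the session timezone (falling back to the JVM default) and the storage timezone (falling back to the session timezone). A Scala sketch of the same fallback chain, using the property keys from this patch; the helper name is illustrative only:

    import java.util.TimeZone
    import org.apache.hadoop.conf.Configuration

    def resolveTimezones(conf: Configuration): (TimeZone, TimeZone) = {
      // Session timezone: spark.sql.session.timeZone, or else the JVM default.
      val sessionTz = Option(conf.get("spark.sql.session.timeZone"))
        .filter(_.nonEmpty)
        .map(id => TimeZone.getTimeZone(id))
        .getOrElse(TimeZone.getDefault)
      // Storage timezone: the table property, or else the session timezone.
      val storageTz = Option(conf.get("parquet.mr.int96.write.zone"))
        .filter(_.nonEmpty)
        .map(id => TimeZone.getTimeZone(id))
        .getOrElse(sessionTz)
      (sessionTz, storageTz)
    }
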
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
index 51bdf0f0f2291878bc7eeb37ce5bc15ecbe01056..d8974ddf247041a939ab728039931e9e251f0a2a 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.parquet.column.ColumnDescriptor;
@@ -95,6 +96,8 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
    */
   private boolean returnColumnarBatch;
 
+  private Configuration conf;
+
   /**
    * The default config on whether columnarBatch should be offheap.
    */
@@ -107,6 +110,7 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
   public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
       throws IOException, InterruptedException, UnsupportedOperationException {
     super.initialize(inputSplit, taskAttemptContext);
+    this.conf = taskAttemptContext.getConfiguration();
     initializeInternal();
   }
 
@@ -277,7 +281,7 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
     for (int i = 0; i < columns.size(); ++i) {
       if (missingColumns[i]) continue;
       columnReaders[i] = new VectorizedColumnReader(columns.get(i),
-          pages.getPageReader(columns.get(i)));
+          pages.getPageReader(columns.get(i)), conf);
     }
     totalCountLoadedSoFar += pages.getRowCount();
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index ebf03e1bf8869753add213d1cda89268c04c713f..5843c5b56d44cbbaa9bf1ede06f051df1a51c3fd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -26,7 +26,6 @@ import scala.collection.mutable.ArrayBuffer
 import scala.util.control.NonFatal
 import scala.util.Try
 
-import org.apache.commons.lang3.StringEscapeUtils
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
@@ -37,7 +36,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
-import org.apache.spark.sql.execution.datasources.{DataSource, FileFormat, PartitioningUtils}
+import org.apache.spark.sql.execution.datasources.{DataSource, PartitioningUtils}
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
 import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
@@ -74,6 +73,10 @@ case class CreateTableLikeCommand(
       sourceTableDesc.provider
     }
 
+    val properties = sourceTableDesc.properties.filter { case (k, _) =>
+      k == ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY
+    }
+
     // If the location is specified, we create an external table internally.
     // Otherwise create a managed table.
     val tblType = if (location.isEmpty) CatalogTableType.MANAGED else CatalogTableType.EXTERNAL
@@ -86,6 +89,7 @@ case class CreateTableLikeCommand(
           locationUri = location.map(CatalogUtils.stringToURI(_))),
         schema = sourceTableDesc.schema,
         provider = newProvider,
+        properties = properties,
         partitionColumnNames = sourceTableDesc.partitionColumnNames,
         bucketSpec = sourceTableDesc.bucketSpec)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 2f3a2c62b912c1f245fa7ceca996294ece134347..8113768cd793f99051d774fdead3ed0e10c12b20 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -632,4 +632,6 @@ object ParquetFileFormat extends Logging {
         Failure(cause)
     }.toOption
   }
+
+  val PARQUET_TIMEZONE_TABLE_PROPERTY = "parquet.mr.int96.write.zone"
 }
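Note: PARQUET_TIMEZONE_TABLE_PROPERTY is the TBLPROPERTIES key users set to declare which timezone a table's INT96 timestamps were written in. A usage sketch, with table name and zones chosen only for illustration, assuming a spark session as in spark-shell:

    import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat

    val key = ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY  // "parquet.mr.int96.write.zone"
    spark.sql(
      s"""CREATE TABLE impala_times (ts timestamp)
         |STORED AS PARQUET
         |TBLPROPERTIES ($key="America/Los_Angeles")""".stripMargin)
    spark.sql(s"""ALTER TABLE impala_times SET TBLPROPERTIES ($key="UTC")""")
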
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
index f1a35dd8a6200fb4c14069dd75e629d3707d08a9..bf395a0bef745e500aedcb3e1d7cd0df1954571d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
@@ -95,7 +95,8 @@ private[parquet] class ParquetReadSupport extends ReadSupport[UnsafeRow] with Lo
     new ParquetRecordMaterializer(
       parquetRequestedSchema,
       ParquetReadSupport.expandUDT(catalystRequestedSchema),
-      new ParquetSchemaConverter(conf))
+      new ParquetSchemaConverter(conf),
+      conf)
   }
 }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
index 4e49a0dac97c0e00e96c46392cbf71dcd2e48fe2..df041996cdea9fa387f0942ee216709566073915 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.parquet.io.api.{GroupConverter, RecordMaterializer}
 import org.apache.parquet.schema.MessageType
 
@@ -29,13 +30,17 @@ import org.apache.spark.sql.types.StructType
  * @param parquetSchema Parquet schema of the records to be read
  * @param catalystSchema Catalyst schema of the rows to be constructed
  * @param schemaConverter A Parquet-Catalyst schema converter that helps initializing row converters
+ * @param hadoopConf Hadoop Configuration for passing extra parameters for Parquet conversion
  */
 private[parquet] class ParquetRecordMaterializer(
-    parquetSchema: MessageType, catalystSchema: StructType, schemaConverter: ParquetSchemaConverter)
+    parquetSchema: MessageType,
+    catalystSchema: StructType,
+    schemaConverter: ParquetSchemaConverter,
+    hadoopConf: Configuration)
   extends RecordMaterializer[UnsafeRow] {
 
   private val rootConverter =
-    new ParquetRowConverter(schemaConverter, parquetSchema, catalystSchema, NoopUpdater)
+    new ParquetRowConverter(schemaConverter, parquetSchema, catalystSchema, hadoopConf, NoopUpdater)
 
   override def getCurrentRecord: UnsafeRow = rootConverter.currentRecord
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 32e6c60cd9766324f378c9cce950774480abc60d..d52ff62d93b26c90c6ec57818bc94ecb31baf5ae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -19,10 +19,12 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import java.math.{BigDecimal, BigInteger}
 import java.nio.ByteOrder
+import java.util.TimeZone
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.parquet.column.Dictionary
 import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter}
 import org.apache.parquet.schema.{GroupType, MessageType, OriginalType, Type}
@@ -34,6 +36,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLTimestamp
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -117,12 +120,14 @@ private[parquet] class ParquetPrimitiveConverter(val updater: ParentContainerUpd
  * @param parquetType Parquet schema of Parquet records
  * @param catalystType Spark SQL schema that corresponds to the Parquet record type. User-defined
  *        types should have been expanded.
+ * @param hadoopConf a Hadoop Configuration for passing any extra parameters for Parquet conversion
  * @param updater An updater which propagates converted field values to the parent container
  */
 private[parquet] class ParquetRowConverter(
     schemaConverter: ParquetSchemaConverter,
     parquetType: GroupType,
     catalystType: StructType,
+    hadoopConf: Configuration,
     updater: ParentContainerUpdater)
   extends ParquetGroupConverter(updater) with Logging {
 
@@ -261,18 +266,18 @@ private[parquet] class ParquetRowConverter(
 
       case TimestampType =>
         // TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that.
+        // If the table has a timezone property, apply the correct conversions.  See SPARK-12297.
+        val sessionTzString = hadoopConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)
+        val sessionTz = Option(sessionTzString).map(TimeZone.getTimeZone(_))
+          .getOrElse(DateTimeUtils.defaultTimeZone())
+        val storageTzString = hadoopConf.get(ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY)
+        val storageTz = Option(storageTzString).map(TimeZone.getTimeZone(_)).getOrElse(sessionTz)
         new ParquetPrimitiveConverter(updater) {
           // Converts nanosecond timestamps stored as INT96
           override def addBinary(value: Binary): Unit = {
-            assert(
-              value.length() == 12,
-              "Timestamps (with nanoseconds) are expected to be stored in 12-byte long binaries, " +
-              s"but got a ${value.length()}-byte binary.")
-
-            val buf = value.toByteBuffer.order(ByteOrder.LITTLE_ENDIAN)
-            val timeOfDayNanos = buf.getLong
-            val julianDay = buf.getInt
-            updater.setLong(DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos))
+            val timestamp = ParquetRowConverter.binaryToSQLTimestamp(value, sessionTz = sessionTz,
+              storageTz = storageTz)
+            updater.setLong(timestamp)
           }
         }
 
@@ -302,7 +307,7 @@ private[parquet] class ParquetRowConverter(
 
       case t: StructType =>
         new ParquetRowConverter(
-          schemaConverter, parquetType.asGroupType(), t, new ParentContainerUpdater {
+          schemaConverter, parquetType.asGroupType(), t, hadoopConf, new ParentContainerUpdater {
             override def set(value: Any): Unit = updater.set(value.asInstanceOf[InternalRow].copy())
           })
 
@@ -651,6 +656,7 @@ private[parquet] class ParquetRowConverter(
 }
 
 private[parquet] object ParquetRowConverter {
+
   def binaryToUnscaledLong(binary: Binary): Long = {
     // The underlying `ByteBuffer` implementation is guaranteed to be `HeapByteBuffer`, so here
     // we are using `Binary.toByteBuffer.array()` to steal the underlying byte array without
@@ -673,12 +679,35 @@ private[parquet] object ParquetRowConverter {
     unscaled
   }
 
-  def binaryToSQLTimestamp(binary: Binary): SQLTimestamp = {
+  /**
+   * Converts an int96 to a SQLTimestamp, given the storage timezone and the session timezone.
+   * The timestamp is really meant to be interpreted as a "floating time", but since we
+   * actually store it as micros since epoch, we have to apply a conversion when the
+   * timezones change.
+   *
+   * @param binary a parquet Binary which holds one int96
+   * @param sessionTz the session timezone. This is used to determine how to display the time, and
+   *                  to compute functions on the timestamp which involve a timezone, e.g. extract
+   *                  the hour.
+   * @param storageTz the timezone which was used to store the timestamp. This should come from
+   *                  the timezone table property, or else it defaults to the sessionTz.
+   * @return a timestamp (micros since epoch) which will render correctly in the sessionTz
+   */
+  def binaryToSQLTimestamp(
+      binary: Binary,
+      sessionTz: TimeZone,
+      storageTz: TimeZone): SQLTimestamp = {
     assert(binary.length() == 12, s"Timestamps (with nanoseconds) are expected to be stored in" +
       s" 12-byte long binaries. Found a ${binary.length()}-byte binary instead.")
     val buffer = binary.toByteBuffer.order(ByteOrder.LITTLE_ENDIAN)
     val timeOfDayNanos = buffer.getLong
     val julianDay = buffer.getInt
-    DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos)
+    val utcEpochMicros = DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos)
+    // avoid expensive time logic if possible.
+    if (sessionTz.getID() != storageTz.getID()) {
+      DateTimeUtils.convertTz(utcEpochMicros, sessionTz, storageTz)
+    } else {
+      utcEpochMicros
+    }
   }
 }
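Note: a rough sketch of the new two-timezone conversion; the INT96 bytes are assembled by hand purely for illustration (12 bytes: time-of-day nanos as a little-endian long, then the Julian day as an int). Since ParquetRowConverter is private[parquet], this is only meant to show the call shape:

    import java.nio.{ByteBuffer, ByteOrder}
    import java.util.TimeZone
    import org.apache.parquet.io.api.Binary

    val buf = ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN)
    buf.putLong(0L).putInt(2440588)  // midnight of Julian day 2440588, i.e. 1970-01-01
    val int96 = Binary.fromConstantByteArray(buf.array())

    val sessionTz = TimeZone.getTimeZone("UTC")
    val storageTz = TimeZone.getTimeZone("Europe/Berlin")
    // When the two IDs match, the raw Julian-day micros come back unchanged; otherwise
    // DateTimeUtils.convertTz shifts them so they render correctly in the session timezone.
    val micros = ParquetRowConverter.binaryToSQLTimestamp(int96, sessionTz, storageTz)
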
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
index 38b0e33937f3cee2135b1bba0da1c9426958aa28..679ed8e361b74f005fe3b9428f30f8d809454b1b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import java.nio.{ByteBuffer, ByteOrder}
 import java.util
+import java.util.TimeZone
 
 import scala.collection.JavaConverters.mapAsJavaMapConverter
 
@@ -75,6 +76,9 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit
   // Reusable byte array used to write decimal values
   private val decimalBuffer = new Array[Byte](minBytesForPrecision(DecimalType.MAX_PRECISION))
 
+  private var storageTz: TimeZone = _
+  private var sessionTz: TimeZone = _
+
   override def init(configuration: Configuration): WriteContext = {
     val schemaString = configuration.get(ParquetWriteSupport.SPARK_ROW_SCHEMA)
     this.schema = StructType.fromString(schemaString)
@@ -91,6 +95,19 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit
 
 
     this.rootFieldWriters = schema.map(_.dataType).map(makeWriter)
+    // If the table has a timezone property, apply the correct conversions.  See SPARK-12297.
+    val sessionTzString = configuration.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)
+    sessionTz = if (sessionTzString == null || sessionTzString == "") {
+      TimeZone.getDefault()
+    } else {
+      TimeZone.getTimeZone(sessionTzString)
+    }
+    val storageTzString = configuration.get(ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY)
+    storageTz = if (storageTzString == null || storageTzString == "") {
+      sessionTz
+    } else {
+      TimeZone.getTimeZone(storageTzString)
+    }
 
     val messageType = new ParquetSchemaConverter(configuration).convert(schema)
     val metadata = Map(ParquetReadSupport.SPARK_METADATA_KEY -> schemaString).asJava
@@ -178,7 +195,13 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit
 
           // NOTE: Starting from Spark 1.5, Spark SQL `TimestampType` only has microsecond
           // precision.  Nanosecond parts of timestamp values read from INT96 are simply stripped.
-          val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(row.getLong(ordinal))
+          val rawMicros = row.getLong(ordinal)
+          val adjustedMicros = if (sessionTz.getID() == storageTz.getID()) {
+            rawMicros
+          } else {
+            DateTimeUtils.convertTz(rawMicros, storageTz, sessionTz)
+          }
+          val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(adjustedMicros)
           val buf = ByteBuffer.wrap(timestampBuffer)
           buf.order(ByteOrder.LITTLE_ENDIAN).putLong(timeOfDayNanos).putInt(julianDay)
           recordConsumer.addBinary(Binary.fromReusedByteArray(timestampBuffer))
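Note: on the write path the convertTz arguments are swapped relative to the read path, so the read-side adjustment is undone before the value is encoded as a Julian day, and a value written and read through the same table keeps its wall-clock display. A hypothetical spark-shell round trip (table name and zone chosen only for illustration):

    import java.sql.Timestamp
    import spark.implicits._

    spark.sql(
      """CREATE TABLE tz_roundtrip (display string, ts timestamp)
        |STORED AS PARQUET
        |TBLPROPERTIES ("parquet.mr.int96.write.zone"="Europe/Berlin")""".stripMargin)

    val df = Seq(("2016-01-01 00:39:59.123", Timestamp.valueOf("2016-01-01 00:39:59.123")))
      .toDF("display", "ts")
    df.write.insertInto("tz_roundtrip")
    // The displayed timestamps match the input; only the raw INT96 bytes on disk differ.
    spark.table("tz_roundtrip").show(false)
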
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index ba48facff2933c3452eec5d0104476a2cf5929ff..8fef467f5f5cbac1e3f4d40e1bc697f1f231182a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -39,9 +39,10 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
-import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.PartitioningUtils
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.hive.client.HiveClient
 import org.apache.spark.sql.internal.HiveSerDe
 import org.apache.spark.sql.internal.StaticSQLConf._
@@ -224,6 +225,14 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       throw new TableAlreadyExistsException(db = db, table = table)
     }
 
+    val tableTz = tableDefinition.properties.get(ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY)
+    tableTz.foreach { tz =>
+      if (!DateTimeUtils.isValidTimezone(tz)) {
+        throw new AnalysisException(s"Cannot set" +
+          s" ${ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY} to invalid timezone $tz")
+      }
+    }
+
     if (tableDefinition.tableType == VIEW) {
       client.createTable(tableDefinition, ignoreIfExists)
     } else {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 6b98066cb76c826d6f27b71b53fa64f17c2841c5..e0b565c0d79a07d606c0423de7bbc48259dc905a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.internal.SQLConf.HiveCaseSensitiveInferenceMode._
 import org.apache.spark.sql.types._
 
@@ -174,7 +175,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
             // We don't support hive bucketed tables, only ones we write out.
             bucketSpec = None,
             fileFormat = fileFormat,
-            options = options)(sparkSession = sparkSession)
+            options = options ++ getStorageTzOptions(relation))(sparkSession = sparkSession)
           val created = LogicalRelation(fsRelation, updatedTable)
           tableRelationCache.put(tableIdentifier, created)
           created
@@ -201,7 +202,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
                 userSpecifiedSchema = Option(dataSchema),
                 // We don't support hive bucketed tables, only ones we write out.
                 bucketSpec = None,
-                options = options,
+                options = options ++ getStorageTzOptions(relation),
                 className = fileType).resolveRelation(),
               table = updatedTable)
 
@@ -222,6 +223,13 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     result.copy(output = newOutput)
   }
 
+  private def getStorageTzOptions(relation: CatalogRelation): Map[String, String] = {
+    // We add the table timezone to the relation options; it is automatically injected into the
+    // hadoopConf used by the Parquet converters.
+    val storageTzKey = ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY
+    relation.tableMeta.properties.get(storageTzKey).map(storageTzKey -> _).toMap
+  }
+
   private def inferIfNeeded(
       relation: CatalogRelation,
       options: Map[String, String],
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
index 05b6059472f59bc4f65b89375f077a4d43d5d0ef..2bfd63d9b56e6e301a0db209d669a659f60bea99 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
@@ -17,12 +17,22 @@
 
 package org.apache.spark.sql.hive
 
+import java.io.File
+import java.net.URLDecoder
 import java.sql.Timestamp
+import java.util.TimeZone
 
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.execution.datasources.parquet.ParquetCompatibilityTest
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.parquet.hadoop.ParquetFileReader
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
+
+import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetCompatibilityTest, ParquetFileFormat}
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{StringType, StructType, TimestampType}
 
 class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHiveSingleton {
   /**
@@ -141,4 +151,369 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
       Row(Seq(Row(1))),
       "ARRAY<STRUCT<array_element: INT>>")
   }
+
+  val testTimezones = Seq(
+    "UTC" -> "UTC",
+    "LA" -> "America/Los_Angeles",
+    "Berlin" -> "Europe/Berlin"
+  )
+  // Check creating parquet tables with timestamps, writing data into them, and reading it back out
+  // under a variety of conditions:
+  // * tables with explicit tz and those without
+  // * altering table properties directly
+  // * variety of timezones, local & non-local
+  val sessionTimezones = testTimezones.map(_._2).map(Some(_)) ++ Seq(None)
+  sessionTimezones.foreach { sessionTzOpt =>
+    val sparkSession = spark.newSession()
+    sessionTzOpt.foreach { tz => sparkSession.conf.set(SQLConf.SESSION_LOCAL_TIMEZONE.key, tz) }
+    testCreateWriteRead(sparkSession, "no_tz", None, sessionTzOpt)
+    val localTz = TimeZone.getDefault.getID()
+    testCreateWriteRead(sparkSession, "local", Some(localTz), sessionTzOpt)
+    // check with a variety of timezones.  The unit tests currently are configured to always use
+    // America/Los_Angeles, but even if they didn't, we'd be sure to cover a non-local timezone.
+    testTimezones.foreach { case (tableName, zone) =>
+      if (zone != localTz) {
+        testCreateWriteRead(sparkSession, tableName, Some(zone), sessionTzOpt)
+      }
+    }
+  }
+
+  private def testCreateWriteRead(
+      sparkSession: SparkSession,
+      baseTable: String,
+      explicitTz: Option[String],
+      sessionTzOpt: Option[String]): Unit = {
+    testCreateAlterTablesWithTimezone(sparkSession, baseTable, explicitTz, sessionTzOpt)
+    testWriteTablesWithTimezone(sparkSession, baseTable, explicitTz, sessionTzOpt)
+    testReadTablesWithTimezone(sparkSession, baseTable, explicitTz, sessionTzOpt)
+  }
+
+  private def checkHasTz(spark: SparkSession, table: String, tz: Option[String]): Unit = {
+    val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(table))
+    assert(tableMetadata.properties.get(ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY) === tz)
+  }
+
+  private def testCreateAlterTablesWithTimezone(
+      spark: SparkSession,
+      baseTable: String,
+      explicitTz: Option[String],
+      sessionTzOpt: Option[String]): Unit = {
+    test(s"SPARK-12297: Create and Alter Parquet tables and timezones; explicitTz = $explicitTz; " +
+      s"sessionTzOpt = $sessionTzOpt") {
+      val key = ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY
+      withTable(baseTable, s"like_$baseTable", s"select_$baseTable", s"partitioned_$baseTable") {
+        // If we ever add a property to set the table timezone by default, defaultTz would change
+        val defaultTz = None
+        // check that created tables have correct TBLPROPERTIES
+        val tblProperties = explicitTz.map {
+          tz => s"""TBLPROPERTIES ($key="$tz")"""
+        }.getOrElse("")
+        spark.sql(
+          s"""CREATE TABLE $baseTable (
+                |  x int
+                | )
+                | STORED AS PARQUET
+                | $tblProperties
+            """.stripMargin)
+        val expectedTableTz = explicitTz.orElse(defaultTz)
+        checkHasTz(spark, baseTable, expectedTableTz)
+        spark.sql(
+          s"""CREATE TABLE partitioned_$baseTable (
+                |  x int
+                | )
+                | PARTITIONED BY (y int)
+                | STORED AS PARQUET
+                | $tblProperties
+            """.stripMargin)
+        checkHasTz(spark, s"partitioned_$baseTable", expectedTableTz)
+        spark.sql(s"CREATE TABLE like_$baseTable LIKE $baseTable")
+        checkHasTz(spark, s"like_$baseTable", expectedTableTz)
+        spark.sql(
+          s"""CREATE TABLE select_$baseTable
+                | STORED AS PARQUET
+                | AS
+                | SELECT * from $baseTable
+            """.stripMargin)
+        checkHasTz(spark, s"select_$baseTable", defaultTz)
+
+        // check alter table, setting, unsetting, resetting the property
+        spark.sql(
+          s"""ALTER TABLE $baseTable SET TBLPROPERTIES ($key="America/Los_Angeles")""")
+        checkHasTz(spark, baseTable, Some("America/Los_Angeles"))
+        spark.sql(s"""ALTER TABLE $baseTable SET TBLPROPERTIES ($key="UTC")""")
+        checkHasTz(spark, baseTable, Some("UTC"))
+        spark.sql(s"""ALTER TABLE $baseTable UNSET TBLPROPERTIES ($key)""")
+        checkHasTz(spark, baseTable, None)
+        explicitTz.foreach { tz =>
+          spark.sql(s"""ALTER TABLE $baseTable SET TBLPROPERTIES ($key="$tz")""")
+          checkHasTz(spark, baseTable, expectedTableTz)
+        }
+      }
+    }
+  }
+
+  val desiredTimestampStrings = Seq(
+    "2015-12-31 22:49:59.123",
+    "2015-12-31 23:50:59.123",
+    "2016-01-01 00:39:59.123",
+    "2016-01-01 01:29:59.123"
+  )
+  // We don't want to mess with timezones inside the tests themselves, since we use a shared
+  // spark context, and then we might be prone to issues from lazy vals for timezones.  Instead,
+  // we manually adjust the timezone just to determine what the desired millis (since epoch, in utc)
+  // is for various "wall-clock" times in different timezones, and then we can compare against those
+  // in our tests.
+  val timestampTimezoneToMillis = {
+    val originalTz = TimeZone.getDefault
+    try {
+      desiredTimestampStrings.flatMap { timestampString =>
+        Seq("America/Los_Angeles", "Europe/Berlin", "UTC").map { tzId =>
+          TimeZone.setDefault(TimeZone.getTimeZone(tzId))
+          val timestamp = Timestamp.valueOf(timestampString)
+          (timestampString, tzId) -> timestamp.getTime()
+        }
+      }.toMap
+    } finally {
+      TimeZone.setDefault(originalTz)
+    }
+  }
+
+  private def createRawData(spark: SparkSession): Dataset[(String, Timestamp)] = {
+    import spark.implicits._
+    val df = desiredTimestampStrings.toDF("display")
+    // this will get the millis corresponding to the display time given the current *session*
+    // timezone.
+    df.withColumn("ts", expr("cast(display as timestamp)")).as[(String, Timestamp)]
+  }
+
+  private def testWriteTablesWithTimezone(
+      spark: SparkSession,
+      baseTable: String,
+      explicitTz: Option[String],
+      sessionTzOpt: Option[String]) : Unit = {
+    val key = ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY
+    test(s"SPARK-12297: Write to Parquet tables with Timestamps; explicitTz = $explicitTz; " +
+        s"sessionTzOpt = $sessionTzOpt") {
+
+      withTable(s"saveAsTable_$baseTable", s"insert_$baseTable", s"partitioned_ts_$baseTable") {
+        val sessionTzId = sessionTzOpt.getOrElse(TimeZone.getDefault().getID())
+        // check that created tables have correct TBLPROPERTIES
+        val tblProperties = explicitTz.map {
+          tz => s"""TBLPROPERTIES ($key="$tz")"""
+        }.getOrElse("")
+
+        val rawData = createRawData(spark)
+        // Check writing data out.
+        // We write data into our tables, and then check the raw parquet files to see whether
+        // the correct conversion was applied.
+        rawData.write.saveAsTable(s"saveAsTable_$baseTable")
+        checkHasTz(spark, s"saveAsTable_$baseTable", None)
+        spark.sql(
+          s"""CREATE TABLE insert_$baseTable (
+                |  display string,
+                |  ts timestamp
+                | )
+                | STORED AS PARQUET
+                | $tblProperties
+               """.stripMargin)
+        checkHasTz(spark, s"insert_$baseTable", explicitTz)
+        rawData.write.insertInto(s"insert_$baseTable")
+        // no matter what, roundtripping via the table should leave the data unchanged
+        val readFromTable = spark.table(s"insert_$baseTable").collect()
+          .map { row => (row.getAs[String](0), row.getAs[Timestamp](1)).toString() }.sorted
+        assert(readFromTable === rawData.collect().map(_.toString()).sorted)
+
+        // Now we load the raw parquet data on disk, and check if it was adjusted correctly.
+        // Note that we only store the timezone in the table property, so when we read the
+        // data this way, we're bypassing all of the conversion logic, and reading the raw
+        // values in the parquet file.
+        val onDiskLocation = spark.sessionState.catalog
+          .getTableMetadata(TableIdentifier(s"insert_$baseTable")).location.getPath
+        // we test reading the data back with and without the vectorized reader, to make sure we
+        // haven't broken reading parquet from non-hive tables, with both readers.
+        Seq(false, true).foreach { vectorized =>
+          spark.conf.set(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, vectorized)
+          val readFromDisk = spark.read.parquet(onDiskLocation).collect()
+          val storageTzId = explicitTz.getOrElse(sessionTzId)
+          readFromDisk.foreach { row =>
+            val displayTime = row.getAs[String](0)
+            val millis = row.getAs[Timestamp](1).getTime()
+            val expectedMillis = timestampTimezoneToMillis((displayTime, storageTzId))
+            assert(expectedMillis === millis, s"Display time '$displayTime' was stored " +
+              s"incorrectly with sessionTz = ${sessionTzOpt}; Got $millis, expected " +
+              s"$expectedMillis (delta = ${millis - expectedMillis})")
+          }
+        }
+
+        // check tables partitioned by timestamps.  We don't compare the "raw" data in this case,
+        // since they are adjusted even when we bypass the hive table.
+        rawData.write.partitionBy("ts").saveAsTable(s"partitioned_ts_$baseTable")
+        val partitionDiskLocation = spark.sessionState.catalog
+          .getTableMetadata(TableIdentifier(s"partitioned_ts_$baseTable")).location.getPath
+        // no matter what mix of timezones we use, the dirs should specify the value with the
+        // same time we use for display.
+        val parts = new File(partitionDiskLocation).list().collect {
+          case name if name.startsWith("ts=") => URLDecoder.decode(name.stripPrefix("ts="), "UTF-8")
+        }.toSet
+        assert(parts === desiredTimestampStrings.toSet)
+      }
+    }
+  }
+
+  private def testReadTablesWithTimezone(
+      spark: SparkSession,
+      baseTable: String,
+      explicitTz: Option[String],
+      sessionTzOpt: Option[String]): Unit = {
+    val key = ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY
+    test(s"SPARK-12297: Read from Parquet tables with Timestamps; explicitTz = $explicitTz; " +
+      s"sessionTzOpt = $sessionTzOpt") {
+      withTable(s"external_$baseTable", s"partitioned_$baseTable") {
+        // we intentionally save this data directly, without creating a table, so we can
+        // see that the data is read back differently depending on table properties.
+        // we'll save with adjusted millis, so that it should be the correct millis after reading
+        // back.
+        val rawData = createRawData(spark)
+        // copy to a local val to avoid closing over the entire test class
+        val timestampTimezoneToMillis = this.timestampTimezoneToMillis
+        import spark.implicits._
+        val adjustedRawData = (explicitTz match {
+          case Some(tzId) =>
+            rawData.map { case (displayTime, _) =>
+              val storageMillis = timestampTimezoneToMillis((displayTime, tzId))
+              (displayTime, new Timestamp(storageMillis))
+            }
+          case _ =>
+            rawData
+        }).withColumnRenamed("_1", "display").withColumnRenamed("_2", "ts")
+        withTempPath { basePath =>
+          val unpartitionedPath = new File(basePath, "flat")
+          val partitionedPath = new File(basePath, "partitioned")
+          adjustedRawData.write.parquet(unpartitionedPath.getCanonicalPath)
+          val options = Map("path" -> unpartitionedPath.getCanonicalPath) ++
+            explicitTz.map { tz => Map(key -> tz) }.getOrElse(Map())
+
+          spark.catalog.createTable(
+            tableName = s"external_$baseTable",
+            source = "parquet",
+            schema = new StructType().add("display", StringType).add("ts", TimestampType),
+            options = options
+          )
+
+          // also write out a partitioned table, to make sure we can access that correctly.
+          // add a column we can partition by (value doesn't particularly matter).
+          val partitionedData = adjustedRawData.withColumn("id", monotonically_increasing_id())
+          partitionedData.write.partitionBy("id")
+            .parquet(partitionedPath.getCanonicalPath)
+          // unfortunately, catalog.createTable() doesn't let us specify partitioning, so just use
+          // a "CREATE TABLE" stmt.
+          val tblOpts = explicitTz.map { tz => s"""TBLPROPERTIES ($key="$tz")""" }.getOrElse("")
+          spark.sql(s"""CREATE EXTERNAL TABLE partitioned_$baseTable (
+                         |  display string,
+                         |  ts timestamp
+                         |)
+                         |PARTITIONED BY (id bigint)
+                         |STORED AS parquet
+                         |LOCATION 'file:${partitionedPath.getCanonicalPath}'
+                         |$tblOpts
+                          """.stripMargin)
+          spark.sql(s"msck repair table partitioned_$baseTable")
+
+          for {
+            vectorized <- Seq(false, true)
+            partitioned <- Seq(false, true)
+          } {
+            withClue(s"vectorized = $vectorized; partitioned = $partitioned") {
+              spark.conf.set(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, vectorized)
+              val sessionTz = sessionTzOpt.getOrElse(TimeZone.getDefault().getID())
+              val table = if (partitioned) s"partitioned_$baseTable" else s"external_$baseTable"
+              val query = s"select display, cast(ts as string) as ts_as_string, ts " +
+                s"from $table"
+              val collectedFromExternal = spark.sql(query).collect()
+              assert(collectedFromExternal.size === 4)
+              collectedFromExternal.foreach { row =>
+                val displayTime = row.getAs[String](0)
+                // the timestamp should still display the same, despite the changes in timezones
+                assert(displayTime === row.getAs[String](1))
+                // we'll also check that the millis behind the timestamp has the appropriate
+                // adjustments.
+                val millis = row.getAs[Timestamp](2).getTime()
+                val expectedMillis = timestampTimezoneToMillis((displayTime, sessionTz))
+                val delta = millis - expectedMillis
+                val deltaHours = delta / (1000L * 60 * 60)
+                assert(millis === expectedMillis, s"Display time '$displayTime' did not have " +
+                  s"correct millis: was $millis, expected $expectedMillis; delta = $delta " +
+                  s"($deltaHours hours)")
+              }
+
+              // Now test that the behavior is still correct even with a filter which could get
+              // pushed down into parquet.  We don't need extra handling for pushed down
+              // predicates because (a) in ParquetFilters, we ignore TimestampType and (b) parquet
+              // does not read statistics from int96 fields, as they are unsigned.  See
+              // scalastyle:off line.size.limit
+              // https://github.com/apache/parquet-mr/blob/2fd62ee4d524c270764e9b91dca72e5cf1a005b7/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java#L419
+              // https://github.com/apache/parquet-mr/blob/2fd62ee4d524c270764e9b91dca72e5cf1a005b7/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java#L348
+              // scalastyle:on line.size.limit
+              //
+              // Just to be defensive in case anything ever changes in parquet, this test checks
+              // the assumption on column stats, and also the end-to-end behavior.
+
+              val hadoopConf = sparkContext.hadoopConfiguration
+              val fs = FileSystem.get(hadoopConf)
+              val parts = if (partitioned) {
+                val subdirs = fs.listStatus(new Path(partitionedPath.getCanonicalPath))
+                  .filter(_.getPath().getName().startsWith("id="))
+                fs.listStatus(subdirs.head.getPath())
+                  .filter(_.getPath().getName().endsWith(".parquet"))
+              } else {
+                fs.listStatus(new Path(unpartitionedPath.getCanonicalPath))
+                  .filter(_.getPath().getName().endsWith(".parquet"))
+              }
+              // grab the meta data from the parquet file.  The next section of asserts just make
+              // sure the test is configured correctly.
+              assert(parts.size == 1)
+              val oneFooter = ParquetFileReader.readFooter(hadoopConf, parts.head.getPath)
+              assert(oneFooter.getFileMetaData.getSchema.getColumns.size === 2)
+              assert(oneFooter.getFileMetaData.getSchema.getColumns.get(1).getType() ===
+                PrimitiveTypeName.INT96)
+              val oneBlockMeta = oneFooter.getBlocks().get(0)
+              val oneBlockColumnMeta = oneBlockMeta.getColumns().get(1)
+              val columnStats = oneBlockColumnMeta.getStatistics
+              // This is the important assert.  Column stats are written, but they are ignored
+              // when the data is read back as mentioned above, b/c int96 is unsigned.  This
+              // assert makes sure this holds even if we change parquet versions (if eg. there
+              // were ever statistics even on unsigned columns).
+              assert(columnStats.isEmpty)
+
+              // These queries should return the entire dataset, but if the predicates were
+              // applied to the raw values in parquet, they would incorrectly filter data out.
+              Seq(
+                ">" -> "2015-12-31 22:00:00",
+                "<" -> "2016-01-01 02:00:00"
+              ).foreach { case (comparison, value) =>
+                val query =
+                  s"select ts from $table where ts $comparison '$value'"
+                val countWithFilter = spark.sql(query).count()
+                assert(countWithFilter === 4, query)
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+
+  test("SPARK-12297: exception on bad timezone") {
+    val key = ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY
+    val badTzException = intercept[AnalysisException] {
+      spark.sql(
+        s"""CREATE TABLE bad_tz_table (
+              |  x int
+              | )
+              | STORED AS PARQUET
+              | TBLPROPERTIES ($key="Blart Versenwald III")
+            """.stripMargin)
+    }
+    assert(badTzException.getMessage.contains("Blart Versenwald III"))
+  }
 }