diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index be8c5c2c1522e6a324174b2834c4bfb96ddec378..22664b419f5cb73af74f6cc7b9cd5fa732f0db3f 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -580,6 +580,15 @@ Configuration of Parquet can be done using the `setConf` method on SQLContext or
     flag tells Spark SQL to interpret binary data as a string to provide compatibility with these systems.
   </td>
 </tr>
+<tr>
+  <td><code>spark.sql.parquet.int96AsTimestamp</code></td>
+  <td>true</td>
+  <td>
+    Some Parquet-producing systems, in particular Impala, store Timestamp into INT96. Spark would also
+    store Timestamp as INT96 because we need to avoid precision lost of the nanoseconds field. This
+    flag tells Spark SQL to interpret INT96 data as a timestamp to provide compatibility with these systems.
+  </td>
+</tr>
 <tr>
   <td><code>spark.sql.parquet.cacheMetadata</code></td>
   <td>true</td>
diff --git a/pom.xml b/pom.xml
index e25eced87757800ebe925fffa57ecd775f4a8582..542efbaf06eb02540914381b4a083ef78d8d644e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -149,6 +149,7 @@
     <scala.binary.version>2.10</scala.binary.version>
     <jline.version>${scala.version}</jline.version>
     <jline.groupid>org.scala-lang</jline.groupid>
+    <jodd.version>3.6.3</jodd.version>
     <codehaus.jackson.version>1.8.8</codehaus.jackson.version>
     <snappy.version>1.1.1.6</snappy.version>
 
diff --git a/sql/core/pom.xml b/sql/core/pom.xml
index 1a0c77d282307e40c07e11855dfa3dcfc51c48ac..03a5c9e7c24a05ed9adef99e1f921224e2fa3da6 100644
--- a/sql/core/pom.xml
+++ b/sql/core/pom.xml
@@ -66,6 +66,11 @@
       <artifactId>jackson-databind</artifactId>
       <version>2.3.0</version>
     </dependency>
+    <dependency>
+      <groupId>org.jodd</groupId>
+      <artifactId>jodd-core</artifactId>
+      <version>${jodd.version}</version>
+    </dependency>
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 561a91d2d60eeea916307faf3c72a1ae8eee33dc..7fe17944a734e2ffcffcc584dcc99564e813f99f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -33,6 +33,7 @@ private[spark] object SQLConf {
   val DIALECT = "spark.sql.dialect"
 
   val PARQUET_BINARY_AS_STRING = "spark.sql.parquet.binaryAsString"
+  val PARQUET_INT96_AS_TIMESTAMP = "spark.sql.parquet.int96AsTimestamp"
   val PARQUET_CACHE_METADATA = "spark.sql.parquet.cacheMetadata"
   val PARQUET_COMPRESSION = "spark.sql.parquet.compression.codec"
   val PARQUET_FILTER_PUSHDOWN_ENABLED = "spark.sql.parquet.filterPushdown"
@@ -143,6 +144,12 @@ private[sql] class SQLConf extends Serializable {
   private[spark] def isParquetBinaryAsString: Boolean =
     getConf(PARQUET_BINARY_AS_STRING, "false").toBoolean
 
+  /**
+   * When set to true, we always treat INT96Values in Parquet files as timestamp.
+   */
+  private[spark] def isParquetINT96AsTimestamp: Boolean =
+    getConf(PARQUET_INT96_AS_TIMESTAMP, "true").toBoolean
+
   /**
    * When set to true, partition pruning for in-memory columnar tables is enabled.
    */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
index 10df8c3310092cdbffa177755da88968a05290c4..d87ddfeabda77d28f2e234f4e26b7eef0d244ffe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
@@ -17,8 +17,12 @@
 
 package org.apache.spark.sql.parquet
 
+import java.sql.Timestamp
+import java.util.{TimeZone, Calendar}
+
 import scala.collection.mutable.{Buffer, ArrayBuffer, HashMap}
 
+import jodd.datetime.JDateTime
 import parquet.column.Dictionary
 import parquet.io.api.{PrimitiveConverter, GroupConverter, Binary, Converter}
 import parquet.schema.MessageType
@@ -26,6 +30,7 @@ import parquet.schema.MessageType
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.parquet.CatalystConverter.FieldType
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.parquet.timestamp.NanoTime
 
 /**
  * Collection of converters of Parquet types (group and primitive types) that
@@ -123,6 +128,12 @@ private[sql] object CatalystConverter {
             parent.updateDecimal(fieldIndex, value, d)
         }
       }
+      case TimestampType => {
+        new CatalystPrimitiveConverter(parent, fieldIndex) {
+          override def addBinary(value: Binary): Unit =
+            parent.updateTimestamp(fieldIndex, value)
+        }
+      }
       // All other primitive types use the default converter
       case ctype: PrimitiveType => { // note: need the type tag here!
         new CatalystPrimitiveConverter(parent, fieldIndex)
@@ -197,9 +208,11 @@ private[parquet] abstract class CatalystConverter extends GroupConverter {
   protected[parquet] def updateString(fieldIndex: Int, value: String): Unit =
     updateField(fieldIndex, value)
 
-  protected[parquet] def updateDecimal(fieldIndex: Int, value: Binary, ctype: DecimalType): Unit = {
+  protected[parquet] def updateTimestamp(fieldIndex: Int, value: Binary): Unit =
+    updateField(fieldIndex, readTimestamp(value))
+
+  protected[parquet] def updateDecimal(fieldIndex: Int, value: Binary, ctype: DecimalType): Unit =
     updateField(fieldIndex, readDecimal(new Decimal(), value, ctype))
-  }
 
   protected[parquet] def isRootConverter: Boolean = parent == null
 
@@ -232,6 +245,13 @@ private[parquet] abstract class CatalystConverter extends GroupConverter {
     unscaled = (unscaled << (64 - numBits)) >> (64 - numBits)
     dest.set(unscaled, precision, scale)
   }
+
+  /**
+   * Read a Timestamp value from a Parquet Int96Value
+   */
+  protected[parquet] def readTimestamp(value: Binary): Timestamp = {
+    CatalystTimestampConverter.convertToTimestamp(value)
+  }
 }
 
 /**
@@ -384,6 +404,9 @@ private[parquet] class CatalystPrimitiveRowConverter(
   override protected[parquet] def updateString(fieldIndex: Int, value: String): Unit =
     current.setString(fieldIndex, value)
 
+  override protected[parquet] def updateTimestamp(fieldIndex: Int, value: Binary): Unit =
+    current.update(fieldIndex, readTimestamp(value))
+
   override protected[parquet] def updateDecimal(
       fieldIndex: Int, value: Binary, ctype: DecimalType): Unit = {
     var decimal = current(fieldIndex).asInstanceOf[Decimal]
@@ -454,6 +477,73 @@ private[parquet] object CatalystArrayConverter {
   val INITIAL_ARRAY_SIZE = 20
 }
 
+private[parquet] object CatalystTimestampConverter {
+  // TODO most part of this comes from Hive-0.14
+  // Hive code might have some issues, so we need to keep an eye on it.
+  // Also we use NanoTime and Int96Values from parquet-examples.
+  // We utilize jodd to convert between NanoTime and Timestamp
+  val parquetTsCalendar = new ThreadLocal[Calendar]
+  def getCalendar = {
+    // this is a cache for the calendar instance.
+    if (parquetTsCalendar.get == null) {
+      parquetTsCalendar.set(Calendar.getInstance(TimeZone.getTimeZone("GMT")))
+    }
+    parquetTsCalendar.get
+  }
+  val NANOS_PER_SECOND: Long = 1000000000
+  val SECONDS_PER_MINUTE: Long = 60
+  val MINUTES_PER_HOUR: Long = 60
+  val NANOS_PER_MILLI: Long = 1000000
+
+  def convertToTimestamp(value: Binary): Timestamp = {
+    val nt = NanoTime.fromBinary(value)
+    val timeOfDayNanos = nt.getTimeOfDayNanos
+    val julianDay = nt.getJulianDay
+    val jDateTime = new JDateTime(julianDay.toDouble)
+    val calendar = getCalendar
+    calendar.set(Calendar.YEAR, jDateTime.getYear)
+    calendar.set(Calendar.MONTH, jDateTime.getMonth - 1)
+    calendar.set(Calendar.DAY_OF_MONTH, jDateTime.getDay)
+
+    // written in command style
+    var remainder = timeOfDayNanos
+    calendar.set(
+      Calendar.HOUR_OF_DAY,
+      (remainder / (NANOS_PER_SECOND * SECONDS_PER_MINUTE * MINUTES_PER_HOUR)).toInt)
+    remainder = remainder % (NANOS_PER_SECOND * SECONDS_PER_MINUTE * MINUTES_PER_HOUR)
+    calendar.set(
+      Calendar.MINUTE, (remainder / (NANOS_PER_SECOND * SECONDS_PER_MINUTE)).toInt)
+    remainder = remainder % (NANOS_PER_SECOND * SECONDS_PER_MINUTE)
+    calendar.set(Calendar.SECOND, (remainder / NANOS_PER_SECOND).toInt)
+    val nanos = remainder % NANOS_PER_SECOND
+    val ts = new Timestamp(calendar.getTimeInMillis)
+    ts.setNanos(nanos.toInt)
+    ts
+  }
+
+  def convertFromTimestamp(ts: Timestamp): Binary = {
+    val calendar = getCalendar
+    calendar.setTime(ts)
+    val jDateTime = new JDateTime(calendar.get(Calendar.YEAR),
+      calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH))
+    // Hive-0.14 didn't set hour before get day number, while the day number should
+    // has something to do with hour, since julian day number grows at 12h GMT
+    // here we just follow what hive does.
+    val julianDay = jDateTime.getJulianDayNumber
+
+    val hour = calendar.get(Calendar.HOUR_OF_DAY)
+    val minute = calendar.get(Calendar.MINUTE)
+    val second = calendar.get(Calendar.SECOND)
+    val nanos = ts.getNanos
+    // Hive-0.14 would use hours directly, that might be wrong, since the day starts
+    // from 12h in Julian. here we just follow what hive does.
+    val nanosOfDay = nanos + second * NANOS_PER_SECOND +
+      minute * NANOS_PER_SECOND * SECONDS_PER_MINUTE +
+      hour * NANOS_PER_SECOND * SECONDS_PER_MINUTE * MINUTES_PER_HOUR
+    NanoTime(julianDay, nanosOfDay).toBinary
+  }
+}
+
 /**
  * A `parquet.io.api.GroupConverter` that converts a single-element groups that
  * match the characteristics of an array (see
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
index a54485e719dadfdbae5dc8e264b99cc44cdd1176..b0db9943a506c5abf063b874c6d23547872bd1b3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
@@ -65,8 +65,8 @@ private[sql] case class ParquetRelation(
     ParquetTypesConverter.readSchemaFromFile(
       new Path(path.split(",").head),
       conf,
-      sqlContext.conf.isParquetBinaryAsString)
-
+      sqlContext.conf.isParquetBinaryAsString,
+      sqlContext.conf.isParquetINT96AsTimestamp)
   lazy val attributeMap = AttributeMap(output.map(o => o -> o))
 
   override def newInstance() = ParquetRelation(path, conf, sqlContext).asInstanceOf[this.type]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
index fd63ad8144064bc73eebc53efac92846eb32464c..3fb1cc410521ecfa13f3d89db62c2f195d547885 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
@@ -83,7 +83,8 @@ private[parquet] class RowReadSupport extends ReadSupport[Row] with Logging {
     // TODO: Why it can be null?
     if (schema == null)  {
       log.debug("falling back to Parquet read schema")
-      schema = ParquetTypesConverter.convertToAttributes(parquetSchema, false)
+      schema = ParquetTypesConverter.convertToAttributes(
+        parquetSchema, false, true)
     }
     log.debug(s"list of attributes that will be read: $schema")
     new RowRecordMaterializer(parquetSchema, schema)
@@ -184,12 +185,12 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
         case t @ StructType(_) => writeStruct(
           t,
           value.asInstanceOf[CatalystConverter.StructScalaType[_]])
-        case _ => writePrimitive(schema.asInstanceOf[PrimitiveType], value)
+        case _ => writePrimitive(schema.asInstanceOf[NativeType], value)
       }
     }
   }
 
-  private[parquet] def writePrimitive(schema: PrimitiveType, value: Any): Unit = {
+  private[parquet] def writePrimitive(schema: DataType, value: Any): Unit = {
     if (value != null) {
       schema match {
         case StringType => writer.addBinary(
@@ -202,6 +203,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
         case IntegerType => writer.addInteger(value.asInstanceOf[Int])
         case ShortType => writer.addInteger(value.asInstanceOf[Short])
         case LongType => writer.addLong(value.asInstanceOf[Long])
+        case TimestampType => writeTimestamp(value.asInstanceOf[java.sql.Timestamp])
         case ByteType => writer.addInteger(value.asInstanceOf[Byte])
         case DoubleType => writer.addDouble(value.asInstanceOf[Double])
         case FloatType => writer.addFloat(value.asInstanceOf[Float])
@@ -307,6 +309,10 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
     writer.addBinary(Binary.fromByteArray(scratchBytes, 0, numBytes))
   }
 
+  private[parquet] def writeTimestamp(ts: java.sql.Timestamp): Unit = {
+    val binaryNanoTime = CatalystTimestampConverter.convertFromTimestamp(ts)
+    writer.addBinary(binaryNanoTime)
+  }
 }
 
 // Optimized for non-nested rows
@@ -351,6 +357,7 @@ private[parquet] class MutableRowWriteSupport extends RowWriteSupport {
       case DoubleType => writer.addDouble(record.getDouble(index))
       case FloatType => writer.addFloat(record.getFloat(index))
       case BooleanType => writer.addBoolean(record.getBoolean(index))
+      case TimestampType => writeTimestamp(record(index).asInstanceOf[java.sql.Timestamp])
       case d: DecimalType =>
         if (d.precisionInfo == None || d.precisionInfo.get.precision > 18) {
           sys.error(s"Unsupported datatype $d, cannot write to consumer")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala
index d5993656e02256b24d373164083f44ba89eea327..e4a10aa2ae6c36a3e7c395716e379c2052afeaf4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.mapreduce.Job
 import org.apache.spark.sql.test.TestSQLContext
 
 import parquet.example.data.{GroupWriter, Group}
-import parquet.example.data.simple.SimpleGroup
+import parquet.example.data.simple.{NanoTime, SimpleGroup}
 import parquet.hadoop.{ParquetReader, ParquetFileReader, ParquetWriter}
 import parquet.hadoop.api.WriteSupport
 import parquet.hadoop.api.WriteSupport.WriteContext
@@ -63,6 +63,7 @@ private[sql] object ParquetTestData {
       optional int64 mylong;
       optional float myfloat;
       optional double mydouble;
+      optional int96 mytimestamp;
       }"""
 
   // field names for test assertion error messages
@@ -72,7 +73,8 @@ private[sql] object ParquetTestData {
     "mystring:String",
     "mylong:Long",
     "myfloat:Float",
-    "mydouble:Double"
+    "mydouble:Double",
+    "mytimestamp:Timestamp"
   )
 
   val subTestSchema =
@@ -98,6 +100,7 @@ private[sql] object ParquetTestData {
       optional int64 myoptlong;
       optional float myoptfloat;
       optional double myoptdouble;
+      optional int96 mytimestamp;
       }
     """
 
@@ -236,6 +239,7 @@ private[sql] object ParquetTestData {
       record.add(3, i.toLong << 33)
       record.add(4, 2.5F)
       record.add(5, 4.5D)
+      record.add(6, new NanoTime(1,2))
       writer.write(record)
     }
     writer.close()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
index 6d8c682ccced820c670b53b21bc3134042efc08b..f1d4ff2387709c3702ad5fa3e259a49edc669dd0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
@@ -54,7 +54,8 @@ private[parquet] object ParquetTypesConverter extends Logging {
 
   def toPrimitiveDataType(
       parquetType: ParquetPrimitiveType,
-      binaryAsString: Boolean): DataType = {
+      binaryAsString: Boolean,
+      int96AsTimestamp: Boolean): DataType = {
     val originalType = parquetType.getOriginalType
     val decimalInfo = parquetType.getDecimalMetadata
     parquetType.getPrimitiveTypeName match {
@@ -66,6 +67,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
       case ParquetPrimitiveTypeName.FLOAT => FloatType
       case ParquetPrimitiveTypeName.INT32 => IntegerType
       case ParquetPrimitiveTypeName.INT64 => LongType
+      case ParquetPrimitiveTypeName.INT96 if int96AsTimestamp => TimestampType
       case ParquetPrimitiveTypeName.INT96 =>
         // TODO: add BigInteger type? TODO(andre) use DecimalType instead????
         sys.error("Potential loss of precision: cannot convert INT96")
@@ -103,7 +105,9 @@ private[parquet] object ParquetTypesConverter extends Logging {
    * @param parquetType The type to convert.
    * @return The corresponding Catalyst type.
    */
-  def toDataType(parquetType: ParquetType, isBinaryAsString: Boolean): DataType = {
+  def toDataType(parquetType: ParquetType,
+                 isBinaryAsString: Boolean,
+                 isInt96AsTimestamp: Boolean): DataType = {
     def correspondsToMap(groupType: ParquetGroupType): Boolean = {
       if (groupType.getFieldCount != 1 || groupType.getFields.apply(0).isPrimitive) {
         false
@@ -125,7 +129,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
     }
 
     if (parquetType.isPrimitive) {
-      toPrimitiveDataType(parquetType.asPrimitiveType, isBinaryAsString)
+      toPrimitiveDataType(parquetType.asPrimitiveType, isBinaryAsString, isInt96AsTimestamp)
     } else {
       val groupType = parquetType.asGroupType()
       parquetType.getOriginalType match {
@@ -137,9 +141,12 @@ private[parquet] object ParquetTypesConverter extends Logging {
           if (field.getName == CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME) {
             val bag = field.asGroupType()
             assert(bag.getFieldCount == 1)
-            ArrayType(toDataType(bag.getFields.apply(0), isBinaryAsString), containsNull = true)
+            ArrayType(
+              toDataType(bag.getFields.apply(0), isBinaryAsString, isInt96AsTimestamp),
+              containsNull = true)
           } else {
-            ArrayType(toDataType(field, isBinaryAsString), containsNull = false)
+            ArrayType(
+              toDataType(field, isBinaryAsString, isInt96AsTimestamp), containsNull = false)
           }
         }
         case ParquetOriginalType.MAP => {
@@ -152,8 +159,10 @@ private[parquet] object ParquetTypesConverter extends Logging {
             "Parquet Map type malformatted: nested group should have 2 (key, value) fields!")
           assert(keyValueGroup.getFields.apply(0).getRepetition == Repetition.REQUIRED)
 
-          val keyType = toDataType(keyValueGroup.getFields.apply(0), isBinaryAsString)
-          val valueType = toDataType(keyValueGroup.getFields.apply(1), isBinaryAsString)
+          val keyType =
+            toDataType(keyValueGroup.getFields.apply(0), isBinaryAsString, isInt96AsTimestamp)
+          val valueType =
+            toDataType(keyValueGroup.getFields.apply(1), isBinaryAsString, isInt96AsTimestamp)
           MapType(keyType, valueType,
             keyValueGroup.getFields.apply(1).getRepetition != Repetition.REQUIRED)
         }
@@ -163,8 +172,10 @@ private[parquet] object ParquetTypesConverter extends Logging {
             val keyValueGroup = groupType.getFields.apply(0).asGroupType()
             assert(keyValueGroup.getFields.apply(0).getRepetition == Repetition.REQUIRED)
 
-            val keyType = toDataType(keyValueGroup.getFields.apply(0), isBinaryAsString)
-            val valueType = toDataType(keyValueGroup.getFields.apply(1), isBinaryAsString)
+            val keyType =
+              toDataType(keyValueGroup.getFields.apply(0), isBinaryAsString, isInt96AsTimestamp)
+            val valueType =
+              toDataType(keyValueGroup.getFields.apply(1), isBinaryAsString, isInt96AsTimestamp)
             MapType(keyType, valueType,
               keyValueGroup.getFields.apply(1).getRepetition != Repetition.REQUIRED)
           } else if (correspondsToArray(groupType)) { // ArrayType
@@ -172,16 +183,19 @@ private[parquet] object ParquetTypesConverter extends Logging {
             if (field.getName == CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME) {
               val bag = field.asGroupType()
               assert(bag.getFieldCount == 1)
-              ArrayType(toDataType(bag.getFields.apply(0), isBinaryAsString), containsNull = true)
+              ArrayType(
+                toDataType(bag.getFields.apply(0), isBinaryAsString, isInt96AsTimestamp),
+                containsNull = true)
             } else {
-              ArrayType(toDataType(field, isBinaryAsString), containsNull = false)
+              ArrayType(
+                toDataType(field, isBinaryAsString, isInt96AsTimestamp), containsNull = false)
             }
           } else { // everything else: StructType
             val fields = groupType
               .getFields
               .map(ptype => new StructField(
               ptype.getName,
-              toDataType(ptype, isBinaryAsString),
+              toDataType(ptype, isBinaryAsString, isInt96AsTimestamp),
               ptype.getRepetition != Repetition.REQUIRED))
             StructType(fields)
           }
@@ -210,6 +224,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
     case ShortType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT32))
     case ByteType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT32))
     case LongType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT64))
+    case TimestampType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT96))
     case DecimalType.Fixed(precision, scale) if precision <= 18 =>
       // TODO: for now, our writer only supports decimals that fit in a Long
       Some(ParquetTypeInfo(ParquetPrimitiveTypeName.FIXED_LEN_BYTE_ARRAY,
@@ -345,7 +360,9 @@ private[parquet] object ParquetTypesConverter extends Logging {
     }
   }
 
-  def convertToAttributes(parquetSchema: ParquetType, isBinaryAsString: Boolean): Seq[Attribute] = {
+  def convertToAttributes(parquetSchema: ParquetType,
+                          isBinaryAsString: Boolean,
+                          isInt96AsTimestamp: Boolean): Seq[Attribute] = {
     parquetSchema
       .asGroupType()
       .getFields
@@ -353,7 +370,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
         field =>
           new AttributeReference(
             field.getName,
-            toDataType(field, isBinaryAsString),
+            toDataType(field, isBinaryAsString, isInt96AsTimestamp),
             field.getRepetition != Repetition.REQUIRED)())
   }
 
@@ -476,7 +493,8 @@ private[parquet] object ParquetTypesConverter extends Logging {
   def readSchemaFromFile(
       origPath: Path,
       conf: Option[Configuration],
-      isBinaryAsString: Boolean): Seq[Attribute] = {
+      isBinaryAsString: Boolean,
+      isInt96AsTimestamp: Boolean): Seq[Attribute] = {
     val keyValueMetadata: java.util.Map[String, String] =
       readMetaData(origPath, conf)
         .getFileMetaData
@@ -485,7 +503,9 @@ private[parquet] object ParquetTypesConverter extends Logging {
       convertFromString(keyValueMetadata.get(RowReadSupport.SPARK_METADATA_KEY))
     } else {
       val attributes = convertToAttributes(
-        readMetaData(origPath, conf).getFileMetaData.getSchema, isBinaryAsString)
+        readMetaData(origPath, conf).getFileMetaData.getSchema,
+        isBinaryAsString,
+        isInt96AsTimestamp)
       log.info(s"Falling back to schema conversion from Parquet types; result: $attributes")
       attributes
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 1e794cad73936bad949273324ebf925c2633bd65..179c0d6b2223985537c647fb97f2cf6a54b90f2a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -136,7 +136,8 @@ case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext)
     ParquetTypesConverter.readSchemaFromFile(
       partitions.head.files.head.getPath,
       Some(sparkContext.hadoopConfiguration),
-      sqlContext.conf.isParquetBinaryAsString))
+      sqlContext.conf.isParquetBinaryAsString,
+      sqlContext.conf.isParquetINT96AsTimestamp))
 
   val dataIncludesKey =
     partitionKeys.headOption.map(dataSchema.fieldNames.contains(_)).getOrElse(true)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/timestamp/NanoTime.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/timestamp/NanoTime.scala
new file mode 100644
index 0000000000000000000000000000000000000000..887161684429fbadef600fa87821c08fe5662786
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/timestamp/NanoTime.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parquet.timestamp
+
+import java.nio.{ByteBuffer, ByteOrder}
+
+import parquet.Preconditions
+import parquet.io.api.{Binary, RecordConsumer}
+
+private[parquet] class NanoTime extends Serializable {
+  private var julianDay = 0
+  private var timeOfDayNanos = 0L
+
+  def set(julianDay: Int, timeOfDayNanos: Long) = {
+    this.julianDay = julianDay
+    this.timeOfDayNanos = timeOfDayNanos
+    this
+  }
+
+  def getJulianDay: Int = julianDay
+
+  def getTimeOfDayNanos: Long = timeOfDayNanos
+
+  def toBinary: Binary = {
+    val buf = ByteBuffer.allocate(12)
+    buf.order(ByteOrder.LITTLE_ENDIAN)
+    buf.putLong(timeOfDayNanos)
+    buf.putInt(julianDay)
+    buf.flip()
+    Binary.fromByteBuffer(buf)
+  }
+
+  def writeValue(recordConsumer: RecordConsumer) {
+    recordConsumer.addBinary(toBinary)
+  }
+
+  override def toString =
+    "NanoTime{julianDay=" + julianDay + ", timeOfDayNanos=" + timeOfDayNanos + "}"
+}
+
+object NanoTime {
+  def fromBinary(bytes: Binary): NanoTime = {
+    Preconditions.checkArgument(bytes.length() == 12, "Must be 12 bytes")
+    val buf = bytes.toByteBuffer
+    buf.order(ByteOrder.LITTLE_ENDIAN)
+    val timeOfDayNanos = buf.getLong
+    val julianDay = buf.getInt
+    new NanoTime().set(julianDay, timeOfDayNanos)
+  }
+
+  def apply(julianDay: Int, timeOfDayNanos: Long): NanoTime = {
+    new NanoTime().set(julianDay, timeOfDayNanos)
+  }
+}