diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
index 0f27fd13e7379559aea12741ecb2f7cee3a2d888..fbc2965e61e9259ea2e686945022a5ba518307cf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.json
 import scala.collection.Map
 import scala.collection.convert.Wrappers.{JMapWrapper, JListWrapper}
 import scala.math.BigDecimal
+import java.sql.Timestamp
 
 import com.fasterxml.jackson.databind.ObjectMapper
 
@@ -361,6 +362,14 @@ private[sql] object JsonRDD extends Logging {
     }
   }
 
+  private def toTimestamp(value: Any): Timestamp = {
+    value match {
+        case value: java.lang.Integer => new Timestamp(value.asInstanceOf[Int].toLong)
+        case value: java.lang.Long => new Timestamp(value)
+        case value: java.lang.String => Timestamp.valueOf(value)
+      }
+    }  
+
   private[json] def enforceCorrectType(value: Any, desiredType: DataType): Any ={
     if (value == null) {
       null
@@ -377,6 +386,7 @@ private[sql] object JsonRDD extends Logging {
         case ArrayType(elementType, _) =>
           value.asInstanceOf[Seq[Any]].map(enforceCorrectType(_, elementType))
         case struct: StructType => asRow(value.asInstanceOf[Map[String, Any]], struct)
+        case TimestampType => toTimestamp(value)
       }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
index 685e788207725faf5e1abfa6b1422f3146d326c5..3cfcb2b1aa99322593a2f421bce374b9d932f05b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
@@ -23,6 +23,8 @@ import org.apache.spark.sql.json.JsonRDD.{enforceCorrectType, compatibleType}
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.test.TestSQLContext._
 
+import java.sql.Timestamp
+
 class JsonSuite extends QueryTest {
   import TestJsonData._
   TestJsonData
@@ -50,6 +52,12 @@ class JsonSuite extends QueryTest {
     val doubleNumber: Double = 1.7976931348623157E308d
     checkTypePromotion(doubleNumber.toDouble, enforceCorrectType(doubleNumber, DoubleType))
     checkTypePromotion(BigDecimal(doubleNumber), enforceCorrectType(doubleNumber, DecimalType))
+    
+    checkTypePromotion(new Timestamp(intNumber), enforceCorrectType(intNumber, TimestampType))
+    checkTypePromotion(new Timestamp(intNumber.toLong), 
+        enforceCorrectType(intNumber.toLong, TimestampType))
+    val strDate = "2014-09-30 12:34:56"
+    checkTypePromotion(Timestamp.valueOf(strDate), enforceCorrectType(strDate, TimestampType))
   }
 
   test("Get compatible type") {