diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
index 0f2dcdcacf0cad74797d578d6a7358e7cbb9ab72..d9d7a3fea3963718964e932c273310936097fad6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.json
 
 import org.apache.spark.sql.catalyst.types.decimal.Decimal
+import org.apache.spark.sql.types.util.DataTypeConversions
 
 import scala.collection.Map
 import scala.collection.convert.Wrappers.{JMapWrapper, JListWrapper}
@@ -378,7 +379,7 @@ private[sql] object JsonRDD extends Logging {
   private def toDate(value: Any): Date = {
     value match {
       // only support string as date
-      case value: java.lang.String => Date.valueOf(value)
+      case value: java.lang.String => new Date(DataTypeConversions.stringToTime(value).getTime)
     }
   }
 
@@ -386,7 +387,7 @@ private[sql] object JsonRDD extends Logging {
     value match {
       case value: java.lang.Integer => new Timestamp(value.asInstanceOf[Int].toLong)
       case value: java.lang.Long => new Timestamp(value)
-      case value: java.lang.String => Timestamp.valueOf(value)
+      case value: java.lang.String => toTimestamp(DataTypeConversions.stringToTime(value).getTime)
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/types/util/DataTypeConversions.scala b/sql/core/src/main/scala/org/apache/spark/sql/types/util/DataTypeConversions.scala
index 9aad7b3df4eed75d1a675cb5872deaab34ae5a93..d4258156f18f6a8d36bb0462d007c1a2247a7040 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/types/util/DataTypeConversions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/types/util/DataTypeConversions.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.types.util
 
+import java.text.SimpleDateFormat
+
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql._
@@ -129,6 +131,34 @@ protected[sql] object DataTypeConversions {
       StructType(structType.getFields.map(asScalaStructField))
   }
 
+  def stringToTime(s: String): java.util.Date = {
+    if (!s.contains('T')) {
+      // JDBC escape string
+      if (s.contains(' ')) {
+        java.sql.Timestamp.valueOf(s)
+      } else {
+        java.sql.Date.valueOf(s)
+      }
+    } else if (s.endsWith("Z")) {
+      // this is zero timezone of ISO8601
+      stringToTime(s.substring(0, s.length - 1) + "GMT-00:00")
+    } else if (s.indexOf("GMT") == -1) {
+      // timezone with ISO8601
+      val inset = "+00.00".length
+      val s0 = s.substring(0, s.length - inset)
+      val s1 = s.substring(s.length - inset, s.length)
+      if (s0.substring(s0.lastIndexOf(':')).contains('.')) {
+        stringToTime(s0 + "GMT" + s1)
+      } else {
+        stringToTime(s0 + ".0GMT" + s1)
+      }
+    } else {
+      // ISO8601 with GMT insert
+      val ISO8601GMT: SimpleDateFormat = new SimpleDateFormat( "yyyy-MM-dd'T'HH:mm:ss.SSSz" )
+      ISO8601GMT.parse(s)
+    }
+  }
+
   /** Converts Java objects to catalyst rows / types */
   def convertJavaToCatalyst(a: Any, dataType: DataType): Any = (a, dataType) match {
     case (obj, udt: UserDefinedType[_]) => ScalaReflection.convertToCatalyst(obj, udt) // Scala type
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
index cade244f7ac395c54d9c74bc04bacd943ac06d31..f8ca2c773d9ab527a0fa70fff43087f8db8a86d3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
@@ -66,6 +66,13 @@ class JsonSuite extends QueryTest {
 
     val strDate = "2014-10-15"
     checkTypePromotion(Date.valueOf(strDate), enforceCorrectType(strDate, DateType))
+
+    val ISO8601Time1 = "1970-01-01T01:00:01.0Z"
+    checkTypePromotion(new Timestamp(3601000), enforceCorrectType(ISO8601Time1, TimestampType))
+    checkTypePromotion(new Date(3601000), enforceCorrectType(ISO8601Time1, DateType))
+    val ISO8601Time2 = "1970-01-01T02:00:01-01:00"
+    checkTypePromotion(new Timestamp(10801000), enforceCorrectType(ISO8601Time2, TimestampType))
+    checkTypePromotion(new Date(10801000), enforceCorrectType(ISO8601Time2, DateType))
   }
 
   test("Get compatible type") {