From 9559ac5f74434cf4bf611bdcde9a216d39799826 Mon Sep 17 00:00:00 2001
From: Anatoliy Plastinin <anatoliy.plastinin@gmail.com>
Date: Mon, 11 Jan 2016 10:28:57 -0800
Subject: [PATCH] [SPARK-12744][SQL] Change parsing JSON integers to timestamps
 to treat integers as number of seconds

JIRA: https://issues.apache.org/jira/browse/SPARK-12744

This PR makes parsing JSON integers to timestamps consistent with casting behavior.

Author: Anatoliy Plastinin <anatoliy.plastinin@gmail.com>

Closes #10687 from antlypls/fix-json-timestamp-parsing.
---
 .../datasources/json/JacksonParser.scala        |  2 +-
 .../execution/datasources/json/JsonSuite.scala  | 17 +++++++++++++++--
 .../datasources/json/TestJsonData.scala         |  4 ++++
 3 files changed, 20 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
index 2e3fe3da15..b2f5c1e964 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
@@ -90,7 +90,7 @@ object JacksonParser {
         DateTimeUtils.stringToTime(parser.getText).getTime * 1000L
 
       case (VALUE_NUMBER_INT, TimestampType) =>
-        parser.getLongValue * 1000L
+        parser.getLongValue * 1000000L
 
       case (_, StringType) =>
         val writer = new ByteArrayOutputStream()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index b3b6b7df0c..4ab148065a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -83,9 +83,9 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     val doubleNumber: Double = 1.7976931348623157E308d
     checkTypePromotion(doubleNumber.toDouble, enforceCorrectType(doubleNumber, DoubleType))
 
-    checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(intNumber)),
+    checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(intNumber * 1000L)),
         enforceCorrectType(intNumber, TimestampType))
-    checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(intNumber.toLong)),
+    checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(intNumber.toLong * 1000L)),
         enforceCorrectType(intNumber.toLong, TimestampType))
     val strTime = "2014-09-30 12:34:56"
     checkTypePromotion(DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf(strTime)),
@@ -1465,4 +1465,17 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     }
   }
 
+  test("Casting long as timestamp") {
+    withTempTable("jsonTable") {
+      val schema = (new StructType).add("ts", TimestampType)
+      val jsonDF = sqlContext.read.schema(schema).json(timestampAsLong)
+
+      jsonDF.registerTempTable("jsonTable")
+
+      checkAnswer(
+        sql("select ts from jsonTable"),
+        Row(java.sql.Timestamp.valueOf("2016-01-02 03:04:05"))
+      )
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
index cb61f7eeca..a0836058d3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
@@ -205,6 +205,10 @@ private[json] trait TestJsonData {
         """{"b": [{"c": {}}]}""" ::
         """]""" :: Nil)
 
+  def timestampAsLong: RDD[String] =
+    sqlContext.sparkContext.parallelize(
+      """{"ts":1451732645}""" :: Nil)
+
   lazy val singleRow: RDD[String] = sqlContext.sparkContext.parallelize("""{"a":123}""" :: Nil)
 
   def empty: RDD[String] = sqlContext.sparkContext.parallelize(Seq[String]())
-- 
GitLab