From 2f493f7e3924b769160a16f73cccbebf21973b91 Mon Sep 17 00:00:00 2001
From: Davies Liu <davies@databricks.com>
Date: Tue, 25 Aug 2015 16:00:44 +0800
Subject: [PATCH] [SPARK-10177] [SQL] fix reading Timestamp in parquet from
 Hive

We misunderstood the Julian day and the nanoseconds of the day stored in Parquet (as TimestampType) by Hive/Impala: the two values overlap, so they cannot be added together directly.

To avoid confusing rounding during the conversion, we use `2440588` as the Julian Day of the Unix epoch (which is actually 2440587.5).
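
For reference, a minimal, self-contained sketch of the fixed conversion (the constants and arithmetic mirror `DateTimeUtils` after this patch; the object name and the sample microsecond value are ours, for illustration only):

```scala
// Sketch of the fixed Julian-day conversion, mirroring DateTimeUtils after
// this patch. With JULIAN_DAY_OF_EPOCH rounded up to 2440588, the day number
// and the nanoseconds within the day no longer overlap by half a day.
object JulianDaySketch {
  final val JULIAN_DAY_OF_EPOCH = 2440588  // 2440587.5 rounded up, matching Hive
  final val SECONDS_PER_DAY = 60 * 60 * 24L
  final val MICROS_PER_SECOND = 1000L * 1000L
  final val NANOS_PER_SECOND = MICROS_PER_SECOND * 1000L

  // Microseconds since the Unix epoch -> (Julian day, nanoseconds in the day).
  def toJulianDay(us: Long): (Int, Long) = {
    val seconds = us / MICROS_PER_SECOND
    val day = seconds / SECONDS_PER_DAY + JULIAN_DAY_OF_EPOCH
    val secondsInDay = seconds % SECONDS_PER_DAY
    val nanos = (us % MICROS_PER_SECOND) * 1000L
    (day.toInt, secondsInDay * NANOS_PER_SECOND + nanos)
  }

  // (Julian day, nanoseconds in the day) -> microseconds since the Unix epoch.
  def fromJulianDay(day: Int, nanoseconds: Long): Long = {
    val seconds = (day - JULIAN_DAY_OF_EPOCH).toLong * SECONDS_PER_DAY
    seconds * MICROS_PER_SECOND + nanoseconds / 1000L
  }

  def main(args: Array[String]): Unit = {
    val us = 1440462660000000L  // sample value: 2015-08-25 00:31:00 UTC in microseconds
    val (d, ns) = toJulianDay(us)
    assert(fromJulianDay(d, ns) == us)  // round trip, no half-day correction needed
    println(s"day=$d nanosInDay=$ns")
  }
}
```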

Author: Davies Liu <davies@databricks.com>
Author: Cheng Lian <lian@databricks.com>

Closes #8400 from davies/timestamp_parquet.
---
 .../spark/sql/catalyst/util/DateTimeUtils.scala     |  7 ++++---
 .../sql/catalyst/util/DateTimeUtilsSuite.scala      | 13 +++++++++----
 .../sql/hive/ParquetHiveCompatibilitySuite.scala    |  2 +-
 3 files changed, 14 insertions(+), 8 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 672620460c..d652fce3fd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -37,7 +37,8 @@ object DateTimeUtils {
   type SQLTimestamp = Long
 
   // see http://stackoverflow.com/questions/466321/convert-unix-timestamp-to-julian
-  final val JULIAN_DAY_OF_EPOCH = 2440587  // and .5
+  // it's 2440587.5, rounded up to be compatible with Hive
+  final val JULIAN_DAY_OF_EPOCH = 2440588
   final val SECONDS_PER_DAY = 60 * 60 * 24L
   final val MICROS_PER_SECOND = 1000L * 1000L
   final val NANOS_PER_SECOND = MICROS_PER_SECOND * 1000L
@@ -183,7 +184,7 @@ object DateTimeUtils {
    */
   def fromJulianDay(day: Int, nanoseconds: Long): SQLTimestamp = {
     // use Long to avoid rounding errors
-    val seconds = (day - JULIAN_DAY_OF_EPOCH).toLong * SECONDS_PER_DAY - SECONDS_PER_DAY / 2
+    val seconds = (day - JULIAN_DAY_OF_EPOCH).toLong * SECONDS_PER_DAY
     seconds * MICROS_PER_SECOND + nanoseconds / 1000L
   }
 
@@ -191,7 +192,7 @@ object DateTimeUtils {
    * Returns Julian day and nanoseconds in a day from the number of microseconds
    */
   def toJulianDay(us: SQLTimestamp): (Int, Long) = {
-    val seconds = us / MICROS_PER_SECOND + SECONDS_PER_DAY / 2
+    val seconds = us / MICROS_PER_SECOND
     val day = seconds / SECONDS_PER_DAY + JULIAN_DAY_OF_EPOCH
     val secondsInDay = seconds % SECONDS_PER_DAY
     val nanos = (us % MICROS_PER_SECOND) * 1000L
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
index d18fa4df13..1596bb79fa 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
@@ -49,13 +49,18 @@ class DateTimeUtilsSuite extends SparkFunSuite {
   test("us and julian day") {
     val (d, ns) = toJulianDay(0)
     assert(d === JULIAN_DAY_OF_EPOCH)
-    assert(ns === SECONDS_PER_DAY / 2 * NANOS_PER_SECOND)
+    assert(ns === 0)
     assert(fromJulianDay(d, ns) == 0L)
 
-    val t = new Timestamp(61394778610000L) // (2015, 6, 11, 10, 10, 10, 100)
+    val t = Timestamp.valueOf("2015-06-11 10:10:10.100")
     val (d1, ns1) = toJulianDay(fromJavaTimestamp(t))
-    val t2 = toJavaTimestamp(fromJulianDay(d1, ns1))
-    assert(t.equals(t2))
+    val t1 = toJavaTimestamp(fromJulianDay(d1, ns1))
+    assert(t.equals(t1))
+
+    val t2 = Timestamp.valueOf("2015-06-11 20:10:10.100")
+    val (d2, ns2) = toJulianDay(fromJavaTimestamp(t2))
+    val t22 = toJavaTimestamp(fromJulianDay(d2, ns2))
+    assert(t2.equals(t22))
   }
 
   test("SPARK-6785: java date conversion before and after epoch") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
index bc30180cf0..91d7a48208 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
@@ -113,7 +113,7 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with Before
       "BOOLEAN", "TINYINT", "SMALLINT", "INT", "BIGINT", "FLOAT", "DOUBLE", "STRING")
   }
 
-  ignore("SPARK-10177 timestamp") {
+  test("SPARK-10177 timestamp") {
     testParquetHiveCompatibility(Row(Timestamp.valueOf("2015-08-24 00:31:00")), "TIMESTAMP")
   }
 
-- 
GitLab