From 1646f89d967913ee1f231d9606f8502d13c25804 Mon Sep 17 00:00:00 2001
From: Daoyuan Wang <daoyuan.wang@intel.com>
Date: Mon, 2 Feb 2015 15:49:22 -0800
Subject: [PATCH] [SPARK-4508] [SQL] build native date type to conform behavior
 to Hive

Store daysSinceEpoch as an Int value (4 bytes) to represent DateType, instead of using java.sql.Date (8 bytes as a Long) in the Catalyst row. This ensures the same comparison behavior between Hive and Catalyst.
Subsumes #3381
I think there are already some tests in JavaSQLSuite, and for Python this will not affect Python's datetime class.
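
For illustration (a minimal sketch, not part of the patch itself), the new representation makes date comparison a plain Int comparison:

    import java.sql.Date
    import org.apache.spark.sql.types.DateUtils

    val d1 = DateUtils.fromJavaDate(Date.valueOf("1970-01-01"))  // 0 days since epoch
    val d2 = DateUtils.fromJavaDate(Date.valueOf("1970-01-02"))  // 1 day since epoch
    assert(d1 < d2)  // Int ordering matches Hive's date ordering
    assert(DateUtils.toJavaDate(d1) == Date.valueOf("1970-01-01"))  // lossless round trip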

Author: Daoyuan Wang <daoyuan.wang@intel.com>

Closes #3732 from adrian-wang/datenative and squashes the following commits:

0ed0fdc [Daoyuan Wang] fix test data
a2fdd4e [Daoyuan Wang] getDate
c37832b [Daoyuan Wang] row to catalyst
f0005b1 [Daoyuan Wang] add date in sql parser and java type conversion
024c9a6 [Daoyuan Wang] clean some import order
d6715fc [Daoyuan Wang] refactoring Date as Primitive Int internally
374abd5 [Daoyuan Wang] spark native date type support
---
 .../main/scala/org/apache/spark/sql/Row.scala |  2 +-
 .../spark/sql/catalyst/ScalaReflection.scala  |  9 +--
 .../apache/spark/sql/catalyst/SqlParser.scala |  2 +
 .../spark/sql/catalyst/expressions/Cast.scala | 53 +++++++---------
 .../expressions/codegen/CodeGenerator.scala   |  3 +
 .../sql/catalyst/expressions/literals.scala   |  2 +-
 .../apache/spark/sql/types/DateUtils.scala    | 60 +++++++++++++++++++
 .../apache/spark/sql/types/dataTypes.scala    | 12 ++--
 .../ExpressionEvaluationSuite.scala           | 28 ++++-----
 .../spark/sql/types/DataTypeSuite.scala       |  2 +-
 .../spark/sql/columnar/ColumnStats.scala      | 19 +-----
 .../spark/sql/columnar/ColumnType.scala       | 13 ++--
 .../spark/sql/execution/pythonUdfs.scala      |  4 +-
 .../org/apache/spark/sql/json/JsonRDD.scala   |  6 +-
 .../org/apache/spark/sql/SQLQuerySuite.scala  |  7 +++
 .../sql/ScalaReflectionRelationSuite.scala    |  3 +-
 .../spark/sql/columnar/ColumnStatsSuite.scala |  2 +-
 .../spark/sql/columnar/ColumnTypeSuite.scala  |  6 +-
 .../sql/columnar/ColumnarTestUtils.scala      |  6 +-
 .../org/apache/spark/sql/json/JsonSuite.scala |  7 ++-
 .../execution/HiveCompatibilitySuite.scala    |  1 +
 .../apache/spark/sql/hive/HiveContext.scala   |  4 +-
 .../spark/sql/hive/HiveInspectors.scala       | 11 +++-
 .../apache/spark/sql/hive/TableReader.scala   |  3 +-
 ...te cast-0-a7cd69b80c77a771a2c955db666be53d |  1 +
 ... test 1-0-bde89be08a12361073ff658fef768b7e |  1 +
 ... test 2-0-dc1b267f1d79d49e6675afe4fd2a34a5 |  1 +
 .../date_1-0-50131c0ba7b7a6b65c789a5a8497bada |  1 +
 ...date_1-1-23edf29bf7376c70d5ecf12720f4b1eb} |  0
 ...ate_1-10-df16364a220ff96a6ea1cd478cbc1d0b} |  0
 ...ate_1-11-d964bec7e5632091ab5cb6f6786dbbf9} |  0
 ...ate_1-12-480c5f024a28232b7857be327c992509} |  0
 ...ate_1-13-4c0ed7fcb75770d8790575b586bf14f4} |  0
 ...date_1-14-44fc74c1993062c0a9522199ff27fea} |  0
 ...ate_1-15-4855a66124b16d1d0d003235995ac06b} |  0
 ...ate_1-16-8bc190dba0f641840b5e1e198a14c55b} |  0
 ...ate_1-17-23edf29bf7376c70d5ecf12720f4b1eb} |  0
 ...date_1-2-4ebe3571c13a8b0c03096fbd972b7f1b} |  0
 ... date_1-3-26b5c291400dfde455b3c1b878b71d0} |  0
 ...date_1-4-df16364a220ff96a6ea1cd478cbc1d0b} |  0
 ...date_1-5-d964bec7e5632091ab5cb6f6786dbbf9} |  0
 ...date_1-6-559d01fb0b42c42f0c4927fa0f9deac4} |  0
 ...date_1-7-df16364a220ff96a6ea1cd478cbc1d0b} |  0
 ...date_1-8-d964bec7e5632091ab5cb6f6786dbbf9} |  0
 ...date_1-9-8306558e0eabe936ac33dabaaa17fea4} |  0
 .../spark/sql/hive/HiveInspectorSuite.scala   |  4 +-
 .../sql/hive/execution/HiveQuerySuite.scala   | 26 +++++++-
 .../org/apache/spark/sql/hive/Shim12.scala    |  2 +-
 .../org/apache/spark/sql/hive/Shim13.scala    |  2 +-
 49 files changed, 191 insertions(+), 112 deletions(-)
 create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/types/DateUtils.scala
 create mode 100644 sql/hive/src/test/resources/golden/Date cast-0-a7cd69b80c77a771a2c955db666be53d
 create mode 100644 sql/hive/src/test/resources/golden/Date comparison test 1-0-bde89be08a12361073ff658fef768b7e
 create mode 100644 sql/hive/src/test/resources/golden/Date comparison test 2-0-dc1b267f1d79d49e6675afe4fd2a34a5
 create mode 100644 sql/hive/src/test/resources/golden/date_1-0-50131c0ba7b7a6b65c789a5a8497bada
 rename sql/hive/src/test/resources/golden/{date_1-0-23edf29bf7376c70d5ecf12720f4b1eb => date_1-1-23edf29bf7376c70d5ecf12720f4b1eb} (100%)
 rename sql/hive/src/test/resources/golden/{date_1-3-df16364a220ff96a6ea1cd478cbc1d0b => date_1-10-df16364a220ff96a6ea1cd478cbc1d0b} (100%)
 rename sql/hive/src/test/resources/golden/{date_1-10-d964bec7e5632091ab5cb6f6786dbbf9 => date_1-11-d964bec7e5632091ab5cb6f6786dbbf9} (100%)
 rename sql/hive/src/test/resources/golden/{date_1-11-480c5f024a28232b7857be327c992509 => date_1-12-480c5f024a28232b7857be327c992509} (100%)
 rename sql/hive/src/test/resources/golden/{date_1-12-4c0ed7fcb75770d8790575b586bf14f4 => date_1-13-4c0ed7fcb75770d8790575b586bf14f4} (100%)
 rename sql/hive/src/test/resources/golden/{date_1-13-44fc74c1993062c0a9522199ff27fea => date_1-14-44fc74c1993062c0a9522199ff27fea} (100%)
 rename sql/hive/src/test/resources/golden/{date_1-14-4855a66124b16d1d0d003235995ac06b => date_1-15-4855a66124b16d1d0d003235995ac06b} (100%)
 rename sql/hive/src/test/resources/golden/{date_1-15-8bc190dba0f641840b5e1e198a14c55b => date_1-16-8bc190dba0f641840b5e1e198a14c55b} (100%)
 rename sql/hive/src/test/resources/golden/{date_1-1-4ebe3571c13a8b0c03096fbd972b7f1b => date_1-17-23edf29bf7376c70d5ecf12720f4b1eb} (100%)
 rename sql/hive/src/test/resources/golden/{date_1-16-23edf29bf7376c70d5ecf12720f4b1eb => date_1-2-4ebe3571c13a8b0c03096fbd972b7f1b} (100%)
 rename sql/hive/src/test/resources/golden/{date_1-2-abdce0c0d14d3fc7441b7c134b02f99a => date_1-3-26b5c291400dfde455b3c1b878b71d0} (100%)
 rename sql/hive/src/test/resources/golden/{date_1-6-df16364a220ff96a6ea1cd478cbc1d0b => date_1-4-df16364a220ff96a6ea1cd478cbc1d0b} (100%)
 rename sql/hive/src/test/resources/golden/{date_1-4-d964bec7e5632091ab5cb6f6786dbbf9 => date_1-5-d964bec7e5632091ab5cb6f6786dbbf9} (100%)
 rename sql/hive/src/test/resources/golden/{date_1-5-5e70fc74158fbfca38134174360de12d => date_1-6-559d01fb0b42c42f0c4927fa0f9deac4} (100%)
 rename sql/hive/src/test/resources/golden/{date_1-9-df16364a220ff96a6ea1cd478cbc1d0b => date_1-7-df16364a220ff96a6ea1cd478cbc1d0b} (100%)
 rename sql/hive/src/test/resources/golden/{date_1-7-d964bec7e5632091ab5cb6f6786dbbf9 => date_1-8-d964bec7e5632091ab5cb6f6786dbbf9} (100%)
 rename sql/hive/src/test/resources/golden/{date_1-8-1d5c58095cd52ea539d869f2ab1ab67d => date_1-9-8306558e0eabe936ac33dabaaa17fea4} (100%)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
index 41bb4f012f..3a70d25534 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql
 import scala.util.hashing.MurmurHash3
 
 import org.apache.spark.sql.catalyst.expressions.GenericRow
-
+import org.apache.spark.sql.types.DateUtils
 
 object Row {
   /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index e0db587efb..8e79e532ca 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -17,14 +17,13 @@
 
 package org.apache.spark.sql.catalyst
 
-import java.sql.{Date, Timestamp}
+import java.sql.Timestamp
 
 import org.apache.spark.util.Utils
 import org.apache.spark.sql.catalyst.expressions.{GenericRow, Attribute, AttributeReference, Row}
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.types._
 
-
 /**
  * A default version of ScalaReflection that uses the runtime universe.
  */
@@ -72,6 +71,7 @@ trait ScalaReflection {
         }.toArray)
     case (d: BigDecimal, _) => Decimal(d)
     case (d: java.math.BigDecimal, _) => Decimal(d)
+    case (d: java.sql.Date, _) => DateUtils.fromJavaDate(d)
     case (other, _) => other
   }
 
@@ -85,6 +85,7 @@ trait ScalaReflection {
     }
     case (r: Row, s: StructType) => convertRowToScala(r, s)
     case (d: Decimal, _: DecimalType) => d.toJavaBigDecimal
+    case (i: Int, DateType) => DateUtils.toJavaDate(i)
     case (other, _) => other
   }
 
@@ -159,7 +160,7 @@ trait ScalaReflection {
           valueDataType, valueContainsNull = valueNullable), nullable = true)
       case t if t <:< typeOf[String] => Schema(StringType, nullable = true)
       case t if t <:< typeOf[Timestamp] => Schema(TimestampType, nullable = true)
-      case t if t <:< typeOf[Date] => Schema(DateType, nullable = true)
+      case t if t <:< typeOf[java.sql.Date] => Schema(DateType, nullable = true)
       case t if t <:< typeOf[BigDecimal] => Schema(DecimalType.Unlimited, nullable = true)
       case t if t <:< typeOf[java.math.BigDecimal] => Schema(DecimalType.Unlimited, nullable = true)
       case t if t <:< typeOf[Decimal] => Schema(DecimalType.Unlimited, nullable = true)
@@ -191,7 +192,7 @@ trait ScalaReflection {
     case obj: LongType.JvmType => LongType
     case obj: FloatType.JvmType => FloatType
     case obj: DoubleType.JvmType => DoubleType
-    case obj: DateType.JvmType => DateType
+    case obj: java.sql.Date => DateType
     case obj: java.math.BigDecimal => DecimalType.Unlimited
     case obj: Decimal => DecimalType.Unlimited
     case obj: TimestampType.JvmType => TimestampType
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index 594a423146..2ce8be8e24 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -52,6 +52,7 @@ class SqlParser extends AbstractSparkSQLParser {
   protected val CAST = Keyword("CAST")
   protected val COALESCE = Keyword("COALESCE")
   protected val COUNT = Keyword("COUNT")
+  protected val DATE = Keyword("DATE")
   protected val DECIMAL = Keyword("DECIMAL")
   protected val DESC = Keyword("DESC")
   protected val DISTINCT = Keyword("DISTINCT")
@@ -383,6 +384,7 @@ class SqlParser extends AbstractSparkSQLParser {
     | DOUBLE ^^^ DoubleType
     | fixedDecimalType
     | DECIMAL ^^^ DecimalType.Unlimited
+    | DATE ^^^ DateType
     )
 
   protected lazy val fixedDecimalType: Parser[DataType] =
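
With the DATE keyword wired into the type parser, a cast to DATE now parses directly in SQL. A minimal usage sketch (assuming an existing SQLContext named sqlContext; testData is the table used by the test below):

    // DATE now resolves to DateType inside CAST(... AS DATE)
    sqlContext.sql("SELECT CAST('2015-01-28' AS DATE) FROM testData LIMIT 1")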
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
index ece5ee7361..b1bc858478 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -113,7 +113,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
   // UDFToString
   private[this] def castToString(from: DataType): Any => Any = from match {
     case BinaryType => buildCast[Array[Byte]](_, new String(_, "UTF-8"))
-    case DateType => buildCast[Date](_, dateToString)
+    case DateType => buildCast[Int](_, d => DateUtils.toString(d))
     case TimestampType => buildCast[Timestamp](_, timestampToString)
     case _ => buildCast[Any](_, _.toString)
   }
@@ -131,7 +131,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
       buildCast[Timestamp](_, t => t.getTime() != 0 || t.getNanos() != 0)
     case DateType =>
       // Hive would return null when cast from date to boolean
-      buildCast[Date](_, d => null)
+      buildCast[Int](_, d => null)
     case LongType =>
       buildCast[Long](_, _ != 0)
     case IntegerType =>
@@ -171,7 +171,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
     case ByteType =>
       buildCast[Byte](_, b => new Timestamp(b))
     case DateType =>
-      buildCast[Date](_, d => new Timestamp(d.getTime))
+      buildCast[Int](_, d => new Timestamp(DateUtils.toJavaDate(d).getTime))
     // TimestampWritable.decimalToTimestamp
     case DecimalType() =>
       buildCast[Decimal](_, d => decimalToTimestamp(d))
@@ -224,37 +224,24 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
     }
   }
 
-  // Converts Timestamp to string according to Hive TimestampWritable convention
-  private[this] def timestampToDateString(ts: Timestamp): String = {
-    Cast.threadLocalDateFormat.get.format(ts)
-  }
-
   // DateConverter
   private[this] def castToDate(from: DataType): Any => Any = from match {
     case StringType =>
       buildCast[String](_, s =>
-        try Date.valueOf(s) catch { case _: java.lang.IllegalArgumentException => null })
+        try DateUtils.fromJavaDate(Date.valueOf(s))
+        catch { case _: java.lang.IllegalArgumentException => null }
+      )
     case TimestampType =>
       // throw valid precision more than seconds, according to Hive.
       // Timestamp.nanos is in 0 to 999,999,999, no more than a second.
-      buildCast[Timestamp](_, t => new Date(Math.floor(t.getTime / 1000.0).toLong * 1000))
+      buildCast[Timestamp](_, t => DateUtils.millisToDays(t.getTime))
     // Hive throws this exception as a Semantic Exception
-    // It is never possible to compare result when hive return with exception, so we can return null
+    // It is never possible to compare result when hive return with exception,
+    // so we can return null
     // NULL is more reasonable here, since the query itself obeys the grammar.
     case _ => _ => null
   }
 
-  // Date cannot be cast to long, according to hive
-  private[this] def dateToLong(d: Date) = null
-
-  // Date cannot be cast to double, according to hive
-  private[this] def dateToDouble(d: Date) = null
-
-  // Converts Date to string according to Hive DateWritable convention
-  private[this] def dateToString(d: Date): String = {
-    Cast.threadLocalDateFormat.get.format(d)
-  }
-
   // LongConverter
   private[this] def castToLong(from: DataType): Any => Any = from match {
     case StringType =>
@@ -264,7 +251,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
     case BooleanType =>
       buildCast[Boolean](_, b => if (b) 1L else 0L)
     case DateType =>
-      buildCast[Date](_, d => dateToLong(d))
+      buildCast[Int](_, d => null)
     case TimestampType =>
       buildCast[Timestamp](_, t => timestampToLong(t))
     case x: NumericType =>
@@ -280,7 +267,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
     case BooleanType =>
       buildCast[Boolean](_, b => if (b) 1 else 0)
     case DateType =>
-      buildCast[Date](_, d => dateToLong(d))
+      buildCast[Int](_, d => null)
     case TimestampType =>
       buildCast[Timestamp](_, t => timestampToLong(t).toInt)
     case x: NumericType =>
@@ -296,7 +283,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
     case BooleanType =>
       buildCast[Boolean](_, b => if (b) 1.toShort else 0.toShort)
     case DateType =>
-      buildCast[Date](_, d => dateToLong(d))
+      buildCast[Int](_, d => null)
     case TimestampType =>
       buildCast[Timestamp](_, t => timestampToLong(t).toShort)
     case x: NumericType =>
@@ -312,7 +299,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
     case BooleanType =>
       buildCast[Boolean](_, b => if (b) 1.toByte else 0.toByte)
     case DateType =>
-      buildCast[Date](_, d => dateToLong(d))
+      buildCast[Int](_, d => null)
     case TimestampType =>
       buildCast[Timestamp](_, t => timestampToLong(t).toByte)
     case x: NumericType =>
@@ -342,7 +329,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
     case BooleanType =>
       buildCast[Boolean](_, b => changePrecision(if (b) Decimal(1) else Decimal(0), target))
     case DateType =>
-      buildCast[Date](_, d => null) // date can't cast to decimal in Hive
+      buildCast[Int](_, d => null) // date can't cast to decimal in Hive
     case TimestampType =>
       // Note that we lose precision here.
       buildCast[Timestamp](_, t => changePrecision(Decimal(timestampToDouble(t)), target))
@@ -367,7 +354,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
     case BooleanType =>
       buildCast[Boolean](_, b => if (b) 1d else 0d)
     case DateType =>
-      buildCast[Date](_, d => dateToDouble(d))
+      buildCast[Int](_, d => null)
     case TimestampType =>
       buildCast[Timestamp](_, t => timestampToDouble(t))
     case x: NumericType =>
@@ -383,7 +370,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
     case BooleanType =>
       buildCast[Boolean](_, b => if (b) 1f else 0f)
     case DateType =>
-      buildCast[Date](_, d => dateToDouble(d))
+      buildCast[Int](_, d => null)
     case TimestampType =>
       buildCast[Timestamp](_, t => timestampToDouble(t).toFloat)
     case x: NumericType =>
@@ -442,16 +429,16 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
 
 object Cast {
   // `SimpleDateFormat` is not thread-safe.
-  private[sql] val threadLocalDateFormat = new ThreadLocal[DateFormat] {
+  private[sql] val threadLocalTimestampFormat = new ThreadLocal[DateFormat] {
     override def initialValue() = {
-      new SimpleDateFormat("yyyy-MM-dd")
+      new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
     }
   }
 
   // `SimpleDateFormat` is not thread-safe.
-  private[sql] val threadLocalTimestampFormat = new ThreadLocal[DateFormat] {
+  private[sql] val threadLocalDateFormat = new ThreadLocal[DateFormat] {
     override def initialValue() = {
-      new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
+      new SimpleDateFormat("yyyy-MM-dd")
     }
   }
 }
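
To summarize the cast semantics this file now implements (an illustrative sketch, mirroring the test expectations further down): a date never casts to a numeric type, matching Hive, while string and timestamp casts route through DateUtils.

    import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
    import org.apache.spark.sql.types.{LongType, StringType}

    // Literal(...) converts a java.sql.Date into its Int days representation.
    val d = Literal(java.sql.Date.valueOf("1970-01-01"))
    Cast(d, LongType).eval()    // null: dates do not cast to numerics, per Hive
    Cast(d, StringType).eval()  // "1970-01-01", via DateUtils.toString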
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 4cae5c4718..1f80d84b74 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -246,6 +246,9 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Loggin
               new String(${eval.primitiveTerm}.asInstanceOf[Array[Byte]])
         """.children
 
+      case Cast(child @ DateType(), StringType) =>
+        child.castOrNull(c => q"org.apache.spark.sql.types.DateUtils.toString($c)", StringType)
+
       case Cast(child @ NumericType(), IntegerType) =>
         child.castOrNull(c => q"$c.toInt", IntegerType)
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
index 5b389aad7a..97bb96f48e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
@@ -35,7 +35,7 @@ object Literal {
     case d: java.math.BigDecimal => Literal(Decimal(d), DecimalType.Unlimited)
     case d: Decimal => Literal(d, DecimalType.Unlimited)
     case t: Timestamp => Literal(t, TimestampType)
-    case d: Date => Literal(d, DateType)
+    case d: Date => Literal(DateUtils.fromJavaDate(d), DateType)
     case a: Array[Byte] => Literal(a, BinaryType)
     case null => Literal(null, NullType)
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DateUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DateUtils.scala
new file mode 100644
index 0000000000..8a1a3b81b3
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DateUtils.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import java.sql.Date
+import java.util.{Calendar, TimeZone}
+
+import org.apache.spark.sql.catalyst.expressions.Cast
+
+/**
+ * helper function to convert between Int value of days since 1970-01-01 and java.sql.Date
+ */
+object DateUtils {
+  private val MILLIS_PER_DAY = 86400000
+
+  // Java TimeZone has no mention of thread safety. Use thread local instance to be safe.
+  private val LOCAL_TIMEZONE = new ThreadLocal[TimeZone] {
+    override protected def initialValue: TimeZone = {
+      Calendar.getInstance.getTimeZone
+    }
+  }
+
+  private def javaDateToDays(d: Date): Int = {
+    millisToDays(d.getTime)
+  }
+
+  def millisToDays(millisLocal: Long): Int = {
+    ((millisLocal + LOCAL_TIMEZONE.get().getOffset(millisLocal)) / MILLIS_PER_DAY).toInt
+  }
+
+  private def toMillisSinceEpoch(days: Int): Long = {
+    val millisUtc = days.toLong * MILLIS_PER_DAY
+    millisUtc - LOCAL_TIMEZONE.get().getOffset(millisUtc)
+  }
+
+  def fromJavaDate(date: java.sql.Date): Int = {
+    javaDateToDays(date)
+  }
+
+  def toJavaDate(daysSinceEpoch: Int): java.sql.Date = {
+    new java.sql.Date(toMillisSinceEpoch(daysSinceEpoch))
+  }
+
+  def toString(days: Int): String = Cast.threadLocalDateFormat.get.format(toJavaDate(days))
+}
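
A quick sanity check of these helpers (illustrative only; results are relative to the local time zone):

    import java.sql.{Date, Timestamp}
    import org.apache.spark.sql.types.DateUtils

    val days = DateUtils.fromJavaDate(Date.valueOf("2015-01-28"))
    assert(DateUtils.toJavaDate(days) == Date.valueOf("2015-01-28"))  // day-granularity round trip
    assert(DateUtils.toString(days) == "2015-01-28")
    // millisToDays shifts by the local zone offset first, so a timestamp just
    // before local midnight still falls on the same local day:
    assert(DateUtils.millisToDays(Timestamp.valueOf("1970-01-01 23:59:59").getTime) == 0)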
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala
index 6ab99aa388..8ca0769fac 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.types
 
-import java.sql.{Date, Timestamp}
+import java.sql.Timestamp
 
 import scala.math.Numeric.{FloatAsIfIntegral, DoubleAsIfIntegral}
 import scala.reflect.ClassTag
@@ -387,18 +387,16 @@ case object TimestampType extends NativeType {
  */
 @DeveloperApi
 case object DateType extends NativeType {
-  private[sql] type JvmType = Date
+  private[sql] type JvmType = Int
 
   @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
 
-  private[sql] val ordering = new Ordering[JvmType] {
-    def compare(x: Date, y: Date) = x.compareTo(y)
-  }
+  private[sql] val ordering = implicitly[Ordering[JvmType]]
 
   /**
-   * The default size of a value of the DateType is 8 bytes.
+   * The default size of a value of the DateType is 4 bytes.
    */
-  override def defaultSize: Int = 8
+  override def defaultSize: Int = 4
 }
 
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala
index 37e64adeea..25d1c105a0 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala
@@ -303,6 +303,7 @@ class ExpressionEvaluationSuite extends FunSuite {
 
     val sd = "1970-01-01"
     val d = Date.valueOf(sd)
+    val zts = sd + " 00:00:00"
     val sts = sd + " 00:00:02"
     val nts = sts + ".1"
     val ts = Timestamp.valueOf(nts)
@@ -319,14 +320,14 @@ class ExpressionEvaluationSuite extends FunSuite {
     checkEvaluation(Cast(Literal(1.toDouble) cast TimestampType, DoubleType), 1.toDouble)
 
     checkEvaluation(Cast(Literal(sd) cast DateType, StringType), sd)
-    checkEvaluation(Cast(Literal(d) cast StringType, DateType), d)
+    checkEvaluation(Cast(Literal(d) cast StringType, DateType), 0)
     checkEvaluation(Cast(Literal(nts) cast TimestampType, StringType), nts)
     checkEvaluation(Cast(Literal(ts) cast StringType, TimestampType), ts)
     // all convert to string type to check
     checkEvaluation(
       Cast(Cast(Literal(nts) cast TimestampType, DateType), StringType), sd)
     checkEvaluation(
-      Cast(Cast(Literal(ts) cast DateType, TimestampType), StringType), sts)
+      Cast(Cast(Literal(ts) cast DateType, TimestampType), StringType), zts)
 
     checkEvaluation(Cast("abdef" cast BinaryType, StringType), "abdef")
 
@@ -377,8 +378,8 @@ class ExpressionEvaluationSuite extends FunSuite {
   }
 
   test("date") {
-    val d1 = Date.valueOf("1970-01-01")
-    val d2 = Date.valueOf("1970-01-02")
+    val d1 = DateUtils.fromJavaDate(Date.valueOf("1970-01-01"))
+    val d2 = DateUtils.fromJavaDate(Date.valueOf("1970-01-02"))
     checkEvaluation(Literal(d1) < Literal(d2), true)
   }
 
@@ -459,22 +460,21 @@ class ExpressionEvaluationSuite extends FunSuite {
 
   test("date casting") {
     val d = Date.valueOf("1970-01-01")
-    checkEvaluation(Cast(d, ShortType), null)
-    checkEvaluation(Cast(d, IntegerType), null)
-    checkEvaluation(Cast(d, LongType), null)
-    checkEvaluation(Cast(d, FloatType), null)
-    checkEvaluation(Cast(d, DoubleType), null)
-    checkEvaluation(Cast(d, DecimalType.Unlimited), null)
-    checkEvaluation(Cast(d, DecimalType(10, 2)), null)
-    checkEvaluation(Cast(d, StringType), "1970-01-01")
-    checkEvaluation(Cast(Cast(d, TimestampType), StringType), "1970-01-01 00:00:00")
+    checkEvaluation(Cast(Literal(d), ShortType), null)
+    checkEvaluation(Cast(Literal(d), IntegerType), null)
+    checkEvaluation(Cast(Literal(d), LongType), null)
+    checkEvaluation(Cast(Literal(d), FloatType), null)
+    checkEvaluation(Cast(Literal(d), DoubleType), null)
+    checkEvaluation(Cast(Literal(d), DecimalType.Unlimited), null)
+    checkEvaluation(Cast(Literal(d), DecimalType(10, 2)), null)
+    checkEvaluation(Cast(Literal(d), StringType), "1970-01-01")
+    checkEvaluation(Cast(Cast(Literal(d), TimestampType), StringType), "1970-01-01 00:00:00")
   }
 
   test("timestamp casting") {
     val millis = 15 * 1000 + 2
     val seconds = millis * 1000 + 2
     val ts = new Timestamp(millis)
-    val ts1 = new Timestamp(15 * 1000)  // a timestamp without the milliseconds part
     val tss = new Timestamp(seconds)
     checkEvaluation(Cast(ts, ShortType), 15)
     checkEvaluation(Cast(ts, IntegerType), 15)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala
index c147be9f6b..7bcd6687d1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala
@@ -106,7 +106,7 @@ class DataTypeSuite extends FunSuite {
   checkDefaultSize(DoubleType, 8)
   checkDefaultSize(DecimalType(10, 5), 4096)
   checkDefaultSize(DecimalType.Unlimited, 4096)
-  checkDefaultSize(DateType, 8)
+  checkDefaultSize(DateType, 4)
   checkDefaultSize(TimestampType, 8)
   checkDefaultSize(StringType, 4096)
   checkDefaultSize(BinaryType, 4096)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
index 391b3dae5c..cad0667b46 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.columnar
 
-import java.sql.{Date, Timestamp}
+import java.sql.Timestamp
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.{AttributeMap, Attribute, AttributeReference}
@@ -215,22 +215,7 @@ private[sql] class StringColumnStats extends ColumnStats {
   def collectedStatistics = Row(lower, upper, nullCount, count, sizeInBytes)
 }
 
-private[sql] class DateColumnStats extends ColumnStats {
-  protected var upper: Date = null
-  protected var lower: Date = null
-
-  override def gatherStats(row: Row, ordinal: Int) {
-    super.gatherStats(row, ordinal)
-    if (!row.isNullAt(ordinal)) {
-      val value = row(ordinal).asInstanceOf[Date]
-      if (upper == null || value.compareTo(upper) > 0) upper = value
-      if (lower == null || value.compareTo(lower) < 0) lower = value
-      sizeInBytes += DATE.defaultSize
-    }
-  }
-
-  def collectedStatistics = Row(lower, upper, nullCount, count, sizeInBytes)
-}
+private[sql] class DateColumnStats extends IntColumnStats
 
 private[sql] class TimestampColumnStats extends ColumnStats {
   protected var upper: Timestamp = null
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
index fcf2faa091..db5bc0de36 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
@@ -335,21 +335,20 @@ private[sql] object STRING extends NativeColumnType(StringType, 7, 8) {
   }
 }
 
-private[sql] object DATE extends NativeColumnType(DateType, 8, 8) {
+private[sql] object DATE extends NativeColumnType(DateType, 8, 4) {
   override def extract(buffer: ByteBuffer) = {
-    val date = new Date(buffer.getLong())
-    date
+    buffer.getInt
   }
 
-  override def append(v: Date, buffer: ByteBuffer): Unit = {
-    buffer.putLong(v.getTime)
+  override def append(v: Int, buffer: ByteBuffer): Unit = {
+    buffer.putInt(v)
   }
 
   override def getField(row: Row, ordinal: Int) = {
-    row(ordinal).asInstanceOf[Date]
+    row(ordinal).asInstanceOf[Int]
   }
 
-  override def setField(row: MutableRow, ordinal: Int, value: Date): Unit = {
+  def setField(row: MutableRow, ordinal: Int, value: Int): Unit = {
     row(ordinal) = value
   }
 }
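
The DATE column codec now moves fixed-width 4-byte ints; a round-trip sketch (the days value is arbitrary):

    import java.nio.ByteBuffer

    val buffer = ByteBuffer.allocate(DATE.defaultSize)  // 4 bytes per date
    DATE.append(16443, buffer)                          // some days-since-epoch value
    buffer.rewind()
    assert(DATE.extract(buffer) == 16443)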
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
index b85021acc9..3a2f8d75da 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
@@ -135,6 +135,8 @@ object EvaluatePython {
 
     case (ud, udt: UserDefinedType[_]) => toJava(udt.serialize(ud), udt.sqlType)
 
+    case (date: Int, DateType) => DateUtils.toJavaDate(date)
+
     // Pyrolite can handle Timestamp and Decimal
     case (other, _) => other
   }
@@ -171,7 +173,7 @@ object EvaluatePython {
       }): Row
 
     case (c: java.util.Calendar, DateType) =>
-      new java.sql.Date(c.getTime().getTime())
+      DateUtils.fromJavaDate(new java.sql.Date(c.getTime().getTime()))
 
     case (c: java.util.Calendar, TimestampType) =>
       new java.sql.Timestamp(c.getTime().getTime())
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
index 9171939f7e..33ce71b51b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
@@ -377,10 +377,12 @@ private[sql] object JsonRDD extends Logging {
     }
   }
 
-  private def toDate(value: Any): Date = {
+  private def toDate(value: Any): Int = {
     value match {
       // only support string as date
-      case value: java.lang.String => new Date(DataTypeConversions.stringToTime(value).getTime)
+      case value: java.lang.String =>
+        DateUtils.millisToDays(DataTypeConversions.stringToTime(value).getTime)
+      case value: java.sql.Date => DateUtils.fromJavaDate(value)
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index d82c34316c..a7f6a50a04 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -296,6 +296,13 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
       mapData.collect().take(1).map(Row.fromTuple).toSeq)
   }
 
+  test("date row") {
+    checkAnswer(sql(
+      """select cast("2015-01-28" as date) from testData limit 1"""),
+      Row(java.sql.Date.valueOf("2015-01-28"))
+    )
+  }
+
   test("from follow multiple brackets") {
     checkAnswer(sql(
       "select key from ((select * from testData limit 1) union all (select * from testData limit 1)) x limit 1"),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala
index a015884bae..f26fcc0385 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala
@@ -83,7 +83,8 @@ class ScalaReflectionRelationSuite extends FunSuite {
 
     assert(sql("SELECT * FROM reflectData").collect().head ===
       Row("a", 1, 1L, 1.toFloat, 1.toDouble, 1.toShort, 1.toByte, true,
-        new java.math.BigDecimal(1), new Date(12345), new Timestamp(12345), Seq(1,2,3)))
+        new java.math.BigDecimal(1), new Date(70, 0, 1), // This is 1970-01-01
+        new Timestamp(12345), Seq(1,2,3)))
   }
 
   test("query case class RDD with nulls") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala
index be2b34de07..581fccf8ee 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala
@@ -30,7 +30,7 @@ class ColumnStatsSuite extends FunSuite {
   testColumnStats(classOf[FloatColumnStats], FLOAT, Row(Float.MaxValue, Float.MinValue, 0))
   testColumnStats(classOf[DoubleColumnStats], DOUBLE, Row(Double.MaxValue, Double.MinValue, 0))
   testColumnStats(classOf[StringColumnStats], STRING, Row(null, null, 0))
-  testColumnStats(classOf[DateColumnStats], DATE, Row(null, null, 0))
+  testColumnStats(classOf[DateColumnStats], DATE, Row(Int.MaxValue, Int.MinValue, 0))
   testColumnStats(classOf[TimestampColumnStats], TIMESTAMP, Row(null, null, 0))
 
   def testColumnStats[T <: NativeType, U <: ColumnStats](
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
index 87e608a885..9ce845912f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.columnar
 
 import java.nio.ByteBuffer
-import java.sql.{Date, Timestamp}
+import java.sql.Timestamp
 
 import org.scalatest.FunSuite
 
@@ -34,7 +34,7 @@ class ColumnTypeSuite extends FunSuite with Logging {
   test("defaultSize") {
     val checks = Map(
       INT -> 4, SHORT -> 2, LONG -> 8, BYTE -> 1, DOUBLE -> 8, FLOAT -> 4, BOOLEAN -> 1,
-      STRING -> 8, DATE -> 8, TIMESTAMP -> 12, BINARY -> 16, GENERIC -> 16)
+      STRING -> 8, DATE -> 4, TIMESTAMP -> 12, BINARY -> 16, GENERIC -> 16)
 
     checks.foreach { case (columnType, expectedSize) =>
       assertResult(expectedSize, s"Wrong defaultSize for $columnType") {
@@ -64,7 +64,7 @@ class ColumnTypeSuite extends FunSuite with Logging {
     checkActualSize(FLOAT,     Float.MaxValue,    4)
     checkActualSize(BOOLEAN,   true,              1)
     checkActualSize(STRING,    "hello",           4 + "hello".getBytes("utf-8").length)
-    checkActualSize(DATE,      new Date(0L),      8)
+    checkActualSize(DATE,      0,                 4)
     checkActualSize(TIMESTAMP, new Timestamp(0L), 12)
 
     val binary = Array.fill[Byte](4)(0: Byte)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala
index f941465fa3..60ed28cc97 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala
@@ -17,11 +17,11 @@
 
 package org.apache.spark.sql.columnar
 
+import java.sql.Timestamp
+
 import scala.collection.immutable.HashSet
 import scala.util.Random
 
-import java.sql.{Date, Timestamp}
-
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
 import org.apache.spark.sql.types.{DataType, NativeType}
@@ -50,7 +50,7 @@ object ColumnarTestUtils {
       case STRING    => Random.nextString(Random.nextInt(32))
       case BOOLEAN   => Random.nextBoolean()
       case BINARY    => randomBytes(Random.nextInt(32))
-      case DATE      => new Date(Random.nextLong())
+      case DATE      => Random.nextInt()
       case TIMESTAMP =>
         val timestamp = new Timestamp(Random.nextLong())
         timestamp.setNanos(Random.nextInt(999999999))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
index cb615388da..1396c6b724 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
@@ -67,14 +67,15 @@ class JsonSuite extends QueryTest {
     checkTypePromotion(Timestamp.valueOf(strTime), enforceCorrectType(strTime, TimestampType))
 
     val strDate = "2014-10-15"
-    checkTypePromotion(Date.valueOf(strDate), enforceCorrectType(strDate, DateType))
+    checkTypePromotion(
+      DateUtils.fromJavaDate(Date.valueOf(strDate)), enforceCorrectType(strDate, DateType))
 
     val ISO8601Time1 = "1970-01-01T01:00:01.0Z"
     checkTypePromotion(new Timestamp(3601000), enforceCorrectType(ISO8601Time1, TimestampType))
-    checkTypePromotion(new Date(3601000), enforceCorrectType(ISO8601Time1, DateType))
+    checkTypePromotion(DateUtils.millisToDays(3601000), enforceCorrectType(ISO8601Time1, DateType))
     val ISO8601Time2 = "1970-01-01T02:00:01-01:00"
     checkTypePromotion(new Timestamp(10801000), enforceCorrectType(ISO8601Time2, TimestampType))
-    checkTypePromotion(new Date(10801000), enforceCorrectType(ISO8601Time2, DateType))
+    checkTypePromotion(DateUtils.millisToDays(10801000), enforceCorrectType(ISO8601Time2, DateType))
   }
 
   test("Get compatible type") {
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index 0d934620ac..a6266f611c 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -357,6 +357,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     "database_drop",
     "database_location",
     "database_properties",
+    "date_1",
     "date_2",
     "date_3",
     "date_4",
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index b746942cb1..724bd28d4b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.hive
 
 import java.io.{BufferedReader, InputStreamReader, PrintStream}
-import java.sql.{Date, Timestamp}
+import java.sql.Timestamp
 
 import scala.collection.JavaConversions._
 import scala.language.implicitConversions
@@ -409,7 +409,7 @@ private object HiveContext {
           toHiveStructString((key, kType)) + ":" + toHiveStructString((value, vType))
       }.toSeq.sorted.mkString("{", ",", "}")
     case (null, _) => "NULL"
-    case (d: Date, DateType) => new DateWritable(d).toString
+    case (d: Int, DateType) => new DateWritable(d).toString
     case (t: Timestamp, TimestampType) => new TimestampWritable(t).toString
     case (bin: Array[Byte], BinaryType) => new String(bin, "UTF-8")
     case (decimal: java.math.BigDecimal, DecimalType()) =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index 82dba99900..4afa2e71d7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -267,7 +267,8 @@ private[hive] trait HiveInspectors {
       val temp = new Array[Byte](writable.getLength)
       System.arraycopy(writable.getBytes, 0, temp, 0, temp.length)
       temp
-    case poi: WritableConstantDateObjectInspector => poi.getWritableConstantValue.get()
+    case poi: WritableConstantDateObjectInspector =>
+      DateUtils.fromJavaDate(poi.getWritableConstantValue.get())
     case mi: StandardConstantMapObjectInspector =>
       // take the value from the map inspector object, rather than the input data
       mi.getWritableConstantValue.map { case (k, v) =>
@@ -304,7 +305,8 @@ private[hive] trait HiveInspectors {
         System.arraycopy(bw.getBytes(), 0, result, 0, bw.getLength())
         result
       case x: DateObjectInspector if x.preferWritable() =>
-        x.getPrimitiveWritableObject(data).get()
+        DateUtils.fromJavaDate(x.getPrimitiveWritableObject(data).get())
+      case x: DateObjectInspector => DateUtils.fromJavaDate(x.getPrimitiveJavaObject(data))
       // org.apache.hadoop.hive.serde2.io.TimestampWritable.set will reset current time object
       // if next timestamp is null, so Timestamp object is cloned
       case x: TimestampObjectInspector if x.preferWritable() =>
@@ -343,6 +345,9 @@ private[hive] trait HiveInspectors {
     case _: JavaHiveDecimalObjectInspector =>
       (o: Any) => HiveShim.createDecimal(o.asInstanceOf[Decimal].toJavaBigDecimal)
 
+    case _: JavaDateObjectInspector =>
+      (o: Any) => DateUtils.toJavaDate(o.asInstanceOf[Int])
+
     case soi: StandardStructObjectInspector =>
       val wrappers = soi.getAllStructFieldRefs.map(ref => wrapperFor(ref.getFieldObjectInspector))
       (o: Any) => {
@@ -426,7 +431,7 @@ private[hive] trait HiveInspectors {
       case _: BinaryObjectInspector if x.preferWritable() => HiveShim.getBinaryWritable(a)
       case _: BinaryObjectInspector => a.asInstanceOf[Array[Byte]]
       case _: DateObjectInspector if x.preferWritable() => HiveShim.getDateWritable(a)
-      case _: DateObjectInspector => a.asInstanceOf[java.sql.Date]
+      case _: DateObjectInspector => DateUtils.toJavaDate(a.asInstanceOf[Int])
       case _: TimestampObjectInspector if x.preferWritable() => HiveShim.getTimestampWritable(a)
       case _: TimestampObjectInspector => a.asInstanceOf[java.sql.Timestamp]
     }
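
The conversion discipline in this file is symmetric (a sketch of the two directions): values read from Hive inspectors become Int days, and values handed back to Hive become java.sql.Date again.

    import org.apache.spark.sql.types.DateUtils

    // Reading: Hive DateWritable / java.sql.Date -> Catalyst Int
    val catalystValue: Int = DateUtils.fromJavaDate(java.sql.Date.valueOf("2015-01-28"))
    // Writing: Catalyst Int -> java.sql.Date, e.g. for a JavaDateObjectInspector
    val hiveValue: java.sql.Date = DateUtils.toJavaDate(catalystValue)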
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index c368715f7c..effaa5a443 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -34,6 +34,7 @@ import org.apache.spark.SerializableWritable
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, RDD, UnionRDD}
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types.DateUtils
 
 /**
  * A trait for subclasses that handle table scans.
@@ -306,7 +307,7 @@ private[hive] object HadoopTableReader extends HiveInspectors {
             row.update(ordinal, oi.getPrimitiveJavaObject(value).clone())
         case oi: DateObjectInspector =>
           (value: Any, row: MutableRow, ordinal: Int) =>
-            row.update(ordinal, oi.getPrimitiveJavaObject(value))
+            row.update(ordinal, DateUtils.fromJavaDate(oi.getPrimitiveJavaObject(value)))
         case oi: BinaryObjectInspector =>
           (value: Any, row: MutableRow, ordinal: Int) =>
             row.update(ordinal, oi.getPrimitiveJavaObject(value))
diff --git a/sql/hive/src/test/resources/golden/Date cast-0-a7cd69b80c77a771a2c955db666be53d b/sql/hive/src/test/resources/golden/Date cast-0-a7cd69b80c77a771a2c955db666be53d
new file mode 100644
index 0000000000..98da82fa89
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/Date cast-0-a7cd69b80c77a771a2c955db666be53d	
@@ -0,0 +1 @@
+1970-01-01	1970-01-01	1969-12-31 16:00:00	1969-12-31 16:00:00	1970-01-01 00:00:00
diff --git a/sql/hive/src/test/resources/golden/Date comparison test 1-0-bde89be08a12361073ff658fef768b7e b/sql/hive/src/test/resources/golden/Date comparison test 1-0-bde89be08a12361073ff658fef768b7e
new file mode 100644
index 0000000000..27ba77ddaf
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/Date comparison test 1-0-bde89be08a12361073ff658fef768b7e	
@@ -0,0 +1 @@
+true
diff --git a/sql/hive/src/test/resources/golden/Date comparison test 2-0-dc1b267f1d79d49e6675afe4fd2a34a5 b/sql/hive/src/test/resources/golden/Date comparison test 2-0-dc1b267f1d79d49e6675afe4fd2a34a5
new file mode 100644
index 0000000000..27ba77ddaf
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/Date comparison test 2-0-dc1b267f1d79d49e6675afe4fd2a34a5	
@@ -0,0 +1 @@
+true
diff --git a/sql/hive/src/test/resources/golden/date_1-0-50131c0ba7b7a6b65c789a5a8497bada b/sql/hive/src/test/resources/golden/date_1-0-50131c0ba7b7a6b65c789a5a8497bada
new file mode 100644
index 0000000000..573541ac97
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/date_1-0-50131c0ba7b7a6b65c789a5a8497bada
@@ -0,0 +1 @@
+0
diff --git a/sql/hive/src/test/resources/golden/date_1-0-23edf29bf7376c70d5ecf12720f4b1eb b/sql/hive/src/test/resources/golden/date_1-1-23edf29bf7376c70d5ecf12720f4b1eb
similarity index 100%
rename from sql/hive/src/test/resources/golden/date_1-0-23edf29bf7376c70d5ecf12720f4b1eb
rename to sql/hive/src/test/resources/golden/date_1-1-23edf29bf7376c70d5ecf12720f4b1eb
diff --git a/sql/hive/src/test/resources/golden/date_1-3-df16364a220ff96a6ea1cd478cbc1d0b b/sql/hive/src/test/resources/golden/date_1-10-df16364a220ff96a6ea1cd478cbc1d0b
similarity index 100%
rename from sql/hive/src/test/resources/golden/date_1-3-df16364a220ff96a6ea1cd478cbc1d0b
rename to sql/hive/src/test/resources/golden/date_1-10-df16364a220ff96a6ea1cd478cbc1d0b
diff --git a/sql/hive/src/test/resources/golden/date_1-10-d964bec7e5632091ab5cb6f6786dbbf9 b/sql/hive/src/test/resources/golden/date_1-11-d964bec7e5632091ab5cb6f6786dbbf9
similarity index 100%
rename from sql/hive/src/test/resources/golden/date_1-10-d964bec7e5632091ab5cb6f6786dbbf9
rename to sql/hive/src/test/resources/golden/date_1-11-d964bec7e5632091ab5cb6f6786dbbf9
diff --git a/sql/hive/src/test/resources/golden/date_1-11-480c5f024a28232b7857be327c992509 b/sql/hive/src/test/resources/golden/date_1-12-480c5f024a28232b7857be327c992509
similarity index 100%
rename from sql/hive/src/test/resources/golden/date_1-11-480c5f024a28232b7857be327c992509
rename to sql/hive/src/test/resources/golden/date_1-12-480c5f024a28232b7857be327c992509
diff --git a/sql/hive/src/test/resources/golden/date_1-12-4c0ed7fcb75770d8790575b586bf14f4 b/sql/hive/src/test/resources/golden/date_1-13-4c0ed7fcb75770d8790575b586bf14f4
similarity index 100%
rename from sql/hive/src/test/resources/golden/date_1-12-4c0ed7fcb75770d8790575b586bf14f4
rename to sql/hive/src/test/resources/golden/date_1-13-4c0ed7fcb75770d8790575b586bf14f4
diff --git a/sql/hive/src/test/resources/golden/date_1-13-44fc74c1993062c0a9522199ff27fea b/sql/hive/src/test/resources/golden/date_1-14-44fc74c1993062c0a9522199ff27fea
similarity index 100%
rename from sql/hive/src/test/resources/golden/date_1-13-44fc74c1993062c0a9522199ff27fea
rename to sql/hive/src/test/resources/golden/date_1-14-44fc74c1993062c0a9522199ff27fea
diff --git a/sql/hive/src/test/resources/golden/date_1-14-4855a66124b16d1d0d003235995ac06b b/sql/hive/src/test/resources/golden/date_1-15-4855a66124b16d1d0d003235995ac06b
similarity index 100%
rename from sql/hive/src/test/resources/golden/date_1-14-4855a66124b16d1d0d003235995ac06b
rename to sql/hive/src/test/resources/golden/date_1-15-4855a66124b16d1d0d003235995ac06b
diff --git a/sql/hive/src/test/resources/golden/date_1-15-8bc190dba0f641840b5e1e198a14c55b b/sql/hive/src/test/resources/golden/date_1-16-8bc190dba0f641840b5e1e198a14c55b
similarity index 100%
rename from sql/hive/src/test/resources/golden/date_1-15-8bc190dba0f641840b5e1e198a14c55b
rename to sql/hive/src/test/resources/golden/date_1-16-8bc190dba0f641840b5e1e198a14c55b
diff --git a/sql/hive/src/test/resources/golden/date_1-1-4ebe3571c13a8b0c03096fbd972b7f1b b/sql/hive/src/test/resources/golden/date_1-17-23edf29bf7376c70d5ecf12720f4b1eb
similarity index 100%
rename from sql/hive/src/test/resources/golden/date_1-1-4ebe3571c13a8b0c03096fbd972b7f1b
rename to sql/hive/src/test/resources/golden/date_1-17-23edf29bf7376c70d5ecf12720f4b1eb
diff --git a/sql/hive/src/test/resources/golden/date_1-16-23edf29bf7376c70d5ecf12720f4b1eb b/sql/hive/src/test/resources/golden/date_1-2-4ebe3571c13a8b0c03096fbd972b7f1b
similarity index 100%
rename from sql/hive/src/test/resources/golden/date_1-16-23edf29bf7376c70d5ecf12720f4b1eb
rename to sql/hive/src/test/resources/golden/date_1-2-4ebe3571c13a8b0c03096fbd972b7f1b
diff --git a/sql/hive/src/test/resources/golden/date_1-2-abdce0c0d14d3fc7441b7c134b02f99a b/sql/hive/src/test/resources/golden/date_1-3-26b5c291400dfde455b3c1b878b71d0
similarity index 100%
rename from sql/hive/src/test/resources/golden/date_1-2-abdce0c0d14d3fc7441b7c134b02f99a
rename to sql/hive/src/test/resources/golden/date_1-3-26b5c291400dfde455b3c1b878b71d0
diff --git a/sql/hive/src/test/resources/golden/date_1-6-df16364a220ff96a6ea1cd478cbc1d0b b/sql/hive/src/test/resources/golden/date_1-4-df16364a220ff96a6ea1cd478cbc1d0b
similarity index 100%
rename from sql/hive/src/test/resources/golden/date_1-6-df16364a220ff96a6ea1cd478cbc1d0b
rename to sql/hive/src/test/resources/golden/date_1-4-df16364a220ff96a6ea1cd478cbc1d0b
diff --git a/sql/hive/src/test/resources/golden/date_1-4-d964bec7e5632091ab5cb6f6786dbbf9 b/sql/hive/src/test/resources/golden/date_1-5-d964bec7e5632091ab5cb6f6786dbbf9
similarity index 100%
rename from sql/hive/src/test/resources/golden/date_1-4-d964bec7e5632091ab5cb6f6786dbbf9
rename to sql/hive/src/test/resources/golden/date_1-5-d964bec7e5632091ab5cb6f6786dbbf9
diff --git a/sql/hive/src/test/resources/golden/date_1-5-5e70fc74158fbfca38134174360de12d b/sql/hive/src/test/resources/golden/date_1-6-559d01fb0b42c42f0c4927fa0f9deac4
similarity index 100%
rename from sql/hive/src/test/resources/golden/date_1-5-5e70fc74158fbfca38134174360de12d
rename to sql/hive/src/test/resources/golden/date_1-6-559d01fb0b42c42f0c4927fa0f9deac4
diff --git a/sql/hive/src/test/resources/golden/date_1-9-df16364a220ff96a6ea1cd478cbc1d0b b/sql/hive/src/test/resources/golden/date_1-7-df16364a220ff96a6ea1cd478cbc1d0b
similarity index 100%
rename from sql/hive/src/test/resources/golden/date_1-9-df16364a220ff96a6ea1cd478cbc1d0b
rename to sql/hive/src/test/resources/golden/date_1-7-df16364a220ff96a6ea1cd478cbc1d0b
diff --git a/sql/hive/src/test/resources/golden/date_1-7-d964bec7e5632091ab5cb6f6786dbbf9 b/sql/hive/src/test/resources/golden/date_1-8-d964bec7e5632091ab5cb6f6786dbbf9
similarity index 100%
rename from sql/hive/src/test/resources/golden/date_1-7-d964bec7e5632091ab5cb6f6786dbbf9
rename to sql/hive/src/test/resources/golden/date_1-8-d964bec7e5632091ab5cb6f6786dbbf9
diff --git a/sql/hive/src/test/resources/golden/date_1-8-1d5c58095cd52ea539d869f2ab1ab67d b/sql/hive/src/test/resources/golden/date_1-9-8306558e0eabe936ac33dabaaa17fea4
similarity index 100%
rename from sql/hive/src/test/resources/golden/date_1-8-1d5c58095cd52ea539d869f2ab1ab67d
rename to sql/hive/src/test/resources/golden/date_1-9-8306558e0eabe936ac33dabaaa17fea4
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveInspectorSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveInspectorSuite.scala
index 2d3ff68012..09bbd5c867 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveInspectorSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveInspectorSuite.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.hive
 
 import java.util
-import java.sql.Date
 import java.util.{Locale, TimeZone}
 
 import org.apache.hadoop.hive.ql.udf.UDAFPercentile
@@ -76,7 +75,7 @@ class HiveInspectorSuite extends FunSuite with HiveInspectors {
     Literal(0.asInstanceOf[Float]) ::
     Literal(0.asInstanceOf[Double]) ::
     Literal("0") ::
-    Literal(new Date(2014, 9, 23)) ::
+    Literal(new java.sql.Date(114, 8, 23)) ::
     Literal(Decimal(BigDecimal(123.123))) ::
     Literal(new java.sql.Timestamp(123123)) ::
     Literal(Array[Byte](1,2,3)) ::
@@ -143,7 +142,6 @@ class HiveInspectorSuite extends FunSuite with HiveInspectors {
       case (r1: Array[Byte], r2: Array[Byte])
         if r1 != null && r2 != null && r1.length == r2.length =>
         r1.zip(r2).map { case (b1, b2) => assert(b1 === b2) }
-      case (r1: Date, r2: Date) => assert(r1.compareTo(r2) === 0)
       case (r1, r2) => assert(r1 === r2)
     }
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 4c53b10ba9..4f67d1def6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -253,8 +253,30 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
 
   createQueryTest("Cast Timestamp to Timestamp in UDF",
     """
-       | SELECT DATEDIFF(CAST(value AS timestamp), CAST('2002-03-21 00:00:00' AS timestamp))
-       | FROM src LIMIT 1
+      | SELECT DATEDIFF(CAST(value AS timestamp), CAST('2002-03-21 00:00:00' AS timestamp))
+      | FROM src LIMIT 1
+    """.stripMargin)
+
+  createQueryTest("Date comparison test 1",
+    """
+      | SELECT
+      | CAST(CAST('1970-01-01 22:00:00' AS timestamp) AS date) ==
+      | CAST(CAST('1970-01-01 23:00:00' AS timestamp) AS date)
+      | FROM src LIMIT 1
+    """.stripMargin)
+
+  createQueryTest("Date comparison test 2",
+    "SELECT CAST(CAST(0 AS timestamp) AS date) > CAST(0 AS timestamp) FROM src LIMIT 1")
+
+  createQueryTest("Date cast",
+    """
+      | SELECT
+      | CAST(CAST(0 AS timestamp) AS date),
+      | CAST(CAST(CAST(0 AS timestamp) AS date) AS string),
+      | CAST(0 AS timestamp),
+      | CAST(CAST(0 AS timestamp) AS string),
+      | CAST(CAST(CAST('1970-01-01 23:00:00' AS timestamp) AS date) AS timestamp)
+      | FROM src LIMIT 1
     """.stripMargin)
 
   createQueryTest("Simple Average",
diff --git a/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala b/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala
index 254919e8f6..b5a0754ff6 100644
--- a/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala
+++ b/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala
@@ -160,7 +160,7 @@ private[hive] object HiveShim {
     if (value == null) null else new hadoopIo.BytesWritable(value.asInstanceOf[Array[Byte]])
 
   def getDateWritable(value: Any): hiveIo.DateWritable =
-    if (value == null) null else new hiveIo.DateWritable(value.asInstanceOf[java.sql.Date])
+    if (value == null) null else new hiveIo.DateWritable(value.asInstanceOf[Int])
 
   def getTimestampWritable(value: Any): hiveIo.TimestampWritable =
     if (value == null) {
diff --git a/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala b/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala
index 45ca59ae56..e4c1809c8b 100644
--- a/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala
+++ b/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala
@@ -263,7 +263,7 @@ private[hive] object HiveShim {
     }
 
   def getDateWritable(value: Any): hiveIo.DateWritable =
-    if (value == null) null else new hiveIo.DateWritable(value.asInstanceOf[java.sql.Date])
+    if (value == null) null else new hiveIo.DateWritable(value.asInstanceOf[Int])
 
   def getTimestampWritable(value: Any): hiveIo.TimestampWritable =
     if (value == null) {
-- 
GitLab