diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 063940cb9e2c3b58f1441c5a3c8c54dd56bc4d83..672620460c3c59bbe4e94d6213380d1bca8f5d7e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -31,6 +31,11 @@ import org.apache.spark.unsafe.types.UTF8String
  * precision.
  */
 object DateTimeUtils {
+
+  // we use Int and Long internally to represent [[DateType]] and [[TimestampType]]
+  type SQLDate = Int
+  type SQLTimestamp = Long
+
   // see http://stackoverflow.com/questions/466321/convert-unix-timestamp-to-julian
   final val JULIAN_DAY_OF_EPOCH = 2440587  // and .5
   final val SECONDS_PER_DAY = 60 * 60 * 24L
@@ -72,7 +77,7 @@ object DateTimeUtils {
   }
 
   // we should use the exact day as Int, for example, (year, month, day) -> day
-  def millisToDays(millisUtc: Long): Int = {
+  def millisToDays(millisUtc: Long): SQLDate = {
     // SPARK-6785: use Math.floor so negative number of days (dates before 1970)
     // will correctly work as input for function toJavaDate(Int)
     val millisLocal = millisUtc + threadLocalLocalTimeZone.get().getOffset(millisUtc)
@@ -80,16 +85,16 @@ object DateTimeUtils {
   }
 
   // reverse of millisToDays
-  def daysToMillis(days: Int): Long = {
+  def daysToMillis(days: SQLDate): Long = {
     val millisUtc = days.toLong * MILLIS_PER_DAY
     millisUtc - threadLocalLocalTimeZone.get().getOffset(millisUtc)
   }
 
-  def dateToString(days: Int): String =
+  def dateToString(days: SQLDate): String =
     threadLocalDateFormat.get.format(toJavaDate(days))
 
   // Converts Timestamp to string according to Hive TimestampWritable convention.
-  def timestampToString(us: Long): String = {
+  def timestampToString(us: SQLTimestamp): String = {
     val ts = toJavaTimestamp(us)
     val timestampString = ts.toString
     val formatted = threadLocalTimestampFormat.get.format(ts)
@@ -132,21 +137,21 @@ object DateTimeUtils {
   /**
    * Returns the number of days since epoch from from java.sql.Date.
    */
-  def fromJavaDate(date: Date): Int = {
+  def fromJavaDate(date: Date): SQLDate = {
     millisToDays(date.getTime)
   }
 
   /**
    * Returns a java.sql.Date from number of days since epoch.
    */
-  def toJavaDate(daysSinceEpoch: Int): Date = {
+  def toJavaDate(daysSinceEpoch: SQLDate): Date = {
     new Date(daysToMillis(daysSinceEpoch))
   }
 
   /**
    * Returns a java.sql.Timestamp from number of micros since epoch.
    */
-  def toJavaTimestamp(us: Long): Timestamp = {
+  def toJavaTimestamp(us: SQLTimestamp): Timestamp = {
     // setNanos() will overwrite the millisecond part, so the milliseconds should be
     // cut off at seconds
     var seconds = us / MICROS_PER_SECOND
@@ -164,7 +169,7 @@ object DateTimeUtils {
   /**
    * Returns the number of micros since epoch from java.sql.Timestamp.
    */
-  def fromJavaTimestamp(t: Timestamp): Long = {
+  def fromJavaTimestamp(t: Timestamp): SQLTimestamp = {
     if (t != null) {
       t.getTime() * 1000L + (t.getNanos().toLong / 1000) % 1000L
     } else {
@@ -176,7 +181,7 @@ object DateTimeUtils {
    * Returns the number of microseconds since epoch from Julian day
    * and nanoseconds in a day
    */
-  def fromJulianDay(day: Int, nanoseconds: Long): Long = {
+  def fromJulianDay(day: Int, nanoseconds: Long): SQLTimestamp = {
     // use Long to avoid rounding errors
     val seconds = (day - JULIAN_DAY_OF_EPOCH).toLong * SECONDS_PER_DAY - SECONDS_PER_DAY / 2
     seconds * MICROS_PER_SECOND + nanoseconds / 1000L
@@ -185,7 +190,7 @@ object DateTimeUtils {
   /**
    * Returns Julian day and nanoseconds in a day from the number of microseconds
    */
-  def toJulianDay(us: Long): (Int, Long) = {
+  def toJulianDay(us: SQLTimestamp): (Int, Long) = {
     val seconds = us / MICROS_PER_SECOND + SECONDS_PER_DAY / 2
     val day = seconds / SECONDS_PER_DAY + JULIAN_DAY_OF_EPOCH
     val secondsInDay = seconds % SECONDS_PER_DAY
@@ -219,7 +224,7 @@ object DateTimeUtils {
    * `T[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us]-[h]h:[m]m`
    * `T[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us]+[h]h:[m]m`
    */
-  def stringToTimestamp(s: UTF8String): Option[Long] = {
+  def stringToTimestamp(s: UTF8String): Option[SQLTimestamp] = {
     if (s == null) {
       return None
     }
@@ -355,7 +360,7 @@ object DateTimeUtils {
    * `yyyy-[m]m-[d]d *`
    * `yyyy-[m]m-[d]dT*`
    */
-  def stringToDate(s: UTF8String): Option[Int] = {
+  def stringToDate(s: UTF8String): Option[SQLDate] = {
     if (s == null) {
       return None
     }
@@ -394,7 +399,7 @@ object DateTimeUtils {
   /**
    * Returns the hour value of a given timestamp value. The timestamp is expressed in microseconds.
    */
-  def getHours(timestamp: Long): Int = {
+  def getHours(timestamp: SQLTimestamp): Int = {
     val localTs = (timestamp / 1000) + defaultTimeZone.getOffset(timestamp / 1000)
     ((localTs / 1000 / 3600) % 24).toInt
   }
@@ -403,7 +408,7 @@ object DateTimeUtils {
    * Returns the minute value of a given timestamp value. The timestamp is expressed in
    * microseconds.
    */
-  def getMinutes(timestamp: Long): Int = {
+  def getMinutes(timestamp: SQLTimestamp): Int = {
     val localTs = (timestamp / 1000) + defaultTimeZone.getOffset(timestamp / 1000)
     ((localTs / 1000 / 60) % 60).toInt
   }
@@ -412,7 +417,7 @@ object DateTimeUtils {
    * Returns the second value of a given timestamp value. The timestamp is expressed in
    * microseconds.
    */
-  def getSeconds(timestamp: Long): Int = {
+  def getSeconds(timestamp: SQLTimestamp): Int = {
     ((timestamp / 1000 / 1000) % 60).toInt
   }
 
@@ -447,7 +452,7 @@ object DateTimeUtils {
    * The calculation uses the fact that the period 1.1.2001 until 31.12.2400 is
    * equals to the period 1.1.1601 until 31.12.2000.
    */
-  private[this] def getYearAndDayInYear(daysSince1970: Int): (Int, Int) = {
+  private[this] def getYearAndDayInYear(daysSince1970: SQLDate): (Int, Int) = {
     // add the difference (in days) between 1.1.1970 and the artificial year 0 (-17999)
     val daysNormalized = daysSince1970 + toYearZero
     val numOfQuarterCenturies = daysNormalized / daysIn400Years
@@ -461,7 +466,7 @@ object DateTimeUtils {
    * Returns the 'day in year' value for the given date. The date is expressed in days
    * since 1.1.1970.
    */
-  def getDayInYear(date: Int): Int = {
+  def getDayInYear(date: SQLDate): Int = {
     getYearAndDayInYear(date)._2
   }
 
@@ -469,7 +474,7 @@ object DateTimeUtils {
    * Returns the year value for the given date. The date is expressed in days
    * since 1.1.1970.
    */
-  def getYear(date: Int): Int = {
+  def getYear(date: SQLDate): Int = {
     getYearAndDayInYear(date)._1
   }
 
@@ -477,7 +482,7 @@ object DateTimeUtils {
    * Returns the quarter for the given date. The date is expressed in days
    * since 1.1.1970.
    */
-  def getQuarter(date: Int): Int = {
+  def getQuarter(date: SQLDate): Int = {
     var (year, dayInYear) = getYearAndDayInYear(date)
     if (isLeapYear(year)) {
       dayInYear = dayInYear - 1
@@ -497,7 +502,7 @@ object DateTimeUtils {
    * Split date (expressed in days since 1.1.1970) into four fields:
    * year, month (Jan is Month 1), dayInMonth, daysToMonthEnd (0 if it's last day of month).
    */
-  def splitDate(date: Int): (Int, Int, Int, Int) = {
+  def splitDate(date: SQLDate): (Int, Int, Int, Int) = {
     var (year, dayInYear) = getYearAndDayInYear(date)
     val isLeap = isLeapYear(year)
     if (isLeap && dayInYear == 60) {
@@ -541,7 +546,7 @@ object DateTimeUtils {
    * Returns the month value for the given date. The date is expressed in days
    * since 1.1.1970. January is month 1.
    */
-  def getMonth(date: Int): Int = {
+  def getMonth(date: SQLDate): Int = {
     var (year, dayInYear) = getYearAndDayInYear(date)
     if (isLeapYear(year)) {
       if (dayInYear == 60) {
@@ -582,7 +587,7 @@ object DateTimeUtils {
    * Returns the 'day of month' value for the given date. The date is expressed in days
    * since 1.1.1970.
    */
-  def getDayOfMonth(date: Int): Int = {
+  def getDayOfMonth(date: SQLDate): Int = {
     var (year, dayInYear) = getYearAndDayInYear(date)
     if (isLeapYear(year)) {
       if (dayInYear == 60) {
@@ -628,7 +633,7 @@ object DateTimeUtils {
    * Returns the date value for the first day of the given month.
    * The month is expressed in months since year zero (17999 BC), starting from 0.
    */
-  private def firstDayOfMonth(absoluteMonth: Int): Int = {
+  private def firstDayOfMonth(absoluteMonth: Int): SQLDate = {
     val absoluteYear = absoluteMonth / 12
     var monthInYear = absoluteMonth - absoluteYear * 12
     var date = getDateFromYear(absoluteYear)
@@ -646,7 +651,7 @@ object DateTimeUtils {
    * Returns the date value for January 1 of the given year.
    * The year is expressed in years since year zero (17999 BC), starting from 0.
    */
-  private def getDateFromYear(absoluteYear: Int): Int = {
+  private def getDateFromYear(absoluteYear: Int): SQLDate = {
     val absoluteDays = (absoluteYear * 365 + absoluteYear / 400 - absoluteYear / 100
       + absoluteYear / 4)
     absoluteDays - toYearZero
@@ -656,7 +661,7 @@ object DateTimeUtils {
    * Add date and year-month interval.
    * Returns a date value, expressed in days since 1.1.1970.
    */
-  def dateAddMonths(days: Int, months: Int): Int = {
+  def dateAddMonths(days: SQLDate, months: Int): SQLDate = {
     val (year, monthInYear, dayOfMonth, daysToMonthEnd) = splitDate(days)
     val absoluteMonth = (year - YearZero) * 12 + monthInYear - 1 + months
     val nonNegativeMonth = if (absoluteMonth >= 0) absoluteMonth else 0
@@ -679,7 +684,7 @@ object DateTimeUtils {
    * Add timestamp and full interval.
    * Returns a timestamp value, expressed in microseconds since 1.1.1970 00:00:00.
    */
-  def timestampAddInterval(start: Long, months: Int, microseconds: Long): Long = {
+  def timestampAddInterval(start: SQLTimestamp, months: Int, microseconds: Long): SQLTimestamp = {
     val days = millisToDays(start / 1000L)
     val newDays = dateAddMonths(days, months)
     daysToMillis(newDays) * 1000L + start - daysToMillis(days) * 1000L + microseconds
@@ -695,7 +700,7 @@ object DateTimeUtils {
    * Otherwise, the difference is calculated based on 31 days per month, and rounding to
    * 8 digits.
    */
-  def monthsBetween(time1: Long, time2: Long): Double = {
+  def monthsBetween(time1: SQLTimestamp, time2: SQLTimestamp): Double = {
     val millis1 = time1 / 1000L
     val millis2 = time2 / 1000L
     val date1 = millisToDays(millis1)
@@ -740,7 +745,7 @@ object DateTimeUtils {
    * Returns the first date which is later than startDate and is of the given dayOfWeek.
    * dayOfWeek is an integer ranges in [0, 6], and 0 is Thu, 1 is Fri, etc,.
    */
-  def getNextDateForDayOfWeek(startDate: Int, dayOfWeek: Int): Int = {
+  def getNextDateForDayOfWeek(startDate: SQLDate, dayOfWeek: Int): SQLDate = {
     startDate + 1 + ((dayOfWeek - 1 - startDate) % 7 + 7) % 7
   }
 
@@ -748,7 +753,7 @@ object DateTimeUtils {
    * Returns last day of the month for the given date. The date is expressed in days
    * since 1.1.1970.
    */
-  def getLastDayOfMonth(date: Int): Int = {
+  def getLastDayOfMonth(date: SQLDate): SQLDate = {
     val (_, _, _, daysToMonthEnd) = splitDate(date)
     date + daysToMonthEnd
   }
@@ -761,7 +766,7 @@ object DateTimeUtils {
    * Returns the trunc date from original date and trunc level.
    * Trunc level should be generated using `parseTruncLevel()`, should only be 1 or 2.
    */
-  def truncDate(d: Int, level: Int): Int = {
+  def truncDate(d: SQLDate, level: Int): SQLDate = {
     if (level == TRUNC_TO_YEAR) {
       d - DateTimeUtils.getDayInYear(d) + 1
     } else if (level == TRUNC_TO_MONTH) {
@@ -792,7 +797,7 @@ object DateTimeUtils {
    * Returns a timestamp of given timezone from utc timestamp, with the same string
    * representation in their timezone.
    */
-  def fromUTCTime(time: Long, timeZone: String): Long = {
+  def fromUTCTime(time: SQLTimestamp, timeZone: String): SQLTimestamp = {
     val tz = TimeZone.getTimeZone(timeZone)
     val offset = tz.getOffset(time / 1000L)
     time + offset * 1000L
@@ -802,7 +807,7 @@ object DateTimeUtils {
    * Returns a utc timestamp from a given timestamp from a given timezone, with the same
    * string representation in their timezone.
    */
-  def toUTCTime(time: Long, timeZone: String): Long = {
+  def toUTCTime(time: SQLTimestamp, timeZone: String): SQLTimestamp = {
     val tz = TimeZone.getTimeZone(timeZone)
     val offset = tz.getOffset(time / 1000L)
     time - offset * 1000L