diff --git a/pom.xml b/pom.xml
index 6d4f717d4931b30fba0290da38373e0bde42fe09..80cacb5ace2d4343ac869fbc398deb0e0b4d9776 100644
--- a/pom.xml
+++ b/pom.xml
@@ -156,7 +156,6 @@
     <scala.binary.version>2.10</scala.binary.version>
     <jline.version>${scala.version}</jline.version>
     <jline.groupid>org.scala-lang</jline.groupid>
-    <jodd.version>3.6.3</jodd.version>
     <codehaus.jackson.version>1.9.13</codehaus.jackson.version>
     <fasterxml.jackson.version>2.4.4</fasterxml.jackson.version>
     <snappy.version>1.1.1.7</snappy.version>
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 015d0296dd369e067e4e32003dd8d6028147ac09..7a748fb5e38bd369bda309f1cd43f09f2129dbf8 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -54,7 +54,17 @@ object MimaExcludes {
             ProblemFilters.exclude[MissingMethodProblem](
               "org.apache.spark.streaming.kafka.KafkaTestUtils.waitUntilLeaderOffset"),
             // SQL execution is considered private.
-            excludePackage("org.apache.spark.sql.execution")
+            excludePackage("org.apache.spark.sql.execution"),
+            // NanoTime and CatalystTimestampConverter were only used internally by the
+            // Parquet support and are no longer needed.
+            ProblemFilters.exclude[MissingClassProblem](
+              "org.apache.spark.sql.parquet.timestamp.NanoTime"),
+            ProblemFilters.exclude[MissingClassProblem](
+              "org.apache.spark.sql.parquet.timestamp.NanoTime$"),
+            ProblemFilters.exclude[MissingClassProblem](
+              "org.apache.spark.sql.parquet.CatalystTimestampConverter"),
+            ProblemFilters.exclude[MissingClassProblem](
+              "org.apache.spark.sql.parquet.CatalystTimestampConverter$")
           )
         case v if v.startsWith("1.4") =>
           Seq(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
index 620e8de83a96cbae996e46c6a7127333e21d293e..429fc4077be9a888844aff16f3cf054d1f01c992 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
@@ -19,15 +19,15 @@ package org.apache.spark.sql.catalyst
 
 import java.lang.{Iterable => JavaIterable}
 import java.math.{BigDecimal => JavaBigDecimal}
-import java.sql.{Timestamp, Date}
+import java.sql.{Date, Timestamp}
 import java.util.{Map => JavaMap}
 import javax.annotation.Nullable
 
 import scala.collection.mutable.HashMap
 
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.util.DateUtils
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -272,18 +272,18 @@ object CatalystTypeConverters {
   }
 
   private object DateConverter extends CatalystTypeConverter[Date, Date, Any] {
-    override def toCatalystImpl(scalaValue: Date): Int = DateUtils.fromJavaDate(scalaValue)
+    override def toCatalystImpl(scalaValue: Date): Int = DateTimeUtils.fromJavaDate(scalaValue)
     override def toScala(catalystValue: Any): Date =
-      if (catalystValue == null) null else DateUtils.toJavaDate(catalystValue.asInstanceOf[Int])
+      if (catalystValue == null) null else DateTimeUtils.toJavaDate(catalystValue.asInstanceOf[Int])
     override def toScalaImpl(row: InternalRow, column: Int): Date = toScala(row.getInt(column))
   }
 
   private object TimestampConverter extends CatalystTypeConverter[Timestamp, Timestamp, Any] {
     override def toCatalystImpl(scalaValue: Timestamp): Long =
-      DateUtils.fromJavaTimestamp(scalaValue)
+      DateTimeUtils.fromJavaTimestamp(scalaValue)
     override def toScala(catalystValue: Any): Timestamp =
       if (catalystValue == null) null
-      else DateUtils.toJavaTimestamp(catalystValue.asInstanceOf[Long])
+      else DateTimeUtils.toJavaTimestamp(catalystValue.asInstanceOf[Long])
     override def toScalaImpl(row: InternalRow, column: Int): Timestamp =
       toScala(row.getLong(column))
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
index ad920f287820c680bd848c37c11f216b667f7c78..d271434a306dd262b486cc308a2b9d261ef03e76 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -24,7 +24,7 @@ import java.text.{DateFormat, SimpleDateFormat}
 import org.apache.spark.Logging
 import org.apache.spark.sql.catalyst
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode}
-import org.apache.spark.sql.catalyst.util.DateUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -115,9 +115,9 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
   // UDFToString
   private[this] def castToString(from: DataType): Any => Any = from match {
     case BinaryType => buildCast[Array[Byte]](_, UTF8String.fromBytes)
-    case DateType => buildCast[Int](_, d => UTF8String.fromString(DateUtils.toString(d)))
+    case DateType => buildCast[Int](_, d => UTF8String.fromString(DateTimeUtils.toString(d)))
     case TimestampType => buildCast[Long](_,
-      t => UTF8String.fromString(timestampToString(DateUtils.toJavaTimestamp(t))))
+      t => UTF8String.fromString(timestampToString(DateTimeUtils.toJavaTimestamp(t))))
     case _ => buildCast[Any](_, o => UTF8String.fromString(o.toString))
   }
 
@@ -162,7 +162,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
         if (periodIdx != -1 && n.length() - periodIdx > 9) {
           n = n.substring(0, periodIdx + 10)
         }
-        try DateUtils.fromJavaTimestamp(Timestamp.valueOf(n))
+        try DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf(n))
         catch { case _: java.lang.IllegalArgumentException => null }
       })
     case BooleanType =>
@@ -176,7 +176,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
     case ByteType =>
       buildCast[Byte](_, b => longToTimestamp(b.toLong))
     case DateType =>
-      buildCast[Int](_, d => DateUtils.toMillisSinceEpoch(d) * 10000)
+      buildCast[Int](_, d => DateTimeUtils.toMillisSinceEpoch(d) * 10000)
     // TimestampWritable.decimalToTimestamp
     case DecimalType() =>
       buildCast[Decimal](_, d => decimalToTimestamp(d))
@@ -225,13 +225,13 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
   private[this] def castToDate(from: DataType): Any => Any = from match {
     case StringType =>
       buildCast[UTF8String](_, s =>
-        try DateUtils.fromJavaDate(Date.valueOf(s.toString))
+        try DateTimeUtils.fromJavaDate(Date.valueOf(s.toString))
         catch { case _: java.lang.IllegalArgumentException => null }
       )
     case TimestampType =>
       // throw valid precision more than seconds, according to Hive.
       // Timestamp.nanos is in 0 to 999,999,999, no more than a second.
-      buildCast[Long](_, t => DateUtils.millisToDays(t / 10000L))
+      buildCast[Long](_, t => DateTimeUtils.millisToDays(t / 10000L))
     // Hive throws this exception as a Semantic Exception
     // It is never possible to compare result when hive return with exception,
     // so we can return null
@@ -442,7 +442,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
       case (DateType, StringType) =>
         defineCodeGen(ctx, ev, c =>
           s"""${ctx.stringType}.fromString(
-                org.apache.spark.sql.catalyst.util.DateUtils.toString($c))""")
+                org.apache.spark.sql.catalyst.util.DateTimeUtils.toString($c))""")
       // Special handling required for timestamps in hive test cases since the toString function
       // does not match the expected output.
       case (TimestampType, StringType) =>
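
Context for the casts above: internally a TimestampType value is a count of 100ns ticks since the epoch, which is why the casts multiply milliseconds by 10000 and divide tick values by 10000L before converting to days. A minimal sketch of that scaling (hypothetical helper names, not part of the patch):

```scala
// Illustrative only: the millisecond <-> 100ns-tick scaling used by the casts above.
object TickScalingSketch {
  val TicksPerMilli: Long = 10000L        // 1 ms = 10,000 * 100ns
  val TicksPerSecond: Long = 10000000L    // 1 s  = 10,000,000 * 100ns

  // DateType -> TimestampType: millis since epoch scaled up to 100ns ticks.
  def millisToTicks(millis: Long): Long = millis * TicksPerMilli

  // TimestampType -> DateType: ticks scaled back to millis before millisToDays.
  def ticksToMillis(ticks: Long): Long = ticks / TicksPerMilli
}
```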
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
index 6c86a47ba200c25481a217648744865827de96bd..479224af5627ab385afd051ea17c38aa7736e5b1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
@@ -22,7 +22,7 @@ import java.sql.{Date, Timestamp}
 import org.apache.spark.sql.catalyst
 import org.apache.spark.sql.catalyst.CatalystTypeConverters
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode}
-import org.apache.spark.sql.catalyst.util.DateUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -39,8 +39,8 @@ object Literal {
     case d: BigDecimal => Literal(Decimal(d), DecimalType.Unlimited)
     case d: java.math.BigDecimal => Literal(Decimal(d), DecimalType.Unlimited)
     case d: Decimal => Literal(d, DecimalType.Unlimited)
-    case t: Timestamp => Literal(DateUtils.fromJavaTimestamp(t), TimestampType)
-    case d: Date => Literal(DateUtils.fromJavaDate(d), DateType)
+    case t: Timestamp => Literal(DateTimeUtils.fromJavaTimestamp(t), TimestampType)
+    case d: Date => Literal(DateTimeUtils.fromJavaDate(d), DateType)
     case a: Array[Byte] => Literal(a, BinaryType)
     case null => Literal(null, NullType)
     case _ =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
similarity index 68%
rename from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateUtils.scala
rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 5cadc141af1df98e04ae6e00bef74c15a69a08c9..ff79884a44d006f5e96a1f6f812a6f85267d6f9e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -17,18 +17,28 @@
 
 package org.apache.spark.sql.catalyst.util
 
-import java.sql.{Timestamp, Date}
+import java.sql.{Date, Timestamp}
 import java.text.SimpleDateFormat
 import java.util.{Calendar, TimeZone}
 
 import org.apache.spark.sql.catalyst.expressions.Cast
 
 /**
- * Helper function to convert between Int value of days since 1970-01-01 and java.sql.Date
+ * Helper functions for converting between internal and external date and time representations.
+ * Dates are exposed externally as java.sql.Date and are represented internally as the number of
+ * days since the Unix epoch (1970-01-01). Timestamps are exposed externally as java.sql.Timestamp
+ * and are stored internally as longs, which are capable of storing timestamps with 100-nanosecond
+ * precision.
  */
-object DateUtils {
-  private val MILLIS_PER_DAY = 86400000
-  private val HUNDRED_NANOS_PER_SECOND = 10000000L
+object DateTimeUtils {
+  final val MILLIS_PER_DAY = SECONDS_PER_DAY * 1000L
+
+  // see http://stackoverflow.com/questions/466321/convert-unix-timestamp-to-julian
+  final val JULIAN_DAY_OF_EPOCH = 2440587  // the Unix epoch is Julian day 2440587.5
+  final val SECONDS_PER_DAY = 60 * 60 * 24L
+  final val HUNDRED_NANOS_PER_SECOND = 1000L * 1000L * 10L
+  final val NANOS_PER_SECOND = HUNDRED_NANOS_PER_SECOND * 100
+
 
   // Java TimeZone has no mention of thread safety. Use thread local instance to be safe.
   private val LOCAL_TIMEZONE = new ThreadLocal[TimeZone] {
@@ -117,4 +127,25 @@ object DateUtils {
       0L
     }
   }
+
+  /**
+   * Returns the number of 100ns (hundreds of nanoseconds) since the epoch, given a Julian day
+   * and the nanoseconds within that day.
+   */
+  def fromJulianDay(day: Int, nanoseconds: Long): Long = {
+    // use Long to avoid rounding errors
+    val seconds = (day - JULIAN_DAY_OF_EPOCH).toLong * SECONDS_PER_DAY - SECONDS_PER_DAY / 2
+    seconds * HUNDRED_NANOS_PER_SECOND + nanoseconds / 100L
+  }
+
+  /**
+   * Returns Julian day and nanoseconds in a day from the number of 100ns (hundreds of nanoseconds)
+   */
+  def toJulianDay(num100ns: Long): (Int, Long) = {
+    val seconds = num100ns / HUNDRED_NANOS_PER_SECOND + SECONDS_PER_DAY / 2
+    val day = seconds / SECONDS_PER_DAY + JULIAN_DAY_OF_EPOCH
+    val secondsInDay = seconds % SECONDS_PER_DAY
+    val nanos = (num100ns % HUNDRED_NANOS_PER_SECOND) * 100L
+    (day.toInt, secondsInDay * NANOS_PER_SECOND + nanos)
+  }
 }
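
A quick usage sketch of the new Julian-day helpers (assumes a REPL or test with DateTimeUtils on the classpath; the numbers follow directly from the constants above):

```scala
import org.apache.spark.sql.catalyst.util.DateTimeUtils

// The Unix epoch (0 in 100ns ticks) falls halfway into Julian day 2440587,
// because Julian days begin at noon.
val (day, nanos) = DateTimeUtils.toJulianDay(0L)
assert(day == DateTimeUtils.JULIAN_DAY_OF_EPOCH)
assert(nanos == DateTimeUtils.SECONDS_PER_DAY / 2 * DateTimeUtils.NANOS_PER_SECOND)

// Round-tripping through (julianDay, nanosOfDay) preserves the 100ns value.
val ticks = 1234567890123L
val (d, n) = DateTimeUtils.toJulianDay(ticks)
assert(DateTimeUtils.fromJulianDay(d, n) == ticks)
```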
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
index e407f6f166e8614eb5accf9fa42d44879c3c9ff7..f3809be722a84384b99492fe59e596a9e0088b73 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.expressions
 import java.sql.{Timestamp, Date}
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.util.DateUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types._
 
 /**
@@ -156,7 +156,7 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper {
     checkEvaluation(cast(cast(sd, DateType), StringType), sd)
     checkEvaluation(cast(cast(d, StringType), DateType), 0)
     checkEvaluation(cast(cast(nts, TimestampType), StringType), nts)
-    checkEvaluation(cast(cast(ts, StringType), TimestampType), DateUtils.fromJavaTimestamp(ts))
+    checkEvaluation(cast(cast(ts, StringType), TimestampType), DateTimeUtils.fromJavaTimestamp(ts))
 
     // all convert to string type to check
     checkEvaluation(cast(cast(cast(nts, TimestampType), DateType), StringType), sd)
@@ -301,9 +301,10 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper {
     checkEvaluation(cast(ts, LongType), 15.toLong)
     checkEvaluation(cast(ts, FloatType), 15.002f)
     checkEvaluation(cast(ts, DoubleType), 15.002)
-    checkEvaluation(cast(cast(tss, ShortType), TimestampType), DateUtils.fromJavaTimestamp(ts))
-    checkEvaluation(cast(cast(tss, IntegerType), TimestampType), DateUtils.fromJavaTimestamp(ts))
-    checkEvaluation(cast(cast(tss, LongType), TimestampType), DateUtils.fromJavaTimestamp(ts))
+    checkEvaluation(cast(cast(tss, ShortType), TimestampType), DateTimeUtils.fromJavaTimestamp(ts))
+    checkEvaluation(cast(cast(tss, IntegerType), TimestampType),
+      DateTimeUtils.fromJavaTimestamp(ts))
+    checkEvaluation(cast(cast(tss, LongType), TimestampType), DateTimeUtils.fromJavaTimestamp(ts))
     checkEvaluation(
       cast(cast(millis.toFloat / 1000, TimestampType), FloatType),
       millis.toFloat / 1000)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala
index b6261bfba07867aeba924a14fe8952e9ded00ab7..72fec3b86e5e487a945e0b20b73e45374c7d34e0 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala
@@ -23,7 +23,7 @@ import scala.collection.immutable.HashSet
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.util.DateUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types.{IntegerType, BooleanType}
 
 
@@ -167,8 +167,8 @@ class PredicateSuite extends SparkFunSuite with ExpressionEvalHelper {
     checkEvaluation(Literal(true) <=> Literal.create(null, BooleanType), false, row)
     checkEvaluation(Literal.create(null, BooleanType) <=> Literal(true), false, row)
 
-    val d1 = DateUtils.fromJavaDate(Date.valueOf("1970-01-01"))
-    val d2 = DateUtils.fromJavaDate(Date.valueOf("1970-01-02"))
+    val d1 = DateTimeUtils.fromJavaDate(Date.valueOf("1970-01-01"))
+    val d2 = DateTimeUtils.fromJavaDate(Date.valueOf("1970-01-02"))
     checkEvaluation(Literal(d1) < Literal(d2), true)
 
     val ts1 = new Timestamp(12)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala
index d8f3351d6dff6d2cc76c6b39cc791218f84631d3..c0675f4f4dff6a3b6d94a533c0a5cf0b99b8aac6 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala
@@ -23,7 +23,7 @@ import java.util.Arrays
 import org.scalatest.Matchers
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.util.DateUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.PlatformDependent
 import org.apache.spark.unsafe.array.ByteArrayMethods
@@ -83,8 +83,8 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
     val row = new SpecificMutableRow(fieldTypes)
     row.setLong(0, 0)
     row.setString(1, "Hello")
-    row.update(2, DateUtils.fromJavaDate(Date.valueOf("1970-01-01")))
-    row.update(3, DateUtils.fromJavaTimestamp(Timestamp.valueOf("2015-05-08 08:10:25")))
+    row.update(2, DateTimeUtils.fromJavaDate(Date.valueOf("1970-01-01")))
+    row.update(3, DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2015-05-08 08:10:25")))
 
     val sizeRequired: Int = converter.getSizeRequirement(row)
     sizeRequired should be (8 + (8 * 4) +
@@ -98,9 +98,9 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
     unsafeRow.getLong(0) should be (0)
     unsafeRow.getString(1) should be ("Hello")
     // Date is represented as Int in unsafeRow
-    DateUtils.toJavaDate(unsafeRow.getInt(2)) should be (Date.valueOf("1970-01-01"))
+    DateTimeUtils.toJavaDate(unsafeRow.getInt(2)) should be (Date.valueOf("1970-01-01"))
     // Timestamp is represented as Long in unsafeRow
-    DateUtils.toJavaTimestamp(unsafeRow.getLong(3)) should be
+    DateTimeUtils.toJavaTimestamp(unsafeRow.getLong(3)) should be
       (Timestamp.valueOf("2015-05-08 08:10:25"))
   }
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
similarity index 52%
rename from sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateUtilsSuite.scala
rename to sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
index 4d8fe4ac5e78f1c2db1477a9b67bd395116d7454..03eb64f097a370443cfd52efc571b74bb3f6ce18 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateUtilsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
@@ -21,19 +21,31 @@ import java.sql.Timestamp
 
 import org.apache.spark.SparkFunSuite
 
-class DateUtilsSuite extends SparkFunSuite {
+class DateTimeUtilsSuite extends SparkFunSuite {
 
-  test("timestamp") {
+  test("timestamp and 100ns") {
     val now = new Timestamp(System.currentTimeMillis())
     now.setNanos(100)
-    val ns = DateUtils.fromJavaTimestamp(now)
-    assert(ns % 10000000L == 1)
-    assert(DateUtils.toJavaTimestamp(ns) == now)
+    val ns = DateTimeUtils.fromJavaTimestamp(now)
+    assert(ns % 10000000L === 1)
+    assert(DateTimeUtils.toJavaTimestamp(ns) === now)
 
     List(-111111111111L, -1L, 0, 1L, 111111111111L).foreach { t =>
-      val ts = DateUtils.toJavaTimestamp(t)
-      assert(DateUtils.fromJavaTimestamp(ts) == t)
-      assert(DateUtils.toJavaTimestamp(DateUtils.fromJavaTimestamp(ts)) == ts)
+      val ts = DateTimeUtils.toJavaTimestamp(t)
+      assert(DateTimeUtils.fromJavaTimestamp(ts) === t)
+      assert(DateTimeUtils.toJavaTimestamp(DateTimeUtils.fromJavaTimestamp(ts)) === ts)
     }
   }
+
+  test("100ns and julian day") {
+    val (d, ns) = DateTimeUtils.toJulianDay(0)
+    assert(d === DateTimeUtils.JULIAN_DAY_OF_EPOCH)
+    assert(ns === DateTimeUtils.SECONDS_PER_DAY / 2 * DateTimeUtils.NANOS_PER_SECOND)
+    assert(DateTimeUtils.fromJulianDay(d, ns) === 0L)
+
+    val t = new Timestamp(61394778610000L) // (2015, 6, 11, 10, 10, 10, 100)
+    val (d1, ns1) = DateTimeUtils.toJulianDay(DateTimeUtils.fromJavaTimestamp(t))
+    val t2 = DateTimeUtils.toJavaTimestamp(DateTimeUtils.fromJulianDay(d1, ns1))
+    assert(t.equals(t2))
+  }
 }
diff --git a/sql/core/pom.xml b/sql/core/pom.xml
index ed75475a870678c5ef3b3251878a712760f5cf68..8fc16928adbd9da647d572aab6b0f8a0493a4203 100644
--- a/sql/core/pom.xml
+++ b/sql/core/pom.xml
@@ -73,11 +73,6 @@
       <artifactId>jackson-databind</artifactId>
       <version>${fasterxml.jackson.version}</version>
     </dependency>
-    <dependency>
-      <groupId>org.jodd</groupId>
-      <artifactId>jodd-core</artifactId>
-      <version>${jodd.version}</version>
-    </dependency>
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
index c8c67ce334002da8af4ab6ebe015fe1e214c7a33..6db551c543a9ca1b82f1c8180a17f5fe7664dc29 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.util.DateUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -148,8 +148,8 @@ object EvaluatePython {
 
     case (ud, udt: UserDefinedType[_]) => toJava(udt.serialize(ud), udt.sqlType)
 
-    case (date: Int, DateType) => DateUtils.toJavaDate(date)
-    case (t: Long, TimestampType) => DateUtils.toJavaTimestamp(t)
+    case (date: Int, DateType) => DateTimeUtils.toJavaDate(date)
+    case (t: Long, TimestampType) => DateTimeUtils.toJavaTimestamp(t)
     case (s: UTF8String, StringType) => s.toString
 
     // Pyrolite can handle Timestamp and Decimal
@@ -188,12 +188,12 @@ object EvaluatePython {
       }): Row
 
     case (c: java.util.Calendar, DateType) =>
-      DateUtils.fromJavaDate(new java.sql.Date(c.getTimeInMillis))
+      DateTimeUtils.fromJavaDate(new java.sql.Date(c.getTimeInMillis))
 
     case (c: java.util.Calendar, TimestampType) =>
       c.getTimeInMillis * 10000L
     case (t: java.sql.Timestamp, TimestampType) =>
-      DateUtils.fromJavaTimestamp(t)
+      DateTimeUtils.fromJavaTimestamp(t)
 
     case (_, udt: UserDefinedType[_]) =>
       fromJava(obj, udt.sqlType)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
index 226b143923df6debf8deee9f6e013e5a1a82fd5c..8b4276b2c364cc00611a02af85860e4afe7a3eec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
@@ -22,13 +22,13 @@ import java.util.Properties
 
 import org.apache.commons.lang3.StringUtils
 
-import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions.{InternalRow, SpecificMutableRow}
-import org.apache.spark.sql.catalyst.util.DateUtils
-import org.apache.spark.sql.types._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
 
 /**
  * Data corresponding to one partition of a JDBCRDD.
@@ -383,10 +383,10 @@ private[sql] class JDBCRDD(
           conversions(i) match {
             case BooleanConversion => mutableRow.setBoolean(i, rs.getBoolean(pos))
             case DateConversion =>
-              // DateUtils.fromJavaDate does not handle null value, so we need to check it.
+              // DateTimeUtils.fromJavaDate does not handle null value, so we need to check it.
               val dateVal = rs.getDate(pos)
               if (dateVal != null) {
-                mutableRow.setInt(i, DateUtils.fromJavaDate(dateVal))
+                mutableRow.setInt(i, DateTimeUtils.fromJavaDate(dateVal))
               } else {
                 mutableRow.update(i, null)
               }
@@ -421,7 +421,7 @@ private[sql] class JDBCRDD(
             case TimestampConversion =>
               val t = rs.getTimestamp(pos)
               if (t != null) {
-                mutableRow.setLong(i, DateUtils.fromJavaTimestamp(t))
+                mutableRow.setLong(i, DateTimeUtils.fromJavaTimestamp(t))
               } else {
                 mutableRow.update(i, null)
               }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala
index 817e8a20b34deee2ed9e6d62f4ccec5776d872a0..6222addc9aa3acfcd44aa4184fa44b0e37f3677d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala
@@ -25,7 +25,7 @@ import com.fasterxml.jackson.core._
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.util.DateUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.json.JacksonUtils.nextUntil
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -63,10 +63,10 @@ private[sql] object JacksonParser {
         null
 
       case (VALUE_STRING, DateType) =>
-        DateUtils.millisToDays(DateUtils.stringToTime(parser.getText).getTime)
+        DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(parser.getText).getTime)
 
       case (VALUE_STRING, TimestampType) =>
-        DateUtils.stringToTime(parser.getText).getTime * 10000L
+        DateTimeUtils.stringToTime(parser.getText).getTime * 10000L
 
       case (VALUE_NUMBER_INT, TimestampType) =>
         parser.getLongValue * 10000L
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
index 44594c5080ff436abd6badf5cf638503c97b86db..73d9520d6f53ff9c8adb283d48cb7cd594ab219f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
@@ -28,7 +28,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.util.DateUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -393,8 +393,8 @@ private[sql] object JsonRDD extends Logging {
     value match {
       // only support string as date
       case value: java.lang.String =>
-        DateUtils.millisToDays(DateUtils.stringToTime(value).getTime)
-      case value: java.sql.Date => DateUtils.fromJavaDate(value)
+        DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(value).getTime)
+      case value: java.sql.Date => DateTimeUtils.fromJavaDate(value)
     }
   }
 
@@ -402,7 +402,7 @@ private[sql] object JsonRDD extends Logging {
     value match {
       case value: java.lang.Integer => value.asInstanceOf[Int].toLong * 10000L
       case value: java.lang.Long => value * 10000L
-      case value: java.lang.String => DateUtils.stringToTime(value).getTime * 10000L
+      case value: java.lang.String => DateTimeUtils.stringToTime(value).getTime * 10000L
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
index 4da5e96b82e3df01a6c0d6f88f82406f18ecab02..cf7aa44e4cd5509f3949b38602e5eb99543c1385 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
@@ -17,21 +17,19 @@
 
 package org.apache.spark.sql.parquet
 
-import java.sql.Timestamp
-import java.util.{TimeZone, Calendar}
+import java.nio.ByteOrder
 
-import scala.collection.mutable.{Buffer, ArrayBuffer, HashMap}
+import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap}
 
-import jodd.datetime.JDateTime
+import org.apache.parquet.Preconditions
 import org.apache.parquet.column.Dictionary
-import org.apache.parquet.io.api.{PrimitiveConverter, GroupConverter, Binary, Converter}
+import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter}
 import org.apache.parquet.schema.MessageType
 
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.util.DateUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.parquet.CatalystConverter.FieldType
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.parquet.timestamp.NanoTime
 import org.apache.spark.unsafe.types.UTF8String
 
 /**
@@ -269,7 +267,12 @@ private[parquet] abstract class CatalystConverter extends GroupConverter {
    * Read a Timestamp value from a Parquet Int96Value
    */
   protected[parquet] def readTimestamp(value: Binary): Long = {
-    DateUtils.fromJavaTimestamp(CatalystTimestampConverter.convertToTimestamp(value))
+    Preconditions.checkArgument(value.length() == 12, "Must be 12 bytes")
+    val buf = value.toByteBuffer
+    buf.order(ByteOrder.LITTLE_ENDIAN)
+    val timeOfDayNanos = buf.getLong
+    val julianDay = buf.getInt
+    DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos)
   }
 }
 
@@ -498,73 +501,6 @@ private[parquet] object CatalystArrayConverter {
   val INITIAL_ARRAY_SIZE = 20
 }
 
-private[parquet] object CatalystTimestampConverter {
-  // TODO most part of this comes from Hive-0.14
-  // Hive code might have some issues, so we need to keep an eye on it.
-  // Also we use NanoTime and Int96Values from parquet-examples.
-  // We utilize jodd to convert between NanoTime and Timestamp
-  val parquetTsCalendar = new ThreadLocal[Calendar]
-  def getCalendar: Calendar = {
-    // this is a cache for the calendar instance.
-    if (parquetTsCalendar.get == null) {
-      parquetTsCalendar.set(Calendar.getInstance(TimeZone.getTimeZone("GMT")))
-    }
-    parquetTsCalendar.get
-  }
-  val NANOS_PER_SECOND: Long = 1000000000
-  val SECONDS_PER_MINUTE: Long = 60
-  val MINUTES_PER_HOUR: Long = 60
-  val NANOS_PER_MILLI: Long = 1000000
-
-  def convertToTimestamp(value: Binary): Timestamp = {
-    val nt = NanoTime.fromBinary(value)
-    val timeOfDayNanos = nt.getTimeOfDayNanos
-    val julianDay = nt.getJulianDay
-    val jDateTime = new JDateTime(julianDay.toDouble)
-    val calendar = getCalendar
-    calendar.set(Calendar.YEAR, jDateTime.getYear)
-    calendar.set(Calendar.MONTH, jDateTime.getMonth - 1)
-    calendar.set(Calendar.DAY_OF_MONTH, jDateTime.getDay)
-
-    // written in command style
-    var remainder = timeOfDayNanos
-    calendar.set(
-      Calendar.HOUR_OF_DAY,
-      (remainder / (NANOS_PER_SECOND * SECONDS_PER_MINUTE * MINUTES_PER_HOUR)).toInt)
-    remainder = remainder % (NANOS_PER_SECOND * SECONDS_PER_MINUTE * MINUTES_PER_HOUR)
-    calendar.set(
-      Calendar.MINUTE, (remainder / (NANOS_PER_SECOND * SECONDS_PER_MINUTE)).toInt)
-    remainder = remainder % (NANOS_PER_SECOND * SECONDS_PER_MINUTE)
-    calendar.set(Calendar.SECOND, (remainder / NANOS_PER_SECOND).toInt)
-    val nanos = remainder % NANOS_PER_SECOND
-    val ts = new Timestamp(calendar.getTimeInMillis)
-    ts.setNanos(nanos.toInt)
-    ts
-  }
-
-  def convertFromTimestamp(ts: Timestamp): Binary = {
-    val calendar = getCalendar
-    calendar.setTime(ts)
-    val jDateTime = new JDateTime(calendar.get(Calendar.YEAR),
-      calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH))
-    // Hive-0.14 didn't set hour before get day number, while the day number should
-    // has something to do with hour, since julian day number grows at 12h GMT
-    // here we just follow what hive does.
-    val julianDay = jDateTime.getJulianDayNumber
-
-    val hour = calendar.get(Calendar.HOUR_OF_DAY)
-    val minute = calendar.get(Calendar.MINUTE)
-    val second = calendar.get(Calendar.SECOND)
-    val nanos = ts.getNanos
-    // Hive-0.14 would use hours directly, that might be wrong, since the day starts
-    // from 12h in Julian. here we just follow what hive does.
-    val nanosOfDay = nanos + second * NANOS_PER_SECOND +
-      minute * NANOS_PER_SECOND * SECONDS_PER_MINUTE +
-      hour * NANOS_PER_SECOND * SECONDS_PER_MINUTE * MINUTES_PER_HOUR
-    NanoTime(julianDay, nanosOfDay).toBinary
-  }
-}
-
 /**
  * A `parquet.io.api.GroupConverter` that converts a single-element groups that
  * match the characteristics of an array (see
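
The rewritten `readTimestamp` above decodes Parquet's INT96 timestamp layout in place: 12 bytes, little-endian, the first 8 holding nanoseconds within the day and the last 4 the Julian day. A standalone sketch of that decoding step (plain byte array instead of a Parquet `Binary`; illustrative, not the converter itself):

```scala
import java.nio.{ByteBuffer, ByteOrder}

object Int96DecodeSketch {
  /** Decodes a 12-byte INT96 value into (julianDay, timeOfDayNanos). */
  def decodeInt96(bytes: Array[Byte]): (Int, Long) = {
    require(bytes.length == 12, "INT96 timestamps must be 12 bytes")
    val buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN)
    val timeOfDayNanos = buf.getLong   // first 8 bytes
    val julianDay = buf.getInt         // last 4 bytes
    (julianDay, timeOfDayNanos)
  }
  // The converter then maps this to the internal 100ns value with
  // DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos).
}
```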
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
index a8775a2a8fd836bdffe7adc81d4b0f10d6b8a353..e65fa0030e179a8391eb50d51914c7cc25424e77 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.parquet
 
+import java.nio.{ByteBuffer, ByteOrder}
 import java.util.{HashMap => JHashMap}
 
 import org.apache.hadoop.conf.Configuration
@@ -29,7 +30,7 @@ import org.apache.parquet.schema.MessageType
 
 import org.apache.spark.Logging
 import org.apache.spark.sql.catalyst.expressions.{Attribute, InternalRow}
-import org.apache.spark.sql.catalyst.util.DateUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -298,7 +299,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[InternalRow] with Lo
   }
 
   // Scratch array used to write decimals as fixed-length binary
-  private val scratchBytes = new Array[Byte](8)
+  private[this] val scratchBytes = new Array[Byte](8)
 
   private[parquet] def writeDecimal(decimal: Decimal, precision: Int): Unit = {
     val numBytes = ParquetTypesConverter.BYTES_FOR_PRECISION(precision)
@@ -313,10 +314,16 @@ private[parquet] class RowWriteSupport extends WriteSupport[InternalRow] with Lo
     writer.addBinary(Binary.fromByteArray(scratchBytes, 0, numBytes))
   }
 
+  // array used to write Timestamp as Int96 (fixed-length binary)
+  private[this] val int96buf = new Array[Byte](12)
+
   private[parquet] def writeTimestamp(ts: Long): Unit = {
-    val binaryNanoTime = CatalystTimestampConverter.convertFromTimestamp(
-      DateUtils.toJavaTimestamp(ts))
-    writer.addBinary(binaryNanoTime)
+    val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(ts)
+    val buf = ByteBuffer.wrap(int96buf)
+    buf.order(ByteOrder.LITTLE_ENDIAN)
+    buf.putLong(timeOfDayNanos)
+    buf.putInt(julianDay)
+    writer.addBinary(Binary.fromByteArray(int96buf))
   }
 }
 
@@ -360,7 +367,7 @@ private[parquet] class MutableRowWriteSupport extends RowWriteSupport {
       case FloatType => writer.addFloat(record.getFloat(index))
       case BooleanType => writer.addBoolean(record.getBoolean(index))
       case DateType => writer.addInteger(record.getInt(index))
-      case TimestampType => writeTimestamp(record(index).asInstanceOf[Long])
+      case TimestampType => writeTimestamp(record.getLong(index))
       case d: DecimalType =>
         if (d.precisionInfo == None || d.precisionInfo.get.precision > 18) {
           sys.error(s"Unsupported datatype $d, cannot write to consumer")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/timestamp/NanoTime.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/timestamp/NanoTime.scala
deleted file mode 100644
index 4d5ed211ad0c0f460007ec3a0c5ed603b7e914da..0000000000000000000000000000000000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/timestamp/NanoTime.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.parquet.timestamp
-
-import java.nio.{ByteBuffer, ByteOrder}
-
-import org.apache.parquet.Preconditions
-import org.apache.parquet.io.api.{Binary, RecordConsumer}
-
-private[parquet] class NanoTime extends Serializable {
-  private var julianDay = 0
-  private var timeOfDayNanos = 0L
-
-  def set(julianDay: Int, timeOfDayNanos: Long): this.type = {
-    this.julianDay = julianDay
-    this.timeOfDayNanos = timeOfDayNanos
-    this
-  }
-
-  def getJulianDay: Int = julianDay
-
-  def getTimeOfDayNanos: Long = timeOfDayNanos
-
-  def toBinary: Binary = {
-    val buf = ByteBuffer.allocate(12)
-    buf.order(ByteOrder.LITTLE_ENDIAN)
-    buf.putLong(timeOfDayNanos)
-    buf.putInt(julianDay)
-    buf.flip()
-    Binary.fromByteBuffer(buf)
-  }
-
-  def writeValue(recordConsumer: RecordConsumer): Unit = {
-    recordConsumer.addBinary(toBinary)
-  }
-
-  override def toString: String =
-    "NanoTime{julianDay=" + julianDay + ", timeOfDayNanos=" + timeOfDayNanos + "}"
-}
-
-private[sql] object NanoTime {
-  def fromBinary(bytes: Binary): NanoTime = {
-    Preconditions.checkArgument(bytes.length() == 12, "Must be 12 bytes")
-    val buf = bytes.toByteBuffer
-    buf.order(ByteOrder.LITTLE_ENDIAN)
-    val timeOfDayNanos = buf.getLong
-    val julianDay = buf.getInt
-    new NanoTime().set(julianDay, timeOfDayNanos)
-  }
-
-  def apply(julianDay: Int, timeOfDayNanos: Long): NanoTime = {
-    new NanoTime().set(julianDay, timeOfDayNanos)
-  }
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
index c32d9f88dd6ee0bd1900c32504174b11a169ab90..8204a584179bbba205664242979b9e9394d32344 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
@@ -25,7 +25,7 @@ import org.scalactic.Tolerance._
 
 import org.apache.spark.sql.{QueryTest, Row, SQLConf}
 import org.apache.spark.sql.TestData._
-import org.apache.spark.sql.catalyst.util.DateUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.json.InferSchema.compatibleType
 import org.apache.spark.sql.sources.LogicalRelation
 import org.apache.spark.sql.types._
@@ -76,26 +76,28 @@ class JsonSuite extends QueryTest with TestJsonData {
     checkTypePromotion(
       Decimal(doubleNumber), enforceCorrectType(doubleNumber, DecimalType.Unlimited))
 
-    checkTypePromotion(DateUtils.fromJavaTimestamp(new Timestamp(intNumber)),
+    checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(intNumber)),
         enforceCorrectType(intNumber, TimestampType))
-    checkTypePromotion(DateUtils.fromJavaTimestamp(new Timestamp(intNumber.toLong)),
+    checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(intNumber.toLong)),
         enforceCorrectType(intNumber.toLong, TimestampType))
     val strTime = "2014-09-30 12:34:56"
-    checkTypePromotion(DateUtils.fromJavaTimestamp(Timestamp.valueOf(strTime)),
+    checkTypePromotion(DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf(strTime)),
         enforceCorrectType(strTime, TimestampType))
 
     val strDate = "2014-10-15"
     checkTypePromotion(
-      DateUtils.fromJavaDate(Date.valueOf(strDate)), enforceCorrectType(strDate, DateType))
+      DateTimeUtils.fromJavaDate(Date.valueOf(strDate)), enforceCorrectType(strDate, DateType))
 
     val ISO8601Time1 = "1970-01-01T01:00:01.0Z"
-    checkTypePromotion(DateUtils.fromJavaTimestamp(new Timestamp(3601000)),
+    checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(3601000)),
         enforceCorrectType(ISO8601Time1, TimestampType))
-    checkTypePromotion(DateUtils.millisToDays(3601000), enforceCorrectType(ISO8601Time1, DateType))
+    checkTypePromotion(DateTimeUtils.millisToDays(3601000),
+      enforceCorrectType(ISO8601Time1, DateType))
     val ISO8601Time2 = "1970-01-01T02:00:01-01:00"
-    checkTypePromotion(DateUtils.fromJavaTimestamp(new Timestamp(10801000)),
+    checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(10801000)),
         enforceCorrectType(ISO8601Time2, TimestampType))
-    checkTypePromotion(DateUtils.millisToDays(10801000), enforceCorrectType(ISO8601Time2, DateType))
+    checkTypePromotion(DateTimeUtils.millisToDays(10801000),
+      enforceCorrectType(ISO8601Time2, DateType))
   }
 
   test("Get compatible type") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
index 284d99d4938d173d0feb82eed0f844747480f412..47a7be1c6a6646da3600ebe3689f9b20a20f24a8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
@@ -37,7 +37,7 @@ import org.scalatest.BeforeAndAfterAll
 import org.apache.spark.SparkException
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.ScalaReflection
-import org.apache.spark.sql.catalyst.util.DateUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types._
 
 // Write support class for nested groups: ParquetWriter initializes GroupWriteSupport
@@ -137,7 +137,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
     def makeDateRDD(): DataFrame =
       sqlContext.sparkContext
         .parallelize(0 to 1000)
-        .map(i => Tuple1(DateUtils.toJavaDate(i)))
+        .map(i => Tuple1(DateTimeUtils.toJavaDate(i)))
         .toDF()
         .select($"_1")
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
index 48875773224c7a579e6b4369557bc70f8ef7f1ff..79eac930e54f74a1122f0ca9015f5d7e9ecca33b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
@@ -17,13 +17,12 @@
 
 package org.apache.spark.sql.sources
 
-import java.sql.{Timestamp, Date}
-
+import java.sql.{Date, Timestamp}
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.util.DateUtils
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -84,8 +83,8 @@ case class AllDataTypesScan(
         i.toDouble,
         Decimal(new java.math.BigDecimal(i)),
         Decimal(new java.math.BigDecimal(i)),
-        DateUtils.fromJavaDate(new Date(1970, 1, 1)),
-        DateUtils.fromJavaTimestamp(new Timestamp(20000 + i)),
+        DateTimeUtils.fromJavaDate(new Date(1970, 1, 1)),
+        DateTimeUtils.fromJavaTimestamp(new Timestamp(20000 + i)),
         UTF8String.fromString(s"varchar_$i"),
         Seq(i, i + 1),
         Seq(Map(UTF8String.fromString(s"str_$i") -> InternalRow(i.toLong))),
@@ -93,7 +92,7 @@ case class AllDataTypesScan(
         Map(Map(UTF8String.fromString(s"str_$i") -> i.toFloat) -> InternalRow(i.toLong)),
         Row(i, i.toString),
         Row(Seq(UTF8String.fromString(s"str_$i"), UTF8String.fromString(s"str_${i + 1}")),
-          InternalRow(Seq(DateUtils.fromJavaDate(new Date(1970, 1, i + 1))))))
+          InternalRow(Seq(DateTimeUtils.fromJavaDate(new Date(1970, 1, i + 1))))))
     }
   }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index d4f1ae8ee01d94921882dcbf216e76edf3982078..864c888ab073d65a4cac799f90871c2d50ad6b4b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.hive.serde2.{io => hiveIo}
 import org.apache.hadoop.{io => hadoopIo}
 
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.util.DateUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -273,7 +273,7 @@ private[hive] trait HiveInspectors {
       System.arraycopy(writable.getBytes, 0, temp, 0, temp.length)
       temp
     case poi: WritableConstantDateObjectInspector =>
-      DateUtils.fromJavaDate(poi.getWritableConstantValue.get())
+      DateTimeUtils.fromJavaDate(poi.getWritableConstantValue.get())
     case mi: StandardConstantMapObjectInspector =>
       // take the value from the map inspector object, rather than the input data
       mi.getWritableConstantValue.map { case (k, v) =>
@@ -313,13 +313,13 @@ private[hive] trait HiveInspectors {
         System.arraycopy(bw.getBytes(), 0, result, 0, bw.getLength())
         result
       case x: DateObjectInspector if x.preferWritable() =>
-        DateUtils.fromJavaDate(x.getPrimitiveWritableObject(data).get())
-      case x: DateObjectInspector => DateUtils.fromJavaDate(x.getPrimitiveJavaObject(data))
+        DateTimeUtils.fromJavaDate(x.getPrimitiveWritableObject(data).get())
+      case x: DateObjectInspector => DateTimeUtils.fromJavaDate(x.getPrimitiveJavaObject(data))
       case x: TimestampObjectInspector if x.preferWritable() =>
         val t = x.getPrimitiveWritableObject(data)
         t.getSeconds * 10000000L + t.getNanos / 100
       case ti: TimestampObjectInspector =>
-        DateUtils.fromJavaTimestamp(ti.getPrimitiveJavaObject(data))
+        DateTimeUtils.fromJavaTimestamp(ti.getPrimitiveJavaObject(data))
       case _ => pi.getPrimitiveJavaObject(data)
     }
     case li: ListObjectInspector =>
@@ -356,10 +356,10 @@ private[hive] trait HiveInspectors {
       (o: Any) => HiveDecimal.create(o.asInstanceOf[Decimal].toJavaBigDecimal)
 
     case _: JavaDateObjectInspector =>
-      (o: Any) => DateUtils.toJavaDate(o.asInstanceOf[Int])
+      (o: Any) => DateTimeUtils.toJavaDate(o.asInstanceOf[Int])
 
     case _: JavaTimestampObjectInspector =>
-      (o: Any) => DateUtils.toJavaTimestamp(o.asInstanceOf[Long])
+      (o: Any) => DateTimeUtils.toJavaTimestamp(o.asInstanceOf[Long])
 
     case soi: StandardStructObjectInspector =>
       val wrappers = soi.getAllStructFieldRefs.map(ref => wrapperFor(ref.getFieldObjectInspector))
@@ -468,9 +468,9 @@ private[hive] trait HiveInspectors {
       case _: BinaryObjectInspector if x.preferWritable() => getBinaryWritable(a)
       case _: BinaryObjectInspector => a.asInstanceOf[Array[Byte]]
       case _: DateObjectInspector if x.preferWritable() => getDateWritable(a)
-      case _: DateObjectInspector => DateUtils.toJavaDate(a.asInstanceOf[Int])
+      case _: DateObjectInspector => DateTimeUtils.toJavaDate(a.asInstanceOf[Int])
       case _: TimestampObjectInspector if x.preferWritable() => getTimestampWritable(a)
-      case _: TimestampObjectInspector => DateUtils.toJavaTimestamp(a.asInstanceOf[Long])
+      case _: TimestampObjectInspector => DateTimeUtils.toJavaTimestamp(a.asInstanceOf[Long])
     }
     case x: SettableStructObjectInspector =>
       val fieldRefs = x.getAllStructFieldRefs
@@ -781,7 +781,7 @@ private[hive] trait HiveInspectors {
     if (value == null) {
       null
     } else {
-      new hiveIo.TimestampWritable(DateUtils.toJavaTimestamp(value.asInstanceOf[Long]))
+      new hiveIo.TimestampWritable(DateTimeUtils.toJavaTimestamp(value.asInstanceOf[Long]))
     }
 
   private def getDecimalWritable(value: Any): hiveIo.HiveDecimalWritable =
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 439f39bafc926dd54be9e51390a45d582c1c21e7..00e61e35d4354da32d5dac3ea8d6b18290e89eb3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -29,11 +29,11 @@ import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters,
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf}
 
-import org.apache.spark.{Logging}
+import org.apache.spark.Logging
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, RDD, UnionRDD}
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.util.DateUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.util.{SerializableConfiguration, Utils}
 
 /**
@@ -362,10 +362,10 @@ private[hive] object HadoopTableReader extends HiveInspectors with Logging {
             row.update(ordinal, HiveShim.toCatalystDecimal(oi, value))
         case oi: TimestampObjectInspector =>
           (value: Any, row: MutableRow, ordinal: Int) =>
-            row.setLong(ordinal, DateUtils.fromJavaTimestamp(oi.getPrimitiveJavaObject(value)))
+            row.setLong(ordinal, DateTimeUtils.fromJavaTimestamp(oi.getPrimitiveJavaObject(value)))
         case oi: DateObjectInspector =>
           (value: Any, row: MutableRow, ordinal: Int) =>
-            row.setInt(ordinal, DateUtils.fromJavaDate(oi.getPrimitiveJavaObject(value)))
+            row.setInt(ordinal, DateTimeUtils.fromJavaDate(oi.getPrimitiveJavaObject(value)))
         case oi: BinaryObjectInspector =>
           (value: Any, row: MutableRow, ordinal: Int) =>
             row.update(ordinal, oi.getPrimitiveJavaObject(value))
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
index 8b928861fcc704fdf23921f7e0be7b4970e071cb..ab75b12e2a2e7d5b1e01d7cb2c8ea17f3b0947cb 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -34,7 +34,7 @@ import org.apache.hadoop.hive.common.FileUtils
 import org.apache.spark.mapred.SparkHadoopMapRedUtil
 import org.apache.spark.sql.Row
 import org.apache.spark.{Logging, SerializableWritable, SparkHadoopWriter}
-import org.apache.spark.sql.catalyst.util.DateUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.SerializableJobConf
@@ -201,7 +201,7 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
     def convertToHiveRawString(col: String, value: Any): String = {
       val raw = String.valueOf(value)
       schema(col).dataType match {
-        case DateType => DateUtils.toString(raw.toInt)
+        case DateType => DateTimeUtils.toString(raw.toInt)
         case _: DecimalType => BigDecimal(raw).toString()
         case _ => raw
       }