diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 2b93882919487186c859504debc90eca5956a79b..157ac2ba24ca78ed0b0930f62cc1be6fa531ba9a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -89,8 +89,8 @@ object DateTimeUtils {
 
   // reverse of millisToDays
   def daysToMillis(days: SQLDate): Long = {
-    val millisUtc = days.toLong * MILLIS_PER_DAY
-    millisUtc - threadLocalLocalTimeZone.get().getOffset(millisUtc)
+    val millisLocal = days.toLong * MILLIS_PER_DAY
+    millisLocal - getOffsetFromLocalMillis(millisLocal, threadLocalLocalTimeZone.get())
   }
 
   def dateToString(days: SQLDate): String =
@@ -819,6 +819,41 @@ object DateTimeUtils {
     }
   }
 
+  /**
+   * Lookup the offset for given millis seconds since 1970-01-01 00:00:00 in given timezone.
+   */
+  private def getOffsetFromLocalMillis(millisLocal: Long, tz: TimeZone): Long = {
+    var guess = tz.getRawOffset
+    // the actual offset should be calculated based on milliseconds in UTC
+    val offset = tz.getOffset(millisLocal - guess)
+    if (offset != guess) {
+      guess = tz.getOffset(millisLocal - offset)
+      if (guess != offset) {
+        // fallback to do the reverse lookup using java.sql.Timestamp
+        // this should only happen near the start or end of DST
+        val days = Math.floor(millisLocal.toDouble / MILLIS_PER_DAY).toInt
+        val year = getYear(days)
+        val month = getMonth(days)
+        val day = getDayOfMonth(days)
+
+        var millisOfDay = (millisLocal % MILLIS_PER_DAY).toInt
+        if (millisOfDay < 0) {
+          millisOfDay += MILLIS_PER_DAY.toInt
+        }
+        val seconds = (millisOfDay / 1000L).toInt
+        val hh = seconds / 3600
+        val mm = seconds / 60 % 60
+        val ss = seconds % 60
+        val nano = millisOfDay % 1000 * 1000000
+
+        // create a Timestamp to get the unix timestamp (in UTC)
+        val timestamp = new Timestamp(year - 1900, month - 1, day, hh, mm, ss, nano)
+        guess = (millisLocal - timestamp.getTime).toInt
+      }
+    }
+    guess
+  }
+
   /**
    * Returns a timestamp of given timezone from utc timestamp, with the same string
    * representation in their timezone.
@@ -835,7 +870,16 @@ object DateTimeUtils {
    */
   def toUTCTime(time: SQLTimestamp, timeZone: String): SQLTimestamp = {
     val tz = TimeZone.getTimeZone(timeZone)
-    val offset = tz.getOffset(time / 1000L)
+    val offset = getOffsetFromLocalMillis(time / 1000L, tz)
     time - offset * 1000L
   }
+
+  /**
+   * Re-initialize the current thread's thread locals. Exposed for testing.
+   */
+  private[util] def resetThreadLocals(): Unit = {
+    threadLocalLocalTimeZone.remove()
+    threadLocalTimestampFormat.remove()
+    threadLocalDateFormat.remove()
+  }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DateType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DateType.scala
index 1d73e40ffcd36679f833221b3a10eabddc437ead..2c966230e447e65dff44ede69fe524bae3870f7f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DateType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DateType.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.ScalaReflectionLock
  *
  * Please use the singleton [[DataTypes.DateType]].
  *
- * Internally, this is represented as the number of days from epoch (1970-01-01 00:00:00 UTC).
+ * Internally, this is represented as the number of days from 1970-01-01.
  */
 @DeveloperApi
 class DateType private() extends AtomicType {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeTestUtils.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeTestUtils.scala
new file mode 100644
index 0000000000000000000000000000000000000000..0c1feb3aa0882f088f956a04709c20a073717056
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeTestUtils.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import java.util.TimeZone
+
+/**
+ * Helper functions for testing date and time functionality.
+ */
+object DateTimeTestUtils {
+
+  val ALL_TIMEZONES: Seq[TimeZone] = TimeZone.getAvailableIDs.toSeq.map(TimeZone.getTimeZone)
+
+  def withDefaultTimeZone[T](newDefaultTimeZone: TimeZone)(block: => T): T = {
+    val originalDefaultTimeZone = TimeZone.getDefault
+    try {
+      DateTimeUtils.resetThreadLocals()
+      TimeZone.setDefault(newDefaultTimeZone)
+      block
+    } finally {
+      TimeZone.setDefault(originalDefaultTimeZone)
+      DateTimeUtils.resetThreadLocals()
+    }
+  }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
index 0ce5a2fb69505dca9315d7a4fa553b58cd26456b..6660453da7d4a9bd97bf7b969f37b8e7674a70e9 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
@@ -476,6 +476,13 @@ class DateTimeUtilsSuite extends SparkFunSuite {
     test("2011-12-25 09:00:00.123456", "JST", "2011-12-25 18:00:00.123456")
     test("2011-12-25 09:00:00.123456", "PST", "2011-12-25 01:00:00.123456")
     test("2011-12-25 09:00:00.123456", "Asia/Shanghai", "2011-12-25 17:00:00.123456")
+
+    // Daylight Saving Time
+    test("2016-03-13 09:59:59.0", "PST", "2016-03-13 01:59:59.0")
+    test("2016-03-13 10:00:00.0", "PST", "2016-03-13 03:00:00.0")
+    test("2016-11-06 08:59:59.0", "PST", "2016-11-06 01:59:59.0")
+    test("2016-11-06 09:00:00.0", "PST", "2016-11-06 01:00:00.0")
+    test("2016-11-06 10:00:00.0", "PST", "2016-11-06 02:00:00.0")
   }
 
   test("to UTC timestamp") {
@@ -487,5 +494,38 @@ class DateTimeUtilsSuite extends SparkFunSuite {
     test("2011-12-25 18:00:00.123456", "JST", "2011-12-25 09:00:00.123456")
     test("2011-12-25 01:00:00.123456", "PST", "2011-12-25 09:00:00.123456")
     test("2011-12-25 17:00:00.123456", "Asia/Shanghai", "2011-12-25 09:00:00.123456")
+
+    // Daylight Saving Time
+    test("2016-03-13 01:59:59", "PST", "2016-03-13 09:59:59.0")
+    // 2016-03-13 02:00:00 PST does not exists
+    test("2016-03-13 02:00:00", "PST", "2016-03-13 10:00:00.0")
+    test("2016-03-13 03:00:00", "PST", "2016-03-13 10:00:00.0")
+    test("2016-11-06 00:59:59", "PST", "2016-11-06 07:59:59.0")
+    // 2016-11-06 01:00:00 PST could be 2016-11-06 08:00:00 UTC or 2016-11-06 09:00:00 UTC
+    test("2016-11-06 01:00:00", "PST", "2016-11-06 09:00:00.0")
+    test("2016-11-06 01:59:59", "PST", "2016-11-06 09:59:59.0")
+    test("2016-11-06 02:00:00", "PST", "2016-11-06 10:00:00.0")
+  }
+
+  test("daysToMillis and millisToDays") {
+    // There are some days are skipped entirely in some timezone, skip them here.
+    val skipped_days = Map[String, Int](
+      "Kwajalein" -> 8632,
+      "Pacific/Apia" -> 15338,
+      "Pacific/Enderbury" -> 9131,
+      "Pacific/Fakaofo" -> 15338,
+      "Pacific/Kiritimati" -> 9131,
+      "Pacific/Kwajalein" -> 8632,
+      "MIT" -> 15338)
+    for (tz <- DateTimeTestUtils.ALL_TIMEZONES) {
+      DateTimeTestUtils.withDefaultTimeZone(tz) {
+        val skipped = skipped_days.getOrElse(tz.getID, Int.MinValue)
+        (-20000 to 20000).foreach { d =>
+          if (d != skipped) {
+            assert(millisToDays(daysToMillis(d)) === d)
+          }
+        }
+      }
+    }
   }
 }