From 39b1e36fbc284ba999e1c00b20d1b0b5de6b40b2 Mon Sep 17 00:00:00 2001
From: Daoyuan Wang <daoyuan.wang@intel.com>
Date: Wed, 11 Nov 2015 20:36:21 -0800
Subject: [PATCH] [SPARK-11396] [SQL] add native implementation of datetime
 function to_unix_timestamp

`to_unix_timestamp` is the deterministic version of `unix_timestamp`, as it accepts at least one parameters.

Since the behavior here is quite similar to `unix_timestamp`, I think the dataframe API is not necessary here.

Author: Daoyuan Wang <daoyuan.wang@intel.com>

Closes #9347 from adrian-wang/to_unix_timestamp.
---
 .../catalyst/analysis/FunctionRegistry.scala  |  1 +
 .../expressions/datetimeExpressions.scala     | 24 ++++++++++---
 .../expressions/DateExpressionsSuite.scala    | 36 +++++++++++++++++++
 .../apache/spark/sql/DateFunctionsSuite.scala | 21 +++++++++++
 4 files changed, 77 insertions(+), 5 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index dfa749d1af..870808aa56 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -262,6 +262,7 @@ object FunctionRegistry {
     expression[Quarter]("quarter"),
     expression[Second]("second"),
     expression[ToDate]("to_date"),
+    expression[ToUnixTimestamp]("to_unix_timestamp"),
     expression[ToUTCTimestamp]("to_utc_timestamp"),
     expression[TruncDate]("trunc"),
     expression[UnixTimestamp]("unix_timestamp"),
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
index 13cc6bb6f2..03c39f8404 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
@@ -299,7 +299,20 @@ case class DateFormatClass(left: Expression, right: Expression) extends BinaryEx
 }
 
 /**
- * Converts time string with given pattern
+ * Converts time string with given pattern.
+ * Deterministic version of [[UnixTimestamp]], must have at least one parameter.
+ */
+case class ToUnixTimestamp(timeExp: Expression, format: Expression) extends UnixTime {
+  override def left: Expression = timeExp
+  override def right: Expression = format
+
+  def this(time: Expression) = {
+    this(time, Literal("yyyy-MM-dd HH:mm:ss"))
+  }
+}
+
+/**
+ * Converts time string with given pattern.
  * (see [http://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html])
  * to Unix time stamp (in seconds), returns null if fail.
  * Note that hive Language Manual says it returns 0 if fail, but in fact it returns null.
@@ -308,9 +321,7 @@ case class DateFormatClass(left: Expression, right: Expression) extends BinaryEx
  * If the first parameter is a Date or Timestamp instead of String, we will ignore the
  * second parameter.
  */
-case class UnixTimestamp(timeExp: Expression, format: Expression)
-  extends BinaryExpression with ExpectsInputTypes {
-
+case class UnixTimestamp(timeExp: Expression, format: Expression) extends UnixTime {
   override def left: Expression = timeExp
   override def right: Expression = format
 
@@ -321,6 +332,9 @@ case class UnixTimestamp(timeExp: Expression, format: Expression)
   def this() = {
     this(CurrentTimestamp())
   }
+}
+
+abstract class UnixTime extends BinaryExpression with ExpectsInputTypes {
 
   override def inputTypes: Seq[AbstractDataType] =
     Seq(TypeCollection(StringType, DateType, TimestampType), StringType)
@@ -347,7 +361,7 @@ case class UnixTimestamp(timeExp: Expression, format: Expression)
             null
           }
         case StringType =>
-          val f = format.eval(input)
+          val f = right.eval(input)
           if (f == null) {
             null
           } else {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
index 610d39e849..53c66d8a75 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
@@ -465,6 +465,42 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
       UnixTimestamp(Literal("2015-07-24"), Literal("not a valid format")), null)
   }
 
+  test("to_unix_timestamp") {
+    val sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
+    val fmt2 = "yyyy-MM-dd HH:mm:ss.SSS"
+    val sdf2 = new SimpleDateFormat(fmt2)
+    val fmt3 = "yy-MM-dd"
+    val sdf3 = new SimpleDateFormat(fmt3)
+    val date1 = Date.valueOf("2015-07-24")
+    checkEvaluation(
+      ToUnixTimestamp(Literal(sdf1.format(new Timestamp(0))), Literal("yyyy-MM-dd HH:mm:ss")), 0L)
+    checkEvaluation(ToUnixTimestamp(
+      Literal(sdf1.format(new Timestamp(1000000))), Literal("yyyy-MM-dd HH:mm:ss")), 1000L)
+    checkEvaluation(
+      ToUnixTimestamp(Literal(new Timestamp(1000000)), Literal("yyyy-MM-dd HH:mm:ss")), 1000L)
+    checkEvaluation(
+      ToUnixTimestamp(Literal(date1), Literal("yyyy-MM-dd HH:mm:ss")),
+      DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1)) / 1000L)
+    checkEvaluation(
+      ToUnixTimestamp(Literal(sdf2.format(new Timestamp(-1000000))), Literal(fmt2)), -1000L)
+    checkEvaluation(ToUnixTimestamp(
+      Literal(sdf3.format(Date.valueOf("2015-07-24"))), Literal(fmt3)),
+      DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(Date.valueOf("2015-07-24"))) / 1000L)
+    val t1 = ToUnixTimestamp(
+      CurrentTimestamp(), Literal("yyyy-MM-dd HH:mm:ss")).eval().asInstanceOf[Long]
+    val t2 = ToUnixTimestamp(
+      CurrentTimestamp(), Literal("yyyy-MM-dd HH:mm:ss")).eval().asInstanceOf[Long]
+    assert(t2 - t1 <= 1)
+    checkEvaluation(
+      ToUnixTimestamp(Literal.create(null, DateType), Literal.create(null, StringType)), null)
+    checkEvaluation(
+      ToUnixTimestamp(Literal.create(null, DateType), Literal("yyyy-MM-dd HH:mm:ss")), null)
+    checkEvaluation(ToUnixTimestamp(
+      Literal(date1), Literal.create(null, StringType)), date1.getTime / 1000L)
+    checkEvaluation(
+      ToUnixTimestamp(Literal("2015-07-24"), Literal("not a valid format")), null)
+  }
+
   test("datediff") {
     checkEvaluation(
       DateDiff(Literal(Date.valueOf("2015-07-24")), Literal(Date.valueOf("2015-07-21"))), 3)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
index 9080c53c49..1266d534cc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
@@ -444,6 +444,27 @@ class DateFunctionsSuite extends QueryTest with SharedSQLContext {
       Row(ts1.getTime / 1000L), Row(ts2.getTime / 1000L)))
   }
 
+  test("to_unix_timestamp") {
+    val date1 = Date.valueOf("2015-07-24")
+    val date2 = Date.valueOf("2015-07-25")
+    val ts1 = Timestamp.valueOf("2015-07-24 10:00:00.3")
+    val ts2 = Timestamp.valueOf("2015-07-25 02:02:02.2")
+    val s1 = "2015/07/24 10:00:00.5"
+    val s2 = "2015/07/25 02:02:02.6"
+    val ss1 = "2015-07-24 10:00:00"
+    val ss2 = "2015-07-25 02:02:02"
+    val fmt = "yyyy/MM/dd HH:mm:ss.S"
+    val df = Seq((date1, ts1, s1, ss1), (date2, ts2, s2, ss2)).toDF("d", "ts", "s", "ss")
+    checkAnswer(df.selectExpr("to_unix_timestamp(ts)"), Seq(
+      Row(ts1.getTime / 1000L), Row(ts2.getTime / 1000L)))
+    checkAnswer(df.selectExpr("to_unix_timestamp(ss)"), Seq(
+      Row(ts1.getTime / 1000L), Row(ts2.getTime / 1000L)))
+    checkAnswer(df.selectExpr(s"to_unix_timestamp(d, '$fmt')"), Seq(
+      Row(date1.getTime / 1000L), Row(date2.getTime / 1000L)))
+    checkAnswer(df.selectExpr(s"to_unix_timestamp(s, '$fmt')"), Seq(
+      Row(ts1.getTime / 1000L), Row(ts2.getTime / 1000L)))
+  }
+
   test("datediff") {
     val df = Seq(
       (Date.valueOf("2015-07-24"), Timestamp.valueOf("2015-07-24 01:00:00"),
-- 
GitLab