Commit 9fb6b832 authored by Daoyuan Wang, committed by Reynold Xin

[SPARK-8192] [SPARK-8193] [SQL] udf current_date, current_timestamp

Author: Daoyuan Wang <daoyuan.wang@intel.com>

Closes #6985 from adrian-wang/udfcurrent and squashes the following commits:

6a20b64 [Daoyuan Wang] remove codegen and add lazy in testsuite
27c9f95 [Daoyuan Wang] refine tests..
e11ae75 [Daoyuan Wang] refine tests
61ed3d5 [Daoyuan Wang] add in functions
98e8550 [Daoyuan Wang] fix style
427d9dc [Daoyuan Wang] add tests and codegen
0b69a1f [Daoyuan Wang] udf current
parent 4a22bce8
@@ -163,7 +163,11 @@ object FunctionRegistry {
     expression[Substring]("substring"),
     expression[Upper]("ucase"),
     expression[UnHex]("unhex"),
-    expression[Upper]("upper")
+    expression[Upper]("upper"),
+
+    // datetime functions
+    expression[CurrentDate]("current_date"),
+    expression[CurrentTimestamp]("current_timestamp")
   )

   val builtin: FunctionRegistry = {
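With CurrentDate and CurrentTimestamp registered under the names current_date and current_timestamp, the functions become callable by name from SQL. A minimal sketch, assuming an existing SQLContext named sqlContext (an illustrative assumption, not part of this change):

// Illustrative usage only; `sqlContext` is an assumed, pre-existing SQLContext.
// Both functions are evaluated once per query, so all rows see the same values.
val snapshot = sqlContext.sql("SELECT CURRENT_DATE() AS today, CURRENT_TIMESTAMP() AS now")
snapshot.show()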
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._

/**
 * Returns the current date at the start of query evaluation.
 * All calls of current_date within the same query return the same value.
 */
case class CurrentDate() extends LeafExpression {
  // foldable = true lets the optimizer constant-fold this expression, so it is
  // evaluated once per query and every reference sees the same value.
  override def foldable: Boolean = true
  override def nullable: Boolean = false

  override def dataType: DataType = DateType

  override def eval(input: InternalRow): Any = {
    // DateType is represented internally as days since the Unix epoch.
    DateTimeUtils.millisToDays(System.currentTimeMillis())
  }
}

/**
 * Returns the current timestamp at the start of query evaluation.
 * All calls of current_timestamp within the same query return the same value.
 */
case class CurrentTimestamp() extends LeafExpression {
  override def foldable: Boolean = true
  override def nullable: Boolean = false

  override def dataType: DataType = TimestampType

  override def eval(input: InternalRow): Any = {
    // Convert wall-clock milliseconds to the internal TimestampType unit
    // (100ns ticks, hence the * 10000L factor).
    System.currentTimeMillis() * 10000L
  }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.util.DateTimeUtils

class DatetimeFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
  test("datetime function current_date") {
    val d0 = DateTimeUtils.millisToDays(System.currentTimeMillis())
    val cd = CurrentDate().eval(EmptyRow).asInstanceOf[Int]
    val d1 = DateTimeUtils.millisToDays(System.currentTimeMillis())
    // cd must fall between the two surrounding readings of "today"; at most one
    // day boundary can be crossed while the test runs.
    assert(d0 <= cd && cd <= d1 && d1 - d0 <= 1)
  }

  test("datetime function current_timestamp") {
    val ct = DateTimeUtils.toJavaTimestamp(CurrentTimestamp().eval(EmptyRow).asInstanceOf[Long])
    val t1 = System.currentTimeMillis()
    // The evaluated timestamp should be within a few seconds of wall-clock time.
    assert(math.abs(t1 - ct.getTime) < 5000)
  }
}
@@ -35,6 +35,7 @@ import org.apache.spark.util.Utils
  *
  * @groupname udf_funcs UDF functions
  * @groupname agg_funcs Aggregate functions
+ * @groupname datetime_funcs Date time functions
  * @groupname sort_funcs Sorting functions
  * @groupname normal_funcs Non-aggregate functions
  * @groupname math_funcs Math functions
@@ -991,6 +992,22 @@ object functions {
    */
   def cosh(columnName: String): Column = cosh(Column(columnName))

+  /**
+   * Returns the current date.
+   *
+   * @group datetime_funcs
+   * @since 1.5.0
+   */
+  def current_date(): Column = CurrentDate()
+
+  /**
+   * Returns the current timestamp.
+   *
+   * @group datetime_funcs
+   * @since 1.5.0
+   */
+  def current_timestamp(): Column = CurrentTimestamp()
+
   /**
    * Computes the exponential of the given value.
    *
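For the new DataFrame API entry points above, a brief usage sketch; the SQLContext and the sample data below are illustrative assumptions rather than part of this change:

// Illustrative usage only; assumes an existing SQLContext named `sqlContext`.
import org.apache.spark.sql.functions.{current_date, current_timestamp}
import sqlContext.implicits._

val df = Seq(("a", 1), ("b", 2)).toDF("key", "value")
// Both expressions are foldable, so every row in a single query sees the same values.
df.select(df("key"), current_date().as("today"), current_timestamp().as("now")).show()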
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql

import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.functions._

class DatetimeExpressionsSuite extends QueryTest {
  private lazy val ctx = org.apache.spark.sql.test.TestSQLContext

  import ctx.implicits._

  lazy val df1 = Seq((1, 2), (3, 1)).toDF("a", "b")

  test("function current_date") {
    val d0 = DateTimeUtils.millisToDays(System.currentTimeMillis())
    val d1 = DateTimeUtils.fromJavaDate(df1.select(current_date()).collect().head.getDate(0))
    val d2 = DateTimeUtils.fromJavaDate(
      ctx.sql("""SELECT CURRENT_DATE()""").collect().head.getDate(0))
    val d3 = DateTimeUtils.millisToDays(System.currentTimeMillis())
    assert(d0 <= d1 && d1 <= d2 && d2 <= d3 && d3 - d0 <= 1)
  }

  test("function current_timestamp") {
    checkAnswer(df1.select(countDistinct(current_timestamp())), Row(1))
    // Execution in one query should return the same value
    checkAnswer(ctx.sql("""SELECT CURRENT_TIMESTAMP() = CURRENT_TIMESTAMP()"""),
      Row(true))
    assert(math.abs(ctx.sql("""SELECT CURRENT_TIMESTAMP()""").collect().head.getTimestamp(
      0).getTime - System.currentTimeMillis()) < 5000)
  }
}