diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 7c16ba30720b3a8159ec8b6811780ece64a652d2..625c797f8a6881ada16a899c2506282fbe4d8844 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -325,6 +325,7 @@ exportMethods("%in%",
               "toDegrees",
               "toRadians",
               "to_date",
+              "to_timestamp",
               "to_utc_timestamp",
               "translate",
               "trim",
diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index 6ffa0f5481c650a6e7e4d5e05b85b36a03007ec9..032cfecfc0017f8ab7b2e1b9c077cafe1a141c68 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -1730,24 +1730,90 @@ setMethod("toRadians",
 #' to_date
 #'
-#' Converts the column into DateType.
+#' Converts the column into a DateType. You may optionally specify a format
+#' according to the rules in:
+#' \url{http://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html}.
+#' If the string cannot be parsed according to the specified format (or the default),
+#' the value of the column will be null.
+#' The default format is 'yyyy-MM-dd'.
 #'
-#' @param x Column to compute on.
+#' @param x Column to parse.
+#' @param format string used to parse the x Column to a DateType. (optional)
 #'
 #' @rdname to_date
 #' @name to_date
 #' @family datetime_funcs
-#' @aliases to_date,Column-method
+#' @aliases to_date,Column,missing-method
 #' @export
-#' @examples \dontrun{to_date(df$c)}
-#' @note to_date since 1.5.0
+#' @examples
+#' \dontrun{
+#' to_date(df$c)
+#' to_date(df$c, 'yyyy-MM-dd')
+#' }
+#' @note to_date(Column) since 1.5.0
 setMethod("to_date",
-          signature(x = "Column"),
-          function(x) {
+          signature(x = "Column", format = "missing"),
+          function(x, format) {
             jc <- callJStatic("org.apache.spark.sql.functions", "to_date", x@jc)
             column(jc)
           })
 
+#' @rdname to_date
+#' @name to_date
+#' @family datetime_funcs
+#' @aliases to_date,Column,character-method
+#' @export
+#' @note to_date(Column, character) since 2.2.0
+setMethod("to_date",
+          signature(x = "Column", format = "character"),
+          function(x, format) {
+            jc <- callJStatic("org.apache.spark.sql.functions", "to_date", x@jc, format)
+            column(jc)
+          })
+
+#' to_timestamp
+#'
+#' Converts the column into a TimestampType. You may optionally specify a format
+#' according to the rules in:
+#' \url{http://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html}.
+#' If the string cannot be parsed according to the specified format (or the default),
+#' the value of the column will be null.
+#' The default format is 'yyyy-MM-dd HH:mm:ss'.
+#'
+#' @param x Column to parse.
+#' @param format string used to parse the x Column to a TimestampType. (optional)
+#'
+#' @rdname to_timestamp
+#' @name to_timestamp
+#' @family datetime_funcs
+#' @aliases to_timestamp,Column,missing-method
+#' @export
+#' @examples
+#' \dontrun{
+#' to_timestamp(df$c)
+#' to_timestamp(df$c, 'yyyy-MM-dd')
+#' }
+#' @note to_timestamp(Column) since 2.2.0
+setMethod("to_timestamp",
+          signature(x = "Column", format = "missing"),
+          function(x, format) {
+            jc <- callJStatic("org.apache.spark.sql.functions", "to_timestamp", x@jc)
+            column(jc)
+          })
+
+#' @rdname to_timestamp
+#' @name to_timestamp
+#' @family datetime_funcs
+#' @aliases to_timestamp,Column,character-method
+#' @export
+#' @note to_timestamp(Column, character) since 2.2.0
+setMethod("to_timestamp",
+          signature(x = "Column", format = "character"),
+          function(x, format) {
+            jc <- callJStatic("org.apache.spark.sql.functions", "to_timestamp", x@jc, format)
+            column(jc)
+          })
+
 #' trim
 #'
 #' Trim the spaces from both ends for the specified string column.
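Note: both R wrappers resolve through callJStatic to the JVM-side org.apache.spark.sql.functions overloads added further down in this patch. A minimal Scala sketch of the four call shapes the new R signatures map onto (the column name "c" is illustrative, not from this patch):

// Scala-side view of the entry points the R methods bind to.
// Patterns follow java.text.SimpleDateFormat.
import org.apache.spark.sql.functions.{col, to_date, to_timestamp}

val d1 = to_date(col("c"))                              // default "yyyy-MM-dd"
val d2 = to_date(col("c"), "yyyy-MM-dd")                // explicit format
val t1 = to_timestamp(col("c"))                         // default "yyyy-MM-dd HH:mm:ss"
val t2 = to_timestamp(col("c"), "yyyy-MM-dd HH:mm:ss")  // explicit format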
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 0307bac349ec13420d8858d063118337f45b3f65..d78b1a10d6b42698de7dfb17eef3cf0bc89c647c 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -1256,7 +1256,11 @@ setGeneric("toRadians", function(x) { standardGeneric("toRadians") })
 
 #' @rdname to_date
 #' @export
-setGeneric("to_date", function(x) { standardGeneric("to_date") })
+setGeneric("to_date", function(x, format) { standardGeneric("to_date") })
+
+#' @rdname to_timestamp
+#' @export
+setGeneric("to_timestamp", function(x, format) { standardGeneric("to_timestamp") })
 
 #' @rdname to_utc_timestamp
 #' @export
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 418f128ce8e8aa3e9446af5518557f6f76f60774..233a20c3d3866b68fccbf10400432e467956378f 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -1237,6 +1237,7 @@ test_that("column functions", {
   c17 <- cov(c, c1) + cov("c", "c1") + covar_samp(c, c1) + covar_samp("c", "c1")
   c18 <- covar_pop(c, c1) + covar_pop("c", "c1")
   c19 <- spark_partition_id()
+  c20 <- to_timestamp(c) + to_timestamp(c, "yyyy") + to_date(c, "yyyy")
 
   # Test if base::is.nan() is exposed
   expect_equal(is.nan(c("a", "b")), c(FALSE, FALSE))
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 02c2350dc2d65b5076d9bd82178bdca2ccc77eff..40727ab12b4ea002d5b702e59c711ef6b2771e0a 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -143,6 +143,12 @@ _functions_2_1 = {
            'measured in radians.',
 }
 
+_functions_2_2 = {
+    'to_date': 'Converts a string date into a DateType using the (optionally) specified format.',
+    'to_timestamp': 'Converts a string timestamp into a timestamp type using the ' +
+                    '(optionally) specified format.',
+}
+
 # math functions that take two arguments as input
 _binary_mathfunctions = {
     'atan2': 'Returns the angle theta from the conversion of rectangular coordinates (x, y) to' +
@@ -976,18 +982,52 @@ def months_between(date1, date2):
     return Column(sc._jvm.functions.months_between(_to_java_column(date1), _to_java_column(date2)))
 
 
-@since(1.5)
-def to_date(col):
-    """
-    Converts the column of :class:`pyspark.sql.types.StringType` or
-    :class:`pyspark.sql.types.TimestampType` into :class:`pyspark.sql.types.DateType`.
+@since(2.2)
+def to_date(col, format=None):
+    """Converts a :class:`Column` of :class:`pyspark.sql.types.StringType` or
+    :class:`pyspark.sql.types.TimestampType` into :class:`pyspark.sql.types.DateType`
+    using the optionally specified format. Default format is 'yyyy-MM-dd'.
+    Specify formats according to
+    `SimpleDateFormats <http://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html>`_.
 
     >>> df = spark.createDataFrame([('1997-02-28 10:30:00',)], ['t'])
     >>> df.select(to_date(df.t).alias('date')).collect()
     [Row(date=datetime.date(1997, 2, 28))]
+
+    >>> df = spark.createDataFrame([('1997-02-28 10:30:00',)], ['t'])
+    >>> df.select(to_date(df.t, 'yyyy-MM-dd HH:mm:ss').alias('date')).collect()
+    [Row(date=datetime.date(1997, 2, 28))]
     """
     sc = SparkContext._active_spark_context
-    return Column(sc._jvm.functions.to_date(_to_java_column(col)))
+    if format is None:
+        jc = sc._jvm.functions.to_date(_to_java_column(col))
+    else:
+        jc = sc._jvm.functions.to_date(_to_java_column(col), format)
+    return Column(jc)
+
+
+@since(2.2)
+def to_timestamp(col, format=None):
+    """Converts a :class:`Column` of :class:`pyspark.sql.types.StringType` or
+    :class:`pyspark.sql.types.TimestampType` into :class:`pyspark.sql.types.TimestampType`
+    using the optionally specified format. Default format is 'yyyy-MM-dd HH:mm:ss'. Specify
+    formats according to
+    `SimpleDateFormats <http://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html>`_.
+
+    >>> df = spark.createDataFrame([('1997-02-28 10:30:00',)], ['t'])
+    >>> df.select(to_timestamp(df.t).alias('dt')).collect()
+    [Row(dt=datetime.datetime(1997, 2, 28, 10, 30))]
+
+    >>> df = spark.createDataFrame([('1997-02-28 10:30:00',)], ['t'])
+    >>> df.select(to_timestamp(df.t, 'yyyy-MM-dd HH:mm:ss').alias('dt')).collect()
+    [Row(dt=datetime.datetime(1997, 2, 28, 10, 30))]
+    """
+    sc = SparkContext._active_spark_context
+    if format is None:
+        jc = sc._jvm.functions.to_timestamp(_to_java_column(col))
+    else:
+        jc = sc._jvm.functions.to_timestamp(_to_java_column(col), format)
+    return Column(jc)
 
 
 @since(1.5)
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 86cad4b363c4c2de7b07ed24d8f040b45a30740b..710585cbe291ad1fce4b717ba24f8a267c1374dd 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -1807,6 +1807,8 @@ class SQLTests(ReusedPySparkTestCase):
         self.assertTrue("+" in functions)
         self.assertTrue("like" in functions)
         self.assertTrue("month" in functions)
+        self.assertTrue("to_date" in functions)
+        self.assertTrue("to_timestamp" in functions)
         self.assertTrue("to_unix_timestamp" in functions)
         self.assertTrue("current_database" in functions)
         self.assertEquals(functions["+"], Function(
@@ -2189,6 +2191,13 @@ class HiveContextSQLTests(ReusedPySparkTestCase):
         # Regression test for SPARK-17514: limit(n).collect() should perform the same as take(n)
         assert_runs_only_one_job_stage_and_task("collect_limit", lambda: df.limit(1).collect())
 
+    def test_datetime_functions(self):
+        from pyspark.sql import functions
+        from datetime import date
+        df = self.spark.range(1).selectExpr("'2017-01-22' as dateCol")
+        parse_result = df.select(functions.to_date(functions.col("dateCol"))).first()
+        self.assertEquals(date(2017, 1, 22), parse_result['to_date(dateCol)'])
+
     @unittest.skipIf(sys.version_info < (3, 3), "Unittest < 3.3 doesn't support mocking")
     def test_unbounded_frames(self):
         from unittest.mock import patch
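Note: the docstrings above promise null rather than an exception for strings that cannot be parsed. A minimal Scala sketch of that contract, assuming a spark-shell session with `spark` in scope:

import spark.implicits._
import org.apache.spark.sql.functions.to_timestamp

// The second string does not match the pattern, so it parses to null
// instead of raising an error.
Seq("1997-02-28 10:30:00", "not a timestamp").toDF("t")
  .select(to_timestamp($"t", "yyyy-MM-dd HH:mm:ss").alias("dt"))
  .show(truncate = false)
// dt: 1997-02-28 10:30:00, then null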
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index eea3740be8a902ae91bdbd03bc17c664c1f1144f..9c9465f6b8defbb570075b5746a8ff53ec5c340f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -345,7 +345,8 @@ object FunctionRegistry {
     expression[CurrentTimestamp]("now"),
     expression[Quarter]("quarter"),
     expression[Second]("second"),
-    expression[ToDate]("to_date"),
+    expression[ParseToTimestamp]("to_timestamp"),
+    expression[ParseToDate]("to_date"),
     expression[ToUnixTimestamp]("to_unix_timestamp"),
     expression[ToUTCTimestamp]("to_utc_timestamp"),
     expression[TruncDate]("trunc"),
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
index bad8a7123017bec2ca7104925aaf5e4b9f74ac7e..f8fe774823e5bb60a9c82128e0062d9a00ad3b36 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
@@ -24,6 +24,7 @@ import java.util.{Calendar, TimeZone}
 import scala.util.control.NonFatal
 
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodegenFallback, ExprCode}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types._
@@ -1169,6 +1170,69 @@ case class ToDate(child: Expression) extends UnaryExpression with ImplicitCastIn
   override def prettyName: String = "to_date"
 }
 
+/**
+ * Parses a column to a date based on the given format.
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(date_str, fmt) - Parses the `date_str` expression with the `fmt` expression to a date. Returns null with invalid input.",
+  extended = """
+    Examples:
+      > SELECT _FUNC_('2016-12-31', 'yyyy-MM-dd');
+       2016-12-31
+  """)
+// scalastyle:on line.size.limit
+case class ParseToDate(left: Expression, format: Option[Expression], child: Expression)
+  extends RuntimeReplaceable {
+
+  def this(left: Expression, format: Expression) = {
+    this(left, Option(format),
+      Cast(Cast(UnixTimestamp(left, format), TimestampType), DateType))
+  }
+
+  def this(left: Expression) = {
+    // backwards compatibility
+    this(left, None, ToDate(left))
+  }
+
+  override def flatArguments: Iterator[Any] = Iterator(left, format)
+  override def sql: String = {
+    if (format.isDefined) {
+      s"$prettyName(${left.sql}, ${format.get.sql})"
+    } else {
+      s"$prettyName(${left.sql})"
+    }
+  }
+
+  override def prettyName: String = "to_date"
+}
+
+/**
+ * Parses a column to a timestamp based on the supplied format.
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(timestamp, fmt) - Parses the `timestamp` expression with the `fmt` expression to a timestamp. Returns null with invalid input.",
+  extended = """
+    Examples:
+      > SELECT _FUNC_('2016-12-31', 'yyyy-MM-dd');
+       2016-12-31 00:00:00.0
+  """)
+// scalastyle:on line.size.limit
+case class ParseToTimestamp(left: Expression, format: Expression, child: Expression)
+  extends RuntimeReplaceable {
+
+  def this(left: Expression, format: Expression) = {
+    this(left, format, Cast(UnixTimestamp(left, format), TimestampType))
+  }
+
+  override def flatArguments: Iterator[Any] = Iterator(left, format)
+  override def sql: String = s"$prettyName(${left.sql}, ${format.sql})"
+
+  override def prettyName: String = "to_timestamp"
+  override def dataType: DataType = TimestampType
+}
+
 /**
  * Returns date truncated to the unit specified by the format.
  */
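Note: because both new expressions extend RuntimeReplaceable, the analyzer substitutes their `child` before execution, so the SQL functions are sugar over unix_timestamp plus casts. A sketch of the equivalence implied by the constructors above, assuming a session `spark`:

// to_timestamp(s, fmt)  =>  CAST(unix_timestamp(s, fmt) AS TIMESTAMP)
// to_date(s, fmt)       =>  CAST(CAST(unix_timestamp(s, fmt) AS TIMESTAMP) AS DATE)
spark.sql("SELECT to_timestamp('2016-12-31', 'yyyy-MM-dd')").show()
spark.sql("SELECT CAST(unix_timestamp('2016-12-31', 'yyyy-MM-dd') AS TIMESTAMP)").show()
// Both queries print 2016-12-31 00:00:00.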
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 5e27484c110473a81f1eb03b669ba0824449c493..24ed906d33683c28c67c0f67b2b02562499cb2f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -2664,6 +2664,27 @@ object functions {
    */
   def unix_timestamp(s: Column, p: String): Column = withExpr {UnixTimestamp(s.expr, Literal(p)) }
 
+  /**
+   * Converts the time string into a TimestampType.
+   * Uses the default pattern "yyyy-MM-dd HH:mm:ss" and returns null if parsing fails.
+   * @group datetime_funcs
+   * @since 2.2.0
+   */
+  def to_timestamp(s: Column): Column = withExpr {
+    new ParseToTimestamp(s.expr, Literal("yyyy-MM-dd HH:mm:ss"))
+  }
+
+  /**
+   * Converts the time string into a TimestampType with the specified format
+   * (see [http://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html]),
+   * returning null if parsing fails.
+   * @group datetime_funcs
+   * @since 2.2.0
+   */
+  def to_timestamp(s: Column, fmt: String): Column = withExpr {
+    new ParseToTimestamp(s.expr, Literal(fmt))
+  }
+
   /**
    * Converts the column into DateType.
    *
@@ -2672,6 +2693,18 @@ object functions {
    */
   def to_date(e: Column): Column = withExpr { ToDate(e.expr) }
 
+  /**
+   * Converts the column into a DateType with the specified format
+   * (see [http://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html]),
+   * returning null if parsing fails.
+   *
+   * @group datetime_funcs
+   * @since 2.2.0
+   */
+  def to_date(e: Column, fmt: String): Column = withExpr {
+    new ParseToDate(e.expr, Literal(fmt))
+  }
+
   /**
    * Returns date truncated to the unit specified by the format.
    *
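Note: a short usage sketch for the new DataFrame-side overloads (spark-shell assumed; the input string and patterns are illustrative only):

import spark.implicits._
import org.apache.spark.sql.functions.{to_date, to_timestamp}

val df = Seq("2015/07/24 10:00:00").toDF("s")
df.select(to_timestamp($"s", "yyyy/MM/dd HH:mm:ss")).show()  // 2015-07-24 10:00:00
df.select(to_date($"s", "yyyy/MM/dd")).show()                // 2015-07-24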
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
index e05b2252ee34629df804cc3f7d21e2206ab526f0..618db434468ca718bb864ecd0bfd27cfce6a97fc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
@@ -354,31 +354,58 @@ class DateFunctionsSuite extends QueryTest with SharedSQLContext {
   test("function to_date") {
     val d1 = Date.valueOf("2015-07-22")
     val d2 = Date.valueOf("2015-07-01")
+    val d3 = Date.valueOf("2014-12-31")
     val t1 = Timestamp.valueOf("2015-07-22 10:00:00")
     val t2 = Timestamp.valueOf("2014-12-31 23:59:59")
+    val t3 = Timestamp.valueOf("2014-12-31 23:59:59")
     val s1 = "2015-07-22 10:00:00"
     val s2 = "2014-12-31"
-    val df = Seq((d1, t1, s1), (d2, t2, s2)).toDF("d", "t", "s")
+    val s3 = "2014-31-12"
+    val df = Seq((d1, t1, s1), (d2, t2, s2), (d3, t3, s3)).toDF("d", "t", "s")
 
     checkAnswer(
       df.select(to_date(col("t"))),
-      Seq(Row(Date.valueOf("2015-07-22")), Row(Date.valueOf("2014-12-31"))))
+      Seq(Row(Date.valueOf("2015-07-22")), Row(Date.valueOf("2014-12-31")),
+        Row(Date.valueOf("2014-12-31"))))
     checkAnswer(
       df.select(to_date(col("d"))),
-      Seq(Row(Date.valueOf("2015-07-22")), Row(Date.valueOf("2015-07-01"))))
+      Seq(Row(Date.valueOf("2015-07-22")), Row(Date.valueOf("2015-07-01")),
+        Row(Date.valueOf("2014-12-31"))))
    checkAnswer(
      df.select(to_date(col("s"))),
-      Seq(Row(Date.valueOf("2015-07-22")), Row(Date.valueOf("2014-12-31"))))
+      Seq(Row(Date.valueOf("2015-07-22")), Row(Date.valueOf("2014-12-31")), Row(null)))
 
     checkAnswer(
       df.selectExpr("to_date(t)"),
-      Seq(Row(Date.valueOf("2015-07-22")), Row(Date.valueOf("2014-12-31"))))
+      Seq(Row(Date.valueOf("2015-07-22")), Row(Date.valueOf("2014-12-31")),
+        Row(Date.valueOf("2014-12-31"))))
     checkAnswer(
       df.selectExpr("to_date(d)"),
-      Seq(Row(Date.valueOf("2015-07-22")), Row(Date.valueOf("2015-07-01"))))
+      Seq(Row(Date.valueOf("2015-07-22")), Row(Date.valueOf("2015-07-01")),
+        Row(Date.valueOf("2014-12-31"))))
     checkAnswer(
       df.selectExpr("to_date(s)"),
-      Seq(Row(Date.valueOf("2015-07-22")), Row(Date.valueOf("2014-12-31"))))
+      Seq(Row(Date.valueOf("2015-07-22")), Row(Date.valueOf("2014-12-31")), Row(null)))
+
+    // now with format
+    checkAnswer(
+      df.select(to_date(col("t"), "yyyy-MM-dd")),
+      Seq(Row(Date.valueOf("2015-07-22")), Row(Date.valueOf("2014-12-31")),
+        Row(Date.valueOf("2014-12-31"))))
+    checkAnswer(
+      df.select(to_date(col("d"), "yyyy-MM-dd")),
+      Seq(Row(Date.valueOf("2015-07-22")), Row(Date.valueOf("2015-07-01")),
+        Row(Date.valueOf("2014-12-31"))))
+    checkAnswer(
+      df.select(to_date(col("s"), "yyyy-MM-dd")),
+      Seq(Row(Date.valueOf("2015-07-22")), Row(Date.valueOf("2014-12-31")),
+        Row(Date.valueOf("2016-07-12"))))
+
+    // now switch format
+    checkAnswer(
+      df.select(to_date(col("s"), "yyyy-dd-MM")),
+      Seq(Row(Date.valueOf("2016-10-07")), Row(Date.valueOf("2016-07-12")),
+        Row(Date.valueOf("2014-12-31"))))
   }
 
   test("function trunc") {
@@ -475,6 +502,33 @@ class DateFunctionsSuite extends QueryTest with SharedSQLContext {
         Row(ts1.getTime / 1000L), Row(ts2.getTime / 1000L)))
   }
 
+  test("to_timestamp") {
+    val date1 = Date.valueOf("2015-07-24")
+    val date2 = Date.valueOf("2015-07-25")
+    val ts_date1 = Timestamp.valueOf("2015-07-24 00:00:00")
+    val ts_date2 = Timestamp.valueOf("2015-07-25 00:00:00")
+    val ts1 = Timestamp.valueOf("2015-07-24 10:00:00")
+    val ts2 = Timestamp.valueOf("2015-07-25 02:02:02")
+    val s1 = "2015/07/24 10:00:00.5"
+    val s2 = "2015/07/25 02:02:02.6"
+    val ss1 = "2015-07-24 10:00:00"
+    val ss2 = "2015-07-25 02:02:02"
+    val fmt = "yyyy/MM/dd HH:mm:ss.S"
+    val df = Seq((date1, ts1, s1, ss1), (date2, ts2, s2, ss2)).toDF("d", "ts", "s", "ss")
+
+    checkAnswer(df.select(to_timestamp(col("ss"))),
+      df.select(unix_timestamp(col("ss")).cast("timestamp")))
+    checkAnswer(df.select(to_timestamp(col("ss"))), Seq(
+      Row(ts1), Row(ts2)))
+    checkAnswer(df.select(to_timestamp(col("s"), fmt)), Seq(
+      Row(ts1), Row(ts2)))
+    checkAnswer(df.select(to_timestamp(col("ts"), fmt)), Seq(
+      Row(ts1), Row(ts2)))
+    checkAnswer(df.select(to_timestamp(col("d"), "yyyy-MM-dd")), Seq(
+      Row(ts_date1), Row(ts_date2)))
+  }
+
   test("datediff") {
     val df = Seq(
       (Date.valueOf("2015-07-24"), Timestamp.valueOf("2015-07-24 01:00:00"),
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/ExpressionToSQLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/ExpressionToSQLSuite.scala
index df9390aec7f7e484b5f457abb3993143ca750f9e..1daa6f782229134778db5a85e4845bc4cc17e6f7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/ExpressionToSQLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/ExpressionToSQLSuite.scala
@@ -253,6 +253,7 @@ class ExpressionToSQLSuite extends SQLBuilderTest with SQLTestUtils {
     checkSqlGeneration("SELECT count(now())")
     checkSqlGeneration("SELECT quarter('2001-05-02')")
     checkSqlGeneration("SELECT second('11:35:55')")
+    checkSqlGeneration("SELECT to_timestamp('2001-10-30 10:30:00', 'yyyy-MM-dd HH:mm:ss')")
     checkSqlGeneration("SELECT to_date('2001-10-30 10:30:00')")
     checkSqlGeneration("SELECT to_unix_timestamp('2015-07-24 00:00:00', 'yyyy-MM-dd HH:mm:ss')")
     checkSqlGeneration("SELECT to_utc_timestamp('2015-07-24 00:00:00', 'PST')")
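Note: with the FunctionRegistry entries registered, both functions are reachable from SQL as well. A final sketch of the user-visible behavior this patch adds, including null on parse failure (assuming a session `spark`):

spark.sql("SELECT to_date('2016-12-31', 'yyyy-MM-dd')").show()       // 2016-12-31
spark.sql("SELECT to_timestamp('2016-12-31', 'yyyy-MM-dd')").show()  // 2016-12-31 00:00:00
spark.sql("SELECT to_date('foo', 'yyyy-MM-dd')").show()              // null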