Skip to content
Snippets Groups Projects
Commit 1146c534 authored by Burak Yavuz's avatar Burak Yavuz Committed by Davies Liu
Browse files

[SPARK-14353] Dataset Time Window `window` API for R

## What changes were proposed in this pull request?

The `window` function was added to Dataset with [this PR](https://github.com/apache/spark/pull/12008).
This PR adds the R API for this function.

With this PR, SQL, Java, and Scala will share the same APIs as in users can use:
 - `window(timeColumn, windowDuration)`
 - `window(timeColumn, windowDuration, slideDuration)`
 - `window(timeColumn, windowDuration, slideDuration, startTime)`

In Python and R, users can access all APIs above, but in addition they can do
 - In R:
   `window(timeColumn, windowDuration, startTime=...)`

that is, they can provide the startTime without providing the `slideDuration`. In this case, we will generate tumbling windows.

## How was this patch tested?

Unit tests + manual tests

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #12141 from brkyvz/R-windows.
parent 48682f6b
No related branches found
No related tags found
No related merge requests found
......@@ -265,6 +265,7 @@ exportMethods("%in%",
"var_samp",
"weekofyear",
"when",
"window",
"year")
exportClasses("GroupedData")
......
......@@ -2131,6 +2131,69 @@ setMethod("from_unixtime", signature(x = "Column"),
column(jc)
})
#' window
#'
#' Bucketize rows into one or more time windows given a timestamp specifying column. Window
#' starts are inclusive but the window ends are exclusive, e.g. 12:05 will be in the window
#' [12:05,12:10) but not in [12:00,12:05). Windows can support microsecond precision. Windows in
#' the order of months are not supported.
#'
#' The time column must be of TimestampType.
#'
#' Durations are provided as strings, e.g. '1 second', '1 day 12 hours', '2 minutes'. Valid
#' interval strings are 'week', 'day', 'hour', 'minute', 'second', 'millisecond', 'microsecond'.
#' If the `slideDuration` is not provided, the windows will be tumbling windows.
#'
#' The startTime is the offset with respect to 1970-01-01 00:00:00 UTC with which to start
#' window intervals. For example, in order to have hourly tumbling windows that start 15 minutes
#' past the hour, e.g. 12:15-13:15, 13:15-14:15... provide `startTime` as `15 minutes`.
#'
#' The output column will be a struct called 'window' by default with the nested columns 'start'
#' and 'end'.
#'
#' @family datetime_funcs
#' @rdname window
#' @name window
#' @export
#' @examples
#'\dontrun{
#' # One minute windows every 15 seconds 10 seconds after the minute, e.g. 09:00:10-09:01:10,
#' # 09:00:25-09:01:25, 09:00:40-09:01:40, ...
#' window(df$time, "1 minute", "15 seconds", "10 seconds")
#'
#' # One minute tumbling windows 15 seconds after the minute, e.g. 09:00:15-09:01:15,
#' # 09:01:15-09:02:15...
#' window(df$time, "1 minute", startTime = "15 seconds")
#'
#' # Thirty second windows every 10 seconds, e.g. 09:00:00-09:00:30, 09:00:10-09:00:40, ...
#' window(df$time, "30 seconds", "10 seconds")
#'}
setMethod("window", signature(x = "Column"),
function(x, windowDuration, slideDuration = NULL, startTime = NULL) {
stopifnot(is.character(windowDuration))
if (!is.null(slideDuration) && !is.null(startTime)) {
stopifnot(is.character(slideDuration) && is.character(startTime))
jc <- callJStatic("org.apache.spark.sql.functions",
"window",
x@jc, windowDuration, slideDuration, startTime)
} else if (!is.null(slideDuration)) {
stopifnot(is.character(slideDuration))
jc <- callJStatic("org.apache.spark.sql.functions",
"window",
x@jc, windowDuration, slideDuration)
} else if (!is.null(startTime)) {
stopifnot(is.character(startTime))
jc <- callJStatic("org.apache.spark.sql.functions",
"window",
x@jc, windowDuration, windowDuration, startTime)
} else {
jc <- callJStatic("org.apache.spark.sql.functions",
"window",
x@jc, windowDuration)
}
column(jc)
})
#' locate
#'
#' Locate the position of the first occurrence of substr.
......
......@@ -1152,6 +1152,10 @@ setGeneric("var_samp", function(x) { standardGeneric("var_samp") })
#' @export
setGeneric("weekofyear", function(x) { standardGeneric("weekofyear") })
#' @rdname window
#' @export
setGeneric("window", function(x, ...) { standardGeneric("window") })
#' @rdname year
#' @export
setGeneric("year", function(x) { standardGeneric("year") })
......
......@@ -26,7 +26,7 @@ test_that("Check masked functions", {
maskedBySparkR <- masked[funcSparkROrEmpty]
namesOfMasked <- c("describe", "cov", "filter", "lag", "na.omit", "predict", "sd", "var",
"colnames", "colnames<-", "intersect", "rank", "rbind", "sample", "subset",
"summary", "transform", "drop")
"summary", "transform", "drop", "window")
expect_equal(length(maskedBySparkR), length(namesOfMasked))
expect_equal(sort(maskedBySparkR), sort(namesOfMasked))
# above are those reported as masked when `library(SparkR)`
......
......@@ -1204,6 +1204,42 @@ test_that("greatest() and least() on a DataFrame", {
expect_equal(collect(select(df, least(df$a, df$b)))[, 1], c(1, 3))
})
test_that("time windowing (window()) with all inputs", {
df <- createDataFrame(sqlContext, data.frame(t = c("2016-03-11 09:00:07"), v = c(1)))
df$window <- window(df$t, "5 seconds", "5 seconds", "0 seconds")
local <- collect(df)$v
# Not checking time windows because of possible time zone issues. Just checking that the function
# works
expect_equal(local, c(1))
})
test_that("time windowing (window()) with slide duration", {
df <- createDataFrame(sqlContext, data.frame(t = c("2016-03-11 09:00:07"), v = c(1)))
df$window <- window(df$t, "5 seconds", "2 seconds")
local <- collect(df)$v
# Not checking time windows because of possible time zone issues. Just checking that the function
# works
expect_equal(local, c(1, 1))
})
test_that("time windowing (window()) with start time", {
df <- createDataFrame(sqlContext, data.frame(t = c("2016-03-11 09:00:07"), v = c(1)))
df$window <- window(df$t, "5 seconds", startTime = "2 seconds")
local <- collect(df)$v
# Not checking time windows because of possible time zone issues. Just checking that the function
# works
expect_equal(local, c(1))
})
test_that("time windowing (window()) with just window duration", {
df <- createDataFrame(sqlContext, data.frame(t = c("2016-03-11 09:00:07"), v = c(1)))
df$window <- window(df$t, "5 seconds")
local <- collect(df)$v
# Not checking time windows because of possible time zone issues. Just checking that the function
# works
expect_equal(local, c(1))
})
test_that("when(), otherwise() and ifelse() on a DataFrame", {
l <- list(list(a = 1, b = 2), list(a = 3, b = 4))
df <- createDataFrame(sqlContext, l)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment