diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION
index 7179438efc1d9a45ba851cc2b70e7b182f3c90cf..963a1bb5806a734a3f4e1b966f3e71df080bf4a8 100644
--- a/R/pkg/DESCRIPTION
+++ b/R/pkg/DESCRIPTION
@@ -26,6 +26,7 @@ Collate:
     'pairRDD.R'
     'DataFrame.R'
     'SQLContext.R'
+    'WindowSpec.R'
     'backend.R'
     'broadcast.R'
     'client.R'
@@ -38,4 +39,5 @@ Collate:
     'stats.R'
     'types.R'
     'utils.R'
+    'window.R'
 RoxygenNote: 5.0.1
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 73f7c595f443715f9d47b8f1c07de77b4286e53c..1432ab8a9d1ce3567627ed348c881ae4e1ba305d 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -216,6 +216,7 @@ exportMethods("%in%",
               "next_day",
               "ntile",
               "otherwise",
+              "over",
               "percent_rank",
               "pmod",
               "quarter",
@@ -315,3 +316,12 @@ export("structField",
        "structType.jobj",
        "structType.structField",
        "print.structType")
+
+exportClasses("WindowSpec")
+
+export("partitionBy",
+       "rowsBetween",
+       "rangeBetween")
+
+export("window.partitionBy",
+       "window.orderBy")
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index fcf473ac7b76e613f161e2cc2ac3833c6f6bb9df..43c46b847446b17be2119b351c512e6990c1b863 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -1749,8 +1749,8 @@ setMethod("arrange",
 #' @export
 setMethod("orderBy",
           signature(x = "SparkDataFrame", col = "characterOrColumn"),
-          function(x, col) {
-            arrange(x, col)
+          function(x, col, ...) {
+            arrange(x, col, ...)
           })
 
 #' Filter
diff --git a/R/pkg/R/WindowSpec.R b/R/pkg/R/WindowSpec.R
new file mode 100644
index 0000000000000000000000000000000000000000..581176a6c0918f34ed529bd87ea050725c8f1b4e
--- /dev/null
+++ b/R/pkg/R/WindowSpec.R
@@ -0,0 +1,188 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# WindowSpec.R - WindowSpec class and methods implemented in S4 OO classes
+
+#' @include generics.R jobj.R column.R
+NULL
+
+#' @title S4 class that represents a WindowSpec
+#' @description WindowSpec can be created by using window.partitionBy()
+#'              or window.orderBy()
+#' @rdname WindowSpec
+#' @seealso \link{window.partitionBy}, \link{window.orderBy}
+#'
+#' @param sws A Java object reference to the backing Scala WindowSpec
+#' @export
+setClass("WindowSpec",
+         slots = list(sws = "jobj"))
+
+setMethod("initialize", "WindowSpec", function(.Object, sws) {
+  .Object@sws <- sws
+  .Object
+})
+
+windowSpec <- function(sws) {
+  stopifnot(class(sws) == "jobj")
+  new("WindowSpec", sws)
+}
+
+#' @rdname show
+setMethod("show", "WindowSpec",
+          function(object) {
+            cat("WindowSpec", callJMethod(object@sws, "toString"), "\n")
+          })
+
+#' partitionBy
+#'
+#' Defines the partitioning columns in a WindowSpec.
+#' @param col,... partitioning columns, given either as column names (character) or as Columns
+#' @param x a WindowSpec
+#' @return a WindowSpec
+#' @rdname partitionBy
+#' @name partitionBy
+#' @family windowspec_method
+#' @export
+#' @examples
+#' \dontrun{
+#'   partitionBy(ws, "col1", "col2")
+#'   partitionBy(ws, df$col1, df$col2)
+#' }
+setMethod("partitionBy",
+          signature(x = "WindowSpec"),
+          function(x, col, ...) {
+            stopifnot (class(col) %in% c("character", "Column"))
+
+            if (class(col) == "character") {
+              windowSpec(callJMethod(x@sws, "partitionBy", col, list(...)))
+            } else {
+              jcols <- lapply(list(col, ...), function(c) {
+                c@jc
+              })
+              windowSpec(callJMethod(x@sws, "partitionBy", jcols))
+            }
+          })
+
+#' orderBy
+#'
+#' Defines the ordering columns in a WindowSpec.
+#'
+#' @param x a WindowSpec
+#' @return a WindowSpec
+#' @rdname arrange
+#' @name orderBy
+#' @family windowspec_method
+#' @export
+#' @examples
+#' \dontrun{
+#'   orderBy(ws, "col1", "col2")
+#'   orderBy(ws, df$col1, df$col2)
+#' }
+setMethod("orderBy",
+          signature(x = "WindowSpec", col = "character"),
+          function(x, col, ...) {
+            windowSpec(callJMethod(x@sws, "orderBy", col, list(...)))
+          })
+
+#' @rdname arrange
+#' @name orderBy
+#' @export
+setMethod("orderBy",
+          signature(x = "WindowSpec", col = "Column"),
+          function(x, col, ...) {
+            jcols <- lapply(list(col, ...), function(c) {
+              c@jc
+            })
+            windowSpec(callJMethod(x@sws, "orderBy", jcols))
+          })
+
+#' rowsBetween
+#'
+#' Defines the frame boundaries, from \code{start} (inclusive) to \code{end} (inclusive).
+#'
+#' Both \code{start} and \code{end} are relative positions from the current row. For example,
+#' "0" means "current row", while "-1" means the row before the current row, and "5" means the
+#' fifth row after the current row.
+#'
+#' @param x a WindowSpec
+#' @param start boundary start, inclusive.
+#'              The frame is unbounded if this is the minimum long value.
+#' @param end boundary end, inclusive.
+#'            The frame is unbounded if this is the maximum long value.
+#' @return a WindowSpec
+#' @rdname rowsBetween
+#' @name rowsBetween
+#' @family windowspec_method
+#' @export
+#' @examples
+#' \dontrun{
+#'   rowsBetween(ws, 0, 3)
+#' }
+setMethod("rowsBetween",
+          signature(x = "WindowSpec", start = "numeric", end = "numeric"),
+          function(x, start, end) {
+            # "start" and "end" should be longs, but due to a serde limitation
+            # they are coerced to integers for now
+            windowSpec(callJMethod(x@sws, "rowsBetween", as.integer(start), as.integer(end)))
+          })
+
+#' rangeBetween
+#'
+#' Defines the frame boundaries, from \code{start} (inclusive) to \code{end} (inclusive).
+#'
+#' Both \code{start} and \code{end} are relative from the current row. For example, "0" means
+#' "current row", while "-1" means one off before the current row, and "5" means five off
+#' after the current row.
+#'
+#' @param x a WindowSpec
+#' @param start boundary start, inclusive.
+#'              The frame is unbounded if this is the minimum long value.
+#' @param end boundary end, inclusive.
+#'            The frame is unbounded if this is the maximum long value.
+#' @return a WindowSpec
+#' @rdname rangeBetween
+#' @name rangeBetween
+#' @family windowspec_method
+#' @export
+#' @examples
+#' \dontrun{
+#'   rangeBetween(ws, 0, 3)
+#' }
+setMethod("rangeBetween",
+          signature(x = "WindowSpec", start = "numeric", end = "numeric"),
+          function(x, start, end) {
+            # "start" and "end" should be longs, but due to a serde limitation
+            # they are coerced to integers for now
+            windowSpec(callJMethod(x@sws, "rangeBetween", as.integer(start), as.integer(end)))
+          })
+
+# Note that over is a method of Column class, but it is placed here to
+# avoid Roxygen circular-dependency between class Column and WindowSpec.
+
+#' over
+#'
+#' Define a windowing column.
+#' @param x,window a Column and a WindowSpec; \code{x} is applied over \code{window}
+#' @rdname over
+#' @name over
+#' @family colum_func
+#' @export
+setMethod("over",
+          signature(x = "Column", window = "WindowSpec"),
+          function(x, window) {
+            column(callJMethod(x@jc, "over", window@sws))
+          })
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 3db1ac07666b30d2a3d2a19104442e4e388de98c..8563be1e649838379d4c56142bb26aa9517675f2 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -339,9 +339,9 @@ setGeneric("join", function(x, y, ...) { standardGeneric("join") })
 # @export
 setGeneric("leftOuterJoin", function(x, y, numPartitions) { standardGeneric("leftOuterJoin") })
 
-# @rdname partitionBy
-# @export
-setGeneric("partitionBy", function(x, numPartitions, ...) { standardGeneric("partitionBy") })
+#' @rdname partitionBy
+#' @export
+setGeneric("partitionBy", function(x, ...) { standardGeneric("partitionBy") })
 
 # @rdname reduceByKey
 # @seealso groupByKey
@@ -533,7 +533,7 @@ setGeneric("mutate", function(.data, ...) {standardGeneric("mutate") })
 
 #' @rdname arrange
 #' @export
-setGeneric("orderBy", function(x, col) { standardGeneric("orderBy") })
+setGeneric("orderBy", function(x, col, ...) { standardGeneric("orderBy") })
 
 #' @rdname schema
 #' @export
@@ -733,6 +733,27 @@ setGeneric("when", function(condition, value) { standardGeneric("when") })
 #' @export
 setGeneric("otherwise", function(x, value) { standardGeneric("otherwise") })
 
+#' @rdname over
+#' @export
+setGeneric("over", function(x, window) { standardGeneric("over") })
+
+###################### WindowSpec Methods ##########################
+
+#' @rdname rowsBetween
+#' @export
+setGeneric("rowsBetween", function(x, start, end) { standardGeneric("rowsBetween") })
+
+#' @rdname rangeBetween
+#' @export
+setGeneric("rangeBetween", function(x, start, end) { standardGeneric("rangeBetween") })
+
+#' @rdname window.partitionBy
+#' @export
+setGeneric("window.partitionBy", function(col, ...) { standardGeneric("window.partitionBy") })
+
+#' @rdname window.orderBy
+#' @export
+setGeneric("window.orderBy", function(col, ...) { standardGeneric("window.orderBy") })
 
 ###################### Expression Function Methods ##########################
 
diff --git a/R/pkg/R/pairRDD.R b/R/pkg/R/pairRDD.R
index 4075ef4377acf90b850ff792a711ecf8fa426825..d39775cabef8860ef4f5366e8bcf3fdce847cad1 100644
--- a/R/pkg/R/pairRDD.R
+++ b/R/pkg/R/pairRDD.R
@@ -205,8 +205,10 @@ setMethod("flatMapValues",
 #' @aliases partitionBy,RDD,integer-method
 #' @noRd
 setMethod("partitionBy",
-          signature(x = "RDD", numPartitions = "numeric"),
+          signature(x = "RDD"),
           function(x, numPartitions, partitionFunc = hashCode) {
+            stopifnot(is.numeric(numPartitions))
+
             partitionFunc <- cleanClosure(partitionFunc)
             serializedHashFuncBytes <- serialize(partitionFunc, connection = NULL)
 
diff --git a/R/pkg/R/window.R b/R/pkg/R/window.R
new file mode 100644
index 0000000000000000000000000000000000000000..7ecf70abc6b50fd0af424222c7172008a304792d
--- /dev/null
+++ b/R/pkg/R/window.R
@@ -0,0 +1,98 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# window.R - Utility functions for defining window in DataFrames
+
+#' window.partitionBy
+#'
+#' Creates a WindowSpec with the partitioning defined.
+#' @param col,... partitioning columns, given either as column names (character) or as Columns
+#' @rdname window.partitionBy
+#' @name window.partitionBy
+#' @export
+#' @examples
+#' \dontrun{
+#'   ws <- window.partitionBy("key1", "key2")
+#'   df1 <- select(df, over(lead("value", 1), ws))
+#'
+#'   ws <- window.partitionBy(df$key1, df$key2)
+#'   df1 <- select(df, over(lead("value", 1), ws))
+#' }
+setMethod("window.partitionBy",
+          signature(col = "character"),
+          function(col, ...) {
+            windowSpec(
+              callJStatic("org.apache.spark.sql.expressions.Window",
+                          "partitionBy",
+                          col,
+                          list(...)))
+          })
+
+#' @rdname window.partitionBy
+#' @name window.partitionBy
+#' @export
+setMethod("window.partitionBy",
+          signature(col = "Column"),
+          function(col, ...) {
+            jcols <- lapply(list(col, ...), function(c) {
+              c@jc
+            })
+            windowSpec(
+              callJStatic("org.apache.spark.sql.expressions.Window",
+                          "partitionBy",
+                          jcols))
+          })
+
+#' window.orderBy
+#'
+#' Creates a WindowSpec with the ordering defined.
+#' @param col,... ordering columns, given either as column names (character) or as Columns
+#' @rdname window.orderBy
+#' @name window.orderBy
+#' @export
+#' @examples
+#' \dontrun{
+#'   ws <- window.orderBy("key1", "key2")
+#'   df1 <- select(df, over(lead("value", 1), ws))
+#'
+#'   ws <- window.orderBy(df$key1, df$key2)
+#'   df1 <- select(df, over(lead("value", 1), ws))
+#' }
+setMethod("window.orderBy",
+          signature(col = "character"),
+          function(col, ...) {
+            windowSpec(
+              callJStatic("org.apache.spark.sql.expressions.Window",
+                          "orderBy",
+                          col,
+                          list(...)))
+          })
+
+#' @rdname window.orderBy
+#' @name window.orderBy
+#' @export
+setMethod("window.orderBy",
+          signature(col = "Column"),
+          function(col, ...) {
+            jcols <- lapply(list(col, ...), function(c) {
+              c@jc
+            })
+            windowSpec(
+              callJStatic("org.apache.spark.sql.expressions.Window",
+                          "orderBy",
+                          jcols))
+          })
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 3b6a27c3b86a107d9cd203bcec2a769dae498588..0f67bc2e331d1e7f73d840b957badb9f2518496b 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -2118,6 +2118,42 @@ test_that("repartition by columns on DataFrame", {
   expect_equal(nrow(df1), 2)
 })
 
+test_that("Window functions on a DataFrame", {
+  ssc <- callJMethod(sc, "sc")
+  hiveCtx <- tryCatch({
+    newJObject("org.apache.spark.sql.hive.test.TestHiveContext", ssc)
+  },
+  error = function(err) {
+    skip("Hive is not build with SparkSQL, skipped")
+  })
+
+  df <- createDataFrame(hiveCtx,
+                        list(list(1L, "1"), list(2L, "2"), list(1L, "1"), list(2L, "2")),
+                        schema = c("key", "value"))
+  ws <- orderBy(window.partitionBy("key"), "value")
+  result <- collect(select(df, over(lead("key", 1), ws), over(lead("value", 1), ws)))
+  names(result) <- c("key", "value")
+  expected <- data.frame(key = c(1L, NA, 2L, NA),
+                         value = c("1", NA, "2", NA),
+                         stringsAsFactors = FALSE)
+  expect_equal(result, expected)
+
+  ws <- orderBy(window.partitionBy(df$key), df$value)
+  result <- collect(select(df, over(lead("key", 1), ws), over(lead("value", 1), ws)))
+  names(result) <- c("key", "value")
+  expect_equal(result, expected)
+
+  ws <- partitionBy(window.orderBy("value"), "key")
+  result <- collect(select(df, over(lead("key", 1), ws), over(lead("value", 1), ws)))
+  names(result) <- c("key", "value")
+  expect_equal(result, expected)
+
+  ws <- partitionBy(window.orderBy(df$value), df$key)
+  result <- collect(select(df, over(lead("key", 1), ws), over(lead("value", 1), ws)))
+  names(result) <- c("key", "value")
+  expect_equal(result, expected)
+})
+
 unlink(parquetPath)
 unlink(jsonPath)
 unlink(jsonPathNa)