diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index caa1c3b91bc7f06f2e1c54e1f9625c4e1aa3d4bb..7ff6e9a9d3c730435cf24a8683a48f264eef2803 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -95,6 +95,7 @@ exportMethods("arrange",
               "freqItems",
               "gapply",
               "gapplyCollect",
+              "getNumPartitions",
               "group_by",
               "groupBy",
               "head",
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 0a1012283ed02128e33eb2b778d596d0dc7cda2e..523343ea9f4f32d01bcfbe69247c6cc18335d71a 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -3428,3 +3428,26 @@ setMethod("randomSplit",
             }
             sapply(sdfs, dataFrame)
           })
+
+#' getNumPartitions
+#'
+#' Return the number of partitions
+#'
+#' @param x A SparkDataFrame
+#' @family SparkDataFrame functions
+#' @aliases getNumPartitions,SparkDataFrame-method
+#' @rdname getNumPartitions
+#' @name getNumPartitions
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' df <- createDataFrame(cars, numPartitions = 2)
+#' getNumPartitions(df)
+#' }
+#' @note getNumPartitions since 2.1.1
+setMethod("getNumPartitions",
+          signature(x = "SparkDataFrame"),
+          function(x) {
+            callJMethod(callJMethod(x@sdf, "rdd"), "getNumPartitions")
+          })
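The SparkDataFrame method above is the user-facing half of this change. A minimal usage sketch, adapted from the roxygen example in the patch (assumes a local Spark session; `cars` is the 50-row base R dataset):

```r
library(SparkR)
sparkR.session()                                # start a local Spark session

df <- createDataFrame(cars, numPartitions = 2)  # request 2 partitions
getNumPartitions(df)                            # 2

sparkR.session.stop()
```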
diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R
index 0f1162fec1df9c88680c32028a17c286a6b2c1f6..91bab332c28640999190b6b1f22a02d65ecdabab 100644
--- a/R/pkg/R/RDD.R
+++ b/R/pkg/R/RDD.R
@@ -313,7 +313,7 @@ setMethod("checkpoint",
 #' @rdname getNumPartitions
 #' @aliases getNumPartitions,RDD-method
 #' @noRd
-setMethod("getNumPartitions",
+setMethod("getNumPartitionsRDD",
           signature(x = "RDD"),
           function(x) {
             callJMethod(getJRDD(x), "getNumPartitions")
@@ -329,7 +329,7 @@ setMethod("numPartitions",
           signature(x = "RDD"),
           function(x) {
             .Deprecated("getNumPartitions")
-            getNumPartitions(x)
+            getNumPartitionsRDD(x)
           })
 
 #' Collect elements of an RDD
@@ -460,7 +460,7 @@ setMethod("countByValue",
           signature(x = "RDD"),
           function(x) {
             ones <- lapply(x, function(item) { list(item, 1L) })
-            collectRDD(reduceByKey(ones, `+`, getNumPartitions(x)))
+            collectRDD(reduceByKey(ones, `+`, getNumPartitionsRDD(x)))
           })
 
 #' Apply a function to all elements
@@ -780,7 +780,7 @@ setMethod("takeRDD",
             resList <- list()
             index <- -1
             jrdd <- getJRDD(x)
-            numPartitions <- getNumPartitions(x)
+            numPartitions <- getNumPartitionsRDD(x)
             serializedModeRDD <- getSerializedMode(x)
 
             # TODO(shivaram): Collect more than one partition based on size
@@ -846,7 +846,7 @@ setMethod("firstRDD",
 #' @noRd
 setMethod("distinctRDD",
           signature(x = "RDD"),
-          function(x, numPartitions = SparkR:::getNumPartitions(x)) {
+          function(x, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
             identical.mapped <- lapply(x, function(x) { list(x, NULL) })
             reduced <- reduceByKey(identical.mapped,
                                    function(x, y) { x },
@@ -1053,7 +1053,7 @@ setMethod("coalesce",
           signature(x = "RDD", numPartitions = "numeric"),
           function(x, numPartitions, shuffle = FALSE) {
             numPartitions <- numToInt(numPartitions)
-            if (shuffle || numPartitions > SparkR:::getNumPartitions(x)) {
+            if (shuffle || numPartitions > SparkR:::getNumPartitionsRDD(x)) {
               func <- function(partIndex, part) {
                 set.seed(partIndex)  # partIndex as seed
                 start <- as.integer(base::sample(numPartitions, 1) - 1)
@@ -1143,7 +1143,7 @@ setMethod("saveAsTextFile",
 #' @noRd
 setMethod("sortBy",
           signature(x = "RDD", func = "function"),
-          function(x, func, ascending = TRUE, numPartitions = SparkR:::getNumPartitions(x)) {
+          function(x, func, ascending = TRUE, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
             values(sortByKey(keyBy(x, func), ascending, numPartitions))
           })
 
@@ -1175,7 +1175,7 @@ takeOrderedElem <- function(x, num, ascending = TRUE) {
   resList <- list()
   index <- -1
   jrdd <- getJRDD(newRdd)
-  numPartitions <- getNumPartitions(newRdd)
+  numPartitions <- getNumPartitionsRDD(newRdd)
   serializedModeRDD <- getSerializedMode(newRdd)
 
   while (TRUE) {
@@ -1407,7 +1407,7 @@ setMethod("setName",
 setMethod("zipWithUniqueId",
           signature(x = "RDD"),
           function(x) {
-            n <- getNumPartitions(x)
+            n <- getNumPartitionsRDD(x)
 
             partitionFunc <- function(partIndex, part) {
               mapply(
@@ -1450,7 +1450,7 @@ setMethod("zipWithUniqueId",
 setMethod("zipWithIndex",
           signature(x = "RDD"),
           function(x) {
-            n <- getNumPartitions(x)
+            n <- getNumPartitionsRDD(x)
             if (n > 1) {
               nums <- collectRDD(lapplyPartition(x,
                                                  function(part) {
@@ -1566,8 +1566,8 @@ setMethod("unionRDD",
 setMethod("zipRDD",
           signature(x = "RDD", other = "RDD"),
           function(x, other) {
-            n1 <- getNumPartitions(x)
-            n2 <- getNumPartitions(other)
+            n1 <- getNumPartitionsRDD(x)
+            n2 <- getNumPartitionsRDD(other)
             if (n1 != n2) {
               stop("Can only zip RDDs which have the same number of partitions.")
             }
@@ -1637,7 +1637,7 @@ setMethod("cartesian",
 #' @noRd
 setMethod("subtract",
           signature(x = "RDD", other = "RDD"),
-          function(x, other, numPartitions = SparkR:::getNumPartitions(x)) {
+          function(x, other, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
             mapFunction <- function(e) { list(e, NA) }
             rdd1 <- map(x, mapFunction)
             rdd2 <- map(other, mapFunction)
@@ -1671,7 +1671,7 @@ setMethod("subtract",
 #' @noRd
 setMethod("intersection",
           signature(x = "RDD", other = "RDD"),
-          function(x, other, numPartitions = SparkR:::getNumPartitions(x)) {
+          function(x, other, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
             rdd1 <- map(x, function(v) { list(v, NA) })
             rdd2 <- map(other, function(v) { list(v, NA) })
 
@@ -1714,7 +1714,7 @@ setMethod("zipPartitions",
             if (length(rrdds) == 1) {
               return(rrdds[[1]])
             }
-            nPart <- sapply(rrdds, getNumPartitions)
+            nPart <- sapply(rrdds, getNumPartitionsRDD)
             if (length(unique(nPart)) != 1) {
               stop("Can only zipPartitions RDDs which have the same number of partitions.")
             }
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 433c16640cf1bf461e5594cc099966fcda31fc46..0307bac349ec13420d8858d063118337f45b3f65 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -138,9 +138,9 @@ setGeneric("sumRDD", function(x) { standardGeneric("sumRDD") })
 # @export
 setGeneric("name", function(x) { standardGeneric("name") })
 
-# @rdname getNumPartitions
+# @rdname getNumPartitionsRDD
 # @export
-setGeneric("getNumPartitions", function(x) { standardGeneric("getNumPartitions") })
+setGeneric("getNumPartitionsRDD", function(x) { standardGeneric("getNumPartitionsRDD") })
 
 # @rdname getNumPartitions
 # @export
@@ -492,6 +492,10 @@ setGeneric("gapply", function(x, ...) { standardGeneric("gapply") })
 #' @export
 setGeneric("gapplyCollect", function(x, ...) { standardGeneric("gapplyCollect") })
 
+#' @rdname getNumPartitions
+#' @export
+setGeneric("getNumPartitions", function(x) { standardGeneric("getNumPartitions") })
+
 #' @rdname summary
 #' @export
 setGeneric("describe", function(x, col, ...) { standardGeneric("describe") })
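With the rename, SparkR keeps two separate generics: the exported `getNumPartitions` for SparkDataFrame and the internal `getNumPartitionsRDD` for the private RDD API, matching the existing `collectRDD`/`takeRDD` naming convention. A sketch of the resulting call paths; `sc` is assumed to be a SparkContext handle obtained the way the RDD tests obtain one, and the internal names all require `:::`:

```r
# Build a 2-partition RDD through the internal API (assumed handle `sc`)
rdd <- SparkR:::parallelize(sc, 1:100, 2L)

# Deprecated spelling: warns, then delegates to the renamed method
SparkR:::numPartitions(rdd)        # 2, with a deprecation warning

# Renamed internal method now used throughout RDD.R and pairRDD.R
SparkR:::getNumPartitionsRDD(rdd)  # 2
```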
{ standardGeneric("describe") }) diff --git a/R/pkg/R/pairRDD.R b/R/pkg/R/pairRDD.R index 4dee3245f9b7513b4fdb74dc050032543864e851..8fa21be3076b52cd203ff5e0c806bb6293de6390 100644 --- a/R/pkg/R/pairRDD.R +++ b/R/pkg/R/pairRDD.R @@ -780,7 +780,7 @@ setMethod("cogroup", #' @noRd setMethod("sortByKey", signature(x = "RDD"), - function(x, ascending = TRUE, numPartitions = SparkR:::getNumPartitions(x)) { + function(x, ascending = TRUE, numPartitions = SparkR:::getNumPartitionsRDD(x)) { rangeBounds <- list() if (numPartitions > 1) { @@ -850,7 +850,7 @@ setMethod("sortByKey", #' @noRd setMethod("subtractByKey", signature(x = "RDD", other = "RDD"), - function(x, other, numPartitions = SparkR:::getNumPartitions(x)) { + function(x, other, numPartitions = SparkR:::getNumPartitionsRDD(x)) { filterFunction <- function(elem) { iters <- elem[[2]] (length(iters[[1]]) > 0) && (length(iters[[2]]) == 0) diff --git a/R/pkg/inst/tests/testthat/test_rdd.R b/R/pkg/inst/tests/testthat/test_rdd.R index 2c41a6b075b47821c299820a12231845ad63a9e4..ceb31bd896133d7e20b4c1041ed3fe911bdc81ac 100644 --- a/R/pkg/inst/tests/testthat/test_rdd.R +++ b/R/pkg/inst/tests/testthat/test_rdd.R @@ -29,8 +29,8 @@ intPairs <- list(list(1L, -1), list(2L, 100), list(2L, 1), list(1L, 200)) intRdd <- parallelize(sc, intPairs, 2L) test_that("get number of partitions in RDD", { - expect_equal(getNumPartitions(rdd), 2) - expect_equal(getNumPartitions(intRdd), 2) + expect_equal(getNumPartitionsRDD(rdd), 2) + expect_equal(getNumPartitionsRDD(intRdd), 2) }) test_that("first on RDD", { @@ -305,18 +305,18 @@ test_that("repartition/coalesce on RDDs", { # repartition r1 <- repartitionRDD(rdd, 2) - expect_equal(getNumPartitions(r1), 2L) + expect_equal(getNumPartitionsRDD(r1), 2L) count <- length(collectPartition(r1, 0L)) expect_true(count >= 8 && count <= 12) r2 <- repartitionRDD(rdd, 6) - expect_equal(getNumPartitions(r2), 6L) + expect_equal(getNumPartitionsRDD(r2), 6L) count <- length(collectPartition(r2, 0L)) expect_true(count >= 0 && count <= 4) # coalesce r3 <- coalesce(rdd, 1) - expect_equal(getNumPartitions(r3), 1L) + expect_equal(getNumPartitionsRDD(r3), 1L) count <- length(collectPartition(r3, 0L)) expect_equal(count, 20) }) diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index aaa8fb498c85ac4de163ccaca4d5f33810e5c29b..417a03ff6182763a50648f1ec03c597492eb917f 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -196,18 +196,18 @@ test_that("create DataFrame from RDD", { expect_equal(dtypes(df), list(c("name", "string"), c("age", "int"), c("height", "float"))) expect_equal(as.list(collect(where(df, df$name == "John"))), list(name = "John", age = 19L, height = 176.5)) - expect_equal(getNumPartitions(toRDD(df)), 1) + expect_equal(getNumPartitions(df), 1) df <- as.DataFrame(cars, numPartitions = 2) - expect_equal(getNumPartitions(toRDD(df)), 2) + expect_equal(getNumPartitions(df), 2) df <- createDataFrame(cars, numPartitions = 3) - expect_equal(getNumPartitions(toRDD(df)), 3) + expect_equal(getNumPartitions(df), 3) # validate limit by num of rows df <- createDataFrame(cars, numPartitions = 60) - expect_equal(getNumPartitions(toRDD(df)), 50) + expect_equal(getNumPartitions(df), 50) # validate when 1 < (length(coll) / numSlices) << length(coll) df <- createDataFrame(cars, numPartitions = 20) - expect_equal(getNumPartitions(toRDD(df)), 20) + expect_equal(getNumPartitions(df), 20) df <- as.DataFrame(data.frame(0)) expect_is(df, "SparkDataFrame") 
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index aaa8fb498c85ac4de163ccaca4d5f33810e5c29b..417a03ff6182763a50648f1ec03c597492eb917f 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -196,18 +196,18 @@ test_that("create DataFrame from RDD", {
   expect_equal(dtypes(df), list(c("name", "string"), c("age", "int"), c("height", "float")))
   expect_equal(as.list(collect(where(df, df$name == "John"))),
                list(name = "John", age = 19L, height = 176.5))
-  expect_equal(getNumPartitions(toRDD(df)), 1)
+  expect_equal(getNumPartitions(df), 1)
 
   df <- as.DataFrame(cars, numPartitions = 2)
-  expect_equal(getNumPartitions(toRDD(df)), 2)
+  expect_equal(getNumPartitions(df), 2)
   df <- createDataFrame(cars, numPartitions = 3)
-  expect_equal(getNumPartitions(toRDD(df)), 3)
+  expect_equal(getNumPartitions(df), 3)
   # validate limit by num of rows
   df <- createDataFrame(cars, numPartitions = 60)
-  expect_equal(getNumPartitions(toRDD(df)), 50)
+  expect_equal(getNumPartitions(df), 50)
   # validate when 1 < (length(coll) / numSlices) << length(coll)
   df <- createDataFrame(cars, numPartitions = 20)
-  expect_equal(getNumPartitions(toRDD(df)), 20)
+  expect_equal(getNumPartitions(df), 20)
 
   df <- as.DataFrame(data.frame(0))
   expect_is(df, "SparkDataFrame")
@@ -215,7 +215,7 @@ test_that("create DataFrame from RDD", {
   expect_is(df, "SparkDataFrame")
   df <- as.DataFrame(data.frame(0), numPartitions = 2)
   # no data to partition, goes to 1
-  expect_equal(getNumPartitions(toRDD(df)), 1)
+  expect_equal(getNumPartitions(df), 1)
 
   setHiveContext(sc)
   sql("CREATE TABLE people (name string, age double, height float)")
@@ -234,7 +234,7 @@ test_that("createDataFrame uses files for large objects", {
   conf <- callJMethod(sparkSession, "conf")
   callJMethod(conf, "set", "spark.r.maxAllocationLimit", "100")
   df <- suppressWarnings(createDataFrame(iris, numPartitions = 3))
-  expect_equal(getNumPartitions(toRDD(df)), 3)
+  expect_equal(getNumPartitions(df), 3)
 
   # Resetting the conf back to default value
   callJMethod(conf, "set", "spark.r.maxAllocationLimit", toString(.Machine$integer.max / 10))
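The SQL tests no longer hop through `toRDD(df)`: the exported method asks the SparkDataFrame's JVM-side `rdd()` for its partition count directly. A rough before/after comparison, assuming a live session (`toRDD` is internal API and is shown here only for contrast):

```r
df <- createDataFrame(cars, numPartitions = 3)

# Before this patch: count partitions via the internal R RDD wrapper
SparkR:::getNumPartitionsRDD(SparkR:::toRDD(df))   # 3

# After this patch: one exported call, no RDD round trip
getNumPartitions(df)                               # 3
```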