From b0e8eb6d3e9e80fa62625a5b9382d93af77250db Mon Sep 17 00:00:00 2001
From: Felix Cheung <felixcheung_m@hotmail.com>
Date: Fri, 13 Jan 2017 10:08:14 -0800
Subject: [PATCH] [SPARK-18335][SPARKR] createDataFrame to support numPartitions parameter

## What changes were proposed in this pull request?

To allow specifying the number of partitions when the DataFrame is created.
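
For example (an illustrative sketch, not part of the patch itself: it assumes a running SparkR session, and uses the non-exported `toRDD()`/`getNumPartitions()` helpers that the updated tests rely on to check partition counts; those internal names may differ across Spark versions):

```r
library(SparkR)
sparkR.session()

# Ask for two partitions when converting a local data.frame.
df <- createDataFrame(cars, numPartitions = 2)
SparkR:::getNumPartitions(SparkR:::toRDD(df))   # 2

# The requested count is capped by the number of rows (cars has 50 rows).
df <- as.DataFrame(cars, numPartitions = 60)
SparkR:::getNumPartitions(SparkR:::toRDD(df))   # 50

sparkR.session.stop()
```
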
## How was this patch tested?

manual, unit tests

Author: Felix Cheung <felixcheung_m@hotmail.com>

Closes #16512 from felixcheung/rnumpart.
---
 R/pkg/R/SQLContext.R                      | 20 ++++++++----
 R/pkg/R/context.R                         | 39 ++++++++++++++++++++---
 R/pkg/inst/tests/testthat/test_rdd.R      |  4 +--
 R/pkg/inst/tests/testthat/test_sparkSQL.R | 23 ++++++++++++-
 4 files changed, 72 insertions(+), 14 deletions(-)

diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index 6f48cd6639..e771a057e2 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -184,8 +184,11 @@ getDefaultSqlSource <- function() {
 #'
 #' Converts R data.frame or list into SparkDataFrame.
 #'
-#' @param data an RDD or list or data.frame.
+#' @param data a list or data.frame.
 #' @param schema a list of column names or named list (StructType), optional.
+#' @param samplingRatio Currently not used.
+#' @param numPartitions the number of partitions of the SparkDataFrame. Defaults to 1; this is
+#'        limited by the length of the list or the number of rows of the data.frame.
 #' @return A SparkDataFrame.
 #' @rdname createDataFrame
 #' @export
@@ -195,12 +198,14 @@ getDefaultSqlSource <- function() {
 #' df1 <- as.DataFrame(iris)
 #' df2 <- as.DataFrame(list(3,4,5,6))
 #' df3 <- createDataFrame(iris)
+#' df4 <- createDataFrame(cars, numPartitions = 2)
 #' }
 #' @name createDataFrame
 #' @method createDataFrame default
 #' @note createDataFrame since 1.4.0
 # TODO(davies): support sampling and infer type from NA
-createDataFrame.default <- function(data, schema = NULL, samplingRatio = 1.0) {
+createDataFrame.default <- function(data, schema = NULL, samplingRatio = 1.0,
+                                    numPartitions = NULL) {
   sparkSession <- getSparkSession()
 
   if (is.data.frame(data)) {
@@ -233,7 +238,11 @@ createDataFrame.default <- function(data, schema = NULL, samplingRatio = 1.0) {
 
   if (is.list(data)) {
     sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession)
-    rdd <- parallelize(sc, data)
+    if (!is.null(numPartitions)) {
+      rdd <- parallelize(sc, data, numSlices = numToInt(numPartitions))
+    } else {
+      rdd <- parallelize(sc, data, numSlices = 1)
+    }
   } else if (inherits(data, "RDD")) {
     rdd <- data
   } else {
@@ -283,14 +292,13 @@ createDataFrame <- function(x, ...) {
   dispatchFunc("createDataFrame(data, schema = NULL)", x, ...)
 }
 
-#' @param samplingRatio Currently not used.
 #' @rdname createDataFrame
 #' @aliases createDataFrame
 #' @export
 #' @method as.DataFrame default
 #' @note as.DataFrame since 1.6.0
-as.DataFrame.default <- function(data, schema = NULL, samplingRatio = 1.0) {
-  createDataFrame(data, schema)
+as.DataFrame.default <- function(data, schema = NULL, samplingRatio = 1.0, numPartitions = NULL) {
+  createDataFrame(data, schema, samplingRatio, numPartitions)
 }
 
 #' @param ... additional argument(s).
diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R
index 1138caf98e..1a0dd65f45 100644
--- a/R/pkg/R/context.R
+++ b/R/pkg/R/context.R
@@ -91,6 +91,16 @@ objectFile <- function(sc, path, minPartitions = NULL) {
 #' will write it to disk and send the file name to JVM. Also to make sure each slice is not
 #' larger than that limit, number of slices may be increased.
 #'
+#' In 2.2.0 we are changing how numSlices is used/computed to handle
+#' 1 < (length(coll) / numSlices) << length(coll) better, and to get the exact number of slices.
+#' This change affects both createDataFrame and spark.lapply.
+#' In the specific case where it is used to convert an R native object into a SparkDataFrame,
+#' numSlices has always been kept at the default of 1. In the case the object is large, we are
+#' explicitly setting the parallelism to numSlices (which is still 1).
+#'
+#' Specifically, we are changing the split positions to match the calculation in positions() of
+#' ParallelCollectionRDD in Spark.
+#'
 #' @param sc SparkContext to use
 #' @param coll collection to parallelize
 #' @param numSlices number of partitions to create in the RDD
@@ -107,6 +117,8 @@ parallelize <- function(sc, coll, numSlices = 1) {
   # TODO: bound/safeguard numSlices
   # TODO: unit tests for if the split works for all primitives
   # TODO: support matrix, data frame, etc
+
+  # Note: for data.frame, createDataFrame turns it into a list before it calls here.
   # nolint start
   # suppress lintr warning: Place a space before left parenthesis, except in a function call.
   if ((!is.list(coll) && !is.vector(coll)) || is.data.frame(coll)) {
@@ -128,12 +140,29 @@ parallelize <- function(sc, coll, numSlices = 1) {
   objectSize <- object.size(coll)
 
   # For large objects we make sure the size of each slice is also smaller than sizeLimit
-  numSlices <- max(numSlices, ceiling(objectSize / sizeLimit))
-  if (numSlices > length(coll))
-    numSlices <- length(coll)
+  numSerializedSlices <- max(numSlices, ceiling(objectSize / sizeLimit))
+  if (numSerializedSlices > length(coll))
+    numSerializedSlices <- length(coll)
+
+  # Generate the slice ids to put each row into
+  # For instance, for numSerializedSlices of 22, length of 50
+  #  [1]  0  0  2  2  4  4  6  6  6  9  9 11 11 13 13 15 15 15 18 18 20 20 22 22 22
+  # [26] 25 25 27 27 29 29 31 31 31 34 34 36 36 38 38 40 40 40 43 43 45 45 47 47 47
+  # Notice that the slices with 3 elements (i.e. 6, 15, 22) are roughly evenly spaced.
+  # We are trying to reimplement the calculation in the positions method in ParallelCollectionRDD
+  splits <- if (numSerializedSlices > 0) {
+    unlist(lapply(0: (numSerializedSlices - 1), function(x) {
+      # nolint start
+      start <- trunc((x * length(coll)) / numSerializedSlices)
+      end <- trunc(((x + 1) * length(coll)) / numSerializedSlices)
+      # nolint end
+      rep(start, end - start)
+    }))
+  } else {
+    1
+  }
 
-  sliceLen <- ceiling(length(coll) / numSlices)
-  slices <- split(coll, rep(1: (numSlices + 1), each = sliceLen)[1:length(coll)])
+  slices <- split(coll, splits)
 
   # Serialize each slice: obtain a list of raws, or a list of lists (slices) of
   # 2-tuples of raws
diff --git a/R/pkg/inst/tests/testthat/test_rdd.R b/R/pkg/inst/tests/testthat/test_rdd.R
index a3d66c245a..2c41a6b075 100644
--- a/R/pkg/inst/tests/testthat/test_rdd.R
+++ b/R/pkg/inst/tests/testthat/test_rdd.R
@@ -381,8 +381,8 @@ test_that("aggregateRDD() on RDDs", {
 test_that("zipWithUniqueId() on RDDs", {
   rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L)
   actual <- collectRDD(zipWithUniqueId(rdd))
-  expected <- list(list("a", 0), list("b", 3), list("c", 1),
-                   list("d", 4), list("e", 2))
+  expected <- list(list("a", 0), list("b", 1), list("c", 4),
+                   list("d", 2), list("e", 5))
   expect_equal(actual, expected)
 
   rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 1L)
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 3e8b96a513..26017427ab 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -196,6 +196,26 @@ test_that("create DataFrame from RDD", {
   expect_equal(dtypes(df), list(c("name", "string"), c("age", "int"), c("height", "float")))
   expect_equal(as.list(collect(where(df, df$name == "John"))),
                list(name = "John", age = 19L, height = 176.5))
+  expect_equal(getNumPartitions(toRDD(df)), 1)
+
+  df <- as.DataFrame(cars, numPartitions = 2)
+  expect_equal(getNumPartitions(toRDD(df)), 2)
+  df <- createDataFrame(cars, numPartitions = 3)
+  expect_equal(getNumPartitions(toRDD(df)), 3)
+  # validate limit by num of rows
+  df <- createDataFrame(cars, numPartitions = 60)
+  expect_equal(getNumPartitions(toRDD(df)), 50)
+  # validate when 1 < (length(coll) / numSlices) << length(coll)
+  df <- createDataFrame(cars, numPartitions = 20)
+  expect_equal(getNumPartitions(toRDD(df)), 20)
+
+  df <- as.DataFrame(data.frame(0))
+  expect_is(df, "SparkDataFrame")
+  df <- createDataFrame(list(list(1)))
+  expect_is(df, "SparkDataFrame")
+  df <- as.DataFrame(data.frame(0), numPartitions = 2)
+  # no data to partition, goes to 1
+  expect_equal(getNumPartitions(toRDD(df)), 1)
 
   setHiveContext(sc)
   sql("CREATE TABLE people (name string, age double, height float)")
@@ -213,7 +233,8 @@ test_that("createDataFrame uses files for large objects", {
   # To simulate a large file scenario, we set spark.r.maxAllocationLimit to a smaller value
   conf <- callJMethod(sparkSession, "conf")
   callJMethod(conf, "set", "spark.r.maxAllocationLimit", "100")
-  df <- suppressWarnings(createDataFrame(iris))
+  df <- suppressWarnings(createDataFrame(iris, numPartitions = 3))
+  expect_equal(getNumPartitions(toRDD(df)), 3)
 
   # Resetting the conf back to default value
   callJMethod(conf, "set", "spark.r.maxAllocationLimit", toString(.Machine$integer.max / 10))
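
For reference, here is a standalone sketch of the slice-id calculation that the new `parallelize()` code mirrors from `positions()` in Spark's `ParallelCollectionRDD`. It is not part of the patch, and the `sliceIds` helper name is made up for illustration:

```r
# Slice id assigned to each of numElements rows when splitting into numSlices slices,
# using the same start/end arithmetic added to parallelize() above.
sliceIds <- function(numElements, numSlices) {
  unlist(lapply(0:(numSlices - 1), function(x) {
    start <- trunc((x * numElements) / numSlices)
    end <- trunc(((x + 1) * numElements) / numSlices)
    # each of the (end - start) rows in this slice gets the id `start`
    rep(start, end - start)
  }))
}

# 50 rows into 22 slices reproduces the sequence in the code comment:
# 0 0 2 2 4 4 6 6 6 9 9 11 11 ... with the 3-element slices (6, 15, 22) roughly evenly spaced.
sliceIds(50, 22)

# split() then groups a collection by these ids, yielding exactly 22 slices, whereas the
# previous rep()-based chunking (sliceLen = ceiling(50 / 22) = 3) would have produced only 17.
length(split(seq_len(50), sliceIds(50, 22)))   # 22
```

Grouping by these ids is what lets `parallelize()` return exactly the requested number of slices in the 1 < (length(coll) / numSlices) << length(coll) case that the new tests cover.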