diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index cc129a73fed0f09062bac288048d2016abed4aa7..aaeab665a46909e1f649beb55f36b7b6de28adde 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -117,6 +117,7 @@ exportMethods("arrange",
               "write.df",
               "write.jdbc",
               "write.json",
+              "write.orc",
               "write.parquet",
               "write.text",
               "write.ml")
@@ -306,6 +307,7 @@ export("as.DataFrame",
        "read.df",
        "read.jdbc",
        "read.json",
+       "read.orc",
        "read.parquet",
        "read.text",
        "spark.lapply",
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index ea091c81016d4ac043e5a4aceb8f7dc8906ffdfa..f3a3eff46dd9336b0cfa7afa87317736ba6bf354 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -701,6 +701,33 @@ setMethod("write.json",
             invisible(callJMethod(write, "json", path))
           })
 
+#' Save the contents of a SparkDataFrame as an ORC file, preserving the schema.
+#'
+#' Save the contents of a SparkDataFrame as an ORC file, preserving the schema. Files written out
+#' with this method can be read back in as a SparkDataFrame using read.orc().
+#'
+#' @param x A SparkDataFrame
+#' @param path The directory where the file is saved
+#'
+#' @family SparkDataFrame functions
+#' @rdname write.orc
+#' @name write.orc
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' path <- "path/to/file.json"
+#' df <- read.json(path)
+#' write.orc(df, "/tmp/sparkr-tmp1/")
+#' }
+#' @note write.orc since 2.0.0
+setMethod("write.orc",
+          signature(x = "SparkDataFrame", path = "character"),
+          function(x, path) {
+            write <- callJMethod(x@sdf, "write")
+            invisible(callJMethod(write, "orc", path))
+          })
+
 #' Save the contents of SparkDataFrame as a Parquet file, preserving the schema.
 #'
 #' Save the contents of a SparkDataFrame as a Parquet file, preserving the schema. Files written out
diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index b0ccc42ff829af2ae249b7b8e5bd97943c91cb3d..b7e1c062c714e566fa58abc5f813457e391a1dd7 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -330,6 +330,25 @@ jsonRDD <- function(sqlContext, rdd, schema = NULL, samplingRatio = 1.0) {
   }
 }
 
+#' Create a SparkDataFrame from an ORC file.
+#'
+#' Loads an ORC file, returning the result as a SparkDataFrame.
+#'
+#' @param path Path of file to read.
+#' @return SparkDataFrame
+#' @rdname read.orc
+#' @export
+#' @name read.orc
+#' @note read.orc since 2.0.0
+read.orc <- function(path) {
+  sparkSession <- getSparkSession()
+  # Allow the user to have a more flexible definition of the ORC file path
+  path <- suppressWarnings(normalizePath(path))
+  read <- callJMethod(sparkSession, "read")
+  sdf <- callJMethod(read, "orc", path)
+  dataFrame(sdf)
+}
+
 #' Create a SparkDataFrame from a Parquet file.
 #'
 #' Loads a Parquet file, returning the result as a SparkDataFrame.
@@ -343,7 +362,7 @@
 read.parquet.default <- function(path) {
   sparkSession <- getSparkSession()
-  # Allow the user to have a more flexible definiton of the text file path
+  # Allow the user to have a more flexible definition of the Parquet file path
   paths <- as.list(suppressWarnings(normalizePath(path)))
   read <- callJMethod(sparkSession, "read")
   sdf <- callJMethod(read, "parquet", paths)
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 37d05560c3e00aba1d64bceae145766a44d78d26..dcc1cf241f42f9c4f08a53a9c7836c979aadb781 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -610,6 +610,10 @@ setGeneric("write.jdbc", function(x, url, tableName, mode = "error", ...) {
 #' @export
 setGeneric("write.json", function(x, path) { standardGeneric("write.json") })
 
+#' @rdname write.orc
+#' @export
+setGeneric("write.orc", function(x, path) { standardGeneric("write.orc") })
+
 #' @rdname write.parquet
 #' @export
 setGeneric("write.parquet", function(x, path) { standardGeneric("write.parquet") })
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index ceba0d138e27d415ca67a90a97b6599323cc66db..114fec6e36d523da7a5991782350e9fbcad802f2 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -68,6 +68,7 @@ mockLines <- c("{\"name\":\"Michael\"}",
                "{\"name\":\"Justin\", \"age\":19}")
 jsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
 parquetPath <- tempfile(pattern = "sparkr-test", fileext = ".parquet")
+orcPath <- tempfile(pattern = "sparkr-test", fileext = ".orc")
 writeLines(mockLines, jsonPath)
 
 # For test nafunctions, like dropna(), fillna(),...
@@ -1667,6 +1668,25 @@ test_that("mutate(), transform(), rename() and names()", {
   detach(airquality)
 })
 
+test_that("read/write ORC files", {
+  df <- read.df(jsonPath, "json")
+
+  # Test write.df and read.df
+  write.df(df, orcPath, "orc", mode = "overwrite")
+  df2 <- read.df(orcPath, "orc")
+  expect_is(df2, "SparkDataFrame")
+  expect_equal(count(df), count(df2))
+
+  # Test write.orc and read.orc
+  orcPath2 <- tempfile(pattern = "orcPath2", fileext = ".orc")
+  write.orc(df, orcPath2)
+  orcDF <- read.orc(orcPath2)
+  expect_is(orcDF, "SparkDataFrame")
+  expect_equal(count(orcDF), count(df))
+
+  unlink(orcPath2)
+})
+
 test_that("read/write Parquet files", {
   df <- read.df(jsonPath, "json")
   # Test write.df and read.df
@@ -2351,5 +2371,6 @@ test_that("enableHiveSupport on SparkSession", {
 })
 
 unlink(parquetPath)
+unlink(orcPath)
 unlink(jsonPath)
 unlink(jsonPathNa)
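
Taken together, the patch gives SparkR an ORC round trip: `write.orc` persists a SparkDataFrame and `read.orc` loads it back. The sketch below is a minimal usage example based on the roxygen `@examples` above, not part of the patch itself; the input path and output directory are placeholders, and it assumes a Spark session built with ORC (Hive) support.

```r
# Minimal round trip with the new API (a sketch, not part of the patch).
# Paths are placeholders; a session with ORC support is assumed.
library(SparkR)
sparkR.session()

df <- read.json("path/to/file.json")      # any JSON input; path is a placeholder
write.orc(df, "/tmp/sparkr-orc-demo")     # writes a directory of ORC part files
df2 <- read.orc("/tmp/sparkr-orc-demo")   # read it back as a SparkDataFrame

count(df) == count(df2)                   # row count survives the round trip
```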
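
The new test also exercises the format through SparkR's generic data-source API, which needs no new methods: `"orc"` is simply passed as the source name to the existing `write.df`/`read.df`. A hypothetical equivalent of the direct calls above, with placeholder paths:

```r
# Same round trip via the generic source API, mirroring the new test.
write.df(df, "/tmp/sparkr-orc-demo2", "orc", mode = "overwrite")
df3 <- read.df("/tmp/sparkr-orc-demo2", "orc")
count(df3)
```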