From c44bf137c7ca649e0c504229eb3e6ff7955e9a53 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun <dongjoon@apache.org>
Date: Mon, 20 Jun 2016 11:30:26 -0700
Subject: [PATCH] [SPARK-16051][R] Add `read.orc/write.orc` to SparkR

## What changes were proposed in this pull request?

This issue adds `read.orc/write.orc` to SparkR for API parity.

## How was this patch tested?

Pass the Jenkins tests (with new testcases).

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #13763 from dongjoon-hyun/SPARK-16051.
---
 R/pkg/NAMESPACE                           |  2 ++
 R/pkg/R/DataFrame.R                       | 27 +++++++++++++++++++++++
 R/pkg/R/SQLContext.R                      | 21 +++++++++++++++++-
 R/pkg/R/generics.R                        |  4 ++++
 R/pkg/inst/tests/testthat/test_sparkSQL.R | 21 ++++++++++++++++++
 5 files changed, 74 insertions(+), 1 deletion(-)

diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index cc129a73fe..aaeab665a4 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -117,6 +117,7 @@ exportMethods("arrange",
               "write.df",
               "write.jdbc",
               "write.json",
+              "write.orc",
               "write.parquet",
               "write.text",
               "write.ml")
@@ -306,6 +307,7 @@ export("as.DataFrame",
        "read.df",
        "read.jdbc",
        "read.json",
+       "read.orc",
        "read.parquet",
        "read.text",
        "spark.lapply",
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index ea091c8101..f3a3eff46d 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -701,6 +701,33 @@ setMethod("write.json",
             invisible(callJMethod(write, "json", path))
           })
 
+#' Save the contents of SparkDataFrame as an ORC file, preserving the schema.
+#'
+#' Save the contents of a SparkDataFrame as an ORC file, preserving the schema. Files written out
+#' with this method can be read back in as a SparkDataFrame using read.orc().
+#'
+#' @param x A SparkDataFrame
+#' @param path The directory where the file is saved
+#'
+#' @family SparkDataFrame functions
+#' @rdname write.orc
+#' @name write.orc
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' path <- "path/to/file.json"
+#' df <- read.json(path)
+#' write.orc(df, "/tmp/sparkr-tmp1/")
+#' }
+#' @note write.orc since 2.0.0
+setMethod("write.orc",
+          signature(x = "SparkDataFrame", path = "character"),
+          function(x, path) {
+            write <- callJMethod(x@sdf, "write")
+            invisible(callJMethod(write, "orc", path))
+          })
+
 #' Save the contents of SparkDataFrame as a Parquet file, preserving the schema.
 #'
 #' Save the contents of a SparkDataFrame as a Parquet file, preserving the schema. Files written out
diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index b0ccc42ff8..b7e1c062c7 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -330,6 +330,25 @@ jsonRDD <- function(sqlContext, rdd, schema = NULL, samplingRatio = 1.0) {
   }
 }
 
+#' Create a SparkDataFrame from an ORC file.
+#'
+#' Loads an ORC file, returning the result as a SparkDataFrame.
+#'
+#' @param path Path of file to read.
+#' @return SparkDataFrame
+#' @rdname read.orc
+#' @export
+#' @name read.orc
+#' @note read.orc since 2.0.0
+read.orc <- function(path) {
+  sparkSession <- getSparkSession()
+  # Allow the user to have a more flexible definiton of the ORC file path
+  path <- suppressWarnings(normalizePath(path))
+  read <- callJMethod(sparkSession, "read")
+  sdf <- callJMethod(read, "orc", path)
+  dataFrame(sdf)
+}
+
 #' Create a SparkDataFrame from a Parquet file.
 #'
 #' Loads a Parquet file, returning the result as a SparkDataFrame.
@@ -343,7 +362,7 @@ jsonRDD <- function(sqlContext, rdd, schema = NULL, samplingRatio = 1.0) {
 
 read.parquet.default <- function(path) {
   sparkSession <- getSparkSession()
-  # Allow the user to have a more flexible definiton of the text file path
+  # Allow the user to have a more flexible definiton of the Parquet file path
   paths <- as.list(suppressWarnings(normalizePath(path)))
   read <- callJMethod(sparkSession, "read")
   sdf <- callJMethod(read, "parquet", paths)
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 37d05560c3..dcc1cf241f 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -610,6 +610,10 @@ setGeneric("write.jdbc", function(x, url, tableName, mode = "error", ...) {
 #' @export
 setGeneric("write.json", function(x, path) { standardGeneric("write.json") })
 
+#' @rdname write.orc
+#' @export
+setGeneric("write.orc", function(x, path) { standardGeneric("write.orc") })
+
 #' @rdname write.parquet
 #' @export
 setGeneric("write.parquet", function(x, path) { standardGeneric("write.parquet") })
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index ceba0d138e..114fec6e36 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -68,6 +68,7 @@ mockLines <- c("{\"name\":\"Michael\"}",
                "{\"name\":\"Justin\", \"age\":19}")
 jsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
 parquetPath <- tempfile(pattern = "sparkr-test", fileext = ".parquet")
+orcPath <- tempfile(pattern = "sparkr-test", fileext = ".orc")
 writeLines(mockLines, jsonPath)
 
 # For test nafunctions, like dropna(), fillna(),...
@@ -1667,6 +1668,25 @@ test_that("mutate(), transform(), rename() and names()", {
   detach(airquality)
 })
 
+test_that("read/write ORC files", {
+  df <- read.df(jsonPath, "json")
+
+  # Test write.df and read.df
+  write.df(df, orcPath, "orc", mode = "overwrite")
+  df2 <- read.df(orcPath, "orc")
+  expect_is(df2, "SparkDataFrame")
+  expect_equal(count(df), count(df2))
+
+  # Test write.orc and read.orc
+  orcPath2 <- tempfile(pattern = "orcPath2", fileext = ".orc")
+  write.orc(df, orcPath2)
+  orcDF <- read.orc(orcPath2)
+  expect_is(orcDF, "SparkDataFrame")
+  expect_equal(count(orcDF), count(df))
+
+  unlink(orcPath2)
+})
+
 test_that("read/write Parquet files", {
   df <- read.df(jsonPath, "json")
   # Test write.df and read.df
@@ -2351,5 +2371,6 @@ test_that("enableHiveSupport on SparkSession", {
 })
 
 unlink(parquetPath)
+unlink(orcPath)
 unlink(jsonPath)
 unlink(jsonPathNa)
-- 
GitLab