From 5f48e5c33bafa376be5741e260a037c66103fdcd Mon Sep 17 00:00:00 2001 From: Shivaram Venkataraman <shivaram@cs.berkeley.edu> Date: Fri, 29 May 2015 14:11:58 -0700 Subject: [PATCH] [SPARK-6806] [SPARKR] [DOCS] Add a new SparkR programming guide This PR adds a new SparkR programming guide at the top-level. This will be useful for R users as our APIs don't directly match the Scala/Python APIs and as we need to explain SparkR without using RDDs as examples etc. cc rxin davies pwendell cc cafreeman -- Would be great if you could also take a look at this ! Author: Shivaram Venkataraman <shivaram@cs.berkeley.edu> Closes #6490 from shivaram/sparkr-guide and squashes the following commits: d5ff360 [Shivaram Venkataraman] Add a section on HiveContext, HQL queries 408dce5 [Shivaram Venkataraman] Fix link dbb86e3 [Shivaram Venkataraman] Fix minor typo 9aff5e0 [Shivaram Venkataraman] Address comments, use dplyr-like syntax in example d09703c [Shivaram Venkataraman] Fix default argument in read.df ea816a1 [Shivaram Venkataraman] Add a new SparkR programming guide Also update write.df, read.df to handle defaults better --- R/pkg/R/DataFrame.R | 10 +- R/pkg/R/SQLContext.R | 5 + R/pkg/R/generics.R | 4 +- docs/_layouts/global.html | 1 + docs/index.md | 2 +- docs/sparkr.md | 223 ++++++++++++++++++++++++++++++++++ docs/sql-programming-guide.md | 4 +- 7 files changed, 238 insertions(+), 11 deletions(-) create mode 100644 docs/sparkr.md diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index ed8093c80d..e79d324838 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -1314,9 +1314,8 @@ setMethod("except", #' write.df(df, "myfile", "parquet", "overwrite") #' } setMethod("write.df", - signature(df = "DataFrame", path = 'character', source = 'character', - mode = 'character'), - function(df, path = NULL, source = NULL, mode = "append", ...){ + signature(df = "DataFrame", path = 'character'), + function(df, path, source = NULL, mode = "append", ...){ if (is.null(source)) { sqlContext <- get(".sparkRSQLsc", envir = .sparkREnv) source <- callJMethod(sqlContext, "getConf", "spark.sql.sources.default", @@ -1338,9 +1337,8 @@ setMethod("write.df", #' @aliases saveDF #' @export setMethod("saveDF", - signature(df = "DataFrame", path = 'character', source = 'character', - mode = 'character'), - function(df, path = NULL, source = NULL, mode = "append", ...){ + signature(df = "DataFrame", path = 'character'), + function(df, path, source = NULL, mode = "append", ...){ write.df(df, path, source, mode, ...) }) diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R index 36cc612875..88e1a508f3 100644 --- a/R/pkg/R/SQLContext.R +++ b/R/pkg/R/SQLContext.R @@ -457,6 +457,11 @@ read.df <- function(sqlContext, path = NULL, source = NULL, ...) { if (!is.null(path)) { options[['path']] <- path } + if (is.null(source)) { + sqlContext <- get(".sparkRSQLsc", envir = .sparkREnv) + source <- callJMethod(sqlContext, "getConf", "spark.sql.sources.default", + "org.apache.spark.sql.parquet") + } sdf <- callJMethod(sqlContext, "load", source, options) dataFrame(sdf) } diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R index a23d3b217b..1f4fc6adac 100644 --- a/R/pkg/R/generics.R +++ b/R/pkg/R/generics.R @@ -482,11 +482,11 @@ setGeneric("saveAsTable", function(df, tableName, source, mode, ...) { #' @rdname write.df #' @export -setGeneric("write.df", function(df, path, source, mode, ...) { standardGeneric("write.df") }) +setGeneric("write.df", function(df, path, ...) 
{ standardGeneric("write.df") }) #' @rdname write.df #' @export -setGeneric("saveDF", function(df, path, source, mode, ...) { standardGeneric("saveDF") }) +setGeneric("saveDF", function(df, path, ...) { standardGeneric("saveDF") }) #' @rdname schema #' @export diff --git a/docs/_layouts/global.html b/docs/_layouts/global.html index b92c75f90b..eebb3faf90 100755 --- a/docs/_layouts/global.html +++ b/docs/_layouts/global.html @@ -75,6 +75,7 @@ <li><a href="mllib-guide.html">MLlib (Machine Learning)</a></li> <li><a href="graphx-programming-guide.html">GraphX (Graph Processing)</a></li> <li><a href="bagel-programming-guide.html">Bagel (Pregel on Spark)</a></li> + <li><a href="sparkr.html">SparkR (R on Spark)</a></li> </ul> </li> diff --git a/docs/index.md b/docs/index.md index 5ef6d983c4..fac071da81 100644 --- a/docs/index.md +++ b/docs/index.md @@ -54,7 +54,7 @@ Example applications are also provided in Python. For example, ./bin/spark-submit examples/src/main/python/pi.py 10 -Spark also provides an experimental R API since 1.4 (only DataFrames APIs included). +Spark also provides an experimental [R API](sparkr.html) since 1.4 (only DataFrames APIs included). To run Spark interactively in a R interpreter, use `bin/sparkR`: ./bin/sparkR --master local[2] diff --git a/docs/sparkr.md b/docs/sparkr.md new file mode 100644 index 0000000000..4d82129921 --- /dev/null +++ b/docs/sparkr.md @@ -0,0 +1,223 @@ +--- +layout: global +displayTitle: SparkR (R on Spark) +title: SparkR (R on Spark) +--- + +* This will become a table of contents (this text will be scraped). +{:toc} + +# Overview +SparkR is an R package that provides a light-weight frontend to use Apache Spark from R. +In Spark {{site.SPARK_VERSION}}, SparkR provides a distributed data frame implementation that +supports operations like selection, filtering, aggregation etc. (similar to R data frames, +[dplyr](https://github.com/hadley/dplyr)) but on large datasets. + +# SparkR DataFrames + +A DataFrame is a distributed collection of data organized into named columns. It is conceptually +equivalent to a table in a relational database or a data frame in R, but with richer +optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: +structured data files, tables in Hive, external databases, or existing local R data frames. + +All of the examples on this page use sample data included in R or the Spark distribution and can be run using the `./bin/sparkR` shell. + +## Starting Up: SparkContext, SQLContext + +<div data-lang="r" markdown="1"> +The entry point into SparkR is the `SparkContext` which connects your R program to a Spark cluster. +You can create a `SparkContext` using `sparkR.init` and pass in options such as the application name +etc. Further, to work with DataFrames we will need a `SQLContext`, which can be created from the +SparkContext. If you are working from the SparkR shell, the `SQLContext` and `SparkContext` should +already be created for you. + +{% highlight r %} +sc <- sparkR.init() +sqlContext <- sparkRSQL.init(sc) +{% endhighlight %} + +</div> + +## Creating DataFrames +With a `SQLContext`, applications can create `DataFrame`s from a local R data frame, from a [Hive table](sql-programming-guide.html#hive-tables), or from other [data sources](sql-programming-guide.html#data-sources). + +### From local data frames +The simplest way to create a data frame is to convert a local R data frame into a SparkR DataFrame. 
Specifically, we can use `createDataFrame` and pass in the local R data frame to create a SparkR DataFrame. As an example, the following creates a `DataFrame` based on the `faithful` dataset from R.
+
+<div data-lang="r" markdown="1">
+{% highlight r %}
+df <- createDataFrame(sqlContext, faithful)
+
+# Displays the content of the DataFrame to stdout
+head(df)
+##  eruptions waiting
+##1     3.600      79
+##2     1.800      54
+##3     3.333      74
+
+{% endhighlight %}
+</div>
+
+### From Data Sources
+
+SparkR supports operating on a variety of data sources through the `DataFrame` interface. This section describes the general methods for loading and saving data using Data Sources. You can check the Spark SQL programming guide for more [specific options](sql-programming-guide.html#manually-specifying-options) that are available for the built-in data sources.
+
+The general method for creating DataFrames from data sources is `read.df`. This method takes in the `SQLContext`, the path for the file to load, and the type of data source. SparkR supports reading JSON and Parquet files natively, and through [Spark Packages](http://spark-packages.org/) you can find data source connectors for popular file formats like [CSV](http://spark-packages.org/package/databricks/spark-csv) and [Avro](http://spark-packages.org/package/databricks/spark-avro).
+
+We can see how to use data sources with an example JSON input file. Note that the file that is used here is _not_ a typical JSON file. Each line in the file must contain a separate, self-contained valid JSON object. As a consequence, a regular multi-line JSON file will most often fail.
+
+<div data-lang="r" markdown="1">
+
+{% highlight r %}
+people <- read.df(sqlContext, "./examples/src/main/resources/people.json", "json")
+head(people)
+##  age    name
+##1  NA Michael
+##2  30    Andy
+##3  19  Justin
+
+# SparkR automatically infers the schema from the JSON file
+printSchema(people)
+# root
+#  |-- age: integer (nullable = true)
+#  |-- name: string (nullable = true)
+
+{% endhighlight %}
+</div>
+
+The data sources API can also be used to save out DataFrames into multiple file formats. For example, we can save the DataFrame from the previous example to a Parquet file using `write.df`.
+
+<div data-lang="r" markdown="1">
+{% highlight r %}
+write.df(people, path="people.parquet", source="parquet", mode="overwrite")
+{% endhighlight %}
+</div>
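+
+In addition to the built-in JSON and Parquet support shown above, connectors from Spark Packages can be used through the same `read.df` call. The sketch below is illustrative only: the file name, the `header` option, and the package coordinates are assumptions, and it requires the spark-csv package to have been included when launching SparkR (e.g. with `./bin/sparkR --packages com.databricks:spark-csv_2.10:1.0.3`).
+
+<div data-lang="r" markdown="1">
+{% highlight r %}
+# Hypothetical example: "cars.csv" is a placeholder path, and the
+# com.databricks.spark.csv source is provided by the spark-csv package.
+# Extra named arguments (such as header) are passed through to the data source.
+cars <- read.df(sqlContext, "cars.csv", source = "com.databricks.spark.csv", header = "true")
+head(cars)
+{% endhighlight %}
+</div>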
+
+### From Hive tables
+
+You can also create SparkR DataFrames from Hive tables. To do this we will need to create a `HiveContext`, which can access tables in the Hive MetaStore. Note that Spark should have been built with [Hive support](building-spark.html#building-with-hive-and-jdbc-support); more details on the difference between `SQLContext` and `HiveContext` can be found in the [SQL programming guide](sql-programming-guide.html#starting-point-sqlcontext).
+
+<div data-lang="r" markdown="1">
+{% highlight r %}
+# sc is an existing SparkContext.
+hiveContext <- sparkRHive.init(sc)
+
+sql(hiveContext, "CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
+sql(hiveContext, "LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
+
+# Queries can be expressed in HiveQL.
+results <- sql(hiveContext, "FROM src SELECT key, value")
+
+# results is now a DataFrame
+head(results)
+##  key   value
+## 1 238 val_238
+## 2  86  val_86
+## 3 311 val_311
+
+{% endhighlight %}
+</div>
+
+## DataFrame Operations
+
+SparkR DataFrames support a number of functions to do structured data processing. Here we include some basic examples; a complete list can be found in the [API](api/R/index.html) docs:
+
+### Selecting rows, columns
+
+<div data-lang="r" markdown="1">
+{% highlight r %}
+# Create the DataFrame
+df <- createDataFrame(sqlContext, faithful)
+
+# Get basic information about the DataFrame
+df
+## DataFrame[eruptions:double, waiting:double]
+
+# Select only the "eruptions" column
+head(select(df, df$eruptions))
+##  eruptions
+##1     3.600
+##2     1.800
+##3     3.333
+
+# You can also pass in column names as strings
+head(select(df, "eruptions"))
+
+# Filter the DataFrame to only retain rows with wait times shorter than 50 mins
+head(filter(df, df$waiting < 50))
+##  eruptions waiting
+##1     1.750      47
+##2     1.750      47
+##3     1.867      48
+
+{% endhighlight %}
+
+</div>
+
+### Grouping, Aggregation
+
+SparkR DataFrames support a number of commonly used functions to aggregate data after grouping. For example, we can compute a histogram of the `waiting` time in the `faithful` dataset as shown below.
+
+<div data-lang="r" markdown="1">
+{% highlight r %}
+
+# We use the `n` function to count the number of times each waiting time appears
+head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
+##  waiting count
+##1      81    13
+##2      60     6
+##3      68     1
+
+# We can also sort the output from the aggregation to get the most common waiting times
+waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
+head(arrange(waiting_counts, desc(waiting_counts$count)))
+
+##  waiting count
+##1      78    15
+##2      83    14
+##3      81    13
+
+{% endhighlight %}
+</div>
+
+### Operating on Columns
+
+SparkR also provides a number of functions that can be directly applied to columns for data processing and during aggregation. The example below shows the use of basic arithmetic functions.
+
+<div data-lang="r" markdown="1">
+{% highlight r %}
+
+# Convert waiting time from minutes to seconds.
+# Note that we can assign this to a new column in the same DataFrame
+df$waiting_secs <- df$waiting * 60
+head(df)
+##  eruptions waiting waiting_secs
+##1     3.600      79         4740
+##2     1.800      54         3240
+##3     3.333      74         4440
+
+{% endhighlight %}
+</div>
+
+## Running SQL Queries from SparkR
+A SparkR DataFrame can also be registered as a temporary table in Spark SQL. Registering a DataFrame as a table allows you to run SQL queries over its data.
+The `sql` function enables applications to run SQL queries programmatically and returns the result as a `DataFrame`.
+
+<div data-lang="r" markdown="1">
+{% highlight r %}
+# Load a JSON file
+people <- read.df(sqlContext, "./examples/src/main/resources/people.json", "json")
+
+# Register this DataFrame as a table.
+registerTempTable(people, "people")
+
+# SQL statements can be run by using the sql method
+teenagers <- sql(sqlContext, "SELECT name FROM people WHERE age >= 13 AND age <= 19")
+head(teenagers)
+##    name
+##1 Justin
+
+{% endhighlight %}
+</div>
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index ab646f65bb..7cc0a87fd5 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1526,8 +1526,8 @@ adds support for finding tables in the MetaStore and writing queries using HiveQ
# sc is an existing SparkContext.
sqlContext <- sparkRHive.init(sc) -hql(sqlContext, "CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") -hql(sqlContext, "LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") +sql(sqlContext, "CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") +sql(sqlContext, "LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") # Queries can be expressed in HiveQL. results = sqlContext.sql("FROM src SELECT key, value").collect() -- GitLab