Commit 9cb1eb7a authored by Xin Ren, committed by Cheng Lian

[SPARK-16381][SQL][SPARKR] Update SQL examples and programming guide for R language binding

https://issues.apache.org/jira/browse/SPARK-16381

## What changes were proposed in this pull request?

Update SQL examples and programming guide for R language binding.

Here I followed the approach in https://github.com/apache/spark/compare/master...liancheng:example-snippet-extraction and created a separate R file to store all the example code, which the guide then pulls in by snippet label.
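To make the mechanism concrete, here is a minimal sketch of how one snippet from this PR is wired up (the labels and code are taken from the diff below; the extraction itself is done by the docs' `include_example` Jekyll plugin):

```r
# In the SQL programming guide, the inline {% highlight r %} block is replaced by a reference:
#   {% include_example init_session r/RSparkSQLExample.R %}
#
# In RSparkSQLExample.R, the referenced snippet is delimited by matching markers,
# and only the code between them is rendered into the guide:
# $example on:init_session$
sparkR.session(appName = "MyApp", sparkConfig = list(spark.executor.memory = "1g"))
# $example off:init_session$
```

This keeps the published snippets and the runnable example file from drifting apart, since the guide always renders code that can actually be executed.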

## How was this patch tested?

Tested manually on my local machine. Screenshot below:

![screen shot 2016-07-06 at 4 52 25 pm](https://cloud.githubusercontent.com/assets/3925641/16638180/13925a58-439a-11e6-8d57-8451a63dcae9.png)

Author: Xin Ren <iamshrek@126.com>

Closes #14082 from keypointt/SPARK-16381.
parent e2262789
@@ -86,9 +86,7 @@ The entry point into all functionality in Spark is the [`SparkSession`](api/pyth
 The entry point into all functionality in Spark is the [`SparkSession`](api/R/sparkR.session.html) class. To initialize a basic `SparkSession`, just call `sparkR.session()`:
-{% highlight r %}
-sparkR.session()
-{% endhighlight %}
+{% include_example init_session r/RSparkSQLExample.R %}
 Note that when invoked for the first time, `sparkR.session()` initializes a global `SparkSession` singleton instance, and always returns a reference to this instance for successive invocations. In this way, users only need to initialize the `SparkSession` once, then SparkR functions like `read.df` will be able to access this global instance implicitly, and users don't need to pass the `SparkSession` instance around.
 </div>
@@ -155,12 +153,7 @@ from a Hive table, or from [Spark data sources](#data-sources).
 As an example, the following creates a DataFrame based on the content of a JSON file:
-{% highlight r %}
-df <- read.json("examples/src/main/resources/people.json")
-# Displays the content of the DataFrame
-showDF(df)
-{% endhighlight %}
+{% include_example create_DataFrames r/RSparkSQLExample.R %}
 </div>
 </div>
@@ -343,50 +336,8 @@ In addition to simple column references and expressions, DataFrames also have a
 </div>
 <div data-lang="r" markdown="1">
-{% highlight r %}
-# Create the DataFrame
-df <- read.json("examples/src/main/resources/people.json")
-# Show the content of the DataFrame
-showDF(df)
-## age  name
-## null Michael
-## 30   Andy
-## 19   Justin
-# Print the schema in a tree format
-printSchema(df)
-## root
-## |-- age: long (nullable = true)
-## |-- name: string (nullable = true)
-# Select only the "name" column
-showDF(select(df, "name"))
-## name
-## Michael
-## Andy
-## Justin
-# Select everybody, but increment the age by 1
-showDF(select(df, df$name, df$age + 1))
-## name    (age + 1)
-## Michael null
-## Andy    31
-## Justin  20
-# Select people older than 21
-showDF(where(df, df$age > 21))
-## age name
-## 30  Andy
-# Count people by age
-showDF(count(groupBy(df, "age")))
-## age  count
-## null 1
-## 19   1
-## 30   1
-{% endhighlight %}
+{% include_example dataframe_operations r/RSparkSQLExample.R %}
 For a complete list of the types of operations that can be performed on a DataFrame refer to the [API Documentation](api/R/index.html).
@@ -429,12 +380,10 @@ df = spark.sql("SELECT * FROM table")
 <div data-lang="r" markdown="1">
 The `sql` function enables applications to run SQL queries programmatically and returns the result as a `SparkDataFrame`.
-{% highlight r %}
-df <- sql("SELECT * FROM table")
-{% endhighlight %}
+{% include_example sql_query r/RSparkSQLExample.R %}
 </div>
 </div>
 ## Creating Datasets
@@ -888,10 +837,7 @@ df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
 <div data-lang="r" markdown="1">
-{% highlight r %}
-df <- read.df("examples/src/main/resources/users.parquet")
-write.df(select(df, "name", "favorite_color"), "namesAndFavColors.parquet")
-{% endhighlight %}
+{% include_example source_parquet r/RSparkSQLExample.R %}
 </div>
 </div>
@@ -937,12 +883,7 @@ df.select("name", "age").write.save("namesAndAges.parquet", format="parquet")
 </div>
 <div data-lang="r" markdown="1">
-{% highlight r %}
-df <- read.df("examples/src/main/resources/people.json", "json")
-write.df(select(df, "name", "age"), "namesAndAges.parquet", "parquet")
-{% endhighlight %}
+{% include_example source_json r/RSparkSQLExample.R %}
 </div>
 </div>
@@ -978,9 +919,7 @@ df = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet
 <div data-lang="r" markdown="1">
-{% highlight r %}
-df <- sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
-{% endhighlight %}
+{% include_example direct_query r/RSparkSQLExample.R %}
 </div>
 </div>
@@ -1133,26 +1072,7 @@ for teenName in teenNames.collect():
 <div data-lang="r" markdown="1">
-{% highlight r %}
-schemaPeople # The SparkDataFrame from the previous example.
-# SparkDataFrame can be saved as Parquet files, maintaining the schema information.
-write.parquet(schemaPeople, "people.parquet")
-# Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved.
-# The result of loading a parquet file is also a DataFrame.
-parquetFile <- read.parquet("people.parquet")
-# Parquet files can also be used to create a temporary view and then used in SQL statements.
-createOrReplaceTempView(parquetFile, "parquetFile")
-teenagers <- sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
-schema <- structType(structField("name", "string"))
-teenNames <- dapply(df, function(p) { cbind(paste("Name:", p$name)) }, schema)
-for (teenName in collect(teenNames)$name) {
-  cat(teenName, "\n")
-}
-{% endhighlight %}
+{% include_example load_programmatically r/RSparkSQLExample.R %}
 </div>
@@ -1315,27 +1235,7 @@ df3.printSchema()
 <div data-lang="r" markdown="1">
-{% highlight r %}
-# Create a simple DataFrame, stored into a partition directory
-write.df(df1, "data/test_table/key=1", "parquet", "overwrite")
-# Create another DataFrame in a new partition directory,
-# adding a new column and dropping an existing column
-write.df(df2, "data/test_table/key=2", "parquet", "overwrite")
-# Read the partitioned table
-df3 <- read.df("data/test_table", "parquet", mergeSchema="true")
-printSchema(df3)
-# The final schema consists of all 3 columns in the Parquet files together
-# with the partitioning column appeared in the partition directory paths.
-# root
-# |-- single: int (nullable = true)
-# |-- double: int (nullable = true)
-# |-- triple: int (nullable = true)
-# |-- key : int (nullable = true)
-{% endhighlight %}
+{% include_example schema_merging r/RSparkSQLExample.R %}
 </div>
@@ -1601,25 +1501,8 @@ Note that the file that is offered as _a json file_ is not a typical JSON file.
 line must contain a separate, self-contained valid JSON object. As a consequence,
 a regular multi-line JSON file will most often fail.
-{% highlight r %}
-# A JSON dataset is pointed to by path.
-# The path can be either a single text file or a directory storing text files.
-path <- "examples/src/main/resources/people.json"
-# Create a DataFrame from the file(s) pointed to by path
-people <- read.json(path)
-# The inferred schema can be visualized using the printSchema() method.
-printSchema(people)
-# root
-# |-- age: long (nullable = true)
-# |-- name: string (nullable = true)
-# Register this DataFrame as a table.
-createOrReplaceTempView(people, "people")
-# SQL statements can be run by using the sql methods.
-teenagers <- sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
-{% endhighlight %}
+{% include_example load_json_file r/RSparkSQLExample.R %}
 </div>
 <div data-lang="sql" markdown="1">
@@ -1734,16 +1617,8 @@ results = spark.sql("FROM src SELECT key, value").collect()
 When working with Hive one must instantiate `SparkSession` with Hive support. This
 adds support for finding tables in the MetaStore and writing queries using HiveQL.
-{% highlight r %}
-# enableHiveSupport defaults to TRUE
-sparkR.session(enableHiveSupport = TRUE)
-sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
-sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
-# Queries can be expressed in HiveQL.
-results <- collect(sql("FROM src SELECT key, value"))
-{% endhighlight %}
+{% include_example hive_table r/RSparkSQLExample.R %}
 </div>
 </div>
@@ -1920,11 +1795,7 @@ df = spark.read.format('jdbc').options(url='jdbc:postgresql:dbserver', dbtable='
 <div data-lang="r" markdown="1">
-{% highlight r %}
-df <- read.jdbc("jdbc:postgresql:dbserver", "schema.tablename", user = "username", password = "password")
-{% endhighlight %}
+{% include_example jdbc r/RSparkSQLExample.R %}
 </div>
...
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
library(SparkR)
# $example on:init_session$
sparkR.session(appName = "MyApp", sparkConfig = list(spark.executor.memory = "1g"))
# $example off:init_session$
# $example on:create_DataFrames$
df <- read.json("examples/src/main/resources/people.json")
# Displays the content of the DataFrame
head(df)
# Another method to print the first few rows and optionally truncate the printing of long values
showDF(df)
# $example off:create_DataFrames$
# $example on:dataframe_operations$
# Create the DataFrame
df <- read.json("examples/src/main/resources/people.json")
# Show the content of the DataFrame
head(df)
## age name
## null Michael
## 30 Andy
## 19 Justin
# Print the schema in a tree format
printSchema(df)
## root
## |-- age: long (nullable = true)
## |-- name: string (nullable = true)
# Select only the "name" column
head(select(df, "name"))
## name
## Michael
## Andy
## Justin
# Select everybody, but increment the age by 1
head(select(df, df$name, df$age + 1))
## name (age + 1)
## Michael null
## Andy 31
## Justin 20
# Select people older than 21
head(where(df, df$age > 21))
## age name
## 30 Andy
# Count people by age
head(count(groupBy(df, "age")))
## age count
## null 1
## 19 1
## 30 1
# $example off:dataframe_operations$
# Register this DataFrame as a table.
createOrReplaceTempView(df, "table")
# $example on:sql_query$
df <- sql("SELECT * FROM table")
# $example off:sql_query$
# $example on:source_parquet$
df <- read.df("examples/src/main/resources/users.parquet")
write.df(select(df, "name", "favorite_color"), "namesAndFavColors.parquet")
# $example off:source_parquet$
# $example on:source_json$
df <- read.df("examples/src/main/resources/people.json", "json")
namesAndAges <- select(df, "name", "age")
write.df(namesAndAges, "namesAndAges.parquet", "parquet")
# $example off:source_json$
# $example on:direct_query$
df <- sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
# $example off:direct_query$
# $example on:load_programmatically$
df <- read.df("examples/src/main/resources/people.json", "json")
# SparkDataFrame can be saved as Parquet files, maintaining the schema information.
write.parquet(df, "people.parquet")
# Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved.
# The result of loading a parquet file is also a DataFrame.
parquetFile <- read.parquet("people.parquet")
# Parquet files can also be used to create a temporary view and then used in SQL statements.
createOrReplaceTempView(parquetFile, "parquetFile")
teenagers <- sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
head(teenagers)
## name
## 1 Justin
# We can also run custom R-UDFs on Spark DataFrames. Here we prefix all the names with "Name:"
schema <- structType(structField("name", "string"))
teenNames <- dapply(df, function(p) { cbind(paste("Name:", p$name)) }, schema)
for (teenName in collect(teenNames)$name) {
cat(teenName, "\n")
}
## Name: Michael
## Name: Andy
## Name: Justin
# $example off:load_programmatically$
# $example on:schema_merging$
df1 <- createDataFrame(data.frame(single=c(12, 29), double=c(19, 23)))
df2 <- createDataFrame(data.frame(double=c(19, 23), triple=c(23, 18)))
# Create a simple DataFrame, stored into a partition directory
write.df(df1, "data/test_table/key=1", "parquet", "overwrite")
# Create another DataFrame in a new partition directory,
# adding a new column and dropping an existing column
write.df(df2, "data/test_table/key=2", "parquet", "overwrite")
# Read the partitioned table
df3 <- read.df("data/test_table", "parquet", mergeSchema="true")
printSchema(df3)
# The final schema consists of all 3 columns in the Parquet files together
# with the partitioning column appeared in the partition directory paths.
# root
# |-- single: double (nullable = true)
# |-- double: double (nullable = true)
# |-- triple: double (nullable = true)
# |-- key : int (nullable = true)
# $example off:schema_merging$
# $example on:load_json_file$
# A JSON dataset is pointed to by path.
# The path can be either a single text file or a directory storing text files.
path <- "examples/src/main/resources/people.json"
# Create a DataFrame from the file(s) pointed to by path
people <- read.json(path)
# The inferred schema can be visualized using the printSchema() method.
printSchema(people)
# root
# |-- age: long (nullable = true)
# |-- name: string (nullable = true)
# Register this DataFrame as a table.
createOrReplaceTempView(people, "people")
# SQL statements can be run by using the sql methods.
teenagers <- sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
head(teenagers)
## name
## 1 Justin
# $example off:load_json_file$
# $example on:hive_table$
# enableHiveSupport defaults to TRUE
sparkR.session(enableHiveSupport = TRUE)
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
# Queries can be expressed in HiveQL.
results <- collect(sql("FROM src SELECT key, value"))
# $example off:hive_table$
# $example on:jdbc$
df <- read.jdbc("jdbc:postgresql:dbserver", "schema.tablename", user = "username", password = "password")
# $example off:jdbc$
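For reference, the new example file can be exercised end to end like the other SparkR examples. A plausible invocation from the Spark root directory (the file's exact path isn't shown in this view; this assumes it lives alongside the other R examples under `examples/src/main/r/`):

```sh
# Run the SparkR SQL examples against a local build; the path is assumed, adjust if the file lives elsewhere.
./bin/spark-submit examples/src/main/r/RSparkSQLExample.R
```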
@@ -18,7 +18,7 @@
 library(SparkR)
 # Initialize SparkSession
-sc <- sparkR.session(appName="SparkR-DataFrame-example")
+sc <- sparkR.session(appName = "SparkR-DataFrame-example")
 # Create a simple local data.frame
 localDF <- data.frame(name=c("John", "Smith", "Sarah"), age=c(19, 23, 18))
...
@@ -22,7 +22,7 @@
 library(SparkR)
 # Initialize SparkSession
-sparkR.session(appName="SparkR-ML-example")
+sparkR.session(appName = "SparkR-ML-example")
 # $example on$
 ############################ spark.glm and glm ##############################################
...