---
layout: global
title: Spark Programming Guide
---

* This will become a table of contents (this text will be scraped).
{:toc}
# Overview
At a high level, every Spark application consists of a driver program that runs the user's main function and executes various parallel operations on a cluster. The main abstraction Spark provides is a resilient distributed dataset (RDD), which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. RDDs are created by starting with a file in the Hadoop file system (or any other Hadoop-supported file system), or an existing Scala collection in the driver program, and transforming it. Users may also ask Spark to persist an RDD in memory, allowing it to be reused efficiently across parallel operations. Finally, RDDs automatically recover from node failures.
A second abstraction in Spark is shared variables that can be used in parallel operations. By default, when Spark runs a function in parallel as a set of tasks on different nodes, it ships a copy of each variable used in the function to each task. Sometimes, a variable needs to be shared across tasks, or between tasks and the driver program. Spark supports two types of shared variables: broadcast variables, which can be used to cache a value in memory on all nodes, and accumulators, which are variables that are only "added" to, such as counters and sums.
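As a quick taste of the two kinds of shared variables, here is a minimal interpreter sketch. It assumes a `SparkContext` is already available as `sc`, as it is in the Spark shell described below:

{% highlight scala %}
// Broadcast variable: a read-only value cached on every node
val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar.value                               // Array(1, 2, 3)

// Accumulator: tasks only add to it; the driver reads the result
val accum = sc.accumulator(0)
sc.parallelize(1 to 4).foreach(x => accum += x)
accum.value                                      // 10
{% endhighlight %}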
This guide shows each of these features and walks through some samples. It assumes some familiarity with Scala, especially with the syntax for closures. Note that you can also run Spark interactively using the `bin/spark-shell` script. We highly recommend doing that to follow along!
# Linking with Spark
Spark {{site.SPARK_VERSION}} uses Scala {{site.SCALA_BINARY_VERSION}}. If you write applications in Scala, you will need to use a compatible Scala version (e.g. {{site.SCALA_BINARY_VERSION}}.X) -- newer major versions may not work.
To write a Spark application, you need to add a dependency on Spark. If you use SBT or Maven, Spark is available through Maven Central at:
    groupId = org.apache.spark
    artifactId = spark-core_{{site.SCALA_BINARY_VERSION}}
    version = {{site.SPARK_VERSION}}
In addition, if you wish to access an HDFS cluster, you need to add a dependency on `hadoop-client` for your version of HDFS. Some common HDFS version tags are listed on the third party distributions page.
    groupId = org.apache.hadoop
    artifactId = hadoop-client
    version = <your-hdfs-version>
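For example, with sbt the coordinates above translate into `libraryDependencies` entries roughly like the following; the HDFS version is only a placeholder, so use the one matching your cluster:

{% highlight scala %}
// build.sbt (sketch); %% appends the Scala binary version to the artifact name
libraryDependencies += "org.apache.spark" %% "spark-core" % "{{site.SPARK_VERSION}}"

// Only needed if you access HDFS; match the version to your cluster
libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "<your-hdfs-version>"
{% endhighlight %}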
Finally, you need to import some Spark classes and implicit conversions into your program. Add the following lines:
{% highlight scala %}
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
{% endhighlight %}
# Initializing Spark
The first thing a Spark program must do is to create a `SparkContext` object, which tells Spark how to access a cluster. To create a `SparkContext` you first need to build a `SparkConf` object that contains information about your application.
{% highlight scala %}
val conf = new SparkConf().setAppName(<app name>).setMaster(<master>)
new SparkContext(conf)
{% endhighlight %}
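For instance, a concrete local-mode setup might look like this (the application name and core count are just illustrative):

{% highlight scala %}
// Run locally with 4 worker threads; "My App" appears in the web UI
val conf = new SparkConf().setAppName("My App").setMaster("local[4]")
val sc = new SparkContext(conf)
{% endhighlight %}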
The `<master>` parameter is a string specifying a Spark, Mesos or YARN cluster URL to connect to, or a special "local" string to run in local mode, as described below. `<app name>` is a name for your application, which will be shown in the cluster web UI. It's also possible to set these variables using a configuration file, which avoids hard-coding the master name in your application.
In the Spark shell, a special interpreter-aware SparkContext is already created for you, in the variable called `sc`. Making your own SparkContext will not work. You can set which master the context connects to using the `--master` argument, and you can add JARs to the classpath by passing a comma-separated list to the `--jars` argument. For example, to run `bin/spark-shell` on exactly four cores, use
{% highlight bash %}
$ ./bin/spark-shell --master local[4]
{% endhighlight %}
Or, to also add `code.jar` to its classpath, use:
{% highlight bash %}
$ ./bin/spark-shell --master local[4] --jars code.jar
{% endhighlight %}
## Master URLs
The master URL passed to Spark can be in one of the following formats:
| Master URL | Meaning |
|---|---|
| local | Run Spark locally with one worker thread (i.e. no parallelism at all). |
| local[K] | Run Spark locally with K worker threads (ideally, set this to the number of cores on your machine). |
| local[*] | Run Spark locally with as many worker threads as logical cores on your machine. |
| spark://HOST:PORT | Connect to the given Spark standalone cluster master. The port must be whichever one your master is configured to use, which is 7077 by default. |
| mesos://HOST:PORT | Connect to the given Mesos cluster. The host parameter is the hostname of the Mesos master. The port must be whichever one the master is configured to use, which is 5050 by default. |
| yarn-client | Connect to a YARN cluster in client mode. The cluster location will be inferred based on the local Hadoop configuration. |
| yarn-cluster | Connect to a YARN cluster in cluster mode. The cluster location will be inferred based on the local Hadoop configuration. |
If no master URL is specified, the Spark shell defaults to `local[*]`.
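For example, pointing application code at a standalone cluster instead of local mode might look like this (the hostname is hypothetical):

{% highlight scala %}
// Connect to a standalone cluster master; "master-host" is an illustrative hostname
val conf = new SparkConf()
  .setAppName("My App")
  .setMaster("spark://master-host:7077")
val sc = new SparkContext(conf)
{% endhighlight %}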
# Resilient Distributed Datasets (RDDs)
Spark revolves around the concept of a resilient distributed dataset (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel. There are currently two types of RDDs: parallelized collections, which take an existing Scala collection and run functions on it in parallel, and Hadoop datasets, which run functions on each record of a file in the Hadoop distributed file system or any other storage system supported by Hadoop. Both types of RDDs can be operated on through the same methods.
## Parallelized Collections
Parallelized collections are created by calling `SparkContext`'s `parallelize` method on an existing Scala collection (a `Seq` object). The elements of the collection are copied to form a distributed dataset that can be operated on in parallel. For example, here is some interpreter output showing how to create a parallel collection from an array:
{% highlight scala %}
scala> val data = Array(1, 2, 3, 4, 5)
data: Array[Int] = Array(1, 2, 3, 4, 5)

scala> val distData = sc.parallelize(data)
distData: spark.RDD[Int] = spark.ParallelCollection@10d13e3e
{% endhighlight %}
Once created, the distributed dataset (`distData` here) can be operated on in parallel. For example, we might call `distData.reduce(_ + _)` to add up the elements of the array. We describe operations on distributed datasets later on.
One important parameter for parallel collections is the number of slices to cut the dataset into. Spark will run one task for each slice of the cluster. Typically you want 2-4 slices for each CPU in your cluster. Normally, Spark tries to set the number of slices automatically based on your cluster. However, you can also set it manually by passing it as a second parameter to `parallelize` (e.g. `sc.parallelize(data, 10)`).
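As a concrete sketch (again assuming the shell's `sc`), explicitly requesting 10 slices looks like this:

{% highlight scala %}
val data = 1 to 100000                   // any Scala Seq works
val distData = sc.parallelize(data, 10)  // cut the dataset into 10 slices
distData.reduce(_ + _)                   // sum the elements in parallel
{% endhighlight %}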
## Hadoop Datasets
Spark can create distributed datasets from any file stored in the Hadoop distributed file system (HDFS) or other storage systems supported by Hadoop (including your local file system, Amazon S3, Hypertable, HBase, etc). Spark supports text files, SequenceFiles, and any other Hadoop InputFormat.
Text file RDDs can be created using `SparkContext`'s `textFile` method. This method takes a URI for the file (either a local path on the machine, or a `hdfs://`, `s3n://`, `kfs://`, etc URI). Here is an example invocation:
{% highlight scala %}
scala> val distFile = sc.textFile("data.txt")
distFile: spark.RDD[String] = spark.HadoopRDD@1d4cee08
{% endhighlight %}
Once created, `distFile` can be acted on by dataset operations. For example, we can add up the sizes of all the lines using the `map` and `reduce` operations as follows: `distFile.map(_.size).reduce(_ + _)`.
The `textFile` method also takes an optional second argument for controlling the number of slices of the file. By default, Spark creates one slice for each block of the file (blocks being 64MB by default in HDFS), but you can also ask for a higher number of slices by passing a larger value. Note that you cannot have fewer slices than blocks.
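For example, to read the same file with a higher slice count (8 here is just a hypothetical number), pass it as the second argument:

{% highlight scala %}
val distFile = sc.textFile("data.txt", 8)  // ask for at least 8 slices
distFile.map(_.size).reduce(_ + _)         // total number of characters
{% endhighlight %}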