    dc3b640a
    SPARK-1619 Launch spark-shell with spark-submit · dc3b640a
    Patrick Wendell authored
    This simplifies the shell a bunch and passes all arguments through to spark-submit.
    
    There is a tiny incompatibility from 0.9.1 which is that you can't put `-c` _or_ `--cores`, only `--cores`. However, spark-submit will give a good error message in this case, I don't think many people used this, and it's a trivial change for users.
    
    Author: Patrick Wendell <pwendell@gmail.com>
    
    Closes #542 from pwendell/spark-shell and squashes the following commits:
    
    9eb3e6f [Patrick Wendell] Updating Spark docs
    b552459 [Patrick Wendell] Andrew's feedback
    97720fa [Patrick Wendell] Review feedback
    aa2900b [Patrick Wendell] SPARK-1619 Launch spark-shell with spark-submit
scala-programming-guide.md 26.64 KiB
layout: global
title: Spark Programming Guide
  • This will become a table of contents (this text will be scraped). {:toc}

Overview

At a high level, every Spark application consists of a driver program that runs the user's main function and executes various parallel operations on a cluster. The main abstraction Spark provides is a resilient distributed dataset (RDD), which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. RDDs are created by starting with a file in the Hadoop file system (or any other Hadoop-supported file system), or an existing Scala collection in the driver program, and transforming it. Users may also ask Spark to persist an RDD in memory, allowing it to be reused efficiently across parallel operations. Finally, RDDs automatically recover from node failures.

A second abstraction in Spark is shared variables that can be used in parallel operations. By default, when Spark runs a function in parallel as a set of tasks on different nodes, it ships a copy of each variable used in the function to each task. Sometimes, a variable needs to be shared across tasks, or between tasks and the driver program. Spark supports two types of shared variables: broadcast variables, which can be used to cache a value in memory on all nodes, and accumulators, which are variables that are only "added" to, such as counters and sums.

This guide shows each of these features and walks through some samples. It assumes some familiarity with Scala, especially with the syntax for closures. Note that you can also run Spark interactively using the bin/spark-shell script. We highly recommend doing that to follow along!

Linking with Spark

Spark {{site.SPARK_VERSION}} uses Scala {{site.SCALA_BINARY_VERSION}}. If you write applications in Scala, you will need to use a compatible Scala version (e.g. {{site.SCALA_BINARY_VERSION}}.X) -- newer major versions may not work.

To write a Spark application, you need to add a dependency on Spark. If you use SBT or Maven, Spark is available through Maven Central at:

groupId = org.apache.spark
artifactId = spark-core_{{site.SCALA_BINARY_VERSION}}
version = {{site.SPARK_VERSION}}

In addition, if you wish to access an HDFS cluster, you need to add a dependency on hadoop-client for your version of HDFS. Some common HDFS version tags are listed on the third party distributions page.

groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>
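
For SBT users, the two sets of coordinates above might translate into build settings along the following lines. This is only a sketch: the `{{site...}}` values are the same template variables used above, and `<your-hdfs-version>` is still a placeholder that has to be replaced with the version matching your cluster.

```scala
// build.sbt (sketch): the coordinates listed above, written as sbt settings.
libraryDependencies += "org.apache.spark" % "spark-core_{{site.SCALA_BINARY_VERSION}}" % "{{site.SPARK_VERSION}}"

// Only needed when accessing HDFS; replace the placeholder with your HDFS version.
libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "<your-hdfs-version>"
```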

Finally, you need to import some Spark classes and implicit conversions into your program. Add the following lines:

{% highlight scala %}
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
{% endhighlight %}

Initializing Spark

The first thing a Spark program must do is to create a SparkContext object, which tells Spark how to access a cluster. To create a SparkContext you first need to build a SparkConf object that contains information about your application.

{% highlight scala %}
val conf = new SparkConf().setAppName(<app name>).setMaster(<master>)
new SparkContext(conf)
{% endhighlight %}

The <master> parameter is a string specifying a Spark, Mesos or YARN cluster URL to connect to, or a special "local" string to run in local mode, as described below. <app name> is a name for your application, which will be shown in the cluster web UI. It's also possible to set these variables using a configuration file which avoids hard-coding the master name in your application.

In the Spark shell, a special interpreter-aware SparkContext is already created for you, in the variable called sc. Making your own SparkContext will not work. You can set which master the context connects to using the --master argument, and you can add JARs to the classpath by passing a comma-separated list to the --jars argument. For example, to run bin/spark-shell on exactly four cores, use:

{% highlight bash %}
$ ./bin/spark-shell --master local[4]
{% endhighlight %}

Or, to also add code.jar to its classpath, use:

{% highlight bash %}
$ ./bin/spark-shell --master local[4] --jars code.jar
{% endhighlight %}

Master URLs

The master URL passed to Spark can be in one of the following formats:

| Master URL | Meaning |
|---|---|
| local | Run Spark locally with one worker thread (i.e. no parallelism at all). |
| local[K] | Run Spark locally with K worker threads (ideally, set this to the number of cores on your machine). |
| local[*] | Run Spark locally with as many worker threads as logical cores on your machine. |
| spark://HOST:PORT | Connect to the given Spark standalone cluster master. The port must be whichever one your master is configured to use, which is 7077 by default. |
| mesos://HOST:PORT | Connect to the given Mesos cluster. The host parameter is the hostname of the Mesos master. The port must be whichever one the master is configured to use, which is 5050 by default. |
| yarn-client | Connect to a YARN cluster in client mode. The cluster location will be inferred based on the local Hadoop configuration. |
| yarn-cluster | Connect to a YARN cluster in cluster mode. The cluster location will be inferred based on the local Hadoop configuration. |

If no master URL is specified, the Spark shell defaults to "local[*]".
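
To make the formats in the table above concrete, here is a small plain-Scala sketch that classifies a master URL string by pattern matching. It is a toy for illustration only: `MasterUrls`, `describe` and `ShellDefault` are hypothetical names, not part of the Spark API, and Spark's own parsing may differ.

```scala
// Illustrative only: a toy classifier for the master URL formats listed above.
// MasterUrls, describe and ShellDefault are hypothetical names, not Spark API.
object MasterUrls {
  private val LocalN     = """local\[(\d+)\]""".r
  private val Standalone = """spark://(.+):(\d+)""".r
  private val Mesos      = """mesos://(.+):(\d+)""".r

  // The shell's default when no master URL is given, as stated above.
  val ShellDefault = "local[*]"

  def describe(master: String): String = master match {
    case "local"             => "local mode, 1 worker thread"
    case "local[*]"          => "local mode, one worker thread per logical core"
    case LocalN(k)           => s"local mode, $k worker threads"
    case Standalone(host, p) => s"standalone cluster master at $host:$p"
    case Mesos(host, p)      => s"Mesos master at $host:$p"
    case "yarn-client"       => "YARN, client mode"
    case "yarn-cluster"      => "YARN, cluster mode"
    case other               => s"unrecognized master URL: $other"
  }
}
```

For instance, `MasterUrls.describe("local[4]")` reports four worker threads, which corresponds to the `--master local[4]` shell invocation shown earlier.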

Resilient Distributed Datasets (RDDs)

Spark revolves around the concept of a resilient distributed dataset (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel. There are currently two types of RDDs: parallelized collections, which take an existing Scala collection and run functions on it in parallel, and Hadoop datasets, which run functions on each record of a file in the Hadoop distributed file system or any other storage system supported by Hadoop. Both types of RDDs can be operated on through the same methods.

Parallelized Collections

Parallelized collections are created by calling SparkContext's parallelize method on an existing Scala collection (a Seq object). The elements of the collection are copied to form a distributed dataset that can be operated on in parallel. For example, here is some interpreter output showing how to create a parallelized collection from an array:

{% highlight scala %}
scala> val data = Array(1, 2, 3, 4, 5)
data: Array[Int] = Array(1, 2, 3, 4, 5)

scala> val distData = sc.parallelize(data)
distData: spark.RDD[Int] = spark.ParallelCollection@10d13e3e
{% endhighlight %}

Once created, the distributed dataset (distData here) can be operated on in parallel. For example, we might call distData.reduce(_ + _) to add up the elements of the array. We describe operations on distributed datasets later on.

One important parameter for parallelized collections is the number of slices to cut the dataset into. Spark will run one task for each slice of the dataset. Typically you want 2-4 slices for each CPU in your cluster. Normally, Spark tries to set the number of slices automatically based on your cluster. However, you can also set it manually by passing it as a second parameter to parallelize (e.g. sc.parallelize(data, 10)).
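
The idea of slices can be modelled locally without a cluster. The sketch below cuts a collection into contiguous pieces and sums each piece separately before combining the partial results, which is roughly the shape of what a call like distData.reduce(_ + _) does across tasks. It is an assumption-laden illustration, not Spark's actual implementation; `Slices`, `cut` and `sumBySlice` are hypothetical names.

```scala
// Illustrative only: a local model of slicing, not Spark's actual implementation.
// Slices, cut and sumBySlice are hypothetical names.
object Slices {
  // Cut `data` into `numSlices` contiguous pieces of nearly equal size.
  def cut[T](data: Seq[T], numSlices: Int): Seq[Seq[T]] = {
    require(numSlices >= 1, "need at least one slice")
    (0 until numSlices).map { i =>
      val start = (i.toLong * data.length / numSlices).toInt
      val end   = ((i + 1).toLong * data.length / numSlices).toInt
      data.slice(start, end)
    }
  }

  // Each slice stands in for one task: sum within slices, then combine the partial sums.
  def sumBySlice(data: Seq[Int], numSlices: Int): Int =
    cut(data, numSlices).map(_.sum).sum
}
```

With the five-element example above and two slices, `Slices.cut` yields one slice of two elements and one of three, and `Slices.sumBySlice` returns the same total as summing the whole collection directly. Asking for more slices than elements simply leaves some slices empty.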

Hadoop Datasets

Spark can create distributed datasets from any file stored in the Hadoop distributed file system (HDFS) or other storage systems supported by Hadoop (including your local file system, Amazon S3, Hypertable, HBase, etc.). Spark supports text files, SequenceFiles, and any other Hadoop InputFormat.

Text file RDDs can be created using SparkContext's textFile method. This method takes a URI for the file (either a local path on the machine, or an hdfs://, s3n://, kfs://, etc. URI). Here is an example invocation:

{% highlight scala %}
scala> val distFile = sc.textFile("data.txt")
distFile: spark.RDD[String] = spark.HadoopRDD@1d4cee08
{% endhighlight %}

Once created, distFile can be acted on by dataset operations. For example, we can add up the sizes of all the lines using the map and reduce operations as follows: distFile.map(_.size).reduce(_ + _).
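
The same map-then-reduce shape can be tried on an ordinary Scala collection, which may help when reasoning about what the expression computes. In this sketch `lines` stands in for the records of distFile, and `LineSizes` and `totalSize` are hypothetical names introduced only for illustration.

```scala
// Illustrative only: the same map/reduce shape on a local collection.
// LineSizes and totalSize are hypothetical names; `lines` stands in for distFile's records.
object LineSizes {
  def totalSize(lines: Seq[String]): Int =
    lines.map(_.size).reduce(_ + _)
}
```

Note that on a local Scala collection, reduce throws an exception when the collection is empty, so this sketch assumes at least one line.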

The textFile method also takes an optional second argument for controlling the number of slices of the file. By default, Spark creates one slice for each block of the file (blocks being 64MB by default in HDFS), but you can also ask for a higher number of slices by passing a larger value. Note that you cannot have fewer slices than blocks.
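
As a rough illustration of the rule just described, the following sketch estimates how many slices a file would get. It is a simplified model under stated assumptions (one slice per block, a requested count honoured only when it is larger), not Hadoop's actual split computation; `FileSlices`, `blocks` and `slices` are hypothetical names.

```scala
// Illustrative only: a simplified model of the rule described above,
// not Hadoop's actual split computation. FileSlices and its methods are hypothetical names.
object FileSlices {
  val DefaultBlockBytes: Long = 64L * 1024 * 1024 // 64 MB, the HDFS default cited above

  // Number of blocks needed to hold the file, rounded up.
  def blocks(fileBytes: Long, blockBytes: Long = DefaultBlockBytes): Int =
    ((fileBytes + blockBytes - 1) / blockBytes).toInt

  // One slice per block by default; a larger request wins, a smaller one is ignored.
  def slices(fileBytes: Long, requested: Option[Int] = None, blockBytes: Long = DefaultBlockBytes): Int =
    math.max(blocks(fileBytes, blockBytes), requested.getOrElse(0))
}
```

Under this model a 200 MB file spans four 64 MB blocks, so it gets four slices by default; requesting ten slices yields ten, while requesting two still yields four.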