---
layout: global
title: Spark Programming Guide
---

* This will become a table of contents (this text will be scraped).
{:toc}

# Overview

At a high level, every Spark application consists of a driver program that runs the user's main function and executes various parallel operations on a cluster. The main abstraction Spark provides is a resilient distributed dataset (RDD), which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. RDDs are created by starting with a file in the Hadoop file system (or any other Hadoop-supported file system), or an existing Scala collection in the driver program, and transforming it. Users may also ask Spark to persist an RDD in memory, allowing it to be reused efficiently across parallel operations. Finally, RDDs automatically recover from node failures.

A second abstraction in Spark is shared variables that can be used in parallel operations. By default, when Spark runs a function in parallel as a set of tasks on different nodes, it ships a copy of each variable used in the function to each task. Sometimes, a variable needs to be shared across tasks, or between tasks and the driver program. Spark supports two types of shared variables: broadcast variables, which can be used to cache a value in memory on all nodes, and accumulators, which are variables that are only "added" to, such as counters and sums.

This guide shows each of these features and walks through some samples. It assumes some familiarity with Scala, especially with the syntax for closures. Note that you can also run Spark interactively using the bin/spark-shell script. We highly recommend doing that to follow along!

# Linking with Spark

Spark {{site.SPARK_VERSION}} uses Scala {{site.SCALA_VERSION}}. If you write applications in Scala, you'll need to use this same version of Scala in your program -- newer major versions may not work.

To write a Spark application, you need to add a dependency on Spark. If you use SBT or Maven, Spark is available through Maven Central at:

    groupId = org.apache.spark
    artifactId = spark-core_{{site.SCALA_VERSION}}
    version = {{site.SPARK_VERSION}}

In addition, if you wish to access an HDFS cluster, you need to add a dependency on hadoop-client for your version of HDFS:

    groupId = org.apache.hadoop
    artifactId = hadoop-client
    version = <your-hdfs-version>
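For example, with SBT these coordinates might be declared roughly as follows in a `build.sbt` file (a sketch only; substitute the `hadoop-client` version that matches your cluster):

{% highlight scala %}
// build.sbt (sketch): the coordinates mirror the ones listed above.
// Replace <your-hdfs-version> with the version matching your HDFS cluster,
// and drop the hadoop-client line if you do not read from HDFS.
libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_{{site.SCALA_VERSION}}" % "{{site.SPARK_VERSION}}",
  "org.apache.hadoop" % "hadoop-client" % "<your-hdfs-version>"
)
{% endhighlight %}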

For other build systems, you can run `sbt assembly` to pack Spark and its dependencies into one JAR (`assembly/target/scala-{{site.SCALA_VERSION}}/spark-assembly-{{site.SPARK_VERSION}}-hadoop*.jar`), then add this to your CLASSPATH. Set the HDFS version as described here.

Finally, you need to import some Spark classes and implicit conversions into your program. Add the following lines:

{% highlight scala %}
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
{% endhighlight %}

# Initializing Spark

The first thing a Spark program must do is to create a SparkContext object, which tells Spark how to access a cluster. This is done through the following constructor:

{% highlight scala %}
new SparkContext(master, appName, [sparkHome], [jars])
{% endhighlight %}

or through new SparkContext(conf), which takes a SparkConf object for more advanced configuration.

The master parameter is a string specifying a Spark or Mesos cluster URL to connect to, or a special "local" string to run in local mode, as described below. appName is a name for your application, which will be shown in the cluster web UI. Finally, the last two parameters are needed to deploy your code to a cluster if running in distributed mode, as described later.
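For example, a context for local testing might be created like this (a minimal sketch; the `local[4]` master and the application name are placeholders):

{% highlight scala %}
import org.apache.spark.{SparkConf, SparkContext}

// Run locally on four cores; "My App" is an illustrative name shown in the web UI.
val sc = new SparkContext("local[4]", "My App")

// Equivalently, the same context could be configured through a SparkConf
// (only one SparkContext should be active in an application):
//   val conf = new SparkConf().setMaster("local[4]").setAppName("My App")
//   val sc = new SparkContext(conf)
{% endhighlight %}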

In the Spark shell, a special interpreter-aware SparkContext is already created for you, in the variable called sc. Making your own SparkContext will not work. You can set which master the context connects to using the MASTER environment variable, and you can add JARs to the classpath with the ADD_JARS variable. For example, to run bin/spark-shell on four cores, use

{% highlight bash %}
$ MASTER=local[4] ./bin/spark-shell
{% endhighlight %}

Or, to also add code.jar to its classpath, use:

{% highlight bash %}
$ MASTER=local[4] ADD_JARS=code.jar ./bin/spark-shell
{% endhighlight %}

## Master URLs

The master URL passed to Spark can be in one of the following formats:

| Master URL | Meaning |
| ---------- | ------- |
| local | Run Spark locally with one worker thread (i.e. no parallelism at all). |
| local[K] | Run Spark locally with K worker threads (ideally, set this to the number of cores on your machine). |
| spark://HOST:PORT | Connect to the given Spark standalone cluster master. The port must be whichever one your master is configured to use, which is 7077 by default. |
| mesos://HOST:PORT | Connect to the given Mesos cluster. The host parameter is the hostname of the Mesos master. The port must be whichever one the master is configured to use, which is 5050 by default. |

If no master URL is specified, the Spark shell defaults to "local".

For running on YARN, Spark launches an instance of the standalone deploy cluster within YARN; see running on YARN for details.

## Deploying Code on a Cluster

If you want to run your application on a cluster, you will need to specify the two optional parameters to SparkContext to let it find your code:

* `sparkHome`: The path at which Spark is installed on your worker machines (it should be the same on all of them).
* `jars`: A list of JAR files on the local machine containing your application's code and any dependencies, which Spark will deploy to all the worker nodes. You'll need to package your application into a set of JARs using your build system. For example, if you're using SBT, the sbt-assembly plugin is a good way to make a single JAR with your code and dependencies. A sketch of passing both parameters is shown below.
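For instance, a cluster deployment might look roughly like the following; the master URL, install path, and JAR name are hypothetical placeholders:

{% highlight scala %}
import org.apache.spark.SparkContext

// Connect to a standalone cluster and ship the application JAR to the workers.
// All of the values below are illustrative placeholders.
val sc = new SparkContext(
  "spark://master-host:7077",          // master URL of the standalone cluster
  "My App",                            // application name shown in the web UI
  "/usr/local/spark",                  // sparkHome: Spark install path on each worker
  List("target/my-app-assembly.jar"))  // jars to deploy to the workers
{% endhighlight %}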

If you run bin/spark-shell on a cluster, you can add JARs to it by specifying the ADD_JARS environment variable before you launch it. This variable should contain a comma-separated list of JARs. For example, ADD_JARS=a.jar,b.jar ./bin/spark-shell will launch a shell with a.jar and b.jar on its classpath. In addition, any new classes you define in the shell will automatically be distributed.

# Resilient Distributed Datasets (RDDs)

Spark revolves around the concept of a resilient distributed dataset (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel. There are currently two types of RDDs: parallelized collections, which take an existing Scala collection and run functions on it in parallel, and Hadoop datasets, which run functions on each record of a file in the Hadoop Distributed File System (HDFS) or any other storage system supported by Hadoop. Both types of RDDs can be operated on through the same methods.

## Parallelized Collections

Parallelized collections are created by calling SparkContext's parallelize method on an existing Scala collection (a Seq object). The elements of the collection are copied to form a distributed dataset that can be operated on in parallel. For example, here is some interpreter output showing how to create a parallel collection from an array:

{% highlight scala %}
scala> val data = Array(1, 2, 3, 4, 5)
data: Array[Int] = Array(1, 2, 3, 4, 5)

scala> val distData = sc.parallelize(data)
distData: spark.RDD[Int] = spark.ParallelCollection@10d13e3e
{% endhighlight %}

Once created, the distributed dataset (distData here) can be operated on in parallel. For example, we might call distData.reduce(_ + _) to add up the elements of the array. We describe operations on distributed datasets later on.
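Continuing the session above, the sum could be computed directly in the shell (the result shown simply follows from adding 1 through 5):

{% highlight scala %}
scala> distData.reduce(_ + _)
res0: Int = 15
{% endhighlight %}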