---
layout: global
title: Quick Start
---

* This will become a table of contents (this text will be scraped).
{:toc}
This tutorial provides a quick introduction to using Spark. We will first introduce the API through Spark's interactive Scala shell (don't worry if you don't know Scala -- you will not need much for this), then show how to write standalone jobs in Scala and Java. See the programming guide for a fuller reference.
To follow along with this guide, you only need to have successfully built Spark on one machine. Simply go into your Spark directory and run:
{% highlight bash %}
$ sbt/sbt package
{% endhighlight %}
# Interactive Analysis with the Spark Shell
## Basics
Spark's interactive shell provides a simple way to learn the API, as well as a powerful tool to analyze datasets interactively.
Start the shell by running `./spark-shell` in the Spark directory.
Spark's primary abstraction is a distributed collection of items called a Resilient Distributed Dataset (RDD). RDDs can be created from Hadoop InputFormats (such as HDFS files) or by transforming other RDDs. Let's make a new RDD from the text of the README file in the Spark source directory:
{% highlight scala %}
scala> val textFile = sc.textFile("README.md")
textFile: spark.RDD[String] = spark.MappedRDD@2ee9b6e3
{% endhighlight %}
RDDs have actions, which return values, and transformations, which return pointers to new RDDs. Let's start with a few actions:
{% highlight scala %}
scala> textFile.count() // Number of items in this RDD
res0: Long = 74

scala> textFile.first() // First item in this RDD
res1: String = # Spark
{% endhighlight %}
Now let's use a transformation. We will use the `filter` transformation to return a new RDD with a subset of the items in the file.
{% highlight scala %}
scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: spark.RDD[String] = spark.FilteredRDD@7dd4af09
{% endhighlight %}
We can chain together transformations and actions:
{% highlight scala %}
scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
res3: Long = 15
{% endhighlight %}
## Transformations
RDD transformations can be used for more complex computations. Let's say we want to find the line with the most words:
{% highlight scala %}
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Int = 16
{% endhighlight %}
This first maps a line to an integer value, creating a new RDD. `reduce` is called on that RDD to find the largest line count. The arguments to `map` and `reduce` are Scala function literals (closures), and can use any language feature or Scala/Java library. For example, we can easily call functions declared elsewhere. We'll use the `Math.max()` function to make this code easier to understand:
{% highlight scala %}
scala> import java.lang.Math
import java.lang.Math

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 16
{% endhighlight %}
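The same applies to functions we define ourselves in the shell. As a minimal sketch (the helper name `longest` and the value binding are illustrative, not part of the original walkthrough):

{% highlight scala %}
scala> // A function declared in the shell can be used inside RDD closures
scala> def longest(a: Int, b: Int): Int = if (a > b) a else b
longest: (a: Int, b: Int)Int

scala> val longestLine = textFile.map(line => line.split(" ").size).reduce((a, b) => longest(a, b))
longestLine: Int = 16
{% endhighlight %}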
One common data flow pattern is MapReduce, as popularized by Hadoop. Spark can implement MapReduce flows easily:
{% highlight scala %}
scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCounts: spark.RDD[(java.lang.String, Int)] = spark.ShuffledAggregatedRDD@71f027b8
{% endhighlight %}
Here, we combined the `flatMap`, `map` and `reduceByKey` transformations to compute the per-word counts in the file as an RDD of (String, Int) pairs. To collect the word counts in our shell, we can use the `collect` action:
{% highlight scala %}
scala> wordCounts.collect()
res6: Array[(java.lang.String, Int)] = Array((need,2), ("",43), (Extra,3), (using,1), (passed,1), (etc.,1), (its,1), (/usr/local/lib/libmesos.so,1), (SCALA_HOME,1), (option,1), (these,1), (#,1), (PATH,,2), (200,1), (To,3),...
{% endhighlight %}
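Since `wordCounts` is just another RDD, we can keep transforming it before collecting. The following is a minimal sketch (the threshold of 2 is arbitrary and the outputs are elided, since they depend on the contents of your README):

{% highlight scala %}
scala> // Keep only the words that occur more than twice in the file
scala> val frequentWords = wordCounts.filter { case (word, count) => count > 2 }

scala> frequentWords.collect()
{% endhighlight %}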
## Caching
Spark also supports pulling data sets into a cluster-wide in-memory cache. This is very useful when data is accessed repeatedly, such as when querying a small "hot" dataset or when running an iterative algorithm like PageRank. As a simple example, let's mark our `linesWithSpark` dataset to be cached:
{% highlight scala %}
scala> linesWithSpark.cache()
res7: spark.RDD[String] = spark.FilteredRDD@17e51082
{% endhighlight %}
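With the dataset marked for caching, the first action on `linesWithSpark` computes it and stores it in memory, and later actions reuse the cached data. A minimal sketch (the result numbering is illustrative, and the count of 15 assumes the same README used earlier):

{% highlight scala %}
scala> linesWithSpark.count() // First action computes the RDD and caches it
res8: Long = 15

scala> linesWithSpark.count() // Subsequent actions read from the in-memory cache
res9: Long = 15
{% endhighlight %}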