Skip to content
Snippets Groups Projects
Commit fbb3fc41 authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Merge pull request #346 from JoshRosen/python-api

Python API (PySpark)
parents 44b3e41f 49c74ba2
No related branches found
No related tags found
No related merge requests found
Showing
with 762 additions and 12 deletions
package spark.api.python
import spark.Partitioner
import java.util.Arrays
/**
* A [[spark.Partitioner]] that performs handling of byte arrays, for use by the Python API.
*/
private[spark] class PythonPartitioner(override val numPartitions: Int) extends Partitioner {
override def getPartition(key: Any): Int = {
if (key == null) {
return 0
}
else {
val hashCode = {
if (key.isInstanceOf[Array[Byte]]) {
Arrays.hashCode(key.asInstanceOf[Array[Byte]])
} else {
key.hashCode()
}
}
val mod = hashCode % numPartitions
if (mod < 0) {
mod + numPartitions
} else {
mod // Guard against negative hash codes
}
}
}
override def equals(other: Any): Boolean = other match {
case h: PythonPartitioner =>
h.numPartitions == numPartitions
case _ =>
false
}
}
package spark.api.python
import java.io._
import java.util.{List => JList}
import scala.collection.JavaConversions._
import scala.io.Source
import spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD}
import spark.broadcast.Broadcast
import spark._
import spark.rdd.PipedRDD
import java.util
private[spark] class PythonRDD[T: ClassManifest](
parent: RDD[T],
command: Seq[String],
envVars: java.util.Map[String, String],
preservePartitoning: Boolean,
pythonExec: String,
broadcastVars: java.util.List[Broadcast[Array[Byte]]])
extends RDD[Array[Byte]](parent.context) {
// Similar to Runtime.exec(), if we are given a single string, split it into words
// using a standard StringTokenizer (i.e. by spaces)
def this(parent: RDD[T], command: String, envVars: java.util.Map[String, String],
preservePartitoning: Boolean, pythonExec: String,
broadcastVars: java.util.List[Broadcast[Array[Byte]]]) =
this(parent, PipedRDD.tokenize(command), envVars, preservePartitoning, pythonExec,
broadcastVars)
override def splits = parent.splits
override val dependencies = List(new OneToOneDependency(parent))
override val partitioner = if (preservePartitoning) parent.partitioner else None
override def compute(split: Split, context: TaskContext): Iterator[Array[Byte]] = {
val SPARK_HOME = new ProcessBuilder().environment().get("SPARK_HOME")
val pb = new ProcessBuilder(Seq(pythonExec, SPARK_HOME + "/python/pyspark/worker.py"))
// Add the environmental variables to the process.
val currentEnvVars = pb.environment()
for ((variable, value) <- envVars) {
currentEnvVars.put(variable, value)
}
val proc = pb.start()
val env = SparkEnv.get
// Start a thread to print the process's stderr to ours
new Thread("stderr reader for " + command) {
override def run() {
for (line <- Source.fromInputStream(proc.getErrorStream).getLines) {
System.err.println(line)
}
}
}.start()
// Start a thread to feed the process input from our parent's iterator
new Thread("stdin writer for " + command) {
override def run() {
SparkEnv.set(env)
val out = new PrintWriter(proc.getOutputStream)
val dOut = new DataOutputStream(proc.getOutputStream)
// Split index
dOut.writeInt(split.index)
// Broadcast variables
dOut.writeInt(broadcastVars.length)
for (broadcast <- broadcastVars) {
dOut.writeLong(broadcast.id)
dOut.writeInt(broadcast.value.length)
dOut.write(broadcast.value)
dOut.flush()
}
// Serialized user code
for (elem <- command) {
out.println(elem)
}
out.flush()
// Data values
for (elem <- parent.iterator(split, context)) {
PythonRDD.writeAsPickle(elem, dOut)
}
dOut.flush()
out.flush()
proc.getOutputStream.close()
}
}.start()
// Return an iterator that read lines from the process's stdout
val stream = new DataInputStream(proc.getInputStream)
return new Iterator[Array[Byte]] {
def next() = {
val obj = _nextObj
_nextObj = read()
obj
}
private def read() = {
try {
val length = stream.readInt()
val obj = new Array[Byte](length)
stream.readFully(obj)
obj
} catch {
case eof: EOFException => {
val exitStatus = proc.waitFor()
if (exitStatus != 0) {
throw new Exception("Subprocess exited with status " + exitStatus)
}
new Array[Byte](0)
}
case e => throw e
}
}
var _nextObj = read()
def hasNext = _nextObj.length != 0
}
}
val asJavaRDD : JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this)
}
/**
* Form an RDD[(Array[Byte], Array[Byte])] from key-value pairs returned from Python.
* This is used by PySpark's shuffle operations.
*/
private class PairwiseRDD(prev: RDD[Array[Byte]]) extends
RDD[(Array[Byte], Array[Byte])](prev.context) {
override def splits = prev.splits
override val dependencies = List(new OneToOneDependency(prev))
override def compute(split: Split, context: TaskContext) =
prev.iterator(split, context).grouped(2).map {
case Seq(a, b) => (a, b)
case x => throw new Exception("PairwiseRDD: unexpected value: " + x)
}
val asJavaPairRDD : JavaPairRDD[Array[Byte], Array[Byte]] = JavaPairRDD.fromRDD(this)
}
private[spark] object PythonRDD {
/** Strips the pickle PROTO and STOP opcodes from the start and end of a pickle */
def stripPickle(arr: Array[Byte]) : Array[Byte] = {
arr.slice(2, arr.length - 1)
}
/**
* Write strings, pickled Python objects, or pairs of pickled objects to a data output stream.
* The data format is a 32-bit integer representing the pickled object's length (in bytes),
* followed by the pickled data.
*
* Pickle module:
*
* http://docs.python.org/2/library/pickle.html
*
* The pickle protocol is documented in the source of the `pickle` and `pickletools` modules:
*
* http://hg.python.org/cpython/file/2.6/Lib/pickle.py
* http://hg.python.org/cpython/file/2.6/Lib/pickletools.py
*
* @param elem the object to write
* @param dOut a data output stream
*/
def writeAsPickle(elem: Any, dOut: DataOutputStream) {
if (elem.isInstanceOf[Array[Byte]]) {
val arr = elem.asInstanceOf[Array[Byte]]
dOut.writeInt(arr.length)
dOut.write(arr)
} else if (elem.isInstanceOf[scala.Tuple2[Array[Byte], Array[Byte]]]) {
val t = elem.asInstanceOf[scala.Tuple2[Array[Byte], Array[Byte]]]
val length = t._1.length + t._2.length - 3 - 3 + 4 // stripPickle() removes 3 bytes
dOut.writeInt(length)
dOut.writeByte(Pickle.PROTO)
dOut.writeByte(Pickle.TWO)
dOut.write(PythonRDD.stripPickle(t._1))
dOut.write(PythonRDD.stripPickle(t._2))
dOut.writeByte(Pickle.TUPLE2)
dOut.writeByte(Pickle.STOP)
} else if (elem.isInstanceOf[String]) {
// For uniformity, strings are wrapped into Pickles.
val s = elem.asInstanceOf[String].getBytes("UTF-8")
val length = 2 + 1 + 4 + s.length + 1
dOut.writeInt(length)
dOut.writeByte(Pickle.PROTO)
dOut.writeByte(Pickle.TWO)
dOut.write(Pickle.BINUNICODE)
dOut.writeInt(Integer.reverseBytes(s.length))
dOut.write(s)
dOut.writeByte(Pickle.STOP)
} else {
throw new Exception("Unexpected RDD type")
}
}
def readRDDFromPickleFile(sc: JavaSparkContext, filename: String, parallelism: Int) :
JavaRDD[Array[Byte]] = {
val file = new DataInputStream(new FileInputStream(filename))
val objs = new collection.mutable.ArrayBuffer[Array[Byte]]
try {
while (true) {
val length = file.readInt()
val obj = new Array[Byte](length)
file.readFully(obj)
objs.append(obj)
}
} catch {
case eof: EOFException => {}
case e => throw e
}
JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism))
}
def writeIteratorToPickleFile[T](items: java.util.Iterator[T], filename: String) {
val file = new DataOutputStream(new FileOutputStream(filename))
for (item <- items) {
writeAsPickle(item, file)
}
file.close()
}
def takePartition[T](rdd: RDD[T], partition: Int): java.util.Iterator[T] =
rdd.context.runJob(rdd, ((x: Iterator[T]) => x), Seq(partition), true).head
}
private object Pickle {
val PROTO: Byte = 0x80.toByte
val TWO: Byte = 0x02.toByte
val BINUNICODE: Byte = 'X'
val STOP: Byte = '.'
val TUPLE2: Byte = 0x86.toByte
val EMPTY_LIST: Byte = ']'
val MARK: Byte = '('
val APPENDS: Byte = 'e'
}
private class ExtractValue extends spark.api.java.function.Function[(Array[Byte],
Array[Byte]), Array[Byte]] {
override def call(pair: (Array[Byte], Array[Byte])) : Array[Byte] = pair._2
}
private class BytesToString extends spark.api.java.function.Function[Array[Byte], String] {
override def call(arr: Array[Byte]) : String = new String(arr, "UTF-8")
}
......@@ -5,7 +5,7 @@ import java.util.concurrent.atomic.AtomicLong
import spark._
abstract class Broadcast[T](id: Long) extends Serializable {
abstract class Broadcast[T](private[spark] val id: Long) extends Serializable {
def value: T
// We cannot have an abstract readObject here due to some weird issues with
......
......@@ -25,10 +25,12 @@ To mark a block of code in your markdown to be syntax highlighted by jekyll duri
// supported languages too.
{% endhighlight %}
## Scaladoc
## API Docs (Scaladoc and Epydoc)
You can build just the Spark scaladoc by running `sbt/sbt doc` from the SPARK_PROJECT_ROOT directory.
When you run `jekyll` in the docs directory, it will also copy over the scala doc for the various Spark subprojects into the docs directory (and then also into the _site directory). We use a jekyll plugin to run `sbt/sbt doc` before building the site so if you haven't run it (recently) it may take some time as it generates all of the scaladoc.
Similarly, you can build just the PySpark epydoc by running `epydoc --config epydoc.conf` from the SPARK_PROJECT_ROOT/pyspark directory.
NOTE: To skip the step of building and copying over the scaladoc when you build the docs, run `SKIP_SCALADOC=1 jekyll`.
When you run `jekyll` in the docs directory, it will also copy over the scaladoc for the various Spark subprojects into the docs directory (and then also into the _site directory). We use a jekyll plugin to run `sbt/sbt doc` before building the site so if you haven't run it (recently) it may take some time as it generates all of the scaladoc. The jekyll plugin also generates the PySpark docs using [epydoc](http://epydoc.sourceforge.net/).
NOTE: To skip the step of building and copying over the scaladoc when you build the docs, run `SKIP_SCALADOC=1 jekyll`. Similarly, `SKIP_EPYDOC=1 jekyll` will skip PySpark API doc generation.
......@@ -47,10 +47,17 @@
<li><a href="quick-start.html">Quick Start</a></li>
<li><a href="scala-programming-guide.html">Scala</a></li>
<li><a href="java-programming-guide.html">Java</a></li>
<li><a href="python-programming-guide.html">Python</a></li>
</ul>
</li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown">API<b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="api/core/index.html">Scala/Java (Scaladoc)</a></li>
<li><a href="api/pyspark/index.html">Python (Epydoc)</a></li>
</ul>
</li>
<li><a href="api/core/index.html">API (Scaladoc)</a></li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown">Deploying<b class="caret"></b></a>
......
......@@ -28,3 +28,20 @@ if ENV['SKIP_SCALADOC'] != '1'
cp_r(source + "/.", dest)
end
end
if ENV['SKIP_EPYDOC'] != '1'
puts "Moving to python directory and building epydoc."
cd("../python")
puts `epydoc --config epydoc.conf`
puts "Moving back into docs dir."
cd("../docs")
puts "echo making directory pyspark"
mkdir_p "pyspark"
puts "cp -r ../python/docs/. api/pyspark"
cp_r("../python/docs/.", "api/pyspark")
cd("..")
end
......@@ -8,3 +8,4 @@ Here you can find links to the Scaladoc generated for the Spark sbt subprojects.
- [Core](api/core/index.html)
- [Examples](api/examples/index.html)
- [Bagel](api/bagel/index.html)
- [PySpark](api/pyspark/index.html)
......@@ -7,11 +7,11 @@ title: Spark Overview
TODO(andyk): Rewrite to make the Java API a first class part of the story.
{% endcomment %}
Spark is a MapReduce-like cluster computing framework designed for low-latency iterative jobs and interactive use from an
interpreter. It provides clean, language-integrated APIs in Scala and Java, with a rich array of parallel operators. Spark can
run on top of the [Apache Mesos](http://incubator.apache.org/mesos/) cluster manager,
Spark is a MapReduce-like cluster computing framework designed for low-latency iterative jobs and interactive use from an interpreter.
It provides clean, language-integrated APIs in [Scala](scala-programming-guide.html), [Java](java-programming-guide.html), and [Python](python-programming-guide.html), with a rich array of parallel operators.
Spark can run on top of the [Apache Mesos](http://incubator.apache.org/mesos/) cluster manager,
[Hadoop YARN](http://hadoop.apache.org/docs/r2.0.1-alpha/hadoop-yarn/hadoop-yarn-site/YARN.html),
Amazon EC2, or without an independent resource manager ("standalone mode").
Amazon EC2, or without an independent resource manager ("standalone mode").
# Downloading
......@@ -59,6 +59,12 @@ of `project/SparkBuild.scala`, then rebuilding Spark (`sbt/sbt clean compile`).
* [Quick Start](quick-start.html): a quick introduction to the Spark API; start here!
* [Spark Programming Guide](scala-programming-guide.html): an overview of Spark concepts, and details on the Scala API
* [Java Programming Guide](java-programming-guide.html): using Spark from Java
* [Python Programming Guide](python-programming-guide.html): using Spark from Python
**API Docs:**
* [Java/Scala (Scaladoc)](api/core/index.html)
* [Python (Epydoc)](api/pyspark/index.html)
**Deployment guides:**
......@@ -72,7 +78,6 @@ of `project/SparkBuild.scala`, then rebuilding Spark (`sbt/sbt clean compile`).
* [Configuration](configuration.html): customize Spark via its configuration system
* [Tuning Guide](tuning.html): best practices to optimize performance and memory use
* [API Docs (Scaladoc)](api/core/index.html)
* [Bagel](bagel-programming-guide.html): an implementation of Google's Pregel on Spark
* [Contributing to Spark](contributing-to-spark.html)
......
---
layout: global
title: Python Programming Guide
---
The Spark Python API (PySpark) exposes most of the Spark features available in the Scala version to Python.
To learn the basics of Spark, we recommend reading through the
[Scala programming guide](scala-programming-guide.html) first; it should be
easy to follow even if you don't know Scala.
This guide will show how to use the Spark features described there in Python.
# Key Differences in the Python API
There are a few key differences between the Python and Scala APIs:
* Python is dynamically typed, so RDDs can hold objects of different types.
* PySpark does not currently support the following Spark features:
- Accumulators
- Special functions on RDDs of doubles, such as `mean` and `stdev`
- `lookup`
- `persist` at storage levels other than `MEMORY_ONLY`
- `sample`
- `sort`
In PySpark, RDDs support the same methods as their Scala counterparts but take Python functions and return Python collection types.
Short functions can be passed to RDD methods using Python's [`lambda`](http://www.diveintopython.net/power_of_introspection/lambda_functions.html) syntax:
{% highlight python %}
logData = sc.textFile(logFile).cache()
errors = logData.filter(lambda s: 'ERROR' in s.split())
{% endhighlight %}
You can also pass functions that are defined using the `def` keyword; this is useful for more complicated functions that cannot be expressed using `lambda`:
{% highlight python %}
def is_error(line):
return 'ERROR' in line.split()
errors = logData.filter(is_error)
{% endhighlight %}
Functions can access objects in enclosing scopes, although modifications to those objects within RDD methods will not be propagated to other tasks:
{% highlight python %}
error_keywords = ["Exception", "Error"]
def is_error(line):
words = line.split()
return any(keyword in words for keyword in error_keywords)
errors = logData.filter(is_error)
{% endhighlight %}
PySpark will automatically ship these functions to workers, along with any objects that they reference.
Instances of classes will be serialized and shipped to workers by PySpark, but classes themselves cannot be automatically distributed to workers.
The [Standalone Use](#standalone-use) section describes how to ship code dependencies to workers.
# Installing and Configuring PySpark
PySpark requires Python 2.6 or higher.
PySpark jobs are executed using a standard cPython interpreter in order to support Python modules that use C extensions.
We have not tested PySpark with Python 3 or with alternative Python interpreters, such as [PyPy](http://pypy.org/) or [Jython](http://www.jython.org/).
By default, PySpark's scripts will run programs using `python`; an alternate Python executable may be specified by setting the `PYSPARK_PYTHON` environment variable in `conf/spark-env.sh`.
All of PySpark's library dependencies, including [Py4J](http://py4j.sourceforge.net/), are bundled with PySpark and automatically imported.
Standalone PySpark jobs should be run using the `pyspark` script, which automatically configures the Java and Python environment using the settings in `conf/spark-env.sh`.
The script automatically adds the `pyspark` package to the `PYTHONPATH`.
# Interactive Use
The `pyspark` script launches a Python interpreter that is configured to run PySpark jobs.
When run without any input files, `pyspark` launches a shell that can be used explore data interactively, which is a simple way to learn the API:
{% highlight python %}
>>> words = sc.textFile("/usr/share/dict/words")
>>> words.filter(lambda w: w.startswith("spar")).take(5)
[u'spar', u'sparable', u'sparada', u'sparadrap', u'sparagrass']
{% endhighlight %}
By default, the `pyspark` shell creates SparkContext that runs jobs locally.
To connect to a non-local cluster, set the `MASTER` environment variable.
For example, to use the `pyspark` shell with a [standalone Spark cluster](spark-standalone.html):
{% highlight shell %}
$ MASTER=spark://IP:PORT ./pyspark
{% endhighlight %}
# Standalone Use
PySpark can also be used from standalone Python scripts by creating a SparkContext in your script and running the script using `pyspark`.
The Quick Start guide includes a [complete example](quick-start.html#a-standalone-job-in-python) of a standalone Python job.
Code dependencies can be deployed by listing them in the `pyFiles` option in the SparkContext constructor:
{% highlight python %}
from pyspark import SparkContext
sc = SparkContext("local", "Job Name", pyFiles=['MyFile.py', 'lib.zip', 'app.egg'])
{% endhighlight %}
Files listed here will be added to the `PYTHONPATH` and shipped to remote worker machines.
Code dependencies can be added to an existing SparkContext using its `addPyFile()` method.
# Where to Go from Here
PySpark includes several sample programs using the Python API in `python/examples`.
You can run them by passing the files to the `pyspark` script -- for example `./pyspark python/examples/wordcount.py`.
Each example program prints usage help when run without any arguments.
We currently provide [API documentation](api/pyspark/index.html) for the Python API as Epydoc.
Many of the RDD method descriptions contain [doctests](http://docs.python.org/2/library/doctest.html) that provide additional usage examples.
......@@ -6,7 +6,8 @@ title: Quick Start
* This will become a table of contents (this text will be scraped).
{:toc}
This tutorial provides a quick introduction to using Spark. We will first introduce the API through Spark's interactive Scala shell (don't worry if you don't know Scala -- you will not need much for this), then show how to write standalone jobs in Scala and Java. See the [programming guide](scala-programming-guide.html) for a more complete reference.
This tutorial provides a quick introduction to using Spark. We will first introduce the API through Spark's interactive Scala shell (don't worry if you don't know Scala -- you will not need much for this), then show how to write standalone jobs in Scala, Java, and Python.
See the [programming guide](scala-programming-guide.html) for a more complete reference.
To follow along with this guide, you only need to have successfully built Spark on one machine. Simply go into your Spark directory and run:
......@@ -240,3 +241,40 @@ Lines with a: 8422, Lines with b: 1836
{% endhighlight %}
This example only runs the job locally; for a tutorial on running jobs across several machines, see the [Standalone Mode](spark-standalone.html) documentation, and consider using a distributed input source, such as HDFS.
# A Standalone Job In Python
Now we will show how to write a standalone job using the Python API (PySpark).
As an example, we'll create a simple Spark job, `SimpleJob.py`:
{% highlight python %}
"""SimpleJob.py"""
from pyspark import SparkContext
logFile = "/var/log/syslog" # Should be some file on your system
sc = SparkContext("local", "Simple job")
logData = sc.textFile(logFile).cache()
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
print "Lines with a: %i, lines with b: %i" % (numAs, numBs)
{% endhighlight %}
This job simply counts the number of lines containing 'a' and the number containing 'b' in a system log file.
Like in the Scala and Java examples, we use a SparkContext to create RDDs.
We can pass Python functions to Spark, which are automatically serialized along with any variables that they reference.
For jobs that use custom classes or third-party libraries, we can add those code dependencies to SparkContext to ensure that they will be available on remote machines; this is described in more detail in the [Python programming guide](python-programming-guide).
`SimpleJob` is simple enough that we do not need to specify any code dependencies.
We can run this job using the `pyspark` script:
{% highlight python %}
$ cd $SPARK_HOME
$ ./pyspark SimpleJob.py
...
Lines with a: 8422, Lines with b: 1836
{% endhighlight python %}
This example only runs the job locally; for a tutorial on running jobs across several machines, see the [Standalone Mode](spark-standalone.html) documentation, and consider using a distributed input source, such as HDFS.
pyspark 0 → 100755
#!/usr/bin/env bash
# Figure out where the Scala framework is installed
FWDIR="$(cd `dirname $0`; pwd)"
# Export this as SPARK_HOME
export SPARK_HOME="$FWDIR"
# Load environment variables from conf/spark-env.sh, if it exists
if [ -e $FWDIR/conf/spark-env.sh ] ; then
. $FWDIR/conf/spark-env.sh
fi
# Figure out which Python executable to use
if [ -z "$PYSPARK_PYTHON" ] ; then
PYSPARK_PYTHON="python"
fi
export PYSPARK_PYTHON
# Add the PySpark classes to the Python path:
export PYTHONPATH=$SPARK_HOME/python/:$PYTHONPATH
# Load the PySpark shell.py script when ./pyspark is used interactively:
export OLD_PYTHONSTARTUP=$PYTHONSTARTUP
export PYTHONSTARTUP=$FWDIR/python/pyspark/shell.py
# Launch with `scala` by default:
if [[ "$SPARK_LAUNCH_WITH_SCALA" != "0" ]] ; then
export SPARK_LAUNCH_WITH_SCALA=1
fi
exec "$PYSPARK_PYTHON" "$@"
*.pyc
docs/
[epydoc] # Epydoc section marker (required by ConfigParser)
# Information about the project.
name: PySpark
url: http://spark-project.org
# The list of modules to document. Modules can be named using
# dotted names, module filenames, or package directory names.
# This option may be repeated.
modules: pyspark
# Write html output to the directory "apidocs"
output: html
target: docs/
private: no
exclude: pyspark.cloudpickle pyspark.worker pyspark.join pyspark.serializers
pyspark.java_gateway pyspark.examples pyspark.shell
"""
This example requires numpy (http://www.numpy.org/)
"""
import sys
import numpy as np
from pyspark import SparkContext
def parseVector(line):
return np.array([float(x) for x in line.split(' ')])
def closestPoint(p, centers):
bestIndex = 0
closest = float("+inf")
for i in range(len(centers)):
tempDist = np.sum((p - centers[i]) ** 2)
if tempDist < closest:
closest = tempDist
bestIndex = i
return bestIndex
if __name__ == "__main__":
if len(sys.argv) < 5:
print >> sys.stderr, \
"Usage: PythonKMeans <master> <file> <k> <convergeDist>"
exit(-1)
sc = SparkContext(sys.argv[1], "PythonKMeans")
lines = sc.textFile(sys.argv[2])
data = lines.map(parseVector).cache()
K = int(sys.argv[3])
convergeDist = float(sys.argv[4])
# TODO: change this after we port takeSample()
#kPoints = data.takeSample(False, K, 34)
kPoints = data.take(K)
tempDist = 1.0
while tempDist > convergeDist:
closest = data.map(
lambda p : (closestPoint(p, kPoints), (p, 1)))
pointStats = closest.reduceByKey(
lambda (x1, y1), (x2, y2): (x1 + x2, y1 + y2))
newPoints = pointStats.map(
lambda (x, (y, z)): (x, y / z)).collect()
tempDist = sum(np.sum((kPoints[x] - y) ** 2) for (x, y) in newPoints)
for (x, y) in newPoints:
kPoints[x] = y
print "Final centers: " + str(kPoints)
"""
This example requires numpy (http://www.numpy.org/)
"""
from collections import namedtuple
from math import exp
from os.path import realpath
import sys
import numpy as np
from pyspark import SparkContext
N = 100000 # Number of data points
D = 10 # Number of dimensions
R = 0.7 # Scaling factor
ITERATIONS = 5
np.random.seed(42)
DataPoint = namedtuple("DataPoint", ['x', 'y'])
from lr import DataPoint # So that DataPoint is properly serialized
def generateData():
def generatePoint(i):
y = -1 if i % 2 == 0 else 1
x = np.random.normal(size=D) + (y * R)
return DataPoint(x, y)
return [generatePoint(i) for i in range(N)]
if __name__ == "__main__":
if len(sys.argv) == 1:
print >> sys.stderr, \
"Usage: PythonLR <master> [<slices>]"
exit(-1)
sc = SparkContext(sys.argv[1], "PythonLR", pyFiles=[realpath(__file__)])
slices = int(sys.argv[2]) if len(sys.argv) > 2 else 2
points = sc.parallelize(generateData(), slices).cache()
# Initialize w to a random value
w = 2 * np.random.ranf(size=D) - 1
print "Initial w: " + str(w)
def add(x, y):
x += y
return x
for i in range(1, ITERATIONS + 1):
print "On iteration %i" % i
gradient = points.map(lambda p:
(1.0 / (1.0 + exp(-p.y * np.dot(w, p.x)))) * p.y * p.x
).reduce(add)
w -= gradient
print "Final w: " + str(w)
import sys
from random import random
from operator import add
from pyspark import SparkContext
if __name__ == "__main__":
if len(sys.argv) == 1:
print >> sys.stderr, \
"Usage: PythonPi <master> [<slices>]"
exit(-1)
sc = SparkContext(sys.argv[1], "PythonPi")
slices = int(sys.argv[2]) if len(sys.argv) > 2 else 2
n = 100000 * slices
def f(_):
x = random() * 2 - 1
y = random() * 2 - 1
return 1 if x ** 2 + y ** 2 < 1 else 0
count = sc.parallelize(xrange(1, n+1), slices).map(f).reduce(add)
print "Pi is roughly %f" % (4.0 * count / n)
import sys
from random import Random
from pyspark import SparkContext
numEdges = 200
numVertices = 100
rand = Random(42)
def generateGraph():
edges = set()
while len(edges) < numEdges:
src = rand.randrange(0, numEdges)
dst = rand.randrange(0, numEdges)
if src != dst:
edges.add((src, dst))
return edges
if __name__ == "__main__":
if len(sys.argv) == 1:
print >> sys.stderr, \
"Usage: PythonTC <master> [<slices>]"
exit(-1)
sc = SparkContext(sys.argv[1], "PythonTC")
slices = sys.argv[2] if len(sys.argv) > 2 else 2
tc = sc.parallelize(generateGraph(), slices).cache()
# Linear transitive closure: each round grows paths by one edge,
# by joining the graph's edges with the already-discovered paths.
# e.g. join the path (y, z) from the TC with the edge (x, y) from
# the graph to obtain the path (x, z).
# Because join() joins on keys, the edges are stored in reversed order.
edges = tc.map(lambda (x, y): (y, x))
oldCount = 0L
nextCount = tc.count()
while True:
oldCount = nextCount
# Perform the join, obtaining an RDD of (y, (z, x)) pairs,
# then project the result to obtain the new (x, z) paths.
new_edges = tc.join(edges).map(lambda (_, (a, b)): (b, a))
tc = tc.union(new_edges).distinct().cache()
nextCount = tc.count()
if nextCount == oldCount:
break
print "TC has %i edges" % tc.count()
import sys
from operator import add
from pyspark import SparkContext
if __name__ == "__main__":
if len(sys.argv) < 3:
print >> sys.stderr, \
"Usage: PythonWordCount <master> <file>"
exit(-1)
sc = SparkContext(sys.argv[1], "PythonWordCount")
lines = sc.textFile(sys.argv[2], 1)
counts = lines.flatMap(lambda x: x.split(' ')) \
.map(lambda x: (x, 1)) \
.reduceByKey(add)
output = counts.collect()
for (word, count) in output:
print "%s : %i" % (word, count)
Copyright (c) 2009-2011, Barthelemy Dagenais All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
- Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
- Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
- The name of the author may not be used to endorse or promote products
derived from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
b7924aabe9c5e63f0a4d8bbd17019534c7ec014e
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment