Commit 170e451f authored by Josh Rosen

Minor documentation and style fixes for PySpark.

parent 6f6a6b79
@@ -17,9 +17,9 @@ private[spark] class PythonPartitioner(override val numPartitions: Int) extends
     val hashCode = {
       if (key.isInstanceOf[Array[Byte]]) {
         Arrays.hashCode(key.asInstanceOf[Array[Byte]])
-      }
-      else
+      } else {
         key.hashCode()
+      }
     }
     val mod = hashCode % numPartitions
     if (mod < 0) {
@@ -13,8 +13,12 @@ import spark.rdd.PipedRDD
 private[spark] class PythonRDD[T: ClassManifest](
-    parent: RDD[T], command: Seq[String], envVars: java.util.Map[String, String],
-    preservePartitoning: Boolean, pythonExec: String, broadcastVars: java.util.List[Broadcast[Array[Byte]]])
+    parent: RDD[T],
+    command: Seq[String],
+    envVars: java.util.Map[String, String],
+    preservePartitoning: Boolean,
+    pythonExec: String,
+    broadcastVars: java.util.List[Broadcast[Array[Byte]]])
   extends RDD[Array[Byte]](parent.context) {
 
   // Similar to Runtime.exec(), if we are given a single string, split it into words
@@ -38,8 +42,8 @@ private[spark] class PythonRDD[T: ClassManifest](
     // Add the environmental variables to the process.
     val currentEnvVars = pb.environment()
-    envVars.foreach {
-      case (variable, value) => currentEnvVars.put(variable, value)
+    for ((variable, value) <- envVars) {
+      currentEnvVars.put(variable, value)
     }
 
     val proc = pb.start()
@@ -116,6 +120,10 @@ private[spark] class PythonRDD[T: ClassManifest](
   val asJavaRDD : JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this)
 }
 
+/**
+ * Form an RDD[(Array[Byte], Array[Byte])] from key-value pairs returned from Python.
+ * This is used by PySpark's shuffle operations.
+ */
 private class PairwiseRDD(prev: RDD[Array[Byte]]) extends
   RDD[(Array[Byte], Array[Byte])](prev.context) {
   override def splits = prev.splits
@@ -139,6 +147,16 @@ private[spark] object PythonRDD {
    * Write strings, pickled Python objects, or pairs of pickled objects to a data output stream.
    * The data format is a 32-bit integer representing the pickled object's length (in bytes),
    * followed by the pickled data.
+   *
+   * Pickle module:
+   *
+   *    http://docs.python.org/2/library/pickle.html
+   *
+   * The pickle protocol is documented in the source of the `pickle` and `pickletools` modules:
+   *
+   *    http://hg.python.org/cpython/file/2.6/Lib/pickle.py
+   *    http://hg.python.org/cpython/file/2.6/Lib/pickletools.py
+   *
    * @param elem the object to write
    * @param dOut a data output stream
    */
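For readers unfamiliar with this framing, here is a minimal Python sketch of the length-prefixed format the comment describes, assuming the 32-bit length is written big-endian (as Java's DataOutputStream.writeInt does); the helper names are illustrative, not part of PySpark's API:

import pickle
import struct

def read_pickled(stream):
    """Read one length-prefixed pickled object from a binary stream."""
    header = stream.read(4)
    if len(header) < 4:
        return None  # end of stream
    (length,) = struct.unpack("!i", header)  # 32-bit big-endian length
    return pickle.loads(stream.read(length))

def write_pickled(obj, stream):
    """Write one object as a 32-bit length followed by its pickle (protocol 2)."""
    data = pickle.dumps(obj, 2)
    stream.write(struct.pack("!i", len(data)))
    stream.write(data)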
@@ -201,15 +219,14 @@ private[spark] object PythonRDD {
 }
 
 private object Pickle {
-  def b(x: Int): Byte = x.asInstanceOf[Byte]
-  val PROTO: Byte = b(0x80)
-  val TWO: Byte = b(0x02)
-  val BINUNICODE : Byte = 'X'
-  val STOP : Byte = '.'
-  val TUPLE2 : Byte = b(0x86)
-  val EMPTY_LIST : Byte = ']'
-  val MARK : Byte = '('
-  val APPENDS : Byte = 'e'
+  val PROTO: Byte = 0x80.toByte
+  val TWO: Byte = 0x02.toByte
+  val BINUNICODE: Byte = 'X'
+  val STOP: Byte = '.'
+  val TUPLE2: Byte = 0x86.toByte
+  val EMPTY_LIST: Byte = ']'
+  val MARK: Byte = '('
+  val APPENDS: Byte = 'e'
 }
 
 private class ExtractValue extends spark.api.java.function.Function[(Array[Byte],
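The constants above are standard opcodes from pickle protocol 2, which is what lets the JVM side assemble pickled pairs byte by byte. As a rough, illustrative Python sketch (not Spark's code), the same opcodes can be used to hand-build a pickled pair of strings and inspect it with pickletools:

import pickle
import pickletools
import struct

def pickled_pair(a, b):
    """Hand-assemble a protocol-2 pickle of a pair of unicode strings,
    using the same opcodes named in the Pickle object above."""
    def binunicode(s):
        data = s.encode("utf-8")
        # 'X' = BINUNICODE: 4-byte little-endian length, then UTF-8 bytes
        return b"X" + struct.pack("<I", len(data)) + data
    return (b"\x80\x02"            # PROTO, protocol 2
            + binunicode(a)
            + binunicode(b)
            + b"\x86"              # TUPLE2: build a 2-tuple from the top two items
            + b".")                # STOP

p = pickled_pair(u"key", u"value")
assert pickle.loads(p) == (u"key", u"value")
pickletools.dis(p)  # prints the opcode-by-opcode disassembly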
@@ -8,7 +8,7 @@ TODO(andyk): Rewrite to make the Java API a first class part of the story.
 {% endcomment %}
 
 Spark is a MapReduce-like cluster computing framework designed for low-latency iterative jobs and interactive use from an interpreter.
-It provides clean, language-integrated APIs in Scala, Java, and Python, with a rich array of parallel operators.
+It provides clean, language-integrated APIs in [Scala](scala-programming-guide.html), [Java](java-programming-guide.html), and [Python](python-programming-guide.html), with a rich array of parallel operators.
 Spark can run on top of the [Apache Mesos](http://incubator.apache.org/mesos/) cluster manager,
 [Hadoop YARN](http://hadoop.apache.org/docs/r2.0.1-alpha/hadoop-yarn/hadoop-yarn-site/YARN.html),
 Amazon EC2, or without an independent resource manager ("standalone mode").
@@ -61,6 +61,11 @@ of `project/SparkBuild.scala`, then rebuilding Spark (`sbt/sbt clean compile`).
 * [Java Programming Guide](java-programming-guide.html): using Spark from Java
 * [Python Programming Guide](python-programming-guide.html): using Spark from Python
 
+**API Docs:**
+
+* [Java/Scala (Scaladoc)](api/core/index.html)
+* [Python (Epydoc)](api/pyspark/index.html)
+
 **Deployment guides:**
 
 * [Running Spark on Amazon EC2](ec2-scripts.html): scripts that let you launch a cluster on EC2 in about 5 minutes
@@ -73,7 +78,6 @@ of `project/SparkBuild.scala`, then rebuilding Spark (`sbt/sbt clean compile`).
 * [Configuration](configuration.html): customize Spark via its configuration system
 * [Tuning Guide](tuning.html): best practices to optimize performance and memory use
-* API Docs: [Java/Scala (Scaladoc)](api/core/index.html) and [Python (Epydoc)](api/pyspark/index.html)
 * [Bagel](bagel-programming-guide.html): an implementation of Google's Pregel on Spark
 * [Contributing to Spark](contributing-to-spark.html)
@@ -17,8 +17,7 @@ There are a few key differences between the Python and Scala APIs:
 * Python is dynamically typed, so RDDs can hold objects of different types.
 * PySpark does not currently support the following Spark features:
     - Accumulators
-    - Special functions on RRDs of doubles, such as `mean` and `stdev`
-    - Approximate jobs / functions, such as `countApprox` and `sumApprox`.
+    - Special functions on RDDs of doubles, such as `mean` and `stdev`
     - `lookup`
     - `mapPartitionsWithSplit`
     - `persist` at storage levels other than `MEMORY_ONLY`
"""
This example requires numpy (http://www.numpy.org/)
"""
import sys import sys
from pyspark.context import SparkContext import numpy as np
from numpy import array, sum as np_sum from pyspark import SparkContext
def parseVector(line): def parseVector(line):
return array([float(x) for x in line.split(' ')]) return np.array([float(x) for x in line.split(' ')])
def closestPoint(p, centers): def closestPoint(p, centers):
bestIndex = 0 bestIndex = 0
closest = float("+inf") closest = float("+inf")
for i in range(len(centers)): for i in range(len(centers)):
tempDist = np_sum((p - centers[i]) ** 2) tempDist = np.sum((p - centers[i]) ** 2)
if tempDist < closest: if tempDist < closest:
closest = tempDist closest = tempDist
bestIndex = i bestIndex = i
...@@ -41,7 +44,7 @@ if __name__ == "__main__": ...@@ -41,7 +44,7 @@ if __name__ == "__main__":
newPoints = pointStats.map( newPoints = pointStats.map(
lambda (x, (y, z)): (x, y / z)).collect() lambda (x, (y, z)): (x, y / z)).collect()
tempDist = sum(np_sum((kPoints[x] - y) ** 2) for (x, y) in newPoints) tempDist = sum(np.sum((kPoints[x] - y) ** 2) for (x, y) in newPoints)
for (x, y) in newPoints: for (x, y) in newPoints:
kPoints[x] = y kPoints[x] = y
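The hunk above shows only the closestPoint helper and the centroid update; as an illustrative fragment (not this file's actual code), the step that would produce pointStats from an RDD of parsed vectors (here called data) and the current centers (kPoints) typically looks like:

# Illustrative only: assign each point to its nearest center, then sum
# the points and counts per center with a shuffle (reduceByKey).
closest = data.map(
    lambda p: (closestPoint(p, kPoints), (p, 1)))
pointStats = closest.reduceByKey(
    lambda (x1, y1), (x2, y2): (x1 + x2, y1 + y2))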
@@ -7,7 +7,7 @@ from os.path import realpath
 import sys
 
 import numpy as np
-from pyspark.context import SparkContext
+from pyspark import SparkContext
 
 
 N = 100000  # Number of data points
@@ -32,7 +32,7 @@ def generateData():
 if __name__ == "__main__":
     if len(sys.argv) == 1:
         print >> sys.stderr, \
-            "Usage: PythonLR <host> [<slices>]"
+            "Usage: PythonLR <master> [<slices>]"
         exit(-1)
     sc = SparkContext(sys.argv[1], "PythonLR", pyFiles=[realpath(__file__)])
     slices = int(sys.argv[2]) if len(sys.argv) > 2 else 2
 import sys
 from random import random
 from operator import add
-from pyspark.context import SparkContext
+
+from pyspark import SparkContext
 
 
 if __name__ == "__main__":
     if len(sys.argv) == 1:
         print >> sys.stderr, \
-            "Usage: PythonPi <host> [<slices>]"
+            "Usage: PythonPi <master> [<slices>]"
         exit(-1)
     sc = SparkContext(sys.argv[1], "PythonPi")
     slices = int(sys.argv[2]) if len(sys.argv) > 2 else 2
 import sys
 from random import Random
-from pyspark.context import SparkContext
+
+from pyspark import SparkContext
 
 numEdges = 200
 numVertices = 100
@@ -20,7 +21,7 @@ def generateGraph():
 if __name__ == "__main__":
     if len(sys.argv) == 1:
         print >> sys.stderr, \
-            "Usage: PythonTC <host> [<slices>]"
+            "Usage: PythonTC <master> [<slices>]"
         exit(-1)
     sc = SparkContext(sys.argv[1], "PythonTC")
     slices = sys.argv[2] if len(sys.argv) > 2 else 2
 import sys
 from operator import add
-from pyspark.context import SparkContext
+
+from pyspark import SparkContext
 
 
 if __name__ == "__main__":
     if len(sys.argv) < 3:
"""
PySpark is a Python API for Spark.
Public classes:
- L{SparkContext<pyspark.context.SparkContext>}
Main entry point for Spark functionality.
- L{RDD<pyspark.rdd.RDD>}
A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
"""
import sys import sys
import os import os
sys.path.insert(0, os.path.join(os.environ["SPARK_HOME"], "pyspark/lib/py4j0.7.egg")) sys.path.insert(0, os.path.join(os.environ["SPARK_HOME"], "pyspark/lib/py4j0.7.egg"))
from pyspark.context import SparkContext from pyspark.context import SparkContext
from pyspark.rdd import RDD
__all__ = ["SparkContext"] __all__ = ["SparkContext", "RDD"]
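For context, here is a minimal usage sketch of the two classes this module now exports, modeled loosely on the word-count example touched in this commit; the master URL and input path are placeholders:

from pyspark import SparkContext

# "local" and "README.md" are placeholders, as in the bundled examples.
sc = SparkContext("local", "ExampleApp")

counts = (sc.textFile("README.md")
            .flatMap(lambda line: line.split(' '))
            .map(lambda word: (word, 1))
            .reduceByKey(lambda a, b: a + b))
print counts.collect()  # counts is an RDD until an action like collect() runs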