Commit 607b53ab authored by Josh Rosen
Use numpy in Python k-means example.

parent fd94e544
@@ -101,7 +101,13 @@ trait PythonRDDBase {
         stream.readFully(obj)
         obj
       } catch {
-        case eof: EOFException => { new Array[Byte](0) }
+        case eof: EOFException => {
+          val exitStatus = proc.waitFor()
+          if (exitStatus != 0) {
+            throw new Exception("Subprocess exited with status " + exitStatus)
+          }
+          new Array[Byte](0)
+        }
         case e => throw e
       }
     }
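This makes Python worker failures visible on the Scala side: previously an EOFException was swallowed and returned as an empty byte array, so a crashed subprocess looked like a clean end of stream. Roughly the same pattern in standalone Python, as an illustrative sketch (the subprocess usage here is mine, not part of the commit):

    import subprocess
    import sys

    # Spawn a child that exits with a nonzero status without writing output.
    proc = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(3)"],
                            stdout=subprocess.PIPE)
    chunk = proc.stdout.read(4)   # b'' signals EOF: the child died silently
    if not chunk:
        exit_status = proc.wait()
        if exit_status != 0:
            # Surface the failure instead of treating EOF as "no more data".
            raise Exception("Subprocess exited with status %d" % exit_status)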
...
 import sys
 from pyspark.context import SparkContext
+from numpy import array, sum as np_sum
 
 
 def parseVector(line):
-    return [float(x) for x in line.split(' ')]
-
-
-def addVec(x, y):
-    return [a + b for (a, b) in zip(x, y)]
-
-
-def squaredDist(x, y):
-    return sum((a - b) ** 2 for (a, b) in zip(x, y))
+    return array([float(x) for x in line.split(' ')])
 
 
 def closestPoint(p, centers):
     bestIndex = 0
     closest = float("+inf")
     for i in range(len(centers)):
-        tempDist = squaredDist(p, centers[i])
+        tempDist = np_sum((p - centers[i]) ** 2)
         if tempDist < closest:
             closest = tempDist
             bestIndex = i
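With parseVector returning a numpy array, the hand-rolled addVec and squaredDist helpers reduce to element-wise arithmetic. A quick illustration (the example values are mine, not from the commit):

    from numpy import array, sum as np_sum

    x = array([1.0, 2.0, 3.0])
    y = array([4.0, 6.0, 8.0])

    print(x + y)                 # replaces addVec(x, y):      [ 5.  8. 11.]
    print(np_sum((x - y) ** 2))  # replaces squaredDist(x, y): 50.0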
@@ -41,14 +34,14 @@ if __name__ == "__main__":
     tempDist = 1.0
     while tempDist > convergeDist:
-        closest = data.mapPairs(
+        closest = data.map(
             lambda p : (closestPoint(p, kPoints), (p, 1)))
         pointStats = closest.reduceByKey(
-            lambda (x1, y1), (x2, y2): (addVec(x1, x2), y1 + y2))
-        newPoints = pointStats.mapPairs(
-            lambda (x, (y, z)): (x, [a / z for a in y])).collect()
-        tempDist = sum(squaredDist(kPoints[x], y) for (x, y) in newPoints)
+            lambda (x1, y1), (x2, y2): (x1 + x2, y1 + y2))
+        newPoints = pointStats.map(
+            lambda (x, (y, z)): (x, y / z)).collect()
+        tempDist = sum(np_sum((kPoints[x] - y) ** 2) for (x, y) in newPoints)
         for (x, y) in newPoints:
             kPoints[x] = y
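Note the rename from mapPairs to map as well. The centroid update is now pure numpy: reduceByKey sums vectors and counts element-wise, and the final map divides the summed vector by the count. The same arithmetic outside Spark, as a sanity check (my own example data):

    from numpy import array

    points = [array([1.0, 2.0]), array([3.0, 4.0]), array([5.0, 6.0])]
    vec_sum, count = points[0], 1
    for p in points[1:]:
        vec_sum, count = vec_sum + p, count + 1   # the reduceByKey lambda
    centroid = vec_sum / count                    # the map lambda's y / z
    print(centroid)                               # [ 3.  4.]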
...
@@ -71,7 +71,7 @@ class RDD(object):
 
     def takeSample(self, withReplacement, num, seed):
         vals = self._jrdd.takeSample(withReplacement, num, seed)
-        return [PickleSerializer.loads(x) for x in vals]
+        return [PickleSerializer.loads(bytes(x)) for x in vals]
 
     def union(self, other):
         """
@@ -218,17 +218,16 @@ class RDD(object):
     # TODO: pipelining
     # TODO: optimizations
 
-    def shuffle(self, numSplits):
+    def shuffle(self, numSplits, hashFunc=hash):
         if numSplits is None:
             numSplits = self.ctx.defaultParallelism
-        pipe_command = RDD._get_pipe_command('shuffle_map_step', [])
+        pipe_command = RDD._get_pipe_command('shuffle_map_step', [hashFunc])
         class_manifest = self._jrdd.classManifest()
         python_rdd = self.ctx.jvm.PythonPairRDD(self._jrdd.rdd(),
             pipe_command, False, self.ctx.pythonExec, class_manifest)
         partitioner = self.ctx.jvm.spark.HashPartitioner(numSplits)
         jrdd = python_rdd.asJavaPairRDD().partitionBy(partitioner)
         jrdd = jrdd.map(self.ctx.jvm.ExtractValue())
-        # TODO: extract second value.
         return RDD(jrdd, self.ctx)
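shuffle now ships a pluggable hash function to the worker, serialized into the pipe command alongside the 'shuffle_map_step' name, so keys are hashed with Python semantics rather than the JVM's. The partitioning contract it feeds, sketched standalone (partition_for is a hypothetical helper of mine; in the commit the modulo is done by the JVM's HashPartitioner):

    # Each record's partition id is derived from the pluggable hash function.
    def partition_for(key, num_splits, hash_func=hash):
        return hash_func(key) % num_splits

    records = [("apple", 1), ("banana", 2), ("cherry", 3)]
    buckets = {}
    for key, value in records:
        buckets.setdefault(partition_for(key, 2), []).append((key, value))
    print(buckets)   # e.g. {0: [...], 1: [...]}; layout depends on hash()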
@@ -277,8 +276,6 @@ class RDD(object):
         map_values_fn = lambda (k, v): (k, f(v))
         return self.map(map_values_fn, preservesPartitioning=True)
 
-    # TODO: implement shuffle.
-
     # TODO: support varargs cogroup of several RDDs.
     def groupWith(self, other):
         return self.cogroup(other)
...
@@ -48,9 +48,6 @@ def do_map(flat=False):
     f = load_function()
     for obj in read_input():
         try:
-            #from pickletools import dis
-            #print repr(obj)
-            #print dis(obj)
             out = f(PickleSerializer.loads(obj))
             if out is not None:
                 if flat:
@@ -64,9 +61,10 @@ def do_map(flat=False):
 
 def do_shuffle_map_step():
+    hashFunc = load_function()
     for obj in read_input():
-        key = PickleSerializer.loads(obj)[1]
-        output(str(hash(key)))
+        key = PickleSerializer.loads(obj)[0]
+        output(str(hashFunc(key)))
         output(obj)
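Two fixes land here: the worker loads the hash function that shuffle() serialized (see rdd.py above), and the key is read from index 0 of the pickled (key, value) pair; the old code hashed the value. A minimal check of the indexing (my example data):

    import pickle

    obj = pickle.dumps(("user-42", {"clicks": 7}))
    key = pickle.loads(obj)[0]     # the key; [1] would be the value dict
    print(key)                     # user-42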
...