Commit 33683974 authored by Reynold Xin

Fix PEP8 violations in examples/src/main/python.


Author: Reynold Xin <rxin@apache.org>

Closes #870 from rxin/examples-python-pep8 and squashes the following commits:

2829e84 [Reynold Xin] Fix PEP8 violations in examples/src/main/python.

(cherry picked from commit d79c2b28)
Signed-off-by: Reynold Xin <rxin@apache.org>
parent 832dc594
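
The hunks below are almost entirely whitespace and layout changes. A minimal before/after sketch of the style issues being addressed (illustrative only, not copied verbatim from any one file):

# Before: typical PEP8 complaints seen in these examples
#   import re, sys                        # E401: multiple imports on one line
#   XtX[j,j] += LAMBDA * uu               # E231: missing whitespace after ','
#   for url in urls: yield (url, rank)    # E701: statement on same line as 'for'

# After: one import per line, a space after commas, loop body on its own line
import re
import sys

def contribs(urls, rank):
    for url in urls:
        yield (url, rank / len(urls))

print(list(contribs(["a", "b"], 1.0)))    # [('a', 0.5), ('b', 0.5)]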
@@ -29,22 +29,25 @@ from pyspark import SparkContext
 LAMBDA = 0.01   # regularization
 np.random.seed(42)

 def rmse(R, ms, us):
     diff = R - ms * us.T
     return np.sqrt(np.sum(np.power(diff, 2)) / M * U)

 def update(i, vec, mat, ratings):
     uu = mat.shape[0]
     ff = mat.shape[1]
     XtX = mat.T * mat
     Xty = mat.T * ratings[i, :].T
     for j in range(ff):
-        XtX[j,j] += LAMBDA * uu
+        XtX[j, j] += LAMBDA * uu
     return np.linalg.solve(XtX, Xty)

 if __name__ == "__main__":
     """
     Usage: als [M] [U] [F] [iterations] [slices]"
@@ -57,10 +60,10 @@ if __name__ == "__main__":
     slices = int(sys.argv[5]) if len(sys.argv) > 5 else 2

     print "Running ALS with M=%d, U=%d, F=%d, iters=%d, slices=%d\n" % \
-            (M, U, F, ITERATIONS, slices)
+        (M, U, F, ITERATIONS, slices)

     R = matrix(rand(M, F)) * matrix(rand(U, F).T)
-    ms = matrix(rand(M ,F))
+    ms = matrix(rand(M, F))
     us = matrix(rand(U, F))

     Rb = sc.broadcast(R)
@@ -71,8 +74,9 @@ if __name__ == "__main__":
         ms = sc.parallelize(range(M), slices) \
                .map(lambda x: update(x, msb.value[x, :], usb.value, Rb.value)) \
                .collect()
-        ms = matrix(np.array(ms)[:, :, 0])   # collect() returns a list, so array ends up being
-                                             #   a 3-d array, we take the first 2 dims for the matrix
+        # collect() returns a list, so array ends up being
+        #   a 3-d array, we take the first 2 dims for the matrix
+        ms = matrix(np.array(ms)[:, :, 0])
         msb = sc.broadcast(ms)
         us = sc.parallelize(range(U), slices) \
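
For readers of the ALS example above: update() solves a ridge-regularized least-squares problem for one row of factors. A standalone sketch with invented sizes and data (not part of the commit) showing the same normal-equation solve:

import numpy as np

LAMBDA = 0.01                           # same regularization constant as above
X = np.matrix(np.random.rand(5, 3))     # 5 observed ratings, 3 latent features
y = np.matrix(np.random.rand(5, 1))

XtX = X.T * X
Xty = X.T * y
for j in range(XtX.shape[0]):
    XtX[j, j] += LAMBDA * X.shape[0]    # ridge term added to the diagonal

w = np.linalg.solve(XtX, Xty)           # the new factor row, as in update()
print(w.shape)                          # (3, 1)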
@@ -59,7 +59,7 @@ if __name__ == "__main__":
     while tempDist > convergeDist:
         closest = data.map(
-            lambda p : (closestPoint(p, kPoints), (p, 1)))
+            lambda p: (closestPoint(p, kPoints), (p, 1)))
         pointStats = closest.reduceByKey(
             lambda (x1, y1), (x2, y2): (x1 + x2, y1 + y2))
         newPoints = pointStats.map(
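
The k-means hunk above calls a closestPoint helper that sits outside the diff context. A sketch of what such a helper typically does (the name and exact implementation here are assumptions, not taken from the commit):

import numpy as np

def closest_point(p, centers):
    """Return the index of the center nearest to point p (squared Euclidean)."""
    best_index = 0
    best_dist = float("inf")
    for i, c in enumerate(centers):
        dist = np.sum((p - c) ** 2)
        if dist < best_dist:
            best_dist = dist
            best_index = i
    return best_index

print(closest_point(np.array([0.0, 0.0]),
                    [np.array([1.0, 1.0]), np.array([0.1, 0.0])]))   # prints 1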
@@ -60,8 +60,8 @@ if __name__ == "__main__":
     # Compute logistic regression gradient for a matrix of data points
     def gradient(matrix, w):
-        Y = matrix[:,0]    # point labels (first column of input file)
-        X = matrix[:,1:]   # point coordinates
+        Y = matrix[:, 0]    # point labels (first column of input file)
+        X = matrix[:, 1:]   # point coordinates
         # For each point (x, y), compute gradient function, then sum these up
         return ((1.0 / (1.0 + np.exp(-Y * X.dot(w))) - 1.0) * Y * X.T).sum(1)
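
The return expression in gradient() is the derivative of the logistic loss with +/-1 labels, L(w) = sum_i log(1 + exp(-y_i * x_i . w)), whose gradient is sum_i (sigmoid(y_i * x_i . w) - 1) * y_i * x_i. A toy numerical check (data invented here, not part of the commit):

import numpy as np

np.random.seed(0)
X = np.random.rand(4, 3)                 # 4 points, 3 features
Y = np.array([1.0, -1.0, 1.0, -1.0])     # +/-1 labels
w = np.random.rand(3)

# Same expression as in gradient() above
analytic = ((1.0 / (1.0 + np.exp(-Y * X.dot(w))) - 1.0) * Y * X.T).sum(1)

def loss(w):
    return np.sum(np.log(1.0 + np.exp(-Y * X.dot(w))))

# Central-difference approximation of the gradient
eps = 1e-6
numeric = np.array([(loss(w + eps * np.eye(3)[j]) - loss(w - eps * np.eye(3)[j])) / (2 * eps)
                    for j in range(3)])
print(np.allclose(analytic, numeric, atol=1e-5))   # True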
@@ -15,9 +15,8 @@
 # limitations under the License.
 #

-#!/usr/bin/env python
-
-import re, sys
+import re
+import sys
 from operator import add

 from pyspark import SparkContext
@@ -26,7 +25,8 @@ from pyspark import SparkContext
 def computeContribs(urls, rank):
     """Calculates URL contributions to the rank of other URLs."""
     num_urls = len(urls)
-    for url in urls: yield (url, rank / num_urls)
+    for url in urls:
+        yield (url, rank / num_urls)

 def parseNeighbors(urls):
@@ -59,8 +59,8 @@ if __name__ == "__main__":
     # Calculates and updates URL ranks continuously using PageRank algorithm.
     for iteration in xrange(int(sys.argv[2])):
         # Calculates URL contributions to the rank of other URLs.
-        contribs = links.join(ranks).flatMap(lambda (url, (urls, rank)):
-            computeContribs(urls, rank))
+        contribs = links.join(ranks).flatMap(
+            lambda (url, (urls, rank)): computeContribs(urls, rank))

         # Re-calculates URL ranks based on neighbor contributions.
         ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)
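
The last line of the hunk above is the damped PageRank update: each neighbor contributes rank / num_out_links, and a URL's new rank is 0.15 + 0.85 * sum(contributions). A tiny single-machine sketch of the same iteration on an invented two-page graph (no Spark involved):

links = {"a": ["b"], "b": ["a"]}          # made-up link structure
ranks = {"a": 1.0, "b": 1.0}

for _ in range(10):
    contribs = {url: 0.0 for url in links}
    for url, out_links in links.items():
        for dest in out_links:
            contribs[dest] += ranks[url] / len(out_links)    # computeContribs
    ranks = {url: 0.15 + 0.85 * c for url, c in contribs.items()}

print(ranks)   # symmetric graph, so both ranks stay at 1.0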
@@ -29,9 +29,11 @@ if __name__ == "__main__":
     sc = SparkContext(appName="PythonPi")
     slices = int(sys.argv[1]) if len(sys.argv) > 1 else 2
     n = 100000 * slices
+
     def f(_):
         x = random() * 2 - 1
         y = random() * 2 - 1
         return 1 if x ** 2 + y ** 2 < 1 else 0
+
     count = sc.parallelize(xrange(1, n+1), slices).map(f).reduce(add)
     print "Pi is roughly %f" % (4.0 * count / n)
@@ -27,8 +27,8 @@ if __name__ == "__main__":
     sc = SparkContext(appName="PythonSort")
     lines = sc.textFile(sys.argv[1], 1)
     sortedCount = lines.flatMap(lambda x: x.split(' ')) \
-                       .map(lambda x: (int(x), 1)) \
-                       .sortByKey(lambda x: x)
+        .map(lambda x: (int(x), 1)) \
+        .sortByKey(lambda x: x)
     # This is just a demo on how to bring all the sorted data back to a single node.
     # In reality, we wouldn't want to collect all the data to the driver node.
     output = sortedCount.collect()
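
As a reading aid for the sort example above, a plain-Python equivalent of the flatMap / map / sortByKey pipeline on invented input (collect() in the Spark version brings the sorted pairs back to the driver in the same way):

lines = ["3 1", "2"]                                        # made-up input lines
tokens = [t for line in lines for t in line.split(' ')]     # flatMap
pairs = [(int(t), 1) for t in tokens]                       # map
print(sorted(pairs))                                        # [(1, 1), (2, 1), (3, 1)]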