diff --git a/python/pyspark/mllib/_common.py b/python/pyspark/mllib/_common.py
index e6f0953810ed74a1c9e7e73668147952687580cc..802a27a8da14d362e954e7bf4ee94134c3805ac0 100644
--- a/python/pyspark/mllib/_common.py
+++ b/python/pyspark/mllib/_common.py
@@ -56,7 +56,8 @@ except:
 #
 # Sparse double vector format:
 #
-# [1-byte 2] [4-byte length] [4-byte nonzeros] [nonzeros*4 bytes of indices] [nonzeros*8 bytes of values]
+# [1-byte 2] [4-byte length] [4-byte nonzeros] [nonzeros*4 bytes of indices] \
+# [nonzeros*8 bytes of values]
 #
 # Double matrix format:
 #
@@ -110,18 +111,18 @@ def _serialize_double_vector(v):
         return _serialize_sparse_vector(v)
     else:
         raise TypeError("_serialize_double_vector called on a %s; "
-                "wanted ndarray or SparseVector" % type(v))
+                        "wanted ndarray or SparseVector" % type(v))
 
 
 def _serialize_dense_vector(v):
     """Serialize a dense vector given as a NumPy array."""
     if v.ndim != 1:
         raise TypeError("_serialize_double_vector called on a %ddarray; "
-                "wanted a 1darray" % v.ndim)
+                        "wanted a 1darray" % v.ndim)
     if v.dtype != float64:
         if numpy.issubdtype(v.dtype, numpy.complex):
             raise TypeError("_serialize_double_vector called on an ndarray of %s; "
-                    "wanted ndarray of float64" % v.dtype)
+                            "wanted ndarray of float64" % v.dtype)
         v = v.astype(float64)
     length = v.shape[0]
     ba = bytearray(5 + 8 * length)
@@ -158,10 +159,10 @@
     """
     if type(ba) != bytearray:
         raise TypeError("_deserialize_double_vector called on a %s; "
-                "wanted bytearray" % type(ba))
+                        "wanted bytearray" % type(ba))
     if len(ba) < 5:
         raise TypeError("_deserialize_double_vector called on a %d-byte array, "
-                "which is too short" % len(ba))
+                        "which is too short" % len(ba))
     if ba[0] == DENSE_VECTOR_MAGIC:
         return _deserialize_dense_vector(ba)
     elif ba[0] == SPARSE_VECTOR_MAGIC:
@@ -175,7 +176,7 @@
     """Deserialize a dense vector into a numpy array."""
     if len(ba) < 5:
         raise TypeError("_deserialize_dense_vector called on a %d-byte array, "
-                "which is too short" % len(ba))
+                        "which is too short" % len(ba))
     length = ndarray(shape=[1], buffer=ba, offset=1, dtype=int32)[0]
     if len(ba) != 8 * length + 5:
         raise TypeError("_deserialize_dense_vector called on bytearray "
@@ -187,7 +188,7 @@
     """Deserialize a sparse vector into a MLlib SparseVector object."""
     if len(ba) < 9:
         raise TypeError("_deserialize_sparse_vector called on a %d-byte array, "
-                "which is too short" % len(ba))
+                        "which is too short" % len(ba))
     header = ndarray(shape=[2], buffer=ba, offset=1, dtype=int32)
     size = header[0]
     nonzeros = header[1]
@@ -205,7 +206,7 @@ def _serialize_double_matrix(m):
     if m.dtype != float64:
         if numpy.issubdtype(m.dtype, numpy.complex):
             raise TypeError("_serialize_double_matrix called on an ndarray of %s; "
-                    "wanted ndarray of float64" % m.dtype)
+                            "wanted ndarray of float64" % m.dtype)
         m = m.astype(float64)
     rows = m.shape[0]
     cols = m.shape[1]
@@ -225,10 +226,10 @@
     """Deserialize a double matrix from a mutually understood format."""
     if type(ba) != bytearray:
         raise TypeError("_deserialize_double_matrix called on a %s; "
-                "wanted bytearray" % type(ba))
+                        "wanted bytearray" % type(ba))
     if len(ba) < 9:
         raise TypeError("_deserialize_double_matrix called on a %d-byte array, "
-                "which is too short" % len(ba))
+                        "which is too short" % len(ba))
     if ba[0] != DENSE_MATRIX_MAGIC:
         raise TypeError("_deserialize_double_matrix called on bytearray "
                         "with wrong magic")
@@ -267,7 +268,7 @@ def _copyto(array, buffer, offset, shape, dtype):
 def _get_unmangled_rdd(data, serializer):
     dataBytes = data.map(serializer)
     dataBytes._bypass_serializer = True
-    dataBytes.cache() # TODO: users should unpersist() this later!
+    dataBytes.cache()  # TODO: users should unpersist() this later!
     return dataBytes
 
 
@@ -293,14 +294,14 @@ def _linear_predictor_typecheck(x, coeffs):
     if type(x) == ndarray:
         if x.ndim == 1:
             if x.shape != coeffs.shape:
-                raise RuntimeError("Got array of %d elements; wanted %d"
-                        % (numpy.shape(x)[0], coeffs.shape[0]))
+                raise RuntimeError("Got array of %d elements; wanted %d" % (
+                    numpy.shape(x)[0], coeffs.shape[0]))
         else:
             raise RuntimeError("Bulk predict not yet supported.")
     elif type(x) == SparseVector:
         if x.size != coeffs.shape[0]:
-            raise RuntimeError("Got sparse vector of size %d; wanted %d"
-                    % (x.size, coeffs.shape[0]))
+            raise RuntimeError("Got sparse vector of size %d; wanted %d" % (
+                x.size, coeffs.shape[0]))
     elif (type(x) == RDD):
         raise RuntimeError("Bulk predict not yet supported.")
     else:
@@ -315,7 +316,7 @@ def _get_initial_weights(initial_weights, data):
     if type(initial_weights) == ndarray:
         if initial_weights.ndim != 1:
             raise TypeError("At least one data element has "
-                    + initial_weights.ndim + " dimensions, which is not 1")
+                            + initial_weights.ndim + " dimensions, which is not 1")
         initial_weights = numpy.zeros([initial_weights.shape[0]])
     elif type(initial_weights) == SparseVector:
         initial_weights = numpy.zeros([initial_weights.size])
@@ -333,10 +334,10 @@ def _regression_train_wrapper(sc, train_func, klass, data, initial_weights):
         raise RuntimeError("JVM call result had unexpected length")
     elif type(ans[0]) != bytearray:
         raise RuntimeError("JVM call result had first element of type "
-                + type(ans[0]).__name__ + " which is not bytearray")
+                           + type(ans[0]).__name__ + " which is not bytearray")
     elif type(ans[1]) != float:
         raise RuntimeError("JVM call result had second element of type "
-                + type(ans[0]).__name__ + " which is not float")
+                           + type(ans[0]).__name__ + " which is not float")
     return klass(_deserialize_double_vector(ans[0]), ans[1])
 
 
@@ -450,8 +451,7 @@ def _test():
     import doctest
     globs = globals().copy()
    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
-    (failure_count, test_count) = doctest.testmod(globs=globs,
-            optionflags=doctest.ELLIPSIS)
+    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
     globs['sc'].stop()
     if failure_count:
         exit(-1)
diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py
index 6772e4337ef3996814637cddf7152e9db2e3da4f..1c0c536c4fb3d5090902faf4ba80522c3ae0fb9d 100644
--- a/python/pyspark/mllib/classification.py
+++ b/python/pyspark/mllib/classification.py
@@ -29,6 +29,7 @@
 from pyspark.mllib.linalg import SparseVector
 from pyspark.mllib.regression import LabeledPoint, LinearModel
 from math import exp, log
 
+
 class LogisticRegressionModel(LinearModel):
     """A linear binary classification model derived from logistic regression.
@@ -68,14 +69,14 @@ class LogisticRegressionModel(LinearModel):
 
 class LogisticRegressionWithSGD(object):
     @classmethod
-    def train(cls, data, iterations=100, step=1.0,
-              miniBatchFraction=1.0, initialWeights=None):
+    def train(cls, data, iterations=100, step=1.0, miniBatchFraction=1.0, initialWeights=None):
         """Train a logistic regression model on the given data."""
         sc = data.context
-        return _regression_train_wrapper(sc, lambda d, i:
-                sc._jvm.PythonMLLibAPI().trainLogisticRegressionModelWithSGD(d._jrdd,
-                        iterations, step, miniBatchFraction, i),
-                LogisticRegressionModel, data, initialWeights)
+        train_func = lambda d, i: sc._jvm.PythonMLLibAPI().trainLogisticRegressionModelWithSGD(
+            d._jrdd, iterations, step, miniBatchFraction, i)
+        return _regression_train_wrapper(sc, train_func, LogisticRegressionModel, data,
+                                         initialWeights)
+
 
 class SVMModel(LinearModel):
     """A support vector machine.
@@ -106,16 +107,17 @@
         margin = _dot(x, self._coeff) + self._intercept
         return 1 if margin >= 0 else 0
 
+
 class SVMWithSGD(object):
     @classmethod
     def train(cls, data, iterations=100, step=1.0, regParam=1.0,
               miniBatchFraction=1.0, initialWeights=None):
         """Train a support vector machine on the given data."""
         sc = data.context
-        return _regression_train_wrapper(sc, lambda d, i:
-                sc._jvm.PythonMLLibAPI().trainSVMModelWithSGD(d._jrdd,
-                        iterations, step, regParam, miniBatchFraction, i),
-                SVMModel, data, initialWeights)
+        train_func = lambda d, i: sc._jvm.PythonMLLibAPI().trainSVMModelWithSGD(
+            d._jrdd, iterations, step, regParam, miniBatchFraction, i)
+        return _regression_train_wrapper(sc, train_func, SVMModel, data, initialWeights)
+
 
 class NaiveBayesModel(object):
     """
@@ -156,6 +158,7 @@
         """Return the most likely class for a data vector x"""
         return self.labels[numpy.argmax(self.pi + _dot(x, self.theta.transpose()))]
 
+
 class NaiveBayes(object):
     @classmethod
     def train(cls, data, lambda_=1.0):
@@ -186,8 +189,7 @@ def _test():
     import doctest
     globs = globals().copy()
     globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
-    (failure_count, test_count) = doctest.testmod(globs=globs,
-            optionflags=doctest.ELLIPSIS)
+    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
     globs['sc'].stop()
     if failure_count:
         exit(-1)
diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py
index f65088c9170e0c218e7c599b1674c1027bb6fb5a..b380e8f6c8725f0aaadf74fd2f82e93b6aad444a 100644
--- a/python/pyspark/mllib/clustering.py
+++ b/python/pyspark/mllib/clustering.py
@@ -30,7 +30,8 @@ class KMeansModel(object):
     """A clustering model derived from the k-means method.
 
     >>> data = array([0.0,0.0, 1.0,1.0, 9.0,8.0, 8.0,9.0]).reshape(4,2)
-    >>> model = KMeans.train(sc.parallelize(data), 2, maxIterations=10, runs=30, initializationMode="random")
+    >>> model = KMeans.train(
+    ...     sc.parallelize(data), 2, maxIterations=10, runs=30, initializationMode="random")
     >>> model.predict(array([0.0, 0.0])) == model.predict(array([1.0, 1.0]))
     True
     >>> model.predict(array([8.0, 9.0])) == model.predict(array([9.0, 8.0]))
@@ -76,18 +77,17 @@
 
 class KMeans(object):
     @classmethod
-    def train(cls, data, k, maxIterations=100, runs=1,
-              initializationMode="k-means||"):
+    def train(cls, data, k, maxIterations=100, runs=1, initializationMode="k-means||"):
         """Train a k-means clustering model."""
         sc = data.context
         dataBytes = _get_unmangled_double_vector_rdd(data)
-        ans = sc._jvm.PythonMLLibAPI().trainKMeansModel(dataBytes._jrdd,
-                k, maxIterations, runs, initializationMode)
+        ans = sc._jvm.PythonMLLibAPI().trainKMeansModel(
+            dataBytes._jrdd, k, maxIterations, runs, initializationMode)
         if len(ans) != 1:
             raise RuntimeError("JVM call result had unexpected length")
         elif type(ans[0]) != bytearray:
             raise RuntimeError("JVM call result had first element of type "
-                    + type(ans[0]) + " which is not bytearray")
+                               + type(ans[0]) + " which is not bytearray")
         matrix = _deserialize_double_matrix(ans[0])
         return KMeansModel([row for row in matrix])
 
@@ -96,8 +96,7 @@ def _test():
     import doctest
     globs = globals().copy()
     globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
-    (failure_count, test_count) = doctest.testmod(globs=globs,
-            optionflags=doctest.ELLIPSIS)
+    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
     globs['sc'].stop()
     if failure_count:
         exit(-1)
diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py
index 7511ca7573ddb3a32ee26c3f85be6dcb7a5e5d38..276684272068bc9968e8f0bb56cdb1b557188724 100644
--- a/python/pyspark/mllib/linalg.py
+++ b/python/pyspark/mllib/linalg.py
@@ -54,7 +54,7 @@
         if len(args) == 1:
             pairs = args[0]
             if type(pairs) == dict:
-                 pairs = pairs.items()
+                pairs = pairs.items()
             pairs = sorted(pairs)
             self.indices = array([p[0] for p in pairs], dtype=int32)
             self.values = array([p[1] for p in pairs], dtype=float64)
@@ -88,7 +88,7 @@
                     result += self.values[i] * other[self.indices[i]]
                 return result
             elif other.ndim == 2:
-                results = [self.dot(other[:,i]) for i in xrange(other.shape[1])]
+                results = [self.dot(other[:, i]) for i in xrange(other.shape[1])]
                 return array(results)
             else:
                 raise Exception("Cannot call dot with %d-dimensional array" % other.ndim)
@@ -135,7 +135,7 @@
                 return result
             else:
                 raise Exception("Cannot call squared_distance with %d-dimensional array" %
-                        other.ndim)
+                                other.ndim)
         else:
             result = 0.0
             i, j = 0, 0
@@ -184,15 +184,14 @@
 
         """
         return (isinstance(other, self.__class__)
-            and other.size == self.size
-            and array_equal(other.indices, self.indices)
-            and array_equal(other.values, self.values))
+                and other.size == self.size
+                and array_equal(other.indices, self.indices)
+                and array_equal(other.values, self.values))
 
     def __ne__(self, other):
         return not self.__eq__(other)
 
 
-
 class Vectors(object):
     """
     Factory methods for working with vectors. Note that dense vectors
diff --git a/python/pyspark/mllib/recommendation.py b/python/pyspark/mllib/recommendation.py
index f4a83f0209e27ce9f5137490c23a7f610ce9a5d5..6c385042ffa5f7433b449ee611eae5fe06048e4d 100644
--- a/python/pyspark/mllib/recommendation.py
+++ b/python/pyspark/mllib/recommendation.py
@@ -24,6 +24,7 @@ from pyspark.mllib._common import \
     _serialize_tuple, RatingDeserializer
 from pyspark.rdd import RDD
 
+
 class MatrixFactorizationModel(object):
     """A matrix factorisation model trained by regularized alternating
     least-squares.
@@ -55,32 +56,34 @@
         return RDD(self._java_model.predict(usersProductsJRDD._jrdd),
                    self._context, RatingDeserializer())
 
+
 class ALS(object):
     @classmethod
     def train(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1):
         sc = ratings.context
         ratingBytes = _get_unmangled_rdd(ratings, _serialize_rating)
-        mod = sc._jvm.PythonMLLibAPI().trainALSModel(ratingBytes._jrdd,
-                rank, iterations, lambda_, blocks)
+        mod = sc._jvm.PythonMLLibAPI().trainALSModel(
+            ratingBytes._jrdd, rank, iterations, lambda_, blocks)
         return MatrixFactorizationModel(sc, mod)
 
     @classmethod
     def trainImplicit(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1, alpha=0.01):
         sc = ratings.context
         ratingBytes = _get_unmangled_rdd(ratings, _serialize_rating)
-        mod = sc._jvm.PythonMLLibAPI().trainImplicitALSModel(ratingBytes._jrdd,
-                rank, iterations, lambda_, blocks, alpha)
+        mod = sc._jvm.PythonMLLibAPI().trainImplicitALSModel(
+            ratingBytes._jrdd, rank, iterations, lambda_, blocks, alpha)
         return MatrixFactorizationModel(sc, mod)
 
+
 def _test():
     import doctest
     globs = globals().copy()
     globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
-    (failure_count, test_count) = doctest.testmod(globs=globs,
-            optionflags=doctest.ELLIPSIS)
+    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
     globs['sc'].stop()
     if failure_count:
         exit(-1)
 
+
 if __name__ == "__main__":
     _test()
diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py
index 266b31d3fab0e64ead3e69d698cafa3ab39f4969..bc7de6d2e89583857cbb15e8175fd39b1b8a2617 100644
--- a/python/pyspark/mllib/regression.py
+++ b/python/pyspark/mllib/regression.py
@@ -113,10 +113,9 @@
               miniBatchFraction=1.0, initialWeights=None):
         """Train a linear regression model on the given data."""
         sc = data.context
-        return _regression_train_wrapper(sc, lambda d, i:
-                sc._jvm.PythonMLLibAPI().trainLinearRegressionModelWithSGD(
-                        d._jrdd, iterations, step, miniBatchFraction, i),
-                LinearRegressionModel, data, initialWeights)
+        train_f = lambda d, i: sc._jvm.PythonMLLibAPI().trainLinearRegressionModelWithSGD(
+            d._jrdd, iterations, step, miniBatchFraction, i)
+        return _regression_train_wrapper(sc, train_f, LinearRegressionModel, data, initialWeights)
 
 
 class LassoModel(LinearRegressionModelBase):
@@ -157,10 +156,9 @@
               miniBatchFraction=1.0, initialWeights=None):
         """Train a Lasso regression model on the given data."""
         sc = data.context
-        return _regression_train_wrapper(sc, lambda d, i:
-                sc._jvm.PythonMLLibAPI().trainLassoModelWithSGD(d._jrdd,
-                        iterations, step, regParam, miniBatchFraction, i),
-                LassoModel, data, initialWeights)
+        train_f = lambda d, i: sc._jvm.PythonMLLibAPI().trainLassoModelWithSGD(
+            d._jrdd, iterations, step, regParam, miniBatchFraction, i)
+        return _regression_train_wrapper(sc, train_f, LassoModel, data, initialWeights)
 
 
 class RidgeRegressionModel(LinearRegressionModelBase):
@@ -201,18 +199,16 @@
               miniBatchFraction=1.0, initialWeights=None):
         """Train a ridge regression model on the given data."""
         sc = data.context
-        return _regression_train_wrapper(sc, lambda d, i:
-                sc._jvm.PythonMLLibAPI().trainRidgeModelWithSGD(d._jrdd,
-                        iterations, step, regParam, miniBatchFraction, i),
-                RidgeRegressionModel, data, initialWeights)
+        train_func = lambda d, i: sc._jvm.PythonMLLibAPI().trainRidgeModelWithSGD(
+            d._jrdd, iterations, step, regParam, miniBatchFraction, i)
+        return _regression_train_wrapper(sc, train_func, RidgeRegressionModel, data, initialWeights)
 
 
 def _test():
     import doctest
     globs = globals().copy()
     globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
-    (failure_count, test_count) = doctest.testmod(globs=globs,
-            optionflags=doctest.ELLIPSIS)
+    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
     globs['sc'].stop()
     if failure_count:
         exit(-1)
diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py
index 1ee96bb4af37b8def7f8402f248c8c6132eabe71..37ccf1d59074379504627358c1b979568c8d5134 100644
--- a/python/pyspark/mllib/tests.py
+++ b/python/pyspark/mllib/tests.py
@@ -23,7 +23,7 @@
 from numpy import array, array_equal
 import unittest
 from pyspark.mllib._common import _convert_vector, _serialize_double_vector, \
-        _deserialize_double_vector, _dot, _squared_distance
+    _deserialize_double_vector, _dot, _squared_distance
 from pyspark.mllib.linalg import SparseVector
 from pyspark.mllib.regression import LabeledPoint
 from pyspark.tests import PySparkTestCase
@@ -46,12 +46,9 @@
         self.assertTrue(sv is _convert_vector(sv))
         self.assertTrue(dv is _convert_vector(dv))
         self.assertTrue(array_equal(dv, _convert_vector(lst)))
-        self.assertEquals(sv,
-                _deserialize_double_vector(_serialize_double_vector(sv)))
-        self.assertTrue(array_equal(dv,
-                _deserialize_double_vector(_serialize_double_vector(dv))))
-        self.assertTrue(array_equal(dv,
-                _deserialize_double_vector(_serialize_double_vector(lst))))
+        self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(sv)))
+        self.assertTrue(array_equal(dv, _deserialize_double_vector(_serialize_double_vector(dv))))
+        self.assertTrue(array_equal(dv, _deserialize_double_vector(_serialize_double_vector(lst))))
 
     def test_dot(self):
         sv = SparseVector(4, {1: 1, 3: 2})
@@ -132,7 +129,7 @@
 
     def test_regression(self):
         from pyspark.mllib.regression import LinearRegressionWithSGD, LassoWithSGD, \
-                RidgeRegressionWithSGD
+            RidgeRegressionWithSGD
         data = [
             LabeledPoint(-1.0, [0, -1]),
             LabeledPoint(1.0, [0, 1]),
@@ -179,14 +176,10 @@ class SciPyTests(PySparkTestCase):
         self.assertEquals(sv, _convert_vector(lil.tocoo()))
         self.assertEquals(sv, _convert_vector(lil.tocsr()))
         self.assertEquals(sv, _convert_vector(lil.todok()))
-        self.assertEquals(sv,
-                _deserialize_double_vector(_serialize_double_vector(lil)))
-        self.assertEquals(sv,
-                _deserialize_double_vector(_serialize_double_vector(lil.tocsc())))
-        self.assertEquals(sv,
-                _deserialize_double_vector(_serialize_double_vector(lil.tocsr())))
-        self.assertEquals(sv,
-                _deserialize_double_vector(_serialize_double_vector(lil.todok())))
+        self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil)))
+        self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil.tocsc())))
+        self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil.tocsr())))
+        self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil.todok())))
 
     def test_dot(self):
         from scipy.sparse import lil_matrix
@@ -265,7 +258,7 @@
 
     def test_regression(self):
         from pyspark.mllib.regression import LinearRegressionWithSGD, LassoWithSGD, \
-                RidgeRegressionWithSGD
+            RidgeRegressionWithSGD
         data = [
             LabeledPoint(-1.0, self.scipy_matrix(2, {1: -1.0})),
             LabeledPoint(1.0, self.scipy_matrix(2, {1: 1.0})),
diff --git a/python/pyspark/mllib/util.py b/python/pyspark/mllib/util.py
index 50d0cdd087625d91df2556d9b71c856c2c800805..0e5f4520b94028ce379e6e2857ff3032f443907c 100644
--- a/python/pyspark/mllib/util.py
+++ b/python/pyspark/mllib/util.py
@@ -21,6 +21,7 @@ from pyspark.mllib.linalg import Vectors, SparseVector
 from pyspark.mllib.regression import LabeledPoint
 from pyspark.mllib._common import _convert_vector
 
+
 class MLUtils:
     """
     Helper methods to load, save and pre-process data used in MLlib.
@@ -44,7 +45,6 @@
             values[i] = float(value)
         return label, indices, values
 
-
     @staticmethod
     def _convert_labeled_point_to_libsvm(p):
         """Converts a LabeledPoint to a string in LIBSVM format."""
@@ -62,7 +62,6 @@
                                 " but got " % type(v))
         return " ".join(items)
 
-
     @staticmethod
    def loadLibSVMFile(sc, path, multiclass=False, numFeatures=-1, minPartitions=None):
         """
@@ -135,7 +134,6 @@
             numFeatures = parsed.map(lambda x: 0 if x[1].size == 0 else x[1][-1]).reduce(max) + 1
         return parsed.map(lambda x: LabeledPoint(x[0], Vectors.sparse(numFeatures, x[1], x[2])))
 
-
     @staticmethod
     def saveAsLibSVMFile(data, dir):
         """