diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py index 961b5e80b013c189219cc47b3dffda7888439c8d..6f00d1df209c080629c4b852502531b68bbf3c4a 100644 --- a/python/pyspark/mllib/regression.py +++ b/python/pyspark/mllib/regression.py @@ -18,7 +18,7 @@ import numpy as np from numpy import array -from pyspark import RDD +from pyspark import RDD, since from pyspark.streaming.dstream import DStream from pyspark.mllib.common import callMLlibFunc, _py2java, _java2py, inherit_doc from pyspark.mllib.linalg import SparseVector, Vectors, _convert_to_vector @@ -43,6 +43,8 @@ class LabeledPoint(object): column matrix) Note: 'label' and 'features' are accessible as class attributes. + + .. versionadded:: 1.0.0 """ def __init__(self, label, features): @@ -66,6 +68,8 @@ class LinearModel(object): :param weights: Weights computed for every feature. :param intercept: Intercept computed for this model. + + .. versionadded:: 0.9.0 """ def __init__(self, weights, intercept): @@ -73,11 +77,15 @@ class LinearModel(object): self._intercept = float(intercept) @property + @since("1.0.0") def weights(self): + """Weights computed for every feature.""" return self._coeff @property + @since("1.0.0") def intercept(self): + """Intercept computed for this model.""" return self._intercept def __repr__(self): @@ -94,8 +102,11 @@ class LinearRegressionModelBase(LinearModel): True >>> abs(lrmb.predict(SparseVector(2, {0: -1.03, 1: 7.777})) - 14.624) < 1e-6 True + + .. versionadded:: 0.9.0 """ + @since("0.9.0") def predict(self, x): """ Predict the value of the dependent variable given a vector or @@ -163,14 +174,20 @@ class LinearRegressionModel(LinearRegressionModelBase): True >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5 True + + .. versionadded:: 0.9.0 """ + @since("1.4.0") def save(self, sc, path): + """Save a LinearRegressionModel.""" java_model = sc._jvm.org.apache.spark.mllib.regression.LinearRegressionModel( _py2java(sc, self._coeff), self.intercept) java_model.save(sc._jsc.sc(), path) @classmethod + @since("1.4.0") def load(cls, sc, path): + """Load a LinearRegressionModel.""" java_model = sc._jvm.org.apache.spark.mllib.regression.LinearRegressionModel.load( sc._jsc.sc(), path) weights = _java2py(sc, java_model.weights()) @@ -199,8 +216,20 @@ def _regression_train_wrapper(train_func, modelClass, data, initial_weights): class LinearRegressionWithSGD(object): + """ + Train a linear regression model with no regularization using Stochastic Gradient Descent. + This solves the least squares regression formulation + f(weights) = 1/n ||A weights-y||^2^ + (which is the mean squared error). + Here the data matrix has n rows, and the input RDD holds the set of rows of A, each with + its corresponding right hand side label y. + See also the documentation for the precise formulation. + + .. versionadded:: 0.9.0 + """ @classmethod + @since("0.9.0") def train(cls, data, iterations=100, step=1.0, miniBatchFraction=1.0, initialWeights=None, regParam=0.0, regType=None, intercept=False, validateData=True, convergenceTol=0.001): @@ -313,14 +342,20 @@ class LassoModel(LinearRegressionModelBase): True >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5 True + + .. versionadded:: 0.9.0 """ + @since("1.4.0") def save(self, sc, path): + """Save a LassoModel.""" java_model = sc._jvm.org.apache.spark.mllib.regression.LassoModel( _py2java(sc, self._coeff), self.intercept) java_model.save(sc._jsc.sc(), path) @classmethod + @since("1.4.0") def load(cls, sc, path): + """Load a LassoModel.""" java_model = sc._jvm.org.apache.spark.mllib.regression.LassoModel.load( sc._jsc.sc(), path) weights = _java2py(sc, java_model.weights()) @@ -330,8 +365,19 @@ class LassoModel(LinearRegressionModelBase): class LassoWithSGD(object): + """ + Train a regression model with L1-regularization using Stochastic Gradient Descent. + This solves the l1-regularized least squares regression formulation + f(weights) = 1/2n ||A weights-y||^2^ + regParam ||weights||_1 + Here the data matrix has n rows, and the input RDD holds the set of rows of A, each with + its corresponding right hand side label y. + See also the documentation for the precise formulation. + + .. versionadded:: 0.9.0 + """ @classmethod + @since("0.9.0") def train(cls, data, iterations=100, step=1.0, regParam=0.01, miniBatchFraction=1.0, initialWeights=None, intercept=False, validateData=True, convergenceTol=0.001): @@ -434,14 +480,20 @@ class RidgeRegressionModel(LinearRegressionModelBase): True >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5 True + + .. versionadded:: 0.9.0 """ + @since("1.4.0") def save(self, sc, path): + """Save a RidgeRegressionMode.""" java_model = sc._jvm.org.apache.spark.mllib.regression.RidgeRegressionModel( _py2java(sc, self._coeff), self.intercept) java_model.save(sc._jsc.sc(), path) @classmethod + @since("1.4.0") def load(cls, sc, path): + """Load a RidgeRegressionMode.""" java_model = sc._jvm.org.apache.spark.mllib.regression.RidgeRegressionModel.load( sc._jsc.sc(), path) weights = _java2py(sc, java_model.weights()) @@ -451,8 +503,19 @@ class RidgeRegressionModel(LinearRegressionModelBase): class RidgeRegressionWithSGD(object): + """ + Train a regression model with L2-regularization using Stochastic Gradient Descent. + This solves the l2-regularized least squares regression formulation + f(weights) = 1/2n ||A weights-y||^2^ + regParam/2 ||weights||^2^ + Here the data matrix has n rows, and the input RDD holds the set of rows of A, each with + its corresponding right hand side label y. + See also the documentation for the precise formulation. + + .. versionadded:: 0.9.0 + """ @classmethod + @since("0.9.0") def train(cls, data, iterations=100, step=1.0, regParam=0.01, miniBatchFraction=1.0, initialWeights=None, intercept=False, validateData=True, convergenceTol=0.001): @@ -531,6 +594,8 @@ class IsotonicRegressionModel(Saveable, Loader): ... rmtree(path) ... except OSError: ... pass + + .. versionadded:: 1.4.0 """ def __init__(self, boundaries, predictions, isotonic): @@ -538,6 +603,7 @@ class IsotonicRegressionModel(Saveable, Loader): self.predictions = predictions self.isotonic = isotonic + @since("1.4.0") def predict(self, x): """ Predict labels for provided features. @@ -562,7 +628,9 @@ class IsotonicRegressionModel(Saveable, Loader): return x.map(lambda v: self.predict(v)) return np.interp(x, self.boundaries, self.predictions) + @since("1.4.0") def save(self, sc, path): + """Save a IsotonicRegressionModel.""" java_boundaries = _py2java(sc, self.boundaries.tolist()) java_predictions = _py2java(sc, self.predictions.tolist()) java_model = sc._jvm.org.apache.spark.mllib.regression.IsotonicRegressionModel( @@ -570,7 +638,9 @@ class IsotonicRegressionModel(Saveable, Loader): java_model.save(sc._jsc.sc(), path) @classmethod + @since("1.4.0") def load(cls, sc, path): + """Load a IsotonicRegressionModel.""" java_model = sc._jvm.org.apache.spark.mllib.regression.IsotonicRegressionModel.load( sc._jsc.sc(), path) py_boundaries = _java2py(sc, java_model.boundaryVector()).toArray() @@ -579,8 +649,29 @@ class IsotonicRegressionModel(Saveable, Loader): class IsotonicRegression(object): + """ + Isotonic regression. + Currently implemented using parallelized pool adjacent violators algorithm. + Only univariate (single feature) algorithm supported. + + Sequential PAV implementation based on: + Tibshirani, Ryan J., Holger Hoefling, and Robert Tibshirani. + "Nearly-isotonic regression." Technometrics 53.1 (2011): 54-61. + Available from [[http://www.stat.cmu.edu/~ryantibs/papers/neariso.pdf]] + + Sequential PAV parallelization based on: + Kearsley, Anthony J., Richard A. Tapia, and Michael W. Trosset. + "An approach to parallelizing isotonic regression." + Applied Mathematics and Parallel Computing. Physica-Verlag HD, 1996. 141-147. + Available from [[http://softlib.rice.edu/pub/CRPC-TRs/reports/CRPC-TR96640.pdf]] + + @see [[http://en.wikipedia.org/wiki/Isotonic_regression Isotonic regression (Wikipedia)]] + + .. versionadded:: 1.4.0 + """ @classmethod + @since("1.4.0") def train(cls, data, isotonic=True): """ Train a isotonic regression model on the given data. @@ -598,10 +689,13 @@ class StreamingLinearAlgorithm(object): Base class that has to be inherited by any StreamingLinearAlgorithm. Prevents reimplementation of methods predictOn and predictOnValues. + + .. versionadded:: 1.5.0 """ def __init__(self, model): self._model = model + @since("1.5.0") def latestModel(self): """ Returns the latest model. @@ -616,6 +710,7 @@ class StreamingLinearAlgorithm(object): raise ValueError( "Model must be intialized using setInitialWeights") + @since("1.5.0") def predictOn(self, dstream): """ Make predictions on a dstream. @@ -625,6 +720,7 @@ class StreamingLinearAlgorithm(object): self._validate(dstream) return dstream.map(lambda x: self._model.predict(x)) + @since("1.5.0") def predictOnValues(self, dstream): """ Make predictions on a keyed dstream. @@ -649,6 +745,8 @@ class StreamingLinearRegressionWithSGD(StreamingLinearAlgorithm): :param miniBatchFraction: Fraction of data on which SGD is run for each iteration. :param convergenceTol: A condition which decides iteration termination. + + .. versionadded:: 1.5.0 """ def __init__(self, stepSize=0.1, numIterations=50, miniBatchFraction=1.0, convergenceTol=0.001): self.stepSize = stepSize @@ -659,6 +757,7 @@ class StreamingLinearRegressionWithSGD(StreamingLinearAlgorithm): super(StreamingLinearRegressionWithSGD, self).__init__( model=self._model) + @since("1.5.0") def setInitialWeights(self, initialWeights): """ Set the initial value of weights. @@ -669,6 +768,7 @@ class StreamingLinearRegressionWithSGD(StreamingLinearAlgorithm): self._model = LinearRegressionModel(initialWeights, 0) return self + @since("1.5.0") def trainOn(self, dstream): """Train the model on the incoming dstream.""" self._validate(dstream)