Commit 282a15f7 authored by Yu ISHIKAWA, committed by Xiangrui Meng

[SPARK-10277] [MLLIB] [PYSPARK] Add @since annotation to pyspark.mllib.regression

Author: Yu ISHIKAWA <yuu.ishikawa@gmail.com>

Closes #8684 from yu-iskw/SPARK-10277.
parent 03ccb220
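For context (not part of this commit): pyspark.since is a small decorator factory; since(version) returns a decorator that appends a Sphinx ".. versionadded:: <version>" note to the wrapped function's docstring, which is how the annotations below surface in the generated API docs. A simplified sketch of that idea (the real helper also preserves docstring indentation):

    # Simplified illustration of a @since-style decorator; not the exact
    # implementation shipped in pyspark.
    def since(version):
        def deco(f):
            f.__doc__ = (f.__doc__ or "") + "\n\n.. versionadded:: %s\n" % version
            return f
        return deco

    class Example(object):
        @since("1.4.0")
        def predict(self, x):
            """Predict a label for x."""
            return x

    print(Example.predict.__doc__)  # now ends with ".. versionadded:: 1.4.0"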
@@ -18,7 +18,7 @@
 import numpy as np
 from numpy import array
-from pyspark import RDD
+from pyspark import RDD, since
 from pyspark.streaming.dstream import DStream
 from pyspark.mllib.common import callMLlibFunc, _py2java, _java2py, inherit_doc
 from pyspark.mllib.linalg import SparseVector, Vectors, _convert_to_vector
@@ -43,6 +43,8 @@ class LabeledPoint(object):
             column matrix)
 
     Note: 'label' and 'features' are accessible as class attributes.
+
+    .. versionadded:: 1.0.0
     """
 
     def __init__(self, label, features):
@@ -66,6 +68,8 @@ class LinearModel(object):
     :param weights: Weights computed for every feature.
     :param intercept: Intercept computed for this model.
+
+    .. versionadded:: 0.9.0
     """
 
     def __init__(self, weights, intercept):
@@ -73,11 +77,15 @@ class LinearModel(object):
         self._intercept = float(intercept)
 
     @property
+    @since("1.0.0")
     def weights(self):
+        """Weights computed for every feature."""
         return self._coeff
 
     @property
+    @since("1.0.0")
     def intercept(self):
+        """Intercept computed for this model."""
         return self._intercept
 
     def __repr__(self):
@@ -94,8 +102,11 @@ class LinearRegressionModelBase(LinearModel):
     True
     >>> abs(lrmb.predict(SparseVector(2, {0: -1.03, 1: 7.777})) - 14.624) < 1e-6
     True
+
+    .. versionadded:: 0.9.0
     """
 
+    @since("0.9.0")
     def predict(self, x):
         """
         Predict the value of the dependent variable given a vector or
@@ -163,14 +174,20 @@ class LinearRegressionModel(LinearRegressionModelBase):
     True
     >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
     True
+
+    .. versionadded:: 0.9.0
     """
 
+    @since("1.4.0")
     def save(self, sc, path):
+        """Save a LinearRegressionModel."""
         java_model = sc._jvm.org.apache.spark.mllib.regression.LinearRegressionModel(
             _py2java(sc, self._coeff), self.intercept)
         java_model.save(sc._jsc.sc(), path)
 
     @classmethod
+    @since("1.4.0")
     def load(cls, sc, path):
+        """Load a LinearRegressionModel."""
         java_model = sc._jvm.org.apache.spark.mllib.regression.LinearRegressionModel.load(
             sc._jsc.sc(), path)
         weights = _java2py(sc, java_model.weights())
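For context (not part of the diff): a minimal round trip through the save and load methods annotated above, assuming a pyspark shell where sc is already a SparkContext; the data and the temporary path are illustrative only.

    from shutil import rmtree
    from tempfile import mkdtemp

    from numpy import array
    from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD, LinearRegressionModel

    # Tiny training set: labels roughly follow y = x.
    data = [LabeledPoint(0.0, [0.0]), LabeledPoint(1.0, [1.0]),
            LabeledPoint(3.0, [2.0]), LabeledPoint(2.0, [3.0])]
    lrm = LinearRegressionWithSGD.train(sc.parallelize(data), iterations=10,
                                        initialWeights=array([1.0]))

    path = mkdtemp()
    lrm.save(sc, path)                                 # save() is tagged @since("1.4.0") above
    same_model = LinearRegressionModel.load(sc, path)  # load() restores weights and intercept
    print(same_model.weights, same_model.intercept)
    rmtree(path)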
@@ -199,8 +216,20 @@ def _regression_train_wrapper(train_func, modelClass, data, initial_weights):
 
 class LinearRegressionWithSGD(object):
+    """
+    Train a linear regression model with no regularization using Stochastic Gradient Descent.
+    This solves the least squares regression formulation
+
+        f(weights) = 1/n ||A weights - y||^2
+
+    (which is the mean squared error).
+    Here the data matrix has n rows, and the input RDD holds the set of rows of A, each with
+    its corresponding right hand side label y.
+    See also the documentation for the precise formulation.
+
+    .. versionadded:: 0.9.0
+    """
 
     @classmethod
+    @since("0.9.0")
     def train(cls, data, iterations=100, step=1.0, miniBatchFraction=1.0,
               initialWeights=None, regParam=0.0, regType=None, intercept=False,
               validateData=True, convergenceTol=0.001):
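For context (not part of the diff): a hedged usage sketch of the train method and the mean squared error objective described in the new docstring; it assumes a pyspark shell where sc is already a SparkContext.

    from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD

    # y = 2*x exactly, so minimizing the mean squared error should drive the
    # single weight toward 2 (how close depends on step size and iterations).
    points = [LabeledPoint(2.0 * x, [float(x)]) for x in range(4)]
    model = LinearRegressionWithSGD.train(sc.parallelize(points), iterations=100,
                                          initialWeights=[1.0])
    print(model.weights)         # expected to be near [2.0]
    print(model.predict([1.5]))  # expected to be near 3.0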
@@ -313,14 +342,20 @@ class LassoModel(LinearRegressionModelBase):
     True
     >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
     True
+
+    .. versionadded:: 0.9.0
     """
 
+    @since("1.4.0")
     def save(self, sc, path):
+        """Save a LassoModel."""
         java_model = sc._jvm.org.apache.spark.mllib.regression.LassoModel(
             _py2java(sc, self._coeff), self.intercept)
         java_model.save(sc._jsc.sc(), path)
 
     @classmethod
+    @since("1.4.0")
     def load(cls, sc, path):
+        """Load a LassoModel."""
         java_model = sc._jvm.org.apache.spark.mllib.regression.LassoModel.load(
             sc._jsc.sc(), path)
         weights = _java2py(sc, java_model.weights())
@@ -330,8 +365,19 @@ class LassoModel(LinearRegressionModelBase):
 
 class LassoWithSGD(object):
+    """
+    Train a regression model with L1-regularization using Stochastic Gradient Descent.
+    This solves the L1-regularized least squares regression formulation
+
+        f(weights) = 1/(2n) ||A weights - y||^2 + regParam ||weights||_1
+
+    Here the data matrix has n rows, and the input RDD holds the set of rows of A, each with
+    its corresponding right hand side label y.
+    See also the documentation for the precise formulation.
+
+    .. versionadded:: 0.9.0
+    """
 
     @classmethod
+    @since("0.9.0")
     def train(cls, data, iterations=100, step=1.0, regParam=0.01,
               miniBatchFraction=1.0, initialWeights=None, intercept=False,
               validateData=True, convergenceTol=0.001):
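For context (not part of the diff): a hedged sketch of how regParam drives the L1 term in the formulation above; the always-zero second feature is a deliberately artificial way to show the shrinkage. Assumes the shell's sc.

    from pyspark.mllib.regression import LabeledPoint, LassoWithSGD

    # Target y = 2*x, plus a second feature that is always zero and carries no
    # signal; the L1 penalty should shrink its weight toward 0.
    points = [LabeledPoint(2.0 * x, [float(x), 0.0]) for x in range(4)]
    model = LassoWithSGD.train(sc.parallelize(points), iterations=100,
                               regParam=0.1, initialWeights=[1.0, 1.0])
    print(model.weights)  # first weight near 2.0; second pushed toward 0 by the L1 term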
@@ -434,14 +480,20 @@ class RidgeRegressionModel(LinearRegressionModelBase):
     True
     >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
     True
+
+    .. versionadded:: 0.9.0
     """
 
+    @since("1.4.0")
     def save(self, sc, path):
+        """Save a RidgeRegressionModel."""
         java_model = sc._jvm.org.apache.spark.mllib.regression.RidgeRegressionModel(
             _py2java(sc, self._coeff), self.intercept)
         java_model.save(sc._jsc.sc(), path)
 
     @classmethod
+    @since("1.4.0")
     def load(cls, sc, path):
+        """Load a RidgeRegressionModel."""
         java_model = sc._jvm.org.apache.spark.mllib.regression.RidgeRegressionModel.load(
             sc._jsc.sc(), path)
         weights = _java2py(sc, java_model.weights())
@@ -451,8 +503,19 @@ class RidgeRegressionModel(LinearRegressionModelBase):
 
 class RidgeRegressionWithSGD(object):
+    """
+    Train a regression model with L2-regularization using Stochastic Gradient Descent.
+    This solves the L2-regularized least squares regression formulation
+
+        f(weights) = 1/(2n) ||A weights - y||^2 + (regParam/2) ||weights||^2
+
+    Here the data matrix has n rows, and the input RDD holds the set of rows of A, each with
+    its corresponding right hand side label y.
+    See also the documentation for the precise formulation.
+
+    .. versionadded:: 0.9.0
+    """
 
     @classmethod
+    @since("0.9.0")
     def train(cls, data, iterations=100, step=1.0, regParam=0.01,
               miniBatchFraction=1.0, initialWeights=None, intercept=False,
               validateData=True, convergenceTol=0.001):
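For context (not part of the diff): the analogous sketch for the L2-regularized formulation above, mirroring the pattern of the module's own doctests; assumes the shell's sc.

    from numpy import array
    from pyspark.mllib.regression import LabeledPoint, RidgeRegressionWithSGD

    # Labels roughly follow y = x; the default-sized regParam adds a mild L2 penalty.
    points = [LabeledPoint(0.0, [0.0]), LabeledPoint(1.0, [1.0]),
              LabeledPoint(3.0, [2.0]), LabeledPoint(2.0, [3.0])]
    model = RidgeRegressionWithSGD.train(sc.parallelize(points), iterations=100,
                                         regParam=0.01, initialWeights=array([1.0]))
    print(model.weights)         # close to [1.0] for this data
    print(model.predict([2.0]))  # roughly 2.0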
@@ -531,6 +594,8 @@ class IsotonicRegressionModel(Saveable, Loader):
     ...     rmtree(path)
     ... except OSError:
     ...     pass
+
+    .. versionadded:: 1.4.0
     """
 
     def __init__(self, boundaries, predictions, isotonic):
@@ -538,6 +603,7 @@ class IsotonicRegressionModel(Saveable, Loader):
         self.predictions = predictions
         self.isotonic = isotonic
 
+    @since("1.4.0")
     def predict(self, x):
         """
         Predict labels for provided features.
@@ -562,7 +628,9 @@ class IsotonicRegressionModel(Saveable, Loader):
             return x.map(lambda v: self.predict(v))
         return np.interp(x, self.boundaries, self.predictions)
 
+    @since("1.4.0")
     def save(self, sc, path):
+        """Save an IsotonicRegressionModel."""
         java_boundaries = _py2java(sc, self.boundaries.tolist())
         java_predictions = _py2java(sc, self.predictions.tolist())
         java_model = sc._jvm.org.apache.spark.mllib.regression.IsotonicRegressionModel(
@@ -570,7 +638,9 @@ class IsotonicRegressionModel(Saveable, Loader):
         java_model.save(sc._jsc.sc(), path)
 
     @classmethod
+    @since("1.4.0")
     def load(cls, sc, path):
+        """Load an IsotonicRegressionModel."""
         java_model = sc._jvm.org.apache.spark.mllib.regression.IsotonicRegressionModel.load(
             sc._jsc.sc(), path)
         py_boundaries = _java2py(sc, java_model.boundaryVector()).toArray()
@@ -579,8 +649,29 @@ class IsotonicRegressionModel(Saveable, Loader):
 
 class IsotonicRegression(object):
+    """
+    Isotonic regression.
+    Currently implemented using parallelized pool adjacent violators algorithm.
+    Only the univariate (single feature) algorithm is supported.
+
+    Sequential PAV implementation based on:
+      Tibshirani, Ryan J., Holger Hoefling, and Robert Tibshirani.
+      "Nearly-isotonic regression." Technometrics 53.1 (2011): 54-61.
+      Available from http://www.stat.cmu.edu/~ryantibs/papers/neariso.pdf
+
+    Sequential PAV parallelization based on:
+      Kearsley, Anthony J., Richard A. Tapia, and Michael W. Trosset.
+      "An approach to parallelizing isotonic regression."
+      Applied Mathematics and Parallel Computing. Physica-Verlag HD, 1996. 141-147.
+      Available from http://softlib.rice.edu/pub/CRPC-TRs/reports/CRPC-TR96640.pdf
+
+    See also Isotonic regression (Wikipedia):
+      http://en.wikipedia.org/wiki/Isotonic_regression
+
+    .. versionadded:: 1.4.0
+    """
 
     @classmethod
+    @since("1.4.0")
     def train(cls, data, isotonic=True):
         """
         Train an isotonic regression model on the given data.
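For context (not part of the diff): a hedged sketch of the PAV-based API described above; train takes an RDD of (label, feature, weight) tuples, and predict interpolates between the fitted boundary points. Assumes the shell's sc.

    from pyspark.mllib.regression import IsotonicRegression

    # (label, feature, weight) tuples whose labels are not monotone in the feature.
    data = [(1.0, 0.0, 1.0), (2.0, 1.0, 1.0), (1.0, 2.0, 1.0),
            (4.0, 3.0, 1.0), (3.0, 4.0, 1.0)]
    model = IsotonicRegression.train(sc.parallelize(data), isotonic=True)
    print(model.predict(2.5))                                   # interpolated scalar prediction
    print(model.predict(sc.parallelize([0.5, 3.5])).collect())  # predict() also accepts an RDD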
@@ -598,10 +689,13 @@ class StreamingLinearAlgorithm(object):
     Base class that has to be inherited by any StreamingLinearAlgorithm.
 
     Prevents reimplementation of methods predictOn and predictOnValues.
+
+    .. versionadded:: 1.5.0
     """
 
     def __init__(self, model):
         self._model = model
 
+    @since("1.5.0")
     def latestModel(self):
         """
         Returns the latest model.
@@ -616,6 +710,7 @@ class StreamingLinearAlgorithm(object):
             raise ValueError(
                 "Model must be initialized using setInitialWeights")
 
+    @since("1.5.0")
     def predictOn(self, dstream):
         """
         Make predictions on a dstream.
@@ -625,6 +720,7 @@ class StreamingLinearAlgorithm(object):
         self._validate(dstream)
         return dstream.map(lambda x: self._model.predict(x))
 
+    @since("1.5.0")
     def predictOnValues(self, dstream):
         """
         Make predictions on a keyed dstream.
@@ -649,6 +745,8 @@ class StreamingLinearRegressionWithSGD(StreamingLinearAlgorithm):
     :param miniBatchFraction: Fraction of data on which SGD is run for each
                               iteration.
     :param convergenceTol: A condition which decides iteration termination.
+
+    .. versionadded:: 1.5.0
     """
 
     def __init__(self, stepSize=0.1, numIterations=50, miniBatchFraction=1.0, convergenceTol=0.001):
         self.stepSize = stepSize
@@ -659,6 +757,7 @@ class StreamingLinearRegressionWithSGD(StreamingLinearAlgorithm):
         super(StreamingLinearRegressionWithSGD, self).__init__(
             model=self._model)
 
+    @since("1.5.0")
     def setInitialWeights(self, initialWeights):
         """
         Set the initial value of weights.
@@ -669,6 +768,7 @@ class StreamingLinearRegressionWithSGD(StreamingLinearAlgorithm):
         self._model = LinearRegressionModel(initialWeights, 0)
         return self
 
+    @since("1.5.0")
     def trainOn(self, dstream):
         """Train the model on the incoming dstream."""
         self._validate(dstream)
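For context (not part of the diff): a hedged end-to-end sketch of the streaming API annotated above, feeding a single micro-batch through a queue stream; assumes the shell's sc and is illustrative only.

    from pyspark.streaming import StreamingContext
    from pyspark.mllib.regression import LabeledPoint, StreamingLinearRegressionWithSGD

    ssc = StreamingContext(sc, 1)                       # 1-second batches
    model = StreamingLinearRegressionWithSGD(stepSize=0.1, numIterations=25)
    model.setInitialWeights([0.0])                      # required before trainOn/predictOnValues

    batch = sc.parallelize([LabeledPoint(2.0 * x, [float(x)]) for x in range(4)])
    training = ssc.queueStream([batch])                 # one micro-batch, for illustration

    model.trainOn(training)                             # update the model on each incoming batch
    model.predictOnValues(training.map(lambda lp: (lp.label, lp.features))).pprint()

    ssc.start()
    ssc.awaitTermination(5)                             # run briefly for the sketch
    ssc.stop(stopSparkContext=False)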