Skip to content
Snippets Groups Projects
Commit ce6f3f16 authored by Yanbo Liang's avatar Yanbo Liang Committed by Xiangrui Meng
Browse files

[SPARK-10194] [MLLIB] [PYSPARK] SGD algorithms need convergenceTol parameter in Python

[SPARK-3382](https://issues.apache.org/jira/browse/SPARK-3382) added a ```convergenceTol``` parameter for GradientDescent-based methods in Scala. We need that parameter in Python; otherwise, Python users will not be able to adjust that behavior (or even reproduce behavior from previous releases since the default changed).

Author: Yanbo Liang <ybliang8@gmail.com>

Closes #8457 from yanboliang/spark-10194.
parent cf2821ef
No related branches found
No related tags found
No related merge requests found
......@@ -132,7 +132,8 @@ private[python] class PythonMLLibAPI extends Serializable {
regParam: Double,
regType: String,
intercept: Boolean,
validateData: Boolean): JList[Object] = {
validateData: Boolean,
convergenceTol: Double): JList[Object] = {
val lrAlg = new LinearRegressionWithSGD()
lrAlg.setIntercept(intercept)
.setValidateData(validateData)
......@@ -141,6 +142,7 @@ private[python] class PythonMLLibAPI extends Serializable {
.setRegParam(regParam)
.setStepSize(stepSize)
.setMiniBatchFraction(miniBatchFraction)
.setConvergenceTol(convergenceTol)
lrAlg.optimizer.setUpdater(getUpdaterFromString(regType))
trainRegressionModel(
lrAlg,
......@@ -159,7 +161,8 @@ private[python] class PythonMLLibAPI extends Serializable {
miniBatchFraction: Double,
initialWeights: Vector,
intercept: Boolean,
validateData: Boolean): JList[Object] = {
validateData: Boolean,
convergenceTol: Double): JList[Object] = {
val lassoAlg = new LassoWithSGD()
lassoAlg.setIntercept(intercept)
.setValidateData(validateData)
......@@ -168,6 +171,7 @@ private[python] class PythonMLLibAPI extends Serializable {
.setRegParam(regParam)
.setStepSize(stepSize)
.setMiniBatchFraction(miniBatchFraction)
.setConvergenceTol(convergenceTol)
trainRegressionModel(
lassoAlg,
data,
......@@ -185,7 +189,8 @@ private[python] class PythonMLLibAPI extends Serializable {
miniBatchFraction: Double,
initialWeights: Vector,
intercept: Boolean,
validateData: Boolean): JList[Object] = {
validateData: Boolean,
convergenceTol: Double): JList[Object] = {
val ridgeAlg = new RidgeRegressionWithSGD()
ridgeAlg.setIntercept(intercept)
.setValidateData(validateData)
......@@ -194,6 +199,7 @@ private[python] class PythonMLLibAPI extends Serializable {
.setRegParam(regParam)
.setStepSize(stepSize)
.setMiniBatchFraction(miniBatchFraction)
.setConvergenceTol(convergenceTol)
trainRegressionModel(
ridgeAlg,
data,
......@@ -212,7 +218,8 @@ private[python] class PythonMLLibAPI extends Serializable {
initialWeights: Vector,
regType: String,
intercept: Boolean,
validateData: Boolean): JList[Object] = {
validateData: Boolean,
convergenceTol: Double): JList[Object] = {
val SVMAlg = new SVMWithSGD()
SVMAlg.setIntercept(intercept)
.setValidateData(validateData)
......@@ -221,6 +228,7 @@ private[python] class PythonMLLibAPI extends Serializable {
.setRegParam(regParam)
.setStepSize(stepSize)
.setMiniBatchFraction(miniBatchFraction)
.setConvergenceTol(convergenceTol)
SVMAlg.optimizer.setUpdater(getUpdaterFromString(regType))
trainRegressionModel(
SVMAlg,
......@@ -240,7 +248,8 @@ private[python] class PythonMLLibAPI extends Serializable {
regParam: Double,
regType: String,
intercept: Boolean,
validateData: Boolean): JList[Object] = {
validateData: Boolean,
convergenceTol: Double): JList[Object] = {
val LogRegAlg = new LogisticRegressionWithSGD()
LogRegAlg.setIntercept(intercept)
.setValidateData(validateData)
......@@ -249,6 +258,7 @@ private[python] class PythonMLLibAPI extends Serializable {
.setRegParam(regParam)
.setStepSize(stepSize)
.setMiniBatchFraction(miniBatchFraction)
.setConvergenceTol(convergenceTol)
LogRegAlg.optimizer.setUpdater(getUpdaterFromString(regType))
trainRegressionModel(
LogRegAlg,
......
......@@ -241,7 +241,7 @@ class LogisticRegressionWithSGD(object):
@classmethod
def train(cls, data, iterations=100, step=1.0, miniBatchFraction=1.0,
initialWeights=None, regParam=0.01, regType="l2", intercept=False,
validateData=True):
validateData=True, convergenceTol=0.001):
"""
Train a logistic regression model on the given data.
......@@ -274,11 +274,13 @@ class LogisticRegressionWithSGD(object):
:param validateData: Boolean parameter which indicates if
the algorithm should validate data
before training. (default: True)
:param convergenceTol: A condition which decides iteration termination.
(default: 0.001)
"""
def train(rdd, i):
return callMLlibFunc("trainLogisticRegressionModelWithSGD", rdd, int(iterations),
float(step), float(miniBatchFraction), i, float(regParam), regType,
bool(intercept), bool(validateData))
bool(intercept), bool(validateData), float(convergenceTol))
return _regression_train_wrapper(train, LogisticRegressionModel, data, initialWeights)
......@@ -439,7 +441,7 @@ class SVMWithSGD(object):
@classmethod
def train(cls, data, iterations=100, step=1.0, regParam=0.01,
miniBatchFraction=1.0, initialWeights=None, regType="l2",
intercept=False, validateData=True):
intercept=False, validateData=True, convergenceTol=0.001):
"""
Train a support vector machine on the given data.
......@@ -472,11 +474,13 @@ class SVMWithSGD(object):
:param validateData: Boolean parameter which indicates if
the algorithm should validate data
before training. (default: True)
:param convergenceTol: A condition which decides iteration termination.
(default: 0.001)
"""
def train(rdd, i):
return callMLlibFunc("trainSVMModelWithSGD", rdd, int(iterations), float(step),
float(regParam), float(miniBatchFraction), i, regType,
bool(intercept), bool(validateData))
bool(intercept), bool(validateData), float(convergenceTol))
return _regression_train_wrapper(train, SVMModel, data, initialWeights)
......@@ -600,12 +604,15 @@ class StreamingLogisticRegressionWithSGD(StreamingLinearAlgorithm):
:param miniBatchFraction: Fraction of data on which SGD is run for each
iteration.
:param regParam: L2 Regularization parameter.
:param convergenceTol: A condition which decides iteration termination.
"""
def __init__(self, stepSize=0.1, numIterations=50, miniBatchFraction=1.0, regParam=0.01):
def __init__(self, stepSize=0.1, numIterations=50, miniBatchFraction=1.0, regParam=0.01,
convergenceTol=0.001):
self.stepSize = stepSize
self.numIterations = numIterations
self.regParam = regParam
self.miniBatchFraction = miniBatchFraction
self.convergenceTol = convergenceTol
self._model = None
super(StreamingLogisticRegressionWithSGD, self).__init__(
model=self._model)
......
......@@ -28,7 +28,8 @@ __all__ = ['LabeledPoint', 'LinearModel',
'LinearRegressionModel', 'LinearRegressionWithSGD',
'RidgeRegressionModel', 'RidgeRegressionWithSGD',
'LassoModel', 'LassoWithSGD', 'IsotonicRegressionModel',
'IsotonicRegression']
'IsotonicRegression', 'StreamingLinearAlgorithm',
'StreamingLinearRegressionWithSGD']
class LabeledPoint(object):
......@@ -202,7 +203,7 @@ class LinearRegressionWithSGD(object):
@classmethod
def train(cls, data, iterations=100, step=1.0, miniBatchFraction=1.0,
initialWeights=None, regParam=0.0, regType=None, intercept=False,
validateData=True):
validateData=True, convergenceTol=0.001):
"""
Train a linear regression model using Stochastic Gradient
Descent (SGD).
......@@ -244,11 +245,14 @@ class LinearRegressionWithSGD(object):
:param validateData: Boolean parameter which indicates if
the algorithm should validate data
before training. (default: True)
:param convergenceTol: A condition which decides iteration termination.
(default: 0.001)
"""
def train(rdd, i):
return callMLlibFunc("trainLinearRegressionModelWithSGD", rdd, int(iterations),
float(step), float(miniBatchFraction), i, float(regParam),
regType, bool(intercept), bool(validateData))
regType, bool(intercept), bool(validateData),
float(convergenceTol))
return _regression_train_wrapper(train, LinearRegressionModel, data, initialWeights)
......@@ -330,7 +334,7 @@ class LassoWithSGD(object):
@classmethod
def train(cls, data, iterations=100, step=1.0, regParam=0.01,
miniBatchFraction=1.0, initialWeights=None, intercept=False,
validateData=True):
validateData=True, convergenceTol=0.001):
"""
Train a regression model with L1-regularization using
Stochastic Gradient Descent.
......@@ -362,11 +366,13 @@ class LassoWithSGD(object):
:param validateData: Boolean parameter which indicates if
the algorithm should validate data
before training. (default: True)
:param convergenceTol: A condition which decides iteration termination.
(default: 0.001)
"""
def train(rdd, i):
return callMLlibFunc("trainLassoModelWithSGD", rdd, int(iterations), float(step),
float(regParam), float(miniBatchFraction), i, bool(intercept),
bool(validateData))
bool(validateData), float(convergenceTol))
return _regression_train_wrapper(train, LassoModel, data, initialWeights)
......@@ -449,7 +455,7 @@ class RidgeRegressionWithSGD(object):
@classmethod
def train(cls, data, iterations=100, step=1.0, regParam=0.01,
miniBatchFraction=1.0, initialWeights=None, intercept=False,
validateData=True):
validateData=True, convergenceTol=0.001):
"""
Train a regression model with L2-regularization using
Stochastic Gradient Descent.
......@@ -481,11 +487,13 @@ class RidgeRegressionWithSGD(object):
:param validateData: Boolean parameter which indicates if
the algorithm should validate data
before training. (default: True)
:param convergenceTol: A condition which decides iteration termination.
(default: 0.001)
"""
def train(rdd, i):
return callMLlibFunc("trainRidgeModelWithSGD", rdd, int(iterations), float(step),
float(regParam), float(miniBatchFraction), i, bool(intercept),
bool(validateData))
bool(validateData), float(convergenceTol))
return _regression_train_wrapper(train, RidgeRegressionModel, data, initialWeights)
......@@ -636,15 +644,17 @@ class StreamingLinearRegressionWithSGD(StreamingLinearAlgorithm):
After training on a batch of data, the weights obtained at the end of
training are used as initial weights for the next batch.
:param: stepSize Step size for each iteration of gradient descent.
:param: numIterations Total number of iterations run.
:param: miniBatchFraction Fraction of data on which SGD is run for each
:param stepSize: Step size for each iteration of gradient descent.
:param numIterations: Total number of iterations run.
:param miniBatchFraction: Fraction of data on which SGD is run for each
iteration.
:param convergenceTol: A condition which decides iteration termination.
"""
def __init__(self, stepSize=0.1, numIterations=50, miniBatchFraction=1.0):
def __init__(self, stepSize=0.1, numIterations=50, miniBatchFraction=1.0, convergenceTol=0.001):
self.stepSize = stepSize
self.numIterations = numIterations
self.miniBatchFraction = miniBatchFraction
self.convergenceTol = convergenceTol
self._model = None
super(StreamingLinearRegressionWithSGD, self).__init__(
model=self._model)
......
Loading… If the diff fails to load, reload the page.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment