Commit 511d4929 authored by JeremyNixon, committed by Xiangrui Meng

[SPARK-12877][ML] Add train-validation-split to pyspark

## What changes were proposed in this pull request?
This patch adds train-validation-split to pyspark.ml.tuning.

## How was this patch tested?
This patch was tested through unit tests located in pyspark/ml/tests.py.

This is my original work and I license it to Spark.
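For reference, a minimal usage sketch of the new API, adapted from the doctest added in this patch (`sqlContext` is assumed to be an existing SQLContext):

```python
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.mllib.linalg import Vectors

# Toy dataset: one dense feature, binary label.
dataset = sqlContext.createDataFrame(
    [(Vectors.dense([0.0]), 0.0),
     (Vectors.dense([0.6]), 1.0),
     (Vectors.dense([1.0]), 1.0)] * 10,
    ["features", "label"])

lr = LogisticRegression()
grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
evaluator = BinaryClassificationEvaluator()

# Unlike CrossValidator, each parameter combination is evaluated once,
# on a single random train/validation split (default trainRatio=0.75).
tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid,
                           evaluator=evaluator, trainRatio=0.75)
model = tvs.fit(dataset)
print(evaluator.evaluate(model.transform(dataset)))
```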

Author: JeremyNixon <jnixon2@gmail.com>

Closes #11335 from JeremyNixon/tvs_pyspark.
parent 9a48c656
python/pyspark/ml/tests.py
@@ -45,7 +45,7 @@ from pyspark.ml.feature import *
 from pyspark.ml.param import Param, Params
 from pyspark.ml.param.shared import HasMaxIter, HasInputCol, HasSeed
 from pyspark.ml.regression import LinearRegression
-from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel
+from pyspark.ml.tuning import *
 from pyspark.ml.util import keyword_only
 from pyspark.mllib.linalg import DenseVector
 from pyspark.sql import DataFrame, SQLContext, Row
@@ -423,6 +423,57 @@ class CrossValidatorTests(PySparkTestCase):
         self.assertEqual(1.0, bestModelMetric, "Best model has R-squared of 1")
 
 
+class TrainValidationSplitTests(PySparkTestCase):
+
+    def test_fit_minimize_metric(self):
+        sqlContext = SQLContext(self.sc)
+        dataset = sqlContext.createDataFrame([
+            (10, 10.0),
+            (50, 50.0),
+            (100, 100.0),
+            (500, 500.0)] * 10,
+            ["feature", "label"])
+
+        iee = InducedErrorEstimator()
+        evaluator = RegressionEvaluator(metricName="rmse")
+
+        grid = (ParamGridBuilder()
+                .addGrid(iee.inducedError, [100.0, 0.0, 10000.0])
+                .build())
+        tvs = TrainValidationSplit(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator)
+        tvsModel = tvs.fit(dataset)
+        bestModel = tvsModel.bestModel
+        bestModelMetric = evaluator.evaluate(bestModel.transform(dataset))
+
+        self.assertEqual(0.0, bestModel.getOrDefault('inducedError'),
+                         "Best model should have zero induced error")
+        self.assertEqual(0.0, bestModelMetric, "Best model has RMSE of 0")
+
+    def test_fit_maximize_metric(self):
+        sqlContext = SQLContext(self.sc)
+        dataset = sqlContext.createDataFrame([
+            (10, 10.0),
+            (50, 50.0),
+            (100, 100.0),
+            (500, 500.0)] * 10,
+            ["feature", "label"])
+
+        iee = InducedErrorEstimator()
+        evaluator = RegressionEvaluator(metricName="r2")
+
+        grid = (ParamGridBuilder()
+                .addGrid(iee.inducedError, [100.0, 0.0, 10000.0])
+                .build())
+        tvs = TrainValidationSplit(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator)
+        tvsModel = tvs.fit(dataset)
+        bestModel = tvsModel.bestModel
+        bestModelMetric = evaluator.evaluate(bestModel.transform(dataset))
+
+        self.assertEqual(0.0, bestModel.getOrDefault('inducedError'),
+                         "Best model should have zero induced error")
+        self.assertEqual(1.0, bestModelMetric, "Best model has R-squared of 1")
+
+
 class PersistenceTest(PySparkTestCase):
 
     def test_linear_regression(self):
......
python/pyspark/ml/tuning.py
@@ -25,7 +25,8 @@ from pyspark.ml.param.shared import HasSeed
 from pyspark.ml.util import keyword_only
 from pyspark.sql.functions import rand
 
-__all__ = ['ParamGridBuilder', 'CrossValidator', 'CrossValidatorModel']
+__all__ = ['ParamGridBuilder', 'CrossValidator', 'CrossValidatorModel', 'TrainValidationSplit',
+           'TrainValidationSplitModel']
 
 
 class ParamGridBuilder(object):
@@ -288,6 +289,196 @@ class CrossValidatorModel(Model):
         return CrossValidatorModel(self.bestModel.copy(extra))
 
 
+class TrainValidationSplit(Estimator, HasSeed):
+    """
+    Train-Validation-Split: randomly splits the input dataset into train and
+    validation sets, and uses the evaluation metric on the validation set to
+    select the best model.
+
+    >>> from pyspark.ml.classification import LogisticRegression
+    >>> from pyspark.ml.evaluation import BinaryClassificationEvaluator
+    >>> from pyspark.mllib.linalg import Vectors
+    >>> dataset = sqlContext.createDataFrame(
+    ...     [(Vectors.dense([0.0]), 0.0),
+    ...      (Vectors.dense([0.4]), 1.0),
+    ...      (Vectors.dense([0.5]), 0.0),
+    ...      (Vectors.dense([0.6]), 1.0),
+    ...      (Vectors.dense([1.0]), 1.0)] * 10,
+    ...     ["features", "label"])
+    >>> lr = LogisticRegression()
+    >>> grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
+    >>> evaluator = BinaryClassificationEvaluator()
+    >>> tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator)
+    >>> tvsModel = tvs.fit(dataset)
+    >>> evaluator.evaluate(tvsModel.transform(dataset))
+    0.8333...
+
+    .. versionadded:: 2.0.0
+    """
+
+    estimator = Param(Params._dummy(), "estimator", "estimator to be tested")
+    estimatorParamMaps = Param(Params._dummy(), "estimatorParamMaps", "estimator param maps")
+    evaluator = Param(
+        Params._dummy(), "evaluator",
+        "evaluator used to select hyper-parameters that maximize the validated metric")
+    trainRatio = Param(Params._dummy(), "trainRatio", "Param for ratio between train and\
+     validation data. Must be between 0 and 1.")
+
+    @keyword_only
+    def __init__(self, estimator=None, estimatorParamMaps=None, evaluator=None, trainRatio=0.75,
+                 seed=None):
+        """
+        __init__(self, estimator=None, estimatorParamMaps=None, evaluator=None, trainRatio=0.75,\
+                 seed=None)
+        """
+        super(TrainValidationSplit, self).__init__()
+        self._setDefault(trainRatio=0.75)
+        kwargs = self.__init__._input_kwargs
+        self._set(**kwargs)
+
+    @since("2.0.0")
+    @keyword_only
+    def setParams(self, estimator=None, estimatorParamMaps=None, evaluator=None, trainRatio=0.75,
+                  seed=None):
+        """
+        setParams(self, estimator=None, estimatorParamMaps=None, evaluator=None, trainRatio=0.75,\
+                  seed=None)
+        Sets params for the train validation split.
+        """
+        kwargs = self.setParams._input_kwargs
+        return self._set(**kwargs)
+
+    @since("2.0.0")
+    def setEstimator(self, value):
+        """
+        Sets the value of :py:attr:`estimator`.
+        """
+        self._paramMap[self.estimator] = value
+        return self
+
+    @since("2.0.0")
+    def getEstimator(self):
+        """
+        Gets the value of estimator or its default value.
+        """
+        return self.getOrDefault(self.estimator)
+
+    @since("2.0.0")
+    def setEstimatorParamMaps(self, value):
+        """
+        Sets the value of :py:attr:`estimatorParamMaps`.
+        """
+        self._paramMap[self.estimatorParamMaps] = value
+        return self
+
+    @since("2.0.0")
+    def getEstimatorParamMaps(self):
+        """
+        Gets the value of estimatorParamMaps or its default value.
+        """
+        return self.getOrDefault(self.estimatorParamMaps)
+
+    @since("2.0.0")
+    def setEvaluator(self, value):
+        """
+        Sets the value of :py:attr:`evaluator`.
+        """
+        self._paramMap[self.evaluator] = value
+        return self
+
+    @since("2.0.0")
+    def getEvaluator(self):
+        """
+        Gets the value of evaluator or its default value.
+        """
+        return self.getOrDefault(self.evaluator)
+
+    @since("2.0.0")
+    def setTrainRatio(self, value):
+        """
+        Sets the value of :py:attr:`trainRatio`.
+        """
+        self._paramMap[self.trainRatio] = value
+        return self
+
+    @since("2.0.0")
+    def getTrainRatio(self):
+        """
+        Gets the value of trainRatio or its default value.
+        """
+        return self.getOrDefault(self.trainRatio)
+
+    def _fit(self, dataset):
+        est = self.getOrDefault(self.estimator)
+        epm = self.getOrDefault(self.estimatorParamMaps)
+        numModels = len(epm)
+        eva = self.getOrDefault(self.evaluator)
+        tRatio = self.getOrDefault(self.trainRatio)
+        seed = self.getOrDefault(self.seed)
+        randCol = self.uid + "_rand"
+        df = dataset.select("*", rand(seed).alias(randCol))
+        metrics = np.zeros(numModels)
+        condition = (df[randCol] >= tRatio)
+        validation = df.filter(condition)
+        train = df.filter(~condition)
+        for j in range(numModels):
+            model = est.fit(train, epm[j])
+            metric = eva.evaluate(model.transform(validation, epm[j]))
+            metrics[j] += metric
+        if eva.isLargerBetter():
+            bestIndex = np.argmax(metrics)
+        else:
+            bestIndex = np.argmin(metrics)
+        bestModel = est.fit(dataset, epm[bestIndex])
+        return TrainValidationSplitModel(bestModel)
+
+    @since("2.0.0")
+    def copy(self, extra=None):
+        """
+        Creates a copy of this instance with a randomly generated uid
+        and some extra params. This creates a deep copy of the embedded
+        paramMap, and copies the embedded and extra parameters over.
+
+        :param extra: Extra parameters to copy to the new instance
+        :return: Copy of this instance
+        """
+        if extra is None:
+            extra = dict()
+        newTVS = Params.copy(self, extra)
+        if self.isSet(self.estimator):
+            newTVS.setEstimator(self.getEstimator().copy(extra))
+        # estimatorParamMaps remain the same
+        if self.isSet(self.evaluator):
+            newTVS.setEvaluator(self.getEvaluator().copy(extra))
+        return newTVS
+
+
+class TrainValidationSplitModel(Model):
+    """
+    Model from train validation split.
+    """
+
+    def __init__(self, bestModel):
+        super(TrainValidationSplitModel, self).__init__()
+        #: best model from train validation split
+        self.bestModel = bestModel
+
+    def _transform(self, dataset):
+        return self.bestModel.transform(dataset)
+
+    @since("2.0.0")
+    def copy(self, extra=None):
+        """
+        Creates a copy of this instance with a randomly generated uid
+        and some extra params. This copies the underlying bestModel,
+        creates a deep copy of the embedded paramMap, and
+        copies the embedded and extra parameters over.
+
+        :param extra: Extra parameters to copy to the new instance
+        :return: Copy of this instance
+        """
+        if extra is None:
+            extra = dict()
+        return TrainValidationSplitModel(self.bestModel.copy(extra))
+
+
 if __name__ == "__main__":
     import doctest
     from pyspark.context import SparkContext
......
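Editor's note: the core of `_fit` above is a single randomized split rather than k-fold evaluation. A minimal standalone sketch of that split logic, assuming a local SparkContext (the app name, seed, and column name are illustrative):

```python
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import rand

sc = SparkContext("local[2]", "tvs-split-sketch")
sqlContext = SQLContext(sc)

# Mirror _fit: append a uniform random column, then threshold it.
# Rows whose draw is >= trainRatio form the validation set (~25% here);
# the rest form the training set (~75%).
df = sqlContext.range(0, 1000).select("*", rand(42).alias("_rand"))
train_ratio = 0.75
validation = df.filter(df["_rand"] >= train_ratio)
train = df.filter(df["_rand"] < train_ratio)
print(train.count(), validation.count())  # roughly 750 / 250

sc.stop()
```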