Skip to content
Snippets Groups Projects
Commit 7583681e authored by noelsmith's avatar noelsmith Committed by Joseph K. Bradley
Browse files

[SPARK-10188] [PYSPARK] Pyspark CrossValidator with RMSE selects incorrect model

* Added isLargerBetter() method to Pyspark Evaluator to match the Scala version.
* JavaEvaluator delegates isLargerBetter() to underlying Scala object.
* Added check for isLargerBetter() in CrossValidator to determine whether to use argmin or argmax.
* Added test cases for where smaller is better (RMSE) and larger is better (R-Squared).

(This contribution is my original work and that I license the work to the project under Sparks' open source license)

Author: noelsmith <mail@noelsmith.com>

Closes #8399 from noel-smith/pyspark-rmse-xval-fix.
parent 89b94343
No related branches found
No related tags found
No related merge requests found
......@@ -66,6 +66,14 @@ class Evaluator(Params):
else:
raise ValueError("Params must be a param map but got %s." % type(params))
def isLargerBetter(self):
"""
Indicates whether the metric returned by :py:meth:`evaluate` should be maximized
(True, default) or minimized (False).
A given evaluator may support multiple metrics which may be maximized or minimized.
"""
return True
@inherit_doc
class JavaEvaluator(Evaluator, JavaWrapper):
......@@ -85,6 +93,10 @@ class JavaEvaluator(Evaluator, JavaWrapper):
self._transfer_params_to_java()
return self._java_obj.evaluate(dataset._jdf)
def isLargerBetter(self):
self._transfer_params_to_java()
return self._java_obj.isLargerBetter()
@inherit_doc
class BinaryClassificationEvaluator(JavaEvaluator, HasLabelCol, HasRawPredictionCol):
......
......@@ -32,11 +32,14 @@ else:
from pyspark.tests import ReusedPySparkTestCase as PySparkTestCase
from pyspark.sql import DataFrame, SQLContext
from pyspark.sql.functions import rand
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.param import Param, Params
from pyspark.ml.param.shared import HasMaxIter, HasInputCol, HasSeed
from pyspark.ml.util import keyword_only
from pyspark.ml import Estimator, Model, Pipeline, Transformer
from pyspark.ml.feature import *
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel
from pyspark.mllib.linalg import DenseVector
......@@ -264,5 +267,89 @@ class FeatureTests(PySparkTestCase):
self.assertEquals(transformedDF.head().output, ["a b c d", "b c d e"])
class HasInducedError(Params):
def __init__(self):
super(HasInducedError, self).__init__()
self.inducedError = Param(self, "inducedError",
"Uniformly-distributed error added to feature")
def getInducedError(self):
return self.getOrDefault(self.inducedError)
class InducedErrorModel(Model, HasInducedError):
def __init__(self):
super(InducedErrorModel, self).__init__()
def _transform(self, dataset):
return dataset.withColumn("prediction",
dataset.feature + (rand(0) * self.getInducedError()))
class InducedErrorEstimator(Estimator, HasInducedError):
def __init__(self, inducedError=1.0):
super(InducedErrorEstimator, self).__init__()
self._set(inducedError=inducedError)
def _fit(self, dataset):
model = InducedErrorModel()
self._copyValues(model)
return model
class CrossValidatorTests(PySparkTestCase):
def test_fit_minimize_metric(self):
sqlContext = SQLContext(self.sc)
dataset = sqlContext.createDataFrame([
(10, 10.0),
(50, 50.0),
(100, 100.0),
(500, 500.0)] * 10,
["feature", "label"])
iee = InducedErrorEstimator()
evaluator = RegressionEvaluator(metricName="rmse")
grid = (ParamGridBuilder()
.addGrid(iee.inducedError, [100.0, 0.0, 10000.0])
.build())
cv = CrossValidator(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator)
cvModel = cv.fit(dataset)
bestModel = cvModel.bestModel
bestModelMetric = evaluator.evaluate(bestModel.transform(dataset))
self.assertEqual(0.0, bestModel.getOrDefault('inducedError'),
"Best model should have zero induced error")
self.assertEqual(0.0, bestModelMetric, "Best model has RMSE of 0")
def test_fit_maximize_metric(self):
sqlContext = SQLContext(self.sc)
dataset = sqlContext.createDataFrame([
(10, 10.0),
(50, 50.0),
(100, 100.0),
(500, 500.0)] * 10,
["feature", "label"])
iee = InducedErrorEstimator()
evaluator = RegressionEvaluator(metricName="r2")
grid = (ParamGridBuilder()
.addGrid(iee.inducedError, [100.0, 0.0, 10000.0])
.build())
cv = CrossValidator(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator)
cvModel = cv.fit(dataset)
bestModel = cvModel.bestModel
bestModelMetric = evaluator.evaluate(bestModel.transform(dataset))
self.assertEqual(0.0, bestModel.getOrDefault('inducedError'),
"Best model should have zero induced error")
self.assertEqual(1.0, bestModelMetric, "Best model has R-squared of 1")
if __name__ == "__main__":
unittest.main()
......@@ -223,7 +223,11 @@ class CrossValidator(Estimator):
# TODO: duplicate evaluator to take extra params from input
metric = eva.evaluate(model.transform(validation, epm[j]))
metrics[j] += metric
bestIndex = np.argmax(metrics)
if eva.isLargerBetter():
bestIndex = np.argmax(metrics)
else:
bestIndex = np.argmin(metrics)
bestModel = est.fit(dataset, epm[bestIndex])
return CrossValidatorModel(bestModel)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment