Commit 3bf20c27 authored by DB Tsai, committed by Joseph K. Bradley

[SPARK-8845] [ML] ML use of Breeze optimization: use adjustedValue instead of value

In LinearRegression and LogisticRegression, we use Breeze's optimizers (LBFGS and OWLQN). We check the State.value to see the current objective. However, Breeze's documentation makes it sound like value and adjustedValue differ for some optimizers, possibly including OWLQN: https://github.com/scalanlp/breeze/blob/26faf622862e8d7a42a401aef601347aac655f2b/math/src/main/scala/breeze/optimize/FirstOrderMinimizer.scala#L36
If that is the case, then we should use adjustedValue instead of value. This is relevant to SPARK-8538 and SPARK-8539, where we will provide the objective trace to the user.
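For illustration only (not part of this patch), here is a minimal, self-contained sketch of the distinction in Breeze: for OWLQN, State.value carries only the differentiable part of the objective returned by the cost function, while State.adjustedValue also folds in the L1 penalty that the optimizer applies internally, so the latter is the objective actually being minimized. The toy loss function, the object name, and the scalar-regularization OWLQN constructor below are illustrative assumptions, not code from this commit.

import breeze.linalg.DenseVector
import breeze.optimize.{DiffFunction, OWLQN}

object AdjustedValueTrace {
  def main(args: Array[String]): Unit = {
    // Toy smooth loss: f(x) = ||x - 3||^2 with gradient 2 * (x - 3).
    // The L1 penalty is NOT included here; OWLQN adds it internally.
    val loss = new DiffFunction[DenseVector[Double]] {
      def calculate(x: DenseVector[Double]): (Double, DenseVector[Double]) = {
        val diff = x - 3.0
        (diff.dot(diff), diff * 2.0)
      }
    }

    // Illustrative settings (assumed constructor): maxIter = 50, LBFGS memory = 10,
    // scalar L1 strength = 0.1. Some Breeze versions take a per-index reg function instead.
    val owlqn = new OWLQN[Int, DenseVector[Double]](50, 10, 0.1)

    val states = owlqn.iterations(loss, DenseVector.zeros[Double](3))
    states.foreach { state =>
      // state.value is only the smooth part returned by calculate();
      // state.adjustedValue additionally includes the L1 penalty, i.e. the objective
      // actually being minimized, which is what an objective trace should report.
      println(s"iter=${state.iter} value=${state.value} adjustedValue=${state.adjustedValue}")
    }
  }
}

With plain LBFGS and no optimizer-applied regularization, value and adjustedValue should coincide, so the change matters mainly for the L1 (OWLQN) path.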

Author: DB Tsai <dbt@netflix.com>

Closes #7245 from dbtsai/SPARK-8845 and squashes the following commits:

fa4c91e [DB Tsai] address feedback
e6caac1 [DB Tsai] java style multiline comment
b10c574 [DB Tsai] address feedback
c9ff81e [DB Tsai] first commit
parent 35d781e7
LogisticRegression.scala

@@ -116,7 +116,7 @@ class LogisticRegression(override val uid: String)
         case ((summarizer: MultivariateOnlineSummarizer, labelSummarizer: MultiClassSummarizer),
           (label: Double, features: Vector)) =>
           (summarizer.add(features), labelSummarizer.add(label))
       },
       combOp = (c1, c2) => (c1, c2) match {
         case ((summarizer1: MultivariateOnlineSummarizer,
           classSummarizer1: MultiClassSummarizer), (summarizer2: MultivariateOnlineSummarizer,
@@ -166,18 +166,18 @@ class LogisticRegression(override val uid: String)
       Vectors.zeros(if ($(fitIntercept)) numFeatures + 1 else numFeatures)
     if ($(fitIntercept)) {
-      /**
-       * For binary logistic regression, when we initialize the weights as zeros,
-       * it will converge faster if we initialize the intercept such that
-       * it follows the distribution of the labels.
-       *
-       * {{{
-       * P(0) = 1 / (1 + \exp(b)), and
-       * P(1) = \exp(b) / (1 + \exp(b))
-       * }}}, hence
-       * {{{
-       * b = \log{P(1) / P(0)} = \log{count_1 / count_0}
-       * }}}
-       */
+      /*
+         For binary logistic regression, when we initialize the weights as zeros,
+         it will converge faster if we initialize the intercept such that
+         it follows the distribution of the labels.
+
+         {{{
+           P(0) = 1 / (1 + \exp(b)), and
+           P(1) = \exp(b) / (1 + \exp(b))
+         }}}, hence
+         {{{
+           b = \log{P(1) / P(0)} = \log{count_1 / count_0}
+         }}}
+       */
       initialWeightsWithIntercept.toArray(numFeatures)
         = math.log(histogram(1).toDouble / histogram(0).toDouble)
@@ -186,39 +186,48 @@ class LogisticRegression(override val uid: String)
     val states = optimizer.iterations(new CachedDiffFunction(costFun),
       initialWeightsWithIntercept.toBreeze.toDenseVector)
 
-    var state = states.next()
-    val lossHistory = mutable.ArrayBuilder.make[Double]
-
-    while (states.hasNext) {
-      lossHistory += state.value
-      state = states.next()
-    }
-    lossHistory += state.value
-
-    // The weights are trained in the scaled space; we're converting them back to
-    // the original space.
-    val weightsWithIntercept = {
+    val (weights, intercept, objectiveHistory) = {
+      /*
+         Note that in Logistic Regression, the objective history (loss + regularization)
+         is log-likelihood which is invariance under feature standardization. As a result,
+         the objective history from optimizer is the same as the one in the original space.
+       */
+      val arrayBuilder = mutable.ArrayBuilder.make[Double]
+      var state: optimizer.State = null
+      while (states.hasNext) {
+        state = states.next()
+        arrayBuilder += state.adjustedValue
+      }
+
+      if (state == null) {
+        val msg = s"${optimizer.getClass.getName} failed."
+        logError(msg)
+        throw new SparkException(msg)
+      }
+
+      /*
+         The weights are trained in the scaled space; we're converting them back to
+         the original space.
+         Note that the intercept in scaled space and original space is the same;
+         as a result, no scaling is needed.
+       */
       val rawWeights = state.x.toArray.clone()
       var i = 0
-      // Note that the intercept in scaled space and original space is the same;
-      // as a result, no scaling is needed.
       while (i < numFeatures) {
         rawWeights(i) *= { if (featuresStd(i) != 0.0) 1.0 / featuresStd(i) else 0.0 }
         i += 1
       }
-      Vectors.dense(rawWeights)
+
+      if ($(fitIntercept)) {
+        (Vectors.dense(rawWeights.dropRight(1)).compressed, rawWeights.last, arrayBuilder.result())
+      } else {
+        (Vectors.dense(rawWeights).compressed, 0.0, arrayBuilder.result())
+      }
     }
 
     if (handlePersistence) instances.unpersist()
 
-    val (weights, intercept) = if ($(fitIntercept)) {
-      (Vectors.dense(weightsWithIntercept.toArray.slice(0, weightsWithIntercept.size - 1)),
-        weightsWithIntercept(weightsWithIntercept.size - 1))
-    } else {
-      (weightsWithIntercept, 0.0)
-    }
-
-    new LogisticRegressionModel(uid, weights.compressed, intercept)
+    copyValues(new LogisticRegressionModel(uid, weights, intercept))
   }
 
   override def copy(extra: ParamMap): LogisticRegression = defaultCopy(extra)
@@ -423,16 +432,12 @@ private class LogisticAggregator(
     require(dim == data.size, s"Dimensions mismatch when adding new sample." +
       s" Expecting $dim but got ${data.size}.")
 
-    val dataSize = data.size
-
     val localWeightsArray = weightsArray
     val localGradientSumArray = gradientSumArray
 
     numClasses match {
       case 2 =>
-        /**
-         * For Binary Logistic Regression.
-         */
+        // For Binary Logistic Regression.
         val margin = - {
           var sum = 0.0
           data.foreachActive { (index, value) =>
LinearRegression.scala

@@ -22,7 +22,7 @@ import scala.collection.mutable
 import breeze.linalg.{DenseVector => BDV, norm => brzNorm}
 import breeze.optimize.{CachedDiffFunction, DiffFunction, LBFGS => BreezeLBFGS, OWLQN => BreezeOWLQN}
 
-import org.apache.spark.Logging
+import org.apache.spark.{SparkException, Logging}
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.ml.PredictorParams
 import org.apache.spark.ml.param.ParamMap
@@ -132,7 +132,6 @@ class LinearRegression(override val uid: String)
     val numFeatures = summarizer.mean.size
     val yMean = statCounter.mean
     val yStd = math.sqrt(statCounter.variance)
-    // look at glmnet5.m L761 maaaybe that has info
 
     // If the yStd is zero, then the intercept is yMean with zero weights;
     // as a result, training is not needed.
@@ -162,21 +161,34 @@ class LinearRegression(override val uid: String)
     }
 
     val initialWeights = Vectors.zeros(numFeatures)
-    val states =
-      optimizer.iterations(new CachedDiffFunction(costFun), initialWeights.toBreeze.toDenseVector)
+    val states = optimizer.iterations(new CachedDiffFunction(costFun),
+      initialWeights.toBreeze.toDenseVector)
 
-    var state = states.next()
-    val lossHistory = mutable.ArrayBuilder.make[Double]
-
-    while (states.hasNext) {
-      lossHistory += state.value
-      state = states.next()
-    }
-    lossHistory += state.value
-
-    // The weights are trained in the scaled space; we're converting them back to
-    // the original space.
-    val weights = {
+    val (weights, objectiveHistory) = {
+      /*
+         Note that in Linear Regression, the objective history (loss + regularization) returned
+         from optimizer is computed in the scaled space given by the following formula.
+         {{{
+         L = 1/2n||\sum_i w_i(x_i - \bar{x_i}) / \hat{x_i} - (y - \bar{y}) / \hat{y}||^2 + regTerms
+         }}}
+       */
+      val arrayBuilder = mutable.ArrayBuilder.make[Double]
+      var state: optimizer.State = null
+      while (states.hasNext) {
+        state = states.next()
+        arrayBuilder += state.adjustedValue
+      }
+
+      if (state == null) {
+        val msg = s"${optimizer.getClass.getName} failed."
+        logError(msg)
+        throw new SparkException(msg)
+      }
+
+      /*
+         The weights are trained in the scaled space; we're converting them back to
+         the original space.
+       */
      val rawWeights = state.x.toArray.clone()
      var i = 0
      val len = rawWeights.length
@@ -184,17 +196,20 @@ class LinearRegression(override val uid: String)
        rawWeights(i) *= { if (featuresStd(i) != 0.0) yStd / featuresStd(i) else 0.0 }
        i += 1
      }
-      Vectors.dense(rawWeights)
+
+      (Vectors.dense(rawWeights).compressed, arrayBuilder.result())
     }
 
-    // The intercept in R's GLMNET is computed using closed form after the coefficients are
-    // converged. See the following discussion for detail.
-    // http://stats.stackexchange.com/questions/13617/how-is-the-intercept-computed-in-glmnet
+    /*
+       The intercept in R's GLMNET is computed using closed form after the coefficients are
+       converged. See the following discussion for detail.
+       http://stats.stackexchange.com/questions/13617/how-is-the-intercept-computed-in-glmnet
+     */
     val intercept = if ($(fitIntercept)) yMean - dot(weights, Vectors.dense(featuresMean)) else 0.0
 
     if (handlePersistence) instances.unpersist()
 
-    // TODO: Converts to sparse format based on the storage, but may base on the scoring speed.
-    copyValues(new LinearRegressionModel(uid, weights.compressed, intercept))
+    copyValues(new LinearRegressionModel(uid, weights, intercept))
   }
 
   override def copy(extra: ParamMap): LinearRegression = defaultCopy(extra)