Skip to content
Snippets Groups Projects
Unverified Commit 15eb86c2 authored by sethah's avatar sethah Committed by DB Tsai
Browse files

[SPARK-18456][ML][FOLLOWUP] Use matrix abstraction for coefficients in LogisticRegression training

## What changes were proposed in this pull request?

This is a follow up to some of the discussion [here](https://github.com/apache/spark/pull/15593

). During LogisticRegression training, we store the coefficients combined with intercepts as a flat vector, but a more natural abstraction is a matrix. Here, we refactor the code to use matrix where possible, which makes the code more readable and greatly simplifies the indexing.

Note: We do not use a Breeze matrix for the cost function as was mentioned in the linked PR. This is because LBFGS/OWLQN require an implicit `MutableInnerProductModule[DenseMatrix[Double], Double]` which is not natively defined in Breeze. We would need to extend Breeze in Spark to define it ourselves. Also, we do not modify the `regParamL1Fun` because OWLQN in Breeze requires a `MutableEnumeratedCoordinateField[(Int, Int), DenseVector[Double]]` (since we still use a dense vector for coefficients). Here again we would have to extend Breeze inside Spark.

## How was this patch tested?

This is internal code refactoring - the current unit tests passing show us that the change did not break anything. No added functionality in this patch.

Author: sethah <seth.hendrickson16@gmail.com>

Closes #15893 from sethah/logreg_refactor.

(cherry picked from commit 856e0042)
Signed-off-by: default avatarDB Tsai <dbtsai@dbtsai.com>
parent 15ad3a31
No related branches found
No related tags found
No related merge requests found
......@@ -463,16 +463,11 @@ class LogisticRegression @Since("1.2.0") (
}
/*
The coefficients are laid out in column major order during training. e.g. for
`numClasses = 3` and `numFeatures = 2` and `fitIntercept = true` the layout is:
Array(beta_11, beta_21, beta_31, beta_12, beta_22, beta_32, intercept_1, intercept_2,
intercept_3)
where beta_jk corresponds to the coefficient for class `j` and feature `k`.
The coefficients are laid out in column major order during training. Here we initialize
a column major matrix of initial coefficients.
*/
val initialCoefficientsWithIntercept =
Vectors.zeros(numCoefficientSets * numFeaturesPlusIntercept)
val initialCoefWithInterceptMatrix =
Matrices.zeros(numCoefficientSets, numFeaturesPlusIntercept)
val initialModelIsValid = optInitialModel match {
case Some(_initialModel) =>
......@@ -491,18 +486,15 @@ class LogisticRegression @Since("1.2.0") (
}
if (initialModelIsValid) {
val initialCoefWithInterceptArray = initialCoefficientsWithIntercept.toArray
val providedCoef = optInitialModel.get.coefficientMatrix
providedCoef.foreachActive { (row, col, value) =>
// convert matrix to column major for training
val flatIndex = col * numCoefficientSets + row
providedCoef.foreachActive { (classIndex, featureIndex, value) =>
// We need to scale the coefficients since they will be trained in the scaled space
initialCoefWithInterceptArray(flatIndex) = value * featuresStd(col)
initialCoefWithInterceptMatrix.update(classIndex, featureIndex,
value * featuresStd(featureIndex))
}
if ($(fitIntercept)) {
optInitialModel.get.interceptVector.foreachActive { (index, value) =>
val coefIndex = numCoefficientSets * numFeatures + index
initialCoefWithInterceptArray(coefIndex) = value
optInitialModel.get.interceptVector.foreachActive { (classIndex, value) =>
initialCoefWithInterceptMatrix.update(classIndex, numFeatures, value)
}
}
} else if ($(fitIntercept) && isMultinomial) {
......@@ -532,8 +524,7 @@ class LogisticRegression @Since("1.2.0") (
val rawIntercepts = histogram.map(c => math.log(c + 1)) // add 1 for smoothing
val rawMean = rawIntercepts.sum / rawIntercepts.length
rawIntercepts.indices.foreach { i =>
initialCoefficientsWithIntercept.toArray(numClasses * numFeatures + i) =
rawIntercepts(i) - rawMean
initialCoefWithInterceptMatrix.update(i, numFeatures, rawIntercepts(i) - rawMean)
}
} else if ($(fitIntercept)) {
/*
......@@ -549,12 +540,12 @@ class LogisticRegression @Since("1.2.0") (
b = \log{P(1) / P(0)} = \log{count_1 / count_0}
}}}
*/
initialCoefficientsWithIntercept.toArray(numFeatures) = math.log(
histogram(1) / histogram(0))
initialCoefWithInterceptMatrix.update(0, numFeatures,
math.log(histogram(1) / histogram(0)))
}
val states = optimizer.iterations(new CachedDiffFunction(costFun),
initialCoefficientsWithIntercept.asBreeze.toDenseVector)
new BDV[Double](initialCoefWithInterceptMatrix.toArray))
/*
Note that in Logistic Regression, the objective history (loss + regularization)
......@@ -586,15 +577,24 @@ class LogisticRegression @Since("1.2.0") (
Note that the intercept in scaled space and original space is the same;
as a result, no scaling is needed.
*/
val rawCoefficients = state.x.toArray.clone()
val coefficientArray = Array.tabulate(numCoefficientSets * numFeatures) { i =>
val colMajorIndex = (i % numFeatures) * numCoefficientSets + i / numFeatures
val featureIndex = i % numFeatures
if (featuresStd(featureIndex) != 0.0) {
rawCoefficients(colMajorIndex) / featuresStd(featureIndex)
} else {
0.0
val allCoefficients = state.x.toArray.clone()
val allCoefMatrix = new DenseMatrix(numCoefficientSets, numFeaturesPlusIntercept,
allCoefficients)
val denseCoefficientMatrix = new DenseMatrix(numCoefficientSets, numFeatures,
new Array[Double](numCoefficientSets * numFeatures), isTransposed = true)
val interceptVec = if ($(fitIntercept) || !isMultinomial) {
Vectors.zeros(numCoefficientSets)
} else {
Vectors.sparse(numCoefficientSets, Seq())
}
// separate intercepts and coefficients from the combined matrix
allCoefMatrix.foreachActive { (classIndex, featureIndex, value) =>
val isIntercept = $(fitIntercept) && (featureIndex == numFeatures)
if (!isIntercept && featuresStd(featureIndex) != 0.0) {
denseCoefficientMatrix.update(classIndex, featureIndex,
value / featuresStd(featureIndex))
}
if (isIntercept) interceptVec.toArray(classIndex) = value
}
if ($(regParam) == 0.0 && isMultinomial) {
......@@ -607,17 +607,16 @@ class LogisticRegression @Since("1.2.0") (
Friedman, et al. "Regularization Paths for Generalized Linear Models via
Coordinate Descent," https://core.ac.uk/download/files/153/6287975.pdf
*/
val coefficientMean = coefficientArray.sum / coefficientArray.length
coefficientArray.indices.foreach { i => coefficientArray(i) -= coefficientMean}
val denseValues = denseCoefficientMatrix.values
val coefficientMean = denseValues.sum / denseValues.length
denseCoefficientMatrix.update(_ - coefficientMean)
}
val denseCoefficientMatrix =
new DenseMatrix(numCoefficientSets, numFeatures, coefficientArray, isTransposed = true)
// TODO: use `denseCoefficientMatrix.compressed` after SPARK-17471
val compressedCoefficientMatrix = if (isMultinomial) {
denseCoefficientMatrix
} else {
val compressedVector = Vectors.dense(coefficientArray).compressed
val compressedVector = Vectors.dense(denseCoefficientMatrix.values).compressed
compressedVector match {
case dv: DenseVector => denseCoefficientMatrix
case sv: SparseVector =>
......@@ -626,25 +625,13 @@ class LogisticRegression @Since("1.2.0") (
}
}
val interceptsArray: Array[Double] = if ($(fitIntercept)) {
Array.tabulate(numCoefficientSets) { i =>
val coefIndex = numFeatures * numCoefficientSets + i
rawCoefficients(coefIndex)
}
} else {
Array.empty[Double]
}
val interceptVector = if (interceptsArray.nonEmpty && isMultinomial) {
// The intercepts are never regularized, so we always center the mean.
val interceptMean = interceptsArray.sum / numClasses
interceptsArray.indices.foreach { i => interceptsArray(i) -= interceptMean }
Vectors.dense(interceptsArray)
} else if (interceptsArray.length == 1) {
Vectors.dense(interceptsArray)
} else {
Vectors.sparse(numCoefficientSets, Seq())
// center the intercepts when using multinomial algorithm
if ($(fitIntercept) && isMultinomial) {
val interceptArray = interceptVec.toArray
val interceptMean = interceptArray.sum / interceptArray.length
(0 until interceptVec.size).foreach { i => interceptArray(i) -= interceptMean }
}
(compressedCoefficientMatrix, interceptVector.compressed, arrayBuilder.result())
(compressedCoefficientMatrix, interceptVec.compressed, arrayBuilder.result())
}
}
......@@ -1424,6 +1411,7 @@ private class LogisticAggregator(
private val numFeatures = bcFeaturesStd.value.length
private val numFeaturesPlusIntercept = if (fitIntercept) numFeatures + 1 else numFeatures
private val coefficientSize = bcCoefficients.value.size
private val numCoefficientSets = if (multinomial) numClasses else 1
if (multinomial) {
require(numClasses == coefficientSize / numFeaturesPlusIntercept, s"The number of " +
s"coefficients should be ${numClasses * numFeaturesPlusIntercept} but was $coefficientSize")
......@@ -1633,12 +1621,12 @@ private class LogisticAggregator(
lossSum / weightSum
}
def gradient: Vector = {
def gradient: Matrix = {
require(weightSum > 0.0, s"The effective number of instances should be " +
s"greater than 0.0, but $weightSum.")
val result = Vectors.dense(gradientSumArray.clone())
scal(1.0 / weightSum, result)
result
new DenseMatrix(numCoefficientSets, numFeaturesPlusIntercept, result.toArray)
}
}
......@@ -1664,6 +1652,7 @@ private class LogisticCostFun(
val featuresStd = bcFeaturesStd.value
val numFeatures = featuresStd.length
val numCoefficientSets = if (multinomial) numClasses else 1
val numFeaturesPlusIntercept = if (fitIntercept) numFeatures + 1 else numFeatures
val logisticAggregator = {
val seqOp = (c: LogisticAggregator, instance: Instance) => c.add(instance)
......@@ -1675,24 +1664,25 @@ private class LogisticCostFun(
)(seqOp, combOp, aggregationDepth)
}
val totalGradientArray = logisticAggregator.gradient.toArray
val totalGradientMatrix = logisticAggregator.gradient
val coefMatrix = new DenseMatrix(numCoefficientSets, numFeaturesPlusIntercept, coeffs.toArray)
// regVal is the sum of coefficients squares excluding intercept for L2 regularization.
val regVal = if (regParamL2 == 0.0) {
0.0
} else {
var sum = 0.0
coeffs.foreachActive { case (index, value) =>
coefMatrix.foreachActive { case (classIndex, featureIndex, value) =>
// We do not apply regularization to the intercepts
val isIntercept = fitIntercept && index >= numCoefficientSets * numFeatures
val isIntercept = fitIntercept && (featureIndex == numFeatures)
if (!isIntercept) {
// The following code will compute the loss of the regularization; also
// the gradient of the regularization, and add back to totalGradientArray.
sum += {
if (standardization) {
totalGradientArray(index) += regParamL2 * value
val gradValue = totalGradientMatrix(classIndex, featureIndex)
totalGradientMatrix.update(classIndex, featureIndex, gradValue + regParamL2 * value)
value * value
} else {
val featureIndex = index / numCoefficientSets
if (featuresStd(featureIndex) != 0.0) {
// If `standardization` is false, we still standardize the data
// to improve the rate of convergence; as a result, we have to
......@@ -1700,7 +1690,8 @@ private class LogisticCostFun(
// differently to get effectively the same objective function when
// the training dataset is not standardized.
val temp = value / (featuresStd(featureIndex) * featuresStd(featureIndex))
totalGradientArray(index) += regParamL2 * temp
val gradValue = totalGradientMatrix(classIndex, featureIndex)
totalGradientMatrix.update(classIndex, featureIndex, gradValue + regParamL2 * temp)
value * temp
} else {
0.0
......@@ -1713,6 +1704,6 @@ private class LogisticCostFun(
}
bcCoeffs.destroy(blocking = false)
(logisticAggregator.loss + regVal, new BDV(totalGradientArray))
(logisticAggregator.loss + regVal, new BDV(totalGradientMatrix.toArray))
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment