Commit 56859c02 authored by sethah, committed by DB Tsai

[SPARK-18060][ML] Avoid unnecessary computation for MLOR


## What changes were proposed in this pull request?

Before this patch, the gradient updates for multinomial logistic regression were computed by an outer loop over the number of classes and an inner loop over the number of features. Inside the inner loop, we standardized the feature value (`value / featuresStd(index)`), which means we performed the computation `numFeatures * numClasses` times. We only need to perform that computation `numFeatures` times, however. If we re-order the inner and outer loop, we can avoid this, but then we lose sequential memory access. In this patch, we instead lay out the coefficients in column major order while we train, so that we can avoid the extra computation and retain sequential memory access. We convert back to row-major order when we create the model.
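
To make the layout trade-off concrete, here is a minimal, self-contained sketch (illustrative only, not the patch's code; all names and values are hypothetical) contrasting the two traversal orders:

```scala
object LayoutSketch {
  def main(args: Array[String]): Unit = {
    val numClasses = 3
    val numFeatures = 2
    val featuresStd = Array(2.0, 4.0)
    val x = Array(1.0, 3.0) // one dense instance

    // Row-major layout: beta_jk sits at j * numFeatures + k. The inner loop
    // re-divides by featuresStd(k), so the division runs
    // numClasses * numFeatures times.
    val rowMajor = Array(0.1, 0.2, 0.3, 0.4, 0.5, 0.6)
    val marginsRow = Array.tabulate(numClasses) { j =>
      (0 until numFeatures)
        .map(k => rowMajor(j * numFeatures + k) * x(k) / featuresStd(k)).sum
    }

    // Column-major layout: beta_jk sits at k * numClasses + j. The division
    // happens once per feature, and the inner loop walks a contiguous block.
    val colMajor = Array(0.1, 0.3, 0.5, 0.2, 0.4, 0.6)
    val marginsCol = new Array[Double](numClasses)
    for (k <- 0 until numFeatures) {
      val stdValue = x(k) / featuresStd(k) // computed numFeatures times in total
      for (j <- 0 until numClasses) {
        marginsCol(j) += colMajor(k * numClasses + j) * stdValue
      }
    }

    assert(marginsRow.zip(marginsCol).forall { case (a, b) => math.abs(a - b) < 1e-12 })
    println(marginsCol.mkString(", ")) // 0.2, 0.45, 0.7
  }
}
```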

## How was this patch tested?

This is an implementation detail only, so the original behavior is maintained. All tests pass. I also ran performance tests; the results below show significant speedups.

## Performance Tests

**Setup**

3-node bare-metal cluster
120 cores total
384 GB RAM total

**Results**

NOTE: The `currentMasterTime` and `thisPatchTime` are times in seconds for a single iteration of L-BFGS or OWL-QN.

|    |   numPoints |   numFeatures |   numClasses |   regParam |   elasticNetParam |   currentMasterTime (sec) |   thisPatchTime (sec) |   pctSpeedup |
|----|-------------|---------------|--------------|------------|-------------------|---------------------------|-----------------------|--------------|
|  0 |       1e+07 |           100 |          500 |       0.5  |                 0 |                        90 |                    18 |           80 |
|  1 |       1e+08 |           100 |           50 |       0.5  |                 0 |                        90 |                    19 |           78 |
|  2 |       1e+08 |           100 |           50 |       0.05 |                 1 |                        72 |                    19 |           73 |
|  3 |       1e+06 |           100 |         5000 |       0.5  |                 0 |                        93 |                    53 |           43 |
|  4 |       1e+07 |           100 |         5000 |       0.5  |                 0 |                       900 |                   390 |           56 |
|  5 |       1e+08 |           100 |          500 |       0.5  |                 0 |                       840 |                   174 |           79 |
|  6 |       1e+08 |           100 |          200 |       0.5  |                 0 |                       360 |                    72 |           80 |
|  7 |       1e+08 |          1000 |            5 |       0.5  |                 0 |                         9 |                     3 |           66 |
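
The `pctSpeedup` column is consistent with `100 * (currentMasterTime - thisPatchTime) / currentMasterTime`, truncated to an integer; a quick standalone check (hypothetical snippet, not part of the patch):

```scala
object SpeedupCheck {
  def main(args: Array[String]): Unit = {
    // (currentMasterTime, thisPatchTime) pairs from the table above
    val rows = Seq((90.0, 18.0), (90.0, 19.0), (72.0, 19.0), (93.0, 53.0),
      (900.0, 390.0), (840.0, 174.0), (360.0, 72.0), (9.0, 3.0))
    rows.foreach { case (master, patch) =>
      println((100 * (master - patch) / master).toInt) // 80, 78, 73, 43, 56, 79, 80, 66
    }
  }
}
```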

Author: sethah <seth.hendrickson16@gmail.com>

Closes #15593 from sethah/MLOR_PERF_COL_MAJOR_COEF.

(cherry picked from commit 46b2550b)
Signed-off-by: DB Tsai <dbtsai@dbtsai.com>
parent c2ebda44
@@ -438,18 +438,14 @@ class LogisticRegression @Since("1.2.0") (
       val standardizationParam = $(standardization)
       def regParamL1Fun = (index: Int) => {
         // Remove the L1 penalization on the intercept
-        val isIntercept = $(fitIntercept) && ((index + 1) % numFeaturesPlusIntercept == 0)
+        val isIntercept = $(fitIntercept) && index >= numFeatures * numCoefficientSets
         if (isIntercept) {
           0.0
         } else {
           if (standardizationParam) {
             regParamL1
           } else {
-            val featureIndex = if ($(fitIntercept)) {
-              index % numFeaturesPlusIntercept
-            } else {
-              index % numFeatures
-            }
+            val featureIndex = index / numCoefficientSets
             // If `standardization` is false, we still standardize the data
             // to improve the rate of convergence; as a result, we have to
             // perform this reverse standardization by penalizing each component
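
The new index arithmetic above relies on the column-major layout placing all intercepts in the last `numCoefficientSets` slots and giving each feature a contiguous run of `numCoefficientSets` coefficients. A minimal sketch of that mapping (hypothetical standalone code, not Spark's):

```scala
object FlatIndexSketch {
  def main(args: Array[String]): Unit = {
    val numFeatures = 2
    val numCoefficientSets = 3 // equals numClasses for multinomial models
    val flatLength = (numFeatures + 1) * numCoefficientSets
    (0 until flatLength).foreach { index =>
      // Intercepts occupy the tail of the flattened array, so a single
      // comparison replaces the old modulo test.
      val isIntercept = index >= numFeatures * numCoefficientSets
      if (isIntercept) {
        println(s"index $index -> intercept for class ${index - numFeatures * numCoefficientSets}")
      } else {
        // Integer division recovers the feature; the remainder is the class.
        println(s"index $index -> feature ${index / numCoefficientSets}, class ${index % numCoefficientSets}")
      }
    }
  }
}
```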
@@ -466,6 +462,15 @@ class LogisticRegression @Since("1.2.0") (
         new BreezeOWLQN[Int, BDV[Double]]($(maxIter), 10, regParamL1Fun, $(tol))
       }
 
+    /*
+      The coefficients are laid out in column major order during training. e.g. for
+      `numClasses = 3` and `numFeatures = 2` and `fitIntercept = true` the layout is:
+      Array(beta_11, beta_21, beta_31, beta_12, beta_22, beta_32, intercept_1, intercept_2,
+        intercept_3)
+      where beta_jk corresponds to the coefficient for class `j` and feature `k`.
+     */
     val initialCoefficientsWithIntercept =
       Vectors.zeros(numCoefficientSets * numFeaturesPlusIntercept)
@@ -489,13 +494,14 @@ class LogisticRegression @Since("1.2.0") (
         val initialCoefWithInterceptArray = initialCoefficientsWithIntercept.toArray
         val providedCoef = optInitialModel.get.coefficientMatrix
         providedCoef.foreachActive { (row, col, value) =>
-          val flatIndex = row * numFeaturesPlusIntercept + col
+          // convert matrix to column major for training
+          val flatIndex = col * numCoefficientSets + row
           // We need to scale the coefficients since they will be trained in the scaled space
           initialCoefWithInterceptArray(flatIndex) = value * featuresStd(col)
         }
         if ($(fitIntercept)) {
           optInitialModel.get.interceptVector.foreachActive { (index, value) =>
-            val coefIndex = (index + 1) * numFeaturesPlusIntercept - 1
+            val coefIndex = numCoefficientSets * numFeatures + index
             initialCoefWithInterceptArray(coefIndex) = value
           }
         }
@@ -526,7 +532,7 @@ class LogisticRegression @Since("1.2.0") (
           val rawIntercepts = histogram.map(c => math.log(c + 1)) // add 1 for smoothing
           val rawMean = rawIntercepts.sum / rawIntercepts.length
           rawIntercepts.indices.foreach { i =>
-            initialCoefficientsWithIntercept.toArray(i * numFeaturesPlusIntercept + numFeatures) =
+            initialCoefficientsWithIntercept.toArray(numClasses * numFeatures + i) =
               rawIntercepts(i) - rawMean
           }
         } else if ($(fitIntercept)) {
@@ -572,16 +578,20 @@ class LogisticRegression @Since("1.2.0") (
         /*
            The coefficients are trained in the scaled space; we're converting them back to
            the original space.
+
+           Additionally, since the coefficients were laid out in column major order during training
+           to avoid extra computation, we convert them back to row major before passing them to the
+           model.
+
            Note that the intercept in scaled space and original space is the same;
            as a result, no scaling is needed.
          */
         val rawCoefficients = state.x.toArray.clone()
         val coefficientArray = Array.tabulate(numCoefficientSets * numFeatures) { i =>
-          // flatIndex will loop though rawCoefficients, and skip the intercept terms.
-          val flatIndex = if ($(fitIntercept)) i + i / numFeatures else i
+          val colMajorIndex = (i % numFeatures) * numCoefficientSets + i / numFeatures
           val featureIndex = i % numFeatures
           if (featuresStd(featureIndex) != 0.0) {
-            rawCoefficients(flatIndex) / featuresStd(featureIndex)
+            rawCoefficients(colMajorIndex) / featuresStd(featureIndex)
           } else {
             0.0
           }
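
The conversion back to row major can be checked in isolation. A sketch under the same indexing conventions (standalone, hypothetical names, not the patch's code):

```scala
object ConversionSketch {
  def main(args: Array[String]): Unit = {
    val numFeatures = 3
    val numCoefficientSets = 4
    // Column-major flat layout: the entry for class j, feature k sits at
    // k * numCoefficientSets + j.
    val colMajor = Array.tabulate(numFeatures * numCoefficientSets) { idx =>
      val k = idx / numCoefficientSets
      val j = idx % numCoefficientSets
      j * 10.0 + k // encode (class, feature) so the round trip can be checked
    }
    // Row-major index i enumerates (class i / numFeatures, feature i % numFeatures).
    val rowMajor = Array.tabulate(numCoefficientSets * numFeatures) { i =>
      val colMajorIndex = (i % numFeatures) * numCoefficientSets + i / numFeatures
      colMajor(colMajorIndex)
    }
    rowMajor.zipWithIndex.foreach { case (v, i) =>
      assert(v == (i / numFeatures) * 10.0 + i % numFeatures)
    }
    println("row-major/column-major round trip verified")
  }
}
```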
@@ -618,7 +628,7 @@ class LogisticRegression @Since("1.2.0") (
         val interceptsArray: Array[Double] = if ($(fitIntercept)) {
           Array.tabulate(numCoefficientSets) { i =>
-            val coefIndex = (i + 1) * numFeaturesPlusIntercept - 1
+            val coefIndex = numFeatures * numCoefficientSets + i
             rawCoefficients(coefIndex)
           }
         } else {
@@ -697,6 +707,7 @@ class LogisticRegressionModel private[spark] (
   /**
    * A vector of model coefficients for "binomial" logistic regression. If this model was trained
    * using the "multinomial" family then an exception is thrown.
+   *
    * @return Vector
    */
   @Since("2.0.0")
@@ -720,6 +731,7 @@ class LogisticRegressionModel private[spark] (
   /**
    * The model intercept for "binomial" logistic regression. If this model was fit with the
    * "multinomial" family then an exception is thrown.
+   *
    * @return Double
    */
   @Since("1.3.0")
@@ -1389,6 +1401,12 @@ class BinaryLogisticRegressionSummary private[classification] (
  *    $$
  * </blockquote></p>
  *
+ * @note In order to avoid unnecessary computation during calculation of the gradient updates
+ * we lay out the coefficients in column major order during training. This allows us to
+ * perform feature standardization once, while still retaining sequential memory access
+ * for speed. We convert back to row major order when we create the model,
+ * since this form is optimal for the matrix operations used for prediction.
+ *
  * @param bcCoefficients The broadcast coefficients corresponding to the features.
  * @param bcFeaturesStd The broadcast standard deviation values of the features.
  * @param numClasses the number of possible outcomes for k classes classification problem in
@@ -1486,23 +1504,25 @@ private class LogisticAggregator(
       var marginOfLabel = 0.0
       var maxMargin = Double.NegativeInfinity
 
-      val margins = Array.tabulate(numClasses) { i =>
-        var margin = 0.0
-        features.foreachActive { (index, value) =>
-          if (localFeaturesStd(index) != 0.0 && value != 0.0) {
-            margin += localCoefficients(i * numFeaturesPlusIntercept + index) *
-              value / localFeaturesStd(index)
-          }
+      val margins = new Array[Double](numClasses)
+      features.foreachActive { (index, value) =>
+        val stdValue = value / localFeaturesStd(index)
+        var j = 0
+        while (j < numClasses) {
+          margins(j) += localCoefficients(index * numClasses + j) * stdValue
+          j += 1
         }
+      }
+      var i = 0
+      while (i < numClasses) {
         if (fitIntercept) {
-          margin += localCoefficients(i * numFeaturesPlusIntercept + numFeatures)
+          margins(i) += localCoefficients(numClasses * numFeatures + i)
         }
-        if (i == label.toInt) marginOfLabel = margin
-        if (margin > maxMargin) {
-          maxMargin = margin
+        if (i == label.toInt) marginOfLabel = margins(i)
+        if (margins(i) > maxMargin) {
+          maxMargin = margins(i)
         }
-        margin
+        i += 1
       }
 
       /**
@@ -1510,33 +1530,39 @@ private class LogisticAggregator(
        * We address this by subtracting maxMargin from all the margins, so it's guaranteed
        * that all of the new margins will be smaller than zero to prevent arithmetic overflow.
        */
+      val multipliers = new Array[Double](numClasses)
       val sum = {
         var temp = 0.0
-        if (maxMargin > 0) {
-          for (i <- 0 until numClasses) {
-            margins(i) -= maxMargin
-            temp += math.exp(margins(i))
-          }
-        } else {
-          for (i <- 0 until numClasses) {
-            temp += math.exp(margins(i))
-          }
+        var i = 0
+        while (i < numClasses) {
+          if (maxMargin > 0) margins(i) -= maxMargin
+          val exp = math.exp(margins(i))
+          temp += exp
+          multipliers(i) = exp
+          i += 1
         }
         temp
       }
 
-      for (i <- 0 until numClasses) {
-        val multiplier = math.exp(margins(i)) / sum - {
-          if (label == i) 1.0 else 0.0
-        }
-        features.foreachActive { (index, value) =>
-          if (localFeaturesStd(index) != 0.0 && value != 0.0) {
-            localGradientArray(i * numFeaturesPlusIntercept + index) +=
-              weight * multiplier * value / localFeaturesStd(index)
+      margins.indices.foreach { i =>
+        multipliers(i) = multipliers(i) / sum - (if (label == i) 1.0 else 0.0)
+      }
+      features.foreachActive { (index, value) =>
+        if (localFeaturesStd(index) != 0.0 && value != 0.0) {
+          val stdValue = value / localFeaturesStd(index)
+          var j = 0
+          while (j < numClasses) {
+            localGradientArray(index * numClasses + j) +=
+              weight * multipliers(j) * stdValue
+            j += 1
           }
         }
-        if (fitIntercept) {
-          localGradientArray(i * numFeaturesPlusIntercept + numFeatures) += weight * multiplier
+      }
+      if (fitIntercept) {
+        var i = 0
+        while (i < numClasses) {
+          localGradientArray(numFeatures * numClasses + i) += weight * multipliers(i)
+          i += 1
         }
       }
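
For context, the `multipliers` computed here are the usual softmax-gradient terms: P(class i | x) minus the one-hot label indicator. A hedged standalone sketch of the same arithmetic (illustrative values, not the aggregator itself):

```scala
object MultiplierSketch {
  def main(args: Array[String]): Unit = {
    val margins = Array(1.0, 3.0, 2.0)
    val label = 1
    val maxMargin = margins.max

    // Shift by maxMargin (only when it is positive) before exponentiating,
    // to avoid overflow; the resulting probabilities are unchanged.
    val exps = margins.map(m => math.exp(m - math.max(maxMargin, 0.0)))
    val sum = exps.sum

    // multiplier(i) = P(class i | x) - 1{i == label}: the derivative of the
    // multinomial log-loss with respect to margin i.
    val multipliers = exps.zipWithIndex.map { case (e, i) =>
      e / sum - (if (i == label) 1.0 else 0.0)
    }
    println(multipliers.mkString(", "))
  }
}
```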
@@ -1637,6 +1663,7 @@ private class LogisticCostFun(
     val bcCoeffs = instances.context.broadcast(coeffs)
     val featuresStd = bcFeaturesStd.value
     val numFeatures = featuresStd.length
+    val numCoefficientSets = if (multinomial) numClasses else 1
 
     val logisticAggregator = {
       val seqOp = (c: LogisticAggregator, instance: Instance) => c.add(instance)
@@ -1656,7 +1683,7 @@ private class LogisticCostFun(
         var sum = 0.0
         coeffs.foreachActive { case (index, value) =>
           // We do not apply regularization to the intercepts
-          val isIntercept = fitIntercept && ((index + 1) % (numFeatures + 1) == 0)
+          val isIntercept = fitIntercept && index >= numCoefficientSets * numFeatures
           if (!isIntercept) {
             // The following code will compute the loss of the regularization; also
             // the gradient of the regularization, and add back to totalGradientArray.
@@ -1665,11 +1692,7 @@ private class LogisticCostFun(
               totalGradientArray(index) += regParamL2 * value
               value * value
             } else {
-              val featureIndex = if (fitIntercept) {
-                index % (numFeatures + 1)
-              } else {
-                index % numFeatures
-              }
+              val featureIndex = index / numCoefficientSets
               if (featuresStd(featureIndex) != 0.0) {
                 // If `standardization` is false, we still standardize the data
                 // to improve the rate of convergence; as a result, we have to
...