diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
index c4651054fd7653231c1648424d760571f0bc6340..18b9b3043db8a1286143346edaa6091b9166ac4a 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
@@ -438,18 +438,14 @@ class LogisticRegression @Since("1.2.0") (
       val standardizationParam = $(standardization)
       def regParamL1Fun = (index: Int) => {
         // Remove the L1 penalization on the intercept
-        val isIntercept = $(fitIntercept) && ((index + 1) % numFeaturesPlusIntercept == 0)
+        val isIntercept = $(fitIntercept) && index >= numFeatures * numCoefficientSets
         if (isIntercept) {
           0.0
         } else {
           if (standardizationParam) {
             regParamL1
           } else {
-            val featureIndex = if ($(fitIntercept)) {
-              index % numFeaturesPlusIntercept
-            } else {
-              index % numFeatures
-            }
+            val featureIndex = index / numCoefficientSets
             // If `standardization` is false, we still standardize the data
             // to improve the rate of convergence; as a result, we have to
             // perform this reverse standardization by penalizing each component
@@ -466,6 +462,15 @@ class LogisticRegression @Since("1.2.0") (
       new BreezeOWLQN[Int, BDV[Double]]($(maxIter), 10, regParamL1Fun, $(tol))
     }
 
+    /*
+      The coefficients are laid out in column major order during training. e.g. for
+      `numClasses = 3` and `numFeatures = 2` and `fitIntercept = true` the layout is:
+
+      Array(beta_11, beta_21, beta_31, beta_12, beta_22, beta_32, intercept_1, intercept_2,
+        intercept_3)
+
+      where beta_jk corresponds to the coefficient for class `j` and feature `k`.
+     */
     val initialCoefficientsWithIntercept =
       Vectors.zeros(numCoefficientSets * numFeaturesPlusIntercept)
 
@@ -489,13 +494,14 @@ class LogisticRegression @Since("1.2.0") (
         val initialCoefWithInterceptArray = initialCoefficientsWithIntercept.toArray
         val providedCoef = optInitialModel.get.coefficientMatrix
         providedCoef.foreachActive { (row, col, value) =>
-          val flatIndex = row * numFeaturesPlusIntercept + col
+          // convert matrix to column major for training
+          val flatIndex = col * numCoefficientSets + row
           // We need to scale the coefficients since they will be trained in the scaled space
           initialCoefWithInterceptArray(flatIndex) = value * featuresStd(col)
         }
         if ($(fitIntercept)) {
           optInitialModel.get.interceptVector.foreachActive { (index, value) =>
-            val coefIndex = (index + 1) * numFeaturesPlusIntercept - 1
+            val coefIndex = numCoefficientSets * numFeatures + index
             initialCoefWithInterceptArray(coefIndex) = value
           }
         }
@@ -526,7 +532,7 @@ class LogisticRegression @Since("1.2.0") (
         val rawIntercepts = histogram.map(c => math.log(c + 1)) // add 1 for smoothing
         val rawMean = rawIntercepts.sum / rawIntercepts.length
         rawIntercepts.indices.foreach { i =>
-          initialCoefficientsWithIntercept.toArray(i * numFeaturesPlusIntercept + numFeatures) =
+          initialCoefficientsWithIntercept.toArray(numClasses * numFeatures + i) =
             rawIntercepts(i) - rawMean
         }
       } else if ($(fitIntercept)) {
@@ -572,16 +578,20 @@ class LogisticRegression @Since("1.2.0") (
       /*
          The coefficients are trained in the scaled space; we're converting them back to
          the original space.
+
+         Additionally, since the coefficients were laid out in column major order during training
+         to avoid extra computation, we convert them back to row major before passing them to the
+         model.
+
          Note that the intercept in scaled space and original space is the same;
          as a result, no scaling is needed.
        */
       val rawCoefficients = state.x.toArray.clone()
       val coefficientArray = Array.tabulate(numCoefficientSets * numFeatures) { i =>
-        // flatIndex will loop though rawCoefficients, and skip the intercept terms.
-        val flatIndex = if ($(fitIntercept)) i + i / numFeatures else i
+        val colMajorIndex = (i % numFeatures) * numCoefficientSets + i / numFeatures
         val featureIndex = i % numFeatures
         if (featuresStd(featureIndex) != 0.0) {
-          rawCoefficients(flatIndex) / featuresStd(featureIndex)
+          rawCoefficients(colMajorIndex) / featuresStd(featureIndex)
         } else {
           0.0
         }
@@ -618,7 +628,7 @@ class LogisticRegression @Since("1.2.0") (
 
       val interceptsArray: Array[Double] = if ($(fitIntercept)) {
         Array.tabulate(numCoefficientSets) { i =>
-          val coefIndex = (i + 1) * numFeaturesPlusIntercept - 1
+          val coefIndex = numFeatures * numCoefficientSets + i
           rawCoefficients(coefIndex)
         }
       } else {
@@ -697,6 +707,7 @@ class LogisticRegressionModel private[spark] (
   /**
    * A vector of model coefficients for "binomial" logistic regression. If this model was trained
    * using the "multinomial" family then an exception is thrown.
+   *
    * @return Vector
    */
   @Since("2.0.0")
@@ -720,6 +731,7 @@ class LogisticRegressionModel private[spark] (
   /**
    * The model intercept for "binomial" logistic regression. If this model was fit with the
    * "multinomial" family then an exception is thrown.
+   *
    * @return Double
    */
   @Since("1.3.0")
@@ -1389,6 +1401,12 @@ class BinaryLogisticRegressionSummary private[classification] (
  * $$
  * </blockquote></p>
  *
+ * @note In order to avoid unnecessary computation during calculation of the gradient updates
+ * we lay out the coefficients in column major order during training. This allows us to
+ * perform feature standardization once, while still retaining sequential memory access
+ * for speed. We convert back to row major order when we create the model,
+ * since this form is optimal for the matrix operations used for prediction.
+ *
  * @param bcCoefficients The broadcast coefficients corresponding to the features.
  * @param bcFeaturesStd The broadcast standard deviation values of the features.
  * @param numClasses the number of possible outcomes for k classes classification problem in
@@ -1486,23 +1504,25 @@ private class LogisticAggregator(
     var marginOfLabel = 0.0
     var maxMargin = Double.NegativeInfinity
 
-    val margins = Array.tabulate(numClasses) { i =>
-      var margin = 0.0
-      features.foreachActive { (index, value) =>
-        if (localFeaturesStd(index) != 0.0 && value != 0.0) {
-          margin += localCoefficients(i * numFeaturesPlusIntercept + index) *
-            value / localFeaturesStd(index)
-        }
+    val margins = new Array[Double](numClasses)
+    features.foreachActive { (index, value) =>
+      val stdValue = value / localFeaturesStd(index)
+      var j = 0
+      while (j < numClasses) {
+        margins(j) += localCoefficients(index * numClasses + j) * stdValue
+        j += 1
       }
-
+    }
+    var i = 0
+    while (i < numClasses) {
       if (fitIntercept) {
-        margin += localCoefficients(i * numFeaturesPlusIntercept + numFeatures)
+        margins(i) += localCoefficients(numClasses * numFeatures + i)
       }
-      if (i == label.toInt) marginOfLabel = margin
-      if (margin > maxMargin) {
-        maxMargin = margin
+      if (i == label.toInt) marginOfLabel = margins(i)
+      if (margins(i) > maxMargin) {
+        maxMargin = margins(i)
       }
-      margin
+      i += 1
     }
 
     /**
@@ -1510,33 +1530,39 @@ private class LogisticAggregator(
      * We address this by subtracting maxMargin from all the margins, so it's guaranteed
      * that all of the new margins will be smaller than zero to prevent arithmetic overflow.
      */
+    val multipliers = new Array[Double](numClasses)
     val sum = {
       var temp = 0.0
-      if (maxMargin > 0) {
-        for (i <- 0 until numClasses) {
-          margins(i) -= maxMargin
-          temp += math.exp(margins(i))
-        }
-      } else {
-        for (i <- 0 until numClasses) {
-          temp += math.exp(margins(i))
-        }
+      var i = 0
+      while (i < numClasses) {
+        if (maxMargin > 0) margins(i) -= maxMargin
+        val exp = math.exp(margins(i))
+        temp += exp
+        multipliers(i) = exp
+        i += 1
       }
       temp
     }
 
-    for (i <- 0 until numClasses) {
-      val multiplier = math.exp(margins(i)) / sum - {
-        if (label == i) 1.0 else 0.0
-      }
-      features.foreachActive { (index, value) =>
-        if (localFeaturesStd(index) != 0.0 && value != 0.0) {
-          localGradientArray(i * numFeaturesPlusIntercept + index) +=
-            weight * multiplier * value / localFeaturesStd(index)
+    margins.indices.foreach { i =>
+      multipliers(i) = multipliers(i) / sum - (if (label == i) 1.0 else 0.0)
+    }
+    features.foreachActive { (index, value) =>
+      if (localFeaturesStd(index) != 0.0 && value != 0.0) {
+        val stdValue = value / localFeaturesStd(index)
+        var j = 0
+        while (j < numClasses) {
+          localGradientArray(index * numClasses + j) +=
+            weight * multipliers(j) * stdValue
+          j += 1
         }
       }
-      if (fitIntercept) {
-        localGradientArray(i * numFeaturesPlusIntercept + numFeatures) += weight * multiplier
+    }
+    if (fitIntercept) {
+      var i = 0
+      while (i < numClasses) {
+        localGradientArray(numFeatures * numClasses + i) += weight * multipliers(i)
+        i += 1
       }
     }
 
@@ -1637,6 +1663,7 @@ private class LogisticCostFun(
     val bcCoeffs = instances.context.broadcast(coeffs)
     val featuresStd = bcFeaturesStd.value
    val numFeatures = featuresStd.length
+    val numCoefficientSets = if (multinomial) numClasses else 1
 
     val logisticAggregator = {
       val seqOp = (c: LogisticAggregator, instance: Instance) => c.add(instance)
@@ -1656,7 +1683,7 @@ private class LogisticCostFun(
       var sum = 0.0
       coeffs.foreachActive { case (index, value) =>
         // We do not apply regularization to the intercepts
-        val isIntercept = fitIntercept && ((index + 1) % (numFeatures + 1) == 0)
+        val isIntercept = fitIntercept && index >= numCoefficientSets * numFeatures
         if (!isIntercept) {
           // The following code will compute the loss of the regularization; also
           // the gradient of the regularization, and add back to totalGradientArray.
@@ -1665,11 +1692,7 @@ private class LogisticCostFun(
             totalGradientArray(index) += regParamL2 * value
             value * value
           } else {
-            val featureIndex = if (fitIntercept) {
-              index % (numFeatures + 1)
-            } else {
-              index % numFeatures
-            }
+            val featureIndex = index / numCoefficientSets
             if (featuresStd(featureIndex) != 0.0) {
               // If `standardization` is false, we still standardize the data
               // to improve the rate of convergence; as a result, we have to
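
The layout change at the heart of this patch can be sanity-checked in isolation. Below is a standalone sketch, not part of the patch (the object name and example values are illustrative), that reproduces the index arithmetic used above: `flatIndex = col * numCoefficientSets + row` when flattening a row major coefficient matrix into the column major training array, an intercept block beginning at `numFeatures * numCoefficientSets`, and `colMajorIndex = (i % numFeatures) * numCoefficientSets + i / numFeatures` when converting back to row major for the model.

object ColumnMajorLayoutSketch {
  def main(args: Array[String]): Unit = {
    val numClasses = 3                    // multinomial: one coefficient set per class
    val numFeatures = 2
    val numCoefficientSets = numClasses

    // Row major (model) form: beta_jk is the coefficient for class j, feature k.
    val rowMajor = Array(11.0, 12.0, 21.0, 22.0, 31.0, 32.0)
    val intercepts = Array(1.0, 2.0, 3.0)

    // Column major (training) form, with the intercepts in a block at the end.
    val trainingArray = new Array[Double](numCoefficientSets * (numFeatures + 1))
    for (row <- 0 until numClasses; col <- 0 until numFeatures) {
      // same conversion as in the patch: flatIndex = col * numCoefficientSets + row
      trainingArray(col * numCoefficientSets + row) = rowMajor(row * numFeatures + col)
    }
    for (i <- 0 until numClasses) {
      // intercept block starts at numFeatures * numCoefficientSets
      trainingArray(numFeatures * numCoefficientSets + i) = intercepts(i)
    }
    println(trainingArray.mkString(", "))
    // 11.0, 21.0, 31.0, 12.0, 22.0, 32.0, 1.0, 2.0, 3.0
    // i.e. beta_11, beta_21, beta_31, beta_12, beta_22, beta_32, intercept_1..3,
    // matching the layout comment added in the patch.

    // Converting back to row major, as done when the model is constructed.
    val backToRowMajor = Array.tabulate(numClasses * numFeatures) { i =>
      val colMajorIndex = (i % numFeatures) * numCoefficientSets + i / numFeatures
      trainingArray(colMajorIndex)
    }
    println(backToRowMajor.mkString(", "))
    // 11.0, 12.0, 21.0, 22.0, 31.0, 32.0
  }
}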
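
The new regularization indexing in regParamL1Fun and LogisticCostFun can be checked against the same layout: in column major order the feature behind flat index `index` is `index / numCoefficientSets`, and every index at or beyond `numFeatures * numCoefficientSets` is an intercept. Another illustrative sketch (hypothetical object name, same 3-class, 2-feature example):

object RegularizationIndexSketch {
  def main(args: Array[String]): Unit = {
    val numCoefficientSets = 3
    val numFeatures = 2
    val fitIntercept = true

    (0 until numCoefficientSets * (numFeatures + 1)).foreach { index =>
      // same test as the patch: intercepts occupy the tail of the flat array
      val isIntercept = fitIntercept && index >= numFeatures * numCoefficientSets
      if (isIntercept) {
        println(s"index $index -> intercept_${index - numFeatures * numCoefficientSets + 1}")
      } else {
        // featureIndex = index / numCoefficientSets drives reverse standardization
        val featureIndex = index / numCoefficientSets
        val classIndex = index % numCoefficientSets
        println(s"index $index -> beta_${classIndex + 1}${featureIndex + 1}")
      }
    }
    // index 0 -> beta_11, index 1 -> beta_21, ..., index 5 -> beta_32,
    // index 6 -> intercept_1, index 7 -> intercept_2, index 8 -> intercept_3
  }
}

Every intercept index is excluded from the L1/L2 penalties, and reverse standardization divides by featuresStd(featureIndex) for the correct feature, which matches the changed lines above.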