Commit 1f0a4695 authored by sethah, committed by Xiangrui Meng

[SPARK-16008][ML] Remove unnecessary serialization in logistic regression

JIRA: [SPARK-16008](https://issues.apache.org/jira/browse/SPARK-16008)

## What changes were proposed in this pull request?
`LogisticAggregator` stores references to two arrays of dimension `numFeatures` that are unnecessarily serialized before the combine op. As a result, the shuffle write is ~3x larger than it needs to be (and the factor will grow for multiclass logistic regression); in MLlib, for instance, the shuffle write is 3x smaller.

This patch modifies `LogisticAggregator.add` to accept the two arrays as method parameters, which avoids serializing them with the aggregator.
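
For context, below is a minimal, self-contained sketch of the pattern (hypothetical `CapturingAgg`/`LeanAgg` names and toy arithmetic, not the actual Spark source). `treeAggregate` serializes the aggregator both as the zero value shipped with each task and as the partial results exchanged during the tree combine, so a read-only array stored as a field inflates every one of those payloads; passing it as an `add` parameter keeps it out of the aggregator's serialized state.

```scala
import org.apache.spark.sql.SparkSession

// Before: the aggregator holds a reference to the read-only coefficients,
// so every serialized copy (zero value and each shuffled partial result)
// carries numFeatures extra doubles that are never mutated.
class CapturingAgg(coefficients: Array[Double]) extends Serializable {
  val gradientSum = new Array[Double](coefficients.length)
  def add(x: Array[Double]): this.type = {
    var i = 0
    while (i < x.length) { gradientSum(i) += coefficients(i) * x(i); i += 1 }
    this
  }
  def merge(o: CapturingAgg): this.type = {
    var i = 0
    while (i < gradientSum.length) { gradientSum(i) += o.gradientSum(i); i += 1 }
    this
  }
}

// After: only the mutable gradient buffer is aggregator state; the
// coefficients travel once per task inside the seqOp closure and are
// absent from the partial aggregators shuffled during the combine.
class LeanAgg(numFeatures: Int) extends Serializable {
  val gradientSum = new Array[Double](numFeatures)
  def add(x: Array[Double], coefficients: Array[Double]): this.type = {
    var i = 0
    while (i < x.length) { gradientSum(i) += coefficients(i) * x(i); i += 1 }
    this
  }
  def merge(o: LeanAgg): this.type = {
    var i = 0
    while (i < gradientSum.length) { gradientSum(i) += o.gradientSum(i); i += 1 }
    this
  }
}

object AggregatorSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("sketch").getOrCreate()
    val numFeatures = 10000
    val coefficients = Array.fill(numFeatures)(0.5)
    val data = spark.sparkContext.parallelize(Seq.fill(100)(Array.fill(numFeatures)(1.0)))

    // Old style: coefficients ride along inside every serialized aggregator.
    val heavy = data.treeAggregate(new CapturingAgg(coefficients))(
      (agg, x) => agg.add(x), (a, b) => a.merge(b))

    // New style: the serialized aggregators hold only gradientSum.
    val lean = data.treeAggregate(new LeanAgg(numFeatures))(
      (agg, x) => agg.add(x, coefficients), (a, b) => a.merge(b))

    assert(heavy.gradientSum.sameElements(lean.gradientSum))
    spark.stop()
  }
}
```

The actual patch applies the same idea to `LogisticAggregator`: `coefficients` and `featuresStd` become parameters of `add`, and only `gradientSumArray`, `lossSum`, and `weightSum` remain as serialized state.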

## How was this patch tested?

I tested this locally and verified the serialization reduction.

![image](https://cloud.githubusercontent.com/assets/7275795/16140387/d2974bac-3404-11e6-94f9-268860c931a2.png)

Additionally, I ran tests on a 4-node cluster (4x48 cores, 4x128 GB RAM). With a data set of 2M rows and 10k features, iterations ran >2x faster.

Author: sethah <seth.hendrickson16@gmail.com>

Closes #13729 from sethah/lr_improvement.
parent 34d6c4cd
```diff
@@ -937,50 +937,47 @@ class BinaryLogisticRegressionSummary private[classification] (
  * Two LogisticAggregator can be merged together to have a summary of loss and gradient of
  * the corresponding joint dataset.
  *
- * @param coefficients The coefficients corresponding to the features.
  * @param numClasses the number of possible outcomes for k classes classification problem in
  *                   Multinomial Logistic Regression.
  * @param fitIntercept Whether to fit an intercept term.
- * @param featuresStd The standard deviation values of the features.
- * @param featuresMean The mean values of the features.
  */
 private class LogisticAggregator(
-    coefficients: Vector,
+    private val numFeatures: Int,
     numClasses: Int,
-    fitIntercept: Boolean,
-    featuresStd: Array[Double],
-    featuresMean: Array[Double]) extends Serializable {
+    fitIntercept: Boolean) extends Serializable {
 
   private var weightSum = 0.0
   private var lossSum = 0.0
 
-  private val coefficientsArray = coefficients match {
-    case dv: DenseVector => dv.values
-    case _ =>
-      throw new IllegalArgumentException(
-        s"coefficients only supports dense vector but got type ${coefficients.getClass}.")
-  }
-
-  private val dim = if (fitIntercept) coefficientsArray.length - 1 else coefficientsArray.length
-
-  private val gradientSumArray = Array.ofDim[Double](coefficientsArray.length)
+  private val gradientSumArray =
+    Array.ofDim[Double](if (fitIntercept) numFeatures + 1 else numFeatures)
 
   /**
    * Add a new training instance to this LogisticAggregator, and update the loss and gradient
    * of the objective function.
    *
    * @param instance The instance of data point to be added.
+   * @param coefficients The coefficients corresponding to the features.
+   * @param featuresStd The standard deviation values of the features.
    * @return This LogisticAggregator object.
    */
-  def add(instance: Instance): this.type = {
+  def add(
+      instance: Instance,
+      coefficients: Vector,
+      featuresStd: Array[Double]): this.type = {
     instance match { case Instance(label, weight, features) =>
-      require(dim == features.size, s"Dimensions mismatch when adding new instance." +
-        s" Expecting $dim but got ${features.size}.")
+      require(numFeatures == features.size, s"Dimensions mismatch when adding new instance." +
+        s" Expecting $numFeatures but got ${features.size}.")
       require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0")
 
       if (weight == 0.0) return this
 
-      val localCoefficientsArray = coefficientsArray
+      val coefficientsArray = coefficients match {
+        case dv: DenseVector => dv.values
+        case _ =>
+          throw new IllegalArgumentException(
+            s"coefficients only supports dense vector but got type ${coefficients.getClass}.")
+      }
       val localGradientSumArray = gradientSumArray
 
       numClasses match {
@@ -990,11 +987,11 @@ private class LogisticAggregator(
             var sum = 0.0
             features.foreachActive { (index, value) =>
               if (featuresStd(index) != 0.0 && value != 0.0) {
-                sum += localCoefficientsArray(index) * (value / featuresStd(index))
+                sum += coefficientsArray(index) * (value / featuresStd(index))
               }
             }
             sum + {
-              if (fitIntercept) localCoefficientsArray(dim) else 0.0
+              if (fitIntercept) coefficientsArray(numFeatures) else 0.0
             }
           }
@@ -1007,7 +1004,7 @@ private class LogisticAggregator(
           }
 
           if (fitIntercept) {
-            localGradientSumArray(dim) += multiplier
+            localGradientSumArray(numFeatures) += multiplier
           }
 
           if (label > 0) {
@@ -1034,8 +1031,8 @@ private class LogisticAggregator(
    * @return This LogisticAggregator object.
    */
   def merge(other: LogisticAggregator): this.type = {
-    require(dim == other.dim, s"Dimensions mismatch when merging with another " +
-      s"LeastSquaresAggregator. Expecting $dim but got ${other.dim}.")
+    require(numFeatures == other.numFeatures, s"Dimensions mismatch when merging with another " +
+      s"LeastSquaresAggregator. Expecting $numFeatures but got ${other.numFeatures}.")
 
     if (other.weightSum != 0.0) {
       weightSum += other.weightSum
@@ -1086,13 +1083,17 @@ private class LogisticCostFun(
   override def calculate(coefficients: BDV[Double]): (Double, BDV[Double]) = {
     val numFeatures = featuresStd.length
     val coeffs = Vectors.fromBreeze(coefficients)
+    val n = coeffs.size
+    val localFeaturesStd = featuresStd
 
     val logisticAggregator = {
-      val seqOp = (c: LogisticAggregator, instance: Instance) => c.add(instance)
+      val seqOp = (c: LogisticAggregator, instance: Instance) =>
+        c.add(instance, coeffs, localFeaturesStd)
       val combOp = (c1: LogisticAggregator, c2: LogisticAggregator) => c1.merge(c2)
 
       instances.treeAggregate(
-        new LogisticAggregator(coeffs, numClasses, fitIntercept, featuresStd, featuresMean)
+        new LogisticAggregator(numFeatures, numClasses, fitIntercept)
       )(seqOp, combOp)
     }
```