Commit 9e8a9d7c authored by Anthony Truchet, committed by Sean Owen

[SPARK-18471][MLLIB] In LBFGS, avoid sending huge vectors of 0

## What changes were proposed in this pull request?

CostFun used to send a dense vector of zeros as part of the closure in a
treeAggregate call. To avoid that, the zero value passed to treeAggregate
is now an empty sparse vector, and the gradient is converted to a dense
vector inside the seqOp and combOp functions, so the full-size zero vector
is never shipped with the task closure.
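For illustration (not part of the patch): treeAggregate serializes its zero value into every task closure, so the representation of that zero matters. A minimal sketch, assuming Spark's mllib `Vectors` on the classpath and using Java serialization as a rough proxy for the closure cost, with a hypothetical object name:

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import org.apache.spark.mllib.linalg.Vectors

object ZeroVectorFootprint {
  // Java-serialized size of an object, a rough proxy for what a task
  // closure carrying it would cost on the wire.
  private def serializedSize(obj: AnyRef): Int = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(obj)
    out.close()
    buffer.size()
  }

  def main(args: Array[String]): Unit = {
    val n = 16 * 1024 * 1024  // ~16M doubles, i.e. a 128 MB dense vector
    // Empty sparse zero: a handful of bytes regardless of n.
    println(s"sparse zero: ${serializedSize(Vectors.sparse(n, Seq()))} bytes")
    // Dense zero: O(n) bytes, roughly 128 MB here, so left commented out.
    // println(s"dense zero: ${serializedSize(Vectors.zeros(n))} bytes")
  }
}
```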

## How was this patch tested?

Unit tests for the mllib module were run locally for correctness.

As for performance, we ran a heavy optimization on our production data (50 iterations on 128 MB weight vectors) and saw a significant decrease both in runtime and in containers being killed for lack of off-heap memory.

Author: Anthony Truchet <a.truchet@criteo.com>
Author: sethah <seth.hendrickson16@gmail.com>
Author: Anthony Truchet <AnthonyTruchet@users.noreply.github.com>

Closes #16037 from AnthonyTruchet/ENG-17719-lbfgs-only.
parent e57e3938
@@ -241,16 +241,24 @@ object LBFGS extends Logging {
     val bcW = data.context.broadcast(w)
     val localGradient = gradient
 
-    val (gradientSum, lossSum) = data.treeAggregate((Vectors.zeros(n), 0.0))(
-      seqOp = (c, v) => (c, v) match { case ((grad, loss), (label, features)) =>
-        val l = localGradient.compute(
-          features, label, bcW.value, grad)
-        (grad, loss + l)
-      },
-      combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) =>
-        axpy(1.0, grad2, grad1)
-        (grad1, loss1 + loss2)
-      })
+    val seqOp = (c: (Vector, Double), v: (Double, Vector)) =>
+      (c, v) match {
+        case ((grad, loss), (label, features)) =>
+          val denseGrad = grad.toDense
+          val l = localGradient.compute(features, label, bcW.value, denseGrad)
+          (denseGrad, loss + l)
+      }
+
+    val combOp = (c1: (Vector, Double), c2: (Vector, Double)) =>
+      (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) =>
+        val denseGrad1 = grad1.toDense
+        val denseGrad2 = grad2.toDense
+        axpy(1.0, denseGrad2, denseGrad1)
+        (denseGrad1, loss1 + loss2)
+      }
+
+    val zeroSparseVector = Vectors.sparse(n, Seq())
+    val (gradientSum, lossSum) = data.treeAggregate((zeroSparseVector, 0.0))(seqOp, combOp)
 
     // broadcasted model is not needed anymore
     bcW.destroy()
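A note on the design choice above, as a hedged sketch (the object name is hypothetical, and the hand-rolled accumulation loop stands in for Spark's private BLAS.axpy helper used by the patch): because seqOp never runs on an empty partition, that partition contributes the untouched sparse zero, so combOp may receive sparse operands and must densify both sides before accumulating in place.

```scala
import org.apache.spark.mllib.linalg.{DenseVector, Vector, Vectors}

object CombOpSketch {
  // Simplified stand-in for the patch's combOp: densify both gradients,
  // then accumulate c2's gradient into c1's in place.
  def combOp(c1: (Vector, Double), c2: (Vector, Double)): (Vector, Double) = {
    val (grad1, loss1) = c1
    val (grad2, loss2) = c2
    val dense1 = grad1.toDense
    val dense2 = grad2.toDense
    var i = 0
    while (i < dense1.size) {  // dense1 += dense2
      dense1.values(i) += dense2.values(i)
      i += 1
    }
    (dense1, loss1 + loss2)
  }

  def main(args: Array[String]): Unit = {
    // An empty partition contributes the sparse zero value unchanged:
    val zero: (Vector, Double) = (Vectors.sparse(3, Seq()), 0.0)
    val partial: (Vector, Double) = (new DenseVector(Array(1.0, 0.0, 2.0)), 0.5)
    println(combOp(zero, partial))  // ([1.0,0.0,2.0],0.5)
  }
}
```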
@@ -230,6 +230,25 @@ class LBFGSSuite extends SparkFunSuite with MLlibTestSparkContext with Matchers
       (weightLBFGS(0) ~= weightGD(0) relTol 0.02) && (weightLBFGS(1) ~= weightGD(1) relTol 0.02),
       "The weight differences between LBFGS and GD should be within 2%.")
   }
+
+  test("SPARK-18471: LBFGS aggregator on empty partitions") {
+    val regParam = 0
+
+    val initialWeightsWithIntercept = Vectors.dense(0.0)
+    val convergenceTol = 1e-12
+    val numIterations = 1
+    val dataWithEmptyPartitions = sc.parallelize(Seq((1.0, Vectors.dense(2.0))), 2)
+
+    LBFGS.runLBFGS(
+      dataWithEmptyPartitions,
+      gradient,
+      simpleUpdater,
+      numCorrections,
+      convergenceTol,
+      numIterations,
+      regParam,
+      initialWeightsWithIntercept)
+  }
 }
 
 class LBFGSClusterSuite extends SparkFunSuite with LocalClusterSparkContext {
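As a quick illustration of what the new test exercises (a hypothetical check, not part of the patch): parallelizing a single element across two partitions leaves one partition empty, which is exactly the case where treeAggregate hands combOp the untouched sparse zero.

```scala
// Runnable in spark-shell, where a SparkContext `sc` is predefined.
import org.apache.spark.mllib.linalg.Vectors

val rdd = sc.parallelize(Seq((1.0, Vectors.dense(2.0))), 2)
// glom() exposes each partition as an array; expect partition sizes "0,1".
println(rdd.glom().map(_.length).collect().mkString(","))
```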