Skip to content
Snippets Groups Projects
Commit 98b5ccd3 authored by Zheng RuiFeng's avatar Zheng RuiFeng Committed by Sean Owen
Browse files

[SPARK-20930][ML] Destroy broadcasted centers after computing cost in KMeans

## What changes were proposed in this pull request?
 Destroy broadcasted centers after computing cost
## How was this patch tested?
existing tests

Author: Zheng RuiFeng <ruifengz@foxmail.com>

Closes #18152 from zhengruifeng/destroy_kmeans_model.
parent 2d39711b
No related branches found
No related tags found
No related merge requests found
@@ -85,7 +85,10 @@ class KMeansModel @Since("1.1.0") (@Since("1.0.0") val clusterCenters: Array[Vec
   @Since("0.8.0")
   def computeCost(data: RDD[Vector]): Double = {
     val bcCentersWithNorm = data.context.broadcast(clusterCentersWithNorm)
-    data.map(p => KMeans.pointCost(bcCentersWithNorm.value, new VectorWithNorm(p))).sum()
+    val cost = data
+      .map(p => KMeans.pointCost(bcCentersWithNorm.value, new VectorWithNorm(p))).sum()
+    bcCentersWithNorm.destroy(blocking = false)
+    cost
   }
...
@@ -320,6 +320,7 @@ class LocalLDAModel private[spark] (
       docBound
     }.sum()
+    ElogbetaBc.destroy(blocking = false)

     // Bound component for prob(topic-term distributions):
     // E[log p(beta | eta) - log q(beta | lambda)]
@@ -372,7 +373,6 @@ class LocalLDAModel private[spark] (
    */
   private[spark] def getTopicDistributionMethod(sc: SparkContext): Vector => Vector = {
     val expElogbeta = exp(LDAUtils.dirichletExpectation(topicsMatrix.asBreeze.toDenseMatrix.t).t)
-    val expElogbetaBc = sc.broadcast(expElogbeta)
     val docConcentrationBrz = this.docConcentration.asBreeze
     val gammaShape = this.gammaShape
     val k = this.k
@@ -383,7 +383,7 @@ class LocalLDAModel private[spark] (
       } else {
         val (gamma, _, _) = OnlineLDAOptimizer.variationalTopicInference(
           termCounts,
-          expElogbetaBc.value,
+          expElogbeta,
           docConcentrationBrz,
           gammaShape,
           k)
...
@@ -246,6 +246,7 @@ object GradientDescent extends Logging {
         // c: (grad, loss, count)
         (c1._1 += c2._1, c1._2 + c2._2, c1._3 + c2._3)
       })
+      bcWeights.destroy(blocking = false)

       if (miniBatchSize > 0) {
         /**
...
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment