diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala index df2a9c0dd5094f448f0efb5e1d5388166adf9b3b..3ad08c46d204dad375a5f2c9b9c4163934101fd0 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala @@ -85,7 +85,10 @@ class KMeansModel @Since("1.1.0") (@Since("1.0.0") val clusterCenters: Array[Vec @Since("0.8.0") def computeCost(data: RDD[Vector]): Double = { val bcCentersWithNorm = data.context.broadcast(clusterCentersWithNorm) - data.map(p => KMeans.pointCost(bcCentersWithNorm.value, new VectorWithNorm(p))).sum() + val cost = data + .map(p => KMeans.pointCost(bcCentersWithNorm.value, new VectorWithNorm(p))).sum() + bcCentersWithNorm.destroy(blocking = false) + cost } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala index 663f63c25a940610b3babde585e68356a843f677..4ab420058f33ded7755ddb3c9e49a2e965a08c5d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala @@ -320,6 +320,7 @@ class LocalLDAModel private[spark] ( docBound }.sum() + ElogbetaBc.destroy(blocking = false) // Bound component for prob(topic-term distributions): // E[log p(beta | eta) - log q(beta | lambda)] @@ -372,7 +373,6 @@ class LocalLDAModel private[spark] ( */ private[spark] def getTopicDistributionMethod(sc: SparkContext): Vector => Vector = { val expElogbeta = exp(LDAUtils.dirichletExpectation(topicsMatrix.asBreeze.toDenseMatrix.t).t) - val expElogbetaBc = sc.broadcast(expElogbeta) val docConcentrationBrz = this.docConcentration.asBreeze val gammaShape = this.gammaShape val k = this.k @@ -383,7 +383,7 @@ class LocalLDAModel private[spark] ( } else { val (gamma, _, _) = OnlineLDAOptimizer.variationalTopicInference( termCounts, - expElogbetaBc.value, + expElogbeta, docConcentrationBrz, gammaShape, k) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala index 07a67a9e719db956505e89b9b1be09be92d72f18..593cdd602fafc07f14fe488da40f05a49b652693 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala @@ -246,6 +246,7 @@ object GradientDescent extends Logging { // c: (grad, loss, count) (c1._1 += c2._1, c1._2 + c2._2, c1._3 + c2._3) }) + bcWeights.destroy(blocking = false) if (miniBatchSize > 0) { /**