Skip to content
Snippets Groups Projects
Commit 4d6d8192 authored by dardelet's avatar dardelet Committed by Sean Owen
Browse files

[SPARK-21268][MLLIB] Move center calculations to a distributed map in KMeans

## What changes were proposed in this pull request?

The scal() and creation of newCenter vector is done in the driver, after a collectAsMap operation while it could be done in the distributed RDD.
This PR moves this code before the collectAsMap for more efficiency

## How was this patch tested?

This was tested manually by running the KMeansExample and verifying that the new code ran without error and gave same output as before.

Author: dardelet <guillaumegorp@gmail.com>
Author: Guillaume Dardelet <dardelet@users.noreply.github.com>

Closes #18491 from dardelet/move-center-calculation-to-distributed-map-kmean.
parent 1b50e0e0
No related branches found
No related tags found
No related merge requests found
......@@ -272,8 +272,8 @@ class KMeans private (
val costAccum = sc.doubleAccumulator
val bcCenters = sc.broadcast(centers)
// Find the sum and count of points mapping to each center
val totalContribs = data.mapPartitions { points =>
// Find the new centers
val newCenters = data.mapPartitions { points =>
val thisCenters = bcCenters.value
val dims = thisCenters.head.vector.size
......@@ -292,15 +292,16 @@ class KMeans private (
}.reduceByKey { case ((sum1, count1), (sum2, count2)) =>
axpy(1.0, sum2, sum1)
(sum1, count1 + count2)
}.mapValues { case (sum, count) =>
scal(1.0 / count, sum)
new VectorWithNorm(sum)
}.collectAsMap()
bcCenters.destroy(blocking = false)
// Update the cluster centers and costs
converged = true
totalContribs.foreach { case (j, (sum, count)) =>
scal(1.0 / count, sum)
val newCenter = new VectorWithNorm(sum)
newCenters.foreach { case (j, newCenter) =>
if (converged && KMeans.fastSquaredDistance(newCenter, centers(j)) > epsilon * epsilon) {
converged = false
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment