Commit 8d33e1e5 authored by Sean Owen

[SPARK-11560][MLLIB] Optimize KMeans implementation / remove 'runs'

## What changes were proposed in this pull request?

This is a revival of https://github.com/apache/spark/pull/14948 and related to https://github.com/apache/spark/pull/14937. This removes the 'runs' parameter, which has already been disabled, from the K-means implementation and further deprecates API methods that involve it.
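In sketch form, the convergence test that replaces the old per-run bookkeeping declares the single run converged once no center moves by more than epsilon. The snippet below is an illustration only, in plain Scala; `squaredDistance` and `isConverged` are hypothetical helpers standing in for MLlib's internal `fastSquaredDistance` logic and are not part of this patch:

```scala
// Hypothetical, self-contained sketch of the single-run convergence rule.
def squaredDistance(a: Array[Double], b: Array[Double]): Double =
  a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

def isConverged(
    oldCenters: Array[Array[Double]],
    newCenters: Array[Array[Double]],
    epsilon: Double): Boolean =
  oldCenters.zip(newCenters).forall { case (o, n) =>
    squaredDistance(o, n) <= epsilon * epsilon
  }
```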

This also resolves the issue that K-means should not return duplicate centers: as a result, it may return fewer than k centroids if not enough data is available.
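A minimal sketch of calling the resulting API, assuming an existing `SparkContext` named `sc` (the new `train` overloads added by this patch take no `runs` argument):

```scala
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

val data = sc.parallelize(Seq(
  Vectors.dense(0.0, 0.0), Vectors.dense(0.1, 0.1),
  Vectors.dense(9.0, 9.0), Vectors.dense(9.1, 9.1)))

// Overload added by this patch: no 'runs' argument.
val model = KMeans.train(data, k = 2, maxIterations = 20,
  initializationMode = KMeans.K_MEANS_PARALLEL, seed = 42L)

// Since duplicate centers are no longer returned, the model may hold
// fewer than k centers when the data cannot support k distinct clusters.
assert(model.clusterCenters.length <= 2)
```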

## How was this patch tested?

Existing tests

Author: Sean Owen <sowen@cloudera.com>

Closes #15342 from srowen/SPARK-11560.
parent c264ef9b
@@ -43,18 +43,17 @@ import org.apache.spark.util.random.XORShiftRandom
 class KMeans private (
     private var k: Int,
     private var maxIterations: Int,
-    private var runs: Int,
     private var initializationMode: String,
     private var initializationSteps: Int,
     private var epsilon: Double,
     private var seed: Long) extends Serializable with Logging {
 
   /**
-   * Constructs a KMeans instance with default parameters: {k: 2, maxIterations: 20, runs: 1,
+   * Constructs a KMeans instance with default parameters: {k: 2, maxIterations: 20,
    * initializationMode: "k-means||", initializationSteps: 2, epsilon: 1e-4, seed: random}.
    */
   @Since("0.8.0")
-  def this() = this(2, 20, 1, KMeans.K_MEANS_PARALLEL, 2, 1e-4, Utils.random.nextLong())
+  def this() = this(2, 20, KMeans.K_MEANS_PARALLEL, 2, 1e-4, Utils.random.nextLong())
 
   /**
    * Number of clusters to create (k).
@@ -112,15 +111,17 @@ class KMeans private (
    * This function has no effect since Spark 2.0.0.
    */
   @Since("1.4.0")
+  @deprecated("This has no effect and always returns 1", "2.1.0")
   def getRuns: Int = {
     logWarning("Getting number of runs has no effect since Spark 2.0.0.")
-    runs
+    1
   }
 
   /**
    * This function has no effect since Spark 2.0.0.
    */
   @Since("0.8.0")
+  @deprecated("This has no effect", "2.1.0")
   def setRuns(runs: Int): this.type = {
     logWarning("Setting number of runs has no effect since Spark 2.0.0.")
     this
@@ -239,17 +240,9 @@ class KMeans private (
     val initStartTime = System.nanoTime()
 
-    // Only one run is allowed when initialModel is given
-    val numRuns = if (initialModel.nonEmpty) {
-      if (runs > 1) logWarning("Ignoring runs; one run is allowed when initialModel is given.")
-      1
-    } else {
-      runs
-    }
-
     val centers = initialModel match {
       case Some(kMeansCenters) =>
-        Array(kMeansCenters.clusterCenters.map(s => new VectorWithNorm(s)))
+        kMeansCenters.clusterCenters.map(new VectorWithNorm(_))
       case None =>
         if (initializationMode == KMeans.RANDOM) {
           initRandom(data)
@@ -258,89 +251,62 @@ class KMeans private (
         }
     }
 
     val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9
-    logInfo(s"Initialization with $initializationMode took " + "%.3f".format(initTimeInSeconds) +
-      " seconds.")
+    logInfo(f"Initialization with $initializationMode took $initTimeInSeconds%.3f seconds.")
 
-    val active = Array.fill(numRuns)(true)
-    val costs = Array.fill(numRuns)(0.0)
-
-    var activeRuns = new ArrayBuffer[Int] ++ (0 until numRuns)
+    var converged = false
+    var cost = 0.0
     var iteration = 0
 
     val iterationStartTime = System.nanoTime()
 
-    instr.foreach(_.logNumFeatures(centers(0)(0).vector.size))
+    instr.foreach(_.logNumFeatures(centers.head.vector.size))
 
-    // Execute iterations of Lloyd's algorithm until all runs have converged
-    while (iteration < maxIterations && !activeRuns.isEmpty) {
-      type WeightedPoint = (Vector, Long)
-      def mergeContribs(x: WeightedPoint, y: WeightedPoint): WeightedPoint = {
-        axpy(1.0, x._1, y._1)
-        (y._1, x._2 + y._2)
-      }
-
-      val activeCenters = activeRuns.map(r => centers(r)).toArray
-      val costAccums = activeRuns.map(_ => sc.doubleAccumulator)
-
-      val bcActiveCenters = sc.broadcast(activeCenters)
+    // Execute iterations of Lloyd's algorithm until converged
+    while (iteration < maxIterations && !converged) {
+      val costAccum = sc.doubleAccumulator
+      val bcCenters = sc.broadcast(centers)
 
       // Find the sum and count of points mapping to each center
       val totalContribs = data.mapPartitions { points =>
-        val thisActiveCenters = bcActiveCenters.value
-        val runs = thisActiveCenters.length
-        val k = thisActiveCenters(0).length
-        val dims = thisActiveCenters(0)(0).vector.size
+        val thisCenters = bcCenters.value
+        val dims = thisCenters.head.vector.size
 
-        val sums = Array.fill(runs, k)(Vectors.zeros(dims))
-        val counts = Array.fill(runs, k)(0L)
+        val sums = Array.fill(thisCenters.length)(Vectors.zeros(dims))
+        val counts = Array.fill(thisCenters.length)(0L)
 
         points.foreach { point =>
-          (0 until runs).foreach { i =>
-            val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point)
-            costAccums(i).add(cost)
-            val sum = sums(i)(bestCenter)
-            axpy(1.0, point.vector, sum)
-            counts(i)(bestCenter) += 1
-          }
+          val (bestCenter, cost) = KMeans.findClosest(thisCenters, point)
+          costAccum.add(cost)
+          val sum = sums(bestCenter)
+          axpy(1.0, point.vector, sum)
+          counts(bestCenter) += 1
         }
 
-        val contribs = for (i <- 0 until runs; j <- 0 until k) yield {
-          ((i, j), (sums(i)(j), counts(i)(j)))
-        }
-        contribs.iterator
-      }.reduceByKey(mergeContribs).collectAsMap()
+        counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), counts(j)))).iterator
+      }.reduceByKey { case ((sum1, count1), (sum2, count2)) =>
+        axpy(1.0, sum2, sum1)
+        (sum1, count1 + count2)
+      }.collectAsMap()
 
-      bcActiveCenters.destroy(blocking = false)
+      bcCenters.destroy(blocking = false)
 
-      // Update the cluster centers and costs for each active run
-      for ((run, i) <- activeRuns.zipWithIndex) {
-        var changed = false
-        var j = 0
-        while (j < k) {
-          val (sum, count) = totalContribs((i, j))
-          if (count != 0) {
-            scal(1.0 / count, sum)
-            val newCenter = new VectorWithNorm(sum)
-            if (KMeans.fastSquaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) {
-              changed = true
-            }
-            centers(run)(j) = newCenter
-          }
-          j += 1
-        }
-        if (!changed) {
-          active(run) = false
-          logInfo("Run " + run + " finished in " + (iteration + 1) + " iterations")
-        }
-        costs(run) = costAccums(i).value
-      }
+      // Update the cluster centers and costs
+      converged = true
+      totalContribs.foreach { case (j, (sum, count)) =>
+        scal(1.0 / count, sum)
+        val newCenter = new VectorWithNorm(sum)
+        if (converged && KMeans.fastSquaredDistance(newCenter, centers(j)) > epsilon * epsilon) {
+          converged = false
+        }
+        centers(j) = newCenter
+      }
 
-      activeRuns = activeRuns.filter(active(_))
+      cost = costAccum.value
       iteration += 1
     }
 
     val iterationTimeInSeconds = (System.nanoTime() - iterationStartTime) / 1e9
-    logInfo(s"Iterations took " + "%.3f".format(iterationTimeInSeconds) + " seconds.")
+    logInfo(f"Iterations took $iterationTimeInSeconds%.3f seconds.")
 
     if (iteration == maxIterations) {
       logInfo(s"KMeans reached the max number of iterations: $maxIterations.")
@@ -348,59 +314,43 @@ class KMeans private (
       logInfo(s"KMeans converged in $iteration iterations.")
     }
 
-    val (minCost, bestRun) = costs.zipWithIndex.min
-
-    logInfo(s"The cost for the best run is $minCost.")
-
-    new KMeansModel(centers(bestRun).map(_.vector))
+    logInfo(s"The cost is $cost.")
+
+    new KMeansModel(centers.map(_.vector))
   }
 
   /**
-   * Initialize `runs` sets of cluster centers at random.
+   * Initialize a set of cluster centers at random.
    */
-  private def initRandom(data: RDD[VectorWithNorm])
-  : Array[Array[VectorWithNorm]] = {
-    // Sample all the cluster centers in one pass to avoid repeated scans
-    val sample = data.takeSample(true, runs * k, new XORShiftRandom(this.seed).nextInt()).toSeq
-    Array.tabulate(runs)(r => sample.slice(r * k, (r + 1) * k).map { v =>
-      new VectorWithNorm(Vectors.dense(v.vector.toArray), v.norm)
-    }.toArray)
+  private def initRandom(data: RDD[VectorWithNorm]): Array[VectorWithNorm] = {
+    data.takeSample(true, k, new XORShiftRandom(this.seed).nextInt()).map(_.toDense)
   }
 
   /**
-   * Initialize `runs` sets of cluster centers using the k-means|| algorithm by Bahmani et al.
+   * Initialize a set of cluster centers using the k-means|| algorithm by Bahmani et al.
    * (Bahmani et al., Scalable K-Means++, VLDB 2012). This is a variant of k-means++ that tries
-   * to find with dissimilar cluster centers by starting with a random center and then doing
+   * to find dissimilar cluster centers by starting with a random center and then doing
    * passes where more centers are chosen with probability proportional to their squared distance
    * to the current cluster set. It results in a provable approximation to an optimal clustering.
    *
    * The original paper can be found at http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf.
    */
-  private def initKMeansParallel(data: RDD[VectorWithNorm])
-  : Array[Array[VectorWithNorm]] = {
+  private def initKMeansParallel(data: RDD[VectorWithNorm]): Array[VectorWithNorm] = {
     // Initialize empty centers and point costs.
-    val centers = Array.tabulate(runs)(r => ArrayBuffer.empty[VectorWithNorm])
-    var costs = data.map(_ => Array.fill(runs)(Double.PositiveInfinity))
+    var costs = data.map(_ => Double.PositiveInfinity)
 
-    // Initialize each run's first center to a random point.
+    // Initialize the first center to a random point.
     val seed = new XORShiftRandom(this.seed).nextInt()
-    val sample = data.takeSample(true, runs, seed).toSeq
+    val sample = data.takeSample(false, 1, seed)
     // Could be empty if data is empty; fail with a better message early:
-    require(sample.size >= runs, s"Required $runs samples but got ${sample.size} from $data")
-    val newCenters = Array.tabulate(runs)(r => ArrayBuffer(sample(r).toDense))
+    require(sample.nonEmpty, s"No samples available from $data")
 
-    /** Merges new centers to centers. */
-    def mergeNewCenters(): Unit = {
-      var r = 0
-      while (r < runs) {
-        centers(r) ++= newCenters(r)
-        newCenters(r).clear()
-        r += 1
-      }
-    }
+    val centers = ArrayBuffer[VectorWithNorm]()
+    var newCenters = Seq(sample.head.toDense)
+    centers ++= newCenters
 
-    // On each step, sample 2 * k points on average for each run with probability proportional
-    // to their squared distance from that run's centers. Note that only distances between points
+    // On each step, sample 2 * k points on average with probability proportional
+    // to their squared distance from the centers. Note that only distances between points
     // and new centers are computed in each iteration.
     var step = 0
     var bcNewCentersList = ArrayBuffer[Broadcast[_]]()
@@ -409,74 +359,39 @@ class KMeans private (
       bcNewCentersList += bcNewCenters
       val preCosts = costs
       costs = data.zip(preCosts).map { case (point, cost) =>
-        Array.tabulate(runs) { r =>
-          math.min(KMeans.pointCost(bcNewCenters.value(r), point), cost(r))
-        }
-      }.persist(StorageLevel.MEMORY_AND_DISK)
-      val sumCosts = costs
-        .aggregate(new Array[Double](runs))(
-          seqOp = (s, v) => {
-            // s += v
-            var r = 0
-            while (r < runs) {
-              s(r) += v(r)
-              r += 1
-            }
-            s
-          },
-          combOp = (s0, s1) => {
-            // s0 += s1
-            var r = 0
-            while (r < runs) {
-              s0(r) += s1(r)
-              r += 1
-            }
-            s0
-          }
-        )
+        math.min(KMeans.pointCost(bcNewCenters.value, point), cost)
+      }.persist(StorageLevel.MEMORY_AND_DISK)
+      val sumCosts = costs.sum()
 
       bcNewCenters.unpersist(blocking = false)
       preCosts.unpersist(blocking = false)
 
-      val chosen = data.zip(costs).mapPartitionsWithIndex { (index, pointsWithCosts) =>
+      val chosen = data.zip(costs).mapPartitionsWithIndex { (index, pointCosts) =>
         val rand = new XORShiftRandom(seed ^ (step << 16) ^ index)
-        pointsWithCosts.flatMap { case (p, c) =>
-          val rs = (0 until runs).filter { r =>
-            rand.nextDouble() < 2.0 * c(r) * k / sumCosts(r)
-          }
-          if (rs.nonEmpty) Some((p, rs)) else None
-        }
+        pointCosts.filter { case (_, c) => rand.nextDouble() < 2.0 * c * k / sumCosts }.map(_._1)
       }.collect()
-      mergeNewCenters()
-      chosen.foreach { case (p, rs) =>
-        rs.foreach(newCenters(_) += p.toDense)
-      }
+      newCenters = chosen.map(_.toDense)
+      centers ++= newCenters
       step += 1
     }
 
-    mergeNewCenters()
     costs.unpersist(blocking = false)
    bcNewCentersList.foreach(_.destroy(false))
 
-    // Finally, we might have a set of more than k candidate centers for each run; weigh each
-    // candidate by the number of points in the dataset mapping to it and run a local k-means++
-    // on the weighted centers to pick just k of them
-    val bcCenters = data.context.broadcast(centers)
-    val weightMap = data.flatMap { p =>
-      Iterator.tabulate(runs) { r =>
-        ((r, KMeans.findClosest(bcCenters.value(r), p)._1), 1.0)
-      }
-    }.reduceByKey(_ + _).collectAsMap()
-
-    bcCenters.destroy(blocking = false)
-
-    val finalCenters = (0 until runs).par.map { r =>
-      val myCenters = centers(r).toArray
-      val myWeights = (0 until myCenters.length).map(i => weightMap.getOrElse((r, i), 0.0)).toArray
-      LocalKMeans.kMeansPlusPlus(r, myCenters, myWeights, k, 30)
-    }
-    finalCenters.toArray
+    if (centers.size == k) {
+      centers.toArray
+    } else {
+      // Finally, we might have a set of more or less than k candidate centers; weight each
+      // candidate by the number of points in the dataset mapping to it and run a local k-means++
+      // on the weighted centers to pick k of them
+      val bcCenters = data.context.broadcast(centers)
+      val countMap = data.map(KMeans.findClosest(bcCenters.value, _)._1).countByValue()
+
+      bcCenters.destroy(blocking = false)
+
+      val myWeights = centers.indices.map(countMap.getOrElse(_, 0L).toDouble).toArray
+      LocalKMeans.kMeansPlusPlus(0, centers.toArray, myWeights, k, 30)
    }
   }
 }
@@ -493,6 +408,52 @@ object KMeans {
   @Since("0.8.0")
   val K_MEANS_PARALLEL = "k-means||"
 
+  /**
+   * Trains a k-means model using the given set of parameters.
+   *
+   * @param data Training points as an `RDD` of `Vector` types.
+   * @param k Number of clusters to create.
+   * @param maxIterations Maximum number of iterations allowed.
+   * @param initializationMode The initialization algorithm. This can either be "random" or
+   *                           "k-means||". (default: "k-means||")
+   * @param seed Random seed for cluster initialization. Default is to generate seed based
+   *             on system time.
+   */
+  @Since("2.1.0")
+  def train(
+      data: RDD[Vector],
+      k: Int,
+      maxIterations: Int,
+      initializationMode: String,
+      seed: Long): KMeansModel = {
+    new KMeans().setK(k)
+      .setMaxIterations(maxIterations)
+      .setInitializationMode(initializationMode)
+      .setSeed(seed)
+      .run(data)
+  }
+
+  /**
+   * Trains a k-means model using the given set of parameters.
+   *
+   * @param data Training points as an `RDD` of `Vector` types.
+   * @param k Number of clusters to create.
+   * @param maxIterations Maximum number of iterations allowed.
+   * @param initializationMode The initialization algorithm. This can either be "random" or
+   *                           "k-means||". (default: "k-means||")
+   */
+  @Since("2.1.0")
+  def train(
+      data: RDD[Vector],
+      k: Int,
+      maxIterations: Int,
+      initializationMode: String): KMeansModel = {
+    new KMeans().setK(k)
+      .setMaxIterations(maxIterations)
+      .setInitializationMode(initializationMode)
+      .run(data)
+  }
+
   /**
    * Trains a k-means model using the given set of parameters.
    *
@@ -506,6 +467,7 @@ object KMeans {
    *             on system time.
    */
   @Since("1.3.0")
+  @deprecated("Use train method without 'runs'", "2.1.0")
   def train(
       data: RDD[Vector],
       k: Int,
@@ -531,6 +493,7 @@ object KMeans {
    *                           "k-means||". (default: "k-means||")
    */
   @Since("0.8.0")
+  @deprecated("Use train method without 'runs'", "2.1.0")
   def train(
       data: RDD[Vector],
       k: Int,
@@ -551,19 +514,24 @@ object KMeans {
       data: RDD[Vector],
       k: Int,
       maxIterations: Int): KMeansModel = {
-    train(data, k, maxIterations, 1, K_MEANS_PARALLEL)
+    new KMeans().setK(k)
+      .setMaxIterations(maxIterations)
+      .run(data)
   }
 
   /**
    * Trains a k-means model using specified parameters and the default values for unspecified.
    */
   @Since("0.8.0")
+  @deprecated("Use train method without 'runs'", "2.1.0")
   def train(
       data: RDD[Vector],
       k: Int,
       maxIterations: Int,
       runs: Int): KMeansModel = {
-    train(data, k, maxIterations, runs, K_MEANS_PARALLEL)
+    new KMeans().setK(k)
+      .setMaxIterations(maxIterations)
+      .run(data)
   }
 
   /**
...
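For the initialization change, the k-means|| oversampling rule that `initKMeansParallel` keeps (now for a single run) can be sketched as follows; the `sampleStep` helper is illustrative only and not part of the patch:

```scala
import scala.util.Random

// One k-means|| oversampling pass: each point survives with probability
// 2 * k * cost / sumCosts, where cost is its squared distance to the
// nearest center chosen so far. Returns the indices of the chosen points.
def sampleStep(costs: Array[Double], k: Int, rand: Random): Seq[Int] = {
  val sumCosts = costs.sum
  costs.indices.filter(i => rand.nextDouble() < 2.0 * costs(i) * k / sumCosts)
}
```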