Skip to content
Snippets Groups Projects
Commit 68f99571 authored by Xiangrui Meng's avatar Xiangrui Meng
Browse files

[SPARK-9918] [MLLIB] remove runs from k-means and rename epsilon to tol

This requires some discussion. I'm not sure whether `runs` is a useful parameter. It certainly complicates the implementation. We might want to optimize the k-means implementation with block matrix operations. In this case, having `runs` may not be worth the trade-off. Also it increases the communication cost in a single job, which might cause other issues.

This PR also renames `epsilon` to `tol` to have consistent naming among algorithms. The Python constructor is updated to include all parameters.

jkbradley yu-iskw

Author: Xiangrui Meng <meng@databricks.com>

Closes #8148 from mengxr/SPARK-9918 and squashes the following commits:

149b9e5 [Xiangrui Meng] fix constructor in Python and rename epsilon to tol
3cc15b3 [Xiangrui Meng] fix test and change initStep to initSteps in python
a0a0274 [Xiangrui Meng] remove runs from k-means in the pipeline API
parent d0b18919
No related branches found
No related tags found
No related merge requests found
...@@ -18,8 +18,8 @@ ...@@ -18,8 +18,8 @@
package org.apache.spark.ml.clustering package org.apache.spark.ml.clustering
import org.apache.spark.annotation.Experimental import org.apache.spark.annotation.Experimental
import org.apache.spark.ml.param.{Param, Params, IntParam, DoubleParam, ParamMap} import org.apache.spark.ml.param.{Param, Params, IntParam, ParamMap}
import org.apache.spark.ml.param.shared.{HasFeaturesCol, HasMaxIter, HasPredictionCol, HasSeed} import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.util.{Identifiable, SchemaUtils} import org.apache.spark.ml.util.{Identifiable, SchemaUtils}
import org.apache.spark.ml.{Estimator, Model} import org.apache.spark.ml.{Estimator, Model}
import org.apache.spark.mllib.clustering.{KMeans => MLlibKMeans, KMeansModel => MLlibKMeansModel} import org.apache.spark.mllib.clustering.{KMeans => MLlibKMeans, KMeansModel => MLlibKMeansModel}
...@@ -27,14 +27,13 @@ import org.apache.spark.mllib.linalg.{Vector, VectorUDT} ...@@ -27,14 +27,13 @@ import org.apache.spark.mllib.linalg.{Vector, VectorUDT}
import org.apache.spark.sql.functions.{col, udf} import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types.{IntegerType, StructType} import org.apache.spark.sql.types.{IntegerType, StructType}
import org.apache.spark.sql.{DataFrame, Row} import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.util.Utils
/** /**
* Common params for KMeans and KMeansModel * Common params for KMeans and KMeansModel
*/ */
private[clustering] trait KMeansParams private[clustering] trait KMeansParams extends Params with HasMaxIter with HasFeaturesCol
extends Params with HasMaxIter with HasFeaturesCol with HasSeed with HasPredictionCol { with HasSeed with HasPredictionCol with HasTol {
/** /**
* Set the number of clusters to create (k). Must be > 1. Default: 2. * Set the number of clusters to create (k). Must be > 1. Default: 2.
...@@ -45,31 +44,6 @@ private[clustering] trait KMeansParams ...@@ -45,31 +44,6 @@ private[clustering] trait KMeansParams
/** @group getParam */ /** @group getParam */
def getK: Int = $(k) def getK: Int = $(k)
/**
* Param the number of runs of the algorithm to execute in parallel. We initialize the algorithm
* this many times with random starting conditions (configured by the initialization mode), then
* return the best clustering found over any run. Must be >= 1. Default: 1.
* @group param
*/
final val runs = new IntParam(this, "runs",
"number of runs of the algorithm to execute in parallel", (value: Int) => value >= 1)
/** @group getParam */
def getRuns: Int = $(runs)
/**
* Param the distance threshold within which we've consider centers to have converged.
* If all centers move less than this Euclidean distance, we stop iterating one run.
* Must be >= 0.0. Default: 1e-4
* @group param
*/
final val epsilon = new DoubleParam(this, "epsilon",
"distance threshold within which we've consider centers to have converge",
(value: Double) => value >= 0.0)
/** @group getParam */
def getEpsilon: Double = $(epsilon)
/** /**
* Param for the initialization algorithm. This can be either "random" to choose random points as * Param for the initialization algorithm. This can be either "random" to choose random points as
* initial cluster centers, or "k-means||" to use a parallel variant of k-means++ * initial cluster centers, or "k-means||" to use a parallel variant of k-means++
...@@ -136,9 +110,9 @@ class KMeansModel private[ml] ( ...@@ -136,9 +110,9 @@ class KMeansModel private[ml] (
/** /**
* :: Experimental :: * :: Experimental ::
* K-means clustering with support for multiple parallel runs and a k-means++ like initialization * K-means clustering with support for k-means|| initialization proposed by Bahmani et al.
* mode (the k-means|| algorithm by Bahmani et al). When multiple concurrent runs are requested, *
* they are executed together with joint passes over the data for efficiency. * @see [[http://dx.doi.org/10.14778/2180912.2180915 Bahmani et al., Scalable k-means++.]]
*/ */
@Experimental @Experimental
class KMeans(override val uid: String) extends Estimator[KMeansModel] with KMeansParams { class KMeans(override val uid: String) extends Estimator[KMeansModel] with KMeansParams {
...@@ -146,10 +120,9 @@ class KMeans(override val uid: String) extends Estimator[KMeansModel] with KMean ...@@ -146,10 +120,9 @@ class KMeans(override val uid: String) extends Estimator[KMeansModel] with KMean
setDefault( setDefault(
k -> 2, k -> 2,
maxIter -> 20, maxIter -> 20,
runs -> 1,
initMode -> MLlibKMeans.K_MEANS_PARALLEL, initMode -> MLlibKMeans.K_MEANS_PARALLEL,
initSteps -> 5, initSteps -> 5,
epsilon -> 1e-4) tol -> 1e-4)
override def copy(extra: ParamMap): KMeans = defaultCopy(extra) override def copy(extra: ParamMap): KMeans = defaultCopy(extra)
...@@ -174,10 +147,7 @@ class KMeans(override val uid: String) extends Estimator[KMeansModel] with KMean ...@@ -174,10 +147,7 @@ class KMeans(override val uid: String) extends Estimator[KMeansModel] with KMean
def setMaxIter(value: Int): this.type = set(maxIter, value) def setMaxIter(value: Int): this.type = set(maxIter, value)
/** @group setParam */ /** @group setParam */
def setRuns(value: Int): this.type = set(runs, value) def setTol(value: Double): this.type = set(tol, value)
/** @group setParam */
def setEpsilon(value: Double): this.type = set(epsilon, value)
/** @group setParam */ /** @group setParam */
def setSeed(value: Long): this.type = set(seed, value) def setSeed(value: Long): this.type = set(seed, value)
...@@ -191,8 +161,7 @@ class KMeans(override val uid: String) extends Estimator[KMeansModel] with KMean ...@@ -191,8 +161,7 @@ class KMeans(override val uid: String) extends Estimator[KMeansModel] with KMean
.setInitializationSteps($(initSteps)) .setInitializationSteps($(initSteps))
.setMaxIterations($(maxIter)) .setMaxIterations($(maxIter))
.setSeed($(seed)) .setSeed($(seed))
.setEpsilon($(epsilon)) .setEpsilon($(tol))
.setRuns($(runs))
val parentModel = algo.run(rdd) val parentModel = algo.run(rdd)
val model = new KMeansModel(uid, parentModel) val model = new KMeansModel(uid, parentModel)
copyValues(model) copyValues(model)
......
...@@ -52,10 +52,9 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext { ...@@ -52,10 +52,9 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext {
assert(kmeans.getFeaturesCol === "features") assert(kmeans.getFeaturesCol === "features")
assert(kmeans.getPredictionCol === "prediction") assert(kmeans.getPredictionCol === "prediction")
assert(kmeans.getMaxIter === 20) assert(kmeans.getMaxIter === 20)
assert(kmeans.getRuns === 1)
assert(kmeans.getInitMode === MLlibKMeans.K_MEANS_PARALLEL) assert(kmeans.getInitMode === MLlibKMeans.K_MEANS_PARALLEL)
assert(kmeans.getInitSteps === 5) assert(kmeans.getInitSteps === 5)
assert(kmeans.getEpsilon === 1e-4) assert(kmeans.getTol === 1e-4)
} }
test("set parameters") { test("set parameters") {
...@@ -64,21 +63,19 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext { ...@@ -64,21 +63,19 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext {
.setFeaturesCol("test_feature") .setFeaturesCol("test_feature")
.setPredictionCol("test_prediction") .setPredictionCol("test_prediction")
.setMaxIter(33) .setMaxIter(33)
.setRuns(7)
.setInitMode(MLlibKMeans.RANDOM) .setInitMode(MLlibKMeans.RANDOM)
.setInitSteps(3) .setInitSteps(3)
.setSeed(123) .setSeed(123)
.setEpsilon(1e-3) .setTol(1e-3)
assert(kmeans.getK === 9) assert(kmeans.getK === 9)
assert(kmeans.getFeaturesCol === "test_feature") assert(kmeans.getFeaturesCol === "test_feature")
assert(kmeans.getPredictionCol === "test_prediction") assert(kmeans.getPredictionCol === "test_prediction")
assert(kmeans.getMaxIter === 33) assert(kmeans.getMaxIter === 33)
assert(kmeans.getRuns === 7)
assert(kmeans.getInitMode === MLlibKMeans.RANDOM) assert(kmeans.getInitMode === MLlibKMeans.RANDOM)
assert(kmeans.getInitSteps === 3) assert(kmeans.getInitSteps === 3)
assert(kmeans.getSeed === 123) assert(kmeans.getSeed === 123)
assert(kmeans.getEpsilon === 1e-3) assert(kmeans.getTol === 1e-3)
} }
test("parameters validation") { test("parameters validation") {
...@@ -91,9 +88,6 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext { ...@@ -91,9 +88,6 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext {
intercept[IllegalArgumentException] { intercept[IllegalArgumentException] {
new KMeans().setInitSteps(0) new KMeans().setInitSteps(0)
} }
intercept[IllegalArgumentException] {
new KMeans().setRuns(0)
}
} }
test("fit & transform") { test("fit & transform") {
......
...@@ -19,7 +19,6 @@ from pyspark.ml.util import keyword_only ...@@ -19,7 +19,6 @@ from pyspark.ml.util import keyword_only
from pyspark.ml.wrapper import JavaEstimator, JavaModel from pyspark.ml.wrapper import JavaEstimator, JavaModel
from pyspark.ml.param.shared import * from pyspark.ml.param.shared import *
from pyspark.mllib.common import inherit_doc from pyspark.mllib.common import inherit_doc
from pyspark.mllib.linalg import _convert_to_vector
__all__ = ['KMeans', 'KMeansModel'] __all__ = ['KMeans', 'KMeansModel']
...@@ -35,7 +34,7 @@ class KMeansModel(JavaModel): ...@@ -35,7 +34,7 @@ class KMeansModel(JavaModel):
@inherit_doc @inherit_doc
class KMeans(JavaEstimator, HasFeaturesCol, HasMaxIter, HasSeed): class KMeans(JavaEstimator, HasFeaturesCol, HasPredictionCol, HasMaxIter, HasTol, HasSeed):
""" """
K-means clustering with support for multiple parallel runs and a k-means++ like initialization K-means clustering with support for multiple parallel runs and a k-means++ like initialization
mode (the k-means|| algorithm by Bahmani et al). When multiple concurrent runs are requested, mode (the k-means|| algorithm by Bahmani et al). When multiple concurrent runs are requested,
...@@ -45,7 +44,7 @@ class KMeans(JavaEstimator, HasFeaturesCol, HasMaxIter, HasSeed): ...@@ -45,7 +44,7 @@ class KMeans(JavaEstimator, HasFeaturesCol, HasMaxIter, HasSeed):
>>> data = [(Vectors.dense([0.0, 0.0]),), (Vectors.dense([1.0, 1.0]),), >>> data = [(Vectors.dense([0.0, 0.0]),), (Vectors.dense([1.0, 1.0]),),
... (Vectors.dense([9.0, 8.0]),), (Vectors.dense([8.0, 9.0]),)] ... (Vectors.dense([9.0, 8.0]),), (Vectors.dense([8.0, 9.0]),)]
>>> df = sqlContext.createDataFrame(data, ["features"]) >>> df = sqlContext.createDataFrame(data, ["features"])
>>> kmeans = KMeans().setK(2).setSeed(1).setFeaturesCol("features") >>> kmeans = KMeans(k=2, seed=1)
>>> model = kmeans.fit(df) >>> model = kmeans.fit(df)
>>> centers = model.clusterCenters() >>> centers = model.clusterCenters()
>>> len(centers) >>> len(centers)
...@@ -60,10 +59,6 @@ class KMeans(JavaEstimator, HasFeaturesCol, HasMaxIter, HasSeed): ...@@ -60,10 +59,6 @@ class KMeans(JavaEstimator, HasFeaturesCol, HasMaxIter, HasSeed):
# a placeholder to make it appear in the generated doc # a placeholder to make it appear in the generated doc
k = Param(Params._dummy(), "k", "number of clusters to create") k = Param(Params._dummy(), "k", "number of clusters to create")
epsilon = Param(Params._dummy(), "epsilon",
"distance threshold within which " +
"we've consider centers to have converged")
runs = Param(Params._dummy(), "runs", "number of runs of the algorithm to execute in parallel")
initMode = Param(Params._dummy(), "initMode", initMode = Param(Params._dummy(), "initMode",
"the initialization algorithm. This can be either \"random\" to " + "the initialization algorithm. This can be either \"random\" to " +
"choose random points as initial cluster centers, or \"k-means||\" " + "choose random points as initial cluster centers, or \"k-means||\" " +
...@@ -71,21 +66,21 @@ class KMeans(JavaEstimator, HasFeaturesCol, HasMaxIter, HasSeed): ...@@ -71,21 +66,21 @@ class KMeans(JavaEstimator, HasFeaturesCol, HasMaxIter, HasSeed):
initSteps = Param(Params._dummy(), "initSteps", "steps for k-means initialization mode") initSteps = Param(Params._dummy(), "initSteps", "steps for k-means initialization mode")
@keyword_only @keyword_only
def __init__(self, k=2, maxIter=20, runs=1, epsilon=1e-4, initMode="k-means||", initStep=5): def __init__(self, featuresCol="features", predictionCol="prediction", k=2,
initMode="k-means||", initSteps=5, tol=1e-4, maxIter=20, seed=None):
"""
__init__(self, featuresCol="features", predictionCol="prediction", k=2, \
initMode="k-means||", initSteps=5, tol=1e-4, maxIter=20, seed=None)
"""
super(KMeans, self).__init__() super(KMeans, self).__init__()
self._java_obj = self._new_java_obj("org.apache.spark.ml.clustering.KMeans", self.uid) self._java_obj = self._new_java_obj("org.apache.spark.ml.clustering.KMeans", self.uid)
self.k = Param(self, "k", "number of clusters to create") self.k = Param(self, "k", "number of clusters to create")
self.epsilon = Param(self, "epsilon",
"distance threshold within which " +
"we've consider centers to have converged")
self.runs = Param(self, "runs", "number of runs of the algorithm to execute in parallel")
self.seed = Param(self, "seed", "random seed")
self.initMode = Param(self, "initMode", self.initMode = Param(self, "initMode",
"the initialization algorithm. This can be either \"random\" to " + "the initialization algorithm. This can be either \"random\" to " +
"choose random points as initial cluster centers, or \"k-means||\" " + "choose random points as initial cluster centers, or \"k-means||\" " +
"to use a parallel variant of k-means++") "to use a parallel variant of k-means++")
self.initSteps = Param(self, "initSteps", "steps for k-means initialization mode") self.initSteps = Param(self, "initSteps", "steps for k-means initialization mode")
self._setDefault(k=2, maxIter=20, runs=1, epsilon=1e-4, initMode="k-means||", initSteps=5) self._setDefault(k=2, initMode="k-means||", initSteps=5, tol=1e-4, maxIter=20)
kwargs = self.__init__._input_kwargs kwargs = self.__init__._input_kwargs
self.setParams(**kwargs) self.setParams(**kwargs)
...@@ -93,9 +88,11 @@ class KMeans(JavaEstimator, HasFeaturesCol, HasMaxIter, HasSeed): ...@@ -93,9 +88,11 @@ class KMeans(JavaEstimator, HasFeaturesCol, HasMaxIter, HasSeed):
return KMeansModel(java_model) return KMeansModel(java_model)
@keyword_only @keyword_only
def setParams(self, k=2, maxIter=20, runs=1, epsilon=1e-4, initMode="k-means||", initSteps=5): def setParams(self, featuresCol="features", predictionCol="prediction", k=2,
initMode="k-means||", initSteps=5, tol=1e-4, maxIter=20, seed=None):
""" """
setParams(self, k=2, maxIter=20, runs=1, epsilon=1e-4, initMode="k-means||", initSteps=5): setParams(self, featuresCol="features", predictionCol="prediction", k=2, \
initMode="k-means||", initSteps=5, tol=1e-4, maxIter=20, seed=None)
Sets params for KMeans. Sets params for KMeans.
""" """
...@@ -119,40 +116,6 @@ class KMeans(JavaEstimator, HasFeaturesCol, HasMaxIter, HasSeed): ...@@ -119,40 +116,6 @@ class KMeans(JavaEstimator, HasFeaturesCol, HasMaxIter, HasSeed):
""" """
return self.getOrDefault(self.k) return self.getOrDefault(self.k)
def setEpsilon(self, value):
"""
Sets the value of :py:attr:`epsilon`.
>>> algo = KMeans().setEpsilon(1e-5)
>>> abs(algo.getEpsilon() - 1e-5) < 1e-5
True
"""
self._paramMap[self.epsilon] = value
return self
def getEpsilon(self):
"""
Gets the value of `epsilon`
"""
return self.getOrDefault(self.epsilon)
def setRuns(self, value):
"""
Sets the value of :py:attr:`runs`.
>>> algo = KMeans().setRuns(10)
>>> algo.getRuns()
10
"""
self._paramMap[self.runs] = value
return self
def getRuns(self):
"""
Gets the value of `runs`
"""
return self.getOrDefault(self.runs)
def setInitMode(self, value): def setInitMode(self, value):
""" """
Sets the value of :py:attr:`initMode`. Sets the value of :py:attr:`initMode`.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment