From 5995ada96b661546a80657f2c5ed20604593e4aa Mon Sep 17 00:00:00 2001
From: Hrishikesh Subramonian <hrishikesh.subramonian@flytxt.com>
Date: Tue, 5 May 2015 07:57:39 -0700
Subject: [PATCH] [SPARK-6612] [MLLIB] [PYSPARK] Python KMeans parity

The following items are added to the Python KMeans API:

KMeans.train - initializationSteps and epsilon parameters (parity with Scala's setInitializationSteps/setEpsilon)
KMeansModel - computeCost method, k property
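
A minimal usage sketch of the new API (assumes an existing SparkContext
`sc`; the data and parameters mirror the doctest added in clustering.py):

    from pyspark.mllib.clustering import KMeans

    data = sc.parallelize([[0.0, 0.0], [1.0, 1.0], [9.0, 8.0], [8.0, 9.0]])
    model = KMeans.train(data, 2, maxIterations=10, runs=30,
                         initializationMode="random", seed=50,
                         initializationSteps=5, epsilon=1e-4)
    print(model.k)                  # number of clusters, here 2
    print(model.computeCost(data))  # sum of squared distances to nearest centers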

Author: Hrishikesh Subramonian <hrishikesh.subramonian@flytxt.com>

Closes #5647 from FlytxtRnD/newPyKmeansAPI and squashes the following commits:

b9e451b [Hrishikesh Subramonian] set seed to fixed value in doc test
5fd3ced [Hrishikesh Subramonian] doc test corrections
20b3c68 [Hrishikesh Subramonian] python 3 fixes
4d4e695 [Hrishikesh Subramonian] added arguments in python tests
21eb84c [Hrishikesh Subramonian] Python Kmeans - setEpsilon, setInitializationSteps, k and computeCost added.
---
 examples/src/main/python/mllib/kmeans.py      |  1 +
 .../mllib/api/python/PythonMLLibAPI.scala     | 15 +++++++++-
 python/pyspark/mllib/clustering.py            | 29 ++++++++++++++++---
 python/pyspark/mllib/tests.py                 |  9 ++++--
 4 files changed, 46 insertions(+), 8 deletions(-)

diff --git a/examples/src/main/python/mllib/kmeans.py b/examples/src/main/python/mllib/kmeans.py
index f901a87fa6..002fc75799 100755
--- a/examples/src/main/python/mllib/kmeans.py
+++ b/examples/src/main/python/mllib/kmeans.py
@@ -43,4 +43,5 @@ if __name__ == "__main__":
     k = int(sys.argv[2])
     model = KMeans.train(data, k)
     print("Final centers: " + str(model.clusterCenters))
+    print("Total Cost: " + str(model.computeCost(data)))
     sc.stop()
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index 8e9a208d61..b086cec083 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -291,12 +291,16 @@ private[python] class PythonMLLibAPI extends Serializable {
       maxIterations: Int,
       runs: Int,
       initializationMode: String,
-      seed: java.lang.Long): KMeansModel = {
+      seed: java.lang.Long,
+      initializationSteps: Int,
+      epsilon: Double): KMeansModel = {
     val kMeansAlg = new KMeans()
       .setK(k)
       .setMaxIterations(maxIterations)
       .setRuns(runs)
       .setInitializationMode(initializationMode)
+      .setInitializationSteps(initializationSteps)
+      .setEpsilon(epsilon)
 
     if (seed != null) kMeansAlg.setSeed(seed)
 
@@ -307,6 +311,15 @@ private[python] class PythonMLLibAPI extends Serializable {
     }
   }
 
+  /**
+   * Java stub for Python mllib KMeansModel.computeCost()
+   */
+  def computeCostKmeansModel(
+      data: JavaRDD[Vector],
+      centers: java.util.ArrayList[Vector]): Double = {
+    new KMeansModel(centers).computeCost(data)
+  }
+
   /**
    * Java stub for Python mllib GaussianMixture.run()
    * Returns a list containing weights, mean and covariance of each mixture component.
diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py
index abbb7cf60e..04e6715851 100644
--- a/python/pyspark/mllib/clustering.py
+++ b/python/pyspark/mllib/clustering.py
@@ -40,11 +40,16 @@ class KMeansModel(Saveable, Loader):
 
     >>> data = array([0.0,0.0, 1.0,1.0, 9.0,8.0, 8.0,9.0]).reshape(4, 2)
     >>> model = KMeans.train(
-    ...     sc.parallelize(data), 2, maxIterations=10, runs=30, initializationMode="random")
+    ...     sc.parallelize(data), 2, maxIterations=10, runs=30, initializationMode="random",
+    ...                    seed=50, initializationSteps=5, epsilon=1e-4)
     >>> model.predict(array([0.0, 0.0])) == model.predict(array([1.0, 1.0]))
     True
     >>> model.predict(array([8.0, 9.0])) == model.predict(array([9.0, 8.0]))
     True
+    >>> model.k
+    2
+    >>> model.computeCost(sc.parallelize(data))
+    2.0000000000000004
     >>> model = KMeans.train(sc.parallelize(data), 2)
     >>> sparse_data = [
     ...     SparseVector(3, {1: 1.0}),
@@ -52,7 +57,8 @@ class KMeansModel(Saveable, Loader):
     ...     SparseVector(3, {2: 1.0}),
     ...     SparseVector(3, {2: 1.1})
     ... ]
-    >>> model = KMeans.train(sc.parallelize(sparse_data), 2, initializationMode="k-means||")
+    >>> model = KMeans.train(sc.parallelize(sparse_data), 2, initializationMode="k-means||",
+    ...                                     seed=50, initializationSteps=5, epsilon=1e-4)
     >>> model.predict(array([0., 1., 0.])) == model.predict(array([0, 1.1, 0.]))
     True
     >>> model.predict(array([0., 0., 1.])) == model.predict(array([0, 0, 1.1]))
@@ -83,6 +89,11 @@ class KMeansModel(Saveable, Loader):
         """Get the cluster centers, represented as a list of NumPy arrays."""
         return self.centers
 
+    @property
+    def k(self):
+        """Total number of clusters."""
+        return len(self.centers)
+
     def predict(self, x):
         """Find the cluster to which x belongs in this model."""
         best = 0
@@ -95,6 +106,15 @@ class KMeansModel(Saveable, Loader):
                 best_distance = distance
         return best
 
+    def computeCost(self, rdd):
+        """
+        Return the K-means cost (sum of squared distances of points to
+        their nearest center) for this model on the given data.
+        """
+        cost = callMLlibFunc("computeCostKmeansModel", rdd.map(_convert_to_vector),
+                             [_convert_to_vector(c) for c in self.centers])
+        return cost
+
     def save(self, sc, path):
         java_centers = _py2java(sc, [_convert_to_vector(c) for c in self.centers])
         java_model = sc._jvm.org.apache.spark.mllib.clustering.KMeansModel(java_centers)
@@ -109,10 +129,11 @@ class KMeansModel(Saveable, Loader):
 class KMeans(object):
 
     @classmethod
-    def train(cls, rdd, k, maxIterations=100, runs=1, initializationMode="k-means||", seed=None):
+    def train(cls, rdd, k, maxIterations=100, runs=1, initializationMode="k-means||",
+              seed=None, initializationSteps=5, epsilon=1e-4):
         """Train a k-means clustering model."""
         model = callMLlibFunc("trainKMeansModel", rdd.map(_convert_to_vector), k, maxIterations,
-                              runs, initializationMode, seed)
+                              runs, initializationMode, seed, initializationSteps, epsilon)
         centers = callJavaFunc(rdd.context, model.clusterCenters)
         return KMeansModel([c.toArray() for c in centers])
 
diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py
index 1d9c6ebf3b..d05cfe2af0 100644
--- a/python/pyspark/mllib/tests.py
+++ b/python/pyspark/mllib/tests.py
@@ -236,7 +236,8 @@ class ListTests(MLlibTestCase):
             [1.1, 0],
             [1.2, 0],
         ]
-        clusters = KMeans.train(self.sc.parallelize(data), 2, initializationMode="k-means||")
+        clusters = KMeans.train(self.sc.parallelize(data), 2, initializationMode="k-means||",
+                                initializationSteps=7, epsilon=1e-4)
         self.assertEquals(clusters.predict(data[0]), clusters.predict(data[1]))
         self.assertEquals(clusters.predict(data[2]), clusters.predict(data[3]))
 
@@ -246,9 +247,11 @@ class ListTests(MLlibTestCase):
         Y = range(0, 100, 10)
         data = [[x, y] for x, y in zip(X, Y)]
         clusters1 = KMeans.train(self.sc.parallelize(data),
-                                 3, initializationMode="k-means||", seed=42)
+                                 3, initializationMode="k-means||",
+                                 seed=42, initializationSteps=7, epsilon=1e-4)
         clusters2 = KMeans.train(self.sc.parallelize(data),
-                                 3, initializationMode="k-means||", seed=42)
+                                 3, initializationMode="k-means||",
+                                 seed=42, initializationSteps=7, epsilon=1e-4)
         centers1 = clusters1.centers
         centers2 = clusters2.centers
         for c1, c2 in zip(centers1, centers2):
-- 
GitLab