From 3b9d9de583bf2ee0c7b46c75944aedfcfa784a02 Mon Sep 17 00:00:00 2001
From: Edison Tung <edisontung@gmail.com>
Date: Mon, 21 Nov 2011 16:37:58 -0800
Subject: [PATCH] Added KMeans examples

LocalKMeans runs locally with a randomly generated dataset.
SparkLocalKMeans takes an input file and runs KMeans on it.
---
 .../scala/spark/examples/LocalKMeans.scala    | 80 +++++++++++++++++++
 .../spark/examples/SparkLocalKMeans.scala     | 73 +++++++++++++++++
 2 files changed, 153 insertions(+)
 create mode 100644 examples/src/main/scala/spark/examples/LocalKMeans.scala
 create mode 100644 examples/src/main/scala/spark/examples/SparkLocalKMeans.scala

diff --git a/examples/src/main/scala/spark/examples/LocalKMeans.scala b/examples/src/main/scala/spark/examples/LocalKMeans.scala
new file mode 100644
index 0000000000..7e8e7a6959
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/LocalKMeans.scala
@@ -0,0 +1,80 @@
+package spark.examples
+
+import java.util.Random
+import Vector._
+import spark.SparkContext
+import spark.SparkContext._
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.HashSet
+
+object LocalKMeans {
+	val N = 1000
+	val R = 1000   	// Scaling factor
+	val D = 10
+	val K = 10
+	val convergeDist = 0.001
+	val rand = new Random(42)
+  	
+	def generateData = {
+	    def generatePoint(i: Int) = {
+	      Vector(D, _ => rand.nextDouble * R)
+	    }
+	    Array.tabulate(N)(generatePoint)
+	  }
+	
+	def closestPoint(p: Vector, centers: HashMap[Int, Vector]): Int = {
+		var index = 0
+		var bestIndex = 0
+		var closest = Double.PositiveInfinity
+	
+		for (i <- 1 to centers.size) {
+			val vCurr = centers.get(i).get
+			val tempDist = p.squaredDist(vCurr)
+			if (tempDist < closest) {
+				closest = tempDist
+				bestIndex = i
+			}
+		}
+	
+		return bestIndex
+	}
+
+	def main(args: Array[String]) {
+	  val data = generateData
+		var points = new HashSet[Vector]
+		var kPoints = new HashMap[Int, Vector]
+		var tempDist = 1.0
+		
+		while (points.size < K) {
+			points.add(data(rand.nextInt(N)))
+		}
+		
+		val iter = points.iterator
+		for (i <- 1 to points.size) {
+			kPoints.put(i, iter.next())
+		}
+
+		println("Initial centers: " + kPoints)
+
+		while(tempDist > convergeDist) {
+			var closest = data.map (p => (closestPoint(p, kPoints), (p, 1)))
+			
+			var mappings = closest.groupBy[Int] (x => x._1)
+			
+			var pointStats = mappings.map(pair => pair._2.reduceLeft [(Int, (Vector, Int))] {case ((id1, (x1, y1)), (id2, (x2, y2))) => (id1, (x1 + x2, y1+y2))})
+			
+			var newPoints = pointStats.map {mapping => (mapping._1, mapping._2._1/mapping._2._2)}
+			
+			tempDist = 0.0
+			for (mapping <- newPoints) {
+				tempDist += kPoints.get(mapping._1).get.squaredDist(mapping._2)
+			}
+			
+			for (newP <- newPoints) {
+				kPoints.put(newP._1, newP._2)
+			}
+		}
+
+		println("Final centers: " + kPoints)
+	}
+}
diff --git a/examples/src/main/scala/spark/examples/SparkLocalKMeans.scala b/examples/src/main/scala/spark/examples/SparkLocalKMeans.scala
new file mode 100644
index 0000000000..8d9527b7c1
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/SparkLocalKMeans.scala
@@ -0,0 +1,73 @@
+package spark.examples
+
+import java.util.Random
+import Vector._
+import spark.SparkContext
+import spark.SparkContext._
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.HashSet
+
+object SparkLocalKMeans {
+	val R = 1000   	// Scaling factor
+	val rand = new Random(42)
+  	
+	def parseVector(line: String): Vector = {
+	    return new Vector(line.split(' ').map(_.toDouble))
+	}
+	
+	def closestPoint(p: Vector, centers: HashMap[Int, Vector]): Int = {
+		var index = 0
+		var bestIndex = 0
+		var closest = Double.PositiveInfinity
+	
+		for (i <- 1 to centers.size) {
+			val vCurr = centers.get(i).get
+			val tempDist = p.squaredDist(vCurr)
+			if (tempDist < closest) {
+				closest = tempDist
+				bestIndex = i
+			}
+		}
+	
+		return bestIndex
+	}
+
+	def main(args: Array[String]) {
+		if (args.length < 4) {
+	      System.err.println("Usage: SparkLocalKMeans <master> <file> <k> <convergeDist>")
+	      System.exit(1)
+	    }
+	    val sc = new SparkContext(args(0), "SparkLocalKMeans")
+	    val lines = sc.textFile(args(1))
+	    val data = lines.map(parseVector _).cache()
+	   	val K = args(2).toInt
+	    val convergeDist = args(3).toDouble
+	
+		var points = data.sample(false, (K+1)/data.count().toDouble, 42).collect
+		var kPoints = new HashMap[Int, Vector]
+		var tempDist = 1.0
+		
+		for (i <- 1 to points.size) {
+			kPoints.put(i, points(i-1))
+		}
+
+		while(tempDist > convergeDist) {
+			var closest = data.map (p => (closestPoint(p, kPoints), (p, 1)))
+			
+			var pointStats = closest.reduceByKey {case ((x1, y1), (x2, y2)) => (x1 + x2, y1+y2)}
+			
+			var newPoints = pointStats.map {mapping => (mapping._1, mapping._2._1/mapping._2._2)}.collect()
+			
+			tempDist = 0.0
+			for (mapping <- newPoints) {
+				tempDist += kPoints.get(mapping._1).get.squaredDist(mapping._2)
+			}
+			
+			for (newP <- newPoints) {
+				kPoints.put(newP._1, newP._2)
+			}
+		}
+
+		println("Final centers: " + kPoints)
+	}
+}
-- 
GitLab