Skip to content
Snippets Groups Projects
Commit 3b9d9de5 authored by Edison Tung's avatar Edison Tung
Browse files

Added KMeans examples

LocalKMeans runs locally with a randomly generated dataset.
SparkLocalKMeans takes an input file and runs KMeans on it.
parent 07532021
No related branches found
No related tags found
No related merge requests found
package spark.examples
import java.util.Random
import Vector._
import spark.SparkContext
import spark.SparkContext._
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
object LocalKMeans {
val N = 1000
val R = 1000 // Scaling factor
val D = 10
val K = 10
val convergeDist = 0.001
val rand = new Random(42)
def generateData = {
def generatePoint(i: Int) = {
Vector(D, _ => rand.nextDouble * R)
}
Array.tabulate(N)(generatePoint)
}
def closestPoint(p: Vector, centers: HashMap[Int, Vector]): Int = {
var index = 0
var bestIndex = 0
var closest = Double.PositiveInfinity
for (i <- 1 to centers.size) {
val vCurr = centers.get(i).get
val tempDist = p.squaredDist(vCurr)
if (tempDist < closest) {
closest = tempDist
bestIndex = i
}
}
return bestIndex
}
def main(args: Array[String]) {
val data = generateData
var points = new HashSet[Vector]
var kPoints = new HashMap[Int, Vector]
var tempDist = 1.0
while (points.size < K) {
points.add(data(rand.nextInt(N)))
}
val iter = points.iterator
for (i <- 1 to points.size) {
kPoints.put(i, iter.next())
}
println("Initial centers: " + kPoints)
while(tempDist > convergeDist) {
var closest = data.map (p => (closestPoint(p, kPoints), (p, 1)))
var mappings = closest.groupBy[Int] (x => x._1)
var pointStats = mappings.map(pair => pair._2.reduceLeft [(Int, (Vector, Int))] {case ((id1, (x1, y1)), (id2, (x2, y2))) => (id1, (x1 + x2, y1+y2))})
var newPoints = pointStats.map {mapping => (mapping._1, mapping._2._1/mapping._2._2)}
tempDist = 0.0
for (mapping <- newPoints) {
tempDist += kPoints.get(mapping._1).get.squaredDist(mapping._2)
}
for (newP <- newPoints) {
kPoints.put(newP._1, newP._2)
}
}
println("Final centers: " + kPoints)
}
}
package spark.examples
import java.util.Random
import Vector._
import spark.SparkContext
import spark.SparkContext._
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
object SparkLocalKMeans {
val R = 1000 // Scaling factor
val rand = new Random(42)
def parseVector(line: String): Vector = {
return new Vector(line.split(' ').map(_.toDouble))
}
def closestPoint(p: Vector, centers: HashMap[Int, Vector]): Int = {
var index = 0
var bestIndex = 0
var closest = Double.PositiveInfinity
for (i <- 1 to centers.size) {
val vCurr = centers.get(i).get
val tempDist = p.squaredDist(vCurr)
if (tempDist < closest) {
closest = tempDist
bestIndex = i
}
}
return bestIndex
}
def main(args: Array[String]) {
if (args.length < 4) {
System.err.println("Usage: SparkLocalKMeans <master> <file> <k> <convergeDist>")
System.exit(1)
}
val sc = new SparkContext(args(0), "SparkLocalKMeans")
val lines = sc.textFile(args(1))
val data = lines.map(parseVector _).cache()
val K = args(2).toInt
val convergeDist = args(3).toDouble
var points = data.sample(false, (K+1)/data.count().toDouble, 42).collect
var kPoints = new HashMap[Int, Vector]
var tempDist = 1.0
for (i <- 1 to points.size) {
kPoints.put(i, points(i-1))
}
while(tempDist > convergeDist) {
var closest = data.map (p => (closestPoint(p, kPoints), (p, 1)))
var pointStats = closest.reduceByKey {case ((x1, y1), (x2, y2)) => (x1 + x2, y1+y2)}
var newPoints = pointStats.map {mapping => (mapping._1, mapping._2._1/mapping._2._2)}.collect()
tempDist = 0.0
for (mapping <- newPoints) {
tempDist += kPoints.get(mapping._1).get.squaredDist(mapping._2)
}
for (newP <- newPoints) {
kPoints.put(newP._1, newP._2)
}
}
println("Final centers: " + kPoints)
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment