Skip to content
Snippets Groups Projects
Commit bf1311e6 authored by shivaram's avatar shivaram
Browse files

Merge pull request #678 from mateiz/ml-examples

Start of ML package
parents 6ad85d09 8bbe9075
No related branches found
No related tags found
No related merge requests found
Showing
with 2571 additions and 20 deletions
......@@ -15,6 +15,7 @@ set CORE_DIR=%FWDIR%core
set REPL_DIR=%FWDIR%repl
set EXAMPLES_DIR=%FWDIR%examples
set BAGEL_DIR=%FWDIR%bagel
set MLLIB_DIR=%FWDIR%mllib
set STREAMING_DIR=%FWDIR%streaming
set PYSPARK_DIR=%FWDIR%python
......@@ -29,6 +30,7 @@ set CLASSPATH=%CLASSPATH%;%FWDIR%lib_managed\bundles\*
set CLASSPATH=%CLASSPATH%;%FWDIR%repl\lib\*
set CLASSPATH=%CLASSPATH%;%FWDIR%python\lib\*
set CLASSPATH=%CLASSPATH%;%BAGEL_DIR%\target\scala-%SCALA_VERSION%\classes
set CLASSPATH=%CLASSPATH%;%MLLIB_DIR%\target\scala-%SCALA_VERSION%\classes
rem Add hadoop conf dir - else FileSystem.*, etc fail
rem Note, this assumes that there is either a HADOOP_CONF_DIR or YARN_CONF_DIR which hosts
......
......@@ -18,6 +18,7 @@ REPL_DIR="$FWDIR/repl"
REPL_BIN_DIR="$FWDIR/repl-bin"
EXAMPLES_DIR="$FWDIR/examples"
BAGEL_DIR="$FWDIR/bagel"
MLLIB_DIR="$FWDIR/mllib"
STREAMING_DIR="$FWDIR/streaming"
PYSPARK_DIR="$FWDIR/python"
......@@ -49,6 +50,7 @@ if [ -e $REPL_BIN_DIR/target ]; then
CLASSPATH+=":$EXAMPLES_JAR"
fi
CLASSPATH="$CLASSPATH:$BAGEL_DIR/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$MLLIB_DIR/target/scala-$SCALA_VERSION/classes"
for jar in `find $PYSPARK_DIR/lib -name '*jar'`; do
CLASSPATH="$CLASSPATH:$jar"
done
......
......@@ -105,6 +105,9 @@ abstract class RDD[T: ClassManifest](
// Methods and fields available on all RDDs
// =======================================================================
/** The SparkContext that created this RDD. */
def sparkContext: SparkContext = sc
/** A unique ID for this RDD (within its SparkContext). */
val id: Int = sc.newRddId()
......@@ -119,7 +122,7 @@ abstract class RDD[T: ClassManifest](
/** User-defined generator of this RDD*/
var generator = Utils.getCallSiteInfo.firstUserClass
/** Reset generator*/
def setGenerator(_generator: String) = {
generator = _generator
......@@ -281,31 +284,35 @@ abstract class RDD[T: ClassManifest](
def takeSample(withReplacement: Boolean, num: Int, seed: Int): Array[T] = {
var fraction = 0.0
var total = 0
val multiplier = 3.0
val initialCount = count()
var multiplier = 3.0
var initialCount = this.count()
var maxSelected = 0
if (num < 0) {
throw new IllegalArgumentException("Negative number of elements requested")
}
if (initialCount > Integer.MAX_VALUE - 1) {
maxSelected = Integer.MAX_VALUE - 1
} else {
maxSelected = initialCount.toInt
}
if (num > initialCount) {
if (num > initialCount && !withReplacement) {
total = maxSelected
fraction = math.min(multiplier * (maxSelected + 1) / initialCount, 1.0)
} else if (num < 0) {
throw(new IllegalArgumentException("Negative number of elements requested"))
fraction = multiplier * (maxSelected + 1) / initialCount
} else {
fraction = math.min(multiplier * (num + 1) / initialCount, 1.0)
fraction = multiplier * (num + 1) / initialCount
total = num
}
val rand = new Random(seed)
var samples = this.sample(withReplacement, fraction, rand.nextInt).collect()
var samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
// If the first sample didn't turn out large enough, keep trying to take samples;
// this shouldn't happen often because we use a big multiplier for the initial size
while (samples.length < total) {
samples = this.sample(withReplacement, fraction, rand.nextInt).collect()
samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
}
Utils.randomizeInPlace(samples, rand).take(total)
......@@ -363,7 +370,7 @@ abstract class RDD[T: ClassManifest](
/**
* Return an RDD created by piping elements to a forked external process.
*/
def pipe(command: String, env: Map[String, String]): RDD[String] =
def pipe(command: String, env: Map[String, String]): RDD[String] =
new PipedRDD(this, command, env)
......@@ -374,24 +381,24 @@ abstract class RDD[T: ClassManifest](
* @param command command to run in forked process.
* @param env environment variables to set.
* @param printPipeContext Before piping elements, this function is called as an opportunity
* to pipe context data. Print line function (like out.println) will be
* to pipe context data. Print line function (like out.println) will be
* passed as printPipeContext's parameter.
* @param printRDDElement Use this function to customize how to pipe elements. This function
* will be called with each RDD element as the 1st parameter, and the
* @param printRDDElement Use this function to customize how to pipe elements. This function
* will be called with each RDD element as the 1st parameter, and the
* print line function (like out.println()) as the 2nd parameter.
* An example of pipe the RDD data of groupBy() in a streaming way,
* instead of constructing a huge String to concat all the elements:
* def printRDDElement(record:(String, Seq[String]), f:String=>Unit) =
* def printRDDElement(record:(String, Seq[String]), f:String=>Unit) =
* for (e <- record._2){f(e)}
* @return the result RDD
*/
def pipe(
command: Seq[String],
env: Map[String, String] = Map(),
command: Seq[String],
env: Map[String, String] = Map(),
printPipeContext: (String => Unit) => Unit = null,
printRDDElement: (T, String => Unit) => Unit = null): RDD[String] =
new PipedRDD(this, command, env,
if (printPipeContext ne null) sc.clean(printPipeContext) else null,
printRDDElement: (T, String => Unit) => Unit = null): RDD[String] =
new PipedRDD(this, command, env,
if (printPipeContext ne null) sc.clean(printPipeContext) else null,
if (printRDDElement ne null) sc.clean(printRDDElement) else null)
/**
......
......@@ -251,4 +251,37 @@ class RDDSuite extends FunSuite with SharedSparkContext {
assert(topK.size === 2)
assert(topK.sorted === Array("b", "a"))
}
test("takeSample") {
val data = sc.parallelize(1 to 100, 2)
for (seed <- 1 to 5) {
val sample = data.takeSample(withReplacement=false, 20, seed)
assert(sample.size === 20) // Got exactly 20 elements
assert(sample.toSet.size === 20) // Elements are distinct
assert(sample.forall(x => 1 <= x && x <= 100), "elements not in [1, 100]")
}
for (seed <- 1 to 5) {
val sample = data.takeSample(withReplacement=false, 200, seed)
assert(sample.size === 100) // Got only 100 elements
assert(sample.toSet.size === 100) // Elements are distinct
assert(sample.forall(x => 1 <= x && x <= 100), "elements not in [1, 100]")
}
for (seed <- 1 to 5) {
val sample = data.takeSample(withReplacement=true, 20, seed)
assert(sample.size === 20) // Got exactly 20 elements
assert(sample.forall(x => 1 <= x && x <= 100), "elements not in [1, 100]")
}
for (seed <- 1 to 5) {
val sample = data.takeSample(withReplacement=true, 100, seed)
assert(sample.size === 100) // Got exactly 100 elements
// Chance of getting all distinct elements is astronomically low, so test we got < 100
assert(sample.toSet.size < 100, "sampling with replacement returned all distinct elements")
}
for (seed <- 1 to 5) {
val sample = data.takeSample(withReplacement=true, 200, seed)
assert(sample.size === 200) // Got exactly 200 elements
// Chance of getting all distinct elements is still quite low, so test we got < 100
assert(sample.toSet.size < 100, "sampling with replacement returned all distinct elements")
}
}
}
1,1,5.0
1,2,1.0
1,3,5.0
1,4,1.0
2,1,5.0
2,2,1.0
2,3,5.0
2,4,1.0
3,1,1.0
3,2,5.0
3,3,1.0
3,4,5.0
4,1,1.0
4,2,5.0
4,3,1.0
4,4,5.0
This diff is collapsed.
-0.4307829,-1.63735562648104 -2.00621178480549 -1.86242597251066 -1.02470580167082 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306
-0.1625189,-1.98898046126935 -0.722008756122123 -0.787896192088153 -1.02470580167082 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306
-0.1625189,-1.57881887548545 -2.1887840293994 1.36116336875686 -1.02470580167082 -0.522940888712441 -0.863171185425945 0.342627053981254 -0.155348103855541
-0.1625189,-2.16691708463163 -0.807993896938655 -0.787896192088153 -1.02470580167082 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306
0.3715636,-0.507874475300631 -0.458834049396776 -0.250631301876899 -1.02470580167082 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306
0.7654678,-2.03612849966376 -0.933954647105133 -1.86242597251066 -1.02470580167082 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306
0.8544153,-0.557312518810673 -0.208756571683607 -0.787896192088153 0.990146852537193 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306
1.2669476,-0.929360463147704 -0.0578991819441687 0.152317365781542 -1.02470580167082 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306
1.2669476,-2.28833047634983 -0.0706369432557794 -0.116315079324086 0.80409888772376 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306
1.2669476,0.223498042876113 -1.41471935455355 -0.116315079324086 -1.02470580167082 -0.522940888712441 -0.29928234305568 0.342627053981254 0.199211097885341
1.3480731,0.107785900236813 -1.47221551299731 0.420949810887169 -1.02470580167082 -0.522940888712441 -0.863171185425945 0.342627053981254 -0.687186906466865
1.446919,0.162180092313795 -1.32557369901905 0.286633588334355 -1.02470580167082 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306
1.4701758,-1.49795329918548 -0.263601072284232 0.823898478545609 0.788388310173035 -0.522940888712441 -0.29928234305568 0.342627053981254 0.199211097885341
1.4929041,0.796247055396743 0.0476559407005752 0.286633588334355 -1.02470580167082 -0.522940888712441 0.394013435896129 -1.04215728919298 -0.864466507337306
1.5581446,-1.62233848461465 -0.843294091975396 -3.07127197548598 -1.02470580167082 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306
1.5993876,-0.990720665490831 0.458513517212311 0.823898478545609 1.07379746308195 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306
1.6389967,-0.171901281967138 -0.489197399065355 -0.65357996953534 -1.02470580167082 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306
1.6956156,-1.60758252338831 -0.590700340358265 -0.65357996953534 -0.619561070667254 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306
1.7137979,0.366273918511144 -0.414014962912583 -0.116315079324086 0.232904453212813 -0.522940888712441 0.971228997418125 0.342627053981254 1.26288870310799
1.8000583,-0.710307384579833 0.211731938156277 0.152317365781542 -1.02470580167082 -0.522940888712441 -0.442797990776478 0.342627053981254 1.61744790484887
1.8484548,-0.262791728113881 -1.16708345615721 0.420949810887169 0.0846342590816532 -0.522940888712441 0.163172393491611 0.342627053981254 1.97200710658975
1.8946169,0.899043117369237 -0.590700340358265 0.152317365781542 -1.02470580167082 -0.522940888712441 1.28643254437683 -1.04215728919298 -0.864466507337306
1.9242487,-0.903451690500615 1.07659722048274 0.152317365781542 1.28380453408541 -0.522940888712441 -0.442797990776478 -1.04215728919298 -0.864466507337306
2.008214,-0.0633337899773081 -1.38088970920094 0.958214701098423 0.80409888772376 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306
2.0476928,-1.15393789990757 -0.961853075398404 -0.116315079324086 -1.02470580167082 -0.522940888712441 -0.442797990776478 -1.04215728919298 -0.864466507337306
2.1575593,0.0620203721138446 0.0657973885499142 1.22684714620405 -0.468824786336838 -0.522940888712441 1.31421001659859 1.72741139715549 -0.332627704725983
2.1916535,-0.75731027755674 -2.92717970468456 0.018001143228728 -1.02470580167082 -0.522940888712441 -0.863171185425945 0.342627053981254 -0.332627704725983
2.2137539,1.11226993252773 1.06484916245061 0.555266033439982 0.877691038550889 1.89254797819741 1.43890404648442 0.342627053981254 0.376490698755783
2.2772673,-0.468768642850639 -1.43754788774533 -1.05652863719378 0.576050411655607 -0.522940888712441 0.0120483832567209 0.342627053981254 -0.687186906466865
2.2975726,-0.618884859896728 -1.1366360750781 -0.519263746982526 -1.02470580167082 -0.522940888712441 -0.863171185425945 3.11219574032972 1.97200710658975
2.3272777,-0.651431999123483 0.55329161145762 -0.250631301876899 1.11210019001038 -0.522940888712441 -0.179808625688859 -1.04215728919298 -0.864466507337306
2.5217206,0.115499102435224 -0.512233676577595 0.286633588334355 1.13650173283446 -0.522940888712441 -0.179808625688859 0.342627053981254 -0.155348103855541
2.5533438,0.266341329949937 -0.551137885443386 -0.384947524429713 0.354857790686005 -0.522940888712441 -0.863171185425945 0.342627053981254 -0.332627704725983
2.5687881,1.16902610257751 0.855491905752846 2.03274448152093 1.22628985326088 1.89254797819741 2.02833774827712 3.11219574032972 2.68112551007152
2.6567569,-0.218972367124187 0.851192298581141 0.555266033439982 -1.02470580167082 -0.522940888712441 -0.863171185425945 0.342627053981254 0.908329501367106
2.677591,0.263121415733908 1.4142681068416 0.018001143228728 1.35980653053822 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306
2.7180005,-0.0704736333296423 1.52000996595417 0.286633588334355 1.39364261119802 -0.522940888712441 -0.863171185425945 0.342627053981254 -0.332627704725983
2.7942279,-0.751957286017338 0.316843561689933 -1.99674219506348 0.911736065044475 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306
2.8063861,-0.685277652430997 1.28214038482516 0.823898478545609 0.232904453212813 -0.522940888712441 -0.863171185425945 0.342627053981254 -0.155348103855541
2.8124102,-0.244991501432929 0.51882005949686 -0.384947524429713 0.823246560137838 -0.522940888712441 -0.863171185425945 0.342627053981254 0.553770299626224
2.8419982,-0.75731027755674 2.09041984898851 1.22684714620405 1.53428167116843 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306
2.8535925,1.20962937075363 -0.242882661178889 1.09253092365124 -1.02470580167082 -0.522940888712441 1.24263233939889 3.11219574032972 2.50384590920108
2.9204698,0.570886990493502 0.58243883987948 0.555266033439982 1.16006887775962 -0.522940888712441 1.07357183940747 0.342627053981254 1.61744790484887
2.9626924,0.719758684343624 0.984970304132004 1.09253092365124 1.52137230773457 -0.522940888712441 -0.179808625688859 0.342627053981254 -0.509907305596424
2.9626924,-1.52406140158064 1.81975700990333 0.689582255992796 -1.02470580167082 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306
2.9729753,-0.132431544081234 2.68769877553723 1.09253092365124 1.53428167116843 -0.522940888712441 -0.442797990776478 0.342627053981254 -0.687186906466865
3.0130809,0.436161292804989 -0.0834447307428255 -0.519263746982526 -1.02470580167082 1.89254797819741 1.07357183940747 0.342627053981254 1.26288870310799
3.0373539,-0.161195191984091 -0.671900359186746 1.7641120364153 1.13650173283446 -0.522940888712441 -0.863171185425945 0.342627053981254 0.0219314970149
3.2752562,1.39927182372944 0.513852869452676 0.689582255992796 -1.02470580167082 1.89254797819741 1.49394503405693 0.342627053981254 -0.155348103855541
3.3375474,1.51967002306341 -0.852203755696565 0.555266033439982 -0.104527297798983 1.89254797819741 1.85927724828569 0.342627053981254 0.908329501367106
3.3928291,0.560725834706224 1.87867703391426 1.09253092365124 1.39364261119802 -0.522940888712441 0.486423065822545 0.342627053981254 1.26288870310799
3.4355988,1.00765532502814 1.69426310090641 1.89842825896812 1.53428167116843 -0.522940888712441 -0.863171185425945 0.342627053981254 -0.509907305596424
3.4578927,1.10152996153577 -0.10927271844907 0.689582255992796 -1.02470580167082 1.89254797819741 1.97630171771485 0.342627053981254 1.61744790484887
3.5160131,0.100001934217311 -1.30380956369388 0.286633588334355 0.316555063757567 -0.522940888712441 0.28786643052924 0.342627053981254 0.553770299626224
3.5307626,0.987291634724086 -0.36279314978779 -0.922212414640967 0.232904453212813 -0.522940888712441 1.79270085261407 0.342627053981254 1.26288870310799
3.5652984,1.07158528137575 0.606453149641961 1.7641120364153 -0.432854616994416 1.89254797819741 0.528504607720369 0.342627053981254 0.199211097885341
3.5876769,0.180156323255198 0.188987436375017 -0.519263746982526 1.09956763075594 -0.522940888712441 0.708239632330506 0.342627053981254 0.199211097885341
3.6309855,1.65687973755377 -0.256675483533719 0.018001143228728 -1.02470580167082 1.89254797819741 1.79270085261407 0.342627053981254 1.26288870310799
3.6800909,0.5720085322365 0.239854450210939 -0.787896192088153 1.0605418233138 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306
3.7123518,0.323806133438225 -0.606717660886078 -0.250631301876899 -1.02470580167082 1.89254797819741 0.342907418101747 0.342627053981254 0.199211097885341
3.9843437,1.23668206715898 2.54220539083611 0.152317365781542 -1.02470580167082 1.89254797819741 1.89037692416194 0.342627053981254 1.26288870310799
3.993603,0.180156323255198 0.154448192444669 1.62979581386249 0.576050411655607 1.89254797819741 0.708239632330506 0.342627053981254 1.79472750571931
4.029806,1.60906277046565 1.10378605019827 0.555266033439982 -1.02470580167082 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306
4.1295508,1.0036214996026 0.113496885050331 -0.384947524429713 0.860016436332751 1.89254797819741 -0.863171185425945 0.342627053981254 -0.332627704725983
4.3851468,1.25591974271076 0.577607033774471 0.555266033439982 -1.02470580167082 1.89254797819741 1.07357183940747 0.342627053981254 1.26288870310799
4.6844434,2.09650591351268 0.625488598331018 -2.66832330782754 -1.02470580167082 1.89254797819741 1.67954222367555 0.342627053981254 0.553770299626224
5.477509,1.30028987435881 0.338383613253713 0.555266033439982 1.00481276295349 1.89254797819741 1.24263233939889 0.342627053981254 1.97200710658975
package spark.mllib.clustering
import scala.collection.mutable.ArrayBuffer
import scala.util.Random
import spark.{SparkContext, RDD}
import spark.SparkContext._
import spark.Logging
import spark.mllib.util.MLUtils
import org.jblas.DoubleMatrix
/**
* K-means clustering with support for multiple parallel runs and a k-means++ like initialization
* mode (the k-means|| algorithm by Bahmani et al). When multiple concurrent runs are requested,
* they are executed together with joint passes over the data for efficiency.
*
* This is an iterative algorithm that will make multiple passes over the data, so any RDDs given
* to it should be cached by the user.
*/
class KMeans private (
var k: Int,
var maxIterations: Int,
var runs: Int,
var initializationMode: String,
var initializationSteps: Int,
var epsilon: Double)
extends Serializable with Logging
{
private type ClusterCenters = Array[Array[Double]]
def this() = this(2, 20, 1, KMeans.K_MEANS_PARALLEL, 5, 1e-4)
/** Set the number of clusters to create (k). Default: 2. */
def setK(k: Int): KMeans = {
this.k = k
this
}
/** Set maximum number of iterations to run. Default: 20. */
def setMaxIterations(maxIterations: Int): KMeans = {
this.maxIterations = maxIterations
this
}
/**
* Set the initialization algorithm. This can be either "random" to choose random points as
* initial cluster centers, or "k-means||" to use a parallel variant of k-means++
* (Bahmani et al., Scalable K-Means++, VLDB 2012). Default: k-means||.
*/
def setInitializationMode(initializationMode: String): KMeans = {
if (initializationMode != KMeans.RANDOM && initializationMode != KMeans.K_MEANS_PARALLEL) {
throw new IllegalArgumentException("Invalid initialization mode: " + initializationMode)
}
this.initializationMode = initializationMode
this
}
/**
* Set the number of runs of the algorithm to execute in parallel. We initialize the algorithm
* this many times with random starting conditions (configured by the initialization mode), then
* return the best clustering found over any run. Default: 1.
*/
def setRuns(runs: Int): KMeans = {
if (runs <= 0) {
throw new IllegalArgumentException("Number of runs must be positive")
}
this.runs = runs
this
}
/**
* Set the number of steps for the k-means|| initialization mode. This is an advanced
* setting -- the default of 5 is almost always enough. Default: 5.
*/
def setInitializationSteps(initializationSteps: Int): KMeans = {
if (initializationSteps <= 0) {
throw new IllegalArgumentException("Number of initialization steps must be positive")
}
this.initializationSteps = initializationSteps
this
}
/**
* Set the distance threshold within which we've consider centers to have converged.
* If all centers move less than this Euclidean distance, we stop iterating one run.
*/
def setEpsilon(epsilon: Double): KMeans = {
this.epsilon = epsilon
this
}
/**
* Train a K-means model on the given set of points; `data` should be cached for high
* performance, because this is an iterative algorithm.
*/
def train(data: RDD[Array[Double]]): KMeansModel = {
// TODO: check whether data is persistent; this needs RDD.storageLevel to be publicly readable
val sc = data.sparkContext
val centers = if (initializationMode == KMeans.RANDOM) {
initRandom(data)
} else {
initKMeansParallel(data)
}
val active = Array.fill(runs)(true)
val costs = Array.fill(runs)(0.0)
var activeRuns = new ArrayBuffer[Int] ++ (0 until runs)
var iteration = 0
// Execute iterations of Lloyd's algorithm until all runs have converged
while (iteration < maxIterations && !activeRuns.isEmpty) {
type WeightedPoint = (DoubleMatrix, Long)
def mergeContribs(p1: WeightedPoint, p2: WeightedPoint): WeightedPoint = {
(p1._1.addi(p2._1), p1._2 + p2._2)
}
val activeCenters = activeRuns.map(r => centers(r)).toArray
val costAccums = activeRuns.map(_ => sc.accumulator(0.0))
// Find the sum and count of points mapping to each center
val totalContribs = data.mapPartitions { points =>
val runs = activeCenters.length
val k = activeCenters(0).length
val dims = activeCenters(0)(0).length
val sums = Array.fill(runs, k)(new DoubleMatrix(dims))
val counts = Array.fill(runs, k)(0L)
for (point <- points; (centers, runIndex) <- activeCenters.zipWithIndex) {
val (bestCenter, cost) = KMeans.findClosest(centers, point)
costAccums(runIndex) += cost
sums(runIndex)(bestCenter).addi(new DoubleMatrix(point))
counts(runIndex)(bestCenter) += 1
}
val contribs = for (i <- 0 until runs; j <- 0 until k) yield {
((i, j), (sums(i)(j), counts(i)(j)))
}
contribs.iterator
}.reduceByKey(mergeContribs).collectAsMap()
// Update the cluster centers and costs for each active run
for ((run, i) <- activeRuns.zipWithIndex) {
var changed = false
for (j <- 0 until k) {
val (sum, count) = totalContribs((i, j))
if (count != 0) {
val newCenter = sum.divi(count).data
if (MLUtils.squaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) {
changed = true
}
centers(run)(j) = newCenter
}
}
if (!changed) {
active(run) = false
logInfo("Run " + run + " finished in " + (iteration + 1) + " iterations")
}
costs(run) = costAccums(i).value
}
activeRuns = activeRuns.filter(active(_))
iteration += 1
}
val bestRun = costs.zipWithIndex.min._2
new KMeansModel(centers(bestRun))
}
/**
* Initialize `runs` sets of cluster centers at random.
*/
private def initRandom(data: RDD[Array[Double]]): Array[ClusterCenters] = {
// Sample all the cluster centers in one pass to avoid repeated scans
val sample = data.takeSample(true, runs * k, new Random().nextInt())
Array.tabulate(runs)(r => sample.slice(r * k, (r + 1) * k))
}
/**
* Initialize `runs` sets of cluster centers using the k-means|| algorithm by Bahmani et al.
* (Bahmani et al., Scalable K-Means++, VLDB 2012). This is a variant of k-means++ that tries
* to find with dissimilar cluster centers by starting with a random center and then doing
* passes where more centers are chosen with probability proportional to their squared distance
* to the current cluster set. It results in a provable approximation to an optimal clustering.
*
* The original paper can be found at http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf.
*/
private def initKMeansParallel(data: RDD[Array[Double]]): Array[ClusterCenters] = {
// Initialize each run's center to a random point
val seed = new Random().nextInt()
val sample = data.takeSample(true, runs, seed)
val centers = Array.tabulate(runs)(r => ArrayBuffer(sample(r)))
// On each step, sample 2 * k points on average for each run with probability proportional
// to their squared distance from that run's current centers
for (step <- 0 until initializationSteps) {
val centerArrays = centers.map(_.toArray)
val sumCosts = data.flatMap { point =>
for (r <- 0 until runs) yield (r, KMeans.pointCost(centerArrays(r), point))
}.reduceByKey(_ + _).collectAsMap()
val chosen = data.mapPartitionsWithIndex { (index, points) =>
val rand = new Random(seed ^ (step << 16) ^ index)
for {
p <- points
r <- 0 until runs
if rand.nextDouble() < KMeans.pointCost(centerArrays(r), p) * 2 * k / sumCosts(r)
} yield (r, p)
}.collect()
for ((r, p) <- chosen) {
centers(r) += p
}
}
// Finally, we might have a set of more than k candidate centers for each run; weigh each
// candidate by the number of points in the dataset mapping to it and run a local k-means++
// on the weighted centers to pick just k of them
val centerArrays = centers.map(_.toArray)
val weightMap = data.flatMap { p =>
for (r <- 0 until runs) yield ((r, KMeans.findClosest(centerArrays(r), p)._1), 1.0)
}.reduceByKey(_ + _).collectAsMap()
val finalCenters = (0 until runs).map { r =>
val myCenters = centers(r).toArray
val myWeights = (0 until myCenters.length).map(i => weightMap.getOrElse((r, i), 0.0)).toArray
LocalKMeans.kMeansPlusPlus(r, myCenters, myWeights, k, 30)
}
finalCenters.toArray
}
}
/**
* Top-level methods for calling K-means clustering.
*/
object KMeans {
// Initialization mode names
val RANDOM = "random"
val K_MEANS_PARALLEL = "k-means||"
def train(
data: RDD[Array[Double]],
k: Int,
maxIterations: Int,
runs: Int,
initializationMode: String)
: KMeansModel =
{
new KMeans().setK(k)
.setMaxIterations(maxIterations)
.setRuns(runs)
.setInitializationMode(initializationMode)
.train(data)
}
def train(data: RDD[Array[Double]], k: Int, maxIterations: Int, runs: Int): KMeansModel = {
train(data, k, maxIterations, runs, K_MEANS_PARALLEL)
}
def train(data: RDD[Array[Double]], k: Int, maxIterations: Int): KMeansModel = {
train(data, k, maxIterations, 1, K_MEANS_PARALLEL)
}
/**
* Return the index of the closest point in `centers` to `point`, as well as its distance.
*/
private[mllib] def findClosest(centers: Array[Array[Double]], point: Array[Double])
: (Int, Double) =
{
var bestDistance = Double.PositiveInfinity
var bestIndex = 0
for (i <- 0 until centers.length) {
val distance = MLUtils.squaredDistance(point, centers(i))
if (distance < bestDistance) {
bestDistance = distance
bestIndex = i
}
}
(bestIndex, bestDistance)
}
/**
* Return the K-means cost of a given point against the given cluster centers.
*/
private[mllib] def pointCost(centers: Array[Array[Double]], point: Array[Double]): Double = {
var bestDistance = Double.PositiveInfinity
for (i <- 0 until centers.length) {
val distance = MLUtils.squaredDistance(point, centers(i))
if (distance < bestDistance) {
bestDistance = distance
}
}
bestDistance
}
def main(args: Array[String]) {
if (args.length != 4) {
println("Usage: KMeans <master> <input_file> <k> <max_iterations>")
System.exit(1)
}
val (master, inputFile, k, iters) = (args(0), args(1), args(2).toInt, args(3).toInt)
val sc = new SparkContext(master, "KMeans")
val data = sc.textFile(inputFile).map(line => line.split(' ').map(_.toDouble))
val model = KMeans.train(data, k, iters)
val cost = model.computeCost(data)
println("Cluster centers:")
for (c <- model.clusterCenters) {
println(" " + c.mkString(" "))
}
println("Cost: " + cost)
System.exit(0)
}
}
package spark.mllib.clustering
import spark.RDD
import spark.SparkContext._
import spark.mllib.util.MLUtils
/**
* A clustering model for K-means. Each point belongs to the cluster with the closest center.
*/
class KMeansModel(val clusterCenters: Array[Array[Double]]) extends Serializable {
/** Total number of clusters. */
def k: Int = clusterCenters.length
/** Return the cluster index that a given point belongs to. */
def predict(point: Array[Double]): Int = {
KMeans.findClosest(clusterCenters, point)._1
}
/**
* Return the K-means cost (sum of squared distances of points to their nearest center) for this
* model on the given data.
*/
def computeCost(data: RDD[Array[Double]]): Double = {
data.map(p => KMeans.pointCost(clusterCenters, p)).sum
}
}
package spark.mllib.clustering
import scala.util.Random
import org.jblas.{DoubleMatrix, SimpleBlas}
/**
* An utility object to run K-means locally. This is private to the ML package because it's used
* in the initialization of KMeans but not meant to be publicly exposed.
*/
private[mllib] object LocalKMeans {
/**
* Run K-means++ on the weighted point set `points`. This first does the K-means++
* initialization procedure and then roudns of Lloyd's algorithm.
*/
def kMeansPlusPlus(
seed: Int,
points: Array[Array[Double]],
weights: Array[Double],
k: Int,
maxIterations: Int)
: Array[Array[Double]] =
{
val rand = new Random(seed)
val dimensions = points(0).length
val centers = new Array[Array[Double]](k)
// Initialize centers by sampling using the k-means++ procedure
centers(0) = pickWeighted(rand, points, weights)
for (i <- 1 until k) {
// Pick the next center with a probability proportional to cost under current centers
val curCenters = centers.slice(0, i)
val sum = points.zip(weights).map { case (p, w) =>
w * KMeans.pointCost(curCenters, p)
}.sum
val r = rand.nextDouble() * sum
var cumulativeScore = 0.0
var j = 0
while (j < points.length && cumulativeScore < r) {
cumulativeScore += weights(j) * KMeans.pointCost(curCenters, points(j))
j += 1
}
centers(i) = points(j-1)
}
// Run up to maxIterations iterations of Lloyd's algorithm
val oldClosest = Array.fill(points.length)(-1)
var iteration = 0
var moved = true
while (moved && iteration < maxIterations) {
moved = false
val sums = Array.fill(k)(new DoubleMatrix(dimensions))
val counts = Array.fill(k)(0.0)
for ((p, i) <- points.zipWithIndex) {
val index = KMeans.findClosest(centers, p)._1
SimpleBlas.axpy(weights(i), new DoubleMatrix(p), sums(index))
counts(index) += weights(i)
if (index != oldClosest(i)) {
moved = true
oldClosest(i) = index
}
}
// Update centers
for (i <- 0 until k) {
if (counts(i) == 0.0) {
// Assign center to a random point
centers(i) = points(rand.nextInt(points.length))
} else {
centers(i) = sums(i).divi(counts(i)).data
}
}
iteration += 1
}
centers
}
private def pickWeighted[T](rand: Random, data: Array[T], weights: Array[Double]): T = {
val r = rand.nextDouble() * weights.sum
var i = 0
var curWeight = 0.0
while (i < data.length && curWeight < r) {
curWeight += weights(i)
i += 1
}
data(i - 1)
}
}
package spark.mllib.optimization
import org.jblas.DoubleMatrix
abstract class Gradient extends Serializable {
/**
* Compute the gradient for a given row of data.
*
* @param data - One row of data. Row matrix of size 1xn where n is the number of features.
* @param label - Label for this data item.
* @param weights - Column matrix containing weights for every feature.
*/
def compute(data: DoubleMatrix, label: Double, weights: DoubleMatrix):
(DoubleMatrix, Double)
}
class LogisticGradient extends Gradient {
override def compute(data: DoubleMatrix, label: Double, weights: DoubleMatrix):
(DoubleMatrix, Double) = {
val margin: Double = -1.0 * data.dot(weights)
val gradientMultiplier = (1.0 / (1.0 + math.exp(margin))) - label
val gradient = data.mul(gradientMultiplier)
val loss =
if (margin > 0) {
math.log(1 + math.exp(0 - margin))
} else {
math.log(1 + math.exp(margin)) - margin
}
(gradient, loss)
}
}
package spark.mllib.optimization
import spark.{Logging, RDD, SparkContext}
import spark.SparkContext._
import org.jblas.DoubleMatrix
import scala.collection.mutable.ArrayBuffer
object GradientDescent {

  /**
   * Run gradient descent in parallel using mini batches.
   * Based on Matlab code written by John Duchi.
   *
   * @param data - Input data for SGD. RDD of form (label, [feature values]).
   * @param gradient - Gradient object that will be used to compute the gradient.
   * @param updater - Updater object that will be used to update the model.
   * @param stepSize - stepSize to be used during update.
   * @param numIters - number of iterations that SGD should be run.
   * @param miniBatchFraction - fraction of the input data set that should be used for
   *                            one iteration of SGD. Default value 1.0.
   *
   * @return weights - Column matrix containing weights for every feature.
   * @return lossHistory - Array containing the loss computed for every iteration.
   */
  def runMiniBatchSGD(
    data: RDD[(Double, Array[Double])],
    gradient: Gradient,
    updater: Updater,
    stepSize: Double,
    numIters: Int,
    miniBatchFraction: Double=1.0) : (DoubleMatrix, Array[Double]) = {

    val lossHistory = new ArrayBuffer[Double](numIters)

    // Feature count is read off the first example; assumes `data` is non-empty.
    val nfeatures: Int = data.take(1)(0)._2.length
    val nexamples: Long = data.count()
    // Expected batch size. sample() is probabilistic, so the actual number of
    // examples drawn each iteration varies around this value.
    val miniBatchSize = nexamples * miniBatchFraction

    // Initialize weights as a column matrix
    var weights = DoubleMatrix.ones(nfeatures)
    var reg_val = 0.0

    for (i <- 1 to numIters) {
      // Fixed seed (42 + i) makes each iteration's sample deterministic across runs.
      val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42+i).map {
        case (y, features) =>
          val featuresRow = new DoubleMatrix(features.length, 1, features:_*)
          val (grad, loss) = gradient.compute(featuresRow, y, weights)
          (grad, loss)
      }.reduce((a, b) => (a._1.addi(b._1), a._2 + b._2))

      // NOTE(review): reg_val here is the regularization value produced by the
      // *previous* iteration's update — confirm this one-step lag is intended.
      lossHistory.append(lossSum / miniBatchSize + reg_val)

      // Average the accumulated gradient over the expected batch size and let the
      // updater take a step; it also returns the new regularization value.
      val update = updater.compute(weights, gradientSum.div(miniBatchSize), stepSize, i)
      weights = update._1
      reg_val = update._2
    }

    (weights, lossHistory.toArray)
  }
}
package spark.mllib.optimization
import org.jblas.DoubleMatrix
/**
 * Produces, from the current weights and a gradient, the weights to use for the
 * next iteration together with the regularization value of the updated weights.
 * Implementations must be serializable so they can be shipped inside closures.
 */
abstract class Updater extends Serializable {
  /**
   * Compute an updated value for weights given the gradient, stepSize and iteration number.
   *
   * @param weightsOld - Column matrix of size nx1 where n is the number of features.
   * @param gradient - Column matrix of size nx1 where n is the number of features.
   * @param stepSize - step size across iterations
   * @param iter - Iteration number
   *
   * @return weightsNew - Column matrix containing updated weights
   * @return reg_val - regularization value
   */
  // Fixed: the parameter was misspelled `weightsOlds`, inconsistent with both the
  // scaladoc above and the `weightsOld` name used by SimpleUpdater's override.
  def compute(weightsOld: DoubleMatrix, gradient: DoubleMatrix, stepSize: Double, iter: Int):
      (DoubleMatrix, Double)
}
/**
 * Updater with no regularization: takes a plain gradient step whose length
 * decays as stepSize / sqrt(iteration), and always reports a regularization
 * value of 0.
 */
class SimpleUpdater extends Updater {
  override def compute(weightsOld: DoubleMatrix, gradient: DoubleMatrix,
      stepSize: Double, iter: Int): (DoubleMatrix, Double) = {
    // Step length shrinks with the square root of the iteration number.
    val thisIterStepSize = stepSize / math.sqrt(iter)
    val step = gradient.mul(thisIterStepSize)
    (weightsOld.sub(step), 0)
  }
}
package spark.mllib.recommendation
import scala.collection.mutable.{ArrayBuffer, BitSet}
import scala.util.Random
import spark.{HashPartitioner, Partitioner, SparkContext, RDD}
import spark.storage.StorageLevel
import spark.SparkContext._
import org.jblas.{DoubleMatrix, SimpleBlas, Solve}
/**
 * Out-link information for a user or product block. This includes the original user/product IDs
 * of the elements within this block, and the list of destination blocks that each user or
 * product will need to send its feature vector to.
 *
 * @param elementIds original user/product IDs of the elements in this block
 * @param shouldSend one BitSet per element (same index as elementIds), marking the
 *                   destination block IDs that need that element's feature vector
 */
private[recommendation] case class OutLinkBlock(
  elementIds: Array[Int], shouldSend: Array[BitSet])
/**
 * In-link information for a user (or product) block. This includes the original user/product IDs
 * of the elements within this block, as well as an array of indices and ratings that specify
 * which user in the block will be rated by which products from each product block (or vice-versa).
 * Specifically, if this InLinkBlock is for users, ratingsForBlock(b)(i) will contain two arrays,
 * indices and ratings, for the i'th product that will be sent to us by product block b (call this
 * P). These arrays represent the users that product P had ratings for (by their index in this
 * block), as well as the corresponding rating for each one. We can thus use this information when
 * we get product block b's message to update the corresponding users.
 *
 * @param elementIds original user/product IDs of the elements in this block
 * @param ratingsForBlock indexed first by source block, then by the position of the incoming
 *                        element in that block's message; each entry pairs local element
 *                        indices with their ratings
 */
private[recommendation] case class InLinkBlock(
  elementIds: Array[Int], ratingsForBlock: Array[Array[(Array[Int], Array[Double])]])
/**
 * Alternating Least Squares matrix factorization.
 *
 * This is a blocked implementation of the ALS factorization algorithm that groups the two sets
 * of factors (referred to as "users" and "products") into blocks and reduces communication by only
 * sending one copy of each user vector to each product block on each iteration, and only for the
 * product blocks that need that user's feature vector. This is achieved by precomputing some
 * information about the ratings matrix to determine the "out-links" of each user (which blocks of
 * products it will contribute to) and "in-link" information for each product (which of the feature
 * vectors it receives from each user block it will depend on). This allows us to send only an
 * array of feature vectors between each user block and product block, and have the product block
 * find the users' ratings and update the products based on these messages.
 */
class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var lambda: Double)
  extends Serializable
{
  // Defaults: auto-configured block count (-1), rank 10, 10 iterations, lambda 0.01.
  def this() = this(-1, 10, 10, 0.01)

  /**
   * Set the number of blocks to parallelize the computation into; pass -1 for an auto-configured
   * number of blocks. Default: -1.
   */
  def setBlocks(numBlocks: Int): ALS = {
    this.numBlocks = numBlocks
    this
  }

  /** Set the rank of the feature matrices computed (number of features). Default: 10. */
  def setRank(rank: Int): ALS = {
    this.rank = rank
    this
  }

  /** Set the number of iterations to run. Default: 10. */
  def setIterations(iterations: Int): ALS = {
    this.iterations = iterations
    this
  }

  /** Set the regularization parameter, lambda. Default: 0.01. */
  def setLambda(lambda: Double): ALS = {
    this.lambda = lambda
    this
  }

  /**
   * Run ALS with the configured parameters on an input RDD of (user, product, rating) triples.
   * Returns a MatrixFactorizationModel with feature vectors for each user and product.
   */
  def train(ratings: RDD[(Int, Int, Double)]): MatrixFactorizationModel = {
    // Auto-configure the block count from the cluster if the caller did not set one.
    val numBlocks = if (this.numBlocks == -1) {
      math.max(ratings.context.defaultParallelism, ratings.partitions.size)
    } else {
      this.numBlocks
    }

    val partitioner = new HashPartitioner(numBlocks)

    // Assign each rating to a user block and a product block by ID modulo numBlocks.
    // NOTE(review): `%` in Scala preserves sign, so negative IDs would yield
    // negative block indices — assumes IDs are non-negative.
    val ratingsByUserBlock = ratings.map{ case (u, p, r) => (u % numBlocks, (u, p, r)) }
    val ratingsByProductBlock = ratings.map{ case (u, p, r) => (p % numBlocks, (p, u, r)) }

    val (userInLinks, userOutLinks) = makeLinkRDDs(numBlocks, ratingsByUserBlock)
    val (productInLinks, productOutLinks) = makeLinkRDDs(numBlocks, ratingsByProductBlock)

    // Initialize user and product factors randomly, with a per-element seed derived
    // from a single run-level seed so factors are reproducible within one run.
    val seed = new Random().nextInt()
    var users = userOutLinks.mapValues(_.elementIds.map(u => randomFactor(rank, seed ^ u)))
    var products = productOutLinks.mapValues(_.elementIds.map(p => randomFactor(rank, seed ^ ~p)))

    for (iter <- 0 until iterations) {
      // perform ALS update: products are recomputed from users, then users from products.
      products = updateFeatures(users, userOutLinks, productInLinks, partitioner, rank, lambda)
      users = updateFeatures(products, productOutLinks, userInLinks, partitioner, rank, lambda)
    }

    // Flatten and cache the two final RDDs to un-block them
    val usersOut = users.join(userOutLinks).flatMap { case (b, (factors, outLinkBlock)) =>
      for (i <- 0 until factors.length) yield (outLinkBlock.elementIds(i), factors(i))
    }
    val productsOut = products.join(productOutLinks).flatMap { case (b, (factors, outLinkBlock)) =>
      for (i <- 0 until factors.length) yield (outLinkBlock.elementIds(i), factors(i))
    }
    usersOut.persist()
    productsOut.persist()
    new MatrixFactorizationModel(rank, usersOut, productsOut)
  }

  /**
   * Make the out-links table for a block of the users (or products) dataset given the list of
   * (user, product, rating) values for the users in that block (or the opposite for products).
   */
  private def makeOutLinkBlock(numBlocks: Int, ratings: Array[(Int, Int, Double)]): OutLinkBlock = {
    val userIds = ratings.map(_._1).distinct.sorted
    val numUsers = userIds.length
    val userIdToPos = userIds.zipWithIndex.toMap
    // shouldSend(i) collects the product blocks containing at least one product rated
    // by the i'th user of this block.
    val shouldSend = Array.fill(numUsers)(new BitSet(numBlocks))
    for ((u, p, r) <- ratings) {
      shouldSend(userIdToPos(u))(p % numBlocks) = true
    }
    OutLinkBlock(userIds, shouldSend)
  }

  /**
   * Make the in-links table for a block of the users (or products) dataset given a list of
   * (user, product, rating) values for the users in that block (or the opposite for products).
   */
  private def makeInLinkBlock(numBlocks: Int, ratings: Array[(Int, Int, Double)]): InLinkBlock = {
    val userIds = ratings.map(_._1).distinct.sorted
    val numUsers = userIds.length
    val userIdToPos = userIds.zipWithIndex.toMap
    val ratingsForBlock = new Array[Array[(Array[Int], Array[Double])]](numBlocks)
    for (productBlock <- 0 until numBlocks) {
      // Ratings whose product lives in this source block, grouped per product and
      // sorted by product ID — the same order the products arrive in messages.
      val ratingsInBlock = ratings.filter(t => t._2 % numBlocks == productBlock)
      val ratingsByProduct = ratingsInBlock.groupBy(_._2)  // (p, Seq[(u, p, r)])
        .toArray
        .sortBy(_._1)
        .map{case (p, rs) => (rs.map(t => userIdToPos(t._1)), rs.map(_._3))}
      ratingsForBlock(productBlock) = ratingsByProduct
    }
    InLinkBlock(userIds, ratingsForBlock)
  }

  /**
   * Make RDDs of InLinkBlocks and OutLinkBlocks given an RDD of (blockId, (u, p, r)) values for
   * the users (or (blockId, (p, u, r)) for the products). We create these simultaneously to avoid
   * having to shuffle the (blockId, (u, p, r)) RDD twice, or to cache it.
   */
  private def makeLinkRDDs(numBlocks: Int, ratings: RDD[(Int, (Int, Int, Double))])
    : (RDD[(Int, InLinkBlock)], RDD[(Int, OutLinkBlock)]) =
  {
    val grouped = ratings.partitionBy(new HashPartitioner(numBlocks))
    val links = grouped.mapPartitionsWithIndex((blockId, elements) => {
      val ratings = elements.map(_._2).toArray
      val inLinkBlock = makeInLinkBlock(numBlocks, ratings)
      val outLinkBlock = makeOutLinkBlock(numBlocks, ratings)
      Iterator.single((blockId, (inLinkBlock, outLinkBlock)))
    }, true)
    // Cache the combined RDD so both projections below reuse one computation.
    links.persist(StorageLevel.MEMORY_AND_DISK)
    (links.mapValues(_._1), links.mapValues(_._2))
  }

  /**
   * Make a random factor vector with the given seed.
   * TODO: Initialize things using mapPartitionsWithIndex to make it faster?
   */
  private def randomFactor(rank: Int, seed: Int): Array[Double] = {
    val rand = new Random(seed)
    Array.fill(rank)(rand.nextDouble)
  }

  /**
   * Compute the user feature vectors given the current products (or vice-versa). This first joins
   * the products with their out-links to generate a set of messages to each destination block
   * (specifically, the features for the products that user block cares about), then groups these
   * by destination and joins them with the in-link info to figure out how to update each user.
   * It returns an RDD of new feature vectors for each user block.
   */
  private def updateFeatures(
      products: RDD[(Int, Array[Array[Double]])],
      productOutLinks: RDD[(Int, OutLinkBlock)],
      userInLinks: RDD[(Int, InLinkBlock)],
      partitioner: Partitioner,
      rank: Int,
      lambda: Double)
    : RDD[(Int, Array[Array[Double]])] =
  {
    val numBlocks = products.partitions.size
    productOutLinks.join(products).flatMap { case (bid, (outLinkBlock, factors)) =>
      // Build one buffer of factor vectors per destination user block, keeping only
      // the products that block actually needs (per the out-link bitsets).
      val toSend = Array.fill(numBlocks)(new ArrayBuffer[Array[Double]])
      for (p <- 0 until outLinkBlock.elementIds.length; userBlock <- 0 until numBlocks) {
        if (outLinkBlock.shouldSend(p)(userBlock)) {
          toSend(userBlock) += factors(p)
        }
      }
      // Tag each message with the sending block ID so the receiver can re-sort.
      toSend.zipWithIndex.map{ case (buf, idx) => (idx, (bid, buf.toArray)) }
    }.groupByKey(partitioner)
     .join(userInLinks)
     .mapValues{ case (messages, inLinkBlock) => updateBlock(messages, inLinkBlock, rank, lambda) }
  }

  /**
   * Compute the new feature vectors for a block of the users matrix given the list of factors
   * it received from each product and its InLinkBlock.
   */
  def updateBlock(messages: Seq[(Int, Array[Array[Double]])], inLinkBlock: InLinkBlock,
      rank: Int, lambda: Double)
    : Array[Array[Double]] =
  {
    // Sort the incoming block factor messages by block ID and make them an array
    val blockFactors = messages.sortBy(_._1).map(_._2).toArray // Array[Array[Double]]
    val numBlocks = blockFactors.length
    val numUsers = inLinkBlock.elementIds.length

    // We'll sum up the XtXes using vectors that represent only the lower-triangular part, since
    // the matrices are symmetric
    val triangleSize = rank * (rank + 1) / 2
    val userXtX = Array.fill(numUsers)(DoubleMatrix.zeros(triangleSize))
    val userXy = Array.fill(numUsers)(DoubleMatrix.zeros(rank))

    // Some temp variables to avoid memory allocation
    val tempXtX = DoubleMatrix.zeros(triangleSize)
    val fullXtX = DoubleMatrix.zeros(rank, rank)

    // Compute the XtX and Xy values for each user by adding products it rated in each product block
    for (productBlock <- 0 until numBlocks) {
      for (p <- 0 until blockFactors(productBlock).length) {
        val x = new DoubleMatrix(blockFactors(productBlock)(p))
        fillXtX(x, tempXtX)
        val (us, rs) = inLinkBlock.ratingsForBlock(productBlock)(p)
        for (i <- 0 until us.length) {
          // Accumulate x * x^T (triangular) and rating * x into this user's normal equations.
          userXtX(us(i)).addi(tempXtX)
          SimpleBlas.axpy(rs(i), x, userXy(us(i)))
        }
      }
    }

    // Solve the least-squares problem for each user and return the new feature vectors
    userXtX.zipWithIndex.map{ case (triangularXtX, index) =>
      // Compute the full XtX matrix from the lower-triangular part we got above
      fillFullMatrix(triangularXtX, fullXtX)
      // Add regularization
      (0 until rank).foreach(i => fullXtX.data(i*rank + i) += lambda)
      // Solve the resulting matrix, which is symmetric and positive-definite
      Solve.solvePositive(fullXtX, userXy(index)).data
    }
  }

  /**
   * Set xtxDest to the lower-triangular part of x transpose * x. For efficiency in summing
   * these matrices, we store xtxDest as only rank * (rank+1) / 2 values, namely the values
   * at (0,0), (1,0), (1,1), (2,0), (2,1), (2,2), etc in that order.
   */
  private def fillXtX(x: DoubleMatrix, xtxDest: DoubleMatrix) {
    var i = 0
    var pos = 0
    while (i < x.length) {
      var j = 0
      while (j <= i) {
        xtxDest.data(pos) = x.data(i) * x.data(j)
        pos += 1
        j += 1
      }
      i += 1
    }
  }

  /**
   * Given a triangular matrix in the order of fillXtX above, compute the full symmetric square
   * matrix that it represents, storing it into destMatrix.
   */
  private def fillFullMatrix(triangularMatrix: DoubleMatrix, destMatrix: DoubleMatrix) {
    val rank = destMatrix.rows
    var i = 0
    var pos = 0
    while (i < rank) {
      var j = 0
      while (j <= i) {
        // Mirror each triangular entry into both (i, j) and (j, i).
        destMatrix.data(i*rank + j) = triangularMatrix.data(pos)
        destMatrix.data(j*rank + i) = triangularMatrix.data(pos)
        pos += 1
        j += 1
      }
      i += 1
    }
  }
}
/**
 * Top-level methods for calling Alternating Least Squares (ALS) matrix factorization.
 */
object ALS {
  /**
   * Train a matrix factorization model given an RDD of ratings given by users to some products,
   * in the form of (userID, productID, rating) pairs. We approximate the ratings matrix as the
   * product of two lower-rank matrices of a given rank (number of features). To solve for these
   * features, we run a given number of iterations of ALS. This is done using a level of
   * parallelism given by `blocks`.
   *
   * @param ratings    RDD of (userID, productID, rating) pairs
   * @param rank       number of features to use
   * @param iterations number of iterations of ALS (recommended: 10-20)
   * @param lambda     regularization factor (recommended: 0.01)
   * @param blocks     level of parallelism to split computation into
   */
  def train(
      ratings: RDD[(Int, Int, Double)],
      rank: Int,
      iterations: Int,
      lambda: Double,
      blocks: Int): MatrixFactorizationModel = {
    new ALS(blocks, rank, iterations, lambda).train(ratings)
  }

  /**
   * Same as the five-argument `train`, but the level of parallelism is determined
   * automatically based on the number of partitions in `ratings`.
   *
   * @param ratings    RDD of (userID, productID, rating) pairs
   * @param rank       number of features to use
   * @param iterations number of iterations of ALS (recommended: 10-20)
   * @param lambda     regularization factor (recommended: 0.01)
   */
  def train(ratings: RDD[(Int, Int, Double)], rank: Int, iterations: Int, lambda: Double)
    : MatrixFactorizationModel = {
    train(ratings, rank, iterations, lambda, -1)
  }

  /**
   * Same as above with the default regularization factor (0.01); the level of parallelism
   * is determined automatically based on the number of partitions in `ratings`.
   *
   * @param ratings    RDD of (userID, productID, rating) pairs
   * @param rank       number of features to use
   * @param iterations number of iterations of ALS (recommended: 10-20)
   */
  def train(ratings: RDD[(Int, Int, Double)], rank: Int, iterations: Int)
    : MatrixFactorizationModel = {
    train(ratings, rank, iterations, 0.01, -1)
  }

  /** Command-line entry point: trains on a CSV ratings file and saves the factors as text. */
  def main(args: Array[String]) {
    if (args.length != 5) {
      println("Usage: ALS <master> <ratings_file> <rank> <iterations> <output_dir>")
      System.exit(1)
    }
    val master = args(0)
    val ratingsFile = args(1)
    val rank = args(2).toInt
    val iters = args(3).toInt
    val outputDir = args(4)
    val sc = new SparkContext(master, "ALS")
    // Each input line is "user,product,rating"; extra fields are ignored.
    val ratings = sc.textFile(ratingsFile).map { line =>
      val fields = line.split(',')
      (fields(0).toInt, fields(1).toInt, fields(2).toDouble)
    }
    val model = ALS.train(ratings, rank, iters)
    model.userFeatures.map{ case (id, vec) => id + "," + vec.mkString(" ") }
      .saveAsTextFile(outputDir + "/userFeatures")
    model.productFeatures.map{ case (id, vec) => id + "," + vec.mkString(" ") }
      .saveAsTextFile(outputDir + "/productFeatures")
    println("Final user/product features written to " + outputDir)
    System.exit(0)
  }
}
package spark.mllib.recommendation
import spark.RDD
import spark.SparkContext._
import org.jblas._
/**
 * The result of ALS training: a rank together with per-user and per-product
 * feature vectors, keyed by the original integer IDs.
 */
class MatrixFactorizationModel(
    val rank: Int,
    val userFeatures: RDD[(Int, Array[Double])],
    val productFeatures: RDD[(Int, Array[Double])])
  extends Serializable
{
  /** Predict the rating of one user for one product. */
  def predict(user: Int, product: Int): Double = {
    // lookup() scans the RDD for the key; head assumes the ID is present.
    val uVec = new DoubleMatrix(userFeatures.lookup(user).head)
    val pVec = new DoubleMatrix(productFeatures.lookup(product).head)
    uVec.dot(pVec)
  }

  // TODO: Figure out what good bulk prediction methods would look like.
  // Probably want a way to get the top users for a product or vice-versa.
}
package spark.mllib.regression
import spark.{Logging, RDD, SparkContext}
import spark.mllib.optimization._
import spark.mllib.util.MLUtils
import org.jblas.DoubleMatrix
/**
 * Logistic Regression using Stochastic Gradient Descent.
 * Based on Matlab code written by John Duchi.
 */
class LogisticRegressionModel(
    val weights: DoubleMatrix,
    val intercept: Double,
    val losses: Array[Double]) extends RegressionModel {

  override def predict(testData: spark.RDD[Array[Double]]) = {
    testData.map { features =>
      val row = new DoubleMatrix(1, features.length, features:_*)
      val margin = row.mmul(this.weights).get(0) + this.intercept
      // Sigmoid link: maps the linear margin to a probability in (0, 1).
      1.0 / (1.0 + math.exp(-margin))
    }
  }

  override def predict(testData: Array[Double]): Double = {
    val row = new DoubleMatrix(1, testData.length, testData:_*)
    val margin = row.mmul(this.weights).get(0) + this.intercept
    1.0 / (1.0 + math.exp(-margin))
  }
}
/**
 * Trains a logistic regression model by mini-batch stochastic gradient descent
 * over the logistic loss. Configure via the fluent setters, then call train().
 */
class LogisticRegression private (var stepSize: Double, var miniBatchFraction: Double,
    var numIters: Int)
  extends Logging {

  /**
   * Construct a LogisticRegression object with default parameters
   */
  def this() = this(1.0, 1.0, 100)

  /**
   * Set the step size per-iteration of SGD. Default 1.0.
   */
  def setStepSize(step: Double) = {
    stepSize = step
    this
  }

  /**
   * Set fraction of data to be used for each SGD iteration. Default 1.0.
   */
  def setMiniBatchFraction(fraction: Double) = {
    miniBatchFraction = fraction
    this
  }

  /**
   * Set the number of iterations for SGD. Default 100.
   */
  def setNumIterations(iters: Int) = {
    numIters = iters
    this
  }

  /**
   * Fit a logistic regression model on an RDD of (label, features) pairs.
   */
  def train(input: RDD[(Double, Array[Double])]): LogisticRegressionModel = {
    // Prepend a constant 1.0 feature so the first learned weight plays the
    // role of the intercept.
    val dataWithBias = input.map { case (label, features) =>
      (label, 1.0 +: features)
    }

    val (allWeights, losses) = GradientDescent.runMiniBatchSGD(
      dataWithBias, new LogisticGradient(), new SimpleUpdater(), stepSize, numIters,
      miniBatchFraction)

    // Split the learned vector back into intercept (position 0) and feature weights.
    val featureWeights = allWeights.getRange(1, allWeights.length)
    val bias = allWeights.get(0)
    val model = new LogisticRegressionModel(featureWeights, bias, losses)

    logInfo("Final model weights " + model.weights)
    logInfo("Final model intercept " + model.intercept)
    logInfo("Last 10 losses " + model.losses.takeRight(10).mkString(", "))
    model
  }
}
/**
 * Top-level methods for calling Logistic Regression.
 */
object LogisticRegression {
  /**
   * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed
   * number of iterations of gradient descent using the specified step size. Each iteration uses
   * `miniBatchFraction` fraction of the data to calculate the gradient.
   *
   * @param input RDD of (label, array of features) pairs.
   * @param numIterations Number of iterations of gradient descent to run.
   * @param stepSize Step size to be used for each iteration of gradient descent.
   * @param miniBatchFraction Fraction of data to be used per iteration.
   */
  def train(
      input: RDD[(Double, Array[Double])],
      numIterations: Int,
      stepSize: Double,
      miniBatchFraction: Double): LogisticRegressionModel = {
    new LogisticRegression(stepSize, miniBatchFraction, numIterations).train(input)
  }

  /**
   * Same as the four-argument `train`, but the entire data set is used to update the
   * gradient in each iteration (miniBatchFraction = 1.0).
   *
   * @param input RDD of (label, array of features) pairs.
   * @param numIterations Number of iterations of gradient descent to run.
   * @param stepSize Step size to be used for each iteration of Gradient Descent.
   * @return a LogisticRegressionModel which has the weights and offset from training.
   */
  def train(
      input: RDD[(Double, Array[Double])],
      numIterations: Int,
      stepSize: Double): LogisticRegressionModel = {
    train(input, numIterations, stepSize, 1.0)
  }

  /**
   * Same as above with a step size of 1.0 and the entire data set used for every
   * gradient update.
   *
   * @param input RDD of (label, array of features) pairs.
   * @param numIterations Number of iterations of gradient descent to run.
   * @return a LogisticRegressionModel which has the weights and offset from training.
   */
  def train(
      input: RDD[(Double, Array[Double])],
      numIterations: Int): LogisticRegressionModel = {
    train(input, numIterations, 1.0, 1.0)
  }

  /** Command-line entry point. */
  def main(args: Array[String]) {
    if (args.length != 4) {
      println("Usage: LogisticRegression <master> <input_dir> <step_size> <niters>")
      System.exit(1)
    }
    val sc = new SparkContext(args(0), "LogisticRegression")
    val trainingData = MLUtils.loadData(sc, args(1))
    val model = LogisticRegression.train(trainingData, args(3).toInt, args(2).toDouble)
    sc.stop()
  }
}
package spark.mllib.regression
import scala.util.Random
import org.jblas.DoubleMatrix
import spark.{RDD, SparkContext}
import spark.mllib.util.MLUtils
object LogisticRegressionGenerator {

  /**
   * Generate a balanced, synthetic binary classification data set and save it with
   * MLUtils.saveData. Labels alternate 0/1 by example index; features for class y
   * are drawn from a Gaussian centered at y * eps, so the classes are separated.
   *
   * Usage: LogisticRegressionGenerator
   *        <master> <output_dir> <num_examples> <num_features> <num_partitions>
   */
  def main(args: Array[String]) {
    if (args.length != 5) {
      println("Usage: LogisticRegressionGenerator " +
        "<master> <output_dir> <num_examples> <num_features> <num_partitions>")
      System.exit(1)
    }

    val sparkMaster: String = args(0)
    val outputPath: String = args(1)
    // The arity check above guarantees all five arguments are present, so the
    // former `if (args.length > N) ... else default` fallbacks were dead code.
    val nexamples: Int = args(2).toInt
    val nfeatures: Int = args(3).toInt
    val parts: Int = args(4).toInt
    // Distance between the two class centers along every feature dimension.
    val eps = 3

    val sc = new SparkContext(sparkMaster, "LogisticRegressionGenerator")

    val data: RDD[(Double, Array[Double])] = sc.parallelize(0 until nexamples, parts).map { idx =>
      // Per-example seed keeps the generated data deterministic across runs.
      val rnd = new Random(42 + idx)
      // Alternate labels so the two classes are exactly balanced.
      val y = if (idx % 2 == 0) 0 else 1
      val x = Array.fill[Double](nfeatures) {
        rnd.nextGaussian() + (y * eps)
      }
      (y, x)
    }

    MLUtils.saveData(data, outputPath)
    sc.stop()
  }
}
package spark.mllib.regression
import spark.RDD
/**
 * A model produced by a regression algorithm, able to score both a distributed
 * data set and a single data point.
 */
trait RegressionModel {
  /**
   * Predict values for the given data set using the model trained.
   *
   * @param testData RDD representing data points to be predicted
   * @return RDD[Double] where each entry contains the corresponding prediction
   */
  def predict(testData: RDD[Array[Double]]): RDD[Double]

  /**
   * Predict values for a single data point using the model trained.
   *
   * @param testData array representing a single data point
   * @return Double prediction from the trained model
   */
  def predict(testData: Array[Double]): Double
}
package spark.mllib.regression
import spark.{Logging, RDD, SparkContext}
import spark.SparkContext._
import spark.mllib.util.MLUtils
import org.jblas.DoubleMatrix
import org.jblas.Solve
/**
 * Ridge Regression from Joseph Gonzalez's implementation in MLBase
 */
class RidgeRegressionModel(
    val weights: DoubleMatrix,
    val intercept: Double,
    val lambdaOpt: Double,
    val lambdas: List[(Double, Double, DoubleMatrix)])
  extends RegressionModel {

  override def predict(testData: RDD[Array[Double]]): RDD[Double] = {
    // Score every point with the learned linear model.
    testData.map { features =>
      val row = new DoubleMatrix(1, features.length, features:_*)
      row.mmul(this.weights).get(0) + this.intercept
    }
  }

  override def predict(testData: Array[Double]): Double = {
    val row = new DoubleMatrix(1, testData.length, testData:_*)
    row.mmul(this.weights).get(0) + this.intercept
  }
}
/**
 * Fits a ridge regression model, choosing the regularization parameter lambda by
 * a recursive interval search over the generalized cross-validation (GCV) error.
 */
class RidgeRegression private (var lambdaLow: Double, var lambdaHigh: Double)
  extends Logging {

  def this() = this(0.0, 100.0)

  /**
   * Set the lower bound on binary search for lambda's. Default is 0.
   */
  def setLowLambda(low: Double) = {
    this.lambdaLow = low
    this
  }

  /**
   * Set the upper bound on binary search for lambda's. Default is 100.0.
   */
  def setHighLambda(hi: Double) = {
    this.lambdaHigh = hi
    this
  }

  /**
   * Fit the model on an RDD of (response, features) pairs. Features are normalized
   * to zero mean / unit variance before solving; the returned weights and intercept
   * are rescaled back to the original feature space.
   */
  def train(input: RDD[(Double, Array[Double])]): RidgeRegressionModel = {
    // Feature count is read off the first example; assumes `input` is non-empty.
    val nfeatures: Int = input.take(1)(0)._2.length
    val nexamples: Long = input.count()

    val (yMean, xColMean, xColSd) = MLUtils.computeStats(input, nfeatures, nexamples)

    // Center the response and standardize each feature column.
    val data = input.map { case(y, features) =>
      val yNormalized = y - yMean
      val featuresMat = new DoubleMatrix(nfeatures, 1, features:_*)
      val featuresNormalized = featuresMat.sub(xColMean).divi(xColSd)
      (yNormalized, featuresNormalized.toArray)
    }

    // Compute XtX - Size of XtX is nfeatures by nfeatures
    val XtX: DoubleMatrix = data.map { case (y, features) =>
      val x = new DoubleMatrix(1, features.length, features:_*)
      x.transpose().mmul(x)
    }.reduce(_.addi(_))

    // Compute Xt*y - Size of Xty is nfeatures by 1
    val Xty: DoubleMatrix = data.map { case (y, features) =>
      new DoubleMatrix(features.length, 1, features:_*).mul(y)
    }.reduce(_.addi(_))

    // Define a function to compute the leave one out cross validation error
    // for a single example
    def crossValidate(lambda: Double): (Double, Double, DoubleMatrix) = {
      // Compute the MLE ridge regression parameter value
      // Ridge Regression parameter = inv(XtX + \lambda*I) * Xty
      val XtXlambda = DoubleMatrix.eye(nfeatures).muli(lambda).addi(XtX)
      val w = Solve.solveSymmetric(XtXlambda, Xty)

      val invXtX = Solve.solveSymmetric(XtXlambda, DoubleMatrix.eye(nfeatures))

      // compute the generalized cross validation score
      // (leave-one-out residual scaled by 1 - H_ii, the point's leverage)
      val cverror = data.map {
        case (y, features) =>
          val x = new DoubleMatrix(features.length, 1, features:_*)
          val yhat = w.transpose().mmul(x).get(0)
          val H_ii = x.transpose().mmul(invXtX).mmul(x).get(0)
          val residual = (y - yhat) / (1.0 - H_ii)
          residual * residual
      }.reduce(_ + _) / nexamples

      (lambda, cverror, w)
    }

    // Binary search for the best assignment to lambda.
    // NOTE(review): not tail-recursive (the recursive call sits under `::`), so a
    // pathologically small tolerance relative to the interval could deepen the stack.
    def binSearch(low: Double, high: Double): List[(Double, Double, DoubleMatrix)] = {
      val mid = (high - low) / 2 + low
      // Evaluate the midpoints of the two half-intervals and keep the better half
      // (widened by a quarter interval) for the next round.
      val lowValue = crossValidate((mid - low) / 2 + low)
      val highValue = crossValidate((high - mid) / 2 + mid)
      val (newLow, newHigh) = if (lowValue._2 < highValue._2) {
        (low, mid + (high-low)/4)
      } else {
        (mid - (high-low)/4, high)
      }
      if (newHigh - newLow > 1.0E-7) {
        // :: is list prepend in Scala.
        lowValue :: highValue :: binSearch(newLow, newHigh)
      } else {
        List(lowValue, highValue)
      }
    }

    // Actually compute the best lambda
    val lambdas = binSearch(lambdaLow, lambdaHigh).sortBy(_._1)

    // Find the best parameter set by taking the lowest cverror.
    val (lambdaOpt, cverror, weights) = lambdas.reduce((a, b) => if (a._2 < b._2) a else b)

    // Return the model which contains the solution
    // (undo the feature standardization so the model applies to raw features)
    val weightsScaled = weights.div(xColSd)
    val intercept = yMean - (weights.transpose().mmul(xColMean.div(xColSd)).get(0))
    val model = new RidgeRegressionModel(weightsScaled, intercept, lambdaOpt, lambdas)

    logInfo("RidgeRegression: optimal lambda " + model.lambdaOpt)
    logInfo("RidgeRegression: optimal weights " + model.weights)
    logInfo("RidgeRegression: optimal intercept " + model.intercept)
    logInfo("RidgeRegression: cross-validation error " + cverror)
    model
  }
}
/**
 * Top-level methods for calling Ridge Regression.
 */
object RidgeRegression {
  /**
   * Train a ridge regression model given an RDD of (response, features) pairs.
   * We use the closed form solution to compute the cross-validation score for
   * a given lambda. The optimal lambda is computed by performing binary search
   * between the provided bounds of lambda.
   *
   * @param input RDD of (response, array of features) pairs.
   * @param lambdaLow lower bound used in binary search for lambda
   * @param lambdaHigh upper bound used in binary search for lambda
   */
  def train(
      input: RDD[(Double, Array[Double])],
      lambdaLow: Double,
      lambdaHigh: Double): RidgeRegressionModel = {
    new RidgeRegression(lambdaLow, lambdaHigh).train(input)
  }

  /**
   * Same as the three-argument `train`, searching for the optimal lambda between
   * 0 and 100.
   *
   * @param input RDD of (response, array of features) pairs.
   */
  def train(input: RDD[(Double, Array[Double])]) : RidgeRegressionModel = {
    train(input, 0.0, 100.0)
  }

  /** Command-line entry point. */
  def main(args: Array[String]) {
    if (args.length != 2) {
      println("Usage: RidgeRegression <master> <input_dir>")
      System.exit(1)
    }
    val sc = new SparkContext(args(0), "RidgeRegression")
    val trainingData = MLUtils.loadData(sc, args(1))
    // NOTE(review): searches lambda in [0, 1000] here, unlike the documented
    // 0-100 default of the no-bounds overload — confirm this is intentional.
    val model = RidgeRegression.train(trainingData, 0, 1000)
    sc.stop()
  }
}
package spark.mllib.regression
import scala.util.Random
import org.jblas.DoubleMatrix
import spark.{RDD, SparkContext}
import spark.mllib.util.MLUtils
object RidgeRegressionGenerator {

  /**
   * Generate synthetic ridge-regression data of the form y = X * w + noise, where w
   * is a fixed random weight vector with two deliberately large coordinates, and
   * save the (label, features) pairs with MLUtils.saveData.
   *
   * Usage: RidgeRegressionGenerator
   *        <master> <output_dir> <num_examples> <num_features> <num_partitions>
   */
  def main(args: Array[String]) {
    if (args.length != 5) {
      println("Usage: RidgeRegressionGenerator " +
        "<master> <output_dir> <num_examples> <num_features> <num_partitions>")
      System.exit(1)
    }

    val sparkMaster: String = args(0)
    val outputPath: String = args(1)
    // The arity check above guarantees all five arguments are present, so the
    // former `if (args.length > N) ... else default` fallbacks were dead code.
    val nexamples: Int = args(2).toInt
    val nfeatures: Int = args(3).toInt
    val parts: Int = args(4).toInt
    // Standard deviation of the additive Gaussian label noise.
    val eps = 10

    org.jblas.util.Random.seed(42)
    val sc = new SparkContext(sparkMaster, "RidgeRegressionGenerator")

    // True weights: uniform in [-0.5, 0.5], with the first two coordinates forced
    // to 10 so the signal is dominated by two known features.
    val w = DoubleMatrix.rand(nfeatures, 1).subi(0.5)
    w.put(0, 0, 10)
    w.put(1, 0, 10)

    // Each partition generates its share of examples with its own deterministic seeds.
    // NOTE: integer division means up to `parts - 1` examples may be dropped overall.
    val data: RDD[(Double, Array[Double])] = sc.parallelize(0 until parts, parts).flatMap { p =>
      org.jblas.util.Random.seed(42 + p)
      val examplesInPartition = nexamples / parts

      val X = DoubleMatrix.rand(examplesInPartition, nfeatures)
      val y = X.mmul(w)

      val rnd = new Random(42 + p)
      val normalValues = Array.fill[Double](examplesInPartition)(rnd.nextGaussian() * eps)
      val yObs = new DoubleMatrix(normalValues).addi(y)

      Iterator.tabulate(examplesInPartition) { i =>
        (yObs.get(i, 0), X.getRow(i).toArray)
      }
    }

    MLUtils.saveData(data, outputPath)
    sc.stop()
  }
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment