Skip to content
Snippets Groups Projects
Commit 70f6f964 authored by Xin Ren's avatar Xin Ren Committed by Xiangrui Meng
Browse files

[SPARK-13013][DOCS] Replace example code in mllib-clustering.md using include_example

Replace example code in mllib-clustering.md using include_example
https://issues.apache.org/jira/browse/SPARK-13013

The example code in the user guide is embedded in the markdown and hence it is not easy to test. It would be nice to automatically test them. This JIRA is to discuss options to automate example code testing and see what we can do in Spark 1.6.

Goal is to move actual example code to spark/examples and test compilation in Jenkins builds. Then in the markdown, we can reference part of the code to show in the user guide. This requires adding a Jekyll tag that is similar to https://github.com/jekyll/jekyll/blob/master/lib/jekyll/tags/include.rb, e.g., called include_example.
`{% include_example scala/org/apache/spark/examples/mllib/KMeansExample.scala %}`
Jekyll will find `examples/src/main/scala/org/apache/spark/examples/mllib/KMeansExample.scala` and pick code blocks marked "example" and replace code block in
`{% highlight %}`
 in the markdown.

See more sub-tasks in parent ticket: https://issues.apache.org/jira/browse/SPARK-11337

Author: Xin Ren <iamshrek@126.com>

Closes #11116 from keypointt/SPARK-13013.
parent 645c3a85
No related branches found
No related tags found
No related merge requests found
Showing
with 715 additions and 447 deletions
(1.0), [1.7, 0.4, 0.9]
(2.0), [2.2, 1.8, 0.0]
...@@ -49,27 +49,7 @@ optimal *k* is usually one where there is an "elbow" in the WSSSE graph. ...@@ -49,27 +49,7 @@ optimal *k* is usually one where there is an "elbow" in the WSSSE graph.
Refer to the [`KMeans` Scala docs](api/scala/index.html#org.apache.spark.mllib.clustering.KMeans) and [`KMeansModel` Scala docs](api/scala/index.html#org.apache.spark.mllib.clustering.KMeansModel) for details on the API. Refer to the [`KMeans` Scala docs](api/scala/index.html#org.apache.spark.mllib.clustering.KMeans) and [`KMeansModel` Scala docs](api/scala/index.html#org.apache.spark.mllib.clustering.KMeansModel) for details on the API.
{% highlight scala %} {% include_example scala/org/apache/spark/examples/mllib/KMeansExample.scala %}
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.Vectors
// Load and parse the data
val data = sc.textFile("data/mllib/kmeans_data.txt")
val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache()
// Cluster the data into two classes using KMeans
val numClusters = 2
val numIterations = 20
val clusters = KMeans.train(parsedData, numClusters, numIterations)
// Evaluate clustering by computing Within Set Sum of Squared Errors
val WSSSE = clusters.computeCost(parsedData)
println("Within Set Sum of Squared Errors = " + WSSSE)
// Save and load model
clusters.save(sc, "myModelPath")
val sameModel = KMeansModel.load(sc, "myModelPath")
{% endhighlight %}
</div> </div>
<div data-lang="java" markdown="1"> <div data-lang="java" markdown="1">
...@@ -81,51 +61,7 @@ that is equivalent to the provided example in Scala is given below: ...@@ -81,51 +61,7 @@ that is equivalent to the provided example in Scala is given below:
Refer to the [`KMeans` Java docs](api/java/org/apache/spark/mllib/clustering/KMeans.html) and [`KMeansModel` Java docs](api/java/org/apache/spark/mllib/clustering/KMeansModel.html) for details on the API. Refer to the [`KMeans` Java docs](api/java/org/apache/spark/mllib/clustering/KMeans.html) and [`KMeansModel` Java docs](api/java/org/apache/spark/mllib/clustering/KMeansModel.html) for details on the API.
{% highlight java %} {% include_example java/org/apache/spark/examples/mllib/JavaKMeansExample.java %}
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.clustering.KMeans;
import org.apache.spark.mllib.clustering.KMeansModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.SparkConf;
public class KMeansExample {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("K-means Example");
JavaSparkContext sc = new JavaSparkContext(conf);
// Load and parse data
String path = "data/mllib/kmeans_data.txt";
JavaRDD<String> data = sc.textFile(path);
JavaRDD<Vector> parsedData = data.map(
new Function<String, Vector>() {
public Vector call(String s) {
String[] sarray = s.split(" ");
double[] values = new double[sarray.length];
for (int i = 0; i < sarray.length; i++)
values[i] = Double.parseDouble(sarray[i]);
return Vectors.dense(values);
}
}
);
parsedData.cache();
// Cluster the data into two classes using KMeans
int numClusters = 2;
int numIterations = 20;
KMeansModel clusters = KMeans.train(parsedData.rdd(), numClusters, numIterations);
// Evaluate clustering by computing Within Set Sum of Squared Errors
double WSSSE = clusters.computeCost(parsedData.rdd());
System.out.println("Within Set Sum of Squared Errors = " + WSSSE);
// Save and load model
clusters.save(sc.sc(), "myModelPath");
KMeansModel sameModel = KMeansModel.load(sc.sc(), "myModelPath");
}
}
{% endhighlight %}
</div> </div>
<div data-lang="python" markdown="1"> <div data-lang="python" markdown="1">
...@@ -138,27 +74,7 @@ fact the optimal *k* is usually one where there is an "elbow" in the WSSSE graph ...@@ -138,27 +74,7 @@ fact the optimal *k* is usually one where there is an "elbow" in the WSSSE graph
Refer to the [`KMeans` Python docs](api/python/pyspark.mllib.html#pyspark.mllib.clustering.KMeans) and [`KMeansModel` Python docs](api/python/pyspark.mllib.html#pyspark.mllib.clustering.KMeansModel) for more details on the API. Refer to the [`KMeans` Python docs](api/python/pyspark.mllib.html#pyspark.mllib.clustering.KMeans) and [`KMeansModel` Python docs](api/python/pyspark.mllib.html#pyspark.mllib.clustering.KMeansModel) for more details on the API.
{% highlight python %} {% include_example python/mllib/k_means_example.py %}
from pyspark.mllib.clustering import KMeans, KMeansModel
from numpy import array
from math import sqrt
# Load and parse the data
data = sc.textFile("data/mllib/kmeans_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))
# Build the model (cluster the data)
clusters = KMeans.train(parsedData, 2, maxIterations=10,
runs=10, initializationMode="random")
# Evaluate clustering by computing Within Set Sum of Squared Errors
WSSSE = clusters.computeCost(parsedData)
print("Within Set Sum of Squared Error = " + str(WSSSE))
# Save and load model
clusters.save(sc, "myModelPath")
sameModel = KMeansModel.load(sc, "myModelPath")
{% endhighlight %}
</div> </div>
</div> </div>
...@@ -188,29 +104,7 @@ to the algorithm. We then output the parameters of the mixture model. ...@@ -188,29 +104,7 @@ to the algorithm. We then output the parameters of the mixture model.
Refer to the [`GaussianMixture` Scala docs](api/scala/index.html#org.apache.spark.mllib.clustering.GaussianMixture) and [`GaussianMixtureModel` Scala docs](api/scala/index.html#org.apache.spark.mllib.clustering.GaussianMixtureModel) for details on the API. Refer to the [`GaussianMixture` Scala docs](api/scala/index.html#org.apache.spark.mllib.clustering.GaussianMixture) and [`GaussianMixtureModel` Scala docs](api/scala/index.html#org.apache.spark.mllib.clustering.GaussianMixtureModel) for details on the API.
{% highlight scala %} {% include_example scala/org/apache/spark/examples/mllib/GaussianMixtureExample.scala %}
import org.apache.spark.mllib.clustering.GaussianMixture
import org.apache.spark.mllib.clustering.GaussianMixtureModel
import org.apache.spark.mllib.linalg.Vectors
// Load and parse the data
val data = sc.textFile("data/mllib/gmm_data.txt")
val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble))).cache()
// Cluster the data into two classes using GaussianMixture
val gmm = new GaussianMixture().setK(2).run(parsedData)
// Save and load model
gmm.save(sc, "myGMMModel")
val sameModel = GaussianMixtureModel.load(sc, "myGMMModel")
// output parameters of max-likelihood model
for (i <- 0 until gmm.k) {
println("weight=%f\nmu=%s\nsigma=\n%s\n" format
(gmm.weights(i), gmm.gaussians(i).mu, gmm.gaussians(i).sigma))
}
{% endhighlight %}
</div> </div>
<div data-lang="java" markdown="1"> <div data-lang="java" markdown="1">
...@@ -222,50 +116,7 @@ that is equivalent to the provided example in Scala is given below: ...@@ -222,50 +116,7 @@ that is equivalent to the provided example in Scala is given below:
Refer to the [`GaussianMixture` Java docs](api/java/org/apache/spark/mllib/clustering/GaussianMixture.html) and [`GaussianMixtureModel` Java docs](api/java/org/apache/spark/mllib/clustering/GaussianMixtureModel.html) for details on the API. Refer to the [`GaussianMixture` Java docs](api/java/org/apache/spark/mllib/clustering/GaussianMixture.html) and [`GaussianMixtureModel` Java docs](api/java/org/apache/spark/mllib/clustering/GaussianMixtureModel.html) for details on the API.
{% highlight java %} {% include_example java/org/apache/spark/examples/mllib/JavaGaussianMixtureExample.java %}
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.clustering.GaussianMixture;
import org.apache.spark.mllib.clustering.GaussianMixtureModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.SparkConf;
public class GaussianMixtureExample {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("GaussianMixture Example");
JavaSparkContext sc = new JavaSparkContext(conf);
// Load and parse data
String path = "data/mllib/gmm_data.txt";
JavaRDD<String> data = sc.textFile(path);
JavaRDD<Vector> parsedData = data.map(
new Function<String, Vector>() {
public Vector call(String s) {
String[] sarray = s.trim().split(" ");
double[] values = new double[sarray.length];
for (int i = 0; i < sarray.length; i++)
values[i] = Double.parseDouble(sarray[i]);
return Vectors.dense(values);
}
}
);
parsedData.cache();
// Cluster the data into two classes using GaussianMixture
GaussianMixtureModel gmm = new GaussianMixture().setK(2).run(parsedData.rdd());
// Save and load GaussianMixtureModel
gmm.save(sc.sc(), "myGMMModel");
GaussianMixtureModel sameModel = GaussianMixtureModel.load(sc.sc(), "myGMMModel");
// Output the parameters of the mixture model
for(int j=0; j<gmm.k(); j++) {
System.out.printf("weight=%f\nmu=%s\nsigma=\n%s\n",
gmm.weights()[j], gmm.gaussians()[j].mu(), gmm.gaussians()[j].sigma());
}
}
}
{% endhighlight %}
</div> </div>
<div data-lang="python" markdown="1"> <div data-lang="python" markdown="1">
...@@ -276,23 +127,7 @@ to the algorithm. We then output the parameters of the mixture model. ...@@ -276,23 +127,7 @@ to the algorithm. We then output the parameters of the mixture model.
Refer to the [`GaussianMixture` Python docs](api/python/pyspark.mllib.html#pyspark.mllib.clustering.GaussianMixture) and [`GaussianMixtureModel` Python docs](api/python/pyspark.mllib.html#pyspark.mllib.clustering.GaussianMixtureModel) for more details on the API. Refer to the [`GaussianMixture` Python docs](api/python/pyspark.mllib.html#pyspark.mllib.clustering.GaussianMixture) and [`GaussianMixtureModel` Python docs](api/python/pyspark.mllib.html#pyspark.mllib.clustering.GaussianMixtureModel) for more details on the API.
{% highlight python %} {% include_example python/mllib/gaussian_mixture_example.py %}
from pyspark.mllib.clustering import GaussianMixture
from numpy import array
# Load and parse the data
data = sc.textFile("data/mllib/gmm_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.strip().split(' ')]))
# Build the model (cluster the data)
gmm = GaussianMixture.train(parsedData, 2)
# output parameters of model
for i in range(2):
print ("weight = ", gmm.weights[i], "mu = ", gmm.gaussians[i].mu,
"sigma = ", gmm.gaussians[i].sigma.toArray())
{% endhighlight %}
</div> </div>
</div> </div>
...@@ -334,31 +169,7 @@ which contains the computed clustering assignments. ...@@ -334,31 +169,7 @@ which contains the computed clustering assignments.
Refer to the [`PowerIterationClustering` Scala docs](api/scala/index.html#org.apache.spark.mllib.clustering.PowerIterationClustering) and [`PowerIterationClusteringModel` Scala docs](api/scala/index.html#org.apache.spark.mllib.clustering.PowerIterationClusteringModel) for details on the API. Refer to the [`PowerIterationClustering` Scala docs](api/scala/index.html#org.apache.spark.mllib.clustering.PowerIterationClustering) and [`PowerIterationClusteringModel` Scala docs](api/scala/index.html#org.apache.spark.mllib.clustering.PowerIterationClusteringModel) for details on the API.
{% highlight scala %} {% include_example scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala %}
import org.apache.spark.mllib.clustering.{PowerIterationClustering, PowerIterationClusteringModel}
import org.apache.spark.mllib.linalg.Vectors
// Load and parse the data
val data = sc.textFile("data/mllib/pic_data.txt")
val similarities = data.map { line =>
val parts = line.split(' ')
(parts(0).toLong, parts(1).toLong, parts(2).toDouble)
}
// Cluster the data into two classes using PowerIterationClustering
val pic = new PowerIterationClustering()
.setK(2)
.setMaxIterations(10)
val model = pic.run(similarities)
model.assignments.foreach { a =>
println(s"${a.id} -> ${a.cluster}")
}
// Save and load model
model.save(sc, "myModelPath")
val sameModel = PowerIterationClusteringModel.load(sc, "myModelPath")
{% endhighlight %}
A full example that produces the experiment described in the PIC paper can be found under A full example that produces the experiment described in the PIC paper can be found under
[`examples/`](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala). [`examples/`](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala).
...@@ -377,40 +188,7 @@ which contains the computed clustering assignments. ...@@ -377,40 +188,7 @@ which contains the computed clustering assignments.
Refer to the [`PowerIterationClustering` Java docs](api/java/org/apache/spark/mllib/clustering/PowerIterationClustering.html) and [`PowerIterationClusteringModel` Java docs](api/java/org/apache/spark/mllib/clustering/PowerIterationClusteringModel.html) for details on the API. Refer to the [`PowerIterationClustering` Java docs](api/java/org/apache/spark/mllib/clustering/PowerIterationClustering.html) and [`PowerIterationClusteringModel` Java docs](api/java/org/apache/spark/mllib/clustering/PowerIterationClusteringModel.html) for details on the API.
{% highlight java %} {% include_example java/org/apache/spark/examples/mllib/JavaPowerIterationClusteringExample.java %}
import scala.Tuple2;
import scala.Tuple3;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.clustering.PowerIterationClustering;
import org.apache.spark.mllib.clustering.PowerIterationClusteringModel;
// Load and parse the data
JavaRDD<String> data = sc.textFile("data/mllib/pic_data.txt");
JavaRDD<Tuple3<Long, Long, Double>> similarities = data.map(
new Function<String, Tuple3<Long, Long, Double>>() {
public Tuple3<Long, Long, Double> call(String line) {
String[] parts = line.split(" ");
return new Tuple3<>(new Long(parts[0]), new Long(parts[1]), new Double(parts[2]));
}
}
);
// Cluster the data into two classes using PowerIterationClustering
PowerIterationClustering pic = new PowerIterationClustering()
.setK(2)
.setMaxIterations(10);
PowerIterationClusteringModel model = pic.run(similarities);
for (PowerIterationClustering.Assignment a: model.assignments().toJavaRDD().collect()) {
System.out.println(a.id() + " -> " + a.cluster());
}
// Save and load model
model.save(sc.sc(), "myModelPath");
PowerIterationClusteringModel sameModel = PowerIterationClusteringModel.load(sc.sc(), "myModelPath");
{% endhighlight %}
</div> </div>
<div data-lang="python" markdown="1"> <div data-lang="python" markdown="1">
...@@ -425,23 +203,7 @@ which contains the computed clustering assignments. ...@@ -425,23 +203,7 @@ which contains the computed clustering assignments.
Refer to the [`PowerIterationClustering` Python docs](api/python/pyspark.mllib.html#pyspark.mllib.clustering.PowerIterationClustering) and [`PowerIterationClusteringModel` Python docs](api/python/pyspark.mllib.html#pyspark.mllib.clustering.PowerIterationClusteringModel) for more details on the API. Refer to the [`PowerIterationClustering` Python docs](api/python/pyspark.mllib.html#pyspark.mllib.clustering.PowerIterationClustering) and [`PowerIterationClusteringModel` Python docs](api/python/pyspark.mllib.html#pyspark.mllib.clustering.PowerIterationClusteringModel) for more details on the API.
{% highlight python %} {% include_example python/mllib/power_iteration_clustering_example.py %}
from __future__ import print_function
from pyspark.mllib.clustering import PowerIterationClustering, PowerIterationClusteringModel
# Load and parse the data
data = sc.textFile("data/mllib/pic_data.txt")
similarities = data.map(lambda line: tuple([float(x) for x in line.split(' ')]))
# Cluster the data into two classes using PowerIterationClustering
model = PowerIterationClustering.train(similarities, 2, 10)
model.assignments().foreach(lambda x: print(str(x.id) + " -> " + str(x.cluster)))
# Save and load model
model.save(sc, "myModelPath")
sameModel = PowerIterationClusteringModel.load(sc, "myModelPath")
{% endhighlight %}
</div> </div>
</div> </div>
...@@ -587,129 +349,19 @@ to the algorithm. We then output the topics, represented as probability distribu ...@@ -587,129 +349,19 @@ to the algorithm. We then output the topics, represented as probability distribu
<div data-lang="scala" markdown="1"> <div data-lang="scala" markdown="1">
Refer to the [`LDA` Scala docs](api/scala/index.html#org.apache.spark.mllib.clustering.LDA) and [`DistributedLDAModel` Scala docs](api/scala/index.html#org.apache.spark.mllib.clustering.DistributedLDAModel) for details on the API. Refer to the [`LDA` Scala docs](api/scala/index.html#org.apache.spark.mllib.clustering.LDA) and [`DistributedLDAModel` Scala docs](api/scala/index.html#org.apache.spark.mllib.clustering.DistributedLDAModel) for details on the API.
{% highlight scala %} {% include_example scala/org/apache/spark/examples/mllib/LatentDirichletAllocationExample.scala %}
import org.apache.spark.mllib.clustering.{LDA, DistributedLDAModel}
import org.apache.spark.mllib.linalg.Vectors
// Load and parse the data
val data = sc.textFile("data/mllib/sample_lda_data.txt")
val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble)))
// Index documents with unique IDs
val corpus = parsedData.zipWithIndex.map(_.swap).cache()
// Cluster the documents into three topics using LDA
val ldaModel = new LDA().setK(3).run(corpus)
// Output topics. Each is a distribution over words (matching word count vectors)
println("Learned topics (as distributions over vocab of " + ldaModel.vocabSize + " words):")
val topics = ldaModel.topicsMatrix
for (topic <- Range(0, 3)) {
print("Topic " + topic + ":")
for (word <- Range(0, ldaModel.vocabSize)) { print(" " + topics(word, topic)); }
println()
}
// Save and load model.
ldaModel.save(sc, "myLDAModel")
val sameModel = DistributedLDAModel.load(sc, "myLDAModel")
{% endhighlight %}
</div> </div>
<div data-lang="java" markdown="1"> <div data-lang="java" markdown="1">
Refer to the [`LDA` Java docs](api/java/org/apache/spark/mllib/clustering/LDA.html) and [`DistributedLDAModel` Java docs](api/java/org/apache/spark/mllib/clustering/DistributedLDAModel.html) for details on the API. Refer to the [`LDA` Java docs](api/java/org/apache/spark/mllib/clustering/LDA.html) and [`DistributedLDAModel` Java docs](api/java/org/apache/spark/mllib/clustering/DistributedLDAModel.html) for details on the API.
{% highlight java %} {% include_example java/org/apache/spark/examples/mllib/JavaLatentDirichletAllocationExample.java %}
import scala.Tuple2;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.clustering.DistributedLDAModel;
import org.apache.spark.mllib.clustering.LDA;
import org.apache.spark.mllib.linalg.Matrix;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.SparkConf;
public class JavaLDAExample {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("LDA Example");
JavaSparkContext sc = new JavaSparkContext(conf);
// Load and parse the data
String path = "data/mllib/sample_lda_data.txt";
JavaRDD<String> data = sc.textFile(path);
JavaRDD<Vector> parsedData = data.map(
new Function<String, Vector>() {
public Vector call(String s) {
String[] sarray = s.trim().split(" ");
double[] values = new double[sarray.length];
for (int i = 0; i < sarray.length; i++)
values[i] = Double.parseDouble(sarray[i]);
return Vectors.dense(values);
}
}
);
// Index documents with unique IDs
JavaPairRDD<Long, Vector> corpus = JavaPairRDD.fromJavaRDD(parsedData.zipWithIndex().map(
new Function<Tuple2<Vector, Long>, Tuple2<Long, Vector>>() {
public Tuple2<Long, Vector> call(Tuple2<Vector, Long> doc_id) {
return doc_id.swap();
}
}
));
corpus.cache();
// Cluster the documents into three topics using LDA
DistributedLDAModel ldaModel = new LDA().setK(3).run(corpus);
// Output topics. Each is a distribution over words (matching word count vectors)
System.out.println("Learned topics (as distributions over vocab of " + ldaModel.vocabSize()
+ " words):");
Matrix topics = ldaModel.topicsMatrix();
for (int topic = 0; topic < 3; topic++) {
System.out.print("Topic " + topic + ":");
for (int word = 0; word < ldaModel.vocabSize(); word++) {
System.out.print(" " + topics.apply(word, topic));
}
System.out.println();
}
ldaModel.save(sc.sc(), "myLDAModel");
DistributedLDAModel sameModel = DistributedLDAModel.load(sc.sc(), "myLDAModel");
}
}
{% endhighlight %}
</div> </div>
<div data-lang="python" markdown="1"> <div data-lang="python" markdown="1">
Refer to the [`LDA` Python docs](api/python/pyspark.mllib.html#pyspark.mllib.clustering.LDA) and [`LDAModel` Python docs](api/python/pyspark.mllib.html#pyspark.mllib.clustering.LDAModel) for more details on the API. Refer to the [`LDA` Python docs](api/python/pyspark.mllib.html#pyspark.mllib.clustering.LDA) and [`LDAModel` Python docs](api/python/pyspark.mllib.html#pyspark.mllib.clustering.LDAModel) for more details on the API.
{% highlight python %} {% include_example python/mllib/latent_dirichlet_allocation_example.py %}
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vectors
# Load and parse the data
data = sc.textFile("data/mllib/sample_lda_data.txt")
parsedData = data.map(lambda line: Vectors.dense([float(x) for x in line.strip().split(' ')]))
# Index documents with unique IDs
corpus = parsedData.zipWithIndex().map(lambda x: [x[1], x[0]]).cache()
# Cluster the documents into three topics using LDA
ldaModel = LDA.train(corpus, k=3)
# Output topics. Each is a distribution over words (matching word count vectors)
print("Learned topics (as distributions over vocab of " + str(ldaModel.vocabSize()) + " words):")
topics = ldaModel.topicsMatrix()
for topic in range(3):
print("Topic " + str(topic) + ":")
for word in range(0, ldaModel.vocabSize()):
print(" " + str(topics[word][topic]))
# Save and load model
model.save(sc, "myModelPath")
sameModel = LDAModel.load(sc, "myModelPath")
{% endhighlight %}
</div> </div>
</div> </div>
...@@ -785,96 +437,16 @@ This example shows how to estimate clusters on streaming data. ...@@ -785,96 +437,16 @@ This example shows how to estimate clusters on streaming data.
<div data-lang="scala" markdown="1"> <div data-lang="scala" markdown="1">
Refer to the [`StreamingKMeans` Scala docs](api/scala/index.html#org.apache.spark.mllib.clustering.StreamingKMeans) for details on the API. Refer to the [`StreamingKMeans` Scala docs](api/scala/index.html#org.apache.spark.mllib.clustering.StreamingKMeans) for details on the API.
And Refer to [Spark Streaming Programming Guide](streaming-programming-guide.html#initializing) for details on StreamingContext.
First we import the necessary classes. {% include_example scala/org/apache/spark/examples/mllib/StreamingKMeansExample.scala %}
{% highlight scala %}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.clustering.StreamingKMeans
{% endhighlight %}
Then we make an input stream of vectors for training, as well as a stream of labeled data
points for testing. We assume a StreamingContext `ssc` has been created, see
[Spark Streaming Programming Guide](streaming-programming-guide.html#initializing) for more info.
{% highlight scala %}
val trainingData = ssc.textFileStream("/training/data/dir").map(Vectors.parse)
val testData = ssc.textFileStream("/testing/data/dir").map(LabeledPoint.parse)
{% endhighlight %}
We create a model with random clusters and specify the number of clusters to find
{% highlight scala %}
val numDimensions = 3
val numClusters = 2
val model = new StreamingKMeans()
.setK(numClusters)
.setDecayFactor(1.0)
.setRandomCenters(numDimensions, 0.0)
{% endhighlight %}
Now register the streams for training and testing and start the job, printing
the predicted cluster assignments on new data points as they arrive.
{% highlight scala %}
model.trainOn(trainingData)
model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()
ssc.start()
ssc.awaitTermination()
{% endhighlight %}
</div> </div>
<div data-lang="python" markdown="1"> <div data-lang="python" markdown="1">
Refer to the [`StreamingKMeans` Python docs](api/python/pyspark.mllib.html#pyspark.mllib.clustering.StreamingKMeans) for more details on the API. Refer to the [`StreamingKMeans` Python docs](api/python/pyspark.mllib.html#pyspark.mllib.clustering.StreamingKMeans) for more details on the API.
And Refer to [Spark Streaming Programming Guide](streaming-programming-guide.html#initializing) for details on StreamingContext.
First we import the necessary classes. {% include_example python/mllib/streaming_k_means_example.py %}
{% highlight python %}
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.clustering import StreamingKMeans
{% endhighlight %}
Then we make an input stream of vectors for training, as well as a stream of labeled data
points for testing. We assume a StreamingContext `ssc` has been created, see
[Spark Streaming Programming Guide](streaming-programming-guide.html#initializing) for more info.
{% highlight python %}
def parse(lp):
label = float(lp[lp.find('(') + 1: lp.find(',')])
vec = Vectors.dense(lp[lp.find('[') + 1: lp.find(']')].split(','))
return LabeledPoint(label, vec)
trainingData = ssc.textFileStream("/training/data/dir").map(Vectors.parse)
testData = ssc.textFileStream("/testing/data/dir").map(parse)
{% endhighlight %}
We create a model with random clusters and specify the number of clusters to find
{% highlight python %}
model = StreamingKMeans(k=2, decayFactor=1.0).setRandomCenters(3, 1.0, 0)
{% endhighlight %}
Now register the streams for training and testing and start the job, printing
the predicted cluster assignments on new data points as they arrive.
{% highlight python %}
model.trainOn(trainingData)
print(model.predictOnValues(testData.map(lambda lp: (lp.label, lp.features))))
ssc.start()
ssc.awaitTermination()
{% endhighlight %}
</div> </div>
</div> </div>
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.examples.mllib;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
// $example on$
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.clustering.GaussianMixture;
import org.apache.spark.mllib.clustering.GaussianMixtureModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
// $example off$
public class JavaGaussianMixtureExample {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("JavaGaussianMixtureExample");
JavaSparkContext jsc = new JavaSparkContext(conf);
// $example on$
// Load and parse data
String path = "data/mllib/gmm_data.txt";
JavaRDD<String> data = jsc.textFile(path);
JavaRDD<Vector> parsedData = data.map(
new Function<String, Vector>() {
public Vector call(String s) {
String[] sarray = s.trim().split(" ");
double[] values = new double[sarray.length];
for (int i = 0; i < sarray.length; i++)
values[i] = Double.parseDouble(sarray[i]);
return Vectors.dense(values);
}
}
);
parsedData.cache();
// Cluster the data into two classes using GaussianMixture
GaussianMixtureModel gmm = new GaussianMixture().setK(2).run(parsedData.rdd());
// Save and load GaussianMixtureModel
gmm.save(jsc.sc(), "target/org/apache/spark/JavaGaussianMixtureExample/GaussianMixtureModel");
GaussianMixtureModel sameModel = GaussianMixtureModel.load(jsc.sc(),
"target/org.apache.spark.JavaGaussianMixtureExample/GaussianMixtureModel");
// Output the parameters of the mixture model
for (int j = 0; j < gmm.k(); j++) {
System.out.printf("weight=%f\nmu=%s\nsigma=\n%s\n",
gmm.weights()[j], gmm.gaussians()[j].mu(), gmm.gaussians()[j].sigma());
}
// $example off$
jsc.stop();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.examples.mllib;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
// $example on$
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.clustering.KMeans;
import org.apache.spark.mllib.clustering.KMeansModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
// $example off$
public class JavaKMeansExample {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("JavaKMeansExample");
JavaSparkContext jsc = new JavaSparkContext(conf);
// $example on$
// Load and parse data
String path = "data/mllib/kmeans_data.txt";
JavaRDD<String> data = jsc.textFile(path);
JavaRDD<Vector> parsedData = data.map(
new Function<String, Vector>() {
public Vector call(String s) {
String[] sarray = s.split(" ");
double[] values = new double[sarray.length];
for (int i = 0; i < sarray.length; i++)
values[i] = Double.parseDouble(sarray[i]);
return Vectors.dense(values);
}
}
);
parsedData.cache();
// Cluster the data into two classes using KMeans
int numClusters = 2;
int numIterations = 20;
KMeansModel clusters = KMeans.train(parsedData.rdd(), numClusters, numIterations);
// Evaluate clustering by computing Within Set Sum of Squared Errors
double WSSSE = clusters.computeCost(parsedData.rdd());
System.out.println("Within Set Sum of Squared Errors = " + WSSSE);
// Save and load model
clusters.save(jsc.sc(), "target/org/apache/spark/JavaKMeansExample/KMeansModel");
KMeansModel sameModel = KMeansModel.load(jsc.sc(),
"target/org/apache/spark/JavaKMeansExample/KMeansModel");
// $example off$
jsc.stop();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.examples.mllib;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
// $example on$
import scala.Tuple2;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.clustering.DistributedLDAModel;
import org.apache.spark.mllib.clustering.LDA;
import org.apache.spark.mllib.clustering.LDAModel;
import org.apache.spark.mllib.linalg.Matrix;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
// $example off$
public class JavaLatentDirichletAllocationExample {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("JavaKLatentDirichletAllocationExample");
JavaSparkContext jsc = new JavaSparkContext(conf);
// $example on$
// Load and parse the data
String path = "data/mllib/sample_lda_data.txt";
JavaRDD<String> data = jsc.textFile(path);
JavaRDD<Vector> parsedData = data.map(
new Function<String, Vector>() {
public Vector call(String s) {
String[] sarray = s.trim().split(" ");
double[] values = new double[sarray.length];
for (int i = 0; i < sarray.length; i++)
values[i] = Double.parseDouble(sarray[i]);
return Vectors.dense(values);
}
}
);
// Index documents with unique IDs
JavaPairRDD<Long, Vector> corpus =
JavaPairRDD.fromJavaRDD(parsedData.zipWithIndex().map(
new Function<Tuple2<Vector, Long>, Tuple2<Long, Vector>>() {
public Tuple2<Long, Vector> call(Tuple2<Vector, Long> doc_id) {
return doc_id.swap();
}
}
)
);
corpus.cache();
// Cluster the documents into three topics using LDA
LDAModel ldaModel = new LDA().setK(3).run(corpus);
// Output topics. Each is a distribution over words (matching word count vectors)
System.out.println("Learned topics (as distributions over vocab of " + ldaModel.vocabSize()
+ " words):");
Matrix topics = ldaModel.topicsMatrix();
for (int topic = 0; topic < 3; topic++) {
System.out.print("Topic " + topic + ":");
for (int word = 0; word < ldaModel.vocabSize(); word++) {
System.out.print(" " + topics.apply(word, topic));
}
System.out.println();
}
ldaModel.save(jsc.sc(),
"target/org/apache/spark/JavaLatentDirichletAllocationExample/LDAModel");
DistributedLDAModel sameModel = DistributedLDAModel.load(jsc.sc(),
"target/org/apache/spark/JavaLatentDirichletAllocationExample/LDAModel");
// $example off$
jsc.stop();
}
}
...@@ -24,8 +24,10 @@ import com.google.common.collect.Lists; ...@@ -24,8 +24,10 @@ import com.google.common.collect.Lists;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaSparkContext;
// $example on$
import org.apache.spark.mllib.clustering.PowerIterationClustering; import org.apache.spark.mllib.clustering.PowerIterationClustering;
import org.apache.spark.mllib.clustering.PowerIterationClusteringModel; import org.apache.spark.mllib.clustering.PowerIterationClusteringModel;
// $example off$
/** /**
* Java example for graph clustering using power iteration clustering (PIC). * Java example for graph clustering using power iteration clustering (PIC).
...@@ -36,6 +38,7 @@ public class JavaPowerIterationClusteringExample { ...@@ -36,6 +38,7 @@ public class JavaPowerIterationClusteringExample {
JavaSparkContext sc = new JavaSparkContext(sparkConf); JavaSparkContext sc = new JavaSparkContext(sparkConf);
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
// $example on$
JavaRDD<Tuple3<Long, Long, Double>> similarities = sc.parallelize(Lists.newArrayList( JavaRDD<Tuple3<Long, Long, Double>> similarities = sc.parallelize(Lists.newArrayList(
new Tuple3<Long, Long, Double>(0L, 1L, 0.9), new Tuple3<Long, Long, Double>(0L, 1L, 0.9),
new Tuple3<Long, Long, Double>(1L, 2L, 0.9), new Tuple3<Long, Long, Double>(1L, 2L, 0.9),
...@@ -51,6 +54,7 @@ public class JavaPowerIterationClusteringExample { ...@@ -51,6 +54,7 @@ public class JavaPowerIterationClusteringExample {
for (PowerIterationClustering.Assignment a: model.assignments().toJavaRDD().collect()) { for (PowerIterationClustering.Assignment a: model.assignments().toJavaRDD().collect()) {
System.out.println(a.id() + " -> " + a.cluster()); System.out.println(a.id() + " -> " + a.cluster());
} }
// $example off$
sc.stop(); sc.stop();
} }
......
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import print_function
# $example on$
from numpy import array
# $example off$
from pyspark import SparkContext
# $example on$
from pyspark.mllib.clustering import GaussianMixture, GaussianMixtureModel
# $example off$
if __name__ == "__main__":
sc = SparkContext(appName="GaussianMixtureExample") # SparkContext
# $example on$
# Load and parse the data
data = sc.textFile("data/mllib/gmm_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.strip().split(' ')]))
# Build the model (cluster the data)
gmm = GaussianMixture.train(parsedData, 2)
# Save and load model
gmm.save(sc, "target/org/apache/spark/PythonGaussianMixtureExample/GaussianMixtureModel")
sameModel = GaussianMixtureModel\
.load(sc, "target/org/apache/spark/PythonGaussianMixtureExample/GaussianMixtureModel")
# output parameters of model
for i in range(2):
print("weight = ", gmm.weights[i], "mu = ", gmm.gaussians[i].mu,
"sigma = ", gmm.gaussians[i].sigma.toArray())
# $example off$
sc.stop()
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import print_function
# $example on$
from numpy import array
from math import sqrt
# $example off$
from pyspark import SparkContext
# $example on$
from pyspark.mllib.clustering import KMeans, KMeansModel
# $example off$
if __name__ == "__main__":
sc = SparkContext(appName="KMeansExample") # SparkContext
# $example on$
# Load and parse the data
data = sc.textFile("data/mllib/kmeans_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))
# Build the model (cluster the data)
clusters = KMeans.train(parsedData, 2, maxIterations=10,
runs=10, initializationMode="random")
# Evaluate clustering by computing Within Set Sum of Squared Errors
def error(point):
center = clusters.centers[clusters.predict(point)]
return sqrt(sum([x**2 for x in (point - center)]))
WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print("Within Set Sum of Squared Error = " + str(WSSSE))
# Save and load model
clusters.save(sc, "target/org/apache/spark/PythonKMeansExample/KMeansModel")
sameModel = KMeansModel.load(sc, "target/org/apache/spark/PythonKMeansExample/KMeansModel")
# $example off$
sc.stop()
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import print_function
from pyspark import SparkContext
# $example on$
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vectors
# $example off$
if __name__ == "__main__":
sc = SparkContext(appName="LatentDirichletAllocationExample") # SparkContext
# $example on$
# Load and parse the data
data = sc.textFile("data/mllib/sample_lda_data.txt")
parsedData = data.map(lambda line: Vectors.dense([float(x) for x in line.strip().split(' ')]))
# Index documents with unique IDs
corpus = parsedData.zipWithIndex().map(lambda x: [x[1], x[0]]).cache()
# Cluster the documents into three topics using LDA
ldaModel = LDA.train(corpus, k=3)
# Output topics. Each is a distribution over words (matching word count vectors)
print("Learned topics (as distributions over vocab of " + str(ldaModel.vocabSize())
+ " words):")
topics = ldaModel.topicsMatrix()
for topic in range(3):
print("Topic " + str(topic) + ":")
for word in range(0, ldaModel.vocabSize()):
print(" " + str(topics[word][topic]))
# Save and load model
ldaModel.save(sc, "target/org/apache/spark/PythonLatentDirichletAllocationExample/LDAModel")
sameModel = LDAModel\
.load(sc, "target/org/apache/spark/PythonLatentDirichletAllocationExample/LDAModel")
# $example off$
sc.stop()
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import print_function
from pyspark import SparkContext
# $example on$
from pyspark.mllib.clustering import PowerIterationClustering, PowerIterationClusteringModel
# $example off$
if __name__ == "__main__":
sc = SparkContext(appName="PowerIterationClusteringExample") # SparkContext
# $example on$
# Load and parse the data
data = sc.textFile("data/mllib/pic_data.txt")
similarities = data.map(lambda line: tuple([float(x) for x in line.split(' ')]))
# Cluster the data into two classes using PowerIterationClustering
model = PowerIterationClustering.train(similarities, 2, 10)
model.assignments().foreach(lambda x: print(str(x.id) + " -> " + str(x.cluster)))
# Save and load model
model.save(sc, "target/org/apache/spark/PythonPowerIterationClusteringExample/PICModel")
sameModel = PowerIterationClusteringModel\
.load(sc, "target/org/apache/spark/PythonPowerIterationClusteringExample/PICModel")
# $example off$
sc.stop()
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import print_function
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# $example on$
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.clustering import StreamingKMeans
# $example off$
if __name__ == "__main__":
sc = SparkContext(appName="StreamingKMeansExample") # SparkContext
ssc = StreamingContext(sc, 1)
# $example on$
# we make an input stream of vectors for training,
# as well as a stream of vectors for testing
def parse(lp):
label = float(lp[lp.find('(') + 1: lp.find(')')])
vec = Vectors.dense(lp[lp.find('[') + 1: lp.find(']')].split(','))
return LabeledPoint(label, vec)
trainingData = sc.textFile("data/mllib/kmeans_data.txt")\
.map(lambda line: Vectors.dense([float(x) for x in line.strip().split(' ')]))
testingData = sc.textFile("data/mllib/streaming_kmeans_data_test.txt").map(parse)
trainingQueue = [trainingData]
testingQueue = [testingData]
trainingStream = ssc.queueStream(trainingQueue)
testingStream = ssc.queueStream(testingQueue)
# We create a model with random clusters and specify the number of clusters to find
model = StreamingKMeans(k=2, decayFactor=1.0).setRandomCenters(3, 1.0, 0)
# Now register the streams for training and testing and start the job,
# printing the predicted cluster assignments on new data points as they arrive.
model.trainOn(trainingStream)
result = model.predictOnValues(testingStream.map(lambda lp: (lp.label, lp.features)))
result.pprint()
ssc.start()
ssc.stop(stopSparkContext=True, stopGraceFully=True)
# $example off$
print("Final centers: " + str(model.latestModel().centers))
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// scalastyle:off println
package org.apache.spark.examples.mllib
import org.apache.spark.{SparkConf, SparkContext}
// $example on$
import org.apache.spark.mllib.clustering.{GaussianMixture, GaussianMixtureModel}
import org.apache.spark.mllib.linalg.Vectors
// $example off$
object GaussianMixtureExample {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("GaussianMixtureExample")
val sc = new SparkContext(conf)
// $example on$
// Load and parse the data
val data = sc.textFile("data/mllib/gmm_data.txt")
val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble))).cache()
// Cluster the data into two classes using GaussianMixture
val gmm = new GaussianMixture().setK(2).run(parsedData)
// Save and load model
gmm.save(sc, "target/org/apache/spark/GaussianMixtureExample/GaussianMixtureModel")
val sameModel = GaussianMixtureModel.load(sc,
"target/org/apache/spark/GaussianMixtureExample/GaussianMixtureModel")
// output parameters of max-likelihood model
for (i <- 0 until gmm.k) {
println("weight=%f\nmu=%s\nsigma=\n%s\n" format
(gmm.weights(i), gmm.gaussians(i).mu, gmm.gaussians(i).sigma))
}
// $example off$
sc.stop()
}
}
// scalastyle:on println
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// scalastyle:off println
package org.apache.spark.examples.mllib
import org.apache.spark.{SparkConf, SparkContext}
// $example on$
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.Vectors
// $example off$
object KMeansExample {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("KMeansExample")
val sc = new SparkContext(conf)
// $example on$
// Load and parse the data
val data = sc.textFile("data/mllib/kmeans_data.txt")
val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache()
// Cluster the data into two classes using KMeans
val numClusters = 2
val numIterations = 20
val clusters = KMeans.train(parsedData, numClusters, numIterations)
// Evaluate clustering by computing Within Set Sum of Squared Errors
val WSSSE = clusters.computeCost(parsedData)
println("Within Set Sum of Squared Errors = " + WSSSE)
// Save and load model
clusters.save(sc, "target/org/apache/spark/KMeansExample/KMeansModel")
val sameModel = KMeansModel.load(sc, "target/org/apache/spark/KMeansExample/KMeansModel")
// $example off$
sc.stop()
}
}
// scalastyle:on println
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// scalastyle:off println
package org.apache.spark.examples.mllib
import org.apache.spark.{SparkConf, SparkContext}
// $example on$
import org.apache.spark.mllib.clustering.{DistributedLDAModel, LDA}
import org.apache.spark.mllib.linalg.Vectors
// $example off$
object LatentDirichletAllocationExample {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("LatentDirichletAllocationExample")
val sc = new SparkContext(conf)
// $example on$
// Load and parse the data
val data = sc.textFile("data/mllib/sample_lda_data.txt")
val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble)))
// Index documents with unique IDs
val corpus = parsedData.zipWithIndex.map(_.swap).cache()
// Cluster the documents into three topics using LDA
val ldaModel = new LDA().setK(3).run(corpus)
// Output topics. Each is a distribution over words (matching word count vectors)
println("Learned topics (as distributions over vocab of " + ldaModel.vocabSize + " words):")
val topics = ldaModel.topicsMatrix
for (topic <- Range(0, 3)) {
print("Topic " + topic + ":")
for (word <- Range(0, ldaModel.vocabSize)) { print(" " + topics(word, topic)); }
println()
}
// Save and load model.
ldaModel.save(sc, "target/org/apache/spark/LatentDirichletAllocationExample/LDAModel")
val sameModel = DistributedLDAModel.load(sc,
"target/org/apache/spark/LatentDirichletAllocationExample/LDAModel")
// $example off$
sc.stop()
}
}
// scalastyle:on println
...@@ -22,7 +22,9 @@ import org.apache.log4j.{Level, Logger} ...@@ -22,7 +22,9 @@ import org.apache.log4j.{Level, Logger}
import scopt.OptionParser import scopt.OptionParser
import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.{SparkConf, SparkContext}
// $example on$
import org.apache.spark.mllib.clustering.PowerIterationClustering import org.apache.spark.mllib.clustering.PowerIterationClustering
// $example off$
import org.apache.spark.rdd.RDD import org.apache.spark.rdd.RDD
/** /**
...@@ -90,6 +92,7 @@ object PowerIterationClusteringExample { ...@@ -90,6 +92,7 @@ object PowerIterationClusteringExample {
Logger.getRootLogger.setLevel(Level.WARN) Logger.getRootLogger.setLevel(Level.WARN)
// $example on$
val circlesRdd = generateCirclesRdd(sc, params.k, params.numPoints) val circlesRdd = generateCirclesRdd(sc, params.k, params.numPoints)
val model = new PowerIterationClustering() val model = new PowerIterationClustering()
.setK(params.k) .setK(params.k)
...@@ -101,12 +104,13 @@ object PowerIterationClusteringExample { ...@@ -101,12 +104,13 @@ object PowerIterationClusteringExample {
val assignments = clusters.toList.sortBy { case (k, v) => v.length } val assignments = clusters.toList.sortBy { case (k, v) => v.length }
val assignmentsStr = assignments val assignmentsStr = assignments
.map { case (k, v) => .map { case (k, v) =>
s"$k -> ${v.sorted.mkString("[", ",", "]")}" s"$k -> ${v.sorted.mkString("[", ",", "]")}"
}.mkString(", ") }.mkString(", ")
val sizesStr = assignments.map { val sizesStr = assignments.map {
_._2.length _._2.length
}.sorted.mkString("(", ",", ")") }.sorted.mkString("(", ",", ")")
println(s"Cluster assignments: $assignmentsStr\ncluster sizes: $sizesStr") println(s"Cluster assignments: $assignmentsStr\ncluster sizes: $sizesStr")
// $example off$
sc.stop() sc.stop()
} }
......
...@@ -19,10 +19,12 @@ ...@@ -19,10 +19,12 @@
package org.apache.spark.examples.mllib package org.apache.spark.examples.mllib
import org.apache.spark.SparkConf import org.apache.spark.SparkConf
// $example on$
import org.apache.spark.mllib.clustering.StreamingKMeans import org.apache.spark.mllib.clustering.StreamingKMeans
import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.{Seconds, StreamingContext}
// $example off$
/** /**
* Estimate clusters on one stream of data and make predictions * Estimate clusters on one stream of data and make predictions
...@@ -58,7 +60,8 @@ object StreamingKMeansExample { ...@@ -58,7 +60,8 @@ object StreamingKMeansExample {
System.exit(1) System.exit(1)
} }
val conf = new SparkConf().setMaster("local").setAppName("StreamingKMeansExample") // $example on$
val conf = new SparkConf().setAppName("StreamingKMeansExample")
val ssc = new StreamingContext(conf, Seconds(args(2).toLong)) val ssc = new StreamingContext(conf, Seconds(args(2).toLong))
val trainingData = ssc.textFileStream(args(0)).map(Vectors.parse) val trainingData = ssc.textFileStream(args(0)).map(Vectors.parse)
...@@ -74,6 +77,7 @@ object StreamingKMeansExample { ...@@ -74,6 +77,7 @@ object StreamingKMeansExample {
ssc.start() ssc.start()
ssc.awaitTermination() ssc.awaitTermination()
// $example off$
} }
} }
// scalastyle:on println // scalastyle:on println
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment