diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala index 5dc637ebdc133cf5a0c1a8cf61edb9c6cc06f7fe..f31949f13a4cf310a1db2fa8207d8947de514ac9 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala @@ -26,7 +26,7 @@ import org.json4s.jackson.JsonMethods._ import org.apache.spark.SparkContext import org.apache.spark.annotation.Experimental -import org.apache.spark.api.java.JavaPairRDD +import org.apache.spark.api.java.{JavaPairRDD, JavaRDD} import org.apache.spark.graphx.{Edge, EdgeContext, Graph, VertexId} import org.apache.spark.mllib.linalg.{Matrices, Matrix, Vector, Vectors} import org.apache.spark.mllib.util.{Loader, Saveable} @@ -228,6 +228,11 @@ class LocalLDAModel private[clustering] ( docConcentration, topicConcentration, topicsMatrix.toBreeze.toDenseMatrix, gammaShape, k, vocabSize) + /** Java-friendly version of [[logLikelihood]] */ + def logLikelihood(documents: JavaPairRDD[java.lang.Long, Vector]): Double = { + logLikelihood(documents.rdd.asInstanceOf[RDD[(Long, Vector)]]) + } + /** * Calculate an upper bound bound on perplexity. (Lower is better.) * See Equation (16) in original Online LDA paper. @@ -242,6 +247,11 @@ class LocalLDAModel private[clustering] ( -logLikelihood(documents) / corpusTokenCount } + /** Java-friendly version of [[logPerplexity]] */ + def logPerplexity(documents: JavaPairRDD[java.lang.Long, Vector]): Double = { + logPerplexity(documents.rdd.asInstanceOf[RDD[(Long, Vector)]]) + } + /** * Estimate the variational likelihood bound of from `documents`: * log p(documents) >= E_q[log p(documents)] - E_q[log q(documents)] @@ -341,8 +351,14 @@ class LocalLDAModel private[clustering] ( } } -} + /** Java-friendly version of [[topicDistributions]] */ + def topicDistributions( + documents: JavaPairRDD[java.lang.Long, Vector]): JavaPairRDD[java.lang.Long, Vector] = { + val distributions = topicDistributions(documents.rdd.asInstanceOf[RDD[(Long, Vector)]]) + JavaPairRDD.fromRDD(distributions.asInstanceOf[RDD[(java.lang.Long, Vector)]]) + } +} @Experimental object LocalLDAModel extends Loader[LocalLDAModel] { @@ -657,6 +673,13 @@ class DistributedLDAModel private[clustering] ( } } + /** Java-friendly version of [[topTopicsPerDocument]] */ + def javaTopTopicsPerDocument( + k: Int): JavaRDD[(java.lang.Long, Array[Int], Array[java.lang.Double])] = { + val topics = topTopicsPerDocument(k) + topics.asInstanceOf[RDD[(java.lang.Long, Array[Int], Array[java.lang.Double])]].toJavaRDD() + } + // TODO: // override def topicDistributions(documents: RDD[(Long, Vector)]): RDD[(Long, Vector)] = ??? diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala index f84502919e3815940ad349c222f1ac6a26029142..24fe48cb8f71fc3d69fbd6807c0647e76dab9614 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala @@ -20,7 +20,7 @@ package org.apache.spark.mllib.stat import scala.annotation.varargs import org.apache.spark.annotation.Experimental -import org.apache.spark.api.java.JavaRDD +import org.apache.spark.api.java.{JavaRDD, JavaDoubleRDD} import org.apache.spark.mllib.linalg.distributed.RowMatrix import org.apache.spark.mllib.linalg.{Matrix, Vector} import org.apache.spark.mllib.regression.LabeledPoint @@ -178,6 +178,9 @@ object Statistics { ChiSqTest.chiSquaredFeatures(data) } + /** Java-friendly version of [[chiSqTest()]] */ + def chiSqTest(data: JavaRDD[LabeledPoint]): Array[ChiSqTestResult] = chiSqTest(data.rdd) + /** * Conduct the two-sided Kolmogorov-Smirnov (KS) test for data sampled from a * continuous distribution. By comparing the largest difference between the empirical cumulative @@ -212,4 +215,15 @@ object Statistics { : KolmogorovSmirnovTestResult = { KolmogorovSmirnovTest.testOneSample(data, distName, params: _*) } + + /** Java-friendly version of [[kolmogorovSmirnovTest()]] */ + @varargs + def kolmogorovSmirnovTest( + data: JavaDoubleRDD, + distName: String, + params: java.lang.Double*): KolmogorovSmirnovTestResult = { + val javaParams = params.asInstanceOf[Seq[Double]] + KolmogorovSmirnovTest.testOneSample(data.rdd.asInstanceOf[RDD[Double]], + distName, javaParams: _*) + } } diff --git a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java index d272a42c8576fde3dcf5549db4401fe5c3392073..427be9430d82069e5e80c1591fa5a5e1dbc63f39 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java @@ -124,6 +124,10 @@ public class JavaLDASuite implements Serializable { } }); assertEquals(topicDistributions.count(), nonEmptyCorpus.count()); + + // Check: javaTopTopicsPerDocuments + JavaRDD<scala.Tuple3<java.lang.Long, int[], java.lang.Double[]>> topTopics = + model.javaTopTopicsPerDocument(3); } @Test @@ -160,11 +164,31 @@ public class JavaLDASuite implements Serializable { assertEquals(roundedLocalTopicSummary.length, k); } + @Test + public void localLdaMethods() { + JavaRDD<Tuple2<Long, Vector>> docs = sc.parallelize(toyData, 2); + JavaPairRDD<Long, Vector> pairedDocs = JavaPairRDD.fromJavaRDD(docs); + + // check: topicDistributions + assertEquals(toyModel.topicDistributions(pairedDocs).count(), pairedDocs.count()); + + // check: logPerplexity + double logPerplexity = toyModel.logPerplexity(pairedDocs); + + // check: logLikelihood. + ArrayList<Tuple2<Long, Vector>> docsSingleWord = new ArrayList<Tuple2<Long, Vector>>(); + docsSingleWord.add(new Tuple2<Long, Vector>(Long.valueOf(0), Vectors.dense(1.0, 0.0, 0.0))); + JavaPairRDD<Long, Vector> single = JavaPairRDD.fromJavaRDD(sc.parallelize(docsSingleWord)); + double logLikelihood = toyModel.logLikelihood(single); + } + private static int tinyK = LDASuite$.MODULE$.tinyK(); private static int tinyVocabSize = LDASuite$.MODULE$.tinyVocabSize(); private static Matrix tinyTopics = LDASuite$.MODULE$.tinyTopics(); private static Tuple2<int[], double[]>[] tinyTopicDescription = LDASuite$.MODULE$.tinyTopicDescription(); private JavaPairRDD<Long, Vector> corpus; + private LocalLDAModel toyModel = LDASuite$.MODULE$.toyModel(); + private ArrayList<Tuple2<Long, Vector>> toyData = LDASuite$.MODULE$.javaToyData(); } diff --git a/mllib/src/test/java/org/apache/spark/mllib/stat/JavaStatisticsSuite.java b/mllib/src/test/java/org/apache/spark/mllib/stat/JavaStatisticsSuite.java index 62f7f26b7c98f0c27ee31c79e9c3007f9ed5445e..eb4e3698624bc2f249d860b9042bd72e4eed492f 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/stat/JavaStatisticsSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/stat/JavaStatisticsSuite.java @@ -27,7 +27,12 @@ import org.junit.Test; import static org.junit.Assert.assertEquals; import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaDoubleRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.mllib.linalg.Vectors; +import org.apache.spark.mllib.regression.LabeledPoint; +import org.apache.spark.mllib.stat.test.ChiSqTestResult; +import org.apache.spark.mllib.stat.test.KolmogorovSmirnovTestResult; public class JavaStatisticsSuite implements Serializable { private transient JavaSparkContext sc; @@ -53,4 +58,21 @@ public class JavaStatisticsSuite implements Serializable { // Check default method assertEquals(corr1, corr2); } + + @Test + public void kolmogorovSmirnovTest() { + JavaDoubleRDD data = sc.parallelizeDoubles(Lists.newArrayList(0.2, 1.0, -1.0, 2.0)); + KolmogorovSmirnovTestResult testResult1 = Statistics.kolmogorovSmirnovTest(data, "norm"); + KolmogorovSmirnovTestResult testResult2 = Statistics.kolmogorovSmirnovTest( + data, "norm", 0.0, 1.0); + } + + @Test + public void chiSqTest() { + JavaRDD<LabeledPoint> data = sc.parallelize(Lists.newArrayList( + new LabeledPoint(0.0, Vectors.dense(0.1, 2.3)), + new LabeledPoint(1.0, Vectors.dense(1.5, 5.1)), + new LabeledPoint(0.0, Vectors.dense(2.4, 8.1)))); + ChiSqTestResult[] testResults = Statistics.chiSqTest(data); + } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala index ce6a8eb8e8c46c416522df02b6ef32b40ad00646..926185e90bcf92533cf1c52c301d6a7f17a9fa16 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.mllib.clustering +import java.util.{ArrayList => JArrayList} + import breeze.linalg.{DenseMatrix => BDM, argtopk, max, argmax} import org.apache.spark.SparkFunSuite @@ -575,6 +577,17 @@ private[clustering] object LDASuite { Vectors.sparse(6, Array(4, 5), Array(1, 1)) ).zipWithIndex.map { case (wordCounts, docId) => (docId.toLong, wordCounts) } + /** Used in the Java Test Suite */ + def javaToyData: JArrayList[(java.lang.Long, Vector)] = { + val javaData = new JArrayList[(java.lang.Long, Vector)] + var i = 0 + while (i < toyData.size) { + javaData.add((toyData(i)._1, toyData(i)._2)) + i += 1 + } + javaData + } + def toyModel: LocalLDAModel = { val k = 2 val vocabSize = 6