diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
index a410547a72fda4239b2824bc5f5abff821c2a2b9..ab124e6d77c5edc9a2eb59c87163d558c7e14dad 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
@@ -23,11 +23,10 @@ import org.apache.spark.Logging
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.api.java.JavaPairRDD
 import org.apache.spark.graphx._
-import org.apache.spark.mllib.linalg.Vector
+import org.apache.spark.mllib.linalg.{Vector, Vectors}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.util.Utils
 
-
 /**
  * :: Experimental ::
  *
@@ -49,14 +48,15 @@ import org.apache.spark.util.Utils
 class LDA private (
     private var k: Int,
     private var maxIterations: Int,
-    private var docConcentration: Double,
+    private var docConcentration: Vector,
     private var topicConcentration: Double,
     private var seed: Long,
     private var checkpointInterval: Int,
     private var ldaOptimizer: LDAOptimizer) extends Logging {
 
-  def this() = this(k = 10, maxIterations = 20, docConcentration = -1, topicConcentration = -1,
-    seed = Utils.random.nextLong(), checkpointInterval = 10, ldaOptimizer = new EMLDAOptimizer)
+  def this() = this(k = 10, maxIterations = 20, docConcentration = Vectors.dense(-1),
+    topicConcentration = -1, seed = Utils.random.nextLong(), checkpointInterval = 10,
+    ldaOptimizer = new EMLDAOptimizer)
 
   /**
    * Number of topics to infer. I.e., the number of soft cluster centers.
@@ -77,37 +77,50 @@ class LDA private (
    * Concentration parameter (commonly named "alpha") for the prior placed on documents'
    * distributions over topics ("theta").
    *
-   * This is the parameter to a symmetric Dirichlet distribution.
+   * This is the parameter to a Dirichlet distribution.
    */
-  def getDocConcentration: Double = this.docConcentration
+  def getDocConcentration: Vector = this.docConcentration
 
   /**
    * Concentration parameter (commonly named "alpha") for the prior placed on documents'
    * distributions over topics ("theta").
    *
-   * This is the parameter to a symmetric Dirichlet distribution, where larger values
-   * mean more smoothing (more regularization).
+   * This is the parameter to a Dirichlet distribution, where larger values mean more smoothing
+   * (more regularization).
    *
-   * If set to -1, then docConcentration is set automatically.
-   *  (default = -1 = automatic)
+   * If set to a singleton vector Vector(-1), then docConcentration is set automatically. If set to
+   * a singleton vector Vector(t) where t != -1, then t is replicated to a vector of length k during
+   * [[LDAOptimizer.initialize()]]. Otherwise, the [[docConcentration]] vector must be length k.
+   * (default = Vector(-1) = automatic)
    *
    * Optimizer-specific parameter settings:
    *  - EM
-   *     - Value should be > 1.0
-   *     - default = (50 / k) + 1, where 50/k is common in LDA libraries and +1 follows
-   *       Asuncion et al. (2009), who recommend a +1 adjustment for EM.
+   *     - Currently only supports symmetric distributions, so all values in the vector should be
+   *       the same.
+   *     - Values should be > 1.0
+   *     - default = uniformly (50 / k) + 1, where 50/k is common in LDA libraries and +1 follows
+   *       from Asuncion et al. (2009), who recommend a +1 adjustment for EM.
   *  - Online
-   *     - Value should be >= 0
-   *     - default = (1.0 / k), following the implementation from
+   *     - Values should be >= 0
+   *     - default = uniformly (1.0 / k), following the implementation from
    *       [[https://github.com/Blei-Lab/onlineldavb]].
    */
-  def setDocConcentration(docConcentration: Double): this.type = {
+  def setDocConcentration(docConcentration: Vector): this.type = {
     this.docConcentration = docConcentration
     this
   }
 
+  /** Replicates a Double to create a symmetric prior. */
+  def setDocConcentration(docConcentration: Double): this.type = {
+    this.docConcentration = Vectors.dense(docConcentration)
+    this
+  }
+
   /** Alias for [[getDocConcentration]] */
-  def getAlpha: Double = getDocConcentration
+  def getAlpha: Vector = getDocConcentration
+
+  /** Alias for [[setDocConcentration()]] */
+  def setAlpha(alpha: Vector): this.type = setDocConcentration(alpha)
 
   /** Alias for [[setDocConcentration()]] */
   def setAlpha(alpha: Double): this.type = setDocConcentration(alpha)
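To make the reworked `docConcentration` API concrete, here is a minimal usage sketch, assuming the patch above is applied; the variable names and prior values are illustrative only:

```scala
import org.apache.spark.mllib.clustering.LDA
import org.apache.spark.mllib.linalg.Vectors

val k = 3

// Double overload: wrapped in a singleton vector, then replicated to
// length k later, during LDAOptimizer.initialize().
val symmetricLda = new LDA().setK(k).setDocConcentration(0.5)
assert(symmetricLda.getDocConcentration.size == 1)

// Vector overload: must have length k (or length 1) by the time run() is called.
val asymmetricLda = new LDA().setK(k).setDocConcentration(Vectors.dense(0.1, 0.2, 0.7))
assert(asymmetricLda.getAlpha.toArray.sameElements(Array(0.1, 0.2, 0.7)))
```

Keeping the `Double` overload alongside the new `Vector` one preserves source compatibility for existing callers that only ever use symmetric priors.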
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
index b960ae6c0708de0800e5bdd4e18212609808ee22..f4170a3d98dd8d38e452177a83aa15f21e534871 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
@@ -27,7 +27,7 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.graphx._
 import org.apache.spark.graphx.impl.GraphImpl
 import org.apache.spark.mllib.impl.PeriodicGraphCheckpointer
-import org.apache.spark.mllib.linalg.{DenseVector, Matrices, SparseVector, Vector}
+import org.apache.spark.mllib.linalg.{DenseVector, Matrices, SparseVector, Vector, Vectors}
 import org.apache.spark.rdd.RDD
 
 /**
@@ -95,8 +95,11 @@ final class EMLDAOptimizer extends LDAOptimizer {
    * Compute bipartite term/doc graph.
    */
   override private[clustering] def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer = {
+    val docConcentration = lda.getDocConcentration(0)
+    require({
+      lda.getDocConcentration.toArray.forall(_ == docConcentration)
+    }, "EMLDAOptimizer currently only supports symmetric document-topic priors")
 
-    val docConcentration = lda.getDocConcentration
     val topicConcentration = lda.getTopicConcentration
     val k = lda.getK
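A brief sketch of what the new `require` guard in `EMLDAOptimizer.initialize` means for callers, again assuming the patch is applied; `corpus` stands in for any `RDD[(Long, Vector)]` and is not defined here:

```scala
import org.apache.spark.mllib.clustering.{EMLDAOptimizer, LDA}
import org.apache.spark.mllib.linalg.Vectors

// Uniform entries satisfy the guard (EM also expects values > 1.0).
val emLda = new LDA().setK(2).setOptimizer(new EMLDAOptimizer)
  .setDocConcentration(Vectors.dense(1.1, 1.1))
// emLda.run(corpus) // OK

// Non-uniform entries are rejected when run() calls initialize():
val badLda = new LDA().setK(2).setOptimizer(new EMLDAOptimizer)
  .setDocConcentration(Vectors.dense(1.1, 2.2))
// badLda.run(corpus) // throws IllegalArgumentException:
//   "EMLDAOptimizer currently only supports symmetric document-topic priors"
```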
@@ -229,10 +232,10 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
   private var vocabSize: Int = 0
 
   /** alias for docConcentration */
-  private var alpha: Double = 0
+  private var alpha: Vector = Vectors.dense(0)
 
   /** (private[clustering] for debugging) Get docConcentration */
-  private[clustering] def getAlpha: Double = alpha
+  private[clustering] def getAlpha: Vector = alpha
 
   /** alias for topicConcentration */
   private var eta: Double = 0
@@ -343,7 +346,21 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
     this.k = lda.getK
     this.corpusSize = docs.count()
     this.vocabSize = docs.first()._2.size
-    this.alpha = if (lda.getDocConcentration == -1) 1.0 / k else lda.getDocConcentration
+    this.alpha = if (lda.getDocConcentration.size == 1) {
+      if (lda.getDocConcentration(0) == -1) Vectors.dense(Array.fill(k)(1.0 / k))
+      else {
+        require(lda.getDocConcentration(0) >= 0,
+          s"all entries in alpha must be >= 0, got: ${lda.getDocConcentration}")
+        Vectors.dense(Array.fill(k)(lda.getDocConcentration(0)))
+      }
+    } else {
+      require(lda.getDocConcentration.size == k,
+        s"alpha must have length k, got: ${lda.getDocConcentration}")
+      lda.getDocConcentration.foreachActive { case (_, x) =>
+        require(x >= 0, s"all entries in alpha must be >= 0, got: ${lda.getDocConcentration}")
+      }
+      lda.getDocConcentration
+    }
     this.eta = if (lda.getTopicConcentration == -1) 1.0 / k else lda.getTopicConcentration
     this.randomGenerator = new Random(lda.getSeed)
 
@@ -372,7 +389,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
     val vocabSize = this.vocabSize
     val Elogbeta = dirichletExpectation(lambda).t
     val expElogbeta = exp(Elogbeta)
-    val alpha = this.alpha
+    val alpha = this.alpha.toBreeze
     val gammaShape = this.gammaShape
 
     val stats: RDD[BDM[Double]] = batch.mapPartitions { docs =>
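The branching that resolves `alpha` in `OnlineLDAOptimizer.initialize` can be read as a standalone rule: a singleton `Vector(-1)` means "use the uniform default 1/k", any other singleton is replicated to length k, and a longer vector must already have length k with nonnegative entries. The sketch below mirrors that rule outside MLlib; `resolveAlpha` is a hypothetical helper, and the public `toArray` stands in for the package-private `foreachActive`:

```scala
import org.apache.spark.mllib.linalg.{Vector, Vectors}

// Hypothetical helper mirroring OnlineLDAOptimizer's alpha resolution; not part of MLlib.
def resolveAlpha(docConcentration: Vector, k: Int): Vector = {
  if (docConcentration.size == 1) {
    if (docConcentration(0) == -1) {
      Vectors.dense(Array.fill(k)(1.0 / k)) // automatic default
    } else {
      require(docConcentration(0) >= 0,
        s"all entries in alpha must be >= 0, got: $docConcentration")
      Vectors.dense(Array.fill(k)(docConcentration(0))) // replicate the singleton
    }
  } else {
    require(docConcentration.size == k,
      s"alpha must have length k, got: $docConcentration")
    docConcentration.toArray.foreach { x =>
      require(x >= 0, s"all entries in alpha must be >= 0, got: $docConcentration")
    }
    docConcentration // already length k; passed through unchanged
  }
}

resolveAlpha(Vectors.dense(-1), 4)       // [0.25, 0.25, 0.25, 0.25]
resolveAlpha(Vectors.dense(0.3), 4)      // [0.3, 0.3, 0.3, 0.3]
resolveAlpha(Vectors.dense(0.1, 0.9), 2) // unchanged
```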
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
index 721a0656589517f1541e4af95f286b4b95b0dbe9..da70d9bd7c7902f75836ab8065f34325d6cfba4f 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.mllib.clustering
 import breeze.linalg.{DenseMatrix => BDM}
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.mllib.linalg.{Vector, DenseMatrix, Matrix, Vectors}
+import org.apache.spark.mllib.linalg.{DenseMatrix, Matrix, Vector, Vectors}
 import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.mllib.util.TestingUtils._
 import org.apache.spark.util.Utils
@@ -132,22 +132,38 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext {
 
   test("setter alias") {
     val lda = new LDA().setAlpha(2.0).setBeta(3.0)
-    assert(lda.getAlpha === 2.0)
-    assert(lda.getDocConcentration === 2.0)
+    assert(lda.getAlpha.toArray.forall(_ === 2.0))
+    assert(lda.getDocConcentration.toArray.forall(_ === 2.0))
     assert(lda.getBeta === 3.0)
     assert(lda.getTopicConcentration === 3.0)
   }
 
+  test("initializing with alpha length != k or 1 fails") {
+    intercept[IllegalArgumentException] {
+      val lda = new LDA().setK(2).setAlpha(Vectors.dense(1, 2, 3, 4))
+      val corpus = sc.parallelize(tinyCorpus, 2)
+      lda.run(corpus)
+    }
+  }
+
+  test("initializing with elements in alpha < 0 fails") {
+    intercept[IllegalArgumentException] {
+      val lda = new LDA().setK(4).setAlpha(Vectors.dense(-1, 2, 3, 4))
+      val corpus = sc.parallelize(tinyCorpus, 2)
+      lda.run(corpus)
+    }
+  }
+
   test("OnlineLDAOptimizer initialization") {
     val lda = new LDA().setK(2)
     val corpus = sc.parallelize(tinyCorpus, 2)
     val op = new OnlineLDAOptimizer().initialize(corpus, lda)
     op.setKappa(0.9876).setMiniBatchFraction(0.123).setTau0(567)
-    assert(op.getAlpha == 0.5) // default 1.0 / k
-    assert(op.getEta == 0.5) // default 1.0 / k
-    assert(op.getKappa == 0.9876)
-    assert(op.getMiniBatchFraction == 0.123)
-    assert(op.getTau0 == 567)
+    assert(op.getAlpha.toArray.forall(_ === 0.5)) // default 1.0 / k
+    assert(op.getEta === 0.5) // default 1.0 / k
+    assert(op.getKappa === 0.9876)
+    assert(op.getMiniBatchFraction === 0.123)
+    assert(op.getTau0 === 567)
   }
 
   test("OnlineLDAOptimizer one iteration") {
@@ -218,6 +234,56 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext {
     }
   }
 
+  test("OnlineLDAOptimizer with asymmetric prior") {
+    def toydata: Array[(Long, Vector)] = Array(
+      Vectors.sparse(6, Array(0, 1), Array(1, 1)),
+      Vectors.sparse(6, Array(1, 2), Array(1, 1)),
+      Vectors.sparse(6, Array(0, 2), Array(1, 1)),
+      Vectors.sparse(6, Array(3, 4), Array(1, 1)),
+      Vectors.sparse(6, Array(3, 5), Array(1, 1)),
+      Vectors.sparse(6, Array(4, 5), Array(1, 1))
+    ).zipWithIndex.map { case (wordCounts, docId) => (docId.toLong, wordCounts) }
+
+    val docs = sc.parallelize(toydata)
+    val op = new OnlineLDAOptimizer().setMiniBatchFraction(1).setTau0(1024).setKappa(0.51)
+      .setGammaShape(1e10)
+    val lda = new LDA().setK(2)
+      .setDocConcentration(Vectors.dense(0.00001, 0.1))
+      .setTopicConcentration(0.01)
+      .setMaxIterations(100)
+      .setOptimizer(op)
+      .setSeed(12345)
+
+    val ldaModel = lda.run(docs)
+    val topicIndices = ldaModel.describeTopics(maxTermsPerTopic = 10)
+    val topics = topicIndices.map { case (terms, termWeights) =>
+      terms.zip(termWeights)
+    }
+
+    /* Verify results with Python:
+
+       import numpy as np
+       from gensim import models
+       corpus = [
+           [(0, 1.0), (1, 1.0)],
+           [(1, 1.0), (2, 1.0)],
+           [(0, 1.0), (2, 1.0)],
+           [(3, 1.0), (4, 1.0)],
+           [(3, 1.0), (5, 1.0)],
+           [(4, 1.0), (5, 1.0)]]
+       np.random.seed(10)
+       lda = models.ldamodel.LdaModel(
+           corpus=corpus, alpha=np.array([0.00001, 0.1]), num_topics=2, update_every=0, passes=100)
+       lda.print_topics()
+
+       > ['0.167*0 + 0.167*1 + 0.167*2 + 0.167*3 + 0.167*4 + 0.167*5',
+          '0.167*0 + 0.167*1 + 0.167*2 + 0.167*4 + 0.167*3 + 0.167*5']
+     */
+    topics.foreach { topic =>
+      assert(topic.forall { case (_, p) => p ~= 0.167 absTol 0.05 })
+    }
+  }
+
   test("model save/load") {
     // Test for LocalLDAModel.
     val localModel = new LocalLDAModel(tinyTopics)
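For reference, an end-to-end sketch in the spirit of the new asymmetric-prior test, assuming the patch is applied; `sc` is an existing `SparkContext`, and the toy corpus and parameter values are illustrative only:

```scala
import org.apache.spark.mllib.clustering.{LDA, OnlineLDAOptimizer}
import org.apache.spark.mllib.linalg.Vectors

// Two tiny bag-of-words documents over a 6-term vocabulary.
val corpus = sc.parallelize(Seq(
  (0L, Vectors.sparse(6, Array(0, 1), Array(1.0, 1.0))),
  (1L, Vectors.sparse(6, Array(3, 4), Array(1.0, 1.0)))
))

val model = new LDA()
  .setK(2)
  .setDocConcentration(Vectors.dense(1e-5, 0.1)) // asymmetric document-topic prior
  .setTopicConcentration(0.01)
  .setMaxIterations(50)
  .setOptimizer(new OnlineLDAOptimizer().setMiniBatchFraction(1.0))
  .setSeed(12345)
  .run(corpus)

model.describeTopics(maxTermsPerTopic = 3).foreach { case (terms, weights) =>
  println(terms.zip(weights).mkString(" "))
}
```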