Commit 1aca9c13 authored by Feynman Liang, committed by Joseph K. Bradley

[SPARK-8536] [MLLIB] Generalize OnlineLDAOptimizer to asymmetric document-topic Dirichlet priors

Modify `LDA` to take asymmetric document-topic prior distributions and `OnlineLDAOptimizer` to use the asymmetric prior during variational inference.

This PR only generalizes `OnlineLDAOptimizer` and the associated `LocalLDAModel`; `EMLDAOptimizer` and `DistributedLDAModel` still only support symmetric `alpha` (checked during `EMLDAOptimizer.initialize`).
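A minimal usage sketch of the new `Vector`-valued `docConcentration` with the online optimizer (based on the toy corpus in the new `LDASuite` test added by this PR; an existing `SparkContext` named `sc` is assumed):

```scala
import org.apache.spark.mllib.clustering.{LDA, OnlineLDAOptimizer}
import org.apache.spark.mllib.linalg.{Vector, Vectors}

// Tiny 6-document, 6-term corpus, mirroring the toy data used in the new LDASuite test.
val docs = sc.parallelize(Seq[(Long, Vector)](
  (0L, Vectors.sparse(6, Array(0, 1), Array(1.0, 1.0))),
  (1L, Vectors.sparse(6, Array(1, 2), Array(1.0, 1.0))),
  (2L, Vectors.sparse(6, Array(0, 2), Array(1.0, 1.0))),
  (3L, Vectors.sparse(6, Array(3, 4), Array(1.0, 1.0))),
  (4L, Vectors.sparse(6, Array(3, 5), Array(1.0, 1.0))),
  (5L, Vectors.sparse(6, Array(4, 5), Array(1.0, 1.0)))
))

// docConcentration (alpha) is now a Vector: one concentration value per topic, length == k.
val ldaModel = new LDA()
  .setK(2)
  .setDocConcentration(Vectors.dense(0.00001, 0.1)) // asymmetric document-topic prior
  .setTopicConcentration(0.01)
  .setMaxIterations(100)
  .setOptimizer(new OnlineLDAOptimizer().setMiniBatchFraction(1.0))
  .setSeed(12345)
  .run(docs)
```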

Author: Feynman Liang <fliang@databricks.com>

Closes #7575 from feynmanliang/SPARK-8536-LDA-asymmetric-priors and squashes the following commits:

af8fbb7 [Feynman Liang] Fix merge errors
ef5821d [Feynman Liang] Merge remote-tracking branch 'apache/master' into SPARK-8536-LDA-asymmetric-priors
58f1d7b [Feynman Liang] Fix from review feedback
a6dcf70 [Feynman Liang] Change docConcentration interface and move LDAOptimizer validation to initialize, add sad path tests
72038ff [Feynman Liang] Add tests referenced against gensim
d4284fa [Feynman Liang] Generalize OnlineLDA to asymmetric priors, no tests
parent cf21d05f
@@ -23,11 +23,10 @@ import org.apache.spark.Logging
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.api.java.JavaPairRDD
import org.apache.spark.graphx._
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD
import org.apache.spark.util.Utils
/**
* :: Experimental ::
*
@@ -49,14 +48,15 @@ import org.apache.spark.util.Utils
class LDA private (
private var k: Int,
private var maxIterations: Int,
private var docConcentration: Double,
private var docConcentration: Vector,
private var topicConcentration: Double,
private var seed: Long,
private var checkpointInterval: Int,
private var ldaOptimizer: LDAOptimizer) extends Logging {
def this() = this(k = 10, maxIterations = 20, docConcentration = -1, topicConcentration = -1,
seed = Utils.random.nextLong(), checkpointInterval = 10, ldaOptimizer = new EMLDAOptimizer)
def this() = this(k = 10, maxIterations = 20, docConcentration = Vectors.dense(-1),
topicConcentration = -1, seed = Utils.random.nextLong(), checkpointInterval = 10,
ldaOptimizer = new EMLDAOptimizer)
/**
* Number of topics to infer. I.e., the number of soft cluster centers.
@@ -77,37 +77,50 @@ class LDA private (
* Concentration parameter (commonly named "alpha") for the prior placed on documents'
* distributions over topics ("theta").
*
* This is the parameter to a symmetric Dirichlet distribution.
* This is the parameter to a Dirichlet distribution.
*/
def getDocConcentration: Double = this.docConcentration
def getDocConcentration: Vector = this.docConcentration
/**
* Concentration parameter (commonly named "alpha") for the prior placed on documents'
* distributions over topics ("theta").
*
* This is the parameter to a symmetric Dirichlet distribution, where larger values
* mean more smoothing (more regularization).
* This is the parameter to a Dirichlet distribution, where larger values mean more smoothing
* (more regularization).
*
* If set to -1, then docConcentration is set automatically.
* (default = -1 = automatic)
* If set to a singleton vector Vector(-1), then docConcentration is set automatically. If set to
* singleton vector Vector(t) where t != -1, then t is replicated to a vector of length k during
* [[LDAOptimizer.initialize()]]. Otherwise, the [[docConcentration]] vector must be length k.
* (default = Vector(-1) = automatic)
*
* Optimizer-specific parameter settings:
* - EM
* - Value should be > 1.0
* - default = (50 / k) + 1, where 50/k is common in LDA libraries and +1 follows
* Asuncion et al. (2009), who recommend a +1 adjustment for EM.
* - Currently only supports symmetric distributions, so all values in the vector should be
* the same.
* - Values should be > 1.0
* - default = uniformly (50 / k) + 1, where 50/k is common in LDA libraries and +1 follows
* from Asuncion et al. (2009), who recommend a +1 adjustment for EM.
* - Online
* - Value should be >= 0
* - default = (1.0 / k), following the implementation from
* - Values should be >= 0
* - default = uniformly (1.0 / k), following the implementation from
* [[https://github.com/Blei-Lab/onlineldavb]].
*/
def setDocConcentration(docConcentration: Double): this.type = {
def setDocConcentration(docConcentration: Vector): this.type = {
this.docConcentration = docConcentration
this
}
/** Replicates Double to create a symmetric prior */
def setDocConcentration(docConcentration: Double): this.type = {
this.docConcentration = Vectors.dense(docConcentration)
this
}
/** Alias for [[getDocConcentration]] */
def getAlpha: Double = getDocConcentration
def getAlpha: Vector = getDocConcentration
/** Alias for [[setDocConcentration()]] */
def setAlpha(alpha: Vector): this.type = setDocConcentration(alpha)
/** Alias for [[setDocConcentration()]] */
def setAlpha(alpha: Double): this.type = setDocConcentration(alpha)
@@ -27,7 +27,7 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.graphx._
import org.apache.spark.graphx.impl.GraphImpl
import org.apache.spark.mllib.impl.PeriodicGraphCheckpointer
import org.apache.spark.mllib.linalg.{DenseVector, Matrices, SparseVector, Vector}
import org.apache.spark.mllib.linalg.{DenseVector, Matrices, SparseVector, Vector, Vectors}
import org.apache.spark.rdd.RDD
/**
@@ -95,8 +95,11 @@ final class EMLDAOptimizer extends LDAOptimizer {
* Compute bipartite term/doc graph.
*/
override private[clustering] def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer = {
val docConcentration = lda.getDocConcentration(0)
require({
lda.getDocConcentration.toArray.forall(_ == docConcentration)
}, "EMLDAOptimizer currently only supports symmetric document-topic priors")
val docConcentration = lda.getDocConcentration
val topicConcentration = lda.getTopicConcentration
val k = lda.getK
@@ -229,10 +232,10 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
private var vocabSize: Int = 0
/** alias for docConcentration */
private var alpha: Double = 0
private var alpha: Vector = Vectors.dense(0)
/** (private[clustering] for debugging) Get docConcentration */
private[clustering] def getAlpha: Double = alpha
private[clustering] def getAlpha: Vector = alpha
/** alias for topicConcentration */
private var eta: Double = 0
@@ -343,7 +346,19 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
this.k = lda.getK
this.corpusSize = docs.count()
this.vocabSize = docs.first()._2.size
this.alpha = if (lda.getDocConcentration == -1) 1.0 / k else lda.getDocConcentration
this.alpha = if (lda.getDocConcentration.size == 1) {
if (lda.getDocConcentration(0) == -1) Vectors.dense(Array.fill(k)(1.0 / k))
else {
require(lda.getDocConcentration(0) >= 0, s"all entries in alpha must be >=0, got: $alpha")
Vectors.dense(Array.fill(k)(lda.getDocConcentration(0)))
}
} else {
require(lda.getDocConcentration.size == k, s"alpha must have length k, got: $alpha")
lda.getDocConcentration.foreachActive { case (_, x) =>
require(x >= 0, s"all entries in alpha must be >= 0, got: $alpha")
}
lda.getDocConcentration
}
this.eta = if (lda.getTopicConcentration == -1) 1.0 / k else lda.getTopicConcentration
this.randomGenerator = new Random(lda.getSeed)
@@ -372,7 +387,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
val vocabSize = this.vocabSize
val Elogbeta = dirichletExpectation(lambda).t
val expElogbeta = exp(Elogbeta)
val alpha = this.alpha
val alpha = this.alpha.toBreeze
val gammaShape = this.gammaShape
val stats: RDD[BDM[Double]] = batch.mapPartitions { docs =>
@@ -20,7 +20,7 @@ package org.apache.spark.mllib.clustering
import breeze.linalg.{DenseMatrix => BDM}
import org.apache.spark.SparkFunSuite
import org.apache.spark.mllib.linalg.{Vector, DenseMatrix, Matrix, Vectors}
import org.apache.spark.mllib.linalg.{DenseMatrix, Matrix, Vector, Vectors}
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.mllib.util.TestingUtils._
import org.apache.spark.util.Utils
@@ -132,22 +132,38 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext {
test("setter alias") {
val lda = new LDA().setAlpha(2.0).setBeta(3.0)
assert(lda.getAlpha === 2.0)
assert(lda.getDocConcentration === 2.0)
assert(lda.getAlpha.toArray.forall(_ === 2.0))
assert(lda.getDocConcentration.toArray.forall(_ === 2.0))
assert(lda.getBeta === 3.0)
assert(lda.getTopicConcentration === 3.0)
}
test("initializing with alpha length != k or 1 fails") {
intercept[IllegalArgumentException] {
val lda = new LDA().setK(2).setAlpha(Vectors.dense(1, 2, 3, 4))
val corpus = sc.parallelize(tinyCorpus, 2)
lda.run(corpus)
}
}
test("initializing with elements in alpha < 0 fails") {
intercept[IllegalArgumentException] {
val lda = new LDA().setK(4).setAlpha(Vectors.dense(-1, 2, 3, 4))
val corpus = sc.parallelize(tinyCorpus, 2)
lda.run(corpus)
}
}
test("OnlineLDAOptimizer initialization") {
val lda = new LDA().setK(2)
val corpus = sc.parallelize(tinyCorpus, 2)
val op = new OnlineLDAOptimizer().initialize(corpus, lda)
op.setKappa(0.9876).setMiniBatchFraction(0.123).setTau0(567)
assert(op.getAlpha == 0.5) // default 1.0 / k
assert(op.getEta == 0.5) // default 1.0 / k
assert(op.getKappa == 0.9876)
assert(op.getMiniBatchFraction == 0.123)
assert(op.getTau0 == 567)
assert(op.getAlpha.toArray.forall(_ === 0.5)) // default 1.0 / k
assert(op.getEta === 0.5) // default 1.0 / k
assert(op.getKappa === 0.9876)
assert(op.getMiniBatchFraction === 0.123)
assert(op.getTau0 === 567)
}
test("OnlineLDAOptimizer one iteration") {
@@ -218,6 +234,56 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext {
}
}
test("OnlineLDAOptimizer with asymmetric prior") {
def toydata: Array[(Long, Vector)] = Array(
Vectors.sparse(6, Array(0, 1), Array(1, 1)),
Vectors.sparse(6, Array(1, 2), Array(1, 1)),
Vectors.sparse(6, Array(0, 2), Array(1, 1)),
Vectors.sparse(6, Array(3, 4), Array(1, 1)),
Vectors.sparse(6, Array(3, 5), Array(1, 1)),
Vectors.sparse(6, Array(4, 5), Array(1, 1))
).zipWithIndex.map { case (wordCounts, docId) => (docId.toLong, wordCounts) }
val docs = sc.parallelize(toydata)
val op = new OnlineLDAOptimizer().setMiniBatchFraction(1).setTau0(1024).setKappa(0.51)
.setGammaShape(1e10)
val lda = new LDA().setK(2)
.setDocConcentration(Vectors.dense(0.00001, 0.1))
.setTopicConcentration(0.01)
.setMaxIterations(100)
.setOptimizer(op)
.setSeed(12345)
val ldaModel = lda.run(docs)
val topicIndices = ldaModel.describeTopics(maxTermsPerTopic = 10)
val topics = topicIndices.map { case (terms, termWeights) =>
terms.zip(termWeights)
}
/* Verify results with Python:
import numpy as np
from gensim import models
corpus = [
[(0, 1.0), (1, 1.0)],
[(1, 1.0), (2, 1.0)],
[(0, 1.0), (2, 1.0)],
[(3, 1.0), (4, 1.0)],
[(3, 1.0), (5, 1.0)],
[(4, 1.0), (5, 1.0)]]
np.random.seed(10)
lda = models.ldamodel.LdaModel(
corpus=corpus, alpha=np.array([0.00001, 0.1]), num_topics=2, update_every=0, passes=100)
lda.print_topics()
> ['0.167*0 + 0.167*1 + 0.167*2 + 0.167*3 + 0.167*4 + 0.167*5',
'0.167*0 + 0.167*1 + 0.167*2 + 0.167*4 + 0.167*3 + 0.167*5']
*/
topics.foreach { topic =>
assert(topic.forall { case (_, p) => p ~= 0.167 absTol 0.05 })
}
}
test("model save/load") {
// Test for LocalLDAModel.
val localModel = new LocalLDAModel(tinyTopics)