diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala index a1d3df03a114014a008f20ffa886ea761720bad7..5e17c8da6113416f05e2ae05f45a20738dd3c6d7 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala @@ -450,34 +450,23 @@ private[clustering] object LDA { // Create vertices. // Initially, we use random soft assignments of tokens to topics (random gamma). - val edgesWithGamma: RDD[(Edge[TokenCount], TopicCounts)] = - edges.mapPartitionsWithIndex { case (partIndex, partEdges) => - val random = new Random(partIndex + randomSeed) - partEdges.map { edge => - // Create a random gamma_{wjk} - (edge, normalize(BDV.fill[Double](k)(random.nextDouble()), 1.0)) + def createVertices(): RDD[(VertexId, TopicCounts)] = { + val verticesTMP: RDD[(VertexId, TopicCounts)] = + edges.mapPartitionsWithIndex { case (partIndex, partEdges) => + val random = new Random(partIndex + randomSeed) + partEdges.flatMap { edge => + val gamma = normalize(BDV.fill[Double](k)(random.nextDouble()), 1.0) + val sum = gamma * edge.attr + Seq((edge.srcId, sum), (edge.dstId, sum)) + } } - } - def createVertices(sendToWhere: Edge[TokenCount] => VertexId): RDD[(VertexId, TopicCounts)] = { - val verticesTMP: RDD[(VertexId, (TokenCount, TopicCounts))] = - edgesWithGamma.map { case (edge, gamma: TopicCounts) => - (sendToWhere(edge), (edge.attr, gamma)) - } - verticesTMP.aggregateByKey(BDV.zeros[Double](k))( - (sum, t) => { - brzAxpy(t._1, t._2, sum) - sum - }, - (sum0, sum1) => { - sum0 += sum1 - } - ) + verticesTMP.reduceByKey(_ + _) } - val docVertices = createVertices(_.srcId) - val termVertices = createVertices(_.dstId) + + val docTermVertices = createVertices() // Partition such that edges are grouped by document - val graph = Graph(docVertices ++ termVertices, edges) + val graph = Graph(docTermVertices, edges) .partitionBy(PartitionStrategy.EdgePartition1D) new EMOptimizer(graph, k, vocabSize, docConcentration, topicConcentration, checkpointInterval)