Skip to content
Snippets Groups Projects
Commit 764ca180 authored by Xiangrui Meng's avatar Xiangrui Meng
Browse files

[SPARK-13355][MLLIB] replace GraphImpl.fromExistingRDDs by Graph.apply

`GraphImpl.fromExistingRDDs` expects preprocessed vertex RDD as input. We call it in LDA without validating this requirement. So it might introduce errors. Replacing it by `Graph.apply` would be safer and more proper because it is a public API. The tests still pass. So maybe it is safe to use `fromExistingRDDs` here (though it doesn't seem so based on the implementation) or the test cases are special. jkbradley ankurdave

Author: Xiangrui Meng <meng@databricks.com>

Closes #11226 from mengxr/SPARK-13355.
parent 72427c3e
No related branches found
No related tags found
No related merge requests found
......@@ -25,7 +25,6 @@ import breeze.stats.distributions.{Gamma, RandBasis}
import org.apache.spark.annotation.{DeveloperApi, Since}
import org.apache.spark.graphx._
import org.apache.spark.graphx.impl.GraphImpl
import org.apache.spark.mllib.impl.PeriodicGraphCheckpointer
import org.apache.spark.mllib.linalg.{DenseVector, Matrices, SparseVector, Vector, Vectors}
import org.apache.spark.rdd.RDD
......@@ -188,7 +187,7 @@ final class EMLDAOptimizer extends LDAOptimizer {
graph.aggregateMessages[(Boolean, TopicCounts)](sendMsg, mergeMsg)
.mapValues(_._2)
// Update the vertex descriptors with the new counts.
val newGraph = GraphImpl.fromExistingRDDs(docTopicDistributions, graph.edges)
val newGraph = Graph(docTopicDistributions, graph.edges)
graph = newGraph
graphCheckpointer.update(newGraph)
globalTopicTotals = computeGlobalTopicTotals()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment