diff --git a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala index 09cf81eeeb78bc0a9099e89c3ec0ad952e7b09a8..b411c60cee15eba54eeb7efec83405dad5953d54 100644 --- a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala +++ b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala @@ -47,6 +47,17 @@ object Analytics extends Logging { val pagerankGraph = graph.leftJoinVertices[Int, (Int, Double)](graph.outDegrees, (vertex, deg) => (deg.getOrElse(0), 1.0) ) + + println("Vertex Replication: " + pagerankGraph.replication) + + val edgeCounts = pagerankGraph.balance + + println("Edge Balance: " + (edgeCounts.max.toDouble / edgeCounts.min ) ) + println("Min edge block: " + edgeCounts.min) + println("Max edge block: " + edgeCounts.max) + + + Pregel.iterate[(Int, Double), ED, Double](pagerankGraph)( (vertex, a: Double) => (vertex.data._1, (resetProb + (1.0 - resetProb) * a)), // apply (me_id, edge) => Some(edge.src.data._2 / edge.src.data._1), // gather diff --git a/graph/src/main/scala/org/apache/spark/graph/Graph.scala b/graph/src/main/scala/org/apache/spark/graph/Graph.scala index de0a09050454783a0686e92e76ca11b47b3d3f2f..61032bf0be309a5e045042f6e8050af4333f8388 100644 --- a/graph/src/main/scala/org/apache/spark/graph/Graph.scala +++ b/graph/src/main/scala/org/apache/spark/graph/Graph.scala @@ -18,6 +18,12 @@ import org.apache.spark.rdd.RDD */ abstract class Graph[VD: ClassManifest, ED: ClassManifest] { + + def replication: Double + + def balance: Array[Int] + + /** * Get the vertices and their data. * diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala index db982a8fe4b9ea9acab3c5afd47f1c4f395bc3f9..e178df3841531f30b893af81a68720c25d0e0262 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala @@ -56,6 +56,16 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( this } + + override def replication(): Double = { + val rep = vTable.map{ case (_, (_, a)) => a.size }.sum + rep / vTable.count + } + + override def balance(): Array[Int] = { + eTable.map{ case (_, epart) => epart.data.size }.collect + } + override def reverse: Graph[VD, ED] = { newGraph(vertices, edges.map{ case Edge(s, t, e) => Edge(t, s, e) }) }