From 14a3329a11d7b38e0fd28807aa434dae19ca52f6 Mon Sep 17 00:00:00 2001
From: "Joseph E. Gonzalez" <joseph.e.gonzalez@gmail.com>
Date: Tue, 22 Oct 2013 15:01:20 -0700
Subject: [PATCH] Changing the Pregel interface slightly to better support type
 inference.

---
 .../scala/org/apache/spark/graph/Pregel.scala | 69 +++++++++++++++++--
 1 file changed, 63 insertions(+), 6 deletions(-)

diff --git a/graph/src/main/scala/org/apache/spark/graph/Pregel.scala b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
index 7ad6fda2a4..d1f5513f6a 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
@@ -10,6 +10,8 @@ import org.apache.spark.rdd.RDD
 object Pregel {
 
 
+
+
   /**
    * Execute the Pregel program.
    *
@@ -34,12 +36,11 @@ object Pregel {
    * @return the resulting graph at the end of the computation
    *
    */
-  def iterate[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](graph: Graph[VD, ED])(
+  def apply[VD: ClassManifest, ED: ClassManifest, A: ClassManifest]
+    (graph: Graph[VD, ED], initialMsg: A, numIter: Int)(
       vprog: (Vid, VD, A) => VD,
       sendMsg: (Vid, EdgeTriplet[VD, ED]) => Option[A],
-      mergeMsg: (A, A) => A,
-      initialMsg: A,
-      numIter: Int)
+      mergeMsg: (A, A) => A)
     : Graph[VD, ED] = {
 
     var g = graph
@@ -61,5 +62,61 @@ object Pregel {
     }
     // Return the final graph
     g
-  }
-}
+  } // end of apply
+
+
+  /**
+   * Execute the Pregel program.
+   *
+   * @tparam VD the vertex data type
+   * @tparam ED the edge data type
+   * @tparam A the Pregel message type
+   *
+   * @param vprog a user supplied function that acts as the vertex program for
+   *              the Pregel computation. It takes the vertex ID of the vertex it is running on,
+   *              the accompanying data for that vertex, and the incoming data and returns the
+   *              new vertex value.
+   * @param sendMsg a user supplied function that takes the current vertex ID and an EdgeTriplet
+   *                between the vertex and one of its neighbors and produces a message to send
+   *                to that neighbor.
+   * @param mergeMsg a user supplied function that takes two incoming messages of type A and merges
+   *                 them into a single message of type A. ''This function must be commutative and
+   *                 associative.''
+   * @param initialMsg the message each vertex will receive at the beginning of the
+   *                   first iteration.
+   * @param numIter the number of iterations to run this computation for.
+   *
+   * @return the resulting graph at the end of the computation
+   *
+   */
+  def apply[VD: ClassManifest, ED: ClassManifest, A: ClassManifest]
+    (graph: Graph[VD, ED], initialMsg: A)(
+      vprog: (Vid, VD, A) => VD,
+      sendMsg: (Vid, EdgeTriplet[VD, ED]) => Option[A],
+      mergeMsg: (A, A) => A)
+    : Graph[VD, ED] = {
+
+    var g = graph
+    //var g = graph.cache()
+    var i = 0
+
+    def mapF(vid: Vid, edge: EdgeTriplet[VD,ED]) = sendMsg(edge.otherVertexId(vid), edge)
+
+    // Receive the first set of messages
+    g.mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg))
+
+    var activeMessages = g.numEdges
+    while (activeMessages > 0) {
+      // compute the messages
+      val messages = g.aggregateNeighbors(mapF, mergeMsg, EdgeDirection.In).cache
+      activeMessages = messages.count
+      // receive the messages
+      g = g.joinVertices(messages)(vprog)
+      // count the iteration
+      i += 1
+    }
+    // Return the final graph
+    g
+  } // end of apply
+
+} // end of class Pregel
-- 
GitLab