From 08024c938c81b6590745fb87e95f8d352f5ea011 Mon Sep 17 00:00:00 2001
From: "Joseph E. Gonzalez" <joseph.e.gonzalez@gmail.com>
Date: Sat, 26 Oct 2013 15:42:51 -0700
Subject: [PATCH] Adding more documentation to the Pregel API as well as
 additional functionality including the ability to specify the edge direction
 along which messages are computed.

---
 .../scala/org/apache/spark/graph/Pregel.scala | 259 +++++++++++++++---
 1 file changed, 224 insertions(+), 35 deletions(-)

diff --git a/graph/src/main/scala/org/apache/spark/graph/Pregel.scala b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
index 94dc806fc2..d45e351d6a 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
@@ -4,34 +4,87 @@ import org.apache.spark.rdd.RDD
 
 
 /**
- * This object implements the Pregel bulk-synchronous
- * message-passing API.
+ * This object implements a Pregel-like bulk-synchronous
+ * message-passing API.  However, unlike the original Pregel API
+ * the GraphX pregel API factors the sendMessage computation over
+ * edges, enables the message sending computation to read both
+ * vertex attributes, and finally constrains messages to the graph
+ * structure.  These changes allow for substantially more efficient
+ * distributed execution while also exposing greater flexibility
+ * for graph based computation.  
+ *
+ * This object presents several variants of the bulk synchronous
+ * execution that differ only in the edge direction along which
+ * messages are sent and whether a fixed number of iterations 
+ * is used. 
+ *
+ * @example We can use the Pregel abstraction to implement PageRank
+ * {{{
+ * val pagerankGraph: Graph[Double, Double] = graph
+ *   // Associate the degree with each vertex
+ *   .outerJoinVertices(graph.outDegrees){
+ *     (vid, vdata, deg) => deg.getOrElse(0)
+ *   }
+ *   // Set the weight on the edges based on the degree
+ *   .mapTriplets( e => 1.0 / e.srcAttr )
+ *   // Set the vertex attributes to the initial pagerank values
+ *   .mapVertices( (id, attr) => 1.0 )
+ *
+ * def vertexProgram(id: Vid, attr: Double, msgSum: Double): Double =
+ *   resetProb + (1.0 - resetProb) * msgSum
+ * def sendMessage(id: Vid, edge: EdgeTriplet[Double, Double]): Option[Double] =
+ *   Some(edge.srcAttr * edge.attr)
+ * def messageCombiner(a: Double, b: Double): Double = a + b
+ * val initialMessage = 0.0 
+ * // Execute pregel for a fixed number of iterations.      
+ * Pregel(pagerankGraph, initialMessage, numIter)(
+ *   vertexProgram, sendMessage, messageCombiner)
+ * }}}
+ *
  */
 object Pregel {
 
 
-
-
   /**
-   * Execute the Pregel program.
+   * Execute a Pregel-like iterative vertex-parallel abstraction.  
+   * The user-defined vertex-program `vprog` is executed in parallel
+   * on each vertex receiving any inbound messages and computing a new
+   * value for the vertex.  The `sendMsg` function is then invoked on
+   * all out-edges and is used to compute an optional message to the 
+   * destination vertex. The `mergeMsg` function is a commutative 
+   * associative function used to combine messages destined to the
+   * same vertex. 
+   *
+   * On the first iteration all vertices receive the `initialMsg` and
+   * on subsequent iterations if a vertex does not receive a message then
+   * the vertex-program is not invoked. 
+   *
+   * This function iterates a fixed number (`numIter`) of iterations.
    *
    * @tparam VD the vertex data type
    * @tparam ED the edge data type
    * @tparam A the Pregel message type
    *
-   * @param vprog a user supplied function that acts as the vertex program for
-   *              the Pregel computation. It takes the vertex ID of the vertex it is running on,
-   *              the accompanying data for that vertex, and the incoming data and returns the
-   *              new vertex value.
-   * @param sendMsg a user supplied function that takes the current vertex ID and an EdgeTriplet
-   *                between the vertex and one of its neighbors and produces a message to send
-   *                to that neighbor.
-   * @param mergeMsg a user supplied function that takes two incoming messages of type A and merges
-   *                 them into a single message of type A. ''This function must be commutative and
-   *                 associative.''
-   * @param initialMsg the message each vertex will receive at the beginning of the
-   *                   first iteration.
-   * @param numIter the number of iterations to run this computation for.
+   * @param graph the input graph. 
+   *
+   * @param initialMsg the message each vertex will receive
+   * on the first iteration.
+   *
+   * @param numIter the number of iterations to run this computation.
+   *
+   * @param vprog the user-defined vertex program which runs on each vertex
+   * and receives the inbound message and computes a new vertex value. 
+   * On the first iteration the vertex program is invoked on all vertices
+   * and is passed the initial message (`initialMsg`).  On subsequent iterations the
+   * vertex program is only invoked on those vertices that receive messages.
+   *
+   * @param sendMsg a user supplied function that is applied to out edges
+   * of vertices that received messages in the current iteration. 
+   *
+   * @param mergeMsg a user supplied function that takes two incoming messages 
+   * of type A and merges them into a single message of type A. 
+   * ''This function must be commutative and associative and ideally the 
+   * size of A should not increase.''
    *
    * @return the resulting graph at the end of the computation
    *
@@ -42,6 +95,64 @@ object Pregel {
       sendMsg: (Vid, EdgeTriplet[VD, ED]) => Option[A],
       mergeMsg: (A, A) => A)
     : Graph[VD, ED] = {
+    apply(graph, initialMsg, numIter, EdgeDirection.Out)(vprog, sendMsg, mergeMsg)
+  } // end of Apply
+    
+
+  /**
+   * Execute a Pregel-like iterative vertex-parallel abstraction.  
+   * The user-defined vertex-program `vprog` is executed in parallel
+   * on each vertex receiving any inbound messages and computing a new
+   * value for the vertex.  The `sendMsg` function is then invoked on
+   * all edges in the direction `sendDir` and is used to compute an optional message to the
+   * destination vertex. The `mergeMsg` function is a commutative 
+   * associative function used to combine messages destined to the
+   * same vertex. 
+   *
+   * On the first iteration all vertices receive the `initialMsg` and
+   * on subsequent iterations if a vertex does not receive a message then
+   * the vertex-program is not invoked. 
+   *
+   * This function iterates a fixed number (`numIter`) of iterations.
+   *
+   * @tparam VD the vertex data type
+   * @tparam ED the edge data type
+   * @tparam A the Pregel message type
+   *
+   * @param graph the input graph. 
+   *
+   * @param initialMsg the message each vertex will receive
+   * on the first iteration.
+   *
+   * @param numIter the number of iterations to run this computation.
+   * 
+   * @param sendDir the edge direction along which the `sendMsg` function
+   * is invoked.
+   *
+   * @param vprog the user-defined vertex program which runs on each vertex
+   * and receives the inbound message and computes a new vertex value. 
+   * On the first iteration the vertex program is invoked on all vertices
+   * and is passed the initial message (`initialMsg`).  On subsequent iterations the
+   * vertex program is only invoked on those vertices that receive messages.
+   *
+   * @param sendMsg a user supplied function that is applied to each edge
+   * in the direction `sendDir` adjacent to vertices that received messages
+   * in the current iteration. 
+   *
+   * @param mergeMsg a user supplied function that takes two incoming messages 
+   * of type A and merges them into a single message of type A. 
+   * ''This function must be commutative and associative and ideally the 
+   * size of A should not increase.''
+   *
+   * @return the resulting graph at the end of the computation
+   *
+   */
+  def apply[VD: ClassManifest, ED: ClassManifest, A: ClassManifest]
+    (graph: Graph[VD, ED], initialMsg: A, numIter: Int, sendDir: EdgeDirection)(
+      vprog: (Vid, VD, A) => VD,
+      sendMsg: (Vid, EdgeTriplet[VD, ED]) => Option[A],
+      mergeMsg: (A, A) => A)
+    : Graph[VD, ED] = {
 
     def mapF(vid: Vid, edge: EdgeTriplet[VD,ED]) = sendMsg(edge.otherVertexId(vid), edge)
 
@@ -51,7 +162,7 @@ object Pregel {
     var i = 0
     while (i < numIter) {
       // compute the messages
-      val messages = g.aggregateNeighbors(mapF, mergeMsg, EdgeDirection.In)
+      val messages = g.aggregateNeighbors(mapF, mergeMsg, sendDir.reverse)
       // receive the messages
       g = g.joinVertices(messages)(vprog)
       // count the iteration
@@ -63,25 +174,45 @@ object Pregel {
 
 
   /**
-   * Execute the Pregel program.
+   * Execute a Pregel-like iterative vertex-parallel abstraction.  
+   * The user-defined vertex-program `vprog` is executed in parallel
+   * on each vertex receiving any inbound messages and computing a new
+   * value for the vertex.  The `sendMsg` function is then invoked on
+   * all out-edges and is used to compute an optional message to the 
+   * destination vertex. The `mergeMsg` function is a commutative 
+   * associative function used to combine messages destined to the
+   * same vertex. 
+   *
+   * On the first iteration all vertices receive the `initialMsg` and
+   * on subsequent iterations if a vertex does not receive a message then
+   * the vertex-program is not invoked. 
+   *
+   * This function iterates until there are no remaining messages.
    *
    * @tparam VD the vertex data type
    * @tparam ED the edge data type
    * @tparam A the Pregel message type
    *
-   * @param vprog a user supplied function that acts as the vertex program for
-   *              the Pregel computation. It takes the vertex ID of the vertex it is running on,
-   *              the accompanying data for that vertex, and the incoming data and returns the
-   *              new vertex value.
-   * @param sendMsg a user supplied function that takes the current vertex ID and an EdgeTriplet
-   *                between the vertex and one of its neighbors and produces a message to send
-   *                to that neighbor.
-   * @param mergeMsg a user supplied function that takes two incoming messages of type A and merges
-   *                 them into a single message of type A. ''This function must be commutative and
-   *                 associative.''
-   * @param initialMsg the message each vertex will receive at the beginning of the
-   *                   first iteration.
-   * @param numIter the number of iterations to run this computation for.
+   * @param graph the input graph. 
+   *
+   * @param initialMsg the message each vertex will receive
+   * on the first iteration.
+   *
+   * @note this variant takes no iteration count; it runs until no messages remain.
+   *
+   * @param vprog the user-defined vertex program which runs on each vertex
+   * and receives the inbound message and computes a new vertex value. 
+   * On the first iteration the vertex program is invoked on all vertices
+   * and is passed the initial message (`initialMsg`).  On subsequent iterations the
+   * vertex program is only invoked on those vertices that receive messages.
+   *
+   * @param sendMsg a user supplied function that is applied to out edges
+   * of vertices that received messages in the current iteration. 
+   *
+   * @param mergeMsg a user supplied function that takes two incoming messages 
+   * of type A and merges them into a single message of type A. 
+   * ''This function must be commutative and associative and ideally the 
+   * size of A should not increase.''
    *
    * @return the resulting graph at the end of the computation
    *
@@ -92,6 +223,64 @@ object Pregel {
       sendMsg: (Vid, EdgeTriplet[VD, ED]) => Option[A],
       mergeMsg: (A, A) => A)
     : Graph[VD, ED] = {
+    apply(graph, initialMsg, EdgeDirection.Out)(vprog, sendMsg, mergeMsg)
+  } // end of apply
+
+
+  /**
+   * Execute a Pregel-like iterative vertex-parallel abstraction.  
+   * The user-defined vertex-program `vprog` is executed in parallel
+   * on each vertex receiving any inbound messages and computing a new
+   * value for the vertex.  The `sendMsg` function is then invoked on
+   * all edges in the direction `sendDir` and is used to compute an optional message to the
+   * destination vertex. The `mergeMsg` function is a commutative 
+   * associative function used to combine messages destined to the
+   * same vertex. 
+   *
+   * On the first iteration all vertices receive the `initialMsg` and
+   * on subsequent iterations if a vertex does not receive a message then
+   * the vertex-program is not invoked. 
+   *
+   * This function iterates until there are no remaining messages.
+   *
+   * @tparam VD the vertex data type
+   * @tparam ED the edge data type
+   * @tparam A the Pregel message type
+   *
+   * @param graph the input graph. 
+   *
+   * @param initialMsg the message each vertex will receive
+   * on the first iteration.
+   *
+   * @note this variant takes no iteration count; it runs until no messages remain.
+   * 
+   * @param sendDir the edge direction along which the `sendMsg` function
+   * is invoked.
+   *
+   * @param vprog the user-defined vertex program which runs on each vertex
+   * and receives the inbound message and computes a new vertex value. 
+   * On the first iteration the vertex program is invoked on all vertices
+   * and is passed the initial message (`initialMsg`).  On subsequent iterations the
+   * vertex program is only invoked on those vertices that receive messages.
+   *
+   * @param sendMsg a user supplied function that is applied to each edge
+   * in the direction `sendDir` adjacent to vertices that received messages
+   * in the current iteration. 
+   *
+   * @param mergeMsg a user supplied function that takes two incoming messages 
+   * of type A and merges them into a single message of type A. 
+   * ''This function must be commutative and associative and ideally the 
+   * size of A should not increase.''
+   *
+   * @return the resulting graph at the end of the computation
+   *
+   */
+  def apply[VD: ClassManifest, ED: ClassManifest, A: ClassManifest]
+    (graph: Graph[VD, ED], initialMsg: A, sendDir: EdgeDirection)(
+      vprog: (Vid, VD, A) => VD,
+      sendMsg: (Vid, EdgeTriplet[VD, ED]) => Option[A],
+      mergeMsg: (A, A) => A)
+    : Graph[VD, ED] = {
 
     def vprogFun(id: Vid, attr: (VD, Boolean), msgOpt: Option[A]): (VD, Boolean) = {
       msgOpt match {
@@ -114,7 +303,7 @@ object Pregel {
 
     var g = graph.mapVertices( (vid, vdata) => (vprog(vid, vdata, initialMsg), true) ) 
     // compute the messages
-    var messages = g.aggregateNeighbors(sendMsgFun, mergeMsg, EdgeDirection.In).cache
+    var messages = g.aggregateNeighbors(sendMsgFun, mergeMsg, sendDir.reverse).cache
     var activeMessages = messages.count
     // Loop 
     var i = 0
@@ -123,7 +312,7 @@ object Pregel {
       g = g.outerJoinVertices(messages)(vprogFun)
       val oldMessages = messages
       // compute the messages
-      messages = g.aggregateNeighbors(sendMsgFun, mergeMsg, EdgeDirection.In).cache
+      messages = g.aggregateNeighbors(sendMsgFun, mergeMsg, sendDir.reverse).cache
       activeMessages = messages.count
       // after counting we can unpersist the old messages
       oldMessages.unpersist(blocking=false)
-- 
GitLab