diff --git a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
index dc1955a8353f8cf5a60ea3232839a5194437d9db..ddbd5becceb05f756e65d71d4401f0bc3dad13c1 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
@@ -4,14 +4,54 @@ import org.apache.spark._
 
 
 
+/**
+ * The Analytics object contains a collection of basic graph analytics
+ * algorithms that operate largely on the graph structure. 
+ *
+ * In addition the Analytics object contains a driver `main` which can 
+ * be used to apply the various functions to graphs in standard formats.
+ */
 object Analytics extends Logging {
 
   /**
-   * Compute the PageRank of a graph returning the pagerank of each vertex as an RDD
+   * Run PageRank for a fixed number of iterations returning a graph 
+   * with vertex attributes containing the PageRank and edge attributes
+   * the normalized edge weight.
+   * 
+   * The following PageRank fixed point is computed for each vertex. 
+   *
+   * {{{
+   * var PR = Array.fill(n)( 1.0 )
+   * val oldPR = Array.fill(n)( 1.0 )
+   * for( iter <- 0 until numIter ) {
+   *   swap(oldPR, PR)
+   *   for( i <- 0 until n ) {
+   *     PR[i] = alpha + (1 - alpha) * inNbrs[i].map(j => oldPR[j] / outDeg[j]).sum
+   *   }
+   * }
+   * }}}
+   *
+   * where `alpha` is the random reset probability (typically 0.15), 
+   * `inNbrs[i]` is the set of neighbors whick link to `i` and `outDeg[j]`
+   * is the out degree of vertex `j`.
+   * 
+   * Note that this is not the "normalized" PageRank and as a consequence
+   * pages that have no inlinks will have a PageRank of alpha.  
+   *
+   * @tparam VD the original vertex attribute (not used)
+   * @tparam ED the original edge attribute (not used)
+   *
+   * @param graph the graph on which to compute PageRank
+   * @param numIter the number of iterations of PageRank to run
+   * @param resetProb the random reset probability (alpha)
+   *
+   * @return the graph containing with each vertex containing the PageRank and
+   * each edge containing the normalized weight.
+   *
    */
-  def pagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED],
-                                           numIter: Int,
-                                           resetProb: Double = 0.15): Graph[Double, Double] = {
+  def pagerank[VD: Manifest, ED: Manifest](
+    graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15): 
+    Graph[Double, Double] = {
 
     /**
      * Initialize the pagerankGraph with each edge attribute 
@@ -45,12 +85,42 @@ object Analytics extends Logging {
       vertexProgram, sendMessage, messageCombiner)
   }
 
-
   /**
-   * Compute the PageRank of a graph returning the pagerank of each vertex as an RDD
+   * Run a dynamic version of PageRank returning a graph with vertex attributes 
+   * containing the PageRank and edge attributes containing the normalized 
+   * edge weight.
+   *
+   * {{{
+   * var PR = Array.fill(n)( 1.0 )
+   * val oldPR = Array.fill(n)( 0.0 )
+   * while( max(abs(PR - oldPr)) > tol ) {
+   *   swap(oldPR, PR)
+   *   for( i <- 0 until n if abs(PR[i] - oldPR[i]) > tol ) {
+   *     PR[i] = alpha + (1 - \alpha) * inNbrs[i].map(j => oldPR[j] / outDeg[j]).sum
+   *   }
+   * }
+   * }}}
+   *
+   * where `alpha` is the random reset probability (typically 0.15), 
+   * `inNbrs[i]` is the set of neighbors whick link to `i` and `outDeg[j]`
+   * is the out degree of vertex `j`.
+   * 
+   * Note that this is not the "normalized" PageRank and as a consequence
+   * pages that have no inlinks will have a PageRank of alpha.  
+   *
+   * @tparam VD the original vertex attribute (not used)
+   * @tparam ED the original edge attribute (not used)
+   *
+   * @param graph the graph on which to compute PageRank
+   * @param tol the tolerance allowed at convergence (smaller => more accurate). 
+   * @param resetProb the random reset probability (alpha)
+   *
+   * @return the graph containing with each vertex containing the PageRank and
+   * each edge containing the normalized weight.
    */
   def deltaPagerank[VD: Manifest, ED: Manifest](
-    graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15): Graph[Double, Double] = {
+    graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15): 
+    Graph[Double, Double] = {
 
     /**
      * Initialize the pagerankGraph with each edge attribute 
@@ -89,22 +159,7 @@ object Analytics extends Logging {
     Pregel(pagerankGraph, initialMessage)(
       vertexProgram, sendMessage, messageCombiner)
       .mapVertices( (vid, attr) => attr._1 )
-
-
-    // // Compute the out degree of each vertex
-    // val pagerankGraph = graph.outerJoinVertices(graph.outDegrees){
-    //   (id, data, degIter) => (degIter.sum, 1.0, 1.0)
-    // }
-    
-    // // Run PageRank
-    // GraphLab.iterate(pagerankGraph)(
-    //   (me_id, edge) => edge.srcAttr._2 / edge.srcAttr._1, // gather
-    //   (a: Double, b: Double) => a + b,
-    //   (id, data, a: Option[Double]) =>
-    //     (data._1, (resetProb + (1.0 - resetProb) * a.getOrElse(0.0)), data._2), // apply
-    //   (me_id, edge) => math.abs(edge.srcAttr._3 - edge.srcAttr._2) > tol, // scatter
-    //   maxIter).mapVertices { case (vid, data) => data._2 }
-  }
+  } // end of deltaPageRank
 
 
   /**
@@ -112,19 +167,36 @@ object Analytics extends Logging {
    * and return an RDD with the vertex value containing the
    * lowest vertex id in the connected component containing
    * that vertex.
+   *
+   * @tparam VD the vertex attribute type (discarded in the computation)
+   * @tparam ED the edge attribute type (preserved in the computation)
+   *
+   * @param graph the graph for which to compute the connected components 
+   *
+   * @return a graph with vertex attributes containing the smallest vertex
+   * in each connected component
    */
-  def connectedComponents[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int) = {
+  def connectedComponents[VD: Manifest, ED: Manifest](graph: Graph[VD, ED]): 
+    Graph[Vid, ED] = {
     val ccGraph = graph.mapVertices { case (vid, _) => vid }
-    GraphLab.iterate(ccGraph)(
-      (me_id, edge) => edge.otherVertexAttr(me_id), // gather
-      (a: Vid, b: Vid) => math.min(a, b), // merge
-      (id, data, a: Option[Vid]) => math.min(data, a.getOrElse(Long.MaxValue)), // apply
-      (me_id, edge) => (edge.vertexAttr(me_id) < edge.otherVertexAttr(me_id)), // scatter
-      numIter,
-      gatherDirection = EdgeDirection.Both, scatterDirection = EdgeDirection.Both
-    )
-  }
+
+    def sendMessage(id: Vid, edge: EdgeTriplet[Vid, ED]): Option[Vid] = {
+      val thisAttr = edge.vertexAttr(id)
+      val otherAttr = edge.otherVertexAttr(id)
+      if(thisAttr < otherAttr) { Some(thisAttr) }
+      else { None }
+    }
+
+    val initialMessage = Long.MaxValue
+    Pregel(ccGraph, initialMessage, EdgeDirection.Both)(
+      (id, attr, msg) => math.min(attr, msg),
+      sendMessage, 
+      (a,b) => math.min(a,b)
+      )
+  } // end of connectedComponents 
+
   
+
   def main(args: Array[String]) = {
     val host = args(0)
     val taskType = args(1)
@@ -238,7 +310,7 @@ object Analytics extends Logging {
            //val graph = GraphLoader.textFile(sc, fname, a => 1.0F)
            val graph = GraphLoader.textFile(sc, fname, a => 1.0F, 
             minEdgePartitions = numEPart, minVertexPartitions = numVPart).cache()
-           val cc = Analytics.connectedComponents(graph, numIter)
+           val cc = Analytics.connectedComponents(graph)
            //val cc = if(isDynamic) Analytics.dynamicConnectedComponents(graph, numIter)
            //         else Analytics.connectedComponents(graph, numIter)
            println("Components: " + cc.vertices.map{ case (vid,data) => data}.distinct())
diff --git a/graph/src/main/scala/org/apache/spark/graph/EdgeDirection.scala b/graph/src/main/scala/org/apache/spark/graph/EdgeDirection.scala
index 99af2d54580bf5b23b4e0b6996c424003f4f16aa..a1468a152baa9dacac7d2d0e919994533bf91002 100644
--- a/graph/src/main/scala/org/apache/spark/graph/EdgeDirection.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/EdgeDirection.scala
@@ -6,9 +6,13 @@ package org.apache.spark.graph
  * the set of adjacent neighbors when running a neighborhood query.
  */
 sealed abstract class EdgeDirection {
+  /**
+   * Reverse the direction of an edge.  An in becomes out, 
+   * out becomes in and both remains both.
+   */
   def reverse: EdgeDirection = this match {
-    case EdgeDirection.In   => EdgeDirection.In
-    case EdgeDirection.Out  => EdgeDirection.Out
+    case EdgeDirection.In   => EdgeDirection.Out
+    case EdgeDirection.Out  => EdgeDirection.In
     case EdgeDirection.Both => EdgeDirection.Both
   }
 }
diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala b/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala
index 2f2a624592de12e41fdc2fd4b82b91b8ec87f3fb..b8503ab7fdb6c266161f31b3308bf7fcd554c1f8 100644
--- a/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala
@@ -33,14 +33,14 @@ object GraphLab {
    * @tparam A The type accumulated during the gather phase
    * @return the resulting graph after the algorithm converges
    */
-  def iterate[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](graph: Graph[VD, ED])(
-    gatherFunc: (Vid, EdgeTriplet[VD, ED]) => A,
-    mergeFunc: (A, A) => A,
-    applyFunc: (Vid, VD, Option[A]) => VD,
-    scatterFunc: (Vid, EdgeTriplet[VD, ED]) => Boolean,
-    numIter: Int = Integer.MAX_VALUE,
-    gatherDirection: EdgeDirection = EdgeDirection.In,
-    scatterDirection: EdgeDirection = EdgeDirection.Out): Graph[VD, ED] = {
+  def apply[VD: ClassManifest, ED: ClassManifest, A: ClassManifest]
+    (graph: Graph[VD, ED], numIter: Int,  
+     gatherDirection: EdgeDirection = EdgeDirection.In,
+     scatterDirection: EdgeDirection = EdgeDirection.Out)
+    (gatherFunc: (Vid, EdgeTriplet[VD, ED]) => A,
+     mergeFunc: (A, A) => A,
+     applyFunc: (Vid, VD, Option[A]) => VD,
+     scatterFunc: (Vid, EdgeTriplet[VD, ED]) => Boolean): Graph[VD, ED] = {
 
 
     // Add an active attribute to all vertices to track convergence.
diff --git a/graph/src/main/scala/org/apache/spark/graph/Pregel.scala b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
index 94dc806fc28147e6d1cfe9956aabcfad6e4501f6..729eaa7eaef907ada1b288e2ce15f3154b4a8426 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
@@ -4,34 +4,87 @@ import org.apache.spark.rdd.RDD
 
 
 /**
- * This object implements the Pregel bulk-synchronous
- * message-passing API.
+ * This object implements a Pregel-like bulk-synchronous
+ * message-passing API.  However, unlike the original Pregel API
+ * the GraphX pregel API factors the sendMessage computation over
+ * edges, enables the message sending computation to read both
+ * vertex attributes, and finally constrains messages to the graph
+ * structure.  These changes allow for substantially more efficient
+ * distributed execution while also exposing greater flexibility
+ * for graph based computation.  
+ *
+ * This object present several variants of the bulk synchronous
+ * execution that differ only in the edge direction along which
+ * messages are sent and whether a fixed number of iterations 
+ * is used. 
+ *
+ * @example We can use the Pregel abstraction to implement PageRank
+ * {{{
+ * val pagerankGraph: Graph[Double, Double] = graph
+ *   // Associate the degree with each vertex
+ *   .outerJoinVertices(graph.outDegrees){
+ *     (vid, vdata, deg) => deg.getOrElse(0)
+ *   }
+ *   // Set the weight on the edges based on the degree
+ *   .mapTriplets( e => 1.0 / e.srcAttr )
+ *   // Set the vertex attributes to the initial pagerank values
+ *   .mapVertices( (id, attr) => 1.0 )
+ *
+ * def vertexProgram(id: Vid, attr: Double, msgSum: Double): Double =
+ *   resetProb + (1.0 - resetProb) * msgSum
+ * def sendMessage(id: Vid, edge: EdgeTriplet[Double, Double]): Option[Double] =
+ *   Some(edge.srcAttr * edge.attr)
+ * def messageCombiner(a: Double, b: Double): Double = a + b
+ * val initialMessage = 0.0 
+ * // Execute pregel for a fixed number of iterations.      
+ * Pregel(pagerankGraph, initialMessage, numIter)(
+ *   vertexProgram, sendMessage, messageCombiner)
+ * }}}
+ *
  */
 object Pregel {
 
 
-
-
   /**
-   * Execute the Pregel program.
+   * Execute a Pregel-like iterative vertex-parallel abstraction.  
+   * The user-defined vertex-program `vprog` is executed in parallel
+   * on each vertex receiving any inbound messages and computing a new
+   * value for the vertex.  The `sendMsg` function is then invoked on
+   * all out-edges and is used to compute an optional message to the 
+   * destination vertex. The `mergeMsg` function is a commutative 
+   * associative function used to combine messages destined to the
+   * same vertex. 
+   *
+   * On the first iteration all vertices receive the `initialMsg` and
+   * on subsequent iterations if a vertex does not receive a message then
+   * the vertex-program is not invoked. 
+   *
+   * This function iterates a fixed number (`numIter`) of iterations.
    *
    * @tparam VD the vertex data type
    * @tparam ED the edge data type
    * @tparam A the Pregel message type
    *
-   * @param vprog a user supplied function that acts as the vertex program for
-   *              the Pregel computation. It takes the vertex ID of the vertex it is running on,
-   *              the accompanying data for that vertex, and the incoming data and returns the
-   *              new vertex value.
-   * @param sendMsg a user supplied function that takes the current vertex ID and an EdgeTriplet
-   *                between the vertex and one of its neighbors and produces a message to send
-   *                to that neighbor.
-   * @param mergeMsg a user supplied function that takes two incoming messages of type A and merges
-   *                 them into a single message of type A. ''This function must be commutative and
-   *                 associative.''
-   * @param initialMsg the message each vertex will receive at the beginning of the
-   *                   first iteration.
-   * @param numIter the number of iterations to run this computation for.
+   * @param graph the input graph. 
+   *
+   * @param initialMsg the message each vertex will receive at the 
+   * on the first iteration.
+   *
+   * @param numIter the number of iterations to run this computation.
+   *
+   * @param vprog the user-defined vertex program which runs on each vertex
+   * and receives the inbound message and computes a new vertex value. 
+   * On the first iteration the vertex program is invoked on all vertices
+   * and is passed the default message.  On subsequent iterations the 
+   * vertex program is only invoked on those vertices that receive messages.
+   *
+   * @param sendMsg a user supplied function that is applied to out edges
+   * of vertices that received messages in the current iteration. 
+   *
+   * @param mergeMsg a user supplied function that takes two incoming messages 
+   * of type A and merges them into a single message of type A. 
+   * ''This function must be commutative and associative and ideally the 
+   * size of A should not increase.''
    *
    * @return the resulting graph at the end of the computation
    *
@@ -42,6 +95,64 @@ object Pregel {
       sendMsg: (Vid, EdgeTriplet[VD, ED]) => Option[A],
       mergeMsg: (A, A) => A)
     : Graph[VD, ED] = {
+    apply(graph, initialMsg, numIter, EdgeDirection.Out)(vprog, sendMsg, mergeMsg)
+  } // end of Apply
+    
+
+  /**
+   * Execute a Pregel-like iterative vertex-parallel abstraction.  
+   * The user-defined vertex-program `vprog` is executed in parallel
+   * on each vertex receiving any inbound messages and computing a new
+   * value for the vertex.  The `sendMsg` function is then invoked on
+   * all out-edges and is used to compute an optional message to the 
+   * destination vertex. The `mergeMsg` function is a commutative 
+   * associative function used to combine messages destined to the
+   * same vertex. 
+   *
+   * On the first iteration all vertices receive the `initialMsg` and
+   * on subsequent iterations if a vertex does not receive a message then
+   * the vertex-program is not invoked. 
+   *
+   * This function iterates a fixed number (`numIter`) of iterations.
+   *
+   * @tparam VD the vertex data type
+   * @tparam ED the edge data type
+   * @tparam A the Pregel message type
+   *
+   * @param graph the input graph. 
+   *
+   * @param initialMsg the message each vertex will receive at the 
+   * on the first iteration.
+   *
+   * @param numIter the number of iterations to run this computation.
+   * 
+   * @param sendDir the edge direction along which the `sendMsg` function
+   * is invoked.
+   *
+   * @param vprog the user-defined vertex program which runs on each vertex
+   * and receives the inbound message and computes a new vertex value. 
+   * On the first iteration the vertex program is invoked on all vertices
+   * and is passed the default message.  On subsequent iterations the 
+   * vertex program is only invoked on those vertices that receive messages.
+   *
+   * @param sendMsg a user supplied function that is applied to each edge
+   * in the direction `sendDir` adjacent to vertices that received messages
+   * in the current iteration. 
+   *
+   * @param mergeMsg a user supplied function that takes two incoming messages 
+   * of type A and merges them into a single message of type A. 
+   * ''This function must be commutative and associative and ideally the 
+   * size of A should not increase.''
+   *
+   * @return the resulting graph at the end of the computation
+   *
+   */
+  def apply[VD: ClassManifest, ED: ClassManifest, A: ClassManifest]
+    (graph: Graph[VD, ED], initialMsg: A, numIter: Int, sendDir: EdgeDirection)(
+      vprog: (Vid, VD, A) => VD,
+      sendMsg: (Vid, EdgeTriplet[VD, ED]) => Option[A],
+      mergeMsg: (A, A) => A)
+    : Graph[VD, ED] = {
 
     def mapF(vid: Vid, edge: EdgeTriplet[VD,ED]) = sendMsg(edge.otherVertexId(vid), edge)
 
@@ -51,7 +162,7 @@ object Pregel {
     var i = 0
     while (i < numIter) {
       // compute the messages
-      val messages = g.aggregateNeighbors(mapF, mergeMsg, EdgeDirection.In)
+      val messages = g.aggregateNeighbors(mapF, mergeMsg, sendDir.reverse)
       // receive the messages
       g = g.joinVertices(messages)(vprog)
       // count the iteration
@@ -63,25 +174,45 @@ object Pregel {
 
 
   /**
-   * Execute the Pregel program.
+   * Execute a Pregel-like iterative vertex-parallel abstraction.  
+   * The user-defined vertex-program `vprog` is executed in parallel
+   * on each vertex receiving any inbound messages and computing a new
+   * value for the vertex.  The `sendMsg` function is then invoked on
+   * all out-edges and is used to compute an optional message to the 
+   * destination vertex. The `mergeMsg` function is a commutative 
+   * associative function used to combine messages destined to the
+   * same vertex. 
+   *
+   * On the first iteration all vertices receive the `initialMsg` and
+   * on subsequent iterations if a vertex does not receive a message then
+   * the vertex-program is not invoked. 
+   *
+   * This function iterates until there are no remaining messages.
    *
    * @tparam VD the vertex data type
    * @tparam ED the edge data type
    * @tparam A the Pregel message type
    *
-   * @param vprog a user supplied function that acts as the vertex program for
-   *              the Pregel computation. It takes the vertex ID of the vertex it is running on,
-   *              the accompanying data for that vertex, and the incoming data and returns the
-   *              new vertex value.
-   * @param sendMsg a user supplied function that takes the current vertex ID and an EdgeTriplet
-   *                between the vertex and one of its neighbors and produces a message to send
-   *                to that neighbor.
-   * @param mergeMsg a user supplied function that takes two incoming messages of type A and merges
-   *                 them into a single message of type A. ''This function must be commutative and
-   *                 associative.''
-   * @param initialMsg the message each vertex will receive at the beginning of the
-   *                   first iteration.
-   * @param numIter the number of iterations to run this computation for.
+   * @param graph the input graph. 
+   *
+   * @param initialMsg the message each vertex will receive at the 
+   * on the first iteration.
+   *
+   * @param numIter the number of iterations to run this computation.
+   *
+   * @param vprog the user-defined vertex program which runs on each vertex
+   * and receives the inbound message and computes a new vertex value. 
+   * On the first iteration the vertex program is invoked on all vertices
+   * and is passed the default message.  On subsequent iterations the 
+   * vertex program is only invoked on those vertices that receive messages.
+   *
+   * @param sendMsg a user supplied function that is applied to out edges
+   * of vertices that received messages in the current iteration. 
+   *
+   * @param mergeMsg a user supplied function that takes two incoming messages 
+   * of type A and merges them into a single message of type A. 
+   * ''This function must be commutative and associative and ideally the 
+   * size of A should not increase.''
    *
    * @return the resulting graph at the end of the computation
    *
@@ -92,6 +223,64 @@ object Pregel {
       sendMsg: (Vid, EdgeTriplet[VD, ED]) => Option[A],
       mergeMsg: (A, A) => A)
     : Graph[VD, ED] = {
+    apply(graph, initialMsg, EdgeDirection.Out)(vprog, sendMsg, mergeMsg)
+  } // end of apply
+
+
+  /**
+   * Execute a Pregel-like iterative vertex-parallel abstraction.  
+   * The user-defined vertex-program `vprog` is executed in parallel
+   * on each vertex receiving any inbound messages and computing a new
+   * value for the vertex.  The `sendMsg` function is then invoked on
+   * all out-edges and is used to compute an optional message to the 
+   * destination vertex. The `mergeMsg` function is a commutative 
+   * associative function used to combine messages destined to the
+   * same vertex. 
+   *
+   * On the first iteration all vertices receive the `initialMsg` and
+   * on subsequent iterations if a vertex does not receive a message then
+   * the vertex-program is not invoked. 
+   *
+   * This function iterates until there are no remaining messages.
+   *
+   * @tparam VD the vertex data type
+   * @tparam ED the edge data type
+   * @tparam A the Pregel message type
+   *
+   * @param graph the input graph. 
+   *
+   * @param initialMsg the message each vertex will receive at the 
+   * on the first iteration.
+   *
+   * @param numIter the number of iterations to run this computation.
+   * 
+   * @param sendDir the edge direction along which the `sendMsg` function
+   * is invoked.
+   *
+   * @param vprog the user-defined vertex program which runs on each vertex
+   * and receives the inbound message and computes a new vertex value. 
+   * On the first iteration the vertex program is invoked on all vertices
+   * and is passed the default message.  On subsequent iterations the 
+   * vertex program is only invoked on those vertices that receive messages.
+   *
+   * @param sendMsg a user supplied function that is applied to each edge
+   * in the direction `sendDir` adjacent to vertices that received messages
+   * in the current iteration. 
+   *
+   * @param mergeMsg a user supplied function that takes two incoming messages 
+   * of type A and merges them into a single message of type A. 
+   * ''This function must be commutative and associative and ideally the 
+   * size of A should not increase.''
+   *
+   * @return the resulting graph at the end of the computation
+   *
+   */
+  def apply[VD: ClassManifest, ED: ClassManifest, A: ClassManifest]
+    (graph: Graph[VD, ED], initialMsg: A, sendDir: EdgeDirection)(
+      vprog: (Vid, VD, A) => VD,
+      sendMsg: (Vid, EdgeTriplet[VD, ED]) => Option[A],
+      mergeMsg: (A, A) => A)
+    : Graph[VD, ED] = {
 
     def vprogFun(id: Vid, attr: (VD, Boolean), msgOpt: Option[A]): (VD, Boolean) = {
       msgOpt match {
@@ -114,7 +303,7 @@ object Pregel {
 
     var g = graph.mapVertices( (vid, vdata) => (vprog(vid, vdata, initialMsg), true) ) 
     // compute the messages
-    var messages = g.aggregateNeighbors(sendMsgFun, mergeMsg, EdgeDirection.In).cache
+    var messages = g.aggregateNeighbors(sendMsgFun, mergeMsg, sendDir.reverse).cache
     var activeMessages = messages.count
     // Loop 
     var i = 0
@@ -123,7 +312,7 @@ object Pregel {
       g = g.outerJoinVertices(messages)(vprogFun)
       val oldMessages = messages
       // compute the messages
-      messages = g.aggregateNeighbors(sendMsgFun, mergeMsg, EdgeDirection.In).cache
+      messages = g.aggregateNeighbors(sendMsgFun, mergeMsg, sendDir.reverse).cache
       activeMessages = messages.count
       // after counting we can unpersist the old messages
       oldMessages.unpersist(blocking=false)
diff --git a/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala b/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala
index f4a8c6b4c9f4d81da8f11394cbe030cd1fdb17fc..8d0b2e0b02b75475fa2b3cbe434784417f742359 100644
--- a/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala
+++ b/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala
@@ -79,6 +79,7 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
   } // end of test Star PageRank
 
 
+
   test("Grid PageRank") {
     withSpark(new SparkContext("local", "test")) { sc =>
       val gridGraph = GraphGenerators.gridGraph(sc, 10, 10)
@@ -104,4 +105,68 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
   } // end of Grid PageRank
 
 
+  test("Grid Connected Components") {
+    withSpark(new SparkContext("local", "test")) { sc =>
+      val gridGraph = GraphGenerators.gridGraph(sc, 10, 10)
+      val ccGraph = Analytics.connectedComponents(gridGraph).cache()
+      val maxCCid = ccGraph.vertices.map { case (vid, ccId) => ccId }.sum
+      assert(maxCCid === 0)
+    }
+  } // end of Grid connected components
+
+
+  test("Reverse Grid Connected Components") {
+    withSpark(new SparkContext("local", "test")) { sc =>
+      val gridGraph = GraphGenerators.gridGraph(sc, 10, 10).reverse
+      val ccGraph = Analytics.connectedComponents(gridGraph).cache()
+      val maxCCid = ccGraph.vertices.map { case (vid, ccId) => ccId }.sum
+      assert(maxCCid === 0)
+    }
+  } // end of Grid connected components
+
+
+  test("Chain Connected Components") {
+    withSpark(new SparkContext("local", "test")) { sc =>
+      val chain1 = (0 until 9).map(x => (x, x+1) )
+      val chain2 = (10 until 20).map(x => (x, x+1) )
+      val rawEdges = sc.parallelize(chain1 ++ chain2, 3).map { case (s,d) => (s.toLong, d.toLong) }
+      val twoChains = Graph(rawEdges)
+      val ccGraph = Analytics.connectedComponents(twoChains).cache()
+      val vertices = ccGraph.vertices.collect
+      for ( (id, cc) <- vertices ) {
+        if(id < 10) { assert(cc === 0) }
+        else { assert(cc === 10) }
+      }
+      val ccMap = vertices.toMap
+      println(ccMap)
+      for( id <- 0 until 20 ) {
+        if(id < 10) { assert(ccMap(id) === 0) }
+        else { assert(ccMap(id) === 10) }
+      }
+    }
+  } // end of chain connected components
+
+  test("Reverse Chain Connected Components") {
+    withSpark(new SparkContext("local", "test")) { sc =>
+      val chain1 = (0 until 9).map(x => (x, x+1) )
+      val chain2 = (10 until 20).map(x => (x, x+1) )
+      val rawEdges = sc.parallelize(chain1 ++ chain2, 3).map { case (s,d) => (s.toLong, d.toLong) }
+      val twoChains = Graph(rawEdges).reverse
+      val ccGraph = Analytics.connectedComponents(twoChains).cache()
+      val vertices = ccGraph.vertices.collect
+      for ( (id, cc) <- vertices ) {
+        if(id < 10) { assert(cc === 0) }
+        else { assert(cc === 10) }
+      }
+      val ccMap = vertices.toMap
+      println(ccMap)
+      for( id <- 0 until 20 ) {
+        if(id < 10) { assert(ccMap(id) === 0) }
+        else { assert(ccMap(id) === 10) }
+      }
+    }
+  } // end of chain connected components
+
+
+
 } // end of AnalyticsSuite