diff --git a/graph/src/main/scala/org/apache/spark/graph/Graph.scala b/graph/src/main/scala/org/apache/spark/graph/Graph.scala
index 50a44e51e5d2d44e830305c9c63593043d73ca95..342151173a63674a3ae294ac9768f0e1bc3d207d 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Graph.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Graph.scala
@@ -184,13 +184,42 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
 
 
   /**
-   * @todo document function
+   * groupEdgeTriplets is used to merge multiple edges that have the
+   * same source and destination vertex into a single edge. The user
+   * supplied function is applied to each directed pair of vertices (u, v) and
+   * has access to all EdgeTriplets
+   *
+   * {e: for all e in E where e.src = u and e.dst = v}
+   *
+   * This function is identical to [[org.apache.spark.graph.Graph.groupEdges]]
+   * except that this function
+   * provides the user-supplied function with an iterator over EdgeTriplets,
+   * which contain the vertex data, whereas groupEdges provides the user-supplied
+   * function with an iterator over Edges, which only contain the vertex IDs.
+   *
+   * @tparam ED2 the type of the resulting edge data after grouping
+   *
+   * @param f the user supplied function to merge multiple EdgeTriplets
+   * into a single ED2 object
+   *
+   * @return Graph[VD,ED2] The resulting graph with a single Edge for each
+   * source, dest vertex pair.
+   *
    */
   def groupEdgeTriplets[ED2: ClassManifest](f: Iterator[EdgeTriplet[VD,ED]] => ED2 ): Graph[VD,ED2]
 
   /**
-   * @todo document function
+   * This function merges multiple edges between two vertices into a single
+   * Edge. See [[org.apache.spark.graph.Graph.groupEdgeTriplets]] for more detail.
+   *
+   * @tparam ED2 the type of the resulting edge data after grouping.
+   *
+   * @param f the user supplied function to merge multiple Edges
+   * into a single ED2 object.
+   *
+   * @return Graph[VD,ED2] The resulting graph with a single Edge for each
+   * source, dest vertex pair.
    */
   def groupEdges[ED2: ClassManifest](f: Iterator[Edge[ED]] => ED2 ): Graph[VD,ED2]
 
 
diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala b/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala
index 8ba708ba3290b92ec1b0fcac46e2ff8265b29660..2f2a624592de12e41fdc2fd4b82b91b8ec87f3fb 100644
--- a/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala
@@ -4,7 +4,7 @@ import scala.collection.JavaConversions._
 import org.apache.spark.rdd.RDD
 
 /**
- * This object implement the graphlab gather-apply-scatter api.
+ * This object implements the GraphLab gather-apply-scatter api.
  */
 object GraphLab {
 
diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala b/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala
index 052f9acdeb6b3f8fed4406c794b955b331663060..76f69edf0e22fb60a75ea79924b4086de195dda2 100644
--- a/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala
@@ -9,7 +9,18 @@ import org.apache.spark.graph.impl.GraphImpl
 object GraphLoader {
 
   /**
-   * Load an edge list from file initializing the Graph RDD
+   * Load an edge list from a file, initializing the Graph
+   *
+   * @tparam ED the type of the edge data of the resulting Graph
+   *
+   * @param sc the SparkContext used to construct RDDs
+   * @param path the path to the text file containing the edge list
+   * @param edgeParser a function that takes an array of strings and
+   * returns an ED object
+   * @param minEdgePartitions the number of partitions for
+   * the Edge RDD
+   *
+   * @todo remove minVertexPartitions arg
    */
   def textFile[ED: ClassManifest](
       sc: SparkContext,
@@ -38,14 +49,10 @@ object GraphLoader {
       }.cache()
 
     val graph = fromEdges(edges)
-    // println("Loaded graph:" +
-    //   "\n\t#edges: " + graph.numEdges +
-    //   "\n\t#vertices: " + graph.numVertices)
-
     graph
   }
 
-  def fromEdges[ED: ClassManifest](edges: RDD[Edge[ED]]): GraphImpl[Int, ED] = {
+  private def fromEdges[ED: ClassManifest](edges: RDD[Edge[ED]]): GraphImpl[Int, ED] = {
     val vertices = edges.flatMap { edge => List((edge.srcId, 1), (edge.dstId, 1)) }
       .reduceByKey(_ + _)
       .map{ case (vid, degree) => (vid, degree) }
diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala b/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala
index 92198a4995c8e26cf442f98c5c77df12ba717371..5e8f082fdad8aef9b9661f5d1f5ead42f1d1e0a2 100644
--- a/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala
@@ -94,11 +94,6 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
 
   }  // end of aggregateNeighbors
 
-
-
-
-
-
   def collectNeighborIds(edgeDirection: EdgeDirection) : RDD[(Vid, Array[Vid])] = {
     val nbrs = graph.aggregateNeighbors[Array[Vid]](
       (vid, edge) => Some(Array(edge.otherVertexId(vid))),
diff --git a/graph/src/main/scala/org/apache/spark/graph/Pregel.scala b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
index 065d196ff651d06c8de68f94d1d3d5c8cc0b184e..7ad6fda2a4570e4ca43378ae4db626660578babd 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
@@ -3,8 +3,37 @@ package org.apache.spark.graph
 import org.apache.spark.rdd.RDD
 
 
+/**
+ * This object implements the Pregel bulk-synchronous
+ * message-passing API.
+ */
 object Pregel {
 
+
+  /**
+   * Execute the Pregel program.
+   *
+   * @tparam VD the vertex data type
+   * @tparam ED the edge data type
+   * @tparam A the Pregel message type
+   *
+   * @param vprog a user supplied function that acts as the vertex program for
+   * the Pregel computation. It takes the vertex ID of the vertex it is running on,
+   * the accompanying data for that vertex, and the incoming data and returns the
+   * new vertex value.
+   * @param sendMsg a user supplied function that takes the current vertex ID and an EdgeTriplet
+   * between the vertex and one of its neighbors and produces a message to send
+   * to that neighbor.
+   * @param mergeMsg a user supplied function that takes two incoming messages of type A and merges
+   * them into a single message of type A. ''This function must be commutative and
+   * associative.''
+   * @param initialMsg the message each vertex will receive at the beginning of the
+   * first iteration.
+   * @param numIter the number of iterations to run this computation for.
+   *
+   * @return the resulting graph at the end of the computation
+   *
+   */
   def iterate[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](graph: Graph[VD, ED])(
     vprog: (Vid, VD, A) => VD,
     sendMsg: (Vid, EdgeTriplet[VD, ED]) => Option[A],
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
index 413177b2dae63e69896390bd6ac3f5699347c944..a6604b978f9962b99431598bea9add18171a4c0e 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
@@ -261,71 +261,27 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
 
   }
 
-  // Because of the edgepartitioner, we know that all edges with the same src and dst
-  // will be in the same partition
-
-  // We will want to keep the same partitioning scheme. Use newGraph() rather than
-  // new GraphImpl()
-  // TODO(crankshaw) is there a better way to do this using RDD.groupBy()
-  // functions?
-
   override def groupEdgeTriplets[ED2: ClassManifest](
       f: Iterator[EdgeTriplet[VD,ED]] => ED2 ): Graph[VD,ED2] = {
-    //override def groupEdges[ED2: ClassManifest](f: Iterator[Edge[ED]] => ED2 ):
-
-    // I think that
-    // myRDD.mapPartitions { part =>
-    //  val (vmap, edges) = part.next()
-    // gives me access to the vertex map and the set of
-    // edges within that partition
-
-    // This is what happens during mapPartitions
-    // The iterator iterates over all partitions
-    // val result: RDD[U] = new RDD[T]().mapPartitions(f: Iterator[T] => Iterator[U])
-
-    // TODO(crankshaw) figure out how to actually get the new Edge RDD and what
-    // type that should have
     val newEdges: RDD[Edge[ED2]] = triplets.mapPartitions { partIter =>
-      // toList lets us operate on all EdgeTriplets in a single partition at once
       partIter
+      // TODO(crankshaw) toList requires that the entire edge partition
+      // can fit in memory right now.
       .toList
       // groups all ETs in this partition that have the same src and dst
       // Because all ETs with the same src and dst will live on the same
       // partition due to the EdgePartitioner, this guarantees that these
       // ET groups will be complete.
       .groupBy { t: EdgeTriplet[VD, ED] => (t.srcId, t.dstId) }
-      //.groupBy { e => (e.src, e.dst) }
-      // Apply the user supplied supplied edge group function to
-      // each group of edges
-      // The result of this line is Map[(Long, Long, ED2]
       .mapValues { ts: List[EdgeTriplet[VD, ED]] => f(ts.toIterator) }
-      // convert the resulting map back to a list of tuples
       .toList
-      // TODO(crankshaw) needs an iterator over the tuples?
-      // Why can't I map over the list?
       .toIterator
-      // map over those tuples that contain src and dst info plus the
-      // new edge data to make my new edges
       .map { case ((src, dst), data) => Edge(src, dst, data) }
-
-      // How do I convert from a scala map to a list?
-      // I want to be able to apply a function like:
-      // f: (key, value): (K, V) => result: [R]
-      // so that I can transfrom a Map[K, V] to List[R]
-
-      // Maybe look at collections.breakOut
-      // see http://stackoverflow.com/questions/1715681/scala-2-8-breakout
-      // and http://stackoverflow.com/questions/6998676/converting-a-scala-map-to-a-list
-
     }
-    // @todo eliminate the need to call createETable
+    //TODO(crankshaw) eliminate the need to call createETable
     val newETable = createETable(newEdges, eTable.index.partitioner.numPartitions)
-
-
     new GraphImpl(vTable, vid2pid, localVidMap, newETable)
-
   }
 
@@ -340,7 +296,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
       .toIterator
       .map { case ((src, dst), data) => Edge(src, dst, data) }
     }
-    // @todo eliminate the need to call createETable
+    // TODO(crankshaw) eliminate the need to call createETable
     val newETable = createETable(newEdges, eTable.index.partitioner.numPartitions)
 
 
@@ -654,7 +610,8 @@ object GraphImpl {
 
 
   /**
-   * @todo(crankshaw) how does this effect load balancing?
+   * @todo This will only partition edges to the upper diagonal
+   * of the 2D processor space.
    */
   protected def canonicalEdgePartitionFunction2D(srcOrig: Vid, dstOrig: Vid,
     numParts: Pid, ceilSqrtNumParts: Pid): Pid = {
diff --git a/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala b/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala
index d75a678b269492339744345479f721b5efa069a2..061cce99b6e3d70f9b5e13668f102c60a095d4ed 100644
--- a/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala
@@ -16,9 +16,9 @@ import org.apache.spark.graph.Graph
 import org.apache.spark.graph.Edge
 import org.apache.spark.graph.impl.GraphImpl
 
-
-// TODO(crankshaw) I might want to pull at least RMAT out into a separate class.
-// Might simplify the code to have classwide variables and such.
+/**
+ * @todo(crankshaw) cleanup and modularize code
+ */
 object GraphGenerators {
 
   val RMATa = 0.45
@@ -236,8 +236,6 @@ object GraphGenerators {
     }
   }
 
 
 
-
-
 }
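Usage sketch (illustrative only, not part of the patch above): the snippet below shows how the APIs documented in this diff fit together. The object name, the file path, the constant edge attribute, and the toy vertex program are made-up placeholders; Pregel.iterate's argument order is taken from the @param order in the new scaladoc (vprog, sendMsg, mergeMsg, initialMsg, numIter), and GraphLoader.textFile is assumed to take the edge parser as its third argument with the partition counts left at their defaults.

import org.apache.spark.SparkContext
import org.apache.spark.graph._

object GroupAndPregelExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "graph-doc-example")

    // Parse a (placeholder) whitespace-delimited edge list; every edge gets the attribute 1.
    val graph = GraphLoader.textFile[Int](sc, "path/to/edges.tsv", fields => 1)

    // Collapse parallel edges between the same (src, dst) pair into a single edge whose
    // attribute is the number of duplicates that were merged.
    val deduped: Graph[Int, Int] = graph.groupEdges(iter => iter.size)

    // A deliberately trivial Pregel program: in every superstep each vertex receives a 1
    // along each incident edge, the messages are summed, and the sum is folded into the
    // vertex value, so values grow by roughly numIter * degree.
    val result = Pregel.iterate[Int, Int, Int](deduped)(
      (vid, attr, msg) => attr + msg,   // vprog: fold the merged message into the vertex value
      (vid, et) => Some(1),             // sendMsg: constant message along each edge triplet
      (a, b) => a + b,                  // mergeMsg: commutative and associative sum
      0,                                // initialMsg delivered to every vertex in the first iteration
      5)                                // numIter

    println("vertices in result: " + result.numVertices)
    sc.stop()
  }
}

Using iter.size in groupEdges keeps the sketch independent of the Edge attribute field name, and the constant sendMsg avoids assumptions about EdgeTriplet's vertex-data accessors, so the example leans only on the signatures visible in the diff itself.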