diff --git a/graph/src/main/scala/org/apache/spark/graph/Graph.scala b/graph/src/main/scala/org/apache/spark/graph/Graph.scala index b3253934e6c8e400e68c11e3274ecd30684fc7bc..89e1b4ea01a81fa14a1a4d721853e871661b23f6 100644 --- a/graph/src/main/scala/org/apache/spark/graph/Graph.scala +++ b/graph/src/main/scala/org/apache/spark/graph/Graph.scala @@ -7,13 +7,19 @@ import org.apache.spark.util.ClosureCleaner /** - * The Graph abstractly represents a graph with arbitrary objects associated - * with vertices and edges. The graph provides basic operations to access and - * manipulate the data associated with vertices and edges as well as the - * underlying structure. Like Spark RDDs, the graph is a functional - * data-structure in which mutating operations return new graphs. + * The Graph abstractly represents a graph with arbitrary objects + * associated with vertices and edges. The graph provides basic + * operations to access and manipulate the data associated with + * vertices and edges as well as the underlying structure. Like Spark + * RDDs, the graph is a functional data-structure in which mutating + * operations return new graphs. + * + * @see GraphOps for additional graph member functions. * - * @note The majority of the graph operations are implemented in `GraphOps`. + * @note The majority of the graph operations are implemented in + * `GraphOps`. All the convenience operations are defined in the + * `GraphOps` class which may be shared across multiple graph + * implementations. * * @tparam VD the vertex attribute type * @tparam ED the edge attribute type @@ -32,28 +38,28 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] { val vertices: VertexSetRDD[VD] /** - * Get the Edges and their data as an RDD. The entries in the RDD contain - * just the source id and target id along with the edge data. + * Get the Edges and their data as an RDD. The entries in the RDD + * contain just the source id and target id along with the edge + * data. * * @return An RDD containing the edges in this graph * * @see Edge for the edge type. - * @see edgesWithVertices to get an RDD which contains all the edges along - * with their vertex data. + * @see edgesWithVertices to get an RDD which contains all the edges + * along with their vertex data. * - * @todo Should edges return 3 tuples instead of Edge objects? In this case - * we could rename EdgeTriplet to Edge? */ val edges: RDD[Edge[ED]] /** - * Get the edges with the vertex data associated with the adjacent pair of - * vertices. + * Get the edges with the vertex data associated with the adjacent + * pair of vertices. * * @return An RDD containing edge triplets. * - * @example This operation might be used to evaluate a graph coloring where - * we would like to check that both vertices are a different color. + * @example This operation might be used to evaluate a graph + * coloring where we would like to check that both vertices are a + * different color. * {{{ * type Color = Int * val graph: Graph[Color, Int] = Graph.textFile("hdfs://file.tsv") @@ -61,15 +67,16 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] { * .map(e => if(e.src.data == e.dst.data) 1 else 0).sum * }}} * - * @see edges() If only the edge data and adjacent vertex ids are required. + * @see edges() If only the edge data and adjacent vertex ids are + * required. * */ val triplets: RDD[EdgeTriplet[VD, ED]] /** - * Return a graph that is cached when first created. This is used to pin a - * graph in memory enabling multiple queries to reuse the same construction - * process. + * Return a graph that is cached when first created. This is used to + * pin a graph in memory enabling multiple queries to reuse the same + * construction process. * * @see RDD.cache() for a more detailed explanation of caching. */ @@ -84,19 +91,19 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] { /** - * Construct a new graph where each vertex value has been transformed by the - * map function. + * Construct a new graph where each vertex value has been + * transformed by the map function. * - * @note This graph is not changed and that the new graph has the same - * structure. As a consequence the underlying index structures can be - * reused. + * @note This graph is not changed and that the new graph has the + * same structure. As a consequence the underlying index structures + * can be reused. * * @param map the function from a vertex object to a new vertex value. * * @tparam VD2 the new vertex data type * - * @example We might use this operation to change the vertex values from one - * type to another to initialize an algorithm. + * @example We might use this operation to change the vertex values + * from one type to another to initialize an algorithm. * {{{ * val rawGraph: Graph[(), ()] = Graph.textFile("hdfs://file") * val root = 42 @@ -108,40 +115,42 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] { def mapVertices[VD2: ClassManifest](map: (Vid, VD) => VD2): Graph[VD2, ED] /** - * Construct a new graph where each the value of each edge is transformed by - * the map operation. This function is not passed the vertex value for the - * vertices adjacent to the edge. If vertex values are desired use the - * mapTriplets function. + * Construct a new graph where each the value of each edge is + * transformed by the map operation. This function is not passed + * the vertex value for the vertices adjacent to the edge. If + * vertex values are desired use the mapTriplets function. * - * @note This graph is not changed and that the new graph has the same - * structure. As a consequence the underlying index structures can be - * reused. + * @note This graph is not changed and that the new graph has the + * same structure. As a consequence the underlying index structures + * can be reused. * * @param map the function from an edge object to a new edge value. * * @tparam ED2 the new edge data type * - * @example This function might be used to initialize edge attributes. + * @example This function might be used to initialize edge + * attributes. * */ def mapEdges[ED2: ClassManifest](map: Edge[ED] => ED2): Graph[VD, ED2] /** - * Construct a new graph where each the value of each edge is transformed by - * the map operation. This function passes vertex values for the adjacent - * vertices to the map function. If adjacent vertex values are not required, - * consider using the mapEdges function instead. + * Construct a new graph where each the value of each edge is + * transformed by the map operation. This function passes vertex + * values for the adjacent vertices to the map function. If + * adjacent vertex values are not required, consider using the + * mapEdges function instead. * - * @note This graph is not changed and that the new graph has the same - * structure. As a consequence the underlying index structures can be - * reused. + * @note This graph is not changed and that the new graph has the + * same structure. As a consequence the underlying index structures + * can be reused. * * @param map the function from an edge object to a new edge value. * * @tparam ED2 the new edge data type * - * @example This function might be used to initialize edge attributes based - * on the attributes associated with each vertex. + * @example This function might be used to initialize edge + * attributes based on the attributes associated with each vertex. * {{{ * val rawGraph: Graph[Int, Int] = someLoadFunction() * val graph = rawGraph.mapTriplets[Int]( edge => @@ -154,30 +163,35 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] { /** - * Construct a new graph with all the edges reversed. If this graph contains - * an edge from a to b then the returned graph contains an edge from b to a. + * Construct a new graph with all the edges reversed. If this graph + * contains an edge from a to b then the returned graph contains an + * edge from b to a. * */ def reverse: Graph[VD, ED] /** - * This function takes a vertex and edge predicate and constructs the subgraph - * that consists of vertices and edges that satisfy the predict. The resulting - * graph contains the vertices and edges that satisfy: + * This function takes a vertex and edge predicate and constructs + * the subgraph that consists of vertices and edges that satisfy the + * predict. The resulting graph contains the vertices and edges + * that satisfy: * + * {{{ * V' = {v : for all v in V where vpred(v)} * E' = {(u,v): for all (u,v) in E where epred((u,v)) && vpred(u) && vpred(v)} + * }}} * - * @param epred the edge predicate which takes a triplet and evaluates to true - * if the edge is to remain in the subgraph. Note that only edges in which both - * vertices satisfy the vertex predicate are considered. + * @param epred the edge predicate which takes a triplet and + * evaluates to true if the edge is to remain in the subgraph. Note + * that only edges in which both vertices satisfy the vertex + * predicate are considered. * - * @param vpred the vertex predicate which takes a vertex object and evaluates - * to true if the vertex is to be included in the subgraph + * @param vpred the vertex predicate which takes a vertex object and + * evaluates to true if the vertex is to be included in the subgraph * - * @return the subgraph containing only the vertices and edges that satisfy the - * predicates. + * @return the subgraph containing only the vertices and edges that + * satisfy the predicates. */ def subgraph(epred: EdgeTriplet[VD,ED] => Boolean = (x => true), vpred: (Vid, VD) => Boolean = ((v,d) => true) ): Graph[VD, ED] @@ -187,16 +201,17 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] { /** * groupEdgeTriplets is used to merge multiple edges that have the * same source and destination vertex into a single edge. The user - * supplied function is applied to each directed pair of vertices (u, v) and - * has access to all EdgeTriplets + * supplied function is applied to each directed pair of vertices + * (u, v) and has access to all EdgeTriplets * * {e: for all e in E where e.src = u and e.dst = v} * - * This function is identical to [[org.apache.spark.graph.Graph.groupEdges]] - * except that this function - * provides the user-supplied function with an iterator over EdgeTriplets, - * which contain the vertex data, whereas groupEdges provides the user-supplied - * function with an iterator over Edges, which only contain the vertex IDs. + * This function is identical to + * [[org.apache.spark.graph.Graph.groupEdges]] except that this + * function provides the user-supplied function with an iterator + * over EdgeTriplets, which contain the vertex data, whereas + * groupEdges provides the user-supplied function with an iterator + * over Edges, which only contain the vertex IDs. * * @tparam ED2 the type of the resulting edge data after grouping * @@ -211,35 +226,38 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] { /** - * This function merges multiple edges between two vertices into a single - * Edge. See [[org.apache.spark.graph.Graph.groupEdgeTriplets]] for more detail. + * This function merges multiple edges between two vertices into a + * single Edge. See + * [[org.apache.spark.graph.Graph.groupEdgeTriplets]] for more + * detail. * * @tparam ED2 the type of the resulting edge data after grouping. * * @param f the user supplied function to merge multiple Edges * into a single ED2 object. * - * @return Graph[VD,ED2] The resulting graph with a single Edge for each - * source, dest vertex pair. + * @return Graph[VD,ED2] The resulting graph with a single Edge for + * each source, dest vertex pair. */ def groupEdges[ED2: ClassManifest](f: Iterator[Edge[ED]] => ED2 ): Graph[VD,ED2] /** - * The mapReduceTriplets function is used to compute statistics about - * the neighboring edges and vertices of each vertex. The user supplied - * `mapFunc` function is invoked on each edge of the graph generating 0 or - * more "messages" to be "sent" to either vertex in the edge. - * The `reduceFunc` is then used to combine the output of the map phase - * destined to each vertex. + * The mapReduceTriplets function is used to compute statistics + * about the neighboring edges and vertices of each vertex. The + * user supplied `mapFunc` function is invoked on each edge of the + * graph generating 0 or more "messages" to be "sent" to either + * vertex in the edge. The `reduceFunc` is then used to combine the + * output of the map phase destined to each vertex. * * @tparam A the type of "message" to be sent to each vertex * - * @param mapFunc the user defined map function which returns 0 or + * @param mapFunc the user defined map function which returns 0 or * more messages to neighboring vertices. - * @param reduceFunc the user defined reduce function which should be - * commutative and assosciative and is used to combine the output of - * the map phase. + * + * @param reduceFunc the user defined reduce function which should + * be commutative and assosciative and is used to combine the output + * of the map phase. * * @example We can use this function to compute the inDegree of each * vertex @@ -249,10 +267,11 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] { * mapReduceTriplets[Int](et => Array((et.dst.id, 1)), _ + _) * }}} * - * @note By expressing computation at the edge level we achieve maximum - * parallelism. This is one of the core functions in the Graph API in that enables - * neighborhood level computation. For example this function can be used to - * count neighbors satisfying a predicate or implement PageRank. + * @note By expressing computation at the edge level we achieve + * maximum parallelism. This is one of the core functions in the + * Graph API in that enables neighborhood level computation. For + * example this function can be used to count neighbors satisfying a + * predicate or implement PageRank. * */ def mapReduceTriplets[A: ClassManifest]( @@ -262,23 +281,25 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] { /** - * Join the vertices with an RDD and then apply a function from the the - * vertex and RDD entry to a new vertex value and type. - * The input table should contain at most one entry for each vertex. - * If no entry is provided the map function is invoked passing none. + * Join the vertices with an RDD and then apply a function from the + * the vertex and RDD entry to a new vertex value and type. The + * input table should contain at most one entry for each vertex. If + * no entry is provided the map function is invoked passing none. * * @tparam U the type of entry in the table of updates * @tparam VD2 the new vertex value type * - * @param table the table to join with the vertices in the graph. The table - * should contain at most one entry for each vertex. - * @param mapFunc the function used to compute the new vertex values. The - * map function is invoked for all vertices, even those that do not have a - * corresponding entry in the table. + * @param table the table to join with the vertices in the graph. + * The table should contain at most one entry for each vertex. + * + * @param mapFunc the function used to compute the new vertex + * values. The map function is invoked for all vertices, even those + * that do not have a corresponding entry in the table. * - * @example This function is used to update the vertices with new values - * based on external data. For example we could add the out degree to each - * vertex record + * @example This function is used to update the vertices with new + * values based on external data. For example we could add the out + * degree to each vertex record + * * {{{ * val rawGraph: Graph[(),()] = Graph.textFile("webgraph") * val outDeg: RDD[(Vid, Int)] = rawGraph.outDegrees() @@ -296,29 +317,50 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] { // Save a copy of the GraphOps object so there is always one unique GraphOps object // for a given Graph object, and thus the lazy vals in GraphOps would work as intended. val ops = new GraphOps(this) -} +} // end of Graph + + /** - * The Graph Singleton contains basic routines to create graphs + * The Graph object contains a collection of routines used to + * construct graphs from RDDs. + * */ object Graph { import org.apache.spark.graph.impl._ import org.apache.spark.SparkContext._ + /** + * Construct a graph from a collection of edges encoded as vertex id + * pairs. Duplicate directed edges are merged to a single edge with + * weight equal to the number of duplicate edges. The returned + * vertex attribute is the number of edges adjacent to that vertex + * (i.e., the undirected degree). + * + * @param rawEdges the RDD containing the set of edges in the graph + * + * @return a graph with edge attributes containing the count of + * duplicate edges and vertex attributes containing the total degree + * of each vertex. + */ def apply(rawEdges: RDD[(Vid, Vid)]): Graph[Int, Int] = { Graph(rawEdges, true) } /** - * Construct a graph from a list of Edges. + * Construct a graph from a collection of edges encoded as vertex id + * pairs. * * @param rawEdges a collection of edges in (src,dst) form. - * @param uniqueEdges if multiple identical edges are found they are combined - * and the edge attribute is set to the sum. Otherwise duplicate edges are - * treated as separate. + * @param uniqueEdges if multiple identical edges are found they are + * combined and the edge attribute is set to the sum. Otherwise + * duplicate edges are treated as separate. + * + * @return a graph with edge attributes containing either the count + * of duplicate edges or 1 (if `uniqueEdges=false`) and vertex + * attributes containing the total degree of each vertex. * - * */ def apply(rawEdges: RDD[(Vid, Vid)], uniqueEdges: Boolean): Graph[Int, Int] = { // Reduce to unique edges. @@ -338,6 +380,20 @@ object Graph { } + /** + * Construct a graph from a collection attributed vertices and + * edges. + * + * @note Duplicate vertices are removed arbitrarily and missing + * vertices (vertices in the edge collection that are not in the + * vertex collection) are replaced by null vertex attributes. + * + * @tparam VD the vertex attribute type + * @tparam ED the edge attribute type + * @param vertices the "set" of vertices and their attributes + * @param edges the collection of edges in the graph + * + */ def apply[VD: ClassManifest, ED: ClassManifest]( vertices: RDD[(Vid,VD)], edges: RDD[Edge[ED]]): Graph[VD, ED] = { @@ -346,9 +402,21 @@ object Graph { } - /** - * Construct a new graph from a set of edges and vertices + * Construct a graph from a collection attributed vertices and + * edges. Duplicate vertices are combined using the `mergeFunc` and + * vertices found in the edge collection but not in the input + * vertices are the default attribute `defautVertexAttr`. + * + * @tparam VD the vertex attribute type + * @tparam ED the edge attribute type + * @param vertices the "set" of vertices and their attributes + * @param edges the collection of edges in the graph + * @param defaultVertexAttr the default vertex attribute to use for + * vertices that are mentioned in `edges` but not in `vertices + * @param mergeFunc the function used to merge duplicate vertices + * in the `vertices` collection. + * */ def apply[VD: ClassManifest, ED: ClassManifest]( vertices: RDD[(Vid,VD)], @@ -358,5 +426,14 @@ object Graph { GraphImpl(vertices, edges, defaultVertexAttr, mergeFunc) } + /** + * The implicit graphToGraphOPs function extracts the GraphOps + * member from a graph. + * + * To improve modularity the Graph type only contains a small set of + * basic operations. All the convenience operations are defined in + * the GraphOps class which may be shared across multiple graph + * implementations. + */ implicit def graphToGraphOps[VD: ClassManifest, ED: ClassManifest](g: Graph[VD, ED]) = g.ops } // end of Graph object