diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala index cacfcb1c90f49a85238a96aa8d8912328729e473..69f27601ce5ec8d338df340dbd6ec326b175f36c 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala @@ -8,57 +8,47 @@ import org.apache.spark.SparkException /** - * `GraphOps` contains additional functionality (syntatic sugar) for - * the graph type and is implicitly constructed for each Graph object. - * All operations in `GraphOps` are expressed in terms of the - * efficient GraphX API. + * Contains additional functionality for [[Graph]]. All operations are expressed in terms of the + * efficient GraphX API. This class is implicitly constructed for each Graph object. * * @tparam VD the vertex attribute type * @tparam ED the edge attribute type - * */ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) { - /** - * Compute the number of edges in the graph. - */ + /** The number of edges in the graph. */ lazy val numEdges: Long = graph.edges.count() - /** - * Compute the number of vertices in the graph. - */ + /** The number of vertices in the graph. */ lazy val numVertices: Long = graph.vertices.count() /** - * Compute the in-degree of each vertex in the Graph returning an - * RDD. - * @note Vertices with no in edges are not returned in the resulting RDD. + * The in-degree of each vertex in the graph. + * @note Vertices with no in-edges are not returned in the resulting RDD. */ lazy val inDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.In) /** - * Compute the out-degree of each vertex in the Graph returning an RDD. - * @note Vertices with no out edges are not returned in the resulting RDD. + * The out-degree of each vertex in the graph. + * @note Vertices with no out-edges are not returned in the resulting RDD. */ lazy val outDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Out) /** - * Compute the degrees of each vertex in the Graph returning an RDD. - * @note Vertices with no edges are not returned in the resulting - * RDD. + * The degree of each vertex in the graph. + * @note Vertices with no edges are not returned in the resulting RDD. */ lazy val degrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Both) /** - * Compute the neighboring vertex degrees. + * Computes the neighboring vertex degrees. * - * @param edgeDirection the direction along which to collect - * neighboring vertex attributes. + * @param edgeDirection the direction along which to collect neighboring vertex attributes */ private def degreesRDD(edgeDirection: EdgeDirection): VertexRDD[Int] = { if (edgeDirection == EdgeDirection.In) { @@ -70,32 +60,20 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) { } } - /** - * This function is used to compute a statistic for the neighborhood - * of each vertex and returns a value for all vertices (including - * those without neighbors). - * - * @note Because the a default value is provided all vertices will - * have a corresponding entry in the returned RDD. + * Computes a statistic for the neighborhood of each vertex. * - * @param mapFunc the function applied to each edge adjacent to each - * vertex. The mapFunc can optionally return None in which case it - * does not contribute to the final sum. - * @param reduceFunc the function used to merge the results of each - * map operation. - * @param default the default value to use for each vertex if it has - * no neighbors or the map function repeatedly evaluates to none - * @param direction the direction of edges to consider (e.g., In, - * Out, Both). - * @tparam VD2 The returned type of the aggregation operation. + * @param mapFunc the function applied to each edge adjacent to each vertex. The mapFunc can + * optionally return `None`, in which case it does not contribute to the final sum. + * @param reduceFunc the function used to merge the results of each map operation + * @param direction the direction of edges to consider (e.g., In, Out, Both). + * @tparam A the aggregation type * - * @return A Spark.RDD containing tuples of vertex identifiers and - * their resulting value. There will be exactly one entry for ever - * vertex in the original graph. + * @return an RDD containing tuples of vertex identifiers and + * their resulting value. Vertices with no neighbors will not appear in the RDD. * * @example We can use this function to compute the average follower - * age for each user + * age for each user: * * {{{ * val graph: Graph[Int,Int] = loadGraph() @@ -107,16 +85,12 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) { * EdgeDirection.In) * .mapValues{ case (sum,followers) => sum.toDouble / followers} * }}} - * - * @todo Should this return a graph with the new vertex values? - * */ def aggregateNeighbors[A: ClassTag]( mapFunc: (VertexID, EdgeTriplet[VD, ED]) => Option[A], reduceFunc: (A, A) => A, dir: EdgeDirection) : VertexRDD[A] = { - // Define a new map function over edge triplets val mf = (et: EdgeTriplet[VD,ED]) => { // Compute the message to the dst vertex @@ -143,15 +117,14 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) { /** - * Return the Ids of the neighboring vertices. + * Collect the neighbor vertex ids for each vertex. * * @param edgeDirection the direction along which to collect * neighboring vertices * - * @return the vertex set of neighboring ids for each vertex. + * @return the set of neighboring ids for each vertex */ - def collectNeighborIds(edgeDirection: EdgeDirection) : - VertexRDD[Array[VertexID]] = { + def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexID]] = { val nbrs = if (edgeDirection == EdgeDirection.Both) { graph.mapReduceTriplets[Array[VertexID]]( @@ -185,8 +158,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) { * @param edgeDirection the direction along which to collect * neighboring vertices * - * @return the vertex set of neighboring vertex attributes for each - * vertex. + * @return the vertex set of neighboring vertex attributes for each vertex */ def collectNeighbors(edgeDirection: EdgeDirection) : VertexRDD[ Array[(VertexID, VD)] ] = {