Skip to content
Snippets Groups Projects
Commit 77626d15 authored by Joseph E. Gonzalez's avatar Joseph E. Gonzalez
Browse files

Adding collect neighbors and documenting GraphOps.

parent d6a902f3
No related branches found
No related tags found
No related merge requests found
...@@ -5,21 +5,62 @@ import org.apache.spark.SparkContext._ ...@@ -5,21 +5,62 @@ import org.apache.spark.SparkContext._
import org.apache.spark.util.ClosureCleaner import org.apache.spark.util.ClosureCleaner
class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
/**
* `GraphOps` contains additional functionality (syntatic sugar) for the graph
* type and is implicitly constructed for each Graph object. All operations in
* `GraphOps` are expressed in terms of the efficient GraphX API.
*
* @tparam VD the vertex attribute type
* @tparam ED the edge attribute type
*
*/
class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
/**
* Compute the number of edges in the graph.
*/
lazy val numEdges: Long = graph.edges.count() lazy val numEdges: Long = graph.edges.count()
/**
* Compute the number of vertices in the graph.
*/
lazy val numVertices: Long = graph.vertices.count() lazy val numVertices: Long = graph.vertices.count()
/**
* Compute the in-degree of each vertex in the Graph returning an RDD.
* @note Vertices with no in edges are not returned in the resulting RDD.
*/
lazy val inDegrees: VertexSetRDD[Int] = degreesRDD(EdgeDirection.In) lazy val inDegrees: VertexSetRDD[Int] = degreesRDD(EdgeDirection.In)
/**
* Compute the out-degree of each vertex in the Graph returning an RDD.
* @note Vertices with no out edges are not returned in the resulting RDD.
*/
lazy val outDegrees: VertexSetRDD[Int] = degreesRDD(EdgeDirection.Out) lazy val outDegrees: VertexSetRDD[Int] = degreesRDD(EdgeDirection.Out)
/**
* Compute the degrees of each vertex in the Graph returning an RDD.
* @note Vertices with no edges are not returned in the resulting RDD.
*/
lazy val degrees: VertexSetRDD[Int] = degreesRDD(EdgeDirection.Both) lazy val degrees: VertexSetRDD[Int] = degreesRDD(EdgeDirection.Both)
/**
* Compute the neighboring vertex degrees.
*
* @param edgeDirection the direction along which to collect neighboring
* vertex attributes.
*/
private def degreesRDD(edgeDirection: EdgeDirection): VertexSetRDD[Int] = {
graph.aggregateNeighbors((vid, edge) => Some(1), _+_, edgeDirection)
}
/** /**
* This function is used to compute a statistic for the neighborhood of each * This function is used to compute a statistic for the neighborhood of each
* vertex and returns a value for all vertices (including those without * vertex and returns a value for all vertices (including those without
...@@ -94,7 +135,16 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) { ...@@ -94,7 +135,16 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
} // end of aggregateNeighbors } // end of aggregateNeighbors
def collectNeighborIds(edgeDirection: EdgeDirection) : VertexSetRDD[Array[Vid]] = { /**
* Return the Ids of the neighboring vertices.
*
* @param edgeDirection the direction along which to collect
* neighboring vertices
*
* @return the vertex set of neighboring ids for each vertex.
*/
def collectNeighborIds(edgeDirection: EdgeDirection) :
VertexSetRDD[Array[Vid]] = {
val nbrs = graph.aggregateNeighbors[Array[Vid]]( val nbrs = graph.aggregateNeighbors[Array[Vid]](
(vid, edge) => Some(Array(edge.otherVertexId(vid))), (vid, edge) => Some(Array(edge.otherVertexId(vid))),
(a, b) => a ++ b, (a, b) => a ++ b,
...@@ -104,12 +154,35 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) { ...@@ -104,12 +154,35 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
case (_, Some(nbrs)) => nbrs case (_, Some(nbrs)) => nbrs
case (_, None) => Array.empty[Vid] case (_, None) => Array.empty[Vid]
} }
} } // end of collectNeighborIds
private def degreesRDD(edgeDirection: EdgeDirection): VertexSetRDD[Int] = { /**
graph.aggregateNeighbors((vid, edge) => Some(1), _+_, edgeDirection) * Collect the neighbor vertex attributes for each vertex.
} *
* @note This function could be highly inefficient on power-law
* graphs where high degree vertices may force a large ammount of
* information to be collected to a single location.
*
* @param edgeDirection the direction along which to collect
* neighboring vertices
*
* @return the vertex set of neighboring vertex attributes
* for each vertex.
*/
def collectNeighbors(edgeDirection: EdgeDirection) :
VertexSetRDD[ Array[(Vid, VD)] ] = {
val nbrs = graph.aggregateNeighbors[Array[(Vid,VD)]](
(vid, edge) =>
Some(Array( (edge.otherVertexId(vid), edge.otherVertexAttr(vid)) )),
(a, b) => a ++ b,
edgeDirection)
graph.vertices.leftZipJoin(nbrs).mapValues{
case (_, Some(nbrs)) => nbrs
case (_, None) => Array.empty[(Vid, VD)]
}
} // end of collectNeighbor
/** /**
...@@ -139,11 +212,6 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) { ...@@ -139,11 +212,6 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
* (v, deg) => deg ) * (v, deg) => deg )
* }}} * }}}
* *
* @todo Should this function be curried to enable type inference? For
* example
* {{{
* graph.joinVertices(tbl)( (v, row) => row )
* }}}
*/ */
def joinVertices[U: ClassManifest](table: RDD[(Vid, U)])(mapFunc: (Vid, VD, U) => VD) def joinVertices[U: ClassManifest](table: RDD[(Vid, U)])(mapFunc: (Vid, VD, U) => VD)
: Graph[VD, ED] = { : Graph[VD, ED] = {
...@@ -158,4 +226,4 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) { ...@@ -158,4 +226,4 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
graph.outerJoinVertices(table)(uf) graph.outerJoinVertices(table)(uf)
} }
} } // end of GraphOps
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment