Commit d691e9f4 authored by Ankur Dave

Move algorithms to GraphOps

parent 20c509b8
@@ -667,9 +667,7 @@ things to worry about.)
 # Graph Algorithms
 <a name="graph_algorithms"></a>
-GraphX includes a set of graph algorithms in to simplify analytics. The algorithms are contained in the `org.apache.spark.graphx.lib` package and can be accessed directly as methods on `Graph` via an implicit conversion to [`Algorithms`][Algorithms]. This section describes the algorithms and how they are used.
+GraphX includes a set of graph algorithms in to simplify analytics. The algorithms are contained in the `org.apache.spark.graphx.lib` package and can be accessed directly as methods on `Graph` via [`GraphOps`][GraphOps]. This section describes the algorithms and how they are used.
-[Algorithms]: api/graphx/index.html#org.apache.spark.graphx.lib.Algorithms
 ## PageRank
 <a name="pagerank"></a>
@@ -681,8 +679,6 @@ GraphX comes with static and dynamic implementations of PageRank as methods on t
 [PageRank]: api/graphx/index.html#org.apache.spark.graphx.lib.PageRank$
 {% highlight scala %}
-// Load the implicit conversion to Algorithms
-import org.apache.spark.graphx.lib._
 // Load the datasets into a graph
 val users = sc.textFile("graphx/data/users.txt").map { line =>
   val fields = line.split("\\s+")
@@ -710,8 +706,7 @@ The connected components algorithm labels each connected component of the graph
 [ConnectedComponents]: api/graphx/index.html#org.apache.spark.graphx.lib.ConnectedComponents$
 {% highlight scala %}
-// Load the implicit conversion and graph as in the PageRank example
-import org.apache.spark.graphx.lib._
+// Load the graph as in the PageRank example
 val users = ...
 val followers = ...
 val graph = Graph(users, followers)
@@ -733,8 +728,7 @@ A vertex is part of a triangle when it has two adjacent vertices with an edge be
 [Graph.partitionBy]: api/graphx/index.html#org.apache.spark.graphx.Graph@partitionBy(PartitionStrategy):Graph[VD,ED]
 {% highlight scala %}
-// Load the implicit conversion and graph as in the PageRank example
-import org.apache.spark.graphx.lib._
+// Load the graph as in the PageRank example
 val users = ...
 // Load the edges in canonical order and partition the graph for triangle count
 val graph = GraphLoader.edgeListFile(sc, "graphx/data/followers.txt", true).partitionBy(RandomVertexCut)
......
@@ -15,9 +15,7 @@ import org.apache.spark.storage.StorageLevel
  * RDDs, the graph is a functional data-structure in which mutating
  * operations return new graphs.
  *
- * @note [[GraphOps]] contains additional convenience operations.
- * [[lib.Algorithms]] contains graph algorithms; to access these,
- * import `org.apache.spark.graphx.lib._`.
+ * @note [[GraphOps]] contains additional convenience operations and graph algorithms.
  *
  * @tparam VD the vertex attribute type
  * @tparam ED the edge attribute type
......
@@ -2,9 +2,10 @@ package org.apache.spark.graphx
 import scala.reflect.ClassTag
-import org.apache.spark.rdd.RDD
 import org.apache.spark.SparkContext._
 import org.apache.spark.SparkException
+import org.apache.spark.graphx.lib._
+import org.apache.spark.rdd.RDD
 /**
  * Contains additional functionality for [[Graph]]. All operations are expressed in terms of the
@@ -298,4 +299,52 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
     Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg)
   }
+  /**
+   * Run a dynamic version of PageRank returning a graph with vertex attributes containing the
+   * PageRank and edge attributes containing the normalized edge weight.
+   *
+   * @see [[org.apache.spark.graphx.lib.PageRank]], method `runUntilConvergence`.
+   */
+  def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double] = {
+    PageRank.runUntilConvergence(graph, tol, resetProb)
+  }
+  /**
+   * Run PageRank for a fixed number of iterations returning a graph with vertex attributes
+   * containing the PageRank and edge attributes the normalized edge weight.
+   *
+   * @see [[org.apache.spark.graphx.lib.PageRank]], method `run`.
+   */
+  def staticPageRank(numIter: Int, resetProb: Double = 0.15): Graph[Double, Double] = {
+    PageRank.run(graph, numIter, resetProb)
+  }
+  /**
+   * Compute the connected component membership of each vertex and return a graph with the vertex
+   * value containing the lowest vertex id in the connected component containing that vertex.
+   *
+   * @see [[org.apache.spark.graphx.lib.ConnectedComponents]]
+   */
+  def connectedComponents(): Graph[VertexID, ED] = {
+    ConnectedComponents.run(graph)
+  }
+  /**
+   * Compute the number of triangles passing through each vertex.
+   *
+   * @see [[org.apache.spark.graphx.lib.TriangleCount]]
+   */
+  def triangleCount(): Graph[Int, ED] = {
+    TriangleCount.run(graph)
+  }
+  /**
+   * Compute the strongly connected component (SCC) of each vertex and return a graph with the
+   * vertex value containing the lowest vertex id in the SCC containing that vertex.
+   *
+   * @see [[org.apache.spark.graphx.lib.StronglyConnectedComponents]]
+   */
+  def stronglyConnectedComponents(numIter: Int): Graph[VertexID, ED] = {
+    StronglyConnectedComponents.run(graph, numIter)
+  }
 } // end of GraphOps
-package org.apache.spark.graphx.lib
-import scala.reflect.ClassTag
-import org.apache.spark.graphx._
-/**
- * Provides graph algorithms directly on [[org.apache.spark.graphx.Graph]] via an implicit
- * conversion.
- * @example
- * {{{
- * import org.apache.spark.graph.lib._
- * val graph: Graph[_, _] = loadGraph()
- * graph.connectedComponents()
- * }}}
- */
-class Algorithms[VD: ClassTag, ED: ClassTag](self: Graph[VD, ED]) {
-  /**
-   * Run a dynamic version of PageRank returning a graph with vertex attributes containing the
-   * PageRank and edge attributes containing the normalized edge weight.
-   *
-   * @see [[org.apache.spark.graphx.lib.PageRank]], method `runUntilConvergence`.
-   */
-  def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double] = {
-    PageRank.runUntilConvergence(self, tol, resetProb)
-  }
-  /**
-   * Run PageRank for a fixed number of iterations returning a graph with vertex attributes
-   * containing the PageRank and edge attributes the normalized edge weight.
-   *
-   * @see [[org.apache.spark.graphx.lib.PageRank]], method `run`.
-   */
-  def staticPageRank(numIter: Int, resetProb: Double = 0.15): Graph[Double, Double] = {
-    PageRank.run(self, numIter, resetProb)
-  }
-  /**
-   * Compute the connected component membership of each vertex and return a graph with the vertex
-   * value containing the lowest vertex id in the connected component containing that vertex.
-   *
-   * @see [[org.apache.spark.graphx.lib.ConnectedComponents]]
-   */
-  def connectedComponents(): Graph[VertexID, ED] = {
-    ConnectedComponents.run(self)
-  }
-  /**
-   * Compute the number of triangles passing through each vertex.
-   *
-   * @see [[org.apache.spark.graphx.lib.TriangleCount]]
-   */
-  def triangleCount(): Graph[Int, ED] = {
-    TriangleCount.run(self)
-  }
-  /**
-   * Compute the strongly connected component (SCC) of each vertex and return a graph with the
-   * vertex value containing the lowest vertex id in the SCC containing that vertex.
-   *
-   * @see [[org.apache.spark.graphx.lib.StronglyConnectedComponents]]
-   */
-  def stronglyConnectedComponents(numIter: Int): Graph[VertexID, ED] = {
-    StronglyConnectedComponents.run(self, numIter)
-  }
-}
-package org.apache.spark.graphx
-import scala.reflect.ClassTag
-package object lib {
-  implicit def graphToAlgorithms[VD: ClassTag, ED: ClassTag](
-      graph: Graph[VD, ED]): Algorithms[VD, ED] = new Algorithms(graph)
-}
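
Reviewer note: the practical effect of this commit is that callers no longer need `import org.apache.spark.graphx.lib._` before calling the algorithm methods on a graph. The sketch below illustrates the general Scala mechanism that makes such a difference possible, using toy types only. `ToyGraph`, `ToyGraphOps`, `toylib`, and `ToyAlgorithms` are hypothetical names invented for this illustration, and the sketch assumes (the diff does not show it) that `GraphOps` reaches `Graph` through a conversion that is already in implicit scope.

```scala
import scala.language.implicitConversions

// Toy stand-in for a graph type; this is NOT the GraphX Graph class.
class ToyGraph(val edges: Seq[(Long, Long)])

// Extra operations, loosely analogous to GraphOps (hypothetical).
class ToyGraphOps(graph: ToyGraph) {
  def numEdges: Int = graph.edges.size
}

object ToyGraph {
  // A conversion defined in the companion object is part of the implicit
  // scope of ToyGraph, so callers need no extra import to use it.
  implicit def toOps(g: ToyGraph): ToyGraphOps = new ToyGraphOps(g)
}

// Loosely analogous to the removed `lib` package object (hypothetical).
object toylib {
  class ToyAlgorithms(graph: ToyGraph) {
    def maxVertexId: Long =
      graph.edges.flatMap { case (src, dst) => Seq(src, dst) }.max
  }
  // A conversion defined here must be imported before it can apply.
  implicit def toAlgorithms(g: ToyGraph): ToyAlgorithms = new ToyAlgorithms(g)
}

object Demo extends App {
  val g = new ToyGraph(Seq((1L, 2L), (2L, 3L)))

  // Resolves through the companion-object conversion, no import required.
  println(g.numEdges)

  // Compiles only because of the import on the next line.
  import toylib._
  println(g.maxVertexId)
}
```

Moving the methods into the class that is already reachable without an import trades a separate, opt-in namespace for convenience, which matches the documentation changes in the first hunks of this diff.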