Skip to content
Snippets Groups Projects
Commit d39ac2eb authored by Kyle Ellrott's avatar Kyle Ellrott
Browse files
parents 59ec6b85 6f82c426
No related branches found
No related tags found
No related merge requests found
......@@ -11,43 +11,99 @@ object Analytics extends Logging {
*/
def pagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED],
numIter: Int,
// NOTE(review): the next five lines (untyped signature tail plus a tuple-valued
// pagerankGraph) look like the removed side of the diff hunk; the typed signature
// tail that follows them appears to be the current one -- confirm before compiling.
resetProb: Double = 0.15) = {
// Compute the out degree of each vertex
val pagerankGraph = graph.outerJoinVertices(graph.outDegrees){
(vid, vdata, deg) => (deg.getOrElse(0), 1.0)
}
resetProb: Double = 0.15): Graph[Double, Double] = {
/**
* Initialize the pagerankGraph with each edge attribute
* having weight 1/outDegree and each vertex with attribute 1.0.
*/
val pagerankGraph: Graph[Double, Double] = graph
// Associate the degree with each vertex (vertices absent from outDegrees get 0)
.outerJoinVertices(graph.outDegrees){
(vid, vdata, deg) => deg.getOrElse(0)
}
// Set the weight on the edges based on the degree:
// e.srcAttr is the source vertex's out-degree attached by the join above
.mapTriplets( e => 1.0 / e.srcAttr )
// Set the vertex attributes to the initial pagerank values
.mapVertices( (id, attr) => 1.0 )
// Display statistics about pagerank
println(pagerankGraph.statistics)
// NOTE(review): this Pregel.iterate call reads tuple vertex attributes (_1, _2),
// which matches the tuple-valued pagerankGraph above rather than the
// Graph[Double, Double] one; it looks like the removed side of the diff -- confirm.
Pregel.iterate[(Int, Double), ED, Double](pagerankGraph)(
(vid, data, a: Double) => (data._1, (resetProb + (1.0 - resetProb) * a)), // apply
(me_id, edge) => Some(edge.srcAttr._2 / edge.srcAttr._1), // gather
(a: Double, b: Double) => a + b, // merge
1.0,
numIter).mapVertices{ case (id, (outDeg, r)) => r }
// Define the three functions needed to implement PageRank in the GraphX
// version of Pregel
// New rank = resetProb + (1 - resetProb) * (sum of incoming contributions).
// The previous rank `attr` is not used in the formula.
def vertexProgram(id: Vid, attr: Double, msgSum: Double): Double =
resetProb + (1.0 - resetProb) * msgSum
// Contribution sent along an edge: the source's rank times the edge weight (1/outDegree)
def sendMessage(id: Vid, edge: EdgeTriplet[Double, Double]): Option[Double] =
Some(edge.srcAttr * edge.attr)
// Contributions arriving at the same vertex are summed
def messageCombiner(a: Double, b: Double): Double = a + b
// The initial message received by all vertices in PageRank
// (with 0.0, vertexProgram evaluates to exactly resetProb)
val initialMessage = 0.0
// Execute pregel for a fixed number of iterations.
Pregel(pagerankGraph, initialMessage, numIter)(
vertexProgram, sendMessage, messageCombiner)
}
/**
* Compute the PageRank of a graph using a convergence tolerance instead of a fixed
* iteration count. `deltaPagerank` is declared to return a Graph[Double, Double]
* whose vertex attribute is the rank (the first field of its internal
* (rank, delta) pair).
*/
// NOTE(review): the dynamicPagerank header and the unclosed outerJoinVertices call
// below look like the removed side of the diff hunk (deltaPagerank follows
// immediately, and a commented-out copy of these lines appears further down) -- confirm.
def dynamicPagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED],
tol: Float,
maxIter: Int = Integer.MAX_VALUE,
resetProb: Double = 0.15) = {
// Compute the out degree of each vertex
val pagerankGraph = graph.outerJoinVertices(graph.outDegrees){
(id, data, degIter) => (degIter.sum, 1.0, 1.0)
def deltaPagerank[VD: Manifest, ED: Manifest](
graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15): Graph[Double, Double] = {
/**
* Initialize the pagerankGraph with each edge attribute
* having weight 1/outDegree and each vertex with attribute
* (rank, delta) = (0.0, 0.0).
*/
val pagerankGraph: Graph[(Double, Double), Double] = graph
// Associate the degree with each vertex (vertices absent from outDegrees get 0)
.outerJoinVertices(graph.outDegrees){
(vid, vdata, deg) => deg.getOrElse(0)
}
// Set the weight on the edges based on the degree:
// e.srcAttr is the source vertex's out-degree attached by the join above
.mapTriplets( e => 1.0 / e.srcAttr )
// Set the vertex attributes to (initialPR, delta = 0)
.mapVertices( (id, attr) => (0.0, 0.0) )
// Display statistics about pagerank
println(pagerankGraph.statistics)
// Define the three functions needed to implement PageRank in the GraphX
// version of Pregel
// Adds the damped incoming sum to the current rank and records the change
// (newPR - oldPR) as the new delta. `lastDelta` is unpacked but not used.
def vertexProgram(id: Vid, attr: (Double, Double), msgSum: Double): (Double, Double) = {
val (oldPR, lastDelta) = attr
val newPR = oldPR + (1.0 - resetProb) * msgSum
(newPR, newPR - oldPR)
}
// Forward the source's last delta (scaled by the edge weight) only while that
// delta is strictly greater than tol; otherwise send nothing.
def sendMessage(id: Vid, edge: EdgeTriplet[(Double, Double), Double]): Option[Double] = {
if (edge.srcAttr._2 > tol) {
Some(edge.srcAttr._2 * edge.attr)
} else { None }
}
// Deltas arriving at the same vertex are summed
def messageCombiner(a: Double, b: Double): Double = a + b
// The initial message received by all vertices in PageRank.
// Starting from rank 0.0, vertexProgram turns resetProb / (1 - resetProb)
// into a first rank (and delta) of resetProb.
val initialMessage = resetProb / (1.0 - resetProb)
// Execute a dynamic version of Pregel (no iteration count is passed).
Pregel(pagerankGraph, initialMessage)(
vertexProgram, sendMessage, messageCombiner)
// Keep only the rank, dropping the delta
.mapVertices( (vid, attr) => attr._1 )
// // Compute the out degree of each vertex
// val pagerankGraph = graph.outerJoinVertices(graph.outDegrees){
// (id, data, degIter) => (degIter.sum, 1.0, 1.0)
// }
// Run PageRank
// NOTE(review): this live GraphLab.iterate call reads a third vertex field (_3) and
// `maxIter`, neither of which deltaPagerank defines; it looks like the removed side
// of the diff (its commented-out copy follows) -- confirm.
GraphLab.iterate(pagerankGraph)(
(me_id, edge) => edge.srcAttr._2 / edge.srcAttr._1, // gather
(a: Double, b: Double) => a + b,
(id, data, a: Option[Double]) =>
(data._1, (resetProb + (1.0 - resetProb) * a.getOrElse(0.0)), data._2), // apply
(me_id, edge) => math.abs(edge.srcAttr._3 - edge.srcAttr._2) > tol, // scatter
maxIter).mapVertices { case (vid, data) => data._2 }
// // Run PageRank
// GraphLab.iterate(pagerankGraph)(
// (me_id, edge) => edge.srcAttr._2 / edge.srcAttr._1, // gather
// (a: Double, b: Double) => a + b,
// (id, data, a: Option[Double]) =>
// (data._1, (resetProb + (1.0 - resetProb) * a.getOrElse(0.0)), data._2), // apply
// (me_id, edge) => math.abs(edge.srcAttr._3 - edge.srcAttr._2) > tol, // scatter
// maxIter).mapVertices { case (vid, data) => data._2 }
}
......
......@@ -10,6 +10,8 @@ import org.apache.spark.rdd.RDD
object Pregel {
/**
* Execute the Pregel program.
*
......@@ -34,23 +36,19 @@ object Pregel {
* @return the resulting graph at the end of the computation
*
*/
def iterate[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](graph: Graph[VD, ED])(
def apply[VD: ClassManifest, ED: ClassManifest, A: ClassManifest]
(graph: Graph[VD, ED], initialMsg: A, numIter: Int)(
vprog: (Vid, VD, A) => VD,
sendMsg: (Vid, EdgeTriplet[VD, ED]) => Option[A],
mergeMsg: (A, A) => A,
initialMsg: A,
numIter: Int)
mergeMsg: (A, A) => A)
: Graph[VD, ED] = {
var g = graph
//var g = graph.cache()
var i = 0
def mapF(vid: Vid, edge: EdgeTriplet[VD,ED]) = sendMsg(edge.otherVertexId(vid), edge)
// Receive the first set of messages
g.mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg))
var g = graph.mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg))
var i = 0
while (i < numIter) {
// compute the messages
val messages = g.aggregateNeighbors(mapF, mergeMsg, EdgeDirection.In)
......@@ -61,5 +59,79 @@ object Pregel {
}
// Return the final graph
g
}
}
} // end of apply
/**
 * Execute the Pregel program until no more messages are produced.
 *
 * Every vertex first runs `vprog` on `initialMsg`. Each following round collects
 * the messages produced by `sendMsg`, combines them with `mergeMsg`, and runs
 * `vprog` only on the vertices that received a message. A message is generated
 * over an edge only when that edge's source vertex ran `vprog` in the previous
 * round. The loop stops as soon as a round produces no messages; there is no
 * iteration cap, so `sendMsg` must eventually return `None` for the call to finish.
 *
 * @tparam VD the vertex data type
 * @tparam ED the edge data type
 * @tparam A the Pregel message type
 *
 * @param graph the graph to run the computation on
 * @param initialMsg the message each vertex will receive at the beginning of the
 * first iteration.
 * @param vprog a user supplied function that acts as the vertex program for
 * the Pregel computation. It takes the vertex ID of the vertex it is running on,
 * the accompanying data for that vertex, and the incoming data and returns the
 * new vertex value.
 * @param sendMsg a user supplied function that takes the current vertex ID and an EdgeTriplet
 * between the vertex and one of its neighbors and produces a message to send
 * to that neighbor.
 * @param mergeMsg a user supplied function that takes two incoming messages of type A and merges
 * them into a single message of type A. ''This function must be commutative and
 * associative.''
 *
 * @return the resulting graph at the end of the computation
 *
 */
def apply[VD: ClassManifest, ED: ClassManifest, A: ClassManifest]
  (graph: Graph[VD, ED], initialMsg: A)(
    vprog: (Vid, VD, A) => VD,
    sendMsg: (Vid, EdgeTriplet[VD, ED]) => Option[A],
    mergeMsg: (A, A) => A)
  : Graph[VD, ED] = {

  // Each vertex attribute is paired with a flag that records whether the
  // vertex program ran on that vertex in the most recent round.
  def vprogFun(id: Vid, attr: (VD, Boolean), msgOpt: Option[A]): (VD, Boolean) = {
    msgOpt match {
      case Some(msg) => (vprog(id, attr._1, msg), true)
      case None => (attr._1, false)
    }
  }

  // Hide the flag from the user's sendMsg: rebuild a plain EdgeTriplet[VD, ED]
  // and only call sendMsg when the edge's source was updated last round.
  def sendMsgFun(vid: Vid, edge: EdgeTriplet[(VD,Boolean), ED]): Option[A] = {
    if (edge.srcAttr._2) {
      val et = new EdgeTriplet[VD, ED]
      et.srcId = edge.srcId
      et.srcAttr = edge.srcAttr._1
      et.dstId = edge.dstId
      et.dstAttr = edge.dstAttr._1
      et.attr = edge.attr
      sendMsg(edge.otherVertexId(vid), et)
    } else { None }
  }

  // Deliver the initial message to every vertex and mark all of them as updated
  var g = graph.mapVertices( (vid, vdata) => (vprog(vid, vdata, initialMsg), true) )
  // compute the first round of messages
  var messages = g.aggregateNeighbors(sendMsgFun, mergeMsg, EdgeDirection.In).cache
  var activeMessages = messages.count
  // Loop until a round produces no messages
  while (activeMessages > 0) {
    // receive the messages
    g = g.outerJoinVertices(messages)(vprogFun)
    val oldMessages = messages
    // compute the messages
    messages = g.aggregateNeighbors(sendMsgFun, mergeMsg, EdgeDirection.In).cache
    activeMessages = messages.count
    // after counting we can unpersist the old messages
    oldMessages.unpersist(blocking=false)
  }
  // Return the final graph with the bookkeeping flag stripped off
  g.mapVertices((id, attr) => attr._1)
} // end of apply
} // end of class Pregel
......@@ -236,7 +236,51 @@ object GraphGenerators {
}
}
}
/**
 * Create `rows` by `cols` grid graph with each vertex connected to its
 * row+1 and col+1 neighbors. Vertex ids are assigned in row major
 * order.
 *
 * @param sc the spark context in which to construct the graph
 * @param rows the number of rows
 * @param cols the number of columns
 *
 * @return A graph containing vertices with the row and column ids
 * as their attributes and edge values as 1.0.
 */
def gridGraph(sc: SparkContext, rows: Int, cols: Int): Graph[(Int,Int), Double] = {
  // Convert row column address into vertex ids (row major order).
  // The product is computed in Long so that grids with more than
  // Int.MaxValue vertices do not wrap around.
  def sub2ind(r: Int, c: Int): Vid = r.toLong * cols + c

  val vertices: RDD[(Vid, (Int,Int))] =
    sc.parallelize(0 until rows).flatMap( r => (0 until cols).map( c => (sub2ind(r,c), (r,c)) ) )

  val edges: RDD[Edge[Double]] =
    vertices.flatMap{ case (vid, (r,c)) =>
      // vid is sub2ind(r, c) by construction of `vertices`
      val down =
        if (r+1 < rows) { Seq(Edge(vid, sub2ind(r+1, c), 1.0)) } else { Seq.empty[Edge[Double]] }
      val right =
        if (c+1 < cols) { Seq(Edge(vid, sub2ind(r, c+1), 1.0)) } else { Seq.empty[Edge[Double]] }
      down ++ right
    }
  Graph(vertices, edges)
} // end of gridGraph
/**
 * Create a star graph with vertex 0 being the center. One edge is created
 * from each vertex in `1 until nverts` to vertex 0.
 *
 * @param sc the spark context in which to construct the graph
 * @param nverts the number of vertices in the star
 *
 * @return A star graph containing `nverts` vertices with vertex 0
 * being the center vertex.
 */
def starGraph(sc: SparkContext, nverts: Int): Graph[Int, Int] = {
  val spokes: RDD[(Vid, Vid)] =
    sc.parallelize(1 until nverts).map { leaf => (leaf, 0) }
  Graph(spokes, false)
} // end of starGraph
} // end of Graph Generators
......
package org.apache.spark.graph
import org.scalatest.FunSuite
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.graph.LocalSparkContext._
import org.apache.spark.graph.util.GraphGenerators
/**
 * Single-machine reference PageRank on an `nRows` by `nCols` grid in which every
 * cell has an edge to the cell below it and to the cell on its right.
 */
object GridPageRank {
  /**
   * Run `nIter` synchronous PageRank sweeps starting from `resetProb` on every
   * vertex and return (vertex id, rank) pairs in row-major id order.
   */
  def apply(nRows: Int, nCols: Int, nIter: Int, resetProb: Double) = {
    val nVerts = nRows * nCols
    // Convert row column address into vertex ids (row major order)
    def sub2ind(r: Int, c: Int): Int = r * nCols + c

    val inNbrs = Array.fill(nVerts)(collection.mutable.MutableList.empty[Int])
    val outDegree = Array.fill(nVerts)(0)

    // Record one directed edge src -> dst
    def addEdge(src: Int, dst: Int): Unit = {
      outDegree(src) += 1
      inNbrs(dst) += src
    }

    // Make the grid graph: "down" edge first, then "right" edge
    for (r <- 0 until nRows; c <- 0 until nCols) {
      if (r + 1 < nRows) addEdge(sub2ind(r, c), sub2ind(r + 1, c))
      if (c + 1 < nCols) addEdge(sub2ind(r, c), sub2ind(r, c + 1))
    }

    // compute the pagerank: each sweep builds a fresh array from the previous one
    val start = Array.fill(nVerts)(resetProb)
    val ranks = (0 until nIter).foldLeft(start) { (prev, _) =>
      val next = new Array[Double](nVerts)
      for (v <- 0 until nVerts) {
        val incoming = inNbrs(v).map( nbr => prev(nbr) / outDegree(nbr) ).sum
        next(v) = resetProb + (1.0 - resetProb) * incoming
      }
      next
    }
    (0L until nVerts).zip(ranks)
  }
}
/**
 * Checks Analytics.pagerank and Analytics.deltaPagerank against closed-form
 * values on a star graph and against the local GridPageRank reference on a grid.
 */
class AnalyticsSuite extends FunSuite with LocalSparkContext {

  System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  System.setProperty("spark.kryo.registrator", "org.apache.spark.graph.GraphKryoRegistrator")

  test("Star PageRank") {
    withSpark(new SparkContext("local", "test")) { sc =>
      val nVertices = 100
      val starGraph = GraphGenerators.starGraph(sc, nVertices)
      val resetProb = 0.15
      val prGraph1 = Analytics.pagerank(starGraph, 1, resetProb)
      val prGraph2 = Analytics.pagerank(starGraph, 2, resetProb)

      // One and two iterations are expected to give identical ranks on the star
      val notMatching = prGraph1.vertices.zipJoin(prGraph2.vertices)
        .map{ case (vid, (pr1, pr2)) => if (pr1 != pr2) { 1 } else { 0 } }.sum
      assert(notMatching === 0)
      prGraph2.vertices.foreach(println(_))

      // Leaves keep resetProb; the center collects resetProb from each of the
      // nVertices - 1 leaves
      val errors = prGraph2.vertices.map{ case (vid, pr) =>
        val correct = (vid > 0 && pr == resetProb) ||
          (vid == 0 && math.abs(pr - (resetProb + (1.0 - resetProb) * (resetProb * (nVertices - 1)) )) < 1.0E-5)
        if ( !correct ) { 1 } else { 0 }
      }
      assert(errors.sum === 0)

      // The delta formulation must agree; a missing vertex counts as an error
      val prGraph3 = Analytics.deltaPagerank(starGraph, 0, resetProb)
      val errors2 = prGraph2.vertices.leftJoin(prGraph3.vertices).map{
        case (_, (pr1, Some(pr2))) if(pr1 == pr2) => 0
        case _ => 1
      }.sum
      assert(errors2 === 0)
    }
  } // end of test Star PageRank

  test("Grid PageRank") {
    withSpark(new SparkContext("local", "test")) { sc =>
      val gridGraph = GraphGenerators.gridGraph(sc, 10, 10)
      val resetProb = 0.15
      val prGraph1 = Analytics.pagerank(gridGraph, 50, resetProb).cache()
      val prGraph2 = Analytics.deltaPagerank(gridGraph, 0.0001, resetProb).cache()

      // Squared difference between the fixed-iteration and delta versions
      val error = prGraph1.vertices.zipJoin(prGraph2.vertices).map {
        case (id, (a, b)) => (a - b) * (a - b)
      }.sum
      prGraph1.vertices.zipJoin(prGraph2.vertices)
        .map{ case (id, (a,b)) => (id, (a,b, a-b))}.foreach(println(_))
      println(error)
      assert(error < 1.0e-5)

      // Squared difference against the local reference implementation
      val pr3 = sc.parallelize(GridPageRank(10,10, 50, resetProb))
      val error2 = prGraph1.vertices.leftJoin(pr3).map {
        case (id, (a, Some(b))) => (a - b) * (a - b)
        // A vertex missing from the reference ranks is a failure, not a perfect
        // match: contribute enough error to trip the assertion below.
        case _ => 1.0
      }.sum
      prGraph1.vertices.leftJoin(pr3).foreach(println( _ ))
      println(error2)
      assert(error2 < 1.0e-5)
    }
  } // end of Grid PageRank

} // end of AnalyticsSuite
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment