Skip to content
Snippets Groups Projects
Commit a9f96b54 authored by Ankur Dave's avatar Ankur Dave
Browse files

Merge pull request #56 from jegonzal/PregelAPIChanges

Changing Pregel API to use mapReduceTriplets instead of aggregateNeighbors
parents 5907137d e9308e0e
No related branches found
No related tags found
No related merge requests found
......@@ -18,7 +18,7 @@ object Analytics extends Logging {
* Run PageRank for a fixed number of iterations returning a graph
* with vertex attributes containing the PageRank and edge
* attributes the normalized edge weight.
*
*
* The following PageRank fixed point is computed for each vertex.
*
* {{{
......@@ -35,7 +35,7 @@ object Analytics extends Logging {
* where `alpha` is the random reset probability (typically 0.15),
* `inNbrs[i]` is the set of neighbors whick link to `i` and
* `outDeg[j]` is the out degree of vertex `j`.
*
*
* Note that this is not the "normalized" PageRank and as a
* consequence pages that have no inlinks will have a PageRank of
* alpha.
......@@ -52,7 +52,7 @@ object Analytics extends Logging {
*
*/
def pagerank[VD: Manifest, ED: Manifest](
graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15):
graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15):
Graph[Double, Double] = {
/**
......@@ -76,13 +76,13 @@ object Analytics extends Logging {
// version of Pregel
def vertexProgram(id: Vid, attr: Double, msgSum: Double): Double =
resetProb + (1.0 - resetProb) * msgSum
def sendMessage(id: Vid, edge: EdgeTriplet[Double, Double]): Option[Double] =
Some(edge.srcAttr * edge.attr)
def sendMessage(edge: EdgeTriplet[Double, Double]) =
Array((edge.dstId, edge.srcAttr * edge.attr))
def messageCombiner(a: Double, b: Double): Double = a + b
// The initial message received by all vertices in PageRank
val initialMessage = 0.0
val initialMessage = 0.0
// Execute pregel for a fixed number of iterations.
// Execute pregel for a fixed number of iterations.
Pregel(pagerankGraph, initialMessage, numIter)(
vertexProgram, sendMessage, messageCombiner)
}
......@@ -107,7 +107,7 @@ object Analytics extends Logging {
* where `alpha` is the random reset probability (typically 0.15),
* `inNbrs[i]` is the set of neighbors whick link to `i` and
* `outDeg[j]` is the out degree of vertex `j`.
*
*
* Note that this is not the "normalized" PageRank and as a
* consequence pages that have no inlinks will have a PageRank of
* alpha.
......@@ -124,11 +124,11 @@ object Analytics extends Logging {
* PageRank and each edge containing the normalized weight.
*/
def deltaPagerank[VD: Manifest, ED: Manifest](
graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15):
graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15):
Graph[Double, Double] = {
/**
* Initialize the pagerankGraph with each edge attribute
* Initialize the pagerankGraph with each edge attribute
* having weight 1/outDegree and each vertex with attribute 1.0.
*/
val pagerankGraph: Graph[(Double, Double), Double] = graph
......@@ -136,7 +136,7 @@ object Analytics extends Logging {
.outerJoinVertices(graph.outDegrees){
(vid, vdata, deg) => deg.getOrElse(0)
}
// Set the weight on the edges based on the degree
// Set the weight on the edges based on the degree
.mapTriplets( e => 1.0 / e.srcAttr )
// Set the vertex attributes to (initalPR, delta = 0)
.mapVertices( (id, attr) => (0.0, 0.0) )
......@@ -151,16 +151,16 @@ object Analytics extends Logging {
val newPR = oldPR + (1.0 - resetProb) * msgSum
(newPR, newPR - oldPR)
}
def sendMessage(id: Vid, edge: EdgeTriplet[(Double, Double), Double]): Option[Double] = {
def sendMessage(edge: EdgeTriplet[(Double, Double), Double]) = {
if (edge.srcAttr._2 > tol) {
Some(edge.srcAttr._2 * edge.attr)
} else { None }
}
Array((edge.dstId, edge.srcAttr._2 * edge.attr))
} else { Array.empty[(Vid, Double)] }
}
def messageCombiner(a: Double, b: Double): Double = a + b
// The initial message received by all vertices in PageRank
val initialMessage = resetProb / (1.0 - resetProb)
// Execute a dynamic version of Pregel.
// Execute a dynamic version of Pregel.
Pregel(pagerankGraph, initialMessage)(
vertexProgram, sendMessage, messageCombiner)
.mapVertices( (vid, attr) => attr._1 )
......@@ -182,26 +182,28 @@ object Analytics extends Logging {
* @return a graph with vertex attributes containing the smallest
* vertex in each connected component
*/
def connectedComponents[VD: Manifest, ED: Manifest](graph: Graph[VD, ED]):
def connectedComponents[VD: Manifest, ED: Manifest](graph: Graph[VD, ED]):
Graph[Vid, ED] = {
val ccGraph = graph.mapVertices { case (vid, _) => vid }
def sendMessage(id: Vid, edge: EdgeTriplet[Vid, ED]): Option[Vid] = {
val thisAttr = edge.vertexAttr(id)
val otherAttr = edge.otherVertexAttr(id)
if(thisAttr < otherAttr) { Some(thisAttr) }
else { None }
def sendMessage(edge: EdgeTriplet[Vid, ED]) = {
if (edge.srcAttr < edge.dstAttr) {
Array((edge.dstId, edge.srcAttr))
} else if (edge.srcAttr > edge.dstAttr) {
Array((edge.srcId, edge.dstAttr))
} else {
Array.empty[(Vid, Vid)]
}
}
val initialMessage = Long.MaxValue
Pregel(ccGraph, initialMessage, EdgeDirection.Both)(
Pregel(ccGraph, initialMessage)(
(id, attr, msg) => math.min(attr, msg),
sendMessage,
sendMessage,
(a,b) => math.min(a,b)
)
} // end of connectedComponents
} // end of connectedComponents
def main(args: Array[String]) = {
val host = args(0)
......@@ -213,7 +215,7 @@ object Analytics extends Logging {
case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
}
}
def setLogLevels(level: org.apache.log4j.Level, loggers: TraversableOnce[String]) = {
loggers.map{
loggerName =>
......@@ -265,7 +267,7 @@ object Analytics extends Logging {
val sc = new SparkContext(host, "PageRank(" + fname + ")")
val graph = GraphLoader.textFile(sc, fname, a => 1.0F,
val graph = GraphLoader.textFile(sc, fname, a => 1.0F,
minEdgePartitions = numEPart, minVertexPartitions = numVPart).cache()
val startTime = System.currentTimeMillis
......@@ -314,7 +316,7 @@ object Analytics extends Logging {
val sc = new SparkContext(host, "ConnectedComponents(" + fname + ")")
//val graph = GraphLoader.textFile(sc, fname, a => 1.0F)
val graph = GraphLoader.textFile(sc, fname, a => 1.0F,
val graph = GraphLoader.textFile(sc, fname, a => 1.0F,
minEdgePartitions = numEPart, minVertexPartitions = numVPart).cache()
val cc = Analytics.connectedComponents(graph)
//val cc = if(isDynamic) Analytics.dynamicConnectedComponents(graph, numIter)
......
......@@ -11,21 +11,21 @@ import org.apache.spark.util.ClosureCleaner
* the graph type and is implicitly constructed for each Graph object.
* All operations in `GraphOps` are expressed in terms of the
* efficient GraphX API.
*
*
* @tparam VD the vertex attribute type
* @tparam ED the edge attribute type
* @tparam ED the edge attribute type
*
*/
class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
/**
* Compute the number of edges in the graph.
* Compute the number of edges in the graph.
*/
lazy val numEdges: Long = graph.edges.count()
/**
* Compute the number of vertices in the graph.
* Compute the number of vertices in the graph.
*/
lazy val numVertices: Long = graph.vertices.count()
......@@ -39,7 +39,7 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
/**
* Compute the out-degree of each vertex in the Graph returning an RDD.
* Compute the out-degree of each vertex in the Graph returning an RDD.
* @note Vertices with no out edges are not returned in the resulting RDD.
*/
lazy val outDegrees: VertexSetRDD[Int] = degreesRDD(EdgeDirection.Out)
......@@ -60,7 +60,13 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
* neighboring vertex attributes.
*/
private def degreesRDD(edgeDirection: EdgeDirection): VertexSetRDD[Int] = {
graph.aggregateNeighbors((vid, edge) => Some(1), _+_, edgeDirection)
if (edgeDirection == EdgeDirection.In) {
graph.mapReduceTriplets(et => Array((et.dstId,1)), _+_)
} else if (edgeDirection == EdgeDirection.Out) {
graph.mapReduceTriplets(et => Array((et.srcId,1)), _+_)
} else { // EdgeDirection.both
graph.mapReduceTriplets(et => Array((et.srcId,1), (et.dstId,1)), _+_)
}
}
......@@ -89,7 +95,7 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
*
* @example We can use this function to compute the average follower
* age for each user
*
*
* {{{
* val graph: Graph[Int,Int] = loadGraph()
* val averageFollowerAge: RDD[(Int, Int)] =
......@@ -113,15 +119,15 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
ClosureCleaner.clean(mapFunc)
ClosureCleaner.clean(reduceFunc)
// Define a new map function over edge triplets
// Define a new map function over edge triplets
val mf = (et: EdgeTriplet[VD,ED]) => {
// Compute the message to the dst vertex
val dst =
val dst =
if (dir == EdgeDirection.In || dir == EdgeDirection.Both) {
mapFunc(et.dstId, et)
} else { Option.empty[A] }
// Compute the message to the source vertex
val src =
val src =
if (dir == EdgeDirection.Out || dir == EdgeDirection.Both) {
mapFunc(et.srcId, et)
} else { Option.empty[A] }
......@@ -130,7 +136,7 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
case (None, None) => Array.empty[(Vid, A)]
case (Some(srcA),None) => Array((et.srcId, srcA))
case (None, Some(dstA)) => Array((et.dstId, dstA))
case (Some(srcA), Some(dstA)) =>
case (Some(srcA), Some(dstA)) =>
Array((et.srcId, srcA), (et.dstId, dstA))
}
}
......@@ -141,14 +147,14 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
/**
* Return the Ids of the neighboring vertices.
* Return the Ids of the neighboring vertices.
*
* @param edgeDirection the direction along which to collect
* neighboring vertices
*
* @return the vertex set of neighboring ids for each vertex.
*/
def collectNeighborIds(edgeDirection: EdgeDirection) :
def collectNeighborIds(edgeDirection: EdgeDirection) :
VertexSetRDD[Array[Vid]] = {
val nbrs = graph.aggregateNeighbors[Array[Vid]](
(vid, edge) => Some(Array(edge.otherVertexId(vid))),
......@@ -171,10 +177,10 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
* @return the vertex set of neighboring vertex attributes for each
* vertex.
*/
def collectNeighbors(edgeDirection: EdgeDirection) :
def collectNeighbors(edgeDirection: EdgeDirection) :
VertexSetRDD[ Array[(Vid, VD)] ] = {
val nbrs = graph.aggregateNeighbors[Array[(Vid,VD)]](
(vid, edge) =>
(vid, edge) =>
Some(Array( (edge.otherVertexId(vid), edge.otherVertexAttr(vid)) )),
(a, b) => a ++ b,
edgeDirection)
......
......@@ -34,8 +34,8 @@ import org.apache.spark.rdd.RDD
* def sendMessage(id: Vid, edge: EdgeTriplet[Double, Double]): Option[Double] =
* Some(edge.srcAttr * edge.attr)
* def messageCombiner(a: Double, b: Double): Double = a + b
* val initialMessage = 0.0
* // Execute pregel for a fixed number of iterations.
* val initialMessage = 0.0
* // Execute pregel for a fixed number of iterations.
* Pregel(pagerankGraph, initialMessage, numIter)(
* vertexProgram, sendMessage, messageCombiner)
* }}}
......@@ -64,7 +64,7 @@ object Pregel {
* @tparam ED the edge data type
* @tparam A the Pregel message type
*
* @param graph the input graph.
* @param graph the input graph.
*
* @param initialMsg the message each vertex will receive at the on
* the first iteration.
......@@ -93,78 +93,17 @@ object Pregel {
def apply[VD: ClassManifest, ED: ClassManifest, A: ClassManifest]
(graph: Graph[VD, ED], initialMsg: A, numIter: Int)(
vprog: (Vid, VD, A) => VD,
sendMsg: (Vid, EdgeTriplet[VD, ED]) => Option[A],
sendMsg: EdgeTriplet[VD, ED] => Array[(Vid,A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED] = {
apply(graph, initialMsg, numIter, EdgeDirection.Out)(vprog, sendMsg, mergeMsg)
} // end of Apply
/**
* Execute a Pregel-like iterative vertex-parallel abstraction. The
* user-defined vertex-program `vprog` is executed in parallel on
* each vertex receiving any inbound messages and computing a new
* value for the vertex. The `sendMsg` function is then invoked on
* all out-edges and is used to compute an optional message to the
* destination vertex. The `mergeMsg` function is a commutative
* associative function used to combine messages destined to the
* same vertex.
*
* On the first iteration all vertices receive the `initialMsg` and
* on subsequent iterations if a vertex does not receive a message
* then the vertex-program is not invoked.
*
* This function iterates a fixed number (`numIter`) of iterations.
*
* @tparam VD the vertex data type
* @tparam ED the edge data type
* @tparam A the Pregel message type
*
* @param graph the input graph.
*
* @param initialMsg the message each vertex will receive at the on
* the first iteration.
*
* @param numIter the number of iterations to run this computation.
*
* @param sendDir the edge direction along which the `sendMsg`
* function is invoked.
*
* @param vprog the user-defined vertex program which runs on each
* vertex and receives the inbound message and computes a new vertex
* value. On the first iteration the vertex program is invoked on
* all vertices and is passed the default message. On subsequent
* iterations the vertex program is only invoked on those vertices
* that receive messages.
*
* @param sendMsg a user supplied function that is applied to each
* edge in the direction `sendDir` adjacent to vertices that
* received messages in the current iteration.
*
* @param mergeMsg a user supplied function that takes two incoming
* messages of type A and merges them into a single message of type
* A. ''This function must be commutative and associative and
* ideally the size of A should not increase.''
*
* @return the resulting graph at the end of the computation
*
*/
def apply[VD: ClassManifest, ED: ClassManifest, A: ClassManifest]
(graph: Graph[VD, ED], initialMsg: A, numIter: Int, sendDir: EdgeDirection)(
vprog: (Vid, VD, A) => VD,
sendMsg: (Vid, EdgeTriplet[VD, ED]) => Option[A],
mergeMsg: (A, A) => A)
: Graph[VD, ED] = {
def mapF(vid: Vid, edge: EdgeTriplet[VD,ED]) = sendMsg(edge.otherVertexId(vid), edge)
// Receive the first set of messages
var g = graph.mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg))
var i = 0
while (i < numIter) {
// compute the messages
val messages = g.aggregateNeighbors(mapF, mergeMsg, sendDir.reverse)
val messages = g.mapReduceTriplets(sendMsg, mergeMsg)
// receive the messages
g = g.joinVertices(messages)(vprog)
// count the iteration
......@@ -195,7 +134,7 @@ object Pregel {
* @tparam ED the edge data type
* @tparam A the Pregel message type
*
* @param graph the input graph.
* @param graph the input graph.
*
* @param initialMsg the message each vertex will receive at the on
* the first iteration.
......@@ -224,66 +163,7 @@ object Pregel {
def apply[VD: ClassManifest, ED: ClassManifest, A: ClassManifest]
(graph: Graph[VD, ED], initialMsg: A)(
vprog: (Vid, VD, A) => VD,
sendMsg: (Vid, EdgeTriplet[VD, ED]) => Option[A],
mergeMsg: (A, A) => A)
: Graph[VD, ED] = {
apply(graph, initialMsg, EdgeDirection.Out)(vprog, sendMsg, mergeMsg)
} // end of apply
/**
* Execute a Pregel-like iterative vertex-parallel abstraction. The
* user-defined vertex-program `vprog` is executed in parallel on
* each vertex receiving any inbound messages and computing a new
* value for the vertex. The `sendMsg` function is then invoked on
* all out-edges and is used to compute an optional message to the
* destination vertex. The `mergeMsg` function is a commutative
* associative function used to combine messages destined to the
* same vertex.
*
* On the first iteration all vertices receive the `initialMsg` and
* on subsequent iterations if a vertex does not receive a message
* then the vertex-program is not invoked.
*
* This function iterates until there are no remaining messages.
*
* @tparam VD the vertex data type
* @tparam ED the edge data type
* @tparam A the Pregel message type
*
* @param graph the input graph.
*
* @param initialMsg the message each vertex will receive at the on
* the first iteration.
*
* @param numIter the number of iterations to run this computation.
*
* @param sendDir the edge direction along which the `sendMsg`
* function is invoked.
*
* @param vprog the user-defined vertex program which runs on each
* vertex and receives the inbound message and computes a new vertex
* value. On the first iteration the vertex program is invoked on
* all vertices and is passed the default message. On subsequent
* iterations the vertex program is only invoked on those vertices
* that receive messages.
*
* @param sendMsg a user supplied function that is applied to each
* edge in the direction `sendDir` adjacent to vertices that
* received messages in the current iteration.
*
* @param mergeMsg a user supplied function that takes two incoming
* messages of type A and merges them into a single message of type
* A. ''This function must be commutative and associative and
* ideally the size of A should not increase.''
*
* @return the resulting graph at the end of the computation
*
*/
def apply[VD: ClassManifest, ED: ClassManifest, A: ClassManifest]
(graph: Graph[VD, ED], initialMsg: A, sendDir: EdgeDirection)(
vprog: (Vid, VD, A) => VD,
sendMsg: (Vid, EdgeTriplet[VD, ED]) => Option[A],
sendMsg: EdgeTriplet[VD, ED] => Array[(Vid,A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED] = {
......@@ -294,7 +174,7 @@ object Pregel {
}
}
def sendMsgFun(vid: Vid, edge: EdgeTriplet[(VD,Boolean), ED]): Option[A] = {
def sendMsgFun(edge: EdgeTriplet[(VD,Boolean), ED]): Array[(Vid, A)] = {
if(edge.srcAttr._2) {
val et = new EdgeTriplet[VD, ED]
et.srcId = edge.srcId
......@@ -302,22 +182,22 @@ object Pregel {
et.dstId = edge.dstId
et.dstAttr = edge.dstAttr._1
et.attr = edge.attr
sendMsg(edge.otherVertexId(vid), et)
} else { None }
sendMsg(et)
} else { Array.empty[(Vid,A)] }
}
var g = graph.mapVertices( (vid, vdata) => (vprog(vid, vdata, initialMsg), true) )
var g = graph.mapVertices( (vid, vdata) => (vprog(vid, vdata, initialMsg), true) )
// compute the messages
var messages = g.aggregateNeighbors(sendMsgFun, mergeMsg, sendDir.reverse).cache
var messages = g.mapReduceTriplets(sendMsgFun, mergeMsg).cache
var activeMessages = messages.count
// Loop
// Loop
var i = 0
while (activeMessages > 0) {
// receive the messages
g = g.outerJoinVertices(messages)(vprogFun)
val oldMessages = messages
// compute the messages
messages = g.aggregateNeighbors(sendMsgFun, mergeMsg, sendDir.reverse).cache
messages = g.mapReduceTriplets(sendMsgFun, mergeMsg).cache
activeMessages = messages.count
// after counting we can unpersist the old messages
oldMessages.unpersist(blocking=false)
......
......@@ -7,7 +7,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.SparkContext._
import org.apache.spark.HashPartitioner
import org.apache.spark.HashPartitioner
import org.apache.spark.util.ClosureCleaner
import org.apache.spark.graph._
......@@ -28,7 +28,7 @@ class EdgeTripletIterator[VD: ClassManifest, ED: ClassManifest](
private var pos = 0
private val et = new EdgeTriplet[VD, ED]
private val vmap = new PrimitiveKeyOpenHashMap[Vid, VD](vidToIndex, vertexArray)
override def hasNext: Boolean = pos < edgePartition.size
override def next() = {
et.srcId = edgePartition.srcIds(pos)
......@@ -113,16 +113,16 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
override def statistics: Map[String, Any] = {
val numVertices = this.numVertices
val numEdges = this.numEdges
val replicationRatio =
val replicationRatio =
vid2pid.map(kv => kv._2.size).sum / vTable.count
val loadArray =
val loadArray =
eTable.map{ case (pid, epart) => epart.data.size }.collect.map(x => x.toDouble / numEdges)
val minLoad = loadArray.min
val maxLoad = loadArray.max
Map(
"Num Vertices" -> numVertices, "Num Edges" -> numEdges,
"Replication" -> replicationRatio, "Load Array" -> loadArray,
"Min Load" -> minLoad, "Max Load" -> maxLoad)
"Replication" -> replicationRatio, "Load Array" -> loadArray,
"Min Load" -> minLoad, "Max Load" -> maxLoad)
}
......@@ -145,10 +145,10 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
println(indent + name + ": " + cacheLevel.description + " (partitioner: " + partitioner + ", " + numparts +")")
println(indent + " |---> Deps: " + deps.map(d => (d, d.rdd.id) ).toString)
println(indent + " |---> PrefLoc: " + locs.map(x=> x.toString).mkString(", "))
deps.foreach(d => traverseLineage(d.rdd, indent + " | ", visited))
deps.foreach(d => traverseLineage(d.rdd, indent + " | ", visited))
}
}
println("eTable ------------------------------------------")
traverseLineage(eTable, " ")
var visited = Map(eTable.id -> "eTable")
......@@ -169,11 +169,11 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
traverseLineage(vid2pid, " ", visited)
visited += (vid2pid.id -> "vid2pid")
visited += (vid2pid.valuesRDD.id -> "vid2pid.values")
println("\n\nlocalVidMap -------------------------------------")
traverseLineage(localVidMap, " ", visited)
visited += (localVidMap.id -> "localVidMap")
println("\n\nvTableReplicatedValues --------------------------")
traverseLineage(vTableReplicatedValues, " ", visited)
visited += (vTableReplicatedValues.id -> "vTableReplicatedValues")
......@@ -185,7 +185,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
override def reverse: Graph[VD, ED] = {
val newEtable = eTable.mapPartitions( _.map{ case (pid, epart) => (pid, epart.reverse) },
val newEtable = eTable.mapPartitions( _.map{ case (pid, epart) => (pid, epart.reverse) },
preservesPartitioning = true)
new GraphImpl(vTable, vid2pid, localVidMap, newEtable)
}
......@@ -207,7 +207,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
GraphImpl.mapTriplets(this, f)
override def subgraph(epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
override def subgraph(epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
vpred: (Vid, VD) => Boolean = ((a,b) => true) ): Graph[VD, ED] = {
/** @todo The following code behaves deterministically on each
......@@ -215,7 +215,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
* this version
*/
// val predGraph = mapVertices(v => (v.data, vpred(v)))
// val newETable = predGraph.triplets.filter(t =>
// val newETable = predGraph.triplets.filter(t =>
// if(v.src.data._2 && v.dst.data._2) {
// val src = Vertex(t.src.id, t.src.data._1)
// val dst = Vertex(t.dst.id, t.dst.data._1)
......@@ -226,7 +226,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
// .map(v => (v.id, v.data._1)).indexed()
// Reuse the partitioner (but not the index) from this graph
val newVTable =
val newVTable =
VertexSetRDD(vertices.filter(v => vpred(v._1, v._2)).partitionBy(vTable.index.partitioner))
......@@ -237,9 +237,9 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
)
.map( t => Edge(t.srcId, t.dstId, t.attr) ))
// Construct the Vid2Pid map. Here we assume that the filter operation
// behaves deterministically.
// @todo reindex the vertex and edge tables
// Construct the Vid2Pid map. Here we assume that the filter operation
// behaves deterministically.
// @todo reindex the vertex and edge tables
val newVid2Pid = createVid2Pid(newETable, newVTable.index)
val newVidMap = createLocalVidMap(newETable)
......@@ -298,7 +298,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
override def mapReduceTriplets[A: ClassManifest](
mapFunc: EdgeTriplet[VD, ED] => Array[(Vid, A)],
reduceFunc: (A, A) => A)
: VertexSetRDD[A] =
: VertexSetRDD[A] =
GraphImpl.mapReduceTriplets(this, mapFunc, reduceFunc)
......@@ -322,30 +322,30 @@ object GraphImpl {
def apply[VD: ClassManifest, ED: ClassManifest](
vertices: RDD[(Vid, VD)], edges: RDD[Edge[ED]],
defaultVertexAttr: VD):
defaultVertexAttr: VD):
GraphImpl[VD,ED] = {
apply(vertices, edges, defaultVertexAttr, (a:VD, b:VD) => a)
}
def apply[VD: ClassManifest, ED: ClassManifest](
vertices: RDD[(Vid, VD)],
vertices: RDD[(Vid, VD)],
edges: RDD[Edge[ED]],
defaultVertexAttr: VD,
mergeFunc: (VD, VD) => VD): GraphImpl[VD,ED] = {
val vtable = VertexSetRDD(vertices, mergeFunc)
/**
* @todo Verify that there are no edges that contain vertices
val vtable = VertexSetRDD(vertices, mergeFunc)
/**
* @todo Verify that there are no edges that contain vertices
* that are not in vTable. This should probably be resolved:
*
* edges.flatMap{ e => Array((e.srcId, null), (e.dstId, null)) }
* .cogroup(vertices).map{
* case (vid, _, attr) =>
* case (vid, _, attr) =>
* if (attr.isEmpty) (vid, defaultValue)
* else (vid, attr)
* }
*
*
*/
val etable = createETable(edges)
val vid2pid = createVid2Pid(etable, vtable.index)
......@@ -366,7 +366,7 @@ object GraphImpl {
: RDD[(Pid, EdgePartition[ED])] = {
// Get the number of partitions
val numPartitions = edges.partitions.size
val ceilSqrt: Pid = math.ceil(math.sqrt(numPartitions)).toInt
val ceilSqrt: Pid = math.ceil(math.sqrt(numPartitions)).toInt
edges.map { e =>
// Random partitioning based on the source vertex id.
// val part: Pid = edgePartitionFunction1D(e.srcId, e.dstId, numPartitions)
......@@ -399,7 +399,7 @@ object GraphImpl {
edgePartition.foreach(e => {vSet.add(e.srcId); vSet.add(e.dstId)})
vSet.iterator.map { vid => (vid.toLong, pid) }
}
VertexSetRDD[Pid, ArrayBuffer[Pid]](preAgg, vTableIndex,
VertexSetRDD[Pid, ArrayBuffer[Pid]](preAgg, vTableIndex,
(p: Pid) => ArrayBuffer(p),
(ab: ArrayBuffer[Pid], p:Pid) => {ab.append(p); ab},
(a: ArrayBuffer[Pid], b: ArrayBuffer[Pid]) => a ++ b)
......@@ -407,11 +407,11 @@ object GraphImpl {
}
protected def createLocalVidMap[ED: ClassManifest](eTable: RDD[(Pid, EdgePartition[ED])]):
protected def createLocalVidMap[ED: ClassManifest](eTable: RDD[(Pid, EdgePartition[ED])]):
RDD[(Pid, VertexIdToIndexMap)] = {
eTable.mapPartitions( _.map{ case (pid, epart) =>
val vidToIndex = new VertexIdToIndexMap
epart.foreach{ e =>
epart.foreach{ e =>
vidToIndex.add(e.srcId)
vidToIndex.add(e.dstId)
}
......@@ -421,17 +421,17 @@ object GraphImpl {
protected def createVTableReplicated[VD: ClassManifest](
vTable: VertexSetRDD[VD],
vTable: VertexSetRDD[VD],
vid2pid: VertexSetRDD[Array[Pid]],
replicationMap: RDD[(Pid, VertexIdToIndexMap)]):
replicationMap: RDD[(Pid, VertexIdToIndexMap)]):
RDD[(Pid, Array[VD])] = {
// Join vid2pid and vTable, generate a shuffle dependency on the joined
// Join vid2pid and vTable, generate a shuffle dependency on the joined
// result, and get the shuffle id so we can use it on the slave.
val msgsByPartition = vTable.zipJoinFlatMap(vid2pid) { (vid, vdata, pids) =>
pids.iterator.map { pid => MessageToPartition(pid, (vid, vdata)) }
}.partitionBy(replicationMap.partitioner.get).cache()
replicationMap.zipPartitions(msgsByPartition){
replicationMap.zipPartitions(msgsByPartition){
(mapIter, msgsIter) =>
val (pid, vidToIndex) = mapIter.next()
assert(!mapIter.hasNext)
......@@ -448,12 +448,12 @@ object GraphImpl {
}
def makeTriplets[VD: ClassManifest, ED: ClassManifest](
def makeTriplets[VD: ClassManifest, ED: ClassManifest](
localVidMap: RDD[(Pid, VertexIdToIndexMap)],
vTableReplicatedValues: RDD[(Pid, Array[VD]) ],
eTable: RDD[(Pid, EdgePartition[ED])]): RDD[EdgeTriplet[VD, ED]] = {
localVidMap.zipPartitions(vTableReplicatedValues, eTable) {
(vidMapIter, replicatedValuesIter, eTableIter) =>
eTable.zipPartitions(localVidMap, vTableReplicatedValues) {
(eTableIter, vidMapIter, replicatedValuesIter) =>
val (_, vidToIndex) = vidMapIter.next()
val (_, vertexArray) = replicatedValuesIter.next()
val (_, edgePartition) = eTableIter.next()
......@@ -463,9 +463,9 @@ object GraphImpl {
def mapTriplets[VD: ClassManifest, ED: ClassManifest, ED2: ClassManifest](
g: GraphImpl[VD, ED],
g: GraphImpl[VD, ED],
f: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = {
val newETable = g.eTable.zipPartitions(g.localVidMap, g.vTableReplicatedValues){
val newETable = g.eTable.zipPartitions(g.localVidMap, g.vTableReplicatedValues){
(edgePartitionIter, vidToIndexIter, vertexArrayIter) =>
val (pid, edgePartition) = edgePartitionIter.next()
val (_, vidToIndex) = vidToIndexIter.next()
......@@ -492,8 +492,8 @@ object GraphImpl {
ClosureCleaner.clean(mapFunc)
ClosureCleaner.clean(reduceFunc)
// Map and preaggregate
val preAgg = g.eTable.zipPartitions(g.localVidMap, g.vTableReplicatedValues){
// Map and preaggregate
val preAgg = g.eTable.zipPartitions(g.localVidMap, g.vTableReplicatedValues){
(edgePartitionIter, vidToIndexIter, vertexArrayIter) =>
val (pid, edgePartition) = edgePartitionIter.next()
val (_, vidToIndex) = vidToIndexIter.next()
......@@ -511,7 +511,7 @@ object GraphImpl {
val msgBS = new BitSet(vertexArray.size)
// Iterate over the partition
val et = new EdgeTriplet[VD, ED]
edgePartition.foreach{e =>
edgePartition.foreach{e =>
et.set(e)
et.srcAttr = vmap(e.srcId)
et.dstAttr = vmap(e.dstId)
......@@ -523,7 +523,7 @@ object GraphImpl {
// Populate the aggregator map
if(msgBS.get(ind)) {
msgArray(ind) = reduceFunc(msgArray(ind), msg)
} else {
} else {
msgArray(ind) = msg
msgBS.set(ind)
}
......@@ -538,59 +538,59 @@ object GraphImpl {
protected def edgePartitionFunction1D(src: Vid, dst: Vid, numParts: Pid): Pid = {
val mixingPrime: Vid = 1125899906842597L
val mixingPrime: Vid = 1125899906842597L
(math.abs(src) * mixingPrime).toInt % numParts
}
/**
* This function implements a classic 2D-Partitioning of a sparse matrix.
* Suppose we have a graph with 11 vertices that we want to partition
* This function implements a classic 2D-Partitioning of a sparse matrix.
* Suppose we have a graph with 11 vertices that we want to partition
* over 9 machines. We can use the following sparse matrix representation:
*
* __________________________________
* v0 | P0 * | P1 | P2 * |
* v0 | P0 * | P1 | P2 * |
* v1 | **** | * | |
* v2 | ******* | ** | **** |
* v3 | ***** | * * | * |
* v3 | ***** | * * | * |
* ----------------------------------
* v4 | P3 * | P4 *** | P5 ** * |
* v4 | P3 * | P4 *** | P5 ** * |
* v5 | * * | * | |
* v6 | * | ** | **** |
* v7 | * * * | * * | * |
* v7 | * * * | * * | * |
* ----------------------------------
* v8 | P6 * | P7 * | P8 * *|
* v8 | P6 * | P7 * | P8 * *|
* v9 | * | * * | |
* v10 | * | ** | * * |
* v11 | * <-E | *** | ** |
* v11 | * <-E | *** | ** |
* ----------------------------------
*
* The edge denoted by E connects v11 with v1 and is assigned to
* The edge denoted by E connects v11 with v1 and is assigned to
* processor P6. To get the processor number we divide the matrix
* into sqrt(numProc) by sqrt(numProc) blocks. Notice that edges
* adjacent to v11 can only be in the first colum of
* blocks (P0, P3, P6) or the last row of blocks (P6, P7, P8).
* As a consequence we can guarantee that v11 will need to be
* adjacent to v11 can only be in the first colum of
* blocks (P0, P3, P6) or the last row of blocks (P6, P7, P8).
* As a consequence we can guarantee that v11 will need to be
* replicated to at most 2 * sqrt(numProc) machines.
*
* Notice that P0 has many edges and as a consequence this
* Notice that P0 has many edges and as a consequence this
* partitioning would lead to poor work balance. To improve
* balance we first multiply each vertex id by a large prime
* to effectively shuffle the vertex locations.
* balance we first multiply each vertex id by a large prime
* to effectively shuffle the vertex locations.
*
* One of the limitations of this approach is that the number of
* machines must either be a perfect square. We partially address
* this limitation by computing the machine assignment to the next
* largest perfect square and then mapping back down to the actual
* number of machines. Unfortunately, this can also lead to work
* imbalance and so it is suggested that a perfect square is used.
*
* this limitation by computing the machine assignment to the next
* largest perfect square and then mapping back down to the actual
* number of machines. Unfortunately, this can also lead to work
* imbalance and so it is suggested that a perfect square is used.
*
*
*/
protected def edgePartitionFunction2D(src: Vid, dst: Vid,
protected def edgePartitionFunction2D(src: Vid, dst: Vid,
numParts: Pid, ceilSqrtNumParts: Pid): Pid = {
val mixingPrime: Vid = 1125899906842597L
val mixingPrime: Vid = 1125899906842597L
val col: Pid = ((math.abs(src) * mixingPrime) % ceilSqrtNumParts).toInt
val row: Pid = ((math.abs(dst) * mixingPrime) % ceilSqrtNumParts).toInt
(col * ceilSqrtNumParts + row) % numParts
......@@ -598,7 +598,7 @@ object GraphImpl {
/**
* Assign edges to an aribtrary machine corresponding to a
* Assign edges to an aribtrary machine corresponding to a
* random vertex cut.
*/
protected def randomVertexCut(src: Vid, dst: Vid, numParts: Pid): Pid = {
......@@ -610,9 +610,9 @@ object GraphImpl {
* @todo This will only partition edges to the upper diagonal
* of the 2D processor space.
*/
protected def canonicalEdgePartitionFunction2D(srcOrig: Vid, dstOrig: Vid,
protected def canonicalEdgePartitionFunction2D(srcOrig: Vid, dstOrig: Vid,
numParts: Pid, ceilSqrtNumParts: Pid): Pid = {
val mixingPrime: Vid = 1125899906842597L
val mixingPrime: Vid = 1125899906842597L
// Partitions by canonical edge direction
val src = math.min(srcOrig, dstOrig)
val dst = math.max(srcOrig, dstOrig)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment