Commit 3a40a5eb authored by Dan Crankshaw's avatar Dan Crankshaw

Added some documentation.

parent 3f3d28c7
@@ -184,13 +184,42 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
/**
 * groupEdgeTriplets is used to merge multiple edges that have the
 * same source and destination vertex into a single edge. The
 * user-supplied function is applied to each directed pair of vertices
 * (u, v) and has access to all EdgeTriplets in
 *
 * {e: for all e in E where e.src = u and e.dst = v}
 *
 * This function is identical to [[org.apache.spark.graph.Graph.groupEdges]]
 * except that it provides the user-supplied function with an iterator over
 * EdgeTriplets, which contain the vertex data, whereas groupEdges provides
 * the user-supplied function with an iterator over Edges, which contain
 * only the vertex IDs.
 *
 * @tparam ED2 the type of the resulting edge data after grouping
 *
 * @param f the user-supplied function to merge multiple EdgeTriplets
 * into a single ED2 object
 *
 * @return Graph[VD, ED2] the resulting graph with a single edge for each
 * source/destination vertex pair
 */
def groupEdgeTriplets[ED2: ClassManifest](f: Iterator[EdgeTriplet[VD, ED]] => ED2): Graph[VD, ED2]
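// A hedged usage sketch, not part of this commit: collapse parallel edges
// into a single edge carrying the sum of the duplicate edge values. Assumes
// a graph: Graph[Int, Double] and that EdgeTriplet exposes the edge value as
// `data` (the field name Edge uses elsewhere in this diff):
//
//   val merged: Graph[Int, Double] =
//     graph.groupEdgeTriplets[Double](ts => ts.map(_.data).sum)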
/**
 * This function merges multiple edges between two vertices into a single
 * Edge. See [[org.apache.spark.graph.Graph.groupEdgeTriplets]] for more detail.
 *
 * @tparam ED2 the type of the resulting edge data after grouping
 *
 * @param f the user-supplied function to merge multiple Edges
 * into a single ED2 object
 *
 * @return Graph[VD, ED2] the resulting graph with a single edge for each
 * source/destination vertex pair
 */
def groupEdges[ED2: ClassManifest](f: Iterator[Edge[ED]] => ED2): Graph[VD, ED2]
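// A hedged usage sketch, not part of this commit: keep one edge per
// (src, dst) pair whose value counts the parallel edges it replaced:
//
//   val counted: Graph[Int, Int] = graph.groupEdges[Int](es => es.size)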
......
@@ -4,7 +4,7 @@ import scala.collection.JavaConversions._
import org.apache.spark.rdd.RDD
/**
* This object implements the GraphLab gather-apply-scatter API.
*/
object GraphLab {
......
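The hunk above only touches the doc comment, so the object's signature is not shown. As a self-contained, hedged illustration of the gather-apply-scatter pattern itself (not this object's API), here is one round on a toy edge list:

object GasSketch extends App {
  val edges = Seq((1L, 2L), (2L, 3L), (1L, 3L))
  var rank: Map[Long, Double] = Map(1L -> 1.0, 2L -> 1.0, 3L -> 1.0)
  // gather: for each vertex, sum the values of its in-neighbors
  val gathered: Map[Long, Double] =
    edges.groupBy(_._2).map { case (dst, es) => dst -> es.map(e => rank(e._1)).sum }
  // apply: compute each vertex's new value from the gathered sum
  rank = rank.map { case (v, _) => v -> (0.15 + 0.85 * gathered.getOrElse(v, 0.0)) }
  // scatter (omitted here): would decide which neighbors must recompute next round
  println(rank)
}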
@@ -9,7 +9,18 @@ import org.apache.spark.graph.impl.GraphImpl
object GraphLoader {
/**
 * Load an edge list from a file, initializing the Graph.
 *
 * @tparam ED the type of the edge data of the resulting Graph
 *
 * @param sc the SparkContext used to construct RDDs
 * @param path the path to the text file containing the edge list
 * @param edgeParser a function that takes an array of strings and
 * returns an ED object
 * @param minEdgePartitions the number of partitions for the
 * edge RDD
 *
 * @todo remove minVertexPartitions arg
 */
def textFile[ED: ClassManifest](
sc: SparkContext,
@@ -38,14 +49,10 @@
}.cache()
val graph = fromEdges(edges)
graph
}

private def fromEdges[ED: ClassManifest](edges: RDD[Edge[ED]]): GraphImpl[Int, ED] = {
val vertices = edges.flatMap { edge => List((edge.srcId, 1), (edge.dstId, 1)) }
.reduceByKey(_ + _)
.map{ case (vid, degree) => (vid, degree) }
......
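A hedged usage sketch of textFile, based on the parameters documented above; the path and the edge-list format (third column parsed as an integer weight) are hypothetical, and any remaining optional parameters are assumed to have defaults:

val graph = GraphLoader.textFile[Int](
  sc,                                         // an existing SparkContext
  "hdfs:///data/edges.tsv",                   // hypothetical path
  (fields: Array[String]) => fields(2).toInt) // hypothetical parser: weight column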
@@ -94,11 +94,6 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
} // end of aggregateNeighbors
def collectNeighborIds(edgeDirection: EdgeDirection): RDD[(Vid, Array[Vid])] = {
val nbrs = graph.aggregateNeighbors[Array[Vid]](
(vid, edge) => Some(Array(edge.otherVertexId(vid))),
......
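A hedged usage sketch of collectNeighborIds; the EdgeDirection.In spelling of the incoming-edge direction is an assumption, since the enumeration is not shown in this hunk:

// one (vertex, neighbor-ID array) pair per vertex
val inNbrs: RDD[(Vid, Array[Vid])] = graph.collectNeighborIds(EdgeDirection.In)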
@@ -3,8 +3,37 @@ package org.apache.spark.graph
import org.apache.spark.rdd.RDD
/**
* This object implements the Pregel bulk-synchronous
* message-passing API.
*/
object Pregel {
/**
 * Execute the Pregel program.
 *
 * @tparam VD the vertex data type
 * @tparam ED the edge data type
 * @tparam A the Pregel message type
 *
 * @param vprog a user-supplied function that acts as the vertex program for
 * the Pregel computation. It takes the vertex ID of the vertex it is running on,
 * the accompanying data for that vertex, and the incoming message, and returns
 * the new vertex value.
 * @param sendMsg a user-supplied function that takes the current vertex ID and an
 * EdgeTriplet between the vertex and one of its neighbors and produces a message
 * to send to that neighbor.
 * @param mergeMsg a user-supplied function that takes two incoming messages of type A
 * and merges them into a single message of type A. ''This function must be
 * commutative and associative.''
 * @param initialMsg the message each vertex will receive at the beginning of the
 * first iteration.
 * @param numIter the number of iterations to run this computation.
 *
 * @return the resulting graph at the end of the computation
 */
def iterate[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](graph: Graph[VD, ED])(
vprog: (Vid, VD, A) => VD,
sendMsg: (Vid, EdgeTriplet[VD, ED]) => Option[A],
......
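A hedged usage sketch of iterate, assuming a graph: Graph[Int, Int], that the remaining parameters (mergeMsg, initialMsg, numIter) follow sendMsg in the documented order, and that EdgeTriplet exposes the source vertex value as srcAttr (the field name is not shown in this hunk). It propagates the maximum vertex value for ten supersteps:

val maxGraph = Pregel.iterate[Int, Int, Int](graph)(
  (vid, value, msg) => math.max(value, msg), // vprog: keep the largest value seen
  (vid, et) => Some(et.srcAttr),             // sendMsg: forward the source's value
  (a, b) => math.max(a, b),                  // mergeMsg: commutative and associative
  Int.MinValue,                              // initialMsg
  10)                                        // numIter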
@@ -261,71 +261,27 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
}
// Because of the edge partitioner, we know that all edges with the same
// src and dst will be in the same partition.

// We want to keep the same partitioning scheme. Use newGraph() rather than
// new GraphImpl().
// TODO(crankshaw) is there a better way to do this using RDD.groupBy()?
override def groupEdgeTriplets[ED2: ClassManifest](
    f: Iterator[EdgeTriplet[VD, ED]] => ED2): Graph[VD, ED2] = {

  val newEdges: RDD[Edge[ED2]] = triplets.mapPartitions { partIter =>
    partIter
      // toList lets us operate on all EdgeTriplets in a single partition at once.
      // TODO(crankshaw) toList requires that the entire edge partition
      // fit in memory.
      .toList
      // Group all triplets in this partition that share the same src and dst.
      // Because all triplets with the same src and dst live on the same
      // partition (thanks to the edge partitioner), these groups are complete.
      .groupBy { t: EdgeTriplet[VD, ED] => (t.srcId, t.dstId) }
      // Apply the user-supplied merge function to each group of triplets.
      // The result is a Map[(Vid, Vid), ED2].
      .mapValues { ts: List[EdgeTriplet[VD, ED]] => f(ts.toIterator) }
      // Convert the map back to a list of tuples, then build the new edges.
      .toList
      .toIterator
      .map { case ((src, dst), data) => Edge(src, dst, data) }
  }

  // TODO(crankshaw) eliminate the need to call createETable
  val newETable = createETable(newEdges,
    eTable.index.partitioner.numPartitions)
  new GraphImpl(vTable, vid2pid, localVidMap, newETable)
}
@@ -340,7 +296,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
.toIterator
.map { case ((src, dst), data) => Edge(src, dst, data) }
}
// TODO(crankshaw) eliminate the need to call createETable
val newETable = createETable(newEdges,
eTable.index.partitioner.numPartitions)
@@ -654,7 +610,8 @@ object GraphImpl {
/**
 * @todo This will only partition edges to the upper diagonal
 * of the 2D processor space.
 */
protected def canonicalEdgePartitionFunction2D(srcOrig: Vid, dstOrig: Vid,
numParts: Pid, ceilSqrtNumParts: Pid): Pid = {
......
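A hedged sketch of what a canonical 2D edge partition function can look like; the mixing prime and layout below are illustrative, not necessarily this file's constants. Canonicalizing each pair so that src <= dst is what confines edges to the upper diagonal of the processor grid:

def canonical2D(srcOrig: Long, dstOrig: Long, ceilSqrtNumParts: Int): Int = {
  val mixingPrime = 1125899906842597L // illustrative; scrambles clustered vertex IDs
  // send (u, v) and (v, u) to the same partition => only the upper diagonal is used
  val (src, dst) = if (srcOrig < dstOrig) (srcOrig, dstOrig) else (dstOrig, srcOrig)
  val col = (math.abs(src * mixingPrime) % ceilSqrtNumParts).toInt
  val row = (math.abs(dst * mixingPrime) % ceilSqrtNumParts).toInt
  col * ceilSqrtNumParts + row
}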
@@ -16,9 +16,9 @@ import org.apache.spark.graph.Graph
import org.apache.spark.graph.Edge
import org.apache.spark.graph.impl.GraphImpl
/**
 * @todo(crankshaw) cleanup and modularize code
 */
object GraphGenerators {
val RMATa = 0.45
@@ -236,8 +236,6 @@
}
}
}
......
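RMATa = 0.45 above is one of the four R-MAT quadrant probabilities (a + b + c + d = 1). A hedged sketch of the recursive quadrant descent such generators use; the b, c, d values and the structure are illustrative, not this file's implementation:

import scala.util.Random

// pick one edge in an n x n adjacency matrix (n a power of two)
def rmatEdge(n: Long, rng: Random = new Random,
    a: Double = 0.45, b: Double = 0.15, c: Double = 0.15, d: Double = 0.25): (Long, Long) = {
  require(math.abs(a + b + c + d - 1.0) < 1e-9)
  var (src, dst, size) = (0L, 0L, n)
  while (size > 1) {
    size /= 2
    val r = rng.nextDouble()
    if (r < a) ()                        // recurse into the top-left quadrant
    else if (r < a + b) dst += size      // top-right
    else if (r < a + b + c) src += size  // bottom-left
    else { src += size; dst += size }    // bottom-right
  }
  (src, dst)
}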