Skip to content
Snippets Groups Projects
Commit 3d7277cc authored by Reynold Xin's avatar Reynold Xin
Browse files

Merge pull request #55 from ankurdave/aggregateNeighbors-variants

Specialize mapReduceTriplets for accessing subsets of vertex attributes
parents 1a06f707 bee10156
No related branches found
No related tags found
No related merge requests found
......@@ -12,6 +12,7 @@ import org.apache.spark.util.ClosureCleaner
import org.apache.spark.graph._
import org.apache.spark.graph.impl.GraphImpl._
import org.apache.spark.graph.impl.MsgRDDFunctions._
import org.apache.spark.graph.util.BytecodeUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveKeyOpenHashMap}
......@@ -21,9 +22,9 @@ import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveKeyOpenHa
* The Iterator type returned when constructing edge triplets
*/
class EdgeTripletIterator[VD: ClassManifest, ED: ClassManifest](
val vidToIndex: VertexIdToIndexMap,
val vertexArray: Array[VD],
val edgePartition: EdgePartition[ED]) extends Iterator[EdgeTriplet[VD, ED]] {
val vidToIndex: VertexIdToIndexMap,
val vertexArray: Array[VD],
val edgePartition: EdgePartition[ED]) extends Iterator[EdgeTriplet[VD, ED]] {
private var pos = 0
private val et = new EdgeTriplet[VD, ED]
......@@ -62,27 +63,25 @@ class EdgeTripletIterator[VD: ClassManifest, ED: ClassManifest](
/**
* A Graph RDD that supports computation on graphs.
*
* @param localVidMap Stores the location of vertex attributes after they are
* replicated. Within each partition, localVidMap holds a map from vertex ID to
* the index where that vertex's attribute is stored. This index refers to the
* arrays in the same partition in the variants of
* [[VTableReplicatedValues]]. Therefore, localVidMap can be reused across
* changes to the vertex attributes.
*/
class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
@transient val vTable: VertexSetRDD[VD],
@transient val vid2pid: VertexSetRDD[Array[Pid]],
@transient val vid2pid: Vid2Pid,
@transient val localVidMap: RDD[(Pid, VertexIdToIndexMap)],
@transient val eTable: RDD[(Pid, EdgePartition[ED])] )
extends Graph[VD, ED] {
def this() = this(null, null, null, null)
/**
* (localVidMap: VertexSetRDD[Pid, VertexIdToIndexMap]) is a version of the
* vertex data after it is replicated. Within each partition, it holds a map
* from vertex ID to the index where that vertex's attribute is stored. This
* index refers to an array in the same partition in vTableReplicatedValues.
*
* (vTableReplicatedValues: VertexSetRDD[Pid, Array[VD]]) holds the vertex data
* and is arranged as described above.
*/
@transient val vTableReplicatedValues: RDD[(Pid, Array[VD])] =
createVTableReplicated(vTable, vid2pid, localVidMap)
@transient val vTableReplicatedValues: VTableReplicatedValues[VD] =
new VTableReplicatedValues(vTable, vid2pid, localVidMap)
/** Return an RDD of vertices. */
@transient override val vertices = vTable
......@@ -94,7 +93,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
/** Return an RDD that brings each edge together with its source and destination vertices. */
@transient override val triplets: RDD[EdgeTriplet[VD, ED]] =
makeTriplets(localVidMap, vTableReplicatedValues, eTable)
makeTriplets(localVidMap, vTableReplicatedValues.bothAttrs, eTable)
override def persist(newLevel: StorageLevel): Graph[VD, ED] = {
eTable.persist(newLevel)
......@@ -110,15 +109,22 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
override def statistics: Map[String, Any] = {
val numVertices = this.numVertices
val numEdges = this.numEdges
val replicationRatio =
vid2pid.map(kv => kv._2.size).sum / vTable.count
val replicationRatioBothAttrs =
vid2pid.bothAttrs.map(kv => kv._2.size).sum / numVertices
val replicationRatioSrcAttrOnly =
vid2pid.srcAttrOnly.map(kv => kv._2.size).sum / numVertices
val replicationRatioDstAttrOnly =
vid2pid.dstAttrOnly.map(kv => kv._2.size).sum / numVertices
val loadArray =
eTable.map{ case (pid, epart) => epart.data.size }.collect.map(x => x.toDouble / numEdges)
val minLoad = loadArray.min
val maxLoad = loadArray.max
Map(
"Num Vertices" -> numVertices, "Num Edges" -> numEdges,
"Replication" -> replicationRatio, "Load Array" -> loadArray,
"Replication (both)" -> replicationRatioBothAttrs,
"Replication (src only)" -> replicationRatioSrcAttrOnly,
"Replication (dest only)" -> replicationRatioDstAttrOnly,
"Load Array" -> loadArray,
"Min Load" -> minLoad, "Max Load" -> maxLoad)
}
......@@ -161,18 +167,18 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
traverseLineage(vTable, " ", visited)
visited += (vTable.id -> "vTable")
println("\n\nvid2pid -----------------------------------------")
traverseLineage(vid2pid, " ", visited)
visited += (vid2pid.id -> "vid2pid")
visited += (vid2pid.valuesRDD.id -> "vid2pid.values")
println("\n\nvid2pid.bothAttrs -------------------------------")
traverseLineage(vid2pid.bothAttrs, " ", visited)
visited += (vid2pid.bothAttrs.id -> "vid2pid")
visited += (vid2pid.bothAttrs.valuesRDD.id -> "vid2pid.bothAttrs")
println("\n\nlocalVidMap -------------------------------------")
traverseLineage(localVidMap, " ", visited)
visited += (localVidMap.id -> "localVidMap")
println("\n\nvTableReplicatedValues --------------------------")
traverseLineage(vTableReplicatedValues, " ", visited)
visited += (vTableReplicatedValues.id -> "vTableReplicatedValues")
println("\n\nvTableReplicatedValues.bothAttrs ----------------")
traverseLineage(vTableReplicatedValues.bothAttrs, " ", visited)
visited += (vTableReplicatedValues.bothAttrs.id -> "vTableReplicatedValues.bothAttrs")
println("\n\ntriplets ----------------------------------------")
traverseLineage(triplets, " ", visited)
......@@ -232,7 +238,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
// Construct the Vid2Pid map. Here we assume that the filter operation
// behaves deterministically.
// @todo reindex the vertex and edge tables
val newVid2Pid = createVid2Pid(newETable, newVTable.index)
val newVid2Pid = new Vid2Pid(newETable, newVTable.index)
val newVidMap = createLocalVidMap(newETable)
new GraphImpl(newVTable, newVid2Pid, localVidMap, newETable)
......@@ -328,7 +334,7 @@ object GraphImpl {
*
*/
val etable = createETable(edges)
val vid2pid = createVid2Pid(etable, vtable.index)
val vid2pid = new Vid2Pid(etable, vtable.index)
val localVidMap = createLocalVidMap(etable)
new GraphImpl(vtable, vid2pid, localVidMap, etable)
}
......@@ -368,24 +374,9 @@ object GraphImpl {
}, preservesPartitioning = true).cache()
}
protected def createVid2Pid[ED: ClassManifest](
eTable: RDD[(Pid, EdgePartition[ED])],
vTableIndex: VertexSetIndex): VertexSetRDD[Array[Pid]] = {
val preAgg = eTable.mapPartitions { iter =>
val (pid, edgePartition) = iter.next()
val vSet = new VertexSet
edgePartition.foreach(e => {vSet.add(e.srcId); vSet.add(e.dstId)})
vSet.iterator.map { vid => (vid.toLong, pid) }
}.partitionBy(vTableIndex.rdd.partitioner.get)
VertexSetRDD[Pid, ArrayBuffer[Pid]](preAgg, vTableIndex,
(p: Pid) => ArrayBuffer(p),
(ab: ArrayBuffer[Pid], p:Pid) => {ab.append(p); ab},
(a: ArrayBuffer[Pid], b: ArrayBuffer[Pid]) => a ++ b)
.mapValues(a => a.toArray).cache()
}
protected def createLocalVidMap[ED: ClassManifest](eTable: RDD[(Pid, EdgePartition[ED])]):
RDD[(Pid, VertexIdToIndexMap)] = {
private def createLocalVidMap(
eTable: RDD[(Pid, EdgePartition[ED])] forSome { type ED }
): RDD[(Pid, VertexIdToIndexMap)] = {
eTable.mapPartitions( _.map{ case (pid, epart) =>
val vidToIndex = new VertexIdToIndexMap
epart.foreach{ e =>
......@@ -396,36 +387,6 @@ object GraphImpl {
}, preservesPartitioning = true).cache()
}
protected def createVTableReplicated[VD: ClassManifest](
vTable: VertexSetRDD[VD],
vid2pid: VertexSetRDD[Array[Pid]],
replicationMap: RDD[(Pid, VertexIdToIndexMap)]):
RDD[(Pid, Array[VD])] = {
// Join vid2pid and vTable, generate a shuffle dependency on the joined
// result, and get the shuffle id so we can use it on the slave.
val msgsByPartition = vTable.zipJoinFlatMap(vid2pid) { (vid, vdata, pids) =>
// TODO(rxin): reuse VertexBroadcastMessage
pids.iterator.map { pid =>
new VertexBroadcastMsg[VD](pid, vid, vdata)
}
}.partitionBy(replicationMap.partitioner.get).cache()
replicationMap.zipPartitions(msgsByPartition){
(mapIter, msgsIter) =>
val (pid, vidToIndex) = mapIter.next()
assert(!mapIter.hasNext)
// Populate the vertex array using the vidToIndex map
val vertexArray = new Array[VD](vidToIndex.capacity)
for (msg <- msgsIter) {
val ind = vidToIndex.getPos(msg.vid) & OpenHashSet.POSITION_MASK
vertexArray(ind) = msg.data
}
Iterator((pid, vertexArray))
}.cache()
// @todo assert edge table has partitioner
}
def makeTriplets[VD: ClassManifest, ED: ClassManifest](
localVidMap: RDD[(Pid, VertexIdToIndexMap)],
vTableReplicatedValues: RDD[(Pid, Array[VD]) ],
......@@ -442,7 +403,7 @@ object GraphImpl {
def mapTriplets[VD: ClassManifest, ED: ClassManifest, ED2: ClassManifest](
g: GraphImpl[VD, ED],
f: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = {
val newETable = g.eTable.zipPartitions(g.localVidMap, g.vTableReplicatedValues){
val newETable = g.eTable.zipPartitions(g.localVidMap, g.vTableReplicatedValues.bothAttrs){
(edgePartitionIter, vidToIndexIter, vertexArrayIter) =>
val (pid, edgePartition) = edgePartitionIter.next()
val (_, vidToIndex) = vidToIndexIter.next()
......@@ -468,8 +429,16 @@ object GraphImpl {
ClosureCleaner.clean(mapFunc)
ClosureCleaner.clean(reduceFunc)
// For each vertex, replicate its attribute only to partitions where it is
// in the relevant position in an edge.
val mapFuncUsesSrcAttr = accessesVertexAttr[VD, ED](mapFunc, "srcAttr")
val mapFuncUsesDstAttr = accessesVertexAttr[VD, ED](mapFunc, "dstAttr")
// Map and preaggregate
val preAgg = g.eTable.zipPartitions(g.localVidMap, g.vTableReplicatedValues){
val preAgg = g.eTable.zipPartitions(
g.localVidMap,
g.vTableReplicatedValues.get(mapFuncUsesSrcAttr, mapFuncUsesDstAttr)
){
(edgePartitionIter, vidToIndexIter, vertexArrayIter) =>
val (_, edgePartition) = edgePartitionIter.next()
val (_, vidToIndex) = vidToIndexIter.next()
......@@ -488,8 +457,12 @@ object GraphImpl {
edgePartition.foreach { e =>
et.set(e)
et.srcAttr = vmap(e.srcId)
et.dstAttr = vmap(e.dstId)
if (mapFuncUsesSrcAttr) {
et.srcAttr = vmap(e.srcId)
}
if (mapFuncUsesDstAttr) {
et.dstAttr = vmap(e.dstId)
}
// TODO(rxin): rewrite the foreach using a simple while loop to speed things up.
// Also given we are only allowing zero, one, or two messages, we can completely unroll
// the for loop.
......@@ -596,4 +569,13 @@ object GraphImpl {
(col * ceilSqrtNumParts + row) % numParts
}
/**
 * Reports whether `closure` appears to use the `attrName` member of [[EdgeTriplet]].
 *
 * The decision is delegated to `BytecodeUtils.invokedMethod`, which is handed the
 * closure, the EdgeTriplet class and the member name.
 * NOTE(review): presumably that helper inspects the closure's bytecode for a call
 * to the named method -- confirm against BytecodeUtils.
 *
 * @param closure  the function object to inspect
 * @param attrName name of the EdgeTriplet member to look for
 * @return whatever `BytecodeUtils.invokedMethod` reports, or `true` when the lookup
 *         fails with a ClassNotFoundException
 */
private def accessesVertexAttr[VD: ClassManifest, ED: ClassManifest](
    closure: AnyRef, attrName: String): Boolean = {
  val tripletClass = classOf[EdgeTriplet[VD, ED]]
  try {
    BytecodeUtils.invokedMethod(closure, tripletClass, attrName)
  } catch {
    // The analysis could not be carried out; answer "yes" to stay conservative.
    case _: ClassNotFoundException => true
  }
}
} // end of object GraphImpl
package org.apache.spark.graph.impl
import org.apache.spark.rdd.RDD
import org.apache.spark.util.collection.OpenHashSet
import org.apache.spark.graph._
import org.apache.spark.graph.impl.MsgRDDFunctions._
/**
 * Stores the vertex attribute values after they are replicated. See
 * the description of localVidMap in [[GraphImpl]].
 *
 * One RDD of per-partition attribute arrays is created for each combination of
 * the `includeSrcAttr` / `includeDstAttr` flags. All four are created when the
 * instance is constructed, each by a call to
 * `VTableReplicatedValues.createVTableReplicated` with the matching flags.
 *
 * @param vTable      the vertex attributes to replicate
 * @param vid2pid     for each flag combination, the partitions each vertex is sent to
 * @param localVidMap per-partition map from vertex ID to array index
 */
class VTableReplicatedValues[VD: ClassManifest](
    vTable: VertexSetRDD[VD],
    vid2pid: Vid2Pid,
    localVidMap: RDD[(Pid, VertexIdToIndexMap)]) {

  /** Variant built with includeSrcAttr = true and includeDstAttr = true. */
  val bothAttrs: RDD[(Pid, Array[VD])] =
    VTableReplicatedValues.createVTableReplicated(
      vTable, vid2pid, localVidMap, includeSrcAttr = true, includeDstAttr = true)

  /** Variant built with includeSrcAttr = true and includeDstAttr = false. */
  val srcAttrOnly: RDD[(Pid, Array[VD])] =
    VTableReplicatedValues.createVTableReplicated(
      vTable, vid2pid, localVidMap, includeSrcAttr = true, includeDstAttr = false)

  /** Variant built with includeSrcAttr = false and includeDstAttr = true. */
  val dstAttrOnly: RDD[(Pid, Array[VD])] =
    VTableReplicatedValues.createVTableReplicated(
      vTable, vid2pid, localVidMap, includeSrcAttr = false, includeDstAttr = true)

  /** Variant built with includeSrcAttr = false and includeDstAttr = false. */
  val noAttrs: RDD[(Pid, Array[VD])] =
    VTableReplicatedValues.createVTableReplicated(
      vTable, vid2pid, localVidMap, includeSrcAttr = false, includeDstAttr = false)

  /**
   * Selects the variant that was built with the given pair of flags.
   *
   * @param includeSrcAttr whether the source-attribute flag was set for the variant
   * @param includeDstAttr whether the destination-attribute flag was set for the variant
   * @return one of `bothAttrs`, `srcAttrOnly`, `dstAttrOnly` or `noAttrs`
   */
  def get(includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[(Pid, Array[VD])] =
    if (includeSrcAttr) {
      if (includeDstAttr) bothAttrs else srcAttrOnly
    } else {
      if (includeDstAttr) dstAttrOnly else noAttrs
    }
}
object VTableReplicatedValues {

  /**
   * Builds, for every partition of `localVidMap`, an array holding the vertex
   * attributes that were sent to that partition.
   *
   * Each vertex in `vTable` is joined with the partition list chosen by
   * `vid2pid.get(includeSrcAttr, includeDstAttr)`, and one [[VertexBroadcastMsg]]
   * is produced per (vertex, partition) pair. The messages are repartitioned
   * with `localVidMap`'s partitioner and then written into an array at the
   * position given by that partition's vertex-ID-to-index map. Array positions
   * that receive no message keep the array's initial value.
   *
   * Calling `.get` on `localVidMap.partitioner` fails if no partitioner is set.
   *
   * @param vTable         the vertex attributes to replicate
   * @param vid2pid        source of the per-vertex partition lists
   * @param localVidMap    per-partition map from vertex ID to array index
   * @param includeSrcAttr forwarded to `vid2pid.get`
   * @param includeDstAttr forwarded to `vid2pid.get`
   * @return an RDD of (partition ID, attribute array) pairs, marked with `.cache()`
   */
  protected def createVTableReplicated[VD: ClassManifest](
      vTable: VertexSetRDD[VD],
      vid2pid: Vid2Pid,
      localVidMap: RDD[(Pid, VertexIdToIndexMap)],
      includeSrcAttr: Boolean,
      includeDstAttr: Boolean): RDD[(Pid, Array[VD])] = {

    // The partition lists that match the requested flag combination.
    val pidsPerVertex = vid2pid.get(includeSrcAttr, includeDstAttr)

    // Join vid2pid and vTable, generate a shuffle dependency on the joined
    // result, and get the shuffle id so we can use it on the slave.
    // TODO(rxin): reuse VertexBroadcastMessage
    val msgsByPartition = vTable.zipJoinFlatMap(pidsPerVertex) { (vid, vdata, pids) =>
      pids.iterator.map(pid => new VertexBroadcastMsg[VD](pid, vid, vdata))
    }.partitionBy(localVidMap.partitioner.get).cache()

    localVidMap.zipPartitions(msgsByPartition) { (mapIter, msgsIter) =>
      // Exactly one (pid, index map) entry is expected in each partition.
      val (pid, vidToIndex) = mapIter.next()
      assert(!mapIter.hasNext)

      // Populate the vertex array using the vidToIndex map
      val vertexArray = new Array[VD](vidToIndex.capacity)
      msgsIter.foreach { msg =>
        val slot = vidToIndex.getPos(msg.vid) & OpenHashSet.POSITION_MASK
        vertexArray(slot) = msg.data
      }
      Iterator((pid, vertexArray))
    }.cache()

    // @todo assert edge table has partitioner
  }
}
package org.apache.spark.graph.impl
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.graph._
/**
 * Stores the layout of vertex attributes for GraphImpl.
 *
 * For each combination of the `includeSrcAttr` / `includeDstAttr` flags this
 * class holds a [[VertexSetRDD]] that maps a vertex ID to the array of edge
 * partition IDs in which that vertex occurs in a selected position (as an
 * edge source, as an edge destination, either, or neither). All four RDDs are
 * created when the instance is constructed.
 *
 * @param eTable      the edge partitions, keyed by partition ID
 * @param vTableIndex the vertex index the resulting VertexSetRDDs are built on
 */
class Vid2Pid(
    eTable: RDD[(Pid, EdgePartition[ED])] forSome { type ED },
    vTableIndex: VertexSetIndex) {

  /** Partitions in which each vertex occurs as a source or as a destination. */
  val bothAttrs: VertexSetRDD[Array[Pid]] =
    createVid2Pid(includeSrcAttr = true, includeDstAttr = true)

  /** Partitions in which each vertex occurs as an edge source. */
  val srcAttrOnly: VertexSetRDD[Array[Pid]] =
    createVid2Pid(includeSrcAttr = true, includeDstAttr = false)

  /** Partitions in which each vertex occurs as an edge destination. */
  val dstAttrOnly: VertexSetRDD[Array[Pid]] =
    createVid2Pid(includeSrcAttr = false, includeDstAttr = true)

  /** Built with both flags off, so no (vertex, partition) pairs are generated for it. */
  val noAttrs: VertexSetRDD[Array[Pid]] =
    createVid2Pid(includeSrcAttr = false, includeDstAttr = false)

  /**
   * Selects the RDD that was built with the given pair of flags.
   *
   * @return one of `bothAttrs`, `srcAttrOnly`, `dstAttrOnly` or `noAttrs`
   */
  def get(includeSrcAttr: Boolean, includeDstAttr: Boolean): VertexSetRDD[Array[Pid]] =
    (includeSrcAttr, includeDstAttr) match {
      case (true, true) => bothAttrs
      case (true, false) => srcAttrOnly
      case (false, true) => dstAttrOnly
      case (false, false) => noAttrs
    }

  /**
   * Applies `newLevel` to all four RDDs.
   *
   * NOTE(review): every RDD was already marked with `.cache()` when it was
   * created; verify that `VertexSetRDD.persist` accepts a second storage-level
   * assignment.
   *
   * @param newLevel the storage level to request
   */
  def persist(newLevel: StorageLevel): Unit = {
    bothAttrs.persist(newLevel)
    srcAttrOnly.persist(newLevel)
    dstAttrOnly.persist(newLevel)
    noAttrs.persist(newLevel)
  }

  /**
   * Builds the vertex-ID-to-partition-IDs RDD for one flag combination.
   *
   * Per edge partition, the IDs of the selected edge endpoints are collected
   * into a set and emitted as (vertex ID, partition ID) pairs. The pairs are
   * then combined per vertex into a buffer of partition IDs on top of
   * `vTableIndex`, converted to arrays, and marked with `.cache()`.
   *
   * @param includeSrcAttr collect the source ID of every edge
   * @param includeDstAttr collect the destination ID of every edge
   */
  private def createVid2Pid(
      includeSrcAttr: Boolean,
      includeDstAttr: Boolean): VertexSetRDD[Array[Pid]] = {
    val preAgg = eTable.mapPartitions { iter =>
      // Only the first element is read -- assumes each partition of eTable holds
      // exactly one (pid, edgePartition) pair; TODO confirm against createETable.
      val (pid, edgePartition) = iter.next()
      val vSet = new VertexSet
      // Skip the edge scan entirely when neither endpoint is wanted.
      if (includeSrcAttr || includeDstAttr) {
        edgePartition.foreach { e =>
          if (includeSrcAttr) vSet.add(e.srcId)
          if (includeDstAttr) vSet.add(e.dstId)
        }
      }
      vSet.iterator.map { vid => (vid.toLong, pid) }
    }
    VertexSetRDD[Pid, ArrayBuffer[Pid]](preAgg, vTableIndex,
      (p: Pid) => ArrayBuffer(p),
      (ab: ArrayBuffer[Pid], p: Pid) => {ab.append(p); ab},
      (a: ArrayBuffer[Pid], b: ArrayBuffer[Pid]) => a ++ b)
      .mapValues(a => a.toArray).cache()
  }
}
......@@ -33,6 +33,18 @@ class GraphSuite extends FunSuite with LocalSparkContext {
}
}
test("mapReduceTriplets") {
  withSpark(new SparkContext("local", "test")) { sc =>
    val n = 3
    // Star graph: vertex 0 is the source of one edge to each of the vertices 1..n.
    val star = Graph(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid))))
    // Every edge sends the destination's attribute to the source and the
    // source's attribute to the destination; messages per vertex are summed.
    // NOTE(review): the result's name suggests the vertex attributes are
    // degrees -- confirm against Graph.apply.
    val neighborDegreeSums = star.mapReduceTriplets(
      et => Array((et.srcId, et.dstAttr), (et.dstId, et.srcAttr)),
      (a: Int, b: Int) => a + b)
    // Every vertex 0..n is expected to end up with the value n.
    val expected = (0 to n).map(x => (x, n)).toSet
    assert(neighborDegreeSums.collect().toSet === expected)
  }
}
test("aggregateNeighbors") {
withSpark(new SparkContext("local", "test")) { sc =>
val n = 3
......@@ -87,6 +99,6 @@ class GraphSuite extends FunSuite with LocalSparkContext {
assert(b.zipJoin(c)((id, b, c) => b + c).map(x => x._2).reduce(_+_) === 0)
}
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment