Skip to content
Snippets Groups Projects
Commit cdbd19bb authored by Ankur Dave's avatar Ankur Dave
Browse files

Create all versions of vid2pid ahead of time

parent 27e4355d
No related branches found
No related tags found
No related merge requests found
......@@ -22,9 +22,9 @@ import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveKeyOpenHa
* The Iterator type returned when constructing edge triplets
*/
class EdgeTripletIterator[VD: ClassManifest, ED: ClassManifest](
val vidToIndex: VertexIdToIndexMap,
val vertexArray: Array[VD],
val edgePartition: EdgePartition[ED]) extends Iterator[EdgeTriplet[VD, ED]] {
val vidToIndex: VertexIdToIndexMap,
val vertexArray: Array[VD],
val edgePartition: EdgePartition[ED]) extends Iterator[EdgeTriplet[VD, ED]] {
private var pos = 0
private val et = new EdgeTriplet[VD, ED]
......@@ -66,7 +66,7 @@ class EdgeTripletIterator[VD: ClassManifest, ED: ClassManifest](
*/
class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
@transient val vTable: VertexSetRDD[VD],
@transient val vid2pid: VertexSetRDD[Array[Pid]],
@transient val vid2pid: Vid2Pid,
@transient val localVidMap: RDD[(Pid, VertexIdToIndexMap)],
@transient val eTable: RDD[(Pid, EdgePartition[ED])] )
extends Graph[VD, ED] {
......@@ -82,8 +82,18 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
* (vTableReplicatedValues: VertexSetRDD[Pid, Array[VD]]) holds the vertex data
* and is arranged as described above.
*/
@transient val vTableReplicatedValues: RDD[(Pid, Array[VD])] =
createVTableReplicated(vTable, vid2pid, localVidMap)
@transient val vTableReplicatedValuesBothAttrs: RDD[(Pid, Array[VD])] =
createVTableReplicated(vTable, vid2pid.bothAttrs, localVidMap)
@transient val vTableReplicatedValuesSrcAttrOnly: RDD[(Pid, Array[VD])] =
createVTableReplicated(vTable, vid2pid.srcAttrOnly, localVidMap)
@transient val vTableReplicatedValuesDstAttrOnly: RDD[(Pid, Array[VD])] =
createVTableReplicated(vTable, vid2pid.dstAttrOnly, localVidMap)
// TODO(ankurdave): create this more efficiently
@transient val vTableReplicatedValuesNoAttrs: RDD[(Pid, Array[VD])] =
createVTableReplicated(vTable, vid2pid.noAttrs, localVidMap)
/** Return a RDD of vertices. */
@transient override val vertices = vTable
......@@ -95,7 +105,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
/** Return a RDD that brings edges with its source and destination vertices together. */
@transient override val triplets: RDD[EdgeTriplet[VD, ED]] =
makeTriplets(localVidMap, vTableReplicatedValues, eTable)
makeTriplets(localVidMap, vTableReplicatedValuesBothAttrs, eTable)
override def persist(newLevel: StorageLevel): Graph[VD, ED] = {
eTable.persist(newLevel)
......@@ -111,15 +121,22 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
override def statistics: Map[String, Any] = {
val numVertices = this.numVertices
val numEdges = this.numEdges
val replicationRatio =
vid2pid.map(kv => kv._2.size).sum / vTable.count
val replicationRatioBothAttrs =
vid2pid.bothAttrs.map(kv => kv._2.size).sum / numVertices
val replicationRatioSrcAttrOnly =
vid2pid.srcAttrOnly.map(kv => kv._2.size).sum / numVertices
val replicationRatioDstAttrOnly =
vid2pid.dstAttrOnly.map(kv => kv._2.size).sum / numVertices
val loadArray =
eTable.map{ case (pid, epart) => epart.data.size }.collect.map(x => x.toDouble / numEdges)
val minLoad = loadArray.min
val maxLoad = loadArray.max
Map(
"Num Vertices" -> numVertices, "Num Edges" -> numEdges,
"Replication" -> replicationRatio, "Load Array" -> loadArray,
"Replication (both)" -> replicationRatioBothAttrs,
"Replication (src only)" -> replicationRatioSrcAttrOnly,
"Replication (dest only)" -> replicationRatioDstAttrOnly,
"Load Array" -> loadArray,
"Min Load" -> minLoad, "Max Load" -> maxLoad)
}
......@@ -162,18 +179,18 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
traverseLineage(vTable, " ", visited)
visited += (vTable.id -> "vTable")
println("\n\nvid2pid -----------------------------------------")
traverseLineage(vid2pid, " ", visited)
visited += (vid2pid.id -> "vid2pid")
visited += (vid2pid.valuesRDD.id -> "vid2pid.values")
println("\n\nvid2pid.bothAttrs -------------------------------")
traverseLineage(vid2pid.bothAttrs, " ", visited)
visited += (vid2pid.bothAttrs.id -> "vid2pid")
visited += (vid2pid.bothAttrs.valuesRDD.id -> "vid2pid.values")
println("\n\nlocalVidMap -------------------------------------")
traverseLineage(localVidMap, " ", visited)
visited += (localVidMap.id -> "localVidMap")
println("\n\nvTableReplicatedValues --------------------------")
traverseLineage(vTableReplicatedValues, " ", visited)
visited += (vTableReplicatedValues.id -> "vTableReplicatedValues")
println("\n\nvTableReplicatedValuesBothAttrs -----------------")
traverseLineage(vTableReplicatedValuesBothAttrs, " ", visited)
visited += (vTableReplicatedValuesBothAttrs.id -> "vTableReplicatedValuesBothAttrs")
println("\n\ntriplets ----------------------------------------")
traverseLineage(triplets, " ", visited)
......@@ -233,7 +250,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
// Construct the Vid2Pid map. Here we assume that the filter operation
// behaves deterministically.
// @todo reindex the vertex and edge tables
val newVid2Pid = createVid2Pid(newETable, newVTable.index)
val newVid2Pid = new Vid2Pid(newETable, newVTable.index)
val newVidMap = createLocalVidMap(newETable)
new GraphImpl(newVTable, newVid2Pid, localVidMap, newETable)
......@@ -329,7 +346,7 @@ object GraphImpl {
*
*/
val etable = createETable(edges)
val vid2pid = createVid2Pid(etable, vtable.index)
val vid2pid = new Vid2Pid(etable, vtable.index)
val localVidMap = createLocalVidMap(etable)
new GraphImpl(vtable, vid2pid, localVidMap, etable)
}
......@@ -369,38 +386,6 @@ object GraphImpl {
}, preservesPartitioning = true).cache()
}
protected def createVid2Pid[ED: ClassManifest](
eTable: RDD[(Pid, EdgePartition[ED])],
vTableIndex: VertexSetIndex): VertexSetRDD[Array[Pid]] = {
val preAgg = eTable.mapPartitions { iter =>
val (pid, edgePartition) = iter.next()
val vSet = new VertexSet
edgePartition.foreach(e => {vSet.add(e.srcId); vSet.add(e.dstId)})
vSet.iterator.map { vid => (vid.toLong, pid) }
}.partitionBy(vTableIndex.rdd.partitioner.get)
VertexSetRDD[Pid, ArrayBuffer[Pid]](preAgg, vTableIndex,
(p: Pid) => ArrayBuffer(p),
(ab: ArrayBuffer[Pid], p:Pid) => {ab.append(p); ab},
(a: ArrayBuffer[Pid], b: ArrayBuffer[Pid]) => a ++ b)
.mapValues(a => a.toArray).cache()
}
protected def createVid2PidSourceAttrOnly[ED: ClassManifest](
eTable: RDD[(Pid, EdgePartition[ED])],
vTableIndex: VertexSetIndex): VertexSetRDD[Array[Pid]] = {
val preAgg = eTable.mapPartitions { iter =>
val (pid, edgePartition) = iter.next()
val vSet = new VertexSet
edgePartition.foreach(e => {vSet.add(e.srcId)})
vSet.iterator.map { vid => (vid.toLong, pid) }
}
VertexSetRDD[Pid, ArrayBuffer[Pid]](preAgg, vTableIndex,
(p: Pid) => ArrayBuffer(p),
(ab: ArrayBuffer[Pid], p:Pid) => {ab.append(p); ab},
(a: ArrayBuffer[Pid], b: ArrayBuffer[Pid]) => a ++ b)
.mapValues(a => a.toArray).cache()
}
protected def createLocalVidMap[ED: ClassManifest](eTable: RDD[(Pid, EdgePartition[ED])]):
RDD[(Pid, VertexIdToIndexMap)] = {
eTable.mapPartitions( _.map{ case (pid, epart) =>
......@@ -459,7 +444,7 @@ object GraphImpl {
def mapTriplets[VD: ClassManifest, ED: ClassManifest, ED2: ClassManifest](
g: GraphImpl[VD, ED],
f: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = {
val newETable = g.eTable.zipPartitions(g.localVidMap, g.vTableReplicatedValues){
val newETable = g.eTable.zipPartitions(g.localVidMap, g.vTableReplicatedValuesBothAttrs){
(edgePartitionIter, vidToIndexIter, vertexArrayIter) =>
val (pid, edgePartition) = edgePartitionIter.next()
val (_, vidToIndex) = vidToIndexIter.next()
......@@ -491,13 +476,12 @@ object GraphImpl {
BytecodeUtils.invokedMethod(mapFunc, classOf[EdgeTriplet[VD, ED]], "srcAttr")
val mapFuncUsesDstAttr =
BytecodeUtils.invokedMethod(mapFunc, classOf[EdgeTriplet[VD, ED]], "dstAttr")
val vTableReplicatedValues =
if (mapFuncUsesSrcAttr && !mapFuncUsesDstAttr) {
val vid2pidSourceAttrOnly = createVid2PidSourceAttrOnly(g.eTable, g.vTable.index)
createVTableReplicated(g.vTable, vid2pidSourceAttrOnly, g.localVidMap)
} else {
g.vTableReplicatedValues
}
val vTableReplicatedValues = (mapFuncUsesSrcAttr, mapFuncUsesDstAttr) match {
case (true, true) => g.vTableReplicatedValuesBothAttrs
case (true, false) => g.vTableReplicatedValuesSrcAttrOnly
case (false, true) => g.vTableReplicatedValuesDstAttrOnly
case (false, false) => g.vTableReplicatedValuesNoAttrs
}
// Map and preaggregate
val preAgg = g.eTable.zipPartitions(g.localVidMap, vTableReplicatedValues){
......
package org.apache.spark.graph.impl
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.graph._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
/**
* Stores the layout of vertex attributes.
*/
/**
 * Stores, for each vertex, the set of edge partitions (pids) that reference it,
 * in four precomputed variants depending on which vertex attributes each edge
 * partition will need: both src and dst, src only, dst only, or neither.
 *
 * @param eTable the edge partitions; the edge attribute type is existential
 *               because only edge endpoints (srcId/dstId) are read here
 * @param vTableIndex the index of the vertex table, used to co-partition and
 *                    aggregate the vid-to-pid mappings
 */
class Vid2Pid(
    eTable: RDD[(Pid, EdgePartition[ED])] forSome { type ED },
    vTableIndex: VertexSetIndex) {

  /** Vid -> pids of partitions that need both src and dst attributes. */
  val bothAttrs: VertexSetRDD[Array[Pid]] = createVid2Pid(true, true)
  /** Vid -> pids of partitions that need only the src attribute. */
  val srcAttrOnly: VertexSetRDD[Array[Pid]] = createVid2Pid(true, false)
  /** Vid -> pids of partitions that need only the dst attribute. */
  val dstAttrOnly: VertexSetRDD[Array[Pid]] = createVid2Pid(false, true)
  // TODO(ankurdave): create this more efficiently
  /** Vid -> pids for partitions that need no vertex attributes at all. */
  val noAttrs: VertexSetRDD[Array[Pid]] = createVid2Pid(false, false)

  /** Persist all four variants at the given storage level. */
  def persist(newLevel: StorageLevel) {
    bothAttrs.persist(newLevel)
    srcAttrOnly.persist(newLevel)
    dstAttrOnly.persist(newLevel)
    noAttrs.persist(newLevel)
  }

  /**
   * Builds one vid -> Array[Pid] mapping: for every edge partition, collect the
   * distinct vertex ids it references (filtered by the include flags), then
   * aggregate the (vid, pid) pairs per vertex against vTableIndex.
   */
  private def createVid2Pid(
      includeSrcAttr: Boolean,
      includeDstAttr: Boolean): VertexSetRDD[Array[Pid]] = {
    val preAgg = eTable.mapPartitions { iter =>
      val (pid, edgePartition) = iter.next()
      // Deduplicate vertex ids within this partition before emitting pairs
      val vSet = new VertexSet
      edgePartition.foreach(e => {
        if (includeSrcAttr) vSet.add(e.srcId)
        if (includeDstAttr) vSet.add(e.dstId)
      })
      vSet.iterator.map { vid => (vid.toLong, pid) }
    }.partitionBy(vTableIndex.rdd.partitioner.get)
    // NOTE(review): the pre-refactor bothAttrs path co-partitioned preAgg with
    // the vertex index before aggregation; restored here so all four variants
    // are aligned with vTableIndex.
    VertexSetRDD[Pid, ArrayBuffer[Pid]](preAgg, vTableIndex,
      (p: Pid) => ArrayBuffer(p),
      (ab: ArrayBuffer[Pid], p: Pid) => {ab.append(p); ab},
      (a: ArrayBuffer[Pid], b: ArrayBuffer[Pid]) => a ++ b)
      .mapValues(a => a.toArray).cache()
  }
}
......@@ -58,44 +58,6 @@ class GraphSuite extends FunSuite with LocalSparkContext {
}
}
// Verifies that aggregateNeighbors works when only the source vertex attribute
// is replicated to edge partitions (the src-only vid2pid path).
test("aggregateNeighbors - source attribute replication only") {
  withSpark(new SparkContext("local", "test")) { sc =>
    val n = 3
    // Create a star graph where the degree of each vertex is its attribute
    val star = Graph(sc.parallelize((1 to n).map(x => ((n + 1): Vid, x: Vid))))
    val totalOfInNeighborDegrees = star.aggregateNeighbors(
      (vid, edge) => {
        // All edges have the center vertex as the source, which has degree n
        if (edge.srcAttr != n) {
          throw new Exception("edge.srcAttr is %d, expected %d".format(edge.srcAttr, n))
        }
        Some(edge.srcAttr)
      },
      (a: Int, b: Int) => a + b,
      EdgeDirection.In)
    // Each leaf has exactly one in-neighbor (the center), whose degree is n
    assert(totalOfInNeighborDegrees.collect().toSet === (1 to n).map(x => (x, n)).toSet)
  }
}
// Verifies that aggregateNeighbors does not ship vertex attributes when the map
// function reads neither srcAttr nor dstAttr: attributes here are deliberately
// unserializable, so any attempted replication would fail the job.
test("aggregateNeighbors - no vertex attribute replication") {
  withSpark(new SparkContext("local[2]", "test")) { sc =>
    val n = 3
    // Not serializable because it captures org.scalatest.Engine
    class UnserializableAttribute {}
    // Create a star graph where vertex attributes are not serializable
    val star = Graph(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid))))
      .mapVertices { (id, attr) => new UnserializableAttribute }
    // Should not serialize any vertex attributes
    val ignoreAttributes = star.aggregateNeighbors(
      (vid, edge) => Some(0),
      (a: Int, b: Int) => a + b,
      EdgeDirection.In)
    // Each leaf receives the constant 0 from its single in-neighbor
    assert(ignoreAttributes.collect().toSet === (1 to n).map(x => (x, 0)).toSet)
  }
}
test("joinVertices") {
withSpark(new SparkContext("local", "test")) { sc =>
val vertices = sc.parallelize(Seq[(Vid, String)]((1, "one"), (2, "two"), (3, "three")), 2)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment