diff --git a/graph/src/main/scala/org/apache/spark/graph/Graph.scala b/graph/src/main/scala/org/apache/spark/graph/Graph.scala index 68f0394dd4bfa1e154f915b15dd5abfd78ea0070..8c7ee1fcefc86904fee229839f632e58390498b0 100644 --- a/graph/src/main/scala/org/apache/spark/graph/Graph.scala +++ b/graph/src/main/scala/org/apache/spark/graph/Graph.scala @@ -28,7 +28,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] { * @see Vertex for the vertex type. * */ - val vertices: RDD[(Vid,VD)] + val vertices: VertexSetRDD[VD] /** * Get the Edges and their data as an RDD. The entries in the RDD contain @@ -257,7 +257,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] { def mapReduceTriplets[A: ClassManifest]( mapFunc: EdgeTriplet[VD, ED] => Array[(Vid, A)], reduceFunc: (A, A) => A) - : RDD[(Vid, A)] + : VertexSetRDD[A] /** diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala b/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala index 5e8f082fdad8aef9b9661f5d1f5ead42f1d1e0a2..cecd3ff2913fcf6f8c025dae9a6e82fcbac5daf1 100644 --- a/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala +++ b/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala @@ -13,11 +13,11 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) { lazy val numVertices: Long = graph.vertices.count() - lazy val inDegrees: RDD[(Vid, Int)] = degreesRDD(EdgeDirection.In) + lazy val inDegrees: VertexSetRDD[Int] = degreesRDD(EdgeDirection.In) - lazy val outDegrees: RDD[(Vid, Int)] = degreesRDD(EdgeDirection.Out) + lazy val outDegrees: VertexSetRDD[Int] = degreesRDD(EdgeDirection.Out) - lazy val degrees: RDD[(Vid, Int)] = degreesRDD(EdgeDirection.Both) + lazy val degrees: VertexSetRDD[Int] = degreesRDD(EdgeDirection.Both) /** @@ -62,7 +62,7 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) { mapFunc: (Vid, EdgeTriplet[VD, ED]) => Option[A], reduceFunc: (A, A) => A, dir: EdgeDirection) - : RDD[(Vid, A)] = { + : VertexSetRDD[A] = { ClosureCleaner.clean(mapFunc) ClosureCleaner.clean(reduceFunc) @@ -94,20 +94,20 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) { } // end of aggregateNeighbors - def collectNeighborIds(edgeDirection: EdgeDirection) : RDD[(Vid, Array[Vid])] = { + def collectNeighborIds(edgeDirection: EdgeDirection) : VertexSetRDD[Array[Vid]] = { val nbrs = graph.aggregateNeighbors[Array[Vid]]( (vid, edge) => Some(Array(edge.otherVertexId(vid))), (a, b) => a ++ b, edgeDirection) - graph.vertices.leftOuterJoin(nbrs).mapValues{ + graph.vertices.leftZipJoin(nbrs).mapValues{ case (_, Some(nbrs)) => nbrs case (_, None) => Array.empty[Vid] } } - private def degreesRDD(edgeDirection: EdgeDirection): RDD[(Vid, Int)] = { + private def degreesRDD(edgeDirection: EdgeDirection): VertexSetRDD[Int] = { graph.aggregateNeighbors((vid, edge) => Some(1), _+_, edgeDirection) } diff --git a/graph/src/main/scala/org/apache/spark/graph/IndexedRDD.scala b/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala similarity index 85% rename from graph/src/main/scala/org/apache/spark/graph/IndexedRDD.scala rename to graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala index 900a46bb4236205703dded6d8a931d917c595fdd..b3f1fa768c86dc33865bfa5b834e82eff036385a 100644 --- a/graph/src/main/scala/org/apache/spark/graph/IndexedRDD.scala +++ b/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala @@ -39,47 +39,40 @@ import org.apache.spark.storage.StorageLevel -/** - * The BlockIndex is the internal map 
structure used inside the index - * of the IndexedRDD. - */ -class BlockIndex[@specialized K: ClassManifest] extends JHashMap[K,Int] - /** - * The RDDIndex is an opaque type used to represent the organization + * The VertexSetIndex is an opaque type used to represent the organization * of values in an RDD */ -class RDDIndex[@specialized K: ClassManifest](private[spark] val rdd: RDD[BlockIndex[K]]) { - def persist(newLevel: StorageLevel): RDDIndex[K] = { +class VertexSetIndex(private[spark] val rdd: RDD[VertexIdToIndexMap]) { + def persist(newLevel: StorageLevel): VertexSetIndex = { rdd.persist(newLevel) return this } - def partitioner: Partitioner = rdd.partitioner.get } /** - * An IndexedRDD[K,V] extends the RDD[(K,V)] by pre-indexing the keys and + * An VertexSetRDD[V] extends the RDD[(Vid,V)] by pre-indexing the keys and * organizing the values to enable faster join operations. * - * In addition to providing the basic RDD[(K,V)] functionality the IndexedRDD - * exposes an index member which can be used to "key" other IndexedRDDs + * In addition to providing the basic RDD[(Vid,V)] functionality the VertexSetRDD + * exposes an index member which can be used to "key" other VertexSetRDDs * */ -class IndexedRDD[K: ClassManifest, V: ClassManifest]( - @transient val index: RDDIndex[K], +class VertexSetRDD[V: ClassManifest]( + @transient val index: VertexSetIndex, @transient val valuesRDD: RDD[ (IndexedSeq[V], BitSet) ]) - extends RDD[(K, V)](index.rdd.context, + extends RDD[(Vid, V)](index.rdd.context, List(new OneToOneDependency(index.rdd), new OneToOneDependency(valuesRDD)) ) { /** - * Construct a new IndexedRDD that is indexed by only the keys in the RDD + * Construct a new VertexSetRDD that is indexed by only the keys in the RDD */ - def reindex(): IndexedRDD[K,V] = IndexedRDD(this) + def reindex(): VertexSetRDD[V] = VertexSetRDD(this) /** @@ -109,20 +102,26 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest]( /** - * Caching an IndexedRDD causes the index and values to be cached separately. + * Caching an VertexSetRDD causes the index and values to be cached separately. */ - override def persist(newLevel: StorageLevel): RDD[(K,V)] = { + override def persist(newLevel: StorageLevel): VertexSetRDD[V] = { index.persist(newLevel) valuesRDD.persist(newLevel) return this } + override def persist(): VertexSetRDD[V] = persist(StorageLevel.MEMORY_ONLY) + + /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */ + override def cache(): VertexSetRDD[V] = persist() + + /** * Pass each value in the key-value pair RDD through a map function without changing the keys; * this also retains the original RDD's partitioning. */ - def mapValues[U: ClassManifest](f: V => U): IndexedRDD[K, U] = { + def mapValues[U: ClassManifest](f: V => U): VertexSetRDD[U] = { val cleanF = index.rdd.context.clean(f) val newValuesRDD: RDD[ (IndexedSeq[U], BitSet) ] = valuesRDD.mapPartitions(iter => iter.map{ @@ -133,7 +132,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest]( } (newValues.toIndexedSeq, bs) }, preservesPartitioning = true) - new IndexedRDD[K,U](index, newValuesRDD) + new VertexSetRDD[U](index, newValuesRDD) } @@ -141,7 +140,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest]( * Pass each value in the key-value pair RDD through a map function without changing the keys; * this also retains the original RDD's partitioning. 
*/ - def mapValuesWithKeys[U: ClassManifest](f: (K, V) => U): IndexedRDD[K, U] = { + def mapValuesWithKeys[U: ClassManifest](f: (Vid, V) => U): VertexSetRDD[U] = { val cleanF = index.rdd.context.clean(f) val newValues: RDD[ (IndexedSeq[U], BitSet) ] = index.rdd.zipPartitions(valuesRDD){ @@ -158,11 +157,11 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest]( } Array((newValues.toIndexedSeq, bs)).iterator } - new IndexedRDD[K,U](index, newValues) + new VertexSetRDD[U](index, newValues) } - def zipJoin[W: ClassManifest](other: IndexedRDD[K,W]): IndexedRDD[K,(V,W)] = { + def zipJoin[W: ClassManifest](other: VertexSetRDD[W]): VertexSetRDD[(V,W)] = { if(index != other.index) { throw new SparkException("A zipJoin can only be applied to RDDs with the same index!") } @@ -176,11 +175,11 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest]( val newValues = thisValues.view.zip(otherValues) Iterator((newValues.toIndexedSeq, newBS)) } - new IndexedRDD(index, newValuesRDD) + new VertexSetRDD(index, newValuesRDD) } - def leftZipJoin[W: ClassManifest](other: IndexedRDD[K,W]): IndexedRDD[K,(V,Option[W])] = { + def leftZipJoin[W: ClassManifest](other: VertexSetRDD[W]): VertexSetRDD[(V,Option[W])] = { if(index != other.index) { throw new SparkException("A zipJoin can only be applied to RDDs with the same index!") } @@ -195,18 +194,18 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest]( val newValues = thisValues.view.zip(otherOption) Iterator((newValues.toIndexedSeq, thisBS)) } - new IndexedRDD(index, newValuesRDD) + new VertexSetRDD(index, newValuesRDD) } def leftJoin[W: ClassManifest]( - other: RDD[(K,W)], merge: (W,W) => W = (a:W, b:W) => a): - IndexedRDD[K, (V, Option[W]) ] = { + other: RDD[(Vid,W)], merge: (W,W) => W = (a:W, b:W) => a): + VertexSetRDD[(V, Option[W]) ] = { val cleanMerge = index.rdd.context.clean(merge) other match { - case other: IndexedRDD[_, _] if index == other.index => { + case other: VertexSetRDD[_] if index == other.index => { leftZipJoin(other) } case _ => { @@ -247,21 +246,21 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest]( Iterator((newValues.toIndexedSeq, thisBS)) } // end of newValues - new IndexedRDD(index, newValues) + new VertexSetRDD(index, newValues) } } } -/** + /** * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the * list of values for that key in `this` as well as `other`. */ - def cogroup[W: ClassManifest](other: RDD[(K, W)], partitioner: Partitioner): - IndexedRDD[K, (Seq[V], Seq[W])] = { + def cogroup[W: ClassManifest](other: RDD[(Vid, W)], partitioner: Partitioner): + VertexSetRDD[(Seq[V], Seq[W])] = { //RDD[(K, (Seq[V], Seq[W]))] = { other match { - case other: IndexedRDD[_, _] if index == other.index => { + case other: VertexSetRDD[_] if index == other.index => { // if both RDDs share exactly the same index and therefore the same super set of keys // then we simply merge the value RDDs. 
// However it is possible that both RDDs are missing a value for a given key in @@ -284,9 +283,9 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest]( } Iterator((newValues.toIndexedSeq, newBS)) } - new IndexedRDD(index, newValues) + new VertexSetRDD(index, newValues) } - case other: IndexedRDD[_, _] + case other: VertexSetRDD[_] if index.rdd.partitioner == other.index.rdd.partitioner => { // If both RDDs are indexed using different indices but with the same partitioners // then we we need to first merge the indicies and then use the merged index to @@ -298,7 +297,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest]( assert(!thisIter.hasNext) val otherIndex = otherIter.next() assert(!otherIter.hasNext) - val newIndex = new BlockIndex[K]() + val newIndex = new VertexIdToIndexMap() // @todo Merge only the keys that correspond to non-null values // Merge the keys newIndex.putAll(thisIndex) @@ -318,7 +317,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest]( // Get the new index for this partition val newIndex = newIndexIter.next() assert(!newIndexIter.hasNext) - // Get the corresponding indicies and values for this and the other IndexedRDD + // Get the corresponding indicies and values for this and the other VertexSetRDD val (thisIndex, (thisValues, thisBS)) = thisTuplesIter.next() assert(!thisTuplesIter.hasNext) val (otherIndex, (otherValues, otherBS)) = otherTuplesIter.next() @@ -347,7 +346,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest]( } Iterator((newValues.toIndexedSeq, newBS)) }) - new IndexedRDD(new RDDIndex(newIndex), newValues) + new VertexSetRDD(new VertexSetIndex(newIndex), newValues) } case _ => { // Get the partitioner from the index @@ -360,20 +359,20 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest]( if (other.partitioner == Some(partitioner)) { other } else { - new ShuffledRDD[K, W, (K,W)](other, partitioner) + other.partitionBy(partitioner) } // Join the other RDD with this RDD building a new valueset and new index on the fly val groups = tuples.zipPartitions(otherShuffled)( (thisTuplesIter, otherTuplesIter) => { - // Get the corresponding indicies and values for this IndexedRDD + // Get the corresponding indicies and values for this VertexSetRDD val (thisIndex, (thisValues, thisBS)) = thisTuplesIter.next() assert(!thisTuplesIter.hasNext()) // Construct a new index - val newIndex = thisIndex.clone().asInstanceOf[BlockIndex[K]] + val newIndex = thisIndex.clone().asInstanceOf[VertexIdToIndexMap] // Construct a new array Buffer to store the values val newValues = ArrayBuffer.fill[ (Seq[V], Seq[W]) ](thisValues.size)(null) val newBS = new BitSet(thisValues.size) - // populate the newValues with the values in this IndexedRDD + // populate the newValues with the values in this VertexSetRDD for ((k,i) <- thisIndex) { if (thisBS(i)) { newValues(i) = (Seq(thisValues(i)), ArrayBuffer.empty[W]) @@ -415,14 +414,14 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest]( val newValues: RDD[(IndexedSeq[(Seq[V], Seq[W])], BitSet)] = groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true) - new IndexedRDD[K, (Seq[V], Seq[W])](new RDDIndex(newIndex), newValues) + new VertexSetRDD[(Seq[V], Seq[W])](new VertexSetIndex(newIndex), newValues) } } } // - // def zipJoinToRDD[W: ClassManifest](other: IndexedRDD[K,W]): RDD[(K,(V,W))] = { + // def zipJoinToRDD[W: ClassManifest](other: VertexSetRDD[K,W]): RDD[(K,(V,W))] = { // if(index != other.index) { // throw new SparkException("ZipJoinRDD can only be applied to RDDs with the same index!") // } @@ -447,11 +446,11 @@ 
class IndexedRDD[K: ClassManifest, V: ClassManifest]( other: RDD[(K,W)])( f: (K, V, W) => Z, merge: (Z,Z) => Z = (a:Z, b:Z) => a): - IndexedRDD[K,Z] = { + VertexSetRDD[K,Z] = { val cleanF = index.rdd.context.clean(f) val cleanMerge = index.rdd.context.clean(merge) other match { - case other: IndexedRDD[_, _] if index == other.index => { + case other: VertexSetRDD[_, _] if index == other.index => { val newValues = index.rdd.zipPartitions(valuesRDD, other.valuesRDD){ (thisIndexIter, thisIter, otherIter) => val index = thisIndexIter.next() @@ -469,7 +468,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest]( } List((newValues, newBS)).iterator } - new IndexedRDD(index, newValues) + new VertexSetRDD(index, newValues) } case _ => { @@ -508,7 +507,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest]( } List((newValues, tempBS)).iterator } // end of newValues - new IndexedRDD(index, newValues) + new VertexSetRDD(index, newValues) } } } @@ -519,11 +518,11 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest]( other: RDD[(K,W)])( f: (K, V, Option[W]) => Z, merge: (Z,Z) => Z = (a:Z, b:Z) => a): - IndexedRDD[K,Z] = { + VertexSetRDD[K,Z] = { val cleanF = index.rdd.context.clean(f) val cleanMerge = index.rdd.context.clean(merge) other match { - case other: IndexedRDD[_, _] if index == other.index => { + case other: VertexSetRDD[_, _] if index == other.index => { val newValues = index.rdd.zipPartitions(valuesRDD, other.valuesRDD){ (thisIndexIter, thisIter, otherIter) => val index = thisIndexIter.next() @@ -541,7 +540,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest]( } List((newValues, thisBS)).iterator } - new IndexedRDD(index, newValues) + new VertexSetRDD(index, newValues) } case _ => { @@ -584,7 +583,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest]( } List((newValues, thisBS)).iterator } // end of newValues - new IndexedRDD(index, newValues) + new VertexSetRDD(index, newValues) } } } @@ -593,7 +592,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest]( - override def filter(f: Tuple2[K,V] => Boolean): RDD[(K,V)] = { + override def filter(f: Tuple2[Vid,V] => Boolean): VertexSetRDD[V] = { val cleanF = index.rdd.context.clean(f) val newValues = index.rdd.zipPartitions(valuesRDD){ (keysIter, valuesIter) => @@ -609,14 +608,14 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest]( } Array((oldValues, newBS)).iterator } - new IndexedRDD[K,V](index, newValues) + new VertexSetRDD[V](index, newValues) } /** * Provide the RDD[(K,V)] equivalent output. */ - override def compute(part: Partition, context: TaskContext): Iterator[(K, V)] = { + override def compute(part: Partition, context: TaskContext): Iterator[(Vid, V)] = { tuples.compute(part, context).flatMap { case (indexMap, (values, bs) ) => // Walk the index to construct the key, value pairs indexMap.iterator @@ -629,27 +628,27 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest]( } } -} // End of IndexedRDD +} // End of VertexSetRDD -object IndexedRDD { +object VertexSetRDD { - def apply[K: ClassManifest, V: ClassManifest](rdd: RDD[(K,V)]): IndexedRDD[K,V] = + def apply[V: ClassManifest](rdd: RDD[(Vid,V)]): VertexSetRDD[V] = apply(rdd, (a:V, b:V) => a ) - def apply[K: ClassManifest, V: ClassManifest]( - rdd: RDD[(K,V)], reduceFunc: (V, V) => V): IndexedRDD[K,V] = { + def apply[V: ClassManifest]( + rdd: RDD[(Vid,V)], reduceFunc: (V, V) => V): VertexSetRDD[V] = { // Preaggregate and shuffle if necessary // Preaggregation. 
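For orientation while reading this constructor, a minimal sketch of how a caller might invoke it. The SparkContext `sc`, the sample data, and the helper name are assumptions for illustration; `Vid` is the vertex-id alias used throughout the graph package. The pre-aggregation that actually implements the constructor continues below.

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.graph._

def buildVertexSet(sc: SparkContext): VertexSetRDD[Int] = {
  // Raw (vertex id, attribute) pairs; note the duplicate id 1L.
  val pairs: RDD[(Vid, Int)] = sc.parallelize(Seq((1L, 5), (2L, 3), (1L, 7)))
  // Duplicate ids are merged with the supplied reduce function (here they are summed).
  VertexSetRDD(pairs, (a: Int, b: Int) => a + b)
}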
- val aggregator = new Aggregator[K, V, V](v => v, reduceFunc, reduceFunc) + val aggregator = new Aggregator[Vid, V, V](v => v, reduceFunc, reduceFunc) val partitioner = new HashPartitioner(rdd.partitions.size) val preAgg = rdd.mapPartitions(aggregator.combineValuesByKey).partitionBy(partitioner) val groups = preAgg.mapPartitions( iter => { - val indexMap = new BlockIndex[K]() + val indexMap = new VertexIdToIndexMap() val values = new ArrayBuffer[V] val bs = new BitSet for ((k,v) <- iter) { @@ -669,19 +668,19 @@ object IndexedRDD { val index = groups.mapPartitions(_.map{ case (kMap, vAr) => kMap }, true) val values: RDD[(IndexedSeq[V], BitSet)] = groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true) - new IndexedRDD[K,V](new RDDIndex(index), values) + new VertexSetRDD[V](new VertexSetIndex(index), values) } - def apply[K: ClassManifest, V: ClassManifest]( - rdd: RDD[(K,V)], index: RDDIndex[K]): IndexedRDD[K,V] = + def apply[V: ClassManifest]( + rdd: RDD[(Vid,V)], index: VertexSetIndex): VertexSetRDD[V] = apply(rdd, index, (a:V,b:V) => a) - def apply[K: ClassManifest, V: ClassManifest]( - rdd: RDD[(K,V)], index: RDDIndex[K], - reduceFunc: (V, V) => V): IndexedRDD[K,V] = + def apply[V: ClassManifest]( + rdd: RDD[(Vid,V)], index: VertexSetIndex, + reduceFunc: (V, V) => V): VertexSetRDD[V] = apply(rdd,index, (v:V) => v, reduceFunc, reduceFunc) // { // // Get the index Partitioner @@ -721,16 +720,16 @@ object IndexedRDD { // } // List((values, bs)).iterator // }) - // new IndexedRDD[K,V](index, values) + // new VertexSetRDD[K,V](index, values) // } // end of apply - def apply[K: ClassManifest, V: ClassManifest, C: ClassManifest]( - rdd: RDD[(K,V)], - index: RDDIndex[K], + def apply[V: ClassManifest, C: ClassManifest]( + rdd: RDD[(Vid,V)], + index: VertexSetIndex, createCombiner: V => C, mergeValue: (C, V) => C, - mergeCombiners: (C, C) => C): IndexedRDD[K,C] = { + mergeCombiners: (C, C) => C): VertexSetRDD[C] = { // Get the index Partitioner val partitioner = index.rdd.partitioner match { case Some(p) => p @@ -740,7 +739,7 @@ object IndexedRDD { val partitioned = if (rdd.partitioner != Some(partitioner)) { // Preaggregation. - val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, + val aggregator = new Aggregator[Vid, V, C](createCombiner, mergeValue, mergeCombiners) rdd.mapPartitions(aggregator.combineValuesByKey).partitionBy(partitioner) } else { @@ -769,15 +768,15 @@ object IndexedRDD { } Iterator((values, bs)) }) - new IndexedRDD(index, values) + new VertexSetRDD(index, values) } // end of apply /** * Construct and index of the unique values in a given RDD. */ - def makeIndex[K: ClassManifest](keys: RDD[K], - partitioner: Option[Partitioner] = None): RDDIndex[K] = { + def makeIndex(keys: RDD[Vid], + partitioner: Option[Partitioner] = None): VertexSetIndex = { // @todo: I don't need the boolean its only there to be the second type since I want to shuffle a single RDD // Ugly hack :-(. In order to partition the keys they must have values. val tbl = keys.mapPartitions(_.map(k => (k, false)), true) @@ -786,7 +785,7 @@ object IndexedRDD { case None => { if (tbl.partitioner.isEmpty) { // @todo: I don't need the boolean its only there to be the second type of the shuffle. 
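The intent of makeIndex is to pay the indexing and shuffling cost once and then key several vertex sets by the same VertexSetIndex, after which they can be combined with zipJoin partition by partition. A sketch of that pattern, with illustrative names and data (`sc` is an assumed SparkContext); the shuffle that builds the index resumes below.

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.graph._

def sharedIndexExample(sc: SparkContext): VertexSetRDD[(Double, String)] = {
  val ids: RDD[Vid] = sc.parallelize(Seq(1L, 2L, 3L))
  // Build the index once over the full set of vertex ids.
  val index: VertexSetIndex = VertexSetRDD.makeIndex(ids)
  // Key two different value sets by the same index.
  val ranks: VertexSetRDD[Double] = VertexSetRDD(ids.map(id => (id, 1.0)), index)
  val labels: VertexSetRDD[String] = VertexSetRDD(ids.map(id => (id, "v" + id)), index)
  // Because both sets share the same index, they can be combined with zipJoin.
  ranks.zipJoin(labels)
}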
- new ShuffledRDD[K, Boolean, (K, Boolean)](tbl, Partitioner.defaultPartitioner(tbl)) + new ShuffledRDD[Vid, Boolean, (Vid, Boolean)](tbl, Partitioner.defaultPartitioner(tbl)) } else { tbl } } case Some(partitioner) => @@ -794,7 +793,7 @@ object IndexedRDD { } val index = shuffledTbl.mapPartitions( iter => { - val indexMap = new BlockIndex[K]() + val indexMap = new VertexIdToIndexMap() for ( (k,_) <- iter ){ if(!indexMap.contains(k)){ val ind = indexMap.size @@ -803,10 +802,10 @@ object IndexedRDD { } Iterator(indexMap) }, true).cache - new RDDIndex(index) + new VertexSetIndex(index) } -} // end of object IndexedRDD +} // end of object VertexSetRDD diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala index c6875f0c9c470d3649a892f95ce6f024720d1350..bdf79bf9f0fbae7e1b87797c351f29548d508bfc 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala @@ -66,17 +66,16 @@ class EdgeTripletIterator[VD: ClassManifest, ED: ClassManifest]( object EdgeTripletBuilder { def makeTriplets[VD: ClassManifest, ED: ClassManifest]( - localVidMap: IndexedRDD[Pid, VertexIdToIndexMap], - vTableReplicatedValues: IndexedRDD[Pid, Array[VD]], - eTable: IndexedRDD[Pid, EdgePartition[ED]]): RDD[EdgeTriplet[VD, ED]] = { - val iterFun = (iter: Iterator[(Pid, ((VertexIdToIndexMap, Array[VD]), EdgePartition[ED]))]) => { - val (pid, ((vidToIndex, vertexArray), edgePartition)) = iter.next() - assert(iter.hasNext == false) + localVidMap: RDD[(Pid, VertexIdToIndexMap)], + vTableReplicatedValues: RDD[(Pid, Array[VD]) ], + eTable: RDD[(Pid, EdgePartition[ED])]): RDD[EdgeTriplet[VD, ED]] = { + localVidMap.zipPartitions(vTableReplicatedValues, eTable) { + (vidMapIter, replicatedValuesIter, eTableIter) => + val (_, vidToIndex) = vidMapIter.next() + val (_, vertexArray) = replicatedValuesIter.next() + val (_, edgePartition) = eTableIter.next() new EdgeTripletIterator(vidToIndex, vertexArray, edgePartition) } - ClosureCleaner.clean(iterFun) - localVidMap.zipJoin(vTableReplicatedValues).zipJoin(eTable) - .mapPartitions( iterFun ) // end of map partition } } @@ -100,30 +99,30 @@ object EdgeTripletBuilder { * A Graph RDD that supports computation on graphs. */ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( - @transient val vTable: IndexedRDD[Vid, VD], - @transient val vid2pid: IndexedRDD[Vid, Array[Pid]], - @transient val localVidMap: IndexedRDD[Pid, VertexIdToIndexMap], - @transient val eTable: IndexedRDD[Pid, EdgePartition[ED]]) + @transient val vTable: VertexSetRDD[VD], + @transient val vid2pid: VertexSetRDD[Array[Pid]], + @transient val localVidMap: RDD[(Pid, VertexIdToIndexMap)], + @transient val eTable: RDD[(Pid, EdgePartition[ED])] ) extends Graph[VD, ED] { // def this() = this(null,null,null) /** - * (localVidMap: IndexedRDD[Pid, VertexIdToIndexMap]) is a version of the + * (localVidMap: VertexSetRDD[Pid, VertexIdToIndexMap]) is a version of the * vertex data after it is replicated. Within each partition, it holds a map * from vertex ID to the index where that vertex's attribute is stored. This * index refers to an array in the same partition in vTableReplicatedValues. * - * (vTableReplicatedValues: IndexedRDD[Pid, Array[VD]]) holds the vertex data + * (vTableReplicatedValues: VertexSetRDD[Pid, Array[VD]]) holds the vertex data * and is arranged as described above. 
*/ - @transient val vTableReplicatedValues = + @transient val vTableReplicatedValues: RDD[(Pid, Array[VD])] = createVTableReplicated(vTable, vid2pid, localVidMap) /** Return a RDD of vertices. */ - @transient override val vertices: RDD[(Vid, VD)] = vTable + @transient override val vertices = vTable /** Return a RDD of edges. */ @@ -177,36 +176,40 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( override def reverse: Graph[VD, ED] = { - val etable = eTable.mapValues( _.reverse ).asInstanceOf[IndexedRDD[Pid, EdgePartition[ED]]] - new GraphImpl(vTable, vid2pid, localVidMap, etable) + val newEtable = eTable.mapPartitions( _.map{ case (pid, epart) => (pid, epart.reverse) }, + preservesPartitioning = true) + new GraphImpl(vTable, vid2pid, localVidMap, newEtable) } override def mapVertices[VD2: ClassManifest](f: (Vid, VD) => VD2): Graph[VD2, ED] = { val newVTable = vTable.mapValuesWithKeys((vid, data) => f(vid, data)) - .asInstanceOf[IndexedRDD[Vid, VD2]] new GraphImpl(newVTable, vid2pid, localVidMap, eTable) } override def mapEdges[ED2: ClassManifest](f: Edge[ED] => ED2): Graph[VD, ED2] = { - val newETable = eTable.mapValues(eBlock => eBlock.map(f)) - .asInstanceOf[IndexedRDD[Pid, EdgePartition[ED2]]] + val newETable = eTable.mapPartitions(_.map{ case (pid, epart) => (pid, epart.map(f)) }, + preservesPartitioning = true) new GraphImpl(vTable, vid2pid, localVidMap, newETable) } override def mapTriplets[ED2: ClassManifest](f: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = { - val newETable = eTable.zipJoin(localVidMap).zipJoin(vTableReplicatedValues).mapValues{ - case ((edgePartition, vidToIndex), vertexArray) => - val et = new EdgeTriplet[VD, ED] - edgePartition.map{e => - et.set(e) - et.srcAttr = vertexArray(vidToIndex(e.srcId)) - et.dstAttr = vertexArray(vidToIndex(e.dstId)) - f(et) - } - }.asInstanceOf[IndexedRDD[Pid, EdgePartition[ED2]]] + val newETable = eTable.zipPartitions(localVidMap, vTableReplicatedValues){ + (edgePartitionIter, vidToIndexIter, vertexArrayIter) => + val (pid, edgePartition) = edgePartitionIter.next() + val (_, vidToIndex) = vidToIndexIter.next() + val (_, vertexArray) = vertexArrayIter.next() + val et = new EdgeTriplet[VD, ED] + val newEdgePartition = edgePartition.map{e => + et.set(e) + et.srcAttr = vertexArray(vidToIndex(e.srcId)) + et.dstAttr = vertexArray(vidToIndex(e.dstId)) + f(et) + } + Iterator((pid, newEdgePartition)) + } new GraphImpl(vTable, vid2pid, localVidMap, newETable) } @@ -238,7 +241,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( // Reuse the partitioner (but not the index) from this graph val newVTable = - IndexedRDD(vertices.filter(v => vpred(v._1, v._2)).partitionBy(vTable.index.partitioner)) + VertexSetRDD(vertices.filter(v => vpred(v._1, v._2)).partitionBy(vTable.index.partitioner)) // Restrict the set of edges to those that satisfy the vertex and the edge predicate. @@ -309,53 +312,56 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( override def mapReduceTriplets[A: ClassManifest]( mapFunc: EdgeTriplet[VD, ED] => Array[(Vid, A)], reduceFunc: (A, A) => A) - : RDD[(Vid, A)] = { + : VertexSetRDD[A] = { ClosureCleaner.clean(mapFunc) ClosureCleaner.clean(reduceFunc) // Map and preaggregate - val preAgg = localVidMap.zipJoin(vTableReplicatedValues).zipJoin(eTable).flatMap{ - case (pid, ((vidToIndex, vertexArray), edgePartition)) => - // We can reuse the vidToIndex map for aggregation here as well. 
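Before the rewritten aggregation loop below, it helps to keep the caller-side contract in view: mapFunc emits (target vertex id, message) pairs for each triplet, reduceFunc combines messages destined for the same vertex, and the result comes back as a VertexSetRDD keyed like the graph's vertices. A sketch with a hypothetical helper and graph value:

// Assumes: import org.apache.spark.graph._
// Count in-neighbors by sending a 1 to each edge's destination and summing per vertex.
def inDegreesOf[VD: ClassManifest, ED: ClassManifest](g: Graph[VD, ED]): VertexSetRDD[Int] =
  g.mapReduceTriplets[Int](et => Array((et.dstId, 1)), (a, b) => a + b)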
- /** @todo Since this has the downside of not allowing "messages" to arbitrary - * vertices we should consider just using a fresh map. - */ - val msgArray = new Array[A](vertexArray.size) - val msgBS = new BitSet(vertexArray.size) - // Iterate over the partition - val et = new EdgeTriplet[VD, ED] - edgePartition.foreach{e => - et.set(e) - et.srcAttr = vertexArray(vidToIndex(e.srcId)) - et.dstAttr = vertexArray(vidToIndex(e.dstId)) - mapFunc(et).foreach{ case (vid, msg) => - // verify that the vid is valid - assert(vid == et.srcId || vid == et.dstId) - val ind = vidToIndex(vid) - // Populate the aggregator map - if(msgBS(ind)) { - msgArray(ind) = reduceFunc(msgArray(ind), msg) - } else { - msgArray(ind) = msg - msgBS(ind) = true - } + val preAgg = eTable.zipPartitions(localVidMap, vTableReplicatedValues){ + (edgePartitionIter, vidToIndexIter, vertexArrayIter) => + val (pid, edgePartition) = edgePartitionIter.next() + val (_, vidToIndex) = vidToIndexIter.next() + val (_, vertexArray) = vertexArrayIter.next() + // We can reuse the vidToIndex map for aggregation here as well. + /** @todo Since this has the downside of not allowing "messages" to arbitrary + * vertices we should consider just using a fresh map. + */ + val msgArray = new Array[A](vertexArray.size) + val msgBS = new BitSet(vertexArray.size) + // Iterate over the partition + val et = new EdgeTriplet[VD, ED] + edgePartition.foreach{e => + et.set(e) + et.srcAttr = vertexArray(vidToIndex(e.srcId)) + et.dstAttr = vertexArray(vidToIndex(e.dstId)) + mapFunc(et).foreach{ case (vid, msg) => + // verify that the vid is valid + assert(vid == et.srcId || vid == et.dstId) + val ind = vidToIndex(vid) + // Populate the aggregator map + if(msgBS(ind)) { + msgArray(ind) = reduceFunc(msgArray(ind), msg) + } else { + msgArray(ind) = msg + msgBS(ind) = true } } - // Return the aggregate map - vidToIndex.long2IntEntrySet().fastIterator() - // Remove the entries that did not receive a message - .filter{ entry => msgBS(entry.getValue()) } - // Construct the actual pairs - .map{ entry => - val vid = entry.getLongKey() - val ind = entry.getValue() - val msg = msgArray(ind) - (vid, msg) - } + } + // Return the aggregate map + vidToIndex.long2IntEntrySet().fastIterator() + // Remove the entries that did not receive a message + .filter{ entry => msgBS(entry.getValue()) } + // Construct the actual pairs + .map{ entry => + val vid = entry.getLongKey() + val ind = entry.getValue() + val msg = msgArray(ind) + (vid, msg) + } }.partitionBy(vTable.index.rdd.partitioner.get) // do the final reduction reusing the index map - IndexedRDD(preAgg, vTable.index, reduceFunc) + VertexSetRDD(preAgg, vTable.index, reduceFunc) } @@ -402,7 +408,7 @@ object GraphImpl { defaultVertexAttr: VD, mergeFunc: (VD, VD) => VD): GraphImpl[VD,ED] = { - val vtable = IndexedRDD(vertices, mergeFunc) + val vtable = VertexSetRDD(vertices, mergeFunc) /** * @todo Verify that there are no edges that contain vertices * that are not in vTable. This should probably be resolved: @@ -432,54 +438,54 @@ object GraphImpl { * containing all the edges in a partition. */ protected def createETable[ED: ClassManifest](edges: RDD[Edge[ED]]) - : IndexedRDD[Pid, EdgePartition[ED]] = { - // Get the number of partitions - val numPartitions = edges.partitions.size - val ceilSqrt: Pid = math.ceil(math.sqrt(numPartitions)).toInt - IndexedRDD(edges.map { e => - // Random partitioning based on the source vertex id. 
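As a rough illustration of what such an assignment does (a hypothetical stand-in, not the randomVertexCut defined elsewhere in this file): hash the edge's endpoint pair and take the result modulo the number of partitions. The commented-out lines below are the alternative strategies that were considered.

// Hypothetical stand-in; the real randomVertexCut may differ.
def exampleEdgePartition(src: Vid, dst: Vid, numParts: Pid): Pid =
  math.abs((src, dst).hashCode()) % numParts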
- // val part: Pid = edgePartitionFunction1D(e.srcId, e.dstId, numPartitions) - // val part: Pid = edgePartitionFunction2D(e.srcId, e.dstId, numPartitions, ceilSqrt) - val part: Pid = randomVertexCut(e.srcId, e.dstId, numPartitions) - //val part: Pid = canonicalEdgePartitionFunction2D(e.srcId, e.dstId, numPartitions, ceilSqrt) - - // Should we be using 3-tuple or an optimized class - MessageToPartition(part, (e.srcId, e.dstId, e.attr)) + : RDD[(Pid, EdgePartition[ED])] = { + // Get the number of partitions + val numPartitions = edges.partitions.size + val ceilSqrt: Pid = math.ceil(math.sqrt(numPartitions)).toInt + edges.map { e => + // Random partitioning based on the source vertex id. + // val part: Pid = edgePartitionFunction1D(e.srcId, e.dstId, numPartitions) + // val part: Pid = edgePartitionFunction2D(e.srcId, e.dstId, numPartitions, ceilSqrt) + val part: Pid = randomVertexCut(e.srcId, e.dstId, numPartitions) + //val part: Pid = canonicalEdgePartitionFunction2D(e.srcId, e.dstId, numPartitions, ceilSqrt) + + // Should we be using 3-tuple or an optimized class + MessageToPartition(part, (e.srcId, e.dstId, e.attr)) + } + .partitionBy(new HashPartitioner(numPartitions)) + .mapPartitionsWithIndex( (pid, iter) => { + val builder = new EdgePartitionBuilder[ED] + iter.foreach { message => + val data = message.data + builder.add(data._1, data._2, data._3) } - .partitionBy(new HashPartitioner(numPartitions)) - .mapPartitionsWithIndex({ (pid, iter) => - val builder = new EdgePartitionBuilder[ED] - iter.foreach { message => - val data = message.data - builder.add(data._1, data._2, data._3) - } - val edgePartition = builder.toEdgePartition - Iterator((pid, edgePartition)) - }, preservesPartitioning = true)) + val edgePartition = builder.toEdgePartition + Iterator((pid, edgePartition)) + }, preservesPartitioning = true).cache() } protected def createVid2Pid[ED: ClassManifest]( - eTable: IndexedRDD[Pid, EdgePartition[ED]], - vTableIndex: RDDIndex[Vid]): IndexedRDD[Vid, Array[Pid]] = { + eTable: RDD[(Pid, EdgePartition[ED])], + vTableIndex: VertexSetIndex): VertexSetRDD[Array[Pid]] = { val preAgg = eTable.mapPartitions { iter => val (pid, edgePartition) = iter.next() val vSet = new VertexSet edgePartition.foreach(e => {vSet.add(e.srcId); vSet.add(e.dstId)}) vSet.iterator.map { vid => (vid.toLong, pid) } } - IndexedRDD[Vid, Pid, ArrayBuffer[Pid]](preAgg, vTableIndex, + VertexSetRDD[Pid, ArrayBuffer[Pid]](preAgg, vTableIndex, (p: Pid) => ArrayBuffer(p), (ab: ArrayBuffer[Pid], p:Pid) => {ab.append(p); ab}, (a: ArrayBuffer[Pid], b: ArrayBuffer[Pid]) => a ++ b) - .mapValues(a => a.toArray).asInstanceOf[IndexedRDD[Vid, Array[Pid]]] + .mapValues(a => a.toArray).cache() } - protected def createLocalVidMap[ED: ClassManifest]( - eTable: IndexedRDD[Pid, EdgePartition[ED]]): IndexedRDD[Pid, VertexIdToIndexMap] = { - eTable.mapValues{ epart => - val vidToIndex = new VertexIdToIndexMap() + protected def createLocalVidMap[ED: ClassManifest](eTable: RDD[(Pid, EdgePartition[ED])]): + RDD[(Pid, VertexIdToIndexMap)] = { + eTable.mapPartitions( _.map{ case (pid, epart) => + val vidToIndex = new VertexIdToIndexMap var i = 0 epart.foreach{ e => if(!vidToIndex.contains(e.srcId)) { @@ -491,16 +497,16 @@ object GraphImpl { i += 1 } } - vidToIndex - } + (pid, vidToIndex) + }, preservesPartitioning = true).cache() } protected def createVTableReplicated[VD: ClassManifest]( - vTable: IndexedRDD[Vid, VD], - vid2pid: IndexedRDD[Vid, Array[Pid]], - replicationMap: IndexedRDD[Pid, VertexIdToIndexMap]): - IndexedRDD[Pid, Array[VD]] = 
{ + vTable: VertexSetRDD[VD], + vid2pid: VertexSetRDD[Array[Pid]], + replicationMap: RDD[(Pid, VertexIdToIndexMap)]): + RDD[(Pid, Array[VD])] = { // Join vid2pid and vTable, generate a shuffle dependency on the joined // result, and get the shuffle id so we can use it on the slave. val msgsByPartition = vTable.zipJoin(vid2pid) @@ -509,9 +515,9 @@ object GraphImpl { } .partitionBy(replicationMap.partitioner.get).cache() - val newValuesRDD = replicationMap.valuesRDD.zipPartitions(msgsByPartition){ + replicationMap.zipPartitions(msgsByPartition){ (mapIter, msgsIter) => - val (IndexedSeq(vidToIndex), bs) = mapIter.next() + val (pid, vidToIndex) = mapIter.next() assert(!mapIter.hasNext) // Populate the vertex array using the vidToIndex map val vertexArray = new Array[VD](vidToIndex.size) @@ -519,14 +525,12 @@ object GraphImpl { val ind = vidToIndex(msg.data._1) vertexArray(ind) = msg.data._2 } - Iterator((IndexedSeq(vertexArray), bs)) - } - - new IndexedRDD(replicationMap.index, newValuesRDD) + Iterator((pid, vertexArray)) + }.cache() // @todo assert edge table has partitioner - // val localVidMap: IndexedRDD[Pid, VertexIdToIndexMap] = + // val localVidMap: VertexSetRDD[Pid, VertexIdToIndexMap] = // msgsByPartition.mapPartitionsWithIndex( (pid, iter) => { // val vidToIndex = new VertexIdToIndexMap // var i = 0 @@ -537,7 +541,7 @@ object GraphImpl { // Array((pid, vidToIndex)).iterator // }, preservesPartitioning = true).indexed(eTable.index) - // val vTableReplicatedValues: IndexedRDD[Pid, Array[VD]] = + // val vTableReplicatedValues: VertexSetRDD[Pid, Array[VD]] = // msgsByPartition.mapPartitionsWithIndex( (pid, iter) => { // val vertexArray = ArrayBuilder.make[VD] // for (msg <- iter) {
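Taken together, these changes let graph operators hand vertex-keyed results straight back to the vertex set. A closing sketch that mirrors the collectNeighborIds pattern from GraphOps above; the graph value, the helper name, and the surrounding imports are assumptions.

import org.apache.spark.graph._

// Attach each vertex's out-degree, defaulting to 0 for vertices with no out-edges.
def withOutDegrees[VD: ClassManifest, ED: ClassManifest](
    g: Graph[VD, ED]): VertexSetRDD[(VD, Int)] = {
  val ops = new GraphOps(g)
  g.vertices.leftZipJoin(ops.outDegrees).mapValues {
    case (attr, Some(deg)) => (attr, deg)
    case (attr, None) => (attr, 0)
  }
}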