diff --git a/core/src/main/scala/org/apache/spark/rdd/IndexedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/IndexedRDD.scala
index 30566adba37d7eeefe5f3db9ff98526680429bab..2f1f907c6c746c7b6ff28844117c31a5799d83e5 100644
--- a/core/src/main/scala/org/apache/spark/rdd/IndexedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/IndexedRDD.scala
@@ -72,7 +72,7 @@ class RDDIndex[@specialized K: ClassManifest](private[spark] val rdd: RDD[BlockI
  */
 class IndexedRDD[K: ClassManifest, V: ClassManifest](
     @transient val index: RDDIndex[K],
-    @transient val valuesRDD: RDD[ (Array[V], BitSet) ])
+    @transient val valuesRDD: RDD[ (Seq[V], BitSet) ])
   extends RDD[(K, V)](index.rdd.context,
     List(new OneToOneDependency(index.rdd), new OneToOneDependency(valuesRDD)) ) {
 
@@ -113,46 +113,160 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
   }
 
+  /**
+   * Pass each value in the key-value pair RDD through a map function without changing the keys;
+   * this also retains the original RDD's partitioning.
+   */
+  def mapValues[U: ClassManifest](f: V => U): IndexedRDD[K, U] = {
+    val cleanF = index.rdd.context.clean(f)
+    val newValuesRDD = valuesRDD.mapPartitions(iter => iter.map{
+      case (values, bs) =>
+        val newValues = new Array[U](values.size)
+        for ( ind <- bs ) {
+          newValues(ind) = cleanF(values(ind))
+        }
+        (newValues.toSeq, bs)
+    }, preservesPartitioning = true)
+    new IndexedRDD[K,U](index, newValuesRDD)
+  }
+
+  /**
+   * Pass each value in the key-value pair RDD through a map function that also sees the key,
+   * without changing the keys; this also retains the original RDD's partitioning.
+   */
+  def mapValuesWithKeys[U: ClassManifest](f: (K, V) => U): IndexedRDD[K, U] = {
+    val cleanF = index.rdd.context.clean(f)
+    val newValues = index.rdd.zipPartitions(valuesRDD){
+      (keysIter, valuesIter) =>
+      val index = keysIter.next()
+      assert(!keysIter.hasNext)
+      val (oldValues, bs) = valuesIter.next()
+      assert(!valuesIter.hasNext)
+      // Allocate the array to store the results into
+      val newValues: Array[U] = new Array[U](oldValues.size)
+      // Populate the new values
+      for( (k,i) <- index ) {
+        if (bs(i)) { newValues(i) = cleanF(k, oldValues(i)) }
+      }
+      Iterator((newValues.toSeq, bs))
+    }
+    new IndexedRDD[K,U](index, newValues)
+  }
 
-  def zipJoinRDD[W: ClassManifest](other: IndexedRDD[K,W]): RDD[(K,(V,W))] = {
-    assert(index == other.index)
-    index.rdd.zipPartitions(valuesRDD, other.valuesRDD){
-      (thisIndexIter, thisIter, otherIter) =>
-      val index = thisIndexIter.next()
-      assert(!thisIndexIter.hasNext)
+
+  def zipJoin[W: ClassManifest](other: IndexedRDD[K,W]): IndexedRDD[K,(V,W)] = {
+    if(index != other.index) {
+      throw new SparkException("A zipJoin can only be applied to RDDs with the same index!")
+    }
+    val newValuesRDD: RDD[ (Seq[(V,W)], BitSet) ] = valuesRDD.zipPartitions(other.valuesRDD){
+      (thisIter, otherIter) =>
       val (thisValues, thisBS) = thisIter.next()
       assert(!thisIter.hasNext)
       val (otherValues, otherBS) = otherIter.next()
       assert(!otherIter.hasNext)
       val newBS = thisBS & otherBS
-
-      index.iterator.flatMap{ case (k,i) =>
-        if(newBS(i)) List((k, (thisValues(i), otherValues(i))))
-        else List.empty
-      }
+      val newValues = thisValues.view.zip(otherValues)
+      Iterator((newValues, newBS))
     }
+    new IndexedRDD(index, newValuesRDD)
   }
 
-  def zipJoin[W: ClassManifest](other: IndexedRDD[K,W]): IndexedRDD[K,(V,W)] = {
-    assert(index == other.index)
-    val newValuesRDD = valuesRDD.zipPartitions(other.valuesRDD){
+
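A quick sketch of the per-partition representation these operators build on (illustrative values, not part of the patch): each partition pairs a positionally indexed value sequence with a BitSet marking live slots, so mapValues touches only live slots and zipJoin of two RDDs sharing an index reduces to a bitset intersection.

import scala.collection.mutable.BitSet

val values: Seq[Int] = Seq(10, 0, 30)   // slot 1 is a hole (no live entry)
val bs = new BitSet(3)
bs(0) = true; bs(2) = true              // slots 0 and 2 hold live values
// mapValues rewrites only live slots; the index and the holes are untouched
val doubled = values.zipWithIndex.map { case (v, i) => if (bs(i)) v * 2 else v }
// zipJoin on index-aligned partitions: intersect the liveness bitsets
val otherBS = new BitSet(3); otherBS(0) = true
val joinedBS = bs & otherBS             // only slot 0 is live on both sides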
+  def leftZipJoin[W: ClassManifest](other: IndexedRDD[K,W]): IndexedRDD[K,(V,Option[W])] = {
+    if(index != other.index) {
+      throw new SparkException("A leftZipJoin can only be applied to RDDs with the same index!")
+    }
+    val newValuesRDD: RDD[ (Seq[(V,Option[W])], BitSet) ] = valuesRDD.zipPartitions(other.valuesRDD){
+      (thisIter, otherIter) =>
       val (thisValues, thisBS) = thisIter.next()
       assert(!thisIter.hasNext)
       val (otherValues, otherBS) = otherIter.next()
       assert(!otherIter.hasNext)
-      val newBS = thisBS & otherBS
-      val newValues = new Array[(V,W)](thisValues.size)
-      for( i <- newBS ) {
-        newValues(i) = (thisValues(i), otherValues(i))
-      }
-      List((newValues, newBS)).iterator
+      val otherOption = otherValues.view.zipWithIndex
+        .map{ (x: (W, Int)) => if(otherBS(x._2)) Option(x._1) else None }
+      val newValues = thisValues.view.zip(otherOption)
+      Iterator((newValues, thisBS))
     }
     new IndexedRDD(index, newValuesRDD)
   }
+
+  def leftJoin[W: ClassManifest](
+    other: RDD[(K,W)], merge: (W,W) => W = (a:W, b:W) => a):
+    IndexedRDD[K, (V, Option[W]) ] = {
+    val cleanMerge = index.rdd.context.clean(merge)
+    other match {
+      case other: IndexedRDD[K, W] if index == other.index => {
+        leftZipJoin(other)
+      }
+      case _ => {
+        // Get the partitioner from the index
+        val partitioner = index.rdd.partitioner match {
+          case Some(p) => p
+          case None => throw new SparkException("An index must have a partitioner.")
+        }
+        // Shuffle the other RDD using the partitioner for this index
+        val otherShuffled =
+          if (other.partitioner == Some(partitioner)) other
+          else other.partitionBy(partitioner)
+        val newValues = index.rdd.zipPartitions(valuesRDD, otherShuffled) {
+          (thisIndexIter, thisIter, tuplesIter) =>
+          val index = thisIndexIter.next()
+          assert(!thisIndexIter.hasNext)
+          val (thisValues, thisBS) = thisIter.next()
+          assert(!thisIter.hasNext)
+          val newW = new Array[W](thisValues.size)
+          // track which values are matched with values in other
+          val wBS = new BitSet(thisValues.size)
+          for( (k, w) <- tuplesIter if index.contains(k) ) {
+            val ind = index.get(k)
+            if(thisBS(ind)) {
+              if(wBS(ind)) {
+                newW(ind) = cleanMerge(newW(ind), w)
+              } else {
+                newW(ind) = w
+                wBS(ind) = true
+              }
+            }
+          }
+
+          val otherOption = newW.view.zipWithIndex
+            .map{ (x: (W, Int)) => if(wBS(x._2)) Option(x._1) else None }
+          val newValues = thisValues.view.zip(otherOption)
+
+          Iterator((newValues.toSeq, thisBS))
+        } // end of newValues
+        new IndexedRDD(index, newValues)
+      }
+    }
+  }
+
+
+
+  //
+  // def zipJoinToRDD[W: ClassManifest](other: IndexedRDD[K,W]): RDD[(K,(V,W))] = {
+  //   if(index != other.index) {
+  //     throw new SparkException("ZipJoinRDD can only be applied to RDDs with the same index!")
+  //   }
+  //   index.rdd.zipPartitions(valuesRDD, other.valuesRDD){
+  //     (thisIndexIter, thisIter, otherIter) =>
+  //     val index = thisIndexIter.next()
+  //     assert(!thisIndexIter.hasNext)
+  //     val (thisValues, thisBS) = thisIter.next()
+  //     assert(!thisIter.hasNext)
+  //     val (otherValues, otherBS) = otherIter.next()
+  //     assert(!otherIter.hasNext)
+  //     val newBS = thisBS & otherBS
+  //     index.iterator.filter{ case (k,i) => newBS(i) }.map{
+  //       case (k,i) => (k, (thisValues(i), otherValues(i)))
+  //     }
+  //   }
+  // }
+
+
+/* This is probably useful but we are not using it
   def zipJoinWithKeys[W: ClassManifest, Z: ClassManifest](
     other: RDD[(K,W)])(
     f: (K, V, W) => Z,
@@ -222,9 +336,9 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
       }
     }
   }
+*/
-
-
+/*
   def zipJoinLeftWithKeys[W: ClassManifest, Z: ClassManifest](
     other: RDD[(K,W)])(
     f: (K, V, Option[W]) => Z,
@@ -299,7 +413,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
     }
   }
 
-
+*/
 
   /**
@@ -320,15 +434,13 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
       assert(keysIter.hasNext() == false)
       val (oldValues, bs) = valuesIter.next()
       assert(valuesIter.hasNext() == false)
-      // Allocate the array to store the results into
-      val newValues: Array[V] = oldValues.clone().asInstanceOf[Array[V]]
+      // Allocate the bitset marking which entries pass the filter
       val newBS = new BitSet(oldValues.size)
       // Populate the new Values
       for( (k,i) <- index ) {
-        if ( bs(i) && f( (k, oldValues(i)) ) ) { newBS(i) = true }
-        else { newValues(i) = null.asInstanceOf[V] }
+        newBS(i) = bs(i) && cleanF( (k, oldValues(i)) )
       }
-      Array((newValues, newBS)).iterator
+      Iterator((oldValues, newBS))
     }
     new IndexedRDD[K,V](index, newValues)
   }
@@ -371,7 +483,7 @@ object IndexedRDD {
 
     val groups = preAgg.mapPartitions( iter => {
       val indexMap = new BlockIndex[K]()
-      val values = new ArrayBuffer[V]()
+      val values = new ArrayBuffer[V]
       val bs = new BitSet
       for ((k,v) <- iter) {
         if(!indexMap.contains(k)) {
@@ -384,7 +496,7 @@ object IndexedRDD {
           values(ind) = reduceFunc(values(ind), v)
         }
       }
-      List( (indexMap, (values.toArray, bs)) ).iterator
+      Iterator( (indexMap, (values.toSeq, bs)) )
     }, true).cache
     // extract the index and the values
    val index = groups.mapPartitions(_.map{ case (kMap, vAr) => kMap }, true)
@@ -468,7 +580,7 @@ object IndexedRDD {
     }
 
     // Use the index to build the new values table
-    val values = index.rdd.zipPartitions(partitioned)( (indexIter, tblIter) => {
+    val values: RDD[ (Seq[C], BitSet) ] = index.rdd.zipPartitions(partitioned)( (indexIter, tblIter) => {
       // There is only one map
       val index = indexIter.next()
       assert(!indexIter.hasNext())
@@ -487,7 +599,7 @@ object IndexedRDD {
           bs(ind) = true
         }
       }
-      List((values, bs)).iterator
+      Iterator((values, bs))
     })
     new IndexedRDD(index, values)
   } // end of apply
@@ -521,7 +633,7 @@ object IndexedRDD {
           indexMap.put(k, ind)
         }
       }
-      List(indexMap).iterator
+      Iterator(indexMap)
     }, true).cache
     new RDDIndex(index)
   }
diff --git a/core/src/main/scala/org/apache/spark/rdd/IndexedRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/IndexedRDDFunctions.scala
index e497ef691f4ca28eee7718d4692cdcd7974e0b2e..0310711d371e119d2e312f3844fc776aaa71e870 100644
--- a/core/src/main/scala/org/apache/spark/rdd/IndexedRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/IndexedRDDFunctions.scala
@@ -37,48 +37,22 @@ class IndexedRDDFunctions[K: ClassManifest, V: ClassManifest](self: IndexedRDD[K
 
   def reindex(): IndexedRDD[K,V] = IndexedRDD(self)
 
-  /**
-   * Pass each value in the key-value pair RDD through a map function without changing the keys;
-   * this also retains the original RDD's partitioning.
-   */
-  override def mapValues[U: ClassManifest](f: V => U): RDD[(K, U)] = {
-    val cleanF = self.index.rdd.context.clean(f)
-    val newValuesRDD = self.valuesRDD.mapPartitions(iter => iter.map{
-      case (values, bs) =>
-        val newValues = new Array[U](values.size)
-        for ( ind <- bs ) {
-          newValues(ind) = f(values(ind))
-        }
-        (newValues, bs)
-      }, preservesPartitioning = true)
-    new IndexedRDD[K,U](self.index, newValuesRDD)
-  }
-
-
-  /**
-   * Pass each value in the key-value pair RDD through a map function without changing the keys;
-   * this also retains the original RDD's partitioning.
-   */
-  override def mapValuesWithKeys[U: ClassManifest](f: (K, V) => U): RDD[(K, U)] = {
-    val cleanF = self.index.rdd.context.clean(f)
-    val newValues = self.index.rdd.zipPartitions(self.valuesRDD){
-      (keysIter, valuesIter) =>
-      val index = keysIter.next()
-      assert(keysIter.hasNext() == false)
-      val (oldValues, bs) = valuesIter.next()
-      assert(valuesIter.hasNext() == false)
-      // Allocate the array to store the results into
-      val newValues: Array[U] = new Array[U](oldValues.size)
-      // Populate the new Values
-      for( (k,i) <- index ) {
-        if (bs(i)) { newValues(i) = f(k, oldValues(i)) }
-      }
-      Array((newValues, bs)).iterator
-    }
-    new IndexedRDD[K,U](self.index, newValues)
-  }
-
-
+  // /**
+  //  * Pass each value in the key-value pair RDD through a map function without changing the keys;
+  //  * this also retains the original RDD's partitioning.
+  //  */
+  // override def mapValues[U: ClassManifest](f: V => U): RDD[(K, U)] = {
+  //   val cleanF = self.index.rdd.context.clean(f)
+  //   val newValuesRDD = self.valuesRDD.mapPartitions(iter => iter.map{
+  //     case (values, bs) =>
+  //       val newValues = new Array[U](values.size)
+  //       for ( ind <- bs ) {
+  //         newValues(ind) = f(values(ind))
+  //       }
+  //       (newValues.toSeq, bs)
+  //     }, preservesPartitioning = true)
+  //   new IndexedRDD[K,U](self.index, newValuesRDD)
+  // }
 
   /**
    * Pass each value in the key-value pair RDD through a flatMap function without changing the
@@ -97,7 +71,7 @@ class IndexedRDDFunctions[K: ClassManifest, V: ClassManifest](self: IndexedRDD[K
           newBS(ind) = true
         }
       }
-      (newValues, newBS)
+      (newValues.toSeq, newBS)
     }, preservesPartitioning = true)
     new IndexedRDD[K,U](self.index, newValuesRDD)
   }
@@ -162,7 +136,7 @@ class IndexedRDDFunctions[K: ClassManifest, V: ClassManifest](self: IndexedRDD[K
         val b = if (otherBS(ind)) Seq(otherValues(ind)) else Seq.empty[W]
         newValues(ind) = (a, b)
       }
-      List((newValues, newBS)).iterator
+      Iterator((newValues.toSeq, newBS))
     }
     new IndexedRDD(self.index, newValues)
   }
@@ -225,7 +199,7 @@ class IndexedRDDFunctions[K: ClassManifest, V: ClassManifest](self: IndexedRDD[K
             newBS(ind) = true
           }
         }
-        List((newValues, newBS)).iterator
+        Iterator((newValues.toSeq, newBS))
       })
     new IndexedRDD(new RDDIndex(newIndex), newValues)
   }
@@ -288,7 +262,7 @@ class IndexedRDDFunctions[K: ClassManifest, V: ClassManifest](self: IndexedRDD[K
       //   case null => null
       //   case (s, ab) => Seq((s, ab.toSeq))
      //   }.toSeq
-      List( (newIndex, (newValues.toArray, newBS)) ).iterator
+      Iterator( (newIndex, (newValues.toSeq, newBS)) )
     }).cache()
 
     // Extract the index and values from the above RDD
diff --git a/graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala b/graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala
index c2ef63d1fd3d81e42baacd92ebc7af2da189615f..ef3aa199bdf419a59c6c94405cd946a86b950136 100644
--- a/graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala
@@ -28,10 +28,11 @@ class EdgeTriplet[VD, ED] extends Edge[ED] {
 
   /**
    * Set the edge properties of this triplet.
    */
-  protected[spark] def set(other: Edge[ED]) {
+  protected[spark] def set(other: Edge[ED]): EdgeTriplet[VD,ED] = {
     srcId = other.srcId
     dstId = other.dstId
     attr = other.attr
+    this
   }
 
   /**
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
index 87fb9dcd2e05f0f6410baa459c9723da98443a7b..ce1b9467c407d3838716d06528452de239a4a98c 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
@@ -5,6 +5,8 @@ import scala.collection.JavaConversions._
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.ArrayBuilder
+import scala.collection.mutable.BitSet
+
 
 import org.apache.spark.SparkContext._
 import org.apache.spark.Partitioner
@@ -40,7 +42,6 @@ class EdgeTripletIterator[VD: ClassManifest, ED: ClassManifest](
       et.dstId = edgePartition.dstIds(pos)
       // assert(vmap.containsKey(e.dst.id))
       et.dstAttr = vertexArray(vidToIndex(et.dstId))
-      //println("Iter called: " + pos)
       et.attr = edgePartition.data(pos)
       pos += 1
       et
@@ -63,32 +64,47 @@ class EdgeTripletIterator[VD: ClassManifest, ED: ClassManifest](
     }
   }
 } // end of Edge Triplet Iterator
 
+
+
 object EdgeTripletBuilder {
   def makeTriplets[VD: ClassManifest, ED: ClassManifest](
-    vTableReplicationMap: IndexedRDD[Pid, VertexIdToIndexMap],
+    localVidMap: IndexedRDD[Pid, VertexIdToIndexMap],
     vTableReplicatedValues: IndexedRDD[Pid, Array[VD]],
     eTable: IndexedRDD[Pid, EdgePartition[ED]]): RDD[EdgeTriplet[VD, ED]] = {
     val iterFun = (iter: Iterator[(Pid, ((VertexIdToIndexMap, Array[VD]), EdgePartition[ED]))]) => {
       val (pid, ((vidToIndex, vertexArray), edgePartition)) = iter.next()
-      //assert(iter.hasNext == false)
-      // Return an iterator that looks up the hash map to find matching
-      // vertices for each edge.
+      assert(iter.hasNext == false)
       new EdgeTripletIterator(vidToIndex, vertexArray, edgePartition)
     }
     ClosureCleaner.clean(iterFun)
-    vTableReplicationMap.zipJoin(vTableReplicatedValues).zipJoinRDD(eTable)
+    localVidMap.zipJoin(vTableReplicatedValues).zipJoin(eTable)
       .mapPartitions( iterFun ) // end of map partition
   }
-}
+// {
+//   val iterFun = (iter: Iterator[(Pid, ((VertexIdToIndexMap, Array[VD]), EdgePartition[ED]))]) => {
+//     val (pid, ((vidToIndex, vertexArray), edgePartition)) = iter.next()
+//     assert(iter.hasNext == false)
+//     // Return an iterator that looks up the hash map to find matching
+//     // vertices for each edge.
+//     new EdgeTripletIterator(vidToIndex, vertexArray, edgePartition)
+//   }
+//   ClosureCleaner.clean(iterFun)
+//   localVidMap.zipJoin(vTableReplicatedValues).zipJoinRDD(eTable)
+//     .mapPartitions( iterFun ) // end of map partition
+// }
+// }
+
+
 
 /**
  * A Graph RDD that supports computation on graphs.
 */
 class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
     @transient val vTable: IndexedRDD[Vid, VD],
     @transient val vid2pid: IndexedRDD[Vid, Array[Pid]],
+    @transient val localVidMap: IndexedRDD[Pid, VertexIdToIndexMap],
     @transient val eTable: IndexedRDD[Pid, EdgePartition[ED]])
   extends Graph[VD, ED] {
 
@@ -96,7 +112,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
 
   /**
-   * (vTableReplicationMap: IndexedRDD[Pid, VertexIdToIndexMap]) is a version of the
+   * (localVidMap: IndexedRDD[Pid, VertexIdToIndexMap]) is a version of the
    * vertex data after it is replicated. Within each partition, it holds a map
    * from vertex ID to the index where that vertex's attribute is stored. This
   * index refers to an array in the same partition in vTableReplicatedValues.
@@ -104,8 +120,8 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
    * (vTableReplicatedValues: IndexedRDD[Pid, Array[VD]]) holds the vertex data
    * and is arranged as described above.
    */
-  @transient val (vTableReplicationMap, vTableReplicatedValues) =
-    createVTableReplicated(vTable, vid2pid, eTable)
+  @transient val vTableReplicatedValues =
+    createVTableReplicated(vTable, vid2pid, localVidMap)
 
   /** Return a RDD of vertices. */
@@ -119,8 +135,8 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
 
   /** Return a RDD that brings edges with its source and destination vertices together. */
-  @transient override val triplets: RDD[EdgeTriplet[VD, ED]] =
-    EdgeTripletBuilder.makeTriplets(vTableReplicationMap, vTableReplicatedValues, eTable)
+  @transient override val triplets: RDD[EdgeTriplet[VD, ED]] =
+    EdgeTripletBuilder.makeTriplets(localVidMap, vTableReplicatedValues, eTable)
 
 
   // {
@@ -164,26 +180,26 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
 
   override def reverse: Graph[VD, ED] = {
     val etable = eTable.mapValues( _.reverse ).asInstanceOf[IndexedRDD[Pid, EdgePartition[ED]]]
-    new GraphImpl(vTable, vid2pid, etable)
+    new GraphImpl(vTable, vid2pid, localVidMap, etable)
   }
 
   override def mapVertices[VD2: ClassManifest](f: (Vid, VD) => VD2): Graph[VD2, ED] = {
     val newVTable = vTable.mapValuesWithKeys((vid, data) => f(vid, data))
       .asInstanceOf[IndexedRDD[Vid, VD2]]
-    new GraphImpl(newVTable, vid2pid, eTable)
+    new GraphImpl(newVTable, vid2pid, localVidMap, eTable)
   }
 
   override def mapEdges[ED2: ClassManifest](f: Edge[ED] => ED2): Graph[VD, ED2] = {
     val newETable = eTable.mapValues(eBlock => eBlock.map(f))
       .asInstanceOf[IndexedRDD[Pid, EdgePartition[ED2]]]
-    new GraphImpl(vTable, vid2pid, newETable)
+    new GraphImpl(vTable, vid2pid, localVidMap, newETable)
   }
 
   override def mapTriplets[ED2: ClassManifest](f: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = {
-    val newETable = eTable.zipJoin(vTableReplicationMap).zipJoin(vTableReplicatedValues).mapValues{
+    val newETable = eTable.zipJoin(localVidMap).zipJoin(vTableReplicatedValues).mapValues{
       case ((edgePartition, vidToIndex), vertexArray) =>
         val et = new EdgeTriplet[VD, ED]
         edgePartition.map{e =>
@@ -193,7 +209,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
           f(et)
         }
     }.asInstanceOf[IndexedRDD[Pid, EdgePartition[ED2]]]
-    new GraphImpl(vTable, vid2pid, newETable)
+    new GraphImpl(vTable, vid2pid, localVidMap, newETable)
   }
 
   // override def correctEdges(): Graph[VD, ED] = {
@@ -239,8 +255,9 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
     // behaves deterministically.
     // @todo reindex the vertex and edge tables
     val newVid2Pid = createVid2Pid(newETable, newVTable.index)
+    val newVidMap = createLocalVidMap(newETable)
 
-    new GraphImpl(newVTable, newVid2Pid, newETable)
+    new GraphImpl(newVTable, newVid2Pid, newVidMap, newETable)
   }
 
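For reference, a toy illustration (not part of the patch) of what createLocalVidMap, introduced further down, computes per edge partition, and why subgraph above must rebuild it for a new eTable: the map assigns dense local indices to vertex ids in first-seen order, so vertexArray(vidToIndex.get(vid)) yields the replicated attribute for vid.

import it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap

val vidToIndex = new Long2IntOpenHashMap()
var i = 0
// edges (1 -> 2) and (2 -> 3) touch vertex ids 1, 2, 2, 3
for (vid <- Seq(1L, 2L, 2L, 3L) if !vidToIndex.containsKey(vid)) {
  vidToIndex.put(vid, i)
  i += 1
}
// vidToIndex now maps 1 -> 0, 2 -> 1, 3 -> 2; a new edge table means new indices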
@@ -276,9 +293,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
       // Because all ETs with the same src and dst will live on the same
       // partition due to the EdgePartitioner, this guarantees that these
       // ET groups will be complete.
-      .groupBy { t: EdgeTriplet[VD, ED] =>
-        //println("(" + t.src.id + ", " + t.dst.id + ", " + t.data + ")")
-        (t.srcId, t.dstId) }
+      .groupBy { t: EdgeTriplet[VD, ED] => (t.srcId, t.dstId) }
       //.groupBy { e => (e.src, e.dst) }
       // Apply the user supplied supplied edge group function to
       // each group of edges
@@ -308,7 +323,8 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
 
     val newETable = createETable(newEdges, eTable.index.partitioner.numPartitions)
 
+
-    new GraphImpl(vTable, vid2pid, newETable)
+    new GraphImpl(vTable, vid2pid, localVidMap, newETable)
   }
 
@@ -318,9 +334,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
 
     val newEdges: RDD[Edge[ED2]] = edges.mapPartitions { partIter =>
       partIter.toList
-      .groupBy { e: Edge[ED] =>
-        println(e.srcId + " " + e.dstId)
-        (e.srcId, e.dstId) }
+      .groupBy { e: Edge[ED] => (e.srcId, e.dstId) }
       .mapValues { ts => f(ts.toIterator) }
       .toList
       .toIterator
@@ -330,7 +344,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
 
     val newETable = createETable(newEdges, eTable.index.partitioner.numPartitions)
 
-    new GraphImpl(vTable, vid2pid, newETable)
+    new GraphImpl(vTable, vid2pid, localVidMap, newETable)
   }
 
 
@@ -348,26 +362,45 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
     ClosureCleaner.clean(reduceFunc)
 
     // Map and preaggregate
-    val preAgg = vTableReplicationMap.zipJoin(vTableReplicatedValues).zipJoinRDD(eTable).flatMap{
+    val preAgg = localVidMap.zipJoin(vTableReplicatedValues).zipJoin(eTable).flatMap{
       case (pid, ((vidToIndex, vertexArray), edgePartition)) =>
-        val aggMap = new VertexHashMap[A]
+        // We can reuse the vidToIndex map for aggregation here as well.
+        /** @todo Since this has the downside of not allowing "messages" to arbitrary
+         * vertices we should consider just using a fresh map.
+         */
+        val msgArray = new Array[A](vertexArray.size)
+        val msgBS = new BitSet(vertexArray.size)
+        // Iterate over the partition
         val et = new EdgeTriplet[VD, ED]
         edgePartition.foreach{e =>
           et.set(e)
           et.srcAttr = vertexArray(vidToIndex(e.srcId))
          et.dstAttr = vertexArray(vidToIndex(e.dstId))
-          mapFunc(et).foreach{case (vid, a) =>
-            if(aggMap.containsKey(vid)) {
-              aggMap.put(vid, reduceFunc(aggMap.get(vid), a))
-            } else { aggMap.put(vid, a) }
+          mapFunc(et).foreach{ case (vid, msg) =>
+            // verify that the vid is valid
+            assert(vid == et.srcId || vid == et.dstId)
+            val ind = vidToIndex(vid)
+            // Populate the aggregator map
+            if(msgBS(ind)) {
+              msgArray(ind) = reduceFunc(msgArray(ind), msg)
+            } else {
+              msgArray(ind) = msg
+              msgBS(ind) = true
+            }
           }
         }
         // Return the aggregate map
-        aggMap.long2ObjectEntrySet().fastIterator().map{
-          entry => (entry.getLongKey(), entry.getValue())
+        vidToIndex.long2IntEntrySet().fastIterator()
+          // Remove the entries that did not receive a message
+          .filter{ entry => msgBS(entry.getValue()) }
+          // Construct the actual pairs
+          .map{ entry =>
+            val vid = entry.getLongKey()
+            val ind = entry.getValue()
+            val msg = msgArray(ind)
+            (vid, msg)
           }
     }.partitionBy(vTable.index.rdd.partitioner.get)
-    // do the final reduction reusing the index map
     IndexedRDD(preAgg, vTable.index, reduceFunc)
   }
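The hunk above swaps the per-partition VertexHashMap for a flat message array guarded by a BitSet, indexed by the same local vertex indices as vertexArray. A single-partition sketch of that reduce pattern (illustrative names, not part of the patch):

import scala.collection.mutable.BitSet

def preAggregate[A: ClassManifest](numVerts: Int, msgs: Iterator[(Int, A)],
    reduce: (A, A) => A): Iterator[(Int, A)] = {
  val msgArray = new Array[A](numVerts)   // one slot per local vertex index
  val msgBS = new BitSet(numVerts)        // marks slots that received a message
  for ((ind, msg) <- msgs) {
    if (msgBS(ind)) msgArray(ind) = reduce(msgArray(ind), msg)
    else { msgArray(ind) = msg; msgBS(ind) = true }
  }
  msgBS.iterator.map(ind => (ind, msgArray(ind)))   // emit only populated slots
}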
 
@@ -377,8 +410,9 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
     (updates: RDD[(Vid, U)])(updateF: (Vid, VD, Option[U]) => VD2)
     : Graph[VD2, ED] = {
     ClosureCleaner.clean(updateF)
-    val newVTable = vTable.zipJoinLeftWithKeys(updates)(updateF)
-    new GraphImpl(newVTable, vid2pid, eTable)
+    val newVTable = vTable.leftJoin(updates).mapValuesWithKeys(
+      (vid, vu) => updateF(vid, vu._1, vu._2) )
+    new GraphImpl(newVTable, vid2pid, localVidMap, eTable)
   }
 
 
@@ -417,8 +451,8 @@ object GraphImpl {
     val vtable = vertices.indexed(numVPart)
     val etable = createETable(edges, numEPart)
     val vid2pid = createVid2Pid(etable, vtable.index)
-
-    new GraphImpl(vtable, vid2pid, etable)
+    val localVidMap = createLocalVidMap(etable)
+    new GraphImpl(vtable, vid2pid, localVidMap, etable)
   }
 
 
@@ -476,40 +510,77 @@ object GraphImpl {
   }
 
-  protected def createVTableReplicated[VD: ClassManifest, ED: ClassManifest](
-      vTable: IndexedRDD[Vid, VD], vid2pid: IndexedRDD[Vid, Array[Pid]],
-      eTable: IndexedRDD[Pid, EdgePartition[ED]]):
-    (IndexedRDD[Pid, VertexIdToIndexMap], IndexedRDD[Pid, Array[VD]]) = {
+  protected def createLocalVidMap[ED: ClassManifest](
+      eTable: IndexedRDD[Pid, EdgePartition[ED]]): IndexedRDD[Pid, VertexIdToIndexMap] = {
+    eTable.mapValues{ epart =>
+      val vidToIndex = new VertexIdToIndexMap()
+      var i = 0
+      epart.foreach{ e =>
+        if(!vidToIndex.contains(e.srcId)) {
+          vidToIndex.put(e.srcId, i)
+          i += 1
+        }
+        if(!vidToIndex.contains(e.dstId)) {
+          vidToIndex.put(e.dstId, i)
+          i += 1
+        }
+      }
+      vidToIndex
+    }
+  }
+
+
+  protected def createVTableReplicated[VD: ClassManifest](
+      vTable: IndexedRDD[Vid, VD],
+      vid2pid: IndexedRDD[Vid, Array[Pid]],
+      replicationMap: IndexedRDD[Pid, VertexIdToIndexMap]):
+    IndexedRDD[Pid, Array[VD]] = {
     // Join vid2pid and vTable, generate a shuffle dependency on the joined
     // result, and get the shuffle id so we can use it on the slave.
-    val msgsByPartition =
-      vTable.zipJoinRDD(vid2pid)
-        .flatMap { case (vid, (vdata, pids)) =>
-          pids.iterator.map { pid => MessageToPartition(pid, (vid, vdata)) }
-        }
-        .partitionBy(eTable.partitioner.get).cache()
-    // @todo assert edge table has partitioner
+    val msgsByPartition = vTable.zipJoin(vid2pid)
+      .flatMap { case (vid, (vdata, pids)) =>
+        pids.iterator.map { pid => MessageToPartition(pid, (vid, vdata)) }
+      }
+      .partitionBy(replicationMap.partitioner.get).cache()
+
+    val newValuesRDD = replicationMap.valuesRDD.zipPartitions(msgsByPartition){
+      (mapIter, msgsIter) =>
+      val (Seq(vidToIndex), bs) = mapIter.next()
+      assert(!mapIter.hasNext)
+      // Populate the vertex array using the vidToIndex map
+      val vertexArray = new Array[VD](vidToIndex.size)
+      for (msg <- msgsIter) {
+        val ind = vidToIndex(msg.data._1)
+        vertexArray(ind) = msg.data._2
+      }
+      Iterator((Seq(vertexArray), bs))
+    }
 
-    val vTableReplicationMap: IndexedRDD[Pid, VertexIdToIndexMap] =
-      msgsByPartition.mapPartitionsWithIndex( (pid, iter) => {
-        val vidToIndex = new VertexIdToIndexMap
-        var i = 0
-        for (msg <- iter) {
-          vidToIndex.put(msg.data._1, i)
-        }
-        Array((pid, vidToIndex)).iterator
-      }, preservesPartitioning = true).indexed(eTable.index)
-
-    val vTableReplicatedValues: IndexedRDD[Pid, Array[VD]] =
-      msgsByPartition.mapPartitionsWithIndex( (pid, iter) => {
-        val vertexArray = ArrayBuilder.make[VD]
-        for (msg <- iter) {
-          vertexArray += msg.data._2
-        }
-        Array((pid, vertexArray.result)).iterator
-      }, preservesPartitioning = true).indexed(eTable.index)
+    new IndexedRDD(replicationMap.index, newValuesRDD)
+
+    // @todo assert edge table has partitioner
 
-    (vTableReplicationMap, vTableReplicatedValues)
+    // val localVidMap: IndexedRDD[Pid, VertexIdToIndexMap] =
+    //   msgsByPartition.mapPartitionsWithIndex( (pid, iter) => {
+    //     val vidToIndex = new VertexIdToIndexMap
+    //     var i = 0
+    //     for (msg <- iter) {
+    //       vidToIndex.put(msg.data._1, i)
+    //       i += 1
+    //     }
+    //     Array((pid, vidToIndex)).iterator
+    //   }, preservesPartitioning = true).indexed(eTable.index)
+
+    // val vTableReplicatedValues: IndexedRDD[Pid, Array[VD]] =
+    //   msgsByPartition.mapPartitionsWithIndex( (pid, iter) => {
+    //     val vertexArray = ArrayBuilder.make[VD]
+    //     for (msg <- iter) {
+    //       vertexArray += msg.data._2
+    //     }
+    //     Array((pid, vertexArray.result)).iterator
+    //   }, preservesPartitioning = true).indexed(eTable.index)
+
+    // (localVidMap, vTableReplicatedValues)
 }
diff --git a/graph/src/main/scala/org/apache/spark/graph/package.scala b/graph/src/main/scala/org/apache/spark/graph/package.scala
index 47d5acb9e76ba55a9bac9972c2f382a4e575cec0..4627c3566ca192f675c4129cb23a2ff0398f727b 100644
--- a/graph/src/main/scala/org/apache/spark/graph/package.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/package.scala
@@ -9,7 +9,7 @@ package object graph {
   type VertexSet = it.unimi.dsi.fastutil.longs.LongOpenHashSet
   type VertexArrayList = it.unimi.dsi.fastutil.longs.LongArrayList
   // @todo replace with rxin's fast hashmap
-  type VertexIdToIndexMap = scala.collection.mutable.HashMap[Vid, Int]
+  type VertexIdToIndexMap = it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap
 
   /**
    * Return the default null-like value for a data type T.
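On the final hunk: Long2IntOpenHashMap keeps primitive long -> int entries, avoiding the per-lookup boxing of scala.collection.mutable.HashMap[Vid, Int]. Basic usage (illustrative, not part of the patch):

import it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap

val m = new Long2IntOpenHashMap()
m.put(42L, 0)                  // primitive key and value, no boxing
val idx: Int = m.get(42L)
// The JavaConversions import already present in GraphImpl.scala bridges
// Scala-style access (e.g. vidToIndex(vid)) where call sites rely on it.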