diff --git a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
index f8d54a8f738ce741693a70001bb385b9b5785389..e86d7ef76779867181ab3885c257864375623919 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
@@ -157,6 +157,16 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest](
   /** Return the value at the specified position. */
   def getValue(pos: Int): T = _data(pos)
 
+  def iterator() = new Iterator[T] {
+    var pos = nextPos(0)
+    override def hasNext: Boolean = pos != INVALID_POS
+    override def next(): T = {
+      val tmp = getValue(pos)
+      pos = nextPos(pos+1)
+      tmp
+    }
+  }
+
   /** Return the value at the specified position. */
   def getValueSafe(pos: Int): T = {
     assert(_bitset.get(pos))
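Note: the iterator added above walks the set's occupied slots via nextPos until it reaches INVALID_POS. A minimal usage sketch (the set contents are hypothetical; add and iterator are the only calls assumed, and both appear verbatim in this patch):

{{{
// Sketch: enumerate an OpenHashSet with the iterator defined above.
val set = new OpenHashSet[Long]()
set.add(1L)
set.add(5L)
set.add(5L)                    // duplicate ids are ignored
set.iterator.foreach(println)  // prints 1 and 5, in hash order
}}}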
diff --git a/graph/src/main/scala/org/apache/spark/graph/Graph.scala b/graph/src/main/scala/org/apache/spark/graph/Graph.scala
index acfdc4378b0a0cd612832e8e3860e53690215827..f5b4c57f72902167d0560ed98a1cafbd72b2a439 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Graph.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Graph.scala
@@ -1,7 +1,7 @@
 package org.apache.spark.graph
 
 import org.apache.spark.rdd.RDD
-
+import org.apache.spark.storage.StorageLevel
 
 /**
  * The Graph abstractly represents a graph with arbitrary objects
@@ -12,21 +12,21 @@ import org.apache.spark.rdd.RDD
  * operations return new graphs.
  *
  * @see GraphOps for additional graph member functions.
- * 
+ *
  * @note The majority of the graph operations are implemented in
  * `GraphOps`. All the convenience operations are defined in the
  * `GraphOps` class which may be shared across multiple graph
  * implementations.
  *
  * @tparam VD the vertex attribute type
- * @tparam ED the edge attribute type 
+ * @tparam ED the edge attribute type
  */
 abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
 
   /**
    * Get the vertices and their data.
    *
-   * @note vertex ids are unique. 
+   * @note vertex ids are unique.
    * @return An RDD containing the vertices in this graph
    *
    * @see Vertex for the vertex type.
@@ -70,6 +70,11 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
    */
   val triplets: RDD[EdgeTriplet[VD, ED]]
 
+
+
+  def persist(newLevel: StorageLevel): Graph[VD, ED]
+
+
   /**
    * Return a graph that is cached when first created. This is used to
    * pin a graph in memory enabling multiple queries to reuse the same
@@ -100,7 +105,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
    * @tparam VD2 the new vertex data type
    *
    * @example We might use this operation to change the vertex values
-   * from one type to another to initialize an algorithm. 
+   * from one type to another to initialize an algorithm.
    * {{{
    * val rawGraph: Graph[(), ()] = Graph.textFile("hdfs://file")
    * val root = 42
@@ -190,7 +195,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
    * @return the subgraph containing only the vertices and edges that
    * satisfy the predicates.
    */
-  def subgraph(epred: EdgeTriplet[VD,ED] => Boolean = (x => true), 
+  def subgraph(epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
     vpred: (Vid, VD) => Boolean = ((v,d) => true) ): Graph[VD, ED]
 
 
@@ -255,12 +260,12 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
    * @param reduceFunc the user defined reduce function which should
    * be commutative and assosciative and is used to combine the output
    * of the map phase.
-   * 
+   *
    * @example We can use this function to compute the inDegree of each
    * vertex
    * {{{
    * val rawGraph: Graph[(),()] = Graph.textFile("twittergraph")
-   * val inDeg: RDD[(Vid, Int)] = 
+   * val inDeg: RDD[(Vid, Int)] =
    *   mapReduceTriplets[Int](et => Array((et.dst.id, 1)), _ + _)
    * }}}
    *
@@ -269,12 +274,12 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
    * Graph API in that enables neighborhood level computation. For
    * example this function can be used to count neighbors satisfying a
    * predicate or implement PageRank.
-   * 
+   *
    */
   def mapReduceTriplets[A: ClassManifest](
       mapFunc: EdgeTriplet[VD, ED] => Array[(Vid, A)],
       reduceFunc: (A, A) => A)
-    : VertexSetRDD[A] 
+    : VertexSetRDD[A]
 
 
   /**
@@ -296,11 +301,11 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
    * @example This function is used to update the vertices with new
    * values based on external data. For example we could add the out
    * degree to each vertex record
-   * 
+   *
    * {{{
    * val rawGraph: Graph[(),()] = Graph.textFile("webgraph")
    * val outDeg: RDD[(Vid, Int)] = rawGraph.outDegrees()
-   * val graph = rawGraph.outerJoinVertices(outDeg) { 
+   * val graph = rawGraph.outerJoinVertices(outDeg) {
    *   (vid, data, optDeg) => optDeg.getOrElse(0)
    * }
    * }}}
@@ -337,7 +342,7 @@ object Graph {
    * (i.e., the undirected degree).
    *
    * @param rawEdges the RDD containing the set of edges in the graph
-   * 
+   *
    * @return a graph with edge attributes containing the count of
    * duplicate edges and vertex attributes containing the total degree
    * of each vertex.
@@ -368,10 +373,10 @@ object Graph {
       rawEdges.map { case (s, t) => Edge(s, t, 1) }
     }
     // Determine unique vertices
-    /** @todo Should this reduceByKey operation be indexed? */
-    val vertices: RDD[(Vid, Int)] = 
+    /** @todo Should this reduceByKey operation be indexed? */
+    val vertices: RDD[(Vid, Int)] =
       edges.flatMap{ case Edge(s, t, cnt) => Array((s, 1), (t, 1)) }.reduceByKey(_ + _)
-    
+
     // Return graph
     GraphImpl(vertices, edges, 0)
   }
@@ -392,7 +397,7 @@ object Graph {
    *
    */
   def apply[VD: ClassManifest, ED: ClassManifest](
-      vertices: RDD[(Vid,VD)], 
+      vertices: RDD[(Vid,VD)],
       edges: RDD[Edge[ED]]): Graph[VD, ED] = {
     val defaultAttr: VD = null.asInstanceOf[VD]
     Graph(vertices, edges, defaultAttr, (a:VD,b:VD) => a)
@@ -416,7 +421,7 @@ object Graph {
    *
    */
  def apply[VD: ClassManifest, ED: ClassManifest](
-      vertices: RDD[(Vid,VD)], 
+      vertices: RDD[(Vid,VD)],
       edges: RDD[Edge[ED]],
       defaultVertexAttr: VD,
       mergeFunc: (VD, VD) => VD): Graph[VD, ED] = {
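Note: the new abstract persist member mirrors RDD.persist and lets callers pick a storage level instead of the MEMORY_ONLY implied by cache(). A minimal sketch, assuming `graph` is an already constructed, hypothetical Graph[(), ()]:

{{{
// Sketch: choose a storage level explicitly rather than relying on cache().
import org.apache.spark.storage.StorageLevel
val persisted = graph.persist(StorageLevel.MEMORY_AND_DISK)
// graph.cache() remains shorthand for graph.persist(StorageLevel.MEMORY_ONLY)
}}}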
diff --git a/graph/src/main/scala/org/apache/spark/graph/Pregel.scala b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
index 501e593e917eae3cf4df6940b138d2ff9a5d2c0b..3b4d3c0df2a51ca178194f4623ff02e4baa960ec 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
@@ -98,14 +98,14 @@ object Pregel {
     : Graph[VD, ED] = {
 
     // Receive the first set of messages
-    var g = graph.mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg))
+    var g = graph.mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg)).cache
 
     var i = 0
     while (i < numIter) {
       // compute the messages
       val messages = g.mapReduceTriplets(sendMsg, mergeMsg)
       // receive the messages
-      g = g.joinVertices(messages)(vprog)
+      g = g.joinVertices(messages)(vprog).cache
       // count the iteration
       i += 1
     }
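Note: caching g on every superstep matters because mapReduceTriplets and joinVertices would otherwise recompute the full lineage of all earlier iterations. A sketch of the same pattern outside Pregel, assuming a hypothetical Graph[Int, ()] named degGraph with Int messages:

{{{
// Sketch: per-iteration caching of an iteratively updated graph.
var g = degGraph.cache
var i = 0
while (i < 10) {
  val msgs = g.mapReduceTriplets(
    et => Array((et.dst.id, 1)),   // send a 1 along each edge
    (a: Int, b: Int) => a + b)     // sum the messages per vertex
  g = g.joinVertices(msgs)((vid, attr, msg) => attr + msg).cache
  i += 1
}
}}}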
diff --git a/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala b/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala
index 8611d2f0ce1a98da2adcfd08e6c232dd107b5904..e8b8bb32280f394e3e240fe701675529b7876ebc 100644
--- a/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala
@@ -28,7 +28,7 @@ import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveKeyOpenHa
 * The `VertexSetIndex` maintains the per-partition mapping from
 * vertex id to the corresponding location in the per-partition values
 * array. This class is meant to be an opaque type.
- * 
+ *
 */
 class VertexSetIndex(private[spark] val rdd: RDD[VertexIdToIndexMap]) {
   /**
@@ -55,7 +55,7 @@ class VertexSetIndex(private[spark] val rdd: RDD[VertexIdToIndexMap]) {
 * In addition to providing the basic RDD[(Vid,V)] functionality the
 * VertexSetRDD exposes an index member which can be used to "key"
 * other VertexSetRDDs
- * 
+ *
 * @tparam V the vertex attribute associated with each vertex in the
 * set.
 *
@@ -84,7 +84,7 @@ class VertexSetIndex(private[spark] val rdd: RDD[VertexIdToIndexMap]) {
 class VertexSetRDD[@specialized V: ClassManifest](
     @transient val index: VertexSetIndex,
     @transient val valuesRDD: RDD[ ( Array[V], BitSet) ])
-  extends RDD[(Vid, V)](index.rdd.context, 
+  extends RDD[(Vid, V)](index.rdd.context,
     List(new OneToOneDependency(index.rdd), new OneToOneDependency(valuesRDD)) ) {
 
 
@@ -100,32 +100,32 @@ class VertexSetRDD[@specialized V: ClassManifest](
   * An internal representation which joins the block indices with the values
   * This is used by the compute function to emulate RDD[(Vid, V)]
   */
-  protected[spark] val tuples = 
+  protected[spark] val tuples =
     new ZippedRDD(index.rdd.context, index.rdd, valuesRDD)
 
 
  /**
-   * The partitioner is defined by the index. 
+   * The partitioner is defined by the index.
   */
  override val partitioner = index.rdd.partitioner
-  
+
 
  /**
   * The actual partitions are defined by the tuples.
   */
-  override def getPartitions: Array[Partition] = tuples.getPartitions 
-  
+  override def getPartitions: Array[Partition] = tuples.getPartitions
+
 
  /**
-   * The preferred locations are computed based on the preferred 
-   * locations of the tuples. 
+   * The preferred locations are computed based on the preferred
+   * locations of the tuples.
   */
-  override def getPreferredLocations(s: Partition): Seq[String] = 
+  override def getPreferredLocations(s: Partition): Seq[String] =
    tuples.getPreferredLocations(s)
 
  /**
-   * Caching an VertexSetRDD causes the index and values to be cached separately. 
+   * Caching a VertexSetRDD causes the index and values to be cached separately.
   */
  override def persist(newLevel: StorageLevel): VertexSetRDD[V] = {
    index.persist(newLevel)
@@ -143,7 +143,7 @@ class VertexSetRDD[@specialized V: ClassManifest](
 
 
  /**
-   * Provide the RDD[(K,V)] equivalent output. 
+   * Provide the RDD[(K,V)] equivalent output.
   */
  override def compute(part: Partition, context: TaskContext): Iterator[(Vid, V)] = {
    tuples.compute(part, context).flatMap { case (indexMap, (values, bs) ) =>
@@ -154,19 +154,19 @@ class VertexSetRDD[@specialized V: ClassManifest](
 
  /**
   * Restrict the vertex set to the set of vertices satisfying the
-   * given predicate. 
-   * 
+   * given predicate.
+   *
   * @param pred the user defined predicate
   *
   * @note The vertex set preserves the original index structure
   * which means that the returned RDD can be easily joined with
-   * the original vertex-set. Furthermore, the filter only 
-   * modifies the bitmap index and so no new values are allocated. 
+   * the original vertex-set. Furthermore, the filter only
+   * modifies the bitmap index and so no new values are allocated.
   */
  override def filter(pred: Tuple2[Vid,V] => Boolean): VertexSetRDD[V] = {
    val cleanPred = index.rdd.context.clean(pred)
-    val newValues = index.rdd.zipPartitions(valuesRDD){ 
-      (keysIter: Iterator[VertexIdToIndexMap], 
+    val newValues = index.rdd.zipPartitions(valuesRDD){
+      (keysIter: Iterator[VertexIdToIndexMap],
       valuesIter: Iterator[(Array[V], BitSet)]) =>
      val index = keysIter.next()
      assert(keysIter.hasNext == false)
@@ -174,7 +174,7 @@ class VertexSetRDD[@specialized V: ClassManifest](
      assert(valuesIter.hasNext == false)
      // Allocate the array to store the results into
      val newBS = new BitSet(index.capacity)
-      // Iterate over the active bits in the old bitset and 
+      // Iterate over the active bits in the old bitset and
      // evaluate the predicate
      var ind = bs.nextSetBit(0)
      while(ind >= 0) {
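Note: because filter only clears bits in the bitmap and reuses both the index and the value arrays, the result stays co-indexed with the original set, so later joins against it need no shuffle. A sketch, assuming a SparkContext named sc and hypothetical contents:

{{{
// Sketch: filter preserves the index; no new value arrays are allocated.
val vset: VertexSetRDD[Int] = VertexSetRDD(sc.parallelize(Seq((1L, 10), (2L, 20))))
val big = vset.filter { case (vid, attr) => attr > 15 }
assert(big.index == vset.index)  // same index object, cheap to re-join
}}}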
@@ -193,7 +193,7 @@ class VertexSetRDD[@specialized V: ClassManifest](
  /**
   * Pass each vertex attribute through a map function and retain the
   * original RDD's partitioning and index.
-   * 
+   *
   * @tparam U the type returned by the map function
   *
   * @param f the function applied to each value in the RDD
@@ -204,12 +204,12 @@ class VertexSetRDD[@specialized V: ClassManifest](
  def mapValues[U: ClassManifest](f: V => U): VertexSetRDD[U] = {
    val cleanF = index.rdd.context.clean(f)
    val newValuesRDD: RDD[ (Array[U], BitSet) ] =
-      valuesRDD.mapPartitions(iter => iter.map{ 
+      valuesRDD.mapPartitions(iter => iter.map{
        case (values, bs: BitSet) =>
          val newValues = new Array[U](values.size)
          bs.iterator.foreach { ind => newValues(ind) = cleanF(values(ind)) }
          (newValues, bs)
-      }, preservesPartitioning = true)   
+      }, preservesPartitioning = true)
    new VertexSetRDD[U](index, newValuesRDD)
  } // end of mapValues
@@ -217,7 +217,7 @@ class VertexSetRDD[@specialized V: ClassManifest](
  /**
   * Pass each vertex attribute along with the vertex id through a map
   * function and retain the original RDD's partitioning and index.
-   * 
+   *
   * @tparam U the type returned by the map function
   *
   * @param f the function applied to each vertex id and vertex
@@ -229,8 +229,8 @@ class VertexSetRDD[@specialized V: ClassManifest](
  def mapValuesWithKeys[U: ClassManifest](f: (Vid, V) => U): VertexSetRDD[U] = {
    val cleanF = index.rdd.context.clean(f)
    val newValues: RDD[ (Array[U], BitSet) ] =
-      index.rdd.zipPartitions(valuesRDD){ 
-        (keysIter: Iterator[VertexIdToIndexMap], 
+      index.rdd.zipPartitions(valuesRDD){
+        (keysIter: Iterator[VertexIdToIndexMap],
         valuesIter: Iterator[(Array[V], BitSet)]) =>
        val index = keysIter.next()
        assert(keysIter.hasNext == false)
@@ -254,7 +254,7 @@ class VertexSetRDD[@specialized V: ClassManifest](
   * vertices that are in both this and the other vertex set.
   *
   * @tparam W the attribute type of the other VertexSet
-   * 
+   *
   * @param other the other VertexSet with which to join.
   * @return a VertexSetRDD containing only the vertices in both this
   * and the other VertexSet and with tuple attributes.
@@ -324,7 +324,7 @@ class VertexSetRDD[@specialized V: ClassManifest](
   * any vertex in this VertexSet then a `None` attribute is generated
   *
   * @tparam W the attribute type of the other VertexSet
-   * 
+   *
   * @param other the other VertexSet with which to join.
   * @return a VertexSetRDD containing all the vertices in this
   * VertexSet with `None` attributes used for Vertices missing in the
@@ -365,7 +365,7 @@ class VertexSetRDD[@specialized V: ClassManifest](
   * VertexSet then a `None` attribute is generated
   *
   * @tparam W the attribute type of the other VertexSet
-   * 
+   *
   * @param other the other VertexSet with which to join.
   * @param merge the function used combine duplicate vertex
   * attributes
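Note: mapValues and mapValuesWithKeys rewrite only the value slots whose bits are set and return a VertexSetRDD over the same index, so partitioning survives the transformation. A sketch, reusing the hypothetical vset from the previous note:

{{{
// Sketch: value-only transforms keep the index and partitioning.
val doubled: VertexSetRDD[Int]    = vset.mapValues(attr => attr * 2)
val labeled: VertexSetRDD[String] = vset.mapValuesWithKeys((vid, attr) => vid + ":" + attr)
}}}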
@@ -398,28 +398,28 @@ class VertexSetRDD[@specialized V: ClassManifest](
 
 
  /**
-   * For each key k in `this` or `other`, return a resulting RDD that contains a 
+   * For each key k in `this` or `other`, return a resulting RDD that contains a
   * tuple with the list of values for that key in `this` as well as `other`.
   */
  /*
-  def cogroup[W: ClassManifest](other: RDD[(Vid, W)], partitioner: Partitioner): 
+  def cogroup[W: ClassManifest](other: RDD[(Vid, W)], partitioner: Partitioner):
    VertexSetRDD[(Seq[V], Seq[W])] = {
    //RDD[(K, (Seq[V], Seq[W]))] = {
    other match {
      case other: VertexSetRDD[_] if index == other.index => {
-        // if both RDDs share exactly the same index and therefore the same 
-        // super set of keys then we simply merge the value RDDs. 
-        // However it is possible that both RDDs are missing a value for a given key in 
+        // if both RDDs share exactly the same index and therefore the same
+        // super set of keys then we simply merge the value RDDs.
+        // However it is possible that both RDDs are missing a value for a given key in
        // which case the returned RDD should have a null value
-        val newValues: RDD[(IndexedSeq[(Seq[V], Seq[W])], BitSet)] = 
+        val newValues: RDD[(IndexedSeq[(Seq[V], Seq[W])], BitSet)] =
          valuesRDD.zipPartitions(other.valuesRDD){
-          (thisIter, otherIter) => 
+          (thisIter, otherIter) =>
            val (thisValues, thisBS) = thisIter.next()
            assert(!thisIter.hasNext)
            val (otherValues, otherBS) = otherIter.next()
            assert(!otherIter.hasNext)
-            /** 
-             * @todo consider implementing this with a view as in leftJoin to 
+            /**
+             * @todo consider implementing this with a view as in leftJoin to
             * reduce array allocations
             */
            val newValues = new Array[(Seq[V], Seq[W])](thisValues.size)
@@ -428,20 +428,20 @@ class VertexSetRDD[@specialized V: ClassManifest](
            var ind = newBS.nextSetBit(0)
            while(ind >= 0) {
              val a = if (thisBS.get(ind)) Seq(thisValues(ind)) else Seq.empty[V]
-              val b = if (otherBS.get(ind)) Seq(otherValues(ind)) else Seq.empty[W] 
+              val b = if (otherBS.get(ind)) Seq(otherValues(ind)) else Seq.empty[W]
              newValues(ind) = (a, b)
              ind = newBS.nextSetBit(ind+1)
            }
            Iterator((newValues.toIndexedSeq, newBS))
          }
-        new VertexSetRDD(index, newValues) 
+        new VertexSetRDD(index, newValues)
      }
-    case other: VertexSetRDD[_] 
+    case other: VertexSetRDD[_]
      if index.rdd.partitioner == other.index.rdd.partitioner => {
      // If both RDDs are indexed using different indices but with the same partitioners
      // then we we need to first merge the indicies and then use the merged index to
      // merge the values.
-      val newIndex = 
+      val newIndex =
        index.rdd.zipPartitions(other.index.rdd)(
        (thisIter, otherIter) => {
          val thisIndex = thisIter.next()
@@ -463,7 +463,7 @@ class VertexSetRDD[@specialized V: ClassManifest](
          List(newIndex).iterator
        }).cache()
      // Use the new index along with the this and the other indices to merge the values
-      val newValues: RDD[(IndexedSeq[(Seq[V], Seq[W])], BitSet)] = 
+      val newValues: RDD[(IndexedSeq[(Seq[V], Seq[W])], BitSet)] =
        newIndex.zipPartitions(tuples, other.tuples)(
        (newIndexIter, thisTuplesIter, otherTuplesIter) => {
          // Get the new index for this partition
@@ -507,7 +507,7 @@ class VertexSetRDD[@specialized V: ClassManifest](
        case None => throw new SparkException("An index must have a partitioner.")
      }
      // Shuffle the other RDD using the partitioner for this index
-      val otherShuffled = 
+      val otherShuffled =
        if (other.partitioner == Some(partitioner)) {
          other
        } else {
@@ -527,7 +527,7 @@ class VertexSetRDD[@specialized V: ClassManifest](
      // populate the newValues with the values in this VertexSetRDD
      for ((k,i) <- thisIndex) {
        if (thisBS.get(i)) {
-          newValues(i) = (Seq(thisValues(i)), ArrayBuffer.empty[W]) 
+          newValues(i) = (Seq(thisValues(i)), ArrayBuffer.empty[W])
          newBS.set(i)
        }
      }
@@ -538,28 +538,28 @@ class VertexSetRDD[@specialized V: ClassManifest](
          if(newBS.get(ind)) {
            newValues(ind)._2.asInstanceOf[ArrayBuffer[W]].append(w)
          } else {
-            // If the other key was in the index but not in the values 
-            // of this indexed RDD then create a new values entry for it 
+            // If the other key was in the index but not in the values
+            // of this indexed RDD then create a new values entry for it
            newBS.set(ind)
            newValues(ind) = (Seq.empty[V], ArrayBuffer(w))
-          } 
+          }
        } else {
          // update the index
          val ind = newIndex.size
          newIndex.put(k, ind)
          newBS.set(ind)
          // Update the values
-          newValues.append( (Seq.empty[V], ArrayBuffer(w) ) ) 
+          newValues.append( (Seq.empty[V], ArrayBuffer(w) ) )
        }
      }
      Iterator( (newIndex, (newValues.toIndexedSeq, newBS)) )
    }).cache()
-    // Extract the index and values from the above RDD 
+    // Extract the index and values from the above RDD
    val newIndex = groups.mapPartitions(_.map{ case (kMap,vAr) => kMap }, true)
-    val newValues: RDD[(IndexedSeq[(Seq[V], Seq[W])], BitSet)] = 
+    val newValues: RDD[(IndexedSeq[(Seq[V], Seq[W])], BitSet)] =
      groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true)
-      
+
    new VertexSetRDD[(Seq[V], Seq[W])](new VertexSetIndex(newIndex), newValues)
    }
  }
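Note: the commented-out cogroup above spells out the three join strategies the index-based operations exploit: identical indices zip the value arrays directly, matching partitioners merge the indices without a shuffle, and anything else falls back to a full shuffle. A sketch of arranging the cheap first case by sharing one index, using makeIndex and the indexed apply from the companion object below (ids and attributes are hypothetical):

{{{
// Sketch: build two vertex sets over a shared index so joins can take
// the fast zip path.
val ids: RDD[Vid] = sc.parallelize(Seq(1L, 2L, 3L))
val index = VertexSetRDD.makeIndex(ids)
val ages    = VertexSetRDD(ids.map(id => (id, id * 10)), index)
val degrees = VertexSetRDD(ids.map(id => (id, 1)), index)
// ages.index == degrees.index, so no shuffle is needed to combine them
}}}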
@@ -583,7 +583,7 @@ object VertexSetRDD {
   *
   * @param rdd the collection of vertex-attribute pairs
   */
-  def apply[V: ClassManifest](rdd: RDD[(Vid,V)]): VertexSetRDD[V] = 
+  def apply[V: ClassManifest](rdd: RDD[(Vid,V)]): VertexSetRDD[V] =
    apply(rdd, (a:V, b:V) => a )
 
  /**
@@ -591,7 +591,7 @@ object VertexSetRDD {
   * where duplicate entries are merged using the reduceFunc
   *
   * @tparam V the vertex attribute type
-   * 
+   *
   * @param rdd the collection of vertex-attribute pairs
   * @param reduceFunc the function used to merge attributes of
   * duplicate vertices.
@@ -602,12 +602,12 @@ object VertexSetRDD {
    // Preaggregate and shuffle if necessary
    val preAgg = rdd.partitioner match {
      case Some(p) => rdd
-      case None => 
+      case None =>
        val partitioner = new HashPartitioner(rdd.partitions.size)
        // Preaggregation.
        val aggregator = new Aggregator[Vid, V, V](v => v, cReduceFunc, cReduceFunc)
        rdd.mapPartitions(aggregator.combineValuesByKey, true).partitionBy(partitioner)
-    } 
+    }
 
    val groups = preAgg.mapPartitions( iter => {
      val hashMap = new PrimitiveKeyOpenHashMap[Vid, V]
@@ -629,8 +629,8 @@ object VertexSetRDD {
 
  /**
   * Construct a vertex set from an RDD using an existing index.
-   * 
-   * @note duplicate vertices are discarded arbitrarily 
+   *
+   * @note duplicate vertices are discarded arbitrarily
   *
   * @tparam V the vertex attribute type
   * @param rdd the rdd containing vertices
@@ -638,13 +638,13 @@ object VertexSetRDD {
   * in RDD
   */
  def apply[V: ClassManifest](
-    rdd: RDD[(Vid,V)], index: VertexSetIndex): VertexSetRDD[V] = 
+    rdd: RDD[(Vid,V)], index: VertexSetIndex): VertexSetRDD[V] =
    apply(rdd, index, (a:V,b:V) => a)
 
 
  /**
   * Construct a vertex set from an RDD using an existing index and a
-   * user defined `combiner` to merge duplicate vertices. 
+   * user defined `combiner` to merge duplicate vertices.
   *
   * @tparam V the vertex attribute type
   * @param rdd the rdd containing vertices
@@ -655,13 +655,13 @@ object VertexSetRDD {
   */
  def apply[V: ClassManifest](
    rdd: RDD[(Vid,V)], index: VertexSetIndex,
-    reduceFunc: (V, V) => V): VertexSetRDD[V] = 
+    reduceFunc: (V, V) => V): VertexSetRDD[V] =
    apply(rdd,index, (v:V) => v, reduceFunc, reduceFunc)
-  
+
 
  /**
   * Construct a vertex set from an RDD using an existing index and a
-   * user defined `combiner` to merge duplicate vertices. 
+   * user defined `combiner` to merge duplicate vertices.
   *
   * @tparam V the vertex attribute type
   * @param rdd the rdd containing vertices
@@ -675,7 +675,7 @@ object VertexSetRDD {
   *
   */
  def apply[V: ClassManifest, C: ClassManifest](
-    rdd: RDD[(Vid,V)], 
+    rdd: RDD[(Vid,V)],
    index: VertexSetIndex,
    createCombiner: V => C,
    mergeValue: (C, V) => C,
@@ -689,7 +689,7 @@ object VertexSetRDD {
      case None => throw new SparkException("An index must have a partitioner.")
    }
    // Preaggregate and shuffle if necessary
-    val partitioned = 
+    val partitioned =
      if (rdd.partitioner != Some(partitioner)) {
        // Preaggregation.
        val aggregator = new Aggregator[Vid, V, C](cCreateCombiner, cMergeValue,
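Note: the apply overloads above give callers control over duplicate vertex ids: the single-argument form keeps an arbitrary attribute, while the reduceFunc form merges them. A sketch with hypothetical pairs, assuming a SparkContext sc:

{{{
// Sketch: merging duplicate vertex ids with a user-supplied reduce function.
val pairs: RDD[(Vid, Int)] = sc.parallelize(Seq((1L, 2), (1L, 3), (2L, 5)))
val summed = VertexSetRDD(pairs, (a: Int, b: Int) => a + b)
// summed contains (1L, 5) and (2L, 5)
}}}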
@@ -732,23 +732,23 @@ object VertexSetRDD {
 
  /**
   * Construct an index of the unique vertices. The resulting index
-   * can be used to build VertexSets over subsets of the vertices in 
+   * can be used to build VertexSets over subsets of the vertices in
   * the input.
   */
-  def makeIndex(keys: RDD[Vid], 
+  def makeIndex(keys: RDD[Vid],
    partitioner: Option[Partitioner] = None): VertexSetIndex = {
    // @todo: I don't need the boolean its only there to be the second type since I want to shuffle a single RDD
-    // Ugly hack :-(. In order to partition the keys they must have values. 
+    // Ugly hack :-(. In order to partition the keys they must have values.
    val tbl = keys.mapPartitions(_.map(k => (k, false)), true)
    // Shuffle the table (if necessary)
    val shuffledTbl = partitioner match {
      case None => {
        if (tbl.partitioner.isEmpty) {
-          // @todo: I don't need the boolean its only there to be the second type of the shuffle. 
+          // @todo: I don't need the boolean its only there to be the second type of the shuffle.
          new ShuffledRDD[Vid, Boolean, (Vid, Boolean)](tbl, Partitioner.defaultPartitioner(tbl))
        } else { tbl }
      }
-      case Some(partitioner) => 
+      case Some(partitioner) =>
        tbl.partitionBy(partitioner)
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
index b178e3bb42bb69c4e5a147b9fce5826148217b77..9ce06eb9e8b2cb481857f53a838bbd6eb9ddaada 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
@@ -14,6 +14,7 @@ import org.apache.spark.graph._
 import org.apache.spark.graph.impl.GraphImpl._
 import org.apache.spark.graph.impl.MessageToPartitionRDDFunctions._
 import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveKeyOpenHashMap}
 
 
@@ -102,13 +103,16 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
     makeTriplets(localVidMap, vTableReplicatedValues, eTable)
 
-  override def cache(): Graph[VD, ED] = {
-    eTable.cache()
-    vid2pid.cache()
-    vTable.cache()
+  override def persist(newLevel: StorageLevel): Graph[VD, ED] = {
+    eTable.persist(newLevel)
+    vid2pid.persist(newLevel)
+    vTable.persist(newLevel)
+    localVidMap.persist(newLevel)
+    // vTableReplicatedValues.persist(newLevel)
     this
   }
 
+  override def cache(): Graph[VD, ED] = persist(StorageLevel.MEMORY_ONLY)
 
   override def statistics: Map[String, Any] = {
     val numVertices = this.numVertices
@@ -398,7 +402,7 @@ object GraphImpl {
       val vSet = new VertexSet
       edgePartition.foreach(e => {vSet.add(e.srcId); vSet.add(e.dstId)})
       vSet.iterator.map { vid => (vid.toLong, pid) }
-    }
+    }.partitionBy(vTableIndex.rdd.partitioner.get)
     VertexSetRDD[Pid, ArrayBuffer[Pid]](preAgg, vTableIndex,
       (p: Pid) => ArrayBuffer(p),
       (ab: ArrayBuffer[Pid], p:Pid) => {ab.append(p); ab},
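Note: the GraphImpl change above repartitions the pre-aggregated (vid, pid) pairs with the index's own partitioner before handing them to the index-based VertexSetRDD constructor, which expects co-partitioned input. The general pattern, as a sketch with a hypothetical pairs RDD and index:

{{{
// Sketch: align a pair RDD with an index's partitioner before an
// index-based constructor or zipPartitions.
val aligned =
  if (pairs.partitioner == index.rdd.partitioner) pairs
  else pairs.partitionBy(index.rdd.partitioner.get)
}}}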
diff --git a/graph/src/main/scala/org/apache/spark/graph/package.scala b/graph/src/main/scala/org/apache/spark/graph/package.scala
index ee28d1429e017c8d9a5a1847665322dca9cfc01c..7b53e9cce82a3a7eb32ff7daa82015965d0e1cd2 100644
--- a/graph/src/main/scala/org/apache/spark/graph/package.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/package.scala
@@ -8,10 +8,9 @@ package object graph {
   type Vid = Long
   type Pid = Int
 
-  type VertexHashMap[T] = it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap[T]
-  type VertexSet = it.unimi.dsi.fastutil.longs.LongOpenHashSet
+  type VertexSet = OpenHashSet[Vid]
   type VertexArrayList = it.unimi.dsi.fastutil.longs.LongArrayList
-  
+
   // type VertexIdToIndexMap = it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap
   type VertexIdToIndexMap = OpenHashSet[Vid]
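Note: with VertexSet now aliased to OpenHashSet[Vid], call sites such as the vid2pid construction in GraphImpl use the Spark-native set, and its new iterator, in place of fastutil's LongOpenHashSet. A minimal sketch mirroring that call site:

{{{
// Sketch: the new VertexSet alias in use.
val vSet = new VertexSet           // i.e. OpenHashSet[Vid]
vSet.add(1L)
vSet.add(2L)
val ids: Iterator[Vid] = vSet.iterator
}}}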