diff --git a/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala b/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala index 62608e506d85b820275d9a5a627ddb7c05cf7173..401d5f29bc134e04b2a8c036f7eeb8dad9e2b314 100644 --- a/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala +++ b/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala @@ -25,6 +25,7 @@ import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveKeyOpenHa import org.apache.spark.graph.impl.AggregationMsg import org.apache.spark.graph.impl.MsgRDDFunctions._ + /** * The `VertexSetIndex` maintains the per-partition mapping from * vertex id to the corresponding location in the per-partition values @@ -88,7 +89,6 @@ class VertexSetRDD[@specialized V: ClassManifest]( extends RDD[(Vid, V)](index.rdd.context, List(new OneToOneDependency(index.rdd), new OneToOneDependency(valuesRDD)) ) { - /** * Construct a new VertexSetRDD that is indexed by only the keys in the RDD. * The resulting VertexSet will be based on a different index and can @@ -96,7 +96,6 @@ class VertexSetRDD[@specialized V: ClassManifest]( */ def reindex(): VertexSetRDD[V] = VertexSetRDD(this) - /** * An internal representation which joins the block indices with the values * This is used by the compute function to emulate RDD[(Vid, V)] @@ -104,19 +103,16 @@ class VertexSetRDD[@specialized V: ClassManifest]( protected[spark] val tuples = new ZippedRDD(index.rdd.context, index.rdd, valuesRDD) - /** * The partitioner is defined by the index. */ override val partitioner = index.rdd.partitioner - /** * The actual partitions are defined by the tuples. */ override def getPartitions: Array[Partition] = tuples.getPartitions - /** * The preferred locations are computed based on the preferred * locations of the tuples. 
@@ -124,7 +120,6 @@ class VertexSetRDD[@specialized V: ClassManifest]( override def getPreferredLocations(s: Partition): Seq[String] = tuples.getPreferredLocations(s) - /** * Caching an VertexSetRDD causes the index and values to be cached separately. */ @@ -134,15 +129,12 @@ class VertexSetRDD[@specialized V: ClassManifest]( return this } - /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */ override def persist(): VertexSetRDD[V] = persist(StorageLevel.MEMORY_ONLY) - /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */ override def cache(): VertexSetRDD[V] = persist() - /** * Provide the RDD[(K,V)] equivalent output. */ @@ -152,7 +144,6 @@ class VertexSetRDD[@specialized V: ClassManifest]( } } // end of compute - /** * Restrict the vertex set to the set of vertices satisfying the * given predicate. @@ -190,7 +181,6 @@ class VertexSetRDD[@specialized V: ClassManifest]( new VertexSetRDD[V](index, newValues) } // end of filter - /** * Pass each vertex attribute through a map function and retain the * original RDD's partitioning and index. @@ -214,7 +204,6 @@ class VertexSetRDD[@specialized V: ClassManifest]( new VertexSetRDD[U](index, newValuesRDD) } // end of mapValues - /** * Pass each vertex attribute along with the vertex id through a map * function and retain the original RDD's partitioning and index. @@ -247,8 +236,6 @@ class VertexSetRDD[@specialized V: ClassManifest]( /** - * @todo update docs to reflect function argument - * * Inner join this VertexSet with another VertexSet which has the * same Index. This function will fail if both VertexSets do not * share the same index. The resulting vertex set will only contain @@ -257,6 +244,8 @@ class VertexSetRDD[@specialized V: ClassManifest]( * @tparam W the attribute type of the other VertexSet * * @param other the other VertexSet with which to join. 
+ * @param f the function mapping a vertex id and its attributes in + * this and the other vertex set to a new vertex attribute. * @return a VertexSetRDD containing only the vertices in both this * and the other VertexSet and with tuple attributes. * @@ -287,13 +276,16 @@ class VertexSetRDD[@specialized V: ClassManifest]( /** - * @todo document + * Inner join this VertexSet with another VertexSet which has the + * same Index. This function will fail if both VertexSets do not + * share the same index. * - * @param other - * @param f - * @tparam W - * @tparam Z - * @return + * @param other the vertex set to join with this vertex set + * @param f the function mapping a vertex id and its attributes in + * this and the other vertex set to an iterator of results. + * @tparam W the type of the other vertex set attributes + * @tparam Z the type of the elements emitted by `f` + * @return an RDD containing the elements emitted by `f` */ def zipJoinFlatMap[W: ClassManifest, Z: ClassManifest](other: VertexSetRDD[W])(f: (Vid, V,W) => Iterator[Z]): RDD[Z] = { @@ -316,8 +308,6 @@ class VertexSetRDD[@specialized V: ClassManifest]( /** - * @todo update docs to reflect function argument - * Left join this VertexSet with another VertexSet which has the * same Index. This function will fail if both VertexSets do not * share the same index. The resulting vertex set contains an entry @@ -327,6 +317,8 @@ class VertexSetRDD[@specialized V: ClassManifest]( * @tparam W the attribute type of the other VertexSet * * @param other the other VertexSet with which to join. + * @param f the function mapping a vertex id and its attributes in + * this and the other vertex set to a new vertex attribute. * @return a VertexSetRDD containing all the vertices in this * VertexSet with `None` attributes used for Vertices missing in the * other VertexSet. 
@@ -368,11 +360,12 @@ class VertexSetRDD[@specialized V: ClassManifest]( * @tparam W the attribute type of the other VertexSet * * @param other the other VertexSet with which to join. + * @param f the function mapping a vertex id and its attributes in + * this and the other vertex set to a new vertex attribute. * @param merge the function used combine duplicate vertex * attributes * @return a VertexSetRDD containing all the vertices in this - * VertexSet with `None` attributes used for Vertices missing in the - * other VertexSet. + * VertexSet with the attribute emitted by f. * */ def leftJoin[W: ClassManifest, Z: ClassManifest](other: RDD[(Vid,W)]) @@ -396,181 +389,9 @@ class VertexSetRDD[@specialized V: ClassManifest]( } } // end of leftJoin - - - /** - * For each key k in `this` or `other`, return a resulting RDD that contains a - * tuple with the list of values for that key in `this` as well as `other`. - */ - /* - def cogroup[W: ClassManifest](other: RDD[(Vid, W)], partitioner: Partitioner): - VertexSetRDD[(Seq[V], Seq[W])] = { - //RDD[(K, (Seq[V], Seq[W]))] = { - other match { - case other: VertexSetRDD[_] if index == other.index => { - // if both RDDs share exactly the same index and therefore the same - // super set of keys then we simply merge the value RDDs. 
- // However it is possible that both RDDs are missing a value for a given key in - // which case the returned RDD should have a null value - val newValues: RDD[(IndexedSeq[(Seq[V], Seq[W])], BitSet)] = - valuesRDD.zipPartitions(other.valuesRDD){ - (thisIter, otherIter) => - val (thisValues, thisBS) = thisIter.next() - assert(!thisIter.hasNext) - val (otherValues, otherBS) = otherIter.next() - assert(!otherIter.hasNext) - /** - * @todo consider implementing this with a view as in leftJoin to - * reduce array allocations - */ - val newValues = new Array[(Seq[V], Seq[W])](thisValues.size) - val newBS = thisBS | otherBS - - var ind = newBS.nextSetBit(0) - while(ind >= 0) { - val a = if (thisBS.get(ind)) Seq(thisValues(ind)) else Seq.empty[V] - val b = if (otherBS.get(ind)) Seq(otherValues(ind)) else Seq.empty[W] - newValues(ind) = (a, b) - ind = newBS.nextSetBit(ind+1) - } - Iterator((newValues.toIndexedSeq, newBS)) - } - new VertexSetRDD(index, newValues) - } - case other: VertexSetRDD[_] - if index.rdd.partitioner == other.index.rdd.partitioner => { - // If both RDDs are indexed using different indices but with the same partitioners - // then we we need to first merge the indicies and then use the merged index to - // merge the values. 
- val newIndex = - index.rdd.zipPartitions(other.index.rdd)( - (thisIter, otherIter) => { - val thisIndex = thisIter.next() - assert(!thisIter.hasNext) - val otherIndex = otherIter.next() - assert(!otherIter.hasNext) - // Merge the keys - val newIndex = new VertexIdToIndexMap(thisIndex.capacity + otherIndex.capacity) - var ind = thisIndex.nextPos(0) - while(ind >= 0) { - newIndex.fastAdd(thisIndex.getValue(ind)) - ind = thisIndex.nextPos(ind+1) - } - var ind = otherIndex.nextPos(0) - while(ind >= 0) { - newIndex.fastAdd(otherIndex.getValue(ind)) - ind = otherIndex.nextPos(ind+1) - } - List(newIndex).iterator - }).cache() - // Use the new index along with the this and the other indices to merge the values - val newValues: RDD[(IndexedSeq[(Seq[V], Seq[W])], BitSet)] = - newIndex.zipPartitions(tuples, other.tuples)( - (newIndexIter, thisTuplesIter, otherTuplesIter) => { - // Get the new index for this partition - val newIndex = newIndexIter.next() - assert(!newIndexIter.hasNext) - // Get the corresponding indicies and values for this and the other VertexSetRDD - val (thisIndex, (thisValues, thisBS)) = thisTuplesIter.next() - assert(!thisTuplesIter.hasNext) - val (otherIndex, (otherValues, otherBS)) = otherTuplesIter.next() - assert(!otherTuplesIter.hasNext) - // Preallocate the new Values array - val newValues = new Array[(Seq[V], Seq[W])](newIndex.size) - val newBS = new BitSet(newIndex.size) - - // Lookup the sequences in both submaps - for ((k,ind) <- newIndex) { - // Get the left key - val a = if (thisIndex.contains(k)) { - val ind = thisIndex.get(k) - if(thisBS.get(ind)) Seq(thisValues(ind)) else Seq.empty[V] - } else Seq.empty[V] - // Get the right key - val b = if (otherIndex.contains(k)) { - val ind = otherIndex.get(k) - if (otherBS.get(ind)) Seq(otherValues(ind)) else Seq.empty[W] - } else Seq.empty[W] - // If at least one key was present then we generate a tuple. 
- if (!a.isEmpty || !b.isEmpty) { - newValues(ind) = (a, b) - newBS.set(ind) - } - } - Iterator((newValues.toIndexedSeq, newBS)) - }) - new VertexSetRDD(new VertexSetIndex(newIndex), newValues) - } - case _ => { - // Get the partitioner from the index - val partitioner = index.rdd.partitioner match { - case Some(p) => p - case None => throw new SparkException("An index must have a partitioner.") - } - // Shuffle the other RDD using the partitioner for this index - val otherShuffled = - if (other.partitioner == Some(partitioner)) { - other - } else { - other.partitionBy(partitioner) - } - // Join the other RDD with this RDD building a new valueset and new index on the fly - val groups = tuples.zipPartitions(otherShuffled)( - (thisTuplesIter, otherTuplesIter) => { - // Get the corresponding indicies and values for this VertexSetRDD - val (thisIndex, (thisValues, thisBS)) = thisTuplesIter.next() - assert(!thisTuplesIter.hasNext()) - // Construct a new index - val newIndex = thisIndex.clone().asInstanceOf[VertexIdToIndexMap] - // Construct a new array Buffer to store the values - val newValues = ArrayBuffer.fill[ (Seq[V], Seq[W]) ](thisValues.size)(null) - val newBS = new BitSet(thisValues.size) - // populate the newValues with the values in this VertexSetRDD - for ((k,i) <- thisIndex) { - if (thisBS.get(i)) { - newValues(i) = (Seq(thisValues(i)), ArrayBuffer.empty[W]) - newBS.set(i) - } - } - // Now iterate through the other tuples updating the map - for ((k,w) <- otherTuplesIter){ - if (newIndex.contains(k)) { - val ind = newIndex.get(k) - if(newBS.get(ind)) { - newValues(ind)._2.asInstanceOf[ArrayBuffer[W]].append(w) - } else { - // If the other key was in the index but not in the values - // of this indexed RDD then create a new values entry for it - newBS.set(ind) - newValues(ind) = (Seq.empty[V], ArrayBuffer(w)) - } - } else { - // update the index - val ind = newIndex.size - newIndex.put(k, ind) - newBS.set(ind) - // Update the values - newValues.append( 
(Seq.empty[V], ArrayBuffer(w) ) ) - } - } - Iterator( (newIndex, (newValues.toIndexedSeq, newBS)) ) - }).cache() - - // Extract the index and values from the above RDD - val newIndex = groups.mapPartitions(_.map{ case (kMap,vAr) => kMap }, true) - val newValues: RDD[(IndexedSeq[(Seq[V], Seq[W])], BitSet)] = - groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true) - - new VertexSetRDD[(Seq[V], Seq[W])](new VertexSetIndex(newIndex), newValues) - } - } - } // end of cogroup - */ - } // End of VertexSetRDD - /** * The VertexSetRDD singleton is used to construct VertexSets */ @@ -627,7 +448,6 @@ object VertexSetRDD { new VertexSetRDD[V](new VertexSetIndex(index), values) } // end of apply - /** * Construct a vertex set from an RDD using an existing index. * @@ -642,7 +462,6 @@ object VertexSetRDD { rdd: RDD[(Vid,V)], index: VertexSetIndex): VertexSetRDD[V] = apply(rdd, index, (a:V,b:V) => a) - /** * Construct a vertex set from an RDD using an existing index and a * user defined `combiner` to merge duplicate vertices. @@ -659,8 +478,17 @@ object VertexSetRDD { reduceFunc: (V, V) => V): VertexSetRDD[V] = apply(rdd,index, (v:V) => v, reduceFunc, reduceFunc) - - def aggregate[V: ClassManifest]( + /** + * Construct a vertex set from an RDD of AggregationMsgs + * + * @tparam V the vertex attribute type + * @param rdd the rdd containing vertices + * @param index the index which must be a superset of the vertices + * in RDD + * @param reduceFunc the user defined reduce function used to merge + * duplicate vertex attributes. + */ + private[spark] def aggregate[V: ClassManifest]( rdd: RDD[AggregationMsg[V]], index: VertexSetIndex, reduceFunc: (V, V) => V): VertexSetRDD[V] = { @@ -696,7 +524,6 @@ object VertexSetRDD { new VertexSetRDD(index, values) } - /** * Construct a vertex set from an RDD using an existing index and a * user defined `combiner` to merge duplicate vertices. 
@@ -767,7 +594,6 @@ object VertexSetRDD { new VertexSetRDD(index, values) } // end of apply - /** * Construct an index of the unique vertices. The resulting index * can be used to build VertexSets over subsets of the vertices in