diff --git a/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala b/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala index cb75da6c211cbd0331019bbd9bea81dc496ca959..c7f20283da3d6e64eac518a2176cab1dd1641133 100644 --- a/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala +++ b/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala @@ -41,9 +41,9 @@ import org.apache.spark.storage.StorageLevel /** - * The `VertexSetIndex` maintains the per-partition mapping from vertex id - * to the corresponding location in the per-partition values array. - * This class is meant to be an opaque type. + * The `VertexSetIndex` maintains the per-partition mapping from + * vertex id to the corresponding location in the per-partition values + * array. This class is meant to be an opaque type. * */ class VertexSetIndex(private[spark] val rdd: RDD[VertexIdToIndexMap]) { @@ -56,8 +56,8 @@ class VertexSetIndex(private[spark] val rdd: RDD[VertexIdToIndexMap]) { } /** - * Returns the partitioner object of the underlying RDD. This is used - * by the VertexSetRDD to partition the values RDD. + * Returns the partitioner object of the underlying RDD. This is + * used by the VertexSetRDD to partition the values RDD. */ def partitioner: Partitioner = rdd.partitioner.get } // end of VertexSetIndex @@ -65,19 +65,21 @@ class VertexSetIndex(private[spark] val rdd: RDD[VertexIdToIndexMap]) { /** - * An VertexSetRDD[V] extends the RDD[(Vid,V)] by ensuring that there is only - * one entry for each vertex and by pre-indexing the entries for fast, efficient - * joins. + * An VertexSetRDD[V] extends the RDD[(Vid,V)] by ensuring that there + * is only one entry for each vertex and by pre-indexing the entries + * for fast, efficient joins. * - * In addition to providing the basic RDD[(Vid,V)] functionality the VertexSetRDD - * exposes an index member which can be used to "key" other VertexSetRDDs + * In addition to providing the basic RDD[(Vid,V)] functionality the + * VertexSetRDD exposes an index member which can be used to "key" + * other VertexSetRDDs * - * @tparam V the vertex attribute associated with each vertex in the set. + * @tparam V the vertex attribute associated with each vertex in the + * set. * - * @param index the index which contains the vertex id information and is used - * to organize the values in the RDD. - * @param valuesRDD the values RDD contains the actual vertex attributes organized - * as an array within each partition. + * @param index the index which contains the vertex id information and + * is used to organize the values in the RDD. + * @param valuesRDD the values RDD contains the actual vertex + * attributes organized as an array within each partition. * * To construct a `VertexSetRDD` use the singleton object: * @@ -175,10 +177,18 @@ class VertexSetRDD[@specialized V: ClassManifest]( /** - * @todo finish documenting + * Restrict the vertex set to the set of vertices satisfying the + * given predicate. + * + * @param pred the user defined predicate + * + * @note The vertex set preserves the original index structure + * which means that the returned RDD can be easily joined with + * the original vertex-set. Furthermore, the filter only + * modifies the bitmap index and so no new values are allocated. */ - override def filter(f: Tuple2[Vid,V] => Boolean): VertexSetRDD[V] = { - val cleanF = index.rdd.context.clean(f) + override def filter(pred: Tuple2[Vid,V] => Boolean): VertexSetRDD[V] = { + val cleanF = index.rdd.context.clean(pred) val newValues = index.rdd.zipPartitions(valuesRDD){ (keysIter, valuesIter) => val index = keysIter.next() @@ -198,15 +208,15 @@ class VertexSetRDD[@specialized V: ClassManifest]( /** - * Pass each vertex attribute through a map function and retain - * the original RDD's partitioning and index. + * Pass each vertex attribute through a map function and retain the + * original RDD's partitioning and index. * * @tparam U the type returned by the map function * * @param f the function applied to each value in the RDD - * @return a new VertexSet with values obtaind by applying `f` to each of the - * entries in the original VertexSet. The resulting VertexSetRDD retains the - * same index. + * @return a new VertexSet with values obtaind by applying `f` to + * each of the entries in the original VertexSet. The resulting + * VertexSetRDD retains the same index. */ def mapValues[U: ClassManifest](f: V => U): VertexSetRDD[U] = { val cleanF = index.rdd.context.clean(f) @@ -214,9 +224,10 @@ class VertexSetRDD[@specialized V: ClassManifest]( valuesRDD.mapPartitions(iter => iter.map{ case (values, bs) => /** - * @todo Consider using a view rather than creating a new array. - * This is already being done for join operations. It could reduce - * memory overhead but require additional recomputation. + * @todo Consider using a view rather than creating a new + * array. This is already being done for join operations. + * It could reduce memory overhead but require additional + * recomputation. */ val newValues = new Array[U](values.size) for ( ind <- bs ) { @@ -229,16 +240,16 @@ class VertexSetRDD[@specialized V: ClassManifest]( /** - * Pass each vertex attribute along with the vertex id through a - * map function and retain the original RDD's partitioning and index. + * Pass each vertex attribute along with the vertex id through a map + * function and retain the original RDD's partitioning and index. * * @tparam U the type returned by the map function * - * @param f the function applied to each vertex id and vertex + * @param f the function applied to each vertex id and vertex * attribute in the RDD - * @return a new VertexSet with values obtaind by applying `f` to each of the - * entries in the original VertexSet. The resulting VertexSetRDD retains the - * same index. + * @return a new VertexSet with values obtaind by applying `f` to + * each of the entries in the original VertexSet. The resulting + * VertexSetRDD retains the same index. */ def mapValuesWithKeys[U: ClassManifest](f: (Vid, V) => U): VertexSetRDD[U] = { val cleanF = index.rdd.context.clean(f) @@ -267,16 +278,16 @@ class VertexSetRDD[@specialized V: ClassManifest]( /** - * Inner join this VertexSet with another VertexSet which has the same Index. - * This function will fail if both VertexSets do not share the same index. - * The resulting vertex set will only contain vertices that are in both this - * and the other vertex set. + * Inner join this VertexSet with another VertexSet which has the + * same Index. This function will fail if both VertexSets do not + * share the same index. The resulting vertex set will only contain + * vertices that are in both this and the other vertex set. * * @tparam W the attribute type of the other VertexSet * - * @param other the other VertexSet with which to join. - * @return a VertexSetRDD containing only the vertices in both this and the - * other VertexSet and with tuple attributes. + * @param other the other VertexSet with which to join. + * @return a VertexSetRDD containing only the vertices in both this + * and the other VertexSet and with tuple attributes. * */ def zipJoin[W: ClassManifest](other: VertexSetRDD[W]): VertexSetRDD[(V,W)] = { @@ -299,17 +310,18 @@ class VertexSetRDD[@specialized V: ClassManifest]( /** - * Left join this VertexSet with another VertexSet which has the same Index. - * This function will fail if both VertexSets do not share the same index. - * The resulting vertex set contains an entry for each vertex in this set. - * If the other VertexSet is missing any vertex in this VertexSet then a - * `None` attribute is generated + * Left join this VertexSet with another VertexSet which has the + * same Index. This function will fail if both VertexSets do not + * share the same index. The resulting vertex set contains an entry + * for each vertex in this set. If the other VertexSet is missing + * any vertex in this VertexSet then a `None` attribute is generated * * @tparam W the attribute type of the other VertexSet * - * @param other the other VertexSet with which to join. - * @return a VertexSetRDD containing all the vertices in this VertexSet - * with `None` attributes used for Vertices missing in the other VertexSet. + * @param other the other VertexSet with which to join. + * @return a VertexSetRDD containing all the vertices in this + * VertexSet with `None` attributes used for Vertices missing in the + * other VertexSet. * */ def leftZipJoin[W: ClassManifest](other: VertexSetRDD[W]): VertexSetRDD[(V,Option[W])] = { @@ -332,19 +344,21 @@ class VertexSetRDD[@specialized V: ClassManifest]( /** - * Left join this VertexSet with an RDD containing vertex attribute pairs. - * If the other RDD is backed by a VertexSet with the same index than the - * efficient leftZipJoin implementation is used. - * The resulting vertex set contains an entry for each vertex in this set. - * If the other VertexSet is missing any vertex in this VertexSet then a - * `None` attribute is generated + * Left join this VertexSet with an RDD containing vertex attribute + * pairs. If the other RDD is backed by a VertexSet with the same + * index than the efficient leftZipJoin implementation is used. The + * resulting vertex set contains an entry for each vertex in this + * set. If the other VertexSet is missing any vertex in this + * VertexSet then a `None` attribute is generated * * @tparam W the attribute type of the other VertexSet * * @param other the other VertexSet with which to join. - * @param merge the function used combine duplicate vertex attributes - * @return a VertexSetRDD containing all the vertices in this VertexSet - * with `None` attributes used for Vertices missing in the other VertexSet. + * @param merge the function used combine duplicate vertex + * attributes + * @return a VertexSetRDD containing all the vertices in this + * VertexSet with `None` attributes used for Vertices missing in the + * other VertexSet. * */ def leftJoin[W: ClassManifest]( @@ -581,8 +595,8 @@ class VertexSetRDD[@specialized V: ClassManifest]( object VertexSetRDD { /** - * Construct a vertex set from an RDD of vertex-attribute pairs. - * Duplicate entries are removed arbitrarily. + * Construct a vertex set from an RDD of vertex-attribute pairs. + * Duplicate entries are removed arbitrarily. * * @tparam V the vertex attribute type * @@ -592,14 +606,14 @@ object VertexSetRDD { apply(rdd, (a:V, b:V) => a ) /** - * Construct a vertex set from an RDD of vertex-attribute pairs where - * duplicate entries are merged using the reduceFunc + * Construct a vertex set from an RDD of vertex-attribute pairs + * where duplicate entries are merged using the reduceFunc * * @tparam V the vertex attribute type * * @param rdd the collection of vertex-attribute pairs - * @param reduceFunc the function used to merge attributes of duplicate - * vertices. + * @param reduceFunc the function used to merge attributes of + * duplicate vertices. */ def apply[V: ClassManifest]( rdd: RDD[(Vid,V)], reduceFunc: (V, V) => V): VertexSetRDD[V] = { @@ -635,7 +649,14 @@ object VertexSetRDD { /** - * @todo finish documenting + * Construct a vertex set from an RDD using an existing index. + * + * @note duplicate vertices are discarded arbitrarily + * + * @tparam the vertex attribute type + * @param rdd the rdd containing vertices + * @param index the index which must be a superset of the vertices + * in RDD */ def apply[V: ClassManifest]( rdd: RDD[(Vid,V)], index: VertexSetIndex): VertexSetRDD[V] = @@ -643,7 +664,15 @@ object VertexSetRDD { /** - * @todo finish documenting + * Construct a vertex set from an RDD using an existing index and a + * user defined `combiner` to merge duplicate vertices. + * + * @tparam the vertex attribute type + * @param rdd the rdd containing vertices + * @param index the index which must be a superset of the vertices + * in RDD + * @param reduceFunc the user defined reduce function used to merge + * duplicate vertex attributes. */ def apply[V: ClassManifest]( rdd: RDD[(Vid,V)], index: VertexSetIndex, @@ -652,7 +681,19 @@ object VertexSetRDD { /** - * @todo finish documenting + * Construct a vertex set from an RDD using an existing index and a + * user defined `combiner` to merge duplicate vertices. + * + * @tparam the vertex attribute type + * @param rdd the rdd containing vertices + * @param index the index which must be a superset of the vertices + * in RDD + * @param createCombiner a user defined function to create a combiner + * from a vertex attribute + * @param mergeValue a user defined function to merge a vertex + * attribute into an existing combiner + * @param mergeCombiners a user defined function to merge combiners + * */ def apply[V: ClassManifest, C: ClassManifest]( rdd: RDD[(Vid,V)], @@ -703,9 +744,8 @@ object VertexSetRDD { /** - * Construct and index of the unique values in a given RDD. - * - * @todo finish documenting + * Construct and index of the unique vertex ids. This can be used + * as an index when building a vertex set. */ def makeIndex(keys: RDD[Vid], partitioner: Option[Partitioner] = None): VertexSetIndex = {