From 57d7487d3da19df04de52235812fe7c8c24cc259 Mon Sep 17 00:00:00 2001
From: Ankur Dave <ankurdave@gmail.com>
Date: Fri, 10 Jan 2014 15:42:44 -0800
Subject: [PATCH] Improve docs for VertexRDD

---
 .../org/apache/spark/graphx/GraphOps.scala    |   2 +-
 .../org/apache/spark/graphx/VertexRDD.scala   | 152 +++++++-----------
 .../graphx/impl/EdgePartitionBuilder.scala    |   3 -
 .../graphx/impl/EdgeTripletIterator.scala     |   1 -
 4 files changed, 62 insertions(+), 96 deletions(-)

diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
index 69f27601ce..0121cb1449 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
@@ -232,7 +232,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
    * {{{
    * graph.filter(
    *   graph => {
-   *     val degrees: VertexSetRDD[Int] = graph.outDegrees
+   *     val degrees: VertexRDD[Int] = graph.outDegrees
    *     graph.outerJoinVertices(degrees) {(vid, data, deg) => deg.getOrElse(0)}
    *   },
    *   vpred = (vid: VertexID, deg:Int) => deg > 0
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
index 971e2615d4..3ef9d6e9cf 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -27,11 +27,10 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.graphx.impl.MsgRDDFunctions
 import org.apache.spark.graphx.impl.VertexPartition
 
-
 /**
- * A `VertexRDD[VD]` extends the `RDD[(VertexID, VD)]` by ensuring that there is
- * only one entry for each vertex and by pre-indexing the entries for fast,
- * efficient joins.
+ * A `VertexRDD[VD]` extends the `RDD[(VertexID, VD)]` by ensuring that there is only one entry for
+ * each vertex and by pre-indexing the entries for fast, efficient joins. Two VertexRDDs with the
+ * same index can be joined efficiently.
  *
  * @tparam VD the vertex attribute associated with each vertex in the set.
  *
@@ -46,7 +45,7 @@ import org.apache.spark.graphx.impl.VertexPartition
  * val vset2 = VertexRDD(someData, reduceFunc)
  * // Finally we can use the VertexRDD to index another dataset
  * val otherData: RDD[(VertexID, OtherType)] = loadData(otherFile)
- * val vset3 = VertexRDD(otherData, vset.index)
+ * val vset3 = vset2.innerJoin(otherData) { (vid, a, b) => b }
  * // Now we can construct very fast joins between the two sets
  * val vset4: VertexRDD[(SomeType, OtherType)] = vset.leftJoin(vset3)
  * }}}
@@ -61,32 +60,18 @@ class VertexRDD[@specialized VD: ClassTag](
   partitionsRDD.setName("VertexRDD")
 
   /**
-   * Construct a new VertexRDD that is indexed by only the keys in the RDD.
-   * The resulting VertexRDD will be based on a different index and can
-   * no longer be quickly joined with this RDD.
+   * Construct a new VertexRDD that is indexed by only the visible vertices. The resulting
+   * VertexRDD will be based on a different index and can no longer be quickly joined with this RDD.
    */
   def reindex(): VertexRDD[VD] = new VertexRDD(partitionsRDD.map(_.reindex()))
 
-  /**
-   * The partitioner is defined by the index.
-   */
   override val partitioner = partitionsRDD.partitioner
 
-  /**
-   * The actual partitions are defined by the tuples.
-   */
   override protected def getPartitions: Array[Partition] = partitionsRDD.partitions
 
-  /**
-   * The preferred locations are computed based on the preferred
-   * locations of the tuples.
-   */
   override protected def getPreferredLocations(s: Partition): Seq[String] =
     partitionsRDD.preferredLocations(s)
 
-  /**
-   * Caching a VertexRDD causes the index and values to be cached separately.
-   */
   override def persist(newLevel: StorageLevel): VertexRDD[VD] = {
     partitionsRDD.persist(newLevel)
     this
@@ -103,20 +88,20 @@ class VertexRDD[@specialized VD: ClassTag](
     this
   }
 
-  /** Return the number of vertices in this set. */
+  /** The number of vertices in the RDD. */
   override def count(): Long = {
     partitionsRDD.map(_.size).reduce(_ + _)
   }
 
   /**
-   * Provide the `RDD[(VertexID, VD)]` equivalent output.
+   * Provides the `RDD[(VertexID, VD)]` equivalent output.
    */
   override def compute(part: Partition, context: TaskContext): Iterator[(VertexID, VD)] = {
     firstParent[VertexPartition[VD]].iterator(part, context).next.iterator
   }
 
   /**
-   * Return a new VertexRDD by applying a function to each VertexPartition of this RDD.
+   * Applies a function to each [[impl.VertexPartition]] of this RDD and returns a new VertexRDD.
    */
   def mapVertexPartitions[VD2: ClassTag](f: VertexPartition[VD] => VertexPartition[VD2])
     : VertexRDD[VD2] = {
@@ -126,51 +111,43 @@ class VertexRDD[@specialized VD: ClassTag](
 
 
   /**
-   * Restrict the vertex set to the set of vertices satisfying the
-   * given predicate.
-   *
-   * @param pred the user defined predicate, which takes a tuple to conform to
-   * the RDD[(VertexID, VD)] interface
+   * Restricts the vertex set to the set of vertices satisfying the given predicate. This operation
+   * preserves the index for efficient joins with the original RDD, and it sets bits in the bitmask
+   * rather than allocating new memory.
    *
-   * @note The vertex set preserves the original index structure
-   * which means that the returned RDD can be easily joined with
-   * the original vertex-set.  Furthermore, the filter only
-   * modifies the bitmap index and so no new values are allocated.
+   * @param pred the user defined predicate, which takes a tuple to conform to the
+   * `RDD[(VertexID, VD)]` interface
    */
   override def filter(pred: Tuple2[VertexID, VD] => Boolean): VertexRDD[VD] =
     this.mapVertexPartitions(_.filter(Function.untupled(pred)))
 
   /**
-   * Pass each vertex attribute through a map function and retain the
-   * original RDD's partitioning and index.
+   * Maps each vertex attribute, preserving the index.
    *
    * @tparam VD2 the type returned by the map function
    *
    * @param f the function applied to each value in the RDD
-   * @return a new VertexRDD with values obtained by applying `f` to
-   * each of the entries in the original VertexRDD.  The resulting
-   * VertexRDD retains the same index.
+   * @return a new VertexRDD with values obtained by applying `f` to each of the entries in the
+   * original VertexRDD
    */
   def mapValues[VD2: ClassTag](f: VD => VD2): VertexRDD[VD2] =
     this.mapVertexPartitions(_.map((vid, attr) => f(attr)))
 
   /**
-   * Pass each vertex attribute through a map function and retain the
-   * original RDD's partitioning and index.
+   * Maps each vertex attribute, additionally supplying the vertex ID.
    *
    * @tparam VD2 the type returned by the map function
    *
-   * @param f the function applied to each value in the RDD
-   * @return a new VertexRDD with values obtained by applying `f` to
-   * each of the entries in the original VertexRDD.  The resulting
-   * VertexRDD retains the same index.
+   * @param f the function applied to each ID-value pair in the RDD
+   * @return a new VertexRDD with values obtained by applying `f` to each of the entries in the
+   * original VertexRDD.  The resulting VertexRDD retains the same index.
    */
   def mapValues[VD2: ClassTag](f: (VertexID, VD) => VD2): VertexRDD[VD2] =
     this.mapVertexPartitions(_.map(f))
 
   /**
-   * Hides vertices that are the same between this and other. For vertices that are different, keeps
-   * the values from `other`.
+   * Hides vertices that are the same between `this` and `other`. For vertices that are different,
+   * keeps the values from `other`.
    */
   def diff(other: VertexRDD[VD]): VertexRDD[VD] = {
     val newPartitionsRDD = partitionsRDD.zipPartitions(
@@ -184,22 +161,17 @@ class VertexRDD[@specialized VD: ClassTag](
   }
 
   /**
-   * Left join this VertexSet with another VertexSet which has the
-   * same Index.  This function will fail if both VertexSets do not
-   * share the same index.  The resulting vertex set contains an entry
-   * for each vertex in this set.  If the other VertexSet is missing
-   * any vertex in this VertexSet then a `None` attribute is generated
-   *
-   * @tparam VD2 the attribute type of the other VertexSet
-   * @tparam VD3 the attribute type of the resulting VertexSet
+   * Left joins this RDD with another VertexRDD with the same index. This function will fail if both
+   * VertexRDDs do not share the same index. The resulting vertex set contains an entry for each
+   * vertex in `this`. If `other` is missing any vertex in this VertexRDD, `f` is passed `None`.
    *
-   * @param other the other VertexSet with which to join.
-   * @param f the function mapping a vertex id and its attributes in
-   * this and the other vertex set to a new vertex attribute.
-   * @return a VertexRDD containing all the vertices in this
-   * VertexSet with `None` attributes used for Vertices missing in the
-   * other VertexSet.
+   * @tparam VD2 the attribute type of the other VertexRDD
+   * @tparam VD3 the attribute type of the resulting VertexRDD
    *
+   * @param other the other VertexRDD with which to join.
+   * @param f the function mapping a vertex id and its attributes in this and the other vertex set
+   * to a new vertex attribute.
+   * @return a VertexRDD containing the results of `f`
    */
   def leftZipJoin[VD2: ClassTag, VD3: ClassTag]
       (other: VertexRDD[VD2])(f: (VertexID, VD, Option[VD2]) => VD3): VertexRDD[VD3] = {
@@ -214,29 +186,25 @@ class VertexRDD[@specialized VD: ClassTag](
   }
 
   /**
-   * Left join this VertexRDD with an RDD containing vertex attribute
-   * pairs.  If the other RDD is backed by a VertexRDD with the same
-   * index than the efficient leftZipJoin implementation is used.  The
-   * resulting vertex set contains an entry for each vertex in this
-   * set.  If the other VertexRDD is missing any vertex in this
-   * VertexRDD then a `None` attribute is generated.
-   *
-   * If there are duplicates, the vertex is picked at random.
+   * Left joins this VertexRDD with an RDD containing vertex attribute pairs. If the other RDD is
+   * backed by a VertexRDD with the same index then the efficient [[leftZipJoin]] implementation is
+   * used. The resulting vertex set contains an entry for each vertex in this set. If `other` is
+   * missing any vertex in this VertexRDD, `f` is passed `None`. If there are duplicates, the vertex
+   * is picked arbitrarily.
    *
    * @tparam VD2 the attribute type of the other VertexRDD
    * @tparam VD3 the attribute type of the resulting VertexRDD
    *
-   * @param other the other VertexRDD with which to join.
-   * @param f the function mapping a vertex id and its attributes in
-   * this and the other vertex set to a new vertex attribute.
-   * @return a VertexRDD containing all the vertices in this
-   * VertexRDD with the attribute emitted by f.
+   * @param other the other VertexRDD with which to join
+   * @param f the function mapping a vertex id and its attributes in this and the other vertex set
+   * to a new vertex attribute.
+   * @return a VertexRDD containing all the vertices in this VertexRDD with the attributes emitted
+   * by `f`.
    */
   def leftJoin[VD2: ClassTag, VD3: ClassTag]
       (other: RDD[(VertexID, VD2)])
       (f: (VertexID, VD, Option[VD2]) => VD3)
-    : VertexRDD[VD3] =
-  {
+    : VertexRDD[VD3] = {
     // Test if the other vertex is a VertexRDD to choose the optimal join strategy.
     // If the other set is a VertexRDD then we use the much more efficient leftZipJoin
     other match {
@@ -255,8 +223,8 @@ class VertexRDD[@specialized VD: ClassTag](
   }
 
   /**
-   * Same effect as leftJoin(other) { (vid, a, bOpt) => bOpt.getOrElse(a) }, but `this` and `other`
-   * must have the same index.
+   * Same effect as `leftJoin(other) { (vid, a, bOpt) => bOpt.getOrElse(a) }`, but `this` and
+   * `other` must have the same index.
    */
   def innerZipJoin[U: ClassTag, VD2: ClassTag](other: VertexRDD[U])
       (f: (VertexID, VD, U) => VD2): VertexRDD[VD2] = {
@@ -271,8 +239,9 @@ class VertexRDD[@specialized VD: ClassTag](
   }
 
   /**
-   * Replace vertices with corresponding vertices in `other`, and drop vertices without a
-   * corresponding vertex in `other`.
+   * Inner joins this VertexRDD with an RDD containing vertex attribute pairs. If the other RDD is
+   * backed by a VertexRDD with the same index then the efficient [[innerZipJoin]] implementation is
+   * used.
    */
   def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexID, U)])
       (f: (VertexID, VD, U) => VD2): VertexRDD[VD2] = {
@@ -294,12 +263,11 @@ class VertexRDD[@specialized VD: ClassTag](
   }
 
   /**
-   * Aggregate messages with the same ids using `reduceFunc`, returning a VertexRDD that is
-   * co-indexed with this one.
+   * Aggregates vertices in `message` that have the same ids using `reduceFunc`, returning a
+   * VertexRDD co-indexed with `this`.
    */
   def aggregateUsingIndex[VD2: ClassTag](
-      messages: RDD[(VertexID, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] =
-  {
+      messages: RDD[(VertexID, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] = {
     val shuffled = MsgRDDFunctions.partitionForAggregation(messages, this.partitioner.get)
     val parts = partitionsRDD.zipPartitions(shuffled, true) { (thisIter, msgIter) =>
       val vertexPartition: VertexPartition[VD] = thisIter.next()
@@ -312,12 +280,12 @@ class VertexRDD[@specialized VD: ClassTag](
 
 
 /**
- * The VertexRDD singleton is used to construct VertexRDDs
+ * The VertexRDD singleton is used to construct VertexRDDs.
  */
 object VertexRDD {
 
   /**
-   * Construct a vertex set from an RDD of vertex-attribute pairs.
+   * Construct a `VertexRDD` from an RDD of vertex-attribute pairs.
    * Duplicate entries are removed arbitrarily.
    *
    * @tparam VD the vertex attribute type
@@ -336,16 +304,15 @@ object VertexRDD {
   }
 
   /**
-   * Construct a vertex set from an RDD of vertex-attribute pairs.
-   * Duplicate entries are merged using mergeFunc.
+   * Constructs a `VertexRDD` from an RDD of vertex-attribute pairs, merging duplicates using
+   * `mergeFunc`.
    *
    * @tparam VD the vertex attribute type
    *
    * @param rdd the collection of vertex-attribute pairs
    * @param mergeFunc the associative, commutative merge function.
    */
-  def apply[VD: ClassTag](rdd: RDD[(VertexID, VD)], mergeFunc: (VD, VD) => VD): VertexRDD[VD] =
-  {
+  def apply[VD: ClassTag](rdd: RDD[(VertexID, VD)], mergeFunc: (VD, VD) => VD): VertexRDD[VD] = {
     val partitioned: RDD[(VertexID, VD)] = rdd.partitioner match {
       case Some(p) => rdd
       case None => rdd.partitionBy(new HashPartitioner(rdd.partitions.size))
@@ -356,9 +323,12 @@ object VertexRDD {
     new VertexRDD(vertexPartitions)
   }
 
+  /**
+   * Constructs a VertexRDD from the vertex IDs in `vids`, taking attributes from `rdd` and using
+   * `defaultVal` otherwise.
+   */
   def apply[VD: ClassTag](vids: RDD[VertexID], rdd: RDD[(VertexID, VD)], defaultVal: VD)
-    : VertexRDD[VD] =
-  {
+    : VertexRDD[VD] = {
     VertexRDD(vids.map(vid => (vid, defaultVal))).leftJoin(rdd) { (vid, default, value) =>
       value.getOrElse(default)
     }
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
index d4f08497a2..ca64e9af66 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
@@ -6,10 +6,7 @@ import scala.util.Sorting
 import org.apache.spark.graphx._
 import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
 
-
-//private[graph]
 class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag](size: Int = 64) {
-
   var edges = new PrimitiveVector[Edge[ED]](size)
 
   /** Add a new edge to the partition. */
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala
index 79fd962ffd..c5258360da 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala
@@ -5,7 +5,6 @@ import scala.reflect.ClassTag
 import org.apache.spark.graphx._
 import org.apache.spark.util.collection.PrimitiveKeyOpenHashMap
 
-
 /**
  * The Iterator type returned when constructing edge triplets. This class technically could be
  * an anonymous class in GraphImpl.triplets, but we name it here explicitly so it is easier to
-- 
GitLab