diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala index d20745d0d4df321974e3ae5684510656beb022bb..9a95364cb16dd45f455d71eb84cbe71a3b4017f9 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala @@ -28,17 +28,14 @@ import org.apache.spark.graphx.impl.MsgRDDFunctions import org.apache.spark.graphx.impl.VertexPartition /** - * `VertexRDD[VD]` extends the `RDD[(VertexID, VD)]` by ensuring that there is only one entry for - * each vertex and by pre-indexing the entries for fast, efficient joins. Two VertexRDDs with the - * same index can be joined efficiently. + * Extends `RDD[(VertexID, VD)]` by ensuring that there is only one entry for each vertex and by + * pre-indexing the entries for fast, efficient joins. Two VertexRDDs with the same index can be + * joined efficiently. All operations except [[reindex]] preserve the index. To construct a + * `VertexRDD`, use the [[org.apache.spark.graphx.VertexRDD$ VertexRDD object]]. * - * @tparam VD the vertex attribute associated with each vertex in the set. - * - * To construct a `VertexRDD` use the singleton object: - * - * @example Construct a `VertexRDD` from a plain RDD + * @example Construct a `VertexRDD` from a plain RDD: * {{{ - * // Construct an intial vertex set + * // Construct an initial vertex set * val someData: RDD[(VertexID, SomeType)] = loadData(someFile) * val vset = VertexRDD(someData) * // If there were redundant values in someData we would use a reduceFunc @@ -50,6 +47,7 @@ import org.apache.spark.graphx.impl.VertexPartition * val vset4: VertexRDD[(SomeType, OtherType)] = vset.leftJoin(vset3) * }}} * + * @tparam VD the vertex attribute associated with each vertex in the set. */ class VertexRDD[@specialized VD: ClassTag]( val partitionsRDD: RDD[VertexPartition[VD]]) @@ -146,7 +144,7 @@ class VertexRDD[@specialized VD: ClassTag]( this.mapVertexPartitions(_.map(f)) /** - * Hides vertices that are the same between `this` and `other`. For vertices that are different, + * Hides vertices that are the same between `this` and `other`; for vertices that are different, * keeps the values from `other`. */ def diff(other: VertexRDD[VD]): VertexRDD[VD] = { @@ -188,7 +186,7 @@ class VertexRDD[@specialized VD: ClassTag]( /** * Left joins this VertexRDD with an RDD containing vertex attribute pairs. If the other RDD is * backed by a VertexRDD with the same index then the efficient [[leftZipJoin]] implementation is - * used. The resulting vertex set contains an entry for each vertex in this set. If `other` is + * used. The resulting VertexRDD contains an entry for each vertex in `this`. If `other` is * missing any vertex in this VertexRDD, `f` is passed `None`. If there are duplicates, the vertex * is picked arbitrarily. * @@ -223,8 +221,8 @@ class VertexRDD[@specialized VD: ClassTag]( } /** - * Same effect as `leftJoin(other) { (vid, a, bOpt) => bOpt.getOrElse(a) }`, but `this` and - * `other` must have the same index. + * Efficiently inner joins this VertexRDD with another VertexRDD sharing the same index. See + * [[innerJoin]] for the behavior of the join. */ def innerZipJoin[U: ClassTag, VD2: ClassTag](other: VertexRDD[U]) (f: (VertexID, VD, U) => VD2): VertexRDD[VD2] = { @@ -242,6 +240,12 @@ class VertexRDD[@specialized VD: ClassTag]( * Inner joins this VertexRDD with an RDD containing vertex attribute pairs. If the other RDD is * backed by a VertexRDD with the same index then the efficient [[innerZipJoin]] implementation is * used. + * + * @param other an RDD containing vertices to join. If there are multiple entries for the same + * vertex, one is picked arbitrarily. Use [[aggregateUsingIndex]] to merge multiple entries. + * @param f the join function applied to corresponding values of `this` and `other` + * @return a VertexRDD co-indexed with `this`, containing only vertices that appear in both `this` + * and `other`, with values supplied by `f` */ def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexID, U)]) (f: (VertexID, VD, U) => VD2): VertexRDD[VD2] = { @@ -263,8 +267,15 @@ class VertexRDD[@specialized VD: ClassTag]( } /** - * Aggregates vertices in `message` that have the same ids using `reduceFunc`, returning a + * Aggregates vertices in `messages` that have the same ids using `reduceFunc`, returning a * VertexRDD co-indexed with `this`. + * + * @param messages an RDD containing messages to aggregate, where each message is a pair of its + * target vertex ID and the message data + * @param reduceFunc the associative aggregation function for merging messages to the same vertex + * @return a VertexRDD co-indexed with `this`, containing only vertices that received messages. + * For those vertices, their values are the result of applying `reduceFunc` to all received + * messages. */ def aggregateUsingIndex[VD2: ClassTag]( messages: RDD[(VertexID, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] = {