Skip to content
Snippets Groups Projects
Commit 57d7487d authored by Ankur Dave's avatar Ankur Dave
Browse files

Improve docs for VertexRDD

parent 11dd35c2
No related branches found
No related tags found
No related merge requests found
......@@ -232,7 +232,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
* {{{
* graph.filter(
* graph => {
* val degrees: VertexSetRDD[Int] = graph.outDegrees
* val degrees: VertexRDD[Int] = graph.outDegrees
* graph.outerJoinVertices(degrees) {(vid, data, deg) => deg.getOrElse(0)}
* },
* vpred = (vid: VertexID, deg:Int) => deg > 0
......
......@@ -27,11 +27,10 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.graphx.impl.MsgRDDFunctions
import org.apache.spark.graphx.impl.VertexPartition
/**
* A `VertexRDD[VD]` extends the `RDD[(VertexID, VD)]` by ensuring that there is
* only one entry for each vertex and by pre-indexing the entries for fast,
* efficient joins.
* A `VertexRDD[VD]` extends the `RDD[(VertexID, VD)]` by ensuring that there is only one entry for
* each vertex and by pre-indexing the entries for fast, efficient joins. Two VertexRDDs with the
* same index can be joined efficiently.
*
* @tparam VD the vertex attribute associated with each vertex in the set.
*
......@@ -46,7 +45,7 @@ import org.apache.spark.graphx.impl.VertexPartition
* val vset2 = VertexRDD(someData, reduceFunc)
* // Finally we can use the VertexRDD to index another dataset
* val otherData: RDD[(VertexID, OtherType)] = loadData(otherFile)
* val vset3 = VertexRDD(otherData, vset.index)
* val vset3 = vset2.innerJoin(otherData) { (vid, a, b) => b }
* // Now we can construct very fast joins between the two sets
* val vset4: VertexRDD[(SomeType, OtherType)] = vset.leftJoin(vset3)
* }}}
......@@ -61,32 +60,18 @@ class VertexRDD[@specialized VD: ClassTag](
partitionsRDD.setName("VertexRDD")
/**
 * Builds a new VertexRDD by calling `reindex()` on every partition of `partitionsRDD`.
 *
 * The resulting VertexRDD is indexed by only the visible vertices, so it is based on a different
 * index and can no longer be quickly joined with this RDD.
 */
def reindex(): VertexRDD[VD] = {
  val reindexedPartitions = partitionsRDD.map(partition => partition.reindex())
  new VertexRDD(reindexedPartitions)
}
/**
 * The partitioner of this RDD, taken unchanged from the underlying `partitionsRDD`
 * (i.e. it is defined by the index).
 */
override val partitioner = partitionsRDD.partitioner
/**
 * The partitions of this RDD, which are exactly the partitions of the underlying
 * `partitionsRDD` (i.e. they are defined by the tuples).
 */
override protected def getPartitions: Array[Partition] = partitionsRDD.partitions
/**
 * The preferred locations of partition `s`, delegated to the underlying `partitionsRDD`
 * (i.e. they are computed based on the preferred locations of the tuples).
 *
 * @param s the partition whose preferred locations are requested
 * @return the preferred locations reported by `partitionsRDD` for `s`
 */
override protected def getPreferredLocations(s: Partition): Seq[String] =
partitionsRDD.preferredLocations(s)
/**
* Caching a VertexRDD causes the index and values to be cached separately.
*/
override def persist(newLevel: StorageLevel): VertexRDD[VD] = {
partitionsRDD.persist(newLevel)
this
......@@ -103,20 +88,20 @@ class VertexRDD[@specialized VD: ClassTag](
this
}
/**
 * The number of vertices in the RDD, computed by summing the `size` of every partition.
 *
 * Each partition size is widened to `Long` before summing so that the total cannot overflow
 * if `size` is an `Int`. `fold` with a zero element is used instead of `reduce` so that an RDD
 * with no partitions yields 0 rather than failing on an empty collection.
 *
 * @return the total number of vertices across all partitions
 */
override def count(): Long = {
  partitionsRDD.map(_.size.toLong).fold(0L)(_ + _)
}
/**
 * Provides the `RDD[(VertexID, VD)]` equivalent output for one partition.
 *
 * Takes the first element produced by the parent RDD's iterator for `part` (a
 * `VertexPartition[VD]`) and returns that partition's own iterator.
 */
override def compute(part: Partition, context: TaskContext): Iterator[(VertexID, VD)] = {
  val parentIter = firstParent[VertexPartition[VD]].iterator(part, context)
  parentIter.next().iterator
}
/**
* Return a new VertexRDD by applying a function to each VertexPartition of this RDD.
* Applies a function to each [[impl.VertexPartition]] of this RDD and returns a new VertexRDD.
*/
def mapVertexPartitions[VD2: ClassTag](f: VertexPartition[VD] => VertexPartition[VD2])
: VertexRDD[VD2] = {
......@@ -126,51 +111,43 @@ class VertexRDD[@specialized VD: ClassTag](
/**
 * Restricts the vertex set to the set of vertices satisfying the given predicate. This operation
 * preserves the index for efficient joins with the original RDD, and it sets bits in the bitmask
 * rather than allocating new memory.
 *
 * The predicate is untupled and applied through each partition's own `filter`.
 *
 * @param pred the user defined predicate, which takes a tuple to conform to the
 * `RDD[(VertexID, VD)]` interface
 */
override def filter(pred: Tuple2[VertexID, VD] => Boolean): VertexRDD[VD] = {
  this.mapVertexPartitions { partition =>
    partition.filter(Function.untupled(pred))
  }
}
/**
 * Maps each vertex attribute, preserving the index. The vertex ID is not passed to `f`.
 *
 * @tparam VD2 the type returned by the map function
 *
 * @param f the function applied to each value in the RDD
 * @return a new VertexRDD with values obtained by applying `f` to each of the entries in the
 * original VertexRDD
 */
def mapValues[VD2: ClassTag](f: VD => VD2): VertexRDD[VD2] = {
  this.mapVertexPartitions { partition =>
    partition.map((vid, attr) => f(attr))
  }
}
/**
 * Maps each vertex attribute, additionally supplying the vertex ID to the map function.
 *
 * @tparam VD2 the type returned by the map function
 *
 * @param f the function applied to each ID-value pair in the RDD
 * @return a new VertexRDD with values obtained by applying `f` to each of the entries in the
 * original VertexRDD. The resulting VertexRDD retains the same index.
 */
def mapValues[VD2: ClassTag](f: (VertexID, VD) => VD2): VertexRDD[VD2] = {
  this.mapVertexPartitions { partition =>
    partition.map(f)
  }
}
/**
* Hides vertices that are the same between this and other. For vertices that are different, keeps
* the values from `other`.
* Hides vertices that are the same between `this` and `other`. For vertices that are different,
* keeps the values from `other`.
*/
def diff(other: VertexRDD[VD]): VertexRDD[VD] = {
val newPartitionsRDD = partitionsRDD.zipPartitions(
......@@ -184,22 +161,17 @@ class VertexRDD[@specialized VD: ClassTag](
}
/**
* Left join this VertexSet with another VertexSet which has the
* same Index. This function will fail if both VertexSets do not
* share the same index. The resulting vertex set contains an entry
* for each vertex in this set. If the other VertexSet is missing
* any vertex in this VertexSet then a `None` attribute is generated
*
* @tparam VD2 the attribute type of the other VertexSet
* @tparam VD3 the attribute type of the resulting VertexSet
* Left joins this RDD with another VertexRDD with the same index. This function will fail if both
* VertexRDDs do not share the same index. The resulting vertex set contains an entry for each
* vertex in `this`. If `other` is missing any vertex in this VertexRDD, `f` is passed `None`.
*
* @param other the other VertexSet with which to join.
* @param f the function mapping a vertex id and its attributes in
* this and the other vertex set to a new vertex attribute.
* @return a VertexRDD containing all the vertices in this
* VertexSet with `None` attributes used for Vertices missing in the
* other VertexSet.
* @tparam VD2 the attribute type of the other VertexRDD
* @tparam VD3 the attribute type of the resulting VertexRDD
*
* @param other the other VertexRDD with which to join.
* @param f the function mapping a vertex id and its attributes in this and the other vertex set
* to a new vertex attribute.
* @return a VertexRDD containing the results of `f`
*/
def leftZipJoin[VD2: ClassTag, VD3: ClassTag]
(other: VertexRDD[VD2])(f: (VertexID, VD, Option[VD2]) => VD3): VertexRDD[VD3] = {
......@@ -214,29 +186,25 @@ class VertexRDD[@specialized VD: ClassTag](
}
/**
* Left join this VertexRDD with an RDD containing vertex attribute
* pairs. If the other RDD is backed by a VertexRDD with the same
* index than the efficient leftZipJoin implementation is used. The
* resulting vertex set contains an entry for each vertex in this
* set. If the other VertexRDD is missing any vertex in this
* VertexRDD then a `None` attribute is generated.
*
* If there are duplicates, the vertex is picked at random.
* Left joins this VertexRDD with an RDD containing vertex attribute pairs. If the other RDD is
* backed by a VertexRDD with the same index then the efficient [[leftZipJoin]] implementation is
* used. The resulting vertex set contains an entry for each vertex in this set. If `other` is
* missing any vertex in this VertexRDD, `f` is passed `None`. If there are duplicates, the vertex
* is picked arbitrarily.
*
* @tparam VD2 the attribute type of the other VertexRDD
* @tparam VD3 the attribute type of the resulting VertexRDD
*
* @param other the other VertexRDD with which to join.
* @param f the function mapping a vertex id and its attributes in
* this and the other vertex set to a new vertex attribute.
* @return a VertexRDD containing all the vertices in this
* VertexRDD with the attribute emitted by f.
* @param other the other RDD of vertex-attribute pairs with which to join
* @param f the function mapping a vertex id and its attributes in this and the other vertex set
* to a new vertex attribute.
* @return a VertexRDD containing all the vertices in this VertexRDD with the attributes emitted
* by `f`.
*/
def leftJoin[VD2: ClassTag, VD3: ClassTag]
(other: RDD[(VertexID, VD2)])
(f: (VertexID, VD, Option[VD2]) => VD3)
: VertexRDD[VD3] =
{
: VertexRDD[VD3] = {
// Test if the other vertex is a VertexRDD to choose the optimal join strategy.
// If the other set is a VertexRDD then we use the much more efficient leftZipJoin
other match {
......@@ -255,8 +223,8 @@ class VertexRDD[@specialized VD: ClassTag](
}
/**
* Inner joins this VertexRDD with `other`, which must have the same index as `this`. `f` maps a
* vertex id and its attributes in `this` and `other` to the new vertex attribute.
*/
def innerZipJoin[U: ClassTag, VD2: ClassTag](other: VertexRDD[U])
(f: (VertexID, VD, U) => VD2): VertexRDD[VD2] = {
......@@ -271,8 +239,9 @@ class VertexRDD[@specialized VD: ClassTag](
}
/**
* Replace vertices with corresponding vertices in `other`, and drop vertices without a
* corresponding vertex in `other`.
* Inner joins this VertexRDD with an RDD containing vertex attribute pairs. If the other RDD is
* backed by a VertexRDD with the same index then the efficient [[innerZipJoin]] implementation is
* used.
*/
def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexID, U)])
(f: (VertexID, VD, U) => VD2): VertexRDD[VD2] = {
......@@ -294,12 +263,11 @@ class VertexRDD[@specialized VD: ClassTag](
}
/**
* Aggregate messages with the same ids using `reduceFunc`, returning a VertexRDD that is
* co-indexed with this one.
* Aggregates vertices in `messages` that have the same ids using `reduceFunc`, returning a
* VertexRDD co-indexed with `this`.
*/
def aggregateUsingIndex[VD2: ClassTag](
messages: RDD[(VertexID, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] =
{
messages: RDD[(VertexID, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] = {
val shuffled = MsgRDDFunctions.partitionForAggregation(messages, this.partitioner.get)
val parts = partitionsRDD.zipPartitions(shuffled, true) { (thisIter, msgIter) =>
val vertexPartition: VertexPartition[VD] = thisIter.next()
......@@ -312,12 +280,12 @@ class VertexRDD[@specialized VD: ClassTag](
/**
* The VertexRDD singleton is used to construct VertexRDDs
* The VertexRDD singleton is used to construct VertexRDDs.
*/
object VertexRDD {
/**
* Construct a vertex set from an RDD of vertex-attribute pairs.
* Construct a `VertexRDD` from an RDD of vertex-attribute pairs.
* Duplicate entries are removed arbitrarily.
*
* @tparam VD the vertex attribute type
......@@ -336,16 +304,15 @@ object VertexRDD {
}
/**
* Construct a vertex set from an RDD of vertex-attribute pairs.
* Duplicate entries are merged using mergeFunc.
* Constructs a `VertexRDD` from an RDD of vertex-attribute pairs, merging duplicates using
* `mergeFunc`.
*
* @tparam VD the vertex attribute type
*
* @param rdd the collection of vertex-attribute pairs
* @param mergeFunc the associative, commutative merge function.
*/
def apply[VD: ClassTag](rdd: RDD[(VertexID, VD)], mergeFunc: (VD, VD) => VD): VertexRDD[VD] =
{
def apply[VD: ClassTag](rdd: RDD[(VertexID, VD)], mergeFunc: (VD, VD) => VD): VertexRDD[VD] = {
val partitioned: RDD[(VertexID, VD)] = rdd.partitioner match {
case Some(p) => rdd
case None => rdd.partitionBy(new HashPartitioner(rdd.partitions.size))
......@@ -356,9 +323,12 @@ object VertexRDD {
new VertexRDD(vertexPartitions)
}
/**
* Constructs a VertexRDD from the vertex IDs in `vids`, taking attributes from `rdd` and using
* `defaultVal` otherwise.
*/
def apply[VD: ClassTag](vids: RDD[VertexID], rdd: RDD[(VertexID, VD)], defaultVal: VD)
: VertexRDD[VD] =
{
: VertexRDD[VD] = {
VertexRDD(vids.map(vid => (vid, defaultVal))).leftJoin(rdd) { (vid, default, value) =>
value.getOrElse(default)
}
......
......@@ -6,10 +6,7 @@ import scala.util.Sorting
import org.apache.spark.graphx._
import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
//private[graph]
class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag](size: Int = 64) {
var edges = new PrimitiveVector[Edge[ED]](size)
/** Add a new edge to the partition. */
......
......@@ -5,7 +5,6 @@ import scala.reflect.ClassTag
import org.apache.spark.graphx._
import org.apache.spark.util.collection.PrimitiveKeyOpenHashMap
/**
* The Iterator type returned when constructing edge triplets. This class technically could be
* an anonymous class in GraphImpl.triplets, but we name it here explicitly so it is easier to
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment