Skip to content
Snippets Groups Projects
Commit 57709208 authored by Joseph E. Gonzalez's avatar Joseph E. Gonzalez
Browse files

Cleaning up documentation of VertexSetRDD.scala

parent b8e294a2
No related branches found
No related tags found
No related merge requests found
......@@ -25,6 +25,7 @@ import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveKeyOpenHa
import org.apache.spark.graph.impl.AggregationMsg
import org.apache.spark.graph.impl.MsgRDDFunctions._
/**
* The `VertexSetIndex` maintains the per-partition mapping from
* vertex id to the corresponding location in the per-partition values
......@@ -88,7 +89,6 @@ class VertexSetRDD[@specialized V: ClassManifest](
extends RDD[(Vid, V)](index.rdd.context,
List(new OneToOneDependency(index.rdd), new OneToOneDependency(valuesRDD)) ) {
/**
* Construct a new VertexSetRDD that is indexed by only the keys in the RDD.
* The resulting VertexSet will be based on a different index and can
......@@ -96,7 +96,6 @@ class VertexSetRDD[@specialized V: ClassManifest](
*/
// Delegates to the VertexSetRDD factory, which builds a fresh index from
// exactly the keys present in this RDD (dropping any extra entries the
// current shared index may carry).
def reindex(): VertexSetRDD[V] = VertexSetRDD(this)
/**
* An internal representation which joins the block indices with the values
* This is used by the compute function to emulate RDD[(Vid, V)]
......@@ -104,19 +103,16 @@ class VertexSetRDD[@specialized V: ClassManifest](
// Internal zipped view pairing each index partition with its
// co-partitioned values partition; compute() and the join operations
// iterate over this to emulate an RDD[(Vid, V)].
protected[spark] val tuples =
new ZippedRDD(index.rdd.context, index.rdd, valuesRDD)
/**
* The partitioner is inherited from the underlying index RDD, so this
* VertexSetRDD is co-partitioned with its index (and with any other
* VertexSetRDD built over the same index).
*/
override val partitioner = index.rdd.partitioner
/**
* The actual partitions are defined by the `tuples` RDD (the zip of the
* index RDD with the values RDD); this simply delegates to it.
*/
override def getPartitions: Array[Partition] = tuples.getPartitions
/**
* The preferred locations are computed based on the preferred
* locations of the tuples.
......@@ -124,7 +120,6 @@ class VertexSetRDD[@specialized V: ClassManifest](
// Delegate placement to the zipped tuples RDD so a partition is
// scheduled near both its index block and its values block.
override def getPreferredLocations(s: Partition): Seq[String] =
tuples.getPreferredLocations(s)
/**
* Caching a VertexSetRDD causes the index and values to be cached separately.
*/
......@@ -134,15 +129,12 @@ class VertexSetRDD[@specialized V: ClassManifest](
return this
}
/**
* Persist this VertexSetRDD at the default storage level
* (`StorageLevel.MEMORY_ONLY`).
*/
override def persist(): VertexSetRDD[V] = {
  persist(StorageLevel.MEMORY_ONLY)
}
/**
* Cache this VertexSetRDD in memory; equivalent to calling `persist()`
* with the default `MEMORY_ONLY` storage level.
*/
override def cache(): VertexSetRDD[V] = {
  persist()
}
/**
* Provide the RDD[(K,V)] equivalent output.
*/
......@@ -152,7 +144,6 @@ class VertexSetRDD[@specialized V: ClassManifest](
}
} // end of compute
/**
* Restrict the vertex set to the set of vertices satisfying the
* given predicate.
......@@ -190,7 +181,6 @@ class VertexSetRDD[@specialized V: ClassManifest](
new VertexSetRDD[V](index, newValues)
} // end of filter
/**
* Pass each vertex attribute through a map function and retain the
* original RDD's partitioning and index.
......@@ -214,7 +204,6 @@ class VertexSetRDD[@specialized V: ClassManifest](
new VertexSetRDD[U](index, newValuesRDD)
} // end of mapValues
/**
* Pass each vertex attribute along with the vertex id through a map
* function and retain the original RDD's partitioning and index.
......@@ -247,8 +236,6 @@ class VertexSetRDD[@specialized V: ClassManifest](
/**
* @todo update docs to reflect function argument
*
* Inner join this VertexSet with another VertexSet which has the
* same Index. This function will fail if both VertexSets do not
* share the same index. The resulting vertex set will only contain
......@@ -257,6 +244,8 @@ class VertexSetRDD[@specialized V: ClassManifest](
* @tparam W the attribute type of the other VertexSet
*
* @param other the other VertexSet with which to join.
* @param f the function mapping a vertex id and its attributes in
* this and the other vertex set to a new vertex attribute.
* @return a VertexSetRDD containing only the vertices in both this
* and the other VertexSet and with tuple attributes.
*
......@@ -287,13 +276,16 @@ class VertexSetRDD[@specialized V: ClassManifest](
/**
* @todo document
* Inner join this VertexSet with another VertexSet which has the
* same Index. This function will fail if both VertexSets do not
* share the same index.
*
* @param other
* @param f
* @tparam W
* @tparam Z
* @return
* @param other the vertex set to join with this vertex set
* @param f the function mapping a vertex id and its attributes in
* this and the other vertex set to a collection of tuples.
* @tparam W the type of the other vertex set attributes
* @tparam Z the type of the tuples emitted by `f`
* @return an RDD containing the tuples emitted by `f`
*/
def zipJoinFlatMap[W: ClassManifest, Z: ClassManifest](other: VertexSetRDD[W])(f: (Vid, V,W) => Iterator[Z]):
RDD[Z] = {
......@@ -316,8 +308,6 @@ class VertexSetRDD[@specialized V: ClassManifest](
/**
* @todo update docs to reflect function argument
* Left join this VertexSet with another VertexSet which has the
* same Index. This function will fail if both VertexSets do not
* share the same index. The resulting vertex set contains an entry
......@@ -327,6 +317,8 @@ class VertexSetRDD[@specialized V: ClassManifest](
* @tparam W the attribute type of the other VertexSet
*
* @param other the other VertexSet with which to join.
* @param f the function mapping a vertex id and its attributes in
* this and the other vertex set to a new vertex attribute.
* @return a VertexSetRDD containing all the vertices in this
* VertexSet with `None` attributes used for Vertices missing in the
* other VertexSet.
......@@ -368,11 +360,12 @@ class VertexSetRDD[@specialized V: ClassManifest](
* @tparam W the attribute type of the other VertexSet
*
* @param other the other VertexSet with which to join.
* @param f the function mapping a vertex id and its attributes in
* this and the other vertex set to a new vertex attribute.
* @param merge the function used to combine duplicate vertex
* attributes
* @return a VertexSetRDD containing all the vertices in this
* VertexSet with `None` attributes used for Vertices missing in the
* other VertexSet.
* VertexSet with the attribute emitted by f.
*
*/
def leftJoin[W: ClassManifest, Z: ClassManifest](other: RDD[(Vid,W)])
......@@ -396,181 +389,9 @@ class VertexSetRDD[@specialized V: ClassManifest](
}
} // end of leftJoin
/**
* For each key k in `this` or `other`, return a resulting RDD that contains a
* tuple with the list of values for that key in `this` as well as `other`.
*/
/*
def cogroup[W: ClassManifest](other: RDD[(Vid, W)], partitioner: Partitioner):
VertexSetRDD[(Seq[V], Seq[W])] = {
//RDD[(K, (Seq[V], Seq[W]))] = {
other match {
case other: VertexSetRDD[_] if index == other.index => {
// if both RDDs share exactly the same index and therefore the same
// super set of keys then we simply merge the value RDDs.
// However it is possible that both RDDs are missing a value for a given key in
// which case the returned RDD should have a null value
val newValues: RDD[(IndexedSeq[(Seq[V], Seq[W])], BitSet)] =
valuesRDD.zipPartitions(other.valuesRDD){
(thisIter, otherIter) =>
val (thisValues, thisBS) = thisIter.next()
assert(!thisIter.hasNext)
val (otherValues, otherBS) = otherIter.next()
assert(!otherIter.hasNext)
/**
* @todo consider implementing this with a view as in leftJoin to
* reduce array allocations
*/
val newValues = new Array[(Seq[V], Seq[W])](thisValues.size)
val newBS = thisBS | otherBS
var ind = newBS.nextSetBit(0)
while(ind >= 0) {
val a = if (thisBS.get(ind)) Seq(thisValues(ind)) else Seq.empty[V]
val b = if (otherBS.get(ind)) Seq(otherValues(ind)) else Seq.empty[W]
newValues(ind) = (a, b)
ind = newBS.nextSetBit(ind+1)
}
Iterator((newValues.toIndexedSeq, newBS))
}
new VertexSetRDD(index, newValues)
}
case other: VertexSetRDD[_]
if index.rdd.partitioner == other.index.rdd.partitioner => {
// If both RDDs are indexed using different indices but with the same partitioners
// then we need to first merge the indices and then use the merged index to
// merge the values.
val newIndex =
index.rdd.zipPartitions(other.index.rdd)(
(thisIter, otherIter) => {
val thisIndex = thisIter.next()
assert(!thisIter.hasNext)
val otherIndex = otherIter.next()
assert(!otherIter.hasNext)
// Merge the keys
val newIndex = new VertexIdToIndexMap(thisIndex.capacity + otherIndex.capacity)
var ind = thisIndex.nextPos(0)
while(ind >= 0) {
newIndex.fastAdd(thisIndex.getValue(ind))
ind = thisIndex.nextPos(ind+1)
}
// NOTE(review): `var ind` is already declared in this scope above; a
// second `var` declaration would be a duplicate-definition compile
// error if this commented-out code is ever revived — reassign instead.
ind = otherIndex.nextPos(0)
while(ind >= 0) {
newIndex.fastAdd(otherIndex.getValue(ind))
ind = otherIndex.nextPos(ind+1)
}
List(newIndex).iterator
}).cache()
// Use the new index along with the this and the other indices to merge the values
val newValues: RDD[(IndexedSeq[(Seq[V], Seq[W])], BitSet)] =
newIndex.zipPartitions(tuples, other.tuples)(
(newIndexIter, thisTuplesIter, otherTuplesIter) => {
// Get the new index for this partition
val newIndex = newIndexIter.next()
assert(!newIndexIter.hasNext)
// Get the corresponding indices and values for this and the other VertexSetRDD
val (thisIndex, (thisValues, thisBS)) = thisTuplesIter.next()
assert(!thisTuplesIter.hasNext)
val (otherIndex, (otherValues, otherBS)) = otherTuplesIter.next()
assert(!otherTuplesIter.hasNext)
// Preallocate the new Values array
val newValues = new Array[(Seq[V], Seq[W])](newIndex.size)
val newBS = new BitSet(newIndex.size)
// Lookup the sequences in both submaps
for ((k,ind) <- newIndex) {
// Get the left key
val a = if (thisIndex.contains(k)) {
val ind = thisIndex.get(k)
if(thisBS.get(ind)) Seq(thisValues(ind)) else Seq.empty[V]
} else Seq.empty[V]
// Get the right key
val b = if (otherIndex.contains(k)) {
val ind = otherIndex.get(k)
if (otherBS.get(ind)) Seq(otherValues(ind)) else Seq.empty[W]
} else Seq.empty[W]
// If at least one key was present then we generate a tuple.
if (!a.isEmpty || !b.isEmpty) {
newValues(ind) = (a, b)
newBS.set(ind)
}
}
Iterator((newValues.toIndexedSeq, newBS))
})
new VertexSetRDD(new VertexSetIndex(newIndex), newValues)
}
case _ => {
// Get the partitioner from the index
val partitioner = index.rdd.partitioner match {
case Some(p) => p
case None => throw new SparkException("An index must have a partitioner.")
}
// Shuffle the other RDD using the partitioner for this index
val otherShuffled =
if (other.partitioner == Some(partitioner)) {
other
} else {
other.partitionBy(partitioner)
}
// Join the other RDD with this RDD building a new valueset and new index on the fly
val groups = tuples.zipPartitions(otherShuffled)(
(thisTuplesIter, otherTuplesIter) => {
// Get the corresponding indices and values for this VertexSetRDD
val (thisIndex, (thisValues, thisBS)) = thisTuplesIter.next()
// NOTE(review): `hasNext` is parameterless — calling it as `hasNext()`
// would not compile if this code is revived; drop the parens.
assert(!thisTuplesIter.hasNext)
// Construct a new index
val newIndex = thisIndex.clone().asInstanceOf[VertexIdToIndexMap]
// Construct a new array Buffer to store the values
val newValues = ArrayBuffer.fill[ (Seq[V], Seq[W]) ](thisValues.size)(null)
val newBS = new BitSet(thisValues.size)
// populate the newValues with the values in this VertexSetRDD
for ((k,i) <- thisIndex) {
if (thisBS.get(i)) {
newValues(i) = (Seq(thisValues(i)), ArrayBuffer.empty[W])
newBS.set(i)
}
}
// Now iterate through the other tuples updating the map
for ((k,w) <- otherTuplesIter){
if (newIndex.contains(k)) {
val ind = newIndex.get(k)
if(newBS.get(ind)) {
newValues(ind)._2.asInstanceOf[ArrayBuffer[W]].append(w)
} else {
// If the other key was in the index but not in the values
// of this indexed RDD then create a new values entry for it
newBS.set(ind)
newValues(ind) = (Seq.empty[V], ArrayBuffer(w))
}
} else {
// update the index
val ind = newIndex.size
newIndex.put(k, ind)
newBS.set(ind)
// Update the values
newValues.append( (Seq.empty[V], ArrayBuffer(w) ) )
}
}
Iterator( (newIndex, (newValues.toIndexedSeq, newBS)) )
}).cache()
// Extract the index and values from the above RDD
val newIndex = groups.mapPartitions(_.map{ case (kMap,vAr) => kMap }, true)
val newValues: RDD[(IndexedSeq[(Seq[V], Seq[W])], BitSet)] =
groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true)
new VertexSetRDD[(Seq[V], Seq[W])](new VertexSetIndex(newIndex), newValues)
}
}
} // end of cogroup
*/
} // End of VertexSetRDD
/**
* The VertexSetRDD singleton is used to construct VertexSets
*/
......@@ -627,7 +448,6 @@ object VertexSetRDD {
new VertexSetRDD[V](new VertexSetIndex(index), values)
} // end of apply
/**
* Construct a vertex set from an RDD using an existing index.
*
......@@ -642,7 +462,6 @@ object VertexSetRDD {
rdd: RDD[(Vid,V)], index: VertexSetIndex): VertexSetRDD[V] =
apply(rdd, index, (a:V,b:V) => a)
/**
* Construct a vertex set from an RDD using an existing index and a
* user defined `combiner` to merge duplicate vertices.
......@@ -659,8 +478,17 @@ object VertexSetRDD {
reduceFunc: (V, V) => V): VertexSetRDD[V] =
apply(rdd,index, (v:V) => v, reduceFunc, reduceFunc)
def aggregate[V: ClassManifest](
/**
* Construct a vertex set from an RDD of AggregationMsgs
*
* @tparam V the vertex attribute type
* @param rdd the rdd containing vertices
* @param index the index which must be a superset of the vertices
* in RDD
* @param reduceFunc the user defined reduce function used to merge
* duplicate vertex attributes.
*/
private[spark] def aggregate[V: ClassManifest](
rdd: RDD[AggregationMsg[V]], index: VertexSetIndex,
reduceFunc: (V, V) => V): VertexSetRDD[V] = {
......@@ -696,7 +524,6 @@ object VertexSetRDD {
new VertexSetRDD(index, values)
}
/**
* Construct a vertex set from an RDD using an existing index and a
* user defined `combiner` to merge duplicate vertices.
......@@ -767,7 +594,6 @@ object VertexSetRDD {
new VertexSetRDD(index, values)
} // end of apply
/**
* Construct an index of the unique vertices. The resulting index
* can be used to build VertexSets over subsets of the vertices in
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment