Skip to content
Snippets Groups Projects
Commit ee8931d2 authored by Joseph E. Gonzalez's avatar Joseph E. Gonzalez
Browse files

Finished documenting vertexrdd.

parent 0fbc0b05
No related branches found
No related tags found
No related merge requests found
......@@ -683,7 +683,60 @@ val sssp = initialGraph.pregel(Double.PositiveInfinity)(
# Vertex and Edge RDDs
<a name="vertex_and_edge_rdds"></a>
GraphX exposes `RDD` views of the vertices and edges stored within the graph. However, because
GraphX maintains the vertices and edges in optimized data-structures and these data-structures
provide additional functionality, the vertices and edges are returned as `VertexRDD` and `EdgeRDD`
respectively. In this section we review some of the additional useful functionality in these types.
## VertexRDDs
The `VertexRDD[A]` extends the more traditional `RDD[(VertexId, A)]` but adds the additional
constraint that each `VertexId` occurs only *once*. Moreover, `VertexRDD[A]` represents a *set* of
vertices each with an attribute of type `A`. Internally, this is achieved by storing the vertex
attributes in a reusable hash-map data-structure. As a consequence if two `VertexRDD`s are derived
from the same base `VertexRDD` (e.g., by `filter` or `mapValues`) they can be joined in constant
time without hash evaluations. To leverage this indexed data-structure, the `VertexRDD` exposes the
following additional functionality:
{% highlight scala %}
// Filter the vertex set but preserves the internal index
def filter(pred: Tuple2[VertexID, VD] => Boolean): VertexRDD[VD]
// Transform the values without changing the ids (preserves the internal index)
def mapValues[VD2](map: VD => VD2): VertexRDD[VD2]
def mapValues[VD2](map: (VertexID, VD) => VD2): VertexRDD[VD2]
// Remove vertices from this set that appear in the other set
def diff(other: VertexRDD[VD]): VertexRDD[VD]
// Join operators that take advantage of the internal indexing to accelerate joins (substantially)
def leftJoin[VD2, VD3](other: RDD[(VertexID, VD2)])(f: (VertexID, VD, Option[VD2]) => VD3): VertexRDD[VD3]
def innerJoin[U, VD2](other: RDD[(VertexID, U)])(f: (VertexID, VD, U) => VD2): VertexRDD[VD2]
// Use the index on this RDD to accelerate a `reduceByKey` operation on the input RDD.
def aggregateUsingIndex[VD2](other: RDD[(VertexID, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2]
{% endhighlight %}
Notice, for example, how the `filter` operator returns an `VertexRDD`. Filter is actually
implemented using a `BitSet` thereby reusing the index and preserving the ability to do fast joins
with other `VertexRDD`s. Likewise, the `mapValues` operators do not allow the `map` function to
change the `VertexId` thereby enabling the same `HashMap` data-structures to be reused. Both the
`leftJoin` and `innerJoin` are able to identify when joining two `VertexRDD`s derived from the same
`HashMap` and implement the join by linear scan rather than costly point lookups.
The `aggregateUsingIndex` operator can be slightly confusing but is also useful for efficient
construction of a new `VertexRDD` from an `RDD[(VertexId, A)]`. Conceptually, if I have constructed
a `VertexRDD[B]` over a set of vertices, *which is a super-set* of the vertices in some
`RDD[(VertexId, A)]` then I can reuse the index to both aggregate and then subsequently index the
RDD. For example:
{% highlight scala %}
val setA: VertexRDD[Int] = VertexRDD(sc.parallelize(0L until 100L).map(id => (id, 1)))
val rddB: RDD[(VertexID, Double)] = sc.parallelize(0L until 100L).flatMap(id => List((id, 1.0), (id, 2.0)))
// There should be 200 entries in rddB
rddB.count
val setB: VertexRDD[Double] = setA.aggregateUsingIndex(rddB, _ + _)
// There should be 100 entries in setB
setB.count
// Joining A and B should now be fast!
val setC: VertexRDD[Double] = setA.innerJoin(setB)((id, a, b) => a + b)
{% endhighlight %}
# Optimized Representation
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment