Skip to content
Snippets Groups Projects
Commit 56c771cb authored by Ankur Dave's avatar Ankur Dave Committed by Reynold Xin
Browse files

[SPARK-1931] Reconstruct routing tables in Graph.partitionBy

905173df introduced a bug in partitionBy where, after repartitioning the edges, it reuses the VertexRDD without updating the routing tables to reflect the new edge layout. Subsequent accesses of the triplets contain nulls for many vertex properties.

This commit adds a test for this bug and fixes it by introducing `VertexRDD#withEdges` and calling it in `partitionBy`.

Author: Ankur Dave <ankurdave@gmail.com>

Closes #885 from ankurdave/SPARK-1931 and squashes the following commits:

3930cdd [Ankur Dave] Note how to set up VertexRDD for efficient joins
9bdbaa4 [Ankur Dave] [SPARK-1931] Reconstruct routing tables in Graph.partitionBy
parent cb7fe503
No related branches found
No related tags found
No related merge requests found
......@@ -300,6 +300,18 @@ class VertexRDD[@specialized VD: ClassTag](
def reverseRoutingTables(): VertexRDD[VD] =
this.mapVertexPartitions(vPart => vPart.withRoutingTable(vPart.routingTable.reverse))
/** Prepares this VertexRDD for efficient joins with the given EdgeRDD. */
def withEdges(edges: EdgeRDD[_, _]): VertexRDD[VD] = {
val routingTables = VertexRDD.createRoutingTables(edges, this.partitioner.get)
val vertexPartitions = partitionsRDD.zipPartitions(routingTables, true) {
(partIter, routingTableIter) =>
val routingTable =
if (routingTableIter.hasNext) routingTableIter.next() else RoutingTablePartition.empty
partIter.map(_.withRoutingTable(routingTable))
}
new VertexRDD(vertexPartitions)
}
/** Generates an RDD of vertex attributes suitable for shipping to the edge partitions. */
private[graphx] def shipVertexAttributes(
shipSrc: Boolean, shipDst: Boolean): RDD[(PartitionID, VertexAttributeBlock[VD])] = {
......
......@@ -88,8 +88,8 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
}
val edgePartition = builder.toEdgePartition
Iterator((pid, edgePartition))
}, preservesPartitioning = true))
GraphImpl.fromExistingRDDs(vertices, newEdges)
}, preservesPartitioning = true)).cache()
GraphImpl.fromExistingRDDs(vertices.withEdges(newEdges), newEdges)
}
override def reverse: Graph[VD, ED] = {
......@@ -277,7 +277,11 @@ object GraphImpl {
GraphImpl(vertexRDD, edgeRDD)
}
/** Create a graph from a VertexRDD and an EdgeRDD with arbitrary replicated vertices. */
/**
* Create a graph from a VertexRDD and an EdgeRDD with arbitrary replicated vertices. The
* VertexRDD must already be set up for efficient joins with the EdgeRDD by calling
* `VertexRDD.withEdges` or an appropriate VertexRDD constructor.
*/
def apply[VD: ClassTag, ED: ClassTag](
vertices: VertexRDD[VD],
edges: EdgeRDD[ED, _]): GraphImpl[VD, ED] = {
......@@ -290,7 +294,8 @@ object GraphImpl {
/**
* Create a graph from a VertexRDD and an EdgeRDD with the same replicated vertex type as the
* vertices.
* vertices. The VertexRDD must already be set up for efficient joins with the EdgeRDD by calling
* `VertexRDD.withEdges` or an appropriate VertexRDD constructor.
*/
def fromExistingRDDs[VD: ClassTag, ED: ClassTag](
vertices: VertexRDD[VD],
......
......@@ -133,6 +133,16 @@ class GraphSuite extends FunSuite with LocalSparkContext {
Iterator((part.srcIds ++ part.dstIds).toSet)
}.collect
assert(verts.exists(id => partitionSetsUnpartitioned.count(_.contains(id)) > bound))
// Forming triplets view
val g = Graph(
sc.parallelize(List((0L, "a"), (1L, "b"), (2L, "c"))),
sc.parallelize(List(Edge(0L, 1L, 1), Edge(0L, 2L, 1)), 2))
assert(g.triplets.collect.map(_.toTuple).toSet ===
Set(((0L, "a"), (1L, "b"), 1), ((0L, "a"), (2L, "c"), 1)))
val gPart = g.partitionBy(EdgePartition2D)
assert(gPart.triplets.collect.map(_.toTuple).toSet ===
Set(((0L, "a"), (1L, "b"), 1), ((0L, "a"), (2L, "c"), 1)))
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment