Skip to content
Snippets Groups Projects
Commit 235a47ce authored by Ankur Dave's avatar Ankur Dave Committed by Reynold Xin
Browse files

Rebuild routing table after Graph.reverse

GraphImpl.reverse used to reverse edges in each partition of the edge RDD but preserve the routing table and replicated vertex view, since reversing should not affect partitioning.

However, the old routing table would then have incorrect information for srcAttrOnly and dstAttrOnly. These RDDs should be switched.

A simple fix is for Graph.reverse to rebuild the routing table and replicated vertex view.

Thanks to Bogdan Ghidireac for reporting this issue on the [mailing list](http://apache-spark-user-list.1001560.n3.nabble.com/graph-reverse-amp-Pregel-API-td4338.html).

Author: Ankur Dave <ankurdave@gmail.com>

Closes #431 from ankurdave/fix-reverse-bug and squashes the following commits:

75d63cb [Ankur Dave] Rebuild routing table after Graph.reverse
parent 987760ec
No related branches found
No related tags found
No related merge requests found
...@@ -102,7 +102,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( ...@@ -102,7 +102,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
override def reverse: Graph[VD, ED] = { override def reverse: Graph[VD, ED] = {
val newETable = edges.mapEdgePartitions((pid, part) => part.reverse) val newETable = edges.mapEdgePartitions((pid, part) => part.reverse)
new GraphImpl(vertices, newETable, routingTable, replicatedVertexView) GraphImpl(vertices, newETable)
} }
override def mapVertices[VD2: ClassTag](f: (VertexId, VD) => VD2): Graph[VD2, ED] = { override def mapVertices[VD2: ClassTag](f: (VertexId, VD) => VD2): Graph[VD2, ED] = {
......
...@@ -172,6 +172,16 @@ class GraphSuite extends FunSuite with LocalSparkContext { ...@@ -172,6 +172,16 @@ class GraphSuite extends FunSuite with LocalSparkContext {
} }
} }
test("reverse with join elimination") {
withSpark { sc =>
val vertices: RDD[(VertexId, Int)] = sc.parallelize(Array((1L, 1), (2L, 2)))
val edges: RDD[Edge[Int]] = sc.parallelize(Array(Edge(1L, 2L, 0)))
val graph = Graph(vertices, edges).reverse
val result = graph.mapReduceTriplets[Int](et => Iterator((et.dstId, et.srcAttr)), _ + _)
assert(result.collect.toSet === Set((1L, 2)))
}
}
test("subgraph") { test("subgraph") {
withSpark { sc => withSpark { sc =>
// Create a star graph of 10 veritces. // Create a star graph of 10 veritces.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment