diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala index c2b510a31ee3fdb0523cef2dd91432395fad748d..9eabccdee48dbb9c56222d00584cbd6d67bafcaf 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala @@ -102,7 +102,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( override def reverse: Graph[VD, ED] = { val newETable = edges.mapEdgePartitions((pid, part) => part.reverse) - new GraphImpl(vertices, newETable, routingTable, replicatedVertexView) + GraphImpl(vertices, newETable) } override def mapVertices[VD2: ClassTag](f: (VertexId, VD) => VD2): Graph[VD2, ED] = { diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala index c65e36636fe10b51b7a8ea4c72b778cae6148add..d9ba4672ce0c5d06a361a5c5522ac1ba4f34d08e 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala @@ -172,6 +172,16 @@ class GraphSuite extends FunSuite with LocalSparkContext { } } + test("reverse with join elimination") { + withSpark { sc => + val vertices: RDD[(VertexId, Int)] = sc.parallelize(Array((1L, 1), (2L, 2))) + val edges: RDD[Edge[Int]] = sc.parallelize(Array(Edge(1L, 2L, 0))) + val graph = Graph(vertices, edges).reverse + val result = graph.mapReduceTriplets[Int](et => Iterator((et.dstId, et.srcAttr)), _ + _) + assert(result.collect.toSet === Set((1L, 2))) + } + } + test("subgraph") { withSpark { sc => // Create a star graph of 10 veritces.