Skip to content
Snippets Groups Projects
Commit c49d1566 authored by Brennon York's avatar Brennon York Committed by Sean Owen
Browse files

[SPARK-5790][GraphX]: VertexRDDs won't zip properly for `diff` capability (added tests)

Added tests that maropu [created](https://github.com/maropu/spark/blob/1f64794b2ce33e64f340e383d4e8a60639a7eb4b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala) for vertices with differing partition counts. Wanted to make sure his work got captured/merged, as it's not in the master branch and I don't believe there's a PR out already for it.

Author: Brennon York <brennon.york@capitalone.com>

Closes #5023 from brennonyork/SPARK-5790 and squashes the following commits:

83bbd29 [Brennon York] added maropu's tests for vertices with differing partition counts
parent 127268bc
No related branches found
No related tags found
No related merge requests found
...@@ -19,7 +19,7 @@ package org.apache.spark.graphx ...@@ -19,7 +19,7 @@ package org.apache.spark.graphx
import org.scalatest.FunSuite import org.scalatest.FunSuite
import org.apache.spark.SparkContext import org.apache.spark.{HashPartitioner, SparkContext}
import org.apache.spark.storage.StorageLevel import org.apache.spark.storage.StorageLevel
class VertexRDDSuite extends FunSuite with LocalSparkContext { class VertexRDDSuite extends FunSuite with LocalSparkContext {
...@@ -58,6 +58,16 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext { ...@@ -58,6 +58,16 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
} }
} }
test("diff vertices with the non-equal number of partitions") {
  withSpark { sc =>
    // Two VertexRDDs backed by different partition counts (3 vs. 2), so
    // diff cannot simply zip the underlying partitions.
    val left = VertexRDD(sc.parallelize(0 until 24, 3).map(i => (i.toLong, 0)))
    val right = VertexRDD(sc.parallelize(8 until 16, 2).map(i => (i.toLong, 1)))
    assert(left.partitions.size != right.partitions.size)
    // diff should surface exactly the vertices whose values changed in `right`.
    val changed = left.diff(right)
    assert(changed.map(_._1).collect.toSet === (8 until 16).toSet)
  }
}
test("leftJoin") { test("leftJoin") {
withSpark { sc => withSpark { sc =>
val n = 100 val n = 100
...@@ -73,6 +83,19 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext { ...@@ -73,6 +83,19 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
} }
} }
test("leftJoin vertices with the non-equal number of partitions") {
  withSpark { sc =>
    // vertexB is derived from vertexA but repartitioned (3 vs. 2 partitions),
    // forcing leftJoin down the non-zippable code path.
    val vertexA = VertexRDD(sc.parallelize(0 until 100, 2).map(i => (i.toLong, 1)))
    val vertexB = VertexRDD(
      vertexA.filter(v => v._1 % 2 == 0).partitionBy(new HashPartitioner(3)))
    assert(vertexA.partitions.size != vertexB.partitions.size)
    // Even ids are present in vertexB with value 1 and cancel to 0;
    // odd ids are absent, so getOrElse(0) leaves their value at 1.
    val vertexC = vertexA.leftJoin(vertexB) { (vid, old, newOpt) =>
      old - newOpt.getOrElse(0)
    }
    // Use === (ScalaTest triple-equals) for an informative failure message,
    // consistent with the other assertions in this suite.
    assert(vertexC.filter(v => v._2 != 0).map(_._1).collect.toSet === (1 to 99 by 2).toSet)
  }
}
test("innerJoin") { test("innerJoin") {
withSpark { sc => withSpark { sc =>
val n = 100 val n = 100
...@@ -87,6 +110,19 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext { ...@@ -87,6 +110,19 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
(0 to n by 2).map(x => (x.toLong, 0)).toSet) } (0 to n by 2).map(x => (x.toLong, 0)).toSet) }
} }
test("innerJoin vertices with the non-equal number of partitions") {
  withSpark { sc =>
    // vertexB is derived from vertexA but repartitioned (3 vs. 2 partitions),
    // forcing innerJoin down the non-zippable code path.
    val vertexA = VertexRDD(sc.parallelize(0 until 100, 2).map(i => (i.toLong, 1)))
    val vertexB = VertexRDD(
      vertexA.filter(v => v._1 % 2 == 0).partitionBy(new HashPartitioner(3)))
    assert(vertexA.partitions.size != vertexB.partitions.size)
    // innerJoin keeps only ids present in both sides (the even ids);
    // each joined value is 1 - 1 == 0.
    val vertexC = vertexA.innerJoin(vertexB) { (vid, old, newVal) =>
      old - newVal
    }
    // Use === (ScalaTest triple-equals) for an informative failure message,
    // consistent with the other assertions in this suite.
    assert(vertexC.filter(v => v._2 == 0).map(_._1).collect.toSet === (0 to 98 by 2).toSet)
  }
}
test("aggregateUsingIndex") { test("aggregateUsingIndex") {
withSpark { sc => withSpark { sc =>
val n = 100 val n = 100
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment