diff --git a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala index 37fb60c4cc9c81bfc8310ac0d392469ce8f98a40..f08efa7bb954585640bb5a83a164eae1c91e69d0 100644 --- a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala +++ b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala @@ -58,7 +58,7 @@ class GraphSuite extends FunSuite with LocalSparkContext { } } - test("aggregateNeighborsSourceAttrOnly") { + test("aggregateNeighbors - source attribute replication only") { withSpark(new SparkContext("local", "test")) { sc => val n = 3 // Create a star graph where the degree of each vertex is its attribute @@ -78,6 +78,24 @@ class GraphSuite extends FunSuite with LocalSparkContext { } } + test("aggregateNeighbors - no vertex attribute replication") { + withSpark(new SparkContext("local[2]", "test")) { sc => + val n = 3 + // Not serializable because it captures org.scalatest.Engine + class UnserializableAttribute {} + // Create a star graph where vertex attributes are not serializable + val star = Graph(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid)))) + .mapVertices { (id, attr) => new UnserializableAttribute } + + // Should not serialize any vertex attributes + val ignoreAttributes = star.aggregateNeighbors( + (vid, edge) => Some(0), + (a: Int, b: Int) => a + b, + EdgeDirection.In) + assert(ignoreAttributes.collect().toSet === (1 to n).map(x => (x, 0)).toSet) + } + } + test("joinVertices") { withSpark(new SparkContext("local", "test")) { sc => val vertices = sc.parallelize(Seq[(Vid, String)]((1, "one"), (2, "two"), (3, "three")), 2)