Commit 7a013529 authored by Ankur Dave, committed by Reynold Xin

[SPARK-2455] Mark (Shippable)VertexPartition serializable

VertexPartition and ShippableVertexPartition are contained in RDDs but are not marked Serializable, leading to NotSerializableExceptions when using Java serialization.

The fix is simply to mark them as Serializable. This PR does that and adds a test for serializing them using Java and Kryo serialization.

Author: Ankur Dave <ankurdave@gmail.com>

Closes #1376 from ankurdave/SPARK-2455 and squashes the following commits:

ed4a51b [Ankur Dave] Make (Shippable)VertexPartition serializable
1fd42c5 [Ankur Dave] Add failing tests for Java serialization
parent 2245c87a
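
For context, the failure mode is easy to reproduce outside GraphX: Spark's JavaSerializer delegates to java.io.ObjectOutputStream, which rejects any class that does not implement java.io.Serializable, while KryoSerializer serializes object fields directly and needs no such marker. A minimal sketch (not part of this commit; UnmarkedPartition and FailureModeDemo are hypothetical names, with UnmarkedPartition standing in for the unmarked VertexPartition):

    import org.apache.spark.SparkConf
    import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}

    // Hypothetical stand-in for a partition class missing the marker.
    class UnmarkedPartition(val data: Array[Int])

    object FailureModeDemo extends App {
      val javaSer = new JavaSerializer(new SparkConf()).newInstance()
      val kryoSer = new KryoSerializer(new SparkConf()).newInstance()
      val p = new UnmarkedPartition(Array(1, 2, 3))

      kryoSer.serialize(p) // succeeds: Kryo does not require java.io.Serializable
      javaSer.serialize(p) // throws java.io.NotSerializableException
    }

This is why the pre-existing Kryo-only test passed while jobs using the default Java serialization failed.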
@@ -119,7 +119,7 @@ object RoutingTablePartition {
  */
 private[graphx]
 class RoutingTablePartition(
-    private val routingTable: Array[(Array[VertexId], BitSet, BitSet)]) {
+    private val routingTable: Array[(Array[VertexId], BitSet, BitSet)]) extends Serializable {
   /** The maximum number of edge partitions this `RoutingTablePartition` is built to join with. */
   val numEdgePartitions: Int = routingTable.size
@@ -60,7 +60,8 @@ private[graphx] object VertexPartitionBase {
  * `VertexPartitionBaseOpsConstructor` typeclass (for example,
  * [[VertexPartition.VertexPartitionOpsConstructor]]).
  */
-private[graphx] abstract class VertexPartitionBase[@specialized(Long, Int, Double) VD: ClassTag] {
+private[graphx] abstract class VertexPartitionBase[@specialized(Long, Int, Double) VD: ClassTag]
+  extends Serializable {

   def index: VertexIdToIndexMap
   def values: Array[VD]
......@@ -35,7 +35,7 @@ import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
private[graphx] abstract class VertexPartitionBaseOps
[VD: ClassTag, Self[X] <: VertexPartitionBase[X] : VertexPartitionBaseOpsConstructor]
(self: Self[VD])
extends Logging {
extends Serializable with Logging {
def withIndex(index: VertexIdToIndexMap): Self[VD]
def withValues[VD2: ClassTag](values: Array[VD2]): Self[VD2]
......
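
Note that only the abstract base classes need the change here: java.io.Serializable is inherited, so concrete subclasses such as VertexPartition and ShippableVertexPartition pick the marker up from VertexPartitionBase. A standalone sketch of that inheritance (Base, Child, and InheritDemo are hypothetical, using plain JDK serialization):

    import java.io._

    abstract class Base extends Serializable { def n: Int }
    class Child(val n: Int) extends Base // Serializable via Base

    object InheritDemo extends App {
      val bos = new ByteArrayOutputStream()
      new ObjectOutputStream(bos).writeObject(new Child(42)) // no exception
      val restored = new ObjectInputStream(
        new ByteArrayInputStream(bos.toByteArray)).readObject()
      assert(restored.asInstanceOf[Child].n == 42)
    }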
@@ -23,6 +23,7 @@ import scala.util.Random
 import org.scalatest.FunSuite

 import org.apache.spark.SparkConf
+import org.apache.spark.serializer.JavaSerializer
 import org.apache.spark.serializer.KryoSerializer

 import org.apache.spark.graphx._
@@ -124,18 +125,21 @@ class EdgePartitionSuite extends FunSuite {
     assert(ep.numActives == Some(2))
   }

-  test("Kryo serialization") {
+  test("serialization") {
     val aList = List((0, 1, 0), (1, 0, 0), (1, 2, 0), (5, 4, 0), (5, 5, 0))
     val a: EdgePartition[Int, Int] = makeEdgePartition(aList)
-    val conf = new SparkConf()
+    val javaSer = new JavaSerializer(new SparkConf())
+    val kryoSer = new KryoSerializer(new SparkConf()
       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-      .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
-    val s = new KryoSerializer(conf).newInstance()
-    val aSer: EdgePartition[Int, Int] = s.deserialize(s.serialize(a))
-    assert(aSer.srcIds.toList === a.srcIds.toList)
-    assert(aSer.dstIds.toList === a.dstIds.toList)
-    assert(aSer.data.toList === a.data.toList)
-    assert(aSer.index != null)
-    assert(aSer.vertices.iterator.toSet === a.vertices.iterator.toSet)
+      .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator"))
+    for (ser <- List(javaSer, kryoSer); s = ser.newInstance()) {
+      val aSer: EdgePartition[Int, Int] = s.deserialize(s.serialize(a))
+      assert(aSer.srcIds.toList === a.srcIds.toList)
+      assert(aSer.dstIds.toList === a.dstIds.toList)
+      assert(aSer.data.toList === a.data.toList)
+      assert(aSer.index != null)
+      assert(aSer.vertices.iterator.toSet === a.vertices.iterator.toSet)
+    }
   }
 }
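
The test exercises both serializers without a cluster by round-tripping the partition through s.deserialize(s.serialize(a)); the for-comprehension's inline binding s = ser.newInstance() gives each serializer a fresh SerializerInstance. The same pattern, extracted into a generic helper (a sketch; roundTrip and RoundTripDemo are hypothetical names):

    import scala.reflect.ClassTag
    import org.apache.spark.SparkConf
    import org.apache.spark.serializer.{JavaSerializer, Serializer}

    object RoundTripDemo extends App {
      // Serialize a value and immediately deserialize the bytes,
      // returning the reconstructed copy.
      def roundTrip[T: ClassTag](ser: Serializer, x: T): T = {
        val s = ser.newInstance()
        s.deserialize(s.serialize(x))
      }

      val copy = roundTrip(new JavaSerializer(new SparkConf()), Vector(1L, 2L, 3L))
      assert(copy == Vector(1L, 2L, 3L))
    }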
@@ -17,9 +17,14 @@

 package org.apache.spark.graphx.impl

-import org.apache.spark.graphx._
 import org.scalatest.FunSuite

+import org.apache.spark.SparkConf
+import org.apache.spark.serializer.JavaSerializer
+import org.apache.spark.serializer.KryoSerializer
+
+import org.apache.spark.graphx._
+
 class VertexPartitionSuite extends FunSuite {

   test("isDefined, filter") {
@@ -116,4 +121,17 @@ class VertexPartitionSuite extends FunSuite {
     assert(vp3.index.getPos(2) === -1)
   }
+
+  test("serialization") {
+    val verts = Set((0L, 1), (1L, 1), (2L, 1))
+    val vp = VertexPartition(verts.iterator)
+    val javaSer = new JavaSerializer(new SparkConf())
+    val kryoSer = new KryoSerializer(new SparkConf()
+      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator"))
+    for (ser <- List(javaSer, kryoSer); s = ser.newInstance()) {
+      val vpSer: VertexPartition[Int] = s.deserialize(s.serialize(vp))
+      assert(vpSer.iterator.toSet === verts)
+    }
+  }
 }