Skip to content
Snippets Groups Projects
Commit 0d7e3852 authored by Tathagata Das
Browse files

[SPARK-14804][SPARK][GRAPHX] Fix checkpointing of VertexRDD/EdgeRDD


## What changes were proposed in this pull request?

EdgeRDD/VertexRDD override checkpoint() and isCheckpointed() to forward these calls to the internal partitionsRDD. So when checkpoint() is called on them, it's the partitionsRDD that actually gets checkpointed. However, since isCheckpointed() is also overridden to call partitionsRDD.isCheckpointed, EdgeRDD/VertexRDD.isCheckpointed returns true even though the EdgeRDD/VertexRDD itself is not actually checkpointed.

This would have been fine, except that the RDD's internal logic for computing the RDD depends on isCheckpointed(). So for VertexRDD/EdgeRDD, since isCheckpointed is true, when computing, Spark tries to read checkpoint data of VertexRDD/EdgeRDD even though they are not actually checkpointed. Through a crazy sequence of call forwarding, it reads the checkpoint data of partitionsRDD and tries to cast it to the types in Vertex/EdgeRDD. This leads to a ClassCastException.

The minimal fix that does not change any public behavior is to modify RDD's internals so that they do not use the public, overridable API for internal logic.
## How was this patch tested?

New unit tests.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #15396 from tdas/SPARK-14804.

(cherry picked from commit 47d5d0dd)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
parent a5c10ff2
No related branches found
No related tags found
No related merge requests found
......@@ -1610,14 +1610,15 @@ abstract class RDD[T: ClassTag](
/**
* Return whether this RDD is checkpointed and materialized, either reliably or locally.
*
* This is the public accessor. Per the accompanying change description, EdgeRDD and
* VertexRDD override it to forward to their internal partitionsRDD, so its result may
* describe that internal RDD rather than the RDD it is called on.
*
* NOTE(review): two definitions appear below. The first reads `checkpointData` directly;
* the second delegates to `isCheckpointedAndMaterialized`. They look like the removed and
* added lines of the same diff hunk -- confirm which one the file actually contains.
*/
def isCheckpointed: Boolean = checkpointData.exists(_.isCheckpointed)
def isCheckpointed: Boolean = isCheckpointedAndMaterialized
/**
* Return whether this RDD is checkpointed and materialized, either reliably or locally.
* This is introduced as an alias for `isCheckpointed` to clarify the semantics of the
* return value. Exposed for testing.
*
* NOTE(review): two definitions appear below. The first delegates to `isCheckpointed`;
* the second reads this RDD's own `checkpointData` directly, so its result does not
* depend on how `isCheckpointed` is overridden. With the second form, "alias" above
* describes the shared meaning rather than the direction of delegation -- confirm which
* definition the file actually contains.
*/
private[spark] def isCheckpointedAndMaterialized: Boolean = isCheckpointed
private[spark] def isCheckpointedAndMaterialized: Boolean =
checkpointData.exists(_.isCheckpointed)
/**
* Return whether this RDD is marked for local checkpointing.
......
......@@ -19,6 +19,7 @@ package org.apache.spark.graphx
import org.apache.spark.SparkFunSuite
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils
class EdgeRDDSuite extends SparkFunSuite with LocalSparkContext {
......@@ -33,4 +34,30 @@ class EdgeRDDSuite extends SparkFunSuite with LocalSparkContext {
}
}
test("checkpointing") {
  // Verifies how checkpoint state is reported for an EdgeRDD and its internal
  // partitionsRDD, before and after an action forces the checkpoint to be written.
  withSpark { sc =>
    // An EdgeRDD built from an empty edge list is all this test needs.
    val edges = EdgeRDD.fromEdges(sc.parallelize(List.empty[Edge[Int]]))
    sc.setCheckpointDir(Utils.createTempDir().getCanonicalPath)
    edges.checkpoint()

    // EdgeRDD not yet checkpointed
    assert(!edges.isCheckpointed)
    assert(!edges.isCheckpointedAndMaterialized)
    assert(!edges.partitionsRDD.isCheckpointed)
    assert(!edges.partitionsRDD.isCheckpointedAndMaterialized)

    val data = edges.collect().toSeq // force checkpointing

    // EdgeRDD shows up as checkpointed, but internally it is not.
    // Only internal partitionsRDD is checkpointed.
    assert(edges.isCheckpointed)
    assert(!edges.isCheckpointedAndMaterialized)
    assert(edges.partitionsRDD.isCheckpointed)
    assert(edges.partitionsRDD.isCheckpointedAndMaterialized)

    // Reading again after the checkpoint must yield the same data.
    assert(edges.collect().toSeq === data) // test checkpointed RDD
  }
}
}
......@@ -20,6 +20,7 @@ package org.apache.spark.graphx
import org.apache.spark.{HashPartitioner, SparkContext, SparkFunSuite}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils
class VertexRDDSuite extends SparkFunSuite with LocalSparkContext {
......@@ -197,4 +198,29 @@ class VertexRDDSuite extends SparkFunSuite with LocalSparkContext {
}
}
test("checkpoint") {
  // Checks the checkpoint flags reported by a VertexRDD and by its internal
  // partitionsRDD, first right after checkpoint() and then after an action has run.
  withSpark { sc =>
    val vertexRdd = vertices(sc, 100)
    sc.setCheckpointDir(Utils.createTempDir().getCanonicalPath)
    vertexRdd.checkpoint()

    // Nothing has been computed yet, so no flag is set on either RDD.
    assert(!vertexRdd.isCheckpointed)
    assert(!vertexRdd.isCheckpointedAndMaterialized)
    assert(!vertexRdd.partitionsRDD.isCheckpointed)
    assert(!vertexRdd.partitionsRDD.isCheckpointedAndMaterialized)

    // Running an action forces the checkpoint to be written.
    val collectedBefore = vertexRdd.collect().toSeq

    // The VertexRDD reports itself as checkpointed, yet only the internal
    // partitionsRDD is actually checkpointed and materialized.
    assert(vertexRdd.isCheckpointed)
    assert(!vertexRdd.isCheckpointedAndMaterialized)
    assert(vertexRdd.partitionsRDD.isCheckpointed)
    assert(vertexRdd.partitionsRDD.isCheckpointedAndMaterialized)

    // A second read must return the same contents as the first.
    assert(vertexRdd.collect().toSeq === collectedBefore)
  }
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment