Skip to content
Snippets Groups Projects
Commit a1413b36 authored by Liang-Chi Hsieh's avatar Liang-Chi Hsieh Committed by Andrew Or
Browse files

[SPARK-11051][CORE] Do not allow local checkpointing after the RDD is materialized and checkpointed

JIRA: https://issues.apache.org/jira/browse/SPARK-11051

When a `RDD` is materialized and checkpointed, its partitions and dependencies are cleared. If we allow local checkpointing on it and assign `LocalRDDCheckpointData` to its `checkpointData`. Next time when the RDD is materialized again, the error will be thrown.

Author: Liang-Chi Hsieh <viirya@appier.com>

Closes #9072 from viirya/no-localcheckpoint-after-checkpoint.
parent 7ab0ce65
No related branches found
No related tags found
No related merge requests found
......@@ -294,7 +294,11 @@ abstract class RDD[T: ClassTag](
*/
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
if (isCheckpointed) firstParent[T].iterator(split, context) else compute(split, context)
if (isCheckpointedAndMaterialized) {
firstParent[T].iterator(split, context)
} else {
compute(split, context)
}
}
/**
......@@ -1520,20 +1524,37 @@ abstract class RDD[T: ClassTag](
persist(LocalRDDCheckpointData.transformStorageLevel(storageLevel), allowOverride = true)
}
checkpointData match {
case Some(reliable: ReliableRDDCheckpointData[_]) => logWarning(
"RDD was already marked for reliable checkpointing: overriding with local checkpoint.")
case _ =>
// If this RDD is already checkpointed and materialized, its lineage is already truncated.
// We must not override our `checkpointData` in this case because it is needed to recover
// the checkpointed data. If it is overridden, next time materializing on this RDD will
// cause error.
if (isCheckpointedAndMaterialized) {
logWarning("Not marking RDD for local checkpoint because it was already " +
"checkpointed and materialized")
} else {
// Lineage is not truncated yet, so just override any existing checkpoint data with ours
checkpointData match {
case Some(_: ReliableRDDCheckpointData[_]) => logWarning(
"RDD was already marked for reliable checkpointing: overriding with local checkpoint.")
case _ =>
}
checkpointData = Some(new LocalRDDCheckpointData(this))
}
checkpointData = Some(new LocalRDDCheckpointData(this))
this
}
/**
* Return whether this RDD is marked for checkpointing, either reliably or locally.
* Return whether this RDD is checkpointed and materialized, either reliably or locally.
*/
def isCheckpointed: Boolean = checkpointData.exists(_.isCheckpointed)
/**
* Return whether this RDD is checkpointed and materialized, either reliably or locally.
* This is introduced as an alias for `isCheckpointed` to clarify the semantics of the
* return value. Exposed for testing.
*/
private[spark] def isCheckpointedAndMaterialized: Boolean = isCheckpointed
/**
* Return whether this RDD is marked for local checkpointing.
* Exposed for testing.
......
......@@ -241,9 +241,13 @@ class CheckpointSuite extends SparkFunSuite with LocalSparkContext with Logging
val rdd = new BlockRDD[Int](sc, Array[BlockId]())
assert(rdd.partitions.size === 0)
assert(rdd.isCheckpointed === false)
assert(rdd.isCheckpointedAndMaterialized === false)
checkpoint(rdd, reliableCheckpoint)
assert(rdd.isCheckpointed === false)
assert(rdd.isCheckpointedAndMaterialized === false)
assert(rdd.count() === 0)
assert(rdd.isCheckpointed === true)
assert(rdd.isCheckpointedAndMaterialized === true)
assert(rdd.partitions.size === 0)
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment