diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index a56e542242d5fe061927951e80ea491967853ad0..a97bb174438a50cc0111fb3e095332a441f26ebc 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -294,7 +294,11 @@ abstract class RDD[T: ClassTag](
    */
   private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
   {
-    if (isCheckpointed) firstParent[T].iterator(split, context) else compute(split, context)
+    if (isCheckpointedAndMaterialized) {
+      firstParent[T].iterator(split, context)
+    } else {
+      compute(split, context)
+    }
   }
 
   /**
@@ -1520,20 +1524,37 @@ abstract class RDD[T: ClassTag](
       persist(LocalRDDCheckpointData.transformStorageLevel(storageLevel), allowOverride = true)
     }
 
-    checkpointData match {
-      case Some(reliable: ReliableRDDCheckpointData[_]) => logWarning(
-        "RDD was already marked for reliable checkpointing: overriding with local checkpoint.")
-      case _ =>
+    // If this RDD is already checkpointed and materialized, its lineage is already truncated.
+    // We must not override our `checkpointData` in this case because it is needed to recover
+    // the checkpointed data. If it is overridden, the next materialization of this RDD will
+    // fail.
+    if (isCheckpointedAndMaterialized) {
+      logWarning("Not marking RDD for local checkpoint because it was already " +
+        "checkpointed and materialized")
+    } else {
+      // Lineage is not truncated yet, so just override any existing checkpoint data with ours
+      checkpointData match {
+        case Some(_: ReliableRDDCheckpointData[_]) => logWarning(
+          "RDD was already marked for reliable checkpointing: overriding with local checkpoint.")
+        case _ =>
+      }
+      checkpointData = Some(new LocalRDDCheckpointData(this))
     }
-    checkpointData = Some(new LocalRDDCheckpointData(this))
     this
   }
 
   /**
-   * Return whether this RDD is marked for checkpointing, either reliably or locally.
+   * Return whether this RDD is checkpointed and materialized, either reliably or locally.
    */
   def isCheckpointed: Boolean = checkpointData.exists(_.isCheckpointed)
 
+  /**
+   * Return whether this RDD is checkpointed and materialized, either reliably or locally.
+   * This is introduced as an alias for `isCheckpointed` to clarify the semantics of the
+   * return value. Exposed for testing.
+   */
+  private[spark] def isCheckpointedAndMaterialized: Boolean = isCheckpointed
+
   /**
    * Return whether this RDD is marked for local checkpointing.
    * Exposed for testing.
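
The guard added to `localCheckpoint()` above matters once an RDD has already been reliably checkpointed and materialized: at that point `checkpointData` is the only reference to the saved partitions, so replacing it with `LocalRDDCheckpointData` would break later jobs on the RDD. A minimal driver-level sketch of the scenario this protects against (the `local[2]` master and the `/tmp/spark-checkpoint` directory are illustrative assumptions, not part of the patch):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object LocalCheckpointGuardSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("local-checkpoint-guard"))
    sc.setCheckpointDir("/tmp/spark-checkpoint") // illustrative path

    val rdd = sc.parallelize(1 to 1000, 4).map(_ * 2)
    rdd.checkpoint() // mark for reliable checkpointing
    rdd.count()      // materialize: data is written and the lineage is truncated

    // Before this patch, the call below warned but still replaced the reliable
    // checkpoint data with LocalRDDCheckpointData; with the patch it only logs
    // "Not marking RDD for local checkpoint because it was already checkpointed
    // and materialized" and leaves checkpointData untouched.
    rdd.localCheckpoint()

    println(rdd.count()) // still answered from the reliable checkpoint

    sc.stop()
  }
}
```

The design choice is deliberately conservative: once the lineage is truncated, a no-op plus a warning is strictly safer than overriding the data needed to recompute the partitions.
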
diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
index 4d70bfed909b63440cfc015652365e1b750b6e18..119e5fc28e41297bbf21a78d09c697e713748f2d 100644
--- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -241,9 +241,13 @@ class CheckpointSuite extends SparkFunSuite with LocalSparkContext with Logging
     val rdd = new BlockRDD[Int](sc, Array[BlockId]())
     assert(rdd.partitions.size === 0)
     assert(rdd.isCheckpointed === false)
+    assert(rdd.isCheckpointedAndMaterialized === false)
     checkpoint(rdd, reliableCheckpoint)
+    assert(rdd.isCheckpointed === false)
+    assert(rdd.isCheckpointedAndMaterialized === false)
     assert(rdd.count() === 0)
     assert(rdd.isCheckpointed === true)
+    assert(rdd.isCheckpointedAndMaterialized === true)
     assert(rdd.partitions.size === 0)
   }
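
The extra assertions in the test pin down the life cycle that the reworded doc comment describes: `checkpoint()` only marks the RDD, and `isCheckpointed` (along with its `private[spark]` alias `isCheckpointedAndMaterialized`) becomes true only after an action materializes the data. The same sequence outside the test harness, as a small sketch (again assuming a local master and a scratch checkpoint directory):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object IsCheckpointedLifecycleSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("is-checkpointed-lifecycle"))
    sc.setCheckpointDir("/tmp/spark-checkpoint") // illustrative path

    val rdd = sc.parallelize(Seq(1, 2, 3), numSlices = 3)
    assert(!rdd.isCheckpointed) // nothing marked yet

    rdd.checkpoint()            // marked for checkpointing, nothing written yet
    assert(!rdd.isCheckpointed) // still false: not materialized

    rdd.count()                 // first action writes the checkpoint
    assert(rdd.isCheckpointed)  // true only from this point on

    sc.stop()
  }
}
```

Note that `isCheckpointedAndMaterialized` is `private[spark]`, so CheckpointSuite can assert on it directly while user code observes the same behavior through `isCheckpointed`.
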