Skip to content
Snippets Groups Projects
Commit 131be5d6 authored by Tathagata Das's avatar Tathagata Das
Browse files

Fixed bug in RDD checkpointing.

parent f90f794c
No related branches found
No related tags found
No related merge requests found
......@@ -80,12 +80,12 @@ private[spark] object CheckpointRDD extends Logging {
val serializer = SparkEnv.get.serializer.newInstance()
val serializeStream = serializer.serializeStream(fileOutputStream)
serializeStream.writeAll(iterator)
fileOutputStream.close()
serializeStream.close()
if (!fs.rename(tempOutputPath, finalOutputPath)) {
if (!fs.delete(finalOutputPath, true)) {
throw new IOException("Checkpoint failed: failed to delete earlier output of task "
+ context.attemptId);
+ context.attemptId)
}
if (!fs.rename(tempOutputPath, finalOutputPath)) {
throw new IOException("Checkpoint failed: failed to save output of task: "
......@@ -119,7 +119,7 @@ private[spark] object CheckpointRDD extends Logging {
val rdd = sc.makeRDD(1 to 10, 10).flatMap(x => 1 to 10000)
val path = new Path(hdfsPath, "temp")
val fs = path.getFileSystem(new Configuration())
sc.runJob(rdd, CheckpointRDD.writeToFile(path.toString, 10) _)
sc.runJob(rdd, CheckpointRDD.writeToFile(path.toString, 1024) _)
val cpRDD = new CheckpointRDD[Int](sc, path.toString)
assert(cpRDD.splits.length == rdd.splits.length, "Number of splits is not the same")
assert(cpRDD.collect.toList == rdd.collect.toList, "Data of splits not the same")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment