From 131be5d62ef6b770de5106eb268a45bca385b599 Mon Sep 17 00:00:00 2001
From: Tathagata Das <tathagata.das1565@gmail.com>
Date: Mon, 14 Jan 2013 03:28:25 -0800
Subject: [PATCH] Fixed bug in RDD checkpointing: close the serialize stream, not the raw file output stream.

---
 core/src/main/scala/spark/rdd/CheckpointRDD.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/spark/rdd/CheckpointRDD.scala
index 86c63ca2f4..6f00f6ac73 100644
--- a/core/src/main/scala/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/spark/rdd/CheckpointRDD.scala
@@ -80,12 +80,12 @@ private[spark] object CheckpointRDD extends Logging {
     val serializer = SparkEnv.get.serializer.newInstance()
     val serializeStream = serializer.serializeStream(fileOutputStream)
     serializeStream.writeAll(iterator)
-    fileOutputStream.close()
+    serializeStream.close()
 
     if (!fs.rename(tempOutputPath, finalOutputPath)) {
       if (!fs.delete(finalOutputPath, true)) {
         throw new IOException("Checkpoint failed: failed to delete earlier output of task "
-          + context.attemptId);
+          + context.attemptId)
       }
       if (!fs.rename(tempOutputPath, finalOutputPath)) {
         throw new IOException("Checkpoint failed: failed to save output of task: "
@@ -119,7 +119,7 @@ private[spark] object CheckpointRDD extends Logging {
     val rdd = sc.makeRDD(1 to 10, 10).flatMap(x => 1 to 10000)
     val path = new Path(hdfsPath, "temp")
     val fs = path.getFileSystem(new Configuration())
-    sc.runJob(rdd, CheckpointRDD.writeToFile(path.toString, 10) _)
+    sc.runJob(rdd, CheckpointRDD.writeToFile(path.toString, 1024) _)
     val cpRDD = new CheckpointRDD[Int](sc, path.toString)
     assert(cpRDD.splits.length == rdd.splits.length, "Number of splits is not the same")
     assert(cpRDD.collect.toList == rdd.collect.toList, "Data of splits not the same")
-- 
GitLab