Skip to content
Snippets Groups Projects
Commit 16baea62 authored by Tathagata Das's avatar Tathagata Das
Browse files

Fixed bug in CheckpointRDD to prevent exception when the original RDD had zero splits.

parent 99a5fc49
No related branches found
No related tags found
No related merge requests found
...@@ -24,8 +24,8 @@ class CheckpointRDD[T: ClassManifest](sc: SparkContext, val checkpointPath: Stri ...@@ -24,8 +24,8 @@ class CheckpointRDD[T: ClassManifest](sc: SparkContext, val checkpointPath: Stri
val dirContents = fs.listStatus(new Path(checkpointPath)) val dirContents = fs.listStatus(new Path(checkpointPath))
val splitFiles = dirContents.map(_.getPath.toString).filter(_.contains("part-")).sorted val splitFiles = dirContents.map(_.getPath.toString).filter(_.contains("part-")).sorted
val numSplits = splitFiles.size val numSplits = splitFiles.size
if (!splitFiles(0).endsWith(CheckpointRDD.splitIdToFile(0)) || if (numSplits > 0 && (!splitFiles(0).endsWith(CheckpointRDD.splitIdToFile(0)) ||
!splitFiles(numSplits-1).endsWith(CheckpointRDD.splitIdToFile(numSplits-1))) { !splitFiles(numSplits-1).endsWith(CheckpointRDD.splitIdToFile(numSplits-1)))) {
throw new SparkException("Invalid checkpoint directory: " + checkpointPath) throw new SparkException("Invalid checkpoint directory: " + checkpointPath)
} }
Array.tabulate(numSplits)(i => new CheckpointRDDSplit(i)) Array.tabulate(numSplits)(i => new CheckpointRDDSplit(i))
......
...@@ -162,6 +162,16 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { ...@@ -162,6 +162,16 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
rdd => new ZippedRDD(sc, rdd, rdd.map(x => x)), true, false) rdd => new ZippedRDD(sc, rdd, rdd.map(x => x)), true, false)
} }
test("CheckpointRDD with zero partitions") {
val rdd = new BlockRDD[Int](sc, Array[String]())
assert(rdd.splits.size === 0)
assert(rdd.isCheckpointed === false)
rdd.checkpoint()
assert(rdd.count() === 0)
assert(rdd.isCheckpointed === true)
assert(rdd.splits.size === 0)
}
/** /**
* Test checkpointing of the final RDD generated by the given operation. By default, * Test checkpointing of the final RDD generated by the given operation. By default,
* this method tests whether the size of serialized RDD has reduced after checkpointing or not. * this method tests whether the size of serialized RDD has reduced after checkpointing or not.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment