Skip to content
Snippets Groups Projects
Commit 3a04e76c authored by Aaron Davidson's avatar Aaron Davidson
Browse files

Reynold's second round of comments

parent 4f2236a1
No related branches found
No related tags found
No related merge requests found
......@@ -69,7 +69,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
logInfo("Computing partition " + split)
val computedValues = rdd.computeOrReadCheckpoint(split, context)
// Persist the result, so long as the task is not running locally
if (context.runningLocally) return computedValues
if (context.runningLocally) { return computedValues }
val elements = new ArrayBuffer[Any]
elements ++= computedValues
blockManager.put(key, elements, storageLevel, true)
......
......@@ -17,7 +17,6 @@
package org.apache.spark
import scala.Array
import scala.collection.mutable.ArrayBuffer
import org.scalatest.{BeforeAndAfter, FunSuite}
......@@ -25,27 +24,30 @@ import org.scalatest.mock.EasyMockSugar
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.{BlockManager, StorageLevel}
import org.junit.Assert
// TODO: Test the CacheManager's thread-safety aspects
class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar {
val sc = new SparkContext("local", "test")
val split = new Partition { override def index: Int = 0 }
// An RDD which returns the values [1, 2, 3, 4].
val rdd = new RDD[Int](sc, Nil) {
override def getPartitions: Array[Partition] = Array(split)
override val getDependencies = List[Dependency[_]]()
override def compute(split: Partition, context: TaskContext) = Array(1, 2, 3, 4).iterator
}
var sc : SparkContext = _
var blockManager: BlockManager = _
var cacheManager: CacheManager = _
var split: Partition = _
/** An RDD which returns the values [1, 2, 3, 4]. */
var rdd: RDD[Int] = _
before {
sc = new SparkContext("local", "test")
blockManager = mock[BlockManager]
cacheManager = new CacheManager(blockManager)
split = new Partition { override def index: Int = 0 }
rdd = new RDD(sc, Nil) {
override def getPartitions: Array[Partition] = Array(split)
override val getDependencies = List[Dependency[_]]()
override def compute(split: Partition, context: TaskContext) = Array(1, 2, 3, 4).iterator
}
}
after {
sc.stop()
}
test("get uncached rdd") {
......@@ -58,7 +60,7 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
whenExecuting(blockManager) {
val context = new TaskContext(0, 0, 0, runningLocally = false, null)
val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
Assert.assertEquals(value.toList, List(1, 2, 3, 4))
assert(value.toList === List(1, 2, 3, 4))
}
}
......@@ -70,7 +72,7 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
whenExecuting(blockManager) {
val context = new TaskContext(0, 0, 0, runningLocally = false, null)
val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
Assert.assertEquals(value.toList, List(5, 6, 7))
assert(value.toList === List(5, 6, 7))
}
}
......@@ -83,7 +85,7 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
whenExecuting(blockManager) {
val context = new TaskContext(0, 0, 0, runningLocally = true, null)
val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
Assert.assertEquals(value.toList, List(1, 2, 3, 4))
assert(value.toList === List(1, 2, 3, 4))
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment