diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala index 85a2b2b3314c9bab12a1d75799840c93df39d0a1..68b99ca1253d5f3b64395f55bedc322c9dbb81b2 100644 --- a/core/src/main/scala/org/apache/spark/CacheManager.scala +++ b/core/src/main/scala/org/apache/spark/CacheManager.scala @@ -69,7 +69,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { logInfo("Computing partition " + split) val computedValues = rdd.computeOrReadCheckpoint(split, context) // Persist the result, so long as the task is not running locally - if (context.runningLocally) return computedValues + if (context.runningLocally) { return computedValues } val elements = new ArrayBuffer[Any] elements ++= computedValues blockManager.put(key, elements, storageLevel, true) diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala index a85d666acefc8da7e669c92f41f4e7e679253330..214f8244d5091471802476a4d3ceb7ff10fd7848 100644 --- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala @@ -17,7 +17,6 @@ package org.apache.spark -import scala.Array import scala.collection.mutable.ArrayBuffer import org.scalatest.{BeforeAndAfter, FunSuite} @@ -25,27 +24,30 @@ import org.scalatest.mock.EasyMockSugar import org.apache.spark.rdd.RDD import org.apache.spark.storage.{BlockManager, StorageLevel} -import org.junit.Assert // TODO: Test the CacheManager's thread-safety aspects class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar { - val sc = new SparkContext("local", "test") - - val split = new Partition { override def index: Int = 0 } - - // An RDD which returns the values [1, 2, 3, 4]. - val rdd = new RDD[Int](sc, Nil) { - override def getPartitions: Array[Partition] = Array(split) - override val getDependencies = List[Dependency[_]]() - override def compute(split: Partition, context: TaskContext) = Array(1, 2, 3, 4).iterator - } - + var sc : SparkContext = _ var blockManager: BlockManager = _ var cacheManager: CacheManager = _ + var split: Partition = _ + /** An RDD which returns the values [1, 2, 3, 4]. */ + var rdd: RDD[Int] = _ before { + sc = new SparkContext("local", "test") blockManager = mock[BlockManager] cacheManager = new CacheManager(blockManager) + split = new Partition { override def index: Int = 0 } + rdd = new RDD(sc, Nil) { + override def getPartitions: Array[Partition] = Array(split) + override val getDependencies = List[Dependency[_]]() + override def compute(split: Partition, context: TaskContext) = Array(1, 2, 3, 4).iterator + } + } + + after { + sc.stop() } test("get uncached rdd") { @@ -58,7 +60,7 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar whenExecuting(blockManager) { val context = new TaskContext(0, 0, 0, runningLocally = false, null) val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY) - Assert.assertEquals(value.toList, List(1, 2, 3, 4)) + assert(value.toList === List(1, 2, 3, 4)) } } @@ -70,7 +72,7 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar whenExecuting(blockManager) { val context = new TaskContext(0, 0, 0, runningLocally = false, null) val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY) - Assert.assertEquals(value.toList, List(5, 6, 7)) + assert(value.toList === List(5, 6, 7)) } } @@ -83,7 +85,7 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar whenExecuting(blockManager) { val context = new TaskContext(0, 0, 0, runningLocally = true, null) val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY) - Assert.assertEquals(value.toList, List(1, 2, 3, 4)) + assert(value.toList === List(1, 2, 3, 4)) } } }