diff --git a/core/src/main/scala/spark/CacheTracker.scala b/core/src/main/scala/spark/CacheTracker.scala index d9cbe3730a8a9c27874dc135d932031afd806e6d..c5db6ce63a43807bb1755313407022458ff4e384 100644 --- a/core/src/main/scala/spark/CacheTracker.scala +++ b/core/src/main/scala/spark/CacheTracker.scala @@ -208,23 +208,20 @@ private[spark] class CacheTracker(actorSystem: ActorSystem, isMaster: Boolean, b // TODO: fetch any remote copy of the split that may be available // TODO: also register a listener for when it unloads logInfo("Computing partition " + split) + val elements = new ArrayBuffer[Any] try { - // BlockManager will iterate over results from compute to create RDD - blockManager.put(key, rdd.compute(split), storageLevel, true) + // Compute inside the try: the finally below must clear `loading` even if compute throws + elements ++= rdd.compute(split) + // Try to put this block in the blockManager + blockManager.put(key, elements, storageLevel, true) //future.apply() // Wait for the reply from the cache tracker - blockManager.get(key) match { - case Some(values) => - return values.asInstanceOf[Iterator[T]] - case None => - logWarning("loading partition failed after computing it " + key) - return null - } } finally { loading.synchronized { loading.remove(key) loading.notifyAll() } } + return elements.iterator.asInstanceOf[Iterator[T]] } } diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index 91b7bebfb30fce4bb6d0e5e6564138bf268c9525..8a111f44c9f435b8267cb5f542431de0b3939f54 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -237,7 +237,10 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m diskStore.getValues(blockId) match { case Some(iterator) => // Put the block back in memory before returning it - memoryStore.putValues(blockId, iterator, level, true).data match { + // TODO: Consider creating a putValues that also takes in an iterator?
+ val elements = new ArrayBuffer[Any] + elements ++= iterator + memoryStore.putValues(blockId, elements, level, true).data match { case Left(iterator2) => return Some(iterator2) case _ => @@ -529,11 +532,18 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m } } + def put(blockId: String, values: Iterator[Any], level: StorageLevel, tellMaster: Boolean) + : Long = { + val elements = new ArrayBuffer[Any] + elements ++= values + put(blockId, elements, level, tellMaster) + } + /** * Put a new block of values to the block manager. Returns its (estimated) size in bytes. */ - def put(blockId: String, values: Iterator[Any], level: StorageLevel, tellMaster: Boolean = true) - : Long = { + def put(blockId: String, values: ArrayBuffer[Any], level: StorageLevel, + tellMaster: Boolean = true) : Long = { if (blockId == null) { throw new IllegalArgumentException("Block Id is null") @@ -766,7 +776,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m * Drop a block from memory, possibly putting it on disk if applicable. Called when the memory * store reaches its limit and needs to free up space. 
*/ - def dropFromMemory(blockId: String, data: Either[Iterator[_], ByteBuffer]) { + def dropFromMemory(blockId: String, data: Either[ArrayBuffer[Any], ByteBuffer]) { logInfo("Dropping block " + blockId + " from memory") locker.getLock(blockId).synchronized { val info = blockInfo.get(blockId) @@ -774,8 +784,8 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m if (level.useDisk && !diskStore.contains(blockId)) { logInfo("Writing block " + blockId + " to disk") data match { - case Left(iterator) => - diskStore.putValues(blockId, iterator, level, false) + case Left(elements) => + diskStore.putValues(blockId, elements, level, false) case Right(bytes) => diskStore.putBytes(blockId, bytes, level) } diff --git a/core/src/main/scala/spark/storage/BlockStore.scala b/core/src/main/scala/spark/storage/BlockStore.scala index 1286600cd1aaf13fb5be8d346794bdb016e65036..096bf8bdd967c05f5a9e55de844a2084b8a681be 100644 --- a/core/src/main/scala/spark/storage/BlockStore.scala +++ b/core/src/main/scala/spark/storage/BlockStore.scala @@ -1,6 +1,7 @@ package spark.storage import java.nio.ByteBuffer +import scala.collection.mutable.ArrayBuffer import spark.Logging @@ -18,8 +19,8 @@ abstract class BlockStore(val blockManager: BlockManager) extends Logging { * @return a PutResult that contains the size of the data, as well as the values put if * returnValues is true (if not, the result's data field can be null) */ - def putValues(blockId: String, values: Iterator[Any], level: StorageLevel, returnValues: Boolean) - : PutResult + def putValues(blockId: String, values: ArrayBuffer[Any], level: StorageLevel, + returnValues: Boolean) : PutResult /** * Return the size of a block in bytes. 
diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala index fd92a3dc67a824eed1f242fc1d65ef1d89aefb18..8ba64e4b76a0e142307932da74b0f058833d0ac1 100644 --- a/core/src/main/scala/spark/storage/DiskStore.scala +++ b/core/src/main/scala/spark/storage/DiskStore.scala @@ -3,11 +3,15 @@ package spark.storage import java.nio.ByteBuffer import java.io.{File, FileOutputStream, RandomAccessFile} import java.nio.channels.FileChannel.MapMode -import it.unimi.dsi.fastutil.io.FastBufferedOutputStream import java.util.{Random, Date} -import spark.Utils import java.text.SimpleDateFormat +import it.unimi.dsi.fastutil.io.FastBufferedOutputStream + +import scala.collection.mutable.ArrayBuffer + +import spark.Utils + /** * Stores BlockManager blocks on disk. */ @@ -45,7 +49,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) override def putValues( blockId: String, - values: Iterator[Any], + values: ArrayBuffer[Any], level: StorageLevel, returnValues: Boolean) : PutResult = { @@ -56,7 +60,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) val fileOut = blockManager.wrapForCompression(blockId, new FastBufferedOutputStream(new FileOutputStream(file))) val objOut = blockManager.serializer.newInstance().serializeStream(fileOut) - objOut.writeAll(values) + objOut.writeAll(values.iterator) objOut.close() val length = file.length() logDebug("Block %s stored as %s file on disk in %d ms".format( diff --git a/core/src/main/scala/spark/storage/MemoryStore.scala b/core/src/main/scala/spark/storage/MemoryStore.scala index e9288fdf435e53c5c2915aa40f9ee3c0df3696fb..773970446a0eecdb6c5dccecf6766507447050a4 100644 --- a/core/src/main/scala/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/spark/storage/MemoryStore.scala @@ -46,19 +46,17 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) override def putValues( blockId: String, - values: Iterator[Any], + values: 
ArrayBuffer[Any], level: StorageLevel, returnValues: Boolean) : PutResult = { if (level.deserialized) { - val elements = new ArrayBuffer[Any] - elements ++= values - val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef]) - tryToPut(blockId, elements, sizeEstimate, true) - PutResult(sizeEstimate, Left(elements.iterator)) + val sizeEstimate = SizeEstimator.estimate(values.asInstanceOf[AnyRef]) + tryToPut(blockId, values, sizeEstimate, true) + PutResult(sizeEstimate, Left(values.iterator)) } else { - val bytes = blockManager.dataSerialize(blockId, values) + val bytes = blockManager.dataSerialize(blockId, values.iterator) tryToPut(blockId, bytes, bytes.limit, false) PutResult(bytes.limit(), Right(bytes)) } @@ -146,7 +144,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) // Tell the block manager that we couldn't put it in memory so that it can drop it to // disk if the block allows disk storage. val data = if (deserialized) { - Left(value.asInstanceOf[ArrayBuffer[Any]].iterator) + Left(value.asInstanceOf[ArrayBuffer[Any]]) } else { Right(value.asInstanceOf[ByteBuffer].duplicate()) } @@ -199,7 +197,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) for (blockId <- selectedBlocks) { val entry = entries.get(blockId) val data = if (entry.deserialized) { - Left(entry.value.asInstanceOf[ArrayBuffer[Any]].iterator) + Left(entry.value.asInstanceOf[ArrayBuffer[Any]]) } else { Right(entry.value.asInstanceOf[ByteBuffer].duplicate()) } diff --git a/core/src/test/scala/spark/CacheTrackerSuite.scala b/core/src/test/scala/spark/CacheTrackerSuite.scala index 426c0d26e9b4422651f2873359b829b6c259584d..467605981b3d99b886adc61b981340710e21b760 100644 --- a/core/src/test/scala/spark/CacheTrackerSuite.scala +++ b/core/src/test/scala/spark/CacheTrackerSuite.scala @@ -22,7 +22,7 @@ class CacheTrackerSuite extends FunSuite { } catch { case e: Exception => throw new SparkException("Error communicating with actor", e) - } + } } 
test("CacheTrackerActor slave initialization & cache status") { diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala index 31b33eae0945af1cf7ab37b4513558b2256c41cc..b9c19e61cd1c26d6e505d5175d9a5a9f4e2bd4a6 100644 --- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala @@ -268,9 +268,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT val list1 = List(new Array[Byte](200), new Array[Byte](200)) val list2 = List(new Array[Byte](200), new Array[Byte](200)) val list3 = List(new Array[Byte](200), new Array[Byte](200)) - store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY) - store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY) - store.put("list3", list3.iterator, StorageLevel.MEMORY_ONLY) + store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, true) + store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY, true) + store.put("list3", list3.iterator, StorageLevel.MEMORY_ONLY, true) assert(store.get("list2") != None, "list2 was not in store") assert(store.get("list2").get.size == 2) assert(store.get("list3") != None, "list3 was not in store") @@ -279,7 +279,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT assert(store.get("list2") != None, "list2 was not in store") assert(store.get("list2").get.size == 2) // At this point list2 was gotten last, so LRU will getSingle rid of list3 - store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY) + store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, true) assert(store.get("list1") != None, "list1 was not in store") assert(store.get("list1").get.size == 2) assert(store.get("list2") != None, "list2 was not in store") @@ -294,9 +294,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT val list3 = List(new Array[Byte](200), new Array[Byte](200)) 
val list4 = List(new Array[Byte](200), new Array[Byte](200)) // First store list1 and list2, both in memory, and list3, on disk only - store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER) - store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER) - store.put("list3", list3.iterator, StorageLevel.DISK_ONLY) + store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, true) + store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, true) + store.put("list3", list3.iterator, StorageLevel.DISK_ONLY, true) // At this point LRU should not kick in because list3 is only on disk assert(store.get("list1") != None, "list2 was not in store") assert(store.get("list1").get.size === 2) @@ -311,7 +311,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT assert(store.get("list3") != None, "list1 was not in store") assert(store.get("list3").get.size === 2) // Now let's add in list4, which uses both disk and memory; list1 should drop out - store.put("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER) + store.put("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, true) assert(store.get("list1") === None, "list1 was in store") assert(store.get("list2") != None, "list3 was not in store") assert(store.get("list2").get.size === 2)