diff --git a/core/src/main/scala/spark/BlockRDD.scala b/core/src/main/scala/spark/BlockRDD.scala
new file mode 100644
index 0000000000000000000000000000000000000000..ea009f0f4f260f9c37b427ce4a56b6ac070cd70f
--- /dev/null
+++ b/core/src/main/scala/spark/BlockRDD.scala
@@ -0,0 +1,42 @@
+package spark
+
+import scala.collection.mutable.HashMap
+
+class BlockRDDSplit(val blockId: String, idx: Int) extends Split {
+  val index = idx
+}
+
+
+class BlockRDD[T: ClassManifest](sc: SparkContext, blockIds: Array[String]) extends RDD[T](sc) {
+
+  @transient
+  val splits_ = (0 until blockIds.size).map(i => {
+    new BlockRDDSplit(blockIds(i), i).asInstanceOf[Split]
+  }).toArray
+
+  @transient
+  lazy val locations_ = {
+    val blockManager = SparkEnv.get.blockManager
+    /*val locations = blockIds.map(id => blockManager.getLocations(id))*/
+    val locations = blockManager.getLocations(blockIds)
+    HashMap(blockIds.zip(locations):_*)
+  }
+
+  override def splits = splits_
+
+  override def compute(split: Split): Iterator[T] = {
+    val blockManager = SparkEnv.get.blockManager
+    val blockId = split.asInstanceOf[BlockRDDSplit].blockId
+    blockManager.get(blockId) match {
+      case Some(block) => block.asInstanceOf[Iterator[T]]
+      case None =>
+        throw new Exception("Could not compute split, block " + blockId + " not found")
+    }
+  }
+
+  override def preferredLocations(split: Split) =
+    locations_(split.asInstanceOf[BlockRDDSplit].blockId)
+
+  override val dependencies: List[Dependency[_]] = Nil
+}
+
diff --git a/core/src/main/scala/spark/BoundedMemoryCache.scala b/core/src/main/scala/spark/BoundedMemoryCache.scala
index fa5dcee7bbf0c4cd66a1d2f0bd363799e4c9eaff..6fe0b94297e6aa74eb294187e542a8881fad5a39 100644
--- a/core/src/main/scala/spark/BoundedMemoryCache.scala
+++ b/core/src/main/scala/spark/BoundedMemoryCache.scala
@@ -91,7 +91,15 @@ class BoundedMemoryCache(maxBytes: Long) extends Cache with Logging {
   protected def reportEntryDropped(datasetId: Any, partition: Int, entry: Entry) {
     logInfo("Dropping key (%s, %d) of size %d to make space".format(datasetId, partition, entry.size))
     // TODO: remove BoundedMemoryCache
-    SparkEnv.get.cacheTracker.dropEntry(datasetId.asInstanceOf[(Int, Int)]._2, partition)
+
+    val (keySpaceId, innerDatasetId) = datasetId.asInstanceOf[(Any, Any)]
+    innerDatasetId match {
+      case rddId: Int =>
+        SparkEnv.get.cacheTracker.dropEntry(rddId, partition)
+      case broadcastUUID: java.util.UUID =>
+        // TODO: Maybe something should be done if the broadcasted variable falls out of cache
+      case _ =>
+    }
   }
 }
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 429e9c936fe3a941f23303809c3dde3bdea98ed5..8a79e85cf94061f000615c4cff3f503db4566584 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -94,6 +94,25 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
 
   def getStorageLevel = storageLevel
 
+  def checkpoint(level: StorageLevel = StorageLevel.DISK_AND_MEMORY_DESER): RDD[T] = {
+    if (!level.useDisk && level.replication < 2) {
+      throw new Exception("Cannot checkpoint without using disk or replication (level requested was " + level + ")")
+    }
+
+    // This is a hack. Ideally this should re-use the code used by the CacheTracker
+    // to generate the key.
+    def getSplitKey(split: Split) = "rdd:%d:%d".format(this.id, split.index)
+
+    persist(level)
+    sc.runJob(this, (iter: Iterator[T]) => {} )
+
+    val p = this.partitioner
+
+    new BlockRDD[T](sc, splits.map(getSplitKey).toArray) {
+      override val partitioner = p
+    }
+  }
+
   // Read this RDD; will read from cache if applicable, or otherwise compute
   final def iterator(split: Split): Iterator[T] = {
     if (storageLevel != StorageLevel.NONE) {
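Reviewer's aside, not part of the patch: a minimal sketch of how the new RDD.checkpoint() above is meant to be used. It assumes a local SparkContext and that StorageLevel lives in spark.storage (as the storage files touched below suggest); everything else reuses names from this diff and core RDD operations.

import spark.SparkContext
import spark.storage.StorageLevel

// Sketch only: cut an RDD's lineage by materializing it into BlockManager blocks.
object CheckpointSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "checkpoint-sketch")
    val expanded = sc.makeRDD(1 to 100, 4).flatMap(x => List(x, -x))
    // checkpoint() runs a no-op job to force every split into storage, then
    // returns a BlockRDD that reads blocks keyed "rdd:<id>:<index>" instead of
    // recomputing the flatMap lineage.
    val checkpointed = expanded.checkpoint(StorageLevel.DISK_AND_MEMORY_DESER)
    println(checkpointed.filter(_ > 0).count())
    sc.stop()
  }
}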
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 15131960d608918c844b9cc957380aec97096ec3..9faa0e62f24e67a8676fa5fa9c7a37ce087ec7ed 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -5,7 +5,7 @@ import java.nio._
 import java.nio.channels.FileChannel.MapMode
 import java.util.{HashMap => JHashMap}
 import java.util.LinkedHashMap
-import java.util.UUID
+import java.util.concurrent.ConcurrentHashMap
 import java.util.Collections
 
 import scala.actors._
@@ -74,7 +74,7 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging
   private val NUM_LOCKS = 337
   private val locker = new BlockLocker(NUM_LOCKS)
 
-  private val blockInfo = Collections.synchronizedMap(new JHashMap[String, BlockInfo])
+  private val blockInfo = new ConcurrentHashMap[String, BlockInfo]()
 
   private val memoryStore: BlockStore = new MemoryStore(this, maxMemory)
   private val diskStore: BlockStore = new DiskStore(this, System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")))
diff --git a/core/src/main/scala/spark/storage/BlockStore.scala b/core/src/main/scala/spark/storage/BlockStore.scala
index 8672a5376ebd057eff95e9e5c5748c429da39296..17f4f51aa8b8e0eba922919fb96c9d39ed47956d 100644
--- a/core/src/main/scala/spark/storage/BlockStore.scala
+++ b/core/src/main/scala/spark/storage/BlockStore.scala
@@ -1,16 +1,15 @@
 package spark.storage
 
 import spark.{Utils, Logging, Serializer, SizeEstimator}
-
 import scala.collection.mutable.ArrayBuffer
-
 import java.io.{File, RandomAccessFile}
 import java.nio.ByteBuffer
 import java.nio.channels.FileChannel.MapMode
 import java.util.{UUID, LinkedHashMap}
 import java.util.concurrent.Executors
-
+import java.util.concurrent.ConcurrentHashMap
 import it.unimi.dsi.fastutil.io._
+import java.util.concurrent.ArrayBlockingQueue
 
 /**
  * Abstract class to store blocks
@@ -41,13 +40,29 @@ abstract class BlockStore(blockManager: BlockManager) extends Logging {
 class MemoryStore(blockManager: BlockManager, maxMemory: Long)
   extends BlockStore(blockManager) {
 
-  class Entry(var value: Any, val size: Long, val deserialized: Boolean)
+  case class Entry(value: Any, size: Long, deserialized: Boolean, var dropPending: Boolean = false)
 
   private val memoryStore = new LinkedHashMap[String, Entry](32, 0.75f, true)
   private var currentMemory = 0L
 
-  private val blockDropper = Executors.newSingleThreadExecutor()
-
+  //private val blockDropper = Executors.newSingleThreadExecutor()
+  private val blocksToDrop = new ArrayBlockingQueue[String](10000, true)
+  private val blockDropper = new Thread("memory store - block dropper") {
+    override def run() {
+      try{
+        while (true) {
+          val blockId = blocksToDrop.take()
+          logDebug("Block " + blockId + " ready to be dropped")
+          blockManager.dropFromMemory(blockId)
+        }
+      } catch {
+        case ie: InterruptedException =>
+          logInfo("Shutting down block dropper")
+      }
+    }
+  }
+  blockDropper.start()
+
   def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel) {
     if (level.deserialized) {
       bytes.rewind()
@@ -124,41 +139,45 @@ class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     memoryStore.synchronized {
       memoryStore.clear()
     }
-    blockDropper.shutdown()
+    //blockDropper.shutdown()
+    blockDropper.interrupt()
     logInfo("MemoryStore cleared")
   }
 
-  private def drop(blockId: String) {
-    blockDropper.submit(new Runnable() {
-      def run() {
-        blockManager.dropFromMemory(blockId)
-      }
-    })
-  }
-
   private def ensureFreeSpace(space: Long) {
     logInfo("ensureFreeSpace(%d) called with curMem=%d, maxMem=%d".format(
       space, currentMemory, maxMemory))
 
-    val droppedBlockIds = new ArrayBuffer[String]()
-    var droppedMemory = 0L
-
-    memoryStore.synchronized {
-      val iter = memoryStore.entrySet().iterator()
-      while (maxMemory - (currentMemory - droppedMemory) < space && iter.hasNext) {
-        val pair = iter.next()
-        val blockId = pair.getKey
-        droppedBlockIds += blockId
-        droppedMemory += pair.getValue.size
-        logDebug("Decided to drop " + blockId)
+    if (maxMemory - currentMemory < space) {
+
+      val selectedBlocks = new ArrayBuffer[String]()
+      var selectedMemory = 0L
+
+      memoryStore.synchronized {
+        val iter = memoryStore.entrySet().iterator()
+        while (maxMemory - (currentMemory - selectedMemory) < space && iter.hasNext) {
+          val pair = iter.next()
+          val blockId = pair.getKey
+          val entry = pair.getValue()
+          if (!entry.dropPending) {
+            selectedBlocks += blockId
+            entry.dropPending = true
+          }
+          selectedMemory += pair.getValue.size
+          logDebug("Block " + blockId + " selected for dropping")
+        }
+      }
+
+      logDebug("" + selectedBlocks.size + " new blocks selected for dropping, " +
+        blocksToDrop.size + " blocks pending")
+      var i = 0
+      while (i < selectedBlocks.size) {
+        blocksToDrop.add(selectedBlocks(i))
+        i += 1
       }
-    }
-
-    for (blockId <- droppedBlockIds) {
-      drop(blockId)
+      selectedBlocks.clear()
     }
-    droppedBlockIds.clear()
-  }
+  }
 }
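Another reviewer's aside, not part of the patch: the rewritten MemoryStore splits eviction into a selection phase (done under the memoryStore lock, marking entries dropPending) and an asynchronous drop phase on a dedicated thread fed by a bounded queue. The standalone sketch below shows that producer/consumer shape in isolation; dropFromMemory here is a hypothetical stand-in for BlockManager.dropFromMemory.

import java.util.concurrent.ArrayBlockingQueue

// Standalone sketch of the select-then-drop pattern used by MemoryStore above.
object BlockDropperSketch {
  private val blocksToDrop = new ArrayBlockingQueue[String](10000, true)

  // Stand-in for BlockManager.dropFromMemory(blockId).
  private def dropFromMemory(blockId: String) {
    println("dropping " + blockId)
  }

  private val blockDropper = new Thread("block dropper sketch") {
    override def run() {
      try {
        while (true) {
          // take() blocks until ensureFreeSpace-style code queues another block id.
          dropFromMemory(blocksToDrop.take())
        }
      } catch {
        case ie: InterruptedException => println("dropper shut down")
      }
    }
  }

  def main(args: Array[String]) {
    blockDropper.start()
    // The selection side only enqueues ids; nothing is dropped while holding
    // the map lock, which is the point of the new design.
    Seq("rdd:0:0", "rdd:0:1").foreach(blocksToDrop.add)
    Thread.sleep(100)
    blockDropper.interrupt()
  }
}

One property of this shape worth noting in review: the queue bound (10000 in the patch) caps how many pending drops can accumulate, and ArrayBlockingQueue.add throws if the queue is full rather than blocking.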
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index 4a79c086e9bf618f6ada018ad748d9084b8b97ad..20638bba92c103e39a4bd4f224b45d81cc9ac0ed 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -59,4 +59,11 @@ class RDDSuite extends FunSuite with BeforeAndAfter {
     val result = pairs.aggregate(emptyMap)(mergeElement, mergeMaps)
     assert(result.toSet === Set(("a", 6), ("b", 2), ("c", 5)))
   }
+
+  test("checkpointing") {
+    val sc = new SparkContext("local", "test")
+    val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).flatMap(x => 1 to x).checkpoint()
+    assert(rdd.collect().toList === List(1, 1, 2, 1, 2, 3, 1, 2, 3, 4))
+    sc.stop()
+  }
 }
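A possible follow-up, not included in this patch: a second test method that could sit next to the one above in RDDSuite and pin down the shape of the checkpointed RDD as well as its contents. It uses only APIs that appear in this diff; the test name is illustrative, so treat it as a sketch.

  test("checkpointing preserves splits") {
    val sc = new SparkContext("local", "test")
    val checkpointed = sc.makeRDD(Array(1, 2, 3, 4), 2).checkpoint()
    // checkpoint() should hand back a block-backed RDD with one split per
    // original partition, and reading it should not depend on the parent lineage.
    assert(checkpointed.isInstanceOf[BlockRDD[_]])
    assert(checkpointed.splits.size === 2)
    assert(checkpointed.collect().toList === List(1, 2, 3, 4))
    sc.stop()
  }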