From 0426769f89d4017ecb61c7528eb0f66cdc9c05fc Mon Sep 17 00:00:00 2001
From: Tathagata Das <tathagata.das1565@gmail.com>
Date: Thu, 26 Jul 2012 20:53:45 -0700
Subject: [PATCH] Modified the block dropping code for better performance.

---
 .../main/scala/spark/storage/BlockStore.scala | 75 ++++++++++++-------
 .../spark/storage/BlockManagerSuite.scala     |  1 +
 2 files changed, 51 insertions(+), 25 deletions(-)

diff --git a/core/src/main/scala/spark/storage/BlockStore.scala b/core/src/main/scala/spark/storage/BlockStore.scala
index 8672a5376e..ce6faced34 100644
--- a/core/src/main/scala/spark/storage/BlockStore.scala
+++ b/core/src/main/scala/spark/storage/BlockStore.scala
@@ -1,16 +1,15 @@
 package spark.storage
 
 import spark.{Utils, Logging, Serializer, SizeEstimator}
-
 import scala.collection.mutable.ArrayBuffer
-
 import java.io.{File, RandomAccessFile}
 import java.nio.ByteBuffer
 import java.nio.channels.FileChannel.MapMode
 import java.util.{UUID, LinkedHashMap}
 import java.util.concurrent.Executors
-
+import java.util.concurrent.ConcurrentHashMap
 import it.unimi.dsi.fastutil.io._
+import java.util.concurrent.ArrayBlockingQueue
 
 /**
  * Abstract class to store blocks
@@ -41,13 +40,28 @@ abstract class BlockStore(blockManager: BlockManager) extends Logging {
 class MemoryStore(blockManager: BlockManager, maxMemory: Long)
   extends BlockStore(blockManager) {
 
-  class Entry(var value: Any, val size: Long, val deserialized: Boolean)
+  case class Entry(var value: Any, size: Long, deserialized: Boolean, dropPending: Boolean = false)
 
   private val memoryStore = new LinkedHashMap[String, Entry](32, 0.75f, true)
   private var currentMemory = 0L
 
-  private val blockDropper = Executors.newSingleThreadExecutor()
-
+  //private val blockDropper = Executors.newSingleThreadExecutor()
+  private val blocksToDrop = new ArrayBlockingQueue[String](10000, true)
+  private val blockDropper = new Thread("memory store - block dropper") {
+    override def run() {
+      try{
+        while (true) {
+          val blockId = blocksToDrop.take()
+          blockManager.dropFromMemory(blockId)
+        }
+      } catch {
+        case ie: InterruptedException =>
+          logInfo("Shutting down block dropper")
+      }
+    }
+  }
+  blockDropper.start()
+
   def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel) {
     if (level.deserialized) {
       bytes.rewind()
@@ -124,41 +138,52 @@ class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     memoryStore.synchronized {
       memoryStore.clear()
     }
-    blockDropper.shutdown()
+    //blockDropper.shutdown()
+    blockDropper.interrupt()
     logInfo("MemoryStore cleared")
   }
 
   private def drop(blockId: String) {
-    blockDropper.submit(new Runnable() {
+    /*blockDropper.submit(new Runnable() {
       def run() {
         blockManager.dropFromMemory(blockId)
       }
     })
+    */
   }
 
   private def ensureFreeSpace(space: Long) {
     logInfo("ensureFreeSpace(%d) called with curMem=%d, maxMem=%d".format(
       space, currentMemory, maxMemory))
 
-    val droppedBlockIds = new ArrayBuffer[String]()
-    var droppedMemory = 0L
-
-    memoryStore.synchronized {
-      val iter = memoryStore.entrySet().iterator()
-      while (maxMemory - (currentMemory - droppedMemory) < space && iter.hasNext) {
-        val pair = iter.next()
-        val blockId = pair.getKey
-        droppedBlockIds += blockId
-        droppedMemory += pair.getValue.size
-        logDebug("Decided to drop " + blockId)
+    if (maxMemory - currentMemory < space) {
+
+      val selectedBlocks = new ArrayBuffer[String]()
+      var selectedMemory = 0L
+
+      memoryStore.synchronized {
+        val iter = memoryStore.entrySet().iterator()
+        while (maxMemory - (currentMemory - selectedMemory) < space && iter.hasNext) {
+          val pair = iter.next()
+          val blockId = pair.getKey
+          val entry = pair.getValue()
+          if (!entry.dropPending) {
+            selectedBlocks += blockId
+          }
+          selectedMemory += pair.getValue.size
+          logDebug("Block" + blockId + " selected for dropping")
+        }
+      }
+
+      logDebug("" + selectedBlocks.size + " selected for dropping")
+      var i = 0
+      while (i < selectedBlocks.size) {
+        blocksToDrop.add(selectedBlocks(i))
+        i += 1
       }
-    }
-
-    for (blockId <- droppedBlockIds) {
-      drop(blockId)
+      selectedBlocks.clear()
     }
 
-    droppedBlockIds.clear()
-  }
+  }
 
 }
diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
index 14ff5f8e3d..64d137c6c8 100644
--- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
@@ -204,6 +204,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter{
     assert(store.get("list3").get.size === 2)
     // Now let's add in list4, which uses both disk and memory; list1 should drop out
     store.put("list4", list4.iterator, StorageLevel.DISK_AND_MEMORY)
+    Thread.sleep(100)
    assert(store.get("list1") === None, "list1 was in store")
    assert(store.get("list2") != None, "list3 was not in store")
    assert(store.get("list2").get.size === 2)
--
GitLab
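
Aside from the patch itself: the change amounts to a small producer/consumer handoff, where ensureFreeSpace only selects victim blocks and enqueues their ids, while a dedicated thread drains the queue and performs the actual drops. Below is a standalone Scala sketch of that pattern under the same assumptions the patch makes (a bounded, fair ArrayBlockingQueue and interrupt-based shutdown); the object name BlockDropperSketch and the println standing in for blockManager.dropFromMemory are illustrative only and not part of the commit.

import java.util.concurrent.ArrayBlockingQueue

object BlockDropperSketch {
  // Bounded, fair queue of block ids selected for dropping (mirrors blocksToDrop in the patch).
  private val blocksToDrop = new ArrayBlockingQueue[String](10000, true)

  // Single long-lived consumer: take() blocks until an id arrives, and interrupt() is the
  // shutdown signal, just as clear() interrupts blockDropper in the patch.
  private val blockDropper = new Thread("block dropper sketch") {
    override def run(): Unit = {
      try {
        while (true) {
          val blockId = blocksToDrop.take()
          println("dropping " + blockId) // stand-in for blockManager.dropFromMemory(blockId)
        }
      } catch {
        case _: InterruptedException => println("Shutting down block dropper")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    blockDropper.start()
    // Producer side: ensureFreeSpace would enqueue the ids of the blocks it selected.
    Seq("block-1", "block-2").foreach(id => blocksToDrop.add(id))
    Thread.sleep(100) // give the consumer time to drain, as the updated test does
    blockDropper.interrupt()
    blockDropper.join()
  }
}

The design point is that the caller of ensureFreeSpace no longer performs the drops synchronously; it only pays for queue insertion. Note that the patch enqueues with add, which throws rather than blocks if the 10000-slot queue ever fills.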