Skip to content
Snippets Groups Projects
Commit 0426769f authored by Tathagata Das's avatar Tathagata Das
Browse files

Modified the block dropping code for better performance.

parent 6ad3e1f1
No related branches found
No related tags found
No related merge requests found
package spark.storage package spark.storage
import spark.{Utils, Logging, Serializer, SizeEstimator} import spark.{Utils, Logging, Serializer, SizeEstimator}
import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.ArrayBuffer
import java.io.{File, RandomAccessFile} import java.io.{File, RandomAccessFile}
import java.nio.ByteBuffer import java.nio.ByteBuffer
import java.nio.channels.FileChannel.MapMode import java.nio.channels.FileChannel.MapMode
import java.util.{UUID, LinkedHashMap} import java.util.{UUID, LinkedHashMap}
import java.util.concurrent.Executors import java.util.concurrent.Executors
import java.util.concurrent.ConcurrentHashMap
import it.unimi.dsi.fastutil.io._ import it.unimi.dsi.fastutil.io._
import java.util.concurrent.ArrayBlockingQueue
/** /**
* Abstract class to store blocks * Abstract class to store blocks
...@@ -41,13 +40,28 @@ abstract class BlockStore(blockManager: BlockManager) extends Logging { ...@@ -41,13 +40,28 @@ abstract class BlockStore(blockManager: BlockManager) extends Logging {
class MemoryStore(blockManager: BlockManager, maxMemory: Long) class MemoryStore(blockManager: BlockManager, maxMemory: Long)
extends BlockStore(blockManager) { extends BlockStore(blockManager) {
class Entry(var value: Any, val size: Long, val deserialized: Boolean) case class Entry(var value: Any, size: Long, deserialized: Boolean, dropPending: Boolean = false)
private val memoryStore = new LinkedHashMap[String, Entry](32, 0.75f, true) private val memoryStore = new LinkedHashMap[String, Entry](32, 0.75f, true)
private var currentMemory = 0L private var currentMemory = 0L
private val blockDropper = Executors.newSingleThreadExecutor() //private val blockDropper = Executors.newSingleThreadExecutor()
private val blocksToDrop = new ArrayBlockingQueue[String](10000, true)
private val blockDropper = new Thread("memory store - block dropper") {
override def run() {
try{
while (true) {
val blockId = blocksToDrop.take()
blockManager.dropFromMemory(blockId)
}
} catch {
case ie: InterruptedException =>
logInfo("Shutting down block dropper")
}
}
}
blockDropper.start()
def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel) { def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel) {
if (level.deserialized) { if (level.deserialized) {
bytes.rewind() bytes.rewind()
...@@ -124,41 +138,52 @@ class MemoryStore(blockManager: BlockManager, maxMemory: Long) ...@@ -124,41 +138,52 @@ class MemoryStore(blockManager: BlockManager, maxMemory: Long)
memoryStore.synchronized { memoryStore.synchronized {
memoryStore.clear() memoryStore.clear()
} }
blockDropper.shutdown() //blockDropper.shutdown()
blockDropper.interrupt()
logInfo("MemoryStore cleared") logInfo("MemoryStore cleared")
} }
private def drop(blockId: String) { private def drop(blockId: String) {
blockDropper.submit(new Runnable() { /*blockDropper.submit(new Runnable() {
def run() { def run() {
blockManager.dropFromMemory(blockId) blockManager.dropFromMemory(blockId)
} }
}) })
*/
} }
private def ensureFreeSpace(space: Long) { private def ensureFreeSpace(space: Long) {
logInfo("ensureFreeSpace(%d) called with curMem=%d, maxMem=%d".format( logInfo("ensureFreeSpace(%d) called with curMem=%d, maxMem=%d".format(
space, currentMemory, maxMemory)) space, currentMemory, maxMemory))
val droppedBlockIds = new ArrayBuffer[String]() if (maxMemory - currentMemory < space) {
var droppedMemory = 0L
val selectedBlocks = new ArrayBuffer[String]()
memoryStore.synchronized { var selectedMemory = 0L
val iter = memoryStore.entrySet().iterator()
while (maxMemory - (currentMemory - droppedMemory) < space && iter.hasNext) { memoryStore.synchronized {
val pair = iter.next() val iter = memoryStore.entrySet().iterator()
val blockId = pair.getKey while (maxMemory - (currentMemory - selectedMemory) < space && iter.hasNext) {
droppedBlockIds += blockId val pair = iter.next()
droppedMemory += pair.getValue.size val blockId = pair.getKey
logDebug("Decided to drop " + blockId) val entry = pair.getValue()
if (!entry.dropPending) {
selectedBlocks += blockId
}
selectedMemory += pair.getValue.size
logDebug("Block" + blockId + " selected for dropping")
}
}
logDebug("" + selectedBlocks.size + " selected for dropping")
var i = 0
while (i < selectedBlocks.size) {
blocksToDrop.add(selectedBlocks(i))
i += 1
} }
} selectedBlocks.clear()
for (blockId <- droppedBlockIds) {
drop(blockId)
} }
droppedBlockIds.clear() }
}
} }
......
...@@ -204,6 +204,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter{ ...@@ -204,6 +204,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter{
assert(store.get("list3").get.size === 2) assert(store.get("list3").get.size === 2)
// Now let's add in list4, which uses both disk and memory; list1 should drop out // Now let's add in list4, which uses both disk and memory; list1 should drop out
store.put("list4", list4.iterator, StorageLevel.DISK_AND_MEMORY) store.put("list4", list4.iterator, StorageLevel.DISK_AND_MEMORY)
Thread.sleep(100)
assert(store.get("list1") === None, "list1 was in store") assert(store.get("list1") === None, "list1 was in store")
assert(store.get("list2") != None, "list3 was not in store") assert(store.get("list2") != None, "list3 was not in store")
assert(store.get("list2").get.size === 2) assert(store.get("list2").get.size === 2)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment