Skip to content
Snippets Groups Projects
Commit d0f0fc8c authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Merge pull request #302 from tdas/blockmanager-fix

Blockmanager fix
parents 6607f546 9915989b
No related branches found
No related tags found
No related merge requests found
...@@ -50,16 +50,6 @@ private[spark] ...@@ -50,16 +50,6 @@ private[spark]
case class BlockException(blockId: String, message: String, ex: Exception = null) case class BlockException(blockId: String, message: String, ex: Exception = null)
extends Exception(message) extends Exception(message)
private[spark] class BlockLocker(numLockers: Int) {
private val hashLocker = Array.fill(numLockers)(new Object())
def getLock(blockId: String): Object = {
return hashLocker(math.abs(blockId.hashCode % numLockers))
}
}
private[spark] private[spark]
class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, maxMemory: Long) class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, maxMemory: Long)
extends Logging { extends Logging {
...@@ -87,10 +77,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m ...@@ -87,10 +77,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
} }
} }
private val NUM_LOCKS = 337 private val blockInfo = new ConcurrentHashMap[String, BlockInfo](1000)
private val locker = new BlockLocker(NUM_LOCKS)
private val blockInfo = new ConcurrentHashMap[String, BlockInfo]()
private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory) private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory)
private[storage] val diskStore: BlockStore = private[storage] val diskStore: BlockStore =
...@@ -110,7 +97,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m ...@@ -110,7 +97,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
val maxBytesInFlight = val maxBytesInFlight =
System.getProperty("spark.reducer.maxMbInFlight", "48").toLong * 1024 * 1024 System.getProperty("spark.reducer.maxMbInFlight", "48").toLong * 1024 * 1024
// Whether to compress broadcast variables that are stored
val compressBroadcast = System.getProperty("spark.broadcast.compress", "true").toBoolean val compressBroadcast = System.getProperty("spark.broadcast.compress", "true").toBoolean
// Whether to compress shuffle output that are stored
val compressShuffle = System.getProperty("spark.shuffle.compress", "true").toBoolean val compressShuffle = System.getProperty("spark.shuffle.compress", "true").toBoolean
// Whether to compress RDD partitions that are stored serialized // Whether to compress RDD partitions that are stored serialized
val compressRdds = System.getProperty("spark.rdd.compress", "false").toBoolean val compressRdds = System.getProperty("spark.rdd.compress", "false").toBoolean
...@@ -150,28 +139,28 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m ...@@ -150,28 +139,28 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
* For example, a block with MEMORY_AND_DISK set might have fallen out to be only on disk. * For example, a block with MEMORY_AND_DISK set might have fallen out to be only on disk.
*/ */
def reportBlockStatus(blockId: String) { def reportBlockStatus(blockId: String) {
locker.getLock(blockId).synchronized {
val curLevel = blockInfo.get(blockId) match { val (curLevel, inMemSize, onDiskSize) = blockInfo.get(blockId) match {
case null => case null =>
StorageLevel.NONE (StorageLevel.NONE, 0L, 0L)
case info => case info =>
info.synchronized {
info.level match { info.level match {
case null => case null =>
StorageLevel.NONE (StorageLevel.NONE, 0L, 0L)
case level => case level =>
val inMem = level.useMemory && memoryStore.contains(blockId) val inMem = level.useMemory && memoryStore.contains(blockId)
val onDisk = level.useDisk && diskStore.contains(blockId) val onDisk = level.useDisk && diskStore.contains(blockId)
new StorageLevel(onDisk, inMem, level.deserialized, level.replication) (
new StorageLevel(onDisk, inMem, level.deserialized, level.replication),
if (inMem) memoryStore.getSize(blockId) else 0L,
if (onDisk) diskStore.getSize(blockId) else 0L
)
} }
} }
master.mustHeartBeat(HeartBeat(
blockManagerId,
blockId,
curLevel,
if (curLevel.useMemory) memoryStore.getSize(blockId) else 0L,
if (curLevel.useDisk) diskStore.getSize(blockId) else 0L))
logDebug("Told master about block " + blockId)
} }
master.mustHeartBeat(HeartBeat(blockManagerId, blockId, curLevel, inMemSize, onDiskSize))
logDebug("Told master about block " + blockId)
} }
/** /**
...@@ -213,9 +202,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m ...@@ -213,9 +202,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
} }
} }
locker.getLock(blockId).synchronized { val info = blockInfo.get(blockId)
val info = blockInfo.get(blockId) if (info != null) {
if (info != null) { info.synchronized {
info.waitForReady() // In case the block is still being put() by another thread info.waitForReady() // In case the block is still being put() by another thread
val level = info.level val level = info.level
logDebug("Level for block " + blockId + " is " + level) logDebug("Level for block " + blockId + " is " + level)
...@@ -273,9 +262,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m ...@@ -273,9 +262,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
} }
} }
} }
} else {
logDebug("Block " + blockId + " not registered locally")
} }
} else {
logDebug("Block " + blockId + " not registered locally")
} }
return None return None
} }
...@@ -298,9 +287,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m ...@@ -298,9 +287,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
} }
} }
locker.getLock(blockId).synchronized { val info = blockInfo.get(blockId)
val info = blockInfo.get(blockId) if (info != null) {
if (info != null) { info.synchronized {
info.waitForReady() // In case the block is still being put() by another thread info.waitForReady() // In case the block is still being put() by another thread
val level = info.level val level = info.level
logDebug("Level for block " + blockId + " is " + level) logDebug("Level for block " + blockId + " is " + level)
...@@ -338,9 +327,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m ...@@ -338,9 +327,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
throw new Exception("Block " + blockId + " not found on disk, though it should be") throw new Exception("Block " + blockId + " not found on disk, though it should be")
} }
} }
} else {
logDebug("Block " + blockId + " not registered locally")
} }
} else {
logDebug("Block " + blockId + " not registered locally")
} }
return None return None
} }
...@@ -583,7 +572,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m ...@@ -583,7 +572,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
// Size of the block in bytes (to return to caller) // Size of the block in bytes (to return to caller)
var size = 0L var size = 0L
locker.getLock(blockId).synchronized { myInfo.synchronized {
logDebug("Put for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs) logDebug("Put for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
+ " to get into synchronized block") + " to get into synchronized block")
...@@ -681,7 +670,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m ...@@ -681,7 +670,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
null null
} }
locker.getLock(blockId).synchronized { myInfo.synchronized {
logDebug("PutBytes for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs) logDebug("PutBytes for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
+ " to get into synchronized block") + " to get into synchronized block")
...@@ -779,26 +768,30 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m ...@@ -779,26 +768,30 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
*/ */
def dropFromMemory(blockId: String, data: Either[ArrayBuffer[Any], ByteBuffer]) { def dropFromMemory(blockId: String, data: Either[ArrayBuffer[Any], ByteBuffer]) {
logInfo("Dropping block " + blockId + " from memory") logInfo("Dropping block " + blockId + " from memory")
locker.getLock(blockId).synchronized { val info = blockInfo.get(blockId)
val info = blockInfo.get(blockId) if (info != null) {
val level = info.level info.synchronized {
if (level.useDisk && !diskStore.contains(blockId)) { val level = info.level
logInfo("Writing block " + blockId + " to disk") if (level.useDisk && !diskStore.contains(blockId)) {
data match { logInfo("Writing block " + blockId + " to disk")
case Left(elements) => data match {
diskStore.putValues(blockId, elements, level, false) case Left(elements) =>
case Right(bytes) => diskStore.putValues(blockId, elements, level, false)
diskStore.putBytes(blockId, bytes, level) case Right(bytes) =>
diskStore.putBytes(blockId, bytes, level)
}
}
memoryStore.remove(blockId)
if (info.tellMaster) {
reportBlockStatus(blockId)
}
if (!level.useDisk) {
// The block is completely gone from this node; forget it so we can put() it again later.
blockInfo.remove(blockId)
} }
} }
memoryStore.remove(blockId) } else {
if (info.tellMaster) { // The block has already been dropped
reportBlockStatus(blockId)
}
if (!level.useDisk) {
// The block is completely gone from this node; forget it so we can put() it again later.
blockInfo.remove(blockId)
}
} }
} }
......
...@@ -18,12 +18,16 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) ...@@ -18,12 +18,16 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
private val entries = new LinkedHashMap[String, Entry](32, 0.75f, true) private val entries = new LinkedHashMap[String, Entry](32, 0.75f, true)
private var currentMemory = 0L private var currentMemory = 0L
// Object used to ensure that only one thread is putting blocks and if necessary, dropping
// blocks from the memory store.
private val putLock = new Object()
logInfo("MemoryStore started with capacity %s.".format(Utils.memoryBytesToString(maxMemory))) logInfo("MemoryStore started with capacity %s.".format(Utils.memoryBytesToString(maxMemory)))
def freeMemory: Long = maxMemory - currentMemory def freeMemory: Long = maxMemory - currentMemory
override def getSize(blockId: String): Long = { override def getSize(blockId: String): Long = {
synchronized { entries.synchronized {
entries.get(blockId).size entries.get(blockId).size
} }
} }
...@@ -60,7 +64,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) ...@@ -60,7 +64,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
} }
override def getBytes(blockId: String): Option[ByteBuffer] = { override def getBytes(blockId: String): Option[ByteBuffer] = {
val entry = synchronized { val entry = entries.synchronized {
entries.get(blockId) entries.get(blockId)
} }
if (entry == null) { if (entry == null) {
...@@ -73,7 +77,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) ...@@ -73,7 +77,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
} }
override def getValues(blockId: String): Option[Iterator[Any]] = { override def getValues(blockId: String): Option[Iterator[Any]] = {
val entry = synchronized { val entry = entries.synchronized {
entries.get(blockId) entries.get(blockId)
} }
if (entry == null) { if (entry == null) {
...@@ -87,7 +91,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) ...@@ -87,7 +91,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
} }
override def remove(blockId: String) { override def remove(blockId: String) {
synchronized { entries.synchronized {
val entry = entries.get(blockId) val entry = entries.get(blockId)
if (entry != null) { if (entry != null) {
entries.remove(blockId) entries.remove(blockId)
...@@ -101,7 +105,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) ...@@ -101,7 +105,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
} }
override def clear() { override def clear() {
synchronized { entries.synchronized {
entries.clear() entries.clear()
} }
logInfo("MemoryStore cleared") logInfo("MemoryStore cleared")
...@@ -122,12 +126,22 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) ...@@ -122,12 +126,22 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
* Try to put in a set of values, if we can free up enough space. The value should either be * Try to put in a set of values, if we can free up enough space. The value should either be
* an ArrayBuffer if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated) * an ArrayBuffer if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated)
* size must also be passed by the caller. * size must also be passed by the caller.
*
* Locks on the object putLock to ensure that all the put requests and its associated block
* dropping is done by only on thread at a time. Otherwise while one thread is dropping
* blocks to free memory for one block, another thread may use up the freed space for
* another block.
*/ */
private def tryToPut(blockId: String, value: Any, size: Long, deserialized: Boolean): Boolean = { private def tryToPut(blockId: String, value: Any, size: Long, deserialized: Boolean): Boolean = {
synchronized { // TODO: Its possible to optimize the locking by locking entries only when selecting blocks
// to be dropped. Once the to-be-dropped blocks have been selected, and lock on entries has been
// released, it must be ensured that those to-be-dropped blocks are not double counted for
// freeing up more space for another block that needs to be put. Only then the actually dropping
// of blocks (and writing to disk if necessary) can proceed in parallel.
putLock.synchronized {
if (ensureFreeSpace(blockId, size)) { if (ensureFreeSpace(blockId, size)) {
val entry = new Entry(value, size, deserialized) val entry = new Entry(value, size, deserialized)
entries.put(blockId, entry) entries.synchronized { entries.put(blockId, entry) }
currentMemory += size currentMemory += size
if (deserialized) { if (deserialized) {
logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format( logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format(
...@@ -157,10 +171,11 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) ...@@ -157,10 +171,11 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
* block from the same RDD (which leads to a wasteful cyclic replacement pattern for RDDs that * block from the same RDD (which leads to a wasteful cyclic replacement pattern for RDDs that
* don't fit into memory that we want to avoid). * don't fit into memory that we want to avoid).
* *
* Assumes that a lock on the MemoryStore is held by the caller. (Otherwise, the freed space * Assumes that a lock is held by the caller to ensure only one thread is dropping blocks.
* might fill up before the caller puts in their new value.) * Otherwise, the freed space may fill up before the caller puts in their new value.
*/ */
private def ensureFreeSpace(blockIdToAdd: String, space: Long): Boolean = { private def ensureFreeSpace(blockIdToAdd: String, space: Long): Boolean = {
logInfo("ensureFreeSpace(%d) called with curMem=%d, maxMem=%d".format( logInfo("ensureFreeSpace(%d) called with curMem=%d, maxMem=%d".format(
space, currentMemory, maxMemory)) space, currentMemory, maxMemory))
...@@ -169,36 +184,44 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) ...@@ -169,36 +184,44 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
return false return false
} }
// TODO: This should relinquish the lock on the MemoryStore while flushing out old blocks
// in order to allow parallelism in writing to disk
if (maxMemory - currentMemory < space) { if (maxMemory - currentMemory < space) {
val rddToAdd = getRddId(blockIdToAdd) val rddToAdd = getRddId(blockIdToAdd)
val selectedBlocks = new ArrayBuffer[String]() val selectedBlocks = new ArrayBuffer[String]()
var selectedMemory = 0L var selectedMemory = 0L
val iterator = entries.entrySet().iterator() // This is synchronized to ensure that the set of entries is not changed
while (maxMemory - (currentMemory - selectedMemory) < space && iterator.hasNext) { // (because of getValue or getBytes) while traversing the iterator, as that
val pair = iterator.next() // can lead to exceptions.
val blockId = pair.getKey entries.synchronized {
if (rddToAdd != null && rddToAdd == getRddId(blockId)) { val iterator = entries.entrySet().iterator()
logInfo("Will not store " + blockIdToAdd + " as it would require dropping another " + while (maxMemory - (currentMemory - selectedMemory) < space && iterator.hasNext) {
"block from the same RDD") val pair = iterator.next()
return false val blockId = pair.getKey
if (rddToAdd != null && rddToAdd == getRddId(blockId)) {
logInfo("Will not store " + blockIdToAdd + " as it would require dropping another " +
"block from the same RDD")
return false
}
selectedBlocks += blockId
selectedMemory += pair.getValue.size
} }
selectedBlocks += blockId
selectedMemory += pair.getValue.size
} }
if (maxMemory - (currentMemory - selectedMemory) >= space) { if (maxMemory - (currentMemory - selectedMemory) >= space) {
logInfo(selectedBlocks.size + " blocks selected for dropping") logInfo(selectedBlocks.size + " blocks selected for dropping")
for (blockId <- selectedBlocks) { for (blockId <- selectedBlocks) {
val entry = entries.get(blockId) val entry = entries.synchronized { entries.get(blockId) }
val data = if (entry.deserialized) { // This should never be null as only one thread should be dropping
Left(entry.value.asInstanceOf[ArrayBuffer[Any]]) // blocks and removing entries. However the check is still here for
} else { // future safety.
Right(entry.value.asInstanceOf[ByteBuffer].duplicate()) if (entry != null) {
val data = if (entry.deserialized) {
Left(entry.value.asInstanceOf[ArrayBuffer[Any]])
} else {
Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
}
blockManager.dropFromMemory(blockId, data)
} }
blockManager.dropFromMemory(blockId, data)
} }
return true return true
} else { } else {
...@@ -209,7 +232,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) ...@@ -209,7 +232,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
} }
override def contains(blockId: String): Boolean = { override def contains(blockId: String): Boolean = {
synchronized { entries.containsKey(blockId) } entries.synchronized { entries.containsKey(blockId) }
} }
} }
package spark.storage
import akka.actor._
import spark.KryoSerializer
import java.util.concurrent.ArrayBlockingQueue
import util.Random
/**
* This class tests the BlockManager and MemoryStore for thread safety and
* deadlocks. It spawns a number of producer and consumer threads. Producer
* threads continuously pushes blocks into the BlockManager and consumer
* threads continuously retrieves the blocks form the BlockManager and tests
* whether the block is correct or not.
*/
private[spark] object ThreadingTest {
val numProducers = 5
val numBlocksPerProducer = 20000
private[spark] class ProducerThread(manager: BlockManager, id: Int) extends Thread {
val queue = new ArrayBlockingQueue[(String, Seq[Int])](100)
override def run() {
for (i <- 1 to numBlocksPerProducer) {
val blockId = "b-" + id + "-" + i
val blockSize = Random.nextInt(1000)
val block = (1 to blockSize).map(_ => Random.nextInt())
val level = randomLevel()
val startTime = System.currentTimeMillis()
manager.put(blockId, block.iterator, level, true)
println("Pushed block " + blockId + " in " + (System.currentTimeMillis - startTime) + " ms")
queue.add((blockId, block))
}
println("Producer thread " + id + " terminated")
}
def randomLevel(): StorageLevel = {
math.abs(Random.nextInt()) % 4 match {
case 0 => StorageLevel.MEMORY_ONLY
case 1 => StorageLevel.MEMORY_ONLY_SER
case 2 => StorageLevel.MEMORY_AND_DISK
case 3 => StorageLevel.MEMORY_AND_DISK_SER
}
}
}
private[spark] class ConsumerThread(
manager: BlockManager,
queue: ArrayBlockingQueue[(String, Seq[Int])]
) extends Thread {
var numBlockConsumed = 0
override def run() {
println("Consumer thread started")
while(numBlockConsumed < numBlocksPerProducer) {
val (blockId, block) = queue.take()
val startTime = System.currentTimeMillis()
manager.get(blockId) match {
case Some(retrievedBlock) =>
assert(retrievedBlock.toList.asInstanceOf[List[Int]] == block.toList, "Block " + blockId + " did not match")
println("Got block " + blockId + " in " + (System.currentTimeMillis - startTime) + " ms")
case None =>
assert(false, "Block " + blockId + " could not be retrieved")
}
numBlockConsumed += 1
}
println("Consumer thread terminated")
}
}
def main(args: Array[String]) {
System.setProperty("spark.kryoserializer.buffer.mb", "1")
val actorSystem = ActorSystem("test")
val serializer = new KryoSerializer
val blockManagerMaster = new BlockManagerMaster(actorSystem, true, true)
val blockManager = new BlockManager(blockManagerMaster, serializer, 1024 * 1024)
val producers = (1 to numProducers).map(i => new ProducerThread(blockManager, i))
val consumers = producers.map(p => new ConsumerThread(blockManager, p.queue))
producers.foreach(_.start)
consumers.foreach(_.start)
producers.foreach(_.join)
consumers.foreach(_.join)
blockManager.stop()
blockManagerMaster.stop()
actorSystem.shutdown()
actorSystem.awaitTermination()
println("Everything stopped.")
println("It will take sometime for the JVM to clean all temporary files and shutdown. Sit tight.")
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment