Skip to content
Snippets Groups Projects
Commit 3b348f90 authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Improve log messages from BlockManager

parent 22b6b16e
No related branches found
No related tags found
No related merge requests found
......@@ -99,7 +99,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
*/
private def initialize() {
master.mustRegisterBlockManager(
RegisterBlockManager(blockManagerId, maxMemory, maxMemory))
RegisterBlockManager(blockManagerId, maxMemory))
BlockManagerWorker.startBlockManagerWorker(this)
}
......
......@@ -20,8 +20,7 @@ sealed trait ToBlockManagerMaster
case class RegisterBlockManager(
blockManagerId: BlockManagerId,
maxMemSize: Long,
maxDiskSize: Long)
maxMemSize: Long)
extends ToBlockManagerMaster
class HeartBeat(
......@@ -85,22 +84,20 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
class BlockManagerInfo(
val blockManagerId: BlockManagerId,
timeMs: Long,
val maxMem: Long,
val maxDisk: Long) {
val maxMem: Long) {
private var lastSeenMs = timeMs
private var remainedMem = maxMem
private var remainedDisk = maxDisk
private var remainingMem = maxMem
private val blocks = new JHashMap[String, StorageLevel]
logInfo("Registering block manager (%s:%d, ram: %d, disk: %d)".format(
blockManagerId.ip, blockManagerId.port, maxMem, maxDisk))
logInfo("Registering block manager %s:%d with %s RAM".format(
blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(maxMem)))
def updateLastSeenMs() {
lastSeenMs = System.currentTimeMillis() / 1000
}
def updateBlockInfo(
blockId: String, storageLevel: StorageLevel, memSize: Long, diskSize: Long) = synchronized {
def updateBlockInfo(blockId: String, storageLevel: StorageLevel, memSize: Long, diskSize: Long)
: Unit = synchronized {
updateLastSeenMs()
......@@ -109,10 +106,7 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
val originalLevel: StorageLevel = blocks.get(blockId)
if (originalLevel.useMemory) {
remainedMem += memSize
}
if (originalLevel.useDisk) {
remainedDisk += diskSize
remainingMem += memSize
}
}
......@@ -120,26 +114,28 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
// isValid means it is either stored in-memory or on-disk.
blocks.put(blockId, storageLevel)
if (storageLevel.useMemory) {
remainedMem -= memSize
logInfo("Added %s in memory on %s:%d (size: %d, free: %d)".format(
blockId, blockManagerId.ip, blockManagerId.port, memSize, remainedMem))
remainingMem -= memSize
logInfo("Added %s in memory on %s:%d (size: %s, free: %s)".format(
blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(memSize),
Utils.memoryBytesToString(remainingMem)))
}
if (storageLevel.useDisk) {
remainedDisk -= diskSize
logInfo("Added %s on disk on %s:%d (size: %d, free: %d)".format(
blockId, blockManagerId.ip, blockManagerId.port, diskSize, remainedDisk))
logInfo("Added %s on disk on %s:%d (size: %s)".format(
blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(diskSize)))
}
} else if (blocks.containsKey(blockId)) {
// If isValid is not true, drop the block.
val originalLevel: StorageLevel = blocks.get(blockId)
blocks.remove(blockId)
if (originalLevel.useMemory) {
logInfo("Removed %s on %s:%d in memory (size: %d, free: %d)".format(
blockId, blockManagerId.ip, blockManagerId.port, memSize, remainedDisk))
remainingMem += memSize
logInfo("Removed %s on %s:%d in memory (size: %s, free: %s)".format(
blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(memSize),
Utils.memoryBytesToString(remainingMem)))
}
if (originalLevel.useDisk) {
logInfo("Removed %s on %s:%d on disk (size: %d, free: %d)".format(
blockId, blockManagerId.ip, blockManagerId.port, diskSize, remainedDisk))
logInfo("Removed %s on %s:%d on disk (size: %s)".format(
blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(diskSize)))
}
}
}
......@@ -149,15 +145,11 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
}
def getRemainedMem: Long = {
return remainedMem
}
def getRemainedDisk: Long = {
return remainedDisk
return remainingMem
}
override def toString: String = {
return "BlockManagerInfo " + timeMs + " " + remainedMem + " " + remainedDisk
return "BlockManagerInfo " + timeMs + " " + remainingMem
}
def clear() {
......@@ -181,8 +173,8 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
}
def receive = {
case RegisterBlockManager(blockManagerId, maxMemSize, maxDiskSize) =>
register(blockManagerId, maxMemSize, maxDiskSize)
case RegisterBlockManager(blockManagerId, maxMemSize) =>
register(blockManagerId, maxMemSize)
case HeartBeat(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
heartBeat(blockManagerId, blockId, storageLevel, deserializedSize, size)
......@@ -210,7 +202,7 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
logInfo("Got unknown message: " + other)
}
private def register(blockManagerId: BlockManagerId, maxMemSize: Long, maxDiskSize: Long) {
private def register(blockManagerId: BlockManagerId, maxMemSize: Long) {
val startTimeMs = System.currentTimeMillis()
val tmp = " " + blockManagerId + " "
logDebug("Got in register 0" + tmp + Utils.getUsedTimeMs(startTimeMs))
......@@ -218,7 +210,7 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
logInfo("Got Register Msg from master node, don't register it")
} else {
blockManagerInfo += (blockManagerId -> new BlockManagerInfo(
blockManagerId, System.currentTimeMillis() / 1000, maxMemSize, maxDiskSize))
blockManagerId, System.currentTimeMillis() / 1000, maxMemSize))
}
logDebug("Got in register 1" + tmp + Utils.getUsedTimeMs(startTimeMs))
sender ! true
......
......@@ -39,8 +39,8 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
}
channel.close()
val finishTime = System.currentTimeMillis
logDebug("Block %s stored to file of %d bytes to disk in %d ms".format(
blockId, bytes.limit, (finishTime - startTime)))
logDebug("Block %s stored as %s file on disk in %d ms".format(
blockId, Utils.memoryBytesToString(bytes.limit), (finishTime - startTime)))
}
override def putValues(
......@@ -51,12 +51,16 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
: Either[Iterator[Any], ByteBuffer] = {
logDebug("Attempting to write values for block " + blockId)
val startTime = System.currentTimeMillis
val file = createFile(blockId)
val fileOut = blockManager.wrapForCompression(
new FastBufferedOutputStream(new FileOutputStream(file)))
val objOut = blockManager.serializer.newInstance().serializeStream(fileOut)
objOut.writeAll(values)
objOut.close()
val finishTime = System.currentTimeMillis
logDebug("Block %s stored as %s file on disk in %d ms".format(
blockId, Utils.memoryBytesToString(file.length()), (finishTime - startTime)))
if (returnValues) {
// Return a byte buffer for the contents of the file
......
......@@ -56,15 +56,15 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
val entry = new Entry(elements, sizeEstimate, true)
entries.synchronized { entries.put(blockId, entry) }
currentMemory += sizeEstimate
logInfo("Block %s stored as values to memory (estimated size %d, free %d)".format(
blockId, sizeEstimate, freeMemory))
logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format(
blockId, Utils.memoryBytesToString(sizeEstimate), Utils.memoryBytesToString(freeMemory)))
} else {
val entry = new Entry(bytes, bytes.limit, false)
ensureFreeSpace(bytes.limit)
entries.synchronized { entries.put(blockId, entry) }
currentMemory += bytes.limit
logInfo("Block %s stored as %d bytes to memory (free %d)".format(
blockId, bytes.limit, freeMemory))
logInfo("Block %s stored as serialized bytes to memory (size %s, free %s)".format(
blockId, Utils.memoryBytesToString(bytes.limit), Utils.memoryBytesToString(freeMemory)))
}
}
......@@ -83,8 +83,8 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
val entry = new Entry(elements, sizeEstimate, true)
entries.synchronized { entries.put(blockId, entry) }
currentMemory += sizeEstimate
logInfo("Block %s stored as values to memory (estimated size %d, free %d)".format(
blockId, sizeEstimate, freeMemory))
logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format(
blockId, Utils.memoryBytesToString(sizeEstimate), Utils.memoryBytesToString(freeMemory)))
Left(elements.iterator)
} else {
val bytes = blockManager.dataSerialize(values)
......@@ -92,8 +92,8 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
val entry = new Entry(bytes, bytes.limit, false)
entries.synchronized { entries.put(blockId, entry) }
currentMemory += bytes.limit
logInfo("Block %s stored as %d bytes to memory (free %d)".format(
blockId, bytes.limit, freeMemory))
logInfo("Block %s stored as serialized bytes to memory (size %s, free %s)".format(
blockId, Utils.memoryBytesToString(bytes.limit), Utils.memoryBytesToString(freeMemory)))
Right(bytes)
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment