Commit 93c90844 authored by Aaron Davidson

Never store shuffle blocks in BlockManager

After the BlockId refactor (PR #114), it became very clear that ShuffleBlocks are of no use
within BlockManager (they had a no-arg constructor!). This patch completely eliminates
them, saving us around 100-150 bytes per shuffle block.
The total, system-wide overhead per shuffle block is now a flat 8 bytes, excluding
state saved by the MapOutputTracker.
parent 7a26104a
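For a rough sense of scale behind the "100-150 bytes per shuffle block" claim, here is an illustrative back-of-the-envelope sketch. All of the byte counts and block counts below are assumptions about a typical 64-bit JVM and a large job, not measurements from this patch.

object ShuffleBlockOverheadEstimate {
  def main(args: Array[String]): Unit = {
    // Assumed per-block bookkeeping previously held by the BlockManager:
    val mapEntryBytes  = 48L  // assumed hash map entry + key reference
    val blockInfoBytes = 40L  // assumed BlockInfo header + level/tellMaster/size fields
    val blockIdBytes   = 40L  // assumed ShuffleBlockId instance
    val perBlock = mapEntryBytes + blockInfoBytes + blockIdBytes

    // e.g. 1000 map tasks x 1000 reduce partitions on one node
    val blocksPerNode = 1000L * 1000L
    println(s"approx. bytes per tracked shuffle block: $perBlock")
    println(s"approx. memory no longer held by BlockManager: ${perBlock * blocksPerNode / (1L << 20)} MB")
  }
}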
BlockInfo.scala
@@ -19,9 +19,7 @@ package org.apache.spark.storage
 
 import java.util.concurrent.ConcurrentHashMap
 
-private[storage] trait BlockInfo {
-  def level: StorageLevel
-  def tellMaster: Boolean
+private[storage] class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) {
   // To save space, 'pending' and 'failed' are encoded as special sizes:
   @volatile var size: Long = BlockInfo.BLOCK_PENDING
   private def pending: Boolean = size == BlockInfo.BLOCK_PENDING
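The "special sizes" comment in the hunk above is worth unpacking. Below is a minimal, self-contained sketch of that idea, not the real Spark class: one volatile Long carries both the block's size and its pending/failed state, so no extra status field is needed. The StorageLevel parameter is omitted to keep the sketch standalone.

object BlockInfoSketch {
  val BLOCK_PENDING: Long = -1L
  val BLOCK_FAILED: Long = -2L
}

class BlockInfoSketch(val tellMaster: Boolean) {
  import BlockInfoSketch._

  @volatile var size: Long = BLOCK_PENDING

  // Blocks readers until the writer marks the block ready or failed;
  // returns true if the block became available.
  def waitForReady(): Boolean = synchronized {
    while (size == BLOCK_PENDING) wait()
    size != BLOCK_FAILED
  }

  def markReady(sizeInBytes: Long): Unit = synchronized {
    size = sizeInBytes
    notifyAll()
  }

  def markFailure(): Unit = synchronized {
    size = BLOCK_FAILED
    notifyAll()
  }
}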
@@ -81,17 +79,3 @@ private object BlockInfo {
   private val BLOCK_PENDING: Long = -1L
   private val BLOCK_FAILED: Long = -2L
 }
-
-// All shuffle blocks have the same `level` and `tellMaster` properties,
-// so we can save space by not storing them in each instance:
-private[storage] class ShuffleBlockInfo extends BlockInfo {
-  // These need to be defined using 'def' instead of 'val' in order for
-  // the compiler to eliminate the fields:
-  def level: StorageLevel = StorageLevel.DISK_ONLY
-  def tellMaster: Boolean = false
-}
-
-private[storage] class BlockInfoImpl(val level: StorageLevel, val tellMaster: Boolean)
-  extends BlockInfo {
-  // Intentionally left blank
-}
\ No newline at end of file
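An aside on the `def` vs `val` comment in the removed ShuffleBlockInfo (this example is only an illustration, not part of the patch): a `val` member gets a per-instance backing field on the JVM, while a constant `def` compiles to a plain method with no field, which is why the old class used `def` to keep instances small.

class WithVal { val flag: Boolean = false }
class WithDef { def flag: Boolean = false }

object FieldCountCheck {
  def main(args: Array[String]): Unit = {
    println(classOf[WithVal].getDeclaredFields.length)  // 1 -- the backing field
    println(classOf[WithDef].getDeclaredFields.length)  // 0 -- just a method
  }
}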
BlockManager.scala
@@ -465,13 +465,7 @@ private[spark] class BlockManager(
   def getDiskWriter(blockId: BlockId, file: File, serializer: Serializer, bufferSize: Int)
     : BlockObjectWriter = {
     val compressStream: OutputStream => OutputStream = wrapForCompression(blockId, _)
-    val writer = new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream)
-    writer.registerCloseEventHandler(() => {
-      val myInfo = new ShuffleBlockInfo()
-      blockInfo.put(blockId, myInfo)
-      myInfo.markReady(writer.fileSegment().length)
-    })
-    writer
+    new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream)
   }
 
   /**
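The hunk above removes the close-event handler that used to register a ShuffleBlockInfo with the BlockManager. Below is a self-contained sketch of the resulting ownership: all names are invented (none of this is Spark API); it only illustrates that the code writing shuffle output now keeps track of the resulting file segments itself, rather than putting anything into the blockInfo map when a writer closes.

import java.io.File

case class FileSegmentSketch(file: File, offset: Long, length: Long)

class DiskWriterSketch(file: File) {
  private var written = 0L
  // Counts bytes instead of really writing; enough for the sketch.
  def write(bytes: Array[Byte]): Unit = { written += bytes.length }
  def close(): FileSegmentSketch = FileSegmentSketch(file, 0L, written)
}

object ShuffleWriteSketch {
  def main(args: Array[String]): Unit = {
    val segments = (0 until 4).map { reduceId =>
      val writer = new DiskWriterSketch(new File(s"shuffle_0_0_$reduceId"))
      writer.write("some map output".getBytes("UTF-8"))
      writer.close()  // the caller records the segment; nothing is registered with a BlockManager
    }
    segments.foreach(println)
  }
}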
@@ -501,7 +495,7 @@ private[spark] class BlockManager(
     // to be dropped right after it got put into memory. Note, however, that other threads will
     // not be able to get() this block until we call markReady on its BlockInfo.
     val myInfo = {
-      val tinfo = new BlockInfoImpl(level, tellMaster)
+      val tinfo = new BlockInfo(level, tellMaster)
       // Do atomically !
       val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
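The "Do atomically !" idiom in the hunk above relies on putIfAbsent so that exactly one thread wins the right to produce a block. A minimal, simplified sketch of that pattern (not the actual BlockManager logic, which uses its own map type):

import java.util.concurrent.ConcurrentHashMap

object PutIfAbsentSketch {
  final class Info(val level: String, val tellMaster: Boolean)

  def main(args: Array[String]): Unit = {
    val blockInfo = new ConcurrentHashMap[String, Info]()
    val fresh = new Info("MEMORY_AND_DISK", tellMaster = true)
    Option(blockInfo.putIfAbsent("rdd_0_0", fresh)) match {
      case None           => println("registered the block; this thread will write it")
      case Some(existing) => println(s"already registered at level ${existing.level}; wait on that info instead")
    }
  }
}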
BlockObjectWriter.scala
@@ -34,20 +34,12 @@ import org.apache.spark.serializer.{SerializationStream, Serializer}
  */
 abstract class BlockObjectWriter(val blockId: BlockId) {
 
-  var closeEventHandler: () => Unit = _
-
   def open(): BlockObjectWriter
 
-  def close() {
-    closeEventHandler()
-  }
+  def close()
 
   def isOpen: Boolean
 
-  def registerCloseEventHandler(handler: () => Unit) {
-    closeEventHandler = handler
-  }
-
   /**
    * Flush the partial writes and commit them as a single atomic block. Return the
    * number of bytes written for this commit.
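With the callback machinery removed, close() becomes an abstract member and each concrete writer owns its own cleanup. A small sketch of what that contract looks like for a subclass (names invented; not the Spark classes):

abstract class WriterSketch(val blockId: String) {
  def open(): WriterSketch
  def close(): Unit
  def isOpen: Boolean
}

class NoopWriterSketch(blockId: String) extends WriterSketch(blockId) {
  private var opened = false
  def open(): WriterSketch = { opened = true; this }
  def close(): Unit = { opened = false }  // no super callback to invoke
  def isOpen: Boolean = opened
}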
@@ -146,8 +138,6 @@ class DiskBlockObjectWriter(
       ts = null
       objOut = null
     }
-    // Invoke the close callback handler.
-    super.close()
   }
 
   override def isOpen: Boolean = objOut != null