Commit 81065321 authored by Reynold Xin

Merge pull request #139 from aarondav/shuffle-next

Never store shuffle blocks in BlockManager

After the BlockId refactor (PR #114), it became very clear that ShuffleBlocks are of no use
within BlockManager (they had a no-arg constructor!). This patch completely eliminates
them, saving us around 100-150 bytes per shuffle block.
The total, system-wide overhead per shuffle block is now a flat 8 bytes, excluding
state saved by the MapOutputTracker.

Note: This should *not* be merged directly into 0.8.0 -- see #138
parents 0b26a392 93c90844
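For orientation, here is the shape of the change, condensed from the diff below: the BlockInfo trait and its two implementations collapse into a single concrete class, since shuffle blocks no longer pass through BlockManager at all.

    // Before: a trait plus two implementations, one of which (ShuffleBlockInfo)
    // existed only to represent shuffle blocks without per-instance fields.
    private[storage] trait BlockInfo {
      def level: StorageLevel
      def tellMaster: Boolean
    }
    private[storage] class ShuffleBlockInfo extends BlockInfo {
      def level: StorageLevel = StorageLevel.DISK_ONLY
      def tellMaster: Boolean = false
    }
    private[storage] class BlockInfoImpl(val level: StorageLevel, val tellMaster: Boolean)
      extends BlockInfo

    // After: shuffle blocks never reach BlockManager, so one class suffices.
    private[storage] class BlockInfo(val level: StorageLevel, val tellMaster: Boolean)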
@@ -19,9 +19,7 @@ package org.apache.spark.storage
 
 import java.util.concurrent.ConcurrentHashMap
 
-private[storage] trait BlockInfo {
-  def level: StorageLevel
-  def tellMaster: Boolean
+private[storage] class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) {
   // To save space, 'pending' and 'failed' are encoded as special sizes:
   @volatile var size: Long = BlockInfo.BLOCK_PENDING
   private def pending: Boolean = size == BlockInfo.BLOCK_PENDING
@@ -81,17 +79,3 @@ private object BlockInfo {
   private val BLOCK_PENDING: Long = -1L
   private val BLOCK_FAILED: Long = -2L
 }
-
-// All shuffle blocks have the same `level` and `tellMaster` properties,
-// so we can save space by not storing them in each instance:
-private[storage] class ShuffleBlockInfo extends BlockInfo {
-  // These need to be defined using 'def' instead of 'val' in order for
-  // the compiler to eliminate the fields:
-  def level: StorageLevel = StorageLevel.DISK_ONLY
-  def tellMaster: Boolean = false
-}
-
-private[storage] class BlockInfoImpl(val level: StorageLevel, val tellMaster: Boolean)
-  extends BlockInfo {
-  // Intentionally left blank
-}
\ No newline at end of file
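The deleted comment above relies on a JVM detail worth spelling out: a `def` member compiles to a method only, while a `val` also gets a backing field, which is why the def-only ShuffleBlockInfo added no per-instance state. A minimal, self-contained sketch of that effect (Info, DefInfo, and ValInfo are hypothetical names, checkable via reflection):

    trait Info { def level: Int }
    class DefInfo extends Info { def level: Int = 1 } // method only, no field
    class ValInfo(val level: Int) extends Info        // compiler emits a backing field

    object FieldCheck extends App {
      println(classOf[DefInfo].getDeclaredFields.length) // prints 0
      println(classOf[ValInfo].getDeclaredFields.length) // prints 1
    }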
@@ -465,13 +465,7 @@ private[spark] class BlockManager(
   def getDiskWriter(blockId: BlockId, file: File, serializer: Serializer, bufferSize: Int)
     : BlockObjectWriter = {
     val compressStream: OutputStream => OutputStream = wrapForCompression(blockId, _)
-    val writer = new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream)
-    writer.registerCloseEventHandler(() => {
-      val myInfo = new ShuffleBlockInfo()
-      blockInfo.put(blockId, myInfo)
-      myInfo.markReady(writer.fileSegment().length)
-    })
-    writer
+    new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream)
   }
 
   /**
@@ -501,7 +495,7 @@ private[spark] class BlockManager(
     // to be dropped right after it got put into memory. Note, however, that other threads will
     // not be able to get() this block until we call markReady on its BlockInfo.
     val myInfo = {
-      val tinfo = new BlockInfoImpl(level, tellMaster)
+      val tinfo = new BlockInfo(level, tellMaster)
       // Do atomically !
       val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
...
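With the close-event handler gone, getDiskWriter no longer records anything in blockInfo; whatever code drives the writer reads the segment length off the writer itself once the data is committed. A hedged sketch of that calling pattern, assuming the getDiskWriter signature above and the fileSegment() accessor visible in the deleted callback (writeShuffleFile and its parameters are hypothetical, not the actual shuffle-write code):

    import java.io.File
    import org.apache.spark.serializer.Serializer
    import org.apache.spark.storage.{BlockId, BlockManager}

    // Hypothetical helper: write one shuffle partition and return its
    // on-disk size for the map-output bookkeeping; BlockManager itself
    // no longer tracks this block.
    def writeShuffleFile(
        blockManager: BlockManager,
        blockId: BlockId,
        file: File,
        serializer: Serializer,
        elements: Iterator[Any]): Long = {
      val writer = blockManager.getDiskWriter(blockId, file, serializer, bufferSize = 32 * 1024)
      writer.open()
      elements.foreach(writer.write)
      writer.commit()
      writer.close()
      writer.fileSegment().length // the size now comes from the writer, not blockInfo
    }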
@@ -34,20 +34,12 @@ import org.apache.spark.serializer.{SerializationStream, Serializer}
  */
 abstract class BlockObjectWriter(val blockId: BlockId) {
 
-  var closeEventHandler: () => Unit = _
-
   def open(): BlockObjectWriter
 
-  def close() {
-    closeEventHandler()
-  }
+  def close()
 
   def isOpen: Boolean
 
-  def registerCloseEventHandler(handler: () => Unit) {
-    closeEventHandler = handler
-  }
-
   /**
    * Flush the partial writes and commit them as a single atomic block. Return the
    * number of bytes written for this commit.
@@ -146,8 +138,6 @@ class DiskBlockObjectWriter(
       ts = null
       objOut = null
     }
-    // Invoke the close callback handler.
-    super.close()
   }
 
   override def isOpen: Boolean = objOut != null
...
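After this hunk the base class carries no callback plumbing at all: close() becomes a plain abstract member that each subclass, such as DiskBlockObjectWriter, implements directly. Condensed from the diff above (commit, revert, and write members elided):

    import org.apache.spark.storage.BlockId

    abstract class BlockObjectWriter(val blockId: BlockId) {
      def open(): BlockObjectWriter
      def close() // now abstract; no base-class handler left to invoke
      def isOpen: Boolean
    }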