From 93c90844cb222f6cacfc65d9a66e11fc145d12e2 Mon Sep 17 00:00:00 2001
From: Aaron Davidson <aaron@databricks.com>
Date: Fri, 1 Nov 2013 19:25:23 -0700
Subject: [PATCH] Never store shuffle blocks in BlockManager

After the BlockId refactor (PR #114), it became very clear that
ShuffleBlocks are of no use within BlockManager (they had a no-arg
constructor!). This patch completely eliminates them, saving us around
100-150 bytes per shuffle block. The total, system-wide overhead per
shuffle block is now a flat 8 bytes, excluding state saved by the
MapOutputTracker.
---
 .../org/apache/spark/storage/BlockInfo.scala  | 18 +-----------------
 .../apache/spark/storage/BlockManager.scala   | 10 ++--------
 .../spark/storage/BlockObjectWriter.scala     | 12 +-----------
 3 files changed, 4 insertions(+), 36 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala
index dbe0bda615..c8f397609a 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala
@@ -19,9 +19,7 @@ package org.apache.spark.storage
 
 import java.util.concurrent.ConcurrentHashMap
 
-private[storage] trait BlockInfo {
-  def level: StorageLevel
-  def tellMaster: Boolean
+private[storage] class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) {
   // To save space, 'pending' and 'failed' are encoded as special sizes:
   @volatile var size: Long = BlockInfo.BLOCK_PENDING
   private def pending: Boolean = size == BlockInfo.BLOCK_PENDING
@@ -81,17 +79,3 @@ private object BlockInfo {
   private val BLOCK_PENDING: Long = -1L
   private val BLOCK_FAILED: Long = -2L
 }
-
-// All shuffle blocks have the same `level` and `tellMaster` properties,
-// so we can save space by not storing them in each instance:
-private[storage] class ShuffleBlockInfo extends BlockInfo {
-  // These need to be defined using 'def' instead of 'val' in order for
-  // the compiler to eliminate the fields:
-  def level: StorageLevel = StorageLevel.DISK_ONLY
-  def tellMaster: Boolean = false
-}
-
-private[storage] class BlockInfoImpl(val level: StorageLevel, val tellMaster: Boolean)
-  extends BlockInfo {
-  // Intentionally left blank
-}
\ No newline at end of file
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index fbedfbc446..a34c95b6f0 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -465,13 +465,7 @@ private[spark] class BlockManager(
   def getDiskWriter(blockId: BlockId, file: File, serializer: Serializer, bufferSize: Int)
     : BlockObjectWriter = {
     val compressStream: OutputStream => OutputStream = wrapForCompression(blockId, _)
-    val writer = new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream)
-    writer.registerCloseEventHandler(() => {
-      val myInfo = new ShuffleBlockInfo()
-      blockInfo.put(blockId, myInfo)
-      myInfo.markReady(writer.fileSegment().length)
-    })
-    writer
+    new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream)
   }
 
   /**
@@ -501,7 +495,7 @@ private[spark] class BlockManager(
     // to be dropped right after it got put into memory. Note, however, that other threads will
     // not be able to get() this block until we call markReady on its BlockInfo.
     val myInfo = {
-      val tinfo = new BlockInfoImpl(level, tellMaster)
+      val tinfo = new BlockInfo(level, tellMaster)
       // Do atomically !
       val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
 
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index e49c191c70..469e68fed7 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -34,20 +34,12 @@ import org.apache.spark.serializer.{SerializationStream, Serializer}
  */
 abstract class BlockObjectWriter(val blockId: BlockId) {
 
-  var closeEventHandler: () => Unit = _
-
   def open(): BlockObjectWriter
 
-  def close() {
-    closeEventHandler()
-  }
+  def close()
 
   def isOpen: Boolean
 
-  def registerCloseEventHandler(handler: () => Unit) {
-    closeEventHandler = handler
-  }
-
   /**
    * Flush the partial writes and commit them as a single atomic block. Return the
    * number of bytes written for this commit.
@@ -146,8 +138,6 @@ class DiskBlockObjectWriter(
       ts = null
       objOut = null
     }
-    // Invoke the close callback handler.
-    super.close()
   }
 
   override def isOpen: Boolean = objOut != null
-- 
GitLab
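
Illustrative sketch (not part of the patch): with the close event handler
removed, the shuffle-write path has to finalize each writer itself and keep
only the resulting segment length, a single Long per block, plausibly the
flat 8 bytes the commit message cites. A minimal sketch of that caller-side
pattern, assuming a hypothetical helper name; commit(), close(), and
fileSegment() are the BlockObjectWriter/DiskBlockObjectWriter methods
visible in the diff above:

    // Hypothetical helper (name assumed): finalize each partition's writer
    // and keep the segment length as the only per-block state, to be
    // reported to the MapOutputTracker by the surrounding task code.
    def finishShuffleWrites(writers: Array[DiskBlockObjectWriter]): Array[Long] =
      writers.map { writer =>
        writer.commit()              // flush and commit outstanding writes atomically
        writer.close()               // no close callback fires anymore
        writer.fileSegment().length  // sole surviving per-block state
      }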