diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala
index dbe0bda61589ca6a2d8e138113e48e8acfdaa392..c8f397609a0b473fc6c8557b3edf7f155509b2bb 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala
@@ -19,9 +19,7 @@ package org.apache.spark.storage
 
 import java.util.concurrent.ConcurrentHashMap
 
-private[storage] trait BlockInfo {
-  def level: StorageLevel
-  def tellMaster: Boolean
+private[storage] class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) {
   // To save space, 'pending' and 'failed' are encoded as special sizes:
   @volatile var size: Long = BlockInfo.BLOCK_PENDING
   private def pending: Boolean = size == BlockInfo.BLOCK_PENDING
@@ -81,17 +79,3 @@ private object BlockInfo {
   private val BLOCK_PENDING: Long = -1L
   private val BLOCK_FAILED: Long = -2L
 }
-
-// All shuffle blocks have the same `level` and `tellMaster` properties,
-// so we can save space by not storing them in each instance:
-private[storage] class ShuffleBlockInfo extends BlockInfo {
-  // These need to be defined using 'def' instead of 'val' in order for
-  // the compiler to eliminate the fields:
-  def level: StorageLevel = StorageLevel.DISK_ONLY
-  def tellMaster: Boolean = false
-}
-
-private[storage] class BlockInfoImpl(val level: StorageLevel, val tellMaster: Boolean)
-  extends BlockInfo {
-  // Intentionally left blank
-}
\ No newline at end of file
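
The special sizes mentioned in the comment above (`BLOCK_PENDING = -1L`, `BLOCK_FAILED = -2L`) fold block state into the single `size` field rather than spending extra booleans per instance. A minimal standalone sketch of that sentinel pattern, using hypothetical simplified names rather than the real Spark types:

```scala
// Sketch only: `Info`, `markReady`, and the demo object are hypothetical.
object SentinelDemo extends App {
  final class Info {
    // Negative sentinels encode state; real block sizes are always >= 0.
    @volatile var size: Long = Info.BLOCK_PENDING
    def pending: Boolean = size == Info.BLOCK_PENDING
    def failed: Boolean = size == Info.BLOCK_FAILED
    // Publish the final size once the block is fully written.
    def markReady(bytes: Long): Unit = { require(bytes >= 0); size = bytes }
  }
  object Info {
    val BLOCK_PENDING: Long = -1L
    val BLOCK_FAILED: Long = -2L
  }

  val info = new Info
  println(info.pending)  // true
  info.markReady(1024L)
  println(info.pending)  // false
}
```
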
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index fbedfbc4460217b6ffa103b172f88979be656d4a..a34c95b6f07b67e63fc8099ba79d695845a1b097 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -465,13 +465,7 @@ private[spark] class BlockManager(
   def getDiskWriter(blockId: BlockId, file: File, serializer: Serializer, bufferSize: Int)
     : BlockObjectWriter = {
     val compressStream: OutputStream => OutputStream = wrapForCompression(blockId, _)
-    val writer = new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream)
-    writer.registerCloseEventHandler(() => {
-      val myInfo = new ShuffleBlockInfo()
-      blockInfo.put(blockId, myInfo)
-      myInfo.markReady(writer.fileSegment().length)
-    })
-    writer
+    new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream)
   }
 
   /**
@@ -501,7 +495,7 @@ private[spark] class BlockManager(
     // to be dropped right after it got put into memory. Note, however, that other threads will
     // not be able to get() this block until we call markReady on its BlockInfo.
     val myInfo = {
-      val tinfo = new BlockInfoImpl(level, tellMaster)
+      val tinfo = new BlockInfo(level, tellMaster)
      // Do this atomically!
       val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
 
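
The `putIfAbsent` call above is what makes the "Do this atomically!" comment hold: only the first thread to register a `BlockInfo` for a given block id wins, and later threads see the existing entry instead of overwriting it. A simplified sketch of the pattern, with hypothetical names in place of the real Spark types:

```scala
import java.util.concurrent.ConcurrentHashMap

// Sketch only: `Info` and the map contents are hypothetical stand-ins.
object PutIfAbsentDemo extends App {
  final class Info(val level: String, val tellMaster: Boolean)

  val blockInfo = new ConcurrentHashMap[String, Info]()

  val fresh = new Info("MEMORY_ONLY", tellMaster = true)
  // putIfAbsent returns null when no prior mapping existed,
  // otherwise the value that was already registered.
  val prior = Option(blockInfo.putIfAbsent("rdd_0_0", fresh))

  // The winner is whichever Info actually ended up in the map.
  val winner = prior.getOrElse(fresh)
  println(s"registered level = ${winner.level}")
}
```
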
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index e49c191c70a1176e9ac0a25994343f4556bcb8ac..469e68fed74bb4effb3aa9efa11157859d047084 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -34,20 +34,12 @@ import org.apache.spark.serializer.{SerializationStream, Serializer}
  */
 abstract class BlockObjectWriter(val blockId: BlockId) {
 
-  var closeEventHandler: () => Unit = _
-
   def open(): BlockObjectWriter
 
-  def close() {
-    closeEventHandler()
-  }
+  def close()
 
   def isOpen: Boolean
 
-  def registerCloseEventHandler(handler: () => Unit) {
-    closeEventHandler = handler
-  }
-
   /**
    * Flush the partial writes and commit them as a single atomic block. Return the
    * number of bytes written for this commit.
@@ -146,8 +138,6 @@ class DiskBlockObjectWriter(
       ts = null
       objOut = null
     }
-    // Invoke the close callback handler.
-    super.close()
   }
 
   override def isOpen: Boolean = objOut != null
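
With `registerCloseEventHandler` removed, `close()` becomes abstract and each concrete writer owns its full cleanup path instead of relying on a base-class callback registered from outside. A minimal sketch of that shape, assuming hypothetical simplified names:

```scala
// Sketch only: Writer and DiskWriter are hypothetical stand-ins for the
// BlockObjectWriter / DiskBlockObjectWriter pair.
abstract class Writer {
  def close(): Unit   // subclasses perform all cleanup themselves
  def isOpen: Boolean
}

final class DiskWriter extends Writer {
  private var opened = true
  override def close(): Unit = {
    // Flush buffers and release file handles here; no external callback fires.
    opened = false
  }
  override def isOpen: Boolean = opened
}

object WriterDemo extends App {
  val w = new DiskWriter
  w.close()
  println(w.isOpen)  // false
}
```

Pushing cleanup into the subclass also eliminates the uninitialized `closeEventHandler` field, which would have thrown a NullPointerException if `close()` ran before any handler was registered.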