diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
index a5ec5e2fb182cb5b7bce8e8276ba7adefd021990..299162f12c1dfe03ee34da32dfef666196ad3666 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
@@ -36,7 +36,7 @@ class FileServerHandler extends ChannelInboundMessageHandlerAdapter<String> {
 
   @Override
   public void messageReceived(ChannelHandlerContext ctx, String blockIdString) {
-    BlockId blockId = BlockId.fromString(blockIdString);
+    BlockId blockId = BlockId.apply(blockIdString);
     String path = pResolver.getAbsolutePath(blockId.filename());
     // if getAbsolutePath returns null, close the channel
     if (path == null) {
diff --git a/core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala b/core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala
index d8cd2355c1647d89368ad86bb4956db060c52ebd..9ade37382370a927174c89419b0761233c332e88 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala
@@ -58,7 +58,7 @@ private[spark] object FileHeader {
     for (i <- 1 to idLength) {
       idBuilder += buf.readByte().asInstanceOf[Char]
     }
-    val blockId = BlockId.fromString(idBuilder.toString())
+    val blockId = BlockId(idBuilder.toString())
     new FileHeader(length, blockId)
   }
 
diff --git a/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala b/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
index de3cab4e78695f1aaaed30451647a968233ccc79..bb1c8ac20e7ace261011cc74df3ee34b18e772c1 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
@@ -100,7 +100,7 @@ private[spark] object ShuffleCopier extends Logging {
     }
     val host = args(0)
     val port = args(1).toInt
-    val blockId = BlockId.fromString(args(2))
+    val blockId = BlockId(args(2))
     val threads = if (args.length > 3) args(3).toInt else 10
 
     val copiers = Executors.newFixedThreadPool(80)
diff --git a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
index b6062bc3470d6b3b63d116bb04b07a387e1be706..611a44e5b90bdf10f102127c92308a191c6d5a62 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
@@ -55,7 +55,7 @@ private[spark] object ShuffleSender {
 
     val pResovler = new PathResolver {
       override def getAbsolutePath(blockIdString: String): String = {
-        val blockId = BlockId.fromString(blockIdString)
+        val blockId = BlockId(blockIdString)
         if (!blockId.isShuffle) {
           throw new Exception("Block " + blockId + " is not a shuffle block")
         }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
index 62b4e37db64c1302806cbbaa6f90926b1475e9b5..0fc212d8f97736c7d54d75a6ca968b2ab0c13455 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
@@ -42,28 +42,29 @@ private[spark] abstract class BlockId {
   }
 }
 
-case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
+private[spark] case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
   def filename = "rdd_" + rddId + "_" + splitIndex
 }
 
+private[spark]
 case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
   def filename = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
 }
 
-case class BroadcastBlockId(broadcastId: Long) extends BlockId {
+private[spark] case class BroadcastBlockId(broadcastId: Long) extends BlockId {
   def filename = "broadcast_" + broadcastId
 }
 
-case class TaskResultBlockId(taskId: Long) extends BlockId {
+private[spark] case class TaskResultBlockId(taskId: Long) extends BlockId {
   def filename = "taskresult_" + taskId
 }
 
-case class StreamBlockId(streamId: Int, uniqueId: Long) extends BlockId {
+private[spark] case class StreamBlockId(streamId: Int, uniqueId: Long) extends BlockId {
   def filename = "input-" + streamId + "-" + uniqueId
 }
 
 // Intended only for testing purposes
-case class TestBlockId(id: String) extends BlockId {
+private[spark] case class TestBlockId(id: String) extends BlockId {
   def filename = "test_" + id
 }
 
@@ -76,7 +77,8 @@ private[spark] object BlockId {
   val StreamInput = "input-([0-9]+)-([0-9]+)".r
   val Test = "test_(.*)".r
 
-  def fromString(id: String) = id match {
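+  /** Parses a filename String back into the matching BlockId; throws on unrecognized names. */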
+  def apply(id: String): BlockId = id match {
     case RDD(rddId, splitIndex) => RDDBlockId(rddId.toInt, splitIndex.toInt)
     case Shuffle(shuffleId, mapId, reduceId) =>
       ShuffleBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
index b0832fd28e87a5263d510723959f081a346615e9..cc0c46ec161265fe4911e9d9c11cdb223c6ff86a 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -70,7 +70,7 @@ private[storage] object BlockManagerMessages {
 
     override def readExternal(in: ObjectInput) {
       blockManagerId = BlockManagerId(in)
-      blockId = BlockId.fromString(in.readUTF())
+      blockId = BlockId(in.readUTF())
       storageLevel = StorageLevel(in)
       memSize = in.readLong()
       diskSize = in.readLong()
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockMessage.scala b/core/src/main/scala/org/apache/spark/storage/BlockMessage.scala
index 8ccda83890a235629ef8eb0ecce512691314d439..7e8ee2486e1fbaf597e06214cba3f1617a705b43 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockMessage.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockMessage.scala
@@ -74,7 +74,7 @@ private[spark] class BlockMessage() {
     for (i <- 1 to idLength) {
       idBuilder += buffer.getChar()
     }
-    id = BlockId.fromString(idBuilder.toString)
+    id = BlockId(idBuilder.toString)
     
     if (typ == BlockMessage.TYPE_PUT_BLOCK) {
 
@@ -117,7 +117,7 @@ private[spark] class BlockMessage() {
   def toBufferMessage: BufferMessage = {
     val startTime = System.currentTimeMillis
     val buffers = new ArrayBuffer[ByteBuffer]()
-    var buffer = ByteBuffer.allocate(4 + 4 + id.filename.length * 2) // TODO: Why x2?
+    var buffer = ByteBuffer.allocate(4 + 4 + id.filename.length * 2) // x2: putChar writes 2 bytes
     buffer.putInt(typ).putInt(id.filename.length)
     id.filename.foreach((x: Char) => buffer.putChar(x))
     buffer.flip()
@@ -201,8 +201,9 @@
 
   def main(args: Array[String]) {
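+    // Round-trips a PutBlock through BufferMessage serialization.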
     val B = new BlockMessage()
-    B.set(new PutBlock(
-      new TestBlockId("ABC"), ByteBuffer.allocate(10), StorageLevel.MEMORY_AND_DISK_SER_2))
+    val blockId = TestBlockId("ABC")
+    B.set(new PutBlock(blockId, ByteBuffer.allocate(10), StorageLevel.MEMORY_AND_DISK_SER_2))
     val bMsg = B.toBufferMessage
     val C = new BlockMessage()
     C.set(bMsg)
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index 9e3ee8765433365bd5c1dcbbff55eb80be22ff10..4200935d9384adb364b7217ccad9b621255931b0 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -316,7 +316,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
   private[storage] def startShuffleBlockSender(port: Int): Int = {
     val pResolver = new PathResolver {
       override def getAbsolutePath(blockIdString: String): String = {
-        val blockId = BlockId.fromString(blockIdString)
+        val blockId = BlockId(blockIdString)
         if (!blockId.isShuffle) null
         else DiskStore.this.getFile(blockId).getAbsolutePath
       }
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
index 8358596861ce1aa336d5f83edb3a7582a2384ba7..1720007e4e70b6aff4685aacc8459a007343afc1 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -25,15 +25,15 @@ private[spark]
 case class StorageStatus(blockManagerId: BlockManagerId, maxMem: Long,
   blocks: Map[BlockId, BlockStatus]) {
 
-  def memUsed() = blocks.values.map(_.memSize).reduceOption(_+_).getOrElse(0l)
+  def memUsed() = blocks.values.map(_.memSize).reduceOption(_+_).getOrElse(0L)
 
   def memUsedByRDD(rddId: Int) =
-    rddBlocks.filterKeys(_.rddId == rddId).values.map(_.memSize).reduceOption(_+_).getOrElse(0l)
+    rddBlocks.filterKeys(_.rddId == rddId).values.map(_.memSize).reduceOption(_+_).getOrElse(0L)
 
-  def diskUsed() = blocks.values.map(_.diskSize).reduceOption(_+_).getOrElse(0l)
+  def diskUsed() = blocks.values.map(_.diskSize).reduceOption(_+_).getOrElse(0L)
 
   def diskUsedByRDD(rddId: Int) =
-    rddBlocks.filterKeys(_.rddId == rddId).values.map(_.diskSize).reduceOption(_+_).getOrElse(0l)
+    rddBlocks.filterKeys(_.rddId == rddId).values.map(_.diskSize).reduceOption(_+_).getOrElse(0L)
 
   def memRemaining : Long = maxMem - memUsed()
 
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..538482f6ff8317947586a6144da8dad759c5232e
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import org.scalatest.FunSuite
+
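+/** Tests BlockId naming, equality, and round-tripping through BlockId.apply. */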
+class BlockIdSuite extends FunSuite {
+  def assertSame(id1: BlockId, id2: BlockId) {
+    assert(id1.filename === id2.filename)
+    assert(id1.toString === id2.toString)
+    assert(id1.hashCode === id2.hashCode)
+    assert(id1 === id2)
+  }
+
+  def assertDifferent(id1: BlockId, id2: BlockId) {
+    assert(id1.filename != id2.filename)
+    assert(id1.toString != id2.toString)
+    assert(id1.hashCode != id2.hashCode)
+    assert(id1 != id2)
+  }
+
+  test("basic-functions") {
+    case class MyBlockId(filename: String) extends BlockId
+
+    val id = MyBlockId("a")
+    assertSame(id, MyBlockId("a"))
+    assertDifferent(id, MyBlockId("b"))
+    assert(id.asRDDId === None)
+
+    // Deserializing an invalid block id should fail.
+    intercept[IllegalStateException] {
+      BlockId("a")
+    }
+  }
+
+  test("rdd") {
+    val id = RDDBlockId(1, 2)
+    assertSame(id, RDDBlockId(1, 2))
+    assertDifferent(id, RDDBlockId(1, 1))
+    assert(id.toString === "rdd_1_2")
+    assert(id.asRDDId.get.rddId === 1)
+    assert(id.asRDDId.get.splitIndex === 2)
+    assert(id.isRDD)
+    assertSame(id, BlockId(id.toString))
+  }
+
+  test("shuffle") {
+    val id = ShuffleBlockId(1, 2, 3)
+    assertSame(id, ShuffleBlockId(1, 2, 3))
+    assertDifferent(id, ShuffleBlockId(3, 2, 3))
+    assert(id.toString === "shuffle_1_2_3")
+    assert(id.asRDDId === None)
+    assert(id.shuffleId === 1)
+    assert(id.mapId === 2)
+    assert(id.reduceId === 3)
+    assert(id.isShuffle)
+    assertSame(id, BlockId(id.toString))
+  }
+
+  test("broadcast") {
+    val id = BroadcastBlockId(42)
+    assertSame(id, BroadcastBlockId(42))
+    assertDifferent(id, BroadcastBlockId(123))
+    assert(id.toString === "broadcast_42")
+    assert(id.asRDDId === None)
+    assert(id.broadcastId === 42)
+    assert(id.isBroadcast)
+    assertSame(id, BlockId(id.toString))
+  }
+
+  test("taskresult") {
+    val id = TaskResultBlockId(60)
+    assertSame(id, TaskResultBlockId(60))
+    assertDifferent(id, TaskResultBlockId(61))
+    assert(id.toString === "taskresult_60")
+    assert(id.asRDDId === None)
+    assert(id.taskId === 60)
+    assert(!id.isRDD)
+    assertSame(id, BlockId(id.toString))
+  }
+
+  test("stream") {
+    val id = StreamBlockId(1, 100)
+    assertSame(id, StreamBlockId(1, 100))
+    assertDifferent(id, StreamBlockId(2, 101))
+    assert(id.toString === "input-1-100")
+    assert(id.asRDDId === None)
+    assert(id.streamId === 1)
+    assert(id.uniqueId === 100)
+    assert(!id.isBroadcast)
+    assertSame(id, BlockId(id.toString))
+  }
+
+  test("test") {
+    val id = TestBlockId("abc")
+    assertSame(id, TestBlockId("abc"))
+    assertDifferent(id, TestBlockId("ab"))
+    assert(id.toString === "test_abc")
+    assert(id.asRDDId === None)
+    assert(id.id === "abc")
+    assert(!id.isShuffle)
+    assertSame(id, BlockId(id.toString))
+  }
+}