diff --git a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
index 63d1d1767a8cb248a70d7967ebcaabd2334914bf..d47b75544fdba166206474ce0030f7d008ad93c9 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
@@ -44,7 +44,7 @@ class BlockRDD[T: ClassTag](sc: SparkContext, @transient val blockIds: Array[Blo
     assertValid()
     val blockManager = SparkEnv.get.blockManager
     val blockId = split.asInstanceOf[BlockRDDPartition].blockId
-    blockManager.get(blockId) match {
+    blockManager.get[T](blockId) match {
       case Some(block) => block.data.asInstanceOf[Iterator[T]]
       case None =>
         throw new Exception("Could not compute split, block " + blockId + " not found")
diff --git a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
index 7b1ec6fcbbbf61dcbcd54e6eb135c5c53bbf2af5..2156d576f18747d50897f53b3d6382878e47a33b 100644
--- a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
@@ -180,11 +180,12 @@ private[spark] class SerializerManager(defaultSerializer: Serializer, conf: Spar
    * Deserializes an InputStream into an iterator of values and disposes of it when the end of
    * the iterator is reached.
    */
-  def dataDeserializeStream[T: ClassTag](
+  def dataDeserializeStream[T](
       blockId: BlockId,
-      inputStream: InputStream): Iterator[T] = {
+      inputStream: InputStream)
+      (classTag: ClassTag[T]): Iterator[T] = {
     val stream = new BufferedInputStream(inputStream)
-    getSerializer(implicitly[ClassTag[T]])
+    getSerializer(classTag)
       .newInstance()
       .deserializeStream(wrapStream(blockId, stream))
       .asIterator.asInstanceOf[Iterator[T]]
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index c72f28e00cdbc5bc8dfb3d5b252137186caa801d..0614646771bd09d333cb2a89c564a2c491e094c8 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -520,10 +520,11 @@ private[spark] class BlockManager(
    *
    * This does not acquire a lock on this block in this JVM.
    */
-  private def getRemoteValues(blockId: BlockId): Option[BlockResult] = {
+  private def getRemoteValues[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
+    val ct = implicitly[ClassTag[T]]
     getRemoteBytes(blockId).map { data =>
       val values =
-        serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))
+        serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))(ct)
       new BlockResult(values, DataReadMethod.Network, data.size)
     }
   }
@@ -602,13 +603,13 @@ private[spark] class BlockManager(
    * any locks if the block was fetched from a remote block manager. The read lock will
    * automatically be freed once the result's `data` iterator is fully consumed.
    */
-  def get(blockId: BlockId): Option[BlockResult] = {
+  def get[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
     val local = getLocalValues(blockId)
     if (local.isDefined) {
       logInfo(s"Found block $blockId locally")
       return local
     }
-    val remote = getRemoteValues(blockId)
+    val remote = getRemoteValues[T](blockId)
     if (remote.isDefined) {
       logInfo(s"Found block $blockId remotely")
       return remote
@@ -660,7 +661,7 @@ private[spark] class BlockManager(
       makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
     // Attempt to read the block from local or remote storage. If it's present, then we don't need
     // to go through the local-get-or-put path.
-    get(blockId) match {
+    get[T](blockId)(classTag) match {
       case Some(block) =>
         return Left(block)
       case _ =>
@@ -1204,8 +1205,8 @@ private[spark] class BlockManager(
   /**
    * Read a block consisting of a single object.
    */
-  def getSingle(blockId: BlockId): Option[Any] = {
-    get(blockId).map(_.data.next())
+  def getSingle[T: ClassTag](blockId: BlockId): Option[T] = {
+    get[T](blockId).map(_.data.next().asInstanceOf[T])
   }
 
   /**
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index 4ee0e00fde50634e7892149b48463ef382d6def2..4e36adc8baf3fcc6adc6723bf1955a035ecb7c42 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -170,10 +170,12 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex
     blockManager.master.getLocations(blockId).foreach { cmId =>
       val bytes = blockTransfer.fetchBlockSync(cmId.host, cmId.port, cmId.executorId,
         blockId.toString)
-      val deserialized = serializerManager.dataDeserializeStream[Int](blockId,
-        new ChunkedByteBuffer(bytes.nioByteBuffer()).toInputStream()).toList
+      val deserialized = serializerManager.dataDeserializeStream(blockId,
+        new ChunkedByteBuffer(bytes.nioByteBuffer()).toInputStream())(data.elementClassTag).toList
       assert(deserialized === (1 to 100).toList)
     }
+    // This will exercise the getRemoteBytes / getRemoteValues code paths:
+    assert(blockIds.flatMap(id => blockManager.get[Int](id).get.data).toSet === (1 to 1000).toSet)
   }
 
   Seq(
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
index 53fccd8d5e6eddfdb454c6ae7023bf52067e820c..0b2ec298132ad09de1eb5490ff94edc161e0c69a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
@@ -120,7 +120,7 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
     val blockId = partition.blockId
 
     def getBlockFromBlockManager(): Option[Iterator[T]] = {
-      blockManager.get(blockId).map(_.data.asInstanceOf[Iterator[T]])
+      blockManager.get[T](blockId).map(_.data.asInstanceOf[Iterator[T]])
     }
 
     def getBlockFromWriteAheadLog(): Iterator[T] = {
@@ -163,7 +163,8 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
         dataRead.rewind()
       }
       serializerManager
-        .dataDeserializeStream(blockId, new ChunkedByteBuffer(dataRead).toInputStream())
+        .dataDeserializeStream(
+          blockId, new ChunkedByteBuffer(dataRead).toInputStream())(elementClassTag)
         .asInstanceOf[Iterator[T]]
     }
 
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index feb5c30c6aa14f7fb983f143d1e8f8898c5a0c2b..7e665454a540045e54c2e29cf87b77cdd8823307 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
 import scala.language.postfixOps
+import scala.reflect.ClassTag
 
 import org.apache.hadoop.conf.Configuration
 import org.scalatest.{BeforeAndAfter, Matchers}
@@ -163,7 +164,7 @@ class ReceivedBlockHandlerSuite
           val bytes = reader.read(fileSegment)
           reader.close()
           serializerManager.dataDeserializeStream(
-            generateBlockId(), new ChunkedByteBuffer(bytes).toInputStream()).toList
+            generateBlockId(), new ChunkedByteBuffer(bytes).toInputStream())(ClassTag.Any).toList
         }
         loggedData shouldEqual data
       }