diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
index a15800917c6f4631ae341ae5e07c957e46d11c6c..6c139f32da31dcd3fe9536ac1d2215d24cd58061 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
@@ -116,7 +116,17 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
             logWarning("Some blocks have Write Ahead Log information; this is unexpected")
           }
         }
-        new BlockRDD[T](ssc.sc, blockIds)
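+        // Keep only the block IDs for blocks that the BlockManager master still knows
+        // about; the rest may have been lost, e.g., when the executors holding them failed.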
+        val validBlockIds = blockIds.filter { id =>
+          ssc.sparkContext.env.blockManager.master.contains(id)
+        }
+        if (validBlockIds.size != blockIds.size) {
+          logWarning("Some blocks could not be recovered as they were not found in memory. " +
+            "To prevent such data loss, enabled Write Ahead Log (see programming guide " +
+            "for more details.")
+        }
+        new BlockRDD[T](ssc.sc, validBlockIds)
       }
     } else {
       // If no block is ready now, creating WriteAheadLogBackedBlockRDD or BlockRDD
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
index 620b8a36a2baf2ae37da782261936e44b9a11171..e081ffe46f50206c2c7e477a32d0a13fe74f6903 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
@@ -75,7 +75,8 @@
 class WriteAheadLogBackedBlockRDD[T: ClassTag](
     @transient sc: SparkContext,
     @transient blockIds: Array[BlockId],
-    @transient walRecordHandles: Array[WriteAheadLogRecordHandle],
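+    // Exposed as a val so that tests can verify the record handles used for recovery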
+    @transient val walRecordHandles: Array[WriteAheadLogRecordHandle],
     @transient isBlockIdValid: Array[Boolean] = Array.empty,
     storeInBlockManager: Boolean = false,
     storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverInputDStreamSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverInputDStreamSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..6d388d9624d924a0148335489b5d0205ddc0a627
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverInputDStreamSuite.scala
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import scala.util.Random
+
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.rdd.BlockRDD
+import org.apache.spark.storage.{StorageLevel, StreamBlockId}
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD
+import org.apache.spark.streaming.receiver.{BlockManagerBasedStoreResult, Receiver, WriteAheadLogBasedStoreResult}
+import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
+import org.apache.spark.streaming.util.{WriteAheadLogRecordHandle, WriteAheadLogUtils}
+import org.apache.spark.{SparkConf, SparkEnv}
+
+class ReceiverInputDStreamSuite extends TestSuiteBase with BeforeAndAfterAll {
+
+  override def afterAll(): Unit = {
+    StreamingContext.getActive().foreach { _.stop() }
+  }
+
+  testWithoutWAL("createBlockRDD creates empty BlockRDD when no block info") { receiverStream =>
+    val rdd = receiverStream.createBlockRDD(Time(0), Seq.empty)
+    assert(rdd.isInstanceOf[BlockRDD[_]])
+    assert(!rdd.isInstanceOf[WriteAheadLogBackedBlockRDD[_]])
+    assert(rdd.isEmpty())
+  }
+
+  testWithoutWAL("createBlockRDD creates correct BlockRDD with block info") { receiverStream =>
+    val blockInfos = Seq.fill(5) { createBlockInfo(withWALInfo = false) }
+    val blockIds = blockInfos.map(_.blockId)
+
+    // Verify that all the blocks are present in the block manager before creating the RDD
+    require(blockIds.forall(blockId => SparkEnv.get.blockManager.master.contains(blockId)))
+
+    val rdd = receiverStream.createBlockRDD(Time(0), blockInfos)
+    assert(rdd.isInstanceOf[BlockRDD[_]])
+    assert(!rdd.isInstanceOf[WriteAheadLogBackedBlockRDD[_]])
+    val blockRDD = rdd.asInstanceOf[BlockRDD[_]]
+    assert(blockRDD.blockIds.toSeq === blockIds)
+  }
+
+  testWithoutWAL("createBlockRDD filters non-existent blocks before creating BlockRDD") {
+    receiverStream =>
+      val presentBlockInfos = Seq.fill(2)(createBlockInfo(withWALInfo = false, createBlock = true))
+      val absentBlockInfos = Seq.fill(3)(createBlockInfo(withWALInfo = false, createBlock = false))
+      val blockInfos = presentBlockInfos ++ absentBlockInfos
+      val blockIds = blockInfos.map(_.blockId)
+
+      // Verify that there are some blocks that are present, and some that are not
+      require(blockIds.exists(blockId => SparkEnv.get.blockManager.master.contains(blockId)))
+      require(blockIds.exists(blockId => !SparkEnv.get.blockManager.master.contains(blockId)))
+
+      val rdd = receiverStream.createBlockRDD(Time(0), blockInfos)
+      assert(rdd.isInstanceOf[BlockRDD[_]])
+      val blockRDD = rdd.asInstanceOf[BlockRDD[_]]
+      assert(blockRDD.blockIds.toSeq === presentBlockInfos.map { _.blockId })
+  }
+
+  testWithWAL("createBlockRDD creates empty WALBackedBlockRDD when no block info") {
+    receiverStream =>
+      val rdd = receiverStream.createBlockRDD(Time(0), Seq.empty)
+      assert(rdd.isInstanceOf[WriteAheadLogBackedBlockRDD[_]])
+      assert(rdd.isEmpty())
+  }
+
+  testWithWAL(
+    "createBlockRDD creates correct WALBackedBlockRDD with all block info having WAL info") {
+    receiverStream =>
+      val blockInfos = Seq.fill(5) { createBlockInfo(withWALInfo = true) }
+      val blockIds = blockInfos.map(_.blockId)
+      val rdd = receiverStream.createBlockRDD(Time(0), blockInfos)
+      assert(rdd.isInstanceOf[WriteAheadLogBackedBlockRDD[_]])
+      val blockRDD = rdd.asInstanceOf[WriteAheadLogBackedBlockRDD[_]]
+      assert(blockRDD.blockIds.toSeq === blockIds)
+      assert(blockRDD.walRecordHandles.toSeq === blockInfos.map { _.walRecordHandleOption.get })
+  }
+
+  testWithWAL("createBlockRDD creates BlockRDD when some block info dont have WAL info") {
+    receiverStream =>
+      val blockInfos1 = Seq.fill(2) { createBlockInfo(withWALInfo = true) }
+      val blockInfos2 = Seq.fill(3) { createBlockInfo(withWALInfo = false) }
+      val blockInfos = blockInfos1 ++ blockInfos2
+      val blockIds = blockInfos.map(_.blockId)
+      val rdd = receiverStream.createBlockRDD(Time(0), blockInfos)
+      assert(rdd.isInstanceOf[BlockRDD[_]])
+      val blockRDD = rdd.asInstanceOf[BlockRDD[_]]
+      assert(blockRDD.blockIds.toSeq === blockIds)
+  }
+
+  private def testWithoutWAL(msg: String)(body: ReceiverInputDStream[_] => Unit): Unit = {
+    test(s"Without WAL enabled: $msg") {
+      runTest(enableWAL = false, body)
+    }
+  }
+
+  private def testWithWAL(msg: String)(body: ReceiverInputDStream[_] => Unit): Unit = {
+    test(s"With WAL enabled: $msg") {
+      runTest(enableWAL = true, body)
+    }
+  }
+
+  private def runTest(enableWAL: Boolean, body: ReceiverInputDStream[_] => Unit): Unit = {
+    val conf = new SparkConf()
+    conf.setMaster("local[4]").setAppName("ReceiverInputDStreamSuite")
+    conf.set(WriteAheadLogUtils.RECEIVER_WAL_ENABLE_CONF_KEY, enableWAL.toString)
+    require(WriteAheadLogUtils.enableReceiverLog(conf) === enableWAL)
+    val ssc = new StreamingContext(conf, Seconds(1))
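+    // Dummy ReceiverInputDStream: getReceiver() is never invoked because these tests
+    // call createBlockRDD directly without starting the receiver.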
+    val receiverStream = new ReceiverInputDStream[Int](ssc) {
+      override def getReceiver(): Receiver[Int] = null
+    }
+    withStreamingContext(ssc) { ssc =>
+      body(receiverStream)
+    }
+  }
+
+  /**
+   * Create block info for input to ReceiverInputDStream.createBlockRDD.
+   * @param withWALInfo Create block with WAL info in it
+   * @param createBlock Actually create the block in the BlockManager
+   * @return The ReceivedBlockInfo for the created block
+   */
+  private def createBlockInfo(
+      withWALInfo: Boolean,
+      createBlock: Boolean = true): ReceivedBlockInfo = {
+    val blockId = new StreamBlockId(0, Random.nextLong())
+    if (createBlock) {
+      SparkEnv.get.blockManager.putSingle(blockId, 1, StorageLevel.MEMORY_ONLY, tellMaster = true)
+      require(SparkEnv.get.blockManager.master.contains(blockId))
+    }
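+    // Mimic the two kinds of store results a receiver produces: one carrying a WAL
+    // record handle (WAL enabled), and one backed only by the BlockManager.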
+    val storeResult = if (withWALInfo) {
+      new WriteAheadLogBasedStoreResult(blockId, None, new WriteAheadLogRecordHandle { })
+    } else {
+      new BlockManagerBasedStoreResult(blockId, None)
+    }
+    new ReceivedBlockInfo(0, None, None, storeResult)
+  }
+}