diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 3985e1a3d9dfaade1f1af818c7d0746b87a3a6d4..27024ecfd9101051d99ac1c7bb354ddccddab2fd 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -321,7 +321,7 @@ object CheckpointReader extends Logging {
 
     // Try to read the checkpoint files in order
     logInfo("Checkpoint files found: " + checkpointFiles.mkString(","))
-    val compressionCodec = CompressionCodec.createCodec(conf)
+    var readError: Exception = null  // most recent read failure, chained below if all files fail
     checkpointFiles.foreach(file => {
       logInfo("Attempting to load checkpoint from file " + file)
       try {
@@ -332,13 +332,15 @@ object CheckpointReader extends Logging {
         return Some(cp)
       } catch {
         case e: Exception =>
+          readError = e
           logWarning("Error reading checkpoint from file " + file, e)
       }
     })
 
     // If none of the checkpoint files could be read, then throw an exception
     if (!ignoreReadError) {
-      throw new SparkException(s"Failed to read checkpoint from directory $checkpointPath")
+      throw new SparkException(
+        s"Failed to read checkpoint from directory $checkpointPath", readError)
     }
     None
   }
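
The Checkpoint.scala hunk applies a common pattern: remember the most recent
failure while trying candidates in order, then chain it as the cause if every
candidate fails, so the stack trace shows why the reads failed instead of a
bare "failed to read" message. A minimal, self-contained sketch of that
pattern (loadFirstReadable and the simulated failures are illustrative, not
Spark API):

    import java.io.IOException

    object LastErrorSketch {
      // Try each candidate in order, keep the most recent failure, and
      // rethrow it as the cause if nothing could be read.
      def loadFirstReadable(candidates: Seq[String]): String = {
        var lastError: Exception = null
        candidates.foreach { name =>
          try {
            if (name.endsWith(".ok")) return s"loaded $name" // simulated success
            throw new IOException(s"cannot read $name")      // simulated failure
          } catch {
            case e: Exception => lastError = e // remember it, try the next one
          }
        }
        // Chaining lastError preserves the root cause in the stack trace.
        throw new RuntimeException("no candidate could be read", lastError)
      }

      def main(args: Array[String]): Unit = {
        println(loadFirstReadable(Seq("a.bad", "b.ok"))) // prints "loaded b.ok"
      }
    }
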
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
index a2f5d82a79bd355cd3e5644a7a1d0b9659b1883a..bab78a3536b47a26e1702a9066310f88a512afa1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.streaming.dstream
 
-import java.io.{NotSerializableException, ObjectOutputStream}
+import java.io.{NotSerializableException, ObjectInputStream, ObjectOutputStream}
 
 import scala.collection.mutable.{ArrayBuffer, Queue}
 import scala.reflect.ClassTag
@@ -37,8 +37,16 @@ class QueueInputDStream[T: ClassTag](
 
   override def stop() { }
 
+  // The RDD queue lives only in the driver, so a checkpointed QueueInputDStream
+  // can never be restored; fail deserialization with a clear message.
+  private def readObject(in: ObjectInputStream): Unit = {
+    throw new NotSerializableException("queueStream doesn't support checkpointing. " +
+      "Please don't use queueStream when checkpointing is enabled.")
+  }
+
   private def writeObject(oos: ObjectOutputStream): Unit = {
-    throw new NotSerializableException("queueStream doesn't support checkpointing")
+    // Only warn when serializing, so that writing a checkpoint cannot fail a running job
+    logWarning("queueStream doesn't support checkpointing")
   }
 
   override def compute(validTime: Time): Option[RDD[T]] = {
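
This hunk leans on the Java serialization hooks: a private writeObject is
invoked by ObjectOutputStream when the object is serialized, and a private
readObject by ObjectInputStream when it is deserialized. Throwing only from
readObject means writing a checkpoint merely logs a warning, while restoring
from one fails with a clear message. A runnable sketch of that asymmetry
(class and method names here are illustrative):

    import java.io._

    class WriteWarnsReadFails extends Serializable {
      private def writeObject(oos: ObjectOutputStream): Unit = {
        // Serialization succeeds; a real class would log a warning here.
        println("warning: this object cannot be restored later")
      }
      private def readObject(in: ObjectInputStream): Unit = {
        throw new NotSerializableException("restoring WriteWarnsReadFails is unsupported")
      }
    }

    object HookSketch {
      def main(args: Array[String]): Unit = {
        val buf = new ByteArrayOutputStream()
        val out = new ObjectOutputStream(buf)
        out.writeObject(new WriteWarnsReadFails) // succeeds, like checkpoint writing
        out.close()
        val in = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
        try in.readObject() catch {
          case e: NotSerializableException =>
            println("recovery failed: " + e.getMessage) // thrown by readObject
        }
      }
    }
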
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 7423ef6bcb6ea5fcd5af944292c8e2c702ee27cd..d26894e88fc26edeb3e3b06d267b5dd00f7d5579 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -30,7 +30,7 @@ import org.scalatest.concurrent.Timeouts
 import org.scalatest.exceptions.TestFailedDueToTimeoutException
 import org.scalatest.time.SpanSugar._
 
-import org.apache.spark.{Logging, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException, SparkFunSuite}
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.metrics.source.Source
 import org.apache.spark.storage.StorageLevel
@@ -726,16 +726,26 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
   }
 
   test("queueStream doesn't support checkpointing") {
-    val checkpointDir = Utils.createTempDir()
-    ssc = new StreamingContext(master, appName, batchDuration)
-    val rdd = ssc.sparkContext.parallelize(1 to 10)
-    ssc.queueStream[Int](Queue(rdd)).print()
-    ssc.checkpoint(checkpointDir.getAbsolutePath)
-    val e = intercept[NotSerializableException] {
-      ssc.start()
+    val checkpointDirectory = Utils.createTempDir().getAbsolutePath()
+    def creatingFunction(): StreamingContext = {
+      val _ssc = new StreamingContext(conf, batchDuration)
+      val rdd = _ssc.sparkContext.parallelize(1 to 10)
+      _ssc.checkpoint(checkpointDirectory)
+      _ssc.queueStream[Int](Queue(rdd)).register()
+      _ssc
+    }
+    ssc = StreamingContext.getOrCreate(checkpointDirectory, creatingFunction _)
+    ssc.start()
+    eventually(timeout(10000 millis)) {
+      assert(Checkpoint.getCheckpointFiles(checkpointDirectory).size > 1)
+    }
+    ssc.stop()
+    val e = intercept[SparkException] {
+      ssc = StreamingContext.getOrCreate(checkpointDirectory, creatingFunction _)
     }
-    // StreamingContext.validate changes the message, so use "contains" here
-    assert(e.getMessage.contains("queueStream doesn't support checkpointing"))
+    // The error now surfaces as the cause of the SparkException thrown during recovery
+    assert(e.getCause.getMessage.contains("queueStream doesn't support checkpointing. " +
+      "Please don't use queueStream when checkpointing is enabled."))
   }
 
   def addInputStream(s: StreamingContext): DStream[Int] = {
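
End to end, the new test pins down the user-visible behavior: the first run of
an application combining queueStream with checkpointing starts normally and
only logs a warning, while a restart from the written checkpoints fails during
recovery with a SparkException whose cause carries the queueStream message. A
hedged driver-side sketch of that flow (the checkpoint path, app name, and
timeout are illustrative):

    import scala.collection.mutable.Queue
    import org.apache.spark.{SparkConf, SparkException}
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object QueueStreamRecoverySketch {
      def main(args: Array[String]): Unit = {
        val checkpointDir = "/tmp/queue-stream-sketch" // illustrative path
        def create(): StreamingContext = {
          val conf = new SparkConf().setMaster("local[2]").setAppName("sketch")
          val ssc = new StreamingContext(conf, Seconds(1))
          ssc.checkpoint(checkpointDir)
          val rdd = ssc.sparkContext.parallelize(1 to 10)
          ssc.queueStream[Int](Queue(rdd)).print()
          ssc
        }
        try {
          // First run: create() is called and queueStream only logs a warning.
          // A later run with checkpoints present fails inside getOrCreate.
          val ssc = StreamingContext.getOrCreate(checkpointDir, create _)
          ssc.start()
          ssc.awaitTerminationOrTimeout(5000)
          ssc.stop()
        } catch {
          case e: SparkException =>
            // After this patch the root cause is chained, not swallowed.
            println("recovery failed: " + e.getCause.getMessage)
        }
      }
    }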