diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 921b887a8978a21b8dbb689afa06121138fde095..0615f7b565e404fc331982ad9a6ee4fa165d6a3c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -47,7 +47,7 @@ private[spark] class SparkDeploySchedulerBackend(
     val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
       System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
       CoarseGrainedSchedulerBackend.ACTOR_NAME)
-    val args = Seq(driverUrl, "{{WORKER_URL}}", "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}")
+    val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}")
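+    // The {{...}} placeholders above are substituted by the worker when it launches the
+    // executor; their order must match the positional arguments expected by
+    // CoarseGrainedExecutorBackend.main.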
     val command = Command(
       "org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs)
     val sparkHome = sc.getSparkHome().getOrElse(null)
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
new file mode 100644
index 0000000000000000000000000000000000000000..0e5f39f7722ab4b87b058dcddf874379c8c55868
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
@@ -0,0 +1,58 @@
+package org.apache.spark.streaming.examples
+
+import java.io.File
+import java.nio.charset.Charset
+
+import com.google.common.io.Files
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
+import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.util.IntParam
+
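+/**
+ * Counts words in text received from a network socket and appends the counts for each
+ * batch to a file. On restart, it uses StreamingContext.getOrCreate to recover the
+ * StreamingContext from checkpoint data, so the word counting can resume where it left off.
+ */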
+object RecoverableNetworkWordCount {
+
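+  /**
+   * Creates a StreamingContext that counts the words received on ip:port and appends the
+   * counts for each batch to the file at outputPath.
+   */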
+  def createContext(master: String, ip: String, port: Int, outputPath: String) = {
+
+    val outputFile = new File(outputPath)
+    if (outputFile.exists()) outputFile.delete()
+
+    // Create the context with a 1 second batch size
+    println("Creating new context")
+    val ssc = new StreamingContext(master, "RecoverableNetworkWordCount", Seconds(1),
+      System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+
+    // Create a NetworkInputDStream on target ip:port and count the
+    // words in the input stream of '\n'-delimited text (e.g. generated by 'nc')
+    val lines = ssc.socketTextStream(ip, port)
+    val words = lines.flatMap(_.split(" "))
+    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
+    wordCounts.foreach((rdd: RDD[(String, Int)], time: Time) => {
+      val counts = "Counts at time " + time + " " + rdd.collect().mkString("[", ", ", "]")
+      println(counts)
+      println("Appending to " + outputFile.getAbsolutePath)
+      Files.append(counts + "\n", outputFile, Charset.defaultCharset())
+    })
+    ssc
+  }
+
+  def main(args: Array[String]) {
+    if (args.length != 5) {
+      System.err.println("You arguments were " + args.mkString("[", ", ", "]"))
+      System.err.println(
+        """
+          |Usage: RecoverableNetworkWordCount <master> <hostname> <port> <checkpoint-directory> <output-directory>
+          |
+          |In local mode, <master> should be 'local[n]' with n > 1
+          |Both <checkpoint-directory> and <output-directory> should be full paths
+        """.stripMargin
+      )
+      System.exit(1)
+    }
+    val Array(master, ip, IntParam(port), checkpointDirectory, outputPath) = args
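+    // Recover the context from any checkpoint data in checkpointDirectory; otherwise create
+    // a fresh context with createContext and checkpoint it into that directory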
+    val ssc = StreamingContext.getOrCreate(checkpointDirectory,
+      () => {
+        createContext(master, ip, port, outputPath)
+      })
+    ssc.start()
+
+  }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 7b343d2376addc7171546ebf9c6d9e428bcd9d71..139e2c08b5d881439d020d069fede8304628a8b6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.RejectedExecutionException
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.conf.Configuration
 
-import org.apache.spark.Logging
+import org.apache.spark.{SparkException, Logging}
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.util.MetadataCleaner
 
@@ -141,9 +141,15 @@ class CheckpointWriter(checkpointDir: String) extends Logging {
 private[streaming]
 object CheckpointReader extends Logging {
 
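+  /** Returns true if checkpoint graph data ("graph" or "graph.bk") is present at the given path. */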
+  def doesCheckpointExist(path: String): Boolean = {
+    val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"))
+    val fs = new Path(path).getFileSystem(new Configuration())
+    // A checkpoint exists if either the graph file or its backup is present
+    attempts.exists(p => fs.exists(p))
+  }
+
   def read(path: String): Checkpoint = {
     val fs = new Path(path).getFileSystem(new Configuration())
-    val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"), new Path(path), new Path(path + ".bk"))
+    val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"))
 
     val compressionCodec = CompressionCodec.createCodec()
 
@@ -175,7 +181,7 @@ object CheckpointReader extends Logging {
       }
 
     })
-    throw new Exception("Could not read checkpoint from path '" + path + "'")
+    throw new SparkException("Could not read checkpoint from path '" + path + "'")
   }
 }
 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 41da028a3cf9f0dbfd1dbf68cad6ec39591ed294..01b213ab42c9be184ce73d75cac01e81dfe0e5f3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -570,12 +570,28 @@ class StreamingContext private (
 }
 
 
-object StreamingContext {
+object StreamingContext extends Logging {
 
   implicit def toPairDStreamFunctions[K: ClassTag, V: ClassTag](stream: DStream[(K,V)]) = {
     new PairDStreamFunctions[K, V](stream)
   }
 
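+  /**
+   * Either recreates a StreamingContext from checkpoint data in checkpointPath, or, if no
+   * checkpoint data is present there, creates a new StreamingContext by calling creatingFunc
+   * and configures it to checkpoint into checkpointPath.
+   */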
+  def getOrCreate(
+      checkpointPath: String,
+      creatingFunc: () => StreamingContext,
+      createOnCheckpointError: Boolean = false
+    ): StreamingContext = {
+    if (CheckpointReader.doesCheckpointExist(checkpointPath)) {
+      logInfo("Creating streaming context from checkpoint file")
+      new StreamingContext(checkpointPath)
+    } else {
+      logInfo("Creating new streaming context")
+      val ssc = creatingFunc()
+      ssc.checkpoint(checkpointPath)
+      ssc
+    }
+  }
+
   protected[streaming] def createNewSparkContext(
       master: String,
       appName: String,