diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 407cab45ed4c6a9d61012b7bc4508874c5192a0b..1d2ecdd341813358529ab15c8c009fb343719f6e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -134,7 +134,7 @@ class StreamingContext private[streaming] (
     if (sc_ != null) {
       sc_
     } else if (isCheckpointPresent) {
-      new SparkContext(cp_.createSparkConf())
+      SparkContext.getOrCreate(cp_.createSparkConf())
     } else {
       throw new SparkException("Cannot create StreamingContext without a SparkContext")
     }
@@ -750,53 +750,6 @@ object StreamingContext extends Logging {
     checkpointOption.map(new StreamingContext(null, _, null)).getOrElse(creatingFunc())
   }
 
-  /**
-   * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
-   * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
-   * recreated from the checkpoint data. If the data does not exist, then the StreamingContext
-   * will be created by called the provided `creatingFunc` on the provided `sparkContext`. Note
-   * that the SparkConf configuration in the checkpoint data will not be restored as the
-   * SparkContext has already been created.
-   *
-   * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
-   * @param creatingFunc   Function to create a new StreamingContext using the given SparkContext
-   * @param sparkContext   SparkContext using which the StreamingContext will be created
-   */
-  def getOrCreate(
-      checkpointPath: String,
-      creatingFunc: SparkContext => StreamingContext,
-      sparkContext: SparkContext
-    ): StreamingContext = {
-    getOrCreate(checkpointPath, creatingFunc, sparkContext, createOnError = false)
-  }
-
-  /**
-   * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
-   * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
-   * recreated from the checkpoint data. If the data does not exist, then the StreamingContext
-   * will be created by called the provided `creatingFunc` on the provided `sparkContext`. Note
-   * that the SparkConf configuration in the checkpoint data will not be restored as the
-   * SparkContext has already been created.
-   *
-   * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
-   * @param creatingFunc   Function to create a new StreamingContext using the given SparkContext
-   * @param sparkContext   SparkContext using which the StreamingContext will be created
-   * @param createOnError  Whether to create a new StreamingContext if there is an
-   *                       error in reading checkpoint data. By default, an exception will be
-   *                       thrown on error.
-   */
-  def getOrCreate(
-      checkpointPath: String,
-      creatingFunc: SparkContext => StreamingContext,
-      sparkContext: SparkContext,
-      createOnError: Boolean
-    ): StreamingContext = {
-    val checkpointOption = CheckpointReader.read(
-      checkpointPath, sparkContext.conf, sparkContext.hadoopConfiguration, createOnError)
-    checkpointOption.map(new StreamingContext(sparkContext, _, null))
-                    .getOrElse(creatingFunc(sparkContext))
-  }
-
   /**
    * Find the JAR from which a given class was loaded, to make it easy for users to pass
    * their JARs to StreamingContext.
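
With the change above, a StreamingContext that is recreated from checkpoint data obtains its
SparkContext through SparkContext.getOrCreate(cp_.createSparkConf()), so a SparkContext that is
already running in the driver is reused rather than a second one being constructed. A minimal
driver-side sketch of the recovery pattern, assuming a hypothetical checkpoint directory, master,
app name, and socket source (none of these values come from the patch):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val checkpointDir = "/tmp/streaming-checkpoint"  // hypothetical path

    // Factory used only when no checkpoint has been written yet.
    def createContext(): StreamingContext = {
      val conf = new SparkConf().setMaster("local[2]").setAppName("recovery-example")
      val ssc = new StreamingContext(conf, Seconds(1))
      ssc.checkpoint(checkpointDir)
      ssc.socketTextStream("localhost", 9999).print()  // hypothetical source
      ssc
    }

    // If checkpoint data exists, the recovered context now picks up an already active
    // SparkContext via SparkContext.getOrCreate instead of creating a second one.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
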
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index d8fbed2c50644c5d3437a0083af7b02387557669..b639b94d5ca475b4204cc7be23eb8ff67e80c078 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -804,51 +804,6 @@ object JavaStreamingContext {
     new JavaStreamingContext(ssc)
   }
 
-  /**
-   * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
-   * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
-   * recreated from the checkpoint data. If the data does not exist, then the provided factory
-   * will be used to create a JavaStreamingContext.
-   *
-   * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
-   * @param creatingFunc   Function to create a new JavaStreamingContext
-   * @param sparkContext   SparkContext using which the StreamingContext will be created
-   */
-  def getOrCreate(
-      checkpointPath: String,
-      creatingFunc: JFunction[JavaSparkContext, JavaStreamingContext],
-      sparkContext: JavaSparkContext
-    ): JavaStreamingContext = {
-    val ssc = StreamingContext.getOrCreate(checkpointPath, (sparkContext: SparkContext) => {
-      creatingFunc.call(new JavaSparkContext(sparkContext)).ssc
-    }, sparkContext.sc)
-    new JavaStreamingContext(ssc)
-  }
-
-  /**
-   * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
-   * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
-   * recreated from the checkpoint data. If the data does not exist, then the provided factory
-   * will be used to create a JavaStreamingContext.
-   *
-   * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
-   * @param creatingFunc   Function to create a new JavaStreamingContext
-   * @param sparkContext   SparkContext using which the StreamingContext will be created
-   * @param createOnError  Whether to create a new JavaStreamingContext if there is an
-   *                       error in reading checkpoint data.
-   */
-  def getOrCreate(
-      checkpointPath: String,
-      creatingFunc: JFunction[JavaSparkContext, JavaStreamingContext],
-      sparkContext: JavaSparkContext,
-      createOnError: Boolean
-    ): JavaStreamingContext = {
-    val ssc = StreamingContext.getOrCreate(checkpointPath, (sparkContext: SparkContext) => {
-      creatingFunc.call(new JavaSparkContext(sparkContext)).ssc
-    }, sparkContext.sc, createOnError)
-    new JavaStreamingContext(ssc)
-  }
-
   /**
    * Find the JAR from which a given class was loaded, to make it easy for users to pass
    * their JARs to StreamingContext.
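
With the SparkContext- and JavaSparkContext-taking overloads removed on both the Scala and Java
sides, callers are left with the variants that take a checkpoint path and a zero-argument factory,
optionally followed by a Hadoop Configuration and a createOnError flag (the updated JavaAPISuite
test below uses the Configuration form). A sketch of how the corrupted-checkpoint scenario from the
deleted overloads maps onto the remaining Scala API, assuming the remaining overload keeps
createOnError as a defaulted parameter that can be passed by name; the checkpoint path is a
placeholder:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val checkpointDir = "/tmp/streaming-checkpoint"  // hypothetical path

    def createContext(): StreamingContext = {
      val conf = new SparkConf().setMaster("local[2]").setAppName("create-on-error-example")
      new StreamingContext(conf, Seconds(1))
    }

    // createOnError = true falls back to the factory when the checkpoint directory
    // exists but its data cannot be read; the default is to rethrow the error.
    val ssc = StreamingContext.getOrCreate(
      checkpointDir, createContext _, createOnError = true)
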
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 2e00b980b9e44a089723a2469130b3fd59fef5da..1077b1b2cb7e30ddfa44ec408b70eb295f874c72 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -1766,29 +1766,10 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     Assert.assertTrue("old context not recovered", !newContextCreated.get());
     ssc.stop();
 
-    // Function to create JavaStreamingContext using existing JavaSparkContext
-    // without any output operations (used to detect the new context)
-    Function<JavaSparkContext, JavaStreamingContext> creatingFunc2 =
-        new Function<JavaSparkContext, JavaStreamingContext>() {
-          public JavaStreamingContext call(JavaSparkContext context) {
-            newContextCreated.set(true);
-            return new JavaStreamingContext(context, Seconds.apply(1));
-          }
-        };
-
-    JavaSparkContext sc = new JavaSparkContext(conf);
-    newContextCreated.set(false);
-    ssc = JavaStreamingContext.getOrCreate(emptyDir.getAbsolutePath(), creatingFunc2, sc);
-    Assert.assertTrue("new context not created", newContextCreated.get());
-    ssc.stop(false);
-
     newContextCreated.set(false);
-    ssc = JavaStreamingContext.getOrCreate(corruptedCheckpointDir, creatingFunc2, sc, true);
-    Assert.assertTrue("new context not created", newContextCreated.get());
-    ssc.stop(false);
-
-    newContextCreated.set(false);
-    ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc2, sc);
+    JavaSparkContext sc = new JavaSparkContext(conf);
+    ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc,
+        new org.apache.hadoop.conf.Configuration());
     Assert.assertTrue("old context not recovered", !newContextCreated.get());
     ssc.stop();
   }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 5f93332896de1c4c51228493d493a1b86070037f..4b12affbb0ddd85a5090a1aa433c3c375fae5487 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -419,76 +419,16 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
       ssc = StreamingContext.getOrCreate(checkpointPath, creatingFunction _)
       assert(ssc != null, "no context created")
       assert(!newContextCreated, "old context not recovered")
-      assert(ssc.conf.get("someKey") === "someValue")
-    }
-  }
-
-  test("getOrCreate with existing SparkContext") {
-    val conf = new SparkConf().setMaster(master).setAppName(appName)
-    sc = new SparkContext(conf)
-
-    // Function to create StreamingContext that has a config to identify it to be new context
-    var newContextCreated = false
-    def creatingFunction(sparkContext: SparkContext): StreamingContext = {
-      newContextCreated = true
-      new StreamingContext(sparkContext, batchDuration)
-    }
-
-    // Call ssc.stop(stopSparkContext = false) after a body of cody
-    def testGetOrCreate(body: => Unit): Unit = {
-      newContextCreated = false
-      try {
-        body
-      } finally {
-        if (ssc != null) {
-          ssc.stop(stopSparkContext = false)
-        }
-        ssc = null
-      }
-    }
-
-    val emptyPath = Utils.createTempDir().getAbsolutePath()
-
-    // getOrCreate should create new context with empty path
-    testGetOrCreate {
-      ssc = StreamingContext.getOrCreate(emptyPath, creatingFunction _, sc, createOnError = true)
-      assert(ssc != null, "no context created")
-      assert(newContextCreated, "new context not created")
-      assert(ssc.sparkContext === sc, "new StreamingContext does not use existing SparkContext")
+      assert(ssc.conf.get("someKey") === "someValue", "checkpointed config not recovered")
     }
 
-    val corrutedCheckpointPath = createCorruptedCheckpoint()
-
-    // getOrCreate should throw exception with fake checkpoint file and createOnError = false
-    intercept[Exception] {
-      ssc = StreamingContext.getOrCreate(corrutedCheckpointPath, creatingFunction _, sc)
-    }
-
-    // getOrCreate should throw exception with fake checkpoint file
-    intercept[Exception] {
-      ssc = StreamingContext.getOrCreate(
-        corrutedCheckpointPath, creatingFunction _, sc, createOnError = false)
-    }
-
-    // getOrCreate should create new context with fake checkpoint file and createOnError = true
-    testGetOrCreate {
-      ssc = StreamingContext.getOrCreate(
-        corrutedCheckpointPath, creatingFunction _, sc, createOnError = true)
-      assert(ssc != null, "no context created")
-      assert(newContextCreated, "new context not created")
-      assert(ssc.sparkContext === sc, "new StreamingContext does not use existing SparkContext")
-    }
-
-    val checkpointPath = createValidCheckpoint()
-
-    // StreamingContext.getOrCreate should recover context with checkpoint path
+    // getOrCreate should recover StreamingContext with existing SparkContext
     testGetOrCreate {
-      ssc = StreamingContext.getOrCreate(checkpointPath, creatingFunction _, sc)
+      sc = new SparkContext(conf)
+      ssc = StreamingContext.getOrCreate(checkpointPath, creatingFunction _)
       assert(ssc != null, "no context created")
       assert(!newContextCreated, "old context not recovered")
-      assert(ssc.sparkContext === sc, "new StreamingContext does not use existing SparkContext")
-      assert(!ssc.conf.contains("someKey"),
-        "recovered StreamingContext unexpectedly has old config")
+      assert(!ssc.conf.contains("someKey"), "checkpointed config unexpectedly recovered")
     }
   }
 
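
The rewritten StreamingContextSuite block pins down which configuration wins on recovery: without a
live SparkContext the checkpointed SparkConf is restored (the "someKey" assertion), while with a
pre-created SparkContext the recovered StreamingContext runs on that context, and the checkpointed
conf is not restored. A condensed sketch of that second scenario outside the test harness, assuming
a checkpoint written by an earlier run whose conf contained "someKey" (all names and paths here are
placeholders):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val checkpointDir = "/tmp/streaming-checkpoint"  // hypothetical, written by an earlier run

    def createContext(): StreamingContext = {
      new StreamingContext(
        new SparkConf().setMaster("local[2]").setAppName("conf-example"), Seconds(1))
    }

    // A SparkContext created up front; note that its conf does not set "someKey".
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("conf-example"))

    // Recovery goes through SparkContext.getOrCreate, so the existing sc is reused and
    // the recovered context reflects sc's configuration, not the checkpointed one.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    assert(ssc.sparkContext eq sc, "existing SparkContext not reused")
    assert(!ssc.sparkContext.getConf.contains("someKey"),
      "checkpointed config unexpectedly restored")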