From bce00dac403d3be2be59218b7b93a56c34c68f1a Mon Sep 17 00:00:00 2001
From: Tathagata Das <tathagata.das1565@gmail.com>
Date: Wed, 13 May 2015 17:33:15 -0700
Subject: [PATCH] [SPARK-6752] [STREAMING] [REVISED] Allow StreamingContext to
 be recreated from checkpoint and existing SparkContext

This is a revision of the earlier version (see #5773) that passed the active SparkContext explicitly through a new set of Java and Scala APIs. The drawbacks of that approach are:

* Hard to implement in Python.
* It introduces new API, which is even more confusing since we are also introducing getActiveOrCreate in SPARK-7553.

Furthermore, there is now a direct way to get the existing active SparkContext or create a new one: SparkContext.getOrCreate(conf). It is better to use this to obtain the SparkContext rather than add a new API just to pass the context explicitly.
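
For reference, a minimal sketch (not part of this patch) of how SparkContext.getOrCreate is used; the app name and master here are illustrative:

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf().setAppName("MyApp").setMaster("local[2]")
    // Returns the already-active SparkContext if one exists,
    // otherwise creates a new one from the given conf.
    val sc = SparkContext.getOrCreate(conf)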

So in this PR I have:
* Removed the new versions of StreamingContext.getOrCreate() that took a SparkContext.
* Added the ability to pick up the existing SparkContext when a StreamingContext recreated from checkpoint data needs a SparkContext (illustrated in the sketch below).
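
As an illustrative sketch (not code from this patch), this is how a driver can now recover from a checkpoint while reusing an already-running SparkContext; the checkpoint directory and batch interval are hypothetical:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // An existing, active SparkContext in the driver.
    val sc = SparkContext.getOrCreate(
      new SparkConf().setAppName("MyStreamingApp").setMaster("local[2]"))

    def createContext(): StreamingContext = {
      val ssc = new StreamingContext(sc, Seconds(1))
      // ... define DStreams and output operations here ...
      ssc.checkpoint("/tmp/checkpoint")  // hypothetical checkpoint directory
      ssc
    }

    // With this change, recovering from the checkpoint no longer constructs a second
    // SparkContext: StreamingContext internally calls SparkContext.getOrCreate and
    // picks up `sc`.
    val ssc = StreamingContext.getOrCreate("/tmp/checkpoint", createContext _)
    ssc.start()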

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #6096 from tdas/SPARK-6752 and squashes the following commits:

53f4b2d [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-6752
f024b77 [Tathagata Das] Removed extra API and used SparkContext.getOrCreate
---
 .../spark/streaming/StreamingContext.scala    | 49 +------------
 .../api/java/JavaStreamingContext.scala       | 45 ------------
 .../apache/spark/streaming/JavaAPISuite.java  | 25 +------
 .../streaming/StreamingContextSuite.scala     | 70 ++-----------------
 4 files changed, 9 insertions(+), 180 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 407cab45ed..1d2ecdd341 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -134,7 +134,7 @@ class StreamingContext private[streaming] (
     if (sc_ != null) {
       sc_
     } else if (isCheckpointPresent) {
-      new SparkContext(cp_.createSparkConf())
+      SparkContext.getOrCreate(cp_.createSparkConf())
     } else {
       throw new SparkException("Cannot create StreamingContext without a SparkContext")
     }
@@ -750,53 +750,6 @@ object StreamingContext extends Logging {
     checkpointOption.map(new StreamingContext(null, _, null)).getOrElse(creatingFunc())
   }
 
-  /**
-   * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
-   * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
-   * recreated from the checkpoint data. If the data does not exist, then the StreamingContext
-   * will be created by called the provided `creatingFunc` on the provided `sparkContext`. Note
-   * that the SparkConf configuration in the checkpoint data will not be restored as the
-   * SparkContext has already been created.
-   *
-   * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
-   * @param creatingFunc   Function to create a new StreamingContext using the given SparkContext
-   * @param sparkContext   SparkContext using which the StreamingContext will be created
-   */
-  def getOrCreate(
-      checkpointPath: String,
-      creatingFunc: SparkContext => StreamingContext,
-      sparkContext: SparkContext
-    ): StreamingContext = {
-    getOrCreate(checkpointPath, creatingFunc, sparkContext, createOnError = false)
-  }
-
-  /**
-   * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
-   * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
-   * recreated from the checkpoint data. If the data does not exist, then the StreamingContext
-   * will be created by called the provided `creatingFunc` on the provided `sparkContext`. Note
-   * that the SparkConf configuration in the checkpoint data will not be restored as the
-   * SparkContext has already been created.
-   *
-   * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
-   * @param creatingFunc   Function to create a new StreamingContext using the given SparkContext
-   * @param sparkContext   SparkContext using which the StreamingContext will be created
-   * @param createOnError  Whether to create a new StreamingContext if there is an
-   *                       error in reading checkpoint data. By default, an exception will be
-   *                       thrown on error.
-   */
-  def getOrCreate(
-      checkpointPath: String,
-      creatingFunc: SparkContext => StreamingContext,
-      sparkContext: SparkContext,
-      createOnError: Boolean
-    ): StreamingContext = {
-    val checkpointOption = CheckpointReader.read(
-      checkpointPath, sparkContext.conf, sparkContext.hadoopConfiguration, createOnError)
-    checkpointOption.map(new StreamingContext(sparkContext, _, null))
-                    .getOrElse(creatingFunc(sparkContext))
-  }
-
   /**
    * Find the JAR from which a given class was loaded, to make it easy for users to pass
    * their JARs to StreamingContext.
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index d8fbed2c50..b639b94d5c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -804,51 +804,6 @@ object JavaStreamingContext {
     new JavaStreamingContext(ssc)
   }
 
-  /**
-   * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
-   * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
-   * recreated from the checkpoint data. If the data does not exist, then the provided factory
-   * will be used to create a JavaStreamingContext.
-   *
-   * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
-   * @param creatingFunc   Function to create a new JavaStreamingContext
-   * @param sparkContext   SparkContext using which the StreamingContext will be created
-   */
-  def getOrCreate(
-      checkpointPath: String,
-      creatingFunc: JFunction[JavaSparkContext, JavaStreamingContext],
-      sparkContext: JavaSparkContext
-    ): JavaStreamingContext = {
-    val ssc = StreamingContext.getOrCreate(checkpointPath, (sparkContext: SparkContext) => {
-      creatingFunc.call(new JavaSparkContext(sparkContext)).ssc
-    }, sparkContext.sc)
-    new JavaStreamingContext(ssc)
-  }
-
-  /**
-   * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
-   * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
-   * recreated from the checkpoint data. If the data does not exist, then the provided factory
-   * will be used to create a JavaStreamingContext.
-   *
-   * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
-   * @param creatingFunc   Function to create a new JavaStreamingContext
-   * @param sparkContext   SparkContext using which the StreamingContext will be created
-   * @param createOnError  Whether to create a new JavaStreamingContext if there is an
-   *                       error in reading checkpoint data.
-   */
-  def getOrCreate(
-      checkpointPath: String,
-      creatingFunc: JFunction[JavaSparkContext, JavaStreamingContext],
-      sparkContext: JavaSparkContext,
-      createOnError: Boolean
-    ): JavaStreamingContext = {
-    val ssc = StreamingContext.getOrCreate(checkpointPath, (sparkContext: SparkContext) => {
-      creatingFunc.call(new JavaSparkContext(sparkContext)).ssc
-    }, sparkContext.sc, createOnError)
-    new JavaStreamingContext(ssc)
-  }
-
   /**
    * Find the JAR from which a given class was loaded, to make it easy for users to pass
    * their JARs to StreamingContext.
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 2e00b980b9..1077b1b2cb 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -1766,29 +1766,10 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     Assert.assertTrue("old context not recovered", !newContextCreated.get());
     ssc.stop();
 
-    // Function to create JavaStreamingContext using existing JavaSparkContext
-    // without any output operations (used to detect the new context)
-    Function<JavaSparkContext, JavaStreamingContext> creatingFunc2 =
-        new Function<JavaSparkContext, JavaStreamingContext>() {
-          public JavaStreamingContext call(JavaSparkContext context) {
-            newContextCreated.set(true);
-            return new JavaStreamingContext(context, Seconds.apply(1));
-          }
-        };
-
-    JavaSparkContext sc = new JavaSparkContext(conf);
-    newContextCreated.set(false);
-    ssc = JavaStreamingContext.getOrCreate(emptyDir.getAbsolutePath(), creatingFunc2, sc);
-    Assert.assertTrue("new context not created", newContextCreated.get());
-    ssc.stop(false);
-
     newContextCreated.set(false);
-    ssc = JavaStreamingContext.getOrCreate(corruptedCheckpointDir, creatingFunc2, sc, true);
-    Assert.assertTrue("new context not created", newContextCreated.get());
-    ssc.stop(false);
-
-    newContextCreated.set(false);
-    ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc2, sc);
+    JavaSparkContext sc = new JavaSparkContext(conf);
+    ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc,
+        new org.apache.hadoop.conf.Configuration());
     Assert.assertTrue("old context not recovered", !newContextCreated.get());
     ssc.stop();
   }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 5f93332896..4b12affbb0 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -419,76 +419,16 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
       ssc = StreamingContext.getOrCreate(checkpointPath, creatingFunction _)
       assert(ssc != null, "no context created")
       assert(!newContextCreated, "old context not recovered")
-      assert(ssc.conf.get("someKey") === "someValue")
-    }
-  }
-
-  test("getOrCreate with existing SparkContext") {
-    val conf = new SparkConf().setMaster(master).setAppName(appName)
-    sc = new SparkContext(conf)
-
-    // Function to create StreamingContext that has a config to identify it to be new context
-    var newContextCreated = false
-    def creatingFunction(sparkContext: SparkContext): StreamingContext = {
-      newContextCreated = true
-      new StreamingContext(sparkContext, batchDuration)
-    }
-
-    // Call ssc.stop(stopSparkContext = false) after a body of cody
-    def testGetOrCreate(body: => Unit): Unit = {
-      newContextCreated = false
-      try {
-        body
-      } finally {
-        if (ssc != null) {
-          ssc.stop(stopSparkContext = false)
-        }
-        ssc = null
-      }
-    }
-
-    val emptyPath = Utils.createTempDir().getAbsolutePath()
-
-    // getOrCreate should create new context with empty path
-    testGetOrCreate {
-      ssc = StreamingContext.getOrCreate(emptyPath, creatingFunction _, sc, createOnError = true)
-      assert(ssc != null, "no context created")
-      assert(newContextCreated, "new context not created")
-      assert(ssc.sparkContext === sc, "new StreamingContext does not use existing SparkContext")
+      assert(ssc.conf.get("someKey") === "someValue", "checkpointed config not recovered")
     }
 
-    val corrutedCheckpointPath = createCorruptedCheckpoint()
-
-    // getOrCreate should throw exception with fake checkpoint file and createOnError = false
-    intercept[Exception] {
-      ssc = StreamingContext.getOrCreate(corrutedCheckpointPath, creatingFunction _, sc)
-    }
-
-    // getOrCreate should throw exception with fake checkpoint file
-    intercept[Exception] {
-      ssc = StreamingContext.getOrCreate(
-        corrutedCheckpointPath, creatingFunction _, sc, createOnError = false)
-    }
-
-    // getOrCreate should create new context with fake checkpoint file and createOnError = true
-    testGetOrCreate {
-      ssc = StreamingContext.getOrCreate(
-        corrutedCheckpointPath, creatingFunction _, sc, createOnError = true)
-      assert(ssc != null, "no context created")
-      assert(newContextCreated, "new context not created")
-      assert(ssc.sparkContext === sc, "new StreamingContext does not use existing SparkContext")
-    }
-
-    val checkpointPath = createValidCheckpoint()
-
-    // StreamingContext.getOrCreate should recover context with checkpoint path
+    // getOrCreate should recover StreamingContext with existing SparkContext
     testGetOrCreate {
-      ssc = StreamingContext.getOrCreate(checkpointPath, creatingFunction _, sc)
+      sc = new SparkContext(conf)
+      ssc = StreamingContext.getOrCreate(checkpointPath, creatingFunction _)
       assert(ssc != null, "no context created")
       assert(!newContextCreated, "old context not recovered")
-      assert(ssc.sparkContext === sc, "new StreamingContext does not use existing SparkContext")
-      assert(!ssc.conf.contains("someKey"),
-        "recovered StreamingContext unexpectedly has old config")
+      assert(!ssc.conf.contains("someKey"), "checkpointed config unexpectedly recovered")
     }
   }
 
-- 
GitLab