Commit 00e7b09a authored by Tathagata Das

[SPARK-7553] [STREAMING] Added methods to maintain a singleton StreamingContext

In a REPL/notebook environment, it's very easy to lose the reference to a StreamingContext by redefining the variable that holds it. So if you happen to execute the following commands
```
val ssc = new StreamingContext(...) // cmd 1
ssc.start() // cmd 2
...
val ssc = new StreamingContext(...) // accidentally run cmd 1 again
```
The value of `ssc` will be overwritten. Now you can neither start the new context (as only one context can be started at a time) nor stop the previous context (as its reference is lost).
Hence it is best to maintain a singleton reference to the active context, so that the reference to it is never lost.
Since this problem arises mainly in REPL environments, it is best to add this as Experimental support in the Scala API only, so that it can be used in Scala REPLs and notebooks.
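For illustration only (not part of this patch's diff), a minimal sketch of how the new API avoids the problem above in a REPL, assuming a `SparkContext` is already bound to `sc`:
```
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Running this cell twice is now safe: if a started context already exists,
// it is returned instead of a second one being constructed.
val ssc = StreamingContext.getActiveOrCreate(() => {
  val newSsc = new StreamingContext(sc, Seconds(1))
  // ...define input DStreams and output operations on newSsc...
  newSsc
})

// Only a freshly created context needs starting; an active one is already running.
if (StreamingContext.getActive().isEmpty) {
  ssc.start()
}
```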

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #6070 from tdas/SPARK-7553 and squashes the following commits:

731c9a1 [Tathagata Das] Fixed style
a797171 [Tathagata Das] Added more unit tests
19fc70b [Tathagata Das] Added :: Experimental :: in docs
64706c9 [Tathagata Das] Fixed test
634db5d [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7553
3884a25 [Tathagata Das] Fixing test bug
d37a846 [Tathagata Das] Added getActive and getActiveOrCreate
parent 96c4846d
streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -637,8 +637,10 @@ class StreamingContext private[streaming] (
*/
object StreamingContext extends Logging {
/**
-  * Lock that guards access to global variables that track active StreamingContext.
+  * Lock that guards activation of a StreamingContext as well as access to the singleton active
+  * StreamingContext in getActiveOrCreate().
*/
private val ACTIVATION_LOCK = new Object()
@@ -661,6 +663,18 @@ object StreamingContext extends Logging {
}
}
/**
* :: Experimental ::
*
* Get the currently active context, if there is one. Active means started but not stopped.
*/
@Experimental
def getActive(): Option[StreamingContext] = {
ACTIVATION_LOCK.synchronized {
Option(activeContext.get())
}
}
@deprecated("Replaced by implicit functions in the DStream companion object. This is " +
"kept here only for backward compatibility.", "1.3.0")
def toPairDStreamFunctions[K, V](stream: DStream[(K, V)])
@@ -669,6 +683,48 @@ object StreamingContext extends Logging {
DStream.toPairDStreamFunctions(stream)(kt, vt, ord)
}
/**
* :: Experimental ::
*
* Either return the "active" StreamingContext (that is, started but not stopped), or create a
* new StreamingContext using the provided `creatingFunc`.
* @param creatingFunc Function to create a new StreamingContext
*/
@Experimental
def getActiveOrCreate(creatingFunc: () => StreamingContext): StreamingContext = {
ACTIVATION_LOCK.synchronized {
getActive().getOrElse { creatingFunc() }
}
}
/**
* :: Experimental ::
*
* Either get the currently active StreamingContext (that is, started but not stopped),
* OR recreate a StreamingContext from checkpoint data in the given path. If checkpoint data
* does not exist in the provided `checkpointPath`, then create a new StreamingContext by calling the provided
* `creatingFunc`.
*
* @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
* @param creatingFunc Function to create a new StreamingContext
* @param hadoopConf Optional Hadoop configuration if necessary for reading from the
* file system
* @param createOnError Optional, whether to create a new StreamingContext if there is an
* error in reading checkpoint data. By default, an exception will be
* thrown on error.
*/
@Experimental
def getActiveOrCreate(
checkpointPath: String,
creatingFunc: () => StreamingContext,
hadoopConf: Configuration = new Configuration(),
createOnError: Boolean = false
): StreamingContext = {
ACTIVATION_LOCK.synchronized {
getActive().getOrElse { getOrCreate(checkpointPath, creatingFunc, hadoopConf, createOnError) }
}
}
/**
* Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
* If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
@@ -694,7 +750,6 @@ object StreamingContext extends Logging {
checkpointOption.map(new StreamingContext(null, _, null)).getOrElse(creatingFunc())
}
/**
* Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
* If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
@@ -761,7 +816,7 @@ object StreamingContext extends Logging {
): SparkContext = {
val conf = SparkContext.updatedConf(
new SparkConf(), master, appName, sparkHome, jars, environment)
-    createNewSparkContext(conf)
+    new SparkContext(conf)
}
private[streaming] def rddToFileName[T](prefix: String, suffix: String, time: Time): String = {
...
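Before the test changes, an aside: a hedged sketch (again, not part of this patch) of how `getActive()` handles the motivating scenario, where the variable holding a started context was overwritten and the context could otherwise no longer be stopped:
```
import org.apache.spark.streaming.StreamingContext

// Recover the running context through the singleton and stop it, keeping
// the underlying SparkContext alive for reuse.
StreamingContext.getActive() match {
  case Some(activeSsc) => activeSsc.stop(stopSparkContext = false)
  case None => println("no active StreamingContext")
}
```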
streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -41,6 +41,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
val batchDuration = Milliseconds(500)
val sparkHome = "someDir"
val envPair = "key" -> "value"
+  val conf = new SparkConf().setMaster(master).setAppName(appName)
var sc: SparkContext = null
var ssc: StreamingContext = null
@@ -390,23 +391,23 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
assert(newContextCreated, "new context not created")
}
-    val corrutedCheckpointPath = createCorruptedCheckpoint()
+    val corruptedCheckpointPath = createCorruptedCheckpoint()
// getOrCreate should throw exception with fake checkpoint file and createOnError = false
intercept[Exception] {
-      ssc = StreamingContext.getOrCreate(corrutedCheckpointPath, creatingFunction _)
+      ssc = StreamingContext.getOrCreate(corruptedCheckpointPath, creatingFunction _)
}
// getOrCreate should throw exception with fake checkpoint file
intercept[Exception] {
ssc = StreamingContext.getOrCreate(
-        corrutedCheckpointPath, creatingFunction _, createOnError = false)
+        corruptedCheckpointPath, creatingFunction _, createOnError = false)
}
// getOrCreate should create new context with fake checkpoint file and createOnError = true
testGetOrCreate {
ssc = StreamingContext.getOrCreate(
-        corrutedCheckpointPath, creatingFunction _, createOnError = true)
+        corruptedCheckpointPath, creatingFunction _, createOnError = true)
assert(ssc != null, "no context created")
assert(newContextCreated, "new context not created")
}
@@ -491,8 +492,145 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
}
}
test("getActive and getActiveOrCreate") {
require(StreamingContext.getActive().isEmpty, "context exists from before")
sc = new SparkContext(conf)
var newContextCreated = false
def creatingFunc(): StreamingContext = {
newContextCreated = true
val newSsc = new StreamingContext(sc, batchDuration)
val input = addInputStream(newSsc)
input.foreachRDD { rdd => rdd.count }
newSsc
}
def testGetActiveOrCreate(body: => Unit): Unit = {
newContextCreated = false
try {
body
} finally {
if (ssc != null) {
ssc.stop(stopSparkContext = false)
}
ssc = null
}
}
// getActiveOrCreate should create new context and getActive should return it only
// after starting the context
testGetActiveOrCreate {
ssc = StreamingContext.getActiveOrCreate(creatingFunc _)
assert(ssc != null, "no context created")
assert(newContextCreated === true, "new context not created")
assert(StreamingContext.getActive().isEmpty,
"new initialized context returned before starting")
ssc.start()
assert(StreamingContext.getActive() === Some(ssc),
"active context not returned")
assert(StreamingContext.getActiveOrCreate(creatingFunc _) === ssc,
"active context not returned")
ssc.stop()
assert(StreamingContext.getActive().isEmpty,
"inactive context returned")
assert(StreamingContext.getActiveOrCreate(creatingFunc _) !== ssc,
"inactive context returned")
}
// getActiveOrCreate and getActive should return independently created context after activating
testGetActiveOrCreate {
ssc = creatingFunc() // Create
assert(StreamingContext.getActive().isEmpty,
"new initialized context returned before starting")
ssc.start()
assert(StreamingContext.getActive() === Some(ssc),
"active context not returned")
assert(StreamingContext.getActiveOrCreate(creatingFunc _) === ssc,
"active context not returned")
ssc.stop()
assert(StreamingContext.getActive().isEmpty,
"inactive context returned")
}
}
test("getActiveOrCreate with checkpoint") {
// Function to create a StreamingContext whose config identifies it as a newly created context
var newContextCreated = false
def creatingFunction(): StreamingContext = {
newContextCreated = true
new StreamingContext(conf, batchDuration)
}
// Call ssc.stop() after running the given body of code
def testGetActiveOrCreate(body: => Unit): Unit = {
require(StreamingContext.getActive().isEmpty) // no active context
newContextCreated = false
try {
body
} finally {
if (ssc != null) {
ssc.stop()
}
ssc = null
}
}
val emptyPath = Utils.createTempDir().getAbsolutePath()
val corruptedCheckpointPath = createCorruptedCheckpoint()
val checkpointPath = createValidCheckpoint()
// getActiveOrCreate should return the current active context if there is one
testGetActiveOrCreate {
ssc = new StreamingContext(
conf.clone.set("spark.streaming.clock", "org.apache.spark.util.ManualClock"), batchDuration)
addInputStream(ssc).register()
ssc.start()
val returnedSsc = StreamingContext.getActiveOrCreate(checkpointPath, creatingFunction _)
assert(!newContextCreated, "new context created instead of returning")
assert(returnedSsc.eq(ssc), "returned context is not the activated context")
}
// getActiveOrCreate should create new context with empty path
testGetActiveOrCreate {
ssc = StreamingContext.getActiveOrCreate(emptyPath, creatingFunction _)
assert(ssc != null, "no context created")
assert(newContextCreated, "new context not created")
}
// getActiveOrCreate should throw exception with fake checkpoint file and createOnError = false
intercept[Exception] {
ssc = StreamingContext.getActiveOrCreate(corruptedCheckpointPath, creatingFunction _)
}
// getActiveOrCreate should throw exception with fake checkpoint file
intercept[Exception] {
ssc = StreamingContext.getActiveOrCreate(
corruptedCheckpointPath, creatingFunction _, createOnError = false)
}
// getActiveOrCreate should create new context with fake
// checkpoint file and createOnError = true
testGetActiveOrCreate {
ssc = StreamingContext.getActiveOrCreate(
corruptedCheckpointPath, creatingFunction _, createOnError = true)
assert(ssc != null, "no context created")
assert(newContextCreated, "new context not created")
}
// getActiveOrCreate should recover context with checkpoint path, and recover old configuration
testGetActiveOrCreate {
ssc = StreamingContext.getActiveOrCreate(checkpointPath, creatingFunction _)
assert(ssc != null, "no context created")
assert(!newContextCreated, "old context not recovered")
assert(ssc.conf.get("someKey") === "someValue")
}
}
test("multiple streaming contexts") {
-    sc = new SparkContext(new SparkConf().setMaster(master).setAppName(appName))
+    sc = new SparkContext(
+      conf.clone.set("spark.streaming.clock", "org.apache.spark.util.ManualClock"))
ssc = new StreamingContext(sc, Seconds(1))
val input = addInputStream(ssc)
input.foreachRDD { rdd => rdd.count }
@@ -522,9 +660,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
def createValidCheckpoint(): String = {
val testDirectory = Utils.createTempDir().getAbsolutePath()
val checkpointDirectory = Utils.createTempDir().getAbsolutePath()
-    val conf = new SparkConf().setMaster(master).setAppName(appName)
-    conf.set("someKey", "someValue")
-    ssc = new StreamingContext(conf, batchDuration)
+    ssc = new StreamingContext(conf.clone.set("someKey", "someValue"), batchDuration)
ssc.checkpoint(checkpointDirectory)
ssc.textFileStream(testDirectory).foreachRDD { rdd => rdd.count() }
ssc.start()
...
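For completeness, a usage sketch of the checkpoint-aware overload added above; the checkpoint directory path, master, and app name here are illustrative, not taken from the patch:
```
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "/tmp/streaming-checkpoint" // hypothetical path

def createContext(): StreamingContext = {
  val conf = new SparkConf().setMaster("local[2]").setAppName("example")
  val newSsc = new StreamingContext(conf, Seconds(1))
  newSsc.checkpoint(checkpointDir)
  // ...define input DStreams and output operations...
  newSsc
}

// Resolution order: active context first, then checkpoint recovery,
// and finally createContext() if neither exists.
val ssc = StreamingContext.getActiveOrCreate(
  checkpointDir,
  createContext _,
  hadoopConf = new Configuration(),
  createOnError = false)
```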