Skip to content
Snippets Groups Projects
Commit cf69ce13 authored by Shixiong Zhu's avatar Shixiong Zhu Committed by Sean Owen
Browse files

[SPARK-11511][STREAMING] Fix NPE when an InputDStream is not used

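This change makes `DStreamGraph.getMaxInputStreamRememberDuration` ignore `InputDStream`s whose `rememberDuration` is null (i.e. input streams that were created but never used), which previously caused a NullPointerException.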

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #9476 from zsxwing/SPARK-11511.
parent 253e87e8
No related branches found
No related tags found
No related merge requests found
...@@ -167,7 +167,8 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { ...@@ -167,7 +167,8 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
* safe remember duration which can be used to perform cleanup operations. * safe remember duration which can be used to perform cleanup operations.
*/ */
def getMaxInputStreamRememberDuration(): Duration = { def getMaxInputStreamRememberDuration(): Duration = {
inputStreams.map { _.rememberDuration }.maxBy { _.milliseconds } // If an InputDStream is not used, its `rememberDuration` will be null and we can ignore them
inputStreams.map(_.rememberDuration).filter(_ != null).maxBy(_.milliseconds)
} }
@throws(classOf[IOException]) @throws(classOf[IOException])
......
...@@ -780,6 +780,22 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo ...@@ -780,6 +780,22 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
"Please don't use queueStream when checkpointing is enabled.")) "Please don't use queueStream when checkpointing is enabled."))
} }
// Regression test for SPARK-11511: an input stream that is created but never
// attached to an output stream must not crash the running StreamingContext.
test("Creating an InputDStream but not using it should not crash") {
ssc = new StreamingContext(master, appName, batchDuration)
// `input1` is deliberately left unused; only `input2` feeds an output stream.
val input1 = addInputStream(ssc)
val input2 = addInputStream(ssc)
val output = new TestOutputStream(input2)
output.register()
val batchCount = new BatchCounter(ssc)
ssc.start()
// Just wait for completing 2 batches to make sure it triggers
// `DStreamGraph.getMaxInputStreamRememberDuration`
// NOTE(review): the second argument is presumably a timeout in milliseconds, and the
// result of the wait is not checked here — confirm against BatchCounter.
batchCount.waitUntilBatchesCompleted(2, 10000)
// Throw the exception if the context crashed while processing the batches
ssc.awaitTerminationOrTimeout(1)
ssc.stop()
}
def addInputStream(s: StreamingContext): DStream[Int] = { def addInputStream(s: StreamingContext): DStream[Int] = {
val input = (1 to 100).map(i => 1 to i) val input = (1 to 100).map(i => 1 to i)
val inputStream = new TestInputStream(s, input, 1) val inputStream = new TestInputStream(s, input, 1)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment