diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala index 1b0b7890b3b009bb876bac7c0f9db8e7c6f29504..7829f5e887995e02ba739d7a994a32d5648b09b7 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala @@ -167,7 +167,8 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { * safe remember duration which can be used to perform cleanup operations. */ def getMaxInputStreamRememberDuration(): Duration = { - inputStreams.map { _.rememberDuration }.maxBy { _.milliseconds } + // If an InputDStream is not used, its `rememberDuration` will be null and we can ignore them + inputStreams.map(_.rememberDuration).filter(_ != null).maxBy(_.milliseconds) } @throws(classOf[IOException]) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index c7a877142b374cfec536e9141b845f26b727399a..860fac29c0ee0a1161c36750dc1a867c3517e786 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -780,6 +780,22 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo "Please don't use queueStream when checkpointing is enabled.")) } + test("Creating an InputDStream but not using it should not crash") { + ssc = new StreamingContext(master, appName, batchDuration) + val input1 = addInputStream(ssc) + val input2 = addInputStream(ssc) + val output = new TestOutputStream(input2) + output.register() + val batchCount = new BatchCounter(ssc) + ssc.start() + // Just wait for completing 2 batches to make sure it triggers + // `DStream.getMaxInputStreamRememberDuration` + batchCount.waitUntilBatchesCompleted(2, 10000) + // Throw the exception if crash + ssc.awaitTerminationOrTimeout(1) + ssc.stop() + } + def addInputStream(s: StreamingContext): DStream[Int] = { val input = (1 to 100).map(i => 1 to i) val inputStream = new TestInputStream(s, input, 1)