diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 688e051e1fee70b8e78b41df9cb793ff637835db..87dd27a2b1aedf3ebb733ea476c7e573fe674194 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -59,12 +59,14 @@ class StreamExecution( * Tracks how much data we have processed and committed to the sink or state store from each * input source. */ + @volatile private[sql] var committedOffsets = new StreamProgress /** * Tracks the offsets that are available to be processed, but have not yet be committed to the * sink. */ + @volatile private var availableOffsets = new StreamProgress /** The current batchId or -1 if execution has not yet been initialized. */ @@ -111,7 +113,8 @@ class StreamExecution( /** Returns current status of all the sources. */ override def sourceStatuses: Array[SourceStatus] = { - sources.map(s => new SourceStatus(s.toString, availableOffsets.get(s))).toArray + val localAvailableOffsets = availableOffsets + sources.map(s => new SourceStatus(s.toString, localAvailableOffsets.get(s))).toArray } /** Returns current status of the sink. */ @@ -228,7 +231,7 @@ class StreamExecution( * Queries all of the sources to see if any new data is available. When there is new data the * batchId counter is incremented and a new log entry is written with the newest offsets. */ - private def constructNextBatch(): Boolean = { + private def constructNextBatch(): Unit = { // There is a potential dead-lock in Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622). // If we interrupt some thread running Shell.runCommand, we may hit this issue. // As "FileStreamSource.getOffset" will create a file using HDFS API and call "Shell.runCommand" @@ -241,7 +244,15 @@ class StreamExecution( } availableOffsets ++= newData - if (dataAvailable) { + val hasNewData = awaitBatchLock.synchronized { + if (dataAvailable) { + true + } else { + noNewData = true + false + } + } + if (hasNewData) { // There is a potential dead-lock in Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622). // If we interrupt some thread running Shell.runCommand, we may hit this issue. // As "offsetLog.add" will create a file using HDFS API and call "Shell.runCommand" to set @@ -254,15 +265,11 @@ class StreamExecution( } currentBatchId += 1 logInfo(s"Committed offsets for batch $currentBatchId.") - true } else { - noNewData = true awaitBatchLock.synchronized { // Wake up any threads that are waiting for the stream to progress. awaitBatchLock.notifyAll() } - - false } } @@ -353,7 +360,10 @@ class StreamExecution( * least the given `Offset`. This method is indented for use primarily when writing tests. */ def awaitOffset(source: Source, newOffset: Offset): Unit = { - def notDone = !committedOffsets.contains(source) || committedOffsets(source) < newOffset + def notDone = { + val localCommittedOffsets = committedOffsets + !localCommittedOffsets.contains(source) || localCommittedOffsets(source) < newOffset + } while (notDone) { logInfo(s"Waiting until $newOffset at $source") @@ -365,13 +375,17 @@ class StreamExecution( /** A flag to indicate that a batch has completed with no new data available. */ @volatile private var noNewData = false - override def processAllAvailable(): Unit = { + override def processAllAvailable(): Unit = awaitBatchLock.synchronized { noNewData = false - while (!noNewData) { - awaitBatchLock.synchronized { awaitBatchLock.wait(10000) } - if (streamDeathCause != null) { throw streamDeathCause } + while (true) { + awaitBatchLock.wait(10000) + if (streamDeathCause != null) { + throw streamDeathCause + } + if (noNewData) { + return + } } - if (streamDeathCause != null) { throw streamDeathCause } } override def awaitTermination(): Unit = {