From 07e2a17d1cb7eade93d482d18a2079e9e6f40f57 Mon Sep 17 00:00:00 2001
From: Shixiong Zhu <shixiong@databricks.com>
Date: Wed, 21 Dec 2016 22:02:57 -0800
Subject: [PATCH] [SPARK-18908][SS] Creating StreamingQueryException should check if logicalPlan is created

## What changes were proposed in this pull request?

This PR audits places using `logicalPlan` in StreamExecution and ensures they all handle the case that `logicalPlan` cannot be created. In addition, this PR also fixes the following issues in `StreamingQueryException`:

- `StreamingQueryException` and `StreamExecution` are cyclically dependent: the `StreamingQueryException` constructor calls `StreamExecution`'s `toDebugString`, which in turn uses `StreamingQueryException`. Hence it will output a `null` value in the error message.
- The stack trace is duplicated when calling `Throwable.printStackTrace`, because `StreamingQueryException`'s `toString` contains the stack trace.

## How was this patch tested?

The updated `test("max files per trigger - incorrect values")`. I found this issue when I switched from `testStream` to the real code to verify the failure in this test.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16322 from zsxwing/SPARK-18907.

(cherry picked from commit ff7d82a207e8bef7779c27378f7a50a138627341)
Signed-off-by: Shixiong Zhu <shixiong@databricks.com>
---
 .../execution/streaming/StreamExecution.scala   | 141 ++++++++++++------
 .../streaming/StreamingQueryException.scala     |  28 +---
 .../sql/streaming/FileStreamSourceSuite.scala   |  39 +++--
 .../spark/sql/streaming/StreamSuite.scala       |   3 +-
 .../spark/sql/streaming/StreamTest.scala        |  52 +++++--
 .../StreamingQueryListenerSuite.scala           |   2 +-
 .../sql/streaming/StreamingQuerySuite.scala     |   4 +-
 7 files changed, 165 insertions(+), 104 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index e05200df50..a35950e2dc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -31,7 +31,6 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
-import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.command.ExplainCommand
 import org.apache.spark.sql.streaming._
@@ -67,6 +66,7 @@ class StreamExecution(
   private val awaitBatchLock = new ReentrantLock(true)
   private val awaitBatchLockCondition = awaitBatchLock.newCondition()
 
+  private val initializationLatch = new CountDownLatch(1)
   private val startLatch = new CountDownLatch(1)
   private val terminationLatch = new CountDownLatch(1)
 
@@ -118,9 +118,22 @@ class StreamExecution(
   private val prettyIdString =
     Option(name).map(_ + " ").getOrElse("") + s"[id = $id, runId = $runId]"
 
+  /**
+   * All stream sources present in the query plan. This will be set when generating logical plan.
+   */
+  @volatile protected var sources: Seq[Source] = Seq.empty
+
+  /**
+   * A list of unique sources in the query plan. This will be set when generating logical plan.
+   */
+  @volatile private var uniqueSources: Seq[Source] = Seq.empty
+
   override lazy val logicalPlan: LogicalPlan = {
+    assert(microBatchThread eq Thread.currentThread,
+      "logicalPlan must be initialized in StreamExecutionThread " +
+        s"but the current thread was ${Thread.currentThread}")
     var nextSourceId = 0L
-    analyzedPlan.transform {
+    val _logicalPlan = analyzedPlan.transform {
       case StreamingRelation(dataSource, _, output) =>
         // Materialize source to avoid creating it in every batch
         val metadataPath = s"$checkpointRoot/sources/$nextSourceId"
@@ -130,22 +143,18 @@ class StreamExecution(
         // "df.logicalPlan" has already used attributes of the previous `output`.
         StreamingExecutionRelation(source, output)
     }
+    sources = _logicalPlan.collect { case s: StreamingExecutionRelation => s.source }
+    uniqueSources = sources.distinct
+    _logicalPlan
   }
 
-  /** All stream sources present in the query plan. */
-  protected lazy val sources =
-    logicalPlan.collect { case s: StreamingExecutionRelation => s.source }
-
-  /** A list of unique sources in the query plan. */
-  private lazy val uniqueSources = sources.distinct
-
   private val triggerExecutor = trigger match {
     case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock)
   }
 
   /** Defines the internal state of execution */
   @volatile
-  private var state: State = INITIALIZED
+  private var state: State = INITIALIZING
 
   @volatile
   var lastExecution: QueryExecution = _
@@ -186,8 +195,11 @@ class StreamExecution(
    */
   val offsetLog = new OffsetSeqLog(sparkSession, checkpointFile("offsets"))
 
+  /** Whether all fields of the query have been initialized */
+  private def isInitialized: Boolean = state != INITIALIZING
+
   /** Whether the query is currently active or not */
-  override def isActive: Boolean = state == ACTIVE
+  override def isActive: Boolean = state != TERMINATED
 
   /** Returns the [[StreamingQueryException]] if the query was terminated by an exception. */
   override def exception: Option[StreamingQueryException] = Option(streamDeathCause)
@@ -216,9 +228,6 @@ class StreamExecution(
    */
   private def runBatches(): Unit = {
     try {
-      // Mark ACTIVE and then post the event. QueryStarted event is synchronously sent to listeners,
-      // so must mark this as ACTIVE first.
-      state = ACTIVE
       if (sparkSession.sessionState.conf.streamingMetricsEnabled) {
         sparkSession.sparkContext.env.metricsSystem.registerSource(streamMetrics)
       }
@@ -235,6 +244,9 @@ class StreamExecution(
       updateStatusMessage("Initializing sources")
       // force initialization of the logical plan so that the sources can be created
       logicalPlan
+      state = ACTIVE
+      // Unblock `awaitInitialization`
+      initializationLatch.countDown()
 
       triggerExecutor.execute(() => {
         startTrigger()
@@ -282,7 +294,7 @@ class StreamExecution(
         updateStatusMessage("Stopped")
       case e: Throwable =>
         streamDeathCause = new StreamingQueryException(
-          this,
+          toDebugString(includeLogicalPlan = isInitialized),
           s"Query $prettyIdString terminated with exception: ${e.getMessage}",
           e,
           committedOffsets.toOffsetSeq(sources, offsetSeqMetadata).toString,
@@ -295,17 +307,25 @@ class StreamExecution(
         throw e
       }
     } finally {
-      state = TERMINATED
-      currentStatus = status.copy(isTriggerActive = false, isDataAvailable = false)
+      // Release latches to unblock user code, since an exception can happen anywhere and we
+      // may not get a chance to release them
+      startLatch.countDown()
+      initializationLatch.countDown()
 
-      // Update metrics and status
-      sparkSession.sparkContext.env.metricsSystem.removeSource(streamMetrics)
+      try {
+        state = TERMINATED
+        currentStatus = status.copy(isTriggerActive = false, isDataAvailable = false)
 
-      // Notify others
-      sparkSession.streams.notifyQueryTermination(StreamExecution.this)
-      postEvent(
-        new QueryTerminatedEvent(id, runId, exception.map(_.cause).map(Utils.exceptionString)))
-      terminationLatch.countDown()
+        // Update metrics and status
+        sparkSession.sparkContext.env.metricsSystem.removeSource(streamMetrics)
+
+        // Notify others
+        sparkSession.streams.notifyQueryTermination(StreamExecution.this)
+        postEvent(
+          new QueryTerminatedEvent(id, runId, exception.map(_.cause).map(Utils.exceptionString)))
+      } finally {
+        terminationLatch.countDown()
+      }
     }
   }
 
@@ -537,6 +557,7 @@ class StreamExecution(
    * least the given `Offset`. This method is intended for use primarily when writing tests.
    */
   private[sql] def awaitOffset(source: Source, newOffset: Offset): Unit = {
+    assertAwaitThread()
     def notDone = {
       val localCommittedOffsets = committedOffsets
       !localCommittedOffsets.contains(source) || localCommittedOffsets(source) != newOffset
@@ -559,7 +580,38 @@ class StreamExecution(
   /** A flag to indicate that a batch has completed with no new data available. */
   @volatile private var noNewData = false
 
+  /**
+   * Assert that the await APIs should not be called in the stream thread. Otherwise, it may cause
+   * dead-lock, e.g., calling any await APIs in `StreamingQueryListener.onQueryStarted` will block
+   * the stream thread forever.
+   */
+  private def assertAwaitThread(): Unit = {
+    if (microBatchThread eq Thread.currentThread) {
+      throw new IllegalStateException(
+        "Cannot wait for a query state from the same thread that is running the query")
+    }
+  }
+
+  /**
+   * Await until all fields of the query have been initialized.
+   */
+  def awaitInitialization(timeoutMs: Long): Unit = {
+    assertAwaitThread()
+    require(timeoutMs > 0, "Timeout has to be positive")
+    if (streamDeathCause != null) {
+      throw streamDeathCause
+    }
+    initializationLatch.await(timeoutMs, TimeUnit.MILLISECONDS)
+    if (streamDeathCause != null) {
+      throw streamDeathCause
+    }
+  }
+
   override def processAllAvailable(): Unit = {
+    assertAwaitThread()
+    if (streamDeathCause != null) {
+      throw streamDeathCause
+    }
     awaitBatchLock.lock()
     try {
       noNewData = false
@@ -578,9 +630,7 @@ class StreamExecution(
   }
 
   override def awaitTermination(): Unit = {
-    if (state == INITIALIZED) {
-      throw new IllegalStateException("Cannot wait for termination on a query that has not started")
-    }
+    assertAwaitThread()
     terminationLatch.await()
     if (streamDeathCause != null) {
       throw streamDeathCause
@@ -588,9 +638,7 @@ class StreamExecution(
   }
 
   override def awaitTermination(timeoutMs: Long): Boolean = {
-    if (state == INITIALIZED) {
-      throw new IllegalStateException("Cannot wait for termination on a query that has not started")
-    }
+    assertAwaitThread()
     require(timeoutMs > 0, "Timeout has to be positive")
     terminationLatch.await(timeoutMs, TimeUnit.MILLISECONDS)
     if (streamDeathCause != null) {
@@ -623,27 +671,24 @@ class StreamExecution(
     s"Streaming Query $prettyIdString [state = $state]"
   }
 
-  def toDebugString: String = {
-    val deathCauseStr = if (streamDeathCause != null) {
-      "Error:\n" + stackTraceToString(streamDeathCause.cause)
-    } else ""
-    s"""
-       |=== Streaming Query ===
-       |Identifier: $prettyIdString
-       |Current Offsets: $committedOffsets
-       |
-       |Current State: $state
-       |Thread State: ${microBatchThread.getState}
-       |
-       |Logical Plan:
-       |$logicalPlan
-       |
-       |$deathCauseStr
-     """.stripMargin
+  private def toDebugString(includeLogicalPlan: Boolean): String = {
+    val debugString =
+      s"""|=== Streaming Query ===
+          |Identifier: $prettyIdString
+          |Current Committed Offsets: $committedOffsets
+          |Current Available Offsets: $availableOffsets
+          |
+          |Current State: $state
+          |Thread State: ${microBatchThread.getState}""".stripMargin
+    if (includeLogicalPlan) {
+      debugString + s"\n\nLogical Plan:\n$logicalPlan"
+    } else {
+      debugString
+    }
   }
 
   trait State
-  case object INITIALIZED extends State
+  case object INITIALIZING extends State
   case object ACTIVE extends State
   case object TERMINATED extends State
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
index a96150aa89..c53c29591a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.streaming
 
 import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.execution.streaming.{Offset, OffsetSeq, StreamExecution}
 
 /**
  * :: Experimental ::
@@ -31,35 +30,18 @@ import org.apache.spark.sql.execution.streaming.{Offset, OffsetSeq, StreamExecut
  * @since 2.0.0
  */
 @Experimental
-class StreamingQueryException private(
-    causeString: String,
+class StreamingQueryException private[sql](
+    private val queryDebugString: String,
     val message: String,
     val cause: Throwable,
     val startOffset: String,
     val endOffset: String)
   extends Exception(message, cause) {
 
-  private[sql] def this(
-      query: StreamingQuery,
-      message: String,
-      cause: Throwable,
-      startOffset: String,
-      endOffset: String) {
-    this(
-      // scalastyle:off
s"""${classOf[StreamingQueryException].getName}: ${cause.getMessage} ${cause.getStackTrace.take(10).mkString("", "\n|\t", "\n")} - | - |${query.asInstanceOf[StreamExecution].toDebugString} - """.stripMargin, - // scalastyle:on - message, - cause, - startOffset, - endOffset) - } - /** Time when the exception occurred */ val time: Long = System.currentTimeMillis - override def toString(): String = causeString + override def toString(): String = + s"""${classOf[StreamingQueryException].getName}: ${cause.getMessage} + |$queryDebugString""".stripMargin } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 55d927a857..8a9fa94bea 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -815,21 +815,31 @@ class FileStreamSourceSuite extends FileStreamSourceTest { } test("max files per trigger - incorrect values") { - withTempDir { case src => - def testMaxFilePerTriggerValue(value: String): Unit = { - val df = spark.readStream.option("maxFilesPerTrigger", value).text(src.getCanonicalPath) - val e = intercept[IllegalArgumentException] { - testStream(df)() - } - Seq("maxFilesPerTrigger", value, "positive integer").foreach { s => - assert(e.getMessage.contains(s)) + val testTable = "maxFilesPerTrigger_test" + withTable(testTable) { + withTempDir { case src => + def testMaxFilePerTriggerValue(value: String): Unit = { + val df = spark.readStream.option("maxFilesPerTrigger", value).text(src.getCanonicalPath) + val e = intercept[StreamingQueryException] { + // Note: `maxFilesPerTrigger` is checked in the stream thread when creating the source + val q = df.writeStream.format("memory").queryName(testTable).start() + try { + q.processAllAvailable() + } finally { + q.stop() + } + } + assert(e.getCause.isInstanceOf[IllegalArgumentException]) + Seq("maxFilesPerTrigger", value, "positive integer").foreach { s => + assert(e.getMessage.contains(s)) + } } - } - testMaxFilePerTriggerValue("not-a-integer") - testMaxFilePerTriggerValue("-1") - testMaxFilePerTriggerValue("0") - testMaxFilePerTriggerValue("10.1") + testMaxFilePerTriggerValue("not-a-integer") + testMaxFilePerTriggerValue("-1") + testMaxFilePerTriggerValue("0") + testMaxFilePerTriggerValue("10.1") + } } } @@ -1202,7 +1212,8 @@ class FileStreamSourceStressTestSuite extends FileStreamSourceTest { } } -/** Fake FileSystem to test whether the method `fs.exists` is called during +/** + * Fake FileSystem to test whether the method `fs.exists` is called during * `DataSource.resolveRelation`. */ class ExistsThrowsExceptionFileSystem extends RawLocalFileSystem { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index b8fa82d9b4..34b0ee8064 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -259,8 +259,9 @@ class StreamSuite extends StreamTest { override def stop(): Unit = {} } val df = Dataset[Int](sqlContext.sparkSession, StreamingExecutionRelation(source)) + // These error are fatal errors and should be ignored in `testStream` to not fail the test. 
       testStream(df)(
-        ExpectFailure()(ClassTag(e.getClass))
+        ExpectFailure(isFatalError = true)(ClassTag(e.getClass))
       )
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 6fbbbb1f8e..709050d29b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -167,10 +167,17 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
   /** Advance the trigger clock's time manually. */
   case class AdvanceManualClock(timeToAdd: Long) extends StreamAction
 
-  /** Signals that a failure is expected and should not kill the test. */
-  case class ExpectFailure[T <: Throwable : ClassTag]() extends StreamAction {
+  /**
+   * Signals that a failure is expected and should not kill the test.
+   *
+   * @param isFatalError if this is a fatal error. If so, the error should also be caught by
+   *                     UncaughtExceptionHandler.
+   */
+  case class ExpectFailure[T <: Throwable : ClassTag](
+      isFatalError: Boolean = false) extends StreamAction {
     val causeClass: Class[T] = implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]]
-    override def toString(): String = s"ExpectFailure[${causeClass.getName}]"
+    override def toString(): String =
+      s"ExpectFailure[${causeClass.getName}, isFatalError: $isFatalError]"
   }
 
   /** Assert that a body is true */
@@ -240,7 +247,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
     val resetConfValues = mutable.Map[String, Option[String]]()
 
     @volatile
-    var streamDeathCause: Throwable = null
+    var streamThreadDeathCause: Throwable = null
 
     // If the test doesn't manually start the stream, we do it automatically at the beginning.
     val startedManually =
@@ -271,7 +278,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
          |Output Mode: $outputMode
          |Stream state: $currentOffsets
          |Thread state: $threadState
-         |${if (streamDeathCause != null) stackTraceToString(streamDeathCause) else ""}
+         |${if (streamThreadDeathCause != null) stackTraceToString(streamThreadDeathCause) else ""}
          |
          |== Sink ==
          |${sink.toDebugString}
@@ -360,9 +367,12 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
             currentStream.microBatchThread.setUncaughtExceptionHandler(
               new UncaughtExceptionHandler {
                 override def uncaughtException(t: Thread, e: Throwable): Unit = {
-                  streamDeathCause = e
+                  streamThreadDeathCause = e
                 }
               })
+            // Wait until the initialization finishes, because some tests need to use `logicalPlan`
+            // after starting the query.
+            currentStream.awaitInitialization(streamingTimeout.toMillis)
 
           case AdvanceManualClock(timeToAdd) =>
             verify(currentStream != null,
@@ -396,8 +406,9 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
                 currentStream.exception.map(_.toString()).getOrElse(""))
             } catch {
               case _: InterruptedException =>
-              case _: org.scalatest.exceptions.TestFailedDueToTimeoutException =>
-                failTest("Timed out while stopping and waiting for microbatchthread to terminate.")
+              case e: org.scalatest.exceptions.TestFailedDueToTimeoutException =>
+                failTest(
+                  "Timed out while stopping and waiting for microbatchthread to terminate.", e)
               case t: Throwable =>
                 failTest("Error while stopping stream", t)
             } finally {
@@ -421,16 +432,24 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
               verify(exception.cause.getClass === ef.causeClass,
                 "incorrect cause in exception returned by query.exception()\n" +
                   s"\tExpected: ${ef.causeClass}\n\tReturned: ${exception.cause.getClass}")
+              if (ef.isFatalError) {
+                // This is a fatal error, `streamThreadDeathCause` should be set to this error in
+                // UncaughtExceptionHandler.
+                verify(streamThreadDeathCause != null &&
+                  streamThreadDeathCause.getClass === ef.causeClass,
+                  "UncaughtExceptionHandler didn't receive the correct error\n" +
+                    s"\tExpected: ${ef.causeClass}\n\tReturned: $streamThreadDeathCause")
+                streamThreadDeathCause = null
+              }
             } catch {
               case _: InterruptedException =>
-              case _: org.scalatest.exceptions.TestFailedDueToTimeoutException =>
-                failTest("Timed out while waiting for failure")
+              case e: org.scalatest.exceptions.TestFailedDueToTimeoutException =>
+                failTest("Timed out while waiting for failure", e)
               case t: Throwable =>
                 failTest("Error while checking stream failure", t)
             } finally {
               lastStream = currentStream
               currentStream = null
-              streamDeathCause = null
             }
 
           case a: AssertOnQuery =>
@@ -508,11 +527,14 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
         }
         pos += 1
       }
+      if (streamThreadDeathCause != null) {
+        failTest("Stream Thread Died", streamThreadDeathCause)
+      }
     } catch {
-      case _: InterruptedException if streamDeathCause != null =>
-        failTest("Stream Thread Died")
-      case _: org.scalatest.exceptions.TestFailedDueToTimeoutException =>
-        failTest("Timed out waiting for stream")
+      case _: InterruptedException if streamThreadDeathCause != null =>
+        failTest("Stream Thread Died", streamThreadDeathCause)
+      case e: org.scalatest.exceptions.TestFailedDueToTimeoutException =>
+        failTest("Timed out waiting for stream", e)
     } finally {
       if (currentStream != null && currentStream.microBatchThread.isAlive) {
         currentStream.stop()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index a057d1d36c..4596aa1d34 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -111,7 +111,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
       StartStream(ProcessingTime(100), triggerClock = clock),
       AddData(inputData, 0),
       AdvanceManualClock(100),
-      ExpectFailure[SparkException],
+      ExpectFailure[SparkException](),
       AssertOnQuery { query =>
         eventually(Timeout(streamingTimeout)) {
           assert(listener.terminationEvent !== null)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 6c4bb35ccb..1525ad5fd5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -142,7 +142,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
       StartStream(),
       AssertOnQuery(_.isActive === true),
       AddData(inputData, 0),
-      ExpectFailure[SparkException],
+      ExpectFailure[SparkException](),
       AssertOnQuery(_.isActive === false),
       TestAwaitTermination(ExpectException[SparkException]),
       TestAwaitTermination(ExpectException[SparkException], timeoutMs = 2000),
@@ -306,7 +306,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
      StartStream(ProcessingTime(100), triggerClock = clock),
      AddData(inputData, 0),
      AdvanceManualClock(100),
-     ExpectFailure[SparkException],
+     ExpectFailure[SparkException](),
      AssertOnQuery(_.status.isDataAvailable === false),
      AssertOnQuery(_.status.isTriggerActive === false),
      AssertOnQuery(_.status.message.startsWith("Terminated with exception"))
-- 
GitLab
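
For reference, a minimal standalone sketch (not part of the patch) of the user-visible behavior the updated test exercises: after this change an invalid `maxFilesPerTrigger` is validated on the stream thread, so the failure surfaces from `processAllAvailable()` (or `awaitTermination()`) as a `StreamingQueryException` whose cause is the underlying `IllegalArgumentException`, rather than being thrown directly from `start()`. The sketch assumes an active SparkSession named `spark` and an existing input directory; the path and query name are placeholders.

// Sketch only: `spark`, the input path, and the query name are assumed, not defined by the patch.
import org.apache.spark.sql.streaming.StreamingQueryException

val df = spark.readStream
  .option("maxFilesPerTrigger", "-1")  // invalid: must be a positive integer
  .text("/tmp/streaming-input")

val query = df.writeStream.format("memory").queryName("maxFilesPerTrigger_demo").start()
try {
  // The option is checked on the stream thread when the source is created, so the
  // failure is reported here, wrapped in a StreamingQueryException.
  query.processAllAvailable()
} catch {
  case e: StreamingQueryException =>
    assert(e.getCause.isInstanceOf[IllegalArgumentException])
} finally {
  query.stop()
}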