diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index b3a0d6ad0bd4c3acbaf0cedc7920c011c2bdf784..333239f875bd3e24d5bbbc78e9511ea4b47b8154 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -207,13 +207,18 @@ class StreamExecution( }) } catch { case _: InterruptedException if state == TERMINATED => // interrupted by stop() - case NonFatal(e) => + case e: Throwable => streamDeathCause = new StreamingQueryException( this, s"Query $name terminated with exception: ${e.getMessage}", e, Some(committedOffsets.toCompositeOffset(sources))) logError(s"Query $name terminated with error", e) + // Rethrow the fatal errors to allow the user using `Thread.UncaughtExceptionHandler` to + // handle them + if (!NonFatal(e)) { + throw e + } } finally { state = TERMINATED sparkSession.streams.notifyQueryTermination(StreamExecution.this) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index 1caafb9d7444086d6020ac4ddc057a061141314e..cdbad901dba8ebd7b337c7bb4df6af6e71b685e6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -17,10 +17,12 @@ package org.apache.spark.sql.streaming +import scala.reflect.ClassTag +import scala.util.control.ControlThrowable + import org.apache.spark.sql._ import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.sources.StreamSourceProvider -import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.{IntegerType, StructField, StructType} import org.apache.spark.util.ManualClock @@ -236,6 +238,33 @@ class StreamSuite extends StreamTest { } } + testQuietly("fatal errors from a source should be sent to the user") { + for (e <- Seq( + new VirtualMachineError {}, + new ThreadDeath, + new LinkageError, + new ControlThrowable {} + )) { + val source = new Source { + override def getOffset: Option[Offset] = { + throw e + } + + override def getBatch(start: Option[Offset], end: Offset): DataFrame = { + throw e + } + + override def schema: StructType = StructType(Array(StructField("value", IntegerType))) + + override def stop(): Unit = {} + } + val df = Dataset[Int](sqlContext.sparkSession, StreamingExecutionRelation(source)) + testStream(df)( + ExpectFailure()(ClassTag(e.getClass)) + ) + } + } + test("output mode API in Scala") { val o1 = OutputMode.Append assert(o1 === InternalOutputModes.Append) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index 09140a1d6e76b876666cabb9fa1b49b7ae012738..fa13d385cce75eb329f8ee58b181d8a1311a122e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -167,7 +167,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { /** Signals that a failure is expected and should not kill the test. */ case class ExpectFailure[T <: Throwable : ClassTag]() extends StreamAction { val causeClass: Class[T] = implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]] - override def toString(): String = s"ExpectFailure[${causeClass.getCanonicalName}]" + override def toString(): String = s"ExpectFailure[${causeClass.getName}]" } /** Assert that a body is true */ @@ -322,7 +322,6 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { new UncaughtExceptionHandler { override def uncaughtException(t: Thread, e: Throwable): Unit = { streamDeathCause = e - testThread.interrupt() } })