Skip to content
Snippets Groups Projects
Commit 9a48e60e authored by Shixiong Zhu's avatar Shixiong Zhu Committed by Michael Armbrust
Browse files

[SPARK-17780][SQL] Report Throwable to user in StreamExecution

## What changes were proposed in this pull request?

When using an incompatible source for structured streaming, it may throw NoClassDefFoundError. It's better to just catch Throwable and report it to the user since the streaming thread is dying.

## How was this patch tested?

`test("NoClassDefFoundError from an incompatible source")`

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #15352 from zsxwing/SPARK-17780.
parent 79accf45
No related branches found
No related tags found
No related merge requests found
......@@ -207,13 +207,18 @@ class StreamExecution(
})
} catch {
case _: InterruptedException if state == TERMINATED => // interrupted by stop()
case NonFatal(e) =>
case e: Throwable =>
streamDeathCause = new StreamingQueryException(
this,
s"Query $name terminated with exception: ${e.getMessage}",
e,
Some(committedOffsets.toCompositeOffset(sources)))
logError(s"Query $name terminated with error", e)
// Rethrow the fatal errors to allow the user using `Thread.UncaughtExceptionHandler` to
// handle them
if (!NonFatal(e)) {
throw e
}
} finally {
state = TERMINATED
sparkSession.streams.notifyQueryTermination(StreamExecution.this)
......
......@@ -17,10 +17,12 @@
package org.apache.spark.sql.streaming
import scala.reflect.ClassTag
import scala.util.control.ControlThrowable
import org.apache.spark.sql._
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.sources.StreamSourceProvider
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.util.ManualClock
......@@ -236,6 +238,33 @@ class StreamSuite extends StreamTest {
}
}
testQuietly("fatal errors from a source should be sent to the user") {
  // One representative of each fatal (i.e. not matched by NonFatal) throwable family:
  // the stream execution thread must surface these to the user instead of swallowing them.
  val fatalErrors: Seq[Throwable] = Seq(
    new VirtualMachineError {},
    new ThreadDeath,
    new LinkageError,
    new ControlThrowable {}
  )
  fatalErrors.foreach { error =>
    // A source whose data-producing methods always rethrow the fatal error.
    val failingSource = new Source {
      override def getOffset: Option[Offset] = throw error
      override def getBatch(start: Option[Offset], end: Offset): DataFrame = throw error
      override def schema: StructType = StructType(Array(StructField("value", IntegerType)))
      override def stop(): Unit = {}
    }
    val df = Dataset[Int](sqlContext.sparkSession, StreamingExecutionRelation(failingSource))
    // The query is expected to fail with exactly the throwable class the source threw.
    testStream(df)(
      ExpectFailure()(ClassTag(error.getClass))
    )
  }
}
test("output mode API in Scala") {
val o1 = OutputMode.Append
assert(o1 === InternalOutputModes.Append)
......
......@@ -167,7 +167,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
/** Signals that a failure is expected and should not kill the test. */
case class ExpectFailure[T <: Throwable : ClassTag]() extends StreamAction {
  // Concrete class of the expected failure, recovered from the ClassTag evidence.
  val causeClass: Class[T] = implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]]
  // Use getName rather than getCanonicalName: anonymous subclasses (e.g.
  // `new ControlThrowable {}` in tests) have a null canonical name, which would
  // render this action as "ExpectFailure[null]".
  override def toString(): String = s"ExpectFailure[${causeClass.getName}]"
}
/** Assert that a body is true */
......@@ -322,7 +322,6 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
new UncaughtExceptionHandler {
override def uncaughtException(t: Thread, e: Throwable): Unit = {
streamDeathCause = e
testThread.interrupt()
}
})
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment