From 9a48e60e6319d85f2c3be3a3c608dab135e18a73 Mon Sep 17 00:00:00 2001
From: Shixiong Zhu <shixiong@databricks.com>
Date: Thu, 6 Oct 2016 12:51:12 -0700
Subject: [PATCH] [SPARK-17780][SQL] Report Throwable to user in
 StreamExecution

## What changes were proposed in this pull request?

When using an incompatible source for structured streaming, it may throw NoClassDefFoundError. It's better to just catch Throwable and report it to the user since the streaming thread is dying.

## How was this patch tested?

`test("NoClassDefFoundError from an incompatible source")`

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #15352 from zsxwing/SPARK-17780.
---
 .../execution/streaming/StreamExecution.scala |  7 ++++-
 .../spark/sql/streaming/StreamSuite.scala     | 31 ++++++++++++++++++-
 .../spark/sql/streaming/StreamTest.scala      |  3 +-
 3 files changed, 37 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index b3a0d6ad0b..333239f875 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -207,13 +207,18 @@ class StreamExecution(
       })
     } catch {
       case _: InterruptedException if state == TERMINATED => // interrupted by stop()
-      case NonFatal(e) =>
+      case e: Throwable =>
         streamDeathCause = new StreamingQueryException(
           this,
           s"Query $name terminated with exception: ${e.getMessage}",
           e,
           Some(committedOffsets.toCompositeOffset(sources)))
         logError(s"Query $name terminated with error", e)
+        // Rethrow the fatal errors to allow the user using `Thread.UncaughtExceptionHandler` to
+        // handle them
+        if (!NonFatal(e)) {
+          throw e
+        }
     } finally {
       state = TERMINATED
       sparkSession.streams.notifyQueryTermination(StreamExecution.this)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 1caafb9d74..cdbad901db 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -17,10 +17,12 @@
 
 package org.apache.spark.sql.streaming
 
+import scala.reflect.ClassTag
+import scala.util.control.ControlThrowable
+
 import org.apache.spark.sql._
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.sources.StreamSourceProvider
-import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 import org.apache.spark.util.ManualClock
 
@@ -236,6 +238,33 @@ class StreamSuite extends StreamTest {
     }
   }
 
+  testQuietly("fatal errors from a source should be sent to the user") {
+    for (e <- Seq(
+      new VirtualMachineError {},
+      new ThreadDeath,
+      new LinkageError,
+      new ControlThrowable {}
+    )) {
+      val source = new Source {
+        override def getOffset: Option[Offset] = {
+          throw e
+        }
+
+        override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+          throw e
+        }
+
+        override def schema: StructType = StructType(Array(StructField("value", IntegerType)))
+
+        override def stop(): Unit = {}
+      }
+      val df = Dataset[Int](sqlContext.sparkSession, StreamingExecutionRelation(source))
+      testStream(df)(
+        ExpectFailure()(ClassTag(e.getClass))
+      )
+    }
+  }
+
   test("output mode API in Scala") {
     val o1 = OutputMode.Append
     assert(o1 === InternalOutputModes.Append)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 09140a1d6e..fa13d385cc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -167,7 +167,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
   /** Signals that a failure is expected and should not kill the test. */
   case class ExpectFailure[T <: Throwable : ClassTag]() extends StreamAction {
     val causeClass: Class[T] = implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]]
-    override def toString(): String = s"ExpectFailure[${causeClass.getCanonicalName}]"
+    override def toString(): String = s"ExpectFailure[${causeClass.getName}]"
   }
 
   /** Assert that a body is true */
@@ -322,7 +322,6 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
               new UncaughtExceptionHandler {
                 override def uncaughtException(t: Thread, e: Throwable): Unit = {
                   streamDeathCause = e
-                  testThread.interrupt()
                 }
               })
 
-- 
GitLab