From 59cccbda489f25add3e10997e950de7e88704aa7 Mon Sep 17 00:00:00 2001
From: Shixiong Zhu <shixiong@databricks.com>
Date: Fri, 28 Oct 2016 20:14:38 -0700
Subject: [PATCH] [SPARK-18164][SQL] ForeachSink should fail the Spark job if
 `process` throws exception

## What changes were proposed in this pull request?

Fixed the issue that ForeachSink didn't rethrow the exception.

## How was this patch tested?

The fixed unit test.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #15674 from zsxwing/foreach-sink-error.
---
 .../sql/execution/streaming/ForeachSink.scala |  7 ++-----
 .../streaming/ForeachSinkSuite.scala          | 19 ++++++++++++++-----
 2 files changed, 16 insertions(+), 10 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
index 082664aa23..24f98b9211 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
@@ -68,19 +68,16 @@ class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with Seria
       }
     datasetWithIncrementalExecution.foreachPartition { iter =>
       if (writer.open(TaskContext.getPartitionId(), batchId)) {
-        var isFailed = false
         try {
           while (iter.hasNext) {
             writer.process(iter.next())
           }
         } catch {
           case e: Throwable =>
-            isFailed = true
             writer.close(e)
+            throw e
         }
-        if (!isFailed) {
-          writer.close(null)
-        }
+        writer.close(null)
       } else {
         writer.close(null)
       }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
index 7928b8e877..9e05921611 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
@@ -23,8 +23,9 @@ import scala.collection.mutable
 
 import org.scalatest.BeforeAndAfter
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.ForeachWriter
-import org.apache.spark.sql.streaming.{OutputMode, StreamTest}
+import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryException, StreamTest}
 import org.apache.spark.sql.test.SharedSQLContext
 
 class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAfter {
@@ -136,7 +137,7 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAf
     }
   }
 
-  test("foreach with error") {
+  testQuietly("foreach with error") {
     withTempDir { checkpointDir =>
       val input = MemoryStream[Int]
       val query = input.toDS().repartition(1).writeStream
@@ -148,16 +149,24 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAf
           }
         }).start()
       input.addData(1, 2, 3, 4)
-      query.processAllAvailable()
+
+      // Error in `process` should fail the Spark job
+      val e = intercept[StreamingQueryException] {
+        query.processAllAvailable()
+      }
+      assert(e.getCause.isInstanceOf[SparkException])
+      assert(e.getCause.getCause.getMessage === "error")
+      assert(query.isActive === false)
 
       val allEvents = ForeachSinkSuite.allEvents()
       assert(allEvents.size === 1)
       assert(allEvents(0)(0) === ForeachSinkSuite.Open(partition = 0, version = 0))
-      assert(allEvents(0)(1) ===  ForeachSinkSuite.Process(value = 1))
+      assert(allEvents(0)(1) === ForeachSinkSuite.Process(value = 1))
+
+      // `close` should be called with the error
       val errorEvent = allEvents(0)(2).asInstanceOf[ForeachSinkSuite.Close]
       assert(errorEvent.error.get.isInstanceOf[RuntimeException])
       assert(errorEvent.error.get.getMessage === "error")
-      query.stop()
     }
   }
 }
-- 
GitLab