Skip to content
Snippets Groups Projects
Commit 59cccbda authored by Shixiong Zhu
Browse files

[SPARK-18164][SQL] ForeachSink should fail the Spark job if `process` throws exception

## What changes were proposed in this pull request?

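Fixed an issue where `ForeachSink` did not rethrow an exception thrown by `ForeachWriter.process`, so the Spark job did not fail. The exception is now passed to `writer.close` and then rethrown.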

## How was this patch tested?

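Updated the existing "foreach with error" unit test to assert that the query fails with a `StreamingQueryException` and that `close` is called with the error.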

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #15674 from zsxwing/foreach-sink-error.
parent ac26e9cf
No related branches found
No related tags found
No related merge requests found
......@@ -68,19 +68,16 @@ class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with Seria
}
datasetWithIncrementalExecution.foreachPartition { iter =>
if (writer.open(TaskContext.getPartitionId(), batchId)) {
var isFailed = false
try {
while (iter.hasNext) {
writer.process(iter.next())
}
} catch {
case e: Throwable =>
isFailed = true
writer.close(e)
throw e
}
if (!isFailed) {
writer.close(null)
}
writer.close(null)
} else {
writer.close(null)
}
......
......@@ -23,8 +23,9 @@ import scala.collection.mutable
import org.scalatest.BeforeAndAfter
import org.apache.spark.SparkException
import org.apache.spark.sql.ForeachWriter
import org.apache.spark.sql.streaming.{OutputMode, StreamTest}
import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryException, StreamTest}
import org.apache.spark.sql.test.SharedSQLContext
class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAfter {
......@@ -136,7 +137,7 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAf
}
}
test("foreach with error") {
testQuietly("foreach with error") {
withTempDir { checkpointDir =>
val input = MemoryStream[Int]
val query = input.toDS().repartition(1).writeStream
......@@ -148,16 +149,24 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAf
}
}).start()
input.addData(1, 2, 3, 4)
query.processAllAvailable()
// Error in `process` should fail the Spark job
val e = intercept[StreamingQueryException] {
query.processAllAvailable()
}
assert(e.getCause.isInstanceOf[SparkException])
assert(e.getCause.getCause.getMessage === "error")
assert(query.isActive === false)
val allEvents = ForeachSinkSuite.allEvents()
assert(allEvents.size === 1)
assert(allEvents(0)(0) === ForeachSinkSuite.Open(partition = 0, version = 0))
assert(allEvents(0)(1) === ForeachSinkSuite.Process(value = 1))
assert(allEvents(0)(1) === ForeachSinkSuite.Process(value = 1))
// `close` should be called with the error
val errorEvent = allEvents(0)(2).asInstanceOf[ForeachSinkSuite.Close]
assert(errorEvent.error.get.isInstanceOf[RuntimeException])
assert(errorEvent.error.get.getMessage === "error")
query.stop()
}
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment