From f0d50fd547c247df06470d68cd5245e3027e89a2 Mon Sep 17 00:00:00 2001
From: Tyson Condie <tcondie@gmail.com>
Date: Thu, 9 Mar 2017 23:02:13 -0800
Subject: [PATCH] [SPARK-19891][SS] Await Batch Lock notified on stream
 execution exit

## What changes were proposed in this pull request?

We need to notify the await batch lock when the stream exits early e.g., when an exception has been thrown.

## How was this patch tested?

Current tests that throw exceptions at runtime will finish faster as a result of this update.

zsxwing

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Tyson Condie <tcondie@gmail.com>

Closes #17231 from tcondie/kafka-writer.

(cherry picked from commit 501b7111997bc74754663348967104181b43319b)
Signed-off-by: Shixiong Zhu <shixiong@databricks.com>
---
 .../spark/sql/execution/streaming/StreamExecution.scala    | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index dd80a28b52..b380db0f9e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -348,6 +348,13 @@ class StreamExecution(
           }
         }
       } finally {
+        awaitBatchLock.lock()
+        try {
+          // Wake up any threads that are waiting for the stream to progress.
+          awaitBatchLockCondition.signalAll()
+        } finally {
+          awaitBatchLock.unlock()
+        }
         terminationLatch.countDown()
       }
     }
-- 
GitLab