From 7536e2849df6d63587fbf16b4ecb5db06fed7125 Mon Sep 17 00:00:00 2001
From: Steve Loughran <stevel@hortonworks.com>
Date: Thu, 13 Apr 2017 15:30:44 -0500
Subject: [PATCH] [SPARK-20038][SQL]
 FileFormatWriter.ExecuteWriteTask.releaseResources() implementations to be
 re-entrant

## What changes were proposed in this pull request?

have the`FileFormatWriter.ExecuteWriteTask.releaseResources()` implementations  set `currentWriter=null` in a finally clause. This guarantees that if the first call to `currentWriter()` throws an exception, the second releaseResources() call made during the task cancel process will not trigger a second attempt to close the stream.

## How was this patch tested?

Tricky. I've been fixing the underlying cause when I saw the problem [HADOOP-14204](https://issues.apache.org/jira/browse/HADOOP-14204), but SPARK-10109 shows I'm not the first to have seen this. I can't replicate it locally any more, my code no longer being broken.

code review, however, should be straightforward

Author: Steve Loughran <stevel@hortonworks.com>

Closes #17364 from steveloughran/stevel/SPARK-20038-close.
---
 .../execution/datasources/FileFormatWriter.scala   | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index bda64d4b91..4ec09bff42 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -324,8 +324,11 @@ object FileFormatWriter extends Logging {
 
     override def releaseResources(): Unit = {
       if (currentWriter != null) {
-        currentWriter.close()
-        currentWriter = null
+        try {
+          currentWriter.close()
+        } finally {
+          currentWriter = null
+        }
       }
     }
   }
@@ -459,8 +462,11 @@ object FileFormatWriter extends Logging {
 
     override def releaseResources(): Unit = {
       if (currentWriter != null) {
-        currentWriter.close()
-        currentWriter = null
+        try {
+          currentWriter.close()
+        } finally {
+          currentWriter = null
+        }
       }
     }
   }
-- 
GitLab