From 731462a04f8e33ac507ad19b4270c783a012a33e Mon Sep 17 00:00:00 2001
From: Xianyang Liu <xianyang.liu@intel.com>
Date: Thu, 25 May 2017 15:47:59 +0800
Subject: [PATCH] [SPARK-20250][CORE] Improper OOM error when a task been
 killed while spilling data

## What changes were proposed in this pull request?

Currently, when a task is calling spill() but it receives a killing request from driver (e.g., speculative task), the `TaskMemoryManager` will throw an `OOM` exception.  And we don't catch `Fatal` exception when a error caused by `Thread.interrupt`. So for `ClosedByInterruptException`, we should throw `RuntimeException` instead of `OutOfMemoryError`.

https://issues.apache.org/jira/browse/SPARK-20250?jql=project%20%3D%20SPARK

## How was this patch tested?

Existing unit tests.

Author: Xianyang Liu <xianyang.liu@intel.com>

Closes #18090 from ConeyLiu/SPARK-20250.
---
 .../java/org/apache/spark/memory/TaskMemoryManager.java  | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
index 5f91411749..761ba9de65 100644
--- a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
+++ b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
@@ -19,6 +19,7 @@ package org.apache.spark.memory;
 
 import javax.annotation.concurrent.GuardedBy;
 import java.io.IOException;
+import java.nio.channels.ClosedByInterruptException;
 import java.util.Arrays;
 import java.util.ArrayList;
 import java.util.BitSet;
@@ -184,6 +185,10 @@ public class TaskMemoryManager {
                 break;
               }
             }
+          } catch (ClosedByInterruptException e) {
+            // This called by user to kill a task (e.g: speculative task).
+            logger.error("error while calling spill() on " + c, e);
+            throw new RuntimeException(e.getMessage());
           } catch (IOException e) {
             logger.error("error while calling spill() on " + c, e);
             throw new OutOfMemoryError("error while calling spill() on " + c + " : "
@@ -201,6 +206,10 @@ public class TaskMemoryManager {
               Utils.bytesToString(released), consumer);
             got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
           }
+        } catch (ClosedByInterruptException e) {
+          // This called by user to kill a task (e.g: speculative task).
+          logger.error("error while calling spill() on " + consumer, e);
+          throw new RuntimeException(e.getMessage());
         } catch (IOException e) {
           logger.error("error while calling spill() on " + consumer, e);
           throw new OutOfMemoryError("error while calling spill() on " + consumer + " : "
-- 
GitLab