From 22014e6fb919a35c31d852b7c2f5b7eb05751208 Mon Sep 17 00:00:00 2001
From: Jason Moore <jasonmoore2k@outlook.com>
Date: Sat, 9 Apr 2016 23:34:57 -0700
Subject: [PATCH] [SPARK-14357][CORE] Properly handle the root cause being a
 commit denied exception

## What changes were proposed in this pull request?

When deciding whether a CommitDeniedException caused a task to fail, consider the root cause of the Exception.

## How was this patch tested?

Added a test suite for the component that extracts the root cause of the error.
Made a distribution after cherry-picking this commit to branch-1.6 and used to run our Spark application that would quite often fail due to the CommitDeniedException.

Author: Jason Moore <jasonmoore2k@outlook.com>

Closes #12228 from jasonmoore2k/SPARK-14357.
---
 .../org/apache/spark/executor/Executor.scala  |  2 +-
 .../org/apache/spark/util/CausedBy.scala      | 36 ++++++++++++
 .../org/apache/spark/util/CausedBySuite.scala | 56 +++++++++++++++++++
 3 files changed, 93 insertions(+), 1 deletion(-)
 create mode 100644 core/src/main/scala/org/apache/spark/util/CausedBy.scala
 create mode 100644 core/src/test/scala/org/apache/spark/util/CausedBySuite.scala

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 09c5733565..afa4d6093a 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -321,7 +321,7 @@ private[spark] class Executor(
           logInfo(s"Executor killed $taskName (TID $taskId)")
           execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
 
-        case cDE: CommitDeniedException =>
+        case CausedBy(cDE: CommitDeniedException) =>
           val reason = cDE.toTaskEndReason
           execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
 
diff --git a/core/src/main/scala/org/apache/spark/util/CausedBy.scala b/core/src/main/scala/org/apache/spark/util/CausedBy.scala
new file mode 100644
index 0000000000..73df446d98
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/CausedBy.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+/**
+ * Extractor Object for pulling out the root cause of an error.
+ * If the error contains no cause, it will return the error itself.
+ *
+ * Usage:
+ * try {
+ *   ...
+ * } catch {
+ *   case CausedBy(ex: CommitDeniedException) => ...
+ * }
+ */
+private[spark] object CausedBy {
+
+  def unapply(e: Throwable): Option[Throwable] = {
+    Option(e.getCause).flatMap(cause => unapply(cause)).orElse(Some(e))
+  }
+}
diff --git a/core/src/test/scala/org/apache/spark/util/CausedBySuite.scala b/core/src/test/scala/org/apache/spark/util/CausedBySuite.scala
new file mode 100644
index 0000000000..4a80e3f1f4
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/CausedBySuite.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import org.apache.spark.SparkFunSuite
+
+class CausedBySuite extends SparkFunSuite {
+
+  test("For an error without a cause, should return the error") {
+    val error = new Exception
+
+    val causedBy = error match {
+      case CausedBy(e) => e
+    }
+
+    assert(causedBy === error)
+  }
+
+  test("For an error with a cause, should return the cause of the error") {
+    val cause = new Exception
+    val error = new Exception(cause)
+
+    val causedBy = error match {
+      case CausedBy(e) => e
+    }
+
+    assert(causedBy === cause)
+  }
+
+  test("For an error with a cause that itself has a cause, return the root cause") {
+    val causeOfCause = new Exception
+    val cause = new Exception(causeOfCause)
+    val error = new Exception(cause)
+
+    val causedBy = error match {
+      case CausedBy(e) => e
+    }
+
+    assert(causedBy === causeOfCause)
+  }
+}
-- 
GitLab