diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 09c57335650c0c9df35830b29d3b1a8cc9203f8f..afa4d6093a7f27cf9f46b65cb3f3e89c55a99cd7 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -321,7 +321,7 @@ private[spark] class Executor(
           logInfo(s"Executor killed $taskName (TID $taskId)")
           execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
 
-        case cDE: CommitDeniedException =>
+        case CausedBy(cDE: CommitDeniedException) =>
           val reason = cDE.toTaskEndReason
           execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
 
diff --git a/core/src/main/scala/org/apache/spark/util/CausedBy.scala b/core/src/main/scala/org/apache/spark/util/CausedBy.scala
new file mode 100644
index 0000000000000000000000000000000000000000..73df446d981cba9cc5ecf972afeb074d41577e53
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/CausedBy.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+/**
+ * Extractor Object for pulling out the root cause of an error.
+ * If the error contains no cause, it will return the error itself.
+ *
+ * Usage:
+ * try {
+ *   ...
+ * } catch {
+ *   case CausedBy(ex: CommitDeniedException) => ...
+ * }
+ */
+private[spark] object CausedBy {
+
+  def unapply(e: Throwable): Option[Throwable] = {
+    Option(e.getCause).flatMap(cause => unapply(cause)).orElse(Some(e))
+  }
+}
diff --git a/core/src/test/scala/org/apache/spark/util/CausedBySuite.scala b/core/src/test/scala/org/apache/spark/util/CausedBySuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..4a80e3f1f452d1b565b8b55382ae77a50ace2960
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/CausedBySuite.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import org.apache.spark.SparkFunSuite
+
+class CausedBySuite extends SparkFunSuite {
+
+  test("For an error without a cause, should return the error") {
+    val error = new Exception
+
+    val causedBy = error match {
+      case CausedBy(e) => e
+    }
+
+    assert(causedBy === error)
+  }
+
+  test("For an error with a cause, should return the cause of the error") {
+    val cause = new Exception
+    val error = new Exception(cause)
+
+    val causedBy = error match {
+      case CausedBy(e) => e
+    }
+
+    assert(causedBy === cause)
+  }
+
+  test("For an error with a cause that itself has a cause, return the root cause") {
+    val causeOfCause = new Exception
+    val cause = new Exception(causeOfCause)
+    val error = new Exception(cause)
+
+    val causedBy = error match {
+      case CausedBy(e) => e
+    }
+
+    assert(causedBy === causeOfCause)
+  }
+}