Skip to content
Snippets Groups Projects
Commit 22014e6f authored by Jason Moore's avatar Jason Moore Committed by Andrew Or
Browse files

[SPARK-14357][CORE] Properly handle the root cause being a commit denied exception

## What changes were proposed in this pull request?

When deciding whether a CommitDeniedException caused a task to fail, consider the root cause of the Exception.

## How was this patch tested?

Added a test suite for the component that extracts the root cause of the error.
Made a distribution after cherry-picking this commit to branch-1.6 and used to run our Spark application that would quite often fail due to the CommitDeniedException.

Author: Jason Moore <jasonmoore2k@outlook.com>

Closes #12228 from jasonmoore2k/SPARK-14357.
parent 2c95e4e9
No related branches found
No related tags found
Loading
...@@ -321,7 +321,7 @@ private[spark] class Executor( ...@@ -321,7 +321,7 @@ private[spark] class Executor(
logInfo(s"Executor killed $taskName (TID $taskId)") logInfo(s"Executor killed $taskName (TID $taskId)")
execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled)) execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
case cDE: CommitDeniedException => case CausedBy(cDE: CommitDeniedException) =>
val reason = cDE.toTaskEndReason val reason = cDE.toTaskEndReason
execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason)) execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.util
/**
* Extractor Object for pulling out the root cause of an error.
* If the error contains no cause, it will return the error itself.
*
* Usage:
* try {
* ...
* } catch {
* case CausedBy(ex: CommitDeniedException) => ...
* }
*/
private[spark] object CausedBy {
def unapply(e: Throwable): Option[Throwable] = {
Option(e.getCause).flatMap(cause => unapply(cause)).orElse(Some(e))
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.util
import org.apache.spark.SparkFunSuite
class CausedBySuite extends SparkFunSuite {
test("For an error without a cause, should return the error") {
val error = new Exception
val causedBy = error match {
case CausedBy(e) => e
}
assert(causedBy === error)
}
test("For an error with a cause, should return the cause of the error") {
val cause = new Exception
val error = new Exception(cause)
val causedBy = error match {
case CausedBy(e) => e
}
assert(causedBy === cause)
}
test("For an error with a cause that itself has a cause, return the root cause") {
val causeOfCause = new Exception
val cause = new Exception(causeOfCause)
val error = new Exception(cause)
val causedBy = error match {
case CausedBy(e) => e
}
assert(causedBy === causeOfCause)
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment