Skip to content
Snippets Groups Projects
Commit 659f635d authored by Devaraj K's avatar Devaraj K Committed by Shixiong Zhu
Browse files

[SPARK-14234][CORE] Executor crashes for TaskRunner thread interruption

## What changes were proposed in this pull request?
Resetting the task interruption status before updating the task status.

## How was this patch tested?
I have verified it manually by running multiple applications, Executor doesn't crash and updates the status to the driver without any exceptions with the patch changes.

Author: Devaraj K <devaraj@apache.org>

Closes #12031 from devaraj-kavali/SPARK-14234.
parent f5623b46
No related branches found
No related tags found
No related merge requests found
...@@ -23,6 +23,7 @@ import java.net.URL ...@@ -23,6 +23,7 @@ import java.net.URL
import java.nio.ByteBuffer import java.nio.ByteBuffer
import java.util.Properties import java.util.Properties
import java.util.concurrent.{ConcurrentHashMap, TimeUnit} import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import javax.annotation.concurrent.GuardedBy
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, HashMap} import scala.collection.mutable.{ArrayBuffer, HashMap}
...@@ -194,6 +195,10 @@ private[spark] class Executor( ...@@ -194,6 +195,10 @@ private[spark] class Executor(
/** Whether this task has been killed. */ /** Whether this task has been killed. */
@volatile private var killed = false @volatile private var killed = false
/** Whether this task has been finished. */
@GuardedBy("TaskRunner.this")
private var finished = false
/** How much the JVM process has spent in GC when the task starts to run. */ /** How much the JVM process has spent in GC when the task starts to run. */
@volatile var startGCTime: Long = _ @volatile var startGCTime: Long = _
...@@ -207,10 +212,25 @@ private[spark] class Executor( ...@@ -207,10 +212,25 @@ private[spark] class Executor(
logInfo(s"Executor is trying to kill $taskName (TID $taskId)") logInfo(s"Executor is trying to kill $taskName (TID $taskId)")
killed = true killed = true
if (task != null) { if (task != null) {
task.kill(interruptThread) synchronized {
if (!finished) {
task.kill(interruptThread)
}
}
} }
} }
/**
* Set the finished flag to true and clear the current thread's interrupt status
*/
private def setTaskFinishedAndClearInterruptStatus(): Unit = synchronized {
this.finished = true
// SPARK-14234 - Reset the interrupted status of the thread to avoid the
// ClosedByInterruptException during execBackend.statusUpdate which causes
// Executor to crash
Thread.interrupted()
}
override def run(): Unit = { override def run(): Unit = {
val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId) val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)
val deserializeStartTime = System.currentTimeMillis() val deserializeStartTime = System.currentTimeMillis()
...@@ -336,14 +356,17 @@ private[spark] class Executor( ...@@ -336,14 +356,17 @@ private[spark] class Executor(
} catch { } catch {
case ffe: FetchFailedException => case ffe: FetchFailedException =>
val reason = ffe.toTaskEndReason val reason = ffe.toTaskEndReason
setTaskFinishedAndClearInterruptStatus()
execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason)) execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
case _: TaskKilledException | _: InterruptedException if task.killed => case _: TaskKilledException | _: InterruptedException if task.killed =>
logInfo(s"Executor killed $taskName (TID $taskId)") logInfo(s"Executor killed $taskName (TID $taskId)")
setTaskFinishedAndClearInterruptStatus()
execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled)) execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
case CausedBy(cDE: CommitDeniedException) => case CausedBy(cDE: CommitDeniedException) =>
val reason = cDE.toTaskEndReason val reason = cDE.toTaskEndReason
setTaskFinishedAndClearInterruptStatus()
execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason)) execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
case t: Throwable => case t: Throwable =>
...@@ -373,6 +396,7 @@ private[spark] class Executor( ...@@ -373,6 +396,7 @@ private[spark] class Executor(
ser.serialize(new ExceptionFailure(t, accUpdates, false).withAccums(accums)) ser.serialize(new ExceptionFailure(t, accUpdates, false).withAccums(accums))
} }
} }
setTaskFinishedAndClearInterruptStatus()
execBackend.statusUpdate(taskId, TaskState.FAILED, serializedTaskEndReason) execBackend.statusUpdate(taskId, TaskState.FAILED, serializedTaskEndReason)
// Don't forcibly exit unless the exception was inherently fatal, to avoid // Don't forcibly exit unless the exception was inherently fatal, to avoid
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment