diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala index 2b99b8a5af25045cc8353c99861d3cee01f12ac0..51b3e4d5e09362cd1dfff9e9edfa85e90fac99a3 100644 --- a/core/src/main/scala/org/apache/spark/TaskContext.scala +++ b/core/src/main/scala/org/apache/spark/TaskContext.scala @@ -21,7 +21,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.annotation.DeveloperApi import org.apache.spark.executor.TaskMetrics -import org.apache.spark.util.TaskCompletionListener +import org.apache.spark.util.{TaskCompletionListenerException, TaskCompletionListener} /** @@ -41,7 +41,7 @@ class TaskContext( val attemptId: Long, val runningLocally: Boolean = false, private[spark] val taskMetrics: TaskMetrics = TaskMetrics.empty) - extends Serializable { + extends Serializable with Logging { @deprecated("use partitionId", "0.8.1") def splitId = partitionId @@ -103,8 +103,20 @@ class TaskContext( /** Marks the task as completed and triggers the listeners. */ private[spark] def markTaskCompleted(): Unit = { completed = true + val errorMsgs = new ArrayBuffer[String](2) // Process complete callbacks in the reverse order of registration - onCompleteCallbacks.reverse.foreach { _.onTaskCompletion(this) } + onCompleteCallbacks.reverse.foreach { listener => + try { + listener.onTaskCompletion(this) + } catch { + case e: Throwable => + errorMsgs += e.getMessage + logError("Error in TaskCompletionListener", e) + } + } + if (errorMsgs.nonEmpty) { + throw new TaskCompletionListenerException(errorMsgs) + } } /** Marks the task for interruption, i.e. cancellation. */ diff --git a/core/src/main/scala/org/apache/spark/util/TaskCompletionListenerException.scala b/core/src/main/scala/org/apache/spark/util/TaskCompletionListenerException.scala new file mode 100644 index 0000000000000000000000000000000000000000..f64e069cd1724a4474b3ae7c0185289615766956 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/util/TaskCompletionListenerException.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util + +/** + * Exception thrown when there is an exception in + * executing the callback in TaskCompletionListener. + */ +private[spark] +class TaskCompletionListenerException(errorMessages: Seq[String]) extends Exception { + + override def getMessage: String = { + if (errorMessages.size == 1) { + errorMessages.head + } else { + errorMessages.zipWithIndex.map { case (msg, i) => s"Exception $i: $msg" }.mkString("\n") + } + } +} diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala index db2ad829a48f90b3b1bb95dbb941c7f5d6621d5f..faba5508c906c394b413247e2fc8de93af82a0d4 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala @@ -17,16 +17,20 @@ package org.apache.spark.scheduler +import org.mockito.Mockito._ +import org.mockito.Matchers.any + import org.scalatest.FunSuite import org.scalatest.BeforeAndAfter import org.apache.spark._ import org.apache.spark.rdd.RDD -import org.apache.spark.util.Utils +import org.apache.spark.util.{TaskCompletionListenerException, TaskCompletionListener} + class TaskContextSuite extends FunSuite with BeforeAndAfter with LocalSparkContext { - test("Calls executeOnCompleteCallbacks after failure") { + test("calls TaskCompletionListener after failure") { TaskContextSuite.completed = false sc = new SparkContext("local", "test") val rdd = new RDD[String](sc, List()) { @@ -45,6 +49,20 @@ class TaskContextSuite extends FunSuite with BeforeAndAfter with LocalSparkConte } assert(TaskContextSuite.completed === true) } + + test("all TaskCompletionListeners should be called even if some fail") { + val context = new TaskContext(0, 0, 0) + val listener = mock(classOf[TaskCompletionListener]) + context.addTaskCompletionListener(_ => throw new Exception("blah")) + context.addTaskCompletionListener(listener) + context.addTaskCompletionListener(_ => throw new Exception("blah")) + + intercept[TaskCompletionListenerException] { + context.markTaskCompleted() + } + + verify(listener, times(1)).onTaskCompletion(any()) + } } private object TaskContextSuite {