diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index 344face5e698689c301dade2b026ed8f0186aa5f..f9061b1c715ddd3b4a51e2f3cf05944dcb93eff5 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -1,6 +1,6 @@
 package spark.executor
 
-import java.io.{File, FileOutputStream}
+import java.io.{NotSerializableException, File, FileOutputStream}
 import java.net.{URI, URL, URLClassLoader}
 import java.util.concurrent._
 
@@ -123,7 +123,19 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
         case t: Throwable => {
           val reason = ExceptionFailure(t)
-          context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
+          val serReason =
+            try {
+              ser.serialize(reason)
+            }
+            catch {
+              case e: NotSerializableException => {
+                val message = "Spark caught unserializable exn: " + t.toString
+                val throwable = new Exception(message)
+                throwable.setStackTrace(t.getStackTrace)
+                ser.serialize(new ExceptionFailure(throwable))
+              }
+            }
+          context.statusUpdate(taskId, TaskState.FAILED, serReason)
 
           // TODO: Should we exit the whole executor here? On the one hand, the failed task may
           // have left some weird state around depending on when the exception was thrown, but on
           // the other hand, maybe we could detect that when future tasks fail and exit then.
diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala
index 4df3bb5b6793b10af3bdea91c9083b192e99e5c6..8ab0f2cfa27095ae34fb02661ffba05761e96d57 100644
--- a/core/src/test/scala/spark/DistributedSuite.scala
+++ b/core/src/test/scala/spark/DistributedSuite.scala
@@ -18,6 +18,9 @@ import scala.collection.mutable.ArrayBuffer
 import SparkContext._
 import storage.{GetBlock, BlockManagerWorker, StorageLevel}
 
+class NotSerializableClass
+class NotSerializableExn(val notSer: NotSerializableClass) extends Throwable() {}
+
 class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
   with LocalSparkContext {
   val clusterUrl = "local-cluster[2,1,512]"
@@ -27,6 +30,24 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
     System.clearProperty("spark.storage.memoryFraction")
   }
 
+  test("task throws not serializable exception") {
+    // Ensures that executors do not crash when an exn is not serializable. If executors crash,
+    // this test will hang. Correct behavior is that executors don't crash but fail tasks
+    // and the scheduler throws a SparkException.
+
+    // numSlaves must be less than numPartitions
+    val numSlaves = 3
+    val numPartitions = 10
+
+    sc = new SparkContext("local-cluster[%s,1,512]".format(numSlaves), "test")
+    val data = sc.parallelize(1 to 100, numPartitions).map(x => (x, x)).
+      map(x => throw new NotSerializableExn(new NotSerializableClass))
+    intercept[SparkException] {
+      data.count()
+    }
+    resetSparkContext()
+  }
+
   test("local-cluster format") {
     sc = new SparkContext("local-cluster[2,1,512]", "test")
     assert(sc.parallelize(1 to 2, 2).count() == 2)
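Note (not part of the patch): the sketch below is a minimal, self-contained illustration of the fallback the Executor change relies on. If Java serialization of a thrown exception fails with NotSerializableException, the exception's message and stack trace are repackaged into a plain java.lang.Exception, which always serializes. The names here (SerializableFallbackSketch, BadExn, serializeSafely) are hypothetical, and plain ObjectOutputStream stands in for Spark's ser serializer.

import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializableFallbackSketch {
  // A field type that does not implement Serializable, mirroring
  // NotSerializableClass in the test above.
  class NotSerializableResource
  class BadExn(val res: NotSerializableResource) extends Exception("task failed")

  // Plain Java serialization; stands in for Spark's task-result serializer.
  def serialize(obj: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    bytes.toByteArray
  }

  // Same trick as the Executor patch: if the exception itself cannot be
  // serialized, replace it with a vanilla Exception carrying the original
  // description and stack trace, which is always serializable.
  def serializeSafely(t: Throwable): Array[Byte] =
    try {
      serialize(t)
    } catch {
      case _: NotSerializableException =>
        val replacement = new Exception("caught unserializable exn: " + t.toString)
        replacement.setStackTrace(t.getStackTrace)
        serialize(replacement)
    }

  def main(args: Array[String]): Unit = {
    // BadExn extends Exception (Serializable) but drags in a non-serializable
    // field, so the first serialize() attempt throws NotSerializableException.
    val payload = serializeSafely(new BadExn(new NotSerializableResource))
    println("serialized " + payload.length + " bytes via the fallback")
  }
}

Preserving the stack trace via setStackTrace keeps the failure debuggable on the driver even though the original exception object is discarded; only the non-serializable payload is lost.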