Commit 03459545 authored by Patrick Wendell

SPARK-738: Spark should detect and squash nonserializable exceptions

parent 63e1999f
 package spark.executor

-import java.io.{File, FileOutputStream}
+import java.io.{NotSerializableException, File, FileOutputStream}
 import java.net.{URI, URL, URLClassLoader}
 import java.util.concurrent._

@@ -123,7 +123,19 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
       case t: Throwable => {
         val reason = ExceptionFailure(t)
-        context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
+        val serReason =
+          try {
+            ser.serialize(reason)
+          }
+          catch {
+            case e: NotSerializableException => {
+              val message = "Spark caught unserializable exn: " + t.toString
+              val throwable = new Exception(message)
+              throwable.setStackTrace(t.getStackTrace)
+              ser.serialize(new ExceptionFailure(throwable))
+            }
+          }
+        context.statusUpdate(taskId, TaskState.FAILED, serReason)
         // TODO: Should we exit the whole executor here? On the one hand, the failed task may
         // have left some weird state around depending on when the exception was thrown, but on
......
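The change above wraps the existing ser.serialize(reason) call: if serializing the user's exception fails because it (or something it references) is not serializable, the executor substitutes a plain Exception carrying the original message and stack trace, so the failure can still be reported to the scheduler instead of taking the executor down. Below is a minimal stand-alone sketch of that fallback, using plain java.io object serialization in place of Spark's ser and ExceptionFailure; the object and helper names are illustrative and not part of this patch.

import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializeFailureSketch {
  // A payload that is not java.io.Serializable, and an exception that drags it along.
  class NotSerializableClass
  class NotSerializableExn(val notSer: NotSerializableClass) extends Throwable

  // Plain Java serialization, standing in for Spark's `ser.serialize`.
  def javaSerialize(obj: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    bytes.toByteArray
  }

  // The pattern from the patch: try to serialize the real throwable; on
  // NotSerializableException, fall back to a plain Exception that keeps the
  // original message and stack trace.
  def serializeFailure(t: Throwable): Array[Byte] =
    try {
      javaSerialize(t)
    } catch {
      case _: NotSerializableException =>
        val throwable = new Exception("Spark caught unserializable exn: " + t.toString)
        throwable.setStackTrace(t.getStackTrace)
        javaSerialize(throwable)
    }

  def main(args: Array[String]): Unit = {
    val bytes = serializeFailure(new NotSerializableExn(new NotSerializableClass))
    println("fallback failure serialized to " + bytes.length + " bytes")
  }
}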
@@ -18,6 +18,9 @@ import scala.collection.mutable.ArrayBuffer
 import SparkContext._
 import storage.{GetBlock, BlockManagerWorker, StorageLevel}

+class NotSerializableClass
+class NotSerializableExn(val notSer: NotSerializableClass) extends Throwable() {}
+
 class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter with LocalSparkContext {

   val clusterUrl = "local-cluster[2,1,512]"
@@ -27,6 +30,24 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
     System.clearProperty("spark.storage.memoryFraction")
   }

+  test("task throws not serializable exception") {
+    // Ensures that executors do not crash when an exn is not serializable. If executors crash,
+    // this test will hang. Correct behavior is that executors don't crash but fail tasks
+    // and the scheduler throws a SparkException.
+
+    // numSlaves must be less than numPartitions
+    val numSlaves = 3
+    val numPartitions = 10
+
+    sc = new SparkContext("local-cluster[%s,1,512]".format(numSlaves), "test")
+    val data = sc.parallelize(1 to 100, numPartitions).map(x => (x, x)).
+      map(x => throw new NotSerializableExn(new NotSerializableClass))
+    intercept[SparkException] {
+      data.count()
+    }
+    resetSparkContext()
+  }
+
   test("local-cluster format") {
     sc = new SparkContext("local-cluster[2,1,512]", "test")
     assert(sc.parallelize(1 to 2, 2).count() == 2)
......
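The new test exercises this path end to end: every task throws a non-serializable exception, the executors survive, and the driver's count() fails with a SparkException. Outside the test harness, driver code would observe the same behavior; here is a hedged sketch of that, reusing the NotSerializableExn and NotSerializableClass helpers defined above and the 0.7-era spark.SparkContext API (the object name and master string are illustrative only).

import spark.{SparkContext, SparkException}

object NonSerializableFailureDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local-cluster[3,1,512]", "demo")
    try {
      // Every task throws an exception whose payload cannot be serialized.
      sc.parallelize(1 to 100, 10)
        .map(x => (x, x))
        .map(x => throw new NotSerializableExn(new NotSerializableClass))
        .count()
    } catch {
      case e: SparkException =>
        // With this patch the executors stay up and the failure reaches the
        // driver with the original exception's message and stack trace.
        println("job failed cleanly: " + e.getMessage)
    } finally {
      sc.stop()
    }
  }
}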