diff --git a/core/src/main/scala/spark/TaskEndReason.scala b/core/src/main/scala/spark/TaskEndReason.scala
index ca793eb4021a64dccddfb73c6ada2d5feaeeed59..8140cba0841e519b6a0269638cb2207e8adf6b15 100644
--- a/core/src/main/scala/spark/TaskEndReason.scala
+++ b/core/src/main/scala/spark/TaskEndReason.scala
@@ -28,3 +28,5 @@ private[spark] case class ExceptionFailure(
   extends TaskEndReason
 
 private[spark] case class OtherFailure(message: String) extends TaskEndReason
+
+private[spark] case class TaskResultTooBigFailure() extends TaskEndReason
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index da20b8454441c737c34cd6c561702bc45db367e2..890938d48b667ee769097e2c6044334a96abb80c 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -72,6 +72,7 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
   // Initialize Spark environment (using system properties read above)
   val env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false)
   SparkEnv.set(env)
+  private val akkaFrameSize = env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size")
 
   // Start worker thread pool
   val threadPool = new ThreadPoolExecutor(
@@ -113,6 +114,10 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
         val result = new TaskResult(value, accumUpdates, task.metrics.getOrElse(null))
         val serializedResult = ser.serialize(result)
         logInfo("Serialized size of result for " + taskId + " is " + serializedResult.limit)
+        if (serializedResult.limit >= (akkaFrameSize - 1024)) {
+          context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(TaskResultTooBigFailure()))
+          return
+        }
         context.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
         logInfo("Finished task ID " + taskId)
       } catch {
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index c69f3bdb7f56476b1801e12c1b9f005996399ac8..db5869db632d8d2454a4f9033cbbaa5779f01544 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -542,6 +542,12 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
           sched.taskSetFinished(this)
           return
 
+        case taskResultTooBig: TaskResultTooBigFailure =>
+          logInfo(("Loss was due to task %s result exceeding Akka frame size; " +
+            "aborting job").format(tid))
+          abort("Task %s result exceeded Akka frame size".format(tid))
+          return
+
         case ef: ExceptionFailure =>
           val key = ef.description
           val now = System.currentTimeMillis
diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala
index 06a94ed24c0b9da703682d7924000ba119debb2a..068bb6ca4f42077fbff02e4a9589b21a57f7de85 100644
--- a/core/src/test/scala/spark/DistributedSuite.scala
+++ b/core/src/test/scala/spark/DistributedSuite.scala
@@ -301,6 +301,19 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
       }
     }
   }
+
+  test("job should fail if TaskResult exceeds Akka frame size") {
+    // We must use local-cluster mode since results are returned differently
+    // when running under LocalScheduler:
+    sc = new SparkContext("local-cluster[1,1,512]", "test")
+    val akkaFrameSize =
+      sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size").toInt
+    val rdd = sc.parallelize(Seq(1)).map { x => new Array[Byte](akkaFrameSize) }
+    val exception = intercept[SparkException] {
+      rdd.reduce((x, y) => x)
+    }
+    exception.getMessage should endWith("result exceeded Akka frame size")
+  }
 }
 
 object DistributedSuite {
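Note (not part of the patch): the executor-side check reserves 1 KB of headroom below the configured Akka frame size, since the serialized TaskResult travels inside an Akka message that carries some envelope overhead of its own. A minimal standalone sketch of that predicate, with hypothetical names and Akka's 10 MiB default frame assumed:

```scala
// Sketch only: mirrors the size check Executor performs before sending
// a TaskResult back to the driver. Names here are illustrative.
object FrameSizeCheckSketch {
  // Fail any result within 1 KB of the frame size, leaving headroom
  // for Akka's own message envelope (matches the patch's constant).
  def resultFits(serializedResultBytes: Long, akkaFrameSize: Long): Boolean =
    serializedResultBytes < akkaFrameSize - 1024

  def main(args: Array[String]): Unit = {
    val frameSize = 10L * 1024 * 1024 // assumed 10 MiB default netty frame
    println(resultFits(512, frameSize))             // true: sent as TaskState.FINISHED
    println(resultFits(frameSize - 100, frameSize)) // false: reported as TaskResultTooBigFailure
  }
}
```

Jobs that legitimately need larger results can raise the limit via the spark.akka.frameSize system property (in MB), which this version of Spark maps onto akka.remote.netty.message-frame-size, though writing large outputs to storage rather than returning them through the driver is usually the better fix.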