From b8e46b6074e5ecc1ae4ed22ea32983597c14b683 Mon Sep 17 00:00:00 2001
From: Josh Rosen <joshrosen@eecs.berkeley.edu>
Date: Thu, 16 May 2013 01:52:40 -0700
Subject: [PATCH] Abort job if result exceeds Akka frame size; add test.
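
Previously, the executor simply threw a SparkException when a serialized
task result exceeded the Akka frame size. With this change the executor
instead reports the task as FAILED with a new TaskResultTooBigFailure
reason, and TaskSetManager aborts the job when it sees that reason, so the
failure surfaces on the driver as a SparkException.

As a rough sketch of the user-visible behavior (adapted from the new
DistributedSuite test; the local-cluster master string and the frame-size
config lookup are taken from that test), a job whose per-task result is at
least as large as the configured frame size should now fail on the driver:

    val sc = new SparkContext("local-cluster[1,1,512]", "test")
    val frameSize = sc.env.actorSystem.settings.config
      .getBytes("akka.remote.netty.message-frame-size").toInt
    // Each task result is at least frameSize bytes once serialized, so the
    // executor reports TaskResultTooBigFailure and the job is aborted.
    sc.parallelize(Seq(1)).map(x => new Array[Byte](frameSize)).reduce((x, y) => x)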

---
 core/src/main/scala/spark/TaskEndReason.scala       |  2 ++
 core/src/main/scala/spark/executor/Executor.scala   |  3 ++-
 .../spark/scheduler/cluster/TaskSetManager.scala    |  6 ++++++
 core/src/test/scala/spark/DistributedSuite.scala    | 13 +++++++++++++
 4 files changed, 23 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/spark/TaskEndReason.scala b/core/src/main/scala/spark/TaskEndReason.scala
index 420c54bc9a..c5da453562 100644
--- a/core/src/main/scala/spark/TaskEndReason.scala
+++ b/core/src/main/scala/spark/TaskEndReason.scala
@@ -20,3 +20,5 @@ case class FetchFailed(bmAddress: BlockManagerId, shuffleId: Int, mapId: Int, re
 private[spark] case class ExceptionFailure(exception: Throwable) extends TaskEndReason
 
 private[spark] case class OtherFailure(message: String) extends TaskEndReason
+
+private[spark] case class TaskResultTooBigFailure() extends TaskEndReason
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index 718f0ff5bc..9ec4eb6e88 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -115,7 +115,8 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
         val serializedResult = ser.serialize(result)
         logInfo("Serialized size of result for " + taskId + " is " + serializedResult.limit)
         if (serializedResult.limit >= (akkaFrameSize - 1024)) {
-          throw new SparkException("Result for " + taskId + " exceeded Akka frame size")
+          context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(TaskResultTooBigFailure()))
+          return
         }
         context.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
         logInfo("Finished task ID " + taskId)
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index 27e713e2c4..df7f0eafff 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -492,6 +492,12 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
             sched.taskSetFinished(this)
             return
 
+          case taskResultTooBig: TaskResultTooBigFailure =>
+            logInfo("Loss was due to task %s result exceeding Akka frame size;" +
+                    "aborting job".format(tid))
+            abort("Task %s result exceeded Akka frame size".format(tid))
+            return
+
           case ef: ExceptionFailure =>
             val key = ef.exception.toString
             val now = System.currentTimeMillis
diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala
index 4df3bb5b67..9f58999cbe 100644
--- a/core/src/test/scala/spark/DistributedSuite.scala
+++ b/core/src/test/scala/spark/DistributedSuite.scala
@@ -277,6 +277,19 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
       }
     }
   }
+
+  test("job should fail if TaskResult exceeds Akka frame size") {
+    // We must use local-cluster mode since results are returned differently
+    // when running under LocalScheduler:
+    sc = new SparkContext("local-cluster[1,1,512]", "test")
+    val akkaFrameSize =
+      sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size").toInt
+    val rdd = sc.parallelize(Seq(1)).map { x => new Array[Byte](akkaFrameSize) }
+    val exception = intercept[SparkException] {
+      rdd.reduce((x, y) => x)
+    }
+    exception.getMessage should endWith("result exceeded Akka frame size")
+  }
 }
 
 object DistributedSuite {
-- 
GitLab