diff --git a/core/src/main/scala/spark/TaskEndReason.scala b/core/src/main/scala/spark/TaskEndReason.scala
index 420c54bc9a0d4b56f0b8c466f77de0a15a0f8be9..c5da4535627cf57774e5158b797cf9f66309b562 100644
--- a/core/src/main/scala/spark/TaskEndReason.scala
+++ b/core/src/main/scala/spark/TaskEndReason.scala
@@ -20,3 +20,5 @@ case class FetchFailed(bmAddress: BlockManagerId, shuffleId: Int, mapId: Int, re
 private[spark] case class ExceptionFailure(exception: Throwable) extends TaskEndReason
 
 private[spark] case class OtherFailure(message: String) extends TaskEndReason
+
+private[spark] case class TaskResultTooBigFailure() extends TaskEndReason
\ No newline at end of file
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index 718f0ff5bcf8d3282562edc269af9f53161912a2..9ec4eb6e88c4485b1855ef0bb74b31156206133f 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -115,7 +115,8 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
         val serializedResult = ser.serialize(result)
         logInfo("Serialized size of result for " + taskId + " is " + serializedResult.limit)
         if (serializedResult.limit >= (akkaFrameSize - 1024)) {
-          throw new SparkException("Result for " + taskId + " exceeded Akka frame size")
+          context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(TaskResultTooBigFailure()))
+          return
         }
         context.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
         logInfo("Finished task ID " + taskId)
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index 27e713e2c43dd5d0629f37cf22cb46d47a012ed1..df7f0eafffef17bb9383a16fab2df7af210a69f7 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -492,6 +492,12 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
             sched.taskSetFinished(this)
             return
 
+          case taskResultTooBig: TaskResultTooBigFailure =>
+            logInfo(("Loss was due to task %s result exceeding Akka frame size; " +
+                    "aborting job").format(tid))
+            abort("Task %s result exceeded Akka frame size".format(tid))
+            return
+
           case ef: ExceptionFailure =>
             val key = ef.exception.toString
             val now = System.currentTimeMillis
diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala
index 4df3bb5b6793b10af3bdea91c9083b192e99e5c6..9f58999cbe6af697cd671db42fd0fa030b8d8d45 100644
--- a/core/src/test/scala/spark/DistributedSuite.scala
+++ b/core/src/test/scala/spark/DistributedSuite.scala
@@ -277,6 +277,19 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
       }
     }
   }
+
+  test("job should fail if TaskResult exceeds Akka frame size") {
+    // We must use local-cluster mode since results are returned differently
+    // when running under LocalScheduler:
+    sc = new SparkContext("local-cluster[1,1,512]", "test")
+    val akkaFrameSize =
+      sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size").toInt
+    val rdd = sc.parallelize(Seq(1)).map { x => new Array[Byte](akkaFrameSize) }
+    val exception = intercept[SparkException] {
+      rdd.reduce((x, y) => x)
+    }
+    exception.getMessage should endWith("result exceeded Akka frame size")
+  }
 }
 
 object DistributedSuite {