Commit b8e46b60 authored by Josh Rosen

Abort job if result exceeds Akka frame size; add test.
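In short: instead of throwing a SparkException inside the executor when a task result is too large to ship back over Akka, the executor now reports the task as FAILED with a dedicated TaskResultTooBigFailure reason, the TaskSetManager aborts the whole job with a clear message when it sees that reason, and a new DistributedSuite test exercises the path end to end.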

parent 44071910
@@ -20,3 +20,5 @@ case class FetchFailed(bmAddress: BlockManagerId, shuffleId: Int, mapId: Int, re
 private[spark] case class ExceptionFailure(exception: Throwable) extends TaskEndReason

 private[spark] case class OtherFailure(message: String) extends TaskEndReason
+
+private[spark] case class TaskResultTooBigFailure() extends TaskEndReason
\ No newline at end of file
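The new case class slots into the TaskEndReason hierarchy so the scheduler can pattern-match on a structured failure reason instead of parsing exception messages. A minimal, self-contained sketch of that pattern (hypothetical names, not Spark's actual types):

object EndReasonDemo {
  // Simplified mirror of a TaskEndReason-style hierarchy.
  sealed trait EndReason
  case object Finished extends EndReason
  case class ResultTooBig(taskId: Long) extends EndReason
  case class Errored(exception: Throwable) extends EndReason

  // A caller matches on the reason and picks a policy per case.
  def handle(reason: EndReason): Unit = reason match {
    case ResultTooBig(tid) => println(s"Task $tid result too big; aborting job")
    case Errored(e)        => println(s"Task failed with $e")
    case Finished          => println("Task finished")
  }

  def main(args: Array[String]): Unit =
    handle(ResultTooBig(7L)) // Task 7 result too big; aborting job
}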
@@ -115,7 +115,8 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
         val serializedResult = ser.serialize(result)
         logInfo("Serialized size of result for " + taskId + " is " + serializedResult.limit)
         if (serializedResult.limit >= (akkaFrameSize - 1024)) {
-          throw new SparkException("Result for " + taskId + " exceeded Akka frame size")
+          context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(TaskResultTooBigFailure()))
+          return
         }
         context.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
         logInfo("Finished task ID " + taskId)
@@ -492,6 +492,12 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
             sched.taskSetFinished(this)
             return

+          case taskResultTooBig: TaskResultTooBigFailure =>
+            logInfo(("Loss was due to task %s result exceeding Akka frame size; " +
+              "aborting job").format(tid))
+            abort("Task %s result exceeded Akka frame size".format(tid))
+            return
+
           case ef: ExceptionFailure =>
             val key = ef.exception.toString
             val now = System.currentTimeMillis
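Note the parentheses around the concatenated logInfo message: in Scala, the .format call binds tighter than +, so without them only the second string literal would be formatted and the %s placeholder would be printed verbatim. A tiny standalone illustration:

object FormatPrecedence {
  def main(args: Array[String]): Unit = {
    val tid = 42
    // .format applies to "aborting job" only; %s survives unformatted.
    val wrong = "Loss was due to task %s exceeding frame size; " +
      "aborting job".format(tid)
    // Parenthesizing first formats the whole concatenated message.
    val right = ("Loss was due to task %s exceeding frame size; " +
      "aborting job").format(tid)
    println(wrong) // Loss was due to task %s exceeding frame size; aborting job
    println(right) // Loss was due to task 42 exceeding frame size; aborting job
  }
}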
@@ -277,6 +277,19 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
       }
     }
   }
+
+  test("job should fail if TaskResult exceeds Akka frame size") {
+    // We must use local-cluster mode since results are returned differently
+    // when running under LocalScheduler:
+    sc = new SparkContext("local-cluster[1,1,512]", "test")
+    val akkaFrameSize =
+      sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size").toInt
+    val rdd = sc.parallelize(Seq(1)).map { x => new Array[Byte](akkaFrameSize) }
+    val exception = intercept[SparkException] {
+      rdd.reduce((x, y) => x)
+    }
+    exception.getMessage should endWith("result exceeded Akka frame size")
+  }
 }

 object DistributedSuite {
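The test derives the threshold from the running ActorSystem's configuration rather than hard-coding it. A standalone sketch of that lookup, using the same Typesafe Config key with an assumed value:

import com.typesafe.config.ConfigFactory

object FrameSizeFromConfig {
  def main(args: Array[String]): Unit = {
    // The 10 MiB value is assumed for illustration; getBytes parses
    // HOCON size strings into a byte count (here 10485760).
    val config = ConfigFactory.parseString(
      "akka.remote.netty.message-frame-size = 10 MiB")
    val frameSize = config.getBytes("akka.remote.netty.message-frame-size").toInt
    println(frameSize) // 10485760
  }
}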