Skip to content
Snippets Groups Projects
Commit 5912cc49 authored by Reynold Xin's avatar Reynold Xin
Browse files

Merge pull request #610 from JoshRosen/spark-747

Throw exception if TaskResult exceeds Akka frame size
parents 8d78c5f8 b8e46b60
No related branches found
No related tags found
No related merge requests found
......@@ -28,3 +28,5 @@ private[spark] case class ExceptionFailure(
extends TaskEndReason
/** Generic catch-all task failure carrying only a human-readable message. */
private[spark] case class OtherFailure(message: String) extends TaskEndReason
/**
 * Signals that a task completed but its serialized TaskResult was too large
 * to send back to the driver in a single Akka message (see the frame-size
 * check in Executor). The scheduler reacts by aborting the whole job rather
 * than retrying, since the result would be too big on every attempt.
 */
private[spark] case class TaskResultTooBigFailure() extends TaskEndReason
......@@ -72,6 +72,7 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
// Initialize Spark environment (using system properties read above)
val env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false)
SparkEnv.set(env)
// Maximum Akka remoting message size in bytes, read from this actor system's
// own config so the result-size check matches whatever frame size is actually
// in effect at runtime rather than a hard-coded default.
private val akkaFrameSize = env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size")
// Start worker thread pool
val threadPool = new ThreadPoolExecutor(
......@@ -113,6 +114,10 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
// Package the task's value, accumulator updates, and metrics for the driver.
val result = new TaskResult(value, accumUpdates, task.metrics.getOrElse(null))
val serializedResult = ser.serialize(result)
logInfo("Serialized size of result for " + taskId + " is " + serializedResult.limit)
// The result travels to the driver in a single Akka message, so a payload at
// or above the frame size would be silently dropped by the transport. Fail
// the task explicitly instead so the scheduler can abort the job with a clear
// reason. The 1024 bytes are presumably headroom for the status-update
// message's own envelope/overhead — NOTE(review): confirm this margin.
if (serializedResult.limit >= (akkaFrameSize - 1024)) {
context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(TaskResultTooBigFailure()))
return
}
// Result fits: report successful completion with the serialized payload.
context.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
logInfo("Finished task ID " + taskId)
} catch {
......
......@@ -542,6 +542,12 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
sched.taskSetFinished(this)
return
case taskResultTooBig: TaskResultTooBigFailure =>
  // Bug fix: the original concatenated two string literals but applied
  // .format(tid) only to the second one ("aborting job", which contains no
  // %s), so the task id was never substituted into the log line and the two
  // fragments joined without a space ("size;aborting"). Format the whole
  // message as a single string so %s is actually replaced by tid.
  logInfo("Loss was due to task %s result exceeding Akka frame size; aborting job".format(tid))
  // The result is oversized on every attempt, so retrying cannot help —
  // abort the entire task set immediately.
  abort("Task %s result exceeded Akka frame size".format(tid))
  return
case ef: ExceptionFailure =>
val key = ef.description
val now = System.currentTimeMillis
......
......@@ -301,6 +301,19 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
}
}
}
test("job should fail if TaskResult exceeds Akka frame size") {
  // Task results are shipped back through Akka only under the cluster
  // scheduler; LocalScheduler returns them in-process, so this must run in
  // local-cluster mode for the frame-size limit to apply.
  sc = new SparkContext("local-cluster[1,1,512]", "test")
  val akkaFrameSize =
    sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size").toInt
  // One partition mapped to a byte array exactly one frame wide, so the
  // serialized TaskResult is guaranteed not to fit in a single Akka message.
  val oversized = sc.parallelize(Seq(1)).map(_ => new Array[Byte](akkaFrameSize))
  val thrown = intercept[SparkException] {
    oversized.reduce((first, _) => first)
  }
  thrown.getMessage should endWith("result exceeded Akka frame size")
}
}
object DistributedSuite {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment