Skip to content
Snippets Groups Projects
Commit 44071910 authored by Josh Rosen's avatar Josh Rosen
Browse files

Throw exception if task result exceeds Akka frame size.

This partially addresses SPARK-747.
parent 63e1999f
No related branches found
No related tags found
No related merge requests found
......@@ -72,6 +72,7 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
// Initialize Spark environment (using system properties read above)
val env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false)
SparkEnv.set(env)
private val akkaFrameSize = env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size")
// Start worker thread pool
val threadPool = new ThreadPoolExecutor(
......@@ -113,6 +114,9 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
val result = new TaskResult(value, accumUpdates, task.metrics.getOrElse(null))
val serializedResult = ser.serialize(result)
logInfo("Serialized size of result for " + taskId + " is " + serializedResult.limit)
if (serializedResult.limit >= (akkaFrameSize - 1024)) {
throw new SparkException("Result for " + taskId + " exceeded Akka frame size")
}
context.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
logInfo("Finished task ID " + taskId)
} catch {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment