Skip to content
Snippets Groups Projects
Commit 57579934 authored by hhd's avatar hhd
Browse files

Emit warning when task size > 100KB

parent 0e2109dd
No related branches found
No related tags found
No related merge requests found
......@@ -110,6 +110,9 @@ class DAGScheduler(
// resubmit failed stages
val POLL_TIMEOUT = 10L
// Warns the user if a stage contains a task with size greater than this value (in KB)
val TASK_SIZE_TO_WARN = 100
private val eventProcessActor: ActorRef = env.actorSystem.actorOf(Props(new Actor {
override def preStart() {
context.system.scheduler.schedule(RESUBMIT_TIMEOUT milliseconds, RESUBMIT_TIMEOUT milliseconds) {
......@@ -430,6 +433,18 @@ class DAGScheduler(
handleExecutorLost(execId)
// A task has started: warn (at most once per stage) if the task's serialized size exceeds
// TASK_SIZE_TO_WARN kilobytes, then post a SparkListenerTaskStart to the listener bus.
case BeginEvent(task, taskInfo) =>
for (
// NOTE(review): `job` is bound but never used in the body; this generator only gates the
// warning on idToActiveJob containing an entry under task.stageId. The map's name suggests
// it may be keyed by job id rather than stage id -- confirm against its declaration.
job <- idToActiveJob.get(task.stageId);
stage <- stageIdToStage.get(task.stageId);
stageInfo <- stageToInfos.get(stage)
) {
// TASK_SIZE_TO_WARN is expressed in KB, so it is scaled by 1024 before being compared with
// serializedSize.
if (taskInfo.serializedSize > TASK_SIZE_TO_WARN * 1024 && !stageInfo.emittedTaskSizeWarning) {
// Set the flag first so later oversized tasks of the same stage do not log again.
stageInfo.emittedTaskSizeWarning = true
// Integer division: the size reported in the message is rounded down to whole KB.
logWarning(("Stage %d (%s) contains a task of very large " +
"size (%d KB). The maximum recommended task size is %d KB.").format(
task.stageId, stageInfo.name, taskInfo.serializedSize / 1024, TASK_SIZE_TO_WARN))
}
}
// The listener event is posted whether or not the size warning was emitted.
listenerBus.post(SparkListenerTaskStart(task, taskInfo))
case GettingResultEvent(task, taskInfo) =>
......
......@@ -33,4 +33,5 @@ class StageInfo(
val name = stage.name
val numPartitions = stage.numPartitions
val numTasks = stage.numTasks
var emittedTaskSizeWarning = false
}
......@@ -46,6 +46,8 @@ class TaskInfo(
var failed = false
var serializedSize: Int = 0
/**
 * Records `time` in `gettingResultTime`.
 *
 * @param time timestamp to store; defaults to the value of `System.currentTimeMillis`
 *             at the moment of the call when the caller supplies none.
 */
def markGettingResult(time: Long = System.currentTimeMillis): Unit = {
  // Explicit `: Unit =` replaces the deprecated procedure syntax; behavior is unchanged.
  gettingResultTime = time
}
......
......@@ -377,6 +377,7 @@ private[spark] class ClusterTaskSetManager(
logInfo("Serialized task %s:%d as %d bytes in %d ms".format(
taskSet.id, index, serializedTask.limit, timeTaken))
val taskName = "task %s:%d".format(taskSet.id, index)
info.serializedSize = serializedTask.limit
if (taskAttempts(index).size == 1)
taskStarted(task,info)
return Some(new TaskDescription(taskId, execId, taskName, index, serializedTask))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment