Skip to content
Snippets Groups Projects
Commit 330ada17 authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Merge pull request #207 from henrydavidge/master

Log a warning if a task's serialized size is very big

As per Reynold's instructions, we now log a warning-level entry if a task's serialized size is too big. "Too big" is currently defined as larger than 100 KB. This warning message is generated at most once for each stage.
parents 615213fb 57579934
No related branches found
No related tags found
No related merge requests found
......@@ -110,6 +110,9 @@ class DAGScheduler(
// resubmit failed stages
val POLL_TIMEOUT = 10L
// Warns the user if a stage contains a task with size greater than this value (in KB)
val TASK_SIZE_TO_WARN = 100
private val eventProcessActor: ActorRef = env.actorSystem.actorOf(Props(new Actor {
override def preStart() {
context.system.scheduler.schedule(RESUBMIT_TIMEOUT milliseconds, RESUBMIT_TIMEOUT milliseconds) {
......@@ -430,6 +433,18 @@ class DAGScheduler(
handleExecutorLost(execId)
case BeginEvent(task, taskInfo) =>
// Warn when a task's serialized size exceeds TASK_SIZE_TO_WARN (in KB). The body runs
// only when every generator below yields a value.
for (
// NOTE(review): `job` is never used in the body, so this generator only acts as a
// guard. It also looks up idToActiveJob with a *stage* id; the map's name suggests it
// may be keyed by job id -- confirm this is the intended key.
job <- idToActiveJob.get(task.stageId);
stage <- stageIdToStage.get(task.stageId);
stageInfo <- stageToInfos.get(stage)
) {
// The threshold is multiplied by 1024 before being compared with serializedSize. The
// emittedTaskSizeWarning flag limits the warning to once per StageInfo.
if (taskInfo.serializedSize > TASK_SIZE_TO_WARN * 1024 && !stageInfo.emittedTaskSizeWarning) {
// Set the flag before logging so later BeginEvents for this stage skip the warning.
stageInfo.emittedTaskSizeWarning = true
logWarning(("Stage %d (%s) contains a task of very large " +
"size (%d KB). The maximum recommended task size is %d KB.").format(
task.stageId, stageInfo.name, taskInfo.serializedSize / 1024, TASK_SIZE_TO_WARN))
}
}
listenerBus.post(SparkListenerTaskStart(task, taskInfo))
case GettingResultEvent(task, taskInfo) =>
......
......@@ -33,4 +33,5 @@ class StageInfo(
val name = stage.name
val numPartitions = stage.numPartitions
val numTasks = stage.numTasks
var emittedTaskSizeWarning = false
}
......@@ -46,6 +46,8 @@ class TaskInfo(
var failed = false
var serializedSize: Int = 0
/**
 * Stores `time` in `gettingResultTime`.
 *
 * @param time the timestamp to record; defaults to `System.currentTimeMillis`
 *             evaluated at call time.
 */
def markGettingResult(time: Long = System.currentTimeMillis): Unit = {
  // Explicit `: Unit =` replaces the deprecated procedure syntax; behavior is unchanged.
  gettingResultTime = time
}
......
......@@ -377,6 +377,7 @@ private[spark] class ClusterTaskSetManager(
logInfo("Serialized task %s:%d as %d bytes in %d ms".format(
taskSet.id, index, serializedTask.limit, timeTaken))
val taskName = "task %s:%d".format(taskSet.id, index)
info.serializedSize = serializedTask.limit
if (taskAttempts(index).size == 1)
taskStarted(task,info)
return Some(new TaskDescription(taskId, execId, taskName, index, serializedTask))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment