From d33b8a2a0ffd6d085cbd8de22863a1f35c106270 Mon Sep 17 00:00:00 2001
From: Reynold Xin <reynoldx@gmail.com>
Date: Tue, 23 Jul 2013 20:28:39 -0700
Subject: [PATCH] Added comments on task closure serialization.

---
 .../scala/spark/scheduler/cluster/ClusterTaskSetManager.scala   | 2 ++
 .../main/scala/spark/scheduler/local/LocalTaskSetManager.scala  | 2 ++
 2 files changed, 4 insertions(+)

diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
index d934293b70..f64818876b 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
@@ -515,6 +515,8 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
           }
           // Serialize and return the task
           val startTime = System.currentTimeMillis
+          // We rely on the DAGScheduler to catch non-serializable closures and RDDs, so here we
+          // assume the task can be serialized without exceptions.
           val serializedTask = Task.serializeWithDependencies(
             task, sched.sc.addedFiles, sched.sc.addedJars, ser)
           val timeTaken = System.currentTimeMillis - startTime
diff --git a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala b/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
index bbce9eda64..a9b49cad0e 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
@@ -114,6 +114,8 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
           val info = new TaskInfo(taskId, index, System.currentTimeMillis(), "local", "local:1",
             TaskLocality.NODE_LOCAL)
           taskInfos(taskId) = info
+          // We rely on the DAGScheduler to catch non-serializable closures and RDDs, so here we
+          // assume the task can be serialized without exceptions.
           val bytes = Task.serializeWithDependencies(
             task, sched.sc.addedFiles, sched.sc.addedJars, ser)
           logInfo("Size of task " + taskId + " is " + bytes.limit + " bytes")
-- 
GitLab