diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 2f7e6d98f8490840965359f2b6056ce3a2f7c474..35b31f45a7cb32d40dacf4e0bfa4db54c9bf8f35 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -32,10 +32,22 @@ import spark.storage.{BlockManager, BlockManagerMaster}
 import spark.util.{MetadataCleaner, TimeStampedHashMap}
 
 /**
- * A Scheduler subclass that implements stage-oriented scheduling. It computes a DAG of stages for
- * each job, keeps track of which RDDs and stage outputs are materialized, and computes a minimal
- * schedule to run the job. Subclasses only need to implement the code to send a task to the cluster
- * and to report fetch failures (the submitTasks method, and code to add CompletionEvents).
+ * The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of
+ * stages for each job, keeps track of which RDDs and stage outputs are materialized, and finds a
+ * minimal schedule to run the job. It then submits stages as TaskSets to an underlying
+ * TaskScheduler implementation that runs them on the cluster.
+ *
+ * In addition to coming up with a DAG of stages, this class also determines the preferred
+ * locations to run each task on, based on the current cache status, and passes these to the
+ * low-level TaskScheduler. Furthermore, it handles failures due to shuffle output files being
+ * lost, in which case old stages may need to be resubmitted. Failures *within* a stage that are
+ * not caused by shuffle file loss are handled by the TaskScheduler, which will retry each task
+ * a small number of times before cancelling the whole stage.
+ *
+ * THREADING: This class runs all its logic in a single thread executing the run() method, to which
+ * events are submitted using a synchronized queue (eventQueue). The public API methods, such as
+ * runJob, taskEnded and executorLost, post events asynchronously to this queue. All other methods
+ * should be private.
  */
 private[spark]
 class DAGScheduler(
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index e88edc5b2ac3f7689f19f3d95f468e149ff99d41..679d899b472a280805f29abcb901548ddef253c8 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -33,7 +33,17 @@ import java.util.{TimerTask, Timer}
 
 /**
  * The main TaskScheduler implementation, for running tasks on a cluster. Clients should first call
- * start(), then submit task sets through the runTasks method.
+ * initialize() and start(), then submit task sets through the submitTasks method.
+ *
+ * This class can work with multiple types of clusters by acting through a SchedulerBackend.
+ * It handles common logic, like determining a scheduling order across jobs, waking up to launch
+ * speculative tasks, etc.
+ *
+ * THREADING: SchedulerBackends and task-submitting clients can call this class from multiple
+ * threads, so it needs locks in public API methods to maintain its state. In addition, some
+ * SchedulerBackends synchronize on themselves when they want to send events here, and then
+ * acquire a lock on us, so we need to make sure that we don't try to lock the backend while
+ * we are holding a lock on ourselves.
  */
 private[spark] class ClusterScheduler(val sc: SparkContext)
   extends TaskScheduler
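To make the two THREADING notes above concrete, here is a minimal, self-contained sketch of the event-loop pattern the DAGScheduler comment describes: public API methods post events to a thread-safe queue, and a single thread drains it, so the scheduler's state needs no locks. All names here (EventLoopSketch, SchedulerEvent, and so on) are invented for illustration; this is not Spark's actual code.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Stand-ins for the scheduler's internal events (names are made up).
sealed trait SchedulerEvent
case class JobSubmitted(jobId: Int) extends SchedulerEvent
case class TaskEnded(taskId: Long) extends SchedulerEvent
case object Stop extends SchedulerEvent

class EventLoopSketch {
  // Thread-safe queue: the public API posts here; run() is the only consumer.
  private val eventQueue = new LinkedBlockingQueue[SchedulerEvent]

  // State touched only from the event-loop thread, so it needs no lock.
  private var runningJobs = Set.empty[Int]

  // Public API methods: callable from any thread, return immediately.
  def runJob(jobId: Int): Unit = eventQueue.put(JobSubmitted(jobId))
  def taskEnded(taskId: Long): Unit = eventQueue.put(TaskEnded(taskId))
  def stop(): Unit = eventQueue.put(Stop)

  // The single event-processing loop; all scheduling logic lives here.
  def run(): Unit = {
    while (true) {
      eventQueue.take() match {
        case JobSubmitted(id) => runningJobs += id; println(s"submitted job $id")
        case TaskEnded(id)    => println(s"task $id ended")
        case Stop             => return
      }
    }
  }
}

object EventLoopSketch extends App {
  val sched = new EventLoopSketch
  val loop = new Thread(new Runnable { def run() = sched.run() })
  loop.start()
  sched.runJob(1)
  sched.taskEnded(42L)
  sched.stop()
  loop.join()
}
```

The contrast between the two notes is the point: DAGScheduler serializes everything through one thread and avoids locks, while ClusterScheduler is entered from many threads and must synchronize, taking care never to lock its SchedulerBackend while holding its own lock.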
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
index 91de25254c073077493c792ee4e86831209016f9..1d57732f5dd4880e6788977f3547aebc17367dfa 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
@@ -37,7 +37,14 @@
 import spark.TaskResultTooBigFailure
 
 /**
- * Schedules the tasks within a single TaskSet in the ClusterScheduler.
+ * Schedules the tasks within a single TaskSet in the ClusterScheduler. This class keeps track of
+ * the status of each task, retries tasks if they fail (up to a limited number of times), and
+ * handles locality-aware scheduling for this TaskSet via delay scheduling. The main interfaces
+ * to it are resourceOffer, which asks the TaskSet whether it wants to run a task on one node,
+ * and statusUpdate, which tells it that one of its tasks changed state (e.g. finished).
+ *
+ * THREADING: This class is designed to only be called from code with a lock on the
+ * ClusterScheduler (e.g. its event handlers). It should not be called from other threads.
  */
 private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet: TaskSet)
   extends TaskSetManager with Logging {
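The interface named in the comment above is small enough to sketch. The toy below uses invented names (ToyTaskSetManager, prefLocation) and a deliberately simplified locality rule (real delay scheduling also waits a while before settling for a non-local slot); it shows the shape of resourceOffer and statusUpdate only, and assumes, per the THREADING note, that callers already hold the scheduler's lock.

```scala
import scala.collection.mutable

// Illustrative only; not Spark's actual ClusterTaskSetManager.
class ToyTaskSetManager(prefLocation: Map[Int, String], maxFailures: Int = 4) {
  // THREADING (mirroring the note above): callers are assumed to hold the
  // scheduler's lock, so this class does no synchronization of its own.
  private val pending = mutable.Set(prefLocation.keys.toSeq: _*)
  private val failures = mutable.Map[Int, Int]().withDefaultValue(0)

  /** Offered one free slot on `host`; return a task to launch there, if any. */
  def resourceOffer(host: String): Option[Int] = {
    val chosen = pending.find(t => prefLocation(t) == host) // prefer local data
      .orElse(pending.headOption)                           // else take anything
    chosen.foreach(pending -= _)
    chosen
  }

  /** Reports that a launched task finished or failed. */
  def statusUpdate(task: Int, succeeded: Boolean): Unit = {
    if (!succeeded) {
      failures(task) += 1
      if (failures(task) < maxFailures) pending += task // retry a few times
      else sys.error(s"task $task failed $maxFailures times; aborting task set")
    }
  }
}
```

A ClusterScheduler-like caller would, under its own lock, call resourceOffer for each free slot it is offered and launch whatever task comes back, then feed completion events back in through statusUpdate.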