From b7be05a203b3e2a307147ea0c6cb0dec03da82a2 Mon Sep 17 00:00:00 2001
From: erenavsarogullari <erenavsarogullari@gmail.com>
Date: Thu, 23 Mar 2017 17:20:52 -0700
Subject: [PATCH] [SPARK-19567][CORE][SCHEDULER] Support some Schedulable
 variables immutability and access

## What changes were proposed in this pull request?
Some variables of the `Schedulable` entities (`Pool` and `TaskSetManager`) need refactoring for _immutability_ and _access modifier_ levels, as follows (a minimal sketch of the pattern is shown after this list):
- From `var` to `val` (where reassignment is not required): this supports immutability as much as possible.
  - e.g. `Pool`: `weight`, `minShare`, `priority`, `name` and `taskSetSchedulingAlgorithm`.
- Access modifiers: in particular, access to `var`s needs to be restricted from other parts of the codebase to prevent potential side effects.
  - e.g. `TaskSetManager`: `tasksSuccessful`, `totalResultSize`, `calculatedTasks`, etc.
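
To make the intent concrete, here is a minimal, hypothetical sketch of the pattern applied throughout this patch (the `SamplePool` class and its `taskFinished` helper are made up for illustration; only the field names echo real `Pool`/`TaskSetManager` members):

```scala
package org.apache.spark.scheduler

// Hypothetical, simplified illustration of the refactoring pattern in this patch;
// it is not the real Pool/TaskSetManager code.
private[spark] class SamplePool(initWeight: Int, initMinShare: Int) {

  // Never reassigned after construction: var -> val.
  val weight: Int = initWeight
  val minShare: Int = initMinShare

  // Genuinely mutable state: keep the var, but restrict its visibility so only
  // scheduler code (and scheduler tests) can update it.
  private[scheduler] var runningTasks: Int = 0

  // Internal bookkeeping that nothing outside this class reads: make it private.
  private var calculatedTasks: Int = 0

  private[scheduler] def taskFinished(): Unit = {
    runningTasks -= 1
    calculatedTasks += 1
  }
}
```

In short: fields that are never reassigned become `val`s, while fields that must stay mutable keep `var` but are narrowed to `private` or `private[scheduler]` so only the scheduler package (and its tests) can touch them.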

This PR is related to #15604 and has been created separately to keep the patch content isolated and to help reviewers.

## How was this patch tested?
Added new UTs; the change is also covered by existing UTs.

Author: erenavsarogullari <erenavsarogullari@gmail.com>

Closes #16905 from erenavsarogullari/SPARK-19567.
---
 .../org/apache/spark/scheduler/Pool.scala     | 12 +++----
 .../spark/scheduler/TaskSchedulerImpl.scala   | 19 ++++++----
 .../spark/scheduler/TaskSetManager.scala      | 36 ++++++++++---------
 .../spark/scheduler/DAGSchedulerSuite.scala   |  8 ++---
 .../ExternalClusterManagerSuite.scala         |  4 +--
 .../apache/spark/scheduler/PoolSuite.scala    |  6 ++++
 .../scheduler/TaskSchedulerImplSuite.scala    | 12 +++++--
 7 files changed, 58 insertions(+), 39 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
index 2a69a6c5e8..1181371ab4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
@@ -37,24 +37,24 @@ private[spark] class Pool(
 
   val schedulableQueue = new ConcurrentLinkedQueue[Schedulable]
   val schedulableNameToSchedulable = new ConcurrentHashMap[String, Schedulable]
-  var weight = initWeight
-  var minShare = initMinShare
+  val weight = initWeight
+  val minShare = initMinShare
   var runningTasks = 0
-  var priority = 0
+  val priority = 0
 
   // A pool's stage id is used to break the tie in scheduling.
   var stageId = -1
-  var name = poolName
+  val name = poolName
   var parent: Pool = null
 
-  var taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
+  private val taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
     schedulingMode match {
       case SchedulingMode.FAIR =>
         new FairSchedulingAlgorithm()
       case SchedulingMode.FIFO =>
         new FIFOSchedulingAlgorithm()
       case _ =>
-        val msg = "Unsupported scheduling mode: $schedulingMode. Use FAIR or FIFO instead."
+        val msg = s"Unsupported scheduling mode: $schedulingMode. Use FAIR or FIFO instead."
         throw new IllegalArgumentException(msg)
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index bfbcfa1aa3..8257c70d67 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -59,6 +59,8 @@ private[spark] class TaskSchedulerImpl private[scheduler](
   extends TaskScheduler with Logging
 {
 
+  import TaskSchedulerImpl._
+
   def this(sc: SparkContext) = {
     this(
       sc,
@@ -130,17 +132,18 @@ private[spark] class TaskSchedulerImpl private[scheduler](
 
   val mapOutputTracker = SparkEnv.get.mapOutputTracker
 
-  var schedulableBuilder: SchedulableBuilder = null
-  var rootPool: Pool = null
+  private var schedulableBuilder: SchedulableBuilder = null
   // default scheduler is FIFO
-  private val schedulingModeConf = conf.get("spark.scheduler.mode", "FIFO")
+  private val schedulingModeConf = conf.get(SCHEDULER_MODE_PROPERTY, SchedulingMode.FIFO.toString)
   val schedulingMode: SchedulingMode = try {
     SchedulingMode.withName(schedulingModeConf.toUpperCase)
   } catch {
     case e: java.util.NoSuchElementException =>
-      throw new SparkException(s"Unrecognized spark.scheduler.mode: $schedulingModeConf")
+      throw new SparkException(s"Unrecognized $SCHEDULER_MODE_PROPERTY: $schedulingModeConf")
   }
 
+  val rootPool: Pool = new Pool("", schedulingMode, 0, 0)
+
   // This is a var so that we can reset it for testing purposes.
   private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this)
 
@@ -150,8 +153,6 @@ private[spark] class TaskSchedulerImpl private[scheduler](
 
   def initialize(backend: SchedulerBackend) {
     this.backend = backend
-    // temporarily set rootPool name to empty
-    rootPool = new Pool("", schedulingMode, 0, 0)
     schedulableBuilder = {
       schedulingMode match {
         case SchedulingMode.FIFO =>
@@ -159,7 +160,8 @@ private[spark] class TaskSchedulerImpl private[scheduler](
         case SchedulingMode.FAIR =>
           new FairSchedulableBuilder(rootPool, conf)
         case _ =>
-          throw new IllegalArgumentException(s"Unsupported spark.scheduler.mode: $schedulingMode")
+          throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
+          s"$schedulingMode")
       }
     }
     schedulableBuilder.buildPools()
@@ -683,6 +685,9 @@ private[spark] class TaskSchedulerImpl private[scheduler](
 
 
 private[spark] object TaskSchedulerImpl {
+
+  val SCHEDULER_MODE_PROPERTY = "spark.scheduler.mode"
+
   /**
    * Used to balance containers across hosts.
    *
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 11633bef3c..fd93a1f5c5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -78,16 +78,16 @@ private[spark] class TaskSetManager(
   private val numFailures = new Array[Int](numTasks)
 
   val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)
-  var tasksSuccessful = 0
+  private[scheduler] var tasksSuccessful = 0
 
-  var weight = 1
-  var minShare = 0
+  val weight = 1
+  val minShare = 0
   var priority = taskSet.priority
   var stageId = taskSet.stageId
   val name = "TaskSet_" + taskSet.id
   var parent: Pool = null
-  var totalResultSize = 0L
-  var calculatedTasks = 0
+  private var totalResultSize = 0L
+  private var calculatedTasks = 0
 
   private[scheduler] val taskSetBlacklistHelperOpt: Option[TaskSetBlacklist] = {
     blacklistTracker.map { _ =>
@@ -95,7 +95,7 @@ private[spark] class TaskSetManager(
     }
   }
 
-  val runningTasksSet = new HashSet[Long]
+  private[scheduler] val runningTasksSet = new HashSet[Long]
 
   override def runningTasks: Int = runningTasksSet.size
 
@@ -105,7 +105,7 @@ private[spark] class TaskSetManager(
   // state until all tasks have finished running; we keep TaskSetManagers that are in the zombie
   // state in order to continue to track and account for the running tasks.
   // TODO: We should kill any running task attempts when the task set manager becomes a zombie.
-  var isZombie = false
+  private[scheduler] var isZombie = false
 
   // Set of pending tasks for each executor. These collections are actually
   // treated as stacks, in which new tasks are added to the end of the
@@ -129,17 +129,17 @@ private[spark] class TaskSetManager(
   private val pendingTasksForRack = new HashMap[String, ArrayBuffer[Int]]
 
   // Set containing pending tasks with no locality preferences.
-  var pendingTasksWithNoPrefs = new ArrayBuffer[Int]
+  private[scheduler] var pendingTasksWithNoPrefs = new ArrayBuffer[Int]
 
   // Set containing all pending tasks (also used as a stack, as above).
-  val allPendingTasks = new ArrayBuffer[Int]
+  private val allPendingTasks = new ArrayBuffer[Int]
 
   // Tasks that can be speculated. Since these will be a small fraction of total
   // tasks, we'll just hold them in a HashSet.
-  val speculatableTasks = new HashSet[Int]
+  private[scheduler] val speculatableTasks = new HashSet[Int]
 
   // Task index, start and finish time for each task attempt (indexed by task ID)
-  val taskInfos = new HashMap[Long, TaskInfo]
+  private val taskInfos = new HashMap[Long, TaskInfo]
 
   // How frequently to reprint duplicate exceptions in full, in milliseconds
   val EXCEPTION_PRINT_INTERVAL =
@@ -148,7 +148,7 @@ private[spark] class TaskSetManager(
   // Map of recent exceptions (identified by string representation and top stack frame) to
   // duplicate count (how many times the same exception has appeared) and time the full exception
   // was printed. This should ideally be an LRU map that can drop old exceptions automatically.
-  val recentExceptions = HashMap[String, (Int, Long)]()
+  private val recentExceptions = HashMap[String, (Int, Long)]()
 
   // Figure out the current map output tracker epoch and set it on all tasks
   val epoch = sched.mapOutputTracker.getEpoch
@@ -169,20 +169,22 @@ private[spark] class TaskSetManager(
    * This allows a performance optimization, of skipping levels that aren't relevant (eg., skip
    * PROCESS_LOCAL if no tasks could be run PROCESS_LOCAL for the current set of executors).
    */
-  var myLocalityLevels = computeValidLocalityLevels()
-  var localityWaits = myLocalityLevels.map(getLocalityWait) // Time to wait at each level
+  private[scheduler] var myLocalityLevels = computeValidLocalityLevels()
+
+  // Time to wait at each level
+  private[scheduler] var localityWaits = myLocalityLevels.map(getLocalityWait)
 
   // Delay scheduling variables: we keep track of our current locality level and the time we
   // last launched a task at that level, and move up a level when localityWaits[curLevel] expires.
   // We then move down if we manage to launch a "more local" task.
-  var currentLocalityIndex = 0    // Index of our current locality level in validLocalityLevels
-  var lastLaunchTime = clock.getTimeMillis()  // Time we last launched a task at this level
+  private var currentLocalityIndex = 0 // Index of our current locality level in validLocalityLevels
+  private var lastLaunchTime = clock.getTimeMillis()  // Time we last launched a task at this level
 
   override def schedulableQueue: ConcurrentLinkedQueue[Schedulable] = null
 
   override def schedulingMode: SchedulingMode = SchedulingMode.NONE
 
-  var emittedTaskSizeWarning = false
+  private[scheduler] var emittedTaskSizeWarning = false
 
   /** Add a task to all the pending-task lists that it should be on. */
   private def addPendingTask(index: Int) {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index dfad5db68a..a9389003d5 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -110,8 +110,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
   val cancelledStages = new HashSet[Int]()
 
   val taskScheduler = new TaskScheduler() {
-    override def rootPool: Pool = null
-    override def schedulingMode: SchedulingMode = SchedulingMode.NONE
+    override def schedulingMode: SchedulingMode = SchedulingMode.FIFO
+    override def rootPool: Pool = new Pool("", schedulingMode, 0, 0)
     override def start() = {}
     override def stop() = {}
     override def executorHeartbeatReceived(
@@ -542,8 +542,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     // make sure that the DAGScheduler doesn't crash when the TaskScheduler
     // doesn't implement killTask()
     val noKillTaskScheduler = new TaskScheduler() {
-      override def rootPool: Pool = null
-      override def schedulingMode: SchedulingMode = SchedulingMode.NONE
+      override def schedulingMode: SchedulingMode = SchedulingMode.FIFO
+      override def rootPool: Pool = new Pool("", schedulingMode, 0, 0)
       override def start(): Unit = {}
       override def stop(): Unit = {}
       override def submitTasks(taskSet: TaskSet): Unit = {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
index e87cebf0cf..37c124a726 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
@@ -73,8 +73,8 @@ private class DummySchedulerBackend extends SchedulerBackend {
 
 private class DummyTaskScheduler extends TaskScheduler {
   var initialized = false
-  override def rootPool: Pool = null
-  override def schedulingMode: SchedulingMode = SchedulingMode.NONE
+  override def schedulingMode: SchedulingMode = SchedulingMode.FIFO
+  override def rootPool: Pool = new Pool("", schedulingMode, 0, 0)
   override def start(): Unit = {}
   override def stop(): Unit = {}
   override def submitTasks(taskSet: TaskSet): Unit = {}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
index cddff3dd35..4901062a78 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
@@ -286,6 +286,12 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
     assert(testPool.getSchedulableByName(taskSetManager.name) === taskSetManager)
   }
 
+  test("Pool should throw IllegalArgumentException when schedulingMode is not supported") {
+    intercept[IllegalArgumentException] {
+      new Pool("TestPool", SchedulingMode.NONE, 0, 1)
+    }
+  }
+
   private def verifyPool(rootPool: Pool, poolName: String, expectedInitMinShare: Int,
                          expectedInitWeight: Int, expectedSchedulingMode: SchedulingMode): Unit = {
     val selectedPool = rootPool.getSchedulableByName(poolName)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 9ae0bcd9b8..8b9d45f734 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -75,9 +75,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
 
   def setupScheduler(confs: (String, String)*): TaskSchedulerImpl = {
     val conf = new SparkConf().setMaster("local").setAppName("TaskSchedulerImplSuite")
-    confs.foreach { case (k, v) =>
-      conf.set(k, v)
-    }
+    confs.foreach { case (k, v) => conf.set(k, v) }
     sc = new SparkContext(conf)
     taskScheduler = new TaskSchedulerImpl(sc)
     setupHelper()
@@ -904,4 +902,12 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(taskDescs.size === 1)
     assert(taskDescs.head.executorId === "exec2")
   }
+
+  test("TaskScheduler should throw IllegalArgumentException when schedulingMode is not supported") {
+    intercept[IllegalArgumentException] {
+      val taskScheduler = setupScheduler(
+        TaskSchedulerImpl.SCHEDULER_MODE_PROPERTY -> SchedulingMode.NONE.toString)
+      taskScheduler.initialize(new FakeSchedulerBackend)
+    }
+  }
 }
-- 
GitLab