diff --git a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
index 2a69a6c5e879028bcf0e50c211ed08f2ea774cac..1181371ab425a940e2f32ef2ac2ea5504b92a20d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
@@ -37,24 +37,24 @@ private[spark] class Pool(
 
   val schedulableQueue = new ConcurrentLinkedQueue[Schedulable]
   val schedulableNameToSchedulable = new ConcurrentHashMap[String, Schedulable]
-  var weight = initWeight
-  var minShare = initMinShare
+  val weight = initWeight
+  val minShare = initMinShare
   var runningTasks = 0
-  var priority = 0
+  val priority = 0
 
   // A pool's stage id is used to break the tie in scheduling.
   var stageId = -1
-  var name = poolName
+  val name = poolName
   var parent: Pool = null
 
-  var taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
+  private val taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
     schedulingMode match {
       case SchedulingMode.FAIR =>
         new FairSchedulingAlgorithm()
       case SchedulingMode.FIFO =>
         new FIFOSchedulingAlgorithm()
       case _ =>
-        val msg = "Unsupported scheduling mode: $schedulingMode. Use FAIR or FIFO instead."
+        val msg = s"Unsupported scheduling mode: $schedulingMode. Use FAIR or FIFO instead."
         throw new IllegalArgumentException(msg)
     }
   }
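
The Pool hunk above does two things: fields that are never reassigned (`weight`, `minShare`, `priority`, `name`, `taskSetSchedulingAlgorithm`) become `val`s, and the error message gains the `s` interpolator it was missing. Without that prefix, the old code threw a message containing the literal text `$schedulingMode` rather than the offending mode. A minimal REPL-style sketch of the difference:

```scala
// Without the `s` prefix, "$schedulingMode" is just literal characters;
// with it, Scala splices in the variable's value.
val schedulingMode = "NONE"
"Unsupported scheduling mode: $schedulingMode"   // the literal text, bug intact
s"Unsupported scheduling mode: $schedulingMode"  // "Unsupported scheduling mode: NONE"
```
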
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index bfbcfa1aa386ff8f0480fb79a53343e9a43f0006..8257c70d672a03d72573df58520fd673051673ad 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -59,6 +59,8 @@ private[spark] class TaskSchedulerImpl private[scheduler](
   extends TaskScheduler with Logging
 {
 
+  import TaskSchedulerImpl._
+
   def this(sc: SparkContext) = {
     this(
       sc,
@@ -130,17 +132,18 @@ private[spark] class TaskSchedulerImpl private[scheduler](
 
   val mapOutputTracker = SparkEnv.get.mapOutputTracker
 
-  var schedulableBuilder: SchedulableBuilder = null
-  var rootPool: Pool = null
+  private var schedulableBuilder: SchedulableBuilder = null
   // default scheduler is FIFO
-  private val schedulingModeConf = conf.get("spark.scheduler.mode", "FIFO")
+  private val schedulingModeConf = conf.get(SCHEDULER_MODE_PROPERTY, SchedulingMode.FIFO.toString)
   val schedulingMode: SchedulingMode = try {
     SchedulingMode.withName(schedulingModeConf.toUpperCase)
   } catch {
     case e: java.util.NoSuchElementException =>
-      throw new SparkException(s"Unrecognized spark.scheduler.mode: $schedulingModeConf")
+      throw new SparkException(s"Unrecognized $SCHEDULER_MODE_PROPERTY: $schedulingModeConf")
   }
 
+  val rootPool: Pool = new Pool("", schedulingMode, 0, 0)
+
   // This is a var so that we can reset it for testing purposes.
   private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this)
 
@@ -150,8 +153,6 @@ private[spark] class TaskSchedulerImpl private[scheduler](
 
   def initialize(backend: SchedulerBackend) {
     this.backend = backend
-    // temporarily set rootPool name to empty
-    rootPool = new Pool("", schedulingMode, 0, 0)
     schedulableBuilder = {
       schedulingMode match {
         case SchedulingMode.FIFO =>
@@ -159,7 +160,8 @@ private[spark] class TaskSchedulerImpl private[scheduler](
         case SchedulingMode.FAIR =>
           new FairSchedulableBuilder(rootPool, conf)
         case _ =>
-          throw new IllegalArgumentException(s"Unsupported spark.scheduler.mode: $schedulingMode")
+          throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
+          s"$schedulingMode")
       }
     }
     schedulableBuilder.buildPools()
@@ -683,6 +685,9 @@ private[spark] class TaskSchedulerImpl private[scheduler](
 
 
 private[spark] object TaskSchedulerImpl {
+
+  val SCHEDULER_MODE_PROPERTY = "spark.scheduler.mode"
+
   /**
    * Used to balance containers across hosts.
    *
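
In the TaskSchedulerImpl hunks above, `rootPool` moves from a null-initialized `var` that `initialize()` filled in later to a `val` built when the `TaskSchedulerImpl` is constructed, so no caller can ever observe a null pool, and the hard-coded `"spark.scheduler.mode"` strings are replaced by the `SCHEDULER_MODE_PROPERTY` constant on the companion object. A minimal sketch of the two-phase-init pattern being removed (hypothetical `Service` names, not Spark API):

```scala
// Before: two-phase initialization; `pool` is null until initialize() runs,
// and every reader must know (or hope) that initialize() happened first.
class ServiceBefore {
  var pool: AnyRef = null
  def initialize(): Unit = { pool = new Object }
}

// After: the field is assigned once, in the constructor, and is valid for
// the object's entire lifetime.
class ServiceAfter {
  val pool: AnyRef = new Object
}
```
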
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 11633bef3cfc7657fe4ee5d9d9fabf4fcb2607ba..fd93a1f5c5d2a189e8c4d8948db6af78d5bb40c9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -78,16 +78,16 @@ private[spark] class TaskSetManager(
   private val numFailures = new Array[Int](numTasks)
 
   val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)
-  var tasksSuccessful = 0
+  private[scheduler] var tasksSuccessful = 0
 
-  var weight = 1
-  var minShare = 0
+  val weight = 1
+  val minShare = 0
   var priority = taskSet.priority
   var stageId = taskSet.stageId
   val name = "TaskSet_" + taskSet.id
   var parent: Pool = null
-  var totalResultSize = 0L
-  var calculatedTasks = 0
+  private var totalResultSize = 0L
+  private var calculatedTasks = 0
 
   private[scheduler] val taskSetBlacklistHelperOpt: Option[TaskSetBlacklist] = {
     blacklistTracker.map { _ =>
@@ -95,7 +95,7 @@ private[spark] class TaskSetManager(
     }
   }
 
-  val runningTasksSet = new HashSet[Long]
+  private[scheduler] val runningTasksSet = new HashSet[Long]
 
   override def runningTasks: Int = runningTasksSet.size
 
@@ -105,7 +105,7 @@ private[spark] class TaskSetManager(
   // state until all tasks have finished running; we keep TaskSetManagers that are in the zombie
   // state in order to continue to track and account for the running tasks.
   // TODO: We should kill any running task attempts when the task set manager becomes a zombie.
-  var isZombie = false
+  private[scheduler] var isZombie = false
 
   // Set of pending tasks for each executor. These collections are actually
   // treated as stacks, in which new tasks are added to the end of the
@@ -129,17 +129,17 @@ private[spark] class TaskSetManager(
   private val pendingTasksForRack = new HashMap[String, ArrayBuffer[Int]]
 
   // Set containing pending tasks with no locality preferences.
-  var pendingTasksWithNoPrefs = new ArrayBuffer[Int]
+  private[scheduler] var pendingTasksWithNoPrefs = new ArrayBuffer[Int]
 
   // Set containing all pending tasks (also used as a stack, as above).
-  val allPendingTasks = new ArrayBuffer[Int]
+  private val allPendingTasks = new ArrayBuffer[Int]
 
   // Tasks that can be speculated. Since these will be a small fraction of total
   // tasks, we'll just hold them in a HashSet.
-  val speculatableTasks = new HashSet[Int]
+  private[scheduler] val speculatableTasks = new HashSet[Int]
 
   // Task index, start and finish time for each task attempt (indexed by task ID)
-  val taskInfos = new HashMap[Long, TaskInfo]
+  private val taskInfos = new HashMap[Long, TaskInfo]
 
   // How frequently to reprint duplicate exceptions in full, in milliseconds
   val EXCEPTION_PRINT_INTERVAL =
@@ -148,7 +148,7 @@ private[spark] class TaskSetManager(
   // Map of recent exceptions (identified by string representation and top stack frame) to
   // duplicate count (how many times the same exception has appeared) and time the full exception
   // was printed. This should ideally be an LRU map that can drop old exceptions automatically.
-  val recentExceptions = HashMap[String, (Int, Long)]()
+  private val recentExceptions = HashMap[String, (Int, Long)]()
 
   // Figure out the current map output tracker epoch and set it on all tasks
   val epoch = sched.mapOutputTracker.getEpoch
@@ -169,20 +169,22 @@ private[spark] class TaskSetManager(
    * This allows a performance optimization, of skipping levels that aren't relevant (eg., skip
    * PROCESS_LOCAL if no tasks could be run PROCESS_LOCAL for the current set of executors).
    */
-  var myLocalityLevels = computeValidLocalityLevels()
-  var localityWaits = myLocalityLevels.map(getLocalityWait) // Time to wait at each level
+  private[scheduler] var myLocalityLevels = computeValidLocalityLevels()
+
+  // Time to wait at each level
+  private[scheduler] var localityWaits = myLocalityLevels.map(getLocalityWait)
 
   // Delay scheduling variables: we keep track of our current locality level and the time we
   // last launched a task at that level, and move up a level when localityWaits[curLevel] expires.
   // We then move down if we manage to launch a "more local" task.
-  var currentLocalityIndex = 0    // Index of our current locality level in validLocalityLevels
-  var lastLaunchTime = clock.getTimeMillis()  // Time we last launched a task at this level
+  private var currentLocalityIndex = 0 // Index of our current locality level in myLocalityLevels
+  private var lastLaunchTime = clock.getTimeMillis() // Time we last launched a task at this level
 
   override def schedulableQueue: ConcurrentLinkedQueue[Schedulable] = null
 
   override def schedulingMode: SchedulingMode = SchedulingMode.NONE
 
-  var emittedTaskSizeWarning = false
+  private[scheduler] var emittedTaskSizeWarning = false
 
   /** Add a task to all the pending-task lists that it should be on. */
   private def addPendingTask(index: Int) {
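
The TaskSetManager hunks narrow visibility rather than change behavior: fields used only inside the class become `private`, while fields the scheduler and its tests still touch become `private[scheduler]`, Scala's qualified private, which behaves like package-private within `org.apache.spark.scheduler`. A toy sketch of the scoping (hypothetical `Manager`/`Peer` classes):

```scala
// Qualified private: visible anywhere under the named package, hidden outside.
package org.apache.spark.scheduler {
  class Manager {
    private[scheduler] var isZombie = false  // scheduler-package visible
    private var totalResultSize = 0L         // class-private
  }
  class Peer {
    def markZombie(m: Manager): Unit = m.isZombie = true  // same package: compiles
    // m.totalResultSize = 1L                // would not compile: class-private
  }
}
// Code outside org.apache.spark.scheduler cannot reference m.isZombie at all.
```

This is also why the test suites below keep compiling: they live in `org.apache.spark.scheduler` too.
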
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index dfad5db68a91403c98e453f9460c10400613248f..a9389003d5db8124649410fdb52aa45b50fad23e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -110,8 +110,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
   val cancelledStages = new HashSet[Int]()
 
   val taskScheduler = new TaskScheduler() {
-    override def rootPool: Pool = null
-    override def schedulingMode: SchedulingMode = SchedulingMode.NONE
+    override def schedulingMode: SchedulingMode = SchedulingMode.FIFO
+    override def rootPool: Pool = new Pool("", schedulingMode, 0, 0)
     override def start() = {}
     override def stop() = {}
     override def executorHeartbeatReceived(
@@ -542,8 +542,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     // make sure that the DAGScheduler doesn't crash when the TaskScheduler
     // doesn't implement killTask()
     val noKillTaskScheduler = new TaskScheduler() {
-      override def rootPool: Pool = null
-      override def schedulingMode: SchedulingMode = SchedulingMode.NONE
+      override def schedulingMode: SchedulingMode = SchedulingMode.FIFO
+      override def rootPool: Pool = new Pool("", schedulingMode, 0, 0)
       override def start(): Unit = {}
       override def stop(): Unit = {}
       override def submitTasks(taskSet: TaskSet): Unit = {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
index e87cebf0cf358918a7ed542a7a64df4ea19a0388..37c124a726be25662c18972fb2c54e940f1579f2 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
@@ -73,8 +73,8 @@ private class DummySchedulerBackend extends SchedulerBackend {
 
 private class DummyTaskScheduler extends TaskScheduler {
   var initialized = false
-  override def rootPool: Pool = null
-  override def schedulingMode: SchedulingMode = SchedulingMode.NONE
+  override def schedulingMode: SchedulingMode = SchedulingMode.FIFO
+  override def rootPool: Pool = new Pool("", schedulingMode, 0, 0)
   override def start(): Unit = {}
   override def stop(): Unit = {}
   override def submitTasks(taskSet: TaskSet): Unit = {}
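
Both here and in DAGSchedulerSuite above, the dummy schedulers can no longer return `SchedulingMode.NONE` with a null pool: Pool's constructor now rejects NONE, so the stubs switch to FIFO and a real empty Pool. One subtlety worth noting: `rootPool` is overridden as a `def`, so every call constructs a fresh Pool. That is harmless for these stubs, which never register anything in the pool, but a `val`-backed override would hand out one shared instance. A generic sketch of the difference (toy `Scheduling` trait, not Spark API):

```scala
// def-override: a new instance per call, so rootPool eq rootPool is false.
// val-backed override: one instance for the object's lifetime.
trait Scheduling { def rootPool: AnyRef }

class FreshEachCall extends Scheduling {
  override def rootPool: AnyRef = new Object
}
class Cached extends Scheduling {
  private val pool = new Object
  override def rootPool: AnyRef = pool
}
```
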
diff --git a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
index cddff3dd35861e3dff41136a4b769b496b12a4cb..4901062a7855376e684eae9505d8313ae4197f75 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
@@ -286,6 +286,12 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
     assert(testPool.getSchedulableByName(taskSetManager.name) === taskSetManager)
   }
 
+  test("Pool should throw IllegalArgumentException when schedulingMode is not supported") {
+    intercept[IllegalArgumentException] {
+      new Pool("TestPool", SchedulingMode.NONE, 0, 1)
+    }
+  }
+
   private def verifyPool(rootPool: Pool, poolName: String, expectedInitMinShare: Int,
                          expectedInitWeight: Int, expectedSchedulingMode: SchedulingMode): Unit = {
     val selectedPool = rootPool.getSchedulableByName(poolName)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 9ae0bcd9b886096245c6d371001ea028bf1482c4..8b9d45f734cda345176f98170ace26db7d61dce5 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -75,9 +75,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
 
   def setupScheduler(confs: (String, String)*): TaskSchedulerImpl = {
     val conf = new SparkConf().setMaster("local").setAppName("TaskSchedulerImplSuite")
-    confs.foreach { case (k, v) =>
-      conf.set(k, v)
-    }
+    confs.foreach { case (k, v) => conf.set(k, v) }
     sc = new SparkContext(conf)
     taskScheduler = new TaskSchedulerImpl(sc)
     setupHelper()
@@ -904,4 +902,12 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(taskDescs.size === 1)
     assert(taskDescs.head.executorId === "exec2")
   }
+
+  test("TaskScheduler should throw IllegalArgumentException when schedulingMode is not supported") {
+    intercept[IllegalArgumentException] {
+      val taskScheduler = setupScheduler(
+        TaskSchedulerImpl.SCHEDULER_MODE_PROPERTY -> SchedulingMode.NONE.toString)
+      taskScheduler.initialize(new FakeSchedulerBackend)
+    }
+  }
 }
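
A note on where the new TaskSchedulerImplSuite test actually fails: `SchedulingMode` is an `Enumeration` that declares `NONE`, so the `withName` check in TaskSchedulerImpl accepts the string "NONE" and the `SparkException` path is never taken. Instead, the now-eager `rootPool` val invokes Pool's constructor with NONE while `new TaskSchedulerImpl(sc)` runs inside `setupScheduler`, and that constructor throws the expected `IllegalArgumentException` before the explicit `initialize` call is even reached. A REPL-style sketch of the Enumeration behavior (mirroring Spark's `SchedulingMode`):

```scala
// NONE is a declared enum value, so withName("NONE") succeeds; only strings
// that match no declared value raise NoSuchElementException.
object SchedulingMode extends Enumeration {
  val FAIR, FIFO, NONE = Value
}

SchedulingMode.withName("NONE")   // ok: returns SchedulingMode.NONE
SchedulingMode.withName("BOGUS")  // throws java.util.NoSuchElementException
```
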