diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala
index aaf433b324fecc3991f90e6234a94d685f80b217..473520758557dc835d848c4a624247347ae439d5 100644
--- a/core/src/main/scala/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/spark/MapOutputTracker.scala
@@ -170,7 +170,7 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isDriver: Boolea
     }
   }
 
-  def cleanup(cleanupTime: Long) {
+  private def cleanup(cleanupTime: Long) {
     mapStatuses.clearOldValues(cleanupTime)
     cachedSerializedStatuses.clearOldValues(cleanupTime)
   }
diff --git a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
index a01774f511007b091cee4a31f847965944815e8d..529f72e9da1c2c618f23306d485d88a0fbd986d8 100644
--- a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
+++ b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
@@ -45,13 +45,9 @@ class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Direct
           case (jobId, Some(js)) if (js.equalsIgnoreCase("json")) =>
             val future = master ? RequestMasterState
             val jobInfo = for (masterState <- future.mapTo[MasterState]) yield {
-              masterState.activeJobs.find(_.id == jobId) match {
-                case Some(job) => job
-                case _ => masterState.completedJobs.find(_.id == jobId) match {
-                  case Some(job) => job
-                  case _ => null
-                }
-              }
+              masterState.activeJobs.find(_.id == jobId).getOrElse({
+                masterState.completedJobs.find(_.id == jobId).getOrElse(null)
+              })
             }
             respondWithMediaType(MediaTypes.`application/json`) { ctx =>
               ctx.complete(jobInfo.mapTo[JobInfo])
@@ -61,14 +57,10 @@ class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Direct
               val future = master ? RequestMasterState
               future.map { state =>
                 val masterState = state.asInstanceOf[MasterState]
-
-                masterState.activeJobs.find(_.id == jobId) match {
-                  case Some(job) => spark.deploy.master.html.job_details.render(job)
-                  case _ => masterState.completedJobs.find(_.id == jobId) match {
-                    case Some(job) => spark.deploy.master.html.job_details.render(job)
-                    case _ => null
-                  }
-                }
+                val job = masterState.activeJobs.find(_.id == jobId).getOrElse({
+                  masterState.completedJobs.find(_.id == jobId).getOrElse(null)
+                })
+                spark.deploy.master.html.job_details.render(job)
               }
             }
         }
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index b130be6a3863a005cee230fee87deea7ce3962e7..14f61f7e87bc8c399e215443cdc969dcf37b6e1f 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -97,7 +97,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
     }
   }.start()
 
-  def getCacheLocs(rdd: RDD[_]): Array[List[String]] = {
+  private def getCacheLocs(rdd: RDD[_]): Array[List[String]] = {
     if (!cacheLocs.contains(rdd.id)) {
       val blockIds = rdd.splits.indices.map(index=> "rdd_%d_%d".format(rdd.id, index)).toArray
       cacheLocs(rdd.id) = blockManagerMaster.getLocations(blockIds).map {
@@ -107,7 +107,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
     cacheLocs(rdd.id)
   }
 
-  def clearCacheLocs() {
+  private def clearCacheLocs() {
     cacheLocs.clear()
   }
 
@@ -116,7 +116,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
    * The priority value passed in will be used if the stage doesn't already exist with
    * a lower priority (we assume that priorities always increase across jobs for now).
    */
-  def getShuffleMapStage(shuffleDep: ShuffleDependency[_,_], priority: Int): Stage = {
+  private def getShuffleMapStage(shuffleDep: ShuffleDependency[_,_], priority: Int): Stage = {
     shuffleToMapStage.get(shuffleDep.shuffleId) match {
       case Some(stage) => stage
       case None =>
@@ -131,11 +131,11 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
    * as a result stage for the final RDD used directly in an action. The stage will also be given
    * the provided priority.
    */
-  def newStage(rdd: RDD[_], shuffleDep: Option[ShuffleDependency[_,_]], priority: Int): Stage = {
-    // Kind of ugly: need to register RDDs with the cache and map output tracker here
-    // since we can't do it in the RDD constructor because # of splits is unknown
-    logInfo("Registering RDD " + rdd.id + " (" + rdd.origin + ")")
+  private def newStage(rdd: RDD[_], shuffleDep: Option[ShuffleDependency[_,_]], priority: Int): Stage = {
     if (shuffleDep != None) {
+      // Kind of ugly: need to register RDDs with the cache and map output tracker here
+      // since we can't do it in the RDD constructor because # of splits is unknown
+      logInfo("Registering RDD " + rdd.id + " (" + rdd.origin + ")")
       mapOutputTracker.registerShuffle(shuffleDep.get.shuffleId, rdd.splits.size)
     }
     val id = nextStageId.getAndIncrement()
@@ -148,7 +148,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
    * Get or create the list of parent stages for a given RDD. The stages will be assigned the
    * provided priority if they haven't already been created with a lower priority.
    */
-  def getParentStages(rdd: RDD[_], priority: Int): List[Stage] = {
+  private def getParentStages(rdd: RDD[_], priority: Int): List[Stage] = {
     val parents = new HashSet[Stage]
     val visited = new HashSet[RDD[_]]
     def visit(r: RDD[_]) {
@@ -170,7 +170,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
     parents.toList
   }
 
-  def getMissingParentStages(stage: Stage): List[Stage] = {
+  private def getMissingParentStages(stage: Stage): List[Stage] = {
     val missing = new HashSet[Stage]
     val visited = new HashSet[RDD[_]]
     def visit(rdd: RDD[_]) {
@@ -241,7 +241,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
    * events and responds by launching tasks. This runs in a dedicated thread and receives events
    * via the eventQueue.
    */
-  def run() {
+  private def run() {
     SparkEnv.set(env)
 
     while (true) {
@@ -326,7 +326,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
    * We run the operation in a separate thread just in case it takes a bunch of time, so that we
    * don't block the DAGScheduler event loop or other concurrent jobs.
    */
-  def runLocally(job: ActiveJob) {
+  private def runLocally(job: ActiveJob) {
     logInfo("Computing the requested partition locally")
     new Thread("Local computation of job " + job.runId) {
       override def run() {
@@ -349,13 +349,14 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
     }.start()
   }
 
-  def submitStage(stage: Stage) {
+  /** Submits stage, but first recursively submits any missing parents. */
+  private def submitStage(stage: Stage) {
     logDebug("submitStage(" + stage + ")")
     if (!waiting(stage) && !running(stage) && !failed(stage)) {
       val missing = getMissingParentStages(stage).sortBy(_.id)
       logDebug("missing: " + missing)
       if (missing == Nil) {
-        logInfo("Submitting " + stage + " (" + stage.origin + "), which has no missing parents")
+        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
         submitMissingTasks(stage)
         running += stage
       } else {
@@ -367,7 +368,8 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
     }
   }
 
-  def submitMissingTasks(stage: Stage) {
+  /** Called when stage's parents are available and we can now do its task. */
+  private def submitMissingTasks(stage: Stage) {
     logDebug("submitMissingTasks(" + stage + ")")
     // Get our pending tasks and remember them in our pendingTasks entry
     val myPending = pendingTasks.getOrElseUpdate(stage, new HashSet)
@@ -388,7 +390,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
       }
     }
     if (tasks.size > 0) {
-      logInfo("Submitting " + tasks.size + " missing tasks from " + stage)
+      logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
       myPending ++= tasks
       logDebug("New pending tasks: " + myPending)
       taskSched.submitTasks(
@@ -407,7 +409,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
    * Responds to a task finishing. This is called inside the event loop so it assumes that it can
    * modify the scheduler's internal state. Use taskEnded() to post a task end event from outside.
    */
-  def handleTaskCompletion(event: CompletionEvent) {
+  private def handleTaskCompletion(event: CompletionEvent) {
     val task = event.task
     val stage = idToStage(task.stageId)
 
@@ -492,7 +494,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
                 waiting --= newlyRunnable
                 running ++= newlyRunnable
                 for (stage <- newlyRunnable.sortBy(_.id)) {
-                  logInfo("Submitting " + stage + " (" + stage.origin + "), which is now runnable")
+                  logInfo("Submitting " + stage + " (" + stage.rdd + "), which is now runnable")
                   submitMissingTasks(stage)
                 }
               }
@@ -541,7 +543,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
    * Optionally the generation during which the failure was caught can be passed to avoid allowing
    * stray fetch failures from possibly retriggering the detection of a node as lost.
    */
-  def handleExecutorLost(execId: String, maybeGeneration: Option[Long] = None) {
+  private def handleExecutorLost(execId: String, maybeGeneration: Option[Long] = None) {
     val currentGeneration = maybeGeneration.getOrElse(mapOutputTracker.getGeneration)
     if (!failedGeneration.contains(execId) || failedGeneration(execId) < currentGeneration) {
       failedGeneration(execId) = currentGeneration
@@ -567,7 +569,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
    * Aborts all jobs depending on a particular Stage. This is called in response to a task set
    * being cancelled by the TaskScheduler. Use taskSetFailed() to inject this event from outside.
    */
-  def abortStage(failedStage: Stage, reason: String) {
+  private def abortStage(failedStage: Stage, reason: String) {
     val dependentStages = resultStageToJob.keys.filter(x => stageDependsOn(x, failedStage)).toSeq
     for (resultStage <- dependentStages) {
       val job = resultStageToJob(resultStage)
@@ -583,7 +585,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
   /**
    * Return true if one of stage's ancestors is target.
    */
-  def stageDependsOn(stage: Stage, target: Stage): Boolean = {
+  private def stageDependsOn(stage: Stage, target: Stage): Boolean = {
     if (stage == target) {
       return true
     }
@@ -610,7 +612,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
     visitedRdds.contains(target.rdd)
   }
 
-  def getPreferredLocs(rdd: RDD[_], partition: Int): List[String] = {
+  private def getPreferredLocs(rdd: RDD[_], partition: Int): List[String] = {
     // If the partition is cached, return the cache locations
     val cached = getCacheLocs(rdd)(partition)
     if (cached != Nil) {
@@ -636,7 +638,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
     return Nil
   }
 
-  def cleanup(cleanupTime: Long) {
+  private def cleanup(cleanupTime: Long) {
     var sizeBefore = idToStage.size
     idToStage.clearOldValues(cleanupTime)
     logInfo("idToStage " + sizeBefore + " --> " + idToStage.size)
diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
index 83641a2a8427b1d86551be7de56edaa8d4ec669a..b701b67c89fba91a5589a276ad0d26956afce162 100644
--- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
@@ -127,7 +127,6 @@ private[spark] class ShuffleMapTask(
         val bucketId = dep.partitioner.getPartition(pair._1)
         buckets(bucketId) += pair
       }
-      val bucketIterators = buckets.map(_.iterator)
 
       val compressedSizes = new Array[Byte](numOutputSplits)
 
@@ -135,7 +134,7 @@ private[spark] class ShuffleMapTask(
       for (i <- 0 until numOutputSplits) {
         val blockId = "shuffle_" + dep.shuffleId + "_" + partition + "_" + i
         // Get a Scala iterator from Java map
-        val iter: Iterator[(Any, Any)] = bucketIterators(i)
+        val iter: Iterator[(Any, Any)] = buckets(i).iterator
         val size = blockManager.put(blockId, iter, StorageLevel.DISK_ONLY, false)
         compressedSizes(i) = MapOutputTracker.compressSize(size)
       }
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index 0b4177805b37412613f2e3dec50a755f77d6c7b4..1e4fbdb8742fdac8d33edb6b1e9ec1b3aaaea4d4 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -86,7 +86,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
     }
   }
 
-  def submitTasks(taskSet: TaskSet) {
+  override def submitTasks(taskSet: TaskSet) {
     val tasks = taskSet.tasks
     logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
     this.synchronized {
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index 26201ad0dd1a0314d01b11f1d253dee40eadca13..3dabdd76b1aa55404ef660199d05e5e699775038 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -17,10 +17,7 @@ import java.nio.ByteBuffer
 /**
  * Schedules the tasks within a single TaskSet in the ClusterScheduler.
  */
-private[spark] class TaskSetManager(
-  sched: ClusterScheduler,
-  val taskSet: TaskSet)
-  extends Logging {
+private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSet) extends Logging {
 
   // Maximum time to wait to run a task in a preferred location (in ms)
   val LOCALITY_WAIT = System.getProperty("spark.locality.wait", "3000").toLong
@@ -100,7 +97,7 @@ private[spark] class TaskSetManager(
   }
 
   // Add a task to all the pending-task lists that it should be on.
-  def addPendingTask(index: Int) {
+  private def addPendingTask(index: Int) {
     val locations = tasks(index).preferredLocations.toSet & sched.hostsAlive
     if (locations.size == 0) {
       pendingTasksWithNoPrefs += index
@@ -115,7 +112,7 @@ private[spark] class TaskSetManager(
 
   // Return the pending tasks list for a given host, or an empty list if
   // there is no map entry for that host
-  def getPendingTasksForHost(host: String): ArrayBuffer[Int] = {
+  private def getPendingTasksForHost(host: String): ArrayBuffer[Int] = {
     pendingTasksForHost.getOrElse(host, ArrayBuffer())
   }
 
@@ -123,7 +120,7 @@ private[spark] class TaskSetManager(
   // Return None if the list is empty.
   // This method also cleans up any tasks in the list that have already
   // been launched, since we want that to happen lazily.
-  def findTaskFromList(list: ArrayBuffer[Int]): Option[Int] = {
+  private def findTaskFromList(list: ArrayBuffer[Int]): Option[Int] = {
     while (!list.isEmpty) {
       val index = list.last
       list.trimEnd(1)
@@ -137,7 +134,7 @@ private[spark] class TaskSetManager(
   // Return a speculative task for a given host if any are available. The task should not have an
   // attempt running on this host, in case the host is slow. In addition, if localOnly is set, the
   // task must have a preference for this host (or no preferred locations at all).
-  def findSpeculativeTask(host: String, localOnly: Boolean): Option[Int] = {
+  private def findSpeculativeTask(host: String, localOnly: Boolean): Option[Int] = {
     val hostsAlive = sched.hostsAlive
     speculatableTasks.retain(index => !finished(index)) // Remove finished tasks from set
     val localTask = speculatableTasks.find {
@@ -162,7 +159,7 @@ private[spark] class TaskSetManager(
 
   // Dequeue a pending task for a given node and return its index.
   // If localOnly is set to false, allow non-local tasks as well.
-  def findTask(host: String, localOnly: Boolean): Option[Int] = {
+  private def findTask(host: String, localOnly: Boolean): Option[Int] = {
     val localTask = findTaskFromList(getPendingTasksForHost(host))
     if (localTask != None) {
       return localTask
@@ -184,7 +181,7 @@ private[spark] class TaskSetManager(
   // Does a host count as a preferred location for a task? This is true if
   // either the task has preferred locations and this host is one, or it has
   // no preferred locations (in which we still count the launch as preferred).
-  def isPreferredLocation(task: Task[_], host: String): Boolean = {
+  private def isPreferredLocation(task: Task[_], host: String): Boolean = {
     val locs = task.preferredLocations
     return (locs.contains(host) || locs.isEmpty)
   }
@@ -335,7 +332,7 @@ private[spark] class TaskSetManager(
         if (numFailures(index) > MAX_TASK_FAILURES) {
           logError("Task %s:%d failed more than %d times; aborting job".format(
             taskSet.id, index, MAX_TASK_FAILURES))
-          abort("Task %d failed more than %d times".format(index, MAX_TASK_FAILURES))
+          abort("Task %s:%d failed more than %d times".format(taskSet.id, index, MAX_TASK_FAILURES))
         }
       }
     } else {
diff --git a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
index 9ff7c02097b2eb49ac523694c5d3d24c495b1974..482d1cc85343a97852193b693ee9f7e054944b21 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
@@ -53,7 +53,7 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon
     }
 
     def runTask(task: Task[_], idInJob: Int, attemptId: Int) {
-      logInfo("Running task " + idInJob)
+      logInfo("Running " + task)
       // Set the Spark execution environment for the worker thread
       SparkEnv.set(env)
       try {
@@ -80,7 +80,7 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon
         val resultToReturn = ser.deserialize[Any](ser.serialize(result))
         val accumUpdates = ser.deserialize[collection.mutable.Map[Long, Any]](
           ser.serialize(Accumulators.values))
-        logInfo("Finished task " + idInJob)
+        logInfo("Finished " + task)
 
         // If the threadpool has not already been shutdown, notify DAGScheduler
         if (!Thread.currentThread().isInterrupted)
diff --git a/core/src/main/scala/spark/util/MetadataCleaner.scala b/core/src/main/scala/spark/util/MetadataCleaner.scala
index eaff7ae581ec5dd81b0bcfd9af090ba8f5a9f044..a342d378ffab263f8dc23b58ec7e7f1f204bd045 100644
--- a/core/src/main/scala/spark/util/MetadataCleaner.scala
+++ b/core/src/main/scala/spark/util/MetadataCleaner.scala
@@ -9,12 +9,12 @@ import spark.Logging
  * Runs a timer task to periodically clean up metadata (e.g. old files or hashtable entries)
  */
 class MetadataCleaner(name: String, cleanupFunc: (Long) => Unit) extends Logging {
-  val delaySeconds = MetadataCleaner.getDelaySeconds
-  val periodSeconds = math.max(10, delaySeconds / 10)
-  val timer = new Timer(name + " cleanup timer", true)
+  private val delaySeconds = MetadataCleaner.getDelaySeconds
+  private val periodSeconds = math.max(10, delaySeconds / 10)
+  private val timer = new Timer(name + " cleanup timer", true)
 
-  val task = new TimerTask {
-    def run() {
+  private val task = new TimerTask {
+    override def run() {
       try {
         cleanupFunc(System.currentTimeMillis() - (delaySeconds * 1000))
         logInfo("Ran metadata cleaner for " + name)