diff --git a/conf/log4j.properties.template b/conf/log4j.properties.template
index d72dbadc3904f327effddf99594045067be2f529..17d1978dde4d7ffad83621c4c548b844852462b7 100644
--- a/conf/log4j.properties.template
+++ b/conf/log4j.properties.template
@@ -1,6 +1,7 @@
 # Set everything to be logged to the console
 log4j.rootCategory=INFO, console
 log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
 log4j.appender.console.layout=org.apache.log4j.PatternLayout
 log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
 
diff --git a/core/src/main/resources/org/apache/spark/log4j-defaults.properties b/core/src/main/resources/org/apache/spark/log4j-defaults.properties
index d72dbadc3904f327effddf99594045067be2f529..17d1978dde4d7ffad83621c4c548b844852462b7 100644
--- a/core/src/main/resources/org/apache/spark/log4j-defaults.properties
+++ b/core/src/main/resources/org/apache/spark/log4j-defaults.properties
@@ -1,6 +1,7 @@
 # Set everything to be logged to the console
 log4j.rootCategory=INFO, console
 log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
 log4j.appender.console.layout=org.apache.log4j.PatternLayout
 log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
 
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index 31b0773bfe06c6ca4cebc75e4a13430857940dd9..9b043f06dd7349d159ce2ae7d15fae9cefea39c6 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -61,7 +61,8 @@ object Partitioner {
 }
 
 /**
- * A [[org.apache.spark.Partitioner]] that implements hash-based partitioning using Java's `Object.hashCode`.
+ * A [[org.apache.spark.Partitioner]] that implements hash-based partitioning using
+ * Java's `Object.hashCode`.
  *
  * Java arrays have hashCodes that are based on the arrays' identities rather than their contents,
  * so attempting to partition an RDD[Array[_]] or RDD[(Array[_], _)] using a HashPartitioner will
@@ -84,8 +85,8 @@ class HashPartitioner(partitions: Int) extends Partitioner {
 }
 
 /**
- * A [[org.apache.spark.Partitioner]] that partitions sortable records by range into roughly equal ranges.
- * Determines the ranges by sampling the RDD passed in.
+ * A [[org.apache.spark.Partitioner]] that partitions sortable records by range into roughly
+ * equal ranges. The ranges are determined by sampling the content of the RDD passed in.
  */
 class RangePartitioner[K <% Ordered[K]: ClassTag, V](
     partitions: Int,
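
For reference, a minimal Scala sketch (assuming an existing SparkContext `sc`; the pair RDD and partition counts are illustrative) of how the two partitioners documented above are applied:

    import org.apache.spark.{HashPartitioner, RangePartitioner}
    import org.apache.spark.SparkContext._   // brings partitionBy into scope on pair RDDs

    val pairs = sc.parallelize(1 to 1000).map(i => (i, i.toString))

    // Hash-based partitioning via Object.hashCode on the keys.
    val hashed = pairs.partitionBy(new HashPartitioner(8))

    // Range partitioning into roughly equal ranges, determined by sampling `pairs`.
    val ranged = pairs.partitionBy(new RangePartitioner(8, pairs))
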
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 101a7e6db9df7877f5cb1e1156890f3fb3b250eb..d83c88985a950b85a4e565697b7b71b40cd4816b 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -31,9 +31,9 @@ import scala.reflect.{ClassTag, classTag}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable,
-FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
+  FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
 import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat,
-TextInputFormat}
+  TextInputFormat}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob}
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
 import org.apache.mesos.MesosNativeLibrary
@@ -49,7 +49,7 @@ import org.apache.spark.scheduler.local.LocalBackend
 import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils}
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.{Utils, TimeStampedHashMap, MetadataCleaner, MetadataCleanerType,
-ClosureCleaner}
+  ClosureCleaner}
 
 /**
  * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -116,7 +116,7 @@ class SparkContext(
     throw new SparkException("An application must be set in your configuration")
   }
 
-  if (conf.get("spark.logConf", "false").toBoolean) {
+  if (conf.getBoolean("spark.logConf", false)) {
     logInfo("Spark configuration:\n" + conf.toDebugString)
   }
 
@@ -244,6 +244,10 @@ class SparkContext(
     localProperties.set(new Properties())
   }
 
+  /**
+   * Set a local property that affects jobs submitted from this thread, such as the
+   * Spark fair scheduler pool.
+   */
   def setLocalProperty(key: String, value: String) {
     if (localProperties.get() == null) {
       localProperties.set(new Properties())
@@ -255,6 +259,10 @@ class SparkContext(
     }
   }
 
+  /**
+   * Get a local property set in this thread, or null if it is missing. See
+   * [[org.apache.spark.SparkContext.setLocalProperty]].
+   */
   def getLocalProperty(key: String): String =
     Option(localProperties.get).map(_.getProperty(key)).getOrElse(null)
 
@@ -265,7 +273,7 @@ class SparkContext(
   }
 
   /**
-   * Assigns a group id to all the jobs started by this thread until the group id is set to a
+   * Assigns a group ID to all the jobs started by this thread until the group ID is set to a
    * different value or cleared.
    *
    * Often, a unit of execution in an application consists of multiple Spark actions or jobs.
@@ -288,7 +296,7 @@ class SparkContext(
     setLocalProperty(SparkContext.SPARK_JOB_GROUP_ID, groupId)
   }
 
-  /** Clear the job group id and its description. */
+  /** Clear the current thread's job group ID and its description. */
   def clearJobGroup() {
     setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, null)
     setLocalProperty(SparkContext.SPARK_JOB_GROUP_ID, null)
@@ -517,15 +525,15 @@ class SparkContext(
   // Methods for creating shared variables
 
   /**
-   * Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add" values
-   * to using the `+=` method. Only the driver can access the accumulator's `value`.
+   * Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add"
+   * values to using the `+=` method. Only the driver can access the accumulator's `value`.
    */
   def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]) =
     new Accumulator(initialValue, param)
 
   /**
-   * Create an [[org.apache.spark.Accumulable]] shared variable, to which tasks can add values with `+=`.
-   * Only the driver can access the accumuable's `value`.
+   * Create an [[org.apache.spark.Accumulable]] shared variable, to which tasks can add values
+   * with `+=`. Only the driver can access the accumulable's `value`.
    * @tparam T accumulator type
    * @tparam R type that can be added to the accumulator
    */
@@ -538,14 +546,16 @@ class SparkContext(
    * Growable and TraversableOnce are the standard APIs that guarantee += and ++=, implemented by
    * standard mutable collections. So you can use this with mutable Map, Set, etc.
    */
-  def accumulableCollection[R <% Growable[T] with TraversableOnce[T] with Serializable, T](initialValue: R) = {
+  def accumulableCollection[R <% Growable[T] with TraversableOnce[T] with Serializable, T]
+      (initialValue: R) = {
     val param = new GrowableAccumulableParam[R,T]
     new Accumulable(initialValue, param)
   }
 
   /**
-   * Broadcast a read-only variable to the cluster, returning a [[org.apache.spark.broadcast.Broadcast]] object for
-   * reading it in distributed functions. The variable will be sent to each cluster only once.
+   * Broadcast a read-only variable to the cluster, returning a
+   * [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions.
+   * The variable will be sent to each node only once.
    */
   def broadcast[T](value: T) = env.broadcastManager.newBroadcast[T](value, isLocal)
 
@@ -1010,7 +1020,8 @@ object SparkContext {
 
   implicit def stringToText(s: String) = new Text(s)
 
-  private implicit def arrayToArrayWritable[T <% Writable: ClassTag](arr: Traversable[T]): ArrayWritable = {
+  private implicit def arrayToArrayWritable[T <% Writable: ClassTag](arr: Traversable[T])
+    : ArrayWritable = {
     def anyToWritable[U <% Writable](u: U): Writable = u
 
     new ArrayWritable(classTag[T].runtimeClass.asInstanceOf[Class[Writable]],
@@ -1033,7 +1044,9 @@ object SparkContext {
 
   implicit def booleanWritableConverter() = simpleWritableConverter[Boolean, BooleanWritable](_.get)
 
-  implicit def bytesWritableConverter() = simpleWritableConverter[Array[Byte], BytesWritable](_.getBytes)
+  implicit def bytesWritableConverter() = {
+    simpleWritableConverter[Array[Byte], BytesWritable](_.getBytes)
+  }
 
   implicit def stringWritableConverter() = simpleWritableConverter[String, Text](_.toString)
 
@@ -1049,7 +1062,8 @@ object SparkContext {
     if (uri != null) {
       val uriStr = uri.toString
       if (uriStr.startsWith("jar:file:")) {
-        // URI will be of the form "jar:file:/path/foo.jar!/package/cls.class", so pull out the /path/foo.jar
+        // URI will be of the form "jar:file:/path/foo.jar!/package/cls.class",
+        // so pull out the /path/foo.jar
         List(uriStr.substring("jar:file:".length, uriStr.indexOf('!')))
       } else {
         Nil
@@ -1203,7 +1217,7 @@ object SparkContext {
       case mesosUrl @ MESOS_REGEX(_) =>
         MesosNativeLibrary.load()
         val scheduler = new TaskSchedulerImpl(sc)
-        val coarseGrained = sc.conf.get("spark.mesos.coarse", "false").toBoolean
+        val coarseGrained = sc.conf.getBoolean("spark.mesos.coarse", false)
         val url = mesosUrl.stripPrefix("mesos://") // strip scheme from raw Mesos URLs
         val backend = if (coarseGrained) {
           new CoarseMesosSchedulerBackend(scheduler, sc, url, appName)
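
A minimal driver-side sketch (the configuration values, pool name and group ID are illustrative) of the typed SparkConf getters and the thread-local properties documented above:

    import org.apache.spark.SparkConf

    // Typed getters replace manual string parsing of configuration values.
    val conf = new SparkConf().set("spark.logConf", "true")
    val logConf = conf.getBoolean("spark.logConf", false)          // true
    val parallelism = conf.getInt("spark.default.parallelism", 2)  // 2 (default)

    // Local properties affect only jobs submitted from the current thread,
    // e.g. selecting a fair scheduler pool. Assumes an existing SparkContext sc.
    sc.setLocalProperty("spark.scheduler.pool", "production")
    assert(sc.getLocalProperty("spark.scheduler.pool") == "production")

    // Group the jobs of one logical operation so they can be cancelled together.
    sc.setJobGroup("nightly_aggregation", "aggregate yesterday's events")
    // ... run actions ...
    sc.clearJobGroup()
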
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 2e36ccb9a0f076b92c59ac00793b91237320f5a2..e093e2f1620224166e20b5d8a79bd347fad94b72 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -162,7 +162,7 @@ object SparkEnv extends Logging {
         actorSystem.actorOf(Props(newActor), name = name)
       } else {
         val driverHost: String = conf.get("spark.driver.host", "localhost")
-        val driverPort: Int = conf.get("spark.driver.port", "7077").toInt
+        val driverPort: Int = conf.getInt("spark.driver.port", 7077)
         Utils.checkHost(driverHost, "Expected hostname")
         val url = s"akka.tcp://spark@$driverHost:$driverPort/user/$name"
         val timeout = AkkaUtils.lookupTimeout(conf)
diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
index 103a1c205185f1effde97fbac116076221c298b8..618d95015f7475253614dfcfab8cb418bde91a80 100644
--- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -127,10 +127,6 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
     cmtr.commitJob(getJobContext())
   }
 
-  def cleanup() {
-    getOutputCommitter().cleanupJob(getJobContext())
-  }
-
   // ********* Private Functions *********
 
   private def getOutputFormat(): OutputFormat[AnyRef,AnyRef] = {
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
index da30cf619a1d0ecfabf501faecc0e2b0f0a64738..b0dedc6f4eb135f5751414921d21eb4fd2110080 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
@@ -207,13 +207,13 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav
    *  e.g. for the array
    *  [1,10,20,50] the buckets are [1,10) [10,20) [20,50]
    *  e.g 1<=x<10 , 10<=x<20, 20<=x<50
-   *  And on the input of 1 and 50 we would have a histogram of 1,0,0 
-   * 
+   *  And on the input of 1 and 50 we would have a histogram of 1,0,0
+   *
    * Note: if your histogram is evenly spaced (e.g. [0, 10, 20, 30]) this can be switched
    * from an O(log n) inseration to O(1) per element. (where n = # buckets) if you set evenBuckets
    * to true.
    * buckets must be sorted and not contain any duplicates.
-   * buckets array must be at least two elements 
+   * the buckets array must have at least two elements
    * All NaN entries are treated the same. If you have a NaN bucket it must be
    * the maximum value of the last position and all NaN entries will be counted
    * in that bucket.
@@ -225,6 +225,12 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav
   def histogram(buckets: Array[Double], evenBuckets: Boolean): Array[Long] = {
     srdd.histogram(buckets.map(_.toDouble), evenBuckets)
   }
+
+  /** Assign a name to this RDD */
+  def setName(name: String): JavaDoubleRDD = {
+    srdd.setName(name)
+    this
+  }
 }
 
 object JavaDoubleRDD {
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 55c87450ac65ae5256cba054cf4ce7684773c592..0fb7e195b34c4a9b530e2f718fa98f86d391303d 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -647,6 +647,12 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
   def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaRDD[(K, Long)] = {
     rdd.countApproxDistinctByKey(relativeSD, numPartitions)
   }
+
+  /** Assign a name to this RDD */
+  def setName(name: String): JavaPairRDD[K, V] = {
+    rdd.setName(name)
+    this
+  }
 }
 
 object JavaPairRDD {
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
index 037cd1c774691e6cd2122d03dea488d1a57f0517..7d48ce01cf2cc9606c786e14018ad8bacf2f7876 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -127,6 +127,12 @@ JavaRDDLike[T, JavaRDD[T]] {
     wrapRDD(rdd.subtract(other, p))
 
   override def toString = rdd.toString
+
+  /** Assign a name to this RDD */
+  def setName(name: String): JavaRDD[T] = {
+    rdd.setName(name)
+    this
+  }
 }
 
 object JavaRDD {
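
A minimal sketch (assuming an existing SparkContext `sc`; the RDD name is illustrative) of the setName chaining added to the Java wrappers above:

    import org.apache.spark.api.java.JavaRDD

    val javaRdd = JavaRDD.fromRDD(sc.parallelize(1 to 100))
    // setName returns the wrapper itself, so it composes with other calls.
    javaRdd.setName("smallInts").cache()
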
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 924d8af0602f4a7fa8e33b637ebd5af23059ad81..ebbbbd88061a1df7c205638f0872d7b9f724e107 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -244,6 +244,11 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
     new java.util.ArrayList(arr)
   }
 
+  /**
+   * Return an array that contains all of the elements in this RDD.
+   */
+  def toArray(): JList[T] = collect()
+
   /**
    * Return an array that contains all of the elements in a specific partition of this RDD.
    */
@@ -455,4 +460,5 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    */
   def countApproxDistinct(relativeSD: Double = 0.05): Long = rdd.countApproxDistinct(relativeSD)
 
+  def name(): String = rdd.name
 }
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index e93b10fd7eecb71b59908a54a765d0e23b7da7b1..7a6f044965027d26bd7af468eb225784c76ccc31 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -425,6 +425,51 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
   def clearCallSite() {
     sc.clearCallSite()
   }
+
+  /**
+   * Set a local property that affects jobs submitted from this thread, such as the
+   * Spark fair scheduler pool.
+   */
+  def setLocalProperty(key: String, value: String): Unit = sc.setLocalProperty(key, value)
+
+  /**
+   * Get a local property set in this thread, or null if it is missing. See
+   * [[org.apache.spark.api.java.JavaSparkContext.setLocalProperty]].
+   */
+  def getLocalProperty(key: String): String = sc.getLocalProperty(key)
+
+  /**
+   * Assigns a group ID to all the jobs started by this thread until the group ID is set to a
+   * different value or cleared.
+   *
+   * Often, a unit of execution in an application consists of multiple Spark actions or jobs.
+   * Application programmers can use this method to group all those jobs together and give a
+   * group description. Once set, the Spark web UI will associate such jobs with this group.
+   *
+   * The application can also use [[org.apache.spark.api.java.JavaSparkContext.cancelJobGroup]]
+   * to cancel all running jobs in this group. For example,
+   * {{{
+   * // In the main thread:
+   * sc.setJobGroup("some_job_to_cancel", "some job description");
+   * rdd.map(...).count();
+   *
+   * // In a separate thread:
+   * sc.cancelJobGroup("some_job_to_cancel");
+   * }}}
+   */
+  def setJobGroup(groupId: String, description: String): Unit = sc.setJobGroup(groupId, description)
+
+  /** Clear the current thread's job group ID and its description. */
+  def clearJobGroup(): Unit = sc.clearJobGroup()
+
+  /**
+   * Cancel active jobs for the specified group. See
+   * [[org.apache.spark.api.java.JavaSparkContext.setJobGroup]] for more information.
+   */
+  def cancelJobGroup(groupId: String): Unit = sc.cancelJobGroup(groupId)
+
+  /** Cancel all jobs that have been scheduled or are running. */
+  def cancelAllJobs(): Unit = sc.cancelAllJobs()
 }
 
 object JavaSparkContext {
@@ -436,5 +481,12 @@ object JavaSparkContext {
    * Find the JAR from which a given class was loaded, to make it easy for users to pass
    * their JARs to SparkContext.
    */
-  def jarOfClass(cls: Class[_]) = SparkContext.jarOfClass(cls).toArray
+  def jarOfClass(cls: Class[_]): Array[String] = SparkContext.jarOfClass(cls).toArray
+
+  /**
+   * Find the JAR that contains the class of a particular object, to make it easy for users
+   * to pass their JARs to SparkContext. In most cases you can call jarOfObject(this) in
+   * your driver program.
+   */
+  def jarOfObject(obj: AnyRef): Array[String] = SparkContext.jarOfObject(obj).toArray
 }
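
A minimal driver-program sketch (MyApp, the master URL and the Spark home path are illustrative) of the jarOfObject helper added above:

    import org.apache.spark.api.java.JavaSparkContext

    object MyApp {
      def main(args: Array[String]) {
        // Find the JAR containing the driver's classes so it can be shipped to executors.
        val jars = JavaSparkContext.jarOfObject(this)
        val jsc = new JavaSparkContext("local[2]", "MyApp", "/path/to/spark", jars)
        // ... submit jobs, optionally grouped via jsc.setJobGroup(...) ...
        jsc.stop()
      }
    }
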
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 32cc70e8c9dda4d98e44b8d5165ad5375507e460..40c519b5bd9708e904b3e0104cc21511a6c73326 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -41,7 +41,7 @@ private[spark] class PythonRDD[T: ClassTag](
     accumulator: Accumulator[JList[Array[Byte]]])
   extends RDD[Array[Byte]](parent) {
 
-  val bufferSize = conf.get("spark.buffer.size", "65536").toInt
+  val bufferSize = conf.getInt("spark.buffer.size", 65536)
 
   override def getPartitions = parent.partitions
 
@@ -250,7 +250,7 @@ private class PythonAccumulatorParam(@transient serverHost: String, serverPort:
 
   Utils.checkHost(serverHost, "Expected hostname")
 
-  val bufferSize = SparkEnv.get.conf.get("spark.buffer.size", "65536").toInt
+  val bufferSize = SparkEnv.get.conf.getInt("spark.buffer.size", 65536)
 
   override def zero(value: JList[Array[Byte]]): JList[Array[Byte]] = new JArrayList
 
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index db596d5fcc05413debab51d1ca9dd8dca0321090..0eacda3d7dc2b4422bd0a46f1b54078c95c2a431 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -92,8 +92,8 @@ private object HttpBroadcast extends Logging {
   def initialize(isDriver: Boolean, conf: SparkConf) {
     synchronized {
       if (!initialized) {
-        bufferSize = conf.get("spark.buffer.size", "65536").toInt
-        compress = conf.get("spark.broadcast.compress", "true").toBoolean
+        bufferSize = conf.getInt("spark.buffer.size", 65536)
+        compress = conf.getBoolean("spark.broadcast.compress", true)
         if (isDriver) {
           createServer(conf)
           conf.set("spark.httpBroadcast.uri",  serverUri)
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index 95309382786a902fceedfeac5e712e8ca021b952..fdf92eca4f25e84aa0c81f33cc1e7c2441ceecfa 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -180,7 +180,7 @@ extends Logging {
     initialized = false
   }
 
-  lazy val BLOCK_SIZE = conf.get("spark.broadcast.blockSize", "4096").toInt * 1024
+  lazy val BLOCK_SIZE = conf.getInt("spark.broadcast.blockSize", 4096) * 1024
 
   def blockifyObject[T](obj: T): TorrentInfo = {
     val byteArray = Utils.serialize[T](obj)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index cd3f3ebefc1390536ff104f30b4969bcdbec74c2..d9ea96afcf52a2e4719f571c1c27e5dbf91c6389 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -46,9 +46,9 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
   val conf = new SparkConf
 
   val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss")  // For application IDs
-  val WORKER_TIMEOUT = conf.get("spark.worker.timeout", "60").toLong * 1000
-  val RETAINED_APPLICATIONS = conf.get("spark.deploy.retainedApplications", "200").toInt
-  val REAPER_ITERATIONS = conf.get("spark.dead.worker.persistence", "15").toInt
+  val WORKER_TIMEOUT = conf.getLong("spark.worker.timeout", 60) * 1000
+  val RETAINED_APPLICATIONS = conf.getInt("spark.deploy.retainedApplications", 200)
+  val REAPER_ITERATIONS = conf.getInt("spark.dead.worker.persistence", 15)
   val RECOVERY_DIR = conf.get("spark.deploy.recoveryDirectory", "")
   val RECOVERY_MODE = conf.get("spark.deploy.recoveryMode", "NONE")
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 273bacded6fb4fdf28b33af63054314cbc7ba0a4..5182dcbb2abfdccc0f1bd5468d105a917f83df84 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -59,7 +59,7 @@ private[spark] class Worker(
   val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss")  // For worker and executor IDs
 
   // Send a heartbeat every (heartbeat timeout) / 4 milliseconds
-  val HEARTBEAT_MILLIS = conf.get("spark.worker.timeout", "60").toLong * 1000 / 4
+  val HEARTBEAT_MILLIS = conf.getLong("spark.worker.timeout", 60) * 1000 / 4
 
   val REGISTRATION_TIMEOUT = 20.seconds
   val REGISTRATION_RETRIES = 3
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index a1e98845f6a848b8ab0651aab5e3b0d484d32a02..59801773205bdab204e25bb227995361feebf812 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -71,7 +71,7 @@ class LZFCompressionCodec(conf: SparkConf) extends CompressionCodec {
 class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec {
 
   override def compressedOutputStream(s: OutputStream): OutputStream = {
-    val blockSize = conf.get("spark.io.compression.snappy.block.size", "32768").toInt
+    val blockSize = conf.getInt("spark.io.compression.snappy.block.size", 32768)
     new SnappyOutputStream(s, blockSize)
   }
 
diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
index 46c40d0a2a02959ffb50ea7541c7092ac0a9bab2..e6e01783c889524c561f0addc6f136c91f6eae57 100644
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
@@ -54,22 +54,22 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
   private val selector = SelectorProvider.provider.openSelector()
 
   private val handleMessageExecutor = new ThreadPoolExecutor(
-    conf.get("spark.core.connection.handler.threads.min", "20").toInt,
-    conf.get("spark.core.connection.handler.threads.max", "60").toInt,
-    conf.get("spark.core.connection.handler.threads.keepalive", "60").toInt, TimeUnit.SECONDS,
+    conf.getInt("spark.core.connection.handler.threads.min", 20),
+    conf.getInt("spark.core.connection.handler.threads.max", 60),
+    conf.getInt("spark.core.connection.handler.threads.keepalive", 60), TimeUnit.SECONDS,
     new LinkedBlockingDeque[Runnable]())
 
   private val handleReadWriteExecutor = new ThreadPoolExecutor(
-    conf.get("spark.core.connection.io.threads.min", "4").toInt,
-    conf.get("spark.core.connection.io.threads.max", "32").toInt,
-    conf.get("spark.core.connection.io.threads.keepalive", "60").toInt, TimeUnit.SECONDS,
+    conf.getInt("spark.core.connection.io.threads.min", 4),
+    conf.getInt("spark.core.connection.io.threads.max", 32),
+    conf.getInt("spark.core.connection.io.threads.keepalive", 60), TimeUnit.SECONDS,
     new LinkedBlockingDeque[Runnable]())
 
   // Use a different, yet smaller, thread pool - infrequently used with very short lived tasks : which should be executed asap
   private val handleConnectExecutor = new ThreadPoolExecutor(
-    conf.get("spark.core.connection.connect.threads.min", "1").toInt,
-    conf.get("spark.core.connection.connect.threads.max", "8").toInt,
-    conf.get("spark.core.connection.connect.threads.keepalive", "60").toInt, TimeUnit.SECONDS,
+    conf.getInt("spark.core.connection.connect.threads.min", 1),
+    conf.getInt("spark.core.connection.connect.threads.max", 8),
+    conf.getInt("spark.core.connection.connect.threads.keepalive", 60), TimeUnit.SECONDS,
     new LinkedBlockingDeque[Runnable]())
 
   private val serverChannel = ServerSocketChannel.open()
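
A small sketch (thread counts are illustrative) of tuning these pools; SparkConf stores values as strings, which the typed getters above parse:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.core.connection.handler.threads.min", "40")
      .set("spark.core.connection.handler.threads.max", "120")
    // conf.getInt("spark.core.connection.handler.threads.max", 60) now yields 120.
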
diff --git a/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala b/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
index b729eb11c514268798ecd21a679b6abbb3202e90..d87157e12c4876201371663ee94dc1f4a8c6a32d 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
@@ -36,7 +36,7 @@ private[spark] class ShuffleCopier(conf: SparkConf) extends Logging {
       resultCollectCallback: (BlockId, Long, ByteBuf) => Unit) {
 
     val handler = new ShuffleCopier.ShuffleClientHandler(resultCollectCallback)
-    val connectTimeout = conf.get("spark.shuffle.netty.connect.timeout", "60000").toInt
+    val connectTimeout = conf.getInt("spark.shuffle.netty.connect.timeout", 60000)
     val fc = new FileClient(handler, connectTimeout)
 
     try {
diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
index 6d4f46125f1a60bf872dfe27baad4cf1d07070ad..83109d1a6f853f0aa9c98c3eed86c8165557f3ca 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@ -97,7 +97,7 @@ private[spark] object CheckpointRDD extends Logging {
       throw new IOException("Checkpoint failed: temporary path " +
         tempOutputPath + " already exists")
     }
-    val bufferSize = env.conf.get("spark.buffer.size", "65536").toInt
+    val bufferSize = env.conf.getInt("spark.buffer.size", 65536)
 
     val fileOutputStream = if (blockSize < 0) {
       fs.create(tempOutputPath, false, bufferSize)
@@ -131,7 +131,7 @@ private[spark] object CheckpointRDD extends Logging {
     ): Iterator[T] = {
     val env = SparkEnv.get
     val fs = path.getFileSystem(broadcastedConf.value.value)
-    val bufferSize = env.conf.get("spark.buffer.size", "65536").toInt
+    val bufferSize = env.conf.getInt("spark.buffer.size", 65536)
     val fileInputStream = fs.open(path, bufferSize)
     val serializer = env.serializer.newInstance()
     val deserializeStream = serializer.deserializeStream(fileInputStream)
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 2662d48c84a2eb60b83ae495633226cb17a7446f..73d15b90822addbb26062193e2f323c7f41a525a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -76,7 +76,7 @@ class NewHadoopRDD[K, V](
       val split = theSplit.asInstanceOf[NewHadoopPartition]
       logInfo("Input split: " + split.serializableHadoopSplit)
       val conf = confBroadcast.value.value
-      val attemptId = newTaskAttemptID(jobtrackerId, id, true, split.index, 0)
+      val attemptId = newTaskAttemptID(jobtrackerId, id, isMap = true, split.index, 0)
       val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
       val format = inputFormatClass.newInstance
       if (format.isInstanceOf[Configurable]) {
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 04a8d05988f168a2a294c939aa70e9c8ef78824c..c118ddfc0138f86bd300344e4d239150d3d0c7f8 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -18,35 +18,34 @@
 package org.apache.spark.rdd
 
 import java.nio.ByteBuffer
-import java.util.Date
 import java.text.SimpleDateFormat
+import java.util.Date
 import java.util.{HashMap => JHashMap}
 
-import scala.collection.{mutable, Map}
+import scala.collection.Map
+import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.JavaConversions._
 import scala.reflect.{ClassTag, classTag}
 
-import org.apache.hadoop.mapred._
-import org.apache.hadoop.io.compress.CompressionCodec
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.SequenceFile.CompressionType
-import org.apache.hadoop.mapred.FileOutputFormat
-import org.apache.hadoop.mapred.OutputFormat
+import org.apache.hadoop.io.compress.CompressionCodec
+import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat}
 import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
-import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
-import org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil
 import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob}
 import org.apache.hadoop.mapreduce.{RecordWriter => NewRecordWriter}
+import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
 
 import com.clearspring.analytics.stream.cardinality.HyperLogLog
 
+// SparkHadoopWriter and SparkHadoopMapReduceUtil are actually defined in Spark's source tree.
+import org.apache.hadoop.mapred.SparkHadoopWriter
+import org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil
 import org.apache.spark._
 import org.apache.spark.SparkContext._
 import org.apache.spark.partial.{BoundedDouble, PartialResult}
-import org.apache.spark.Aggregator
-import org.apache.spark.Partitioner
 import org.apache.spark.Partitioner.defaultPartitioner
 import org.apache.spark.util.SerializableHyperLogLog
 
@@ -120,9 +119,9 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
   }
 
   /**
-   * Merge the values for each key using an associative function and a neutral "zero value" which may
-   * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
-   * list concatenation, 0 for addition, or 1 for multiplication.).
+   * Merge the values for each key using an associative function and a neutral "zero value" which
+   * may be added to the result an arbitrary number of times, and must not change the result
+   * (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication).
    */
   def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = {
     // Serialize the zero value to a byte array so that we can get a new clone of it on each key
@@ -138,18 +137,18 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
   }
 
   /**
-   * Merge the values for each key using an associative function and a neutral "zero value" which may
-   * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
-   * list concatenation, 0 for addition, or 1 for multiplication.).
+   * Merge the values for each key using an associative function and a neutral "zero value" which
+   * may be added to the result an arbitrary number of times, and must not change the result
+   * (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication).
    */
   def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)] = {
     foldByKey(zeroValue, new HashPartitioner(numPartitions))(func)
   }
 
   /**
-   * Merge the values for each key using an associative function and a neutral "zero value" which may
-   * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
-   * list concatenation, 0 for addition, or 1 for multiplication.).
+   * Merge the values for each key using an associative function and a neutral "zero value" which
+   * may be added to the result an arbitrary number of times, and must not change the result
+   * (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication).
    */
   def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] = {
     foldByKey(zeroValue, defaultPartitioner(self))(func)
@@ -226,7 +225,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
   }
 
   /**
-   * Return approximate number of distinct values for each key in this RDD. 
+   * Return approximate number of distinct values for each key in this RDD.
    * The accuracy of approximation can be controlled through the relative standard deviation
    * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
    * more accurate counts but increase the memory footprint and vise versa. HashPartitions the
@@ -579,7 +578,8 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    */
   def saveAsHadoopFile[F <: OutputFormat[K, V]](
       path: String, codec: Class[_ <: CompressionCodec]) (implicit fm: ClassTag[F]) {
-    saveAsHadoopFile(path, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]], codec)
+    val runtimeClass = fm.runtimeClass
+    saveAsHadoopFile(path, getKeyClass, getValueClass, runtimeClass.asInstanceOf[Class[F]], codec)
   }
 
   /**
@@ -599,7 +599,8 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
       keyClass: Class[_],
       valueClass: Class[_],
       outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
-      conf: Configuration = self.context.hadoopConfiguration) {
+      conf: Configuration = self.context.hadoopConfiguration)
+  {
     val job = new NewAPIHadoopJob(conf)
     job.setOutputKeyClass(keyClass)
     job.setOutputValueClass(valueClass)
@@ -613,7 +614,8 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
       // around by taking a mod. We expect that no task will be attempted 2 billion times.
       val attemptNumber = (context.attemptId % Int.MaxValue).toInt
       /* "reduce task" <split #> <attempt # = spark task #> */
-      val attemptId = newTaskAttemptID(jobtrackerID, stageId, false, context.partitionId, attemptNumber)
+      val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId,
+        attemptNumber)
       val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
       val format = outputFormatClass.newInstance
       val committer = format.getOutputCommitter(hadoopContext)
@@ -632,13 +634,12 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
      * however we're only going to use this local OutputCommitter for
      * setupJob/commitJob, so we just use a dummy "map" task.
      */
-    val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, true, 0, 0)
+    val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = true, 0, 0)
     val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
     val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
     jobCommitter.setupJob(jobTaskContext)
     val count = self.context.runJob(self, writeShard _).sum
     jobCommitter.commitJob(jobTaskContext)
-    jobCommitter.cleanupJob(jobTaskContext)
   }
 
   /**
@@ -668,7 +669,9 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
       codec: Option[Class[_ <: CompressionCodec]] = None) {
     conf.setOutputKeyClass(keyClass)
     conf.setOutputValueClass(valueClass)
-    // conf.setOutputFormat(outputFormatClass) // Doesn't work in Scala 2.9 due to what may be a generics bug
+    // Doesn't work in Scala 2.9 due to what may be a generics bug
+    // TODO: Should we uncomment this for Scala 2.10?
+    // conf.setOutputFormat(outputFormatClass)
     conf.set("mapred.output.format.class", outputFormatClass.getName)
     for (c <- codec) {
       conf.setCompressMapOutput(true)
@@ -702,7 +705,8 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
       throw new SparkException("Output value class not set")
     }
 
-    logInfo("Saving as hadoop file of type (" + keyClass.getSimpleName+ ", " + valueClass.getSimpleName+ ")")
+    logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
+      valueClass.getSimpleName + ")")
 
     val writer = new SparkHadoopWriter(conf)
     writer.preSetup()
@@ -728,7 +732,6 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
 
     self.context.runJob(self, writeToFile _)
     writer.commitJob()
-    writer.cleanup()
   }
 
   /**
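
A minimal sketch (assuming an existing SparkContext `sc`) of foldByKey with addition, where 0 is the neutral zero value described above:

    import org.apache.spark.SparkContext._   // implicit conversion to PairRDDFunctions

    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
    val sums = pairs.foldByKey(0)(_ + _)
    // sums.collect() => Array(("a", 4), ("b", 2)), in some order
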
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 3f41b662799873694958b2b699563c0627cab499..f9dc12eee3291ffaaa5a9290176039c4d14a5242 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -23,7 +23,6 @@ import scala.collection.Map
 import scala.collection.JavaConversions.mapAsScalaMap
 import scala.collection.mutable.ArrayBuffer
 
-import scala.collection.mutable.HashMap
 import scala.reflect.{classTag, ClassTag}
 
 import org.apache.hadoop.io.BytesWritable
@@ -52,11 +51,13 @@ import org.apache.spark._
  * partitioned collection of elements that can be operated on in parallel. This class contains the
  * basic operations available on all RDDs, such as `map`, `filter`, and `persist`. In addition,
  * [[org.apache.spark.rdd.PairRDDFunctions]] contains operations available only on RDDs of key-value
- * pairs, such as `groupByKey` and `join`; [[org.apache.spark.rdd.DoubleRDDFunctions]] contains
- * operations available only on RDDs of Doubles; and [[org.apache.spark.rdd.SequenceFileRDDFunctions]]
- * contains operations available on RDDs that can be saved as SequenceFiles. These operations are
- * automatically available on any RDD of the right type (e.g. RDD[(Int, Int)] through implicit
- * conversions when you `import org.apache.spark.SparkContext._`.
+ * pairs, such as `groupByKey` and `join`;
+ * [[org.apache.spark.rdd.DoubleRDDFunctions]] contains operations available only on RDDs of
+ * Doubles; and
+ * [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that
+ * can be saved as SequenceFiles.
+ * These operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)]
+ * through implicit conversions when you `import org.apache.spark.SparkContext._`.
  *
  * Internally, each RDD is characterized by five main properties:
  *
@@ -235,12 +236,9 @@ abstract class RDD[T: ClassTag](
   /**
    * Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing.
    */
-  private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = {
-    if (isCheckpointed) {
-      firstParent[T].iterator(split, context)
-    } else {
-      compute(split, context)
-    }
+  private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
+  {
+    if (isCheckpointed) firstParent[T].iterator(split, context) else compute(split, context)
   }
 
   // Transformations (return a new RDD)
@@ -268,6 +266,9 @@ abstract class RDD[T: ClassTag](
   def distinct(numPartitions: Int): RDD[T] =
     map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
 
+  /**
+   * Return a new RDD containing the distinct elements in this RDD.
+   */
   def distinct(): RDD[T] = distinct(partitions.size)
 
   /**
@@ -280,7 +281,7 @@ abstract class RDD[T: ClassTag](
    * which can avoid performing a shuffle.
    */
   def repartition(numPartitions: Int): RDD[T] = {
-    coalesce(numPartitions, true)
+    coalesce(numPartitions, shuffle = true)
   }
 
   /**
@@ -646,7 +647,8 @@ abstract class RDD[T: ClassTag](
   }
 
   /**
-   * Reduces the elements of this RDD using the specified commutative and associative binary operator.
+   * Reduces the elements of this RDD using the specified commutative and
+   * associative binary operator.
    */
   def reduce(f: (T, T) => T): T = {
     val cleanF = sc.clean(f)
@@ -953,7 +955,7 @@ abstract class RDD[T: ClassTag](
   private var storageLevel: StorageLevel = StorageLevel.NONE
 
   /** Record user function generating this RDD. */
-  @transient private[spark] val origin = sc.getCallSite
+  @transient private[spark] val origin = sc.getCallSite()
 
   private[spark] def elementClassTag: ClassTag[T] = classTag[T]
 
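
A minimal sketch (assuming an existing SparkContext `sc`) of the distinct, repartition and coalesce calls documented above:

    val rdd = sc.parallelize(Seq(1, 1, 2, 3, 3), 4)

    val unique = rdd.distinct()        // defaults to the current number of partitions
    val wider = rdd.repartition(8)     // equivalent to coalesce(8, shuffle = true)
    val narrower = rdd.coalesce(2)     // narrows to 2 partitions without a shuffle
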
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index e22b1e53e80482149fc19d7c65d5930a71a352d8..c52d6175d28fa8775a6c88da0df887c8ee79a35e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -31,7 +31,7 @@ import org.apache.spark.util.Utils
 private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedulerImpl)
   extends Logging {
 
-  private val THREADS = sparkEnv.conf.get("spark.resultGetter.threads", "4").toInt
+  private val THREADS = sparkEnv.conf.getInt("spark.resultGetter.threads", 4)
   private val getTaskResultExecutor = Utils.newDaemonFixedThreadPool(
     THREADS, "Result resolver thread")
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 0c8ed6275991a4cd7f8def626244c1f73ed8cd28..d4f74d3e1854344af2186d063d80090760b4bf40 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -51,15 +51,15 @@ private[spark] class TaskSchedulerImpl(
     isLocal: Boolean = false)
   extends TaskScheduler with Logging
 {
-  def this(sc: SparkContext) = this(sc, sc.conf.get("spark.task.maxFailures", "4").toInt)
+  def this(sc: SparkContext) = this(sc, sc.conf.getInt("spark.task.maxFailures", 4))
 
   val conf = sc.conf
 
   // How often to check for speculative tasks
-  val SPECULATION_INTERVAL = conf.get("spark.speculation.interval", "100").toLong
+  val SPECULATION_INTERVAL = conf.getLong("spark.speculation.interval", 100)
 
   // Threshold above which we warn user initial TaskSet may be starved
-  val STARVATION_TIMEOUT = conf.get("spark.starvation.timeout", "15000").toLong
+  val STARVATION_TIMEOUT = conf.getLong("spark.starvation.timeout", 15000)
 
   // TaskSetManagers are not thread safe, so any access to one should be synchronized
   // on this class.
@@ -125,7 +125,7 @@ private[spark] class TaskSchedulerImpl(
   override def start() {
     backend.start()
 
-    if (!isLocal && conf.get("spark.speculation", "false").toBoolean) {
+    if (!isLocal && conf.getBoolean("spark.speculation", false)) {
       logInfo("Starting speculative execution thread")
       import sc.env.actorSystem.dispatcher
       sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 6dd1469d8f801b829a739f878bec88c1b8746b97..a10e5397ad4a607de4347bc8ac623f550c711a7e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -57,11 +57,11 @@ private[spark] class TaskSetManager(
   val conf = sched.sc.conf
 
   // CPUs to request per task
-  val CPUS_PER_TASK = conf.get("spark.task.cpus", "1").toInt
+  val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)
 
   // Quantile of tasks at which to start speculation
-  val SPECULATION_QUANTILE = conf.get("spark.speculation.quantile", "0.75").toDouble
-  val SPECULATION_MULTIPLIER = conf.get("spark.speculation.multiplier", "1.5").toDouble
+  val SPECULATION_QUANTILE = conf.getDouble("spark.speculation.quantile", 0.75)
+  val SPECULATION_MULTIPLIER = conf.getDouble("spark.speculation.multiplier", 1.5)
 
   // Serializer for closures and tasks.
   val env = SparkEnv.get
@@ -116,7 +116,7 @@ private[spark] class TaskSetManager(
 
   // How frequently to reprint duplicate exceptions in full, in milliseconds
   val EXCEPTION_PRINT_INTERVAL =
-    conf.get("spark.logging.exceptionPrintInterval", "10000").toLong
+    conf.getLong("spark.logging.exceptionPrintInterval", 10000)
 
   // Map of recent exceptions (identified by string representation and top stack frame) to
   // duplicate count (how many times the same exception has appeared) and time the full exception
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 2f5bcafe40394d24a28bfd66915bb98436652f2f..8d596a76c224c079ab7a5729c9689686beb5c329 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -63,7 +63,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
       context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
 
       // Periodically revive offers to allow delay scheduling to work
-      val reviveInterval = conf.get("spark.scheduler.revive.interval", "1000").toLong
+      val reviveInterval = conf.getLong("spark.scheduler.revive.interval", 1000)
       import context.dispatcher
       context.system.scheduler.schedule(0.millis, reviveInterval.millis, self, ReviveOffers)
     }
@@ -209,8 +209,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
   }
 
   override def defaultParallelism(): Int = {
-    conf.getOption("spark.default.parallelism").map(_.toInt).getOrElse(
-      math.max(totalCoreCount.get(), 2))
+    conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
   }
 
   // Called by subclasses when notified of a lost worker
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
index b44d1e43c85c770b5f7d3d76c5d29d556be2e93d..d99c76117c168a63597d56482f4eeb90cd9e3a9a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
@@ -33,7 +33,7 @@ private[spark] class SimrSchedulerBackend(
   val tmpPath = new Path(driverFilePath + "_tmp")
   val filePath = new Path(driverFilePath)
 
-  val maxCores = conf.get("spark.simr.executor.cores", "1").toInt
+  val maxCores = conf.getInt("spark.simr.executor.cores", 1)
 
   override def start() {
     super.start()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index d46fceba8918a1cc76deea05b938315072f86ff3..e16d60c54cc1cb1d8b2c3a6189867e8e4a186385 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -77,7 +77,7 @@ private[spark] class CoarseMesosSchedulerBackend(
     "Spark home is not set; set it through the spark.home system " +
     "property, the SPARK_HOME environment variable or the SparkContext constructor"))
 
-  val extraCoresPerSlave = conf.get("spark.mesos.extra.cores", "0").toInt
+  val extraCoresPerSlave = conf.getInt("spark.mesos.extra.cores", 0)
 
   var nextMesosTaskId = 0
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index ae8d527352f733b5f8e002372b0a422e0b33c8c6..b428c82a48f5969afca3ef317a189dd4533f012d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -340,5 +340,5 @@ private[spark] class MesosSchedulerBackend(
   }
 
   // TODO: query Mesos for number of cores
-  override def defaultParallelism() = sc.conf.get("spark.default.parallelism", "8").toInt
+  override def defaultParallelism() = sc.conf.getInt("spark.default.parallelism", 8)
 }
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index a24a3b04b87ccf57f365143781b98226b4c256a0..c14cd4755698776cff65a4abe24be5f1a2b26d5c 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -36,7 +36,7 @@ import org.apache.spark.storage.{GetBlock, GotBlock, PutBlock}
  */
 class KryoSerializer(conf: SparkConf) extends org.apache.spark.serializer.Serializer with Logging {
   private val bufferSize = {
-    conf.get("spark.kryoserializer.buffer.mb", "2").toInt * 1024 * 1024
+    conf.getInt("spark.kryoserializer.buffer.mb", 2) * 1024 * 1024
   }
 
   def newKryoOutput() = new KryoOutput(bufferSize)
@@ -48,7 +48,7 @@ class KryoSerializer(conf: SparkConf) extends org.apache.spark.serializer.Serial
 
     // Allow disabling Kryo reference tracking if user knows their object graphs don't have loops.
     // Do this before we invoke the user registrator so the user registrator can override this.
-    kryo.setReferences(conf.get("spark.kryo.referenceTracking", "true").toBoolean)
+    kryo.setReferences(conf.getBoolean("spark.kryo.referenceTracking", true))
 
     for (cls <- KryoSerializer.toRegister) kryo.register(cls)
 
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
index 47478631a11f0289282250fe9ba4d00622479236..4fa2ab96d97255c0d8235e0077697a33caa97889 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
@@ -327,7 +327,7 @@ object BlockFetcherIterator {
         fetchRequestsSync.put(request)
       }
 
-      copiers = startCopiers(conf.get("spark.shuffle.copier.threads", "6").toInt)
+      copiers = startCopiers(conf.getInt("spark.shuffle.copier.threads", 6))
       logInfo("Started " + fetchRequestsSync.size + " remote gets in " +
         Utils.getUsedTimeMs(startTime))
 
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 6d2cda97b04ebb5c9e08b2ee33c6450a2499a449..c56e2ca2df08c502c10a5153eb45cbc513201848 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -58,8 +58,8 @@ private[spark] class BlockManager(
 
   // If we use Netty for shuffle, start a new Netty-based shuffle sender service.
   private val nettyPort: Int = {
-    val useNetty = conf.get("spark.shuffle.use.netty", "false").toBoolean
-    val nettyPortConfig = conf.get("spark.shuffle.sender.port", "0").toInt
+    val useNetty = conf.getBoolean("spark.shuffle.use.netty", false)
+    val nettyPortConfig = conf.getInt("spark.shuffle.sender.port", 0)
     if (useNetty) diskBlockManager.startShuffleBlockSender(nettyPortConfig) else 0
   }
 
@@ -72,14 +72,14 @@ private[spark] class BlockManager(
   // Max megabytes of data to keep in flight per reducer (to avoid over-allocating memory
   // for receiving shuffle outputs)
   val maxBytesInFlight =
-    conf.get("spark.reducer.maxMbInFlight", "48").toLong * 1024 * 1024
+    conf.getLong("spark.reducer.maxMbInFlight", 48) * 1024 * 1024
 
   // Whether to compress broadcast variables that are stored
-  val compressBroadcast = conf.get("spark.broadcast.compress", "true").toBoolean
+  val compressBroadcast = conf.getBoolean("spark.broadcast.compress", true)
   // Whether to compress shuffle output that are stored
-  val compressShuffle = conf.get("spark.shuffle.compress", "true").toBoolean
+  val compressShuffle = conf.getBoolean("spark.shuffle.compress", true)
   // Whether to compress RDD partitions that are stored serialized
-  val compressRdds = conf.get("spark.rdd.compress", "false").toBoolean
+  val compressRdds = conf.getBoolean("spark.rdd.compress", false)
 
   val heartBeatFrequency = BlockManager.getHeartBeatFrequency(conf)
 
@@ -443,7 +443,7 @@ private[spark] class BlockManager(
       : BlockFetcherIterator = {
 
     val iter =
-      if (conf.get("spark.shuffle.use.netty", "false").toBoolean) {
+      if (conf.getBoolean("spark.shuffle.use.netty", false)) {
         new BlockFetcherIterator.NettyBlockFetcherIterator(this, blocksByAddress, serializer)
       } else {
         new BlockFetcherIterator.BasicBlockFetcherIterator(this, blocksByAddress, serializer)
@@ -469,7 +469,7 @@ private[spark] class BlockManager(
   def getDiskWriter(blockId: BlockId, file: File, serializer: Serializer, bufferSize: Int)
     : BlockObjectWriter = {
     val compressStream: OutputStream => OutputStream = wrapForCompression(blockId, _)
-    val syncWrites = conf.get("spark.shuffle.sync", "false").toBoolean
+    val syncWrites = conf.getBoolean("spark.shuffle.sync", false)
     new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream, syncWrites)
   }
 
@@ -864,15 +864,15 @@ private[spark] object BlockManager extends Logging {
   val ID_GENERATOR = new IdGenerator
 
   def getMaxMemory(conf: SparkConf): Long = {
-    val memoryFraction = conf.get("spark.storage.memoryFraction", "0.66").toDouble
+    val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.66)
     (Runtime.getRuntime.maxMemory * memoryFraction).toLong
   }
 
   def getHeartBeatFrequency(conf: SparkConf): Long =
-    conf.get("spark.storage.blockManagerTimeoutIntervalMs", "60000").toLong / 4
+    conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60000) / 4
 
   def getDisableHeartBeatsForTesting(conf: SparkConf): Boolean =
-    conf.get("spark.test.disableBlockManagerHeartBeat", "false").toBoolean
+    conf.getBoolean("spark.test.disableBlockManagerHeartBeat", false)
 
   /**
    * Attempt to clean up a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index 51a29ed8ef81ad2818b86f5fad6bbc6a874c2378..c54e4f2664753c8a8f3f79b0a9e4dd2c98604612 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -30,8 +30,8 @@ import org.apache.spark.util.AkkaUtils
 private[spark]
 class BlockManagerMaster(var driverActor : ActorRef, conf: SparkConf) extends Logging {
 
-  val AKKA_RETRY_ATTEMPTS: Int = conf.get("spark.akka.num.retries", "3").toInt
-  val AKKA_RETRY_INTERVAL_MS: Int = conf.get("spark.akka.retry.wait", "3000").toInt
+  val AKKA_RETRY_ATTEMPTS: Int = conf.getInt("spark.akka.num.retries", 3)
+  val AKKA_RETRY_INTERVAL_MS: Int = conf.getInt("spark.akka.retry.wait", 3000)
 
   val DRIVER_AKKA_ACTOR_NAME = "BlockManagerMaster"
 
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index 58452d96574c9b2d78dc148b14d91e0a2b50769e..2c1a4e2f5d3a18ce86faa39831a6403d8ea422e6 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -348,14 +348,19 @@ object BlockManagerMasterActor {
 
       if (storageLevel.isValid) {
         // isValid means it is either stored in-memory or on-disk.
-        _blocks.put(blockId, BlockStatus(storageLevel, memSize, diskSize))
+        // But memSize here indicates the data size in or dropped from memory,
+        // and diskSize here indicates the data size in or dropped to disk.
+        // They can both be larger than 0 when a block is dropped from memory to disk.
+        // Therefore, the safe way to set BlockStatus is to record each storage mode separately.
         if (storageLevel.useMemory) {
+          _blocks.put(blockId, BlockStatus(storageLevel, memSize, 0))
           _remainingMem -= memSize
           logInfo("Added %s in memory on %s (size: %s, free: %s)".format(
             blockId, blockManagerId.hostPort, Utils.bytesToString(memSize),
             Utils.bytesToString(_remainingMem)))
         }
         if (storageLevel.useDisk) {
+          _blocks.put(blockId, BlockStatus(storageLevel, 0, diskSize))
           logInfo("Added %s on disk on %s (size: %s)".format(
             blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize)))
         }
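As the new comment explains, a block dropped from memory to disk can report non-zero sizes for both modes, so the status is now recorded per storage mode rather than in one combined entry. A minimal, self-contained sketch of that bookkeeping (the types below are stand-ins for the Spark classes, not the real ones):

    object BlockStatusSketch {
      // Stand-ins that mirror the fields used in the hunk above.
      case class Level(useMemory: Boolean, useDisk: Boolean)
      case class BlockStatus(level: Level, memSize: Long, diskSize: Long)

      def main(args: Array[String]): Unit = {
        val blocks = scala.collection.mutable.Map[String, BlockStatus]()
        val level = Level(useMemory = true, useDisk = true)
        // After a drop from memory to disk, both reported sizes can be non-zero.
        val (memSize, diskSize) = (10L << 20, 10L << 20)
        // Recording one status per mode keeps the memory accounting from also
        // counting the copy that now lives on disk.
        if (level.useMemory) blocks("rdd_0_0") = BlockStatus(level, memSize, 0)
        if (level.useDisk)   blocks("rdd_0_0") = BlockStatus(level, 0, diskSize)
        println(blocks("rdd_0_0")) // BlockStatus(Level(true,true),0,10485760)
      }
    }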
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 55dcb3742c9677829a900779686142a4f3e6559d..edc1133172d2a4a336525945534bb21e37c8203a 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -38,7 +38,7 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
   extends PathResolver with Logging {
 
   private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
-  private val subDirsPerLocalDir = shuffleManager.conf.get("spark.diskStore.subDirectories", "64").toInt
+  private val subDirsPerLocalDir = shuffleManager.conf.getInt("spark.diskStore.subDirectories", 64)
 
   // Create one local directory for each path mentioned in spark.local.dir; then, inside this
   // directory, create multiple subdirectories that we will hash files into, in order to avoid
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index 39dc7bb19afeed7bd87a230f7a9b9ac23221675a..e2b24298a55e89b3410d8bace91e026795d8f8d4 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -64,9 +64,9 @@ class ShuffleBlockManager(blockManager: BlockManager) {
   // Turning off shuffle file consolidation causes all shuffle Blocks to get their own file.
   // TODO: Remove this once the shuffle file consolidation feature is stable.
   val consolidateShuffleFiles =
-    conf.get("spark.shuffle.consolidateFiles", "false").toBoolean
+    conf.getBoolean("spark.shuffle.consolidateFiles", false)
 
-  private val bufferSize = conf.get("spark.shuffle.file.buffer.kb", "100").toInt * 1024
+  private val bufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
 
   /**
    * Contains all the state related to a particular shuffle. This includes a pool of unused
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index b7b87250b98ee86adf9adc3abe2b6cac733f9bf9..bcd282445050da5b81a8b6072abc12ca577fc0f5 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -33,7 +33,7 @@ import org.apache.spark.scheduler._
  */
 private[spark] class JobProgressListener(val sc: SparkContext) extends SparkListener {
   // How many stages to remember
-  val RETAINED_STAGES = sc.conf.get("spark.ui.retained_stages", "1000").toInt
+  val RETAINED_STAGES = sc.conf.getInt("spark.ui.retainedStages", 1000)
   val DEFAULT_POOL_NAME = "default"
 
   val stageIdToPool = new HashMap[Int, String]()
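The change here is twofold: the key moves from snake_case to camelCase and the lookup uses SparkConf's typed getter. A minimal before/after sketch, assuming a SparkConf with no explicit setting for this key:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
    // Old: string lookup plus manual conversion, under the old key.
    val oldStyle = conf.get("spark.ui.retained_stages", "1000").toInt
    // New: typed getter with a typed default, under the renamed key.
    val newStyle = conf.getInt("spark.ui.retainedStages", 1000)
    // Users who want fewer stages in the UI can still override it, e.g.
    // new SparkConf().set("spark.ui.retainedStages", "200")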
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 3f009a8998cbd7ed1afdecbba4b408b3abd3cee4..761d378c7fd8b5f264151f54a2a78aac435e6368 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -44,13 +44,13 @@ private[spark] object AkkaUtils {
   def createActorSystem(name: String, host: String, port: Int, indestructible: Boolean = false,
     conf: SparkConf): (ActorSystem, Int) = {
 
-    val akkaThreads   = conf.get("spark.akka.threads", "4").toInt
-    val akkaBatchSize = conf.get("spark.akka.batchSize", "15").toInt
+    val akkaThreads   = conf.getInt("spark.akka.threads", 4)
+    val akkaBatchSize = conf.getInt("spark.akka.batchSize", 15)
 
-    val akkaTimeout = conf.get("spark.akka.timeout", "100").toInt
+    val akkaTimeout = conf.getInt("spark.akka.timeout", 100)
 
-    val akkaFrameSize = conf.get("spark.akka.frameSize", "10").toInt
-    val akkaLogLifecycleEvents = conf.get("spark.akka.logLifecycleEvents", "false").toBoolean
+    val akkaFrameSize = conf.getInt("spark.akka.frameSize", 10)
+    val akkaLogLifecycleEvents = conf.getBoolean("spark.akka.logLifecycleEvents", false)
     val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off"
     if (!akkaLogLifecycleEvents) {
       // As a workaround for Akka issue #3787, we coerce the "EndpointWriter" log to be silent.
@@ -58,12 +58,12 @@ private[spark] object AkkaUtils {
       Option(Logger.getLogger("akka.remote.EndpointWriter")).map(l => l.setLevel(Level.FATAL))
     }
 
-    val logAkkaConfig = if (conf.get("spark.akka.logAkkaConfig", "false").toBoolean) "on" else "off"
+    val logAkkaConfig = if (conf.getBoolean("spark.akka.logAkkaConfig", false)) "on" else "off"
 
-    val akkaHeartBeatPauses = conf.get("spark.akka.heartbeat.pauses", "600").toInt
+    val akkaHeartBeatPauses = conf.getInt("spark.akka.heartbeat.pauses", 600)
     val akkaFailureDetector =
-      conf.get("spark.akka.failure-detector.threshold", "300.0").toDouble
-    val akkaHeartBeatInterval = conf.get("spark.akka.heartbeat.interval", "1000").toInt
+      conf.getDouble("spark.akka.failure-detector.threshold", 300.0)
+    val akkaHeartBeatInterval = conf.getInt("spark.akka.heartbeat.interval", 1000)
 
     val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap[String, String]).withFallback(
       ConfigFactory.parseString(
@@ -103,7 +103,7 @@ private[spark] object AkkaUtils {
 
   /** Returns the default Spark timeout to use for Akka ask operations. */
   def askTimeout(conf: SparkConf): FiniteDuration = {
-    Duration.create(conf.get("spark.akka.askTimeout", "30").toLong, "seconds")
+    Duration.create(conf.getLong("spark.akka.askTimeout", 30), "seconds")
   }
 
   /** Returns the default Spark timeout to use for Akka remote actor lookup. */
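Since the timeout is now read through the typed getter, overriding it stays a one-line configuration change. A small sketch of raising it, assuming the SparkConf API used throughout this patch:

    import org.apache.spark.SparkConf
    import scala.concurrent.duration._

    // Raise the Akka ask timeout from the 30-second default to 60 seconds.
    val conf = new SparkConf().set("spark.akka.askTimeout", "60")
    val askTimeout = Duration.create(conf.getLong("spark.akka.askTimeout", 30), "seconds")
    assert(askTimeout == 60.seconds)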
diff --git a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
index aa7f52cafbf37f5f3e45a286652b8308fee82216..3d1e90a3522a4cce1d566e95b2689f9a22167493 100644
--- a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
@@ -74,7 +74,7 @@ object MetadataCleanerType extends Enumeration {
 // initialization of StreamingContext. It's okay for users trying to configure stuff themselves.
 object MetadataCleaner {
   def getDelaySeconds(conf: SparkConf) = {
-    conf.get("spark.cleaner.ttl", "3500").toInt
+    conf.getInt("spark.cleaner.ttl", 3500)
   }
 
   def getDelaySeconds(conf: SparkConf, cleanerType: MetadataCleanerType.MetadataCleanerType): Int =
diff --git a/core/src/main/scala/org/apache/spark/util/XORShiftRandom.scala b/core/src/main/scala/org/apache/spark/util/XORShiftRandom.scala
index e9907e6c855aea4cc945f98c48b59293a441a1d8..08b31ac64f290561d5c5b21edb032e81315099f1 100644
--- a/core/src/main/scala/org/apache/spark/util/XORShiftRandom.scala
+++ b/core/src/main/scala/org/apache/spark/util/XORShiftRandom.scala
@@ -91,4 +91,4 @@ private[spark] object XORShiftRandom {
 
   }
 
-}
\ No newline at end of file
+}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 1eec6726f48bc0900c64c97b0bb9edaaa641705f..c9f6cc5d079b5909459ec4fb843c57c9d0354aef 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -83,7 +83,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
 
   private val conf = new SparkConf
 
-  val LOCALITY_WAIT = conf.get("spark.locality.wait", "3000").toLong
+  val LOCALITY_WAIT = conf.getLong("spark.locality.wait", 3000)
   val MAX_TASK_FAILURES = 4
 
   test("TaskSet with no preferences") {
diff --git a/core/src/test/scala/org/apache/spark/util/XORShiftRandomSuite.scala b/core/src/test/scala/org/apache/spark/util/XORShiftRandomSuite.scala
index b78367b6cac028b217abea0fcbb79c08ab270045..f1d7b61b31e635ba816fdabbcda780363a03ae49 100644
--- a/core/src/test/scala/org/apache/spark/util/XORShiftRandomSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/XORShiftRandomSuite.scala
@@ -73,4 +73,4 @@ class XORShiftRandomSuite extends FunSuite with ShouldMatchers {
 
   }
 
-}
\ No newline at end of file
+}
diff --git a/docs/configuration.md b/docs/configuration.md
index 1d6c3d16333c5ebc86a348113ca98a265bbc94c4..6717757781974b35157d57d94f8397061bd5e3e7 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -130,7 +130,7 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
-  <td>spark.ui.retained_stages</td>
+  <td>spark.ui.retainedStages</td>
   <td>1000</td>
   <td>
     How many stages the Spark UI remembers before garbage collecting.
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 717071d72c9b9ef93eb567a45cda24c82fd22bd1..b20627010798a9fe8461a661c281f12847593eab 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -114,6 +114,8 @@ For example:
     SPARK_YARN_APP_JAR=examples/target/scala-{{site.SCALA_VERSION}}/spark-examples-assembly-{{site.SPARK_VERSION}}.jar \
     MASTER=yarn-client ./bin/spark-shell
 
+You can also send extra files to the YARN cluster for the workers to use by exporting SPARK_YARN_DIST_FILES=file1,file2,...
+
 # Building Spark for Hadoop/YARN 2.2.x
 
 See [Building Spark with Maven](building-with-maven.html) for instructions on how to build Spark using the Maven process.
diff --git a/make-distribution.sh b/make-distribution.sh
index 1a3a5d0209ccf8a295338e26735dfac06b41408c..e6b5956d1e7e25efd0c5bb49cb6e549e0dc01f63 100755
--- a/make-distribution.sh
+++ b/make-distribution.sh
@@ -44,7 +44,7 @@ DISTDIR="$FWDIR/dist"
 # Get version from SBT
 export TERM=dumb   # Prevents color codes in SBT output
 
-VERSIONSTRING=$FWDIR/sbt/sbt "show version"
+VERSIONSTRING=$($FWDIR/sbt/sbt "show version")
 
 if [ $? == -1 ] ;then
     echo -e "You need sbt installed and available on your path."
diff --git a/pom.xml b/pom.xml
index c2b1a7795a25657694d7525b78f6b373945b106b..6e2dd33d4971667ca29d2139042c41df3444faa3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -145,6 +145,11 @@
         <enabled>false</enabled>
       </snapshots>
     </repository>
+    <repository>
+      <id>cloudera-repo</id>
+      <name>Cloudera Repository</name>
+      <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
+    </repository>
   </repositories>
 
   <dependencyManagement>
diff --git a/sbt/sbt b/sbt/sbt
index 7f47d90cf11bbcef036fd4c5d863fcf8fd4e2052..62ead8a69dbf69d21ee4b5a5e3fa9dcc86006451 100755
--- a/sbt/sbt
+++ b/sbt/sbt
@@ -25,37 +25,26 @@ URL1=http://typesafe.artifactoryonline.com/typesafe/ivy-releases/org.scala-sbt/s
 URL2=http://repo.typesafe.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch.jar
 JAR=sbt/sbt-launch-${SBT_VERSION}.jar
 
-printf "Checking for system sbt ["
-if hash sbt 2>/dev/null; then 
-  printf "FOUND]\n"
-  # Use System SBT
-  sbt "$@"
-else
-  printf "NOT FOUND]\n"
-  # Download sbt or use already downloaded
-  if [ ! -d .sbtlib ]; then
-    mkdir .sbtlib
-  fi
-  if [ ! -f ${JAR} ]; then
-    # Download
-    printf "Attempting to fetch sbt\n"
-    if hash curl 2>/dev/null; then
-      curl --progress-bar ${URL1} > ${JAR} || curl --progress-bar ${URL2} > ${JAR}
-    elif hash wget 2>/dev/null; then
-      wget --progress=bar ${URL1} -O ${JAR} || wget --progress=bar ${URL2} -O ${JAR}
-    else
-      printf "You do not have curl or wget installed, please install sbt manually from http://www.scala-sbt.org/\n"
-      exit -1
-    fi
-  fi
-  if [ ! -f ${JAR} ]; then
-    # We failed to download
-    printf "Our attempt to download sbt locally to ${JAR} failed. Please install sbt manually from http://www.scala-sbt.org/\n"
+# Download sbt launch jar if it hasn't been downloaded yet
+if [ ! -f ${JAR} ]; then
+  # Download
+  printf "Attempting to fetch sbt\n"
+  if hash curl 2>/dev/null; then
+    curl --progress-bar ${URL1} > ${JAR} || curl --progress-bar ${URL2} > ${JAR}
+  elif hash wget 2>/dev/null; then
+    wget --progress=bar ${URL1} -O ${JAR} || wget --progress=bar ${URL2} -O ${JAR}
+  else
+    printf "You do not have curl or wget installed, please install sbt manually from http://www.scala-sbt.org/\n"
     exit -1
   fi
-  printf "Launching sbt from ${JAR}\n"
-  java \
-    -Xmx1200m -XX:MaxPermSize=350m -XX:ReservedCodeCacheSize=256m \
-    -jar ${JAR} \
-    "$@"
 fi
+if [ ! -f ${JAR} ]; then
+  # We failed to download
+  printf "Our attempt to download sbt locally to ${JAR} failed. Please install sbt manually from http://www.scala-sbt.org/\n"
+  exit -1
+fi
+printf "Launching sbt from ${JAR}\n"
+java \
+  -Xmx1200m -XX:MaxPermSize=350m -XX:ReservedCodeCacheSize=256m \
+  -jar ${JAR} \
+  "$@"
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
index 27d474c0a0459a3aa556ba8e7bea3bb967396dbc..d41f726f8322c9275fdebbfefc0d925702cf344f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
@@ -175,7 +175,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
   private class NetworkReceiverActor extends Actor {
     logInfo("Attempting to register with tracker")
     val ip = env.conf.get("spark.driver.host", "localhost")
-    val port = env.conf.get("spark.driver.port", "7077").toInt
+    val port = env.conf.getInt("spark.driver.port", 7077)
     val url = "akka.tcp://spark@%s:%s/user/NetworkInputTracker".format(ip, port)
     val tracker = env.actorSystem.actorSelection(url)
     val timeout = 5.seconds
@@ -212,7 +212,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
     case class Block(id: BlockId, buffer: ArrayBuffer[T], metadata: Any = null)
 
     val clock = new SystemClock()
-    val blockInterval = env.conf.get("spark.streaming.blockInterval", "200").toLong
+    val blockInterval = env.conf.getLong("spark.streaming.blockInterval", 200)
     val blockIntervalTimer = new RecurringTimer(clock, blockInterval, updateCurrentBuffer)
     val blockStorageLevel = storageLevel
     val blocksForPushing = new ArrayBlockingQueue[Block](1000)
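The block interval directly controls how many blocks (and thus tasks) each receiver produces. A rough back-of-the-envelope sketch, assuming the 200 ms default shown above:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
    val blockIntervalMs = conf.getLong("spark.streaming.blockInterval", 200)
    // With the 200 ms default a receiver cuts roughly 5 blocks per second;
    // halving the interval doubles the number of (smaller) blocks per batch.
    val blocksPerSecond = 1000.0 / blockIntervalMs // 5.0 with the default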
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
index 7341bfbc99399b94a1143e12752c1120bf3fbdb3..c8ee93bf5bde706f90cb23c6cd11f64635da9023 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
@@ -38,4 +38,4 @@ class Job(val time: Time, func: () => _) {
   }
 
   override def toString = id
-}
\ No newline at end of file
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 917b4c57f62f893c3a89bc764782414c5c3f454a..2fa6853ae0613968b35806032438d3388714116b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -110,7 +110,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
     // or if the property is defined set it to that time
     if (clock.isInstanceOf[ManualClock]) {
       val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds
-      val jumpTime = ssc.sc.conf.get("spark.streaming.manualClock.jump", "0").toLong
+      val jumpTime = ssc.sc.conf.getLong("spark.streaming.manualClock.jump", 0)
       clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime)
     }
 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 9304fc1a9338d6b7336a607ccbd09346fcc6ce9b..30c070c274d85e0f487447fb467828c68f7d8d99 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -31,7 +31,7 @@ private[streaming]
 class JobScheduler(val ssc: StreamingContext) extends Logging {
 
   val jobSets = new ConcurrentHashMap[Time, JobSet]
-  val numConcurrentJobs = ssc.conf.get("spark.streaming.concurrentJobs", "1").toInt
+  val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
   val executor = Executors.newFixedThreadPool(numConcurrentJobs)
   val generator = new JobGenerator(this)
   val listenerBus = new StreamingListenerBus()
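The job thread pool is sized directly from this setting, so by default only one output job runs at a time. A short sketch of widening it, assuming the standard SparkConf API:

    import java.util.concurrent.Executors
    import org.apache.spark.SparkConf

    // Allow two streaming output jobs to run concurrently instead of one.
    val conf = new SparkConf().set("spark.streaming.concurrentJobs", "2")
    val numConcurrentJobs = conf.getInt("spark.streaming.concurrentJobs", 1)
    val executor = Executors.newFixedThreadPool(numConcurrentJobs) // pool of 2 threads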
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 6abb4d501779fd3193c3050d9eb1991576db0178..23781ea35c670803b4625bff4087866bdaf269ea 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -102,7 +102,7 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
       (System.getenv("SPARK_JAR") == null) -> "Error: You must set SPARK_JAR environment variable!",
       (args.userJar == null) -> "Error: You must specify a user jar!",
       (args.userClass == null) -> "Error: You must specify a user class!",
-      (args.numWorkers <= 0) -> "Error: You must specify atleast 1 worker!",
+      (args.numWorkers <= 0) -> "Error: You must specify at least 1 worker!",
       (args.amMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: AM memory size must be " +
         "greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD),
       (args.workerMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: Worker memory size " +
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 324ef4616fe268b4005a2ec014fb0a8636d4e2e2..4b1b5da048df4d801dacb24f7df18245e98735ac 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -39,6 +39,7 @@ private[spark] class YarnClientSchedulerBackend(
     val defaultWorkerNumber = "1"
 
     val userJar = System.getenv("SPARK_YARN_APP_JAR")
+    val distFiles = System.getenv("SPARK_YARN_DIST_FILES")
     var workerCores = System.getenv("SPARK_WORKER_CORES")
     var workerMemory = System.getenv("SPARK_WORKER_MEMORY")
     var workerNumber = System.getenv("SPARK_WORKER_INSTANCES")
@@ -64,7 +65,8 @@ private[spark] class YarnClientSchedulerBackend(
       "--worker-memory", workerMemory,
       "--worker-cores", workerCores,
       "--num-workers", workerNumber,
-      "--master-class", "org.apache.spark.deploy.yarn.WorkerLauncher"
+      "--master-class", "org.apache.spark.deploy.yarn.WorkerLauncher",
+      "--files", distFiles
     )
 
     val args = new ClientArguments(argsArray, conf)
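The new environment variable is passed straight through as the client's --files argument. A sketch of that wiring (the fallback file list below is purely illustrative, not part of the patch):

    // Comma-separated local files to ship to the YARN containers.
    val distFiles = Option(System.getenv("SPARK_YARN_DIST_FILES"))
      .getOrElse("data/lookup.csv,conf/app.conf") // illustrative fallback only
    val extraArgs = Array("--files", distFiles)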
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 440ad5cde54eb4c8812bbb510329a2928c0a051e..be323d77835a8892eb3e481eba83f31f7dc3e8b9 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -122,7 +122,7 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
       (System.getenv("SPARK_JAR") == null) -> "Error: You must set SPARK_JAR environment variable!",
       (args.userJar == null) -> "Error: You must specify a user jar!",
       (args.userClass == null) -> "Error: You must specify a user class!",
-      (args.numWorkers <= 0) -> "Error: You must specify atleast 1 worker!",
+      (args.numWorkers <= 0) -> "Error: You must specify at least 1 worker!",
       (args.amMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: AM memory size must be" +
         "greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD),
       (args.workerMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: Worker memory size" +