diff --git a/bin/pyspark b/bin/pyspark
index d6810f4686bf56ae9f9caa1c620c2a2633135f21..ed6f8da73035a5176e1f184d42e1a113d725e92c 100755
--- a/bin/pyspark
+++ b/bin/pyspark
@@ -59,12 +59,7 @@ if [ -n "$IPYTHON_OPTS" ]; then
 fi
 
 if [[ "$IPYTHON" = "1" ]] ; then
-  # IPython <1.0.0 doesn't honor PYTHONSTARTUP, while 1.0.0+ does. 
-  # Hence we clear PYTHONSTARTUP and use the -c "%run $IPYTHONSTARTUP" command which works on all versions
-  # We also force interactive mode with "-i"
-  IPYTHONSTARTUP=$PYTHONSTARTUP
-  PYTHONSTARTUP=
-  exec ipython "$IPYTHON_OPTS" -i -c "%run $IPYTHONSTARTUP"
+  exec ipython $IPYTHON_OPTS
 else
   exec "$PYSPARK_PYTHON" "$@"
 fi
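
Note: with the simplified branch above, whatever is in `IPYTHON_OPTS` is handed to `ipython` unchanged (setting `IPYTHON_OPTS` is meant to imply IPython mode via the `if [ -n "$IPYTHON_OPTS" ]` block earlier in this script). A minimal sketch of the two launch styles, assuming IPython 1.0.0+ is installed:

    $ IPYTHON=1 ./bin/pyspark
    $ IPYTHON_OPTS="notebook" ./bin/pyspark    # options pass straight through, e.g. to pick the notebook front end
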
diff --git a/conf/log4j.properties.template b/conf/log4j.properties.template
index 17d1978dde4d7ffad83621c4c548b844852462b7..f7f853559468ae105b3b66cc8d856eaaaa25b044 100644
--- a/conf/log4j.properties.template
+++ b/conf/log4j.properties.template
@@ -5,5 +5,7 @@ log4j.appender.console.target=System.err
 log4j.appender.console.layout=org.apache.log4j.PatternLayout
 log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
 
-# Ignore messages below warning level from Jetty, because it's a bit verbose
+# Settings to quiet third party logs that are too verbose
 log4j.logger.org.eclipse.jetty=WARN
+log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
+log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
diff --git a/core/src/main/resources/org/apache/spark/log4j-defaults.properties b/core/src/main/resources/org/apache/spark/log4j-defaults.properties
index 17d1978dde4d7ffad83621c4c548b844852462b7..f7f853559468ae105b3b66cc8d856eaaaa25b044 100644
--- a/core/src/main/resources/org/apache/spark/log4j-defaults.properties
+++ b/core/src/main/resources/org/apache/spark/log4j-defaults.properties
@@ -5,5 +5,7 @@ log4j.appender.console.target=System.err
 log4j.appender.console.layout=org.apache.log4j.PatternLayout
 log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
 
-# Ignore messages below warning level from Jetty, because it's a bit verbose
+# Settings to quiet third party logs that are too verbose
 log4j.logger.org.eclipse.jetty=WARN
+log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
+log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index d83c88985a950b85a4e565697b7b71b40cd4816b..139048d5c72f1111909f008b33d5bd68918c6eef 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -677,10 +677,10 @@ class SparkContext(
         key = uri.getScheme match {
           // A JAR file which exists only on the driver node
           case null | "file" =>
-            if (SparkHadoopUtil.get.isYarnMode()) {
-              // In order for this to work on yarn the user must specify the --addjars option to
-              // the client to upload the file into the distributed cache to make it show up in the
-              // current working directory.
+            if (SparkHadoopUtil.get.isYarnMode() && master == "yarn-standalone") {
+            // In order for this to work in yarn standalone mode the user must specify the
+            // --addjars option to the client to upload the file into the distributed cache
+              // of the AM to make it show up in the current working directory.
               val fileName = new Path(uri.getPath).getName()
               try {
                 env.httpFileServer.addJar(new File(fileName))
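
Note: the comment above relies on the YARN client's jar-distribution flag. As a rough sketch only (paths and the main class are placeholders, and the flag spellings should be checked against ClientArguments for this release), uploading a driver-local jar into the distributed cache looks like:

    SPARK_JAR=<path/to/spark-assembly.jar> ./bin/spark-class org.apache.spark.deploy.yarn.Client \
      --jar <path/to/your-app.jar> \
      --class <YourAppMainClass> \
      --args <app arguments> \
      --addJars <path/to/jar/used/via/SparkContext.addJar>
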
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
index dbb0cb90f51862ba44135af89ebe26f534557b46..9485bfd89eb5797c21bdd06ea217c6cb0bb89508 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
@@ -67,11 +67,11 @@ private[spark] class ApplicationPage(parent: MasterWebUI) {
               <li><strong>User:</strong> {app.desc.user}</li>
               <li><strong>Cores:</strong>
                 {
-                if (app.desc.maxCores == Integer.MAX_VALUE) {
+                if (app.desc.maxCores == None) {
                   "Unlimited (%s granted)".format(app.coresGranted)
                 } else {
                   "%s (%s granted, %s left)".format(
-                    app.desc.maxCores, app.coresGranted, app.coresLeft)
+                    app.desc.maxCores.get, app.coresGranted, app.coresLeft)
                 }
                 }
               </li>
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 043e01dbfbf28d930eb08ad6e03a12be5ab5a0f2..38b536023b4504a60eb8ba5f57a95f6d38f569ba 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -106,7 +106,7 @@ class DAGScheduler(
   // The time, in millis, to wait for fetch failure events to stop coming in after one is detected;
   // this is a simplistic way to avoid resubmitting tasks in the non-fetchable map stage one by one
   // as more failure events come in
-  val RESUBMIT_TIMEOUT = 50.milliseconds
+  val RESUBMIT_TIMEOUT = 200.milliseconds
 
   // The time, in millis, to wake up between polls of the completion queue in order to potentially
   // resubmit failed stages
@@ -196,7 +196,7 @@ class DAGScheduler(
        */
       def receive = {
         case event: DAGSchedulerEvent =>
-          logDebug("Got event of type " + event.getClass.getName)
+          logTrace("Got event of type " + event.getClass.getName)
 
           /**
            * All events are forwarded to `processEvent()`, so that the event processing logic can
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index e2b24298a55e89b3410d8bace91e026795d8f8d4..6e0ff143b7179c78ae715064b1a25a826d257d97 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -64,7 +64,7 @@ class ShuffleBlockManager(blockManager: BlockManager) {
   // Turning off shuffle file consolidation causes all shuffle Blocks to get their own file.
   // TODO: Remove this once the shuffle file consolidation feature is stable.
   val consolidateShuffleFiles =
-    conf.getBoolean("spark.shuffle.consolidateFiles", false)
+    conf.getBoolean("spark.shuffle.consolidateFiles", true)
 
   private val bufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
 
diff --git a/docs/configuration.md b/docs/configuration.md
index 6717757781974b35157d57d94f8397061bd5e3e7..b1a0e19167d1fa01a5d780b1243e94dfa477f057 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -371,7 +371,7 @@ Apart from these, the following properties are also available, and may be useful
 
 <tr>
   <td>spark.shuffle.consolidateFiles</td>
-  <td>false</td>
+  <td>true</td>
   <td>
     If set to "true", consolidates intermediate files created during a shuffle. Creating fewer files can improve filesystem performance for shuffles with large numbers of reduce tasks. It is recommended to set this to "true" when using ext4 or xfs filesystems. On ext3, this option might degrade performance on machines with many (>8) cores due to filesystem limitations.
   </td>
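
Note: since this flips the shipped default to "true", it may be worth noting that the old behaviour is still one property away. A sketch, assuming `spark.*` Java system properties are still picked up by SparkConf on this branch:

    $ SPARK_JAVA_OPTS="-Dspark.shuffle.consolidateFiles=false" ./bin/spark-shell
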
diff --git a/docs/python-programming-guide.md b/docs/python-programming-guide.md
index dc187b3efec9b7b7b8f6075275a4faf9da40b6fe..c4236f83124b213a9293e2f1dece69d5edc06dd3 100644
--- a/docs/python-programming-guide.md
+++ b/docs/python-programming-guide.md
@@ -99,8 +99,9 @@ $ MASTER=local[4] ./bin/pyspark
 
 ## IPython
 
-It is also possible to launch PySpark in [IPython](http://ipython.org), the enhanced Python interpreter.
-To do this, set the `IPYTHON` variable to `1` when running `bin/pyspark`:
+It is also possible to launch PySpark in [IPython](http://ipython.org), the
+enhanced Python interpreter. PySpark works with IPython 1.0.0 and later. To
+use IPython, set the `IPYTHON` variable to `1` when running `bin/pyspark`:
 
 {% highlight bash %}
 $ IPYTHON=1 ./bin/pyspark
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index b20627010798a9fe8461a661c281f12847593eab..3bd62646bab060eba316406fa9a1c7a4983a31b1 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -101,7 +101,19 @@ With this mode, your application is actually run on the remote machine where the
 
 With yarn-client mode, the application will be launched locally. Just like running application or spark-shell on Local / Mesos / Standalone mode. The launch method is also the similar with them, just make sure that when you need to specify a master url, use "yarn-client" instead. And you also need to export the env value for SPARK_JAR and SPARK_YARN_APP_JAR
 
-In order to tune worker core/number/memory etc. You need to export SPARK_WORKER_CORES, SPARK_WORKER_MEMORY, SPARK_WORKER_INSTANCES e.g. by ./conf/spark-env.sh
+Configuration in yarn-client mode:
+
+In order to tune worker cores/number/memory etc., you need to export environment variables or add them to the spark configuration file (./conf/spark-env.sh). The following is the list of options, followed by an example.
+
+* `SPARK_YARN_APP_JAR`, Path to your application's JAR file (required)
+* `SPARK_WORKER_INSTANCES`, Number of workers to start (Default: 2)
+* `SPARK_WORKER_CORES`, Number of cores per worker (Default: 1)
+* `SPARK_WORKER_MEMORY`, Memory per worker (e.g. 1000M, 2G) (Default: 1G)
+* `SPARK_MASTER_MEMORY`, Memory for the master (e.g. 1000M, 2G) (Default: 512M)
+* `SPARK_YARN_APP_NAME`, The name of your application (Default: Spark)
+* `SPARK_YARN_QUEUE`, The Hadoop queue to use for allocation requests (Default: 'default')
+* `SPARK_YARN_DIST_FILES`, Comma-separated list of files to be distributed with the job
+* `SPARK_YARN_DIST_ARCHIVES`, Comma-separated list of archives to be distributed with the job
 
 For example:
 
@@ -114,7 +126,6 @@ For example:
     SPARK_YARN_APP_JAR=examples/target/scala-{{site.SCALA_VERSION}}/spark-examples-assembly-{{site.SPARK_VERSION}}.jar \
     MASTER=yarn-client ./bin/spark-shell
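
Note: a fuller launch line using the new optional variables documented above might look like the sketch below (the assembly and application jar paths are placeholders; the variables are the ones listed in this patch):

    SPARK_JAR=<path/to/spark-assembly.jar> \
    SPARK_YARN_APP_JAR=<path/to/your-app.jar> \
    SPARK_WORKER_INSTANCES=3 SPARK_WORKER_CORES=2 SPARK_WORKER_MEMORY=2G \
    SPARK_YARN_QUEUE=default SPARK_YARN_APP_NAME=my-app \
    MASTER=yarn-client ./bin/spark-shell
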
 
-You can also send extra files to yarn cluster for worker to use by exporting SPARK_YARN_DIST_FILES=file1,file2... etc.
 
 # Building Spark for Hadoop/YARN 2.2.x
 
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
index b11cfa667eb9238671b1126357e5d4e5642b8b51..7b5a243e26414ef3d77212f360707ba2c01f460a 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
@@ -47,6 +47,8 @@ public final class JavaFlumeEventCount {
       System.exit(1);
     }
 
+    StreamingExamples.setStreamingLogLevels();
+
     String master = args[0];
     String host = args[1];
     int port = Integer.parseInt(args[2]);
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
index 16b8a948e6154ad7527041efdfb546a2f859d11f..04f62ee2041451db8ad249cd655762d0d5fde503 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
@@ -59,6 +59,8 @@ public final class JavaKafkaWordCount {
       System.exit(1);
     }
 
+    StreamingExamples.setStreamingLogLevels();
+
     // Create the context with a 1 second batch size
     JavaStreamingContext jssc = new JavaStreamingContext(args[0], "KafkaWordCount",
             new Duration(2000), System.getenv("SPARK_HOME"),
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
index e96996aa75ce5dc11815716104a55b047d17edb5..349d826ab5df76d02de32d70202ac93b6502bee3 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
@@ -38,7 +38,7 @@ import java.util.regex.Pattern;
  * To run this on your local machine, you need to first run a Netcat server
  *    `$ nc -lk 9999`
  * and then run the example
- *    `$ ./run spark.streaming.examples.JavaNetworkWordCount local[2] localhost 9999`
+ *    `$ ./bin/run-example org.apache.spark.streaming.examples.JavaNetworkWordCount local[2] localhost 9999`
  */
 public final class JavaNetworkWordCount {
   private static final Pattern SPACE = Pattern.compile(" ");
@@ -53,6 +53,8 @@ public final class JavaNetworkWordCount {
       System.exit(1);
     }
 
+    StreamingExamples.setStreamingLogLevels();
+
     // Create the context with a 1 second batch size
     JavaStreamingContext ssc = new JavaStreamingContext(args[0], "JavaNetworkWordCount",
             new Duration(1000), System.getenv("SPARK_HOME"),
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
index e05551ab833010df77cb761a17709583967fa84a..7ef9c6c8f4aaf0cad16304289bda46a4a114fc03 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
@@ -41,6 +41,8 @@ public final class JavaQueueStream {
       System.exit(1);
     }
 
+    StreamingExamples.setStreamingLogLevels();
+
     // Create the context
     JavaStreamingContext ssc = new JavaStreamingContext(args[0], "QueueStream", new Duration(1000),
             System.getenv("SPARK_HOME"), JavaStreamingContext.jarOfClass(JavaQueueStream.class));
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
index 4e0058cd707777c32ce3baa4d68e7d6e0d9d1e09..57e1b1f806e82ecfb88b3fa88d0360e77ea28bd4 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
@@ -18,17 +18,13 @@
 package org.apache.spark.streaming.examples
 
 import scala.collection.mutable.LinkedList
-import scala.util.Random
 import scala.reflect.ClassTag
+import scala.util.Random
 
-import akka.actor.Actor
-import akka.actor.ActorRef
-import akka.actor.Props
-import akka.actor.actorRef2Scala
+import akka.actor.{Actor, ActorRef, Props, actorRef2Scala}
 
 import org.apache.spark.SparkConf
-import org.apache.spark.streaming.Seconds
-import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
 import org.apache.spark.streaming.receivers.Receiver
 import org.apache.spark.util.AkkaUtils
@@ -147,6 +143,8 @@ object ActorWordCount {
       System.exit(1)
     }
 
+    StreamingExamples.setStreamingLogLevels()
+
     val Seq(master, host, port) = args.toSeq
 
     // Create the context and set the batch size
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala
index ae3709b3d97f5561f857b7d6fab8ef0436888cbc..a59be7899dd37de2e50cfcd49e625992af589f2c 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala
@@ -17,10 +17,10 @@
 
 package org.apache.spark.streaming.examples
 
-import org.apache.spark.util.IntParam
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.flume._
+import org.apache.spark.util.IntParam
 
 /**
  *  Produces a count of events received from Flume.
@@ -44,6 +44,8 @@ object FlumeEventCount {
       System.exit(1)
     }
 
+    StreamingExamples.setStreamingLogLevels()
+
     val Array(master, host, IntParam(port)) = args
 
     val batchInterval = Milliseconds(2000)
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala
index ea6ea674196a1effb94ab7138998d4076cb41799..704b315ef8b2214ece9bb44ab1518d1838c9dfd5 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala
@@ -20,7 +20,6 @@ package org.apache.spark.streaming.examples
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.streaming.StreamingContext._
 
-
 /**
  * Counts words in new text files created in the given directory
  * Usage: HdfsWordCount <master> <directory>
@@ -38,6 +37,8 @@ object HdfsWordCount {
       System.exit(1)
     }
 
+    StreamingExamples.setStreamingLogLevels()
+
     // Create the context
     val ssc = new StreamingContext(args(0), "HdfsWordCount", Seconds(2),
       System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
index 31a94bd224a45f8f68f177777c7d37b9ff423de7..4a3d81c09a122aadbc5bc9672694a8be78261ce4 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
@@ -23,8 +23,8 @@ import kafka.producer._
 
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.StreamingContext._
-import org.apache.spark.streaming.util.RawTextHelper._
 import org.apache.spark.streaming.kafka._
+import org.apache.spark.streaming.util.RawTextHelper._
 
 /**
  * Consumes messages from one or more topics in Kafka and does wordcount.
@@ -40,12 +40,13 @@ import org.apache.spark.streaming.kafka._
  */
 object KafkaWordCount {
   def main(args: Array[String]) {
-    
     if (args.length < 5) {
       System.err.println("Usage: KafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads>")
       System.exit(1)
     }
 
+    StreamingExamples.setStreamingLogLevels()
+
     val Array(master, zkQuorum, group, topics, numThreads) = args
 
     val ssc =  new StreamingContext(master, "KafkaWordCount", Seconds(2),
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
index 325290b66f4decbe55ed025a865816d09ecbcc5c..78b49fdcf1eb3a84bd8d4b6814b978c039e9f689 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
@@ -17,12 +17,8 @@
 
 package org.apache.spark.streaming.examples
 
-import org.eclipse.paho.client.mqttv3.MqttClient
-import org.eclipse.paho.client.mqttv3.MqttClientPersistence
+import org.eclipse.paho.client.mqttv3.{MqttClient, MqttClientPersistence, MqttException, MqttMessage, MqttTopic}
 import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
-import org.eclipse.paho.client.mqttv3.MqttException
-import org.eclipse.paho.client.mqttv3.MqttMessage
-import org.eclipse.paho.client.mqttv3.MqttTopic
 
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.{Seconds, StreamingContext}
@@ -43,6 +39,8 @@ object MQTTPublisher {
       System.exit(1)
     }
 
+    StreamingExamples.setStreamingLogLevels()
+
     val Seq(brokerUrl, topic) = args.toSeq
 
     try {
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
index 4b896eaccb11c8f14d075686c6ce3607a07091bf..25f7013307fef4d59a93baecca19a97fb197f0e7 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
@@ -40,6 +40,8 @@ object NetworkWordCount {
       System.exit(1)
     }
 
+    StreamingExamples.setStreamingLogLevels()
+
     // Create the context with a 1 second batch size
     val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1),
       System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/QueueStream.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/QueueStream.scala
index 9d640e716bca978b74c76bb5cab3ffef6ad37ebf..4d4968ba6ae3e0d2d90685bc2841d6895c139501 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/QueueStream.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/QueueStream.scala
@@ -17,12 +17,12 @@
 
 package org.apache.spark.streaming.examples
 
+import scala.collection.mutable.SynchronizedQueue
+
 import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.streaming.StreamingContext._
 
-import scala.collection.mutable.SynchronizedQueue
-
 object QueueStream {
   
   def main(args: Array[String]) {
@@ -30,7 +30,9 @@ object QueueStream {
       System.err.println("Usage: QueueStream <master>")
       System.exit(1)
     }
-    
+
+    StreamingExamples.setStreamingLogLevels()
+
     // Create the context
     val ssc = new StreamingContext(args(0), "QueueStream", Seconds(1),
       System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala
index c0706d07249824cc740968d141e2d874c9c185c6..3d08d86567a9abe9d813f54811245196953e22c6 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala
@@ -17,11 +17,10 @@
 
 package org.apache.spark.streaming.examples
 
-import org.apache.spark.util.IntParam
 import org.apache.spark.storage.StorageLevel
-
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.util.RawTextHelper
+import org.apache.spark.util.IntParam
 
 /**
  * Receives text from multiple rawNetworkStreams and counts how many '\n' delimited
@@ -45,6 +44,8 @@ object RawNetworkGrep {
       System.exit(1)
     }
 
+    StreamingExamples.setStreamingLogLevels()
+
     val Array(master, IntParam(numStreams), host, IntParam(port), IntParam(batchMillis)) = args
 
     // Create the context
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala
index 002db57d592b2a88f491ca1813d95cd9e1a14ca6..1183eba84686bd5712c7668e58af05add060691d 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala
@@ -39,6 +39,8 @@ object StatefulNetworkWordCount {
       System.exit(1)
     }
 
+    StreamingExamples.setStreamingLogLevels()
+
     val updateFunc = (values: Seq[Int], state: Option[Int]) => {
       val currentCount = values.foldLeft(0)(_ + _)
 
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/StreamingExamples.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/StreamingExamples.scala
new file mode 100644
index 0000000000000000000000000000000000000000..d41d84a980dc73b8f4bcacf56d2a4de2af399c05
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/StreamingExamples.scala
@@ -0,0 +1,21 @@
+package org.apache.spark.streaming.examples
+
+import org.apache.log4j.{Level, Logger}
+
+import org.apache.spark.Logging
+
+/** Utility functions for Spark Streaming examples. */
+object StreamingExamples extends Logging {
+
+  /** Set reasonable logging levels for streaming if the user has not configured log4j. */
+  def setStreamingLogLevels() {
+    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
+    if (!log4jInitialized) {
+      // We first log something to initialize Spark's default logging, then we override the
+      // logging level.
+      logInfo("Setting log level to [WARN] for streaming example." +
+        " To override add a custom log4j.properties to the classpath.")
+      Logger.getRootLogger.setLevel(Level.WARN)
+    }
+  }
+}
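
Note: every streaming example touched by this patch now calls setStreamingLogLevels() before building its context, so running one without a log4j.properties on the classpath drops the root logger to WARN, while supplying your own properties file keeps the full INFO output. A quick way to check, assuming the standard example launcher script:

    $ ./bin/run-example org.apache.spark.streaming.examples.NetworkWordCount local[2] localhost 9999
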
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
index 3ccdc908e23c43d0c3424a5b6719950e7c658702..80b5a98b142c1218047d957f1dd554e81e316b64 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
@@ -17,12 +17,12 @@
 
 package org.apache.spark.streaming.examples
 
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.storage.StorageLevel
 import com.twitter.algebird._
-import org.apache.spark.streaming.StreamingContext._
-import org.apache.spark.SparkContext._
 
+import org.apache.spark.SparkContext._
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.twitter._
 
 /**
@@ -51,6 +51,8 @@ object TwitterAlgebirdCMS {
       System.exit(1)
     }
 
+    StreamingExamples.setStreamingLogLevels()
+
     // CMS parameters
     val DELTA = 1E-3
     val EPS = 0.01
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
index c7e83e76b00570e721ee1fe965d119178b14528d..cb2f2c51a0cd65c2bf6355d1b6632c617574fb68 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
@@ -17,10 +17,11 @@
 
 package org.apache.spark.streaming.examples
 
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.storage.StorageLevel
-import com.twitter.algebird.HyperLogLog._
 import com.twitter.algebird.HyperLogLogMonoid
+import com.twitter.algebird.HyperLogLog._
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.streaming.twitter._
 
 /**
@@ -44,6 +45,8 @@ object TwitterAlgebirdHLL {
       System.exit(1)
     }
 
+    StreamingExamples.setStreamingLogLevels()
+
     /** Bit size parameter for HyperLogLog, trades off accuracy vs size */
     val BIT_SIZE = 12
     val (master, filters) = (args.head, args.tail)
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala
index e2b0418d55d2b14b08d50aa1500a6806306a7f5c..16c10feaba2c11bb265feb28fa0307e8c740dc49 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala
@@ -36,6 +36,8 @@ object TwitterPopularTags {
       System.exit(1)
     }
 
+    StreamingExamples.setStreamingLogLevels()
+
     val (master, filters) = (args.head, args.tail)
 
     val ssc = new StreamingContext(master, "TwitterPopularTags", Seconds(2),
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
index 03902ec353babfbc6d2991d6af01d8e6ed19d09e..12d2a1084f9002bef4957faf1ea126229219db7e 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
@@ -76,6 +76,7 @@ object ZeroMQWordCount {
           "In local mode, <master> should be 'local[n]' with n > 1")
       System.exit(1)
     }
+    StreamingExamples.setStreamingLogLevels()
     val Seq(master, url, topic) = args.toSeq
 
     // Create the context and set the batch size
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala
index 807af199f4fd0813b2961784f1dca9ed88d9a69a..da6b67bcceefe3ad5ad2b8e8690a462b763529b1 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala
@@ -17,9 +17,10 @@
 
 package org.apache.spark.streaming.examples.clickstream
 
+import org.apache.spark.SparkContext._
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.streaming.StreamingContext._
-import org.apache.spark.SparkContext._
+import org.apache.spark.streaming.examples.StreamingExamples
 
 /** Analyses a streaming dataset of web page views. This class demonstrates several types of
   * operators available in Spark streaming.
@@ -36,6 +37,7 @@ object PageViewStream {
                          " errorRatePerZipCode, activeUserCount, popularUsersSeen")
       System.exit(1)
     }
+    StreamingExamples.setStreamingLogLevels()
     val metric = args(0)
     val host = args(1)
     val port = args(2).toInt
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
index 2cdd45291d9a4c8359b7b0126166493e8cf71709..b98f4a5101a780475b37674470e9c6a9cf6e518e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
@@ -332,7 +332,7 @@ abstract class DStream[T: ClassTag] (
   protected[streaming] def clearMetadata(time: Time) {
     val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))
     generatedRDDs --= oldRDDs.keys
-    logInfo("Cleared " + oldRDDs.size + " RDDs that were older than " +
+    logDebug("Cleared " + oldRDDs.size + " RDDs that were older than " +
       (time - rememberDuration) + ": " + oldRDDs.keys.mkString(", "))
     dependencies.foreach(_.clearMetadata(time))
   }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index 0cca6d50e6820ea43d40b32f38ad37b0d868f5a1..eee9591ffc383e808978c35a7dce2e356fa2eca4 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -104,20 +104,20 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
   def getOutputStreams() = this.synchronized { outputStreams.toArray }
 
   def generateJobs(time: Time): Seq[Job] = {
-    logInfo("Generating jobs for time " + time)
-    this.synchronized {
-      val jobs = outputStreams.flatMap(outputStream => outputStream.generateJob(time))
-      logInfo("Generated " + jobs.length + " jobs for time " + time)
-      jobs
+    logDebug("Generating jobs for time " + time)
+    val jobs = this.synchronized {
+      outputStreams.flatMap(outputStream => outputStream.generateJob(time))
     }
+    logDebug("Generated " + jobs.length + " jobs for time " + time)
+    jobs
   }
 
   def clearMetadata(time: Time) {
-    logInfo("Clearing metadata for time " + time)
+    logDebug("Clearing metadata for time " + time)
     this.synchronized {
       outputStreams.foreach(_.clearMetadata(time))
     }
-    logInfo("Cleared old metadata for time " + time)
+    logDebug("Cleared old metadata for time " + time)
   }
 
   def updateCheckpointData(time: Time) {
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 2bb11e54c549af037fc75a2dd92300e3588828b4..2e46d750c4a3801f5d4cc381dcd2f8dd759561a3 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -127,14 +127,13 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     // local dirs, so lets check both. We assume one of the 2 is set.
     // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
     val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
-      .getOrElse(Option(System.getenv("LOCAL_DIRS"))
-      .getOrElse(""))
-
-    if (localDirs.isEmpty()) {
-      throw new Exception("Yarn Local dirs can't be empty")
+      .orElse(Option(System.getenv("LOCAL_DIRS")))
+
+    localDirs match {
+      case None => throw new Exception("Yarn Local dirs can't be empty")
+      case Some(l) => l
     }
-    localDirs
-  }
+  }
 
   private def getApplicationAttemptId(): ApplicationAttemptId = {
     val envs = System.getenv()
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index ddfec1a4ac6728e8dd0030a6b5fcf164a1dff42c..62b20b8fbaf19da6cc13d2eb531e4ec78aa6f199 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -76,6 +76,10 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
 
   def run() {
 
+    // Set up the directories so things go to YARN-approved directories rather
+    // than user-specified ones and /tmp.
+    System.setProperty("spark.local.dir", getLocalDirs())
+
     appAttemptId = getApplicationAttemptId()
     resourceManager = registerWithResourceManager()
     val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
@@ -103,10 +107,12 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
     // ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
 
     val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
-    // must be <= timeoutInterval/ 2.
-    // On other hand, also ensure that we are reasonably responsive without causing too many requests to RM.
-    // so atleast 1 minute or timeoutInterval / 10 - whichever is higher.
-    val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval/ 10, 60000L))
+    // we want to be reasonably responsive without causing too many requests to RM.
+    val schedulerInterval =
+      System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong
+    // must be <= timeoutInterval / 2.
+    val interval = math.min(timeoutInterval / 2, schedulerInterval)
+
     reporterThread = launchReporterThread(interval)
 
     // Wait for the reporter thread to Finish.
@@ -119,6 +125,20 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
     System.exit(0)
   }
 
+  /** Get the Yarn approved local directories. */
+  private def getLocalDirs(): String = {
+    // Hadoop 0.23 and 2.x have different Environment variable names for the
+    // local dirs, so lets check both. We assume one of the 2 is set.
+    // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
+    val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
+      .orElse(Option(System.getenv("LOCAL_DIRS")))
+
+    localDirs match {
+      case None => throw new Exception("Yarn Local dirs can't be empty")
+      case Some(l) => l
+    }
+  }
+
   private def getApplicationAttemptId(): ApplicationAttemptId = {
     val envs = System.getenv()
     val containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV)
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 4b1b5da048df4d801dacb24f7df18245e98735ac..22e55e0c60647978d4543d14f78444fdfa0d2e8d 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -22,6 +22,8 @@ import org.apache.spark.{SparkException, Logging, SparkContext}
 import org.apache.spark.deploy.yarn.{Client, ClientArguments}
 import org.apache.spark.scheduler.TaskSchedulerImpl
 
+import scala.collection.mutable.ArrayBuffer
+
 private[spark] class YarnClientSchedulerBackend(
     scheduler: TaskSchedulerImpl,
     sc: SparkContext)
@@ -31,45 +33,47 @@ private[spark] class YarnClientSchedulerBackend(
   var client: Client = null
   var appId: ApplicationId = null
 
+  private[spark] def addArg(optionName: String, optionalParam: String, arrayBuf: ArrayBuffer[String]) {
+    Option(System.getenv(optionalParam)) foreach {
+      optParam => {
+        arrayBuf += (optionName, optParam)
+      }
+    }
+  }
+
   override def start() {
     super.start()
 
-    val defalutWorkerCores = "2"
-    val defalutWorkerMemory = "512m"
-    val defaultWorkerNumber = "1"
-
     val userJar = System.getenv("SPARK_YARN_APP_JAR")
-    val distFiles = System.getenv("SPARK_YARN_DIST_FILES")
-    var workerCores = System.getenv("SPARK_WORKER_CORES")
-    var workerMemory = System.getenv("SPARK_WORKER_MEMORY")
-    var workerNumber = System.getenv("SPARK_WORKER_INSTANCES")
-
     if (userJar == null)
       throw new SparkException("env SPARK_YARN_APP_JAR is not set")
 
-    if (workerCores == null)
-      workerCores = defalutWorkerCores
-    if (workerMemory == null)
-      workerMemory = defalutWorkerMemory
-    if (workerNumber == null)
-      workerNumber = defaultWorkerNumber
-
     val driverHost = conf.get("spark.driver.host")
     val driverPort = conf.get("spark.driver.port")
     val hostport = driverHost + ":" + driverPort
 
-    val argsArray = Array[String](
+    val argsArrayBuf = new ArrayBuffer[String]()
+    argsArrayBuf += (
       "--class", "notused",
       "--jar", userJar,
       "--args", hostport,
-      "--worker-memory", workerMemory,
-      "--worker-cores", workerCores,
-      "--num-workers", workerNumber,
-      "--master-class", "org.apache.spark.deploy.yarn.WorkerLauncher",
-      "--files", distFiles
+      "--master-class", "org.apache.spark.deploy.yarn.WorkerLauncher"
     )
 
-    val args = new ClientArguments(argsArray, conf)
+    // Process any optional arguments; use the defaults already defined in ClientArguments
+    // if they aren't specified
+    Map("--master-memory" -> "SPARK_MASTER_MEMORY",
+      "--num-workers" -> "SPARK_WORKER_INSTANCES",
+      "--worker-memory" -> "SPARK_WORKER_MEMORY",
+      "--worker-cores" -> "SPARK_WORKER_CORES",
+      "--queue" -> "SPARK_YARN_QUEUE",
+      "--name" -> "SPARK_YARN_APP_NAME",
+      "--files" -> "SPARK_YARN_DIST_FILES",
+      "--archives" -> "SPARK_YARN_DIST_ARCHIVES")
+    .foreach { case (optName, optParam) => addArg(optName, optParam, argsArrayBuf) }
+
+    logDebug("ClientArguments called with: " + argsArrayBuf)
+    val args = new ClientArguments(argsArrayBuf.toArray, conf)
     client = new Client(args, conf)
     appId = client.runApp()
     waitForApp()
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 69ae14ce8385cfb3b978481d90b2681d02a2ae80..4b777d5fa7a283e78744923dce484b13b1cc1431 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -116,14 +116,13 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     // local dirs, so lets check both. We assume one of the 2 is set.
     // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
     val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
-      .getOrElse(Option(System.getenv("LOCAL_DIRS"))
-      .getOrElse(""))
-
-    if (localDirs.isEmpty()) {
-      throw new Exception("Yarn Local dirs can't be empty")
+      .orElse(Option(System.getenv("LOCAL_DIRS")))
+
+    localDirs match {
+      case None => throw new Exception("Yarn Local dirs can't be empty")
+      case Some(l) => l
     }
-    localDirs
-  }
+  }
 
   private def getApplicationAttemptId(): ApplicationAttemptId = {
     val envs = System.getenv()
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index be323d77835a8892eb3e481eba83f31f7dc3e8b9..952e963389c0aa7f1b9e3a3a4557ff4736920964 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -99,6 +99,7 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
     appContext.setApplicationName(args.appName)
     appContext.setQueue(args.amQueue)
     appContext.setAMContainerSpec(amContainer)
+    appContext.setApplicationType("SPARK")
 
     // Memory for the ApplicationMaster.
     val memoryResource = Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index 49248a8516b9cef6941524500965354d9ac30d08..78353224fa4b8b51aea9ba6d56ab04c2e1a479cd 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -78,6 +78,10 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
 
   def run() {
 
+    // Set up the directories so things go to YARN-approved directories rather
+    // than user-specified ones and /tmp.
+    System.setProperty("spark.local.dir", getLocalDirs())
+
     amClient = AMRMClient.createAMRMClient()
     amClient.init(yarnConf)
     amClient.start()
@@ -94,10 +98,12 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
     // ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
 
     val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
-    // must be <= timeoutInterval/ 2.
-    // On other hand, also ensure that we are reasonably responsive without causing too many requests to RM.
-    // so atleast 1 minute or timeoutInterval / 10 - whichever is higher.
-    val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval / 10, 60000L))
+    // we want to be reasonably responsive without causing too many requests to RM.
+    val schedulerInterval =
+      System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong
+    // must be <= timeoutInterval / 2.
+    val interval = math.min(timeoutInterval / 2, schedulerInterval)
+
     reporterThread = launchReporterThread(interval)
 
     // Wait for the reporter thread to Finish.
@@ -110,6 +116,20 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
     System.exit(0)
   }
 
+  /** Get the Yarn approved local directories. */
+  private def getLocalDirs(): String = {
+    // Hadoop 0.23 and 2.x have different Environment variable names for the
+    // local dirs, so lets check both. We assume one of the 2 is set.
+    // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
+    val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
+      .orElse(Option(System.getenv("LOCAL_DIRS")))
+
+    localDirs match {
+      case None => throw new Exception("Yarn Local dirs can't be empty")
+      case Some(l) => l
+    }
+  }
+
   private def getApplicationAttemptId(): ApplicationAttemptId = {
     val envs = System.getenv()
     val containerIdString = envs.get(ApplicationConstants.Environment.CONTAINER_ID.name())