diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 2c04e4ddfbcb73a4f9754924981c4e45da02c669..86ac307fc84badee04da79e64f6553e07debf540 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -410,10 +410,10 @@ private[spark] object Utils extends Logging {
     // Decompress the file if it's a .tar, .tar.gz, or .tgz
     if (fileName.endsWith(".tar.gz") || fileName.endsWith(".tgz")) {
       logInfo("Untarring " + fileName)
-      Utils.execute(Seq("tar", "-xzf", fileName), targetDir)
+      executeAndGetOutput(Seq("tar", "-xzf", fileName), targetDir)
     } else if (fileName.endsWith(".tar")) {
       logInfo("Untarring " + fileName)
-      Utils.execute(Seq("tar", "-xf", fileName), targetDir)
+      executeAndGetOutput(Seq("tar", "-xf", fileName), targetDir)
     }
     // Make the file executable - That's necessary for scripts
     FileUtil.chmod(targetFile.getAbsolutePath, "a+x")
@@ -956,25 +956,25 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Execute a command in the given working directory, throwing an exception if it completes
-   * with an exit code other than 0.
+   * Execute a command and return the running process without waiting for its completion.
    */
-  def execute(command: Seq[String], workingDir: File) {
-    val process = new ProcessBuilder(command: _*)
-        .directory(workingDir)
-        .redirectErrorStream(true)
-        .start()
-    new Thread("read stdout for " + command(0)) {
-      override def run() {
-        for (line <- Source.fromInputStream(process.getInputStream).getLines()) {
-          System.err.println(line)
-        }
-      }
-    }.start()
-    val exitCode = process.waitFor()
-    if (exitCode != 0) {
-      throw new SparkException("Process " + command + " exited with code " + exitCode)
+  def executeCommand(
+      command: Seq[String],
+      workingDir: File = new File("."),
+      extraEnvironment: Map[String, String] = Map.empty,
+      redirectStderr: Boolean = true): Process = {
+    val builder = new ProcessBuilder(command: _*).directory(workingDir)
+    val environment = builder.environment()
+    for ((key, value) <- extraEnvironment) {
+      environment.put(key, value)
+    }
+    val process = builder.start()
+    if (redirectStderr) {
+      val threadName = "redirect stderr for command " + command(0)
+      def log(s: String): Unit = logInfo(s)
+      processStreamByLine(threadName, process.getErrorStream, log)
     }
+    process
   }
 
   /**
@@ -983,31 +983,13 @@ private[spark] object Utils extends Logging {
   def executeAndGetOutput(
       command: Seq[String],
       workingDir: File = new File("."),
-      extraEnvironment: Map[String, String] = Map.empty): String = {
-    val builder = new ProcessBuilder(command: _*)
-        .directory(workingDir)
-    val environment = builder.environment()
-    for ((key, value) <- extraEnvironment) {
-      environment.put(key, value)
-    }
-
-    val process = builder.start()
-    new Thread("read stderr for " + command(0)) {
-      override def run() {
-        for (line <- Source.fromInputStream(process.getErrorStream).getLines()) {
-          logInfo(line)
-        }
-      }
-    }.start()
+      extraEnvironment: Map[String, String] = Map.empty,
+      redirectStderr: Boolean = true): String = {
+    val process = executeCommand(command, workingDir, extraEnvironment, redirectStderr)
     val output = new StringBuffer
-    val stdoutThread = new Thread("read stdout for " + command(0)) {
-      override def run() {
-        for (line <- Source.fromInputStream(process.getInputStream).getLines()) {
-          output.append(line)
-        }
-      }
-    }
-    stdoutThread.start()
+    val threadName = "read stdout for " + command(0)
+    def appendToOutput(s: String): Unit = output.append(s)
+    val stdoutThread = processStreamByLine(threadName, process.getInputStream, appendToOutput)
     val exitCode = process.waitFor()
     stdoutThread.join()   // Wait for it to finish reading output
     if (exitCode != 0) {
@@ -1017,6 +999,25 @@ private[spark] object Utils extends Logging {
     output.toString
   }
 
+  /**
+   * Start and return a daemon thread that processes the content of the input stream line by line.
+   */
+  def processStreamByLine(
+      threadName: String,
+      inputStream: InputStream,
+      processLine: String => Unit): Thread = {
+    val t = new Thread(threadName) {
+      override def run() {
+        for (line <- Source.fromInputStream(inputStream).getLines()) {
+          processLine(line)
+        }
+      }
+    }
+    t.setDaemon(true)
+    t.start()
+    t
+  }
+
   /**
    * Execute a block of code that evaluates to Unit, forwarding any uncaught exceptions to the
    * default UncaughtExceptionHandler
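Before moving to the test changes, a hedged sketch of how the refactored helpers compose. This is illustrative only: the object name and the `echo` command are made up, and it assumes placement under the `org.apache.spark` package, since `Utils` is `private[spark]`.

```scala
package org.apache.spark

import org.apache.spark.util.Utils

// Illustrative only (not part of this patch): launch a child process without
// blocking, drain its stdout on a daemon thread, then check the exit code.
object ExecuteCommandSketch {
  def main(args: Array[String]): Unit = {
    // stderr is redirected to the Spark logs by default (redirectStderr = true)
    val process = Utils.executeCommand(Seq("echo", "hello"))
    val stdoutThread = Utils.processStreamByLine(
      "read stdout for echo", process.getInputStream, line => println(line))
    val exitCode = process.waitFor()
    stdoutThread.join() // drain all output before inspecting the exit code
    assert(exitCode == 0, "echo exited with code " + exitCode)
  }
}
```

The point of the split: `executeCommand` hands back the live `Process` instead of blocking, so callers (like the test suites below) can bound the wait and destroy a hung child, while `processStreamByLine` keeps the pipe drained on a daemon thread, so the child can never stall on a full output buffer and the reader thread cannot keep the JVM alive.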
diff --git a/core/src/test/scala/org/apache/spark/DriverSuite.scala b/core/src/test/scala/org/apache/spark/DriverSuite.scala
index 8a54360e81795f5f603bf61b8e69033eade9ae3e..9bd5dfec8703a19090c140e00c4ea0521a8c44c6 100644
--- a/core/src/test/scala/org/apache/spark/DriverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DriverSuite.scala
@@ -28,31 +28,32 @@ import org.apache.spark.util.Utils
 
 class DriverSuite extends FunSuite with Timeouts {
 
-  test("driver should exit after finishing") {
+  test("driver should exit after finishing without cleanup (SPARK-530)") {
     val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
-    // Regression test for SPARK-530: "Spark driver process doesn't exit after finishing"
-    val masters = Table(("master"), ("local"), ("local-cluster[2,1,512]"))
+    val masters = Table("master", "local", "local-cluster[2,1,512]")
     forAll(masters) { (master: String) =>
-      failAfter(60 seconds) {
-        Utils.executeAndGetOutput(
-          Seq(s"$sparkHome/bin/spark-class", "org.apache.spark.DriverWithoutCleanup", master),
-          new File(sparkHome),
-          Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome))
-      }
+      val process = Utils.executeCommand(
+        Seq(s"$sparkHome/bin/spark-class", "org.apache.spark.DriverWithoutCleanup", master),
+        new File(sparkHome),
+        Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome))
+      try {
+        failAfter(60 seconds) { process.waitFor() }
+      } finally {
+        // Ensure we still kill the process even if it timed out
+        process.destroy()
+      }
     }
   }
 }
 
 /**
- * Program that creates a Spark driver but doesn't call SparkContext.stop() or
- * Sys.exit() after finishing.
+ * Program that creates a Spark driver but doesn't call SparkContext#stop() or
+ * sys.exit() after finishing.
  */
 object DriverWithoutCleanup {
   def main(args: Array[String]) {
     Utils.configTestLog4j("INFO")
-    // Bind the web UI to an ephemeral port in order to avoid conflicts with other tests running on
-    // the same machine (we shouldn't just disable the UI here, since that might mask bugs):
-    val conf = new SparkConf().set("spark.ui.port", "0")
+    val conf = new SparkConf
     val sc = new SparkContext(args(0), "DriverWithoutCleanup", conf)
     sc.parallelize(1 to 100, 4).count()
   }
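The wait-then-destroy pattern above, as a standalone hedged sketch (the trait and method names are hypothetical): `failAfter` throws `TestFailedDueToTimeoutException` once the bound is exceeded, so the `destroy()` call belongs in a `finally` block to guarantee the child is killed either way.

```scala
import org.scalatest.concurrent.Timeouts
import org.scalatest.time.SpanSugar._

// Hypothetical helper (not in this patch) showing the bounded-wait pattern:
// wait at most 60 seconds for a child process, and destroy it no matter how
// the wait ends. destroy() is harmless on an already-exited process.
trait BoundedProcessWait extends Timeouts {
  def waitForExit(process: Process): Int = {
    try {
      failAfter(60 seconds) { process.waitFor() } // throws on timeout
    } finally {
      process.destroy()
    }
  }
}
```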
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 065b7534cece66048b23db6363c9f84fef9454aa..82628ad3abd995948dfad0652db71a1f9fcf9ef2 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -21,25 +21,28 @@ import java.io._
 
 import scala.collection.mutable.ArrayBuffer
 
+import org.scalatest.FunSuite
+import org.scalatest.Matchers
+import org.scalatest.concurrent.Timeouts
+import org.scalatest.time.SpanSugar._
+
 import org.apache.spark._
 import org.apache.spark.deploy.SparkSubmit._
 import org.apache.spark.util.{ResetSystemProperties, Utils}
-import org.scalatest.FunSuite
-import org.scalatest.Matchers
 
 // Note: this suite mixes in ResetSystemProperties because SparkSubmit.main() sets a bunch
 // of properties that need to be cleared after tests.
-class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties {
+class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties with Timeouts {
   def beforeAll() {
     System.setProperty("spark.testing", "true")
   }
 
-  val noOpOutputStream = new OutputStream {
+  private val noOpOutputStream = new OutputStream {
     def write(b: Int) = {}
   }
 
   /** Simple PrintStream that reads data into a buffer */
-  class BufferPrintStream extends PrintStream(noOpOutputStream) {
+  private class BufferPrintStream extends PrintStream(noOpOutputStream) {
     var lineBuffer = ArrayBuffer[String]()
     override def println(line: String) {
       lineBuffer += line
@@ -47,7 +50,7 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties
   }
 
   /** Returns true if the script exits and the given search string is printed. */
-  def testPrematureExit(input: Array[String], searchString: String) = {
+  private def testPrematureExit(input: Array[String], searchString: String) = {
     val printStream = new BufferPrintStream()
     SparkSubmit.printStream = printStream
 
@@ -290,7 +293,6 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties
       "--class", SimpleApplicationTest.getClass.getName.stripSuffix("$"),
       "--name", "testApp",
       "--master", "local",
-      "--conf", "spark.ui.enabled=false",
       unusedJar.toString)
     runSparkSubmit(args)
   }
@@ -305,7 +307,6 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties
       "--name", "testApp",
       "--master", "local-cluster[2,1,512]",
       "--jars", jarsString,
-      "--conf", "spark.ui.enabled=false",
       unusedJar.toString)
     runSparkSubmit(args)
   }
@@ -430,15 +431,21 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties
   }
 
   // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
-  def runSparkSubmit(args: Seq[String]): String = {
+  private def runSparkSubmit(args: Seq[String]): Unit = {
     val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
-    Utils.executeAndGetOutput(
+    val process = Utils.executeCommand(
       Seq("./bin/spark-submit") ++ args,
       new File(sparkHome),
       Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome))
+    try {
+      failAfter(60 seconds) { process.waitFor() }
+    } finally {
+      // Ensure we still kill the process even if it timed out
+      process.destroy()
+    }
   }
 
-  def forConfDir(defaults: Map[String, String]) (f: String => Unit) = {
+  private def forConfDir(defaults: Map[String, String]) (f: String => Unit) = {
     val tmpDir = Utils.createTempDir()
 
     val defaultsConf = new File(tmpDir.getAbsolutePath, "spark-defaults.conf")
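To make the suite's output-capture mechanism concrete, here is a self-contained sketch of the `BufferPrintStream` pattern from the hunks above (the object name and the sample message are illustrative): `SparkSubmit.printStream` is swapped for a `PrintStream` that records every `println`'d line, so tests can assert on what a premature exit printed.

```scala
import java.io.{OutputStream, PrintStream}

import scala.collection.mutable.ArrayBuffer

// Self-contained sketch of the capture pattern used by SparkSubmitSuite:
// a PrintStream over a no-op OutputStream whose overridden println records
// each line into a buffer instead of writing it anywhere.
object BufferPrintStreamSketch {
  private val noOpOutputStream = new OutputStream {
    def write(b: Int): Unit = {}
  }

  private class BufferPrintStream extends PrintStream(noOpOutputStream) {
    val lineBuffer = ArrayBuffer[String]()
    override def println(line: String): Unit = { lineBuffer += line }
  }

  def main(args: Array[String]): Unit = {
    val stream = new BufferPrintStream
    stream.println("Error: must specify a main class") // illustrative message
    assert(stream.lineBuffer.exists(_.contains("main class")))
  }
}
```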