diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index a0eae774268edf5f8694a0e4321807d190f63f8e..b8978e25a02d2c0585e38a80e1f1bd4465cedd26 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -324,55 +324,20 @@ object SparkSubmit {
         // Usage: PythonAppRunner <main python file> <extra python files> [app arguments]
         args.mainClass = "org.apache.spark.deploy.PythonRunner"
         args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs
-        args.files = mergeFileLists(args.files, args.primaryResource)
+        if (clusterManager != YARN) {
+          // The YARN backend distributes the primary file differently, so don't merge it.
+          args.files = mergeFileLists(args.files, args.primaryResource)
+        }
+      }
+      if (clusterManager != YARN) {
+        // The YARN backend handles python files differently, so don't merge the lists.
+        args.files = mergeFileLists(args.files, args.pyFiles)
       }
-      args.files = mergeFileLists(args.files, args.pyFiles)
       if (args.pyFiles != null) {
         sysProps("spark.submit.pyFiles") = args.pyFiles
       }
     }
 
-    // In yarn mode for a python app, add pyspark archives to files
-    // that can be distributed with the job
-    if (args.isPython && clusterManager == YARN) {
-      var pyArchives: String = null
-      val pyArchivesEnvOpt = sys.env.get("PYSPARK_ARCHIVES_PATH")
-      if (pyArchivesEnvOpt.isDefined) {
-        pyArchives = pyArchivesEnvOpt.get
-      } else {
-        if (!sys.env.contains("SPARK_HOME")) {
-          printErrorAndExit("SPARK_HOME does not exist for python application in yarn mode.")
-        }
-        val pythonPath = new ArrayBuffer[String]
-        for (sparkHome <- sys.env.get("SPARK_HOME")) {
-          val pyLibPath = Seq(sparkHome, "python", "lib").mkString(File.separator)
-          val pyArchivesFile = new File(pyLibPath, "pyspark.zip")
-          if (!pyArchivesFile.exists()) {
-            printErrorAndExit("pyspark.zip does not exist for python application in yarn mode.")
-          }
-          val py4jFile = new File(pyLibPath, "py4j-0.8.2.1-src.zip")
-          if (!py4jFile.exists()) {
-            printErrorAndExit("py4j-0.8.2.1-src.zip does not exist for python application " +
-              "in yarn mode.")
-          }
-          pythonPath += pyArchivesFile.getAbsolutePath()
-          pythonPath += py4jFile.getAbsolutePath()
-        }
-        pyArchives = pythonPath.mkString(",")
-      }
-
-      pyArchives = pyArchives.split(",").map { localPath =>
-        val localURI = Utils.resolveURI(localPath)
-        if (localURI.getScheme != "local") {
-          args.files = mergeFileLists(args.files, localURI.toString)
-          new Path(localPath).getName
-        } else {
-          localURI.getPath
-        }
-      }.mkString(File.pathSeparator)
-      sysProps("spark.submit.pyArchives") = pyArchives
-    }
-
     // If we're running a R app, set the main class to our specific R runner
     if (args.isR && deployMode == CLIENT) {
       if (args.primaryResource == SPARKR_SHELL) {
@@ -386,19 +351,10 @@ object SparkSubmit {
       }
     }
 
-    if (isYarnCluster) {
-      // In yarn-cluster mode for a python app, add primary resource and pyFiles to files
-      // that can be distributed with the job
-      if (args.isPython) {
-        args.files = mergeFileLists(args.files, args.primaryResource)
-        args.files = mergeFileLists(args.files, args.pyFiles)
-      }
-
+    if (isYarnCluster && args.isR) {
       // In yarn-cluster mode for a R app, add primary resource to files
       // that can be distributed with the job
-      if (args.isR) {
-        args.files = mergeFileLists(args.files, args.primaryResource)
-      }
+      args.files = mergeFileLists(args.files, args.primaryResource)
     }
 
     // Special flag to avoid deprecation warnings at the client
@@ -515,17 +471,18 @@ object SparkSubmit {
       }
     }
 
+    // Let YARN know it's a pyspark app, so it distributes needed libraries.
+    if (clusterManager == YARN && args.isPython) {
+      sysProps.put("spark.yarn.isPython", "true")
+    }
+
     // In yarn-cluster mode, use yarn.Client as a wrapper around the user class
     if (isYarnCluster) {
       childMainClass = "org.apache.spark.deploy.yarn.Client"
       if (args.isPython) {
-        val mainPyFile = new Path(args.primaryResource).getName
-        childArgs += ("--primary-py-file", mainPyFile)
+        childArgs += ("--primary-py-file", args.primaryResource)
         if (args.pyFiles != null) {
-          // These files will be distributed to each machine's working directory, so strip the
-          // path prefix
-          val pyFilesNames = args.pyFiles.split(",").map(p => (new Path(p)).getName).mkString(",")
-          childArgs += ("--py-files", pyFilesNames)
+          childArgs += ("--py-files", args.pyFiles)
         }
         childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
       } else if (args.isR) {
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 002d7b6eaf498c8c858699a32d1f8e9aff6364c1..83dafa4a125d299b818f4382cf2df99ce1ae798c 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -32,7 +32,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.spark.rpc._
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv}
 import org.apache.spark.SparkException
-import org.apache.spark.deploy.{PythonRunner, SparkHadoopUtil}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.history.HistoryServer
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, YarnSchedulerBackend}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
@@ -46,6 +46,14 @@ private[spark] class ApplicationMaster(
     client: YarnRMClient)
   extends Logging {
 
+  // Load the properties file with the Spark configuration and set entries as system properties,
+  // so that user code run inside the AM also has access to them.
+  if (args.propertiesFile != null) {
+    Utils.getPropertiesFromFile(args.propertiesFile).foreach { case (k, v) =>
+      sys.props(k) = v
+    }
+  }
+
   // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
   // optimal as more containers are available. Might need to handle this better.
 
@@ -490,9 +498,11 @@ private[spark] class ApplicationMaster(
         new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
       }
 
+    var userArgs = args.userArgs
     if (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) {
-      System.setProperty("spark.submit.pyFiles",
-        PythonRunner.formatPaths(args.pyFiles).mkString(","))
+      // When running pyspark, the app is run using PythonRunner. The second argument is the list
+      // of files to add to PYTHONPATH, which Client.scala already handles, so it's empty.
+      userArgs = Seq(args.primaryPyFile, "") ++ userArgs
     }
     if (args.primaryRFile != null && args.primaryRFile.endsWith(".R")) {
       // TODO(davies): add R dependencies here
@@ -503,9 +513,7 @@ private[spark] class ApplicationMaster(
     val userThread = new Thread {
       override def run() {
         try {
-          val mainArgs = new Array[String](args.userArgs.size)
-          args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size)
-          mainMethod.invoke(null, mainArgs)
+          mainMethod.invoke(null, userArgs.toArray)
           finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
           logDebug("Done running users class")
         } catch {
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
index ae6dc1094d724d4e7fedb9ed46b14d1d9ee807ba..68e9f6b4db7f4bbd27722983d9c8d3e1c085cc7e 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
@@ -26,11 +26,11 @@ class ApplicationMasterArguments(val args: Array[String]) {
   var userClass: String = null
   var primaryPyFile: String = null
   var primaryRFile: String = null
-  var pyFiles: String = null
-  var userArgs: Seq[String] = Seq[String]()
+  var userArgs: Seq[String] = Nil
   var executorMemory = 1024
   var executorCores = 1
   var numExecutors = DEFAULT_NUMBER_EXECUTORS
+  var propertiesFile: String = null
 
   parseArgs(args.toList)
 
@@ -59,10 +59,6 @@ class ApplicationMasterArguments(val args: Array[String]) {
           primaryRFile = value
           args = tail
 
-        case ("--py-files") :: value :: tail =>
-          pyFiles = value
-          args = tail
-
         case ("--args" | "--arg") :: value :: tail =>
           userArgsBuffer += value
           args = tail
@@ -79,6 +75,10 @@ class ApplicationMasterArguments(val args: Array[String]) {
           executorCores = value
           args = tail
 
+        case ("--properties-file") :: value :: tail =>
+          propertiesFile = value
+          args = tail
+
         case _ =>
           printUsageAndExit(1, args)
       }
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index f4d43214b08ca33fd73755396b2ba2b41aeb5d29..ec9402afff32992656fcfc9fa8d4cb4ca8ba5404 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -17,11 +17,12 @@
 
 package org.apache.spark.deploy.yarn
 
-import java.io.{ByteArrayInputStream, DataInputStream, File, FileOutputStream, IOException}
+import java.io.{ByteArrayInputStream, DataInputStream, File, FileOutputStream, IOException,
+  OutputStreamWriter}
 import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
 import java.nio.ByteBuffer
 import java.security.PrivilegedExceptionAction
-import java.util.UUID
+import java.util.{Properties, UUID}
 import java.util.zip.{ZipEntry, ZipOutputStream}
 
 import scala.collection.JavaConversions._
@@ -29,6 +30,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, ListBuffer, Map}
 import scala.reflect.runtime.universe
 import scala.util.{Try, Success, Failure}
 
+import com.google.common.base.Charsets.UTF_8
 import com.google.common.base.Objects
 import com.google.common.io.Files
 
@@ -247,7 +249,9 @@ private[spark] class Client(
    * This is used for setting up a container launch context for our ApplicationMaster.
    * Exposed for testing.
    */
-  def prepareLocalResources(appStagingDir: String): HashMap[String, LocalResource] = {
+  def prepareLocalResources(
+      appStagingDir: String,
+      pySparkArchives: Seq[String]): HashMap[String, LocalResource] = {
     logInfo("Preparing resources for our AM container")
     // Upload Spark and the application JAR to the remote file system if necessary,
     // and add them as local resources to the application master.
@@ -277,20 +281,6 @@ private[spark] class Client(
           "for alternatives.")
     }
 
-    // If we passed in a keytab, make sure we copy the keytab to the staging directory on
-    // HDFS, and setup the relevant environment vars, so the AM can login again.
-    if (loginFromKeytab) {
-      logInfo("To enable the AM to login from keytab, credentials are being copied over to the AM" +
-        " via the YARN Secure Distributed Cache.")
-      val localUri = new URI(args.keytab)
-      val localPath = getQualifiedLocalPath(localUri, hadoopConf)
-      val destinationPath = copyFileToRemote(dst, localPath, replication)
-      val destFs = FileSystem.get(destinationPath.toUri(), hadoopConf)
-      distCacheMgr.addResource(
-        destFs, hadoopConf, destinationPath, localResources, LocalResourceType.FILE,
-        sparkConf.get("spark.yarn.keytab"), statCache, appMasterOnly = true)
-    }
-
     def addDistributedUri(uri: URI): Boolean = {
       val uriStr = uri.toString()
       if (distributedUris.contains(uriStr)) {
@@ -302,6 +292,57 @@ private[spark] class Client(
       }
     }
 
+    /**
+     * Distribute a file to the cluster.
+     *
+     * If the file's path is a "local:" URI, it's actually not distributed. Other files are copied
+     * to HDFS (if not already there) and added to the application's distributed cache.
+     *
+     * @param path URI of the file to distribute.
+     * @param resType Type of resource being distributed.
+     * @param destName Name of the file in the distributed cache.
+     * @param targetDir Subdirectory where to place the file.
+     * @param appMasterOnly Whether to distribute only to the AM.
+     * @return A 2-tuple. First item is whether the file is a "local:" URI. Second item is the
+     *         localized path for non-local paths, or the input `path` for local paths.
+     *         The localized path will be null if the URI has already been added to the cache.
+     */
+    def distribute(
+        path: String,
+        resType: LocalResourceType = LocalResourceType.FILE,
+        destName: Option[String] = None,
+        targetDir: Option[String] = None,
+        appMasterOnly: Boolean = false): (Boolean, String) = {
+      val localURI = new URI(path.trim())
+      if (localURI.getScheme != LOCAL_SCHEME) {
+        if (addDistributedUri(localURI)) {
+          val localPath = getQualifiedLocalPath(localURI, hadoopConf)
+          val linkname = targetDir.map(_ + "/").getOrElse("") +
+            destName.orElse(Option(localURI.getFragment())).getOrElse(localPath.getName())
+          val destPath = copyFileToRemote(dst, localPath, replication)
+          distCacheMgr.addResource(
+            fs, hadoopConf, destPath, localResources, resType, linkname, statCache,
+            appMasterOnly = appMasterOnly)
+          (false, linkname)
+        } else {
+          (false, null)
+        }
+      } else {
+        (true, path.trim())
+      }
+    }
+
+    // If we passed in a keytab, make sure we copy the keytab to the staging directory on
+    // HDFS, and setup the relevant environment vars, so the AM can login again.
+    if (loginFromKeytab) {
+      logInfo("To enable the AM to login from keytab, credentials are being copied over to the AM" +
+        " via the YARN Secure Distributed Cache.")
+      val (_, localizedPath) = distribute(args.keytab,
+        destName = Some(sparkConf.get("spark.yarn.keytab")),
+        appMasterOnly = true)
+      require(localizedPath != null, "Keytab file already distributed.")
+    }
+
     /**
      * Copy the given main resource to the distributed cache if the scheme is not "local".
      * Otherwise, set the corresponding key in our SparkConf to handle it downstream.
@@ -314,33 +355,18 @@ private[spark] class Client(
       (SPARK_JAR, sparkJar(sparkConf), CONF_SPARK_JAR),
       (APP_JAR, args.userJar, CONF_SPARK_USER_JAR),
       ("log4j.properties", oldLog4jConf.orNull, null)
-    ).foreach { case (destName, _localPath, confKey) =>
-      val localPath: String = if (_localPath != null) _localPath.trim() else ""
-      if (!localPath.isEmpty()) {
-        val localURI = new URI(localPath)
-        if (localURI.getScheme != LOCAL_SCHEME) {
-          if (addDistributedUri(localURI)) {
-            val src = getQualifiedLocalPath(localURI, hadoopConf)
-            val destPath = copyFileToRemote(dst, src, replication)
-            val destFs = FileSystem.get(destPath.toUri(), hadoopConf)
-            distCacheMgr.addResource(destFs, hadoopConf, destPath,
-              localResources, LocalResourceType.FILE, destName, statCache)
-          }
-        } else if (confKey != null) {
+    ).foreach { case (destName, path, confKey) =>
+      if (path != null && !path.trim().isEmpty()) {
+        val (isLocal, localizedPath) = distribute(path, destName = Some(destName))
+        if (isLocal && confKey != null) {
+          require(localizedPath != null, s"Path $path already distributed.")
           // If the resource is intended for local use only, handle this downstream
           // by setting the appropriate property
-          sparkConf.set(confKey, localPath)
+          sparkConf.set(confKey, localizedPath)
         }
       }
     }
 
-    createConfArchive().foreach { file =>
-      require(addDistributedUri(file.toURI()))
-      val destPath = copyFileToRemote(dst, new Path(file.toURI()), replication)
-      distCacheMgr.addResource(fs, hadoopConf, destPath, localResources, LocalResourceType.ARCHIVE,
-        LOCALIZED_HADOOP_CONF_DIR, statCache, appMasterOnly = true)
-    }
-
     /**
      * Do the same for any additional resources passed in through ClientArguments.
      * Each resource category is represented by a 3-tuple of:
@@ -356,21 +382,10 @@ private[spark] class Client(
     ).foreach { case (flist, resType, addToClasspath) =>
       if (flist != null && !flist.isEmpty()) {
         flist.split(',').foreach { file =>
-          val localURI = new URI(file.trim())
-          if (localURI.getScheme != LOCAL_SCHEME) {
-            if (addDistributedUri(localURI)) {
-              val localPath = new Path(localURI)
-              val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
-              val destPath = copyFileToRemote(dst, localPath, replication)
-              distCacheMgr.addResource(
-                fs, hadoopConf, destPath, localResources, resType, linkname, statCache)
-              if (addToClasspath) {
-                cachedSecondaryJarLinks += linkname
-              }
-            }
-          } else if (addToClasspath) {
-            // Resource is intended for local use only and should be added to the class path
-            cachedSecondaryJarLinks += file.trim()
+          val (_, localizedPath) = distribute(file, resType = resType)
+          require(localizedPath != null)
+          if (addToClasspath) {
+            cachedSecondaryJarLinks += localizedPath
           }
         }
       }
@@ -379,11 +394,31 @@ private[spark] class Client(
       sparkConf.set(CONF_SPARK_YARN_SECONDARY_JARS, cachedSecondaryJarLinks.mkString(","))
     }
 
+    if (isClusterMode && args.primaryPyFile != null) {
+      distribute(args.primaryPyFile, appMasterOnly = true)
+    }
+
+    pySparkArchives.foreach { f => distribute(f) }
+
+    // The python files list needs to be treated especially. All files that are not an
+    // archive need to be placed in a subdirectory that will be added to PYTHONPATH.
+    args.pyFiles.foreach { f =>
+      val targetDir = if (f.endsWith(".py")) Some(LOCALIZED_PYTHON_DIR) else None
+      distribute(f, targetDir = targetDir)
+    }
+
+    // Distribute an archive with Hadoop and Spark configuration for the AM.
+    val (_, confLocalizedPath) = distribute(createConfArchive().getAbsolutePath(),
+      resType = LocalResourceType.ARCHIVE,
+      destName = Some(LOCALIZED_CONF_DIR),
+      appMasterOnly = true)
+    require(confLocalizedPath != null)
+
     localResources
   }
 
   /**
-   * Create an archive with the Hadoop config files for distribution.
+   * Create an archive with the config files for distribution.
    *
    * These are only used by the AM, since executors will use the configuration object broadcast by
    * the driver. The files are zipped and added to the job as an archive, so that YARN will explode
@@ -395,8 +430,11 @@ private[spark] class Client(
    *
    * Currently this makes a shallow copy of the conf directory. If there are cases where a
    * Hadoop config directory contains subdirectories, this code will have to be fixed.
+   *
+   * The archive also contains some Spark configuration. Namely, it saves the contents of
+   * SparkConf in a file to be loaded by the AM process.
    */
-  private def createConfArchive(): Option[File] = {
+  private def createConfArchive(): File = {
     val hadoopConfFiles = new HashMap[String, File]()
     Seq("HADOOP_CONF_DIR", "YARN_CONF_DIR").foreach { envKey =>
       sys.env.get(envKey).foreach { path =>
@@ -411,28 +449,32 @@ private[spark] class Client(
       }
     }
 
-    if (!hadoopConfFiles.isEmpty) {
-      val hadoopConfArchive = File.createTempFile(LOCALIZED_HADOOP_CONF_DIR, ".zip",
-        new File(Utils.getLocalDir(sparkConf)))
+    val confArchive = File.createTempFile(LOCALIZED_CONF_DIR, ".zip",
+      new File(Utils.getLocalDir(sparkConf)))
+    val confStream = new ZipOutputStream(new FileOutputStream(confArchive))
 
-      val hadoopConfStream = new ZipOutputStream(new FileOutputStream(hadoopConfArchive))
-      try {
-        hadoopConfStream.setLevel(0)
-        hadoopConfFiles.foreach { case (name, file) =>
-          if (file.canRead()) {
-            hadoopConfStream.putNextEntry(new ZipEntry(name))
-            Files.copy(file, hadoopConfStream)
-            hadoopConfStream.closeEntry()
-          }
+    try {
+      confStream.setLevel(0)
+      hadoopConfFiles.foreach { case (name, file) =>
+        if (file.canRead()) {
+          confStream.putNextEntry(new ZipEntry(name))
+          Files.copy(file, confStream)
+          confStream.closeEntry()
         }
-      } finally {
-        hadoopConfStream.close()
       }
 
-      Some(hadoopConfArchive)
-    } else {
-      None
+      // Save Spark configuration to a file in the archive.
+      val props = new Properties()
+      sparkConf.getAll.foreach { case (k, v) => props.setProperty(k, v) }
+      confStream.putNextEntry(new ZipEntry(SPARK_CONF_FILE))
+      val writer = new OutputStreamWriter(confStream, UTF_8)
+      props.store(writer, "Spark configuration.")
+      writer.flush()
+      confStream.closeEntry()
+    } finally {
+      confStream.close()
     }
+    confArchive
   }
 
   /**
@@ -460,7 +502,9 @@ private[spark] class Client(
   /**
    * Set up the environment for launching our ApplicationMaster container.
    */
-  private def setupLaunchEnv(stagingDir: String): HashMap[String, String] = {
+  private def setupLaunchEnv(
+      stagingDir: String,
+      pySparkArchives: Seq[String]): HashMap[String, String] = {
     logInfo("Setting up the launch environment for our AM container")
     val env = new HashMap[String, String]()
     val extraCp = sparkConf.getOption("spark.driver.extraClassPath")
@@ -478,9 +522,6 @@ private[spark] class Client(
       val renewalInterval = getTokenRenewalInterval(stagingDirPath)
       sparkConf.set("spark.yarn.token.renewal.interval", renewalInterval.toString)
     }
-    // Set the environment variables to be passed on to the executors.
-    distCacheMgr.setDistFilesEnv(env)
-    distCacheMgr.setDistArchivesEnv(env)
 
     // Pick up any environment variables for the AM provided through spark.yarn.appMasterEnv.*
     val amEnvPrefix = "spark.yarn.appMasterEnv."
@@ -497,15 +538,32 @@ private[spark] class Client(
       env("SPARK_YARN_USER_ENV") = userEnvs
     }
 
-    // if spark.submit.pyArchives is in sparkConf, append pyArchives to PYTHONPATH
-    // that can be passed on to the ApplicationMaster and the executors.
-    if (sparkConf.contains("spark.submit.pyArchives")) {
-      var pythonPath = sparkConf.get("spark.submit.pyArchives")
-      if (env.contains("PYTHONPATH")) {
-        pythonPath = Seq(env.get("PYTHONPATH"), pythonPath).mkString(File.pathSeparator)
+    // If pyFiles contains any .py files, we need to add LOCALIZED_PYTHON_DIR to the PYTHONPATH
+    // of the container processes too. Add all non-.py files directly to PYTHONPATH.
+    //
+    // NOTE: the code currently does not handle .py files defined with a "local:" scheme.
+    val pythonPath = new ListBuffer[String]()
+    val (pyFiles, pyArchives) = args.pyFiles.partition(_.endsWith(".py"))
+    if (pyFiles.nonEmpty) {
+      pythonPath += buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
+        LOCALIZED_PYTHON_DIR)
+    }
+    (pySparkArchives ++ pyArchives).foreach { path =>
+      val uri = new URI(path)
+      if (uri.getScheme != LOCAL_SCHEME) {
+        pythonPath += buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
+          new Path(path).getName())
+      } else {
+        pythonPath += uri.getPath()
       }
-      env("PYTHONPATH") = pythonPath
-      sparkConf.setExecutorEnv("PYTHONPATH", pythonPath)
+    }
+
+    // Finally, update the Spark config to propagate PYTHONPATH to the AM and executors.
+    if (pythonPath.nonEmpty) {
+      val pythonPathStr = (sys.env.get("PYTHONPATH") ++ pythonPath)
+        .mkString(YarnSparkHadoopUtil.getClassPathSeparator)
+      env("PYTHONPATH") = pythonPathStr
+      sparkConf.setExecutorEnv("PYTHONPATH", pythonPathStr)
     }
 
     // In cluster mode, if the deprecated SPARK_JAVA_OPTS is set, we need to propagate it to
@@ -555,8 +613,19 @@ private[spark] class Client(
     logInfo("Setting up container launch context for our AM")
     val appId = newAppResponse.getApplicationId
     val appStagingDir = getAppStagingDir(appId)
-    val localResources = prepareLocalResources(appStagingDir)
-    val launchEnv = setupLaunchEnv(appStagingDir)
+    val pySparkArchives =
+      if (sys.props.getOrElse("spark.yarn.isPython", "false").toBoolean) {
+        findPySparkArchives()
+      } else {
+        Nil
+      }
+    val launchEnv = setupLaunchEnv(appStagingDir, pySparkArchives)
+    val localResources = prepareLocalResources(appStagingDir, pySparkArchives)
+
+    // Set the environment variables to be passed on to the executors.
+    distCacheMgr.setDistFilesEnv(launchEnv)
+    distCacheMgr.setDistArchivesEnv(launchEnv)
+
     val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
     amContainer.setLocalResources(localResources)
     amContainer.setEnvironment(launchEnv)
@@ -596,13 +665,6 @@ private[spark] class Client(
       javaOpts += "-XX:CMSIncrementalDutyCycle=10"
     }
 
-    // Forward the Spark configuration to the application master / executors.
-    // TODO: it might be nicer to pass these as an internal environment variable rather than
-    // as Java options, due to complications with string parsing of nested quotes.
-    for ((k, v) <- sparkConf.getAll) {
-      javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v")
-    }
-
     // Include driver-specific java options if we are launching a driver
     if (isClusterMode) {
       val driverOpts = sparkConf.getOption("spark.driver.extraJavaOptions")
@@ -655,14 +717,8 @@ private[spark] class Client(
         Nil
       }
     val primaryPyFile =
-      if (args.primaryPyFile != null) {
-        Seq("--primary-py-file", args.primaryPyFile)
-      } else {
-        Nil
-      }
-    val pyFiles =
-      if (args.pyFiles != null) {
-        Seq("--py-files", args.pyFiles)
+      if (isClusterMode && args.primaryPyFile != null) {
+        Seq("--primary-py-file", new Path(args.primaryPyFile).getName())
       } else {
         Nil
       }
@@ -678,9 +734,6 @@ private[spark] class Client(
       } else {
         Class.forName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
       }
-    if (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) {
-      args.userArgs = ArrayBuffer(args.primaryPyFile, args.pyFiles) ++ args.userArgs
-    }
     if (args.primaryRFile != null && args.primaryRFile.endsWith(".R")) {
       args.userArgs = ArrayBuffer(args.primaryRFile) ++ args.userArgs
     }
@@ -688,11 +741,13 @@ private[spark] class Client(
       Seq("--arg", YarnSparkHadoopUtil.escapeForShell(arg))
     }
     val amArgs =
-      Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ pyFiles ++ primaryRFile ++
+      Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++
         userArgs ++ Seq(
           "--executor-memory", args.executorMemory.toString + "m",
           "--executor-cores", args.executorCores.toString,
-          "--num-executors ", args.numExecutors.toString)
+          "--num-executors ", args.numExecutors.toString,
+          "--properties-file", buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
+            LOCALIZED_CONF_DIR, SPARK_CONF_FILE))
 
     // Command for the ApplicationMaster
     val commands = prefixEnv ++ Seq(
@@ -857,6 +912,22 @@ private[spark] class Client(
       }
     }
   }
+
+  private def findPySparkArchives(): Seq[String] = {
+    sys.env.get("PYSPARK_ARCHIVES_PATH")
+      .map(_.split(",").toSeq)
+      .getOrElse {
+        val pyLibPath = Seq(sys.env("SPARK_HOME"), "python", "lib").mkString(File.separator)
+        val pyArchivesFile = new File(pyLibPath, "pyspark.zip")
+        require(pyArchivesFile.exists(),
+          "pyspark.zip not found; cannot run pyspark application in YARN mode.")
+        val py4jFile = new File(pyLibPath, "py4j-0.8.2.1-src.zip")
+        require(py4jFile.exists(),
+          "py4j-0.8.2.1-src.zip not found; cannot run pyspark application in YARN mode.")
+        Seq(pyArchivesFile.getAbsolutePath(), py4jFile.getAbsolutePath())
+      }
+  }
+
 }
 
 object Client extends Logging {
@@ -907,8 +978,14 @@ object Client extends Logging {
   // Distribution-defined classpath to add to processes
   val ENV_DIST_CLASSPATH = "SPARK_DIST_CLASSPATH"
 
-  // Subdirectory where the user's hadoop config files will be placed.
-  val LOCALIZED_HADOOP_CONF_DIR = "__hadoop_conf__"
+  // Subdirectory where the user's Spark and Hadoop config files will be placed.
+  val LOCALIZED_CONF_DIR = "__spark_conf__"
+
+  // Name of the file in the conf archive containing Spark configuration.
+  val SPARK_CONF_FILE = "__spark_conf__.properties"
+
+  // Subdirectory where the user's python files (not archives) will be placed.
+  val LOCALIZED_PYTHON_DIR = "__pyfiles__"
 
   /**
    * Find the user-defined Spark jar if configured, or return the jar containing this
@@ -1033,7 +1110,7 @@ object Client extends Logging {
     if (isAM) {
       addClasspathEntry(
         YarnSparkHadoopUtil.expandEnvironment(Environment.PWD) + Path.SEPARATOR +
-          LOCALIZED_HADOOP_CONF_DIR, env)
+          LOCALIZED_CONF_DIR, env)
     }
 
     if (sparkConf.getBoolean("spark.yarn.user.classpath.first", false)) {
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 9c7b1b3988082992b2dced7a450392f82e4fb241..35e990602a6cfb8622428fbccaac85696b6ceb94 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -30,7 +30,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
   var archives: String = null
   var userJar: String = null
   var userClass: String = null
-  var pyFiles: String = null
+  var pyFiles: Seq[String] = Nil
   var primaryPyFile: String = null
   var primaryRFile: String = null
   var userArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
@@ -228,7 +228,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
           args = tail
 
         case ("--py-files") :: value :: tail =>
-          pyFiles = value
+          pyFiles = value.split(",")
           args = tail
 
         case ("--files") :: value :: tail =>
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 99c05329b4d73129f6b9f06a874e4591bc220852..1c8d7ec57635f5ec73a3a54c24e8bb71465fa86d 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -76,7 +76,8 @@ private[spark] class YarnClientSchedulerBackend(
         ("--executor-memory", "SPARK_EXECUTOR_MEMORY", "spark.executor.memory"),
         ("--executor-cores", "SPARK_WORKER_CORES", "spark.executor.cores"),
         ("--executor-cores", "SPARK_EXECUTOR_CORES", "spark.executor.cores"),
-        ("--queue", "SPARK_YARN_QUEUE", "spark.yarn.queue")
+        ("--queue", "SPARK_YARN_QUEUE", "spark.yarn.queue"),
+        ("--py-files", null, "spark.submit.pyFiles")
       )
     // Warn against the following deprecated environment variables: env var -> suggestion
     val deprecatedEnvVars = Map(
@@ -86,7 +87,7 @@ private[spark] class YarnClientSchedulerBackend(
     optionTuples.foreach { case (optionName, envVar, sparkProp) =>
       if (sc.getConf.contains(sparkProp)) {
         extraArgs += (optionName, sc.getConf.get(sparkProp))
-      } else if (System.getenv(envVar) != null) {
+      } else if (envVar != null && System.getenv(envVar) != null) {
         extraArgs += (optionName, System.getenv(envVar))
         if (deprecatedEnvVars.contains(envVar)) {
           logWarning(s"NOTE: $envVar is deprecated. Use ${deprecatedEnvVars(envVar)} instead.")
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 01d33c9ce929709d8c37c02d26536c0158846932..4ec976aa313877c38fb95e814b366ec369a552ca 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -113,7 +113,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll {
         Environment.PWD.$()
       }
     cp should contain(pwdVar)
-    cp should contain (s"$pwdVar${Path.SEPARATOR}${Client.LOCALIZED_HADOOP_CONF_DIR}")
+    cp should contain (s"$pwdVar${Path.SEPARATOR}${Client.LOCALIZED_CONF_DIR}")
     cp should not contain (Client.SPARK_JAR)
     cp should not contain (Client.APP_JAR)
   }
@@ -129,7 +129,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll {
 
     val tempDir = Utils.createTempDir()
     try {
-      client.prepareLocalResources(tempDir.getAbsolutePath())
+      client.prepareLocalResources(tempDir.getAbsolutePath(), Nil)
       sparkConf.getOption(Client.CONF_SPARK_USER_JAR) should be (Some(USER))
 
       // The non-local path should be propagated by name only, since it will end up in the app's
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 93d587d0cb36a37959ac63741994617ce305dd38..a0f25ba450068ac34e5e8a41c76291184add8e14 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -56,6 +56,7 @@ class YarnClusterSuite extends SparkFunSuite with BeforeAndAfterAll with Matcher
     """.stripMargin
 
   private val TEST_PYFILE = """
+    |import mod1, mod2
     |import sys
     |from operator import add
     |
@@ -67,7 +68,7 @@ class YarnClusterSuite extends SparkFunSuite with BeforeAndAfterAll with Matcher
     |    sc = SparkContext(conf=SparkConf())
     |    status = open(sys.argv[1],'w')
     |    result = "failure"
-    |    rdd = sc.parallelize(range(10))
+    |    rdd = sc.parallelize(range(10)).map(lambda x: x * mod1.func() * mod2.func())
     |    cnt = rdd.count()
     |    if cnt == 10:
     |        result = "success"
@@ -76,6 +77,11 @@ class YarnClusterSuite extends SparkFunSuite with BeforeAndAfterAll with Matcher
     |    sc.stop()
     """.stripMargin
 
+  private val TEST_PYMODULE = """
+    |def func():
+    |    return 42
+    """.stripMargin
+
   private var yarnCluster: MiniYARNCluster = _
   private var tempDir: File = _
   private var fakeSparkJar: File = _
@@ -124,7 +130,7 @@ class YarnClusterSuite extends SparkFunSuite with BeforeAndAfterAll with Matcher
     logInfo(s"RM address in configuration is ${config.get(YarnConfiguration.RM_ADDRESS)}")
 
     fakeSparkJar = File.createTempFile("sparkJar", null, tempDir)
-    hadoopConfDir = new File(tempDir, Client.LOCALIZED_HADOOP_CONF_DIR)
+    hadoopConfDir = new File(tempDir, Client.LOCALIZED_CONF_DIR)
     assert(hadoopConfDir.mkdir())
     File.createTempFile("token", ".txt", hadoopConfDir)
   }
@@ -151,26 +157,12 @@ class YarnClusterSuite extends SparkFunSuite with BeforeAndAfterAll with Matcher
     }
   }
 
-  // Enable this once fix SPARK-6700
-  test("run Python application in yarn-cluster mode") {
-    val primaryPyFile = new File(tempDir, "test.py")
-    Files.write(TEST_PYFILE, primaryPyFile, UTF_8)
-    val pyFile = new File(tempDir, "test2.py")
-    Files.write(TEST_PYFILE, pyFile, UTF_8)
-    var result = File.createTempFile("result", null, tempDir)
+  test("run Python application in yarn-client mode") {
+    testPySpark(true)
+  }
 
-    // The sbt assembly does not include pyspark / py4j python dependencies, so we need to
-    // propagate SPARK_HOME so that those are added to PYTHONPATH. See PythonUtils.scala.
-    val sparkHome = sys.props("spark.test.home")
-    val extraConf = Map(
-      "spark.executorEnv.SPARK_HOME" -> sparkHome,
-      "spark.yarn.appMasterEnv.SPARK_HOME" -> sparkHome)
-
-    runSpark(false, primaryPyFile.getAbsolutePath(),
-      sparkArgs = Seq("--py-files", pyFile.getAbsolutePath()),
-      appArgs = Seq(result.getAbsolutePath()),
-      extraConf = extraConf)
-    checkResult(result)
+  test("run Python application in yarn-cluster mode") {
+    testPySpark(false)
   }
 
   test("user class path first in client mode") {
@@ -188,6 +180,33 @@ class YarnClusterSuite extends SparkFunSuite with BeforeAndAfterAll with Matcher
     checkResult(result)
   }
 
+  private def testPySpark(clientMode: Boolean): Unit = {
+    val primaryPyFile = new File(tempDir, "test.py")
+    Files.write(TEST_PYFILE, primaryPyFile, UTF_8)
+
+    val moduleDir =
+      if (clientMode) {
+        // In client-mode, .py files added with --py-files are not visible in the driver.
+        // This is something that the launcher library would have to handle.
+        tempDir
+      } else {
+        val subdir = new File(tempDir, "pyModules")
+        subdir.mkdir()
+        subdir
+      }
+    val pyModule = new File(moduleDir, "mod1.py")
+    Files.write(TEST_PYMODULE, pyModule, UTF_8)
+
+    val mod2Archive = TestUtils.createJarWithFiles(Map("mod2.py" -> TEST_PYMODULE), moduleDir)
+    val pyFiles = Seq(pyModule.getAbsolutePath(), mod2Archive.getPath()).mkString(",")
+    val result = File.createTempFile("result", null, tempDir)
+
+    runSpark(clientMode, primaryPyFile.getAbsolutePath(),
+      sparkArgs = Seq("--py-files", pyFiles),
+      appArgs = Seq(result.getAbsolutePath()))
+    checkResult(result)
+  }
+
   private def testUseClassPathFirst(clientMode: Boolean): Unit = {
     // Create a jar file that contains a different version of "test.resource".
     val originalJar = TestUtils.createJarWithFiles(Map("test.resource" -> "ORIGINAL"), tempDir)