diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
index 039c8719e2867fcddb6cdd319caa32bd0e0312ff..53e18c4bcec23ba3ef85509a571fe7feb5ce625c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
@@ -26,7 +26,7 @@ import org.apache.spark.api.python.PythonUtils
 import org.apache.spark.util.{RedirectThread, Utils}
 
 /**
- * A main class used by spark-submit to launch Python applications. It executes python as a
+ * A main class used to launch Python applications. It executes Python as a
  * subprocess and then has it connect back to the JVM to access system properties, etc.
  */
 object PythonRunner {
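
PythonRunner itself is otherwise unchanged; for context, the pattern it implements is
roughly the following (a simplified sketch only, not the actual implementation — the
name runPythonApp is illustrative):

    import java.io.File

    // Sketch: run a Python script as a subprocess with extra entries on its
    // PYTHONPATH, inheriting stdout/stderr, and return its exit code.
    object PythonRunnerSketch {
      def runPythonApp(pythonFile: String, pyFiles: Seq[String], appArgs: Seq[String]): Int = {
        val pythonExec = sys.env.getOrElse("PYSPARK_PYTHON", "python")
        val builder = new ProcessBuilder((Seq(pythonExec, pythonFile) ++ appArgs): _*)
        // Prepend the py files to PYTHONPATH so the script can import them.
        val pythonPath = (pyFiles :+ sys.env.getOrElse("PYTHONPATH", ""))
          .filter(_.nonEmpty)
          .mkString(File.pathSeparator)
        builder.environment().put("PYTHONPATH", pythonPath)
        builder.inheritIO()
        builder.start().waitFor()
      }
    }
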
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index c240bcd705d93a4e4755b26e6122f0adf9ce702f..02021be9f93d487af4612024b2d228bca508ad9b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -23,6 +23,8 @@ import java.net.URL
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
 
+import org.apache.hadoop.fs.Path
+
 import org.apache.spark.executor.ExecutorURLClassLoader
 import org.apache.spark.util.Utils
 
@@ -134,12 +136,27 @@ object SparkSubmit {
       }
     }
 
+    val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
+
+    // Require all python files to be local so that we can add them to the PYTHONPATH.
+    // In YARN cluster mode, python files are distributed as regular files and may be non-local.
+    if (args.isPython && !isYarnCluster) {
+      if (Utils.nonLocalPaths(args.primaryResource).nonEmpty) {
+        printErrorAndExit(s"Only local python files are supported: $args.primaryResource")
+      }
+      val nonLocalPyFiles = Utils.nonLocalPaths(args.pyFiles).mkString(",")
+      if (nonLocalPyFiles.nonEmpty) {
+        printErrorAndExit(s"Only local additional python files are supported: $nonLocalPyFiles")
+      }
+    }
+
     // The following modes are not supported or applicable
     (clusterManager, deployMode) match {
       case (MESOS, CLUSTER) =>
         printErrorAndExit("Cluster deploy mode is currently not supported for Mesos clusters.")
-      case (_, CLUSTER) if args.isPython =>
-        printErrorAndExit("Cluster deploy mode is currently not supported for python applications.")
+      case (STANDALONE, CLUSTER) if args.isPython =>
+        printErrorAndExit("Cluster deploy mode is currently not supported for python " +
+          "applications on standalone clusters.")
       case (_, CLUSTER) if isShell(args.primaryResource) =>
         printErrorAndExit("Cluster deploy mode is not applicable to Spark shells.")
       case (_, CLUSTER) if isSqlShell(args.mainClass) =>
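
Utils.nonLocalPaths, used in the validation above, returns the entries of a
comma-separated path list whose scheme is neither absent, "file", nor "local".
For example (assuming that documented behavior):

    // Only the hdfs:// entry is non-local and would trigger printErrorAndExit:
    Utils.nonLocalPaths("a.py,local:b.py,hdfs://nn/c.py")
    // => Array("hdfs://nn/c.py")
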
@@ -150,7 +167,7 @@ object SparkSubmit {
     }
 
     // If we're running a python app, set the main class to our specific python runner
-    if (args.isPython) {
+    if (args.isPython && deployMode == CLIENT) {
       if (args.primaryResource == PYSPARK_SHELL) {
         args.mainClass = "py4j.GatewayServer"
         args.childArgs = ArrayBuffer("--die-on-broken-pipe", "0")
@@ -167,6 +184,13 @@ object SparkSubmit {
       }
     }
 
+    // In yarn-cluster mode for a python app, add the primary resource and pyFiles to the
+    // list of files to be distributed with the job
+    if (args.isPython && isYarnCluster) {
+      args.files = mergeFileLists(args.files, args.primaryResource)
+      args.files = mergeFileLists(args.files, args.pyFiles)
+    }
+
     // Special flag to avoid deprecation warnings at the client
     sysProps("SPARK_SUBMIT") = "true"
 
@@ -245,7 +269,6 @@ object SparkSubmit {
     // Add the application jar automatically so the user doesn't have to call sc.addJar
     // For YARN cluster mode, the jar is already distributed on each node as "app.jar"
     // For python files, the primary resource is already distributed as a regular file
-    val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
     if (!isYarnCluster && !args.isPython) {
       var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq.empty)
       if (isUserJar(args.primaryResource)) {
@@ -270,10 +293,22 @@ object SparkSubmit {
     // In yarn-cluster mode, use yarn.Client as a wrapper around the user class
     if (isYarnCluster) {
       childMainClass = "org.apache.spark.deploy.yarn.Client"
-      if (args.primaryResource != SPARK_INTERNAL) {
-        childArgs += ("--jar", args.primaryResource)
+      if (args.isPython) {
+        val mainPyFile = new Path(args.primaryResource).getName
+        childArgs += ("--primary-py-file", mainPyFile)
+        if (args.pyFiles != null) {
+          // These files will be distributed to each machine's working directory, so strip the
+          // path prefix
+          val pyFilesNames = args.pyFiles.split(",").map(p => (new Path(p)).getName).mkString(",")
+          childArgs += ("--py-files", pyFilesNames)
+        }
+        childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
+      } else {
+        if (args.primaryResource != SPARK_INTERNAL) {
+          childArgs += ("--jar", args.primaryResource)
+        }
+        childArgs += ("--class", args.mainClass)
       }
-      childArgs += ("--class", args.mainClass)
       if (args.childArgs != null) {
         args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
       }
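
Net effect of this file's changes: for a Python app in yarn-cluster mode, SparkSubmit no
longer runs PythonRunner directly but wraps the app in yarn.Client. For an app submitted
as app.py with --py-files deps.py, the childArgs assembled above look roughly like this
(illustrative values):

    Seq("--primary-py-file", "app.py",    // name only; the file travels via --files
        "--py-files", "deps.py",          // path prefixes stripped likewise
        "--class", "org.apache.spark.deploy.PythonRunner",
        "--arg", "<each user argument in order>")
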
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 81ec08cb6d5010780efbe150b537304b5b0fdbe5..73e921fd83ef2e3225f53afb7b1d12a45965f69d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -179,18 +179,6 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
       SparkSubmit.printErrorAndExit("--py-files given but primary resource is not a Python script")
     }
 
-    // Require all python files to be local, so we can add them to the PYTHONPATH
-    if (isPython) {
-      if (Utils.nonLocalPaths(primaryResource).nonEmpty) {
-        SparkSubmit.printErrorAndExit(s"Only local python files are supported: $primaryResource")
-      }
-      val nonLocalPyFiles = Utils.nonLocalPaths(pyFiles).mkString(",")
-      if (nonLocalPyFiles.nonEmpty) {
-        SparkSubmit.printErrorAndExit(
-          s"Only local additional python files are supported: $nonLocalPyFiles")
-      }
-    }
-
     if (master.startsWith("yarn")) {
       val hasHadoopEnv = env.contains("HADOOP_CONF_DIR") || env.contains("YARN_CONF_DIR")
       if (!hasHadoopEnv && !Utils.isTesting) {
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index d3e327b2497b7b2c28412c6440825ed82ea56e55..eb328b2b8ac500a63c1f92b7bbc6abf9377f0395 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -34,7 +34,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv}
 import org.apache.spark.SparkException
-import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.{PythonRunner, SparkHadoopUtil}
 import org.apache.spark.deploy.history.HistoryServer
 import org.apache.spark.scheduler.cluster.YarnSchedulerBackend
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
@@ -135,7 +135,7 @@ private[spark] class ApplicationMaster(
         .get().addShutdownHook(cleanupHook, ApplicationMaster.SHUTDOWN_HOOK_PRIORITY)
 
       // Call this to force generation of secret so it gets populated into the
-      // Hadoop UGI. This has to happen before the startUserClass which does a
+      // Hadoop UGI. This has to happen before startUserApplication, which does a
       // doAs in order for the credentials to be passed on to the executor containers.
       val securityMgr = new SecurityManager(sparkConf)
 
@@ -254,7 +254,7 @@ private[spark] class ApplicationMaster(
 
   private def runDriver(securityMgr: SecurityManager): Unit = {
     addAmIpFilter()
-    userClassThread = startUserClass()
+    userClassThread = startUserApplication()
 
     // This a bit hacky, but we need to wait until the spark.driver.port property has
     // been set by the Thread executing the user class.
@@ -448,9 +448,13 @@ private[spark] class ApplicationMaster(
    *
    * Returns the user thread that was started.
    */
-  private def startUserClass(): Thread = {
-    logInfo("Starting the user JAR in a separate Thread")
+  private def startUserApplication(): Thread = {
+    logInfo("Starting the user application in a separate Thread")
     System.setProperty("spark.executor.instances", args.numExecutors.toString)
+    if (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) {
+      System.setProperty("spark.submit.pyFiles",
+        PythonRunner.formatPaths(args.pyFiles).mkString(","))
+    }
     val mainMethod = Class.forName(args.userClass, false,
       Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
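
PythonRunner.formatPaths is used above to turn the comma-separated pyFiles list into
entries Python can put on the PYTHONPATH. A rough sketch of the idea (hypothetical
helper name; assuming the real method strips local URI schemes and rejects non-local
paths):

    import java.net.URI

    // Hypothetical sketch: Python cannot interpret URI schemes in PYTHONPATH
    // entries, so strip "local:"/"file:" and reject anything non-local.
    def stripLocalScheme(path: String): String = {
      val uri = new URI(path)
      Option(uri.getScheme) match {
        case None | Some("local") | Some("file") =>
          // Opaque URIs like "local:pkg.zip" have no path component.
          Option(uri.getPath).getOrElse(uri.getSchemeSpecificPart)
        case Some(_) =>
          throw new IllegalArgumentException(s"Python files must be local: $path")
      }
    }
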
 
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
index d76a63276d752d7615fe43d2cecb1bb7b97b3536..e1a992af3aae792c6bba088542ac4343ef767378 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
@@ -24,6 +24,8 @@ import collection.mutable.ArrayBuffer
 class ApplicationMasterArguments(val args: Array[String]) {
   var userJar: String = null
   var userClass: String = null
+  var primaryPyFile: String = null
+  var pyFiles: String = null
   var userArgs: Seq[String] = Seq[String]()
   var executorMemory = 1024
   var executorCores = 1
@@ -48,6 +50,14 @@ class ApplicationMasterArguments(val args: Array[String]) {
           userClass = value
           args = tail
 
+        case ("--primary-py-file") :: value :: tail =>
+          primaryPyFile = value
+          args = tail
+
+        case ("--py-files") :: value :: tail =>
+          pyFiles = value
+          args = tail
+
         case ("--args" | "--arg") :: value :: tail =>
           userArgsBuffer += value
           args = tail
@@ -81,6 +91,9 @@ class ApplicationMasterArguments(val args: Array[String]) {
       |Options:
       |  --jar JAR_PATH       Path to your application's JAR file
       |  --class CLASS_NAME   Name of your application's main class
+      |  --primary-py-file    The application's main Python file
+      |  --py-files PY_FILES  Comma-separated list of .zip, .egg, or .py files to
+      |                       place on the PYTHONPATH for Python apps.
       |  --args ARGS          Arguments to be passed to your application's main class.
       |                       Multiple invocations are possible, each will be passed in order.
       |  --num-executors NUM    Number of executors to start (Default: 2)
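
The parser above walks the argument list with Scala list extractors. The same idiom in
a self-contained form (hypothetical flags, for illustration only):

    object ArgParseSketch {
      def main(argv: Array[String]): Unit = {
        var primaryPyFile: String = null
        var pyFiles: String = null
        var args = argv.toList
        while (args.nonEmpty) {
          args match {
            // Each case consumes the flag and its value, then loops on the tail.
            case "--primary-py-file" :: value :: tail =>
              primaryPyFile = value
              args = tail
            case "--py-files" :: value :: tail =>
              pyFiles = value
              args = tail
            case unknown :: _ =>
              sys.error(s"Unknown argument: $unknown")
          }
        }
        println(s"primaryPyFile=$primaryPyFile, pyFiles=$pyFiles")
      }
    }
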
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 1a18e6509ef263833ceba23c3330d6d47c0811bb..91e8574e94e2fdef820f6898cc36d60f060ad623 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -21,7 +21,7 @@ import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
 import java.nio.ByteBuffer
 
 import scala.collection.JavaConversions._
-import scala.collection.mutable.{HashMap, ListBuffer, Map}
+import scala.collection.mutable.{ArrayBuffer, HashMap, ListBuffer, Map}
 import scala.util.{Try, Success, Failure}
 
 import com.google.common.base.Objects
@@ -477,17 +477,32 @@ private[spark] class Client(
       } else {
         Nil
       }
+    val primaryPyFile =
+      if (args.primaryPyFile != null) {
+        Seq("--primary-py-file", args.primaryPyFile)
+      } else {
+        Nil
+      }
+    val pyFiles =
+      if (args.pyFiles != null) {
+        Seq("--py-files", args.pyFiles)
+      } else {
+        Nil
+      }
     val amClass =
       if (isClusterMode) {
         Class.forName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
       } else {
         Class.forName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
       }
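+    // PythonRunner.main expects the primary Python file and the comma-separated
+    // pyFiles list as its first two arguments, so prepend both to the user args.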
+    if (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) {
+      args.userArgs = ArrayBuffer(args.primaryPyFile, args.pyFiles) ++ args.userArgs
+    }
     val userArgs = args.userArgs.flatMap { arg =>
       Seq("--arg", YarnSparkHadoopUtil.escapeForShell(arg))
     }
     val amArgs =
-      Seq(amClass) ++ userClass ++ userJar ++ userArgs ++
+      Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ pyFiles ++ userArgs ++
         Seq(
           "--executor-memory", args.executorMemory.toString + "m",
           "--executor-cores", args.executorCores.toString,
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 5eb2023802dfc1cd4e612e16a40ef7ab88953eb4..3bc7eb1abf34134f476ea1de72ac3bf204ab2b5c 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -30,7 +30,9 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
   var archives: String = null
   var userJar: String = null
   var userClass: String = null
-  var userArgs: Seq[String] = Seq[String]()
+  var pyFiles: String = null
+  var primaryPyFile: String = null
+  var userArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
   var executorMemory = 1024 // MB
   var executorCores = 1
   var numExecutors = DEFAULT_NUMBER_EXECUTORS
@@ -132,7 +134,6 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
   }
 
   private def parseArgs(inputArgs: List[String]): Unit = {
-    val userArgsBuffer = new ArrayBuffer[String]()
     var args = inputArgs
 
     while (!args.isEmpty) {
@@ -145,11 +146,15 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
           userClass = value
           args = tail
 
+        case ("--primary-py-file") :: value :: tail =>
+          primaryPyFile = value
+          args = tail
+
         case ("--args" | "--arg") :: value :: tail =>
           if (args(0) == "--args") {
             println("--args is deprecated. Use --arg instead.")
           }
-          userArgsBuffer += value
+          userArgs += value
           args = tail
 
         case ("--master-class" | "--am-class") :: value :: tail =>
@@ -205,6 +210,10 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
           addJars = value
           args = tail
 
+        case ("--py-files") :: value :: tail =>
+          pyFiles = value
+          args = tail
+
         case ("--files") :: value :: tail =>
           files = value
           args = tail
@@ -219,8 +228,6 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
           throw new IllegalArgumentException(getUsageMessage(args))
       }
     }
-
-    userArgs = userArgsBuffer.readOnly
   }
 
   private def getUsageMessage(unknownParam: List[String] = null): String = {
@@ -232,6 +239,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
       |  --jar JAR_PATH           Path to your application's JAR file (required in yarn-cluster
       |                           mode)
       |  --class CLASS_NAME       Name of your application's main class (required)
+      |  --primary-py-file        The application's main Python file
       |  --arg ARG                Argument to be passed to your application's main class.
       |                           Multiple invocations are possible, each will be passed in order.
       |  --num-executors NUM      Number of executors to start (Default: 2)
@@ -244,6 +252,8 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
       |                           'default')
       |  --addJars jars           Comma separated list of local jars that want SparkContext.addJar
       |                           to work with.
+      |  --py-files PY_FILES      Comma-separated list of .zip, .egg, or .py files to
+      |                           place on the PYTHONPATH for Python apps.
       |  --files files            Comma separated list of files to be distributed with the job.
       |  --archives archives      Comma separated list of archives to be distributed with the job.
       """.stripMargin
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index d79b85e867fcd5b02423944008af0e9b3a5bbde6..7165918e1bfcf1862c0a32f15983ad9dedb18739 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -45,6 +45,29 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
     |log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
     """.stripMargin
 
+  private val TEST_PYFILE = """
+    |import sys
+    |from operator import add
+    |
    |from pyspark import SparkConf, SparkContext
+    |if __name__ == "__main__":
+    |    if len(sys.argv) != 3:
+    |        print >> sys.stderr, "Usage: test.py [master] [result file]"
+    |        exit(-1)
+    |    conf = SparkConf()
+    |    conf.setMaster(sys.argv[1]).setAppName("python test in yarn cluster mode")
+    |    sc = SparkContext(conf=conf)
    |    status = open(sys.argv[2], 'w')
+    |    result = "failure"
+    |    rdd = sc.parallelize(range(10))
+    |    cnt = rdd.count()
+    |    if cnt == 10:
+    |        result = "success"
+    |    status.write(result)
+    |    status.close()
+    |    sc.stop()
+    """.stripMargin
+
   private var yarnCluster: MiniYARNCluster = _
   private var tempDir: File = _
   private var fakeSparkJar: File = _
@@ -98,6 +121,9 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
     }
 
     fakeSparkJar = File.createTempFile("sparkJar", null, tempDir)
+    val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
+    sys.props += ("spark.yarn.appMasterEnv.SPARK_HOME" ->  sparkHome)
+    sys.props += ("spark.executorEnv.SPARK_HOME" -> sparkHome)
     sys.props += ("spark.yarn.jar" -> ("local:" + fakeSparkJar.getAbsolutePath()))
     sys.props += ("spark.executor.instances" -> "1")
     sys.props += ("spark.driver.extraClassPath" -> childClasspath)
@@ -146,6 +172,24 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
     assert(Utils.exceptionString(exception).contains("Application finished with failed status"))
   }
 
+  test("run Python application in yarn-cluster mode") {
+    val primaryPyFile = new File(tempDir, "test.py")
+    Files.write(TEST_PYFILE, primaryPyFile, Charsets.UTF_8)
+    val pyFile = new File(tempDir, "test2.py")
+    Files.write(TEST_PYFILE, pyFile, Charsets.UTF_8)
+    val result = File.createTempFile("result", null, tempDir)
+
+    val args = Array("--class", "org.apache.spark.deploy.PythonRunner",
+      "--primary-py-file", primaryPyFile.getAbsolutePath(),
+      "--py-files", pyFile.getAbsolutePath(),
+      "--arg", "yarn-cluster",
+      "--arg", result.getAbsolutePath(),
+      "--name", "python test in yarn-cluster mode",
+      "--num-executors", "1")
+    Client.main(args)
+    checkResult(result)
+  }
+
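
checkResult is an existing helper in this suite (not part of the diff); based on its use
here it is essentially the following sketch — the Python app writes "success" or
"failure" to the result file, and the test asserts on the contents:

    private def checkResult(result: File): Unit = {
      val resultString = Files.toString(result, Charsets.UTF_8)
      resultString should be ("success")
    }
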
   /**
    * This is a workaround for an issue with yarn-cluster mode: the Client class will not provide
    * any sort of error when the job process finishes successfully, but the job itself fails. So