diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 012a89a31b046069f9b8c2c9d7c130491af145ab..4c4110812e0a18dc06d30a577335c1c8c32fd4b9 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -252,6 +252,26 @@ object SparkSubmit { val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER + // Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files + // too for packages that include Python code + val resolvedMavenCoordinates = + SparkSubmitUtils.resolveMavenCoordinates( + args.packages, Option(args.repositories), Option(args.ivyRepoPath)) + if (!resolvedMavenCoordinates.trim.isEmpty) { + if (args.jars == null || args.jars.trim.isEmpty) { + args.jars = resolvedMavenCoordinates + } else { + args.jars += s",$resolvedMavenCoordinates" + } + if (args.isPython) { + if (args.pyFiles == null || args.pyFiles.trim.isEmpty) { + args.pyFiles = resolvedMavenCoordinates + } else { + args.pyFiles += s",$resolvedMavenCoordinates" + } + } + } + // Require all python files to be local, so we can add them to the PYTHONPATH // In YARN cluster mode, python files are distributed as regular files, which can be non-local if (args.isPython && !isYarnCluster) { @@ -307,18 +327,6 @@ object SparkSubmit { // Special flag to avoid deprecation warnings at the client sysProps("SPARK_SUBMIT") = "true" - // Resolve maven dependencies if there are any and add classpath to jars - val resolvedMavenCoordinates = - SparkSubmitUtils.resolveMavenCoordinates( - args.packages, Option(args.repositories), Option(args.ivyRepoPath)) - if (!resolvedMavenCoordinates.trim.isEmpty) { - if (args.jars == null || args.jars.trim.isEmpty) { - args.jars = resolvedMavenCoordinates - } else { - args.jars += s",$resolvedMavenCoordinates" - } - } - // A list of rules to map each argument to system properties or command-line options in // each deploy mode; we iterate through these below val options = List[OptionAssigner]( @@ -646,13 +654,15 @@ private[spark] object SparkSubmitUtils { private[spark] case class MavenCoordinate(groupId: String, artifactId: String, version: String) /** - * Extracts maven coordinates from a comma-delimited string + * Extracts maven coordinates from a comma-delimited string. Coordinates should be provided + * in the format `groupId:artifactId:version` or `groupId/artifactId:version`. The latter provides + * simplicity for Spark Package users. * @param coordinates Comma-delimited string of maven coordinates * @return Sequence of Maven coordinates */ private[spark] def extractMavenCoordinates(coordinates: String): Seq[MavenCoordinate] = { coordinates.split(",").map { p => - val splits = p.split(":") + val splits = p.replace("/", ":").split(":") require(splits.length == 3, s"Provided Maven Coordinates must be in the form " + s"'groupId:artifactId:version'. The coordinate provided is: $p") require(splits(0) != null && splits(0).trim.nonEmpty, s"The groupId cannot be null or " + @@ -682,6 +692,13 @@ private[spark] object SparkSubmitUtils { br.setName("central") cr.add(br) + val sp: IBiblioResolver = new IBiblioResolver + sp.setM2compatible(true) + sp.setUsepoms(true) + sp.setRoot("http://dl.bintray.com/spark-packages/maven") + sp.setName("spark-packages") + cr.add(sp) + val repositoryList = remoteRepos.getOrElse("") // add any other remote repositories other than maven central if (repositoryList.trim.nonEmpty) { @@ -794,14 +811,19 @@ private[spark] object SparkSubmitUtils { val md = getModuleDescriptor md.setDefaultConf(ivyConfName) - // Add an exclusion rule for Spark + // Add an exclusion rule for Spark and Scala Library val sparkArtifacts = new ArtifactId(new ModuleId("org.apache.spark", "*"), "*", "*", "*") val sparkDependencyExcludeRule = new DefaultExcludeRule(sparkArtifacts, ivySettings.getMatcher("glob"), null) sparkDependencyExcludeRule.addConfiguration(ivyConfName) + val scalaArtifacts = new ArtifactId(new ModuleId("*", "scala-library"), "*", "*", "*") + val scalaDependencyExcludeRule = + new DefaultExcludeRule(scalaArtifacts, ivySettings.getMatcher("glob"), null) + scalaDependencyExcludeRule.addConfiguration(ivyConfName) // Exclude any Spark dependencies, and add all supplied maven artifacts as dependencies md.addExcludeRule(sparkDependencyExcludeRule) + md.addExcludeRule(scalaDependencyExcludeRule) addDependenciesToIvy(md, artifacts, ivyConfName) // resolve dependencies diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala index 53665350013cd721f23e076c3381753e62350b47..ad62b35f624f64a2f0add0a8690c4e4969b4309a 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala @@ -57,20 +57,23 @@ class SparkSubmitUtilsSuite extends FunSuite with BeforeAndAfterAll { test("create repo resolvers") { val resolver1 = SparkSubmitUtils.createRepoResolvers(None) - // should have central by default - assert(resolver1.getResolvers.size() === 1) + // should have central and spark-packages by default + assert(resolver1.getResolvers.size() === 2) assert(resolver1.getResolvers.get(0).asInstanceOf[IBiblioResolver].getName === "central") + assert(resolver1.getResolvers.get(1).asInstanceOf[IBiblioResolver].getName === "spark-packages") val repos = "a/1,b/2,c/3" val resolver2 = SparkSubmitUtils.createRepoResolvers(Option(repos)) - assert(resolver2.getResolvers.size() === 4) + assert(resolver2.getResolvers.size() === 5) val expected = repos.split(",").map(r => s"$r/") resolver2.getResolvers.toArray.zipWithIndex.foreach { case (resolver: IBiblioResolver, i) => if (i == 0) { assert(resolver.getName === "central") + } else if (i == 1) { + assert(resolver.getName === "spark-packages") } else { - assert(resolver.getName === s"repo-$i") - assert(resolver.getRoot === expected(i - 1)) + assert(resolver.getName === s"repo-${i - 1}") + assert(resolver.getRoot === expected(i - 2)) } } } diff --git a/docs/programming-guide.md b/docs/programming-guide.md index 118701549a7591bd231cb8ae15ec401d60ba8a87..4e4af76316863501742d5889939fb6ceb3655bb3 100644 --- a/docs/programming-guide.md +++ b/docs/programming-guide.md @@ -173,8 +173,11 @@ in-process. In the Spark shell, a special interpreter-aware SparkContext is already created for you, in the variable called `sc`. Making your own SparkContext will not work. You can set which master the context connects to using the `--master` argument, and you can add JARs to the classpath -by passing a comma-separated list to the `--jars` argument. -For example, to run `bin/spark-shell` on exactly four cores, use: +by passing a comma-separated list to the `--jars` argument. You can also add dependencies +(e.g. Spark Packages) to your shell session by supplying a comma-separated list of maven coordinates +to the `--packages` argument. Any additional repositories where dependencies might exist (e.g. SonaType) +can be passed to the `--repositories` argument. For example, to run `bin/spark-shell` on exactly +four cores, use: {% highlight bash %} $ ./bin/spark-shell --master local[4] @@ -186,6 +189,12 @@ Or, to also add `code.jar` to its classpath, use: $ ./bin/spark-shell --master local[4] --jars code.jar {% endhighlight %} +To include a dependency using maven coordinates: + +{% highlight bash %} +$ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1" +{% endhighlight %} + For a complete list of options, run `spark-shell --help`. Behind the scenes, `spark-shell` invokes the more general [`spark-submit` script](submitting-applications.html). @@ -196,7 +205,11 @@ For a complete list of options, run `spark-shell --help`. Behind the scenes, In the PySpark shell, a special interpreter-aware SparkContext is already created for you, in the variable called `sc`. Making your own SparkContext will not work. You can set which master the context connects to using the `--master` argument, and you can add Python .zip, .egg or .py files -to the runtime path by passing a comma-separated list to `--py-files`. +to the runtime path by passing a comma-separated list to `--py-files`. You can also add dependencies +(e.g. Spark Packages) to your shell session by supplying a comma-separated list of maven coordinates +to the `--packages` argument. Any additional repositories where dependencies might exist (e.g. SonaType) +can be passed to the `--repositories` argument. Any python dependencies a Spark Package has (listed in +the requirements.txt of that package) must be manually installed using pip when necessary. For example, to run `bin/pyspark` on exactly four cores, use: {% highlight bash %} diff --git a/docs/submitting-applications.md b/docs/submitting-applications.md index 14a87f8436984dd12509951067ddd030088e0806..57b074778f2b0f6fc0fdaa441377dcf205f3b468 100644 --- a/docs/submitting-applications.md +++ b/docs/submitting-applications.md @@ -174,6 +174,11 @@ This can use up a significant amount of space over time and will need to be clea is handled automatically, and with Spark standalone, automatic cleanup can be configured with the `spark.worker.cleanup.appDataTtl` property. +Users may also include any other dependencies by supplying a comma-delimited list of maven coordinates +with `--packages`. All transitive dependencies will be handled when using this command. Additional +repositories (or resolvers in SBT) can be added in a comma-delimited fashion with the flag `--repositories`. +These commands can be used with `pyspark`, `spark-shell`, and `spark-submit` to include Spark Packages. + For Python, the equivalent `--py-files` option can be used to distribute `.egg`, `.zip` and `.py` libraries to executors. diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index f64e25c60740ad124e47289ee047dc91bdcc6942..52e82091c9f819a14a080f2d7f5468275777bda1 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -1440,31 +1440,59 @@ class SparkSubmitTests(unittest.TestCase): def tearDown(self): shutil.rmtree(self.programDir) - def createTempFile(self, name, content): + def createTempFile(self, name, content, dir=None): """ Create a temp file with the given name and content and return its path. Strips leading spaces from content up to the first '|' in each line. """ pattern = re.compile(r'^ *\|', re.MULTILINE) content = re.sub(pattern, '', content.strip()) - path = os.path.join(self.programDir, name) + if dir is None: + path = os.path.join(self.programDir, name) + else: + os.makedirs(os.path.join(self.programDir, dir)) + path = os.path.join(self.programDir, dir, name) with open(path, "w") as f: f.write(content) return path - def createFileInZip(self, name, content): + def createFileInZip(self, name, content, ext=".zip", dir=None, zip_name=None): """ Create a zip archive containing a file with the given content and return its path. Strips leading spaces from content up to the first '|' in each line. """ pattern = re.compile(r'^ *\|', re.MULTILINE) content = re.sub(pattern, '', content.strip()) - path = os.path.join(self.programDir, name + ".zip") + if dir is None: + path = os.path.join(self.programDir, name + ext) + else: + path = os.path.join(self.programDir, dir, zip_name + ext) zip = zipfile.ZipFile(path, 'w') zip.writestr(name, content) zip.close() return path + def create_spark_package(self, artifact_name): + group_id, artifact_id, version = artifact_name.split(":") + self.createTempFile("%s-%s.pom" % (artifact_id, version), (""" + |<?xml version="1.0" encoding="UTF-8"?> + |<project xmlns="http://maven.apache.org/POM/4.0.0" + | xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + | xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 + | http://maven.apache.org/xsd/maven-4.0.0.xsd"> + | <modelVersion>4.0.0</modelVersion> + | <groupId>%s</groupId> + | <artifactId>%s</artifactId> + | <version>%s</version> + |</project> + """ % (group_id, artifact_id, version)).lstrip(), + os.path.join(group_id, artifact_id, version)) + self.createFileInZip("%s.py" % artifact_id, """ + |def myfunc(x): + | return x + 1 + """, ".jar", os.path.join(group_id, artifact_id, version), + "%s-%s" % (artifact_id, version)) + def test_single_script(self): """Submit and test a single script file""" script = self.createTempFile("test.py", """ @@ -1533,6 +1561,39 @@ class SparkSubmitTests(unittest.TestCase): self.assertEqual(0, proc.returncode) self.assertIn("[2, 3, 4]", out) + def test_package_dependency(self): + """Submit and test a script with a dependency on a Spark Package""" + script = self.createTempFile("test.py", """ + |from pyspark import SparkContext + |from mylib import myfunc + | + |sc = SparkContext() + |print sc.parallelize([1, 2, 3]).map(myfunc).collect() + """) + self.create_spark_package("a:mylib:0.1") + proc = subprocess.Popen([self.sparkSubmit, "--packages", "a:mylib:0.1", "--repositories", + "file:" + self.programDir, script], stdout=subprocess.PIPE) + out, err = proc.communicate() + self.assertEqual(0, proc.returncode) + self.assertIn("[2, 3, 4]", out) + + def test_package_dependency_on_cluster(self): + """Submit and test a script with a dependency on a Spark Package on a cluster""" + script = self.createTempFile("test.py", """ + |from pyspark import SparkContext + |from mylib import myfunc + | + |sc = SparkContext() + |print sc.parallelize([1, 2, 3]).map(myfunc).collect() + """) + self.create_spark_package("a:mylib:0.1") + proc = subprocess.Popen([self.sparkSubmit, "--packages", "a:mylib:0.1", "--repositories", + "file:" + self.programDir, "--master", + "local-cluster[1,1,512]", script], stdout=subprocess.PIPE) + out, err = proc.communicate() + self.assertEqual(0, proc.returncode) + self.assertIn("[2, 3, 4]", out) + def test_single_script_on_cluster(self): """Submit and test a single script on a cluster""" script = self.createTempFile("test.py", """