Commit 6d8e5fbc authored by Burak Yavuz, committed by Patrick Wendell

[SPARK-5979][SPARK-6032] Smaller safer --packages fix

pwendell tdas
These are the safer parts of PR #4754:
 - SPARK-5979: All dependencies with the groupId `org.apache.spark` passed through `--packages` were being excluded from the dependency tree on the assumption that they would be in the assembly jar. This is not the case, so the exclusion rules had to be defined more explicitly.
 - SPARK-6032: Ivy prints a whole lot of logs while retrieving dependencies. These were printed to `System.out`. Moved the logging to `System.err`.
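
The diff below saves `System.out`, points it at the submit tool's print stream for the duration of the Ivy resolution, and restores it after `ivy.retrieve`. A minimal, self-contained sketch of that redirection pattern (illustration only, with a `try`/`finally` added for safety; it is not the code in this commit):

```scala
import java.io.{ByteArrayOutputStream, PrintStream}

object RedirectSketch {
  // Run `body` with System.out temporarily redirected, then restore it.
  // Ivy is Java code writing to System.out directly, so swapping the stream
  // is enough to keep its resolution logs off stdout.
  def withRedirectedStdout[T](redirectTo: PrintStream)(body: => T): T = {
    val original = System.out
    System.setOut(redirectTo)
    try body finally System.setOut(original)
  }

  def main(args: Array[String]): Unit = {
    val sink = new ByteArrayOutputStream()
    val result = withRedirectedStdout(new PrintStream(sink)) {
      System.out.println("this line is captured instead of reaching stdout")
      42
    }
    println(s"result = $result, captured: ${sink.toString.trim}")
  }
}
```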

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #4802 from brkyvz/simple-streaming-fix and squashes the following commits:

e0f38cb [Burak Yavuz] Merge branch 'master' of github.com:apache/spark into simple-streaming-fix
bad921c [Burak Yavuz] [SPARK-5979][SPARK-6032] Smaller safer fix
parent dba08d1f
@@ -655,8 +655,7 @@ private[spark] object SparkSubmitUtils {
  /**
   * Extracts maven coordinates from a comma-delimited string. Coordinates should be provided
-   * in the format `groupId:artifactId:version` or `groupId/artifactId:version`. The latter provides
-   * simplicity for Spark Package users.
+   * in the format `groupId:artifactId:version` or `groupId/artifactId:version`.
   * @param coordinates Comma-delimited string of maven coordinates
   * @return Sequence of Maven coordinates
   */
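
For illustration, one way coordinates in that format could be parsed. This is a hedged sketch, not Spark's actual `extractMavenCoordinates`; the `MavenCoordinate` case class and `parseCoordinates` name are assumed here for the example:

```scala
// Sketch only; not the real extractMavenCoordinates body.
case class MavenCoordinate(groupId: String, artifactId: String, version: String)

def parseCoordinates(coordinates: String): Seq[MavenCoordinate] = {
  coordinates.split(",").map(_.trim).map { c =>
    // Accept both "groupId:artifactId:version" and "groupId/artifactId:version"
    c.replace("/", ":").split(":") match {
      case Array(groupId, artifactId, version) =>
        MavenCoordinate(groupId, artifactId, version)
      case _ =>
        throw new IllegalArgumentException(
          s"Coordinate must be of the form groupId:artifactId:version, got: $c")
    }
  }.toSeq
}

// parseCoordinates("org.apache.spark/spark-core_2.10:1.2.0")
//   == Seq(MavenCoordinate("org.apache.spark", "spark-core_2.10", "1.2.0"))
```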
@@ -747,6 +746,35 @@ private[spark] object SparkSubmitUtils {
      md.addDependency(dd)
    }
  }

+  /** Add exclusion rules for dependencies already included in the spark-assembly */
+  private[spark] def addExclusionRules(
+      ivySettings: IvySettings,
+      ivyConfName: String,
+      md: DefaultModuleDescriptor): Unit = {
+    // Add scala exclusion rule
+    val scalaArtifacts = new ArtifactId(new ModuleId("*", "scala-library"), "*", "*", "*")
+    val scalaDependencyExcludeRule =
+      new DefaultExcludeRule(scalaArtifacts, ivySettings.getMatcher("glob"), null)
+    scalaDependencyExcludeRule.addConfiguration(ivyConfName)
+    md.addExcludeRule(scalaDependencyExcludeRule)
+
+    // We need to specify each component explicitly, otherwise we miss spark-streaming-kafka and
+    // other spark-streaming utility components. Underscore is there to differentiate between
+    // spark-streaming_2.1x and spark-streaming-kafka-assembly_2.1x
+    val components = Seq("bagel_", "catalyst_", "core_", "graphx_", "hive_", "mllib_", "repl_",
+      "sql_", "streaming_", "yarn_", "network-common_", "network-shuffle_", "network-yarn_")
+
+    components.foreach { comp =>
+      val sparkArtifacts =
+        new ArtifactId(new ModuleId("org.apache.spark", s"spark-$comp*"), "*", "*", "*")
+      val sparkDependencyExcludeRule =
+        new DefaultExcludeRule(sparkArtifacts, ivySettings.getMatcher("glob"), null)
+      sparkDependencyExcludeRule.addConfiguration(ivyConfName)
+      md.addExcludeRule(sparkDependencyExcludeRule)
+    }
+  }

  /** A nice function to use in tests as well. Values are dummy strings. */
  private[spark] def getModuleDescriptor = DefaultModuleDescriptor.newDefaultInstance(
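
The subtlety behind SPARK-5979 is the trailing underscore in each component name above: a glob such as `spark-streaming_*` must exclude `spark-streaming_2.10` (already in the assembly) without also excluding `spark-streaming-kafka-assembly_2.10`, which users legitimately pull in via `--packages`. A standalone illustration of that distinction, approximating the glob with `String.matches` rather than Ivy's `glob` matcher:

```scala
// Illustration only: approximating the glob "spark-streaming_*" with a regex;
// Ivy's real matching goes through ivySettings.getMatcher("glob").
val excludedPattern = "spark-streaming_.*"

Seq(
  "spark-streaming_2.10",                // excluded: shipped in the assembly
  "spark-streaming-kafka-assembly_2.10", // kept: must still be resolved
  "spark-streaming-kafka_2.10"           // kept
).foreach { artifact =>
  println(s"$artifact excluded = ${artifact.matches(excludedPattern)}")
}
// spark-streaming_2.10 excluded = true
// spark-streaming-kafka-assembly_2.10 excluded = false
// spark-streaming-kafka_2.10 excluded = false
```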
@@ -768,6 +796,9 @@ private[spark] object SparkSubmitUtils {
    if (coordinates == null || coordinates.trim.isEmpty) {
      ""
    } else {
+      val sysOut = System.out
+      // To prevent ivy from logging to system out
+      System.setOut(printStream)
      val artifacts = extractMavenCoordinates(coordinates)
      // Default configuration name for ivy
      val ivyConfName = "default"
@@ -811,19 +842,9 @@ private[spark] object SparkSubmitUtils {
      val md = getModuleDescriptor
      md.setDefaultConf(ivyConfName)

-      // Add an exclusion rule for Spark and Scala Library
-      val sparkArtifacts = new ArtifactId(new ModuleId("org.apache.spark", "*"), "*", "*", "*")
-      val sparkDependencyExcludeRule =
-        new DefaultExcludeRule(sparkArtifacts, ivySettings.getMatcher("glob"), null)
-      sparkDependencyExcludeRule.addConfiguration(ivyConfName)
-
-      val scalaArtifacts = new ArtifactId(new ModuleId("*", "scala-library"), "*", "*", "*")
-      val scalaDependencyExcludeRule =
-        new DefaultExcludeRule(scalaArtifacts, ivySettings.getMatcher("glob"), null)
-      scalaDependencyExcludeRule.addConfiguration(ivyConfName)
-
-      // Exclude any Spark dependencies, and add all supplied maven artifacts as dependencies
-      md.addExcludeRule(sparkDependencyExcludeRule)
-      md.addExcludeRule(scalaDependencyExcludeRule)
+      // Add exclusion rules for Spark and Scala Library
+      addExclusionRules(ivySettings, ivyConfName, md)
+      // add all supplied maven artifacts as dependencies
      addDependenciesToIvy(md, artifacts, ivyConfName)
      // resolve dependencies
@@ -835,7 +856,7 @@ private[spark] object SparkSubmitUtils {
      ivy.retrieve(rr.getModuleDescriptor.getModuleRevisionId,
        packagesDirectory.getAbsolutePath + File.separator + "[artifact](-[classifier]).[ext]",
        retrieveOptions.setConfs(Array(ivyConfName)))
+      System.setOut(sysOut)
      resolveDependencyPaths(rr.getArtifacts.toArray, packagesDirectory)
    }
  }
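
For context, a hedged sketch of how this entry point is invoked; the argument order (coordinates, optional extra repositories, optional ivy path, test flag) is taken from the test below, while the coordinate itself is hypothetical:

```scala
// Hypothetical usage sketch; the spark-csv coordinate is an assumption for illustration.
val jarPaths: String = SparkSubmitUtils.resolveMavenCoordinates(
  "com.databricks:spark-csv_2.10:1.0.3", // user-supplied --packages coordinate (hypothetical)
  None,                                  // no additional remote repositories
  None,                                  // default ivy cache location
  true)                                  // test flag, as passed in SparkSubmitUtilsSuite

// jarPaths is "" when every requested artifact was excluded (e.g. Spark itself),
// otherwise it lists the local paths of the resolved jars.
```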
@@ -117,8 +117,20 @@ class SparkSubmitUtilsSuite extends FunSuite with BeforeAndAfterAll {
  }

  test("neglects Spark and Spark's dependencies") {
-    val path = SparkSubmitUtils.resolveMavenCoordinates(
-      "org.apache.spark:spark-core_2.10:1.2.0", None, None, true)
+    val components = Seq("bagel_", "catalyst_", "core_", "graphx_", "hive_", "mllib_", "repl_",
+      "sql_", "streaming_", "yarn_", "network-common_", "network-shuffle_", "network-yarn_")
+
+    val coordinates =
+      components.map(comp => s"org.apache.spark:spark-${comp}2.10:1.2.0").mkString(",") +
+      ",org.apache.spark:spark-core_fake:1.2.0"
+
+    val path = SparkSubmitUtils.resolveMavenCoordinates(coordinates, None, None, true)
    assert(path === "", "should return empty path")
+    // Should not exclude the following dependency. Will throw an error, because it doesn't exist,
+    // but the fact that it is checking means that it wasn't excluded.
+    intercept[RuntimeException] {
+      SparkSubmitUtils.resolveMavenCoordinates(coordinates +
+        ",org.apache.spark:spark-streaming-kafka-assembly_2.10:1.2.0", None, None, true)
+    }
  }
}