diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index abde04062c4b1232fd868aecddd17a172c5030d4..0ea14361b2f775bf0db7c5c2826900b541cfe50d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -273,6 +273,25 @@ object SparkSubmit extends CommandLineUtils {
       }
     }

+    // Fail fast, the following modes are not supported or applicable
+    (clusterManager, deployMode) match {
+      case (STANDALONE, CLUSTER) if args.isPython =>
+        printErrorAndExit("Cluster deploy mode is currently not supported for python " +
+          "applications on standalone clusters.")
+      case (STANDALONE, CLUSTER) if args.isR =>
+        printErrorAndExit("Cluster deploy mode is currently not supported for R " +
+          "applications on standalone clusters.")
+      case (LOCAL, CLUSTER) =>
+        printErrorAndExit("Cluster deploy mode is not compatible with master \"local\"")
+      case (_, CLUSTER) if isShell(args.primaryResource) =>
+        printErrorAndExit("Cluster deploy mode is not applicable to Spark shells.")
+      case (_, CLUSTER) if isSqlShell(args.mainClass) =>
+        printErrorAndExit("Cluster deploy mode is not applicable to Spark SQL shell.")
+      case (_, CLUSTER) if isThriftServer(args.mainClass) =>
+        printErrorAndExit("Cluster deploy mode is not applicable to Spark Thrift server.")
+      case _ =>
+    }
+
     // Update args.deployMode if it is null. It will be passed down as a Spark property later.
     (args.deployMode, deployMode) match {
       case (null, CLIENT) => args.deployMode = "client"
@@ -282,36 +301,40 @@ object SparkSubmit extends CommandLineUtils {
     val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
     val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER

-    // Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
-    // too for packages that include Python code
-    val exclusions: Seq[String] =
+    if (!isMesosCluster) {
+      // Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
+      // too for packages that include Python code
+      val exclusions: Seq[String] =
       if (!StringUtils.isBlank(args.packagesExclusions)) {
         args.packagesExclusions.split(",")
       } else {
         Nil
       }

-    // Create the IvySettings, either load from file or build defaults
-    val ivySettings = args.sparkProperties.get("spark.jars.ivySettings").map { ivySettingsFile =>
-      SparkSubmitUtils.loadIvySettings(ivySettingsFile, Option(args.repositories),
-        Option(args.ivyRepoPath))
-    }.getOrElse {
-      SparkSubmitUtils.buildIvySettings(Option(args.repositories), Option(args.ivyRepoPath))
-    }
+      // Create the IvySettings, either load from file or build defaults
+      val ivySettings = args.sparkProperties.get("spark.jars.ivySettings").map { ivySettingsFile =>
+        SparkSubmitUtils.loadIvySettings(ivySettingsFile, Option(args.repositories),
+          Option(args.ivyRepoPath))
+      }.getOrElse {
+        SparkSubmitUtils.buildIvySettings(Option(args.repositories), Option(args.ivyRepoPath))
+      }

-    val resolvedMavenCoordinates = SparkSubmitUtils.resolveMavenCoordinates(args.packages,
-      ivySettings, exclusions = exclusions)
-    if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
-      args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
-      if (args.isPython) {
-        args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
+      val resolvedMavenCoordinates = SparkSubmitUtils.resolveMavenCoordinates(args.packages,
+        ivySettings, exclusions = exclusions)
+
+
+      if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
+        args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
+        if (args.isPython) {
+          args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
+        }
       }
-    }

-    // install any R packages that may have been passed through --jars or --packages.
-    // Spark Packages may contain R source code inside the jar.
-    if (args.isR && !StringUtils.isBlank(args.jars)) {
-      RPackageUtils.checkAndBuildRPackage(args.jars, printStream, args.verbose)
+      // install any R packages that may have been passed through --jars or --packages.
+      // Spark Packages may contain R source code inside the jar.
+      if (args.isR && !StringUtils.isBlank(args.jars)) {
+        RPackageUtils.checkAndBuildRPackage(args.jars, printStream, args.verbose)
+      }
     }

     val hadoopConf = new HadoopConfiguration()
@@ -343,24 +366,6 @@ object SparkSubmit extends CommandLineUtils {
       }.orNull
     }

-    // The following modes are not supported or applicable
-    (clusterManager, deployMode) match {
-      case (STANDALONE, CLUSTER) if args.isPython =>
-        printErrorAndExit("Cluster deploy mode is currently not supported for python " +
-          "applications on standalone clusters.")
-      case (STANDALONE, CLUSTER) if args.isR =>
-        printErrorAndExit("Cluster deploy mode is currently not supported for R " +
-          "applications on standalone clusters.")
-      case (LOCAL, CLUSTER) =>
-        printErrorAndExit("Cluster deploy mode is not compatible with master \"local\"")
-      case (_, CLUSTER) if isShell(args.primaryResource) =>
-        printErrorAndExit("Cluster deploy mode is not applicable to Spark shells.")
-      case (_, CLUSTER) if isSqlShell(args.mainClass) =>
-        printErrorAndExit("Cluster deploy mode is not applicable to Spark SQL shell.")
-      case (_, CLUSTER) if isThriftServer(args.mainClass) =>
-        printErrorAndExit("Cluster deploy mode is not applicable to Spark Thrift server.")
-      case _ =>
-    }

     // If we're running a python app, set the main class to our specific python runner
     if (args.isPython && deployMode == CLIENT) {
@@ -468,6 +473,12 @@ object SparkSubmit extends CommandLineUtils {
       OptionAssigner(args.driverExtraLibraryPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
         sysProp = "spark.driver.extraLibraryPath"),

+      // Mesos only - propagate attributes for dependency resolution at the driver side
+      OptionAssigner(args.packages, MESOS, CLUSTER, sysProp = "spark.jars.packages"),
+      OptionAssigner(args.repositories, MESOS, CLUSTER, sysProp = "spark.jars.repositories"),
+      OptionAssigner(args.ivyRepoPath, MESOS, CLUSTER, sysProp = "spark.jars.ivy"),
+      OptionAssigner(args.packagesExclusions, MESOS, CLUSTER, sysProp = "spark.jars.excludes"),
+
       // Yarn only
       OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.queue"),
       OptionAssigner(args.numExecutors, YARN, ALL_DEPLOY_MODES,
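For context: the four Mesos-only OptionAssigner entries forward the package-related options as Spark properties, so that in Mesos cluster mode the client-side Ivy resolution now skipped under the new if (!isMesosCluster) guard can be redone where the driver actually runs. Below is a minimal sketch of what that driver-side step could look like, reusing only the SparkSubmitUtils calls already visible in this diff; the enclosing object and method names are hypothetical and not part of the patch.

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkSubmitUtils

// Hypothetical sketch: read back the propagated spark.jars.* properties
// and rerun the same Maven/Ivy resolution on the driver side.
object MesosDriverResolutionSketch {
  def resolvePackages(conf: SparkConf): Option[String] = {
    conf.getOption("spark.jars.packages").map { packages =>
      // spark.jars.excludes is a comma-separated list, as in SparkSubmit
      val exclusions: Seq[String] =
        conf.getOption("spark.jars.excludes").map(_.split(",").toSeq).getOrElse(Nil)
      // Build Ivy settings from the propagated repositories and Ivy repo path
      val ivySettings = SparkSubmitUtils.buildIvySettings(
        conf.getOption("spark.jars.repositories"),
        conf.getOption("spark.jars.ivy"))
      // Returns the resolved jars as a comma-separated path list
      SparkSubmitUtils.resolveMavenCoordinates(packages, ivySettings, exclusions = exclusions)
    }
  }
}

The returned comma-separated path list can then be merged into the driver's jars (and py-files for Python applications), mirroring the mergeFileLists handling that the guard above skips on the submission side.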