From 0b9cae42426e14060bc6182c037fd715f35a2d23 Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin <vanzin@cloudera.com>
Date: Tue, 10 May 2016 10:35:54 -0700
Subject: [PATCH] [SPARK-11249][LAUNCHER] Throw error if app resource is not
 provided.

Without this, the code would build an invalid spark-submit command line,
and a more cryptic error would be presented to the user. Also, expose
a constant that allows users to set a dummy resource in cases where
they don't need an actual resource file; for backwards compatibility,
that uses the same "spark-internal" resource that Spark itself uses.

Tested via unit tests, run-example, spark-shell, and running the
thrift server with mixed spark and hive command line arguments.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #12909 from vanzin/SPARK-11249.
---
 .../org/apache/spark/deploy/SparkSubmit.scala |  9 ++----
 .../spark/launcher/SparkLauncherSuite.java    |  2 +-
 .../spark/launcher/LauncherBackendSuite.scala |  2 +-
 .../apache/spark/launcher/SparkLauncher.java  |  7 +++++
 .../launcher/SparkSubmitCommandBuilder.java   | 29 ++++++++++---------
 .../SparkSubmitCommandBuilderSuite.java       | 11 +++++--
 .../spark/deploy/yarn/YarnClusterSuite.scala  |  2 +-
 7 files changed, 38 insertions(+), 24 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 755c4b6ec1..9075e3eb3f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -43,6 +43,7 @@ import org.apache.ivy.plugins.resolver.{ChainResolver, FileSystemResolver, IBibl
 import org.apache.spark.{SPARK_VERSION, SparkException, SparkUserAppException}
 import org.apache.spark.api.r.RUtils
 import org.apache.spark.deploy.rest._
+import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
 
 
@@ -75,10 +76,6 @@ object SparkSubmit {
   private val CLUSTER = 2
   private val ALL_DEPLOY_MODES = CLIENT | CLUSTER
 
-  // A special jar name that indicates the class being run is inside of Spark itself, and therefore
-  // no user jar is needed.
-  private val SPARK_INTERNAL = "spark-internal"
-
   // Special primary resource names that represent shells rather than application jars.
   private val SPARK_SHELL = "spark-shell"
   private val PYSPARK_SHELL = "pyspark-shell"
@@ -575,7 +572,7 @@ object SparkSubmit {
         childArgs += ("--primary-r-file", mainFile)
         childArgs += ("--class", "org.apache.spark.deploy.RRunner")
       } else {
-        if (args.primaryResource != SPARK_INTERNAL) {
+        if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
           childArgs += ("--jar", args.primaryResource)
         }
         childArgs += ("--class", args.mainClass)
@@ -795,7 +792,7 @@ object SparkSubmit {
   }
 
   private[deploy] def isInternal(res: String): Boolean = {
-    res == SPARK_INTERNAL
+    res == SparkLauncher.NO_RESOURCE
   }
 
   /**
diff --git a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
index 3e47bfc274..8ca54b24d8 100644
--- a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
+++ b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
@@ -94,7 +94,7 @@ public class SparkLauncherSuite {
     SparkLauncher launcher = new SparkLauncher(env)
       .setSparkHome(System.getProperty("spark.test.home"))
       .setMaster("local")
-      .setAppResource("spark-internal")
+      .setAppResource(SparkLauncher.NO_RESOURCE)
       .addSparkArg(opts.CONF,
         String.format("%s=-Dfoo=ShouldBeOverriddenBelow", SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS))
       .setConf(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS,
diff --git a/core/src/test/scala/org/apache/spark/launcher/LauncherBackendSuite.scala b/core/src/test/scala/org/apache/spark/launcher/LauncherBackendSuite.scala
index 713560d3dd..cac15a1dc4 100644
--- a/core/src/test/scala/org/apache/spark/launcher/LauncherBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/launcher/LauncherBackendSuite.scala
@@ -48,7 +48,7 @@ class LauncherBackendSuite extends SparkFunSuite with Matchers {
       .setConf("spark.ui.enabled", "false")
       .setConf(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, s"-Dtest.appender=console")
       .setMaster(master)
-      .setAppResource("spark-internal")
+      .setAppResource(SparkLauncher.NO_RESOURCE)
       .setMainClass(TestApp.getClass.getName().stripSuffix("$"))
       .startApplication()
 
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
index a083f05a2a..08873f5811 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
@@ -66,6 +66,13 @@ public class SparkLauncher {
   /** Logger name to use when launching a child process. */
   public static final String CHILD_PROCESS_LOGGER_NAME = "spark.launcher.childProcLoggerName";
 
+  /**
+   * A special value for the resource that tells Spark not to try to process the app resource as
+   * a file. This is useful when the class being executed is made available to the application by
+   * other means - for example, by adding jars with the package download feature.
+   */
+  public static final String NO_RESOURCE = "spark-internal";
+
   /**
    * Maximum time (in ms) to wait for a child process to connect back to the launcher server
   * when using {@link #start()}.
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
index 6941ca903c..76897c4f75 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
@@ -83,9 +83,9 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
   static {
     specialClasses.put("org.apache.spark.repl.Main", "spark-shell");
     specialClasses.put("org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver",
-      "spark-internal");
+      SparkLauncher.NO_RESOURCE);
     specialClasses.put("org.apache.spark.sql.hive.thriftserver.HiveThriftServer2",
-      "spark-internal");
+      SparkLauncher.NO_RESOURCE);
   }
 
   final List<String> sparkArgs;
@@ -112,11 +112,11 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
     List<String> submitArgs = args;
     if (args.size() > 0 && args.get(0).equals(PYSPARK_SHELL)) {
       this.allowsMixedArguments = true;
-      appResource = PYSPARK_SHELL_RESOURCE;
+      appResource = PYSPARK_SHELL;
       submitArgs = args.subList(1, args.size());
     } else if (args.size() > 0 && args.get(0).equals(SPARKR_SHELL)) {
       this.allowsMixedArguments = true;
-      appResource = SPARKR_SHELL_RESOURCE;
+      appResource = SPARKR_SHELL;
       submitArgs = args.subList(1, args.size());
     } else if (args.size() > 0 && args.get(0).equals(RUN_EXAMPLE)) {
       isExample = true;
@@ -134,9 +134,9 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
   @Override
   public List<String> buildCommand(Map<String, String> env)
       throws IOException, IllegalArgumentException {
-    if (PYSPARK_SHELL_RESOURCE.equals(appResource) && !printInfo) {
+    if (PYSPARK_SHELL.equals(appResource) && !printInfo) {
       return buildPySparkShellCommand(env);
-    } else if (SPARKR_SHELL_RESOURCE.equals(appResource) && !printInfo) {
+    } else if (SPARKR_SHELL.equals(appResource) && !printInfo) {
       return buildSparkRCommand(env);
     } else {
       return buildSparkSubmitCommand(env);
@@ -147,6 +147,10 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
     List<String> args = new ArrayList<>();
     SparkSubmitOptionParser parser = new SparkSubmitOptionParser();
 
+    if (!allowsMixedArguments) {
+      checkArgument(appResource != null, "Missing application resource.");
+    }
+
     if (verbose) {
       args.add(parser.VERBOSE);
     }
@@ -278,6 +282,7 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
 
     // When launching the pyspark shell, the spark-submit arguments should be stored in the
     // PYSPARK_SUBMIT_ARGS env variable.
+    appResource = PYSPARK_SHELL_RESOURCE;
     constructEnvVarArgs(env, "PYSPARK_SUBMIT_ARGS");
 
     // The executable is the PYSPARK_DRIVER_PYTHON env variable set by the pyspark script,
@@ -301,6 +306,7 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
     }
     // When launching the SparkR shell, store the spark-submit arguments in the SPARKR_SUBMIT_ARGS
     // env variable.
+    appResource = SPARKR_SHELL_RESOURCE;
     constructEnvVarArgs(env, "SPARKR_SUBMIT_ARGS");
 
     // Set shell.R as R_PROFILE_USER to load the SparkR package when the shell comes up.
@@ -435,22 +441,19 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
           className = EXAMPLE_CLASS_PREFIX + className;
         }
         mainClass = className;
-        appResource = "spark-internal";
+        appResource = SparkLauncher.NO_RESOURCE;
         return false;
       } else {
         checkArgument(!opt.startsWith("-"), "Unrecognized option: %s", opt);
-        sparkArgs.add(opt);
+        checkState(appResource == null, "Found unrecognized argument but resource is already set.");
+        appResource = opt;
         return false;
       }
     }
 
     @Override
     protected void handleExtraArgs(List<String> extra) {
-      if (isExample) {
-        appArgs.addAll(extra);
-      } else {
-        sparkArgs.addAll(extra);
-      }
+      appArgs.addAll(extra);
     }
 
   }
diff --git a/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java
index c7e8b2e03a..c16f46a360 100644
--- a/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java
+++ b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java
@@ -72,7 +72,8 @@ public class SparkSubmitCommandBuilderSuite extends BaseSuite {
       parser.CONF,
       "spark.randomOption=foo",
       parser.CONF,
-      SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH + "=/driverLibPath");
+      SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH + "=/driverLibPath",
+      SparkLauncher.NO_RESOURCE);
     Map<String, String> env = new HashMap<>();
     List<String> cmd = buildCommand(sparkSubmitArgs, env);
 
@@ -109,7 +110,8 @@ public class SparkSubmitCommandBuilderSuite extends BaseSuite {
     List<String> sparkSubmitArgs = Arrays.asList(
       parser.CLASS + "=org.my.Class",
       parser.MASTER + "=foo",
-      parser.DEPLOY_MODE + "=bar");
+      parser.DEPLOY_MODE + "=bar",
+      SparkLauncher.NO_RESOURCE);
 
     List<String> cmd = newCommandBuilder(sparkSubmitArgs).buildSparkSubmitArgs();
     assertEquals("org.my.Class", findArgValue(cmd, parser.CLASS));
@@ -168,6 +170,11 @@ public class SparkSubmitCommandBuilderSuite extends BaseSuite {
     assertEquals("42", cmd.get(cmd.size() - 1));
   }
 
+  @Test(expected = IllegalArgumentException.class)
+  public void testMissingAppResource() {
+    new SparkSubmitCommandBuilder().buildSparkSubmitArgs();
+  }
+
   private void testCmdBuilder(boolean isDriver, boolean useDefaultPropertyFile) throws Exception {
     String deployMode = isDriver ? "client" : "cluster";
 
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index b2b4d84f53..7df11ca760 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -147,7 +147,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
       .setPropertiesFile(propsFile)
       .setMaster("yarn")
       .setDeployMode("client")
-      .setAppResource("spark-internal")
+      .setAppResource(SparkLauncher.NO_RESOURCE)
       .setMainClass(mainClassName(YarnLauncherTestApp.getClass))
       .startApplication()
 
-- 
GitLab