Skip to content
Snippets Groups Projects
Commit 5a5bbc29 authored by Marcelo Vanzin's avatar Marcelo Vanzin
Browse files

[SPARK-9074] [LAUNCHER] Allow arbitrary Spark args to be set.

This change allows any Spark argument to be added to the app to
be started using SparkLauncher. Known arguments are properly
validated, while unknown arguments are allowed so that the
library can launch newer Spark versions (in case SPARK_HOME points
at one).

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #7975 from vanzin/SPARK-9074 and squashes the following commits:

b5e451a [Marcelo Vanzin] [SPARK-9074] [launcher] Allow arbitrary Spark args to be set.
parent 736af95b
No related branches found
No related tags found
No related merge requests found
......@@ -20,12 +20,13 @@ package org.apache.spark.launcher;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import static org.apache.spark.launcher.CommandBuilderUtils.*;
/**
* Launcher for Spark applications.
* <p>
* Use this class to start Spark applications programmatically. The class uses a builder pattern
......@@ -57,7 +58,8 @@ public class SparkLauncher {
/** Configuration key for the number of executor CPU cores. */
public static final String EXECUTOR_CORES = "spark.executor.cores";
private final SparkSubmitCommandBuilder builder;
// Visible for testing.
final SparkSubmitCommandBuilder builder;
public SparkLauncher() {
this(null);
......@@ -187,6 +189,73 @@ public class SparkLauncher {
return this;
}
/**
* Adds a no-value argument to the Spark invocation. If the argument is known, this method
* validates whether the argument is indeed a no-value argument, and throws an exception
* otherwise.
* <p/>
* Use this method with caution. It is possible to create an invalid Spark command by passing
* unknown arguments to this method, since those are allowed for forward compatibility.
*
* @param arg Argument to add.
* @return This launcher.
*/
public SparkLauncher addSparkArg(String arg) {
SparkSubmitOptionParser validator = new ArgumentValidator(false);
validator.parse(Arrays.asList(arg));
builder.sparkArgs.add(arg);
return this;
}
/**
* Adds an argument with a value to the Spark invocation. If the argument name corresponds to
* a known argument, the code validates that the argument actually expects a value, and throws
* an exception otherwise.
* <p/>
* It is safe to add arguments modified by other methods in this class (such as
* {@link #setMaster(String)} - the last invocation will be the one to take effect.
* <p/>
* Use this method with caution. It is possible to create an invalid Spark command by passing
* unknown arguments to this method, since those are allowed for forward compatibility.
*
* @param name Name of argument to add.
* @param value Value of the argument.
* @return This launcher.
*/
public SparkLauncher addSparkArg(String name, String value) {
SparkSubmitOptionParser validator = new ArgumentValidator(true);
if (validator.MASTER.equals(name)) {
setMaster(value);
} else if (validator.PROPERTIES_FILE.equals(name)) {
setPropertiesFile(value);
} else if (validator.CONF.equals(name)) {
String[] vals = value.split("=", 2);
setConf(vals[0], vals[1]);
} else if (validator.CLASS.equals(name)) {
setMainClass(value);
} else if (validator.JARS.equals(name)) {
builder.jars.clear();
for (String jar : value.split(",")) {
addJar(jar);
}
} else if (validator.FILES.equals(name)) {
builder.files.clear();
for (String file : value.split(",")) {
addFile(file);
}
} else if (validator.PY_FILES.equals(name)) {
builder.pyFiles.clear();
for (String file : value.split(",")) {
addPyFile(file);
}
} else {
validator.parse(Arrays.asList(name, value));
builder.sparkArgs.add(name);
builder.sparkArgs.add(value);
}
return this;
}
/**
* Adds command line arguments for the application.
*
......@@ -277,4 +346,32 @@ public class SparkLauncher {
return pb.start();
}
private static class ArgumentValidator extends SparkSubmitOptionParser {
private final boolean hasValue;
ArgumentValidator(boolean hasValue) {
this.hasValue = hasValue;
}
@Override
protected boolean handle(String opt, String value) {
if (value == null && hasValue) {
throw new IllegalArgumentException(String.format("'%s' does not expect a value.", opt));
}
return true;
}
@Override
protected boolean handleUnknown(String opt) {
// Do not fail on unknown arguments, to support future arguments added to SparkSubmit.
return true;
}
protected void handleExtraArgs(List<String> extra) {
// No op.
}
};
}
......@@ -76,7 +76,7 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
"spark-internal");
}
private final List<String> sparkArgs;
final List<String> sparkArgs;
private final boolean printHelp;
/**
......
......@@ -20,6 +20,7 @@ package org.apache.spark.launcher;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
......@@ -35,8 +36,54 @@ public class SparkLauncherSuite {
private static final Logger LOG = LoggerFactory.getLogger(SparkLauncherSuite.class);
@Test
public void testSparkArgumentHandling() throws Exception {
SparkLauncher launcher = new SparkLauncher()
.setSparkHome(System.getProperty("spark.test.home"));
SparkSubmitOptionParser opts = new SparkSubmitOptionParser();
launcher.addSparkArg(opts.HELP);
try {
launcher.addSparkArg(opts.PROXY_USER);
fail("Expected IllegalArgumentException.");
} catch (IllegalArgumentException e) {
// Expected.
}
launcher.addSparkArg(opts.PROXY_USER, "someUser");
try {
launcher.addSparkArg(opts.HELP, "someValue");
fail("Expected IllegalArgumentException.");
} catch (IllegalArgumentException e) {
// Expected.
}
launcher.addSparkArg("--future-argument");
launcher.addSparkArg("--future-argument", "someValue");
launcher.addSparkArg(opts.MASTER, "myMaster");
assertEquals("myMaster", launcher.builder.master);
launcher.addJar("foo");
launcher.addSparkArg(opts.JARS, "bar");
assertEquals(Arrays.asList("bar"), launcher.builder.jars);
launcher.addFile("foo");
launcher.addSparkArg(opts.FILES, "bar");
assertEquals(Arrays.asList("bar"), launcher.builder.files);
launcher.addPyFile("foo");
launcher.addSparkArg(opts.PY_FILES, "bar");
assertEquals(Arrays.asList("bar"), launcher.builder.pyFiles);
launcher.setConf("spark.foo", "foo");
launcher.addSparkArg(opts.CONF, "spark.foo=bar");
assertEquals("bar", launcher.builder.conf.get("spark.foo"));
}
@Test
public void testChildProcLauncher() throws Exception {
SparkSubmitOptionParser opts = new SparkSubmitOptionParser();
Map<String, String> env = new HashMap<String, String>();
env.put("SPARK_PRINT_LAUNCH_COMMAND", "1");
......@@ -44,9 +91,12 @@ public class SparkLauncherSuite {
.setSparkHome(System.getProperty("spark.test.home"))
.setMaster("local")
.setAppResource("spark-internal")
.addSparkArg(opts.CONF,
String.format("%s=-Dfoo=ShouldBeOverriddenBelow", SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS))
.setConf(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS,
"-Dfoo=bar -Dtest.name=-testChildProcLauncher")
.setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH, System.getProperty("java.class.path"))
.addSparkArg(opts.CLASS, "ShouldBeOverriddenBelow")
.setMainClass(SparkLauncherTestApp.class.getName())
.addAppArgs("proc");
final Process app = launcher.launch();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment