Skip to content
Snippets Groups Projects
Commit 004e29cb authored by Andrew Duffy's avatar Andrew Duffy Committed by Marcelo Vanzin
Browse files

[SPARK-14702] Make environment of SparkLauncher launched process more configurable

## What changes were proposed in this pull request?

Adds a few public methods to `SparkLauncher` to allow configuring some extra features of the `ProcessBuilder`, including the working directory, output and error stream redirection.

## How was this patch tested?

Unit testing + simple Spark driver programs

Author: Andrew Duffy <root@aduffy.org>

Closes #14201 from andreweduffy/feature/launcher.
parent 2ae7b88a
No related branches found
No related tags found
No related merge requests found
......@@ -21,6 +21,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -40,10 +41,15 @@ public class SparkLauncherSuite {
private static final Logger LOG = LoggerFactory.getLogger(SparkLauncherSuite.class);
private static final NamedThreadFactory TF = new NamedThreadFactory("SparkLauncherSuite-%d");
private SparkLauncher launcher;
@Before
public void configureLauncher() {
  // Each test starts from a new launcher whose Spark home comes from the
  // "spark.test.home" system property.
  SparkLauncher fresh = new SparkLauncher();
  launcher = fresh.setSparkHome(System.getProperty("spark.test.home"));
}
@Test
public void testSparkArgumentHandling() throws Exception {
SparkLauncher launcher = new SparkLauncher()
.setSparkHome(System.getProperty("spark.test.home"));
SparkSubmitOptionParser opts = new SparkSubmitOptionParser();
launcher.addSparkArg(opts.HELP);
......@@ -85,14 +91,67 @@ public class SparkLauncherSuite {
assertEquals("bar", launcher.builder.conf.get("spark.foo"));
}
@Test(expected=IllegalStateException.class)
public void testRedirectTwiceFails() throws Exception {
  // Asking for stderr->stdout merging and an explicit stderr target at the same time is
  // contradictory; launching must be rejected with an IllegalStateException.
  SparkLauncher configured = launcher
    .setAppResource("fake-resource.jar")
    .setMainClass("my.fake.class.Fake");
  configured.redirectError();
  configured.redirectError(ProcessBuilder.Redirect.PIPE);
  configured.launch();
}
@Test(expected=IllegalStateException.class)
public void testRedirectToLogWithOthersFails() throws Exception {
  // Logger redirection is exclusive: pairing it with any other redirect must make launching
  // fail with an IllegalStateException.
  SparkLauncher configured = launcher
    .setAppResource("fake-resource.jar")
    .setMainClass("my.fake.class.Fake");
  configured.redirectToLog("fakeLog");
  configured.redirectError(ProcessBuilder.Redirect.PIPE);
  configured.launch();
}
@Test
public void testRedirectErrorToOutput() throws Exception {
  // The no-arg redirectError() should flip the launcher's merge-stderr flag.
  launcher.redirectError();
  boolean mergesStderr = launcher.redirectErrorStream;
  assertTrue(mergesStderr);
}
@Test
public void testRedirectsSimple() throws Exception {
  // A single redirect call per stream must be recorded on the launcher as-is.
  launcher.redirectError(ProcessBuilder.Redirect.PIPE);
  assertNotNull(launcher.errorStream);
  // JUnit's contract is assertEquals(expected, actual); keeping that order makes a failure
  // report the right value as "expected".
  assertEquals(ProcessBuilder.Redirect.Type.PIPE, launcher.errorStream.type());

  launcher.redirectOutput(ProcessBuilder.Redirect.PIPE);
  assertNotNull(launcher.outputStream);
  assertEquals(ProcessBuilder.Redirect.Type.PIPE, launcher.outputStream.type());
}
@Test
public void testRedirectLastWins() throws Exception {
  // When the same stream is redirected twice, only the most recent target is kept.
  launcher.redirectError(ProcessBuilder.Redirect.PIPE)
    .redirectError(ProcessBuilder.Redirect.INHERIT);
  // Arguments follow JUnit's assertEquals(expected, actual) order so failure messages are
  // not inverted.
  assertEquals(ProcessBuilder.Redirect.Type.INHERIT, launcher.errorStream.type());

  launcher.redirectOutput(ProcessBuilder.Redirect.PIPE)
    .redirectOutput(ProcessBuilder.Redirect.INHERIT);
  assertEquals(ProcessBuilder.Redirect.Type.INHERIT, launcher.outputStream.type());
}
@Test
public void testRedirectToLog() throws Exception {
  // redirectToLog must both raise the launcher's flag and record the logger name in the
  // effective configuration.
  launcher.redirectToLog("fakeLogger");
  assertTrue(launcher.redirectToLog);

  Map<String, String> effectiveConf = launcher.builder.getEffectiveConfig();
  assertTrue(effectiveConf.containsKey(SparkLauncher.CHILD_PROCESS_LOGGER_NAME));
}
@Test
public void testChildProcLauncher() throws Exception {
SparkSubmitOptionParser opts = new SparkSubmitOptionParser();
Map<String, String> env = new HashMap<>();
env.put("SPARK_PRINT_LAUNCH_COMMAND", "1");
SparkLauncher launcher = new SparkLauncher(env)
.setSparkHome(System.getProperty("spark.test.home"))
launcher
.setMaster("local")
.setAppResource(SparkLauncher.NO_RESOURCE)
.addSparkArg(opts.CONF,
......
......@@ -21,7 +21,6 @@ import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadFactory;
import java.util.logging.Level;
import java.util.logging.Logger;
......@@ -31,8 +30,6 @@ import java.util.logging.Logger;
class ChildProcAppHandle implements SparkAppHandle {
private static final Logger LOG = Logger.getLogger(ChildProcAppHandle.class.getName());
private static final ThreadFactory REDIRECTOR_FACTORY =
new NamedThreadFactory("launcher-proc-%d");
private final String secret;
private final LauncherServer server;
......@@ -127,7 +124,7 @@ class ChildProcAppHandle implements SparkAppHandle {
void setChildProc(Process childProc, String loggerName) {
this.childProc = childProc;
this.redirector = new OutputRedirector(childProc.getInputStream(), loggerName,
REDIRECTOR_FACTORY);
SparkLauncher.REDIRECTOR_FACTORY);
}
void setConnection(LauncherConnection connection) {
......
......@@ -24,6 +24,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.spark.launcher.CommandBuilderUtils.*;
......@@ -82,6 +83,9 @@ public class SparkLauncher {
/** Used internally to create unique logger names. */
private static final AtomicInteger COUNTER = new AtomicInteger();
/** Factory for creating OutputRedirector threads. **/
static final ThreadFactory REDIRECTOR_FACTORY = new NamedThreadFactory("launcher-proc-%d");
static final Map<String, String> launcherConfig = new HashMap<>();
/**
......@@ -99,6 +103,11 @@ public class SparkLauncher {
// Visible for testing.
final SparkSubmitCommandBuilder builder;
// Working directory for the launched process; only applied to the ProcessBuilder when non-null.
File workingDir;
// Set by redirectToLog(String); mutually exclusive with every other redirection option.
boolean redirectToLog;
// Set by redirectError(); requests that stderr be merged into stdout.
boolean redirectErrorStream;
// Explicit stderr target chosen via redirectError(Redirect) or redirectError(File); null if unset.
ProcessBuilder.Redirect errorStream;
// Explicit stdout target chosen via redirectOutput(Redirect) or redirectOutput(File); null if unset.
ProcessBuilder.Redirect outputStream;
public SparkLauncher() {
this(null);
......@@ -358,6 +367,83 @@ public class SparkLauncher {
return this;
}
/**
 * Chooses the directory in which spark-submit will be run.
 *
 * @param dir The directory to set as spark-submit's working directory.
 * @return This launcher.
 */
public SparkLauncher directory(File dir) {
  this.workingDir = dir;
  return this;
}
/**
 * Requests that spark-submit's stderr be merged into its stdout.
 *
 * @return This launcher.
 */
public SparkLauncher redirectError() {
  this.redirectErrorStream = true;
  return this;
}
/**
 * Sends error output to the given {@link ProcessBuilder.Redirect}. A later call replaces the
 * target chosen by an earlier one.
 *
 * @param to The method of redirection.
 * @return This launcher.
 */
public SparkLauncher redirectError(ProcessBuilder.Redirect to) {
  this.errorStream = to;
  return this;
}
/**
 * Sends standard output to the given {@link ProcessBuilder.Redirect}. A later call replaces
 * the target chosen by an earlier one.
 *
 * @param to The method of redirection.
 * @return This launcher.
 */
public SparkLauncher redirectOutput(ProcessBuilder.Redirect to) {
  this.outputStream = to;
  return this;
}
/**
 * Sends error output to the given file.
 *
 * @param errFile The file to which stderr is written.
 * @return This launcher.
 */
public SparkLauncher redirectError(File errFile) {
  ProcessBuilder.Redirect target = ProcessBuilder.Redirect.to(errFile);
  this.errorStream = target;
  return this;
}
/**
 * Redirects standard output to the specified File.
 *
 * @param outFile The file to which stdout is written.
 * @return This launcher.
 */
public SparkLauncher redirectOutput(File outFile) {
  ProcessBuilder.Redirect target = ProcessBuilder.Redirect.to(outFile);
  this.outputStream = target;
  return this;
}
/**
 * Routes all output to a logger with the given name. The name is stored in the launcher's
 * configuration under {@link #CHILD_PROCESS_LOGGER_NAME}.
 *
 * @param loggerName The name of the logger to log stdout and stderr.
 * @return This launcher.
 */
public SparkLauncher redirectToLog(String loggerName) {
  // Record the name first so the flag is only raised once the configuration was accepted.
  setConf(CHILD_PROCESS_LOGGER_NAME, loggerName);
  this.redirectToLog = true;
  return this;
}
/**
* Launches a sub-process that will start the configured Spark application.
* <p>
......@@ -367,7 +453,12 @@ public class SparkLauncher {
* @return A process handle for the Spark app.
*/
public Process launch() throws IOException {
return createBuilder().start();
Process childProc = createBuilder().start();
if (redirectToLog) {
String loggerName = builder.getEffectiveConfig().get(CHILD_PROCESS_LOGGER_NAME);
new OutputRedirector(childProc.getInputStream(), loggerName, REDIRECTOR_FACTORY);
}
return childProc;
}
/**
......@@ -383,12 +474,13 @@ public class SparkLauncher {
* a child process, {@link SparkAppHandle#kill()} can still be used to kill the child process.
* <p>
* Currently, all applications are launched as child processes. The child's stdout and stderr
* are merged and written to a logger (see <code>java.util.logging</code>). The logger's name
* can be defined by setting {@link #CHILD_PROCESS_LOGGER_NAME} in the app's configuration. If
* that option is not set, the code will try to derive a name from the application's name or
* main class / script file. If those cannot be determined, an internal, unique name will be
* used. In all cases, the logger name will start with "org.apache.spark.launcher.app", to fit
* more easily into the configuration of commonly-used logging systems.
* are merged and written to a logger (see <code>java.util.logging</code>) only if redirection
* has not otherwise been configured on this <code>SparkLauncher</code>. The logger's name can be
* defined by setting {@link #CHILD_PROCESS_LOGGER_NAME} in the app's configuration. If that
* option is not set, the code will try to derive a name from the application's name or main
* class / script file. If those cannot be determined, an internal, unique name will be used.
* In all cases, the logger name will start with "org.apache.spark.launcher.app", to fit more
* easily into the configuration of commonly-used logging systems.
*
* @since 1.6.0
* @param listeners Listeners to add to the handle before the app is launched.
......@@ -400,27 +492,33 @@ public class SparkLauncher {
handle.addListener(l);
}
String appName = builder.getEffectiveConfig().get(CHILD_PROCESS_LOGGER_NAME);
if (appName == null) {
if (builder.appName != null) {
appName = builder.appName;
} else if (builder.mainClass != null) {
int dot = builder.mainClass.lastIndexOf(".");
if (dot >= 0 && dot < builder.mainClass.length() - 1) {
appName = builder.mainClass.substring(dot + 1, builder.mainClass.length());
String loggerName = builder.getEffectiveConfig().get(CHILD_PROCESS_LOGGER_NAME);
ProcessBuilder pb = createBuilder();
// Only setup stderr + stdout to logger redirection if user has not otherwise configured output
// redirection.
if (loggerName == null) {
String appName = builder.getEffectiveConfig().get(CHILD_PROCESS_LOGGER_NAME);
if (appName == null) {
if (builder.appName != null) {
appName = builder.appName;
} else if (builder.mainClass != null) {
int dot = builder.mainClass.lastIndexOf(".");
if (dot >= 0 && dot < builder.mainClass.length() - 1) {
appName = builder.mainClass.substring(dot + 1, builder.mainClass.length());
} else {
appName = builder.mainClass;
}
} else if (builder.appResource != null) {
appName = new File(builder.appResource).getName();
} else {
appName = builder.mainClass;
appName = String.valueOf(COUNTER.incrementAndGet());
}
} else if (builder.appResource != null) {
appName = new File(builder.appResource).getName();
} else {
appName = String.valueOf(COUNTER.incrementAndGet());
}
String loggerPrefix = getClass().getPackage().getName();
loggerName = String.format("%s.app.%s", loggerPrefix, appName);
pb.redirectErrorStream(true);
}
String loggerPrefix = getClass().getPackage().getName();
String loggerName = String.format("%s.app.%s", loggerPrefix, appName);
ProcessBuilder pb = createBuilder().redirectErrorStream(true);
pb.environment().put(LauncherProtocol.ENV_LAUNCHER_PORT,
String.valueOf(LauncherServer.getServerInstance().getPort()));
pb.environment().put(LauncherProtocol.ENV_LAUNCHER_SECRET, handle.getSecret());
......@@ -455,6 +553,29 @@ public class SparkLauncher {
for (Map.Entry<String, String> e : builder.childEnv.entrySet()) {
pb.environment().put(e.getKey(), e.getValue());
}
if (workingDir != null) {
pb.directory(workingDir);
}
// Only one of redirectError and redirectError(...) can be specified.
// Similarly, if redirectToLog is specified, no other redirections should be specified.
checkState(!redirectErrorStream || errorStream == null,
"Cannot specify both redirectError() and redirectError(...) ");
checkState(!redirectToLog ||
(!redirectErrorStream && errorStream == null && outputStream == null),
"Cannot used redirectToLog() in conjunction with other redirection methods.");
if (redirectErrorStream || redirectToLog) {
pb.redirectErrorStream(true);
}
if (errorStream != null) {
pb.redirectError(errorStream);
}
if (outputStream != null) {
pb.redirectOutput(outputStream);
}
return pb;
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment