From 004e29cba518684d239d2d1661dce7c894a79f14 Mon Sep 17 00:00:00 2001
From: Andrew Duffy <root@aduffy.org>
Date: Tue, 19 Jul 2016 17:08:38 -0700
Subject: [PATCH] [SPARK-14702] Make environment of SparkLauncher launched
 process more configurable

## What changes were proposed in this pull request?

Adds a few public methods to `SparkLauncher` to allow configuring some extra features of the `ProcessBuilder`, including the working directory, output and error stream redirection.

## How was this patch tested?

Unit testing + simple Spark driver programs

Author: Andrew Duffy <root@aduffy.org>

Closes #14201 from andreweduffy/feature/launcher.
---
 .../spark/launcher/SparkLauncherSuite.java    |  67 ++++++-
 .../spark/launcher/ChildProcAppHandle.java    |   5 +-
 .../apache/spark/launcher/SparkLauncher.java  | 167 +++++++++++++++---
 3 files changed, 208 insertions(+), 31 deletions(-)

diff --git a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
index 8ca54b24d8..e393db06a0 100644
--- a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
+++ b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
@@ -21,6 +21,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,10 +41,15 @@ public class SparkLauncherSuite {
   private static final Logger LOG = LoggerFactory.getLogger(SparkLauncherSuite.class);
   private static final NamedThreadFactory TF = new NamedThreadFactory("SparkLauncherSuite-%d");
 
+  private SparkLauncher launcher;
+
+  @Before
+  public void configureLauncher() {
+    launcher = new SparkLauncher().setSparkHome(System.getProperty("spark.test.home"));
+  }
+
   @Test
   public void testSparkArgumentHandling() throws Exception {
-    SparkLauncher launcher = new SparkLauncher()
-      .setSparkHome(System.getProperty("spark.test.home"));
     SparkSubmitOptionParser opts = new SparkSubmitOptionParser();
 
     launcher.addSparkArg(opts.HELP);
@@ -85,14 +91,67 @@ public class SparkLauncherSuite {
     assertEquals("bar", launcher.builder.conf.get("spark.foo"));
   }
 
+  @Test(expected=IllegalStateException.class)
+  public void testRedirectTwiceFails() throws Exception {
+    launcher.setAppResource("fake-resource.jar")
+      .setMainClass("my.fake.class.Fake")
+      .redirectError()
+      .redirectError(ProcessBuilder.Redirect.PIPE)
+      .launch();
+  }
+
+  @Test(expected=IllegalStateException.class)
+  public void testRedirectToLogWithOthersFails() throws Exception {
+    launcher.setAppResource("fake-resource.jar")
+      .setMainClass("my.fake.class.Fake")
+      .redirectToLog("fakeLog")
+      .redirectError(ProcessBuilder.Redirect.PIPE)
+      .launch();
+  }
+
+  @Test
+  public void testRedirectErrorToOutput() throws Exception {
+    launcher.redirectError();
+    assertTrue(launcher.redirectErrorStream);
+  }
+
+  @Test
+  public void testRedirectsSimple() throws Exception {
+    launcher.redirectError(ProcessBuilder.Redirect.PIPE);
+    assertNotNull(launcher.errorStream);
+    assertEquals(launcher.errorStream.type(), ProcessBuilder.Redirect.Type.PIPE);
+
+    launcher.redirectOutput(ProcessBuilder.Redirect.PIPE);
+    assertNotNull(launcher.outputStream);
+    assertEquals(launcher.outputStream.type(), ProcessBuilder.Redirect.Type.PIPE);
+  }
+
+  @Test
+  public void testRedirectLastWins() throws Exception {
+    launcher.redirectError(ProcessBuilder.Redirect.PIPE)
+      .redirectError(ProcessBuilder.Redirect.INHERIT);
+    assertEquals(launcher.errorStream.type(), ProcessBuilder.Redirect.Type.INHERIT);
+
+    launcher.redirectOutput(ProcessBuilder.Redirect.PIPE)
+      .redirectOutput(ProcessBuilder.Redirect.INHERIT);
+    assertEquals(launcher.outputStream.type(), ProcessBuilder.Redirect.Type.INHERIT);
+  }
+
+  @Test
+  public void testRedirectToLog() throws Exception {
+    launcher.redirectToLog("fakeLogger");
+    assertTrue(launcher.redirectToLog);
+    assertTrue(launcher.builder.getEffectiveConfig()
+      .containsKey(SparkLauncher.CHILD_PROCESS_LOGGER_NAME));
+  }
+
   @Test
   public void testChildProcLauncher() throws Exception {
     SparkSubmitOptionParser opts = new SparkSubmitOptionParser();
     Map<String, String> env = new HashMap<>();
     env.put("SPARK_PRINT_LAUNCH_COMMAND", "1");
 
-    SparkLauncher launcher = new SparkLauncher(env)
-      .setSparkHome(System.getProperty("spark.test.home"))
+    launcher
       .setMaster("local")
       .setAppResource(SparkLauncher.NO_RESOURCE)
       .addSparkArg(opts.CONF,
diff --git a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
index 1bfda289de..c0779e1c4e 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.ThreadFactory;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -31,8 +30,6 @@ import java.util.logging.Logger;
 class ChildProcAppHandle implements SparkAppHandle {
 
   private static final Logger LOG = Logger.getLogger(ChildProcAppHandle.class.getName());
-  private static final ThreadFactory REDIRECTOR_FACTORY =
-    new NamedThreadFactory("launcher-proc-%d");
 
   private final String secret;
   private final LauncherServer server;
@@ -127,7 +124,7 @@ class ChildProcAppHandle implements SparkAppHandle {
   void setChildProc(Process childProc, String loggerName) {
     this.childProc = childProc;
     this.redirector = new OutputRedirector(childProc.getInputStream(), loggerName,
-      REDIRECTOR_FACTORY);
+      SparkLauncher.REDIRECTOR_FACTORY);
   }
 
   void setConnection(LauncherConnection connection) {
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
index 08873f5811..41f7f1f3ed 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
@@ -24,6 +24,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.spark.launcher.CommandBuilderUtils.*;
@@ -82,6 +83,9 @@ public class SparkLauncher {
   /** Used internally to create unique logger names. */
   private static final AtomicInteger COUNTER = new AtomicInteger();
 
+  /** Factory for creating OutputRedirector threads. **/
+  static final ThreadFactory REDIRECTOR_FACTORY = new NamedThreadFactory("launcher-proc-%d");
+
   static final Map<String, String> launcherConfig = new HashMap<>();
 
   /**
@@ -99,6 +103,11 @@ public class SparkLauncher {
 
   // Visible for testing.
   final SparkSubmitCommandBuilder builder;
+  File workingDir;
+  boolean redirectToLog;
+  boolean redirectErrorStream;
+  ProcessBuilder.Redirect errorStream;
+  ProcessBuilder.Redirect outputStream;
 
   public SparkLauncher() {
     this(null);
@@ -358,6 +367,83 @@ public class SparkLauncher {
     return this;
   }
 
+  /**
+   * Sets the working directory of spark-submit.
+   *
+   * @param dir The directory to set as spark-submit's working directory.
+   * @return This launcher.
+   */
+  public SparkLauncher directory(File dir) {
+    workingDir = dir;
+    return this;
+  }
+
+  /**
+   * Specifies that stderr in spark-submit should be redirected to stdout.
+   *
+   * @return This launcher.
+   */
+  public SparkLauncher redirectError() {
+    redirectErrorStream = true;
+    return this;
+  }
+
+  /**
+   * Redirects error output to the specified Redirect.
+   *
+   * @param to The method of redirection.
+   * @return This launcher.
+   */
+  public SparkLauncher redirectError(ProcessBuilder.Redirect to) {
+    errorStream = to;
+    return this;
+  }
+
+  /**
+   * Redirects standard output to the specified Redirect.
+   *
+   * @param to The method of redirection.
+   * @return This launcher.
+   */
+  public SparkLauncher redirectOutput(ProcessBuilder.Redirect to) {
+    outputStream = to;
+    return this;
+  }
+
+  /**
+   * Redirects error output to the specified File.
+   *
+   * @param errFile The file to which stderr is written.
+   * @return This launcher.
+   */
+  public SparkLauncher redirectError(File errFile) {
+    errorStream = ProcessBuilder.Redirect.to(errFile);
+    return this;
+  }
+
+  /**
+   * Redirects error output to the specified File.
+   *
+   * @param outFile The file to which stdout is written.
+   * @return This launcher.
+   */
+  public SparkLauncher redirectOutput(File outFile) {
+    outputStream = ProcessBuilder.Redirect.to(outFile);
+    return this;
+  }
+
+  /**
+   * Sets all output to be logged and redirected to a logger with the specified name.
+   *
+   * @param loggerName The name of the logger to log stdout and stderr.
+   * @return This launcher.
+   */
+  public SparkLauncher redirectToLog(String loggerName) {
+    setConf(CHILD_PROCESS_LOGGER_NAME, loggerName);
+    redirectToLog = true;
+    return this;
+  }
+
   /**
    * Launches a sub-process that will start the configured Spark application.
    * <p>
@@ -367,7 +453,12 @@ public class SparkLauncher {
    * @return A process handle for the Spark app.
    */
   public Process launch() throws IOException {
-    return createBuilder().start();
+    Process childProc = createBuilder().start();
+    if (redirectToLog) {
+      String loggerName = builder.getEffectiveConfig().get(CHILD_PROCESS_LOGGER_NAME);
+      new OutputRedirector(childProc.getInputStream(), loggerName, REDIRECTOR_FACTORY);
+    }
+    return childProc;
   }
 
   /**
@@ -383,12 +474,13 @@ public class SparkLauncher {
    * a child process, {@link SparkAppHandle#kill()} can still be used to kill the child process.
    * <p>
    * Currently, all applications are launched as child processes. The child's stdout and stderr
-   * are merged and written to a logger (see <code>java.util.logging</code>). The logger's name
-   * can be defined by setting {@link #CHILD_PROCESS_LOGGER_NAME} in the app's configuration. If
-   * that option is not set, the code will try to derive a name from the application's name or
-   * main class / script file. If those cannot be determined, an internal, unique name will be
-   * used. In all cases, the logger name will start with "org.apache.spark.launcher.app", to fit
-   * more easily into the configuration of commonly-used logging systems.
+   * are merged and written to a logger (see <code>java.util.logging</code>) only if redirection
+   * has not otherwise been configured on this <code>SparkLauncher</code>. The logger's name can be
+   * defined by setting {@link #CHILD_PROCESS_LOGGER_NAME} in the app's configuration. If that
+   * option is not set, the code will try to derive a name from the application's name or main
+   * class / script file. If those cannot be determined, an internal, unique name will be used.
+   * In all cases, the logger name will start with "org.apache.spark.launcher.app", to fit more
+   * easily into the configuration of commonly-used logging systems.
    *
    * @since 1.6.0
    * @param listeners Listeners to add to the handle before the app is launched.
@@ -400,27 +492,33 @@ public class SparkLauncher {
       handle.addListener(l);
     }
 
-    String appName = builder.getEffectiveConfig().get(CHILD_PROCESS_LOGGER_NAME);
-    if (appName == null) {
-      if (builder.appName != null) {
-        appName = builder.appName;
-      } else if (builder.mainClass != null) {
-        int dot = builder.mainClass.lastIndexOf(".");
-        if (dot >= 0 && dot < builder.mainClass.length() - 1) {
-          appName = builder.mainClass.substring(dot + 1, builder.mainClass.length());
+    String loggerName = builder.getEffectiveConfig().get(CHILD_PROCESS_LOGGER_NAME);
+    ProcessBuilder pb = createBuilder();
+    // Only setup stderr + stdout to logger redirection if user has not otherwise configured output
+    // redirection.
+    if (loggerName == null) {
+      String appName = builder.getEffectiveConfig().get(CHILD_PROCESS_LOGGER_NAME);
+      if (appName == null) {
+        if (builder.appName != null) {
+          appName = builder.appName;
+        } else if (builder.mainClass != null) {
+          int dot = builder.mainClass.lastIndexOf(".");
+          if (dot >= 0 && dot < builder.mainClass.length() - 1) {
+            appName = builder.mainClass.substring(dot + 1, builder.mainClass.length());
+          } else {
+            appName = builder.mainClass;
+          }
+        } else if (builder.appResource != null) {
+          appName = new File(builder.appResource).getName();
         } else {
-          appName = builder.mainClass;
+          appName = String.valueOf(COUNTER.incrementAndGet());
         }
-      } else if (builder.appResource != null) {
-        appName = new File(builder.appResource).getName();
-      } else {
-        appName = String.valueOf(COUNTER.incrementAndGet());
       }
+      String loggerPrefix = getClass().getPackage().getName();
+      loggerName = String.format("%s.app.%s", loggerPrefix, appName);
+      pb.redirectErrorStream(true);
     }
 
-    String loggerPrefix = getClass().getPackage().getName();
-    String loggerName = String.format("%s.app.%s", loggerPrefix, appName);
-    ProcessBuilder pb = createBuilder().redirectErrorStream(true);
     pb.environment().put(LauncherProtocol.ENV_LAUNCHER_PORT,
       String.valueOf(LauncherServer.getServerInstance().getPort()));
     pb.environment().put(LauncherProtocol.ENV_LAUNCHER_SECRET, handle.getSecret());
@@ -455,6 +553,29 @@ public class SparkLauncher {
     for (Map.Entry<String, String> e : builder.childEnv.entrySet()) {
       pb.environment().put(e.getKey(), e.getValue());
     }
+
+    if (workingDir != null) {
+      pb.directory(workingDir);
+    }
+
+    // Only one of redirectError and redirectError(...) can be specified.
+    // Similarly, if redirectToLog is specified, no other redirections should be specified.
+    checkState(!redirectErrorStream || errorStream == null,
+      "Cannot specify both redirectError() and redirectError(...) ");
+    checkState(!redirectToLog ||
+      (!redirectErrorStream && errorStream == null && outputStream == null),
+      "Cannot used redirectToLog() in conjunction with other redirection methods.");
+
+    if (redirectErrorStream || redirectToLog) {
+      pb.redirectErrorStream(true);
+    }
+    if (errorStream != null) {
+      pb.redirectError(errorStream);
+    }
+    if (outputStream != null) {
+      pb.redirectOutput(outputStream);
+    }
+
     return pb;
   }
 
-- 
GitLab