From 66636ef0b046e5d1f340c3b8153d7213fa9d19c7 Mon Sep 17 00:00:00 2001
From: Mark Grover <mark@apache.org>
Date: Wed, 26 Apr 2017 17:06:21 -0700
Subject: [PATCH] [SPARK-20435][CORE] More thorough redaction of sensitive
 information

This change does a more thorough redaction of sensitive information from logs and UI
Add unit tests that ensure that no regressions happen that leak sensitive information to the logs.

The motivation for this change was appearance of password like so in `SparkListenerEnvironmentUpdate` in event logs under some JVM configurations:
`"sun.java.command":"org.apache.spark.deploy.SparkSubmit ... --conf spark.executorEnv.HADOOP_CREDSTORE_PASSWORD=secret_password ..."
`
Previously redaction logic was only checking if the key matched the secret regex pattern, it'd redact it's value. That worked for most cases. However, in the above case, the key (sun.java.command) doesn't tell much, so the value needs to be searched. This PR expands the check to check for values as well.

## How was this patch tested?

New unit tests added that ensure that no sensitive information is present in the event logs or the yarn logs. Old unit test in UtilsSuite was modified because the test was asserting that a non-sensitive property's value won't be redacted. However, the non-sensitive value had the literal "secret" in it which was causing it to redact. Simply updating the non-sensitive property's value to another arbitrary value (that didn't have "secret" in it) fixed it.

Author: Mark Grover <mark@apache.org>

Closes #17725 from markgrover/spark-20435.
---
 .../spark/internal/config/package.scala       |  4 +--
 .../scheduler/EventLoggingListener.scala      | 16 ++++++---
 .../scala/org/apache/spark/util/Utils.scala   | 22 +++++++++---
 .../spark/deploy/SparkSubmitSuite.scala       | 34 +++++++++++++++++++
 .../org/apache/spark/util/UtilsSuite.scala    | 10 ++++--
 docs/configuration.md                         |  4 +--
 .../spark/deploy/yarn/YarnClusterSuite.scala  | 32 +++++++++++++----
 7 files changed, 100 insertions(+), 22 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 89aeea4939..2f0a3064be 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -244,8 +244,8 @@ package object config {
     ConfigBuilder("spark.redaction.regex")
       .doc("Regex to decide which Spark configuration properties and environment variables in " +
         "driver and executor environments contain sensitive information. When this regex matches " +
-        "a property, its value is redacted from the environment UI and various logs like YARN " +
-        "and event logs.")
+        "a property key or value, the value is redacted from the environment UI and various logs " +
+        "like YARN and event logs.")
       .regexConf
       .createWithDefault("(?i)secret|password".r)
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index aecb3a980e..a7dbf87915 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -252,11 +252,17 @@ private[spark] class EventLoggingListener(
 
   private[spark] def redactEvent(
       event: SparkListenerEnvironmentUpdate): SparkListenerEnvironmentUpdate = {
-    // "Spark Properties" entry will always exist because the map is always populated with it.
-    val redactedProps = Utils.redact(sparkConf, event.environmentDetails("Spark Properties"))
-    val redactedEnvironmentDetails = event.environmentDetails +
-      ("Spark Properties" -> redactedProps)
-    SparkListenerEnvironmentUpdate(redactedEnvironmentDetails)
+    // environmentDetails maps a string descriptor to a set of properties
+    // Similar to:
+    // "JVM Information" -> jvmInformation,
+    // "Spark Properties" -> sparkProperties,
+    // ...
+    // where jvmInformation, sparkProperties, etc. are sequence of tuples.
+    // We go through the various  of properties and redact sensitive information from them.
+    val redactedProps = event.environmentDetails.map{ case (name, props) =>
+      name -> Utils.redact(sparkConf, props)
+    }
+    SparkListenerEnvironmentUpdate(redactedProps)
   }
 
 }
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 943dde0723..e042badcdd 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2606,10 +2606,24 @@ private[spark] object Utils extends Logging {
   }
 
   private def redact(redactionPattern: Regex, kvs: Seq[(String, String)]): Seq[(String, String)] = {
-    kvs.map { kv =>
-      redactionPattern.findFirstIn(kv._1)
-        .map { _ => (kv._1, REDACTION_REPLACEMENT_TEXT) }
-        .getOrElse(kv)
+    // If the sensitive information regex matches with either the key or the value, redact the value
+    // While the original intent was to only redact the value if the key matched with the regex,
+    // we've found that especially in verbose mode, the value of the property may contain sensitive
+    // information like so:
+    // "sun.java.command":"org.apache.spark.deploy.SparkSubmit ... \
+    // --conf spark.executorEnv.HADOOP_CREDSTORE_PASSWORD=secret_password ...
+    //
+    // And, in such cases, simply searching for the sensitive information regex in the key name is
+    // not sufficient. The values themselves have to be searched as well and redacted if matched.
+    // This does mean we may be accounting more false positives - for example, if the value of an
+    // arbitrary property contained the term 'password', we may redact the value from the UI and
+    // logs. In order to work around it, user would have to make the spark.redaction.regex property
+    // more specific.
+    kvs.map { case (key, value) =>
+      redactionPattern.findFirstIn(key)
+        .orElse(redactionPattern.findFirstIn(value))
+        .map { _ => (key, REDACTION_REPLACEMENT_TEXT) }
+        .getOrElse((key, value))
     }
   }
 
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 7c2ec01a03..a43839a881 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -21,8 +21,10 @@ import java.io._
 import java.nio.charset.StandardCharsets
 
 import scala.collection.mutable.ArrayBuffer
+import scala.io.Source
 
 import com.google.common.io.ByteStreams
+import org.apache.hadoop.fs.Path
 import org.scalatest.{BeforeAndAfterEach, Matchers}
 import org.scalatest.concurrent.Timeouts
 import org.scalatest.time.SpanSugar._
@@ -34,6 +36,7 @@ import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.TestUtils.JavaSourceFromString
+import org.apache.spark.scheduler.EventLoggingListener
 import org.apache.spark.util.{CommandLineUtils, ResetSystemProperties, Utils}
 
 
@@ -404,6 +407,37 @@ class SparkSubmitSuite
     runSparkSubmit(args)
   }
 
+  test("launch simple application with spark-submit with redaction") {
+    val testDir = Utils.createTempDir()
+    testDir.deleteOnExit()
+    val testDirPath = new Path(testDir.getAbsolutePath())
+    val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
+    val fileSystem = Utils.getHadoopFileSystem("/",
+      SparkHadoopUtil.get.newConfiguration(new SparkConf()))
+    try {
+      val args = Seq(
+        "--class", SimpleApplicationTest.getClass.getName.stripSuffix("$"),
+        "--name", "testApp",
+        "--master", "local",
+        "--conf", "spark.ui.enabled=false",
+        "--conf", "spark.master.rest.enabled=false",
+        "--conf", "spark.executorEnv.HADOOP_CREDSTORE_PASSWORD=secret_password",
+        "--conf", "spark.eventLog.enabled=true",
+        "--conf", "spark.eventLog.testing=true",
+        "--conf", s"spark.eventLog.dir=${testDirPath.toUri.toString}",
+        "--conf", "spark.hadoop.fs.defaultFS=unsupported://example.com",
+        unusedJar.toString)
+      runSparkSubmit(args)
+      val listStatus = fileSystem.listStatus(testDirPath)
+      val logData = EventLoggingListener.openEventLog(listStatus.last.getPath, fileSystem)
+      Source.fromInputStream(logData).getLines().foreach { line =>
+        assert(!line.contains("secret_password"))
+      }
+    } finally {
+      Utils.deleteRecursively(testDir)
+    }
+  }
+
   test("includes jars passed in through --jars") {
     val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
     val jar1 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassA"))
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 8ed09749ff..3339d5b35d 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -1010,15 +1010,19 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
       "spark.executorEnv.HADOOP_CREDSTORE_PASSWORD",
       "spark.my.password",
       "spark.my.sECreT")
-    secretKeys.foreach { key => sparkConf.set(key, "secret_password") }
+    secretKeys.foreach { key => sparkConf.set(key, "sensitive_value") }
     // Set a non-secret key
-    sparkConf.set("spark.regular.property", "not_a_secret")
+    sparkConf.set("spark.regular.property", "regular_value")
+    // Set a property with a regular key but secret in the value
+    sparkConf.set("spark.sensitive.property", "has_secret_in_value")
 
     // Redact sensitive information
     val redactedConf = Utils.redact(sparkConf, sparkConf.getAll).toMap
 
     // Assert that secret information got redacted while the regular property remained the same
     secretKeys.foreach { key => assert(redactedConf(key) === Utils.REDACTION_REPLACEMENT_TEXT) }
-    assert(redactedConf("spark.regular.property") === "not_a_secret")
+    assert(redactedConf("spark.regular.property") === "regular_value")
+    assert(redactedConf("spark.sensitive.property") === Utils.REDACTION_REPLACEMENT_TEXT)
+
   }
 }
diff --git a/docs/configuration.md b/docs/configuration.md
index 8b53e92ccd..1d8d963016 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -372,8 +372,8 @@ Apart from these, the following properties are also available, and may be useful
   <td>(?i)secret|password</td>
   <td>
     Regex to decide which Spark configuration properties and environment variables in driver and
-    executor environments contain sensitive information. When this regex matches a property, its
-    value is redacted from the environment UI and various logs like YARN and event logs.
+    executor environments contain sensitive information. When this regex matches a property key or
+    value, the value is redacted from the environment UI and various logs like YARN and event logs.
   </td>
 </tr>
 <tr>
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 99fb58a289..59adb7e22d 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -24,6 +24,7 @@ import java.util.{HashMap => JHashMap}
 
 import scala.collection.mutable
 import scala.concurrent.duration._
+import scala.io.Source
 import scala.language.postfixOps
 
 import com.google.common.io.{ByteStreams, Files}
@@ -87,24 +88,30 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     testBasicYarnApp(false)
   }
 
-  test("run Spark in yarn-client mode with different configurations") {
+  test("run Spark in yarn-client mode with different configurations, ensuring redaction") {
     testBasicYarnApp(true,
       Map(
         "spark.driver.memory" -> "512m",
         "spark.executor.cores" -> "1",
         "spark.executor.memory" -> "512m",
-        "spark.executor.instances" -> "2"
+        "spark.executor.instances" -> "2",
+        // Sending some senstive information, which we'll make sure gets redacted
+        "spark.executorEnv.HADOOP_CREDSTORE_PASSWORD" -> YarnClusterDriver.SECRET_PASSWORD,
+        "spark.yarn.appMasterEnv.HADOOP_CREDSTORE_PASSWORD" -> YarnClusterDriver.SECRET_PASSWORD
       ))
   }
 
-  test("run Spark in yarn-cluster mode with different configurations") {
+  test("run Spark in yarn-cluster mode with different configurations, ensuring redaction") {
     testBasicYarnApp(false,
       Map(
         "spark.driver.memory" -> "512m",
         "spark.driver.cores" -> "1",
         "spark.executor.cores" -> "1",
         "spark.executor.memory" -> "512m",
-        "spark.executor.instances" -> "2"
+        "spark.executor.instances" -> "2",
+        // Sending some senstive information, which we'll make sure gets redacted
+        "spark.executorEnv.HADOOP_CREDSTORE_PASSWORD" -> YarnClusterDriver.SECRET_PASSWORD,
+        "spark.yarn.appMasterEnv.HADOOP_CREDSTORE_PASSWORD" -> YarnClusterDriver.SECRET_PASSWORD
       ))
   }
 
@@ -349,6 +356,7 @@ private object YarnClusterDriverUseSparkHadoopUtilConf extends Logging with Matc
 private object YarnClusterDriver extends Logging with Matchers {
 
   val WAIT_TIMEOUT_MILLIS = 10000
+  val SECRET_PASSWORD = "secret_password"
 
   def main(args: Array[String]): Unit = {
     if (args.length != 1) {
@@ -395,6 +403,13 @@ private object YarnClusterDriver extends Logging with Matchers {
     assert(executorInfos.nonEmpty)
     executorInfos.foreach { info =>
       assert(info.logUrlMap.nonEmpty)
+      info.logUrlMap.values.foreach { url =>
+        val log = Source.fromURL(url).mkString
+        assert(
+          !log.contains(SECRET_PASSWORD),
+          s"Executor logs contain sensitive info (${SECRET_PASSWORD}): \n${log} "
+        )
+      }
     }
 
     // If we are running in yarn-cluster mode, verify that driver logs links and present and are
@@ -406,8 +421,13 @@ private object YarnClusterDriver extends Logging with Matchers {
       assert(driverLogs.contains("stderr"))
       assert(driverLogs.contains("stdout"))
       val urlStr = driverLogs("stderr")
-      // Ensure that this is a valid URL, else this will throw an exception
-      new URL(urlStr)
+      driverLogs.foreach { kv =>
+        val log = Source.fromURL(kv._2).mkString
+        assert(
+          !log.contains(SECRET_PASSWORD),
+          s"Driver logs contain sensitive info (${SECRET_PASSWORD}): \n${log} "
+        )
+      }
       val containerId = YarnSparkHadoopUtil.get.getContainerId
       val user = Utils.getCurrentUserName()
       assert(urlStr.endsWith(s"/node/containerlogs/$containerId/$user/stderr?start=-4096"))
-- 
GitLab