From 9f273c5173c05017c3009faaf3e10f2f70a842d0 Mon Sep 17 00:00:00 2001
From: gatorsmile <gatorsmile@gmail.com>
Date: Mon, 28 Nov 2016 07:04:38 -0800
Subject: [PATCH] [SPARK-17783][SQL] Hide Credentials in CREATE and DESC
 FORMATTED/EXTENDED a PERSISTENT/TEMP Table for JDBC

### What changes were proposed in this pull request?

We should never expose the Credentials in the EXPLAIN and DESC FORMATTED/EXTENDED command. However, below commands exposed the credentials.

In the related PR: https://github.com/apache/spark/pull/10452

> URL patterns to specify credential seems to be vary between different databases.

Thus, we hide the whole `url` value if it contains the keyword `password`. We also hide the `password` property.

Before the fix, the command outputs look like:

``` SQL
CREATE TABLE tab1
USING org.apache.spark.sql.jdbc
OPTIONS (
 url 'jdbc:h2:mem:testdb0;user=testUser;password=testPass',
 dbtable 'TEST.PEOPLE',
 user 'testUser',
 password '$password')

DESC FORMATTED tab1
DESC EXTENDED tab1
```

Before the fix,
- The output of SQL statement EXPLAIN
```
== Physical Plan ==
ExecutedCommand
   +- CreateDataSourceTableCommand CatalogTable(
	Table: `tab1`
	Created: Wed Nov 16 23:00:10 PST 2016
	Last Access: Wed Dec 31 15:59:59 PST 1969
	Type: MANAGED
	Provider: org.apache.spark.sql.jdbc
	Storage(Properties: [url=jdbc:h2:mem:testdb0;user=testUser;password=testPass, dbtable=TEST.PEOPLE, user=testUser, password=testPass])), false
```

- The output of `DESC FORMATTED`
```
...
|Storage Desc Parameters:    |                                                                  |       |
|  url                       |jdbc:h2:mem:testdb0;user=testUser;password=testPass               |       |
|  dbtable                   |TEST.PEOPLE                                                       |       |
|  user                      |testUser                                                          |       |
|  password                  |testPass                                                          |       |
+----------------------------+------------------------------------------------------------------+-------+
```

- The output of `DESC EXTENDED`
```
|# Detailed Table Information|CatalogTable(
	Table: `default`.`tab1`
	Created: Wed Nov 16 23:00:10 PST 2016
	Last Access: Wed Dec 31 15:59:59 PST 1969
	Type: MANAGED
	Schema: [StructField(NAME,StringType,false), StructField(THEID,IntegerType,false)]
	Provider: org.apache.spark.sql.jdbc
	Storage(Location: file:/Users/xiaoli/IdeaProjects/sparkDelivery/spark-warehouse/tab1, Properties: [url=jdbc:h2:mem:testdb0;user=testUser;password=testPass, dbtable=TEST.PEOPLE, user=testUser, password=testPass]))|       |
```

After the fix,
- The output of SQL statement EXPLAIN
```
== Physical Plan ==
ExecutedCommand
   +- CreateDataSourceTableCommand CatalogTable(
	Table: `tab1`
	Created: Wed Nov 16 22:43:49 PST 2016
	Last Access: Wed Dec 31 15:59:59 PST 1969
	Type: MANAGED
	Provider: org.apache.spark.sql.jdbc
	Storage(Properties: [url=###, dbtable=TEST.PEOPLE, user=testUser, password=###])), false
```
- The output of `DESC FORMATTED`
```
...
|Storage Desc Parameters:    |                                                                  |       |
|  url                       |###                                                               |       |
|  dbtable                   |TEST.PEOPLE                                                       |       |
|  user                      |testUser                                                          |       |
|  password                  |###                                                               |       |
+----------------------------+------------------------------------------------------------------+-------+
```

- The output of `DESC EXTENDED`
```
|# Detailed Table Information|CatalogTable(
	Table: `default`.`tab1`
	Created: Wed Nov 16 22:43:49 PST 2016
	Last Access: Wed Dec 31 15:59:59 PST 1969
	Type: MANAGED
	Schema: [StructField(NAME,StringType,false), StructField(THEID,IntegerType,false)]
	Provider: org.apache.spark.sql.jdbc
	Storage(Location: file:/Users/xiaoli/IdeaProjects/sparkDelivery/spark-warehouse/tab1, Properties: [url=###, dbtable=TEST.PEOPLE, user=testUser, password=###]))|       |
```

### How was this patch tested?

Added test cases

Author: gatorsmile <gatorsmile@gmail.com>

Closes #15358 from gatorsmile/maskCredentials.
---
 .../catalog/ExternalCatalogUtils.scala        | 15 +++++++++
 .../sql/catalyst/catalog/interface.scala      | 10 +++---
 .../spark/sql/execution/command/tables.scala  |  3 +-
 .../spark/sql/execution/datasources/ddl.scala | 10 +++++-
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 32 +++++++++++++++++++
 5 files changed, 62 insertions(+), 8 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
index b1442eec16..817c1ab688 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
@@ -119,3 +119,18 @@ object ExternalCatalogUtils {
     }
   }
 }
+
+object CatalogUtils {
+  /**
+   * Masking credentials in the option lists. For example, in the sql plan explain output
+   * for JDBC data sources.
+   */
+  def maskCredentials(options: Map[String, String]): Map[String, String] = {
+    options.map {
+      case (key, _) if key.toLowerCase == "password" => (key, "###")
+      case (key, value) if key.toLowerCase == "url" && value.toLowerCase.contains("password") =>
+        (key, "###")
+      case o => o
+    }
+  }
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 93c70de18a..d8bc86727e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -52,12 +52,10 @@ case class CatalogStorageFormat(
     properties: Map[String, String]) {
 
   override def toString: String = {
-    val serdePropsToString =
-      if (properties.nonEmpty) {
-        s"Properties: " + properties.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]")
-      } else {
-        ""
-      }
+    val serdePropsToString = CatalogUtils.maskCredentials(properties) match {
+      case props if props.isEmpty => ""
+      case props => "Properties: " + props.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]")
+    }
     val output =
       Seq(locationUri.map("Location: " + _).getOrElse(""),
         inputFormat.map("InputFormat: " + _).getOrElse(""),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 7049e53a78..ca4d20a99c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -503,7 +503,8 @@ case class DescribeTableCommand(
     describeBucketingInfo(metadata, buffer)
 
     append(buffer, "Storage Desc Parameters:", "", "")
-    metadata.storage.properties.foreach { case (key, value) =>
+    val maskedProperties = CatalogUtils.maskCredentials(metadata.storage.properties)
+    maskedProperties.foreach { case (key, value) =>
       append(buffer, s"  $key", value, "")
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index fa8dfa9640..695ba1234d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
 import org.apache.spark.sql.execution.command.RunnableCommand
@@ -56,6 +56,14 @@ case class CreateTempViewUsing(
       s"Temporary view '$tableIdent' should not have specified a database")
   }
 
+  override def argString: String = {
+    s"[tableIdent:$tableIdent " +
+      userSpecifiedSchema.map(_ + " ").getOrElse("") +
+      s"replace:$replace " +
+      s"provider:$provider " +
+      CatalogUtils.maskCredentials(options)
+  }
+
   def run(sparkSession: SparkSession): Seq[Row] = {
     val dataSource = DataSource(
       sparkSession,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index f921939ada..b16be457ed 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -734,6 +734,38 @@ class JDBCSuite extends SparkFunSuite
     }
   }
 
+  test("hide credentials in create and describe a persistent/temp table") {
+    val password = "testPass"
+    val tableName = "tab1"
+    Seq("TABLE", "TEMPORARY VIEW").foreach { tableType =>
+      withTable(tableName) {
+        val df = sql(
+          s"""
+             |CREATE $tableType $tableName
+             |USING org.apache.spark.sql.jdbc
+             |OPTIONS (
+             | url '$urlWithUserAndPass',
+             | dbtable 'TEST.PEOPLE',
+             | user 'testUser',
+             | password '$password')
+           """.stripMargin)
+
+        val explain = ExplainCommand(df.queryExecution.logical, extended = true)
+        spark.sessionState.executePlan(explain).executedPlan.executeCollect().foreach { r =>
+          assert(!r.toString.contains(password))
+        }
+
+        sql(s"DESC FORMATTED $tableName").collect().foreach { r =>
+          assert(!r.toString().contains(password))
+        }
+
+        sql(s"DESC EXTENDED $tableName").collect().foreach { r =>
+          assert(!r.toString().contains(password))
+        }
+      }
+    }
+  }
+
   test("SPARK 12941: The data type mapping for StringType to Oracle") {
     val oracleDialect = JdbcDialects.get("jdbc:oracle://127.0.0.1/db")
     assert(oracleDialect.getJDBCType(StringType).
-- 
GitLab