From 5c8a0ec99bded2271481f8d6cf5443fea5da4bbd Mon Sep 17 00:00:00 2001
From: Reynold Xin <rxin@databricks.com>
Date: Sat, 23 Apr 2016 12:44:00 -0700
Subject: [PATCH] [SPARK-14872][SQL] Restructure command package

## What changes were proposed in this pull request?
This patch restructures sql.execution.command package to break the commands into multiple files, in some logical organization: databases, tables, views, functions.

I also renamed basicOperators.scala to basicLogicalOperators.scala and basicPhysicalOperators.scala.

## How was this patch tested?
N/A - all I did was moving code around.

Author: Reynold Xin <rxin@databricks.com>

Closes #12636 from rxin/SPARK-14872.
---
 ...tors.scala => basicLogicalOperators.scala} |   0
 ...ors.scala => basicPhysicalOperators.scala} |   0
 .../spark/sql/execution/command/cache.scala   |  70 +++++
 .../sql/execution/command/commands.scala      | 264 +-----------------
 .../sql/execution/command/databases.scala     |  64 +++++
 .../sql/execution/command/functions.scala     |  99 ++++++-
 .../spark/sql/execution/command/tables.scala  |  77 ++++-
 7 files changed, 317 insertions(+), 257 deletions(-)
 rename sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/{basicOperators.scala => basicLogicalOperators.scala} (100%)
 rename sql/core/src/main/scala/org/apache/spark/sql/execution/{basicOperators.scala => basicPhysicalOperators.scala} (100%)
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/command/databases.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
similarity index 100%
rename from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
similarity index 100%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
new file mode 100644
index 0000000000..5be5d0c2b0
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.sql.{Dataset, Row, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+
+case class CacheTableCommand(
+  tableName: String,
+  plan: Option[LogicalPlan],
+  isLazy: Boolean)
+  extends RunnableCommand {
+
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    plan.foreach { logicalPlan =>
+      sqlContext.registerDataFrameAsTable(Dataset.ofRows(sqlContext, logicalPlan), tableName)
+    }
+    sqlContext.cacheTable(tableName)
+
+    if (!isLazy) {
+      // Performs eager caching
+      sqlContext.table(tableName).count()
+    }
+
+    Seq.empty[Row]
+  }
+
+  override def output: Seq[Attribute] = Seq.empty
+}
+
+
+case class UncacheTableCommand(tableName: String) extends RunnableCommand {
+
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    sqlContext.table(tableName).unpersist(blocking = false)
+    Seq.empty[Row]
+  }
+
+  override def output: Seq[Attribute] = Seq.empty
+}
+
+/**
+ * Clear all cached data from the in-memory cache.
+ */
+case object ClearCacheCommand extends RunnableCommand {
+
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    sqlContext.clearCache()
+    Seq.empty[Row]
+  }
+
+  override def output: Seq[Attribute] = Seq.empty
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index 971770a97b..0fd7fa92a3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -78,6 +78,15 @@ private[sql] case class ExecutedCommandExec(cmd: RunnableCommand) extends SparkP
  *
  * Note that this command takes in a logical plan, runs the optimizer on the logical plan
  * (but do NOT actually execute it).
+ *
+ * {{{
+ *   EXPLAIN (EXTENDED|CODEGEN) SELECT * FROM ...
+ * }}}
+ *
+ * @param logicalPlan plan to explain
+ * @param output output schema
+ * @param extended whether to do extended explain or not
+ * @param codegen whether to output generated code from whole-stage codegen or not
  */
 case class ExplainCommand(
     logicalPlan: LogicalPlan,
@@ -89,7 +98,6 @@ case class ExplainCommand(
 
   // Run through the optimizer to generate the physical plan.
   override def run(sqlContext: SQLContext): Seq[Row] = try {
-    // TODO in Hive, the "extended" ExplainCommand prints the AST as well, and detailed properties.
     val queryExecution = sqlContext.executePlan(logicalPlan)
     val outputString =
       if (codegen) {
@@ -104,257 +112,3 @@ case class ExplainCommand(
     ("Error occurred during query planning: \n" + cause.getMessage).split("\n").map(Row(_))
   }
 }
-
-
-case class CacheTableCommand(
-    tableName: String,
-    plan: Option[LogicalPlan],
-    isLazy: Boolean)
-  extends RunnableCommand {
-
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-    plan.foreach { logicalPlan =>
-      sqlContext.registerDataFrameAsTable(Dataset.ofRows(sqlContext, logicalPlan), tableName)
-    }
-    sqlContext.cacheTable(tableName)
-
-    if (!isLazy) {
-      // Performs eager caching
-      sqlContext.table(tableName).count()
-    }
-
-    Seq.empty[Row]
-  }
-
-  override def output: Seq[Attribute] = Seq.empty
-}
-
-
-case class UncacheTableCommand(tableName: String) extends RunnableCommand {
-
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-    sqlContext.table(tableName).unpersist(blocking = false)
-    Seq.empty[Row]
-  }
-
-  override def output: Seq[Attribute] = Seq.empty
-}
-
-/**
- * Clear all cached data from the in-memory cache.
- */
-case object ClearCacheCommand extends RunnableCommand {
-
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-    sqlContext.clearCache()
-    Seq.empty[Row]
-  }
-
-  override def output: Seq[Attribute] = Seq.empty
-}
-
-
-/**
- * A command for users to get tables in the given database.
- * If a databaseName is not given, the current database will be used.
- * The syntax of using this command in SQL is:
- * {{{
- *   SHOW TABLES [(IN|FROM) database_name] [[LIKE] 'identifier_with_wildcards'];
- * }}}
- */
-case class ShowTablesCommand(
-    databaseName: Option[String],
-    tableIdentifierPattern: Option[String]) extends RunnableCommand {
-
-  // The result of SHOW TABLES has two columns, tableName and isTemporary.
-  override val output: Seq[Attribute] = {
-    AttributeReference("tableName", StringType, nullable = false)() ::
-      AttributeReference("isTemporary", BooleanType, nullable = false)() :: Nil
-  }
-
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-    // Since we need to return a Seq of rows, we will call getTables directly
-    // instead of calling tables in sqlContext.
-    val catalog = sqlContext.sessionState.catalog
-    val db = databaseName.getOrElse(catalog.getCurrentDatabase)
-    val tables =
-      tableIdentifierPattern.map(catalog.listTables(db, _)).getOrElse(catalog.listTables(db))
-    tables.map { t =>
-      val isTemp = t.database.isEmpty
-      Row(t.table, isTemp)
-    }
-  }
-}
-
-/**
- * A command for users to list the databases/schemas.
- * If a databasePattern is supplied then the databases that only matches the
- * pattern would be listed.
- * The syntax of using this command in SQL is:
- * {{{
- *   SHOW (DATABASES|SCHEMAS) [LIKE 'identifier_with_wildcards'];
- * }}}
- */
-case class ShowDatabasesCommand(databasePattern: Option[String]) extends RunnableCommand {
-
-  // The result of SHOW DATABASES has one column called 'result'
-  override val output: Seq[Attribute] = {
-    AttributeReference("result", StringType, nullable = false)() :: Nil
-  }
-
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-    val catalog = sqlContext.sessionState.catalog
-    val databases =
-      databasePattern.map(catalog.listDatabases(_)).getOrElse(catalog.listDatabases())
-    databases.map { d => Row(d) }
-  }
-}
-
-/**
- * A command for users to list the properties for a table If propertyKey is specified, the value
- * for the propertyKey is returned. If propertyKey is not specified, all the keys and their
- * corresponding values are returned.
- * The syntax of using this command in SQL is:
- * {{{
- *   SHOW TBLPROPERTIES table_name[('propertyKey')];
- * }}}
- */
-case class ShowTablePropertiesCommand(
-    table: TableIdentifier,
-    propertyKey: Option[String]) extends RunnableCommand {
-
-  override val output: Seq[Attribute] = {
-    val schema = AttributeReference("value", StringType, nullable = false)() :: Nil
-    propertyKey match {
-      case None => AttributeReference("key", StringType, nullable = false)() :: schema
-      case _ => schema
-    }
-  }
-
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-    val catalog = sqlContext.sessionState.catalog
-
-    if (catalog.isTemporaryTable(table)) {
-      Seq.empty[Row]
-    } else {
-      val catalogTable = sqlContext.sessionState.catalog.getTableMetadata(table)
-
-      propertyKey match {
-        case Some(p) =>
-          val propValue = catalogTable
-            .properties
-            .getOrElse(p, s"Table ${catalogTable.qualifiedName} does not have property: $p")
-          Seq(Row(propValue))
-        case None =>
-          catalogTable.properties.map(p => Row(p._1, p._2)).toSeq
-      }
-    }
-  }
-}
-
-/**
- * A command for users to list all of the registered functions.
- * The syntax of using this command in SQL is:
- * {{{
- *    SHOW FUNCTIONS [LIKE pattern]
- * }}}
- * For the pattern, '*' matches any sequence of characters (including no characters) and
- * '|' is for alternation.
- * For example, "show functions like 'yea*|windo*'" will return "window" and "year".
- *
- * TODO currently we are simply ignore the db
- */
-case class ShowFunctions(db: Option[String], pattern: Option[String]) extends RunnableCommand {
-  override val output: Seq[Attribute] = {
-    val schema = StructType(
-      StructField("function", StringType, nullable = false) :: Nil)
-
-    schema.toAttributes
-  }
-
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-    val dbName = db.getOrElse(sqlContext.sessionState.catalog.getCurrentDatabase)
-    // If pattern is not specified, we use '*', which is used to
-    // match any sequence of characters (including no characters).
-    val functionNames =
-      sqlContext.sessionState.catalog
-        .listFunctions(dbName, pattern.getOrElse("*"))
-        .map(_.unquotedString)
-    // The session catalog caches some persistent functions in the FunctionRegistry
-    // so there can be duplicates.
-    functionNames.distinct.sorted.map(Row(_))
-  }
-}
-
-/**
- * A command for users to get the usage of a registered function.
- * The syntax of using this command in SQL is
- * {{{
- *   DESCRIBE FUNCTION [EXTENDED] upper;
- * }}}
- */
-case class DescribeFunction(
-    functionName: String,
-    isExtended: Boolean) extends RunnableCommand {
-
-  override val output: Seq[Attribute] = {
-    val schema = StructType(
-      StructField("function_desc", StringType, nullable = false) :: Nil)
-
-    schema.toAttributes
-  }
-
-  private def replaceFunctionName(usage: String, functionName: String): String = {
-    if (usage == null) {
-      "To be added."
-    } else {
-      usage.replaceAll("_FUNC_", functionName)
-    }
-  }
-
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-    // Hard code "<>", "!=", "between", and "case" for now as there is no corresponding functions.
-    functionName.toLowerCase match {
-      case "<>" =>
-        Row(s"Function: $functionName") ::
-        Row(s"Usage: a <> b - Returns TRUE if a is not equal to b") :: Nil
-      case "!=" =>
-        Row(s"Function: $functionName") ::
-        Row(s"Usage: a != b - Returns TRUE if a is not equal to b") :: Nil
-      case "between" =>
-        Row(s"Function: between") ::
-        Row(s"Usage: a [NOT] BETWEEN b AND c - " +
-          s"evaluate if a is [not] in between b and c") :: Nil
-      case "case" =>
-        Row(s"Function: case") ::
-        Row(s"Usage: CASE a WHEN b THEN c [WHEN d THEN e]* [ELSE f] END - " +
-          s"When a = b, returns c; when a = d, return e; else return f") :: Nil
-      case _ => sqlContext.sessionState.functionRegistry.lookupFunction(functionName) match {
-        case Some(info) =>
-          val result =
-            Row(s"Function: ${info.getName}") ::
-            Row(s"Class: ${info.getClassName}") ::
-            Row(s"Usage: ${replaceFunctionName(info.getUsage(), info.getName)}") :: Nil
-
-          if (isExtended) {
-            result :+
-              Row(s"Extended Usage:\n${replaceFunctionName(info.getExtended, info.getName)}")
-          } else {
-            result
-          }
-
-        case None => Seq(Row(s"Function: $functionName not found."))
-      }
-    }
-  }
-}
-
-case class SetDatabaseCommand(databaseName: String) extends RunnableCommand {
-
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-    sqlContext.sessionState.catalog.setCurrentDatabase(databaseName)
-    Seq.empty[Row]
-  }
-
-  override val output: Seq[Attribute] = Seq.empty
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/databases.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/databases.scala
new file mode 100644
index 0000000000..33cc10d53a
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/databases.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.types.StringType
+
+
+/**
+ * A command for users to list the databases/schemas.
+ * If a databasePattern is supplied then the databases that only matches the
+ * pattern would be listed.
+ * The syntax of using this command in SQL is:
+ * {{{
+ *   SHOW (DATABASES|SCHEMAS) [LIKE 'identifier_with_wildcards'];
+ * }}}
+ */
+case class ShowDatabasesCommand(databasePattern: Option[String]) extends RunnableCommand {
+
+  // The result of SHOW DATABASES has one column called 'result'
+  override val output: Seq[Attribute] = {
+    AttributeReference("result", StringType, nullable = false)() :: Nil
+  }
+
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    val catalog = sqlContext.sessionState.catalog
+    val databases =
+      databasePattern.map(catalog.listDatabases(_)).getOrElse(catalog.listDatabases())
+    databases.map { d => Row(d) }
+  }
+}
+
+
+/**
+ * Command for setting the current database.
+ * {{{
+ *   USE database_name;
+ * }}}
+ */
+case class SetDatabaseCommand(databaseName: String) extends RunnableCommand {
+
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    sqlContext.sessionState.catalog.setCurrentDatabase(databaseName)
+    Seq.empty[Row]
+  }
+
+  override val output: Seq[Attribute] = Seq.empty
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
index c6e601799f..89ccacdc73 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
@@ -20,7 +20,8 @@ package org.apache.spark.sql.execution.command
 import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
 import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogFunction
-import org.apache.spark.sql.catalyst.expressions.ExpressionInfo
+import org.apache.spark.sql.catalyst.expressions.{Attribute, ExpressionInfo}
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
 
 
 /**
@@ -73,6 +74,69 @@ case class CreateFunction(
   }
 }
 
+
+/**
+ * A command for users to get the usage of a registered function.
+ * The syntax of using this command in SQL is
+ * {{{
+ *   DESCRIBE FUNCTION [EXTENDED] upper;
+ * }}}
+ */
+case class DescribeFunction(
+    functionName: String,
+    isExtended: Boolean) extends RunnableCommand {
+
+  override val output: Seq[Attribute] = {
+    val schema = StructType(StructField("function_desc", StringType, nullable = false) :: Nil)
+    schema.toAttributes
+  }
+
+  private def replaceFunctionName(usage: String, functionName: String): String = {
+    if (usage == null) {
+      "To be added."
+    } else {
+      usage.replaceAll("_FUNC_", functionName)
+    }
+  }
+
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    // Hard code "<>", "!=", "between", and "case" for now as there is no corresponding functions.
+    functionName.toLowerCase match {
+      case "<>" =>
+        Row(s"Function: $functionName") ::
+          Row(s"Usage: a <> b - Returns TRUE if a is not equal to b") :: Nil
+      case "!=" =>
+        Row(s"Function: $functionName") ::
+          Row(s"Usage: a != b - Returns TRUE if a is not equal to b") :: Nil
+      case "between" =>
+        Row(s"Function: between") ::
+          Row(s"Usage: a [NOT] BETWEEN b AND c - " +
+            s"evaluate if a is [not] in between b and c") :: Nil
+      case "case" =>
+        Row(s"Function: case") ::
+          Row(s"Usage: CASE a WHEN b THEN c [WHEN d THEN e]* [ELSE f] END - " +
+            s"When a = b, returns c; when a = d, return e; else return f") :: Nil
+      case _ => sqlContext.sessionState.functionRegistry.lookupFunction(functionName) match {
+        case Some(info) =>
+          val result =
+            Row(s"Function: ${info.getName}") ::
+              Row(s"Class: ${info.getClassName}") ::
+              Row(s"Usage: ${replaceFunctionName(info.getUsage(), info.getName)}") :: Nil
+
+          if (isExtended) {
+            result :+
+              Row(s"Extended Usage:\n${replaceFunctionName(info.getExtended, info.getName)}")
+          } else {
+            result
+          }
+
+        case None => Seq(Row(s"Function: $functionName not found."))
+      }
+    }
+  }
+}
+
+
 /**
  * The DDL command that drops a function.
  * ifExists: returns an error if the function doesn't exist, unless this is true.
@@ -103,3 +167,36 @@ case class DropFunction(
     Seq.empty[Row]
   }
 }
+
+
+/**
+ * A command for users to list all of the registered functions.
+ * The syntax of using this command in SQL is:
+ * {{{
+ *    SHOW FUNCTIONS [LIKE pattern]
+ * }}}
+ * For the pattern, '*' matches any sequence of characters (including no characters) and
+ * '|' is for alternation.
+ * For example, "show functions like 'yea*|windo*'" will return "window" and "year".
+ *
+ * TODO currently we are simply ignore the db
+ */
+case class ShowFunctions(db: Option[String], pattern: Option[String]) extends RunnableCommand {
+  override val output: Seq[Attribute] = {
+    val schema = StructType(StructField("function", StringType, nullable = false) :: Nil)
+    schema.toAttributes
+  }
+
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    val dbName = db.getOrElse(sqlContext.sessionState.catalog.getCurrentDatabase)
+    // If pattern is not specified, we use '*', which is used to
+    // match any sequence of characters (including no characters).
+    val functionNames =
+      sqlContext.sessionState.catalog
+        .listFunctions(dbName, pattern.getOrElse("*"))
+        .map(_.unquotedString)
+    // The session catalog caches some persistent functions in the FunctionRegistry
+    // so there can be duplicates.
+    functionNames.distinct.sorted.map(Row(_))
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index b7e3056f92..eae8fe8975 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable, CatalogTableType, ExternalCatalog}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, UnaryNode}
-import org.apache.spark.sql.types.{MetadataBuilder, StringType}
+import org.apache.spark.sql.types.{BooleanType, MetadataBuilder, StringType}
 import org.apache.spark.util.Utils
 
 case class CreateTableAsSelectLogicalPlan(
@@ -313,3 +313,78 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean)
     result
   }
 }
+
+
+/**
+ * A command for users to get tables in the given database.
+ * If a databaseName is not given, the current database will be used.
+ * The syntax of using this command in SQL is:
+ * {{{
+ *   SHOW TABLES [(IN|FROM) database_name] [[LIKE] 'identifier_with_wildcards'];
+ * }}}
+ */
+case class ShowTablesCommand(
+    databaseName: Option[String],
+    tableIdentifierPattern: Option[String]) extends RunnableCommand {
+
+  // The result of SHOW TABLES has two columns, tableName and isTemporary.
+  override val output: Seq[Attribute] = {
+    AttributeReference("tableName", StringType, nullable = false)() ::
+      AttributeReference("isTemporary", BooleanType, nullable = false)() :: Nil
+  }
+
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    // Since we need to return a Seq of rows, we will call getTables directly
+    // instead of calling tables in sqlContext.
+    val catalog = sqlContext.sessionState.catalog
+    val db = databaseName.getOrElse(catalog.getCurrentDatabase)
+    val tables =
+      tableIdentifierPattern.map(catalog.listTables(db, _)).getOrElse(catalog.listTables(db))
+    tables.map { t =>
+      val isTemp = t.database.isEmpty
+      Row(t.table, isTemp)
+    }
+  }
+}
+
+
+/**
+ * A command for users to list the properties for a table If propertyKey is specified, the value
+ * for the propertyKey is returned. If propertyKey is not specified, all the keys and their
+ * corresponding values are returned.
+ * The syntax of using this command in SQL is:
+ * {{{
+ *   SHOW TBLPROPERTIES table_name[('propertyKey')];
+ * }}}
+ */
+case class ShowTablePropertiesCommand(table: TableIdentifier, propertyKey: Option[String])
+  extends RunnableCommand {
+
+  override val output: Seq[Attribute] = {
+    val schema = AttributeReference("value", StringType, nullable = false)() :: Nil
+    propertyKey match {
+      case None => AttributeReference("key", StringType, nullable = false)() :: schema
+      case _ => schema
+    }
+  }
+
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    val catalog = sqlContext.sessionState.catalog
+
+    if (catalog.isTemporaryTable(table)) {
+      Seq.empty[Row]
+    } else {
+      val catalogTable = sqlContext.sessionState.catalog.getTableMetadata(table)
+
+      propertyKey match {
+        case Some(p) =>
+          val propValue = catalogTable
+            .properties
+            .getOrElse(p, s"Table ${catalogTable.qualifiedName} does not have property: $p")
+          Seq(Row(propValue))
+        case None =>
+          catalogTable.properties.map(p => Row(p._1, p._2)).toSeq
+      }
+    }
+  }
+}
-- 
GitLab