From d8a83a564ff3fd0281007adbf8aa3757da8a2c2b Mon Sep 17 00:00:00 2001
From: Andrew Or <andrew@databricks.com>
Date: Tue, 26 Apr 2016 21:29:25 -0700
Subject: [PATCH] [SPARK-13477][SQL] Expose new user-facing Catalog interface

## What changes were proposed in this pull request?

#12625 exposed a new user-facing conf interface in `SparkSession`. This patch adds a catalog interface.

## How was this patch tested?

See `CatalogSuite`.

Author: Andrew Or <andrew@databricks.com>

Closes #12713 from andrewor14/user-facing-catalog.
---
 .../spark/sql/catalyst/ScalaReflection.scala  |  26 +-
 .../sql/catalyst/catalog/SessionCatalog.scala |  36 +-
 .../sql/catalyst/catalog/interface.scala      |   8 +-
 .../catalyst/encoders/ExpressionEncoder.scala |   5 +-
 ...Cases.scala => ExternalCatalogSuite.scala} |  11 +-
 .../catalog/InMemoryCatalogSuite.scala        |   2 +-
 .../catalog/SessionCatalogSuite.scala         |   2 +-
 .../org/apache/spark/sql/SQLContext.scala     |  33 +-
 .../org/apache/spark/sql/SparkSession.scala   | 248 +-----------
 .../apache/spark/sql/catalog/Catalog.scala    | 214 +++++++++++
 .../apache/spark/sql/catalog/interface.scala  | 101 +++++
 .../spark/sql/execution/SparkSqlParser.scala  |   6 +-
 .../spark/sql/execution/command/cache.scala   |   4 +-
 .../sql/execution/command/commands.scala      |   4 +-
 .../command/createDataSourceTables.scala      |   4 +-
 .../spark/sql/execution/command/ddl.scala     |   8 +-
 .../spark/sql/execution/command/tables.scala  |   2 +-
 .../spark/sql/execution/command/views.scala   |   2 +-
 .../spark/sql/internal/CatalogImpl.scala      | 352 ++++++++++++++++++
 .../sql/execution/command/DDLSuite.scala      |   2 +-
 .../spark/sql/internal/CatalogSuite.scala     | 271 ++++++++++++++
 .../spark/sql/hive/HiveMetastoreCatalog.scala |   2 +-
 .../spark/sql/hive/MetastoreRelation.scala    |   8 +-
 .../sql/hive/client/HiveClientImpl.scala      |  16 +-
 .../spark/sql/hive/CachedTableSuite.scala     |   4 +-
 .../spark/sql/hive/HiveDDLCommandSuite.scala  |  18 +-
 .../sql/hive/HiveExternalCatalogSuite.scala   |   2 +-
 .../sql/hive/HiveMetastoreCatalogSuite.scala  |   6 +-
 .../sql/hive/MetastoreDataSourcesSuite.scala  |  10 +-
 .../spark/sql/hive/client/VersionsSuite.scala |   2 +-
 .../sql/hive/execution/HiveDDLSuite.scala     |   6 +-
 31 files changed, 1090 insertions(+), 325 deletions(-)
 rename sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/{CatalogTestCases.scala => ExternalCatalogSuite.scala} (98%)
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/catalog/interface.scala
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index bd723135b5..be67605c45 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -22,7 +22,15 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData}
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
-import org.apache.spark.util.Utils
+
+
+/**
+ * A helper trait to create [[org.apache.spark.sql.catalyst.encoders.ExpressionEncoder]]s
+ * for classes whose fields are entirely defined by constructor params but should not be
+ * case classes.
+ */
+private[sql] trait DefinedByConstructorParams
+
 
 /**
  * A default version of ScalaReflection that uses the runtime universe.
@@ -333,7 +341,7 @@ object ScalaReflection extends ScalaReflection {
           "toScalaMap",
           keyData :: valueData :: Nil)
 
-      case t if t <:< localTypeOf[Product] =>
+      case t if definedByConstructorParams(t) =>
         val params = getConstructorParameters(t)
 
         val cls = getClassFromType(tpe)
@@ -401,7 +409,7 @@ object ScalaReflection extends ScalaReflection {
     val clsName = getClassNameFromType(tpe)
     val walkedTypePath = s"""- root class: "${clsName}"""" :: Nil
     serializerFor(inputObject, tpe, walkedTypePath) match {
-      case expressions.If(_, _, s: CreateNamedStruct) if tpe <:< localTypeOf[Product] => s
+      case expressions.If(_, _, s: CreateNamedStruct) if definedByConstructorParams(tpe) => s
       case other => CreateNamedStruct(expressions.Literal("value") :: other :: Nil)
     }
   }
@@ -491,7 +499,7 @@ object ScalaReflection extends ScalaReflection {
                 serializerFor(unwrapped, optType, newPath))
           }
 
-        case t if t <:< localTypeOf[Product] =>
+        case t if definedByConstructorParams(t) =>
           val params = getConstructorParameters(t)
           val nonNullOutput = CreateNamedStruct(params.flatMap { case (fieldName, fieldType) =>
             val fieldValue = Invoke(inputObject, fieldName, dataTypeFor(fieldType))
@@ -680,7 +688,7 @@ object ScalaReflection extends ScalaReflection {
         val Schema(valueDataType, valueNullable) = schemaFor(valueType)
         Schema(MapType(schemaFor(keyType).dataType,
           valueDataType, valueContainsNull = valueNullable), nullable = true)
-      case t if t <:< localTypeOf[Product] =>
+      case t if definedByConstructorParams(t) =>
         val params = getConstructorParameters(t)
         Schema(StructType(
           params.map { case (fieldName, fieldType) =>
@@ -712,6 +720,14 @@ object ScalaReflection extends ScalaReflection {
         throw new UnsupportedOperationException(s"Schema for type $other is not supported")
     }
   }
+
+  /**
+   * Whether the fields of the given type is defined entirely by its constructor parameters.
+   */
+  private[sql] def definedByConstructorParams(tpe: Type): Boolean = {
+    tpe <:< localTypeOf[Product] || tpe <:< localTypeOf[DefinedByConstructorParams]
+  }
+
 }
 
 /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 402aacfc1f..91d35de790 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -612,6 +612,25 @@ class SessionCatalog(
       s"a permanent function registered in the database $currentDb.")
   }
 
+  /**
+   * Look up the [[ExpressionInfo]] associated with the specified function, assuming it exists.
+   */
+  private[spark] def lookupFunctionInfo(name: FunctionIdentifier): ExpressionInfo = {
+    // TODO: just make function registry take in FunctionIdentifier instead of duplicating this
+    val qualifiedName = name.copy(database = name.database.orElse(Some(currentDb)))
+    functionRegistry.lookupFunction(name.funcName)
+      .orElse(functionRegistry.lookupFunction(qualifiedName.unquotedString))
+      .getOrElse {
+        val db = qualifiedName.database.get
+        if (externalCatalog.functionExists(db, name.funcName)) {
+          val metadata = externalCatalog.getFunction(db, name.funcName)
+          new ExpressionInfo(metadata.className, qualifiedName.unquotedString)
+        } else {
+          failFunctionLookup(name.funcName)
+        }
+      }
+  }
+
   /**
    * Return an [[Expression]] that represents the specified function, assuming it exists.
    *
@@ -646,6 +665,7 @@ class SessionCatalog(
     // The function has not been loaded to the function registry, which means
     // that the function is a permanent function (if it actually has been registered
     // in the metastore). We need to first put the function in the FunctionRegistry.
+    // TODO: why not just check whether the function exists first?
     val catalogFunction = try {
       externalCatalog.getFunction(currentDb, name.funcName)
     } catch {
@@ -662,7 +682,7 @@ class SessionCatalog(
     val builder = makeFunctionBuilder(qualifiedName.unquotedString, catalogFunction.className)
     createTempFunction(qualifiedName.unquotedString, info, builder, ignoreIfExists = false)
     // Now, we need to create the Expression.
-    return functionRegistry.lookupFunction(qualifiedName.unquotedString, children)
+    functionRegistry.lookupFunction(qualifiedName.unquotedString, children)
   }
 
   /**
@@ -687,8 +707,8 @@ class SessionCatalog(
   // -----------------
 
   /**
-   * Drop all existing databases (except "default") along with all associated tables,
-   * partitions and functions, and set the current database to "default".
+   * Drop all existing databases (except "default"), tables, partitions and functions,
+   * and set the current database to "default".
    *
    * This is mainly used for tests.
    */
@@ -697,6 +717,16 @@ class SessionCatalog(
     listDatabases().filter(_ != default).foreach { db =>
       dropDatabase(db, ignoreIfNotExists = false, cascade = true)
     }
+    listTables(default).foreach { table =>
+      dropTable(table, ignoreIfNotExists = false)
+    }
+    listFunctions(default).foreach { func =>
+      if (func.database.isDefined) {
+        dropFunction(func, ignoreIfNotExists = false)
+      } else {
+        dropTempFunction(func.funcName, ignoreIfNotExists = false)
+      }
+    }
     tempTables.clear()
     functionRegistry.clear()
     // restore built-in functions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 9e90987731..d1e2b3f664 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -299,10 +299,10 @@ case class CatalogTable(
 
 case class CatalogTableType private(name: String)
 object CatalogTableType {
-  val EXTERNAL_TABLE = new CatalogTableType("EXTERNAL_TABLE")
-  val MANAGED_TABLE = new CatalogTableType("MANAGED_TABLE")
-  val INDEX_TABLE = new CatalogTableType("INDEX_TABLE")
-  val VIRTUAL_VIEW = new CatalogTableType("VIRTUAL_VIEW")
+  val EXTERNAL = new CatalogTableType("EXTERNAL")
+  val MANAGED = new CatalogTableType("MANAGED")
+  val INDEX = new CatalogTableType("INDEX")
+  val VIEW = new CatalogTableType("VIEW")
 }
 
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
index 56d29cfbe1..5d294485af 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
@@ -47,8 +47,9 @@ object ExpressionEncoder {
   def apply[T : TypeTag](): ExpressionEncoder[T] = {
     // We convert the not-serializable TypeTag into StructType and ClassTag.
     val mirror = typeTag[T].mirror
-    val cls = mirror.runtimeClass(typeTag[T].tpe)
-    val flat = !classOf[Product].isAssignableFrom(cls)
+    val tpe = typeTag[T].tpe
+    val cls = mirror.runtimeClass(tpe)
+    val flat = !ScalaReflection.definedByConstructorParams(tpe)
 
     val inputObject = BoundReference(0, ScalaReflection.dataTypeFor[T], nullable = false)
     val serializer = ScalaReflection.serializerFor[T](inputObject)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
similarity index 98%
rename from sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
rename to sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index f961fe3292..d739b17743 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -30,7 +30,7 @@ import org.apache.spark.util.Utils
  *
  * Implementations of the [[ExternalCatalog]] interface can create test suites by extending this.
  */
-abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach {
+abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEach {
   protected val utils: CatalogTestUtils
   import utils._
 
@@ -152,10 +152,10 @@ abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach {
   test("the table type of an external table should be EXTERNAL_TABLE") {
     val catalog = newBasicCatalog()
     val table =
-      newTable("external_table1", "db2").copy(tableType = CatalogTableType.EXTERNAL_TABLE)
+      newTable("external_table1", "db2").copy(tableType = CatalogTableType.EXTERNAL)
     catalog.createTable("db2", table, ignoreIfExists = false)
     val actual = catalog.getTable("db2", "external_table1")
-    assert(actual.tableType === CatalogTableType.EXTERNAL_TABLE)
+    assert(actual.tableType === CatalogTableType.EXTERNAL)
   }
 
   test("drop table") {
@@ -551,14 +551,15 @@ abstract class CatalogTestUtils {
   def newTable(name: String, database: Option[String] = None): CatalogTable = {
     CatalogTable(
       identifier = TableIdentifier(name, database),
-      tableType = CatalogTableType.EXTERNAL_TABLE,
+      tableType = CatalogTableType.EXTERNAL,
       storage = storageFormat,
       schema = Seq(
         CatalogColumn("col1", "int"),
         CatalogColumn("col2", "string"),
         CatalogColumn("a", "int"),
         CatalogColumn("b", "string")),
-      partitionColumnNames = Seq("a", "b"))
+      partitionColumnNames = Seq("a", "b"),
+      bucketColumnNames = Seq("col1"))
   }
 
   def newFunc(name: String, database: Option[String] = None): CatalogFunction = {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala
index 63a7b2c661..0605daa3f9 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.catalog
 
 
 /** Test suite for the [[InMemoryCatalog]]. */
-class InMemoryCatalogSuite extends CatalogTestCases {
+class InMemoryCatalogSuite extends ExternalCatalogSuite {
 
   protected override val utils: CatalogTestUtils = new CatalogTestUtils {
     override val tableInputFormat: String = "org.apache.park.SequenceFileInputFormat"
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 1933be50b2..ba5d8ce0f4 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{Range, SubqueryAlias}
 /**
  * Tests for [[SessionCatalog]] that assume that [[InMemoryCatalog]] is correctly implemented.
  *
- * Note: many of the methods here are very similar to the ones in [[CatalogTestCases]].
+ * Note: many of the methods here are very similar to the ones in [[ExternalCatalogSuite]].
  * This is because [[SessionCatalog]] and [[ExternalCatalog]] share many similar method
  * signatures but do not extend a common parent. This is largely by design but
  * unfortunately leads to very similar test code in two places.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 47c043a00d..dbbdf11e59 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.command.ShowTablesCommand
 import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab}
 import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf}
 import org.apache.spark.sql.sources.BaseRelation
@@ -258,7 +259,7 @@ class SQLContext private[sql](
    * @since 1.3.0
    */
   def isCached(tableName: String): Boolean = {
-    sparkSession.isCached(tableName)
+    sparkSession.catalog.isCached(tableName)
   }
 
   /**
@@ -267,7 +268,7 @@ class SQLContext private[sql](
    * @since 1.3.0
    */
   private[sql] def isCached(qName: Dataset[_]): Boolean = {
-    sparkSession.isCached(qName)
+    sparkSession.cacheManager.lookupCachedData(qName).nonEmpty
   }
 
   /**
@@ -276,7 +277,7 @@ class SQLContext private[sql](
    * @since 1.3.0
    */
   def cacheTable(tableName: String): Unit = {
-    sparkSession.cacheTable(tableName)
+    sparkSession.catalog.cacheTable(tableName)
   }
 
   /**
@@ -285,7 +286,7 @@ class SQLContext private[sql](
    * @since 1.3.0
    */
   def uncacheTable(tableName: String): Unit = {
-    sparkSession.uncacheTable(tableName)
+    sparkSession.catalog.uncacheTable(tableName)
   }
 
   /**
@@ -293,7 +294,7 @@ class SQLContext private[sql](
    * @since 1.3.0
    */
   def clearCache(): Unit = {
-    sparkSession.clearCache()
+    sparkSession.catalog.clearCache()
   }
 
   // scalastyle:off
@@ -507,7 +508,7 @@ class SQLContext private[sql](
    */
   @Experimental
   def createExternalTable(tableName: String, path: String): DataFrame = {
-    sparkSession.createExternalTable(tableName, path)
+    sparkSession.catalog.createExternalTable(tableName, path)
   }
 
   /**
@@ -523,7 +524,7 @@ class SQLContext private[sql](
       tableName: String,
       path: String,
       source: String): DataFrame = {
-    sparkSession.createExternalTable(tableName, path, source)
+    sparkSession.catalog.createExternalTable(tableName, path, source)
   }
 
   /**
@@ -539,7 +540,7 @@ class SQLContext private[sql](
       tableName: String,
       source: String,
       options: java.util.Map[String, String]): DataFrame = {
-    sparkSession.createExternalTable(tableName, source, options)
+    sparkSession.catalog.createExternalTable(tableName, source, options)
   }
 
   /**
@@ -556,7 +557,7 @@ class SQLContext private[sql](
       tableName: String,
       source: String,
       options: Map[String, String]): DataFrame = {
-    sparkSession.createExternalTable(tableName, source, options)
+    sparkSession.catalog.createExternalTable(tableName, source, options)
   }
 
   /**
@@ -573,7 +574,7 @@ class SQLContext private[sql](
       source: String,
       schema: StructType,
       options: java.util.Map[String, String]): DataFrame = {
-    sparkSession.createExternalTable(tableName, source, schema, options)
+    sparkSession.catalog.createExternalTable(tableName, source, schema, options)
   }
 
   /**
@@ -591,7 +592,7 @@ class SQLContext private[sql](
       source: String,
       schema: StructType,
       options: Map[String, String]): DataFrame = {
-    sparkSession.createExternalTable(tableName, source, schema, options)
+    sparkSession.catalog.createExternalTable(tableName, source, schema, options)
   }
 
   /**
@@ -611,7 +612,7 @@ class SQLContext private[sql](
    * @since 1.3.0
    */
   def dropTempTable(tableName: String): Unit = {
-    sparkSession.dropTempTable(tableName)
+    sparkSession.catalog.dropTempTable(tableName)
   }
 
   /**
@@ -700,7 +701,7 @@ class SQLContext private[sql](
    * @since 1.3.0
    */
   def tables(): DataFrame = {
-    sparkSession.tables()
+    Dataset.ofRows(sparkSession, ShowTablesCommand(None, None))
   }
 
   /**
@@ -712,7 +713,7 @@ class SQLContext private[sql](
    * @since 1.3.0
    */
   def tables(databaseName: String): DataFrame = {
-    sparkSession.tables(databaseName)
+    Dataset.ofRows(sparkSession, ShowTablesCommand(Some(databaseName), None))
   }
 
   /**
@@ -730,7 +731,7 @@ class SQLContext private[sql](
    * @since 1.3.0
    */
   def tableNames(): Array[String] = {
-    sparkSession.tableNames()
+    sparkSession.catalog.listTables().collect().map(_.name)
   }
 
   /**
@@ -740,7 +741,7 @@ class SQLContext private[sql](
    * @since 1.3.0
    */
   def tableNames(databaseName: String): Array[String] = {
-    sparkSession.tableNames(databaseName)
+    sparkSession.catalog.listTables(databaseName).collect().map(_.name)
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index a0f0bd3f59..6477f42680 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -31,16 +31,16 @@ import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.internal.config.{CATALOG_IMPLEMENTATION, ConfigEntry}
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalog.Catalog
 import org.apache.spark.sql.catalyst._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.encoders._
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Range}
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.command.ShowTablesCommand
-import org.apache.spark.sql.execution.datasources.{CreateTableUsing, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.ui.SQLListener
-import org.apache.spark.sql.internal.{RuntimeConfigImpl, SessionState, SharedState}
+import org.apache.spark.sql.internal.{CatalogImpl, RuntimeConfigImpl, SessionState, SharedState}
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types.{DataType, LongType, StructType}
 import org.apache.spark.sql.util.ExecutionListenerManager
@@ -191,10 +191,6 @@ class SparkSession private(
    |  Methods for accessing or mutating configurations  |
    * -------------------------------------------------- */
 
-  @transient private lazy val _conf: RuntimeConfig = {
-    new RuntimeConfigImpl(sessionState.conf, sessionState.hadoopConf)
-  }
-
   /**
    * Runtime configuration interface for Spark.
    *
@@ -205,7 +201,9 @@ class SparkSession private(
    * @group config
    * @since 2.0.0
    */
-  def conf: RuntimeConfig = _conf
+  @transient lazy val conf: RuntimeConfig = {
+    new RuntimeConfigImpl(sessionState.conf, sessionState.hadoopConf)
+  }
 
   /**
    * Set Spark SQL configuration properties.
@@ -274,61 +272,6 @@ class SparkSession private(
   }
 
 
-  /* ------------------------------------- *
-   |  Methods related to cache management  |
-   * ------------------------------------- */
-
-  /**
-   * Returns true if the table is currently cached in-memory.
-   *
-   * @group cachemgmt
-   * @since 2.0.0
-   */
-  def isCached(tableName: String): Boolean = {
-    cacheManager.lookupCachedData(table(tableName)).nonEmpty
-  }
-
-  /**
-   * Caches the specified table in-memory.
-   *
-   * @group cachemgmt
-   * @since 2.0.0
-   */
-  def cacheTable(tableName: String): Unit = {
-    cacheManager.cacheQuery(table(tableName), Some(tableName))
-  }
-
-  /**
-   * Removes the specified table from the in-memory cache.
-   *
-   * @group cachemgmt
-   * @since 2.0.0
-   */
-  def uncacheTable(tableName: String): Unit = {
-    cacheManager.uncacheQuery(table(tableName))
-  }
-
-  /**
-   * Removes all cached tables from the in-memory cache.
-   *
-   * @group cachemgmt
-   * @since 2.0.0
-   */
-  def clearCache(): Unit = {
-    cacheManager.clearCache()
-  }
-
-  /**
-   * Returns true if the [[Dataset]] is currently cached in-memory.
-   *
-   * @group cachemgmt
-   * @since 2.0.0
-   */
-  protected[sql] def isCached(qName: Dataset[_]): Boolean = {
-    cacheManager.lookupCachedData(qName).nonEmpty
-  }
-
-
   /* --------------------------------- *
    |  Methods for creating DataFrames  |
    * --------------------------------- */
@@ -605,139 +548,18 @@ class SparkSession private(
   }
 
 
-  /* --------------------------- *
-   |  Methods related to tables  |
-   * --------------------------- */
-
-  /**
-   * :: Experimental ::
-   * Creates an external table from the given path and returns the corresponding DataFrame.
-   * It will use the default data source configured by spark.sql.sources.default.
-   *
-   * @group ddl_ops
-   * @since 2.0.0
-   */
-  @Experimental
-  def createExternalTable(tableName: String, path: String): DataFrame = {
-    val dataSourceName = sessionState.conf.defaultDataSourceName
-    createExternalTable(tableName, path, dataSourceName)
-  }
-
-  /**
-   * :: Experimental ::
-   * Creates an external table from the given path based on a data source
-   * and returns the corresponding DataFrame.
-   *
-   * @group ddl_ops
-   * @since 2.0.0
-   */
-  @Experimental
-  def createExternalTable(tableName: String, path: String, source: String): DataFrame = {
-    createExternalTable(tableName, source, Map("path" -> path))
-  }
-
-  /**
-   * :: Experimental ::
-   * Creates an external table from the given path based on a data source and a set of options.
-   * Then, returns the corresponding DataFrame.
-   *
-   * @group ddl_ops
-   * @since 2.0.0
-   */
-  @Experimental
-  def createExternalTable(
-      tableName: String,
-      source: String,
-      options: java.util.Map[String, String]): DataFrame = {
-    createExternalTable(tableName, source, options.asScala.toMap)
-  }
-
-  /**
-   * :: Experimental ::
-   * (Scala-specific)
-   * Creates an external table from the given path based on a data source and a set of options.
-   * Then, returns the corresponding DataFrame.
-   *
-   * @group ddl_ops
-   * @since 2.0.0
-   */
-  @Experimental
-  def createExternalTable(
-      tableName: String,
-      source: String,
-      options: Map[String, String]): DataFrame = {
-    val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName)
-    val cmd =
-      CreateTableUsing(
-        tableIdent,
-        userSpecifiedSchema = None,
-        source,
-        temporary = false,
-        options,
-        allowExisting = false,
-        managedIfNoPath = false)
-    executePlan(cmd).toRdd
-    table(tableIdent)
-  }
-
-  /**
-   * :: Experimental ::
-   * Create an external table from the given path based on a data source, a schema and
-   * a set of options. Then, returns the corresponding DataFrame.
-   *
-   * @group ddl_ops
-   * @since 2.0.0
-   */
-  @Experimental
-  def createExternalTable(
-      tableName: String,
-      source: String,
-      schema: StructType,
-      options: java.util.Map[String, String]): DataFrame = {
-    createExternalTable(tableName, source, schema, options.asScala.toMap)
-  }
+  /* ------------------------ *
+   |  Catalog-related methods |
+   * ----------------- ------ */
 
   /**
-   * :: Experimental ::
-   * (Scala-specific)
-   * Create an external table from the given path based on a data source, a schema and
-   * a set of options. Then, returns the corresponding DataFrame.
+   * Interface through which the user may create, drop, alter or query underlying
+   * databases, tables, functions etc.
    *
    * @group ddl_ops
    * @since 2.0.0
    */
-  @Experimental
-  def createExternalTable(
-      tableName: String,
-      source: String,
-      schema: StructType,
-      options: Map[String, String]): DataFrame = {
-    val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName)
-    val cmd =
-      CreateTableUsing(
-        tableIdent,
-        userSpecifiedSchema = Some(schema),
-        source,
-        temporary = false,
-        options,
-        allowExisting = false,
-        managedIfNoPath = false)
-    executePlan(cmd).toRdd
-    table(tableIdent)
-  }
-
-  /**
-   * Drops the temporary table with the given table name in the catalog.
-   * If the table has been cached/persisted before, it's also unpersisted.
-   *
-   * @param tableName the name of the table to be unregistered.
-   * @group ddl_ops
-   * @since 2.0.0
-   */
-  def dropTempTable(tableName: String): Unit = {
-    cacheManager.tryUncacheQuery(table(tableName))
-    sessionState.catalog.dropTable(TableIdentifier(tableName), ignoreIfNotExists = true)
-  }
+  @transient lazy val catalog: Catalog = new CatalogImpl(self)
 
   /**
    * Returns the specified table as a [[DataFrame]].
@@ -749,54 +571,10 @@ class SparkSession private(
     table(sessionState.sqlParser.parseTableIdentifier(tableName))
   }
 
-  private def table(tableIdent: TableIdentifier): DataFrame = {
+  protected[sql] def table(tableIdent: TableIdentifier): DataFrame = {
     Dataset.ofRows(self, sessionState.catalog.lookupRelation(tableIdent))
   }
 
-  /**
-   * Returns a [[DataFrame]] containing names of existing tables in the current database.
-   * The returned DataFrame has two columns, tableName and isTemporary (a Boolean
-   * indicating if a table is a temporary one or not).
-   *
-   * @group ddl_ops
-   * @since 2.0.0
-   */
-  def tables(): DataFrame = {
-    Dataset.ofRows(self, ShowTablesCommand(None, None))
-  }
-
-  /**
-   * Returns a [[DataFrame]] containing names of existing tables in the given database.
-   * The returned DataFrame has two columns, tableName and isTemporary (a Boolean
-   * indicating if a table is a temporary one or not).
-   *
-   * @group ddl_ops
-   * @since 2.0.0
-   */
-  def tables(databaseName: String): DataFrame = {
-    Dataset.ofRows(self, ShowTablesCommand(Some(databaseName), None))
-  }
-
-  /**
-   * Returns the names of tables in the current database as an array.
-   *
-   * @group ddl_ops
-   * @since 2.0.0
-   */
-  def tableNames(): Array[String] = {
-    tableNames(sessionState.catalog.getCurrentDatabase)
-  }
-
-  /**
-   * Returns the names of tables in the given database as an array.
-   *
-   * @group ddl_ops
-   * @since 2.0.0
-   */
-  def tableNames(databaseName: String): Array[String] = {
-    sessionState.catalog.listTables(databaseName).map(_.table).toArray
-  }
-
   /**
    * Registers the given [[DataFrame]] as a temporary table in the catalog.
    * Temporary tables exist only during the lifetime of this instance of [[SparkSession]].
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
new file mode 100644
index 0000000000..868cc3a726
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset}
+import org.apache.spark.sql.types.StructType
+
+
+/**
+ * Catalog interface for Spark. To access this, use `SparkSession.catalog`.
+ */
+abstract class Catalog {
+
+  /**
+   * Returns the current default database in this session.
+   *
+   * @since 2.0.0
+   */
+  def currentDatabase: String
+
+  /**
+   * Sets the current default database in this session.
+   *
+   * @since 2.0.0
+   */
+  def setCurrentDatabase(dbName: String): Unit
+
+  /**
+   * Returns a list of databases available across all sessions.
+   *
+   * @since 2.0.0
+   */
+  def listDatabases(): Dataset[Database]
+
+  /**
+   * Returns a list of tables in the current database.
+   * This includes all temporary tables.
+   *
+   * @since 2.0.0
+   */
+  def listTables(): Dataset[Table]
+
+  /**
+   * Returns a list of tables in the specified database.
+   * This includes all temporary tables.
+   *
+   * @since 2.0.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  def listTables(dbName: String): Dataset[Table]
+
+  /**
+   * Returns a list of functions registered in the current database.
+   * This includes all temporary functions
+   *
+   * @since 2.0.0
+   */
+  def listFunctions(): Dataset[Function]
+
+  /**
+   * Returns a list of functions registered in the specified database.
+   * This includes all temporary functions
+   *
+   * @since 2.0.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  def listFunctions(dbName: String): Dataset[Function]
+
+  /**
+   * Returns a list of columns for the given table in the current database.
+   *
+   * @since 2.0.0
+   */
+  @throws[AnalysisException]("table does not exist")
+  def listColumns(tableName: String): Dataset[Column]
+
+  /**
+   * Returns a list of columns for the given table in the specified database.
+   *
+   * @since 2.0.0
+   */
+  @throws[AnalysisException]("database or table does not exist")
+  def listColumns(dbName: String, tableName: String): Dataset[Column]
+
+  /**
+   * :: Experimental ::
+   * Creates an external table from the given path and returns the corresponding DataFrame.
+   * It will use the default data source configured by spark.sql.sources.default.
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def createExternalTable(tableName: String, path: String): DataFrame
+
+  /**
+   * :: Experimental ::
+   * Creates an external table from the given path based on a data source
+   * and returns the corresponding DataFrame.
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def createExternalTable(tableName: String, path: String, source: String): DataFrame
+
+  /**
+   * :: Experimental ::
+   * Creates an external table from the given path based on a data source and a set of options.
+   * Then, returns the corresponding DataFrame.
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def createExternalTable(
+      tableName: String,
+      source: String,
+      options: java.util.Map[String, String]): DataFrame
+
+  /**
+   * :: Experimental ::
+   * (Scala-specific)
+   * Creates an external table from the given path based on a data source and a set of options.
+   * Then, returns the corresponding DataFrame.
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def createExternalTable(
+      tableName: String,
+      source: String,
+      options: Map[String, String]): DataFrame
+
+  /**
+   * :: Experimental ::
+   * Create an external table from the given path based on a data source, a schema and
+   * a set of options. Then, returns the corresponding DataFrame.
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def createExternalTable(
+      tableName: String,
+      source: String,
+      schema: StructType,
+      options: java.util.Map[String, String]): DataFrame
+
+  /**
+   * :: Experimental ::
+   * (Scala-specific)
+   * Create an external table from the given path based on a data source, a schema and
+   * a set of options. Then, returns the corresponding DataFrame.
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def createExternalTable(
+      tableName: String,
+      source: String,
+      schema: StructType,
+      options: Map[String, String]): DataFrame
+
+  /**
+   * Drops the temporary table with the given table name in the catalog.
+   * If the table has been cached/persisted before, it's also unpersisted.
+   *
+   * @param tableName the name of the table to be unregistered.
+   * @since 2.0.0
+   */
+  def dropTempTable(tableName: String): Unit
+
+  /**
+   * Returns true if the table is currently cached in-memory.
+   *
+   * @since 2.0.0
+   */
+  def isCached(tableName: String): Boolean
+
+  /**
+   * Caches the specified table in-memory.
+   *
+   * @since 2.0.0
+   */
+  def cacheTable(tableName: String): Unit
+
+  /**
+   * Removes the specified table from the in-memory cache.
+   *
+   * @since 2.0.0
+   */
+  def uncacheTable(tableName: String): Unit
+
+  /**
+   * Removes all cached tables from the in-memory cache.
+   *
+   * @since 2.0.0
+   */
+  def clearCache(): Unit
+
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/interface.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/interface.scala
new file mode 100644
index 0000000000..d5de6cd484
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/interface.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog
+
+import javax.annotation.Nullable
+
+import org.apache.spark.sql.catalyst.DefinedByConstructorParams
+
+
+// Note: all classes here are expected to be wrapped in Datasets and so must extend
+// DefinedByConstructorParams for the catalog to be able to create encoders for them.
+
+class Database(
+    val name: String,
+    @Nullable val description: String,
+    val locationUri: String)
+  extends DefinedByConstructorParams {
+
+  override def toString: String = {
+    "Database[" +
+      s"name='$name', " +
+      Option(description).map { d => s"description='$d', " }.getOrElse("") +
+      s"path='$locationUri']"
+  }
+
+}
+
+
+class Table(
+    val name: String,
+    @Nullable val database: String,
+    @Nullable val description: String,
+    val tableType: String,
+    val isTemporary: Boolean)
+  extends DefinedByConstructorParams {
+
+  override def toString: String = {
+    "Table[" +
+      s"name='$name', " +
+      Option(database).map { d => s"database='$d', " }.getOrElse("") +
+      Option(description).map { d => s"description='$d', " }.getOrElse("") +
+      s"tableType='$tableType', " +
+      s"isTemporary='$isTemporary']"
+  }
+
+}
+
+
+class Column(
+    val name: String,
+    @Nullable val description: String,
+    val dataType: String,
+    val nullable: Boolean,
+    val isPartition: Boolean,
+    val isBucket: Boolean)
+  extends DefinedByConstructorParams {
+
+  override def toString: String = {
+    "Column[" +
+      s"name='$name', " +
+      Option(description).map { d => s"description='$d', " }.getOrElse("") +
+      s"dataType='$dataType', " +
+      s"nullable='$nullable', " +
+      s"isPartition='$isPartition', " +
+      s"isBucket='$isBucket']"
+  }
+
+}
+
+
+class Function(
+    val name: String,
+    @Nullable val description: String,
+    val className: String,
+    val isTemporary: Boolean)
+  extends DefinedByConstructorParams {
+
+  override def toString: String = {
+    "Function[" +
+      s"name='$name', " +
+      Option(description).map { d => s"description='$d', " }.getOrElse("") +
+      s"className='$className', " +
+      s"isTemporary='$isTemporary']"
+  }
+
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index ebc60edcba..e04e130eb6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -835,9 +835,9 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       throw new ParseException("Operation not allowed: CREATE TABLE ... CLUSTERED BY ...", ctx)
     }
     val tableType = if (external) {
-      CatalogTableType.EXTERNAL_TABLE
+      CatalogTableType.EXTERNAL
     } else {
-      CatalogTableType.MANAGED_TABLE
+      CatalogTableType.MANAGED
     }
     val comment = Option(ctx.STRING).map(string)
     val partitionCols = Option(ctx.partitionColumns).toSeq.flatMap(visitCatalogColumns)
@@ -1083,7 +1083,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     val sql = Option(source(query))
     val tableDesc = CatalogTable(
       identifier = visitTableIdentifier(name),
-      tableType = CatalogTableType.VIRTUAL_VIEW,
+      tableType = CatalogTableType.VIEW,
       schema = schema,
       storage = EmptyStorageFormat,
       properties = properties,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
index c283bd61d4..ec3fadab50 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
@@ -32,7 +32,7 @@ case class CacheTableCommand(
     plan.foreach { logicalPlan =>
       sparkSession.registerDataFrameAsTable(Dataset.ofRows(sparkSession, logicalPlan), tableName)
     }
-    sparkSession.cacheTable(tableName)
+    sparkSession.catalog.cacheTable(tableName)
 
     if (!isLazy) {
       // Performs eager caching
@@ -62,7 +62,7 @@ case class UncacheTableCommand(tableName: String) extends RunnableCommand {
 case object ClearCacheCommand extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    sparkSession.clearCache()
+    sparkSession.catalog.clearCache()
     Seq.empty[Row]
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index 6b1d413845..855e7e2fe3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -183,8 +183,8 @@ case class ShowPartitionsCommand(
        * 2. If it is a datasource table.
        * 3. If it is a view or index table.
        */
-      if (tab.tableType == CatalogTableType.VIRTUAL_VIEW ||
-        tab.tableType == CatalogTableType.INDEX_TABLE) {
+      if (tab.tableType == CatalogTableType.VIEW ||
+        tab.tableType == CatalogTableType.INDEX) {
         throw new AnalysisException("SHOW PARTITIONS is not allowed on a view or index table: " +
           s"${tab.qualifiedName}")
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 31900b4993..f670f63472 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -323,10 +323,10 @@ object CreateDataSourceTableUtils extends Logging {
 
     val tableType = if (isExternal) {
       tableProperties.put("EXTERNAL", "TRUE")
-      CatalogTableType.EXTERNAL_TABLE
+      CatalogTableType.EXTERNAL
     } else {
       tableProperties.put("EXTERNAL", "FALSE")
-      CatalogTableType.MANAGED_TABLE
+      CatalogTableType.MANAGED
     }
 
     val maybeSerDe = HiveSerDe.sourceToSerDe(provider, sparkSession.sessionState.conf)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index ecde3320b1..12167ee307 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -204,10 +204,10 @@ case class DropTable(
       // If the command DROP VIEW is to drop a table or DROP TABLE is to drop a view
       // issue an exception.
       catalog.getTableMetadataOption(tableName).map(_.tableType match {
-        case CatalogTableType.VIRTUAL_VIEW if !isView =>
+        case CatalogTableType.VIEW if !isView =>
           throw new AnalysisException(
             "Cannot drop a view with DROP TABLE. Please use DROP VIEW instead")
-        case o if o != CatalogTableType.VIRTUAL_VIEW && isView =>
+        case o if o != CatalogTableType.VIEW && isView =>
           throw new AnalysisException(
             s"Cannot drop a table with DROP VIEW. Please use DROP TABLE instead")
         case _ =>
@@ -527,10 +527,10 @@ private[sql] object DDLUtils {
       tableIdentifier: TableIdentifier,
       isView: Boolean): Unit = {
     catalog.getTableMetadataOption(tableIdentifier).map(_.tableType match {
-      case CatalogTableType.VIRTUAL_VIEW if !isView =>
+      case CatalogTableType.VIEW if !isView =>
         throw new AnalysisException(
           "Cannot alter a view with ALTER TABLE. Please use ALTER VIEW instead")
-      case o if o != CatalogTableType.VIRTUAL_VIEW && isView =>
+      case o if o != CatalogTableType.VIEW && isView =>
         throw new AnalysisException(
           s"Cannot alter a table with ALTER VIEW. Please use ALTER TABLE instead")
       case _ =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 700a704941..8d9feec9b8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -73,7 +73,7 @@ case class CreateTableLike(
 
     val tableToCreate = catalog.getTableMetadata(sourceTable).copy(
       identifier = targetTable,
-      tableType = CatalogTableType.MANAGED_TABLE,
+      tableType = CatalogTableType.MANAGED,
       createTime = System.currentTimeMillis,
       lastAccessTime = -1).withNewStorage(locationUri = None)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index f42b56fdc3..1641780db8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -52,7 +52,7 @@ case class CreateViewCommand(
 
   override def output: Seq[Attribute] = Seq.empty[Attribute]
 
-  require(tableDesc.tableType == CatalogTableType.VIRTUAL_VIEW)
+  require(tableDesc.tableType == CatalogTableType.VIEW)
   require(tableDesc.viewText.isDefined)
 
   private val tableIdentifier = tableDesc.identifier
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
new file mode 100644
index 0000000000..976c9c53de
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.internal
+
+import scala.collection.JavaConverters._
+import scala.reflect.runtime.universe.TypeTag
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.catalog.{Catalog, Column, Database, Function, Table}
+import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, TableIdentifier}
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.execution.datasources.CreateTableUsing
+import org.apache.spark.sql.types.StructType
+
+
+/**
+ * Internal implementation of the user-facing [[Catalog]].
+ */
+class CatalogImpl(sparkSession: SparkSession) extends Catalog {
+
+  private def sessionCatalog: SessionCatalog = sparkSession.sessionState.catalog
+
+  private def requireDatabaseExists(dbName: String): Unit = {
+    if (!sessionCatalog.databaseExists(dbName)) {
+      throw new AnalysisException(s"Database '$dbName' does not exist.")
+    }
+  }
+
+  private def requireTableExists(dbName: String, tableName: String): Unit = {
+    if (!sessionCatalog.tableExists(TableIdentifier(tableName, Some(dbName)))) {
+      throw new AnalysisException(s"Table '$tableName' does not exist in database '$dbName'.")
+    }
+  }
+
+  private def makeDataset[T <: DefinedByConstructorParams: TypeTag](data: Seq[T]): Dataset[T] = {
+    val enc = ExpressionEncoder[T]()
+    val encoded = data.map(d => enc.toRow(d).copy())
+    val plan = new LocalRelation(enc.schema.toAttributes, encoded)
+    val queryExecution = sparkSession.executePlan(plan)
+    new Dataset[T](sparkSession, queryExecution, enc)
+  }
+
+  /**
+   * Returns the current default database in this session.
+   */
+  override def currentDatabase: String = sessionCatalog.getCurrentDatabase
+
+  /**
+   * Sets the current default database in this session.
+   */
+  @throws[AnalysisException]("database does not exist")
+  override def setCurrentDatabase(dbName: String): Unit = {
+    requireDatabaseExists(dbName)
+    sessionCatalog.setCurrentDatabase(dbName)
+  }
+
+  /**
+   * Returns a list of databases available across all sessions.
+   */
+  override def listDatabases(): Dataset[Database] = {
+    val databases = sessionCatalog.listDatabases().map { dbName =>
+      val metadata = sessionCatalog.getDatabaseMetadata(dbName)
+      new Database(
+        name = metadata.name,
+        description = metadata.description,
+        locationUri = metadata.locationUri)
+    }
+    makeDataset(databases)
+  }
+
+  /**
+   * Returns a list of tables in the current database.
+   * This includes all temporary tables.
+   */
+  override def listTables(): Dataset[Table] = {
+    listTables(currentDatabase)
+  }
+
+  /**
+   * Returns a list of tables in the specified database.
+   * This includes all temporary tables.
+   */
+  @throws[AnalysisException]("database does not exist")
+  override def listTables(dbName: String): Dataset[Table] = {
+    requireDatabaseExists(dbName)
+    val tables = sessionCatalog.listTables(dbName).map { tableIdent =>
+      val isTemp = tableIdent.database.isEmpty
+      val metadata = if (isTemp) None else Some(sessionCatalog.getTableMetadata(tableIdent))
+      new Table(
+        name = tableIdent.identifier,
+        database = metadata.flatMap(_.identifier.database).orNull,
+        description = metadata.flatMap(_.comment).orNull,
+        tableType = metadata.map(_.tableType.name).getOrElse("TEMPORARY"),
+        isTemporary = isTemp)
+    }
+    makeDataset(tables)
+  }
+
+  /**
+   * Returns a list of functions registered in the current database.
+   * This includes all temporary functions
+   */
+  override def listFunctions(): Dataset[Function] = {
+    listFunctions(currentDatabase)
+  }
+
+  /**
+   * Returns a list of functions registered in the specified database.
+   * This includes all temporary functions
+   */
+  @throws[AnalysisException]("database does not exist")
+  override def listFunctions(dbName: String): Dataset[Function] = {
+    requireDatabaseExists(dbName)
+    val functions = sessionCatalog.listFunctions(dbName).map { funcIdent =>
+      val metadata = sessionCatalog.lookupFunctionInfo(funcIdent)
+      new Function(
+        name = funcIdent.identifier,
+        description = null, // for now, this is always undefined
+        className = metadata.getClassName,
+        isTemporary = funcIdent.database.isEmpty)
+    }
+    makeDataset(functions)
+  }
+
+  /**
+   * Returns a list of columns for the given table in the current database.
+   */
+  @throws[AnalysisException]("table does not exist")
+  override def listColumns(tableName: String): Dataset[Column] = {
+    listColumns(currentDatabase, tableName)
+  }
+
+  /**
+   * Returns a list of columns for the given table in the specified database.
+   */
+  @throws[AnalysisException]("database or table does not exist")
+  override def listColumns(dbName: String, tableName: String): Dataset[Column] = {
+    requireTableExists(dbName, tableName)
+    val tableMetadata = sessionCatalog.getTableMetadata(TableIdentifier(tableName, Some(dbName)))
+    val partitionColumnNames = tableMetadata.partitionColumnNames.toSet
+    val bucketColumnNames = tableMetadata.bucketColumnNames.toSet
+    val columns = tableMetadata.schema.map { c =>
+      new Column(
+        name = c.name,
+        description = c.comment.orNull,
+        dataType = c.dataType,
+        nullable = c.nullable,
+        isPartition = partitionColumnNames.contains(c.name),
+        isBucket = bucketColumnNames.contains(c.name))
+    }
+    makeDataset(columns)
+  }
+
+  /**
+   * :: Experimental ::
+   * Creates an external table from the given path and returns the corresponding DataFrame.
+   * It will use the default data source configured by spark.sql.sources.default.
+   *
+   * @group ddl_ops
+   * @since 2.0.0
+   */
+  @Experimental
+  override def createExternalTable(tableName: String, path: String): DataFrame = {
+    val dataSourceName = sparkSession.sessionState.conf.defaultDataSourceName
+    createExternalTable(tableName, path, dataSourceName)
+  }
+
+  /**
+   * :: Experimental ::
+   * Creates an external table from the given path based on a data source
+   * and returns the corresponding DataFrame.
+   *
+   * @group ddl_ops
+   * @since 2.0.0
+   */
+  @Experimental
+  override def createExternalTable(tableName: String, path: String, source: String): DataFrame = {
+    createExternalTable(tableName, source, Map("path" -> path))
+  }
+
+  /**
+   * :: Experimental ::
+   * Creates an external table from the given path based on a data source and a set of options.
+   * Then, returns the corresponding DataFrame.
+   *
+   * @group ddl_ops
+   * @since 2.0.0
+   */
+  @Experimental
+  override def createExternalTable(
+      tableName: String,
+      source: String,
+      options: java.util.Map[String, String]): DataFrame = {
+    createExternalTable(tableName, source, options.asScala.toMap)
+  }
+
+  /**
+   * :: Experimental ::
+   * (Scala-specific)
+   * Creates an external table from the given path based on a data source and a set of options.
+   * Then, returns the corresponding DataFrame.
+   *
+   * @group ddl_ops
+   * @since 2.0.0
+   */
+  @Experimental
+  override def createExternalTable(
+      tableName: String,
+      source: String,
+      options: Map[String, String]): DataFrame = {
+    val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
+    val cmd =
+      CreateTableUsing(
+        tableIdent,
+        userSpecifiedSchema = None,
+        source,
+        temporary = false,
+        options,
+        allowExisting = false,
+        managedIfNoPath = false)
+    sparkSession.executePlan(cmd).toRdd
+    sparkSession.table(tableIdent)
+  }
+
+  /**
+   * :: Experimental ::
+   * Create an external table from the given path based on a data source, a schema and
+   * a set of options. Then, returns the corresponding DataFrame.
+   *
+   * @group ddl_ops
+   * @since 2.0.0
+   */
+  @Experimental
+  override def createExternalTable(
+      tableName: String,
+      source: String,
+      schema: StructType,
+      options: java.util.Map[String, String]): DataFrame = {
+    createExternalTable(tableName, source, schema, options.asScala.toMap)
+  }
+
+  /**
+   * :: Experimental ::
+   * (Scala-specific)
+   * Create an external table from the given path based on a data source, a schema and
+   * a set of options. Then, returns the corresponding DataFrame.
+   *
+   * @group ddl_ops
+   * @since 2.0.0
+   */
+  @Experimental
+  override def createExternalTable(
+      tableName: String,
+      source: String,
+      schema: StructType,
+      options: Map[String, String]): DataFrame = {
+    val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
+    val cmd =
+      CreateTableUsing(
+        tableIdent,
+        userSpecifiedSchema = Some(schema),
+        source,
+        temporary = false,
+        options,
+        allowExisting = false,
+        managedIfNoPath = false)
+    sparkSession.executePlan(cmd).toRdd
+    sparkSession.table(tableIdent)
+  }
+
+  /**
+   * Drops the temporary table with the given table name in the catalog.
+   * If the table has been cached/persisted before, it's also unpersisted.
+   *
+   * @param tableName the name of the table to be unregistered.
+   * @group ddl_ops
+   * @since 2.0.0
+   */
+  override def dropTempTable(tableName: String): Unit = {
+    sparkSession.cacheManager.tryUncacheQuery(sparkSession.table(tableName))
+    sessionCatalog.dropTable(TableIdentifier(tableName), ignoreIfNotExists = true)
+  }
+
+  /**
+   * Returns true if the table is currently cached in-memory.
+   *
+   * @group cachemgmt
+   * @since 2.0.0
+   */
+  override def isCached(tableName: String): Boolean = {
+    sparkSession.cacheManager.lookupCachedData(sparkSession.table(tableName)).nonEmpty
+  }
+
+  /**
+   * Caches the specified table in-memory.
+   *
+   * @group cachemgmt
+   * @since 2.0.0
+   */
+  override def cacheTable(tableName: String): Unit = {
+    sparkSession.cacheManager.cacheQuery(sparkSession.table(tableName), Some(tableName))
+  }
+
+  /**
+   * Removes the specified table from the in-memory cache.
+   *
+   * @group cachemgmt
+   * @since 2.0.0
+   */
+  override def uncacheTable(tableName: String): Unit = {
+    sparkSession.cacheManager.uncacheQuery(sparkSession.table(tableName))
+  }
+
+  /**
+   * Removes all cached tables from the in-memory cache.
+   *
+   * @group cachemgmt
+   * @since 2.0.0
+   */
+  override def clearCache(): Unit = {
+    sparkSession.cacheManager.clearCache()
+  }
+
+  /**
+   * Returns true if the [[Dataset]] is currently cached in-memory.
+   *
+   * @group cachemgmt
+   * @since 2.0.0
+   */
+  protected[sql] def isCached(qName: Dataset[_]): Boolean = {
+    sparkSession.cacheManager.lookupCachedData(qName).nonEmpty
+  }
+
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index e601ff1e35..58330c49c7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -69,7 +69,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
   private def createTable(catalog: SessionCatalog, name: TableIdentifier): Unit = {
     catalog.createTable(CatalogTable(
       identifier = name,
-      tableType = CatalogTableType.EXTERNAL_TABLE,
+      tableType = CatalogTableType.EXTERNAL,
       storage = CatalogStorageFormat(None, None, None, None, Map()),
       schema = Seq()), ignoreIfExists = false)
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
new file mode 100644
index 0000000000..986d8f514f
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.internal
+
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalog.{Column, Database, Function, Table}
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
+import org.apache.spark.sql.catalyst.plans.logical.Range
+import org.apache.spark.sql.test.SharedSQLContext
+
+
+/**
+ * Tests for the user-facing [[org.apache.spark.sql.catalog.Catalog]].
+ */
+class CatalogSuite
+  extends SparkFunSuite
+  with BeforeAndAfterEach
+  with SharedSQLContext {
+
+  private def sparkSession: SparkSession = sqlContext.sparkSession
+  private def sessionCatalog: SessionCatalog = sparkSession.sessionState.catalog
+
+  private val utils = new CatalogTestUtils {
+    override val tableInputFormat: String = "com.fruit.eyephone.CameraInputFormat"
+    override val tableOutputFormat: String = "com.fruit.eyephone.CameraOutputFormat"
+    override def newEmptyCatalog(): ExternalCatalog = sparkSession.sharedState.externalCatalog
+  }
+
+  private def createDatabase(name: String): Unit = {
+    sessionCatalog.createDatabase(utils.newDb(name), ignoreIfExists = false)
+  }
+
+  private def dropDatabase(name: String): Unit = {
+    sessionCatalog.dropDatabase(name, ignoreIfNotExists = false, cascade = true)
+  }
+
+  private def createTable(name: String, db: Option[String] = None): Unit = {
+    sessionCatalog.createTable(utils.newTable(name, db), ignoreIfExists = false)
+  }
+
+  private def createTempTable(name: String): Unit = {
+    sessionCatalog.createTempTable(name, Range(1, 2, 3, 4, Seq()), overrideIfExists = true)
+  }
+
+  private def dropTable(name: String, db: Option[String] = None): Unit = {
+    sessionCatalog.dropTable(TableIdentifier(name, db), ignoreIfNotExists = false)
+  }
+
+  private def createFunction(name: String, db: Option[String] = None): Unit = {
+    sessionCatalog.createFunction(utils.newFunc(name, db), ignoreIfExists = false)
+  }
+
+  private def createTempFunction(name: String): Unit = {
+    val info = new ExpressionInfo("className", name)
+    val tempFunc = (e: Seq[Expression]) => e.head
+    sessionCatalog.createTempFunction(name, info, tempFunc, ignoreIfExists = false)
+  }
+
+  private def dropFunction(name: String, db: Option[String] = None): Unit = {
+    sessionCatalog.dropFunction(FunctionIdentifier(name, db), ignoreIfNotExists = false)
+  }
+
+  private def dropTempFunction(name: String): Unit = {
+    sessionCatalog.dropTempFunction(name, ignoreIfNotExists = false)
+  }
+
+  private def testListColumns(tableName: String, dbName: Option[String]): Unit = {
+    val tableMetadata = sessionCatalog.getTableMetadata(TableIdentifier(tableName, dbName))
+    val columns = dbName
+      .map { db => sparkSession.catalog.listColumns(db, tableName) }
+      .getOrElse { sparkSession.catalog.listColumns(tableName) }
+    assume(tableMetadata.schema.nonEmpty, "bad test")
+    assume(tableMetadata.partitionColumnNames.nonEmpty, "bad test")
+    assume(tableMetadata.bucketColumnNames.nonEmpty, "bad test")
+    assert(columns.collect().map(_.name).toSet == tableMetadata.schema.map(_.name).toSet)
+    columns.collect().foreach { col =>
+      assert(col.isPartition == tableMetadata.partitionColumnNames.contains(col.name))
+      assert(col.isBucket == tableMetadata.bucketColumnNames.contains(col.name))
+    }
+  }
+
+  override def afterEach(): Unit = {
+    try {
+      sessionCatalog.reset()
+    } finally {
+      super.afterEach()
+    }
+  }
+
+  test("current database") {
+    assert(sparkSession.catalog.currentDatabase == "default")
+    assert(sessionCatalog.getCurrentDatabase == "default")
+    createDatabase("my_db")
+    sparkSession.catalog.setCurrentDatabase("my_db")
+    assert(sparkSession.catalog.currentDatabase == "my_db")
+    assert(sessionCatalog.getCurrentDatabase == "my_db")
+    val e = intercept[AnalysisException] {
+      sparkSession.catalog.setCurrentDatabase("unknown_db")
+    }
+    assert(e.getMessage.contains("unknown_db"))
+  }
+
+  test("list databases") {
+    assert(sparkSession.catalog.listDatabases().collect().map(_.name).toSet == Set("default"))
+    createDatabase("my_db1")
+    createDatabase("my_db2")
+    assert(sparkSession.catalog.listDatabases().collect().map(_.name).toSet ==
+      Set("default", "my_db1", "my_db2"))
+    dropDatabase("my_db1")
+    assert(sparkSession.catalog.listDatabases().collect().map(_.name).toSet ==
+      Set("default", "my_db2"))
+  }
+
+  test("list tables") {
+    assert(sparkSession.catalog.listTables().collect().isEmpty)
+    createTable("my_table1")
+    createTable("my_table2")
+    createTempTable("my_temp_table")
+    assert(sparkSession.catalog.listTables().collect().map(_.name).toSet ==
+      Set("my_table1", "my_table2", "my_temp_table"))
+    dropTable("my_table1")
+    assert(sparkSession.catalog.listTables().collect().map(_.name).toSet ==
+      Set("my_table2", "my_temp_table"))
+    dropTable("my_temp_table")
+    assert(sparkSession.catalog.listTables().collect().map(_.name).toSet == Set("my_table2"))
+  }
+
+  test("list tables with database") {
+    assert(sparkSession.catalog.listTables("default").collect().isEmpty)
+    createDatabase("my_db1")
+    createDatabase("my_db2")
+    createTable("my_table1", Some("my_db1"))
+    createTable("my_table2", Some("my_db2"))
+    createTempTable("my_temp_table")
+    assert(sparkSession.catalog.listTables("default").collect().map(_.name).toSet ==
+      Set("my_temp_table"))
+    assert(sparkSession.catalog.listTables("my_db1").collect().map(_.name).toSet ==
+      Set("my_table1", "my_temp_table"))
+    assert(sparkSession.catalog.listTables("my_db2").collect().map(_.name).toSet ==
+      Set("my_table2", "my_temp_table"))
+    dropTable("my_table1", Some("my_db1"))
+    assert(sparkSession.catalog.listTables("my_db1").collect().map(_.name).toSet ==
+      Set("my_temp_table"))
+    assert(sparkSession.catalog.listTables("my_db2").collect().map(_.name).toSet ==
+      Set("my_table2", "my_temp_table"))
+    dropTable("my_temp_table")
+    assert(sparkSession.catalog.listTables("default").collect().map(_.name).isEmpty)
+    assert(sparkSession.catalog.listTables("my_db1").collect().map(_.name).isEmpty)
+    assert(sparkSession.catalog.listTables("my_db2").collect().map(_.name).toSet ==
+      Set("my_table2"))
+    val e = intercept[AnalysisException] {
+      sparkSession.catalog.listTables("unknown_db")
+    }
+    assert(e.getMessage.contains("unknown_db"))
+  }
+
+  test("list functions") {
+    assert(Set("+", "current_database", "window").subsetOf(
+      sparkSession.catalog.listFunctions().collect().map(_.name).toSet))
+    createFunction("my_func1")
+    createFunction("my_func2")
+    createTempFunction("my_temp_func")
+    val funcNames1 = sparkSession.catalog.listFunctions().collect().map(_.name).toSet
+    assert(funcNames1.contains("my_func1"))
+    assert(funcNames1.contains("my_func2"))
+    assert(funcNames1.contains("my_temp_func"))
+    dropFunction("my_func1")
+    dropTempFunction("my_temp_func")
+    val funcNames2 = sparkSession.catalog.listFunctions().collect().map(_.name).toSet
+    assert(!funcNames2.contains("my_func1"))
+    assert(funcNames2.contains("my_func2"))
+    assert(!funcNames2.contains("my_temp_func"))
+  }
+
+  test("list functions with database") {
+    assert(Set("+", "current_database", "window").subsetOf(
+      sparkSession.catalog.listFunctions("default").collect().map(_.name).toSet))
+    createDatabase("my_db1")
+    createDatabase("my_db2")
+    createFunction("my_func1", Some("my_db1"))
+    createFunction("my_func2", Some("my_db2"))
+    createTempFunction("my_temp_func")
+    val funcNames1 = sparkSession.catalog.listFunctions("my_db1").collect().map(_.name).toSet
+    val funcNames2 = sparkSession.catalog.listFunctions("my_db2").collect().map(_.name).toSet
+    assert(funcNames1.contains("my_func1"))
+    assert(!funcNames1.contains("my_func2"))
+    assert(funcNames1.contains("my_temp_func"))
+    assert(!funcNames2.contains("my_func1"))
+    assert(funcNames2.contains("my_func2"))
+    assert(funcNames2.contains("my_temp_func"))
+    dropFunction("my_func1", Some("my_db1"))
+    dropTempFunction("my_temp_func")
+    val funcNames1b = sparkSession.catalog.listFunctions("my_db1").collect().map(_.name).toSet
+    val funcNames2b = sparkSession.catalog.listFunctions("my_db2").collect().map(_.name).toSet
+    assert(!funcNames1b.contains("my_func1"))
+    assert(!funcNames1b.contains("my_temp_func"))
+    assert(funcNames2b.contains("my_func2"))
+    assert(!funcNames2b.contains("my_temp_func"))
+    val e = intercept[AnalysisException] {
+      sparkSession.catalog.listFunctions("unknown_db")
+    }
+    assert(e.getMessage.contains("unknown_db"))
+  }
+
+  test("list columns") {
+    createTable("tab1")
+    testListColumns("tab1", dbName = None)
+  }
+
+  test("list columns in database") {
+    createDatabase("db1")
+    createTable("tab1", Some("db1"))
+    testListColumns("tab1", dbName = Some("db1"))
+  }
+
+  test("Database.toString") {
+    assert(new Database("cool_db", "cool_desc", "cool_path").toString ==
+      "Database[name='cool_db', description='cool_desc', path='cool_path']")
+    assert(new Database("cool_db", null, "cool_path").toString ==
+      "Database[name='cool_db', path='cool_path']")
+  }
+
+  test("Table.toString") {
+    assert(new Table("volley", "databasa", "one", "world", isTemporary = true).toString ==
+      "Table[name='volley', database='databasa', description='one', " +
+        "tableType='world', isTemporary='true']")
+    assert(new Table("volley", null, null, "world", isTemporary = true).toString ==
+      "Table[name='volley', tableType='world', isTemporary='true']")
+  }
+
+  test("Function.toString") {
+    assert(new Function("nama", "commenta", "classNameAh", isTemporary = true).toString ==
+      "Function[name='nama', description='commenta', className='classNameAh', isTemporary='true']")
+    assert(new Function("nama", null, "classNameAh", isTemporary = false).toString ==
+      "Function[name='nama', className='classNameAh', isTemporary='false']")
+  }
+
+  test("Column.toString") {
+    assert(new Column("namama", "descaca", "datatapa",
+      nullable = true, isPartition = false, isBucket = true).toString ==
+        "Column[name='namama', description='descaca', dataType='datatapa', " +
+          "nullable='true', isPartition='false', isBucket='true']")
+    assert(new Column("namama", null, "datatapa",
+      nullable = false, isPartition = true, isBucket = true).toString ==
+      "Column[name='namama', dataType='datatapa', " +
+        "nullable='false', isPartition='true', isBucket='true']")
+  }
+
+  // TODO: add tests for the rest of them
+
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 01b7cfbd2e..c4db4f307c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -173,7 +173,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
       // Then, if alias is specified, wrap the table with a Subquery using the alias.
       // Otherwise, wrap the table with a Subquery using the table name.
       alias.map(a => SubqueryAlias(a, qualifiedTable)).getOrElse(qualifiedTable)
-    } else if (table.tableType == CatalogTableType.VIRTUAL_VIEW) {
+    } else if (table.tableType == CatalogTableType.VIEW) {
       val viewText = table.viewText.getOrElse(sys.error("Invalid view without text."))
       alias match {
         // because hive use things like `_c0` to build the expanded text
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
index 367fcf13d2..5b580d0ef9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
@@ -77,10 +77,10 @@ private[hive] case class MetastoreRelation(
     catalogTable.properties.foreach { case (k, v) => tableParameters.put(k, v) }
 
     tTable.setTableType(catalogTable.tableType match {
-      case CatalogTableType.EXTERNAL_TABLE => HiveTableType.EXTERNAL_TABLE.toString
-      case CatalogTableType.MANAGED_TABLE => HiveTableType.MANAGED_TABLE.toString
-      case CatalogTableType.INDEX_TABLE => HiveTableType.INDEX_TABLE.toString
-      case CatalogTableType.VIRTUAL_VIEW => HiveTableType.VIRTUAL_VIEW.toString
+      case CatalogTableType.EXTERNAL => HiveTableType.EXTERNAL_TABLE.toString
+      case CatalogTableType.MANAGED => HiveTableType.MANAGED_TABLE.toString
+      case CatalogTableType.INDEX => HiveTableType.INDEX_TABLE.toString
+      case CatalogTableType.VIEW => HiveTableType.VIRTUAL_VIEW.toString
     })
 
     val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 6a7345f758..d651791f9c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -317,10 +317,10 @@ private[hive] class HiveClientImpl(
       CatalogTable(
         identifier = TableIdentifier(h.getTableName, Option(h.getDbName)),
         tableType = h.getTableType match {
-          case HiveTableType.EXTERNAL_TABLE => CatalogTableType.EXTERNAL_TABLE
-          case HiveTableType.MANAGED_TABLE => CatalogTableType.MANAGED_TABLE
-          case HiveTableType.INDEX_TABLE => CatalogTableType.INDEX_TABLE
-          case HiveTableType.VIRTUAL_VIEW => CatalogTableType.VIRTUAL_VIEW
+          case HiveTableType.EXTERNAL_TABLE => CatalogTableType.EXTERNAL
+          case HiveTableType.MANAGED_TABLE => CatalogTableType.MANAGED
+          case HiveTableType.INDEX_TABLE => CatalogTableType.INDEX
+          case HiveTableType.VIRTUAL_VIEW => CatalogTableType.VIEW
         },
         schema = schema,
         partitionColumnNames = partCols.map(_.name),
@@ -696,13 +696,13 @@ private[hive] class HiveClientImpl(
     // Otherwise, Hive metastore will change the table to a MANAGED_TABLE.
     // (metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java#L1095-L1105)
     hiveTable.setTableType(table.tableType match {
-      case CatalogTableType.EXTERNAL_TABLE =>
+      case CatalogTableType.EXTERNAL =>
         hiveTable.setProperty("EXTERNAL", "TRUE")
         HiveTableType.EXTERNAL_TABLE
-      case CatalogTableType.MANAGED_TABLE =>
+      case CatalogTableType.MANAGED =>
         HiveTableType.MANAGED_TABLE
-      case CatalogTableType.INDEX_TABLE => HiveTableType.INDEX_TABLE
-      case CatalogTableType.VIRTUAL_VIEW => HiveTableType.VIRTUAL_VIEW
+      case CatalogTableType.INDEX => HiveTableType.INDEX_TABLE
+      case CatalogTableType.VIEW => HiveTableType.VIRTUAL_VIEW
     })
     // Note: In Hive the schema and partition columns must be disjoint sets
     val (partCols, schema) = table.schema.map(toHiveColumn).partition { c =>
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index 97bd47a247..4ca5619603 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -166,7 +166,7 @@ class CachedTableSuite extends QueryTest with TestHiveSingleton {
     tempPath.delete()
     table("src").write.mode(SaveMode.Overwrite).parquet(tempPath.toString)
     sql("DROP TABLE IF EXISTS refreshTable")
-    createExternalTable("refreshTable", tempPath.toString, "parquet")
+    sparkSession.catalog.createExternalTable("refreshTable", tempPath.toString, "parquet")
     checkAnswer(
       table("refreshTable"),
       table("src").collect())
@@ -190,7 +190,7 @@ class CachedTableSuite extends QueryTest with TestHiveSingleton {
 
     // Drop the table and create it again.
     sql("DROP TABLE refreshTable")
-    createExternalTable("refreshTable", tempPath.toString, "parquet")
+    sparkSession.catalog.createExternalTable("refreshTable", tempPath.toString, "parquet")
     // It is not cached.
     assert(!isCached("refreshTable"), "refreshTable should not be cached.")
     // Refresh the table. REFRESH TABLE command should not make a uncached
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index cff1127afb..ec581b681a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -70,7 +70,7 @@ class HiveDDLCommandSuite extends PlanTest {
     assert(exists)
     assert(desc.identifier.database == Some("mydb"))
     assert(desc.identifier.table == "page_view")
-    assert(desc.tableType == CatalogTableType.EXTERNAL_TABLE)
+    assert(desc.tableType == CatalogTableType.EXTERNAL)
     assert(desc.storage.locationUri == Some("/user/external/page_view"))
     assert(desc.schema ==
       CatalogColumn("viewtime", "int") ::
@@ -120,7 +120,7 @@ class HiveDDLCommandSuite extends PlanTest {
     assert(exists)
     assert(desc.identifier.database == Some("mydb"))
     assert(desc.identifier.table == "page_view")
-    assert(desc.tableType == CatalogTableType.EXTERNAL_TABLE)
+    assert(desc.tableType == CatalogTableType.EXTERNAL)
     assert(desc.storage.locationUri == Some("/user/external/page_view"))
     assert(desc.schema ==
       CatalogColumn("viewtime", "int") ::
@@ -151,7 +151,7 @@ class HiveDDLCommandSuite extends PlanTest {
     assert(exists == false)
     assert(desc.identifier.database == None)
     assert(desc.identifier.table == "page_view")
-    assert(desc.tableType == CatalogTableType.MANAGED_TABLE)
+    assert(desc.tableType == CatalogTableType.MANAGED)
     assert(desc.storage.locationUri == None)
     assert(desc.schema == Seq.empty[CatalogColumn])
     assert(desc.viewText == None) // TODO will be SQLText
@@ -187,7 +187,7 @@ class HiveDDLCommandSuite extends PlanTest {
     assert(exists == false)
     assert(desc.identifier.database == None)
     assert(desc.identifier.table == "ctas2")
-    assert(desc.tableType == CatalogTableType.MANAGED_TABLE)
+    assert(desc.tableType == CatalogTableType.MANAGED)
     assert(desc.storage.locationUri == None)
     assert(desc.schema == Seq.empty[CatalogColumn])
     assert(desc.viewText == None) // TODO will be SQLText
@@ -318,7 +318,7 @@ class HiveDDLCommandSuite extends PlanTest {
     assert(!allowExisting)
     assert(desc.identifier.database.isEmpty)
     assert(desc.identifier.table == "my_table")
-    assert(desc.tableType == CatalogTableType.MANAGED_TABLE)
+    assert(desc.tableType == CatalogTableType.MANAGED)
     assert(desc.schema == Seq(CatalogColumn("id", "int"), CatalogColumn("name", "string")))
     assert(desc.partitionColumnNames.isEmpty)
     assert(desc.sortColumnNames.isEmpty)
@@ -353,7 +353,7 @@ class HiveDDLCommandSuite extends PlanTest {
   test("create table - external") {
     val query = "CREATE EXTERNAL TABLE tab1 (id int, name string)"
     val (desc, _) = extractTableDesc(query)
-    assert(desc.tableType == CatalogTableType.EXTERNAL_TABLE)
+    assert(desc.tableType == CatalogTableType.EXTERNAL)
   }
 
   test("create table - if not exists") {
@@ -480,7 +480,7 @@ class HiveDDLCommandSuite extends PlanTest {
     assert(allowExisting)
     assert(desc.identifier.database == Some("dbx"))
     assert(desc.identifier.table == "my_table")
-    assert(desc.tableType == CatalogTableType.EXTERNAL_TABLE)
+    assert(desc.tableType == CatalogTableType.EXTERNAL)
     assert(desc.schema == Seq(
       CatalogColumn("id", "int"),
       CatalogColumn("name", "string"),
@@ -506,7 +506,7 @@ class HiveDDLCommandSuite extends PlanTest {
     assert(!exists)
     assert(desc.identifier.database.isEmpty)
     assert(desc.identifier.table == "view1")
-    assert(desc.tableType == CatalogTableType.VIRTUAL_VIEW)
+    assert(desc.tableType == CatalogTableType.VIEW)
     assert(desc.storage.locationUri.isEmpty)
     assert(desc.schema == Seq.empty[CatalogColumn])
     assert(desc.viewText == Option("SELECT * FROM tab1"))
@@ -530,7 +530,7 @@ class HiveDDLCommandSuite extends PlanTest {
     val (desc, exists) = extractTableDesc(v1)
     assert(desc.identifier.database.isEmpty)
     assert(desc.identifier.table == "view1")
-    assert(desc.tableType == CatalogTableType.VIRTUAL_VIEW)
+    assert(desc.tableType == CatalogTableType.VIEW)
     assert(desc.storage.locationUri.isEmpty)
     assert(desc.schema ==
       CatalogColumn("col1", null, nullable = true, None) ::
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
index cb60a2c8cf..bf9935ae41 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.hive.client.HiveClient
 /**
  * Test suite for the [[HiveExternalCatalog]].
  */
-class HiveExternalCatalogSuite extends CatalogTestCases {
+class HiveExternalCatalogSuite extends ExternalCatalogSuite {
 
   private val client: HiveClient = {
     // We create a metastore at a temp location to avoid any potential
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
index d1a1490f66..0d6a2e7394 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
@@ -90,7 +90,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
         assert(hiveTable.storage.serde === Some(serde))
 
         assert(hiveTable.partitionColumnNames.isEmpty)
-        assert(hiveTable.tableType === CatalogTableType.MANAGED_TABLE)
+        assert(hiveTable.tableType === CatalogTableType.MANAGED)
 
         val columns = hiveTable.schema
         assert(columns.map(_.name) === Seq("d1", "d2"))
@@ -121,7 +121,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
           assert(hiveTable.storage.outputFormat === Some(outputFormat))
           assert(hiveTable.storage.serde === Some(serde))
 
-          assert(hiveTable.tableType === CatalogTableType.EXTERNAL_TABLE)
+          assert(hiveTable.tableType === CatalogTableType.EXTERNAL)
           assert(hiveTable.storage.locationUri ===
             Some(path.toURI.toString.stripSuffix(File.separator)))
 
@@ -153,7 +153,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
           assert(hiveTable.storage.serde === Some(serde))
 
           assert(hiveTable.partitionColumnNames.isEmpty)
-          assert(hiveTable.tableType === CatalogTableType.EXTERNAL_TABLE)
+          assert(hiveTable.tableType === CatalogTableType.EXTERNAL)
 
           val columns = hiveTable.schema
           assert(columns.map(_.name) === Seq("d1", "d2"))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 7cd01c9104..31ba735708 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -502,13 +502,13 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
         }
 
         withSQLConf(SQLConf.DEFAULT_DATA_SOURCE_NAME.key -> "json") {
-          createExternalTable("createdJsonTable", tempPath.toString)
+          sparkSession.catalog.createExternalTable("createdJsonTable", tempPath.toString)
           assert(table("createdJsonTable").schema === df.schema)
           checkAnswer(sql("SELECT * FROM createdJsonTable"), df)
 
           assert(
             intercept[AnalysisException] {
-              createExternalTable("createdJsonTable", jsonFilePath.toString)
+              sparkSession.catalog.createExternalTable("createdJsonTable", jsonFilePath.toString)
             }.getMessage.contains("Table createdJsonTable already exists."),
             "We should complain that createdJsonTable already exists")
         }
@@ -520,7 +520,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
         // Try to specify the schema.
         withSQLConf(SQLConf.DEFAULT_DATA_SOURCE_NAME.key -> "not a source name") {
           val schema = StructType(StructField("b", StringType, true) :: Nil)
-          createExternalTable(
+          sparkSession.catalog.createExternalTable(
             "createdJsonTable",
             "org.apache.spark.sql.json",
             schema,
@@ -539,7 +539,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
   test("path required error") {
     assert(
       intercept[AnalysisException] {
-        createExternalTable(
+        sparkSession.catalog.createExternalTable(
           "createdJsonTable",
           "org.apache.spark.sql.json",
           Map.empty[String, String])
@@ -725,7 +725,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
       val schema = StructType(StructField("int", IntegerType, true) :: Nil)
       val hiveTable = CatalogTable(
         identifier = TableIdentifier(tableName, Some("default")),
-        tableType = CatalogTableType.MANAGED_TABLE,
+        tableType = CatalogTableType.MANAGED,
         schema = Seq.empty,
         storage = CatalogStorageFormat(
           locationUri = None,
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 916a470aa5..9341b3816f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -149,7 +149,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
       val table =
         CatalogTable(
           identifier = TableIdentifier("src", Some("default")),
-          tableType = CatalogTableType.MANAGED_TABLE,
+          tableType = CatalogTableType.MANAGED,
           schema = Seq(CatalogColumn("key", "int")),
           storage = CatalogStorageFormat(
             locationUri = None,
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index fd19fcbd4e..e23272de85 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -73,7 +73,7 @@ class HiveDDLSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
           hiveContext.sessionState.catalog
             .getTableMetadata(TableIdentifier(tabName, Some("default")))
         // It is a managed table, although it uses external in SQL
-        assert(hiveTable.tableType == CatalogTableType.MANAGED_TABLE)
+        assert(hiveTable.tableType == CatalogTableType.MANAGED)
 
         assert(tmpDir.listFiles.nonEmpty)
         sql(s"DROP TABLE $tabName")
@@ -102,7 +102,7 @@ class HiveDDLSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
           hiveContext.sessionState.catalog
             .getTableMetadata(TableIdentifier(tabName, Some("default")))
         // This data source table is external table
-        assert(hiveTable.tableType == CatalogTableType.EXTERNAL_TABLE)
+        assert(hiveTable.tableType == CatalogTableType.EXTERNAL)
 
         assert(tmpDir.listFiles.nonEmpty)
         sql(s"DROP TABLE $tabName")
@@ -166,7 +166,7 @@ class HiveDDLSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
         }
 
         val hiveTable = catalog.getTableMetadata(TableIdentifier(externalTab, Some("default")))
-        assert(hiveTable.tableType == CatalogTableType.EXTERNAL_TABLE)
+        assert(hiveTable.tableType == CatalogTableType.EXTERNAL)
         // After data insertion, all the directory are not empty
         assert(dirSet.forall(dir => dir.listFiles.nonEmpty))
 
-- 
GitLab