diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index dfab2398857e827190ca2bb8bcd493248075c59c..2595e1f90c8370490c6c81494fe690577b3e1b02 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -170,7 +170,7 @@ object SqlParser extends AbstractSparkSQLParser with DataTypeParser {
     joinedRelation | relationFactor
 
   protected lazy val relationFactor: Parser[LogicalPlan] =
-    ( rep1sep(ident, ".") ~ (opt(AS) ~> opt(ident)) ^^ {
+    ( tableIdentifier ~ (opt(AS) ~> opt(ident)) ^^ {
         case tableIdent ~ alias => UnresolvedRelation(tableIdent, alias)
       }
       | ("(" ~> start <~ ")") ~ (AS.? ~> ident) ^^ { case s ~ a => Subquery(a, s) }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/TableIdentifier.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/TableIdentifier.scala
index d701559bf2d9b011d6d4cd775ceb1e178cae20fe..4d4e4ded99477577e38e2e3c7bc0df9e07a11542 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/TableIdentifier.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/TableIdentifier.scala
@@ -20,14 +20,16 @@ package org.apache.spark.sql.catalyst
 /**
  * Identifies a `table` in `database`.  If `database` is not defined, the current database is used.
  */
-private[sql] case class TableIdentifier(table: String, database: Option[String] = None) {
-  def withDatabase(database: String): TableIdentifier = this.copy(database = Some(database))
-
-  def toSeq: Seq[String] = database.toSeq :+ table
+private[sql] case class TableIdentifier(table: String, database: Option[String]) {
+  def this(table: String) = this(table, None)
 
   override def toString: String = quotedString
 
-  def quotedString: String = toSeq.map("`" + _ + "`").mkString(".")
+  def quotedString: String = database.map(db => s"`$db`.`$table`").getOrElse(s"`$table`")
+
+  def unquotedString: String = database.map(db => s"$db.$table").getOrElse(table)
+}
 
-  def unquotedString: String = toSeq.mkString(".")
+private[sql] object TableIdentifier {
+  def apply(tableName: String): TableIdentifier = new TableIdentifier(tableName)
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 041ab2282739985c979f7dd93f7827c23f450a56..e6046055bf0f61d59f0dd5132e8d323e991a7c9c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -105,7 +105,7 @@ class Analyzer(
         // here use the CTE definition first, check table name only and ignore database name
         // see https://github.com/apache/spark/pull/4929#discussion_r27186638 for more info
         case u : UnresolvedRelation =>
-          val substituted = cteRelations.get(u.tableIdentifier.last).map { relation =>
+          val substituted = cteRelations.get(u.tableIdentifier.table).map { relation =>
             val withAlias = u.alias.map(Subquery(_, relation))
             withAlias.getOrElse(relation)
           }
@@ -257,7 +257,7 @@ class Analyzer(
         catalog.lookupRelation(u.tableIdentifier, u.alias)
       } catch {
         case _: NoSuchTableException =>
-          u.failAnalysis(s"no such table ${u.tableName}")
+          u.failAnalysis(s"Table Not Found: ${u.tableName}")
       }
     }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
index 4cc9a5520a08509a58b8516c20c0934f6acdeb7b..8f4ce74a2ea38086cd509fa981fc90f631061ca9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
@@ -42,11 +42,9 @@ trait Catalog {
 
   val conf: CatalystConf
 
-  def tableExists(tableIdentifier: Seq[String]): Boolean
+  def tableExists(tableIdent: TableIdentifier): Boolean
 
-  def lookupRelation(
-      tableIdentifier: Seq[String],
-      alias: Option[String] = None): LogicalPlan
+  def lookupRelation(tableIdent: TableIdentifier, alias: Option[String] = None): LogicalPlan
 
   /**
    * Returns tuples of (tableName, isTemporary) for all tables in the given database.
@@ -56,89 +54,59 @@ trait Catalog {
 
   def refreshTable(tableIdent: TableIdentifier): Unit
 
-  // TODO: Refactor it in the work of SPARK-10104
-  def registerTable(tableIdentifier: Seq[String], plan: LogicalPlan): Unit
+  def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit
 
-  // TODO: Refactor it in the work of SPARK-10104
-  def unregisterTable(tableIdentifier: Seq[String]): Unit
+  def unregisterTable(tableIdent: TableIdentifier): Unit
 
   def unregisterAllTables(): Unit
 
-  // TODO: Refactor it in the work of SPARK-10104
-  protected def processTableIdentifier(tableIdentifier: Seq[String]): Seq[String] = {
-    if (conf.caseSensitiveAnalysis) {
-      tableIdentifier
-    } else {
-      tableIdentifier.map(_.toLowerCase)
-    }
-  }
-
-  // TODO: Refactor it in the work of SPARK-10104
-  protected def getDbTableName(tableIdent: Seq[String]): String = {
-    val size = tableIdent.size
-    if (size <= 2) {
-      tableIdent.mkString(".")
-    } else {
-      tableIdent.slice(size - 2, size).mkString(".")
-    }
-  }
-
-  // TODO: Refactor it in the work of SPARK-10104
-  protected def getDBTable(tableIdent: Seq[String]) : (Option[String], String) = {
-    (tableIdent.lift(tableIdent.size - 2), tableIdent.last)
-  }
-
   /**
-   * It is not allowed to specifiy database name for tables stored in [[SimpleCatalog]].
-   * We use this method to check it.
+   * Get the table name of TableIdentifier for temporary tables.
    */
-  protected def checkTableIdentifier(tableIdentifier: Seq[String]): Unit = {
-    if (tableIdentifier.length > 1) {
+  protected def getTableName(tableIdent: TableIdentifier): String = {
+    // It is not allowed to specify database name for temporary tables.
+    // We check it here and throw exception if database is defined.
+    if (tableIdent.database.isDefined) {
       throw new AnalysisException("Specifying database name or other qualifiers are not allowed " +
         "for temporary tables. If the table name has dots (.) in it, please quote the " +
         "table name with backticks (`).")
     }
+    if (conf.caseSensitiveAnalysis) {
+      tableIdent.table
+    } else {
+      tableIdent.table.toLowerCase
+    }
   }
 }
 
 class SimpleCatalog(val conf: CatalystConf) extends Catalog {
-  val tables = new ConcurrentHashMap[String, LogicalPlan]
-
-  override def registerTable(
-      tableIdentifier: Seq[String],
-      plan: LogicalPlan): Unit = {
-    checkTableIdentifier(tableIdentifier)
-    val tableIdent = processTableIdentifier(tableIdentifier)
-    tables.put(getDbTableName(tableIdent), plan)
+  private[this] val tables = new ConcurrentHashMap[String, LogicalPlan]
+
+  override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = {
+    tables.put(getTableName(tableIdent), plan)
   }
 
-  override def unregisterTable(tableIdentifier: Seq[String]): Unit = {
-    checkTableIdentifier(tableIdentifier)
-    val tableIdent = processTableIdentifier(tableIdentifier)
-    tables.remove(getDbTableName(tableIdent))
+  override def unregisterTable(tableIdent: TableIdentifier): Unit = {
+    tables.remove(getTableName(tableIdent))
   }
 
   override def unregisterAllTables(): Unit = {
     tables.clear()
   }
 
-  override def tableExists(tableIdentifier: Seq[String]): Boolean = {
-    checkTableIdentifier(tableIdentifier)
-    val tableIdent = processTableIdentifier(tableIdentifier)
-    tables.containsKey(getDbTableName(tableIdent))
+  override def tableExists(tableIdent: TableIdentifier): Boolean = {
+    tables.containsKey(getTableName(tableIdent))
   }
 
   override def lookupRelation(
-      tableIdentifier: Seq[String],
+      tableIdent: TableIdentifier,
       alias: Option[String] = None): LogicalPlan = {
-    checkTableIdentifier(tableIdentifier)
-    val tableIdent = processTableIdentifier(tableIdentifier)
-    val tableFullName = getDbTableName(tableIdent)
-    val table = tables.get(tableFullName)
+    val tableName = getTableName(tableIdent)
+    val table = tables.get(tableName)
     if (table == null) {
-      sys.error(s"Table Not Found: $tableFullName")
+      throw new NoSuchTableException
     }
-    val tableWithQualifiers = Subquery(tableIdent.last, table)
+    val tableWithQualifiers = Subquery(tableName, table)
 
     // If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are
     // properly qualified with this alias.
@@ -146,11 +114,7 @@ class SimpleCatalog(val conf: CatalystConf) extends Catalog {
   }
 
   override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
-    val result = ArrayBuffer.empty[(String, Boolean)]
-    for (name <- tables.keySet().asScala) {
-      result += ((name, true))
-    }
-    result
+    tables.keySet().asScala.map(_ -> true).toSeq
   }
 
   override def refreshTable(tableIdent: TableIdentifier): Unit = {
@@ -165,68 +129,50 @@ class SimpleCatalog(val conf: CatalystConf) extends Catalog {
  * lost when the JVM exits.
  */
 trait OverrideCatalog extends Catalog {
+  private[this] val overrides = new ConcurrentHashMap[String, LogicalPlan]
 
-  // TODO: This doesn't work when the database changes...
-  val overrides = new mutable.HashMap[(Option[String], String), LogicalPlan]()
-
-  abstract override def tableExists(tableIdentifier: Seq[String]): Boolean = {
-    val tableIdent = processTableIdentifier(tableIdentifier)
-    // A temporary tables only has a single part in the tableIdentifier.
-    val overriddenTable = if (tableIdentifier.length > 1) {
-      None: Option[LogicalPlan]
+  private def getOverriddenTable(tableIdent: TableIdentifier): Option[LogicalPlan] = {
+    if (tableIdent.database.isDefined) {
+      None
     } else {
-      overrides.get(getDBTable(tableIdent))
+      Option(overrides.get(getTableName(tableIdent)))
     }
-    overriddenTable match {
+  }
+
+  abstract override def tableExists(tableIdent: TableIdentifier): Boolean = {
+    getOverriddenTable(tableIdent) match {
       case Some(_) => true
-      case None => super.tableExists(tableIdentifier)
+      case None => super.tableExists(tableIdent)
     }
   }
 
   abstract override def lookupRelation(
-      tableIdentifier: Seq[String],
+      tableIdent: TableIdentifier,
       alias: Option[String] = None): LogicalPlan = {
-    val tableIdent = processTableIdentifier(tableIdentifier)
-    // A temporary tables only has a single part in the tableIdentifier.
-    val overriddenTable = if (tableIdentifier.length > 1) {
-      None: Option[LogicalPlan]
-    } else {
-      overrides.get(getDBTable(tableIdent))
-    }
-    val tableWithQualifers = overriddenTable.map(r => Subquery(tableIdent.last, r))
+    getOverriddenTable(tableIdent) match {
+      case Some(table) =>
+        val tableName = getTableName(tableIdent)
+        val tableWithQualifiers = Subquery(tableName, table)
 
-    // If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are
-    // properly qualified with this alias.
-    val withAlias =
-      tableWithQualifers.map(r => alias.map(a => Subquery(a, r)).getOrElse(r))
+        // If an alias was specified by the lookup, wrap the plan in a sub-query so that attributes
+        // are properly qualified with this alias.
+        alias.map(a => Subquery(a, tableWithQualifiers)).getOrElse(tableWithQualifiers)
 
-    withAlias.getOrElse(super.lookupRelation(tableIdentifier, alias))
+      case None => super.lookupRelation(tableIdent, alias)
+    }
   }
 
   abstract override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
-    // We always return all temporary tables.
-    val temporaryTables = overrides.map {
-      case ((_, tableName), _) => (tableName, true)
-    }.toSeq
-
-    temporaryTables ++ super.getTables(databaseName)
+    overrides.keySet().asScala.map(_ -> true).toSeq ++ super.getTables(databaseName)
   }
 
-  override def registerTable(
-      tableIdentifier: Seq[String],
-      plan: LogicalPlan): Unit = {
-    checkTableIdentifier(tableIdentifier)
-    val tableIdent = processTableIdentifier(tableIdentifier)
-    overrides.put(getDBTable(tableIdent), plan)
+  override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = {
+    overrides.put(getTableName(tableIdent), plan)
   }
 
-  override def unregisterTable(tableIdentifier: Seq[String]): Unit = {
-    // A temporary tables only has a single part in the tableIdentifier.
-    // If tableIdentifier has more than one parts, it is not a temporary table
-    // and we do not need to do anything at here.
-    if (tableIdentifier.length == 1) {
-      val tableIdent = processTableIdentifier(tableIdentifier)
-      overrides.remove(getDBTable(tableIdent))
+  override def unregisterTable(tableIdent: TableIdentifier): Unit = {
+    if (tableIdent.database.isEmpty) {
+      overrides.remove(getTableName(tableIdent))
     }
   }
 
@@ -243,12 +189,12 @@ object EmptyCatalog extends Catalog {
 
   override val conf: CatalystConf = EmptyConf
 
-  override def tableExists(tableIdentifier: Seq[String]): Boolean = {
+  override def tableExists(tableIdent: TableIdentifier): Boolean = {
     throw new UnsupportedOperationException
   }
 
   override def lookupRelation(
-      tableIdentifier: Seq[String],
+      tableIdent: TableIdentifier,
       alias: Option[String] = None): LogicalPlan = {
     throw new UnsupportedOperationException
   }
@@ -257,15 +203,17 @@ object EmptyCatalog extends Catalog {
     throw new UnsupportedOperationException
   }
 
-  override def registerTable(tableIdentifier: Seq[String], plan: LogicalPlan): Unit = {
+  override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = {
     throw new UnsupportedOperationException
   }
 
-  override def unregisterTable(tableIdentifier: Seq[String]): Unit = {
+  override def unregisterTable(tableIdent: TableIdentifier): Unit = {
     throw new UnsupportedOperationException
   }
 
-  override def unregisterAllTables(): Unit = {}
+  override def unregisterAllTables(): Unit = {
+    throw new UnsupportedOperationException
+  }
 
   override def refreshTable(tableIdent: TableIdentifier): Unit = {
     throw new UnsupportedOperationException
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index 43ee3191935ebfc505e90bf2e557f9641a1db343..c97365003935e4f6943c8980294bf1aa8142eaf2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
-import org.apache.spark.sql.catalyst.errors
+import org.apache.spark.sql.catalyst.{TableIdentifier, errors}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.LeafNode
 import org.apache.spark.sql.catalyst.trees.TreeNode
@@ -36,11 +36,11 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str
  * Holds the name of a relation that has yet to be looked up in a [[Catalog]].
  */
 case class UnresolvedRelation(
-    tableIdentifier: Seq[String],
+    tableIdentifier: TableIdentifier,
     alias: Option[String] = None) extends LeafNode {
 
   /** Returns a `.` separated name for this relation. */
-  def tableName: String = tableIdentifier.mkString(".")
+  def tableName: String = tableIdentifier.unquotedString
 
   override def output: Seq[Attribute] = Nil
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 699c4cc63d09a3d8dd58540bc547669fd5ea7bd7..27b3cd84b3846022127bbcd9e131910cd8b7062a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -286,7 +286,8 @@ package object dsl {
 
       def insertInto(tableName: String, overwrite: Boolean = false): LogicalPlan =
         InsertIntoTable(
-          analysis.UnresolvedRelation(Seq(tableName)), Map.empty, logicalPlan, overwrite, false)
+          analysis.UnresolvedRelation(TableIdentifier(tableName)),
+          Map.empty, logicalPlan, overwrite, false)
 
       def analyze: LogicalPlan = EliminateSubQueries(analysis.SimpleAnalyzer.execute(logicalPlan))
     }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 820b336aac7590342326d4d6609a57ad2932c8d1..ec05cfa63c5bffd67463adfce45066fcf57d1624 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.expressions._
@@ -53,32 +54,39 @@ class AnalysisSuite extends AnalysisTest {
       Project(testRelation.output, testRelation))
 
     checkAnalysis(
-      Project(Seq(UnresolvedAttribute("TbL.a")), UnresolvedRelation(Seq("TaBlE"), Some("TbL"))),
+      Project(Seq(UnresolvedAttribute("TbL.a")),
+        UnresolvedRelation(TableIdentifier("TaBlE"), Some("TbL"))),
       Project(testRelation.output, testRelation))
 
     assertAnalysisError(
-      Project(Seq(UnresolvedAttribute("tBl.a")), UnresolvedRelation(Seq("TaBlE"), Some("TbL"))),
+      Project(Seq(UnresolvedAttribute("tBl.a")), UnresolvedRelation(
+        TableIdentifier("TaBlE"), Some("TbL"))),
       Seq("cannot resolve"))
 
     checkAnalysis(
-      Project(Seq(UnresolvedAttribute("TbL.a")), UnresolvedRelation(Seq("TaBlE"), Some("TbL"))),
+      Project(Seq(UnresolvedAttribute("TbL.a")), UnresolvedRelation(
+        TableIdentifier("TaBlE"), Some("TbL"))),
       Project(testRelation.output, testRelation),
       caseSensitive = false)
 
     checkAnalysis(
-      Project(Seq(UnresolvedAttribute("tBl.a")), UnresolvedRelation(Seq("TaBlE"), Some("TbL"))),
+      Project(Seq(UnresolvedAttribute("tBl.a")), UnresolvedRelation(
+        TableIdentifier("TaBlE"), Some("TbL"))),
       Project(testRelation.output, testRelation),
       caseSensitive = false)
   }
 
   test("resolve relations") {
-    assertAnalysisError(UnresolvedRelation(Seq("tAbLe"), None), Seq("Table Not Found: tAbLe"))
+    assertAnalysisError(
+      UnresolvedRelation(TableIdentifier("tAbLe"), None), Seq("Table Not Found: tAbLe"))
 
-    checkAnalysis(UnresolvedRelation(Seq("TaBlE"), None), testRelation)
+    checkAnalysis(UnresolvedRelation(TableIdentifier("TaBlE"), None), testRelation)
 
-    checkAnalysis(UnresolvedRelation(Seq("tAbLe"), None), testRelation, caseSensitive = false)
+    checkAnalysis(
+      UnresolvedRelation(TableIdentifier("tAbLe"), None), testRelation, caseSensitive = false)
 
-    checkAnalysis(UnresolvedRelation(Seq("TaBlE"), None), testRelation, caseSensitive = false)
+    checkAnalysis(
+      UnresolvedRelation(TableIdentifier("TaBlE"), None), testRelation, caseSensitive = false)
   }
 
   test("divide should be casted into fractional types") {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
index 53b3695a86be570141023b3dca4d17556b6fd3d2..23861ed15da61c3082137f5f66c056cc0a65adad 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
@@ -17,9 +17,10 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.SimpleCatalystConf
+import org.apache.spark.sql.catalyst.{TableIdentifier, SimpleCatalystConf}
 
 trait AnalysisTest extends PlanTest {
 
@@ -30,8 +31,8 @@ trait AnalysisTest extends PlanTest {
     val caseSensitiveCatalog = new SimpleCatalog(caseSensitiveConf)
     val caseInsensitiveCatalog = new SimpleCatalog(caseInsensitiveConf)
 
-    caseSensitiveCatalog.registerTable(Seq("TaBlE"), TestRelations.testRelation)
-    caseInsensitiveCatalog.registerTable(Seq("TaBlE"), TestRelations.testRelation)
+    caseSensitiveCatalog.registerTable(TableIdentifier("TaBlE"), TestRelations.testRelation)
+    caseInsensitiveCatalog.registerTable(TableIdentifier("TaBlE"), TestRelations.testRelation)
 
     new Analyzer(caseSensitiveCatalog, EmptyFunctionRegistry, caseSensitiveConf) {
       override val extendedResolutionRules = EliminateSubQueries :: Nil
@@ -67,8 +68,7 @@ trait AnalysisTest extends PlanTest {
       expectedErrors: Seq[String],
       caseSensitive: Boolean = true): Unit = {
     val analyzer = getAnalyzer(caseSensitive)
-    // todo: make sure we throw AnalysisException during analysis
-    val e = intercept[Exception] {
+    val e = intercept[AnalysisException] {
       analyzer.checkAnalysis(analyzer.execute(inputPlan))
     }
     assert(expectedErrors.map(_.toLowerCase).forall(e.getMessage.toLowerCase.contains),
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
index b4ad618c23e3980fe49aded4ad94182887149246..40c4ae792091822148834f98dca115a1ae09a142 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
@@ -23,7 +23,7 @@ import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.{Union, Project, LocalRelation}
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.catalyst.SimpleCatalystConf
+import org.apache.spark.sql.catalyst.{TableIdentifier, SimpleCatalystConf}
 
 class DecimalPrecisionSuite extends SparkFunSuite with BeforeAndAfter {
   val conf = new SimpleCatalystConf(true)
@@ -47,7 +47,7 @@ class DecimalPrecisionSuite extends SparkFunSuite with BeforeAndAfter {
   val b: Expression = UnresolvedAttribute("b")
 
   before {
-    catalog.registerTable(Seq("table"), relation)
+    catalog.registerTable(TableIdentifier("table"), relation)
   }
 
   private def checkType(expression: Expression, expectedType: DataType): Unit = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 97a8b6518a8322c4787d0bbfb2045c0b58980f0a..eacdea2c1e5b3e9fd7cd9c4890665fd40fa0c41e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
 import org.apache.spark.sql.execution.datasources.{LogicalRelation, ResolvedDataSource}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.{Logging, Partition}
+import org.apache.spark.sql.catalyst.{SqlParser, TableIdentifier}
 
 /**
  * :: Experimental ::
@@ -287,7 +288,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
    * @since 1.4.0
    */
   def table(tableName: String): DataFrame = {
-    DataFrame(sqlContext, sqlContext.catalog.lookupRelation(Seq(tableName)))
+    DataFrame(sqlContext, sqlContext.catalog.lookupRelation(TableIdentifier(tableName)))
   }
 
   ///////////////////////////////////////////////////////////////////////////////////////
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 03e973666e88870e188699f20d5a2d574119cd32..764510ab4b4bdfba27da22ff604b69a546119b7b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -171,7 +171,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
     val overwrite = mode == SaveMode.Overwrite
     df.sqlContext.executePlan(
       InsertIntoTable(
-        UnresolvedRelation(tableIdent.toSeq),
+        UnresolvedRelation(tableIdent),
         partitions.getOrElse(Map.empty[String, Option[String]]),
         df.logicalPlan,
         overwrite,
@@ -201,7 +201,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
   }
 
   private def saveAsTable(tableIdent: TableIdentifier): Unit = {
-    val tableExists = df.sqlContext.catalog.tableExists(tableIdent.toSeq)
+    val tableExists = df.sqlContext.catalog.tableExists(tableIdent)
 
     (tableExists, mode) match {
       case (true, SaveMode.Ignore) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 3d5e35ab315ebb6a3df9991064985edc8446c162..361eb576c567ad78697ca559aac14677b91093e4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -714,7 +714,7 @@ class SQLContext private[sql](
    * only during the lifetime of this instance of SQLContext.
    */
   private[sql] def registerDataFrameAsTable(df: DataFrame, tableName: String): Unit = {
-    catalog.registerTable(Seq(tableName), df.logicalPlan)
+    catalog.registerTable(TableIdentifier(tableName), df.logicalPlan)
   }
 
   /**
@@ -728,7 +728,7 @@ class SQLContext private[sql](
    */
   def dropTempTable(tableName: String): Unit = {
     cacheManager.tryUncacheQuery(table(tableName))
-    catalog.unregisterTable(Seq(tableName))
+    catalog.unregisterTable(TableIdentifier(tableName))
   }
 
   /**
@@ -795,7 +795,7 @@ class SQLContext private[sql](
   }
 
   private def table(tableIdent: TableIdentifier): DataFrame = {
-    DataFrame(this, catalog.lookupRelation(tableIdent.toSeq))
+    DataFrame(this, catalog.lookupRelation(tableIdent))
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala
index f7a88b98c0b483ae046922e8adb24aa617dd5767..446739d5b8a2cfe0775b2c796956916054704d5b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala
@@ -140,7 +140,7 @@ class DDLParser(parseQuery: String => LogicalPlan)
   protected lazy val describeTable: Parser[LogicalPlan] =
     (DESCRIBE ~> opt(EXTENDED)) ~ tableIdentifier ^^ {
       case e ~ tableIdent =>
-        DescribeCommand(UnresolvedRelation(tableIdent.toSeq, None), e.isDefined)
+        DescribeCommand(UnresolvedRelation(tableIdent, None), e.isDefined)
     }
 
   protected lazy val refreshTable: Parser[LogicalPlan] =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index 31d6b75e13477d20e98d11326e575d6cb1a20291..e7deeff13dc4d988f8c1547f6c180a929590feae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -71,7 +71,6 @@ case class CreateTableUsing(
  * can analyze the logical plan that will be used to populate the table.
  * So, [[PreWriteCheck]] can detect cases that are not allowed.
  */
-// TODO: Use TableIdentifier instead of String for tableName (SPARK-10104).
 case class CreateTableUsingAsSelect(
     tableIdent: TableIdentifier,
     provider: String,
@@ -93,7 +92,7 @@ case class CreateTempTableUsing(
     val resolved = ResolvedDataSource(
       sqlContext, userSpecifiedSchema, Array.empty[String], provider, options)
     sqlContext.catalog.registerTable(
-      tableIdent.toSeq,
+      tableIdent,
       DataFrame(sqlContext, LogicalRelation(resolved.relation)).logicalPlan)
 
     Seq.empty[Row]
@@ -112,7 +111,7 @@ case class CreateTempTableUsingAsSelect(
     val df = DataFrame(sqlContext, query)
     val resolved = ResolvedDataSource(sqlContext, provider, partitionColumns, mode, options, df)
     sqlContext.catalog.registerTable(
-      tableIdent.toSeq,
+      tableIdent,
       DataFrame(sqlContext, LogicalRelation(resolved.relation)).logicalPlan)
 
     Seq.empty[Row]
@@ -128,7 +127,7 @@ case class RefreshTable(tableIdent: TableIdentifier)
 
     // If this table is cached as a InMemoryColumnarRelation, drop the original
     // cached version and make the new version cached lazily.
-    val logicalPlan = sqlContext.catalog.lookupRelation(tableIdent.toSeq)
+    val logicalPlan = sqlContext.catalog.lookupRelation(tableIdent)
     // Use lookupCachedData directly since RefreshTable also takes databaseName.
     val isCached = sqlContext.cacheManager.lookupCachedData(logicalPlan).nonEmpty
     if (isCached) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 8efc8016f94dd9fc206627b459a5bf0f81cb5018..b00e5680fef9e69f66603274658d911485cf28dc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -143,9 +143,9 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
       case CreateTableUsingAsSelect(tableIdent, _, _, partitionColumns, mode, _, query) =>
         // When the SaveMode is Overwrite, we need to check if the table is an input table of
         // the query. If so, we will throw an AnalysisException to let users know it is not allowed.
-        if (mode == SaveMode.Overwrite && catalog.tableExists(tableIdent.toSeq)) {
+        if (mode == SaveMode.Overwrite && catalog.tableExists(tableIdent)) {
           // Need to remove SubQuery operator.
-          EliminateSubQueries(catalog.lookupRelation(tableIdent.toSeq)) match {
+          EliminateSubQueries(catalog.lookupRelation(tableIdent)) match {
             // Only do the check if the table is a data source table
             // (the relation is a BaseRelation).
             case l @ LogicalRelation(dest: BaseRelation, _) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 356d4ff3fa83760ec4ff1d87a700548c01dab671..fd566c8276bc14d0f75e30e255b34782215dda62 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql
 
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.execution.PhysicalRDD
 
 import scala.concurrent.duration._
@@ -287,8 +288,7 @@ class CachedTableSuite extends QueryTest with SharedSQLContext {
     testData.select('key).registerTempTable("t1")
     sqlContext.table("t1")
     sqlContext.dropTempTable("t1")
-    assert(
-      intercept[RuntimeException](sqlContext.table("t1")).getMessage.startsWith("Table Not Found"))
+    intercept[NoSuchTableException](sqlContext.table("t1"))
   }
 
   test("Drops cached temporary table") {
@@ -300,8 +300,7 @@ class CachedTableSuite extends QueryTest with SharedSQLContext {
     assert(sqlContext.isCached("t2"))
 
     sqlContext.dropTempTable("t1")
-    assert(
-      intercept[RuntimeException](sqlContext.table("t1")).getMessage.startsWith("Table Not Found"))
+    intercept[NoSuchTableException](sqlContext.table("t1"))
     assert(!sqlContext.isCached("t2"))
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 7a027e13089e30a6406b1679b226ec6c43ffc554..b1fb06815868cb7e917722fa9d4aec566eefa494 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql
 
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.test.SharedSQLContext
 
@@ -359,8 +360,8 @@ class JoinSuite extends QueryTest with SharedSQLContext {
     upperCaseData.where('N <= 4).registerTempTable("left")
     upperCaseData.where('N >= 3).registerTempTable("right")
 
-    val left = UnresolvedRelation(Seq("left"), None)
-    val right = UnresolvedRelation(Seq("right"), None)
+    val left = UnresolvedRelation(TableIdentifier("left"), None)
+    val right = UnresolvedRelation(TableIdentifier("right"), None)
 
     checkAnswer(
       left.join(right, $"left.N" === $"right.N", "full"),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala
index eab0fbb196eb64dbe9679f29ca4e864adee309af..5688f46e5e3d4f64abcf4aaa84434b95a4ed1d4c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala
@@ -21,6 +21,7 @@ import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{BooleanType, StringType, StructField, StructType}
+import org.apache.spark.sql.catalyst.TableIdentifier
 
 class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContext {
   import testImplicits._
@@ -32,7 +33,7 @@ class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContex
   }
 
   after {
-    sqlContext.catalog.unregisterTable(Seq("ListTablesSuiteTable"))
+    sqlContext.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
   }
 
   test("get all tables") {
@@ -44,7 +45,7 @@ class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContex
       sql("SHOW tables").filter("tableName = 'ListTablesSuiteTable'"),
       Row("ListTablesSuiteTable", true))
 
-    sqlContext.catalog.unregisterTable(Seq("ListTablesSuiteTable"))
+    sqlContext.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
     assert(sqlContext.tables().filter("tableName = 'ListTablesSuiteTable'").count() === 0)
   }
 
@@ -57,7 +58,7 @@ class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContex
       sql("show TABLES in DB").filter("tableName = 'ListTablesSuiteTable'"),
       Row("ListTablesSuiteTable", true))
 
-    sqlContext.catalog.unregisterTable(Seq("ListTablesSuiteTable"))
+    sqlContext.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
     assert(sqlContext.tables().filter("tableName = 'ListTablesSuiteTable'").count() === 0)
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index cc02ef81c9f8bd2ae060ee77aa58a7ffc5e5b728..baff7f5752a75ba70b7322426f0124a3626d8818 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -22,7 +22,7 @@ import java.io.File
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.{TableIdentifier, InternalRow}
 import org.apache.spark.sql.catalyst.expressions.SpecificMutableRow
 import org.apache.spark.sql.execution.datasources.parquet.TestingUDT.{NestedStruct, NestedStructUDT}
 import org.apache.spark.sql.test.SharedSQLContext
@@ -49,7 +49,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
       sql("INSERT INTO TABLE t SELECT * FROM tmp")
       checkAnswer(sqlContext.table("t"), (data ++ data).map(Row.fromTuple))
     }
-    sqlContext.catalog.unregisterTable(Seq("tmp"))
+    sqlContext.catalog.unregisterTable(TableIdentifier("tmp"))
   }
 
   test("overwriting") {
@@ -59,7 +59,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
       sql("INSERT OVERWRITE TABLE t SELECT * FROM tmp")
       checkAnswer(sqlContext.table("t"), data.map(Row.fromTuple))
     }
-    sqlContext.catalog.unregisterTable(Seq("tmp"))
+    sqlContext.catalog.unregisterTable(TableIdentifier("tmp"))
   }
 
   test("self-join") {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index e620d7fb82af902a76f9c30a1c4e6bd48abc016f..4d8a3f728e6b5cebd3e679ce7927898615e09cb2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -358,7 +358,7 @@ class HiveContext private[hive](
   @Experimental
   def analyze(tableName: String) {
     val tableIdent = SqlParser.parseTableIdentifier(tableName)
-    val relation = EliminateSubQueries(catalog.lookupRelation(tableIdent.toSeq))
+    val relation = EliminateSubQueries(catalog.lookupRelation(tableIdent))
 
     relation match {
       case relation: MetastoreRelation =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 1f8223e1ff507c30c9961ceb0310b52c23fc5d13..5819cb9d0877896f67167ddcd03a74122e87fcba 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.catalyst.{InternalRow, SqlParser, TableIdentifier}
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
 import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation, Partition => ParquetPartition, PartitionSpec, ResolvedDataSource}
 import org.apache.spark.sql.execution.{FileRelation, datasources}
@@ -103,10 +103,19 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
   /** Usages should lock on `this`. */
   protected[hive] lazy val hiveWarehouse = new Warehouse(hive.hiveconf)
 
-  // TODO: Use this everywhere instead of tuples or databaseName, tableName,.
   /** A fully qualified identifier for a table (i.e., database.tableName) */
-  case class QualifiedTableName(database: String, name: String) {
-    def toLowerCase: QualifiedTableName = QualifiedTableName(database.toLowerCase, name.toLowerCase)
+  case class QualifiedTableName(database: String, name: String)
+
+  private def getQualifiedTableName(tableIdent: TableIdentifier) = {
+    QualifiedTableName(
+      tableIdent.database.getOrElse(client.currentDatabase).toLowerCase,
+      tableIdent.table.toLowerCase)
+  }
+
+  private def getQualifiedTableName(hiveTable: HiveTable) = {
+    QualifiedTableName(
+      hiveTable.specifiedDatabase.getOrElse(client.currentDatabase).toLowerCase,
+      hiveTable.name.toLowerCase)
   }
 
   /** A cache of Spark SQL data source tables that have been accessed. */
@@ -179,33 +188,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
   }
 
   def invalidateTable(tableIdent: TableIdentifier): Unit = {
-    val databaseName = tableIdent.database.getOrElse(client.currentDatabase)
-    val tableName = tableIdent.table
-
-    cachedDataSourceTables.invalidate(QualifiedTableName(databaseName, tableName).toLowerCase)
-  }
-
-  val caseSensitive: Boolean = false
-
-  /**
-   * Creates a data source table (a table created with USING clause) in Hive's metastore.
-   * Returns true when the table has been created. Otherwise, false.
-   */
-  // TODO: Remove this in SPARK-10104.
-  def createDataSourceTable(
-      tableName: String,
-      userSpecifiedSchema: Option[StructType],
-      partitionColumns: Array[String],
-      provider: String,
-      options: Map[String, String],
-      isExternal: Boolean): Unit = {
-    createDataSourceTable(
-      SqlParser.parseTableIdentifier(tableName),
-      userSpecifiedSchema,
-      partitionColumns,
-      provider,
-      options,
-      isExternal)
+    cachedDataSourceTables.invalidate(getQualifiedTableName(tableIdent))
   }
 
   def createDataSourceTable(
@@ -215,10 +198,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
       provider: String,
       options: Map[String, String],
       isExternal: Boolean): Unit = {
-    val (dbName, tblName) = {
-      val database = tableIdent.database.getOrElse(client.currentDatabase)
-      processDatabaseAndTableName(database, tableIdent.table)
-    }
+    val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent)
 
     val tableProperties = new mutable.HashMap[String, String]
     tableProperties.put("spark.sql.sources.provider", provider)
@@ -311,7 +291,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
 
     // TODO: Support persisting partitioned data source relations in Hive compatible format
     val qualifiedTableName = tableIdent.quotedString
-    val (hiveCompitiableTable, logMessage) = (maybeSerDe, dataSource.relation) match {
+    val (hiveCompatibleTable, logMessage) = (maybeSerDe, dataSource.relation) match {
       case (Some(serde), relation: HadoopFsRelation)
         if relation.paths.length == 1 && relation.partitionColumns.isEmpty =>
         val hiveTable = newHiveCompatibleMetastoreTable(relation, serde)
@@ -349,9 +329,9 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
         (None, message)
     }
 
-    (hiveCompitiableTable, logMessage) match {
+    (hiveCompatibleTable, logMessage) match {
       case (Some(table), message) =>
-        // We first try to save the metadata of the table in a Hive compatiable way.
+        // We first try to save the metadata of the table in a Hive compatible way.
         // If Hive throws an error, we fall back to save its metadata in the Spark SQL
         // specific way.
         try {
@@ -374,48 +354,29 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
     }
   }
 
-  def hiveDefaultTableFilePath(tableName: String): String = {
-    hiveDefaultTableFilePath(SqlParser.parseTableIdentifier(tableName))
-  }
-
   def hiveDefaultTableFilePath(tableIdent: TableIdentifier): String = {
     // Code based on: hiveWarehouse.getTablePath(currentDatabase, tableName)
-    val database = tableIdent.database.getOrElse(client.currentDatabase)
-
-    new Path(
-      new Path(client.getDatabase(database).location),
-      tableIdent.table.toLowerCase).toString
+    val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent)
+    new Path(new Path(client.getDatabase(dbName).location), tblName).toString
   }
 
-  def tableExists(tableIdentifier: Seq[String]): Boolean = {
-    val tableIdent = processTableIdentifier(tableIdentifier)
-    val databaseName =
-      tableIdent
-        .lift(tableIdent.size - 2)
-        .getOrElse(client.currentDatabase)
-    val tblName = tableIdent.last
-    client.getTableOption(databaseName, tblName).isDefined
+  override def tableExists(tableIdent: TableIdentifier): Boolean = {
+    val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent)
+    client.getTableOption(dbName, tblName).isDefined
   }
 
-  def lookupRelation(
-      tableIdentifier: Seq[String],
+  override def lookupRelation(
+      tableIdent: TableIdentifier,
       alias: Option[String]): LogicalPlan = {
-    val tableIdent = processTableIdentifier(tableIdentifier)
-    val databaseName = tableIdent.lift(tableIdent.size - 2).getOrElse(
-      client.currentDatabase)
-    val tblName = tableIdent.last
-    val table = client.getTable(databaseName, tblName)
+    val qualifiedTableName = getQualifiedTableName(tableIdent)
+    val table = client.getTable(qualifiedTableName.database, qualifiedTableName.name)
 
     if (table.properties.get("spark.sql.sources.provider").isDefined) {
-      val dataSourceTable =
-        cachedDataSourceTables(QualifiedTableName(databaseName, tblName).toLowerCase)
+      val dataSourceTable = cachedDataSourceTables(qualifiedTableName)
+      val tableWithQualifiers = Subquery(qualifiedTableName.name, dataSourceTable)
       // Then, if alias is specified, wrap the table with a Subquery using the alias.
       // Otherwise, wrap the table with a Subquery using the table name.
-      val withAlias =
-        alias.map(a => Subquery(a, dataSourceTable)).getOrElse(
-          Subquery(tableIdent.last, dataSourceTable))
-
-      withAlias
+      alias.map(a => Subquery(a, tableWithQualifiers)).getOrElse(tableWithQualifiers)
     } else if (table.tableType == VirtualView) {
       val viewText = table.viewText.getOrElse(sys.error("Invalid view without text."))
       alias match {
@@ -425,7 +386,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
         case Some(aliasText) => Subquery(aliasText, HiveQl.createPlan(viewText))
       }
     } else {
-      MetastoreRelation(databaseName, tblName, alias)(table)(hive)
+      MetastoreRelation(qualifiedTableName.database, qualifiedTableName.name, alias)(table)(hive)
     }
   }
 
@@ -524,26 +485,6 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
     client.listTables(db).map(tableName => (tableName, false))
   }
 
-  protected def processDatabaseAndTableName(
-      databaseName: Option[String],
-      tableName: String): (Option[String], String) = {
-    if (!caseSensitive) {
-      (databaseName.map(_.toLowerCase), tableName.toLowerCase)
-    } else {
-      (databaseName, tableName)
-    }
-  }
-
-  protected def processDatabaseAndTableName(
-      databaseName: String,
-      tableName: String): (String, String) = {
-    if (!caseSensitive) {
-      (databaseName.toLowerCase, tableName.toLowerCase)
-    } else {
-      (databaseName, tableName)
-    }
-  }
-
   /**
    * When scanning or writing to non-partitioned Metastore Parquet tables, convert them to Parquet
    * data source relations for better performance.
@@ -597,8 +538,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
               "It is not allowed to define a view with both IF NOT EXISTS and OR REPLACE.")
           }
 
-          val (dbName, tblName) = processDatabaseAndTableName(
-            table.specifiedDatabase.getOrElse(client.currentDatabase), table.name)
+          val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table)
 
           execution.CreateViewAsSelect(
             table.copy(
@@ -636,7 +576,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
           val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists
           CreateTableUsingAsSelect(
             TableIdentifier(desc.name),
-            hive.conf.defaultDataSourceName,
+            conf.defaultDataSourceName,
             temporary = false,
             Array.empty[String],
             mode,
@@ -652,9 +592,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
             table
           }
 
-          val (dbName, tblName) =
-            processDatabaseAndTableName(
-              desc.specifiedDatabase.getOrElse(client.currentDatabase), desc.name)
+          val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table)
 
           execution.CreateTableAsSelect(
             desc.copy(
@@ -712,7 +650,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
    * UNIMPLEMENTED: It needs to be decided how we will persist in-memory tables to the metastore.
    * For now, if this functionality is desired mix in the in-memory [[OverrideCatalog]].
    */
-  override def registerTable(tableIdentifier: Seq[String], plan: LogicalPlan): Unit = {
+  override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = {
     throw new UnsupportedOperationException
   }
 
@@ -720,7 +658,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
    * UNIMPLEMENTED: It needs to be decided how we will persist in-memory tables to the metastore.
    * For now, if this functionality is desired mix in the in-memory [[OverrideCatalog]].
    */
-  override def unregisterTable(tableIdentifier: Seq[String]): Unit = {
+  override def unregisterTable(tableIdent: TableIdentifier): Unit = {
     throw new UnsupportedOperationException
   }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 1d505019400bc9562d06eb1690b6cf01ff10cde9..d4ff5cc0f12a2ccf8b0348151d88be3e8b06fdd7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.{logical, _}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.trees.CurrentOrigin
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.ExplainCommand
 import org.apache.spark.sql.execution.datasources.DescribeCommand
 import org.apache.spark.sql.hive.HiveShim._
@@ -442,24 +443,12 @@ private[hive] object HiveQl extends Logging {
       throw new NotImplementedError(s"No parse rules for StructField:\n ${dumpTree(a).toString} ")
   }
 
-  protected def extractDbNameTableName(tableNameParts: Node): (Option[String], String) = {
-    val (db, tableName) =
-      tableNameParts.getChildren.asScala.map {
-        case Token(part, Nil) => cleanIdentifier(part)
-      } match {
-        case Seq(tableOnly) => (None, tableOnly)
-        case Seq(databaseName, table) => (Some(databaseName), table)
-      }
-
-    (db, tableName)
-  }
-
-  protected def extractTableIdent(tableNameParts: Node): Seq[String] = {
+  protected def extractTableIdent(tableNameParts: Node): TableIdentifier = {
     tableNameParts.getChildren.asScala.map {
       case Token(part, Nil) => cleanIdentifier(part)
     } match {
-      case Seq(tableOnly) => Seq(tableOnly)
-      case Seq(databaseName, table) => Seq(databaseName, table)
+      case Seq(tableOnly) => TableIdentifier(tableOnly)
+      case Seq(databaseName, table) => TableIdentifier(table, Some(databaseName))
       case other => sys.error("Hive only supports tables names like 'tableName' " +
         s"or 'databaseName.tableName', found '$other'")
     }
@@ -518,13 +507,13 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
       properties: Map[String, String],
       allowExist: Boolean,
       replace: Boolean): CreateViewAsSelect = {
-    val (db, viewName) = extractDbNameTableName(viewNameParts)
+    val TableIdentifier(viewName, dbName) = extractTableIdent(viewNameParts)
 
     val originalText = context.getTokenRewriteStream
       .toString(query.getTokenStartIndex, query.getTokenStopIndex)
 
     val tableDesc = HiveTable(
-      specifiedDatabase = db,
+      specifiedDatabase = dbName,
       name = viewName,
       schema = schema,
       partitionColumns = Seq.empty[HiveColumn],
@@ -611,7 +600,8 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
               case tableName =>
                 // It is describing a table with the format like "describe table".
                 DescribeCommand(
-                  UnresolvedRelation(Seq(tableName.getText), None), isExtended = extended.isDefined)
+                  UnresolvedRelation(TableIdentifier(tableName.getText), None),
+                  isExtended = extended.isDefined)
             }
           }
           // All other cases.
@@ -716,12 +706,12 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
             "TOK_TABLELOCATION",
             "TOK_TABLEPROPERTIES"),
           children)
-      val (db, tableName) = extractDbNameTableName(tableNameParts)
+      val TableIdentifier(tblName, dbName) = extractTableIdent(tableNameParts)
 
       // TODO add bucket support
       var tableDesc: HiveTable = HiveTable(
-        specifiedDatabase = db,
-        name = tableName,
+        specifiedDatabase = dbName,
+        name = tblName,
         schema = Seq.empty[HiveColumn],
         partitionColumns = Seq.empty[HiveColumn],
         properties = Map[String, String](),
@@ -1264,15 +1254,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
           nonAliasClauses)
       }
 
-      val tableIdent =
-        tableNameParts.getChildren.asScala.map {
-          case Token(part, Nil) => cleanIdentifier(part)
-        } match {
-          case Seq(tableOnly) => Seq(tableOnly)
-          case Seq(databaseName, table) => Seq(databaseName, table)
-          case other => sys.error("Hive only supports tables names like 'tableName' " +
-            s"or 'databaseName.tableName', found '$other'")
-      }
+      val tableIdent = extractTableIdent(tableNameParts)
       val alias = aliasClause.map { case Token(a, Nil) => cleanIdentifier(a) }
       val relation = UnresolvedRelation(tableIdent, alias)
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
index 8422287e177e50541cd3625c168e983b91d8da43..e72a60b42e653bb5295e04a19d4391b489206fb5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.hive.execution
 
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
 import org.apache.spark.sql.execution.RunnableCommand
 import org.apache.spark.sql.hive.client.{HiveColumn, HiveTable}
@@ -37,8 +38,7 @@ case class CreateTableAsSelect(
     allowExisting: Boolean)
   extends RunnableCommand {
 
-  def database: String = tableDesc.database
-  def tableName: String = tableDesc.name
+  val tableIdentifier = TableIdentifier(tableDesc.name, Some(tableDesc.database))
 
   override def children: Seq[LogicalPlan] = Seq(query)
 
@@ -72,18 +72,18 @@ case class CreateTableAsSelect(
       hiveContext.catalog.client.createTable(withSchema)
 
       // Get the Metastore Relation
-      hiveContext.catalog.lookupRelation(Seq(database, tableName), None) match {
+      hiveContext.catalog.lookupRelation(tableIdentifier, None) match {
         case r: MetastoreRelation => r
       }
     }
     // TODO ideally, we should get the output data ready first and then
     // add the relation into catalog, just in case of failure occurs while data
     // processing.
-    if (hiveContext.catalog.tableExists(Seq(database, tableName))) {
+    if (hiveContext.catalog.tableExists(tableIdentifier)) {
       if (allowExisting) {
         // table already exists, will do nothing, to keep consistent with Hive
       } else {
-        throw new AnalysisException(s"$database.$tableName already exists.")
+        throw new AnalysisException(s"$tableIdentifier already exists.")
       }
     } else {
       hiveContext.executePlan(InsertIntoTable(metastoreRelation, Map(), query, true, false)).toRdd
@@ -93,6 +93,6 @@ case class CreateTableAsSelect(
   }
 
   override def argString: String = {
-    s"[Database:$database, TableName: $tableName, InsertIntoHiveTable]"
+    s"[Database:${tableDesc.database}}, TableName: ${tableDesc.name}, InsertIntoHiveTable]"
   }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
index 2b504ac974f071433883b5531330c5e47e7d6e22..2c81115ee4fed9b1424bb818cb80081d75425f18 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.hive.execution
 
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.hive.{HiveMetastoreTypes, HiveContext}
 import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
@@ -38,18 +39,18 @@ private[hive] case class CreateViewAsSelect(
   assert(tableDesc.schema == Nil || tableDesc.schema.length == childSchema.length)
   assert(tableDesc.viewText.isDefined)
 
+  val tableIdentifier = TableIdentifier(tableDesc.name, Some(tableDesc.database))
+
   override def run(sqlContext: SQLContext): Seq[Row] = {
     val hiveContext = sqlContext.asInstanceOf[HiveContext]
-    val database = tableDesc.database
-    val viewName = tableDesc.name
 
-    if (hiveContext.catalog.tableExists(Seq(database, viewName))) {
+    if (hiveContext.catalog.tableExists(tableIdentifier)) {
       if (allowExisting) {
         // view already exists, will do nothing, to keep consistent with Hive
       } else if (orReplace) {
         hiveContext.catalog.client.alertView(prepareTable())
       } else {
-        throw new AnalysisException(s"View $database.$viewName already exists. " +
+        throw new AnalysisException(s"View $tableIdentifier already exists. " +
           "If you want to update the view definition, please use ALTER VIEW AS or " +
           "CREATE OR REPLACE VIEW AS")
       }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 51ec92afd06ed6d0ba6a518a4d1a3c8b3989f452..94210a5394f9b044f1b4657fe762cf6cfac12761 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -71,7 +71,7 @@ case class DropTable(
     }
     hiveContext.invalidateTable(tableName)
     hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName")
-    hiveContext.catalog.unregisterTable(Seq(tableName))
+    hiveContext.catalog.unregisterTable(TableIdentifier(tableName))
     Seq.empty[Row]
   }
 }
@@ -103,7 +103,6 @@ case class AddFile(path: String) extends RunnableCommand {
   }
 }
 
-// TODO: Use TableIdentifier instead of String for tableName (SPARK-10104).
 private[hive]
 case class CreateMetastoreDataSource(
     tableIdent: TableIdentifier,
@@ -131,7 +130,7 @@ case class CreateMetastoreDataSource(
     val tableName = tableIdent.unquotedString
     val hiveContext = sqlContext.asInstanceOf[HiveContext]
 
-    if (hiveContext.catalog.tableExists(tableIdent.toSeq)) {
+    if (hiveContext.catalog.tableExists(tableIdent)) {
       if (allowExisting) {
         return Seq.empty[Row]
       } else {
@@ -160,7 +159,6 @@ case class CreateMetastoreDataSource(
   }
 }
 
-// TODO: Use TableIdentifier instead of String for tableName (SPARK-10104).
 private[hive]
 case class CreateMetastoreDataSourceAsSelect(
     tableIdent: TableIdentifier,
@@ -198,7 +196,7 @@ case class CreateMetastoreDataSourceAsSelect(
       }
 
     var existingSchema = None: Option[StructType]
-    if (sqlContext.catalog.tableExists(tableIdent.toSeq)) {
+    if (sqlContext.catalog.tableExists(tableIdent)) {
       // Check if we need to throw an exception or just return.
       mode match {
         case SaveMode.ErrorIfExists =>
@@ -215,7 +213,7 @@ case class CreateMetastoreDataSourceAsSelect(
           val resolved = ResolvedDataSource(
             sqlContext, Some(query.schema.asNullable), partitionColumns, provider, optionsWithPath)
           val createdRelation = LogicalRelation(resolved.relation)
-          EliminateSubQueries(sqlContext.catalog.lookupRelation(tableIdent.toSeq)) match {
+          EliminateSubQueries(sqlContext.catalog.lookupRelation(tableIdent)) match {
             case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _) =>
               if (l.relation != createdRelation.relation) {
                 val errorDescription =
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index ff39ccb7c1ea53491be8fef34eeee2cdee4b5656..6883d305cbeade4da457892b94fec16978b31ba9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -191,7 +191,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
       // Make sure any test tables referenced are loaded.
       val referencedTables =
         describedTables ++
-        logical.collect { case UnresolvedRelation(tableIdent, _) => tableIdent.last }
+        logical.collect { case UnresolvedRelation(tableIdent, _) => tableIdent.table }
       val referencedTestTables = referencedTables.filter(testTables.contains)
       logDebug(s"Query references test tables: ${referencedTestTables.mkString(", ")}")
       referencedTestTables.foreach(loadTestTable)
diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
index c8d272794d10b1cd4b478b9068898fa054c5f036..8c4af1b8eaf449445f91f34d5aadd03cf8e06303 100644
--- a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
+++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
@@ -26,7 +26,6 @@ import java.util.Map;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.spark.sql.SaveMode;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -41,6 +40,8 @@ import org.apache.spark.sql.hive.test.TestHive$;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.catalyst.TableIdentifier;
 import org.apache.spark.util.Utils;
 
 public class JavaMetastoreDataSourcesSuite {
@@ -71,7 +72,8 @@ public class JavaMetastoreDataSourcesSuite {
     if (path.exists()) {
       path.delete();
     }
-    hiveManagedPath = new Path(sqlContext.catalog().hiveDefaultTableFilePath("javaSavedTable"));
+    hiveManagedPath = new Path(sqlContext.catalog().hiveDefaultTableFilePath(
+      new TableIdentifier("javaSavedTable")));
     fs = hiveManagedPath.getFileSystem(sc.hadoopConfiguration());
     if (fs.exists(hiveManagedPath)){
       fs.delete(hiveManagedPath, true);
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
index 579631df772b52eb35a4bcfa2d8ab626833165a7..183aca29cf98d47e56a80b65e5256013fe101d9e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.Row
 
@@ -31,14 +32,14 @@ class ListTablesSuite extends QueryTest with TestHiveSingleton with BeforeAndAft
 
   override def beforeAll(): Unit = {
     // The catalog in HiveContext is a case insensitive one.
-    catalog.registerTable(Seq("ListTablesSuiteTable"), df.logicalPlan)
+    catalog.registerTable(TableIdentifier("ListTablesSuiteTable"), df.logicalPlan)
     sql("CREATE TABLE HiveListTablesSuiteTable (key int, value string)")
     sql("CREATE DATABASE IF NOT EXISTS ListTablesSuiteDB")
     sql("CREATE TABLE ListTablesSuiteDB.HiveInDBListTablesSuiteTable (key int, value string)")
   }
 
   override def afterAll(): Unit = {
-    catalog.unregisterTable(Seq("ListTablesSuiteTable"))
+    catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
     sql("DROP TABLE IF EXISTS HiveListTablesSuiteTable")
     sql("DROP TABLE IF EXISTS ListTablesSuiteDB.HiveInDBListTablesSuiteTable")
     sql("DROP DATABASE IF EXISTS ListTablesSuiteDB")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index d3565380005a0a48392d03a3ed7cd7a6c83cb970..d2928876887bdcd1955b4841424aed5c5982a45a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
 import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.util.Utils
 
 /**
@@ -367,7 +368,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
            |)
          """.stripMargin)
 
-      val expectedPath = catalog.hiveDefaultTableFilePath("ctasJsonTable")
+      val expectedPath = catalog.hiveDefaultTableFilePath(TableIdentifier("ctasJsonTable"))
       val filesystemPath = new Path(expectedPath)
       val fs = filesystemPath.getFileSystem(sparkContext.hadoopConfiguration)
       if (fs.exists(filesystemPath)) fs.delete(filesystemPath, true)
@@ -472,7 +473,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
           // Drop table will also delete the data.
           sql("DROP TABLE savedJsonTable")
           intercept[IOException] {
-            read.json(catalog.hiveDefaultTableFilePath("savedJsonTable"))
+            read.json(catalog.hiveDefaultTableFilePath(TableIdentifier("savedJsonTable")))
           }
         }
 
@@ -703,7 +704,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
 
         // Manually create a metastore data source table.
         catalog.createDataSourceTable(
-          tableName = "wide_schema",
+          tableIdent = TableIdentifier("wide_schema"),
           userSpecifiedSchema = Some(schema),
           partitionColumns = Array.empty[String],
           provider = "json",
@@ -733,7 +734,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
           "EXTERNAL" -> "FALSE"),
         tableType = ManagedTable,
         serdeProperties = Map(
-          "path" -> catalog.hiveDefaultTableFilePath(tableName)))
+          "path" -> catalog.hiveDefaultTableFilePath(TableIdentifier(tableName))))
 
       catalog.client.createTable(hiveTable)
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 6a692d6fce5629708a8160a508f9db63adbe2cc5..9bb32f11b76bd2d4fce1a33d972041af3e3ae5b0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive
 import scala.reflect.ClassTag
 
 import org.apache.spark.sql.{Row, SQLConf, QueryTest}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.hive.execution._
 import org.apache.spark.sql.hive.test.TestHiveSingleton
@@ -68,7 +69,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton {
 
   test("analyze MetastoreRelations") {
     def queryTotalSize(tableName: String): BigInt =
-      hiveContext.catalog.lookupRelation(Seq(tableName)).statistics.sizeInBytes
+      hiveContext.catalog.lookupRelation(TableIdentifier(tableName)).statistics.sizeInBytes
 
     // Non-partitioned table
     sql("CREATE TABLE analyzeTable (key STRING, value STRING)").collect()
@@ -115,7 +116,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton {
     intercept[UnsupportedOperationException] {
       hiveContext.analyze("tempTable")
     }
-    hiveContext.catalog.unregisterTable(Seq("tempTable"))
+    hiveContext.catalog.unregisterTable(TableIdentifier("tempTable"))
   }
 
   test("estimates the size of a test MetastoreRelation") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 6aa34605b05a8ae37a257f66dc100afdaf32416c..c929ba50680bc55ea5b7ec6e97898341f51d6e5a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -22,7 +22,7 @@ import java.sql.{Date, Timestamp}
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.DefaultParserDialect
+import org.apache.spark.sql.catalyst.{TableIdentifier, DefaultParserDialect}
 import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, EliminateSubQueries}
 import org.apache.spark.sql.catalyst.errors.DialectException
 import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -266,7 +266,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
 
   test("CTAS without serde") {
     def checkRelation(tableName: String, isDataSourceParquet: Boolean): Unit = {
-      val relation = EliminateSubQueries(catalog.lookupRelation(Seq(tableName)))
+      val relation = EliminateSubQueries(catalog.lookupRelation(TableIdentifier(tableName)))
       relation match {
         case LogicalRelation(r: ParquetRelation, _) =>
           if (!isDataSourceParquet) {
@@ -723,7 +723,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     (1 to 100).par.map { i =>
       val tableName = s"SPARK_6618_table_$i"
       sql(s"CREATE TABLE $tableName (col1 string)")
-      catalog.lookupRelation(Seq(tableName))
+      catalog.lookupRelation(TableIdentifier(tableName))
       table(tableName)
       tables()
       sql(s"DROP TABLE $tableName")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index 5eb39b11297019e5c3e4b3b982fecd0b4cff6a9e..7efeab528c1dd2c02b518599dc4bb2b77d6a7636 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.hive.ql.io.orc.CompressionKind
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
 
@@ -218,7 +219,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
       sql("INSERT INTO TABLE t SELECT * FROM tmp")
       checkAnswer(table("t"), (data ++ data).map(Row.fromTuple))
     }
-    catalog.unregisterTable(Seq("tmp"))
+    catalog.unregisterTable(TableIdentifier("tmp"))
   }
 
   test("overwriting") {
@@ -228,7 +229,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
       sql("INSERT OVERWRITE TABLE t SELECT * FROM tmp")
       checkAnswer(table("t"), data.map(Row.fromTuple))
     }
-    catalog.unregisterTable(Seq("tmp"))
+    catalog.unregisterTable(TableIdentifier("tmp"))
   }
 
   test("self-join") {