diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 54365fd978ab932b2f0dc0ea317e1a440b43d061..19f8665383315667560578c62e2859c5edd5cd35 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -25,6 +25,7 @@ import org.scalatest.BeforeAndAfterEach
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
 
@@ -162,6 +163,16 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     assert(actual.tableType === CatalogTableType.EXTERNAL)
   }
 
+  test("create table when the table already exists") {
+    val catalog = newBasicCatalog()
+    assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
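+    // "tbl1" already exists in db2, so creating it again without ignoreIfExists must fail.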
+    val table = newTable("tbl1", "db2")
+    intercept[TableAlreadyExistsException] {
+      catalog.createTable(table, ignoreIfExists = false)
+    }
+  }
+
   test("drop table") {
     val catalog = newBasicCatalog()
     assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 7f50e38d30c9ac96b90d0963b4a0da8a2ea0cf17..ed87ac3c3e634dcfa8c043a97cb2d5542c7a35bc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -30,6 +30,7 @@ import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.CaseInsensitiveMap
@@ -171,9 +172,14 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       ignoreIfExists: Boolean): Unit = withClient {
     assert(tableDefinition.identifier.database.isDefined)
     val db = tableDefinition.identifier.database.get
+    val table = tableDefinition.identifier.table
     requireDbExists(db)
     verifyTableProperties(tableDefinition)
 
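+    // Fail fast when the table already exists and ignoreIfExists is false.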
+    if (tableExists(db, table) && !ignoreIfExists) {
+      throw new TableAlreadyExistsException(db = db, table = table)
+    }
     // Before saving data source table metadata into Hive metastore, we should:
     //  1. Put table schema, partition column names and bucket specification in table properties.
     //  2. Check if this table is hive compatible
@@ -450,7 +455,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   }
 
   override def tableExists(db: String, table: String): Boolean = withClient {
-    client.getTableOption(db, table).isDefined
+    client.tableExists(db, table)
   }
 
   override def listTables(db: String): Seq[String] = withClient {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index 6f009d714bef4329fcc80bd3cc9185f5ebf40b31..dc74fa257aa4d1b8ca4b0720a85fbeeb3fc8e102 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -68,6 +68,9 @@ private[hive] trait HiveClient {
   /** List the names of all the databases that match the specified pattern. */
   def listDatabases(pattern: String): Seq[String]
 
+  /** Returns whether a table/view with the specified name exists. */
+  def tableExists(dbName: String, tableName: String): Boolean
+
   /** Returns the specified table, or throws [[NoSuchTableException]]. */
   final def getTable(dbName: String, tableName: String): CatalogTable = {
     getTableOption(dbName, tableName).getOrElse(throw new NoSuchTableException(dbName, tableName))
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index b45ad30dcae410914081b16728e5e645d9a17a5a..dd982192a383726f7070cac890852bbca83ca5f0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -347,6 +347,11 @@ private[hive] class HiveClientImpl(
     client.getDatabasesByPattern(pattern).asScala
   }
 
+  override def tableExists(dbName: String, tableName: String): Boolean = withHiveState {
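+    // Hive's getTable returns null instead of throwing when the table does not exist.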
+    Option(client.getTable(dbName, tableName, false /* do not throw exception */)).nonEmpty
+  }
+
   override def getTableOption(
       dbName: String,
       tableName: String): Option[CatalogTable] = withHiveState {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index a2509f2a75f4746109f6f3bb33472617d8dbdb81..10b6cd102416cf66feb4e024e29cb9bc87163bb9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -218,6 +218,12 @@ class VersionsSuite extends SparkFunSuite with Logging {
         holdDDLTime = false)
     }
 
+    test(s"$version: tableExists") {
+      // tableExists should not throw; it returns false when the table is missing
+      assert(client.tableExists("default", "src"))
+      assert(!client.tableExists("default", "nonexistent"))
+    }
+
     test(s"$version: getTable") {
       // No exception should be thrown
       client.getTable("default", "src")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 9019333d76869a2c0dcb61066481a3b6d6b0097f..58c43ebcae6fc7f7a9811dbff0a910d13c806020 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -24,8 +24,10 @@ import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.internal.config._
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTableType}
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
@@ -675,6 +677,38 @@ class HiveDDLSuite
     }
   }
 
+  test("create table with the same name as an index table") {
+    val tabName = "tab1"
+    val indexName = tabName + "_index"
+    withTable(tabName) {
+      // Spark SQL does not support creating indexes, so we have to use the Hive client.
+      val client = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+      sql(s"CREATE TABLE $tabName(a int)")
+
+      try {
+        client.runSqlHive(
+          s"CREATE INDEX $indexName ON TABLE $tabName (a) AS 'COMPACT' WITH DEFERRED REBUILD")
+        val indexTabName =
+          spark.sessionState.catalog.listTables("default", s"*$indexName*").head.table
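+        // tableExists does not convert the table metadata, so it can detect the index table.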
+        intercept[TableAlreadyExistsException] {
+          sql(s"CREATE TABLE $indexTabName(b int)")
+        }
+        intercept[TableAlreadyExistsException] {
+          sql(s"ALTER TABLE $tabName RENAME TO $indexTabName")
+        }
+
+        // When tableExists is not invoked, we can still get an AnalysisException
+        val e = intercept[AnalysisException] {
+          sql(s"DESCRIBE $indexTabName")
+        }.getMessage
+        assert(e.contains("Hive index table is not supported."))
+      } finally {
+        client.runSqlHive(s"DROP INDEX IF EXISTS $indexName ON $tabName")
+      }
+    }
+  }
+
   test("desc table for data source table - no user-defined schema") {
     Seq("parquet", "json", "orc").foreach { fileFormat =>
       withTable("t1") {