diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 19a8fcdd8b75b99a36683ff6dab9725681b43c8e..002aecb9bf1330fc378d8eaf722153feae0142cc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -85,6 +85,21 @@ class SessionCatalog(
   @GuardedBy("this")
   protected var currentDb = formatDatabaseName(DEFAULT_DATABASE)
 
+  /**
+   * Checks if the given name conforms the Hive standard ("[a-zA-z_0-9]+"),
+   * i.e. if this name only contains characters, numbers, and _.
+   *
+   * This method is intended to have the same behavior of
+   * org.apache.hadoop.hive.metastore.MetaStoreUtils.validateName.
+   */
+  private def validateName(name: String): Unit = {
+    val validNameFormat = "([\\w_]+)".r
+    if (!validNameFormat.pattern.matcher(name).matches()) {
+      throw new AnalysisException(s"`$name` is not a valid name for tables/databases. " +
+        "Valid names only contain alphabet characters, numbers and _.")
+    }
+  }
+
   /**
    * Format table name, taking into account case sensitivity.
    */
@@ -143,6 +158,7 @@ class SessionCatalog(
         s"${globalTempViewManager.database} is a system preserved database, " +
           "you cannot create a database with this name.")
     }
+    validateName(dbName)
     val qualifiedPath = makeQualifiedPath(dbDefinition.locationUri).toString
     externalCatalog.createDatabase(
       dbDefinition.copy(name = dbName, locationUri = qualifiedPath),
@@ -226,6 +242,7 @@ class SessionCatalog(
   def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
     val db = formatDatabaseName(tableDefinition.identifier.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(tableDefinition.identifier.table)
+    validateName(table)
     val newTableDefinition = tableDefinition.copy(identifier = TableIdentifier(table, Some(db)))
     requireDbExists(db)
     externalCatalog.createTable(newTableDefinition, ignoreIfExists)
@@ -474,6 +491,7 @@ class SessionCatalog(
       if (oldName.database.isDefined || !tempTables.contains(oldTableName)) {
         requireTableExists(TableIdentifier(oldTableName, Some(db)))
         requireTableNotExists(TableIdentifier(newTableName, Some(db)))
+        validateName(newTableName)
         externalCatalog.renameTable(db, oldTableName, newTableName)
       } else {
         if (newName.database.isDefined) {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 52385de50db6bd9578021b8589c0f322419214d3..da41d3614b78411ce3713a5a04b4ed46c63c6383 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -61,6 +61,22 @@ class SessionCatalogSuite extends SparkFunSuite {
     assert(!catalog.databaseExists("does_not_exist"))
   }
 
+  def testInvalidName(func: (String) => Unit) {
+    // scalastyle:off
+    // non ascii characters are not allowed in the source code, so we disable the scalastyle.
+    val name = "ç –"
+    // scalastyle:on
+    val e = intercept[AnalysisException] {
+      func(name)
+    }.getMessage
+    assert(e.contains(s"`$name` is not a valid name for tables/databases."))
+  }
+
+  test("create databases using invalid names") {
+    val catalog = new SessionCatalog(newEmptyCatalog())
+    testInvalidName(name => catalog.createDatabase(newDb(name), ignoreIfExists = true))
+  }
+
   test("get database when a database exists") {
     val catalog = new SessionCatalog(newBasicCatalog())
     val db1 = catalog.getDatabaseMetadata("db1")
@@ -194,6 +210,11 @@ class SessionCatalogSuite extends SparkFunSuite {
     assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2", "tbl3"))
   }
 
+  test("create tables using invalid names") {
+    val catalog = new SessionCatalog(newEmptyCatalog())
+    testInvalidName(name => catalog.createTable(newTable(name, "db1"), ignoreIfExists = false))
+  }
+
   test("create table when database does not exist") {
     val catalog = new SessionCatalog(newBasicCatalog())
     // Creating table in non-existent database should always fail
@@ -309,6 +330,12 @@ class SessionCatalogSuite extends SparkFunSuite {
     }
   }
 
+  test("rename tables to an invalid name") {
+    val catalog = new SessionCatalog(newBasicCatalog())
+    testInvalidName(
+      name => catalog.renameTable(TableIdentifier("tbl1", Some("db2")), TableIdentifier(name)))
+  }
+
   test("rename table when database/table does not exist") {
     val catalog = new SessionCatalog(newBasicCatalog())
     intercept[NoSuchDatabaseException] {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 5ba44ff9f5d9d3d55a968ca333e9ddbed0203bb3..7154e3e41c93bf4dcc3a02bf24ef2a499f5949f7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -309,24 +309,9 @@ case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
 
   def failAnalysis(msg: String): Unit = { throw new AnalysisException(msg) }
 
-  // This regex is used to check if the table name and database name is valid for `CreateTable`.
-  private val validNameFormat = Pattern.compile("[\\w_]+")
-
   def apply(plan: LogicalPlan): Unit = {
     plan.foreach {
       case c @ CreateTable(tableDesc, mode, query) if c.resolved =>
-        // Since we are saving table metadata to metastore, we should make sure the table name
-        // and database name don't break some common restrictions, e.g. special chars except
-        // underscore are not allowed.
-        val tblIdent = tableDesc.identifier
-        if (!validNameFormat.matcher(tblIdent.table).matches()) {
-          failAnalysis(s"Table name ${tblIdent.table} is not a valid name for " +
-            s"metastore. Metastore only accepts table name containing characters, numbers and _.")
-        }
-        if (tblIdent.database.exists(db => !validNameFormat.matcher(db).matches())) {
-          failAnalysis(s"Database name ${tblIdent.database.get} is not a valid name for " +
-            s"metastore. Metastore only accepts table name containing characters, numbers and _.")
-        }
         if (query.isDefined &&
           mode == SaveMode.Overwrite &&
           catalog.tableExists(tableDesc.identifier)) {
@@ -334,7 +319,7 @@ case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
           EliminateSubqueryAliases(catalog.lookupRelation(tableDesc.identifier)) match {
             // Only do the check if the table is a data source table
             // (the relation is a BaseRelation).
-            case l @ LogicalRelation(dest: BaseRelation, _, _) =>
+            case LogicalRelation(dest: BaseRelation, _, _) =>
               // Get all input data source relations of the query.
               val srcRelations = query.get.collect {
                 case LogicalRelation(src: BaseRelation, _, _) => src
@@ -347,9 +332,8 @@ case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
           }
         }
 
-      case i @ logical.InsertIntoTable(
-        l @ LogicalRelation(t: InsertableRelation, _, _),
-        partition, query, overwrite, ifNotExists) =>
+      case logical.InsertIntoTable(
+          l @ LogicalRelation(t: InsertableRelation, _, _), partition, query, _, _) =>
         // Right now, we do not support insert into a data source table with partition specs.
         if (partition.nonEmpty) {
           failAnalysis(s"Insert into a partition is not allowed because $l is not partitioned.")
@@ -367,15 +351,15 @@ case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
         }
 
       case logical.InsertIntoTable(
-        LogicalRelation(r: HadoopFsRelation, _, _), part, query, overwrite, _) =>
+        LogicalRelation(r: HadoopFsRelation, _, _), part, query, _, _) =>
         // We need to make sure the partition columns specified by users do match partition
         // columns of the relation.
         val existingPartitionColumns = r.partitionSchema.fieldNames.toSet
         val specifiedPartitionColumns = part.keySet
         if (existingPartitionColumns != specifiedPartitionColumns) {
-          failAnalysis(s"Specified partition columns " +
+          failAnalysis("Specified partition columns " +
             s"(${specifiedPartitionColumns.mkString(", ")}) " +
-            s"do not match the partition columns of the table. Please use " +
+            "do not match the partition columns of the table. Please use " +
             s"(${existingPartitionColumns.mkString(", ")}) as the partition columns.")
         } else {
           // OK
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
index 9f4401ae225606fe72ef847e91f61704b58dc6cc..73224651092f60a05e67f445fb1198c44873ee21 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
@@ -269,17 +269,17 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
       val message = intercept[AnalysisException] {
         df.write.format("parquet").saveAsTable("`d:b`.`t:a`")
       }.getMessage
-      assert(message.contains("is not a valid name for metastore"))
+      assert(message.contains("Database 'd:b' not found"))
     }
 
     {
       val message = intercept[AnalysisException] {
         df.write.format("parquet").saveAsTable("`d:b`.`table`")
       }.getMessage
-      assert(message.contains("is not a valid name for metastore"))
+      assert(message.contains("Database 'd:b' not found"))
     }
 
-    withTempPath { dir =>
+    withTempDir { dir =>
       val path = dir.getCanonicalPath
 
       {
@@ -293,7 +293,8 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
             |)
             """.stripMargin)
         }.getMessage
-        assert(message.contains("is not a valid name for metastore"))
+        assert(message.contains("`t:a` is not a valid name for tables/databases. " +
+          "Valid names only contain alphabet characters, numbers and _."))
       }
 
       {
@@ -307,7 +308,7 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
               |)
               """.stripMargin)
         }.getMessage
-        assert(message.contains("is not a valid name for metastore"))
+        assert(message.contains("Database 'd:b' not found"))
       }
     }
   }