From 1e8fbefa3b61e2deb3dc7d7d3467e4cec69e54ce Mon Sep 17 00:00:00 2001
From: gatorsmile <gatorsmile@gmail.com>
Date: Sun, 27 Nov 2016 19:43:24 -0800
Subject: [PATCH] [SPARK-18594][SQL] Name Validation of Databases/Tables

### What changes were proposed in this pull request?
Currently, the name validation checks are limited to table creation; they are enforced by the Analyzer rule `PreWriteCheck`. However, table renaming and database creation have the same issues. It makes more sense to do the checks in `SessionCatalog`, so this PR adds them there.

### How was this patch tested?
Added test cases

Author: gatorsmile <gatorsmile@gmail.com>

Closes #16018 from gatorsmile/nameValidate.

(cherry picked from commit 07f32c2283e26e86474ba8c9b50125831009a1ea)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
---
 .../sql/catalyst/catalog/SessionCatalog.scala      | 18 ++++++++++++
 .../catalog/SessionCatalogSuite.scala              | 27 ++++++++++++++++++
 .../sql/execution/datasources/rules.scala          | 28 ++++---------------
 .../spark/sql/hive/MultiDatabaseSuite.scala        | 11 ++++----
 4 files changed, 57 insertions(+), 27 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 19a8fcdd8b..002aecb9bf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -85,6 +85,21 @@ class SessionCatalog(
   @GuardedBy("this")
   protected var currentDb = formatDatabaseName(DEFAULT_DATABASE)
 
+  /**
+   * Checks if the given name conforms to the Hive standard ("[a-zA-Z_0-9]+"),
+   * i.e. if this name only contains letters, digits, and _.
+   *
+   * This method is intended to have the same behavior as
+   * org.apache.hadoop.hive.metastore.MetaStoreUtils.validateName.
+   */
+  private def validateName(name: String): Unit = {
+    val validNameFormat = "([\\w_]+)".r
+    if (!validNameFormat.pattern.matcher(name).matches()) {
+      throw new AnalysisException(s"`$name` is not a valid name for tables/databases. " +
+        "Valid names only contain alphabet characters, numbers and _.")
+    }
+  }
+
   /**
    * Format table name, taking into account case sensitivity.
   */
@@ -143,6 +158,7 @@ class SessionCatalog(
         s"${globalTempViewManager.database} is a system preserved database, " +
           "you cannot create a database with this name.")
     }
+    validateName(dbName)
     val qualifiedPath = makeQualifiedPath(dbDefinition.locationUri).toString
     externalCatalog.createDatabase(
       dbDefinition.copy(name = dbName, locationUri = qualifiedPath),
@@ -226,6 +242,7 @@ class SessionCatalog(
   def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
     val db = formatDatabaseName(tableDefinition.identifier.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(tableDefinition.identifier.table)
+    validateName(table)
     val newTableDefinition = tableDefinition.copy(identifier = TableIdentifier(table, Some(db)))
     requireDbExists(db)
     externalCatalog.createTable(newTableDefinition, ignoreIfExists)
@@ -474,6 +491,7 @@ class SessionCatalog(
     if (oldName.database.isDefined || !tempTables.contains(oldTableName)) {
       requireTableExists(TableIdentifier(oldTableName, Some(db)))
       requireTableNotExists(TableIdentifier(newTableName, Some(db)))
+      validateName(newTableName)
       externalCatalog.renameTable(db, oldTableName, newTableName)
     } else {
       if (newName.database.isDefined) {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 52385de50d..da41d3614b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -61,6 +61,22 @@ class SessionCatalogSuite extends SparkFunSuite {
     assert(!catalog.databaseExists("does_not_exist"))
   }
 
+  def testInvalidName(func: (String) => Unit) {
+    // scalastyle:off
+    // Non-ASCII characters are not allowed in the source code, so we disable scalastyle here.
+    val name = "砖"
+    // scalastyle:on
+    val e = intercept[AnalysisException] {
+      func(name)
+    }.getMessage
+    assert(e.contains(s"`$name` is not a valid name for tables/databases."))
+  }
+
+  test("create databases using invalid names") {
+    val catalog = new SessionCatalog(newEmptyCatalog())
+    testInvalidName(name => catalog.createDatabase(newDb(name), ignoreIfExists = true))
+  }
+
   test("get database when a database exists") {
     val catalog = new SessionCatalog(newBasicCatalog())
     val db1 = catalog.getDatabaseMetadata("db1")
@@ -194,6 +210,11 @@ class SessionCatalogSuite extends SparkFunSuite {
     assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2", "tbl3"))
   }
 
+  test("create tables using invalid names") {
+    val catalog = new SessionCatalog(newEmptyCatalog())
+    testInvalidName(name => catalog.createTable(newTable(name, "db1"), ignoreIfExists = false))
+  }
+
   test("create table when database does not exist") {
     val catalog = new SessionCatalog(newBasicCatalog())
     // Creating table in non-existent database should always fail
@@ -309,6 +330,12 @@ class SessionCatalogSuite extends SparkFunSuite {
     }
   }
 
+  test("rename tables to an invalid name") {
+    val catalog = new SessionCatalog(newBasicCatalog())
+    testInvalidName(
+      name => catalog.renameTable(TableIdentifier("tbl1", Some("db2")), TableIdentifier(name)))
+  }
+
   test("rename table when database/table does not exist") {
     val catalog = new SessionCatalog(newBasicCatalog())
     intercept[NoSuchDatabaseException] {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 5ba44ff9f5..7154e3e41c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -309,24 +309,9 @@ case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
 
   def failAnalysis(msg: String): Unit = { throw new AnalysisException(msg) }
 
-  // This regex is used to check if the table name and database name is valid for `CreateTable`.
-  private val validNameFormat = Pattern.compile("[\\w_]+")
-
   def apply(plan: LogicalPlan): Unit = {
     plan.foreach {
       case c @ CreateTable(tableDesc, mode, query) if c.resolved =>
-        // Since we are saving table metadata to metastore, we should make sure the table name
-        // and database name don't break some common restrictions, e.g. special chars except
-        // underscore are not allowed.
-        val tblIdent = tableDesc.identifier
-        if (!validNameFormat.matcher(tblIdent.table).matches()) {
-          failAnalysis(s"Table name ${tblIdent.table} is not a valid name for " +
-            s"metastore. Metastore only accepts table name containing characters, numbers and _.")
-        }
-        if (tblIdent.database.exists(db => !validNameFormat.matcher(db).matches())) {
-          failAnalysis(s"Database name ${tblIdent.database.get} is not a valid name for " +
-            s"metastore. Metastore only accepts table name containing characters, numbers and _.")
-        }
         if (query.isDefined &&
           mode == SaveMode.Overwrite &&
           catalog.tableExists(tableDesc.identifier)) {
@@ -334,7 +319,7 @@ case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
           EliminateSubqueryAliases(catalog.lookupRelation(tableDesc.identifier)) match {
             // Only do the check if the table is a data source table
             // (the relation is a BaseRelation).
-            case l @ LogicalRelation(dest: BaseRelation, _, _) =>
+            case LogicalRelation(dest: BaseRelation, _, _) =>
               // Get all input data source relations of the query.
               val srcRelations = query.get.collect {
                 case LogicalRelation(src: BaseRelation, _, _) => src
@@ -347,9 +332,8 @@ case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
           }
         }
 
-      case i @ logical.InsertIntoTable(
-        l @ LogicalRelation(t: InsertableRelation, _, _),
-        partition, query, overwrite, ifNotExists) =>
+      case logical.InsertIntoTable(
+        l @ LogicalRelation(t: InsertableRelation, _, _), partition, query, _, _) =>
         // Right now, we do not support insert into a data source table with partition specs.
         if (partition.nonEmpty) {
           failAnalysis(s"Insert into a partition is not allowed because $l is not partitioned.")
@@ -367,15 +351,15 @@ case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
         }
 
       case logical.InsertIntoTable(
-        LogicalRelation(r: HadoopFsRelation, _, _), part, query, overwrite, _) =>
+        LogicalRelation(r: HadoopFsRelation, _, _), part, query, _, _) =>
         // We need to make sure the partition columns specified by users do match partition
         // columns of the relation.
         val existingPartitionColumns = r.partitionSchema.fieldNames.toSet
         val specifiedPartitionColumns = part.keySet
         if (existingPartitionColumns != specifiedPartitionColumns) {
-          failAnalysis(s"Specified partition columns " +
+          failAnalysis("Specified partition columns " +
             s"(${specifiedPartitionColumns.mkString(", ")}) " +
-            s"do not match the partition columns of the table. Please use " +
+            "do not match the partition columns of the table. Please use " +
             s"(${existingPartitionColumns.mkString(", ")}) as the partition columns.")
         } else {
           // OK
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
index 9f4401ae22..7322465109 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
@@ -269,17 +269,17 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
       val message = intercept[AnalysisException] {
         df.write.format("parquet").saveAsTable("`d:b`.`t:a`")
       }.getMessage
-      assert(message.contains("is not a valid name for metastore"))
+      assert(message.contains("Database 'd:b' not found"))
     }
 
     {
       val message = intercept[AnalysisException] {
         df.write.format("parquet").saveAsTable("`d:b`.`table`")
       }.getMessage
-      assert(message.contains("is not a valid name for metastore"))
+      assert(message.contains("Database 'd:b' not found"))
     }
 
-    withTempPath { dir =>
+    withTempDir { dir =>
       val path = dir.getCanonicalPath
 
       {
@@ -293,7 +293,8 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
            |)
          """.stripMargin)
       }.getMessage
-      assert(message.contains("is not a valid name for metastore"))
+      assert(message.contains("`t:a` is not a valid name for tables/databases. " +
+        "Valid names only contain alphabet characters, numbers and _."))
     }
 
     {
@@ -307,7 +308,7 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
            |)
          """.stripMargin)
       }.getMessage
-      assert(message.contains("is not a valid name for metastore"))
+      assert(message.contains("Database 'd:b' not found"))
     }
   }
 }
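
For reference, below is a minimal standalone sketch of the name check this patch centralizes in `SessionCatalog.validateName`. It is not part of the commit: only the regex mirrors the patch, while the object name, the `isValidName` helper, and the sample inputs are assumptions made for illustration.

```scala
// Standalone sketch: NameValidationSketch and isValidName are assumed names,
// not Spark APIs. The regex is the one the patch adds to SessionCatalog.
// Note: in Java/Scala regexes, \w is ASCII-only by default ([a-zA-Z0-9_]),
// so the extra `_` in the character class and the capturing group are
// redundant but harmless; they are kept here to mirror the patch verbatim.
object NameValidationSketch {
  private val validNameFormat = "([\\w_]+)".r

  def isValidName(name: String): Boolean =
    validNameFormat.pattern.matcher(name).matches()

  def main(args: Array[String]): Unit = {
    println(isValidName("my_table_1")) // true
    println(isValidName("t:a"))        // false: ':' is rejected
    println(isValidName("砖"))         // false: non-ASCII, as in the new tests
  }
}
```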