From d5ec5dbb0dc0358b0394626c80781e422f9af581 Mon Sep 17 00:00:00 2001
From: gatorsmile <gatorsmile@gmail.com>
Date: Tue, 20 Sep 2016 20:11:48 +0800
Subject: [PATCH] [SPARK-17502][SQL] Fix Multiple Bugs in DDL Statements on
 Temporary Views

### What changes were proposed in this pull request?
- When a permanent table/view does not exist but a temporary view with the same name does, partition-related ALTER TABLE commands should fail with `NoSuchTableException`. Instead, they report a confusing error message. For example,
```
Partition spec is invalid. The spec (a, b) must match the partition spec () defined in table '`testview`';
```
- Under the same conditions, `ALTER TABLE ... UNSET TBLPROPERTIES` should also fail with `NoSuchTableException`. Instead, it complains about a non-existent table property. For example,
```
Attempted to unset non-existent property 'p' in table '`testView`';
```
- When `ANALYZE TABLE` is called on a view or a temporary view, we should issue a clear error message. Instead, it reports a strange one:
```
ANALYZE TABLE is not supported for Project
```
- When inserting into a temporary view that is generated from `Range`, we get the following error message:
```
assertion failed: No plan for 'InsertIntoTable Range (0, 10, step=1, splits=Some(1)), false, false
+- Project [1 AS 1#20]
   +- OneRowRelation$
```

This PR fixes the above four issues.

### How was this patch tested?
Added multiple test cases.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #15054 from gatorsmile/tempViewDDL.
---
 .../sql/catalyst/analysis/CheckAnalysis.scala |   1 +
 .../sql/catalyst/catalog/SessionCatalog.scala |  53 ++++----
 .../catalog/SessionCatalogSuite.scala         |  21 +---
 .../command/AnalyzeTableCommand.scala         |   5 +-
 .../spark/sql/execution/command/ddl.scala     |  30 ++---
 .../spark/sql/execution/command/tables.scala  | 113 ++++++++----------
 .../spark/sql/internal/CatalogImpl.scala      |   7 +-
 .../sql/execution/command/DDLSuite.scala      |   4 +-
 .../sql/hive/execution/HiveCommandSuite.scala |  17 ++-
 .../sql/hive/execution/HiveDDLSuite.scala     |   6 +-
 .../sql/hive/execution/SQLViewSuite.scala     |  63 ++++++----
 11 files changed, 164 insertions(+), 156 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index e07e9194be..9c06069f24 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -360,6 +360,7 @@ trait CheckAnalysis extends PredicateHelper {
         case InsertIntoTable(t, _, _, _, _)
           if !t.isInstanceOf[LeafNode] ||
+            t.isInstanceOf[Range] ||
             t == OneRowRelation ||
             t.isInstanceOf[LocalRelation] =>
           failAnalysis(s"Inserting into an RDD-based table is not allowed.")
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 574c3d7eee..ef29c75c01 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -246,27 +246,16 @@ class SessionCatalog(
   }
 
   /**
-   * Retrieve the metadata of an existing metastore table.
-   * If no database is specified, assume the table is in the current database.
-   * If the specified table is not found in the database then a [[NoSuchTableException]] is thrown.
+   * Retrieve the metadata of an existing permanent table/view. If no database is specified,
+   * assume the table/view is in the current database. If the specified table/view is not found
+   * in the database then a [[NoSuchTableException]] is thrown.
    */
   def getTableMetadata(name: TableIdentifier): CatalogTable = {
     val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(name.table)
-    val tid = TableIdentifier(table)
-    if (isTemporaryTable(name)) {
-      CatalogTable(
-        identifier = tid,
-        tableType = CatalogTableType.VIEW,
-        storage = CatalogStorageFormat.empty,
-        schema = tempTables(table).output.toStructType,
-        properties = Map(),
-        viewText = None)
-    } else {
-      requireDbExists(db)
-      requireTableExists(TableIdentifier(table, Some(db)))
-      externalCatalog.getTable(db, table)
-    }
+    requireDbExists(db)
+    requireTableExists(TableIdentifier(table, Some(db)))
+    externalCatalog.getTable(db, table)
   }
 
   /**
@@ -281,6 +270,24 @@ class SessionCatalog(
     externalCatalog.getTableOption(db, table)
   }
 
+  /**
+   * Retrieve the metadata of an existing temporary view or permanent table/view.
+   * If the temporary view does not exist, tries to get the metadata of an existing permanent
+   * table/view. If no database is specified, assume the table/view is in the current database.
+   * If the specified table/view is not found in the database then a [[NoSuchTableException]] is
+   * thrown.
+   */
+  def getTempViewOrPermanentTableMetadata(name: String): CatalogTable = synchronized {
+    val table = formatTableName(name)
+    getTempView(table).map { plan =>
+      CatalogTable(
+        identifier = TableIdentifier(table),
+        tableType = CatalogTableType.VIEW,
+        storage = CatalogStorageFormat.empty,
+        schema = plan.output.toStructType)
+    }.getOrElse(getTableMetadata(TableIdentifier(name)))
+  }
+
   /**
    * Load files stored in given path into an existing metastore table.
    * If no database is specified, assume the table is in the current database.
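For readers tracing the new lookup path, here is a minimal illustrative sketch of the intended resolution order (not part of the patch; it assumes a `SparkSession` named `spark`, code compiled in a scope where the internal `sessionState` API is accessible, and no permanent table named `v` in the `default` database):

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException

spark.range(10).createTempView("v")
val catalog = spark.sessionState.catalog

// An unqualified name checks the temporary-view registry first; VIEW metadata
// is synthesized from the plan's output schema, as in the method above.
assert(catalog.getTempViewOrPermanentTableMetadata("v").identifier == TableIdentifier("v"))

// A database-qualified lookup never consults temporary views; after this patch
// getTableMetadata fails fast instead of fabricating metadata for the temp view.
try catalog.getTableMetadata(TableIdentifier("v", Some("default")))
catch { case _: NoSuchTableException => () } // expected: `default`.`v` does not exist
```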
@@ -530,11 +537,11 @@ class SessionCatalog(
       tableName: TableIdentifier,
       parts: Seq[CatalogTablePartition],
       ignoreIfExists: Boolean): Unit = {
-    requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName))
     val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(tableName.table)
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Option(db)))
+    requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName))
     externalCatalog.createPartitions(db, table, parts, ignoreIfExists)
   }
 
@@ -547,11 +554,11 @@ class SessionCatalog(
       specs: Seq[TablePartitionSpec],
       ignoreIfNotExists: Boolean,
       purge: Boolean): Unit = {
-    requirePartialMatchedPartitionSpec(specs, getTableMetadata(tableName))
     val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(tableName.table)
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Option(db)))
+    requirePartialMatchedPartitionSpec(specs, getTableMetadata(tableName))
     externalCatalog.dropPartitions(db, table, specs, ignoreIfNotExists, purge)
   }
 
@@ -566,12 +573,12 @@ class SessionCatalog(
       specs: Seq[TablePartitionSpec],
       newSpecs: Seq[TablePartitionSpec]): Unit = {
     val tableMetadata = getTableMetadata(tableName)
-    requireExactMatchedPartitionSpec(specs, tableMetadata)
-    requireExactMatchedPartitionSpec(newSpecs, tableMetadata)
     val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(tableName.table)
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Option(db)))
+    requireExactMatchedPartitionSpec(specs, tableMetadata)
+    requireExactMatchedPartitionSpec(newSpecs, tableMetadata)
     externalCatalog.renamePartitions(db, table, specs, newSpecs)
   }
 
@@ -585,11 +592,11 @@ class SessionCatalog(
    * this becomes a no-op.
    */
   def alterPartitions(tableName: TableIdentifier, parts: Seq[CatalogTablePartition]): Unit = {
-    requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName))
     val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(tableName.table)
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Option(db)))
+    requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName))
     externalCatalog.alterPartitions(db, table, parts)
   }
 
@@ -598,11 +605,11 @@
    * If no database is specified, assume the table is in the current database.
    */
   def getPartition(tableName: TableIdentifier, spec: TablePartitionSpec): CatalogTablePartition = {
-    requireExactMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName))
     val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(tableName.table)
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Option(db)))
+    requireExactMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName))
     externalCatalog.getPartition(db, table, spec)
   }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 84b77ad250..384a730861 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -444,27 +444,16 @@ class SessionCatalogSuite extends SparkFunSuite {
     assert(!catalog.tableExists(TableIdentifier("view1", Some("default"))))
   }
 
-  test("getTableMetadata on temporary views") {
+  test("getTempViewOrPermanentTableMetadata on temporary views") {
     val catalog = new SessionCatalog(newBasicCatalog())
     val tempTable = Range(1, 10, 2, 10)
-    val m = intercept[AnalysisException] {
-      catalog.getTableMetadata(TableIdentifier("view1"))
-    }.getMessage
-    assert(m.contains("Table or view 'view1' not found in database 'default'"))
-
-    val m2 = intercept[AnalysisException] {
-      catalog.getTableMetadata(TableIdentifier("view1", Some("default")))
+    intercept[NoSuchTableException] {
+      catalog.getTempViewOrPermanentTableMetadata("view1")
     }.getMessage
-    assert(m2.contains("Table or view 'view1' not found in database 'default'"))
 
     catalog.createTempView("view1", tempTable, overrideIfExists = false)
-    assert(catalog.getTableMetadata(TableIdentifier("view1")).identifier.table == "view1")
-    assert(catalog.getTableMetadata(TableIdentifier("view1")).schema(0).name == "id")
-
-    val m3 = intercept[AnalysisException] {
-      catalog.getTableMetadata(TableIdentifier("view1", Some("default")))
-    }.getMessage
-    assert(m3.contains("Table or view 'view1' not found in database 'default'"))
+    assert(catalog.getTempViewOrPermanentTableMetadata("view1").identifier ==
+      TableIdentifier("view1"), "the temporary view `view1` should exist")
   }
 
   test("list tables without pattern") {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
index 15687ddd72..40aecafecf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
@@ -22,6 +22,7 @@ import scala.util.control.NonFatal
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
 import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}
 import org.apache.spark.sql.catalyst.plans.logical.Statistics
@@ -37,7 +38,9 @@ case class AnalyzeTableCommand(tableName: String, noscan: Boolean = true) extend
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val sessionState = sparkSession.sessionState
     val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName)
-    val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdent))
+    val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+    val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
+    val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB))
 
     relation match {
       case relation: CatalogRelation =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index c0ccdca98e..b57b2d280d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -264,7 +264,7 @@ case class AlterTableUnsetPropertiesCommand(
       propKeys.foreach { k =>
         if (!table.properties.contains(k)) {
           throw new AnalysisException(
-            s"Attempted to unset non-existent property '$k' in table '$tableName'")
+            s"Attempted to unset non-existent property '$k' in table '${table.identifier}'")
         }
       }
     }
@@ -317,11 +317,11 @@ case class AlterTableSerDePropertiesCommand(
       catalog.alterTable(newTable)
     } else {
       val spec = partSpec.get
-      val part = catalog.getPartition(tableName, spec)
+      val part = catalog.getPartition(table.identifier, spec)
       val newPart = part.copy(storage = part.storage.copy(
         serde = serdeClassName.orElse(part.storage.serde),
         properties = part.storage.properties ++ serdeProperties.getOrElse(Map())))
-      catalog.alterPartitions(tableName, Seq(newPart))
+      catalog.alterPartitions(table.identifier, Seq(newPart))
     }
     Seq.empty[Row]
   }
@@ -358,7 +358,7 @@ case class AlterTableAddPartitionCommand(
       // inherit table storage format (possibly except for location)
       CatalogTablePartition(spec, table.storage.copy(locationUri = location))
     }
-    catalog.createPartitions(tableName, parts, ignoreIfExists = ifNotExists)
+    catalog.createPartitions(table.identifier, parts, ignoreIfExists = ifNotExists)
     Seq.empty[Row]
   }
@@ -422,7 +422,7 @@ case class AlterTableDropPartitionCommand(
       throw new AnalysisException(
         "ALTER TABLE DROP PARTITIONS is not allowed for tables defined using the datasource API")
     }
-    catalog.dropPartitions(tableName, specs, ignoreIfNotExists = ifExists, purge = purge)
+    catalog.dropPartitions(table.identifier, specs, ignoreIfNotExists = ifExists, purge = purge)
     Seq.empty[Row]
   }
@@ -471,26 +471,20 @@ case class AlterTableRecoverPartitionsCommand(
   override def run(spark: SparkSession): Seq[Row] = {
     val catalog = spark.sessionState.catalog
-    if (!catalog.tableExists(tableName)) {
-      throw new AnalysisException(s"Table $tableName in $cmd does not exist.")
-    }
-    if (catalog.isTemporaryTable(tableName)) {
-      throw new AnalysisException(
-        s"Operation not allowed: $cmd on temporary tables: $tableName")
-    }
     val table = catalog.getTableMetadata(tableName)
+    val tableIdentWithDB = table.identifier.quotedString
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
     if (DDLUtils.isDatasourceTable(table)) {
       throw new AnalysisException(
-        s"Operation not allowed: $cmd on datasource tables: $tableName")
+        s"Operation not allowed: $cmd on datasource tables: $tableIdentWithDB")
     }
     if (table.partitionColumnNames.isEmpty) {
       throw new AnalysisException(
-        s"Operation not allowed: $cmd only works on partitioned tables: $tableName")
+        s"Operation not allowed: $cmd only works on partitioned tables: $tableIdentWithDB")
     }
     if (table.storage.locationUri.isEmpty) {
-      throw new AnalysisException(
-        s"Operation not allowed: $cmd only works on table with location provided: $tableName")
throw new AnalysisException(s"Operation not allowed: $cmd only works on table with " + + s"location provided: $tableIdentWithDB") } val root = new Path(table.storage.locationUri.get) @@ -659,7 +653,7 @@ case class AlterTableSetLocationCommand( partitionSpec match { case Some(spec) => // Partition spec is specified, so we set the location only for this partition - val part = catalog.getPartition(tableName, spec) + val part = catalog.getPartition(table.identifier, spec) val newPart = if (DDLUtils.isDatasourceTable(table)) { throw new AnalysisException( @@ -668,7 +662,7 @@ case class AlterTableSetLocationCommand( } else { part.copy(storage = part.storage.copy(locationUri = Some(location))) } - catalog.alterPartitions(tableName, Seq(newPart)) + catalog.alterPartitions(table.identifier, Seq(newPart)) case None => // No partition spec is specified, so we set the location for the table itself val newTable = diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 60e6b5db62..94b46c5d97 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -35,7 +35,6 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.util.quoteIdentifier import org.apache.spark.sql.execution.datasources.PartitioningUtils -import org.apache.spark.sql.internal.HiveSerDe import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -65,7 +64,11 @@ case class CreateTableLikeCommand( s"Source table in CREATE TABLE LIKE does not exist: '$sourceTable'") } - val sourceTableDesc = catalog.getTableMetadata(sourceTable) + val sourceTableDesc = if (sourceTable.database.isDefined) { + catalog.getTableMetadata(sourceTable) + } else { + catalog.getTempViewOrPermanentTableMetadata(sourceTable.table) + } // Storage format val newStorage = @@ -158,14 +161,13 @@ case class AlterTableRenameCommand( override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog - val table = catalog.getTableMetadata(oldName) - DDLUtils.verifyAlterTableType(catalog, table, isView) // If this is a temp view, just rename the view. // Otherwise, if this is a real table, we also need to uncache and invalidate the table. - val isTemporary = catalog.isTemporaryTable(oldName) - if (isTemporary) { + if (catalog.isTemporaryTable(oldName)) { catalog.renameTable(oldName, newName) } else { + val table = catalog.getTableMetadata(oldName) + DDLUtils.verifyAlterTableType(catalog, table, isView) val newTblName = TableIdentifier(newName, oldName.database) // If an exception is thrown here we can just assume the table is uncached; // this can happen with Hive tables when the underlying catalog is in-memory. 
@@ -215,40 +217,38 @@ case class LoadDataCommand(
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
-    if (!catalog.tableExists(table)) {
-      throw new AnalysisException(s"Target table in LOAD DATA does not exist: $table")
-    }
-    val targetTable = catalog.getTableMetadataOption(table).getOrElse {
-      throw new AnalysisException(s"Target table in LOAD DATA cannot be temporary: $table")
-    }
+    val targetTable = catalog.getTableMetadata(table)
+    val tableIdentWithDB = targetTable.identifier.quotedString
+
     if (targetTable.tableType == CatalogTableType.VIEW) {
-      throw new AnalysisException(s"Target table in LOAD DATA cannot be a view: $table")
+      throw new AnalysisException(s"Target table in LOAD DATA cannot be a view: $tableIdentWithDB")
     }
     if (DDLUtils.isDatasourceTable(targetTable)) {
-      throw new AnalysisException(s"LOAD DATA is not supported for datasource tables: $table")
+      throw new AnalysisException(
+        s"LOAD DATA is not supported for datasource tables: $tableIdentWithDB")
     }
     if (targetTable.partitionColumnNames.nonEmpty) {
       if (partition.isEmpty) {
-        throw new AnalysisException(s"LOAD DATA target table $table is partitioned, " +
+        throw new AnalysisException(s"LOAD DATA target table $tableIdentWithDB is partitioned, " +
           s"but no partition spec is provided")
       }
       if (targetTable.partitionColumnNames.size != partition.get.size) {
-        throw new AnalysisException(s"LOAD DATA target table $table is partitioned, " +
+        throw new AnalysisException(s"LOAD DATA target table $tableIdentWithDB is partitioned, " +
           s"but number of columns in provided partition spec (${partition.get.size}) " +
           s"do not match number of partitioned columns in table " +
           s"(s${targetTable.partitionColumnNames.size})")
       }
       partition.get.keys.foreach { colName =>
         if (!targetTable.partitionColumnNames.contains(colName)) {
-          throw new AnalysisException(s"LOAD DATA target table $table is partitioned, " +
+          throw new AnalysisException(s"LOAD DATA target table $tableIdentWithDB is partitioned, " +
            s"but the specified partition spec refers to a column that is not partitioned: " +
            s"'$colName'")
         }
       }
     } else {
       if (partition.nonEmpty) {
-        throw new AnalysisException(s"LOAD DATA target table $table is not partitioned, " +
-          s"but a partition spec was provided.")
+        throw new AnalysisException(s"LOAD DATA target table $tableIdentWithDB is not " +
+          s"partitioned, but a partition spec was provided.")
       }
     }
@@ -336,32 +336,27 @@ case class TruncateTableCommand(
   override def run(spark: SparkSession): Seq[Row] = {
     val catalog = spark.sessionState.catalog
-    if (!catalog.tableExists(tableName)) {
-      throw new AnalysisException(s"Table $tableName in TRUNCATE TABLE does not exist.")
-    }
-    if (catalog.isTemporaryTable(tableName)) {
-      throw new AnalysisException(
-        s"Operation not allowed: TRUNCATE TABLE on temporary tables: $tableName")
-    }
     val table = catalog.getTableMetadata(tableName)
+    val tableIdentWithDB = table.identifier.quotedString
+
     if (table.tableType == CatalogTableType.EXTERNAL) {
       throw new AnalysisException(
-        s"Operation not allowed: TRUNCATE TABLE on external tables: $tableName")
+        s"Operation not allowed: TRUNCATE TABLE on external tables: $tableIdentWithDB")
     }
     if (table.tableType == CatalogTableType.VIEW) {
       throw new AnalysisException(
-        s"Operation not allowed: TRUNCATE TABLE on views: $tableName")
+        s"Operation not allowed: TRUNCATE TABLE on views: $tableIdentWithDB")
     }
     val isDatasourceTable = DDLUtils.isDatasourceTable(table)
     if (isDatasourceTable && partitionSpec.isDefined) {
       throw new AnalysisException(
         s"Operation not allowed: TRUNCATE TABLE ... PARTITION is not supported " +
-        s"for tables created using the data sources API: $tableName")
+        s"for tables created using the data sources API: $tableIdentWithDB")
     }
     if (table.partitionColumnNames.isEmpty && partitionSpec.isDefined) {
       throw new AnalysisException(
         s"Operation not allowed: TRUNCATE TABLE ... PARTITION is not supported " +
-        s"for tables that are not partitioned: $tableName")
+        s"for tables that are not partitioned: $tableIdentWithDB")
     }
 
     val locations = if (isDatasourceTable) {
@@ -369,7 +364,7 @@
     } else if (table.partitionColumnNames.isEmpty) {
       Seq(table.storage.locationUri)
     } else {
-      catalog.listPartitions(tableName, partitionSpec).map(_.storage.locationUri)
+      catalog.listPartitions(table.identifier, partitionSpec).map(_.storage.locationUri)
     }
     val hadoopConf = spark.sessionState.newHadoopConf()
     locations.foreach { location =>
@@ -382,7 +377,7 @@
         } catch {
           case NonFatal(e) =>
             throw new AnalysisException(
-              s"Failed to truncate table $tableName when removing data of the path: $path " +
+              s"Failed to truncate table $tableIdentWithDB when removing data of the path: $path " +
                 s"because of ${e.toString}")
         }
       }
@@ -392,10 +387,10 @@
     spark.sessionState.refreshTable(tableName.unquotedString)
     // Also try to drop the contents of the table from the columnar cache
     try {
-      spark.sharedState.cacheManager.uncacheQuery(spark.table(tableName.quotedString))
+      spark.sharedState.cacheManager.uncacheQuery(spark.table(table.identifier))
     } catch {
       case NonFatal(e) =>
-        log.warn(s"Exception when attempting to uncache table $tableName", e)
+        log.warn(s"Exception when attempting to uncache table $tableIdentWithDB", e)
     }
     Seq.empty[Row]
   }
@@ -600,13 +595,19 @@ case class ShowTablePropertiesCommand(table: TableIdentifier, propertyKey: Optio
  * SHOW COLUMNS (FROM | IN) table_identifier [(FROM | IN) database];
  * }}}
  */
-case class ShowColumnsCommand(table: TableIdentifier) extends RunnableCommand {
+case class ShowColumnsCommand(tableName: TableIdentifier) extends RunnableCommand {
   override val output: Seq[Attribute] = {
     AttributeReference("col_name", StringType, nullable = false)() :: Nil
   }
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    sparkSession.sessionState.catalog.getTableMetadata(table).schema.map { c =>
+    val catalog = sparkSession.sessionState.catalog
+    val table = if (tableName.database.isDefined) {
+      catalog.getTableMetadata(tableName)
+    } else {
+      catalog.getTempViewOrPermanentTableMetadata(tableName.table)
+    }
+    table.schema.map { c =>
       Row(c.name)
     }
   }
@@ -628,7 +629,7 @@ case class ShowColumnsCommand(table: TableIdentifier) extends RunnableCommand {
  * }}}
  */
 case class ShowPartitionsCommand(
-    table: TableIdentifier,
+    tableName: TableIdentifier,
     spec: Option[TablePartitionSpec]) extends RunnableCommand {
   override val output: Seq[Attribute] = {
     AttributeReference("partition", StringType, nullable = false)() :: Nil
@@ -642,13 +643,8 @@ case class ShowPartitionsCommand(
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
-
-    if (catalog.isTemporaryTable(table)) {
-      throw new AnalysisException(
-        s"SHOW PARTITIONS is not allowed on a temporary table: ${table.unquotedString}")
-    }
-
-    val tab = catalog.getTableMetadata(table)
+    val table = catalog.getTableMetadata(tableName)
+    val tableIdentWithDB = table.identifier.quotedString
 
     /**
      * Validate and throws an [[AnalysisException]] exception under the following conditions:
      * 2. If it is a datasource table.
      * 3. If it is a view.
      */
-    if (tab.tableType == VIEW) {
-      throw new AnalysisException(
-        s"SHOW PARTITIONS is not allowed on a view: ${tab.qualifiedName}")
+    if (table.tableType == VIEW) {
+      throw new AnalysisException(s"SHOW PARTITIONS is not allowed on a view: $tableIdentWithDB")
     }
 
-    if (tab.partitionColumnNames.isEmpty) {
+    if (table.partitionColumnNames.isEmpty) {
       throw new AnalysisException(
-        s"SHOW PARTITIONS is not allowed on a table that is not partitioned: ${tab.qualifiedName}")
+        s"SHOW PARTITIONS is not allowed on a table that is not partitioned: $tableIdentWithDB")
     }
 
-    if (DDLUtils.isDatasourceTable(tab)) {
+    if (DDLUtils.isDatasourceTable(table)) {
       throw new AnalysisException(
-        s"SHOW PARTITIONS is not allowed on a datasource table: ${tab.qualifiedName}")
+        s"SHOW PARTITIONS is not allowed on a datasource table: $tableIdentWithDB")
     }
 
     /**
@@ -677,7 +672,7 @@
      * thrown if the partitioning spec is invalid.
      */
     if (spec.isDefined) {
-      val badColumns = spec.get.keySet.filterNot(tab.partitionColumnNames.contains)
+      val badColumns = spec.get.keySet.filterNot(table.partitionColumnNames.contains)
       if (badColumns.nonEmpty) {
         val badCols = badColumns.mkString("[", ", ", "]")
         throw new AnalysisException(
       }
     }
 
-    val partNames = catalog.listPartitions(table, spec).map { p =>
-      getPartName(p.spec, tab.partitionColumnNames)
+    val partNames = catalog.listPartitions(tableName, spec).map { p =>
+      getPartName(p.spec, table.partitionColumnNames)
     }
 
     partNames.map(Row(_))
@@ -700,16 +695,6 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
-
-    if (catalog.isTemporaryTable(table)) {
-      throw new AnalysisException(
-        s"SHOW CREATE TABLE cannot be applied to temporary table")
-    }
-
-    if (!catalog.tableExists(table)) {
-      throw new AnalysisException(s"Table $table doesn't exist")
-    }
-
     val tableMetadata = catalog.getTableMetadata(table)
 
     // TODO: unify this after we unify the CREATE TABLE syntax for hive serde and data source table.
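To summarize the asymmetry introduced in this file: `SHOW COLUMNS` keeps resolving unqualified names against temporary views through `getTempViewOrPermanentTableMetadata`, while `SHOW PARTITIONS` and `SHOW CREATE TABLE` go through `getTableMetadata` and therefore reject a temp-view-only name. A brief sketch (illustrative only; assumes a `SparkSession` named `spark` and no permanent table named `tv`):

```scala
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException

spark.range(10).createTempView("tv")

// Succeeds: ShowColumnsCommand falls back to the temp view for an unqualified name.
spark.sql("SHOW COLUMNS IN tv").show() // prints the single column "id"

// Fails fast: ShowPartitionsCommand resolves permanent tables/views only.
try spark.sql("SHOW PARTITIONS tv")
catch { case _: NoSuchTableException => () } // expected when no permanent `tv` exists
```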
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 3fa6298562..6fecda232a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -151,7 +151,12 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
   }
 
   private def listColumns(tableIdentifier: TableIdentifier): Dataset[Column] = {
-    val tableMetadata = sessionCatalog.getTableMetadata(tableIdentifier)
+    val tableMetadata = if (tableIdentifier.database.isDefined) {
+      sessionCatalog.getTableMetadata(tableIdentifier)
+    } else {
+      sessionCatalog.getTempViewOrPermanentTableMetadata(tableIdentifier.table)
+    }
+
     val partitionColumnNames = tableMetadata.partitionColumnNames.toSet
     val bucketColumnNames = tableMetadata.bucketSpec.map(_.bucketColumnNames).getOrElse(Nil).toSet
     val columns = tableMetadata.schema.map { c =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 4a171808c0..b5499f2884 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -1646,7 +1646,9 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       (1 to 10).map { i => (i, i) }.toDF("a", "b").createTempView("my_temp_tab")
       sql(s"CREATE EXTERNAL TABLE my_ext_tab LOCATION '$path'")
       sql(s"CREATE VIEW my_view AS SELECT 1")
-      assertUnsupported("TRUNCATE TABLE my_temp_tab")
+      intercept[NoSuchTableException] {
+        sql("TRUNCATE TABLE my_temp_tab")
+      }
       assertUnsupported("TRUNCATE TABLE my_ext_tab")
       assertUnsupported("TRUNCATE TABLE my_view")
     }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
index df33731df2..b2103b3bfc 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
@@ -406,25 +406,24 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
         |USING org.apache.spark.sql.parquet.DefaultSource
       """.stripMargin)
     // An empty sequence of row is returned for session temporary table.
-    val message1 = intercept[AnalysisException] {
+    intercept[NoSuchTableException] {
       sql("SHOW PARTITIONS parquet_temp")
-    }.getMessage
-    assert(message1.contains("is not allowed on a temporary table"))
+    }
 
-    val message2 = intercept[AnalysisException] {
+    val message1 = intercept[AnalysisException] {
       sql("SHOW PARTITIONS parquet_tab3")
     }.getMessage
-    assert(message2.contains("not allowed on a table that is not partitioned"))
+    assert(message1.contains("not allowed on a table that is not partitioned"))
 
-    val message3 = intercept[AnalysisException] {
+    val message2 = intercept[AnalysisException] {
       sql("SHOW PARTITIONS parquet_tab4 PARTITION(abcd=2015, xyz=1)")
     }.getMessage
-    assert(message3.contains("Non-partitioning column(s) [abcd, xyz] are specified"))
+    assert(message2.contains("Non-partitioning column(s) [abcd, xyz] are specified"))
 
-    val message4 = intercept[AnalysisException] {
+    val message3 = intercept[AnalysisException] {
       sql("SHOW PARTITIONS parquet_view1")
     }.getMessage
-    assert(message4.contains("is not allowed on a view"))
+    assert(message3.contains("is not allowed on a view"))
   }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index aa35a335fa..38482f66a3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -300,7 +300,7 @@ class HiveDDLSuite
         sql(s"ALTER VIEW $viewName UNSET TBLPROPERTIES ('p')")
       }.getMessage
       assert(message.contains(
-        "Attempted to unset non-existent property 'p' in table '`view1`'"))
+        "Attempted to unset non-existent property 'p' in table '`default`.`view1`'"))
     }
   }
 }
@@ -678,8 +678,8 @@ class HiveDDLSuite
       .createTempView(sourceViewName)
     sql(s"CREATE TABLE $targetTabName LIKE $sourceViewName")
 
-    val sourceTable = spark.sessionState.catalog.getTableMetadata(
-      TableIdentifier(sourceViewName, None))
+    val sourceTable =
+      spark.sessionState.catalog.getTempViewOrPermanentTableMetadata(sourceViewName)
     val targetTable = spark.sessionState.catalog.getTableMetadata(
       TableIdentifier(targetTabName, Some("default")))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
index bc999d4724..a215c70da0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
@@ -82,25 +82,53 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     }
   }
 
-  test("error handling: insert/load/truncate table commands against a temp view") {
+  test("Issue exceptions for ALTER VIEW on the temporary view") {
     val viewName = "testView"
     withTempView(viewName) {
-      sql(s"CREATE TEMPORARY VIEW $viewName AS SELECT id FROM jt")
-      var e = intercept[AnalysisException] {
+      spark.range(10).createTempView(viewName)
+      assertNoSuchTable(s"ALTER VIEW $viewName SET TBLPROPERTIES ('p' = 'an')")
+      assertNoSuchTable(s"ALTER VIEW $viewName UNSET TBLPROPERTIES ('p')")
+    }
+  }
+
+  test("Issue exceptions for ALTER TABLE on the temporary view") {
+    val viewName = "testView"
+    withTempView(viewName) {
+      spark.range(10).createTempView(viewName)
+      assertNoSuchTable(s"ALTER TABLE $viewName SET SERDE 'whatever'")
+      assertNoSuchTable(s"ALTER TABLE $viewName PARTITION (a=1, b=2) SET SERDE 'whatever'")
assertNoSuchTable(s"ALTER TABLE $viewName SET SERDEPROPERTIES ('p' = 'an')") + assertNoSuchTable(s"ALTER TABLE $viewName SET LOCATION '/path/to/your/lovely/heart'") + assertNoSuchTable(s"ALTER TABLE $viewName PARTITION (a='4') SET LOCATION '/path/to/home'") + assertNoSuchTable(s"ALTER TABLE $viewName ADD IF NOT EXISTS PARTITION (a='4', b='8')") + assertNoSuchTable(s"ALTER TABLE $viewName DROP PARTITION (a='4', b='8')") + assertNoSuchTable(s"ALTER TABLE $viewName PARTITION (a='4') RENAME TO PARTITION (a='5')") + assertNoSuchTable(s"ALTER TABLE $viewName RECOVER PARTITIONS") + } + } + + test("Issue exceptions for other table DDL on the temporary view") { + val viewName = "testView" + withTempView(viewName) { + spark.range(10).createTempView(viewName) + + val e = intercept[AnalysisException] { sql(s"INSERT INTO TABLE $viewName SELECT 1") }.getMessage assert(e.contains("Inserting into an RDD-based table is not allowed")) val testData = hiveContext.getHiveFile("data/files/employee.dat").getCanonicalPath - e = intercept[AnalysisException] { - sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE $viewName""") - }.getMessage - assert(e.contains(s"Target table in LOAD DATA cannot be temporary: `$viewName`")) + assertNoSuchTable(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE $viewName""") + assertNoSuchTable(s"TRUNCATE TABLE $viewName") + assertNoSuchTable(s"SHOW CREATE TABLE $viewName") + assertNoSuchTable(s"SHOW PARTITIONS $viewName") + assertNoSuchTable(s"ANALYZE TABLE $viewName COMPUTE STATISTICS") + } + } - e = intercept[AnalysisException] { - sql(s"TRUNCATE TABLE $viewName") - }.getMessage - assert(e.contains(s"Operation not allowed: TRUNCATE TABLE on temporary tables: `$viewName`")) + private def assertNoSuchTable(query: String): Unit = { + intercept[NoSuchTableException] { + sql(query) } } @@ -117,12 +145,12 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { e = intercept[AnalysisException] { sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE $viewName""") }.getMessage - assert(e.contains(s"Target table in LOAD DATA cannot be a view: `$viewName`")) + assert(e.contains(s"Target table in LOAD DATA cannot be a view: `default`.`testview`")) e = intercept[AnalysisException] { sql(s"TRUNCATE TABLE $viewName") }.getMessage - assert(e.contains(s"Operation not allowed: TRUNCATE TABLE on views: `$viewName`")) + assert(e.contains(s"Operation not allowed: TRUNCATE TABLE on views: `default`.`testview`")) } } @@ -277,13 +305,8 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { } test("should not allow ALTER VIEW AS when the view does not exist") { - intercept[NoSuchTableException]( - sql("ALTER VIEW testView AS SELECT 1, 2") - ) - - intercept[NoSuchTableException]( - sql("ALTER VIEW default.testView AS SELECT 1, 2") - ) + assertNoSuchTable("ALTER VIEW testView AS SELECT 1, 2") + assertNoSuchTable("ALTER VIEW default.testView AS SELECT 1, 2") } test("ALTER VIEW AS should try to alter temp view first if view name has no database part") { -- GitLab