From d93976d8660b68eeef646d1fe687cfce01f50f9d Mon Sep 17 00:00:00 2001
From: Dilip Biswal <dbiswal@us.ibm.com>
Date: Wed, 27 Apr 2016 09:28:24 +0800
Subject: [PATCH] [SPARK-14445][SQL] Support native execution of SHOW COLUMNS and SHOW PARTITIONS

## What changes were proposed in this pull request?

This PR adds native execution of the SHOW COLUMNS and SHOW PARTITIONS commands.

Command syntax:
``` SQL
SHOW COLUMNS (FROM | IN) table_identifier [(FROM | IN) database]
```
``` SQL
SHOW PARTITIONS [db_name.]table_name [PARTITION(partition_spec)]
```

## How was this patch tested?

Added test cases in HiveCommandSuite to verify execution and in DDLCommandSuite to verify the parsed plans.

Author: Dilip Biswal <dbiswal@us.ibm.com>

Closes #12222 from dilipbiswal/dkb_show_columns.
---
 .../spark/sql/catalyst/parser/SqlBase.g4      |   4 +-
 .../catalyst/catalog/InMemoryCatalog.scala    |   7 +-
 .../sql/catalyst/catalog/SessionCatalog.scala |  13 +-
 .../sql/catalyst/catalog/interface.scala      |  16 +-
 .../spark/sql/execution/SparkSqlParser.scala  |  38 +++++
 .../sql/execution/command/commands.scala      | 107 ++++++++++++-
 .../spark/sql/execution/command/ddl.scala     |   4 +
 .../datasources/PartitioningUtils.scala       |   2 +-
 .../execution/command/DDLCommandSuite.scala   |  40 +++++
 .../spark/sql/hive/HiveExternalCatalog.scala  |   8 +-
 .../spark/sql/hive/MetastoreRelation.scala    |   2 +-
 .../spark/sql/hive/client/HiveClient.scala    |  21 ++-
 .../sql/hive/client/HiveClientImpl.scala      |  19 ++-
 .../spark/sql/hive/client/VersionsSuite.scala |   2 +-
 .../sql/hive/execution/HiveCommandSuite.scala | 145 +++++++++++++++++-
 .../hive/execution/HiveComparisonTest.scala   |   4 +-
 16 files changed, 401 insertions(+), 31 deletions(-)

diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 64f68c9e9e..9de313ae87 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -106,6 +106,9 @@ statement
     | SHOW DATABASES (LIKE pattern=STRING)?                            #showDatabases
     | SHOW TBLPROPERTIES table=tableIdentifier
         ('(' key=tablePropertyKey ')')?                                #showTblProperties
+    | SHOW COLUMNS (FROM | IN) tableIdentifier
+        ((FROM | IN) db=identifier)?                                   #showColumns
+    | SHOW PARTITIONS tableIdentifier partitionSpec?                   #showPartitions
     | SHOW FUNCTIONS (LIKE? (qualifiedName | pattern=STRING))?         #showFunctions
     | (DESC | DESCRIBE) FUNCTION EXTENDED? describeFuncName            #describeFunction
     | (DESC | DESCRIBE) option=(EXTENDED | FORMATTED)?
@@ -128,7 +131,6 @@ hiveNativeCommands
     : DELETE FROM tableIdentifier (WHERE booleanExpression)?
     | TRUNCATE TABLE tableIdentifier partitionSpec?
         (COLUMNS identifierList)?
-    | SHOW COLUMNS (FROM | IN) tableIdentifier ((FROM|IN) identifier)?
     | START TRANSACTION (transactionMode (',' transactionMode)*)?
     | COMMIT WORK?
     | ROLLBACK WORK?
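For reference, here is a minimal, hypothetical walkthrough of the two statements this grammar change enables, modeled on the `parquet_tab4` fixture that `HiveCommandSuite` sets up further down in this patch. It assumes a Hive-enabled `SparkSession` bound to `spark`; the expected rows are taken from the test cases below, not from running this snippet.

``` scala
// Partitioned Hive table, mirroring the HiveCommandSuite fixture in this patch.
spark.sql("CREATE TABLE parquet_tab4 (price INT, qty INT) PARTITIONED BY (year INT, month INT)")
spark.sql("INSERT INTO parquet_tab4 PARTITION (year = 2015, month = 1) SELECT 1, 1")
spark.sql("INSERT INTO parquet_tab4 PARTITION (year = 2016, month = 2) SELECT 2, 2")

// SHOW COLUMNS is parsed into ShowColumnsCommand and returns one row per column,
// partitioning columns included: price, qty, year, month.
spark.sql("SHOW COLUMNS IN parquet_tab4 FROM default").collect()

// SHOW PARTITIONS is parsed into ShowPartitionsCommand. Without a spec it lists
// every partition name (e.g. "year=2015/month=1", "year=2016/month=2"); with a
// partial spec it filters, so this call returns only "year=2015/month=1".
spark.sql("SHOW PARTITIONS parquet_tab4 PARTITION (year = 2015)").collect()
```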
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala index b8f0e458fa..28a67067d0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala @@ -300,8 +300,13 @@ class InMemoryCatalog extends ExternalCatalog { override def listPartitions( db: String, - table: String): Seq[CatalogTablePartition] = synchronized { + table: String, + partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition] = synchronized { requireTableExists(db, table) + if (partialSpec.nonEmpty) { + throw new AnalysisException("listPartition does not support partition spec in " + + "InMemoryCatalog.") + } catalog(db).tables(table).partitions.values.toSeq } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index b36a76a888..402aacfc1f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -471,13 +471,18 @@ class SessionCatalog( } /** - * List all partitions in a table, assuming it exists. - * If no database is specified, assume the table is in the current database. + * List the metadata of all partitions that belong to the specified table, assuming it exists. + * + * A partial partition spec may optionally be provided to filter the partitions returned. + * For instance, if there exist partitions (a='1', b='2'), (a='1', b='3') and (a='2', b='4'), + * then a partial spec of (a='1') will return the first two only. */ - def listPartitions(tableName: TableIdentifier): Seq[CatalogTablePartition] = { + def listPartitions( + tableName: TableIdentifier, + partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition] = { val db = tableName.database.getOrElse(currentDb) val table = formatTableName(tableName.table) - externalCatalog.listPartitions(db, table) + externalCatalog.listPartitions(db, table, partialSpec) } // ---------------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index fd5bcad0f8..9e90987731 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -157,8 +157,20 @@ abstract class ExternalCatalog { def getPartition(db: String, table: String, spec: TablePartitionSpec): CatalogTablePartition - // TODO: support listing by pattern - def listPartitions(db: String, table: String): Seq[CatalogTablePartition] + /** + * List the metadata of all partitions that belong to the specified table, assuming it exists. + * + * A partial partition spec may optionally be provided to filter the partitions returned. + * For instance, if there exist partitions (a='1', b='2'), (a='1', b='3') and (a='2', b='4'), + * then a partial spec of (a='1') will return the first two only. 
+ * @param db database name + * @param table table name + * @param partialSpec partition spec + */ + def listPartitions( + db: String, + table: String, + partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition] // -------------------------------------------------------------------------- // Functions diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index a1862f59a0..ebc60edcba 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -152,6 +152,44 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { Option(ctx.key).map(visitTablePropertyKey)) } + /** + * A command for users to list the columm names for a table. + * This function creates a [[ShowColumnsCommand]] logical plan. + * + * The syntax of using this command in SQL is: + * {{{ + * SHOW COLUMNS (FROM | IN) table_identifier [(FROM | IN) database]; + * }}} + */ + override def visitShowColumns(ctx: ShowColumnsContext): LogicalPlan = withOrigin(ctx) { + val table = visitTableIdentifier(ctx.tableIdentifier) + + val lookupTable = Option(ctx.db) match { + case None => table + case Some(db) if table.database.isDefined => + throw new ParseException("Duplicates the declaration for database", ctx) + case Some(db) => TableIdentifier(table.identifier, Some(db.getText)) + } + ShowColumnsCommand(lookupTable) + } + + /** + * A command for users to list the partition names of a table. If partition spec is specified, + * partitions that match the spec are returned. Otherwise an empty result set is returned. + * + * This function creates a [[ShowPartitionsCommand]] logical plan + * + * The syntax of using this command in SQL is: + * {{{ + * SHOW PARTITIONS table_identifier [partition_spec]; + * }}} + */ + override def visitShowPartitions(ctx: ShowPartitionsContext): LogicalPlan = withOrigin(ctx) { + val table = visitTableIdentifier(ctx.tableIdentifier) + val partitionKeys = Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec) + ShowPartitionsCommand(table, partitionKeys) + } + /** * Create a [[RefreshTable]] logical plan. 
*/ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala index 7bb59b7803..6b1d413845 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala @@ -17,14 +17,19 @@ package org.apache.spark.sql.execution.command +import java.io.File + import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{Row, SparkSession} -import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} +import org.apache.spark.sql.{AnalysisException, Row, SparkSession} +import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.catalog.CatalogTableType +import org.apache.spark.sql.catalyst.catalog.ExternalCatalog.TablePartitionSpec import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.datasources.PartitioningUtils import org.apache.spark.sql.execution.debug._ import org.apache.spark.sql.types._ @@ -112,3 +117,101 @@ case class ExplainCommand( ("Error occurred during query planning: \n" + cause.getMessage).split("\n").map(Row(_)) } } + +/** + * A command to list the column names for a table. This function creates a + * [[ShowColumnsCommand]] logical plan. + * + * The syntax of using this command in SQL is: + * {{{ + * SHOW COLUMNS (FROM | IN) table_identifier [(FROM | IN) database]; + * }}} + */ +case class ShowColumnsCommand(table: TableIdentifier) extends RunnableCommand { + // The result of SHOW COLUMNS has one column called 'result' + override val output: Seq[Attribute] = { + AttributeReference("result", StringType, nullable = false)() :: Nil + } + + override def run(sparkSession: SparkSession): Seq[Row] = { + sparkSession.sessionState.catalog.getTableMetadata(table).schema.map { c => + Row(c.name) + } + } +} + +/** + * A command to list the partition names of a table. If the partition spec is specified, + * partitions that match the spec are returned. [[AnalysisException]] exception is thrown under + * the following conditions: + * + * 1. If the command is called for a non partitioned table. + * 2. If the partition spec refers to the columns that are not defined as partitioning columns. 
+ * + * This function creates a [[ShowPartitionsCommand]] logical plan + * + * The syntax of using this command in SQL is: + * {{{ + * SHOW PARTITIONS [db_name.]table_name [PARTITION(partition_spec)] + * }}} + */ +case class ShowPartitionsCommand( + table: TableIdentifier, + spec: Option[TablePartitionSpec]) extends RunnableCommand { + // The result of SHOW PARTITIONS has one column called 'result' + override val output: Seq[Attribute] = { + AttributeReference("result", StringType, nullable = false)() :: Nil + } + + private def getPartName(spec: TablePartitionSpec, partColNames: Seq[String]): String = { + partColNames.map { name => + PartitioningUtils.escapePathName(name) + "=" + PartitioningUtils.escapePathName(spec(name)) + }.mkString(File.separator) + } + + override def run(sparkSession: SparkSession): Seq[Row] = { + val catalog = sparkSession.sessionState.catalog + val db = table.database.getOrElse(catalog.getCurrentDatabase) + if (catalog.isTemporaryTable(table)) { + throw new AnalysisException("SHOW PARTITIONS is not allowed on a temporary table: " + + s"${table.unquotedString}") + } else { + val tab = catalog.getTableMetadata(table) + /** + * Validate and throws an [[AnalysisException]] exception under the following conditions: + * 1. If the table is not partitioned. + * 2. If it is a datasource table. + * 3. If it is a view or index table. + */ + if (tab.tableType == CatalogTableType.VIRTUAL_VIEW || + tab.tableType == CatalogTableType.INDEX_TABLE) { + throw new AnalysisException("SHOW PARTITIONS is not allowed on a view or index table: " + + s"${tab.qualifiedName}") + } + if (!DDLUtils.isTablePartitioned(tab)) { + throw new AnalysisException("SHOW PARTITIONS is not allowed on a table that is not " + + s"partitioned: ${tab.qualifiedName}") + } + if (DDLUtils.isDatasourceTable(tab)) { + throw new AnalysisException("SHOW PARTITIONS is not allowed on a datasource table: " + + s"${tab.qualifiedName}") + } + /** + * Validate the partitioning spec by making sure all the referenced columns are + * defined as partitioning columns in table definition. An AnalysisException exception is + * thrown if the partitioning spec is invalid. 
+ */ + if (spec.isDefined) { + val badColumns = spec.get.keySet.filterNot(tab.partitionColumns.map(_.name).contains) + if (badColumns.nonEmpty) { + throw new AnalysisException( + s"Non-partitioning column(s) [${badColumns.mkString(", ")}] are " + + s"specified for SHOW PARTITIONS") + } + } + val partNames = + catalog.listPartitions(table, spec).map(p => getPartName(p.spec, tab.partitionColumnNames)) + partNames.map { p => Row(p) } + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index f5aa8fb6fa..ecde3320b1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -536,5 +536,9 @@ private[sql] object DDLUtils { case _ => }) } + def isTablePartitioned(table: CatalogTable): Boolean = { + table.partitionColumns.size > 0 || + table.properties.contains("spark.sql.sources.schema.numPartCols") + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index 1065bb1047..74f2993754 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -424,7 +424,7 @@ private[sql] object PartitioningUtils { path.foreach { c => if (needsEscaping(c)) { builder.append('%') - builder.append(f"${c.asInstanceOf[Int]}%02x") + builder.append(f"${c.asInstanceOf[Int]}%02X") } else { builder.append(c) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala index 934c6f362d..9db5ccbbd6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala @@ -678,4 +678,44 @@ class DDLCommandSuite extends PlanTest { comparePlans(parsed3, expected3) comparePlans(parsed4, expected4) } + + test("show columns") { + val sql1 = "SHOW COLUMNS FROM t1" + val sql2 = "SHOW COLUMNS IN db1.t1" + val sql3 = "SHOW COLUMNS FROM t1 IN db1" + val sql4 = "SHOW COLUMNS FROM db1.t1 IN db2" + + val parsed1 = parser.parsePlan(sql1) + val expected1 = ShowColumnsCommand(TableIdentifier("t1", None)) + val parsed2 = parser.parsePlan(sql2) + val expected2 = ShowColumnsCommand(TableIdentifier("t1", Some("db1"))) + val parsed3 = parser.parsePlan(sql3) + comparePlans(parsed1, expected1) + comparePlans(parsed2, expected2) + comparePlans(parsed3, expected2) + val message = intercept[ParseException] { + parser.parsePlan(sql4) + }.getMessage + assert(message.contains("Duplicates the declaration for database")) + } + + test("show partitions") { + val sql1 = "SHOW PARTITIONS t1" + val sql2 = "SHOW PARTITIONS db1.t1" + val sql3 = "SHOW PARTITIONS t1 PARTITION(partcol1='partvalue', partcol2='partvalue')" + + val parsed1 = parser.parsePlan(sql1) + val expected1 = + ShowPartitionsCommand(TableIdentifier("t1", None), None) + val parsed2 = parser.parsePlan(sql2) + val expected2 = + ShowPartitionsCommand(TableIdentifier("t1", Some("db1")), None) + val expected3 = + ShowPartitionsCommand(TableIdentifier("t1", None), + Some(Map("partcol1" -> "partvalue", "partcol2" -> "partvalue"))) + val parsed3 = 
parser.parsePlan(sql3) + comparePlans(parsed1, expected1) + comparePlans(parsed2, expected2) + comparePlans(parsed3, expected3) + } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index a92a94cae5..313093818f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -283,10 +283,14 @@ private[spark] class HiveExternalCatalog(client: HiveClient) extends ExternalCat client.getPartition(db, table, spec) } + /** + * Returns the partition names from hive metastore for a given table in a database. + */ override def listPartitions( db: String, - table: String): Seq[CatalogTablePartition] = withClient { - client.getAllPartitions(db, table) + table: String, + partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition] = withClient { + client.getPartitions(db, table, partialSpec) } // -------------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala index 0520e75306..367fcf13d2 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala @@ -130,7 +130,7 @@ private[hive] case class MetastoreRelation( // When metastore partition pruning is turned off, we cache the list of all partitions to // mimic the behavior of Spark < 1.5 - private lazy val allPartitions: Seq[CatalogTablePartition] = client.getAllPartitions(catalogTable) + private lazy val allPartitions: Seq[CatalogTablePartition] = client.getPartitions(catalogTable) def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = { val rawPartitions = if (sparkSession.sessionState.conf.metastorePartitionPruning) { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala index ae719f86aa..ef08a39c17 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala @@ -163,13 +163,24 @@ private[hive] trait HiveClient { table: CatalogTable, spec: ExternalCatalog.TablePartitionSpec): Option[CatalogTablePartition] - /** Returns all partitions for the given table. */ - final def getAllPartitions(db: String, table: String): Seq[CatalogTablePartition] = { - getAllPartitions(getTable(db, table)) + /** + * Returns the partitions for the given table that match the supplied partition spec. + * If no partition spec is specified, all partitions are returned. + */ + final def getPartitions( + db: String, + table: String, + partialSpec: Option[ExternalCatalog.TablePartitionSpec]): Seq[CatalogTablePartition] = { + getPartitions(getTable(db, table), partialSpec) } - /** Returns all partitions for the given table. */ - def getAllPartitions(table: CatalogTable): Seq[CatalogTablePartition] + /** + * Returns the partitions for the given table that match the supplied partition spec. + * If no partition spec is specified, all partitions are returned. 
+ */ + def getPartitions( + table: CatalogTable, + partialSpec: Option[ExternalCatalog.TablePartitionSpec] = None): Seq[CatalogTablePartition] /** Returns partitions filtered by predicates for the given table. */ def getPartitionsByFilter( diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 6327431368..6a7345f758 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -28,8 +28,7 @@ import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.metastore.{PartitionDropOptions, TableType => HiveTableType} import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema, Function => HiveFunction, FunctionType, PrincipalType, ResourceType, ResourceUri} import org.apache.hadoop.hive.ql.Driver -import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable} -import org.apache.hadoop.hive.ql.metadata.{Hive, HiveException} +import org.apache.hadoop.hive.ql.metadata.{Hive, HiveException, Partition => HivePartition, Table => HiveTable} import org.apache.hadoop.hive.ql.plan.AddPartitionDesc import org.apache.hadoop.hive.ql.processors._ import org.apache.hadoop.hive.ql.session.SessionState @@ -41,6 +40,7 @@ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPartitionException} import org.apache.spark.sql.catalyst.catalog._ +import org.apache.spark.sql.catalyst.catalog.ExternalCatalog.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.execution.QueryExecutionException import org.apache.spark.util.{CircularBuffer, Utils} @@ -422,15 +422,24 @@ private[hive] class HiveClientImpl( override def getPartitionOption( table: CatalogTable, - spec: ExternalCatalog.TablePartitionSpec): Option[CatalogTablePartition] = withHiveState { + spec: TablePartitionSpec): Option[CatalogTablePartition] = withHiveState { val hiveTable = toHiveTable(table) val hivePartition = client.getPartition(hiveTable, spec.asJava, false) Option(hivePartition).map(fromHivePartition) } - override def getAllPartitions(table: CatalogTable): Seq[CatalogTablePartition] = withHiveState { + /** + * Returns the partitions for the given table that match the supplied partition spec. + * If no partition spec is specified, all partitions are returned. 
+ */ + override def getPartitions( + table: CatalogTable, + spec: Option[TablePartitionSpec]): Seq[CatalogTablePartition] = withHiveState { val hiveTable = toHiveTable(table) - shim.getAllPartitions(client, hiveTable).map(fromHivePartition) + spec match { + case None => shim.getAllPartitions(client, hiveTable).map(fromHivePartition) + case Some(s) => client.getPartitions(hiveTable, s.asJava).asScala.map(fromHivePartition) + } } override def getPartitionsByFilter( diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index e0288ff98f..916a470aa5 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -189,7 +189,7 @@ class VersionsSuite extends SparkFunSuite with Logging { } test(s"$version: getPartitions") { - client.getAllPartitions(client.getTable("default", "src_part")) + client.getPartitions(client.getTable("default", "src_part")) } test(s"$version: getPartitionsByFilter") { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala index 8b4e4dced8..a4c6d3c185 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala @@ -17,12 +17,15 @@ package org.apache.spark.sql.hive.execution -import org.apache.spark.sql.{AnalysisException, QueryTest, Row} -import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode} +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException +import org.apache.spark.sql.hive.test.{TestHive, TestHiveSingleton} import org.apache.spark.sql.test.SQLTestUtils class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { - protected override def beforeAll(): Unit = { + import testImplicits._ + + protected override def beforeAll(): Unit = { super.beforeAll() sql( """ @@ -30,18 +33,44 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto |USING org.apache.spark.sql.parquet.DefaultSource """.stripMargin) - sql( + sql( """ |CREATE EXTERNAL TABLE parquet_tab2 (c1 INT, c2 STRING) |STORED AS PARQUET |TBLPROPERTIES('prop1Key'="prop1Val", '`prop2Key`'="prop2Val") """.stripMargin) + sql("CREATE TABLE parquet_tab3(col1 int, `col 2` int)") + sql("CREATE TABLE parquet_tab4 (price int, qty int) partitioned by (year int, month int)") + sql("INSERT INTO parquet_tab4 PARTITION(year = 2015, month = 1) SELECT 1, 1") + sql("INSERT INTO parquet_tab4 PARTITION(year = 2015, month = 2) SELECT 2, 2") + sql("INSERT INTO parquet_tab4 PARTITION(year = 2016, month = 2) SELECT 3, 3") + sql("INSERT INTO parquet_tab4 PARTITION(year = 2016, month = 3) SELECT 3, 3") + sql( + """ + |CREATE TABLE parquet_tab5 (price int, qty int) + |PARTITIONED BY (year int, month int, hour int, minute int, sec int, extra int) + """.stripMargin) + sql( + """ + |INSERT INTO parquet_tab5 + |PARTITION(year = 2016, month = 3, hour = 10, minute = 10, sec = 10, extra = 1) SELECT 3, 3 + """.stripMargin) + sql( + """ + |INSERT INTO parquet_tab5 + |PARTITION(year = 2016, month = 4, hour = 10, minute = 10, sec = 10, extra = 1) SELECT 3, 3 + """.stripMargin) + sql("CREATE VIEW parquet_view1 as select * from parquet_tab4") } 
override protected def afterAll(): Unit = { try { sql("DROP TABLE IF EXISTS parquet_tab1") sql("DROP TABLE IF EXISTS parquet_tab2") + sql("DROP TABLE IF EXISTS parquet_tab3") + sql("DROP VIEW IF EXISTS parquet_view1") + sql("DROP TABLE IF EXISTS parquet_tab4") + sql("DROP TABLE IF EXISTS parquet_tab5") } finally { super.afterAll() } @@ -247,4 +276,112 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto hiveContext.sessionState.hadoopConf.set("fs.default.name", originalFsName) } } + + test("show columns") { + checkAnswer( + sql("SHOW COLUMNS IN parquet_tab3"), + Row("col1") :: Row("col 2") :: Nil) + + checkAnswer( + sql("SHOW COLUMNS IN default.parquet_tab3"), + Row("col1") :: Row("col 2") :: Nil) + + checkAnswer( + sql("SHOW COLUMNS IN parquet_tab3 FROM default"), + Row("col1") :: Row("col 2") :: Nil) + + checkAnswer( + sql("SHOW COLUMNS IN parquet_tab4 IN default"), + Row("price") :: Row("qty") :: Row("year") :: Row("month") :: Nil) + + val message = intercept[NoSuchTableException] { + sql("SHOW COLUMNS IN badtable FROM default") + }.getMessage + assert(message.contains("badtable not found in database")) + } + + test("show partitions - show everything") { + checkAnswer( + sql("show partitions parquet_tab4"), + Row("year=2015/month=1") :: + Row("year=2015/month=2") :: + Row("year=2016/month=2") :: + Row("year=2016/month=3") :: Nil) + + checkAnswer( + sql("show partitions default.parquet_tab4"), + Row("year=2015/month=1") :: + Row("year=2015/month=2") :: + Row("year=2016/month=2") :: + Row("year=2016/month=3") :: Nil) + } + + test("show partitions - show everything more than 5 part keys") { + checkAnswer( + sql("show partitions parquet_tab5"), + Row("year=2016/month=3/hour=10/minute=10/sec=10/extra=1") :: + Row("year=2016/month=4/hour=10/minute=10/sec=10/extra=1") :: Nil) + } + + test("show partitions - filter") { + checkAnswer( + sql("show partitions default.parquet_tab4 PARTITION(year=2015)"), + Row("year=2015/month=1") :: + Row("year=2015/month=2") :: Nil) + + checkAnswer( + sql("show partitions default.parquet_tab4 PARTITION(year=2015, month=1)"), + Row("year=2015/month=1") :: Nil) + + checkAnswer( + sql("show partitions default.parquet_tab4 PARTITION(month=2)"), + Row("year=2015/month=2") :: + Row("year=2016/month=2") :: Nil) + } + + test("show partitions - empty row") { + withTempTable("parquet_temp") { + sql( + """ + |CREATE TEMPORARY TABLE parquet_temp (c1 INT, c2 STRING) + |USING org.apache.spark.sql.parquet.DefaultSource + """.stripMargin) + // An empty sequence of row is returned for session temporary table. 
+ val message1 = intercept[AnalysisException] { + sql("SHOW PARTITIONS parquet_temp") + }.getMessage + assert(message1.contains("is not allowed on a temporary table")) + + val message2 = intercept[AnalysisException] { + sql("SHOW PARTITIONS parquet_tab3") + }.getMessage + assert(message2.contains("not allowed on a table that is not partitioned")) + + val message3 = intercept[AnalysisException] { + sql("SHOW PARTITIONS parquet_tab4 PARTITION(abcd=2015, xyz=1)") + }.getMessage + assert(message3.contains("Non-partitioning column(s) [abcd, xyz] are specified")) + + val message4 = intercept[AnalysisException] { + sql("SHOW PARTITIONS parquet_view1") + }.getMessage + assert(message4.contains("is not allowed on a view or index table")) + } + } + + test("show partitions - datasource") { + withTable("part_datasrc") { + val df = (1 to 3).map(i => (i, s"val_$i", i * 2)).toDF("a", "b", "c") + df.write + .partitionBy("a") + .format("parquet") + .mode(SaveMode.Overwrite) + .saveAsTable("part_datasrc") + + val message1 = intercept[AnalysisException] { + sql("SHOW PARTITIONS part_datasrc") + }.getMessage + assert(message1.contains("is not allowed on a datasource table")) + } + } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala index aac5cc6d40..3a9c981be4 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.SQLBuilder import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.util._ -import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExplainCommand, HiveNativeCommand, SetCommand} +import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExplainCommand, HiveNativeCommand, SetCommand, ShowColumnsCommand} import org.apache.spark.sql.hive.{InsertIntoHiveTable => LogicalInsertIntoHiveTable} import org.apache.spark.sql.hive.test.{TestHive, TestHiveQueryExecution} @@ -175,7 +175,7 @@ abstract class HiveComparisonTest .filterNot(_ == "") case _: HiveNativeCommand => answer.filterNot(nonDeterministicLine).filterNot(_ == "") case _: ExplainCommand => answer - case _: DescribeTableCommand => + case _: DescribeTableCommand | ShowColumnsCommand(_) => // Filter out non-deterministic lines and lines which do not have actual results but // can introduce problems because of the way Hive formats these lines. // Then, remove empty lines. Do not sort the results. -- GitLab
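As an aside on the partition-name rendering used by `ShowPartitionsCommand.getPartName` above: each partition column and value is escaped with `PartitioningUtils.escapePathName` and the column=value pairs are joined with `File.separator`, giving Hive-style names such as `year=2015/month=1`. Below is a minimal standalone sketch of the same idea, with a simplified stand-in for the escaping helper (plain Scala, no Spark dependencies; the object name, the reduced character set, and the sample spec are illustrative only).

``` scala
object PartNameSketch {
  // Simplified stand-in for PartitioningUtils.escapePathName: percent-encode a few
  // characters that are unsafe inside a path segment (the real helper escapes more,
  // using the same uppercase hex form this patch switches to with %02X).
  private def escape(s: String): String =
    s.flatMap {
      case c @ ('/' | '=' | '%' | ':') => f"%%${c.toInt}%02X"
      case c                           => c.toString
    }

  // Mirrors getPartName: render the spec in partition-column order; joined here
  // with '/' for brevity where the command itself uses File.separator.
  def partName(spec: Map[String, String], partColNames: Seq[String]): String =
    partColNames.map(name => escape(name) + "=" + escape(spec(name))).mkString("/")

  def main(args: Array[String]): Unit = {
    val spec = Map("year" -> "2015", "month" -> "1")
    println(partName(spec, Seq("year", "month"))) // prints: year=2015/month=1
  }
}
```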