diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index ffb7a097ee6454a94b2ced2ac7b30b5c5e7f203e..06ac37b7f83ed6f0ef6d25d71d7fa128aca58edc 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -45,7 +45,9 @@ statement | ALTER DATABASE identifier SET DBPROPERTIES tablePropertyList #setDatabaseProperties | DROP DATABASE (IF EXISTS)? identifier (RESTRICT | CASCADE)? #dropDatabase | createTableHeader ('(' colTypeList ')')? tableProvider - (OPTIONS tablePropertyList)? #createTableUsing + (OPTIONS tablePropertyList)? + (PARTITIONED BY partitionColumnNames=identifierList)? + bucketSpec? #createTableUsing | createTableHeader tableProvider (OPTIONS tablePropertyList)? (PARTITIONED BY partitionColumnNames=identifierList)? @@ -102,6 +104,7 @@ statement ((FROM | IN) db=identifier)? #showColumns | SHOW PARTITIONS tableIdentifier partitionSpec? #showPartitions | SHOW FUNCTIONS (LIKE? (qualifiedName | pattern=STRING))? #showFunctions + | SHOW CREATE TABLE tableIdentifier #showCreateTable | (DESC | DESCRIBE) FUNCTION EXTENDED? describeFuncName #describeFunction | (DESC | DESCRIBE) option=(EXTENDED | FORMATTED)? tableIdentifier partitionSpec? describeColName? #describeTable diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala index 7d0584511fafe09e479d044ddd220c51a67a9229..d7b48ceca591ac11aaa68449978ff2a56b385388 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala @@ -44,7 +44,7 @@ sealed trait IdentifierWithDatabase { /** * Identifies a table in a database. * If `database` is not defined, the current database is used. - * When we register a permenent function in the FunctionRegistry, we use + * When we register a permanent function in the FunctionRegistry, we use * unquotedString as the function name. */ case class TableIdentifier(table: String, database: Option[String]) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 53aba1f2069f84f524cbd6ae6d5ac8c0dbf89b6d..b6e074bf59f4ad037e770ca06e5401730cbbbbdc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -181,6 +181,14 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { ShowPartitionsCommand(table, partitionKeys) } + /** + * Creates a [[ShowCreateTableCommand]] + */ + override def visitShowCreateTable(ctx: ShowCreateTableContext): LogicalPlan = withOrigin(ctx) { + val table = visitTableIdentifier(ctx.tableIdentifier()) + ShowCreateTableCommand(table) + } + /** * Create a [[RefreshTable]] logical plan. 
*/ @@ -287,6 +295,10 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { } val options = Option(ctx.tablePropertyList).map(visitTablePropertyList).getOrElse(Map.empty) val provider = ctx.tableProvider.qualifiedName.getText + val partitionColumnNames = + Option(ctx.partitionColumnNames) + .map(visitIdentifierList(_).toArray) + .getOrElse(Array.empty[String]) val bucketSpec = Option(ctx.bucketSpec()).map(visitBucketSpec) if (ctx.query != null) { @@ -302,16 +314,20 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { SaveMode.ErrorIfExists } - val partitionColumnNames = - Option(ctx.partitionColumnNames) - .map(visitIdentifierList(_).toArray) - .getOrElse(Array.empty[String]) - CreateTableUsingAsSelect( table, provider, temp, partitionColumnNames, bucketSpec, mode, options, query) } else { val struct = Option(ctx.colTypeList()).map(createStructType) - CreateTableUsing(table, struct, provider, temp, options, ifNotExists, managedIfNoPath = true) + CreateTableUsing( + table, + struct, + provider, + temp, + options, + partitionColumnNames, + bucketSpec, + ifNotExists, + managedIfNoPath = true) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 9747e58f43717cb6e8531abafae2f97a3214f494..faf359f54838e88efdfa69c53f71a19c1f060a4d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -372,10 +372,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { object DDLStrategy extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case CreateTableUsing(tableIdent, userSpecifiedSchema, provider, true, opts, false, _) => + case c: CreateTableUsing if c.temporary && !c.allowExisting => ExecutedCommandExec( CreateTempTableUsing( - tableIdent, userSpecifiedSchema, provider, opts)) :: Nil + c.tableIdent, c.userSpecifiedSchema, c.provider, c.options)) :: Nil case c: CreateTableUsing if !c.temporary => val cmd = @@ -384,6 +384,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { c.userSpecifiedSchema, c.provider, c.options, + c.partitionColumns, + c.bucketSpec, c.allowExisting, c.managedIfNoPath) ExecutedCommandExec(cmd) :: Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala index 576e12a94bb21ef753ad49a2471a24a09b69dfa1..d5aaccc4bdd90a28da8a8b974aaaf24535f0b7f5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala @@ -17,19 +17,14 @@ package org.apache.spark.sql.execution.command -import java.io.File - import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{AnalysisException, Row, SparkSession} -import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, TableIdentifier} -import org.apache.spark.sql.catalyst.catalog.CatalogTableType -import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import 
org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.SparkPlan -import org.apache.spark.sql.execution.datasources.PartitioningUtils import org.apache.spark.sql.execution.debug._ import org.apache.spark.sql.types._ @@ -117,101 +112,3 @@ case class ExplainCommand( ("Error occurred during query planning: \n" + cause.getMessage).split("\n").map(Row(_)) } } - -/** - * A command to list the column names for a table. This function creates a - * [[ShowColumnsCommand]] logical plan. - * - * The syntax of using this command in SQL is: - * {{{ - * SHOW COLUMNS (FROM | IN) table_identifier [(FROM | IN) database]; - * }}} - */ -case class ShowColumnsCommand(table: TableIdentifier) extends RunnableCommand { - // The result of SHOW COLUMNS has one column called 'result' - override val output: Seq[Attribute] = { - AttributeReference("result", StringType, nullable = false)() :: Nil - } - - override def run(sparkSession: SparkSession): Seq[Row] = { - sparkSession.sessionState.catalog.getTableMetadata(table).schema.map { c => - Row(c.name) - } - } -} - -/** - * A command to list the partition names of a table. If the partition spec is specified, - * partitions that match the spec are returned. [[AnalysisException]] exception is thrown under - * the following conditions: - * - * 1. If the command is called for a non partitioned table. - * 2. If the partition spec refers to the columns that are not defined as partitioning columns. - * - * This function creates a [[ShowPartitionsCommand]] logical plan - * - * The syntax of using this command in SQL is: - * {{{ - * SHOW PARTITIONS [db_name.]table_name [PARTITION(partition_spec)] - * }}} - */ -case class ShowPartitionsCommand( - table: TableIdentifier, - spec: Option[TablePartitionSpec]) extends RunnableCommand { - // The result of SHOW PARTITIONS has one column called 'result' - override val output: Seq[Attribute] = { - AttributeReference("result", StringType, nullable = false)() :: Nil - } - - private def getPartName(spec: TablePartitionSpec, partColNames: Seq[String]): String = { - partColNames.map { name => - PartitioningUtils.escapePathName(name) + "=" + PartitioningUtils.escapePathName(spec(name)) - }.mkString(File.separator) - } - - override def run(sparkSession: SparkSession): Seq[Row] = { - val catalog = sparkSession.sessionState.catalog - val db = table.database.getOrElse(catalog.getCurrentDatabase) - if (catalog.isTemporaryTable(table)) { - throw new AnalysisException("SHOW PARTITIONS is not allowed on a temporary table: " + - s"${table.unquotedString}") - } else { - val tab = catalog.getTableMetadata(table) - /** - * Validate and throws an [[AnalysisException]] exception under the following conditions: - * 1. If the table is not partitioned. - * 2. If it is a datasource table. - * 3. If it is a view or index table. 
- */ - if (tab.tableType == CatalogTableType.VIEW || - tab.tableType == CatalogTableType.INDEX) { - throw new AnalysisException("SHOW PARTITIONS is not allowed on a view or index table: " + - s"${tab.qualifiedName}") - } - if (!DDLUtils.isTablePartitioned(tab)) { - throw new AnalysisException("SHOW PARTITIONS is not allowed on a table that is not " + - s"partitioned: ${tab.qualifiedName}") - } - if (DDLUtils.isDatasourceTable(tab)) { - throw new AnalysisException("SHOW PARTITIONS is not allowed on a datasource table: " + - s"${tab.qualifiedName}") - } - /** - * Validate the partitioning spec by making sure all the referenced columns are - * defined as partitioning columns in table definition. An AnalysisException exception is - * thrown if the partitioning spec is invalid. - */ - if (spec.isDefined) { - val badColumns = spec.get.keySet.filterNot(tab.partitionColumns.map(_.name).contains) - if (badColumns.nonEmpty) { - throw new AnalysisException( - s"Non-partitioning column(s) [${badColumns.mkString(", ")}] are " + - s"specified for SHOW PARTITIONS") - } - } - val partNames = - catalog.listPartitions(table, spec).map(p => getPartName(p.spec, tab.partitionColumnNames)) - partNames.map { p => Row(p) } - } - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index 3525111e4691fed539ed21fab03433d11420600f..de3c868176d49e151ea5743cabdc97020b7858ec 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -51,6 +51,8 @@ case class CreateDataSourceTableCommand( userSpecifiedSchema: Option[StructType], provider: String, options: Map[String, String], + partitionColumns: Array[String], + bucketSpec: Option[BucketSpec], ignoreIfExists: Boolean, managedIfNoPath: Boolean) extends RunnableCommand { @@ -103,8 +105,8 @@ case class CreateDataSourceTableCommand( sparkSession = sparkSession, tableIdent = tableIdent, userSpecifiedSchema = userSpecifiedSchema, - partitionColumns = Array.empty[String], - bucketSpec = None, + partitionColumns = partitionColumns, + bucketSpec = bucketSpec, provider = provider, options = optionsWithPath, isExternal = isExternal) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 0f90715a90e1235f61d10761ab0c1c51617af3cd..e6dcd1ee95b98b9d2a3333c8948cfc97331b38c1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -26,10 +26,13 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType} +import org.apache.spark.sql.catalyst.catalog.CatalogTableType._ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, UnaryNode} -import org.apache.spark.sql.types.{BooleanType, MetadataBuilder, StringType, StructType} +import org.apache.spark.sql.catalyst.util.quoteIdentifier +import 
org.apache.spark.sql.execution.datasources.PartitioningUtils +import org.apache.spark.sql.types._ import org.apache.spark.util.Utils case class CreateTableAsSelectLogicalPlan( @@ -490,3 +493,241 @@ case class ShowTablePropertiesCommand(table: TableIdentifier, propertyKey: Optio } } } + +/** + * A command to list the column names for a table. This function creates a + * [[ShowColumnsCommand]] logical plan. + * + * The syntax of using this command in SQL is: + * {{{ + * SHOW COLUMNS (FROM | IN) table_identifier [(FROM | IN) database]; + * }}} + */ +case class ShowColumnsCommand(table: TableIdentifier) extends RunnableCommand { + // The result of SHOW COLUMNS has one column called 'result' + override val output: Seq[Attribute] = { + AttributeReference("result", StringType, nullable = false)() :: Nil + } + + override def run(sparkSession: SparkSession): Seq[Row] = { + sparkSession.sessionState.catalog.getTableMetadata(table).schema.map { c => + Row(c.name) + } + } +} + +/** + * A command to list the partition names of a table. If the partition spec is specified, + * partitions that match the spec are returned. [[AnalysisException]] exception is thrown under + * the following conditions: + * + * 1. If the command is called for a non partitioned table. + * 2. If the partition spec refers to the columns that are not defined as partitioning columns. + * + * This function creates a [[ShowPartitionsCommand]] logical plan + * + * The syntax of using this command in SQL is: + * {{{ + * SHOW PARTITIONS [db_name.]table_name [PARTITION(partition_spec)] + * }}} + */ +case class ShowPartitionsCommand( + table: TableIdentifier, + spec: Option[TablePartitionSpec]) extends RunnableCommand { + // The result of SHOW PARTITIONS has one column called 'result' + override val output: Seq[Attribute] = { + AttributeReference("result", StringType, nullable = false)() :: Nil + } + + private def getPartName(spec: TablePartitionSpec, partColNames: Seq[String]): String = { + partColNames.map { name => + PartitioningUtils.escapePathName(name) + "=" + PartitioningUtils.escapePathName(spec(name)) + }.mkString(File.separator) + } + + override def run(sparkSession: SparkSession): Seq[Row] = { + val catalog = sparkSession.sessionState.catalog + + if (catalog.isTemporaryTable(table)) { + throw new AnalysisException( + s"SHOW PARTITIONS is not allowed on a temporary table: ${table.unquotedString}") + } + + val tab = catalog.getTableMetadata(table) + + /** + * Validate and throws an [[AnalysisException]] exception under the following conditions: + * 1. If the table is not partitioned. + * 2. If it is a datasource table. + * 3. If it is a view or index table. + */ + if (tab.tableType == VIEW || + tab.tableType == INDEX) { + throw new AnalysisException( + s"SHOW PARTITIONS is not allowed on a view or index table: ${tab.qualifiedName}") + } + + if (!DDLUtils.isTablePartitioned(tab)) { + throw new AnalysisException( + s"SHOW PARTITIONS is not allowed on a table that is not partitioned: ${tab.qualifiedName}") + } + + if (DDLUtils.isDatasourceTable(tab)) { + throw new AnalysisException( + s"SHOW PARTITIONS is not allowed on a datasource table: ${tab.qualifiedName}") + } + + /** + * Validate the partitioning spec by making sure all the referenced columns are + * defined as partitioning columns in table definition. An AnalysisException exception is + * thrown if the partitioning spec is invalid. 
+ */ + if (spec.isDefined) { + val badColumns = spec.get.keySet.filterNot(tab.partitionColumns.map(_.name).contains) + if (badColumns.nonEmpty) { + val badCols = badColumns.mkString("[", ", ", "]") + throw new AnalysisException( + s"Non-partitioning column(s) $badCols are specified for SHOW PARTITIONS") + } + } + + val partNames = catalog.listPartitions(table, spec).map { p => + getPartName(p.spec, tab.partitionColumnNames) + } + + partNames.map(Row(_)) + } +} + +case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableCommand { + override val output: Seq[Attribute] = Seq( + AttributeReference("createtab_stmt", StringType, nullable = false)() + ) + + override def run(sparkSession: SparkSession): Seq[Row] = { + val catalog = sparkSession.sessionState.catalog + + if (catalog.isTemporaryTable(table)) { + throw new AnalysisException( + s"SHOW CREATE TABLE cannot be applied to temporary table") + } + + if (!catalog.tableExists(table)) { + throw new AnalysisException(s"Table $table doesn't exist") + } + + val tableMetadata = catalog.getTableMetadata(table) + + val stmt = if (DDLUtils.isDatasourceTable(tableMetadata)) { + showCreateDataSourceTable(tableMetadata) + } else { + throw new UnsupportedOperationException( + "SHOW CREATE TABLE only supports Spark SQL data source tables.") + } + + Seq(Row(stmt)) + } + + private def showCreateDataSourceTable(metadata: CatalogTable): String = { + val builder = StringBuilder.newBuilder + + builder ++= s"CREATE TABLE ${table.quotedString} " + showDataSourceTableDataCols(metadata, builder) + showDataSourceTableOptions(metadata, builder) + showDataSourceTableNonDataColumns(metadata, builder) + + builder.toString() + } + + private def showDataSourceTableDataCols(metadata: CatalogTable, builder: StringBuilder): Unit = { + val props = metadata.properties + val schemaParts = for { + numParts <- props.get("spark.sql.sources.schema.numParts").toSeq + index <- 0 until numParts.toInt + } yield props.getOrElse( + s"spark.sql.sources.schema.part.$index", + throw new AnalysisException( + s"Corrupted schema in catalog: $numParts parts expected, but part $index is missing." + ) + ) + + if (schemaParts.nonEmpty) { + val fields = DataType.fromJson(schemaParts.mkString).asInstanceOf[StructType].fields + val colTypeList = fields.map(f => s"${quoteIdentifier(f.name)} ${f.dataType.sql}") + builder ++= colTypeList.mkString("(", ", ", ")") + } + + builder ++= "\n" + } + + private def showDataSourceTableOptions(metadata: CatalogTable, builder: StringBuilder): Unit = { + val props = metadata.properties + + builder ++= s"USING ${props("spark.sql.sources.provider")}\n" + + val dataSourceOptions = metadata.storage.serdeProperties.filterNot { + case (key, value) => + // If it's a managed table, omit PATH option. Spark SQL always creates external table + // when the table creation DDL contains the PATH option.
+ key.toLowerCase == "path" && metadata.tableType == MANAGED + }.map { + case (key, value) => s"${quoteIdentifier(key)} '${escapeSingleQuotedString(value)}'" + } + + if (dataSourceOptions.nonEmpty) { + builder ++= "OPTIONS (\n" + builder ++= dataSourceOptions.mkString(" ", ",\n ", "\n") + builder ++= ")\n" + } + } + + private def showDataSourceTableNonDataColumns( + metadata: CatalogTable, builder: StringBuilder): Unit = { + val props = metadata.properties + + def getColumnNamesByType(colType: String, typeName: String): Seq[String] = { + (for { + numCols <- props.get(s"spark.sql.sources.schema.num${colType.capitalize}Cols").toSeq + index <- 0 until numCols.toInt + } yield props.getOrElse( + s"spark.sql.sources.schema.${colType}Col.$index", + throw new AnalysisException( + s"Corrupted $typeName in catalog: $numCols parts expected, but part $index is missing." + ) + )).map(quoteIdentifier) + } + + val partCols = getColumnNamesByType("part", "partitioning columns") + if (partCols.nonEmpty) { + builder ++= s"PARTITIONED BY ${partCols.mkString("(", ", ", ")")}\n" + } + + val bucketCols = getColumnNamesByType("bucket", "bucketing columns") + if (bucketCols.nonEmpty) { + builder ++= s"CLUSTERED BY ${bucketCols.mkString("(", ", ", ")")}\n" + + val sortCols = getColumnNamesByType("sort", "sorting columns") + if (sortCols.nonEmpty) { + builder ++= s"SORTED BY ${sortCols.mkString("(", ", ", ")")}\n" + } + + val numBuckets = props.getOrElse( + "spark.sql.sources.schema.numBuckets", + throw new AnalysisException("Corrupted bucket spec in catalog: missing bucket number") + ) + + builder ++= s"INTO $numBuckets BUCKETS\n" + } + } + + private def escapeSingleQuotedString(str: String): String = { + val builder = StringBuilder.newBuilder + + str.foreach { + case '\'' => builder ++= s"\\\'" + case ch => builder += ch + } + + builder.toString() + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala index 7d0a3d9756e9002a89baa23d419b143206bcc070..3863be5768f5fd59d248985cde104b3a8796f33e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala @@ -38,6 +38,8 @@ case class CreateTableUsing( provider: String, temporary: Boolean, options: Map[String, String], + partitionColumns: Array[String], + bucketSpec: Option[BucketSpec], allowExisting: Boolean, managedIfNoPath: Boolean) extends LogicalPlan with logical.Command { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala index d08dca32c043de3068d162326cb2932184e400e6..fdfb188b38bd80aa4d2685ab26a38375458aff7b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala @@ -225,7 +225,9 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { userSpecifiedSchema = None, source, temporary = false, - options, + options = options, + partitionColumns = Array.empty[String], + bucketSpec = None, allowExisting = false, managedIfNoPath = false) sparkSession.executePlan(cmd).toRdd @@ -272,6 +274,8 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { source, temporary = false, options, + partitionColumns = Array.empty[String], + bucketSpec = None, allowExisting = false, managedIfNoPath = false) 
sparkSession.executePlan(cmd).toRdd diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala index aeb613acb5edf5c8e48d96b4a05f38df55512e23..13df4493e24d7103766163de8fef001fdb7fcc9f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala @@ -26,7 +26,6 @@ import org.apache.spark.sql.catalyst.plans.logical.Project import org.apache.spark.sql.execution.SparkSqlParser import org.apache.spark.sql.internal.SQLConf - // TODO: merge this with DDLSuite (SPARK-14441) class DDLCommandSuite extends PlanTest { private val parser = new SparkSqlParser(new SQLConf) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index b0a3a803d299f495349ec1613b88a566d21fc676..8cfcec79cda938172f2d4c922b6e8170011e57e1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.hive import scala.collection.JavaConverters._ import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} -import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.fs.Path import org.apache.spark.internal.Logging import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession} @@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ -import org.apache.spark.sql.execution.command.{CreateTableAsSelectLogicalPlan, CreateViewCommand} +import org.apache.spark.sql.execution.command.CreateTableAsSelectLogicalPlan import org.apache.spark.sql.execution.datasources.{Partition => _, _} import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource => ParquetDefaultSource, ParquetRelation} import org.apache.spark.sql.hive.orc.{DefaultSource => OrcDefaultSource} @@ -44,7 +44,6 @@ import org.apache.spark.sql.types._ * cleaned up to integrate more nicely with [[HiveExternalCatalog]]. */ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Logging { - private val conf = sparkSession.conf private val sessionState = sparkSession.sessionState.asInstanceOf[HiveSessionState] private val client = sparkSession.sharedState.asInstanceOf[HiveSharedState].metadataHive @@ -110,7 +109,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log schemaString.map(s => DataType.fromJson(s).asInstanceOf[StructType]) // We only need names at here since userSpecifiedSchema we loaded from the metastore - // contains partition columns. We can always get datatypes of partitioning columns + // contains partition columns. We can always get data types of partitioning columns // from userSpecifiedSchema. 
val partitionColumns = getColumnNames("part") diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index 6d418c1dcf4619301b4e98fa2fe761ebdde1f40e..2f6aa36f95827f76af377566d17df850b1cfee6d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -25,10 +25,8 @@ import scala.collection.mutable import scala.language.implicitConversions import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.apache.hadoop.hive.ql.exec.FunctionRegistry -import org.apache.hadoop.hive.ql.processors._ import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe import org.apache.spark.{SparkConf, SparkContext} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala new file mode 100644 index 0000000000000000000000000000000000000000..12a1ad8987c170c79b4d9ecae1ebfbee93a3beb0 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive + +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.CatalogTable +import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.test.SQLTestUtils +import org.apache.spark.util.Utils + +class ShowCreateTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { + import testImplicits._ + + test("data source table with user specified schema") { + withTable("ddl_test1") { + val jsonFilePath = Utils.getSparkClassLoader.getResource("sample.json").getFile + + sql( + s"""CREATE TABLE ddl_test1 ( + | a STRING, + | b STRING, + | `extra col` ARRAY<INT>, + | `<another>` STRUCT<x: INT, y: ARRAY<BOOLEAN>> + |) + |USING json + |OPTIONS ( + | PATH '$jsonFilePath' + |) + """.stripMargin + ) + + checkCreateTable("ddl_test1") + } + } + + test("data source table CTAS") { + withTable("ddl_test2") { + sql( + s"""CREATE TABLE ddl_test2 + |USING json + |AS SELECT 1 AS a, "foo" AS b + """.stripMargin + ) + + checkCreateTable("ddl_test2") + } + } + + test("partitioned data source table") { + withTable("ddl_test3") { + sql( + s"""CREATE TABLE ddl_test3 + |USING json + |PARTITIONED BY (b) + |AS SELECT 1 AS a, "foo" AS b + """.stripMargin + ) + + checkCreateTable("ddl_test3") + } + } + + test("bucketed data source table") { + withTable("ddl_test3") { + sql( + s"""CREATE TABLE ddl_test3 + |USING json + |CLUSTERED BY (a) SORTED BY (b) INTO 2 BUCKETS + |AS SELECT 1 AS a, "foo" AS b + """.stripMargin + ) + + checkCreateTable("ddl_test3") + } + } + + test("partitioned bucketed data source table") { + withTable("ddl_test4") { + sql( + s"""CREATE TABLE ddl_test4 + |USING json + |PARTITIONED BY (c) + |CLUSTERED BY (a) SORTED BY (b) INTO 2 BUCKETS + |AS SELECT 1 AS a, "foo" AS b, 2.5 AS c + """.stripMargin + ) + + checkCreateTable("ddl_test4") + } + } + + test("data source table using Dataset API") { + withTable("ddl_test5") { + spark + .range(3) + .select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd, 'id as 'e) + .write + .mode("overwrite") + .partitionBy("a", "b") + .bucketBy(2, "c", "d") + .saveAsTable("ddl_test5") + + checkCreateTable(TableIdentifier("ddl_test5", Some("default"))) + } + } + + private def checkCreateTable(table: String): Unit = { + checkCreateTable(TableIdentifier(table, Some("default"))) + } + + private def checkCreateTable(table: TableIdentifier): Unit = { + val db = table.database.getOrElse("default") + val expected = spark.externalCatalog.getTable(db, table.table) + val shownDDL = sql(s"SHOW CREATE TABLE ${table.quotedString}").head().getString(0) + sql(s"DROP TABLE ${table.quotedString}") + + withTable(table.table) { + sql(shownDDL) + val actual = spark.externalCatalog.getTable(db, table.table) + checkCatalogTables(expected, actual) + } + } + + private def checkCatalogTables(expected: CatalogTable, actual: CatalogTable): Unit = { + def normalize(table: CatalogTable): CatalogTable = { + val nondeterministicProps = Set( + "CreateTime", + "transient_lastDdlTime", + "grantTime", + "lastUpdateTime", + "last_modified_by", + "last_modified_time", + "Owner:", + "COLUMN_STATS_ACCURATE", + // The following are hive specific schema parameters which we do not need to match exactly. 
+ "numFiles", + "numRows", + "rawDataSize", + "totalSize", + "totalNumberFiles", + "maxFileSize", + "minFileSize" + ) + + table.copy( + createTime = 0L, + lastAccessTime = 0L, + properties = table.properties.filterKeys(!nondeterministicProps.contains(_))) + } + + assert(normalize(expected) == normalize(actual)) + } +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index bbb775ef770afd1ca0f54fa37ca5d9aa5aae99c9..19f8cb3877b323a1cac80b1b87fac421861b4e93 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -1166,7 +1166,6 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { } test("some show commands are not supported") { - assertUnsupportedFeature { sql("SHOW CREATE TABLE my_table") } assertUnsupportedFeature { sql("SHOW COMPACTIONS") } assertUnsupportedFeature { sql("SHOW TRANSACTIONS") } assertUnsupportedFeature { sql("SHOW INDEXES ON my_table") }