diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 085bdaff4e03b55da2759f452b257135cf16ee24..0b0b6185c7c721e44d6c04f363c74c92960710b2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -19,14 +19,12 @@ package org.apache.spark.sql.execution.command

 import scala.util.control.NonFatal

-import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, CatalogTableType, SessionCatalog}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
-import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.types._

@@ -457,7 +455,6 @@ case class AlterTableSetLocation(
     }
     Seq.empty[Row]
   }
-
 }

@@ -489,9 +486,83 @@ private[sql] object DDLUtils {
       case _ =>
     })
   }
+
   def isTablePartitioned(table: CatalogTable): Boolean = {
-    table.partitionColumns.size > 0 ||
+    table.partitionColumns.nonEmpty ||
       table.properties.contains("spark.sql.sources.schema.numPartCols")
   }
-}
+  def getSchemaFromTableProperties(metadata: CatalogTable): Option[StructType] = {
+    getSchemaFromTableProperties(metadata.properties)
+  }
+
+  // A persisted data source table may not store its schema in the catalog. In this case, its schema
+  // will be inferred at runtime when the table is referenced.
+  def getSchemaFromTableProperties(props: Map[String, String]): Option[StructType] = {
+    require(isDatasourceTable(props))
+
+    val schemaParts = for {
+      numParts <- props.get("spark.sql.sources.schema.numParts").toSeq
+      index <- 0 until numParts.toInt
+    } yield props.getOrElse(
+      s"spark.sql.sources.schema.part.$index",
+      throw new AnalysisException(
+        s"Corrupted schema in catalog: $numParts parts expected, but part $index is missing."
+      )
+    )
+
+    if (schemaParts.isEmpty) {
+      None
+    } else {
+      Some(DataType.fromJson(schemaParts.mkString).asInstanceOf[StructType])
+    }
+  }
+
+  private def getColumnNamesByTypeFromTableProperties(
+      props: Map[String, String], colType: String, typeName: String): Seq[String] = {
+    require(isDatasourceTable(props))
+
+    for {
+      numCols <- props.get(s"spark.sql.sources.schema.num${colType.capitalize}Cols").toSeq
+      index <- 0 until numCols.toInt
+    } yield props.getOrElse(
+      s"spark.sql.sources.schema.${colType}Col.$index",
+      throw new AnalysisException(
+        s"Corrupted $typeName in catalog: $numCols parts expected, but part $index is missing."
+ ) + ) + } + + def getPartitionColumnsFromTableProperties(metadata: CatalogTable): Seq[String] = { + getPartitionColumnsFromTableProperties(metadata.properties) + } + + def getPartitionColumnsFromTableProperties(props: Map[String, String]): Seq[String] = { + getColumnNamesByTypeFromTableProperties(props, "part", "partitioning columns") + } + + def getNumBucketFromTableProperties(metadata: CatalogTable): Option[Int] = { + getNumBucketFromTableProperties(metadata.properties) + } + + def getNumBucketFromTableProperties(props: Map[String, String]): Option[Int] = { + require(isDatasourceTable(props)) + props.get("spark.sql.sources.schema.numBuckets").map(_.toInt) + } + + def getBucketingColumnsFromTableProperties(metadata: CatalogTable): Seq[String] = { + getBucketingColumnsFromTableProperties(metadata.properties) + } + + def getBucketingColumnsFromTableProperties(props: Map[String, String]): Seq[String] = { + getColumnNamesByTypeFromTableProperties(props, "bucket", "bucketing columns") + } + + def getSortingColumnsFromTableProperties(metadata: CatalogTable): Seq[String] = { + getSortingColumnsFromTableProperties(metadata.properties) + } + + def getSortingColumnsFromTableProperties(props: Map[String, String]): Seq[String] = { + getColumnNamesByTypeFromTableProperties(props, "sort", "sorting columns") + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 954dcca1a1d465ebb6dda10339c00988c4db067b..0f90715a90e1235f61d10761ab0c1c51617af3cd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -309,12 +309,29 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF // Shows data columns and partitioned columns (if any) private def describe(table: CatalogTable, buffer: ArrayBuffer[Row]): Unit = { - describeSchema(table.schema, buffer) + if (DDLUtils.isDatasourceTable(table)) { + val schema = DDLUtils.getSchemaFromTableProperties(table) - if (table.partitionColumns.nonEmpty) { - append(buffer, "# Partition Information", "", "") - append(buffer, s"# ${output(0).name}", output(1).name, output(2).name) - describeSchema(table.partitionColumns, buffer) + if (schema.isEmpty) { + append(buffer, "# Schema of this table is inferred at runtime", "", "") + } else { + schema.foreach(describeSchema(_, buffer)) + } + + val partCols = DDLUtils.getPartitionColumnsFromTableProperties(table) + if (partCols.nonEmpty) { + append(buffer, "# Partition Information", "", "") + append(buffer, s"# ${output.head.name}", "", "") + partCols.foreach(col => append(buffer, col, "", "")) + } + } else { + describeSchema(table.schema, buffer) + + if (table.partitionColumns.nonEmpty) { + append(buffer, "# Partition Information", "", "") + append(buffer, s"# ${output.head.name}", output(1).name, output(2).name) + describeSchema(table.partitionColumns, buffer) + } } } @@ -338,26 +355,47 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF append(buffer, "Table Type:", table.tableType.name, "") append(buffer, "Table Parameters:", "", "") - table.properties.foreach { case (key, value) => + table.properties.filterNot { + // Hides schema properties that hold user-defined schema, partition columns, and bucketing + // information since they are already extracted and shown in other parts. 
+      case (key, _) => key.startsWith("spark.sql.sources.schema")
+    }.foreach { case (key, value) =>
       append(buffer, s"  $key", value, "")
     }

+    describeStorageInfo(table, buffer)
+  }
+
+  private def describeStorageInfo(metadata: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
     append(buffer, "", "", "")
     append(buffer, "# Storage Information", "", "")
-    table.storage.serde.foreach(serdeLib => append(buffer, "SerDe Library:", serdeLib, ""))
-    table.storage.inputFormat.foreach(format => append(buffer, "InputFormat:", format, ""))
-    table.storage.outputFormat.foreach(format => append(buffer, "OutputFormat:", format, ""))
-    append(buffer, "Compressed:", if (table.storage.compressed) "Yes" else "No", "")
-    append(buffer, "Num Buckets:", table.numBuckets.toString, "")
-    append(buffer, "Bucket Columns:", table.bucketColumnNames.mkString("[", ", ", "]"), "")
-    append(buffer, "Sort Columns:", table.sortColumnNames.mkString("[", ", ", "]"), "")
+    metadata.storage.serde.foreach(serdeLib => append(buffer, "SerDe Library:", serdeLib, ""))
+    metadata.storage.inputFormat.foreach(format => append(buffer, "InputFormat:", format, ""))
+    metadata.storage.outputFormat.foreach(format => append(buffer, "OutputFormat:", format, ""))
+    append(buffer, "Compressed:", if (metadata.storage.compressed) "Yes" else "No", "")
+    describeBucketingInfo(metadata, buffer)

     append(buffer, "Storage Desc Parameters:", "", "")
-    table.storage.serdeProperties.foreach { case (key, value) =>
+    metadata.storage.serdeProperties.foreach { case (key, value) =>
       append(buffer, s"  $key", value, "")
     }
   }

+  private def describeBucketingInfo(metadata: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
+    if (DDLUtils.isDatasourceTable(metadata)) {
+      val numBuckets = DDLUtils.getNumBucketFromTableProperties(metadata)
+      val bucketCols = DDLUtils.getBucketingColumnsFromTableProperties(metadata)
+      val sortCols = DDLUtils.getSortingColumnsFromTableProperties(metadata)
+      append(buffer, "Num Buckets:", numBuckets.map(_.toString).getOrElse(""), "")
+      append(buffer, "Bucket Columns:", bucketCols.mkString("[", ", ", "]"), "")
+      append(buffer, "Sort Columns:", sortCols.mkString("[", ", ", "]"), "")
+    } else {
+      append(buffer, "Num Buckets:", metadata.numBuckets.toString, "")
+      append(buffer, "Bucket Columns:", metadata.bucketColumnNames.mkString("[", ", ", "]"), "")
+      append(buffer, "Sort Columns:", metadata.sortColumnNames.mkString("[", ", ", "]"), "")
+    }
+  }
+
   private def describeSchema(schema: Seq[CatalogColumn], buffer: ArrayBuffer[Row]): Unit = {
     schema.foreach { column =>
       append(buffer, column.name, column.dataType.toLowerCase, column.comment.orNull)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index a8ba952b497ea9f90330bb4943a5ef70fed4109b..0f23949d98da0ef035889478887e1e68d9e525df 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -22,7 +22,7 @@ import java.io.File
 import org.apache.hadoop.fs.Path
 import org.scalatest.BeforeAndAfterEach

-import org.apache.spark.sql.{AnalysisException, QueryTest, SaveMode}
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTableType}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.hive.test.TestHiveSingleton
@@ -531,4 +531,58 @@ class HiveDDLSuite
         .exists(_.getString(0) == "# Detailed Table Information"))
     }
   }
+
+  test("desc table for data source table - no user-defined schema") {
+    withTable("t1") {
+      withTempPath { dir =>
+        val path = dir.getCanonicalPath
+        sqlContext.range(1).write.parquet(path)
+        sql(s"CREATE TABLE t1 USING parquet OPTIONS (PATH '$path')")
+
+        val desc = sql("DESC FORMATTED t1").collect().toSeq
+
+        assert(desc.contains(Row("# Schema of this table is inferred at runtime", "", "")))
+      }
+    }
+  }
+
+  test("desc table for data source table - partitioned bucketed table") {
+    withTable("t1") {
+      sqlContext
+        .range(1).select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd).write
+        .bucketBy(2, "b").sortBy("c").partitionBy("d")
+        .saveAsTable("t1")
+
+      val formattedDesc = sql("DESC FORMATTED t1").collect()
+
+      assert(formattedDesc.containsSlice(
+        Seq(
+          Row("a", "bigint", ""),
+          Row("b", "bigint", ""),
+          Row("c", "bigint", ""),
+          Row("d", "bigint", ""),
+          Row("# Partition Information", "", ""),
+          Row("# col_name", "", ""),
+          Row("d", "", ""),
+          Row("", "", ""),
+          Row("# Detailed Table Information", "", ""),
+          Row("Database:", "default", "")
+        )
+      ))
+
+      assert(formattedDesc.containsSlice(
+        Seq(
+          Row("Table Type:", "MANAGED", "")
+        )
+      ))
+
+      assert(formattedDesc.containsSlice(
+        Seq(
+          Row("Num Buckets:", "2", ""),
+          Row("Bucket Columns:", "[b]", ""),
+          Row("Sort Columns:", "[c]", "")
+        )
+      ))
+    }
+  }
 }
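
For context on what the new `DDLUtils` helpers are decoding: a data source table's schema is persisted in the catalog as a JSON string, chunked across numbered `spark.sql.sources.schema.part.N` properties (with `spark.sql.sources.schema.numParts` recording the chunk count) because a single metastore property value has a bounded length. The sketch below is a minimal, standalone illustration of that round trip, not Spark's actual writer code; the object name, the `main` driver, and the 4000-character chunk size are illustrative assumptions.

```scala
import org.apache.spark.sql.types.{DataType, LongType, StructField, StructType}

object SchemaPropertiesSketch {
  // Assumed chunk size for illustration; Spark's real threshold is configurable.
  private val chunkSize = 4000

  // Split a schema's JSON form into numbered table properties.
  def toProperties(schema: StructType): Map[String, String] = {
    val parts = schema.json.grouped(chunkSize).toSeq
    Map("spark.sql.sources.schema.numParts" -> parts.size.toString) ++
      parts.zipWithIndex.map { case (part, i) =>
        s"spark.sql.sources.schema.part.$i" -> part
      }
  }

  // Reassemble the schema by concatenating the parts in index order and parsing the
  // JSON, mirroring what DDLUtils.getSchemaFromTableProperties does in the patch above.
  def fromProperties(props: Map[String, String]): Option[StructType] = {
    props.get("spark.sql.sources.schema.numParts").map { numParts =>
      val json = (0 until numParts.toInt).map { i =>
        props.getOrElse(
          s"spark.sql.sources.schema.part.$i",
          sys.error(s"Corrupted schema in catalog: part $i is missing"))
      }.mkString
      DataType.fromJson(json).asInstanceOf[StructType]
    }
  }

  def main(args: Array[String]): Unit = {
    val schema = StructType(Seq(StructField("a", LongType), StructField("b", LongType)))
    val roundTripped = fromProperties(toProperties(schema))
    // Prints Some(struct<a:bigint,b:bigint>)
    println(roundTripped.map(_.simpleString))
  }
}
```

Because reassembly must concatenate every chunk in index order before the JSON can be parsed, a missing part is unrecoverable, which is why the patched `getSchemaFromTableProperties` raises an `AnalysisException` for a gap instead of skipping it.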