Commit 8a12580d authored by Cheng Lian, committed by Yin Huai

[SPARK-14127][SQL] "DESC <table>": Extracts schema information from table properties for data source tables

## What changes were proposed in this pull request?

This is a follow-up of #12934 and #12844. This PR adds a set of utility methods to `DDLUtils` that extract schema information (user-defined schema, partition columns, and bucketing information) from data source table properties. These utility methods are then used in `DescribeTableCommand` to refine the output for data source tables. Before this PR, this schema information was only shown as raw table properties, which are hard to read.

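For context, the catalog stores a data source table's schema as a JSON string split across numbered table properties, together with separate properties listing partition, bucketing, and sorting columns (the exact keys appear in the `DDLUtils` changes below). A minimal sketch of how such properties can be read back, using a hypothetical property map; the values and the helper-free reassembly here are illustrative, not the PR's code:

```scala
import org.apache.spark.sql.types.{DataType, StructType}

// Hypothetical catalog properties for a data source table with columns `a` and `d`
// (partitioned by `d`); real property values are written by Spark when the table is saved.
val props = Map(
  "spark.sql.sources.provider" -> "parquet",
  "spark.sql.sources.schema.numParts" -> "2",
  "spark.sql.sources.schema.part.0" ->
    """{"type":"struct","fields":[{"name":"a","type":"long","nullable":true,"metadata":{}},""",
  "spark.sql.sources.schema.part.1" ->
    """{"name":"d","type":"long","nullable":true,"metadata":{}}]}""",
  "spark.sql.sources.schema.numPartCols" -> "1",
  "spark.sql.sources.schema.partCol.0" -> "d"
)

// Concatenate the numbered schema parts and parse the resulting JSON back into a StructType.
val schema: Option[StructType] =
  props.get("spark.sql.sources.schema.numParts").map { numParts =>
    val json = (0 until numParts.toInt)
      .map(i => props(s"spark.sql.sources.schema.part.$i"))
      .mkString
    DataType.fromJson(json).asInstanceOf[StructType]
  }

// Partition columns are stored one per numbered property.
val partCols: Seq[String] =
  props.get("spark.sql.sources.schema.numPartCols").toSeq.flatMap { n =>
    (0 until n.toInt).map(i => props(s"spark.sql.sources.schema.partCol.$i"))
  }
```

The new `DDLUtils` helpers below implement this extraction (with error handling for missing parts), and `DescribeTableCommand` uses them instead of dumping the raw properties.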
Sample output:

```
+----------------------------+---------------------------------------------------------+-------+
|col_name                    |data_type                                                |comment|
+----------------------------+---------------------------------------------------------+-------+
|a                           |bigint                                                   |       |
|b                           |bigint                                                   |       |
|c                           |bigint                                                   |       |
|d                           |bigint                                                   |       |
|# Partition Information     |                                                         |       |
|# col_name                  |                                                         |       |
|d                           |                                                         |       |
|                            |                                                         |       |
|# Detailed Table Information|                                                         |       |
|Database:                   |default                                                  |       |
|Owner:                      |lian                                                     |       |
|Create Time:                |Tue May 10 03:20:34 PDT 2016                             |       |
|Last Access Time:           |Wed Dec 31 16:00:00 PST 1969                             |       |
|Location:                   |file:/Users/lian/local/src/spark/workspace-a/target/...  |       |
|Table Type:                 |MANAGED                                                  |       |
|Table Parameters:           |                                                         |       |
|  rawDataSize               |-1                                                       |       |
|  numFiles                  |1                                                        |       |
|  transient_lastDdlTime     |1462875634                                               |       |
|  totalSize                 |684                                                      |       |
|  spark.sql.sources.provider|parquet                                                  |       |
|  EXTERNAL                  |FALSE                                                    |       |
|  COLUMN_STATS_ACCURATE     |false                                                    |       |
|  numRows                   |-1                                                       |       |
|                            |                                                         |       |
|# Storage Information       |                                                         |       |
|SerDe Library:              |org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe       |       |
|InputFormat:                |org.apache.hadoop.mapred.SequenceFileInputFormat         |       |
|OutputFormat:               |org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat|       |
|Compressed:                 |No                                                       |       |
|Num Buckets:                |2                                                        |       |
|Bucket Columns:             |[b]                                                      |       |
|Sort Columns:               |[c]                                                      |       |
|Storage Desc Parameters:    |                                                         |       |
|  path                      |file:/Users/lian/local/src/spark/workspace-a/target/...  |       |
|  serialization.format      |1                                                        |       |
+----------------------------+---------------------------------------------------------+-------+
```
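For reference, output like the sample above comes from running `DESC FORMATTED` on a bucketed, partitioned data source table. Such a table can be created roughly as in the new `HiveDDLSuite` test included in this PR (assuming an existing `sqlContext`, as in that test):

```scala
import org.apache.spark.sql.functions.col

// Build a small data source table bucketed by `b`, sorted by `c`, and partitioned by `d`.
sqlContext
  .range(1)
  .select(col("id") as "a", col("id") as "b", col("id") as "c", col("id") as "d")
  .write
  .bucketBy(2, "b")
  .sortBy("c")
  .partitionBy("d")
  .saveAsTable("t1")

// Show the refined DESC output without truncating the columns.
sqlContext.sql("DESC FORMATTED t1").show(numRows = 100, truncate = false)
```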

## How was this patch tested?

Test cases are added in `HiveDDLSuite` to check command output.

Author: Cheng Lian <lian@databricks.com>

Closes #13025 from liancheng/spark-14127-extract-schema-info.
parent aab99d31
@@ -19,14 +19,12 @@ package org.apache.spark.sql.execution.command
import scala.util.control.NonFatal
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable}
import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, CatalogTableType, SessionCatalog}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.types._
@@ -457,7 +455,6 @@ case class AlterTableSetLocation(
}
Seq.empty[Row]
}
}
@@ -489,9 +486,83 @@ private[sql] object DDLUtils {
      case _ =>
    })
  }

  def isTablePartitioned(table: CatalogTable): Boolean = {
    table.partitionColumns.nonEmpty ||
      table.properties.contains("spark.sql.sources.schema.numPartCols")
  }

  def getSchemaFromTableProperties(metadata: CatalogTable): Option[StructType] = {
    getSchemaFromTableProperties(metadata.properties)
  }

  // A persisted data source table may not store its schema in the catalog. In this case, its schema
  // will be inferred at runtime when the table is referenced.
  def getSchemaFromTableProperties(props: Map[String, String]): Option[StructType] = {
    require(isDatasourceTable(props))

    val schemaParts = for {
      numParts <- props.get("spark.sql.sources.schema.numParts").toSeq
      index <- 0 until numParts.toInt
    } yield props.getOrElse(
      s"spark.sql.sources.schema.part.$index",
      throw new AnalysisException(
        s"Corrupted schema in catalog: $numParts parts expected, but part $index is missing."
      )
    )

    if (schemaParts.isEmpty) {
      None
    } else {
      Some(DataType.fromJson(schemaParts.mkString).asInstanceOf[StructType])
    }
  }

  private def getColumnNamesByTypeFromTableProperties(
      props: Map[String, String], colType: String, typeName: String): Seq[String] = {
    require(isDatasourceTable(props))

    for {
      numCols <- props.get(s"spark.sql.sources.schema.num${colType.capitalize}Cols").toSeq
      index <- 0 until numCols.toInt
    } yield props.getOrElse(
      s"spark.sql.sources.schema.${colType}Col.$index",
      throw new AnalysisException(
        s"Corrupted $typeName in catalog: $numCols parts expected, but part $index is missing."
      )
    )
  }

  def getPartitionColumnsFromTableProperties(metadata: CatalogTable): Seq[String] = {
    getPartitionColumnsFromTableProperties(metadata.properties)
  }

  def getPartitionColumnsFromTableProperties(props: Map[String, String]): Seq[String] = {
    getColumnNamesByTypeFromTableProperties(props, "part", "partitioning columns")
  }

  def getNumBucketFromTableProperties(metadata: CatalogTable): Option[Int] = {
    getNumBucketFromTableProperties(metadata.properties)
  }

  def getNumBucketFromTableProperties(props: Map[String, String]): Option[Int] = {
    require(isDatasourceTable(props))
    props.get("spark.sql.sources.schema.numBuckets").map(_.toInt)
  }

  def getBucketingColumnsFromTableProperties(metadata: CatalogTable): Seq[String] = {
    getBucketingColumnsFromTableProperties(metadata.properties)
  }

  def getBucketingColumnsFromTableProperties(props: Map[String, String]): Seq[String] = {
    getColumnNamesByTypeFromTableProperties(props, "bucket", "bucketing columns")
  }

  def getSortingColumnsFromTableProperties(metadata: CatalogTable): Seq[String] = {
    getSortingColumnsFromTableProperties(metadata.properties)
  }

  def getSortingColumnsFromTableProperties(props: Map[String, String]): Seq[String] = {
    getColumnNamesByTypeFromTableProperties(props, "sort", "sorting columns")
  }
}
@@ -309,12 +309,29 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
  // Shows data columns and partitioned columns (if any)
  private def describe(table: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
    if (DDLUtils.isDatasourceTable(table)) {
      val schema = DDLUtils.getSchemaFromTableProperties(table)

      if (schema.isEmpty) {
        append(buffer, "# Schema of this table is inferred at runtime", "", "")
      } else {
        schema.foreach(describeSchema(_, buffer))
      }

      val partCols = DDLUtils.getPartitionColumnsFromTableProperties(table)
      if (partCols.nonEmpty) {
        append(buffer, "# Partition Information", "", "")
        append(buffer, s"# ${output.head.name}", "", "")
        partCols.foreach(col => append(buffer, col, "", ""))
      }
    } else {
      describeSchema(table.schema, buffer)

      if (table.partitionColumns.nonEmpty) {
        append(buffer, "# Partition Information", "", "")
        append(buffer, s"# ${output.head.name}", output(1).name, output(2).name)
        describeSchema(table.partitionColumns, buffer)
      }
    }
  }
@@ -338,26 +355,47 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
    append(buffer, "Table Type:", table.tableType.name, "")

    append(buffer, "Table Parameters:", "", "")
    table.properties.filterNot {
      // Hides schema properties that hold user-defined schema, partition columns, and bucketing
      // information since they are already extracted and shown in other parts.
      case (key, _) => key.startsWith("spark.sql.sources.schema")
    }.foreach { case (key, value) =>
      append(buffer, s" $key", value, "")
    }

    describeStorageInfo(table, buffer)
  }

  private def describeStorageInfo(metadata: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
    append(buffer, "", "", "")
    append(buffer, "# Storage Information", "", "")
    metadata.storage.serde.foreach(serdeLib => append(buffer, "SerDe Library:", serdeLib, ""))
    metadata.storage.inputFormat.foreach(format => append(buffer, "InputFormat:", format, ""))
    metadata.storage.outputFormat.foreach(format => append(buffer, "OutputFormat:", format, ""))
    append(buffer, "Compressed:", if (metadata.storage.compressed) "Yes" else "No", "")
    describeBucketingInfo(metadata, buffer)

    append(buffer, "Storage Desc Parameters:", "", "")
    metadata.storage.serdeProperties.foreach { case (key, value) =>
      append(buffer, s" $key", value, "")
    }
  }

  private def describeBucketingInfo(metadata: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
    if (DDLUtils.isDatasourceTable(metadata)) {
      val numBuckets = DDLUtils.getNumBucketFromTableProperties(metadata)
      val bucketCols = DDLUtils.getBucketingColumnsFromTableProperties(metadata)
      val sortCols = DDLUtils.getSortingColumnsFromTableProperties(metadata)
      append(buffer, "Num Buckets:", numBuckets.map(_.toString).getOrElse(""), "")
      append(buffer, "Bucket Columns:", bucketCols.mkString("[", ", ", "]"), "")
      append(buffer, "Sort Columns:", sortCols.mkString("[", ", ", "]"), "")
    } else {
      append(buffer, "Num Buckets:", metadata.numBuckets.toString, "")
      append(buffer, "Bucket Columns:", metadata.bucketColumnNames.mkString("[", ", ", "]"), "")
      append(buffer, "Sort Columns:", metadata.sortColumnNames.mkString("[", ", ", "]"), "")
    }
  }

  private def describeSchema(schema: Seq[CatalogColumn], buffer: ArrayBuffer[Row]): Unit = {
    schema.foreach { column =>
      append(buffer, column.name, column.dataType.toLowerCase, column.comment.orNull)
@@ -22,7 +22,7 @@ import java.io.File
import org.apache.hadoop.fs.Path
import org.scalatest.BeforeAndAfterEach
import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTableType}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.hive.test.TestHiveSingleton
@@ -531,4 +531,58 @@ class HiveDDLSuite
        .exists(_.getString(0) == "# Detailed Table Information"))
    }
  }

  test("desc table for data source table - no user-defined schema") {
    withTable("t1") {
      withTempPath { dir =>
        val path = dir.getCanonicalPath
        sqlContext.range(1).write.parquet(path)
        sql(s"CREATE TABLE t1 USING parquet OPTIONS (PATH '$path')")

        val desc = sql("DESC FORMATTED t1").collect().toSeq

        assert(desc.contains(Row("# Schema of this table is inferred at runtime", "", "")))
      }
    }
  }

  test("desc table for data source table - partitioned bucketed table") {
    withTable("t1") {
      sqlContext
        .range(1).select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd).write
        .bucketBy(2, "b").sortBy("c").partitionBy("d")
        .saveAsTable("t1")

      val formattedDesc = sql("DESC FORMATTED t1").collect()

      assert(formattedDesc.containsSlice(
        Seq(
          Row("a", "bigint", ""),
          Row("b", "bigint", ""),
          Row("c", "bigint", ""),
          Row("d", "bigint", ""),
          Row("# Partition Information", "", ""),
          Row("# col_name", "", ""),
          Row("d", "", ""),
          Row("", "", ""),
          Row("# Detailed Table Information", "", ""),
          Row("Database:", "default", "")
        )
      ))

      assert(formattedDesc.containsSlice(
        Seq(
          Row("Table Type:", "MANAGED", "")
        )
      ))

      assert(formattedDesc.containsSlice(
        Seq(
          Row("Num Buckets:", "2", ""),
          Row("Bucket Columns:", "[b]", ""),
          Row("Sort Columns:", "[c]", "")
        )
      ))
    }
  }
}