From 23ea8980809497d0372084adf5936602655e1685 Mon Sep 17 00:00:00 2001
From: Masha Basmanova <mbasmanova@fb.com>
Date: Fri, 18 Aug 2017 09:54:39 -0700
Subject: [PATCH] [SPARK-21213][SQL] Support collecting partition-level statistics: rowCount and sizeInBytes

## What changes were proposed in this pull request?

Added support for the ANALYZE TABLE [db_name.]tablename PARTITION (partcol1[=val1], partcol2[=val2], ...) COMPUTE STATISTICS [NOSCAN] SQL command, which calculates the total number of rows and the size in bytes for a subset of partitions. The calculated statistics are stored in the Hive Metastore as user-defined properties attached to the partition objects. The property names are the same as the ones used for table-level statistics: spark.sql.statistics.totalSize and spark.sql.statistics.numRows.

When the partition specification contains all partition columns with values, the command collects statistics for the single partition that matches the specification. When some partition columns are missing or are listed without values, the command collects statistics for all partitions that match the specified subset of partition column values. For example, suppose table t has 4 partitions with the following specs:

* Partition1: (ds='2008-04-08', hr=11)
* Partition2: (ds='2008-04-08', hr=12)
* Partition3: (ds='2008-04-09', hr=11)
* Partition4: (ds='2008-04-09', hr=12)

ANALYZE TABLE t PARTITION (ds='2008-04-09', hr=11) collects statistics only for partition 3.

ANALYZE TABLE t PARTITION (ds='2008-04-09') collects statistics for partitions 3 and 4.

ANALYZE TABLE t PARTITION (ds, hr) collects statistics for all four partitions.

When the optional NOSCAN parameter is specified, the command does not count the number of rows and only gathers the size in bytes. The statistics gathered by ANALYZE TABLE can be fetched with the DESC EXTENDED [db_name.]tablename PARTITION command.

## How was this patch tested?

Added tests.

Author: Masha Basmanova <mbasmanova@fb.com>

Closes #18421 from mbasmanova/mbasmanova-analyze-partition.
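The sketch below is illustrative and not part of the patch: it shows how the new syntax can be exercised end to end from Scala, assuming a Hive-enabled SparkSession and a made-up table named `sales`. The collected statistics land in the Hive Metastore as the partition properties described above.

```scala
import org.apache.spark.sql.SparkSession

// Hive support is needed so the collected statistics can be persisted
// as partition properties in the Hive Metastore.
val spark = SparkSession.builder()
  .appName("partition-stats-demo")
  .enableHiveSupport()
  .getOrCreate()

spark.sql("CREATE TABLE sales (key STRING, value STRING) PARTITIONED BY (ds STRING, hr INT)")
spark.sql("INSERT INTO TABLE sales PARTITION (ds='2008-04-09', hr=11) VALUES ('k1', 'v1')")
spark.sql("INSERT INTO TABLE sales PARTITION (ds='2008-04-09', hr=12) VALUES ('k2', 'v2')")

// Full partition spec: analyzes the single matching partition.
spark.sql("ANALYZE TABLE sales PARTITION (ds='2008-04-09', hr=11) COMPUTE STATISTICS")

// Partial spec: analyzes every partition with ds='2008-04-09'; NOSCAN skips the row count.
spark.sql("ANALYZE TABLE sales PARTITION (ds='2008-04-09') COMPUTE STATISTICS NOSCAN")

// Columns listed without values: analyzes all partitions of the table.
spark.sql("ANALYZE TABLE sales PARTITION (ds, hr) COMPUTE STATISTICS")

// The stored values appear as a "Partition Statistics" row in the describe output.
spark.sql("DESC EXTENDED sales PARTITION (ds='2008-04-09', hr=11)").show(50, truncate = false)
```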
--- .../sql/catalyst/catalog/interface.scala | 7 +- .../spark/sql/execution/SparkSqlParser.scala | 36 ++- .../command/AnalyzePartitionCommand.scala | 149 ++++++++++ .../command/AnalyzeTableCommand.scala | 28 +- .../sql/execution/command/CommandUtils.scala | 27 +- .../inputs/describe-part-after-analyze.sql | 34 +++ .../describe-part-after-analyze.sql.out | 244 +++++++++++++++++ .../sql/execution/SparkSqlParserSuite.scala | 33 ++- .../spark/sql/hive/HiveExternalCatalog.scala | 169 ++++++++---- .../sql/hive/client/HiveClientImpl.scala | 2 + .../spark/sql/hive/StatisticsSuite.scala | 254 ++++++++++++++++++ 11 files changed, 888 insertions(+), 95 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala create mode 100644 sql/core/src/test/resources/sql-tests/inputs/describe-part-after-analyze.sql create mode 100644 sql/core/src/test/resources/sql-tests/results/describe-part-after-analyze.sql.out diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index 5a8c4e7610..1965144e81 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -91,12 +91,14 @@ object CatalogStorageFormat { * * @param spec partition spec values indexed by column name * @param storage storage format of the partition - * @param parameters some parameters for the partition, for example, stats. + * @param parameters some parameters for the partition + * @param stats optional statistics (number of rows, total size, etc.) */ case class CatalogTablePartition( spec: CatalogTypes.TablePartitionSpec, storage: CatalogStorageFormat, - parameters: Map[String, String] = Map.empty) { + parameters: Map[String, String] = Map.empty, + stats: Option[CatalogStatistics] = None) { def toLinkedHashMap: mutable.LinkedHashMap[String, String] = { val map = new mutable.LinkedHashMap[String, String]() @@ -106,6 +108,7 @@ case class CatalogTablePartition( if (parameters.nonEmpty) { map.put("Partition Parameters", s"{${parameters.map(p => p._1 + "=" + p._2).mkString(", ")}}") } + stats.foreach(s => map.put("Partition Statistics", s.simpleString)) map } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index d4414b6f78..8379e740a0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -90,30 +90,40 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { } /** - * Create an [[AnalyzeTableCommand]] command or an [[AnalyzeColumnCommand]] command. - * Example SQL for analyzing table : + * Create an [[AnalyzeTableCommand]] command, or an [[AnalyzePartitionCommand]] + * or an [[AnalyzeColumnCommand]] command. 
+ * Example SQL for analyzing a table or a set of partitions : * {{{ - * ANALYZE TABLE table COMPUTE STATISTICS [NOSCAN]; + * ANALYZE TABLE [db_name.]tablename [PARTITION (partcol1[=val1], partcol2[=val2], ...)] + * COMPUTE STATISTICS [NOSCAN]; * }}} + * * Example SQL for analyzing columns : * {{{ - * ANALYZE TABLE table COMPUTE STATISTICS FOR COLUMNS column1, column2; + * ANALYZE TABLE [db_name.]tablename COMPUTE STATISTICS FOR COLUMNS column1, column2; * }}} */ override def visitAnalyze(ctx: AnalyzeContext): LogicalPlan = withOrigin(ctx) { - if (ctx.partitionSpec != null) { - logWarning(s"Partition specification is ignored: ${ctx.partitionSpec.getText}") + if (ctx.identifier != null && + ctx.identifier.getText.toLowerCase(Locale.ROOT) != "noscan") { + throw new ParseException(s"Expected `NOSCAN` instead of `${ctx.identifier.getText}`", ctx) } - if (ctx.identifier != null) { - if (ctx.identifier.getText.toLowerCase(Locale.ROOT) != "noscan") { - throw new ParseException(s"Expected `NOSCAN` instead of `${ctx.identifier.getText}`", ctx) + + val table = visitTableIdentifier(ctx.tableIdentifier) + if (ctx.identifierSeq() == null) { + if (ctx.partitionSpec != null) { + AnalyzePartitionCommand(table, visitPartitionSpec(ctx.partitionSpec), + noscan = ctx.identifier != null) + } else { + AnalyzeTableCommand(table, noscan = ctx.identifier != null) } - AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier)) - } else if (ctx.identifierSeq() == null) { - AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier), noscan = false) } else { + if (ctx.partitionSpec != null) { + logWarning("Partition specification is ignored when collecting column statistics: " + + ctx.partitionSpec.getText) + } AnalyzeColumnCommand( - visitTableIdentifier(ctx.tableIdentifier), + table, visitIdentifierSeq(ctx.identifierSeq())) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala new file mode 100644 index 0000000000..5b54b2270b --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.command + +import org.apache.spark.sql.{AnalysisException, Column, Row, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, UnresolvedAttribute} +import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType} +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec +import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, Literal} +import org.apache.spark.sql.execution.datasources.PartitioningUtils + +/** + * Analyzes a given set of partitions to generate per-partition statistics, which will be used in + * query optimizations. + * + * When `partitionSpec` is empty, statistics for all partitions are collected and stored in + * Metastore. + * + * When `partitionSpec` mentions only some of the partition columns, all partitions with + * matching values for specified columns are processed. + * + * If `partitionSpec` mentions unknown partition column, an `AnalysisException` is raised. + * + * By default, total number of rows and total size in bytes are calculated. When `noscan` + * is `true`, only total size in bytes is computed. + */ +case class AnalyzePartitionCommand( + tableIdent: TableIdentifier, + partitionSpec: Map[String, Option[String]], + noscan: Boolean = true) extends RunnableCommand { + + private def getPartitionSpec(table: CatalogTable): Option[TablePartitionSpec] = { + val normalizedPartitionSpec = + PartitioningUtils.normalizePartitionSpec(partitionSpec, table.partitionColumnNames, + table.identifier.quotedString, conf.resolver) + + // Report an error if partition columns in partition specification do not form + // a prefix of the list of partition columns defined in the table schema + val isNotSpecified = + table.partitionColumnNames.map(normalizedPartitionSpec.getOrElse(_, None).isEmpty) + if (isNotSpecified.init.zip(isNotSpecified.tail).contains((true, false))) { + val tableId = table.identifier + val schemaColumns = table.partitionColumnNames.mkString(",") + val specColumns = normalizedPartitionSpec.keys.mkString(",") + throw new AnalysisException("The list of partition columns with values " + + s"in partition specification for table '${tableId.table}' " + + s"in database '${tableId.database.get}' is not a prefix of the list of " + + "partition columns defined in the table schema. 
" + + s"Expected a prefix of [${schemaColumns}], but got [${specColumns}].") + } + + val filteredSpec = normalizedPartitionSpec.filter(_._2.isDefined).mapValues(_.get) + if (filteredSpec.isEmpty) { + None + } else { + Some(filteredSpec) + } + } + + override def run(sparkSession: SparkSession): Seq[Row] = { + val sessionState = sparkSession.sessionState + val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase) + val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db)) + val tableMeta = sessionState.catalog.getTableMetadata(tableIdentWithDB) + if (tableMeta.tableType == CatalogTableType.VIEW) { + throw new AnalysisException("ANALYZE TABLE is not supported on views.") + } + + val partitionValueSpec = getPartitionSpec(tableMeta) + + val partitions = sessionState.catalog.listPartitions(tableMeta.identifier, partitionValueSpec) + + if (partitions.isEmpty) { + if (partitionValueSpec.isDefined) { + throw new NoSuchPartitionException(db, tableIdent.table, partitionValueSpec.get) + } else { + // the user requested to analyze all partitions for a table which has no partitions + // return normally, since there is nothing to do + return Seq.empty[Row] + } + } + + // Compute statistics for individual partitions + val rowCounts: Map[TablePartitionSpec, BigInt] = + if (noscan) { + Map.empty + } else { + calculateRowCountsPerPartition(sparkSession, tableMeta, partitionValueSpec) + } + + // Update the metastore if newly computed statistics are different from those + // recorded in the metastore. + val newPartitions = partitions.flatMap { p => + val newTotalSize = CommandUtils.calculateLocationSize( + sessionState, tableMeta.identifier, p.storage.locationUri) + val newRowCount = rowCounts.get(p.spec) + val newStats = CommandUtils.compareAndGetNewStats(tableMeta.stats, newTotalSize, newRowCount) + newStats.map(_ => p.copy(stats = newStats)) + } + + if (newPartitions.nonEmpty) { + sessionState.catalog.alterPartitions(tableMeta.identifier, newPartitions) + } + + Seq.empty[Row] + } + + private def calculateRowCountsPerPartition( + sparkSession: SparkSession, + tableMeta: CatalogTable, + partitionValueSpec: Option[TablePartitionSpec]): Map[TablePartitionSpec, BigInt] = { + val filter = if (partitionValueSpec.isDefined) { + val filters = partitionValueSpec.get.map { + case (columnName, value) => EqualTo(UnresolvedAttribute(columnName), Literal(value)) + } + filters.reduce(And) + } else { + Literal.TrueLiteral + } + + val tableDf = sparkSession.table(tableMeta.identifier) + val partitionColumns = tableMeta.partitionColumnNames.map(Column(_)) + + val df = tableDf.filter(Column(filter)).groupBy(partitionColumns: _*).count() + + df.collect().map { r => + val partitionColumnValues = partitionColumns.indices.map(r.get(_).toString) + val spec = tableMeta.partitionColumnNames.zip(partitionColumnValues).toMap + val count = BigInt(r.getLong(partitionColumns.size)) + (spec, count) + }.toMap + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala index cba147c35d..04715bd314 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.command import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier 
-import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTableType} +import org.apache.spark.sql.catalyst.catalog.CatalogTableType /** @@ -37,31 +37,15 @@ case class AnalyzeTableCommand( if (tableMeta.tableType == CatalogTableType.VIEW) { throw new AnalysisException("ANALYZE TABLE is not supported on views.") } + + // Compute stats for the whole table val newTotalSize = CommandUtils.calculateTotalSize(sessionState, tableMeta) + val newRowCount = + if (noscan) None else Some(BigInt(sparkSession.table(tableIdentWithDB).count())) - val oldTotalSize = tableMeta.stats.map(_.sizeInBytes.toLong).getOrElse(-1L) - val oldRowCount = tableMeta.stats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L) - var newStats: Option[CatalogStatistics] = None - if (newTotalSize >= 0 && newTotalSize != oldTotalSize) { - newStats = Some(CatalogStatistics(sizeInBytes = newTotalSize)) - } - // We only set rowCount when noscan is false, because otherwise: - // 1. when total size is not changed, we don't need to alter the table; - // 2. when total size is changed, `oldRowCount` becomes invalid. - // This is to make sure that we only record the right statistics. - if (!noscan) { - val newRowCount = sparkSession.table(tableIdentWithDB).count() - if (newRowCount >= 0 && newRowCount != oldRowCount) { - newStats = if (newStats.isDefined) { - newStats.map(_.copy(rowCount = Some(BigInt(newRowCount)))) - } else { - Some(CatalogStatistics( - sizeInBytes = oldTotalSize, rowCount = Some(BigInt(newRowCount)))) - } - } - } // Update the metastore if the above statistics of the table are different from those // recorded in the metastore. + val newStats = CommandUtils.compareAndGetNewStats(tableMeta.stats, newTotalSize, newRowCount) if (newStats.isDefined) { sessionState.catalog.alterTableStats(tableIdentWithDB, newStats) // Refresh the cached data source table in the catalog. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala index de45be8522..b22958d593 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala @@ -26,7 +26,7 @@ import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable} +import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition} import org.apache.spark.sql.internal.SessionState @@ -112,4 +112,29 @@ object CommandUtils extends Logging { size } + def compareAndGetNewStats( + oldStats: Option[CatalogStatistics], + newTotalSize: BigInt, + newRowCount: Option[BigInt]): Option[CatalogStatistics] = { + val oldTotalSize = oldStats.map(_.sizeInBytes.toLong).getOrElse(-1L) + val oldRowCount = oldStats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L) + var newStats: Option[CatalogStatistics] = None + if (newTotalSize >= 0 && newTotalSize != oldTotalSize) { + newStats = Some(CatalogStatistics(sizeInBytes = newTotalSize)) + } + // We only set rowCount when noscan is false, because otherwise: + // 1. when total size is not changed, we don't need to alter the table; + // 2. when total size is changed, `oldRowCount` becomes invalid. + // This is to make sure that we only record the right statistics. 
+ if (newRowCount.isDefined) { + if (newRowCount.get >= 0 && newRowCount.get != oldRowCount) { + newStats = if (newStats.isDefined) { + newStats.map(_.copy(rowCount = newRowCount)) + } else { + Some(CatalogStatistics(sizeInBytes = oldTotalSize, rowCount = newRowCount)) + } + } + } + newStats + } } diff --git a/sql/core/src/test/resources/sql-tests/inputs/describe-part-after-analyze.sql b/sql/core/src/test/resources/sql-tests/inputs/describe-part-after-analyze.sql new file mode 100644 index 0000000000..f4239da906 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/inputs/describe-part-after-analyze.sql @@ -0,0 +1,34 @@ +CREATE TABLE t (key STRING, value STRING, ds STRING, hr INT) USING parquet + PARTITIONED BY (ds, hr); + +INSERT INTO TABLE t PARTITION (ds='2017-08-01', hr=10) +VALUES ('k1', 100), ('k2', 200), ('k3', 300); + +INSERT INTO TABLE t PARTITION (ds='2017-08-01', hr=11) +VALUES ('k1', 101), ('k2', 201), ('k3', 301), ('k4', 401); + +INSERT INTO TABLE t PARTITION (ds='2017-09-01', hr=5) +VALUES ('k1', 102), ('k2', 202); + +DESC EXTENDED t PARTITION (ds='2017-08-01', hr=10); + +-- Collect stats for a single partition +ANALYZE TABLE t PARTITION (ds='2017-08-01', hr=10) COMPUTE STATISTICS; + +DESC EXTENDED t PARTITION (ds='2017-08-01', hr=10); + +-- Collect stats for 2 partitions +ANALYZE TABLE t PARTITION (ds='2017-08-01') COMPUTE STATISTICS; + +DESC EXTENDED t PARTITION (ds='2017-08-01', hr=10); +DESC EXTENDED t PARTITION (ds='2017-08-01', hr=11); + +-- Collect stats for all partitions +ANALYZE TABLE t PARTITION (ds, hr) COMPUTE STATISTICS; + +DESC EXTENDED t PARTITION (ds='2017-08-01', hr=10); +DESC EXTENDED t PARTITION (ds='2017-08-01', hr=11); +DESC EXTENDED t PARTITION (ds='2017-09-01', hr=5); + +-- DROP TEST TABLES/VIEWS +DROP TABLE t; diff --git a/sql/core/src/test/resources/sql-tests/results/describe-part-after-analyze.sql.out b/sql/core/src/test/resources/sql-tests/results/describe-part-after-analyze.sql.out new file mode 100644 index 0000000000..51dac11102 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/results/describe-part-after-analyze.sql.out @@ -0,0 +1,244 @@ +-- Automatically generated by SQLQueryTestSuite +-- Number of queries: 15 + + +-- !query 0 +CREATE TABLE t (key STRING, value STRING, ds STRING, hr INT) USING parquet + PARTITIONED BY (ds, hr) +-- !query 0 schema +struct<> +-- !query 0 output + + + +-- !query 1 +INSERT INTO TABLE t PARTITION (ds='2017-08-01', hr=10) +VALUES ('k1', 100), ('k2', 200), ('k3', 300) +-- !query 1 schema +struct<> +-- !query 1 output + + + +-- !query 2 +INSERT INTO TABLE t PARTITION (ds='2017-08-01', hr=11) +VALUES ('k1', 101), ('k2', 201), ('k3', 301), ('k4', 401) +-- !query 2 schema +struct<> +-- !query 2 output + + + +-- !query 3 +INSERT INTO TABLE t PARTITION (ds='2017-09-01', hr=5) +VALUES ('k1', 102), ('k2', 202) +-- !query 3 schema +struct<> +-- !query 3 output + + + +-- !query 4 +DESC EXTENDED t PARTITION (ds='2017-08-01', hr=10) +-- !query 4 schema +struct<col_name:string,data_type:string,comment:string> +-- !query 4 output +key string +value string +ds string +hr int +# Partition Information +# col_name data_type comment +ds string +hr int + +# Detailed Partition Information +Database default +Table t +Partition Values [ds=2017-08-01, hr=10] +Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=10 + +# Storage Information +Location [not included in comparison]sql/core/spark-warehouse/t + + +-- !query 5 +ANALYZE TABLE t PARTITION (ds='2017-08-01', hr=10) COMPUTE STATISTICS +-- !query 5 
schema +struct<> +-- !query 5 output + + + +-- !query 6 +DESC EXTENDED t PARTITION (ds='2017-08-01', hr=10) +-- !query 6 schema +struct<col_name:string,data_type:string,comment:string> +-- !query 6 output +key string +value string +ds string +hr int +# Partition Information +# col_name data_type comment +ds string +hr int + +# Detailed Partition Information +Database default +Table t +Partition Values [ds=2017-08-01, hr=10] +Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=10 +Partition Statistics 1067 bytes, 3 rows + +# Storage Information +Location [not included in comparison]sql/core/spark-warehouse/t + + +-- !query 7 +ANALYZE TABLE t PARTITION (ds='2017-08-01') COMPUTE STATISTICS +-- !query 7 schema +struct<> +-- !query 7 output + + + +-- !query 8 +DESC EXTENDED t PARTITION (ds='2017-08-01', hr=10) +-- !query 8 schema +struct<col_name:string,data_type:string,comment:string> +-- !query 8 output +key string +value string +ds string +hr int +# Partition Information +# col_name data_type comment +ds string +hr int + +# Detailed Partition Information +Database default +Table t +Partition Values [ds=2017-08-01, hr=10] +Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=10 +Partition Statistics 1067 bytes, 3 rows + +# Storage Information +Location [not included in comparison]sql/core/spark-warehouse/t + + +-- !query 9 +DESC EXTENDED t PARTITION (ds='2017-08-01', hr=11) +-- !query 9 schema +struct<col_name:string,data_type:string,comment:string> +-- !query 9 output +key string +value string +ds string +hr int +# Partition Information +# col_name data_type comment +ds string +hr int + +# Detailed Partition Information +Database default +Table t +Partition Values [ds=2017-08-01, hr=11] +Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=11 +Partition Statistics 1080 bytes, 4 rows + +# Storage Information +Location [not included in comparison]sql/core/spark-warehouse/t + + +-- !query 10 +ANALYZE TABLE t PARTITION (ds, hr) COMPUTE STATISTICS +-- !query 10 schema +struct<> +-- !query 10 output + + + +-- !query 11 +DESC EXTENDED t PARTITION (ds='2017-08-01', hr=10) +-- !query 11 schema +struct<col_name:string,data_type:string,comment:string> +-- !query 11 output +key string +value string +ds string +hr int +# Partition Information +# col_name data_type comment +ds string +hr int + +# Detailed Partition Information +Database default +Table t +Partition Values [ds=2017-08-01, hr=10] +Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=10 +Partition Statistics 1067 bytes, 3 rows + +# Storage Information +Location [not included in comparison]sql/core/spark-warehouse/t + + +-- !query 12 +DESC EXTENDED t PARTITION (ds='2017-08-01', hr=11) +-- !query 12 schema +struct<col_name:string,data_type:string,comment:string> +-- !query 12 output +key string +value string +ds string +hr int +# Partition Information +# col_name data_type comment +ds string +hr int + +# Detailed Partition Information +Database default +Table t +Partition Values [ds=2017-08-01, hr=11] +Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=11 +Partition Statistics 1080 bytes, 4 rows + +# Storage Information +Location [not included in comparison]sql/core/spark-warehouse/t + + +-- !query 13 +DESC EXTENDED t PARTITION (ds='2017-09-01', hr=5) +-- !query 13 schema +struct<col_name:string,data_type:string,comment:string> +-- !query 13 output +key string +value string +ds string +hr int +# 
Partition Information +# col_name data_type comment +ds string +hr int + +# Detailed Partition Information +Database default +Table t +Partition Values [ds=2017-09-01, hr=5] +Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-09-01/hr=5 +Partition Statistics 1054 bytes, 2 rows + +# Storage Information +Location [not included in comparison]sql/core/spark-warehouse/t + + +-- !query 14 +DROP TABLE t +-- !query 14 schema +struct<> +-- !query 14 output + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala index d238c76fbe..fa7a866f4d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala @@ -259,17 +259,33 @@ class SparkSqlParserSuite extends AnalysisTest { assertEqual("analyze table t compute statistics noscan", AnalyzeTableCommand(TableIdentifier("t"), noscan = true)) assertEqual("analyze table t partition (a) compute statistics nOscAn", - AnalyzeTableCommand(TableIdentifier("t"), noscan = true)) + AnalyzePartitionCommand(TableIdentifier("t"), Map("a" -> None), noscan = true)) - // Partitions specified - we currently parse them but don't do anything with it + // Partitions specified assertEqual("ANALYZE TABLE t PARTITION(ds='2008-04-09', hr=11) COMPUTE STATISTICS", - AnalyzeTableCommand(TableIdentifier("t"), noscan = false)) + AnalyzePartitionCommand(TableIdentifier("t"), noscan = false, + partitionSpec = Map("ds" -> Some("2008-04-09"), "hr" -> Some("11")))) assertEqual("ANALYZE TABLE t PARTITION(ds='2008-04-09', hr=11) COMPUTE STATISTICS noscan", - AnalyzeTableCommand(TableIdentifier("t"), noscan = true)) + AnalyzePartitionCommand(TableIdentifier("t"), noscan = true, + partitionSpec = Map("ds" -> Some("2008-04-09"), "hr" -> Some("11")))) + assertEqual("ANALYZE TABLE t PARTITION(ds='2008-04-09') COMPUTE STATISTICS noscan", + AnalyzePartitionCommand(TableIdentifier("t"), noscan = true, + partitionSpec = Map("ds" -> Some("2008-04-09")))) + assertEqual("ANALYZE TABLE t PARTITION(ds='2008-04-09', hr) COMPUTE STATISTICS", + AnalyzePartitionCommand(TableIdentifier("t"), noscan = false, + partitionSpec = Map("ds" -> Some("2008-04-09"), "hr" -> None))) + assertEqual("ANALYZE TABLE t PARTITION(ds='2008-04-09', hr) COMPUTE STATISTICS noscan", + AnalyzePartitionCommand(TableIdentifier("t"), noscan = true, + partitionSpec = Map("ds" -> Some("2008-04-09"), "hr" -> None))) + assertEqual("ANALYZE TABLE t PARTITION(ds, hr=11) COMPUTE STATISTICS noscan", + AnalyzePartitionCommand(TableIdentifier("t"), noscan = true, + partitionSpec = Map("ds" -> None, "hr" -> Some("11")))) assertEqual("ANALYZE TABLE t PARTITION(ds, hr) COMPUTE STATISTICS", - AnalyzeTableCommand(TableIdentifier("t"), noscan = false)) + AnalyzePartitionCommand(TableIdentifier("t"), noscan = false, + partitionSpec = Map("ds" -> None, "hr" -> None))) assertEqual("ANALYZE TABLE t PARTITION(ds, hr) COMPUTE STATISTICS noscan", - AnalyzeTableCommand(TableIdentifier("t"), noscan = true)) + AnalyzePartitionCommand(TableIdentifier("t"), noscan = true, + partitionSpec = Map("ds" -> None, "hr" -> None))) intercept("analyze table t compute statistics xxxx", "Expected `NOSCAN` instead of `xxxx`") @@ -282,6 +298,11 @@ class SparkSqlParserSuite extends AnalysisTest { assertEqual("ANALYZE TABLE t COMPUTE STATISTICS FOR COLUMNS key, value", AnalyzeColumnCommand(TableIdentifier("t"), Seq("key", 
"value"))) + + // Partition specified - should be ignored + assertEqual("ANALYZE TABLE t PARTITION(ds='2017-06-10') " + + "COMPUTE STATISTICS FOR COLUMNS key, value", + AnalyzeColumnCommand(TableIdentifier("t"), Seq("key", "value"))) } test("query organization") { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index e9d48f95aa..547447b31f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -639,26 +639,17 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat requireTableExists(db, table) val rawTable = getRawTable(db, table) - // convert table statistics to properties so that we can persist them through hive client - val statsProperties = new mutable.HashMap[String, String]() - if (stats.isDefined) { - statsProperties += STATISTICS_TOTAL_SIZE -> stats.get.sizeInBytes.toString() - if (stats.get.rowCount.isDefined) { - statsProperties += STATISTICS_NUM_ROWS -> stats.get.rowCount.get.toString() - } - - // For datasource tables and hive serde tables created by spark 2.1 or higher, - // the data schema is stored in the table properties. - val schema = restoreTableMetadata(rawTable).schema + // For datasource tables and hive serde tables created by spark 2.1 or higher, + // the data schema is stored in the table properties. + val schema = restoreTableMetadata(rawTable).schema - val colNameTypeMap: Map[String, DataType] = - schema.fields.map(f => (f.name, f.dataType)).toMap - stats.get.colStats.foreach { case (colName, colStat) => - colStat.toMap(colName, colNameTypeMap(colName)).foreach { case (k, v) => - statsProperties += (columnStatKeyPropName(colName, k) -> v) - } + // convert table statistics to properties so that we can persist them through hive client + var statsProperties = + if (stats.isDefined) { + statsToProperties(stats.get, schema) + } else { + new mutable.HashMap[String, String]() } - } val oldTableNonStatsProps = rawTable.properties.filterNot(_._1.startsWith(STATISTICS_PREFIX)) val updatedTable = rawTable.copy(properties = oldTableNonStatsProps ++ statsProperties) @@ -704,36 +695,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat val version: String = table.properties.getOrElse(CREATED_SPARK_VERSION, "2.2 or prior") // Restore Spark's statistics from information in Metastore. - val statsProps = table.properties.filterKeys(_.startsWith(STATISTICS_PREFIX)) - - // Currently we have two sources of statistics: one from Hive and the other from Spark. - // In our design, if Spark's statistics is available, we respect it over Hive's statistics. - if (statsProps.nonEmpty) { - val colStats = new mutable.HashMap[String, ColumnStat] - - // For each column, recover its column stats. Note that this is currently a O(n^2) operation, - // but given the number of columns it usually not enormous, this is probably OK as a start. - // If we want to map this a linear operation, we'd need a stronger contract between the - // naming convention used for serialization. - table.schema.foreach { field => - if (statsProps.contains(columnStatKeyPropName(field.name, ColumnStat.KEY_VERSION))) { - // If "version" field is defined, then the column stat is defined. 
- val keyPrefix = columnStatKeyPropName(field.name, "") - val colStatMap = statsProps.filterKeys(_.startsWith(keyPrefix)).map { case (k, v) => - (k.drop(keyPrefix.length), v) - } - - ColumnStat.fromMap(table.identifier.table, field, colStatMap).foreach { - colStat => colStats += field.name -> colStat - } - } - } - - table = table.copy( - stats = Some(CatalogStatistics( - sizeInBytes = BigInt(table.properties(STATISTICS_TOTAL_SIZE)), - rowCount = table.properties.get(STATISTICS_NUM_ROWS).map(BigInt(_)), - colStats = colStats.toMap))) + val restoredStats = + statsFromProperties(table.properties, table.identifier.table, table.schema) + if (restoredStats.isDefined) { + table = table.copy(stats = restoredStats) } // Get the original table properties as defined by the user. @@ -1037,17 +1002,92 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat currentFullPath } + private def statsToProperties( + stats: CatalogStatistics, + schema: StructType): Map[String, String] = { + + var statsProperties: Map[String, String] = + Map(STATISTICS_TOTAL_SIZE -> stats.sizeInBytes.toString()) + if (stats.rowCount.isDefined) { + statsProperties += STATISTICS_NUM_ROWS -> stats.rowCount.get.toString() + } + + val colNameTypeMap: Map[String, DataType] = + schema.fields.map(f => (f.name, f.dataType)).toMap + stats.colStats.foreach { case (colName, colStat) => + colStat.toMap(colName, colNameTypeMap(colName)).foreach { case (k, v) => + statsProperties += (columnStatKeyPropName(colName, k) -> v) + } + } + + statsProperties + } + + private def statsFromProperties( + properties: Map[String, String], + table: String, + schema: StructType): Option[CatalogStatistics] = { + + val statsProps = properties.filterKeys(_.startsWith(STATISTICS_PREFIX)) + if (statsProps.isEmpty) { + None + } else { + + val colStats = new mutable.HashMap[String, ColumnStat] + + // For each column, recover its column stats. Note that this is currently a O(n^2) operation, + // but given the number of columns it usually not enormous, this is probably OK as a start. + // If we want to map this a linear operation, we'd need a stronger contract between the + // naming convention used for serialization. + schema.foreach { field => + if (statsProps.contains(columnStatKeyPropName(field.name, ColumnStat.KEY_VERSION))) { + // If "version" field is defined, then the column stat is defined. + val keyPrefix = columnStatKeyPropName(field.name, "") + val colStatMap = statsProps.filterKeys(_.startsWith(keyPrefix)).map { case (k, v) => + (k.drop(keyPrefix.length), v) + } + + ColumnStat.fromMap(table, field, colStatMap).foreach { + colStat => colStats += field.name -> colStat + } + } + } + + Some(CatalogStatistics( + sizeInBytes = BigInt(statsProps(STATISTICS_TOTAL_SIZE)), + rowCount = statsProps.get(STATISTICS_NUM_ROWS).map(BigInt(_)), + colStats = colStats.toMap)) + } + } + override def alterPartitions( db: String, table: String, newParts: Seq[CatalogTablePartition]): Unit = withClient { val lowerCasedParts = newParts.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec))) + + val rawTable = getRawTable(db, table) + + // For datasource tables and hive serde tables created by spark 2.1 or higher, + // the data schema is stored in the table properties. 
+ val schema = restoreTableMetadata(rawTable).schema + + // convert partition statistics to properties so that we can persist them through hive api + val withStatsProps = lowerCasedParts.map(p => { + if (p.stats.isDefined) { + val statsProperties = statsToProperties(p.stats.get, schema) + p.copy(parameters = p.parameters ++ statsProperties) + } else { + p + } + }) + // Note: Before altering table partitions in Hive, you *must* set the current database // to the one that contains the table of interest. Otherwise you will end up with the // most helpful error message ever: "Unable to alter partition. alter is not possible." // See HIVE-2742 for more detail. client.setCurrentDatabase(db) - client.alterPartitions(db, table, lowerCasedParts) + client.alterPartitions(db, table, withStatsProps) } override def getPartition( @@ -1055,7 +1095,34 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat table: String, spec: TablePartitionSpec): CatalogTablePartition = withClient { val part = client.getPartition(db, table, lowerCasePartitionSpec(spec)) - part.copy(spec = restorePartitionSpec(part.spec, getTable(db, table).partitionColumnNames)) + restorePartitionMetadata(part, getTable(db, table)) + } + + /** + * Restores partition metadata from the partition properties. + * + * Reads partition-level statistics from partition properties, puts these + * into [[CatalogTablePartition#stats]] and removes these special entries + * from the partition properties. + */ + private def restorePartitionMetadata( + partition: CatalogTablePartition, + table: CatalogTable): CatalogTablePartition = { + val restoredSpec = restorePartitionSpec(partition.spec, table.partitionColumnNames) + + // Restore Spark's statistics from information in Metastore. + // Note: partition-level statistics were introduced in 2.3. 
+ val restoredStats = + statsFromProperties(partition.parameters, table.identifier.table, table.schema) + if (restoredStats.isDefined) { + partition.copy( + spec = restoredSpec, + stats = restoredStats, + parameters = partition.parameters.filterNot { + case (key, _) => key.startsWith(SPARK_SQL_PREFIX) }) + } else { + partition.copy(spec = restoredSpec) + } } /** @@ -1066,7 +1133,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat table: String, spec: TablePartitionSpec): Option[CatalogTablePartition] = withClient { client.getPartitionOption(db, table, lowerCasePartitionSpec(spec)).map { part => - part.copy(spec = restorePartitionSpec(part.spec, getTable(db, table).partitionColumnNames)) + restorePartitionMetadata(part, getTable(db, table)) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 5e5c0a2a50..995280e0e9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -21,6 +21,7 @@ import java.io.{File, PrintStream} import java.util.Locale import scala.collection.JavaConverters._ +import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import org.apache.hadoop.conf.Configuration @@ -960,6 +961,7 @@ private[hive] object HiveClientImpl { tpart.setTableName(ht.getTableName) tpart.setValues(partValues.asJava) tpart.setSd(storageDesc) + tpart.setParameters(mutable.Map(p.parameters.toSeq: _*).asJava) new HivePartition(ht, tpart) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 71cf79c473..dc6140756d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -26,6 +26,7 @@ import org.apache.hadoop.hive.common.StatsSetupConst import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, HiveTableRelation} import org.apache.spark.sql.catalyst.plans.logical.ColumnStat import org.apache.spark.sql.catalyst.util.StringUtils @@ -256,6 +257,259 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto } } + test("analyze single partition") { + val tableName = "analyzeTable_part" + + def queryStats(ds: String): CatalogStatistics = { + val partition = + spark.sessionState.catalog.getPartition(TableIdentifier(tableName), Map("ds" -> ds)) + partition.stats.get + } + + def createPartition(ds: String, query: String): Unit = { + sql(s"INSERT INTO TABLE $tableName PARTITION (ds='$ds') $query") + } + + withTable(tableName) { + sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED BY (ds STRING)") + + createPartition("2010-01-01", "SELECT '1', 'A' from src") + createPartition("2010-01-02", "SELECT '1', 'A' from src UNION ALL SELECT '1', 'A' from src") + createPartition("2010-01-03", "SELECT '1', 'A' from src") + + sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE STATISTICS NOSCAN") + + sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE STATISTICS NOSCAN") + + assert(queryStats("2010-01-01").rowCount === None) + assert(queryStats("2010-01-01").sizeInBytes === 2000) 
+ + assert(queryStats("2010-01-02").rowCount === None) + assert(queryStats("2010-01-02").sizeInBytes === 2*2000) + + sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE STATISTICS") + + sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE STATISTICS") + + assert(queryStats("2010-01-01").rowCount.get === 500) + assert(queryStats("2010-01-01").sizeInBytes === 2000) + + assert(queryStats("2010-01-02").rowCount.get === 2*500) + assert(queryStats("2010-01-02").sizeInBytes === 2*2000) + } + } + + test("analyze a set of partitions") { + val tableName = "analyzeTable_part" + + def queryStats(ds: String, hr: String): Option[CatalogStatistics] = { + val tableId = TableIdentifier(tableName) + val partition = + spark.sessionState.catalog.getPartition(tableId, Map("ds" -> ds, "hr" -> hr)) + partition.stats + } + + def assertPartitionStats( + ds: String, + hr: String, + rowCount: Option[BigInt], + sizeInBytes: BigInt): Unit = { + val stats = queryStats(ds, hr).get + assert(stats.rowCount === rowCount) + assert(stats.sizeInBytes === sizeInBytes) + } + + def createPartition(ds: String, hr: Int, query: String): Unit = { + sql(s"INSERT INTO TABLE $tableName PARTITION (ds='$ds', hr=$hr) $query") + } + + withTable(tableName) { + sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED BY (ds STRING, hr INT)") + + createPartition("2010-01-01", 10, "SELECT '1', 'A' from src") + createPartition("2010-01-01", 11, "SELECT '1', 'A' from src") + createPartition("2010-01-02", 10, "SELECT '1', 'A' from src") + createPartition("2010-01-02", 11, + "SELECT '1', 'A' from src UNION ALL SELECT '1', 'A' from src") + + sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE STATISTICS NOSCAN") + + assertPartitionStats("2010-01-01", "10", rowCount = None, sizeInBytes = 2000) + assertPartitionStats("2010-01-01", "11", rowCount = None, sizeInBytes = 2000) + assert(queryStats("2010-01-02", "10") === None) + assert(queryStats("2010-01-02", "11") === None) + + sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE STATISTICS NOSCAN") + + assertPartitionStats("2010-01-01", "10", rowCount = None, sizeInBytes = 2000) + assertPartitionStats("2010-01-01", "11", rowCount = None, sizeInBytes = 2000) + assertPartitionStats("2010-01-02", "10", rowCount = None, sizeInBytes = 2000) + assertPartitionStats("2010-01-02", "11", rowCount = None, sizeInBytes = 2*2000) + + sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE STATISTICS") + + assertPartitionStats("2010-01-01", "10", rowCount = Some(500), sizeInBytes = 2000) + assertPartitionStats("2010-01-01", "11", rowCount = Some(500), sizeInBytes = 2000) + assertPartitionStats("2010-01-02", "10", rowCount = None, sizeInBytes = 2000) + assertPartitionStats("2010-01-02", "11", rowCount = None, sizeInBytes = 2*2000) + + sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE STATISTICS") + + assertPartitionStats("2010-01-01", "10", rowCount = Some(500), sizeInBytes = 2000) + assertPartitionStats("2010-01-01", "11", rowCount = Some(500), sizeInBytes = 2000) + assertPartitionStats("2010-01-02", "10", rowCount = Some(500), sizeInBytes = 2000) + assertPartitionStats("2010-01-02", "11", rowCount = Some(2*500), sizeInBytes = 2*2000) + } + } + + test("analyze all partitions") { + val tableName = "analyzeTable_part" + + def assertPartitionStats( + ds: String, + hr: String, + rowCount: Option[BigInt], + sizeInBytes: BigInt): Unit = { + val stats = spark.sessionState.catalog.getPartition(TableIdentifier(tableName), + 
Map("ds" -> ds, "hr" -> hr)).stats.get + assert(stats.rowCount === rowCount) + assert(stats.sizeInBytes === sizeInBytes) + } + + def createPartition(ds: String, hr: Int, query: String): Unit = { + sql(s"INSERT INTO TABLE $tableName PARTITION (ds='$ds', hr=$hr) $query") + } + + withTable(tableName) { + sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED BY (ds STRING, hr INT)") + + createPartition("2010-01-01", 10, "SELECT '1', 'A' from src") + createPartition("2010-01-01", 11, "SELECT '1', 'A' from src") + createPartition("2010-01-02", 10, "SELECT '1', 'A' from src") + createPartition("2010-01-02", 11, + "SELECT '1', 'A' from src UNION ALL SELECT '1', 'A' from src") + + sql(s"ANALYZE TABLE $tableName PARTITION (ds, hr) COMPUTE STATISTICS NOSCAN") + + assertPartitionStats("2010-01-01", "10", rowCount = None, sizeInBytes = 2000) + assertPartitionStats("2010-01-01", "11", rowCount = None, sizeInBytes = 2000) + assertPartitionStats("2010-01-01", "11", rowCount = None, sizeInBytes = 2000) + assertPartitionStats("2010-01-02", "11", rowCount = None, sizeInBytes = 2*2000) + + sql(s"ANALYZE TABLE $tableName PARTITION (ds, hr) COMPUTE STATISTICS") + + assertPartitionStats("2010-01-01", "10", rowCount = Some(500), sizeInBytes = 2000) + assertPartitionStats("2010-01-01", "11", rowCount = Some(500), sizeInBytes = 2000) + assertPartitionStats("2010-01-01", "11", rowCount = Some(500), sizeInBytes = 2000) + assertPartitionStats("2010-01-02", "11", rowCount = Some(2*500), sizeInBytes = 2*2000) + } + } + + test("analyze partitions for an empty table") { + val tableName = "analyzeTable_part" + + withTable(tableName) { + sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED BY (ds STRING)") + + // make sure there is no exception + sql(s"ANALYZE TABLE $tableName PARTITION (ds) COMPUTE STATISTICS NOSCAN") + + // make sure there is no exception + sql(s"ANALYZE TABLE $tableName PARTITION (ds) COMPUTE STATISTICS") + } + } + + test("analyze partitions case sensitivity") { + val tableName = "analyzeTable_part" + withTable(tableName) { + sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED BY (ds STRING)") + + sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2010-01-01') SELECT * FROM src") + + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { + sql(s"ANALYZE TABLE $tableName PARTITION (DS='2010-01-01') COMPUTE STATISTICS") + } + + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { + val message = intercept[AnalysisException] { + sql(s"ANALYZE TABLE $tableName PARTITION (DS='2010-01-01') COMPUTE STATISTICS") + }.getMessage + assert(message.contains( + s"DS is not a valid partition column in table `default`.`${tableName.toLowerCase}`")) + } + } + } + + test("analyze partial partition specifications") { + + val tableName = "analyzeTable_part" + + def assertAnalysisException(partitionSpec: String): Unit = { + val message = intercept[AnalysisException] { + sql(s"ANALYZE TABLE $tableName $partitionSpec COMPUTE STATISTICS") + }.getMessage + assert(message.contains("The list of partition columns with values " + + s"in partition specification for table '${tableName.toLowerCase}' in database 'default' " + + "is not a prefix of the list of partition columns defined in the table schema")) + } + + withTable(tableName) { + sql( + s""" + |CREATE TABLE $tableName (key STRING, value STRING) + |PARTITIONED BY (a STRING, b INT, c STRING) + """.stripMargin) + + sql(s"INSERT INTO TABLE $tableName PARTITION (a='a1', b=10, c='c1') SELECT * FROM src") + + sql(s"ANALYZE TABLE $tableName 
PARTITION (a='a1') COMPUTE STATISTICS") + sql(s"ANALYZE TABLE $tableName PARTITION (a='a1', b=10) COMPUTE STATISTICS") + sql(s"ANALYZE TABLE $tableName PARTITION (A='a1', b=10) COMPUTE STATISTICS") + sql(s"ANALYZE TABLE $tableName PARTITION (b=10, a='a1') COMPUTE STATISTICS") + sql(s"ANALYZE TABLE $tableName PARTITION (b=10, A='a1') COMPUTE STATISTICS") + + assertAnalysisException("PARTITION (b=10)") + assertAnalysisException("PARTITION (a, b=10)") + assertAnalysisException("PARTITION (b=10, c='c1')") + assertAnalysisException("PARTITION (a, b=10, c='c1')") + assertAnalysisException("PARTITION (c='c1')") + assertAnalysisException("PARTITION (a, b, c='c1')") + assertAnalysisException("PARTITION (a='a1', c='c1')") + assertAnalysisException("PARTITION (a='a1', b, c='c1')") + } + } + + test("analyze non-existent partition") { + + def assertAnalysisException(analyzeCommand: String, errorMessage: String): Unit = { + val message = intercept[AnalysisException] { + sql(analyzeCommand) + }.getMessage + assert(message.contains(errorMessage)) + } + + val tableName = "analyzeTable_part" + withTable(tableName) { + sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED BY (ds STRING)") + + sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2010-01-01') SELECT * FROM src") + + assertAnalysisException( + s"ANALYZE TABLE $tableName PARTITION (hour=20) COMPUTE STATISTICS", + s"hour is not a valid partition column in table `default`.`${tableName.toLowerCase}`" + ) + + assertAnalysisException( + s"ANALYZE TABLE $tableName PARTITION (hour) COMPUTE STATISTICS", + s"hour is not a valid partition column in table `default`.`${tableName.toLowerCase}`" + ) + + intercept[NoSuchPartitionException] { + sql(s"ANALYZE TABLE $tableName PARTITION (ds='2011-02-30') COMPUTE STATISTICS") + } + } + } + test("test table-level statistics for hive tables created in HiveExternalCatalog") { val textTable = "textTable" withTable(textTable) { -- GitLab
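As a companion to the SQL-level example near the top, here is a hedged sketch (again using the hypothetical `sales` table) of reading the collected per-partition statistics back through Spark's catalog, the same way the new StatisticsSuite tests do. `SparkSession.sessionState` is internal to Spark, so outside of Spark's own code the DESC EXTENDED ... PARTITION output is the supported way to view these values.

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogStatistics

// Look up one partition of the hypothetical `sales` table and inspect the
// statistics that ANALYZE TABLE ... PARTITION wrote to the metastore. The
// `stats` field on CatalogTablePartition is the one introduced by this patch.
val partition = spark.sessionState.catalog.getPartition(
  TableIdentifier("sales"),
  Map("ds" -> "2008-04-09", "hr" -> "11"))

val stats: Option[CatalogStatistics] = partition.stats
stats.foreach { s =>
  // sizeInBytes is always computed; rowCount is present only when NOSCAN was not used.
  println(s"sizeInBytes = ${s.sizeInBytes}, rowCount = ${s.rowCount.getOrElse("n/a")}")
}
```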