diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
index d9392de37a8158d7ddde272cb8f1d2efcf3d3103..843ced7f0e6971b8170ecc70f4526aec3d828f82 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
@@ -17,19 +17,12 @@
 
 package org.apache.spark.sql
 
-import java.{lang => jl}
-import java.sql.{Date, Timestamp}
-
 import scala.collection.mutable
-import scala.util.Random
 
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics}
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
-import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.test.SQLTestData.ArrayData
 import org.apache.spark.sql.types._
 
@@ -58,6 +51,37 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
     }
   }
 
+  test("analyzing views is not supported") {
+    def assertAnalyzeUnsupported(analyzeCommand: String): Unit = {
+      val err = intercept[AnalysisException] {
+        sql(analyzeCommand)
+      }
+      assert(err.message.contains("ANALYZE TABLE is not supported"))
+    }
+
+    val tableName = "tbl"
+    withTable(tableName) {
+      spark.range(10).write.saveAsTable(tableName)
+      val viewName = "view"
+      withView(viewName) {
+        sql(s"CREATE VIEW $viewName AS SELECT * FROM $tableName")
+        assertAnalyzeUnsupported(s"ANALYZE TABLE $viewName COMPUTE STATISTICS")
+        assertAnalyzeUnsupported(s"ANALYZE TABLE $viewName COMPUTE STATISTICS FOR COLUMNS id")
+      }
+    }
+  }
+
+  test("statistics collection of a table with zero column") {
+    val table_no_cols = "table_no_cols"
+    withTable(table_no_cols) {
+      val rddNoCols = sparkContext.parallelize(1 to 10).map(_ => Row.empty)
+      val dfNoCols = spark.createDataFrame(rddNoCols, StructType(Seq.empty))
+      dfNoCols.write.format("json").saveAsTable(table_no_cols)
+      sql(s"ANALYZE TABLE $table_no_cols COMPUTE STATISTICS")
+      checkTableStats(table_no_cols, hasSizeInBytes = true, expectedRowCounts = Some(10))
+    }
+  }
+
   test("analyze column command - unsupported types and invalid columns") {
     val tableName = "column_stats_test1"
     withTable(tableName) {
@@ -239,154 +263,3 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
     }
   }
 }
-
-
-/**
- * The base for test cases that we want to include in both the hive module (for verifying behavior
- * when using the Hive external catalog) as well as in the sql/core module.
- */
-abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils {
-  import testImplicits._
-
-  private val dec1 = new java.math.BigDecimal("1.000000000000000000")
-  private val dec2 = new java.math.BigDecimal("8.000000000000000000")
-  private val d1 = Date.valueOf("2016-05-08")
-  private val d2 = Date.valueOf("2016-05-09")
-  private val t1 = Timestamp.valueOf("2016-05-08 00:00:01")
-  private val t2 = Timestamp.valueOf("2016-05-09 00:00:02")
-
-  /**
-   * Define a very simple 3 row table used for testing column serialization.
-   * Note: last column is seq[int] which doesn't support stats collection.
-   */
-  protected val data = Seq[
-    (jl.Boolean, jl.Byte, jl.Short, jl.Integer, jl.Long,
-      jl.Double, jl.Float, java.math.BigDecimal,
-      String, Array[Byte], Date, Timestamp,
-      Seq[Int])](
-    (false, 1.toByte, 1.toShort, 1, 1L, 1.0, 1.0f, dec1, "s1", "b1".getBytes, d1, t1, null),
-    (true, 2.toByte, 3.toShort, 4, 5L, 6.0, 7.0f, dec2, "ss9", "bb0".getBytes, d2, t2, null),
-    (null, null, null, null, null, null, null, null, null, null, null, null, null)
-  )
-
-  /** A mapping from column to the stats collected. */
-  protected val stats = mutable.LinkedHashMap(
-    "cbool" -> ColumnStat(2, Some(false), Some(true), 1, 1, 1),
-    "cbyte" -> ColumnStat(2, Some(1.toByte), Some(2.toByte), 1, 1, 1),
-    "cshort" -> ColumnStat(2, Some(1.toShort), Some(3.toShort), 1, 2, 2),
-    "cint" -> ColumnStat(2, Some(1), Some(4), 1, 4, 4),
-    "clong" -> ColumnStat(2, Some(1L), Some(5L), 1, 8, 8),
-    "cdouble" -> ColumnStat(2, Some(1.0), Some(6.0), 1, 8, 8),
-    "cfloat" -> ColumnStat(2, Some(1.0f), Some(7.0f), 1, 4, 4),
-    "cdecimal" -> ColumnStat(2, Some(Decimal(dec1)), Some(Decimal(dec2)), 1, 16, 16),
-    "cstring" -> ColumnStat(2, None, None, 1, 3, 3),
-    "cbinary" -> ColumnStat(2, None, None, 1, 3, 3),
-    "cdate" -> ColumnStat(2, Some(DateTimeUtils.fromJavaDate(d1)),
-      Some(DateTimeUtils.fromJavaDate(d2)), 1, 4, 4),
-    "ctimestamp" -> ColumnStat(2, Some(DateTimeUtils.fromJavaTimestamp(t1)),
-      Some(DateTimeUtils.fromJavaTimestamp(t2)), 1, 8, 8)
-  )
-
-  private val randomName = new Random(31)
-
-  def checkTableStats(
-      tableName: String,
-      hasSizeInBytes: Boolean,
-      expectedRowCounts: Option[Int]): Option[CatalogStatistics] = {
-    val stats = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName)).stats
-    if (hasSizeInBytes || expectedRowCounts.nonEmpty) {
-      assert(stats.isDefined)
-      assert(stats.get.sizeInBytes >= 0)
-      assert(stats.get.rowCount === expectedRowCounts)
-    } else {
-      assert(stats.isEmpty)
-    }
-
-    stats
-  }
-
-  /**
-   * Compute column stats for the given DataFrame and compare it with colStats.
-   */
-  def checkColStats(
-      df: DataFrame,
-      colStats: mutable.LinkedHashMap[String, ColumnStat]): Unit = {
-    val tableName = "column_stats_test_" + randomName.nextInt(1000)
-    withTable(tableName) {
-      df.write.saveAsTable(tableName)
-
-      // Collect statistics
-      sql(s"analyze table $tableName compute STATISTICS FOR COLUMNS " +
-        colStats.keys.mkString(", "))
-
-      // Validate statistics
-      val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
-      assert(table.stats.isDefined)
-      assert(table.stats.get.colStats.size == colStats.size)
-
-      colStats.foreach { case (k, v) =>
-        withClue(s"column $k") {
-          assert(table.stats.get.colStats(k) == v)
-        }
-      }
-    }
-  }
-
-  // This test will be run twice: with and without Hive support
-  test("SPARK-18856: non-empty partitioned table should not report zero size") {
-    withTable("ds_tbl", "hive_tbl") {
-      spark.range(100).select($"id", $"id" % 5 as "p").write.partitionBy("p").saveAsTable("ds_tbl")
-      val stats = spark.table("ds_tbl").queryExecution.optimizedPlan.stats
-      assert(stats.sizeInBytes > 0, "non-empty partitioned table should not report zero size.")
-
-      if (spark.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION) == "hive") {
-        sql("CREATE TABLE hive_tbl(i int) PARTITIONED BY (j int)")
-        sql("INSERT INTO hive_tbl PARTITION(j=1) SELECT 1")
-        val stats2 = spark.table("hive_tbl").queryExecution.optimizedPlan.stats
-        assert(stats2.sizeInBytes > 0, "non-empty partitioned table should not report zero size.")
-      }
-    }
-  }
-
-  // This test will be run twice: with and without Hive support
-  test("conversion from CatalogStatistics to Statistics") {
-    withTable("ds_tbl", "hive_tbl") {
-      // Test data source table
-      checkStatsConversion(tableName = "ds_tbl", isDatasourceTable = true)
-      // Test hive serde table
-      if (spark.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION) == "hive") {
-        checkStatsConversion(tableName = "hive_tbl", isDatasourceTable = false)
-      }
-    }
-  }
-
-  private def checkStatsConversion(tableName: String, isDatasourceTable: Boolean): Unit = {
-    // Create an empty table and run analyze command on it.
-    val createTableSql = if (isDatasourceTable) {
-      s"CREATE TABLE $tableName (c1 INT, c2 STRING) USING PARQUET"
-    } else {
-      s"CREATE TABLE $tableName (c1 INT, c2 STRING)"
-    }
-    sql(createTableSql)
-    // Analyze only one column.
-    sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS c1")
-    val (relation, catalogTable) = spark.table(tableName).queryExecution.analyzed.collect {
-      case catalogRel: CatalogRelation => (catalogRel, catalogRel.tableMeta)
-      case logicalRel: LogicalRelation => (logicalRel, logicalRel.catalogTable.get)
-    }.head
-    val emptyColStat = ColumnStat(0, None, None, 0, 4, 4)
-    // Check catalog statistics
-    assert(catalogTable.stats.isDefined)
-    assert(catalogTable.stats.get.sizeInBytes == 0)
-    assert(catalogTable.stats.get.rowCount == Some(0))
-    assert(catalogTable.stats.get.colStats == Map("c1" -> emptyColStat))
-
-    // Check relation statistics
-    assert(relation.stats.sizeInBytes == 0)
-    assert(relation.stats.rowCount == Some(0))
-    assert(relation.stats.attributeStats.size == 1)
-    val (attribute, colStat) = relation.stats.attributeStats.head
-    assert(attribute.name == "c1")
-    assert(colStat == emptyColStat)
-  }
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
new file mode 100644
index 0000000000000000000000000000000000000000..41569762d3c598b67a49b74c011234d410955c52
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.{lang => jl}
+import java.sql.{Date, Timestamp}
+
+import scala.collection.mutable
+import scala.util.Random
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics, CatalogTable}
+import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.internal.StaticSQLConf
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.types.Decimal
+
+
+/**
+ * The base for statistics test cases that we want to include in both the hive module (for
+ * verifying behavior when using the Hive external catalog) as well as in the sql/core module.
+ */
+abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils {
+  import testImplicits._
+
+  private val dec1 = new java.math.BigDecimal("1.000000000000000000")
+  private val dec2 = new java.math.BigDecimal("8.000000000000000000")
+  private val d1 = Date.valueOf("2016-05-08")
+  private val d2 = Date.valueOf("2016-05-09")
+  private val t1 = Timestamp.valueOf("2016-05-08 00:00:01")
+  private val t2 = Timestamp.valueOf("2016-05-09 00:00:02")
+
+  /**
+   * Define a very simple 3 row table used for testing column serialization.
+   * Note: last column is seq[int] which doesn't support stats collection.
+   */
+  protected val data = Seq[
+    (jl.Boolean, jl.Byte, jl.Short, jl.Integer, jl.Long,
+      jl.Double, jl.Float, java.math.BigDecimal,
+      String, Array[Byte], Date, Timestamp,
+      Seq[Int])](
+    (false, 1.toByte, 1.toShort, 1, 1L, 1.0, 1.0f, dec1, "s1", "b1".getBytes, d1, t1, null),
+    (true, 2.toByte, 3.toShort, 4, 5L, 6.0, 7.0f, dec2, "ss9", "bb0".getBytes, d2, t2, null),
+    (null, null, null, null, null, null, null, null, null, null, null, null, null)
+  )
+
+  /** A mapping from column to the stats collected. */
+  protected val stats = mutable.LinkedHashMap(
+    "cbool" -> ColumnStat(2, Some(false), Some(true), 1, 1, 1),
+    "cbyte" -> ColumnStat(2, Some(1.toByte), Some(2.toByte), 1, 1, 1),
+    "cshort" -> ColumnStat(2, Some(1.toShort), Some(3.toShort), 1, 2, 2),
+    "cint" -> ColumnStat(2, Some(1), Some(4), 1, 4, 4),
+    "clong" -> ColumnStat(2, Some(1L), Some(5L), 1, 8, 8),
+    "cdouble" -> ColumnStat(2, Some(1.0), Some(6.0), 1, 8, 8),
+    "cfloat" -> ColumnStat(2, Some(1.0f), Some(7.0f), 1, 4, 4),
+    "cdecimal" -> ColumnStat(2, Some(Decimal(dec1)), Some(Decimal(dec2)), 1, 16, 16),
+    "cstring" -> ColumnStat(2, None, None, 1, 3, 3),
+    "cbinary" -> ColumnStat(2, None, None, 1, 3, 3),
+    "cdate" -> ColumnStat(2, Some(DateTimeUtils.fromJavaDate(d1)),
+      Some(DateTimeUtils.fromJavaDate(d2)), 1, 4, 4),
+    "ctimestamp" -> ColumnStat(2, Some(DateTimeUtils.fromJavaTimestamp(t1)),
+      Some(DateTimeUtils.fromJavaTimestamp(t2)), 1, 8, 8)
+  )
+
+  private val randomName = new Random(31)
+
+  def getCatalogTable(tableName: String): CatalogTable = {
+    spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
+  }
+
+  def getCatalogStatistics(tableName: String): CatalogStatistics = {
+    getCatalogTable(tableName).stats.get
+  }
+
+  def checkTableStats(
+      tableName: String,
+      hasSizeInBytes: Boolean,
+      expectedRowCounts: Option[Int]): Option[CatalogStatistics] = {
+    val stats = getCatalogTable(tableName).stats
+    if (hasSizeInBytes || expectedRowCounts.nonEmpty) {
+      assert(stats.isDefined)
+      assert(stats.get.sizeInBytes >= 0)
+      assert(stats.get.rowCount === expectedRowCounts)
+    } else {
+      assert(stats.isEmpty)
+    }
+
+    stats
+  }
+
+  /**
+   * Compute column stats for the given DataFrame and compare it with colStats.
+   */
+  def checkColStats(
+      df: DataFrame,
+      colStats: mutable.LinkedHashMap[String, ColumnStat]): Unit = {
+    val tableName = "column_stats_test_" + randomName.nextInt(1000)
+    withTable(tableName) {
+      df.write.saveAsTable(tableName)
+
+      // Collect statistics
+      sql(s"analyze table $tableName compute STATISTICS FOR COLUMNS " +
+        colStats.keys.mkString(", "))
+
+      // Validate statistics
+      val table = getCatalogTable(tableName)
+      assert(table.stats.isDefined)
+      assert(table.stats.get.colStats.size == colStats.size)
+
+      colStats.foreach { case (k, v) =>
+        withClue(s"column $k") {
+          assert(table.stats.get.colStats(k) == v)
+        }
+      }
+    }
+  }
+
+  // This test will be run twice: with and without Hive support
+  test("SPARK-18856: non-empty partitioned table should not report zero size") {
+    withTable("ds_tbl", "hive_tbl") {
+      spark.range(100).select($"id", $"id" % 5 as "p").write.partitionBy("p").saveAsTable("ds_tbl")
+      val stats = spark.table("ds_tbl").queryExecution.optimizedPlan.stats
+      assert(stats.sizeInBytes > 0, "non-empty partitioned table should not report zero size.")
+
+      if (spark.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION) == "hive") {
+        sql("CREATE TABLE hive_tbl(i int) PARTITIONED BY (j int)")
+        sql("INSERT INTO hive_tbl PARTITION(j=1) SELECT 1")
+        val stats2 = spark.table("hive_tbl").queryExecution.optimizedPlan.stats
+        assert(stats2.sizeInBytes > 0, "non-empty partitioned table should not report zero size.")
+      }
+    }
+  }
+
+  // This test will be run twice: with and without Hive support
+  test("conversion from CatalogStatistics to Statistics") {
+    withTable("ds_tbl", "hive_tbl") {
+      // Test data source table
+      checkStatsConversion(tableName = "ds_tbl", isDatasourceTable = true)
+      // Test hive serde table
+      if (spark.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION) == "hive") {
+        checkStatsConversion(tableName = "hive_tbl", isDatasourceTable = false)
+      }
+    }
+  }
+
+  private def checkStatsConversion(tableName: String, isDatasourceTable: Boolean): Unit = {
+    // Create an empty table and run analyze command on it.
+    val createTableSql = if (isDatasourceTable) {
+      s"CREATE TABLE $tableName (c1 INT, c2 STRING) USING PARQUET"
+    } else {
+      s"CREATE TABLE $tableName (c1 INT, c2 STRING)"
+    }
+    sql(createTableSql)
+    // Analyze only one column.
+    sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS c1")
+    val (relation, catalogTable) = spark.table(tableName).queryExecution.analyzed.collect {
+      case catalogRel: CatalogRelation => (catalogRel, catalogRel.tableMeta)
+      case logicalRel: LogicalRelation => (logicalRel, logicalRel.catalogTable.get)
+    }.head
+    val emptyColStat = ColumnStat(0, None, None, 0, 4, 4)
+    // Check catalog statistics
+    assert(catalogTable.stats.isDefined)
+    assert(catalogTable.stats.get.sizeInBytes == 0)
+    assert(catalogTable.stats.get.rowCount == Some(0))
+    assert(catalogTable.stats.get.colStats == Map("c1" -> emptyColStat))
+
+    // Check relation statistics
+    assert(relation.stats.sizeInBytes == 0)
+    assert(relation.stats.rowCount == Some(0))
+    assert(relation.stats.attributeStats.size == 1)
+    val (attribute, colStat) = relation.stats.attributeStats.head
+    assert(attribute.name == "c1")
+    assert(colStat == emptyColStat)
+  }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index c601038a2b0af51c2836bc210dae1a5cb9efb044..e00fa64e9f2ce46b8279b6c96825340038226702 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -25,7 +25,7 @@ import scala.util.matching.Regex
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics, CatalogTable}
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics}
 import org.apache.spark.sql.catalyst.util.StringUtils
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -33,7 +33,6 @@ import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.hive.HiveExternalCatalog._
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types._
 
 class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleton {
 
@@ -82,58 +81,42 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
       spark.table(tableName).queryExecution.analyzed.stats.sizeInBytes
 
     // Non-partitioned table
-    sql("CREATE TABLE analyzeTable (key STRING, value STRING)").collect()
-    sql("INSERT INTO TABLE analyzeTable SELECT * FROM src").collect()
-    sql("INSERT INTO TABLE analyzeTable SELECT * FROM src").collect()
+    val nonPartTable = "non_part_table"
+    withTable(nonPartTable) {
+      sql(s"CREATE TABLE $nonPartTable (key STRING, value STRING)")
+      sql(s"INSERT INTO TABLE $nonPartTable SELECT * FROM src")
+      sql(s"INSERT INTO TABLE $nonPartTable SELECT * FROM src")
 
-    sql("ANALYZE TABLE analyzeTable COMPUTE STATISTICS noscan")
+      sql(s"ANALYZE TABLE $nonPartTable COMPUTE STATISTICS noscan")
 
-    assert(queryTotalSize("analyzeTable") === BigInt(11624))
-
-    sql("DROP TABLE analyzeTable").collect()
+      assert(queryTotalSize(nonPartTable) === BigInt(11624))
+    }
 
     // Partitioned table
-    sql(
-      """
-        |CREATE TABLE analyzeTable_part (key STRING, value STRING) PARTITIONED BY (ds STRING)
-      """.stripMargin).collect()
-    sql(
-      """
-        |INSERT INTO TABLE analyzeTable_part PARTITION (ds='2010-01-01')
-        |SELECT * FROM src
-      """.stripMargin).collect()
-    sql(
-      """
-        |INSERT INTO TABLE analyzeTable_part PARTITION (ds='2010-01-02')
-        |SELECT * FROM src
-      """.stripMargin).collect()
-    sql(
-      """
-        |INSERT INTO TABLE analyzeTable_part PARTITION (ds='2010-01-03')
-        |SELECT * FROM src
-      """.stripMargin).collect()
+    val partTable = "part_table"
+    withTable(partTable) {
+      sql(s"CREATE TABLE $partTable (key STRING, value STRING) PARTITIONED BY (ds STRING)")
+      sql(s"INSERT INTO TABLE $partTable PARTITION (ds='2010-01-01') SELECT * FROM src")
+      sql(s"INSERT INTO TABLE $partTable PARTITION (ds='2010-01-02') SELECT * FROM src")
+      sql(s"INSERT INTO TABLE $partTable PARTITION (ds='2010-01-03') SELECT * FROM src")
 
-    assert(queryTotalSize("analyzeTable_part") === spark.sessionState.conf.defaultSizeInBytes)
+      assert(queryTotalSize(partTable) === spark.sessionState.conf.defaultSizeInBytes)
 
-    sql("ANALYZE TABLE analyzeTable_part COMPUTE STATISTICS noscan")
+      sql(s"ANALYZE TABLE $partTable COMPUTE STATISTICS noscan")
 
-    assert(queryTotalSize("analyzeTable_part") === BigInt(17436))
-
-    sql("DROP TABLE analyzeTable_part").collect()
+      assert(queryTotalSize(partTable) === BigInt(17436))
+    }
 
     // Try to analyze a temp table
-    sql("""SELECT * FROM src""").createOrReplaceTempView("tempTable")
-    intercept[AnalysisException] {
-      sql("ANALYZE TABLE tempTable COMPUTE STATISTICS")
+    withView("tempTable") {
+      sql("""SELECT * FROM src""").createOrReplaceTempView("tempTable")
+      intercept[AnalysisException] {
+        sql("ANALYZE TABLE tempTable COMPUTE STATISTICS")
+      }
     }
-    spark.sessionState.catalog.dropTable(
-      TableIdentifier("tempTable"), ignoreIfNotExists = true, purge = false)
   }
 
   test("SPARK-21079 - analyze table with location different than that of individual partitions") {
-    def queryTotalSize(tableName: String): BigInt =
-      spark.table(tableName).queryExecution.analyzed.stats.sizeInBytes
-
     val tableName = "analyzeTable_part"
     withTable(tableName) {
       withTempPath { path =>
@@ -148,15 +131,12 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
 
         sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS noscan")
 
-        assert(queryTotalSize(tableName) === BigInt(17436))
+        assert(getCatalogStatistics(tableName).sizeInBytes === BigInt(17436))
       }
     }
   }
 
   test("SPARK-21079 - analyze partitioned table with only a subset of partitions visible") {
-    def queryTotalSize(tableName: String): BigInt =
-      spark.table(tableName).queryExecution.analyzed.stats.sizeInBytes
-
     val sourceTableName = "analyzeTable_part"
     val tableName = "analyzeTable_part_vis"
     withTable(sourceTableName, tableName) {
@@ -188,39 +168,19 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
 
         // Register only one of the partitions found on disk
         val ds = partitionDates.head
-        sql(s"ALTER TABLE $tableName ADD PARTITION (ds='$ds')").collect()
+        sql(s"ALTER TABLE $tableName ADD PARTITION (ds='$ds')")
 
         // Analyze original table - expect 3 partitions
         sql(s"ANALYZE TABLE $sourceTableName COMPUTE STATISTICS noscan")
-        assert(queryTotalSize(sourceTableName) === BigInt(3 * 5812))
+        assert(getCatalogStatistics(sourceTableName).sizeInBytes === BigInt(3 * 5812))
 
         // Analyze partial-copy table - expect only 1 partition
         sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS noscan")
-        assert(queryTotalSize(tableName) === BigInt(5812))
+        assert(getCatalogStatistics(tableName).sizeInBytes === BigInt(5812))
       }
     }
   }
 
-  test("analyzing views is not supported") {
-    def assertAnalyzeUnsupported(analyzeCommand: String): Unit = {
-      val err = intercept[AnalysisException] {
-        sql(analyzeCommand)
-      }
-      assert(err.message.contains("ANALYZE TABLE is not supported"))
-    }
-
-    val tableName = "tbl"
-    withTable(tableName) {
-      spark.range(10).write.saveAsTable(tableName)
-      val viewName = "view"
-      withView(viewName) {
-        sql(s"CREATE VIEW $viewName AS SELECT * FROM $tableName")
-        assertAnalyzeUnsupported(s"ANALYZE TABLE $viewName COMPUTE STATISTICS")
-        assertAnalyzeUnsupported(s"ANALYZE TABLE $viewName COMPUTE STATISTICS FOR COLUMNS id")
-      }
-    }
-  }
-
   test("test table-level statistics for hive tables created in HiveExternalCatalog") {
     val textTable = "textTable"
     withTable(textTable) {
@@ -290,8 +250,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
     if (analyzedByHive) hiveClient.runSqlHive(s"ANALYZE TABLE $tabName COMPUTE STATISTICS")
     val describeResult1 = hiveClient.runSqlHive(s"DESCRIBE FORMATTED $tabName")
 
-    val tableMetadata =
-      spark.sessionState.catalog.getTableMetadata(TableIdentifier(tabName)).properties
+    val tableMetadata = getCatalogTable(tabName).properties
     // statistics info is not contained in the metadata of the original table
     assert(Seq(StatsSetupConst.COLUMN_STATS_ACCURATE, StatsSetupConst.NUM_FILES,
@@ -327,8 +286,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
     val tabName = "tab1"
     withTable(tabName) {
      createNonPartitionedTable(tabName, analyzedByHive = false, analyzedBySpark = false)
-      checkTableStats(
-        tabName, hasSizeInBytes = true, expectedRowCounts = None)
+      checkTableStats(tabName, hasSizeInBytes = true, expectedRowCounts = None)
 
       // ALTER TABLE SET TBLPROPERTIES invalidates some contents of Hive specific statistics
       // This is triggered by the Hive alterTable API
@@ -370,10 +328,6 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
   }
 
   test("alter table should not have the side effect to store statistics in Spark side") {
-    def getCatalogTable(tableName: String): CatalogTable = {
-      spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
-    }
-
     val table = "alter_table_side_effect"
     withTable(table) {
       sql(s"CREATE TABLE $table (i string, j string)")
@@ -637,12 +591,12 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
 
      // the default value for `spark.sql.hive.convertMetastoreParquet` is true, here we just set it
      // for robustness
-      withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "true") {
+      withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> "true") {
        checkTableStats(parquetTable, hasSizeInBytes = false, expectedRowCounts = None)
        sql(s"ANALYZE TABLE $parquetTable COMPUTE STATISTICS")
        checkTableStats(parquetTable, hasSizeInBytes = true, expectedRowCounts = Some(500))
      }
-      withSQLConf("spark.sql.hive.convertMetastoreOrc" -> "true") {
+      withSQLConf(HiveUtils.CONVERT_METASTORE_ORC.key -> "true") {
        // We still can get tableSize from Hive before Analyze
        checkTableStats(orcTable, hasSizeInBytes = true, expectedRowCounts = None)
        sql(s"ANALYZE TABLE $orcTable COMPUTE STATISTICS")
@@ -759,8 +713,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
     val parquetTable = "parquetTable"
     withTable(parquetTable) {
       sql(createTableCmd)
-      val catalogTable = spark.sessionState.catalog.getTableMetadata(
-        TableIdentifier(parquetTable))
+      val catalogTable = getCatalogTable(parquetTable)
       assert(DDLUtils.isDatasourceTable(catalogTable))
 
       // Add a filter to avoid creating too many partitions
@@ -795,17 +748,6 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
     "partitioned data source table",
     "CREATE TABLE parquetTable (key STRING, value STRING) USING PARQUET PARTITIONED BY (key)")
 
-  test("statistics collection of a table with zero column") {
-    val table_no_cols = "table_no_cols"
-    withTable(table_no_cols) {
-      val rddNoCols = sparkContext.parallelize(1 to 10).map(_ => Row.empty)
-      val dfNoCols = spark.createDataFrame(rddNoCols, StructType(Seq.empty))
-      dfNoCols.write.format("json").saveAsTable(table_no_cols)
-      sql(s"ANALYZE TABLE $table_no_cols COMPUTE STATISTICS")
-      checkTableStats(table_no_cols, hasSizeInBytes = true, expectedRowCounts = Some(10))
-    }
-  }
-
   /** Used to test refreshing cached metadata once table stats are updated. */
   private def getStatsBeforeAfterUpdate(isAnalyzeColumns: Boolean)
     : (CatalogStatistics, CatalogStatistics) = {