diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 2a20651459d786e418b164b317ac0b97ad0b870d..710bce5da98112f9c6e07ff03d10557dba0758c4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
+import org.apache.spark.sql.catalyst.util.quoteIdentifier
 
 
 /**
@@ -109,6 +110,24 @@ case class CatalogTablePartition(
     storage: CatalogStorageFormat)
 
 
+/**
+ * A container for bucketing information.
+ * Bucketing is a technology for decomposing data sets into more manageable parts, and the number
+ * of buckets is fixed so it does not fluctuate with data.
+ *
+ * @param numBuckets number of buckets.
+ * @param bucketColumnNames the names of the columns that used to generate the bucket id.
+ * @param sortColumnNames the names of the columns that used to sort data in each bucket.
+ */
+case class BucketSpec(
+    numBuckets: Int,
+    bucketColumnNames: Seq[String],
+    sortColumnNames: Seq[String]) {
+  if (numBuckets <= 0) {
+    throw new AnalysisException(s"Expected positive number of buckets, but got `$numBuckets`.")
+  }
+}
+
 /**
  * A table defined in the catalog.
  *
@@ -124,9 +143,7 @@ case class CatalogTable(
     storage: CatalogStorageFormat,
     schema: Seq[CatalogColumn],
     partitionColumnNames: Seq[String] = Seq.empty,
-    sortColumnNames: Seq[String] = Seq.empty,
-    bucketColumnNames: Seq[String] = Seq.empty,
-    numBuckets: Int = -1,
+    bucketSpec: Option[BucketSpec] = None,
     owner: String = "",
     createTime: Long = System.currentTimeMillis,
     lastAccessTime: Long = -1,
@@ -143,8 +160,8 @@ case class CatalogTable(
       s"must be a subset of schema (${colNames.mkString(", ")}) in table '$identifier'")
   }
   requireSubsetOfSchema(partitionColumnNames, "partition")
-  requireSubsetOfSchema(sortColumnNames, "sort")
-  requireSubsetOfSchema(bucketColumnNames, "bucket")
+  requireSubsetOfSchema(bucketSpec.map(_.sortColumnNames).getOrElse(Nil), "sort")
+  requireSubsetOfSchema(bucketSpec.map(_.bucketColumnNames).getOrElse(Nil), "bucket")
 
   /** Columns this table is partitioned by. */
   def partitionColumns: Seq[CatalogColumn] =
@@ -172,9 +189,19 @@ case class CatalogTable(
 
   override def toString: String = {
     val tableProperties = properties.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]")
-    val partitionColumns = partitionColumnNames.map("`" + _ + "`").mkString("[", ", ", "]")
-    val sortColumns = sortColumnNames.map("`" + _ + "`").mkString("[", ", ", "]")
-    val bucketColumns = bucketColumnNames.map("`" + _ + "`").mkString("[", ", ", "]")
+    val partitionColumns = partitionColumnNames.map(quoteIdentifier).mkString("[", ", ", "]")
+    val bucketStrings = bucketSpec match {
+      case Some(BucketSpec(numBuckets, bucketColumnNames, sortColumnNames)) =>
+        val bucketColumnsString = bucketColumnNames.map(quoteIdentifier).mkString("[", ", ", "]")
+        val sortColumnsString = sortColumnNames.map(quoteIdentifier).mkString("[", ", ", "]")
+        Seq(
+          s"Num Buckets: $numBuckets",
+          if (bucketColumnNames.nonEmpty) s"Bucket Columns: $bucketColumnsString" else "",
+          if (sortColumnNames.nonEmpty) s"Sort Columns: $sortColumnsString" else ""
+        )
+
+      case _ => Nil
+    }
 
     val output =
       Seq(s"Table: ${identifier.quotedString}",
@@ -183,10 +210,8 @@ case class CatalogTable(
         s"Last Access: ${new Date(lastAccessTime).toString}",
         s"Type: ${tableType.name}",
         if (schema.nonEmpty) s"Schema: ${schema.mkString("[", ", ", "]")}" else "",
-        if (partitionColumnNames.nonEmpty) s"Partition Columns: $partitionColumns" else "",
-        if (numBuckets != -1) s"Num Buckets: $numBuckets" else "",
-        if (bucketColumnNames.nonEmpty) s"Bucket Columns: $bucketColumns" else "",
-        if (sortColumnNames.nonEmpty) s"Sort Columns: $sortColumns" else "",
+        if (partitionColumnNames.nonEmpty) s"Partition Columns: $partitionColumns" else ""
+      ) ++ bucketStrings ++ Seq(
         viewOriginalText.map("Original View: " + _).getOrElse(""),
         viewText.map("View: " + _).getOrElse(""),
         comment.map("Comment: " + _).getOrElse(""),
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 5bb50cba53fb16921c2530823b51c1c3ac6590d4..3a0dcea903dbab5ac63aa91621cb16413f62071b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -692,7 +692,7 @@ abstract class CatalogTestUtils {
         CatalogColumn("a", "int"),
         CatalogColumn("b", "string")),
       partitionColumnNames = Seq("a", "b"),
-      bucketColumnNames = Seq("col1"))
+      bucketSpec = Some(BucketSpec(4, Seq("col1"), Nil)))
   }
 
   def newFunc(name: String, database: Option[String] = None): CatalogFunction = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 753b64b983d99917f87f38cbca7002db4faeb11c..44189881ddd0c0e47ae8402193e61c78ef8f69b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -23,8 +23,9 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Project}
-import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsingAsSelect, DataSource, HadoopFsRelation}
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
+import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, DataSource, HadoopFsRelation}
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
 
 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 2a62b864a1c8cd990992e02b8de88a6596ea0087..03f81c46a81705c94f9d40086f7c2ed622b9de3b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -21,12 +21,11 @@ import scala.util.control.NonFatal
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, CatalogTable}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, CatalogTableType, SessionCatalog}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
-import org.apache.spark.sql.execution.datasources.BucketSpec
 import org.apache.spark.sql.types._
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index a62853b05f506c6a2fefc57e7115cab683823131..8f3adadbf3b5b4e84d713ea39680bd05a3a2c5f4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -29,7 +29,7 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogColumn, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
@@ -498,23 +498,19 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
   }
 
   private def describeBucketingInfo(metadata: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
-    def appendBucketInfo(numBuckets: Int, bucketColumns: Seq[String], sortColumns: Seq[String]) = {
-      append(buffer, "Num Buckets:", numBuckets.toString, "")
-      append(buffer, "Bucket Columns:", bucketColumns.mkString("[", ", ", "]"), "")
-      append(buffer, "Sort Columns:", sortColumns.mkString("[", ", ", "]"), "")
+    def appendBucketInfo(bucketSpec: Option[BucketSpec]) = bucketSpec match {
+      case Some(BucketSpec(numBuckets, bucketColumnNames, sortColumnNames)) =>
+        append(buffer, "Num Buckets:", numBuckets.toString, "")
+        append(buffer, "Bucket Columns:", bucketColumnNames.mkString("[", ", ", "]"), "")
+        append(buffer, "Sort Columns:", sortColumnNames.mkString("[", ", ", "]"), "")
+
+      case _ =>
     }
 
-    DDLUtils.getBucketSpecFromTableProperties(metadata) match {
-      case Some(bucketSpec) =>
-        appendBucketInfo(
-          bucketSpec.numBuckets,
-          bucketSpec.bucketColumnNames,
-          bucketSpec.sortColumnNames)
-      case None =>
-        appendBucketInfo(
-          metadata.numBuckets,
-          metadata.bucketColumnNames,
-          metadata.sortColumnNames)
+    if (DDLUtils.isDatasourceTable(metadata)) {
+      appendBucketInfo(DDLUtils.getBucketSpecFromTableProperties(metadata))
+    } else {
+      appendBucketInfo(metadata.bucketSpec)
     }
   }
 
@@ -808,7 +804,7 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman
       builder ++= partCols.mkString("PARTITIONED BY (", ", ", ")\n")
     }
 
-    if (metadata.bucketColumnNames.nonEmpty) {
+    if (metadata.bucketSpec.isDefined) {
       throw new UnsupportedOperationException(
         "Creating Hive table with bucket spec is not supported yet.")
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BucketingUtils.scala
similarity index 72%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BucketingUtils.scala
index 961d035b768702ea3d97f457b9064581ea280ae4..377b818096757a089da5d6f182f4a7f30afb4d12 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BucketingUtils.scala
@@ -17,26 +17,6 @@
 
 package org.apache.spark.sql.execution.datasources
 
-import org.apache.spark.sql.AnalysisException
-
-/**
- * A container for bucketing information.
- * Bucketing is a technology for decomposing data sets into more manageable parts, and the number
- * of buckets is fixed so it does not fluctuate with data.
- *
- * @param numBuckets number of buckets.
- * @param bucketColumnNames the names of the columns that used to generate the bucket id.
- * @param sortColumnNames the names of the columns that used to sort data in each bucket.
- */
-private[sql] case class BucketSpec(
-    numBuckets: Int,
-    bucketColumnNames: Seq[String],
-    sortColumnNames: Seq[String]) {
-  if (numBuckets <= 0) {
-    throw new AnalysisException(s"Expected positive number of buckets, but got `$numBuckets`.")
-  }
-}
-
 private[sql] object BucketingUtils {
   // The file name of bucketed data should have 3 parts:
   // 1. some other information in the head of file name
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index f572b93991e0ca2bec5076ff326346fde2cb14d3..79024fda2f8ca20641902fb683e3e4daa9134fc9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -30,6 +30,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
 import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 04f166f8ff454255435757cbfead9661ac98fc62..32aa4713ebdbb75be2eebe663a020fa68fd1dc87 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -25,6 +25,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{expressions, InternalRow}
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index 1426dcf4697ff62e28572e74d1c26c831098f5c0..b49525c8ceda9aaa054ca970ee722282ff54c88f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -19,13 +19,13 @@ package org.apache.spark.sql.execution.datasources
 
 import java.io.IOException
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
 
 import org.apache.spark._
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.InternalRow
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index 9a0b46c1a4a5ee585385c06d87ca350576d850a9..c801436b0a6435ef1e045467f340b3885f0bbd6b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -28,6 +28,7 @@ import org.apache.spark._
 import org.apache.spark.internal.Logging
 import org.apache.spark.mapred.SparkHadoopMapRedUtil
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.catalyst.InternalRow
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index 31a2075d2ff99e6745bbdd7759a5c11094af197e..18369b51b93054bbaf701df5bbd66af56cc12f9b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
index d238da242f3f045ed71a62aea277cea38ae44dff..5ce8350de207fd18ac58cfc1aee465e407d15426 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.execution.datasources
 
 import scala.collection.mutable
-import scala.util.Try
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
@@ -30,6 +29,7 @@ import org.apache.spark.annotation.Experimental
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.execution.FileRelation
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 1ae9b5524cec99b731008ad3fda55fb7c0ac1ced..05dfb8cb22b892c2ff546dcd8de755b8cfa069ea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -153,7 +153,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
   private def listColumns(tableIdentifier: TableIdentifier): Dataset[Column] = {
     val tableMetadata = sessionCatalog.getTableMetadata(tableIdentifier)
     val partitionColumnNames = tableMetadata.partitionColumnNames.toSet
-    val bucketColumnNames = tableMetadata.bucketColumnNames.toSet
+    val bucketColumnNames = tableMetadata.bucketSpec.map(_.bucketColumnNames).getOrElse(Nil).toSet
     val columns = tableMetadata.schema.map { c =>
       new Column(
         name = c.name,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index b170a3a77ee048314d1198ecfe7de41d659c18a9..999afc9751fe1264b87592c22652bd5a13e9bd63 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -20,15 +20,15 @@ package org.apache.spark.sql.execution.command
 import scala.reflect.{classTag, ClassTag}
 
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, FunctionResource}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTableType, FunctionResource}
 import org.apache.spark.sql.catalyst.catalog.FunctionResourceType
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.execution.SparkSqlParser
-import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsing}
+import org.apache.spark.sql.execution.datasources.CreateTableUsing
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
-import org.apache.spark.sql.types.{IntegerType, MetadataBuilder, StringType, StructType}
+import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
 
 
 // TODO: merge this with DDLSuite (SPARK-14441)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 34c980e321bfc185028a0a1a1526e706baff585c..a354594a6d177a275be9248c8a42a218f31c20da 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -26,13 +26,12 @@ import org.apache.spark.internal.config._
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, FunctionRegistry, NoSuchPartitionException, NoSuchTableException, TempTableAlreadyExistsException}
-import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFormat}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, CatalogStorageFormat}
 import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
-import org.apache.spark.sql.execution.datasources.BucketSpec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{IntegerType, StructType}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 8d8a18fa9332b173f0703beddedc26050090140f..ddcc24a7f56b2c46be2a17ee92ed83c2379a0230 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -27,6 +27,7 @@ import org.apache.hadoop.mapreduce.Job
 import org.apache.spark.SparkConf
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionSet, PredicateHelper}
 import org.apache.spark.sql.catalyst.util
 import org.apache.spark.sql.execution.DataSourceScanExec
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
index 07aeaeb695a4b2c772e222cfd3e61d00d9409b00..8aa81854b22e8f2818bc8d42687e6675f39279d6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -90,11 +90,12 @@ class CatalogSuite
       .getOrElse { spark.catalog.listColumns(tableName) }
     assume(tableMetadata.schema.nonEmpty, "bad test")
     assume(tableMetadata.partitionColumnNames.nonEmpty, "bad test")
-    assume(tableMetadata.bucketColumnNames.nonEmpty, "bad test")
+    assume(tableMetadata.bucketSpec.isDefined, "bad test")
     assert(columns.collect().map(_.name).toSet == tableMetadata.schema.map(_.name).toSet)
+    val bucketColumnNames = tableMetadata.bucketSpec.map(_.bucketColumnNames).getOrElse(Nil).toSet
     columns.collect().foreach { col =>
       assert(col.isPartition == tableMetadata.partitionColumnNames.contains(col.name))
-      assert(col.isBucket == tableMetadata.bucketColumnNames.contains(col.name))
+      assert(col.isBucket == bucketColumnNames.contains(col.name))
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
index 5ab585faa4acbdc059e88cf331fd00c8b1348cef..49153f77362b74f0400bc21bb23b1ae6a5e4f562 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
@@ -24,9 +24,9 @@ import org.scalatest.BeforeAndAfterEach
 import org.apache.spark.SparkException
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.execution.command.DDLUtils
-import org.apache.spark.sql.execution.datasources.BucketSpec
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.util.Utils
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 9f5782f04525d825842131d969cda245ece1e2d9..2392cc0bdd8d0177fb2aaa35cd128fba37718058 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -365,9 +365,9 @@ private[hive] class HiveClientImpl(
         },
         schema = schema,
         partitionColumnNames = partCols.map(_.name),
-        sortColumnNames = Seq(), // TODO: populate this
-        bucketColumnNames = h.getBucketCols.asScala,
-        numBuckets = h.getNumBuckets,
+        // We can not populate bucketing information for Hive tables as Spark SQL has a different
+        // implementation of hash function from Hive.
+        bucketSpec = None,
        owner = h.getOwner,
         createTime = h.getTTable.getCreateTime.toLong * 1000,
         lastAccessTime = h.getLastAccessTime.toLong * 1000,
@@ -764,10 +764,7 @@ private[hive] class HiveClientImpl(
       hiveTable.setFields(schema.asJava)
     }
     hiveTable.setPartCols(partCols.asJava)
-    // TODO: set sort columns here too
-    hiveTable.setBucketCols(table.bucketColumnNames.asJava)
     hiveTable.setOwner(conf.getUser)
-    hiveTable.setNumBuckets(table.numBuckets)
     hiveTable.setCreateTime((table.createTime / 1000).toInt)
     hiveTable.setLastAccessTime((table.lastAccessTime / 1000).toInt)
     table.storage.locationUri.foreach { loc => shim.setDataLocation(hiveTable, loc) }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index a708434f5e13a6db56aef6636ad63b298565c949..5450fba7533e0d5bc30b16e559e828cb3fa4869a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -293,9 +293,7 @@ class HiveDDLCommandSuite extends PlanTest {
     assert(desc.tableType == CatalogTableType.MANAGED)
     assert(desc.schema == Seq(CatalogColumn("id", "int"), CatalogColumn("name", "string")))
     assert(desc.partitionColumnNames.isEmpty)
-    assert(desc.sortColumnNames.isEmpty)
-    assert(desc.bucketColumnNames.isEmpty)
-    assert(desc.numBuckets == -1)
+    assert(desc.bucketSpec.isEmpty)
     assert(desc.viewText.isEmpty)
     assert(desc.viewOriginalText.isEmpty)
     assert(desc.storage.locationUri.isEmpty)
@@ -453,9 +451,7 @@ class HiveDDLCommandSuite extends PlanTest {
       CatalogColumn("name", "string"),
       CatalogColumn("month", "int")))
     assert(desc.partitionColumnNames == Seq("month"))
-    assert(desc.sortColumnNames.isEmpty)
-    assert(desc.bucketColumnNames.isEmpty)
-    assert(desc.numBuckets == -1)
+    assert(desc.bucketSpec.isEmpty)
     assert(desc.viewText.isEmpty)
     assert(desc.viewOriginalText.isEmpty)
     assert(desc.storage.locationUri == Some("/path/to/mercury"))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index fc01ff3f5aa07b6936810f2d1bec0a25fbd6f26e..e461490310910ca1efc0c8d6294ec68e9e29b6e9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -20,10 +20,11 @@ package org.apache.spark.sql.sources
 import java.io.File
 
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.execution.DataSourceScanExec
-import org.apache.spark.sql.execution.datasources.{BucketSpec, DataSourceStrategy}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
 import org.apache.spark.sql.execution.exchange.ShuffleExchange
 import org.apache.spark.sql.execution.joins.SortMergeJoinExec
 import org.apache.spark.sql.functions._
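For reference, the relocated class can be exercised on its own. Below is a minimal sketch against the new org.apache.spark.sql.catalyst.catalog.BucketSpec introduced in interface.scala above; the user_id/ts column names are illustrative and not taken from this patch.

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.catalog.BucketSpec

object BucketSpecSketch {
  def main(args: Array[String]): Unit = {
    // A valid spec: 8 buckets on a hypothetical `user_id` column, each bucket sorted by `ts`.
    val spec = BucketSpec(
      numBuckets = 8,
      bucketColumnNames = Seq("user_id"),
      sortColumnNames = Seq("ts"))
    println(spec)

    // The constructor validates the bucket count eagerly, so a non-positive value fails at
    // construction time rather than later at write time.
    try {
      BucketSpec(numBuckets = 0, bucketColumnNames = Seq("user_id"), sortColumnNames = Nil)
    } catch {
      case e: AnalysisException =>
        // Expected positive number of buckets, but got `0`.
        println(e.getMessage)
    }
  }
}

End users normally do not build a BucketSpec by hand: DataFrameWriter.bucketBy(...).sortBy(...) constructs one internally, which is why DataFrameWriter.scala above now imports the catalyst version of the class instead of the old execution.datasources one.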