From d2416925c439b308b3e903171bd28d8cfc2bf9cf Mon Sep 17 00:00:00 2001
From: Tejas Patil <tejasp@fb.com>
Date: Tue, 16 May 2017 01:47:23 +0800
Subject: [PATCH] [SPARK-17729][SQL] Enable creating hive bucketed tables

## What changes were proposed in this pull request?

Hive allows inserting data into a bucketed table without guaranteeing its bucketed- and sorted-ness, based on these two configs: `hive.enforce.bucketing` and `hive.enforce.sorting`.

What does this PR achieve?
- By default, Spark will disallow users from writing output to Hive bucketed tables (since the output won't adhere to Hive's bucketing semantics).
- If a user still wants to write to a Hive bucketed table, the only resort is to set `hive.enforce.bucketing=false` and `hive.enforce.sorting=false`, which means the user does NOT care about bucketing guarantees (see the sketch after this list).
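
A minimal sketch of the resulting behavior (the table name and values are hypothetical, not from this patch):

```scala
// Hypothetical spark-shell session against a Hive bucketed table.
// With enforcement on (the default in this patch), the write is rejected:
spark.sql("INSERT INTO TABLE bucketed_tbl SELECT 1, 2")   // throws AnalysisException

// Opting out of Hive's bucketing guarantees lets the write proceed,
// with a warning that the output is not bucketed in a Hive-compatible way:
spark.sql("SET hive.enforce.bucketing=false")
spark.sql("SET hive.enforce.sorting=false")
spark.sql("INSERT INTO TABLE bucketed_tbl SELECT 1, 2")
```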

Changes done in this PR:
- Extract the table's bucketing information in `HiveClientImpl`
- While writing table info to the metastore, `HiveClientImpl` now populates the bucketing information in the Hive `Table` object
- `InsertIntoHiveTable` allows inserts to a bucketed table only if both `hive.enforce.bucketing` and `hive.enforce.sorting` are `false`

The ability to create bucketed tables will enable adding test cases to Spark as I add more changes related to Hive bucketing support; a hypothetical example of DDL that now parses is sketched below. Design doc for Hive bucketing support: https://docs.google.com/document/d/1a8IDh23RAkrkg9YYAeO51F4aGO8-xAlupKwdshve2fc/edit#
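
For illustration, DDL along these lines is now accepted instead of failing with "Operation not allowed" (table and column names are made up; the same shape is exercised by the new tests):

```scala
// Hypothetical example: CREATE TABLE ... CLUSTERED BY now parses for Hive tables.
spark.sql(
  """CREATE TABLE bucketed_tbl (id INT, name STRING)
    |CLUSTERED BY (id)
    |SORTED BY (id) INTO 8 BUCKETS
    |STORED AS TEXTFILE""".stripMargin)
```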

## How was this patch tested?
- Added a test for creating a bucketed and sorted table.
- Added a test to ensure that INSERTs fail if strict bucketing / sorting is enforced
- Added a test to ensure that INSERTs go through if strict bucketing / sorting is NOT enforced
- Added a test to validate that bucketing information shows up in the output of DESC FORMATTED
- Added a test to ensure that `SHOW CREATE TABLE` works for Hive bucketed tables (see the illustrative output below)
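
For reference, with the `ShowCreateTableCommand` change in this patch, the generated DDL for a bucketed table now carries a clause shaped like the following (a sketch for the two-bucket table from the test):

```scala
// Sketch: the bucketing clause emitted by SHOW CREATE TABLE for the test table t1
// is expected to look roughly like:
//   CLUSTERED BY (a)
//   SORTED BY (b ASC)
//   INTO 2 BUCKETS
spark.sql("SHOW CREATE TABLE t1").show(truncate = false)
```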

Author: Tejas Patil <tejasp@fb.com>

Closes #17644 from tejasapatil/SPARK-17729_create_bucketed_table.
---
 .../catalog/ExternalCatalogSuite.scala        |  2 +-
 .../spark/sql/execution/SparkSqlParser.scala  |  6 +-
 .../spark/sql/execution/command/tables.scala  |  9 ++-
 .../spark/sql/hive/HiveMetastoreCatalog.scala |  2 -
 .../sql/hive/client/HiveClientImpl.scala      | 56 +++++++++++++++----
 .../hive/execution/InsertIntoHiveTable.scala  | 21 +++++++
 .../spark/sql/hive/HiveDDLCommandSuite.scala  | 33 ++++++++---
 .../sql/hive/InsertIntoHiveTableSuite.scala   | 47 ++++++++++++++++
 .../spark/sql/hive/ShowCreateTableSuite.scala | 11 +---
 .../sql/hive/execution/HiveDDLSuite.scala     | 20 +++++++
 10 files changed, 174 insertions(+), 33 deletions(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 42db4398e5..1759ac04c0 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -247,7 +247,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     val catalog = newBasicCatalog()
     val tbl1 = catalog.getTable("db2", "tbl1")
     val newSchema = StructType(Seq(
-      StructField("new_field_1", IntegerType),
+      StructField("col1", IntegerType),
       StructField("new_field_2", StringType),
       StructField("a", IntegerType),
       StructField("b", StringType)))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index c2c5289486..3c58c6e1b6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -1072,13 +1072,12 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
     if (ctx.skewSpec != null) {
       operationNotAllowed("CREATE TABLE ... SKEWED BY", ctx)
     }
-    if (ctx.bucketSpec != null) {
-      operationNotAllowed("CREATE TABLE ... CLUSTERED BY", ctx)
-    }
+
     val dataCols = Option(ctx.columns).map(visitColTypeList).getOrElse(Nil)
     val partitionCols = Option(ctx.partitionColumns).map(visitColTypeList).getOrElse(Nil)
     val properties = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)
     val selectQuery = Option(ctx.query).map(plan)
+    val bucketSpec = Option(ctx.bucketSpec()).map(visitBucketSpec)
 
     // Note: Hive requires partition columns to be distinct from the schema, so we need
     // to include the partition columns here explicitly
@@ -1119,6 +1118,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
       tableType = tableType,
       storage = storage,
       schema = schema,
+      bucketSpec = bucketSpec,
       provider = Some(DDLUtils.HIVE_PROVIDER),
       partitionColumnNames = partitionCols.map(_.name),
       properties = properties,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index ebf03e1bf8..9ccd6792e5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -903,8 +903,13 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman
     }
 
     if (metadata.bucketSpec.isDefined) {
-      throw new UnsupportedOperationException(
-        "Creating Hive table with bucket spec is not supported yet.")
+      val bucketSpec = metadata.bucketSpec.get
+      builder ++= s"CLUSTERED BY (${bucketSpec.bucketColumnNames.mkString(",")})\n"
+
+      if (bucketSpec.sortColumnNames.nonEmpty) {
+        builder ++= s"SORTED BY (${bucketSpec.sortColumnNames.map(_ + " ASC").mkString(", ")})\n"
+      }
+      builder ++= s"INTO ${bucketSpec.numBuckets} BUCKETS\n"
     }
   }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 6b98066cb7..9dd8279efc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -171,7 +171,6 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
             location = fileIndex,
             partitionSchema = partitionSchema,
             dataSchema = dataSchema,
-            // We don't support hive bucketed tables, only ones we write out.
             bucketSpec = None,
             fileFormat = fileFormat,
             options = options)(sparkSession = sparkSession)
@@ -199,7 +198,6 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
                 sparkSession = sparkSession,
                 paths = rootPath.toString :: Nil,
                 userSpecifiedSchema = Option(dataSchema),
-                // We don't support hive bucketed tables, only ones we write out.
                 bucketSpec = None,
                 options = options,
                 className = fileType).resolveRelation(),
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 74e15a5777..04f2751e79 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -27,10 +27,11 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
-import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema}
+import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema, Order}
 import org.apache.hadoop.hive.metastore.api.{SerDeInfo, StorageDescriptor}
 import org.apache.hadoop.hive.ql.Driver
 import org.apache.hadoop.hive.ql.metadata.{Hive, Partition => HivePartition, Table => HiveTable}
+import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.HIVE_COLUMN_ORDER_ASC
 import org.apache.hadoop.hive.ql.processors._
 import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.security.UserGroupInformation
@@ -373,10 +374,30 @@ private[hive] class HiveClientImpl(
     Option(client.getTable(dbName, tableName, false)).map { h =>
       // Note: Hive separates partition columns and the schema, but for us the
       // partition columns are part of the schema
+      val cols = h.getCols.asScala.map(fromHiveColumn)
       val partCols = h.getPartCols.asScala.map(fromHiveColumn)
-      val schema = StructType(h.getCols.asScala.map(fromHiveColumn) ++ partCols)
+      val schema = StructType(cols ++ partCols)
+
+      val bucketSpec = if (h.getNumBuckets > 0) {
+        val sortColumnOrders = h.getSortCols.asScala
+        // Currently Spark only supports columns sorted in ascending order, while
+        // Hive supports both ascending and descending order. Only if all the columns
+        // are sorted in ascending order do we propagate the sortedness information
+        // to downstream processing / optimizations in Spark.
+        // TODO: In the future, Spark can support columns sorted in descending order
+        val allAscendingSorted = sortColumnOrders.forall(_.getOrder == HIVE_COLUMN_ORDER_ASC)
+
+        val sortColumnNames = if (allAscendingSorted) {
+          sortColumnOrders.map(_.getCol)
+        } else {
+          Seq()
+        }
+        Option(BucketSpec(h.getNumBuckets, h.getBucketCols.asScala, sortColumnNames))
+      } else {
+        None
+      }
 
-      // Skew spec, storage handler, and bucketing info can't be mapped to CatalogTable (yet)
+      // Skew spec and storage handler can't be mapped to CatalogTable (yet)
       val unsupportedFeatures = ArrayBuffer.empty[String]
 
       if (!h.getSkewedColNames.isEmpty) {
@@ -387,10 +408,6 @@ private[hive] class HiveClientImpl(
         unsupportedFeatures += "storage handler"
       }
 
-      if (!h.getBucketCols.isEmpty) {
-        unsupportedFeatures += "bucketing"
-      }
-
       if (h.getTableType == HiveTableType.VIRTUAL_VIEW && partCols.nonEmpty) {
         unsupportedFeatures += "partitioned view"
       }
@@ -408,9 +425,11 @@ private[hive] class HiveClientImpl(
         },
         schema = schema,
         partitionColumnNames = partCols.map(_.name),
-        // We can not populate bucketing information for Hive tables as Spark SQL has a different
-        // implementation of hash function from Hive.
-        bucketSpec = None,
+        // If the table is written by Spark, we will put bucketing information in table properties,
+        // and will always overwrite the bucket spec in the Hive metastore with the bucketing
+        // information in table properties. This means that if we have a bucket spec in both the
+        // Hive metastore and table properties, we will trust the one in table properties.
+        bucketSpec = bucketSpec,
         owner = h.getOwner,
         createTime = h.getTTable.getCreateTime.toLong * 1000,
         lastAccessTime = h.getLastAccessTime.toLong * 1000,
@@ -870,6 +889,23 @@ private[hive] object HiveClientImpl {
       hiveTable.setViewOriginalText(t)
       hiveTable.setViewExpandedText(t)
     }
+
+    table.bucketSpec match {
+      case Some(bucketSpec) if DDLUtils.isHiveTable(table) =>
+        hiveTable.setNumBuckets(bucketSpec.numBuckets)
+        hiveTable.setBucketCols(bucketSpec.bucketColumnNames.toList.asJava)
+
+        if (bucketSpec.sortColumnNames.nonEmpty) {
+          hiveTable.setSortCols(
+            bucketSpec.sortColumnNames
+              .map(col => new Order(col, HIVE_COLUMN_ORDER_ASC))
+              .toList
+              .asJava
+          )
+        }
+      case _ =>
+    }
+
     hiveTable
   }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 3facf9f67b..10e17c5f73 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -307,6 +307,27 @@ case class InsertIntoHiveTable(
       }
     }
 
+    table.bucketSpec match {
+      case Some(bucketSpec) =>
+        // Writes to bucketed Hive tables are allowed only if the user does not care about
+        // maintaining the table's bucketing, i.e. both "hive.enforce.bucketing" and
+        // "hive.enforce.sorting" are set to false
+        val enforceBucketingConfig = "hive.enforce.bucketing"
+        val enforceSortingConfig = "hive.enforce.sorting"
+
+        val message = s"Output Hive table ${table.identifier} is bucketed but Spark " +
+          "currently does NOT produce bucketed output which is compatible with Hive."
+
+        if (hadoopConf.get(enforceBucketingConfig, "true").toBoolean ||
+          hadoopConf.get(enforceSortingConfig, "true").toBoolean) {
+          throw new AnalysisException(message)
+        } else {
+          logWarning(message + s" Inserting data anyway since both $enforceBucketingConfig and " +
+            s"$enforceSortingConfig are set to false.")
+        }
+      case _ => // do nothing since table has no bucketing
+    }
+
     val committer = FileCommitProtocol.instantiate(
       sparkSession.sessionState.conf.fileCommitProtocolClass,
       jobId = java.util.UUID.randomUUID().toString,
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index 59cc6605a1..7584f1741c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -367,13 +367,32 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle
   }
 
   test("create table - clustered by") {
-    val baseQuery = "CREATE TABLE my_table (id int, name string) CLUSTERED BY(id)"
-    val query1 = s"$baseQuery INTO 10 BUCKETS"
-    val query2 = s"$baseQuery SORTED BY(id) INTO 10 BUCKETS"
-    val e1 = intercept[ParseException] { parser.parsePlan(query1) }
-    val e2 = intercept[ParseException] { parser.parsePlan(query2) }
-    assert(e1.getMessage.contains("Operation not allowed"))
-    assert(e2.getMessage.contains("Operation not allowed"))
+    val numBuckets = 10
+    val bucketedColumn = "id"
+    val sortColumn = "id"
+    val baseQuery =
+      s"""
+         CREATE TABLE my_table (
+           $bucketedColumn int,
+           name string)
+         CLUSTERED BY($bucketedColumn)
+       """
+
+    val query1 = s"$baseQuery INTO $numBuckets BUCKETS"
+    val (desc1, _) = extractTableDesc(query1)
+    assert(desc1.bucketSpec.isDefined)
+    val bucketSpec1 = desc1.bucketSpec.get
+    assert(bucketSpec1.numBuckets == numBuckets)
+    assert(bucketSpec1.bucketColumnNames.head.equals(bucketedColumn))
+    assert(bucketSpec1.sortColumnNames.isEmpty)
+
+    val query2 = s"$baseQuery SORTED BY($sortColumn) INTO $numBuckets BUCKETS"
+    val (desc2, _) = extractTableDesc(query2)
+    assert(desc2.bucketSpec.isDefined)
+    val bucketSpec2 = desc2.bucketSpec.get
+    assert(bucketSpec2.numBuckets == numBuckets)
+    assert(bucketSpec2.bucketColumnNames.head.equals(bucketedColumn))
+    assert(bucketSpec2.sortColumnNames.head.equals(sortColumn))
   }
 
   test("create table - skewed by") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
index 2c724f8388..7bd3973550 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
@@ -495,6 +495,53 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef
       }
   }
 
+  private def testBucketedTable(testName: String)(f: String => Unit): Unit = {
+    test(s"Hive SerDe table - $testName") {
+      val hiveTable = "hive_table"
+
+      withTable(hiveTable) {
+        withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+          sql(
+            s"""
+               |CREATE TABLE $hiveTable (a INT, d INT)
+               |PARTITIONED BY (b INT, c INT)
+               |CLUSTERED BY(a)
+               |SORTED BY(a, d) INTO 256 BUCKETS
+               |STORED AS TEXTFILE
+            """.stripMargin)
+          f(hiveTable)
+        }
+      }
+    }
+  }
+
+  testBucketedTable("INSERT should NOT fail if strict bucketing is NOT enforced") {
+    tableName =>
+      withSQLConf("hive.enforce.bucketing" -> "false", "hive.enforce.sorting" -> "false") {
+        sql(s"INSERT INTO TABLE $tableName SELECT 1, 4, 2 AS c, 3 AS b")
+        checkAnswer(sql(s"SELECT a, b, c, d FROM $tableName"), Row(1, 2, 3, 4))
+      }
+  }
+
+  testBucketedTable("INSERT should fail if strict bucketing / sorting is enforced") {
+    tableName =>
+      withSQLConf("hive.enforce.bucketing" -> "true", "hive.enforce.sorting" -> "false") {
+        intercept[AnalysisException] {
+          sql(s"INSERT INTO TABLE $tableName SELECT 1, 2, 3, 4")
+        }
+      }
+      withSQLConf("hive.enforce.bucketing" -> "false", "hive.enforce.sorting" -> "true") {
+        intercept[AnalysisException] {
+          sql(s"INSERT INTO TABLE $tableName SELECT 1, 2, 3, 4")
+        }
+      }
+      withSQLConf("hive.enforce.bucketing" -> "true", "hive.enforce.sorting" -> "true") {
+        intercept[AnalysisException] {
+          sql(s"INSERT INTO TABLE $tableName SELECT 1, 2, 3, 4")
+        }
+      }
+  }
+
   test("SPARK-20594: hive.exec.stagingdir was deleted by Hive") {
     // Set hive.exec.stagingdir under the table directory without start with ".".
     withSQLConf("hive.exec.stagingdir" -> "./test") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala
index 4bfab0f9cf..081153df8e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala
@@ -247,21 +247,16 @@ class ShowCreateTableSuite extends QueryTest with SQLTestUtils with TestHiveSing
     }
   }
 
-  test("hive bucketing is not supported") {
+  test("hive bucketing is supported") {
     withTable("t1") {
-      createRawHiveTable(
+      sql(
         s"""CREATE TABLE t1 (a INT, b STRING)
            |CLUSTERED BY (a)
            |SORTED BY (b)
            |INTO 2 BUCKETS
          """.stripMargin
       )
-
-      val cause = intercept[AnalysisException] {
-        sql("SHOW CREATE TABLE t1")
-      }
-
-      assert(cause.getMessage.contains(" - bucketing"))
+      checkCreateTable("t1")
     }
   }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index c3d734e5a0..13f5c5dd8e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -777,6 +777,26 @@ class HiveDDLSuite
     }
   }
 
+  test("desc table for Hive table - bucketed + sorted table") {
+    withTable("tbl") {
+      sql(s"""
+        CREATE TABLE tbl (id int, name string)
+        PARTITIONED BY (ds string)
+        CLUSTERED BY(id)
+        SORTED BY(id, name) INTO 1024 BUCKETS
+        """)
+
+      val x = sql("DESC FORMATTED tbl").collect()
+      assert(x.containsSlice(
+        Seq(
+          Row("Num Buckets", "1024", ""),
+          Row("Bucket Columns", "[`id`]", ""),
+          Row("Sort Columns", "[`id`, `name`]", "")
+        )
+      ))
+    }
+  }
+
   test("desc table for data source table using Hive Metastore") {
     assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "hive")
     val tabName = "tab1"
-- 
GitLab