From 1a45d2b2cc6466841fb73da21a61b61f14a5d5fb Mon Sep 17 00:00:00 2001
From: Xiao Li <gatorsmile@gmail.com>
Date: Tue, 21 Feb 2017 19:30:36 -0800
Subject: [PATCH] [SPARK-19670][SQL][TEST] Enable Bucketed Table Reading and
 Writing Testing Without Hive Support

### What changes were proposed in this pull request?
Bucketed table reading and writing does not need Hive support. We can move the test cases from `sql/hive` to `sql/core`. After this PR, we can improve the test case coverage. Bucket table reading and writing can be tested with and without Hive support.

### How was this patch tested?
N/A

Author: Xiao Li <gatorsmile@gmail.com>

Closes #17004 from gatorsmile/mvTestCaseForBuckets.
---
 .../spark/sql/sources/BucketedReadSuite.scala | 30 ++++++++++------
 .../sql/sources/BucketedWriteSuite.scala      | 34 +++++++++++++------
 .../BucketedReadWithHiveSupportSuite.scala    | 28 +++++++++++++++
 .../BucketedWriteWithHiveSupportSuite.scala   | 30 ++++++++++++++++
 4 files changed, 101 insertions(+), 21 deletions(-)
 rename sql/{hive => core}/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala (95%)
 rename sql/{hive => core}/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala (88%)
 create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadWithHiveSupportSuite.scala
 create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteWithHiveSupportSuite.scala

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
similarity index 95%
rename from sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
rename to sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index 4fc72b9e47..9b65419dba 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -29,17 +29,25 @@ import org.apache.spark.sql.execution.datasources.DataSourceStrategy
 import org.apache.spark.sql.execution.exchange.ShuffleExchange
 import org.apache.spark.sql.execution.joins.SortMergeJoinExec
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
+import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
 import org.apache.spark.util.Utils
 import org.apache.spark.util.collection.BitSet
 
-class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
+class BucketedReadWithoutHiveSupportSuite extends BucketedReadSuite with SharedSQLContext {
+  protected override def beforeAll(): Unit = {
+    super.beforeAll()
+    assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory")
+  }
+}
+
+
+abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
   import testImplicits._
 
-  private val df = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
-  private val nullDF = (for {
+  private lazy val df = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
+  private lazy val nullDF = (for {
     i <- 0 to 50
     s <- Seq(null, "a", "b", "c", "d", "e", "f", null, "g")
   } yield (i % 5, s, i % 13)).toDF("i", "j", "k")
@@ -224,8 +232,10 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
     }
   }
 
-  private val df1 = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k").as("df1")
-  private val df2 = (0 until 50).map(i => (i % 7, i % 11, i.toString)).toDF("i", "j", "k").as("df2")
+  private lazy val df1 =
+    (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k").as("df1")
+  private lazy val df2 =
+    (0 until 50).map(i => (i % 7, i % 11, i.toString)).toDF("i", "j", "k").as("df2")
 
   case class BucketedTableTestSpec(
       bucketSpec: Option[BucketSpec],
@@ -535,7 +545,7 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
   test("error if there exists any malformed bucket files") {
     withTable("bucketed_table") {
       df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table")
-      val warehouseFilePath = new URI(hiveContext.sparkSession.getWarehousePath).getPath
+      val warehouseFilePath = new URI(spark.sessionState.conf.warehousePath).getPath
       val tableDir = new File(warehouseFilePath, "bucketed_table")
       Utils.deleteRecursively(tableDir)
       df1.write.parquet(tableDir.getAbsolutePath)
@@ -553,9 +563,9 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
     withTable("bucketed_table") {
       df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table")
 
-      checkAnswer(hiveContext.table("bucketed_table").select("j"), df1.select("j"))
+      checkAnswer(spark.table("bucketed_table").select("j"), df1.select("j"))
 
-      checkAnswer(hiveContext.table("bucketed_table").groupBy("j").agg(max("k")),
+      checkAnswer(spark.table("bucketed_table").groupBy("j").agg(max("k")),
         df1.groupBy("j").agg(max("k")))
     }
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
similarity index 88%
rename from sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
rename to sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
index 61cef2a800..9082261af7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
@@ -20,19 +20,29 @@ package org.apache.spark.sql.sources
 import java.io.File
 import java.net.URI
 
-import org.apache.spark.SparkException
 import org.apache.spark.sql.{AnalysisException, QueryTest}
 import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.execution.datasources.BucketingUtils
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
+import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
 
-class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
+class BucketedWriteWithoutHiveSupportSuite extends BucketedWriteSuite with SharedSQLContext {
+  protected override def beforeAll(): Unit = {
+    super.beforeAll()
+    assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory")
+  }
+
+  override protected def fileFormatsToTest: Seq[String] = Seq("parquet", "json")
+}
+
+abstract class BucketedWriteSuite extends QueryTest with SQLTestUtils {
   import testImplicits._
 
+  protected def fileFormatsToTest: Seq[String]
+
   test("bucketed by non-existing column") {
     val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
     intercept[AnalysisException](df.write.bucketBy(2, "k").saveAsTable("tt"))
@@ -76,11 +86,13 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle
     assert(e.getMessage == "'insertInto' does not support bucketing right now;")
   }
 
-  private val df = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
+  private lazy val df = {
+    (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
+  }
 
   def tableDir: File = {
     val identifier = spark.sessionState.sqlParser.parseTableIdentifier("bucketed_table")
-    new File(URI.create(hiveContext.sessionState.catalog.hiveDefaultTableFilePath(identifier)))
+    new File(URI.create(spark.sessionState.catalog.defaultTablePath(identifier)))
   }
 
   /**
@@ -141,7 +153,7 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle
   }
 
   test("write bucketed data") {
-    for (source <- Seq("parquet", "json", "orc")) {
+    for (source <- fileFormatsToTest) {
       withTable("bucketed_table") {
         df.write
           .format(source)
@@ -157,7 +169,7 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle
   }
 
   test("write bucketed data with sortBy") {
-    for (source <- Seq("parquet", "json", "orc")) {
+    for (source <- fileFormatsToTest) {
       withTable("bucketed_table") {
         df.write
           .format(source)
@@ -190,7 +202,7 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle
   }
 
   test("write bucketed data without partitionBy") {
-    for (source <- Seq("parquet", "json", "orc")) {
+    for (source <- fileFormatsToTest) {
       withTable("bucketed_table") {
         df.write
           .format(source)
@@ -203,7 +215,7 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle
   }
 
   test("write bucketed data without partitionBy with sortBy") {
-    for (source <- Seq("parquet", "json", "orc")) {
+    for (source <- fileFormatsToTest) {
       withTable("bucketed_table") {
         df.write
           .format(source)
@@ -219,7 +231,7 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle
   test("write bucketed data with bucketing disabled") {
     // The configuration BUCKETING_ENABLED does not affect the writing path
     withSQLConf(SQLConf.BUCKETING_ENABLED.key -> "false") {
-      for (source <- Seq("parquet", "json", "orc")) {
+      for (source <- fileFormatsToTest) {
         withTable("bucketed_table") {
           df.write
             .format(source)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadWithHiveSupportSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadWithHiveSupportSuite.scala
new file mode 100644
index 0000000000..f277f99805
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadWithHiveSupportSuite.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources
+
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
+
+class BucketedReadWithHiveSupportSuite extends BucketedReadSuite with TestHiveSingleton {
+  protected override def beforeAll(): Unit = {
+    super.beforeAll()
+    assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "hive")
+  }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteWithHiveSupportSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteWithHiveSupportSuite.scala
new file mode 100644
index 0000000000..454e2f65d5
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteWithHiveSupportSuite.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources
+
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
+
+class BucketedWriteWithHiveSupportSuite extends BucketedWriteSuite with TestHiveSingleton {
+  protected override def beforeAll(): Unit = {
+    super.beforeAll()
+    assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "hive")
+  }
+
+  override protected def fileFormatsToTest: Seq[String] = Seq("parquet", "orc")
+}
-- 
GitLab