From f041e55eefe1d8a995fed321c66bccbd8b8e5255 Mon Sep 17 00:00:00 2001
From: Tejas Patil <tejasp@fb.com>
Date: Wed, 15 Feb 2017 22:45:58 -0800
Subject: [PATCH] [SPARK-19618][SQL] Fix inconsistent max bucket limit between
 the DataFrame API and SQL

## What changes were proposed in this pull request?

Jira: https://issues.apache.org/jira/browse/SPARK-19618

Moved the check for validating the number of buckets from `DataFrameWriter` to `BucketSpec` creation, so the same limit (greater than 0 and less than 100000) is enforced whether a table is bucketed through the DataFrame API or through SQL.
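
Before this change the two entry points disagreed: `df.write.bucketBy(100000, ...)` failed with an `IllegalArgumentException` from the `require` in `DataFrameWriter`, while `BucketSpec` (the SQL path) only rejected non-positive values. With the check centralized in `BucketSpec`, both paths raise the same `AnalysisException`. A minimal ScalaTest-style sketch of the unified behavior (the table and column names are illustrative, and it assumes a `SparkSession` named `spark` is in scope):

```scala
import org.apache.spark.sql.AnalysisException
import org.scalatest.Assertions.intercept

import spark.implicits._ // enables Seq(...).toDF

val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")

// DataFrame API: the invalid bucket count now fails inside BucketSpec
// with an AnalysisException (previously an IllegalArgumentException).
val e1 = intercept[AnalysisException] {
  df.write.bucketBy(100000, "i").saveAsTable("tt")
}

// SQL CTAS: the same BucketSpec check fires (previously 100000 slipped
// through here because only non-positive values were rejected).
val e2 = intercept[AnalysisException] {
  spark.sql(
    "CREATE TABLE t USING PARQUET CLUSTERED BY (i) INTO 100000 BUCKETS " +
    "AS SELECT 1 AS i")
}

// Both paths now report the same message.
Seq(e1, e2).foreach { e =>
  assert(e.getMessage.contains(
    "Number of buckets should be greater than 0 but less than 100000"))
}
```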

## How was this patch tested?

- Added unit tests covering the invalid boundary values: 0 and 100000 buckets through SQL CTAS in `CreateTableAsSelectSuite`, and -1, 0 and 100000 through `DataFrameWriter` in `BucketedWriteSuite`, asserting the shared error message in both.

Author: Tejas Patil <tejasp@fb.com>

Closes #16948 from tejasapatil/SPARK-19618_max_buckets.
---
 .../sql/catalyst/catalog/interface.scala      |  5 ++--
 .../apache/spark/sql/DataFrameWriter.scala    |  1 -
 .../sources/CreateTableAsSelectSuite.scala    | 28 ++++++++++---------
 .../sql/sources/BucketedWriteSuite.scala      | 10 +++++--
 4 files changed, 25 insertions(+), 19 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 353e5954fd..2b3b575b4c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -135,8 +135,9 @@ case class BucketSpec(
     numBuckets: Int,
     bucketColumnNames: Seq[String],
     sortColumnNames: Seq[String]) {
-  if (numBuckets <= 0) {
-    throw new AnalysisException(s"Expected positive number of buckets, but got `$numBuckets`.")
+  if (numBuckets <= 0 || numBuckets >= 100000) {
+    throw new AnalysisException(
+      s"Number of buckets should be greater than 0 but less than 100000. Got `$numBuckets`")
   }
 
   override def toString: String = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 1d834b1821..cdae8ea458 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -275,7 +275,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     }
 
     numBuckets.map { n =>
-      require(n > 0 && n < 100000, "Bucket number must be greater than 0 and less than 100000.")
       BucketSpec(n, bucketColumnNames.get, sortColumnNames.getOrElse(Nil))
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
index 99da1969fc..4a42f8ea79 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
@@ -206,7 +206,7 @@ class CreateTableAsSelectSuite
     }
   }
 
-  test("create table using as select - with non-zero buckets") {
+  test("create table using as select - with valid number of buckets") {
     val catalog = spark.sessionState.catalog
     withTable("t") {
       sql(
@@ -222,19 +222,21 @@ class CreateTableAsSelectSuite
     }
   }
 
-  test("create table using as select - with zero buckets") {
+  test("create table using as select - with invalid number of buckets") {
     withTable("t") {
-      val e = intercept[AnalysisException] {
-        sql(
-          s"""
-             |CREATE TABLE t USING PARQUET
-             |OPTIONS (PATH '${path.toURI}')
-             |CLUSTERED BY (a) SORTED BY (b) INTO 0 BUCKETS
-             |AS SELECT 1 AS a, 2 AS b
-           """.stripMargin
-        )
-      }.getMessage
-      assert(e.contains("Expected positive number of buckets, but got `0`"))
+      Seq(0, 100000).foreach { numBuckets =>
+        val e = intercept[AnalysisException] {
+          sql(
+            s"""
+               |CREATE TABLE t USING PARQUET
+               |OPTIONS (PATH '${path.toURI}')
+               |CLUSTERED BY (a) SORTED BY (b) INTO $numBuckets BUCKETS
+               |AS SELECT 1 AS a, 2 AS b
+             """.stripMargin
+          )
+        }.getMessage
+        assert(e.contains("Number of buckets should be greater than 0 but less than 100000"))
+      }
     }
   }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
index 8528dfc4ce..61cef2a800 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
@@ -38,10 +38,14 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle
     intercept[AnalysisException](df.write.bucketBy(2, "k").saveAsTable("tt"))
   }
 
-  test("numBuckets not greater than 0 or less than 100000") {
+  test("numBuckets be greater than 0 but less than 100000") {
     val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
-    intercept[IllegalArgumentException](df.write.bucketBy(0, "i").saveAsTable("tt"))
-    intercept[IllegalArgumentException](df.write.bucketBy(100000, "i").saveAsTable("tt"))
+
+    Seq(-1, 0, 100000).foreach { numBuckets =>
+      val e = intercept[AnalysisException](df.write.bucketBy(numBuckets, "i").saveAsTable("tt"))
+      assert(
+        e.getMessage.contains("Number of buckets should be greater than 0 but less than 100000"))
+    }
   }
 
   test("specify sorting columns without bucketing columns") {
-- 
GitLab