From ae33abf71b353c638487948b775e966c7127cd46 Mon Sep 17 00:00:00 2001
From: zero323 <zero323@users.noreply.github.com>
Date: Fri, 26 May 2017 15:01:01 -0700
Subject: [PATCH] [SPARK-20694][DOCS][SQL] Document DataFrameWriter
 partitionBy, bucketBy and sortBy in SQL guide

## What changes were proposed in this pull request?

- Add Scala, Python and Java examples for `partitionBy`, `sortBy` and `bucketBy`.
- Add _Bucketing, Sorting and Partitioning_ section to SQL Programming Guide
- Remove bucketing from Unsupported Hive Functionalities.

## How was this patch tested?

Manual tests, docs build.

Author: zero323 <zero323@users.noreply.github.com>

Closes #17938 from zero323/DOCS-BUCKETING-AND-PARTITIONING.
---
 docs/sql-programming-guide.md                 | 108 ++++++++++++++++++
 .../sql/JavaSQLDataSourceExample.java         |  16 +++
 examples/src/main/python/sql/datasource.py    |  20 ++++
 .../examples/sql/SQLDataSourceExample.scala   |  16 +++
 4 files changed, 160 insertions(+)

diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index adb12d2489..314ff6ef80 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -581,6 +581,114 @@ Starting from Spark 2.1, persistent datasource tables have per-partition metadat
 
 Note that partition information is not gathered by default when creating external datasource tables (those with a `path` option). To sync the partition information in the metastore, you can invoke `MSCK REPAIR TABLE`.
 
+### Bucketing, Sorting and Partitioning
+
+For file-based data source, it is also possible to bucket and sort or partition the output. 
+Bucketing and sorting are applicable only to persistent tables:
+
+<div class="codetabs">
+
+<div data-lang="scala"  markdown="1">
+{% include_example write_sorting_and_bucketing scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %}
+</div>
+
+<div data-lang="java"  markdown="1">
+{% include_example write_sorting_and_bucketing java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %}
+</div>
+
+<div data-lang="python"  markdown="1">
+{% include_example write_sorting_and_bucketing python/sql/datasource.py %}
+</div>
+
+<div data-lang="sql"  markdown="1">
+
+{% highlight sql %}
+
+CREATE TABLE users_bucketed_by_name(
+  name STRING,
+  favorite_color STRING,
+  favorite_numbers array<integer>
+) USING parquet 
+CLUSTERED BY(name) INTO 42 BUCKETS;
+
+{% endhighlight %}
+
+</div>
+
+</div>
+
+while partitioning can be used with both `save` and `saveAsTable` when using the Dataset APIs.
+
+
+<div class="codetabs">
+
+<div data-lang="scala"  markdown="1">
+{% include_example write_partitioning scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %}
+</div>
+
+<div data-lang="java"  markdown="1">
+{% include_example write_partitioning java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %}
+</div>
+
+<div data-lang="python"  markdown="1">
+{% include_example write_partitioning python/sql/datasource.py %}
+</div>
+
+<div data-lang="sql"  markdown="1">
+
+{% highlight sql %}
+
+CREATE TABLE users_by_favorite_color(
+  name STRING, 
+  favorite_color STRING,
+  favorite_numbers array<integer>
+) USING csv PARTITIONED BY(favorite_color);
+
+{% endhighlight %}
+
+</div>
+
+</div>
+
+It is possible to use both partitioning and bucketing for a single table:
+
+<div class="codetabs">
+
+<div data-lang="scala"  markdown="1">
+{% include_example write_partition_and_bucket scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %}
+</div>
+
+<div data-lang="java"  markdown="1">
+{% include_example write_partition_and_bucket java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %}
+</div>
+
+<div data-lang="python"  markdown="1">
+{% include_example write_partition_and_bucket python/sql/datasource.py %}
+</div>
+
+<div data-lang="sql"  markdown="1">
+
+{% highlight sql %}
+
+CREATE TABLE users_bucketed_and_partitioned(
+  name STRING,
+  favorite_color STRING,
+  favorite_numbers array<integer>
+) USING parquet 
+PARTITIONED BY (favorite_color)
+CLUSTERED BY(name) SORTED BY (favorite_numbers) INTO 42 BUCKETS;
+
+{% endhighlight %}
+
+</div>
+
+</div>
+
+`partitionBy` creates a directory structure as described in the [Partition Discovery](#partition-discovery) section.
+Thus, it has limited applicability to columns with high cardinality. In contrast 
+ `bucketBy` distributes
+data across a fixed number of buckets and can be used when a number of unique values is unbounded.
+
 ## Parquet Files
 
 [Parquet](http://parquet.io) is a columnar format that is supported by many other data processing systems.
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
index b66abaed66..706856b521 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
@@ -120,6 +120,22 @@ public class JavaSQLDataSourceExample {
     Dataset<Row> sqlDF =
       spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`");
     // $example off:direct_sql$
+    // $example on:write_sorting_and_bucketing$
+    peopleDF.write().bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed");
+    // $example off:write_sorting_and_bucketing$
+    // $example on:write_partitioning$
+    usersDF.write().partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet");
+    // $example off:write_partitioning$
+    // $example on:write_partition_and_bucket$
+    peopleDF
+      .write()
+      .partitionBy("favorite_color")
+      .bucketBy(42, "name")
+      .saveAsTable("people_partitioned_bucketed");
+    // $example off:write_partition_and_bucket$
+
+    spark.sql("DROP TABLE IF EXISTS people_bucketed");
+    spark.sql("DROP TABLE IF EXISTS people_partitioned_bucketed");
   }
 
   private static void runBasicParquetExample(SparkSession spark) {
diff --git a/examples/src/main/python/sql/datasource.py b/examples/src/main/python/sql/datasource.py
index e4abb09333..8777cca66b 100644
--- a/examples/src/main/python/sql/datasource.py
+++ b/examples/src/main/python/sql/datasource.py
@@ -35,15 +35,35 @@ def basic_datasource_example(spark):
     df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
     # $example off:generic_load_save_functions$
 
+    # $example on:write_partitioning$
+    df.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
+    # $example off:write_partitioning$
+
+    # $example on:write_partition_and_bucket$
+    df = spark.read.parquet("examples/src/main/resources/users.parquet")
+    (df
+        .write
+        .partitionBy("favorite_color")
+        .bucketBy(42, "name")
+        .saveAsTable("people_partitioned_bucketed"))
+    # $example off:write_partition_and_bucket$
+
     # $example on:manual_load_options$
     df = spark.read.load("examples/src/main/resources/people.json", format="json")
     df.select("name", "age").write.save("namesAndAges.parquet", format="parquet")
     # $example off:manual_load_options$
 
+    # $example on:write_sorting_and_bucketing$
+    df.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
+    # $example off:write_sorting_and_bucketing$
+
     # $example on:direct_sql$
     df = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
     # $example off:direct_sql$
 
+    spark.sql("DROP TABLE IF EXISTS people_bucketed")
+    spark.sql("DROP TABLE IF EXISTS people_partitioned_bucketed")
+
 
 def parquet_example(spark):
     # $example on:basic_parquet_example$
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala b/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala
index ad74da72bd..6ff03bdb22 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala
@@ -52,6 +52,22 @@ object SQLDataSourceExample {
     // $example on:direct_sql$
     val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
     // $example off:direct_sql$
+    // $example on:write_sorting_and_bucketing$
+    peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
+    // $example off:write_sorting_and_bucketing$
+    // $example on:write_partitioning$
+    usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
+    // $example off:write_partitioning$
+    // $example on:write_partition_and_bucket$
+    peopleDF
+      .write
+      .partitionBy("favorite_color")
+      .bucketBy(42, "name")
+      .saveAsTable("people_partitioned_bucketed")
+    // $example off:write_partition_and_bucket$
+
+    spark.sql("DROP TABLE IF EXISTS people_bucketed")
+    spark.sql("DROP TABLE IF EXISTS people_partitioned_bucketed")
   }
 
   private def runBasicParquetExample(spark: SparkSession): Unit = {
-- 
GitLab