Commit ae33abf7 authored by zero323, committed by Xiao Li

[SPARK-20694][DOCS][SQL] Document DataFrameWriter partitionBy, bucketBy and sortBy in SQL guide

## What changes were proposed in this pull request?

- Add Scala, Python and Java examples for `partitionBy`, `sortBy` and `bucketBy`.
- Add _Bucketing, Sorting and Partitioning_ section to the SQL Programming Guide.
- Remove bucketing from Unsupported Hive Functionalities.

## How was this patch tested?

Manual tests, docs build.

Author: zero323 <zero323@users.noreply.github.com>

Closes #17938 from zero323/DOCS-BUCKETING-AND-PARTITIONING.
parent 473d7552
@@ -581,6 +581,114 @@ Starting from Spark 2.1, persistent datasource tables have per-partition metadata stored in the Hive metastore.
Note that partition information is not gathered by default when creating external datasource tables (those with a `path` option). To sync the partition information in the metastore, you can invoke `MSCK REPAIR TABLE`.
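For example, for a hypothetical external table named `my_ext_table` whose partition directories were added outside of Spark:

{% highlight sql %}
-- Scan the table's directory tree and add any missing partitions to the metastore
MSCK REPAIR TABLE my_ext_table;
{% endhighlight %}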
### Bucketing, Sorting and Partitioning

For file-based data sources, it is also possible to bucket and sort or partition the output. Bucketing and sorting are applicable only to persistent tables:
<div class="codetabs">
<div data-lang="scala" markdown="1">
{% include_example write_sorting_and_bucketing scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %}
</div>
<div data-lang="java" markdown="1">
{% include_example write_sorting_and_bucketing java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %}
</div>
<div data-lang="python" markdown="1">
{% include_example write_sorting_and_bucketing python/sql/datasource.py %}
</div>
<div data-lang="sql" markdown="1">
{% highlight sql %}
CREATE TABLE users_bucketed_by_name(
  name STRING,
  favorite_color STRING,
  favorite_numbers array<integer>
) USING parquet
CLUSTERED BY(name) INTO 42 BUCKETS;
{% endhighlight %}
</div>
</div>
while partitioning can be used with both `save` and `saveAsTable` when using the Dataset APIs.
<div class="codetabs">
<div data-lang="scala" markdown="1">
{% include_example write_partitioning scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %}
</div>
<div data-lang="java" markdown="1">
{% include_example write_partitioning java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %}
</div>
<div data-lang="python" markdown="1">
{% include_example write_partitioning python/sql/datasource.py %}
</div>
<div data-lang="sql" markdown="1">
{% highlight sql %}
CREATE TABLE users_by_favorite_color(
  name STRING,
  favorite_color STRING,
  favorite_numbers array<integer>
) USING csv PARTITIONED BY(favorite_color);
{% endhighlight %}
</div>
</div>
It is possible to use both partitioning and bucketing for a single table:
<div class="codetabs">
<div data-lang="scala" markdown="1">
{% include_example write_partition_and_bucket scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %}
</div>
<div data-lang="java" markdown="1">
{% include_example write_partition_and_bucket java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %}
</div>
<div data-lang="python" markdown="1">
{% include_example write_partition_and_bucket python/sql/datasource.py %}
</div>
<div data-lang="sql" markdown="1">
{% highlight sql %}
CREATE TABLE users_bucketed_and_partitioned(
  name STRING,
  favorite_color STRING,
  favorite_numbers array<integer>
) USING parquet
PARTITIONED BY (favorite_color)
CLUSTERED BY(name) SORTED BY (favorite_numbers) INTO 42 BUCKETS;
{% endhighlight %}
</div>
</div>
`partitionBy` creates a directory structure as described in the [Partition Discovery](#partition-discovery) section. Thus, it has limited applicability to columns with high cardinality. In contrast, `bucketBy` distributes data across a fixed number of buckets and can be used when the number of unique values is unbounded.
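As an illustrative sketch (assuming `favorite_color` takes only the values `blue` and `red`, and with part-file names abbreviated), the `write_partitioning` example above produces a directory layout along these lines:

{% highlight text %}
namesPartByColor.parquet/
├── favorite_color=blue/
│   └── part-00000-....snappy.parquet
└── favorite_color=red/
    └── part-00000-....snappy.parquet
{% endhighlight %}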
## Parquet Files
[Parquet](http://parquet.io) is a columnar format that is supported by many other data processing systems.
@@ -120,6 +120,22 @@ public class JavaSQLDataSourceExample {
    Dataset<Row> sqlDF =
      spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`");
    // $example off:direct_sql$

    // $example on:write_sorting_and_bucketing$
    // Bucket by name into 42 buckets, sorting each bucket by age; bucketing
    // requires saveAsTable, since the bucketing metadata is kept in the metastore
    peopleDF.write().bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed");
    // $example off:write_sorting_and_bucketing$

    // $example on:write_partitioning$
    // Partition the output by favorite_color: one subdirectory per distinct value
    usersDF.write().partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet");
    // $example off:write_partitioning$

    // $example on:write_partition_and_bucket$
    // Combine partitioning and bucketing in a single persistent table
    peopleDF
      .write()
      .partitionBy("favorite_color")
      .bucketBy(42, "name")
      .saveAsTable("people_partitioned_bucketed");
    // $example off:write_partition_and_bucket$

    // Clean up the tables created above
    spark.sql("DROP TABLE IF EXISTS people_bucketed");
    spark.sql("DROP TABLE IF EXISTS people_partitioned_bucketed");
  }

  private static void runBasicParquetExample(SparkSession spark) {
@@ -35,15 +35,35 @@ def basic_datasource_example(spark):
    df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
    # $example off:generic_load_save_functions$

    # $example on:write_partitioning$
    # Partition the output by favorite_color: one subdirectory per distinct value
    df.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
    # $example off:write_partitioning$

    # $example on:write_partition_and_bucket$
    # Combine partitioning and bucketing in a single persistent table
    df = spark.read.parquet("examples/src/main/resources/users.parquet")
    (df
        .write
        .partitionBy("favorite_color")
        .bucketBy(42, "name")
        .saveAsTable("people_partitioned_bucketed"))
    # $example off:write_partition_and_bucket$

    # $example on:manual_load_options$
    df = spark.read.load("examples/src/main/resources/people.json", format="json")
    df.select("name", "age").write.save("namesAndAges.parquet", format="parquet")
    # $example off:manual_load_options$

    # $example on:write_sorting_and_bucketing$
    # Bucket by name into 42 buckets, sorting each bucket by age; bucketing
    # requires saveAsTable, since the bucketing metadata is kept in the metastore
    df.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
    # $example off:write_sorting_and_bucketing$

    # $example on:direct_sql$
    df = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
    # $example off:direct_sql$

    # Clean up the tables created above
    spark.sql("DROP TABLE IF EXISTS people_bucketed")
    spark.sql("DROP TABLE IF EXISTS people_partitioned_bucketed")


def parquet_example(spark):
    # $example on:basic_parquet_example$
@@ -52,6 +52,22 @@ object SQLDataSourceExample {
    // $example on:direct_sql$
    val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
    // $example off:direct_sql$

    // $example on:write_sorting_and_bucketing$
    // Bucket by name into 42 buckets, sorting each bucket by age; bucketing
    // requires saveAsTable, since the bucketing metadata is kept in the metastore
    peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
    // $example off:write_sorting_and_bucketing$

    // $example on:write_partitioning$
    // Partition the output by favorite_color: one subdirectory per distinct value
    usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
    // $example off:write_partitioning$

    // $example on:write_partition_and_bucket$
    // Combine partitioning and bucketing in a single persistent table
    peopleDF
      .write
      .partitionBy("favorite_color")
      .bucketBy(42, "name")
      .saveAsTable("people_partitioned_bucketed")
    // $example off:write_partition_and_bucket$

    // Clean up the tables created above
    spark.sql("DROP TABLE IF EXISTS people_bucketed")
    spark.sql("DROP TABLE IF EXISTS people_partitioned_bucketed")
  }

  private def runBasicParquetExample(spark: SparkSession): Unit = {