diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index adb12d2489a57bc955ad06bf78815a43dc713a1f..314ff6ef80d29637e39aa32a669b05cc8c70021c 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -581,6 +581,114 @@ Starting from Spark 2.1, persistent datasource tables have per-partition metadat
 Note that partition information is not gathered by default when creating external datasource tables (those with a `path` option). To sync the partition information in the metastore, you can invoke `MSCK REPAIR TABLE`.
 
+### Bucketing, Sorting and Partitioning
+
+For file-based data sources, it is also possible to bucket and sort or partition the output.
+Bucketing and sorting are applicable only to persistent tables:
+
+<div class="codetabs">
+
+<div data-lang="scala" markdown="1">
+{% include_example write_sorting_and_bucketing scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %}
+</div>
+
+<div data-lang="java" markdown="1">
+{% include_example write_sorting_and_bucketing java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %}
+</div>
+
+<div data-lang="python" markdown="1">
+{% include_example write_sorting_and_bucketing python/sql/datasource.py %}
+</div>
+
+<div data-lang="sql" markdown="1">
+
+{% highlight sql %}
+
+CREATE TABLE users_bucketed_by_name(
+  name STRING,
+  favorite_color STRING,
+  favorite_numbers array<integer>
+) USING parquet
+CLUSTERED BY(name) INTO 42 BUCKETS;
+
+{% endhighlight %}
+
+</div>
+
+</div>
+
+while partitioning can be used with both `save` and `saveAsTable` when using the Dataset APIs.
+
+
+<div class="codetabs">
+
+<div data-lang="scala" markdown="1">
+{% include_example write_partitioning scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %}
+</div>
+
+<div data-lang="java" markdown="1">
+{% include_example write_partitioning java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %}
+</div>
+
+<div data-lang="python" markdown="1">
+{% include_example write_partitioning python/sql/datasource.py %}
+</div>
+
+<div data-lang="sql" markdown="1">
+
+{% highlight sql %}
+
+CREATE TABLE users_by_favorite_color(
+  name STRING,
+  favorite_color STRING,
+  favorite_numbers array<integer>
+) USING csv PARTITIONED BY(favorite_color);
+
+{% endhighlight %}
+
+</div>
+
+</div>
+
+It is possible to use both partitioning and bucketing for a single table:
+
+<div class="codetabs">
+
+<div data-lang="scala" markdown="1">
+{% include_example write_partition_and_bucket scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %}
+</div>
+
+<div data-lang="java" markdown="1">
+{% include_example write_partition_and_bucket java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %}
+</div>
+
+<div data-lang="python" markdown="1">
+{% include_example write_partition_and_bucket python/sql/datasource.py %}
+</div>
+
+<div data-lang="sql" markdown="1">
+
+{% highlight sql %}
+
+CREATE TABLE users_bucketed_and_partitioned(
+  name STRING,
+  favorite_color STRING,
+  favorite_numbers array<integer>
+) USING parquet
+PARTITIONED BY (favorite_color)
+CLUSTERED BY(name) SORTED BY (favorite_numbers) INTO 42 BUCKETS;
+
+{% endhighlight %}
+
+</div>
+
+</div>
+
+`partitionBy` creates a directory structure as described in the [Partition Discovery](#partition-discovery) section.
+Thus, it has limited applicability to columns with high cardinality. In contrast,
+`bucketBy` distributes data across a fixed number of buckets and can be used when the number of unique values is unbounded.
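+
+As an illustration, `partitionBy` lays the output out as one directory per distinct value of the
+partitioning column. For the `write_partitioning` example above, the result might look roughly as
+follows (a sketch with placeholder color values and part-file names, not actual output):
+
+{% highlight text %}
+
+namesPartByColor.parquet/
+├── favorite_color=blue/
+│   └── part-...parquet
+└── favorite_color=red/
+    └── part-...parquet
+
+{% endhighlight %}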
+
 ## Parquet Files
 
 [Parquet](http://parquet.io) is a columnar format that is supported by many other data processing systems.
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
index b66abaed66000d542785999c12f40fcf5331bdec..706856b5215e44e6f72d92859d5aa9d189e4c207 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
@@ -120,6 +120,22 @@ public class JavaSQLDataSourceExample {
     Dataset<Row> sqlDF =
       spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`");
     // $example off:direct_sql$
+    // $example on:write_sorting_and_bucketing$
+    peopleDF.write().bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed");
+    // $example off:write_sorting_and_bucketing$
+    // $example on:write_partitioning$
+    usersDF.write().partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet");
+    // $example off:write_partitioning$
+    // $example on:write_partition_and_bucket$
+    peopleDF
+      .write()
+      .partitionBy("favorite_color")
+      .bucketBy(42, "name")
+      .saveAsTable("people_partitioned_bucketed");
+    // $example off:write_partition_and_bucket$
+
+    spark.sql("DROP TABLE IF EXISTS people_bucketed");
+    spark.sql("DROP TABLE IF EXISTS people_partitioned_bucketed");
   }
 
   private static void runBasicParquetExample(SparkSession spark) {
diff --git a/examples/src/main/python/sql/datasource.py b/examples/src/main/python/sql/datasource.py
index e4abb0933345d94ea5d567856adac99b7f1998aa..8777cca66bfe9064b1cc24d0e92e42dfff6dc57e 100644
--- a/examples/src/main/python/sql/datasource.py
+++ b/examples/src/main/python/sql/datasource.py
@@ -35,15 +35,35 @@ def basic_datasource_example(spark):
     df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
     # $example off:generic_load_save_functions$
 
+    # $example on:write_partitioning$
+    df.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
+    # $example off:write_partitioning$
+
+    # $example on:write_partition_and_bucket$
+    df = spark.read.parquet("examples/src/main/resources/users.parquet")
+    (df
+        .write
+        .partitionBy("favorite_color")
+        .bucketBy(42, "name")
+        .saveAsTable("people_partitioned_bucketed"))
+    # $example off:write_partition_and_bucket$
+
     # $example on:manual_load_options$
     df = spark.read.load("examples/src/main/resources/people.json", format="json")
     df.select("name", "age").write.save("namesAndAges.parquet", format="parquet")
     # $example off:manual_load_options$
 
+    # $example on:write_sorting_and_bucketing$
+    df.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
+    # $example off:write_sorting_and_bucketing$
+
     # $example on:direct_sql$
     df = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
     # $example off:direct_sql$
 
+    spark.sql("DROP TABLE IF EXISTS people_bucketed")
+    spark.sql("DROP TABLE IF EXISTS people_partitioned_bucketed")
+
 
 def parquet_example(spark):
     # $example on:basic_parquet_example$
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala b/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala
index ad74da72bd5e6b7b0934579486590ca034177442..6ff03bdb221291c67583d16ea0be114d7351e74f 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala
@@ -52,6 +52,22 @@ object SQLDataSourceExample {
     // $example on:direct_sql$
     val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
     // $example off:direct_sql$
+    // $example on:write_sorting_and_bucketing$
+    peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
+    // $example off:write_sorting_and_bucketing$
+    // $example on:write_partitioning$
+    usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
+    // $example off:write_partitioning$
+    // $example on:write_partition_and_bucket$
+    peopleDF
+      .write
+      .partitionBy("favorite_color")
+      .bucketBy(42, "name")
+      .saveAsTable("people_partitioned_bucketed")
+    // $example off:write_partition_and_bucket$
+
+    spark.sql("DROP TABLE IF EXISTS people_bucketed")
+    spark.sql("DROP TABLE IF EXISTS people_partitioned_bucketed")
   }
 
   private def runBasicParquetExample(spark: SparkSession): Unit = {