Commit f34aadc5 authored by Wenchen Fan, committed by Andrew Or

[SPARK-15718][SQL] better error message for writing bucketed data

## What changes were proposed in this pull request?

Currently we don't support bucketing for `save` and `insertInto`.

For `save`, we just write the data out to a directory the user specified; it's not a table, and we don't keep its metadata. When we read it back, we have no idea whether the data is bucketed or not, so it doesn't make sense to use `save` to write bucketed data, as we couldn't use the bucketing information anyway.
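To make this concrete, here is a minimal sketch (the data, the `/tmp/path` target, and the `spark` session name are illustrative assumptions, not part of this patch):

```scala
import spark.implicits._  // assumes an existing SparkSession named `spark`

val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")

// parquet() delegates to save(), so with this patch the call fails fast with:
// java.lang.IllegalArgumentException: 'save' does not support bucketing right now.
df.write.bucketBy(2, "i").parquet("/tmp/path")
```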

We can support it in the future, once we have features like bucket discovery, or once we save the bucketing information in the data directory as well, so that we don't need to rely on a metastore.

For `insertInto`, which inserts data into an existing table, it doesn't make sense to specify bucketing information either: we should get it from the existing table.
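Again as a sketch (reusing the hypothetical `df` above; the table names are made up), `insertInto` now fails with its own operation name, while the metastore-backed path remains the supported way to write bucketed data:

```scala
// With this patch the call fails fast with:
// java.lang.IllegalArgumentException: 'insertInto' does not support bucketing right now.
df.write.bucketBy(2, "i").insertInto("tt")

// Supported route: saveAsTable records the bucketing spec in the metastore,
// so it can be used when the table is read back.
df.write.bucketBy(2, "i").sortBy("j").saveAsTable("bucketed_table")
```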

This PR improves the error messages for the two cases above.

## How was this patch tested?

New tests in `BucketedWriteSuite`.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #13452 from cloud-fan/error-msg.
parent 229f9022
@@ -281,7 +281,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
    * @since 1.4.0
    */
   def save(): Unit = {
-    assertNotBucketed()
+    assertNotBucketed("save")
     assertNotStreaming("save() can only be called on non-continuous queries")
     val dataSource = DataSource(
       df.sparkSession,
@@ -330,7 +330,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
    */
   @Experimental
   def startStream(): ContinuousQuery = {
-    assertNotBucketed()
+    assertNotBucketed("startStream")
     assertStreaming("startStream() can only be called on continuous queries")
     if (source == "memory") {
@@ -430,7 +430,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
   }

   private def insertInto(tableIdent: TableIdentifier): Unit = {
-    assertNotBucketed()
+    assertNotBucketed("insertInto")
     assertNotStreaming("insertInto() can only be called on non-continuous queries")
     val partitions = normalizedParCols.map(_.map(col => col -> (None: Option[String])).toMap)
     val overwrite = mode == SaveMode.Overwrite
@@ -500,10 +500,10 @@ final class DataFrameWriter private[sql](df: DataFrame) {
         s"existing columns (${validColumnNames.mkString(", ")})"))
   }

-  private def assertNotBucketed(): Unit = {
+  private def assertNotBucketed(operation: String): Unit = {
     if (numBuckets.isDefined || sortColumnNames.isDefined) {
       throw new IllegalArgumentException(
-        "Currently we don't support writing bucketed data to this data source.")
+        s"'$operation' does not support bucketing right now.")
     }
   }
@@ -456,7 +456,7 @@ class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter {
       .stream()
     val w = df.write
     val e = intercept[IllegalArgumentException](w.bucketBy(1, "text").startStream())
-    assert(e.getMessage == "Currently we don't support writing bucketed data to this data source.")
+    assert(e.getMessage == "'startStream' does not support bucketing right now.")
   }

   test("check sortBy() can only be called on non-continuous queries;") {
@@ -465,7 +465,7 @@ class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter {
       .stream()
     val w = df.write
     val e = intercept[IllegalArgumentException](w.sortBy("text").startStream())
-    assert(e.getMessage == "Currently we don't support writing bucketed data to this data source.")
+    assert(e.getMessage == "'startStream' does not support bucketing right now.")
   }

   test("check save(path) can only be called on non-continuous queries") {
@@ -59,11 +59,22 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle
     intercept[SparkException](df.write.bucketBy(3, "i").format("text").saveAsTable("tt"))
   }

-  test("write bucketed data to non-hive-table or existing hive table") {
+  test("write bucketed data using save()") {
     val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
-    intercept[IllegalArgumentException](df.write.bucketBy(2, "i").parquet("/tmp/path"))
-    intercept[IllegalArgumentException](df.write.bucketBy(2, "i").json("/tmp/path"))
-    intercept[IllegalArgumentException](df.write.bucketBy(2, "i").insertInto("tt"))
+
+    val e = intercept[IllegalArgumentException] {
+      df.write.bucketBy(2, "i").parquet("/tmp/path")
+    }
+    assert(e.getMessage == "'save' does not support bucketing right now.")
+  }
+
+  test("write bucketed data using insertInto()") {
+    val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
+
+    val e = intercept[IllegalArgumentException] {
+      df.write.bucketBy(2, "i").insertInto("tt")
+    }
+    assert(e.getMessage == "'insertInto' does not support bucketing right now.")
+  }

   private val df = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")