Commit b4c32c49 authored by Yadong Qi, committed by Wenchen Fan

[SPARK-15549][SQL] Disable bucketing when the output doesn't contain all bucketing columns

## What changes were proposed in this pull request?
I created a bucketed table `bucketed_table` with bucket column `i`:
```scala
case class Data(i: Int, j: Int, k: Int)
sc.makeRDD(Array((1, 2, 3))).map(x => Data(x._1, x._2, x._3)).toDF.write.bucketBy(2, "i").saveAsTable("bucketed_table")
```

and then ran the following SQL queries:
```sql
SELECT j FROM bucketed_table;
Error in query: bucket column i not found in existing columns (j);

SELECT j, MAX(k) FROM bucketed_table GROUP BY j;
Error in query: bucket column i not found in existing columns (j, k);
```
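
For reference, here is a minimal sketch of the same reproduction through the DataFrame API (assuming a `SparkSession` named `spark` and the `bucketed_table` created above). After this patch both queries succeed; the scan simply falls back to a non-bucketed plan instead of throwing:
```scala
import org.apache.spark.sql.functions.max

// Both projections drop the bucket column `i`; previously they failed with
// "bucket column i not found in existing columns".
spark.table("bucketed_table").select("j").show()
spark.table("bucketed_table").groupBy("j").agg(max("k")).show()
```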

I think we should add a check so that bucketing is only enabled when all of the conditions below are satisfied (a simplified sketch follows the list):
1. the conf is enabled
2. the relation is bucketed
3. the output contains all bucketing columns
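
As a rough illustration, here is a simplified, standalone sketch of that check using plain Scala collections (hypothetical names, not the actual Spark code path; the real change to `DataSourceScanExec` is in the diff below):
```scala
// Hypothetical, simplified model of the decision: bucketing is honored only if
// the conf is on, the relation has a bucket spec, and every bucket column
// appears in the scan's output columns.
object BucketingCheckSketch {
  def useBucketing(
      bucketingEnabled: Boolean,               // condition 1
      bucketColumnNames: Option[Seq[String]],  // condition 2 (None = not bucketed)
      outputColumnNames: Seq[String]): Boolean = {  // condition 3
    bucketingEnabled && bucketColumnNames.exists(_.forall(outputColumnNames.contains))
  }

  def main(args: Array[String]): Unit = {
    // SELECT j FROM bucketed_table: bucket column i is missing from the output -> fall back
    println(useBucketing(true, Some(Seq("i")), Seq("j")))            // false
    // SELECT i, j, k FROM bucketed_table: all bucket columns present -> keep bucketing
    println(useBucketing(true, Some(Seq("i")), Seq("i", "j", "k")))  // true
  }
}
```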

## How was this patch tested?
Updated test cases to reflect the changes.

Author: Yadong Qi <qiyadong2010@gmail.com>

Closes #13321 from watermen/SPARK-15549.
parent f1b220ee
```diff
@@ -347,15 +347,14 @@ private[sql] object DataSourceScanExec {
         case _ => None
       }
 
-      def toAttribute(colName: String): Attribute = output.find(_.name == colName).getOrElse {
-        throw new AnalysisException(s"bucket column $colName not found in existing columns " +
-          s"(${output.map(_.name).mkString(", ")})")
-      }
-
       bucketSpec.map { spec =>
         val numBuckets = spec.numBuckets
-        val bucketColumns = spec.bucketColumnNames.map(toAttribute)
-        HashPartitioning(bucketColumns, numBuckets)
+        val bucketColumns = spec.bucketColumnNames.flatMap { n => output.find(_.name == n) }
+        if (bucketColumns.size == spec.bucketColumnNames.size) {
+          HashPartitioning(bucketColumns, numBuckets)
+        } else {
+          UnknownPartitioning(0)
+        }
       }.getOrElse {
         UnknownPartitioning(0)
       }
```
```diff
@@ -362,4 +362,15 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       assert(error.toString contains "Invalid bucket file")
     }
   }
+
+  test("disable bucketing when the output doesn't contain all bucketing columns") {
+    withTable("bucketed_table") {
+      df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table")
+
+      checkAnswer(hiveContext.table("bucketed_table").select("j"), df1.select("j"))
+
+      checkAnswer(hiveContext.table("bucketed_table").groupBy("j").agg(max("k")),
+        df1.groupBy("j").agg(max("k")))
+    }
+  }
 }
```