Skip to content
Snippets Groups Projects
Commit afa757c9 authored by Reynold Xin's avatar Reynold Xin Committed by Yin Huai
Browse files

[SPARK-9849] [SQL] DirectParquetOutputCommitter qualified name should be backward compatible

DirectParquetOutputCommitter was moved in SPARK-9763. However, users can explicitly set the class as a config option, so we must be able to resolve the old committer qualified name.

Author: Reynold Xin <rxin@databricks.com>

Closes #8114 from rxin/SPARK-9849.
parent 5a5bbc29
No related branches found
No related tags found
No related merge requests found
......@@ -209,6 +209,13 @@ private[sql] class ParquetRelation(
// Prepares the Hadoop write job and returns the factory used to create
// per-partition output writers.
// NOTE(review): method is truncated in this view; only the committer-class
// resolution prologue is visible here.
override def prepareJobForWrite(job: Job): OutputWriterFactory = {
val conf = ContextUtil.getConfiguration(job)
// SPARK-9849 DirectParquetOutputCommitter qualified name should be backward compatible
// DirectParquetOutputCommitter was relocated in SPARK-9763, but users may have
// pinned the OLD fully-qualified name in their config. Rewrite that legacy name
// to the relocated class before resolving the committer below.
val committerClassname = conf.get(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key)
if (committerClassname == "org.apache.spark.sql.parquet.DirectParquetOutputCommitter") {
conf.set(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key,
classOf[DirectParquetOutputCommitter].getCanonicalName)
}
// Resolve the (possibly remapped) committer class from the configuration.
val committerClass =
conf.getClass(
SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key,
......
......@@ -390,7 +390,32 @@ class ParquetIOSuite extends QueryTest with ParquetTest {
}
}
test("SPARK-8121: spark.sql.parquet.output.committer.class shouldn't be overriden") {
test("SPARK-9849 DirectParquetOutputCommitter qualified name should be backward compatible") {
  // Snapshot the Hadoop configuration so it can be restored after the test.
  val savedConf = new Configuration(configuration)
  try {
    // Point the committer option at the OLD (pre-SPARK-9763) qualified name;
    // ParquetRelation must remap it to the relocated class.
    configuration.set("spark.sql.parquet.output.committer.class",
      "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
    // A divide-by-zero UDF guarantees the write job fails.
    sqlContext.udf.register("div0", (x: Int) => x / 0)
    withTempPath { dir =>
      intercept[org.apache.spark.SparkException] {
        sqlContext.sql("select div0(1)").write.parquet(dir.getCanonicalPath)
      }
      // If the direct output committer took effect, no _temporary directory
      // is left behind by the failed job.
      val path = new Path(dir.getCanonicalPath, "_temporary")
      val fs = path.getFileSystem(configuration)
      assert(!fs.exists(path))
    }
  } finally {
    // Hadoop 1 doesn't have `Configuration.unset`, so wipe everything and
    // replay the snapshot entry by entry.
    configuration.clear()
    savedConf.foreach { entry => configuration.set(entry.getKey, entry.getValue) }
  }
}
// Verifies a user-provided committer class survives job preparation.
// NOTE(review): test body is truncated in this view; only the opening lines
// (temp path + configuration snapshot) are visible.
test("SPARK-8121: spark.sql.parquet.output.committer.class shouldn't be overridden") {
withTempPath { dir =>
val clonedConf = new Configuration(configuration)
......
Loading…
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment