Commit e13cd865 authored by Pei-Lun Lee, committed by Cheng Lian

[SPARK-6352] [SQL] Custom parquet output committer

Add a new config "spark.sql.parquet.output.committer.class" to allow a custom Parquet output committer, and add an output committer intended for use on S3.
Fix compilation error introduced by https://github.com/apache/spark/pull/5042.
Respect ParquetOutputFormat.ENABLE_JOB_SUMMARY flag.
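
For illustration only, here is a minimal sketch of how the new setting might be wired up from user code; the SparkContext/SQLContext setup, the sample data, and the s3n path are assumptions, not part of this change:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Hypothetical setup; any existing SparkContext/SQLContext works the same way.
val sc = new SparkContext(new SparkConf().setAppName("direct-committer-example"))
val sqlContext = new SQLContext(sc)

// The committer class is read from the Hadoop configuration at write time, so
// setting it on the SparkContext's hadoopConfiguration (as the test in this commit
// does) routes Parquet writes through DirectParquetOutputCommitter.
sc.hadoopConfiguration.set(
  "spark.sql.parquet.output.committer.class",
  "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")

// On an object store such as S3 this skips the _temporary directory and the final
// rename. The bucket and prefix below are placeholders.
sqlContext.createDataFrame(Seq((1, "a"), (2, "b"))).saveAsParquetFile("s3n://bucket/prefix")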

Author: Pei-Lun Lee <pllee@appier.com>

Closes #5525 from ypcat/spark-6352 and squashes the following commits:

54c6b15 [Pei-Lun Lee] error handling
472870e [Pei-Lun Lee] add back custom parquet output committer
ddd0f69 [Pei-Lun Lee] Merge branch 'master' of https://github.com/apache/spark into spark-6352
9ece5c5 [Pei-Lun Lee] compatibility with hadoop 1.x
8413fcd [Pei-Lun Lee] Merge branch 'master' of https://github.com/apache/spark into spark-6352
fe65915 [Pei-Lun Lee] add support for parquet config parquet.enable.summary-metadata
e17bf47 [Pei-Lun Lee] Merge branch 'master' of https://github.com/apache/spark into spark-6352
9ae7545 [Pei-Lun Lee] [SPARL-6352] [SQL] Change to allow custom parquet output committer.
0d540b9 [Pei-Lun Lee] [SPARK-6352] [SQL] add license
c42468c [Pei-Lun Lee] [SPARK-6352] [SQL] add test case
0fc03ca [Pei-Lun Lee] [SPARK-6532] [SQL] hide class DirectParquetOutputCommitter
769bd67 [Pei-Lun Lee] DirectParquetOutputCommitter
f75e261 [Pei-Lun Lee] DirectParquetOutputCommitter
parent d94cd1a7
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.parquet

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

import parquet.Log
import parquet.hadoop.util.ContextUtil
import parquet.hadoop.{ParquetFileReader, ParquetFileWriter, ParquetOutputCommitter, ParquetOutputFormat}

/**
 * An output committer that writes Parquet task output directly to the final
 * destination, skipping the _temporary directory and the rename step performed by
 * FileOutputCommitter. This is mainly useful on file systems such as S3, where
 * renames are expensive, at the cost of weaker fault tolerance when tasks fail.
 */
private[parquet] class DirectParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
  extends ParquetOutputCommitter(outputPath, context) {
  val LOG = Log.getLog(classOf[ParquetOutputCommitter])

  // Tasks write straight to the final output path; task-level setup/commit/abort are no-ops.
  override def getWorkPath(): Path = outputPath
  override def abortTask(taskContext: TaskAttemptContext): Unit = {}
  override def commitTask(taskContext: TaskAttemptContext): Unit = {}
  override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = true
  override def setupJob(jobContext: JobContext): Unit = {}
  override def setupTask(taskContext: TaskAttemptContext): Unit = {}

  override def commitJob(jobContext: JobContext) {
    val configuration = ContextUtil.getConfiguration(jobContext)
    val fileSystem = outputPath.getFileSystem(configuration)

    // Write the Parquet summary file (_metadata) unless job summaries are disabled.
    if (configuration.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, true)) {
      try {
        val outputStatus = fileSystem.getFileStatus(outputPath)
        val footers = ParquetFileReader.readAllFootersInParallel(configuration, outputStatus)
        try {
          ParquetFileWriter.writeMetadataFile(configuration, outputPath, footers)
        } catch {
          case e: Exception => {
            LOG.warn("could not write summary file for " + outputPath, e)
            // Clean up a possibly partial _metadata file so readers do not pick it up.
            val metadataPath = new Path(outputPath, ParquetFileWriter.PARQUET_METADATA_FILE)
            if (fileSystem.exists(metadataPath)) {
              fileSystem.delete(metadataPath, true)
            }
          }
        }
      } catch {
        case e: Exception => LOG.warn("could not write summary file for " + outputPath, e)
      }
    }

    // Write the _SUCCESS marker unless it has been disabled.
    if (configuration.getBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", true)) {
      try {
        val successPath = new Path(outputPath, FileOutputCommitter.SUCCEEDED_FILE_NAME)
        fileSystem.create(successPath).close()
      } catch {
        case e: Exception => LOG.warn("could not write success file for " + outputPath, e)
      }
    }
  }
}
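
As a usage note, commitJob above honors the ParquetOutputFormat.ENABLE_JOB_SUMMARY flag (the "parquet.enable.summary-metadata" key), so the summary file can be skipped entirely. A minimal sketch, assuming an existing SparkContext named sc:

import parquet.hadoop.ParquetOutputFormat

// ENABLE_JOB_SUMMARY resolves to "parquet.enable.summary-metadata"; setting it to
// false makes the committer above skip writing the _metadata summary file.
sc.hadoopConfiguration.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)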

@@ -381,6 +381,7 @@ private[parquet] class AppendingParquetOutputFormat(offset: Int)
  extends parquet.hadoop.ParquetOutputFormat[Row] {
  // override to accept existing directories as valid output directories
  override def checkOutputSpecs(job: JobContext): Unit = {}
  var committer: OutputCommitter = null

  // override to choose the output filename so as not to overwrite existing ones
  override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {

@@ -403,6 +404,26 @@ private[parquet] class AppendingParquetOutputFormat(offset: Int)
  private def getTaskAttemptID(context: TaskAttemptContext): TaskAttemptID = {
    context.getClass.getMethod("getTaskAttemptID").invoke(context).asInstanceOf[TaskAttemptID]
  }

  // override to create output committer from configuration
  override def getOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
    if (committer == null) {
      val output = getOutputPath(context)
      val cls = context.getConfiguration.getClass("spark.sql.parquet.output.committer.class",
        classOf[ParquetOutputCommitter], classOf[ParquetOutputCommitter])
      val ctor = cls.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
      committer = ctor.newInstance(output, context).asInstanceOf[ParquetOutputCommitter]
    }
    committer
  }

  // FileOutputFormat.getOutputPath takes JobConf in hadoop-1 but JobContext in hadoop-2
  private def getOutputPath(context: TaskAttemptContext): Path = {
    context.getConfiguration().get("mapred.output.dir") match {
      case null => null
      case name => new Path(name)
    }
  }
}
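
Because getOutputCommitter above instantiates the configured class reflectively through a (Path, TaskAttemptContext) constructor and casts the result to ParquetOutputCommitter, a custom committer has to follow that shape. A hypothetical sketch (the class name, package, and logging are illustrative, not part of this change):

package com.example

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import parquet.hadoop.ParquetOutputCommitter

// Hypothetical committer: behaves like the default one but logs the output path when
// the job commits. The (Path, TaskAttemptContext) constructor is required because
// AppendingParquetOutputFormat.getOutputCommitter looks it up via reflection.
class LoggingParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
  extends ParquetOutputCommitter(outputPath, context) {

  override def commitJob(jobContext: JobContext): Unit = {
    println("Committing Parquet job output to " + outputPath)
    super.commitJob(jobContext)
  }
}

// Selected through the same configuration key that getOutputCommitter reads:
// sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class",
//   "com.example.LoggingParquetOutputCommitter")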

@@ -381,6 +381,28 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
      }
    }
  }

  test("SPARK-6352 DirectParquetOutputCommitter") {
    // Write to a parquet file and let it fail.
    // _temporary should be missing if direct output committer works.
    try {
      configuration.set("spark.sql.parquet.output.committer.class",
        "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
      sqlContext.udf.register("div0", (x: Int) => x / 0)
      withTempPath { dir =>
        intercept[org.apache.spark.SparkException] {
          sqlContext.sql("select div0(1)").saveAsParquetFile(dir.getCanonicalPath)
        }
        val path = new Path(dir.getCanonicalPath, "_temporary")
        val fs = path.getFileSystem(configuration)
        assert(!fs.exists(path))
      }
    }
    finally {
      configuration.set("spark.sql.parquet.output.committer.class",
        "parquet.hadoop.ParquetOutputCommitter")
    }
  }
}
class ParquetDataSourceOnIOSuite extends ParquetIOSuiteBase with BeforeAndAfterAll {